栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【4】【数据管理】

第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【4】【数据管理】

声明
  1. 系列博客不会出现任何真实数据
  2. 比赛期间将拒绝一切私信,有问题请发评论

系列文章目录

第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【0】【风况预测评分规则-最终得分R的计算】【已弃用】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【1】【验证集制作】【已弃用】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【2】【验证集使用】【已弃用】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【3】【计算最终得分】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【4】【数据管理】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【5】【数据可视化】【测试集_初赛】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【6】【数据可视化】【训练集】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【7】【数据管理】【验证集本地化】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【8】【数据可视化】【验证集】
第五届全国工业互联网数据创新应用大赛 机组数据驱动的风电场短期风况预测 【9】【数据可视化】【气象数据】


文章目录
  • 声明
  • 系列文章目录
  • 更新说明
  • 前言
  • 一、必要的准备
  • 二、设计思路
    • 1. 参考测试集
    • 2. 参考评分规则计算过程
  • 三、使用帮助
    • 1. 重要基础
    • 2. 数据总览
    • 3. 数据访问(以测试集为例)
      • 3.1 遍历访问风场
      • 3.2 遍历访问时段
      • 3.3 遍历访问风机
      • 3.4 访问指定风机(以 风场2 秋_13 风机x37 为例)
    • 4. 数据集访问
      • 4.1 单位数据集的概念
      • 4.2 根据单位数据集的概念划分训练集和验证集
        • 4.2.1 generate_indexes
        • 4.2.2 get_indexes
      • 4.3 根据索引访问训练集
      • 4.4 根据索引访问验证集
      • 4.5 访问测试集
  • 四、`main.py`


更新说明


前言

这个系列弃用【1】【验证集制作】和【2】【验证集使用】最大的原因就是数据管理方式出现了极大的变化。我旧版的数据管理相当糟糕,基本上数据全存在字典里,访问数据的时候大概是这样子的

dataset['风场1']['x26']

这种访问数据的方式在跑模型的时候,模型的训练速度真的慢的一批,一查发现CPU利用率远高于GPU,训练速度的瓶颈基本上就在数据读取上了。所以我一气之下写了个新的数据管理代码,数据全存在numpy数组里,这个数据访问速度的瓶颈一下子就解决了。


一、必要的准备
  1. 你需要把mian.py和train放在同一目录下,然后运行以下代码
from main import *
dm = Data_Manager()


2. 如果检测到以下任一个文件不存在,会自动开始处理

3. 这终究只是一个数据加载和管理器,特征工程别忘了做。
# 然并卵
np.nan_to_num(dm.X_, copy=False)
np.nan_to_num(dm.X0, copy=False)
np.nan_to_num(dm.Y0, copy=False)
np.nan_to_num(dm.S, copy=False)
np.nan_to_num(dm.W, copy=False)
np.nan_to_num(dm.test_X_, copy=False)
np.nan_to_num(dm.test_X0, copy=False)
np.nan_to_num(dm.test_W, copy=False)
二、设计思路 1. 参考测试集

  • 【3】【计算最终得分】中有对数据进行简单的介绍,未介绍的部分接下来会补充,已介绍的这里就不再赘述了
  • 数据分两种
    • 机舱数据
      • 特征:‘变频器电网侧有功功率’, ‘外界温度’, ‘风速’, ‘风向’
      • 时间分辨率:30秒
      • 已进行0-1归一化
    • 气象数据
      • 特征:‘风速’, ‘风向’
      • 时间分辨率:3600秒,也就是1小时
      • 拿到数据我直接懵逼,我的天怎么不顺便归一化一下
  • 测试集每个时段提供的数据有
    • 13个小时的气象数据
      • 共14个时刻,时间编码为 -11 ~ 2
    • 1小时的机舱数据
      • 共120个时刻,对应气象数据数据编码的 0 ~ 1
      • 图中标明的测试样本区间
  • 测试集需要预测的数据
    • 10分钟的机舱数据
      • 共20个时刻
      • 图中标明的预测区间

既然如此,我们的训练集每个时段也需要准备13个小时的气象数据和1小时的机舱数据做 X ,10分钟的机舱数据做 Y 。

2. 参考评分规则计算过程

【3】【计算最终得分】

最后计算的是风场,其次是时段,再往前是风机,再再往前是时刻,那就按这个顺序呗。。

[风场, 时段, 风机, 时刻, 特征]

还有一个很重要的因素,气象数据是全部风机共用的,所以如果时段安排的维度比风机更深的话有可能会导致气象数据不统一。

三、使用帮助 1. 重要基础






接下来这些代码你将会经常见到

# for f in range(field_len)
# for m in range(machine_len)
# for s in range(season_len)
# for p in range(period_len)

这就是我遍历数据集的方式,f, m, s, p 变量名取得是首字母,值就是索引。

2. 数据总览
"""
               [风场, 时段, 风机, 时刻, 特征]
self.X_ : shape: (2, 17507, 25, 120, 2)    机舱数据,用作训练;特征 ['变频器电网侧有功功率', '外界温度']
self.X0 : shape: (2, 17507, 25, 120, 2)    机舱数据,用作训练;特征 ['风速', '风向']
self.Y0 : shape: (2, 17507, 25,  20, 2)    机舱数据,用作训练;特征 ['风速', '风向']

               [风场, 时段]
self.S  : shape: (2, 17507)                季节数据,用作训练

               [风场, 时段, 时刻, 特征]
self.W  : shape: (2, 17507, 14, 2)         气象数据,用作训练;特征 ['风速', '风向']

                    [风场, 时段, 风机, 特征]
self.weather : shape: (2, 17520, 26, 2)    机舱数据+气象数据;特征 ['风速', '风向']

                    [风场, 时段, 风机, 时刻, 特征]
self.test_X_ : shape: (2, 80, 25, 120, 2)       机舱数据,用作测试;特征 ['变频器电网侧有功功率', '外界温度']
self.test_X0 : shape: (2, 80, 25, 120, 2)       机舱数据,用作测试;特征 ['风速', '风向']

                    [风场, 时段, 时刻, 特征]
self.test_W  : shape: (2, 80, 14, 2)            气象数据,用作测试;特征 ['风速', '风向']
"""
3. 数据访问(以测试集为例)
from main import *
dm = Data_Manager()
dm.load_test_data()

3.1 遍历访问风场
for f in range(field_len):
    print(field[f], dm.test_X0[f].shape)

3.2 遍历访问时段
for f in range(field_len):
    for p in range(period_len):
        print(field[f], period[p], dm.test_X0[f, p].shape)


图片太长放不下

3.3 遍历访问风机

注:machine是2行25列,行代表风场,列代表风机

for f in range(field_len):
    for p in range(period_len):
        for m in range(machine_len):
            print(field[f], period[p], machine[f][m], dm.test_X0[f, p, m].shape)

3.4 访问指定风机(以 风场2 秋_13 风机x37 为例)
# 希望访问 风场2 秋_13 风机x37

f = np.where(field == '风场2')[0][0]
p = np.where(period == '秋_13')[0][0]
m = np.where(machine[f] == 'x37')[0][0]

print(field[f], period[p], machine[f][m], dm.test_X0[f, p, m].shape)

4. 数据集访问 4.1 单位数据集的概念

计算最终得分 R 需要2个风场,每个风场25台风机,每台风机80个时段,每个时段20行数据,每行数据的特征数为2,这是 Y ,那么其对应 X 的 shape 应为 (2, 80, 25, 120, 2),这个X称为单位数据集。比如一个完整的测试集,就是一个单位数据集。

4.2 根据单位数据集的概念划分训练集和验证集

取数据只需要通过索引就够了,不需要另外消耗内存空间

Data_Manager提供两个方法来实现通过索引划分训练集和验证集。

4.2.1 generate_indexes

顾名思义,生成索引,首先,指定需要生成验证集的数量,然后该方法会将足够数量的索引分配给验证集,剩下的留给训练集

生成10个验证集

times = 10    # 生成10个验证集

dm.generate_indexes(times)
dm.train_indexes

dm.val_indexes

4.2.2 get_indexes

验证集部分已经按照要求为各个验证集分配索引,虽然训练集也是,但训练集还得再走一步才能处理成标准格式。

这样才是标准的训练集索引

4.3 根据索引访问训练集

对应的 X ['变频器电网侧有功功率', '外界温度'] 数据

for f in range(field_len):
    indexes = dm.get_indexes(f)
    print(field[f], dm.X_[f, indexes].shape)


对应的 X ['风速', '风向'] 数据

for f in range(field_len):
    indexes = dm.get_indexes(f)
    print(field[f], dm.X0[f, indexes].shape)


对应的 Y ['风速', '风向'] 数据

for f in range(field_len):
    indexes = dm.get_indexes(f)
    print(field[f], dm.Y0[f, indexes].shape)


对应的季节
为了配合测试集,时段分布也是均匀的(80段,每个季节各20段)

for f in range(field_len):
    indexes = dm.get_indexes(f)
    print(field[f], dm.S[f, indexes])


对应的气象数据

for f in range(field_len):
    indexes = dm.get_indexes(f)
    print(field[f], dm.W[f, indexes].shape)

4.4 根据索引访问验证集

只需要改两行代码

t = 1    # 第 t 个验证集,不要超过你设定的最大值
indexes = dm.val_indexes[field[f]][t]

只有索引不同而已,调用方式是一样的。

4.5 访问测试集

这东西本身就是单位数据集好吧,直接整个拿来用就好了。

for f in range(field_len):
    print(field[f], dm.test_X_[f].shape)

for f in range(field_len):
    print(field[f], dm.test_X0[f].shape)

for f in range(field_len):
    print(field[f], dm.test_W[f].shape)

四、main.py
import datetime
import itertools
import os
import shutil
from tqdm import tqdm

import matplotlib as mpl
import matplotlib.pyplot as plt
from mpl_toolkits import mplot3d
import numpy as np
import pandas as pd
from scipy import stats, integrate
import seaborn as sns

mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False

pd.set_option('display.float_format', lambda x:'%.7f'%x)

# ----------------------------------------------------------------------------------------------------
# Config
# ----------------------------------------------------------------------------------------------------

field = np.array([
    '风场1',
    '风场2'
])
field_len = field.shape[0]

machine = np.array([
    [f'x{i}' for i in range(26, 50+1)],
    [f'x{i}' for i in range(25, 49+1)]
])
machine_len = machine[0].shape[0]

season = np.array(['春', '夏', '秋', '冬'])
season_len = season.shape[0]

period = np.array([f'{s}_{str(i).zfill(2)}' for s in season for i in range(1, 20+1)])
period_len = period.shape[0]

# ----------------------------------------------------------------------------------------------------

# 各风场对应的年份
years = {
    field[0]: [2018, 2019],
    field[1]: [2017, 2018]
}

# 完整的时间序列
time_series = {field[f]: pd.Dataframe(data={'time': pd.date_range(datetime.datetime(years[field[f]][0], 1, 1, 0, 0, 0), datetime.datetime(years[field[f]][1], 12, 31, 23, 59, 30), freq='30S')}, dtype=str) for f in range(field_len)}

# ----------------------------------------------------------------------------------------------------
# Data Manager
# ----------------------------------------------------------------------------------------------------

def read_csv(path):
    return pd.read_csv(path, encoding='utf-8') if os.path.exists(path) else None

class Data_Manager:
    def __init__(self):
        self.root = '训练集'
        
        if self._check_file:
            print('正在合并同一台风机的数据并修复缺失的时间段')
            self._merge_data

        self._prepare_dataset

        self.sample = pd.Dataframe(
            data = np.array([
                [*x0, x1, x2, x3, x4] for x0, x1, x2, x3, x4 in 
                    itertools.product(
                        np.vstack([list(itertools.product([field[f]], machine[f])) for f in range(field_len)]).tolist(),    # '风场', '风机'
                        period,                     # '时段'
                        np.arange(1, 20+1) * 30,    # '时刻'
                        [None],                     # '风速'
                        [None]                      # '风向'
                    )
            ]),
            columns = ['风场', '风机', '时段', '时刻', '风速', '风向']
        )

        self.test_pred_df = self.sample.copy()

    @property
    def _prepare_dataset(self):

        """
                       [风场, 时段, 风机, 时刻, 特征]
        self.X_ : shape: (2, 17507, 25, 120, 2)    机舱数据,用作训练;特征 ['变频器电网侧有功功率', '外界温度']
        self.X0 : shape: (2, 17507, 25, 120, 2)    机舱数据,用作训练;特征 ['风速', '风向']
        self.Y0 : shape: (2, 17507, 25,  20, 2)    机舱数据,用作训练;特征 ['风速', '风向']
        
                       [风场, 时段]
        self.S  : shape: (2, 17507)                季节数据,用作训练

                       [风场, 时段, 时刻, 特征]
        self.W  : shape: (2, 17507, 14, 2)         气象数据,用作训练;特征 ['风速', '风向']

                            [风场, 时段, 风机, 特征]
        self.weather : shape: (2, 17520, 26, 2)    机舱数据+气象数据;特征 ['风速', '风向']
        """

        print(f'正在加载 {self.root}')

        '''
        部分数字说明
            730 * 24 - 13 : 730 天 <- 365 * 2; 24 小时; 13 个预留时间(小时),最前面预留11个小时,最后面预留2个小时
            17507 <- 730 * 24 - 13
            17520 <- 730 * 24
            120 : (1小时) 120个 (30秒)
            14 : 13个小时,14个时间点
        '''
        self.X_ = np.full((field_len, machine_len, 730 * 24 - 13, 120, 2), np.nan, dtype=np.float32)    # shape: (2, 25, 17507, 120, 2)
        self.X0 = np.full((field_len, machine_len, 730 * 24 - 13, 120, 2), np.nan, dtype=np.float32)    # shape: (2, 25, 17507, 120, 2)
        self.Y0 = np.full((field_len, machine_len, 730 * 24 - 13, 20, 2), np.nan, dtype=np.float32)      # shape: (2, 25, 17507, 20, 2)
        self.S = np.full((field_len, 730 * 24 - 13), np.nan, dtype=str)     # shape: (2, 17507)
        self.W = np.full((field_len, 730 * 24 - 13, 14, 2), np.nan, dtype=np.float32)    # shape: (2, 17507, 14, 2)

        # columns = ['time', 'wind_spd', 'wind_dir']
        weather = {field[f]: read_csv(os.path.join(self.root, field[f], 'weather.csv')) for f in range(field_len)}
        self.weather = {
            'time': {field[f]: weather[field[f]]['time'] for f in range(field_len)},
            'data': np.zeros((2, 26, 17520, 2), dtype=np.float32)
        }

        for f in range(field_len):

            self.weather['data'][f, 25, ...] = weather[field[f]][['wind_spd', 'wind_dir']].values

            for m in tqdm(range(machine_len)):
                datas = read_csv(os.path.join(self.root, field[f], machine[f][m]) + '.csv')
                
                # 11*120+1 : 正数第11个小时30秒开始
                # -2*120+1 : 倒数第2个小时结束(不包含最后2小时)
                # 每一个时段由每小时的第30秒开始第3600秒结束,共120个时刻
                self.X_[f, m, ...] = datas[11*120+1:-2*120+1][['变频器电网侧有功功率', '外界温度']].values.reshape(730 * 24 - 13, 120, 2)
                self.X0[f, m, ...] = datas[11*120+1:-2*120+1][['风速', '风向']].values.reshape(730 * 24 - 13, 120, 2)
                
                # 12*120+1 : 正数第12个小时30秒开始
                # -1*120+1 : 倒数第1个小时结束(不包含最后1小时)
                # 每一个时段由每小时的第30秒开始第600秒结束,共20个时刻
                self.Y0[f, m, ...] = datas[12*120+1:-1*120+1][['风速', '风向']].values.reshape(730 * 24 - 13, 120, 2)[:, :20, :]

                self.weather['data'][f, m, ...] = datas[::120][['风速', '风向']].values
            
            self.S[f] = time_series[field[f]]['time'].str.split(' ').str.get(0).str.split('-').map(lambda x: season[np.ceil(int(x[1]) / 3).astype(int) - 1])[11*120:-2*120].values.reshape(730 * 24 - 13, 120)[:, 0]
            
            for i in range(730 * 24 - 13):
                self.W[f, i, ...] = weather[field[f]][i:i+14][['wind_spd', 'wind_dir']].values

        self.X_ = np.moveaxis(self.X_, 1, 2)
        self.X0 = np.moveaxis(self.X0, 1, 2)

        self.Y0 = np.moveaxis(self.Y0, 1, 2)

        self.weather['data'] = np.moveaxis(self.weather['data'], 1, 2)

        # ---------- update ----------
        # self.n_indexes = (np.isnan(self.X0.reshape(2 * 17507, 25 * 120 * 2)).astype(np.float32).sum(-1) == 0).reshape(2, 17507) * (np.isnan(self.Y0.reshape(2 * 17507, 25 * 20 * 2)).astype(np.float32).sum(-1) == 0).reshape(2, 17507)
        # ---------- update ----------

    def load_test_data(self, test_findals = False):

        """
                            [风场, 时段, 风机, 时刻, 特征]
        self.test_X_ : shape: (2, 80, 25, 120, 2)       机舱数据,用作测试;特征 ['变频器电网侧有功功率', '外界温度']
        self.test_X0 : shape: (2, 80, 25, 120, 2)       机舱数据,用作测试;特征 ['风速', '风向']

                            [风场, 时段, 时刻, 特征]
        self.test_W  : shape: (2, 80, 14, 2)            气象数据,用作测试;特征 ['风速', '风向']
        """

        root = '测试集_决赛' if test_findals else '测试集_初赛'

        print(f'正在加载 {root}')

        self.test_X_ = np.full((field_len, machine_len, period_len, 120, 2), np.nan, dtype=np.float32)    # shape: (2, 25, 80, 120, 2)
        self.test_X0 = np.full((field_len, machine_len, period_len, 120, 2), np.nan, dtype=np.float32)    # shape: (2, 25, 80, 120, 2)
        self.test_W = np.full((field_len, period_len, 14, 2), np.nan, dtype=np.float32)    # shape: (2, 80, 14, 2)

        # columns = ['时段', '时刻', '风速', '风向']
        weather = {field[f]: read_csv(os.path.join(root, field[f], 'weather.csv')) for f in range(field_len)}

        for f in range(field_len):
            for m in tqdm(range(machine_len)):
                for p in range(period_len):
                    datas = read_csv(os.path.join(root, field[f], machine[f][m], period[p]) + '.csv')
                    if datas is not None:
                        self.test_X_[f, m, p, ...] = datas[['变频器电网侧有功功率', '外界温度']].values.reshape(120, 2)
                        self.test_X0[f, m, p, ...] = datas[['风速', '风向']].values.reshape(120, 2)
                    self.test_W[f, p, ...] = weather[field[f]].query(f"时段 == '{period[p]}'")[['风速', '风向']].values

        self.test_X_ = np.moveaxis(self.test_X_, 1, 2)
        self.test_X0 = np.moveaxis(self.test_X0, 1, 2)

    def generate_indexes(self, val_times=1):

        self.train_indexes = {field[f]: {season[s]: None for s in range(season_len)} for f in range(field_len)}
        self.val_indexes = {field[f]: {i: np.array([]) for i in range(val_times)} for f in range(field_len)}

        for f in range(field_len):
            for s in range(season_len):
                # ---------- update ----------
                # season_indexes = np.where((season[s] == self.S[f]) * self.n_indexes[f])[0]
                season_indexes = np.where(season[s] == self.S[f])[0]
                # ---------- update ----------

                train_indexes = np.random.choice(
                    season_indexes,
                    size = season_indexes.shape[0] - val_times * period_len // season_len,
                    replace=False
                )
                self.train_indexes[field[f]][season[s]] = train_indexes

                val_indexes = np.setdiff1d(season_indexes, train_indexes).reshape(val_times, period_len // season_len)
                for i in range(val_times):
                    self.val_indexes[field[f]][i] = np.hstack([self.val_indexes[field[f]][i], val_indexes[i]]).astype(int)

    def get_indexes(self, f):
        indexes = np.array([])
        for s in range(season_len):
            indexes = np.hstack([indexes, np.random.choice(self.train_indexes[field[f]][season[s]], size=20, replace=False)]).astype(int)
        return indexes

    @property
    def _merge_data(self):

        os.makedirs(self.root, exist_ok=True)
        [os.makedirs(os.path.join(self.root, field[f]), exist_ok=True) for f in range(field_len)]
        [shutil.copyfile(os.path.join('train', field[f], 'weather.csv'), os.path.join(self.root, field[f], 'weather.csv')) for f in range(field_len)]

        for f in range(field_len):
            for m in range(machine_len):

                machine_dir = os.path.join('train', field[f], machine[f][m])

                machine_data_save_path = os.path.join(self.root, field[f], machine[f][m]) + '.csv'

                print(f'Merge {machine_dir} to {machine_data_save_path} ... t', end='')

                # 用文件操作的方式将两年的数据合并, 用 pandas 合并太慢了
                with open(machine_data_save_path, 'a', encoding='utf-8') as f1:
                    f1.write('time,变频器电网侧有功功率,外界温度,风向,风速n')    # 列名
                    for data_file in os.listdir(machine_dir):
                        with open(os.path.join(machine_dir, data_file), 'r', encoding='utf-8') as f2:
                            f1.writelines(f2.readlines()[1:])    # [1:] -> 首行是列名,不写入

                # 根据 'time' 这一列合并数据
                df = pd.merge(
                    left = time_series[field[f]],
                    right = read_csv(machine_data_save_path),
                    how = 'left',
                    on = ['time']
                )

                df.loc[:, ['time', '变频器电网侧有功功率', '外界温度', '风速', '风向']].to_csv(machine_data_save_path, float_format='%.7f', index=False, encoding='utf-8')

                print('done!')

    @property
    def _check_file(self):
        for f in range(field_len):
            for m in range(machine_len):
                file = os.path.join(self.root, field[f], machine[f][m]) + '.csv'
                if not os.path.exists(file):
                    return True
            file = os.path.join(self.root, field[f], 'weather.csv')
            if not os.path.exists(file):
                return True
        return False
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/342864.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号