之前写过使用 Python yield 实现的滑动窗口,因为用TensorFlow比较多,并且 tf.data
API 处理数据更加高效,对于大数据量的情况,选择 API 实现滑动窗口相比原生的Python方法更好。本文介绍了如何使用 tensorflow 的 tf.data
API 实现滑动窗口。
代码环境:
Python 3.7.6
TensorFlow 2.1.0
导入必要的包:
import tensorflow as tf
在时间序列建模问题中,通常需要时间序列片段,并且的多数情况下是多个维度特征的数据。因此,需要对原始的时间序列数据进行划分,实现截取类似图像的窗口数据,作为样本,构造样本数据集,然后喂给神经网络训练。
先用一个简单的例子演示所述问题:
1. batch 实现 单变量滑动窗口
tf.data.batch
方法说明:
batch(batch_size, drop_remainder=False)
batch_size
:tf.int64
标量,表示单个批次中元素的数量。drop_remainder
:(可选)tf.bool
标量,表示在batch_size
不足批大小的情况下是否删除该批次数据;默认不删除较小的批次。
构造单变量虚拟数据:
range_ds = tf.data.Dataset.range(100000)
batch 实现无重叠,窗口宽度为10的滑动窗口:
# 将数据生成batch_size=10的批数据。其中,drop_remainder 表示
# 在batch_size不足批大小的情况下是否删除该批次数据;默认不删除较小的批次。
batches = range_ds.batch(10, drop_remainder=True)
# 从批次数据中,取出五个批次并打印
for batch in batches.take(5):
print(batch.numpy())
输出:
[0 1 2 3 4 5 6 7 8 9]
[10 11 12 13 14 15 16 17 18 19]
[20 21 22 23 24 25 26 27 28 29]
[30 31 32 33 34 35 36 37 38 39]
[40 41 42 43 44 45 46 47 48 49]
1.1 无重叠采样 有偏移预测
def dense_1_step(batch):
# 将单变量时间序列数据与预测标签数据匹配
# 此处将前9个采样值作为输入,偏移一步的后9个采样值作为输出
return batch[:-1], batch[1:]
# map方法将所有批次数据实现数据与标签的匹配
predict_dense_1_step = batches.map(dense_1_step)
# 打印三个匹配好的样本
for features, label in predict_dense_1_step.take(3):
print(features.numpy(), " => ", label.numpy())
输出:
[0 1 2 3 4 5 6 7 8] => [1 2 3 4 5 6 7 8 9]
[10 11 12 13 14 15 16 17 18] => [11 12 13 14 15 16 17 18 19]
[20 21 22 23 24 25 26 27 28] => [21 22 23 24 25 26 27 28 29]
1.2 无重叠采样 无偏移预测
要预测整个窗口而不是固定的偏移量,,可以将批处理分为两部分:
batches = range_ds.batch(15, drop_remainder=True)
def label_next_5_steps(batch):
return (batch[:-5], # 一个批次内前十个采样点作为输入
batch[-5:]) # 一个批次内后五个采样点作为标签
predict_5_steps = batches.map(label_next_5_steps)
for features, label in predict_5_steps.take(3):
print(features.numpy(), " => ", label.numpy())
则输出:
[0 1 2 3 4 5 6 7 8 9] => [10 11 12 13 14]
[15 16 17 18 19 20 21 22 23 24] => [25 26 27 28 29]
[30 31 32 33 34 35 36 37 38 39] => [40 41 42 43 44]
1.3 有重叠采样 无偏移预测
如果想让样本包含的采样数据有重叠,可以使用 tf.data.Dataset.zip
实现:
feature_length = 10 # 窗口宽度
label_length = 5 # 预测输出的长度
features = range_ds.batch(feature_length, drop_remainder=True)
# skip() 方法表示取一个批次之后的数据
# labels[:-5] 表示截取该批次的前五个采样数据
labels = range_ds.batch(feature_length).skip(1).map(lambda labels: labels[:-5])
# zip 方法实现将样本数据与样本标签匹配
predict_5_steps = tf.data.Dataset.zip((features, labels))
for features, label in predict_5_steps.take(3):
print(features.numpy(), " => ", label.numpy())
输出:
[0 1 2 3 4 5 6 7 8 9] => [10 11 12 13 14]
[10 11 12 13 14 15 16 17 18 19] => [20 21 22 23 24]
[20 21 22 23 24 25 26 27 28 29] => [30 31 32 33 34]
如果将 skip(1)
改为 skip(2)
则输出:
[0 1 2 3 4 5 6 7 8 9] => [20 21 22 23 24]
[10 11 12 13 14 15 16 17 18 19] => [30 31 32 33 34]
[20 21 22 23 24 25 26 27 28 29] => [40 41 42 43 44]
可以看到样本数据与样本标签隔了一个批次。这样做没什么实际意义,只是为了方便理解 skip()
方法。
2. window 实现 单变量滑动窗口
tf.data.window()
方法
window(size, shift=None, stride=1, drop_remainder=False)
参数说明:
size
:表示拆分后每个窗口包含多少个采样点,即窗口宽度。shift
:表示滑动窗口中输入元素的跨度,即滑动步长。stride
:表示采样点之间的跨度;可选参数,默认为None
。
为了方便理解该方法的用法,请看下例:
dataset = tf.data.Dataset.range(7).window(3, None, 1, True)
for window in dataset:
print(list(window.as_numpy_iterator()))
输出:
[0, 1, 2]
[3, 4, 5]
可以看到该示例是无重叠采样,drop_remainder=True
表示丢弃不足窗口宽度的数据。
为了增加可读性,方便比较,仅保留关键代码:
range(7).window(3, 1, 1, True)
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
-------------------------------
range(7).window(3, 2, 1, True)
[0, 1, 2]
[2, 3, 4]
[4, 5, 6]
-------------------------------
range(7).window(3, 3, 1, True)
[0, 1, 2]
[3, 4, 5]
-------------------------------
range(7).window(3, None, 1, True)
[0, 1, 2]
[3, 4, 5]
-------------------------------
range(7).window(3, None, 2, True)
[0, 2, 4]
-------------------------------
range(7).window(3, None, 3, True)
[0, 3, 6]
-------------------------------
range(7).window(3, 1, 1, True)
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
-------------------------------
range(7).window(3, 1, 2, True)
[0, 2, 4]
[1, 3, 5]
[2, 4, 6]
-------------------------------
range(7).window(3, 1, 3, True)
[0, 3, 6]
Dataset.flat_map
方法可以获取数据集的数据集并将其展平为单个数据集:
window_size = 5
windows = range_ds.window(window_size, shift=1)
for x in windows.flat_map(lambda x: x).take(30):
print(x.numpy(), end=' ')
输出(为了方便说明该方法的用法,警告信息就不粘过来了):
0 1 2 3 4 1 2 3 4 5 2 3 4 5 6 3 4 5 6 7 4 5 6 7 8 5 6 7 8 9
通过函数封装:
def make_window_dataset(ds, window_size=5, shift=1, stride=1):
windows = ds.window(window_size, shift=shift, stride=stride)
def sub_to_batch(sub):
return sub.batch(window_size, drop_remainder=True)
windows = windows.flat_map(sub_to_batch)
return windows
测试
ds = make_window_dataset(range_ds, window_size=10, shift=1, stride=2)
for example in ds.take(10):
print(example.numpy())
输出:
[ 0 2 4 6 8 10 12 14 16 18]
[ 1 3 5 7 9 11 13 15 17 19]
[ 2 4 6 8 10 12 14 16 18 20]
[ 3 5 7 9 11 13 15 17 19 21]
[ 4 6 8 10 12 14 16 18 20 22]
[ 5 7 9 11 13 15 17 19 21 23]
[ 6 8 10 12 14 16 18 20 22 24]
[ 7 9 11 13 15 17 19 21 23 25]
[ 8 10 12 14 16 18 20 22 24 26]
[ 9 11 13 15 17 19 21 23 25 27]
转载:https://blog.csdn.net/weixin_39653948/article/details/105928752