
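Introduction to tsfresh, a Time Series Feature Extraction Package

Contents

  • Background

  • Part 1: Installing tsfresh

  • Part 2: Feature Extraction and Feature Descriptions

  • Part 3: Feature Parameter Settings

  • Part 4: Feature Selection and Filtering

  • References and Resources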

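Background

Time series data is one of the most common kinds of data we work with. Its defining characteristic is a time attribute: the observations are generated in chronological order.

https://www.iotword.com/4212.html

tsfresh is a toolkit for extracting and selecting features from time series data, used mainly for feature engineering on time series. The project is hosted at https://github.com/blue-yonder/tsfresh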

Part 1: Installing tsfresh

tsfresh can be installed with pip:

# pip install tsfresh

Package layout:

Section 1: Project structure

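Part 2: Feature Extraction and Feature Descriptions

The feature extraction code in tsfresh lives in tsfresh/feature_extraction/feature_calculators.py. We go through the feature calculators in that module one by one.

The features fall into two categories:

  • Simple: the calculator returns a single value.
  • Combined: the calculator takes a list of parameter dictionaries (param) and returns several named values in one call.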

1. Sum of squares of the time series

Function: tsfresh.feature_extraction.feature_calculators.abs_energy(x)
$$
E=\sum_{i=1}^{n} x_{i}^{2}
$$
where $\{x_i\}_{i=1}^{n}$ is the time series.

Source code:

def abs_energy(x):
    """
    Returns the absolute energy of the time series which is the sum over the squared values
    .. math::
    E=\sum_{i=1}^{n} x_{i}^{2}
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.dot(x, x)
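
As a quick sanity check of the formula, here is a minimal dependency-free sketch. The name `abs_energy_sketch` is made up for illustration and is not part of tsfresh; it only mirrors what `np.dot(x, x)` computes for a one-dimensional series.

```python
def abs_energy_sketch(x):
    """Sum of squared values, E = sum(x_i ** 2). Illustrative stand-in only."""
    return sum(v * v for v in x)


series = [1, 2, 3, 4]
print(abs_energy_sketch(series))  # 1 + 4 + 9 + 16 = 30
```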

2. Sum of absolute first-order differences of the time series

Function: tsfresh.feature_extraction.feature_calculators.absolute_sum_of_changes(x)
$$
\sum_{i=1}^{n-1}\left|x_{i+1}-x_{i}\right|
$$
Source code:

def absolute_sum_of_changes(x):
    """
    Returns the sum over the absolute value of consecutive changes in the series x
    .. math::
    \\sum_{i=1, \ldots, n-1} \\mid x_{i+1}- x_i \\mid
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.sum(np.abs(np.diff(x)))

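3. Aggregated statistics of the autocorrelation coefficients at different lags

Function: tsfresh.feature_extraction.feature_calculators.agg_autocorrelation(x, param)

This feature aggregates the autocorrelation coefficients of the series over several lags. The autocorrelation at lag $l$ is
$$
R(l)=\frac{1}{(n-l) \sigma^{2}} \sum_{i=1}^{n-l}\left(x_{i}-\mu\right)\left(x_{i+l}-\mu\right)
$$
where $n$ is the length of the series and $\mu$ and $\sigma^{2}$ are its mean and variance. param (list) contains dictionaries of the form {"f_agg": x, "maxlag": n}, where x is the name of the aggregation function and n is the maximal lag to consider. For each dictionary, the function returns the aggregate (for example the variance or the mean) of the autocorrelation coefficients at lags 1 through n.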

def agg_autocorrelation(x, param):
    r"""
    Calculates the value of an aggregation function :math:`f_{agg}` (e.g. the variance or the mean) over the
    autocorrelation :math:`R(l)` for different lags. The autocorrelation :math:`R(l)` for lag :math:`l` is defined as
    .. math::
    R(l) = \frac{1}{(n-l)\sigma^{2}} \sum_{t=1}^{n-l}(X_{t}-\mu )(X_{t+l}-\mu)
    where :math:`X_i` are the values of the time series, :math:`n` its length. Finally, :math:`\sigma^2` and
    :math:`\mu` are estimators for its variance and mean
    (See `Estimation of the Autocorrelation function <http://en.wikipedia.org/wiki/Autocorrelation#Estimation>`_).
    The :math:`R(l)` for different lags :math:`l` form a vector. This feature calculator applies the aggregation
    function :math:`f_{agg}` to this vector and returns
    .. math::
    f_{agg} \left( R(1), \ldots, R(m)\right) \quad \text{for} \quad m = max(n, maxlag).
    Here :math:`maxlag` is the second parameter passed to this function.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param param: contains dictionaries {"f_agg": x, "maxlag", n} with x str, the name of a numpy function
    (e.g. "mean", "var", "std", "median"), its the name of the aggregator function that is applied to the
    autocorrelations. Further, n is an int and the maximal number of lags to consider.
    :type param: list
    :return: the value of this feature
    :return type: float
    """
    # if the time series is longer than the following threshold, we use fft to calculate the acf
    THRESHOLD_TO_USE_FFT = 1250
    var = np.var(x)
    n = len(x)
    max_maxlag = max([config["maxlag"] for config in param])

    if np.abs(var) < 10**-10 or n == 1:
        a = [0] * len(x)
    else:
        a = acf(x, unbiased=True, fft=n > THRESHOLD_TO_USE_FFT, nlags=max_maxlag)[1:]
    return [("f_agg_\"{}\"__maxlag_{}".format(config["f_agg"], config["maxlag"]),
             getattr(np, config["f_agg"])(a[:int(config["maxlag"])])) for config in param]
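
To make the formula for R(l) concrete, the following is a rough standard-library sketch of the same idea: compute R(1), ..., R(maxlag) directly from the definition and pass them to an aggregation function. The names `autocorr` and `agg_autocorr_sketch` are hypothetical, the series is assumed to be non-constant (so the variance is not zero), and this is not how tsfresh itself computes the values (it delegates to `acf`).

```python
from statistics import fmean, pvariance


def autocorr(x, lag):
    """R(l) with the 1 / ((n - l) * sigma^2) normalisation from the formula."""
    n = len(x)
    mu = fmean(x)
    var = pvariance(x, mu)  # population variance, assumed non-zero here
    s = sum((x[i] - mu) * (x[i + lag] - mu) for i in range(n - lag))
    return s / ((n - lag) * var)


def agg_autocorr_sketch(x, f_agg, maxlag):
    """Apply f_agg to the list [R(1), ..., R(maxlag)]."""
    return f_agg([autocorr(x, lag) for lag in range(1, maxlag + 1)])


x = [1.0, -1.0] * 4  # alternating series of length 8
print([autocorr(x, lag) for lag in (1, 2, 3)])  # [-1.0, 1.0, -1.0]
print(agg_autocorr_sketch(x, fmean, 3))         # mean of the three values
```

An alternating series is perfectly anti-correlated with itself at odd lags and perfectly correlated at even lags, which is what the first printed list shows.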

4. Linear regression on chunk-wise aggregated values of the time series

Function: tsfresh.feature_extraction.feature_calculators.agg_linear_trend(x, param)

Source code:

def agg_linear_trend(x, param):
    """
    Calculates a linear least-squares regression for values of the time series that were aggregated over chunks versus
    the sequence from 0 up to the number of chunks minus one.
    This feature assumes the signal to be uniformly sampled. It will not use the time stamps to fit the model.
    The parameters attr controls which of the characteristics are returned. Possible extracted attributes are "pvalue",
    "rvalue", "intercept", "slope", "stderr", see the documentation of linregress for more information.
    The chunksize is regulated by "chunk_len". It specifies how many time series values are in each chunk.
    Further, the aggregation function is controlled by "f_agg", which can use "max", "min" or , "mean", "median"
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param param: contains dictionaries {"attr": x, "chunk_len": l, "f_agg": f} with x, f an string and l an int
    :type param: list
    :return: the different feature values
    :return type: pandas.Series
    """
    # todo: we could use the index of the DataFrame here

    calculated_agg = {}
    res_data = []
    res_index = []

    for parameter_combination in param:

        chunk_len = parameter_combination["chunk_len"]
        f_agg = parameter_combination["f_agg"]

        aggregate_result = _aggregate_on_chunks(x, f_agg, chunk_len)
        if f_agg not in calculated_agg or chunk_len not in calculated_agg[f_agg]:
            if chunk_len >= len(x):
                calculated_agg[f_agg] = {chunk_len: np.NaN}
            else:
                lin_reg_result = linregress(range(len(aggregate_result)), aggregate_result)
                calculated_agg[f_agg] = {chunk_len: lin_reg_result}

        attr = parameter_combination["attr"]

        if chunk_len >= len(x):
            res_data.append(np.NaN)
        else:
            res_data.append(getattr(calculated_agg[f_agg][chunk_len], attr))

        res_index.append("f_agg_\"{}\"__chunk_len_{}__attr_\"{}\"".format(f_agg, chunk_len, attr))

    return zip(res_index, res_data)
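
The two steps described in the docstring (aggregate over chunks, then regress the aggregates on 0, 1, 2, ...) can be sketched without SciPy as follows. Both helper names are hypothetical, and the chunking rule (consecutive chunks of `chunk_len` values, with a possibly shorter last chunk) is an assumption about the `_aggregate_on_chunks` helper, which is not shown in this article. Only the slope and intercept are computed here, not the other `linregress` attributes.

```python
from statistics import fmean


def chunk_aggregate(x, f_agg, chunk_len):
    """Aggregate consecutive chunks of chunk_len values (assumed chunking rule)."""
    return [f_agg(x[i:i + chunk_len]) for i in range(0, len(x), chunk_len)]


def slope_intercept(y):
    """Least-squares line through the points (0, y[0]), (1, y[1]), ..."""
    t = list(range(len(y)))
    t_mean, y_mean = fmean(t), fmean(y)
    sxx = sum((ti - t_mean) ** 2 for ti in t)
    sxy = sum((ti - t_mean) * (yi - y_mean) for ti, yi in zip(t, y))
    slope = sxy / sxx
    return slope, y_mean - slope * t_mean


x = [0, 1, 2, 3, 4, 5, 6, 7, 8]
agg = chunk_aggregate(x, max, 3)   # [2, 5, 8]
print(agg, slope_intercept(agg))   # slope 3.0, intercept 2.0
```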

5. Approximate entropy of the time series

Function: tsfresh.feature_extraction.feature_calculators.approximate_entropy(x, m, r)

Source code:

def approximate_entropy(x, m, r):
    """
    Implements a vectorized Approximate entropy algorithm.
    https://en.wikipedia.org/wiki/Approximate_entropy
    For short time-series this method is highly dependent on the parameters,
    but should be stable for N > 2000, see:
    Yentes et al. (2012) -
    *The Appropriate Use of Approximate Entropy and Sample Entropy with Short Data Sets*
    Other shortcomings and alternatives discussed in:
    Richman & Moorman (2000) -
    *Physiological time-series analysis using approximate entropy and sample entropy*
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param m: Length of compared run of data
    :type m: int
    :param r: Filtering level, must be positive
    :type r: float
    :return: Approximate entropy
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)

    N = x.size
    r *= np.std(x)
    if r < 0:
        raise ValueError("Parameter r must be positive.")
    if N <= m+1:
        return 0

    def _phi(m):
        x_re = np.array([x[i:i+m] for i in range(N - m + 1)])
        C = np.sum(np.max(np.abs(x_re[:, np.newaxis] - x_re[np.newaxis, :]),
                          axis=2) <= r, axis=0) / (N-m+1)
        return np.sum(np.log(C)) / (N - m + 1.0)

    return np.abs(_phi(m) - _phi(m + 1))
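
The vectorized `_phi` above is compact but hard to read. A plain-loop sketch of the same computation, using only the standard library, might look like this. The name `approx_entropy_sketch` is hypothetical; the sketch follows the listing step by step (tolerance scaled by the population standard deviation, Chebyshev distance between templates, self-matches counted) and is meant for reading, not for speed.

```python
from math import log
from statistics import pstdev


def approx_entropy_sketch(x, m, r):
    n = len(x)
    if n <= m + 1:
        return 0.0
    tol = r * pstdev(x)  # tolerance scaled by the standard deviation of x

    def phi(k):
        templates = [x[i:i + k] for i in range(n - k + 1)]
        total = 0.0
        for a in templates:
            # number of templates within Chebyshev distance tol of a (a itself counts)
            close = sum(
                max(abs(u - v) for u, v in zip(a, b)) <= tol for b in templates
            )
            total += log(close / len(templates))
        return total / len(templates)

    return abs(phi(m) - phi(m + 1))


regular = [1, 2] * 20  # perfectly periodic series
print(approx_entropy_sketch(regular, 2, 0.2))  # very close to 0
```

A perfectly periodic series yields a value near zero because knowing m consecutive values already determines the next one.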

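6. Autoregressive coefficients of the time series

Function: tsfresh.feature_extraction.feature_calculators.ar_coefficient(x, param)

Fits an autoregressive AR(k) model to the series and returns the requested coefficients. The coefficients describe how each value depends on the k values before it and are used to characterize the periodicity, unpredictability and volatility of the series.

Source code: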

def ar_coefficient(x, param):
    """
    This feature calculator fits the unconditional maximum likelihood
    of an autoregressive AR(k) process.
    The k parameter is the maximum lag of the process
    .. math::
    X_{t}=\\varphi_0 +\\sum _{{i=1}}^{k}\\varphi_{i}X_{{t-i}}+\\varepsilon_{t}
    For the configurations from param which should contain the maxlag "k" and such an AR process is calculated. Then
    the coefficients :math:`\\varphi_{i}` whose index :math:`i` contained from "coeff" are returned.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param param: contains dictionaries {"coeff": x, "k": y} with x,y int
    :type param: list
    :return x: the different feature values
    :return type: pandas.Series
    """
    calculated_ar_params = {}

    x_as_list = list(x)
    calculated_AR = AR(x_as_list)

    res = {}

    for parameter_combination in param:
        k = parameter_combination["k"]
        p = parameter_combination["coeff"]

        column_name = "k_{}__coeff_{}".format(k, p)

        if k not in calculated_ar_params:
            try:
                calculated_ar_params[k] = calculated_AR.fit(maxlag=k, solver="mle").params
            except (LinAlgError, ValueError):
                calculated_ar_params[k] = [np.NaN]*k

        mod = calculated_ar_params[k]

        if p <= k:
            try:
                res[column_name] = mod[p]
            except IndexError:
                res[column_name] = 0
        else:
            res[column_name] = np.NaN

    return [(key, value) for key, value in res.items()]

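7. Augmented Dickey-Fuller (ADF) test

Function: tsfresh.feature_extraction.feature_calculators.augmented_dickey_fuller(x, param)

Tests whether a unit root is present in an autoregressive model of the series, which gives a measure of how stationary the series is.

Source code: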

def augmented_dickey_fuller(x, param):
    """
    The Augmented Dickey-Fuller test is a hypothesis test which checks whether a unit root is present in a time
    series sample. This feature calculator returns the value of the respective test statistic.
    See the statsmodels implementation for references and more details.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param param: contains dictionaries {"attr": x} with x str, either "teststat", "pvalue" or "usedlag"
    :type param: list
    :return: the value of this feature
    :return type: float
    """
    res = None
    try:
        res = adfuller(x)
    except LinAlgError:
        res = np.NaN, np.NaN, np.NaN
    except ValueError:  # occurs if sample size is too small
        res = np.NaN, np.NaN, np.NaN
    except MissingDataError:  # is thrown for e.g. inf or nan in the data
        res = np.NaN, np.NaN, np.NaN

    return [('attr_"{}"'.format(config["attr"]),
             res[0] if config["attr"] == "teststat"
             else res[1] if config["attr"] == "pvalue"
             else res[2] if config["attr"] == "usedlag" else np.NaN)
            for config in param]

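8. Autocorrelation of the time series at a given lag

Function: tsfresh.feature_extraction.feature_calculators.autocorrelation(x, lag)

Computes the autocorrelation of the series at lag `lag` (returns a float).

Source code: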

def autocorrelation(x, lag):
    """
    Calculates the autocorrelation of the specified lag, according to the formula [1]
    .. math::
    \\frac{1}{(n-l)\sigma^{2}} \\sum_{t=1}^{n-l}(X_{t}-\\mu )(X_{t+l}-\\mu)
    where :math:`n` is the length of the time series :math:`X_i`, :math:`\sigma^2` its variance and :math:`\mu` its
    mean. `l` denotes the lag.
    .. rubric:: References
    [1] https://en.wikipedia.org/wiki/Autocorrelation#Estimation
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param lag: the lag
    :type lag: int
    :return: the value of this feature
    :return type: float
    """
    # This is important: If a series is passed, the product below is calculated
    # based on the index, which corresponds to squaring the series.
    if type(x) is pd.Series:
        x = x.values
    if len(x) < lag:
        return np.nan
    # Slice the relevant subseries based on the lag
    y1 = x[:(len(x)-lag)]
    y2 = x[lag:]
    # Subtract the mean of the whole series x
    x_mean = np.mean(x)
    # The result is sometimes referred to as "covariation"
    sum_product = np.sum((y1 - x_mean) * (y2 - x_mean))
    # Return the normalized unbiased covariance
    v = np.var(x)
    if np.isclose(v, 0):
        return np.NaN
    else:
        return sum_product / ((len(x) - lag) * v)

9. Binned entropy of the time series

Function: tsfresh.feature_extraction.feature_calculators.binned_entropy(x, max_bins)

def binned_entropy(x, max_bins):
    """
    First bins the values of x into max_bins equidistant bins.
    Then calculates the value of
    .. math::
    - \\sum_{k=0}^{min(max\\_bins, len(x))} p_k log(p_k) \\cdot \\mathbf{1}_{(p_k > 0)}
    where :math:`p_k` is the percentage of samples in bin :math:`k`.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param max_bins: the maximal number of bins
    :type max_bins: int
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    hist, bin_edges = np.histogram(x, bins=max_bins)
    probs = hist / x.size
    return - np.sum(p * np.math.log(p) for p in probs if p != 0)
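
To see what the binning step does, here is a small standard-library sketch of the same idea: split the value range into `max_bins` equal-width bins, turn the counts into shares, and sum -p log p over the non-empty bins. The name `binned_entropy_sketch` is hypothetical, and the handling of edge cases (the maximum goes into the last bin, a constant series falls into a single bin) is my reading of how `np.histogram` behaves rather than something stated in the article.

```python
from math import log


def binned_entropy_sketch(x, max_bins):
    lo, hi = min(x), max(x)
    width = (hi - lo) / max_bins
    counts = [0] * max_bins
    for v in x:
        # the maximum value is placed in the last bin; a constant series uses bin 0
        k = min(int((v - lo) / width), max_bins - 1) if width > 0 else 0
        counts[k] += 1
    probs = [c / len(x) for c in counts]
    return -sum(p * log(p) for p in probs if p > 0)


print(binned_entropy_sketch([0, 1, 2, 3], 4))  # uniform over 4 bins: log(4)
print(binned_entropy_sketch([5, 5, 5, 5], 4))  # everything in one bin: zero entropy
```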

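10. A nonlinearity measure for the time series

Function: tsfresh.feature_extraction.feature_calculators.c3(x, lag)

  • A measure of nonlinearity in the time series that comes from physics (returns a float).

Source code: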
def c3(x, lag):
    """
    This function calculates the value of
    .. math::
    \\frac{1}{n-2lag} \sum_{i=0}^{n-2lag} x_{i + 2 \cdot lag}^2 \cdot x_{i + lag} \cdot x_{i}
    which is
    .. math::
    \\mathbb{E}[L^2(X)^2 \cdot L(X) \cdot X]
    where :math:`\\mathbb{E}` is the mean and :math:`L` is the lag operator. It was proposed in [1] as a measure of
    non linearity in the time series.
    .. rubric:: References
    | [1] Schreiber, T. and Schmitz, A. (1997).
    | Discrimination power of measures for nonlinearity in a time series
    | PHYSICAL REVIEW E, VOLUME 55, NUMBER 5
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param lag: the lag that should be used in the calculation of the feature
    :type lag: int
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    n = x.size
    if 2 * lag >= n:
        return 0
    else:
        return np.mean((_roll(x, 2 * -lag) * _roll(x, -lag) * x)[0:(n - 2 * lag)])
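
The last line of the listing is easier to follow when written as an explicit loop. The sketch below assumes that the helper `_roll(x, -k)` (not shown in this article) shifts the series to the left, so that position i holds x[i + k]; the name `c3_sketch` is made up for illustration. Note that the code multiplies the three shifted values as they are, without the square that appears in the docstring formula.

```python
def c3_sketch(x, lag):
    n = len(x)
    if 2 * lag >= n:
        return 0
    # product x[i + 2*lag] * x[i + lag] * x[i] for every valid start index i
    terms = [x[i + 2 * lag] * x[i + lag] * x[i] for i in range(n - 2 * lag)]
    return sum(terms) / len(terms)


# terms for lag=1: 3*2*1, 4*3*2, 5*4*3 -> (6 + 24 + 60) / 3
print(c3_sketch([1, 2, 3, 4, 5], 1))  # 30.0
```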

11. Statistics of the changes inside a given quantile corridor

Function: tsfresh.feature_extraction.feature_calculators.change_quantiles(x, ql, qh, isabs, f_agg)

def change_quantiles(x, ql, qh, isabs, f_agg):
    """
    First fixes a corridor given by the quantiles ql and qh of the distribution of x.
    Then calculates the average, absolute value of consecutive changes of the series x inside this corridor.
    Think about selecting a corridor on the
    y-Axis and only calculating the mean of the absolute change of the time series inside this corridor.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param ql: the lower quantile of the corridor
    :type ql: float
    :param qh: the higher quantile of the corridor
    :type qh: float
    :param isabs: should the absolute differences be taken?
    :type isabs: bool
    :param f_agg: the aggregator function that is applied to the differences in the bin
    :type f_agg: str, name of a numpy function (e.g. mean, var, std, median)
    :return: the value of this feature
    :return type: float
    """
    if ql >= qh:
        # the listing built this exception without raising it, which made the check a no-op
        raise ValueError("ql={} should be lower than qh={}".format(ql, qh))

    div = np.diff(x)
    if isabs:
        div = np.abs(div)
    # All values that originate from the corridor between the quantiles ql and qh will have the category 0,
    # other will be np.NaN
    try:
        bin_cat = pd.qcut(x, [ql, qh], labels=False)
        bin_cat_0 = bin_cat == 0
    except ValueError:  # Occurs when ql are qh effectively equal, e.g. x is not long enough or is too categorical
        return 0
    # We only count changes that start and end inside the corridor
    ind = (bin_cat_0 & _roll(bin_cat_0, 1))[1:]
    if sum(ind) == 0:
        return 0
    else:
        ind_inside_corridor = np.where(ind == 1)
        aggregator = getattr(np, f_agg)
        return aggregator(div[ind_inside_corridor])

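12. Complexity of the time series

Function: tsfresh.feature_extraction.feature_calculators.cid_ce(x, normalize)

Estimates the complexity of the time series; a more complex series has more peaks and valleys (returns a float).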

def cid_ce(x, normalize):
    """
    This function calculator is an estimate for a time series complexity [1] (A more complex time series has more peaks,
    valleys etc.). It calculates the value of
    .. math::
    \\sqrt{ \\sum_{i=0}^{n-2lag} ( x_{i} - x_{i+1})^2 }
    .. rubric:: References
    | [1] Batista, Gustavo EAPA, et al (2014).
    | CID: an efficient complexity-invariant distance for time series.
    | Data Mining and Knowledge Discovery 28.3 (2014): 634-669.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param normalize: should the time series be z-transformed?
    :type normalize: bool
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    if normalize:
        s = np.std(x)
        if s!=0:
            x = (x - np.mean(x))/s
        else:
            return 0.0

    x = np.diff(x)
    return np.sqrt(np.dot(x, x))
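
A short standard-library sketch shows why this quantity grows with the number of peaks and valleys: it is the Euclidean length of the vector of consecutive differences. The name `cid_ce_sketch` is hypothetical, and the optional z-transform mirrors the `normalize` branch of the listing.

```python
from math import sqrt
from statistics import fmean, pstdev


def cid_ce_sketch(x, normalize=False):
    if normalize:
        s = pstdev(x)
        if s == 0:
            return 0.0
        mu = fmean(x)
        x = [(v - mu) / s for v in x]
    # square root of the summed squared consecutive differences
    return sqrt(sum((b - a) ** 2 for a, b in zip(x, x[1:])))


flat = [0, 0, 0, 0, 0, 0]
zigzag = [0, 1, 0, 1, 0, 1]
print(cid_ce_sketch(flat))    # 0.0
print(cid_ce_sketch(zigzag))  # sqrt(5), five unit jumps
```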

13. Number of values above the mean

Function: tsfresh.feature_extraction.feature_calculators.count_above_mean(x)

Source code:

def count_above_mean(x):
    """
    Returns the number of values in x that are higher than the mean of x
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    m = np.mean(x)
    return np.where(x > m)[0].size

14. Number of values below the mean

Function: tsfresh.feature_extraction.feature_calculators.count_below_mean(x)

Source code:

def count_below_mean(x):
    """
    Returns the number of values in x that are lower than the mean of x
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    m = np.mean(x)
    return np.where(x < m)[0].size

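15. Ricker wavelet analysis of the time series

Function: tsfresh.feature_extraction.feature_calculators.cwt_coefficients(x, param)

  • Continuous wavelet transform using the Ricker wavelet. The Ricker wavelet is a wavelet type commonly used in seismic exploration and is derived rigorously from the wave equation. (Returns a pandas.Series.)

16. Energy ratio of a chunk of the time series

Function: tsfresh.feature_extraction.feature_calculators.energy_ratio_by_chunks(x, param)

Splits the series into chunks and computes the ratio of the energy (sum of squares) of the selected chunk to the energy of the whole series. When the series cannot be split evenly, the leftover data points are distributed over the first chunks. (Each ratio is a float.)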
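
The chunking rule described above can be sketched with the standard library as follows. The function name and the `num_segments` argument are made up for this illustration (the real calculator reads its settings from `param`), the series is assumed to have non-zero energy, and the sketch returns the ratios for all chunks at once.

```python
def energy_ratio_by_chunks_sketch(x, num_segments):
    base, extra = divmod(len(x), num_segments)
    total = sum(v * v for v in x)  # energy of the whole series, assumed non-zero
    ratios, start = [], 0
    for k in range(num_segments):
        # the first `extra` chunks receive one additional data point
        size = base + (1 if k < extra else 0)
        chunk = x[start:start + size]
        ratios.append(sum(v * v for v in chunk) / total)
        start += size
    return ratios


# 5 points in 2 chunks: [1, 2, 3] and [4, 5], energies 14 and 41 out of 55
print(energy_ratio_by_chunks_sketch([1, 2, 3, 4, 5], 2))
```

The ratios always add up to 1, so a single chunk with a ratio close to 1 means that most of the signal energy is concentrated in that part of the series.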

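17. Spectral statistics of the absolute Fourier transform of the time series

Function: tsfresh.feature_extraction.feature_calculators.fft_aggregated(x, param)

18. Fourier transform coefficients of the time series

Function: tsfresh.feature_extraction.feature_calculators.fft_coefficient(x, param)

19. First location of the maximum

Function: tsfresh.feature_extraction.feature_calculators.first_location_of_maximum(x)

Source code: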

def first_location_of_maximum(x):
    """
    Returns the first location of the maximum value of x.
    The position is calculated relatively to the length of x.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.argmax(x) / len(x) if len(x) > 0 else np.NaN

20. First location of the minimum

Function: tsfresh.feature_extraction.feature_calculators.first_location_of_minimum(x)

Source code:

def first_location_of_minimum(x):
    """
    Returns the first location of the minimal value of x.
    The position is calculated relatively to the length of x.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.argmin(x) / len(x) if len(x) > 0 else np.NaN

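21. Polynomial coefficients of a Langevin model fitted to the time series

Function: tsfresh.feature_extraction.feature_calculators.friedrich_coefficients(x, param)

22. Whether any value in the time series is duplicated

Function: tsfresh.feature_extraction.feature_calculators.has_duplicate(x)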

def has_duplicate(x):
    """
    Checks if any value in x occurs more than once
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: bool
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return x.size != np.unique(x).size

23. Whether the maximum of the time series is duplicated

Function: tsfresh.feature_extraction.feature_calculators.has_duplicate_max(x)

def has_duplicate_max(x):
    """
    Checks if the maximum value of x is observed more than once
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: bool
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.sum(x == np.max(x)) >= 2

24. Whether the minimum of the time series is duplicated

Function: tsfresh.feature_extraction.feature_calculators.has_duplicate_min(x)

def has_duplicate_min(x):
    """
    Checks if the minimal value of x is observed more than once
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: bool
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.sum(x == np.min(x)) >= 2

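25. Index of the mass quantile of the time series

Function: tsfresh.feature_extraction.feature_calculators.index_mass_quantile(x, param)

26. Kurtosis of the time series

Function: tsfresh.feature_extraction.feature_calculators.kurtosis(x)

Describes how peaked or flat the distribution of the data is.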

def kurtosis(x):
    """
    Returns the kurtosis of x (calculated with the adjusted Fisher-Pearson standardized
    moment coefficient G2).

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, pd.Series):
        x = pd.Series(x)
    return pd.Series.kurtosis(x)

27. Whether the standard deviation exceeds r times the range

Function: tsfresh.feature_extraction.feature_calculators.large_standard_deviation(x, r)

Source code:

def large_standard_deviation(x, r):
    """
    Boolean variable denoting if the standard dev of x is higher
    than 'r' times the range = difference between max and min of x.
    Hence it checks if
    .. math::
    std(x) > r * (max(X)-min(X))
    According to a rule of the thumb, the standard deviation should be a forth of the range of the values.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param r: the percentage of the range to compare with
    :type r: float
    :return: the value of this feature
    :return type: bool
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.std(x) > (r * (np.max(x) - np.min(x)))

28. Last location of the maximum

Function: tsfresh.feature_extraction.feature_calculators.last_location_of_maximum(x)

Source code:

def last_location_of_maximum(x):
    """
    Returns the relative last location of the maximum value of x.
    The position is calculated relatively to the length of x.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    x = np.asarray(x)
    return 1.0 - np.argmax(x[::-1]) / len(x) if len(x) > 0 else np.NaN
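
The "first" and "last" location features are not scaled in quite the same way, which a small example makes visible. The two functions below are standard-library stand-ins with made-up names; they assume a non-empty list and reproduce the arithmetic of `np.argmax(x) / len(x)` and `1.0 - np.argmax(x[::-1]) / len(x)`.

```python
def first_location_of_maximum_sketch(x):
    # index of the first maximum, relative to the length
    return x.index(max(x)) / len(x)


def last_location_of_maximum_sketch(x):
    # search the reversed series, then count back from the end
    rev = x[::-1]
    return 1.0 - rev.index(max(rev)) / len(x)


x = [1, 3, 2, 3]
print(first_location_of_maximum_sketch(x))  # 1 / 4 = 0.25
print(last_location_of_maximum_sketch(x))   # 1 - 0 / 4 = 1.0
```

A maximum in the first position gives a first location of 0.0, while a maximum in the last position gives a last location of 1.0, so the last-location value ranges over (0, 1] and the first-location value over [0, 1).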

29. Last location of the minimum

Function: tsfresh.feature_extraction.feature_calculators.last_location_of_minimum(x)

Source code:

def last_location_of_minimum(x):
    """
    Returns the last location of the minimal value of x.
    The position is calculated relatively to the length of x.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    x = np.asarray(x)
    return 1.0 - np.argmin(x[::-1]) / len(x) if len(x) > 0 else np.NaN

30. Length of the time series

Function: tsfresh.feature_extraction.feature_calculators.length(x)

Source code:

def length(x):
    """
    Returns the length of x
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: int
    """
    return len(x)

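31. Linear regression analysis of the time series

Function: tsfresh.feature_extraction.feature_calculators.linear_trend(x, param)

32. Length of the longest consecutive run above the mean

Function: tsfresh.feature_extraction.feature_calculators.longest_strike_above_mean(x)

33. Length of the longest consecutive run below the mean

Function: tsfresh.feature_extraction.feature_calculators.longest_strike_below_mean(x)

34. Largest fixed point of the Langevin model

Function: tsfresh.feature_extraction.feature_calculators.max_langevin_fixed_point(x, r, m)

35. Maximum of the time series

Function: tsfresh.feature_extraction.feature_calculators.maximum(x)

This feature is the maximum value of the time series.
$$
\mathrm{MAX} = \max\{x_{i}\}_{i=1}^{n}
$$
Source code: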

def maximum(x):
    """
    Calculates the highest value of the time series x.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.max(x)

36. Mean of the time series

Function: tsfresh.feature_extraction.feature_calculators.mean(x)

This feature is the mean of the time series.
$$
\mathrm{Mean} = \frac{1}{n}\sum_{i=1}^{n} x_{i}
$$
Source code:

def mean(x):
    """
    Returns the mean of x
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.mean(x)

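37. Mean absolute first-order difference of the time series

Function: tsfresh.feature_extraction.feature_calculators.mean_abs_change(x)

$$
\frac{1}{n-1}\sum_{i=1}^{n-1}\left|x_{i+1}-x_{i}\right|
$$
The code averages the n-1 values returned by np.diff, so the normalizing factor is 1/(n-1) rather than the 1/n written in the docstring.

Source code: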

def mean_abs_change(x):
    """
    Returns the mean over the absolute differences between subsequent time series values which is
    .. math::
    \\frac{1}{n} \\sum_{i=1,\ldots, n-1} | x_{i+1} - x_{i}|
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.mean(np.abs(np.diff(x)))

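38. Mean first-order difference of the time series

Function: tsfresh.feature_extraction.feature_calculators.mean_change(x)

$$
\frac{1}{n-1} \sum_{i=1}^{n-1}\left(x_{i+1}-x_{i}\right) = \frac{x_{n}-x_{1}}{n-1}
$$
As with the previous feature, the code averages the n-1 differences, so the factor is 1/(n-1) rather than the 1/n written in the docstring.

Source code: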

def mean_change(x):
    """
    Returns the mean over the differences between subsequent time series values which is
    .. math::
    \\frac{1}{n} \\sum_{i=1,\ldots, n-1} x_{i+1} - x_{i}
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.mean(np.diff(x))

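39. Mean of the central approximation of the second derivative

Function: tsfresh.feature_extraction.feature_calculators.mean_second_derivative_central(x)
$$
\frac{1}{n-2} \sum_{i=1}^{n-2}\frac{1}{2} \left(x_{i+2} - 2 \cdot x_{i+1} + x_i\right)
$$
The code drops the first and last positions and averages the remaining n-2 terms.

Source code: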

def mean_second_derivative_central(x):
    """
    Returns the mean value of a central approximation of the second derivative
    .. math::
    \\frac{1}{n} \\sum_{i=1,\ldots, n-1} \\frac{1}{2} (x_{i+2} - 2 \\cdot x_{i+1} + x_i)
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """

    diff = (_roll(x, 1) - 2 * np.array(x) + _roll(x, -1)) / 2.0
    return np.mean(diff[1:-1])
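
A worked example helps to check the formula. The sketch below is a standard-library stand-in with a made-up name; it assumes that `_roll(x, 1)` and `_roll(x, -1)` (not shown in this article) give the previous and next neighbour of each value, and it skips the first and last positions just as `diff[1:-1]` does.

```python
def mean_second_derivative_central_sketch(x):
    # central second difference, halved, for every interior point
    terms = [(x[i + 1] - 2 * x[i] + x[i - 1]) / 2 for i in range(1, len(x) - 1)]
    return sum(terms) / len(terms)


squares = [i * i for i in range(6)]  # 0, 1, 4, 9, 16, 25
line = [2 * i + 1 for i in range(6)]
print(mean_second_derivative_central_sketch(squares))  # 1.0
print(mean_second_derivative_central_sketch(line))     # 0.0
```

For the quadratic series every second difference equals 2, so the halved mean is 1.0; for a straight line it is 0.0.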

40. Median of the time series

Function: tsfresh.feature_extraction.feature_calculators.median(x)
$$
\mathrm{median} = \operatorname{median}\{x_{i}\}_{i=1}^{n}
$$
Source code:

def median(x):
    """
    Returns the median of x
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.median(x)

41. Minimum of the time series

Function: tsfresh.feature_extraction.feature_calculators.minimum(x)
$$
\mathrm{min} = \min\{x_{i}\}_{i=1}^{n}
$$
Source code:

def minimum(x):
    """
    Calculates the lowest value of the time series x.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.min(x)

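42. Number of crossings of a threshold

Function: tsfresh.feature_extraction.feature_calculators.number_crossing_m(x, m)

In plain terms: given a threshold m, the feature counts how many pairs of consecutive values in the series span m. For example, for the series [1,2,1,2,3] and m=1.5 the number of crossings is 3.

Note that for consecutive values a<b the interval checked is the half-open interval [a,b).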

def number_crossing_m(x, m):
    """
    Calculates the number of crossings of x on m. A crossing is defined as two sequential values where the first value is lower than m and the next is greater, or vice-versa. If you set m to zero, you will get the number of zero crossings.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param m: the threshold for the crossing
    :type m: float
    :return: the value of this feature
    :return type: int
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    # From https://stackoverflow.com/questions/3843017/efficiently-detect-sign-changes-in-python
    positive = x > m
    return np.where(np.bitwise_xor(positive[1:], positive[:-1]))[0].size
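
The example from the text, and the half-open interval rule, can be checked with a few lines of plain Python. `number_crossing_m_sketch` is a made-up name; it mirrors the `x > m` comparison followed by an exclusive-or of neighbouring flags.

```python
def number_crossing_m_sketch(x, m):
    above = [v > m for v in x]
    # a crossing happens wherever two neighbouring flags differ
    return sum(a != b for a, b in zip(above, above[1:]))


print(number_crossing_m_sketch([1, 2, 1, 2, 3], 1.5))  # 3
print(number_crossing_m_sketch([1, 1.5, 2], 1.5))      # 1: only the step from 1.5 to 2 counts
print(number_crossing_m_sketch([1, 1.5], 1.5))         # 0: 1.5 is not inside [1, 1.5)
```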

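43. Searching for different peaks in the time series

Function: tsfresh.feature_extraction.feature_calculators.number_cwt_peaks(x, n)

Uses a continuous wavelet transform to find the peaks in the time series.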

from scipy.signal import cwt, find_peaks_cwt, ricker, welch

def number_cwt_peaks(x, n):
    """
    This feature calculator searches for different peaks in x. To do so, x is smoothed by a ricker wavelet and for widths ranging from 1 to n. This feature calculator returns the number of peaks that occur at enough width scales and with sufficiently high Signal-to-Noise-Ratio (SNR)
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param n: maximum width to consider
    :type n: int
    :return: the value of this feature
    :return type: int
    """
    return len(find_peaks_cwt(vector=x, widths=np.array(list(range(1, n + 1))), wavelet=ricker))

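44. Number of peaks supported by their neighborhood

Function: tsfresh.feature_extraction.feature_calculators.number_peaks(x, n)

For a given integer n, the feature counts the values that are larger than each of their n neighbours to the left and each of their n neighbours to the right.

Source code: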

def number_peaks(x, n):
    """
    Calculates the number of peaks of at least support n in the time series x. A peak of support n is defined as a
    subsequence of x where a value occurs, which is bigger than its n neighbours to the left and to the right.
    Hence in the sequence
    >>> x = [3, 0, 0, 4, 0, 0, 13]
    4 is a peak of support 1 and 2 because in the subsequences
    >>> [0, 4, 0]
    >>> [0, 0, 4, 0, 0]
    4 is still the highest value. Here, 4 is not a peak of support 3 because 13 is the 3th neighbour to the right of 4
    and its bigger than 4.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param n: the support of the peak
    :type n: int
    :return: the value of this feature
    :return type: float
    """
    x_reduced = x[n:-n]

    res = None
    for i in range(1, n + 1):
        result_first = (x_reduced > _roll(x, i)[n:-n])

        if res is None:
            res = result_first
        else:
            res &= result_first

        res &= (x_reduced > _roll(x, -i)[n:-n])
    return np.sum(res)
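
The docstring example can be replayed with a direct, loop-based sketch. `number_peaks_sketch` is a made-up name; like the listing, it only considers positions that have n neighbours on both sides, so values near the edges (such as the 13 at the end) are never counted as peaks.

```python
def number_peaks_sketch(x, n):
    count = 0
    for i in range(n, len(x) - n):
        left = x[i - n:i]
        right = x[i + 1:i + n + 1]
        # strictly larger than all n neighbours on each side
        if all(x[i] > v for v in left) and all(x[i] > v for v in right):
            count += 1
    return count


x = [3, 0, 0, 4, 0, 0, 13]
print([number_peaks_sketch(x, n) for n in (1, 2, 3)])  # [1, 1, 0]
```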

45. Partial autocorrelation of the time series

Function: tsfresh.feature_extraction.feature_calculators.partial_autocorrelation(x, param)
$$
\alpha_k = \frac{ Cov(x_t, x_{t-k} | x_{t-1}, \ldots, x_{t-k+1})}
{\sqrt{ Var(x_t | x_{t-1}, \ldots, x_{t-k+1}) Var(x_{t-k} | x_{t-1}, \ldots, x_{t-k+1} )}}
$$

46. Share of distinct values that occur more than once

Function: tsfresh.feature_extraction.feature_calculators.percentage_of_reoccurring_datapoints_to_all_datapoints(x)

Source code:

def percentage_of_reoccurring_datapoints_to_all_datapoints(x):
    """
    Returns the percentage of unique values, that are present in the time series
    more than once.
    len(different values occurring more than once) / len(different values)
    This means the percentage is normalized to the number of unique values,
    in contrast to the percentage_of_reoccurring_values_to_all_values.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if len(x) == 0:
        return np.nan

    unique, counts = np.unique(x, return_counts=True)

    if counts.shape[0] == 0:
        return 0

    return np.sum(counts > 1) / float(counts.shape[0])

47. Share of data points whose value occurs more than once

Function: tsfresh.feature_extraction.feature_calculators.percentage_of_reoccurring_values_to_all_values(x)

Source code:

def percentage_of_reoccurring_values_to_all_values(x):
    """
    Returns the ratio of unique values, that are present in the time series
    more than once.
    # of data points occurring more than once / # of all data points
    This means the ratio is normalized to the number of data points in the time series,
    in contrast to the percentage_of_reoccurring_datapoints_to_all_datapoints.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, pd.Series):
        x = pd.Series(x)

    if x.size == 0:
        return np.nan

    value_counts = x.value_counts()
    reoccuring_values = value_counts[value_counts > 1].sum()

    if np.isnan(reoccuring_values):
        return 0

    return reoccuring_values / x.size
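
The two "reoccurring" features are easy to confuse, so here is a side-by-side sketch on one small series. Both function names are made up for this illustration and describe what the two listings above compute: the first divides by the number of distinct values, the second by the number of data points. A non-empty series is assumed.

```python
from collections import Counter


def share_of_reoccurring_distinct_values(x):
    """Distinct values seen more than once / number of distinct values."""
    counts = Counter(x)
    return sum(c > 1 for c in counts.values()) / len(counts)


def share_of_reoccurring_data_points(x):
    """Data points whose value is seen more than once / number of data points."""
    counts = Counter(x)
    return sum(c for c in counts.values() if c > 1) / len(x)


x = [1, 1, 2, 3, 4]
print(share_of_reoccurring_distinct_values(x))  # 1 of 4 distinct values -> 0.25
print(share_of_reoccurring_data_points(x))      # 2 of 5 data points -> 0.4
```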

48. Quantile of the time series

Function: tsfresh.feature_extraction.feature_calculators.quantile(x, q)

Source code:

def quantile(x, q):
    """
    Calculates the q quantile of x. This is the value of x greater than q% of the ordered values from x.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param q: the quantile to calculate
    :type q: float
    :return: the value of this feature
    :return type: float
    """
    x = pd.Series(x)
    return pd.Series.quantile(x, q)

49. Number of values within a given range

Function: tsfresh.feature_extraction.feature_calculators.range_count(x, min, max)

Source code:

def range_count(x, min, max):
"""
Count observed values within the interval [min, max).
:param x: the time series to calculate the feature of
:type x: numpy.ndarray
:param min: the inclusive lower bound of the range
:type min: int or float
:param max: the exclusive upper bound of the range
:type max: int or float
:return: the count of values within the range
:rtype: int
"""
return np.sum((x >= min) & (x < max))
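
The interval is half-open: the lower bound is included and the upper bound is excluded. Note also that the listing compares `x` element-wise, so it expects a NumPy array or pandas Series rather than a plain list, and that its parameter names `min` and `max` shadow the Python built-ins inside the function. A small pure-Python sketch of the same counting rule follows; `count_in_half_open_range` and its parameter names are hypothetical.

```python
def count_in_half_open_range(x, lower, upper):
    """Count values v with lower <= v < upper (illustrative helper)."""
    return sum(1 for v in x if lower <= v < upper)


series = [0, 1, 1, 2, 3]
in_range = count_in_half_open_range(series, 1, 3)  # counts 1, 1 and 2; the value 3 is excluded
print(in_range)
```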

50. Ratio of values more than r sigma away from the mean

Function: tsfresh.feature_extraction.feature_calculators.ratio_beyond_r_sigma(x, r)

Source code:

def ratio_beyond_r_sigma(x, r):
    """
    Ratio of values that are more than r*std(x) (so r sigma) away from the mean of x.
    :param x: the time series to calculate the feature of
    :type x: iterable
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.sum(np.abs(x - np.mean(x)) > r * np.std(x)) / x.size
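
Two details of the listing are worth spelling out: `np.std` computes the population standard deviation (divisor n) by default, and the comparison is strict, so a point exactly r sigma from the mean is not counted. The sketch below mirrors both choices with the standard library; `ratio_beyond` is a hypothetical helper for illustration only.

```python
import statistics


def ratio_beyond(x, r):
    """Share of points farther than r population standard deviations from the mean."""
    mu = statistics.fmean(x)
    sigma = statistics.pstdev(x)   # population std, the default behaviour of np.std
    return sum(1 for v in x if abs(v - mu) > r * sigma) / len(x)


series = [0, 0, 0, 0, 0, 0, 0, 0, 0, 10]   # mean 1, population std 3
beyond_2 = ratio_beyond(series, 2)          # only the value 10 lies more than 6 away from the mean
beyond_3 = ratio_beyond(series, 3)          # 10 is exactly 9 away, and the comparison is strict
print(beyond_2, beyond_3)
```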

51. Ratio of the number of unique values to the time series length

Function: tsfresh.feature_extraction.feature_calculators.ratio_value_number_to_time_series_length(x)

Source code:

def ratio_value_number_to_time_series_length(x):
    """
    Returns a factor which is 1 if all values in the time series occur only once,
    and below one if this is not the case.
    In principle, it just returns
    # unique values / # values
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    if x.size == 0:
        return np.nan

    return np.unique(x).size / x.size
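
As a quick check of the "# unique values / # values" rule, the following pure-Python sketch computes the same ratio with a set; `unique_ratio` is a hypothetical name used only here.

```python
def unique_ratio(x):
    """Number of distinct values divided by the series length (illustrative helper)."""
    if len(x) == 0:
        return float("nan")
    return len(set(x)) / len(x)


all_distinct = unique_ratio([3, 1, 2, 4])   # every value occurs once
with_repeats = unique_ratio([1, 1, 2, 3])   # 3 distinct values among 4 points
print(all_distinct, with_repeats)
```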

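52. Sample entropy of the time series

Function: tsfresh.feature_extraction.feature_calculators.sample_entropy(x)

Sample entropy measures the complexity of a time series by the probability that new patterns appear in the signal: the higher that probability, the more complex the series. A lower sample entropy indicates greater self-similarity; a higher value indicates a more complex series.

Source code: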

def sample_entropy(x):
    """
    Calculate and return sample entropy of x.
    .. rubric:: References
    |  [1] http://en.wikipedia.org/wiki/Sample_Entropy
    |  [2] https://www.ncbi.nlm.nih.gov/pubmed/10843903?dopt=Abstract
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    x = np.array(x)

    sample_length = 1  # number of sequential points of the time series
    tolerance = 0.2 * np.std(x)  # 0.2 is a common value for r - why?

    n = len(x)
    prev = np.zeros(n)
    curr = np.zeros(n)
    A = np.zeros((1, 1))  # number of matches for m = [1,...,template_length - 1]
    B = np.zeros((1, 1))  # number of matches for m = [1,...,template_length]

    for i in range(n - 1):
        nj = n - i - 1
        ts1 = x[i]
        for jj in range(nj):
            j = jj + i + 1
            if abs(x[j] - ts1) < tolerance:  # distance between two vectors
                curr[jj] = prev[jj] + 1
                temp_ts_length = min(sample_length, curr[jj])
                for m in range(int(temp_ts_length)):
                    A[m] += 1
                    if j < n - 1:
                        B[m] += 1
            else:
                curr[jj] = 0
        for j in range(nj):
            prev[j] = curr[j]

    N = n * (n - 1) / 2
    B = np.vstack(([N], B[0]))

    # sample entropy = -1 * (log (A/B))
    similarity_ratio = A / B
    se = -1 * np.log(similarity_ratio)
    se = np.reshape(se, -1)
    return se[0]
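
Tracing the listing with `sample_length = 1`: `A[0]` ends up counting the pairs of points (i < j) whose absolute difference is below `tolerance`, `N` is the total number of pairs, and the returned `se[0]` is the negative logarithm of `A[0] / N`. The sketch below restates that reading in plain Python. It is an interpretation of the quoted listing only (other tsfresh releases may compute sample entropy differently), and `pairwise_match_entropy` is a hypothetical name.

```python
import math
import statistics


def pairwise_match_entropy(x):
    """-log of the share of point pairs closer than 0.2 * std (illustrative helper)."""
    n = len(x)
    tolerance = 0.2 * statistics.pstdev(x)   # population std, like np.std
    matches = sum(
        1
        for i in range(n - 1)
        for j in range(i + 1, n)
        if abs(x[j] - x[i]) < tolerance
    )
    total_pairs = n * (n - 1) / 2
    if matches == 0:
        return math.inf   # no pair within tolerance (choice made for this sketch)
    return -math.log(matches / total_pairs)


series = [1, 1, 1, 5, 5, 5]               # std 2, tolerance 0.4
value = pairwise_match_entropy(series)    # 6 matching pairs out of 15
print(value)
```

A series in which many points lie close to each other gives a ratio near 1 and therefore a value near 0, which matches the description that low sample entropy means high self-similarity.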

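53. Helper decorator for setting function attributes

Function: tsfresh.feature_extraction.feature_calculators.set_property(key, value)

Note that set_property is not a feature calculator: it returns a decorator that sets the attribute key of the decorated function to value.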

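54. Skewness of the time series distribution

Function: tsfresh.feature_extraction.feature_calculators.skewness(x)

Skewness measures the direction and degree of asymmetry of a data distribution; it is a numerical summary of how far the distribution departs from symmetry.

Source code: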

def skewness(x):
    """
    Returns the sample skewness of x (calculated with the adjusted Fisher-Pearson standardized
    moment coefficient G1).
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, pd.Series):
        x = pd.Series(x)
    return pd.Series.skew(x)
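
The docstring names the adjusted Fisher-Pearson coefficient G1, which rescales the moment ratio g1 = m3 / m2^(3/2) by sqrt(n(n-1)) / (n-2). The sketch below writes that formula out in plain Python so the sign convention can be checked on small inputs; `adjusted_skewness` is a hypothetical helper, and its handling of very short or constant series is a choice made for this sketch.

```python
import math


def adjusted_skewness(x):
    """Adjusted Fisher-Pearson coefficient G1 (illustrative helper, not tsfresh code)."""
    n = len(x)
    if n < 3:
        return float("nan")   # G1 is undefined for fewer than three points
    mean = sum(x) / n
    m2 = sum((v - mean) ** 2 for v in x) / n   # second central moment
    m3 = sum((v - mean) ** 3 for v in x) / n   # third central moment
    if m2 == 0:
        return 0.0            # constant series: treated as symmetric in this sketch
    g1 = m3 / m2 ** 1.5
    return math.sqrt(n * (n - 1)) / (n - 2) * g1


symmetric = adjusted_skewness([1, 2, 3, 4, 5])       # symmetric around 3
right_tailed = adjusted_skewness([1, 2, 3, 4, 10])   # long tail to the right
print(symmetric, right_tailed)
```

A positive value indicates a longer right tail and a negative value a longer left tail.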

55. Power spectral density estimated with Welch's method

Function: tsfresh.feature_extraction.feature_calculators.spkt_welch_density(x, param)

56. Standard deviation of the time series

Function: tsfresh.feature_extraction.feature_calculators.standard_deviation(x)

Source code:

def standard_deviation(x):
    """
    Returns the standard deviation of x
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.std(x)

57. Sum of all reoccurring data points (every occurrence counted)

Function: tsfresh.feature_extraction.feature_calculators.sum_of_reoccurring_data_points(x)

Source code:

def sum_of_reoccurring_data_points(x):
    """
    Returns the sum of all data points, that are present in the time series
    more than once.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    unique, counts = np.unique(x, return_counts=True)
    counts[counts < 2] = 0
    return np.sum(counts * unique)

58. Sum of reoccurring values (each distinct value counted once)

Function: tsfresh.feature_extraction.feature_calculators.sum_of_reoccurring_values(x)

Source code:

def sum_of_reoccurring_values(x):
    """
    Returns the sum of all values, that are present in the time series
    more than once.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    unique, counts = np.unique(x, return_counts=True)
    counts[counts < 2] = 0
    counts[counts > 1] = 1
    return np.sum(counts * unique)
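
The only difference between `sum_of_reoccurring_data_points` and `sum_of_reoccurring_values` is the extra line `counts[counts > 1] = 1`: the former weights each repeated value by how often it occurs, the latter adds each repeated value once. A minimal pure-Python sketch of both rules follows; the helper names are hypothetical and the code is not taken from tsfresh.

```python
from collections import Counter


def sum_reoccurring_data_points(x):
    """Sum of every occurrence of values that appear more than once."""
    counts = Counter(x)
    return sum(value * c for value, c in counts.items() if c > 1)


def sum_reoccurring_values(x):
    """Sum of the distinct values that appear more than once."""
    counts = Counter(x)
    return sum(value for value, c in counts.items() if c > 1)


series = [1, 1, 2, 3, 3, 3]
points_sum = sum_reoccurring_data_points(series)   # 1*2 + 3*3
values_sum = sum_reoccurring_values(series)        # 1 + 3
print(points_sum, values_sum)
```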

59. Sum of the time series values

Function: tsfresh.feature_extraction.feature_calculators.sum_values(x)

This feature is the sum of all values in the time series.
$$
SUM = \sum_{i=1}^{n} x_{i}
$$
Source code:

def sum_values(x):
    """
    Calculates the sum over the time series values
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if len(x) == 0:
        return 0

    return np.sum(x)

60. Whether the distribution looks symmetric

Function: tsfresh.feature_extraction.feature_calculators.symmetry_looking(x, param)

61. Time reversal asymmetry statistic

Function: tsfresh.feature_extraction.feature_calculators.time_reversal_asymmetry_statistic(x, lag)

62. Number of occurrences of a given value

Function: tsfresh.feature_extraction.feature_calculators.value_count(x, value)

Source code:

def value_count(x, value):
    """
    Count occurrences of `value` in time series x.
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param value: the value to be counted
    :type value: int or float
    :return: the count
    :rtype: int
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)

    if np.isnan(value):
        return np.isnan(x).sum()
    else:
        return x[x == value].size
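
The separate NaN branch in the listing is needed because NaN never compares equal to anything, including itself, so an equality test alone would always report zero NaN values. The pure-Python sketch below shows the effect; `count_value` is a hypothetical helper for illustration.

```python
import math


def count_value(x, value):
    """Count occurrences of value, treating NaN as countable (illustrative helper)."""
    if math.isnan(value):
        return sum(1 for v in x if math.isnan(v))
    return sum(1 for v in x if v == value)


nan = float("nan")
series = [1.0, nan, 2.0, nan, 1.0]
naive_nan_count = sum(1 for v in series if v == nan)   # equality never matches NaN
nan_count = count_value(series, nan)
ones_count = count_value(series, 1.0)
print(naive_nan_count, nan_count, ones_count)
```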

63. Variance of the time series

Function: tsfresh.feature_extraction.feature_calculators.variance(x)

Source code:

def variance(x):
    """
    Returns the variance of x
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    return np.var(x)

64. Whether the variance is larger than the standard deviation

Function: tsfresh.feature_extraction.feature_calculators.variance_larger_than_standard_deviation(x)

Source code:

def variance_larger_than_standard_deviation(x):
    """
    Boolean variable denoting if the variance of x is greater than its standard deviation. Is equal to variance of x
    being larger than 1
    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: bool
    """
    y = np.var(x)
    return y > np.sqrt(y)
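
Because the standard deviation is the square root of the variance, the test `y > np.sqrt(y)` holds exactly when the variance exceeds 1, as the docstring says. The sketch below checks this on two tiny series using the population variance from the standard library (the same divisor that `np.var` uses by default); `variance_exceeds_std` is a hypothetical name.

```python
import math
import statistics


def variance_exceeds_std(x):
    """True when the population variance is larger than its square root."""
    var = statistics.pvariance(x)
    return var > math.sqrt(var)


narrow = [0.0, 1.0]   # variance 0.25, standard deviation 0.5
wide = [0.0, 4.0]     # variance 4.0, standard deviation 2.0
narrow_flag = variance_exceeds_std(narrow)
wide_flag = variance_exceeds_std(wide)
print(narrow_flag, wide_flag)
```

Since the threshold is the fixed value 1, this feature depends on the units of the data: rescaling a series can flip the result.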

65. Timewise linear trend

Function: tsfresh.feature_extraction.feature_calculators.linear_trend_timewise(x, param)

Part 3: Feature parameter settings

Part 4: Feature selection and filtering

https://gitmemories.com/cuge1995/awesome-time-series

Part 5: Appendix

Sample data download:

https://bj.bcebos.com/v1/ai-studio-online/85b5cb4eea5a4f259766f42a448e2c04a7499c43e1ae4cc28fbdee8e087e2385?responseContentDisposition=attachment%3B%20filename%3Dwtbdata_245days.csv&authorization=bce-auth-v1%2F0ef6765c1e494918bc0d4c3ca3e5c6d1%2F2022-05-05T14%3A17%3A03Z%2F-1%2F%2F5932bfb6aa3af1bcfb467bf2a4a6877f8823fe96c6f4fd0d4a3caa722354e3ac

References and resources

1. https://blog.csdn.net/qq_39478403/article/details/115057005

2. https://tsfresh.readthedocs.io/en/latest/text/quick_start.html
