1
1
from typing import Optional , Union
2
2
3
- import numpy as np
4
3
import pandas as pd
5
4
import xarray as xr
6
5
from torchdata .datapipes import functional_datapipe
9
8
10
9
@functional_datapipe ("pv_power_rolling_window" )
11
10
class PVPowerRollingWindowIterDataPipe (IterDataPipe ):
11
+ """Compute rolling mean of PV power."""
12
12
def __init__ (
13
13
self ,
14
- source_dp : IterDataPipe ,
14
+ source_datapipe : IterDataPipe ,
15
15
window : Union [int , pd .tseries .offsets .DateOffset , pd .core .indexers .objects .BaseIndexer ] = 3 ,
16
16
min_periods : Optional [int ] = 2 ,
17
17
center : bool = True ,
18
18
win_type : Optional [str ] = None ,
19
19
expect_dataset : bool = True ,
20
20
):
21
- self .source_dp = source_dp
21
+ """
22
+ Compute the rolling mean of PV power data
23
+
24
+ Args:
25
+ source_datapipe: Datapipe emitting PV Xarray object
26
+
27
+ window: Size of the moving window.
28
+ If an integer, the fixed number of observations used for each window.
29
+
30
+ If an offset, the time period of each window. Each window will be variable-sized
31
+ based on the observations included in the time-period. This is only valid for
32
+ datetimelike indexes. To learn more about the offsets & frequency strings, please see:
33
+ https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases
34
+
35
+ If a BaseIndexer subclass, the window boundaries based on the defined
36
+ `get_window_bounds` method. Additional rolling keyword arguments,
37
+ namely `min_periods` and `center` will be passed to `get_window_bounds`.
38
+
39
+ min_periods: Minimum number of observations in window required to have a value;
40
+ otherwise, result is `np.nan`.
41
+
42
+ To avoid NaNs at the start and end of the timeseries, this should be <= ceil(window/2).
43
+
44
+ For a window that is specified by an offset, `min_periods` will default to 1.
45
+
46
+ For a window that is specified by an integer, `min_periods` will default to the size of
47
+ the window.
48
+
49
+ center: If False, set the window labels as the right edge of the window index.
50
+ If True, set the window labels as the center of the window index.
51
+
52
+ win_type: Window type
53
+ expect_dataset: Whether to expect a dataset or DataArray
54
+ """
55
+ self .source_datapipe = source_datapipe
22
56
self .window = window
23
57
self .min_periods = min_periods
24
58
self .center = center
25
59
self .win_type = win_type
26
60
self .expect_dataset = expect_dataset
27
61
28
- def __iter__ (self ):
29
- for xr_data in self .source_dp :
62
+ def __iter__ (self ) -> Union [xr .DataArray , xr .Dataset ]:
63
+ """Compute rolling mean of PV power"""
64
+ for xr_data in self .source_datapipe :
30
65
if self .expect_dataset :
31
66
data_to_resample = xr_data ["power_w" ]
32
67
else :
@@ -47,22 +82,3 @@ def __iter__(self):
47
82
resampled .attrs [attr_name ] = xr_data .attrs [attr_name ]
48
83
49
84
yield resampled
50
-
51
-
52
- def set_new_sample_period_and_t0_idx_attrs (xr_data , new_sample_period ) -> xr .DataArray :
53
- orig_sample_period = xr_data .attrs ["sample_period_duration" ]
54
- orig_t0_idx = xr_data .attrs ["t0_idx" ]
55
- new_sample_period = pd .Timedelta (new_sample_period )
56
- assert new_sample_period >= orig_sample_period
57
- new_t0_idx = orig_t0_idx / (new_sample_period / orig_sample_period )
58
- np .testing .assert_almost_equal (
59
- int (new_t0_idx ),
60
- new_t0_idx ,
61
- err_msg = (
62
- "The original t0_idx must be exactly divisible by"
63
- " (new_sample_period / orig_sample_period)"
64
- ),
65
- )
66
- xr_data .attrs ["sample_period_duration" ] = new_sample_period
67
- xr_data .attrs ["t0_idx" ] = int (new_t0_idx )
68
- return xr_data
0 commit comments