Skip to content
This repository was archived by the owner on Jun 2, 2025. It is now read-only.

Commit a63f097

Browse files
authored
Add Power Perceiver Production DataPipes
#minor
2 parents 512fde1 + 0e7e9d0 commit a63f097

File tree

496 files changed

+5483
-103
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

496 files changed

+5483
-103
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ __pycache__/
66
# C extensions
77
*.so
88

9+
.idea/*
10+
.idea
11+
12+
913
# Distribution / packaging
1014
.Python
1115
build/

README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,27 @@
11
# ocf_datapipes
22
OCF's DataPipe based dataloader for training and inference
3+
4+
5+
## Adding a new DataPipe
6+
A general outline for a new DataPipe should go something
7+
like this:
8+
9+
```python
10+
from torchdata.datapipes.iter import IterDataPipe
11+
from torchdata.datapipes import functional_datapipe
12+
13+
@functional_datapipe("<pipelet_name>")
14+
class <PipeletName>IterDataPipe(IterDataPipe):
15+
def __init__(self):
16+
pass
17+
18+
def __iter__(self):
19+
pass
20+
```
21+
22+
### Experimental DataPipes
23+
24+
For new datapipes being developed for new models or input modalities, to somewhat separate the more experimental and in
25+
development datapipes from the ones better tested for production purposes, there is an `ocf_datapipes.experimental` namespace for
26+
developing these more research-y datapipes. These datapipes might not be, and probably are not, tested.
27+
Once the model(s) using them are in production, they should be upgraded to one of the other namespaces and have tests added.

environment.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,6 @@ dependencies:
1515
pip:
1616
- einops
1717
- pathy
18+
- git+https://github.com/SheffieldSolar/PV_Live-API
19+
- pyaml_env
20+
- nowcasting_datamodel

ocf_datapipes/batch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .merge_numpy_examples_to_batch import (
2+
MergeNumpyExamplesToBatchIterDataPipe as MergeNumpyExamplesToBatch,
3+
)
4+
from .merge_numpy_modalities import MergeNumpyModalitiesIterDataPipe as MergeNumpyModalities
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from torchdata.datapipes import functional_datapipe
2+
from torchdata.datapipes.iter import IterDataPipe
3+
4+
from ocf_datapipes.utils.consts import NumpyBatch
5+
from ocf_datapipes.utils.utils import stack_np_examples_into_batch
6+
7+
8+
@functional_datapipe("merge_numpy_examples_to_batch")
class MergeNumpyExamplesToBatchIterDataPipe(IterDataPipe):
    """Collect consecutive examples from a source datapipe and stack them into batches."""

    def __init__(self, source_datapipe: IterDataPipe, n_examples_per_batch: int):
        """
        Store the source datapipe and the batch size.

        Args:
            source_datapipe: Datapipe whose items are gathered into batches.
            n_examples_per_batch: Number of items to gather before a batch is emitted.
        """
        self.n_examples_per_batch = n_examples_per_batch
        self.source_datapipe = source_datapipe

    def __iter__(self) -> NumpyBatch:
        """
        Yield one stacked batch per ``n_examples_per_batch`` items of the source.

        Items left over when the source is exhausted (fewer than
        ``n_examples_per_batch``) are not yielded.
        """
        pending = []
        for item in self.source_datapipe:
            pending.append(item)
            if len(pending) != self.n_examples_per_batch:
                # Not enough examples gathered yet; keep accumulating.
                continue
            yield stack_np_examples_into_batch(pending)
            # Start a fresh list so the one just handed over is not mutated.
            pending = []
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from torchdata.datapipes import functional_datapipe
2+
from torchdata.datapipes.iter import IterDataPipe, Zipper
3+
4+
from ocf_datapipes.utils.consts import BatchKey, NumpyBatch
5+
6+
7+
@functional_datapipe("merge_numpy_modalities")
class MergeNumpyModalitiesIterDataPipe(IterDataPipe):
    """Zip several datapipes together and merge their dict items into one example."""

    # NOTE(review): ``[IterDataPipe]`` is a list literal rather than a typing
    # construct; it is kept as-is here so the signature stays unchanged.
    def __init__(self, source_datapipes: [IterDataPipe]):
        """
        Store the datapipes to merge.

        Args:
            source_datapipes: Datapipes that are unpacked into ``Zipper`` on iteration.
        """
        self.source_datapipes = source_datapipes

    def __iter__(self) -> NumpyBatch:
        """
        Yield one merged example per tuple produced by ``Zipper``.

        Items are merged in order with ``dict.update``, so when two items share a
        key the value from the later datapipe wins.
        """
        zipped = Zipper(*self.source_datapipes)
        for parts in zipped:
            merged: NumpyBatch = {}
            for part in parts:
                merged.update(part)
            yield merged

ocf_datapipes/config/model.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,12 @@
2222
from nowcasting_datamodel.models.pv import providers, pv_output, solar_sheffield_passiv
2323

2424
# nowcasting_dataset imports
25-
from nowcasting_dataset.consts import (
25+
from ocf_datapipes.utils.consts import (
2626
DEFAULT_N_GSP_PER_EXAMPLE,
2727
DEFAULT_N_PV_SYSTEMS_PER_EXAMPLE,
2828
NWP_VARIABLE_NAMES,
2929
SAT_VARIABLE_NAMES,
3030
)
31-
from nowcasting_dataset.dataset.split import split
32-
from pathy import Pathy
33-
from pydantic import BaseModel, Field, root_validator, validator
3431

3532
IMAGE_SIZE_PIXELS = 64
3633
IMAGE_SIZE_PIXELS_FIELD = Field(
@@ -622,14 +619,6 @@ class Process(Base):
622619
" return data at 11:30, 12:00, 12:30, and 13:00."
623620
),
624621
)
625-
split_method: split.SplitMethod = Field(
626-
split.SplitMethod.DAY_RANDOM_TEST_DATE,
627-
description=(
628-
"The method used to split the t0 datetimes into train, validation and test sets."
629-
" If the split method produces no t0 datetimes for any split_name, then"
630-
" n_<split_name>_batches must also be set to 0."
631-
),
632-
)
633622

634623
train_test_validation_split: List[int] = Field(
635624
[10, 1, 1],

ocf_datapipes/convert/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .gsp import ConvertGSPToNumpyBatchIterDataPipe as ConvertGSPToNumpyBatch
2+
from .nwp import ConvertNWPToNumpyBatchIterDataPipe as ConvertNWPToNumpyBatch
3+
from .pv import ConvertPVToNumpyBatchIterDataPipe as ConvertPVToNumpyBatch
4+
from .satellite import ConvertSatelliteToNumpyBatchIterDataPipe as ConvertSatelliteToNumpyBatch

ocf_datapipes/convert/gsp.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from torchdata.datapipes import functional_datapipe
2+
from torchdata.datapipes.iter import IterDataPipe
3+
4+
from ocf_datapipes.utils.consts import BatchKey, NumpyBatch
5+
from ocf_datapipes.utils.utils import datetime64_to_float
6+
7+
8+
@functional_datapipe("convert_gsp_to_numpy_batch")
class ConvertGSPToNumpyBatchIterDataPipe(IterDataPipe):
    """Turn each GSP xarray item of the source datapipe into a NumpyBatch dict."""

    def __init__(self, source_dp: IterDataPipe):
        """
        Store the source datapipe.

        Args:
            source_dp: Datapipe yielding GSP data (presumably ``xr.DataArray``
                objects; they must expose ``values``, ``attrs`` and ``isel``).
        """
        super().__init__()
        self.source_dp = source_dp

    def __iter__(self) -> NumpyBatch:
        """
        Yield one NumpyBatch per source item.

        Each item must carry a ``t0_idx`` entry in ``attrs`` and the
        ``gsp_id``, ``capacity_mwp``, ``time_utc``, ``y_osgb`` and ``x_osgb``
        variables/coordinates.
        """
        for gsp in self.source_dp:
            batch: NumpyBatch = {
                BatchKey.gsp: gsp.values,
                BatchKey.gsp_t0_idx: gsp.attrs["t0_idx"],
                BatchKey.gsp_id: gsp.gsp_id.values,
                # Capacity is read from the first time step only.
                BatchKey.gsp_capacity_mwp: gsp.isel(time_utc=0)["capacity_mwp"].values,
                BatchKey.gsp_time_utc: datetime64_to_float(gsp["time_utc"].values),
                # Coordinates are passed through unchanged (no extra dimension is added).
                BatchKey.gsp_y_osgb: gsp["y_osgb"].values,
                BatchKey.gsp_x_osgb: gsp["x_osgb"].values,
            }
            yield batch

ocf_datapipes/convert/nwp.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import numpy as np
2+
from torchdata.datapipes import functional_datapipe
3+
from torchdata.datapipes.iter import IterDataPipe
4+
5+
from ocf_datapipes.utils.consts import BatchKey, NumpyBatch
6+
from ocf_datapipes.utils.utils import datetime64_to_float
7+
8+
9+
@functional_datapipe("convert_nwp_to_numpy_batch")
class ConvertNWPToNumpyBatchIterDataPipe(IterDataPipe):
    """Turn each NWP xarray item of the source datapipe into a NumpyBatch dict."""

    def __init__(self, source_dp: IterDataPipe):
        """
        Store the source datapipe.

        Args:
            source_dp: Datapipe yielding NWP data (presumably ``xr.DataArray``
                objects; they must expose ``values`` and ``attrs``).
        """
        super().__init__()
        self.source_dp = source_dp

    def __iter__(self) -> NumpyBatch:
        """
        Yield one NumpyBatch per source item.

        Each item must carry a ``t0_idx`` entry in ``attrs`` and the
        ``target_time_utc``, ``channel``, ``step``, ``init_time_utc``,
        ``y_osgb`` and ``x_osgb`` variables/coordinates.
        """
        one_hour = np.timedelta64(1, "h")
        for nwp in self.source_dp:
            batch: NumpyBatch = {
                BatchKey.nwp: nwp.values,
                BatchKey.nwp_t0_idx: nwp.attrs["t0_idx"],
                BatchKey.nwp_target_time_utc: datetime64_to_float(nwp.target_time_utc.values),
                BatchKey.nwp_channel_names: nwp.channel.values,
                # Forecast step expressed as an int64 number of hours.
                BatchKey.nwp_step: (nwp.step.values / one_hour).astype(np.int64),
                BatchKey.nwp_init_time_utc: datetime64_to_float(nwp.init_time_utc.values),
                BatchKey.nwp_y_osgb: nwp["y_osgb"].values,
                BatchKey.nwp_x_osgb: nwp["x_osgb"].values,
            }
            yield batch

ocf_datapipes/convert/pv.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import numpy as np
2+
from torchdata.datapipes import functional_datapipe
3+
from torchdata.datapipes.iter import IterDataPipe
4+
5+
from ocf_datapipes.utils.consts import BatchKey, NumpyBatch
6+
from ocf_datapipes.utils.utils import datetime64_to_float
7+
8+
9+
@functional_datapipe("convert_pv_to_numpy_batch")
class ConvertPVToNumpyBatchIterDataPipe(IterDataPipe):
    """Turn each PV xarray item of the source datapipe into a NumpyBatch dict."""

    def __init__(self, source_dp: IterDataPipe):
        """
        Store the source datapipe.

        Args:
            source_dp: Datapipe yielding PV data (presumably ``xr.DataArray``
                objects; they must expose ``values`` and ``attrs``).
        """
        super().__init__()
        self.source_dp = source_dp

    def __iter__(self) -> NumpyBatch:
        """
        Yield one NumpyBatch per source item.

        Each item must carry a ``t0_idx`` entry in ``attrs`` and the
        ``pv_system_row_number``, ``pv_system_id``, ``capacity_wp``,
        ``time_utc``, ``x_osgb`` and ``y_osgb`` variables/coordinates.
        """
        for pv in self.source_dp:
            batch: NumpyBatch = {}
            batch[BatchKey.pv] = pv.values
            batch[BatchKey.pv_t0_idx] = pv.attrs["t0_idx"]
            batch[BatchKey.pv_system_row_number] = pv["pv_system_row_number"].values
            # System IDs are stored as float32 in the batch.
            batch[BatchKey.pv_id] = pv["pv_system_id"].values.astype(np.float32)
            batch[BatchKey.pv_capacity_wp] = pv["capacity_wp"].values
            batch[BatchKey.pv_time_utc] = datetime64_to_float(pv["time_utc"].values)
            batch[BatchKey.pv_x_osgb] = pv["x_osgb"].values
            batch[BatchKey.pv_y_osgb] = pv["y_osgb"].values
            yield batch

ocf_datapipes/convert/satellite.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from torchdata.datapipes import functional_datapipe
2+
from torchdata.datapipes.iter import IterDataPipe
3+
4+
from ocf_datapipes.utils.consts import BatchKey, NumpyBatch
5+
from ocf_datapipes.utils.utils import datetime64_to_float
6+
7+
8+
@functional_datapipe("convert_satellite_to_numpy_batch")
class ConvertSatelliteToNumpyBatchIterDataPipe(IterDataPipe):
    """Turn each satellite xarray item of the source datapipe into a NumpyBatch dict."""

    def __init__(self, source_dp: IterDataPipe, is_hrv: bool = False):
        """
        Store the source datapipe and which family of batch keys to use.

        Args:
            source_dp: Datapipe yielding satellite data (presumably
                ``xr.DataArray`` objects; they must expose ``values`` and ``attrs``).
            is_hrv: If True the ``hrvsatellite_*`` batch keys are used,
                otherwise the ``satellite_*`` keys.
        """
        super().__init__()
        self.source_dp = source_dp
        self.is_hrv = is_hrv

    def __iter__(self) -> NumpyBatch:
        """
        Yield one NumpyBatch per source item.

        Each item must carry a ``t0_idx`` entry in ``attrs`` and the
        ``time_utc``, ``y_osgb``, ``x_osgb``, ``y_geostationary`` and
        ``x_geostationary`` coordinates. Coordinate values are copied as-is.
        """
        for sat in self.source_dp:
            # Both modalities share the same layout; only the key family differs.
            # Only the keys of the selected family are looked up on BatchKey.
            if self.is_hrv:
                actual_key = BatchKey.hrvsatellite_actual
                t0_idx_key = BatchKey.hrvsatellite_t0_idx
                time_key = BatchKey.hrvsatellite_time_utc
                coord_keys = (
                    (BatchKey.hrvsatellite_y_osgb, "y_osgb"),
                    (BatchKey.hrvsatellite_x_osgb, "x_osgb"),
                    (BatchKey.hrvsatellite_y_geostationary, "y_geostationary"),
                    (BatchKey.hrvsatellite_x_geostationary, "x_geostationary"),
                )
            else:
                actual_key = BatchKey.satellite_actual
                t0_idx_key = BatchKey.satellite_t0_idx
                time_key = BatchKey.satellite_time_utc
                coord_keys = (
                    (BatchKey.satellite_y_osgb, "y_osgb"),
                    (BatchKey.satellite_x_osgb, "x_osgb"),
                    (BatchKey.satellite_y_geostationary, "y_geostationary"),
                    (BatchKey.satellite_x_geostationary, "x_geostationary"),
                )

            batch: NumpyBatch = {
                actual_key: sat.values,
                t0_idx_key: sat.attrs["t0_idx"],
                time_key: datetime64_to_float(sat["time_utc"].values),
            }
            for batch_key, coord_name in coord_keys:
                batch[batch_key] = sat[coord_name].values

            yield batch
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
"""Collection of experimental datapipes, explicitly not ready for production,
2+
but can be upgraded later, these might not be tested, and are to make quick experiments easier"""

ocf_datapipes/load/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
# from .configuration import OpenConfigurationIterDataPipe as OpenConfiguration
12
from .gsp import OpenGSPIterDataPipe as OpenGSP
23
from .nwp import OpenNWPIterDataPipe as OpenNWP
34
from .pv import OpenPVFromDBIterDataPipe as OpenPVFromDB
45
from .pv import OpenPVFromNetCDFIterDataPipe as OpenPVFromNetCDF
5-
from .satellite import OpenSatelliteDataPipe as OpenSatellite
6+
from .satellite import OpenSatelliteIterDataPipe as OpenSatellite
67

78
try:
89
import rioxarray # Rioxarray is sometimes a pain to install, so only load this if its installed

ocf_datapipes/load/configuration.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from torchdata.datapipes import functional_datapipe
55
from torchdata.datapipes.iter import IterDataPipe
66

7+
from ocf_datapipes.config.model import Configuration
8+
79

810
@functional_datapipe("open_config")
911
class OpenConfigurationIterDataPipe(IterDataPipe):
@@ -14,7 +16,7 @@ def __iter__(self):
1416
with fsspec.open(self.configuration_filename, mode="r") as stream:
1517
configuration = parse_config(data=stream)
1618

17-
# TODO Load into Pydantic Configuration class
19+
configuration = Configuration(**configuration)
1820

1921
while True:
2022
yield configuration

ocf_datapipes/load/gsp.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import datetime
2+
from pathlib import Path
3+
from typing import Optional, Union
24

35
import geopandas as gpd
46
import numpy as np
@@ -7,20 +9,38 @@
79
from torchdata.datapipes import functional_datapipe
810
from torchdata.datapipes.iter import IterDataPipe
911

12+
from ocf_datapipes.utils.eso import get_gsp_metadata_from_eso, get_gsp_shape_from_eso
13+
14+
try:
15+
from ocf_datapipes.utils.eso import get_gsp_metadata_from_eso, get_gsp_shape_from_eso
16+
17+
_has_pvlive = True
18+
except ImportError:
19+
print("Unable to import PVLive utils, please provide filenames with OpenGSP")
20+
_has_pvlive = False
21+
1022

1123
@functional_datapipe("open_gsp")
1224
class OpenGSPIterDataPipe(IterDataPipe):
1325
def __init__(
    self,
    gsp_pv_power_zarr_path: Union[str, Path],
    gsp_id_to_region_id_filename: Optional[str] = None,
    sheffield_solar_region_path: Optional[str] = None,
    threshold_mw: int = 0,
    sample_period_duration: datetime.timedelta = datetime.timedelta(minutes=30),
):
    """
    Store the GSP data locations and loading options.

    Args:
        gsp_pv_power_zarr_path: Location of the GSP PV power Zarr store.
        gsp_id_to_region_id_filename: GSP metadata filename. If this or
            ``sheffield_solar_region_path`` is None and the ESO helpers could
            be imported, both values are obtained from
            ``get_gsp_metadata_from_eso()`` / ``get_gsp_shape_from_eso()``.
        sheffield_solar_region_path: GSP region shape filename (see above).
        threshold_mw: Stored on the instance as ``threshold_mw``.
        sample_period_duration: Stored on the instance as ``sample_period_duration``.
    """
    self.gsp_pv_power_zarr_path = gsp_pv_power_zarr_path

    # Parenthesised on purpose: `a or b and c` parses as `a or (b and c)`,
    # which would call the ESO helpers even when they were not importable.
    filename_missing = (
        gsp_id_to_region_id_filename is None or sheffield_solar_region_path is None
    )
    if filename_missing and _has_pvlive:
        self.gsp_id_to_region_id_filename = get_gsp_metadata_from_eso()
        self.sheffield_solar_region_path = get_gsp_shape_from_eso()
    else:
        # Either both filenames were supplied, or the ESO helpers are
        # unavailable; keep whatever the caller passed in.
        self.gsp_id_to_region_id_filename = gsp_id_to_region_id_filename
        self.sheffield_solar_region_path = sheffield_solar_region_path

    self.threshold_mw = threshold_mw
    self.sample_period_duration = sample_period_duration
2646

@@ -33,6 +53,12 @@ def __iter__(self):
3353
# Load GSP generation xr.Dataset:
3454
gsp_pv_power_mw_ds = xr.open_dataset(self.gsp_pv_power_zarr_path, engine="zarr")
3555

56+
# Have to remove ID 0 (National one) for rest to work
57+
# TODO Do filtering later, deal with national here for now
58+
gsp_pv_power_mw_ds = gsp_pv_power_mw_ds.isel(
59+
gsp_id=slice(1, len(gsp_pv_power_mw_ds.gsp_id))
60+
)
61+
3662
# Ensure the centroids have the same GSP ID index as the GSP PV power:
3763
gsp_id_to_shape = gsp_id_to_shape.loc[gsp_pv_power_mw_ds.gsp_id]
3864

@@ -44,7 +70,6 @@ def __iter__(self):
4470
x_osgb=gsp_id_to_shape.geometry.centroid.x.astype(np.float32),
4571
y_osgb=gsp_id_to_shape.geometry.centroid.y.astype(np.float32),
4672
capacity_mwp=gsp_pv_power_mw_ds.installedcapacity_mwp.data.astype(np.float32),
47-
t0_idx=self.t0_idx,
4873
)
4974

5075
del gsp_id_to_shape, gsp_pv_power_mw_ds
@@ -86,7 +111,6 @@ def _put_gsp_data_into_an_xr_dataarray(
86111
x_osgb: np.ndarray,
87112
y_osgb: np.ndarray,
88113
capacity_mwp: np.ndarray,
89-
t0_idx: int,
90114
) -> xr.DataArray:
91115
# Convert to xr.DataArray:
92116
data_array = xr.DataArray(
@@ -99,5 +123,4 @@ def _put_gsp_data_into_an_xr_dataarray(
99123
y_osgb=("gsp_id", y_osgb),
100124
capacity_mwp=(("time_utc", "gsp_id"), capacity_mwp),
101125
)
102-
data_array.attrs["t0_idx"] = t0_idx
103126
return data_array

0 commit comments

Comments
 (0)