Skip to content
This repository was archived by the owner on Jun 2, 2025. It is now read-only.

Add more tests/linting updates #40

Merged
merged 21 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7dc61ee
Add normalization tests
jacobbieker Sep 1, 2022
a1118b1
Add docstrings and NWP target time test
jacobbieker Sep 1, 2022
c78e753
Rename
jacobbieker Sep 1, 2022
dcbe63c
Update Downsample to handle other dim names
jacobbieker Sep 1, 2022
7d36094
Update PVRollingPowerWindow and docstrings
jacobbieker Sep 1, 2022
e227a41
Update SelectLiveT0Time docstring and typing and test
jacobbieker Sep 1, 2022
2f339e7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2022
ae7705e
Update docstring LiveTimeSlice
jacobbieker Sep 1, 2022
ed22fa2
Update LocationPicker docstring
jacobbieker Sep 1, 2022
78eb766
Add docstrings for converters and better tests
jacobbieker Sep 1, 2022
bdbfd30
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2022
ca22a68
Add tests for NaNs
jacobbieker Sep 1, 2022
161526b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2022
8e680fc
Docstring equalities
jacobbieker Sep 1, 2022
0016bd5
Docstring vars and dims
jacobbieker Sep 1, 2022
85f6876
Add todos and change normalizaton tests
jacobbieker Sep 1, 2022
d8b45b9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2022
50da11a
More docstring work
jacobbieker Sep 1, 2022
09f1b3d
Merge remote-tracking branch 'origin/jacob/more-tests' into jacob/mor…
jacobbieker Sep 1, 2022
05aab77
Add vars and dims tests
jacobbieker Sep 1, 2022
d9a6295
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/workflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ jobs:
pytest_cov_dir: "ocf_datapipes"
# extra things to install
sudo_apt_install: "libgeos++-dev libproj-dev proj-data proj-bin"
# brew_install: "proj geos librttopo"
os_list: '["ubuntu-latest"]'
# brew_install: "proj geos librttopo"
os_list: '["ubuntu-latest"]'
1 change: 1 addition & 0 deletions ocf_datapipes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Datapipes"""
1 change: 1 addition & 0 deletions ocf_datapipes/batch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Datapipes for batching together data"""
from .merge_numpy_examples_to_batch import (
MergeNumpyExamplesToBatchIterDataPipe as MergeNumpyExamplesToBatch,
)
Expand Down
1 change: 1 addition & 0 deletions ocf_datapipes/config/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Configuration model"""
1 change: 1 addition & 0 deletions ocf_datapipes/convert/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Various conversion datapipes"""
from .coordinates import ConvertGeostationaryToLatLonIterDataPipe as ConvertGeostationaryToLatLon
from .coordinates import ConvertLatLonToOSGBIterDataPipe as ConvertLatLonToOSGB
from .coordinates import ConvertOSGBToLatLonIterDataPipe as ConvertOSGBToLatLon
Expand Down
36 changes: 33 additions & 3 deletions ocf_datapipes/convert/coordinates.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from typing import Union

import xarray as xr
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe

Expand All @@ -10,10 +13,19 @@

@functional_datapipe("convert_latlon_to_osgb")
class ConvertLatLonToOSGBIterDataPipe(IterDataPipe):
"""Convert from Lat/Lon object to OSGB"""

def __init__(self, source_datapipe: IterDataPipe):
"""
Convert from Lat/Lon to OSGB

Args:
source_datapipe: Datapipe emitting Xarray objects with latitude and longitude data
"""
self.source_datapipe = source_datapipe

def __iter__(self):
def __iter__(self) -> Union[xr.DataArray, xr.Dataset]:
"""Convert from Lat/Lon to OSGB"""
for xr_data in self.source_datapipe:
xr_data["x_osgb"], xr_data["y_osgb"] = lat_lon_to_osgb(
latitude=xr_data["latitude"], longitude=xr_data["longitude"]
Expand All @@ -23,10 +35,19 @@ def __iter__(self):

@functional_datapipe("convert_osgb_to_latlon")
class ConvertOSGBToLatLonIterDataPipe(IterDataPipe):
"""Convert from OSGB to Lat/Lon"""

def __init__(self, source_datapipe: IterDataPipe):
"""
Convert from OSGB to Lat/Lon

Args:
source_datapipe: Datapipe emitting Xarray objects with OSGB data
"""
self.source_datapipe = source_datapipe

def __iter__(self):
def __iter__(self) -> Union[xr.DataArray, xr.Dataset]:
"""Convert and add lat/lon to Xarray object"""
for xr_data in self.source_datapipe:
xr_data["latitude"], xr_data["longitude"] = osgb_to_lat_lon(
x=xr_data["x_osgb"], y=xr_data["y_osgb"]
Expand All @@ -36,10 +57,19 @@ def __iter__(self):

@functional_datapipe("convert_geostationary_to_latlon")
class ConvertGeostationaryToLatLonIterDataPipe(IterDataPipe):
"""Convert from geostationary to Lat/Lon points"""

def __init__(self, source_datapipe: IterDataPipe):
"""
Convert from Geostationary to Lat/Lon points and add to Xarray object

Args:
source_datapipe: Datapipe emitting Xarray object with geostationary points
"""
self.source_datapipe = source_datapipe

def __iter__(self):
def __iter__(self) -> Union[xr.DataArray, xr.Dataset]:
"""Convert from geostationary to Lat/Lon and yield the Xarray object"""
for xr_data in self.source_datapipe:
transform = load_geostationary_area_definition_and_transform_latlon(xr_data)
xr_data["latitude"], xr_data["longitude"] = transform(
Expand Down
15 changes: 12 additions & 3 deletions ocf_datapipes/convert/gsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@

@functional_datapipe("convert_gsp_to_numpy_batch")
class ConvertGSPToNumpyBatchIterDataPipe(IterDataPipe):
def __init__(self, source_dp: IterDataPipe):
"""Convert GSP Xarray to NumpyBatch"""

def __init__(self, source_datapipe: IterDataPipe):
"""
Convert GSP Xarray to NumpyBatch object

Args:
source_datapipe: Datapipe emitting GSP Xarray object
"""
super().__init__()
self.source_dp = source_dp
self.source_datapipe = source_datapipe

def __iter__(self) -> NumpyBatch:
for xr_data in self.source_dp:
"""Convert from Xarray to NumpyBatch"""
for xr_data in self.source_datapipe:
example: NumpyBatch = {
BatchKey.gsp: xr_data.values,
BatchKey.gsp_t0_idx: xr_data.attrs["t0_idx"],
Expand Down
15 changes: 12 additions & 3 deletions ocf_datapipes/convert/nwp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@

@functional_datapipe("convert_nwp_to_numpy_batch")
class ConvertNWPToNumpyBatchIterDataPipe(IterDataPipe):
def __init__(self, source_dp: IterDataPipe):
"""Convert NWP Xarray objects to NumpyBatch ones"""

def __init__(self, source_datapipe: IterDataPipe):
"""
Convert NWP Xarray objecs to NumpyBatch ones

Args:
source_datapipe: Datapipe emitting NWP Xarray objects
"""
super().__init__()
self.source_dp = source_dp
self.source_datapipe = source_datapipe

def __iter__(self) -> NumpyBatch:
for xr_data in self.source_dp:
"""Convert from Xarray to NumpyBatch"""
for xr_data in self.source_datapipe:
example: NumpyBatch = {
BatchKey.nwp: xr_data.values,
BatchKey.nwp_t0_idx: xr_data.attrs["t0_idx"],
Expand Down
15 changes: 12 additions & 3 deletions ocf_datapipes/convert/pv.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@

@functional_datapipe("convert_pv_to_numpy_batch")
class ConvertPVToNumpyBatchIterDataPipe(IterDataPipe):
def __init__(self, source_dp: IterDataPipe):
"""Convert PV Xarray to NumpyBatch"""

def __init__(self, source_datapipe: IterDataPipe):
"""
Convert PV Xarray objects to NumpyBatch objects

Args:
source_datapipe: Datapipe emitting PV Xarray objects
"""
super().__init__()
self.source_dp = source_dp
self.source_datapipe = source_datapipe

def __iter__(self) -> NumpyBatch:
for xr_data in self.source_dp:
"""Iterate and convert PV Xarray to NumpyBatch"""
for xr_data in self.source_datapipe:
example: NumpyBatch = {
BatchKey.pv: xr_data.values,
BatchKey.pv_t0_idx: xr_data.attrs["t0_idx"],
Expand Down
16 changes: 13 additions & 3 deletions ocf_datapipes/convert/satellite.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@

@functional_datapipe("convert_satellite_to_numpy_batch")
class ConvertSatelliteToNumpyBatchIterDataPipe(IterDataPipe):
def __init__(self, source_dp: IterDataPipe, is_hrv: bool = False):
"""Converts Xarray Satellite to NumpyBatch object"""

def __init__(self, source_datapipe: IterDataPipe, is_hrv: bool = False):
"""
Converts Xarray satellite object to NumpyBatch object

Args:
source_datapipe: Datapipe emitting Xarray satellite objects
is_hrv: Whether this is HRV satellite data or non-HRV data
"""
super().__init__()
self.source_dp = source_dp
self.source_datapipe = source_datapipe
self.is_hrv = is_hrv

def __iter__(self) -> NumpyBatch:
for xr_data in self.source_dp:
"""Convert each example to a NumpyBatch object"""
for xr_data in self.source_datapipe:
if self.is_hrv:
example: NumpyBatch = {
BatchKey.hrvsatellite_actual: xr_data.values,
Expand Down
1 change: 1 addition & 0 deletions ocf_datapipes/fake/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Fake data generators for testing"""
1 change: 1 addition & 0 deletions ocf_datapipes/load/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Loading datapipes from the raw data"""
from .configuration import OpenConfigurationIterDataPipe as OpenConfiguration
from .gsp import OpenGSPIterDataPipe as OpenGSP
from .nwp import OpenNWPIterDataPipe as OpenNWP
Expand Down
7 changes: 4 additions & 3 deletions ocf_datapipes/load/configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

import fsspec
from pathy import Pathy
import logging
from pyaml_env import parse_config
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe
Expand All @@ -16,11 +17,11 @@ def __init__(self, configuration_filename: str):
self.configuration_filename = configuration_filename

def __iter__(self):
logger.debug(f'Going to open {self.configuration_filename}')
logger.debug(f"Going to open {self.configuration_filename}")
with fsspec.open(self.configuration_filename, mode="r") as stream:
configuration = parse_config(data=stream)

logger.debug(f'Converting to Configuration ({configuration})')
logger.debug(f"Converting to Configuration ({configuration})")
configuration = Configuration(**configuration)

while True:
Expand Down
6 changes: 3 additions & 3 deletions ocf_datapipes/load/pv.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ def _load_pv_metadata(filename: str) -> pd.DataFrame:
latitude, longitude, system_id, x_osgb, y_osgb
"""
_log.info(f"Loading PV metadata from {filename}")
if 'passiv' in str(filename):
index_col = 'ss_id'
if "passiv" in str(filename):
index_col = "ss_id"
else:
index_col = 'system_id'
index_col = "system_id"
pv_metadata = pd.read_csv(filename, index_col=index_col)

if "Unnamed: 0" in pv_metadata.columns:
Expand Down
18 changes: 15 additions & 3 deletions ocf_datapipes/load/satellite.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,24 @@ def open_sat_data(
# Note that `rename` renames *both* the coordinates and dimensions, and keeps
# the connection between the dims and coordinates, so we don't have to manually
# use `data_array.set_index()`.
dataset = dataset.rename({"time": "time_utc",})
dataset = dataset.rename(
{
"time": "time_utc",
}
)
if "y" in dataset.coords.keys():
dataset = dataset.rename({"y": "y_geostationary",})
dataset = dataset.rename(
{
"y": "y_geostationary",
}
)

if "x" in dataset.coords.keys():
dataset = dataset.rename({"x": "x_geostationary",})
dataset = dataset.rename(
{
"x": "x_geostationary",
}
)

# Flip coordinates to top-left first
if dataset.y_geostationary[0] < dataset.y_geostationary[-1]:
Expand Down
1 change: 1 addition & 0 deletions ocf_datapipes/production/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Wrappers to make complete data pipelines for production systems"""
1 change: 1 addition & 0 deletions ocf_datapipes/select/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Selection datapipes"""
from .location_picker import LocationPickerIterDataPipe as LocationPicker
from .offset_t0 import OffsetT0IterDataPipe as OffsetT0
from .select_live_t0_time import SelectLiveT0TimeIterDataPipe as SelectLiveT0Time
Expand Down
16 changes: 13 additions & 3 deletions ocf_datapipes/select/location_picker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@

@functional_datapipe("location_picker")
class LocationPickerIterDataPipe(IterDataPipe):
def __init__(self, source_dp: IterDataPipe, return_all_locations: bool = False):
"""Picks locations from a dataset and returns them"""

def __init__(self, source_datapipe: IterDataPipe, return_all_locations: bool = False):
"""
Picks locations from a dataset and returns them

Args:
source_datapipe: Datapipe emitting Xarray Dataset
return_all_locations: Whether to return all locations, if True, also returns them in order
"""
super().__init__()
self.source_dp = source_dp
self.source_datapipe = source_datapipe
self.return_all_locations = return_all_locations

def __iter__(self) -> Location:
for xr_dataset in self.source_dp:
"""Returns locations from the inputs datapipe"""
for xr_dataset in self.source_datapipe:
if self.return_all_locations:
# Iterate through all locations in dataset
for location_idx in range(len(xr_dataset["x_osgb"])):
Expand Down
10 changes: 9 additions & 1 deletion ocf_datapipes/select/select_live_t0_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ class SelectLiveT0TimeIterDataPipe(IterDataPipe):
"""Select the history for the live data"""

def __init__(self, source_datapipe: IterDataPipe, dim_name: str = "time_utc"):
"""
Select history for the Xarray object

Args:
source_datapipe: Datapipe emitting Xarray objects
dim_name: The time dimension name to use
"""
self.source_datapipe = source_datapipe
self.dim_name = dim_name

def __iter__(self):
def __iter__(self) -> pd.Timestamp:
"""Get the latest timestamp and return it"""
for xr_data in self.source_datapipe:
# Get most recent time in data
# Select the history that goes back that far
Expand Down
10 changes: 10 additions & 0 deletions ocf_datapipes/select/select_live_time_slice.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@ def __init__(
history_duration: timedelta,
dim_name: str = "time_utc",
):
"""
Select the history for the live time slice

Args:
source_datapipe: Datapipe emitting Xarray object
t0_datapipe: Datapipe emitting t0 timestamps
history_duration: Amount of time for the history
dim_name: Time dimension name
"""
self.source_datapipe = source_datapipe
self.t0_datapipe = t0_datapipe
self.history_duration = np.timedelta64(history_duration)
self.dim_name = dim_name

def __iter__(self):
"""Select the recent live data"""
for xr_data, t0 in Zipper(self.source_datapipe, self.t0_datapipe):
xr_data = xr_data.sel({self.dim_name: slice(t0 - self.history_duration, t0)})
yield xr_data
1 change: 1 addition & 0 deletions ocf_datapipes/transform/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Transforms for the data in both xarray and numpy formats"""
1 change: 1 addition & 0 deletions ocf_datapipes/transform/numpy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Numpy transforms"""
from .add_topographic_data import AddTopographicDataIterDataPipe as AddTopographicData
from .align_gsp_to_5_min import AlignGSPto5MinIterDataPipe as AlignGSPto5Min
from .encode_space_time import EncodeSpaceTimeIterDataPipe as EncodeSpaceTime
Expand Down
1 change: 1 addition & 0 deletions ocf_datapipes/transform/xarray/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Xarray transforms"""
from .add_t0idx_and_sample_period_duration import (
AddT0IdxAndSamplePeriodDurationIterDataPipe as AddT0IdxAndSamplePeriodDuration,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from typing import Union

import xarray as xr
Expand All @@ -7,13 +8,29 @@

@functional_datapipe("add_t0_idx_and_sample_period_duration")
class AddT0IdxAndSamplePeriodDurationIterDataPipe(IterDataPipe):
def __init__(self, source_datapipe: IterDataPipe, sample_period_duration, history_duration):
"""Add t0_idx and sample_period_duration attributes to datasets for downstream tasks"""

def __init__(
self,
source_datapipe: IterDataPipe,
sample_period_duration: timedelta,
history_duration: timedelta,
):
"""
Adds two attributes, t0_idx, and sample_period_duration for downstream datapipes to use

Args:
source_datapipe: Datapipe emitting a Xarray DataSet or DataArray
sample_period_duration: Time between samples
history_duration: Amount of history in each example
"""
self.source_datapipe = source_datapipe
self.sample_period_duration = sample_period_duration
self.history_duration = history_duration
self.t0_idx = int(self.history_duration / self.sample_period_duration)

def __iter__(self) -> Union[xr.DataArray, xr.Dataset]:
"""Adds the two attributes to the xarray objects and returns them"""
for xr_data in self.source_datapipe:
xr_data.attrs["t0_idx"] = self.t0_idx
xr_data.attrs["sample_period_duration"] = self.sample_period_duration
Expand Down
Loading