""" Functions to get GSP data from the live database """
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Iterator, List, Tuple

import numpy as np
import pandas as pd
import xarray as xr
from nowcasting_datamodel import N_GSP
from nowcasting_datamodel.connection import DatabaseConnection
from nowcasting_datamodel.models.base import Base_Forecast
from nowcasting_datamodel.models.gsp import GSPYield, GSPYieldSQL, Location
from nowcasting_datamodel.read.read_gsp import get_gsp_yield
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe

from ocf_datapipes.load.gsp.utils import put_gsp_data_into_an_xr_dataarray
from ocf_datapipes.utils.eso import get_gsp_shape_from_eso

logger = logging.getLogger(__name__)


@functional_datapipe("open_gsp_from_database")
class OpenGSPFromDatabaseIterDataPipe(IterDataPipe):
    """Get and open the GSP data"""

    def __init__(
        self,
        history_minutes: int = 90,
        interpolate_minutes: int = 60,
        load_extra_minutes: int = 60,
    ):
        """
        Get and open the GSP data

        Args:
            history_minutes: How many history minutes to use
            interpolate_minutes: How many minutes to interpolate
            load_extra_minutes: How many extra minutes to load
        """

        super().__init__()
        self.interpolate_minutes = interpolate_minutes
        self.load_extra_minutes = load_extra_minutes
        self.history_duration = timedelta(minutes=history_minutes)

    def __iter__(self) -> Iterator[xr.DataArray]:
        """Get and return GSP data"""

        logger.debug("Getting GSP data")

        gsp_pv_power_mw_df, gsp_capacity = get_gsp_power_from_database(
            history_duration=self.history_duration,
            interpolate_minutes=self.interpolate_minutes,
            load_extra_minutes=self.load_extra_minutes,
        )

        # get shape file
        gsp_id_to_shape = get_gsp_shape_from_eso(return_filename=False)

        # Ensure the centroids have the same GSP ID index as the GSP PV power:
        gsp_id_to_shape = gsp_id_to_shape.loc[gsp_pv_power_mw_df.columns]

        data_array = put_gsp_data_into_an_xr_dataarray(
            gsp_pv_power_mw=gsp_pv_power_mw_df.astype(np.float32),
            time_utc=gsp_pv_power_mw_df.index.values,
            gsp_id=gsp_pv_power_mw_df.columns,
            # TODO: Try using `gsp_id_to_shape.geometry.envelope.centroid`. See issue #76.
            x_osgb=gsp_id_to_shape.geometry.centroid.x.astype(np.float32),
            y_osgb=gsp_id_to_shape.geometry.centroid.y.astype(np.float32),
            capacity_megawatt_power=gsp_capacity.astype(np.float32),
        )

        del gsp_id_to_shape, gsp_pv_power_mw_df
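        # Yield the same DataArray forever, so downstream datapipes can keep
        # pulling this snapshot without the pipe ever being exhausted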
        while True:
            yield data_array


def get_gsp_power_from_database(
    history_duration: timedelta, interpolate_minutes: int, load_extra_minutes: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
| 82 | + """ |
| 83 | + Get gsp power from database |
| 84 | +
|
| 85 | + Args: |
| 86 | + history_duration: a timedelta of how many minutes to load in the past |
| 87 | + interpolate_minutes: how many minutes we should interpolate the data froward for |
| 88 | + load_extra_minutes: the extra minutes we should load, in order to load more data. |
| 89 | + This is because some data from a site lags significantly behind 'now' |
| 90 | +
|
| 91 | + Returns:pandas data frame with the following columns pv systems indexes |
| 92 | + The index is the datetime |
| 93 | +
|
| 94 | + """ |

    logger.info("Loading GSP data from database")
    logger.debug(f"{history_duration=}")
    logger.debug(f"{interpolate_minutes=}")
    logger.debug(f"{load_extra_minutes=}")

    extra_duration = timedelta(minutes=load_extra_minutes)
    now = pd.to_datetime(datetime.now(tz=timezone.utc)).floor("30T")
    start_utc = now - history_duration
    start_utc_extra = start_utc - extra_duration
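    # e.g. with the defaults (90 minutes of history, 60 extra minutes) and
    # now=12:00, start_utc is 10:30 and start_utc_extra is 09:30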

    # create an empty dataframe with 30 minute periods
    empty_df = pd.DataFrame(
        index=pd.date_range(start=start_utc_extra, end=now, freq="30T", tz=timezone.utc)
    )

    # make database connection
    url = os.getenv("DB_URL")
    db_connection = DatabaseConnection(url=url, base=Base_Forecast)

    with db_connection.get_session() as session:
        # Subtract 1 second just to make sure we don't miss the first value
        gsp_yields: List[GSPYieldSQL] = get_gsp_yield(
            session=session,
            start_datetime_utc=start_utc_extra - timedelta(seconds=1),
            gsp_ids=list(range(1, N_GSP + 1)),
            filter_nans=False,
        )

        logger.debug(f"Found {len(gsp_yields)} GSP yields from the database")

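        # Convert each SQLAlchemy row to a pydantic object, flatten it to a dict,
        # and join in the location metadata (gsp_id and installed capacity)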
        gsp_yields_dict = []
        for gsp_yield in gsp_yields:
            location = Location.from_orm(gsp_yield.location)
            gsp_yield = GSPYield.from_orm(gsp_yield)

            gsp_yield_dict = gsp_yield.__dict__
            gsp_yield_dict["installed_capacity_mw"] = location.installed_capacity_mw
            gsp_yield_dict["solar_generation_mw"] = gsp_yield_dict["solar_generation_kw"] / 1000
            gsp_yield_dict["gsp_id"] = location.gsp_id
            gsp_yields_dict.append(gsp_yield_dict)

        gsp_yields_df = pd.DataFrame(gsp_yields_dict)
        gsp_yields_df.fillna(0, inplace=True)

        logger.debug(gsp_yields_df.columns)

        if len(gsp_yields_df) == 0:
            logger.warning("Found no GSP yields, returning empty dataframes")
            return pd.DataFrame(columns=["gsp_id"]), pd.DataFrame(columns=["gsp_id"])

        logger.debug(f"Found {len(gsp_yields_df)} GSP yields")

    # pivot so that each GSP ID becomes a column
    gsp_yields_df = gsp_yields_df[
        ["datetime_utc", "gsp_id", "solar_generation_mw", "installed_capacity_mw"]
    ]
    logger.debug(gsp_yields_df.columns)
    logger.debug(gsp_yields_df.index)
    gsp_yields_df.drop_duplicates(
        ["datetime_utc", "gsp_id", "solar_generation_mw"], keep="last", inplace=True
    )
    logger.debug(gsp_yields_df.columns)
    logger.debug(gsp_yields_df.index)
    gsp_power_df = gsp_yields_df.pivot(
        index="datetime_utc", columns="gsp_id", values="solar_generation_mw"
    )
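    # gsp_power_df now has a datetime_utc index, one column per gsp_id, and
    # solar generation in MW as values; gsp_capacity_df below has the same
    # shape, with installed capacity in MW as values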

    gsp_capacity_df = gsp_yields_df.pivot(
        index="datetime_utc", columns="gsp_id", values="installed_capacity_mw"
    )

    logger.debug(f"{empty_df=}")
    logger.debug(f"{gsp_power_df=}")
    gsp_power_df = empty_df.join(gsp_power_df)
    gsp_capacity_df = empty_df.join(gsp_capacity_df)

    # interpolate in between, for a maximum of 'interpolate_minutes' minutes
    # note data is in 30 minute chunks
    limit = int(interpolate_minutes / 30)
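    # e.g. the default interpolate_minutes=60 gives limit=2, so at most two
    # consecutive missing half-hourly values are filled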
    if limit > 0:
        gsp_power_df.interpolate(
            limit=limit, inplace=True, method="cubic", fill_value="extrapolate"
        )
        gsp_capacity_df.interpolate(
            limit=limit, inplace=True, method="cubic", fill_value="extrapolate"
        )

    # filter out the extra minutes loaded
    logger.debug(f"{len(gsp_power_df)} datetimes before filtering on {start_utc}")
    gsp_power_df = gsp_power_df[gsp_power_df.index >= start_utc]
    gsp_capacity_df = gsp_capacity_df[gsp_capacity_df.index >= start_utc]
    logger.debug(f"{len(gsp_power_df)} datetimes after filtering on {start_utc}")

    # clip values at 0; this just stops any interpolation going below zero
    gsp_power_df.clip(lower=0, inplace=True)

    return gsp_power_df, gsp_capacity_df
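

if __name__ == "__main__":
    # Minimal usage sketch: assumes a reachable database whose connection
    # string is set in the DB_URL environment variable
    gsp_datapipe = OpenGSPFromDatabaseIterDataPipe(
        history_minutes=90, interpolate_minutes=60, load_extra_minutes=60
    )
    # the datapipe yields the same DataArray forever, so take a single item
    gsp_data = next(iter(gsp_datapipe))
    print(gsp_data.dims, gsp_data.shape)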