Skip to content

Commit

Permalink
Merge pull request #164 from OpenSTEF/feature/weather-tahead
Browse files Browse the repository at this point in the history
New influxDB measurement to store weather forecasts with tAhead
  • Loading branch information
bartpleiter authored Jan 2, 2025
2 parents d172a2a + 34edacd commit ad0d0f7
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 57 deletions.
76 changes: 52 additions & 24 deletions openstef_dbc/services/weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,31 @@ def _get_coordinates_of_location(self, location_name: str) -> Tuple[float, float

return location

def _get_source_run(
self, forecast_datetime: pd.Series, tAhead: pd.Series
) -> pd.Series:
"""Compute the datetime when weather forecast was created
Args:
forecast_datetime (pd.Series[datetime]): forecasted datetime.
tAhead (pd.Series[(int, float)]: forecasting horizon in hours
Retuns :
pd.Series of new datetimes
"""

if not pd.api.types.is_datetime64_any_dtype(forecast_datetime):
raise ValueError("forecast_datetime must be a Series of datetime.")

if not pd.api.types.is_numeric_dtype(tAhead):
raise ValueError("tahead must be a Series of int or float.")

if len(forecast_datetime) != len(tAhead):
raise ValueError("forecast_datetime and tAhead must have the same length.")

# Compute new datetimes
return forecast_datetime - pd.to_timedelta(tAhead, unit="h")

def _combine_weather_sources(
self, result: pd.DataFrame, source_order: List = None
) -> pd.DataFrame:
Expand Down Expand Up @@ -203,12 +228,13 @@ def get_weather_data(
self,
location: Union[Tuple[float, float], str],
weatherparams: List[str],
datetime_start: datetime = None,
datetime_end: datetime = None,
datetime_start: datetime = datetime.utcnow() - timedelta(days=14),
datetime_end: datetime = datetime.utcnow() + timedelta(days=2),
source: Union[List[str], str] = "optimum",
resolution: str = "15min",
country: str = "NL",
number_locations: int = 1,
type: str = "smallest_tAhead",
) -> pd.DataFrame:
"""Get weather data from database.
Expand All @@ -230,6 +256,7 @@ def get_weather_data(
resolution (str): Time resolution of the returned data, default: "15min"
country (str): Country code (2-letter: ISO 3166-1). e.g. NL
number_locations (int): number of weather locations desired
type (str) : type of weather forecast (smallest_tAhead of multiple_tAheads)
Returns:
pd.DataFrame: The most recent weather prediction
Expand All @@ -240,16 +267,6 @@ def get_weather_data(
print(df.head())
"""

if datetime_start is None:
datetime_start = datetime.utcnow() - timedelta(days=14)

datetime_start = pd.to_datetime(datetime_start)

if datetime_end is None:
datetime_end = datetime.utcnow() + timedelta(days=2)

datetime_end = pd.to_datetime(datetime_end)

# Convert to UTC and remove UTC as note
if datetime_start.tz is not None:
datetime_start = datetime_start.tz_convert("UTC").tz_localize(None)
Expand Down Expand Up @@ -294,11 +311,20 @@ def get_weather_data(
location_name.to_list()
)

if type == "smallest_tAhead":
weather_measurement_str = "weather"
influx_indices = ["source", "input_city"]
grouping_indices = ["source", "input_city"]
elif type == "multiple_tAheads":
weather_measurement_str = "weather_tAhead"
influx_indices = ["source", "input_city", "tAhead"]
grouping_indices = ["source", "input_city", "created"]

# Create the query
query = f"""
from(bucket: "forecast_latest/autogen")
|> range(start: {bind_params["_start"].strftime('%Y-%m-%dT%H:%M:%SZ')}, stop: {bind_params["_stop"].strftime('%Y-%m-%dT%H:%M:%SZ')})
|> filter(fn: (r) => r._measurement == "weather" and (r._field == "{weather_params_str}") and (r.source == "{weather_models_str}") and (r.input_city == "{weather_location_name_str}"))
|> filter(fn: (r) => r._measurement == "{weather_measurement_str}" and (r._field == "{weather_params_str}") and (r.source == "{weather_models_str}") and (r.input_city == "{weather_location_name_str}"))
"""

# Execute Query
Expand All @@ -310,7 +336,7 @@ def get_weather_data(

# Check if response is empty
if not result.empty:
result = parse_influx_result(result, ["source", "input_city"])
result = parse_influx_result(result, influx_indices)
else:
self.logger.warning("No weatherdata found. Returning empty dataframe")
return pd.DataFrame(
Expand All @@ -325,26 +351,28 @@ def get_weather_data(
if combine_sources:
self.logger.info("Combining sources into single dataframe")
result = self._combine_weather_sources(result)
result["source"] = "optimum"

# Interpolate if necesarry by input_city and source
# Compute source_run
if type == "multiple_tAheads":
result["created"] = self._get_source_run(result.index, result.tAhead)

# Interpolate if nescesarry by input_city, source (and tAhead)
with pd.option_context("future.no_silent_downcasting", True):
result = (
result.groupby(["input_city"])
result.groupby(grouping_indices)
.resample(resolution, include_groups=False)
.asfreq()
.interpolate(limit=11)
.reset_index(grouping_indices)
)
result.loc[:, result.columns != "source"] = result.loc[
:, result.columns != "source"
].interpolate(limit=11)
result = result.reset_index("input_city")

# Shift radiation by 30 minutes if resolution allows it
if "radiation" in result.columns:
shift_delta = -timedelta(minutes=30)
if shift_delta % pd.Timedelta(resolution) == timedelta(0):
result["radiation"] = result.groupby(["input_city"])["radiation"].shift(
1, shift_delta
)
result["radiation"] = result.groupby(grouping_indices)[
"radiation"
].shift(1, shift_delta)

# Drop extra rows not neccesary
result = result[result.index >= datetime_start_original]
Expand Down
53 changes: 31 additions & 22 deletions openstef_dbc/services/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,9 @@ def _write_weather_forecast_data_latest(
self,
data: pd.DataFrame,
source: str,
table: str = "weather",
dbname: str = "forecast_latest",
forecast_created_time: datetime,
table: str,
dbname: str,
tag_columns: List[str] = None,
casting_dict: Dict[str, type] = {},
):
Expand All @@ -301,6 +302,7 @@ def _write_weather_forecast_data_latest(
Args:
data: pd.DataFrame(index = "datetimeFC", columns = ['input_city','temp','windspeed'])
source: (str) source of the weatherdata
forecast_created_time: (datetime) the time at which the forecast was created
table: (str) table name
dbname: (str) database name
tag_columns: (list) the column names used as tags in influx
Expand All @@ -313,7 +315,7 @@ def _write_weather_forecast_data_latest(
influx_df = data.copy()
influx_df["source"] = source
# Add created to data
influx_df["created"] = int(datetime.utcnow().timestamp())
influx_df["created"] = int(forecast_created_time.timestamp())

if tag_columns is None:
tag_columns = ["input_city", "source"]
Expand All @@ -333,32 +335,24 @@ def _write_weather_forecast_data_t_ahead(
self,
data: pd.DataFrame,
source: str,
table: str = "weather",
dbname: str = "forecast_latest",
forecast_created_time: datetime,
table: str,
dbname: str,
desired_t_aheads: List[float],
tag_columns: List[str] = None,
casting_dict: Dict[str, type] = {},
desired_t_aheads: List[float] = [
1.0,
12.0,
15.0,
24.0,
36.0,
39.0,
48.0,
4 * 24.0,
6 * 24.0,
],
):
"""Write weather data to the database. This function writes the data to a table containing with the forecasts for different t_ahead values.
Args:
data: pd.DataFrame(index = "datetimeFC", columns = ['input_city','temp','windspeed'])
source: (str) source of the weatherdata
forecast_created_time: (datetime) the time at which the forecast was created
table: (str) table name
dbname: (str) database name
desired_t_aheads: (list) the t_ahead values for which the data should be written
tag_columns: (list) the column names used as tags in influx
casting_dict: (dict) dictionary with column names as keys and the desired datatype as values
desired_t_aheads: (list) the t_ahead values for which the data should be written
Returns:
None
Expand All @@ -367,16 +361,14 @@ def _write_weather_forecast_data_t_ahead(
influx_df = data.copy()
influx_df["source"] = source
# Add created to data
influx_df["created"] = int(datetime.utcnow().timestamp())
influx_df["created"] = int(forecast_created_time.timestamp())

if tag_columns is None:
tag_columns = ["input_city", "source"]
tag_columns.append("tAhead")

# Calculate tAheads
timediffs = (
influx_df.index.tz_localize(None) - datetime.utcnow()
).total_seconds() / 3600
timediffs = (influx_df.index - forecast_created_time).total_seconds() / 3600
# Round it to the first bigger desired_t_ahead
influx_df["tAhead"] = round_down_time_differences(timediffs, desired_t_aheads)

Expand Down Expand Up @@ -407,6 +399,18 @@ def write_weather_forecast_data(
table: str = "weather",
dbname: str = "forecast_latest",
tag_columns: List[str] = None,
forecast_created_time: datetime = datetime.utcnow(),
desired_t_aheads: List[float] = [
1.0,
12.0,
15.0,
24.0,
36.0,
39.0,
48.0,
4 * 24.0,
6 * 24.0,
],
):
"""Write weather forecast data to the database.
This function writes the data both to a table containing the latest forecasts,
Expand All @@ -418,6 +422,8 @@ def write_weather_forecast_data(
table: (str) table name
dbname: (str) database name
tag_columns: (list) the column names used as tags in influx
forecast_created_time: (datetime) the time at which the forecast was created
desired_t_aheads: (list) the t_ahead values for which the data should be written
Returns:
None
Expand Down Expand Up @@ -455,6 +461,7 @@ def write_weather_forecast_data(
self._write_weather_forecast_data_latest(
data=data,
source=source,
forecast_created_time=forecast_created_time,
table=table,
dbname=dbname,
tag_columns=tag_columns,
Expand All @@ -463,8 +470,10 @@ def write_weather_forecast_data(
self._write_weather_forecast_data_t_ahead(
data=data,
source=source,
table=table + "_tAheads",
forecast_created_time=forecast_created_time,
table=table + "_tAhead",
dbname=dbname,
desired_t_aheads=desired_t_aheads,
casting_dict=casting_dict,
tag_columns=tag_columns,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
;_time;_value;_field;_measurement;input_city;source;tAhead
0;2022-01-01 00:00:00+00:00;3.6144495010375977;windspeed;weather;Rotterdam;harm_arome;0
1;2022-01-01 01:00:00+00:00;3.5552268028259277;windspeed;weather;Rotterdam;harm_arome;1
7;2022-01-01 02:00:00+00:00;1.4958148002624512;windspeed;weather;Rotterdam;harm_arome;2
8;2022-01-01 01:00:00+00:00;0.625495970249176;windspeed;weather;Rotterdam;harm_arome;0
14;2022-01-01 02:00:00+00:00;2.078191041946411;windspeed;weather;Rotterdam;harm_arome;1
Loading

0 comments on commit ad0d0f7

Please sign in to comment.