Source code for farmnet.data.wranglers

"""
Wranglers for transforming various data sources to the farmnet data format.

Configuration file
--------------------------
Configuration file to map data source files to farmnet data format.

"""

# Copyright (C) 2023 OST Ostschweizer Fachhochschule
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Author: Florian Hammer <florian.hammer@ost.ch>

from pathlib import Path
import pandas as pd
from farmnet.utils import read_config

from farmnet.utils import getenv
from pathlib import Path
import os 


# Module-level cache holding the default configuration. It starts empty, is
# rebound by set_default_cfg() and is read by _get_cfg_field() whenever no
# explicit configuration path is given.
rcConfig = {}


def _test_global_cfg(func, *args, **kwargs):
    """
    Call ``func`` and return the module-level default configuration.

    Helper for the doctests: it makes the state of ``rcConfig`` observable
    after a function that rebinds it has run. The return value of ``func``
    itself is discarded.

    :param func: Callable to invoke.
    :param args: Positional arguments forwarded to ``func``.
    :param kwargs: Keyword arguments forwarded to ``func``.
    :return: The value of the global ``rcConfig`` after the call.
    """
    func(*args, **kwargs)
    # The global is looked up only now, so a rebinding done by ``func`` is
    # what gets returned.
    return rcConfig

def set_default_cfg(config_path: str | Path):
    """
    Set the default configuration.

    The file at ``config_path`` is loaded with ``read_config`` and the result
    is stored in the module-level ``rcConfig``. The ``get_*`` functions of
    this module fall back to it when they are called without a path.

    :param config_path: Path to the configuration file.
    :type config_path: str | Path

    **Examples:**

    After the call, the module-level configuration holds the content of the
    configuration file:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> rcConfig_test = _test_global_cfg(set_default_cfg,default_cfg_path)
    >>> print(json.dumps(rcConfig_test, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "columns": [
            {
                "name": "wind_direction",
                "name-from-source": "Wind direction (°)"
            },
            {
                "name": "nacelle_direction",
                "name-from-source": "Nacelle position (°)"
            },
            {
                "name": "wind_speed",
                "name-from-source": "Wind speed (m/s)"
            },
            {
                "name": "power",
                "name-from-source": "Power (kW)"
            },
            {
                "name": "wt_id",
                "name-from-source": "Wind turbine ID"
            }
        ],
        "csv": {
            "encoding": "utf8",
            "header": 0,
            "sep": ","
        },
        "dataset": {
            "data": "featured_windeurope_data.parquet",
            "root_dir": "kelmarsh_data_imputation",
            "static": "Kelmarsh_WT_static.csv"
        },
        "index": {
            "name": "datetime",
            "name-from-source": "# Date and time",
            "time-zone": "UTC",
            "time-zone-from-source": "UTC",
            "unit": "ns"
        }
    }
    """
    global rcConfig
    loaded_cfg = read_config(config_path)
    # Rebind (rather than update) the global so the previous default is
    # replaced completely.
    rcConfig = loaded_cfg
def _get_cfg_field(
    field: str, config_path: str | Path | None = None
) -> dict | list:
    """
    Return an independent copy of one top-level section of a configuration.

    :param field: Name of the top-level configuration section, e.g.
        ``"dataset"``, ``"columns"``, ``"csv"`` or ``"index"``.
    :type field: str
    :param config_path: Path to the configuration file. If ``None``, the
        default configuration stored by :func:`~.set_default_cfg` is used.
    :type config_path: str | Path | None
    :return: A deep copy of the requested section. Most sections are
        dictionaries; ``"columns"`` is a list of dictionaries.
    :rtype: dict | list
    :raises KeyError: If the configuration has no section named ``field``
        (for instance because no default configuration has been set yet).
    """
    from copy import deepcopy

    # An explicit path always wins; otherwise fall back to the cached default.
    source = rcConfig if config_path is None else read_config(config_path)

    try:
        section = source[field]
    except KeyError:
        hint = (
            " (no configuration file given; was set_default_cfg() called?)"
            if config_path is None
            else ""
        )
        raise KeyError(
            f"configuration has no section {field!r}{hint}"
        ) from None

    # Deep copy: sections such as "columns" contain nested dictionaries, and a
    # shallow copy would let callers mutate the cached default configuration.
    return deepcopy(section)
def get_dataset(config_path: str | Path | None = None) -> dict:
    """
    Return the source description of a dataset.

    :param config_path: Path to the configuration file. If `None`, the
        default configuration set with :func:`~.set_default_cfg` is used.
    :type config_path: str | Path | None
    :return: A dictionary in the FarmNet format.
    :rtype: dict

    The returned dictionary contains:

    - **root_dir** (`str`): Root directory of the database.
    - **data** (`str`): Name of the data file.
    - **static** (`str`): Name of the static file.

    **Examples:**

    When a configuration file is given, its dataset directory, data file and
    static file are returned:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> ds = get_dataset(default_cfg_path)
    >>> print(json.dumps(ds, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "data": "featured_windeurope_data.parquet",
        "root_dir": "kelmarsh_data_imputation",
        "static": "Kelmarsh_WT_static.csv"
    }

    If `config_path` is `None`, the dataset information of the default
    configuration set with :func:`~.set_default_cfg()` is returned:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> set_default_cfg(default_cfg_path)
    >>> ds = get_dataset()
    >>> print(json.dumps(ds, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "data": "featured_windeurope_data.parquet",
        "root_dir": "kelmarsh_data_imputation",
        "static": "Kelmarsh_WT_static.csv"
    }
    """
    # The "dataset" section is handed back as delivered by the shared lookup.
    dataset_cfg = _get_cfg_field("dataset", config_path)
    return dataset_cfg
def get_column_mapping(config_path: str | Path | None = None) -> dict:
    """
    Return the column mapping of a configuration.

    The ``columns`` section of the configuration is turned into a dictionary
    whose keys are the column names used in the source data and whose values
    are the corresponding standardized names.

    :param config_path: Path to the configuration file. If `None`, the
        default configuration set with :func:`~.set_default_cfg` is used.
    :type config_path: str | Path | None
    :return: A dictionary mapping source column names to standardized column
        names.
    :rtype: dict

    **Examples:**

    When a configuration file is given, the column mapping of that file is
    returned:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> ds = get_column_mapping(default_cfg_path)
    >>> print(json.dumps(ds, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "Nacelle position (°)": "nacelle_direction",
        "Power (kW)": "power",
        "Wind direction (°)": "wind_direction",
        "Wind speed (m/s)": "wind_speed",
        "Wind turbine ID": "wt_id"
    }

    If `config_path` is `None`, the column mapping of the default
    configuration set with :func:`~.set_default_cfg()` is returned:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> set_default_cfg(default_cfg_path)
    >>> ds = get_column_mapping()
    >>> print(json.dumps(ds, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "Nacelle position (°)": "nacelle_direction",
        "Power (kW)": "power",
        "Wind direction (°)": "wind_direction",
        "Wind speed (m/s)": "wind_speed",
        "Wind turbine ID": "wt_id"
    }
    """
    mapping = {}
    # Each entry of the "columns" section pairs a source name with its
    # standardized name; a repeated source name keeps the last entry.
    for column in _get_cfg_field("columns", config_path):
        mapping[column["name-from-source"]] = column["name"]
    return mapping
def get_csv_fmt(config_path: str | Path | None = None) -> dict:
    """
    Return the csv format section of a configuration.

    :param config_path: Path to the configuration file. If `None`, the
        default configuration set with :func:`~.set_default_cfg` is used.
    :type config_path: str | Path | None
    :return: A dictionary containing csv formatting details.
    :rtype: dict

    The returned dictionary contains:

    - **encoding** (`str`): Encoding to use for UTF when reading
      (ex. 'utf-8')
    - **sep** (`str`): Character or regex pattern to treat as the delimiter.
    - **header** (`int`): Row number(s) containing column labels and marking
      the start of the data (zero-indexed).

    **Examples:**

    When a configuration file is given, the csv configuration of that file is
    returned:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> ds = get_csv_fmt(default_cfg_path)
    >>> print(json.dumps(ds, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "encoding": "utf8",
        "header": 0,
        "sep": ","
    }

    If `config_path` is `None`, the csv configuration of the default
    configuration set with :func:`~.set_default_cfg()` is returned:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> set_default_cfg(default_cfg_path)
    >>> ds = get_csv_fmt()
    >>> print(json.dumps(ds, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "encoding": "utf8",
        "header": 0,
        "sep": ","
    }
    """
    # The "csv" section is handed back as delivered by the shared lookup.
    csv_cfg = _get_cfg_field("csv", config_path)
    return csv_cfg
def get_index_fmt(config_path: str | Path | None = None) -> dict:
    """
    Return the index format described by a configuration.

    The ``index`` section of the configuration is reshaped into the three
    entries listed below.

    :param config_path: Path to the configuration file. If `None`, the
        default configuration set with :func:`~.set_default_cfg` is used.
    :type config_path: str | Path | None
    :return: A dictionary containing index formatting details.
    :rtype: dict

    The returned dictionary contains:

    - **name_mapping** (`tuple[str, str]`): Mapping of source index name to
      target index name.
    - **dt_format** (`str`): Date/time format unit.
    - **tz_mapping** (`tuple[str, str]`): Mapping of source time zone to
      target time zone.

    **Examples:**

    When a configuration file is given, the index-related details of that
    file are returned:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> ds = get_index_fmt(default_cfg_path)
    >>> print(json.dumps(ds, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "dt_format": "ns",
        "name_mapping": [
            "# Date and time",
            "datetime"
        ],
        "tz_mapping": [
            "UTC",
            "UTC"
        ]
    }

    If `config_path` is `None`, the index-related details of the default
    configuration set with :func:`~.set_default_cfg()` are returned:

    >>> import json
    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> set_default_cfg(default_cfg_path)
    >>> ds = get_index_fmt()
    >>> print(json.dumps(ds, indent=4, sort_keys=True, ensure_ascii=False))
    {
        "dt_format": "ns",
        "name_mapping": [
            "# Date and time",
            "datetime"
        ],
        "tz_mapping": [
            "UTC",
            "UTC"
        ]
    }
    """
    index_cfg = _get_cfg_field("index", config_path)

    # Both mappings are (source, target) pairs.
    name_mapping = (index_cfg["name-from-source"], index_cfg["name"])
    dt_format = index_cfg["unit"]
    tz_mapping = (index_cfg["time-zone-from-source"], index_cfg["time-zone"])

    return dict(
        name_mapping=name_mapping,
        dt_format=dt_format,
        tz_mapping=tz_mapping,
    )
[docs] def read_raw( fpath: Path | str, *, csv_fmt: dict | None = None, index_fmt: dict | None = None, **kwargs, ) -> pd.DataFrame: """ Read a raw data file. :param fpath: Path to the data file. :type fpath: Path or str :param csv_fmt: Dictionary of CSV format options to be passed to :func:`pandas.read_csv`. If not provided, the output of :func:`~.get_csv_fmt` is used. :type csv_fmt: dict or None :param index_fmt: Information about the index of the returned data. If not provided, the output of :func:`~.get_index_fmt` is used. :type index_fmt: dict or None :param kwargs: Additional keyword arguments passed to :func:`pandas.read_csv`. :return: Formatted raw data. :rtype: :class:`pandas.DataFrame` **Examples:** >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml")) >>> set_default_cfg(default_cfg_path) >>> download_path = Path(getenv("DOWNLOAD_PATH", "./data")) >>> dataset = get_dataset() >>> data_path = download_path / dataset["data"] >>> raw_data_path = download_path / "kelmarsh_raw.csv" >>> df_raw = read_raw(raw_data_path) >>> print(df_raw.to_string(max_cols=5, max_rows=10)) Wind speed (m/s) Wind speed, Standard deviation (m/s) ... Tower Acceleration Y, StdDev (mm/ss) Wind turbine ID datetime ... 2022-01-01 00:00:00+00:00 6.781222 1.182439 ... 11.422541 228 2022-01-01 00:10:00+00:00 6.936052 1.287222 ... 16.457248 228 2022-01-01 00:20:00+00:00 7.294642 1.430000 ... 16.063823 228 2022-01-01 00:30:00+00:00 8.080467 1.023509 ... 18.288907 228 2022-01-01 00:40:00+00:00 7.021328 1.066915 ... 22.059917 228 ... ... ... ... ... ... 2022-12-31 23:10:00+00:00 8.712688 1.216442 ... 28.254154 233 2022-12-31 23:20:00+00:00 9.149686 1.182500 ... 15.370069 233 2022-12-31 23:30:00+00:00 9.571797 1.619526 ... 13.412479 233 2022-12-31 23:40:00+00:00 9.549912 1.504496 ... 18.748812 233 2022-12-31 23:50:00+00:00 9.215081 1.208763 ... 
19.858008 233 >>> df_raw.index.name 'datetime' >>> len(df_raw) 315360 """ if csv_fmt is None: csv_fmt = get_csv_fmt() df = pd.read_csv(fpath, **csv_fmt, **kwargs) # Set index if index_fmt is None: index_fmt = get_index_fmt() df.rename(columns=dict([index_fmt["name_mapping"]]), inplace=True) df.set_index(index_fmt["name_mapping"][1], inplace=True) # parse dates df.index = pd.to_datetime(df.index, unit=index_fmt["dt_format"]) # change time zone to UTC from_, to = index_fmt["tz_mapping"] df.index = df.index.tz_localize(from_).tz_convert(to) return df
def to_farmnet(df: pd.DataFrame, *, column_mapping: dict) -> pd.DataFrame:
    """
    Transform a dataframe containing raw data into a FarmNet dataframe.

    To be used with the FarmNet data pipeline. The FarmNet dataframe is
    defined in the FarmNet data manifest and is used as a data interface for
    the FarmNet data pipeline.

    The columns are renamed according to ``column_mapping`` and only the
    mapped columns are kept, in the order of the mapping. The input dataframe
    is not modified; a copy is returned.

    :param df: DataFrame containing raw data.
    :type df: pd.DataFrame
    :param column_mapping: Dictionary mapping raw column names to FarmNet
        column names.
    :type column_mapping: dict
    :return: A transformed FarmNet-compatible DataFrame.
    :rtype: :class:`pandas.DataFrame`

    **Examples:**

    >>> default_cfg_path = Path(getenv("CONFIG_PATH", "examples/kelmarsh.toml"))
    >>> set_default_cfg(default_cfg_path)
    >>> dataset = get_dataset()
    >>> download_path = Path(getenv("DOWNLOAD_PATH", "./data"))
    >>> raw_data_path = download_path / "kelmarsh_raw.csv"
    >>> df_raw = read_raw(raw_data_path)
    >>> column_mapping = get_column_mapping()
    >>> df_farmnet = to_farmnet(df_raw, column_mapping=column_mapping)
    >>> print(df_farmnet.to_string(max_cols=6, max_rows=10))
                               wind_direction  nacelle_direction  wind_speed        power  wt_id
    datetime
    2022-01-01 00:00:00+00:00      185.795348         193.731354    6.781222   630.889598    228
    2022-01-01 00:10:00+00:00      189.458687         193.731354    6.936052   809.339449    228
    2022-01-01 00:20:00+00:00      188.648729         193.731354    7.294642   893.607333    228
    2022-01-01 00:30:00+00:00      188.826550         193.731354    8.080467   995.583734    228
    2022-01-01 00:40:00+00:00      191.252213         193.731354    7.021328   926.519441    228
    ...                                   ...                ...         ...          ...    ...
    2022-12-31 23:10:00+00:00      210.193670         205.457916    8.712688  1447.101428    233
    2022-12-31 23:20:00+00:00      208.465164         205.457916    9.149686  1572.766687    233
    2022-12-31 23:30:00+00:00      213.539677         205.457916    9.571797  1653.457245    233
    2022-12-31 23:40:00+00:00      213.684894         205.457916    9.549912  1670.531378    233
    2022-12-31 23:50:00+00:00      209.309463         205.457916    9.215081  1563.665674    233
    >>> id(df_farmnet) == id(df_raw)
    False
    >>> list(df_farmnet.columns) == list(column_mapping.values())
    True
    """
    # Target names in mapping order; they define both the selection and the
    # column order of the result.
    target_columns = list(column_mapping.values())
    renamed = df.rename(columns=column_mapping)
    # Copy so the result does not share data with the selection on ``renamed``.
    return renamed[target_columns].copy()
if __name__ == "__main__":
    # When executed as a script, run the doctests embedded in this module's
    # docstrings. NORMALIZE_WHITESPACE makes the comparison of expected and
    # actual output insensitive to the whitespace layout.
    import doctest

    # NOTE(review): ``name`` sets the module name used in doctest reports;
    # "read_raw" looks like a leftover rather than the module name - confirm.
    doctest.testmod(name="read_raw",optionflags=doctest.NORMALIZE_WHITESPACE)