Source code for farmnet.data.preprocessing

# -*- coding: utf-8 -*-
"""
Functions for preprocessing

"""

__all__ = [
    "log_process",
    "join_frames",
    "add_time_cols",
    "square_root",
    "filter_constants",
    "filter_power",
    "resample",
    "filter_bin",
    "filter_regexp",
    "dropna",
    "filter_corr",
    "compose",
    "ComposableFunction",
    "remove_interval",
]

import functools
from typing import Callable

import numpy as np
import pandas as pd


def log_process(func):
    """Decorator that snapshots a DataFrame's columns and row count before
    calling ``func``. The corresponding logging is currently disabled
    (commented out), so the wrapper is a pass-through."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        df = args[0]
        columns = set(df.columns)  # referenced by the disabled logging below
        n_samples = df.shape[0]
        result = func(*args, **kwargs)
        # removed_columns = columns - set(result.columns)
        # n_removed_rows = n_samples - result.shape[0]
        # logger.info(
        #     f"{func} Columns removed: {len(removed_columns)}, rows removed: {n_removed_rows} \n {removed_columns}"
        # )
        return result

    return wrapper

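# A minimal usage sketch for ``log_process`` (the decorated function and data
# below are hypothetical, not part of the module). The decorator snapshots the
# frame's columns and row count before the wrapped step runs; since the
# logging is disabled, behaviour is unchanged.
#
# >>> @log_process
# ... def drop_first_row(df: pd.DataFrame) -> pd.DataFrame:
# ...     return df.iloc[1:]
# >>> drop_first_row(pd.DataFrame({"A": [1, 2, 3]}))
#    A
# 1  2
# 2  3
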
def join_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
    """
    Joins a list of pandas DataFrames using an outer join on their indices.

    The function performs an outer join on the list of DataFrames, merging
    them on their indices. Duplicate indices in the final result are removed.

    :param frames: List of pandas DataFrames to be joined.
    :type frames: list[pd.DataFrame]
    :return: A single DataFrame resulting from the outer join of all input
        DataFrames, with duplicated indices removed.
    :rtype: pd.DataFrame
    :raises IndexError: If the input list is empty.

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from farmnet.data.preprocessing import join_frames

    .. code-block:: python

        >>> df1 = pd.DataFrame({"A": [1, 2]}, index=["a", "b"])
        >>> df2 = pd.DataFrame({"B": [3, 4]}, index=["b", "c"])
        >>> df3 = pd.DataFrame({"C": [5]}, index=["a"])
        >>> join_frames([df1, df2, df3])
             A    B    C
        a  1.0  NaN  5.0
        b  2.0  3.0  NaN
        c  NaN  4.0  NaN

    .. code-block:: python

        >>> join_frames([])
        Traceback (most recent call last):
            ...
        IndexError: list index out of range

    """
    frame = frames[0].join(frames[1:], how="outer")
    return frame.loc[~frame.index.duplicated()]

def add_time_cols(df) -> pd.DataFrame:
    """
    Add time-based columns to a DataFrame indexed by datetime.

    This function assumes that the input DataFrame has a `DatetimeIndex`
    and adds the following columns:

    - ``Year``: The year extracted from the index.
    - ``Month``: The month name (e.g., "January"), as an ordered categorical variable.
    - ``Day``: The name of the day (e.g., "Monday").
    - ``Hour``: The hour of the day (0–23).
    - ``Minute``: The minute of the hour (0–59).

    :param df: A pandas DataFrame with a `DatetimeIndex`.
    :type df: pd.DataFrame
    :return: The same DataFrame with additional time-based columns.
    :rtype: pd.DataFrame
    :raises AttributeError: If the DataFrame index does not support datetime
        attributes.

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from datetime import datetime
        >>> from farmnet.data.preprocessing import add_time_cols

    .. code-block:: python

        >>> dates = pd.date_range("2024-01-01 12:34", periods=2, freq="D")
        >>> df = pd.DataFrame({"Value": [10, 20]}, index=dates)
        >>> df = add_time_cols(df)
        >>> df["Year"].tolist()
        [2024, 2024]
        >>> df["Month"].tolist()
        ['January', 'January']
        >>> df["Day"].tolist()
        ['Monday', 'Tuesday']
        >>> df["Hour"].tolist()
        [12, 12]
        >>> df["Minute"].tolist()
        [34, 34]

    If the DataFrame does not have a `DatetimeIndex`, an ``AttributeError``
    is raised:

    .. code-block:: python

        >>> df = pd.DataFrame({"Value": [10, 20]}, index=[1, 2])
        >>> df = add_time_cols(df)
        Traceback (most recent call last):
            ...
        AttributeError: 'Index' object has no attribute 'year'

    """
    df["Year"] = df.index.year
    month_type = pd.CategoricalDtype(
        categories=[
            "January",
            "February",
            "March",
            "April",
            "May",
            "June",
            "July",
            "August",
            "September",
            "October",
            "November",
            "December",
        ],
        ordered=True,
    )
    df["Month"] = df.index.month_name().astype(month_type)
    df["Day"] = df.index.day_name()
    df["Hour"] = df.index.hour
    df["Minute"] = df.index.minute
    return df

def square_root(x):
    r"""
    Compute the square root of the sum of squares of an array.

    Given an array ``x``, this function calculates:

    .. math::

        \sqrt{\sum_{i} x_i^2}

    :param x: Input array (any shape) for which to compute the square root
        of the sum of squares.
    :type x: array_like
    :return: Euclidean norm of the input array.
    :rtype: float

    .. rubric:: Example

    .. code-block:: python

        >>> import numpy as np
        >>> from farmnet.data.preprocessing import square_root

    .. code-block:: python

        >>> square_root([3, 4])
        np.float64(5.0)
        >>> square_root(np.array([1, 1, 1, 1]))
        np.float64(2.0)
        >>> square_root([0])
        np.float64(0.0)
        >>> round(square_root([1, 2, 3]), 4)
        np.float64(3.7417)

    """
    return np.sqrt(np.square(x).sum())

def filter_constants(df) -> pd.DataFrame:
    """
    Filter out constant or near-constant columns from a DataFrame.

    This function identifies and removes columns where the standard deviation
    is extremely small relative to the magnitude of the values (less than
    1e-6), indicating the column contains nearly constant values.

    :param df: Input DataFrame to process
    :type df: pandas.DataFrame
    :return: DataFrame with constant/near-constant columns removed
    :rtype: pandas.DataFrame

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from farmnet.data.preprocessing import filter_constants

    .. code-block:: python

        >>> df = pd.DataFrame({
        ...     'A': [1, 1, 1, 1],
        ...     'B': [1, 2, 1, 2],
        ...     'C': [3, 3, 3, 3]
        ... })
        >>> filtered = filter_constants(df)
        >>> print(filtered.columns)
        Index(['B'], dtype='object')

    .. note::

        A column is considered constant when the ratio of its standard
        deviation to the root sum of squares of its values is less than 1e-6.

    .. warning::

        An all-zero column such as ``[0, 0, 0, 0]`` will *not* be considered
        constant, because the ratio evaluates to ``0/0 = NaN``.

    """
    df_tmp = df.std() / df.agg(square_root)
    tmp = df_tmp.loc[df_tmp < 1e-6].index.tolist()
    return df[df.columns[~df.columns.isin(tmp)]]

def filter_power(
    df, col: str, rated: float, thresh: float = 0.1
) -> pd.DataFrame:
    """
    Filter DataFrame to keep only rows where power values exceed a threshold
    percentage of rated power.

    This function filters out power values that are below a specified
    fraction of the rated power.

    :param df: Input DataFrame containing power measurements
    :type df: pandas.DataFrame
    :param col: Name of the column containing power values to filter
    :type col: str
    :param rated: Rated power value (reference value for threshold calculation)
    :type rated: float
    :param thresh: Threshold fraction of rated power (0 to 1), defaults to 0.1
    :type thresh: float, optional
    :return: Filtered DataFrame containing only rows above the power threshold
    :rtype: pandas.DataFrame

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from farmnet.data.preprocessing import filter_power

    .. code-block:: python

        >>> df = pd.DataFrame({'power': [0, 50, 100, 150, 200]})
        >>> filtered = filter_power(df, 'power', rated=200, thresh=0.25)
        >>> print(filtered)
           power
        2    100
        3    150
        4    200

    """
    return df.loc[df[col] > rated * thresh]

def filter_bin(
    data,
    ws_col: str,
    p_col: str,
    sigma: float,
    cut_in: float,
    cut_out: float,
    bins=40,
) -> pd.DataFrame:
    """
    Filter data points based on wind speed binning and power standard deviation.

    This function:

    1. Bins the data by wind speed into specified ranges
    2. Calculates mean and standard deviation of power for each bin
    3. Filters out points where power deviates more than sigma standard
       deviations from the bin mean

    :param data: Input DataFrame containing wind speed and power data
    :type data: pandas.DataFrame
    :param ws_col: Name of the column containing wind speed values
    :type ws_col: str
    :param p_col: Name of the column containing power values
    :type p_col: str
    :param sigma: Number of standard deviations to use as threshold for filtering
    :type sigma: float
    :param cut_in: Minimum wind speed to consider (lower bound of first bin)
    :type cut_in: float
    :param cut_out: Maximum wind speed to consider (upper bound of last bin)
    :type cut_out: float
    :param bins: Number of bins to create between cut_in and cut_out, defaults to 40
    :type bins: int, optional
    :return: Filtered DataFrame containing only inlier points
    :rtype: pandas.DataFrame

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from farmnet.data.preprocessing import filter_bin

    .. code-block:: python

        >>> data = pd.DataFrame({
        ...     'wind_speed': [3.0, 4.5, 5.0, 5.5, 25.0],
        ...     'power': [100, 500, 600, 620, 50]
        ... })
        >>> filtered = filter_bin(data, 'wind_speed', 'power', 2.0, 4.0, 25.0, bins=5)
        >>> filtered
           wind_speed  power
        1         4.5    500
        2         5.0    600
        3         5.5    620

    """
    windspeeds = np.linspace(cut_in, cut_out, bins)
    df = data.copy()
    df["bins"] = pd.cut(df[ws_col], bins=windspeeds)
    df = df.dropna(subset="bins")
    df_bin_mean = df.groupby("bins", observed=True).mean(numeric_only=True)
    df_bin_std = df.groupby("bins", observed=True).std(numeric_only=True)
    inlier_ids = []
    for idx, row in df.iterrows():
        # Keep a point only if its power lies within ``sigma`` standard
        # deviations of its bin's mean power. Points in bins with an
        # undefined std (a single sample) are dropped, since any comparison
        # with NaN is False.
        if (
            np.abs(row[p_col] - df_bin_mean.loc[row["bins"], p_col])
            <= sigma * df_bin_std.loc[row["bins"], p_col]
        ):
            inlier_ids.append(idx)
    return df.drop("bins", axis=1).loc[inlier_ids]

def resample(df, period: str = "D") -> pd.DataFrame:
    """Resample time series data to specified frequency and compute mean values.

    This function resamples a time-indexed DataFrame to a new frequency
    and calculates the mean of each numeric column for each new time period.
    The input DataFrame must have a DateTimeIndex.

    :param df: Input DataFrame with DateTimeIndex to be resampled
    :type df: pandas.DataFrame
    :param period: Resampling frequency string (default: "D" for daily).
        Common options:

        - "h" for hourly
        - "D" for daily
        - "W" for weekly
        - "M" for monthly
        - "Q" for quarterly
        - "A" or "Y" for yearly

    :type period: str, optional
    :return: Resampled DataFrame with mean values for each period
    :rtype: pandas.DataFrame

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> import numpy as np
        >>> from farmnet.data.preprocessing import resample

    .. code-block:: python

        >>> # Create sample time series data
        >>> date_rng = pd.date_range(start='1/1/2020', end='1/10/2020', freq='h')
        >>> df = pd.DataFrame(date_rng, columns=['date'])
        >>> df['data'] = np.random.randn(len(date_rng))
        >>> df = df.set_index('date')
        >>>
        >>> # Resample to daily means
        >>> daily_means = resample(df)
        >>> print(len(daily_means))
        10

    """
    return df.resample(period).mean()

def filter_regexp(df, regex: str = "^((?!Min).)*[^xn]$") -> pd.DataFrame:
    r"""
    Filter DataFrame columns using a regular expression.

    This function uses the provided regular expression to filter columns by
    name. By default, it excludes any column name that:

    - Contains the substring ``Min``
    - Ends with the letter ``x`` or ``n``

    :param df: The DataFrame whose columns should be filtered.
    :type df: pd.DataFrame
    :param regex: A regular expression pattern to apply to column names.
    :type regex: str
    :return: A DataFrame containing only the columns that match the regex pattern.
    :rtype: pd.DataFrame
    :raises re.error: If the regular expression is invalid.

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> import numpy as np
        >>> from farmnet.data.preprocessing import filter_regexp

    .. code-block:: python

        >>> df = pd.DataFrame({
        ...     "Speed": [10, 20],
        ...     "MinPower": [1, 2],
        ...     "Max": [3, 4],
        ...     "Heightn": [5, 6],
        ...     "Weight": [7, 8]
        ... })

    .. code-block:: python

        >>> filtered_df = filter_regexp(df)
        >>> filtered_df.columns.tolist()
        ['Speed', 'Weight']

    .. code-block:: python

        >>> filtered_df = filter_regexp(df, r'^M\w*')
        >>> filtered_df.columns.tolist()
        ['MinPower', 'Max']

    .. code-block:: python

        >>> filtered_df = filter_regexp(df, '(*)abc')
        Traceback (most recent call last):
            ...
        re.PatternError: nothing to repeat at position 1

    """
    return df.filter(regex=regex)

def dropna(df, axis=1, thresh=1.0) -> pd.DataFrame:
    """
    Drop rows or columns with missing values based on a threshold percentage.

    Extends pandas' ``dropna()`` by allowing a fractional threshold (0 to 1.0)
    instead of requiring an absolute count of non-NA values. The threshold is
    interpreted as a percentage of the axis length.

    :param df: Input DataFrame to process
    :type df: pandas.DataFrame
    :param axis: Axis to drop missing values from (0 to drop rows, 1 to drop
        columns). Unlike pandas, string labels such as 'columns' are not
        supported, because of the integer arithmetic on the axis.
    :type axis: int, optional
    :param thresh: Minimum percentage of non-NA values required (0.0 to 1.0)
    :type thresh: float, optional
    :return: DataFrame with NA-containing rows/columns dropped according to threshold
    :rtype: pandas.DataFrame

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from farmnet.data.preprocessing import dropna

    .. code-block:: python

        >>> df = pd.DataFrame({'A': [1, 2, None], 'B': [None, None, None], 'C': [4, 5, 6]})
        >>> df
             A     B  C
        0  1.0  None  4
        1  2.0  None  5
        2  NaN  None  6
        >>> dropna(df, axis=1, thresh=0.5)
             A  C
        0  1.0  4
        1  2.0  5
        2  NaN  6
        >>> dropna(df, axis=1, thresh=1)
           C
        0  4
        1  5
        2  6

    """
    # ``~axis & 1`` flips the axis, so the fractional threshold is measured
    # against the number of rows when dropping columns (axis=1), and against
    # the number of columns when dropping rows (axis=0).
    return df.dropna(axis=axis, thresh=int(thresh * df.shape[~axis & 1]))

def filter_corr(df, thresh: float = 0.95, pre_choice=None) -> pd.DataFrame:
    """
    Filters columns of a DataFrame based on the correlation threshold.

    This function removes highly correlated columns from a pandas DataFrame.
    It computes the correlation matrix and removes any columns that have an
    absolute correlation greater than the specified threshold with any other
    column. Optionally, columns can be retained in the result based on prior
    choices provided through `pre_choice`.

    :param df: The input pandas DataFrame to be filtered.
    :type df: pd.DataFrame
    :param thresh: The correlation threshold above which columns are considered
        highly correlated and will be discarded (default is 0.95).
    :type thresh: float, optional
    :param pre_choice: A list of columns to always retain in the output
        DataFrame. If None, no columns are pre-selected (default is None).
    :type pre_choice: list, optional
    :return: A pandas DataFrame with the filtered columns.
    :rtype: pd.DataFrame
    :raises ValueError: If `df` is not a pandas DataFrame.

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> import numpy as np
        >>> from farmnet.data.preprocessing import filter_corr

    .. code-block:: python

        >>> df = pd.DataFrame({
        ...     'A': np.random.randn(100),
        ...     'B': np.random.randn(100),
        ...     'C': np.random.randn(100),
        ...     'D': np.random.randn(100)
        ... })
        >>> df['B'] = df['A'] * 0.9 + df['B'] * 0.1  # Create high correlation between A and B
        >>> df['C'] = df['A'] * 0.95 + df['C'] * 0.05  # High correlation between A and C

    .. code-block:: python

        >>> filtered_df = filter_corr(df, thresh=0.9, pre_choice=['B'])
        >>> filtered_df.columns
        Index(['A', 'B', 'D'], dtype='object')

    .. code-block:: python

        >>> filtered_df = filter_corr(df, thresh=0.9)
        >>> filtered_df.columns
        Index(['A', 'D'], dtype='object')

    """
    corr = df.corr()
    # Zero out the diagonal so self-correlation does not trigger the mask.
    corr = corr - np.eye(corr.shape[0])
    mask = corr.abs() > thresh
    to_filter = corr[mask].unstack().dropna().to_frame()
    keep = set(pre_choice) if pre_choice else set()
    discard = set()
    # Greedily walk the highly correlated pairs: the first column of a pair
    # is kept (unless already discarded), and its partner is discarded unless
    # it was kept earlier or pre-selected.
    for i, j in to_filter.index:
        if i not in discard:
            keep.add(i)
            if j not in keep:
                discard.add(j)
    return df[df.columns[~df.columns.isin(discard)]]

ComposableFunction = Callable[[pd.DataFrame], pd.DataFrame]
def compose(*functions: ComposableFunction) -> ComposableFunction:
    """
    Composes multiple functions that each take a pandas DataFrame and return
    a pandas DataFrame.

    This function combines a sequence of functions into a single function.
    The resulting function applies the functions from left to right: the
    first function passed is applied first, and each subsequent function is
    applied to the result of the previous one.

    :param functions: A sequence of functions to be composed. Each function
        should take a pandas DataFrame and return a pandas DataFrame.
    :type functions: ComposableFunction
    :return: A new function that represents the composition of the input functions.
    :rtype: ComposableFunction
    :raises TypeError: If any of the input functions does not accept or
        return a pandas DataFrame.

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from farmnet.data.preprocessing import compose, filter_constants, dropna
        >>> df = pd.DataFrame({
        ...     'A': [1, 1, 1, 1],
        ...     'B': [1, 2, 1, 2],
        ...     'C': [3, 4, None, 3]
        ... })

    .. code-block:: python

        >>> composed_func = compose(dropna, filter_constants)
        >>> composed_df = composed_func(df)
        >>> print(composed_df)
           B
        0  1
        1  2
        2  1
        3  2

    .. code-block:: python

        >>> composed_func = compose(square_root, dropna)
        >>> composed_df = composed_func(df)
        Traceback (most recent call last):
            ...
        TypeError: Series.dropna() got an unexpected keyword argument 'thresh'

    """
    return functools.reduce(lambda f, g: lambda x: g(f(x)), functions)

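# A minimal sketch (not part of the module) showing how multi-argument
# preprocessing steps can be narrowed to the single-argument
# ``ComposableFunction`` signature with ``functools.partial`` before being
# composed. The column name, rated power, thresholds, and ``raw_df`` below
# are hypothetical.
#
# >>> pipeline = compose(
# ...     functools.partial(filter_power, col="power", rated=2000.0),
# ...     filter_constants,
# ...     functools.partial(dropna, axis=1, thresh=0.8),
# ... )
# >>> cleaned = pipeline(raw_df)  # steps are applied left to right
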
def remove_interval(
    df: pd.DataFrame,
    timestamps: list[str | pd.Timestamp],
    delta: str | pd.Timedelta = "30D",
):
    """
    Removes rows from a DataFrame that fall within a specified time interval
    around given timestamps.

    For each timestamp in `timestamps`, rows whose index falls within a
    window of `delta` before and after that timestamp are excluded from the
    result.

    :param df: The pandas DataFrame from which rows are to be removed.
    :type df: pd.DataFrame
    :param timestamps: A list of timestamps around which to remove rows. Each
        timestamp will have a time window of size `delta` before and after it.
    :type timestamps: list[str | pd.Timestamp]
    :param delta: The time window size (default is "30D", i.e., 30 days).
        The window is applied symmetrically before and after each timestamp.
    :type delta: str | pd.Timedelta, optional
    :return: A new DataFrame with rows removed within the specified time intervals.
    :rtype: pd.DataFrame
    :raises TypeError: If `df` does not have a datetime index, since the
        index cannot be compared with the window bounds.

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from farmnet.data.preprocessing import remove_interval

    .. code-block:: python

        >>> df = pd.DataFrame({
        ...     'value': range(10)
        ... }, index=pd.date_range('2025-01-01', periods=10, freq='D'))
        >>> df
                    value
        2025-01-01      0
        2025-01-02      1
        2025-01-03      2
        2025-01-04      3
        2025-01-05      4
        2025-01-06      5
        2025-01-07      6
        2025-01-08      7
        2025-01-09      8
        2025-01-10      9
        >>> remove_interval(df, timestamps=["2025-01-05"], delta="2D")
                    value
        2025-01-01      0
        2025-01-02      1
        2025-01-08      7
        2025-01-09      8
        2025-01-10      9

    """
    mask = np.ones(len(df.index), dtype=bool)
    for timestamp in timestamps:
        start = pd.Timestamp(timestamp) - pd.Timedelta(delta)
        end = pd.Timestamp(timestamp) + pd.Timedelta(delta)
        # Keep only rows strictly outside the [start, end] window.
        mask = np.logical_and(mask, (df.index < start) | (df.index > end))
    return df.loc[mask]

def match_group_count(
    df: pd.DataFrame, by: str, count_col: str, match: int | float | str
) -> pd.DataFrame:
    """
    Keep only the groups whose non-NA count in ``count_col`` equals ``match``.

    The DataFrame is grouped by the column ``by``, and a group is retained
    when ``group[count_col].count()`` equals ``match``.

    :param df: Input DataFrame to filter.
    :type df: pd.DataFrame
    :param by: Column name to group by.
    :type by: str
    :param count_col: Column whose non-NA values are counted per group.
    :type count_col: str
    :param match: Required count for a group to be retained.
    :type match: int | float | str
    :return: DataFrame containing only the rows of matching groups.
    :rtype: pd.DataFrame
    """
    grouped = df.groupby(by)
    return grouped.filter(lambda x: x[count_col].count() == match)


def rename_labels(s: pd.Series, mapping_dict: dict) -> pd.Series:
    """
    Renames the labels of a pandas Series according to a mapping dictionary.

    This function applies a mapping dictionary to the values of the Series,
    renaming them based on the keys in the dictionary. If a value in the
    Series is not found in the dictionary, it will raise a KeyError.

    :param s: A pandas Series whose values need to be renamed.
    :type s: pd.Series
    :param mapping_dict: A dictionary mapping existing labels to new labels.
    :type mapping_dict: dict
    :return: A new pandas Series with the labels renamed according to the mapping.
    :rtype: pd.Series
    :raises KeyError: If a value in the Series does not have a corresponding
        key in the mapping_dict.

    .. rubric:: Example

    .. code-block:: python

        >>> import pandas as pd
        >>> from farmnet.data.preprocessing import rename_labels

    .. code-block:: python

        >>> s = pd.Series(["cat", "dog", "bird"])
        >>> mapping = {"cat": "kitten", "dog": "puppy", "bird": "hatchling"}
        >>> rename_labels(s, mapping)
        0       kitten
        1        puppy
        2    hatchling
        dtype: object

    .. code-block:: python

        >>> s = pd.Series(["cat", "dog", "bird"])
        >>> mapping = {"cat": "kitten", "dog": "puppy"}
        >>> rename_labels(s, mapping)
        Traceback (most recent call last):
            ...
        KeyError: 'bird'

    """
    return s.apply(lambda x: mapping_dict[x])

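# A minimal usage sketch for ``match_group_count`` (hypothetical data, not
# part of the module): keep only the ``turbine`` groups whose ``value``
# column has exactly two non-NA entries.
#
# >>> df = pd.DataFrame({
# ...     "turbine": ["T1", "T1", "T2"],
# ...     "value": [1.0, 2.0, 3.0],
# ... })
# >>> match_group_count(df, by="turbine", count_col="value", match=2)
#   turbine  value
# 0      T1    1.0
# 1      T1    2.0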