Source code for nyaggle.feature_store.feature_store

import functools
import os
import warnings
from typing import List, Optional, Union

import pandas as pd
import pyarrow
from tqdm import tqdm


def validate_train_test_difference(train: pd.Series, test: pd.Series):
    """
    Check that a feature is not entirely null on the test part.

    Args:
        train: The train part of the feature. Only its ``name`` is used, for the error message.
        test: The test part of the feature.

    Raises:
        RuntimeError: If the fraction of nulls in ``test`` is exactly 1.0.
    """
    # Fraction of nulls in the test part. For an empty series this is NaN,
    # which never compares equal to 1.0, so empty input passes silently.
    null_ratio = test.isnull().mean()
    if null_ratio != 1.0:
        return

    raise RuntimeError('Error in feature {}: all values in test data is null'.format(train.name))


def validate_feature(df: pd.DataFrame, y: pd.Series):
    """
    Split a feature dataframe into train/test parts and validate every column.

    Args:
        df: The feature dataframe holding both train and test rows.
        y: The target variable used to decide which rows are train and which are test.
    """
    n_target = len(y)

    if n_target < len(df):
        # The target is shorter than the feature: the leading rows are taken
        # as the train part and the remaining rows as the test part.
        train, test = df.iloc[:n_target, :], df.iloc[n_target:, :]
    else:
        # Otherwise rows whose target is null form the test part.
        target_missing = y.isnull()
        train, test = df[~target_missing], df[target_missing]

    for column in df.columns:
        validate_train_test_difference(train[column], test[column])


def save_feature(df: pd.DataFrame, feature_name: Union[int, str], directory: str = './features/',
                 with_csv_dump: bool = False, create_directory: bool = True,
                 reference_target_variable: Optional[pd.Series] = None, overwrite: bool = False):
    """
    Write a pandas dataframe to disk in feather format.

    Args:
        df:
            The dataframe to be saved.
        feature_name:
            The name of the feature. The output file will be ``{feature_name}.f``.
        directory:
            The directory where the feature will be stored.
        with_csv_dump:
            If True, the first 1000 lines are also dumped to ``{feature_name}.csv`` for debugging.
        create_directory:
            If True, create the directory if it does not exist.
        reference_target_variable:
            If not None, the feature is validated against this target before saving.
        overwrite:
            If False and the feather file already exists, RuntimeError will be raised.

    Raises:
        RuntimeError: If the output file already exists and ``overwrite`` is False.
    """
    if create_directory:
        os.makedirs(directory, exist_ok=True)

    if reference_target_variable is not None:
        validate_feature(df, reference_target_variable)

    stem = str(feature_name)
    feather_path = os.path.join(directory, stem + '.f')

    blocked_by_existing_file = not overwrite and os.path.exists(feather_path)
    if blocked_by_existing_file:
        raise RuntimeError('File already exists')

    df.to_feather(feather_path)

    if not with_csv_dump:
        return

    # Small human-readable sample stored next to the feather file.
    csv_path = os.path.join(directory, stem + '.csv')
    df.head(1000).to_csv(csv_path, index=False)
def load_feature(feature_name: Union[int, str], directory: str = './features/',
                 ignore_columns: List[str] = None) -> pd.DataFrame:
    """
    Read a single feature file into a pandas DataFrame.

    Args:
        feature_name:
            The name of the feature (used in ``save_feature``). The file read is ``{feature_name}.f``.
        directory:
            The directory where the feature is stored.
        ignore_columns:
            The list of columns that will be dropped from the loaded dataframe.
            Names that are not present in the dataframe are skipped.

    Returns:
        The feature dataframe
    """
    feather_path = os.path.join(directory, str(feature_name) + '.f')
    frame = pd.read_feather(feather_path)

    if not ignore_columns:
        return frame

    # Only drop the names that actually exist so unknown names never raise.
    droppable = [name for name in ignore_columns if name in frame.columns]
    return frame.drop(droppable, axis=1)
def load_features(base_df: Optional[pd.DataFrame], feature_names: List[Union[int, str]],
                  directory: str = './features/', ignore_columns: List[str] = None,
                  create_directory: bool = True, rename_duplicate: bool = True) -> pd.DataFrame:
    """
    Load several features and return them concatenated column-wise.

    Args:
        base_df:
            The base dataframe. If not None, the resulting dataframe will consist of the base
            columns followed by the loaded feature columns. If None, the first loaded feature
            is used as the base.
        feature_names:
            The list of feature names to be loaded.
        directory:
            The directory where the features are stored.
        ignore_columns:
            The list of columns that will be dropped from each loaded dataframe.
        create_directory:
            If True, create the directory if it does not exist.
        rename_duplicate:
            If True, a duplicated column name is renamed automatically (the feature name is
            appended as a suffix until the name is unique). If False, duplicated columns are
            kept as-is.

    Returns:
        The merged dataframe

    Raises:
        RuntimeError: If a loaded feature has a different number of rows than the base dataframe.
    """
    if create_directory:
        os.makedirs(directory, exist_ok=True)

    loaded = []
    for name in tqdm(feature_names):
        loaded.append(load_feature(name, directory=directory, ignore_columns=ignore_columns))

    if base_df is None:
        # No base given: promote the first feature to be the base.
        base_df, loaded = loaded[0], loaded[1:]
        feature_names = feature_names[1:]

    merged_names = list(base_df.columns)

    for frame, name in zip(loaded, feature_names):
        if len(frame) != len(base_df):
            raise RuntimeError('DataFrame length are different. feature={}'.format(name))

        for col in frame.columns:
            if col not in merged_names:
                merged_names.append(col)
                continue

            warnings.warn('A feature name {} is duplicated.'.format(col))

            if rename_duplicate:
                # Keep appending the feature name until the column name is unique.
                suffix = '_' + str(name)
                while col in merged_names:
                    col += suffix
                warnings.warn('The duplicated name in feature={} will be renamed to {}'.format(name, col))

            merged_names.append(col)

    result = pd.concat([base_df] + loaded, axis=1)
    result.columns = merged_names
    return result
def cached_feature(feature_name: Union[int, str], directory: str = './features/',
                   ignore_columns: Optional[List[str]] = None):
    """
    Decorator to wrap a function which returns pd.DataFrame with a memorizing callable that saves
    dataframe using ``feature_store.save_feature``.

    On each call the wrapper first tries to load the feature from ``directory``. If loading fails
    with an I/O error, the wrapped function is called, its result is saved with ``save_feature``
    and then returned. In both cases the columns listed in ``ignore_columns`` are removed from the
    returned dataframe; the file on disk always contains all columns.

    Args:
        feature_name:
            The name of the feature (used in ``save_feature``).
        directory:
            The directory where the feature is stored.
        ignore_columns:
            The list of columns that will be dropped from the returned dataframe.

    Raises:
        AssertionError: If the wrapped function returns something other than a pd.DataFrame.

    Example:
        >>> from nyaggle.feature_store import cached_feature
        >>>
        >>> @cached_feature('x')
        >>> def make_feature_x(param) -> pd.DataFrame:
        >>>     print('called')
        >>>     ...
        >>>     return df
        >>>
        >>> x = make_feature_x(...)  # if x.f does not exist, call the function and save result to x.f
        "called"
        >>> x = make_feature_x(...)  # load from file in the second time
    """

    def _decorator(fun):
        @functools.wraps(fun)
        def _decorated_fun(*args, **kwargs):
            try:
                return load_feature(feature_name, directory, ignore_columns)
            except (pyarrow.ArrowIOError, IOError):
                # Cache miss. The feature is computed below, outside of this handler, so that
                # an error raised by ``fun`` is not chained to this unrelated I/O error.
                pass

            df = fun(*args, **kwargs)

            # Explicit raise instead of ``assert`` so the check also runs under ``python -O``;
            # the exception type is kept for backward compatibility.
            if not isinstance(df, pd.DataFrame):
                raise AssertionError("returning value of @cached_feature should be pd.DataFrame")

            save_feature(df, feature_name, directory)

            if ignore_columns:
                # Match what a cache hit returns: same columns on the first and later calls.
                return df.drop([c for c in ignore_columns if c in df.columns], axis=1)
            return df

        return _decorated_fun

    return _decorator