Module pdpipe.basic_stages
Basic pdpipe PdPipelineStages.
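The stages defined in this module are typically composed into a pipeline. A minimal, illustrative sketch (not part of the module docstring) chaining two of the stages documented below with pdpipe's PdPipeline:
>>> import pandas as pd; import pdpipe as pdp;
>>> pipeline = pdp.PdPipeline([pdp.ColDrop('char'), pdp.DropNa()])
>>> df = pd.DataFrame([[8, 'a'], [None, 'b']], [1, 2], ['num', 'char'])
>>> pipeline.apply(df)
num
1 8.0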
Expand source code
"""Basic pdpipe PdPipelineStages."""
from typing import Optional, List, Dict, Union, Callable
from collections import deque
import pandas
from strct.dicts import reverse_dict_partial
from pdpipe.core import PdPipelineStage, ColumnsBasedPipelineStage
# from pdpipe.util import out_of_place_col_insert
from pdpipe.shared import (
_interpret_columns_param,
_list_str
)
import pdpipe.cond as cond
from pdpipe.types import ColumnsParamType
from pdpipe.exceptions import FailedConditionError
from pdpipe.cq import ColumnQualifier
class ColDrop(ColumnsBasedPipelineStage):
"""A pipeline stage that drops columns by name.
Parameters
----------
columns : single label, list-like or callable
The label, or an iterable of labels, of columns to drop. Alternatively,
this parameter can be assigned a callable returning an iterable of
labels from an input pandas.DataFrame (see `pdpipe.cq`).
errors : {‘ignore’, ‘raise’}, default ‘raise’
If ‘ignore’, suppress error and existing labels are dropped.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColDrop('num').apply(df)
char
1 a
2 b
"""
def __init__(
self,
columns: ColumnsParamType,
errors: Optional[str] = None,
**kwargs: object,
) -> None:
self._errors = errors
self._post_cond = cond.HasNoColumn(columns)
super_kwargs = {
'columns': columns,
'desc_temp': 'Drop columns {}',
}
super_kwargs.update(**kwargs)
super_kwargs['none_columns'] = 'error'
super().__init__(**super_kwargs)
def _prec(self, df: pandas.DataFrame) -> bool:
if self._errors != 'ignore':
return super()._prec(df)
return True
def _post(self, df: pandas.DataFrame) -> bool:
return self._post_cond(df)
def _transformation(
self, df: pandas.DataFrame, verbose: bool, fit: bool,
) -> pandas.DataFrame:
return df.drop(
self._get_columns(df, fit=fit), axis=1, errors=self._errors)
class ValDrop(ColumnsBasedPipelineStage):
"""A pipeline stage that drops rows by value.
Parameters
----------
values : list-like
A list of the values to drop.
columns : single label, list-like or callable, default None
The label, or an iterable of labels, of columns to check for the given
values. Alternatively, this parameter can be assigned a callable
returning an iterable of labels from an input pandas.DataFrame. See
`pdpipe.cq`. If set to None, all columns are checked.
exclude_columns : label, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the
`columns` parameter. Alternatively, this parameter can be assigned a
callable returning a labels iterable from an input pandas.DataFrame.
See `pdpipe.cq`. Optional. By default no columns are excluded.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[18,11]], [1,2,3], ['a','b'])
>>> pdp.ValDrop([4], 'a').apply(df)
a b
1 1 4
3 18 11
>>> pdp.ValDrop([4]).apply(df)
a b
3 18 11
"""
def __init__(
self,
values: List[object],
columns: ColumnsParamType = None,
**kwargs: object,
) -> None:
self._values = values
self._values_str = _list_str(self._values)
super_kwargs = {
'columns': columns,
'desc_temp': f'Drop values {self._values_str} in columns {{}}',
}
super_kwargs.update(**kwargs)
super_kwargs['none_columns'] = 'all'
super().__init__(**super_kwargs)
def _transformation(
self, df: pandas.DataFrame, verbose: bool, fit: bool,
) -> pandas.DataFrame:
inter_df = df
before_count = len(inter_df)
columns_to_check = self._get_columns(df, fit=fit)
for col in columns_to_check:
inter_df = inter_df[~inter_df[col].isin(self._values)]
if verbose:
print(f"{before_count - len(inter_df)} rows dropped.")
return inter_df
class ValKeep(ColumnsBasedPipelineStage):
"""A pipeline stage that keeps rows by value.
Parameters
----------
values : list-like
A list of the values to keep.
columns : single label, list-like or callable, default None
The label, or an iterable of labels, of columns to check for the given
values. Alternatively, this parameter can be assigned a callable
returning an iterable of labels from an input pandas.DataFrame. See
`pdpipe.cq`. If set to None, all columns are checked.
exclude_columns : single label, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the
`columns` parameter. Alternatively, this parameter can be assigned a
callable returning a labels iterable from an input pandas.DataFrame.
See `pdpipe.cq`. Optional. By default no columns are excluded.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.ValKeep([4, 5], 'a').apply(df)
a b
2 4 5
3 5 11
>>> pdp.ValKeep([4, 5]).apply(df)
a b
2 4 5
"""
def __init__(self, values, columns=None, **kwargs):
self._values = values
self._values_str = _list_str(self._values)
super_kwargs = {
'columns': columns,
'desc_temp': f'Keep values {self._values_str} in columns {{}}',
}
super_kwargs.update(**kwargs)
super_kwargs['none_columns'] = 'all'
super().__init__(**super_kwargs)
def _transformation(self, df, verbose, fit):
inter_df = df
before_count = len(inter_df)
columns_to_check = self._get_columns(df, fit=fit)
for col in columns_to_check:
inter_df = inter_df[inter_df[col].isin(self._values)]
if verbose:
print(f"{before_count - len(inter_df)} rows dropped.")
return inter_df
class ColRename(PdPipelineStage):
"""A pipeline stage that renames a column or columns.
Parameters
----------
rename_mapper : dict-like or callable
Maps old column names to new ones.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColRename({'num': 'len', 'char': 'initial'}).apply(df)
len initial
1 8 a
2 5 b
>>> def renamer(lbl: str):
... if lbl.startswith('n'):
... return 'foo'
... return lbl
>>> pdp.ColRename(renamer).apply(df)
foo char
1 8 a
2 5 b
"""
_DEF_COLDRENAME_EXC_MSG = ("ColRename stage failed because not all columns"
" {} were found in input dataframe.")
def __init__(self, rename_mapper: Union[Dict, Callable], **kwargs):
self._rename_mapper = rename_mapper
try:
columns_str = _list_str(list(rename_mapper.keys()))
mapper_repr = str(rename_mapper)
keys_set = set(self._rename_mapper.keys())
required_labels = list(keys_set)
_tprec = cond.HasAllColumns(required_labels)
except AttributeError: # rename mapper is a callable
mapper_repr = rename_mapper.__name__
doc = rename_mapper.__doc__
if doc is None:
columns_str = f"by func {rename_mapper.__name__}"
else:
columns_str = (
f"by func {rename_mapper.__name__} with "
f"doc: {rename_mapper.__doc__}"
)
_tprec = cond.AlwaysTrue()
try:
suffix = 's' if len(rename_mapper) > 1 else ''
except TypeError:
suffix = 's'
self._tprec = _tprec
super_kwargs = {
'exmsg': ColRename._DEF_COLDRENAME_EXC_MSG.format(columns_str),
'desc': f"Rename column{suffix} with {mapper_repr}",
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df):
return self._tprec(df)
def _transform(self, df, verbose):
return df.rename(columns=self._rename_mapper)
class DropNa(PdPipelineStage):
"""A pipeline stage that drops null values.
Supports all parameters supported by the pandas.dropna function.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.DropNa().apply(df)
a b
1 1 4.0
3 1 11.0
"""
_DEF_DROPNA_EXC_MSG = "DropNa stage failed."
_DROPNA_KWARGS = ['axis', 'how', 'thresh', 'subset', 'inplace']
def __init__(self, **kwargs):
common = set(kwargs.keys()).intersection(DropNa._DROPNA_KWARGS)
self.dropna_kwargs = {key: kwargs.pop(key) for key in common}
super_kwargs = {
'exmsg': DropNa._DEF_DROPNA_EXC_MSG,
'desc': "Drops null values."
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df):
return True
def _transform(self, df, verbose):
before_count = len(df)
ncols_before = len(df.columns)
inter_df = df.dropna(**self.dropna_kwargs)
if verbose:
print(
f"{before_count - len(inter_df)} rows, "
f"{ncols_before - len(inter_df.columns)} columns dropeed"
)
return inter_df
class SetIndex(PdPipelineStage):
"""A pipeline stage that set existing columns as index.
Supports all parameter supported by pandas.set_index function except for
`inplace`.
Example
-------
>> import pandas as pd; import pdpipe as pdp;
>> df = pd.DataFrame([[1,4],[3, 11]], [1,2], ['a','b'])
>> pdp.SetIndex('a').apply(df)
b
a
1 4
3 11
"""
_DEF_SETIDX_EXC_MSG = "SetIndex stage failed."
_DEF_SETIDX_APP_MSG = "Setting indexes..."
_SETINDEX_KWARGS = ['drop', 'append', 'verify_integrity']
def __init__(self, keys, **kwargs):
common = set(kwargs.keys()).intersection(SetIndex._SETINDEX_KWARGS)
self.setindex_kwargs = {key: kwargs.pop(key) for key in common}
self.keys = keys
if hasattr(keys, '__iter__') and not isinstance(keys, str):
_tprec = cond.HasAllColumns(list(keys))
else:
_tprec = cond.HasAllColumns([keys])
self._tprec = _tprec
super_kwargs = {
'exmsg': SetIndex._DEF_SETIDX_EXC_MSG,
'desc': "Set indexes."
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df):
return self._tprec(df)
def _transform(self, df, verbose):
return df.set_index(keys=self.keys, **self.setindex_kwargs)
class FreqDrop(PdPipelineStage):
"""A pipeline stage that drops rows by value frequency.
Parameters
----------
threshold : int
The minimum frequency required for a value to be kept.
column : str
The name of the column to check for the given value frequency.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[1,11]], [1,2,3], ['a','b'])
>>> pdp.FreqDrop(2, 'a').apply(df)
a b
1 1 4
3 1 11
"""
_DEF_FREQDROP_EXC_MSG = ("FreqDrop stage failed because column {} was not"
" found in input dataframe.")
_DEF_FREQDROP_DESC = "Drop values with frequency < {} in column {}."
def __init__(self, threshold: int, column: str, **kwargs):
self._threshold = threshold
self._column = column
super_kwargs = {
'exmsg': FreqDrop._DEF_FREQDROP_EXC_MSG.format(self._column),
'desc': FreqDrop._DEF_FREQDROP_DESC.format(
self._threshold, self._column)
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df):
return self._column in df.columns
def _transform(self, df, verbose):
inter_df = df
before_count = len(inter_df)
valcount = df[self._column].value_counts()
to_drop = valcount[valcount < self._threshold].index
inter_df = inter_df[~inter_df[self._column].isin(to_drop)]
if verbose:
print(f"{before_count - len(inter_df)} rows dropped.")
return inter_df
class ColReorder(PdPipelineStage):
"""A pipeline stage that reorders columns.
Parameters
----------
positions : dict
A mapping of column names to their desired positions after reordering.
Columns not included in the mapping will maintain their relative
positions over the non-mapped columns.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,4,3,7]], columns=['a', 'b', 'c', 'd'])
>>> pdp.ColReorder({'b': 0, 'c': 3}).apply(df)
b a d c
0 4 8 7 3
"""
_DEF_ORD_EXC_MSG = ("ColReorder stage failed because not all columns in {}"
" were found in input dataframe.")
def __init__(self, positions, **kwargs):
self._col_to_pos = positions
self._pos_to_col = reverse_dict_partial(positions)
super_kwargs = {
'exmsg': ColReorder._DEF_ORD_EXC_MSG.format(self._col_to_pos),
'desc': f"Reorder columns by {self._col_to_pos}",
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df):
return set(self._col_to_pos.keys()).issubset(df.columns)
def _transform(self, df, verbose):
cols = df.columns
map_cols = list(self._col_to_pos.keys())
non_map_cols = deque(x for x in cols if x not in map_cols)
new_columns = []
try:
for pos in range(len(cols)):
if pos in self._pos_to_col:
new_columns.append(self._pos_to_col[pos])
else:
new_columns.append(non_map_cols.popleft())
return df[new_columns]
except (IndexError):
raise ValueError(f"Bad positions mapping given: {new_columns}")
class RowDrop(ColumnsBasedPipelineStage):
"""A pipeline stage that drops rows by callable conditions.
Parameters
----------
conditions : list-like or dict
The list of conditions that make a row eligible to be dropped. Each
condition must be a callable that takes a cell value and returns a bool
value. If a list of callables is given, the conditions are checked for
each column value of each row. If a dict mapping column labels to
callables is given, then each condition is only checked for the column
values of the designated column.
reduce : 'any', 'all' or 'xor', default 'any'
Determines how row conditions are reduced. If set to 'all', a row must
satisfy all given conditions to be dropped. If set to 'any', rows
satisfying at least one of the conditions are dropped. If set to 'xor',
rows satisfying exactly one of the conditions will be dropped. Set to
'any' by default.
columns : single label, iterable or callable, optional
The label, or an iterable of labels, of columns. Alternatively,
this parameter can be assigned a callable returning an iterable of
labels from an input pandas.DataFrame. See `pdpipe.cq`. If given,
input conditions will be applied to the sub-dataframe made up of
these columns to determine which rows to drop. Ignored if `conditions`
is provided with a dict object. If `conditions` is a list and this
parameter is not provided, all columns are checked (unless
`exclude_columns` is additionally provided)
exclude_columns : single label, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the
`columns` parameter. Alternatively, this parameter can be assigned a
callable returning a labels iterable from an input pandas.DataFrame.
See `pdpipe.cq`. Optional. By default no columns are excluded.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.RowDrop([lambda x: x < 2]).apply(df)
a b
2 4 5
3 5 11
>>> pdp.RowDrop({'a': lambda x: x == 4}).apply(df)
a b
1 1 4
3 5 11
"""
_REDUCERS = {
'all': all,
'any': any,
'xor': lambda x: sum(x) == 1
}
class DictRowCond(object):
"""Filter rows by a dict of conditions."""
def __init__(self, conditions, reducer):
self.conditions = conditions
self.reducer = reducer
def __call__(self, row):
res = [cond(row[lbl]) for lbl, cond in self.conditions.items()]
return self.reducer(res)
class ListRowCond(object):
"""Filter rows by a list of conditions."""
def __init__(self, conditions, reducer):
self.conditions = conditions
self.reducer = reducer
def __call__(self, row):
res = [self.reducer(row.apply(cond)) for cond in self.conditions]
return self.reducer(res)
def _row_condition_builder(self, conditions, reduce):
reducer = RowDrop._REDUCERS[reduce]
if self._cond_is_dict:
row_cond = RowDrop.DictRowCond(
conditions=conditions, reducer=reducer)
else:
row_cond = RowDrop.ListRowCond(
conditions=conditions, reducer=reducer)
return row_cond
def __init__(self, conditions, reduce=None, columns=None, **kwargs):
self._conditions = conditions
if reduce is None:
reduce = 'any'
self._reduce = reduce
if reduce not in RowDrop._REDUCERS.keys():
raise ValueError((
"{} is an unsupported argument for the 'reduce' parameter of "
"the RowDrop constructor!").format(reduce))
self._cond_is_dict = isinstance(conditions, dict)
if self._cond_is_dict:
valid = all([callable(cond) for cond in conditions.values()])
if not valid:
raise ValueError(
"Condition dicts given to RowDrop must map to callables!")
columns = list(conditions.keys())
else:
valid = all([callable(cond) for cond in conditions])
if not valid:
raise ValueError(
"RowDrop condition lists can contain only callables!")
self._row_cond = self._row_condition_builder(conditions, reduce)
super_kwargs = {
'columns': columns,
'desc_temp': 'Drop rows in columns {} by conditions',
}
super_kwargs.update(**kwargs)
super_kwargs['none_columns'] = 'all'
super().__init__(**super_kwargs)
def _transformation(self, df, verbose, fit):
before_count = len(df)
columns = self._get_columns(df, fit=fit)
subdf = df[columns]
drop_index = ~subdf.apply(self._row_cond, axis=1)
inter_df = df[drop_index]
if verbose:
print(f"{before_count - len(inter_df)} rows dropped.")
return inter_df
class Schematize(PdPipelineStage):
"""Enforces a column schema on input dataframes.
Parameters
----------
columns: sequence of labels
The dataframe schema to enforce on input dataframes.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[2, 4, 8],[3, 6, 9]], [1, 2], ['a', 'b', 'c'])
>>> pdp.Schematize(['a', 'c']).apply(df)
a c
1 2 8
2 3 9
>>> pdp.Schematize(['c', 'b']).apply(df)
c b
1 8 4
2 9 6
"""
def __init__(self, columns, **kwargs):
self._columns = _interpret_columns_param(columns)
self._columns_str = _list_str(self._columns)
desc = (
f"Transform input dataframes to the following schema: "
f"{self._columns_str}"
)
exmsg = (
f"Not all required columns {self._columns_str} "
f"found in input dataframe!"
)
super_kwargs = {
'exmsg': exmsg,
'desc': desc,
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df):
return set(self._columns).issubset(df.columns)
def _transform(self, df, verbose=None):
return df[self._columns]
class DropDuplicates(ColumnsBasedPipelineStage):
"""Drop duplicates in the given columns.
Parameters
----------
columns: column label or sequence of labels, optional
The labels of the columns to consider for duplication drop. If not
populated, duplicates are dropped from all columns.
exclude_columns : object, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the
`columns` parameter. Alternatively, this parameter can be assigned a
callable returning a labels iterable from an input pandas.DataFrame.
See `pdpipe.cq`. Optional. By default no columns are excluded.
Examples
--------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8, 1],[8, 2], [9, 2]], [1,2,3], ['a', 'b'])
>>> pdp.DropDuplicates('a').apply(df)
a b
1 8 1
3 9 2
"""
def __init__(self, columns=None, **kwargs):
super_kwargs = {
'columns': columns,
'desc_temp': 'Drop duplicates in columns {}',
}
super_kwargs.update(**kwargs)
super_kwargs['none_columns'] = 'all'
super().__init__(**super_kwargs)
def _transformation(self, df, verbose, fit):
columns = self._get_columns(df, fit=fit)
inter_df = df.drop_duplicates(subset=columns)
if verbose:
print(f"{len(df) - len(inter_df)} rows dropped.")
return inter_df
class ColumnDtypeEnforcer(PdPipelineStage):
"""A pipeline stage enforcing column dtypes.
Parameters
----------
column_to_dtype: dict of labels / ColumnQualifiers to dtypes
Use {col: dtype, …}, where col is a column label and dtype is a
numpy.dtype or Python type to cast one or more of the DataFrame’s
columns to column-specific types. Alternatively, you can provide
`ColumnQualifier` objects as keys. If at least one such key is present,
the lbl-to-dtype dict is dynamically inferred each time the pipeline
stage is applied (note that `ColumnQualifier` objects are fittable by
default, so to have column labels re-inferred after the first stage
application you'll have to set `fittable=False` for the
`ColumnQualifier` you use, see `pdpipe.cq`).
errors: {‘raise’, ‘ignore’}, default ‘raise’
Control raising of exceptions on invalid data for provided dtype.
- raise : allow exceptions to be raised
- ignore : suppress exceptions. On error return original object.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'initial'])
>>> pdp.ColumnDtypeEnforcer({'num': float}).apply(df)
num initial
1 8.0 a
2 5.0 b
>>> pdp.ColumnDtypeEnforcer({pdp.cq.StartWith('n'): float}).apply(df)
num initial
1 8.0 a
2 5.0 b
"""
_DEF_COL_DTYPE_ENF_EXC_MSG = (
"ColumnDtypeEnforcer stage failed because not all columns"
" {} were found in input dataframe.")
def __init__(
self,
column_to_dtype: Dict,
errors: Optional[str] = 'raise',
**kwargs: object,
) -> None:
# if none of the keys in column_to_dtype is a ColumnQualifier
if not any(isinstance(
x, ColumnQualifier) for x in column_to_dtype.keys()):
# it's a static map; use it as is
self._column_to_dtype = column_to_dtype
keys_set = set(column_to_dtype.keys())
_tprec = cond.HasAllColumns(list(keys_set))
else:
# else, it's at least partly dynamic, and will have to infer it
# on run time
self._dynamic_column_to_dtype = column_to_dtype
_tprec = cond.AlwaysTrue()
self._tprec = _tprec
self._errors = errors
columns_str = _list_str(list(column_to_dtype.keys()))
suffix = 's' if len(column_to_dtype) > 1 else ''
super_kwargs = {
'exmsg': ColumnDtypeEnforcer._DEF_COL_DTYPE_ENF_EXC_MSG.format(
columns_str),
'desc': f"Enforce column{suffix} dtype with {column_to_dtype}",
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _col_to_dtype_from_df(self, df: pandas.DataFrame) -> Dict:
try:
return self._column_to_dtype
except AttributeError:
column_to_dtype = {}
for k, dtype in self._dynamic_column_to_dtype.items():
try:
column_to_dtype.update({
lbl: dtype
for lbl in k(df)
})
except TypeError: # k is not a callable
column_to_dtype[k] = dtype
return column_to_dtype
def _prec(self, df: pandas.DataFrame) -> bool:
return self._tprec(df)
def _transform(
self,
df: pandas.DataFrame,
verbose: bool,
) -> pandas.DataFrame:
lbl_to_dtype = self._col_to_dtype_from_df(df)
return df.astype(
dtype=lbl_to_dtype,
copy=True,
errors=self._errors,
)
class ConditionValidator(PdPipelineStage):
"""A pipeline stage that validates boolean conditions on dataframes.
The stage does not change the input dataframe in any way.
The constructor expects either a single callable or a list-like of callable
objects, and checks that all these callables return True - meaning all
defined conditions hold - for input dataframes.
Naturally, pdpipe `Condition` objects from the `pdpipe.cond` module can be used.
Parameters
----------
conditions : callable or list-like of callable
The conditions to check for input dataframes. Naturally, pdpipe
`Condition` objects from the `pdpipe.cond` module can be used.
reducer : callable, optional
The callable that reduces the list of boolean results to a single
result. By default the built-in `all` function is used, so all
conditions must hold for this pipeline stage to validate an input
dataframe. The built-in `any` function may be used to validate at least
one condition holds, and of course custom reducing functions can be
used.
errors : str, default 'raise'
If set to 'raise', the default, then if the resulting boolean is
False a FailedConditionError is raised on stage application. If set to
'ignore', then conditions are checked, the results are printed if the
application was called with `verbose=True`, and pipeline application
continues. Any other value is interpreted as 'raise'.
Example
-------
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.ConditionValidator(lambda df: len(df.columns) == 5).apply(df)
Traceback (most recent call last):
...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
>>> pdp.ConditionValidator(pdp.cond.HasNoMissingValues()).apply(df)
Traceback (most recent call last):
...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
""" # noqa: E501
def __init__(
self,
conditions: Union[Callable, List[Callable]],
reducer: Optional[Callable] = all,
errors: Optional[str] = 'raise',
**kwargs: object,
):
if callable(conditions):
conditions = [conditions]
self._conditions = conditions
self._reducer = reducer
self._errors = 'raise'
self._raise = True
if errors == 'ignore':
self._errors = 'ignore'
self._raise = False
super_kwargs = {
'desc': "Validates conditions"
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df):
return True
def _transform(self, df, verbose):
results = []
for cond_obj in self._conditions:
try:
res = cond_obj(df)
except Exception as e:
raise ValueError(
f"Supplied condition raised a {e} exception when applied "
"to input dataframe!"
) from e
if verbose and not res:
cond_repr = cond_obj.__doc__
if cond_repr is None:
cond_repr = str(cond_obj)
print(f" + Condition failed for input dataframe: {cond_repr}")
results.append(res)
reduced_result = self._reducer(results)
if self._raise and not reduced_result:
raise FailedConditionError(
"ConditionValidator stage failed; some conditions did not hold"
" for the input dataframe!")
return df
Classes
class ColDrop (columns: Union[object, List[object], Callable], errors: Optional[str] = None, **kwargs: object)
A pipeline stage that drops columns by name.

Parameters
columns : single label, list-like or callable
    The label, or an iterable of labels, of columns to drop. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame (see pdpipe.cq).
errors : {‘ignore’, ‘raise’}, default ‘raise’
    If ‘ignore’, suppress error and existing labels are dropped.

Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColDrop('num').apply(df)
char
1 a
2 b
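The errors parameter follows pandas.DataFrame.drop semantics. A minimal sketch (not in the original docstring; 'foo' is a hypothetical label absent from the frame) of how errors='ignore' lets the stage skip missing labels:
>>> pdp.ColDrop(['num', 'foo'], errors='ignore').apply(df)
char
1 a
2 b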
Ancestors
- ColumnsBasedPipelineStage
- PdPipelineStage
- abc.ABC
Inherited members
- ColumnsBasedPipelineStage: apply, description, fit, fit_transform, transform
class ColRename (rename_mapper: Union[Dict[~KT, ~VT], Callable], **kwargs)
A pipeline stage that renames a column or columns.

Parameters
rename_mapper : dict-like or callable
    Maps old column names to new ones.

Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColRename({'num': 'len', 'char': 'initial'}).apply(df)
len initial
1 8 a
2 5 b
>>> def renamer(lbl: str):
...     if lbl.startswith('n'):
...         return 'foo'
...     return lbl
>>> pdp.ColRename(renamer).apply(df)
foo char
1 8 a
2 5 b
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
- PdPipelineStage: apply, description, fit, fit_transform, transform
class ColReorder (positions, **kwargs)
A pipeline stage that reorders columns.

Parameters
positions : dict
    A mapping of column names to their desired positions after reordering. Columns not included in the mapping will maintain their relative positions over the non-mapped columns.

Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,4,3,7]], columns=['a', 'b', 'c', 'd'])
>>> pdp.ColReorder({'b': 0, 'c': 3}).apply(df)
b a d c
0 4 8 7 3
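A further illustrative sketch (not from the original docs) showing that a single mapped column is moved while the remaining columns keep their relative order:
>>> pdp.ColReorder({'a': 3}).apply(df)
b c d a
0 4 3 7 8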
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
- PdPipelineStage: apply, description, fit, fit_transform, transform
class ColumnDtypeEnforcer (column_to_dtype: Dict[~KT, ~VT], errors: Optional[str] = 'raise', **kwargs: object)
A pipeline stage enforcing column dtypes.

Parameters
column_to_dtype : dict of labels / ColumnQualifiers to dtypes
    Use {col: dtype, …}, where col is a column label and dtype is a numpy.dtype or Python type to cast one or more of the DataFrame’s columns to column-specific types. Alternatively, you can provide ColumnQualifier objects as keys. If at least one such key is present, the lbl-to-dtype dict is dynamically inferred each time the pipeline stage is applied (note that ColumnQualifier objects are fittable by default, so to have column labels re-inferred after the first stage application you'll have to set fittable=False for the ColumnQualifier you use, see pdpipe.cq).
errors : {‘raise’, ‘ignore’}, default ‘raise’
    Control raising of exceptions on invalid data for provided dtype.
    - raise : allow exceptions to be raised
    - ignore : suppress exceptions. On error return original object.

Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'initial'])
>>> pdp.ColumnDtypeEnforcer({'num': float}).apply(df)
num initial
1 8.0 a
2 5.0 b
>>> pdp.ColumnDtypeEnforcer({pdp.cq.StartWith('n'): float}).apply(df)
num initial
1 8.0 a
2 5.0 b
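A small illustrative sketch (not from the original docs) of errors='ignore': the stage simply forwards to pandas.DataFrame.astype, so when a cast fails the original data is returned unchanged:
>>> pdp.ColumnDtypeEnforcer({'initial': float}, errors='ignore').apply(df)
num initial
1 8 a
2 5 b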
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
- PdPipelineStage: apply, description, fit, fit_transform, transform
class ConditionValidator (conditions: Union[Callable, List[Callable]], reducer: Optional[Callable] = <built-in function all>, errors: Optional[str] = 'raise', **kwargs: object)
A pipeline stage that validates boolean conditions on dataframes.

The stage does not change the input dataframe in any way.

The constructor expects either a single callable or a list-like of callable objects, and checks that all these callables return True - meaning all defined conditions hold - for input dataframes.

Naturally, pdpipe Condition objects from the pdpipe.cond module can be used.

Parameters
conditions : callable or list-like of callable
    The conditions to check for input dataframes. Naturally, pdpipe Condition objects from the pdpipe.cond module can be used.
reducer : callable, optional
    The callable that reduces the list of boolean results to a single result. By default the built-in all function is used, so all conditions must hold for this pipeline stage to validate an input dataframe. The built-in any function may be used to validate that at least one condition holds, and of course custom reducing functions can be used.
errors : str, default 'raise'
    If set to 'raise', the default, then if the resulting boolean is False a FailedConditionError is raised on stage application. If set to 'ignore', then conditions are checked, the results are printed if the application was called with verbose=True, and pipeline application continues. Any other value is interpreted as 'raise'.

Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.ConditionValidator(lambda df: len(df.columns) == 5).apply(df)
Traceback (most recent call last):
...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
>>> pdp.ConditionValidator(pdp.cond.HasNoMissingValues()).apply(df)
Traceback (most recent call last):
...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
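An illustrative sketch (not part of the original docs) of the reducer parameter, using the built-in any so that a single holding condition is enough for validation to pass:
>>> validator = pdp.ConditionValidator(
...     [lambda df: 'a' in df.columns, lambda df: 'z' in df.columns],
...     reducer=any,
... )
>>> validator.apply(df).equals(df)
True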
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
- PdPipelineStage: apply, description, fit, fit_transform, transform
class DropDuplicates (columns=None, **kwargs)
Drop duplicates in the given columns.

Parameters
columns : column label or sequence of labels, optional
    The labels of the columns to consider for duplication drop. If not populated, duplicates are dropped from all columns.
exclude_columns : object, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.

Examples
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8, 1],[8, 2], [9, 2]], [1,2,3], ['a', 'b'])
>>> pdp.DropDuplicates('a').apply(df)
a b
1 8 1
3 9 2
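A short sketch (not from the original docs) of the default behaviour when columns is left unset, so duplicates are considered over all columns:
>>> df2 = pd.DataFrame([[8, 1], [8, 1], [9, 2]], [1, 2, 3], ['a', 'b'])
>>> pdp.DropDuplicates().apply(df2)
a b
1 8 1
3 9 2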
Ancestors
- ColumnsBasedPipelineStage
- PdPipelineStage
- abc.ABC
Inherited members
- ColumnsBasedPipelineStage: apply, description, fit, fit_transform, transform
class DropNa (**kwargs)
A pipeline stage that drops null values.

Supports all parameters supported by the pandas.dropna function.

Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.DropNa().apply(df)
a b
1 1 4.0
3 1 11.0
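A brief sketch (not from the original docs) showing how pandas.dropna keyword arguments are forwarded; here axis=1 drops the column containing a null value:
>>> pdp.DropNa(axis=1).apply(df)
a
1 1
2 4
3 1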
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
- PdPipelineStage: apply, description, fit, fit_transform, transform
class FreqDrop (threshold: int, column: str, **kwargs)
A pipeline stage that drops rows by value frequency.

Parameters
threshold : int
    The minimum frequency required for a value to be kept.
column : str
    The name of the column to check for the given value frequency.

Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[1,11]], [1,2,3], ['a','b'])
>>> pdp.FreqDrop(2, 'a').apply(df)
a b
1 1 4
3 1 11
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
- PdPipelineStage: apply, description, fit, fit_transform, transform
class RowDrop (conditions, reduce=None, columns=None, **kwargs)
A pipeline stage that drops rows by callable conditions.

Parameters
conditions : list-like or dict
    The list of conditions that make a row eligible to be dropped. Each condition must be a callable that takes a cell value and returns a bool value. If a list of callables is given, the conditions are checked for each column value of each row. If a dict mapping column labels to callables is given, then each condition is only checked for the column values of the designated column.
reduce : 'any', 'all' or 'xor', default 'any'
    Determines how row conditions are reduced. If set to 'all', a row must satisfy all given conditions to be dropped. If set to 'any', rows satisfying at least one of the conditions are dropped. If set to 'xor', rows satisfying exactly one of the conditions will be dropped. Set to 'any' by default.
columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If given, input conditions will be applied to the sub-dataframe made up of these columns to determine which rows to drop. Ignored if conditions is provided with a dict object. If conditions is a list and this parameter is not provided, all columns are checked (unless exclude_columns is additionally provided).
exclude_columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.

Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.RowDrop([lambda x: x < 2]).apply(df)
a b
2 4 5
3 5 11
>>> pdp.RowDrop({'a': lambda x: x == 4}).apply(df)
a b
1 1 4
3 5 11
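A further sketch (not part of the original docstring) of the columns parameter, restricting a list condition so that only values of column 'b' are examined:
>>> pdp.RowDrop([lambda x: x > 10], columns=['b']).apply(df)
a b
1 1 4
2 4 5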
Ancestors
- ColumnsBasedPipelineStage
- PdPipelineStage
- abc.ABC
Class variables
var DictRowCond
    Filter rows by a dict of conditions.
var ListRowCond
    Filter rows by a list of conditions.
Inherited members
- ColumnsBasedPipelineStage: apply, description, fit, fit_transform, transform
class Schematize (columns, **kwargs)
-
Enforces a column schema on input dataframes.
Parameters
columns : sequence of labels
- The dataframe schema to enforce on input dataframes.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[2, 4, 8],[3, 6, 9]], [1, 2], ['a', 'b', 'c'])
>>> pdp.Schematize(['a', 'c']).apply(df)
   a  c
1  2  8
2  3  9
>>> pdp.Schematize(['c', 'b']).apply(df)
   c  b
1  8  4
2  9  6
Expand source code
class Schematize(PdPipelineStage):
    """Enforces a column schema on input dataframes.

    Parameters
    ----------
    columns: sequence of labels
        The dataframe schema to enforce on input dataframes.

    Example
    -------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[2, 4, 8],[3, 6, 9]], [1, 2], ['a', 'b', 'c'])
    >>> pdp.Schematize(['a', 'c']).apply(df)
       a  c
    1  2  8
    2  3  9
    >>> pdp.Schematize(['c', 'b']).apply(df)
       c  b
    1  8  4
    2  9  6
    """

    def __init__(self, columns, **kwargs):
        self._columns = _interpret_columns_param(columns)
        self._columns_str = _list_str(self._columns)
        desc = (
            f"Transform input dataframes to the following schema: "
            f"{self._columns_str}"
        )
        exmsg = (
            f"Not all required columns {self._columns_str} "
            f"found in input dataframe!"
        )
        super_kwargs = {
            'exmsg': exmsg,
            'desc': desc,
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, df):
        return set(self._columns).issubset(df.columns)

    def _transform(self, df, verbose=None):
        return df[self._columns]
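A minimal sketch (assuming the standard pdp.PdPipeline constructor documented elsewhere in this module) of using Schematize as the last stage of a pipeline to pin down the output column subset and order:

import pandas as pd
import pdpipe as pdp

df = pd.DataFrame([[2, 4, 8], [3, 6, 9]], [1, 2], ['a', 'b', 'c'])

pipeline = pdp.PdPipeline([
    pdp.ColDrop('b'),            # drop an unwanted column
    pdp.Schematize(['c', 'a']),  # enforce the ['c', 'a'] column order
])
print(pipeline.apply(df))
#    c  a
# 1  8  2
# 2  9  3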
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
- PdPipelineStage: apply, description, fit, fit_transform, transform
class SetIndex (keys, **kwargs)
-
A pipeline stage that sets existing columns as the index.
Supports all parameters supported by the pandas.set_index function except for
inplace.
Example
>> import pandas as pd; import pdpipe as pdp;
>> df = pd.DataFrame([[1,4],[3, 11]], [1,2], ['a','b'])
>> pdp.SetIndex('a').apply(df)
    b
a
1   4
3  11
Expand source code
class SetIndex(PdPipelineStage):
    """A pipeline stage that sets existing columns as the index.

    Supports all parameters supported by the pandas.set_index function
    except for `inplace`.

    Example
    -------
    >> import pandas as pd; import pdpipe as pdp;
    >> df = pd.DataFrame([[1,4],[3, 11]], [1,2], ['a','b'])
    >> pdp.SetIndex('a').apply(df)
        b
    a
    1   4
    3  11
    """

    _DEF_SETIDX_EXC_MSG = "SetIndex stage failed."
    _DEF_SETIDX_APP_MSG = "Setting indexes..."
    _SETINDEX_KWARGS = ['drop', 'append', 'verify_integrity']

    def __init__(self, keys, **kwargs):
        common = set(kwargs.keys()).intersection(SetIndex._SETINDEX_KWARGS)
        self.setindex_kwargs = {key: kwargs.pop(key) for key in common}
        self.keys = keys
        if hasattr(keys, '__iter__') and not isinstance(keys, str):
            _tprec = cond.HasAllColumns(list(keys))
        else:
            _tprec = cond.HasAllColumns([keys])
        self._tprec = _tprec
        super_kwargs = {
            'exmsg': SetIndex._DEF_SETIDX_EXC_MSG,
            'desc': "Set indexes."
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, df):
        return self._tprec(df)

    def _transform(self, df, verbose):
        return df.set_index(keys=self.keys, **self.setindex_kwargs)
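A short sketch of forwarding one of the supported pandas.set_index keywords ('drop', 'append' or 'verify_integrity', per _SETINDEX_KWARGS above); the printed output below is illustrative:

import pandas as pd
import pdpipe as pdp

df = pd.DataFrame([[1, 4], [3, 11]], [1, 2], ['a', 'b'])

# drop=False keeps 'a' as a regular column while also using it as the index.
print(pdp.SetIndex('a', drop=False).apply(df))
#    a   b
# a
# 1  1   4
# 3  3  11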
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
- PdPipelineStage: apply, description, fit, fit_transform, transform
class ValDrop (values: List[object], columns: Union[object, List[object], Callable] = None, **kwargs: object)
-
A pipeline stage that drops rows by value.
Parameters
values : list-like
- A list of the values to drop.
columns : single label, list-like or callable, default None
- The label, or an iterable of labels, of columns to check for the given
values. Alternatively, this parameter can be assigned a callable
returning an iterable of labels from an input pandas.DataFrame. See
pdpipe.cq. If set to None, all columns are checked.
exclude_columns : label, iterable or callable, optional
- The label, or an iterable of labels, of columns to exclude, given the
columns parameter. Alternatively, this parameter can be assigned a
callable returning a labels iterable from an input pandas.DataFrame.
See pdpipe.cq. Optional. By default no columns are excluded.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[18,11]], [1,2,3], ['a','b'])
>>> pdp.ValDrop([4], 'a').apply(df)
    a   b
1   1   4
3  18  11
>>> pdp.ValDrop([4]).apply(df)
    a   b
3  18  11
Expand source code
class ValDrop(ColumnsBasedPipelineStage):
    """A pipeline stage that drops rows by value.

    Parameters
    ----------
    values : list-like
        A list of the values to drop.
    columns : single label, list-like or callable, default None
        The label, or an iterable of labels, of columns to check for the
        given values. Alternatively, this parameter can be assigned a
        callable returning an iterable of labels from an input
        pandas.DataFrame. See `pdpipe.cq`. If set to None, all columns are
        checked.
    exclude_columns : label, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given
        the `columns` parameter. Alternatively, this parameter can be
        assigned a callable returning a labels iterable from an input
        pandas.DataFrame. See `pdpipe.cq`. Optional. By default no columns
        are excluded.

    Example
    -------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1,4],[4,5],[18,11]], [1,2,3], ['a','b'])
    >>> pdp.ValDrop([4], 'a').apply(df)
        a   b
    1   1   4
    3  18  11
    >>> pdp.ValDrop([4]).apply(df)
        a   b
    3  18  11
    """

    def __init__(
        self,
        values: List[object],
        columns: ColumnsParamType = None,
        **kwargs: object,
    ) -> None:
        self._values = values
        self._values_str = _list_str(self._values)
        super_kwargs = {
            'columns': columns,
            'desc_temp': f'Drop values {self._values_str} in columns {{}}',
        }
        super_kwargs.update(**kwargs)
        super_kwargs['none_columns'] = 'all'
        super().__init__(**super_kwargs)

    def _transformation(
        self, df: pandas.DataFrame, verbose: bool, fit: bool,
    ) -> pandas.DataFrame:
        inter_df = df
        before_count = len(inter_df)
        columns_to_check = self._get_columns(df, fit=fit)
        for col in columns_to_check:
            inter_df = inter_df[~inter_df[col].isin(self._values)]
        if verbose:
            print(f"{before_count - len(inter_df)} rows dropped.")
        return inter_df
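A hedged sketch of the exclude_columns parameter documented above: with columns left as None, every column except the excluded one is checked, so the value 4 in column 'b' of the first row does not cause a drop:

import pandas as pd
import pdpipe as pdp

df = pd.DataFrame([[1, 4], [4, 5], [18, 11]], [1, 2, 3], ['a', 'b'])

# Equivalent here to ValDrop([4], 'a'): only column 'a' is checked.
print(pdp.ValDrop([4], exclude_columns='b').apply(df))
#     a   b
# 1   1   4
# 3  18  11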
Ancestors
- ColumnsBasedPipelineStage
- PdPipelineStage
- abc.ABC
Inherited members
- ColumnsBasedPipelineStage: apply, description, fit, fit_transform, transform
class ValKeep (values, columns=None, **kwargs)
-
A pipeline stage that keeps rows by value.
Parameters
values : list-like
- A list of the values to keep.
columns : single label, list-like or callable, default None
- The label, or an iterable of labels, of columns to check for the given
values. Alternatively, this parameter can be assigned a callable
returning an iterable of labels from an input pandas.DataFrame. See
pdpipe.cq. If set to None, all columns are checked.
exclude_columns : single label, iterable or callable, optional
- The label, or an iterable of labels, of columns to exclude, given the
columns parameter. Alternatively, this parameter can be assigned a
callable returning a labels iterable from an input pandas.DataFrame.
See pdpipe.cq. Optional. By default no columns are excluded.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.ValKeep([4, 5], 'a').apply(df)
   a   b
2  4   5
3  5  11
>>> pdp.ValKeep([4, 5]).apply(df)
   a  b
2  4  5
Expand source code
class ValKeep(ColumnsBasedPipelineStage):
    """A pipeline stage that keeps rows by value.

    Parameters
    ----------
    values : list-like
        A list of the values to keep.
    columns : single label, list-like or callable, default None
        The label, or an iterable of labels, of columns to check for the
        given values. Alternatively, this parameter can be assigned a
        callable returning an iterable of labels from an input
        pandas.DataFrame. See `pdpipe.cq`. If set to None, all columns are
        checked.
    exclude_columns : single label, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given
        the `columns` parameter. Alternatively, this parameter can be
        assigned a callable returning a labels iterable from an input
        pandas.DataFrame. See `pdpipe.cq`. Optional. By default no columns
        are excluded.

    Example
    -------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
    >>> pdp.ValKeep([4, 5], 'a').apply(df)
       a   b
    2  4   5
    3  5  11
    >>> pdp.ValKeep([4, 5]).apply(df)
       a  b
    2  4  5
    """

    def __init__(self, values, columns=None, **kwargs):
        self._values = values
        self._values_str = _list_str(self._values)
        super_kwargs = {
            'columns': columns,
            'desc_temp': f'Keep values {self._values_str} in columns {{}}',
        }
        super_kwargs.update(**kwargs)
        super_kwargs['none_columns'] = 'all'
        super().__init__(**super_kwargs)

    def _transformation(self, df, verbose, fit):
        inter_df = df
        before_count = len(inter_df)
        columns_to_check = self._get_columns(df, fit=fit)
        for col in columns_to_check:
            inter_df = inter_df[inter_df[col].isin(self._values)]
        if verbose:
            print(f"{before_count - len(inter_df)} rows dropped.")
        return inter_df
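A brief sketch composing ValKeep with another stage; this assumes pdpipe's stage-addition syntax (adding two stages yields a pipeline), which is not shown in this docstring:

import pandas as pd
import pdpipe as pdp

df = pd.DataFrame([[1, 4], [4, 5], [5, 11]], [1, 2, 3], ['a', 'b'])

# Keep rows whose 'a' value is 4 or 5, then drop column 'b'.
pipeline = pdp.ValKeep([4, 5], 'a') + pdp.ColDrop('b')
print(pipeline.apply(df))
#    a
# 2  4
# 3  5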
Ancestors
- ColumnsBasedPipelineStage
- PdPipelineStage
- abc.ABC
Inherited members
- ColumnsBasedPipelineStage: apply, description, fit, fit_transform, transform