Module pdpipe.core
Defines pipelines for processing pandas.DataFrame-based datasets.
>>> import pdpipe as pdp
>>> pipeline = pdp.ColDrop('Name') + pdp.Bin({'Speed': [0,5]})
>>> pipeline = pdp.ColDrop('Name').Bin({'Speed': [0,5]}, drop=True)
Creating pipeline stages that operate on column subsets
Many pipeline stages in pdpipe operate on a subset of columns, allowing the
caller to determine this subset by either providing a fixed set of column
labels or by providing a callable that determines the column subset dynamically
from input dataframes. The `pdpipe.cq` module addresses a unique but important
use case, that of fittable column qualifiers, which dynamically extract a column
subset at stage fit time but keep it fixed for future transformations.
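To make the fit-time versus transform-time distinction concrete, here is a minimal sketch in plain Python. It is not the `pdpipe.cq` implementation: `FittableQualifierSketch` and `all_numeric` are hypothetical names, a dict of column label to values stands in for a dataframe, and the fallback to fitting on an unfitted call is a design choice of this sketch only.

```python
class FittableQualifierSketch:
    """Hypothetical stand-in for a fittable column qualifier (not pdpipe code)."""

    def __init__(self, predicate):
        self._predicate = predicate   # decides, per column, whether it qualifies
        self._fitted_columns = None   # the subset remembered at fit time

    def fit_transform(self, frame):
        # Evaluate the predicate on this frame and remember the result.
        self._fitted_columns = [
            label for label, values in frame.items()
            if self._predicate(values)
        ]
        return list(self._fitted_columns)

    def __call__(self, frame):
        # Once fitted, return the remembered subset whatever the frame holds.
        # Assumption of this sketch: an unfitted call falls back to fitting.
        if self._fitted_columns is None:
            return self.fit_transform(frame)
        return list(self._fitted_columns)


def all_numeric(values):
    return all(isinstance(v, (int, float)) for v in values)


# A dict of column label -> values stands in for a dataframe here.
train = {"speed": [1.5, 2.0], "name": ["a", "b"], "age": [30, 41]}
new = {"speed": [3.1], "name": ["c"], "age": ["unknown"]}

numeric_cols = FittableQualifierSketch(all_numeric)
fitted = numeric_cols.fit_transform(train)  # subset computed from `train`
reused = numeric_cols(new)                  # same subset, although `age` changed
```

A non-fittable callable would instead recompute the subset on every input, so the `age` column would silently drop out of the selection for `new`.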
As a general rule, every pipeline stage in pdpipe that supports the `columns`
parameter should inherently support fittable column qualifiers, as well as
the correct interpretation of both single and multiple labels as arguments. To
unify the implementation of such functionality, and to ease the creation of new
pipeline stages, such stages should be created by extending the
`ColumnsBasedPipelineStage` base class, found in this module (`pdpipe.core`).
Subclasses interface with this base class mainly through the `columns`,
`exclude_columns` and `none_columns` constructor arguments, and the
"private" `_get_columns(df, fit)` method:
- Any extending subclass should accept the `columns` constructor parameter and forward it, without transforming it, to the constructor of `ColumnsBasedPipelineStage`, e.g. `super().__init__(columns=columns, **kwargs)`. See the implementation of any such extending class for a more complete example.
- Extending subclasses can decide whether to expose the `exclude_columns` parameter. Note that most of its functionality can be gained anyway by providing the `columns` parameter with a column qualifier object that is a difference between two column qualifiers; e.g. `columns=cq.OfDtype(np.number) - cq.OfDtype(np.int64)` is equivalent to providing `columns=cq.OfDtype(np.number), exclude_columns=cq.OfDtype(np.int64)`. However, exposing `exclude_columns` can allow for specific unique behaviours. For example, suppose the `none_columns` parameter, which configures the behavior when `columns` is given `None`, is set to a `cq.OfDtypes('category')` column qualifier, so that all categorical columns are selected when `columns=None`. Exposing `exclude_columns` then allows easy specification of "all categorical columns except X" by just giving a column qualifier capturing X to `exclude_columns`, instead of having to reconstruct the default column qualifier by hand and subtract from it the one representing X.
- To get the subset of columns to operate on, at `fit_transform` or `transform` time, call `self._get_columns(df, fit=True)` (or with `fit=False` if just transforming), providing it the input dataframe.
- Additionally, to get a description and application message with a nice string representation of the list of columns to operate on, the `desc_temp` constructor parameter of `ColumnsBasedPipelineStage` can be provided with a format string with a placeholder where the column list should go, e.g. `"Drop columns {}"` for the `ColDrop` pipeline stage.
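The way a `columns` argument and a `desc_temp` format string combine can be sketched as a standalone function. It mirrors the normalization logic of `_interpret_columns_param` in this module's source, leaving out the `None` handling; `interpret_columns` is a hypothetical free-function name used only for illustration.

```python
def interpret_columns(columns):
    """Return a (columns, text) pair for a label, an iterable or a callable."""
    if isinstance(columns, str):
        # Check str first: a string is itself iterable.
        return [columns], columns
    if callable(columns):
        # A column qualifier or other callable is kept as-is.
        return columns, columns.__doc__ or ''
    if hasattr(columns, '__iter__'):
        return columns, ', '.join(str(elem) for elem in columns)
    # Anything else is treated as a single non-string label.
    return [columns], str(columns)


desc_temp = "Drop columns {}"

single = interpret_columns('Name')
several = interpret_columns(['Name', 3])

desc_single = desc_temp.format(single[1])
desc_several = desc_temp.format(several[1])
```

Here `desc_single` is `"Drop columns Name"` and `desc_several` is `"Drop columns Name, 3"`, which is the kind of text the stage would use as its description when `desc` is not given explicitly.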
There are two correct ways to extend `ColumnsBasedPipelineStage`, depending on whether the pipeline stage you're creating is inherently fittable or not:
1. If the stage is NOT inherently fittable, then the ability to accept fittable column qualifier objects makes it so. However, to enable extending subclasses to implement their transformation using a single method, they can simply implement the abstract method `_transformation(self, df, verbose, fit)`. It should treat the `df` and `verbose` parameters normally, but forward the `fit` parameter to the `_get_columns` method when calling it. This is enough to get a pipeline stage with the desired behavior, with the super-class handling all the fit/transform functionality.
2. If the stage IS inherently fittable, then do not use the `_transformation` abstract method (it has to be implemented, so just have it raise a `NotImplementedError`). Instead, simply override the `_fit_transform` and `_transform` methods of `ColumnsBasedPipelineStage`, calling `_get_columns` with the correct `fit` argument: `True` when fit-transforming and `False` when transforming.
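The two patterns can be sketched side by side in plain Python. Everything here is an illustrative assumption: `ColumnsStageSketch` is a heavily simplified, hypothetical stand-in for `ColumnsBasedPipelineStage` (no preconditions, descriptions or `exclude_columns`), `DropSketch` and `CenterSketch` are made-up stages, and a dict of column label to values stands in for a dataframe.

```python
class ColumnsStageSketch:
    """Hypothetical, simplified stand-in for ColumnsBasedPipelineStage."""

    def __init__(self, columns):
        self._col_arg = columns
        self.is_fitted = False

    def _get_columns(self, df, fit=False):
        if callable(self._col_arg):
            if fit and hasattr(self._col_arg, "fit_transform"):
                return self._col_arg.fit_transform(df)
            return self._col_arg(df)
        return self._col_arg  # a fixed list of labels

    def _transformation(self, df, verbose, fit):
        raise NotImplementedError

    def _fit_transform(self, df, verbose):
        self.is_fitted = True
        return self._transformation(df, verbose, fit=True)

    def _transform(self, df, verbose):
        return self._transformation(df, verbose, fit=False)


class DropSketch(ColumnsStageSketch):
    """Pattern 1: not inherently fittable, so only _transformation is written."""

    def _transformation(self, df, verbose, fit):
        cols = self._get_columns(df, fit=fit)  # forward `fit` unchanged
        return {k: v for k, v in df.items() if k not in cols}


class CenterSketch(ColumnsStageSketch):
    """Pattern 2: inherently fittable, so both hooks are overridden."""

    def _transformation(self, df, verbose, fit):
        raise NotImplementedError  # unused by design

    def _fit_transform(self, df, verbose):
        self.is_fitted = True
        cols = self._get_columns(df, fit=True)
        # State learned at fit time: the mean of every selected column.
        self._means = {c: sum(df[c]) / len(df[c]) for c in cols}
        return self._center(df, cols)

    def _transform(self, df, verbose):
        cols = self._get_columns(df, fit=False)
        return self._center(df, cols)

    def _center(self, df, cols):
        return {
            k: [x - self._means[k] for x in v] if k in cols else v
            for k, v in df.items()
        }


train = {"a": [1.0, 3.0], "b": [10.0, 20.0]}

dropped = DropSketch(["a"])._fit_transform(train, verbose=False)

center = CenterSketch(["a"])
centered_train = center._fit_transform(train, verbose=False)
centered_new = center._transform({"a": [2.0], "b": [5.0]}, verbose=False)
```

`CenterSketch` reuses the mean learned from `train` when transforming the new frame, which is what makes it inherently fittable; `DropSketch` keeps no state of its own and becomes fittable only through the column qualifier it is given.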
Again, taking a look at the VERY concise implementation of simple columns-based
stages, like `ColDrop` or `ValDrop` in `pdpipe.basic_stages`, will probably make
things clearer, and you can use those implementations as a template for yours.
"""Defines pipelines for processing pandas.DataFrame-based datasets.
>>> import pdpipe as pdp
>>> pipeline = pdp.ColDrop('Name') + pdp.Bin({'Speed': [0,5]})
>>> pipeline = pdp.ColDrop('Name').Bin({'Speed': [0,5]}, drop=True)
## Creating pipeline stages that operate on column subsets
Many pipeline stages in pdpipe operate on a subset of columns, allowing the
caller to determine this subset by either providing a fixed set of column
labels or by providing a callable that determines the column subset dynamically
from input dataframes. The `pdpipe.cq` module addresses a unique but important
use case of fittable column qualifier, which is to dynamically extract a column
subset on stage fit time, but keep it fixed for future transformations.
As a general rule, every pipeline stage in pdpipe that supports the `columns`
parameter should inherently support fittable column qualifier, and generally
the correct interpretation of both single and multiple labels as arguments. To
unify the implementation of such functionality, and ease of creation of new
pipeline stages, such stages should be created by extending the
ColumnsBasedPipelineStage base class, found in this module (`pdpipe.core`).
The main interface of sub-classes of this base class with it is through the
`columns`, `exclude_columns` and `none_columns` constructor arguments, and the
"private" `_get_columns(df, fit)` method:
* Any extending subclass should accept the `columns` constructor parameter
and forward it, without transforming it, to the constructor of
ColumnsBasedPipelineStage. E.g.
`super().__init__(columns=columns, **kwargs)`. See the implementation of
any such extending class for a more complete example.
* Extending subclasses can decide if they want to expose the
`exclude_columns` parameter or not. Note that most of its functionality
can anyway be gained by providing the `columns` parameter with a column
qualifier object that is a difference between two column qualifiers; e.g.
`columns=cq.OfDtype(np.number) - cq.OfDtype(np.int64)` is equivalent to
providing `columns=cq.OfDtype(np.number),
exclude_columns=cq.OfDtype(np.int64)`. However, exposing the
`exclude_columns` parameter can allow for specific unique behaviours; for
example, if the `none_columns` parameter - which configures the behavior
when `columns` is provided with `None` - is set with
a `cq.OfDtypes('category')` column qualifier, which means that all
categorical columns are selected when `columns=None`, then exposing
`exclude_columns` allows for easy specification of the "all categorical
columns except X" by just giving a column qualifier capturing X to
`exclude_columns`, instead of having to reconstruct the default column
qualifier by hand and subtract from it the one representing X.
* When wishing to get the subset of columns to operate on, in
`fit_transform` or `transform` time, it is attained by calling
`self._get_columns(df, fit=True)` (or with `fit=False` if just
transforming), providing it the input dataframe.
* Additionally, to get a description and application message with a nice
string representation of the list of columns to operate on, the
`desc_temp` constructor parameter of ColumnsBasedPipelineStage can be
provided with a format string with a place holder where the column list
should go. E.g. `"Drop columns {}"` for the ColDrop pipeline stage.
There are two correct ways to extend it, depending on whether the pipeline
stage you're creating is inherently fittable or not:
1. If the stage is NOT inherently fittable, then the ability to accept
fittable column qualifier objects makes it so. However, to enable
extending subclasses to implement their transformation using a single
method, they can simply implement the abstract method
`_transformation(self, df, verbose, fit)`. It should treat the `df` and
`verbose` parameters normally, but forward the `fit` parameter to the
`_get_columns` method when calling it. This is enough to get a pipeline
stage with the desired behavior, with the super-class handling all the
fit/transform functionality.
2. If the stage IS inherently fittable, then do not use the
`_transformation` abstract method (it has to be implemented, so just
have it raise a NotImplementedError). Instead, simply override the
`_fit_transform` and `_transform` methods of ColumnsBasedPipelineStage,
calling the `_get_columns` method with the correct `fit` argument:
`True` when fit-transforming and `False` when transforming.
Again, taking a look at the VERY concise implementation of simple columns-based
stages, like ColDrop or ValDrop in `pdpipe.basic_stages`, will probably make
things clearer, and you can use those implementations as a template for yours.
"""
import sys
import abc
import time
import inspect
import collections
import textwrap

try:
    from pympler.asizeof import asizeof
except ImportError:
    from sys import getsizeof as asizeof

from .cq import is_fittable_column_qualifier, AllColumns
from .shared import _get_args_list
from .exceptions import (
    FailedPreconditionError,
    FailedPostconditionError,
    UnfittedPipelineStageError,
    PipelineApplicationError
)

# === loading stage attributes ===

def __get_append_stage_attr_doc(class_obj):
    doc = class_obj.__doc__
    first_line = doc[0:doc.find('.') + 1]
    if "An" in first_line:
        new_first_line = first_line.replace("An", "Creates and adds an", 1)
    else:
        new_first_line = first_line.replace("A", "Creates and adds a", 1)
    new_first_line = new_first_line[0:-1] + (
        " to this pipeline stage.")
    return doc.replace(first_line, new_first_line, 1)


def __load_stage_attribute__(class_obj):

    def _append_stage_func(self, *args, **kwds):
        # self is always a PdPipelineStage
        return self + class_obj(*args, **kwds)

    _append_stage_func.__doc__ = __get_append_stage_attr_doc(class_obj)
    _append_stage_func.__name__ = class_obj.__name__  # .lower()
    _append_stage_func.__signature__ = inspect.signature(class_obj.__init__)
    setattr(PdPipelineStage, class_obj.__name__, _append_stage_func)
    # unbound_method = types.MethodType(_append_stage_func, class_obj)
    # setattr(class_obj, class_obj.__name__, unbound_method)


def __load_stage_attributes_from_module__(module_name):
    module_obj = sys.modules[module_name]
    for name, obj in inspect.getmembers(module_obj):
        if inspect.isclass(obj) and obj.__module__ == module_name:
            class_obj = getattr(module_obj, name)
            if issubclass(class_obj, PdPipelineStage) and (
                    class_obj.__name__ != 'PdPipelineStage'):
                __load_stage_attribute__(class_obj)

# === basic classes ===

class PdpApplicationContext(dict):
    """An object encapsulating the application context of a pipeline.

    It is meant to communicate data, information and variables between
    different stages of a pipeline.

    Parameters
    ----------
    fit_context : PdpApplicationContext, optional
        Another application context object, representing the application
        context of a previous fit of the pipeline this application context
        is initialized for. Optional.
    """

    def __init__(self, fit_context=None):
        self.__locked__ = False
        self._fit_context__ = fit_context

    def __setitem__(self, key, value):
        if not self.__locked__:
            super().__setitem__(key, value)

    def __delitem__(self, key):
        if not self.__locked__:
            super().__delitem__(key)

    def pop(self, key, default):
        """If key is in the dictionary, remove it and return its value, else
        return default.

        If this context is locked the key is not removed; its value is
        returned instead, and a KeyError is raised if the key is missing.
        """
        if not self.__locked__:
            return super().pop(key, default)
        return super().__getitem__(key)

    def clear(self):
        """Remove all items from the dictionary."""
        if not self.__locked__:
            super().clear()

    def popitem(self):
        """Not implemented!"""
        raise NotImplementedError

    def update(self, other):
        """Update the dictionary with the key/value pairs from other,
        overwriting existing keys. Return None.

        update() accepts either another dictionary object or an iterable of
        key/value pairs (as tuples or other iterables of length two). Nothing
        is updated if this context is locked.
        """
        if not self.__locked__:
            super().update(other)

    def lock(self):
        """Locks this application context for changes."""
        self.__locked__ = True

    def fit_context(self):
        """Returns a locked PdpApplicationContext object of a previous fit."""
        return self._fit_context__


class PdPipelineStage(abc.ABC):
    """A stage of a pandas DataFrame-processing pipeline.

    Parameters
    ----------
    exraise : bool, default True
        If true, a pdpipe.FailedPreconditionError is raised when this
        stage is applied to a dataframe for which the precondition does
        not hold. Otherwise the stage is skipped. Additionally, if true, a
        pdpipe.FailedPostconditionError is raised if an expected post-condition
        does not hold for an output dataframe (after pipeline application).
        Otherwise pipeline application continues uninterrupted.
    exmsg : str, default None
        The message of the exception that is raised on a failed
        precondition if exraise is set to True. A default message is used
        if None is given.
    desc : str, default None
        A short description of this stage, used as its string representation.
        A default description is used if None is given.
    prec : callable, default None
        This can be assigned a callable that returns boolean values for input
        dataframes, which will be used to determine whether input dataframes
        satisfy the preconditions for this pipeline stage (see the `exraise`
        parameter for the behaviour of failed preconditions). See `pdpipe.cond`
        for more information on specialised Condition objects.
    post : callable, default None
        This can be assigned a callable that returns boolean values for output
        dataframes, which will be used to determine whether output dataframes
        satisfy the postconditions for this pipeline stage (see the `exraise`
        parameter for the behaviour of failed postconditions). See
        `pdpipe.cond` for more information on specialised Condition objects.
    skip : callable, default None
        This can be assigned a callable that returns boolean values for input
        dataframes, which will be used to determine whether this stage should
        be skipped for input dataframes - if the callable returns True for an
        input dataframe, this stage will be skipped. See `pdpipe.cond` for more
        information on specialised Condition objects.
    name : str, default ''
        The name of this stage. Pipelines can be sliced by this name.

    Attributes
    ----------
    fit_context : `PdpApplicationContext`
        An application context object that is only re-initialized before
        `fit_transform` calls, and is locked after pipeline application. It is
        injected into the PipelineStage by the encapsulating pipeline object.
    application_context : `PdpApplicationContext`
        An application context object that is re-initialized before every
        pipeline application (so, also during transform operations of fitted
        pipelines), and is locked after pipeline application. It is injected
        into the PipelineStage by the encapsulating pipeline object.
    """

    _DEF_EXC_MSG = 'Precondition failed in stage {}!'
    _DEF_DESCRIPTION = 'A pipeline stage.'
    _INIT_KWARGS = ['exraise', 'exmsg', 'desc', 'prec', 'skip', 'name']

    def __init__(self, exraise=True, exmsg=None, desc=None, prec=None,
                 post=None, skip=None, name=''):
        if not isinstance(name, str):
            raise ValueError(
                f"'name' must be a str, not {type(name).__name__}."
            )
        if desc is None:
            desc = PdPipelineStage._DEF_DESCRIPTION
        if exmsg is None:
            exmsg = PdPipelineStage._DEF_EXC_MSG.format(desc)

        self._exraise = exraise
        self._exmsg = exmsg
        self._exmsg_post = exmsg.replace(
            'precondition', 'postcondition').replace(
            'Precondition', 'Postcondition')
        self._desc = desc
        self._prec_arg = prec
        self._post_arg = post
        self._skip = skip
        self._appmsg = f"{name + ': ' if name else ''}{desc}"
        self._name = name
        self.fit_context: PdpApplicationContext = None
        self.application_context: PdpApplicationContext = None
        self.is_fitted = False

    @classmethod
    def _init_kwargs(cls):
        return cls._INIT_KWARGS

    @abc.abstractmethod
    def _prec(self, df):  # pylint: disable=R0201,W0613
        """Returns True if this stage can be applied to the given dataframe."""
        raise NotImplementedError

    def _compound_prec(self, df):
        if self._prec_arg:
            return self._prec_arg(df)
        return self._prec(df)

    def _post(self, df):  # pylint: disable=R0201,W0613
        """Returns True if this stage resulted in an expected output frame."""
        return True

    def _compound_post(self, df):
        if self._post_arg:
            return self._post_arg(df)
        return self._post(df)

    def _fit_transform(self, df, verbose):
        """Fits this stage and transforms the input dataframe."""
        return self._transform(df, verbose)

    def _is_fittable(self):
        if self.__class__._fit_transform == PdPipelineStage._fit_transform:
            return False
        return True

    def _raise_precondition_error(self):
        try:
            raise FailedPreconditionError(
                f"{self._exmsg} [Reason] {self._prec_arg.error_message}")
        except AttributeError:
            raise FailedPreconditionError(self._exmsg)

    def _raise_postcondition_error(self):
        try:
            raise FailedPostconditionError(
                f"{self._exmsg_post} [Reason] {self._post_arg.error_message}")
        except AttributeError:
            raise FailedPostconditionError(self._exmsg_post)

    @abc.abstractmethod
    def _transform(self, df, verbose):
        """Transforms the given dataframe without fitting this stage."""
        raise NotImplementedError("_transform method not implemented!")

    def apply(self, df, exraise=None, verbose=False):
        """Applies this pipeline stage to the given dataframe.

        If the stage is not fitted fit_transform is called. Otherwise,
        transform is called.

        Parameters
        ----------
        df : pandas.DataFrame
            The dataframe to which this pipeline stage will be applied.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._skip and self._skip(df):
            return df
        if self._compound_prec(df=df):
            if verbose:
                msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            if self.is_fitted:
                res_df = self._transform(df, verbose=verbose)
            else:
                res_df = self._fit_transform(df, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return df

    __call__ = apply

    def fit_transform(self, X, y=None, exraise=None, verbose=False):
        """Fits this stage and transforms the given dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform and fit this pipeline stage by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            res_df = self._fit_transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return X

    def fit(self, X, y=None, exraise=None, verbose=False):
        """Fits this stage without transforming the given dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to fit this pipeline stage by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The input dataframe, unchanged.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            res_df = self._fit_transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return X
        if exraise:
            self._raise_precondition_error()
        return X

    def transform(self, X, y=None, exraise=None, verbose=False):
        """Transforms the given dataframe without fitting this stage.

        If this stage is fittable but is not fitted, an
        UnfittedPipelineStageError is raised.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to be transformed.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            if self._is_fittable():
                if self.is_fitted:
                    res_df = self._transform(X, verbose=verbose)
                    if exraise and not self._compound_post(df=res_df):
                        self._raise_postcondition_error()
                    return res_df
                raise UnfittedPipelineStageError(
                    "transform of an unfitted pipeline stage was called!")
            res_df = self._transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return X

    def __add__(self, other):
        if isinstance(other, PdPipeline):
            return PdPipeline([self, *other._stages])
        if isinstance(other, PdPipelineStage):
            return PdPipeline([self, other])
        return NotImplemented

    def __str__(self):
        return f"PdPipelineStage: {self._desc}"

    def __repr__(self):
        return self.__str__()

    def description(self):
        """Returns the description of this pipeline stage"""
        return self._desc

    def _mem_str(self):
        total = asizeof(self)
        lines = []
        for a in dir(self):
            if not a.startswith('__'):
                att = getattr(self, a)
                if not callable(att):
                    size = asizeof(att)
                    if size > 500000:  # pragma: no cover
                        lines.append(' - {}, {:.2f}Mb ({:0>5.2f}%)\n'.format(
                            a, size / 1000000, 100 * size / total))
                    elif size > 1000:  # pragma: no cover
                        lines.append(' - {}, {:.2f}Kb ({:0>5.2f}%)\n'.format(
                            a, size / 1000, 100 * size / total))
                    else:
                        lines.append(' - {}, {}b ({:0>5.2f}%)\n'.format(
                            a, size, 100 * size / total))
        return ''.join(lines)


class ColumnsBasedPipelineStage(PdPipelineStage):
    """A pipeline stage that operates on a subset of dataframe columns.

    Parameters
    ----------
    columns : single label, iterable or callable
        The label, or an iterable of labels, of columns to use. Alternatively,
        this parameter can be assigned a callable returning an iterable of
        labels from an input pandas.DataFrame. See `pdpipe.cq`.
    exclude_columns : single label, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given the
        `columns` parameter. Alternatively, this parameter can be assigned a
        callable returning a labels iterable from an input pandas.DataFrame.
        See `pdpipe.cq`. Optional. By default no columns are excluded.
    desc_temp : str, optional
        If given, assumed to be a format string, and every appearance of {} in
        it is replaced with an appropriate string representation of the columns
        parameter, and is used as the pipeline description. Ignored if `desc`
        is provided.
    none_columns : iterable, callable or str, default 'error'
        Determines how None values supplied to the 'columns' parameter should
        be handled. If set to 'error', the default, a ValueError is raised if
        None is encountered. If set to 'all', it is interpreted to mean all
        columns of input dataframes should be operated on. If an iterable is
        provided it is interpreted as the default list of columns to operate on
        when `columns=None`. If a callable is provided, it is interpreted as
        the default column qualifier that determines input columns when
        `columns=None`.
    **kwargs
        Additionally supports all constructor parameters of PdPipelineStage.
    """

    @staticmethod
    def _interpret_columns_param(columns, none_error=False, none_columns=None):
        """Interprets the value provided to the columns parameter and returns
        a pair: a list version of it (if needed) and a string representation
        of it.
        """
        if columns is None:
            if none_error:
                raise ValueError((
                    'None is not a valid argument for the columns parameter of'
                    ' this pipeline stage.'))
            return ColumnsBasedPipelineStage._interpret_columns_param(
                columns=none_columns)
        if isinstance(columns, str):
            # always check str first, because it has __iter__
            return [columns], columns
        if callable(columns):
            # if isinstance(columns, ColumnQualifier):
            #     return columns, columns.__repr__() or ''
            return columns, columns.__doc__ or ''
        # if it was a single string it was already made a list, and it's not a
        # callable, so it's either an iterable of labels... or
        if hasattr(columns, '__iter__'):
            return columns, ', '.join(str(elem) for elem in columns)
        # a single non-string label.
        return [columns], str(columns)

    def __init__(
            self, columns, exclude_columns=None, desc_temp=None,
            none_columns='error', **kwargs):
        self._exclude_columns = exclude_columns
        if exclude_columns:
            # _interpret_columns_param returns a (columns, description) pair;
            # only the columns part is needed for exclusion
            self._exclude_columns, _ = self._interpret_columns_param(
                exclude_columns)
        self._none_error = False
        self._none_cols = None
        # handle none_columns
        if isinstance(none_columns, str):
            if none_columns == 'error':
                self._none_error = True
            elif none_columns == 'all':
                self._none_cols = AllColumns()
            else:
                raise ValueError((
                    "'error' and 'all' are the only valid string arguments"
                    " to the none_columns constructor parameter!"))
        elif hasattr(none_columns, '__iter__'):
            self._none_cols = none_columns
        elif callable(none_columns):
            self._none_cols = none_columns
        else:
            raise ValueError((
                "Valid arguments to the none_columns constructor parameter"
                " are 'error', 'all', an iterable of labels or a callable!"
            ))
        # done handling none_columns
        self._col_arg, self._col_str = self._interpret_columns_param(
            columns, self._none_error, none_columns=self._none_cols)
        if (kwargs.get('desc') is None) and desc_temp:
            kwargs['desc'] = desc_temp.format(self._col_str)
        if kwargs.get('exmsg') is None:
            kwargs['exmsg'] = (
                'Pipeline stage failed because not all columns {} '
                'were found in the input dataframe.'
            ).format(self._col_str)
        super().__init__(**kwargs)

    def _is_fittable(self):
        return is_fittable_column_qualifier(self._col_arg)

    @staticmethod
    def __get_cols_by_arg(col_arg, df, fit=False):
        try:
            if fit:
                # try to treat col_arg as a fittable column qualifier
                return col_arg.fit_transform(df)
            # else, no need to fit, so try to treat _col_arg as a callable
            return col_arg(df)
        except AttributeError:
            # got here cause col_arg has no fit_transform method...
            try:
                # so try and treat it as a callable again
                return col_arg(df)
            except TypeError:
                # calling col_arg 2 lines above failed; its a list of labels
                return col_arg
        except TypeError:
            # calling _col_arg 10 lines above failed; its a list of labels
            return col_arg

    def _get_columns(self, df, fit=False):
        cols = ColumnsBasedPipelineStage.__get_cols_by_arg(
            self._col_arg, df, fit=fit)
        if self._exclude_columns:
            exc_cols = ColumnsBasedPipelineStage.__get_cols_by_arg(
                self._exclude_columns, df, fit=fit)
            return [x for x in cols if x not in exc_cols]
        return cols

    def _prec(self, df):
        return set(self._get_columns(df=df)).issubset(df.columns)

    @abc.abstractmethod
    def _transformation(self, df, verbose, fit):
        raise NotImplementedError((
            "Classes extending ColumnsBasedPipelineStage must implement the "
            "_transformation method!"))

    def _fit_transform(self, df, verbose):
        self.is_fitted = True
        return self._transformation(df, verbose, fit=True)

    def _transform(self, df, verbose):
        return self._transformation(df, verbose, fit=False)


def _always_true(x):
    return True


class AdHocStage(PdPipelineStage):
    """An ad-hoc stage of a pandas DataFrame-processing pipeline.

    The signature for both the `transform` and the optional `fit_transform`
    callables is adaptive: The first argument is used positionally (so no
    specific name is assumed or used) to supply the callable with the pandas
    DataFrame object to transform. The following additional keyword arguments
    are supplied if they are included in the callable's signature:

    `verbose` - Passed on from PdPipelineStage's `fit`, `fit_transform`
    and `apply` methods.

    `fit_context` and `application_context` - Provides fit-specific and
    application-specific contexts (see `PdpApplicationContext`) usually
    available to pipeline stages using `self.fit_context` and
    `self.application_context`.

    Parameters
    ----------
    transform : callable
        The transformation this stage applies to dataframes. If the
        fit_transform parameter is also populated then this transformation is
        only applied on calls to transform. See documentation for the exact
        signature.
    fit_transform : callable, optional
        The transformation this stage applies to dataframes, only on
        fit_transform. Optional. See documentation for the exact signature.
    prec : callable, default None
        A callable that returns a boolean value. Represents a precondition
        used to determine whether this stage can be applied to a given
        dataframe. If None is given, set to a function always returning True.

    Example
    -------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
    >>> drop_num = pdp.AdHocStage(
    ...     transform=lambda df: df.drop(['num'], axis=1),
    ...     prec=lambda df: 'num' in df.columns
    ... )
    >>> drop_num.apply(df)
      char
    1    a
    2    b
    """

    def __init__(self, transform, fit_transform=None, prec=None, **kwargs):
        if prec is None:
            prec = _always_true
        self._adhoc_transform = transform
        self._adhoc_fit_transform = fit_transform
        self._adhoc_prec = prec
        self._transform_kwargs = _get_args_list(self._adhoc_transform)
        try:
            self._fit_transform_kwargs = _get_args_list(
                self._adhoc_fit_transform)
        except TypeError:  # fit_transform is None
            self._fit_transform_kwargs = {}
        super().__init__(**kwargs)

    def _prec(self, df):
        return self._adhoc_prec(df)

    def _fit_transform(self, df, verbose):
        self.is_fitted = True
        if self._adhoc_fit_transform is None:
            self.is_fitted = True
            return self._transform(df, verbose)
        kwargs = {
            'verbose': verbose,
            'fit_context': self.fit_context,
            'application_context': self.application_context,
        }
        kwargs = {
            k: v for k, v in kwargs.items() if k in self._fit_transform_kwargs}
        return self._adhoc_fit_transform(df, **kwargs)

    def _transform(self, df, verbose):
        kwargs = {
            'verbose': verbose,
            'fit_context': self.fit_context,
            'application_context': self.application_context,
        }
        kwargs = {
            k: v for k, v in kwargs.items() if k in self._transform_kwargs}
        return self._adhoc_transform(df, **kwargs)
class PdPipeline(PdPipelineStage, collections.abc.Sequence):
    """A pipeline for processing pandas DataFrame objects.

    `transformer_getter` is useful to avoid applying pipeline stages that are
    aimed to filter out items in a big dataset to create a training set for a
    machine learning model, for example, but should not be applied on future
    individual items to be transformed by the fitted pipeline.

    Parameters
    ----------
    stages : list
        A list of PdPipelineStage objects making up this pipeline.
    transformer_getter : callable, optional
        A callable that can be applied to the fitted pipeline to produce a
        sub-pipeline of it which should be used to transform dataframes after
        the pipeline has been fitted. If not given, the fitted pipeline is used
        entirely.
    """

    _DEF_EXC_MSG = 'Pipeline precondition failed!'

    def __init__(self, stages, transformer_getter=None, **kwargs):
        self._stages = stages
        self._trans_getter = transformer_getter
        self.is_fitted = False
        super_kwargs = {
            'exraise': False,
            'exmsg': PdPipeline._DEF_EXC_MSG,
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)
    # implementing a collections.abc.Sequence abstract method
    def __getitem__(self, index):
        if isinstance(index, slice):
            return PdPipeline(self._stages[index])
        if isinstance(index, list) and all(isinstance(x, str) for x in index):
            stages = [stage for stage in self._stages if stage._name in index]
            return PdPipeline(stages)
        if isinstance(index, str):
            stages = [stage for stage in self._stages if stage._name == index]
            if len(stages) == 0:
                raise ValueError(f"'{index}' is not exist.")
            return stages[0]
        return self._stages[index]

    # implementing a collections.abc.Sequence abstract method
    def __len__(self):
        return len(self._stages)

    def _prec(self, df):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _post(self, df):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _transform(self, df, verbose):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _post_transform_lock(self):
        self.application_context.lock()
        self.fit_context.lock()
    def apply(self, df, exraise=None, verbose=False, time=False):
        """Applies this pipeline stage to the given dataframe.

        If the stage is not fitted fit_transform is called. Otherwise,
        transform is called.

        Parameters
        ----------
        df : pandas.DataFrame
            The dataframe to which this pipeline stage will be applied.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        self.application_context = PdpApplicationContext()
        if self.is_fitted:
            res = self.transform(
                X=df,
                exraise=exraise,
                verbose=verbose,
                time=time
            )
            self._post_transform_lock()
            return res
        self.fit_context = PdpApplicationContext()
        res = self.fit_transform(
            X=df,
            exraise=exraise,
            verbose=verbose,
            time=time
        )
        self._post_transform_lock()
        return res
    def __timed_fit_transform(self, X, y=None, exraise=None, verbose=None):
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        inter_x = X
        times = []
        prev = time.time()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.fit_transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
                now = time.time()
                times.append(now - prev)
                prev = now
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self.is_fitted = True
        print("\nPipeline total application time: {:.3f}s.\n Details:".format(
            sum(times)))
        print(self.__times_str__(times))
        self._post_transform_lock()
        return inter_x
    def fit_transform(self, X, y=None, exraise=None, verbose=None, time=False):
        """Fits this pipeline and transforms the input dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform and fit this pipeline by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if time:
            return self.__timed_fit_transform(
                X=X, y=y, exraise=exraise, verbose=verbose)
        inter_x = X
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.fit_transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self._post_transform_lock()
        self.is_fitted = True
        return inter_x
    def fit(self, X, y=None, exraise=None, verbose=None, time=None):
        """Fits this pipeline without transforming the input dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to fit this pipeline by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The input dataframe, unchanged.
        """
        self.fit_transform(
            X=X,
            y=None,
            exraise=exraise,
            verbose=verbose,
            time=time,
        )
        return X
    def __timed_transform(self, X, y=None, exraise=None, verbose=None):
        inter_x = X
        times = []
        prev = time.time()
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
                now = time.time()
                times.append(now - prev)
                prev = now
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self.is_fitted = True
        print("\nPipeline total application time: {:.3f}s.\n Details:".format(
            sum(times)))
        print(self.__times_str__(times))
        self._post_transform_lock()
        return inter_x
    def transform(self, X, y=None, exraise=None, verbose=None, time=False):
        """Transforms the given dataframe without fitting this pipeline.

        If any stage in this pipeline is fittable but is not fitted, an
        UnfittedPipelineStageError is raised before transformation starts.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        for stage in self._stages:
            if stage._is_fittable() and not stage.is_fitted:
                raise UnfittedPipelineStageError((
                    "PipelineStage {} in pipeline is fittable but"
                    " unfitted!").format(stage))
        if time:
            return self.__timed_transform(
                X=X, y=y, exraise=exraise, verbose=verbose)
        inter_df = X
        self.application_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.application_context = self.application_context
                inter_df = stage.transform(
                    X=inter_df,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self._post_transform_lock()
        return inter_df
    __call__ = apply

    def __add__(self, other):
        if isinstance(other, PdPipeline):
            return PdPipeline([*self._stages, *other._stages])
        if isinstance(other, PdPipelineStage):
            return PdPipeline([*self._stages, other])
        return NotImplemented

    def __times_str__(self, times):
        res = "A pdpipe pipeline:\n"
        stime = sum(times)
        if stime > 0:  # pragma: no cover
            percentages = [100 * x / stime for x in times]
        else:  # pragma: no cover
            percentages = [0 for x in times]
        res += '[ 0] [{:0>5.2f}s ({:0>5.2f}%)]  '.format(
            times[0], percentages[0]
        ) + "\n      ".join(
            textwrap.wrap(self._stages[0].description())
        ) + '\n'
        for i, stage in enumerate(self._stages[1:]):
            res += '[{:>2}] [{:0>5.2f}s ({:0>5.2f}%)]  '.format(
                i + 1, times[i + 1], percentages[i + 1]
            ) + "\n      ".join(
                textwrap.wrap(stage.description())
            ) + '\n'
        return res

    def __str__(self):
        res = "A pdpipe pipeline:\n"
        res += '[ 0]  ' + "\n      ".join(
            textwrap.wrap(self._stages[0].description())) + '\n'
        for i, stage in enumerate(self._stages[1:]):
            res += '[{:>2}]  '.format(i + 1) + "\n      ".join(
                textwrap.wrap(stage.description())) + '\n'
        return res
    def _mem_str(self, total):
        total = asizeof(self)
        lines = []
        for i, stage in enumerate(self._stages):
            size = asizeof(stage)
            if size > 500000:  # pragma: no cover
                lines.append('[{:>2}] {:.2f}Mb ({:0>5.2f}%), {}\n'.format(
                    i, size / 1000000, 100 * size / total,
                    stage.description()))
            elif size > 1000:  # pragma: no cover
                lines.append('[{:>2}] {:.2f}Kb ({:0>5.2f}%), {}\n'.format(
                    i, size / 1000, 100 * size / total, stage.description()))
            else:
                lines.append('[{:>2}] {:}b ({:0>5.2f}%), {}\n'.format(
                    i, size, 100 * size / total, stage.description()))
            lines.append(stage._mem_str())
        return ''.join(lines)

    def memory_report(self):
        """Prints a detailed memory report of the pipeline object to screen.

        To get better memory estimates make sure the pympler Python package is
        installed. Without it, sys.getsizeof is used, which can severely
        underestimate the memory size of Python objects.
        """
        print("=== Pipeline memory report ===")
        size = asizeof(self)
        if size > 500000:  # pragma: no cover
            print("Total pipeline size in memory: {:.2f}Mb".format(
                size / 1000000))
        elif size > 1000:  # pragma: no cover
            print("Total pipeline size in memory: {:.2f}Kb".format(
                size / 1000))
        else:
            print("Total pipeline size in memory: {:.2f}b".format(
                size))
        print("Per-stage memory structure:")
        print(self._mem_str(total=size))
    def get_transformer(self):
        """Return the transformer induced by this fitted pipeline.

        This transformer is a `pdpipe` pipeline that transforms input data
        in a way corresponding to this pipeline after it has been fitted. By
        default this is the pipeline itself, but the `transformer_getter`
        constructor parameter can be used to return a sub-pipeline of the
        fitted pipeline instead, for cases where some stages should only be
        applied when fitting this pipeline to data.

        Returns
        -------
        pdpipe.PdPipeline
            The corresponding transformer pipeline induced by this pipeline.
        """
        try:
            return self._trans_getter(self)
        except TypeError:  # pragma: no cover
            return self

    # def drop(self, index):
    #     """Returns this pipeline with the stage of the given index removed.
    #     Arguments
    #     ---------
    #     index
def make_pdpipeline(*stages):
    """Constructs a PdPipeline from the given pipeline stages.

    Parameters
    ----------
    *stages : pdpipe.PdPipelineStage objects
        PdPipeline stages given as positional arguments.

    Returns
    -------
    p : pdpipe.PdPipeline
        The resulting pipeline.

    Examples
    --------
        >>> import pdpipe as pdp
        >>> p = make_pdpipeline(pdp.ColDrop('count'), pdp.DropDuplicates())
    """
    return PdPipeline(stages=stages)
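As a reading aid for the listing above, the control flow of `PdPipeline.apply` (fit and transform on the first call, transform only afterwards, with any stage failure re-raised together with the index of the failing stage) can be sketched with the standard library alone. This is a simplified illustration, not pdpipe's implementation: `MiniPipeline`, `Centre` and `StageError` are hypothetical names, plain lists stand in for dataframes, and preconditions, contexts and timing are left out.

```python
class StageError(Exception):
    """Hypothetical stand-in for PipelineApplicationError."""


class Centre:
    """Toy fittable stage: learns the mean on fit, subtracts it afterwards."""

    def __init__(self):
        self.mean = None

    def fit_transform(self, rows):
        self.mean = sum(rows) / len(rows)
        return self.transform(rows)

    def transform(self, rows):
        return [r - self.mean for r in rows]


class MiniPipeline:
    """Toy pipeline: applies its stages in order."""

    def __init__(self, stages):
        self.stages = list(stages)
        self.is_fitted = False

    def _run(self, rows, method):
        for i, stage in enumerate(self.stages):
            try:
                rows = getattr(stage, method)(rows)
            except Exception as e:
                # Re-raise with the failing stage's index, keeping the cause.
                raise StageError(
                    f"Exception raised in stage [{i}] {stage!r}") from e
        return rows

    def apply(self, rows):
        # Fitted pipelines only transform; unfitted ones fit first.
        if self.is_fitted:
            return self._run(rows, "transform")
        out = self._run(rows, "fit_transform")
        self.is_fitted = True
        return out


pipeline = MiniPipeline([Centre()])
first = pipeline.apply([1.0, 3.0])   # fits: the learned mean is 2.0
second = pipeline.apply([2.0])       # reuses the learned mean
```

Because the original exception is chained with `from e`, the traceback still shows the root cause underneath the stage-indexed wrapper.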
Functions
def make_pdpipeline(*stages)
Constructs a PdPipeline from the given pipeline stages.
Parameters
*stages : pdpipe.PdPipelineStage objects
    PdPipeline stages given as positional arguments.
Returns
p : pdpipe.PdPipeline
    The resulting pipeline.
Examples
>>> import pdpipe as pdp
>>> p = make_pdpipeline(pdp.ColDrop('count'), pdp.DropDuplicates())
Classes
class AdHocStage (transform, fit_transform=None, prec=None, **kwargs)
An ad-hoc stage of a pandas DataFrame-processing pipeline.
The signature for both the transform and the optional fit_transform callables is adaptive: the first argument is used positionally (so no specific name is assumed or used) to supply the callable with the pandas DataFrame object to transform. The following additional keyword arguments are supplied if they are included in the callable's signature:
verbose - Passed on from PdPipelineStage's fit, fit_transform and apply methods.
fit_context and application_context - Provide fit-specific and application-specific contexts (see PdpApplicationContext), usually available to pipeline stages using self.fit_context and self.application_context.
Parameters
transform : callable
    The transformation this stage applies to dataframes. If the fit_transform parameter is also populated, then this transformation is only applied on calls to transform. See documentation for the exact signature.
fit_transform : callable, optional
    The transformation this stage applies to dataframes, only on fit_transform. See documentation for the exact signature.
prec : callable, default None
    A callable that returns a boolean value. Represents a precondition used to determine whether this stage can be applied to a given dataframe. If None is given, set to a function always returning True.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> drop_num = pdp.AdHocStage(
...   transform=lambda df: df.drop(['num'], axis=1),
...   prec=lambda df: 'num' in df.columns
... )
>>> drop_num.apply(df)
  char
1    a
2    b
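The adaptive signature described above comes down to inspecting which argument names a callable declares and passing only those. The sketch below shows the idea with the standard library. It is an illustration under assumptions rather than the library's code: pdpipe uses a private helper, `_get_args_list`, whose body is not shown in this section, so `inspect.signature` is swapped in here; `call_adaptively`, `double` and `count_rows` are hypothetical names, and a plain list and dict stand in for the dataframe and the context object.

```python
import inspect


def call_adaptively(func, data, **available):
    """Call func(data, ...) with only the keyword arguments it declares."""
    accepted = inspect.signature(func).parameters
    kwargs = {k: v for k, v in available.items() if k in accepted}
    return func(data, **kwargs)


def double(rows):
    # Declares no extra arguments, so it receives none.
    return [r * 2 for r in rows]


def count_rows(rows, fit_context):
    # Declares fit_context only, so `verbose` is filtered out.
    fit_context["n_rows"] = len(rows)
    return rows


context = {}
doubled = call_adaptively(double, [1, 2], verbose=True, fit_context=context)
counted = call_adaptively(
    count_rows, [1, 2, 3], verbose=True, fit_context=context)
```

The data argument is always passed positionally, which is why the callable is free to name its first parameter anything.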
Ancestors
- PdPipelineStage
- abc.ABC
Inherited members
PdPipelineStage
:AdHocStage
AggByCols
ApplyByCols
ApplyToRows
Bin
ColByFrameFunc
ColDrop
ColRename
ColReorder
ColumnDtypeEnforcer
ColumnTransformer
ColumnsBasedPipelineStage
ConditionValidator
DropDuplicates
DropNa
DropRareTokens
DropTokensByLength
DropTokensByList
Encode
FitOnly
FreqDrop
Log
MapColVals
OneHotEncode
PdPipeline
RegexReplace
RemoveStopwords
RowDrop
Scale
Schematize
SetIndex
SnowballStem
TfidfVectorizeTokenLists
TokenizeText
UntokenizeText
ValDrop
ValKeep
apply
description
fit
fit_transform
transform
class ColumnsBasedPipelineStage (columns, exclude_columns=None, desc_temp=None, none_columns='error', **kwargs)
A pipeline stage that operates on a subset of dataframe columns.
Parameters
columns : single label, iterable or callable
    The label, or an iterable of labels, of columns to use. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.
desc_temp : str, optional
    If given, assumed to be a format string, and every appearance of {} in it is replaced with an appropriate string representation of the columns parameter, and is used as the pipeline description. Ignored if desc is provided.
none_columns : iterable, callable or str, default 'error'
    Determines how None values supplied to the 'columns' parameter should be handled. If set to 'error', the default, a ValueError is raised if None is encountered. If set to 'all', it is interpreted to mean all columns of input dataframes should be operated on. If an iterable is provided it is interpreted as the default list of columns to operate on when columns=None. If a callable is provided, it is interpreted as the default column qualifier that determines input columns when columns=None.
**kwargs
    Additionally supports all constructor parameters of PdPipelineStage.
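The three accepted forms of the columns argument (a single label, an iterable of labels, or a callable) can be told apart with a short chain of checks. The sketch below follows the order of checks used by the class's `_interpret_columns_param` helper, but it is a simplified illustration: `resolve_columns` is a hypothetical name, the callable receives a list of labels rather than a dataframe, and the none_columns and exclude_columns handling is omitted.

```python
def resolve_columns(columns, frame_columns):
    """Return the list of labels selected by `columns`."""
    if isinstance(columns, str):
        # Check str first: strings are iterable, but mean a single label.
        return [columns]
    if callable(columns):
        # A column-qualifier-like callable picks labels dynamically.
        return list(columns(frame_columns))
    if hasattr(columns, "__iter__"):
        # Any other iterable is taken as a collection of labels.
        return list(columns)
    # Anything else is a single non-string label, e.g. an integer.
    return [columns]


labels = ["num", "char", 3]
single = resolve_columns("num", labels)
several = resolve_columns(("num", "char"), labels)
dynamic = resolve_columns(
    lambda cols: [c for c in cols if isinstance(c, str)], labels)
numeric_label = resolve_columns(3, labels)
```

Testing for `str` before testing for iterability is the important design choice: without it, the label `'num'` would be split into the labels `'n'`, `'u'` and `'m'`.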
Ancestors
- PdPipelineStage
- abc.ABC
Subclasses
- ColDrop
- DropDuplicates
- RowDrop
- ValDrop
- ValKeep
- ColumnTransformer
- Log
- OneHotEncode
- DropRareTokens
- Encode
- Scale
Inherited members
PdPipelineStage
:AdHocStage
AggByCols
ApplyByCols
ApplyToRows
Bin
ColByFrameFunc
ColDrop
ColRename
ColReorder
ColumnDtypeEnforcer
ColumnTransformer
ColumnsBasedPipelineStage
ConditionValidator
DropDuplicates
DropNa
DropRareTokens
DropTokensByLength
DropTokensByList
Encode
FitOnly
FreqDrop
Log
MapColVals
OneHotEncode
PdPipeline
RegexReplace
RemoveStopwords
RowDrop
Scale
Schematize
SetIndex
SnowballStem
TfidfVectorizeTokenLists
TokenizeText
UntokenizeText
ValDrop
ValKeep
apply
description
fit
fit_transform
transform
class PdPipeline (stages, transformer_getter=None, **kwargs)
A pipeline for processing pandas DataFrame objects.
transformer_getter is useful for skipping, at transform time, stages that only make sense while fitting: for example, stages that filter items out of a big dataset to create a training set for a machine learning model, and that should not be applied to future individual items transformed by the fitted pipeline.
Parameters
stages : list
    A list of PdPipelineStage objects making up this pipeline.
transformer_getter : callable, optional
    A callable that can be applied to the fitted pipeline to produce a sub-pipeline of it which should be used to transform dataframes after the pipeline has been fitted. If not given, the fitted pipeline is used entirely.
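The role of transformer_getter is easiest to see on a toy example. The sketch below is not pdpipe code: `MiniPipeline`, `drop_negatives` and `double` are hypothetical names, stages are plain functions over lists, and fitting is ignored. It only illustrates how a getter can return a sub-pipeline that skips a training-time filter.

```python
class MiniPipeline:
    """Toy stand-in for a pipeline: an ordered list of (name, function)."""

    def __init__(self, stages, transformer_getter=None):
        self.stages = list(stages)
        self._trans_getter = transformer_getter

    def run(self, rows):
        for _name, func in self.stages:
            rows = func(rows)
        return rows

    def get_transformer(self):
        # Fall back to the whole pipeline when no getter was supplied.
        if self._trans_getter is None:
            return self
        return self._trans_getter(self)


def drop_negatives(rows):
    # Training-time filter: not meant for items transformed later.
    return [r for r in rows if r >= 0]


def double(rows):
    return [r * 2 for r in rows]


full = MiniPipeline(
    [("filter", drop_negatives), ("double", double)],
    # The getter keeps every stage except the leading filter.
    transformer_getter=lambda p: MiniPipeline(p.stages[1:]),
)

training_output = full.run([-1, 2])
serving_output = full.get_transformer().run([-1, 2])
```

Here the full pipeline drops the negative item while building training data, whereas the induced transformer leaves it in and only doubles the values.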
Expand source code
class PdPipeline(PdPipelineStage, collections.abc.Sequence): """A pipeline for processing pandas DataFrame objects. `transformer_getter` is useful to avoid applying pipeline stages that are aimed to filter out items in a big dataset to create a training set for a machine learning model, for example, but should not be applied on future individual items to be transformed by the fitted pipeline. Parameters ---------- stages : list A list of PdPipelineStage objects making up this pipeline. transform_getter : callable, optional A callable that can be applied to the fitted pipeline to produce a sub-pipeline of it which should be used to transform dataframes after the pipeline has been fitted. If not given, the fitted pipeline is used entirely. """ _DEF_EXC_MSG = 'Pipeline precondition failed!' def __init__(self, stages, transformer_getter=None, **kwargs): self._stages = stages self._trans_getter = transformer_getter self.is_fitted = False super_kwargs = { 'exraise': False, 'exmsg': PdPipeline._DEF_EXC_MSG, } super_kwargs.update(**kwargs) super().__init__(**super_kwargs) # implementing a collections.abc.Sequence abstract method def __getitem__(self, index): if isinstance(index, slice): return PdPipeline(self._stages[index]) if isinstance(index, list) and all(isinstance(x, str) for x in index): stages = [stage for stage in self._stages if stage._name in index] return PdPipeline(stages) if isinstance(index, str): stages = [stage for stage in self._stages if stage._name == index] if len(stages) == 0: raise ValueError(f"'{index}' is not exist.") return stages[0] return self._stages[index] # implementing a collections.abc.Sequence abstract method def __len__(self): return len(self._stages) def _prec(self, df): # PdPipeline overrides apply in a way which makes this moot raise NotImplementedError def _post(self, df): # PdPipeline overrides apply in a way which makes this moot raise NotImplementedError def _transform(self, df, verbose): # PdPipeline overrides apply in a way which 
makes this moot raise NotImplementedError def _post_transform_lock(self): self.application_context.lock() self.fit_context.lock() def apply(self, df, exraise=None, verbose=False, time=False): """Applies this pipeline stage to the given dataframe. If the stage is not fitted fit_transform is called. Otherwise, transform is called. Parameters ---------- df : pandas.DataFrame The dataframe to which this pipeline stage will be applied. exraise : bool, default None Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter. verbose : bool, default False If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False. time : bool, default False If True, per-stage application time is measured and reported when pipeline application is done. Returns ------- pandas.DataFrame The resulting dataframe. 
""" self.application_context = PdpApplicationContext() if self.is_fitted: res = self.transform( X=df, exraise=exraise, verbose=verbose, time=time ) self._post_transform_lock() return res self.fit_context = PdpApplicationContext() res = self.fit_transform( X=df, exraise=exraise, verbose=verbose, time=time ) self._post_transform_lock() return res def __timed_fit_transform(self, X, y=None, exraise=None, verbose=None): self.application_context = PdpApplicationContext() self.fit_context = PdpApplicationContext() inter_x = X times = [] prev = time.time() for i, stage in enumerate(self._stages): try: stage.fit_context = self.fit_context stage.application_context = self.application_context inter_x = stage.fit_transform( X=inter_x, y=None, exraise=exraise, verbose=verbose, ) now = time.time() times.append(now - prev) prev = now except Exception as e: raise PipelineApplicationError( f"Exception raised in stage [ {i}] {stage}" ) from e self.is_fitted = True print("\nPipeline total application time: {:.3f}s.\n Details:".format( sum(times))) print(self.__times_str__(times)) self._post_transform_lock() return inter_x def fit_transform(self, X, y=None, exraise=None, verbose=None, time=False): """Fits this pipeline and transforms the input dataframe. Parameters ---------- X : pandas.DataFrame The dataframe to transform and fit this pipeline by. y : array-like, optional Targets for supervised learning. exraise : bool, default None Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter. verbose : bool, default False If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed. 
time : bool, default False If True, per-stage application time is measured and reported when pipeline application is done. Returns ------- pandas.DataFrame The resulting dataframe. """ if time: return self.__timed_fit_transform( X=X, y=y, exraise=exraise, verbose=verbose) inter_x = X self.application_context = PdpApplicationContext() self.fit_context = PdpApplicationContext() for i, stage in enumerate(self._stages): try: stage.fit_context = self.fit_context stage.application_context = self.application_context inter_x = stage.fit_transform( X=inter_x, y=None, exraise=exraise, verbose=verbose, ) except Exception as e: raise PipelineApplicationError( f"Exception raised in stage [ {i}] {stage}" ) from e self._post_transform_lock() self.is_fitted = True return inter_x def fit(self, X, y=None, exraise=None, verbose=None, time=None): """Fits this pipeline without transforming the input dataframe. Parameters ---------- X : pandas.DataFrame The dataframe to fit this pipeline by. y : array-like, optional Targets for supervised learning. exraise : bool, default None Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter. verbose : bool, default False If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed. time : bool, default False If True, per-stage application time is measured and reported when pipeline application is done. Returns ------- pandas.DataFrame The input dataframe, unchanged. 
""" self.fit_transform( X=X, y=None, exraise=exraise, verbose=verbose, time=time, ) return X def __timed_transform(self, X, y=None, exraise=None, verbose=None): inter_x = X times = [] prev = time.time() self.application_context = PdpApplicationContext() self.fit_context = PdpApplicationContext() for i, stage in enumerate(self._stages): try: stage.fit_context = self.fit_context stage.application_context = self.application_context inter_x = stage.transform( X=inter_x, y=None, exraise=exraise, verbose=verbose, ) now = time.time() times.append(now - prev) prev = now except Exception as e: raise PipelineApplicationError( f"Exception raised in stage [ {i}] {stage}" ) from e self.is_fitted = True print("\nPipeline total application time: {:.3f}s.\n Details:".format( sum(times))) print(self.__times_str__(times)) self._post_transform_lock() return inter_x def transform(self, X, y=None, exraise=None, verbose=None, time=False): """Transforms the given dataframe without fitting this pipeline. If any stage in this pipeline is fittable but is not fitted, an UnfittedPipelineStageError is raised before transformation starts. Parameters ---------- X : pandas.DataFrame The dataframe to transform. y : array-like, optional Targets for supervised learning. exraise : bool, default None Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter. verbose : bool, default False If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed. time : bool, default False If True, per-stage application time is measured and reported when pipeline application is done. Returns ------- pandas.DataFrame The resulting dataframe. 
""" for stage in self._stages: if stage._is_fittable() and not stage.is_fitted: raise UnfittedPipelineStageError(( "PipelineStage {} in pipeline is fittable but" " unfitted!").format(stage)) if time: return self.__timed_transform( X=X, y=y, exraise=exraise, verbose=verbose) inter_df = X self.application_context = PdpApplicationContext() for i, stage in enumerate(self._stages): try: stage.application_context = self.application_context inter_df = stage.transform( X=inter_df, y=None, exraise=exraise, verbose=verbose, ) except Exception as e: raise PipelineApplicationError( f"Exception raised in stage [ {i}] {stage}" ) from e self._post_transform_lock() return inter_df __call__ = apply def __add__(self, other): if isinstance(other, PdPipeline): return PdPipeline([*self._stages, *other._stages]) if isinstance(other, PdPipelineStage): return PdPipeline([*self._stages, other]) return NotImplemented def __times_str__(self, times): res = "A pdpipe pipeline:\n" stime = sum(times) if stime > 0: # pragma: no cover percentages = [100 * x / stime for x in times] else: # pragma: no cover percentages = [0 for x in times] res += '[ 0] [{:0>5.2f}s ({:0>5.2f}%)] '.format( times[0], percentages[0] ) + "\n ".join( textwrap.wrap(self._stages[0].description()) ) + '\n' for i, stage in enumerate(self._stages[1:]): res += '[{:>2}] [{:0>5.2f}s ({:0>5.2f}%)] '.format( i + 1, times[i + 1], percentages[i + 1] ) + "\n ".join( textwrap.wrap(stage.description()) ) + '\n' return res def __str__(self): res = "A pdpipe pipeline:\n" res += '[ 0] ' + "\n ".join( textwrap.wrap(self._stages[0].description())) + '\n' for i, stage in enumerate(self._stages[1:]): res += '[{:>2}] '.format(i + 1) + "\n ".join( textwrap.wrap(stage.description())) + '\n' return res def _mem_str(self, total): total = asizeof(self) lines = [] for i, stage in enumerate(self._stages): size = asizeof(stage) if size > 500000: # pragma: no cover lines.append('[{:>2}] {:.2f}Mb ({:0>5.2f}%), {}\n'.format( i, size / 1000000, 100 * size 
/ total, stage.description())) elif size > 1000: # pragma: no cover lines.append('[{:>2}] {:.2f}Kb ({:0>5.2f}%), {}\n'.format( i, size / 1000, 100 * size / total, stage.description())) else: lines.append('[{:>2}] {:}b ({:0>5.2f}%), {}\n'.format( i, size, 100 * size / total, stage.description())) lines.append(stage._mem_str()) return ''.join(lines) def memory_report(self): """Prints a detailed memory report of the pipeline object to screen. To get better memory estimates make sure the pympler Python package is installed. Without it, sys.getsizeof is used, which can be extremely underestimate memory size of Python objects. """ print("=== Pipeline memory report ===") size = asizeof(self) if size > 500000: # pragma: no cover print("Total pipeline size in memory: {:.2f}Mb".format( size / 1000000)) elif size > 1000: # pragma: no cover print("Total pipeline size in memory: {:.2f}Kb".format( size / 1000)) else: print("Total pipeline size in memory: {:.2f}b".format( size)) print("Per-stage memory structure:") print(self._mem_str(total=size)) def get_transformer(self): """Return the transformer induced by this fitted pipeline. This transformer is a `pdpipe` pipeline that transforms input data in a way corresponding to this pipline after it has been fitted. By default this is the pipeline itself, but the `transform_getter` constructor parameter can be used to return a sub-pipeline of the fitted pipeline instead, for cases where some stages should only be applied when fitting this pipeline to data. Returns ------- pdpipe.PdPipeline The corresponding transformer pipeline induced by this pipeline. """ try: return self._trans_getter(self) except TypeError: # pragma: no cover return self
Ancestors
- PdPipelineStage
- abc.ABC
- collections.abc.Sequence
- collections.abc.Reversible
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
Methods
def apply(self, df, exraise=None, verbose=False, time=False)
-
Applies this pipeline stage to the given dataframe.
If the stage is not fitted fit_transform is called. Otherwise, transform is called.
Parameters
df : pandas.DataFrame
- The dataframe to which this pipeline stage will be applied.
exraise : bool, default None
- Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter.
verbose : bool, default False
- If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.
time : bool, default False
- If True, per-stage application time is measured and reported when pipeline application is done.
Returns
pandas.DataFrame
- The resulting dataframe.
def apply(self, df, exraise=None, verbose=False, time=False):
    """Applies this pipeline stage to the given dataframe.

    If the stage is not fitted fit_transform is called. Otherwise,
    transform is called.

    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe to which this pipeline stage will be applied.
    exraise : bool, default None
        Determines behaviour if the precondition of composing stages is not
        fulfilled by the input dataframe: If True, a
        pdpipe.FailedPreconditionError is raised. If False, the stage is
        skipped. If not given, or set to None, the default behaviour of each
        stage is used, as determined by its 'exraise' constructor parameter.
    verbose : bool, default False
        If True an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.
    time : bool, default False
        If True, per-stage application time is measured and reported when
        pipeline application is done.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.

    """
    self.application_context = PdpApplicationContext()
    if self.is_fitted:
        res = self.transform(
            X=df, exraise=exraise, verbose=verbose, time=time
        )
        self._post_transform_lock()
        return res
    self.fit_context = PdpApplicationContext()
    res = self.fit_transform(
        X=df, exraise=exraise, verbose=verbose, time=time
    )
    self._post_transform_lock()
    return res
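The dispatch that apply performs (fit on first use, plain transform afterwards) can be illustrated without pdpipe or pandas. The sketch below is a dependency-free stand-in, not pdpipe's implementation: ToyPipeline, its offset attribute and its calls log are hypothetical names introduced only for illustration, and plain lists stand in for dataframes.

```python
class ToyPipeline:
    """Hypothetical stand-in for a pipeline; not pdpipe's actual class."""

    def __init__(self):
        self.is_fitted = False
        self.offset = None
        self.calls = []  # records which method each apply() call used

    def fit_transform(self, rows):
        self.calls.append("fit_transform")
        self.offset = min(rows)  # "learn" a parameter from the data
        self.is_fitted = True
        return [r - self.offset for r in rows]

    def transform(self, rows):
        self.calls.append("transform")
        return [r - self.offset for r in rows]  # reuse the learned parameter

    def apply(self, rows):
        # Same dispatch as described above: fit on first use, reuse afterwards.
        if self.is_fitted:
            return self.transform(rows)
        return self.fit_transform(rows)

    __call__ = apply  # calling the object is the same as calling apply


pipeline = ToyPipeline()
first = pipeline([3, 5, 9])   # unfitted, so fit_transform is used
second = pipeline([10, 4])    # fitted, so transform reuses the offset
```

The second call subtracts the offset learned from the first input rather than recomputing it, which is the practical difference between the two code paths.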
def fit(self, X, y=None, exraise=None, verbose=None, time=None)
-
Fits this pipeline without transforming the input dataframe.
Parameters
X : pandas.DataFrame
- The dataframe to fit this pipeline by.
y : array-like, optional
- Targets for supervised learning.
exraise : bool, default None
- Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter.
verbose : bool, default False
- If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed.
time : bool, default False
- If True, per-stage application time is measured and reported when pipeline application is done.
Returns
pandas.DataFrame
- The input dataframe, unchanged.
def fit(self, X, y=None, exraise=None, verbose=None, time=None):
    """Fits this pipeline without transforming the input dataframe.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to fit this pipeline by.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Determines behaviour if the precondition of composing stages is not
        fulfilled by the input dataframe: If True, a
        pdpipe.FailedPreconditionError is raised. If False, the stage is
        skipped. If not given, or set to None, the default behaviour of each
        stage is used, as determined by its 'exraise' constructor parameter.
    verbose : bool, default False
        If True an explanation message is printed after the precondition
        of each stage is checked but before its application. Otherwise, no
        messages are printed.
    time : bool, default False
        If True, per-stage application time is measured and reported when
        pipeline application is done.

    Returns
    -------
    pandas.DataFrame
        The input dataframe, unchanged.

    """
    self.fit_transform(
        X=X,
        y=None,
        exraise=exraise,
        verbose=verbose,
        time=time,
    )
    return X
def fit_transform(self, X, y=None, exraise=None, verbose=None, time=False)
-
Fits this pipeline and transforms the input dataframe.
Parameters
X : pandas.DataFrame
- The dataframe to transform and fit this pipeline by.
y : array-like, optional
- Targets for supervised learning.
exraise : bool, default None
- Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter.
verbose : bool, default False
- If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed.
time : bool, default False
- If True, per-stage application time is measured and reported when pipeline application is done.
Returns
pandas.DataFrame
- The resulting dataframe.
def fit_transform(self, X, y=None, exraise=None, verbose=None, time=False):
    """Fits this pipeline and transforms the input dataframe.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to transform and fit this pipeline by.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Determines behaviour if the precondition of composing stages is not
        fulfilled by the input dataframe: If True, a
        pdpipe.FailedPreconditionError is raised. If False, the stage is
        skipped. If not given, or set to None, the default behaviour of each
        stage is used, as determined by its 'exraise' constructor parameter.
    verbose : bool, default False
        If True an explanation message is printed after the precondition
        of each stage is checked but before its application. Otherwise, no
        messages are printed.
    time : bool, default False
        If True, per-stage application time is measured and reported when
        pipeline application is done.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.

    """
    if time:
        return self.__timed_fit_transform(
            X=X, y=y, exraise=exraise, verbose=verbose)
    inter_x = X
    self.application_context = PdpApplicationContext()
    self.fit_context = PdpApplicationContext()
    for i, stage in enumerate(self._stages):
        try:
            stage.fit_context = self.fit_context
            stage.application_context = self.application_context
            inter_x = stage.fit_transform(
                X=inter_x,
                y=None,
                exraise=exraise,
                verbose=verbose,
            )
        except Exception as e:
            raise PipelineApplicationError(
                f"Exception raised in stage [ {i}] {stage}"
            ) from e
    self._post_transform_lock()
    self.is_fitted = True
    return inter_x
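The per-stage loop above threads an intermediate result through every stage and wraps any failure in an error that names the failing stage. A minimal, dependency-free sketch of that pattern follows. The run_stages, double and invert functions are hypothetical, the local PipelineApplicationError class only stands in for the one referenced in the source, and lists stand in for dataframes.

```python
class PipelineApplicationError(Exception):
    """Local stand-in for the error class referenced in the source above."""


def run_stages(stages, data):
    """Feed data through each stage in order, wrapping any failure."""
    inter = data
    for i, stage in enumerate(stages):
        try:
            inter = stage(inter)
        except Exception as e:
            # `from e` keeps the original exception available as __cause__.
            raise PipelineApplicationError(
                f"Exception raised in stage [{i}] {stage.__name__}"
            ) from e
    return inter


def double(values):
    return [v * 2 for v in values]


def invert(values):
    return [1 / v for v in values]


ok = run_stages([double, double], [1, 2])

failure = None
try:
    run_stages([double, invert], [0, 1])  # invert divides by zero
except PipelineApplicationError as err:
    failure = err
```

Chaining with `from e` means a caller sees both which stage failed and the underlying exception, without having to parse the message.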
def get_transformer(self)
-
Return the transformer induced by this fitted pipeline.
This transformer is a pdpipe pipeline that transforms input data in a way corresponding to this pipeline after it has been fitted. By default this is the pipeline itself, but the transform_getter constructor parameter can be used to return a sub-pipeline of the fitted pipeline instead, for cases where some stages should only be applied when fitting this pipeline to data.
Returns
pdpipe.PdPipeline
- The corresponding transformer pipeline induced by this pipeline.
def get_transformer(self):
    """Return the transformer induced by this fitted pipeline.

    This transformer is a `pdpipe` pipeline that transforms input data
    in a way corresponding to this pipeline after it has been fitted. By
    default this is the pipeline itself, but the `transform_getter`
    constructor parameter can be used to return a sub-pipeline of the
    fitted pipeline instead, for cases where some stages should only be
    applied when fitting this pipeline to data.

    Returns
    -------
    pdpipe.PdPipeline
        The corresponding transformer pipeline induced by this pipeline.

    """
    try:
        return self._trans_getter(self)
    except TypeError:  # pragma: no cover
        return self
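The try/except in get_transformer amounts to "call the getter if there is one, otherwise return the pipeline itself". The sketch below reproduces that fallback on a toy class. It assumes the getter defaults to None (calling None raises TypeError), which the source above suggests but does not show. ToyPipeline and its stages attribute are hypothetical names.

```python
class ToyPipeline:
    """Hypothetical stand-in holding a list of stage names."""

    def __init__(self, stages, transform_getter=None):
        self.stages = list(stages)
        self._trans_getter = transform_getter

    def get_transformer(self):
        try:
            return self._trans_getter(self)
        except TypeError:
            # Calling None raises TypeError, so fall back to the pipeline.
            return self


plain = ToyPipeline(["drop_outliers", "scale"])

# Assumed use case: the first stage should only run while fitting.
fit_only_first = ToyPipeline(
    ["drop_outliers", "scale"],
    transform_getter=lambda p: ToyPipeline(p.stages[1:]),
)

same = plain.get_transformer()
sub = fit_only_first.get_transformer()
```

One consequence of catching TypeError this way is that a getter which itself raises TypeError would also silently fall back to the full pipeline, at least in this sketch.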
def memory_report(self)
-
Prints a detailed memory report of the pipeline object to screen.
To get better memory estimates, make sure the pympler Python package is installed. Without it, sys.getsizeof is used, which can severely underestimate the memory size of Python objects.
def memory_report(self):
    """Prints a detailed memory report of the pipeline object to screen.

    To get better memory estimates, make sure the pympler Python package
    is installed. Without it, sys.getsizeof is used, which can severely
    underestimate the memory size of Python objects.

    """
    print("=== Pipeline memory report ===")
    size = asizeof(self)
    if size > 500000:  # pragma: no cover
        print("Total pipeline size in memory: {:.2f}Mb".format(
            size / 1000000))
    elif size > 1000:  # pragma: no cover
        print("Total pipeline size in memory: {:.2f}Kb".format(
            size / 1000))
    else:
        print("Total pipeline size in memory: {:.2f}b".format(
            size))
    print("Per-stage memory structure:")
    print(self._mem_str(total=size))
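The unit selection in the report uses two thresholds: sizes above 500,000 bytes are shown in Mb, sizes above 1,000 bytes in Kb, and anything smaller in bytes. The helper below isolates that rule so it can be tried on its own. format_size is a hypothetical name and is not part of pdpipe.

```python
def format_size(size):
    """Format a byte count using the thresholds seen in the source above."""
    if size > 500000:
        return "{:.2f}Mb".format(size / 1000000)
    if size > 1000:
        return "{:.2f}Kb".format(size / 1000)
    return "{}b".format(size)


examples = {n: format_size(n) for n in (512, 40000, 600000, 2500000)}
```

Because the Mb threshold sits at 500,000 rather than 1,000,000, sizes in between are reported as a fraction of a megabyte (600,000 bytes becomes 0.60Mb).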
def transform(self, X, y=None, exraise=None, verbose=None, time=False)
-
Transforms the given dataframe without fitting this pipeline.
If any stage in this pipeline is fittable but is not fitted, an UnfittedPipelineStageError is raised before transformation starts.
Parameters
X : pandas.DataFrame
- The dataframe to transform.
y : array-like, optional
- Targets for supervised learning.
exraise : bool, default None
- Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter.
verbose : bool, default False
- If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed.
time : bool, default False
- If True, per-stage application time is measured and reported when pipeline application is done.
Returns
pandas.DataFrame
- The resulting dataframe.
def transform(self, X, y=None, exraise=None, verbose=None, time=False):
    """Transforms the given dataframe without fitting this pipeline.

    If any stage in this pipeline is fittable but is not fitted, an
    UnfittedPipelineStageError is raised before transformation starts.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to transform.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Determines behaviour if the precondition of composing stages is not
        fulfilled by the input dataframe: If True, a
        pdpipe.FailedPreconditionError is raised. If False, the stage is
        skipped. If not given, or set to None, the default behaviour of each
        stage is used, as determined by its 'exraise' constructor parameter.
    verbose : bool, default False
        If True an explanation message is printed after the precondition
        of each stage is checked but before its application. Otherwise, no
        messages are printed.
    time : bool, default False
        If True, per-stage application time is measured and reported when
        pipeline application is done.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.

    """
    for stage in self._stages:
        if stage._is_fittable() and not stage.is_fitted:
            raise UnfittedPipelineStageError((
                "PipelineStage {} in pipeline is fittable but"
                " unfitted!").format(stage))
    if time:
        return self.__timed_transform(
            X=X, y=y, exraise=exraise, verbose=verbose)
    inter_df = X
    self.application_context = PdpApplicationContext()
    for i, stage in enumerate(self._stages):
        try:
            stage.application_context = self.application_context
            inter_df = stage.transform(
                X=inter_df,
                y=None,
                exraise=exraise,
                verbose=verbose,
            )
        except Exception as e:
            raise PipelineApplicationError(
                f"Exception raised in stage [ {i}] {stage}"
            ) from e
    self._post_transform_lock()
    return inter_df
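Note that transform validates every stage before running any of them, so an unfitted stage late in the pipeline prevents earlier stages from doing work. The sketch below shows that ordering on toy objects. ToyStage, its fittable flag and its runs counter are hypothetical, and the exception class is a local stand-in for the one named in the docstring.

```python
class UnfittedPipelineStageError(Exception):
    """Local stand-in for the error named in the docstring above."""


class ToyStage:
    """Hypothetical stage that counts how often it was asked to transform."""

    def __init__(self, name, fittable, is_fitted=False):
        self.name = name
        self.fittable = fittable
        self.is_fitted = is_fitted
        self.runs = 0

    def transform(self, data):
        self.runs += 1
        return data


def transform_all(stages, data):
    # First pass: validate, so that no stage runs on a half-ready pipeline.
    for stage in stages:
        if stage.fittable and not stage.is_fitted:
            raise UnfittedPipelineStageError(
                f"Stage {stage.name} is fittable but unfitted!")
    # Second pass: only reached when every fittable stage is fitted.
    for stage in stages:
        data = stage.transform(data)
    return data


first = ToyStage("first", fittable=False)
second = ToyStage("second", fittable=True, is_fitted=False)

raised = False
try:
    transform_all([first, second], [1, 2, 3])
except UnfittedPipelineStageError:
    raised = True
```

After the failed call, first.runs is still zero: the error surfaces before any transformation starts, as the docstring states.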
Inherited members
PdPipelineStage:
AdHocStage
AggByCols
ApplyByCols
ApplyToRows
Bin
ColByFrameFunc
ColDrop
ColRename
ColReorder
ColumnDtypeEnforcer
ColumnTransformer
ColumnsBasedPipelineStage
ConditionValidator
DropDuplicates
DropNa
DropRareTokens
DropTokensByLength
DropTokensByList
Encode
FitOnly
FreqDrop
Log
MapColVals
OneHotEncode
PdPipeline
RegexReplace
RemoveStopwords
RowDrop
Scale
Schematize
SetIndex
SnowballStem
TfidfVectorizeTokenLists
TokenizeText
UntokenizeText
ValDrop
ValKeep
description
class PdPipelineStage (exraise=True, exmsg=None, desc=None, prec=None, post=None, skip=None, name='')
-
A stage of a pandas DataFrame-processing pipeline.
Parameters
exraise : bool, default True
- If true, a pdpipe.FailedPreconditionError is raised when this stage is applied to a dataframe for which the precondition does not hold. Otherwise the stage is skipped. Additionally, if true, a pdpipe.FailedPostconditionError is raised if an expected post-condition does not hold for an output dataframe (after pipeline application). Otherwise pipeline application continues uninterrupted.
exmsg : str, default None
- The message of the exception that is raised on a failed precondition if exraise is set to True. A default message is used if None is given.
desc : str, default None
- A short description of this stage, used as its string representation. A default description is used if None is given.
prec : callable, default None
- This can be assigned a callable that returns boolean values for input dataframes, which will be used to determine whether input dataframes satisfy the preconditions for this pipeline stage (see the exraise parameter for the behaviour of failed preconditions). See pdpipe.cond for more information on specialised Condition objects.
post : callable, default None
- This can be assigned a callable that returns boolean values for dataframes, which will be used to determine whether the output dataframes of this pipeline stage satisfy its postconditions (see the exraise parameter for the behaviour of failed postconditions). See pdpipe.cond for more information on specialised Condition objects.
skip : callable, default None
- This can be assigned a callable that returns boolean values for input dataframes, which will be used to determine whether this stage should be skipped for input dataframes: if the callable returns True for an input dataframe, this stage will be skipped. See pdpipe.cond for more information on specialised Condition objects.
name : str, default ''
- The name of this stage. Pipelines can be sliced by this name.
Attributes
fit_context : PdpApplicationContext
- An application context object that is only re-initialized before fit_transform calls, and is locked after pipeline application. It is injected into the PipelineStage by the encapsulating pipeline object.
application_context : PdpApplicationContext
- An application context object that is re-initialized before every pipeline application (so, also during transform operations of fitted pipelines), and is locked after pipeline application. It is injected into the PipelineStage by the encapsulating pipeline object.
class PdPipelineStage(abc.ABC): """A stage of a pandas DataFrame-processing pipeline. Parameters ---------- exraise : bool, default True If true, a pdpipe.FailedPreconditionError is raised when this stage is applied to a dataframe for which the precondition does not hold. Otherwise the stage is skipped. Additionally, if true, a pdpipe.FailedPostconditionError is raised if an expected post-condition does not hold for an output dataframe (after pipeline application). Otherwise pipeline application continues uninterrupted. exmsg : str, default None The message of the exception that is raised on a failed precondition if exraise is set to True. A default message is used if None is given. desc : str, default None A short description of this stage, used as its string representation. A default description is used if None is given. prec : callable, default None This can be assigned a callable that returns boolean values for input dataframes, which will be used to determine whether input dataframes satisfy the preconditions for this pipeline stage (see the `exraise` parameter for the behaviour of failed preconditions). See `pdpipe.cond` for more information on specialised Condition objects. post : callable, default None This can be assigned a callable that returns boolean values for input dataframes, which will be used to determine whether input dataframes satisfy the postconditions for this pipeline stage (see the `exraise` parameter for the behaviour of failed postconditions). See `pdpipe.cond` for more information on specialised Condition objects. skip : callable, default None This can be assigned a callable that returns boolean values for input dataframes, which will be used to determine whether this stage should be skipped for input dataframes - if the callable returns True for an input dataframe, this stage will be skipped. See `pdpipe.cond` for more information on specialised Condition objects. name : str, default '' The name of this stage. 
Pipelines can be sliced by this name. Attributes ---------- fit_context : `PdpApplicationContext` An application context object that is only re-initialized before `fit_transform` calls, and is locked after pipeline application. It is injected into the PipelineStage by the encapsulating pipeline object. application_context : `PdpApplicationContext` An application context object that is re-initialized before every pipeline application (so, also during transform operations of fitted pipelines), and is locked after pipeline application.It is injected into the PipelineStage by the encapsulating pipeline object. """ _DEF_EXC_MSG = 'Precondition failed in stage {}!' _DEF_DESCRIPTION = 'A pipeline stage.' _INIT_KWARGS = ['exraise', 'exmsg', 'desc', 'prec', 'skip', 'name'] def __init__(self, exraise=True, exmsg=None, desc=None, prec=None, post=None, skip=None, name=''): if not isinstance(name, str): raise ValueError( f"'name' must be a str, not {type(name).__name__}." ) if desc is None: desc = PdPipelineStage._DEF_DESCRIPTION if exmsg is None: exmsg = PdPipelineStage._DEF_EXC_MSG.format(desc) self._exraise = exraise self._exmsg = exmsg self._exmsg_post = exmsg.replace( 'precondition', 'postcondition').replace( 'Precondition', 'Postcondition') self._desc = desc self._prec_arg = prec self._post_arg = post self._skip = skip self._appmsg = f"{name + ': ' if name else ''}{desc}" self._name = name self.fit_context: PdpApplicationContext = None self.application_context: PdpApplicationContext = None self.is_fitted = False @classmethod def _init_kwargs(cls): return cls._INIT_KWARGS @abc.abstractmethod def _prec(self, df): # pylint: disable=R0201,W0613 """Returns True if this stage can be applied to the given dataframe.""" raise NotImplementedError def _compound_prec(self, df): if self._prec_arg: return self._prec_arg(df) return self._prec(df) def _post(self, df): # pylint: disable=R0201,W0613 """Returns True if this stage resulted in an expected output frame.""" return True def 
_compound_post(self, df): if self._post_arg: return self._post_arg(df) return self._post(df) def _fit_transform(self, df, verbose): """Fits this stage and transforms the input dataframe.""" return self._transform(df, verbose) def _is_fittable(self): if self.__class__._fit_transform == PdPipelineStage._fit_transform: return False return True def _raise_precondition_error(self): try: raise FailedPreconditionError( f"{self._exmsg} [Reason] {self._prec_arg.error_message}") except AttributeError: raise FailedPreconditionError(self._exmsg) def _raise_postcondition_error(self): try: raise FailedPostconditionError( f"{self._exmsg_post} [Reason] {self._post_arg.error_message}") except AttributeError: raise FailedPostconditionError(self._exmsg_post) @abc.abstractmethod def _transform(self, df, verbose): """Transforms the given dataframe without fitting this stage.""" raise NotImplementedError("_transform method not implemented!") def apply(self, df, exraise=None, verbose=False): """Applies this pipeline stage to the given dataframe. If the stage is not fitted fit_transform is called. Otherwise, transform is called. Parameters ---------- df : pandas.DataFrame The dataframe to which this pipeline stage will be applied. exraise : bool, default None Override preconditions and postconditions behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter. verbose : bool, default False If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False. Returns ------- pandas.DataFrame The resulting dataframe. 
""" if exraise is None: exraise = self._exraise if self._skip and self._skip(df): return df if self._compound_prec(df=df): if verbose: msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg)) print(msg, flush=True) if self.is_fitted: res_df = self._transform(df, verbose=verbose) else: res_df = self._fit_transform(df, verbose=verbose) if exraise and not self._compound_post(df=res_df): self._raise_postcondition_error() return res_df if exraise: self._raise_precondition_error() return df __call__ = apply def fit_transform(self, X, y=None, exraise=None, verbose=False): """Fits this stage and transforms the given dataframe. Parameters ---------- X : pandas.DataFrame The dataframe to transform and fit this pipeline stage by. y : array-like, optional Targets for supervised learning. exraise : bool, default None Override preconditions and postconditions behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter. verbose : bool, default False If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False. Returns ------- pandas.DataFrame The resulting dataframe. """ if exraise is None: exraise = self._exraise if self._compound_prec(X): if verbose: msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg)) print(msg, flush=True) res_df = self._fit_transform(X, verbose=verbose) if exraise and not self._compound_post(df=res_df): self._raise_postcondition_error() return res_df if exraise: self._raise_precondition_error() return X def fit(self, X, y=None, exraise=None, verbose=False): """Fits this stage without transforming the given dataframe. Parameters ---------- X : pandas.DataFrame The dataframe to be transformed. y : array-like, optional Targets for supervised learning. exraise : bool, default None Override preconditions and postconditions behaviour for this call. 
If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter. verbose : bool, default False If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False. Returns ------- pandas.DataFrame The resulting dataframe. """ if exraise is None: exraise = self._exraise if self._compound_prec(X): if verbose: msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg)) print(msg, flush=True) res_df = self._fit_transform(X, verbose=verbose) if exraise and not self._compound_post(df=res_df): self._raise_postcondition_error() return X if exraise: self._raise_precondition_error() return X def transform(self, X, y=None, exraise=None, verbose=False): """Transforms the given dataframe without fitting this stage. If this stage is fittable but is not fitter, an UnfittedPipelineStageError is raised. Parameters ---------- X : pandas.DataFrame The dataframe to be transformed. y : array-like, optional Targets for supervised learning. exraise : bool, default None Override preconditions and postconditions behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter. verbose : bool, default False If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False. Returns ------- pandas.DataFrame The resulting dataframe. 
""" if exraise is None: exraise = self._exraise if self._compound_prec(X): if verbose: msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg)) print(msg, flush=True) if self._is_fittable(): if self.is_fitted: res_df = self._transform(X, verbose=verbose) if exraise and not self._compound_post(df=res_df): self._raise_postcondition_error() return res_df raise UnfittedPipelineStageError( "transform of an unfitted pipeline stage was called!") res_df = self._transform(X, verbose=verbose) if exraise and not self._compound_post(df=res_df): self._raise_postcondition_error() return res_df if exraise: self._raise_precondition_error() return X def __add__(self, other): if isinstance(other, PdPipeline): return PdPipeline([self, *other._stages]) if isinstance(other, PdPipelineStage): return PdPipeline([self, other]) return NotImplemented def __str__(self): return f"PdPipelineStage: {self._desc}" def __repr__(self): return self.__str__() def description(self): """Returns the description of this pipeline stage""" return self._desc def _mem_str(self): total = asizeof(self) lines = [] for a in dir(self): if not a.startswith('__'): att = getattr(self, a) if not callable(att): size = asizeof(att) if size > 500000: # pragma: no cover lines.append(' - {}, {:.2f}Mb ({:0>5.2f}%)\n'.format( a, size / 1000000, 100 * size / total)) elif size > 1000: # pragma: no cover lines.append(' - {}, {:.2f}Kb ({:0>5.2f}%)\n'.format( a, size / 1000, 100 * size / total)) else: lines.append(' - {}, {}b ({:0>5.2f}%)\n'.format( a, size, 100 * size / total)) return ''.join(lines)
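The way a single stage combines its skip, prec and exraise settings can be summarised as: skip first, then check the precondition, then either apply, raise, or pass the input through. The function below is a simplified, dependency-free sketch of that decision order. It omits fitting and postconditions, treats a missing prec as always true (the real class falls back to the stage's own _prec method instead), and uses plain dicts in place of dataframes. apply_stage, drop_num and has_num are hypothetical names.

```python
class FailedPreconditionError(Exception):
    """Local stand-in for pdpipe.FailedPreconditionError."""


def apply_stage(df, transform, prec=None, skip=None, exraise=True):
    """Apply `transform` to `df`, honouring skip and precondition callables."""
    if skip is not None and skip(df):
        return df                 # skip wins: nothing else is checked
    if prec is None or prec(df):
        return transform(df)      # precondition holds: apply the stage
    if exraise:
        raise FailedPreconditionError("Precondition failed!")
    return df                     # exraise=False: pass through unchanged


def drop_num(table):
    return {k: v for k, v in table.items() if k != "num"}


def has_num(table):
    return "num" in table


full = {"num": [1, 2], "char": ["a", "b"]}
no_num = {"char": ["a", "b"]}

dropped = apply_stage(full, drop_num, prec=has_num)
passed_through = apply_stage(no_num, drop_num, prec=has_num, exraise=False)
skipped = apply_stage(full, drop_num, prec=has_num, skip=lambda t: True)
```

With exraise left at True, the no_num input would raise FailedPreconditionError instead of being returned unchanged.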
Ancestors
- abc.ABC
Subclasses
- ColRename
- ColReorder
- ColumnDtypeEnforcer
- ConditionValidator
- DropNa
- FreqDrop
- Schematize
- SetIndex
- ApplyToRows
- Bin
- ColByFrameFunc
- AdHocStage
- ColumnsBasedPipelineStage
- PdPipeline
- pdpipe.df._DataFrameMethodTransformer
- TfidfVectorizeTokenLists
- FitOnly
Methods
def AdHocStage(self, transform, fit_transform=None, prec=None, **kwargs)
-
Creates and adds an ad-hoc stage of a pandas DataFrame-processing pipeline to this pipeline stage.
The signature for both the transform and the optional fit_transform callables is adaptive: the first argument is used positionally (so no specific name is assumed or used) to supply the callable with the pandas DataFrame object to transform. The following additional keyword arguments are supplied if they are included in the callable's signature:
- verbose - Passed on from PdPipelineStage's fit, fit_transform and apply methods.
- fit_context and application_context - Provide fit-specific and application-specific contexts (see PdpApplicationContext), usually available to pipeline stages using self.fit_context and self.application_context.
Parameters
transform : callable
    The transformation this stage applies to dataframes. If the fit_transform parameter is also populated, then this transformation is only applied on calls to transform. See documentation for the exact signature.
fit_transform : callable, optional
    The transformation this stage applies to dataframes, only on fit_transform. See documentation for the exact signature.
prec : callable, default None
    A callable that returns a boolean value. Represents a precondition used to determine whether this stage can be applied to a given dataframe. If None is given, set to a function always returning True.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> drop_num = pdp.AdHocStage(
...     transform=lambda df: df.drop(['num'], axis=1),
...     prec=lambda df: 'num' in df.columns
... )
>>> drop_num.apply(df)
  char
1    a
2    b
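The adaptive signature described above can be approximated with the standard library's inspect module. The following is only a sketch of the idea, not pdpipe's implementation: call_adaptively is a hypothetical helper, and a plain list stands in for the dataframe.

```python
import inspect


def call_adaptively(func, df, **available):
    """Call func(df, ...) passing only the keyword arguments it declares.

    Hypothetical helper for illustration; not part of pdpipe.
    """
    params = inspect.signature(func).parameters
    accepts_any_kw = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_any_kw:
        kwargs = dict(available)
    else:
        # Keep only the optional arguments the callable actually names.
        kwargs = {k: v for k, v in available.items() if k in params}
    # The data object is always supplied positionally.
    return func(df, **kwargs)


def plain(data):
    return [x * 2 for x in data]


def chatty(data, verbose=False):
    return ("verbose" if verbose else "quiet", len(data))


doubled = call_adaptively(plain, [1, 2], verbose=True, fit_context={})
described = call_adaptively(chatty, [1, 2], verbose=True, fit_context={})
```

Here plain never sees verbose or fit_context, while chatty receives only verbose, because that is the only optional argument it declares.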
Source code
def _append_stage_func(self, *args, **kwds):
    # self is always a PdPipelineStage
    return self + class_obj(*args, **kwds)
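Every stage-appending method on this page shares the same generated body, in which class_obj is the stage class being appended. How pdpipe attaches these methods is not shown in this section; the sketch below illustrates one way such chaining methods could be generated, using hypothetical stand-in classes (Stage, Pipeline, ColDrop, Bin) and a hypothetical _make_appender factory.

```python
class Stage:
    """Hypothetical stand-in for PdPipelineStage."""

    def __init__(self, name):
        self.name = name

    def __add__(self, other):
        if isinstance(other, Pipeline):
            return Pipeline([self, *other.stages])
        if isinstance(other, Stage):
            return Pipeline([self, other])
        return NotImplemented


class Pipeline(Stage):
    """Hypothetical stand-in for PdPipeline."""

    def __init__(self, stages):
        super().__init__("pipeline")
        self.stages = list(stages)

    def __add__(self, other):
        if isinstance(other, Pipeline):
            return Pipeline([*self.stages, *other.stages])
        if isinstance(other, Stage):
            return Pipeline([*self.stages, other])
        return NotImplemented


class ColDrop(Stage):
    def __init__(self, column):
        super().__init__(f"drop {column}")


class Bin(Stage):
    def __init__(self, bin_map):
        super().__init__(f"bin {sorted(bin_map)}")


def _make_appender(class_obj):
    # The closure captures class_obj, one stage class per generated method.
    def _append_stage_func(self, *args, **kwds):
        # self is always a Stage
        return self + class_obj(*args, **kwds)
    return _append_stage_func


# Attach one appender per stage class to the base class.
for cls in (ColDrop, Bin):
    setattr(Stage, cls.__name__, _make_appender(cls))

chained = ColDrop("Name").Bin({"Speed": [0, 5]})
names = [stage.name for stage in chained.stages]
```

With this wiring, ColDrop("Name").Bin(...) builds the same two-stage pipeline as ColDrop("Name") + Bin(...).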
def AggByCols(self, columns, func, result_columns=None, drop=True, func_desc=None, suffix=None, **kwargs)
-
Creates and adds a pipeline stage applying a series-wise function to columns to this pipeline stage. To apply an element-wise function, see ApplyByCols.
Parameters
columns : single label, list-like or callable
    Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
func : function
    The function to be applied to each of the given columns. Must work when given a pandas.Series object and return either a scalar or a pandas.Series. If a scalar is returned, the result is broadcast into a column of the original length.
result_columns : str or list-like, default None
    The names of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the name of the source column is used; otherwise, the name of the source column is used with a defined suffix.
drop : bool, default True
    If set to True, source columns are dropped after being mapped.
func_desc : str, default None
    A function description of the given function; e.g. 'normalizing revenue by company size'. A default description is used if None is given.
suffix : str, optional
    The suffix to add to resulting columns when result_columns is None and drop is set to False. If not given, defaults to '_agg'.
Example
>>> import pandas as pd; import pdpipe as pdp; import numpy as np;
>>> data = [[3.2, "acd"], [7.2, "alk"], [12.1, "alk"]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","lbl"])
>>> log_ph = pdp.AggByCols("ph", np.log)
>>> log_ph(df)
         ph  lbl
1  1.163151  acd
2  1.974081  alk
3  2.493205  alk
>>> min_ph = pdp.AggByCols("ph", min, drop=False, suffix='_min')
>>> min_ph(df)
     ph  ph_min  lbl
1   3.2     3.2  acd
2   7.2     3.2  alk
3  12.1     3.2  alk
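The difference between a series-wise function that returns a full column and one that returns a scalar can be sketched without pandas. The agg_column helper below is hypothetical, and a plain list stands in for pandas.Series; it only illustrates the broadcasting rule stated for func.

```python
import math


def agg_column(values, func):
    """Apply a series-wise func; broadcast a scalar result to full length.

    Hypothetical helper: a list stands in for pandas.Series.
    """
    result = func(values)
    if isinstance(result, (list, tuple)):
        if len(result) != len(values):
            raise ValueError("series-wise result must keep the column length")
        return list(result)
    # A scalar result is repeated once per row.
    return [result] * len(values)


ph = [3.2, 7.2, 12.1]
ph_min = agg_column(ph, min)
ph_log = agg_column(ph, lambda col: [math.log(v) for v in col])
```

ph_min repeats the column minimum on every row, matching the ph_min column in the example above, while ph_log keeps one value per row.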
def ApplyByCols(self, columns, func, result_columns=None, drop=True, func_desc=None, suffix=None, **kwargs)
-
Creates and adds a pipeline stage applying an element-wise function to columns to this pipeline stage. To apply a series-wise function, see AggByCols.
Parameters
columns : single label, list-like or callable
    Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
func : function
    The function to be applied to each element of the given columns.
result_columns : str or list-like, default None
    The names of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the name of the source column is used; otherwise, the name of the source column is used with the suffix '_app'.
drop : bool, default True
    If set to True, source columns are dropped after being mapped.
func_desc : str, default None
    A function description of the given function; e.g. 'normalizing revenue by company size'. Optional.
suffix : str, default None
    If provided, this string is concatenated to resulting column labels instead of '_app'.
Example
>>> import pandas as pd; import pdpipe as pdp; import math;
>>> data = [[3.2, "acd"], [7.2, "alk"], [12.1, "alk"]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","lbl"])
>>> round_ph = pdp.ApplyByCols("ph", math.ceil)
>>> round_ph(df)
   ph  lbl
1   4  acd
2   8  alk
3  13  alk
def ApplyToRows(self, func, colname=None, follow_column=None, func_desc=None, prec=None, **kwargs)
-
Creates and adds a pipeline stage generating columns by applying a function to each row to this pipeline stage.
Parameters
func : function
    The function to be applied to each row of the processed DataFrame.
colname : single label, default None
    The label of the new column resulting from the function application. If None, 'new_col' is used. Ignored if a DataFrame is generated by the function (i.e. each row generates a Series rather than a value), in which case the label of each column in the resulting DataFrame is used.
follow_column : str, default None
    Resulting columns will be inserted after this column. If None, new columns are inserted at the end of the processed DataFrame.
func_desc : str, default None
    A function description of the given function; e.g. 'normalizing revenue by company size'. A default description is used if None is given.
prec : function, default None
    A function taking a DataFrame, returning True if this stage is applicable to the given DataFrame. If None is given, a function always returning True is used.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3, 2143], [10, 1321], [7, 1255]]
>>> df = pd.DataFrame(data, [1,2,3], ['years', 'avg_revenue'])
>>> total_rev = lambda row: row['years'] * row['avg_revenue']
>>> add_total_rev = pdp.ApplyToRows(total_rev, 'total_revenue')
>>> add_total_rev(df)
   years  avg_revenue  total_revenue
1      3         2143           6429
2     10         1321          13210
3      7         1255           8785
>>> def halfer(row):
...     new = {'year/2': row['years']/2, 'rev/2': row['avg_revenue']/2}
...     return pd.Series(new)
>>> half_cols = pdp.ApplyToRows(halfer, follow_column='years')
>>> half_cols(df)
   years   rev/2  year/2  avg_revenue
1      3  1071.5     1.5         2143
2     10   660.5     5.0         1321
3      7   627.5     3.5         1255
def Bin(self, bin_map, drop=True, **kwargs)
-
Creates and adds a pipeline stage that adds a binned version of a column or columns to this pipeline stage.
If drop is set to True, the new columns retain the names of the source columns; otherwise, the resulting columns gain the suffix '_bin'.
Parameters
bin_map : dict
    Maps column labels to bin arrays. The bin array is interpreted as containing start points of consecutive bins, except for the final point, assumed to be the end point of the last bin. Additionally, a bin array implicitly projects a left-most bin containing all elements smaller than the left-most end point and a right-most bin containing all elements greater than or equal to the right-most end point. For example, the list [0, 5, 8] is interpreted as the bins (-∞, 0), [0, 5), [5, 8) and [8, ∞).
drop : bool, default True
    If set to True, the source columns are dropped after being binned.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[-3],[4],[5],[9]], [1,2,3,4], ['speed'])
>>> pdp.Bin({'speed': [5]}, drop=False).apply(df)
   speed speed_bin
1     -3        <5
2      4        <5
3      5        5≤
4      9        5≤
>>> pdp.Bin({'speed': [0,5,8]}, drop=False).apply(df)
   speed speed_bin
1     -3        <0
2      4       0-5
3      5       5-8
4      9        8≤
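The bin semantics above (left-closed bins plus two open-ended outer bins) can be reproduced with the standard library's bisect module. This is a sketch of the documented behaviour rather than pdpipe's code; bin_label is a hypothetical helper, and the label format is copied from the example output.

```python
from bisect import bisect_right


def bin_label(value, bins):
    """Return the label of the bin that value falls in.

    Hypothetical helper; bins must be sorted in ascending order.
    """
    i = bisect_right(bins, value)  # number of bin edges <= value
    if i == 0:
        return f"<{bins[0]}"       # left-most, open-ended bin
    if i == len(bins):
        return f"{bins[-1]}≤"      # right-most, open-ended bin
    return f"{bins[i - 1]}-{bins[i]}"


speeds = [-3, 4, 5, 9]
single_edge = [bin_label(s, [5]) for s in speeds]
three_edges = [bin_label(s, [0, 5, 8]) for s in speeds]
```

Because bisect_right counts edges less than or equal to the value, a value equal to an edge lands in the bin that starts at that edge, which is why 5 is labelled 5-8 and not 0-5.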
def ColByFrameFunc(self, column, func, follow_column=None, before_column=None, func_desc=None, **kwargs)
-
Creates and adds a pipeline stage adding a column by applying a dataframe-wide function to this pipeline stage.
Note that assigning column with the label of an existing column and providing the same label to the before_column parameter will result in replacing the original column at the same location.
Parameters
column : str
    The label of the resulting column. If it is the label of an existing column, it will replace that column.
func : function
    The function to be applied to the input dataframe. The function should return a pandas.Series object.
follow_column : str, default None
    Resulting columns will be inserted after this column. If both this parameter and before_column are None, new columns are inserted at the end of the processed DataFrame.
before_column : str, default None
    Resulting columns will be inserted before this column. If both this parameter and follow_column are None, new columns are inserted at the end of the processed DataFrame. If both are provided, before_column takes precedence.
func_desc : str, default None
    A function description of the given function; e.g. 'normalizing revenue by company size'. A default description is used if None is given.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3, 3], [2, 4], [1, 5]]
>>> df = pd.DataFrame(data, [1,2,3], ["A","B"])
>>> func = lambda df: df['A'] == df['B']
>>> add_equal = pdp.ColByFrameFunc("A==B", func)
>>> add_equal(df)
   A  B   A==B
1  3  3   True
2  2  4  False
3  1  5  False
def ColDrop(self, columns: Union[object, List[object], Callable], errors: Optional[str] = None, **kwargs: object)
-
Creates and adds a pipeline stage that drops columns by name to this pipeline stage.
Parameters
columns : single label, list-like or callable
    The label, or an iterable of labels, of columns to drop. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame (see pdpipe.cq).
errors : {‘ignore’, ‘raise’}, default ‘raise’
    If ‘ignore’, errors are suppressed and only existing labels are dropped.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColDrop('num').apply(df)
  char
1    a
2    b
def ColRename(self, rename_mapper: Union[Dict[~KT, ~VT], Callable], **kwargs)
-
Creates and adds a pipeline stage that renames a column or columns to this pipeline stage.
Parameters
rename_mapper : dict-like or callable
    Maps old column names to new ones.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColRename({'num': 'len', 'char': 'initial'}).apply(df)
   len initial
1    8       a
2    5       b
>>> def renamer(lbl: str):
...     if lbl.startswith('n'):
...         return 'foo'
...     return lbl
>>> pdp.ColRename(renamer).apply(df)
   foo char
1    8    a
2    5    b
def ColReorder(self, positions, **kwargs)
-
Creates and adds a pipeline stage that reorders columns to this pipeline stage.
Parameters
positions : dict
    A mapping of column names to their desired positions after reordering. Columns not included in the mapping will maintain their relative positions over the non-mapped columns.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,4,3,7]], columns=['a', 'b', 'c', 'd'])
>>> pdp.ColReorder({'b': 0, 'c': 3}).apply(df)
   b  a  d  c
0  4  8  7  3
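One way to read the positions mapping is as a two-pass placement: mapped columns are put at their target indices first, and the remaining columns fill the gaps in their original relative order. The reorder function below is a hypothetical illustration of that reading, not pdpipe's implementation.

```python
_EMPTY = object()  # sentinel marking a slot that is not yet filled


def reorder(columns, positions):
    """Place mapped columns at their target index; keep the rest in order.

    Hypothetical helper illustrating the documented semantics.
    """
    result = [_EMPTY] * len(columns)
    for label, index in positions.items():
        result[index] = label
    # Unmapped columns fill the remaining slots in their original order.
    remaining = iter(c for c in columns if c not in positions)
    return [next(remaining) if slot is _EMPTY else slot for slot in result]


reordered = reorder(["a", "b", "c", "d"], {"b": 0, "c": 3})
```

For the example above this yields the column order b, a, d, c.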
def ColumnDtypeEnforcer(self, column_to_dtype: Dict[~KT, ~VT], errors: Optional[str] = 'raise', **kwargs: object)
-
Creates and adds a pipeline stage enforcing column dtypes to this pipeline stage.
Parameters
column_to_dtype : dict of labels / ColumnQualifiers to dtypes
    Use {col: dtype, …}, where col is a column label and dtype is a numpy.dtype or Python type, to cast one or more of the DataFrame’s columns to column-specific types. Alternatively, you can provide ColumnQualifier objects as keys. If at least one such key is present, the label-to-dtype dict is dynamically inferred each time the pipeline stage is applied (note that ColumnQualifier objects are fittable by default, so to have column labels re-inferred after the first stage application you'll have to set fittable=False for the ColumnQualifier you use; see pdpipe.cq).
errors : {‘raise’, ‘ignore’}, default ‘raise’
    Controls raising of exceptions on invalid data for the provided dtype.
    - raise : allow exceptions to be raised.
    - ignore : suppress exceptions. On error return the original object.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'initial'])
>>> pdp.ColumnDtypeEnforcer({'num': float}).apply(df)
   num initial
1  8.0       a
2  5.0       b
>>> pdp.ColumnDtypeEnforcer({pdp.cq.StartWith('n'): float}).apply(df)
   num initial
1  8.0       a
2  5.0       b
def ColumnTransformer(self, columns, result_columns=None, drop=True, suffix=None, **kwargs)
-
Creates and adds a pipeline stage that applies transformation to dataframe columns to this pipeline stage.
Parameters
columns : single label, list-like or callable
    Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If None is provided all input columns are transformed.
result_columns : single label or list-like, default None
    Labels for the new columns resulting from the transformations. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, then the label of the source column is used; otherwise, the provided 'suffix' is concatenated to the label of the source column.
drop : bool, default True
    If set to True, source columns are dropped after being transformed.
suffix : str, default '_transformed'
    The suffix transformed columns gain if no new column labels are given.
def ColumnsBasedPipelineStage(self, columns, exclude_columns=None, desc_temp=None, none_columns='error', **kwargs)
-
Creates and adds a pipeline stage that operates on a subset of dataframe columns to this pipeline stage.
Parameters
columns : single label, iterable or callable
    The label, or an iterable of labels, of columns to use. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. By default no columns are excluded.
desc_temp : str, optional
    If given, assumed to be a format string, and every appearance of {} in it is replaced with an appropriate string representation of the columns parameter, and is used as the pipeline description. Ignored if desc is provided.
none_columns : iterable, callable or str, default 'error'
    Determines how None values supplied to the 'columns' parameter should be handled. If set to 'error', the default, a ValueError is raised if None is encountered. If set to 'all', it is interpreted to mean all columns of input dataframes should be operated on. If an iterable is provided it is interpreted as the default list of columns to operate on when columns=None. If a callable is provided, it is interpreted as the default column qualifier that determines input columns when columns=None.
**kwargs
    Additionally supports all constructor parameters of PdPipelineStage.
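The interaction between columns and none_columns can be summarised as a small resolution routine. The sketch below is an assumption-laden illustration, not the library's code: resolve_columns is a hypothetical function, a list of labels stands in for the input dataframe, and unrecognised string values of none_columns are treated like 'error', which the documentation above does not specify.

```python
def resolve_columns(columns, df_columns, none_columns="error"):
    """Resolve the effective column list for one input dataframe.

    Hypothetical helper; df_columns is the list of labels of the input.
    """
    if columns is None:
        if isinstance(none_columns, str):
            if none_columns == "all":
                return list(df_columns)
            # 'error' (and, by assumption, any other string) rejects None.
            raise ValueError("columns=None is not allowed for this stage")
        columns = none_columns  # default iterable or default qualifier
    if callable(columns):
        # A callable decides dynamically from the input's labels.
        return list(columns(df_columns))
    if isinstance(columns, str):
        return [columns]  # a single string label
    try:
        return list(columns)
    except TypeError:
        return [columns]  # a single non-iterable label, e.g. an int


labels = ["a", "b", "c"]
all_cols = resolve_columns(None, labels, none_columns="all")
default_cols = resolve_columns(None, labels, none_columns=["b"])
dynamic_cols = resolve_columns(lambda cols: [c for c in cols if c != "a"], labels)
single_col = resolve_columns("a", labels)
```

Calling resolve_columns(None, labels) with the default none_columns='error' raises ValueError, mirroring the documented default.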
def ConditionValidator(self, conditions: Union[Callable, List[Callable]], reducer: Optional[Callable] = <built-in function all>, errors: Optional[str] = 'raise', **kwargs: object)
-
Creates and adds a pipeline stage that validates boolean conditions on dataframes to this pipeline stage.
The stage does not change the input dataframe in any way.
The constructor expects either a single callable or a list-like of callable objects, and checks that all these callables return True - meaning all defined conditions hold - for input dataframes.
Naturally, pdpipe Condition objects from the pdpipe.cond module can be used.
Parameters
conditions : callable or list-like of callable
    The conditions to check for input dataframes. Naturally, pdpipe Condition objects from the pdpipe.cond module can be used.
reducer : callable, optional
    The callable that reduces the list of boolean results to a single result. By default the built-in all function is used, so all conditions must hold for this pipeline stage to validate an input dataframe. The built-in any function may be used to validate that at least one condition holds, and of course custom reducing functions can be used.
errors : str, default 'raise'
    If set to 'raise', the default, then if the reduced boolean result is False a FailedConditionError is raised on stage application. If set to 'ignore', then conditions are checked, the results are printed if the application was called with verbose=True, and pipeline application continues. Any other value is interpreted as 'raise'.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.ConditionValidator(lambda df: len(df.columns) == 5).apply(df)
Traceback (most recent call last):
   ...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
>>> pdp.ConditionValidator(pdp.cond.HasNoMissingValues()).apply(df)
Traceback (most recent call last):
   ...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
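The roles of reducer and errors can be illustrated with a few lines of plain Python. In this sketch validate and FailedCondition are hypothetical stand-ins (the latter for FailedConditionError), and a list of rows stands in for the dataframe.

```python
class FailedCondition(Exception):
    """Hypothetical stand-in for pdpipe's FailedConditionError."""


def validate(df, conditions, reducer=all, errors="raise"):
    """Check conditions on df and return df unchanged if validation passes."""
    if callable(conditions):
        conditions = [conditions]  # accept a single condition
    results = [bool(cond(df)) for cond in conditions]
    ok = reducer(results)
    # Any value of errors other than 'ignore' behaves like 'raise'.
    if not ok and errors != "ignore":
        raise FailedCondition("some conditions did not hold")
    return df  # the input is passed through unchanged


rows = [[1, 4], [4, None], [1, 11]]
two_wide = lambda data: all(len(row) == 2 for row in data)
no_missing = lambda data: all(v is not None for row in data for v in row)

passed_any = validate(rows, [two_wide, no_missing], reducer=any) is rows
try:
    validate(rows, [two_wide, no_missing])  # default reducer is all
    failed_all = False
except FailedCondition:
    failed_all = True
ignored = validate(rows, no_missing, errors="ignore") is rows
```

With reducer=any one satisfied condition is enough, whereas the default all fails here because the data contains a missing value.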
def DropDuplicates(self, columns=None, **kwargs)
-
Creates and adds a pipeline stage that drops duplicates in the given columns to this pipeline stage.
Parameters
columns : column label or sequence of labels, optional
    The labels of the columns to consider for duplication drop. If not populated, duplicates are dropped from all columns.
exclude_columns : object, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. By default no columns are excluded.
Examples
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8, 1],[8, 2], [9, 2]], [1,2,3], ['a', 'b'])
>>> pdp.DropDuplicates('a').apply(df)
   a  b
1  8  1
3  9  2
def DropNa(self, **kwargs)
-
Creates and adds a pipeline stage that drops null values to this pipeline stage.
Supports all parameters supported by the pandas.DataFrame.dropna method.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.DropNa().apply(df)
   a     b
1  1   4.0
3  1  11.0
def DropRareTokens(self, columns, threshold, drop=True, **kwargs)
-
Creates and adds a pipeline stage that drops rare tokens from token lists to this pipeline stage.
Target columns must be series of token lists; i.e. every cell in the series is an iterable of string tokens.
Note: The nltk package must be installed for this pipeline stage to work.
Parameters
columns : single label, list-like or callable
    Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
threshold : int
    The rarity threshold to use. Only tokens appearing more than this number of times in a column will remain in token lists in that column.
drop : bool, default True
    If set to True, the source columns are dropped after being transformed, and the resulting columns retain the names of the source columns. Otherwise, the new columns gain the suffix '_norare'.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[7, ['a', 'a', 'b']], [3, ['b', 'c', 'd']]]
>>> df = pd.DataFrame(data, columns=['num', 'chars'])
>>> rare_dropper = pdp.DropRareTokens('chars', 1)
>>> rare_dropper(df)
   num      chars
0    7  [a, a, b]
1    3        [b]
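The threshold rule (keep only tokens that appear more than threshold times across the whole column) can be sketched with collections.Counter. The helper drop_rare_tokens is hypothetical and operates on a plain list of token lists; it does not use nltk and makes no claim about how pdpipe counts tokens internally.

```python
from collections import Counter


def drop_rare_tokens(token_lists, threshold):
    """Keep tokens whose column-wide count is strictly above threshold.

    Hypothetical helper; token_lists stands in for one dataframe column.
    """
    counts = Counter(tok for tokens in token_lists for tok in tokens)
    return [
        [tok for tok in tokens if counts[tok] > threshold]
        for tokens in token_lists
    ]


chars = [["a", "a", "b"], ["b", "c", "d"]]
filtered = drop_rare_tokens(chars, 1)
```

Counts are taken over the entire column, so 'b' survives in both rows even though it appears only once in each.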
def DropTokensByLength(self, columns, min_len, max_len=None, result_columns=None, drop=True, **kwargs)
-
Creates and adds a pipeline stage removing tokens by length in string-token list columns to this pipeline stage.
Parameters
columns : single label, list-like or callable
    Names of token list columns on which to apply token filtering. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
min_len : int
    The minimum length of tokens to keep. Tokens of shorter length are removed from all token lists.
max_len : int, default None
    The maximum length of tokens to keep. If provided, tokens of longer length are removed from all token lists.
result_columns : str or list-like, default None
    The names of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the name of the source column is used; otherwise, the name of the source column is used with the suffix '_filtered'.
drop : bool, default True
    If set to True, source columns are dropped after being transformed.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[4, ["a", "bad", "nice"]], [5, ["good", "university"]]]
>>> df = pd.DataFrame(data, [1,2], ["age","text"])
>>> filter_tokens = pdp.DropTokensByLength('text', 3, 5)
>>> filter_tokens(df)
   age         text
1    4  [bad, nice]
2    5       [good]
def DropTokensByList(self, columns, bad_tokens, result_columns=None, drop=True, **kwargs)
-
Creates and adds a pipeline stage removing specific tokens in string-token list columns to this pipeline stage.
Parameters
columns : single label, list-like or callable
    Names of token list columns on which to apply token filtering. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
bad_tokens : list of str
    The list of string tokens to remove from all token lists.
result_columns : str or list-like, default None
    The names of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the name of the source column is used; otherwise, the name of the source column is used with the suffix '_filtered'.
drop : bool, default True
    If set to True, source columns are dropped after being transformed.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[4, ["a", "bad", "cat"]], [5, ["bad", "not", "good"]]]
>>> df = pd.DataFrame(data, [1,2], ["age","text"])
>>> filter_tokens = pdp.DropTokensByList('text', ['bad'])
>>> filter_tokens(df)
   age         text
1    4     [a, cat]
2    5  [not, good]
def Encode(self, columns=None, exclude_columns=None, drop=True, **kwargs)
-
Creates and adds a pipeline stage that encodes categorical columns to integer values to this pipeline stage.
The encoder for each column is saved in the attribute 'encoders', which is a dict mapping each encoded column name to the sklearn.preprocessing.LabelEncoder object used to encode it.
Parameters
columns : single label, list-like or callable, default None
    Column labels in the DataFrame to be encoded. If columns is None then all the columns with object or category dtype will be converted, except those given in the exclude_columns parameter. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, list-like or callable, default None
    Label or labels of columns to be excluded from encoding. If None then no column is excluded. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
    If set to True, the source columns are dropped after being encoded, and the resulting encoded columns retain the names of the source columns. Otherwise, encoded columns gain the suffix '_enc'.
Attributes
encoders : dict
    A dictionary mapping each encoded column name to the corresponding sklearn.preprocessing.LabelEncoder object. Empty if not fitted.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, "acd"], [7.2, "alk"], [12.1, "alk"]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","lbl"])
>>> encode_stage = pdp.Encode("lbl")
>>> encode_stage(df)
     ph  lbl
1   3.2    0
2   7.2    1
3  12.1    1
>>> encode_stage.encoders["lbl"].inverse_transform([0,1,1])
array(['acd', 'alk', 'alk'], dtype=object)
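What a per-column label encoder does can be shown in a few lines. The sketch below is a simplified stand-in for sklearn.preprocessing.LabelEncoder, assuming classes are numbered in sorted order; fit_label_encoder is a hypothetical helper and a list stands in for the column.

```python
def fit_label_encoder(values):
    """Return (mapping, classes) for a column of categorical values.

    Hypothetical helper; classes are numbered in sorted order.
    """
    classes = sorted(set(values))
    mapping = {label: code for code, label in enumerate(classes)}
    return mapping, classes


lbl = ["acd", "alk", "alk"]
mapping, classes = fit_label_encoder(lbl)
codes = [mapping[v] for v in lbl]          # encode
restored = [classes[c] for c in codes]     # inverse transform
```

Keeping the fitted mapping around is what makes the inverse transformation possible, which is the purpose of the encoders attribute described above.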
def FitOnly(self, stage, **kwargs)
-
Creates and adds a wrapper that applies a stage to input data only when fitting to this pipeline stage.
In other words, the input data is not transformed if the stage has already been fitted once.
Parameters
stage : PdPipelineStage
    The pipeline stage to operate on input data only when fitting.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> stage = pdp.FitOnly(pdp.ColDrop('num'))
>>> stage(df)
  char
1    a
2    b
>>> df2 = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> stage(df2)
   num char
1    8    a
2    5    b
def FreqDrop(self, threshold: int, column: str, **kwargs)
-
Creates and adds a pipeline stage that drops rows by value frequency to this pipeline stage.
Parameters
threshold : int
    The minimum frequency required for a value to be kept.
column : str
    The name of the column to check for the given value frequency.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[1,11]], [1,2,3], ['a','b'])
>>> pdp.FreqDrop(2, 'a').apply(df)
   a   b
1  1   4
3  1  11
def Log(self, columns=None, exclude_columns=None, drop=False, non_neg=False, const_shift=None, **kwargs)
-
Creates and adds a pipeline stage that log-transforms numeric data to this pipeline stage.
Parameters
columns : single label, list-like or callable, default None
    Column names in the DataFrame to be transformed. If columns is None then all the columns with a numeric dtype will be transformed, except those given in the exclude_columns parameter. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, list-like or callable, default None
    Label or labels of columns to be excluded from the transformation. If None then no column is excluded. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default False
    If set to True, the source columns are dropped after being transformed, and the resulting columns retain the names of the source columns. Otherwise, transformed columns gain the suffix '_log'.
non_neg : bool, default False
    If True, each transformed column is first shifted by the smallest negative value it includes (non-negative columns are thus not shifted).
const_shift : int, optional
    If given, each transformed column is first shifted by this constant. If non_neg is True then that transformation is applied first, and only then is the column shifted by this constant.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, "acd"], [7.2, "alk"], [12.1, "alk"]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","lbl"])
>>> log_stage = pdp.Log("ph", drop=True)
>>> log_stage(df)
         ph  lbl
1  1.163151  acd
2  1.974081  alk
3  2.493205  alk
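The order in which non_neg and const_shift are applied matters, so a small sketch may help. The log_transform function below is a hypothetical illustration on a plain list, assuming the non_neg shift subtracts the column's negative minimum so that the smallest value becomes zero.

```python
import math


def log_transform(values, non_neg=False, const_shift=None):
    """Shift a column as documented, then take the natural logarithm.

    Hypothetical helper; a list stands in for a numeric column.
    """
    shifted = list(values)
    if non_neg:
        lowest = min(shifted)
        if lowest < 0:
            # Assumed reading of non_neg: the minimum moves to zero.
            shifted = [v - lowest for v in shifted]
    if const_shift is not None:
        # The constant shift is applied after the non-negativity shift.
        shifted = [v + const_shift for v in shifted]
    return [math.log(v) for v in shifted]


ph_log = log_transform([3.2, 7.2, 12.1])
mixed_log = log_transform([-2, 0, 6], non_neg=True, const_shift=1)
```

In the second call the column becomes [0, 2, 8] after the non_neg shift and [1, 3, 9] after the constant shift. Under this reading, non_neg alone leaves a zero in any column that contained negative values, so a positive const_shift is needed there for the logarithm to be defined.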
def MapColVals(self, columns: Union[object, List[object], Callable], value_map: Union[dict, pandas.core.series.Series, Callable, str, Tuple[str, dict]], result_columns: Union[object, List[object], None] = None, drop: Optional[bool] = True, suffix: Optional[str] = None, **kwargs: Dict[str, object])
-
Creates and adds a pipeline stage that replaces the values of a column by a map to this pipeline stage.
Parameters
columns : single label, list-like or callable
    Column labels in the DataFrame to be mapped. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If None is provided all input columns are mapped.
value_map : dict, pandas.Series, callable, str or tuple
    The value-to-value map to use, mapping existing values to new ones. If a dictionary is provided, its mapping is used. Values not in the dictionary as keys will be converted to NaN. If a Series is given, values are mapped by its index to its values. If a callable is given, it is applied element-wise to given columns. If a string is given, it is interpreted as the name of an attribute or a property of the series values to use as target values. If a tuple is provided, its first element is expected to be a string, interpreted as a name of a method of the series values to call, and its second element is expected to be a dict - possibly empty - mapping additional keyword arguments names to their values.
result_columns : single label or list-like, default None
    Labels for the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, then the label of the source column is used; otherwise, the label of the source column is used with the suffix given ("_map" by default).
drop : bool, default True
    If set to True, source columns are dropped after being mapped.
suffix : str, default '_map'
    The suffix mapped columns gain if no new column labels are given.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1], [3], [2]], ['UK', 'USSR', 'US'], ['Medal'])
>>> value_map = {1: 'Gold', 2: 'Silver', 3: 'Bronze'}
>>> pdp.MapColVals('Medal', value_map).apply(df)
       Medal
UK      Gold
USSR  Bronze
US    Silver
>>> from datetime import timedelta;
>>> df = pd.DataFrame(
...     data=[
...         [timedelta(weeks=2)],
...         [timedelta(weeks=4)],
...         [timedelta(weeks=10)]
...     ],
...     index=['proposal', 'midterm', 'finals'],
...     columns=['Due'],
... )
>>> pdp.MapColVals('Due', ('total_seconds', {})).apply(df)
                Due
proposal  1209600.0
midterm   2419200.0
finals    6048000.0
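The different value_map forms can be easier to compare side by side. The sketch below applies each documented form to a single cell value using only the standard library. map_value is a hypothetical helper, not part of pdpipe; the real stage works on whole pandas Series (and also accepts a Series as the map, which is omitted here), so treat this as an illustration of the dispatch rules rather than the implementation.

```python
import math
from datetime import timedelta


def map_value(value, value_map):
    """Map one cell value; a hypothetical helper, not part of pdpipe."""
    if isinstance(value_map, dict):
        # Values missing from the dictionary's keys become NaN.
        return value_map.get(value, math.nan)
    if callable(value_map):
        # A callable is applied element-wise.
        return value_map(value)
    if isinstance(value_map, str):
        # A string names an attribute or property of the value.
        return getattr(value, value_map)
    if isinstance(value_map, tuple):
        # A tuple is (method name, keyword arguments for that method).
        method_name, method_kwargs = value_map
        return getattr(value, method_name)(**method_kwargs)
    raise TypeError("unsupported value_map type")


medals = {1: 'Gold', 2: 'Silver', 3: 'Bronze'}
due = timedelta(weeks=2)

print(map_value(3, medals))                    # dict lookup
print(map_value(7, medals))                    # missing key
print(map_value(3, lambda x: x * 10))          # callable
print(map_value(due, 'days'))                  # attribute name
print(map_value(due, ('total_seconds', {})))   # method name and kwargs
```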
def OneHotEncode(self, columns=None, dummy_na=False, exclude_columns=None, drop_first=True, drop=True, **kwargs)
-
Creates and adds a pipeline stage that one-hot-encodes categorical columns to this pipeline stage.
By default only k-1 dummies are created for k categorical levels, so as to avoid perfect multicollinearity between the dummy features (also called the dummy variable trap). This is done since features are usually one-hot encoded for use with linear models, which require this behaviour.
Parameters
columns : single label, list-like or callable, default None
    Column labels in the DataFrame to be encoded. If columns is None then all the columns with object or category dtype will be converted, except those given in the exclude_columns parameter. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
dummy_na : bool, default False
    Add a column to indicate NaNs; if False, NaNs are ignored.
exclude_columns : single label, list-like or callable, default None
    Label or labels of columns to be excluded from encoding. If None then no column is excluded. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. Optional.
drop_first : bool or single label, default True
    Whether to get k-1 dummies out of k categorical levels by removing the first level. If a non bool argument matching one of the categories is provided, the dummy column corresponding to this value is dropped instead of the first level; if it matches no category the first category will still be dropped.
drop : bool, default True
    If set to True, the source columns are dropped after being encoded.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([['USA'], ['UK'], ['Greece']], [1,2,3], ['Born'])
>>> pdp.OneHotEncode().apply(df)
   Born_UK  Born_USA
1        0         1
2        1         0
3        0         0
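The drop_first rules decide which category loses its dummy column. The sketch below restates them in plain Python. dummy_levels is a hypothetical helper, and the assumption that levels are considered in sorted order is inferred from the example above (where Greece is the level without a column); it is not taken from pdpipe's source.

```python
def dummy_levels(values, drop_first=True):
    """Return the category levels that would get a dummy column (sketch)."""
    # Assumption: levels are ordered by sorting, as the example suggests.
    levels = sorted(set(values))
    if drop_first is False:
        # Keep one dummy column per level.
        return levels
    if drop_first is not True and drop_first in levels:
        # A label matching a category: drop that level instead of the first.
        return [level for level in levels if level != drop_first]
    # True, or a label matching no category: drop the first level.
    return levels[1:]


born = ['USA', 'UK', 'Greece']
print(dummy_levels(born))                       # default behaviour
print(dummy_levels(born, drop_first='UK'))      # drop a chosen level
print(dummy_levels(born, drop_first='France'))  # no match, first is dropped
print(dummy_levels(born, drop_first=False))     # keep all k levels
```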
def PdPipeline(self, stages, transformer_getter=None, **kwargs)
-
Creates and adds a pipeline for processing pandas DataFrame objects to this pipeline stage.
transformer_getter is useful to avoid applying pipeline stages that are aimed to filter out items in a big dataset to create a training set for a machine learning model, for example, but should not be applied on future individual items to be transformed by the fitted pipeline.
Parameters
stages : list
    A list of PdPipelineStage objects making up this pipeline.
transformer_getter : callable, optional
    A callable that can be applied to the fitted pipeline to produce a sub-pipeline of it which should be used to transform dataframes after the pipeline has been fitted. If not given, the fitted pipeline is used entirely.
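To make the role of transformer_getter concrete, here is a toy sketch in which stages are plain functions on lists. ToyPipeline, drop_outliers and halve are hypothetical names invented for this illustration; the sketch only mirrors the documented idea (fit with every stage, transform later with a sub-pipeline) and says nothing about how pdpipe implements it.

```python
class ToyPipeline:
    """Hypothetical list-based pipeline; stages are plain functions."""

    def __init__(self, stages, transformer_getter=None):
        self.stages = list(stages)
        self.transformer_getter = transformer_getter

    def fit_transform(self, rows):
        # Fitting uses every stage, including training-only filters.
        for stage in self.stages:
            rows = stage(rows)
        return rows

    def transform(self, rows):
        # After fitting, use the sub-pipeline if a getter was supplied.
        pipeline = self
        if self.transformer_getter is not None:
            pipeline = self.transformer_getter(self)
        for stage in pipeline.stages:
            rows = stage(rows)
        return rows


def drop_outliers(rows):
    """Training-set filter: should not run on future single items."""
    return [x for x in rows if x < 100]


def halve(rows):
    return [x / 2 for x in rows]


pipeline = ToyPipeline(
    [drop_outliers, halve],
    # Skip the first (filtering) stage when transforming new data.
    transformer_getter=lambda fitted: ToyPipeline(fitted.stages[1:]),
)
train = pipeline.fit_transform([10, 500, 40])
single = pipeline.transform([500])
print(train, single)
```

Without the getter, the single item 500 would be filtered out and the transform would return an empty result.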
def RegexReplace(self, columns: Union[object, List[object], Callable], pattern: str, replace: str, flags: Optional[int] = 0, result_columns: Union[object, List[object], None] = None, drop: Optional[bool] = True, func_desc: Optional[str] = None, **kwargs)
-
Creates and adds a pipeline stage replacing regex occurrences in a text column to this pipeline stage.
Parameters
columns : single label, list-like or callable
    Column labels in the DataFrame to which regex replacement is applied. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
pattern : str
    The regex whose occurrences will be replaced.
replace : str
    The replacement string to use. This is equivalent to repl in re.sub.
flags : int, default 0
    Regex flags that are compatible with Python's re module.
result_columns : label or list-like of labels, default None
    The labels of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the label of the source column is used; otherwise, the label of the source column is cast to a string and concatenated with the suffix '_reg'.
drop : bool, default True
    If set to True, source columns are dropped after being transformed.
Example
>>> import pandas as pd; import pdpipe as pdp; import re;
>>> data = [[4, "more than 12"], [5, "with 5 more"]]
>>> df = pd.DataFrame(data, [1,2], ["age","text"])
>>> clean_num = pdp.RegexReplace('text', r'\b[0-9]+\b', "NUM")
>>> clean_num(df)
   age           text
1    4  more than NUM
2    5  with NUM more
>>> data = [["Mr. John", 18], ["MR. Bob", 25]]
>>> df = pd.DataFrame(data, [1,2], ["name","age"])
>>> match_men = r'^mr.*'
>>> censor_men = pdp.RegexReplace(
...     'name', match_men, "x", flags=re.IGNORECASE
... )
>>> censor_men(df)
  name  age
1    x   18
2    x   25
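Since replace is described as equivalent to repl in re.sub, the per-cell effect of the examples above can be reproduced with the standard library alone. regex_replace below is a hypothetical wrapper for illustration, not a pdpipe function.

```python
import re


def regex_replace(text, pattern, replace, flags=0):
    """Per-cell equivalent of the stage, using re.sub (sketch)."""
    return re.sub(pattern, replace, text, flags=flags)


# Whole-number tokens are replaced; \b keeps partial matches out.
print(regex_replace("more than 12", r'\b[0-9]+\b', "NUM"))

# With re.IGNORECASE, '^mr.*' matches both "Mr. John" and "MR. Bob".
print(regex_replace("MR. Bob", r'^mr.*', "x", flags=re.IGNORECASE))
```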
def RemoveStopwords(self, language, columns, drop=True, **kwargs)
-
Creates and adds a pipeline stage that removes stopwords from a tokenized list to this pipeline stage.
Target columns must be series of token lists; i.e. every cell in the series is an iterable of string tokens.
Note: The nltk package must be installed for this pipeline stage to work.
Parameters
language : str or array-like
    If a string is given, interpreted as the language of the stopwords, and should then be one of the languages supported by the NLTK Stopwords Corpus. If a list is given, it is assumed to be the list of stopwords to remove.
columns : single label, list-like or callable
    Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
    If set to True, the source columns are dropped after stopword removal, and the resulting columns retain the names of the source columns. Otherwise, resulting columns gain the suffix '_nostop'.
Example
>> import pandas as pd; import pdpipe as pdp;
>> data = [[3.2, ['kick', 'the', 'baby']]]
>> df = pd.DataFrame(data, [1], ['freq', 'content'])
>> remove_stopwords = pdp.RemoveStopwords('english', 'content')
>> remove_stopwords(df)
   freq       content
1   3.2  [kick, baby]
def RowDrop(self, conditions, reduce=None, columns=None, **kwargs)
-
Creates and adds a pipeline stage that drops rows by callable conditions to this pipeline stage.
Parameters
conditions : list-like or dict
    The list of conditions that make a row eligible to be dropped. Each condition must be a callable that takes a cell value and returns a bool value. If a list of callables is given, the conditions are checked for each column value of each row. If a dict mapping column labels to callables is given, then each condition is only checked for the column values of the designated column.
reduce : 'any', 'all' or 'xor', default 'any'
    Determines how row conditions are reduced. If set to 'all', a row must satisfy all given conditions to be dropped. If set to 'any', rows satisfying at least one of the conditions are dropped. If set to 'xor', rows satisfying exactly one of the conditions will be dropped. Set to 'any' by default.
columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If given, input conditions will be applied to the sub-dataframe made up of these columns to determine which rows to drop. Ignored if conditions is provided with a dict object. If conditions is a list and this parameter is not provided, all columns are checked (unless exclude_columns is additionally provided).
exclude_columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.RowDrop([lambda x: x < 2]).apply(df)
   a   b
2  4   5
3  5  11
>>> pdp.RowDrop({'a': lambda x: x == 4}).apply(df)
   a   b
1  1   4
3  5  11
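The three reduce modes differ only in how the per-condition results of a row are combined. The sketch below models rows as plain dicts to show this. REDUCERS, should_drop and row_drop are hypothetical names, 'xor' follows the "exactly one" wording of the parameter description, and the way list-form conditions are combined across columns is an assumption; pdpipe's own reduction may differ in detail.

```python
REDUCERS = {
    'any': any,
    'all': all,
    # Per the description above: exactly one condition is satisfied.
    'xor': lambda results: sum(bool(r) for r in results) == 1,
}


def should_drop(row, conditions, reduce='any'):
    """Decide whether one row (a dict of label -> value) is dropped."""
    reducer = REDUCERS[reduce]
    if isinstance(conditions, dict):
        # Each condition is checked only against its designated column.
        return reducer(
            [cond(row[label]) for label, cond in conditions.items()])
    # Assumption: each condition is reduced over all column values of the
    # row, and the per-condition results are then reduced again.
    per_condition = [
        reducer([cond(value) for value in row.values()])
        for cond in conditions
    ]
    return reducer(per_condition)


def row_drop(rows, conditions, reduce='any'):
    """Return the rows (index -> row dict) that survive the drop."""
    return {
        index: row for index, row in rows.items()
        if not should_drop(row, conditions, reduce)
    }


rows = {1: {'a': 1, 'b': 4}, 2: {'a': 4, 'b': 5}, 3: {'a': 5, 'b': 11}}
pair = {'a': lambda x: x > 3, 'b': lambda x: x > 10}

print(list(row_drop(rows, [lambda x: x < 2])))
print(list(row_drop(rows, pair, reduce='any')))
print(list(row_drop(rows, pair, reduce='all')))
print(list(row_drop(rows, pair, reduce='xor')))
```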
def Scale(self, scaler, columns=None, exclude_columns=None, joint=False, **kwargs)
-
Creates and adds a pipeline stage that scales data to this pipeline stage.
Parameters
scaler : str
    The type of scaler to use to scale the data. One of 'StandardScaler', 'MinMaxScaler', 'MaxAbsScaler', 'RobustScaler', 'QuantileTransformer' and 'Normalizer'. Refer to scikit-learn's documentation for usage.
columns : single label, list-like or callable, default None
    Column labels in the DataFrame to be scaled. If columns is None then all columns of numeric dtype will be scaled, except those given in the exclude_columns parameter. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, list-like or callable, default None
    Label or labels of columns to be excluded from scaling. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
joint : bool, default False
    If set to True, all scaled columns will be scaled as a single value set (meaning, only the single largest value among all input columns will be scaled to 1, and not the largest one for each column).
**kwargs : extra keyword arguments
    All valid extra keyword arguments are forwarded to the scaler constructor on scaler creation (e.g. 'n_quantiles' for QuantileTransformer). PdPipelineStage valid keyword arguments are used to override Scale class defaults.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, 0.3], [7.2, 0.35], [12.1, 0.29]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","gt"])
>>> scale_stage = pdp.Scale("StandardScaler")
>>> scale_stage(df)
         ph        gt
1 -1.181449 -0.508001
2 -0.082427  1.397001
3  1.263876 -0.889001
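The effect of the joint parameter is easiest to see with min-max scaling. The following sketch uses plain lists instead of a DataFrame and a hand-written min-max formula instead of scikit-learn's MinMaxScaler. min_max_scale is a hypothetical helper for illustration; it assumes each value set spans more than one distinct value.

```python
def min_max_scale(columns, joint=False):
    """Scale dict-of-lists columns to [0, 1] (hypothetical helper)."""
    if joint:
        # One value set: a single min and max shared by every column.
        all_values = [v for values in columns.values() for v in values]
        low, high = min(all_values), max(all_values)
        return {
            label: [(v - low) / (high - low) for v in values]
            for label, values in columns.items()
        }
    scaled = {}
    for label, values in columns.items():
        # Separate min and max per column.
        low, high = min(values), max(values)
        scaled[label] = [(v - low) / (high - low) for v in values]
    return scaled


data = {'a': [0, 5, 10], 'b': [0, 1, 2]}
per_column = min_max_scale(data)
jointly = min_max_scale(data, joint=True)
print(per_column)  # each column reaches 1.0
print(jointly)     # only the overall largest value reaches 1.0
```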
def Schematize(self, columns, **kwargs)
-
Enforces a column schema on input dataframes to this pipeline stage.
Parameters
columns : sequence of labels
    The dataframe schema to enforce on input dataframes.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[2, 4, 8],[3, 6, 9]], [1, 2], ['a', 'b', 'c'])
>>> pdp.Schematize(['a', 'c']).apply(df)
   a  c
1  2  8
2  3  9
>>> pdp.Schematize(['c', 'b']).apply(df)
   c  b
1  8  4
2  9  6
def SetIndex(self, keys, **kwargs)
-
Creates and adds a pipeline stage that sets existing columns as index to this pipeline stage.
Supports all parameters supported by the pandas.DataFrame.set_index method except for inplace.
Example
>> import pandas as pd; import pdpipe as pdp;
>> df = pd.DataFrame([[1,4],[3, 11]], [1,2], ['a','b'])
>> pdp.SetIndex('a').apply(df)
    b
a
1   4
3  11
def SnowballStem(self, stemmer_name, columns, drop=True, min_len=None, max_len=None, **kwargs)
-
Creates and adds a pipeline stage that stems tokens in a list using the Snowball stemmer to this pipeline stage.
Target columns must be series of token lists; i.e. every cell in the series is an iterable of string tokens.
Note: The nltk package must be installed for this pipeline stage to work.
Parameters
stemmer_name : str
    The name of the Snowball stemmer to use. Should be one of the Snowball stemmers implemented by nltk. E.g. 'EnglishStemmer'.
columns : single label, list-like or callable
    Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
    If set to True, the source columns are dropped after stemming, and the resulting columns retain the names of the source columns. Otherwise, resulting columns gain the suffix '_stem'.
min_len : int, optional
    If provided, tokens shorter than this length are not stemmed.
max_len : int, optional
    If provided, tokens longer than this length are not stemmed.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, ['kicking', 'boats']]]
>>> df = pd.DataFrame(data, [1], ['freq', 'content'])
>>> remove_stopwords = pdp.SnowballStem('EnglishStemmer', 'content')
>>> remove_stopwords(df)
   freq       content
1   3.2  [kick, boat]
def TfidfVectorizeTokenLists(self, column, drop=True, hierarchical_labels=False, **kwargs)
-
Creates and adds a pipeline stage TFIDF-vectorizing a token-list column to count columns to this pipeline stage.
Every cell in the input columns is assumed to be a list of strings, each representing a single token. The resulting TF-IDF vector is exploded into individual columns, each with the label 'lbl_i' where lbl is the original column label and i is the index of column in the count vector.
The resulting columns are concatenated to the end of the dataframe.
All valid sklearn.feature_extraction.text.TfidfVectorizer keyword arguments can be provided as keyword arguments to the constructor, except 'input' and 'analyzer', which will be ignored. As usual, all valid PdPipelineStage constructor parameters can also be provided as keyword arguments.
Parameters
column : str
    The label of the token-list column to TfIdf-vectorize.
drop : bool, default True
    If set to True, the source column is dropped after being transformed.
hierarchical_labels : bool, default False
    If set to True, the labels of resulting columns are of the form 'P_F' where P is the label of the original token-list column and F is the feature name (i.e. the string token it corresponds to). Otherwise, it is simply the feature name itself. If you plan to have two different TfidfVectorizeTokenLists pipeline stages vectorizing two different token-list columns, you should set this to true, so tf-idf features originating in different text columns do not overwrite one another.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[2, ['hovercraft', 'eels']], [5, ['eels', 'urethra']]]
>>> df = pd.DataFrame(data, [1, 2], ['Age', 'tokens'])
>>> tfvectorizer = pdp.TfidfVectorizeTokenLists('tokens')
>>> tfvectorizer(df)
   Age      eels  hovercraft   urethra
1    2  0.579739    0.814802  0.000000
2    5  0.579739    0.000000  0.814802
def TokenizeText(self, columns, drop=True, **kwargs)
-
Creates and adds a pipeline stage that tokenizes a text column into token lists to this pipeline stage.
Note: The nltk package must be installed for this pipeline stage to work.
Parameters
columns : single label, list-like or callable
    Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
    If set to True, the source columns are dropped after being tokenized, and the resulting tokenized columns retain the names of the source columns. Otherwise, tokenized columns gain the suffix '_tok'.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame(
...     [[3.2, "Kick the baby!"]], [1], ['freq', 'content'])
>>> tokenize_stage = pdp.TokenizeText('content')
>>> tokenize_stage(df)
   freq               content
1   3.2  [Kick, the, baby, !]
def UntokenizeText(self, columns, drop=True, **kwargs)
-
Creates and adds a pipeline stage that joins token lists to whitespace-separated strings to this pipeline stage.
Target columns must be series of token lists; i.e. every cell in the series is an iterable of string tokens.
Note: The nltk package must be installed for this pipeline stage to work.
Parameters
columns : single label, list-like or callable
    Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
    If set to True, the source columns are dropped after being untokenized, and the resulting columns retain the names of the source columns. Otherwise, untokenized columns gain the suffix '_untok'.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, ['Shake', 'and', 'bake!']]]
>>> df = pd.DataFrame(data, [1], ['freq', 'content'])
>>> untokenize_stage = pdp.UntokenizeText('content')
>>> untokenize_stage(df)
   freq          content
1   3.2  Shake and bake!
def ValDrop(self, values: List[object], columns: Union[object, List[object], Callable] = None, **kwargs: object)
-
Creates and adds a pipeline stage that drops rows by value to this pipeline stage.
Parameters
values : list-like
    A list of the values to drop.
columns : single label, list-like or callable, default None
    The label, or an iterable of labels, of columns to check for the given values. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If set to None, all columns are checked.
exclude_columns : label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[18,11]], [1,2,3], ['a','b'])
>>> pdp.ValDrop([4], 'a').apply(df)
    a   b
1   1   4
3  18  11
>>> pdp.ValDrop([4]).apply(df)
    a   b
3  18  11
def ValKeep(self, values, columns=None, **kwargs)
-
Creates and adds a pipeline stage that keeps rows by value to this pipeline stage.
Parameters
values : list-like
    A list of the values to keep.
columns : single label, list-like or callable, default None
    The label, or an iterable of labels, of columns to check for the given values. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If set to None, all columns are checked.
exclude_columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.
Example
>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.ValKeep([4, 5], 'a').apply(df)
   a   b
2  4   5
3  5  11
>>> pdp.ValKeep([4, 5]).apply(df)
   a  b
2  4  5
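Read together, the ValDrop and ValKeep examples suggest that a row is dropped when any checked column holds a listed value, while a row is kept only when all checked columns hold listed values. The sketch below encodes that reading with rows as plain dicts. val_drop and val_keep are hypothetical helpers, and the any/all rule is inferred from the documented outputs rather than from pdpipe's source.

```python
def val_drop(rows, values, columns=None):
    """Drop rows where ANY checked column holds one of the values."""
    kept = {}
    for index, row in rows.items():
        labels = columns if columns is not None else list(row)
        if not any(row[label] in values for label in labels):
            kept[index] = row
    return kept


def val_keep(rows, values, columns=None):
    """Keep rows where ALL checked columns hold one of the values."""
    kept = {}
    for index, row in rows.items():
        labels = columns if columns is not None else list(row)
        if all(row[label] in values for label in labels):
            kept[index] = row
    return kept


drop_rows = {1: {'a': 1, 'b': 4}, 2: {'a': 4, 'b': 5}, 3: {'a': 18, 'b': 11}}
keep_rows = {1: {'a': 1, 'b': 4}, 2: {'a': 4, 'b': 5}, 3: {'a': 5, 'b': 11}}

print(list(val_drop(drop_rows, [4], ['a'])))
print(list(val_drop(drop_rows, [4])))
print(list(val_keep(keep_rows, [4, 5], ['a'])))
print(list(val_keep(keep_rows, [4, 5])))
```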
def apply(self, df, exraise=None, verbose=False)
-
Applies this pipeline stage to the given dataframe.
If the stage is not fitted fit_transform is called. Otherwise, transform is called.
Parameters
df : pandas.DataFrame
    The dataframe to which this pipeline stage will be applied.
exraise : bool, default None
    Override preconditions and postconditions behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter.
verbose : bool, default False
    If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.
Returns
pandas.DataFrame
    The resulting dataframe.
def apply(self, df, exraise=None, verbose=False):
    """Applies this pipeline stage to the given dataframe.

    If the stage is not fitted fit_transform is called. Otherwise,
    transform is called.

    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe to which this pipeline stage will be applied.
    exraise : bool, default None
        Override preconditions and postconditions behaviour for this call.
        If None, the default behaviour of this stage is used, as determined
        by the exraise constructor parameter.
    verbose : bool, default False
        If True an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.
    """
    if exraise is None:
        exraise = self._exraise
    if self._skip and self._skip(df):
        return df
    if self._compound_prec(df=df):
        if verbose:
            msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg))
            print(msg, flush=True)
        if self.is_fitted:
            res_df = self._transform(df, verbose=verbose)
        else:
            res_df = self._fit_transform(df, verbose=verbose)
        if exraise and not self._compound_post(df=res_df):
            self._raise_postcondition_error()
        return res_df
    if exraise:
        self._raise_precondition_error()
    return df
def description(self)
-
Returns the description of this pipeline stage
def description(self):
    """Returns the description of this pipeline stage"""
    return self._desc
def fit(self, X, y=None, exraise=None, verbose=False)
-
Fits this stage without transforming the given dataframe.
Parameters
X : pandas.DataFrame
    The dataframe to be transformed.
y : array-like, optional
    Targets for supervised learning.
exraise : bool, default None
    Override preconditions and postconditions behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter.
verbose : bool, default False
    If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.
Returns
pandas.DataFrame
    The resulting dataframe.
def fit(self, X, y=None, exraise=None, verbose=False):
    """Fits this stage without transforming the given dataframe.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to be transformed.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Override preconditions and postconditions behaviour for this call.
        If None, the default behaviour of this stage is used, as determined
        by the exraise constructor parameter.
    verbose : bool, default False
        If True an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.
    """
    if exraise is None:
        exraise = self._exraise
    if self._compound_prec(X):
        if verbose:
            msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg))
            print(msg, flush=True)
        res_df = self._fit_transform(X, verbose=verbose)
        if exraise and not self._compound_post(df=res_df):
            self._raise_postcondition_error()
        return X
    if exraise:
        self._raise_precondition_error()
    return X
def fit_transform(self, X, y=None, exraise=None, verbose=False)
-
Fits this stage and transforms the given dataframe.
Parameters
X : pandas.DataFrame
    The dataframe to transform and fit this pipeline stage by.
y : array-like, optional
    Targets for supervised learning.
exraise : bool, default None
    Override preconditions and postconditions behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter.
verbose : bool, default False
    If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.
Returns
pandas.DataFrame
    The resulting dataframe.
def fit_transform(self, X, y=None, exraise=None, verbose=False):
    """Fits this stage and transforms the given dataframe.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to transform and fit this pipeline stage by.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Override preconditions and postconditions behaviour for this call.
        If None, the default behaviour of this stage is used, as determined
        by the exraise constructor parameter.
    verbose : bool, default False
        If True an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.
    """
    if exraise is None:
        exraise = self._exraise
    if self._compound_prec(X):
        if verbose:
            msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg))
            print(msg, flush=True)
        res_df = self._fit_transform(X, verbose=verbose)
        if exraise and not self._compound_post(df=res_df):
            self._raise_postcondition_error()
        return res_df
    if exraise:
        self._raise_precondition_error()
    return X
def transform(self, X, y=None, exraise=None, verbose=False)
-
Transforms the given dataframe without fitting this stage.
If this stage is fittable but is not fitted, an UnfittedPipelineStageError is raised.
Parameters
X : pandas.DataFrame
    The dataframe to be transformed.
y : array-like, optional
    Targets for supervised learning.
exraise : bool, default None
    Override preconditions and postconditions behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter.
verbose : bool, default False
    If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.
Returns
pandas.DataFrame
    The resulting dataframe.
def transform(self, X, y=None, exraise=None, verbose=False):
    """Transforms the given dataframe without fitting this stage.

    If this stage is fittable but is not fitted, an
    UnfittedPipelineStageError is raised.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to be transformed.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Override preconditions and postconditions behaviour for this call.
        If None, the default behaviour of this stage is used, as determined
        by the exraise constructor parameter.
    verbose : bool, default False
        If True an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.
    """
    if exraise is None:
        exraise = self._exraise
    if self._compound_prec(X):
        if verbose:
            msg = '- ' + '\n '.join(textwrap.wrap(self._appmsg))
            print(msg, flush=True)
        if self._is_fittable():
            if self.is_fitted:
                res_df = self._transform(X, verbose=verbose)
                if exraise and not self._compound_post(df=res_df):
                    self._raise_postcondition_error()
                return res_df
            raise UnfittedPipelineStageError(
                "transform of an unfitted pipeline stage was called!")
        res_df = self._transform(X, verbose=verbose)
        if exraise and not self._compound_post(df=res_df):
            self._raise_postcondition_error()
        return res_df
    if exraise:
        self._raise_precondition_error()
    return X
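The four entry points differ mainly in when fitting happens and what is returned. The toy stage below, which works on lists of numbers rather than dataframes, is a simplified sketch of that contract as described above: apply fits on first use, fit returns its input untouched, and transform refuses to run before fitting. ToyMeanCenter and UnfittedStageError are hypothetical names; preconditions, postconditions and the skip logic are deliberately left out.

```python
class UnfittedStageError(Exception):
    """Hypothetical stand-in for UnfittedPipelineStageError."""


class ToyMeanCenter:
    """Hypothetical fittable stage: subtracts the mean seen at fit time."""

    def __init__(self):
        self.is_fitted = False
        self._mean = None

    def fit_transform(self, values):
        # Learn the mean, then return the transformed values.
        self._mean = sum(values) / len(values)
        self.is_fitted = True
        return [v - self._mean for v in values]

    def fit(self, values):
        # Fit only: the input is returned untouched.
        self.fit_transform(values)
        return values

    def transform(self, values):
        if not self.is_fitted:
            raise UnfittedStageError(
                "transform of an unfitted stage was called!")
        return [v - self._mean for v in values]

    def apply(self, values):
        # fit_transform on first use, transform afterwards.
        if self.is_fitted:
            return self.transform(values)
        return self.fit_transform(values)


stage = ToyMeanCenter()
try:
    stage.transform([1, 2, 3])
except UnfittedStageError as error:
    print("refused:", error)

first = stage.apply([1, 2, 3])   # fits (mean 2.0) and transforms
second = stage.apply([10])       # reuses the fitted mean
print(first, second)
```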
class PdpApplicationContext (fit_context=None)
-
An object encapsulating the application context of a pipeline.
It is meant to communicate data, information and variables between different stages of a pipeline.
Parameters
fit_context : PdpApplicationContext, optional
    Another application context object, representing the application context of a previous fit of the pipeline this application context is initialized for. Optional.
class PdpApplicationContext(dict):
    """An object encapsulating the application context of a pipeline.

    It is meant to communicate data, information and variables between
    different stages of a pipeline.

    Parameters
    ----------
    fit_context : PdpApplicationContext, optional
        Another application context object, representing the application
        context of a previous fit of the pipeline this application context
        is initialized for. Optional.
    """

    def __init__(self, fit_context=None):
        self.__locked__ = False
        self._fit_context__ = fit_context

    def __setitem__(self, key, value):
        if not self.__locked__:
            super().__setitem__(key, value)

    def __delitem__(self, key):
        if not self.__locked__:
            super().__delitem__(key)

    def pop(self, key, default):
        """If key is in the dictionary, remove it and return its value,
        else return default. If default is not given and key is not in the
        dictionary, a KeyError is raised.
        """
        if not self.__locked__:
            return super().pop(key, default)
        return super().__getitem__(key)

    def clear(self):
        """Remove all items from the dictionary."""
        if not self.__locked__:
            super().clear()

    def popitem(self):
        """Not implemented!"""
        raise NotImplementedError

    def update(self, other):
        """Update the dictionary with the key/value pairs from other,
        overwriting existing keys. Return None. update() accepts either
        another dictionary object or an iterable of key/value pairs (as
        tuples or other iterables of length two). If keyword arguments are
        specified, the dictionary is then updated with those key/value
        pairs: d.update(red=1, blue=2).
        """
        if not self.__locked__:
            super().update(other)

    def lock(self):
        """Locks this application context for changes."""
        self.__locked__ = True

    def fit_context(self):
        """Returns a locked PdpApplicationContext object of a previous fit."""
        return self._fit_context__
Ancestors
- builtins.dict
Methods
def clear(self)
-
Remove all items from the dictionary.
def clear(self):
    """Remove all items from the dictionary."""
    if not self.__locked__:
        super().clear()
def fit_context(self)
-
Returns a locked PdpApplicationContext object of a previous fit.
def fit_context(self):
    """Returns a locked PdpApplicationContext object of a previous fit."""
    return self._fit_context__
def lock(self)
-
Locks this application context for changes.
def lock(self):
    """Locks this application context for changes."""
    self.__locked__ = True
def pop(self, key, default)
-
If key is in the dictionary, remove it and return its value, else return default. If default is not given and key is not in the dictionary, a KeyError is raised.
def pop(self, key, default):
    """If key is in the dictionary, remove it and return its value,
    else return default. If default is not given and key is not in the
    dictionary, a KeyError is raised.
    """
    if not self.__locked__:
        return super().pop(key, default)
    return super().__getitem__(key)
def popitem(self)
-
Not implemented!
def popitem(self):
    """Not implemented!"""
    raise NotImplementedError
def update(self, other)
-
Update the dictionary with the key/value pairs from other, overwriting existing keys. Return None. update() accepts either another dictionary object or an iterable of key/value pairs (as tuples or other iterables of length two). If keyword arguments are specified, the dictionary is then updated with those key/value pairs: d.update(red=1, blue=2).
def update(self, other):
    """Update the dictionary with the key/value pairs from other,
    overwriting existing keys. Return None. update() accepts either
    another dictionary object or an iterable of key/value pairs (as
    tuples or other iterables of length two). If keyword arguments are
    specified, the dictionary is then updated with those key/value
    pairs: d.update(red=1, blue=2).
    """
    if not self.__locked__:
        super().update(other)
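The practical consequence of lock() is that later writes are silently ignored rather than raising, and that pop on a locked context returns the stored value without removing it. The sketch below re-creates just enough of that behaviour to demonstrate it without importing pdpipe. LockableContext is a trimmed, hypothetical re-creation, not the library class.

```python
class LockableContext(dict):
    """Trimmed, hypothetical re-creation of the locking behaviour."""

    def __init__(self):
        super().__init__()
        self._locked = False

    def __setitem__(self, key, value):
        # Writes are ignored once the context is locked.
        if not self._locked:
            super().__setitem__(key, value)

    def pop(self, key, default=None):
        if not self._locked:
            return super().pop(key, default)
        # Locked: report the value but leave the entry in place.
        return super().__getitem__(key)

    def lock(self):
        self._locked = True


context = LockableContext()
context['n_rows'] = 3
context.lock()

context['n_rows'] = 99   # silently ignored
context['new_key'] = 1   # silently ignored
popped = context.pop('n_rows')

print(dict(context), popped)
```

Silent no-ops keep stages that write to the context from failing when they run against a locked context, at the cost of hiding accidental writes.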