Module pdpipe.core

"""Defines pipelines for processing pandas.DataFrame-based datasets.

>>> import pdpipe as pdp
>>> pipeline = pdp.ColDrop('Name') + pdp.Bin({'Speed': [0,5]})
>>> pipeline = pdp.ColDrop('Name').Bin({'Speed': [0,5]}, drop=True)

## Creating pipeline stages that operate on column subsets

Many pipeline stages in pdpipe operate on a subset of columns, allowing the
caller to determine this subset by either providing a fixed set of column
labels or by providing a callable that determines the column subset dynamically
from input dataframes. The `pdpipe.cq` module addresses a unique but
important use case: fittable column qualifiers, which dynamically extract a
column subset at stage fit time, and then keep it fixed for all future
transformations.

As a general rule, every pipeline stage in pdpipe that supports the `columns`
parameter should inherently support fittable column qualifiers, as well as
the correct interpretation of both single and multiple labels as arguments.
To unify the implementation of this functionality, and to ease the creation
of new pipeline stages, such stages should be created by extending the
ColumnsBasedPipelineStage base class, found in this module (`pdpipe.core`).

Subclasses interact with this base class mainly through the `columns`,
`exclude_columns` and `none_columns` constructor arguments, and through the
"private" `_get_columns(df, fit)` method:

* Any extending subclass should accept the `columns` constructor parameter
  and forward it, without transforming it, to the constructor of
  ColumnsBasedPipelineStage. E.g.
  `super().__init__(columns=columns, **kwargs)`. See the implementation of
  any such extending class for a more complete example.

* Extending subclasses can decide whether or not they want to expose the
  `exclude_columns` parameter. Note that most of its functionality can
  anyway be gained by providing the `columns` parameter with a column
  qualifier object that is a difference between two column qualifiers; e.g.
  `columns=cq.OfDtype(np.number) - cq.OfDtype(np.int64)` is equivalent to
  providing `columns=cq.OfDtype(np.number),
  exclude_columns=cq.OfDtype(np.int64)`. However, exposing the
  `exclude_columns` parameter allows for unique behaviours; for example, if
  the `none_columns` parameter - which configures the behavior when
  `columns` is provided with `None` - is set with a
  `cq.OfDtypes('category')` column qualifier, meaning that all categorical
  columns are selected when `columns=None`, then exposing `exclude_columns`
  allows easy specification of "all categorical columns except X", by just
  giving a column qualifier capturing X to `exclude_columns`, instead of
  having to reconstruct the default column qualifier by hand and subtract
  from it the one representing X.

* To get the subset of columns to operate on at `fit_transform` or
  `transform` time, call `self._get_columns(df, fit=True)` (or with
  `fit=False` if just transforming), providing it with the input dataframe.

* Additionally, to get a description and application message with a nice
  string representation of the list of columns to operate on, the
  `desc_temp` constructor parameter of ColumnsBasedPipelineStage can be
  provided with a format string with a placeholder where the column list
  should go, e.g. `"Drop columns {}"` for the ColDrop pipeline stage (see
  the sketch following this list).
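
For example, the constructor of a hypothetical extending stage might forward
its parameters like so (the stage name and description are illustrative
only):

    class ColCapitalize(ColumnsBasedPipelineStage):

        def __init__(self, columns, **kwargs):
            super().__init__(
                columns=columns,
                desc_temp='Capitalize columns {}',
                **kwargs,
            )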

There are two correct ways to extend it, depending on whether the pipeline
stage you're creating is inherently fittable or not:

1. If the stage is NOT inherently fittable, then the ability to accept
   fittable column qualifier objects makes it so. To enable extending
   subclasses to implement their transformation using a single method, they
   can simply implement the abstract method
   `_transformation(self, df, verbose, fit)`. It should treat the `df` and
   `verbose` parameters normally, but forward the `fit` parameter to the
   `_get_columns` method when calling it. This is enough to get a pipeline
   stage with the desired behavior, with the super-class handling all the
   fit/transform functionality.

2. If the stage IS inherently fittable, then do not use the
   `_transformation` abstract method (it has to be implemented, so just
   have it raise a NotImplementedError). Instead, simply override the
   `_fit_transform` and `_transform` methods of ColumnsBasedPipelineStage,
   calling the `_get_columns` method with the correct `fit` argument:
   `True` when fit-transforming and `False` when transforming. A sketch of
   the first pattern follows this list.
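
Completing the hypothetical ColCapitalize sketch from above, the first
pattern only requires implementing the `_transformation` method:

    def _transformation(self, df, verbose, fit):
        inter_df = df.copy()
        # forward the fit parameter to _get_columns
        for col in self._get_columns(df, fit=fit):
            inter_df[col] = inter_df[col].str.upper()
        return inter_df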

Again, taking a look at the VERY concise implementation of simple columns-based
stages, like ColDrop or ValDrop in `pdpipe.basic_stages`, will probably make
things clearer, and you can use those implementations as a template for yours.
"""

import sys
import abc
import time
import inspect
import collections
import textwrap

try:
    from pympler.asizeof import asizeof
except ImportError:
    from sys import getsizeof as asizeof

from .cq import is_fittable_column_qualifier, AllColumns
from .shared import _get_args_list
from .exceptions import (
    FailedPreconditionError,
    FailedPostconditionError,
    UnfittedPipelineStageError,
    PipelineApplicationError
)


# === loading stage attributes ===
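# The helpers below attach every concrete PdPipelineStage subclass found in a
# module as a method of PdPipelineStage itself. Calling such a method on a
# stage (or a pipeline) appends a new instance of the corresponding stage
# class to it, which is what enables the chaining syntax shown in the module
# docstring, e.g. pdp.ColDrop('Name').Bin({'Speed': [0, 5]}, drop=True).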

def __get_append_stage_attr_doc(class_obj):
    doc = class_obj.__doc__
    first_line = doc[0:doc.find('.') + 1]
    if "An" in first_line:
        new_first_line = first_line.replace("An", "Creates and adds an", 1)
    else:
        new_first_line = first_line.replace("A", "Creates and adds a", 1)
    new_first_line = new_first_line[0:-1] + (
        " to this pipeline stage.")
    return doc.replace(first_line, new_first_line, 1)


def __load_stage_attribute__(class_obj):

    def _append_stage_func(self, *args, **kwds):
        # self is always a PdPipelineStage
        return self + class_obj(*args, **kwds)
    _append_stage_func.__doc__ = __get_append_stage_attr_doc(class_obj)
    _append_stage_func.__name__ = class_obj.__name__  # .lower()
    _append_stage_func.__signature__ = inspect.signature(class_obj.__init__)
    setattr(PdPipelineStage, class_obj.__name__, _append_stage_func)

    # unbound_method = types.MethodType(_append_stage_func, class_obj)
    # setattr(class_obj, class_obj.__name__, unbound_method)


def __load_stage_attributes_from_module__(module_name):
    module_obj = sys.modules[module_name]
    for name, obj in inspect.getmembers(module_obj):
        if inspect.isclass(obj) and obj.__module__ == module_name:
            class_obj = getattr(module_obj, name)
            if issubclass(class_obj, PdPipelineStage) and (
                    class_obj.__name__ != 'PdPipelineStage'):
                __load_stage_attribute__(class_obj)


# === basic classes ===

class PdpApplicationContext(dict):
    """An object encapsulating the application context of a pipeline.

    It is meant to communicate data, information and variables between
    different stages of a pipeline.

    Parameters
    ----------
    fit_context : PdpApplicationContext, optional
        Another application context object, representing the application
        context of a previous fit of the pipeline this application context
        is initialized for. Optional.
    """

    def __init__(self, fit_context=None):
        self.__locked__ = False
        self._fit_context__ = fit_context

    def __setitem__(self, key, value):
        if not self.__locked__:
            super().__setitem__(key, value)

    def __delitem__(self, key):
        if not self.__locked__:
            super().__delitem__(key)

    def pop(self, key, default):
        """If key is in the dictionary, remove it and return its value, else
        return default. If default is not given and key is not in the
        dictionary, a KeyError is raised.
        """
        if not self.__locked__:
            return super().pop(key, default)
        return super().__getitem__(key)

    def clear(self):
        """Remove all items from the dictionary."""
        if not self.__locked__:
            super().clear()

    def popitem(self):
        """Not implemented!"""
        raise NotImplementedError

    def update(self, other):
        """Update the dictionary with the key/value pairs from other,
        overwriting existing keys. Return None.
        update() accepts either another dictionary object or an iterable of
        key/value pairs (as tuples or other iterables of length two). If
        keyword arguments are specified, the dictionary is then updated with
        those key/value pairs: d.update(red=1, blue=2).
        """
        if not self.__locked__:
            super().update(other)

    def lock(self):
        """Locks this application context for changes."""
        self.__locked__ = True

    def fit_context(self):
        """Returns a locked PdpApplicationContext object of a previous fit."""
        return self._fit_context__
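
# A minimal sketch of how two stages might communicate through the shared
# application context; the key name used here is hypothetical:
#
#     def _fit_transform(self, df, verbose):  # in an upstream stage
#         self.application_context['n_train_rows'] = len(df)
#         ...
#
#     def _transform(self, df, verbose):  # in a downstream stage
#         n_train_rows = self.application_context.get('n_train_rows', None)
#         ...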


class PdPipelineStage(abc.ABC):
    """A stage of a pandas DataFrame-processing pipeline.

    Parameters
    ----------
    exraise : bool, default True
        If true, a pdpipe.FailedPreconditionError is raised when this
        stage is applied to a dataframe for which the precondition does
        not hold. Otherwise the stage is skipped. Additionally, if true, a
        pdpipe.FailedPostconditionError is raised if an expected post-condition
        does not hold for an output dataframe (after pipeline application).
        Otherwise pipeline application continues uninterrupted.
    exmsg : str, default None
        The message of the exception that is raised on a failed
        precondition if exraise is set to True. A default message is used
        if None is given.
    desc : str, default None
        A short description of this stage, used as its string representation.
        A default description is used if None is given.
    prec : callable, default None
        This can be assigned a callable that returns boolean values for input
        dataframes, which will be used to determine whether input dataframes
        satisfy the preconditions for this pipeline stage (see the `exraise`
        parameter for the behaviour of failed preconditions). See `pdpipe.cond`
        for more information on specialised Condition objects.
    post : callable, default None
        This can be assigned a callable that returns boolean values for input
        dataframes, which will be used to determine whether input dataframes
        satisfy the postconditions for this pipeline stage (see the `exraise`
        parameter for the behaviour of failed postconditions). See
        `pdpipe.cond` for more information on specialised Condition objects.
    skip : callable, default None
        This can be assigned a callable that returns boolean values for input
        dataframes, which will be used to determine whether this stage should
        be skipped for input dataframes - if the callable returns True for an
        input dataframe, this stage will be skipped. See `pdpipe.cond` for more
        information on specialised Condition objects.
    name : str, default ''
        The name of this stage. Pipelines can be sliced by this name.

    Attributes
    ----------
    fit_context : `PdpApplicationContext`
        An application context object that is only re-initialized before
        `fit_transform` calls, and is locked after pipeline application. It is
        injected into the PipelineStage by the encapsulating pipeline object.
    application_context : `PdpApplicationContext`
        An application context object that is re-initialized before every
        pipeline application (so, also during transform operations of fitted
        pipelines), and is locked after pipeline application. It is injected
        into the PipelineStage by the encapsulating pipeline object.
    """

    _DEF_EXC_MSG = 'Precondition failed in stage {}!'
    _DEF_DESCRIPTION = 'A pipeline stage.'
    _INIT_KWARGS = ['exraise', 'exmsg', 'desc', 'prec', 'skip', 'name']

    def __init__(self, exraise=True, exmsg=None, desc=None, prec=None,
                 post=None, skip=None, name=''):
        if not isinstance(name, str):
            raise ValueError(
                f"'name' must be a str, not {type(name).__name__}."
            )
        if desc is None:
            desc = PdPipelineStage._DEF_DESCRIPTION
        if exmsg is None:
            exmsg = PdPipelineStage._DEF_EXC_MSG.format(desc)

        self._exraise = exraise
        self._exmsg = exmsg
        self._exmsg_post = exmsg.replace(
            'precondition', 'postcondition').replace(
            'Precondition', 'Postcondition')
        self._desc = desc
        self._prec_arg = prec
        self._post_arg = post
        self._skip = skip
        self._appmsg = f"{name + ': ' if name else ''}{desc}"
        self._name = name
        self.fit_context: PdpApplicationContext = None
        self.application_context: PdpApplicationContext = None
        self.is_fitted = False

    @classmethod
    def _init_kwargs(cls):
        return cls._INIT_KWARGS

    @abc.abstractmethod
    def _prec(self, df):  # pylint: disable=R0201,W0613
        """Returns True if this stage can be applied to the given dataframe."""
        raise NotImplementedError

    def _compound_prec(self, df):
        if self._prec_arg:
            return self._prec_arg(df)
        return self._prec(df)

    def _post(self, df):  # pylint: disable=R0201,W0613
        """Returns True if this stage resulted in an expected output frame."""
        return True

    def _compound_post(self, df):
        if self._post_arg:
            return self._post_arg(df)
        return self._post(df)

    def _fit_transform(self, df, verbose):
        """Fits this stage and transforms the input dataframe."""
        return self._transform(df, verbose)

    def _is_fittable(self):
        if self.__class__._fit_transform == PdPipelineStage._fit_transform:
            return False
        return True

    def _raise_precondition_error(self):
        try:
            raise FailedPreconditionError(
                f"{self._exmsg} [Reason] {self._prec_arg.error_message}")
        except AttributeError:
            raise FailedPreconditionError(self._exmsg)

    def _raise_postcondition_error(self):
        try:
            raise FailedPostconditionError(
                f"{self._exmsg_post} [Reason] {self._post_arg.error_message}")
        except AttributeError:
            raise FailedPostconditionError(self._exmsg_post)

    @abc.abstractmethod
    def _transform(self, df, verbose):
        """Transforms the given dataframe without fitting this stage."""
        raise NotImplementedError("_transform method not implemented!")

    def apply(self, df, exraise=None, verbose=False):
        """Applies this pipeline stage to the given dataframe.

        If the stage is not fitted fit_transform is called. Otherwise,
        transform is called.

        Parameters
        ----------
        df : pandas.DataFrame
            The dataframe to which this pipeline stage will be applied.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._skip and self._skip(df):
            return df
        if self._compound_prec(df=df):
            if verbose:
                msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            if self.is_fitted:
                res_df = self._transform(df, verbose=verbose)
            else:
                res_df = self._fit_transform(df, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return df

    __call__ = apply

    def fit_transform(self, X, y=None, exraise=None, verbose=False):
        """Fits this stage and transforms the given dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform and fit this pipeline stage by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            res_df = self._fit_transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return X

    def fit(self, X, y=None, exraise=None, verbose=False):
        """Fits this stage without transforming the given dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to be transformed.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The input dataframe, unchanged.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            res_df = self._fit_transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return X
        if exraise:
            self._raise_precondition_error()
        return X

    def transform(self, X, y=None, exraise=None, verbose=False):
        """Transforms the given dataframe without fitting this stage.

        If this stage is fittable but is not fitted, an
        UnfittedPipelineStageError is raised.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to be transformed.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            if self._is_fittable():
                if self.is_fitted:
                    res_df = self._transform(X, verbose=verbose)
                    if exraise and not self._compound_post(df=res_df):
                        self._raise_postcondition_error()
                    return res_df
                raise UnfittedPipelineStageError(
                    "transform of an unfitted pipeline stage was called!")
            res_df = self._transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return X

    def __add__(self, other):
        if isinstance(other, PdPipeline):
            return PdPipeline([self, *other._stages])
        if isinstance(other, PdPipelineStage):
            return PdPipeline([self, other])
        return NotImplemented

    def __str__(self):
        return f"PdPipelineStage: {self._desc}"

    def __repr__(self):
        return self.__str__()

    def description(self):
        """Returns the description of this pipeline stage"""
        return self._desc

    def _mem_str(self):
        total = asizeof(self)
        lines = []
        for a in dir(self):
            if not a.startswith('__'):
                att = getattr(self, a)
                if not callable(att):
                    size = asizeof(att)
                    if size > 500000:  # pragma: no cover
                        lines.append('  - {}, {:.2f}Mb ({:0>5.2f}%)\n'.format(
                            a, size / 1000000, 100 * size / total))
                    elif size > 1000:  # pragma: no cover
                        lines.append('  - {}, {:.2f}Kb ({:0>5.2f}%)\n'.format(
                            a, size / 1000, 100 * size / total))
                    else:
                        lines.append('  - {}, {}b ({:0>5.2f}%)\n'.format(
                            a, size, 100 * size / total))
        return ''.join(lines)
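
# A minimal sketch of a direct PdPipelineStage extension; the stage below is
# hypothetical, and stages operating on column subsets should usually extend
# ColumnsBasedPipelineStage (defined below) instead:
#
#     class DropFirstRow(PdPipelineStage):
#
#         def __init__(self, **kwargs):
#             kwargs.setdefault('desc', 'Drop the first row')
#             super().__init__(**kwargs)
#
#         def _prec(self, df):
#             return len(df) > 0
#
#         def _transform(self, df, verbose):
#             return df.iloc[1:]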


class ColumnsBasedPipelineStage(PdPipelineStage):
    """A pipeline stage that operates on a subset of dataframe columns.

    Parameters
    ----------
    columns : single label, iterable or callable
        The label, or an iterable of labels, of columns to use. Alternatively,
        this parameter can be assigned a callable returning an iterable of
        labels from an input pandas.DataFrame. See `pdpipe.cq`.
    exclude_columns : single label, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given the
        `columns` parameter. Alternatively, this parameter can be assigned a
        callable returning a labels iterable from an input pandas.DataFrame.
        See `pdpipe.cq`. Optional. By default no columns are excluded.
    desc_temp : str, optional
        If given, assumed to be a format string, and every appearance of {} in
        it is replaced with an appropriate string representation of the columns
        parameter, and is used as the pipeline description. Ignored if `desc`
        is provided.
    none_columns : iterable, callable or str, default 'error'
        Determines how None values supplied to the 'columns' parameter should
        be handled. If set to 'error', the default, a ValueError is raised if
        None is encountered. If set to 'all', it is interpreted to mean all
        columns of input dataframes should be operated on. If an iterable is
        provided it is interpreted as the default list of columns to operate on
        when `columns=None`. If a callable is provided, it is interpreted as
        the default column qualifier that determines input columns when
        `columns=None`.
    **kwargs
        Additionally supports all constructor parameters of PdPipelineStage.
    """

    @staticmethod
    def _interpret_columns_param(columns, none_error=False, none_columns=None):
        """Interprets the value provided to the columns parameter and returns
        a list version of it - if needed - a string representation of it.
        """
        if columns is None:
            if none_error:
                raise ValueError((
                    'None is not a valid argument for the columns parameter of'
                    ' this pipeline stage.'))
            return ColumnsBasedPipelineStage._interpret_columns_param(
                columns=none_columns)
        if isinstance(columns, str):
            # always check str first, because it has __iter__
            return [columns], columns
        if callable(columns):
            # if isinstance(columns, ColumnQualifier):
            #     return columns, columns.__repr__() or ''
            return columns, columns.__doc__ or ''
        # if it was a single string it was already made a list, and it's not a
        # callable, so it's either an iterable of labels... or
        if hasattr(columns, '__iter__'):
            return columns, ', '.join(str(elem) for elem in columns)
        # a single non-string label.
        return [columns], str(columns)

    def __init__(
            self, columns, exclude_columns=None, desc_temp=None,
            none_columns='error', **kwargs):
        self._exclude_columns = exclude_columns
        if exclude_columns:
            self._exclude_columns = self._interpret_columns_param(
                exclude_columns)
        self._none_error = False
        self._none_cols = None
        # handle none_columns
        if isinstance(none_columns, str):
            if none_columns == 'error':
                self._none_error = True
            elif none_columns == 'all':
                self._none_cols = AllColumns()
            else:
                raise ValueError((
                    "'error' and 'all' are the only valid string arguments"
                    " to the none_columns constructor parameter!"))
        elif hasattr(none_columns, '__iter__'):
            self._none_cols = none_columns
        elif callable(none_columns):
            self._none_cols = none_columns
        else:
            raise ValueError((
                "Valid arguments to the none_columns constructor parameter"
                " are 'error', 'all', an iterable of labels or a callable!"
            ))
        # done handling none_columns
        self._col_arg, self._col_str = self._interpret_columns_param(
            columns, self._none_error, none_columns=self._none_cols)
        if (kwargs.get('desc') is None) and desc_temp:
            kwargs['desc'] = desc_temp.format(self._col_str)
        if kwargs.get('exmsg') is None:
            kwargs['exmsg'] = (
                'Pipeline stage failed because not all columns {} '
                'were found in the input dataframe.'
            ).format(self._col_str)
        super().__init__(**kwargs)

    def _is_fittable(self):
        return is_fittable_column_qualifier(self._col_arg)

    @staticmethod
    def __get_cols_by_arg(col_arg, df, fit=False):
        try:
            if fit:
                # try to treat col_arg as a fittable column qualifier
                return col_arg.fit_transform(df)
            # else, no need to fit, so try to treat _col_arg as a callable
            return col_arg(df)
        except AttributeError:
            # got here cause col_arg has no fit_transform method...
            try:
                # so try and treat it as a callable again
                return col_arg(df)
            except TypeError:
                # calling col_arg 2 lines above failed; it's a list of labels
                return col_arg
        except TypeError:
            # calling _col_arg 10 lines above failed; it's a list of labels
            return col_arg

    def _get_columns(self, df, fit=False):
        cols = ColumnsBasedPipelineStage.__get_cols_by_arg(
            self._col_arg, df, fit=fit)
        if self._exclude_columns:
            exc_cols = ColumnsBasedPipelineStage.__get_cols_by_arg(
                self._exclude_columns, df, fit=fit)
            return [x for x in cols if x not in exc_cols]
        return cols

    def _prec(self, df):
        return set(self._get_columns(df=df)).issubset(df.columns)

    @abc.abstractmethod
    def _transformation(self, df, verbose, fit):
        raise NotImplementedError((
            "Classes extending ColumnsBasedPipelineStage must implement the "
            "_transformation method!"))

    def _fit_transform(self, df, verbose):
        self.is_fitted = True
        return self._transformation(df, verbose, fit=True)

    def _transform(self, df, verbose):
        return self._transformation(df, verbose, fit=False)


def _always_true(x):
    return True


class AdHocStage(PdPipelineStage):
    """An ad-hoc stage of a pandas DataFrame-processing pipeline.

    The signature for both the `transform` and the optional `fit_transform`
    callables is adaptive: The first argument is used positionally (so no
    specific name is assumed or used) to supply the callable with the pandas
    DataFrame object to transform. The following additional keyword arguments
    are supplied if they are included in the callable's signature:
    `verbose` - Passed on from PdPipelineStage's `fit`, `fit_transform`
    and `apply` methods.

    `fit_context` and `application_context` - Provides fit-specific and
    application-specific contexts (see `PdpApplicationContext`) usually
    available to pipeline stages using `self.fit_context` and
    `self.application_context`.

    Parameters
    ----------
    transform : callable
        The transformation this stage applies to dataframes. If the
        fit_transform parameter is also populated then this transformation is
        only applied on calls to transform. See documentation for the exact
        signature.
    fit_transform : callable, optional
        The transformation this stage applies to dataframes, only on
        fit_transform. Optional. See documentation for the exact signature.
    prec : callable, default None
        A callable that returns a boolean value, representing a precondition
        used to determine whether this stage can be applied to a given
        dataframe. If None is given, set to a function always returning True.

    Example
    -------
        >>> import pandas as pd; import pdpipe as pdp;
        >>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
        >>> drop_num = pdp.AdHocStage(
        ...   transform=lambda df: df.drop(['num'], axis=1),
        ...   prec=lambda df: 'num' in df.columns
        ... )
        >>> drop_num.apply(df)
          char
        1    a
        2    b
    """

    def __init__(self, transform, fit_transform=None, prec=None, **kwargs):
        if prec is None:
            prec = _always_true
        self._adhoc_transform = transform
        self._adhoc_fit_transform = fit_transform
        self._adhoc_prec = prec
        self._transform_kwargs = _get_args_list(self._adhoc_transform)
        try:
            self._fit_transform_kwargs = _get_args_list(
                self._adhoc_fit_transform)
        except TypeError:  # fit_transform is None
            self._fit_transform_kwargs = {}
        super().__init__(**kwargs)

    def _prec(self, df):
        return self._adhoc_prec(df)

    def _fit_transform(self, df, verbose):
        self.is_fitted = True
        if self._adhoc_fit_transform is None:
            self.is_fitted = True
            return self._transform(df, verbose)
        kwargs = {
            'verbose': verbose,
            'fit_context': self.fit_context,
            'application_context': self.application_context,
        }
        kwargs = {
            k: v for k, v in kwargs.items() if k in self._fit_transform_kwargs}
        return self._adhoc_fit_transform(df, **kwargs)

    def _transform(self, df, verbose):
        kwargs = {
            'verbose': verbose,
            'fit_context': self.fit_context,
            'application_context': self.application_context,
        }
        kwargs = {
            k: v for k, v in kwargs.items() if k in self._transform_kwargs}
        return self._adhoc_transform(df, **kwargs)
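
# A sketch of the adaptive signature in action: because the callable below
# declares a verbose parameter, it receives the verbose flag at application
# time; callables declaring fit_context or application_context parameters
# would receive those objects in the same way. The function and its
# behaviour are hypothetical:
#
#     def _drop_na_rows(df, verbose):
#         if verbose:
#             print(f'Dropping rows with missing values from {len(df)} rows.')
#         return df.dropna()
#
#     stage = AdHocStage(transform=_drop_na_rows)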


class PdPipeline(PdPipelineStage, collections.abc.Sequence):
    """A pipeline for processing pandas DataFrame objects.

    `transformer_getter` is useful to avoid applying pipeline stages that are
    aimed to filter out items in a big dataset to create a training set for a
    machine learning model, for example, but should not be applied on future
    individual items to be transformed by the fitted pipeline.

    Parameters
    ----------
    stages : list
        A list of PdPipelineStage objects making up this pipeline.
    transformer_getter : callable, optional
        A callable that can be applied to the fitted pipeline to produce a
        sub-pipeline of it which should be used to transform dataframes after
        the pipeline has been fitted. If not given, the fitted pipeline is used
        entirely.
    """

    _DEF_EXC_MSG = 'Pipeline precondition failed!'

    def __init__(self, stages, transformer_getter=None, **kwargs):
        self._stages = stages
        self._trans_getter = transformer_getter
        self.is_fitted = False
        super_kwargs = {
            'exraise': False,
            'exmsg': PdPipeline._DEF_EXC_MSG,
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    # implementing a collections.abc.Sequence abstract method
    def __getitem__(self, index):
        if isinstance(index, slice):
            return PdPipeline(self._stages[index])

        if isinstance(index, list) and all(isinstance(x, str) for x in index):
            stages = [stage for stage in self._stages if stage._name in index]
            return PdPipeline(stages)

        if isinstance(index, str):
            stages = [stage for stage in self._stages if stage._name == index]
            if len(stages) == 0:
                raise ValueError(f"'{index}' does not exist.")
            return stages[0]

        return self._stages[index]

    # implementing a collections.abc.Sequence abstract method
    def __len__(self):
        return len(self._stages)

    def _prec(self, df):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _post(self, df):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _transform(self, df, verbose):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _post_transform_lock(self):
        self.application_context.lock()
        self.fit_context.lock()

    def apply(self, df, exraise=None, verbose=False, time=False):
        """Applies this pipeline stage to the given dataframe.

        If the stage is not fitted fit_transform is called. Otherwise,
        transform is called.

        Parameters
        ----------
        df : pandas.DataFrame
            The dataframe to which this pipeline stage will be applied.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        self.application_context = PdpApplicationContext()
        if self.is_fitted:
            res = self.transform(
                X=df,
                exraise=exraise,
                verbose=verbose,
                time=time
            )
            self._post_transform_lock()
            return res
        self.fit_context = PdpApplicationContext()
        res = self.fit_transform(
            X=df,
            exraise=exraise,
            verbose=verbose,
            time=time
        )
        self._post_transform_lock()
        return res

    def __timed_fit_transform(self, X, y=None, exraise=None, verbose=None):
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        inter_x = X
        times = []
        prev = time.time()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.fit_transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
                now = time.time()
                times.append(now - prev)
                prev = now
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self.is_fitted = True
        print("\nPipeline total application time: {:.3f}s.\n Details:".format(
            sum(times)))
        print(self.__times_str__(times))
        self._post_transform_lock()
        return inter_x

    def fit_transform(self, X, y=None, exraise=None, verbose=None, time=False):
        """Fits this pipeline and transforms the input dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform and fit this pipeline by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if time:
            return self.__timed_fit_transform(
                X=X, y=y, exraise=exraise, verbose=verbose)
        inter_x = X
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.fit_transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self._post_transform_lock()
        self.is_fitted = True
        return inter_x

    def fit(self, X, y=None, exraise=None, verbose=None, time=None):
        """Fits this pipeline without transforming the input dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to fit this pipeline by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The input dataframe, unchanged.
        """
        self.fit_transform(
            X=X,
            y=None,
            exraise=exraise,
            verbose=verbose,
            time=time,
        )
        return X

    def __timed_transform(self, X, y=None, exraise=None, verbose=None):
        inter_x = X
        times = []
        prev = time.time()
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
                now = time.time()
                times.append(now - prev)
                prev = now
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self.is_fitted = True
        print("\nPipeline total application time: {:.3f}s.\n Details:".format(
            sum(times)))
        print(self.__times_str__(times))
        self._post_transform_lock()
        return inter_x

    def transform(self, X, y=None, exraise=None, verbose=None, time=False):
        """Transforms the given dataframe without fitting this pipeline.

        If any stage in this pipeline is fittable but is not fitted, an
        UnfittedPipelineStageError is raised before transformation starts.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        for stage in self._stages:
            if stage._is_fittable() and not stage.is_fitted:
                raise UnfittedPipelineStageError((
                    "PipelineStage {} in pipeline is fittable but"
                    " unfitted!").format(stage))
        if time:
            return self.__timed_transform(
                X=X, y=y, exraise=exraise, verbose=verbose)
        inter_df = X
        self.application_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.application_context = self.application_context
                inter_df = stage.transform(
                    X=inter_df,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self._post_transform_lock()
        return inter_df

    __call__ = apply

    def __add__(self, other):
        if isinstance(other, PdPipeline):
            return PdPipeline([*self._stages, *other._stages])
        if isinstance(other, PdPipelineStage):
            return PdPipeline([*self._stages, other])
        return NotImplemented

    def __times_str__(self, times):
        res = "A pdpipe pipeline:\n"
        stime = sum(times)
        if stime > 0:  # pragma: no cover
            percentages = [100 * x / stime for x in times]
        else:  # pragma: no cover
            percentages = [0 for x in times]
        res += '[ 0] [{:0>5.2f}s ({:0>5.2f}%)]  '.format(
            times[0], percentages[0]
        ) + "\n      ".join(
            textwrap.wrap(self._stages[0].description())
        ) + '\n'
        for i, stage in enumerate(self._stages[1:]):
            res += '[{:>2}] [{:0>5.2f}s ({:0>5.2f}%)]  '.format(
                i + 1, times[i + 1], percentages[i + 1]
            ) + "\n      ".join(
                textwrap.wrap(stage.description())
            ) + '\n'
        return res

    def __str__(self):
        res = "A pdpipe pipeline:\n"
        res += '[ 0]  ' + "\n      ".join(
            textwrap.wrap(self._stages[0].description())) + '\n'
        for i, stage in enumerate(self._stages[1:]):
            res += '[{:>2}]  '.format(i + 1) + "\n      ".join(
                textwrap.wrap(stage.description())) + '\n'
        return res

    def _mem_str(self, total):
        total = asizeof(self)
        lines = []
        for i, stage in enumerate(self._stages):
            size = asizeof(stage)
            if size > 500000:  # pragma: no cover
                lines.append('[{:>2}] {:.2f}Mb ({:0>5.2f}%), {}\n'.format(
                    i, size / 1000000, 100 * size / total,
                    stage.description()))
            elif size > 1000:  # pragma: no cover
                lines.append('[{:>2}] {:.2f}Kb ({:0>5.2f}%), {}\n'.format(
                    i, size / 1000, 100 * size / total, stage.description()))
            else:
                lines.append('[{:>2}] {:}b ({:0>5.2f}%), {}\n'.format(
                    i, size, 100 * size / total, stage.description()))
            lines.append(stage._mem_str())
        return ''.join(lines)

    def memory_report(self):
        """Prints a detailed memory report of the pipeline object to screen.

        To get better memory estimates make sure the pympler Python package is
        installed. Without it, sys.getsizeof is used, which can severely
        underestimate the memory size of Python objects.
        """
        print("=== Pipeline memory report ===")
        size = asizeof(self)
        if size > 500000:  # pragma: no cover
            print("Total pipeline size in memory: {:.2f}Mb".format(
                size / 1000000))
        elif size > 1000:  # pragma: no cover
            print("Total pipeline size in memory: {:.2f}Kb".format(
                size / 1000))
        else:
            print("Total pipeline size in memory: {:.2f}b".format(
                size))
        print("Per-stage memory structure:")
        print(self._mem_str(total=size))

    def get_transformer(self):
        """Return the transformer induced by this fitted pipeline.

           This transformer is a `pdpipe` pipeline that transforms input data
           in a way corresponding to this pipeline after it has been fitted. By
           default this is the pipeline itself, but the `transformer_getter`
           constructor parameter can be used to return a sub-pipeline of the
           fitted pipeline instead, for cases where some stages should only be
           applied when fitting this pipeline to data.

        Returns
        -------
        pdpipe.PdPipeline
            The corresponding transformer pipeline induced by this pipeline.
        """
        try:
            return self._trans_getter(self)
        except TypeError:  # pragma: no cover
            return self

    # def drop(self, index):
    #     """Returns this pipeline with the stage of the given index removed.
    #     Arguments
    #     ---------
    #     index


def make_pdpipeline(*stages):
    """Constructs a PdPipeline from the given pipeline stages.

    Parameters
    ----------
    *stages : pdpipe.PipelineStage objects
       PdPipeline stages given as positional arguments.

    Returns
    -------
    p : pdpipe.PdPipeline
        The resulting pipeline.

    Examples
    --------
        >>> import pdpipe as pdp
        >>> p = make_pdpipeline(pdp.ColDrop('count'), pdp.DropDuplicates())
    """
    return PdPipeline(stages=stages)
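
# A usage sketch tying things together; the column name, stage name and the
# use of a transformer_getter below are all hypothetical:
#
#     >>> import pdpipe as pdp
#     >>> pipeline = pdp.PdPipeline(
#     ...     stages=[
#     ...         pdp.ColDrop('Name'),
#     ...         pdp.DropDuplicates(name='dedup'),
#     ...     ],
#     ...     transformer_getter=lambda p: p[['dedup']],
#     ... )
#     >>> res = pipeline.fit_transform(train_df)
#     >>> transformer = pipeline.get_transformer()  # sub-pipeline: 'dedup' only
#     >>> out = transformer(new_df)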

Functions

def make_pdpipeline(*stages)

Constructs a PdPipeline from the given pipeline stages.

Parameters

*stages : pdpipe.PipelineStage objects
 

PdPipeline stages given as positional arguments.

Returns

p : pdpipe.PdPipeline
The resulting pipeline.

Examples

>>> import pdpipe as pdp
>>> p = make_pdpipeline(pdp.ColDrop('count'), pdp.DropDuplicates())
Expand source code
def make_pdpipeline(*stages):
    """Constructs a PdPipeline from the given pipeline stages.

    Parameters
    ----------
    *stages : pdpipe.PipelineStage objects
       PdPipeline stages given as positional arguments.

    Returns
    -------
    p : pdpipe.PdPipeline
        The resulting pipeline.

    Examples
    --------
        >>> import pdpipe as pdp
        >>> p = make_pdpipeline(pdp.ColDrop('count'), pdp.DropDuplicates())
    """
    return PdPipeline(stages=stages)

Classes

class AdHocStage (transform, fit_transform=None, prec=None, **kwargs)

An ad-hoc stage of a pandas DataFrame-processing pipeline.

The signature for both the transform and the optional fit_transform callables is adaptive: The first argument is used positionally (so no specific name is assumed or used) to supply the callable with the pandas DataFrame object to transform. The following additional keyword arguments are supplied if the are included in the callable's signature: verbose - Passed on from PdPipelineStage's fit, fit_transform and apply methods.

fit_context and application_context - Provides fit-specific and application-specific contexts (see PdpApplicationContext) usually available to pipeline stages using self.fit_context and self.application_context.

Parameters

transform : callable
The transformation this stage applies to dataframes. If the fit_transform parameter is also populated than this transformation is only applied on calls to transform. See documentation for the exact signature.
fit_transform : callable, optional
The transformation this stage applies to dataframes, only on fit_transform. Optional. See documentation for the exact signature.
prec : callable, default None
A callable that returns a boolean value. Represent a a precondition used to determine whether this stage can be applied to a given dataframe. If None is given, set to a function always returning True.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> drop_num = pdp.AdHocStage(
...   transform=lambda df: df.drop(['num'], axis=1),
...   prec=lambda df: 'num' in df.columns
... )
>>> drop_num.apply(df)
  char
1    a
2    b
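
Continuing the example above, a transform callable can opt in to any of the extra keyword arguments simply by naming them in its signature. A minimal sketch (the note_verbose helper name is hypothetical):

>>> def note_verbose(df, verbose):  # hypothetical helper name
...     # `verbose` is supplied here only because it appears in the signature
...     return df.drop(['num'], axis=1)
>>> pdp.AdHocStage(transform=note_verbose).apply(df)
  char
1    a
2    b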
Source code:
class AdHocStage(PdPipelineStage):
    """An ad-hoc stage of a pandas DataFrame-processing pipeline.

    The signature for both the `transform` and the optional `fit_transform`
    callables is adaptive: The first argument is used positionally (so no
    specific name is assumed or used) to supply the callable with the pandas
    DataFrame object to transform. The following additional keyword arguments
    are supplied if they are included in the callable's signature:
    `verbose` - Passed on from PdPipelineStage's `fit`, `fit_transform`
    and `apply` methods.

    `fit_context` and `application_context` - Provide fit-specific and
    application-specific contexts (see `PdpApplicationContext`) usually
    available to pipeline stages using `self.fit_context` and
    `self.application_context`.

    Parameters
    ----------
    transform : callable
        The transformation this stage applies to dataframes. If the
        fit_transform parameter is also populated, then this transformation
        is only applied on calls to transform. See documentation for the
        exact signature.
    fit_transform : callable, optional
        The transformation this stage applies to dataframes, only on
        fit_transform. Optional. See documentation for the exact signature.
    prec : callable, default None
        A callable that returns a boolean value. Represents a precondition
        used to determine whether this stage can be applied to a given
        dataframe. If None is given, set to a function always returning True.

    Example
    -------
        >>> import pandas as pd; import pdpipe as pdp;
        >>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
        >>> drop_num = pdp.AdHocStage(
        ...   transform=lambda df: df.drop(['num'], axis=1),
        ...   prec=lambda df: 'num' in df.columns
        ... )
        >>> drop_num.apply(df)
          char
        1    a
        2    b
    """

    def __init__(self, transform, fit_transform=None, prec=None, **kwargs):
        if prec is None:
            prec = _always_true
        self._adhoc_transform = transform
        self._adhoc_fit_transform = fit_transform
        self._adhoc_prec = prec
        self._transform_kwargs = _get_args_list(self._adhoc_transform)
        try:
            self._fit_transform_kwargs = _get_args_list(
                self._adhoc_fit_transform)
        except TypeError:  # fit_transform is None
            self._fit_transform_kwargs = {}
        super().__init__(**kwargs)

    def _prec(self, df):
        return self._adhoc_prec(df)

    def _fit_transform(self, df, verbose):
        self.is_fitted = True
        if self._adhoc_fit_transform is None:
            return self._transform(df, verbose)
        kwargs = {
            'verbose': verbose,
            'fit_context': self.fit_context,
            'application_context': self.application_context,
        }
        kwargs = {
            k: v for k, v in kwargs.items() if k in self._fit_transform_kwargs}
        return self._adhoc_fit_transform(df, **kwargs)

    def _transform(self, df, verbose):
        kwargs = {
            'verbose': verbose,
            'fit_context': self.fit_context,
            'application_context': self.application_context,
        }
        kwargs = {
            k: v for k, v in kwargs.items() if k in self._transform_kwargs}
        return self._adhoc_transform(df, **kwargs)

Ancestors

  • PdPipelineStage
  • abc.ABC

class ColumnsBasedPipelineStage (columns, exclude_columns=None, desc_temp=None, none_columns='error', **kwargs)

A pipeline stage that operates on a subset of dataframe columns.

Parameters

columns : single label, iterable or callable
The label, or an iterable of labels, of columns to use. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.
desc_temp : str, optional
If given, assumed to be a format string, and every appearance of {} in it is replaced with an appropriate string representation of the columns parameter, and the result is used as this stage's description. Ignored if desc is provided.
none_columns : iterable, callable or str, default 'error'
Determines how None values supplied to the 'columns' parameter should be handled. If set to 'error', the default, a ValueError is raised if None is encountered. If set to 'all', it is interpreted to mean all columns of input dataframes should be operated on. If an iterable is provided it is interpreted as the default list of columns to operate on when columns=None. If a callable is provided, it is interpreted as the default column qualifier that determines input columns when columns=None.
**kwargs
Additionally supports all constructor parameters of PdPipelineStage.
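
For illustration, here is a minimal sketch of an extending subclass (the DropThese name is hypothetical, and the class is assumed importable from pdpipe.core), forwarding the columns parameter to the base constructor and implementing the abstract _transformation method, with fit forwarded to _get_columns so fittable column qualifiers are respected:

>>> import pandas as pd
>>> from pdpipe.core import ColumnsBasedPipelineStage
>>> class DropThese(ColumnsBasedPipelineStage):  # hypothetical subclass
...     def __init__(self, columns, **kwargs):
...         super().__init__(
...             columns=columns, desc_temp='Drop columns {}', **kwargs)
...     def _transformation(self, df, verbose, fit):
...         # forward `fit` so fittable column qualifiers are handled
...         return df.drop(self._get_columns(df, fit=fit), axis=1)
>>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> DropThese(columns='num').apply(df)
  char
1    a
2    b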
Source code:
class ColumnsBasedPipelineStage(PdPipelineStage):
    """A pipeline stage that operates on a subset of dataframe columns.

    Parameters
    ----------
    columns : single label, iterable or callable
        The label, or an iterable of labels, of columns to use. Alternatively,
        this parameter can be assigned a callable returning an iterable of
        labels from an input pandas.DataFrame. See `pdpipe.cq`.
    exclude_columns : single label, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given the
        `columns` parameter. Alternatively, this parameter can be assigned a
        callable returning a labels iterable from an input pandas.DataFrame.
        See `pdpipe.cq`. Optional. By default no columns are excluded.
    desc_temp : str, optional
        If given, assumed to be a format string, and every appearance of {} in
        it is replaced with an appropriate string representation of the columns
        parameter, and the result is used as this stage's description. Ignored
        if `desc` is provided.
    none_columns : iterable, callable or str, default 'error'
        Determines how None values supplied to the 'columns' parameter should
        be handled. If set to 'error', the default, a ValueError is raised if
        None is encountered. If set to 'all', it is interpreted to mean all
        columns of input dataframes should be operated on. If an iterable is
        provided it is interpreted as the default list of columns to operate on
        when `columns=None`. If a callable is provided, it is interpreted as
        the default column qualifier that determines input columns when
        `columns=None`.
    **kwargs
        Additionally supports all constructor parameters of PdPipelineStage.
    """

    @staticmethod
    def _interpret_columns_param(columns, none_error=False, none_columns=None):
        """Interprets the value provided to the columns parameter and returns
        a list version of it - if needed - a string representation of it.
        """
        if columns is None:
            if none_error:
                raise ValueError((
                    'None is not a valid argument for the columns parameter of'
                    ' this pipeline stage.'))
            return ColumnsBasedPipelineStage._interpret_columns_param(
                columns=none_columns)
        if isinstance(columns, str):
            # always check str first, because it has __iter__
            return [columns], columns
        if callable(columns):
            # if isinstance(columns, ColumnQualifier):
            #     return columns, columns.__repr__() or ''
            return columns, columns.__doc__ or ''
        # if it was a single string it was already made a list, and it's not a
        # callable, so it's either an iterable of labels... or
        if hasattr(columns, '__iter__'):
            return columns, ', '.join(str(elem) for elem in columns)
        # a single non-string label.
        return [columns], str(columns)

    def __init__(
            self, columns, exclude_columns=None, desc_temp=None,
            none_columns='error', **kwargs):
        self._exclude_columns = exclude_columns
        if exclude_columns:
            # keep only the labels/callable part of the returned 2-tuple
            self._exclude_columns = self._interpret_columns_param(
                exclude_columns)[0]
        self._none_error = False
        self._none_cols = None
        # handle none_columns
        if isinstance(none_columns, str):
            if none_columns == 'error':
                self._none_error = True
            elif none_columns == 'all':
                self._none_cols = AllColumns()
            else:
                raise ValueError((
                    "'error' and 'all' are the only valid string arguments"
                    " to the none_columns constructor parameter!"))
        elif hasattr(none_columns, '__iter__'):
            self._none_cols = none_columns
        elif callable(none_columns):
            self._none_cols = none_columns
        else:
            raise ValueError((
                "Valid arguments to the none_columns constructor parameter"
                " are 'error', 'all', an iterable of labels or a callable!"
            ))
        # done handling none_columns
        self._col_arg, self._col_str = self._interpret_columns_param(
            columns, self._none_error, none_columns=self._none_cols)
        if (kwargs.get('desc') is None) and desc_temp:
            kwargs['desc'] = desc_temp.format(self._col_str)
        if kwargs.get('exmsg') is None:
            kwargs['exmsg'] = (
                'Pipeline stage failed because not all columns {} '
                'were found in the input dataframe.'
            ).format(self._col_str)
        super().__init__(**kwargs)

    def _is_fittable(self):
        return is_fittable_column_qualifier(self._col_arg)

    @staticmethod
    def __get_cols_by_arg(col_arg, df, fit=False):
        try:
            if fit:
                # try to treat col_arg as a fittable column qualifier
                return col_arg.fit_transform(df)
            # else, no need to fit, so try to treat _col_arg as a callable
            return col_arg(df)
        except AttributeError:
            # got here because col_arg has no fit_transform method...
            try:
                # ...so try to treat it as a callable again
                return col_arg(df)
            except TypeError:
                # calling col_arg failed, too; it's a list of labels
                return col_arg
        except TypeError:
            # calling col_arg(df) above failed; it's a list of labels
            return col_arg

    def _get_columns(self, df, fit=False):
        cols = ColumnsBasedPipelineStage.__get_cols_by_arg(
            self._col_arg, df, fit=fit)
        if self._exclude_columns:
            exc_cols = ColumnsBasedPipelineStage.__get_cols_by_arg(
                self._exclude_columns, df, fit=fit)
            return [x for x in cols if x not in exc_cols]
        return cols

    def _prec(self, df):
        return set(self._get_columns(df=df)).issubset(df.columns)

    @abc.abstractmethod
    def _transformation(self, df, verbose, fit):
        raise NotImplementedError((
            "Classes extending ColumnsBasedPipelineStage must implement the "
            "_transformation method!"))

    def _fit_transform(self, df, verbose):
        self.is_fitted = True
        return self._transformation(df, verbose, fit=True)

    def _transform(self, df, verbose):
        return self._transformation(df, verbose, fit=False)

Ancestors

  • PdPipelineStage
  • abc.ABC

class PdPipeline (stages, transformer_getter=None, **kwargs)

A pipeline for processing pandas DataFrame objects.

transformer_getter is useful to avoid applying pipeline stages that are meant to filter out items of a big dataset (to create a training set for a machine learning model, for example) but that should not be applied to future individual items transformed by the fitted pipeline.

Parameters

stages : list
A list of PdPipelineStage objects making up this pipeline.
transformer_getter : callable, optional
A callable that can be applied to the fitted pipeline to produce a sub-pipeline of it which should be used to transform dataframes after the pipeline has been fitted. If not given, the fitted pipeline is used entirely.
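
A minimal sketch of this mechanism, assuming PdPipeline is exposed as pdp.PdPipeline and that the first stage should only run when fitting; the getter returns the sub-pipeline holding all stages but the first:

>>> import pdpipe as pdp
>>> pipeline = pdp.PdPipeline(
...     stages=[pdp.DropDuplicates(), pdp.ColDrop('count')],
...     transformer_getter=lambda p: p[1:],
... )

Once fitted, pipeline.get_transformer() returns the sub-pipeline holding only the ColDrop stage, so duplicates are dropped at fit time but not on later transforms.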
Source code:
class PdPipeline(PdPipelineStage, collections.abc.Sequence):
    """A pipeline for processing pandas DataFrame objects.

    `transformer_getter` is useful to avoid applying pipeline stages that are
    meant to filter out items of a big dataset (to create a training set for
    a machine learning model, for example) but that should not be applied to
    future individual items transformed by the fitted pipeline.

    Parameters
    ----------
    stages : list
        A list of PdPipelineStage objects making up this pipeline.
    transformer_getter : callable, optional
        A callable that can be applied to the fitted pipeline to produce a
        sub-pipeline of it which should be used to transform dataframes after
        the pipeline has been fitted. If not given, the fitted pipeline is used
        entirely.
    """

    _DEF_EXC_MSG = 'Pipeline precondition failed!'

    def __init__(self, stages, transformer_getter=None, **kwargs):
        self._stages = stages
        self._trans_getter = transformer_getter
        self.is_fitted = False
        super_kwargs = {
            'exraise': False,
            'exmsg': PdPipeline._DEF_EXC_MSG,
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    # implementing a collections.abc.Sequence abstract method
    def __getitem__(self, index):
        if isinstance(index, slice):
            return PdPipeline(self._stages[index])

        if isinstance(index, list) and all(isinstance(x, str) for x in index):
            stages = [stage for stage in self._stages if stage._name in index]
            return PdPipeline(stages)

        if isinstance(index, str):
            stages = [stage for stage in self._stages if stage._name == index]
            if len(stages) == 0:
                raise ValueError(f"'{index}' is not exist.")
            return stages[0]

        return self._stages[index]

    # implementing a collections.abc.Sequence abstract method
    def __len__(self):
        return len(self._stages)

    def _prec(self, df):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _post(self, df):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _transform(self, df, verbose):
        # PdPipeline overrides apply in a way which makes this moot
        raise NotImplementedError

    def _post_transform_lock(self):
        self.application_context.lock()
        self.fit_context.lock()

    def apply(self, df, exraise=None, verbose=False, time=False):
        """Applies this pipeline stage to the given dataframe.

        If the stage is not fitted, fit_transform is called. Otherwise,
        transform is called.

        Parameters
        ----------
        df : pandas.DataFrame
            The dataframe to which this pipeline stage will be applied.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        self.application_context = PdpApplicationContext()
        if self.is_fitted:
            res = self.transform(
                X=df,
                exraise=exraise,
                verbose=verbose,
                time=time
            )
            self._post_transform_lock()
            return res
        self.fit_context = PdpApplicationContext()
        res = self.fit_transform(
            X=df,
            exraise=exraise,
            verbose=verbose,
            time=time
        )
        self._post_transform_lock()
        return res

    def __timed_fit_transform(self, X, y=None, exraise=None, verbose=None):
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        inter_x = X
        times = []
        prev = time.time()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.fit_transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
                now = time.time()
                times.append(now - prev)
                prev = now
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self.is_fitted = True
        print("\nPipeline total application time: {:.3f}s.\n Details:".format(
            sum(times)))
        print(self.__times_str__(times))
        self._post_transform_lock()
        return inter_x

    def fit_transform(self, X, y=None, exraise=None, verbose=None, time=False):
        """Fits this pipeline and transforms the input dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform and fit this pipeline by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if time:
            return self.__timed_fit_transform(
                X=X, y=y, exraise=exraise, verbose=verbose)
        inter_x = X
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.fit_transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self._post_transform_lock()
        self.is_fitted = True
        return inter_x

    def fit(self, X, y=None, exraise=None, verbose=None, time=None):
        """Fits this pipeline without transforming the input dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to fit this pipeline by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The input dataframe, unchanged.
        """
        self.fit_transform(
            X=X,
            y=None,
            exraise=exraise,
            verbose=verbose,
            time=time,
        )
        return X

    def __timed_transform(self, X, y=None, exraise=None, verbose=None):
        inter_x = X
        times = []
        prev = time.time()
        self.application_context = PdpApplicationContext()
        self.fit_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.fit_context = self.fit_context
                stage.application_context = self.application_context
                inter_x = stage.transform(
                    X=inter_x,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
                now = time.time()
                times.append(now - prev)
                prev = now
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self.is_fitted = True
        print("\nPipeline total application time: {:.3f}s.\n Details:".format(
            sum(times)))
        print(self.__times_str__(times))
        self._post_transform_lock()
        return inter_x

    def transform(self, X, y=None, exraise=None, verbose=None, time=False):
        """Transforms the given dataframe without fitting this pipeline.

        If any stage in this pipeline is fittable but is not fitted, an
        UnfittedPipelineStageError is raised before transformation starts.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Determines behaviour if the precondition of composing stages is not
            fulfilled by the input dataframe: If True, a
            pdpipe.FailedPreconditionError is raised. If False, the stage is
            skipped. If not given, or set to None, the default behaviour of
            each stage is used, as determined by its 'exraise' constructor
            parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            of each stage is checked but before its application. Otherwise, no
            messages are printed.
        time : bool, default False
            If True, per-stage application time is measured and reported when
            pipeline application is done.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        for stage in self._stages:
            if stage._is_fittable() and not stage.is_fitted:
                raise UnfittedPipelineStageError((
                    "PipelineStage {} in pipeline is fittable but"
                    " unfitted!").format(stage))
        if time:
            return self.__timed_transform(
                X=X, y=y, exraise=exraise, verbose=verbose)
        inter_df = X
        self.application_context = PdpApplicationContext()
        for i, stage in enumerate(self._stages):
            try:
                stage.application_context = self.application_context
                inter_df = stage.transform(
                    X=inter_df,
                    y=None,
                    exraise=exraise,
                    verbose=verbose,
                )
            except Exception as e:
                raise PipelineApplicationError(
                    f"Exception raised in stage [ {i}] {stage}"
                ) from e
        self._post_transform_lock()
        return inter_df

    __call__ = apply

    def __add__(self, other):
        if isinstance(other, PdPipeline):
            return PdPipeline([*self._stages, *other._stages])
        if isinstance(other, PdPipelineStage):
            return PdPipeline([*self._stages, other])
        return NotImplemented

    def __times_str__(self, times):
        res = "A pdpipe pipeline:\n"
        stime = sum(times)
        if stime > 0:  # pragma: no cover
            percentages = [100 * x / stime for x in times]
        else:  # pragma: no cover
            percentages = [0 for x in times]
        res += '[ 0] [{:0>5.2f}s ({:0>5.2f}%)]  '.format(
            times[0], percentages[0]
        ) + "\n      ".join(
            textwrap.wrap(self._stages[0].description())
        ) + '\n'
        for i, stage in enumerate(self._stages[1:]):
            res += '[{:>2}] [{:0>5.2f}s ({:0>5.2f}%)]  '.format(
                i + 1, times[i + 1], percentages[i + 1]
            ) + "\n      ".join(
                textwrap.wrap(stage.description())
            ) + '\n'
        return res

    def __str__(self):
        res = "A pdpipe pipeline:\n"
        res += '[ 0]  ' + "\n      ".join(
            textwrap.wrap(self._stages[0].description())) + '\n'
        for i, stage in enumerate(self._stages[1:]):
            res += '[{:>2}]  '.format(i + 1) + "\n      ".join(
                textwrap.wrap(stage.description())) + '\n'
        return res

    def _mem_str(self, total):
        total = asizeof(self)
        lines = []
        for i, stage in enumerate(self._stages):
            size = asizeof(stage)
            if size > 500000:  # pragma: no cover
                lines.append('[{:>2}] {:.2f}Mb ({:0>5.2f}%), {}\n'.format(
                    i, size / 1000000, 100 * size / total,
                    stage.description()))
            elif size > 1000:  # pragma: no cover
                lines.append('[{:>2}] {:.2f}Kb ({:0>5.2f}%), {}\n'.format(
                    i, size / 1000, 100 * size / total, stage.description()))
            else:
                lines.append('[{:>2}] {:}b ({:0>5.2f}%), {}\n'.format(
                    i, size, 100 * size / total, stage.description()))
            lines.append(stage._mem_str())
        return ''.join(lines)

    def memory_report(self):
        """Prints a detailed memory report of the pipeline object to screen.

        To get better memory estimates make sure the pympler Python package is
        installed. Without it, sys.getsizeof is used, which can severely
        underestimate the memory size of Python objects.
        """
        print("=== Pipeline memory report ===")
        size = asizeof(self)
        if size > 500000:  # pragma: no cover
            print("Total pipeline size in memory: {:.2f}Mb".format(
                size / 1000000))
        elif size > 1000:  # pragma: no cover
            print("Total pipeline size in memory: {:.2f}Kb".format(
                size / 1000))
        else:
            print("Total pipeline size in memory: {:.2f}b".format(
                size))
        print("Per-stage memory structure:")
        print(self._mem_str(total=size))

    def get_transformer(self):
        """Return the transformer induced by this fitted pipeline.

           This transformer is a `pdpipe` pipeline that transforms input data
           in a way corresponding to this pipeline after it has been fitted. By
           default this is the pipeline itself, but the `transformer_getter`
           constructor parameter can be used to return a sub-pipeline of the
           fitted pipeline instead, for cases where some stages should only be
           applied when fitting this pipeline to data.

        Returns
        -------
        pdpipe.PdPipeline
            The corresponding transformer pipeline induced by this pipeline.
        """
        try:
            return self._trans_getter(self)
        except TypeError:  # pragma: no cover
            return self

Ancestors

  • PdPipelineStage
  • abc.ABC
  • collections.abc.Sequence
  • collections.abc.Reversible
  • collections.abc.Collection
  • collections.abc.Sized
  • collections.abc.Iterable
  • collections.abc.Container

Methods

def apply(self, df, exraise=None, verbose=False, time=False)

Applies this pipeline stage to the given dataframe.

If the stage is not fitted, fit_transform is called. Otherwise, transform is called.

Parameters

df : pandas.DataFrame
The dataframe to which this pipeline stage will be applied.
exraise : bool, default None
Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter.
verbose : bool, default False
If True an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.
time : bool, default False
If True, per-stage application time is measured and reported when pipeline application is done.

Returns

pandas.DataFrame
The resulting dataframe.
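
A minimal usage sketch:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> pipeline = pdp.PdPipeline([pdp.ColDrop('num')])
>>> pipeline.apply(df)  # not yet fitted, so fit_transform is called
  char
1    a
2    b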
def fit(self, X, y=None, exraise=None, verbose=None, time=None)

Fits this pipeline without transforming the input dataframe.

Parameters

X : pandas.DataFrame
The dataframe to fit this pipeline by.
y : array-like, optional
Targets for supervised learning.
exraise : bool, default None
Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter.
verbose : bool, default False
If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed.
time : bool, default False
If True, per-stage application time is measured and reported when pipeline application is done.

Returns

pandas.DataFrame
The input dataframe, unchanged.
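
A minimal sketch of the fit-then-transform workflow; note that fit returns the input dataframe unchanged:

>>> import pandas as pd; import pdpipe as pdp;
>>> train = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> pipeline = pdp.PdPipeline([pdp.ColDrop('num')])
>>> pipeline.fit(train)
   num char
1    1    a
2    2    b
>>> pipeline.transform(train)
  char
1    a
2    b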
def fit_transform(self, X, y=None, exraise=None, verbose=None, time=False)

Fits this pipeline and transforms the input dataframe.

Parameters

X : pandas.DataFrame
The dataframe to transform and fit this pipeline by.
y : array-like, optional
Targets for supervised learning.
exraise : bool, default None
Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter.
verbose : bool, default False
If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed.
time : bool, default False
If True, per-stage application time is measured and reported when pipeline application is done.

Returns

pandas.DataFrame
The resulting dataframe.
def get_transformer(self)

Return the transformer induced by this fitted pipeline.

This transformer is a pdpipe pipeline that transforms input data in a way corresponding to this pipeline after it has been fitted. By default this is the pipeline itself, but the transformer_getter constructor parameter can be used to return a sub-pipeline of the fitted pipeline instead, for cases where some stages should only be applied when fitting this pipeline to data.

Returns

pdpipe.PdPipeline
The corresponding transformer pipeline induced by this pipeline.
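
A minimal sketch: when no transformer_getter was given at construction, the pipeline itself is returned:

>>> import pdpipe as pdp
>>> pipeline = pdp.PdPipeline([pdp.ColDrop('num')])
>>> pipeline.get_transformer() is pipeline
True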
def memory_report(self)

Prints a detailed memory report of the pipeline object to screen.

To get better memory estimates make sure the pympler Python package is installed. Without it, sys.getsizeof is used, which can severely underestimate the memory size of Python objects.
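
A minimal usage sketch; the report is printed to standard output, and the exact figures depend on the environment:

>>> import pdpipe as pdp
>>> pipeline = pdp.PdPipeline([pdp.ColDrop('num')])
>>> pipeline.memory_report()  # doctest: +SKIP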

def transform(self, X, y=None, exraise=None, verbose=None, time=False)

Transforms the given dataframe without fitting this pipeline.

If any stage in this pipeline is fittable but is not fitted, an UnfittedPipelineStageError is raised before transformation starts.

Parameters

X : pandas.DataFrame
The dataframe to transform.
y : array-like, optional
Targets for supervised learning.
exraise : bool, default None
Determines behaviour if the precondition of composing stages is not fulfilled by the input dataframe: If True, a pdpipe.FailedPreconditionError is raised. If False, the stage is skipped. If not given, or set to None, the default behaviour of each stage is used, as determined by its 'exraise' constructor parameter.
verbose : bool, default False
If True an explanation message is printed after the precondition of each stage is checked but before its application. Otherwise, no messages are printed.
time : bool, default False
If True, per-stage application time is measured and reported when pipeline application is done.

Returns

pandas.DataFrame
The resulting dataframe.


class PdPipelineStage (exraise=True, exmsg=None, desc=None, prec=None, post=None, skip=None, name='')

A stage of a pandas DataFrame-processing pipeline.

Parameters

exraise : bool, default True
If true, a pdpipe.FailedPreconditionError is raised when this stage is applied to a dataframe for which the precondition does not hold. Otherwise the stage is skipped. Additionally, if true, a pdpipe.FailedPostconditionError is raised if an expected post-condition does not hold for an output dataframe (after pipeline application). Otherwise pipeline application continues uninterrupted.
exmsg : str, default None
The message of the exception that is raised on a failed precondition if exraise is set to True. A default message is used if None is given.
desc : str, default None
A short description of this stage, used as its string representation. A default description is used if None is given.
prec : callable, default None
This can be assigned a callable that returns boolean values for input dataframes, which will be used to determine whether input dataframes satisfy the preconditions for this pipeline stage (see the exraise parameter for the behaviour of failed preconditions). See pdpipe.cond for more information on specialised Condition objects.
post : callable, default None
This can be assigned a callable that returns boolean values for input dataframes, which will be used to determine whether input dataframes satisfy the postconditions for this pipeline stage (see the exraise parameter for the behaviour of failed postconditions). See pdpipe.cond for more information on specialised Condition objects.
skip : callable, default None
This can be assigned a callable that returns boolean values for input dataframes, which will be used to determine whether this stage should be skipped for input dataframes - if the callable returns True for an input dataframe, this stage will be skipped. See pdpipe.cond for more information on specialised Condition objects.
name : str, default ''
The name of this stage. Pipelines can be sliced by this name.

Attributes

fit_context : PdpApplicationContext
An application context object that is only re-initialized before fit_transform calls, and is locked after pipeline application. It is injected into the PipelineStage by the encapsulating pipeline object.
application_context : PdpApplicationContext
An application context object that is re-initialized before every pipeline application (so, also during transform operations of fitted pipelines), and is locked after pipeline application. It is injected into the PipelineStage by the encapsulating pipeline object.
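
These constructor parameters are shared by every stage. A minimal sketch, using AdHocStage to demonstrate the skip and name parameters (here, the stage is skipped for empty dataframes):

>>> import pandas as pd; import pdpipe as pdp;
>>> stage = pdp.AdHocStage(
...     transform=lambda df: df.dropna(),
...     skip=lambda df: len(df) == 0,  # skip the stage for empty frames
...     name='dropna',
... )
>>> stage.apply(pd.DataFrame({'num': [1.0, None]}))
   num
0  1.0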
Source code:
class PdPipelineStage(abc.ABC):
    """A stage of a pandas DataFrame-processing pipeline.

    Parameters
    ----------
    exraise : bool, default True
        If true, a pdpipe.FailedPreconditionError is raised when this
        stage is applied to a dataframe for which the precondition does
        not hold. Otherwise the stage is skipped. Additionally, if true, a
        pdpipe.FailedPostconditionError is raised if an expected post-condition
        does not hold for an output dataframe (after pipeline application).
        Otherwise pipeline application continues uninterrupted.
    exmsg : str, default None
        The message of the exception that is raised on a failed
        precondition if exraise is set to True. A default message is used
        if None is given.
    desc : str, default None
        A short description of this stage, used as its string representation.
        A default description is used if None is given.
    prec : callable, default None
        This can be assigned a callable that returns boolean values for input
        dataframes, which will be used to determine whether input dataframes
        satisfy the preconditions for this pipeline stage (see the `exraise`
        parameter for the behaviour of failed preconditions). See `pdpipe.cond`
        for more information on specialised Condition objects.
    post : callable, default None
        This can be assigned a callable that returns boolean values for input
        dataframes, which will be used to determine whether input dataframes
        satisfy the postconditions for this pipeline stage (see the `exraise`
        parameter for the behaviour of failed postconditions). See
        `pdpipe.cond` for more information on specialised Condition objects.
    skip : callable, default None
        This can be assigned a callable that returns boolean values for input
        dataframes, which will be used to determine whether this stage should
        be skipped for input dataframes - if the callable returns True for an
        input dataframe, this stage will be skipped. See `pdpipe.cond` for more
        information on specialised Condition objects.
    name : str, default ''
        The name of this stage. Pipelines can be sliced by this name.

    Attributes
    ----------
    fit_context : `PdpApplicationContext`
        An application context object that is only re-initialized before
        `fit_transform` calls, and is locked after pipeline application. It is
        injected into the PipelineStage by the encapsulating pipeline object.
    application_context : `PdpApplicationContext`
        An application context object that is re-initialized before every
        pipeline application (so, also during transform operations of fitted
        pipelines), and is locked after pipeline application. It is injected
        into the PipelineStage by the encapsulating pipeline object.
    """

    _DEF_EXC_MSG = 'Precondition failed in stage {}!'
    _DEF_DESCRIPTION = 'A pipeline stage.'
    _INIT_KWARGS = ['exraise', 'exmsg', 'desc', 'prec', 'skip', 'name']

    def __init__(self, exraise=True, exmsg=None, desc=None, prec=None,
                 post=None, skip=None, name=''):
        if not isinstance(name, str):
            raise ValueError(
                f"'name' must be a str, not {type(name).__name__}."
            )
        if desc is None:
            desc = PdPipelineStage._DEF_DESCRIPTION
        if exmsg is None:
            exmsg = PdPipelineStage._DEF_EXC_MSG.format(desc)

        self._exraise = exraise
        self._exmsg = exmsg
        self._exmsg_post = exmsg.replace(
            'precondition', 'postcondition').replace(
            'Precondition', 'Postcondition')
        self._desc = desc
        self._prec_arg = prec
        self._post_arg = post
        self._skip = skip
        self._appmsg = f"{name + ': ' if name else ''}{desc}"
        self._name = name
        self.fit_context: PdpApplicationContext = None
        self.application_context: PdpApplicationContext = None
        self.is_fitted = False

    @classmethod
    def _init_kwargs(cls):
        return cls._INIT_KWARGS

    @abc.abstractmethod
    def _prec(self, df):  # pylint: disable=R0201,W0613
        """Returns True if this stage can be applied to the given dataframe."""
        raise NotImplementedError

    def _compound_prec(self, df):
        if self._prec_arg:
            return self._prec_arg(df)
        return self._prec(df)

    def _post(self, df):  # pylint: disable=R0201,W0613
        """Returns True if this stage resulted in an expected output frame."""
        return True

    def _compound_post(self, df):
        if self._post_arg:
            return self._post_arg(df)
        return self._post(df)

    def _fit_transform(self, df, verbose):
        """Fits this stage and transforms the input dataframe."""
        return self._transform(df, verbose)

    def _is_fittable(self):
        if self.__class__._fit_transform == PdPipelineStage._fit_transform:
            return False
        return True

    def _raise_precondition_error(self):
        try:
            raise FailedPreconditionError(
                f"{self._exmsg} [Reason] {self._prec_arg.error_message}")
        except AttributeError:
            raise FailedPreconditionError(self._exmsg)

    def _raise_postcondition_error(self):
        try:
            raise FailedPostconditionError(
                f"{self._exmsg_post} [Reason] {self._post_arg.error_message}")
        except AttributeError:
            raise FailedPostconditionError(self._exmsg_post)

    @abc.abstractmethod
    def _transform(self, df, verbose):
        """Transforms the given dataframe without fitting this stage."""
        raise NotImplementedError("_transform method not implemented!")

    def apply(self, df, exraise=None, verbose=False):
        """Applies this pipeline stage to the given dataframe.

        If the stage is not fitted, fit_transform is called. Otherwise,
        transform is called.

        Parameters
        ----------
        df : pandas.DataFrame
            The dataframe to which this pipeline stage will be applied.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._skip and self._skip(df):
            return df
        if self._compound_prec(df=df):
            if verbose:
                msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            if self.is_fitted:
                res_df = self._transform(df, verbose=verbose)
            else:
                res_df = self._fit_transform(df, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return df

    __call__ = apply

    def fit_transform(self, X, y=None, exraise=None, verbose=False):
        """Fits this stage and transforms the given dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to transform and fit this pipeline stage by.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            res_df = self._fit_transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return X

    def fit(self, X, y=None, exraise=None, verbose=False):
        """Fits this stage without transforming the given dataframe.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to be transformed.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            res_df = self._fit_transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return X
        if exraise:
            self._raise_precondition_error()
        return X

    def transform(self, X, y=None, exraise=None, verbose=False):
        """Transforms the given dataframe without fitting this stage.

        If this stage is fittable but is not fitted, an
        UnfittedPipelineStageError is raised.

        Parameters
        ----------
        X : pandas.DataFrame
            The dataframe to be transformed.
        y : array-like, optional
            Targets for supervised learning.
        exraise : bool, default None
            Override preconditions and postconditions behaviour for this call.
            If None, the default behaviour of this stage is used, as determined
            by the exraise constructor parameter.
        verbose : bool, default False
            If True an explanation message is printed after the precondition
            is checked but before the application of the pipeline stage.
            Defaults to False.

        Returns
        -------
        pandas.DataFrame
            The resulting dataframe.
        """
        if exraise is None:
            exraise = self._exraise
        if self._compound_prec(X):
            if verbose:
                msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
                print(msg, flush=True)
            if self._is_fittable():
                if self.is_fitted:
                    res_df = self._transform(X, verbose=verbose)
                    if exraise and not self._compound_post(df=res_df):
                        self._raise_postcondition_error()
                    return res_df
                raise UnfittedPipelineStageError(
                    "transform of an unfitted pipeline stage was called!")
            res_df = self._transform(X, verbose=verbose)
            if exraise and not self._compound_post(df=res_df):
                self._raise_postcondition_error()
            return res_df
        if exraise:
            self._raise_precondition_error()
        return X

    def __add__(self, other):
        if isinstance(other, PdPipeline):
            return PdPipeline([self, *other._stages])
        if isinstance(other, PdPipelineStage):
            return PdPipeline([self, other])
        return NotImplemented

    def __str__(self):
        return f"PdPipelineStage: {self._desc}"

    def __repr__(self):
        return self.__str__()

    def description(self):
        """Returns the description of this pipeline stage"""
        return self._desc

    def _mem_str(self):
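        # Builds a rough, human-readable memory profile of this stage:
        # every non-callable, non-dunder attribute is listed with its deep
        # size (as reported by asizeof) and its share of the total size.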
        total = asizeof(self)
        lines = []
        for a in dir(self):
            if not a.startswith('__'):
                att = getattr(self, a)
                if not callable(att):
                    size = asizeof(att)
                    if size > 500000:  # pragma: no cover
                        lines.append('  - {}, {:.2f}Mb ({:0>5.2f}%)\n'.format(
                            a, size / 1000000, 100 * size / total))
                    elif size > 1000:  # pragma: no cover
                        lines.append('  - {}, {:.2f}Kb ({:0>5.2f}%)\n'.format(
                            a, size / 1000, 100 * size / total))
                    else:
                        lines.append('  - {}, {}b ({:0>5.2f}%)\n'.format(
                            a, size, 100 * size / total))
        return ''.join(lines)

Ancestors

  • abc.ABC

Methods

def AdHocStage(self, transform, fit_transform=None, prec=None, **kwargs)

Creates and adds an ad-hoc stage of a pandas DataFrame-processing pipeline to this pipeline stage.

The signature for both the transform and the optional fit_transform callables is adaptive: The first argument is used positionally (so no specific name is assumed or used) to supply the callable with the pandas DataFrame object to transform. The following additional keyword arguments are supplied if they are included in the callable's signature: verbose - Passed on from PdPipelineStage's fit, fit_transform and apply methods.

fit_context and application_context - Provides fit-specific and application-specific contexts (see PdpApplicationContext) usually available to pipeline stages using self.fit_context and self.application_context.

Parameters

transform : callable
The transformation this stage applies to dataframes. If the fit_transform parameter is also populated, then this transformation is only applied on calls to transform. See documentation for the exact signature.
fit_transform : callable, optional
The transformation this stage applies to dataframes, only on fit_transform. Optional. See documentation for the exact signature.
prec : callable, default None
A callable that returns a boolean value. Represents a precondition used to determine whether this stage can be applied to a given dataframe. If None is given, set to a function always returning True.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> drop_num = pdp.AdHocStage(
...   transform=lambda df: df.drop(['num'], axis=1),
...   prec=lambda df: 'num' in df.columns
... )
>>> drop_num.apply(df)
  char
1    a
2    b
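
Because the signature is adaptive, the callables may also opt into any of the keyword arguments listed above. A minimal sketch (the callable and the printed message are illustrative only, not part of the library):

def verbose_drop(df, verbose):
    # 'verbose' is injected only because it appears in the signature
    if verbose:
        print('Dropping the num column...')
    return df.drop(['num'], axis=1)

drop_num = pdp.AdHocStage(
    transform=verbose_drop,
    prec=lambda df: 'num' in df.columns,
)
res = drop_num.apply(df, verbose=True)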
Each of the stage-creation methods in this section shares a single dynamically-bound implementation, which simply appends the newly created stage to this pipeline stage (class_obj being the stage class the method was generated for):

def _append_stage_func(self, *args, **kwds):
    # self is always a PdPipelineStage
    return self + class_obj(*args, **kwds)
def AggByCols(self, columns, func, result_columns=None, drop=True, func_desc=None, suffix=None, **kwargs)

Creates and adds a pipeline stage applying a series-wise function to columns to this pipeline stage. For applying element-wise function, see ApplyByCols.

Parameters

columns : single label, list-like or callable
Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
func : function
The function to be applied to each of the given columns. Must work when given a pandas.Series object, and return either a scalar or a pandas.Series. If a scalar is returned, the result is broadcast into a column of the original length.
result_columns : str or list-like, default None
The names of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the name of the source column is used; otherwise, the name of the source column is used with the given suffix.
drop : bool, default True
If set to True, source columns are dropped after being mapped.
func_desc : str, default None
A function description of the given function; e.g. 'normalizing revenue by company size'. A default description is used if None is given.
suffix : str, optional
The suffix to add to resulting columns when result_columns is None and drop is set to False. If not given, defaults to '_agg'.

Example

>>> import pandas as pd; import pdpipe as pdp; import numpy as np;
>>> data = [[3.2, "acd"], [7.2, "alk"], [12.1, "alk"]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","lbl"])
>>> log_ph = pdp.AggByCols("ph", np.log)
>>> log_ph(df)
         ph  lbl
1  1.163151  acd
2  1.974081  alk
3  2.493205  alk

>>> min_ph = pdp.AggByCols("ph", min, drop=False, suffix='_min')
>>> min_ph(df)
     ph  ph_min  lbl
1   3.2     3.2  acd
2   7.2     3.2  alk
3  12.1     3.2  alk
def ApplyByCols(self, columns, func, result_columns=None, drop=True, func_desc=None, suffix=None, **kwargs)

Creates and adds a pipeline stage applying an element-wise function to columns to this pipeline stage. For applying series-wise function, see AggByCols.

Parameters

columns : single label, list-like or callable
Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
func : function
The function to be applied to each element of the given columns.
result_columns : str or list-like, default None
The names of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the name of the source column is used; otherwise, the name of the source column is used with the suffix '_app'.
drop : bool, default True
If set to True, source columns are dropped after being mapped.
func_desc : str, default None
A function description of the given function; e.g. 'normalizing revenue by company size'. Optional.
suffix : str, default None
If provided, this string is concated to resulting column labels instead of '_app'.

Example

>>> import pandas as pd; import pdpipe as pdp; import math;
>>> data = [[3.2, "acd"], [7.2, "alk"], [12.1, "alk"]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","lbl"])
>>> round_ph = pdp.ApplyByCols("ph", math.ceil)
>>> round_ph(df)
   ph  lbl
1   4  acd
2   8  alk
3  13  alk
def ApplyToRows(self, func, colname=None, follow_column=None, func_desc=None, prec=None, **kwargs)

Creates and adds a pipeline stage generating columns by applying a function to each row to this pipeline stage.

Parameters

func : function
The function to be applied to each row of the processed DataFrame.
colname : single label, default None
The label of the new column resulting from the function application. If None, 'new_col' is used. Ignored if a DataFrame is generated by the function (i.e. each row generates a Series rather than a value), in which case the label of each column in the resulting DataFrame is used.
follow_column : str, default None
Resulting columns will be inserted after this column. If None, new columns are inserted at the end of the processed DataFrame.
func_desc : str, default None
A function description of the given function; e.g. 'normalizing revenue by company size'. A default description is used if None is given.
prec : function, default None
A function taking a DataFrame, returning True if this stage is applicable to the given DataFrame. If None is given, a function always returning True is used.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3, 2143], [10, 1321], [7, 1255]]
>>> df = pd.DataFrame(data, [1,2,3], ['years', 'avg_revenue'])
>>> total_rev = lambda row: row['years'] * row['avg_revenue']
>>> add_total_rev = pdp.ApplyToRows(total_rev, 'total_revenue')
>>> add_total_rev(df)
   years  avg_revenue  total_revenue
1      3         2143           6429
2     10         1321          13210
3      7         1255           8785

>>> def halfer(row):
...     new = {'year/2': row['years']/2, 'rev/2': row['avg_revenue']/2}
...     return pd.Series(new)
>>> half_cols = pdp.ApplyToRows(halfer, follow_column='years')
>>> half_cols(df)
   years   rev/2  year/2  avg_revenue
1      3  1071.5     1.5         2143
2     10   660.5     5.0         1321
3      7   627.5     3.5         1255
def Bin(self, bin_map, drop=True, **kwargs)

Creates and adds a pipeline stage that adds a binned version of a column or columns to this pipeline stage.

If drop is set to True, the new columns retain the names of the source columns; otherwise, the resulting columns gain the suffix '_bin'.

Parameters

bin_map : dict
Maps column labels to bin arrays. The bin array is interpreted as containing start points of consecutive bins, except for the final point, assumed to be the end point of the last bin. Additionally, a bin array implicitly projects a left-most bin containing all elements smaller than the left-most end point and a right-most bin containing all elements larger than the right-most end point. For example, the list [0, 5, 8] is interpreted as the bins (-∞, 0), [0-5), [5-8) and [8, ∞).
drop : bool, default True
If set to True, the source columns are dropped after being binned.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[-3],[4],[5],[9]], [1,2,3,4], ['speed'])
>>> pdp.Bin({'speed': [5]}, drop=False).apply(df)
   speed speed_bin
1     -3        <5
2      4        <5
3      5        5≤
4      9        5≤
>>> pdp.Bin({'speed': [0,5,8]}, drop=False).apply(df)
   speed speed_bin
1     -3        <0
2      4       0-5
3      5       5-8
4      9        8≤
def ColByFrameFunc(self, column, func, follow_column=None, before_column=None, func_desc=None, **kwargs)

Creates and adds a pipeline stage adding a column by applying a dataframe-wide function to this pipeline stage.

Note that assigning column with the label of an existing column and providing the same label to the before_column parameter will result in replacing the original column at the same location.

Parameters

column : str
The label of the resulting column. If it is the label of an existing column, it will replace that column.
func : function
The function to be applied to the input dataframe. The function should return a pandas.Series object.
follow_column : str, default None
Resulting columns will be inserted after this column. If both this parameter and before_column are None, new columns are inserted at the end of the processed DataFrame.
before_column : str, default None
Resulting columns will be inserted before this column. If both this parameter and follow_column are None, new columns are inserted at the end of the processed DataFrame. If both are provided, before_column takes precedence.
func_desc : str, default None
A function description of the given function; e.g. 'normalizing revenue by company size'. A default description is used if None is given.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3, 3], [2, 4], [1, 5]]
>>> df = pd.DataFrame(data, [1,2,3], ["A","B"])
>>> func = lambda df: df['A'] == df['B']
>>> add_equal = pdp.ColByFrameFunc("A==B", func)
>>> add_equal(df)
   A  B   A==B
1  3  3   True
2  2  4  False
3  1  5  False
def ColDrop(self, columns: Union[object, List[object], Callable], errors: Optional[str] = None, **kwargs: object)

Creates and adds a pipeline stage that drops columns by name to this pipeline stage.

Parameters

columns : single label, list-like or callable
The label, or an iterable of labels, of columns to drop. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame (see pdpipe.cq).
errors : {'ignore', 'raise'}, default 'raise'
If 'ignore', suppress errors and only existing labels are dropped.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColDrop('num').apply(df)
  char
1    a
2    b
def ColRename(self, rename_mapper: Union[Dict, Callable], **kwargs)

Creates and adds a pipeline stage that renames a column or columns to this pipeline stage.

Parameters

rename_mapper : dict-like or callable
Maps old column names to new ones.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColRename({'num': 'len', 'char': 'initial'}).apply(df)
   len initial
1    8       a
2    5       b

>>> def renamer(lbl: str):
...    if lbl.startswith('n'):
...       return 'foo'
...    return lbl
>>> pdp.ColRename(renamer).apply(df)
   foo char
1    8    a
2    5    b
def ColReorder(self, positions, **kwargs)

Creates and adds a pipeline stage that reorders columns to this pipeline stage.

Parameters

positions : dict
A mapping of column names to their desired positions after reordering. Columns not included in the mapping will maintain their relative order among the non-mapped columns.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,4,3,7]], columns=['a', 'b', 'c', 'd'])
>>> pdp.ColReorder({'b': 0, 'c': 3}).apply(df)
   b  a  d  c
0  4  8  7  3
def ColumnDtypeEnforcer(self, column_to_dtype: Dict, errors: Optional[str] = 'raise', **kwargs: object)

Creates and adds a pipeline stage enforcing column dtypes to this pipeline stage.

Parameters

column_to_dtype : dict of labels / ColumnQualifiers to dtypes
Use {col: dtype, …}, where col is a column label and dtype is a numpy.dtype or Python type to cast one or more of the DataFrame’s columns to column-specific types. Alternatively, you can provide ColumnQualifier objects as keys. If at least one such key is present, the lbl-to-dtype dict is dynamically inferred each time the pipeline stage is applied (note that ColumnQualifier objects are fittable by default, so to have column labels re-inferred after the first stage application you'll have to set fittable=False for the ColumnQualifier you use, see pdpipe.cq).
errors : {'raise', 'ignore'}, default 'raise'
Control raising of exceptions on invalid data for the provided dtype. If 'raise', exceptions are allowed to be raised; if 'ignore', exceptions are suppressed and the original object is returned on error.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'initial'])
>>> pdp.ColumnDtypeEnforcer({'num': float}).apply(df)
   num initial
1  8.0       a
2  5.0       b

>>> pdp.ColumnDtypeEnforcer({pdp.cq.StartWith('n'): float}).apply(df)
   num initial
1  8.0       a
2  5.0       b
def ColumnTransformer(self, columns, result_columns=None, drop=True, suffix=None, **kwargs)

Creates and adds a pipeline stage that applies transformation to dataframe columns to this pipeline stage.

Parameters

columns : single label, list-like or callable
Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If None is provided all input columns are transformed.
result_columns : single label or list-like, default None
Labels for the new columns resulting from the transformations. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, then the label of the source column is used; otherwise, the provided 'suffix' is concatenated to the label of the source column.
drop : bool, default True
If set to True, source columns are dropped after being transformed.
suffix : str, default '_transformed'
The suffix transformed columns gain if no new column labels are given.
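
For illustration, a minimal sketch of an extending subclass. Note that the single-column hook name used below, _col_transform(series, label), is an assumption inferred from the built-in transformer stages; verify it against the source before relying on it:

from pdpipe.core import ColumnTransformer

class NegateColumns(ColumnTransformer):
    """Hypothetical stage negating the values of selected columns."""

    def _col_transform(self, series, label):
        # Receives a single source column (and its label) and returns the
        # transformed column; result_columns, drop and suffix handling is
        # left to the ColumnTransformer base class.
        return -series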
def ColumnsBasedPipelineStage(self, columns, exclude_columns=None, desc_temp=None, none_columns='error', **kwargs)

Creates and adds a pipeline stage that operates on a subset of dataframe columns to this pipeline stage.

Parameters

columns : single label, iterable or callable
The label, or an iterable of labels, of columns to use. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.
desc_temp : str, optional
If given, assumed to be a format string, and every appearance of {} in it is replaced with an appropriate string representation of the columns parameter, and the result is used as the stage description. Ignored if desc is provided.
none_columns : iterable, callable or str, default 'error'
Determines how None values supplied to the 'columns' parameter should be handled. If set to 'error', the default, a ValueError is raised if None is encountered. If set to 'all', it is interpreted to mean all columns of input dataframes should be operated on. If an iterable is provided it is interpreted as the default list of columns to operate on when columns=None. If a callable is provided, it is interpreted as the default column qualifier that determines input columns when columns=None.
**kwargs
Additionally supports all constructor parameters of PdPipelineStage.
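
A minimal sketch of an extending subclass, tying the constructor parameters above together (SetToZero is a hypothetical stage used purely for illustration):

from pdpipe.core import ColumnsBasedPipelineStage

class SetToZero(ColumnsBasedPipelineStage):
    """Hypothetical stage setting all selected columns to zero."""

    def __init__(self, columns, **kwargs):
        kwargs['desc_temp'] = 'Set columns {} to zero'
        kwargs['none_columns'] = 'all'  # columns=None means all columns
        super().__init__(columns=columns, **kwargs)

    def _transformation(self, df, verbose, fit):
        # Forward `fit` so fittable column qualifiers are fitted on
        # fit_transform and reused, fixed, on subsequent transform calls.
        columns = self._get_columns(df, fit=fit)
        res = df.copy()
        res[columns] = 0
        return res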
def ConditionValidator(self, conditions: Union[Callable, List[Callable]], reducer: Optional[Callable] = all, errors: Optional[str] = 'raise', **kwargs: object)

Creates and adds a pipeline stage that validates boolean conditions on dataframes to this pipeline stage.

The stage does not change the input dataframe in any way.

The constructor expects either a single callable or a list-like of callable objects, and checks that all these callables return True - meaning all defined conditions hold - for input dataframes.

Naturally, pdpipe Condition objects from the pdpipe.cond module can be used.

Parameters

conditions : callable or list-like of callable
The conditions to check for input dataframes. Naturally, pdpipe Condition objects from the pdpipe.cond module can be used.
reducer : callable, optional
The callable that reduces the list of boolean results to a single result. By default the built-in all function is used, so all conditions must hold for this pipeline stage to validate an input dataframe. The built-in any function may be used to validate that at least one condition holds, and of course custom reducing functions can be used.
errors : str, default 'raise'
If set to 'raise', the default, then if the reduced boolean result is False, a FailedConditionError is raised on stage application. If set to 'ignore', then conditions are checked, the results are printed if the application was called with verbose=True, and pipeline application continues. Any other value is interpreted as 'raise'.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.ConditionValidator(lambda df: len(df.columns) == 5).apply(df)
Traceback (most recent call last):
   ...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!

>>> pdp.ConditionValidator(pdp.cond.HasNoMissingValues()).apply(df)
Traceback (most recent call last):
   ...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
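
When all given conditions hold, the input dataframe is returned unchanged:

>>> pdp.ConditionValidator(lambda df: len(df.columns) == 2).apply(df)
   a     b
1  1   4.0
2  4   NaN
3  1  11.0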
def DropDuplicates(self, columns=None, **kwargs)

Creates and adds a pipeline stage that drops duplicate rows by the given columns to this pipeline stage.

Parameters

columns : column label or sequence of labels, optional
The labels of the columns to consider for duplication drop. If not populated, all columns are considered when identifying duplicates.
exclude_columns : object, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.

Examples

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8, 1],[8, 2], [9, 2]], [1,2,3], ['a', 'b'])
>>> pdp.DropDuplicates('a').apply(df)
   a  b
1  8  1
3  9  2
def DropNa(self, **kwargs)

Creates and adds a pipeline stage that drops null values to this pipeline stage.

Supports all parameters supported by the pandas.DataFrame.dropna method.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.DropNa().apply(df)
   a     b
1  1   4.0
3  1  11.0
def DropRareTokens(self, columns, threshold, drop=True, **kwargs)

Creates and adds a pipeline stage that drops rare tokens from token lists to this pipeline stage.

Target columns must be series of token lists; i.e. every cell in the series is an iterable of string tokens.

Note: The nltk package must be installed for this pipeline stage to work.

Parameters

columns : single label, list-like or callable
Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
threshold : int
The rarity threshold to use. Only tokens appearing more than this number of times in a column will remain in token lists in that column.
drop : bool, default True
If set to True, the source columns are dropped after being transformed, and the resulting columns retain the names of the source columns. Otherwise, the new columns gain the suffix '_norare'.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[7, ['a', 'a', 'b']], [3, ['b', 'c', 'd']]]
>>> df = pd.DataFrame(data, columns=['num', 'chars'])
>>> rare_dropper = pdp.DropRareTokens('chars', 1)
>>> rare_dropper(df)
   num      chars
0    7  [a, a, b]
1    3        [b]
def DropTokensByLength(self, columns, min_len, max_len=None, result_columns=None, drop=True, **kwargs)

Creates and adds a pipeline stage removing tokens by length in string-token list columns to this pipeline stage.

Parameters

columns : single label, list-like or callable
Names of token list columns on which to apply token filtering. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
min_len : int
The minimum length of tokens to keep. Tokens of shorter length are removed from all token lists.
max_len : int, default None
The maximum length of tokens to keep. If provided, tokens of longer length are removed from all token lists.
result_columns : str or list-like, default None
The names of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the name of the source column is used; otherwise, the name of the source column is used with the suffix '_filtered'.
drop : bool, default True
If set to True, source columns are dropped after being transformed.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[4, ["a", "bad", "nice"]], [5, ["good", "university"]]]
>>> df = pd.DataFrame(data, [1,2], ["age","text"])
>>> filter_tokens = pdp.DropTokensByLength('text', 3, 5)
>>> filter_tokens(df)
   age         text
1    4  [bad, nice]
2    5       [good]
def DropTokensByList(self, columns, bad_tokens, result_columns=None, drop=True, **kwargs)

Creates and adds a pipeline stage removing specific tokens in string-token list columns to this pipeline stage.

Parameters

columns : single label, list-like or callable
Names of token list columns on which to apply token filtering. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
bad_tokens : list of str
The list of string tokens to remove from all token lists.
result_columns : str or list-like, default None
The names of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the name of the source column is used; otherwise, the name of the source column is used with the suffix '_filtered'.
drop : bool, default True
If set to True, source columns are dropped after being transformed.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[4, ["a", "bad", "cat"]], [5, ["bad", "not", "good"]]]
>>> df = pd.DataFrame(data, [1,2], ["age","text"])
>>> filter_tokens = pdp.DropTokensByList('text', ['bad'])
>>> filter_tokens(df)
   age         text
1    4     [a, cat]
2    5  [not, good]
def Encode(self, columns=None, exclude_columns=None, drop=True, **kwargs)

Creates and adds a pipeline stage that encodes categorical columns to integer values to this pipeline stage.

The encoder for each column is saved in the attribute 'encoders', which is a dict mapping each encoded column name to the sklearn.preprocessing.LabelEncoder object used to encode it.

Parameters

columns : single label, list-like or callable, default None
Column labels in the DataFrame to be encoded. If columns is None then all the columns with object or category dtype will be converted, except those given in the exclude_columns parameter. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, list-like or callable, default None
Label or labels of columns to be excluded from encoding. If None then no column is excluded. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
If set to True, the source columns are dropped after being encoded, and the resulting encoded columns retain the names of the source columns. Otherwise, encoded columns gain the suffix '_enc'.

Attributes

encoders : dict
A dictionary mapping each encoded column name to the corresponding sklearn.preprocessing.LabelEncoder object. An empty dict if the stage is not fitted.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, "acd"], [7.2, "alk"], [12.1, "alk"]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","lbl"])
>>> encode_stage = pdp.Encode("lbl")
>>> encode_stage(df)
     ph  lbl
1   3.2    0
2   7.2    1
3  12.1    1
>>> encode_stage.encoders["lbl"].inverse_transform([0,1,1])
array(['acd', 'alk', 'alk'], dtype=object)
def FitOnly(self, stage, **kwargs)

Creates and adds a wrapper that applies a stage to input data only when fitting to this pipeline stage.

In other words, the input data is not transformed if the stage has already been fitted once.

Parameters

stage : PdPipelineStage
The pipeline stage to operate on input data only when fitting.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> stage = pdp.FitOnly(pdp.ColDrop('num'))
>>> stage(df)
  char
1    a
2    b
>>> df2 = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> stage(df2)
   num char
1    8    a
2    5    b
def FreqDrop(self, threshold: int, column: str, **kwargs)

Creates and adds a pipeline stage that drops rows by value frequency to this pipeline stage.

Parameters

threshold : int
The minimum frequency required for a value to be kept.
column : str
The name of the column to check for the given value frequency.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[1,11]], [1,2,3], ['a','b'])
>>> pdp.FreqDrop(2, 'a').apply(df)
   a   b
1  1   4
3  1  11
def Log(self, columns=None, exclude_columns=None, drop=False, non_neg=False, const_shift=None, **kwargs)

Creates and adds a pipeline stage that log-transforms numeric data to this pipeline stage.

Parameters

columns : single label, list-like or callable, default None
Column names in the DataFrame to be transformed. If columns is None then all the columns with a numeric dtype will be transformed, except those given in the exclude_columns parameter. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, list-like or callable, default None
Label or labels of columns to be excluded from the transformation. If None then no column is excluded. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. Optional.
drop : bool, default False
If set to True, the source columns are dropped after being transformed, and the resulting transformed columns retain the names of the source columns. Otherwise, transformed columns gain the suffix '_log'.
non_neg : bool, default False
If True, each transformed column is first shifted by the smallest negative value it includes (non-negative columns are thus not shifted).
const_shift : int, optional
If given, each transformed column is first shifted by this constant. If non_neg is True then that transformation is applied first, and only then is the column shifted by this constant.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, "acd"], [7.2, "alk"], [12.1, "alk"]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","lbl"])
>>> log_stage = pdp.Log("ph", drop=True)
>>> log_stage(df)
         ph  lbl
1  1.163151  acd
2  1.974081  alk
3  2.493205  alk
def MapColVals(self, columns: Union[object, List[object], Callable], value_map: Union[dict, pandas.core.series.Series, Callable, str, Tuple[str, dict]], result_columns: Union[object, List[object], None] = None, drop: Optional[bool] = True, suffix: Optional[str] = None, **kwargs: Dict[str, object])

Creates and adds a pipeline stage that replaces the values of a column by a map to this pipeline stage.

Parameters

columns : single label, list-like or callable
Column labels in the DataFrame to be mapped. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If None is provided all input columns are mapped.
value_map : dict, pandas.Series, callable, str or tuple
The value-to-value map to use, mapping existing values to new ones. If a dictionary is provided, its mapping is used. Values not in the dictionary as keys will be converted to NaN. If a Series is given, values are mapped by its index to its values. If a callable is given, it is applied element-wise to given columns. If a string is given, it is interpreted as the name of an attribute or a property of the series values to use as target values. If a tuple is provided, its first element is expected to be a string, interpreted as a name of a method of the series values to call, and its second element is expected to be a dict - possibly empty - mapping additional keyword arguments names to their values.
result_columns : single label or list-like, default None
Labels for the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, then the label of the source column is used; otherwise, the label of the source column is used with the suffix given ("_map" by default).
drop : bool, default True
If set to True, source columns are dropped after being mapped.
suffix : str, default '_map'
The suffix mapped columns gain if no new column labels are given.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1], [3], [2]], ['UK', 'USSR', 'US'], ['Medal'])
>>> value_map = {1: 'Gold', 2: 'Silver', 3: 'Bronze'}
>>> pdp.MapColVals('Medal', value_map).apply(df)
       Medal
UK      Gold
USSR  Bronze
US    Silver

>>> from datetime import timedelta;
>>> df = pd.DataFrame(
...    data=[
...       [timedelta(weeks=2)],
...       [timedelta(weeks=4)],
...       [timedelta(weeks=10)]
...    ],
...    index=['proposal', 'midterm', 'finals'],
...    columns=['Due'],
... )
>>> pdp.MapColVals('Due', ('total_seconds', {})).apply(df)
                Due
proposal  1209600.0
midterm   2419200.0
finals    6048000.0
def OneHotEncode(self, columns=None, dummy_na=False, exclude_columns=None, drop_first=True, drop=True, **kwargs)

Creates and adds a pipeline stage that one-hot-encodes categorical columns to this pipeline stage.

By default only k-1 dummies are created for k categorical levels, so as to avoid perfect multicollinearity between the dummy features (also called the dummy variable trap). This is done since features are usually one-hot encoded for use with linear models, which require this behaviour.

Parameters

columns : single label, list-like or callable, default None
Column labels in the DataFrame to be encoded. If columns is None then all the columns with object or category dtype will be converted, except those given in the exclude_columns parameter. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
dummy_na : bool, default False
Add a column to indicate NaNs; if False, NaNs are ignored.
exclude_columns : single label, list-like or callable, default None
Label or labels of columns to be excluded from encoding. If None then no column is excluded. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. Optional.
drop_first : bool or single label, default True
Whether to get k-1 dummies out of k categorical levels by removing the first level. If a non-bool argument matching one of the categories is provided, the dummy column corresponding to this value is dropped instead of the first level; if it matches no category, the first category will still be dropped.
drop : bool, default True
If set to True, the source columns are dropped after being encoded.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([['USA'], ['UK'], ['Greece']], [1,2,3], ['Born'])
>>> pdp.OneHotEncode().apply(df)
   Born_UK  Born_USA
1        0         1
2        1         0
3        0         0
def PdPipeline(self, stages, transformer_getter=None, **kwargs)

Creates and adds a pipeline for processing pandas DataFrame objects to this pipeline stage.

The transformer_getter parameter is useful for pipelines containing stages that are meant only to prepare a training set (for example, stages filtering out items of a big dataset for a machine learning model), and that should not be applied to future individual items transformed by the fitted pipeline.

Parameters

stages : list
A list of PdPipelineStage objects making up this pipeline.
transformer_getter : callable, optional
A callable that can be applied to the fitted pipeline to produce a sub-pipeline of it which should be used to transform dataframes after the pipeline has been fitted. If not given, the fitted pipeline is used entirely.
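
Example

A minimal construction-and-application sketch (the stage choices here are arbitrary illustrations):

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pipeline = pdp.PdPipeline([
...     pdp.ColDrop('num'),
...     pdp.ColRename({'char': 'initial'}),
... ])
>>> pipeline.apply(df)
  initial
1       a
2       b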
def RegexReplace(self, columns: Union[object, List[object], Callable], pattern: str, replace: str, flags: Optional[int] = 0, result_columns: Union[object, List[object], None] = None, drop: Optional[bool] = True, func_desc: Optional[str] = None, **kwargs)

Creates and adds a pipeline stage replacing regex occurrences in a text column to this pipeline stage.

Parameters

columns : single label, list-like or callable
Column labels in the DataFrame which regex replacement be applied to. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
pattern : str
The regex whose occurrences will be replaced.
replace : str
The replacement string to use. This is equivalent to repl in re.sub.
flags : int, default 0
Regex flags that are compatible with Python's re module.
result_columns : label or list-like of labels, default None
The labels of the new columns resulting from the mapping operation. Must be of the same length as columns. If None, behavior depends on the drop parameter: If drop is True, the label of the source column is used; otherwise, the label of the source column is cast to a string and concatenated with the suffix '_reg'.
drop : bool, default True
If set to True, source columns are dropped after being transformed.

Example

>>> import pandas as pd; import pdpipe as pdp; import re;
>>> data = [[4, "more than 12"], [5, "with 5 more"]]
>>> df = pd.DataFrame(data, [1,2], ["age","text"])
>>> clean_num = pdp.RegexReplace('text', r'\b[0-9]+\b', "NUM")
>>> clean_num(df)
   age           text
1    4  more than NUM
2    5  with NUM more

>>> data = [["Mr. John", 18], ["MR. Bob", 25]]
>>> df = pd.DataFrame(data, [1,2], ["name","age"])
>>> match_men = r'^mr.*'
>>> censor_men = pdp.RegexReplace(
...    'name', match_men, "x", flags=re.IGNORECASE
... )
>>> censor_men(df)
  name  age
1    x   18
2    x   25
def RemoveStopwords(self, language, columns, drop=True, **kwargs)

Creates and adds a pipeline stage that removes stopwords from a tokenized list to this pipeline stage.

Target columns must be series of token lists; i.e. every cell in the series is an iterable of string tokens.

Note: The nltk package must be installed for this pipeline stage to work.

Parameters

language : str or array-like
If a string is given, interpreted as the language of the stopwords, and should then be one of the languages supported by the NLTK Stopwords Corpus. If a list is given, it is assumed to be the list of stopwords to remove.
columns : single label, list-like or callable
Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
If set to True, the source columns are dropped after stopword removal, and the resulting columns retain the names of the source columns. Otherwise, resulting columns gain the suffix '_nostop'.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, ['kick', 'the', 'baby']]]
>>> df = pd.DataFrame(data, [1], ['freq', 'content'])
>>> remove_stopwords = pdp.RemoveStopwords('english', 'content')
>>> remove_stopwords(df)
   freq       content
1   3.2  [kick, baby]
def RowDrop(self, conditions, reduce=None, columns=None, **kwargs)

Creates and adds a pipeline stage that drops rows by callable conditions to this pipeline stage.

Parameters

conditions : list-like or dict
The list of conditions that make a row eligible to be dropped. Each condition must be a callable that takes a cell value and returns a bool value. If a list of callables is given, the conditions are checked for each column value of each row. If a dict mapping column labels to callables is given, then each condition is only checked for the column values of the designated column.
reduce : 'any', 'all' or 'xor', default 'any'
Determines how row conditions are reduced. If set to 'all', a row must satisfy all given conditions to be dropped. If set to 'any', rows satisfying at least one of the conditions are dropped. If set to 'xor', rows satisfying exactly one of the conditions will be dropped. Set to 'any' by default.
columns : single label, iterable or callable, optional
The label, or an iterable of labels, of columns. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If given, input conditions will be applied to the sub-dataframe made up of these columns to determine which rows to drop. Ignored if conditions is provided with a dict object. If conditions is a list and this parameter is not provided, all columns are checked (unless exclude_columns is additionally provided).
exclude_columns : single label, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.RowDrop([lambda x: x < 2]).apply(df)
   a   b
2  4   5
3  5  11
>>> pdp.RowDrop({'a': lambda x: x == 4}).apply(df)
   a   b
1  1   4
3  5  11
def Scale(self, scaler, columns=None, exclude_columns=None, joint=False, **kwargs)

Creates and adds a pipeline stage that scales data to this pipeline stage.

Parameters

scaler : str
The type of scaler to use to scale the data. One of 'StandardScaler', 'MinMaxScaler', 'MaxAbsScaler', 'RobustScaler', 'QuantileTransformer' and 'Normalizer'. Refer to scikit-learn's documentation for usage.
columns : single label, list-like or callable, default None
Column labels in the DataFrame to be scaled. If columns is None then all columns of numeric dtype will be scaled, except those given in the exclude_columns parameter. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
exclude_columns : single label, list-like or callable, default None
Label or labels of columns to be excluded from scaling. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
joint : bool, default False
If set to True, all scaled columns will be scaled as a single value set (meaning, only the single largest value among all input columns will be scaled to 1, and not the largest one for each column).
**kwargs : extra keyword arguments
All valid extra keyword arguments are forwarded to the scaler constructor on scaler creation (e.g. 'n_quantiles' for QuantileTransformer). PdPipelineStage valid keyword arguments are used to override Scale class defaults.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, 0.3], [7.2, 0.35], [12.1, 0.29]]
>>> df = pd.DataFrame(data, [1,2,3], ["ph","gt"])
>>> scale_stage = pdp.Scale("StandardScaler")
>>> scale_stage(df)
         ph        gt
1 -1.181449 -0.508001
2 -0.082427  1.397001
3  1.263876 -0.889001
def Schematize(self, columns, **kwargs)

Creates and adds a pipeline stage that enforces a column schema on input dataframes to this pipeline stage.

Parameters

columns : sequence of labels
The dataframe schema to enforce on input dataframes.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[2, 4, 8],[3, 6, 9]], [1, 2], ['a', 'b', 'c'])
>>> pdp.Schematize(['a', 'c']).apply(df)
   a  c
1  2  8
2  3  9
>>> pdp.Schematize(['c', 'b']).apply(df)
   c  b
1  8  4
2  9  6
def SetIndex(self, keys, **kwargs)

Creates and adds a pipeline stage that set existing columns as index to this pipeline stage.

Supports all parameters supported by the pandas.DataFrame.set_index method, except for inplace.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[3, 11]], [1,2], ['a','b'])
>>> pdp.SetIndex('a').apply(df)
    b
a
1   4
3  11
def SnowballStem(self, stemmer_name, columns, drop=True, min_len=None, max_len=None, **kwargs)

Creates and adds a pipeline stage that stems tokens in a list using the Snowball stemmer to this pipeline stage.

Target columns must be series of token lists; i.e. every cell in the series is an iterable of string tokens.

Note: The nltk package must be installed for this pipeline stage to work.

Parameters

stemmer_name : str
The name of the Snowball stemmer to use. Should be one of the Snowball stemmers implemented by nltk. E.g. 'EnglishStemmer'.
columns : single label, list-like or callable
Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
If set to True, the source columns are dropped after stemming, and the resulting columns retain the names of the source columns. Otherwise, resulting columns gain the suffix '_stem'.
min_len : int, optional
If provided, tokens shorter than this length are not stemmed.
max_len : int, optional
If provided, tokens longer than this length are not stemmed.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, ['kicking', 'boats']]]
>>> df = pd.DataFrame(data, [1], ['freq', 'content'])
>>> stem_tokens = pdp.SnowballStem('EnglishStemmer', 'content')
>>> stem_tokens(df)
   freq       content
1   3.2  [kick, boat]
def TfidfVectorizeTokenLists(self, column, drop=True, hierarchical_labels=False, **kwargs)

Creates and adds a pipeline stage TF-IDF-vectorizing a token-list column into tf-idf feature columns to this pipeline stage.

Every cell in the input columns is assumed to be a list of strings, each representing a single token. The resulting TF-IDF vector is exploded into individual feature columns; see the hierarchical_labels parameter for how these columns are labeled.

The resulting columns are concatenated to the end of the dataframe.

All valid sklearn.feature_extraction.text.TfidfVectorizer keyword arguments can be provided as keyword arguments to the constructor, except 'input' and 'analyzer', which will be ignored. As usual, all valid PdPipelineStage constructor parameters can also be provided as keyword arguments.

Parameters

column : str
The label of the token-list column to TfIdf-vectorize.
drop : bool, default True
If set to True, the source column is dropped after being transformed.
hierarchical_labels : bool, default False
If set to True, the labels of resulting columns are of the form 'P_F' where P is the label of the original token-list column and F is the feature name (i.e. the string token it corresponds to). Otherwise, it is simply the feature name itself. If you plan to have two different TfidfVectorizeTokenLists pipeline stages vectorizing two different token-list columns, you should set this to True, so tf-idf features originating in different text columns do not overwrite one another.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[2, ['hovercraft', 'eels']], [5, ['eels', 'urethra']]]
>>> df = pd.DataFrame(data, [1, 2], ['Age', 'tokens'])
>>> tfvectorizer = pdp.TfidfVectorizeTokenLists('tokens')
>>> tfvectorizer(df)
   Age      eels  hovercraft   urethra
1    2  0.579739    0.814802  0.000000
2    5  0.579739    0.000000  0.814802
def TokenizeText(self, columns, drop=True, **kwargs)

Creates and adds a pipeline stage that tokenizes a text column into token lists to this pipeline stage.

Note: The nltk package must be installed for this pipeline stage to work.

Parameters

columns : single label, list-like or callable
Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
If set to True, the source columns are dropped after being tokenized, and the resulting tokenized columns retain the names of the source columns. Otherwise, tokenized columns gain the suffix '_tok'.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame(
...     [[3.2, "Kick the baby!"]], [1], ['freq', 'content'])
>>> tokenize_stage = pdp.TokenizeText('content')
>>> tokenize_stage(df)
   freq               content
1   3.2  [Kick, the, baby, !]
def UntokenizeText(self, columns, drop=True, **kwargs)

Creates and adds a pipeline stage that joins token lists to whitespace-separated strings to this pipeline stage.

Target columns must be series of token lists; i.e. every cell in the series is an iterable of string tokens.

Note: The nltk package must be installed for this pipeline stage to work.

Parameters

columns : single label, list-like or callable
Column labels in the DataFrame to be transformed. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq.
drop : bool, default True
If set to True, the source columns are dropped after being untokenized, and the resulting columns retain the names of the source columns. Otherwise, untokenized columns gain the suffix '_untok'.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> data = [[3.2, ['Shake', 'and', 'bake!']]]
>>> df = pd.DataFrame(data, [1], ['freq', 'content'])
>>> untokenize_stage = pdp.UntokenizeText('content')
>>> untokenize_stage(df)
   freq          content
1   3.2  Shake and bake!
def ValDrop(self, values: List[object], columns: Union[object, List[object], Callable] = None, **kwargs: object)

Creates and adds a pipeline stage that drops rows by value to this pipeline stage.

Parameters

values : list-like
A list of the values to drop.
columns : single label, list-like or callable, default None
The label, or an iterable of labels, of columns to check for the given values. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If set to None, all columns are checked.
exclude_columns : label, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[18,11]], [1,2,3], ['a','b'])
>>> pdp.ValDrop([4], 'a').apply(df)
    a   b
1   1   4
3  18  11
>>> pdp.ValDrop([4]).apply(df)
    a   b
3  18  11
def ValKeep(self, values, columns=None, **kwargs)

Creates and adds a pipeline stage that keeps rows by value to this pipeline stage.

Parameters

values : list-like
A list of the values to keep.
columns : single label, list-like or callable, default None
The label, or an iterable of labels, of columns to check for the given values. Alternatively, this parameter can be assigned a callable returning an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If set to None, all columns are checked.
exclude_columns : single label, iterable or callable, optional
The label, or an iterable of labels, of columns to exclude, given the columns parameter. Alternatively, this parameter can be assigned a callable returning a labels iterable from an input pandas.DataFrame. See pdpipe.cq. Optional. By default no columns are excluded.

Example

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.ValKeep([4, 5], 'a').apply(df)
   a   b
2  4   5
3  5  11
>>> pdp.ValKeep([4, 5]).apply(df)
   a  b
2  4  5
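
Because columns also accepts a callable (see pdpipe.cq), the checked column subset can be computed from the input dataframe. A sketch using a trivial lambda, equivalent here to passing 'a' directly:

>>> pdp.ValKeep([4, 5], columns=lambda df: ['a']).apply(df)
   a   b
2  4   5
3  5  11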
Expand source code
def _append_stage_func(self, *args, **kwds):
    # self is always a PdPipelineStage
    return self + class_obj(*args, **kwds)
def apply(self, df, exraise=None, verbose=False)

Applies this pipeline stage to the given dataframe.

If the stage is not fitted, fit_transform is called. Otherwise, transform is called.

Parameters

df : pandas.DataFrame
The dataframe to which this pipeline stage will be applied.
exraise : bool, default None
Override the precondition and postcondition behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter.
verbose : bool, default False
If True, an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.

Returns

pandas.DataFrame
The resulting dataframe.
Expand source code
def apply(self, df, exraise=None, verbose=False):
    """Applies this pipeline stage to the given dataframe.

    If the stage is not fitted, fit_transform is called. Otherwise,
    transform is called.

    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe to which this pipeline stage will be applied.
    exraise : bool, default None
        Override the precondition and postcondition behaviour for this call.
        If None, the default behaviour of this stage is used, as determined
        by the exraise constructor parameter.
    verbose : bool, default False
        If True, an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.
    """
    if exraise is None:
        exraise = self._exraise
    if self._skip and self._skip(df):
        return df
    if self._compound_prec(df=df):
        if verbose:
            msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
            print(msg, flush=True)
        if self.is_fitted:
            res_df = self._transform(df, verbose=verbose)
        else:
            res_df = self._fit_transform(df, verbose=verbose)
        if exraise and not self._compound_post(df=res_df):
            self._raise_postcondition_error()
        return res_df
    if exraise:
        self._raise_precondition_error()
    return df
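
As the source above shows, a failed precondition with exraise=False returns the input unchanged rather than raising. A sketch, assuming pdp.ColDrop's precondition requires the target column to be present:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1, 'a']], [1], ['num', 'char'])
>>> stage = pdp.ColDrop('no_such_column')
>>> res = stage.apply(df, exraise=False)  # precondition fails; input returned as-is
>>> res.equals(df)
True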
def description(self)

Returns the description of this pipeline stage

Expand source code
def description(self):
    """Returns the description of this pipeline stage"""
    return self._desc
def fit(self, X, y=None, exraise=None, verbose=False)

Fits this stage without transforming the given dataframe.

Parameters

X : pandas.DataFrame
The dataframe to fit this stage by.
y : array-like, optional
Targets for supervised learning.
exraise : bool, default None
Override the precondition and postcondition behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter.
verbose : bool, default False
If True, an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.

Returns

pandas.DataFrame
The input dataframe, unchanged (fit does not transform it).
Expand source code
def fit(self, X, y=None, exraise=None, verbose=False):
    """Fits this stage without transforming the given dataframe.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to fit this stage by.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Override the precondition and postcondition behaviour for this call.
        If None, the default behaviour of this stage is used, as determined
        by the exraise constructor parameter.
    verbose : bool, default False
        If True, an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.

    Returns
    -------
    pandas.DataFrame
        The input dataframe, unchanged (fit does not transform it).
    """
    if exraise is None:
        exraise = self._exraise
    if self._compound_prec(X):
        if verbose:
            msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
            print(msg, flush=True)
        # fitting is performed by running the internal fit-transform logic
        res_df = self._fit_transform(X, verbose=verbose)
        if exraise and not self._compound_post(df=res_df):
            self._raise_postcondition_error()
        # the transformed result is only used for the postcondition check;
        # fit returns the input dataframe unchanged
        return X
    if exraise:
        self._raise_precondition_error()
    return X
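
Note that fit runs the stage's internal fit-transform logic but returns the input dataframe unchanged, as the source above shows. A sketch, assuming pdp.Encode (which requires scikit-learn) as a representative fittable stage:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> stage = pdp.Encode('char')
>>> stage.fit(df).equals(df)
True
>>> stage.is_fitted
True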
def fit_transform(self, X, y=None, exraise=None, verbose=False)

Fits this stage and transforms the given dataframe.

Parameters

X : pandas.DataFrame
The dataframe to transform and fit this pipeline stage by.
y : array-like, optional
Targets for supervised learning.
exraise : bool, default None
Override the precondition and postcondition behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter.
verbose : bool, default False
If True, an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.

Returns

pandas.DataFrame
The resulting dataframe.
Expand source code
def fit_transform(self, X, y=None, exraise=None, verbose=False):
    """Fits this stage and transforms the given dataframe.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to transform and fit this pipeline stage by.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Override the precondition and postcondition behaviour for this call.
        If None, the default behaviour of this stage is used, as determined
        by the exraise constructor parameter.
    verbose : bool, default False
        If True, an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.
    """
    if exraise is None:
        exraise = self._exraise
    if self._compound_prec(X):
        if verbose:
            msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
            print(msg, flush=True)
        res_df = self._fit_transform(X, verbose=verbose)
        if exraise and not self._compound_post(df=res_df):
            self._raise_postcondition_error()
        return res_df
    if exraise:
        self._raise_precondition_error()
    return X
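
In contrast to fit, fit_transform returns the transformed dataframe. A short sketch, again assuming pdp.Encode as a representative fittable stage; the optional y parameter can simply be omitted:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> stage = pdp.Encode('char')
>>> res = stage.fit_transform(df)
>>> stage.is_fitted
True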
def transform(self, X, y=None, exraise=None, verbose=False)

Transforms the given dataframe without fitting this stage.

If this stage is fittable but is not fitted, an UnfittedPipelineStageError is raised.

Parameters

X : pandas.DataFrame
The dataframe to be transformed.
y : array-like, optional
Targets for supervised learning.
exraise : bool, default None
Override the precondition and postcondition behaviour for this call. If None, the default behaviour of this stage is used, as determined by the exraise constructor parameter.
verbose : bool, default False
If True, an explanation message is printed after the precondition is checked but before the application of the pipeline stage. Defaults to False.

Returns

pandas.DataFrame
The resulting dataframe.
Expand source code
def transform(self, X, y=None, exraise=None, verbose=False):
    """Transforms the given dataframe without fitting this stage.

    If this stage is fittable but is not fitted, an
    UnfittedPipelineStageError is raised.

    Parameters
    ----------
    X : pandas.DataFrame
        The dataframe to be transformed.
    y : array-like, optional
        Targets for supervised learning.
    exraise : bool, default None
        Override the precondition and postcondition behaviour for this call.
        If None, the default behaviour of this stage is used, as determined
        by the exraise constructor parameter.
    verbose : bool, default False
        If True, an explanation message is printed after the precondition
        is checked but before the application of the pipeline stage.
        Defaults to False.

    Returns
    -------
    pandas.DataFrame
        The resulting dataframe.
    """
    if exraise is None:
        exraise = self._exraise
    if self._compound_prec(X):
        if verbose:
            msg = '- ' + '\n  '.join(textwrap.wrap(self._appmsg))
            print(msg, flush=True)
        if self._is_fittable():
            if self.is_fitted:
                res_df = self._transform(X, verbose=verbose)
                if exraise and not self._compound_post(df=res_df):
                    self._raise_postcondition_error()
                return res_df
            raise UnfittedPipelineStageError(
                "transform of an unfitted pipeline stage was called!")
        res_df = self._transform(X, verbose=verbose)
        if exraise and not self._compound_post(df=res_df):
            self._raise_postcondition_error()
        return res_df
    if exraise:
        self._raise_precondition_error()
    return X
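
The usual pattern is to fit on one dataframe and transform another with the fitted parameters. A sketch, again assuming pdp.Encode; calling transform on a fittable stage that was never fitted raises UnfittedPipelineStageError, as the source above shows.

>>> import pandas as pd; import pdpipe as pdp;
>>> train = pd.DataFrame([[1, 'a'], [2, 'b']], [1, 2], ['num', 'char'])
>>> test = pd.DataFrame([[3, 'b']], [1], ['num', 'char'])
>>> stage = pdp.Encode('char')
>>> _ = stage.fit_transform(train)
>>> res = stage.transform(test)  # reuses the encoding fitted on train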
class PdpApplicationContext (fit_context=None)

An object encapsulating the application context of a pipeline.

It is meant to communicate data, information and variables between different stages of a pipeline.

Parameters

fit_context : PdpApplicationContext, optional
Another application context object, representing the application context of a previous fit of the pipeline this application context is initialized for. Optional.
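
Example

A minimal sketch of the locking behaviour implemented in the source below; the 'threshold' key is an arbitrary illustration. Once the context is locked, writes are silently ignored:

>>> from pdpipe.core import PdpApplicationContext
>>> context = PdpApplicationContext()
>>> context['threshold'] = 0.5
>>> context.lock()
>>> context['threshold'] = 0.9  # silently ignored once locked
>>> context['threshold']
0.5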
Expand source code
class PdpApplicationContext(dict):
    """An object encapsulating the application context of a pipeline.

    It is meant to communicate data, information and variables between
    different stages of a pipeline.

    Parameters
    ----------
    fit_context : PdpApplicationContext, optional
        Another application context object, representing the application
        context of a previous fit of the pipeline this application context
        is initialized for. Optional.
    """

    def __init__(self, fit_context=None):
        self.__locked__ = False
        self._fit_context__ = fit_context

    def __setitem__(self, key, value):
        if not self.__locked__:
            super().__setitem__(key, value)

    def __delitem__(self, key):
        if not self.__locked__:
            super().__delitem__(key)

    def pop(self, key, default):
        """If key is in the dictionary, remove it and return its value, else
        return default. If default is not given and key is not in the
        dictionary, a KeyError is raised.
        """
        if not self.__locked__:
            return super().pop(key, default)
        return super().__getitem__(key)

    def clear(self):
        """Remove all items from the dictionary."""
        if not self.__locked__:
            super().clear()

    def popitem(self):
        """Not implemented!"""
        raise NotImplementedError

    def update(self, other):
        """Update the dictionary with the key/value pairs from other,
        overwriting existing keys. Return None.
        update() accepts either another dictionary object or an iterable of
        key/value pairs (as tuples or other iterables of length two). If
        keyword arguments are specified, the dictionary is then updated with
        those key/value pairs: d.update(red=1, blue=2).
        """
        if not self.__locked__:
            super().update(other)

    def lock(self):
        """Locks this application context for changes."""
        self.__locked__ = True

    def fit_context(self):
        """Returns a locked PdpApplicationContext object of a previous fit."""
        return self._fit_context__

Ancestors

  • builtins.dict

Methods

def clear(self)

Remove all items from the dictionary.

Expand source code
def clear(self):
    """Remove all items from the dictionary."""
    if not self.__locked__:
        super().clear()
def fit_context(self)

Returns a locked PdpApplicationContext object of a previous fit.

Expand source code
def fit_context(self):
    """Returns a locked PdpApplicationContext object of a previous fit."""
    return self._fit_context__
def lock(self)

Locks this application context against further changes.

Expand source code
def lock(self):
    """Locks this application context for changes."""
    self.__locked__ = True
def pop(self, key, default)

If key is in the dictionary, remove it and return its value; otherwise, return default. If this context is locked, the value is returned without being removed, and a missing key raises KeyError.

Expand source code
def pop(self, key, default):
    """If key is in the dictionary, remove it and return its value, else
    return default. If default is not given and key is not in the
    dictionary, a KeyError is raised.
    """
    if not self.__locked__:
        return super().pop(key, default)
    return super().__getitem__(key)
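
When the context is locked, pop returns the stored value without removing it, per the source above; a quick sketch:

>>> from pdpipe.core import PdpApplicationContext
>>> context = PdpApplicationContext()
>>> context['key'] = 'value'
>>> context.lock()
>>> context.pop('key', None)
'value'
>>> 'key' in context
True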
def popitem(self)

Not implemented!

Expand source code
def popitem(self):
    """Not implemented!"""
    raise NotImplementedError
def update(self, other)

Update the dictionary with the key/value pairs from other, overwriting existing keys, and return None. Accepts either another dictionary object or an iterable of key/value pairs (as tuples or other iterables of length two). If this context is locked, the update is silently ignored.

Expand source code
def update(self, other):
    """Update the dictionary with the key/value pairs from other,
    overwriting existing keys. Return None.
    update() accepts either another dictionary object or an iterable of
    key/value pairs (as tuples or other iterables of length two). If
    keyword arguments are specified, the dictionary is then updated with
    those key/value pairs: d.update(red=1, blue=2).
    """
    if not self.__locked__:
        super().update(other)
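
Like direct assignment, update is silently ignored once the context is locked; a quick sketch:

>>> from pdpipe.core import PdpApplicationContext
>>> context = PdpApplicationContext()
>>> context.update({'a': 1})
>>> context.lock()
>>> context.update({'a': 2})  # silently ignored while locked
>>> context['a']
1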