# Source code for pywrangler.wranglers

"""This module contains computation-engine-independent wrangler interfaces
and their corresponding descriptions.

"""
from typing import Any

from pywrangler.base import BaseWrangler
from pywrangler.util import sanitizer
from pywrangler.util.types import TYPE_ASCENDING, TYPE_COLUMNS

# Sentinel marking "argument not supplied". `IntervalIdentifier.__init__`
# uses it as the default for `marker_end`, so an omitted end marker can be
# told apart from any explicitly passed value (including None).
NONEVALUE = object()


class IntervalIdentifier(BaseWrangler):
    """Defines the reference interface for the interval identification
    wrangler.

    An interval is defined as a range of values beginning with an opening
    marker and ending with a closing marker (e.g. the interval daylight may be
    defined as all events/values occurring between sunrise and sunset). Start
    and end marker may be identical.

    The interval identification wrangler assigns ids to values such that values
    belonging to the same interval share the same interval id. For example, all
    values of the first daylight interval are assigned with id 1. All values of
    the second daylight interval will be assigned with id 2 and so on.

    By default, values which do not belong to any valid interval, are assigned
    the value 0 by definition (please refer to `result_type` for different
    result types). If start and end marker are identical or the end marker is
    not provided, invalid values are only possible before the first start
    marker is encountered.

    Due to messy data, start and end marker may occur multiple times in
    sequence until its counterpart is reached. Therefore, intervals may have
    different spans based on different task requirements. For example, the very
    first start or very last start marker may define the correct start of an
    interval. Accordingly, four intervals can be selected by setting
    `marker_start_use_first` and `marker_end_use_first`. The resulting
    intervals are as follows:

        - first start / first end
        - first start / last end (longest interval)
        - last start / first end (shortest interval)
        - last start / last end

    Opening and closing markers are included in their corresponding interval.

    Parameters
    ----------
    marker_column: str
        Name of column which contains the opening and closing markers.
    marker_start: Any
        A value defining the start of an interval.
    marker_end: Any, optional
        A value defining the end of an interval. This value is optional. If not
        given, the end marker equals the start marker.
    marker_start_use_first: bool
        Identifies if the first occurring `marker_start` of an interval is
        used. Otherwise the last occurring `marker_start` is used. Default is
        False.
    marker_end_use_first: bool
        Identifies if the first occurring `marker_end` of an interval is used.
        Otherwise the last occurring `marker_end` is used. Default is True.
    orderby_columns: str, Iterable[str], optional
        Column names which define the order of the data (e.g. a timestamp
        column). Sort order can be defined with the parameter `ascending`.
    groupby_columns: str, Iterable[str], optional
        Column names which define how the data should be grouped/split into
        separate entities. For distributed computation engines, groupby columns
        should ideally reference partition keys to avoid data shuffling.
    ascending: bool, Iterable[bool], optional
        Sort ascending vs. descending. Specify list for multiple sort orders.
        If a list is specified, length of the list must equal length of
        `orderby_columns`. Default is True.
    result_type: str, optional
        Defines the content of the returned result. If 'raw', interval ids
        will be in arbitrary order with no distinction made between valid and
        invalid intervals. Intervals are distinguishable by interval id but the
        interval id may not provide any more information. If 'valid', the
        result is the same as 'raw' but all invalid intervals are set to 0.
        If 'enumerated', the result is the same as 'valid' but interval ids
        increase in ascending order (as defined by order) in steps of one.
    target_column_name: str, optional
        Name of the resulting target column.

    Raises
    ------
    ValueError
        If `result_type` is not one of 'raw', 'valid' or 'enumerated', if
        `ascending` is given but its number of items differs from
        `orderby_columns`, or if `ascending` contains non-boolean items.

    """

    def __init__(self,
                 marker_column: str,
                 marker_start: Any,
                 marker_end: Any = NONEVALUE,
                 marker_start_use_first: bool = False,
                 marker_end_use_first: bool = True,
                 orderby_columns: TYPE_COLUMNS = None,
                 groupby_columns: TYPE_COLUMNS = None,
                 ascending: TYPE_ASCENDING = None,
                 result_type: str = "enumerated",
                 target_column_name: str = "iids"):

        self.marker_column = marker_column
        self.marker_start = marker_start
        self.marker_end = marker_end
        self.marker_start_use_first = marker_start_use_first
        self.marker_end_use_first = marker_end_use_first
        # NOTE(review): `sanitizer.ensure_iterable` is defined elsewhere;
        # presumably it wraps single values into a sequence so that `len()`
        # and iteration below work - confirm its handling of None.
        self.orderby_columns = sanitizer.ensure_iterable(orderby_columns)
        self.groupby_columns = sanitizer.ensure_iterable(groupby_columns)
        self.ascending = sanitizer.ensure_iterable(ascending)
        self.result_type = result_type
        self.target_column_name = target_column_name

        # check correct result type
        valid_result_types = {"raw", "valid", "enumerated"}
        if result_type not in valid_result_types:
            raise ValueError("Parameter `result_type` is invalid with: {}. "
                             "Allowed arguments are: {}"
                             .format(result_type, valid_result_types))

        # check for identical start and end values; the sentinel is compared
        # by identity so that a marker with a custom `__eq__` (e.g. array-like
        # values) can never be mistaken for "no end marker given"
        self._identical_start_end_markers = ((marker_end is NONEVALUE) or
                                             (marker_start == marker_end))

        # sanity checks for sort order
        if self.ascending:

            # check for equal number of items of order and sort columns
            if len(self.orderby_columns) != len(self.ascending):
                raise ValueError('`orderby_columns` and `ascending` must '
                                 'have equal number of items.')

            # check for correct sorting keywords
            if not all(isinstance(x, bool) for x in self.ascending):
                raise ValueError('Only `True` and `False` are '
                                 'allowed arguments for `ascending`')

        # set default sort order if None is given
        elif self.orderby_columns:
            self.ascending = [True] * len(self.orderby_columns)

    @property
    def preserves_sample_size(self) -> bool:
        """Always returns True for this wrangler.

        NOTE(review): presumably signals that the output has as many rows as
        the input - confirm against `BaseWrangler`.

        """

        return True