# Source code for pydit.wrangling.sequence
"""Module to check for numerical sequence of DataFrame column or Series"""
# pylint disable=import-error, bare-except, unu
import logging
from datetime import date
import itertools
import pandas as pd
from pandas.api.types import is_datetime64_any_dtype as is_datetime
from pandas import Series
logger = logging.getLogger(__name__)
[docs]
def check_sequence(obj_in, col=None):
    """Return the values missing from a numerical or date sequence.

    The input is reduced to a single pandas Series and then handled
    according to its dtype:

    * integer data is compared with every integer between its minimum and
      maximum;
    * float data is truncated to integers and compared the same way;
    * datetime data (including object columns holding ``date`` objects) is
      compared with every calendar day between the earliest and latest date;
    * text data has every character other than 0-9 removed and the remaining
      digits are read as integers.

    Parameters
    ----------
    obj_in : list, pandas.Series or pandas.DataFrame
        The list, series or dataframe to check.
    col : str
        The column name to check, required if a DataFrame is provided.

    Returns
    -------
    list or None
        The missing values in ascending order; an empty list when the
        sequence is complete. ``None`` is returned when the dtype is not one
        of the kinds listed above.

    Raises
    ------
    ValueError
        If a DataFrame is given without a valid ``col``, if ``obj_in`` is not
        a list, Series or DataFrame, if an object column mixes incomparable
        types, or if there are no usable values to build a sequence from.
    """
    logger.debug("Checking sequence for %s of type %s ", obj_in, type(obj_in))
    obj = None
    if isinstance(obj_in, Series):
        obj = obj_in.copy()
    elif isinstance(obj_in, list):
        obj = pd.Series(obj_in)
    elif isinstance(obj_in, pd.DataFrame):
        if col is None:
            raise ValueError("Please provide a column name to check")
        if col not in obj_in.columns:
            raise ValueError(f"Column {col} not found in DataFrame")
        obj = obj_in[col].copy()
    if obj is None:
        raise ValueError("No parsable data provided to check")

    if "int" in str(obj.dtype):
        logger.debug("Data is of type integers")
        missing = _missing_integers({int(i) for i in obj[pd.notna(obj)]})
        if missing:
            logger.info("Missing values: %s", len(missing))
            logger.info("First 10 missing values: %s", missing[:10])
        else:
            logger.info("Sequence provided is complete")
        return missing

    if "object" in str(obj.dtype):
        logger.debug("Data is of type object, checking if it is datetime")
        try:
            # max() fails on columns mixing incomparable types, which is how
            # heterogeneous input is detected and rejected.
            max_value = obj[obj.notnull()].max()
            if isinstance(max_value, date):
                obj = pd.to_datetime(obj, errors="coerce")
                logger.debug("Converted to datetime")
        except Exception as e:
            raise ValueError(
                "Multiple data types detected, please cleanup the column first"
            ) from e

    if is_datetime(obj):
        seen_days = {stamp.date() for stamp in obj[pd.notna(obj)]}
        if not seen_days:
            raise ValueError("No date values found")
        calendar = pd.date_range(min(seen_days), max(seen_days), freq="D")
        # date_range is ascending, so the result is already sorted.
        missing = [
            day for day in (stamp.date() for stamp in calendar)
            if day not in seen_days
        ]
        if missing:
            logger.info("Missing values: %s", len(missing))
            logger.info("First 10 missing values: %s", missing[:10])
            working_days = [wd for wd in missing if wd.weekday() < 5]
            logger.info("Working days missing: %s", len(working_days))
            logger.info("First 10 working days missing: %s", working_days[:10])
        else:
            logger.info("Sequence of dates is complete")
        return missing

    if "float" in str(obj.dtype):
        logger.debug("Data is of type floats")
        # Fractional parts are discarded: 2.7 counts as 2.
        missing = _missing_integers({int(i) for i in obj[pd.notna(obj)]})
        if missing:
            logger.info("Missing values: %s", len(missing))
            logger.info("First 10 missing values: %s", missing[:10])
        else:
            logger.info("Sequence provided is complete")
        return missing

    # "str" also matches the "string" dtype name.
    if "object" in str(obj.dtype) or "str" in str(obj.dtype):
        logger.debug("Strings object as if they were dates we already processed")
        digits = obj.fillna("").str.replace(r"[^0-9]", "", regex=True)
        # Parse with Python int so arbitrarily long digit strings stay exact;
        # entries that held no digits (or were not text) are ignored.
        present = {
            int(text) for text in digits if isinstance(text, str) and text
        }
        missing = _missing_integers(present)
        if missing:
            logger.info("Missing values: %s", len(missing))
            logger.info("Fist 10:%s", missing[:10])
        else:
            logger.info("Sequence is complete")
        return missing

    # Unsupported dtype (e.g. bool): nothing to check.
    return None


def _missing_integers(present):
    """Return, ascending, the integers between min and max of *present* that are absent from it."""
    if not present:
        raise ValueError("No numeric values found")
    return [n for n in range(min(present), max(present) + 1) if n not in present]
[docs]
def group_gaps(gap_list):
    """Collapse a collection of gap values into runs of consecutive values.

    Duplicates are dropped and the values are sorted before runs are formed.

    Parameters
    ----------
    gap_list : list
        The gap values (integers) to group.

    Returns
    -------
    pandas.DataFrame
        One row per run of consecutive values, with columns ``start``,
        ``end`` and ``count`` (``end - start + 1``).

    Raises
    ------
    TypeError
        If the values cannot be sorted or subtracted like integers.
    """
    try:
        ordered = sorted(set(gap_list))
        runs = []
        anchor = None
        for position, value in enumerate(ordered):
            # Within a run of consecutive values, value minus position is constant.
            offset = value - position
            if runs and offset == anchor:
                runs[-1][1] = value
            else:
                runs.append([value, value])
                anchor = offset
        records = [[first, last, last - first + 1] for first, last in runs]
        df_grouped = pd.DataFrame.from_records(
            records, columns=["start", "end", "count"]
        )
    except TypeError as exc:
        raise TypeError("Grouping only works for integers for now") from exc
    return df_grouped