Source code for pydit.wrangling.duplicates

"""Module for checking for duplicates in a dataframe.

It wraps the pandas.DataFrame.duplicated() to perform a more "end to end" check
with logging and informational messages. This can be useful in an audit scenario
as we tend to have to do a lot of duplicate checks in intermediate files.

"""

import logging

import pandas as pd
import numpy as np
from pandas.api.types import is_string_dtype
from pandas.api.types import is_numeric_dtype

logger = logging.getLogger(__name__)

# pylint: disable=logging-not-lazy
# pylint: disable=logging-fstring-interpolation


[docs] def check_duplicates( obj, columns=None, keep=False, ascending=None, add_indicator_column=False, also_return_non_duplicates=False, dropna=True, silent=False, ): """Check for duplicates in a dataframe. Parameters ---------- obj: DataFrame or Series The dataframe or series to check for duplicates columns: str or list, optional Column or list of column(s) to check even if it is one column only. If multiple columns provided the check is combined duplicates. keep: 'first','last' or False, optional Argument for pandas df.duplicated() method. Defaults to 'first'. ascending: True, False, boolean list with same len() as columns, or None, optional Sorting criteria to provide to DataFrame.sort_values() which runs just before the duplicates check. Defaults to None. indicator: bool, optional If True, a boolean column is added to the dataframe to flag duplicate rows. Defaults to False also_return_non_duplicates: bool, optional If True, the return values will include non-duplicate rows too. dropna: bool, optional If True, the check will ignore NaN values. Defaults to True. silent: bool Minimises outputs Defaults to False. Returns ------- pandas.DataFrame Returns the DataFrame with the duplicates or None if no duplicates found. If also_return_non_duplicates is True, the return values will include non-duplicate rows too. """ if silent: logger.setLevel(logging.CRITICAL) if not isinstance(obj, (pd.DataFrame, pd.Series)): raise TypeError("obj must be a pandas DataFrame or Series") if isinstance(obj, pd.Series): # If it is Series we convert it to DataFrame if obj.name is None and isinstance(columns, str): obj.name = columns else: obj.name = "data" columns = "data" df = obj.to_frame() else: df = obj.copy() if isinstance(columns, str): if columns in df.columns: cols = [columns] else: raise ValueError(f"column {columns} not in dataframe") else: if isinstance(columns, list): check_isin = [x in df.columns for x in columns] if all(check_isin): cols = columns else: raise ValueError("at least one column provided not in dataframe") else: cols = df.columns fields = ",".join(cols) has_all_nans = df[cols].isnull().apply(all, axis=1) has_any_nans = df[cols].isnull().apply(any, axis=1) all_nans_count = sum(has_all_nans) not_all_nans_count = sum(has_any_nans) - all_nans_count if dropna: if all_nans_count > 0: df = df.dropna(subset=cols, how="all") logger.info("Dropping %s records with all nan:", all_nans_count) if not_all_nans_count > 0: logger.info( "Of the remaining %s records, %s has nans", df.shape[0], not_all_nans_count, ) else: if all_nans_count > 0: logger.info("Dataframe includes %s records with all nan:", all_nans_count) if not_all_nans_count > 0: logger.info("and %s records with some nan:", not_all_nans_count) # Boolean series with the results of all duplicated() method ser_duplicates = df.duplicated(cols, keep=False) ser_duplicates_first = df.duplicated(cols, keep="first") if keep == "last": ser_duplicates_last = df.duplicated( cols, keep="last", ) ser_duplicates_unique = np.logical_and(ser_duplicates, ~ser_duplicates_first) logger.info("Duplicates in fields: %s", fields) if ser_duplicates.any(): logger.info("(using keep=%s)", keep) logger.info("Found %s unique duplicates instances", ser_duplicates_unique.sum()) logger.info("Totalling %s rows", ser_duplicates.sum()) logger.info("of a population of %s", len(df)) if not dropna: if all_nans_count > 0: logger.info("Remember, duplicates count include 1 for the nan rows") blanks_acum = 0 for c in cols: if is_numeric_dtype(df[c]): blanks = (pd.isna(df[c])).sum() if blanks > 0: logger.warning(f"{blanks} rows with nan in {c}") zeroes = (df[c] == 0).sum() if zeroes > 0: logger.warning(f"{zeroes} rows with zeroes in {c}") elif is_string_dtype(df[c]): blanks = ((pd.isna(df[c])) | (df[c].str.strip() == "")).sum() if blanks > 0: logger.warning(f"{blanks} rows with blanks or nan in {c}") else: blanks = (pd.isna(df[c])).sum() if blanks > 0: logger.warning(f"{blanks} rows with nans in {c}") blanks_acum += blanks if blanks_acum == 0: logger.info("No blanks found in the key column(s) provided") if ascending is not None: # Ascending df = df.sort_values(cols, ascending=ascending) logger.info("Sorting by %s with params: %s ", cols, ascending) if also_return_non_duplicates: # we return the non duplicates and follow the keep argument # for which duplicates to keep logger.info("Returning non-duplicates plus keep=%s", keep) if add_indicator_column: df["_duplicates"] = ser_duplicates if keep == "first": dfres = df.loc[(~ser_duplicates) | ~ser_duplicates_first].copy() elif keep == "last": dfres = df.loc[(~ser_duplicates) | ~ser_duplicates_last].copy() else: dfres = df.copy() else: # we just return the duplicates logger.info("Returning duplicates applying pandas keep=%s", keep) if add_indicator_column: df["_duplicates"] = ser_duplicates if keep == "first": dfres = df.loc[ser_duplicates_first].copy() elif keep == "last": dfres = df.loc[ser_duplicates_last].copy() else: dfres = df.loc[ser_duplicates].copy() return dfres else: logger.info("No duplicates found") if add_indicator_column: df["_duplicates"] = False if also_return_non_duplicates: return df # return an empty dataframe return None