"""Module that implements a few useful count related functions
Takes inspiration on the usual counta and countif functions in Excel
* Add a column counting occurrence of values in a given column. Useful for sorting/grouping by frequency
* Add a cumulative count of unique keys in a given column
* Add a column counting the number of null values in a row
* Add a column counting the number of non-null values in a row
* Add a column checking if the values in a row are different
* Add a column counting related keys in another dataframe (e.g., countif)
"""
import logging
import pandas as pd
import numpy as np
logger = logging.getLogger(__name__)
[docs]
def count_values_in_col(
df_input,
col,
column_name=None,
combined=True,
percentage=False,
detailed=False,
):
"""Generates a column counting occurrence of values in a given column.
If several columns provided, it will generate a column for each of them, but
if combined is True, it will generate a column counting unique
combinations of values in the columns.
Parameters
----------
df_input : pd.DataFrame
Dataframe to be analyzed
col : str or list of str
Name of the column containing values to tally
column_name : str or list of str, optional, default None
Name of the columns to be created containing the count of values
If None, the column name will be "count_[col]".
combined : bool, optional, default True
Whether or not compute the counts combining all the columns provided
percentage: bool, optional, default False
Whether to return percentage over total count
detailed: bool, optional, default False
Whether to return only the combined count and drop the extra details
Returns
-------
pd.DataFrame
New dataframe with a new column containing the count of values
"""
if not isinstance(df_input, pd.DataFrame):
raise TypeError("Expecting a dataframe")
if not column_name or column_name == "" or column_name != column_name:
column_name = ""
flag_auto_name = True
else:
flag_auto_name = False
if isinstance(col, (str, list)):
if isinstance(col, list):
cols_list = col
else:
cols_list = [col]
else:
raise TypeError("Expecting a string or list/tuple of strings")
for c in cols_list:
if c not in df_input.columns:
raise ValueError("Column %s not in DataFrame" % c)
if isinstance(column_name, str) and not flag_auto_name:
column_name = [column_name]
if isinstance(column_name, list) and len(column_name) != len(cols_list):
raise ValueError("column_name must be same length as col")
df = df_input.copy(deep=True)
if combined:
if len(cols_list) > 1:
s1 = df[cols_list].fillna("nan").astype("str").agg("_".join, axis=1)
else:
s1 = df[cols_list[0]].astype("str")
df["combined_count_source"] = s1
count_summary = s1.value_counts(dropna=False)
count_list = [count_summary[val] for index, val in enumerate(s1)]
if flag_auto_name:
cn = "count_combined"
else:
cn = column_name[0]
df[cn] = count_list
if percentage:
count_records = float(df.shape[0])
df[cn] = df[cn] / count_records
if detailed:
for i, c in enumerate(cols_list):
# count_summary = df[c].value_counts(dropna=False)
# count_list = [count_summary[val] for index, val in enumerate(df[c])]
if flag_auto_name:
cn = "count_" + c
else:
cn = column_name[i]
# df[cn] = count_list
df[cn] = df[c].map(df[c].value_counts(dropna=False))
if percentage:
count_records = float(df.shape[0])
df[cn] = df[cn] / count_records
return df
[docs]
def count_cumulative_unique(
df,
column_name,
dest_column_name,
case_sensitive=True,
):
"""Generates a running total of cumulative unique values in a given column.
Parameters
----------
df : pd.DataFrame
Dataframe to be analyzed
column_name : Hashable
Name of the column containing values from which a running count
of unique values will be created.
dest_column_name : str
Name of the column to be created containing the cumulative count
of unique values.
case_sensitive : bool, optional, default True
Whether or not uppercase and lowercase letters
will be considered equal (e.g., 'A' != 'a' if `True`).
Returns
-------
pd.DataFrame
Dataframe with a new column containing the cumulative count of
unique values in the given column.
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("Expecting a dataframe")
df = df.copy()
if not case_sensitive:
# Make it so that the the same uppercase and lowercase
# letter are treated as one unique value
df[column_name] = df[column_name].astype(str).map(str.lower)
df[dest_column_name] = (
(df[[column_name]].drop_duplicates().assign(dummyabcxyz=1).dummyabcxyz.cumsum())
.reindex(df.index)
.ffill()
.astype(int)
)
return df
[docs]
def count_isna(df, cols):
"""Returns the number of null values in the columns specified in cols
Parameters
----------
df : pd.DataFrame
Dataframe to be analyzed
cols : list
List of columns to be analyzed
Returns
-------
pd.Series
Series with the number of null values in the columns specified in cols
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("Expecting a dataframe")
if not isinstance(cols, list):
raise TypeError("Expecting a list")
if not all([c in df.columns for c in cols]):
raise ValueError("Column not in DataFrame")
res = df[cols].apply(lambda r: sum([True for v in r.values if pd.isna(v)]), axis=1)
return res
[docs]
def count_notna(df, cols):
"""Returns the number of non-null values in the columns specified in cols
Parameters
----------
df : pd.DataFrame
Dataframe to be analyzed
cols : list
List of columns to be analyzed
Returns
-------
pd.Series
Series with the number of non-null values in the columns specified in cols
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("Expecting a dataframe")
if not isinstance(cols, list):
raise TypeError("Expecting a list")
if not all([c in df.columns for c in cols]):
raise ValueError("Column not in DataFrame")
res = df[cols].apply(lambda r: sum([True for v in r.values if pd.notna(v)]), axis=1)
return res
[docs]
def has_different_values(df, cols):
"""Returns True if the values in the columns specified in cols are different
Parameters
----------
df : pd.DataFrame
Dataframe to be analyzed
cols : list
List of columns to be analyzed
Returns
-------
pd.Series
Series with True if the values in the columns specified in cols are different
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("Expecting a dataframe")
if not isinstance(cols, list):
raise TypeError("Expecting a list")
if not all([c in df.columns for c in cols]):
raise ValueError("Column not in DataFrame")
vals = df[cols].apply(lambda r: [v for v in r.values if pd.notna(v)], axis=1)
def array_eq(a):
if a:
array1 = np.array(a)
return (array1 == array1[0]).all()
return False
res = [array_eq(v) for v in vals]
return res