Source code for pydit.wrangling.anonymise

"""Module for anonymising a key/identifier column

It applies a randomly generated translation table of integers to the column.

This is a very simple anonymisation method and should be used with caution.
It can be useful for relatively small datasets for one off anonymisation, for
example for sharing with third party auditors or for testing purposes.
It is not suitable for production use or for large datasets.


"""

import random
import logging

import pandas as pd

logger = logging.getLogger(__name__)


[docs] def anonymise_key( df_list, key_list, map_table_or_dict=None, hash_list=None, create_new_hash_list=False, hash_list_size=1000000, ): """Anonymise a column of one or many dataframes with a scrambled list of integers. Will persist across the list and it will return the translation table used. Parameters ---------- df_list : list of pandas.DataFrame List of dataframes to anonymise. key_list : list of str List of key columns to anonymise, must align to the previous list of dataframes. map_table_or_dict : dict or pandas.DataFrame, optional, default None Dictionary or dataframe to use for the translation table. hash_list : list of int, optional, default None List of integers to use for the translation table. create_new_hash_list : bool, optional, default False If True, a new list of integers will be created. hash_list_size : int, optional, default 1000000 Size of the hash list to create. Returns ------- tuple A tuple of the translation table and the hash list. """ def _anonymise(df_list, key_list, map_dict=None, hash_list=None): """Internal function to perform the actual anonymisation""" master_list = [] for i, df in enumerate(df_list): keys = df[key_list[i]] master_list.extend(keys) unique_keys = set(master_list) logger.debug("Unique keys count:%s", len(unique_keys)) if map_dict is None: logger.debug("Creating new mapping dictionary") map_dict_new = {} new_keys = list(unique_keys) dummy_keys = list(range(1, len(unique_keys) + 1)) else: map_dict_new = map_dict.copy() cur_keys = set(map_dict.keys()) new_keys = unique_keys - cur_keys dummy_keys = list( range(len(cur_keys) + 1, len(cur_keys) + len(new_keys) + 1) ) logger.info("New keys count:%s", len(new_keys)) logger.info("Dummy keys count:%s", len(dummy_keys)) random.shuffle(dummy_keys) add_list = list(zip(new_keys, dummy_keys)) add_dict = dict(add_list) map_dict_new.update(add_dict) for i, df in enumerate(df_list): if hash_list: df[key_list[i] + "_anon"] = df[key_list[i]].map( lambda x: hash_list[map_dict_new[x]] ) else: df[key_list[i] + "_anon"] = df[key_list[i]].map( lambda x: map_dict_new[x] ) return map_dict_new translation = {} if create_new_hash_list: hash_list = list(range(1, hash_list_size)) random.shuffle(hash_list) logger.info("Created new hash list") if map_table_or_dict is not None: if isinstance(map_table_or_dict, dict): if ( isinstance(map_table_or_dict.values()[0], list) and len(map_table_or_dict.values()[0]) == 2 ): logger.debug("Stripping previous hash table results") clean_dict = {k: map_table_or_dict[k][0] for k in map_table_or_dict} elif len(map_table_or_dict.values()[0]) == 1: clean_dict = map_table_or_dict.copy() else: raise ValueError("Mapping dictionary format not recognised, aborting") elif isinstance(map_table_or_dict, pd.DataFrame): # TODO, do more input validation, print( "Using ", map_table_or_dict.columns[0], " as key and ", map_table_or_dict.columns[1], " as anon key.", ) clean_dict = dict( zip( map_table_or_dict[map_table_or_dict.columns[0]], map_table_or_dict[map_table_or_dict.columns[1]], ) ) else: raise TypeError("Object not a dictionary or a dataframe") else: # no mapping object provided clean_dict = None translation = _anonymise(df_list, key_list, clean_dict, hash_list) if hash_list: # The translation is done to hash_list[anon_key] instead of just the # random anon_key. This is to further scramble the results esp when we # add new items so we need to create that translation table. # also we can create a string based ID or some other criteria for a hash translation_list = [ [k, translation[k], hash_list[translation[k]]] for k in translation ] translation_df = pd.DataFrame( data=translation_list, columns=["original_key", "anon_key", "hash_key"] ) logger.info( "Finished, returning translation DataFrame and hashtable as separate return values" ) return translation_df, hash_list else: translation_list = [[k, translation[k]] for k in translation] translation_df = pd.DataFrame( data=translation_list, columns=["original_key", "anon_key"] ) logger.info("Finished, returning translation DataFrame") return translation_df