# Source code for pydit.wrangling.date_time_calculations

"""Module with functions for date and time calculations.
IMPORTANT: adapted to England and Wales only, edit the calendar class to regional specs.
"""

# pylint: disable=unexpected-keyword-arg
# pylint: disable=bare-except
# ruff: noqa: E722
import logging
from datetime import datetime, date, timedelta

import pandas as pd
from pandas.tseries.offsets import CDay
from pandas.tseries.holiday import (
    AbstractHolidayCalendar,
    DateOffset,
    EasterMonday,
    GoodFriday,
    Holiday,
    MO,
    next_monday,
    next_monday_or_tuesday,
)

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class EnglandAndWalesHolidayCalendar(AbstractHolidayCalendar):
    """Recurring bank-holiday rules for England and Wales.

    The calendar lists eight yearly holidays. Fixed-date holidays that land
    on a weekend are moved to the following working day through their
    ``observance`` rule; the Monday holidays are located with a weekday
    offset from an anchor date.

    Only the recurring rules below are defined: one-off or rescheduled bank
    holidays are not part of ``rules``.
    """

    rules = [
        # 1 January, pushed to Monday when it falls on a Saturday or Sunday.
        Holiday(name="New Years Day", month=1, day=1, observance=next_monday),
        # Easter-based holidays come ready-made from pandas.
        GoodFriday,
        EasterMonday,
        # First Monday on or after 1 May.
        Holiday(
            name="Early May bank holiday",
            month=5,
            day=1,
            offset=DateOffset(weekday=MO(1)),
        ),
        # Last Monday on or before 31 May.
        Holiday(
            name="Spring bank holiday",
            month=5,
            day=31,
            offset=DateOffset(weekday=MO(-1)),
        ),
        # Last Monday on or before 31 August.
        Holiday(
            name="Summer bank holiday",
            month=8,
            day=31,
            offset=DateOffset(weekday=MO(-1)),
        ),
        # Christmas and Boxing Day are adjacent, so Boxing Day uses the
        # "Monday or Tuesday" rule to avoid colliding with a moved Christmas.
        Holiday(name="Christmas Day", month=12, day=25, observance=next_monday),
        Holiday(
            name="Boxing Day",
            month=12,
            day=26,
            observance=next_monday_or_tuesday,
        ),
    ]
class business_calendar:
    """Calculate business minutes and hours between two datetimes.

    Working days are the custom business days of
    ``EnglandAndWalesHolidayCalendar`` between ``start_date`` and
    ``end_date`` (inclusive). Days outside that range are not part of the
    calendar and therefore contribute no business time.

    Parameters
    ----------
    start_date : date or datetime, optional
        First day covered by the calendar, by default 1st Jan 2010.
    end_date : date or datetime, optional
        Last day covered by the calendar, by default one year from today.
    bus_start_time : int or float, optional
        Hour at which the business day starts, defaults to 9 (9am).
        Fractional hours are allowed (8.5 means 08:30).
    bus_end_time : int or float, optional
        Hour at which the business day ends, defaults to 17 (5pm).

    Raises
    ------
    ValueError
        If ``bus_end_time`` is earlier than ``bus_start_time``.
    TypeError
        If ``start_date`` or ``end_date`` is neither a date nor a datetime.
    """

    def __init__(
        self, start_date=None, end_date=None, bus_start_time=9, bus_end_time=17
    ):
        # An inverted business day would silently yield negative minutes.
        if bus_end_time < bus_start_time:
            raise ValueError("bus_end_time must not be earlier than bus_start_time")

        if start_date is None:
            self.start_date = date(2010, 1, 1)
        else:
            self.start_date = self._as_date(start_date, "start_date")

        if end_date is None:
            self.end_date = self._one_year_after(date.today())
        else:
            self.end_date = self._as_date(end_date, "end_date")

        self._cal = EnglandAndWalesHolidayCalendar()
        # Sorted index of every working day (midnight timestamps) in range.
        self._dayindex = pd.bdate_range(
            start=self.start_date, end=self.end_date, freq=CDay(calendar=self._cal)
        )
        self.bus_start_time = bus_start_time
        self.bus_end_time = bus_end_time

    @staticmethod
    def _as_date(value, arg_name):
        """Return ``value`` as a plain date, rejecting non date-like input."""
        # datetime must be tested first because it is a subclass of date.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        raise TypeError(f"{arg_name} must be a date/datetime object")

    @staticmethod
    def _one_year_after(day):
        """Return the same calendar day one year later (29 Feb maps to 28 Feb)."""
        try:
            return day.replace(year=day.year + 1)
        except ValueError:
            # 29 February has no counterpart in the following year.
            return day.replace(year=day.year + 1, day=28)

    @staticmethod
    def _overlap_mins(period_start, period_end):
        """Return the minutes from ``period_start`` to ``period_end``, or 0 if empty."""
        if period_end <= period_start:
            return 0
        return (period_end - period_start).total_seconds() / 60

    def business_mins(self, datetime_start, datetime_end):
        """Calculate the business minutes between two datetimes.

        Parameters
        ----------
        datetime_start : datetime
            Start of the interval.
        datetime_end : datetime
            End of the interval.

        Returns
        -------
        int or float
            Minutes that fall inside business hours on working days of the
            calendar; 0 when the interval contains no working day or when
            ``datetime_end`` is not after ``datetime_start``.
        """
        # Locate the working days between both calendar dates (inclusive)
        # with a binary search on the sorted day index.
        first_pos = self._dayindex.searchsorted(
            pd.Timestamp(datetime_start.date()), side="left"
        )
        stop_pos = self._dayindex.searchsorted(
            pd.Timestamp(datetime_end.date()), side="right"
        )
        working_days = self._dayindex[first_pos:stop_pos]
        daycount = len(working_days)
        if daycount == 0:
            return 0

        # Index entries sit at midnight, so adding the business hours as a
        # timedelta gives the opening/closing instants of a given day.
        opening = pd.Timedelta(hours=self.bus_start_time)
        closing = pd.Timedelta(hours=self.bus_end_time)

        first_day = working_days[0]
        first_day_mins = self._overlap_mins(
            max(first_day + opening, datetime_start),
            min(first_day + closing, datetime_end),
        )
        if daycount == 1:
            return first_day_mins

        # The last working day always starts at opening time; only its end
        # can be cut short by datetime_end.
        last_day = working_days[-1]
        last_day_mins = self._overlap_mins(
            last_day + opening,
            min(last_day + closing, datetime_end),
        )

        # Every day strictly between the first and the last counts in full.
        mins_in_working_day = (self.bus_end_time - self.bus_start_time) * 60
        middle_days_mins = (daycount - 2) * mins_in_working_day
        return first_day_mins + last_day_mins + middle_days_mins

    def business_hours(self, datetime_start, datetime_end):
        """Calculate the number of business hours between two datetimes.

        Returns
        -------
        int
            ``business_mins`` divided by 60 and rounded to a whole hour with
            Python's built-in ``round``.
        """
        return int(round(self.business_mins(datetime_start, datetime_end) / 60, 0))
def calculate_business_hours(df, start_col, end_col, bus_start_time=9, bus_end_time=17):
    """Add a ``business_hours`` column with the business hours of each row.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data; it is not modified.
    start_col : str
        Name of the column holding the start datetimes.
    end_col : str
        Name of the column holding the end datetimes.
    bus_start_time : int, optional
        Hour at which the business day starts, defaults to 9.
    bus_end_time : int, optional
        Hour at which the business day ends, defaults to 17.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with the extra ``business_hours`` column.
    """
    # Building a business_calendar per row recomputes the holiday rules and
    # the whole working-day index each time; the calendar does not depend on
    # the row, so delegate to the variant that builds it once.
    return calculate_business_hours_fast(
        df,
        start_col,
        end_col,
        bus_start_time=bus_start_time,
        bus_end_time=bus_end_time,
    )
def calculate_business_hours_fast(
    df, start_col, end_col, bus_start_time=9, bus_end_time=17
):
    """Add a ``business_hours`` column using one shared business calendar.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data; it is not modified.
    start_col : str
        Name of the column holding the start datetimes.
    end_col : str
        Name of the column holding the end datetimes.
    bus_start_time : int, optional
        Hour at which the business day starts, defaults to 9.
    bus_end_time : int, optional
        Hour at which the business day ends, defaults to 17.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with the extra integer ``business_hours`` column.
    """
    df = df.copy()
    cal = business_calendar(bus_start_time=bus_start_time, bus_end_time=bus_end_time)
    # Walk the two columns directly instead of DataFrame.apply(axis=1): no
    # per-row Series is built, and an empty frame yields an empty column
    # rather than whatever apply returns for empty input.
    hours = [
        cal.business_hours(start, end)
        for start, end in zip(df[start_col], df[end_col])
    ]
    # Reuse the original index so the assignment never has to realign.
    df["business_hours"] = pd.Series(hours, index=df.index, dtype="int64")
    return df
def first_and_end_of_month(d, return_datetime=True):
    """Return the first and last day of the month containing ``d``.

    Parameters
    ----------
    d : datetime.date or datetime.datetime or str
        Reference date. Strings must look like ``YYYY-MM-DD`` or
        ``YYYY-MM-DD HH:MM:SS``. Any time-of-day on the input is discarded.
    return_datetime : bool, optional, default: True
        When exactly ``False``, plain ``datetime.date`` objects are returned;
        otherwise ``datetime.datetime`` objects.

    Returns
    -------
    tuple
        ``(first, last)``. As datetimes, ``first`` is midnight on day 1 and
        ``last`` is 23:59:59 on the final day of the month.

    Raises
    ------
    ValueError
        If ``d`` is a string in neither accepted format.

    Notes
    -----
    Adjust the end-of-month time afterwards if something else is needed,
    e.g. ``first_and_end_of_month(date(2024, 8, 10))[1] + timedelta(seconds=1)``
    is the first second of the following month.
    """
    if isinstance(d, str):
        parsed = None
        failure = None
        for layout in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S"):
            try:
                parsed = datetime.strptime(d, layout)
            except ValueError as err:
                failure = err
            else:
                break
        if parsed is None:
            raise ValueError(
                "Invalid date format, expecting YYYY-MM-DD or YYYY-MM-DD HH:MM:SS"
            ) from failure
        d = parsed

    # Normalise dates and datetimes alike to midnight of that day.
    if isinstance(d, date):
        d = datetime(d.year, d.month, d.day)

    month_start = d.replace(day=1)
    # Day 1 plus 31 days always lands inside the following month; stepping
    # back one day from that month's first day gives this month's last day.
    in_next_month = month_start + timedelta(days=31)
    last_midnight = in_next_month.replace(day=1) - timedelta(days=1)
    month_end = last_midnight.replace(hour=23, minute=59, second=59)

    if return_datetime is False:
        return (month_start.date(), month_end.date())
    return (month_start, month_end)
[docs] def date_relative_in_words( input_date, reference_datetime: datetime | None = None ) -> str: """Return a human description of how many months ago or in the future a date occurred/occurs. Parameters ---------- input_date : str or datetime The date to compare to the reference date. Can be a string or a datetime object. reference_datetime : datetime, optional The date to compare the input_date to. If None, the current date and time will be used. Returns ------- str A human-readable string describing how long ago or in the future the input_date is relative to the reference_datetime. Possible outputs include "within a week ago", "in 3 days", "2 months ago", "in more than two years", etc. """ if input_date is None or input_date == "": return "" if not isinstance(input_date, (str, datetime)): return "" dt = pd.to_datetime(input_date, errors="coerce") if pd.isna(dt): return "" dt = dt.to_pydatetime() dt = dt.replace(tzinfo=None) reference = reference_datetime or datetime.now() months = (reference.year - dt.year) * 12 + (reference.month - dt.month) days_diff = (reference - dt).days if days_diff >= 0 and days_diff <= 7: return "within a week ago" elif days_diff < 0 and days_diff >= -7: return "within a week from now" elif days_diff > 7 and days_diff <= 30: return f"{days_diff} day{'s' if days_diff > 1 else ''} ago" elif days_diff < -7 and days_diff >= -30: return f"in {-days_diff} day{'s' if days_diff < -1 else ''}" elif months > 0 and months < 24: return f"{months} month{'s' if months > 1 else ''} ago" elif months >= 24: return "more than two years ago" elif months < 0 and months > -24: return f"in {-months} month{'s' if months < -1 else ''}" elif months <= -24: return "in more than two years"
# Script entry point: running this module directly does nothing.
if __name__ == "__main__": pass