Source code for bolster.data_sources.nisra.deaths

"""NISRA Weekly Death Registrations Data Source.

Provides access to weekly death registration statistics for Northern Ireland with breakdowns by:
- Demographics (age, sex)
- Geography (Local Government Districts)
- Place of death (hospital, home, care home, etc.)

Data is based on registration date, not death occurrence date. Most deaths are registered
within 5 days in Northern Ireland.

Data Source:
    **Mother Page**: https://www.nisra.gov.uk/statistics/death-statistics/weekly-death-registrations-northern-ireland

    This page lists all weekly death registration publications in reverse chronological order
    (newest first). The module automatically scrapes this page to find the current year's
    publication (e.g., "Weekly Death Registrations in Northern Ireland, 2025"), then downloads
    the latest weekly Excel file from that publication's detail page.

    The weekly files are cumulative, containing all weeks for the year to date. This ensures
    the module always retrieves the most recent data without hardcoding week numbers or dates.

Update Frequency: Weekly (published Fridays for week ending previous Friday)
Geographic Coverage: Northern Ireland

Example:
    >>> from bolster.data_sources.nisra import deaths
    >>> # Get latest demographics breakdown
    >>> df = deaths.get_latest_deaths(dimension='demographics')
    >>> sorted(df.columns.tolist())
    ['age_range', 'deaths', 'sex', 'week_ending']
    >>> len(df) > 0
    True
"""

import logging
from datetime import datetime
from pathlib import Path
from typing import Literal

import pandas as pd
from openpyxl import load_workbook

from bolster.utils.web import session

from ._base import (
    NISRADataNotFoundError,
    NISRAValidationError,
    download_file,
    safe_float,
    safe_int,
    scrape_download_links,
)

[docs] logger = logging.getLogger(__name__)
# URL patterns
[docs] WEEKLY_DEATHS_LANDING_PAGE = "https://www.nisra.gov.uk/publications/weekly-death-registrations-northern-ireland-{year}"
[docs] WEEKLY_DEATHS_BASE_URL = ( "https://www.nisra.gov.uk/statistics/death-statistics/weekly-death-registrations-northern-ireland" )
[docs] HISTORICAL_DEATHS_URL = "https://www.nisra.gov.uk/publications/historical-final-weekly-deaths-data"
[docs] DimensionType = Literal["demographics", "geography", "place", "totals", "all"]
[docs] def get_latest_weekly_deaths_url() -> str: """Scrape NISRA mother page to find the latest weekly deaths file. Navigates the publication structure: 1. Scrapes mother page for current year publication 2. Follows link to publication detail page 3. Finds latest weekly Excel file 4. Extracts and validates week ending date Returns: URL of the latest weekly deaths Excel file Raises: NISRADataNotFoundError: If no file found """ import re from bs4 import BeautifulSoup # Start at the mother page mother_page = WEEKLY_DEATHS_BASE_URL try: response = session.get(mother_page, timeout=30) response.raise_for_status() soup = BeautifulSoup(response.content, "html.parser") # Find the current year's publication # Looking for "Weekly Death Registrations in Northern Ireland, {year}" current_year = datetime.now().year pub_pattern = f"Weekly Death Registrations in Northern Ireland, {current_year}" pub_link = None for link in soup.find_all("a", href=True): link_text = link.get_text(strip=True) if pub_pattern in link_text: href = link["href"] if href.startswith("/"): href = f"https://www.nisra.gov.uk{href}" pub_link = href logger.info(f"Found {current_year} deaths publication: {link_text}") break if not pub_link: # Fall back to trying the year-specific landing page directly pub_link = WEEKLY_DEATHS_LANDING_PAGE.format(year=current_year) logger.info(f"Mother page scrape failed, trying direct URL: {pub_link}") # Now scrape the publication page for Excel files pub_response = session.get(pub_link, timeout=30) pub_response.raise_for_status() pub_soup = BeautifulSoup(pub_response.content, "html.parser") # Find Excel file links excel_links = [] for link in pub_soup.find_all("a", href=True): href = link["href"] if href.endswith(".xlsx") and "weekly" in href.lower() and "historical" not in href.lower(): if href.startswith("/"): href = f"https://www.nisra.gov.uk{href}" excel_links.append({"url": href, "text": link.get_text(strip=True)}) if not excel_links: raise NISRADataNotFoundError(f"No weekly deaths Excel files found on publication page: {pub_link}") # If multiple files, try to find the most recent by parsing dates from filenames if len(excel_links) > 1: files_with_dates = [] for link in excel_links: filename = link["url"].split("/")[-1] # Try to extract "w e DD Month YYYY" or "week ending DD Month YYYY" match = re.search( r"w[_\s.]*e[_\s.]*(\d{1,2})(?:st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", filename, re.IGNORECASE ) if match: try: day = int(match.group(1)) month_str = match.group(2) year = int(match.group(3)) date_str = f"{day} {month_str} {year}" date = datetime.strptime(date_str, "%d %B %Y") files_with_dates.append({"url": link["url"], "date": date, "text": link["text"]}) except ValueError: continue if files_with_dates: # Sort by date descending (most recent first) files_with_dates.sort(key=lambda x: x["date"], reverse=True) latest = files_with_dates[0] logger.info(f"Found latest weekly deaths file: week ending {latest['date'].date()}") return latest["url"] # Fallback: Return first Excel link latest = excel_links[0] logger.info(f"Found weekly deaths file: {latest['text']}") return latest["url"] except Exception as e: # Final fallback: Try the old method using scrape_download_links logger.warning(f"Mother page scraping failed ({e}), falling back to simple scraping") current_year = datetime.now().year page_url = WEEKLY_DEATHS_LANDING_PAGE.format(year=current_year) try: links = scrape_download_links(page_url, file_extension=".xlsx") except Exception: links = scrape_download_links(WEEKLY_DEATHS_BASE_URL, file_extension=".xlsx") if not links: raise NISRADataNotFoundError("No weekly deaths Excel files found on NISRA website") from e weekly_files = [ link for link in links if "weekly_deaths" in link["url"].lower() and "historical" not in link["url"].lower() ] if not weekly_files: weekly_files = links latest = weekly_files[0] logger.info(f"Found latest weekly deaths file: {latest['text']}") return latest["url"]
[docs] def parse_deaths_totals(file_path: str | Path) -> pd.DataFrame: """Parse weekly totals with COVID-19, flu/pneumonia, and excess deaths from weekly deaths file. Extracts Table 1a and creates a flat table with columns: - week_ending: Friday of the reporting week - week_number: Week number in the year - observed_deaths: Total deaths registered in the week - deaths_same_week_2024: Deaths in corresponding week in 2024 - expected_deaths_5yr: Average deaths over previous 5 years (2020-2024) - expected_deaths_ons: Average deaths using ONS methodology (2019, 2021-2024) - excess_deaths_5yr: Observed minus expected (5yr method) - expected_deaths_current: Expected deaths using current methodology - excess_deaths_current: Observed minus expected (current method) - flu_pneumonia_deaths: Deaths mentioning flu or pneumonia - covid_deaths: Deaths mentioning COVID-19 Args: file_path: Path to the weekly deaths Excel file Returns: DataFrame with weekly totals and disease-specific deaths Raises: NISRAValidationError: If data validation fails """ wb = load_workbook(file_path, data_only=True) if "Table 1a" not in wb.sheetnames: raise NISRADataNotFoundError("Table 1a (totals) not found in file") sheet = wb["Table 1a"] # Find header row (row 4 based on inspection) # Header row contains: "Registration Week", "Week Ending (Friday)", etc. header_row = 4 records = [] # Read data rows (start from row 5) for row in sheet.iter_rows(min_row=header_row + 1, values_only=True): week_number = row[0] week_ending = row[1] # Skip if no week ending date (end of data) if not week_ending or not isinstance(week_ending, datetime): break # Extract the key columns # Column indices based on inspection: # A(0): Week Number # B(1): Week Ending # C(2): Observed Deaths # D(3): Deaths in 2024 # G(6): Expected Deaths (5yr, 2020-2024) # H(7): Expected Deaths (ONS method) # I(8): Excess Deaths (old method) # J(9): Expected Deaths (current method) # M(12): Excess Deaths (current method) # P(15): Flu/Pneumonia Deaths # Q(16): COVID-19 Deaths records.append( { "week_ending": week_ending, "week_number": int(week_number) if isinstance(week_number, int | float) else week_number, "observed_deaths": int(row[2]) if row[2] is not None else None, "deaths_same_week_2024": int(row[3]) if row[3] is not None else None, "expected_deaths_5yr": float(row[6]) if row[6] is not None else None, "expected_deaths_ons": float(row[7]) if row[7] is not None else None, "excess_deaths_5yr": float(row[8]) if row[8] is not None else None, "expected_deaths_current": float(row[9]) if row[9] is not None else None, "excess_deaths_current": float(row[12]) if row[12] is not None else None, "flu_pneumonia_deaths": int(row[15]) if row[15] is not None else None, "covid_deaths": int(row[16]) if row[16] is not None else None, } ) wb.close() df = pd.DataFrame(records) df["week_ending"] = pd.to_datetime(df["week_ending"]) # Validate: observed deaths should be positive if (df["observed_deaths"] <= 0).any(): raise NISRAValidationError("Found zero or negative observed deaths") logger.info(f"Totals parsed successfully ({len(df)} weeks)") logger.info(f"Total COVID-19 deaths in period: {df['covid_deaths'].sum()}") logger.info(f"Total Flu/Pneumonia deaths in period: {df['flu_pneumonia_deaths'].sum()}") return df
[docs] def parse_deaths_demographics(file_path: str | Path) -> pd.DataFrame: """Parse demographics dimension (age, sex) from weekly deaths file. Extracts Table 2 and creates a flat table with columns: - week_ending: Friday of the reporting week - sex: Total, Male, or Female - age_range: All, 0-14, 15-44, 45-64, 65-74, 75-84, 85+ - deaths: Count of deaths Args: file_path: Path to the weekly deaths Excel file Returns: DataFrame with demographics breakdown Raises: NISRAValidationError: If data validation fails """ wb = load_workbook(file_path, data_only=True) if "Table 2" not in wb.sheetnames: raise NISRADataNotFoundError("Table 2 (demographics) not found in file") sheet = wb["Table 2"] # Find the header row (contains "Sex", "Age", and week columns) header_row = None for row_idx, row in enumerate(sheet.iter_rows(min_row=1, max_row=10, values_only=True), 1): if row[0] == "Sex" and row[1] == "Age": header_row = row_idx break if header_row is None: raise NISRADataNotFoundError("Could not find header row in Table 2") # Extract headers headers = list(sheet[header_row]) # Find week columns (start from column index 2, format: "Week N\n DD Month YYYY") # Skip "to Date" cumulative column week_cols = [] for col_idx, cell in enumerate(headers[2:], start=2): if cell.value and "Week" in str(cell.value) and "to Date" not in str(cell.value): week_cols.append(col_idx) # Build flat table records = [] current_sex = None # Track sex across rows for merged cells # Read data rows for row in sheet.iter_rows(min_row=header_row + 1, values_only=True): sex = row[0] age_range = row[1] # Skip if no sex or age (footer rows, etc.) if not sex and not age_range: continue # Skip footer notes if sex and "Note" in str(sex): break # If sex is None but age_range exists, inherit sex from previous row if sex is None: sex = current_sex else: current_sex = sex # Parse each week column for col_idx in week_cols: deaths = row[col_idx] if deaths is None or deaths == "": continue # Parse week ending date from header week_header = headers[col_idx].value week_ending = _parse_week_ending(week_header) records.append( { "week_ending": week_ending, "sex": str(sex).strip(), "age_range": str(age_range).strip(), "deaths": int(deaths), } ) wb.close() df = pd.DataFrame(records) df["week_ending"] = pd.to_datetime(df["week_ending"]) # Validate _validate_demographics(df) return df
[docs] def parse_deaths_geography(file_path: str | Path) -> pd.DataFrame: """Parse geography dimension (Local Government Districts) from weekly deaths file. Extracts Table 3 and creates a flat table with columns: - week_ending: Friday of the reporting week - lgd: Local Government District name - deaths: Count of deaths Args: file_path: Path to the weekly deaths Excel file Returns: DataFrame with geography breakdown Raises: NISRAValidationError: If data validation fails """ wb = load_workbook(file_path, data_only=True) if "Table 3" not in wb.sheetnames: raise NISRADataNotFoundError("Table 3 (geography) not found in file") sheet = wb["Table 3"] # Find header row header_row = None for row_idx, row in enumerate(sheet.iter_rows(min_row=1, max_row=10, values_only=True), 1): if "Week Ending" in str(row[1] or ""): header_row = row_idx break if header_row is None: raise NISRADataNotFoundError("Could not find header row in Table 3") # Extract LGD names from header (columns 2 onwards) # Exclude 'Total' column to avoid double-counting headers = [cell.value for cell in sheet[header_row]] lgd_columns = { idx: headers[idx] for idx in range(2, len(headers)) if headers[idx] and "Total" not in str(headers[idx]) } records = [] # Read data rows for row in sheet.iter_rows(min_row=header_row + 1, values_only=True): _week_num = row[0] # Week number column exists but not used week_ending = row[1] # Skip if no week ending date if not week_ending or not isinstance(week_ending, datetime): continue # Parse each LGD column for col_idx, lgd_name in lgd_columns.items(): deaths = row[col_idx] if deaths is not None and deaths != "": records.append({"week_ending": week_ending, "lgd": str(lgd_name).strip(), "deaths": int(deaths)}) wb.close() df = pd.DataFrame(records) df["week_ending"] = pd.to_datetime(df["week_ending"]) # Validate _validate_geography(df) return df
[docs] def parse_deaths_place(file_path: str | Path) -> pd.DataFrame: """Parse place of death dimension from weekly deaths file. Extracts Table 4 and creates a flat table with columns: - week_ending: Friday of the reporting week - place_of_death: Hospital, Care/Nursing Home, Hospice, Home, Other - deaths: Count of deaths Args: file_path: Path to the weekly deaths Excel file Returns: DataFrame with place of death breakdown Raises: NISRAValidationError: If data validation fails """ wb = load_workbook(file_path, data_only=True) if "Table 4" not in wb.sheetnames: raise NISRADataNotFoundError("Table 4 (place) not found in file") sheet = wb["Table 4"] # Find header row header_row = None for row_idx, row in enumerate(sheet.iter_rows(min_row=1, max_row=10, values_only=True), 1): if "Week Ending" in str(row[1] or ""): header_row = row_idx break if header_row is None: raise NISRADataNotFoundError("Could not find header row in Table 4") # Extract place names from header headers = [cell.value for cell in sheet[header_row]] # Clean place names (remove newlines, etc.) place_columns = {} for idx in range(2, len(headers)): if headers[idx] and headers[idx] != "Total": place_name = str(headers[idx]).replace("\n", " ").strip() place_columns[idx] = place_name records = [] # Read data rows for row in sheet.iter_rows(min_row=header_row + 1, values_only=True): week_ending = row[1] # Skip if no week ending date if not week_ending or not isinstance(week_ending, datetime): continue # Parse each place column for col_idx, place_name in place_columns.items(): deaths = row[col_idx] if deaths is not None and deaths != "": records.append({"week_ending": week_ending, "place_of_death": place_name, "deaths": int(deaths)}) wb.close() df = pd.DataFrame(records) df["week_ending"] = pd.to_datetime(df["week_ending"]) # Validate _validate_place(df) return df
[docs] def parse_deaths_file( file_path: str | Path, dimension: DimensionType = "all" ) -> pd.DataFrame | dict[str, pd.DataFrame]: """Parse weekly deaths file for one or all dimensions. Args: file_path: Path to the weekly deaths Excel file dimension: Which dimension(s) to parse: - 'totals': Weekly totals with COVID, flu/pneumonia, excess deaths - 'demographics': Age and sex breakdown - 'geography': Local Government Districts - 'place': Place of death - 'all': All dimensions (returns dict) Returns: DataFrame for single dimension, or dict of DataFrames for 'all' Example: >>> url = get_latest_weekly_deaths_url() >>> path = download_file(url, cache_ttl_hours=24*7) >>> data = parse_deaths_file(path, dimension='all') >>> sorted(data.keys()) ['demographics', 'geography', 'place', 'totals'] """ if dimension == "all": return { "totals": parse_deaths_totals(file_path), "demographics": parse_deaths_demographics(file_path), "geography": parse_deaths_geography(file_path), "place": parse_deaths_place(file_path), } if dimension == "totals": return parse_deaths_totals(file_path) if dimension == "demographics": return parse_deaths_demographics(file_path) if dimension == "geography": return parse_deaths_geography(file_path) if dimension == "place": return parse_deaths_place(file_path) raise ValueError(f"Invalid dimension: {dimension}. Must be one of: totals, demographics, geography, place, all")
[docs] def get_latest_deaths( dimension: DimensionType = "all", force_refresh: bool = False ) -> pd.DataFrame | dict[str, pd.DataFrame]: """Get the latest weekly deaths data. Args: dimension: Which dimension(s) to retrieve force_refresh: Force re-download even if cached Returns: DataFrame or dict of DataFrames depending on dimension Example: >>> df = get_latest_deaths(dimension='demographics') >>> sorted(df.columns.tolist()) ['age_range', 'deaths', 'sex', 'week_ending'] >>> len(df) > 0 True """ url = get_latest_weekly_deaths_url() file_path = download_file(url, cache_ttl_hours=7 * 24, force_refresh=force_refresh) return parse_deaths_file(file_path, dimension=dimension)
[docs] def get_historical_deaths( years: list[int] | None = None, force_refresh: bool = False, include_age_breakdowns: bool = False ) -> pd.DataFrame | dict[str, pd.DataFrame]: """Get historical weekly deaths data (2011-2024). Downloads and parses the NISRA historical weekly deaths file which contains final (not provisional) data for multiple years. Includes total deaths, respiratory deaths, COVID-19 deaths, flu/pneumonia deaths, and age breakdowns. Args: years: List of years to include (default: all available years) force_refresh: Force re-download even if cached include_age_breakdowns: If True, returns dict with 'totals' and 'age_breakdowns' DataFrames If False (default), returns only totals DataFrame Returns: If include_age_breakdowns=False: DataFrame with columns: - year: Year - week_number: Week number in year - week_ending: Friday of the reporting week - total_deaths: Total deaths registered in week - expected_deaths_5yr: Average deaths over previous 5 years - excess_deaths: Observed minus expected deaths - respiratory_deaths_involving: Deaths involving respiratory diseases - flu_pneumonia_deaths_involving: Deaths involving flu/pneumonia - covid_deaths_involving: Deaths involving COVID-19 - covid_deaths_due_to: Deaths due to COVID-19 (underlying cause) If include_age_breakdowns=True: Dict with keys: - 'totals': DataFrame as above - 'age_breakdowns': DataFrame in long format with columns: - year: Year - week_ending: Friday of the reporting week - age_range: Age range label (e.g., '0-7 days', '15-44', '85+') - deaths: Death count for this age range Example: >>> # Get totals only >>> df = get_historical_deaths() >>> 'total_deaths' in df.columns True >>> # Get totals with age breakdowns >>> data = get_historical_deaths(years=[2020, 2021], include_age_breakdowns=True) >>> sorted(data.keys()) ['age_breakdowns', 'totals'] >>> totals = data['totals'] >>> age_data = data['age_breakdowns'] >>> # Analyze age distribution >>> age_summary = age_data.groupby('age_range')['deaths'].sum() >>> len(age_summary) > 0 True """ # Download the historical file links = scrape_download_links(HISTORICAL_DEATHS_URL, file_extension=".xlsx") if not links: raise NISRADataNotFoundError("No historical deaths Excel file found") url = links[0]["url"] file_path = download_file(url, cache_ttl_hours=30 * 24, force_refresh=force_refresh) # Cache for 30 days wb = load_workbook(file_path, data_only=True) # Determine which years to parse available_years = [] for sheet_name in wb.sheetnames: if sheet_name.startswith("Weekly Deaths_"): year = int(sheet_name.split("_")[1]) available_years.append(year) if years is None: years = sorted(available_years) else: # Validate requested years invalid_years = set(years) - set(available_years) if invalid_years: raise ValueError(f"Years not available: {invalid_years}. Available: {available_years}") logger.info(f"Parsing historical deaths for years: {years}") all_records = [] age_breakdown_records = [] # Define age column mapping (consistent across years, but this makes it flexible) # These are standard column indices for the historical file format AGE_COLUMNS = { "0-7 days": 20, "7 days-1 year": 21, "1-14": 22, "15-44": 23, "45-64": 24, "65-74": 25, "75-84": 26, "85+": 27, } for year in years: sheet = wb[f"Weekly Deaths_{year}"] # Header is in row 4, data starts at row 5 # Using standardized column indices for the historical format # (these are stable across years in the historical file) for row in sheet.iter_rows(min_row=5, values_only=True): week_number = row[0] week_ending = row[2] # Week Ends (Friday) if not week_ending or not isinstance(week_ending, datetime): break # Parse main totals record all_records.append( { "year": year, "week_number": week_number, "week_ending": week_ending, "total_deaths": safe_int(row[3]), "expected_deaths_5yr": safe_float(row[4]), "expected_deaths_current": safe_float(row[8]), "excess_deaths": safe_float(row[11]), "respiratory_deaths_involving": safe_int(row[14]), "flu_pneumonia_deaths_involving": safe_int(row[16]), "covid_deaths_involving": safe_int(row[18]) or 0, "covid_deaths_due_to": safe_int(row[19]) or 0, } ) # Parse age breakdowns if requested if include_age_breakdowns: for age_range, col_idx in AGE_COLUMNS.items(): deaths = safe_int(row[col_idx]) if deaths is not None: # Only include if we have data age_breakdown_records.append( {"year": year, "week_ending": week_ending, "age_range": age_range, "deaths": deaths} ) wb.close() # Create totals DataFrame df_totals = pd.DataFrame(all_records) df_totals["week_ending"] = pd.to_datetime(df_totals["week_ending"]) logger.info( f"Historical deaths parsed: {len(df_totals)} weeks from {df_totals['year'].min()} to {df_totals['year'].max()}" ) logger.info(f"Total COVID deaths (involving): {df_totals['covid_deaths_involving'].sum()}") if include_age_breakdowns: df_age = pd.DataFrame(age_breakdown_records) df_age["week_ending"] = pd.to_datetime(df_age["week_ending"]) logger.info(f"Age breakdowns parsed: {len(df_age)} records across {df_age['age_range'].nunique()} age ranges") return {"totals": df_totals, "age_breakdowns": df_age} return df_totals
[docs] def get_combined_deaths( years: list[int] | None = None, include_current_year: bool = True, force_refresh: bool = False ) -> pd.DataFrame: """Get combined historical and current year deaths data. Combines the historical deaths file (2011-2024) with the current year's provisional data to give a complete time series including the most recent weeks. Args: years: List of years to include from historical data (default: last 5 years + current) include_current_year: Include current year provisional data (default: True) force_refresh: Force re-download of both historical and current files Returns: DataFrame with columns: - year: Year - week_number: Week number in year - week_ending: Friday of the reporting week - total_deaths: Total deaths registered in week - covid_deaths: COVID-19 deaths (from current year) or covid_deaths_involving (historical) - flu_pneumonia_deaths: Flu/pneumonia deaths - excess_deaths: Excess deaths (observed - expected) - data_source: 'historical' or 'current' Example: >>> df = get_combined_deaths(years=[2022, 2023, 2024]) >>> 'total_deaths' in df.columns True >>> 'data_source' in df.columns True """ current_year = datetime.now().year # Default to last 5 complete years if not specified if years is None: years = list(range(current_year - 5, current_year)) # Get historical data for specified years df_historical = get_historical_deaths(years=years, force_refresh=force_refresh) # Standardize column names for combining df_historical = df_historical.rename( columns={"covid_deaths_involving": "covid_deaths", "flu_pneumonia_deaths_involving": "flu_pneumonia_deaths"} ) df_historical["data_source"] = "historical" # Add current year if requested if include_current_year: try: # Get current year totals (includes COVID, flu/pneumonia, excess deaths) df_current = get_latest_deaths(dimension="totals", force_refresh=force_refresh) # Add year and week_of_year df_current["year"] = df_current["week_ending"].dt.year df_current["data_source"] = "current" # Standardize columns to match historical df_current = df_current.rename( columns={"observed_deaths": "total_deaths", "excess_deaths_current": "excess_deaths"} ) # Select matching columns common_cols = [ "year", "week_number", "week_ending", "total_deaths", "covid_deaths", "flu_pneumonia_deaths", "excess_deaths", "data_source", ] # Combine df_combined = pd.concat([df_historical[common_cols], df_current[common_cols]], ignore_index=True) logger.info(f"Combined data: {len(df_historical)} historical weeks + {len(df_current)} current year weeks") except Exception as e: logger.warning(f"Could not get current year data: {e}. Returning historical only.") df_combined = df_historical else: df_combined = df_historical # Sort by date return df_combined.sort_values("week_ending").reset_index(drop=True)
# Helper functions def _parse_week_ending(week_header: str) -> datetime: r"""Parse week ending date from Excel header. Example headers: - "Week 1\n 3 January 2025" - "Week 2\n 10 January 2025" """ import re from dateutil import parser # Extract date part (after newline) date_str = week_header.split("\n")[1].strip() if "\n" in week_header else week_header # Try to parse with dateutil try: return parser.parse(date_str) except (ValueError, TypeError): # Fallback: extract with regex match = re.search(r"(\d{1,2})\s+(\w+)\s+(\d{4})", date_str) if match: day, month, year = match.groups() date_str = f"{day} {month} {year}" return parser.parse(date_str) raise ValueError(f"Could not parse date from: {week_header}") def _validate_demographics(df: pd.DataFrame): """Validate demographics data against expected totals.""" # For each week, Male + Female should equal Total for week in df["week_ending"].unique(): week_data = df[(df["week_ending"] == week) & (df["age_range"] == "All")] total_deaths = week_data[week_data["sex"].str.contains("Total", na=False)]["deaths"].sum() male_deaths = week_data[ week_data["sex"].str.contains("Male", na=False) & ~week_data["sex"].str.contains("Female", na=False) ]["deaths"].sum() female_deaths = week_data[week_data["sex"].str.contains("Female", na=False)]["deaths"].sum() if total_deaths != (male_deaths + female_deaths): raise NISRAValidationError( f"Week {week}: Demographics validation failed. " f"Total={total_deaths}, Male+Female={male_deaths + female_deaths}" ) logger.info("Demographics validation passed") def _validate_geography(df: pd.DataFrame): """Validate geography data.""" # Basic validation: ensure we have expected LGDs lgds = df["lgd"].unique() expected_min_lgds = 10 # At least 10 LGDs expected if len(lgds) < expected_min_lgds: logger.warning(f"Only found {len(lgds)} LGDs, expected at least {expected_min_lgds}") logger.info(f"Geography validation passed ({len(lgds)} LGDs found)") def _validate_place(df: pd.DataFrame): """Validate place of death data.""" # Basic validation: ensure we have expected places places = df["place_of_death"].unique() expected_places = {"Hospital", "Home"} # At minimum missing = expected_places - set(places) if missing: logger.warning(f"Missing expected places: {missing}") logger.info(f"Place validation passed ({len(places)} places found)")
[docs] def validate_deaths_data(df: pd.DataFrame) -> bool: # pragma: no cover """Validate NISRA deaths data integrity. Args: df: DataFrame from get_latest_deaths or parse_deaths_file Returns: True if validation passes, False otherwise """ if df.empty: logger.warning("Deaths data is empty") return False required_cols = {"month", "deaths_total", "place"} if not required_cols.issubset(df.columns): missing = required_cols - set(df.columns) logger.warning(f"Missing required death data columns: {missing}") return False # Check for non-negative death counts if (df["deaths_total"] < 0).any(): logger.warning("Found negative death counts") return False # Check for reasonable monthly ranges (Northern Ireland context) monthly_max = df["deaths_total"].max() if monthly_max > 2000: # Unreasonably high for NI logger.warning(f"Unreasonably high monthly deaths: {monthly_max}") return False return True