# Source code for bolster.data_sources.daera_waste

"""DAERA NI Local Authority Collected (LAC) Municipal Waste Statistics.

Quarterly time-series data on local-authority-collected municipal waste
management across Northern Ireland, published by the Department of
Agriculture, Environment and Rural Affairs (DAERA).

Data Source:
    **Discovery page**:
    https://www.daera-ni.gov.uk/publications/northern-ireland-local-authority-collected-municipal-waste-management-statistics-time-series-data

    The module scrapes the DAERA publications page to auto-discover the
    current CSV URL (which changes with each release, e.g. ``2026-04/...``).
    It then downloads the time-series CSV and returns a DataFrame with one
    row per (quarter, council area) and one column per waste metric.

Update Frequency:
    Quarterly (provisional) with finalised annual revisions.  The current
    series runs from Q1 2006/07 to the most recent available quarter.

Geographic Coverage:
    All NI council areas including both pre- and post-2015 boundaries, plus
    a Northern Ireland aggregate row.  The 11 post-2015 LGD councils are:
    Antrim & Newtownabbey; Ards & North Down; Armagh City, Banbridge &
    Craigavon; Belfast; Causeway Coast & Glens; Derry City & Strabane;
    Fermanagh & Omagh; Lisburn & Castlereagh; Mid & East Antrim; Mid Ulster;
    Newry, Mourne & Down.

Example:
    >>> from bolster.data_sources import daera_waste
    >>> df = daera_waste.get_latest_waste_statistics()
    >>> 'council_area' in df.columns
    True
    >>> 'lac_waste_arisings_tonnes' in df.columns
    True

"""

from __future__ import annotations

import logging
from pathlib import Path
from urllib.parse import urljoin, urlsplit

import pandas as pd
from bs4 import BeautifulSoup

from bolster.utils.cache import CachedDownloader, DownloadError
from bolster.utils.web import session

logger = logging.getLogger(__name__)

# ── Public page to scrape for the current file URL ──────────────────────────
# Scheme and host of the DAERA website.
DAERA_BASE_URL = "https://www.daera-ni.gov.uk"

# Publication landing page; its links point at the current time-series files.
DAERA_PUBLICATION_PAGE = (
    f"{DAERA_BASE_URL}/publications/"
    "northern-ireland-local-authority-collected-municipal-waste-"
    "management-statistics-time-series-data"
)
# ── Cached downloader (namespace = "daera") ──────────────────────────────────
_downloader = CachedDownloader("daera", timeout=60)

# ── Canonical column renames from wide CSV to internal names ─────────────────
# Each entry maps a substring of the raw CSV column header to a clean name.
# Order matters: more-specific patterns must come before broader ones, because
# the renaming helper stops at the first pattern contained in a header.
_WASTE_COLUMN_MAP: dict[str, str] = {
    # LAC arisings
    "local authority collected municipal waste arisings": "lac_waste_arisings_tonnes",
    # LAC recycling / composting
    "local authority collected municipal waste preparing for reuse, dry recycling and composting (tonnes)": "lac_reuse_recycling_composting_tonnes",
    "local authority collected municipal waste dry recycling and composting (tonnes)": "lac_dry_recycling_composting_tonnes",
    "local authority collected municipal waste preparing for reuse (tonnes)": "lac_preparing_for_reuse_tonnes",
    "local authority collected municipal waste dry recycling (tonnes)": "lac_dry_recycling_tonnes",
    "local authority collected municipal waste composting (tonnes)": "lac_composting_tonnes",
    # LAC rates
    "local authority collected municipal waste preparing for reuse, dry recycling and composting rate": "lac_reuse_recycling_composting_rate_pct",
    "local authority collected municipal waste dry recycling and composting rate": "lac_dry_recycling_composting_rate_pct",
    # LAC energy recovery
    "local authority collected municipal waste energy recovery for specific streams": "lac_energy_recovery_specific_streams_tonnes",
    "local authority collected municipal waste energy recovery for mixed residual": "lac_energy_recovery_mixed_residual_tonnes",
    "local authority collected municipal waste energy recovery rate": "lac_energy_recovery_rate_pct",
    # LAC landfill
    "local authority collected municipal waste landfilled (tonnes)": "lac_landfilled_tonnes",
    "local authority collected municipal waste landfill rate": "lac_landfill_rate_pct",
    # NILAS
    "biodegradable local authority collected municipal waste to landfill": "lac_biodegradable_to_landfill_tonnes",
    "nilas financial year allocation before transfers": "nilas_allocation_before_transfers_tonnes",
    "nilas financial year allocation after transfers": "nilas_allocation_after_transfers_tonnes",
    # Household arisings
    "household waste arisings (tonnes)": "hh_waste_arisings_tonnes",
    # Household recycling / composting
    "household waste preparing for reuse, dry recycling and composting (tonnes)": "hh_reuse_recycling_composting_tonnes",
    "household waste dry recycling and composting (tonnes)": "hh_dry_recycling_composting_tonnes",
    "household waste preparing for reuse (tonnes)": "hh_preparing_for_reuse_tonnes",
    "household waste dry recycling (tonnes)": "hh_dry_recycling_tonnes",
    "household waste composting (tonnes)": "hh_composting_tonnes",
    # Household rates
    "household waste preparing for reuse, dry recycling and composting rate": "hh_reuse_recycling_composting_rate_pct",
    "household waste dry recycling and composting rate": "hh_dry_recycling_composting_rate_pct",
    # Household landfill
    "household waste landfilled (tonnes)": "hh_landfilled_tonnes",
    "household waste landfill rate": "hh_landfill_rate_pct",
    # Household per-household / per-capita
    "number of households": "num_households",
    "household waste arisings per household": "hh_waste_per_household_kg",
    # NOTE(review): "population" is a short, generic pattern; any header that
    # contains it will stop here and never reach the patterns listed below.
    "population": "population",
    "household waste arisings per capita": "hh_waste_per_capita_kg",
    # Waste from households (statutory definition)
    "waste from households recycling rate": "wfh_recycling_rate_pct",
    "waste from households recycling": "wfh_recycling_tonnes",
    "waste from households arisings": "wfh_arisings_tonnes",
}

# ── Required columns for validation ─────────────────────────────────────────
_REQUIRED_COLUMNS = {
    "financial_year",
    "quarter_code",
    "quarter_name",
    "area_code",
    "council_area",
    "waste_management_group",
    "data_status",
    "lac_waste_arisings_tonnes",
    "hh_waste_arisings_tonnes",
}

# ── 11 post-2015 LGD councils expected in the data ───────────────────────────
# Names are compared verbatim against the ``council_area`` column during
# validation, so spelling and punctuation (commas, ampersands) matter.
NI_COUNCILS_POST_2015 = {
    "Antrim & Newtownabbey",
    "Ards & North Down",
    "Armagh City, Banbridge & Craigavon",
    "Belfast",
    "Causeway Coast & Glens",
    "Derry City & Strabane",
    "Fermanagh & Omagh",
    "Lisburn & Castlereagh",
    "Mid & East Antrim",
    "Mid Ulster",
    "Newry, Mourne & Down",
}
# ── Custom exceptions ────────────────────────────────────────────────────────
class DAERADataNotFoundError(Exception):
    """Raised when the DAERA publication page or its data file cannot be located or fetched."""
class DAERAValidationError(Exception):
    """Raised when a DAERA waste file cannot be parsed or a DataFrame fails validation."""
# ── URL discovery ────────────────────────────────────────────────────────────
def get_waste_publication_url(prefer: str = "csv") -> str:
    """Scrape the DAERA publications page for the latest LAC waste CSV/Excel URL.

    The URL contains a date component (e.g. ``2026-04/``) that changes with
    each release, so this function fetches the page and finds the current link.

    Only anchors whose ``href`` contains ``lac-municipal-waste`` and whose URL
    *path* ends in ``.csv`` or ``.xlsx`` are considered.  If several anchors
    share an extension, the last one in document order is used.

    Args:
        prefer: Preferred file type — ``"csv"`` (default) or ``"xlsx"``.  Any
            value other than ``"csv"`` is treated as a preference for
            ``.xlsx``.  The other type is used as a fallback when the
            preferred one is not linked on the page.

    Returns:
        Absolute URL of the latest time-series file.

    Raises:
        DAERADataNotFoundError: If the publication page cannot be fetched or
            no matching link is found.

    Example:
        >>> url = get_waste_publication_url()
        >>> "daera-ni.gov.uk" in url
        True
    """
    try:
        response = session.get(DAERA_PUBLICATION_PAGE, timeout=30)
        response.raise_for_status()
    except Exception as exc:  # pragma: no cover - network errors
        raise DAERADataNotFoundError(f"Failed to fetch DAERA publication page: {exc}") from exc

    soup = BeautifulSoup(response.content, "html.parser")

    ext_order = (".csv", ".xlsx") if prefer == "csv" else (".xlsx", ".csv")
    candidates: dict[str, str] = {}

    for anchor in soup.find_all("a", href=True):
        href: str = anchor["href"]
        if "lac-municipal-waste" not in href.lower():
            continue

        try:
            # Test the extension on the path only, so a query string or
            # fragment (``file.csv?v=2``) does not hide it.
            path_lower = urlsplit(href).path.lower()
            # Resolve against the page URL: handles root-relative,
            # page-relative and protocol-relative hrefs; absolute URLs pass
            # through unchanged.
            absolute_url = urljoin(DAERA_PUBLICATION_PAGE, href)
        except ValueError:
            # Malformed href (e.g. unbalanced IPv6 brackets): skip this link.
            continue

        for ext in ext_order:
            if path_lower.endswith(ext):
                candidates[ext] = absolute_url
                break

    for ext in ext_order:
        if ext in candidates:
            logger.info("Discovered DAERA waste URL: %s", candidates[ext])
            return candidates[ext]

    raise DAERADataNotFoundError(f"No LAC municipal waste CSV or Excel link found on {DAERA_PUBLICATION_PAGE}")
# ── Parsing ──────────────────────────────────────────────────────────────────


def _rename_waste_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Replace verbose CSV column headers with the clean internal names.

    Each header is lower-cased and tested against the patterns of
    ``_WASTE_COLUMN_MAP`` in the map's insertion order; the *first* pattern
    found as a substring of the header decides the new name.  The map is
    therefore expected to list more-specific patterns before broader ones.

    Args:
        df: DataFrame with raw column names straight from the CSV.

    Returns:
        DataFrame with renamed columns (headers matching no pattern are left
        untouched).
    """

    def _first_match(header: str) -> str | None:
        # Insertion order of the map is the matching priority.
        lowered = header.lower()
        hits = (clean for pattern, clean in _WASTE_COLUMN_MAP.items() if pattern in lowered)
        return next(hits, None)

    renames: dict[str, str] = {}
    for header in df.columns:
        target = _first_match(header)
        if target is not None:
            renames[header] = target

    return df.rename(columns=renames)
def parse_waste_file(file_path: str | Path) -> pd.DataFrame:
    """Parse a DAERA LAC municipal waste time-series CSV file.

    Reads the CSV (which uses commas as thousands separators in numeric
    columns), renames columns to clean internal names, and returns a wide
    DataFrame sorted by financial year, quarter and council area.

    Metadata columns (``QuarterCode``, ``QuarterName``, ``FinancialYear``,
    ``AreaCode``, ``AreaName``, ``WasteManagementGroup``, ``DataStatus``) are
    renamed to snake_case and retained alongside all waste metric columns.
    Two helper columns are added: ``financial_year_start`` (the four-digit
    year before the ``/`` in ``financial_year``) and ``quarter_number``
    (``Q1``..``Q4`` mapped to 1..4; other codes become missing).

    Args:
        file_path: Path to a downloaded ``.csv`` waste time-series file.

    Returns:
        DataFrame with one row per (quarter, council area) and columns
        including ``financial_year``, ``quarter_code``, ``quarter_name``,
        ``area_code``, ``council_area``, ``waste_management_group``,
        ``data_status``, plus the waste metrics.

    Raises:
        DAERAValidationError: If the file cannot be read, contains no rows,
            or lacks the ``FinancialYear``, ``QuarterCode`` or ``AreaName``
            columns needed to order the series.

    Example:
        >>> df = parse_waste_file("lac-municipal-waste-timeseries.csv")  # doctest: +SKIP
        >>> "financial_year_start" in df.columns  # doctest: +SKIP
        True
    """
    file_path = Path(file_path)

    # The published CSV contains a `\x80` byte in the header row (part of a
    # KPI footnote marker), so it is not valid UTF-8.  latin-1 maps every
    # byte value to a code point and therefore never raises a
    # UnicodeDecodeError.  Bytes in 0x80-0x9F decode to C1 control characters
    # rather than their Windows-1252 glyphs, which is harmless here because
    # headers are matched by substring.
    try:
        raw = pd.read_csv(
            file_path,
            thousands=",",
            na_values=["-", ""],
            encoding="latin-1",
        )
    except Exception as exc:
        raise DAERAValidationError(f"Failed to read waste CSV {file_path}: {exc}") from exc

    if raw.empty:
        raise DAERAValidationError(f"Waste CSV {file_path} is empty")

    # Rename raw metric columns to internal names
    df = _rename_waste_columns(raw)

    # Rename the metadata columns too
    meta_renames = {
        "QuarterCode": "quarter_code",
        "QuarterName": "quarter_name",
        "FinancialYear": "financial_year",
        "AreaCode": "area_code",
        "AreaName": "council_area",
        "WasteManagementGroup": "waste_management_group",
        "DataStatus": "data_status",
    }
    df = df.rename(columns=meta_renames)

    # The derived columns and the sort below index these three directly.
    # Fail with the documented exception instead of a bare KeyError.
    missing = sorted({"financial_year", "quarter_code", "council_area"} - set(df.columns))
    if missing:
        raise DAERAValidationError(f"Waste CSV {file_path} is missing expected columns: {missing}")

    # Parse financial year start year (e.g. "2006/07" -> 2006)
    df["financial_year_start"] = df["financial_year"].str.extract(r"^(\d{4})/", expand=False).astype("Int64")

    # Quarter ordinal (Q1=1 … Q4=4) for sorting
    _quarter_order = {"Q1": 1, "Q2": 2, "Q3": 3, "Q4": 4}
    df["quarter_number"] = df["quarter_code"].map(_quarter_order)

    df = df.sort_values(["financial_year_start", "quarter_number", "council_area"]).reset_index(drop=True)

    # df is non-empty here (checked above), so first/last rows always exist.
    logger.info(
        "Parsed DAERA waste CSV: %d rows, %d columns, FY %s to %s",
        len(df),
        len(df.columns),
        df["financial_year"].iloc[0],
        df["financial_year"].iloc[-1],
    )

    return df
# ── Download + cache ─────────────────────────────────────────────────────────
def get_latest_waste_statistics(force_refresh: bool = False) -> pd.DataFrame:
    """Fetch the current DAERA LAC municipal waste CSV and return it parsed.

    The CSV location is discovered with :func:`get_waste_publication_url`
    (the path is date-stamped and changes with each release).  The file is
    then obtained through the module's cached downloader with a cache
    lifetime of 720 hours (30 days) and handed to :func:`parse_waste_file`.

    Args:
        force_refresh: If ``True``, bypass the local cache and re-download.

    Returns:
        DataFrame from :func:`parse_waste_file`.

    Raises:
        DAERADataNotFoundError: If the publication page or file cannot be
            fetched.
        DAERAValidationError: If the downloaded file cannot be parsed.

    Example:
        >>> df = get_latest_waste_statistics()
        >>> 'council_area' in df.columns
        True
        >>> (df['lac_waste_arisings_tonnes'].dropna() >= 0).all()
        True
    """
    source_url = get_waste_publication_url(prefer="csv")
    logger.info("Downloading DAERA waste statistics from %s", source_url)

    thirty_days_in_hours = 24 * 30
    try:
        cached_file = _downloader.download(
            source_url,
            cache_ttl_hours=thirty_days_in_hours,
            force_refresh=force_refresh,
        )
    except DownloadError as exc:
        # Surface download failures under the module's own exception type.
        raise DAERADataNotFoundError(str(exc)) from exc
    else:
        return parse_waste_file(cached_file)
# ── Validation ───────────────────────────────────────────────────────────────
def validate_waste_data(df: pd.DataFrame) -> bool:
    """Run sanity checks on a DAERA LAC municipal waste DataFrame.

    Checks are applied in this order and the first failure raises:

    1. the frame is not ``None`` and not empty;
    2. every name in ``_REQUIRED_COLUMNS`` is present;
    3. neither headline arisings column contains a negative value
       (missing values are ignored);
    4. every council in ``NI_COUNCILS_POST_2015`` appears in ``council_area``;
    5. if ``financial_year_start`` is present, it holds at least five
       distinct non-missing years.

    Args:
        df: DataFrame from :func:`get_latest_waste_statistics` or
            :func:`parse_waste_file`.

    Returns:
        ``True`` if all checks pass.

    Raises:
        DAERAValidationError: If any of the checks above fails.

    Example:
        >>> df = get_latest_waste_statistics()
        >>> validate_waste_data(df)
        True
    """
    if df is None or df.empty:
        raise DAERAValidationError("Waste DataFrame is empty")

    absent_columns = _REQUIRED_COLUMNS.difference(df.columns)
    if absent_columns:
        raise DAERAValidationError(f"Missing required columns: {sorted(absent_columns)}")

    # The two headline tonnage series must never go below zero.
    for headline in ("lac_waste_arisings_tonnes", "hh_waste_arisings_tonnes"):
        if df[headline].dropna().lt(0).any():
            raise DAERAValidationError(f"Negative values found in {headline}")

    # All 11 post-2015 LGD councils have to be represented.
    present_councils = set(df["council_area"].unique())
    absent_councils = NI_COUNCILS_POST_2015.difference(present_councils)
    if absent_councils:
        raise DAERAValidationError(f"Missing expected NI councils: {sorted(absent_councils)}")

    # A plausible series covers five or more financial years (data starts 2006/07).
    if "financial_year_start" in df.columns:
        distinct_years = df["financial_year_start"].dropna().nunique()
        if distinct_years < 5:
            raise DAERAValidationError(f"Too few financial years ({distinct_years}); expected 5+")

    return True