Source code for bolster.data_sources.nisra.migration

"""NISRA Migration Estimates - Official and Derived.

This module provides access to NISRA migration data through two approaches:

1. **Official Migration Statistics**: Published NISRA long-term international migration
   estimates from administrative data and the International Passenger Survey (IPS).

2. **Derived Migration Estimates**: Calculated from demographic components using the
   demographic accounting equation:

       Net Migration = Population Change - Natural Change
       Net Migration = ΔPopulation - (Births - Deaths)

Both approaches are useful:
- Official statistics are authoritative but published with a lag
- Derived estimates can be calculated for more recent periods
- Comparing both validates the demographic equation approach

Data Sources:
    **Official Migration**: https://www.nisra.gov.uk/statistics/population/long-term-international-migration-statistics

    **Derived Migration** (combines three NISRA sources):
    - **Population**: https://www.nisra.gov.uk/statistics/people-and-communities/population
    - **Births**: https://www.nisra.gov.uk/statistics/births-deaths-and-marriages/births
    - **Deaths**: https://www.nisra.gov.uk/statistics/births-deaths-and-marriages/deaths

Update Frequency: Annual (both official and derived)
Geographic Coverage: Northern Ireland
Reference Period: Mid-year (July to June) for official; Calendar year for derived

Example:
    >>> from bolster.data_sources.nisra import migration
    >>>
    >>> # Get official NISRA migration statistics
    >>> official = migration.get_official_migration()
    >>> sorted(official.columns.tolist())
    ['date', 'net_migration', 'year']

    >>> # Get derived migration estimates (from demographic equation)
    >>> derived = migration.get_derived_migration()
    >>> 'net_migration' in derived.columns
    True

    >>> # Compare official vs derived for validation
    >>> comparison = migration.compare_official_vs_derived(official, derived)
    >>> 'absolute_difference' in comparison.columns
    True
"""

import logging
import re
from pathlib import Path

import pandas as pd
from bs4 import BeautifulSoup

from bolster.utils.web import session

from . import births, deaths, population
from ._base import NISRADataNotFoundError, NISRAValidationError, download_file

[docs] logger = logging.getLogger(__name__)
[docs] def calculate_annual_births(births_df: pd.DataFrame) -> pd.DataFrame: """Aggregate monthly births data to annual totals. Args: births_df: DataFrame from births.get_latest_births(event_type='occurrence') Returns: DataFrame with columns: - year: int - births: int (total births in year) """ # Filter for 'Persons' (total) and aggregate by year annual = births_df[births_df["sex"] == "Persons"].groupby(births_df["month"].dt.year)["births"].sum().reset_index() annual.columns = ["year", "births"] return annual
[docs] def calculate_annual_deaths(deaths_df: pd.DataFrame) -> pd.DataFrame: """Aggregate weekly deaths data to annual totals. Args: deaths_df: DataFrame from deaths.get_historical_deaths() Returns: DataFrame with columns: - year: int - deaths: int (total deaths in year) """ # Use total_deaths column and aggregate by year annual = deaths_df.groupby("year")["total_deaths"].sum().reset_index() annual.columns = ["year", "deaths"] return annual
[docs] def calculate_annual_population(population_df: pd.DataFrame) -> pd.DataFrame: """Aggregate population data to annual totals for Northern Ireland. Args: population_df: DataFrame from population.get_latest_population(area='Northern Ireland') Returns: DataFrame with columns: - year: int - population: int (mid-year population estimate) """ # Filter for 'All persons' and aggregate by year return population_df[population_df["sex"] == "All persons"].groupby("year")["population"].sum().reset_index()
[docs] def derive_migration( population_df: pd.DataFrame, births_df: pd.DataFrame, deaths_df: pd.DataFrame, ) -> pd.DataFrame: """Derive net migration from demographic components. Uses the demographic accounting equation: Net Migration = ΔPopulation - (Births - Deaths) Args: population_df: DataFrame from population.get_latest_population() births_df: DataFrame from births.get_latest_births(event_type='occurrence') deaths_df: DataFrame from deaths.get_latest_deaths() Returns: DataFrame with columns: - year: int - population_start: int (population at start of year, June 30 t-1) - population_end: int (population at end of year, June 30 t) - births: int (births in calendar year) - deaths: int (deaths in calendar year) - natural_change: int (births - deaths) - population_change: int (population_end - population_start) - net_migration: int (derived migration estimate) - migration_rate: float (per 1,000 population) Raises: NISRAValidationError: If data sources cannot be aligned """ # Aggregate to annual data pop_annual = calculate_annual_population(population_df) births_annual = calculate_annual_births(births_df) deaths_annual = calculate_annual_deaths(deaths_df) # Merge all sources # Start with population data result = pop_annual.copy() result = result.rename(columns={"population": "population_end"}) # Add previous year's population (population at start of period) result["population_start"] = result["population_end"].shift(1) # Add births and deaths result = result.merge(births_annual, on="year", how="left") result = result.merge(deaths_annual, on="year", how="left") # Calculate natural change result["natural_change"] = result["births"] - result["deaths"] # Calculate population change result["population_change"] = result["population_end"] - result["population_start"] # Derive net migration # Note: This represents the residual between observed population change # and natural change. It captures net migration plus any measurement error. result["net_migration"] = result["population_change"] - result["natural_change"] # Calculate migration rate per 1,000 population # Use average of start and end population as denominator avg_population = (result["population_start"] + result["population_end"]) / 2 result["migration_rate"] = (result["net_migration"] / avg_population) * 1000 # Drop rows with missing critical data # First drop first row (no previous year population) # Then drop any rows missing births or deaths data result = result.dropna(subset=["population_start", "births", "deaths"]).reset_index(drop=True) # Convert counts to integers for col in [ "population_start", "population_end", "births", "deaths", "natural_change", "population_change", "net_migration", ]: result[col] = result[col].astype(int) # Round migration rate result["migration_rate"] = result["migration_rate"].round(2) # Log summary if not result.empty: latest_year = result["year"].max() latest_migration = result[result["year"] == latest_year]["net_migration"].values[0] date_range = f"{result['year'].min()}-{latest_year}" logger.info(f"Derived migration estimates for {len(result)} years ({date_range})") logger.info(f" Latest year ({latest_year}): Net migration = {latest_migration:+,}") return result
[docs] def get_latest_migration(force_refresh: bool = False) -> pd.DataFrame: """Get the latest derived migration estimates for Northern Ireland. Automatically downloads the most recent population, births, and deaths data, then calculates net migration using the demographic accounting equation. Args: force_refresh: If True, bypass cache and download fresh data for all sources Returns: DataFrame with columns: - year: int - population_start, population_end: int (mid-year estimates) - births, deaths: int (annual totals) - natural_change: int (births - deaths) - population_change: int (year-over-year change) - net_migration: int (derived estimate) - migration_rate: float (per 1,000 population) Example: >>> df = get_latest_migration() >>> 'net_migration' in df.columns True >>> len(df) > 0 True """ logger.info("Fetching data sources for migration calculation...") # Fetch all required data sources pop_df = population.get_latest_population(area="Northern Ireland", force_refresh=force_refresh) # For births, use occurrence data (actual birth dates) not registration dates births_df = births.get_latest_births(event_type="occurrence", force_refresh=force_refresh) # For deaths, use historical deaths data (provides annual totals) deaths_df = deaths.get_historical_deaths(force_refresh=force_refresh) # Derive migration from demographic components return derive_migration(pop_df, births_df, deaths_df)
[docs] def validate_demographic_equation(df: pd.DataFrame, tolerance: int = 100) -> bool: # pragma: no cover """Validate that the demographic accounting equation holds. Checks that: Population Change = Natural Change + Net Migration Args: df: DataFrame from derive_migration() or get_latest_migration() tolerance: Allowable difference due to rounding/measurement error (default: 100) Returns: True if validation passes Raises: NISRAValidationError: If equation doesn't hold within tolerance """ for _, row in df.iterrows(): year = row["year"] pop_change = row["population_change"] natural_change = row["natural_change"] migration = row["net_migration"] # Check equation expected_change = natural_change + migration difference = abs(pop_change - expected_change) if difference > tolerance: raise NISRAValidationError( f"Year {year}: Demographic equation violated. " f"Population change ({pop_change:,}) != Natural change ({natural_change:,}) + " f"Net migration ({migration:,}). Difference: {difference:,}" ) logger.info(f"Validation passed: Demographic equation holds for {len(df)} years") return True
[docs] def get_migration_by_year(df: pd.DataFrame, year: int) -> pd.DataFrame: """Filter migration data for a specific year. Args: df: DataFrame from get_latest_migration() year: Year to filter Returns: Filtered DataFrame Example: >>> df = get_latest_migration() >>> df_2024 = get_migration_by_year(df, 2024) >>> 'net_migration' in df_2024.columns True """ return df[df["year"] == year].reset_index(drop=True)
[docs] def get_migration_summary_statistics( df: pd.DataFrame, start_year: int | None = None, end_year: int | None = None ) -> dict: """Calculate summary statistics for migration data. Args: df: DataFrame from get_latest_migration() start_year: Optional start year for analysis period end_year: Optional end year for analysis period Returns: Dictionary with summary statistics: - total_years: Number of years analyzed - avg_net_migration: Average annual net migration - avg_migration_rate: Average migration rate per 1,000 - positive_years: Number of years with net immigration - negative_years: Number of years with net emigration - max_immigration_year: Year with highest immigration - max_immigration: Highest immigration value - max_emigration_year: Year with highest emigration - max_emigration: Highest emigration value (as negative) Example: >>> df = get_latest_migration() >>> stats = get_migration_summary_statistics(df, start_year=2010) >>> 'avg_net_migration' in stats True """ # Filter by year range if specified filtered = df.copy() if start_year: filtered = filtered[filtered["year"] >= start_year] if end_year: filtered = filtered[filtered["year"] <= end_year] # Handle empty data if filtered.empty: return { "total_years": 0, "avg_net_migration": 0.0, "avg_migration_rate": 0.0, "positive_years": 0, "negative_years": 0, "max_immigration_year": None, "max_immigration": None, "max_emigration_year": None, "max_emigration": None, } # Calculate statistics stats = { "total_years": len(filtered), "avg_net_migration": filtered["net_migration"].mean(), "avg_migration_rate": filtered["migration_rate"].mean(), "positive_years": (filtered["net_migration"] > 0).sum(), "negative_years": (filtered["net_migration"] < 0).sum(), } # Find max immigration and emigration max_immigration_row = filtered.loc[filtered["net_migration"].idxmax()] min_migration_row = filtered.loc[filtered["net_migration"].idxmin()] stats["max_immigration_year"] = int(max_immigration_row["year"]) stats["max_immigration"] = int(max_immigration_row["net_migration"]) stats["max_emigration_year"] = int(min_migration_row["year"]) stats["max_emigration"] = int(min_migration_row["net_migration"]) return stats
# ============================================================================= # Official NISRA Migration Statistics # ============================================================================= # Mother page URL for official migration publications
[docs] MIGRATION_MOTHER_PAGE = "https://www.nisra.gov.uk/statistics/population/long-term-international-migration-statistics"
[docs] def get_official_migration_publication_url() -> tuple[str, int]: """Scrape NISRA migration mother page to find latest Official estimates file. Navigates the publication structure: 1. Scrapes mother page for latest "Long-Term International Migration" publication 2. Follows link to publication detail page 3. Finds "Official" Excel file (Mig[YY][YY]-Official_1.xlsx) Returns: Tuple of (excel_file_url, publication_year) Raises: NISRADataNotFoundError: If publication or file not found """ try: response = session.get(MIGRATION_MOTHER_PAGE, timeout=30) response.raise_for_status() except Exception as e: raise NISRADataNotFoundError(f"Failed to fetch migration mother page: {e}") from e soup = BeautifulSoup(response.content, "html.parser") # Find latest publication link # Pattern: "Long-Term International Migration Statistics for Northern Ireland (YYYY)" pub_link = None pub_year = None for link in soup.find_all("a", href=True): link_text = link.get_text(strip=True) # Match publication title with year match = re.search(r"Long-Term International Migration.*\((\d{4})\)", link_text) if match and "publications" in link["href"]: year = int(match.group(1)) href = link["href"] if href.startswith("/"): href = f"https://www.nisra.gov.uk{href}" # Take first match (newest publication) pub_link = href pub_year = year logger.info(f"Found {year} Long-Term International Migration publication") break if not pub_link: raise NISRADataNotFoundError("Could not find migration publication on mother page") # Now scrape the publication page for the Official estimates Excel file try: pub_response = session.get(pub_link, timeout=30) pub_response.raise_for_status() except Exception as e: raise NISRADataNotFoundError(f"Failed to fetch publication page {pub_link}: {e}") from e pub_soup = BeautifulSoup(pub_response.content, "html.parser") # Find Excel link matching pattern: Mig[YY][YY]-Official_1.xlsx excel_url = None for a_tag in pub_soup.find_all("a", href=True): href = a_tag["href"] if "Official" in href and href.endswith(".xlsx"): # Make absolute URL if href.startswith("/"): excel_url = f"https://www.nisra.gov.uk{href}" elif not href.startswith("http"): excel_url = f"https://www.nisra.gov.uk/{href}" else: excel_url = href logger.info(f"Found Official estimates file: {excel_url}") break if not excel_url: raise NISRADataNotFoundError(f"Could not find Official estimates Excel file on {pub_link}") return excel_url, pub_year
[docs] def parse_official_migration_file(file_path: Path) -> pd.DataFrame: """Parse downloaded official migration Excel file into DataFrame. Extracts Table 1.1 (Net International Migration time series) from the Official estimates file and transforms it into long-format DataFrame. Args: file_path: Path to downloaded Mig[YY][YY]-Official_1.xlsx file Returns: DataFrame with columns: - year: int (mid-year) - net_migration: int (net international migration) - date: pd.Timestamp (reference date, June 30 of end year) Raises: NISRAValidationError: If file format is unexpected or parsing fails """ try: # Read Table 1.1 - Net International Migration time series # skiprows=2 to skip title and subtitle rows df = pd.read_excel(file_path, sheet_name="Table 1.1", skiprows=2) except Exception as e: raise NISRAValidationError(f"Failed to read Table 1.1 from {file_path}: {e}") from e # Find time period column (first column usually) time_col = df.columns[0] # Find net migration column (contains "Net" and "Migration" and "International") net_col = None for col in df.columns: if "Net" in str(col) and "Migration" in str(col) and "International" in str(col): net_col = col break if net_col is None: raise NISRAValidationError("Could not find 'Net International Migration' column in Table 1.1") # Extract relevant columns and clean result = pd.DataFrame( { "time_period": df[time_col], "net_migration": df[net_col], } ) # Remove rows with NaN in critical columns result = result.dropna(subset=["time_period", "net_migration"]) # Parse year from time_period (format: "Jul YYYY - Jun YYYY" or "YYYY - YYYY") # Extract the first year mentioned def extract_year(period_str): match = re.search(r"(\d{4})", str(period_str)) return int(match.group(1)) if match else None result["year"] = result["time_period"].apply(extract_year) result = result.dropna(subset=["year"]) result["year"] = result["year"].astype(int) # Create date column (reference date is June 30 of the end year, so add 1) result["date"] = pd.to_datetime(result["year"].astype(str) + "-06-30") + pd.DateOffset(years=1) # Convert net_migration to int result["net_migration"] = result["net_migration"].astype(int) # Select final columns result = result[["year", "net_migration", "date"]] # Sort by year result = result.sort_values("year").reset_index(drop=True) logger.info(f"Parsed official migration data: {len(result)} years ({result['year'].min()}-{result['year'].max()})") return result
[docs] def validate_official_migration(df: pd.DataFrame) -> bool: """Validate official migration data quality. Args: df: DataFrame from parse_official_migration_file() or get_official_migration() Returns: True if validation passes Raises: NISRAValidationError: If validation fails """ # Check for empty DataFrame if df.empty: raise NISRAValidationError("DataFrame is empty") # Check for required columns required_cols = ["year", "net_migration", "date"] missing_cols = [col for col in required_cols if col not in df.columns] if missing_cols: raise NISRAValidationError(f"Missing required columns: {missing_cols}") # Check for reasonable values (net migration should not exceed NI population ~1.9M) if (df["net_migration"].abs() > 1_900_000).any(): raise NISRAValidationError("Found unreasonably large net_migration values") logger.info(f"Validation passed: Official migration data consistent for {len(df)} years") return True
[docs] def get_official_migration(force_refresh: bool = False) -> pd.DataFrame: """Get the latest official NISRA migration statistics. Automatically downloads the most recent official migration estimates from NISRA and parses them into a structured DataFrame. Args: force_refresh: If True, bypass cache and download fresh data Returns: DataFrame with columns: - year: int (mid-year) - net_migration: int (net international migration) - date: pd.Timestamp (reference date) Raises: NISRADataNotFoundError: If publication cannot be found NISRAValidationError: If data fails validation Example: >>> official = get_official_migration() >>> sorted(official.columns.tolist()) ['date', 'net_migration', 'year'] >>> len(official) > 0 True """ logger.info("Fetching latest official migration statistics...") # Get latest publication URL excel_url, pub_year = get_official_migration_publication_url() # Download file (with caching, TTL=24 hours for annual data) file_path = download_file(excel_url, cache_ttl_hours=24, force_refresh=force_refresh) # Parse into DataFrame df = parse_official_migration_file(file_path) # Validate data validate_official_migration(df) logger.info(f"Successfully loaded official migration data: {len(df)} years, latest {df['year'].max()}") return df
# Alias for backward compatibility with existing derived migration function
[docs] get_derived_migration = get_latest_migration
[docs] def compare_official_vs_derived( official_df: pd.DataFrame, derived_df: pd.DataFrame, threshold: int = 1000, ) -> pd.DataFrame: """Compare official migration data with derived estimates for validation. Args: official_df: DataFrame from get_official_migration() derived_df: DataFrame from get_derived_migration() / get_latest_migration() threshold: Absolute difference threshold for flagging discrepancies (default: 1000) Returns: DataFrame with columns: - year: int - official_net_migration: int - derived_net_migration: int - absolute_difference: int - percent_difference: float - exceeds_threshold: bool Example: >>> official = get_official_migration() >>> derived = get_derived_migration() >>> comparison = compare_official_vs_derived(official, derived) >>> sorted(comparison.columns.tolist()) ['absolute_difference', 'derived_net_migration', 'exceeds_threshold', 'official_net_migration', 'percent_difference', 'year'] """ # Merge on year (inner join to get only overlapping years) comparison = official_df[["year", "net_migration"]].merge( derived_df[["year", "net_migration"]], on="year", suffixes=("_official", "_derived"), ) # Rename columns for clarity comparison = comparison.rename( columns={ "net_migration_official": "official_net_migration", "net_migration_derived": "derived_net_migration", } ) # Calculate differences comparison["absolute_difference"] = ( comparison["derived_net_migration"] - comparison["official_net_migration"] ).abs() comparison["percent_difference"] = ( (comparison["derived_net_migration"] - comparison["official_net_migration"]) / comparison["official_net_migration"].abs() * 100 ) # Flag discrepancies exceeding threshold comparison["exceeds_threshold"] = comparison["absolute_difference"] > threshold # Sort by year comparison = comparison.sort_values("year").reset_index(drop=True) # Log summary mean_abs_diff = comparison["absolute_difference"].mean() mean_pct_diff = comparison["percent_difference"].abs().mean() flagged_years = comparison["exceeds_threshold"].sum() logger.info(f"Cross-validation complete: {len(comparison)} overlapping years") logger.info(f" Mean absolute difference: {mean_abs_diff:,.0f} people") logger.info(f" Mean percentage difference: {mean_pct_diff:.1f}%") logger.info(f" Years exceeding threshold ({threshold}): {flagged_years}") return comparison