Data Source Development Guide
This guide provides comprehensive instructions for adding new data sources to Bolster. It is based on the patterns established in the NISRA (Northern Ireland Statistics and Research Agency) modules and will help you create consistent, maintainable, and well-tested data source integrations.
Overview
What is a Data Source Module?
A data source module in Bolster provides programmatic access to external datasets through:
Automatic data discovery: Scraping publication pages to find the latest data files
Intelligent caching: Avoiding unnecessary downloads with configurable TTL
Consistent parsing: Converting raw data (Excel, CSV, etc.) to pandas DataFrames
Data validation: Ensuring data integrity and consistency
Convenience functions: Helper functions for common data analysis tasks
CLI integration: Command-line access for end users
Design Philosophy
Bolster data source modules follow these principles:
Zero configuration for common cases:
get_latest_X()should “just work”Cache by default: Respect bandwidth and remote servers
Fail gracefully: Provide clear error messages when data is unavailable
Document the source: Always include URLs to the original data
Validate rigorously: Don’t trust external data without verification
Test comprehensively: Integrity tests ensure modules don’t break silently
Getting Started
Directory Structure
Data source modules live in src/bolster/data_sources/<source_name>/:
src/bolster/data_sources/
└── <source_name>/ # e.g., "nisra", "psni", "doj"
├── __init__.py # Public API exports
├── _base.py # Shared utilities (optional but recommended)
├── README.md # Module-level documentation
├── <dataset_1>.py # Individual dataset module
├── <dataset_2>.py # Individual dataset module
└── ...
Example (NISRA structure):
src/bolster/data_sources/nisra/
├── __init__.py # Exports all public functions
├── _base.py # Common utilities: caching, Excel parsing, errors
├── README.md # Comprehensive module documentation
├── population.py # Mid-year population estimates
├── births.py # Birth registrations
├── deaths.py # Death registrations
├── marriages.py # Marriage registrations
└── ... # 10 total dataset modules
Naming Conventions
Use these naming patterns for consistency:
Module directory:
<organization>(e.g.,nisra,psni,doj)Dataset files:
<dataset_topic>.py(e.g.,population.py,crime_statistics.py)Base utilities:
_base.py(underscore prefix indicates internal)Tests:
tests/test_<source>_<dataset>_integrity.py
Core Components
The _base.py Module
Create _base.py to share common functionality across dataset modules:
"""Common utilities for <SOURCE> data sources."""
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# Cache directory
CACHE_DIR = Path.home() / ".cache" / "bolster" / "<source_name>"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
Essential Components in _base.py:
Custom Exceptions:
class SourceDataError(Exception): """Base exception for data source errors.""" pass class SourceDataNotFoundError(SourceDataError): """Data file not available.""" pass class SourceValidationError(SourceDataError): """Data validation failed.""" pass
Caching Utilities:
def hash_url(url: str) -> str: """Generate a safe filename from a URL.""" return hashlib.md5(url.encode()).hexdigest() def get_cached_file(url: str, cache_ttl_hours: int = 24) -> Optional[Path]: """Return cached file if exists and fresh, else None.""" url_hash = hash_url(url) ext = Path(url).suffix or ".bin" cache_path = CACHE_DIR / f"{url_hash}{ext}" if cache_path.exists(): age = datetime.now() - datetime.fromtimestamp(cache_path.stat().st_mtime) if age.total_seconds() < cache_ttl_hours * 3600: logger.info(f"Using cached file: {cache_path}") return cache_path return None def download_file(url: str, cache_ttl_hours: int = 24, force_refresh: bool = False) -> Path: """Download a file with caching support.""" # Check cache first if not force_refresh: cached = get_cached_file(url, cache_ttl_hours) if cached: return cached # Download and cache url_hash = hash_url(url) ext = Path(url).suffix or ".bin" cache_path = CACHE_DIR / f"{url_hash}{ext}" try: logger.info(f"Downloading {url}") response = requests.get(url, timeout=30) response.raise_for_status() cache_path.write_bytes(response.content) logger.info(f"Saved to {cache_path}") return cache_path except requests.RequestException as e: raise SourceDataNotFoundError(f"Failed to download {url}: {e}")
Scraping Utilities:
def scrape_download_links(page_url: str, file_extension: str = ".xlsx") -> List[dict]: """Scrape download links from a page. Returns: List of dicts with 'url' and 'text' keys """ try: response = requests.get(page_url, timeout=30) response.raise_for_status() except requests.RequestException as e: raise SourceDataError(f"Failed to fetch page {page_url}: {e}") soup = BeautifulSoup(response.content, "html.parser") links = [] for a_tag in soup.find_all("a", href=True): href = a_tag["href"] if file_extension in href.lower(): # Make absolute URL if href.startswith("/"): url = f"https://www.example.gov.uk{href}" elif not href.startswith("http"): url = f"https://www.example.gov.uk/{href}" else: url = href links.append({"url": url, "text": a_tag.get_text(strip=True)}) return links
Excel Parsing Utilities (if working with Excel files):
def safe_int(val) -> Optional[int]: """Safely convert value to integer, handling placeholders.""" if val is None or val == "" or val == "-": return None try: return int(val) except (ValueError, TypeError): return None def safe_float(val) -> Optional[float]: """Safely convert value to float, handling placeholders.""" if val is None or val == "" or val == "-": return None try: return float(val) except (ValueError, TypeError): return None def find_header_row(sheet, expected_columns: List[str], max_rows: int = 20) -> Optional[int]: """Find the row number containing expected column headers. Args: sheet: openpyxl worksheet object expected_columns: Column names to search for (case-insensitive) max_rows: Maximum rows to search Returns: 1-based row number where headers are found, or None """ for row_idx, row in enumerate(sheet.iter_rows( min_row=1, max_row=max_rows, values_only=True), 1): row_str = [str(cell).lower() if cell else "" for cell in row] matches = sum(1 for expected in expected_columns if any(expected.lower() in cell for cell in row_str)) if matches == len(expected_columns): logger.debug(f"Found header row at row {row_idx}") return row_idx logger.warning(f"Could not find header row with columns: {expected_columns}") return None
Individual Dataset Modules
Each dataset gets its own module file (e.g., population.py, crime_statistics.py).
Module Template:
"""<Dataset Name> Data Source.
<Brief description of the dataset>
Data includes:
- <Key feature 1>
- <Key feature 2>
- <Key feature 3>
Data Source:
**Mother Page**: <URL to main listings page>
<Description of how data is discovered and accessed>
Update Frequency: <e.g., Monthly, Annual, Quarterly>
Geographic Coverage: <e.g., Northern Ireland, UK>
Reference Date: <What the dates in the data represent>
Example:
>>> from bolster.data_sources.<source> import <dataset>
>>> # Get latest data
>>> df = <dataset>.get_latest_<dataset>()
>>> print(df.head())
"""
import logging
from pathlib import Path
from typing import Tuple, Union
import pandas as pd
from ._base import (
SourceDataNotFoundError,
SourceValidationError,
download_file
)
logger = logging.getLogger(__name__)
# Base URL for this dataset
BASE_URL = "https://www.example.gov.uk/statistics/<topic>"
Required Functions:
get_latest_<dataset>_publication_url() -> Tuple[str, …]
Scrapes the source website to find the most recent data file.
def get_latest_<dataset>_publication_url() -> Tuple[str, int]: """Scrape source to find the latest data file. Returns: Tuple of (excel_file_url, year) Raises: SourceDataNotFoundError: If publication or file not found """ import requests from bs4 import BeautifulSoup try: response = requests.get(BASE_URL, timeout=30) response.raise_for_status() except requests.RequestException as e: raise SourceDataNotFoundError(f"Failed to fetch page: {e}") soup = BeautifulSoup(response.content, "html.parser") # Find the latest publication link # (Customize this for your source's HTML structure) for link in soup.find_all("a", href=True): link_text = link.get_text(strip=True) # Look for patterns in link text if "Latest Report" in link_text: href = link["href"] if href.startswith("/"): href = f"https://www.example.gov.uk{href}" # Extract year or other metadata year = 2024 # Parse from link_text or URL return href, year raise SourceDataNotFoundError("Could not find latest publication")
parse_<dataset>_file(file_path: Union[str, Path], …) -> pd.DataFrame
Parses the downloaded file into a pandas DataFrame.
def parse_<dataset>_file( file_path: Union[str, Path], # Add optional parameters for filtering/customization ) -> pd.DataFrame: """Parse data file into DataFrame. Args: file_path: Path to the data file Returns: DataFrame with columns: - column1: Description - column2: Description ... Raises: SourceValidationError: If file structure is unexpected """ file_path = Path(file_path) try: # Read the file (adjust for Excel, CSV, etc.) df = pd.read_excel(file_path, sheet_name="Data") except Exception as e: raise SourceValidationError(f"Failed to read file: {e}") # Validate expected columns expected_cols = {"col1", "col2", "col3"} if not expected_cols.issubset(df.columns): missing = expected_cols - set(df.columns) raise SourceValidationError(f"Missing columns: {missing}") # Clean and transform data df = df.rename(columns={"old_name": "new_name"}) df["date"] = pd.to_datetime(df["date"]) df = df.sort_values("date").reset_index(drop=True) # Log summary logger.info(f"Parsed {len(df)} records from {file_path.name}") return df
get_latest_<dataset>(force_refresh: bool = False, …) -> pd.DataFrame
Convenience function that discovers, downloads, and parses in one call.
def get_latest_<dataset>( force_refresh: bool = False, ) -> pd.DataFrame: """Get the latest data with automatic discovery and caching. Args: force_refresh: If True, bypass cache and download fresh data Returns: DataFrame with parsed data Raises: SourceDataNotFoundError: If latest publication cannot be found SourceValidationError: If file structure is unexpected Example: >>> df = get_latest_<dataset>() >>> print(df.shape) >>> # Filter and analyze >>> df_2024 = df[df['year'] == 2024] """ # Discover latest publication excel_url, year = get_latest_<dataset>_publication_url() logger.info(f"Downloading {year} data from: {excel_url}") # Download with appropriate cache TTL # Use longer TTL for annual data, shorter for frequent updates cache_ttl_hours = 180 * 24 # 180 days for annual data file_path = download_file( excel_url, cache_ttl_hours=cache_ttl_hours, force_refresh=force_refresh ) # Parse and return return parse_<dataset>_file(file_path)
Optional but Recommended Functions:
Validation Functions:
def validate_<dataset>_<check>(df: pd.DataFrame) -> bool: """Validate data integrity. Args: df: DataFrame from parse_<dataset>_file or get_latest_<dataset> Returns: True if validation passes Raises: SourceValidationError: If validation fails """ # Perform validation checks # Example: Check totals add up for group_key, group_data in df.groupby("category"): total = group_data["total"].iloc[0] parts_sum = group_data["part1"].sum() + group_data["part2"].sum() if total != parts_sum: raise SourceValidationError( f"{group_key}: Total ({total}) != Sum of parts ({parts_sum})" ) logger.info(f"Validation passed: {len(df)} records checked") return True
Helper/Filter Functions:
def get_<dataset>_by_year(df: pd.DataFrame, year: int) -> pd.DataFrame: """Filter data for a specific year.""" return df[df["year"] == year].reset_index(drop=True) def get_<dataset>_summary(df: pd.DataFrame) -> pd.DataFrame: """Calculate summary statistics.""" return df.groupby("category").agg({ "value": ["sum", "mean", "count"] }).reset_index()
The __init__.py File
Export the public API from __init__.py:
"""<Source Name> Data Sources.
This module provides access to <organization> open data including:
- <Dataset 1>: <Brief description>
- <Dataset 2>: <Brief description>
- ...
Example:
>>> from bolster.data_sources.<source> import <dataset>
>>> df = <dataset>.get_latest_<dataset>()
See individual module docstrings for detailed documentation.
"""
# Import key functions from each module
from .<dataset_1> import (
get_latest_<dataset_1>,
parse_<dataset_1>_file,
validate_<dataset_1>,
)
from .<dataset_2> import (
get_latest_<dataset_2>,
parse_<dataset_2>_file,
)
# Import exceptions for users
from ._base import (
SourceDataError,
SourceDataNotFoundError,
SourceValidationError,
)
__all__ = [
# Dataset 1
"get_latest_<dataset_1>",
"parse_<dataset_1>_file",
"validate_<dataset_1>",
# Dataset 2
"get_latest_<dataset_2>",
"parse_<dataset_2>_file",
# Exceptions
"SourceDataError",
"SourceDataNotFoundError",
"SourceValidationError",
]
Documentation Standards
Module-Level Documentation
Every dataset module must have comprehensive docstrings:
First paragraph: Brief description (1-2 sentences)
“Data includes:” section: Bullet list of key features
“Data Source:” section: - Mother Page: URL to main listings/publications page - Description of how data is discovered
Metadata: - Update Frequency: How often data is published - Geographic Coverage: Region covered - Reference Date: What dates in the data represent
Example: Short code snippet showing basic usage
Example (from NISRA population.py):
"""NISRA Mid-Year Population Estimates Data Source.
Provides access to mid-year population estimates for Northern Ireland with breakdowns by:
- Geography (Northern Ireland, Parliamentary Constituencies, Health and Social Care Trusts)
- Sex (All persons, Males, Females)
- Age (5-year age bands: 00-04, 05-09, ..., 85-89, 90+)
- Year (1971-present for NI overall, 2021-present for sub-geographies)
Mid-year estimates are referenced to June 30th of each year.
Data Source:
**Mother Page**: https://www.nisra.gov.uk/statistics/people-and-communities/population
This page lists all population statistics publications in reverse chronological order
(newest first). The module automatically scrapes this page to find the latest
"Mid-Year Population Estimates for Small Geographical Areas" publication, then downloads
the age bands Excel file from that publication's detail page.
Update Frequency: Annual (published ~6 months after reference date)
Geographic Coverage: Northern Ireland
Reference Date: June 30th of each year
Example:
>>> from bolster.data_sources.nisra import population
>>> # Get latest population estimates
>>> df = population.get_latest_population()
>>> print(df.head())
"""
Function Documentation
All public functions must have complete docstrings with:
Brief description: One-line summary
Args: Type-annotated parameters with descriptions
Returns: Description of return value with column details for DataFrames
Raises: Exception types and conditions
Example: Usage example with realistic code
Example:
def get_latest_population(
area: Optional[
Literal[
"all",
"Northern Ireland",
"Parliamentary Constituencies (2024)",
"Health and Social Care Trusts",
]
] = "all",
force_refresh: bool = False,
) -> pd.DataFrame:
"""Get the latest mid-year population estimates.
Automatically discovers and downloads the most recent population estimates
from the NISRA website.
Args:
area: Which geographic area(s) to return (default: "all")
force_refresh: If True, bypass cache and download fresh data
Returns:
DataFrame with columns:
- area, area_code, area_name: Geographic identifiers
- year: Reference year
- sex: "All persons", "Males", or "Females"
- age_5: 5-year age band
- age_band, age_broad: Alternative age groupings
- population: Mid-year estimate
Raises:
NISRADataNotFoundError: If latest publication cannot be found
NISRAValidationError: If file structure is unexpected
Example:
>>> # Get all data
>>> df = get_latest_population()
>>> # Get only Northern Ireland overall
>>> ni_df = get_latest_population(area='Northern Ireland')
>>> # Calculate total NI population in latest year
>>> ni_2024 = ni_df[(ni_df['year'] == 2024) & (ni_df['sex'] == 'All persons')]
>>> total = ni_2024['population'].sum()
"""
Module README
Create README.md in the module directory with:
Overview: What data is available
Quick Start: Basic usage examples
Available Datasets: Table of all dataset modules
Common Patterns: Frequently used analysis patterns
Data Quality: Notes on validation and limitations
License/Attribution: Data source licenses and attribution requirements
See src/bolster/data_sources/nisra/README.md for a complete example.
Testing
Integrity Tests
Create comprehensive integrity tests in tests/test_<source>_<dataset>_integrity.py:
"""Integrity tests for <source> <dataset> data source."""
import pytest
import pandas as pd
from bolster.data_sources.<source> import <dataset>
@pytest.fixture(scope="module")
def data():
"""Load data once for all tests."""
return <dataset>.get_latest_<dataset>()
class TestDataStructure:
"""Test data structure and format."""
def test_returns_dataframe(self, data):
"""Should return a pandas DataFrame."""
assert isinstance(data, pd.DataFrame)
def test_not_empty(self, data):
"""Should contain data."""
assert len(data) > 0
def test_has_expected_columns(self, data):
"""Should have all expected columns."""
expected = {"column1", "column2", "column3"}
assert expected.issubset(set(data.columns))
def test_no_duplicate_keys(self, data):
"""Should have no duplicate keys."""
key_cols = ["col1", "col2"]
assert not data.duplicated(subset=key_cols).any()
class TestDataQuality:
"""Test data quality and integrity."""
def test_no_null_in_required_columns(self, data):
"""Required columns should have no null values."""
required = ["column1", "column2"]
for col in required:
assert data[col].notna().all(), f"{col} has null values"
def test_valid_date_range(self, data):
"""Dates should be within reasonable range."""
assert data["date"].min() >= pd.Timestamp("2000-01-01")
assert data["date"].max() <= pd.Timestamp.now()
def test_numeric_columns_positive(self, data):
"""Numeric values should be non-negative."""
numeric_cols = ["value1", "value2"]
for col in numeric_cols:
assert (data[col] >= 0).all(), f"{col} has negative values"
class TestDataValidation:
"""Test data validation functions."""
def test_validation_passes(self, data):
"""Data validation should pass."""
assert <dataset>.validate_<dataset>(data)
def test_totals_match_sum_of_parts(self, data):
"""Total columns should equal sum of constituent parts."""
for _, row in data.iterrows():
total = row["total"]
parts_sum = row["part1"] + row["part2"]
assert abs(total - parts_sum) < 0.01, \
f"Total {total} != Parts sum {parts_sum}"
class TestHelperFunctions:
"""Test helper and filter functions."""
def test_filter_by_year(self, data):
"""Should filter to specific year."""
years = data["year"].unique()
test_year = years[0]
filtered = <dataset>.get_<dataset>_by_year(data, test_year)
assert (filtered["year"] == test_year).all()
assert len(filtered) > 0
def test_summary_statistics(self, data):
"""Should calculate summary statistics."""
summary = <dataset>.get_<dataset>_summary(data)
assert isinstance(summary, pd.DataFrame)
assert len(summary) > 0
Test Coverage Requirements:
Aim for >90% code coverage
Test all public functions
Test error conditions (invalid inputs, missing files, etc.)
Test data validation logic
Mock network calls where appropriate to avoid flakiness
CLI Integration
Adding CLI Commands
If your data source would benefit from CLI access, add commands to src/bolster/cli.py:
@cli.command("get-<dataset>")
@click.option(
"--output",
"-o",
type=click.Path(),
default="<dataset>.csv",
help="Output CSV file path",
)
@click.option(
"--year",
type=int,
help="Filter to specific year",
)
@click.option(
"--force-refresh",
is_flag=True,
help="Force download fresh data (bypass cache)",
)
def get_<dataset>_cli(output, year, force_refresh):
"""Download <dataset> data from <source>.
Examples:
# Download latest data to default file
bolster get-<dataset>
# Download data for specific year
bolster get-<dataset> --year 2024
# Force fresh download
bolster get-<dataset> --force-refresh
"""
try:
click.echo(f"Fetching <dataset> data...")
# Get data
df = get_latest_<dataset>(force_refresh=force_refresh)
# Apply filters
if year:
df = df[df["year"] == year]
click.echo(f"Filtered to year {year}: {len(df)} records")
# Save to file
df.to_csv(output, index=False)
click.echo(f"✓ Saved {len(df)} records to {output}")
except Exception as e:
click.echo(f"Error: {e}", err=True)
raise click.Abort()
Python Compatibility
Python 3.9+ Type Hints
CRITICAL: All type hints must be compatible with Python 3.9+.
DO NOT USE (Python 3.10+ only):
# ❌ WRONG - PEP 604 union syntax not in Python 3.9
def parse_file(file_path: str | Path) -> pd.DataFrame:
...
# ❌ WRONG - Lowercase generics not in Python 3.9
def get_links(page_url: str) -> list[dict]:
...
def get_mapping() -> dict[str, int]:
...
def get_url() -> tuple[str, int]:
...
USE INSTEAD (Python 3.9+ compatible):
# ✓ CORRECT - Use typing module
from typing import Dict, List, Optional, Tuple, Union
def parse_file(file_path: Union[str, Path]) -> pd.DataFrame:
...
def get_links(page_url: str) -> List[dict]:
...
def get_mapping() -> Dict[str, int]:
...
def get_url() -> Tuple[str, int]:
...
# For optional union types
def get_optional_area(area: Optional[str] = None) -> Optional[pd.DataFrame]:
...
Common Patterns:
from typing import Dict, List, Literal, Optional, Tuple, Union
from pathlib import Path
import pandas as pd
# Union types
def parse(file_path: Union[str, Path]) -> pd.DataFrame:
...
# Optional parameters
def get_data(year: Optional[int] = None) -> pd.DataFrame:
...
# List/Dict/Tuple generics
def scrape() -> List[dict]:
...
def map_columns() -> Dict[str, int]:
...
def discover() -> Tuple[str, int]:
...
# Complex Optional[Literal[...]]
def parse_file(
file_path: Union[str, Path],
area: Optional[Literal["all", "Northern Ireland", "Regions"]] = "all",
) -> pd.DataFrame:
...
Common Patterns and Best Practices
Caching Strategy
Choose appropriate cache TTL based on update frequency:
# Annual data (population, census): 180 days
cache_ttl_hours = 180 * 24
# Quarterly data (economic indicators): 60 days
cache_ttl_hours = 60 * 24
# Monthly data (births, marriages): 30 days
cache_ttl_hours = 30 * 24
# Weekly data (provisional stats): 7 days
cache_ttl_hours = 7 * 24
# Daily data (real-time feeds): 1 day
cache_ttl_hours = 24
Error Handling
Use specific exception types for different failure modes:
try:
# Network request
response = requests.get(url, timeout=30)
response.raise_for_status()
except requests.RequestException as e:
raise SourceDataNotFoundError(f"Failed to fetch page: {e}")
try:
# File parsing
df = pd.read_excel(file_path)
except Exception as e:
raise SourceValidationError(f"Failed to parse file: {e}")
# Data validation
if expected_cols not in df.columns:
raise SourceValidationError(f"Missing columns: {expected_cols}")
Logging
Use appropriate log levels:
import logging
logger = logging.getLogger(__name__)
# Info: Normal operations
logger.info(f"Downloading {year} data from {url}")
logger.info(f"Parsed {len(df)} records")
logger.info("Validation passed")
# Debug: Detailed information for troubleshooting
logger.debug(f"Found header row at row {row_idx}")
logger.debug(f"Cache hit: {cache_path}")
# Warning: Potential issues that don't stop execution
logger.warning(f"Could not find column: {col_name}, using default")
logger.warning(f"{missing_count} records have missing data")
RSS Feed Discovery
For sources with RSS feeds, use bolster.utils.rss for automatic discovery:
from bolster.utils.rss import find_feed, parse_feed
# Find RSS feed for a page
feed_url = find_feed("https://www.example.gov.uk/statistics")
if feed_url:
# Parse feed entries
entries = parse_feed(feed_url)
# Find latest publication
for entry in entries:
if "Latest Report" in entry.title:
publication_url = entry.link
publication_date = entry.published_parsed
break
Excel Parsing Tips
Common patterns for handling messy Excel files:
import openpyxl
# Skip header rows
df = pd.read_excel(file_path, sheet_name="Data", skiprows=3)
# Read specific range
wb = openpyxl.load_workbook(file_path)
sheet = wb["Table 1"]
# Find header row dynamically
header_row = find_header_row(sheet, ["Year", "Total", "Male", "Female"])
# Extract column mapping
col_map = extract_column_mapping(sheet, header_row, ["Year", "Total"])
# Read data starting from header
data = []
for row in sheet.iter_rows(min_row=header_row + 1, values_only=True):
if row[col_map["Year"]]: # Skip empty rows
data.append({
"year": safe_int(row[col_map["Year"]]),
"total": safe_int(row[col_map["Total"]]),
})
df = pd.DataFrame(data)
# Handle merged cells
# Openpyxl returns None for merged cells except the top-left
# Use forward-fill for category columns
df["category"] = df["category"].ffill()
Example Walkthrough: PSNI Crime Statistics
Let’s walk through creating a new data source for PSNI (Police Service of Northern Ireland) crime statistics.
Step 1: Research the Data Source
Find the data page: https://www.psni.police.uk/about-us/our-publications-and-reports/official-statistics/police-recorded-crime-statistics
Identify patterns: - Monthly Excel files - Files named like “Police Recorded Crime Apr 2024.xlsx” - Contains sheets: “Summary”, “Offence Type”, “Geographic Area”
Determine update frequency: Monthly (published ~15th of following month)
Check if RSS available: Yes, https://www.psni.police.uk/rss/official-statistics
Step 2: Create Directory Structure
mkdir -p src/bolster/data_sources/psni
touch src/bolster/data_sources/psni/__init__.py
touch src/bolster/data_sources/psni/_base.py
touch src/bolster/data_sources/psni/crime_statistics.py
touch src/bolster/data_sources/psni/README.md
Step 3: Create _base.py
"""Common utilities for PSNI data sources."""
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Optional
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# Cache directory
CACHE_DIR = Path.home() / ".cache" / "bolster" / "psni"
CACHE_DIR.mkdir(parents=True, exist_ok=True)
class PSNIDataError(Exception):
"""Base exception for PSNI data errors."""
pass
class PSNIDataNotFoundError(PSNIDataError):
"""Data file not available."""
pass
class PSNIValidationError(PSNIDataError):
"""Data validation failed."""
pass
def hash_url(url: str) -> str:
"""Generate a safe filename from a URL."""
return hashlib.md5(url.encode()).hexdigest()
def get_cached_file(url: str, cache_ttl_hours: int = 24) -> Optional[Path]:
"""Return cached file if exists and fresh, else None."""
url_hash = hash_url(url)
ext = Path(url).suffix or ".bin"
cache_path = CACHE_DIR / f"{url_hash}{ext}"
if cache_path.exists():
age = datetime.now() - datetime.fromtimestamp(cache_path.stat().st_mtime)
if age.total_seconds() < cache_ttl_hours * 3600:
logger.info(f"Using cached file: {cache_path}")
return cache_path
return None
def download_file(url: str, cache_ttl_hours: int = 24,
force_refresh: bool = False) -> Path:
"""Download a file with caching support."""
if not force_refresh:
cached = get_cached_file(url, cache_ttl_hours)
if cached:
return cached
url_hash = hash_url(url)
ext = Path(url).suffix or ".bin"
cache_path = CACHE_DIR / f"{url_hash}{ext}"
try:
logger.info(f"Downloading {url}")
response = requests.get(url, timeout=30)
response.raise_for_status()
cache_path.write_bytes(response.content)
logger.info(f"Saved to {cache_path}")
return cache_path
except requests.RequestException as e:
raise PSNIDataNotFoundError(f"Failed to download {url}: {e}")
Step 4: Create crime_statistics.py
"""PSNI Police Recorded Crime Statistics.
Provides access to monthly police recorded crime statistics for Northern Ireland.
Data includes:
- Monthly crime counts by offence type
- Geographic breakdown by policing district
- Outcome classifications (detected, undetected, etc.)
- Historical trends from 2000 onwards
Data Source:
**Mother Page**: https://www.psni.police.uk/about-us/our-publications-and-reports/official-statistics/police-recorded-crime-statistics
The module uses PSNI's RSS feed to discover the latest monthly crime statistics
publication, then downloads the Excel file.
Update Frequency: Monthly (published ~15th of following month)
Geographic Coverage: Northern Ireland (by policing district)
Reference Date: Month of crime occurrence
Example:
>>> from bolster.data_sources.psni import crime_statistics
>>> # Get latest crime data
>>> df = crime_statistics.get_latest_crime_statistics()
>>> print(df.head())
"""
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Tuple, Union
import pandas as pd
from bolster.utils.rss import find_feed, parse_feed
from ._base import PSNIDataNotFoundError, PSNIValidationError, download_file
logger = logging.getLogger(__name__)
BASE_URL = "https://www.psni.police.uk/about-us/our-publications-and-reports/official-statistics/police-recorded-crime-statistics"
def get_latest_crime_statistics_publication_url() -> Tuple[str, datetime]:
"""Find the latest crime statistics file via RSS feed.
Returns:
Tuple of (excel_file_url, publication_date)
Raises:
PSNIDataNotFoundError: If publication not found
"""
# Find RSS feed for the page
feed_url = find_feed(BASE_URL)
if not feed_url:
raise PSNIDataNotFoundError("Could not find RSS feed for PSNI statistics")
# Parse feed entries
entries = parse_feed(feed_url)
# Look for latest "Police Recorded Crime" publication
for entry in entries:
if "Police Recorded Crime" in entry.title:
# Entry has link to detail page, need to scrape for Excel file
import requests
from bs4 import BeautifulSoup
try:
response = requests.get(entry.link, timeout=30)
response.raise_for_status()
except requests.RequestException as e:
continue
soup = BeautifulSoup(response.content, "html.parser")
# Find Excel file link
for link in soup.find_all("a", href=True):
href = link["href"]
if href.endswith(".xlsx") or href.endswith(".xls"):
if href.startswith("/"):
href = f"https://www.psni.police.uk{href}"
pub_date = datetime(*entry.published_parsed[:6])
logger.info(f"Found crime statistics: {entry.title}")
return href, pub_date
raise PSNIDataNotFoundError("Could not find crime statistics publication in RSS feed")
def parse_crime_statistics_file(file_path: Union[str, Path]) -> pd.DataFrame:
"""Parse PSNI crime statistics Excel file.
Args:
file_path: Path to the Excel file
Returns:
DataFrame with columns:
- month: datetime (first day of month)
- offence_type: str (crime category)
- district: str (policing district)
- count: int (number of recorded crimes)
- detected: int (crimes with outcome)
Raises:
PSNIValidationError: If file structure is unexpected
"""
file_path = Path(file_path)
try:
# Read the main data sheet
df = pd.read_excel(file_path, sheet_name="Offence Type", skiprows=2)
except Exception as e:
raise PSNIValidationError(f"Failed to read file: {e}")
# Validate expected columns
expected_cols = {"Month", "Offence Type", "Total"}
if not expected_cols.issubset(df.columns):
missing = expected_cols - set(df.columns)
raise PSNIValidationError(f"Missing columns: {missing}")
# Clean and transform
df = df.rename(columns={
"Month": "month",
"Offence Type": "offence_type",
"Total": "count",
})
df["month"] = pd.to_datetime(df["month"])
df = df.sort_values(["month", "offence_type"]).reset_index(drop=True)
logger.info(f"Parsed {len(df)} crime records from {file_path.name}")
return df
def get_latest_crime_statistics(force_refresh: bool = False) -> pd.DataFrame:
"""Get the latest police recorded crime statistics.
Args:
force_refresh: If True, bypass cache and download fresh data
Returns:
DataFrame with monthly crime statistics
Raises:
PSNIDataNotFoundError: If latest publication cannot be found
PSNIValidationError: If file structure is unexpected
Example:
>>> df = get_latest_crime_statistics()
>>> # Filter to latest year
>>> latest_year = df["month"].max().year
>>> df_2024 = df[df["month"].dt.year == latest_year]
>>> print(f"Total crimes in {latest_year}: {df_2024['count'].sum():,}")
"""
# Discover via RSS
excel_url, pub_date = get_latest_crime_statistics_publication_url()
logger.info(f"Downloading crime statistics ({pub_date.strftime('%B %Y')}) from: {excel_url}")
# Monthly data, cache for 30 days
cache_ttl_hours = 30 * 24
file_path = download_file(excel_url, cache_ttl_hours=cache_ttl_hours,
force_refresh=force_refresh)
return parse_crime_statistics_file(file_path)
def validate_crime_statistics(df: pd.DataFrame) -> bool:
"""Validate crime statistics data integrity.
Args:
df: DataFrame from parse_crime_statistics_file or get_latest_crime_statistics
Returns:
True if validation passes
Raises:
PSNIValidationError: If validation fails
"""
# Check for reasonable crime counts (no negative, not absurdly high)
if (df["count"] < 0).any():
raise PSNIValidationError("Found negative crime counts")
if (df["count"] > 100000).any():
logger.warning("Found suspiciously high crime count")
# Check temporal continuity
month_counts = df.groupby("month").size()
if month_counts.min() < 5:
raise PSNIValidationError("Some months have too few offence types")
logger.info(f"Validation passed: {len(df)} records checked")
return True
def get_crime_statistics_by_year(df: pd.DataFrame, year: int) -> pd.DataFrame:
"""Filter crime statistics for a specific year."""
return df[df["month"].dt.year == year].reset_index(drop=True)
def get_crime_summary_by_type(df: pd.DataFrame) -> pd.DataFrame:
"""Calculate summary statistics by offence type."""
return df.groupby("offence_type").agg({
"count": ["sum", "mean", "count"]
}).reset_index()
Step 5: Create __init__.py
"""PSNI (Police Service of Northern Ireland) Data Sources.
This module provides access to PSNI open data including:
- Crime Statistics: Monthly police recorded crime data
Example:
>>> from bolster.data_sources.psni import crime_statistics
>>> df = crime_statistics.get_latest_crime_statistics()
See individual module docstrings for detailed documentation.
"""
from .crime_statistics import (
get_latest_crime_statistics,
parse_crime_statistics_file,
validate_crime_statistics,
get_crime_statistics_by_year,
get_crime_summary_by_type,
)
from ._base import (
PSNIDataError,
PSNIDataNotFoundError,
PSNIValidationError,
)
__all__ = [
# Crime statistics
"get_latest_crime_statistics",
"parse_crime_statistics_file",
"validate_crime_statistics",
"get_crime_statistics_by_year",
"get_crime_summary_by_type",
# Exceptions
"PSNIDataError",
"PSNIDataNotFoundError",
"PSNIValidationError",
]
Step 6: Create Tests
Create tests/test_psni_crime_statistics_integrity.py:
"""Integrity tests for PSNI crime statistics data source."""
import pytest
import pandas as pd
from bolster.data_sources.psni import crime_statistics
@pytest.fixture(scope="module")
def data():
"""Load data once for all tests."""
return crime_statistics.get_latest_crime_statistics()
class TestDataStructure:
"""Test data structure and format."""
def test_returns_dataframe(self, data):
assert isinstance(data, pd.DataFrame)
def test_not_empty(self, data):
assert len(data) > 0
def test_has_expected_columns(self, data):
expected = {"month", "offence_type", "count"}
assert expected.issubset(set(data.columns))
class TestDataQuality:
"""Test data quality and integrity."""
def test_no_null_in_required_columns(self, data):
required = ["month", "offence_type", "count"]
for col in required:
assert data[col].notna().all()
def test_valid_date_range(self, data):
assert data["month"].min() >= pd.Timestamp("2000-01-01")
assert data["month"].max() <= pd.Timestamp.now()
def test_crime_counts_positive(self, data):
assert (data["count"] >= 0).all()
class TestDataValidation:
"""Test data validation functions."""
def test_validation_passes(self, data):
assert crime_statistics.validate_crime_statistics(data)
class TestHelperFunctions:
"""Test helper functions."""
def test_filter_by_year(self, data):
years = data["month"].dt.year.unique()
test_year = years[0]
filtered = crime_statistics.get_crime_statistics_by_year(data, test_year)
assert (filtered["month"].dt.year == test_year).all()
assert len(filtered) > 0
def test_summary_by_type(self, data):
summary = crime_statistics.get_crime_summary_by_type(data)
assert isinstance(summary, pd.DataFrame)
assert len(summary) > 0
Step 7: Create README
Create src/bolster/data_sources/psni/README.md with comprehensive documentation.
Step 8: Test and Submit
# Run tests
pytest tests/test_psni_crime_statistics_integrity.py -v
# Run linting
make pre-commit
# Commit and push
git add src/bolster/data_sources/psni/
git add tests/test_psni_crime_statistics_integrity.py
git commit -m "feat: Add PSNI crime statistics data source"
git push origin feature/psni-crime-statistics
Checklist
Before Submitting a PR
Use this checklist to ensure your data source is complete:
Code:
[ ] Created
src/bolster/data_sources/<source>/directory[ ] Created
_base.pywith custom exceptions and utilities[ ] Created individual dataset module(s) with required functions: - [ ]
get_latest_<dataset>_publication_url()- [ ]parse_<dataset>_file()- [ ]get_latest_<dataset>()- [ ] At least one validation function - [ ] At least one helper/filter function[ ] Created
__init__.pywith public API exports[ ] All type hints are Python 3.9+ compatible (no
X | Y, nolist[X])
Documentation:
[ ] Module docstrings include: - [ ] Brief description - [ ] Data source URL (mother page) - [ ] Update frequency, geographic coverage, reference date - [ ] Usage example
[ ] All public functions have complete docstrings with Args/Returns/Raises/Example
[ ] Created comprehensive
README.mdin module directory
Testing:
[ ] Created
tests/test_<source>_<dataset>_integrity.py[ ] Tests cover: - [ ] Data structure (DataFrame, columns, types) - [ ] Data quality (no nulls, valid ranges) - [ ] Validation functions - [ ] Helper functions
[ ] All tests pass locally
[ ] Code coverage >90%
Integration:
[ ] Added CLI command to
src/bolster/cli.py(if applicable)[ ] Tested CLI command locally
[ ] Updated
docs/usage.rstwith usage examples (if significant)
Quality:
[ ] Code passes
make pre-commit(ruff linting and formatting)[ ] No hardcoded credentials or API keys
[ ] Appropriate cache TTL for update frequency
[ ] Clear error messages for common failure modes
[ ] Logging at appropriate levels (info for major operations, debug for details)
Getting Help
If you have questions while developing a data source:
Check existing modules: The NISRA modules (
src/bolster/data_sources/nisra/) provide comprehensive examplesRead the tests: Integrity tests show expected usage patterns
File an issue: https://github.com/andrewbolster/bolster/issues with questions
Start a discussion: For design questions, open a GitHub Discussion
Contributing
We welcome data source contributions! Follow these steps:
Open an issue describing the data source you want to add
Get feedback on approach before writing significant code
Follow this guide for implementation
Submit a PR with complete code, tests, and documentation
Iterate based on review feedback
Your contribution helps make Northern Ireland’s open data more accessible!