"""PSNI Road Traffic Collision Statistics.
Provides access to police-recorded injury road traffic collision (RTC) statistics
for Northern Ireland.
Data includes:
- Collision records with date, location, road conditions, severity
- Casualty records with age, gender, severity, road user class
- Vehicle records with type, manoeuvre, driver details
- Geographic breakdown by 11 policing districts (aligned with LGDs)
- Historical time series from 2013 onwards
Data Source:
**Primary Source**: OpenDataNI - Police Recorded Injury Road Traffic Collision Statistics
https://www.opendatani.gov.uk/dataset?q=road+traffic+collision
PSNI collects RTC statistics in accordance with STATS20 guidance from the
Department for Transport. Data covers injury collisions only (not damage-only).
Published under the Open Government Licence v3.0.
**PSNI Official Statistics**: https://www.psni.police.uk/about-us/our-publications-and-reports/official-statistics/road-traffic-collision-statistics
Update Frequency: Annual (data available ~6 months after year end)
Geographic Coverage: Northern Ireland (11 policing districts)
Reference Date: Date of collision occurrence
Time Coverage: 2013 to present
Example:
>>> from bolster.data_sources.psni import road_traffic_collisions
>>> df = road_traffic_collisions.get_collisions()
>>> 'severity' in road_traffic_collisions.get_casualties().columns
True
>>> summary = road_traffic_collisions.get_annual_summary()
>>> 'year' in summary.columns
True
"""
import logging
from datetime import datetime
from typing import Literal
import pandas as pd
from bolster.utils.web import session
from ._base import (
PSNIDataNotFoundError,
PSNIValidationError,
download_file,
get_lgd_code,
get_nuts3_code,
)
[docs]
logger = logging.getLogger(__name__)
# OpenDataNI API endpoint
[docs]
OPENDATANI_API = "https://admin.opendatani.gov.uk/api/3/action"
# District code mappings (short codes used in RTC data to full names)
[docs]
DISTRICT_CODES = {
"ANTN": "Antrim & Newtownabbey",
"ARND": "Ards & North Down",
"ARBC": "Armagh City Banbridge & Craigavon",
"BELC": "Belfast City",
"CCGL": "Causeway Coast & Glens",
"DCST": "Derry City & Strabane",
"FERO": "Fermanagh & Omagh",
"LISC": "Lisburn & Castlereagh City",
"MEAN": "Mid & East Antrim",
"MIDU": "Mid Ulster",
"NEMD": "Newry Mourne & Down",
}
# Reverse mapping
[docs]
DISTRICT_NAMES_TO_CODES = {v: k for k, v in DISTRICT_CODES.items()}
# Casualty severity codes
[docs]
SEVERITY_CODES = {
1: "Fatal",
2: "Serious",
3: "Slight",
}
# Casualty class codes (road user type)
[docs]
CASUALTY_CLASS_CODES = {
1: "Driver/Rider",
2: "Passenger (front)",
3: "Passenger (rear)",
4: "Passenger (other)",
5: "Pedestrian",
6: "Pillion passenger",
}
# Vehicle type codes
[docs]
VEHICLE_TYPE_CODES = {
1: "Pedal cycle",
2: "Motorcycle 50cc or under",
3: "Motorcycle over 50cc and up to 125cc",
4: "Motorcycle over 125cc and up to 500cc",
5: "Motorcycle over 500cc",
8: "Car",
9: "Taxi",
10: "Minibus (8-16 passengers)",
11: "Bus/Coach (17+ passengers)",
15: "Goods vehicle 3.5 tonnes mgw or under",
16: "Goods vehicle over 3.5 and under 7.5 tonnes mgw",
17: "Goods vehicle 7.5 tonnes mgw or over",
18: "Agricultural vehicle",
19: "Other motor vehicle",
20: "Other non-motor vehicle",
21: "Tram/Light rail",
22: "Mobility scooter",
23: "Electric scooter",
}
# Day of week codes
[docs]
DAY_OF_WEEK_CODES = {
1: "Sunday",
2: "Monday",
3: "Tuesday",
4: "Wednesday",
5: "Thursday",
6: "Friday",
7: "Saturday",
}
# Light conditions codes
[docs]
LIGHT_CONDITIONS_CODES = {
1: "Daylight",
2: "Darkness: street lights present and lit",
3: "Darkness: street lights present but unlit",
4: "Darkness: no street lighting",
5: "Darkness: street lighting unknown",
}
# Weather codes
[docs]
WEATHER_CODES = {
1: "Fine without high winds",
2: "Raining without high winds",
3: "Snowing without high winds",
4: "Fine with high winds",
5: "Raining with high winds",
6: "Snowing with high winds",
7: "Fog or mist",
8: "Other",
9: "Unknown",
}
# Road surface codes
[docs]
ROAD_SURFACE_CODES = {
1: "Dry",
2: "Wet/Damp",
3: "Snow",
4: "Frost/Ice",
5: "Flood (surface water over 3cm deep)",
}
def _get_available_datasets() -> list[dict]:
"""Get list of available RTC datasets from OpenDataNI.
Returns:
List of dataset metadata dictionaries with keys:
- year: int
- id: str (package ID)
- title: str
- resources: List of resource dicts
Raises:
PSNIDataNotFoundError: If API request fails
"""
try:
resp = session.get(
f"{OPENDATANI_API}/package_search",
params={"q": "police recorded injury road traffic collision northern ireland", "rows": 50},
headers={"User-Agent": "bolster/1.0"},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
if not data.get("success"):
raise PSNIDataNotFoundError("OpenDataNI API returned unsuccessful response")
datasets = []
for pkg in data["result"]["results"]:
# Extract year from title or name
title = pkg.get("title", "")
name = pkg.get("name", "")
# Try to extract year (e.g., "...2024" or "...2013")
year = None
for part in title.split() + name.split("-"):
if part.isdigit() and 2010 <= int(part) <= 2030:
year = int(part)
break
if year:
datasets.append(
{
"year": year,
"id": pkg["id"],
"name": pkg["name"],
"title": title,
"resources": pkg.get("resources", []),
}
)
# Sort by year descending
datasets.sort(key=lambda x: x["year"], reverse=True)
return datasets
except Exception as e:
raise PSNIDataNotFoundError(f"Failed to fetch dataset list: {e}") from e
[docs]
def get_available_years() -> list[int]:
"""Get list of years with available RTC data.
Returns:
List of years (integers) in descending order
Example:
>>> years = get_available_years()
>>> len(years) > 0
True
"""
datasets = _get_available_datasets()
return [d["year"] for d in datasets]
def _get_resource_url(year: int, resource_type: Literal["collision", "casualty", "vehicle"]) -> str:
"""Get download URL for a specific resource type and year.
Args:
year: Data year
resource_type: One of 'collision', 'casualty', 'vehicle'
Returns:
Download URL for the CSV file
Raises:
PSNIDataNotFoundError: If resource not found
"""
datasets = _get_available_datasets()
# Find dataset for year
dataset = next((d for d in datasets if d["year"] == year), None)
if not dataset:
available = [d["year"] for d in datasets]
raise PSNIDataNotFoundError(f"No data available for year {year}. Available years: {available}")
# Find matching resource
search_terms = {
"collision": ["collision"],
"casualty": ["casualt"], # matches "casualty" and "casualties"
"vehicle": ["vehicle"],
}
for resource in dataset["resources"]:
name = resource.get("name", "").lower()
url = resource.get("url", "")
if resource.get("format", "").upper() == "CSV":
for term in search_terms[resource_type]:
if term in name or term in url.lower():
return url
raise PSNIDataNotFoundError(f"No {resource_type} CSV found for year {year}")
[docs]
def get_collisions(
year: int | None = None,
force_refresh: bool = False,
decode_values: bool = True,
) -> pd.DataFrame:
"""Get collision records for a specific year.
Each row represents a single road traffic collision with details about
date, time, location, road conditions, and severity.
Args:
year: Year to fetch (default: latest available)
force_refresh: If True, bypass cache and re-download
decode_values: If True, decode coded values to human-readable strings
Returns:
DataFrame with columns including:
- year: int
- ref: int (collision reference number)
- district: str (policing district name if decoded)
- district_code: str (original code)
- month: int
- day: int
- weekday: str (day name if decoded)
- hour: int
- vehicles: int (number of vehicles)
- casualties: int (number of casualties)
- light_conditions: str (if decoded)
- weather: str (if decoded)
- road_surface: str (if decoded)
- lgd_code: str (ONS LGD code)
- nuts3_code: str (NUTS3 region code)
Example:
>>> df = get_collisions(2024)
>>> 'severity' in df.columns or 'district' in df.columns
True
"""
if year is None:
years = get_available_years()
if not years:
raise PSNIDataNotFoundError("No RTC datasets available")
year = years[0]
logger.info(f"Using latest available year: {year}")
url = _get_resource_url(year, "collision")
file_path = download_file(url, cache_ttl_hours=24 * 30, force_refresh=force_refresh)
df = pd.read_csv(file_path)
# Standardize column names
column_mapping = {
"a_year": "year",
"a_ref": "ref",
"a_District": "district_code",
"a_type": "collision_type",
"a_veh": "vehicles",
"a_cas": "casualties",
"a_wkday": "weekday_code",
"a_day": "day",
"a_month": "month",
"a_hour": "hour",
"a_min": "minute",
"a_speed": "speed_limit",
"a_light": "light_code",
"a_weat": "weather_code",
"a_roadsc": "road_surface_code",
}
df = df.rename(columns={k: v for k, v in column_mapping.items() if k in df.columns})
# Add decoded values
if decode_values:
if "district_code" in df.columns:
df["district"] = df["district_code"].map(DISTRICT_CODES)
df["lgd_code"] = df["district"].apply(get_lgd_code)
df["nuts3_code"] = df["district"].apply(get_nuts3_code)
if "weekday_code" in df.columns:
df["weekday"] = df["weekday_code"].map(DAY_OF_WEEK_CODES)
if "light_code" in df.columns:
df["light_conditions"] = pd.to_numeric(df["light_code"], errors="coerce").map(LIGHT_CONDITIONS_CODES)
if "weather_code" in df.columns:
df["weather"] = pd.to_numeric(df["weather_code"], errors="coerce").map(WEATHER_CODES)
if "road_surface_code" in df.columns:
df["road_surface"] = pd.to_numeric(df["road_surface_code"], errors="coerce").map(ROAD_SURFACE_CODES)
# Create date column
if all(col in df.columns for col in ["year", "month", "day"]):
df["date"] = pd.to_datetime(
{"year": df["year"], "month": df["month"], "day": df["day"]},
errors="coerce",
)
logger.info(f"Loaded {len(df):,} collisions for {year}")
return df
[docs]
def get_casualties(
year: int | None = None,
force_refresh: bool = False,
decode_values: bool = True,
) -> pd.DataFrame:
"""Get casualty records for a specific year.
Each row represents a single casualty involved in a road traffic collision.
Casualties are linked to collisions via the 'ref' column.
Args:
year: Year to fetch (default: latest available)
force_refresh: If True, bypass cache and re-download
decode_values: If True, decode coded values to human-readable strings
Returns:
DataFrame with columns including:
- year: int
- ref: int (collision reference number for linking)
- vehicle_id: int
- casualty_id: int
- casualty_class: str (road user type if decoded)
- sex_code: int
- age_group: int
- severity: str ('Fatal', 'Serious', 'Slight' if decoded)
- severity_code: int (1=fatal, 2=serious, 3=slight)
Example:
>>> df = get_casualties(2024)
>>> 'severity' in df.columns
True
"""
if year is None:
years = get_available_years()
if not years:
raise PSNIDataNotFoundError("No RTC datasets available")
year = years[0]
logger.info(f"Using latest available year: {year}")
url = _get_resource_url(year, "casualty")
file_path = download_file(url, cache_ttl_hours=24 * 30, force_refresh=force_refresh)
df = pd.read_csv(file_path)
# Standardize column names
column_mapping = {
"a_year": "year",
"a_ref": "ref",
"v_id": "vehicle_id",
"c_id": "casualty_id",
"c_class": "casualty_class_code",
"c_sex": "sex_code",
"c_agegroup": "age_group",
"c_sever": "severity_code",
"c_vtype": "vehicle_type_code",
}
df = df.rename(columns={k: v for k, v in column_mapping.items() if k in df.columns})
# Add decoded values
if decode_values:
if "severity_code" in df.columns:
df["severity"] = df["severity_code"].map(SEVERITY_CODES)
if "casualty_class_code" in df.columns:
df["casualty_class"] = df["casualty_class_code"].map(CASUALTY_CLASS_CODES)
if "vehicle_type_code" in df.columns:
df["vehicle_type"] = pd.to_numeric(df["vehicle_type_code"], errors="coerce").map(VEHICLE_TYPE_CODES)
logger.info(f"Loaded {len(df):,} casualties for {year}")
return df
[docs]
def get_vehicles(
year: int | None = None,
force_refresh: bool = False,
decode_values: bool = True,
) -> pd.DataFrame:
"""Get vehicle records for a specific year.
Each row represents a single vehicle involved in a road traffic collision.
Vehicles are linked to collisions via the 'ref' column.
Args:
year: Year to fetch (default: latest available)
force_refresh: If True, bypass cache and re-download
decode_values: If True, decode coded values to human-readable strings
Returns:
DataFrame with columns including:
- year: int
- ref: int (collision reference number for linking)
- vehicle_id: int
- vehicle_type: str (if decoded)
- vehicle_type_code: int
- driver_sex_code: int
- driver_age_group: int
Example:
>>> df = get_vehicles(2024)
>>> 'vehicle_id' in df.columns
True
"""
if year is None:
years = get_available_years()
if not years:
raise PSNIDataNotFoundError("No RTC datasets available")
year = years[0]
logger.info(f"Using latest available year: {year}")
url = _get_resource_url(year, "vehicle")
file_path = download_file(url, cache_ttl_hours=24 * 30, force_refresh=force_refresh)
df = pd.read_csv(file_path)
# Standardize column names
column_mapping = {
"a_year": "year",
"a_ref": "ref",
"v_id": "vehicle_id",
"v_type": "vehicle_type_code",
"v_sex": "driver_sex_code",
"v_agegroup": "driver_age_group",
}
df = df.rename(columns={k: v for k, v in column_mapping.items() if k in df.columns})
# Add decoded values
if decode_values and "vehicle_type_code" in df.columns:
df["vehicle_type"] = df["vehicle_type_code"].map(VEHICLE_TYPE_CODES)
logger.info(f"Loaded {len(df):,} vehicles for {year}")
return df
[docs]
def get_casualties_with_collision_details(
year: int | None = None,
force_refresh: bool = False,
) -> pd.DataFrame:
"""Get casualty records merged with collision details.
Combines casualty data with collision information including date,
location, and road conditions.
Args:
year: Year to fetch (default: latest available)
force_refresh: If True, bypass cache and re-download
Returns:
DataFrame with casualty records enriched with collision details
Example:
>>> df = get_casualties_with_collision_details(2024)
>>> 'severity' in df.columns
True
"""
casualties = get_casualties(year, force_refresh=force_refresh)
collisions = get_collisions(year, force_refresh=force_refresh)
# Select key collision columns for merge
collision_cols = [
"ref",
"district",
"district_code",
"date",
"month",
"day",
"weekday",
"hour",
"light_conditions",
"weather",
"road_surface",
"lgd_code",
"nuts3_code",
]
collision_cols = [c for c in collision_cols if c in collisions.columns]
merged = casualties.merge(collisions[collision_cols], on="ref", how="left", suffixes=("", "_collision"))
logger.info(f"Merged {len(merged):,} casualty records with collision details")
return merged
[docs]
def get_annual_summary(
years: list[int] | None = None,
force_refresh: bool = False,
) -> pd.DataFrame:
"""Get annual summary statistics across multiple years.
Provides aggregated collision and casualty counts by year, useful for
trend analysis.
Args:
years: List of years to include (default: all available)
force_refresh: If True, bypass cache and re-download
Returns:
DataFrame with columns:
- year: int
- collisions: int (total collisions)
- casualties: int (total casualties)
- fatal: int (fatal casualties)
- serious: int (serious injuries)
- slight: int (slight injuries)
- fatalities_per_100_collisions: float
Example:
>>> summary = get_annual_summary()
>>> 'fatal' in summary.columns
True
"""
if years is None:
years = get_available_years()
summaries = []
for year in years:
try:
collisions = get_collisions(year, force_refresh=force_refresh)
casualties = get_casualties(year, force_refresh=force_refresh)
fatal = len(casualties[casualties["severity_code"] == 1])
serious = len(casualties[casualties["severity_code"] == 2])
slight = len(casualties[casualties["severity_code"] == 3])
summaries.append(
{
"year": year,
"collisions": len(collisions),
"casualties": len(casualties),
"fatal": fatal,
"serious": serious,
"slight": slight,
"fatalities_per_100_collisions": round(fatal / len(collisions) * 100, 2)
if len(collisions) > 0
else 0,
}
)
except PSNIDataNotFoundError as e:
logger.warning(f"Could not fetch data for {year}: {e}")
continue
df = pd.DataFrame(summaries)
df = df.sort_values("year").reset_index(drop=True)
logger.info(f"Generated annual summary for {len(df)} years")
return df
[docs]
def get_casualties_by_district(
year: int | None = None,
force_refresh: bool = False,
) -> pd.DataFrame:
"""Get casualty counts by policing district.
Args:
year: Year to fetch (default: latest available)
force_refresh: If True, bypass cache and re-download
Returns:
DataFrame with columns:
- district: str (policing district name)
- lgd_code: str (ONS LGD code)
- collisions: int
- casualties: int
- fatal: int
- serious: int
- slight: int
Example:
>>> by_district = get_casualties_by_district(2024)
>>> 'district' in by_district.columns
True
"""
df = get_casualties_with_collision_details(year, force_refresh=force_refresh)
# Aggregate by district
result = (
df.groupby(["district", "lgd_code"])
.agg(
casualties=("casualty_id", "count"),
fatal=("severity_code", lambda x: (x == 1).sum()),
serious=("severity_code", lambda x: (x == 2).sum()),
slight=("severity_code", lambda x: (x == 3).sum()),
)
.reset_index()
)
# Add collision count
collisions = get_collisions(year, force_refresh=force_refresh)
collision_counts = collisions.groupby("district").size().reset_index(name="collisions")
result = result.merge(collision_counts, on="district", how="left")
# Reorder columns
result = result[["district", "lgd_code", "collisions", "casualties", "fatal", "serious", "slight"]]
return result.sort_values("casualties", ascending=False).reset_index(drop=True)
[docs]
def get_casualties_by_road_user(
year: int | None = None,
force_refresh: bool = False,
) -> pd.DataFrame:
"""Get casualty counts by road user type.
Args:
year: Year to fetch (default: latest available)
force_refresh: If True, bypass cache and re-download
Returns:
DataFrame with columns:
- casualty_class: str (road user type)
- casualties: int
- fatal: int
- serious: int
- slight: int
- fatality_rate: float (fatal / total %)
Example:
>>> by_user = get_casualties_by_road_user(2024)
>>> 'casualty_class' in by_user.columns
True
"""
df = get_casualties(year, force_refresh=force_refresh)
result = (
df.groupby("casualty_class")
.agg(
casualties=("casualty_id", "count"),
fatal=("severity_code", lambda x: (x == 1).sum()),
serious=("severity_code", lambda x: (x == 2).sum()),
slight=("severity_code", lambda x: (x == 3).sum()),
)
.reset_index()
)
result["fatality_rate"] = (result["fatal"] / result["casualties"] * 100).round(2)
return result.sort_values("casualties", ascending=False).reset_index(drop=True)
[docs]
def validate_data(df: pd.DataFrame, data_type: Literal["collision", "casualty", "vehicle"]) -> bool:
"""Validate RTC data integrity.
Args:
df: DataFrame to validate
data_type: Type of data ('collision', 'casualty', or 'vehicle')
Returns:
True if validation passes
Raises:
PSNIValidationError: If validation fails
"""
if df.empty:
raise PSNIValidationError(f"Empty {data_type} DataFrame")
# Check for required columns based on type
required_cols = {
"collision": ["year", "ref"],
"casualty": ["year", "ref", "casualty_id"],
"vehicle": ["year", "ref", "vehicle_id"],
}
missing = set(required_cols[data_type]) - set(df.columns)
if missing:
raise PSNIValidationError(f"Missing required columns: {missing}")
# Check year range
years = df["year"].unique()
for year in years:
if not (2010 <= year <= datetime.now().year + 1):
raise PSNIValidationError(f"Invalid year value: {year}")
# Check for duplicates in key columns
if data_type == "collision":
if df.duplicated(subset=["year", "ref"]).any():
raise PSNIValidationError("Duplicate collision records found")
elif data_type == "casualty" and df.duplicated(subset=["year", "ref", "casualty_id"]).any():
raise PSNIValidationError("Duplicate casualty records found")
logger.info(f"Validation passed for {len(df):,} {data_type} records")
return True