"""Northern Ireland Water Quality Data Integration.
Data Source: Northern Ireland Water provides public water quality data through the OpenDataNI
portal at https://admin.opendatani.gov.uk/. The service offers water quality test results from
customer tap supply points across Northern Ireland, including chemical analysis, pH levels,
hardness classifications, and safety parameters. Additionally, postcode-to-water-zone mapping
is available for geographic analysis.
Update Frequency: Water quality data is published annually, typically reflecting the previous
calendar year's test results. The dataset includes comprehensive testing from customer tap
supply points across all water zones in Northern Ireland. Postcode mapping data is updated
as required when zone boundaries change.
Example:
Access water quality data and zone information:
>>> from bolster.data_sources import ni_water
>>> quality_data = ni_water.get_water_quality()
>>> 'NI Hardness Classification' in quality_data.columns
True
>>> len(quality_data) > 0
True
The module provides utilities for analyzing water quality across Northern Ireland's supply zones,
with support for both current quality data and historical zone mapping.
"""
import csv
import logging
from urllib.error import HTTPError
import pandas as pd
from bolster import backoff
from bolster.utils.web import session
[docs]
logger = logging.getLogger(__name__)
# Legacy postcode to zone mapping (still functional)
[docs]
POSTCODE_DATASET_URL = "https://admin.opendatani.gov.uk/dataset/38a9a8f1-9346-41a2-8e5f-944d87d9caf2/resource/f2bc12c1-4277-4db5-8bd3-b7bb027cc401/download/postcode-v-zone-lookup-by-year.csv"
# Modern water quality data from OpenDataNI
[docs]
WATER_QUALITY_CSV_URL = "https://admin.opendatani.gov.uk/dataset/38a9a8f1-9346-41a2-8e5f-944d87d9caf2/resource/02d85526-c082-482c-b205-a318f97fd18d/download/2024-ni-water-customer-tap-supply-point-results.csv"
[docs]
T_HARDNESS = pd.CategoricalDtype(["Soft", "Moderately Soft", "Slightly Hard", "Moderately Hard"], ordered=True)
[docs]
INVALID_ZONE_IDENTIFIER = "No Zone Identified"
# Cache for water quality data to avoid repeated downloads
_water_quality_cache: pd.DataFrame | None = None
@backoff((HTTPError, RuntimeError))
[docs]
def get_water_quality_csv_data() -> pd.DataFrame:
"""Get the latest water quality CSV data from OpenDataNI.
This function downloads and caches the complete water quality dataset
which contains all results from customer tap supply points.
Returns:
pd.DataFrame: Complete water quality data with columns:
- Year: Sample year
- Sample Location: Location description
- Site Code: Unique site identifier
- Site Name: Human-readable site name
- Sample Id Text: Unique sample identifier
- Sample Date: Date of sample collection
- Postcode: Postcode for the sample location
- Parameter: Water quality parameter name (e.g., 'Total Hardness (mg/l)')
- PCV Limit: Prescribed Concentration Value limit
- Result: Numeric test result
- Report Value: Formatted result value
- Units: Unit of measurement
Raises:
HTTPError: If the CSV download fails
RuntimeError: If no data is found
"""
global _water_quality_cache
if _water_quality_cache is not None:
return _water_quality_cache
logger.info(f"Downloading water quality data from {WATER_QUALITY_CSV_URL}")
with session.get(WATER_QUALITY_CSV_URL, stream=True) as r:
r.raise_for_status()
_water_quality_cache = pd.read_csv(r.url)
if _water_quality_cache.empty:
raise RuntimeError("No water quality data found in CSV")
logger.info(f"Loaded {len(_water_quality_cache)} water quality records")
return _water_quality_cache
def _site_code_to_zone_code(site_code: str) -> str:
"""Convert OpenDataNI site code to legacy zone code format.
The new CSV data uses site codes like 'BALM' while the legacy API
used zone codes like 'ZS0101'. This function provides a mapping
between the two systems for backward compatibility.
Args:
site_code: Site code from the CSV data
Returns:
Legacy-style zone code
"""
# For now, we'll use the site code directly as the zone identifier
# This maintains the function signatures but uses the new data
return site_code
def _create_legacy_format_series(site_data: pd.DataFrame, zone_code: str) -> pd.Series:
"""Convert CSV format data to legacy API format for backward compatibility.
The original API returned a pandas Series with specific keys.
This function recreates that format using the new CSV data.
Args:
site_data: DataFrame containing all parameters for a specific site
zone_code: The zone/site identifier
Returns:
pd.Series in the legacy format
"""
# Create the legacy format series
result_data = {}
# Get the first row for site information
if not site_data.empty:
first_row = site_data.iloc[0]
result_data["Water Supply Zone"] = first_row.get("Site Name", f"Site {zone_code}")
# Map CSV parameters to legacy format
parameter_mapping = {
"Total hardness": "Total Hardness (mg/l)", # Note: CSV uses "Total hardness", legacy used "Total Hardness (mg/l)"
"Magnesium": "Magnesium (mg/l)",
"Potassium": "Potassium (mg/l)",
"Calcium": "Calcium (mg/l)",
}
# Extract parameter values
for _, row in site_data.iterrows():
parameter = row.get("Parameter", "")
if parameter in parameter_mapping:
legacy_key = parameter_mapping[parameter]
result_data[legacy_key] = row.get("Report Value", row.get("Result", ""))
# Calculate derived values if we have the basic measurements
try:
if "Total Hardness (mg/l)" in result_data:
# The CSV data appears to be in mg/l format, which we need to interpret
# Assuming this is already CaCO3 equivalent (which is standard for water hardness)
hardness_mg_l = float(result_data["Total Hardness (mg/l)"])
hardness_caco3 = hardness_mg_l # Assume it's already in CaCO3 equivalent
# Add the CaCO3 format that legacy API provided
result_data["Total Hardness (mg CaCO3/l)"] = str(hardness_caco3)
# Calculate degrees of hardness
result_data["Clark English Degrees"] = f"{hardness_caco3 / 14.3:.1f}"
result_data["French Degrees"] = f"{hardness_caco3 / 10.0:.1f}"
result_data["German Degrees"] = f"{hardness_caco3 / 17.8:.1f}"
# Classify hardness based on standard UK classifications
if hardness_caco3 <= 60:
hardness_class = "Soft"
dishwasher_setting = "1"
elif hardness_caco3 <= 120:
hardness_class = "Moderately Soft"
dishwasher_setting = "2"
elif hardness_caco3 <= 180:
hardness_class = "Slightly Hard"
dishwasher_setting = "3"
else:
hardness_class = "Moderately Hard"
dishwasher_setting = "4"
result_data["NI Hardness Classification"] = hardness_class
result_data["Dishwasher Setting"] = dishwasher_setting
except (ValueError, KeyError) as e:
# If we can't calculate derived values, just skip them
logger.debug(f"Could not calculate hardness classifications: {e}")
pass
return pd.Series(result_data, name=zone_code)
@backoff((HTTPError, RuntimeError))
[docs]
def get_postcode_to_water_supply_zone() -> dict[str, str]:
"""Using data from OpenDataNI to generate a map from NI Postcodes to Water Supply Zone.
>>> zones = get_postcode_to_water_supply_zone()
>>> len(zones)
49006
Zones is keyed off postcode to a Water Supply Zone
>>> zones['BT1 1AA']
'ZS0107'
There are much fewer zones than postcodes
>>> len(set(zones.values()))
65
And many postcodes that aren't associated with any zone
>>> len([k for k,v in zones.items() if v == INVALID_ZONE_IDENTIFIER])
97
"""
with session.get(POSTCODE_DATASET_URL, stream=True) as r:
lines = (line.decode("utf-8") for line in r.iter_lines())
reader = csv.DictReader(lines)
keys = reader.fieldnames[:2] # Take POSTCODE and first year
zones = dict([row[k] for k in keys] for row in reader)
if not zones:
raise RuntimeError("No data found")
return zones
[docs]
def get_water_quality_by_zone(zone_code: str, strict=False) -> pd.Series:
"""Get the latest Water Quality for a given Water Supply Zone.
Now uses modern OpenDataNI CSV data instead of the deprecated HTML API.
The zone_code can be either a legacy zone code (like 'ZS0101') or a
site code from the CSV data (like 'BALM').
Args:
zone_code: Water supply zone identifier or site code
strict: If True, raise ValueError for invalid zones
Returns:
pd.Series: Water quality data in legacy API format with indices like:
- Water Supply Zone: Human-readable zone name
- Total Hardness (mg/l): Hardness as mg/l
- Total Hardness (mg CaCO3/l): Hardness as mg CaCO3/l
- Clark English Degrees: English degrees of hardness
- French Degrees: French degrees of hardness
- German Degrees: German degrees of hardness
- NI Hardness Classification: Categorical hardness level
- Dishwasher Setting: Recommended dishwasher setting
Raises:
ValueError: If zone_code is invalid and strict=True
HTTPError: If CSV data cannot be downloaded
RuntimeError: If CSV data is empty
Example usage:
data = get_water_quality_by_zone('BALM') # Using CSV site code
print(data['Water Supply Zone'])
print(data['NI Hardness Classification'])
"""
try:
# Get the full CSV dataset
water_quality_df = get_water_quality_csv_data()
# Try to find data for this zone/site code
# First try as site code, then try as a potential zone mapping
site_data = water_quality_df[water_quality_df["Site Code"] == zone_code]
if site_data.empty:
# Try to find by site name containing the zone code
site_data = water_quality_df[water_quality_df["Site Name"].str.contains(zone_code, case=False, na=False)]
if site_data.empty:
# No data found for this zone
if strict:
raise ValueError(f"Potentially invalid Water Supply Zone {zone_code}")
logger.warning(f"Potentially invalid Water Supply Zone {zone_code}")
return pd.Series(name=zone_code)
# Convert to legacy format
return _create_legacy_format_series(site_data, zone_code)
except (HTTPError, RuntimeError) as err:
# Handle data source errors
if strict:
raise ValueError(f"Unable to retrieve data for Water Supply Zone {zone_code}") from err
logger.warning(f"Unable to retrieve data for Water Supply Zone {zone_code}: {err}")
return pd.Series(name=zone_code)
[docs]
def get_water_quality() -> pd.DataFrame:
"""Get a DataFrame of Water Quality Data from OpenDataNI.
This function now uses the modern CSV data source instead of the deprecated
HTML API. It returns water quality data for all available sites.
Returns:
pd.DataFrame: Water quality data with one row per site/zone.
Columns include hardness measurements, classifications, and
other water quality parameters. The 'NI Hardness Classification'
column uses categorical data type with proper ordering.
Raises:
HTTPError: If CSV data cannot be downloaded
RuntimeError: If CSV data is empty
Example usage:
df = get_water_quality()
print(df.shape) # Number of rows and columns
print(df['NI Hardness Classification'].value_counts(sort=False))
# Show available sites
print(df.index.tolist()) # Site codes
# Get hardness data
hardness_summary = df.groupby('NI Hardness Classification').size()
print(hardness_summary)
"""
try:
# Get the full CSV dataset
water_quality_df = get_water_quality_csv_data()
# Get unique site codes (equivalent to the old zone concept)
unique_sites = water_quality_df["Site Code"].unique()
logger.info(f"Processing water quality data for {len(unique_sites)} sites")
# Process each site to create legacy-format series
site_series_list = []
for site_code in unique_sites:
if pd.isna(site_code) or site_code == "":
continue
site_data = water_quality_df[water_quality_df["Site Code"] == site_code]
if not site_data.empty:
try:
series = _create_legacy_format_series(site_data, site_code)
if not series.empty:
site_series_list.append(series)
except Exception as e:
logger.warning(f"Error processing site {site_code}: {e}")
continue
if not site_series_list:
raise RuntimeError("No valid water quality data could be processed")
# Combine all series into a DataFrame
df = pd.DataFrame(site_series_list)
# Ensure NI Hardness Classification uses proper categorical type
if "NI Hardness Classification" in df.columns:
df = df.astype({"NI Hardness Classification": T_HARDNESS})
logger.info(f"Created water quality DataFrame with {len(df)} sites and {len(df.columns)} parameters")
return df
except Exception as e:
logger.error(f"Failed to get water quality data: {e}")
raise