"""Generic utility for extracting DataTables data from HTML pages.

Many Northern Ireland government statistics pages use R's flexdashboard/DT package
to embed DataTables widgets. The data is stored as column-transposed JSON inside
``<script type="application/json">`` blocks with a ``{"x": {"data": [...], ...}}``
structure, where ``x["data"]`` is a list of column arrays (not row arrays) and
``x["container"]`` holds the HTML table header with column names.

Example:
    >>> from bolster.utils.datatables import datatables_to_dataframe
    >>> payload = {
    ...     "data": [["A", "B"], [1, 2]],
    ...     "container": "<table><thead><tr><th>Name</th><th>Value</th></tr></thead></table>",
    ... }
    >>> df = datatables_to_dataframe(payload)
    >>> list(df.columns)
    ['Name', 'Value']
"""
import json
import logging
import pandas as pd
from bs4 import BeautifulSoup
from .web import session
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class DataTablesError(Exception):
    """Signal that a DataTables payload could not be fetched, located, or converted."""
def fetch_datatables_json(url: str, timeout: int = 30) -> dict:
    """Download an HTML page and return the DT widget payload embedded in it.

    The page is requested through the shared ``session`` and its text is then
    handed to ``_extract_datatables_payload``, which picks the largest
    ``<script type="application/json">`` block whose ``x.data`` value is a
    column-transposed list (a list of lists).

    Args:
        url: Address of the HTML page hosting the DataTables widget.
        timeout: Seconds allowed for the HTTP request.

    Returns:
        The ``x`` sub-dict of the selected widget payload. ``"data"`` (the
        list of column arrays) is always present; DT widgets normally also
        carry ``"container"`` (the HTML header).

    Raises:
        DataTablesError: If requesting the page raises (including a failed
            ``raise_for_status()``) or no DT payload is found in the page.

    Example:
        >>> from bolster.utils.datatables import DataTablesError
        >>> try:
        ...     fetch_datatables_json("https://example.com/data.html")
        ... except DataTablesError:
        ...     print("DataTablesError raised for invalid page")
        DataTablesError raised for invalid page

        A successful call returns a dict extracted from the page's DT widget.
        The shape mirrors what ``_extract_datatables_payload`` returns:

        >>> sample_html = (
        ...     '<script type="application/json">'
        ...     '{"x": {"data": [["Belfast", "Derry"], [1200, 800]],'
        ...     ' "container": "<thead><tr><th>City</th><th>Count</th></tr></thead>"}}'
        ...     "</script>"
        ... )
        >>> payload = _extract_datatables_payload(sample_html)
        >>> sorted(payload.keys())
        ['container', 'data']
        >>> len(payload["data"])
        2
        >>> payload["data"][0]
        ['Belfast', 'Derry']
    """
    try:
        page = session.get(url, timeout=timeout)
        page.raise_for_status()
    except Exception as exc:
        # Any failure while requesting the page is reported as one domain error.
        raise DataTablesError(f"Failed to fetch page {url}: {exc}") from exc
    else:
        # Read the body outside the guarded region, as extraction raises its
        # own DataTablesError with a more specific message.
        html = page.text
    return _extract_datatables_payload(html, url)
def _extract_datatables_payload(html: str, source_url: str = "") -> dict:
    """Locate the DT widget payload inside an HTML document.

    Every ``<script type="application/json">`` block is inspected. A block
    qualifies when it parses as a JSON object whose ``"x"`` entry is an object
    and whose ``x["data"]`` is a non-empty list with a list as first element.
    Among qualifying blocks, the one with the longest script text is chosen.

    Args:
        html: Full HTML page content.
        source_url: Where the HTML came from; only interpolated into error
            messages.

    Returns:
        The ``x`` sub-dict of the largest qualifying script block.

    Raises:
        DataTablesError: If the page has no ``application/json`` script
            blocks, or none of them holds a qualifying payload.
    """
    blocks = BeautifulSoup(html, "html.parser").find_all("script", type="application/json")
    if len(blocks) == 0:
        raise DataTablesError(f"No application/json script blocks found in {source_url}")

    best_size = -1
    best_payload = None
    for block in blocks:
        raw = block.string
        if not raw:
            continue
        try:
            document = json.loads(raw)
        except json.JSONDecodeError:
            # Unparseable JSON blocks are skipped rather than treated as fatal.
            continue
        widget = document.get("x") if isinstance(document, dict) else None
        if not isinstance(widget, dict):
            continue
        columns = widget.get("data")
        if not (isinstance(columns, list) and columns and isinstance(columns[0], list)):
            continue
        # Strict comparison keeps the earliest block when sizes tie.
        if len(raw) > best_size:
            best_size = len(raw)
            best_payload = widget

    if best_payload is None:
        raise DataTablesError(
            f"No DataTables column-transposed payload found in {source_url}. "
            "Expected a script block with x.data as a list of column arrays."
        )
    return best_payload
def _parse_column_headers(container_html: str) -> list[str]:
    """Read the column names out of a DT container's header HTML.

    Args:
        container_html: HTML fragment holding the table header ``<th>`` cells.

    Returns:
        The whitespace-stripped text of every ``<th>`` element, in document
        order.
    """
    parsed = BeautifulSoup(container_html, "html.parser")
    header_cells = parsed.find_all("th")
    return [cell.get_text(strip=True) for cell in header_cells]
def datatables_to_dataframe(payload: dict) -> pd.DataFrame:
    """Convert a DT widget payload into a row-oriented DataFrame.

    The ``payload["data"]`` field is a list of column arrays (column-transposed).
    This function turns it into a normal row-oriented DataFrame and labels the
    columns with the ``<th>`` names from ``payload["container"]`` when their
    count matches the number of data columns; otherwise positional names
    ``col_0``, ``col_1``, ... are used.

    Header names that repeat are kept as-is, so the resulting frame has
    duplicate column labels rather than silently losing the earlier columns.

    Args:
        payload: The ``x`` sub-dict from a DT widget JSON block, as returned by
            :func:`fetch_datatables_json`.

    Returns:
        DataFrame with one row per record and one column per column array.

    Raises:
        DataTablesError: If ``payload["data"]`` is missing, empty, contains a
            non-list column, or has columns of differing lengths.

    Example:
        >>> payload = {
        ...     "data": [["a", "b"], [1, 2]],
        ...     "container": "<table><thead><tr><th>Name</th><th>Value</th></tr></thead></table>",
        ... }
        >>> df = datatables_to_dataframe(payload)
        >>> list(df.columns)
        ['Name', 'Value']
        >>> len(df)
        2
    """
    col_arrays = payload.get("data")
    if not isinstance(col_arrays, list) or not col_arrays:
        raise DataTablesError("payload['data'] is missing or empty")
    # Check every column, not only the first, so a stray scalar surfaces as
    # DataTablesError instead of a TypeError from len() below.
    if not all(isinstance(col, list) for col in col_arrays):
        raise DataTablesError("payload['data'] must be a list of column arrays")

    n_cols = len(col_arrays)
    n_rows = len(col_arrays[0])

    # Validate all columns have the same length
    for i, col in enumerate(col_arrays):
        if len(col) != n_rows:
            raise DataTablesError(f"Column {i} has {len(col)} rows but column 0 has {n_rows} rows")

    # Build column names from container HTML if available
    container = payload.get("container", "")
    col_names: list[str] = _parse_column_headers(container) if container else []

    if len(col_names) != n_cols:
        if col_names:
            logger.warning(
                "Column header count (%d) does not match data column count (%d); falling back to positional names",
                len(col_names),
                n_cols,
            )
        col_names = [f"col_{i}" for i in range(n_cols)]

    # Key the columns by position first and label them afterwards: building a
    # dict keyed by header name would collapse columns whose names repeat.
    frame = pd.DataFrame(dict(enumerate(col_arrays)))
    frame.columns = col_names
    return frame
def get_column_headers_from_url(url: str, timeout: int = 30) -> list[str]:
    """Return the column header names of the DataTables widget on a page.

    A discovery convenience: fetches the widget payload and parses only its
    ``"container"`` header HTML.

    Args:
        url: URL of the HTML page.
        timeout: HTTP request timeout in seconds.

    Returns:
        Column header strings in order, or an empty list when the payload has
        no (or an empty) ``"container"`` entry.
    """
    container_html = fetch_datatables_json(url, timeout=timeout).get("container", "")
    if not container_html:
        return []
    return _parse_column_headers(container_html)