# Source code for bolster.utils.rss

"""RSS Feed parsing utilities for bolster.

This module provides utilities for parsing and working with RSS/Atom feeds,
with a focus on government statistics and research publications.
"""

import contextlib
import logging
from dataclasses import dataclass
from datetime import datetime

import feedparser
from dateutil import parser as date_parser

from .web import session

# Module-level logger, named after this module so applications can configure
# logging for the RSS utilities independently of the rest of the package.
logger = logging.getLogger(__name__)
@dataclass
[docs] class FeedEntry: """Represents a single entry from an RSS/Atom feed."""
[docs] title: str
[docs] published: datetime | None = None
[docs] updated: datetime | None = None
[docs] summary: str | None = None
[docs] author: str | None = None
[docs] categories: list[str] = None
[docs] content: str | None = None
[docs] id: str | None = None
[docs] def __post_init__(self): """Initialize empty lists for mutable default arguments.""" if self.categories is None: self.categories = []
[docs] def to_dict(self) -> dict: """Convert entry to dictionary representation.""" return { "title": self.title, "link": self.link, "published": self.published.isoformat() if self.published else None, "updated": self.updated.isoformat() if self.updated else None, "summary": self.summary, "author": self.author, "categories": self.categories, "content": self.content, "id": self.id, }
@dataclass
[docs] class Feed: """Represents a parsed RSS/Atom feed."""
[docs] title: str
[docs] description: str | None = None
[docs] entries: list[FeedEntry] = None
[docs] language: str | None = None
[docs] updated: datetime | None = None
[docs] def __post_init__(self): """Initialize empty lists for mutable default arguments.""" if self.entries is None: self.entries = []
[docs] def to_dict(self) -> dict: """Convert feed to dictionary representation.""" return { "title": self.title, "link": self.link, "description": self.description, "language": self.language, "updated": self.updated.isoformat() if self.updated else None, "entries": [entry.to_dict() for entry in self.entries], }
[docs] def parse_date(date_str: str | None) -> datetime | None: """Parse a date string into a datetime object. Args: date_str: Date string in various formats Returns: Parsed datetime object or None if parsing fails """ if not date_str: return None try: return date_parser.parse(date_str) except (ValueError, TypeError) as e: logger.warning(f"Failed to parse date '{date_str}': {e}") return None
def _entry_datetime(entry: dict, key: str) -> datetime | None:
    """Read the ``key`` timestamp (``"updated"`` or ``"published"``) from an entry.

    The lookups use the unbound ``dict`` methods so the underlying mapping is
    read directly. Both ``.get()`` and attribute access on a FeedParserDict go
    through its ``__getitem__``, which applies the deprecated
    updated -> published fallback and emits a DeprecationWarning
    (feedparser issue #310).

    Args:
        entry: feedparser entry (a ``dict`` subclass).
        key: Name of the raw date field; its parsed struct lives at ``<key>_parsed``.

    Returns:
        The datetime parsed from the raw string, else one built from the first
        six fields of the ``<key>_parsed`` struct, else ``None``.
    """
    value = None
    if dict.__contains__(entry, key):
        value = parse_date(dict.__getitem__(entry, key))

    # Fall back to the pre-parsed struct both when the raw string is absent and
    # when it is present but could not be parsed.
    parsed_key = f"{key}_parsed"
    if value is None and dict.__contains__(entry, parsed_key):
        struct = dict.__getitem__(entry, parsed_key)
        if struct:
            with contextlib.suppress(TypeError, ValueError):
                value = datetime(*struct[:6])
    return value


def parse_feed_entry(entry: feedparser.FeedParserDict) -> FeedEntry:
    """Parse a feedparser entry into a FeedEntry object.

    Args:
        entry: feedparser entry dictionary

    Returns:
        FeedEntry object. ``title`` defaults to ``"No title"`` and ``link`` to
        an empty string when the entry lacks them; ``published`` falls back to
        ``updated`` when no publication date is available.
    """
    # Extract categories, skipping tags without a usable term
    categories = []
    if hasattr(entry, "tags"):
        categories = [tag.get("term", "") for tag in entry.tags if tag.get("term")]

    # Extract dates via direct dict access (see _entry_datetime)
    updated = _entry_datetime(entry, "updated")
    published = _entry_datetime(entry, "published")

    # Fall back to updated if published is not available
    if published is None and updated is not None:
        published = updated

    # Extract content: for a list, take the "value" of its first item
    content = None
    if hasattr(entry, "content") and entry.content:
        content = entry.content[0].get("value", "") if isinstance(entry.content, list) else entry.content

    # Extract summary
    summary = entry.get("summary", None)

    # Extract author, preferring the plain attribute over author_detail
    author = None
    if hasattr(entry, "author"):
        author = entry.author
    elif hasattr(entry, "author_detail") and entry.author_detail:
        author = entry.author_detail.get("name", None)

    return FeedEntry(
        title=entry.get("title", "No title"),
        link=entry.get("link", ""),
        published=published,
        updated=updated,
        summary=summary,
        author=author,
        categories=categories,
        content=content,
        id=entry.get("id", None),
    )
def parse_rss_feed(feed_url: str, timeout: int = 30) -> Feed:
    """Download an RSS or Atom feed and convert it into a Feed object.

    Args:
        feed_url: URL of the RSS/Atom feed
        timeout: Request timeout in seconds (default: 30)

    Returns:
        Feed object containing parsed feed data

    Raises:
        Exception: If the feed cannot be fetched (the original error is
            logged and re-raised unchanged)
        ValueError: If the feed is flagged as malformed and has no entries

    Example:
        >>> feed = parse_rss_feed(
        ...     "https://www.gov.uk/search/research-and-statistics.atom?"
        ...     "content_store_document_type=all_research_and_statistics&"
        ...     "organisations%5B%5D=northern-ireland-statistics-and-research-agency"
        ... )
        >>> feed.title
        'Research and statistics from Northern Ireland Statistics and Research Agency (NISRA)'
        >>> sorted(feed.__dataclass_fields__)
        ['description', 'entries', 'language', 'link', 'title', 'updated']
        >>> len(feed.entries) > 0
        True
        >>> entry = feed.entries[0]
        >>> sorted(entry.__dataclass_fields__)
        ['author', 'categories', 'content', 'id', 'link', 'published', 'summary', 'title', 'updated']
        >>> isinstance(entry.title, str) and isinstance(entry.link, str)
        True
        >>> entry.link.startswith("http")
        True
        >>> from datetime import datetime
        >>> isinstance(entry.published, datetime)
        True
    """
    # Step 1: download the raw document
    try:
        response = session.get(feed_url, timeout=timeout)
        response.raise_for_status()
    except Exception as exc:
        logger.error(f"Failed to fetch feed from {feed_url}: {exc}")
        raise

    # Step 2: hand the bytes to feedparser
    document = feedparser.parse(response.content)

    # A malformed document is only fatal when nothing usable came out of it
    if document.bozo and not document.entries:
        message = f"Failed to parse feed from {feed_url}"
        if hasattr(document, "bozo_exception"):
            message = message + f": {document.bozo_exception}"
        logger.error(message)
        raise ValueError(message)

    # Step 3: feed-level metadata
    meta = document.get("feed", {})

    # Read the feed timestamp straight from the underlying dict to bypass
    # feedparser's deprecated fallback (issue #310)
    feed_updated = None
    if isinstance(meta, dict):
        if dict.__contains__(meta, "updated"):
            feed_updated = parse_date(dict.__getitem__(meta, "updated"))
        elif dict.__contains__(meta, "updated_parsed"):
            stamp = dict.__getitem__(meta, "updated_parsed")
            if stamp:
                try:
                    feed_updated = datetime(*stamp[:6])
                except (TypeError, ValueError):
                    pass

    # Step 4: convert every entry
    converted = []
    for raw_entry in document.entries:
        converted.append(parse_feed_entry(raw_entry))

    return Feed(
        title=meta.get("title", "Unknown Feed"),
        link=meta.get("link", feed_url),
        description=meta.get("description", None),
        language=meta.get("language", None),
        updated=feed_updated,
        entries=converted,
    )
[docs] def filter_entries( entries: list[FeedEntry], title_contains: str | None = None, category: str | None = None, after_date: datetime | str | None = None, before_date: datetime | str | None = None, ) -> list[FeedEntry]: """Filter feed entries based on various criteria. Args: entries: List of FeedEntry objects to filter title_contains: Filter entries whose title contains this string (case-insensitive) category: Filter entries that have this category after_date: Filter entries published after this date before_date: Filter entries published before this date Returns: Filtered list of FeedEntry objects Example: >>> from bolster.utils.rss import FeedEntry, filter_entries >>> from datetime import datetime >>> entries = [ ... FeedEntry("Births Statistics April 2024", "http://example.com/1", published=datetime(2024, 4, 1)), ... FeedEntry("Deaths Statistics April 2024", "http://example.com/2", published=datetime(2024, 4, 2)), ... FeedEntry("Old Statistics 2023", "http://example.com/3", published=datetime(2023, 6, 1)), ... ] >>> recent = filter_entries(entries, title_contains="births", after_date="2024-01-01") >>> [e.title for e in recent] ['Births Statistics April 2024'] """ filtered = entries # Filter by title if title_contains: title_lower = title_contains.lower() filtered = [e for e in filtered if title_lower in e.title.lower()] # Filter by category if category: filtered = [e for e in filtered if category in e.categories] # Filter by date range if after_date: if isinstance(after_date, str): after_date = parse_date(after_date) if after_date: filtered = [e for e in filtered if e.published and e.published >= after_date] if before_date: if isinstance(before_date, str): before_date = parse_date(before_date) if before_date: filtered = [e for e in filtered if e.published and e.published <= before_date] return filtered
def get_nisra_statistics_feed(order: str = "recent", timeout: int = 30, limit: int | None = None) -> Feed:
    """Get the NISRA statistics feed from GOV.UK.

    The GOV.UK Atom feed returns 20 entries per page. When limit exceeds 20,
    multiple pages are fetched automatically.

    Pagination is best-effort: if a later page cannot be fetched or parsed, a
    warning is logged and the entries collected so far are returned. A failure
    on the first page is not caught and propagates to the caller.

    Args:
        order: Sort order - 'oldest' for oldest first; any other value leaves
            the feed in its default (newest first) order
        timeout: Request timeout in seconds, applied to each page request
        limit: Maximum number of entries to return (None = first page only, i.e. 20)

    Returns:
        Feed object with NISRA statistics

    Example:
        >>> feed = get_nisra_statistics_feed()
        >>> feed100 = get_nisra_statistics_feed(limit=100)
    """
    order_param = "&order=release-date-oldest" if order == "oldest" else ""
    base_url = (
        "https://www.gov.uk/search/research-and-statistics.atom?"
        "content_store_document_type=all_research_and_statistics&"
        f"organisations%5B%5D=northern-ireland-statistics-and-research-agency{order_param}"
    )

    first_page = parse_rss_feed(base_url, timeout=timeout)

    # The first page alone is enough: trim it in place and return it.
    if limit is None or limit <= len(first_page.entries):
        first_page.entries = first_page.entries[:limit]
        return first_page

    # Paginate until we have enough entries or run out of pages
    all_entries = list(first_page.entries)
    page = 2
    while len(all_entries) < limit:
        paged_url = f"{base_url}&page={page}"
        try:
            next_page = parse_rss_feed(paged_url, timeout=timeout)
        except Exception as exc:
            # Keep what was already collected, but leave a trace of why the
            # result may be shorter than requested.
            logger.warning("Stopping NISRA feed pagination at page %s: %s", page, exc)
            break
        if not next_page.entries:
            break
        all_entries.extend(next_page.entries)
        page += 1

    # Metadata comes from the first page; only the entry list is extended.
    return Feed(
        title=first_page.title,
        link=first_page.link,
        description=first_page.description,
        language=first_page.language,
        updated=first_page.updated,
        entries=all_entries[:limit],
    )