"""Download police.uk crime archive ZIPs. The archive page lists rolling monthly snapshots. Newer snapshots overlap older ones. By default this downloader selects non-overlapping snapshots, extracts only street-crime CSVs, and removes each ZIP after extraction so disk use is bounded by extracted street CSVs plus one temporary archive. """ from __future__ import annotations import argparse import hashlib import html import json import re import shutil import sys import zipfile from dataclasses import asdict, dataclass from datetime import UTC, datetime from pathlib import Path, PurePosixPath from urllib.parse import urljoin import httpx from tqdm import tqdm ARCHIVE_URL = "https://data.police.uk/data/archive/" ARCHIVE_LINK_RE = re.compile( r'
\s*.*?' r'' r"(?P\s*\((?P[^)]+)\)\s*" r'

\s*(?P.*?)\s*

\s*' r'

(?P.*?)

', re.DOTALL, ) VALID_MD5_RE = re.compile(r"^[0-9a-fA-F]{32}$") MONTH_RE = re.compile(r"^\d{4}-\d{2}$") STREET_CRIME_CSV_RE = re.compile(r"^\d{4}-\d{2}-.+-street\.csv$") CONTAINED_RANGE_RE = re.compile( r"Contains data from (?P[A-Za-z]+) (?P\d{4}) " r"to (?P[A-Za-z]+) (?P\d{4})", re.IGNORECASE, ) MONTH_NAMES = { "jan": 1, "january": 1, "feb": 2, "february": 2, "mar": 3, "march": 3, "apr": 4, "april": 4, "may": 5, "jun": 6, "june": 6, "jul": 7, "july": 7, "aug": 8, "august": 8, "sep": 9, "sept": 9, "september": 9, "oct": 10, "october": 10, "nov": 11, "november": 11, "dec": 12, "december": 12, } @dataclass(frozen=True) class CrimeArchive: month: str label: str url: str filename: str size: str contained_range: str md5: str | None raw_md5: str def _clean_text(value: str) -> str: text = re.sub(r"<[^>]+>", " ", value) return re.sub(r"\s+", " ", html.unescape(text)).strip() def parse_archives(page_html: str, base_url: str = ARCHIVE_URL) -> list[CrimeArchive]: """Parse monthly crime archive links from the police.uk archive page.""" archives: list[CrimeArchive] = [] for match in ARCHIVE_LINK_RE.finditer(page_html): raw_md5 = _clean_text(match.group("md5")).lower() md5 = raw_md5 if VALID_MD5_RE.fullmatch(raw_md5) else None href = html.unescape(match.group("href")) archives.append( CrimeArchive( month=match.group("month"), label=_clean_text(match.group("label")), url=urljoin(base_url, href), filename=Path(href).name, size=_clean_text(match.group("size")), contained_range=_clean_text(match.group("contained_range")), md5=md5, raw_md5=raw_md5, ) ) return archives def fetch_archives(archive_url: str = ARCHIVE_URL) -> list[CrimeArchive]: """Fetch and parse the archive index.""" with httpx.Client( follow_redirects=True, timeout=httpx.Timeout(30.0, read=60.0), headers={"User-Agent": "perfect-postcode-data-pipeline/1.0"}, ) as client: response = client.get(archive_url) response.raise_for_status() archives = parse_archives(response.text, archive_url) if not archives: raise RuntimeError(f"No monthly archive ZIPs found at {archive_url}") return archives def filter_archives( archives: list[CrimeArchive], *, from_month: str | None = None, to_month: str | None = None, limit: int | None = None, ) -> list[CrimeArchive]: """Filter archives by inclusive YYYY-MM bounds while preserving page order.""" filtered = [ archive for archive in archives if (from_month is None or archive.month >= from_month) and (to_month is None or archive.month <= to_month) ] if limit is not None: filtered = filtered[:limit] return filtered def _month_to_index(month: str) -> int: year, month_num = (int(part) for part in month.split("-")) return year * 12 + month_num def _format_month(year: int, month_name: str) -> str | None: month_num = MONTH_NAMES.get(month_name.strip().lower()) if month_num is None: return None return f"{year:04d}-{month_num:02d}" def parse_contained_range(contained_range: str) -> tuple[str, str] | None: """Return inclusive YYYY-MM bounds from a police.uk contained-range label.""" match = CONTAINED_RANGE_RE.search(contained_range) if match is None: return None start = _format_month( int(match.group("start_year")), match.group("start_month") ) end = _format_month(int(match.group("end_year")), match.group("end_month")) if start is None or end is None: return None return start, end def select_coverage_archives(archives: list[CrimeArchive]) -> list[CrimeArchive]: """Select non-overlapping snapshots that still cover the available history. The source publishes rolling multi-year snapshots. Downloading every monthly snapshot mostly fetches duplicate data; for our aggregate LSOA counts we only need continuous month coverage. """ selected: list[CrimeArchive] = [] earliest_covered_start: int | None = None def sort_key(archive: CrimeArchive) -> int: parsed_range = parse_contained_range(archive.contained_range) if parsed_range is not None: return _month_to_index(parsed_range[1]) return _month_to_index(archive.month) for archive in sorted(archives, key=sort_key, reverse=True): parsed_range = parse_contained_range(archive.contained_range) if parsed_range is None: selected.append(archive) continue start, end = parsed_range start_index = _month_to_index(start) end_index = _month_to_index(end) if earliest_covered_start is None or end_index < earliest_covered_start: if ( earliest_covered_start is not None and end_index < earliest_covered_start - 1 ): print( "Warning: archive ranges are not adjacent; " f"coverage gap before {archive.filename}", file=sys.stderr, ) selected.append(archive) earliest_covered_start = start_index return selected def file_md5(path: Path) -> str: digest = hashlib.md5() with path.open("rb") as file: for chunk in iter(lambda: file.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def download_archive( archive: CrimeArchive, archive_dir: Path, *, verify: bool, force: bool, timeout: float, ) -> Path: """Download one archive ZIP, resuming an existing .part file when possible.""" dest = archive_dir / archive.filename partial = dest.with_suffix(dest.suffix + ".part") if force: dest.unlink(missing_ok=True) partial.unlink(missing_ok=True) if dest.exists(): if verify and archive.md5 is not None: actual_md5 = file_md5(dest) if actual_md5 == archive.md5: print(f"{archive.filename}: already downloaded") return dest print( f"{archive.filename}: checksum mismatch, downloading again", file=sys.stderr, ) dest.unlink() partial.unlink(missing_ok=True) else: print(f"{archive.filename}: already downloaded") return dest resume_from = partial.stat().st_size if partial.exists() else 0 headers = {"Range": f"bytes={resume_from}-"} if resume_from else {} with httpx.stream( "GET", archive.url, headers=headers, follow_redirects=True, timeout=httpx.Timeout(30.0, read=timeout), ) as response: if response.status_code == 206 and resume_from: mode = "ab" initial = resume_from else: response.raise_for_status() mode = "wb" initial = 0 total_header = int(response.headers.get("content-length", 0)) total = initial + total_header if total_header else None with ( partial.open(mode) as output, tqdm( total=total, initial=initial, unit="B", unit_scale=True, unit_divisor=1024, desc=archive.filename, ) as progress, ): for chunk in response.iter_bytes(chunk_size=1024 * 1024): output.write(chunk) progress.update(len(chunk)) partial.replace(dest) if verify and archive.md5 is not None: actual_md5 = file_md5(dest) if actual_md5 != archive.md5: dest.unlink(missing_ok=True) raise RuntimeError( f"{archive.filename}: MD5 mismatch: expected {archive.md5}, got {actual_md5}" ) return dest def _is_street_crime_csv(path: PurePosixPath | Path) -> bool: return STREET_CRIME_CSV_RE.fullmatch(path.name) is not None def _safe_csv_members( archive: zipfile.ZipFile, *, street_only: bool, ) -> list[tuple[zipfile.ZipInfo, PurePosixPath]]: members: list[tuple[zipfile.ZipInfo, PurePosixPath]] = [] for info in archive.infolist(): rel_path = PurePosixPath(info.filename) if ( info.is_dir() or rel_path.is_absolute() or ".." in rel_path.parts or rel_path.suffix.lower() != ".csv" ): continue if street_only and not _is_street_crime_csv(rel_path): continue members.append((info, rel_path)) return members def extract_csvs( zip_path: Path, output_dir: Path, *, overwrite: bool = False, street_only: bool = True, ) -> tuple[int, int]: """Extract CSVs from one ZIP. Returns (extracted, skipped).""" extracted = 0 skipped = 0 with zipfile.ZipFile(zip_path) as archive: for info, rel_path in _safe_csv_members(archive, street_only=street_only): dest = output_dir.joinpath(*rel_path.parts) if dest.exists() and not overwrite: skipped += 1 continue dest.parent.mkdir(parents=True, exist_ok=True) with archive.open(info) as source, dest.open("wb") as target: shutil.copyfileobj(source, target) extracted += 1 return extracted, skipped def _remove_path(path: Path) -> None: if not path.exists() and not path.is_symlink(): return if path.is_dir() and not path.is_symlink(): shutil.rmtree(path) else: path.unlink() def prepare_archive_dir(output_dir: Path, *, keep_archives: bool) -> Path: """Return the archive work dir, pruning retained ZIP caches by default.""" retained_archive_dir = output_dir / "_archives" temp_archive_dir = output_dir / "_download_tmp" if keep_archives: retained_archive_dir.mkdir(parents=True, exist_ok=True) return retained_archive_dir if retained_archive_dir.exists() or retained_archive_dir.is_symlink(): print(f"Removing retained ZIP cache: {retained_archive_dir}") _remove_path(retained_archive_dir) _remove_path(temp_archive_dir) temp_archive_dir.mkdir(parents=True, exist_ok=True) return temp_archive_dir def prune_unused_csvs(output_dir: Path) -> tuple[int, int]: """Remove extracted non-street CSVs left by older downloader versions.""" removed = 0 bytes_removed = 0 for path in output_dir.rglob("*.csv"): if _is_street_crime_csv(path): continue try: bytes_removed += path.stat().st_size except OSError: pass path.unlink() removed += 1 return removed, bytes_removed def write_manifest( output_dir: Path, archive_url: str, archives: list[CrimeArchive], *, available_archive_count: int, archive_strategy: str, keep_archives: bool, street_only: bool, ) -> None: manifest = { "source": archive_url, "fetched_at": datetime.now(UTC).isoformat(), "available_archive_count": available_archive_count, "archive_strategy": archive_strategy, "keep_archives": keep_archives, "street_only": street_only, "archives": [asdict(archive) for archive in archives], } path = output_dir / "archive_manifest.json" path.write_text(json.dumps(manifest, indent=2) + "\n") def _month_arg(value: str) -> str: if not MONTH_RE.fullmatch(value): raise argparse.ArgumentTypeError("month must be in YYYY-MM format") return value def main() -> None: parser = argparse.ArgumentParser( description="Download police.uk crime archives needed for street-crime aggregates" ) parser.add_argument( "--output", type=Path, required=True, help="Directory for extracted street-crime CSVs; ZIPs are temporary unless --keep-archives is set", ) parser.add_argument( "--archive-url", default=ARCHIVE_URL, help=f"Archive index URL (default: {ARCHIVE_URL})", ) parser.add_argument( "--from-month", type=_month_arg, help="Only download archives from this YYYY-MM onwards", ) parser.add_argument( "--to-month", type=_month_arg, help="Only download archives up to this YYYY-MM", ) parser.add_argument( "--limit", type=int, help="Download at most this many archives after filtering", ) parser.add_argument( "--archive-strategy", choices=("coverage", "all"), default="coverage", help=( "coverage selects non-overlapping snapshots for continuous month " "coverage; all downloads every matching monthly snapshot" ), ) parser.add_argument( "--list", action="store_true", help="Print the archive URLs that would be downloaded and exit", ) parser.add_argument( "--no-extract", dest="extract", action="store_false", help="Download ZIPs only; do not extract CSVs", ) parser.add_argument( "--extract-all-csvs", action="store_true", help="Extract outcomes and stop/search CSVs as well as street-crime CSVs", ) parser.add_argument( "--keep-archives", action="store_true", help="Retain downloaded ZIPs under _archives/ instead of deleting them after extraction", ) parser.add_argument( "--keep-unused-csvs", action="store_true", help="Do not prune non-street CSVs left by older downloader runs", ) parser.add_argument( "--overwrite-extracted", action="store_true", help="Overwrite CSVs when extracting overlapping archive snapshots", ) parser.add_argument( "--no-verify", dest="verify", action="store_false", help="Skip MD5 verification", ) parser.add_argument( "--force", action="store_true", help="Redownload archives even if ZIP files already exist", ) parser.add_argument( "--timeout", type=float, default=600.0, help="Per-read timeout in seconds for large ZIP downloads", ) args = parser.parse_args() if not args.extract and not args.keep_archives: raise SystemExit("--no-extract requires --keep-archives") print("Fetching police.uk archive index...") available_archives = filter_archives( fetch_archives(args.archive_url), from_month=args.from_month, to_month=args.to_month, limit=args.limit, ) archives = ( select_coverage_archives(available_archives) if args.archive_strategy == "coverage" else available_archives ) if not archives: raise SystemExit("No archives matched the requested filters") bad_md5 = [ archive.filename for archive in archives if archive.raw_md5 and not archive.md5 ] if bad_md5: print( "Warning: ignoring malformed MD5 values for " + ", ".join(bad_md5[:5]) + ("..." if len(bad_md5) > 5 else ""), file=sys.stderr, ) print( f"Selected {len(archives)} of {len(available_archives)} matching monthly " f"archive ZIPs using {args.archive_strategy!r} strategy" ) if args.list: for archive in archives: print(f"{archive.month}\t{archive.url}\t{archive.raw_md5}") return args.output.mkdir(parents=True, exist_ok=True) archive_dir = prepare_archive_dir(args.output, keep_archives=args.keep_archives) if not args.keep_unused_csvs: removed, bytes_removed = prune_unused_csvs(args.output) if removed: print( f"Removed {removed} unused non-street CSVs " f"({bytes_removed / 1024 / 1024:.1f} MiB)" ) street_only = not args.extract_all_csvs write_manifest( args.output, args.archive_url, archives, available_archive_count=len(available_archives), archive_strategy=args.archive_strategy, keep_archives=args.keep_archives, street_only=street_only, ) total_extracted = 0 total_skipped = 0 for index, archive in enumerate(archives, start=1): print(f"[{index}/{len(archives)}] {archive.label} ({archive.size})") zip_path = download_archive( archive, archive_dir, verify=args.verify, force=args.force, timeout=args.timeout, ) if args.extract: extracted, skipped = extract_csvs( zip_path, args.output, overwrite=args.overwrite_extracted, street_only=street_only, ) total_extracted += extracted total_skipped += skipped print( f"{archive.filename}: extracted {extracted} CSVs" + (f", skipped {skipped} existing CSVs" if skipped else "") ) if not args.keep_archives: zip_path.unlink(missing_ok=True) if args.extract: if not args.keep_archives: _remove_path(archive_dir) print( ( f"Done. ZIPs saved in {archive_dir}; " if args.keep_archives else "Done. ZIPs were temporary; " ) + f"extracted {total_extracted} CSVs" + (f" and skipped {total_skipped} existing CSVs" if total_skipped else "") + "." ) else: print(f"Done. ZIPs saved in {archive_dir}.") if __name__ == "__main__": main()