"""Build PMTiles point tiles for the crime heatmap overlay. The output intentionally keeps point features rather than H3/grid aggregates so MapLibre can render a true client-side heatmap. Police.uk coordinates are published anonymous map points, not exact offence locations. """ from __future__ import annotations import argparse import json import shutil import subprocess import tempfile from pathlib import Path import polars as pl from pipeline.local_temp import local_tmp_dir from pipeline.transform.crime import find_street_crime_csvs def _latest_months(crime_dir: Path, month_count: int) -> list[str]: csvs, _ignored = find_street_crime_csvs(crime_dir) months = sorted({path.parent.name for path in csvs}) if not months: raise FileNotFoundError(f"No street crime CSVs found in {crime_dir}") return months[-month_count:] def _street_csvs_for_months(crime_dir: Path, months: set[str]) -> list[Path]: csvs, _ignored = find_street_crime_csvs(crime_dir) selected = [path for path in csvs if path.parent.name in months] if not selected: raise FileNotFoundError(f"No street crime CSVs found for {sorted(months)}") return selected def _require_tippecanoe() -> str: executable = shutil.which("tippecanoe") if executable is None: raise RuntimeError( "tippecanoe is required to build crime hotspot PMTiles. " "Install tippecanoe and rerun this target." ) return executable def _write_geojsonseq(csvs: list[Path], output_path: Path) -> int: df = ( pl.scan_csv( csvs, schema_overrides={ "Longitude": pl.Float64, "Latitude": pl.Float64, "Month": pl.Utf8, "Crime type": pl.Utf8, }, ignore_errors=True, ) .select( pl.col("Longitude").alias("lon"), pl.col("Latitude").alias("lat"), pl.col("Month").alias("month"), pl.col("Crime type").alias("crime_type"), ) .drop_nulls(["lon", "lat"]) .filter(pl.col("lon").is_between(-9.5, 5.0)) .filter(pl.col("lat").is_between(49.0, 57.0)) .collect(engine="streaming") ) with output_path.open("w") as file: for row in df.iter_rows(named=True): feature = { "type": "Feature", "geometry": { "type": "Point", "coordinates": [row["lon"], row["lat"]], }, "properties": { "count": 1, "weight": 1, "month": row["month"], "crime_type": row["crime_type"], }, } file.write(json.dumps(feature, separators=(",", ":")) + "\n") return df.height def build_crime_hotspot_tiles( crime_dir: Path, output_path: Path, months: int, min_zoom: int, max_zoom: int, ) -> None: tippecanoe = _require_tippecanoe() selected_months = set(_latest_months(crime_dir, months)) csvs = _street_csvs_for_months(crime_dir, selected_months) output_path.parent.mkdir(parents=True, exist_ok=True) with tempfile.TemporaryDirectory(dir=local_tmp_dir()) as tmp: ndjson_path = Path(tmp) / "crime_hotspots.geojsonseq" feature_count = _write_geojsonseq(csvs, ndjson_path) print( f"Writing {feature_count:,} approximate crime heatmap points " f"from {min(selected_months)} to {max(selected_months)}" ) subprocess.run( [ tippecanoe, "--force", "--output", str(output_path), "--layer", "crime_hotspots", "--minimum-zoom", str(min_zoom), "--maximum-zoom", str(max_zoom), "--drop-densest-as-needed", "--extend-zooms-if-still-dropping", str(ndjson_path), ], check=True, ) def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--input", type=Path, required=True, help="Crime CSV directory") parser.add_argument( "--output", type=Path, required=True, help="Output .pmtiles path" ) parser.add_argument( "--months", type=int, default=12, help="Latest complete months to include in the heatmap", ) parser.add_argument("--min-zoom", type=int, default=12) parser.add_argument("--max-zoom", type=int, default=16) args = parser.parse_args() build_crime_hotspot_tiles( args.input, args.output, args.months, args.min_zoom, args.max_zoom, ) if __name__ == "__main__": main()