Fix crime & add actual listings
Some checks failed
CI / Check (push) Failing after 4m1s
Build and publish Docker image / build-and-push (push) Failing after 4m10s

This commit is contained in:
Andras Schmelczer 2026-05-17 11:12:25 +01:00
parent 017902b8e6
commit ebe7bbb51d
34 changed files with 2014 additions and 172754 deletions

View file

@ -1,7 +1,9 @@
"""Download police.uk crime archive ZIPs.
The archive page lists rolling monthly snapshots. Newer snapshots overlap older
ones, so extraction keeps files already written by newer archives.
ones. By default this downloader selects non-overlapping snapshots, extracts
only street-crime CSVs, and removes each ZIP after extraction so disk use is
bounded by extracted street CSVs plus one temporary archive.
"""
from __future__ import annotations
@ -33,6 +35,38 @@ ARCHIVE_LINK_RE = re.compile(
)
VALID_MD5_RE = re.compile(r"^[0-9a-fA-F]{32}$")
MONTH_RE = re.compile(r"^\d{4}-\d{2}$")
# File names of the per-month street-crime CSVs, e.g. "2023-01-city-street.csv".
# Applied to the bare file name (not the full path) by _is_street_crime_csv.
STREET_CRIME_CSV_RE = re.compile(r"^\d{4}-\d{2}-.+-street\.csv$")
# Extracts the first/last month of a snapshot from a label such as
# "Contains data from Apr 2023 to Mar 2026". Month names are captured as plain
# letters and resolved through MONTH_NAMES; matching is case-insensitive.
CONTAINED_RANGE_RE = re.compile(
r"Contains data from (?P<start_month>[A-Za-z]+) (?P<start_year>\d{4}) "
r"to (?P<end_month>[A-Za-z]+) (?P<end_year>\d{4})",
re.IGNORECASE,
)
# English month names and common abbreviations -> calendar month number (1-12).
# Keys are lower-case; callers normalise with strip().lower() before lookup.
MONTH_NAMES = {
"jan": 1,
"january": 1,
"feb": 2,
"february": 2,
"mar": 3,
"march": 3,
"apr": 4,
"april": 4,
"may": 5,
"jun": 6,
"june": 6,
"jul": 7,
"july": 7,
"aug": 8,
"august": 8,
# Both "sep" and "sept" are accepted as abbreviations for September.
"sep": 9,
"sept": 9,
"september": 9,
"oct": 10,
"october": 10,
"nov": 11,
"november": 11,
"dec": 12,
"december": 12,
}
@dataclass(frozen=True)
@ -110,6 +144,74 @@ def filter_archives(
return filtered
def _month_to_index(month: str) -> int:
year, month_num = (int(part) for part in month.split("-"))
return year * 12 + month_num
def _format_month(year: int, month_name: str) -> str | None:
    """Render a year and an English month name as ``YYYY-MM``.

    The name is trimmed and lower-cased before being looked up in
    ``MONTH_NAMES``. Returns ``None`` when the name is not a known month.
    """
    normalized = month_name.strip().lower()
    if normalized not in MONTH_NAMES:
        return None
    return f"{year:04d}-{MONTH_NAMES[normalized]:02d}"
def parse_contained_range(contained_range: str) -> tuple[str, str] | None:
    """Return inclusive ``(start, end)`` YYYY-MM bounds from a range label.

    The label is searched with ``CONTAINED_RANGE_RE`` (for example
    "Contains data from Apr 2023 to Mar 2026"). ``None`` is returned when the
    label does not match or when either month name is not recognised.
    """
    found = CONTAINED_RANGE_RE.search(contained_range)
    if found is None:
        return None

    fields = found.groupdict()
    start = _format_month(int(fields["start_year"]), fields["start_month"])
    end = _format_month(int(fields["end_year"]), fields["end_month"])

    if start is not None and end is not None:
        return start, end
    return None
def select_coverage_archives(archives: list[CrimeArchive]) -> list[CrimeArchive]:
    """Select a small set of snapshots that still covers the available history.

    The source publishes rolling multi-year snapshots. Downloading every
    monthly snapshot mostly fetches duplicate data; for aggregate counts only
    continuous month coverage is needed.

    Snapshots are walked newest-first (by the end month of their contained
    range, falling back to ``archive.month`` when the range cannot be parsed):

    * The newest snapshot is always taken.
    * A snapshot that ends before the months already covered is taken next,
      which yields a chain of non-overlapping snapshots when the source
      offers exactly adjacent ranges.
    * A snapshot that overlaps the covered months but starts earlier is
      remembered as a "bridge". It is only used when taking the next
      non-overlapping snapshot would leave a hole, or when the list runs out
      before the bridge's earlier months were covered. Without this, history
      that is only available through an overlapping snapshot would be
      silently dropped.
    * Snapshots whose range cannot be parsed are always kept, because their
      coverage cannot be reasoned about.

    A warning is printed to stderr when a gap remains that no snapshot can
    fill.

    Args:
        archives: Candidate snapshots, in any order.

    Returns:
        The selected snapshots, newest first.
    """

    def month_span(archive: CrimeArchive) -> tuple[int, int] | None:
        # Inclusive (start, end) month indexes, or None for unparseable labels.
        parsed_range = parse_contained_range(archive.contained_range)
        if parsed_range is None:
            return None
        start, end = parsed_range
        return _month_to_index(start), _month_to_index(end)

    def newest_first_key(entry: tuple[CrimeArchive, tuple[int, int] | None]) -> int:
        archive, span = entry
        if span is not None:
            return span[1]
        return _month_to_index(archive.month)

    # Parse each label once and keep it next to its archive.
    ordered = sorted(
        ((archive, month_span(archive)) for archive in archives),
        key=newest_first_key,
        reverse=True,
    )

    chosen: set[int] = set()  # positions in ``ordered`` that were selected
    earliest_covered_start: int | None = None
    # (start_index, position) of the overlapping snapshot that reaches furthest
    # back beyond the covered months; invariant: start < earliest_covered_start.
    bridge: tuple[int, int] | None = None

    for position, (archive, span) in enumerate(ordered):
        if span is None:
            chosen.add(position)
            continue
        start_index, end_index = span

        if earliest_covered_start is None:
            chosen.add(position)
            earliest_covered_start = start_index
            continue

        # Taking this snapshot would leave a hole; close as much of it as
        # possible with the remembered overlapping snapshot first.
        if bridge is not None and end_index < earliest_covered_start - 1:
            bridge_start, bridge_position = bridge
            chosen.add(bridge_position)
            earliest_covered_start = bridge_start
            bridge = None

        if end_index >= earliest_covered_start:
            # Overlaps months already covered: only useful as a bridge.
            if start_index < earliest_covered_start and (
                bridge is None or start_index < bridge[0]
            ):
                bridge = (start_index, position)
            continue

        if end_index < earliest_covered_start - 1:
            print(
                "Warning: archive ranges are not adjacent; "
                f"coverage gap before {archive.filename}",
                file=sys.stderr,
            )
        chosen.add(position)
        earliest_covered_start = start_index
        if bridge is not None and bridge[0] >= earliest_covered_start:
            # The new selection already reaches at least as far back.
            bridge = None

    # The list ran out while an overlapping snapshot still held older months.
    if bridge is not None:
        chosen.add(bridge[1])

    # Emit newest-first so newer snapshots are extracted before older ones.
    return [
        archive
        for position, (archive, _span) in enumerate(ordered)
        if position in chosen
    ]
def file_md5(path: Path) -> str:
digest = hashlib.md5()
with path.open("rb") as file:
@ -198,8 +300,14 @@ def download_archive(
return dest
def _is_street_crime_csv(path: PurePosixPath | Path) -> bool:
    """Report whether the path's file name matches ``STREET_CRIME_CSV_RE``.

    Only the final path component is inspected (for example
    ``2023-01-city-street.csv``); parent directories are ignored.
    """
    return bool(STREET_CRIME_CSV_RE.fullmatch(path.name))
def _safe_csv_members(
archive: zipfile.ZipFile,
*,
street_only: bool,
) -> list[tuple[zipfile.ZipInfo, PurePosixPath]]:
members: list[tuple[zipfile.ZipInfo, PurePosixPath]] = []
for info in archive.infolist():
@ -211,6 +319,8 @@ def _safe_csv_members(
or rel_path.suffix.lower() != ".csv"
):
continue
if street_only and not _is_street_crime_csv(rel_path):
continue
members.append((info, rel_path))
return members
@ -220,13 +330,14 @@ def extract_csvs(
output_dir: Path,
*,
overwrite: bool = False,
street_only: bool = True,
) -> tuple[int, int]:
"""Extract CSVs from one ZIP. Returns (extracted, skipped)."""
extracted = 0
skipped = 0
with zipfile.ZipFile(zip_path) as archive:
for info, rel_path in _safe_csv_members(archive):
for info, rel_path in _safe_csv_members(archive, street_only=street_only):
dest = output_dir.joinpath(*rel_path.parts)
if dest.exists() and not overwrite:
skipped += 1
@ -240,12 +351,65 @@ def extract_csvs(
return extracted, skipped
def _remove_path(path: Path) -> None:
if not path.exists() and not path.is_symlink():
return
if path.is_dir() and not path.is_symlink():
shutil.rmtree(path)
else:
path.unlink()
def prepare_archive_dir(output_dir: Path, *, keep_archives: bool) -> Path:
    """Return the directory ZIPs should be downloaded into.

    With ``keep_archives`` the persistent ``_archives`` directory is created
    if needed and returned. Otherwise any existing ``_archives`` cache is
    deleted (with a notice on stdout) and a fresh, empty ``_download_tmp``
    directory is returned.
    """
    retained_archive_dir = output_dir / "_archives"
    if keep_archives:
        retained_archive_dir.mkdir(parents=True, exist_ok=True)
        return retained_archive_dir

    stale_cache_present = (
        retained_archive_dir.exists() or retained_archive_dir.is_symlink()
    )
    if stale_cache_present:
        print(f"Removing retained ZIP cache: {retained_archive_dir}")
        _remove_path(retained_archive_dir)

    # Start from a clean scratch directory so leftovers from an interrupted
    # run are not reused.
    temp_archive_dir = output_dir / "_download_tmp"
    _remove_path(temp_archive_dir)
    temp_archive_dir.mkdir(parents=True, exist_ok=True)
    return temp_archive_dir
def prune_unused_csvs(output_dir: Path) -> tuple[int, int]:
    """Remove extracted non-street CSVs left by older downloader versions.

    Every ``*.csv`` entry below ``output_dir`` whose name is not a
    street-crime CSV is deleted. Real directories that happen to be named
    ``*.csv`` are left alone, since unlinking a directory would raise and
    abort the run.

    Args:
        output_dir: Root of the extracted CSV tree.

    Returns:
        ``(removed, bytes_removed)``: the number of files deleted and their
        combined size in bytes. The size is best-effort; files whose size
        cannot be read still count as removed.
    """
    removed = 0
    bytes_removed = 0
    # Snapshot the matches first so deletions never interfere with the walk.
    for path in list(output_dir.rglob("*.csv")):
        if _is_street_crime_csv(path):
            continue
        if path.is_dir() and not path.is_symlink():
            continue
        try:
            bytes_removed += path.stat().st_size
        except OSError:
            # Size is informational only (e.g. dangling symlink); still delete.
            pass
        path.unlink()
        removed += 1
    return removed, bytes_removed
def write_manifest(
output_dir: Path, archive_url: str, archives: list[CrimeArchive]
output_dir: Path,
archive_url: str,
archives: list[CrimeArchive],
*,
available_archive_count: int,
archive_strategy: str,
keep_archives: bool,
street_only: bool,
) -> None:
manifest = {
"source": archive_url,
"fetched_at": datetime.now(UTC).isoformat(),
"available_archive_count": available_archive_count,
"archive_strategy": archive_strategy,
"keep_archives": keep_archives,
"street_only": street_only,
"archives": [asdict(archive) for archive in archives],
}
path = output_dir / "archive_manifest.json"
@ -260,13 +424,13 @@ def _month_arg(value: str) -> str:
def main() -> None:
parser = argparse.ArgumentParser(
description="Download all monthly police.uk crime archive ZIPs"
description="Download police.uk crime archives needed for street-crime aggregates"
)
parser.add_argument(
"--output",
type=Path,
required=True,
help="Directory for extracted CSVs; ZIPs are kept under _archives/",
help="Directory for extracted street-crime CSVs; ZIPs are temporary unless --keep-archives is set",
)
parser.add_argument(
"--archive-url",
@ -288,6 +452,15 @@ def main() -> None:
type=int,
help="Download at most this many archives after filtering",
)
parser.add_argument(
"--archive-strategy",
choices=("coverage", "all"),
default="coverage",
help=(
"coverage selects non-overlapping snapshots for continuous month "
"coverage; all downloads every matching monthly snapshot"
),
)
parser.add_argument(
"--list",
action="store_true",
@ -299,6 +472,21 @@ def main() -> None:
action="store_false",
help="Download ZIPs only; do not extract CSVs",
)
parser.add_argument(
"--extract-all-csvs",
action="store_true",
help="Extract outcomes and stop/search CSVs as well as street-crime CSVs",
)
parser.add_argument(
"--keep-archives",
action="store_true",
help="Retain downloaded ZIPs under _archives/ instead of deleting them after extraction",
)
parser.add_argument(
"--keep-unused-csvs",
action="store_true",
help="Do not prune non-street CSVs left by older downloader runs",
)
parser.add_argument(
"--overwrite-extracted",
action="store_true",
@ -322,14 +510,21 @@ def main() -> None:
help="Per-read timeout in seconds for large ZIP downloads",
)
args = parser.parse_args()
if not args.extract and not args.keep_archives:
raise SystemExit("--no-extract requires --keep-archives")
print("Fetching police.uk archive index...")
archives = filter_archives(
available_archives = filter_archives(
fetch_archives(args.archive_url),
from_month=args.from_month,
to_month=args.to_month,
limit=args.limit,
)
archives = (
select_coverage_archives(available_archives)
if args.archive_strategy == "coverage"
else available_archives
)
if not archives:
raise SystemExit("No archives matched the requested filters")
@ -344,16 +539,34 @@ def main() -> None:
file=sys.stderr,
)
print(f"Found {len(archives)} monthly archive ZIPs")
print(
f"Selected {len(archives)} of {len(available_archives)} matching monthly "
f"archive ZIPs using {args.archive_strategy!r} strategy"
)
if args.list:
for archive in archives:
print(f"{archive.month}\t{archive.url}\t{archive.raw_md5}")
return
args.output.mkdir(parents=True, exist_ok=True)
archive_dir = args.output / "_archives"
archive_dir.mkdir(parents=True, exist_ok=True)
write_manifest(args.output, args.archive_url, archives)
archive_dir = prepare_archive_dir(args.output, keep_archives=args.keep_archives)
if not args.keep_unused_csvs:
removed, bytes_removed = prune_unused_csvs(args.output)
if removed:
print(
f"Removed {removed} unused non-street CSVs "
f"({bytes_removed / 1024 / 1024:.1f} MiB)"
)
street_only = not args.extract_all_csvs
write_manifest(
args.output,
args.archive_url,
archives,
available_archive_count=len(available_archives),
archive_strategy=args.archive_strategy,
keep_archives=args.keep_archives,
street_only=street_only,
)
total_extracted = 0
total_skipped = 0
@ -371,6 +584,7 @@ def main() -> None:
zip_path,
args.output,
overwrite=args.overwrite_extracted,
street_only=street_only,
)
total_extracted += extracted
total_skipped += skipped
@ -378,10 +592,19 @@ def main() -> None:
f"{archive.filename}: extracted {extracted} CSVs"
+ (f", skipped {skipped} existing CSVs" if skipped else "")
)
if not args.keep_archives:
zip_path.unlink(missing_ok=True)
if args.extract:
if not args.keep_archives:
_remove_path(archive_dir)
print(
f"Done. ZIPs saved in {archive_dir}; extracted {total_extracted} CSVs"
(
f"Done. ZIPs saved in {archive_dir}; "
if args.keep_archives
else "Done. ZIPs were temporary; "
)
+ f"extracted {total_extracted} CSVs"
+ (f" and skipped {total_skipped} existing CSVs" if total_skipped else "")
+ "."
)

View file

@ -1,6 +1,13 @@
from zipfile import ZipFile
from pipeline.download.crime import extract_csvs, parse_archives
from pipeline.download.crime import (
CrimeArchive,
extract_csvs,
prepare_archive_dir,
prune_unused_csvs,
select_coverage_archives,
parse_archives,
)
def test_parse_archives_reads_monthly_zip_links_only():
@ -48,6 +55,8 @@ def test_extract_csvs_preserves_existing_newer_files(tmp_path):
with ZipFile(zip_path, "w") as archive:
archive.writestr("2023-01/2023-01-city-street.csv", "older\n")
archive.writestr("2022-12/2022-12-city-street.csv", "old\n")
archive.writestr("2022-12/2022-12-city-outcomes.csv", "unused\n")
archive.writestr("2022-12/2022-12-city-stop-and-search.csv", "unused\n")
archive.writestr("../escape.csv", "bad\n")
archive.writestr("notes.txt", "ignored\n")
@ -57,4 +66,68 @@ def test_extract_csvs_preserves_existing_newer_files(tmp_path):
assert skipped == 1
assert existing.read_text() == "newer\n"
assert (output / "2022-12" / "2022-12-city-street.csv").read_text() == "old\n"
assert not (output / "2022-12" / "2022-12-city-outcomes.csv").exists()
assert not (output / "2022-12" / "2022-12-city-stop-and-search.csv").exists()
assert not (tmp_path / "escape.csv").exists()
def _archive(month: str, contained_range: str) -> CrimeArchive:
    """Build a minimal ``CrimeArchive`` fixture for one snapshot month."""
    zip_name = f"{month}.zip"
    fields = {
        "month": month,
        "label": month,
        "url": f"https://data.police.uk/data/archive/{month}.zip",
        "filename": zip_name,
        "size": "1.0 GB",
        "contained_range": contained_range,
        "md5": None,
        "raw_md5": "",
    }
    return CrimeArchive(**fields)
def test_select_coverage_archives_skips_overlapping_snapshots():
    """Only the newest snapshot and the one adjacent to it are selected."""
    ranges_by_month = {
        "2026-03": "Contains data from Apr 2023 to Mar 2026",
        "2026-02": "Contains data from Mar 2023 to Feb 2026",
        "2023-04": "Contains data from May 2020 to Apr 2023",
        "2023-03": "Contains data from Apr 2020 to Mar 2023",
    }
    archives = [
        _archive(month, contained_range)
        for month, contained_range in ranges_by_month.items()
    ]

    selected_months = [
        archive.month for archive in select_coverage_archives(archives)
    ]

    assert selected_months == ["2026-03", "2023-03"]
def test_prepare_archive_dir_removes_retained_zip_cache_by_default(tmp_path):
    """Without keep_archives the ZIP cache is deleted and the temp dir reset."""
    output = tmp_path / "crime"
    retained = output / "_archives"
    temp = output / "_download_tmp"
    # Seed both directories with leftovers from a previous run.
    leftovers = {
        retained: ("old.zip", "zip\n"),
        temp: ("old.zip.part", "part\n"),
    }
    for directory, (file_name, content) in leftovers.items():
        directory.mkdir(parents=True, exist_ok=True)
        (directory / file_name).write_text(content)

    archive_dir = prepare_archive_dir(output, keep_archives=False)

    assert archive_dir == temp
    assert archive_dir.exists()
    assert list(archive_dir.iterdir()) == []
    assert not retained.exists()
def test_prune_unused_csvs_removes_non_street_csvs(tmp_path):
    """Pruning deletes outcomes/stop-and-search CSVs but keeps street CSVs."""
    output = tmp_path / "crime"
    month_dir = output / "2024-01"
    month_dir.mkdir(parents=True)

    contents = {
        "street": "street\n",
        "outcomes": "outcomes\n",
        "stop-and-search": "stop\n",
    }
    paths = {}
    for kind, text in contents.items():
        paths[kind] = month_dir / f"2024-01-city-{kind}.csv"
        paths[kind].write_text(text)

    removed, _ = prune_unused_csvs(output)

    assert removed == 2
    assert paths["street"].exists()
    assert not paths["outcomes"].exists()
    assert not paths["stop-and-search"].exists()