perfect-postcode/pipeline/transform/crime.py
2026-05-14 08:09:19 +01:00

117 lines
3.6 KiB
Python

import argparse
import re
from pathlib import Path
import polars as pl
# File names accepted as street-level crime data:
# "<4 digits>-<2 digits>-<anything>-street.csv".
STREET_CRIME_CSV_RE: re.Pattern[str] = re.compile(r"^\d{4}-\d{2}-.+-street\.csv$")
# Shape of a valid "Month" value (4 digits, dash, 2 digits). Kept as a plain
# pattern string because it is passed to Polars' ``str.contains`` rather than
# used through Python's ``re`` module.
MONTH_RE: str = r"^\d{4}-\d{2}$"
def find_street_crime_csvs(crime_dir: Path) -> tuple[list[Path], int]:
    """Locate street crime CSVs anywhere beneath ``crime_dir``.

    Args:
        crime_dir: Directory that is searched recursively for ``*.csv`` paths.

    Returns:
        A pair ``(street_csvs, ignored_count)``: the sorted paths whose file
        name fully matches ``STREET_CRIME_CSV_RE``, and the number of other
        ``*.csv`` paths that were found but did not match.
    """
    matched: list[Path] = []
    ignored = 0
    # Walk the candidates in sorted order so the returned list is sorted too.
    for candidate in sorted(crime_dir.rglob("*.csv")):
        if STREET_CRIME_CSV_RE.fullmatch(candidate.name) is None:
            ignored += 1
        else:
            matched.append(candidate)
    return matched, ignored
def transform_crime(crime_dir: Path, output_path: Path) -> None:
    """Aggregate street crime CSVs into annualised counts per LSOA and crime type.

    Scans every street crime CSV under ``crime_dir``, counts the rows for each
    (LSOA code, crime type) pair, scales each total to a 12-month figure using
    the number of distinct valid months present in the data, pivots the crime
    types into columns and writes the table to ``output_path`` as
    zstd-compressed Parquet. Progress is reported with ``print``.

    The written table has one row per LSOA code (sorted ascending) and one
    ``"<crime type> (avg/yr)"`` column per crime type (sorted by name), so
    repeated runs over the same input produce the same layout.

    Args:
        crime_dir: Directory searched recursively for street crime CSVs.
        output_path: Destination Parquet file.

    Raises:
        FileNotFoundError: If no street crime CSV is found under ``crime_dir``.
        ValueError: If no row carries a ``YYYY-MM`` month, or if no row is left
            after dropping rows with a missing/empty LSOA code or crime type.
    """
    csvs, ignored_csv_count = find_street_crime_csvs(crime_dir)
    if not csvs:
        raise FileNotFoundError(f"No street crime CSV files found in {crime_dir}")

    # Every accepted file name starts with "YYYY-MM-" (STREET_CRIME_CSV_RE
    # enforces it), so take the month from the name. The parent directory name
    # is only a month when the files happen to sit in per-month folders.
    month_count = len({path.name[:7] for path in csvs})
    print(
        f"Found {len(csvs)} street crime CSV files across {month_count} months"
        + (
            f" (ignored {ignored_csv_count} non-street CSVs)"
            if ignored_csv_count
            else ""
        )
    )

    # Read only the three columns that are needed, all as strings.
    df = pl.scan_csv(
        csvs,
        schema_overrides={
            "LSOA code": pl.Utf8,
            "Crime type": pl.Utf8,
            "Month": pl.Utf8,
        },
    ).select("LSOA code", "Crime type", "Month")

    valid_month_expr = pl.col("Month").str.contains(MONTH_RE)
    valid_months = (
        df.filter(valid_month_expr)
        .select("Month")
        .unique()
        .collect(engine="streaming")["Month"]
        .sort()
        .to_list()
    )
    if not valid_months:
        raise ValueError(f"No valid crime months found in {crime_dir}")

    valid_month_count = len(valid_months)
    print(
        f"Using {valid_month_count} valid data months "
        f"({valid_months[0]} to {valid_months[-1]})"
    )

    # Annualise over every valid month in the dataset. The total per
    # (LSOA, crime type) is simply the number of matching rows, so a single
    # group_by is enough; grouping by month first and summing the monthly
    # counts gives the same total.
    yearly_counts = (
        df.filter(
            valid_month_expr
            & pl.col("LSOA code").is_not_null()
            & (pl.col("LSOA code") != "")
            & pl.col("Crime type").is_not_null()
            & (pl.col("Crime type") != "")
        )
        .group_by("LSOA code", "Crime type")
        .agg(
            (pl.len() / pl.lit(valid_month_count) * 12)
            .round(1)
            .alias("yearly_avg")
        )
        .collect(engine="streaming")
    )
    if yearly_counts.is_empty():
        raise ValueError(f"No valid crime rows found in {crime_dir}")

    print(f"Crime types: {sorted(yearly_counts['Crime type'].unique().to_list())}")

    # Pivot crime types into columns.
    wide = yearly_counts.pivot(
        on="Crime type",
        index="LSOA code",
        values="yearly_avg",
    )

    # group_by does not guarantee the order of its groups, so the pivoted
    # column and row order would otherwise vary from run to run. Order both
    # explicitly, fill the gaps left by the pivot with 0, and give the value
    # columns descriptive names.
    value_cols = sorted(col for col in wide.columns if col != "LSOA code")
    wide = wide.select(
        "LSOA code",
        *(
            pl.col(col).fill_null(0).alias(f"{col} (avg/yr)")
            for col in value_cols
        ),
    ).sort("LSOA code")

    print(f"Output shape: {wide.shape}")
    print(f"Columns: {wide.columns}")
    wide.write_parquet(output_path, compression="zstd")
    print(f"Saved to {output_path}")
def main() -> None:
    """Parse ``--input`` and ``--output`` from the command line and run the transform.

    Both options are required and are converted to ``Path`` objects before
    being handed to ``transform_crime``.
    """
    cli = argparse.ArgumentParser(
        description="Transform crime CSVs into yearly average by LSOA and crime type"
    )
    # (flag, help text) for each required path option, registered in order.
    path_options = (
        ("--input", "Directory containing crime data"),
        ("--output", "Output parquet file path"),
    )
    for flag, help_text in path_options:
        cli.add_argument(flag, type=Path, required=True, help=help_text)

    options = cli.parse_args()
    transform_crime(options.input, options.output)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()