"""Tests for street-crime CSV discovery and the crime transform outputs."""

import polars as pl

from pipeline.transform.crime import find_street_crime_csvs, transform_crime

# Column header shared by every street-crime CSV fixture written in this module.
_STREET_HEADER = (
    "Crime ID,Month,Reported by,Falls within,Longitude,Latitude,Location,"
    "LSOA code,LSOA name,Crime type,Last outcome category,Context"
)


def _make_month_dir(crime_dir, month):
    """Create ``crime_dir/<month>`` (including parents) and return its path."""
    month_dir = crime_dir / month
    month_dir.mkdir(parents=True)
    return month_dir


def _write_street_csv(month_dir, month, rows):
    """Write ``<month>-test-force-street.csv`` with the shared header plus *rows*.

    Lines are joined with newlines and the file ends with a trailing newline.
    Returns the path of the file that was written.
    """
    csv_path = month_dir / f"{month}-test-force-street.csv"
    csv_path.write_text("\n".join([_STREET_HEADER, *rows]) + "\n")
    return csv_path


def test_find_street_crime_csvs_ignores_archive_sidecars(tmp_path):
    """Only the ``-street.csv`` file is returned; the other three CSVs are counted as ignored."""
    crime_dir = tmp_path / "crime"
    month_dir = _make_month_dir(crime_dir, "2024-01")

    street = month_dir / "2024-01-test-force-street.csv"
    street.touch()

    # Three CSVs that are not street-crime files: two sidecars and a stray file.
    sidecars = (
        month_dir / "2024-01-test-force-outcomes.csv",
        month_dir / "2024-01-test-force-stop-and-search.csv",
        crime_dir / "notes.csv",
    )
    for sidecar in sidecars:
        sidecar.touch()

    found, ignored = find_street_crime_csvs(crime_dir)

    assert found == [street]
    assert ignored == 3


def test_transform_crime_reads_only_street_crime_csvs(tmp_path):
    """The output holds a single row built from the street CSV's rows that carry an LSOA code."""
    crime_dir = tmp_path / "crime"
    month_dir = _make_month_dir(crime_dir, "2024-01")

    _write_street_csv(
        month_dir,
        "2024-01",
        [
            "1,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000001,Test LSOA,Burglary,Under investigation,",
            "2,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000001,Test LSOA,Burglary,Under investigation,",
            # This row has an empty LSOA code and is absent from the expected output.
            "3,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,,No LSOA,Robbery,Under investigation,",
        ],
    )
    # An outcomes sidecar sits next to the street file; it has a different schema.
    outcomes = month_dir / "2024-01-test-force-outcomes.csv"
    outcomes.write_text(
        "Crime ID,Month,Reported by,Outcome type\n1,2024-01,Test Force,Charged\n"
    )

    output = tmp_path / "crime.parquet"
    transform_crime(crime_dir, output)

    rows = pl.read_parquet(output).to_dicts()
    # 2 burglaries in the single month of data -> 24 per year.
    expected = [{"LSOA code": "E01000001", "Burglary (avg/yr)": 24.0}]
    assert rows == expected


def test_transform_crime_annualises_over_all_valid_months(tmp_path):
    """Counts are averaged over both months of data before being scaled to a yearly rate."""
    crime_dir = tmp_path / "crime"
    jan_dir = _make_month_dir(crime_dir, "2024-01")
    feb_dir = _make_month_dir(crime_dir, "2024-02")

    _write_street_csv(
        jan_dir,
        "2024-01",
        [
            "1,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000001,Test LSOA,Burglary,Under investigation,",
            "2,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000001,Test LSOA,Burglary,Under investigation,",
            "3,2024-01,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000002,Other LSOA,Robbery,Under investigation,",
        ],
    )
    _write_street_csv(
        feb_dir,
        "2024-02",
        [
            "4,2024-02,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000002,Other LSOA,Robbery,Under investigation,",
        ],
    )

    output = tmp_path / "crime.parquet"
    transform_crime(crime_dir, output)

    rows = pl.read_parquet(output).sort("LSOA code").to_dicts()
    # Each LSOA has 2 crimes of one type across 2 months -> 12 per year;
    # the crime type it never recorded is expected as 0.0.
    expected = [
        {
            "LSOA code": "E01000001",
            "Burglary (avg/yr)": 12.0,
            "Robbery (avg/yr)": 0.0,
        },
        {
            "LSOA code": "E01000002",
            "Burglary (avg/yr)": 0.0,
            "Robbery (avg/yr)": 12.0,
        },
    ]
    assert rows == expected


def test_transform_crime_writes_by_year_output(tmp_path):
    """The optional by-year parquet holds per-year annualised counts for each crime column."""
    crime_dir = tmp_path / "crime"
    month_dirs = {
        month: _make_month_dir(crime_dir, month)
        for month in ("2023-01", "2024-01", "2024-02")
    }

    _write_street_csv(
        month_dirs["2023-01"],
        "2023-01",
        [
            "1,2023-01,F,F,-0.1,51.5,X,E01000001,L,Burglary,U,",
            "2,2023-01,F,F,-0.1,51.5,X,E01000001,L,Robbery,U,",
        ],
    )
    _write_street_csv(
        month_dirs["2024-01"],
        "2024-01",
        [
            "3,2024-01,F,F,-0.1,51.5,X,E01000001,L,Burglary,U,",
            "4,2024-01,F,F,-0.1,51.5,X,E01000001,L,Burglary,U,",
        ],
    )
    _write_street_csv(
        month_dirs["2024-02"],
        "2024-02",
        [
            "5,2024-02,F,F,-0.1,51.5,X,E01000001,L,Anti-social behaviour,U,",
        ],
    )

    output = tmp_path / "crime.parquet"
    by_year_output = tmp_path / "crime_by_year.parquet"
    transform_crime(crime_dir, output, by_year_output)

    by_year = pl.read_parquet(by_year_output)
    assert by_year.height == 1

    columns = set(by_year.columns)
    assert "Burglary (by year)" in columns
    assert "Serious crime (by year)" in columns
    assert "Minor crime (by year)" in columns

    record = by_year.row(0, named=True)

    # 2023: 1 burglary in 1 month -> 12/yr; 2024: 2 burglaries in 2 months -> 12/yr.
    burglary_points = sorted(
        record["Burglary (by year)"], key=lambda point: point["year"]
    )
    assert burglary_points == [
        {"year": 2023, "count": 12.0},
        {"year": 2024, "count": 12.0},
    ]

    # Serious crime in 2023 = Burglary (12) + Robbery (12) = 24.
    serious_by_year = {
        point["year"]: point["count"] for point in record["Serious crime (by year)"]
    }
    assert serious_by_year[2023] == 24.0
    assert serious_by_year[2024] == 12.0


def test_transform_crime_fails_without_valid_months(tmp_path):
    """A street CSV whose only row has an empty Month makes the transform raise ValueError."""
    crime_dir = tmp_path / "crime"
    month_dir = _make_month_dir(crime_dir, "2024-01")
    _write_street_csv(
        month_dir,
        "2024-01",
        [
            "1,,Test Force,Test Force,-0.1,51.5,On or near Test Street,E01000001,Test LSOA,Burglary,Under investigation,",
        ],
    )

    output = tmp_path / "crime.parquet"

    # Capture the error first, then check it, so the two failure modes
    # (nothing raised vs. wrong message) are reported separately.
    caught = None
    try:
        transform_crime(crime_dir, output)
    except ValueError as exc:
        caught = exc

    if caught is None:
        raise AssertionError("Expected ValueError")
    assert "No valid crime months" in str(caught)


def test_transform_crime_applies_lsoa_2011_to_2021_lookup(tmp_path):
    """With a lookup parquet supplied, both outputs are keyed by the ``lsoa21`` codes."""
    crime_dir = tmp_path / "crime"
    month_dir = _make_month_dir(crime_dir, "2024-01")

    # E01000001 was split into two 2021 LSOAs; E01000099 is unchanged.
    _write_street_csv(
        month_dir,
        "2024-01",
        [
            "1,2024-01,F,F,-0.1,51.5,X,E01000001,L,Burglary,U,",
            "2,2024-01,F,F,-0.1,51.5,X,E01000001,L,Burglary,U,",
            "3,2024-01,F,F,-0.1,51.5,X,E01000099,L,Burglary,U,",
        ],
    )

    lookup_path = tmp_path / "lookup.parquet"
    lookup = pl.DataFrame(
        {
            "lsoa11": ["E01000001", "E01000001", "E01000099"],
            "lsoa21": ["E01000050", "E01000051", "E01000099"],
        }
    )
    lookup.write_parquet(lookup_path)

    output = tmp_path / "crime.parquet"
    by_year_output = tmp_path / "by_year.parquet"
    transform_crime(crime_dir, output, by_year_output, lookup_path)

    # Split LSOA: the 2 burglaries are shared evenly, so each child gets 1 for
    # the single month of data, which annualises to 12/yr each.
    averages = pl.read_parquet(output).sort("LSOA code").to_dicts()
    assert averages == [
        {"LSOA code": "E01000050", "Burglary (avg/yr)": 12.0},
        {"LSOA code": "E01000051", "Burglary (avg/yr)": 12.0},
        {"LSOA code": "E01000099", "Burglary (avg/yr)": 12.0},
    ]

    yearly_rows = pl.read_parquet(by_year_output).sort("LSOA code").to_dicts()
    burglaries = {
        entry["LSOA code"]: entry["Burglary (by year)"] for entry in yearly_rows
    }
    assert burglaries["E01000050"] == [{"year": 2024, "count": 12.0}]
    assert burglaries["E01000051"] == [{"year": 2024, "count": 12.0}]
    assert burglaries["E01000099"] == [{"year": 2024, "count": 12.0}]