Add more data & fix ooms

This commit is contained in:
Andras Schmelczer 2026-01-31 14:39:46 +00:00
parent f60fbec9d4
commit a8cc44ea97
8 changed files with 242 additions and 82 deletions

View file

@ -0,0 +1,63 @@
import argparse
from pathlib import Path
import polars as pl
def transform_crime(crime_dir: Path, output_path: Path) -> None:
csvs = sorted(crime_dir.rglob("*.csv"))
print(f"Found {len(csvs)} CSV files across {len(list(crime_dir.iterdir()))} months")
df = pl.scan_csv(
csvs,
schema_overrides={"LSOA code": pl.Utf8, "Crime type": pl.Utf8, "Month": pl.Utf8},
).select("LSOA code", "Crime type", "Month")
# Extract year, count crimes per LSOA / year / crime type
yearly_counts = (
df.filter(pl.col("LSOA code").is_not_null() & (pl.col("LSOA code") != ""))
.with_columns(pl.col("Month").str.slice(0, 4).alias("year"))
.group_by("LSOA code", "year", "Crime type")
.agg(pl.len().alias("count"))
.group_by("LSOA code", "Crime type")
.agg(pl.col("count").mean().round(1).alias("yearly_avg"))
.collect(engine="streaming")
)
print(f"Crime types: {sorted(yearly_counts['Crime type'].unique().to_list())}")
# Pivot crime types into columns
wide = yearly_counts.pivot(
on="Crime type",
index="LSOA code",
values="yearly_avg",
)
# Fill nulls with 0 and rename columns to be descriptive
value_cols = [col for col in wide.columns if col != "LSOA code"]
wide = wide.with_columns(pl.col(col).fill_null(0) for col in value_cols)
wide = wide.rename({col: f"{col} (avg/yr)" for col in value_cols})
print(f"Output shape: {wide.shape}")
print(f"Columns: {wide.columns}")
wide.write_parquet(output_path, compression="zstd")
print(f"Saved to {output_path}")
def main() -> None:
parser = argparse.ArgumentParser(
description="Transform crime CSVs into yearly average by LSOA and crime type"
)
parser.add_argument(
"--input", type=Path, required=True, help="Directory containing crime data"
)
parser.add_argument(
"--output", type=Path, required=True, help="Output parquet file path"
)
args = parser.parse_args()
transform_crime(args.input, args.output)
if __name__ == "__main__":
main()

View file

@ -105,7 +105,7 @@ def main():
right_postcode_col="POSTCODE",
)
.drop("POSTCODE")
.collect()
.collect(engine="streaming")
)
matched = joined.filter(

View file

@ -9,99 +9,127 @@ def _build_wide(
iod_path: Path | None,
poi_proximity_path: Path | None,
journey_times_path: Path | None,
ethnicity_path: Path | None,
crime_path: Path | None,
) -> pl.DataFrame:
"""Build the wide dataframe by joining epc_pp with all auxiliary data."""
print("Loading epc_pp...")
wide = pl.read_parquet(epc_pp_path)
print(f" {wide.shape[0]:,} rows, {wide.estimated_size('mb'):.1f} MB")
print("Scanning epc_pp...")
wide = pl.scan_parquet(epc_pp_path)
# GPS coordinates + LSOA from ArcGIS
print("Joining GPS coordinates...")
arcgis = pl.read_parquet(arcgis_path).select(
arcgis = pl.scan_parquet(arcgis_path).select(
pl.col("pcds").alias("postcode"),
"lat",
pl.col("long").alias("lon"),
"lsoa21",
)
wide = wide.join(arcgis, on="postcode", how="inner")
print(
f" {wide.shape[0]:,} rows after GPS join, {wide.estimated_size('mb'):.1f} MB"
)
# Journey times (optional)
if journey_times_path and journey_times_path.exists():
print("Joining journey times...")
journey_times = pl.read_parquet(journey_times_path).select(
journey_times = pl.scan_parquet(journey_times_path).select(
"postcode",
"public_transport_easy_minutes",
"public_transport_quick_minutes",
"cycling_minutes",
)
wide = wide.join(journey_times, on="postcode", how="left")
print(f" {wide.estimated_size('mb'):.1f} MB after journey times")
# Index of Deprivation
if iod_path and iod_path.exists():
print("Joining IoD scores...")
iod = pl.read_parquet(iod_path)
wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left")
print(f" {wide.estimated_size('mb'):.1f} MB after IoD")
print("Joining IoD scores...")
iod = pl.scan_parquet(iod_path)
wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left")
# POI proximity counts (pre-computed per postcode)
if poi_proximity_path and poi_proximity_path.exists():
print("Joining POI proximity counts...")
poi_counts = pl.read_parquet(poi_proximity_path)
wide = wide.join(poi_counts, on="postcode", how="left")
print(f" {wide.estimated_size('mb'):.1f} MB after POI counts")
# Ethnicity by local authority
print("Joining ethnicity data...")
ethnicity = pl.scan_parquet(ethnicity_path)
wide = wide.join(
ethnicity,
left_on="Local Authority District code (2024)",
right_on="Geography_code",
how="left",
)
# Crime stats by LSOA
print("Joining crime data...")
crime = pl.scan_parquet(crime_path)
wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left")
print("Joining POI proximity counts...")
poi_counts = pl.scan_parquet(poi_proximity_path)
wide = wide.join(poi_counts, on="postcode", how="left")
# Convert construction_age_band to numeric year
if "construction_age_band" in wide.columns:
wide = wide.with_columns(
pl.col("construction_age_band")
.str.replace("England and Wales: ", "")
.str.replace(" onwards", "")
.str.extract(r"(\d{4})", 1)
.cast(pl.UInt16, strict=False)
.alias("construction_age_band"),
)
wide = wide.with_columns(
pl.col("construction_age_band")
.str.replace("England and Wales: ", "")
.str.replace(" onwards", "")
.str.extract(r"(\d{4})", 1)
.cast(pl.UInt16, strict=False)
.alias("construction_age_band"),
)
wide = wide.with_columns(
pl.when(pl.col("pp_property_type") == pl.col("built_form"))
.then(pl.col("pp_property_type"))
.otherwise(
pl.concat_str(
[pl.col("pp_property_type"), pl.lit("/"), pl.col("built_form")]
)
)
.alias("property_type_built_form")
)
# Derived columns
wide = (
wide.with_columns(
(pl.col("latest_price") / pl.col("total_floor_area")).alias(
"Price per sqm"
),
wide.filter(pl.col("total_floor_area") > 0).with_columns(
(pl.col("latest_price") / pl.col("total_floor_area"))
.round(0)
.cast(pl.Int32)
.alias("Price per sqm"),
)
.drop(
"date_of_transfer",
"inspection_date",
"floor_height",
"lsoa21",
"LSOA code (2021)",
"LSOA name (2021)",
"Local Authority District code (2024)",
"Local Authority District name (2024)",
"imd_score",
"housing_barriers_score",
"idaci_score",
"idaopi_score",
"children_young_people_score",
"adult_skills_score",
"geographical_barriers_score",
"wider_barriers_score",
"Wider Barriers Sub-domain Score",
"Geographical Barriers Sub-domain Score",
"Adult Skills Sub-domain Score",
"Children and Young People Sub-domain Score",
"Income Deprivation Affecting Older People (IDAOPI) Score (rate)",
"Income Deprivation Affecting Children Index (IDACI) Score (rate)",
"Barriers to Housing and Services Score",
"lsoa21",
"pp_property_type",
"built_form",
)
.rename(
{
"construction_age_band": "Approximate construction age",
"income_score": "Income Score (rate)",
"employment_score": "Employment Score (rate)",
"education_score": "Education, Skills and Training Score",
"health_score": "Health Deprivation and Disability Score",
"crime_score": "Crime Score",
"pp_address": "Address per Property Register",
"epc_address": "Address per EPC",
"postcode": "Postcode",
"duration": "Leashold/Freehold",
"current_energy_rating": "Current energy rating",
"potential_energy_rating": "Potential energy rating",
"total_floor_area": "Total floor area (sqm)",
"epc_property_type": "Property type",
"property_type_built_form": "Property type/built form",
"restaurants_2km": "Restaurants within 2km",
"groceries_2km": "Groceries within 2km",
"parks_2km": "Parks within 2km",
"public_transport_2km": "Public transport within 2km",
"latest_price": "Last known price",
"number_habitable_rooms": "Rooms (including bedrooms & bathrooms)",
}
)
)
return wide
print("Collecting with streaming engine...")
return wide.collect(engine="streaming")
def main():
@ -115,7 +143,7 @@ def main():
"--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file"
)
parser.add_argument(
"--iod", type=Path, help="Index of Deprivation parquet file (optional)"
"--iod", type=Path, required=True, help="Index of Deprivation parquet file (optional)"
)
parser.add_argument(
"--poi-proximity",
@ -123,7 +151,13 @@ def main():
help="POI proximity counts parquet file (optional)",
)
parser.add_argument(
"--journey-times", type=Path, help="Journey times parquet file (optional)"
"--journey-times", required=True, type=Path, help="Journey times parquet file (optional)"
)
parser.add_argument(
"--ethnicity", type=Path, required=True, help="Ethnicity by local authority parquet file (optional)"
)
parser.add_argument(
"--crime", type=Path, required=True, help="Crime by LSOA parquet file (optional)"
)
parser.add_argument(
"--output", type=Path, required=True, help="Output parquet file path"
@ -136,6 +170,8 @@ def main():
iod_path=args.iod,
poi_proximity_path=args.poi_proximity,
journey_times_path=args.journey_times,
ethnicity_path=args.ethnicity,
crime_path=args.crime,
)
print(f"Columns: {wide.columns}")

View file

@ -576,7 +576,7 @@ def transform(input_path: Path) -> pl.LazyFrame:
lf = pl.scan_parquet(input_path)
# Get all unique categories present in the data
all_categories = lf.select("category").unique().collect().to_series().to_list()
all_categories = lf.select("category").unique().collect(engine="streaming").to_series().to_list()
# Verify every non-dropped category has a mapping
unmapped = []
@ -632,7 +632,7 @@ def main():
)
args = parser.parse_args()
df = transform(args.input).collect()
df = transform(args.input).collect(engine="streaming")
df.write_parquet(args.output)