From a8cc44ea9751fca4fe72c57d042f5ff180298a5e Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 31 Jan 2026 14:39:46 +0000 Subject: [PATCH] Add more data & fix ooms --- pipeline/download/ethnicity.py | 58 ++++++++++++ pipeline/journey_times/__main__.py | 16 +++- pipeline/transform/crime.py | 63 ++++++++++++ pipeline/transform/join_epc_pp.py | 2 +- pipeline/transform/merge.py | 142 +++++++++++++++++----------- pipeline/transform/transform_poi.py | 4 +- pipeline/utils/fuzzy_join.py | 4 +- pipeline/utils/poi_counts.py | 35 +++---- 8 files changed, 242 insertions(+), 82 deletions(-) create mode 100644 pipeline/download/ethnicity.py create mode 100644 pipeline/transform/crime.py diff --git a/pipeline/download/ethnicity.py b/pipeline/download/ethnicity.py new file mode 100644 index 0000000..22a21ad --- /dev/null +++ b/pipeline/download/ethnicity.py @@ -0,0 +1,58 @@ +import argparse +from pathlib import Path + +import httpx +import polars as pl + +pl.Config.set_tbl_cols(-1) + + +URL = "https://www.ethnicity-facts-figures.service.gov.uk/uk-population-by-ethnicity/national-and-regional-populations/regional-ethnic-diversity/latest/downloads/population-by-ethnicity-and-local-authority-2021.csv" + + +def download_and_convert(output_path: Path) -> None: + print("Downloading ethnicity data...") + response = httpx.get(URL, follow_redirects=True, timeout=60) + response.raise_for_status() + + df = pl.read_csv(response.content) + print(f"Raw shape: {df.head(100)}") + + # Keep only broad ethnicity categories (5+1), exclude "All" totals + df = df.filter( + (pl.col("Ethnicity_type") == "ONS 2021 5+1") & (pl.col("Ethnicity") != "All") + ) + + # Pivot: one row per local authority, columns = ethnicity percentages + wide = df.pivot( + on="Ethnicity", + index="Geography_code", + values="Value1", + ) + + # Rename columns to be descriptive + rename_map = { + col: f"% {col}" for col in wide.columns if col != "Geography_code" + } + wide = wide.rename(rename_map) + + print(f"Output shape: {wide.shape}") + print(f"Columns: {wide.columns}") + + wide.write_parquet(output_path, compression="zstd") + print(f"Saved to {output_path}") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download and convert ethnicity by local authority data" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) + args = parser.parse_args() + download_and_convert(args.output) + + +if __name__ == "__main__": + main() diff --git a/pipeline/journey_times/__main__.py b/pipeline/journey_times/__main__.py index 40e8452..5a5a944 100644 --- a/pipeline/journey_times/__main__.py +++ b/pipeline/journey_times/__main__.py @@ -5,7 +5,13 @@ from datetime import date, timedelta import polars as pl from tqdm import tqdm -from .config import DESTINATIONS, MAX_CONCURRENT, MAX_POSTCODES, OUTPUT_DIR, MAX_DISTANCE_KM +from .config import ( + DESTINATIONS, + MAX_CONCURRENT, + MAX_POSTCODES, + OUTPUT_DIR, + MAX_DISTANCE_KM, +) from .results import CheckpointSaver, results_to_dataframe, save_results from .tfl_client import fetch_journey_times from pipeline.utils import haversine_km_expr @@ -31,7 +37,9 @@ def main(): # Filter to postcodes within range of destination postcodes_df = postcodes_df.with_columns( - haversine_km_expr("lat", "long", destination.lat, destination.lon).alias("distance_km") + haversine_km_expr("lat", "long", destination.lat, destination.lon).alias( + "distance_km" + ) ).filter(pl.col("distance_km") <= MAX_DISTANCE_KM) print(f"Filtered to {postcodes_df.height:,} postcodes within {MAX_DISTANCE_KM}km") @@ -50,7 +58,9 @@ def main(): checkpoint_saver = CheckpointSaver( destination_name=destination.name, - on_save=lambda path, count: print(f"Checkpoint saved: {count:,} results to {path}"), + on_save=lambda path, count: print( + f"Checkpoint saved: {count:,} results to {path}" + ), ) def on_result(result): diff --git a/pipeline/transform/crime.py b/pipeline/transform/crime.py new file mode 100644 index 0000000..6eecb83 --- /dev/null +++ b/pipeline/transform/crime.py @@ -0,0 +1,63 @@ +import argparse +from pathlib import Path + +import polars as pl + + +def transform_crime(crime_dir: Path, output_path: Path) -> None: + csvs = sorted(crime_dir.rglob("*.csv")) + print(f"Found {len(csvs)} CSV files across {len(list(crime_dir.iterdir()))} months") + + df = pl.scan_csv( + csvs, + schema_overrides={"LSOA code": pl.Utf8, "Crime type": pl.Utf8, "Month": pl.Utf8}, + ).select("LSOA code", "Crime type", "Month") + + # Extract year, count crimes per LSOA / year / crime type + yearly_counts = ( + df.filter(pl.col("LSOA code").is_not_null() & (pl.col("LSOA code") != "")) + .with_columns(pl.col("Month").str.slice(0, 4).alias("year")) + .group_by("LSOA code", "year", "Crime type") + .agg(pl.len().alias("count")) + .group_by("LSOA code", "Crime type") + .agg(pl.col("count").mean().round(1).alias("yearly_avg")) + .collect(engine="streaming") + ) + + print(f"Crime types: {sorted(yearly_counts['Crime type'].unique().to_list())}") + + # Pivot crime types into columns + wide = yearly_counts.pivot( + on="Crime type", + index="LSOA code", + values="yearly_avg", + ) + + # Fill nulls with 0 and rename columns to be descriptive + value_cols = [col for col in wide.columns if col != "LSOA code"] + wide = wide.with_columns(pl.col(col).fill_null(0) for col in value_cols) + wide = wide.rename({col: f"{col} (avg/yr)" for col in value_cols}) + + print(f"Output shape: {wide.shape}") + print(f"Columns: {wide.columns}") + + wide.write_parquet(output_path, compression="zstd") + print(f"Saved to {output_path}") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Transform crime CSVs into yearly average by LSOA and crime type" + ) + parser.add_argument( + "--input", type=Path, required=True, help="Directory containing crime data" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) + args = parser.parse_args() + transform_crime(args.input, args.output) + + +if __name__ == "__main__": + main() diff --git a/pipeline/transform/join_epc_pp.py b/pipeline/transform/join_epc_pp.py index 741848a..aa42b4e 100644 --- a/pipeline/transform/join_epc_pp.py +++ b/pipeline/transform/join_epc_pp.py @@ -105,7 +105,7 @@ def main(): right_postcode_col="POSTCODE", ) .drop("POSTCODE") - .collect() + .collect(engine="streaming") ) matched = joined.filter( diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index 26b4076..b5081b6 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -9,99 +9,127 @@ def _build_wide( iod_path: Path | None, poi_proximity_path: Path | None, journey_times_path: Path | None, + ethnicity_path: Path | None, + crime_path: Path | None, ) -> pl.DataFrame: """Build the wide dataframe by joining epc_pp with all auxiliary data.""" - print("Loading epc_pp...") - wide = pl.read_parquet(epc_pp_path) - print(f" {wide.shape[0]:,} rows, {wide.estimated_size('mb'):.1f} MB") + print("Scanning epc_pp...") + wide = pl.scan_parquet(epc_pp_path) # GPS coordinates + LSOA from ArcGIS print("Joining GPS coordinates...") - arcgis = pl.read_parquet(arcgis_path).select( + arcgis = pl.scan_parquet(arcgis_path).select( pl.col("pcds").alias("postcode"), "lat", pl.col("long").alias("lon"), "lsoa21", ) wide = wide.join(arcgis, on="postcode", how="inner") - print( - f" {wide.shape[0]:,} rows after GPS join, {wide.estimated_size('mb'):.1f} MB" - ) # Journey times (optional) if journey_times_path and journey_times_path.exists(): print("Joining journey times...") - journey_times = pl.read_parquet(journey_times_path).select( + journey_times = pl.scan_parquet(journey_times_path).select( "postcode", "public_transport_easy_minutes", "public_transport_quick_minutes", "cycling_minutes", ) wide = wide.join(journey_times, on="postcode", how="left") - print(f" {wide.estimated_size('mb'):.1f} MB after journey times") - # Index of Deprivation - if iod_path and iod_path.exists(): - print("Joining IoD scores...") - iod = pl.read_parquet(iod_path) - wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left") - print(f" {wide.estimated_size('mb'):.1f} MB after IoD") + print("Joining IoD scores...") + iod = pl.scan_parquet(iod_path) + wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left") - # POI proximity counts (pre-computed per postcode) - if poi_proximity_path and poi_proximity_path.exists(): - print("Joining POI proximity counts...") - poi_counts = pl.read_parquet(poi_proximity_path) - wide = wide.join(poi_counts, on="postcode", how="left") - print(f" {wide.estimated_size('mb'):.1f} MB after POI counts") + # Ethnicity by local authority + print("Joining ethnicity data...") + ethnicity = pl.scan_parquet(ethnicity_path) + wide = wide.join( + ethnicity, + left_on="Local Authority District code (2024)", + right_on="Geography_code", + how="left", + ) + + # Crime stats by LSOA + print("Joining crime data...") + crime = pl.scan_parquet(crime_path) + wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left") + + print("Joining POI proximity counts...") + poi_counts = pl.scan_parquet(poi_proximity_path) + wide = wide.join(poi_counts, on="postcode", how="left") # Convert construction_age_band to numeric year - if "construction_age_band" in wide.columns: - wide = wide.with_columns( - pl.col("construction_age_band") - .str.replace("England and Wales: ", "") - .str.replace(" onwards", "") - .str.extract(r"(\d{4})", 1) - .cast(pl.UInt16, strict=False) - .alias("construction_age_band"), - ) + wide = wide.with_columns( + pl.col("construction_age_band") + .str.replace("England and Wales: ", "") + .str.replace(" onwards", "") + .str.extract(r"(\d{4})", 1) + .cast(pl.UInt16, strict=False) + .alias("construction_age_band"), + ) + + wide = wide.with_columns( + pl.when(pl.col("pp_property_type") == pl.col("built_form")) + .then(pl.col("pp_property_type")) + .otherwise( + pl.concat_str( + [pl.col("pp_property_type"), pl.lit("/"), pl.col("built_form")] + ) + ) + .alias("property_type_built_form") + ) - # Derived columns wide = ( - wide.with_columns( - (pl.col("latest_price") / pl.col("total_floor_area")).alias( - "Price per sqm" - ), + wide.filter(pl.col("total_floor_area") > 0).with_columns( + (pl.col("latest_price") / pl.col("total_floor_area")) + .round(0) + .cast(pl.Int32) + .alias("Price per sqm"), ) .drop( "date_of_transfer", "inspection_date", "floor_height", - "lsoa21", - "LSOA code (2021)", + "LSOA name (2021)", "Local Authority District code (2024)", "Local Authority District name (2024)", - "imd_score", - "housing_barriers_score", - "idaci_score", - "idaopi_score", - "children_young_people_score", - "adult_skills_score", - "geographical_barriers_score", - "wider_barriers_score", + "Wider Barriers Sub-domain Score", + "Geographical Barriers Sub-domain Score", + "Adult Skills Sub-domain Score", + "Children and Young People Sub-domain Score", + "Income Deprivation Affecting Older People (IDAOPI) Score (rate)", + "Income Deprivation Affecting Children Index (IDACI) Score (rate)", + "Barriers to Housing and Services Score", + "lsoa21", + "pp_property_type", + "built_form", ) .rename( { "construction_age_band": "Approximate construction age", - "income_score": "Income Score (rate)", - "employment_score": "Employment Score (rate)", - "education_score": "Education, Skills and Training Score", - "health_score": "Health Deprivation and Disability Score", - "crime_score": "Crime Score", + "pp_address": "Address per Property Register", + "epc_address": "Address per EPC", + "postcode": "Postcode", + "duration": "Leashold/Freehold", + "current_energy_rating": "Current energy rating", + "potential_energy_rating": "Potential energy rating", + "total_floor_area": "Total floor area (sqm)", + "epc_property_type": "Property type", + "property_type_built_form": "Property type/built form", + "restaurants_2km": "Restaurants within 2km", + "groceries_2km": "Groceries within 2km", + "parks_2km": "Parks within 2km", + "public_transport_2km": "Public transport within 2km", + "latest_price": "Last known price", + "number_habitable_rooms": "Rooms (including bedrooms & bathrooms)", } ) ) - return wide + print("Collecting with streaming engine...") + return wide.collect(engine="streaming") def main(): @@ -115,7 +143,7 @@ def main(): "--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file" ) parser.add_argument( - "--iod", type=Path, help="Index of Deprivation parquet file (optional)" + "--iod", type=Path, required=True, help="Index of Deprivation parquet file (optional)" ) parser.add_argument( "--poi-proximity", @@ -123,7 +151,13 @@ def main(): help="POI proximity counts parquet file (optional)", ) parser.add_argument( - "--journey-times", type=Path, help="Journey times parquet file (optional)" + "--journey-times", required=True, type=Path, help="Journey times parquet file (optional)" + ) + parser.add_argument( + "--ethnicity", type=Path, required=True, help="Ethnicity by local authority parquet file (optional)" + ) + parser.add_argument( + "--crime", type=Path, required=True, help="Crime by LSOA parquet file (optional)" ) parser.add_argument( "--output", type=Path, required=True, help="Output parquet file path" @@ -136,6 +170,8 @@ def main(): iod_path=args.iod, poi_proximity_path=args.poi_proximity, journey_times_path=args.journey_times, + ethnicity_path=args.ethnicity, + crime_path=args.crime, ) print(f"Columns: {wide.columns}") diff --git a/pipeline/transform/transform_poi.py b/pipeline/transform/transform_poi.py index 5a4608b..de92d24 100644 --- a/pipeline/transform/transform_poi.py +++ b/pipeline/transform/transform_poi.py @@ -576,7 +576,7 @@ def transform(input_path: Path) -> pl.LazyFrame: lf = pl.scan_parquet(input_path) # Get all unique categories present in the data - all_categories = lf.select("category").unique().collect().to_series().to_list() + all_categories = lf.select("category").unique().collect(engine="streaming").to_series().to_list() # Verify every non-dropped category has a mapping unmapped = [] @@ -632,7 +632,7 @@ def main(): ) args = parser.parse_args() - df = transform(args.input).collect() + df = transform(args.input).collect(engine="streaming") df.write_parquet(args.output) diff --git a/pipeline/utils/fuzzy_join.py b/pipeline/utils/fuzzy_join.py index 4ddd602..985d505 100644 --- a/pipeline/utils/fuzzy_join.py +++ b/pipeline/utils/fuzzy_join.py @@ -60,7 +60,7 @@ def fuzzy_join_on_postcode( .str.to_uppercase() .alias("_left_postcode"), ) - .collect() + .collect(engine="streaming") ) right_match = ( @@ -74,7 +74,7 @@ def fuzzy_join_on_postcode( .alias("_right_postcode"), ) .unique(subset=["_right_address", "_right_postcode"], keep="first") - .collect() + .collect(engine="streaming") ) # Group right side by postcode for fast lookup diff --git a/pipeline/utils/poi_counts.py b/pipeline/utils/poi_counts.py index b97df5e..8c39288 100644 --- a/pipeline/utils/poi_counts.py +++ b/pipeline/utils/poi_counts.py @@ -157,31 +157,24 @@ def count_pois_within_radius( # Count POIs per postcode postcode_counts = _count_pois_per_postcode(unique_postcodes, pois, radius_km) - # Write to temp file to avoid memory duplication during join print(" Writing postcode counts to temp file...") - with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp: + with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp: tmp_path = tmp.name postcode_counts.write_parquet(tmp_path) - del postcode_counts # Free memory + # Join using lazy evaluation + print(" Joining counts back to properties (lazy)...") + count_cols = [f"{group}_{int(radius_km)}km" for group in POI_GROUPS] - # Join using lazy evaluation - print(" Joining counts back to properties (lazy)...") - count_cols = [f"{group}_{int(radius_km)}km" for group in POI_GROUPS] + # Convert properties to lazy frame, join, then collect + result_lazy = ( + properties.lazy() + .select("postcode") + .join(pl.scan_parquet(tmp_path), on="postcode", how="left") + .select(count_cols) + .fill_null(0) + ) - # Convert properties to lazy frame, join, then collect - result_lazy = ( - properties.lazy() - .select("postcode") - .join(pl.scan_parquet(tmp_path), on="postcode", how="left") - .select(count_cols) - .fill_null(0) - ) + result_df = result_lazy.collect(engine="streaming") - result_df = result_lazy.collect() - - # Clean up temp file - os.unlink(tmp_path) - - # Extract as dict of Series - return {col: result_df[col] for col in count_cols} + return {col: result_df[col] for col in count_cols}