"""Build a wide property dataframe and H3 aggregates from epc_pp output.""" import polars as pl import h3 from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS, DATA_DIR, PROCESSED_DIR def _build_wide() -> pl.DataFrame: """Build the wide dataframe by joining epc_pp with all auxiliary data.""" print("Loading epc_pp...") wide = pl.read_parquet(PROCESSED_DIR / "epc_pp.parquet") print(f" {wide.shape[0]:,} rows") # GPS coordinates + LSOA from ArcGIS print("Joining GPS coordinates...") arcgis = pl.read_parquet(DATA_DIR / "arcgis_data.parquet").select( pl.col("pcds").alias("postcode"), "lat", pl.col("long").alias("lon"), "lsoa21", ) wide = wide.join(arcgis, on="postcode", how="inner") print(f" {wide.shape[0]:,} rows after GPS join") # Journey times (optional) journey_path = PROCESSED_DIR / "journey_times_bank_checkpoint.parquet" if journey_path.exists(): print("Joining journey times...") journey_times = pl.read_parquet(journey_path).select( "postcode", "public_transport_easy_minutes", "public_transport_quick_minutes", "cycling_minutes", ) wide = wide.join(journey_times, on="postcode", how="left") # Index of Deprivation iod_path = DATA_DIR / "IoD2025_Scores.parquet" if iod_path.exists(): print("Joining IoD scores...") iod = pl.read_parquet(iod_path).drop( "LSOA name (2021)", "Local Authority District code (2024)", "Local Authority District name (2024)", ) # Rename IoD columns to clean snake_case iod = iod.rename(_IOD_RENAMES) wide = wide.join( iod, left_on="lsoa21", right_on="lsoa_code", how="left" ) return wide _IOD_RENAMES = { "LSOA code (2021)": "lsoa_code", "Index of Multiple Deprivation (IMD) Score": "imd_score", "Income Score (rate)": "income_score", "Employment Score (rate)": "employment_score", "Education, Skills and Training Score": "education_score", "Health Deprivation and Disability Score": "health_score", "Crime Score": "crime_score", "Barriers to Housing and Services Score": "housing_barriers_score", "Living Environment Score": "living_environment_score", "Income Deprivation Affecting Children Index (IDACI) Score (rate)": "idaci_score", "Income Deprivation Affecting Older People (IDAOPI) Score (rate)": "idaopi_score", "Children and Young People Sub-domain Score": "children_young_people_score", "Adult Skills Sub-domain Score": "adult_skills_score", "Geographical Barriers Sub-domain Score": "geographical_barriers_score", "Wider Barriers Sub-domain Score": "wider_barriers_score", "Indoors Sub-domain Score": "indoors_score", "Outdoors Sub-domain Score": "outdoors_score", } def _add_h3_indices(df: pl.DataFrame) -> pl.DataFrame: """Compute H3 indices from lat/lon for all configured resolutions.""" print("Computing H3 indices...") # Compute per unique postcode for efficiency, then join back postcodes = df.select("postcode", "lat", "lon").unique(subset=["postcode"]) for res in H3_RESOLUTIONS: col_name = f"h3_res{res}" postcodes = postcodes.with_columns( pl.struct(["lat", "lon"]) .map_elements( lambda x, r=res: h3.latlng_to_cell(x["lat"], x["lon"], r), return_dtype=pl.Utf8, ) .alias(col_name) ) print(f" res{res}: {postcodes[col_name].n_unique():,} unique cells") h3_cols = [f"h3_res{res}" for res in H3_RESOLUTIONS] return df.join( postcodes.select("postcode", *h3_cols), on="postcode", how="left" ) def _aggregate_to_h3(df: pl.DataFrame) -> None: """Aggregate min/max of every numeric feature per H3 cell at each resolution.""" AGGREGATES_DIR.mkdir(parents=True, exist_ok=True) exclude = {"lat", "lon"} numeric_cols = [ col for col, dtype in zip(df.columns, df.dtypes) if dtype.is_numeric() and not col.startswith("h3_res") and col not in exclude ] agg_exprs = [pl.len().alias("count")] for col in numeric_cols: agg_exprs.append(pl.col(col).min().alias(f"min_{col}")) agg_exprs.append(pl.col(col).max().alias(f"max_{col}")) print("Aggregating to H3 cells...") for res in H3_RESOLUTIONS: h3_col = f"h3_res{res}" result = df.group_by(h3_col).agg(agg_exprs).rename({h3_col: "h3"}) path = AGGREGATES_DIR / f"res{res}.parquet" result.write_parquet(path) size_mb = path.stat().st_size / (1024 * 1024) print(f" {path.name}: {result.shape[0]:,} cells ({size_mb:.1f} MB)") def run(): """Run the full wide pipeline: build wide df, compute H3, aggregate.""" wide = _build_wide() wide_path = PROCESSED_DIR / "wide.parquet" wide.write_parquet(wide_path) size_mb = wide_path.stat().st_size / (1024 * 1024) print(f"Wrote {wide_path} ({size_mb:.1f} MB)") wide = _add_h3_indices(wide) _aggregate_to_h3(wide) print("Done.") if __name__ == "__main__": run()