import argparse
import re
import polars as pl
from pathlib import Path
from pipeline.utils.postcode_mapping import build_postcode_mapping

# Floor-area threshold in square metres. _build keeps a row only when its
# total_floor_area is null or strictly greater than this value.
MIN_FLOOR_AREA_M2 = 10

# IoD score columns that _build rewrites in place as 0-100 "less deprived"
# percentiles (see _less_deprived_percentile_expr). Names are the raw source
# column names, i.e. before _build renames some of them.
_IOD_PERCENTILE_COLUMNS = [
    "Education, Skills and Training Score",
    "Income Score (rate)",
    "Employment Score (rate)",
    "Health Deprivation and Disability Score",
    "Indoors Sub-domain Score",
    "Outdoors Sub-domain Score",
]

# Final (post-rename) column names that _build treats as postcode-level:
# they are selected into the postcode dataframe and, apart from "Postcode",
# excluded from the properties dataframe.
_AREA_COLUMNS = [
    "Postcode",
    "lat",
    "lon",
    # Runtime provenance for deciding whether missing coordinates are skippable.
    "ctry25cd",
    # Deprivation
    "Income Score",
    "Employment Score",
    "Education, Skills and Training Score",
    "Health Deprivation and Disability Score",
    "Housing Conditions Score",
    "Air Quality and Road Safety Score",
    # Ethnicity
    "% South Asian",
    "% East Asian",
    "% Black",
    "% Mixed",
    "% White",
    "% Other",
    # Crime
    "Anti-social behaviour (avg/yr)",
    "Violence and sexual offences (avg/yr)",
    "Criminal damage and arson (avg/yr)",
    "Burglary (avg/yr)",
    "Vehicle crime (avg/yr)",
    "Robbery (avg/yr)",
    "Other theft (avg/yr)",
    "Shoplifting (avg/yr)",
    "Drugs (avg/yr)",
    "Possession of weapons (avg/yr)",
    "Public order (avg/yr)",
    "Bicycle theft (avg/yr)",
    "Theft from the person (avg/yr)",
    "Other crime (avg/yr)",
    "Serious crime (avg/yr)",
    "Minor crime (avg/yr)",
    "Serious crime per 1k residents (avg/yr)",
    "Minor crime per 1k residents (avg/yr)",
    # Amenities
    "Number of restaurants within 2km",
    "Number of grocery shops and supermarkets within 2km",
    # Environment
    "Noise (dB)",
    "Max available download speed (Mbps)",
    # Schools
    "Good+ primary schools within 5km",
    "Good+ secondary schools within 5km",
    "Good+ primary schools within 2km",
    "Good+ secondary schools within 2km",
    "Outstanding primary schools within 5km",
    "Outstanding secondary schools within 5km",
    "Outstanding primary schools within 2km",
    "Outstanding secondary schools within 2km",
    # Demographics
    "Median age",
    # Politics
    "Voter turnout (%)",
    "% Labour",
    "% Conservative",
    "% Liberal Democrat",
    "% Reform UK",
    "% Green",
    "% Other parties",
]
# Patterns for POI metric columns whose amenity label is only known at runtime.
# Columns matching either pattern are routed to the postcode-level dataframe.
_DYNAMIC_POI_DISTANCE_RE = re.compile(r"^Distance to nearest amenity \(.+\) \(km\)$")
_DYNAMIC_POI_COUNT_RE = re.compile(r"^Number of amenities \(.+\) within (2|5)km$")

# Canonical output name of the tree-density column.
TREE_DENSITY_FEATURE = "Street tree density percentile"
# Alternative input column name accepted by _tree_density_by_postcode.
_POSTCODE_TREE_DENSITY_PERCENTILE_RE = re.compile(
    r"^Tree canopy density percentile within \d+m$"
)

_RENT_SOURCE_UNAVAILABLE_LADS = {
    # ONS PIPR does not publish LAD-level private-rent estimates for these
    # small authorities. Keep rent null there, but fail on any other LAD miss.
    "E06000053": "Isles of Scilly",
    "E09000001": "City of London",
}


def _is_dynamic_poi_metric_column(column: str) -> bool:
    """Return True if *column* matches a dynamic POI distance or count pattern."""
    return bool(
        _DYNAMIC_POI_DISTANCE_RE.match(column) or _DYNAMIC_POI_COUNT_RE.match(column)
    )


def _less_deprived_percentile_expr(column: str) -> pl.Expr:
    """Convert an IoD deprivation score to a 0-100 less-deprived percentile.

    The returned expression overwrites *column* (same alias) with:

    * null where the score is null;
    * 100.0 for rows equal to the column minimum (lowest score);
    * 0.0 for rows equal to the column maximum (highest score);
    * otherwise ``(descending_rank - 1) / (n - 1) * 100`` rounded to one
      decimal, where ``n`` is the non-null count and ties share their
      average rank.

    The minimum check comes first, so a column whose non-null values are all
    equal maps to 100.0.
    """
    non_null_count = pl.col(column).count()
    descending_rank = pl.col(column).rank("average", descending=True)
    return (
        pl.when(pl.col(column).is_null())
        .then(None)
        .when(pl.col(column) == pl.col(column).min())
        .then(100.0)
        .when(pl.col(column) == pl.col(column).max())
        .then(0.0)
        .when(non_null_count > 1)
        .then(((descending_rank - 1) / (non_null_count - 1) * 100).round(1))
        # Fallback that also avoids dividing by zero when n <= 1; a sole
        # non-null value already equals the minimum and is handled above.
        .otherwise(100.0)
        .alias(column)
    )


def _tree_density_by_postcode(tree_density_postcodes_path: Path) -> pl.LazyFrame:
    """Load postcode-level tree density as (postcode, TREE_DENSITY_FEATURE).

    The density column is taken from TREE_DENSITY_FEATURE when present,
    otherwise from the single column matching
    ``Tree canopy density percentile within <N>m``. The value is cast to
    Float32; rows with a null postcode are dropped and one row is kept per
    postcode.

    Raises:
        ValueError: if the ``postcode`` column is missing, or if no
            unambiguous density column can be identified.
    """
    tree_density = pl.scan_parquet(tree_density_postcodes_path)
    columns = set(tree_density.collect_schema().names())
    if "postcode" not in columns:
        raise ValueError(
            f"{tree_density_postcodes_path} is missing required column: postcode"
        )

    if TREE_DENSITY_FEATURE in columns:
        density_column = TREE_DENSITY_FEATURE
    else:
        candidates = sorted(
            c for c in columns if _POSTCODE_TREE_DENSITY_PERCENTILE_RE.match(c)
        )
        if len(candidates) != 1:
            # "{radius}" in the middle literal is intentional placeholder
            # text (that segment is not an f-string).
            raise ValueError(
                f'{tree_density_postcodes_path} must contain column "{TREE_DENSITY_FEATURE}" '
                'or exactly one "Tree canopy density percentile within {radius}m" column; '
                f"found {len(candidates)} postcode percentile columns"
            )
        density_column = candidates[0]

    return (
        tree_density.select(
            pl.col("postcode"),
            pl.col(density_column).cast(pl.Float32).alias(TREE_DENSITY_FEATURE),
        )
        .drop_nulls(["postcode"])
        .unique(["postcode"])
    )


def _validate_lad_source_coverage(
    iod_path: Path, ethnicity_path: Path, rental_prices_path: Path
) -> None:
    """Check that LAD-keyed sources cover every LAD present in the IoD file.

    The set of expected LAD codes is the distinct
    ``Local Authority District code (2024)`` values in the IoD parquet.

    Raises:
        ValueError: if any expected LAD is absent from the ethnicity data, or
            absent from the rental data without being listed in
            _RENT_SOURCE_UNAVAILABLE_LADS.

    LADs that are missing from the rental data but explicitly allowed are
    reported with ``print`` and otherwise tolerated.
    """
    iod_lads = (
        pl.read_parquet(
            iod_path,
            columns=[
                "Local Authority District code (2024)",
                "Local Authority District name (2024)",
            ],
        )
        .rename(
            {
                "Local Authority District code (2024)": "lad",
                "Local Authority District name (2024)": "lad_name",
            }
        )
        .unique(["lad"])
    )

    ethnicity_lads = pl.read_parquet(ethnicity_path, columns=["Geography_code"]).rename(
        {"Geography_code": "lad"}
    )
    # Anti-join keeps the IoD LADs that have no match in the other source.
    missing_ethnicity = iod_lads.join(ethnicity_lads, on="lad", how="anti").sort("lad")
    if missing_ethnicity.height > 0:
        raise ValueError(
            "Ethnicity data is missing 2024 LAD coverage: "
            f"{missing_ethnicity.to_dicts()}"
        )

    rental_lads = pl.read_parquet(rental_prices_path, columns=["area_code"]).rename(
        {"area_code": "lad"}
    )
    missing_rent = iod_lads.join(rental_lads, on="lad", how="anti").sort("lad")
    unexpected_missing_rent = missing_rent.filter(
        ~pl.col("lad").is_in(list(_RENT_SOURCE_UNAVAILABLE_LADS))
    )
    if unexpected_missing_rent.height > 0:
        raise ValueError(
            "Rental data is missing 2024 LAD coverage: "
            f"{unexpected_missing_rent.to_dicts()}"
        )
    # Anything still missing at this point is on the allow-list.
    if missing_rent.height > 0:
        print(
            "PIPR has no LAD-level rent estimates for source-unavailable LADs; "
            f"rent will remain null there: {missing_rent.to_dicts()}"
        )


def _validate_property_postcodes(df: pl.DataFrame) -> None:
    """Raise if any row of *df* has a null or blank ``Postcode``.

    Raises:
        ValueError: with the offending row count and up to ten sample rows
            (restricted to the identifying columns that exist in *df*).
    """
    invalid = df.filter(
        pl.col("Postcode").is_null()
        | (pl.col("Postcode").cast(pl.Utf8).str.strip_chars() == "")
    )
    if invalid.height == 0:
        return

    sample_cols = [
        col
        for col in ("Postcode", "Address per Property Register", "Last known price")
        if col in invalid.columns
    ]
    sample = invalid.select(sample_cols).head(10).to_dicts()
    raise ValueError(
        "Property rows missing a postcode after merge: "
        f"{invalid.height} rows. Sample: {sample}"
    )


def _build(
    epc_pp_path: Path,
    arcgis_path: Path,
    iod_path: Path,
    poi_proximity_path: Path,
    ethnicity_path: Path,
    crime_path: Path,
    noise_path: Path,
    school_proximity_path: Path,
    broadband_path: Path,
    rental_prices_path: Path,
    lsoa_population_path: Path,
    median_age_path: Path,
    election_results_path: Path,
    tree_density_postcodes_path: Path | None = None,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Build postcode and properties dataframes from epc_pp + auxiliary data.

    The EPC/price-paid table is filtered on floor area, its postcodes are
    remapped via ``build_postcode_mapping``, and every auxiliary source is
    left-joined onto it (by postcode, LSOA, LAD + bedroom count, or
    constituency). Derived columns are added, helper/source columns are
    dropped, and the remainder is renamed to display names before the lazy
    plan is collected with the streaming engine.

    Every path argument except *tree_density_postcodes_path* is read
    unconditionally; pass ``None`` for that one to skip the tree-density join.

    Returns:
        ``(postcode_df, properties_df)``. ``postcode_df`` has one row per
        ``Postcode`` holding the columns listed in ``_AREA_COLUMNS`` plus any
        dynamic POI metric columns. ``properties_df`` has one row per property
        with all remaining columns plus ``Postcode``.

    Raises:
        ValueError: from ``_validate_lad_source_coverage``,
            ``_tree_density_by_postcode`` or ``_validate_property_postcodes``.
    """
    _validate_lad_source_coverage(iod_path, ethnicity_path, rental_prices_path)

    # Keep rows with unknown floor area; drop those at or below the minimum.
    wide = pl.scan_parquet(epc_pp_path).filter(
        pl.col("total_floor_area").is_null()
        | (pl.col("total_floor_area") > MIN_FLOOR_AREA_M2)
    )

    # Remap terminated postcodes to nearest active successor
    postcode_mapping = build_postcode_mapping(arcgis_path)
    wide = (
        wide.join(
            postcode_mapping.lazy(),
            left_on="postcode",
            right_on="old_postcode",
            how="left",
        )
        .with_columns(
            pl.coalesce("new_postcode", "postcode").alias("postcode"),
        )
        .drop("new_postcode")
    )

    arcgis_raw = pl.scan_parquet(arcgis_path)
    # Country code is taken from the unfiltered lookup, so it is available
    # even for postcodes excluded by the England/active filter below.
    postcode_country = arcgis_raw.select(
        pl.col("pcds").alias("postcode"),
        pl.col("ctry25cd"),
    ).unique(["postcode"])
    wide = wide.join(postcode_country, on="postcode", how="left")

    arcgis = (
        arcgis_raw.filter(pl.col("ctry25cd") == "E92000001")  # England only
        .filter(pl.col("doterm").is_null())  # Active postcodes only
        # NSPL Feb 2026 renamed geographic code columns to {field}{year}cd.
        # Alias them back to the short canonical names used across the
        # pipeline so downstream joins don't need to know about NSPL's
        # versioning scheme.
        .select(
            pl.col("pcds").alias("postcode"),
            "lat",
            pl.col("long").alias("lon"),
            pl.col("lsoa21cd").alias("lsoa21"),
            pl.col("oa21cd").alias("oa21"),
            pl.col("pcon24cd").alias("pcon"),
        )
    )
    wide = wide.join(arcgis, on="postcode", how="left")

    # Percentiles are computed over the IoD table itself, before the join.
    iod = pl.scan_parquet(iod_path).with_columns(
        *(_less_deprived_percentile_expr(c) for c in _IOD_PERCENTILE_COLUMNS)
    )
    wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left")

    ethnicity = pl.scan_parquet(ethnicity_path)
    wide = wide.join(
        ethnicity,
        left_on="Local Authority District code (2024)",
        right_on="Geography_code",
        how="left",
    )

    # Derive bedroom count: habitable rooms - 1 (assuming 1 reception room), clipped to 0..4
    # NOTE(review): if number_habitable_rooms has an unsigned dtype, a value
    # of 0 could wrap on the subtraction before clip() runs — confirm dtype.
    wide = wide.with_columns(
        (pl.col("number_habitable_rooms") - 1)
        .clip(0, 4)
        .cast(pl.UInt8)
        .alias("_bedrooms"),
    )
    rental = pl.scan_parquet(rental_prices_path).select(
        "area_code", "bedrooms", "mean_monthly_rent"
    )
    wide = wide.join(
        rental,
        left_on=["Local Authority District code (2024)", "_bedrooms"],
        right_on=["area_code", "bedrooms"],
        how="left",
    )

    crime = pl.scan_parquet(crime_path)
    wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left")

    # Aggregate the individual crime categories into two headline totals.
    wide = wide.with_columns(
        pl.sum_horizontal(
            "Violence and sexual offences (avg/yr)",
            "Robbery (avg/yr)",
            "Burglary (avg/yr)",
            "Possession of weapons (avg/yr)",
        ).alias("serious_crime_avg_yr"),
        pl.sum_horizontal(
            "Anti-social behaviour (avg/yr)",
            "Criminal damage and arson (avg/yr)",
            "Shoplifting (avg/yr)",
            "Bicycle theft (avg/yr)",
            "Theft from the person (avg/yr)",
            "Other theft (avg/yr)",
            "Vehicle crime (avg/yr)",
            "Public order (avg/yr)",
            "Drugs (avg/yr)",
            "Other crime (avg/yr)",
        ).alias("minor_crime_avg_yr"),
    )

    lsoa_pop = pl.scan_parquet(lsoa_population_path)
    wide = wide.join(lsoa_pop, on="lsoa21", how="left")
    # Per-1k rates; left null when population is missing or not positive
    # (the when/then has no otherwise branch).
    wide = wide.with_columns(
        pl.when(pl.col("population") > 0)
        .then((pl.col("serious_crime_avg_yr") / pl.col("population") * 1000).round(1))
        .alias("serious_crime_per_1k"),
        pl.when(pl.col("population") > 0)
        .then((pl.col("minor_crime_avg_yr") / pl.col("population") * 1000).round(1))
        .alias("minor_crime_per_1k"),
    ).drop("population")

    median_age = pl.scan_parquet(median_age_path)
    wide = wide.join(median_age, on="lsoa21", how="left")

    election = pl.scan_parquet(election_results_path)
    wide = wide.join(election, on="pcon", how="left")

    # Required: the rename step below expects restaurants_2km / groceries_2km.
    poi_counts = pl.scan_parquet(poi_proximity_path)
    wide = wide.join(poi_counts, on="postcode", how="left")

    noise_cols = ["road_noise_lden_db", "rail_noise_lden_db", "airport_noise_lden_db"]
    noise = (
        pl.scan_parquet(noise_path)
        .with_columns(
            # NaN → null so max_horizontal ignores missing instead of propagating NaN
            *[pl.col(c).fill_nan(None) for c in noise_cols],
        )
        .with_columns(
            pl.max_horizontal(*noise_cols).alias("noise_lden_db"),
        )
        .select("postcode", "noise_lden_db")
    )
    wide = wide.join(noise, on="postcode", how="left")

    school_proximity = pl.scan_parquet(school_proximity_path)
    wide = wide.join(school_proximity, on="postcode", how="left")

    # NOTE(review): TREE_DENSITY_FEATURE is not in _AREA_COLUMNS and matches
    # neither dynamic POI pattern, so it ends up in properties_df rather than
    # postcode_df — confirm that is intended.
    if tree_density_postcodes_path is not None:
        tree_density = _tree_density_by_postcode(tree_density_postcodes_path)
        wide = wide.join(tree_density, on="postcode", how="left")

    # Broadband: derive max available download speed tier per postcode from
    # Ofcom availability percentages. Tiers: Gigabit ≥1000, UFBB ≥300,
    # UFBB(100) ≥100, SFBB ≥30 Mbps. Stored as string enum.
    # Rows matching no tier fall back to 10; the best tier per postcode wins.
    broadband = (
        pl.scan_parquet(broadband_path)
        .select(
            pl.col("postcode_space").alias("bb_postcode"),
            pl.when(pl.col("Gigabit availability (% premises)") > 0)
            .then(1000)
            .when(pl.col("UFBB availability (% premises)") > 0)
            .then(300)
            .when(pl.col("UFBB (100Mbit/s) availability (% premises)") > 0)
            .then(100)
            .when(pl.col("SFBB availability (% premises)") > 0)
            .then(30)
            .otherwise(10)
            .cast(pl.UInt16)
            .alias("max_download_speed"),
        )
        .group_by("bb_postcode")
        .agg(pl.col("max_download_speed").max())
        .with_columns(pl.col("max_download_speed").cast(pl.Utf8))
    )
    wide = wide.join(broadband, left_on="postcode", right_on="bb_postcode", how="left")

    # Derive property_type: prefer EPC data, fall back to price-paid.
    # For Houses, use built_form (e.g. Semi-Detached, Mid-Terrace) for finer detail.
    bad_built_form = pl.col("built_form").is_null() | pl.col("built_form").is_in(
        ["NO DATA!", "Not Recorded"]
    )
    has_epc = pl.col("epc_property_type").is_not_null()
    is_house = pl.col("epc_property_type") == "House"
    wide = wide.with_columns(
        pl.when(has_epc & is_house & ~bad_built_form)
        .then(pl.col("built_form"))
        .when(has_epc & is_house)
        .then(pl.col("pp_property_type"))
        .when(has_epc)
        .then(pl.col("epc_property_type"))
        .otherwise(pl.col("pp_property_type"))
        # Unify EPC's "Flat"/"Maisonette" with price-paid's "Flats/Maisonettes",
        # collapse terrace sub-types, and fold rare types into "Other"
        .replace(
            {
                "Flat": "Flats/Maisonettes",
                "Maisonette": "Flats/Maisonettes",
                "End-Terrace": "Terraced",
                "Mid-Terrace": "Terraced",
                "Enclosed End-Terrace": "Terraced",
                "Enclosed Mid-Terrace": "Terraced",
                "Bungalow": "Other",
                "Park home": "Other",
            }
        )
        .alias("property_type")
    )

    wide = (
        wide.with_columns(
            # Null out sentinel values from the source data.
            pl.when(pl.col("duration") == "U")
            .then(None)
            .otherwise(pl.col("duration"))
            .alias("duration"),
            pl.when(pl.col("current_energy_rating") == "INVALID!")
            .then(None)
            .otherwise(pl.col("current_energy_rating"))
            .alias("current_energy_rating"),
        )
        .with_columns(
            # Floor area is null or > MIN_FLOOR_AREA_M2 here, so the divisor
            # is never zero; a null on either side yields a null result.
            (pl.col("latest_price") / pl.col("total_floor_area"))
            .round(0)
            .cast(pl.Int32)
            .alias("Price per sqm"),
        )
        # Drop join keys, helper columns and source columns not exposed.
        .drop(
            "inspection_date",
            "_bedrooms",
            "LSOA name (2021)",
            "Local Authority District code (2024)",
            "Local Authority District name (2024)",
            "Wider Barriers Sub-domain Score",
            "Geographical Barriers Sub-domain Score",
            "Adult Skills Sub-domain Score",
            "Children and Young People Sub-domain Score",
            "Crime Score",
            "Living Environment Score",
            "Index of Multiple Deprivation (IMD) Score",
            "Income Deprivation Affecting Older People (IDAOPI) Score (rate)",
            "Income Deprivation Affecting Children Index (IDACI) Score (rate)",
            "Barriers to Housing and Services Score",
            "lsoa21",
            "oa21",
            "pcon",
            "epc_property_type",
            "pp_property_type",
            "built_form",
        )
        # Internal names -> display names used in the output files.
        .rename(
            {
                "date_of_transfer": "Date of last transaction",
                "construction_age_band": "Construction year",
                "is_construction_date_approximate": "Is construction date approximate",
                "Income Score (rate)": "Income Score",
                "Employment Score (rate)": "Employment Score",
                "Indoors Sub-domain Score": "Housing Conditions Score",
                "Outdoors Sub-domain Score": "Air Quality and Road Safety Score",
                "pp_address": "Address per Property Register",
                "epc_address": "Address per EPC",
                "postcode": "Postcode",
                "duration": "Leasehold/Freehold",
                "current_energy_rating": "Current energy rating",
                "potential_energy_rating": "Potential energy rating",
                "total_floor_area": "Total floor area (sqm)",
                "property_type": "Property type",
                "restaurants_2km": "Number of restaurants within 2km",
                "groceries_2km": "Number of grocery shops and supermarkets within 2km",
                "latest_price": "Last known price",
                "number_habitable_rooms": "Number of bedrooms & living rooms",
                "noise_lden_db": "Noise (dB)",
                "good_primary_5km": "Good+ primary schools within 5km",
                "good_secondary_5km": "Good+ secondary schools within 5km",
                "good_primary_2km": "Good+ primary schools within 2km",
                "good_secondary_2km": "Good+ secondary schools within 2km",
                "outstanding_primary_5km": "Outstanding primary schools within 5km",
                "outstanding_secondary_5km": "Outstanding secondary schools within 5km",
                "outstanding_primary_2km": "Outstanding primary schools within 2km",
                "outstanding_secondary_2km": "Outstanding secondary schools within 2km",
                "max_download_speed": "Max available download speed (Mbps)",
                "serious_crime_avg_yr": "Serious crime (avg/yr)",
                "minor_crime_avg_yr": "Minor crime (avg/yr)",
                "serious_crime_per_1k": "Serious crime per 1k residents (avg/yr)",
                "minor_crime_per_1k": "Minor crime per 1k residents (avg/yr)",
                "mean_monthly_rent": "Estimated monthly rent",
                "floor_height": "Interior height (m)",
                "was_council_house": "Former council house",
                "median_age": "Median age",
                "turnout_pct": "Voter turnout (%)",
            }
        )
    )

    print("Collecting with streaming engine...")
    df = wide.collect(engine="streaming")
    _validate_property_postcodes(df)

    # Split into postcode-level and property-level dataframes
    area_cols = [
        c
        for c in df.columns
        if c in _AREA_COLUMNS or _is_dynamic_poi_metric_column(c)
    ]
    # One row per postcode, taking the first value seen for each column.
    postcode_df = df.select(area_cols).group_by("Postcode").first()
    print(f"Postcode rows: {postcode_df.height} (unique postcodes)")

    # Everything that is not area-level, plus the Postcode key for joining back.
    property_cols = [
        c
        for c in df.columns
        if (c not in _AREA_COLUMNS and not _is_dynamic_poi_metric_column(c))
        or c == "Postcode"
    ]
    properties_df = df.select(property_cols)
    print(f"Property rows: {properties_df.height}")

    return postcode_df, properties_df


def main(argv: list[str] | None = None) -> None:
    """Parse command-line arguments, run ``_build`` and write both outputs.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the normal command-line behaviour.

    Every input except ``--tree-density-postcodes`` is required because
    ``_build`` reads each of those files unconditionally; a missing flag is
    therefore rejected by argparse rather than failing inside the pipeline.
    """
    parser = argparse.ArgumentParser(
        description="Build wide property dataframe with all joins"
    )
    parser.add_argument(
        "--epc-pp", type=Path, required=True, help="EPC-Price Paid joined parquet file"
    )
    parser.add_argument(
        "--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file"
    )
    parser.add_argument(
        "--iod",
        type=Path,
        required=True,
        help="Index of Deprivation parquet file",
    )
    parser.add_argument(
        "--poi-proximity",
        type=Path,
        required=True,
        help="POI proximity counts parquet file",
    )
    parser.add_argument(
        "--ethnicity",
        type=Path,
        required=True,
        help="Ethnicity by local authority parquet file",
    )
    parser.add_argument(
        "--crime",
        type=Path,
        required=True,
        help="Crime by LSOA parquet file",
    )
    parser.add_argument(
        "--noise",
        type=Path,
        required=True,
        help="Road, rail and airport noise by postcode parquet file",
    )
    parser.add_argument(
        "--school-proximity",
        type=Path,
        required=True,
        help="School proximity counts parquet file",
    )
    parser.add_argument(
        "--broadband",
        type=Path,
        required=True,
        help="Ofcom broadband availability by postcode parquet file",
    )
    parser.add_argument(
        "--rental-prices",
        type=Path,
        required=True,
        help="ONS rental prices by LA and bedroom count parquet file",
    )
    parser.add_argument(
        "--lsoa-population",
        type=Path,
        required=True,
        help="Census 2021 population by LSOA parquet file",
    )
    parser.add_argument(
        "--median-age",
        type=Path,
        required=True,
        help="Census 2021 median age by LSOA parquet file",
    )
    parser.add_argument(
        "--election-results",
        type=Path,
        required=True,
        help="2024 General Election results by constituency parquet file",
    )
    parser.add_argument(
        "--tree-density-postcodes",
        type=Path,
        required=False,
        help="Postcode-level tree density parquet from pipeline.transform.tree_density",
    )
    parser.add_argument(
        "--output-postcodes",
        type=Path,
        required=True,
        help="Output postcode parquet file path",
    )
    parser.add_argument(
        "--output-properties",
        type=Path,
        required=True,
        help="Output properties parquet file path",
    )

    args = parser.parse_args(argv)

    postcode_df, properties_df = _build(
        epc_pp_path=args.epc_pp,
        arcgis_path=args.arcgis,
        iod_path=args.iod,
        poi_proximity_path=args.poi_proximity,
        ethnicity_path=args.ethnicity,
        crime_path=args.crime,
        noise_path=args.noise,
        school_proximity_path=args.school_proximity,
        broadband_path=args.broadband,
        rental_prices_path=args.rental_prices,
        lsoa_population_path=args.lsoa_population,
        median_age_path=args.median_age,
        election_results_path=args.election_results,
        tree_density_postcodes_path=args.tree_density_postcodes,
    )

    print(f"\nPostcode columns: {postcode_df.columns}")
    print(f"Postcode rows: {postcode_df.height}")
    postcode_df.write_parquet(args.output_postcodes)
    size_mb = args.output_postcodes.stat().st_size / (1024 * 1024)
    print(f"Wrote {args.output_postcodes} ({size_mb:.1f} MB)")

    print(f"\nProperty columns: {properties_df.columns}")
    print(f"Property rows: {properties_df.height}")
    properties_df.write_parquet(args.output_properties)
    size_mb = args.output_properties.stat().st_size / (1024 * 1024)
    print(f"Wrote {args.output_properties} ({size_mb:.1f} MB)")


if __name__ == "__main__":
    main()