perfect-postcode/pipeline/transform/merge.py
2026-05-26 19:45:13 +01:00

1156 lines
40 KiB
Python

import argparse
import re
import numpy as np
import polars as pl
from pathlib import Path
import pyogrio
from pyproj import Transformer
from scipy.spatial import cKDTree
from shapely import from_wkb, points
from shapely.geometry.base import BaseGeometry
from shapely.strtree import STRtree
from thefuzz import fuzz
from pipeline.utils.fuzzy_join import normalize_address_key
from pipeline.utils.postcode_mapping import build_postcode_mapping
# Rows with a recorded total_floor_area at or below this many m² are dropped
# in _build; rows with a null floor area are kept.
MIN_FLOOR_AREA_M2 = 10
# Output column names of the two heritage flags; their values are "Yes"/"No".
CONSERVATION_AREA_FEATURE = "Within conservation area"
LISTED_BUILDING_FEATURE = "Listed building"
# Listed-building matching: each NHLE point is paired with up to
# LISTED_BUILDING_NEAREST_POSTCODES active postcodes within
# LISTED_BUILDING_MATCH_RADIUS_M of it, and a property matches when one of its
# normalised addresses scores at least LISTED_BUILDING_MIN_MATCH_SCORE
# (thefuzz token_set_ratio, 0-100) against a candidate listed name.
LISTED_BUILDING_MATCH_RADIUS_M = 250.0
LISTED_BUILDING_NEAREST_POSTCODES = 3
LISTED_BUILDING_MIN_MATCH_SCORE = 95
# Conservation-area records whose NAME starts with this (after strip() and
# casefold()) are Historic England placeholders and are skipped on load.
_UNPUBLISHED_CONSERVATION_AREA_PREFIX = "no data available for publication"
# IoD score columns rewritten as 0-100 "less deprived" percentiles
# (see _less_deprived_percentile_expr) before being joined onto properties.
_IOD_PERCENTILE_COLUMNS = [
    "Education, Skills and Training Score",
    "Income Score (rate)",
    "Employment Score (rate)",
    "Health Deprivation and Disability Score",
    "Indoors Sub-domain Score",
    "Outdoors Sub-domain Score",
]
# Final (post-rename) column names routed to the postcode-level output of
# _build. Columns not listed here and not matching the dynamic POI patterns
# below go to the property-level output; "Postcode" appears in both.
_AREA_COLUMNS = [
    "Postcode",
    "lat",
    "lon",
    # Runtime provenance for deciding whether missing coordinates are skippable.
    "ctry25cd",
    # Keyed lookup for postcode-level side tables (e.g. crime time series).
    "lsoa21",
    # Deprivation
    "Income Score",
    "Employment Score",
    "Education, Skills and Training Score",
    "Health Deprivation and Disability Score",
    "Housing Conditions Score",
    "Air Quality and Road Safety Score",
    # Ethnicity
    "% South Asian",
    "% East Asian",
    "% Black",
    "% Mixed",
    "% White",
    "% Other",
    # Crime
    "Anti-social behaviour (avg/yr)",
    "Violence and sexual offences (avg/yr)",
    "Criminal damage and arson (avg/yr)",
    "Burglary (avg/yr)",
    "Vehicle crime (avg/yr)",
    "Robbery (avg/yr)",
    "Other theft (avg/yr)",
    "Shoplifting (avg/yr)",
    "Drugs (avg/yr)",
    "Possession of weapons (avg/yr)",
    "Public order (avg/yr)",
    "Bicycle theft (avg/yr)",
    "Theft from the person (avg/yr)",
    "Other crime (avg/yr)",
    "Serious crime (avg/yr)",
    "Minor crime (avg/yr)",
    "Serious crime per 1k residents (avg/yr)",
    "Minor crime per 1k residents (avg/yr)",
    # Amenities
    "Number of restaurants within 2km",
    "Number of grocery shops and supermarkets within 2km",
    # Environment
    "Noise (dB)",
    "Max available download speed (Mbps)",
    CONSERVATION_AREA_FEATURE,
    # Schools
    "Good+ primary schools within 5km",
    "Good+ secondary schools within 5km",
    "Good+ primary schools within 2km",
    "Good+ secondary schools within 2km",
    "Outstanding primary schools within 5km",
    "Outstanding secondary schools within 5km",
    "Outstanding primary schools within 2km",
    "Outstanding secondary schools within 2km",
    # Demographics
    "Median age",
    # Politics
    "Voter turnout (%)",
    "% Labour",
    "% Conservative",
    "% Liberal Democrat",
    "% Reform UK",
    "% Green",
    "% Other parties",
]
# Dynamically named POI metric columns; these are routed to the postcode-level
# output alongside _AREA_COLUMNS.
_DYNAMIC_POI_DISTANCE_RE = re.compile(r"^Distance to nearest amenity \(.+\) \(km\)$")
_DYNAMIC_POI_COUNT_RE = re.compile(r"^Number of amenities \(.+\) within (2|5)km$")
# Canonical output name for postcode tree density, and the pattern of the
# radius-specific input column it is read from when the canonical one is absent.
TREE_DENSITY_FEATURE = "Street tree density percentile"
_POSTCODE_TREE_DENSITY_PERCENTILE_RE = re.compile(
    r"^Tree canopy density percentile within \d+m$"
)
_RENT_SOURCE_UNAVAILABLE_LADS = {
    # ONS PIPR does not publish LAD-level private-rent estimates for these
    # small authorities. Keep rent null there, but fail on any other LAD miss.
    "E06000053": "Isles of Scilly",
    "E09000001": "City of London",
}
# Runs of digits, used to compare house/list-entry numbers between strings.
_NUMBER_RE = re.compile(r"\d+")
# Tokens ignored when deciding whether a digit-free listed-building name is
# specific enough to match on (see _is_matchable_listed_name).
_LISTED_NAME_STOP_WORDS = {
    "A",
    "AN",
    "AND",
    "AT",
    "BY",
    "IN",
    "OF",
    "ON",
    "THE",
    "TO",
    "WITH",
}
def _is_dynamic_poi_metric_column(column: str) -> bool:
    """Return True if *column* is a dynamically named POI distance or count metric."""
    patterns = (_DYNAMIC_POI_DISTANCE_RE, _DYNAMIC_POI_COUNT_RE)
    return any(pattern.match(column) is not None for pattern in patterns)
def _numbers_compatible(left: str, right: str) -> bool:
    """Check that the digit runs found in two strings are consistent.

    If neither string contains a number they are compatible. If only one does,
    they are not. Otherwise the smaller set of numbers must be a subset of the
    larger one.
    """
    left_nums = set(_NUMBER_RE.findall(left))
    right_nums = set(_NUMBER_RE.findall(right))
    if len(left_nums) <= len(right_nums):
        fewer, more = left_nums, right_nums
    else:
        fewer, more = right_nums, left_nums
    if more and not fewer:
        return False
    return fewer <= more
def _listed_candidate_schema() -> dict[str, pl.DataType]:
    """Return the column schema shared by all listed-building candidate frames."""
    return dict(
        postcode=pl.Utf8,
        _listed_match_name=pl.Utf8,
        _listed_grade=pl.Utf8,
        _listed_entry=pl.Int64,
    )
def _empty_listed_candidates() -> pl.DataFrame:
    """Return a zero-row frame carrying the listed-candidate schema."""
    schema = _listed_candidate_schema()
    return pl.DataFrame(schema=schema)
def _empty_listed_property_flags() -> pl.DataFrame:
    """Return a zero-row (postcode, pp_address, listed flag) frame with Utf8 columns."""
    column_names = ("postcode", "pp_address", LISTED_BUILDING_FEATURE)
    return pl.DataFrame(schema={name: pl.Utf8 for name in column_names})
def _is_matchable_listed_name(name_key: str | None) -> bool:
    """Decide whether a normalised listed-building name is specific enough to match.

    Empty or missing names never qualify. A name containing any digit always
    qualifies. Otherwise it needs at least two tokens that are three or more
    characters long and are not stop words.
    """
    if not name_key:
        return False
    if _NUMBER_RE.search(name_key) is not None:
        return True
    substantive_count = sum(
        1
        for token in name_key.split()
        if len(token) >= 3 and token not in _LISTED_NAME_STOP_WORDS
    )
    return substantive_count >= 2
def _load_listed_building_points(listed_buildings_path: Path) -> pl.DataFrame:
    """Read Historic England NHLE listed-building attributes from a point layer.

    Only attribute columns are read; geometry is skipped, and Easting/Northing
    carry the location instead. Rows without a name or coordinates are
    dropped. A normalised ``_listed_match_name`` key is added, and rows whose
    key normalises to null are dropped as well.

    Raises:
        ValueError: if the layer's geometry type is not a point type, or if any
            required attribute column is absent.
    """
    wanted = ["ListEntry", "Name", "Grade", "Easting", "Northing"]
    layer_info = pyogrio.read_info(listed_buildings_path)
    geometry_type = str(layer_info.get("geometry_type") or "")
    if "Point" not in geometry_type:
        raise ValueError(
            f"Expected listed-building point data, got geometry {geometry_type!r}"
        )
    _meta, arrow_table = pyogrio.read_arrow(
        listed_buildings_path, columns=wanted, read_geometry=False
    )
    raw = pl.from_arrow(arrow_table)
    absent = sorted(set(wanted).difference(raw.columns))
    if absent:
        raise ValueError(
            f"{listed_buildings_path} is missing listed-building columns: {absent}"
        )
    typed = raw.select(
        pl.col("ListEntry").cast(pl.Int64),
        pl.col("Name").cast(pl.Utf8),
        pl.col("Grade").cast(pl.Utf8),
        pl.col("Easting").cast(pl.Float64),
        pl.col("Northing").cast(pl.Float64),
    )
    keyed = typed.drop_nulls(["Name", "Easting", "Northing"]).with_columns(
        normalize_address_key(pl.col("Name")).alias("_listed_match_name")
    )
    return keyed.filter(pl.col("_listed_match_name").is_not_null())
def _postcode_listed_building_candidates(
    listed_points: pl.DataFrame,
    active_postcodes: pl.DataFrame,
    *,
    nearest_postcodes: int = LISTED_BUILDING_NEAREST_POSTCODES,
    max_distance_m: float = LISTED_BUILDING_MATCH_RADIUS_M,
) -> pl.DataFrame:
    """Assign each listed-building point to nearby active postcode candidates.

    Each listed point with a matchable name (see ``_is_matchable_listed_name``)
    is paired with up to ``nearest_postcodes`` postcodes. Those postcodes'
    (east1m, north1m) coordinates must lie within ``max_distance_m`` of the
    point's (Easting, Northing). The search uses a cKDTree nearest-neighbour
    query.

    Args:
        listed_points: Frame with ``_listed_match_name``, ``Grade``,
            ``ListEntry``, ``Easting`` and ``Northing`` columns.
        active_postcodes: Frame with ``postcode``, ``east1m`` and ``north1m``.
        nearest_postcodes: Maximum candidate postcodes per listed point. It is
            clamped to at least 1 and at most the number of usable postcodes.
        max_distance_m: Search radius, in the same units as the coordinates.
            Presumably metres on the national grid; confirm against the inputs.

    Returns:
        Candidate rows (postcode, _listed_match_name, _listed_grade,
        _listed_entry), unique on postcode/name/entry. When nothing qualifies,
        an empty frame with the same schema is returned.

    Raises:
        ValueError: if either input frame lacks a required column.
    """
    if listed_points.is_empty() or active_postcodes.is_empty():
        return _empty_listed_candidates()
    required_postcode_cols = {"postcode", "east1m", "north1m"}
    missing = sorted(required_postcode_cols - set(active_postcodes.columns))
    if missing:
        raise ValueError(f"Active postcode data missing required columns: {missing}")
    required_listed_cols = {
        "_listed_match_name",
        "Grade",
        "ListEntry",
        "Easting",
        "Northing",
    }
    missing = sorted(required_listed_cols - set(listed_points.columns))
    if missing:
        raise ValueError(f"Listed-building data missing required columns: {missing}")
    # Keep only rows with non-null, finite coordinates on both sides.
    postcodes = active_postcodes.drop_nulls(["postcode", "east1m", "north1m"])
    postcodes = postcodes.filter(
        pl.col("east1m").is_finite() & pl.col("north1m").is_finite()
    )
    listed = listed_points.drop_nulls(["_listed_match_name", "Easting", "Northing"])
    listed = listed.filter(
        pl.col("Easting").is_finite() & pl.col("Northing").is_finite()
    )
    if postcodes.is_empty() or listed.is_empty():
        return _empty_listed_candidates()
    postcode_coords = np.column_stack(
        [postcodes["east1m"].to_numpy(), postcodes["north1m"].to_numpy()]
    )
    listed_coords = np.column_stack(
        [listed["Easting"].to_numpy(), listed["Northing"].to_numpy()]
    )
    # Ask for at least one neighbour, but never more than there are postcodes.
    k = max(1, min(nearest_postcodes, postcodes.height))
    distances, indices = cKDTree(postcode_coords).query(
        listed_coords,
        k=k,
        distance_upper_bound=max_distance_m,
    )
    # With k == 1 the query results are 1-D. Add an axis so each listed point
    # always has a row of k neighbour slots in the loop below.
    if k == 1:
        distances = distances[:, np.newaxis]
        indices = indices[:, np.newaxis]
    # Pull the columns into Python lists once for the per-point loop.
    postcode_values = postcodes["postcode"].to_list()
    listed_names = listed["_listed_match_name"].to_list()
    listed_grades = listed["Grade"].to_list()
    listed_entries = listed["ListEntry"].to_list()
    rows: list[tuple[str, str, str | None, int | None]] = []
    for listed_idx in range(listed.height):
        name_key = listed_names[listed_idx]
        # Skip names too generic to match addresses reliably.
        if not _is_matchable_listed_name(name_key):
            continue
        seen_postcodes: set[str] = set()
        for distance, postcode_idx in zip(distances[listed_idx], indices[listed_idx]):
            # Slots with no neighbour inside the radius carry a non-finite
            # distance and an out-of-range index; skip them.
            if not np.isfinite(distance) or postcode_idx >= postcodes.height:
                continue
            postcode = postcode_values[int(postcode_idx)]
            # Emit each postcode at most once per listed point. This guards
            # against repeated postcode values in active_postcodes.
            if postcode in seen_postcodes:
                continue
            seen_postcodes.add(postcode)
            rows.append(
                (
                    postcode,
                    name_key,
                    listed_grades[listed_idx],
                    listed_entries[listed_idx],
                )
            )
    if not rows:
        return _empty_listed_candidates()
    return (
        pl.DataFrame(
            rows,
            schema=[
                "postcode",
                "_listed_match_name",
                "_listed_grade",
                "_listed_entry",
            ],
            orient="row",
        )
        .cast(_listed_candidate_schema())
        .unique(["postcode", "_listed_match_name", "_listed_entry"])
    )
def _matched_listed_building_flags(
    properties: pl.LazyFrame,
    listed_candidates: pl.DataFrame,
    *,
    min_score: int = LISTED_BUILDING_MIN_MATCH_SCORE,
) -> pl.DataFrame:
    """Return property keys that conservatively match an NHLE listed entry.

    Only properties in postcodes with at least one listed candidate are
    considered. A property matches when either its normalised Price Paid
    address or its EPC address passes two checks against a listed name
    assigned to the same postcode:

    - the numbers in the two strings are compatible (``_numbers_compatible``);
    - ``fuzz.token_set_ratio`` scores at least ``min_score``.

    Args:
        properties: Lazy frame with ``postcode``, ``pp_address`` and
            ``epc_address`` columns.
        listed_candidates: Frame with ``postcode`` and ``_listed_match_name``
            columns, as produced by ``_postcode_listed_building_candidates``.
        min_score: Minimum token-set ratio (0-100) required for a match.

    Returns:
        One row per matched ``(postcode, pp_address)``, with the listed-building
        flag set to ``"Yes"``. If nothing matches, an empty frame with the same
        schema is returned.
    """
    if listed_candidates.is_empty():
        return _empty_listed_property_flags()
    candidate_postcodes = listed_candidates.select("postcode").unique()
    property_candidates = (
        properties.select("postcode", "pp_address", "epc_address")
        # Restrict to postcodes that have at least one listed candidate.
        .join(candidate_postcodes.lazy(), on="postcode", how="semi")
        .with_columns(
            normalize_address_key(pl.col("pp_address")).alias("_pp_match_address"),
            normalize_address_key(pl.col("epc_address")).alias("_epc_match_address"),
        )
        # pp_address is the key these flags are joined back on, so it must be
        # present. At least one normalised address is needed to compare.
        .filter(
            pl.col("pp_address").is_not_null()
            & (
                pl.col("_pp_match_address").is_not_null()
                | pl.col("_epc_match_address").is_not_null()
            )
        )
        .collect(engine="streaming")
    )
    if property_candidates.is_empty():
        return _empty_listed_property_flags()
    # Distinct listed names per postcode, in first-seen order; dict keys act as
    # an ordered set. Different list entries can share a name, and comparing
    # against the same name twice cannot change the outcome.
    listed_by_postcode: dict[str, dict[str, None]] = {}
    for postcode, name in listed_candidates.select(
        "postcode", "_listed_match_name"
    ).iter_rows():
        if postcode and name:
            listed_by_postcode.setdefault(postcode, {})[name] = None
    matches: list[tuple[str, str, str]] = []
    for row in property_candidates.iter_rows(named=True):
        postcode = row["postcode"]
        listed_names = listed_by_postcode.get(postcode)
        if not listed_names:
            continue
        # Distinct, non-empty address keys, with the Price Paid form first.
        address_keys = [
            key
            for key in dict.fromkeys(
                (row["_pp_match_address"], row["_epc_match_address"])
            )
            if key
        ]
        # A number mismatch rules a pair out before the (costlier) fuzzy score.
        if any(
            _numbers_compatible(address_key, listed_name)
            and fuzz.token_set_ratio(address_key, listed_name) >= min_score
            for address_key in address_keys
            for listed_name in listed_names
        ):
            matches.append((postcode, row["pp_address"], "Yes"))
    if not matches:
        return _empty_listed_property_flags()
    return (
        pl.DataFrame(
            matches,
            schema=["postcode", "pp_address", LISTED_BUILDING_FEATURE],
            orient="row",
        )
        .cast(
            {
                "postcode": pl.Utf8,
                "pp_address": pl.Utf8,
                LISTED_BUILDING_FEATURE: pl.Utf8,
            }
        )
        .unique(["postcode", "pp_address"])
    )
def _listed_building_flags(
    properties: pl.LazyFrame,
    active_postcodes: pl.DataFrame,
    listed_buildings_path: Path,
) -> pl.DataFrame:
    """Flag properties whose address matches a nearby listed-building name.

    Loads the NHLE points, assigns them to nearby active postcodes, then
    fuzzy-matches names against property addresses. Progress is printed at
    each stage.
    """
    print(f"Loading listed-building points from {listed_buildings_path}...")
    points_df = _load_listed_building_points(listed_buildings_path)
    print(f"Loaded {points_df.height} listed-building point records")
    candidates = _postcode_listed_building_candidates(points_df, active_postcodes)
    nearby_postcode_count = candidates["postcode"].n_unique()
    print(
        "Matching listed-building names to property addresses across "
        f"{nearby_postcode_count} nearby postcodes..."
    )
    flags = _matched_listed_building_flags(properties, candidates)
    print(f"Matched {flags.height} property addresses to listed-building entries")
    return flags
def _normalise_crs(crs: object | None) -> str:
return str(crs) if crs else "EPSG:4326"
def _is_unpublished_conservation_area_record(name: object) -> bool:
    """Return True for Historic England placeholder records with no published data.

    Only string names can match. The name is stripped of surrounding
    whitespace and casefolded, then tested against the placeholder prefix.
    """
    if not isinstance(name, str):
        return False
    normalised = name.strip().casefold()
    return normalised.startswith(_UNPUBLISHED_CONSERVATION_AREA_PREFIX)
def _load_conservation_area_geometries(
    conservation_areas_path: Path,
) -> tuple[list[BaseGeometry], str]:
    """Load usable conservation-area geometries and their CRS.

    Records whose NAME marks them as Historic England unpublished placeholders
    are skipped, as are null or empty geometries.

    Args:
        conservation_areas_path: Vector dataset readable by pyogrio, with a
            ``NAME`` attribute column.

    Returns:
        ``(geometries, crs)``. ``crs`` is the dataset CRS as a string, or
        ``"EPSG:4326"`` when the dataset reports none.

    Raises:
        ValueError: if the ``NAME`` column is absent, or if no usable geometry
            remains after filtering.
    """
    metadata, table = pyogrio.read_arrow(conservation_areas_path, columns=["NAME"])
    # Validate explicitly, as the listed-building loader does, instead of
    # failing later with a bare KeyError on table["NAME"].
    if "NAME" not in table.column_names:
        raise ValueError(
            f"{conservation_areas_path} is missing conservation-area column: NAME"
        )
    # Prefer the geometry column named in the metadata; otherwise assume it is
    # the last column of the table.
    geometry_name = metadata.get("geometry_name") or table.column_names[-1]
    names = table["NAME"].combine_chunks().to_pylist()
    wkb_values = table[geometry_name].combine_chunks().to_pylist()
    geometries: list[BaseGeometry] = []
    skipped_unpublished = 0
    for name, geom in zip(names, from_wkb(wkb_values), strict=True):
        if _is_unpublished_conservation_area_record(name):
            skipped_unpublished += 1
        elif geom is not None and not geom.is_empty:
            geometries.append(geom)
    # Report skipped placeholders before the emptiness check, so the count is
    # visible even when every record turned out to be a placeholder.
    if skipped_unpublished:
        print(
            "Skipped "
            f"{skipped_unpublished} Historic England unpublished conservation-area "
            "placeholder polygons"
        )
    if not geometries:
        raise ValueError(
            f"{conservation_areas_path} does not contain any usable polygon geometries"
        )
    return geometries, _normalise_crs(metadata.get("crs"))
def _postcode_conservation_area_flags(
    postcodes: pl.DataFrame,
    conservation_geometries: list[BaseGeometry],
    conservation_crs: object | None,
    batch_size: int = 100_000,
) -> pl.DataFrame:
    """Flag each postcode "Yes"/"No" for lying within a conservation area.

    Postcode lon/lat are reprojected from EPSG:4326 into ``conservation_crs``.
    Each point is then tested for intersection with any of
    ``conservation_geometries`` using an STRtree, ``batch_size`` points at a
    time. Postcodes whose coordinates are null or non-finite are "No". A
    postcode that appears several times is "Yes" if any of its points is
    inside.

    Args:
        postcodes: Frame with ``postcode``, ``lat`` and ``lon`` columns.
        conservation_geometries: Geometries to test against.
        conservation_crs: CRS of the geometries; a falsy value means EPSG:4326.
        batch_size: Number of points passed to each STRtree query.

    Returns:
        One row per distinct non-null postcode, with columns ``postcode`` and
        CONSERVATION_AREA_FEATURE.

    Raises:
        ValueError: if a required column is missing.
    """
    required = {"postcode", "lat", "lon"}
    missing = sorted(required - set(postcodes.columns))
    if missing:
        raise ValueError(f"Postcode data missing required columns: {missing}")
    all_postcodes = postcodes.select("postcode").drop_nulls().unique()
    valid_points = postcodes.select("postcode", "lat", "lon").drop_nulls()
    if valid_points.is_empty():
        return all_postcodes.with_columns(pl.lit("No").alias(CONSERVATION_AREA_FEATURE))
    # Also drop NaN/inf coordinates, which drop_nulls() keeps.
    lat = valid_points["lat"].to_numpy()
    lon = valid_points["lon"].to_numpy()
    finite = np.isfinite(lat) & np.isfinite(lon)
    valid_points = valid_points.filter(pl.Series(finite))
    if valid_points.is_empty():
        return all_postcodes.with_columns(pl.lit("No").alias(CONSERVATION_AREA_FEATURE))
    lat = valid_points["lat"].to_numpy()
    lon = valid_points["lon"].to_numpy()
    # always_xy=True: inputs and outputs are in (x=lon, y=lat) order.
    transformer = Transformer.from_crs(
        "EPSG:4326", _normalise_crs(conservation_crs), always_xy=True
    )
    x, y = transformer.transform(lon, lat)
    tree = STRtree(conservation_geometries)
    inside = np.zeros(valid_points.height, dtype=bool)
    for start in range(0, valid_points.height, batch_size):
        end = min(start + batch_size, valid_points.height)
        point_batch = points(x[start:end], y[start:end])
        matches = tree.query(point_batch, predicate="intersects")
        # matches[0] holds indices into point_batch; offset by start to index
        # the full `inside` array.
        if matches.size > 0:
            inside[start + matches[0]] = True
    matched = (
        valid_points.select("postcode")
        .with_columns(pl.Series("_within_conservation_area", inside))
        .group_by("postcode")
        .agg(pl.col("_within_conservation_area").max())
        .with_columns(
            pl.when(pl.col("_within_conservation_area"))
            .then(pl.lit("Yes"))
            .otherwise(pl.lit("No"))
            .alias(CONSERVATION_AREA_FEATURE)
        )
        .select("postcode", CONSERVATION_AREA_FEATURE)
    )
    # Postcodes dropped for bad coordinates come back via the left join as "No".
    return (
        all_postcodes.join(matched, on="postcode", how="left")
        .with_columns(pl.col(CONSERVATION_AREA_FEATURE).fill_null("No"))
        .select("postcode", CONSERVATION_AREA_FEATURE)
    )
def _conservation_area_by_postcode(
    postcodes: pl.LazyFrame,
    conservation_areas_path: Path,
) -> pl.LazyFrame:
    """Return a lazy (postcode, conservation-area flag) frame for *postcodes*.

    Loads the polygons and materialises the postcode points. The
    point-in-polygon work is delegated to ``_postcode_conservation_area_flags``.
    """
    print(f"Loading conservation area polygons from {conservation_areas_path}...")
    polygons, polygon_crs = _load_conservation_area_geometries(conservation_areas_path)
    point_frame = postcodes.select("postcode", "lat", "lon").collect(
        engine="streaming"
    )
    print(
        "Computing conservation area membership for "
        f"{point_frame.height} active English postcodes..."
    )
    flags = _postcode_conservation_area_flags(point_frame, polygons, polygon_crs)
    return flags.lazy()
def _less_deprived_percentile_expr(column: str) -> pl.Expr:
    """Build an expression mapping an IoD score to a 0-100 "less deprived" percentile.

    The minimum score maps to 100 and the maximum to 0. Other values are spaced
    by their descending average rank and rounded to one decimal place. Nulls
    stay null. The resulting expression keeps *column*'s name.
    """
    score = pl.col(column)
    n_scored = score.count()
    rank_desc = score.rank("average", descending=True)
    scaled = ((rank_desc - 1) / (n_scored - 1) * 100).round(1)
    percentile = (
        pl.when(score.is_null())
        .then(None)
        .when(score == score.min())
        .then(100.0)
        .when(score == score.max())
        .then(0.0)
        .when(n_scored > 1)
        .then(scaled)
        .otherwise(100.0)
    )
    return percentile.alias(column)
def _tree_density_by_postcode(tree_density_postcodes_path: Path) -> pl.LazyFrame:
    """Lazily read postcode tree-density percentiles under a canonical column name.

    The source column is TREE_DENSITY_FEATURE when present. Otherwise it is the
    single column matching "Tree canopy density percentile within <N>m". The
    result has columns (postcode, TREE_DENSITY_FEATURE), with the value cast
    to Float32, null postcodes dropped and one row per postcode.

    Raises:
        ValueError: if ``postcode`` is missing, or if no canonical column exists
            and the radius-specific column is absent or ambiguous.
    """
    source = pl.scan_parquet(tree_density_postcodes_path)
    available = set(source.collect_schema().names())
    if "postcode" not in available:
        raise ValueError(
            f"{tree_density_postcodes_path} is missing required column: postcode"
        )
    if TREE_DENSITY_FEATURE in available:
        density_column = TREE_DENSITY_FEATURE
    else:
        matching = sorted(filter(_POSTCODE_TREE_DENSITY_PERCENTILE_RE.match, available))
        if len(matching) != 1:
            raise ValueError(
                f'{tree_density_postcodes_path} must contain column "{TREE_DENSITY_FEATURE}" '
                'or exactly one "Tree canopy density percentile within {radius}m" column; '
                f"found {len(matching)} postcode percentile columns"
            )
        (density_column,) = matching
    density = pl.col(density_column).cast(pl.Float32).alias(TREE_DENSITY_FEATURE)
    return (
        source.select(pl.col("postcode"), density)
        .drop_nulls(["postcode"])
        .unique(["postcode"])
    )
def _validate_lad_source_coverage(
    iod_path: Path, ethnicity_path: Path, rental_prices_path: Path
) -> None:
    """Ensure LAD-keyed sources cover every 2024 LAD present in the IoD data.

    LADs listed in ``_RENT_SOURCE_UNAVAILABLE_LADS`` may be missing from the
    rental data. Those gaps are printed rather than raised.

    Raises:
        ValueError: if any IoD LAD is absent from the ethnicity data, or absent
            from the rental data without being an allowed gap.
    """
    code_col = "Local Authority District code (2024)"
    name_col = "Local Authority District name (2024)"
    iod_lads = (
        pl.read_parquet(iod_path, columns=[code_col, name_col])
        .rename({code_col: "lad", name_col: "lad_name"})
        .unique(["lad"])
    )

    def _lads_missing_from(path: Path, key_column: str) -> pl.DataFrame:
        # IoD LADs (with names) that have no row in the given source, sorted.
        present = pl.read_parquet(path, columns=[key_column]).rename(
            {key_column: "lad"}
        )
        return iod_lads.join(present, on="lad", how="anti").sort("lad")

    missing_ethnicity = _lads_missing_from(ethnicity_path, "Geography_code")
    if not missing_ethnicity.is_empty():
        raise ValueError(
            "Ethnicity data is missing 2024 LAD coverage: "
            f"{missing_ethnicity.to_dicts()}"
        )
    missing_rent = _lads_missing_from(rental_prices_path, "area_code")
    allowed_gaps = list(_RENT_SOURCE_UNAVAILABLE_LADS)
    unexpected_missing_rent = missing_rent.filter(~pl.col("lad").is_in(allowed_gaps))
    if not unexpected_missing_rent.is_empty():
        raise ValueError(
            "Rental data is missing 2024 LAD coverage: "
            f"{unexpected_missing_rent.to_dicts()}"
        )
    if not missing_rent.is_empty():
        print(
            "PIPR has no LAD-level rent estimates for source-unavailable LADs; "
            f"rent will remain null there: {missing_rent.to_dicts()}"
        )
def _validate_property_postcodes(df: pl.DataFrame) -> None:
    """Raise if any merged property row has a null or blank postcode.

    The error message includes the number of offending rows and up to ten
    sample rows. The samples show whichever of Postcode, Address per Property
    Register and Last known price are present.
    """
    postcode = pl.col("Postcode")
    null_or_blank = postcode.is_null() | (
        postcode.cast(pl.Utf8).str.strip_chars() == ""
    )
    invalid = df.filter(null_or_blank)
    if invalid.is_empty():
        return
    preview_columns = [
        name
        for name in ("Postcode", "Address per Property Register", "Last known price")
        if name in invalid.columns
    ]
    sample = invalid.select(preview_columns).head(10).to_dicts()
    raise ValueError(
        "Property rows missing a postcode after merge: "
        f"{invalid.height} rows. Sample: {sample}"
    )
def _build(
    epc_pp_path: Path,
    arcgis_path: Path,
    iod_path: Path,
    poi_proximity_path: Path,
    ethnicity_path: Path,
    crime_path: Path,
    noise_path: Path,
    school_proximity_path: Path,
    broadband_path: Path,
    conservation_areas_path: Path,
    rental_prices_path: Path,
    lsoa_population_path: Path,
    median_age_path: Path,
    election_results_path: Path,
    tree_density_postcodes_path: Path | None = None,
    listed_buildings_path: Path | None = None,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Build postcode and properties dataframes from epc_pp + auxiliary data.

    Starting from the EPC/Price Paid join, a lazy pipeline runs these steps:
    - drop rows with a floor area at or below MIN_FLOOR_AREA_M2;
    - remap terminated postcodes via build_postcode_mapping;
    - attach country, the listed-building flag and NSPL geography (active
      English postcodes only);
    - join IoD percentiles, ethnicity, rent, crime (raw and per 1k residents),
      median age, election results, POI and school proximity, noise,
      conservation-area flags, optional tree density and broadband tiers;
    - derive property type and price per sqm;
    - rename columns to their display names.

    The result is collected with the streaming engine. Its columns are then
    split into a postcode-level frame and a property-level frame.

    Args:
        tree_density_postcodes_path: Optional. The tree-density column is only
            added when it is given.
        listed_buildings_path: Optional. When it is omitted, every row's
            listed-building flag is "No".

    Returns (postcode_df, properties_df).

    Raises:
        ValueError: from the LAD coverage check, or if any merged property row
            lacks a postcode.
    """
    _validate_lad_source_coverage(iod_path, ethnicity_path, rental_prices_path)
    wide = pl.scan_parquet(epc_pp_path).filter(
        pl.col("total_floor_area").is_null()
        | (pl.col("total_floor_area") > MIN_FLOOR_AREA_M2)
    )
    # Remap terminated postcodes to nearest active successor
    # NOTE(review): assumes the mapping contributes only old_postcode and
    # new_postcode; any other column would survive the join below.
    postcode_mapping = build_postcode_mapping(arcgis_path)
    wide = (
        wide.join(
            postcode_mapping.lazy(),
            left_on="postcode",
            right_on="old_postcode",
            how="left",
        )
        .with_columns(
            pl.coalesce("new_postcode", "postcode").alias("postcode"),
        )
        .drop("new_postcode")
    )
    # Country code from the full NSPL (all countries, active and terminated),
    # kept as provenance for rows the England/active-only join below leaves
    # without coordinates.
    arcgis_raw = pl.scan_parquet(arcgis_path)
    postcode_country = arcgis_raw.select(
        pl.col("pcds").alias("postcode"),
        pl.col("ctry25cd"),
    ).unique(["postcode"])
    wide = wide.join(postcode_country, on="postcode", how="left")
    # Listed-building flags are matched against active English postcodes and
    # joined back on (postcode, pp_address); unmatched rows become "No".
    if listed_buildings_path is not None:
        active_postcodes_for_listed = (
            arcgis_raw.filter(pl.col("ctry25cd") == "E92000001")
            .filter(pl.col("doterm").is_null())
            .select(
                pl.col("pcds").alias("postcode"),
                "east1m",
                "north1m",
            )
            .collect(engine="streaming")
        )
        listed_flags = _listed_building_flags(
            wide.select("postcode", "pp_address", "epc_address"),
            active_postcodes_for_listed,
            listed_buildings_path,
        )
        wide = wide.join(listed_flags.lazy(), on=["postcode", "pp_address"], how="left")
    else:
        wide = wide.with_columns(
            pl.lit(None, dtype=pl.Utf8).alias(LISTED_BUILDING_FEATURE)
        )
    wide = wide.with_columns(pl.col(LISTED_BUILDING_FEATURE).fill_null("No"))
    arcgis = (
        arcgis_raw.filter(pl.col("ctry25cd") == "E92000001")  # England only
        .filter(pl.col("doterm").is_null())  # Active postcodes only
        # NSPL Feb 2026 renamed geographic code columns to {field}{year}cd.
        # Alias them back to the short canonical names used across the
        # pipeline so downstream joins don't need to know about NSPL's
        # versioning scheme.
        .select(
            pl.col("pcds").alias("postcode"),
            "lat",
            pl.col("long").alias("lon"),
            pl.col("lsoa21cd").alias("lsoa21"),
            pl.col("oa21cd").alias("oa21"),
            pl.col("pcon24cd").alias("pcon"),
        )
    )
    wide = wide.join(arcgis, on="postcode", how="left")
    # Percentiles are computed across the whole IoD table before joining.
    iod = pl.scan_parquet(iod_path).with_columns(
        *(_less_deprived_percentile_expr(c) for c in _IOD_PERCENTILE_COLUMNS)
    )
    wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left")
    # The LAD code used here arrives with the IoD join; it is dropped below.
    ethnicity = pl.scan_parquet(ethnicity_path)
    wide = wide.join(
        ethnicity,
        left_on="Local Authority District code (2024)",
        right_on="Geography_code",
        how="left",
    )
    # Derive bedroom count: habitable rooms - 1 (assuming 1 reception room), clipped to 0..4
    wide = wide.with_columns(
        (pl.col("number_habitable_rooms") - 1)
        .clip(0, 4)
        .cast(pl.UInt8)
        .alias("_bedrooms"),
    )
    # Rent is looked up by (LAD, derived bedroom count).
    rental = pl.scan_parquet(rental_prices_path).select(
        "area_code", "bedrooms", "mean_monthly_rent"
    )
    wide = wide.join(
        rental,
        left_on=["Local Authority District code (2024)", "_bedrooms"],
        right_on=["area_code", "bedrooms"],
        how="left",
    )
    crime = pl.scan_parquet(crime_path)
    wide = wide.join(crime, left_on="lsoa21", right_on="LSOA code", how="left")
    # NOTE(review): pl.sum_horizontal skips nulls by default, so a row with no
    # crime data presumably gets 0 here rather than null — confirm intended.
    wide = wide.with_columns(
        pl.sum_horizontal(
            "Violence and sexual offences (avg/yr)",
            "Robbery (avg/yr)",
            "Burglary (avg/yr)",
            "Possession of weapons (avg/yr)",
        ).alias("serious_crime_avg_yr"),
        pl.sum_horizontal(
            "Anti-social behaviour (avg/yr)",
            "Criminal damage and arson (avg/yr)",
            "Shoplifting (avg/yr)",
            "Bicycle theft (avg/yr)",
            "Theft from the person (avg/yr)",
            "Other theft (avg/yr)",
            "Vehicle crime (avg/yr)",
            "Public order (avg/yr)",
            "Drugs (avg/yr)",
            "Other crime (avg/yr)",
        ).alias("minor_crime_avg_yr"),
    )
    # Per-1k rates are null when the LSOA population is missing or zero.
    lsoa_pop = pl.scan_parquet(lsoa_population_path)
    wide = wide.join(lsoa_pop, on="lsoa21", how="left")
    wide = wide.with_columns(
        pl.when(pl.col("population") > 0)
        .then((pl.col("serious_crime_avg_yr") / pl.col("population") * 1000).round(1))
        .alias("serious_crime_per_1k"),
        pl.when(pl.col("population") > 0)
        .then((pl.col("minor_crime_avg_yr") / pl.col("population") * 1000).round(1))
        .alias("minor_crime_per_1k"),
    ).drop("population")
    median_age = pl.scan_parquet(median_age_path)
    wide = wide.join(median_age, on="lsoa21", how="left")
    election = pl.scan_parquet(election_results_path)
    wide = wide.join(election, on="pcon", how="left")
    poi_counts = pl.scan_parquet(poi_proximity_path)
    wide = wide.join(poi_counts, on="postcode", how="left")
    # Noise is the loudest of road/rail/airport Lden per postcode.
    noise_cols = ["road_noise_lden_db", "rail_noise_lden_db", "airport_noise_lden_db"]
    noise = (
        pl.scan_parquet(noise_path)
        .with_columns(
            # NaN → null so max_horizontal ignores missing instead of propagating NaN
            *[pl.col(c).fill_nan(None) for c in noise_cols],
        )
        .with_columns(
            pl.max_horizontal(*noise_cols).alias("noise_lden_db"),
        )
        .select("postcode", "noise_lden_db")
    )
    wide = wide.join(noise, on="postcode", how="left")
    school_proximity = pl.scan_parquet(school_proximity_path)
    wide = wide.join(school_proximity, on="postcode", how="left")
    conservation_areas = _conservation_area_by_postcode(
        arcgis.select("postcode", "lat", "lon"), conservation_areas_path
    )
    wide = wide.join(conservation_areas, on="postcode", how="left").with_columns(
        pl.col(CONSERVATION_AREA_FEATURE).fill_null("No")
    )
    # NOTE(review): TREE_DENSITY_FEATURE is not in _AREA_COLUMNS, so this
    # postcode-keyed column ends up in properties_df rather than postcode_df —
    # confirm this is intended.
    if tree_density_postcodes_path is not None:
        tree_density = _tree_density_by_postcode(tree_density_postcodes_path)
        wide = wide.join(tree_density, on="postcode", how="left")
    # Broadband: derive max available download speed tier per postcode from
    # Ofcom availability percentages. Tiers: Gigabit ≥1000, UFBB ≥300,
    # UFBB(100) ≥100, SFBB ≥30 Mbps. Stored as string enum.
    broadband = (
        pl.scan_parquet(broadband_path)
        .select(
            pl.col("postcode_space").alias("bb_postcode"),
            pl.when(pl.col("Gigabit availability (% premises)") > 0)
            .then(1000)
            .when(pl.col("UFBB availability (% premises)") > 0)
            .then(300)
            .when(pl.col("UFBB (100Mbit/s) availability (% premises)") > 0)
            .then(100)
            .when(pl.col("SFBB availability (% premises)") > 0)
            .then(30)
            .otherwise(10)
            .cast(pl.UInt16)
            .alias("max_download_speed"),
        )
        .group_by("bb_postcode")
        .agg(pl.col("max_download_speed").max())
        .with_columns(pl.col("max_download_speed").cast(pl.Utf8))
    )
    wide = wide.join(broadband, left_on="postcode", right_on="bb_postcode", how="left")
    # Derive property_type: prefer EPC data, fall back to price-paid.
    # For Houses, use built_form (e.g. Semi-Detached, Mid-Terrace) for finer detail.
    bad_built_form = pl.col("built_form").is_null() | pl.col("built_form").is_in(
        ["NO DATA!", "Not Recorded"]
    )
    has_epc = pl.col("epc_property_type").is_not_null()
    is_house = pl.col("epc_property_type") == "House"
    wide = wide.with_columns(
        pl.when(has_epc & is_house & ~bad_built_form)
        .then(pl.col("built_form"))
        .when(has_epc & is_house)
        .then(pl.col("pp_property_type"))
        .when(has_epc)
        .then(pl.col("epc_property_type"))
        .otherwise(pl.col("pp_property_type"))
        # Unify EPC's "Flat"/"Maisonette" with price-paid's "Flats/Maisonettes",
        # collapse terrace sub-types, and fold rare types into "Other"
        .replace(
            {
                "Flat": "Flats/Maisonettes",
                "Maisonette": "Flats/Maisonettes",
                "End-Terrace": "Terraced",
                "Mid-Terrace": "Terraced",
                "Enclosed End-Terrace": "Terraced",
                "Enclosed Mid-Terrace": "Terraced",
                "Bungalow": "Other",
                "Park home": "Other",
            }
        )
        .alias("property_type")
    )
    # Null out placeholder values, add price per sqm, drop helper/unused
    # columns, then rename everything to its display name.
    wide = (
        wide.with_columns(
            pl.when(pl.col("duration") == "U")
            .then(None)
            .otherwise(pl.col("duration"))
            .alias("duration"),
            pl.when(pl.col("current_energy_rating") == "INVALID!")
            .then(None)
            .otherwise(pl.col("current_energy_rating"))
            .alias("current_energy_rating"),
        )
        .with_columns(
            (pl.col("latest_price") / pl.col("total_floor_area"))
            .round(0)
            .cast(pl.Int32)
            .alias("Price per sqm"),
        )
        .drop(
            "inspection_date",
            "_bedrooms",
            "LSOA name (2021)",
            "Local Authority District code (2024)",
            "Local Authority District name (2024)",
            "Wider Barriers Sub-domain Score",
            "Geographical Barriers Sub-domain Score",
            "Adult Skills Sub-domain Score",
            "Children and Young People Sub-domain Score",
            "Crime Score",
            "Living Environment Score",
            "Index of Multiple Deprivation (IMD) Score",
            "Income Deprivation Affecting Older People (IDAOPI) Score (rate)",
            "Income Deprivation Affecting Children Index (IDACI) Score (rate)",
            "Barriers to Housing and Services Score",
            "oa21",
            "pcon",
            "epc_property_type",
            "pp_property_type",
            "built_form",
        )
        .rename(
            {
                "date_of_transfer": "Date of last transaction",
                "construction_age_band": "Construction year",
                "is_construction_date_approximate": "Is construction date approximate",
                "Income Score (rate)": "Income Score",
                "Employment Score (rate)": "Employment Score",
                "Indoors Sub-domain Score": "Housing Conditions Score",
                "Outdoors Sub-domain Score": "Air Quality and Road Safety Score",
                "pp_address": "Address per Property Register",
                "epc_address": "Address per EPC",
                "postcode": "Postcode",
                "duration": "Leasehold/Freehold",
                "current_energy_rating": "Current energy rating",
                "potential_energy_rating": "Potential energy rating",
                "total_floor_area": "Total floor area (sqm)",
                "property_type": "Property type",
                "restaurants_2km": "Number of restaurants within 2km",
                "groceries_2km": "Number of grocery shops and supermarkets within 2km",
                "latest_price": "Last known price",
                "number_habitable_rooms": "Number of bedrooms & living rooms",
                "noise_lden_db": "Noise (dB)",
                "good_primary_5km": "Good+ primary schools within 5km",
                "good_secondary_5km": "Good+ secondary schools within 5km",
                "good_primary_2km": "Good+ primary schools within 2km",
                "good_secondary_2km": "Good+ secondary schools within 2km",
                "outstanding_primary_5km": "Outstanding primary schools within 5km",
                "outstanding_secondary_5km": "Outstanding secondary schools within 5km",
                "outstanding_primary_2km": "Outstanding primary schools within 2km",
                "outstanding_secondary_2km": "Outstanding secondary schools within 2km",
                "max_download_speed": "Max available download speed (Mbps)",
                "serious_crime_avg_yr": "Serious crime (avg/yr)",
                "minor_crime_avg_yr": "Minor crime (avg/yr)",
                "serious_crime_per_1k": "Serious crime per 1k residents (avg/yr)",
                "minor_crime_per_1k": "Minor crime per 1k residents (avg/yr)",
                "mean_monthly_rent": "Estimated monthly rent",
                "floor_height": "Interior height (m)",
                "was_council_house": "Former council house",
                "median_age": "Median age",
                "turnout_pct": "Voter turnout (%)",
            }
        )
    )
    print("Collecting with streaming engine...")
    df = wide.collect(engine="streaming")
    _validate_property_postcodes(df)
    # Split into postcode-level and property-level dataframes
    # Area columns (plus dynamic POI metrics) form postcode_df, keeping the
    # first row per postcode. All other columns, plus Postcode as the key, form
    # properties_df.
    area_cols = [
        c for c in df.columns if c in _AREA_COLUMNS or _is_dynamic_poi_metric_column(c)
    ]
    postcode_df = df.select(area_cols).group_by("Postcode").first()
    print(f"Postcode rows: {postcode_df.height} (unique postcodes)")
    property_cols = [
        c
        for c in df.columns
        if (c not in _AREA_COLUMNS and not _is_dynamic_poi_metric_column(c))
        or c == "Postcode"
    ]
    properties_df = df.select(property_cols)
    print(f"Property rows: {properties_df.height}")
    return postcode_df, properties_df
def _write_parquet_with_report(df: pl.DataFrame, path: Path, label: str) -> None:
    """Print a summary of *df*, write it to *path* as Parquet, and report its size.

    The parent directory of *path* is created if it does not exist yet.
    """
    print(f"\n{label} columns: {df.columns}")
    print(f"{label} rows: {df.height}")
    path.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(path)
    size_mb = path.stat().st_size / (1024 * 1024)
    print(f"Wrote {path} ({size_mb:.1f} MB)")


def main():
    """Parse CLI arguments, run the merge, and write both Parquet outputs."""
    parser = argparse.ArgumentParser(
        description="Build wide property dataframe with all joins"
    )
    parser.add_argument(
        "--epc-pp", type=Path, required=True, help="EPC-Price Paid joined parquet file"
    )
    parser.add_argument(
        "--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file"
    )
    parser.add_argument(
        "--iod",
        type=Path,
        required=True,
        help="Index of Deprivation parquet file",
    )
    parser.add_argument(
        "--poi-proximity",
        type=Path,
        # _build reads this file unconditionally, so it cannot be omitted.
        required=True,
        help="POI proximity counts parquet file",
    )
    parser.add_argument(
        "--ethnicity",
        type=Path,
        required=True,
        help="Ethnicity by local authority parquet file",
    )
    parser.add_argument(
        "--crime",
        type=Path,
        required=True,
        help="Crime by LSOA parquet file",
    )
    parser.add_argument(
        "--noise",
        type=Path,
        required=True,
        help="Road, rail and airport noise by postcode parquet file",
    )
    parser.add_argument(
        "--school-proximity",
        type=Path,
        required=True,
        help="School proximity counts parquet file",
    )
    parser.add_argument(
        "--broadband",
        type=Path,
        required=True,
        help="Broadband availability by postcode parquet file",
    )
    parser.add_argument(
        "--conservation-areas",
        type=Path,
        required=True,
        help="Historic England conservation areas GeoPackage",
    )
    parser.add_argument(
        "--listed-buildings",
        type=Path,
        required=False,
        help="Historic England NHLE listed-building points GeoPackage",
    )
    parser.add_argument(
        "--rental-prices",
        type=Path,
        required=True,
        help="ONS rental prices by LA and bedroom count parquet file",
    )
    parser.add_argument(
        "--lsoa-population",
        type=Path,
        required=True,
        help="Census 2021 population by LSOA parquet file",
    )
    parser.add_argument(
        "--median-age",
        type=Path,
        required=True,
        help="Census 2021 median age by LSOA parquet file",
    )
    parser.add_argument(
        "--election-results",
        type=Path,
        required=True,
        help="2024 General Election results by constituency parquet file",
    )
    parser.add_argument(
        "--tree-density-postcodes",
        type=Path,
        required=False,
        help="Postcode-level tree density parquet from pipeline.transform.tree_density",
    )
    parser.add_argument(
        "--output-postcodes",
        type=Path,
        required=True,
        help="Output postcode parquet file path",
    )
    parser.add_argument(
        "--output-properties",
        type=Path,
        required=True,
        help="Output properties parquet file path",
    )
    args = parser.parse_args()
    postcode_df, properties_df = _build(
        epc_pp_path=args.epc_pp,
        arcgis_path=args.arcgis,
        iod_path=args.iod,
        poi_proximity_path=args.poi_proximity,
        ethnicity_path=args.ethnicity,
        crime_path=args.crime,
        noise_path=args.noise,
        school_proximity_path=args.school_proximity,
        broadband_path=args.broadband,
        conservation_areas_path=args.conservation_areas,
        rental_prices_path=args.rental_prices,
        lsoa_population_path=args.lsoa_population,
        median_age_path=args.median_age,
        election_results_path=args.election_results,
        tree_density_postcodes_path=args.tree_density_postcodes,
        listed_buildings_path=args.listed_buildings,
    )
    _write_parquet_with_report(postcode_df, args.output_postcodes, "Postcode")
    _write_parquet_with_report(properties_df, args.output_properties, "Property")
if __name__ == "__main__":
main()