From aab85fe32e0cccedb2c2777893deb4094a39f72f Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 2 Jun 2026 20:14:32 +0100 Subject: [PATCH] idgf --- Makefile.data | 10 +- frontend/src/hooks/useLocationSearch.ts | 5 +- pipeline/download/arcgis.py | 25 +- pipeline/download/median_age.py | 100 ++++++-- pipeline/download/noise.py | 83 +++++-- pipeline/download/places.py | 79 +++++-- pipeline/download/test_median_age.py | 75 ++++++ pipeline/download/test_noise.py | 199 ++++++++++++++++ pipeline/download/test_places.py | 87 +++++++ pipeline/download/uprn_lookup.py | 18 +- pipeline/test_validate_outputs.py | 66 ++++++ pipeline/transform/crime_spatial.py | 22 ++ pipeline/transform/join_epc_pp.py | 21 +- pipeline/transform/merge.py | 99 +++++--- pipeline/transform/noise_overlay_tiles.py | 26 ++- .../test_postcode_boundaries.py | 61 ++++- .../transform/postcode_boundaries/voronoi.py | 45 ++-- pipeline/transform/price_estimation/index.py | 69 +++--- pipeline/transform/price_estimation/knn.py | 24 ++ .../transform/price_estimation/shrinkage.py | 69 ++++-- .../transform/price_estimation/test_index.py | 135 +++++++++++ .../transform/price_estimation/test_knn.py | 40 ++++ .../price_estimation/test_shrinkage.py | 170 ++++++++------ pipeline/transform/test_crime_spatial.py | 41 ++++ pipeline/transform/test_join_epc_pp.py | 61 ++++- pipeline/transform/test_merge.py | 216 +++++++++++++++++- .../transform/test_noise_overlay_tiles.py | 110 +++++++++ pipeline/transform/test_transform_poi.py | 179 +++++++++++++++ pipeline/transform/transform_poi.py | 20 +- pipeline/utils/__init__.py | 2 + pipeline/utils/nspl_schema.py | 30 +++ pipeline/utils/test_nspl_schema.py | 57 +++++ pipeline/validate_outputs.py | 55 +++++ 33 files changed, 2016 insertions(+), 283 deletions(-) create mode 100644 pipeline/download/test_median_age.py create mode 100644 pipeline/transform/price_estimation/test_index.py create mode 100644 pipeline/transform/test_noise_overlay_tiles.py create mode 100644 pipeline/utils/nspl_schema.py create mode 100644 pipeline/utils/test_nspl_schema.py diff --git a/Makefile.data b/Makefile.data index ecc065e..31a3c03 100644 --- a/Makefile.data +++ b/Makefile.data @@ -116,9 +116,9 @@ MAP_ASSETS_DEPS := pipeline/download/map_assets.py pipeline/transform/transform_ generate-postcode-boundaries generate-travel-times enrich-actual-listings prepare: $(PRICES_STAMP) download-places tiles satellite-tiles overlay-tiles property-border-tiles generate-postcode-boundaries download-map-assets generate-travel-times | $(POSTCODES_PQ) $(PROPERTIES_PQ) $(PRICE_INDEX) - $(VALIDATE_OUTPUTS) --parquet $(POSTCODES_PQ) --parquet $(PROPERTIES_PQ) --parquet $(PRICE_INDEX) --postcode-boundary-match "$(POSTCODES_PQ)::$(PC_BOUNDARIES)" --postcode-features $(POSTCODES_PQ) --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" --price-index $(PRICE_INDEX) + $(VALIDATE_OUTPUTS) --parquet $(POSTCODES_PQ) --parquet $(PROPERTIES_PQ) --parquet $(PRICE_INDEX) --postcode-boundary-match "$(POSTCODES_PQ)::$(PC_BOUNDARIES)" --postcode-features $(POSTCODES_PQ) --postcode-universe "$(ARCGIS)::$(POSTCODES_PQ)" --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" --price-index $(PRICE_INDEX) merge: $(MERGE_STAMP) | $(POSTCODES_PQ) $(PROPERTIES_PQ) - $(VALIDATE_OUTPUTS) --parquet $(POSTCODES_PQ) --parquet $(PROPERTIES_PQ) --postcode-features $(POSTCODES_PQ) --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" + $(VALIDATE_OUTPUTS) --parquet $(POSTCODES_PQ) --parquet $(PROPERTIES_PQ) --postcode-features $(POSTCODES_PQ) --postcode-universe "$(ARCGIS)::$(POSTCODES_PQ)" --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" enrich-actual-listings: $(ACTUAL_LISTINGS_ENRICHED) tiles: $(TILES) $(SATELLITE_TILES) $(SATELLITE_HIGHRES_TILES) satellite-tiles: $(SATELLITE_TILES) @@ -413,13 +413,13 @@ $(MERGE_STAMP): $(EPC_PP) $(ARCGIS) $(IOD) $(POI_PROXIMITY) \ --tree-density-postcodes $(TREE_DENSITY_PC) \ --output-postcodes $(POSTCODES_PQ) \ --output-properties $(PROPERTIES_PQ) - $(VALIDATE_OUTPUTS) --parquet $(POSTCODES_PQ) --parquet $(PROPERTIES_PQ) --postcode-features $(POSTCODES_PQ) --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" + $(VALIDATE_OUTPUTS) --parquet $(POSTCODES_PQ) --parquet $(PROPERTIES_PQ) --postcode-features $(POSTCODES_PQ) --postcode-universe "$(ARCGIS)::$(POSTCODES_PQ)" --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" @touch $@ # ── Price estimation (post-merge) ─────────────────────────────────────────── $(POSTCODES_PQ) $(PROPERTIES_PQ) &: $(MERGE_STAMP) - $(VALIDATE_OUTPUTS) --parquet $(POSTCODES_PQ) --parquet $(PROPERTIES_PQ) --postcode-features $(POSTCODES_PQ) --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" + $(VALIDATE_OUTPUTS) --parquet $(POSTCODES_PQ) --parquet $(PROPERTIES_PQ) --postcode-features $(POSTCODES_PQ) --postcode-universe "$(ARCGIS)::$(POSTCODES_PQ)" --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" $(PRICE_INDEX): $(MERGE_STAMP) $(PRICE_INDEX_DEPS) | $(PROPERTIES_PQ) $(POSTCODES_PQ) uv run python -m pipeline.transform.price_estimation.index --input $(PROPERTIES_PQ) --postcodes $(POSTCODES_PQ) --output $@ @@ -428,7 +428,7 @@ $(PRICE_INDEX): $(MERGE_STAMP) $(PRICE_INDEX_DEPS) | $(PROPERTIES_PQ) $(POSTCODE $(PRICES_STAMP): $(MERGE_STAMP) $(PRICE_INDEX) $(PRICE_ESTIMATE_DEPS) | $(PROPERTIES_PQ) $(POSTCODES_PQ) @rm -f $@ uv run python -m pipeline.transform.price_estimation.estimate --properties $(PROPERTIES_PQ) --postcodes $(POSTCODES_PQ) --index $(PRICE_INDEX) - $(VALIDATE_OUTPUTS) --parquet $(PROPERTIES_PQ) --parquet $(POSTCODES_PQ) --parquet $(PRICE_INDEX) --postcode-features $(POSTCODES_PQ) --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" --price-index $(PRICE_INDEX) + $(VALIDATE_OUTPUTS) --parquet $(PROPERTIES_PQ) --parquet $(POSTCODES_PQ) --parquet $(PRICE_INDEX) --postcode-features $(POSTCODES_PQ) --postcode-universe "$(ARCGIS)::$(POSTCODES_PQ)" --properties-subset "$(PROPERTIES_PQ)::$(POSTCODES_PQ)" --price-index $(PRICE_INDEX) @touch $@ $(ACTUAL_LISTINGS_ENRICHED): $(ACTUAL_LISTINGS_RAW) $(EPC) \ diff --git a/frontend/src/hooks/useLocationSearch.ts b/frontend/src/hooks/useLocationSearch.ts index df4cdfa..05def85 100644 --- a/frontend/src/hooks/useLocationSearch.ts +++ b/frontend/src/hooks/useLocationSearch.ts @@ -244,10 +244,7 @@ function legacyCombineResults(json: PlacesApiResponse, trimmed: string): SearchR export type ViewportCenter = { lat: number; lng: number }; -export function useLocationSearch( - mode?: string, - getViewportCenter?: () => ViewportCenter | null -) { +export function useLocationSearch(mode?: string, getViewportCenter?: () => ViewportCenter | null) { const [query, setQuery] = useState(''); const [results, setResults] = useState([]); const [recentSearches, setRecentSearches] = useState(readRecentSearches); diff --git a/pipeline/download/arcgis.py b/pipeline/download/arcgis.py index 418d9bf..588d5c1 100644 --- a/pipeline/download/arcgis.py +++ b/pipeline/download/arcgis.py @@ -4,27 +4,24 @@ import polars as pl from pathlib import Path from pipeline.local_temp import local_tmp_dir -from pipeline.utils import download, extract_zip +from pipeline.utils import code_col_overrides, download, extract_zip URL = "https://www.arcgis.com/sharing/rest/content/items/36b718ad00de49afb9ad364f8b815b9e/data" def convert_to_parquet(data_path: Path, parquet_path: Path) -> None: - # Classification code columns (ruc21ind, oac11ind, imd20ind) look numeric - # in early rows but contain string codes like "UN1" (Unclassified) later - # on. Force them to String to avoid mid-stream dtype inference failures. - # Note: NSPL renames these year suffixes as new releases roll in (e.g. - # Feb 2026 bumped oac from oac21ind → oac11ind, imd from imd19ind → - # imd20ind), so keep this dict in sync with the current CSV headers — - # polars silently ignores overrides for missing columns, masking drift. + # Classification code columns (e.g. ruc21ind, oac11ind, imd20ind) look + # numeric in early rows but contain string codes like "UN1" (Unclassified) + # later on. Force them to String to avoid mid-stream dtype inference + # failures. NSPL renames these year suffixes each release, and polars + # silently ignores overrides for missing columns, so match on the + # suffix-free stem (read from the header) rather than hard-coding suffixes. + csv_path = data_path / "Data/NSPL_FEB_2026_UK.csv" + names = pl.scan_csv(csv_path).collect_schema().names() df = pl.scan_csv( - data_path / "Data/NSPL_FEB_2026_UK.csv", + csv_path, try_parse_dates=True, - schema_overrides={ - "ruc21ind": pl.String, - "oac11ind": pl.String, - "imd20ind": pl.String, - }, + schema_overrides=code_col_overrides(names), ) print(f"Columns: {df.collect_schema().names()}") parquet_path.parent.mkdir(parents=True, exist_ok=True) diff --git a/pipeline/download/median_age.py b/pipeline/download/median_age.py index 4471ce8..b02c43d 100644 --- a/pipeline/download/median_age.py +++ b/pipeline/download/median_age.py @@ -43,6 +43,35 @@ AGE_BANDS = [ (85, 5), # Aged 85 years and over ] +# Canonical NOMIS TS007A (C2021_AGE_19_NAME) band labels, in the SAME order as +# AGE_BANDS. Index i here corresponds to AGE_BANDS[i]; we validate the pivot +# output against this set and use it (not positional string parsing) to order +# the columns, so a stray/relabelled/missing band fails loudly instead of +# silently mis-aligning counts against the wrong lower bound. +EXPECTED_BAND_NAMES = [ + "Aged 0 to 4 years", + "Aged 5 to 9 years", + "Aged 10 to 14 years", + "Aged 15 to 19 years", + "Aged 20 to 24 years", + "Aged 25 to 29 years", + "Aged 30 to 34 years", + "Aged 35 to 39 years", + "Aged 40 to 44 years", + "Aged 45 to 49 years", + "Aged 50 to 54 years", + "Aged 55 to 59 years", + "Aged 60 to 64 years", + "Aged 65 to 69 years", + "Aged 70 to 74 years", + "Aged 75 to 79 years", + "Aged 80 to 84 years", + "Aged 85 years and over", +] +assert len(EXPECTED_BAND_NAMES) == len(AGE_BANDS), ( + "EXPECTED_BAND_NAMES and AGE_BANDS must stay aligned 1:1" +) + def compute_median_age(counts: list[int]) -> float: """Compute median age from five-year band counts using linear interpolation.""" @@ -62,6 +91,53 @@ def compute_median_age(counts: list[int]) -> float: return float("nan") +def _bands_to_median_table(pivoted: pl.DataFrame) -> pl.DataFrame: + """Validate the pivoted age-band columns, then compute median age per LSOA. + + The pivot must contain exactly the canonical NOMIS TS007A bands; a + missing/extra/relabelled band would otherwise silently mis-align counts + against the wrong AGE_BANDS lower bound, so we fail loudly instead. + """ + # Validate the pivoted age-band columns against the canonical NOMIS set + # BEFORE computing anything. + band_cols = [c for c in pivoted.columns if c != "GEOGRAPHY_CODE"] + found = set(band_cols) + expected = set(EXPECTED_BAND_NAMES) + if found != expected: + missing = sorted(expected - found) + unexpected = sorted(found - expected) + raise ValueError( + "Census age-band columns do not match the expected NOMIS TS007A bands.\n" + f" expected {len(EXPECTED_BAND_NAMES)} bands, found {len(band_cols)}\n" + f" missing: {missing}\n" + f" unexpected: {unexpected}\n" + "Refusing to compute medians against misaligned bands." + ) + + # Use the canonical order (guaranteed aligned with AGE_BANDS), not positional + # string parsing, and treat a null band (zero-population) as 0 rather than + # crashing on sum(). + band_cols = list(EXPECTED_BAND_NAMES) + pivoted = pivoted.with_columns(pl.col(band_cols).fill_null(0)) + + print(f"Age bands found: {len(band_cols)}") + print(f" First: {band_cols[0]}") + print(f" Last: {band_cols[-1]}") + + rows = pivoted.select("GEOGRAPHY_CODE", *band_cols).to_dicts() + medians = [] + for row in rows: + counts = [row[col] for col in band_cols] + median = compute_median_age(counts) + medians.append( + {"lsoa21": row["GEOGRAPHY_CODE"], "median_age": round(median, 1)} + ) + + return pl.DataFrame(medians).with_columns( + pl.col("median_age").cast(pl.Float32), + ) + + def download_and_convert(output_path: Path) -> None: print("Downloading Census 2021 age by five-year bands from NOMIS...") frames = [] @@ -94,29 +170,7 @@ def download_and_convert(output_path: Path) -> None: values="OBS_VALUE", ) - # Extract age band columns in order and compute median - # NOMIS returns band names like "Aged 0 to 4 years", "Aged 85 years and over" - band_cols = [c for c in pivoted.columns if c != "GEOGRAPHY_CODE"] - # Sort by the lower bound of each band - band_cols.sort(key=lambda c: int(c.split()[1])) - - print(f"Age bands found: {len(band_cols)}") - print(f" First: {band_cols[0]}") - print(f" Last: {band_cols[-1]}") - - # Compute median age per LSOA - rows = pivoted.select("GEOGRAPHY_CODE", *band_cols).to_dicts() - medians = [] - for row in rows: - counts = [row[col] for col in band_cols] - median = compute_median_age(counts) - medians.append( - {"lsoa21": row["GEOGRAPHY_CODE"], "median_age": round(median, 1)} - ) - - result = pl.DataFrame(medians).with_columns( - pl.col("median_age").cast(pl.Float32), - ) + result = _bands_to_median_table(pivoted) print(f"England LSOAs: {result.height}") print( diff --git a/pipeline/download/noise.py b/pipeline/download/noise.py index 660ce30..545f559 100644 --- a/pipeline/download/noise.py +++ b/pipeline/download/noise.py @@ -83,11 +83,32 @@ NATIVE_RESOLUTION = 10 # Request pixel resolution in metres. RESOLUTION = NATIVE_RESOLUTION +# Defra encodes TRUE "no data" with this sentinel (NOT 0.0). A 0.0 cell that is +# otherwise inside the raster means "modelled below the lowest reporting band", +# i.e. genuinely quiet — see noise_overlay_tiles.py:167. +NOISE_NODATA_SENTINEL = np.float32(-96.0) + +# Lowest modelled Defra Lden reporting band (dB). Verified against the actual +# rasters: the minimum positive in-coverage value is 40.0 dB with NO values in +# (0, 40) — below the band, cells are encoded as 0.0 (genuinely quiet). We floor +# in-coverage cells to 40.0 so a below-band 0.0 surfaces as "we know it's quiet" +# (~40 dB) instead of collapsing to null ("we don't know"), WITHOUT inflating the +# ~35% of genuine 40-44.99 dB readings that a 45.0 floor would wrongly bump to 45. +# NB: 45.0 is the overlay's lowest *paint* stop (noise_overlay_tiles. +# NOISE_COLOR_STOPS[0]) — a rendering threshold, not the data's reporting floor. +NOISE_QUIET_FLOOR_DB = np.float32(40.0) + # The pipeline has postcode representative points rather than complete unit # polygons here. Use a small local footprint and take the maximum 10m cell so # postcode-level noise is not understated by centroid rounding. POSTCODE_NOISE_RADIUS_M = 50 +# Adjacent download tiles must overlap by at least the sampling radius so every +# postcode's 50m max-window is fully contained in at least one tile. Without +# this, a loud pixel just across a tile seam is invisible to a postcode on the +# far side, under-reporting noise near seams. +TILE_OVERLAP_M = POSTCODE_NOISE_RADIUS_M + # Retry/split behaviour for slow Defra WCS requests. Some 100km eastern tiles # intermittently return 504s; smaller fallback requests usually succeed. MAX_RETRIES = 3 @@ -287,6 +308,31 @@ def _download_tile( return [], [(min_e, min_n, max_e, max_n)] +def _generate_tiles( + min_e: int, + max_e: int, + min_n: int, + max_n: int, + tile_size: int, + overlap_m: int, + step: int, +) -> list[Tile]: + """Generate download tile bboxes stepping by ``step`` but extending each + tile's far edge by ``overlap_m`` so neighbours overlap. + + Overlapping neighbours guarantee that every postcode's POSTCODE_NOISE_RADIUS_M + sampling window is fully contained in at least one tile, so a loud pixel near + a seam is never lost (the sampler takes np.fmax across tiles). + """ + tiles: list[Tile] = [] + for tile_min_e in range(min_e, max_e, step): + for tile_min_n in range(min_n, max_n, step): + tile_max_e = min(tile_min_e + tile_size + overlap_m, BNG_MAX_E) + tile_max_n = min(tile_min_n + tile_size + overlap_m, BNG_MAX_N) + tiles.append((tile_min_e, tile_min_n, tile_max_e, tile_max_n)) + return tiles + + def download_raster( tile_dir: Path, wcs_base: str, @@ -296,12 +342,9 @@ def download_raster( allow_missing_tiles: bool = False, ) -> list[Path]: """Download noise GeoTIFF raster covering England, returning paths to saved files.""" - tiles = [] - for min_e in range(BNG_MIN_E, BNG_MAX_E, TILE_SIZE): - for min_n in range(BNG_MIN_N, BNG_MAX_N, TILE_SIZE): - max_e = min(min_e + TILE_SIZE, BNG_MAX_E) - max_n = min(min_n + TILE_SIZE, BNG_MAX_N) - tiles.append((min_e, min_n, max_e, max_n)) + tiles = _generate_tiles( + BNG_MIN_E, BNG_MAX_E, BNG_MIN_N, BNG_MAX_N, TILE_SIZE, TILE_OVERLAP_M, TILE_SIZE + ) print( f"[{label}] Downloading {len(tiles)} tiles at {RESOLUTION}m resolution ({MAX_WORKERS} workers)..." @@ -385,14 +428,23 @@ def sample_noise_at_postcodes( if len(candidate_indices) == 0: continue + # Defra rasters encode TRUE nodata as the -96.0 sentinel (and + # occasionally non-finite / dataset.nodata); genuinely quiet ground + # below the model's lowest reporting band is encoded as 0.0. Only + # the former is "we don't know" — the latter is a real "we know it's + # quiet" reading and must not collapse to null. So treat ONLY true + # nodata as -inf (it never wins a max and never counts as coverage), + # and clamp every in-coverage cell up to NOISE_QUIET_FLOOR_DB so a + # below-threshold 0.0 surfaces as the documented quiet floor. grid = dataset.read(1).astype(np.float32, copy=False) - invalid = ~np.isfinite(grid) | (grid == 0) + nodata = ~np.isfinite(grid) | np.isclose( + grid, NOISE_NODATA_SENTINEL, rtol=1e-5, atol=1e-5 + ) if dataset.nodata is not None: - invalid |= np.isclose( + nodata |= np.isclose( grid, np.float32(dataset.nodata), rtol=1e-5, atol=1e-5 ) - grid = grid.copy() - grid[invalid] = -np.inf + grid = np.where(nodata, -np.inf, np.maximum(grid, NOISE_QUIET_FLOOR_DB)) if filter_size > 1: grid = maximum_filter( grid, size=filter_size, mode="constant", cval=-np.inf @@ -412,12 +464,15 @@ def sample_noise_at_postcodes( sampled_indices = candidate_indices[in_bounds] sampled = grid[rows[in_bounds], cols[in_bounds]] - valid = sampled != -np.inf - if not np.any(valid): + # A finite sample means at least one in-coverage cell sat in the + # window (quiet -> floor, or louder). -inf means the whole window was + # true nodata, so the postcode stays uncovered (null) for this tile. + covered = np.isfinite(sampled) + if not np.any(covered): continue - sampled_indices = sampled_indices[valid] - sampled = sampled[valid] + sampled_indices = sampled_indices[covered] + sampled = sampled[covered] existing = noise_db[sampled_indices] noise_db[sampled_indices] = np.where( np.isnan(existing), sampled, np.maximum(existing, sampled) diff --git a/pipeline/download/places.py b/pipeline/download/places.py index ffa1f56..6b5d875 100644 --- a/pipeline/download/places.py +++ b/pipeline/download/places.py @@ -84,6 +84,38 @@ LONDON_COUNTY_CODES = {"E13000001", "E13000002"} DISPLAY_CITY_NEAREST_POSTCODE_MAX_M = 3_000 WGS84_TO_BNG = Transformer.from_crs("EPSG:4326", "EPSG:27700", always_xy=True) +# England British National Grid (EPSG:27700) bounding box, with margin. ONS NSPL stores +# postcodes that have no grid reference at the Null-Island sentinel lat=99.999999, +# long=0.000000, whose paired easting/northing collapse to the grid origin (0, 0) (or +# inf). Requiring coordinates inside this box drops the sentinel from every index, so an +# active postcode lacking a grid ref can never become a false nearest neighbour. +ENGLAND_BNG_MIN_EAST = 50_000.0 +ENGLAND_BNG_MAX_EAST = 660_000.0 +ENGLAND_BNG_MIN_NORTH = 0.0 +ENGLAND_BNG_MAX_NORTH = 660_000.0 + + +def _valid_wgs84_expr() -> pl.Expr: + """Rows with a real lat/long inside England (drops the ONS lat=99.999999, long=0.0 + no-grid-reference sentinel and any nulls), so they never enter a coordinate index.""" + return ( + pl.col("lat").is_not_null() + & pl.col("long").is_not_null() + & pl.col("lat").is_between(ENGLAND_BBOX_SOUTH, ENGLAND_BBOX_NORTH) + & pl.col("long").is_between(ENGLAND_BBOX_WEST, ENGLAND_BBOX_EAST) + ) + + +def _valid_bng_expr() -> pl.Expr: + """Rows with a real easting/northing inside England (drops the (0, 0) grid-origin / + inf paired with the ONS no-grid-reference sentinel and any nulls).""" + return ( + pl.col("east1m").is_not_null() + & pl.col("north1m").is_not_null() + & pl.col("east1m").is_between(ENGLAND_BNG_MIN_EAST, ENGLAND_BNG_MAX_EAST) + & pl.col("north1m").is_between(ENGLAND_BNG_MIN_NORTH, ENGLAND_BNG_MAX_NORTH) + ) + # Suffixes to strip from raw station names before appending the typed suffix. _STATION_STRIP = ( " tube station", @@ -303,7 +335,7 @@ def _outcode_tree(postcodes_path: Path) -> tuple[cKDTree, list[str]]: postcodes_path, columns=["pcds", "lat", "long", "ctry25cd", "doterm"] ) .filter((pl.col("ctry25cd") == ENGLAND_COUNTRY_CODE) & pl.col("doterm").is_null()) - .filter(pl.col("lat").is_not_null() & pl.col("long").is_not_null()) + .filter(_valid_wgs84_expr()) ) coords = np.column_stack( [df["lat"].to_numpy().astype(np.float64), df["long"].to_numpy().astype(np.float64)] @@ -359,12 +391,22 @@ def _build_street_places( return sorted(places, key=lambda place: place["name"].lower()) +def _poi_dedup_key(name: str, place_type: str, lat: float, lon: float) -> tuple: + """Geographic de-dup key: round(.,2) is ~1.1km lat / ~0.7km UK lon. + + Coarse enough to collapse the SAME physical POI mapped twice a few metres + apart, fine enough to keep genuinely distinct same-named POIs in different + towns (e.g. "Victoria Park" in London vs Bristol). + """ + return (name.lower(), place_type, round(lat, 2), round(lon, 2)) + + def _pois_to_places(pois: pl.DataFrame) -> list[dict]: - """Map high-value named POIs onto gazetteer place rows (M), de-duplicated by (name, type).""" + """Map high-value named POIs onto gazetteer place rows (M), de-duplicated by (name, type, coords).""" if pois.is_empty(): return [] - seen: set[tuple[str, str]] = set() + seen: set[tuple] = set() places: list[dict] = [] for row in pois.iter_rows(named=True): place_type = HIGH_VALUE_POI_CATEGORIES.get(str(row.get("category", ""))) @@ -373,7 +415,9 @@ def _pois_to_places(pois: pl.DataFrame) -> list[dict]: name = str(row.get("name") or "").strip() if len(name) < 3: continue - key = (name.lower(), place_type) + lat = float(row["lat"]) + lon = float(row["lng"]) + key = _poi_dedup_key(name, place_type, lat, lon) if key in seen: continue seen.add(key) @@ -381,8 +425,8 @@ def _pois_to_places(pois: pl.DataFrame) -> list[dict]: { "name": name, "place_type": place_type, - "lat": float(row["lat"]), - "lon": float(row["lng"]), + "lat": lat, + "lon": lon, "population": 0, "travel_destination": False, "display_city": None, @@ -395,11 +439,16 @@ def _append_high_value_pois(places: list[dict], pois_path: Path) -> int: pois = pl.read_parquet(pois_path, columns=["name", "category", "lat", "lng"]) new_places = _pois_to_places(pois) existing = { - (str(place["name"]).lower(), place["place_type"]) for place in places + _poi_dedup_key( + str(place["name"]), place["place_type"], place["lat"], place["lon"] + ) + for place in places } added = 0 for place in new_places: - key = (place["name"].lower(), place["place_type"]) + key = _poi_dedup_key( + place["name"], place["place_type"], place["lat"], place["lon"] + ) if key in existing: continue places.append(place) @@ -409,10 +458,14 @@ def _append_high_value_pois(places: list[dict], pois_path: Path) -> int: def _postcode_lookup(postcodes_path: Path) -> dict[str, tuple[float, float]]: - df = pl.read_parquet( - postcodes_path, - columns=["pcds", "lat", "long", "ctry25cd", "doterm"], - ).filter((pl.col("ctry25cd") == ENGLAND_COUNTRY_CODE) & pl.col("doterm").is_null()) + df = ( + pl.read_parquet( + postcodes_path, + columns=["pcds", "lat", "long", "ctry25cd", "doterm"], + ) + .filter((pl.col("ctry25cd") == ENGLAND_COUNTRY_CODE) & pl.col("doterm").is_null()) + .filter(_valid_wgs84_expr()) + ) return { _normalize_postcode(postcode): (float(lat), float(lon)) for postcode, lat, lon in df.select(["pcds", "lat", "long"]).iter_rows() @@ -470,7 +523,7 @@ def _london_postcode_tree(postcodes_path: Path) -> tuple[cKDTree, np.ndarray]: .filter( (pl.col("ctry25cd") == ENGLAND_COUNTRY_CODE) & pl.col("doterm").is_null() ) - .filter(pl.col("east1m").is_not_null() & pl.col("north1m").is_not_null()) + .filter(_valid_bng_expr()) .with_columns(_is_london_admin_expr().alias("is_london")) .select("east1m", "north1m", "is_london") ) diff --git a/pipeline/download/test_median_age.py b/pipeline/download/test_median_age.py new file mode 100644 index 0000000..ccff9f4 --- /dev/null +++ b/pipeline/download/test_median_age.py @@ -0,0 +1,75 @@ +import math + +import polars as pl +import pytest + +from pipeline.download import median_age +from pipeline.download.median_age import ( + AGE_BANDS, + EXPECTED_BAND_NAMES, + compute_median_age, +) + + +def test_expected_band_names_align_with_age_bands(): + assert len(EXPECTED_BAND_NAMES) == len(AGE_BANDS) + + +def test_compute_median_age_interpolates_within_median_band(): + # All weight in the 30-34 band -> median is the band midpoint via linear + # interpolation: 30 + ((50 - 0) / 100) * 5 = 32.5. + counts = [0] * len(AGE_BANDS) + counts[6] = 100 # "Aged 30 to 34 years" + assert compute_median_age(counts) == pytest.approx(32.5) + + # 50 below the median band, 100 inside the 35-39 band holding the median. + # half = 75; cumulative before band 7 = 50; 35 + ((75 - 50) / 100) * 5 = 36.25. + counts = [0] * len(AGE_BANDS) + counts[0] = 50 # below the median band + counts[7] = 100 # "Aged 35 to 39 years" holds the median + assert compute_median_age(counts) == pytest.approx(36.25) + + +def test_compute_median_age_empty_lsoa_is_nan(): + assert math.isnan(compute_median_age([0] * len(AGE_BANDS))) + + +def _pivoted(band_to_counts: dict[str, list]) -> pl.DataFrame: + """Build a pivot-shaped frame: GEOGRAPHY_CODE + one column per band.""" + n = len(next(iter(band_to_counts.values()))) + data = {"GEOGRAPHY_CODE": [f"E0100000{i}" for i in range(n)]} + data.update(band_to_counts) + return pl.DataFrame(data) + + +def test_null_band_count_is_treated_as_zero_not_crash(): + # One LSOA has a null in the 85+ band (NOMIS can return null for a band with + # zero people). It must be coerced to 0, not raise TypeError in sum(). With + # all 100 people in the 30-34 band the median is the band midpoint, 32.5. + counts_by_band = {name: [0] for name in EXPECTED_BAND_NAMES} + counts_by_band["Aged 30 to 34 years"] = [100] + counts_by_band["Aged 85 years and over"] = [None] + pivoted = _pivoted(counts_by_band) + + table = median_age._bands_to_median_table(pivoted) + + assert table.height == 1 + assert table["median_age"][0] == pytest.approx(32.5) + + +def test_missing_band_raises_clear_error(): + counts_by_band = {name: [10] for name in EXPECTED_BAND_NAMES} + del counts_by_band["Aged 85 years and over"] + pivoted = _pivoted(counts_by_band) + + with pytest.raises(ValueError, match=r"do not match the expected NOMIS"): + median_age._bands_to_median_table(pivoted) + + +def test_relabelled_band_raises_clear_error(): + counts_by_band = {name: [10] for name in EXPECTED_BAND_NAMES} + counts_by_band["Total"] = counts_by_band.pop("Aged 85 years and over") + pivoted = _pivoted(counts_by_band) + + with pytest.raises(ValueError, match=r"unexpected:"): + median_age._bands_to_median_table(pivoted) diff --git a/pipeline/download/test_noise.py b/pipeline/download/test_noise.py index ab5a01d..b7c159b 100644 --- a/pipeline/download/test_noise.py +++ b/pipeline/download/test_noise.py @@ -125,6 +125,205 @@ def test_download_raster_raises_on_missing_strict_tiles(monkeypatch, tmp_path): noise.download_raster(tmp_path, "base", "coverage", "Road") +def test_generate_tiles_neighbours_overlap_by_radius(): + tile_size = 20_000 + overlap = noise.POSTCODE_NOISE_RADIUS_M + tiles = noise._generate_tiles( + 0, 60_000, 0, 60_000, tile_size, overlap, tile_size + ) + + by_origin = {(min_e, min_n): (max_e, max_n) for min_e, min_n, max_e, max_n in tiles} + + # Horizontally adjacent tiles must overlap by >= overlap. + for (min_e, min_n), (max_e, _max_n) in by_origin.items(): + right_origin = (min_e + tile_size, min_n) + if right_origin in by_origin: + assert max_e - right_origin[0] >= overlap + + # Vertically adjacent tiles must overlap by >= overlap. + for (min_e, min_n), (_max_e, max_n) in by_origin.items(): + up_origin = (min_e, min_n + tile_size) + if up_origin in by_origin: + assert max_n - up_origin[1] >= overlap + + +def test_generate_tiles_clamps_to_grid_extent(): + tile_size = 20_000 + overlap = noise.POSTCODE_NOISE_RADIUS_M + tiles = noise._generate_tiles( + noise.BNG_MAX_E - tile_size, + noise.BNG_MAX_E, + noise.BNG_MAX_N - tile_size, + noise.BNG_MAX_N, + tile_size, + overlap, + tile_size, + ) + # The final (top-right) tile cannot extend past the England extent even + # though the overlap would otherwise push it beyond. + for _min_e, _min_n, max_e, max_n in tiles: + assert max_e <= noise.BNG_MAX_E + assert max_n <= noise.BNG_MAX_N + + +def _write_geotiff(path, data, left, top, resolution, nodata): + with rasterio.open( + path, + "w", + driver="GTiff", + height=data.shape[0], + width=data.shape[1], + count=1, + dtype=data.dtype, + crs="EPSG:27700", + transform=from_origin(left, top, resolution, resolution), + nodata=nodata, + ) as dataset: + dataset.write(data, 1) + + +def test_sample_noise_recovers_value_across_overlapping_seam(monkeypatch, tmp_path): + monkeypatch.setattr(noise, "POSTCODE_NOISE_RADIUS_M", 50) + monkeypatch.setattr(noise, "RESOLUTION", 10) + + # Two download tiles share a vertical seam at easting=100. _generate_tiles + # decides their real footprints: with the overlap fix the LEFT tile extends + # past the seam by POSTCODE_NOISE_RADIUS_M and thus covers a loud cell that + # physically sits just across the seam. + tile_size = 100 + overlap = noise.POSTCODE_NOISE_RADIUS_M + tiles = noise._generate_tiles(0, 200, 0, 100, tile_size, overlap, tile_size) + by_origin = { + (min_e, min_n): (max_e, max_n) for min_e, min_n, max_e, max_n in tiles + } + left_min_e, left_min_n = 0, 0 + left_max_e, left_max_n = by_origin[(left_min_e, left_min_n)] + # Overlap fix is what makes the left tile reach across the seam. + assert left_max_e > 100 + + # The loud 70 dB cell centre is at easting 105 (just across the seam) and + # the postcode point is at easting 75 in the left tile, within 50m of it. + res = noise.RESOLUTION + width = int((left_max_e - left_min_e) // res) + height = int((left_max_n - left_min_n) // res) + left_data = np.zeros((height, width), dtype=np.float32) + loud_row = height - 1 - int((25 - left_min_n) // res) # northing ~25 + loud_col = int((105 - left_min_e) // res) # easting ~105 + left_data[loud_row, loud_col] = 70.0 + _write_geotiff( + tmp_path / "left.tif", left_data, left_min_e, left_max_n, res, nodata=0 + ) + + # The right tile holds the same loud cell but the postcode point is NOT + # inside it, so without overlap the value would be lost for that point. + right_min_e, right_min_n = 100, 0 + right_max_e, right_max_n = by_origin[(right_min_e, right_min_n)] + rwidth = int((right_max_e - right_min_e) // res) + rheight = int((right_max_n - right_min_n) // res) + right_data = np.zeros((rheight, rwidth), dtype=np.float32) + right_data[rheight - 1 - int((25 - right_min_n) // res), 0] = 70.0 + _write_geotiff( + tmp_path / "right.tif", right_data, right_min_e, right_max_n, res, nodata=0 + ) + + result = noise.sample_noise_at_postcodes( + [tmp_path / "left.tif", tmp_path / "right.tif"], + easting=np.array([75.0]), + northing=np.array([25.0]), + label="Road", + col_name="road_noise_lden_db", + ) + + assert result.to_list() == [70.0] + + +def test_sample_noise_distinguishes_nodata_from_in_coverage_quiet( + monkeypatch, tmp_path +): + monkeypatch.setattr(noise, "POSTCODE_NOISE_RADIUS_M", 0) + monkeypatch.setattr(noise, "RESOLUTION", 10) + + # Defra encodes TRUE nodata as the -96.0 sentinel; genuinely quiet ground + # below the lowest reporting band is 0.0. With a 0m radius each postcode + # reads exactly one cell, so we can pin behaviour per cell: + # -96.0 sentinel -> null ("we don't know") + # 0.0 in-coverage -> NOISE_QUIET_FLOOR_DB ("we know it's quiet") + # 65.0 -> 65.0 (a real modelled reading) + data = np.array( + [ + [-96.0, 0.0, 65.0], + ], + dtype=np.float32, + ) + _write_geotiff(tmp_path / "noise.tif", data, 0, 10, 10, nodata=-96.0) + + result = noise.sample_noise_at_postcodes( + [tmp_path / "noise.tif"], + # Cell centres at easting 5 (nodata), 15 (quiet 0.0), 25 (loud 65). + easting=np.array([5.0, 15.0, 25.0]), + northing=np.array([5.0, 5.0, 5.0]), + label="Road", + col_name="road_noise_lden_db", + ) + + assert result.to_list() == [None, float(noise.NOISE_QUIET_FLOOR_DB), 65.0] + + +def test_sample_noise_preserves_genuine_reading_above_quiet_floor(monkeypatch, tmp_path): + monkeypatch.setattr(noise, "POSTCODE_NOISE_RADIUS_M", 0) + monkeypatch.setattr(noise, "RESOLUTION", 10) + + # The lowest Defra reporting band is 40.0 dB; genuine readings populate + # [40, ~80]. A genuine in-coverage reading at or just above the floor must be + # PRESERVED, not clamped UP to the floor — only true-quiet 0.0 is floored. A + # quiet floor set too high (e.g. 45) would inflate the ~35% of real 40-44.99 + # dB readings; this pins that they survive unchanged. + floor = float(noise.NOISE_QUIET_FLOOR_DB) + data = np.array( + [ + [42.0, floor, 0.0], + ], + dtype=np.float32, + ) + _write_geotiff(tmp_path / "noise.tif", data, 0, 10, 10, nodata=-96.0) + + result = noise.sample_noise_at_postcodes( + [tmp_path / "noise.tif"], + # Cell centres at easting 5 (42 dB), 15 (floor), 25 (quiet 0.0). + easting=np.array([5.0, 15.0, 25.0]), + northing=np.array([5.0, 5.0, 5.0]), + label="Road", + col_name="road_noise_lden_db", + ) + + # 42 preserved (NOT raised to the floor), floor preserved, 0.0 -> floor. + assert result.to_list() == [42.0, floor, floor] + # The floor must sit at/below the lowest genuine reading so nothing inflates. + assert floor <= 42.0 + + +def test_sample_noise_nodata_window_stays_null(monkeypatch, tmp_path): + monkeypatch.setattr(noise, "POSTCODE_NOISE_RADIUS_M", 15) + monkeypatch.setattr(noise, "RESOLUTION", 10) + + # A postcode whose entire 3x3 max-window is the -96.0 sentinel must remain + # null: no in-coverage cell was read, so "quiet" must NOT be inferred. + data = np.full((5, 5), -96.0, dtype=np.float32) + data[4, 4] = 70.0 # one loud cell, far from the nodata corner + _write_geotiff(tmp_path / "noise.tif", data, 0, 50, 10, nodata=-96.0) + + result = noise.sample_noise_at_postcodes( + [tmp_path / "noise.tif"], + # Top-left point: its 3x3 window is cells (rows 0-1, cols 0-1) = all -96. + easting=np.array([5.0]), + northing=np.array([45.0]), + label="Road", + col_name="road_noise_lden_db", + ) + + assert result.to_list() == [None] + + def test_sample_noise_at_postcodes_uses_local_maximum(monkeypatch, tmp_path): monkeypatch.setattr(noise, "POSTCODE_NOISE_RADIUS_M", 15) monkeypatch.setattr(noise, "RESOLUTION", 10) diff --git a/pipeline/download/test_places.py b/pipeline/download/test_places.py index 30ae9ec..48c9bae 100644 --- a/pipeline/download/test_places.py +++ b/pipeline/download/test_places.py @@ -9,10 +9,12 @@ from pipeline.download.places import ( _display_city_from_tags, _is_dlr_station, _is_tram_station, + _london_postcode_tree, _naptan_dlr_stations, _normalize_street_name, _ofs_universities, _outcode_of_postcode, + _outcode_tree, _pois_to_places, _select_university_name, _station_display_name, @@ -242,6 +244,42 @@ def test_pois_to_places_keeps_high_value_named_pois_only(): assert all(place["travel_destination"] is False for place in places) +def test_pois_to_places_keeps_distinct_same_named_pois(): + # Two genuinely distinct POIs sharing a name, far apart (London vs Bristol). + pois = pl.DataFrame( + { + "name": ["Victoria Park", "Victoria Park"], + "category": ["leisure/park", "leisure/park"], + "lat": [51.54, 51.46], + "lng": [-0.04, -2.60], + } + ) + + places = _pois_to_places(pois) + + assert len(places) == 2 + assert {(place["lat"], place["lon"]) for place in places} == { + (51.54, -0.04), + (51.46, -2.60), + } + + +def test_pois_to_places_still_dedupes_colocated(): + # The same physical POI mapped twice a few metres apart collapses to one. + pois = pl.DataFrame( + { + "name": ["Victoria Park", "Victoria Park"], + "category": ["leisure/park", "leisure/park"], + "lat": [51.5400, 51.5401], + "lng": [-0.0400, -0.0399], + } + ) + + places = _pois_to_places(pois) + + assert len(places) == 1 + + def test_display_city_from_tags_uses_explicit_london_context(): assert _display_city_from_tags({"is_in": "Croydon, London, UK"}) == "London" assert _display_city_from_tags({"is_in": "Croydon, Cambridgeshire, UK"}) is None @@ -290,3 +328,52 @@ def test_assign_london_display_city_uses_nearest_active_postcode_admin(tmp_path) assert assigned == 2 assert [place["display_city"] for place in places] == ["London", "London", None] + + +def test_no_grid_reference_sentinel_is_excluded_from_coordinate_trees(tmp_path): + # ONS NSPL stores postcodes with no grid reference at the Null-Island sentinel + # lat=99.999999, long=0.0, whose paired BNG coords collapse to the (0, 0) origin. + # Such an active postcode must never enter the nearest-neighbour indexes. + sentinel = { + "pcds": "ZZ99 9ZZ", + "lat": 99.999999, + "long": 0.0, + "doterm": None, + "ctry25cd": "E92000001", + "east1m": 0, + "north1m": 0, + "rgn25cd": "E12000007", + "lad25cd": "E09000008", + "cty25cd": "E13000002", + } + croydon_easting, croydon_northing = WGS84_TO_BNG.transform(-0.101793, 51.371273) + real = { + "pcds": "CR0 1SZ", + "lat": 51.371273, + "long": -0.101793, + "doterm": None, + "ctry25cd": "E92000001", + "east1m": int(round(croydon_easting)), + "north1m": int(round(croydon_northing)), + "rgn25cd": "E12000007", + "lad25cd": "E09000008", + "cty25cd": "E13000002", + } + postcodes = tmp_path / "postcodes.parquet" + pl.DataFrame([sentinel, real]).write_parquet(postcodes) + + # lat/long outcode tree: only the real postcode survives, so a London-area query + # cannot be tagged with the sentinel's (empty) outcode. + tree, outcodes = _outcode_tree(postcodes) + assert tree.n == 1 + assert outcodes == ["CR0"] + _, idx = tree.query([[51.371273, -0.101793]]) + assert outcodes[idx[0]] == "CR0" + + # BNG London tree: only the real postcode survives, so the (0, 0) origin can never + # be the nearest neighbour of a real place. + bng_tree, london_flags = _london_postcode_tree(postcodes) + assert bng_tree.n == 1 + assert london_flags.tolist() == [True] + _, bng_idx = bng_tree.query([[croydon_easting, croydon_northing]]) + assert bng_idx[0] == 0 diff --git a/pipeline/download/uprn_lookup.py b/pipeline/download/uprn_lookup.py index 3f2c804..dcedc0f 100644 --- a/pipeline/download/uprn_lookup.py +++ b/pipeline/download/uprn_lookup.py @@ -14,7 +14,7 @@ from pathlib import Path import polars as pl from pipeline.local_temp import local_tmp_dir -from pipeline.utils import download, extract_zip +from pipeline.utils import code_col_overrides, download, extract_zip URL = "https://www.arcgis.com/sharing/rest/content/items/4e0b4b3fbc2540caae27e7be532e61be/data" @@ -34,16 +34,16 @@ def find_csvs(extract_path: Path) -> list[Path]: def convert_to_parquet(csv_paths: list[Path], parquet_path: Path) -> None: # Some regional files infer different types for the same column (e.g. - # ruc21ind is String in most but Int64 in YH). Read all code columns as - # String to avoid schema mismatches. - CODE_COLS = { - "ruc21ind": pl.String, - "oac21ind": pl.String, - "imd19ind": pl.String, - } + # ruc21ind is String in most but Int64 in YH), and string codes like "UN1" + # appear deep in the data. Read all classification-index code columns as + # String to avoid schema mismatches. NSUL renames the year suffixes each + # release and polars silently ignores overrides for missing columns, so + # match on the suffix-free stem (from the header) rather than hard-coding. + names = pl.scan_csv(csv_paths[0]).collect_schema().names() + code_cols = code_col_overrides(names) df = pl.concat( [ - pl.scan_csv(p, try_parse_dates=True, schema_overrides=CODE_COLS) + pl.scan_csv(p, try_parse_dates=True, schema_overrides=code_cols) for p in csv_paths ] ) diff --git a/pipeline/test_validate_outputs.py b/pipeline/test_validate_outputs.py index 748a661..5b93e2a 100644 --- a/pipeline/test_validate_outputs.py +++ b/pipeline/test_validate_outputs.py @@ -265,6 +265,72 @@ def test_rejects_contaminated_postcode_features(tmp_path, monkeypatch, capsys): assert "[0, 100]" in err +def test_postcode_universe_rejects_missing(tmp_path, monkeypatch, capsys): + arcgis_path = tmp_path / "arcgis.parquet" + postcodes_path = tmp_path / "postcode.parquet" + pl.DataFrame( + { + "pcds": ["AA1 1AA", "AA1 1AB", "AA1 1AC"], + "ctry25cd": ["E92000001", "E92000001", "E92000001"], + "doterm": [None, None, None], + } + ).write_parquet(arcgis_path) + # Only 1 of the 3 active English postcodes is present, all otherwise valid. + _write_postcode_features( + postcodes_path, + { + "Postcode": ["AA1 1AA"], + "lat": [51.5], + "lon": [-0.1], + "ctry25cd": ["E92000001"], + "% White": [80.0], + }, + ) + monkeypatch.setattr( + "sys.argv", + [ + "validate", + "--postcode-universe", + f"{arcgis_path}::{postcodes_path}", + ], + ) + assert main() == 1 + err = capsys.readouterr().err + assert "missing" in err + assert "2" in err # 2 of the 3 active postcodes are absent + + +def test_postcode_universe_accepts_exact_match(tmp_path, monkeypatch): + arcgis_path = tmp_path / "arcgis.parquet" + postcodes_path = tmp_path / "postcode.parquet" + pl.DataFrame( + { + "pcds": ["AA1 1AA", "AA1 1AB"], + "ctry25cd": ["E92000001", "E92000001"], + "doterm": [None, None], + } + ).write_parquet(arcgis_path) + _write_postcode_features( + postcodes_path, + { + "Postcode": ["AA1 1AA", "AA1 1AB"], + "lat": [51.5, 53.4], + "lon": [-0.1, -2.2], + "ctry25cd": ["E92000001", "E92000001"], + "% White": [80.0, 55.0], + }, + ) + monkeypatch.setattr( + "sys.argv", + [ + "validate", + "--postcode-universe", + f"{arcgis_path}::{postcodes_path}", + ], + ) + assert main() == 0 + + def test_validates_properties_subset(tmp_path, monkeypatch): postcode = tmp_path / "postcode.parquet" properties = tmp_path / "properties.parquet" diff --git a/pipeline/transform/crime_spatial.py b/pipeline/transform/crime_spatial.py index b1a4de7..4b54623 100644 --- a/pipeline/transform/crime_spatial.py +++ b/pipeline/transform/crime_spatial.py @@ -273,6 +273,28 @@ def _write_avg_yr( for type_idx, name in enumerate(ALL_CRIME_TYPES): data[f"{name} (avg/yr)"] = avg[:, type_idx] + # Serious/Minor rollup headlines, computed the SAME way as the by-year rollup + # bars (_write_by_year/_rollup_long): sum the rollup's types per year, then + # average over the years in which ANY of those types occurred. This keeps the + # headline equal to the mean of the "Serious/Minor crime (by year)" bars. + # Summing the per-type avg/yr values instead (as the merge previously did) + # divides each type by its OWN years-present and overstates the rollup when a + # postcode's serious/minor types occur in disjoint years. + for rollup_name, rollup_types in ( + ("Serious crime", SERIOUS_CRIME_TYPES), + ("Minor crime", MINOR_CRIME_TYPES), + ): + rollup_idx = [ALL_CRIME_TYPES.index(name) for name in rollup_types] + rollup_counts = counts[:, rollup_idx, :].sum(axis=1) # (n_postcodes, n_years) + rollup_per_year = per_year[:, rollup_idx, :].sum(axis=1) + rollup_years_present = np.clip( + (rollup_counts > 0).sum(axis=1), 1, None + ).astype(np.float64) + rollup_avg = rollup_per_year.sum(axis=1) / rollup_years_present + data[f"{rollup_name} (avg/yr)"] = np.round(rollup_avg * norm, 1).astype( + np.float32 + ) + output_path.parent.mkdir(parents=True, exist_ok=True) pl.DataFrame(data).write_parquet(output_path, compression="zstd") print(f"Wrote postcode crime averages: {output_path}") diff --git a/pipeline/transform/join_epc_pp.py b/pipeline/transform/join_epc_pp.py index dc34bb3..b20b8de 100644 --- a/pipeline/transform/join_epc_pp.py +++ b/pipeline/transform/join_epc_pp.py @@ -106,7 +106,14 @@ def _select_epc_columns(raw: pl.LazyFrame) -> pl.LazyFrame: .alias("potential_energy_rating"), _clean_string("property_type").alias("epc_property_type"), _clean_string("built_form").alias("built_form"), - _clean_string("inspection_date").alias("inspection_date"), + # Parse to a real Date once (unparseable/blank -> null) so dedup can + # sort newest-first with nulls_last and _event_year can use dt.year(); + # a lexicographic string sort would let a null/garbled date win under + # Polars' default nulls-first descending order. EPC inspection dates + # are ISO (YYYY-MM-DD). + _clean_string("inspection_date") + .str.to_date(format="%Y-%m-%d", strict=False) + .alias("inspection_date"), _clean_number("total_floor_area", pl.Float64).alias("total_floor_area"), _clean_number("number_habitable_rooms", pl.Int16).alias( "number_habitable_rooms" @@ -247,9 +254,11 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat normalize_postcode_key(pl.col("epc_postcode")).alias("_epc_match_postcode"), ) - # Dedup fork: keep latest certificate per property (existing logic) + # Dedup fork: keep latest certificate per property. inspection_date is a typed + # Date (see _select_epc_columns); nulls_last keeps a real-dated cert ahead of a + # null/unparseable-dated one so the genuinely newest certificate is chosen. epc = ( - epc_base.sort("inspection_date", descending=True) + epc_base.sort("inspection_date", descending=True, nulls_last=True) .group_by("_epc_match_address", "_epc_match_postcode") .first() .drop("tenure") @@ -303,11 +312,7 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat ) .filter(pl.col("_event").is_not_null()) .with_columns( - pl.col("inspection_date") - .cast(pl.String) - .str.slice(0, 4) - .cast(pl.Int32) - .alias("_event_year"), + pl.col("inspection_date").dt.year().cast(pl.Int32).alias("_event_year"), ) .group_by("_epc_match_address", "_epc_match_postcode") .agg( diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index a6d18f8..d844572 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -807,6 +807,22 @@ def _remap_terminated_postcodes( ) +def _dedupe_collapsed_properties(wide: pl.LazyFrame) -> pl.LazyFrame: + """Keep one row per (postcode, pp_address) — the most-recent transaction. + + The terminated-postcode remap can map two distinct postcodes onto one active + successor, collapsing the same physical address onto a single + (postcode, pp_address) key with conflicting sale records. Keep the row with + the latest date_of_transfer so the headline price/date reflect the most + recent transaction; genuinely distinct addresses (a different pp_address) are + untouched. pp_address is non-null here (join_epc_pp filters it), so the key + never merges unrelated rows. + """ + return wide.sort( + "date_of_transfer", descending=True, nulls_last=True + ).unique(subset=["postcode", "pp_address"], keep="first", maintain_order=True) + + def _filter_to_active_english_postcodes( wide: pl.LazyFrame, active_postcodes: pl.LazyFrame ) -> pl.LazyFrame: @@ -837,38 +853,19 @@ def _join_area_side_tables( ) # Crime is counted spatially per postcode (incidents within 50m of the - # postcode boundary), so it joins on postcode rather than LSOA. - base = base.join(crime, on="postcode", how="left") - serious_crime_cols = [ - "Violence and sexual offences (avg/yr)", - "Robbery (avg/yr)", - "Burglary (avg/yr)", - "Possession of weapons (avg/yr)", - ] - minor_crime_cols = [ - "Anti-social behaviour (avg/yr)", - "Criminal damage and arson (avg/yr)", - "Shoplifting (avg/yr)", - "Bicycle theft (avg/yr)", - "Theft from the person (avg/yr)", - "Other theft (avg/yr)", - "Vehicle crime (avg/yr)", - "Public order (avg/yr)", - "Drugs (avg/yr)", - "Other crime (avg/yr)", - ] - # The LEFT join leaves every per-type column null for postcodes absent from - # the crime table; sum_horizontal alone would fabricate a "zero crime" - # rollup there, so keep the rollup null when ALL components are null. - base = base.with_columns( - pl.when(pl.all_horizontal([pl.col(c).is_null() for c in serious_crime_cols])) - .then(None) - .otherwise(pl.sum_horizontal(serious_crime_cols)) - .alias("serious_crime_avg_yr"), - pl.when(pl.all_horizontal([pl.col(c).is_null() for c in minor_crime_cols])) - .then(None) - .otherwise(pl.sum_horizontal(minor_crime_cols)) - .alias("minor_crime_avg_yr"), + # postcode boundary), so it joins on postcode rather than LSOA. crime_spatial + # precomputes the Serious/Minor headline rollups as the mean of the by-year + # rollup bars; read those straight through (renamed to the internal columns + # _finalize_merged_columns expects) rather than re-summing the per-type + # avg/yr columns — summing divides each type by its OWN years-present and + # overstates the rollup when types differ in coverage. A postcode absent from + # the crime table keeps null rollups via the left join (no fabricated zero); + # the per-type avg/yr columns pass through unchanged for display. + base = base.join(crime, on="postcode", how="left").rename( + { + "Serious crime (avg/yr)": "serious_crime_avg_yr", + "Minor crime (avg/yr)": "minor_crime_avg_yr", + } ) base = base.join(median_age, on="lsoa21", how="left") @@ -881,7 +878,37 @@ def _join_area_side_tables( ) if tree_density is not None: base = base.join(tree_density, on="postcode", how="left") - return base.join(broadband, left_on="postcode", right_on="bb_postcode", how="left") + + # Broadband is the one side table sourced straight from a third-party CSV + # (Ofcom `postcode_space`) rather than from a sibling pcds-keyed pipeline + # step, so its postcode may drift in spacing/casing from the NSPL `pcds` + # base key. Normalize BOTH sides to the same canonical pcds form (reusing + # `_canonical_postcode_expr`, exactly as the listing/EPC re-anchor joins do) + # before joining, otherwise a real postcode silently misses and its + # `max_download_speed` reads as null "no data" downstream. Re-aggregate on + # the canonical key so two raw spellings collapsing to one key can't fan out + # the base; drop a null canonical key so an unparseable Ofcom row joins + # nothing rather than matching a null-key base row. + broadband_canonical = ( + broadband.with_columns( + _canonical_postcode_expr("bb_postcode").alias("_bb_canonical_postcode") + ) + .drop_nulls("_bb_canonical_postcode") + .group_by("_bb_canonical_postcode") + .agg(pl.col("max_download_speed").max()) + ) + return ( + base.with_columns( + _canonical_postcode_expr("postcode").alias("_base_canonical_postcode") + ) + .join( + broadband_canonical, + left_on="_base_canonical_postcode", + right_on="_bb_canonical_postcode", + how="left", + ) + .drop("_base_canonical_postcode") + ) def _finalize_merged_columns(frame: pl.LazyFrame) -> pl.LazyFrame: @@ -1328,7 +1355,7 @@ def _load_direct_epc_candidates( ) return ( - epc_base.sort("inspection_date", descending=True) + epc_base.sort("inspection_date", descending=True, nulls_last=True) .group_by("_direct_epc_match_address", "_direct_epc_match_postcode") .first() .join( @@ -1918,6 +1945,10 @@ def _build( # terminated English postcodes are retained under their successor postcode. postcode_mapping = build_postcode_mapping(arcgis_path) wide = _remap_terminated_postcodes(wide, postcode_mapping.lazy()) + # The remap can collapse two terminated postcodes onto one active successor, + # duplicating a physical address's (postcode, pp_address) key; keep only the + # most-recent transaction per address before the per-postcode joins. + wide = _dedupe_collapsed_properties(wide) arcgis_raw = pl.scan_parquet(arcgis_path) arcgis = _active_english_postcode_area(arcgis_raw) active_postcodes = arcgis.select("postcode").unique() diff --git a/pipeline/transform/noise_overlay_tiles.py b/pipeline/transform/noise_overlay_tiles.py index b242fad..671c9c8 100644 --- a/pipeline/transform/noise_overlay_tiles.py +++ b/pipeline/transform/noise_overlay_tiles.py @@ -164,19 +164,39 @@ def _read_noise_tile( for info in candidates: with rasterio.open(info.path) as source: + # The Defra rasters encode genuine "quiet / below threshold" as the + # value 0.0 (only -96.0 is true nodata). Mask both BEFORE + # reprojecting so resampling never blends a 0 cell into an adjacent + # loud corridor and fabricates a halo of intermediate dB. + # + # Lden values are dB (a logarithmic scale), so bilinear resampling + # would arithmetically average neighbouring dB cells, which is + # acoustically wrong (it diluted a 75 dB peak to ~53 dB in tests) + # and inconsistent with the postcode sampler. Use Resampling.max: + # it preserves peak corridors, never invents an intermediate dB + # between a masked (NaN) quiet cell and a loud one, and mirrors the + # max semantics of sample_noise_at_postcodes. + src_arr = source.read(1).astype(np.float32) + nodata = source.nodata + invalid = ~np.isfinite(src_arr) | (src_arr <= 0) + if nodata is not None: + invalid |= np.isclose( + src_arr, np.float32(nodata), rtol=1e-5, atol=1e-5 + ) + src_arr = np.where(invalid, np.float32("nan"), src_arr) tile = np.full((tile_size, tile_size), np.nan, dtype=np.float32) reproject( - source=rasterio.band(source, 1), + source=src_arr, destination=tile, src_transform=source.transform, src_crs=source.crs, - src_nodata=source.nodata if source.nodata is not None else 0, + src_nodata=float("nan"), dst_transform=from_bounds( left, bottom, right, top, tile_size, tile_size ), dst_crs=WEB_MERCATOR_CRS, dst_nodata=np.nan, - resampling=Resampling.bilinear, + resampling=Resampling.max, ) tile[~np.isfinite(tile) | (tile <= 0)] = np.nan diff --git a/pipeline/transform/postcode_boundaries/test_postcode_boundaries.py b/pipeline/transform/postcode_boundaries/test_postcode_boundaries.py index 2c20d33..0b6ce96 100644 --- a/pipeline/transform/postcode_boundaries/test_postcode_boundaries.py +++ b/pipeline/transform/postcode_boundaries/test_postcode_boundaries.py @@ -27,7 +27,7 @@ from .output import ( to_wgs84_geojson_multi, write_district_geojson, ) -from .process_oa import _extract_polygonal, process_oa +from .process_oa import MIN_GEOM_AREA, _extract_polygonal, process_oa from .uprn import get_oa_uprns, load_uprns from .voronoi import _equal_split_fallback, compute_voronoi_regions @@ -341,6 +341,65 @@ class TestVoronoiDeduplication: assert "B" in result, "Postcode B missing with int64 coords" +class TestVoronoiCoincidentClusterNotCrushed: + """3+ postcodes at one coordinate must each keep a real cell. + + Pre-fix, the first coincident postcode stayed unjittered at the exact + cluster centre; with other seeds in the OA its Voronoi cell was squeezed + below MIN_GEOM_AREA, so _clean_polygonal dropped that active postcode + downstream. The fix spreads coincident postcodes onto a small regular + polygon (equal wedges), so none is crushed. + """ + + def test_coincident_cluster_plus_outer_seed_no_postcode_crushed(self): + # A block of flats: 4 distinct postcodes share one building coordinate, + # plus one other postcode elsewhere in the OA. Pre-fix, the centre seed's + # cell collapsed to ~0.0001 m^2 (< MIN_GEOM_AREA) and the postcode was + # dropped; every postcode must now keep a non-degenerate cell. + boundary = box(0, 0, 1000, 1000) + points = np.array( + [ + [500, 500], # A — coincident + [500, 500], # B — coincident + [500, 500], # C — coincident + [500, 500], # D — coincident + [100, 100], # OUT — elsewhere in the OA + ], + dtype=np.float64, + ) + postcodes = ["A", "B", "C", "D", "OUT"] + result = compute_voronoi_regions(points, postcodes, boundary) + for pc in postcodes: + assert pc in result, f"Postcode {pc} was dropped" + assert result[pc].area > MIN_GEOM_AREA, ( + f"Postcode {pc} cell {result[pc].area} <= MIN_GEOM_AREA" + ) + + def test_coincident_cluster_partitions_into_fair_wedges(self, square_boundary): + # N postcodes sharing one coordinate split the surrounding area into + # roughly equal wedges (regular-polygon seeds), none degenerate. + points = np.array([[500050, 180050]] * 5, dtype=np.float64) + postcodes = ["A", "B", "C", "D", "E"] + result = compute_voronoi_regions(points, postcodes, square_boundary) + fair_share = square_boundary.area / len(postcodes) + for pc in postcodes: + assert pc in result, f"Postcode {pc} was dropped" + # Each wedge is a meaningful fraction of its fair share (not crushed). + assert result[pc].area > 0.3 * fair_share, ( + f"Postcode {pc} cell {result[pc].area} far below fair share {fair_share}" + ) + + def test_two_coincident_split_is_fair(self, square_boundary): + """Regression: two postcodes at one coordinate split ~50/50.""" + points = np.array([[500050, 180050], [500050, 180050]], dtype=np.float64) + postcodes = ["A", "B"] + result = compute_voronoi_regions(points, postcodes, square_boundary) + assert "A" in result and "B" in result + total = result["A"].area + result["B"].area + assert result["A"].area / total > 0.4 + assert result["B"].area / total > 0.4 + + # --------------------------------------------------------------------------- # Bug 4: Voronoi collinear fallback gives everything to first postcode # --------------------------------------------------------------------------- diff --git a/pipeline/transform/postcode_boundaries/voronoi.py b/pipeline/transform/postcode_boundaries/voronoi.py index 21ce654..6d80d12 100644 --- a/pipeline/transform/postcode_boundaries/voronoi.py +++ b/pipeline/transform/postcode_boundaries/voronoi.py @@ -20,33 +20,48 @@ def compute_voronoi_regions( # Convert to float64 so sub-metre jitter isn't truncated. points = points.astype(np.float64) - # Deduplicate points, keeping one per (location, postcode) pair. - # Multiple postcodes at the same coordinate each get their own point, - # jittered by a tiny offset (0.01m) so Voronoi can distinguish them. - # Coords are rounded to mm precision for stable hashing — UPRN inputs are - # already integer metres, but the float64 cast can introduce ULP noise. - GOLDEN_ANGLE = np.pi * (3.0 - np.sqrt(5.0)) + # Deduplicate points, keeping one per (location, postcode) pair. Coords are + # rounded to mm precision for stable hashing — UPRN inputs are already integer + # metres, but the float64 cast can introduce ULP noise. + # + # Where several DISTINCT postcodes share one coordinate, jitter ALL of them + # onto a small regular polygon (equal 0.01m radius, equally spaced by angle) + # so their Voronoi cells become equal wedges and NONE is crushed. Leaving any + # seed at the centre — or innermost on a spiral — squeezes its cell below + # MIN_GEOM_AREA, which _clean_polygonal then drops downstream, silently losing + # an active postcode. Seeds at a UNIQUE coordinate are left exactly on their + # UPRN (no perturbation of normal Voronoi output). Coords are rounded to mm + # for stable hashing (the float64 cast can add ULP noise). + rounded_coords = [ + (round(float(points[i, 0]), 3), round(float(points[i, 1]), 3)) + for i in range(len(points)) + ] + coord_postcodes: dict[tuple[float, float], set[str]] = defaultdict(set) + for coord, pc in zip(rounded_coords, postcodes): + coord_postcodes[coord].add(pc) + seen: dict[tuple[float, float, str], bool] = {} unique_pts = [] unique_pcs = [] coord_counts: dict[tuple[float, float], int] = defaultdict(int) for i in range(len(points)): - coord = (round(float(points[i, 0]), 3), round(float(points[i, 1]), 3)) + coord = rounded_coords[i] key = (coord[0], coord[1], postcodes[i]) if key not in seen: seen[key] = True - jitter_idx = coord_counts[coord] - coord_counts[coord] += 1 - if jitter_idx == 0: - unique_pts.append(points[i].copy()) - else: - # Golden-angle spacing distributes any number of jittered - # points evenly around (and outward from) the original coord. + count = len(coord_postcodes[coord]) + if count > 1: + # Coincident cluster: equally-spaced regular polygon -> equal + # Voronoi wedges, so every postcode here keeps a fair share. + jitter_idx = coord_counts[coord] + coord_counts[coord] += 1 + angle = 2.0 * np.pi * jitter_idx / count jittered = points[i].copy() - angle = jitter_idx * GOLDEN_ANGLE jittered[0] += 0.01 * np.cos(angle) jittered[1] += 0.01 * np.sin(angle) unique_pts.append(jittered) + else: + unique_pts.append(points[i].copy()) unique_pcs.append(postcodes[i]) if len(unique_pts) == 1: diff --git a/pipeline/transform/price_estimation/index.py b/pipeline/transform/price_estimation/index.py index 676c0b8..e667ae3 100644 --- a/pipeline/transform/price_estimation/index.py +++ b/pipeline/transform/price_estimation/index.py @@ -19,8 +19,7 @@ from tqdm import tqdm from pipeline.transform.price_estimation.shrinkage import ( blend_dicts, hierarchical_shrinkage, - reanchor_dict, - reanchor_dicts, + lift_onto_parent, shrink_dicts, spatial_smooth, ) @@ -169,33 +168,47 @@ def solve_robust_index( signs_arr = np.concatenate([np.ones(mask2.sum()), -np.ones(mask1.sum())]) # Temporal smoothness prior: penalise curvature in the year betas with a - # second-difference penalty lambda * (beta_t - 2*beta_{t-1} + beta_{t-2})^2, - # encoded as extra least-squares rows (sqrt(lambda) * [1, -2, 1] against a - # zero target). This damps single-year index spikes without flattening - # genuine multi-year trends. Betas are ordered by calendar year; the baseline - # year (min_year, implicit beta=0) has no column, so the penalty spans the - # non-baseline years only. For cells with <3 betas there is no curvature to - # penalise and the solve is unchanged. + # second-difference penalty lambda * (d2 beta / dt2)^2, encoded as extra + # least-squares rows (sqrt(lambda) * [w0, w1, w2] against a zero target). + # The weights are the CALENDAR-SPACING-AWARE second-derivative coefficients + # for the consecutive triple (y0, y1, y2), so gap years are not treated as + # adjacent: a multi-year gap relaxes the penalty (correctly preserving a + # genuine level jump) instead of forcing a smooth ramp. For unit spacing + # (1, 1) these reduce to [1, -2, 1], leaving contiguous cells unchanged. + # This damps single-year index spikes without flattening genuine trends. + # Betas are ordered by calendar year; the baseline year (min_year, implicit + # beta=0) has no column, so the penalty spans the non-baseline years only. + # For cells with <3 betas there is no curvature to penalise and the solve is + # unchanged. n_pen = 0 pen_rows_arr = pen_cols_arr = np.empty(0, dtype=np.int64) pen_vals_arr = pen_b = np.empty(0, dtype=np.float64) if TEMPORAL_SMOOTHNESS_LAMBDA > 0 and n_cols >= 3: sqrt_lambda = float(np.sqrt(TEMPORAL_SMOOTHNESS_LAMBDA)) - cols_by_year = [c for _, c in sorted(year_to_col.items())] + years_sorted = sorted(year_to_col) + cols_by_year = [year_to_col[y] for y in years_sorted] n_pen = n_cols - 2 pen_rows = np.repeat(n + np.arange(n_pen), 3) pen_cols = np.empty(n_pen * 3, dtype=np.int64) + pen_vals = np.empty(n_pen * 3, dtype=np.float64) for k in range(n_pen): pen_cols[3 * k : 3 * k + 3] = ( cols_by_year[k], cols_by_year[k + 1], cols_by_year[k + 2], ) + y0, y1, y2 = years_sorted[k], years_sorted[k + 1], years_sorted[k + 2] + w0 = 2.0 / ((y1 - y0) * (y2 - y0)) + w1 = -2.0 / ((y1 - y0) * (y2 - y1)) + w2 = 2.0 / ((y2 - y1) * (y2 - y0)) + pen_vals[3 * k : 3 * k + 3] = ( + sqrt_lambda * w0, + sqrt_lambda * w1, + sqrt_lambda * w2, + ) pen_rows_arr = pen_rows.astype(np.int64) pen_cols_arr = pen_cols - pen_vals_arr = np.tile( - [sqrt_lambda, -2.0 * sqrt_lambda, sqrt_lambda], n_pen - ).astype(np.float64) + pen_vals_arr = pen_vals pen_b = np.zeros(n_pen, dtype=np.float64) n_total_rows = n + n_pen @@ -252,7 +265,11 @@ def compute_indices_for_level(pairs: pl.DataFrame, group_col: str): idx = solve_robust_index(y1, y2, lr, w) if idx: indices[key] = idx - n_pairs[key] = len(y1) + # Count only information-bearing pairs: same-year (year1==year2) and + # baseline-baseline pairs cancel in the sparse solve and contribute + # zero information to the annual index, so including them would + # inflate the shrinkage weight n/(n+k) and under-shrink noisy sectors. + n_pairs[key] = int(np.count_nonzero(y2 != y1)) return indices, n_pairs @@ -433,20 +450,17 @@ def build_index( f" {len(area_idx)} areas, {len(district_idx)} districts, {len(sector_idx)} sectors" ) - # Re-anchor every repeat-sales dict to the global base year before any - # shrinkage/smoothing/blending. solve_robust_index anchors each cell to - # log-index 0 at its OWN earliest year, so cells with shorter histories - # are measured from a later origin; combining them key-by-key would - # otherwise average level-incompatible numbers. The hedonic fallback is - # already anchored at min_year, so we align everything to min_year. - national_idx = reanchor_dict(national_idx, min_year) - area_idx = reanchor_dicts(area_idx, min_year) - district_idx = reanchor_dicts(district_idx, min_year) - sector_idx = reanchor_dicts(sector_idx, min_year) - - # Shrinkage: national -> hedonic first, then hierarchical + # Shrinkage: national -> hedonic first, then hierarchical. Each cell is + # anchored to log-index 0 at its OWN earliest year (solve_robust_index), + # so cells with shorter histories sit on a later origin than their wider + # parents. Before each blend we lift the child onto its parent's base at + # the child's first year (lift_onto_parent) -- otherwise combining them + # key-by-key averages level-incompatible numbers. The hedonic fallback is + # anchored at the global min_year, so it serves as the base for national. print(" Applying shrinkage...") - national_shrunk = shrink_dicts(national_idx, hedonic_idx, national_n) + national_shrunk = shrink_dicts( + lift_onto_parent(national_idx, hedonic_idx), hedonic_idx, national_n + ) sector_shrunk = hierarchical_shrinkage( sector_idx, sector_n, @@ -459,6 +473,7 @@ def build_index( sector_to_dist, dist_to_area, shrink_dicts, + lift_onto_parent, ) # Spatial smoothing diff --git a/pipeline/transform/price_estimation/knn.py b/pipeline/transform/price_estimation/knn.py index 0c18157..50c3b18 100644 --- a/pipeline/transform/price_estimation/knn.py +++ b/pipeline/transform/price_estimation/knn.py @@ -142,6 +142,20 @@ def _sale_identity_matches( target_price: float, target_sale_date: int, ) -> np.ndarray: + """Mark pool comparables that are (almost certainly) the target's own sale. + + properties.parquet has no per-property id, so a sale is identified by the + proxy tuple (postcode, price within 0.5, sale_date) to keep a target's own + prior sale out of its comparable set (leakage prevention). + + Limitation: new-build / bulk blocks sell many DISTINCT properties in one + postcode on the same day at the same price, so all such siblings collide on + this proxy and are excluded together. This is intentional conservative + over-exclusion: it guarantees no leakage at the cost of occasionally + dropping legitimate same-(postcode, price, date) siblings. The effect is + bounded (~1.8% of the pool) and a precise fix would require a per-property + id that the data does not carry. + """ if not target_postcode or not np.isfinite(target_price) or target_sale_date < 0: return np.zeros(len(pool_postcodes), dtype=bool) return ( @@ -166,6 +180,16 @@ def knn_median_psm( PSM is at the reference date used when building the pool. NaN where not computable (missing coords, unknown type, too few neighbors). + + Coordinate limitation: lat/lon come from postcode.parquet (one centroid per + postcode), so every property within a postcode is co-located. For a dense + postcode the "k nearest" therefore degenerates into an arbitrary + same-postcode subset whose membership is decided by KDTree index order + rather than true proximity. No property-level coordinates exist to fix this, + so the kNN signal is treated as a weak, noisy prior: the downstream guarded + blend (guarded_blend_estimates) only blends kNN when it is close to the + index estimate and otherwise discards it, bounding the impact of this + degeneracy. The result is deterministic for a fixed pool order. """ n = len(lat) result = np.full(n, np.nan) diff --git a/pipeline/transform/price_estimation/shrinkage.py b/pipeline/transform/price_estimation/shrinkage.py index 51101fe..a74d5e3 100644 --- a/pipeline/transform/price_estimation/shrinkage.py +++ b/pipeline/transform/price_estimation/shrinkage.py @@ -36,26 +36,43 @@ def _base_value(index: dict[int, float], base_year: int) -> float: return index[prior[-1]] -def reanchor_dict(index: dict[int, float], base_year: int) -> dict[int, float]: - """Re-anchor an index dict so index[base_year] == 0 (pure constant shift). +def lift_onto_parent( + child: dict[int, float], parent: dict[int, float] +) -> dict[int, float]: + """Lift a child index onto its parent's base before blending the two. - Subtracting the same constant from every year preserves all within-dict - year-to-year differences, so estimate.py's (current - sale) semantics are - unchanged; it only fixes the cross-dict level mismatch before blending. + solve_robust_index anchors every cell to log-index 0 at its OWN earliest + year, so a cell with a shorter history sits on a later origin than its + (wider) parent. Combining them key-by-key would average level-incompatible + numbers (a sector measured from 2008 blended with a district measured from + 1996). We add the parent's accumulated level at the child's first year, so + ``child[start] == parent[start]``: the child's own year-to-year moves are + layered on top of the parent's growth up to that point -- the same + assumption shrinkage already makes for years the child lacks. + + Re-basing on each cell's OWN earliest year (rather than the global base, + which the child cannot observe) is what makes this effective: subtracting + the child's value at the global base is always 0 and changes nothing. + + The shift is a single constant added to every year of the child, so the + child's own year-to-year differences are preserved. PRECONDITION for the + downstream estimate to be unaffected within the child's range: the parent's + year coverage must be a superset of the child's. This holds throughout + build_index, where each parent aggregates a superset of its children's sale + pairs, so shrink_dicts blends every child year against a present parent year + and the constant shift cancels in a within-range (current - sale) difference; + only comparisons that span the child's start year (e.g. a sale predating the + cell's own data) change. If a caller violates the precondition (a child year + the parent lacks), shrink_dicts passes that year through unshrunk and the + cancellation no longer holds. """ - if not index: - return index - shift = _base_value(index, base_year) - if shift == 0.0: - return index - return {y: v - shift for y, v in index.items()} - - -def reanchor_dicts( - indices: dict[str, dict[int, float]], base_year: int -) -> dict[str, dict[int, float]]: - """Re-anchor every index dict in a mapping to the common `base_year`.""" - return {key: reanchor_dict(idx, base_year) for key, idx in indices.items()} + if not child or not parent: + return child + child_start = min(child) + offset = _base_value(parent, child_start) - child[child_start] + if offset == 0.0: + return child + return {y: v + offset for y, v in child.items()} def shrink_dicts(raw: dict, parent: dict, n: int) -> dict: @@ -84,30 +101,40 @@ def hierarchical_shrinkage( sector_to_dist: dict[str, str], dist_to_area: dict[str, str], shrink_fn: Callable[[V, V, int], V], + lift_fn: Callable[[V, V], V] | None = None, ) -> dict[str, V]: """Top-down hierarchical shrinkage: area->top, district->area, sector->district. `top_level` is the ultimate fallback value (e.g. national shrunk toward hedonic, or just national). `shrink_fn(raw, parent, n)` blends raw toward parent. + `lift_fn(raw, parent)`, if given, re-bases raw onto its parent before blending + (see lift_onto_parent); pass None for category-keyed dicts where re-basing is + meaningless. """ + + def combine(raw: V, parent: V, n: int) -> V: + if lift_fn is not None: + raw = lift_fn(raw, parent) + return shrink_fn(raw, parent, n) + # Area -> top level area_shrunk = {} for area, val in area_vals.items(): - area_shrunk[area] = shrink_fn(val, top_level, area_n[area]) + area_shrunk[area] = combine(val, top_level, area_n[area]) # District -> area district_shrunk = {} for dist, val in district_vals.items(): a = dist_to_area.get(dist, "") parent = area_shrunk.get(a, top_level) - district_shrunk[dist] = shrink_fn(val, parent, district_n[dist]) + district_shrunk[dist] = combine(val, parent, district_n[dist]) # Sector -> district sector_shrunk = {} for sec, val in sector_vals.items(): d = sector_to_dist.get(sec, "") parent = district_shrunk.get(d, top_level) - sector_shrunk[sec] = shrink_fn(val, parent, sector_n[sec]) + sector_shrunk[sec] = combine(val, parent, sector_n[sec]) # Fill sectors without their own values for sec in all_sectors: diff --git a/pipeline/transform/price_estimation/test_index.py b/pipeline/transform/price_estimation/test_index.py new file mode 100644 index 0000000..4cc9e8e --- /dev/null +++ b/pipeline/transform/price_estimation/test_index.py @@ -0,0 +1,135 @@ +import numpy as np +import polars as pl + +from pipeline.transform.price_estimation import index as index_mod +from pipeline.transform.price_estimation.index import ( + compute_indices_for_level, + solve_robust_index, +) + + +def _pairs_from_path(true_levels: dict[int, float]): + """Build adjacent-year repeat-sale pairs that exactly trace a known path. + + Each consecutive pair's log_ratio is the difference of the true log-levels, + so the solver should recover the levels exactly (relative to the min year). + """ + years = sorted(true_levels) + y1, y2, lr, w = [], [], [], [] + for a, b in zip(years[:-1], years[1:]): + y1.append(a) + y2.append(b) + lr.append(true_levels[b] - true_levels[a]) + w.append(1.0) + return ( + np.array(y1, dtype=np.int32), + np.array(y2, dtype=np.int32), + np.array(lr, dtype=np.float64), + np.array(w, dtype=np.float64), + ) + + +def test_solver_recovers_contiguous_path(): + """A contiguous price path is recovered as log-levels relative to min_year. + + Proves the IRLS solver is correct (and unchanged) for contiguous data: the + spacing-aware penalty reduces to the standard [1,-2,1] for unit spacing. + """ + years = range(2010, 2021) + true = {y: 0.04 * (y - 2010) for y in years} # smooth (zero curvature) ramp + # Replicate each adjacent pair so MIN_PAIRS is comfortably met. + y1, y2, lr, w = _pairs_from_path(true) + y1 = np.tile(y1, 3) + y2 = np.tile(y2, 3) + lr = np.tile(lr, 3) + w = np.tile(w, 3) + + idx = solve_robust_index(y1, y2, lr, w) + + assert idx[2010] == 0.0 # baseline anchor + for y in years: + assert abs(idx[y] - (true[y] - true[2010])) < 1e-3 + + +def test_gap_spanning_level_jump_is_not_smoothed_into_a_ramp(): + """FIX #5: a sharp true level jump across a multi-year gap is preserved. + + Coverage is 2000,2001,2002 then 2015,2016 with cross-gap pairs encoding a + sharp jump at the gap. The uniform [1,-2,1] curvature penalty treats + (beta_2002, beta_2015, beta_2016) as three adjacent years and over-penalizes + the genuine level jump, biasing beta_2015 down toward a smooth ramp. The + spacing-aware second difference relaxes the penalty across the gap. + """ + # True log-levels relative to min_year (2000 anchored at 0). + true = { + 2000: 0.0, + 2001: 0.05, + 2002: 0.10, + 2015: 1.10, # sharp +1.0 jump across the gap + 2016: 1.15, + } + + y1, y2, lr, w = [], [], [], [] + + def add(a, b, n=4): + for _ in range(n): + y1.append(a) + y2.append(b) + lr.append(true[b] - true[a]) + w.append(1.0) + + # In-segment adjacent pairs. + add(2000, 2001) + add(2001, 2002) + add(2015, 2016) + # Cross-gap pairs consistent with the sharp jump. + add(2002, 2015) + add(2002, 2016) + + y1 = np.array(y1, dtype=np.int32) + y2 = np.array(y2, dtype=np.int32) + lr = np.array(lr, dtype=np.float64) + w = np.array(w, dtype=np.float64) + + # Use a strong penalty to make the smoothing bias obvious. + original = index_mod.TEMPORAL_SMOOTHNESS_LAMBDA + index_mod.TEMPORAL_SMOOTHNESS_LAMBDA = 1.0 + try: + idx = solve_robust_index(y1, y2, lr, w) + finally: + index_mod.TEMPORAL_SMOOTHNESS_LAMBDA = original + + assert idx[2000] == 0.0 # baseline anchor + # beta_2015 must stay near its true post-gap level, not get dragged down by a + # spurious curvature penalty that treats the gap as a single-year step. + assert abs(idx[2015] - true[2015]) < 0.05 + + +def test_n_pairs_counts_only_cross_year_pairs(): + """FIX #12: same-year pairs carry zero index information and must not inflate + the shrinkage weight; n_pairs counts only cross-year (year2 != year1) pairs.""" + rows = [] + + def add_pairs(group, year1, year2, n): + for _ in range(n): + rows.append( + { + "grp": group, + "year1": year1, + "year2": year2, + "log_ratio": 0.03 * (year2 - year1), + "weight": 1.0, + } + ) + + # 8 genuine cross-year pairs spanning enough years for a valid solve, plus 3 + # zero-information same-year pairs that must not be counted. + add_pairs("g", 2010, 2011, 4) + add_pairs("g", 2011, 2012, 4) + add_pairs("g", 2012, 2012, 3) # same-year, zero info + + pairs = pl.DataFrame(rows) + indices, n_pairs = compute_indices_for_level(pairs, "grp") + + assert "g" in indices + assert n_pairs["g"] == 8 # not 11 diff --git a/pipeline/transform/price_estimation/test_knn.py b/pipeline/transform/price_estimation/test_knn.py index c5309aa..3b147be 100644 --- a/pipeline/transform/price_estimation/test_knn.py +++ b/pipeline/transform/price_estimation/test_knn.py @@ -71,9 +71,49 @@ def test_knn_excludes_same_sale_and_uses_stable_comparables(): ), ) + # The five 900k same-postcode siblings share the target's (postcode, price, + # date) identity proxy, so they are all excluded as comparables, leaving the + # 200k/80sqm = 2_500 PSM neighbours. Removing same-identity siblings is an + # INTENTIONAL conservative leakage-prevention tradeoff (no per-property id + # exists to distinguish a target's own resale from a distinct bulk-block + # sibling sold same-day at the same price), not ideal behaviour -- see the + # _sale_identity_matches docstring. assert psm[0] == 2_500.0 +def test_knn_median_psm_is_deterministic(): + """Reproducibility guard (BUG #6): within-postcode neighbours are co-located + (one centroid per postcode), so the kNN result for dense postcodes depends on + an arbitrary same-postcode subset. That is acceptable, but it MUST be stable: + two identical calls against the same trees/inputs return identical output, so + future refactors cannot silently introduce run-to-run nondeterminism.""" + sale_date = date(2026, 1, 1) + rows = [ + { + "Postcode": "AA1 1AA", + "Property type": "Detached", + "lat": 51.5000 + i * 0.00001, + "lon": -0.1000, + "Total floor area (sqm)": 80.0, + "Last known price": 200_000.0 + i * 1_000.0, + "Date of last transaction": sale_date, + } + for i in range(40) + ] + df = pl.DataFrame(rows) + trees = build_knn_pool(df.lazy(), _flat_index(), 2026.0) + + args = dict( + lat=np.array([51.5000, 51.5002]), + lon=np.array([-0.1000, -0.1000]), + type_groups=np.array(["Detached", "Detached"]), + ) + first = knn_median_psm(trees, **args) + second = knn_median_psm(trees, **args) + + assert np.array_equal(first, second) + + def test_guarded_blend_routes_unstable_knn_to_index_and_caps_uplift(): blended = guarded_blend_estimates( index_est=np.array([120_000.0, 1_000_000.0]), diff --git a/pipeline/transform/price_estimation/test_shrinkage.py b/pipeline/transform/price_estimation/test_shrinkage.py index 06ab01f..5c8bed5 100644 --- a/pipeline/transform/price_estimation/test_shrinkage.py +++ b/pipeline/transform/price_estimation/test_shrinkage.py @@ -1,99 +1,117 @@ -"""Regression tests for common-base-year re-anchoring before blending. +"""Regression tests for parent-base lifting before hierarchical blending. -Each repeat-sales index dict is anchored to log-index 0 at its OWN earliest -year. shrink_dicts / blend_dicts combine dicts key-by-key, so dicts anchored to -different base years must be re-anchored to a single common base first, or the +solve_robust_index anchors every repeat-sales cell to log-index 0 at its OWN +earliest year, so a cell with a shorter history sits on a later origin than its +(wider) parent. shrink_dicts / blend_dicts combine dicts key-by-key, so a child +must first be lifted onto its parent's base at the child's first year, or the blend averages level-incompatible numbers (fix5-index-base-year). + +Note: re-anchoring each cell to the *global* base year is a no-op on real data +(a cell anchored to 0 at its own earliest year already reads 0 there, and the +global base is never later), which is why the fix lifts onto the *parent* at the +child's own start year instead. """ from pipeline.transform.price_estimation.shrinkage import ( - blend_dicts, - reanchor_dict, - reanchor_dicts, + hierarchical_shrinkage, + lift_onto_parent, shrink_dicts, ) +from pipeline.transform.price_estimation.utils import SHRINKAGE_K -def test_reanchor_is_pure_constant_shift_preserving_differences(): - """Re-anchoring only shifts the origin; year-to-year moves are unchanged.""" - # Anchored at its own earliest year 2008. - idx = {2008: 0.0, 2009: 0.10, 2010: 0.25, 2011: 0.40} +def test_lift_rebases_late_starting_child_onto_parent(): + """A child anchored at its own later start year is lifted to the parent's level there.""" + parent = {1996: 0.0, 2008: 0.80, 2016: 1.20, 2024: 1.50} + # Sector with its own repeat-sales data only from 2016, anchored at 2016 = 0. + sector = {2016: 0.0, 2024: 0.20} - reanchored = reanchor_dict(idx, 1996) - # 1996 is before this dict's history -> back-fill earliest value (0.0), - # so the shift is 0 and the dict is unchanged. - assert reanchored[2008] == 0.0 + lifted = lift_onto_parent(sector, parent) - # Same shape, different exact-hit base year: anchoring at 2010 subtracts 0.25. - reanchored_2010 = reanchor_dict(idx, 2010) - assert reanchored_2010[2010] == 0.0 - # All within-dict differences are preserved under the constant shift. - years = sorted(idx) - for a, b in zip(years, years[1:]): - assert abs((reanchored_2010[b] - reanchored_2010[a]) - (idx[b] - idx[a])) < 1e-12 + # child[start] now equals the parent's accumulated level at that year. + assert abs(lifted[2016] - parent[2016]) < 1e-12 # 1.20 + assert abs(lifted[2024] - (parent[2016] + 0.20)) < 1e-12 # 1.40 + # Pure constant shift: the child's own year-to-year move is preserved. + assert abs((lifted[2024] - lifted[2016]) - (sector[2024] - sector[2016])) < 1e-12 -def test_blend_different_base_years_needs_reanchoring(): - """Blending two dicts on different bases is biased unless re-anchored first. +def test_lift_is_noop_when_child_starts_at_parent_base(): + """A child whose earliest year is the parent's base (value 0) is unchanged.""" + parent = {1996: 0.0, 2008: 0.80, 2016: 1.20} + child = {1996: 0.0, 2008: 0.75, 2016: 1.10} + assert lift_onto_parent(child, parent) == child - Both cells observe the common base year 1996 but were anchored to DIFFERENT - origins (sectorA at 1996, sectorB at 2008, as solve_robust_index would do for - cells whose pair history starts at different years). They describe the SAME - true trajectory measured from 1996, so a 50/50 blend should reproduce that - common level. Pre-fix, blend_dicts mixes sectorB's 2008-relative numbers with - sectorA's 1996-relative numbers, level-shifting the smoothed result. + +def test_lift_handles_empty_inputs(): + assert lift_onto_parent({}, {2000: 0.0}) == {} + assert lift_onto_parent({2000: 0.0}, {}) == {2000: 0.0} + + +def test_lift_fixes_estimate_spanning_child_start_but_not_within_range(): + """The lift corrects comparisons that span the cell's start year, and ONLY those. + + A property sold in 2008 (before the sector's own data begins in 2016) and + valued in 2024: pre-lift the shrunk index mixes a 2016-based sector level + with 1996-based parent levels and badly understates the move. Comparisons + wholly inside the sector's own range (2016->2024) are unchanged, because the + lift is a pure constant shift that cancels in a within-cell difference. """ - base_year = 1996 + parent = {1996: 0.0, 2008: 0.80, 2016: 1.20, 2024: 1.50} + sector = {2016: 0.0, 2024: 0.20} # own data starts 2016 + n = 30 + w = n / (n + SHRINKAGE_K) - # True log-levels relative to 1996 (identical trajectory for both cells). - truth = {1996: 0.0, 2008: 0.80, 2012: 1.00} + raw = shrink_dicts(sector, parent, n) # pre-fix: blend without lifting + fixed = shrink_dicts(lift_onto_parent(sector, parent), parent, n) - # sectorA: anchored at 1996 (its earliest year) -> equals truth. - sector_a = dict(truth) - # sectorB: same trajectory but anchored at 2008 (subtract truth[2008] from - # every year), exactly how solve_robust_index would express a cell whose - # earliest year happened to be picked as 2008. - shift_b = truth[2008] - sector_b = {y: v - shift_b for y, v in truth.items()} + # Within the sector's own range the lift changes nothing. + assert abs((fixed[2024] - fixed[2016]) - (raw[2024] - raw[2016])) < 1e-12 - # --- Pre-fix behaviour: blend the raw dicts directly. --- - raw_blend = blend_dicts(sector_a, [sector_b], 0.5, [0.5]) - # Every year is pulled by half of shift_b (0.4) away from the truth. - assert abs(raw_blend[2012] - truth[2012]) > 0.3 - assert abs(raw_blend[1996] - truth[1996]) > 0.3 + # 2008 is parent-only in both (sector absent), so both read parent[2008]. + assert abs(raw[2008] - parent[2008]) < 1e-12 + assert abs(fixed[2008] - parent[2008]) < 1e-12 - # --- Post-fix behaviour: re-anchor to the common base, THEN blend. --- - reanchored = reanchor_dicts({"A": sector_a, "B": sector_b}, base_year) - fixed_blend = blend_dicts(reanchored["A"], [reanchored["B"]], 0.5, [0.5]) - # Both cells now read 0 at 1996 and the true level at every shared year. - for y in truth: - assert abs(fixed_blend[y] - truth[y]) < 1e-9 + raw_move = raw[2024] - raw[2008] + fixed_move = fixed[2024] - fixed[2008] + # Hand-computed: raw[2024] = w*0.20 + (1-w)*1.50; fixed[2024] = w*1.40 + (1-w)*1.50. + assert abs(raw_move - ((w * 0.20 + (1 - w) * 1.50) - 0.80)) < 1e-12 + assert abs(fixed_move - ((w * 1.40 + (1 - w) * 1.50) - 0.80)) < 1e-12 + # The fix raises the spanning move by exactly the parent growth to the + # sector's start year that the raw blend dropped (weighted by w). + assert abs((fixed_move - raw_move) - w * parent[2016]) < 1e-12 + # Fixed move is close to the true area-level move (0.70); raw badly understates it. + assert abs(fixed_move - 0.70) < 0.2 + assert raw_move < 0.4 * fixed_move -def test_shrink_dicts_after_reanchoring_is_consistent(): - """Shrinking a cell toward its parent must use a common origin.""" - base_year = 2000 - # Parent (national) anchored at 2000. - parent = {2000: 0.0, 2010: 0.50, 2020: 1.20} - # Sector tracking the parent exactly but anchored at 2010 (subtract 0.50 from - # every year), as solve_robust_index would express a cell whose earliest year - # is later. It still observes the 2000 base year (value -0.50). - sector = {2000: -0.50, 2010: 0.0, 2020: 0.70} - n = 0 # no own data weight -> result should equal parent after anchoring +def test_hierarchical_shrinkage_lift_fn_only_changes_spanning_comparisons(): + """Integration: passing lift_fn re-bases a late-starting sector via its parent chain.""" + top = {1996: 0.0, 2008: 0.80, 2016: 1.20, 2024: 1.50} + sector = {"AB1 1": {2016: 0.0, 2024: 0.20}} + sector_n = {"AB1 1": 300} + # No own area/district indices -> the sector shrinks straight toward `top`. + base_args = ( + sector, + sector_n, + {}, + {}, + {}, + {}, + top, + ["AB1 1"], + {"AB1 1": "AB1"}, + {"AB1": "AB"}, + shrink_dicts, + ) - reanchored_sector = reanchor_dict(sector, base_year) - # Exact hit on 2000 subtracts -0.50, putting the sector back on the parent's - # origin: 0.0 at 2000, 0.50 at 2010, 1.20 at 2020. - shrunk = shrink_dicts(reanchored_sector, parent, n) - assert abs(shrunk[2000] - 0.0) < 1e-9 - assert abs(shrunk[2010] - 0.50) < 1e-9 - assert abs(shrunk[2020] - 1.20) < 1e-9 + without_lift = hierarchical_shrinkage(*base_args)["AB1 1"] + with_lift = hierarchical_shrinkage(*base_args, lift_onto_parent)["AB1 1"] - -def test_reanchor_exact_hit_shifts_all_years(): - """When the base year is present, subtract its value from every year.""" - idx = {1996: 0.0, 2005: 0.30, 2015: 0.90} - reanchored = reanchor_dict(idx, 2005) - assert reanchored[2005] == 0.0 - assert abs(reanchored[1996] - (-0.30)) < 1e-12 - assert abs(reanchored[2015] - 0.60) < 1e-12 + # Within the sector's own range: identical (pure constant shift cancels). + assert abs( + (with_lift[2024] - with_lift[2016]) - (without_lift[2024] - without_lift[2016]) + ) < 1e-12 + # Spanning the sector's start year: the lift raises the 2008->2024 move. + assert (with_lift[2024] - with_lift[2008]) > ( + without_lift[2024] - without_lift[2008] + ) + 0.1 diff --git a/pipeline/transform/test_crime_spatial.py b/pipeline/transform/test_crime_spatial.py index fe9af06..d368e77 100644 --- a/pipeline/transform/test_crime_spatial.py +++ b/pipeline/transform/test_crime_spatial.py @@ -252,6 +252,47 @@ def test_avg_yr_is_simple_mean_of_year_bars(tmp_path): assert bars == {2023: pytest.approx(24.0, abs=0.05), 2024: pytest.approx(12.0, abs=0.05)} +def test_serious_rollup_avg_yr_equals_mean_of_rollup_bars(tmp_path): + # Two SERIOUS types occur in DISJOINT years for one postcode: Burglary only in + # 2014, Robbery only in 2024 (each a single full month -> 12/yr). The headline + # "Serious crime (avg/yr)" must equal the mean of the "Serious crime (by year)" + # bars (which span the UNION of years any serious type occurred), NOT the sum + # of the per-type means. Summing per-type means divides each type by its OWN + # years-present (1 each) -> 12 + 12 = 24; the consistent rollup divides the + # per-year serious total by the years any serious type occurred (2) -> 12. + units = tmp_path / "units" + _write_boundaries( + units, {"AB1": [_square_feature("AB1 1AA", 1000, 1000, 1010, 1010)]} + ) + + crime = tmp_path / "crime" + _write_month(crime, "2014-01", [_crime_row("2014-01", 1005, 1005, "Burglary")]) + _write_month(crime, "2024-01", [_crime_row("2024-01", 1005, 1005, "Robbery")]) + + output = tmp_path / "crime_by_postcode.parquet" + by_year = tmp_path / "crime_by_postcode_by_year.parquet" + transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) + + avg = pl.read_parquet(output).row(0, named=True) + # The precomputed rollup headline exists and equals the mean of the bars (12), + # not the sum of the per-type avg/yr values (Burglary 12 + Robbery 12 = 24). + assert "Serious crime (avg/yr)" in avg + assert avg["Burglary (avg/yr)"] == pytest.approx(12.0, abs=0.05) + assert avg["Robbery (avg/yr)"] == pytest.approx(12.0, abs=0.05) + assert avg["Serious crime (avg/yr)"] == pytest.approx(12.0, abs=0.05) + + serious_bars = { + p["year"]: p["count"] + for p in pl.read_parquet(by_year).row(0, named=True)["Serious crime (by year)"] + } + assert serious_bars == { + 2014: pytest.approx(12.0, abs=0.05), + 2024: pytest.approx(12.0, abs=0.05), + } + mean_of_bars = sum(serious_bars.values()) / len(serious_bars) + assert avg["Serious crime (avg/yr)"] == pytest.approx(mean_of_bars, abs=0.05) + + def test_avg_yr_denominator_is_per_postcode_not_global(tmp_path): # P (AB1 1AA) has burglaries only in its single most-recent year (2024); Q # (AB1 1AB), far away, has a burglary in 2014. The type therefore spans TWO diff --git a/pipeline/transform/test_join_epc_pp.py b/pipeline/transform/test_join_epc_pp.py index e10fde1..bd78db0 100644 --- a/pipeline/transform/test_join_epc_pp.py +++ b/pipeline/transform/test_join_epc_pp.py @@ -58,7 +58,7 @@ def test_scan_epc_certificates_supports_legacy_uppercase_csv(tmp_path: Path): "potential_energy_rating": "B", "epc_property_type": "House", "built_form": "Mid-Terrace", - "inspection_date": "2024-01-02", + "inspection_date": date(2024, 1, 2), "total_floor_area": 84.5, "number_habitable_rooms": None, "floor_height": 2.4, @@ -179,6 +179,65 @@ def test_run_joins_domestic_zip_with_price_paid(tmp_path: Path): assert df.get_column("historical_prices").list.len().to_list() == [2] +def test_run_dedup_prefers_valid_dated_cert_over_garbled_date(tmp_path: Path): + # Two certificates for the same property. The cert with the garbled, + # unparseable inspection_date must NOT be chosen as "latest": a string sort + # nulls-first would have picked it, attaching a stale rating/floor area. The + # valid-dated cert wins, so its rating ("C") and floor area (85) survive. + zip_path = tmp_path / "domestic-csv.zip" + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: + csv_buffer = io.StringIO() + writer = csv.DictWriter(csv_buffer, fieldnames=EPC_SOURCE_COLUMNS) + writer.writeheader() + writer.writerows( + [ + _row( + current_energy_rating="c", + inspection_date="2024-01-01", + total_floor_area="85", + ), + # Same property; an unparseable date (OCR/garbled). Under a raw + # string descending sort "not-a-date" outranks the ISO date and + # wins the dedup, but as a null Date it loses. + _row( + current_energy_rating="g", + inspection_date="not-a-date", + total_floor_area="40", + ), + ] + ) + archive.writestr("certificates-2024.csv", csv_buffer.getvalue()) + + price_paid_path = tmp_path / "price-paid.parquet" + pl.DataFrame( + { + "price": [250_000], + "date_of_transfer": [date(2024, 2, 3)], + "property_type": ["T"], + "postcode": ["AA1 1AA"], + "paon": ["1"], + "saon": [None], + "street": ["Example Street"], + "locality": [None], + "town_city": ["Exampletown"], + "duration": ["F"], + "old_new": ["N"], + "ppd_category": ["A"], + } + ).write_parquet(price_paid_path) + + output_path = tmp_path / "epc-pp.parquet" + _run(zip_path, price_paid_path, output_path, tmp_path) + + df = pl.read_parquet(output_path) + + assert df.height == 1 + # The valid-dated cert's facts are kept; the garbled-date cert is NOT chosen. + assert df.select("current_energy_rating", "total_floor_area").to_dicts() == [ + {"current_energy_rating": "C", "total_floor_area": 85.0} + ] + + def test_run_excludes_price_paid_rows_without_full_postcode(tmp_path: Path): zip_path = tmp_path / "domestic-csv.zip" with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: diff --git a/pipeline/transform/test_merge.py b/pipeline/transform/test_merge.py index 482cc1c..3d4589e 100644 --- a/pipeline/transform/test_merge.py +++ b/pipeline/transform/test_merge.py @@ -14,6 +14,7 @@ from pipeline.transform.merge import ( _build_unmatched_listing_seed_rows, _canonical_postcode_expr, _coalesce_direct_epc_columns, + _dedupe_collapsed_properties, _filter_to_active_english_postcodes, _join_area_side_tables, _finalize_listings, @@ -193,6 +194,159 @@ def test_postcode_feature_validation_rejects_unsupported_or_ungeocoded_rows() -> _validate_postcode_feature_output(postcode_df, expected_postcode_count=2) +def test_postcode_feature_validation_rejects_wrong_count() -> None: + # The universe-size invariant: the postcode feature output must contain + # EXACTLY the active-England universe. Too few rows (silently dropped + # postcodes) and too many / duplicated rows (a join fan-out) must both fail, + # so neither a truncated build nor a one-to-many join can ship. + too_few = pl.DataFrame( + { + "Postcode": ["AA1 1AA"], + "lat": [51.0], + "lon": [-0.1], + "ctry25cd": ["E92000001"], + } + ) + with pytest.raises(ValueError, match="active England postcode universe"): + _validate_postcode_feature_output(too_few, expected_postcode_count=2) + + too_many = pl.DataFrame( + { + "Postcode": ["AA1 1AA", "BB1 1BB", "CC1 1CC"], + "lat": [51.0, 52.0, 53.0], + "lon": [-0.1, -0.2, -0.3], + "ctry25cd": ["E92000001"] * 3, + } + ) + with pytest.raises(ValueError, match="active England postcode universe"): + _validate_postcode_feature_output(too_many, expected_postcode_count=2) + + # Right row count but a duplicated key (n_unique < height) -- the signature of + # a join fan-out. + duplicated = pl.DataFrame( + { + "Postcode": ["AA1 1AA", "AA1 1AA"], + "lat": [51.0, 51.0], + "lon": [-0.1, -0.1], + "ctry25cd": ["E92000001", "E92000001"], + } + ) + with pytest.raises(ValueError, match="active England postcode universe"): + _validate_postcode_feature_output(duplicated, expected_postcode_count=2) + + +def test_join_area_side_tables_does_not_fan_out_on_unique_keys() -> None: + # Soundness: with side tables unique on their join key, the per-postcode + # feature joins emit exactly one row per postcode (no fan-out). A fan-out here + # would inflate the postcode universe above the active-England count -- the + # failure the universe assertion above is the backstop for. + base = pl.LazyFrame( + { + "postcode": ["AA1 1AA", "BB2 2BB"], + "lsoa21": ["E01000001", "E01000002"], + "Local Authority District code (2024)": ["E09000001", "E09000002"], + "pcon": ["E14000001", "E14000002"], + } + ) + + def _by_postcode(extra: dict) -> pl.LazyFrame: + return pl.LazyFrame({"postcode": ["AA1 1AA", "BB2 2BB"], **extra}) + + crime = pl.LazyFrame( + { + "postcode": ["AA1 1AA", "BB2 2BB"], + "Serious crime (avg/yr)": [1.0, 2.0], + "Minor crime (avg/yr)": [3.0, 4.0], + } + ) + joined = _join_area_side_tables( + base, + iod=pl.LazyFrame({"LSOA code (2021)": ["E01000001", "E01000002"]}), + ethnicity=pl.LazyFrame({"Geography_code": ["E09000001", "E09000002"]}), + crime=crime, + median_age=pl.LazyFrame({"lsoa21": ["E01000001", "E01000002"]}), + election=pl.LazyFrame({"pcon": ["E14000001", "E14000002"]}), + poi_counts=_by_postcode({}), + noise=_by_postcode({}), + school_proximity=_by_postcode({}), + conservation_areas=_by_postcode({CONSERVATION_AREA_FEATURE: ["Yes", "No"]}), + tree_density=None, + broadband=pl.LazyFrame( + { + "bb_postcode": ["AA1 1AA", "BB2 2BB"], + "max_download_speed": pl.Series([100, 300], dtype=pl.UInt16), + } + ), + ).collect() + + # One row per postcode in -> one row out; the universe is not inflated. + assert joined.height == 2 + assert sorted(joined["postcode"].to_list()) == ["AA1 1AA", "BB2 2BB"] + + +def test_join_area_side_tables_normalizes_broadband_postcode_key() -> None: + # Broadband comes straight from Ofcom's CSV, so its postcode can drift in + # spacing/casing from the NSPL `pcds` base key. Both sides must be reduced + # to the same canonical form so a real postcode populates + # `max_download_speed` instead of silently missing the left join. + base = pl.LazyFrame( + { + "postcode": ["AB1 2CD", "EF3 4GH"], + "lsoa21": ["E01000001", "E01000002"], + "Local Authority District code (2024)": ["E09000001", "E09000002"], + "pcon": ["E14000001", "E14000002"], + } + ) + + def _by_postcode(extra: dict) -> pl.LazyFrame: + return pl.LazyFrame({"postcode": ["AB1 2CD", "EF3 4GH"], **extra}) + + crime = pl.LazyFrame( + { + "postcode": ["AB1 2CD", "EF3 4GH"], + "Serious crime (avg/yr)": [1.0, 2.0], + "Minor crime (avg/yr)": [3.0, 4.0], + } + ) + # AB1 2CD arrives lowercase + un-spaced; EF3 4GH arrives under two distinct + # raw spellings that canonicalize to one key (the max speed must win, with + # no fan-out of the base row). + broadband = pl.LazyFrame( + { + "bb_postcode": ["ab1 2cd", "ef34gh", "EF3 4GH"], + "max_download_speed": pl.Series([300, 30, 1000], dtype=pl.UInt16), + } + ) + joined = _join_area_side_tables( + base, + iod=pl.LazyFrame({"LSOA code (2021)": ["E01000001", "E01000002"]}), + ethnicity=pl.LazyFrame({"Geography_code": ["E09000001", "E09000002"]}), + crime=crime, + median_age=pl.LazyFrame({"lsoa21": ["E01000001", "E01000002"]}), + election=pl.LazyFrame({"pcon": ["E14000001", "E14000002"]}), + poi_counts=_by_postcode({}), + noise=_by_postcode({}), + school_proximity=_by_postcode({}), + conservation_areas=_by_postcode({CONSERVATION_AREA_FEATURE: ["Yes", "No"]}), + tree_density=None, + broadband=broadband, + ).collect() + + # No fan-out: still one row per base postcode. + assert joined.height == 2 + speeds = dict( + zip(joined["postcode"].to_list(), joined["max_download_speed"].to_list()) + ) + # Spacing/casing drift still joins. + assert speeds["AB1 2CD"] == 300 + # Two raw spellings collapse to one canonical key; the max wins. + assert speeds["EF3 4GH"] == 1000 + # The temporary canonical join key is not leaked into the output schema. + assert "_base_canonical_postcode" not in joined.columns + assert "_bb_canonical_postcode" not in joined.columns + assert "bb_postcode" not in joined.columns + + def test_listed_building_feature_is_property_level() -> None: assert LISTED_BUILDING_FEATURE not in _AREA_COLUMNS @@ -758,8 +912,10 @@ def test_coalesce_direct_epc_was_council_house_prefers_yes() -> None: def test_join_area_side_tables_preserves_missing_crime_as_null() -> None: # The crime table is LEFT-joined per postcode; a postcode absent from it - # must NOT be fabricated as "zero crime" (the safest value). When every - # per-type column is null the Serious/Minor rollups must stay null. + # must NOT be fabricated as "zero crime" (the safest value). The Serious/Minor + # rollups are precomputed in crime_spatial (the mean of the by-year rollup + # bars), so the merge reads them straight through; a missing postcode leaves + # them null. base = pl.LazyFrame( { "postcode": ["AA1 1AA", "BB2 2BB"], @@ -772,7 +928,10 @@ def test_join_area_side_tables_preserves_missing_crime_as_null() -> None: def _by_postcode(extra: dict) -> pl.LazyFrame: return pl.LazyFrame({"postcode": ["AA1 1AA", "BB2 2BB"], **extra}) - # Crime is present only for AA1 1AA; BB2 2BB is absent from the table. + # Crime is present only for AA1 1AA; BB2 2BB is absent from the table. The + # rollup headlines are precomputed values (deliberately NOT the per-type sum, + # which would be 10.0 each) so this test proves the merge consumes the + # precomputed column rather than re-summing per-type columns. crime = pl.LazyFrame( { "postcode": ["AA1 1AA"], @@ -790,6 +949,8 @@ def test_join_area_side_tables_preserves_missing_crime_as_null() -> None: "Public order (avg/yr)": [1.0], "Drugs (avg/yr)": [1.0], "Other crime (avg/yr)": [1.0], + "Serious crime (avg/yr)": [7.5], + "Minor crime (avg/yr)": [4.2], } ) @@ -805,7 +966,12 @@ def test_join_area_side_tables_preserves_missing_crime_as_null() -> None: school_proximity=_by_postcode({}), conservation_areas=_by_postcode({CONSERVATION_AREA_FEATURE: ["Yes", "No"]}), tree_density=None, - broadband=pl.LazyFrame({"bb_postcode": ["AA1 1AA", "BB2 2BB"]}), + broadband=pl.LazyFrame( + { + "bb_postcode": ["AA1 1AA", "BB2 2BB"], + "max_download_speed": pl.Series([100, 300], dtype=pl.UInt16), + } + ), ).collect() by_postcode = { @@ -814,14 +980,50 @@ def test_join_area_side_tables_preserves_missing_crime_as_null() -> None: "postcode", "serious_crime_avg_yr", "minor_crime_avg_yr" ).iter_rows(named=True) } - # Present postcode: rollups are the component sums (1+2+3+4, 10×1). - assert by_postcode["AA1 1AA"]["serious_crime_avg_yr"] == 10.0 - assert by_postcode["AA1 1AA"]["minor_crime_avg_yr"] == 10.0 + # Present postcode: rollups are the precomputed headline values, read through + # unchanged (NOT the per-type sum of 10.0). + assert by_postcode["AA1 1AA"]["serious_crime_avg_yr"] == 7.5 + assert by_postcode["AA1 1AA"]["minor_crime_avg_yr"] == 4.2 # Missing postcode: rollups stay null rather than fabricating 0.0. assert by_postcode["BB2 2BB"]["serious_crime_avg_yr"] is None assert by_postcode["BB2 2BB"]["minor_crime_avg_yr"] is None +def test_dedupe_collapsed_properties_keeps_most_recent_per_address() -> None: + # The terminated-postcode remap can merge two distinct postcodes onto one + # active successor, collapsing the same physical address onto a single + # (postcode, pp_address) key with conflicting sale records. The dedup must + # keep exactly one row per (postcode, pp_address) -- the most recent + # transaction -- and must not collapse genuinely distinct addresses. + from datetime import datetime + + wide = pl.LazyFrame( + { + "postcode": ["SW3 3JY", "SW3 3JY", "SW3 3JY"], + "pp_address": ["45 ELYSTAN PLACE", "45 ELYSTAN PLACE", "9 OTHER ROAD"], + "date_of_transfer": [ + datetime(1990, 1, 1), + datetime(2015, 6, 1), + datetime(2000, 1, 1), + ], + "latest_price": [1_587_700, 4_500_000, 250_000], + } + ) + + out = _dedupe_collapsed_properties(wide).collect() + + # One row per (postcode, pp_address): the two ELYSTAN PLACE rows collapse to one. + assert out.height == 2 + assert out.select(["postcode", "pp_address"]).is_unique().all() + by_addr = {r["pp_address"]: r for r in out.iter_rows(named=True)} + # The kept ELYSTAN PLACE row is the most recent transaction (2015 @ 4.5M), + # not an arbitrary one. + assert by_addr["45 ELYSTAN PLACE"]["date_of_transfer"] == datetime(2015, 6, 1) + assert by_addr["45 ELYSTAN PLACE"]["latest_price"] == 4_500_000 + # A genuinely distinct address in the same postcode is untouched. + assert by_addr["9 OTHER ROAD"]["latest_price"] == 250_000 + + def _property_candidates(rows: list[dict]) -> pl.DataFrame: base = { "postcode": "AA1 1AA", diff --git a/pipeline/transform/test_noise_overlay_tiles.py b/pipeline/transform/test_noise_overlay_tiles.py new file mode 100644 index 0000000..149a08a --- /dev/null +++ b/pipeline/transform/test_noise_overlay_tiles.py @@ -0,0 +1,110 @@ +import numpy as np +import rasterio +from rasterio.transform import from_origin +from rasterio.warp import transform_bounds + +from pipeline.transform import noise_overlay_tiles +from pipeline.transform.noise_overlay_tiles import RasterInfo, _read_noise_tile + + +def _write_corridor_raster(path, nodata=-96.0): + """A small EPSG:27700 raster: a column of 70 dB cells adjacent to genuine + 0.0 (quiet) cells. Bilinear blending of the 0 cells would fabricate a halo + of intermediate dB values between 0 and 70.""" + # 8x8 grid: leftmost two columns are 70 dB, the rest are genuine quiet 0.0. + data = np.zeros((8, 8), dtype=np.float32) + data[:, 0:2] = 70.0 + # Place one true nodata cell to make sure it is also masked out. + data[0, 7] = nodata + + # 10m cells anchored somewhere inside England's BNG extent. + left = 300_000.0 + top = 300_080.0 + transform = from_origin(left, top, 10.0, 10.0) + with rasterio.open( + path, + "w", + driver="GTiff", + height=data.shape[0], + width=data.shape[1], + count=1, + dtype=data.dtype, + crs="EPSG:27700", + transform=transform, + nodata=nodata, + ) as dataset: + dataset.write(data, 1) + return path + + +def test_read_noise_tile_does_not_fabricate_halo(tmp_path): + raster_path = _write_corridor_raster(tmp_path / "corridor.tif") + + with rasterio.open(raster_path) as dataset: + bounds_27700 = dataset.bounds + bounds_mercator = transform_bounds( + dataset.crs, + noise_overlay_tiles.WEB_MERCATOR_CRS, + *bounds_27700, + densify_pts=21, + ) + + info = RasterInfo(path=raster_path, bounds_mercator=bounds_mercator) + + # Render at high resolution so any bilinear halo would surface as + # intermediate dB values along the corridor/quiet seam. + tile_size = 64 + tile = _read_noise_tile([info], bounds_mercator, tile_size) + + finite = tile[np.isfinite(tile)] + # Every finite cell must be the genuine corridor value (~70). There must be + # NO fabricated halo strictly between 0 and 70. + halo = finite[(finite > 0.0) & (finite < 70.0 - 1e-3)] + assert halo.size == 0, f"fabricated halo values present: {np.unique(halo)}" + # Sanity: the corridor itself must still be rendered. + assert finite.size > 0 + assert np.all(finite >= 70.0 - 1e-3) + + +def test_read_noise_tile_preserves_peak_under_downsample(tmp_path): + # 8x8 EPSG:27700 raster: a single loud 75 dB cell in a 50 dB field. + # Downsampling into a smaller tile with bilinear would dilute the peak + # (arithmetic dB averaging); Resampling.max must keep the worst-case dB. + data = np.full((8, 8), 50.0, dtype=np.float32) + data[4, 4] = 75.0 + transform = from_origin(300_000.0, 300_080.0, 10.0, 10.0) + raster_path = tmp_path / "peak.tif" + with rasterio.open( + raster_path, + "w", + driver="GTiff", + height=data.shape[0], + width=data.shape[1], + count=1, + dtype=data.dtype, + crs="EPSG:27700", + transform=transform, + nodata=-96.0, + ) as dataset: + dataset.write(data, 1) + + with rasterio.open(raster_path) as dataset: + bounds_mercator = transform_bounds( + dataset.crs, + noise_overlay_tiles.WEB_MERCATOR_CRS, + *dataset.bounds, + densify_pts=21, + ) + + info = RasterInfo(path=raster_path, bounds_mercator=bounds_mercator) + + # Render the 8x8 source into a 4x4 tile: this downsamples, so bilinear + # would average the 75 dB peak away. + tile = _read_noise_tile([info], bounds_mercator, 4) + finite = tile[np.isfinite(tile)] + + assert finite.size > 0 + # The loud peak must survive the downsample (max, not arithmetic mean). + assert finite.max() >= 75.0 - 1e-3, f"peak diluted to {finite.max()}" + # Max resampling must never invent a value louder than the source. + assert finite.max() <= 75.0 + 1e-3 diff --git a/pipeline/transform/test_transform_poi.py b/pipeline/transform/test_transform_poi.py index b6078a3..023d464 100644 --- a/pipeline/transform/test_transform_poi.py +++ b/pipeline/transform/test_transform_poi.py @@ -1,12 +1,115 @@ +import json + import polars as pl from pipeline.transform.transform_poi import ( _load_ofsted_ratings, _school_icon_category_expr, + transform, transform_grocery_retail_points, ) +def _write_boundary(tmp_path): + """A FeatureCollection whose single feature covers the London-area test + coords used by the transform() fixtures, so in_england_mask keeps them.""" + boundary_path = tmp_path / "england.geojson" + coords = [[-1.0, 51.0], [1.0, 51.0], [1.0, 52.0], [-1.0, 52.0], [-1.0, 51.0]] + boundary_path.write_text( + json.dumps( + { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "properties": {}, + "geometry": {"type": "Polygon", "coordinates": [coords]}, + } + ], + } + ) + ) + return boundary_path + + +def _write_transform_inputs(tmp_path, raw_pois: pl.DataFrame): + """Materialise the parquet inputs transform() requires around a given raw + OSM POIs frame. NaPTAN / grocery / GIAS / Ofsted are minimal but valid.""" + input_path = tmp_path / "pois.parquet" + raw_pois.write_parquet(input_path) + + naptan_path = tmp_path / "naptan.parquet" + pl.DataFrame( + { + "id": ["naptan-1"], + "name": ["Test Rail Station"], + "category": ["Rail station"], + "lat": [51.51], + "lng": [-0.13], + } + ).write_parquet(naptan_path) + + grocery_path = tmp_path / "grocery.parquet" + pl.DataFrame( + { + "id": list(range(1, 6)), + "retailer": ["Tesco"] * 5, + "fascia": ["Tesco"] * 5, + "store_name": [f"Tesco Test {i}" for i in range(1, 6)], + "long_wgs": [-0.14] * 5, + "lat_wgs": [51.52] * 5, + } + ).write_parquet(grocery_path) + + gias_path = tmp_path / "gias.parquet" + pl.DataFrame( + { + "urn": [1001], + "name": ["Test Primary School"], + "phase": ["Primary"], + "type": ["Community school"], + "type_group": ["Local authority maintained schools"], + "age_range": ["4–11"], + "gender": ["Mixed"], + "religious_character": [None], + "admissions_policy": ["Comprehensive"], + "nursery_provision": ["No"], + "sixth_form": ["No"], + "capacity": [200], + "pupils": [180], + "fsm_percent": [12.5], + "trust": [None], + "address": ["1 Test Street"], + "postcode": ["E1 1AA"], + "local_authority": ["Test LA"], + "website": [None], + "telephone": ["02012345678"], + "head_name": ["Jane Doe"], + "lat": [51.53], + "lng": [-0.12], + } + ).write_parquet(gias_path) + + ofsted_path = tmp_path / "ofsted.parquet" + pl.DataFrame( + { + "URN": [1001], + "Latest OEIF overall effectiveness": ["2"], + "Ungraded inspection overall outcome": [None], + } + ).write_parquet(ofsted_path) + + boundary_path = _write_boundary(tmp_path) + return { + "input_path": input_path, + "naptan_path": naptan_path, + "boundary_path": boundary_path, + "grocery_retail_points_path": grocery_path, + "gias_path": gias_path, + "ofsted_path": ofsted_path, + } + + def test_transform_grocery_retail_points_outputs_chain_categories(): raw = pl.DataFrame( { @@ -292,3 +395,79 @@ def test_school_icon_category_handles_one_sided_age_ranges(): "Primary school", "School", ] + + +def test_transform_dedupes_multi_tag_pois(tmp_path): + # One OSM object can carry several tag keys that map to the SAME friendly + # category, so pois.py emits one raw row per key with the SAME id. + # "amenity/pharmacy" and "shop/chemist" both map to "Pharmacy". + raw = pl.DataFrame( + { + "id": ["n42", "n42"], + "name": ["Boots", "Boots"], + "category": ["amenity/pharmacy", "shop/chemist"], + "lat": [51.50, 51.50], + "lng": [-0.10, -0.10], + } + ) + inputs = _write_transform_inputs(tmp_path, raw) + + out = transform(**inputs).collect() + + # No (id, category) pair appears more than once. + assert out.group_by("id", "category").len()["len"].max() == 1 + # The single physical pharmacy is present exactly once. + pharmacies = out.filter( + (pl.col("id") == "n42") & (pl.col("category") == "Pharmacy") + ) + assert pharmacies.height == 1 + + +def test_osm_supermarkets_dropped(tmp_path): + # GEOLYTIX is authoritative for supermarkets; an OSM "shop/supermarket" row + # must not flow through as a second Groceries/Supermarket pin. A + # complementary grocery category (Convenience Store) must still survive. + raw = pl.DataFrame( + { + "id": ["n1", "n2"], + "name": ["Some Supermarket", "Corner Shop"], + "category": ["shop/supermarket", "shop/convenience"], + "lat": [51.50, 51.51], + "lng": [-0.10, -0.11], + } + ) + inputs = _write_transform_inputs(tmp_path, raw) + + out = transform(**inputs).collect() + + osm_supermarkets = out.filter( + (pl.col("group") == "Groceries") & (pl.col("category") == "Supermarket") + ) + assert osm_supermarkets.height == 0 + # Complementary OSM grocery category survives. + convenience = out.filter(pl.col("category") == "Convenience Store") + assert convenience.height == 1 + + +def test_transform_output_unique_per_id_category(tmp_path): + # Soundness: the full transform() output has at most one row per + # (id, category) overall, across every source. + raw = pl.DataFrame( + { + "id": ["n42", "n42", "n7", "n8"], + "name": ["Boots", "Boots", "St Mary's", "St Mary's"], + "category": [ + "amenity/pharmacy", + "shop/chemist", + "amenity/place_of_worship", + "building/church", + ], + "lat": [51.50, 51.50, 51.55, 51.55], + "lng": [-0.10, -0.10, -0.15, -0.15], + } + ) + inputs = _write_transform_inputs(tmp_path, raw) + + out = transform(**inputs).collect() + + assert out.group_by("id", "category").len()["len"].max() == 1 diff --git a/pipeline/transform/transform_poi.py b/pipeline/transform/transform_poi.py index 8109d8a..7bfa304 100644 --- a/pipeline/transform/transform_poi.py +++ b/pipeline/transform/transform_poi.py @@ -6,6 +6,10 @@ import polars as pl from pipeline.utils.england_geometry import in_england_mask DROP_CATEGORIES = { + # GEOLYTIX Grocery Retail Points is the authoritative supermarket source + # (transform_grocery_retail_points), so drop OSM supermarkets to avoid + # double-counting each store as both a GEOLYTIX brand and an OSM "Supermarket". + "shop/supermarket", # Street furniture & infrastructure "amenity/advice", "amenity/atm", @@ -364,14 +368,6 @@ _CATEGORIES: list[tuple[str, str, str, list[str]]] = [ "leisure/yes", ], ), - ( - "Groceries", - "Supermarket", - "🛒", - [ - "shop/supermarket", - ], - ), ( "Groceries", "Convenience Store", @@ -1534,6 +1530,14 @@ def transform( pl.col("category").replace_strict(emoji_mapping).alias("emoji"), ) + # A single OSM object can carry several tag keys that map to the same + # friendly category (e.g. amenity/pharmacy + shop/chemist -> "Pharmacy"), + # which pois.py emits as multiple raw rows sharing one id. Collapse those + # duplicates so they don't inflate downstream proximity counts; rows sharing + # an id with DIFFERENT categories are preserved. Other sources are + # pre-deduplicated. + lf = lf.unique(subset=["id", "category"], keep="first", maintain_order=True) + naptan_df = pl.scan_parquet(naptan_path).collect() mask = in_england_mask( boundary_path, diff --git a/pipeline/utils/__init__.py b/pipeline/utils/__init__.py index 74596c6..043291b 100644 --- a/pipeline/utils/__init__.py +++ b/pipeline/utils/__init__.py @@ -5,6 +5,7 @@ from .fuzzy_join import ( normalize_postcode_key, ) from .haversine import haversine_km, haversine_km_expr +from .nspl_schema import code_col_overrides from .poi_counts import count_pois_per_postcode from .postcode_mapping import build_postcode_mapping @@ -16,6 +17,7 @@ __all__ = [ "normalize_postcode_key", "haversine_km", "haversine_km_expr", + "code_col_overrides", "count_pois_per_postcode", "build_postcode_mapping", ] diff --git a/pipeline/utils/nspl_schema.py b/pipeline/utils/nspl_schema.py new file mode 100644 index 0000000..4e972ed --- /dev/null +++ b/pipeline/utils/nspl_schema.py @@ -0,0 +1,30 @@ +"""Suffix-agnostic schema overrides for ONS NSPL/NSUL classification columns. + +NSPL/NSUL embed the source year in classification-index column names +(e.g. ruc21ind, oac11ind, imd20ind) and ONS bumps those suffixes with each +release. These columns look numeric in early rows but contain string codes +like "UN1" (Unclassified) further down, so they must be forced to String +before scanning — otherwise polars infers Int64 and crashes mid-stream. + +Hard-coding the year suffix is fragile: polars silently ignores overrides for +columns that don't exist, so a renamed suffix would no-op and reintroduce the +crash. Match on the suffix-free stem instead. +""" + +import re + +import polars as pl + +# Matches ruc/oac/imd classification-index columns regardless of year suffix: +# ruc21ind, oac11ind, imd20ind, imd25ind, oacind, ... (case-insensitive). +CODE_COL_PATTERN = re.compile(r"^(ruc|oac|imd)\d*ind$", re.IGNORECASE) + + +def code_col_overrides(names: list[str]) -> dict[str, type[pl.String]]: + """Build a String schema-override dict for NSPL/NSUL code columns. + + Given the column names of an NSPL/NSUL CSV, return overrides forcing every + RUC/OAC/IMD classification-index column to String, independent of the year + suffix ONS happens to use this release. + """ + return {name: pl.String for name in names if CODE_COL_PATTERN.match(name)} diff --git a/pipeline/utils/test_nspl_schema.py b/pipeline/utils/test_nspl_schema.py new file mode 100644 index 0000000..40acbe8 --- /dev/null +++ b/pipeline/utils/test_nspl_schema.py @@ -0,0 +1,57 @@ +import polars as pl + +from pipeline.utils.nspl_schema import CODE_COL_PATTERN, code_col_overrides + + +def test_matches_current_and_renamed_suffixes(): + names = ["pcd", "ruc21ind", "oac11ind", "imd20ind", "lat", "long"] + overrides = code_col_overrides(names) + assert overrides == { + "ruc21ind": pl.String, + "oac11ind": pl.String, + "imd20ind": pl.String, + } + + +def test_catches_future_suffix_bump(): + # The regression: ONS bumps imd20ind -> imd25ind. A hard-coded dict would + # no-op here; the stem match must still catch it. + names = ["pcd", "ruc25ind", "oac21ind", "imd25ind"] + overrides = code_col_overrides(names) + assert set(overrides) == {"ruc25ind", "oac21ind", "imd25ind"} + assert all(dtype is pl.String for dtype in overrides.values()) + + +def test_is_case_insensitive(): + overrides = code_col_overrides(["RUC21IND", "Oac11Ind", "IMD20ind"]) + assert set(overrides) == {"RUC21IND", "Oac11Ind", "IMD20ind"} + + +def test_matches_suffixless_stem(): + assert set(code_col_overrides(["rucind", "oacind", "imdind"])) == { + "rucind", + "oacind", + "imdind", + } + + +def test_ignores_unrelated_columns(): + names = [ + "pcd", + "laua", + "imdscore", # not an *ind column + "indicator", # ends in nothing relevant, no ruc/oac/imd stem + "ruc21indx", # extra trailing char -> not a code column + "xruc21ind", # leading char -> not anchored at start + ] + assert code_col_overrides(names) == {} + + +def test_empty_names(): + assert code_col_overrides([]) == {} + + +def test_pattern_anchored(): + assert CODE_COL_PATTERN.match("imd25ind") + assert not CODE_COL_PATTERN.match("oac11indicator") + assert not CODE_COL_PATTERN.match("region_imd20ind") diff --git a/pipeline/validate_outputs.py b/pipeline/validate_outputs.py index 360b65b..8dc2d59 100644 --- a/pipeline/validate_outputs.py +++ b/pipeline/validate_outputs.py @@ -352,6 +352,50 @@ def _failures_for_active_postcode_boundary_match(spec: str) -> list[str]: return failures +def _failures_for_postcode_universe(spec: str) -> list[str]: + """Validate that a postcode-features parquet's postcode set is exactly the + active-English NSPL/ArcGIS universe. Guards against a truncated or stale + postcode.parquet (e.g. an interrupted merge that wrote only a fraction of the + ~1.49M rows, all otherwise valid) silently passing the build gate, since + `_failures_for_postcode_features` only checks per-row validity, not the count. + """ + arcgis_path, postcodes_path = _split_pair(spec, "postcode universe") + failures = _failures_for_parquet(arcgis_path) + _failures_for_parquet( + postcodes_path + ) + if failures: + return failures + + try: + active = _active_english_arcgis_postcodes(arcgis_path) + got = _parquet_postcodes(postcodes_path) + except Exception as exc: + return [ + f"{arcgis_path} / {postcodes_path}: postcode universe check failed: {exc}" + ] + + failures = [] + if len(got) != len(active): + failures.append( + f"{postcodes_path}: postcode count {len(got):,} != active-English NSPL " + f"universe {len(active):,} (from {arcgis_path})" + ) + + missing = active - got + extra = got - active + if missing: + failures.append( + f"{postcodes_path}: {len(missing):,} active English postcodes from " + f"{arcgis_path} are missing; sample: {_sample(missing)}" + ) + if extra: + failures.append( + f"{postcodes_path}: {len(extra):,} postcodes are not active English " + f"postcodes in {arcgis_path}; sample: {_sample(extra)}" + ) + return failures + + def _failures_for_postcode_features(path: Path) -> list[str]: """Validate the postcode feature output: unique Postcode, non-null lat/lon inside the England bbox, ctry25cd == E92000001, and every '% ' column in @@ -565,6 +609,15 @@ def main() -> int: "lat/lon in England, ctry25cd=E92000001, '% ' columns in [0,100]" ), ) + parser.add_argument( + "--postcode-universe", + action="append", + default=[], + help=( + "Require postcode parquet keys to equal the active-English NSPL " + "universe: ARCGIS::POSTCODES" + ), + ) parser.add_argument( "--properties-subset", action="append", @@ -599,6 +652,8 @@ def main() -> int: failures.extend(_failures_for_active_postcode_boundary_match(spec)) for path in args.postcode_features: failures.extend(_failures_for_postcode_features(path)) + for spec in args.postcode_universe: + failures.extend(_failures_for_postcode_universe(spec)) for spec in args.properties_subset: failures.extend(_failures_for_properties_subset(spec)) for path in args.price_index: