From c938b7190429a92b8bb52e287809cf9bf82f04ff Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Thu, 4 Jun 2026 22:34:26 +0100 Subject: [PATCH] try --- analyses/online_listings_buy.ipynb | 43 ++++-- pipeline/download/noise.py | 33 +++-- pipeline/download/test_noise.py | 46 ++++-- pipeline/transform/join_epc_pp.py | 10 +- pipeline/transform/merge.py | 78 ++++++++--- pipeline/transform/test_join_epc_pp.py | 47 ++++++- pipeline/transform/test_merge.py | 169 +++++++++++++++++++++- pipeline/transform/test_transform_poi.py | 113 +++++++++++++-- pipeline/transform/transform_poi.py | 171 +++++++++++++++++++---- server-rs/Cargo.lock | 1 + server-rs/Cargo.toml | 4 + server-rs/src/main.rs | 89 ++++++++++-- server-rs/src/routes/tiles.rs | 3 +- 13 files changed, 698 insertions(+), 109 deletions(-) diff --git a/analyses/online_listings_buy.ipynb b/analyses/online_listings_buy.ipynb index 44dba93..d2e020b 100644 --- a/analyses/online_listings_buy.ipynb +++ b/analyses/online_listings_buy.ipynb @@ -434,7 +434,8 @@ " color=\"white\" if v > 55 else \"black\")\n", "fig.colorbar(im, ax=ax, label=\"% missing\", fraction=0.025, pad=0.01)\n", "ax.set_title(\"Missing (null or empty-string) % by provider\")\n", - "plt.tight_layout(); plt.show()" + "plt.tight_layout()\n", + "plt.show()" ] }, { @@ -640,9 +641,11 @@ " ax.axvline(np.log10(v), color=\"red\", ls=\"--\", lw=1)\n", " ax.text(np.log10(v), ax.get_ylim()[1] * 0.92, lab, rotation=90, fontsize=7,\n", " color=\"red\", va=\"top\")\n", - "ax.set_xlabel(\"log10(Asking price)\"); ax.set_ylabel(\"count\")\n", + "ax.set_xlabel(\"log10(Asking price)\")\n", + "ax.set_ylabel(\"count\")\n", "ax.set_title(\"Asking price distribution (log scale)\")\n", - "plt.tight_layout(); plt.show()" + "plt.tight_layout()\n", + "plt.show()" ] }, { @@ -764,9 +767,12 @@ "ax.scatter(x[impossible], y[impossible], s=14, alpha=0.8, color=\"red\", label=\"impossible (area < beds×8)\")\n", "xs = np.linspace(0, x.max(), 60)\n", "ax.plot(xs, np.maximum(10, xs * 8), \"k--\", lw=1, label=\"area = beds × 8 m²\")\n", - "ax.set_xlabel(\"Bedrooms\"); ax.set_ylabel(\"Total floor area (m²)\")\n", - "ax.set_title(\"Floor area vs bedrooms (≤160 m²)\"); ax.legend(fontsize=8)\n", - "plt.tight_layout(); plt.show()" + "ax.set_xlabel(\"Bedrooms\")\n", + "ax.set_ylabel(\"Total floor area (m²)\")\n", + "ax.set_title(\"Floor area vs bedrooms (≤160 m²)\")\n", + "ax.legend(fontsize=8)\n", + "plt.tight_layout()\n", + "plt.show()" ] }, { @@ -947,9 +953,13 @@ "for prov, c in colors.items():\n", " gp = g.filter(pl.col(\"provider\") == prov)\n", " ax.scatter(gp[\"lon\"], gp[\"lat\"], s=2, alpha=0.30, color=c, label=prov)\n", - "ax.set_xlabel(\"lon\"); ax.set_ylabel(\"lat\"); ax.set_aspect(\"equal\", adjustable=\"datalim\")\n", - "ax.set_title(\"Listing coordinates by provider (25k sample)\"); ax.legend(markerscale=4, fontsize=8)\n", - "plt.tight_layout(); plt.show()" + "ax.set_xlabel(\"lon\")\n", + "ax.set_ylabel(\"lat\")\n", + "ax.set_aspect(\"equal\", adjustable=\"datalim\")\n", + "ax.set_title(\"Listing coordinates by provider (25k sample)\")\n", + "ax.legend(markerscale=4, fontsize=8)\n", + "plt.tight_layout()\n", + "plt.show()" ] }, { @@ -2040,16 +2050,21 @@ " clean.filter((pl.col(\"Bedrooms\") == 0) & (pl.col(\"Property sub-type\") != \"Studio\")).height,\n", " 0,\n", "]\n", - "x = np.arange(len(labels)); w = 0.38\n", + "x = np.arange(len(labels))\n", + "w = 0.38\n", "fig, ax = plt.subplots(figsize=(10, 3.6))\n", "b1 = ax.bar(x - w / 2, before, w, label=\"raw\", color=\"#dc2626\")\n", "b2 = ax.bar(x + w / 2, after, w, label=\"clean\", color=\"#16a34a\")\n", - "ax.set_xticks(x); ax.set_xticklabels(labels, fontsize=8)\n", - "ax.set_yscale(\"symlog\"); ax.set_ylabel(\"value (symlog)\")\n", - "ax.set_title(\"Before / after cleanup\"); ax.legend()\n", + "ax.set_xticks(x)\n", + "ax.set_xticklabels(labels, fontsize=8)\n", + "ax.set_yscale(\"symlog\")\n", + "ax.set_ylabel(\"value (symlog)\")\n", + "ax.set_title(\"Before / after cleanup\")\n", + "ax.legend()\n", "for bars in (b1, b2):\n", " ax.bar_label(bars, fmt=\"%.0f\", fontsize=7, padding=2)\n", - "plt.tight_layout(); plt.show()" + "plt.tight_layout()\n", + "plt.show()" ] }, { diff --git a/pipeline/download/noise.py b/pipeline/download/noise.py index 5811511..55e7c27 100644 --- a/pipeline/download/noise.py +++ b/pipeline/download/noise.py @@ -1,7 +1,7 @@ """Download Defra Round 4 (2022) strategic noise data for England. Downloads modelled noise levels (road, rail, airport) as GeoTIFF rasters via -WCS, then samples the local maximum around each postcode representative point. +WCS, then samples the 10m cell at each postcode representative point. Outputs a parquet file with postcode-level noise in dB for each source. Uses smaller 20km tiles at native 10m resolution so values are not understated @@ -98,15 +98,21 @@ NOISE_NODATA_SENTINEL = np.float32(-96.0) # NOISE_COLOR_STOPS[0]) — a rendering threshold, not the data's reporting floor. NOISE_QUIET_FLOOR_DB = np.float32(40.0) -# The pipeline has postcode representative points rather than complete unit -# polygons here. Use a small local footprint and take the maximum 10m cell so -# postcode-level noise is not understated by centroid rounding. -POSTCODE_NOISE_RADIUS_M = 50 +# Sample noise at the postcode representative point itself (no neighbourhood +# window). A 50m MAX-of-window grabbed the single loudest 10m cell within ~1.2 ha +# of every postcode; because Defra road contours hug every modelled road and +# representative points sit on/near streets, that inflated postcode noise by +# roughly +9 dB (log scale) — making ~94% of England read >=55 dB Lden and +# collapsing the metric's discrimination at the quiet end. Radius 0 -> +# filter_size 1 -> the maximum_filter is skipped and each postcode reads the +# 10m cell it actually sits in. +POSTCODE_NOISE_RADIUS_M = 0 -# Adjacent download tiles must overlap by at least the sampling radius so every -# postcode's 50m max-window is fully contained in at least one tile. Without -# this, a loud pixel just across a tile seam is invisible to a postcode on the -# far side, under-reporting noise near seams. +# Adjacent download tiles overlap by the sampling radius so every postcode's +# sampling footprint is fully contained in at least one tile. With point +# sampling (radius 0) this is 0 — a representative point falls inside exactly +# one tile — but the relationship is kept so any future non-zero radius keeps +# its window seam-safe. TILE_OVERLAP_M = POSTCODE_NOISE_RADIUS_M # Retry/split behaviour for slow Defra WCS requests. Some 100km eastern tiles @@ -413,8 +419,13 @@ def sample_noise_at_postcodes( label: str, col_name: str, ) -> pl.Series: - """Sample max noise values from 10m tiles around postcode representative points.""" - print(f"[{label}] Sampling max noise values from {len(tile_paths)} tiles...") + """Sample noise from 10m tiles at postcode representative points. + + With POSTCODE_NOISE_RADIUS_M == 0 (the default) each postcode reads the + single 10m cell it sits in; a larger radius reduces to a max over the + surrounding window. + """ + print(f"[{label}] Sampling noise values from {len(tile_paths)} tiles...") noise_db = np.full(len(easting), np.nan, dtype=np.float32) radius_cells = max(0, math.ceil(POSTCODE_NOISE_RADIUS_M / RESOLUTION)) filter_size = radius_cells * 2 + 1 diff --git a/pipeline/download/test_noise.py b/pipeline/download/test_noise.py index b7c159b..d075590 100644 --- a/pipeline/download/test_noise.py +++ b/pipeline/download/test_noise.py @@ -126,19 +126,23 @@ def test_download_raster_raises_on_missing_strict_tiles(monkeypatch, tmp_path): def test_generate_tiles_neighbours_overlap_by_radius(): + # Use an explicit non-zero overlap so the assertion verifies a real positive + # overlap. The production radius is 0 (point sampling), which would make this + # a vacuous ">= 0" check; this keeps the seam-safety guard meaningful for any + # future non-zero sampling radius. tile_size = 20_000 - overlap = noise.POSTCODE_NOISE_RADIUS_M - tiles = noise._generate_tiles( - 0, 60_000, 0, 60_000, tile_size, overlap, tile_size - ) + overlap = 50 + tiles = noise._generate_tiles(0, 60_000, 0, 60_000, tile_size, overlap, tile_size) by_origin = {(min_e, min_n): (max_e, max_n) for min_e, min_n, max_e, max_n in tiles} + saw_horizontal_overlap = False # Horizontally adjacent tiles must overlap by >= overlap. for (min_e, min_n), (max_e, _max_n) in by_origin.items(): right_origin = (min_e + tile_size, min_n) if right_origin in by_origin: assert max_e - right_origin[0] >= overlap + saw_horizontal_overlap = True # Vertically adjacent tiles must overlap by >= overlap. for (min_e, min_n), (_max_e, max_n) in by_origin.items(): @@ -146,6 +150,8 @@ def test_generate_tiles_neighbours_overlap_by_radius(): if up_origin in by_origin: assert max_n - up_origin[1] >= overlap + assert saw_horizontal_overlap # the fixture actually has adjacent tiles + def test_generate_tiles_clamps_to_grid_extent(): tile_size = 20_000 @@ -193,9 +199,7 @@ def test_sample_noise_recovers_value_across_overlapping_seam(monkeypatch, tmp_pa tile_size = 100 overlap = noise.POSTCODE_NOISE_RADIUS_M tiles = noise._generate_tiles(0, 200, 0, 100, tile_size, overlap, tile_size) - by_origin = { - (min_e, min_n): (max_e, max_n) for min_e, min_n, max_e, max_n in tiles - } + by_origin = {(min_e, min_n): (max_e, max_n) for min_e, min_n, max_e, max_n in tiles} left_min_e, left_min_n = 0, 0 left_max_e, left_max_n = by_origin[(left_min_e, left_min_n)] # Overlap fix is what makes the left tile reach across the seam. @@ -269,7 +273,9 @@ def test_sample_noise_distinguishes_nodata_from_in_coverage_quiet( assert result.to_list() == [None, float(noise.NOISE_QUIET_FLOOR_DB), 65.0] -def test_sample_noise_preserves_genuine_reading_above_quiet_floor(monkeypatch, tmp_path): +def test_sample_noise_preserves_genuine_reading_above_quiet_floor( + monkeypatch, tmp_path +): monkeypatch.setattr(noise, "POSTCODE_NOISE_RADIUS_M", 0) monkeypatch.setattr(noise, "RESOLUTION", 10) @@ -324,6 +330,30 @@ def test_sample_noise_nodata_window_stays_null(monkeypatch, tmp_path): assert result.to_list() == [None] +def test_sample_noise_default_radius_samples_at_point_not_window(monkeypatch, tmp_path): + # Regression: production samples noise at the postcode's own 10m cell + # (POSTCODE_NOISE_RADIUS_M == 0), NOT a max-of-window that would grab the + # loudest nearby road cell and inflate every postcode's noise by ~+9 dB. + monkeypatch.setattr(noise, "RESOLUTION", 10) + assert noise.POSTCODE_NOISE_RADIUS_M == 0 + + # Cell 0 = quiet (at the 40 dB floor), cell 1 = loud road (70), adjacent. + data = np.array([[40.0, 70.0]], dtype=np.float32) + _write_geotiff(tmp_path / "noise.tif", data, 0, 10, 10, nodata=-96.0) + + result = noise.sample_noise_at_postcodes( + [tmp_path / "noise.tif"], + # Cell centres: easting 5 -> quiet cell 0; the loud cell 1 is at 15. + easting=np.array([5.0]), + northing=np.array([5.0]), + label="Road", + col_name="road_noise_lden_db", + ) + + # Point sampling reads the quiet own-cell (40), not the loud neighbour (70). + assert result.to_list() == [40.0] + + def test_sample_noise_at_postcodes_uses_local_maximum(monkeypatch, tmp_path): monkeypatch.setattr(noise, "POSTCODE_NOISE_RADIUS_M", 15) monkeypatch.setattr(noise, "RESOLUTION", 10) diff --git a/pipeline/transform/join_epc_pp.py b/pipeline/transform/join_epc_pp.py index b20b8de..d75b92e 100644 --- a/pipeline/transform/join_epc_pp.py +++ b/pipeline/transform/join_epc_pp.py @@ -21,7 +21,15 @@ from ..utils import ( pl.Config.set_tbl_cols(-1) RATING_RANK = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7} -MIN_PRICE = 50_000 +# Value-quality floor for price aggregations. A flat nominal floor is a blunt +# tool against a deflating threshold — £50k was completely normal for a 1990s +# house, so a 50k floor wrongly discarded ~a third of legitimate 1990s +# open-market sales (and deleted properties whose only sales were old/cheap), +# biasing early-year price history upward. 10k recovers the large [10k,50k) +# band of genuine cheaper sales while still excluding the nominal/junk transfers +# (£1 etc.). A small tail of real sub-10k sales is still dropped — a deliberate +# conservative tradeoff to keep clearly-implausible transfers out. +MIN_PRICE = 10_000 # Plausible construction-year range; band-derived years outside it (e.g. OCR # noise like 1012 or 2202) are nulled rather than published. diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index d844572..0aa8dad 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -30,6 +30,7 @@ from pipeline.utils.postcode_mapping import build_postcode_mapping MIN_FLOOR_AREA_M2 = 10 CONSERVATION_AREA_FEATURE = "Within conservation area" +TREE_DENSITY_FEATURE = "Street tree density percentile" LISTED_BUILDING_FEATURE = "Listed building" LISTED_BUILDING_MATCH_RADIUS_M = 250.0 LISTED_BUILDING_NEAREST_POSTCODES = 3 @@ -92,6 +93,10 @@ _AREA_COLUMNS = [ "Noise (dB)", "Max available download speed (Mbps)", CONSERVATION_AREA_FEATURE, + # Tree canopy is a 50m-radius percentile around the postcode centroid, so it + # is postcode-grain: it belongs in the area output (one value per postcode, + # covering property-less postcodes too) rather than duplicated per property. + TREE_DENSITY_FEATURE, # Schools "Good+ primary schools within 5km", "Good+ secondary schools within 5km", @@ -116,7 +121,6 @@ _AREA_COLUMNS = [ _DYNAMIC_POI_DISTANCE_RE = re.compile(r"^Distance to nearest amenity \(.+\) \(km\)$") _DYNAMIC_POI_COUNT_RE = re.compile(r"^Number of amenities \(.+\) within (2|5)km$") -TREE_DENSITY_FEATURE = "Street tree density percentile" _POSTCODE_TREE_DENSITY_PERCENTILE_RE = re.compile( r"^Tree canopy density percentile within \d+m$" ) @@ -818,9 +822,9 @@ def _dedupe_collapsed_properties(wide: pl.LazyFrame) -> pl.LazyFrame: untouched. pp_address is non-null here (join_epc_pp filters it), so the key never merges unrelated rows. """ - return wide.sort( - "date_of_transfer", descending=True, nulls_last=True - ).unique(subset=["postcode", "pp_address"], keep="first", maintain_order=True) + return wide.sort("date_of_transfer", descending=True, nulls_last=True).unique( + subset=["postcode", "pp_address"], keep="first", maintain_order=True + ) def _filter_to_active_english_postcodes( @@ -1108,13 +1112,26 @@ def _construction_year_expr(column: str = "construction_age_band") -> pl.Expr: return epc_band_to_year(pl.col(column)) -def _address_score(query: str, candidate: str | None) -> int: +def _address_score(query: str, candidate: str | None, *, allow_token_set: bool) -> int: if not candidate: return 0 - return max( - fuzz.token_set_ratio(query, candidate), - fuzz.token_sort_ratio(query, candidate), - ) + # token_set_ratio returns 100 whenever the shorter token set is a subset of + # the longer. For a NUMBER-LESS query that is unsafe — a single locality + # token (e.g. "KINGSWOOD") subsets to 100 against any long address that + # merely contains it — so number-less queries score with token_sort_ratio + # only, matching the canonical fuzzy_join._score_bucket. For a NUMBERED + # query the unconditional _numbers_compatible gate has already guaranteed the + # candidate carries compatible house numbers, so token_set cannot inflate + # across different addresses; allowing it recovers genuine matches where the + # scraped listing appends trailing town/county tokens the bare register + # address omits (e.g. "105 RIDGEWAY DRIVE BROMLEY KENT" vs "105 RIDGEWAY + # DRIVE"). + if allow_token_set: + return max( + fuzz.token_set_ratio(query, candidate), + fuzz.token_sort_ratio(query, candidate), + ) + return fuzz.token_sort_ratio(query, candidate) def _has_number(address: str | None) -> bool: @@ -1153,9 +1170,12 @@ def _best_listing_match( ``uprn_index`` (postcode-independent, so it is robust even when the listing's postcode is slightly off); (2) failing that, the highest fuzzy street-address similarity within the listing's own postcode bucket. - No property-attribute heuristics are used — a house number in the listing - address gates the fuzzy match (`_numbers_compatible`) and lowers the score - threshold; a number-less address must match the street almost exactly. + No property-attribute heuristics are used — `_numbers_compatible` gates + every fuzzy match unconditionally (so a number-less listing can never match + a numbered property, and vice versa), as in the canonical + `fuzzy_join._score_bucket`. A house number additionally lowers the score + threshold and (via `_address_score`) permits token_set scoring; a number-less + address scores on token_sort only and must match the street almost exactly. ``addressed_fields`` names the candidate columns to fuzzy-match against (a candidate may carry both a register and an EPC address). Returns @@ -1180,9 +1200,11 @@ def _best_listing_match( address = candidate.get(field) if not address: continue - if listing_has_numbers and not _numbers_compatible(query, address): + # Unconditional number gate (matches fuzzy_join): a number-less + # listing cannot match a numbered candidate and vice versa. + if not _numbers_compatible(query, address): continue - score = _address_score(query, address) + score = _address_score(query, address, allow_token_set=listing_has_numbers) if score > best_score: best_score = score best = candidate @@ -1675,7 +1697,9 @@ def _coalesce_direct_epc_columns(wide: pl.LazyFrame) -> pl.LazyFrame: # "Yes". "Former council house" should fire if EITHER side says so. if raw_column == "was_council_house": return ( - pl.when((pl.col(raw_column) == "Yes") | (pl.col(direct_column) == "Yes")) + pl.when( + (pl.col(raw_column) == "Yes") | (pl.col(direct_column) == "Yes") + ) .then(pl.lit("Yes")) .otherwise(coalesce) .alias(raw_column) @@ -1716,9 +1740,13 @@ def _build_unmatched_listing_seed_rows( "total_floor_area": pl.coalesce( pl.col("_actual_total_floor_area"), pl.col("_direct_total_floor_area") ), + # Prefer the direct-EPC habitable-room count over the listing's value: + # the scraped room count is bedrooms + bathrooms (upstream storage.py + # defect), so it over-counts. Fall back to the listing value only when + # the direct-EPC match has no count. "number_habitable_rooms": pl.coalesce( - pl.col("_actual_number_habitable_rooms"), pl.col("_direct_number_habitable_rooms"), + pl.col("_actual_number_habitable_rooms"), ), "latest_price": pl.col("_actual_asking_price"), } @@ -1836,14 +1864,19 @@ def _finalize_listings(df: pl.DataFrame) -> pl.DataFrame: # Listing coordinates win over the postcode centroid. pl.coalesce(pl.col("_actual_lat").cast(pl.Float64), pl.col("lat")).alias("lat"), pl.coalesce(pl.col("_actual_lon").cast(pl.Float64), pl.col("lon")).alias("lon"), - # Listing's floor area / rooms override any EPC/PP value when present. + # Listing's floor area overrides any EPC/PP value when present. pl.coalesce( pl.col("_actual_total_floor_area").cast(pl.Float64), pl.col("Total floor area (sqm)"), ).alias("Total floor area (sqm)"), + # Rooms: prefer the EPC habitable-room count and fall back to the listing + # value only when no EPC count exists. The scraped "Number of bedrooms & + # living rooms" is actually bedrooms + bathrooms (an upstream storage.py + # defect), so preferring it would inflate the room count and overwrite a + # correct EPC value. pl.coalesce( - pl.col("_actual_number_habitable_rooms").cast(pl.Int16), pl.col("Number of bedrooms & living rooms"), + pl.col("_actual_number_habitable_rooms").cast(pl.Int16), ).alias("Number of bedrooms & living rooms"), pl.when(pl.col("_actual_property_type").is_in(_PROPERTY_TYPE_VALUES)) .then(pl.col("_actual_property_type")) @@ -2130,12 +2163,15 @@ def _build( pl.when( (pl.col("total_floor_area") > MIN_FLOOR_AREA_M2) & ( - (pl.col("latest_price") / pl.col("total_floor_area")) - .is_between(MIN_COMPARABLE_PSM, MAX_COMPARABLE_PSM) + (pl.col("latest_price") / pl.col("total_floor_area")).is_between( + MIN_COMPARABLE_PSM, MAX_COMPARABLE_PSM + ) ) ) .then( - (pl.col("latest_price") / pl.col("total_floor_area")).round(0).cast(pl.Int32) + (pl.col("latest_price") / pl.col("total_floor_area")) + .round(0) + .cast(pl.Int32) ) .otherwise(None) .alias("Price per sqm"), diff --git a/pipeline/transform/test_join_epc_pp.py b/pipeline/transform/test_join_epc_pp.py index bd78db0..baa6cf2 100644 --- a/pipeline/transform/test_join_epc_pp.py +++ b/pipeline/transform/test_join_epc_pp.py @@ -378,7 +378,10 @@ def test_run_new_build_keeps_early_first_transfer_when_sub_min_price(tmp_path: P price_paid_path = tmp_path / "price-paid.parquet" pl.DataFrame( { - "price": [30_000, 300_000], + # 5_000 is below MIN_PRICE (10_000) — a nominal/junk transfer that + # must still anchor the construction year but stay out of the price + # aggregations. + "price": [5_000, 300_000], "date_of_transfer": [date(2015, 2, 3), date(2022, 2, 3)], "property_type": ["T", "T"], "postcode": ["AA1 1AA", "AA1 1AA"], @@ -408,6 +411,48 @@ def test_run_new_build_keeps_early_first_transfer_when_sub_min_price(tmp_path: P assert df.get_column("historical_prices").list.len().to_list() == [1] +def test_run_keeps_sale_above_lowered_min_price(tmp_path: Path): + # A genuine cheap sale of 30_000 sits between the OLD floor (50k) and the + # NEW floor (10k): it must now be RETAINED in the price aggregations. This + # pins the 50k->10k change — it fails on the pre-fix 50k floor (where 30k was + # excluded, giving historical_prices length 1 / latest_price 250_000). + zip_path = tmp_path / "domestic-csv.zip" + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: + csv_buffer = io.StringIO() + writer = csv.DictWriter(csv_buffer, fieldnames=EPC_SOURCE_COLUMNS) + writer.writeheader() + writer.writerow(_row()) + archive.writestr("certificates-2024.csv", csv_buffer.getvalue()) + + price_paid_path = tmp_path / "price-paid.parquet" + pl.DataFrame( + { + "price": [250_000, 30_000], + "date_of_transfer": [date(2018, 2, 3), date(2022, 2, 3)], + "property_type": ["T", "T"], + "postcode": ["AA1 1AA", "AA1 1AA"], + "paon": ["1", "1"], + "saon": [None, None], + "street": ["Example Street", "Example Street"], + "locality": [None, None], + "town_city": ["Exampletown", "Exampletown"], + "duration": ["F", "F"], + "old_new": ["N", "N"], + "ppd_category": ["A", "A"], + } + ).write_parquet(price_paid_path) + + output_path = tmp_path / "epc-pp.parquet" + _run(zip_path, price_paid_path, output_path, tmp_path) + + df = pl.read_parquet(output_path) + + assert df.height == 1 + # Both sales now survive the 10k floor; the 30_000 (2022) is the most recent. + assert df.get_column("historical_prices").list.len().to_list() == [2] + assert df.get_column("latest_price").to_list() == [30_000] + + def test_epc_band_to_year_uses_midpoint_and_clamps(): import polars as pl diff --git a/pipeline/transform/test_merge.py b/pipeline/transform/test_merge.py index 3d4589e..3fe6369 100644 --- a/pipeline/transform/test_merge.py +++ b/pipeline/transform/test_merge.py @@ -13,6 +13,7 @@ from pipeline.transform.merge import ( _active_english_postcode_area, _build_unmatched_listing_seed_rows, _canonical_postcode_expr, + _best_listing_match, _coalesce_direct_epc_columns, _dedupe_collapsed_properties, _filter_to_active_english_postcodes, @@ -78,6 +79,40 @@ def test_conservation_area_feature_is_area_level() -> None: assert CONSERVATION_AREA_FEATURE in _AREA_COLUMNS +def test_tree_density_is_area_level_and_survives_the_split() -> None: + # Street tree density is a postcode-centroid percentile (constant per + # postcode), so it must route to the postcode/area output -- not be stripped + # by _area_columns_from -- and must NOT be duplicated into the property + # output. Regression for the drift where it landed only in properties.parquet + # and was lost for the ~308k property-less postcodes. + assert TREE_DENSITY_FEATURE in _AREA_COLUMNS + + df = pl.DataFrame( + { + "Postcode": ["AA1 1AA"], + "Last known price": [250_000], + TREE_DENSITY_FEATURE: [42.0], + } + ) + postcode_features = pl.DataFrame( + { + "Postcode": ["AA1 1AA", "BB1 1BB"], + "lat": [51.0, 52.0], + "lon": [-0.1, -0.2], + "ctry25cd": ["E92000001", "E92000001"], + TREE_DENSITY_FEATURE: [42.0, 7.0], + } + ) + + postcode_df, properties_df = _split_normal_outputs( + df, postcode_features, expected_postcode_count=2 + ) + + assert TREE_DENSITY_FEATURE in postcode_df.columns + assert postcode_df[TREE_DENSITY_FEATURE].to_list() == [42.0, 7.0] + assert TREE_DENSITY_FEATURE not in properties_df.columns + + def test_crime_columns_are_spatial_counts_not_per_capita() -> None: # Crime is now a raw spatial count per postcode; the per-1k-residents # variants were dropped along with the LSOA population denominator. @@ -767,6 +802,41 @@ def test_build_unmatched_listing_seed_rows_uses_direct_epc_fallbacks( assert seed["was_council_house"].to_list() == ["No"] +def test_build_unmatched_listing_seed_rows_prefers_direct_epc_rooms_over_listing( + tmp_path, +) -> None: + # When BOTH the listing room count and a direct-EPC count exist, the EPC + # value must win: the scraped "Number of bedrooms & living rooms" is actually + # bedrooms + bathrooms (upstream defect), so preferring it would inflate the + # count. This pins the coalesce direction (direct-EPC before listing). + listings_path = tmp_path / "listings.parquet" + arcgis_path = tmp_path / "arcgis.parquet" + _sample_listings_frame().with_columns( + # The corrupt listing room count (beds + baths). + pl.lit(5, dtype=pl.Int32).alias("Number of bedrooms & living rooms"), + ).write_parquet(listings_path) + _stub_arcgis(arcgis_path) + + listings = _load_listings_for_merge(listings_path, arcgis_path).with_columns( + # The genuine EPC habitable-room count. + pl.lit(3, dtype=pl.Int16).alias("_direct_number_habitable_rooms"), + ) + template_schema = pl.Schema( + { + "postcode": pl.Utf8, + "pp_address": pl.Utf8, + "number_habitable_rooms": pl.Int16, + **{dst: dtype for _src, dst, dtype in _LISTING_OVERLAY_SOURCES}, + } + ) + + seed = _build_unmatched_listing_seed_rows( + listings.select("_listing_idx"), listings, template_schema + ) + + assert seed["number_habitable_rooms"].to_list() == [3] + + _DIRECT_EPC_CANDIDATE_SCHEMA = { "_direct_epc_row": pl.UInt32, "_direct_epc_match_address": pl.Utf8, @@ -1249,6 +1319,98 @@ def test_integrate_listings_seeds_listing_with_unmatched_street(tmp_path) -> Non assert seed["_actual_listing_url"].to_list() == ["https://example.test/abc"] +def test_best_listing_match_rejects_numberless_listing_against_numbered_property() -> ( + None +): + # Regression: a number-less listing (street/locality only) must NOT match a + # numbered property. The number gate is unconditional (like fuzzy_join), and + # the score is token_sort_ratio only, so a single locality token can no + # longer subset-inflate to 100 against a long numbered address. + candidates = [{"pp_address": "FLAT A3 CHESHAM HEIGHTS ST MONICAS ROAD"}] + result = _best_listing_match( + listing_uprn=None, + query="KINGSWOOD", + uprn_index={}, + bucket_candidates=candidates, + addressed_fields=["pp_address"], + ) + assert result is None + + +def test_best_listing_match_allows_numberless_to_numberless_named_house() -> None: + # A number-less listing CAN still match a number-less (named-house) property + # when the street/name matches almost exactly. + candidates = [{"pp_address": "WOODLANDS HOUSE OAK LANE"}] + result = _best_listing_match( + listing_uprn=None, + query="WOODLANDS HOUSE OAK LANE", + uprn_index={}, + bucket_candidates=candidates, + addressed_fields=["pp_address"], + ) + assert result is not None + candidate, score, method, field = result + assert method == "address" + assert score >= 90.0 + + +def test_best_listing_match_still_matches_numbered_listing_to_numbered_property() -> ( + None +): + # No regression for numbered listings: the number gate still permits a + # compatible house number and the lower with-numbers threshold applies. + candidates = [{"pp_address": "10 OAK LANE"}] + result = _best_listing_match( + listing_uprn=None, + query="10 OAK LANE", + uprn_index={}, + bucket_candidates=candidates, + addressed_fields=["pp_address"], + ) + assert result is not None + _candidate, score, method, _field = result + assert method == "address" + assert score >= 82.0 + + +def test_best_listing_match_numbered_listing_with_trailing_locality_still_matches() -> ( + None +): + # A scraped numbered listing often appends town/county tokens that the bare + # Price-Paid register address omits. token_sort alone would score this ~73 + # (below 82) and drop a correct match; token_set (allowed for numbered + # queries, where the number gate makes it safe) recovers it. + candidates = [{"pp_address": "105 RIDGEWAY DRIVE"}] + result = _best_listing_match( + listing_uprn=None, + query="105 RIDGEWAY DRIVE BROMLEY KENT", + uprn_index={}, + bucket_candidates=candidates, + addressed_fields=["pp_address"], + ) + assert result is not None + candidate, score, _method, _field = result + assert candidate["pp_address"] == "105 RIDGEWAY DRIVE" + assert score >= 82.0 + + +def test_best_listing_match_numbered_query_cannot_subset_inflate_across_numbers() -> ( + None +): + # token_set for numbered queries is safe only because the number gate runs + # first: a query and candidate with incompatible house numbers never reach + # scoring, so token_set cannot inflate "10 OAK LANE" onto "12 OAK LANE". + candidates = [{"pp_address": "12 OAK LANE KINGSTON"}] + result = _best_listing_match( + listing_uprn=None, + query="10 OAK LANE", + uprn_index={}, + bucket_candidates=candidates, + addressed_fields=["pp_address"], + ) + assert result is None + + def test_finalize_listings_promotes_overlay_columns_and_filters_to_listing_rows() -> ( None ): @@ -1325,9 +1487,12 @@ def test_finalize_listings_promotes_overlay_columns_and_filters_to_listing_rows( assert finalized["Est. price per sqm"].to_list() == [5_000, 7_368] assert finalized["Estimated current price"].to_list() == [600_000, 700_000] assert finalized["Last known price"].to_list() == [500_000, 700_000] - # Listing's preferred floor area / rooms / property type / tenure. + # Listing's preferred floor area / property type / tenure. assert finalized["Total floor area (sqm)"].to_list() == [110.0, 95.0] - assert finalized["Number of bedrooms & living rooms"].to_list() == [4, 3] + # Rooms prefer the EPC habitable-room count over the listing's beds+baths + # value: row 0 keeps the EPC 3 (not the listing's _actual 4); row 1 has no + # EPC count so it falls back to the listing's 3. + assert finalized["Number of bedrooms & living rooms"].to_list() == [3, 3] assert finalized["Property type"].to_list() == ["Terraced", "Flats/Maisonettes"] assert finalized["Leasehold/Freehold"].to_list() == ["Freehold", "Leasehold"] # Postcode-level feature carried through to both matched and unmatched rows. diff --git a/pipeline/transform/test_transform_poi.py b/pipeline/transform/test_transform_poi.py index 023d464..1648150 100644 --- a/pipeline/transform/test_transform_poi.py +++ b/pipeline/transform/test_transform_poi.py @@ -5,11 +5,72 @@ import polars as pl from pipeline.transform.transform_poi import ( _load_ofsted_ratings, _school_icon_category_expr, + osm_groceries_colocated_with_geolytix, transform, transform_grocery_retail_points, ) +def test_osm_groceries_colocated_with_geolytix_drops_only_brand_matched_duplicates(): + # GEOLYTIX is authoritative for its chains. An OSM grocery that sits on top + # of a GEOLYTIX store AND carries its brand is the same physical store and + # must be dropped; an independent shop at the same spot, and a same-brand + # store far from any GEOLYTIX point, must be kept. + geolytix = pl.DataFrame( + { + "category": ["Tesco"], + "lat": [51.5000], + "lng": [-0.1000], + } + ) + osm = pl.DataFrame( + { + "id": ["dup-brand", "independent", "far-brand"], + "name": ["Tesco Express", "Bob's Corner Shop", "Tesco Express"], + # ~1 m, ~2 m, and ~55 km from the GEOLYTIX Tesco. + "lat": [51.50001, 51.50002, 52.0], + "lng": [-0.10001, -0.1000, -1.0], + } + ) + + drop_ids = osm_groceries_colocated_with_geolytix(osm, geolytix, radius_m=50.0) + + assert drop_ids == ["dup-brand"] + + +def test_osm_groceries_colocated_with_geolytix_dedupes_cooperative_spelling(): + # GEOLYTIX brand "Co-op" tokenises to "coop"; OSM commonly spells it + # "The Co-operative Food" -> "cooperative". The alias folds them so the + # genuine duplicate is still dropped. + geolytix = pl.DataFrame({"category": ["Co-op"], "lat": [53.0], "lng": [-1.5]}) + osm = pl.DataFrame( + { + "id": ["coop-dup"], + "name": ["The Co-operative Food"], + "lat": [53.00001], + "lng": [-1.5], + } + ) + assert osm_groceries_colocated_with_geolytix(osm, geolytix, radius_m=50.0) == [ + "coop-dup" + ] + + +def test_osm_groceries_colocated_with_geolytix_handles_empty_inputs(): + geolytix = pl.DataFrame({"category": ["Tesco"], "lat": [51.5], "lng": [-0.1]}) + empty = pl.DataFrame( + schema={"id": pl.Utf8, "name": pl.Utf8, "lat": pl.Float64, "lng": pl.Float64} + ) + assert osm_groceries_colocated_with_geolytix(empty, geolytix) == [] + osm = pl.DataFrame( + {"id": ["x"], "name": ["Tesco Express"], "lat": [51.5], "lng": [-0.1]} + ) + empty_glx = pl.DataFrame( + schema={"category": pl.Utf8, "lat": pl.Float64, "lng": pl.Float64} + ) + assert osm_groceries_colocated_with_geolytix(osm, empty_glx) == [] + + def _write_boundary(tmp_path): """A FeatureCollection whose single feature covers the London-area test coords used by the transform() fixtures, so in_england_mask keeps them.""" @@ -345,12 +406,7 @@ def test_load_ofsted_ratings_falls_back_to_ungraded_outcome(tmp_path): } ).write_parquet(ofsted_path) - ratings = ( - _load_ofsted_ratings(ofsted_path) - .collect() - .sort("urn") - .to_dicts() - ) + ratings = _load_ofsted_ratings(ofsted_path).collect().sort("urn").to_dicts() assert ratings == [ {"urn": 1, "ofsted_rating": "Outstanding"}, @@ -384,9 +440,9 @@ def test_school_icon_category_handles_one_sided_age_ranges(): }, ) - categories = df.select( - _school_icon_category_expr().alias("category") - )["category"].to_list() + categories = df.select(_school_icon_category_expr().alias("category"))[ + "category" + ].to_list() assert categories == [ "Nursery school", @@ -449,6 +505,45 @@ def test_osm_supermarkets_dropped(tmp_path): assert convenience.height == 1 +def test_transform_grocery_dedup_drops_only_grocery_aspect(tmp_path): + # The _write_transform_inputs fixture seeds 5 GEOLYTIX "Tesco" points at + # (51.52, -0.14). An OSM object colocated there carrying "Tesco" in its name + # is the same physical store, so its Convenience Store (Groceries) row is a + # duplicate and must be dropped — but its NON-grocery aspect (a Post Office + # sharing the same OSM id) must survive. An independent shop away from the + # GEOLYTIX point keeps its grocery row. + raw = pl.DataFrame( + { + "id": ["n1", "n1", "n2"], + "name": ["Tesco Express", "Tesco Express", "Corner Shop"], + "category": [ + "shop/convenience", + "amenity/post_office", + "shop/convenience", + ], + "lat": [51.52, 51.52, 51.40], + "lng": [-0.14, -0.14, -0.05], + } + ) + inputs = _write_transform_inputs(tmp_path, raw) + + out = transform(**inputs).collect() + + # The colocated, brand-matched grocery row is dropped. + n1_grocery = out.filter((pl.col("id") == "n1") & (pl.col("group") == "Groceries")) + assert n1_grocery.height == 0 + # Its non-grocery aspect (Post Office) survives. + n1_post_office = out.filter( + (pl.col("id") == "n1") & (pl.col("category") == "Post Office") + ) + assert n1_post_office.height == 1 + # The independent corner shop (no brand, far away) keeps its grocery row. + n2_grocery = out.filter( + (pl.col("id") == "n2") & (pl.col("category") == "Convenience Store") + ) + assert n2_grocery.height == 1 + + def test_transform_output_unique_per_id_category(tmp_path): # Soundness: the full transform() output has at most one row per # (id, category) overall, across every source. diff --git a/pipeline/transform/transform_poi.py b/pipeline/transform/transform_poi.py index 7bfa304..ba8d6a1 100644 --- a/pipeline/transform/transform_poi.py +++ b/pipeline/transform/transform_poi.py @@ -1,6 +1,7 @@ import argparse from pathlib import Path +import numpy as np import polars as pl from pipeline.utils.england_geometry import in_england_mask @@ -955,7 +956,6 @@ _CATEGORIES: list[tuple[str, str, str, list[str]]] = [ # Note: schools come from the GIAS register (see transform_gias_schools). # Niche/tertiary education amenities that GIAS does not cover are dropped # rather than mixed in with state-funded schools. - ( "Local Businesses", "Hotel", @@ -1441,38 +1441,128 @@ def transform_gias_schools(gias_path: Path, ofsted_path: Path) -> pl.LazyFrame: # category mirrors icon_category so the dashboard renders one toggle per # school type (Nursery / Primary / Secondary / Sixth form / University /…) # instead of bundling every GIAS row under a single "School" pill. - return pl.scan_parquet(gias_path).join(ofsted, on="urn", how="left").select( - pl.concat_str([pl.lit("gias-"), pl.col("urn").cast(pl.String)]).alias("id"), - pl.col("name"), - icon_category_expr.alias("category"), - icon_category_expr.alias("icon_category"), - pl.lit("Education").alias("group"), - pl.col("lat").cast(pl.Float64), - pl.col("lng").cast(pl.Float64), - emoji_expr.alias("emoji"), - pl.col("phase").alias("school_phase"), - pl.col("type").alias("school_type"), - pl.col("type_group").alias("school_type_group"), - pl.col("age_range").alias("school_age_range"), - pl.col("gender").alias("school_gender"), - pl.col("religious_character").alias("school_religious_character"), - pl.col("admissions_policy").alias("school_admissions_policy"), - pl.col("nursery_provision").alias("school_nursery_provision"), - pl.col("sixth_form").alias("school_sixth_form"), - pl.col("capacity").cast(pl.Int32, strict=False).alias("school_capacity"), - pl.col("pupils").cast(pl.Int32, strict=False).alias("school_pupils"), - pl.col("fsm_percent").cast(pl.Float32, strict=False).alias("school_fsm_percent"), - pl.col("trust").alias("school_trust"), - pl.col("address").alias("school_address"), - pl.col("postcode").alias("school_postcode"), - pl.col("local_authority").alias("school_local_authority"), - pl.col("website").alias("school_website"), - pl.col("telephone").cast(pl.String, strict=False).alias("school_telephone"), - pl.col("head_name").alias("school_head_name"), - pl.col("ofsted_rating").alias("school_ofsted_rating"), + return ( + pl.scan_parquet(gias_path) + .join(ofsted, on="urn", how="left") + .select( + pl.concat_str([pl.lit("gias-"), pl.col("urn").cast(pl.String)]).alias("id"), + pl.col("name"), + icon_category_expr.alias("category"), + icon_category_expr.alias("icon_category"), + pl.lit("Education").alias("group"), + pl.col("lat").cast(pl.Float64), + pl.col("lng").cast(pl.Float64), + emoji_expr.alias("emoji"), + pl.col("phase").alias("school_phase"), + pl.col("type").alias("school_type"), + pl.col("type_group").alias("school_type_group"), + pl.col("age_range").alias("school_age_range"), + pl.col("gender").alias("school_gender"), + pl.col("religious_character").alias("school_religious_character"), + pl.col("admissions_policy").alias("school_admissions_policy"), + pl.col("nursery_provision").alias("school_nursery_provision"), + pl.col("sixth_form").alias("school_sixth_form"), + pl.col("capacity").cast(pl.Int32, strict=False).alias("school_capacity"), + pl.col("pupils").cast(pl.Int32, strict=False).alias("school_pupils"), + pl.col("fsm_percent") + .cast(pl.Float32, strict=False) + .alias("school_fsm_percent"), + pl.col("trust").alias("school_trust"), + pl.col("address").alias("school_address"), + pl.col("postcode").alias("school_postcode"), + pl.col("local_authority").alias("school_local_authority"), + pl.col("website").alias("school_website"), + pl.col("telephone").cast(pl.String, strict=False).alias("school_telephone"), + pl.col("head_name").alias("school_head_name"), + pl.col("ofsted_rating").alias("school_ofsted_rating"), + ) ) +# OSM convenience-format stores that GEOLYTIX also covers (Tesco Express, +# Sainsbury's Local, Co-op Food, Morrisons Daily, Spar, ...) would otherwise be +# counted twice: once as a GEOLYTIX brand row and once as an OSM "Convenience +# Store". GEOLYTIX is authoritative for its chains, so an OSM grocery row that +# sits on top of a GEOLYTIX point AND carries that point's brand name is the +# same physical store and is dropped. Independent corner shops never carry a +# chain brand, so they are kept. +GROCERY_DEDUP_RADIUS_M = 50.0 + +# Brand-token aliases so an OSM name spelt differently from the GEOLYTIX brand +# still matches. GEOLYTIX's "Co-op" tokenises to "coop", but OSM frequently +# spells it "The Co-operative Food" -> "cooperative"; without this, ~300+ genuine +# Co-op duplicates would survive. Keys/values are post-strip (alnum-only) tokens. +_GROCERY_TOKEN_ALIASES = { + "cooperative": "coop", + "cooperatives": "coop", +} + + +def _significant_tokens(name: str | None) -> set[str]: + """Lower-case alphanumeric tokens of length >= 3 from a POI name (aliased).""" + if not name: + return set() + tokens: set[str] = set() + for raw in str(name).lower().split(): + token = "".join(ch for ch in raw if ch.isalnum()) + if len(token) >= 3: + tokens.add(_GROCERY_TOKEN_ALIASES.get(token, token)) + return tokens + + +def osm_groceries_colocated_with_geolytix( + osm_groceries: pl.DataFrame, + geolytix: pl.DataFrame, + radius_m: float = GROCERY_DEDUP_RADIUS_M, +) -> list[str]: + """Return OSM grocery ids that duplicate a GEOLYTIX store. + + An OSM Groceries row is a duplicate when a GEOLYTIX point lies within + ``radius_m`` metres AND that point's brand tokens (its ``category``, e.g. + "Tesco", "Co-op") are all present in the OSM row's name — i.e. the same + physical branded store. Brands with no token >= 3 chars (e.g. "M&S") never + match, so they are conservatively kept rather than risk a false drop. + + ``osm_groceries`` needs columns ``id``, ``name``, ``lat``, ``lng``; + ``geolytix`` needs ``category`` (the brand), ``lat``, ``lng``. + """ + if osm_groceries.is_empty() or geolytix.is_empty(): + return [] + + from scipy.spatial import cKDTree + + glx_lat = geolytix["lat"].to_numpy().astype(float) + glx_lng = geolytix["lng"].to_numpy().astype(float) + glx_brand_tokens = [_significant_tokens(b) for b in geolytix["category"].to_list()] + + osm_lat = osm_groceries["lat"].to_numpy().astype(float) + osm_lng = osm_groceries["lng"].to_numpy().astype(float) + osm_ids = osm_groceries["id"].to_list() + osm_name_tokens = [_significant_tokens(n) for n in osm_groceries["name"].to_list()] + + # Equirectangular projection to metres around the shared mean latitude — at + # England's scale this is accurate to well under the dedup radius. + mean_lat = float(np.mean(np.concatenate([glx_lat, osm_lat]))) + cos_lat = float(np.cos(np.radians(mean_lat))) + glx_xy = np.column_stack([glx_lng * cos_lat * 111_320.0, glx_lat * 110_540.0]) + osm_xy = np.column_stack([osm_lng * cos_lat * 111_320.0, osm_lat * 110_540.0]) + + tree = cKDTree(glx_xy) + neighbours = tree.query_ball_point(osm_xy, r=radius_m) + + drop_ids: list[str] = [] + for osm_idx, glx_indices in enumerate(neighbours): + tokens = osm_name_tokens[osm_idx] + if not tokens: + continue + for glx_idx in glx_indices: + brand = glx_brand_tokens[glx_idx] + if brand and brand.issubset(tokens): + drop_ids.append(osm_ids[osm_idx]) + break + return drop_ids + + def transform( input_path: Path, naptan_path: Path, @@ -1553,6 +1643,27 @@ def transform( grocery_df = pl.read_parquet(grocery_retail_points_path) grocery_pois = transform_grocery_retail_points(grocery_df, boundary_path) + + # Drop OSM grocery rows that duplicate a GEOLYTIX store (same brand, + # colocated) so a Tesco Express / Co-op / Spar isn't counted twice. + osm_groceries = ( + lf.filter(pl.col("group") == "Groceries") + .select("id", "name", "lat", "lng") + .collect(engine="streaming") + ) + duplicate_ids = osm_groceries_colocated_with_geolytix(osm_groceries, grocery_pois) + if duplicate_ids: + print( + f"Dropping {len(duplicate_ids):,} OSM grocery POIs that duplicate a " + "GEOLYTIX store" + ) + # Scope the drop to the Groceries group: a single OSM object can also + # carry a non-grocery aspect (e.g. a convenience store that is also a + # Post Office), which must survive — only its duplicate grocery row goes. + lf = lf.filter( + ~((pl.col("group") == "Groceries") & pl.col("id").is_in(duplicate_ids)) + ) + frames = [ lf, naptan, diff --git a/server-rs/Cargo.lock b/server-rs/Cargo.lock index 4aa3069..db5ab54 100644 --- a/server-rs/Cargo.lock +++ b/server-rs/Cargo.lock @@ -3901,6 +3901,7 @@ dependencies = [ "serde", "serde_json", "sha2 0.11.0", + "tikv-jemalloc-sys", "tikv-jemallocator", "tokio", "tower", diff --git a/server-rs/Cargo.toml b/server-rs/Cargo.toml index bb6b3b4..2d7d290 100644 --- a/server-rs/Cargo.toml +++ b/server-rs/Cargo.toml @@ -41,6 +41,10 @@ sentry = { version = "0.46.0", default-features = false, features = ["backtrace" # steady-state RSS). Decay is configured via `malloc_conf` in main.rs. [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { version = "0.6", features = ["unprefixed_malloc_on_supported_platforms"] } +# Direct mallctl access so we can force-purge dirty pages back to the OS after the +# data load (jemalloc's idle decay/background_thread doesn't reliably return the +# ~30GB of load-time transient buffers without subsequent allocation activity). +tikv-jemalloc-sys = "0.6" [lints.clippy] min_ident_chars = "warn" diff --git a/server-rs/src/main.rs b/server-rs/src/main.rs index ac2275c..37acf55 100644 --- a/server-rs/src/main.rs +++ b/server-rs/src/main.rs @@ -136,9 +136,74 @@ fn resident_memory_kib() -> Option { }) } +/// Force jemalloc to return all dirty/muzzy pages to the OS immediately. +/// +/// jemalloc keeps freed pages "dirty" for fast reuse and only hands them back to +/// the OS via decay. Under the bursty allocate/free of request handling (and the +/// huge startup-load transients) that decay lags badly, so RSS balloons far above +/// the live working set. `arena.4096.purge` (4096 == `MALLCTL_ARENAS_ALL`) purges +/// every arena synchronously, dropping RSS back down. +#[cfg(target_os = "linux")] +fn jemalloc_purge() { + const PURGE: &[u8] = b"arena.4096.purge\0"; + // Safety: a write-only mallctl with no input/output buffers; the name is a + // valid NUL-terminated jemalloc control string. + unsafe { + tikv_jemalloc_sys::mallctl( + PURGE.as_ptr().cast(), + std::ptr::null_mut(), + std::ptr::null_mut(), + std::ptr::null_mut(), + 0, + ); + } +} + +#[cfg(not(target_os = "linux"))] +fn jemalloc_purge() {} + +/// Enable jemalloc's background purge thread at runtime. Setting it via +/// `background_thread:true` in `malloc_conf` is unreliable (it can be silently +/// dropped depending on init ordering), so we also flip it explicitly here. +#[cfg(target_os = "linux")] +fn enable_jemalloc_background_thread() { + let enabled: bool = true; + // Safety: write-only mallctl of a bool to a valid control name. + unsafe { + tikv_jemalloc_sys::mallctl( + b"background_thread\0".as_ptr().cast(), + std::ptr::null_mut(), + std::ptr::null_mut(), + std::ptr::addr_of!(enabled) as *mut core::ffi::c_void, + std::mem::size_of::(), + ); + } +} + +#[cfg(not(target_os = "linux"))] +fn enable_jemalloc_background_thread() {} + +/// Periodically purge jemalloc arenas so request-handling transients are returned +/// to the OS instead of accumulating as dirty pages. A plain OS thread (not a +/// tokio task) keeps the madvise sweep off the async runtime. +#[cfg(target_os = "linux")] +fn spawn_jemalloc_purger() { + std::thread::Builder::new() + .name("jemalloc-purge".to_string()) + .spawn(|| loop { + std::thread::sleep(Duration::from_secs(10)); + jemalloc_purge(); + }) + .ok(); +} + +#[cfg(not(target_os = "linux"))] +fn spawn_jemalloc_purger() {} + #[cfg(target_os = "linux")] fn trim_allocator(label: &'static str) { let before = resident_memory_kib(); + jemalloc_purge(); let trimmed = unsafe { libc::malloc_trim(0) }; let after = resident_memory_kib(); if let (Some(before), Some(after)) = (before, after) { @@ -401,6 +466,12 @@ async fn main() -> anyhow::Result<()> { ) .init(); + // Keep jemalloc from hoarding freed memory: run its background purge thread + // and a periodic explicit purge so load-time and request-time transients are + // returned to the OS instead of inflating RSS. + enable_jemalloc_background_thread(); + spawn_jemalloc_purger(); + // Initialize Prometheus metrics let metrics_handle = metrics::init_metrics(); info!("Prometheus metrics initialized"); @@ -1016,17 +1087,13 @@ async fn main() -> anyhow::Result<()> { .layer(sentry::integrations::tower::SentryHttpLayer::new()), ); - // Lock all current and future memory pages to prevent swapping - unsafe { - if libc::mlockall(libc::MCL_CURRENT | libc::MCL_FUTURE) != 0 { - let err = std::io::Error::last_os_error(); - tracing::warn!( - "mlockall failed (need CAP_IPC_LOCK or sufficient RLIMIT_MEMLOCK): {err}" - ); - } else { - info!("All memory pages locked (mlockall)"); - } - } + // NOTE: we deliberately do NOT mlockall() here. Locking MCL_CURRENT|MCL_FUTURE + // pinned the allocator's entire mapped heap — including jemalloc's freed/dirty + // pages — resident and non-reclaimable, inflating RSS from the ~10GB working + // set to ~40GB and defeating the allocator's page-return entirely. The hot + // working set stays resident naturally; freed pages are returned to the OS. + + trim_allocator("startup complete"); let addr = consts::SERVER_ADDRESS; let listener = tokio::net::TcpListener::bind(addr) diff --git a/server-rs/src/routes/tiles.rs b/server-rs/src/routes/tiles.rs index bd79b5b..3f2e7c4 100644 --- a/server-rs/src/routes/tiles.rs +++ b/server-rs/src/routes/tiles.rs @@ -312,6 +312,7 @@ fn build_style(is_dark: bool, layers: &[serde_json::Value], tile_url: &str) -> s pub async fn init_tile_reader(path: &std::path::Path) -> anyhow::Result { let backend = FileBackend::open(path)?; - let reader = AsyncPmTilesReader::try_from_cached_source(backend, HashMapCache::default()).await?; + let reader = + AsyncPmTilesReader::try_from_cached_source(backend, HashMapCache::default()).await?; Ok(reader) }