diff --git a/pipeline/download/test_transit_network.py b/pipeline/download/test_transit_network.py new file mode 100644 index 0000000..c50e654 --- /dev/null +++ b/pipeline/download/test_transit_network.py @@ -0,0 +1,79 @@ +"""Tests for transit_network GTFS processing.""" + +import zipfile +from pathlib import Path + +import pytest + +from pipeline.download.transit_network import convert_high_freq_to_frequency_based + + +def _write_gtfs(path: Path, *, stop_times: str) -> None: + """Write a minimal GTFS zip with one metro route and several trips.""" + routes = "route_id,route_type\nR1,1\n" + trips = "trip_id,route_id,direction_id,service_id\n" + "".join( + f"T{i},R1,0,S1\n" for i in range(1, 7) + ) + with zipfile.ZipFile(path, "w") as z: + z.writestr("routes.txt", routes) + z.writestr("trips.txt", trips) + z.writestr("stop_times.txt", stop_times) + + +def _one_based_stop_times() -> str: + """Six trips, 1-based stop_sequence (1,2,...), 5-minute headway.""" + header = "trip_id,stop_sequence,departure_time,stop_id\n" + rows = [] + # First departures 06:00, 06:05, ... (300s = 5 min headway, well under 15 min) + for i in range(6): + trip = f"T{i + 1}" + first_dep = 6 * 3600 + i * 300 + h, m = divmod(first_dep, 3600) + m, s = divmod(m, 60) + # First stop has stop_sequence 1 (NOT 0); second stop sequence 2. + rows.append(f"{trip},1,{h:02d}:{m:02d}:{s:02d},STOP_A\n") + later = first_dep + 120 + h2, m2 = divmod(later, 3600) + m2, s2 = divmod(m2, 60) + rows.append(f"{trip},2,{h2:02d}:{m2:02d}:{s2:02d},STOP_B\n") + return header + "".join(rows) + + +def test_one_based_stop_sequence_is_converted(tmp_path: Path) -> None: + """First stop selection must use the minimum stop_sequence, not literal "0". + + With 1-based stop_sequence the old code (keyed on stop_sequence == "0") found + zero first stops and produced an empty frequencies.txt. The fix selects the + minimum stop_sequence per trip, so the high-frequency group is converted. + """ + src = tmp_path / "in.zip" + dst = tmp_path / "out.zip" + _write_gtfs(src, stop_times=_one_based_stop_times()) + + convert_high_freq_to_frequency_based(src, dst) + + with zipfile.ZipFile(dst, "r") as z: + freq = z.read("frequencies.txt").decode("utf-8") + + freq_rows = [r for r in freq.splitlines()[1:] if r.strip()] + # The single high-frequency group must produce exactly one frequency entry. + assert len(freq_rows) == 1, freq + trip_id, start_time, end_time, headway_secs, _exact = freq_rows[0].split(",") + # Template trip is the earliest departure (T1 at 06:00) starting at first stop. + assert start_time == "06:00:00" + # Median headway of 300s rounds to a 300s headway entry. + assert headway_secs == "300" + + +def test_raises_when_no_first_stops_found(tmp_path: Path) -> None: + """A non-empty target trip set with unparseable stop_sequence is loud, not silent.""" + src = tmp_path / "in.zip" + dst = tmp_path / "out.zip" + bad = ( + "trip_id,stop_sequence,departure_time,stop_id\n" + "T1,not_a_number,06:00:00,STOP_A\n" + ) + _write_gtfs(src, stop_times=bad) + + with pytest.raises(RuntimeError, match="no first stops"): + convert_high_freq_to_frequency_based(src, dst) diff --git a/pipeline/transform/postcode_boundaries/fragments_cache.py b/pipeline/transform/postcode_boundaries/fragments_cache.py new file mode 100644 index 0000000..5fa5b4d --- /dev/null +++ b/pipeline/transform/postcode_boundaries/fragments_cache.py @@ -0,0 +1,79 @@ +"""Persist Phase-3 output (the per-postcode fragments) so a crash in the later +merge/write phases can resume in seconds instead of re-running the ~10-hour OA +loop. + +Phase 3 turns OA boundaries + INSPIRE parcels + UPRN points into ~1.8M +``(postcode, geometry)`` fragments held only in memory. Everything after it +(merge, simplify, GeoJSON write) is cheap but failure-prone -- a single +degenerate postcode used to abort the whole run *after* those 10 hours. Caching +the fragments to disk decouples the expensive computation from the fragile +output stage. + +Fragments are stored as one parquet file with two columns: ``postcode`` +(string) and ``wkb`` (binary Shapely WKB). Writes are atomic (temp file + +``os.replace``) so an interrupted write never leaves a cache that passes the +freshness check. The cache is validated against its upstream inputs by mtime: if +any input is newer than the cache it is treated as stale and ignored, mirroring +make's own freshness logic. +""" + +from __future__ import annotations + +import os +from pathlib import Path + +import numpy as np +import polars as pl +import shapely +from shapely.geometry.base import BaseGeometry + +Fragment = tuple[str, BaseGeometry] + + +def _tmp_path(cache_path: Path) -> Path: + return cache_path.parent / (cache_path.name + ".tmp") + + +def fragments_cache_is_fresh( + cache_path: Path, inputs: list[Path | None] +) -> bool: + """True if ``cache_path`` exists and is newer than every input that exists. + + A missing input is ignored (it cannot have changed the cache); a ``None`` + input is skipped. Any existing input newer than the cache marks it stale. + """ + if not cache_path.exists(): + return False + cache_mtime = cache_path.stat().st_mtime + for inp in inputs: + if inp is None: + continue + path = Path(inp) + if path.exists() and path.stat().st_mtime > cache_mtime: + return False + return True + + +def save_fragments(cache_path: Path, fragments: list[Fragment]) -> None: + """Atomically write ``(postcode, geometry)`` fragments to a parquet cache.""" + postcodes = [pc for pc, _ in fragments] + geoms = np.array([geom for _, geom in fragments], dtype=object) + wkb = shapely.to_wkb(geoms) + + frame = pl.DataFrame( + {"postcode": postcodes, "wkb": list(wkb)}, + schema={"postcode": pl.Utf8, "wkb": pl.Binary}, + ) + + cache_path.parent.mkdir(parents=True, exist_ok=True) + tmp = _tmp_path(cache_path) + frame.write_parquet(tmp, compression="zstd") + os.replace(tmp, cache_path) + + +def load_fragments(cache_path: Path) -> list[Fragment]: + """Read fragments written by :func:`save_fragments` back into memory.""" + frame = pl.read_parquet(cache_path) + postcodes = frame["postcode"].to_list() + geoms = shapely.from_wkb(frame["wkb"].to_list()) + return list(zip(postcodes, geoms)) diff --git a/pipeline/transform/price_estimation/test_shrinkage.py b/pipeline/transform/price_estimation/test_shrinkage.py new file mode 100644 index 0000000..06ab01f --- /dev/null +++ b/pipeline/transform/price_estimation/test_shrinkage.py @@ -0,0 +1,99 @@ +"""Regression tests for common-base-year re-anchoring before blending. + +Each repeat-sales index dict is anchored to log-index 0 at its OWN earliest +year. shrink_dicts / blend_dicts combine dicts key-by-key, so dicts anchored to +different base years must be re-anchored to a single common base first, or the +blend averages level-incompatible numbers (fix5-index-base-year). +""" + +from pipeline.transform.price_estimation.shrinkage import ( + blend_dicts, + reanchor_dict, + reanchor_dicts, + shrink_dicts, +) + + +def test_reanchor_is_pure_constant_shift_preserving_differences(): + """Re-anchoring only shifts the origin; year-to-year moves are unchanged.""" + # Anchored at its own earliest year 2008. + idx = {2008: 0.0, 2009: 0.10, 2010: 0.25, 2011: 0.40} + + reanchored = reanchor_dict(idx, 1996) + # 1996 is before this dict's history -> back-fill earliest value (0.0), + # so the shift is 0 and the dict is unchanged. + assert reanchored[2008] == 0.0 + + # Same shape, different exact-hit base year: anchoring at 2010 subtracts 0.25. + reanchored_2010 = reanchor_dict(idx, 2010) + assert reanchored_2010[2010] == 0.0 + # All within-dict differences are preserved under the constant shift. + years = sorted(idx) + for a, b in zip(years, years[1:]): + assert abs((reanchored_2010[b] - reanchored_2010[a]) - (idx[b] - idx[a])) < 1e-12 + + +def test_blend_different_base_years_needs_reanchoring(): + """Blending two dicts on different bases is biased unless re-anchored first. + + Both cells observe the common base year 1996 but were anchored to DIFFERENT + origins (sectorA at 1996, sectorB at 2008, as solve_robust_index would do for + cells whose pair history starts at different years). They describe the SAME + true trajectory measured from 1996, so a 50/50 blend should reproduce that + common level. Pre-fix, blend_dicts mixes sectorB's 2008-relative numbers with + sectorA's 1996-relative numbers, level-shifting the smoothed result. + """ + base_year = 1996 + + # True log-levels relative to 1996 (identical trajectory for both cells). + truth = {1996: 0.0, 2008: 0.80, 2012: 1.00} + + # sectorA: anchored at 1996 (its earliest year) -> equals truth. + sector_a = dict(truth) + # sectorB: same trajectory but anchored at 2008 (subtract truth[2008] from + # every year), exactly how solve_robust_index would express a cell whose + # earliest year happened to be picked as 2008. + shift_b = truth[2008] + sector_b = {y: v - shift_b for y, v in truth.items()} + + # --- Pre-fix behaviour: blend the raw dicts directly. --- + raw_blend = blend_dicts(sector_a, [sector_b], 0.5, [0.5]) + # Every year is pulled by half of shift_b (0.4) away from the truth. + assert abs(raw_blend[2012] - truth[2012]) > 0.3 + assert abs(raw_blend[1996] - truth[1996]) > 0.3 + + # --- Post-fix behaviour: re-anchor to the common base, THEN blend. --- + reanchored = reanchor_dicts({"A": sector_a, "B": sector_b}, base_year) + fixed_blend = blend_dicts(reanchored["A"], [reanchored["B"]], 0.5, [0.5]) + # Both cells now read 0 at 1996 and the true level at every shared year. + for y in truth: + assert abs(fixed_blend[y] - truth[y]) < 1e-9 + + +def test_shrink_dicts_after_reanchoring_is_consistent(): + """Shrinking a cell toward its parent must use a common origin.""" + base_year = 2000 + # Parent (national) anchored at 2000. + parent = {2000: 0.0, 2010: 0.50, 2020: 1.20} + # Sector tracking the parent exactly but anchored at 2010 (subtract 0.50 from + # every year), as solve_robust_index would express a cell whose earliest year + # is later. It still observes the 2000 base year (value -0.50). + sector = {2000: -0.50, 2010: 0.0, 2020: 0.70} + n = 0 # no own data weight -> result should equal parent after anchoring + + reanchored_sector = reanchor_dict(sector, base_year) + # Exact hit on 2000 subtracts -0.50, putting the sector back on the parent's + # origin: 0.0 at 2000, 0.50 at 2010, 1.20 at 2020. + shrunk = shrink_dicts(reanchored_sector, parent, n) + assert abs(shrunk[2000] - 0.0) < 1e-9 + assert abs(shrunk[2010] - 0.50) < 1e-9 + assert abs(shrunk[2020] - 1.20) < 1e-9 + + +def test_reanchor_exact_hit_shifts_all_years(): + """When the base year is present, subtract its value from every year.""" + idx = {1996: 0.0, 2005: 0.30, 2015: 0.90} + reanchored = reanchor_dict(idx, 2005) + assert reanchored[2005] == 0.0 + assert abs(reanchored[1996] - (-0.30)) < 1e-12 + assert abs(reanchored[2015] - 0.60) < 1e-12 diff --git a/pipeline/utils/test_postcode_mapping.py b/pipeline/utils/test_postcode_mapping.py new file mode 100644 index 0000000..3ed0c0f --- /dev/null +++ b/pipeline/utils/test_postcode_mapping.py @@ -0,0 +1,38 @@ +import polars as pl + +from pipeline.utils.postcode_mapping import ( + MAX_REMAP_DISTANCE_M, + build_postcode_mapping, +) + + +def test_remap_drops_terminated_postcodes_beyond_distance_cap(tmp_path): + # One active England postcode at the origin. + # One terminated postcode 100m away (legitimate adjacent remap -> mapped). + # One terminated postcode 5km away (gross misattribution -> dropped). + arcgis = pl.DataFrame( + { + "pcds": ["AB1 1AA", "AB1 1AB", "ZZ9 9ZZ"], + "ctry25cd": ["E92000001", "E92000001", "E92000001"], + "doterm": [None, "202001", "202001"], + "east1m": [500000.0, 500100.0, 505000.0], + "north1m": [200000.0, 200000.0, 200000.0], + } + ) + arcgis_path = tmp_path / "arcgis.parquet" + arcgis.write_parquet(arcgis_path) + + mapping = build_postcode_mapping(arcgis_path) + + # The nearby terminated postcode is remapped onto the active one. + assert mapping.filter(pl.col("old_postcode") == "AB1 1AB")[ + "new_postcode" + ].to_list() == ["AB1 1AA"] + + # The far (5km > 1km cap) terminated postcode must NOT appear in the mapping. + assert "ZZ9 9ZZ" not in mapping["old_postcode"].to_list() + assert mapping.height == 1 + + +def test_max_remap_distance_constant_is_one_kilometre(): + assert MAX_REMAP_DISTANCE_M == 1000.0 diff --git a/probe_bin b/probe_bin new file mode 100755 index 0000000..e89bbf9 Binary files /dev/null and b/probe_bin differ