diff --git a/analyses/school_catchment_model.ipynb b/analyses/school_catchment_model.ipynb index c259bd7..38966b7 100644 --- a/analyses/school_catchment_model.ipynb +++ b/analyses/school_catchment_model.ipynb @@ -65,7 +65,7 @@ "sys.path.insert(0, str(ROOT))\n", "DATA = ROOT / \"property-data\"\n", "\n", - "from pipeline.transform import school_catchments as sc\n", + "from pipeline.transform import school_catchments as sc # noqa: E402\n", "\n", "print(\"Calibrated constants (see the calibration section for how these were chosen):\")\n", "print(f\" DEMAND_SCALE = {sc.DEMAND_SCALE}\")\n", @@ -292,10 +292,13 @@ " z = np.stack([-eff_good / tau, -d_other / tau])\n", " share_good = np.exp(z[0] - z.max(0)) / np.exp(z - z.max(0)).sum(0)\n", " ax.plot(xs, share_good, ls, label=f\"tau = {tau} km\")\n", - "ax.axvline(0, color=\"tab:green\", lw=1); ax.text(0, 1.04, \"Good school\", ha=\"center\", color=\"tab:green\")\n", - "ax.axvline(1, color=\"tab:gray\", lw=1); ax.text(1, 1.04, \"unrated school\", ha=\"center\", color=\"tab:gray\")\n", + "ax.axvline(0, color=\"tab:green\", lw=1)\n", + "ax.text(0, 1.04, \"Good school\", ha=\"center\", color=\"tab:green\")\n", + "ax.axvline(1, color=\"tab:gray\", lw=1)\n", + "ax.text(1, 1.04, \"unrated school\", ha=\"center\", color=\"tab:gray\")\n", "ax.set(xlabel=\"family position (km)\", ylabel=\"share applying to the Good school\", ylim=(0, 1.12))\n", - "ax.legend(loc=\"lower left\"); fig.tight_layout()\n" + "ax.legend(loc=\"lower left\")\n", + "fig.tight_layout()\n" ] }, { @@ -383,7 +386,8 @@ "ax2.bar(x + 0.18, sme, 0.36, label=\"logit (tau=0.3)\")\n", "ax2.set(xticks=x, xticklabels=[\"A\", \"B\", \"C\"], ylabel=\"final cutoff (km)\",\n", " title=\"smearing widens the popular school's cutoff\")\n", - "ax2.legend(); fig.tight_layout()\n", + "ax2.legend()\n", + "fig.tight_layout()\n", "print(\"deterministic cutoffs:\", np.round(det, 2), \" logit cutoffs:\", np.round(sme, 2))\n" ] }, @@ -586,7 +590,8 @@ "ax.plot([], [], color=\"tab:purple\", label=\"Outstanding primary catchment\")\n", "ax.set(xlim=(-half, half), ylim=(-half, half), xlabel=\"km east of Cambridge centre\",\n", " ylabel=\"km north\", title=\"Modelled primary catchments around Cambridge\")\n", - "ax.set_aspect(\"equal\"); ax.legend(loc=\"upper left\", fontsize=8)\n", + "ax.set_aspect(\"equal\")\n", + "ax.legend(loc=\"upper left\", fontsize=8)\n", "fig.tight_layout()\n" ] }, @@ -686,7 +691,9 @@ " s=14, alpha=0.5, color=\"tab:orange\", marker=\"^\", label=f\"faith (n={len(sub)})\")\n", "ax.set(xscale=\"log\", yscale=\"log\", xlim=lim, ylim=lim,\n", " xlabel=\"published last distance offered (km)\", ylabel=\"modelled cutoff radius (km)\")\n", - "ax.set_aspect(\"equal\"); ax.legend(fontsize=8); fig.tight_layout()\n", + "ax.set_aspect(\"equal\")\n", + "ax.legend(fontsize=8)\n", + "fig.tight_layout()\n", "\n", "for phase in (\"primary\", \"secondary\"):\n", " sub = binding.filter((pl.col(\"phase\") == phase) & ~pl.col(\"faith_school\"))\n", diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 7f03f83..97eabf6 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -171,8 +171,7 @@ function pageToPath(page: Page, inviteCode?: string): string { function pathToPage(rawPathname: string): RouteMatch | null { // Proxies 307-redirect /learn -> /learn/; treat trailing slashes as equivalent. - const pathname = - rawPathname.length > 1 ? rawPathname.replace(/\/+$/, '') || '/' : rawPathname; + const pathname = rawPathname.length > 1 ? rawPathname.replace(/\/+$/, '') || '/' : rawPathname; if (pathname === '/dashboard') return { page: 'dashboard' }; if (pathname === '/saved') return { page: 'saved' }; if (pathname === '/invites') return { page: 'account', hash: 'invites' }; diff --git a/frontend/src/components/legal/legal-content.ts b/frontend/src/components/legal/legal-content.ts index 108f1cc..7ea6a7d 100644 --- a/frontend/src/components/legal/legal-content.ts +++ b/frontend/src/components/legal/legal-content.ts @@ -171,7 +171,9 @@ export const PRIVACY: LegalDoc = { }, { heading: '8. Children', - paragraphs: ['The service is aimed at home buyers and renters and is not directed at children under 16.'], + paragraphs: [ + 'The service is aimed at home buyers and renters and is not directed at children under 16.', + ], }, { heading: '9. Changes to this policy', diff --git a/frontend/src/components/ui/PlaceSearchInput.tsx b/frontend/src/components/ui/PlaceSearchInput.tsx index 9fa5081..0e55935 100644 --- a/frontend/src/components/ui/PlaceSearchInput.tsx +++ b/frontend/src/components/ui/PlaceSearchInput.tsx @@ -23,7 +23,9 @@ interface SearchHook { /** Addresses arrive in raw ALL-CAPS Land Registry casing; title-case for display. */ function titleCaseAddress(address: string): string { - return address.toLowerCase().replace(/(^|[\s\-/(])([a-z])/g, (_, sep, c) => sep + c.toUpperCase()); + return address + .toLowerCase() + .replace(/(^|[\s\-/(])([a-z])/g, (_, sep, c) => sep + c.toUpperCase()); } interface PlaceSearchInputProps { diff --git a/frontend/src/lib/fit-bounds.ts b/frontend/src/lib/fit-bounds.ts index 2265dfb..56d7fb9 100644 --- a/frontend/src/lib/fit-bounds.ts +++ b/frontend/src/lib/fit-bounds.ts @@ -35,7 +35,10 @@ export function boundsToCenterZoom(bounds: GeoBounds): { lat: number; lng: numbe const zoomX = Math.log2((NOMINAL_VIEWPORT.width * 360) / (TILE_SIZE * lonSpan)); const zoomY = Math.log2((NOMINAL_VIEWPORT.height * 2 * Math.PI) / (TILE_SIZE * mercSpan)); - const zoom = Math.max(MAP_MIN_ZOOM, Math.min(MAX_FIT_ZOOM, Math.min(zoomX, zoomY) - ZOOM_PADDING)); + const zoom = Math.max( + MAP_MIN_ZOOM, + Math.min(MAX_FIT_ZOOM, Math.min(zoomX, zoomY) - ZOOM_PADDING) + ); return { lat: (south + north) / 2, diff --git a/pipeline/transform/crime_spatial.py b/pipeline/transform/crime_spatial.py index 55b1baa..9a061ef 100644 --- a/pipeline/transform/crime_spatial.py +++ b/pipeline/transform/crime_spatial.py @@ -15,15 +15,42 @@ crime *density* rather than how much ground the buffer sweeps (a median-sized catchment is left unchanged; a large rural postcode is no longer inflated simply for covering more of the map). Normalising by the buffered area -- the region that actually collects points -- rather than the raw polygon keeps tiny unit -postcodes from being over-inflated by the fixed buffer-ring floor. The headline -``"{type} (avg/yr)"`` is the simple mean of the per-year annualised counts, so it -equals the average of the by-year chart bars. +postcodes from being over-inflated by the fixed buffer-ring floor. NOTE: this is +an incident *density of the surrounding streets*, not a per-resident risk -- +zero-resident commercial centres (Soho, retail parks) legitimately rank high. + +**Force-coverage calendar.** police.uk has multi-year publication gaps for whole +forces (Greater Manchester has published nothing between 2019-07 and the present +except 2022-08; BTP, Gloucestershire, Devon & Cornwall and others have shorter +gaps). A missing month is *no data*, not zero crime, so every figure here is +computed against the months the postcode's own force actually published: + +* Each postcode is assigned a home force by majority vote of the incidents that + matched it (BTP, which reports nationwide, is excluded from the vote); + postcodes with no incidents inherit their outcode's majority force, then the + national modal force. +* The headline ``"{type} (avg/yr)"`` is the POOLED annualised rate over the + force's covered months: ``sum(counts in covered years) * 12 / covered_months``. + Years in which the force published nothing contribute neither incidents nor + months, so a coverage gap no longer reads as a low-crime period. (Pooling over + covered months also fixes the old "divide by years-with-incidents" headline, + which inflated sporadic categories by up to ~15x.) +* The by-year series only emits bars for years with at least + ``min_bar_months`` covered months (default 6): annualising a single observed + month x12 produced misleading spikes. Each bar is scaled by the force's + covered months in that year, not the global month calendar. +* ``covered_years`` (list[struct{year, months}]) is written for every postcode + so the server can tell "covered, zero crime" (year listed, no bar) from "no + data" (year absent) instead of charting gaps as zeros. +* Postcodes whose boundary buffer is unusable (broken geometry) get null + headline columns and an empty ``covered_years`` -- unknown, not zero. Outputs mirror the old LSOA transform's shape but are keyed on ``postcode``: * ``crime_by_postcode.parquet`` -- ``postcode`` + ``"{type} (avg/yr)"`` columns. -* ``crime_by_postcode_by_year.parquet`` -- ``postcode`` + ``"{type} (by year)"`` - nested ``list[struct{year, count}]`` columns, with Serious/Minor rollups. +* ``crime_by_postcode_by_year.parquet`` -- one row per postcode: ``postcode`` + + ``covered_years`` + nested ``"{type} (by year)"`` ``list[struct{year, count}]`` + columns, with Serious/Minor rollups. Caveat: police.uk coordinates are snapped to a fixed set of anonymous "map points", not true locations, and a share of rows have no coordinate at all @@ -56,6 +83,22 @@ ALL_CRIME_TYPES: tuple[str, ...] = SERIOUS_CRIME_TYPES + MINOR_CRIME_TYPES DEFAULT_BUFFER_M = 100.0 MONTH_DIR_RE = re.compile(r"^\d{4}-\d{2}$") +STREET_CSV_NAME_RE = re.compile(r"^(\d{4}-\d{2})-(.+)-street\.csv$") + +# Minimum covered months for a year to get a by-year chart bar (and to be +# listed in `covered_years`). Annualising fewer observed months (x12 from a +# single month at the worst) produces bars dominated by noise, and the first +# (2010: one month) and current partial year would otherwise always chart as +# spikes/dips. Six months keeps the annualisation factor <= 2. +MIN_BAR_MONTHS = 6 + +# Forces that report nationwide rather than policing a territory. They never +# define a postcode's home force (their publication calendar says nothing about +# whether the *territorial* force covering the postcode published), but their +# incidents still count toward whichever postcodes they fall in. +NON_TERRITORIAL_FORCES = frozenset({"btp"}) + +COVERAGE_COLUMN = "covered_years" # Generous GB bounds; points outside fall in no English postcode anyway, but # filtering first keeps the WGS84->BNG transform out of its undefined region. @@ -67,27 +110,51 @@ LAT_BOUNDS = (49.0, 61.5) _CSV_BATCH = 64 -def _month_calendar(csvs: list[Path]) -> tuple[list[int], dict[int, int], int]: - """Derive annualisation denominators from the monthly directory names. +def _force_calendar( + csvs: list[Path], +) -> tuple[list[int], list[str], np.ndarray]: + """Derive the per-force publication calendar from the CSV paths. - Each police.uk file lives under ``{crime_dir}/{YYYY-MM}/...`` and holds that - month's incidents, so the set of month directories is the set of observed - months. Returns the sorted distinct years, months-observed-per-year, and the - total month count (the avg/yr denominator). + Each police.uk file lives under ``{crime_dir}/{YYYY-MM}/{YYYY-MM}-{force}- + street.csv`` and holds that force's incidents for that month, so file + presence IS the coverage signal: a (force, month) with no file published + nothing. Returns the sorted distinct years, the force slugs (sorted), and + ``months_in_year_force`` of shape (n_forces, n_years) -- how many months + each force published in each year. """ - months = sorted( - {path.parent.name for path in csvs if MONTH_DIR_RE.fullmatch(path.parent.name)} - ) - if not months: - raise ValueError("No valid YYYY-MM month directories found among crime CSVs") + month_force: set[tuple[str, str]] = set() + for path in csvs: + if not MONTH_DIR_RE.fullmatch(path.parent.name): + continue + m = STREET_CSV_NAME_RE.fullmatch(path.name) + if m is None or m.group(1) != path.parent.name: + continue + month_force.add((m.group(1), m.group(2))) + if not month_force: + raise ValueError("No valid YYYY-MM street crime CSVs found") - months_in_year: dict[int, int] = {} - for month in months: - year = int(month[:4]) - months_in_year[year] = months_in_year.get(year, 0) + 1 + years = sorted({int(month[:4]) for month, _ in month_force}) + forces = sorted({force for _, force in month_force}) + year_to_idx = {year: idx for idx, year in enumerate(years)} + force_to_idx = {force: idx for idx, force in enumerate(forces)} - years = sorted(months_in_year) - return years, months_in_year, len(months) + months_in_year_force = np.zeros((len(forces), len(years)), dtype=np.int32) + for month, force in month_force: + months_in_year_force[force_to_idx[force], year_to_idx[int(month[:4])]] += 1 + + # Surface coverage gaps loudly: any territorial force missing months inside + # the global publication window is exactly the data hole the coverage + # masking exists for. + all_months = {month for month, _ in month_force} + for force in forces: + published = {m for m, f in month_force if f == force} + missing = len(all_months) - len(published) + if missing: + print( + f" coverage gap: {force} missing {missing}/{len(all_months)} months" + ) + + return years, forces, months_in_year_force def _build_tree( @@ -111,10 +178,17 @@ def _accumulate_counts( tree: shapely.STRtree, type_to_idx: dict[str, int], year_to_idx: dict[int, int], + force_to_idx: dict[str, int], transformer: Transformer, counts: np.ndarray, + force_votes: np.ndarray, ) -> None: - """Stream the crime CSVs, counting points-in-buffer per (postcode, type, year).""" + """Stream the crime CSVs, counting points-in-buffer per (postcode, type, year). + + Also accumulates ``force_votes`` (n_postcodes, n_forces): how many matched + incidents each force's files contributed to each postcode, which later + elects the postcode's home force for the coverage calendar. + """ schema = { "Longitude": pl.Float64, "Latitude": pl.Float64, @@ -129,13 +203,22 @@ def _accumulate_counts( for start in range(0, len(csvs), _CSV_BATCH): batch = csvs[start : start + _CSV_BATCH] + # The source file identifies the publishing force (police.uk has no + # force column with consistent naming); map each path back to its + # force index for the home-force vote. + path_to_fidx = {} + for path in batch: + m = STREET_CSV_NAME_RE.fullmatch(path.name) + if m is not None and m.group(2) in force_to_idx: + path_to_fidx[str(path)] = force_to_idx[m.group(2)] frame = ( pl.scan_csv( batch, schema_overrides=schema, ignore_errors=True, + include_file_paths="_source_path", ) - .select("Longitude", "Latitude", "Month", "Crime type") + .select("Longitude", "Latitude", "Month", "Crime type", "_source_path") # strict=False: a single malformed Month drops only that row instead # of aborting the whole build (a non-numeric year becomes null and is # filtered out by the year membership check below). @@ -166,8 +249,11 @@ def _accumulate_counts( pl.col("year") .replace_strict(year_to_idx, return_dtype=pl.Int32) .alias("yidx"), + pl.col("_source_path") + .replace_strict(path_to_fidx, default=-1, return_dtype=pl.Int32) + .alias("fidx"), ) - .select("Longitude", "Latitude", "Crime type", "tidx", "yidx") + .select("Longitude", "Latitude", "Crime type", "tidx", "yidx", "fidx") .collect(engine="streaming") ) @@ -186,13 +272,20 @@ def _accumulate_counts( lat = frame["Latitude"].to_numpy() tidx = frame["tidx"].to_numpy() yidx = frame["yidx"].to_numpy() + fidx = frame["fidx"].to_numpy() x, y = transformer.transform(lon, lat) finite = np.isfinite(x) & np.isfinite(y) total_dropped += int((~finite).sum()) if not finite.any(): continue - x, y, tidx, yidx = x[finite], y[finite], tidx[finite], yidx[finite] + x, y, tidx, yidx, fidx = ( + x[finite], + y[finite], + tidx[finite], + yidx[finite], + fidx[finite], + ) total_points += x.size points = shapely.points(x, y) @@ -203,6 +296,14 @@ def _accumulate_counts( (postcode_index, tidx[point_index], yidx[point_index]), 1, ) + matched_fidx = fidx[point_index] + known_force = matched_fidx >= 0 + if known_force.any(): + np.add.at( + force_votes, + (postcode_index[known_force], matched_fidx[known_force]), + 1, + ) total_matches += point_index.size print( @@ -228,6 +329,56 @@ def _accumulate_counts( ) +def _assign_home_force( + postcodes: np.ndarray, + force_votes: np.ndarray, + forces: list[str], +) -> np.ndarray: + """Elect each postcode's home (territorial) force. + + Majority vote of matched incidents per publishing force; non-territorial + forces (BTP) are excluded from the vote because their calendar says nothing + about local coverage. Postcodes with no votes (no incidents ever, or + BTP-only) inherit the majority force of their outcode, then the national + modal force, so every postcode gets a coverage calendar. + """ + votes = force_votes.astype(np.int64, copy=True) + for idx, force in enumerate(forces): + if force in NON_TERRITORIAL_FORCES: + votes[:, idx] = 0 + + home = votes.argmax(axis=1).astype(np.int32) + has_vote = votes.max(axis=1) > 0 + home[~has_vote] = -1 + + if not has_vote.any(): + raise ValueError("No incidents matched any postcode; cannot assign forces") + + # Outcode-majority fallback for postcodes with no (territorial) incidents. + outcodes = np.array([pc.split(" ")[0] for pc in postcodes], dtype=object) + national_modal = int( + np.bincount(home[has_vote], minlength=len(forces)).argmax() + ) + if (~has_vote).any(): + outcode_modal: dict[str, int] = {} + voted_outcodes = outcodes[has_vote] + voted_home = home[has_vote] + for oc in np.unique(voted_outcodes): + counts = np.bincount(voted_home[voted_outcodes == oc], minlength=len(forces)) + outcode_modal[oc] = int(counts.argmax()) + fallback = np.array( + [outcode_modal.get(oc, national_modal) for oc in outcodes[~has_vote]], + dtype=np.int32, + ) + home[~has_vote] = fallback + print( + f" {int((~has_vote).sum()):,} postcodes had no territorial incidents; " + "home force inherited from outcode majority" + ) + + return home + + def _rollup_long( long: pl.DataFrame, types: tuple[str, ...], rollup_name: str ) -> pl.DataFrame: @@ -244,30 +395,41 @@ def _rollup_long( def _write_avg_yr( postcodes: np.ndarray, counts: np.ndarray, - years: list[int], - months_in_year: dict[int, int], + months_in_year_force: np.ndarray, + home_fidx: np.ndarray, norm: np.ndarray, output_path: Path, ) -> None: """Write ``postcode`` + ``"{type} (avg/yr)"`` density-normalised averages. - The headline figure is the **simple mean of the per-year annualised counts** - (each year scaled to a 12-month equivalent), so it equals the average of the - by-year chart bars instead of a month-weighted pooled rate. Each postcode's - value is then multiplied by ``norm`` (median_area / buffered catchment area) - so the metric is a density rather than a footprint-inflated raw count. + The headline is the POOLED annualised rate over the home force's covered + months: ``sum(counts in covered years) * 12 / covered_months``. Years the + force published nothing contribute neither incidents nor months, so a + coverage gap (e.g. Greater Manchester 2019-07 onwards) is excluded instead + of read as zero crime. Pooling over the full covered window -- rather than + averaging only over years a type happened to occur -- is what keeps a + single robbery-year from printing as a perennial robbery rate. Each + postcode's value is then multiplied by ``norm`` (median_area / buffered + catchment area) so the metric is a density rather than a footprint-inflated + raw count; postcodes with unusable geometry (norm == 0) are null, not 0. """ - months = np.array([months_in_year[year] for year in years], dtype=np.float64) - per_year = counts.astype(np.float64) * 12.0 / months[None, None, :] - # Average over the years *this postcode* actually has incidents of *this - # type* -- the same per-(postcode, type) x-span the by-year chart plots - # (server-rs/.../crime_by_year.rs), so the headline equals the mean of the - # by-year bars. Dividing by a global years-present count (years a type - # appeared anywhere in England) would deflate postcodes whose incidents - # cluster in only a few years of the ~13-year window. - years_present = np.clip((counts > 0).sum(axis=2), 1, None).astype(np.float64) - avg = per_year.sum(axis=2) / years_present # (n_postcodes, n_types) - avg = np.round(avg * norm[:, None], 1).astype(np.float32) + n_postcodes, n_types = counts.shape[0], counts.shape[1] + avg = np.full((n_postcodes, n_types), np.nan, dtype=np.float64) + for f in range(months_in_year_force.shape[0]): + sel = home_fidx == f + if not sel.any(): + continue + cov_months = months_in_year_force[f].astype(np.float64) + denom = cov_months.sum() + if denom <= 0: + continue # force never published; stays null + covered_years = cov_months > 0 + pooled = counts[sel][:, :, covered_years].sum(axis=2, dtype=np.float64) + avg[sel] = pooled * 12.0 / denom + + avg *= norm[:, None] + avg[norm <= 0] = np.nan # unusable geometry: unknown, not zero + avg = np.round(avg, 1).astype(np.float32) data: dict[str, np.ndarray] = {"postcode": postcodes} for type_idx, name in enumerate(ALL_CRIME_TYPES): @@ -275,14 +437,10 @@ def _write_avg_yr( # Serious/Minor rollup headlines = the exact SUM of their component (avg/yr) # columns, so each rollup always equals the sum of the parts shown beside it - # and can never fall below one of its own components. (Previously the rollup - # re-derived a union-years-present mean: it divided the summed counts by the - # number of years in which ANY component type occurred, whereas each - # component divides by its OWN years-present. When a postcode's serious/minor - # types occurred in disjoint years the union denominator was larger, so the - # rollup came out smaller than the sum of its parts.) The by-year rollup - # series in _write_by_year is likewise the per-year sum of the component - # bars, so headline and chart both present the rollup as the sum of its parts. + # and can never fall below one of its own components. All components share + # the postcode's pooled covered-month denominator, so the sum is itself the + # pooled rollup rate. Null components (unusable geometry) propagate to a + # null rollup. for rollup_name, rollup_types in ( ("Serious crime", SERIOUS_CRIME_TYPES), ("Minor crime", MINOR_CRIME_TYPES), @@ -292,8 +450,12 @@ def _write_avg_yr( avg[:, rollup_idx].sum(axis=1), 1 ).astype(np.float32) + frame = pl.DataFrame(data) + value_cols = [c for c in frame.columns if c != "postcode"] + frame = frame.with_columns(pl.col(c).fill_nan(None) for c in value_cols) + output_path.parent.mkdir(parents=True, exist_ok=True) - pl.DataFrame(data).write_parquet(output_path, compression="zstd") + frame.write_parquet(output_path, compression="zstd") print(f"Wrote postcode crime averages: {output_path}") @@ -301,35 +463,60 @@ def _write_by_year( postcodes: np.ndarray, counts: np.ndarray, years: list[int], - months_in_year: dict[int, int], + months_in_year_force: np.ndarray, + home_fidx: np.ndarray, norm: np.ndarray, + min_bar_months: int, output_path: Path, ) -> None: - """Write nested ``"{type} (by year)"`` series plus Serious/Minor rollups. + """Write nested ``"{type} (by year)"`` series plus rollups and coverage. - Per-year counts are area-normalised by the same ``norm`` (median_area / - buffered catchment area) factor applied to the avg/yr headline, so the chart - bars and the headline figure remain mutually consistent. + A bar is only emitted for (postcode, year)s where the postcode's home force + published at least ``min_bar_months`` months -- annualising a thinner year + (x12 from a single month at the extreme) charts noise, and a force-gap year + must chart as *no data*, not zero. Bars are scaled by the force's covered + months in that year and area-normalised by the same ``norm`` factor as the + headline so chart and headline stay mutually consistent. + + Every postcode gets a row (the output is dense) carrying ``covered_years`` + -- the list of {year, months} the home force published at least + ``min_bar_months`` months -- so consumers can distinguish covered-but- + crime-free years (year listed, no bar => genuine zero) from coverage gaps + (year absent => unknown). Postcodes with unusable geometry get an empty + coverage list: their crime picture is unknown. """ - months = np.array([months_in_year[year] for year in years], dtype=np.float64) + # (n_postcodes, n_years): covered months of each postcode's home force. + cov_pc_year = months_in_year_force[home_fidx, :] + usable = norm > 0 + annual = np.round( - counts.astype(np.float64) * 12.0 / months[None, None, :] * norm[:, None, None], + counts.astype(np.float64) + * 12.0 + / np.maximum(cov_pc_year[:, None, :], 1) + * norm[:, None, None], 1, ) + bar_ok = ( + (counts > 0) + & (cov_pc_year[:, None, :] >= min_bar_months) + & usable[:, None, None] + ) - pc_i, ty_i, yr_i = np.nonzero(counts) - if pc_i.size == 0: - raise ValueError("No crime points matched any postcode buffer") + pc_i, ty_i, yr_i = np.nonzero(bar_ok) type_names = np.array(ALL_CRIME_TYPES, dtype=object) year_values = np.array(years, dtype=np.int32) + # Explicit schema: with full masking (e.g. every year below min_bar_months) + # the fancy-indexed numpy object arrays are empty and polars would infer + # Object columns, which breaks the rollup `is_in` below. long = pl.DataFrame( { - "postcode": postcodes[pc_i], - "Crime type": type_names[ty_i], + "postcode": postcodes[pc_i].astype(str), + "Crime type": type_names[ty_i].astype(str), "year": year_values[yr_i], "count": annual[pc_i, ty_i, yr_i].astype(np.float32), - } + }, + schema_overrides={"postcode": pl.String, "Crime type": pl.String}, ) serious = _rollup_long(long, SERIOUS_CRIME_TYPES, "Serious crime") @@ -345,6 +532,46 @@ def _write_by_year( type_cols = [c for c in wide.columns if c != "postcode"] wide = wide.rename({col: f"{col} (by year)" for col in type_cols}) + # Dense base: every postcode, with its home force's coverage calendar. + # Built per force (there are ~45) and joined on the force index. + coverage_per_force: list[list[dict[str, int]]] = [] + for f in range(months_in_year_force.shape[0]): + coverage_per_force.append( + [ + {"year": int(years[y]), "months": int(m)} + for y, m in enumerate(months_in_year_force[f]) + if m >= min_bar_months + ] + ) + coverage_frame = pl.DataFrame( + { + "_fidx": pl.Series(range(len(coverage_per_force)), dtype=pl.Int32), + COVERAGE_COLUMN: pl.Series( + coverage_per_force, + dtype=pl.List(pl.Struct({"year": pl.Int32, "months": pl.Int32})), + ), + } + ) + base = pl.DataFrame( + { + "postcode": postcodes, + "_fidx": pl.Series(home_fidx, dtype=pl.Int32), + "_usable": pl.Series(usable), + } + ) + dense = ( + base.join(coverage_frame, on="_fidx", how="left") + .with_columns( + # Unusable geometry: empty coverage -- the crime picture is unknown. + pl.when(pl.col("_usable")) + .then(pl.col(COVERAGE_COLUMN)) + .otherwise(pl.col(COVERAGE_COLUMN).list.head(0)) + .alias(COVERAGE_COLUMN) + ) + .drop("_fidx", "_usable") + ) + wide = dense.join(wide, on="postcode", how="left") + output_path.parent.mkdir(parents=True, exist_ok=True) wide.write_parquet(output_path, compression="zstd") print(f"Wrote postcode crime by-year series: {output_path} {wide.shape}") @@ -358,6 +585,7 @@ def transform_crime_spatial( buffer_m: float = DEFAULT_BUFFER_M, max_postcodes: int | None = None, max_files: int | None = None, + min_bar_months: int = MIN_BAR_MONTHS, ) -> None: csvs, ignored_csv_count = find_street_crime_csvs(crime_dir) if not csvs: @@ -365,9 +593,9 @@ def transform_crime_spatial( if max_files is not None: csvs = csvs[:max_files] - years, months_in_year, valid_month_count = _month_calendar(csvs) + years, forces, months_in_year_force = _force_calendar(csvs) print( - f"Found {len(csvs):,} street crime CSVs across {valid_month_count} months " + f"Found {len(csvs):,} street crime CSVs across {len(forces)} forces " f"({years[0]}-{years[-1]})" + (f" (ignored {ignored_csv_count} non-street CSVs)" if ignored_csv_count else "") ) @@ -397,18 +625,35 @@ def transform_crime_spatial( type_to_idx = {name: idx for idx, name in enumerate(ALL_CRIME_TYPES)} year_to_idx = {year: idx for idx, year in enumerate(years)} + force_to_idx = {force: idx for idx, force in enumerate(forces)} counts = np.zeros((len(postcodes), len(ALL_CRIME_TYPES), len(years)), dtype=np.int32) + force_votes = np.zeros((len(postcodes), len(forces)), dtype=np.int32) transformer = Transformer.from_crs("EPSG:4326", "EPSG:27700", always_xy=True) - _accumulate_counts(csvs, tree, type_to_idx, year_to_idx, transformer, counts) + _accumulate_counts( + csvs, tree, type_to_idx, year_to_idx, force_to_idx, transformer, counts, force_votes + ) - _write_avg_yr(postcodes, counts, years, months_in_year, norm, output_path) - _write_by_year(postcodes, counts, years, months_in_year, norm, by_year_output_path) + home_fidx = _assign_home_force(np.asarray(postcodes), force_votes, forces) + + _write_avg_yr( + postcodes, counts, months_in_year_force, home_fidx, norm, output_path + ) + _write_by_year( + postcodes, + counts, + years, + months_in_year_force, + home_fidx, + norm, + min_bar_months, + by_year_output_path, + ) def main() -> None: parser = argparse.ArgumentParser( - description="Count police.uk crime points within 50m of each postcode boundary" + description="Count police.uk crime points near each postcode boundary" ) parser.add_argument( "--input", @@ -452,6 +697,12 @@ def main() -> None: default=None, help="Testing only: process the first N monthly CSV files", ) + parser.add_argument( + "--min-bar-months", + type=int, + default=MIN_BAR_MONTHS, + help="Minimum covered months for a year to get a by-year bar", + ) args = parser.parse_args() if args.buffer_m <= 0: @@ -465,6 +716,7 @@ def main() -> None: buffer_m=args.buffer_m, max_postcodes=args.max_postcodes, max_files=args.max_files, + min_bar_months=args.min_bar_months, ) diff --git a/pipeline/transform/join_epc_pp.py b/pipeline/transform/join_epc_pp.py index 13b855f..cdfc99a 100644 --- a/pipeline/transform/join_epc_pp.py +++ b/pipeline/transform/join_epc_pp.py @@ -31,6 +31,22 @@ RATING_RANK = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7} # conservative tradeoff to keep clearly-implausible transfers out. MIN_PRICE = 10_000 +# Time-aware consecutive-sale jump guard. Price-paid contains keyed-in price +# errors that pass the MIN_PRICE/category filters — e.g. 13 QUICKSETTS HR2 7PP, +# a 93 m² terrace, sold £140,000 in 2016 then "£207,500,000" in 2026 (clearly +# £207,500 with extra digits, lodged as category A) — and would otherwise +# become latest_price. A quality sale is flagged when it exceeds its +# neighbouring sale by more than JUMP_TOLERANCE * JUMP_GROWTH_PER_YEAR ** years +# between the two sales. Calibration: genuine extreme appreciation (prime +# London 1995->2026 is roughly x50 over 31 years) stays comfortably under +# 12 * 1.10**31 ≈ 230, while the HR2 case (x1,482 over 10 years against a +# threshold of 12 * 1.10**10 ≈ 31) is caught. JUMP_MIN_PRICE is an absolute +# floor on the flagged price itself so right-to-buy resales and other +# legitimate x20-50 jumps on cheap properties are never flagged. +JUMP_TOLERANCE = 12.0 +JUMP_GROWTH_PER_YEAR = 1.10 +JUMP_MIN_PRICE = 2_000_000 + # Plausible construction-year range; band-derived years outside it (e.g. OCR # noise like 1012 or 2202) are nulled rather than published. MIN_BUILD_YEAR = 1700 @@ -286,6 +302,64 @@ def _scan_epc_certificates(epc_path: Path, temp_dir: Path) -> pl.LazyFrame: return _select_epc_columns(raw) +def flag_price_outliers(slim: pl.DataFrame) -> pl.DataFrame: + """Flag the implausible side of extreme consecutive-sale price jumps. + + ``slim`` holds one row per quality (>= MIN_PRICE, category A) sale: + (_pp_group_address, _pp_group_postcode, date_of_transfer, price). Per + property, each sale is compared against its previous and next sale and + the HIGHER sale of an implausible pair is flagged: + + - UP rule: the sale is more than the time-aware threshold above its + PREVIOUS sale (catches a garbage spike after a normal sale); + - DOWN rule: the NEXT sale is less than 1/threshold of this one (catches + a garbage spike before a normal sale); + - either way the flagged price itself must be >= JUMP_MIN_PRICE, so + cheap-property noise and right-to-buy-style resales stay safe. + + Runs as a bounded EAGER pass: .shift().over() window functions may not + execute under the streaming sink used by fuzzy_join_on_postcode, so the + flags are computed here and left-joined back into the lazy stream. + + Returns the exclusion rows (group keys, date_of_transfer, price) with a + literal ``_price_outlier`` column, unique on the four join columns so + the join-back can never fan out. + """ + group_keys = ["_pp_group_address", "_pp_group_postcode"] + # Years between consecutive sales, floored at six months so back-to-back + # transfers don't get a near-zero exponent and an over-tight threshold. + dy_prev = ( + (pl.col("date_of_transfer") - pl.col("_prev_date")).dt.total_days() / 365.25 + ).clip(lower_bound=0.5) + dy_next = ( + (pl.col("_next_date") - pl.col("date_of_transfer")).dt.total_days() / 365.25 + ).clip(lower_bound=0.5) + up_rule = (pl.col("price") / pl.col("_prev_price")) > JUMP_TOLERANCE * pl.lit( + JUMP_GROWTH_PER_YEAR + ).pow(dy_prev) + down_rule = (pl.col("_next_price") / pl.col("price")) < 1 / ( + JUMP_TOLERANCE * pl.lit(JUMP_GROWTH_PER_YEAR).pow(dy_next) + ) + return ( + slim.sort([*group_keys, "date_of_transfer"]) + .with_columns( + pl.col("price").shift(1).over(group_keys).alias("_prev_price"), + pl.col("date_of_transfer").shift(1).over(group_keys).alias("_prev_date"), + pl.col("price").shift(-1).over(group_keys).alias("_next_price"), + pl.col("date_of_transfer").shift(-1).over(group_keys).alias("_next_date"), + ) + # fill_null(False): a missing neighbour (first/last sale of a group) + # makes that rule's comparison null, which must read as "not flagged". + .filter( + (up_rule.fill_null(False) | down_rule.fill_null(False)) + & (pl.col("price") >= JUMP_MIN_PRICE) + ) + .select(*group_keys, "date_of_transfer", "price") + .unique() + .with_columns(pl.lit(True).alias("_price_outlier")) + ) + + def main(): parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data") parser.add_argument( @@ -429,15 +503,19 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat # price >= MIN_PRICE and ppd_category == "A" (standard open-market sale) are # VALUE-QUALITY filters: they gate the price aggregations only. Category B - # entries (repossessions, bulk/portfolio, power-of-sale transfers) and sub-MIN - # sales must not pollute latest_price / historical_prices (and the downstream - # price-per-sqm feature), but they MUST still count for first_transfer_date / - # old_new so a new-build's genuine earliest transfer year is preserved. + # entries (repossessions, bulk/portfolio, power-of-sale transfers), sub-MIN + # sales and jump-flagged outliers must not pollute latest_price / + # historical_prices (and the downstream price-per-sqm feature), but they + # MUST still count for first_transfer_date / old_new so a new-build's + # genuine earliest transfer year is preserved. price_ok = pl.col("price") >= MIN_PRICE category_ok = pl.col("ppd_category") == "A" - quality_ok = price_ok & category_ok + value_ok = price_ok & category_ok + # quality_ok additionally excludes consecutive-sale jump outliers (see + # flag_price_outliers); _price_outlier exists only after the join below. + quality_ok = value_ok & pl.col("_price_outlier").is_null() - price_paid = ( + price_paid_base = ( pl.scan_parquet(price_paid_path) .select( "price", @@ -469,6 +547,52 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat pl.col("_pp_match_postcode").alias("_pp_group_postcode"), ) .filter(pl.col("pp_address").is_not_null()) + # Price-paid carries ~72k duplicate (address, postcode, date, price) + # transaction groups with DISTINCT transaction ids — the same completed + # sale lodged twice — which double-counted sales in historical_prices. + # Collapse each to one row. ppd_category stays in the subset so an + # A/B-categorised pair of the same sale survives as two rows; only the + # A row feeds the price aggregations (quality_ok), which is intentional. + .unique( + subset=[ + "_pp_group_address", + "_pp_group_postcode", + "date_of_transfer", + "price", + "ppd_category", + ], + keep="any", + ) + ) + + # Bounded eager pass over the quality sales only (~30M rows x 4 narrow + # columns): the window functions inside flag_price_outliers may not run + # under the streaming sink used by fuzzy_join_on_postcode, so the outlier + # flags are computed here and joined back into the lazy stream. + outliers = flag_price_outliers( + price_paid_base.filter(value_ok) + .select( + "_pp_group_address", "_pp_group_postcode", "date_of_transfer", "price" + ) + .collect(engine="streaming") + ) + print(f"Implausible consecutive-sale price jumps flagged: {outliers.height}") + + price_paid = ( + # Outlier rows stay in the stream (they still count for + # first_transfer_date / old_new, same as category-B sales); quality_ok + # merely drops them from the price aggregations. _price_outlier is not + # aggregated below, so the helper column dies with the group_by. + price_paid_base.join( + outliers.lazy(), + on=[ + "_pp_group_address", + "_pp_group_postcode", + "date_of_transfer", + "price", + ], + how="left", + ) .sort("date_of_transfer") .group_by("_pp_group_address", "_pp_group_postcode", maintain_order=True) .agg( @@ -511,6 +635,9 @@ def _run(epc_path: Path, price_paid_path: Path, output_path: Path, temp_dir: Pat right_postcode_col="epc_postcode", ) .drop("epc_postcode") + # Audit trail: keep the fuzzy-match confidence (100 = exact address + # match) in the published output; null means no EPC match. + .rename({"_match_score": "epc_match_score"}) .collect(engine="streaming") ) diff --git a/pipeline/transform/price_estimation/estimate.py b/pipeline/transform/price_estimation/estimate.py index c5d3e93..efc51c9 100644 --- a/pipeline/transform/price_estimation/estimate.py +++ b/pipeline/transform/price_estimation/estimate.py @@ -25,6 +25,7 @@ from pipeline.transform.price_estimation.knn import ( ) from pipeline.transform.price_estimation.utils import ( CURRENT_FRAC_YEAR, + CURRENT_YEAR, MAX_LOG_ADJUSTMENT, interpolate_log_index, sector_expr, @@ -41,6 +42,87 @@ MIN_KNN_TO_INDEX_RATIO = 0.5 # only catching outliers. MAX_ESTIMATE_TO_LAST_PRICE_RATIO = 20.0 +# Guard for rows with NO usable floor area: the per-sqm plausibility check +# cannot fire there, which let commercial blocks misfiled as dwellings keep +# absurd headline estimates (e.g. a GBP 175M "Detached" in SW1W). Without +# floor area we cannot psm-check, so the only sanity reference left is what +# the local market actually pays: beyond this multiple of the district's +# recent 99th-percentile sale price the estimate is unreliable and misleading, +# so it is nulled rather than shown. +FLOORLESS_ESTIMATE_P99_MULT = 2.0 +# Never null a floorless estimate below this absolute value: genuine mansions +# in cheap districts can legitimately exceed 2x their district's recent p99, +# but a sub-GBP 2M estimate is within the plausible single-dwelling range +# anywhere in the UK, so it survives regardless of the local p99. +FLOORLESS_ESTIMATE_MIN_CAP = 2_000_000.0 +# Look-back window for the district p99 reference: long enough that thin +# districts accumulate a usable sale sample, short enough that the reference +# reflects today's price level rather than a pre-boom one. +FLOORLESS_P99_LOOKBACK_YEARS = 10 + + +def apply_floorless_estimate_guard(df: pl.DataFrame) -> pl.DataFrame: + """Null floor-area-less estimates far above their district's recent sales. + + Builds a per-district reference from the SAME frame -- the 99th percentile + of `Last known price` over sales in the last FLOORLESS_P99_LOOKBACK_YEARS + -- and nulls `Estimated current price` where the floor area is null/zero + AND the estimate exceeds max(FLOORLESS_ESTIMATE_P99_MULT * p99, + FLOORLESS_ESTIMATE_MIN_CAP). Districts with no recent sales yield a null + p99 and are left alone: with neither a psm check nor a local reference we + cannot judge the estimate, and nulling on the absolute cap alone would be + too aggressive. Expects the `_sector` helper column; rows with floor area + present are never touched (the psm guard covers them). + """ + # District = sector minus the trailing sector digit group, matching the + # rsplit semantics of utils.hierarchy_keys ("SW1W 9" -> "SW1W"). + district = pl.col("_sector").str.replace(r"\s+\d+$", "") + + district_p99 = ( + df.lazy() + .filter( + pl.col("Last known price").is_not_null(), + pl.col("Date of last transaction").dt.year() + >= CURRENT_YEAR - FLOORLESS_P99_LOOKBACK_YEARS, + ) + .group_by(district.alias("_district")) + .agg( + pl.col("Last known price") + .cast(pl.Float64) + .quantile(0.99) + .alias("_district_p99") + ) + .collect() + ) + + df = df.with_columns(district.alias("_district")).join( + district_p99, on="_district", how="left", maintain_order="left" + ) + + floorless = pl.col("Total floor area (sqm)").is_null() | ( + pl.col("Total floor area (sqm)") <= 0 + ) + cap = pl.max_horizontal( + FLOORLESS_ESTIMATE_P99_MULT * pl.col("_district_p99"), + pl.lit(FLOORLESS_ESTIMATE_MIN_CAP), + ) + implausible = ( + pl.col("Estimated current price").is_not_null() + & floorless + & pl.col("_district_p99").is_not_null() + & (pl.col("Estimated current price") > cap) + ) + + n_nulled = df.select(implausible.sum()).item() + print(f" Floorless-estimate guard: nulled {n_nulled:,} estimates") + + return df.with_columns( + pl.when(implausible) + .then(None) + .otherwise(pl.col("Estimated current price")) + .alias("Estimated current price"), + ).drop("_district", "_district_p99") + def guarded_blend_estimates( index_est: np.ndarray, @@ -249,9 +331,16 @@ def main(): .alias("Estimated current price"), ) + # Floor-area-less rows escape the per-sqm guard above entirely; cap them + # against their district's recent sale prices instead (see + # apply_floorless_estimate_guard). Must run before temp columns + # (_sector) are dropped. + df = apply_floorless_estimate_guard(df) + # Derive estimated price per sqm where both estimated price and floor area # exist. Now that the implausible-psm estimates are nulled above, the band - # filter here mainly guards the floor-area>0 case. + # filter here mainly guards the floor-area>0 case. (The floorless guard + # never touches floor-area-present rows, so this derivation is unaffected.) _est_psm = pl.col("Estimated current price") / pl.col("Total floor area (sqm)") df = df.with_columns( pl.when( diff --git a/pipeline/transform/price_estimation/index.py b/pipeline/transform/price_estimation/index.py index 829baa3..0994939 100644 --- a/pipeline/transform/price_estimation/index.py +++ b/pipeline/transform/price_estimation/index.py @@ -17,11 +17,13 @@ from scipy.sparse.linalg import lsqr from tqdm import tqdm from pipeline.transform.price_estimation.shrinkage import ( + MAX_STEP_DEVIATION_PER_YEAR, blend_dicts, hierarchical_shrinkage, lift_onto_parent, shrink_dicts, spatial_smooth, + winsorize_steps, ) from pipeline.transform.price_estimation.utils import ( CURRENT_YEAR, @@ -485,8 +487,20 @@ def build_index( input_path, min_year, max_year, max_sale_year=estimation_cap ) - # Precompute hierarchy - all_sectors = pairs["sector"].unique().to_list() + # Precompute hierarchy. The sector universe is the UNION of sectors with + # repeat-sale pairs and every sector in the postcode universe (centroids + # is keyed by every sector derived from postcode.parquet): a sector whose + # properties never resold still gets a full index row via the district -> + # area -> national fallback in hierarchical_shrinkage (then spatial + # smoothing and forward fill). Restricting the universe to pairs-only + # sectors silently dropped ~15% of live sectors from the output, nulling + # every per-sector lookup and estimate there. n_pairs = 0 marks the + # synthesised cells. + all_sectors = sorted(set(pairs["sector"].unique().to_list()) | set(centroids)) + if sectors is not None: + # Debug scoping restricts the universe too, not just the pairs. + scoped = set(sectors) + all_sectors = [s for s in all_sectors if s in scoped] sector_to_dist = {} dist_to_area = {} for s in all_sectors: @@ -562,10 +576,23 @@ def build_index( sector_shrunk, centroids, sector_n, blend_dicts ) - # Forward fill + # Winsorise per-year steps against the national index, then forward + # fill. The support-scaled smoothness prior still under-penalises + # years identified by 1-2 pairs in thin early histories (observed: + # x9.7 single-year jumps in city-centre regeneration sectors); + # clamping each step to within +/-MAX_STEP_DEVIATION_PER_YEAR of the + # national move over the same span removes those artefacts while + # leaving genuine sector-vs-national divergence (well inside the + # band) untouched. for sec in all_sectors: sector_smoothed[sec] = forward_fill( - sector_smoothed.get(sec, hedonic_idx), min_year, max_year + winsorize_steps( + sector_smoothed.get(sec, hedonic_idx), + national_shrunk, + MAX_STEP_DEVIATION_PER_YEAR, + ), + min_year, + max_year, ) final[tg] = sector_smoothed diff --git a/pipeline/transform/price_estimation/shrinkage.py b/pipeline/transform/price_estimation/shrinkage.py index a74d5e3..9efebe5 100644 --- a/pipeline/transform/price_estimation/shrinkage.py +++ b/pipeline/transform/price_estimation/shrinkage.py @@ -12,6 +12,18 @@ V = TypeVar("V") SPATIAL_NEIGHBORS = 5 SPATIAL_BLEND_K = 30 +# Hard band on a sector's per-year index move RELATIVE to its parent (the +# national index), enforced by winsorize_steps after spatial smoothing. The +# support-scaled temporal smoothness prior still under-penalises years +# identified by only 1-2 repeat-sale pairs in thin early histories, leaving +# artefacts like a x9.7 single-year jump (log +2.27, sector "M3 1" +# 1998->1999). A sector may genuinely outpace the nation -- regeneration, new +# transport links -- but those stories play out over multiple years, not as a +# one-year x9.7 step. +/-0.40 log/yr (~x1.5 in a year) relative to the +# national move keeps every plausible genuine sector-level divergence while +# clamping thin-year data artefacts. +MAX_STEP_DEVIATION_PER_YEAR = 0.40 + def _base_value(index: dict[int, float], base_year: int) -> float: """Value of an index dict at `base_year`, with forward/back-fill for gaps. @@ -75,6 +87,42 @@ def lift_onto_parent( return {y: v + offset for y, v in child.items()} +def winsorize_steps( + child: dict[int, float], + parent: dict[int, float], + max_dev_per_year: float, +) -> dict[int, float]: + """Clamp a child's per-year index steps to within a band of the parent's. + + For each consecutive pair of solved years (y_prev, y) the child's per-year + rate r = (child[y] - child[y_prev]) / (y - y_prev) is winsorised into + [p - max_dev_per_year, p + max_dev_per_year], where p is the parent's + per-year rate over the same span (via _base_value, so gaps in the parent's + coverage are forward/back-filled rather than crashing). The series is then + rebuilt cumulatively from the FIRST year's value, so: + - the first year's level is preserved; + - non-outlier steps are preserved exactly (later years simply shift by + whatever the clamped steps removed); + - a multi-year gap is judged on its per-year rate, not as one giant + single-year move, so genuine level changes across gaps survive. + + A child with <2 years has no steps to clamp; an empty parent only occurs + in degenerate paths (build_index always passes the national index) -- both + are returned unchanged. + """ + if len(child) < 2 or not parent: + return child + years = sorted(child) + result = {years[0]: child[years[0]]} + for y_prev, y in zip(years[:-1], years[1:]): + span = y - y_prev + r = (child[y] - child[y_prev]) / span + p = (_base_value(parent, y) - _base_value(parent, y_prev)) / span + r = min(max(r, p - max_dev_per_year), p + max_dev_per_year) + result[y] = result[y_prev] + r * span + return result + + def shrink_dicts(raw: dict, parent: dict, n: int) -> dict: """Shrink dict values toward parent using n/(n+k) weighting. diff --git a/pipeline/transform/price_estimation/test_index.py b/pipeline/transform/price_estimation/test_index.py index 51e9590..8a1938f 100644 --- a/pipeline/transform/price_estimation/test_index.py +++ b/pipeline/transform/price_estimation/test_index.py @@ -1,14 +1,18 @@ +from datetime import date + import numpy as np import polars as pl from pipeline.transform.price_estimation import index as index_mod from pipeline.transform.price_estimation.index import ( MAX_EXTRAPOLATION_SLOPE, + build_index, compute_indices_for_level, extract_pairs, forward_fill, solve_robust_index, ) +from pipeline.transform.price_estimation.utils import CURRENT_YEAR, TYPE_GROUPS def _pairs_from_path(true_levels: dict[int, float]): @@ -269,3 +273,82 @@ def test_n_pairs_counts_only_cross_year_pairs(): assert "g" in indices assert n_pairs["g"] == 8 # not 11 + + +def _write_universe_fixtures(tmp_path): + """Properties with repeat sales only in sector 'AB1 2', plus a postcode + universe that also contains the pairless sector 'AB1 3'.""" + props = pl.DataFrame( + { + "Postcode": [f"AB1 2A{c}" for c in "ABCDEF"], + "Property type": ["Detached"] * 6, + "Total floor area (sqm)": [80.0] * 6, + "Last known price": [130_000] * 6, + "Date of last transaction": [date(2021, 6, 1)] * 6, + # 6 repeat-sale pairs 2018 -> 2021, log_ratio ~0.26 (well within + # the flat and annualised outlier caps), comfortably >= MIN_PAIRS. + "historical_prices": [ + [ + {"year": 2018, "month": 1, "price": 100_000}, + {"year": 2021, "month": 6, "price": 130_000}, + ] + ] + * 6, + } + ) + props_path = tmp_path / "props.parquet" + props.write_parquet(props_path) + + postcodes = pl.DataFrame( + { + "Postcode": ["AB1 2AA", "AB1 2AB", "AB1 3AA"], + "lat": [57.10, 57.10, 57.20], + "lon": [-2.10, -2.10, -2.20], + } + ) + pc_path = tmp_path / "postcodes.parquet" + postcodes.write_parquet(pc_path) + return props_path, pc_path + + +def test_build_index_covers_pairless_sectors_from_postcode_universe(tmp_path): + """FIX: the sector universe is pairs-sectors UNION postcode-universe + sectors, not just sectors that happened to have a repeat sale (which + silently dropped ~15% of live sectors from the output). A pairless sector + present in postcode.parquet must get index rows via the hierarchy + fallback: n_pairs == 0 marks the synthesised cells, with full year + coverage after forward fill.""" + props_path, pc_path = _write_universe_fixtures(tmp_path) + + result = build_index(props_path, postcodes_path=pc_path) + + pairless = result.filter(pl.col("sector") == "AB1 3") + assert len(pairless) > 0 + assert set(pairless["type_group"]) == {"All", *TYPE_GROUPS} + assert pairless["n_pairs"].to_list() == [0] * len(pairless) + assert pairless["log_index"].is_not_null().all() + # Full year coverage (min pair year .. CURRENT_YEAR) for the solved type + # groups. (Type groups with ( without_lift[2024] - without_lift[2008] ) + 0.1 + + +def test_winsorize_clamps_thin_year_spike_and_shifts_later_years(): + """A "M3 1"-style single-year spike (x9.7, log +2.27) is clamped to + parent_rate + max_dev; the first year's level is preserved, and later + years keep their OWN steps (the tail shifts down rigidly by whatever the + clamped step removed).""" + child = {1995: 0.0, 1998: 0.2, 1999: 2.47, 2000: 2.5} + parent = {y: 0.1 * (y - 1995) for y in range(1995, 2001)} # flat-ish 0.1/yr + + out = winsorize_steps(child, parent, MAX_STEP_DEVIATION_PER_YEAR) + + assert out[1995] == child[1995] # first year preserved + # 1995->1998: 0.0667/yr, well within 0.1 +/- 0.40 -> untouched. + assert abs(out[1998] - child[1998]) < 1e-12 + # 1998->1999: 2.27/yr clamped to parent_rate + max_dev = 0.1 + 0.40. + assert abs((out[1999] - out[1998]) - (0.1 + MAX_STEP_DEVIATION_PER_YEAR)) < 1e-12 + # 1999->2000: the in-band +0.03 step survives; the level shifts down with + # the clamped 1999. + assert abs((out[2000] - out[1999]) - (child[2000] - child[1999])) < 1e-12 + assert abs(out[2000] - 0.73) < 1e-12 + + +def test_winsorize_preserves_genuine_moves(): + """Steps within parent_rate +/- max_dev pass through (numerically) unchanged.""" + child = {2000: 0.0, 2001: 0.35, 2002: 0.40, 2003: 0.20} + parent = {y: 0.05 * (y - 2000) for y in range(2000, 2004)} + + out = winsorize_steps(child, parent, MAX_STEP_DEVIATION_PER_YEAR) + + assert set(out) == set(child) + assert max(abs(out[y] - child[y]) for y in child) < 1e-12 + + +def test_winsorize_judges_gap_steps_on_per_year_rate(): + """A step across a multi-year gap is judged on its PER-YEAR rate (with + gap-tolerant parent lookup via _base_value), not as one giant single-year + move: +1.0 over 5 years (0.2/yr) is in-band even though +1.0 in one year + would be clamped.""" + child = {1995: 0.0, 2000: 1.0} + # Parent lacks both endpoint years: 1995 back-fills to its earliest value + # (0.0), 2000 forward-fills from 1999 (0.3) -> parent rate 0.06/yr. + parent = {1996: 0.0, 1999: 0.3} + + out = winsorize_steps(child, parent, MAX_STEP_DEVIATION_PER_YEAR) + + assert out == child + + +def test_winsorize_degenerate_inputs_unchanged(): + """<2 child years -> no steps to clamp; an empty parent only occurs in + degenerate paths (build_index always passes the national index) -> child + is returned unchanged, never clamped against an arbitrary rate.""" + assert winsorize_steps({}, {2000: 0.0, 2001: 0.1}, 0.4) == {} + assert winsorize_steps({2000: 0.5}, {2000: 0.0, 2001: 0.1}, 0.4) == {2000: 0.5} + spiky = {2000: 0.0, 2001: 5.0} + assert winsorize_steps(spiky, {}, 0.4) == spiky diff --git a/pipeline/transform/test_crime_spatial.py b/pipeline/transform/test_crime_spatial.py index 0021242..22d6f6c 100644 --- a/pipeline/transform/test_crime_spatial.py +++ b/pipeline/transform/test_crime_spatial.py @@ -47,11 +47,22 @@ def _crime_row(month: str, x, y, crime_type: str) -> str: return f",{month},F,F,{lon},{lat},On or near X,E01000001,L,{crime_type},U," -def _write_month(crime_dir, month: str, rows: list[str]) -> None: +def _write_month( + crime_dir, month: str, rows: list[str], force: str = "test-force" +) -> None: + """Write one force's monthly CSV; an empty ``rows`` list still creates the + file, which counts as published coverage for that (force, month).""" month_dir = crime_dir / month - month_dir.mkdir(parents=True) + month_dir.mkdir(parents=True, exist_ok=True) body = "\n".join([_CSV_HEADER, *rows]) + "\n" - (month_dir / f"{month}-test-force-street.csv").write_text(body) + (month_dir / f"{month}-{force}-street.csv").write_text(body) + + +def _run(tmp_path, crime, units, **kwargs): + output = tmp_path / "crime_by_postcode.parquet" + by_year = tmp_path / "crime_by_postcode_by_year.parquet" + transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0, **kwargs) + return pl.read_parquet(output), pl.read_parquet(by_year) def test_buffer_overlap_counts_for_each_postcode(tmp_path): @@ -84,18 +95,9 @@ def test_buffer_overlap_counts_for_each_postcode(tmp_path): ], ) - output = tmp_path / "crime_by_postcode.parquet" - by_year = tmp_path / "crime_by_postcode_by_year.parquet" - # Pin the 50m buffer the geometry above was designed around (the production - # default is now 100m). The three squares are equal-area, so area - # normalisation leaves the counts unchanged. - transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) - - rows = { - r["postcode"]: r - for r in pl.read_parquet(output).to_dicts() - } - # Single month -> annualised x12. + avg_df, _ = _run(tmp_path, crime, units) + rows = {r["postcode"]: r for r in avg_df.to_dicts()} + # Single covered month -> pooled rate x12. assert rows["AB1 1AA"]["Burglary (avg/yr)"] == 12.0 assert rows["AB1 1AB"]["Burglary (avg/yr)"] == 12.0 assert rows["AB1 1AA"]["Robbery (avg/yr)"] == 0.0 @@ -132,18 +134,14 @@ def test_by_year_annualises_and_rolls_up(tmp_path): ], ) - output = tmp_path / "crime_by_postcode.parquet" - by_year = tmp_path / "crime_by_postcode_by_year.parquet" - transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) - - by_year_df = pl.read_parquet(by_year) + _, by_year_df = _run(tmp_path, crime, units, min_bar_months=1) assert by_year_df.height == 1 cols = set(by_year_df.columns) assert {"Burglary (by year)", "Serious crime (by year)", "Minor crime (by year)"} <= cols row = by_year_df.row(0, named=True) burglary = sorted(row["Burglary (by year)"], key=lambda r: r["year"]) - # 2023: 1 burglary in 1 month -> 12/yr; 2024: 2 in 2 months -> 12/yr. + # 2023: 1 burglary in 1 covered month -> 12/yr; 2024: 2 in 2 months -> 12/yr. assert burglary == [ {"year": 2023, "count": 12.0}, {"year": 2024, "count": 12.0}, @@ -152,6 +150,9 @@ def test_by_year_annualises_and_rolls_up(tmp_path): # 2023 serious = Burglary(12) + Robbery(12) = 24; 2024 = Burglary(12). assert serious[2023] == 24.0 assert serious[2024] == 12.0 + # Coverage calendar: both years published, with their month counts. + coverage = {c["year"]: c["months"] for c in row["covered_years"]} + assert coverage == {2023: 1, 2024: 2} def test_area_normalisation_divides_out_buffered_catchment(tmp_path): @@ -184,9 +185,7 @@ def test_area_normalisation_divides_out_buffered_catchment(tmp_path): ], ) - output = tmp_path / "crime_by_postcode.parquet" - by_year = tmp_path / "crime_by_postcode_by_year.parquet" - transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) + avg_df, by_year_df = _run(tmp_path, crime, units, min_bar_months=1) # Re-derive the expected values from the same buffered catchment areas: each # postcode is 12/yr before normalisation, then x (median_buf / buffered_area). @@ -198,7 +197,7 @@ def test_area_normalisation_divides_out_buffered_catchment(tmp_path): median_buf = float(np.median(list(buf_area.values()))) expected = {pc: 12.0 * median_buf / buf_area[pc] for pc in buf_area} - rows = {r["postcode"]: r for r in pl.read_parquet(output).to_dicts()} + rows = {r["postcode"]: r for r in avg_df.to_dicts()} for pc, exp in expected.items(): assert rows[pc]["Burglary (avg/yr)"] == pytest.approx(exp, abs=0.1) @@ -211,18 +210,17 @@ def test_area_normalisation_divides_out_buffered_catchment(tmp_path): assert small / big < 1.5 # by-year series carries the same normalisation. - by_year_df = pl.read_parquet(by_year) small_row = by_year_df.filter(pl.col("postcode") == "AB1 1AA").row(0, named=True) assert small_row["Burglary (by year)"] == [ {"year": 2024, "count": pytest.approx(expected["AB1 1AA"], abs=0.1)} ] -def test_avg_yr_is_simple_mean_of_year_bars(tmp_path): - # Uneven month coverage across years: 2023 has 1 month (2 incidents -> 24/yr), - # 2024 has 2 months (2 incidents -> 12/yr). The headline must be the *simple* - # mean of the bars (24+12)/2 = 18, not the month-weighted pooled rate - # (4 incidents / 3 months * 12 = 16). +def test_avg_yr_is_pooled_rate_over_covered_months(tmp_path): + # Uneven month coverage across years: 2023 has 1 month (2 incidents), + # 2024 has 2 months (2 incidents). The headline is the POOLED annualised + # rate over all covered months: 4 incidents / 3 months * 12 = 16/yr -- not + # the old mean-of-bars (24+12)/2 = 18, which over-weighted thin years. units = tmp_path / "units" _write_boundaries( units, {"AB1": [_square_feature("AB1 1AA", 1000, 1000, 1010, 1010)]} @@ -240,68 +238,179 @@ def test_avg_yr_is_simple_mean_of_year_bars(tmp_path): _write_month(crime, "2024-01", [_crime_row("2024-01", 1005, 1005, "Burglary")]) _write_month(crime, "2024-02", [_crime_row("2024-02", 1005, 1005, "Burglary")]) - output = tmp_path / "crime_by_postcode.parquet" - by_year = tmp_path / "crime_by_postcode_by_year.parquet" - transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) + avg_df, by_year_df = _run(tmp_path, crime, units, min_bar_months=1) - avg = pl.read_parquet(output).row(0, named=True) - assert avg["Burglary (avg/yr)"] == pytest.approx(18.0, abs=0.05) + avg = avg_df.row(0, named=True) + assert avg["Burglary (avg/yr)"] == pytest.approx(16.0, abs=0.05) - row = pl.read_parquet(by_year).row(0, named=True) + # Bars remain per-year annualised: 2023 -> 24/yr (x12), 2024 -> 12/yr (x6). + row = by_year_df.row(0, named=True) bars = {p["year"]: p["count"] for p in row["Burglary (by year)"]} assert bars == {2023: pytest.approx(24.0, abs=0.05), 2024: pytest.approx(12.0, abs=0.05)} -def test_serious_rollup_avg_yr_equals_sum_of_components(tmp_path): - # Two SERIOUS types occur in DISJOINT years for one postcode: Burglary only in - # 2014, Robbery only in 2024 (each a single full month -> 12/yr). The headline - # "Serious crime (avg/yr)" must equal the SUM of its component (avg/yr) columns - # (Burglary 12 + Robbery 12 = 24), so the rollup is always the sum of the parts - # shown beside it and can never fall below a single component. (The previous - # union-years-present mean would have divided the per-year serious total by the - # 2 years any serious type occurred, giving a misleading 12 that sits below - # both the burglary and robbery rollup contributions.) +def test_sporadic_type_is_not_inflated_by_years_present(tmp_path): + # A single robbery in a 24-covered-month window must read as ~0.5/yr (the + # long-run pooled rate), NOT 12/yr (the old years-with-incidents mean that + # inflated sporadic categories by up to ~15x). units = tmp_path / "units" _write_boundaries( units, {"AB1": [_square_feature("AB1 1AA", 1000, 1000, 1010, 1010)]} ) crime = tmp_path / "crime" - _write_month(crime, "2014-01", [_crime_row("2014-01", 1005, 1005, "Burglary")]) - _write_month(crime, "2024-01", [_crime_row("2024-01", 1005, 1005, "Robbery")]) + for year in (2023, 2024): + for month in range(1, 13): + rows = [] + if (year, month) == (2023, 6): + rows = [_crime_row(f"{year}-{month:02d}", 1005, 1005, "Robbery")] + _write_month(crime, f"{year}-{month:02d}", rows) - output = tmp_path / "crime_by_postcode.parquet" - by_year = tmp_path / "crime_by_postcode_by_year.parquet" - transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) + avg_df, by_year_df = _run(tmp_path, crime, units) - avg = pl.read_parquet(output).row(0, named=True) - assert "Serious crime (avg/yr)" in avg - assert avg["Burglary (avg/yr)"] == pytest.approx(12.0, abs=0.05) - assert avg["Robbery (avg/yr)"] == pytest.approx(12.0, abs=0.05) - # Rollup == sum of its component (avg/yr) columns. - assert avg["Serious crime (avg/yr)"] == pytest.approx(24.0, abs=0.05) - assert avg["Serious crime (avg/yr)"] == pytest.approx( - avg["Burglary (avg/yr)"] + avg["Robbery (avg/yr)"], abs=0.05 + avg = avg_df.row(0, named=True) + # 1 incident over 24 covered months -> 0.5/yr. + assert avg["Robbery (avg/yr)"] == pytest.approx(0.5, abs=0.05) + # The by-year bar still shows the 2023 incident annualised over 12 covered + # months (1/yr); 2024 is covered with zero robberies -> no bar, but the + # year IS in the coverage list so consumers may render it as a true zero. + row = by_year_df.row(0, named=True) + bars = {p["year"]: p["count"] for p in row["Robbery (by year)"]} + assert bars == {2023: pytest.approx(1.0, abs=0.05)} + coverage = {c["year"]: c["months"] for c in row["covered_years"]} + assert coverage == {2023: 12, 2024: 12} + + +def test_force_gap_years_are_excluded_not_zeroed(tmp_path): + # Two postcodes policed by different forces. force-a publishes 2023+2024; + # force-b publishes only 2023 (a 2024 gap, like Greater Manchester). The + # b-postcode's headline must pool over force-b's 12 covered months only, + # and its by-year series must NOT contain a 2024 bar or coverage entry. + units = tmp_path / "units" + _write_boundaries( + units, + { + "AB1": [_square_feature("AB1 1AA", 1000, 1000, 1010, 1010)], + "CD1": [_square_feature("CD1 1AA", 9000, 9000, 9010, 9010)], + }, ) - # The by-year rollup series remains the per-year sum of the component bars. - serious_bars = { - p["year"]: p["count"] - for p in pl.read_parquet(by_year).row(0, named=True)["Serious crime (by year)"] - } - assert serious_bars == { - 2014: pytest.approx(12.0, abs=0.05), - 2024: pytest.approx(12.0, abs=0.05), - } + crime = tmp_path / "crime" + for month in range(1, 13): + ym23 = f"2023-{month:02d}" + ym24 = f"2024-{month:02d}" + # force-a covers AB1 in both years; one burglary per month in 2024. + _write_month(crime, ym23, [], force="force-a") + _write_month( + crime, ym24, [_crime_row(ym24, 1005, 1005, "Burglary")], force="force-a" + ) + # force-b covers CD1 in 2023 only: one burglary per month. + _write_month( + crime, ym23, [_crime_row(ym23, 9005, 9005, "Burglary")], force="force-b" + ) + + avg_df, by_year_df = _run(tmp_path, crime, units) + rows = {r["postcode"]: r for r in avg_df.to_dicts()} + + # force-a postcode: 12 burglaries over 24 covered months -> 6/yr. + assert rows["AB1 1AA"]["Burglary (avg/yr)"] == pytest.approx(6.0, abs=0.05) + # force-b postcode: 12 burglaries over 12 covered months -> 12/yr. Under + # the old global calendar this would have been diluted to 6/yr by the + # uncovered 2024. + assert rows["CD1 1AA"]["Burglary (avg/yr)"] == pytest.approx(12.0, abs=0.05) + + by_rows = {r["postcode"]: r for r in by_year_df.to_dicts()} + b_coverage = {c["year"]: c["months"] for c in by_rows["CD1 1AA"]["covered_years"]} + assert b_coverage == {2023: 12} + b_bars = {p["year"]: p["count"] for p in by_rows["CD1 1AA"]["Burglary (by year)"]} + assert set(b_bars) == {2023} + a_coverage = {c["year"]: c["months"] for c in by_rows["AB1 1AA"]["covered_years"]} + assert a_coverage == {2023: 12, 2024: 12} -def test_avg_yr_denominator_is_per_postcode_not_global(tmp_path): - # P (AB1 1AA) has burglaries only in its single most-recent year (2024); Q - # (AB1 1AB), far away, has a burglary in 2014. The type therefore spans TWO - # distinct years across all postcodes, but only ONE year for P. The headline - # must divide by P's own years-present (1), equalling its single by-year bar - # (24/yr) -- not by the global span (2), which would deflate it to 12/yr. - # The two squares are equal-area, so area normalisation leaves counts as-is. +def test_residue_incidents_in_uncovered_years_are_excluded(tmp_path): + # force-b stops publishing after 2023, but a force-a file contains a 2024 + # incident that falls inside the b-postcode's buffer (cross-border residue, + # the Greater Manchester pattern). That incident must not produce a 2024 + # bar for the b-postcode, nor leak into its pooled headline. + units = tmp_path / "units" + _write_boundaries( + units, + { + "AB1": [_square_feature("AB1 1AA", 1000, 1000, 1010, 1010)], + "CD1": [_square_feature("CD1 1AA", 9000, 9000, 9010, 9010)], + }, + ) + + crime = tmp_path / "crime" + for month in range(1, 13): + ym23 = f"2023-{month:02d}" + ym24 = f"2024-{month:02d}" + _write_month(crime, ym23, [], force="force-a") + # b's own 2023 incidents establish force-b as its home force. + _write_month( + crime, + ym23, + [_crime_row(ym23, 9005, 9005, "Burglary")] if month <= 6 else [], + force="force-b", + ) + # 2024: only force-a publishes; one of its incidents lands in CD1 1AA. + _write_month( + crime, + ym24, + [_crime_row(ym24, 9005, 9005, "Burglary")] if month == 1 else [], + force="force-a", + ) + + avg_df, by_year_df = _run(tmp_path, crime, units) + + b_row = avg_df.filter(pl.col("postcode") == "CD1 1AA").row(0, named=True) + # Pooled over force-b's 12 covered months (2023): 6 incidents -> 6/yr. + # The residue 2024 incident is excluded (force-b published 0 months in 2024). + assert b_row["Burglary (avg/yr)"] == pytest.approx(6.0, abs=0.05) + + b_by = by_year_df.filter(pl.col("postcode") == "CD1 1AA").row(0, named=True) + bars = {p["year"]: p["count"] for p in b_by["Burglary (by year)"]} + assert set(bars) == {2023} + coverage = {c["year"]: c["months"] for c in b_by["covered_years"]} + assert coverage == {2023: 12} + + +def test_partial_years_below_min_bar_months_get_no_bar(tmp_path): + # 2023 fully covered; 2024 has only 2 published months. With the default + # 6-month minimum, 2024 must produce neither a bar (annualising x6 charts + # noise) nor a coverage entry -- but its incidents and months still count + # toward the pooled headline. + units = tmp_path / "units" + _write_boundaries( + units, {"AB1": [_square_feature("AB1 1AA", 1000, 1000, 1010, 1010)]} + ) + + crime = tmp_path / "crime" + for month in range(1, 13): + ym = f"2023-{month:02d}" + _write_month(crime, ym, [_crime_row(ym, 1005, 1005, "Burglary")]) + for month in (1, 2): + ym = f"2024-{month:02d}" + _write_month(crime, ym, [_crime_row(ym, 1005, 1005, "Burglary")]) + + avg_df, by_year_df = _run(tmp_path, crime, units) + + # Pooled: 14 incidents over 14 covered months -> 12/yr. + assert avg_df.row(0, named=True)["Burglary (avg/yr)"] == pytest.approx( + 12.0, abs=0.05 + ) + row = by_year_df.row(0, named=True) + bars = {p["year"]: p["count"] for p in row["Burglary (by year)"]} + assert set(bars) == {2023} + coverage = {c["year"]: c["months"] for c in row["covered_years"]} + assert coverage == {2023: 12} + + +def test_by_year_output_is_dense_with_coverage(tmp_path): + # A postcode with zero incidents still gets a by-year row carrying its + # coverage calendar, so "covered and crime-free" is distinguishable from + # "no data" downstream. units = tmp_path / "units" _write_boundaries( units, @@ -314,42 +423,52 @@ def test_avg_yr_denominator_is_per_postcode_not_global(tmp_path): ) crime = tmp_path / "crime" - # P: 2 burglaries in a single 2024 month -> 24/yr bar, present in 1 year. - _write_month( - crime, - "2024-01", - [ - _crime_row("2024-01", 1005, 1005, "Burglary"), - _crime_row("2024-01", 1005, 1005, "Burglary"), - ], + _write_month(crime, "2024-01", [_crime_row("2024-01", 1005, 1005, "Burglary")]) + + avg_df, by_year_df = _run(tmp_path, crime, units, min_bar_months=1) + assert by_year_df.height == 2 + + quiet = by_year_df.filter(pl.col("postcode") == "AB1 1AB").row(0, named=True) + assert quiet["Burglary (by year)"] is None + assert [c["year"] for c in quiet["covered_years"]] == [2024] + # And the headline for the quiet postcode is a genuine 0, not null. + quiet_avg = avg_df.filter(pl.col("postcode") == "AB1 1AB").row(0, named=True) + assert quiet_avg["Burglary (avg/yr)"] == 0.0 + + +def test_serious_rollup_avg_yr_equals_sum_of_components(tmp_path): + # Burglary only in 2014, Robbery only in 2024 (one incident each, 2 covered + # months total). Components pool over the same covered window (each + # 1 x 12 / 2 = 6/yr) and the rollup equals their sum. + units = tmp_path / "units" + _write_boundaries( + units, {"AB1": [_square_feature("AB1 1AA", 1000, 1000, 1010, 1010)]} ) - # Q: 1 burglary in a far-back 2014 month -> widens the type's global span to - # two years without adding any incident to P. - _write_month(crime, "2014-01", [_crime_row("2014-01", 5005, 5005, "Burglary")]) - output = tmp_path / "crime_by_postcode.parquet" - by_year = tmp_path / "crime_by_postcode_by_year.parquet" - transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) + crime = tmp_path / "crime" + _write_month(crime, "2014-01", [_crime_row("2014-01", 1005, 1005, "Burglary")]) + _write_month(crime, "2024-01", [_crime_row("2024-01", 1005, 1005, "Robbery")]) - rows = {r["postcode"]: r for r in pl.read_parquet(output).to_dicts()} - by_year_rows = { - r["postcode"]: r for r in pl.read_parquet(by_year).to_dicts() + avg_df, by_year_df = _run(tmp_path, crime, units, min_bar_months=1) + + avg = avg_df.row(0, named=True) + assert avg["Burglary (avg/yr)"] == pytest.approx(6.0, abs=0.05) + assert avg["Robbery (avg/yr)"] == pytest.approx(6.0, abs=0.05) + # Rollup == sum of its component (avg/yr) columns. + assert avg["Serious crime (avg/yr)"] == pytest.approx(12.0, abs=0.05) + assert avg["Serious crime (avg/yr)"] == pytest.approx( + avg["Burglary (avg/yr)"] + avg["Robbery (avg/yr)"], abs=0.05 + ) + + # The by-year rollup series remains the per-year sum of the component bars. + serious_bars = { + p["year"]: p["count"] + for p in by_year_df.row(0, named=True)["Serious crime (by year)"] + } + assert serious_bars == { + 2014: pytest.approx(12.0, abs=0.05), + 2024: pytest.approx(12.0, abs=0.05), } - - # P's headline equals the simple mean of its own bars (just the 2024 bar). - p_bars = {p["year"]: p["count"] for p in by_year_rows["AB1 1AA"]["Burglary (by year)"]} - assert p_bars == {2024: pytest.approx(24.0, abs=0.05)} - # Per-postcode denominator (1) -> 24.0. The old global denominator (2 years - # across all postcodes) would have deflated this to 12.0. - assert rows["AB1 1AA"]["Burglary (avg/yr)"] == pytest.approx(24.0, abs=0.05) - assert rows["AB1 1AA"]["Burglary (avg/yr)"] == pytest.approx( - sum(p_bars.values()) / len(p_bars), abs=0.05 - ) - - # Q likewise: its sole 2014 bar -> 12/yr, divided by its own 1 year = 12.0. - q_bars = {p["year"]: p["count"] for p in by_year_rows["AB1 1AB"]["Burglary (by year)"]} - assert q_bars == {2014: pytest.approx(12.0, abs=0.05)} - assert rows["AB1 1AB"]["Burglary (avg/yr)"] == pytest.approx(12.0, abs=0.05) def test_unknown_crime_type_is_dropped_with_warning(tmp_path, capsys): @@ -368,11 +487,8 @@ def test_unknown_crime_type_is_dropped_with_warning(tmp_path, capsys): ], ) - output = tmp_path / "crime_by_postcode.parquet" - by_year = tmp_path / "crime_by_postcode_by_year.parquet" - transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) - - columns = pl.read_parquet(output).columns + avg_df, _ = _run(tmp_path, crime, units) + columns = avg_df.columns # The unknown type is dropped (no column for it) but a warning is emitted. assert "Cyber fraud (avg/yr)" not in columns assert "Burglary (avg/yr)" in columns @@ -399,16 +515,13 @@ def test_legacy_crime_types_are_mapped(tmp_path): ], ) - output = tmp_path / "crime_by_postcode.parquet" - by_year = tmp_path / "crime_by_postcode_by_year.parquet" - transform_crime_spatial(crime, units, output, by_year, buffer_m=50.0) - - row = pl.read_parquet(output).to_dicts()[0] - # Single postcode -> area-norm factor 1.0; single month/year -> x12. + avg_df, by_year_df = _run(tmp_path, crime, units, min_bar_months=1) + row = avg_df.to_dicts()[0] + # Single postcode -> area-norm factor 1.0; single covered month -> x12. assert row["Violence and sexual offences (avg/yr)"] == 12.0 assert row["Public order (avg/yr)"] == 12.0 - by_year_row = pl.read_parquet(by_year).row(0, named=True) + by_year_row = by_year_df.row(0, named=True) assert by_year_row["Violence and sexual offences (by year)"] == [ {"year": 2013, "count": 12.0} ] diff --git a/pipeline/transform/test_join_epc_pp.py b/pipeline/transform/test_join_epc_pp.py index b8d4dcd..ecd3d8f 100644 --- a/pipeline/transform/test_join_epc_pp.py +++ b/pipeline/transform/test_join_epc_pp.py @@ -11,6 +11,7 @@ from pipeline.transform.join_epc_pp import ( _join_address_parts, _run, _scan_epc_certificates, + flag_price_outliers, ) @@ -261,6 +262,9 @@ def test_run_joins_domestic_zip_with_price_paid(tmp_path: Path): ] assert df.get_column("renovation_history").list.len().to_list() == [1] assert df.get_column("historical_prices").list.len().to_list() == [2] + # Audit trail: the accepted fuzzy match's score is published (100 = exact + # post-normalisation address match). + assert df.get_column("epc_match_score").to_list() == [100] def test_run_dedup_prefers_valid_dated_cert_over_garbled_date(tmp_path: Path): @@ -395,12 +399,15 @@ def test_run_does_not_attach_epc_facts_to_low_score_address_match(tmp_path: Path "epc_address", "total_floor_area", "current_energy_rating", + "epc_match_score", ).to_dicts() == [ { "pp_address": "1 Example Street", "epc_address": None, "total_floor_area": None, "current_energy_rating": None, + # No accepted match -> no score. + "epc_match_score": None, } ] @@ -537,6 +544,222 @@ def test_run_keeps_sale_above_lowered_min_price(tmp_path: Path): assert df.get_column("latest_price").to_list() == [30_000] +def _write_epc_zip(zip_path: Path) -> None: + """Write a minimal domestic zip with the default certificate row.""" + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive: + csv_buffer = io.StringIO() + writer = csv.DictWriter(csv_buffer, fieldnames=EPC_SOURCE_COLUMNS) + writer.writeheader() + writer.writerow(_row()) + archive.writestr("certificates-2024.csv", csv_buffer.getvalue()) + + +def _price_paid_frame( + prices: list[int], + dates: list[date], + ppd_categories: list[str] | None = None, +) -> pl.DataFrame: + """One property ("1 Example Street, AA1 1AA") with the given sales.""" + count = len(prices) + return pl.DataFrame( + { + "price": prices, + "date_of_transfer": dates, + "property_type": ["T"] * count, + "postcode": ["AA1 1AA"] * count, + "paon": ["1"] * count, + "saon": [None] * count, + "street": ["Example Street"] * count, + "locality": [None] * count, + "town_city": ["Exampletown"] * count, + "duration": ["F"] * count, + "old_new": ["N"] * count, + "ppd_category": ppd_categories or ["A"] * count, + } + ) + + +def test_run_collapses_duplicate_transactions(tmp_path: Path): + # Price-paid lodges the same completed sale twice under distinct + # transaction ids; the duplicate must appear ONCE in historical_prices + # rather than double-counting the sale. + zip_path = tmp_path / "domestic-csv.zip" + _write_epc_zip(zip_path) + + price_paid_path = tmp_path / "price-paid.parquet" + _price_paid_frame( + prices=[200_000, 250_000, 250_000], + dates=[date(2020, 2, 3), date(2024, 2, 3), date(2024, 2, 3)], + ).write_parquet(price_paid_path) + + output_path = tmp_path / "epc-pp.parquet" + _run(zip_path, price_paid_path, output_path, tmp_path) + + df = pl.read_parquet(output_path) + + assert df.height == 1 + # The duplicated 250_000 sale collapses to one entry; two distinct sales. + assert df.get_column("historical_prices").to_list() == [ + [ + {"year": 2020, "month": 2, "price": 200_000}, + {"year": 2024, "month": 2, "price": 250_000}, + ] + ] + assert df.get_column("latest_price").to_list() == [250_000] + + +def test_run_excludes_implausible_price_jump_but_keeps_property(tmp_path: Path): + # The 13 QUICKSETTS HR2 7PP case: £140,000 in 2016 then "£207,500,000" in + # 2026 (clearly £207,500 with extra digits, lodged as category A). The + # garbage sale must vanish from latest_price / historical_prices while the + # property row itself survives on its genuine sale. + zip_path = tmp_path / "domestic-csv.zip" + _write_epc_zip(zip_path) + + price_paid_path = tmp_path / "price-paid.parquet" + _price_paid_frame( + prices=[140_000, 207_500_000], + dates=[date(2016, 6, 1), date(2026, 6, 1)], + ).write_parquet(price_paid_path) + + output_path = tmp_path / "epc-pp.parquet" + _run(zip_path, price_paid_path, output_path, tmp_path) + + df = pl.read_parquet(output_path) + + assert df.height == 1 + assert df.get_column("latest_price").to_list() == [140_000] + assert df.get_column("historical_prices").to_list() == [ + [{"year": 2016, "month": 6, "price": 140_000}] + ] + + +def test_run_keeps_genuine_long_horizon_appreciation(tmp_path: Path): + # x30 over 31 years is extreme but genuine (prime-London territory); the + # time-aware threshold (12 * 1.10**31 ≈ 230) must leave it untouched. + zip_path = tmp_path / "domestic-csv.zip" + _write_epc_zip(zip_path) + + price_paid_path = tmp_path / "price-paid.parquet" + _price_paid_frame( + prices=[20_000, 600_000], + dates=[date(1995, 3, 1), date(2026, 3, 1)], + ).write_parquet(price_paid_path) + + output_path = tmp_path / "epc-pp.parquet" + _run(zip_path, price_paid_path, output_path, tmp_path) + + df = pl.read_parquet(output_path) + + assert df.height == 1 + assert df.get_column("historical_prices").list.len().to_list() == [2] + assert df.get_column("latest_price").to_list() == [600_000] + + +def test_run_keeps_right_to_buy_style_jump(tmp_path: Path): + # A x12 jump on a cheap property (discounted right-to-buy purchase then an + # open-market resale) is legitimate; the JUMP_MIN_PRICE floor keeps such + # sales safe from the jump guard. + zip_path = tmp_path / "domestic-csv.zip" + _write_epc_zip(zip_path) + + price_paid_path = tmp_path / "price-paid.parquet" + _price_paid_frame( + prices=[15_000, 180_000], + dates=[date(1998, 5, 1), date(2003, 5, 1)], + ).write_parquet(price_paid_path) + + output_path = tmp_path / "epc-pp.parquet" + _run(zip_path, price_paid_path, output_path, tmp_path) + + df = pl.read_parquet(output_path) + + assert df.height == 1 + assert df.get_column("historical_prices").list.len().to_list() == [2] + assert df.get_column("latest_price").to_list() == [180_000] + + +def _slim_sales(rows: list[tuple[str, date, int]]) -> pl.DataFrame: + return pl.DataFrame( + { + "_pp_group_address": [address for address, _, _ in rows], + "_pp_group_postcode": ["AA11AA"] * len(rows), + "date_of_transfer": [transfer_date for _, transfer_date, _ in rows], + "price": [price for _, _, price in rows], + } + ) + + +def test_flag_price_outliers_up_rule_flags_spike_after_normal_sale(): + # x1,482 over 10 years against a threshold of 12 * 1.10**10 ≈ 31: the + # HIGHER sale is flagged, the genuine earlier sale is left alone. + outliers = flag_price_outliers( + _slim_sales( + [ + ("13 QUICKSETTS", date(2016, 6, 1), 140_000), + ("13 QUICKSETTS", date(2026, 6, 1), 207_500_000), + ] + ) + ) + + assert outliers.to_dicts() == [ + { + "_pp_group_address": "13 QUICKSETTS", + "_pp_group_postcode": "AA11AA", + "date_of_transfer": date(2026, 6, 1), + "price": 207_500_000, + "_price_outlier": True, + } + ] + + +def test_flag_price_outliers_down_rule_flags_spike_before_normal_sale(): + # The garbage sale comes FIRST, so it has no previous sale to compare + # against; the down rule (next sale collapses to under 1/threshold of this + # one) must catch it instead. + outliers = flag_price_outliers( + _slim_sales( + [ + ("5 EXAMPLE ROAD", date(2016, 6, 1), 250_000_000), + ("5 EXAMPLE ROAD", date(2017, 6, 1), 140_000), + ] + ) + ) + + assert outliers.get_column("price").to_list() == [250_000_000] + + +def test_flag_price_outliers_min_price_floor_protects_cheap_properties(): + # x40 in under six months exceeds the relative threshold (~12.6 at the + # half-year floor), but the flagged price (600k) is below JUMP_MIN_PRICE, + # so nothing is flagged: the absolute floor is load-bearing here. + outliers = flag_price_outliers( + _slim_sales( + [ + ("9 CHEAP STREET", date(2000, 1, 1), 15_000), + ("9 CHEAP STREET", date(2000, 6, 1), 600_000), + ] + ) + ) + + assert outliers.height == 0 + + +def test_flag_price_outliers_spares_expensive_long_horizon_growth(): + # x30 over 31 years on a now-£4.5M property clears the £2M floor but stays + # under the time-aware threshold (12 * 1.10**31 ≈ 230): not flagged. + outliers = flag_price_outliers( + _slim_sales( + [ + ("1 PRIME PLACE", date(1995, 1, 1), 150_000), + ("1 PRIME PLACE", date(2026, 1, 1), 4_500_000), + ] + ) + ) + + assert outliers.height == 0 + + def test_epc_band_to_year_uses_midpoint_and_clamps(): import polars as pl diff --git a/pipeline/utils/fuzzy_join.py b/pipeline/utils/fuzzy_join.py index d638bc7..a6a057d 100644 --- a/pipeline/utils/fuzzy_join.py +++ b/pipeline/utils/fuzzy_join.py @@ -11,7 +11,12 @@ from tqdm import tqdm from pipeline.local_temp import local_tmp_dir -_NUMBER_RE = re.compile(r"\d+") +# A house-number token includes any letter suffix: 8A, 8B and plain 8 are +# three different properties on the same street, so digit-only extraction +# (which collapsed all three to "8") is not enough. Addresses are passed +# through normalize_address_key first, so tokens are uppercase and +# space-separated and [A-Z] suffices for the suffix. +_NUMBER_RE = re.compile(r"\d+[A-Z]?") _POSTCODE_RE = r"^[A-Z]{1,2}\d[A-Z\d]?\d[A-Z]{2}$" # A house number is a strong disambiguator, so a numbered, number-compatible # pair may match on a lower address-similarity score than a number-less one @@ -61,8 +66,10 @@ def fuzzy_join_on_postcode( columns (index, address, postcode) via projection pushdown, and the final join reads the remaining columns lazily. - Returns a LazyFrame with all left and right columns. Unmatched rows - have null right columns. + Returns a LazyFrame with all left and right columns, plus a + ``_match_score`` (UInt8) audit column holding the token_sort_ratio of + the accepted match (exact matches score 100). Unmatched rows have null + right columns and a null score. """ tmpdir = tempfile.mkdtemp(prefix="fuzzy_join_", dir=local_tmp_dir()) @@ -152,14 +159,17 @@ def fuzzy_join_on_postcode( # Sort descending by score so best matches are assigned first all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True) - matches: list[tuple[int, int]] = [] + # Keep the score alongside each accepted pair: it is emitted as the + # _match_score audit column so downstream consumers can distinguish + # exact (100) from looser fuzzy matches. + matches: list[tuple[int, int, int]] = [] # (left_idx, right_idx, score) matched_left: set[int] = set() matched_right: set[int] = set() - for _score, left_idx, right_idx in all_pairs: + for score, left_idx, right_idx in all_pairs: if left_idx in matched_left or right_idx in matched_right: continue - matches.append((left_idx, right_idx)) + matches.append((left_idx, right_idx, score)) matched_left.add(left_idx) matched_right.add(right_idx) @@ -171,6 +181,7 @@ def fuzzy_join_on_postcode( { "_left_idx": pl.Series([m[0] for m in matches], dtype=pl.UInt32), "_right_idx": pl.Series([m[1] for m in matches], dtype=pl.UInt32), + "_match_score": pl.Series([m[2] for m in matches], dtype=pl.UInt8), } ) else: @@ -178,6 +189,7 @@ def fuzzy_join_on_postcode( { "_left_idx": pl.Series([], dtype=pl.UInt32), "_right_idx": pl.Series([], dtype=pl.UInt32), + "_match_score": pl.Series([], dtype=pl.UInt8), } ) @@ -197,18 +209,26 @@ def fuzzy_join_on_postcode( def _numbers_compatible(a: str, b: str) -> bool: - """Check that numeric tokens (flat/house numbers) in the shorter set are a subset of the longer. + """Check that the number tokens (house/flat numbers, including any letter + suffix) of two addresses are IDENTICAL sets. - Returns False if one address has numbers and the other doesn't. + Equality, not subset: subset logic let "188 GREAT NORTH WAY" absorb + "FLAT 1 188 GREAT NORTH WAY" ({188} is a subset of {1, 188}), attaching a + single flat's EPC facts to the whole building — tens of thousands of + wrong-property matches. Likewise digit-only tokens made "8A" and "8B" + both look like {8} and match each other (and plain "8"). Precision over + recall: a pair whose two sources genuinely disagree on number tokens is + safer left unmatched. + + One side numbered, the other not -> incompatible. Neither numbered -> + compatible; such pairs are scored against the stricter no-numbers + threshold instead. """ nums_a = set(_NUMBER_RE.findall(a)) nums_b = set(_NUMBER_RE.findall(b)) - smaller, larger = ( - (nums_a, nums_b) if len(nums_a) <= len(nums_b) else (nums_b, nums_a) - ) - if not smaller and larger: - return False - return smaller.issubset(larger) + if not nums_a and not nums_b: + return True + return nums_a == nums_b def _score_bucket( diff --git a/pipeline/utils/test_fuzzy_join.py b/pipeline/utils/test_fuzzy_join.py index 28d0dfc..6d8f180 100644 --- a/pipeline/utils/test_fuzzy_join.py +++ b/pipeline/utils/test_fuzzy_join.py @@ -1,6 +1,7 @@ import polars as pl from pipeline.utils import fuzzy_join_on_postcode, normalize_postcode_key +from pipeline.utils.fuzzy_join import _numbers_compatible def test_fuzzy_join_on_postcode_matches_addresses_within_postcode(): @@ -219,6 +220,107 @@ def test_fuzzy_join_matches_high_score_number_less_pair(): assert result["right_address"].to_list() == ["THE OLD RECTORY"] +def test_numbers_compatible_treats_letter_suffix_as_part_of_the_number(): + # 8A, 8B and plain 8 are three different properties on the same street; + # digit-only extraction collapsed all three to {8} and let them match. + assert not _numbers_compatible("8A HIGH STREET", "8B HIGH STREET") + assert not _numbers_compatible("8A HIGH STREET", "8 HIGH STREET") + assert _numbers_compatible("8A HIGH STREET", "8A HIGH STREET") + + +def test_numbers_compatible_requires_equal_sets_not_subset(): + # Subset logic let the whole-building record "188 ..." absorb its flat + # "FLAT 1 188 ..." ({188} is a subset of {1, 188}); the sets must be equal. + assert not _numbers_compatible("FLAT 1 188 GREAT NORTH WAY", "188 GREAT NORTH WAY") + assert _numbers_compatible( + "FLAT 1 188 GREAT NORTH WAY", "188 GREAT NORTH WAY FLAT 1" + ) + + +def test_numbers_compatible_number_less_and_one_sided_pairs(): + # Neither side numbered -> compatible (gated by the stricter no-numbers + # score threshold instead); exactly one side numbered -> incompatible. + assert _numbers_compatible("ROSE COTTAGE", "ROSE COTTAGE") + assert not _numbers_compatible("ROSE COTTAGE", "8 HIGH STREET") + + +def test_fuzzy_join_rejects_wrong_letter_suffix_match(): + # End-to-end guard for the 8A/8B class of wrong-property matches: the only + # candidate in the postcode bucket differs solely in the number suffix, so + # the row must stay unmatched rather than borrow the neighbour's record. + left = pl.LazyFrame( + { + "left_address": ["8A High Street"], + "left_postcode": ["AB1 2CD"], + } + ) + right = pl.LazyFrame( + { + "right_address": ["8B High Street"], + "right_postcode": ["AB1 2CD"], + } + ) + + result = fuzzy_join_on_postcode( + left=left, + right=right, + left_address_col="left_address", + right_address_col="right_address", + left_postcode_col="left_postcode", + right_postcode_col="right_postcode", + ).collect() + + assert result["right_address"].to_list() == [None] + + +def test_fuzzy_join_emits_match_score_column(): + # The audit column carries the token_sort_ratio of the accepted match: + # 100 for an exact (post-normalisation) address match, the raw fuzzy score + # otherwise, and null for unmatched rows. + left = pl.LazyFrame( + { + "left_id": ["exact", "fuzzy", "unmatched"], + "left_address": [ + "10 High Street", + "10 Acacia Avenue", + "99 Other Road", + ], + "left_postcode": ["AB1 2CD", "EF3 4GH", "ZZ9 9ZZ"], + } + ) + right = pl.LazyFrame( + { + "right_address": [ + "10 HIGH STREET", + # Scores exactly 82 against "10 Acacia Avenue" (see + # test_fuzzy_join_matches_numbered_pair_at_baseline_threshold). + "Flat A, 10 Acacia Avenue", + ], + "right_postcode": ["AB1 2CD", "EF3 4GH"], + } + ) + + result = ( + fuzzy_join_on_postcode( + left=left, + right=right, + left_address_col="left_address", + right_address_col="right_address", + left_postcode_col="left_postcode", + right_postcode_col="right_postcode", + ) + .sort("left_id") + .collect() + ) + + assert result.schema["_match_score"] == pl.UInt8 + assert result.select("left_id", "_match_score").to_dicts() == [ + {"left_id": "exact", "_match_score": 100}, + {"left_id": "fuzzy", "_match_score": 82}, + {"left_id": "unmatched", "_match_score": None}, + ] + + def test_normalize_postcode_key_requires_full_postcode(): df = pl.DataFrame( { diff --git a/server-rs/src/data/crime_by_year.rs b/server-rs/src/data/crime_by_year.rs index ceff34a..b77eed6 100644 --- a/server-rs/src/data/crime_by_year.rs +++ b/server-rs/src/data/crime_by_year.rs @@ -17,6 +17,14 @@ use super::run_polars_io; /// (e.g. `"Burglary (by year)"`). Stripped to derive the display name. pub const BY_YEAR_SUFFIX: &str = " (by year)"; +/// Per-postcode police-force coverage calendar column: `list[struct{year, +/// months}]` of the years the postcode's home force published enough months. +/// police.uk has multi-year publication gaps for whole forces (e.g. Greater +/// Manchester 2019-07 onwards), and a missing year is *no data*, not zero +/// crime — consumers must exclude uncovered (postcode, year)s instead of +/// charting them as zeros. +pub const COVERAGE_COLUMN: &str = "covered_years"; + #[derive(Clone, Copy)] pub struct YearPoint { pub year: i32, @@ -37,6 +45,12 @@ pub struct CrimeByYearData { pub years_by_type: Vec>, /// Postcode → all available per-type series for that postcode. pub series_by_postcode: FxHashMap>, + /// Postcode → years its police force actually published data for (from + /// the `covered_years` column). An EMPTY vec means the postcode's crime + /// picture is unknown (force gap / unusable geometry) — it must not count + /// toward any year. A postcode ABSENT from this map (legacy parquet + /// without the column) is treated as covered for every year. + pub covered_years_by_postcode: FxHashMap>, } impl CrimeByYearData { @@ -165,9 +179,44 @@ impl CrimeByYearData { years_by_type.push(years_for_type.into_iter().collect()); } + // Force-coverage calendar (optional column: legacy parquets predate it; + // their postcodes are treated as fully covered). A row with an empty + // list is meaningful — zero covered years — so it IS inserted. + let mut covered_years_by_postcode: FxHashMap> = + FxHashMap::default(); + if let Ok(col) = df.column(COVERAGE_COLUMN) { + let list_ca = col + .list() + .with_context(|| format!("Column '{COVERAGE_COLUMN}' is not a list"))?; + for (row, postcode) in postcode_values.iter().enumerate().take(row_count) { + let Some(inner) = list_ca.get_as_series(row) else { + // Null coverage: treat as legacy/fully covered (skip). + continue; + }; + let mut years: Vec = Vec::with_capacity(inner.len()); + if !inner.is_empty() { + let structs = inner.struct_().with_context(|| { + format!("Inner of '{COVERAGE_COLUMN}' is not a struct") + })?; + let year_field = structs.field_by_name("year").with_context(|| { + format!("Missing 'year' field in '{COVERAGE_COLUMN}'") + })?; + for idx in 0..inner.len() { + match year_field.get(idx).ok() { + Some(AnyValue::Int32(y)) => years.push(y), + Some(AnyValue::Int64(y)) => years.push(y as i32), + _ => continue, + } + } + } + covered_years_by_postcode.insert(postcode.clone(), years); + } + } + info!( postcodes = series_by_postcode.len(), crime_types = crime_types.len(), + with_coverage = covered_years_by_postcode.len(), "Crime-by-year data loaded" ); @@ -175,6 +224,7 @@ impl CrimeByYearData { crime_types, years_by_type, series_by_postcode, + covered_years_by_postcode, }) } } diff --git a/server-rs/src/features.rs b/server-rs/src/features.rs index cb16b1c..51bacca 100644 --- a/server-rs/src/features.rs +++ b/server-rs/src/features.rs @@ -474,7 +474,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Aggregate of serious crime categories per year", - detail: "Sum of violence, robbery, burglary, and weapons possession per year within 50m of the postcode, counted from police.uk street-level crime points (anonymised, snapped to nearby map points). Provides a single serious crime metric.", + detail: "Sum of violence, robbery, burglary, and weapons possession per year near the postcode, counted from police.uk street-level crime points (anonymised, snapped to nearby map points). This is an area-normalised incident density for the surrounding streets, not a per-resident risk: busy commercial centres rank high however few people live there. Averaged over the months the local police force actually published data; known force gaps (e.g. Greater Manchester since mid-2019) are excluded rather than counted as zero crime.", source: "crime", prefix: "", suffix: "/yr", @@ -489,7 +489,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Aggregate of minor crime categories per year", - detail: "Sum of anti-social behaviour, shoplifting, bicycle theft, and other lower-severity crime per year within 50m of the postcode, counted from police.uk street-level crime points (anonymised, snapped to nearby map points). Provides a single minor crime metric.", + detail: "Sum of anti-social behaviour, shoplifting, bicycle theft, and other lower-severity crime per year near the postcode, counted from police.uk street-level crime points (anonymised, snapped to nearby map points). This is an area-normalised incident density for the surrounding streets, not a per-resident risk: busy commercial centres rank high however few people live there. Averaged over the months the local police force actually published data; known force gaps (e.g. Greater Manchester since mid-2019) are excluded rather than counted as zero crime.", source: "crime", prefix: "", suffix: "/yr", @@ -504,7 +504,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly violent and sexual offences in the area", - detail: "Average number of violence and sexual offences per year within 50m of the postcode, from police.uk street-level crime data. Includes assault, harassment, and sexual offences.", + detail: "Average number of violence and sexual offences per year near the postcode, from police.uk street-level crime data. Includes assault, harassment, and sexual offences.", source: "crime", prefix: "", suffix: "/yr", @@ -519,7 +519,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly burglary offences in the area", - detail: "Average number of burglary offences per year within 50m of the postcode, from police.uk street-level crime data. Includes residential and commercial burglary.", + detail: "Average number of burglary offences per year near the postcode, from police.uk street-level crime data. Includes residential and commercial burglary.", source: "crime", prefix: "", suffix: "/yr", @@ -534,7 +534,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly robbery offences in the area", - detail: "Average number of robbery offences per year within 50m of the postcode, from police.uk street-level crime data. Robbery involves theft with force or threat of force.", + detail: "Average number of robbery offences per year near the postcode, from police.uk street-level crime data. Robbery involves theft with force or threat of force.", source: "crime", prefix: "", suffix: "/yr", @@ -549,7 +549,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly vehicle crime in the area", - detail: "Average number of vehicle crime incidents per year within 50m of the postcode, from police.uk street-level crime data. Includes theft of and from vehicles.", + detail: "Average number of vehicle crime incidents per year near the postcode, from police.uk street-level crime data. Includes theft of and from vehicles.", source: "crime", prefix: "", suffix: "/yr", @@ -564,7 +564,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly anti-social behaviour incidents in the area", - detail: "Average number of anti-social behaviour incidents per year within 50m of the postcode, from police.uk street-level crime data. Includes nuisance, environmental, and personal anti-social behaviour.", + detail: "Average number of anti-social behaviour incidents per year near the postcode, from police.uk street-level crime data. Includes nuisance, environmental, and personal anti-social behaviour.", source: "crime", prefix: "", suffix: "/yr", @@ -579,7 +579,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly criminal damage and arson in the area", - detail: "Average number of criminal damage and arson incidents per year within 50m of the postcode, from police.uk street-level crime data.", + detail: "Average number of criminal damage and arson incidents per year near the postcode, from police.uk street-level crime data.", source: "crime", prefix: "", suffix: "/yr", @@ -594,7 +594,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly other theft offences in the area", - detail: "Average number of 'other theft' offences per year within 50m of the postcode, from police.uk street-level crime data. Includes theft not classified under burglary, vehicle crime, shoplifting, or bicycle theft.", + detail: "Average number of 'other theft' offences per year near the postcode, from police.uk street-level crime data. Includes theft not classified under burglary, vehicle crime, shoplifting, or bicycle theft.", source: "crime", prefix: "", suffix: "/yr", @@ -609,7 +609,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly theft from the person in the area", - detail: "Average number of theft from the person offences per year within 50m of the postcode, from police.uk street-level crime data. Includes pickpocketing and bag snatching without force.", + detail: "Average number of theft from the person offences per year near the postcode, from police.uk street-level crime data. Includes pickpocketing and bag snatching without force.", source: "crime", prefix: "", suffix: "/yr", @@ -624,7 +624,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly shoplifting offences in the area", - detail: "Average number of shoplifting offences per year within 50m of the postcode, from police.uk street-level crime data.", + detail: "Average number of shoplifting offences per year near the postcode, from police.uk street-level crime data.", source: "crime", prefix: "", suffix: "/yr", @@ -639,7 +639,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly bicycle theft in the area", - detail: "Average number of bicycle theft offences per year within 50m of the postcode, from police.uk street-level crime data.", + detail: "Average number of bicycle theft offences per year near the postcode, from police.uk street-level crime data.", source: "crime", prefix: "", suffix: "/yr", @@ -654,7 +654,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly drug offences in the area", - detail: "Average number of drug offences per year within 50m of the postcode, from police.uk street-level crime data. Includes possession and trafficking offences.", + detail: "Average number of drug offences per year near the postcode, from police.uk street-level crime data. Includes possession and trafficking offences.", source: "crime", prefix: "", suffix: "/yr", @@ -669,7 +669,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly weapons possession offences in the area", - detail: "Average number of possession of weapons offences per year within 50m of the postcode, from police.uk street-level crime data.", + detail: "Average number of possession of weapons offences per year near the postcode, from police.uk street-level crime data.", source: "crime", prefix: "", suffix: "/yr", @@ -684,7 +684,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly public order offences in the area", - detail: "Average number of public order offences per year within 50m of the postcode, from police.uk street-level crime data. Includes causing fear, alarm, or distress.", + detail: "Average number of public order offences per year near the postcode, from police.uk street-level crime data. Includes causing fear, alarm, or distress.", source: "crime", prefix: "", suffix: "/yr", @@ -699,7 +699,7 @@ pub static FEATURE_GROUPS: &[FeatureGroup] = &[ }, step: 1.0, description: "Average yearly other crime in the area", - detail: "Average number of other crime offences per year within 50m of the postcode, from police.uk street-level crime data. A catch-all category for offences not classified elsewhere.", + detail: "Average number of other crime offences per year near the postcode, from police.uk street-level crime data. A catch-all category for offences not classified elsewhere.", source: "crime", prefix: "", suffix: "/yr", diff --git a/server-rs/src/routes/ai_filters.rs b/server-rs/src/routes/ai_filters.rs index 9b7778e..bddba35 100644 --- a/server-rs/src/routes/ai_filters.rs +++ b/server-rs/src/routes/ai_filters.rs @@ -391,7 +391,7 @@ pub fn build_system_prompt( - Use EXACT feature names from the list — spelling, capitalisation, and punctuation must match.\n\ - \"cheap\" / \"affordable\" = lower price range. \"expensive\" = higher price range.\n\ - \"low crime\" / \"safe\" = low values on the Serious crime (avg/yr) and Minor crime (avg/yr) \ - features (incidents counted within 50m of the postcode). Prefer these aggregates for broad \ + features (area-normalised incident density near the postcode). Prefer these aggregates for broad \ area safety; use specific crime features only when the user names a crime type.\n\ - \"quiet\" = low Noise (dB). \"green\" / \"near parks\" = high Number of amenities (Park) within 2km \ or low Distance to nearest park (km), depending on wording.\n\ @@ -1167,7 +1167,8 @@ pub async fn post_ai_filters( .to_string(); // Count matching properties and refine if too restrictive - let (match_count, match_bounds) = count_matching_rows(&state, &filters, &travel_time_filters); + let (match_count, match_bounds) = + count_matching_rows(&state, &filters, &travel_time_filters); info!( match_count = match_count, round = round, diff --git a/server-rs/src/routes/stats.rs b/server-rs/src/routes/stats.rs index ec50ede..1a349a9 100644 --- a/server-rs/src/routes/stats.rs +++ b/server-rs/src/routes/stats.rs @@ -258,10 +258,17 @@ pub fn compute_feature_stats( /// Compute property-weighted per-year crime means across the selection. /// /// Each matching property contributes its postcode's per-year counts (incidents -/// within 50m of that postcode); this is the same property-weighted-average -/// shape used elsewhere in the right pane. Postcodes with no series for a given -/// crime type contribute 0 for that type (matching how the `(avg/yr)` columns -/// treat missing crime types). +/// near that postcode); this is the same property-weighted-average shape used +/// elsewhere in the right pane. +/// +/// Denominators are COVERAGE-AWARE: police.uk has multi-year publication gaps +/// for whole forces (e.g. Greater Manchester from 2019-07), and the pipeline +/// emits a `covered_years` calendar per postcode. A postcode only counts toward +/// a year's denominator if its force published that year — and only then does +/// its missing bar mean a genuine zero. Years no selected postcode covers are +/// omitted entirely (charted as gaps, not zeros). Postcodes without coverage +/// info (legacy parquet without the column) count toward every year, restoring +/// the previous behaviour. pub fn compute_crime_by_year( matching_rows: &[usize], data: &PropertyData, @@ -273,27 +280,34 @@ pub fn compute_crime_by_year( return Vec::new(); } - // For each crime type, accumulate per-year sums and the count of rows whose - // postcode exists in the crime side table. let num_types = crime_by_year.crime_types.len(); let mut per_type_year_sums: Vec> = (0..num_types).map(|_| FxHashMap::default()).collect(); - let mut per_type_row_counts: Vec = vec![0; num_types]; + // Per-year denominator parts: rows whose coverage calendar includes the + // year, plus rows with no calendar at all (legacy: covered everywhere). + let mut covered_counts: FxHashMap = FxHashMap::default(); + let mut fully_covered_rows: u32 = 0; for &row in matching_rows { let postcode = data.postcode(row); - // A postcode absent from the by-year table has no recorded crime within - // 50m, so it contributes 0 to every type's per-year sum. It must still be - // counted in the denominator: the matching `(avg/yr)` stat counts those - // same zero-crime postcodes as 0.0 (crime_by_postcode.parquet has a dense - // row for every boundary postcode), so excluding them here would compute - // the chart over a smaller population and report a higher magnitude than - // the headline. Property postcodes are guaranteed to be boundary - // postcodes by the postcode-boundary-match validation, so "absent" means - // genuinely zero-crime, not missing data. + match crime_by_year.covered_years_by_postcode.get(postcode) { + Some(years) => { + // An empty list (force gap for the whole window / unusable + // boundary geometry) adds nothing: the postcode's crime + // picture is unknown and must not dilute any year's mean. + for &year in years { + *covered_counts.entry(year).or_insert(0) += 1; + } + } + None => fully_covered_rows += 1, + } + + // A postcode with a row but no series for a given type had no recorded + // incidents of that type: it contributes 0 to the sums, and its covered + // years still count in the denominator — a genuine zero. Uncovered + // years are excluded via the denominators instead. if let Some(series_list) = crime_by_year.series_by_postcode.get(postcode) { - // For every type the postcode reports, add its per-year counts. for series in series_list { let acc = &mut per_type_year_sums[series.type_idx as usize]; for point in &series.points { @@ -301,9 +315,6 @@ pub fn compute_crime_by_year( } } } - for c in per_type_row_counts.iter_mut() { - *c += 1; - } } let mut out = Vec::new(); @@ -317,10 +328,6 @@ pub fn compute_crime_by_year( continue; } } - let row_count = per_type_row_counts[type_idx]; - if row_count == 0 { - continue; - } let years = crime_by_year .years_by_type .get(type_idx) @@ -329,15 +336,26 @@ pub fn compute_crime_by_year( if years.is_empty() { continue; } - let denom = row_count as f64; let sums = &per_type_year_sums[type_idx]; let points: Vec = years .iter() - .map(|&year| CrimeYearPoint { - year, - count: (sums.get(&year).copied().unwrap_or(0.0) / denom) as f32, + .filter_map(|&year| { + let denom = fully_covered_rows + + covered_counts.get(&year).copied().unwrap_or(0); + if denom == 0 { + // No selected postcode has published data for this year. + return None; + } + Some(CrimeYearPoint { + year, + count: (sums.get(&year).copied().unwrap_or(0.0) / denom as f64) + as f32, + }) }) .collect(); + if points.is_empty() { + continue; + } out.push(CrimeYearStats { name: name.clone(), points,