perfect-postcode/pipeline/transform/school_catchments.py

748 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Model historical school catchment areas and count them per postcode.
No national dataset of school catchment areas exists for England: catchments
are set per admission authority, only a handful of councils publish polygons,
and the pupil-residence data behind commercial "heatmap" catchments lives in
the restricted National Pupil Database. This module therefore COMPILES one
from open data, estimating each school's admission cutoff distance ("last
distance offered") — the radius within which an applicant would plausibly be
offered a place.
Model: English state admissions are run as deferred acceptance with distance
tie-breaks, which in a continuum economy is equivalent to finding
market-clearing cutoff distances (Azevedo & Leshno 2016). Per phase
(primary/secondary):
1. Demand — Census 2021 children per LSOA (TS007A age bands, prorated to the
phase's cohort ages) split evenly across the LSOA's live postcodes.
2. Supply — every open, non-selective state-funded school (GIAS), with a fill
target of max(capacity, headcount) prorated to the phase's cohorts
(sixth-form and nursery years carry reduced weight, since their class
sizes differ and they are not allocated by the same admissions round).
3. Preferences — children prefer nearby schools, trading distance against
Ofsted grade: a school's effective distance is its real distance minus a
grade bonus (Outstanding > Good > ungraded > below-Good). Because real
first preferences are heterogeneous, each postcode's children split
across nearby feasible schools with logit weights over effective
distance rather than all picking the same one.
4. Equilibrium — cutoffs start unbounded and tighten monotonically: each
round, children apply to their preferred feasible school(s), and
oversubscribed schools tighten their cutoff to the distance of their
marginal admitted child. Converges to the deferred-acceptance outcome.
5. Schools that never fill have no binding cutoff — anyone who applies gets
in — so their feasibility radius is the distance within which the local
child population would cover their fill target, capped.
The free parameters (preference bonuses, demand scale, choice temperature,
residual calibration factors) are CALIBRATED against published "last
distance offered" figures scraped from nine local authorities' allocation
reports — see check_school_cutoffs.py and the constants below.
A postcode is "inside the catchment" of every school whose cutoff radius
covers it. The output counts those schools per postcode for the four
good+/outstanding x primary/secondary categories (Ofsted-classified, same
rules as the previous proximity metric). Selective (grammar) schools are
excluded throughout: their intakes are test-based and region-wide, so a
distance model would fabricate a catchment that does not exist.
Known limitations: faith oversubscription criteria are not modelled (whether
a faith school's catchment is open to a given family depends on the family),
and Census 2021 child counts lag current rolls slightly. Cutoffs are
straight-line distances, the modal LA tie-break criterion.
"""
import argparse
from pathlib import Path
import numpy as np
import polars as pl
from scipy.spatial import cKDTree
from pipeline.utils.poi_counts import _project_lat_lng_km, valid_uk_coords_mask
SCHOOL_GROUPS = {
"good_primary": ["good_primary", "outstanding_primary"],
"good_secondary": ["good_secondary", "outstanding_secondary"],
"outstanding_primary": ["outstanding_primary"],
"outstanding_secondary": ["outstanding_secondary"],
}
# Age thresholds for deciding which phase(s) a school serves. A school serves
# PRIMARY-age children if its statutory lowest age is <= 10, and SECONDARY-age
# children if its statutory highest age is >= 12. All-through (e.g. 3-18) and
# middle-deemed-secondary (e.g. 9-13) schools satisfy BOTH and so are counted in
# both the primary and the secondary metrics — Ofsted's coarse "Ofsted phase"
# labels such schools as just "Secondary", which previously hid them from every
# postcode's primary-school count.
PRIMARY_MAX_AGE = 10
SECONDARY_MIN_AGE = 12
# Cohort ages (inclusive) each phase competes for: Reception-Y6 and Y7-Y11.
PRIMARY_AGES = (4, 10)
SECONDARY_AGES = (11, 15)
# Cohort weights for prorating a school's headcount/capacity across the ages
# it teaches. Nursery classes are typically part-time and small; sixth forms
# run at roughly 60% of a school's Y7-Y11 cohort size. A flat proration
# undersupplied secondary places by ~8%.
NURSERY_COHORT_WEIGHT = 0.5 # ages < 4
SIXTH_FORM_COHORT_WEIGHT = 0.6 # ages >= 16
# Only schools that admit (mostly) by geography take part in the assignment.
# Independent, special and Welsh schools and post-16 colleges either don't
# admit by distance or fall outside the England postcode universe; selective
# (grammar) schools admit by test from a wide region.
STATE_SCHOOL_TYPE_GROUPS = [
"Academies",
"Local authority maintained schools",
"Free Schools",
]
# Preference bonuses (km of extra travel a family accepts for a better
# school), applied as a discount on effective distance when children choose.
# Grade 3/4 schools repel by the same magnitudes.
PREF_BONUS_OUTSTANDING_KM = 0.6
PREF_BONUS_GOOD_KM = 0.3
# Share of resident children who actually compete for state places. Census
# 2021 counts overstate current entry cohorts (birth rates fell ~10% between
# 2016 and 2021, which is exactly the gap between the census stock and the
# children reaching Reception by mid-decade) and independent/home-educated
# children (~7%) never enter the allocation at all. Without this, modelled
# cutoffs run systematically tight and undersubscribed schools look full.
DEMAND_SCALE = 0.8
# Logit choice temperature (km). With deterministic choice every child at a
# postcode ranks the same school first, so popular schools fill entirely from
# their nearest band and the marginal admitted child sits unrealistically
# close. Real first preferences are heterogeneous; a school draws only a
# distance-decaying share of nearby families. Children therefore split across
# nearby feasible schools with weights softmax(-effective_distance / tau):
# higher tau = more smearing = wider cutoffs. tau -> 0 recovers the
# deterministic model (used by the unit tests). Calibrated 2026-06 against
# 240 published binding cutoffs from 9 LAs (check_school_cutoffs.py): 0.3 km
# maximises rank correlation and within-2x share; beyond ~0.6 the smearing
# erases school-to-school differentiation (Spearman 0.24 -> 0.01).
CHOICE_TEMPERATURE_KM = 0.3
# Residual calibration from the same ground truth: after the equilibrium
# solve, modelled cutoffs still ran systematically tight (median log2 bias
# -0.53 primary / -0.36 secondary at the settings above — published "last
# distance offered" reflects offer-day frictions, waiting-list churn and
# furthest-applicant noise that no clean equilibrium reproduces). Radii are
# multiplied by 2^-bias so the modelled median matches the published median;
# rank ordering is unaffected.
CUTOFF_CALIBRATION_FACTOR = {"primary": 1.44, "secondary": 1.28}
# Each demand postcode considers this many nearest schools; beyond ~16
# candidates assignment shares are negligible.
NEAREST_SCHOOL_CANDIDATES = 16
# Radius guard rails: the floor absorbs postcode-centroid noise around tiny
# urban catchments; the cap bounds feasibility radii for schools the model
# never fills (mostly rural).
MIN_RADIUS_KM = 0.3
MAX_RADIUS_KM = 25.0
EQUILIBRIUM_MAX_ITER = 100
def classify_good_plus_schools(
ofsted: pl.DataFrame, open_urns: set[int] | None = None
) -> pl.DataFrame:
"""Label good+/outstanding primary & secondary schools for catchment counts.
Derives a grade ("1" = outstanding, "2" = good) and one or two
``category`` rows per school, returning a ``(urn, category)`` frame.
Schools with a recent GRADED inspection carry a 1-4 grade in "Latest OEIF
overall effectiveness" (OEIF = the previous Ofsted Education Inspection
Framework). A large and growing share of schools were last inspected under an
UNGRADED (Section 8) inspection or the post-2024 report-card framework, so
that column is null/"Not judged" for them even when they are demonstrably
good — their status lives in "Ungraded inspection overall outcome" ("School
remains Good"/"School remains Outstanding"). Filtering on the graded column
alone dropped ~7,000 genuinely good/outstanding schools. We fall back to the
ungraded outcome, but ONLY when there is no usable graded result
(null/"Not judged"), so a genuine grade 3/4 is never overridden.
Outcomes flagged "(Concerns)" are NOT treated as good+: a "remains Good
(Concerns)" outcome signals inspectors found issues warranting an earlier
graded re-inspection, so marketing it as a good+ school is misleading.
Phase assignment uses the statutory age range when available (so all-through
and middle schools count toward BOTH primary and secondary), falling back to
the coarse "Ofsted phase" label when age columns are absent. When
``open_urns`` is given, schools whose URN is not in the current GIAS open
register are dropped so closed/merged schools are not counted.
"""
graded = _with_derived_grade(ofsted).filter(
pl.col("Ofsted phase").is_in(["Primary", "Secondary"])
& pl.col("_ofsted_grade").is_in(["1", "2"])
)
# Drop schools no longer open (closed/merged) when the GIAS open register is
# provided, so stale Ofsted "latest inspection" rows are not counted.
if open_urns is not None and "URN" in graded.columns:
graded = graded.filter(pl.col("URN").is_in(list(open_urns)))
# Decide which phase(s) each school serves.
if {"Statutory lowest age", "Statutory highest age"} <= set(graded.columns):
low = pl.col("Statutory lowest age").cast(pl.Int64, strict=False)
high = pl.col("Statutory highest age").cast(pl.Int64, strict=False)
serves_primary = (
pl.when(low.is_not_null())
.then(low <= PRIMARY_MAX_AGE)
.otherwise(pl.col("Ofsted phase") == "Primary")
)
serves_secondary = (
pl.when(high.is_not_null())
.then(high >= SECONDARY_MIN_AGE)
.otherwise(pl.col("Ofsted phase") == "Secondary")
)
else:
serves_primary = pl.col("Ofsted phase") == "Primary"
serves_secondary = pl.col("Ofsted phase") == "Secondary"
graded = graded.with_columns(
serves_primary.alias("_serves_primary"),
serves_secondary.alias("_serves_secondary"),
)
# Good+ groups include both grade variants; outstanding groups count grade 1.
# A school can yield up to two rows (primary and secondary).
primary = graded.filter(pl.col("_serves_primary")).with_columns(
pl.when(pl.col("_ofsted_grade") == "1")
.then(pl.lit("outstanding_primary"))
.otherwise(pl.lit("good_primary"))
.alias("category")
)
secondary = graded.filter(pl.col("_serves_secondary")).with_columns(
pl.when(pl.col("_ofsted_grade") == "1")
.then(pl.lit("outstanding_secondary"))
.otherwise(pl.lit("good_secondary"))
.alias("category")
)
return pl.concat([primary, secondary]).select(
pl.col("URN").cast(pl.Int64).alias("urn"),
"category",
)
def _with_derived_grade(ofsted: pl.DataFrame) -> pl.DataFrame:
"""Attach ``_ofsted_grade`` ("1"-"4" or null): graded OEIF result first,
falling back to ungraded "School remains Good/Outstanding" outcomes (minus
"(Concerns)") only when there is no usable graded result."""
# Cast to Utf8 so the string predicates below are well-defined even if a
# column happens to be entirely null (read back as a Null dtype).
oeif = pl.col("Latest OEIF overall effectiveness").cast(pl.Utf8, strict=False)
ungraded = pl.col("Ungraded inspection overall outcome").cast(pl.Utf8, strict=False)
no_usable_grade = oeif.is_null() | (oeif == "Not judged")
has_concern = ungraded.str.contains(r"\(Concerns\)")
remains_outstanding = (
ungraded.str.starts_with("School remains Outstanding") & ~has_concern
)
remains_good = ungraded.str.starts_with("School remains Good") & ~has_concern
return ofsted.with_columns(
pl.when(oeif.is_in(["1", "2", "3", "4"]))
.then(oeif)
.when(no_usable_grade & remains_outstanding)
.then(pl.lit("1"))
.when(no_usable_grade & remains_good)
.then(pl.lit("2"))
.otherwise(None)
.alias("_ofsted_grade")
)
def school_preference_bonuses(
ofsted: pl.DataFrame,
bonus_outstanding_km: float = PREF_BONUS_OUTSTANDING_KM,
bonus_good_km: float = PREF_BONUS_GOOD_KM,
) -> pl.DataFrame:
"""Per-school preference bonus in km, from the derived Ofsted grade.
Outstanding/Good schools attract demand from further away; grade 3/4
schools repel it symmetrically. Ungraded (typically new) schools are
neutral. Returns ``(urn, bonus_km)`` with one row per URN.
"""
bonus = {
"1": bonus_outstanding_km,
"2": bonus_good_km,
"3": -bonus_good_km,
"4": -bonus_outstanding_km,
}
return (
_with_derived_grade(ofsted)
.filter(pl.col("URN").is_not_null())
.select(
pl.col("URN").cast(pl.Int64).alias("urn"),
pl.col("_ofsted_grade")
.replace_strict(bonus, default=0.0, return_dtype=pl.Float64)
.alias("bonus_km"),
)
.unique(subset="urn", keep="first")
)
def phase_intakes(gias: pl.DataFrame) -> pl.DataFrame:
"""Per-school phase-prorated fill targets for the admissions model.
Returns one row per open, non-selective state-funded school with valid
coordinates: ``(urn, lat, lng, primary_intake, secondary_intake)``. The
fill target — max(capacity, headcount), so over-full schools keep their
demonstrated size and under-full schools can admit up to capacity — is
spread over the cohort ages the school teaches (parsed from ``age_range``,
e.g. "311" = ages 3..10) with nursery and sixth-form ages down-weighted,
and each phase receives the share of cohort weight in its age band.
"""
ages = pl.col("age_range").str.extract_all(r"\d+")
low = ages.list.get(0, null_on_oob=True).cast(pl.Int64, strict=False)
# The leaving age is exclusive as a cohort: a "3-11" school teaches
# children aged 3 through 10.
high = ages.list.get(1, null_on_oob=True).cast(pl.Int64, strict=False) - 1
schools = (
gias.filter(
pl.col("type_group").is_in(STATE_SCHOOL_TYPE_GROUPS)
& (
pl.col("admissions_policy").is_null()
| (pl.col("admissions_policy") != "Selective")
)
& pl.col("lat").is_not_null()
& pl.col("lng").is_not_null()
)
.with_columns(low.alias("_low"), high.alias("_high"))
.filter(pl.col("_low").is_not_null() & (pl.col("_high") >= pl.col("_low")))
.with_columns(
pl.max_horizontal(
pl.col("pupils").fill_null(0), pl.col("capacity").fill_null(0)
)
.cast(pl.Float64)
.alias("_fill_target"),
)
.filter(pl.col("_fill_target") > 0)
)
def weighted_overlap(lo: int, hi: int, weight: float = 1.0) -> pl.Expr:
"""Cohort weight contributed by ages [lo, hi] within [_low, _high]."""
return (
weight
* (
pl.min_horizontal(pl.col("_high"), hi)
- pl.max_horizontal(pl.col("_low"), lo)
+ 1
).clip(lower_bound=0)
).cast(pl.Float64)
total_weight = (
weighted_overlap(0, 3, NURSERY_COHORT_WEIGHT)
+ weighted_overlap(4, 15)
+ weighted_overlap(16, 30, SIXTH_FORM_COHORT_WEIGHT)
)
return schools.select(
pl.col("urn").cast(pl.Int64),
"lat",
"lng",
(pl.col("_fill_target") * weighted_overlap(*PRIMARY_AGES) / total_weight).alias(
"primary_intake"
),
(
pl.col("_fill_target") * weighted_overlap(*SECONDARY_AGES) / total_weight
).alias("secondary_intake"),
)
def children_per_postcode(
postcodes: pl.DataFrame, lsoa_children: pl.DataFrame
) -> pl.DataFrame:
"""Estimate phase-age children living at each live postcode.
Census age bands don't align with school phases, so phase totals take
fractional shares of bands (one fifth per single year of age): primary
(4-10) = age 4 + ages 5-9 + age 10, secondary (11-15) = ages 11-14 +
age 15. LSOA totals are then split evenly across the LSOA's postcodes.
"""
lsoa = lsoa_children.select(
"lsoa21",
(
0.2 * pl.col("aged_0_4") + pl.col("aged_5_9") + 0.2 * pl.col("aged_10_14")
).alias("_lsoa_primary"),
(0.8 * pl.col("aged_10_14") + 0.2 * pl.col("aged_15_19")).alias(
"_lsoa_secondary"
),
)
return (
postcodes.join(lsoa, left_on="lsoa21cd", right_on="lsoa21", how="inner")
.with_columns(pl.len().over("lsoa21cd").alias("_lsoa_postcodes"))
.select(
"postcode",
"lat",
"lng",
(pl.col("_lsoa_primary") / pl.col("_lsoa_postcodes")).alias(
"primary_children"
),
(pl.col("_lsoa_secondary") / pl.col("_lsoa_postcodes")).alias(
"secondary_children"
),
)
)
def equilibrium_cutoffs(
school_xy: np.ndarray,
fill_target: np.ndarray,
bonus_km: np.ndarray,
pc_xy: np.ndarray,
pc_children: np.ndarray,
k: int = NEAREST_SCHOOL_CANDIDATES,
max_iter: int = EQUILIBRIUM_MAX_ITER,
tau_km: float = CHOICE_TEMPERATURE_KM,
) -> np.ndarray:
"""Market-clearing admission cutoff distance (km) per school.
Deferred acceptance with distance priority, solved as cutoff dynamics
(Azevedo & Leshno): cutoffs start unbounded; each round every child unit
applies to its preferred feasible school(s) — a logit split over
effective distance (distance - school bonus) among schools whose cutoff
covers it, collapsing to the single best school when ``tau_km`` is 0 —
and each oversubscribed school tightens its cutoff to its marginal
admitted child's distance. Cutoffs only ever tighten, so the iteration
converges.
Returns np.inf for schools that never fill (no binding cutoff).
"""
n_schools = len(school_xy)
k = min(k, n_schools)
demand = np.flatnonzero(pc_children > 0)
weights = pc_children[demand]
tree = cKDTree(school_xy)
dist, cand = tree.query(pc_xy[demand], k=k, workers=-1)
if k == 1:
dist = dist[:, None]
cand = cand[:, None]
eff = dist - bonus_km[cand]
rows = np.arange(len(demand))
cutoff = np.full(n_schools, np.inf)
for _ in range(max_iter):
eff_feasible = np.where(dist <= cutoff[cand], eff, np.inf)
if tau_km <= 0:
choice = np.argmin(eff_feasible, axis=1)
valid = np.isfinite(eff_feasible[rows, choice])
chosen_school = cand[rows[valid], choice[valid]]
chosen_dist = dist[rows[valid], choice[valid]]
chosen_mass = weights[valid]
else:
z = -eff_feasible / tau_km
z_max = z.max(axis=1, keepdims=True)
share = np.exp(z - np.where(np.isfinite(z_max), z_max, 0.0))
share[~np.isfinite(eff_feasible)] = 0.0
total = share.sum(axis=1, keepdims=True)
mass = weights[:, None] * share / np.where(total > 0, total, 1.0)
# Sub-thousandth-of-a-child applications only slow the sort down.
keep = mass > 1e-3
chosen_school = cand[keep]
chosen_dist = dist[keep]
chosen_mass = mass[keep]
order = np.lexsort((chosen_dist, chosen_school))
s_sorted = chosen_school[order]
d_sorted = chosen_dist[order]
m_cum = np.cumsum(chosen_mass[order])
boundaries = np.flatnonzero(np.diff(s_sorted)) + 1
starts = np.concatenate(([0], boundaries))
ends = np.concatenate((boundaries, [len(s_sorted)]))
changed = False
for start, end in zip(starts, ends):
school = s_sorted[start]
seg_cum = m_cum[start:end] - (m_cum[start - 1] if start else 0.0)
if seg_cum[-1] <= fill_target[school]:
continue
marginal = d_sorted[start + np.searchsorted(seg_cum, fill_target[school])]
if marginal < cutoff[school]:
cutoff[school] = marginal
changed = True
if not changed:
break
return cutoff
def capacity_fill_radii(
school_xy: np.ndarray,
fill_target: np.ndarray,
pc_xy: np.ndarray,
pc_children: np.ndarray,
max_radius_km: float = MAX_RADIUS_KM,
) -> np.ndarray:
"""Feasibility radius for schools without a binding cutoff.
An undersubscribed school admits anyone who applies, so its catchment is
bounded by plausibility rather than competition: the distance within
which the local child population would cover its fill target. Capped at
``max_radius_km``.
"""
demand = np.flatnonzero(pc_children > 0)
tree = cKDTree(pc_xy[demand])
radii = np.full(len(school_xy), max_radius_km)
k = min(4096, len(demand))
for i in range(len(school_xy)):
dists, idx = tree.query(
school_xy[i], k=k, distance_upper_bound=max_radius_km
)
found = np.isfinite(dists)
cum = np.cumsum(pc_children[demand[idx[found]]])
if len(cum) and cum[-1] >= fill_target[i]:
radii[i] = dists[found][np.searchsorted(cum, fill_target[i])]
return radii
def count_covering_catchments(
pc_xy: np.ndarray,
pc_valid: np.ndarray,
school_xy: np.ndarray,
school_radii: np.ndarray,
n_postcodes: int,
) -> np.ndarray:
"""Count, per postcode, how many schools' catchment radii cover it."""
counts = np.zeros(n_postcodes, dtype=np.int32)
if len(school_xy) == 0:
return counts
valid_indices = np.flatnonzero(pc_valid)
tree = cKDTree(pc_xy[valid_indices])
covered = np.zeros(len(valid_indices), dtype=np.int32)
for indices in tree.query_ball_point(school_xy, school_radii, workers=-1):
covered[indices] += 1
counts[valid_indices] = covered
return counts
def main():
parser = argparse.ArgumentParser(
description=(
"Model school admission cutoff radii and count good+/outstanding "
"primary/secondary catchments covering each postcode"
)
)
parser.add_argument(
"--ofsted", type=Path, required=True, help="Ofsted inspection parquet"
)
parser.add_argument(
"--gias", type=Path, required=True, help="GIAS open-school parquet"
)
parser.add_argument(
"--arcgis", type=Path, required=True, help="ArcGIS postcode parquet"
)
parser.add_argument(
"--lsoa-children",
type=Path,
required=True,
help="Census 2021 children by LSOA parquet",
)
parser.add_argument(
"--output",
type=Path,
default=None,
help="Per-postcode counts parquet; omit for calibration runs that only "
"need --schools-output",
)
parser.add_argument(
"--schools-output",
type=Path,
default=None,
help="Optional per-school catchment radii parquet (for calibration/debugging)",
)
parser.add_argument(
"--bonus-outstanding-km",
type=float,
default=PREF_BONUS_OUTSTANDING_KM,
help="Preference bonus for Outstanding schools (calibration sweeps)",
)
parser.add_argument(
"--bonus-good-km",
type=float,
default=PREF_BONUS_GOOD_KM,
help="Preference bonus for Good schools (calibration sweeps)",
)
parser.add_argument(
"--demand-scale",
type=float,
default=DEMAND_SCALE,
help="Share of resident children competing for state places",
)
parser.add_argument(
"--choice-temperature-km",
type=float,
default=CHOICE_TEMPERATURE_KM,
help="Logit choice temperature over effective distance",
)
args = parser.parse_args()
gias = pl.read_parquet(args.gias)
open_urns = set(
gias.select(pl.col("urn").cast(pl.Int64, strict=False))
.to_series()
.drop_nulls()
.to_list()
)
print(f"GIAS open register: {len(open_urns):,} open school URNs")
ofsted = pl.read_parquet(args.ofsted)
rated = classify_good_plus_schools(ofsted, open_urns=open_urns)
if rated.is_empty():
raise ValueError("No good+ primary/secondary Ofsted schools found")
print(f"Good+ school/phase rows: {len(rated):,}")
supply = phase_intakes(gias).join(
school_preference_bonuses(
ofsted,
bonus_outstanding_km=args.bonus_outstanding_km,
bonus_good_km=args.bonus_good_km,
),
on="urn",
how="left",
).with_columns(pl.col("bonus_km").fill_null(0.0))
print(f"State schools in admissions model: {len(supply):,}")
arcgis = pl.read_parquet(args.arcgis).select(
pl.col("pcds").alias("postcode"),
"lat",
pl.col("long").alias("lng"),
"lsoa21cd",
"doterm",
)
live = arcgis.filter(
pl.col("doterm").is_null() & pl.col("lsoa21cd").str.starts_with("E")
)
demand = children_per_postcode(live, pl.read_parquet(args.lsoa_children))
print(
f"Demand postcodes: {len(demand):,} "
f"({demand['primary_children'].sum():,.0f} primary-age, "
f"{demand['secondary_children'].sum():,.0f} secondary-age children)"
)
# Shared local-km projection so assignment and coverage use one metric.
pc_lats = arcgis["lat"].to_numpy()
pc_lngs = arcgis["lng"].to_numpy()
pc_valid = valid_uk_coords_mask(pc_lats, pc_lngs)
origin_lat = float(np.mean(pc_lats[pc_valid]))
pc_xy = _project_lat_lng_km(pc_lats, pc_lngs, origin_lat)
demand_lats = demand["lat"].to_numpy()
demand_lngs = demand["lng"].to_numpy()
demand_valid = valid_uk_coords_mask(demand_lats, demand_lngs)
demand_xy = _project_lat_lng_km(demand_lats, demand_lngs, origin_lat)
school_xy = _project_lat_lng_km(
supply["lat"].to_numpy(), supply["lng"].to_numpy(), origin_lat
)
radii = {}
for phase in ("primary", "secondary"):
in_phase = supply[f"{phase}_intake"].to_numpy() > 0
targets = supply[f"{phase}_intake"].to_numpy()[in_phase]
xy = school_xy[in_phase]
children = np.where(
demand_valid,
demand[f"{phase}_children"].to_numpy() * args.demand_scale,
0.0,
)
print(f"Solving {phase} admissions for {in_phase.sum():,} schools...")
cutoffs = equilibrium_cutoffs(
xy,
targets,
supply["bonus_km"].to_numpy()[in_phase],
demand_xy,
children,
tau_km=args.choice_temperature_km,
)
filled = np.isfinite(cutoffs)
print(
f" {filled.sum():,} schools have binding cutoffs "
f"(median {np.median(cutoffs[filled]):.2f} km); "
f"{(~filled).sum():,} undersubscribed"
)
fallback = capacity_fill_radii(
xy[~filled], targets[~filled], demand_xy, children
)
raw = cutoffs.copy()
raw[~filled] = fallback
radii[phase] = pl.DataFrame(
{
"urn": supply["urn"].to_numpy()[in_phase],
"phase": phase,
"cutoff_km": raw,
"filled": filled,
"radius_km": np.clip(
raw * CUTOFF_CALIBRATION_FACTOR[phase],
MIN_RADIUS_KM,
MAX_RADIUS_KM,
),
}
)
print(
f" radius km: median {radii[phase]['radius_km'].median():.2f}, "
f"p90 {radii[phase]['radius_km'].quantile(0.9):.2f}"
)
# Attach each rated school's phase radius; rated schools outside the
# admissions model (special schools, selective schools, missing
# headcounts) cannot be given a defensible radius and are dropped.
rated = rated.with_columns(
pl.col("category").str.split("_").list.get(1).alias("phase")
)
rated_with_radius = rated.join(
pl.concat(list(radii.values())), on=["urn", "phase"], how="inner"
).join(supply.select("urn", "lat", "lng"), on="urn", how="inner")
dropped = len(rated) - len(rated_with_radius)
print(
f"Rated school/phase rows with radii: {len(rated_with_radius):,} "
f"(dropped {dropped:,}, incl. selective schools)"
)
if args.output is None and args.schools_output is None:
raise SystemExit("Provide --output and/or --schools-output")
if args.output is not None:
category_counts = {}
for category in set(c for cats in SCHOOL_GROUPS.values() for c in cats):
cat = rated_with_radius.filter(pl.col("category") == category)
cat_xy = _project_lat_lng_km(
cat["lat"].to_numpy(), cat["lng"].to_numpy(), origin_lat
)
category_counts[category] = count_covering_catchments(
pc_xy, pc_valid, cat_xy, cat["radius_km"].to_numpy(), len(arcgis)
)
print(f" {category}: {len(cat):,} schools")
result = pl.DataFrame(
{
"postcode": arcgis["postcode"],
**{
f"{group}_catchments": sum(category_counts[c] for c in categories)
for group, categories in SCHOOL_GROUPS.items()
},
}
)
for group in SCHOOL_GROUPS:
col = result[f"{group}_catchments"]
print(f" {group}_catchments: mean {col.mean():.2f}, max {col.max()}")
args.output.parent.mkdir(parents=True, exist_ok=True)
result.write_parquet(args.output)
size_mb = args.output.stat().st_size / (1024 * 1024)
print(f"Wrote {args.output} ({size_mb:.1f} MB)")
if args.schools_output is not None:
schools_out = rated_with_radius.select(
"urn", "category", "phase", "cutoff_km", "filled", "radius_km", "lat", "lng"
)
args.schools_output.parent.mkdir(parents=True, exist_ok=True)
schools_out.write_parquet(args.schools_output)
print(f"Wrote {args.schools_output}")
if __name__ == "__main__":
main()