perfect-postcode/pipeline/transform/postcode_boundaries/uprn.py
2026-06-02 13:46:18 +01:00

214 lines
7.9 KiB
Python

from pathlib import Path
import numpy as np
import polars as pl
from pipeline.local_temp import local_tmp_dir
from pipeline.utils.postcode_mapping import build_postcode_mapping
from .memory import release_memory
def _canonical_postcode_expr(name: str) -> pl.Expr:
    """Build an expression that normalises the postcode column *name*.

    Leading/trailing characters are removed with ``str.strip_chars()`` (no
    argument, i.e. whitespace) and the result is upper-cased, so values that
    differ only in case or surrounding whitespace compare equal.
    The returned expression keeps the source column's name; callers alias it.
    """
    trimmed = pl.col(name).str.strip_chars()
    return trimmed.str.to_uppercase()
def _active_english_arcgis_postcodes(arcgis_path: Path) -> pl.LazyFrame:
    """Return a lazy frame of usable postcode points from the ArcGIS parquet.

    Rows are kept when ``ctry25cd`` equals ``"E92000001"`` and ``doterm`` is
    null (presumably "England" and "not terminated" respectively, going by the
    function name -- confirm against the data dictionary).

    Output columns:
        PCDS      canonicalised postcode (stripped, upper-cased)
        GRIDGB1E  ``east1m`` cast to Float64
        GRIDGB1N  ``north1m`` cast to Float64
        OA21CD    ``oa21cd`` unchanged

    Rows with a null/empty postcode, a null coordinate, or an OA code that is
    null or does not start with ``"E"`` are dropped, and the result is
    de-duplicated on ``PCDS``.
    """
    wanted_columns = ["pcds", "east1m", "north1m", "oa21cd", "ctry25cd", "doterm"]
    # Eager read of just the needed columns, then switch to the lazy API.
    raw = pl.read_parquet(arcgis_path, columns=wanted_columns).lazy()

    in_england = pl.col("ctry25cd") == "E92000001"
    no_termination = pl.col("doterm").cast(pl.Utf8).is_null()
    live = raw.filter(in_england).filter(no_termination)

    points = live.select(
        _canonical_postcode_expr("pcds").alias("PCDS"),
        pl.col("east1m").cast(pl.Float64).alias("GRIDGB1E"),
        pl.col("north1m").cast(pl.Float64).alias("GRIDGB1N"),
        pl.col("oa21cd").alias("OA21CD"),
    )

    has_postcode = pl.col("PCDS").is_not_null() & (pl.col("PCDS") != "")
    has_coords = pl.col("GRIDGB1E").is_not_null() & pl.col("GRIDGB1N").is_not_null()
    has_english_oa = pl.col("OA21CD").is_not_null() & pl.col("OA21CD").str.starts_with(
        "E"
    )
    usable = points.filter(has_postcode & has_coords & has_english_oa)
    return usable.unique("PCDS")
def load_uprns(
    uprn_path: Path, arcgis_path: Path | None = None
) -> tuple[pl.DataFrame, dict[str, tuple[int, int]]]:
    """Load UPRNs as a sorted polars DataFrame with OA offset lookup.

    Args:
        uprn_path: Parquet file providing at least the columns ``GRIDGB1E``,
            ``GRIDGB1N``, ``PCDS`` and ``OA21CD``.
        arcgis_path: Optional ArcGIS postcode parquet. When given:
            * postcodes found as ``old_postcode`` in ``build_postcode_mapping``
              are rewritten to ``new_postcode`` and adopt that postcode's
              ArcGIS coordinates and OA;
            * rows whose (possibly remapped) postcode is not in the active
              English ArcGIS set are dropped;
            * active postcodes with no remaining UPRN row are appended as a
              single point taken from the ArcGIS data.

    Returns:
        ``(df, offsets)``. ``df`` has columns ``GRIDGB1E``, ``GRIDGB1N`` and
        ``PCDS`` with rows ordered by OA code; ``offsets[oa_code]`` is the
        half-open row range ``(start_row, end_row)`` of that OA within ``df``.

    Peak ~5GB during sort, steady state ~1.5GB (Arrow columnar with compact strings).

    The sorted intermediate is written to a temporary parquet file under
    ``local_tmp_dir()``; that file is removed even if sorting or reading fails.
    """
    import tempfile

    print("Loading UPRN lookup...")
    mapping = None
    active_postcode_points = None
    if arcgis_path is not None:
        mapping = (
            build_postcode_mapping(arcgis_path)
            .with_columns(
                _canonical_postcode_expr("old_postcode").alias("old_postcode"),
                _canonical_postcode_expr("new_postcode").alias("new_postcode"),
            )
            .unique("old_postcode")
        )
        active_postcode_points = _active_english_arcgis_postcodes(arcgis_path)

    # Lazy plan: English OAs only, non-null coordinates, canonical non-empty postcode.
    uprns = (
        pl.scan_parquet(uprn_path)
        .select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD")
        .filter(pl.col("OA21CD").str.starts_with("E"))
        .filter(pl.col("GRIDGB1E").is_not_null() & pl.col("GRIDGB1N").is_not_null())
        .with_columns(_canonical_postcode_expr("PCDS").alias("PCDS"))
        .filter(pl.col("PCDS").is_not_null() & (pl.col("PCDS") != ""))
    )

    if mapping is not None and mapping.height > 0:
        # Remap terminated postcodes to their nearest active successor. The
        # successor generally lives in a DIFFERENT OA (and at different grid
        # coordinates), so the remapped point must adopt the successor's
        # authoritative OA/coords — keeping the terminated postcode's original
        # OA would seed the successor into an OA it doesn't belong to, splitting
        # its boundary across OAs. Genuine (non-remapped) UPRN rows keep their
        # own OA, since a live postcode can legitimately span several OAs.
        uprns = uprns.join(
            mapping.lazy(), left_on="PCDS", right_on="old_postcode", how="left"
        ).with_columns(pl.col("new_postcode").is_not_null().alias("_remapped"))
        if active_postcode_points is not None:
            successor_oa = active_postcode_points.rename(
                {
                    "PCDS": "new_postcode",
                    "GRIDGB1E": "_succ_e",
                    "GRIDGB1N": "_succ_n",
                    "OA21CD": "_succ_oa",
                }
            )
            uprns = uprns.join(successor_oa, on="new_postcode", how="left").with_columns(
                pl.when("_remapped")
                .then(pl.col("_succ_e"))
                .otherwise(pl.col("GRIDGB1E"))
                .alias("GRIDGB1E"),
                pl.when("_remapped")
                .then(pl.col("_succ_n"))
                .otherwise(pl.col("GRIDGB1N"))
                .alias("GRIDGB1N"),
                pl.when("_remapped")
                .then(pl.col("_succ_oa"))
                .otherwise(pl.col("OA21CD"))
                .alias("OA21CD"),
            )
        # Replace the postcode itself last, then drop the helper columns.
        uprns = uprns.with_columns(
            pl.coalesce("new_postcode", "PCDS").alias("PCDS")
        ).select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD")

    if active_postcode_points is not None:
        # Keep only rows whose postcode is in the active set ...
        active_postcodes = active_postcode_points.select("PCDS").unique()
        uprns = uprns.join(active_postcodes, on="PCDS", how="semi")
        # ... and add one ArcGIS point for every active postcode left without rows.
        missing_active = active_postcode_points.join(
            uprns.select("PCDS").unique(), on="PCDS", how="anti"
        ).select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD")
        uprns = pl.concat([uprns, missing_active], how="vertical_relaxed")

    # Sort via streaming sink to avoid polars doubling memory during in-memory sort.
    # The temp file is created right before use and always removed afterwards,
    # so a failure while sinking or reading does not leave it behind on disk.
    with tempfile.NamedTemporaryFile(
        suffix=".parquet", delete=False, dir=local_tmp_dir()
    ) as tmp:
        tmp_path = Path(tmp.name)
    try:
        uprns.sort("OA21CD").sink_parquet(tmp_path)
        release_memory()
        # Read the sorted data — only one copy in memory (~2GB)
        df = pl.read_parquet(tmp_path)
    finally:
        tmp_path.unlink(missing_ok=True)

    n = len(df)
    print(f" Loaded {n:,} UPRNs (England)")

    # Compute OA group offsets using polars (avoids 37M Python string creation):
    # keep only the rows where the OA code differs from the previous row.
    boundary_df = (
        df.lazy()
        .with_row_index("_i")
        .filter(
            pl.col("OA21CD").shift(1).is_null()
            | (pl.col("OA21CD") != pl.col("OA21CD").shift(1))
        )
        .select("_i", "OA21CD")
        .collect()
    )
    starts_list = boundary_df["_i"].to_list()
    oa_list = boundary_df["OA21CD"].to_list()
    del boundary_df

    # Each group ends where the next one starts; the last group ends at n.
    ends_list = starts_list[1:] + [n]
    offsets: dict[str, tuple[int, int]] = {
        oa: (start, end) for oa, start, end in zip(oa_list, starts_list, ends_list)
    }
    del starts_list, ends_list, oa_list

    # Drop OA column (no longer needed) to save ~400MB
    df = df.select("GRIDGB1E", "GRIDGB1N", "PCDS")
    release_memory()
    print(f" Grouped into {len(offsets)} OAs")
    return df, offsets
def get_oa_uprns(
    df: pl.DataFrame, offsets: dict[str, tuple[int, int]], oa_code: str
) -> tuple[np.ndarray, list[str]]:
    """Return the coordinates and postcodes of the rows belonging to one OA.

    ``offsets[oa_code]`` supplies the half-open row range to slice from ``df``;
    an unknown ``oa_code`` raises ``KeyError``.

    Returns ``(points, postcodes)`` where ``points`` is an (n, 2) array of
    ``GRIDGB1E``/``GRIDGB1N`` pairs and ``postcodes`` is the matching list of
    ``PCDS`` values.
    """
    start, stop = offsets[oa_code]
    rows = df[start:stop]
    easting = rows["GRIDGB1E"].to_numpy()
    northing = rows["GRIDGB1N"].to_numpy()
    coords = np.column_stack((easting, northing))
    return coords, rows["PCDS"].to_list()
def extract_uprn_arrays(df: pl.DataFrame):
    """Split the UPRN DataFrame into plain numpy/Arrow arrays.

    Returns ``(east, north, postcodes)``: two C-contiguous float64 ndarrays
    built from ``GRIDGB1E``/``GRIDGB1N`` and a single (non-chunked) pyarrow
    array built from ``PCDS``.

    The intent is that multiprocessing workers slice these per OA through
    :func:`get_oa_uprns_arrays` **without touching polars**, sidestepping the
    fork-after-threads deadlock hazard of polars' rayon pool, and that the
    buffers are shared by ``fork`` copy-on-write instead of being duplicated
    (~1GB per worker) as millions of Python objects would be.
    """
    import pyarrow as pa

    def _float64_column(column: str) -> np.ndarray:
        # Force a contiguous float64 buffer regardless of the stored dtype.
        return np.ascontiguousarray(df[column].to_numpy(), dtype=np.float64)

    east = _float64_column("GRIDGB1E")
    north = _float64_column("GRIDGB1N")

    pcds = df["PCDS"].to_arrow()
    # Collapse to one contiguous Array if polars handed back several chunks.
    postcodes = pcds.combine_chunks() if isinstance(pcds, pa.ChunkedArray) else pcds
    return east, north, postcodes
def get_oa_uprns_arrays(
    east: np.ndarray,
    north: np.ndarray,
    postcodes,
    offsets: dict[str, tuple[int, int]],
    oa_code: str,
) -> tuple[np.ndarray, list[str]]:
    """Polars-free counterpart of :func:`get_oa_uprns`.

    Slices the arrays produced by :func:`extract_uprn_arrays` using the
    half-open row range in ``offsets[oa_code]`` (``KeyError`` if the OA is
    unknown), so it can be called from worker processes.

    Returns ``(points, postcodes)``: an (n, 2) array of east/north pairs and
    the corresponding postcodes as a Python list. ``postcodes`` only needs to
    offer ``slice(offset, length)`` and ``to_pylist()``.
    """
    start, stop = offsets[oa_code]
    window = slice(start, stop)
    coords = np.column_stack((east[window], north[window]))
    # Arrow-style slice takes (offset, length), not (start, stop).
    codes = postcodes.slice(start, stop - start).to_pylist()
    return coords, codes