from pathlib import Path

import numpy as np
import polars as pl

from pipeline.local_temp import local_tmp_dir
from pipeline.utils.postcode_mapping import build_postcode_mapping

from .memory import release_memory


def _canonical_postcode_expr(name: str) -> pl.Expr:
    """Return an expression that trims and upper-cases the column *name*."""
    return pl.col(name).str.strip_chars().str.to_uppercase()


def _active_english_arcgis_postcodes(arcgis_path: Path) -> pl.LazyFrame:
    """Return one row per usable postcode from the ArcGIS postcode parquet.

    Keeps rows whose ``ctry25cd`` is ``E92000001`` and whose ``doterm`` is
    null, then renames the columns to the UPRN schema (``PCDS``,
    ``GRIDGB1E``, ``GRIDGB1N``, ``OA21CD``).  Rows with a missing/empty
    postcode, missing coordinates, or an OA code that does not start with
    ``"E"`` are dropped, and the result is de-duplicated on ``PCDS``.
    """
    return (
        pl.read_parquet(
            arcgis_path,
            columns=["pcds", "east1m", "north1m", "oa21cd", "ctry25cd", "doterm"],
        )
        .lazy()
        .filter(pl.col("ctry25cd") == "E92000001")
        .filter(pl.col("doterm").cast(pl.Utf8).is_null())
        .select(
            _canonical_postcode_expr("pcds").alias("PCDS"),
            pl.col("east1m").cast(pl.Float64).alias("GRIDGB1E"),
            pl.col("north1m").cast(pl.Float64).alias("GRIDGB1N"),
            pl.col("oa21cd").alias("OA21CD"),
        )
        .filter(
            pl.col("PCDS").is_not_null()
            & (pl.col("PCDS") != "")
            & pl.col("GRIDGB1E").is_not_null()
            & pl.col("GRIDGB1N").is_not_null()
            & pl.col("OA21CD").is_not_null()
            & pl.col("OA21CD").str.starts_with("E")
        )
        .unique("PCDS")
    )


def _build_uprn_plan(
    uprn_path: Path,
    mapping: pl.DataFrame | None,
    active_postcode_points: pl.LazyFrame | None,
) -> pl.LazyFrame:
    """Build the lazy (unsorted) UPRN query: filter, remap, restrict, backfill."""
    uprns = (
        pl.scan_parquet(uprn_path)
        .select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD")
        .filter(pl.col("OA21CD").str.starts_with("E"))
        .filter(pl.col("GRIDGB1E").is_not_null() & pl.col("GRIDGB1N").is_not_null())
        .with_columns(_canonical_postcode_expr("PCDS").alias("PCDS"))
        .filter(pl.col("PCDS").is_not_null() & (pl.col("PCDS") != ""))
    )

    if mapping is not None and mapping.height > 0:
        # Remap terminated postcodes to their nearest active successor. The
        # successor generally lives in a DIFFERENT OA (and at different grid
        # coordinates), so the remapped point must adopt the successor's
        # authoritative OA/coords — keeping the terminated postcode's original
        # OA would seed the successor into an OA it doesn't belong to, splitting
        # its boundary across OAs. Genuine (non-remapped) UPRN rows keep their
        # own OA, since a live postcode can legitimately span several OAs.
        uprns = uprns.join(
            mapping.lazy(), left_on="PCDS", right_on="old_postcode", how="left"
        ).with_columns(pl.col("new_postcode").is_not_null().alias("_remapped"))
        if active_postcode_points is not None:
            successor_oa = active_postcode_points.rename(
                {
                    "PCDS": "new_postcode",
                    "GRIDGB1E": "_succ_e",
                    "GRIDGB1N": "_succ_n",
                    "OA21CD": "_succ_oa",
                }
            )
            uprns = uprns.join(successor_oa, on="new_postcode", how="left").with_columns(
                pl.when("_remapped")
                .then(pl.col("_succ_e"))
                .otherwise(pl.col("GRIDGB1E"))
                .alias("GRIDGB1E"),
                pl.when("_remapped")
                .then(pl.col("_succ_n"))
                .otherwise(pl.col("GRIDGB1N"))
                .alias("GRIDGB1N"),
                pl.when("_remapped")
                .then(pl.col("_succ_oa"))
                .otherwise(pl.col("OA21CD"))
                .alias("OA21CD"),
            )
        uprns = uprns.with_columns(
            pl.coalesce("new_postcode", "PCDS").alias("PCDS")
        ).select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD")

    if active_postcode_points is not None:
        # Keep only UPRN rows whose postcode is in the active set ...
        active_postcodes = active_postcode_points.select("PCDS").unique()
        uprns = uprns.join(active_postcodes, on="PCDS", how="semi")
        # ... and add one synthetic point for every active postcode that has
        # no UPRN row at all, using the ArcGIS coordinates and OA.
        missing_active = active_postcode_points.join(
            uprns.select("PCDS").unique(), on="PCDS", how="anti"
        ).select("GRIDGB1E", "GRIDGB1N", "PCDS", "OA21CD")
        uprns = pl.concat([uprns, missing_active], how="vertical_relaxed")

    return uprns


def _oa_offsets(df: pl.DataFrame) -> dict[str, tuple[int, int]]:
    """Map each OA code to its ``(start_row, end_row)`` half-open row range.

    *df* must already be sorted by ``OA21CD`` so that every OA occupies one
    contiguous run of rows.
    """
    # Compute OA group offsets using polars (avoids 37M Python string creation)
    boundary_df = (
        df.lazy()
        .with_row_index("_i")
        .filter(
            pl.col("OA21CD").shift(1).is_null()
            | (pl.col("OA21CD") != pl.col("OA21CD").shift(1))
        )
        .select("_i", "OA21CD")
        .collect()
    )
    starts = boundary_df["_i"].to_list()
    oa_codes = boundary_df["OA21CD"].to_list()
    # Each run ends where the next one starts; the last run ends at len(df).
    ends = starts[1:] + [len(df)]
    return {
        oa_code: (start, end)
        for oa_code, start, end in zip(oa_codes, starts, ends)
    }


def load_uprns(
    uprn_path: Path, arcgis_path: Path | None = None
) -> tuple[pl.DataFrame, dict[str, tuple[int, int]]]:
    """Load UPRNs as a sorted polars DataFrame with OA offset lookup.

    Args:
        uprn_path: Parquet file with at least the columns ``GRIDGB1E``,
            ``GRIDGB1N``, ``PCDS`` and ``OA21CD``.
        arcgis_path: Optional ArcGIS postcode parquet.  When given, postcodes
            found in the old->new mapping are remapped to their successor
            (adopting the successor's coordinates and OA), rows are
            restricted to active postcodes, and active postcodes without any
            UPRN row are added as a single point each.

    Returns:
        ``(df, offsets)`` where ``df`` has columns ``GRIDGB1E``, ``GRIDGB1N``
        and ``PCDS`` sorted by OA, and ``offsets[oa_code] = (start_row,
        end_row)`` is the half-open row range of that OA within ``df``.

    Peak ~5GB during sort, steady state ~1.5GB (Arrow columnar with compact
    strings).
    """
    import tempfile

    print("Loading UPRN lookup...")

    mapping = None
    active_postcode_points = None
    if arcgis_path is not None:
        mapping = (
            build_postcode_mapping(arcgis_path)
            .with_columns(
                _canonical_postcode_expr("old_postcode").alias("old_postcode"),
                _canonical_postcode_expr("new_postcode").alias("new_postcode"),
            )
            .unique("old_postcode")
        )
        active_postcode_points = _active_english_arcgis_postcodes(arcgis_path)

    # Sort via streaming sink to avoid polars doubling memory during in-memory sort
    with tempfile.NamedTemporaryFile(
        suffix=".parquet", delete=False, dir=local_tmp_dir()
    ) as tmp:
        tmp_path = Path(tmp.name)

    # The file was created with delete=False, so it is our job to remove it —
    # including when the plan, the sink or the read-back raises. Otherwise a
    # failed run leaves a multi-GB parquet behind in the temp directory.
    try:
        uprns = _build_uprn_plan(uprn_path, mapping, active_postcode_points)
        uprns.sort("OA21CD").sink_parquet(tmp_path)
        release_memory()

        # Read the sorted data — only one copy in memory (~2GB)
        df = pl.read_parquet(tmp_path)
    finally:
        tmp_path.unlink(missing_ok=True)

    n = len(df)
    print(f" Loaded {n:,} UPRNs (England)")

    offsets = _oa_offsets(df)

    # Drop OA column (no longer needed) to save ~400MB
    df = df.select("GRIDGB1E", "GRIDGB1N", "PCDS")
    release_memory()

    print(f" Grouped into {len(offsets)} OAs")
    return df, offsets


def get_oa_uprns(
    df: pl.DataFrame, offsets: dict[str, tuple[int, int]], oa_code: str
) -> tuple[np.ndarray, list[str]]:
    """Get UPRN coordinates and postcodes for a single OA.

    Returns (points_nx2, postcodes_list).

    Raises:
        KeyError: if *oa_code* is not present in *offsets*.
    """
    s, e = offsets[oa_code]
    sub = df[s:e]
    points = np.column_stack(
        [
            sub["GRIDGB1E"].to_numpy(),
            sub["GRIDGB1N"].to_numpy(),
        ]
    )
    postcodes = sub["PCDS"].to_list()
    return points, postcodes


def extract_uprn_arrays(df: pl.DataFrame):
    """Convert the UPRN DataFrame to fork-shareable numpy/Arrow arrays.

    Returns ``(east, north, postcodes)``: two float64 ndarrays and a contiguous
    pyarrow string Array. Multiprocessing workers slice these per OA via
    :func:`get_oa_uprns_arrays` **without touching polars**, which avoids the
    fork-after-threads deadlock hazard of polars' rayon pool. Being plain
    numpy/Arrow buffers (not millions of Python objects), they are shared by
    ``fork`` copy-on-write rather than duplicated ~1GB per worker.
    """
    import pyarrow as pa

    east = np.ascontiguousarray(df["GRIDGB1E"].to_numpy(), dtype=np.float64)
    north = np.ascontiguousarray(df["GRIDGB1N"].to_numpy(), dtype=np.float64)
    postcodes = df["PCDS"].to_arrow()
    # Collapse a chunked result into one contiguous array so that a single
    # (offset, length) slice addresses any OA's rows.
    if isinstance(postcodes, pa.ChunkedArray):
        postcodes = postcodes.combine_chunks()
    return east, north, postcodes


def get_oa_uprns_arrays(
    east: np.ndarray,
    north: np.ndarray,
    postcodes,
    offsets: dict[str, tuple[int, int]],
    oa_code: str,
) -> tuple[np.ndarray, list[str]]:
    """Like :func:`get_oa_uprns`, but slices the fork-shareable arrays from
    :func:`extract_uprn_arrays` (no polars), so it is safe to call in workers.

    Raises:
        KeyError: if *oa_code* is not present in *offsets*.
    """
    s, e = offsets[oa_code]
    points = np.column_stack([east[s:e], north[s:e]])
    # Arrow's slice takes (offset, length), not (start, stop).
    return points, postcodes.slice(s, e - s).to_pylist()