This commit is contained in:
Andras Schmelczer 2026-05-06 22:40:46 +01:00
parent 28323f145e
commit 94f9c0d594
76 changed files with 3238 additions and 1230 deletions

View file

@ -2,12 +2,17 @@
import argparse
import io
import math
import re
import urllib.request
from dataclasses import dataclass
from pathlib import Path
import polars as pl
NAPTAN_CSV_URL = "https://naptan.api.dft.gov.uk/v1/access-nodes?dataFormat=csv"
TUBE_STATION_CATEGORY = "Tube station"
TUBE_STATION_MERGE_RADIUS_DEGREES = 0.01
STOP_TYPES = {
@ -25,6 +30,41 @@ STOP_TYPES = {
OUTPUT_COLUMNS = ["id", "name", "category", "lat", "lng"]
def canonical_station_name(name: str | None) -> str:
"""Normalize station names so entrances/transport-mode variants collapse."""
if not name:
return ""
normalized = name.lower()
normalized = re.sub(r"\([^)]*\)", " ", normalized)
normalized = re.sub(r"['`]", "", normalized)
normalized = normalized.replace("&", " and ")
normalized = re.sub(r"[^a-z0-9]+", " ", normalized)
words = normalized.split()
suffixes = (
("underground", "station"),
("tube", "station"),
("dlr", "station"),
("metro", "station"),
("tram", "stop"),
("rail", "station"),
("railway", "station"),
("station",),
("stop",),
)
while True:
suffix = next(
(suffix for suffix in suffixes if words[-len(suffix) :] == list(suffix)),
None,
)
if suffix is None:
break
del words[-len(suffix) :]
return " ".join(words)
def canonical_station_name_expr(name_col: str = "name") -> pl.Expr:
"""Normalize station names so entrances/transport-mode variants collapse."""
expr = pl.col(name_col).str.to_lowercase()
@ -45,67 +85,158 @@ def _has_locality() -> pl.Expr:
return pl.col("locality").is_not_null() & (pl.col("locality") != "")
def _deduplicate_tube_partition(
df: pl.DataFrame, group_cols: list[str]
) -> pl.DataFrame:
if len(df) == 0:
return pl.DataFrame(
{
"id": pl.Series([], dtype=pl.String),
"name": pl.Series([], dtype=pl.String),
"category": pl.Series([], dtype=pl.String),
"lat": pl.Series([], dtype=pl.Float64),
"lng": pl.Series([], dtype=pl.Float64),
}
)
name_len = pl.col("name").str.len_chars()
return (
df.group_by(group_cols)
.agg(
pl.col("id").sort_by(name_len).first(),
pl.col("name").sort_by(name_len).first(),
pl.col("category").first(),
pl.col("lat").mean(),
pl.col("lng").mean(),
)
.select(OUTPUT_COLUMNS)
def _empty_output_frame() -> pl.DataFrame:
return pl.DataFrame(
{
"id": pl.Series([], dtype=pl.String),
"name": pl.Series([], dtype=pl.String),
"category": pl.Series([], dtype=pl.String),
"lat": pl.Series([], dtype=pl.Float64),
"lng": pl.Series([], dtype=pl.Float64),
}
)
def station_name_score(name: str) -> tuple[int, int]:
lower = name.lower()
suffix_penalty = int(
lower.endswith(
(
" underground station",
" tube station",
" dlr station",
" metro station",
" tram stop",
" station",
" stop",
)
)
)
return (suffix_penalty, len(name))
@dataclass
class StationAccumulator:
id: str
name: str
category: str
lat_sum: float
lng_sum: float
count: int = 1
@property
def lat(self) -> float:
return self.lat_sum / self.count
@property
def lng(self) -> float:
return self.lng_sum / self.count
def same_area(self, lat: float, lng: float) -> bool:
dlat = self.lat - lat
dlng = (self.lng - lng) * math.cos(math.radians(self.lat))
return (dlat * dlat + dlng * dlng) <= TUBE_STATION_MERGE_RADIUS_DEGREES**2
def merge(self, row: dict[str, object]) -> None:
self.lat_sum += float(row["lat"])
self.lng_sum += float(row["lng"])
self.count += 1
name = str(row["name"] or "")
if station_name_score(name) < station_name_score(self.name):
self.id = str(row["id"] or "")
self.name = name
def _station_from_row(row: dict[str, object]) -> StationAccumulator:
return StationAccumulator(
id=str(row["id"] or ""),
name=str(row["name"] or ""),
category=str(row["category"] or ""),
lat_sum=float(row["lat"]),
lng_sum=float(row["lng"]),
)
def _deduplicate_tube_stations(df: pl.DataFrame) -> pl.DataFrame:
if len(df) == 0:
return _empty_output_frame()
selected: list[StationAccumulator] = []
groups: dict[str, list[int]] = {}
for row in df.iter_rows(named=True):
station_key = canonical_station_name(str(row["name"] or ""))
if not station_key:
selected.append(_station_from_row(row))
continue
existing = next(
(
index
for index in groups.get(station_key, [])
if selected[index].same_area(float(row["lat"]), float(row["lng"]))
),
None,
)
if existing is not None:
selected[existing].merge(row)
continue
index = len(selected)
selected.append(_station_from_row(row))
groups.setdefault(station_key, []).append(index)
return pl.DataFrame(
{
"id": [station.id for station in selected],
"name": [station.name for station in selected],
"category": [station.category for station in selected],
"lat": [station.lat for station in selected],
"lng": [station.lng for station in selected],
}
).select(OUTPUT_COLUMNS)
def _deduplicate_non_tube_stops(df: pl.DataFrame) -> pl.DataFrame:
if len(df) == 0:
return _empty_output_frame()
has_loc = df.filter(_has_locality())
no_loc = df.filter(~_has_locality())
# First pass: one record per exact stop name/category/locality.
frames = []
if len(has_loc) > 0:
frames.append(
has_loc.group_by("name", "category", "locality")
.agg(
pl.col("id").first(),
pl.col("lat").mean(),
pl.col("lng").mean(),
)
.select(OUTPUT_COLUMNS)
)
if len(no_loc) > 0:
frames.append(no_loc.select(OUTPUT_COLUMNS))
if not frames:
return _empty_output_frame()
return pl.concat(frames).select(OUTPUT_COLUMNS)
def deduplicate_naptan(df: pl.DataFrame) -> pl.DataFrame:
"""Deduplicate NaPTAN stops, with stricter station-level merging for Tube POIs."""
has_loc = df.filter(_has_locality())
no_loc = df.filter(~_has_locality())
cols_with_locality = [*OUTPUT_COLUMNS, "locality"]
"""Deduplicate NaPTAN stops, with station-level merging for Tube POIs."""
tube = df.filter(pl.col("category") == TUBE_STATION_CATEGORY)
other = df.filter(pl.col("category") != TUBE_STATION_CATEGORY)
# First pass: one record per exact stop name/category/locality.
deduped_has_loc = (
has_loc.group_by("name", "category", "locality")
.agg(
pl.col("id").first(),
pl.col("lat").mean(),
pl.col("lng").mean(),
)
.select(cols_with_locality)
)
df = pl.concat([deduped_has_loc, no_loc.select(cols_with_locality)])
tube = df.filter(pl.col("category") == "Tube station").with_columns(
canonical_station_name_expr().alias("_station_key")
)
other = df.filter(pl.col("category") != "Tube station")
tube_with_loc = tube.filter(_has_locality())
tube_no_loc = tube.filter(~_has_locality())
deduped_tube = pl.concat(
return pl.concat(
[
_deduplicate_tube_partition(tube_with_loc, ["_station_key", "locality"]),
_deduplicate_tube_partition(tube_no_loc, ["_station_key"]),
_deduplicate_non_tube_stops(other),
_deduplicate_tube_stations(tube),
]
)
return pl.concat([other.select(OUTPUT_COLUMNS), deduped_tube])
).select(OUTPUT_COLUMNS)
def download_naptan(output: Path) -> None:
@ -140,7 +271,7 @@ def download_naptan(output: Path) -> None:
print(
f"Deduplicated {before:,}{len(df):,} stops "
"(by name+category+locality; tube stations by normalized station name)"
"(by name+category+locality; tube stations by normalized name+area)"
)
df.write_parquet(output)

View file

@ -1,19 +1,24 @@
import polars as pl
import pytest
from pipeline.download.naptan import canonical_station_name_expr, deduplicate_naptan
from pipeline.download.naptan import (
canonical_station_name,
canonical_station_name_expr,
deduplicate_naptan,
)
def test_canonical_station_name_expr_normalizes_transport_suffixes():
names = [
"Bank",
"Bank Underground Station",
"Bank DLR Station",
"Pleasure Beach (Blackpool Tramway)",
"Earl's Court Tube Station",
]
df = pl.DataFrame(
{
"name": [
"Bank",
"Bank Underground Station",
"Bank DLR Station",
"Pleasure Beach (Blackpool Tramway)",
"Earl's Court Tube Station",
]
"name": names,
}
)
@ -26,30 +31,45 @@ def test_canonical_station_name_expr_normalizes_transport_suffixes():
"pleasure beach",
"earls court",
]
assert [canonical_station_name(name) for name in names] == result
def test_deduplicate_naptan_merges_tube_station_variants_by_locality():
def test_deduplicate_naptan_merges_tube_station_variants_by_area():
df = pl.DataFrame(
{
"id": ["bank", "bank-lu", "bank-dlr", "other-bank"],
"id": [
"bank",
"bank-lu",
"bank-dlr",
"other-bank",
"central-a",
"central-b",
],
"name": [
"Bank",
"Bank Underground Station",
"Bank DLR Station",
"Bank Underground Station",
"Central Tube Station",
"Central Tube Station",
],
"category": ["Tube station"] * 4,
"lat": [51.5129, 51.5134, 51.5132, 55.0140],
"lng": [-0.0889, -0.0890, -0.0885, -1.6781],
"locality": ["LOC1", "LOC1", "LOC1", "LOC2"],
"category": ["Tube station"] * 6,
"lat": [51.5129, 51.5134, 51.5132, 55.0140, 51.5, 53.0],
"lng": [-0.0889, -0.0890, -0.0885, -1.6781, -0.1, -2.0],
"locality": ["LOC1", "LOC1", "LOC2", "LOC1", None, None],
}
)
result = deduplicate_naptan(df).sort("lat")
assert len(result) == 2
assert result["name"].to_list() == ["Bank", "Bank Underground Station"]
assert result["lat"].to_list()[0] == pytest.approx(
assert len(result) == 4
assert result["name"].to_list() == [
"Central Tube Station",
"Bank",
"Central Tube Station",
"Bank Underground Station",
]
assert result.filter(pl.col("name") == "Bank")["lat"][0] == pytest.approx(
(51.5129 + 51.5134 + 51.5132) / 3
)

View file

@ -13,13 +13,12 @@ _AREA_COLUMNS = [
"lat",
"lon",
# Deprivation
"Income Score (rate)",
"Employment Score (rate)",
"Income Score",
"Employment Score",
"Education, Skills and Training Score",
"Health Deprivation and Disability Score",
"Living Environment Score",
"Indoors Sub-domain Score",
"Outdoors Sub-domain Score",
"Housing Conditions Score",
"Air Quality and Road Safety Score",
# Ethnicity
"% South Asian",
"% East Asian",
@ -144,7 +143,6 @@ def _build(
"Income Score (rate)",
"Employment Score (rate)",
"Health Deprivation and Disability Score",
"Living Environment Score",
"Indoors Sub-domain Score",
"Outdoors Sub-domain Score",
]
@ -319,6 +317,7 @@ def _build(
"Adult Skills Sub-domain Score",
"Children and Young People Sub-domain Score",
"Crime Score",
"Living Environment Score",
"Index of Multiple Deprivation (IMD) Score",
"Income Deprivation Affecting Older People (IDAOPI) Score (rate)",
"Income Deprivation Affecting Children Index (IDACI) Score (rate)",
@ -335,6 +334,10 @@ def _build(
"date_of_transfer": "Date of last transaction",
"construction_age_band": "Construction year",
"is_construction_date_approximate": "Is construction date approximate",
"Income Score (rate)": "Income Score",
"Employment Score (rate)": "Employment Score",
"Indoors Sub-domain Score": "Housing Conditions Score",
"Outdoors Sub-domain Score": "Air Quality and Road Safety Score",
"pp_address": "Address per Property Register",
"epc_address": "Address per EPC",
"postcode": "Postcode",

View file

@ -17,11 +17,14 @@ def test_transform_grocery_retail_points_outputs_chain_categories():
pois = transform_grocery_retail_points(raw)
assert pois.select("id", "name", "category", "group", "emoji").to_dicts() == [
assert pois.select(
"id", "name", "category", "icon_category", "group", "emoji"
).to_dicts() == [
{
"id": "glx-101",
"name": "Waitrose Test",
"category": "Waitrose",
"icon_category": "Waitrose",
"group": "Groceries",
"emoji": "🛒",
},
@ -29,6 +32,7 @@ def test_transform_grocery_retail_points_outputs_chain_categories():
"id": "glx-102",
"name": "Sainsbury's Test",
"category": "Sainsbury's",
"icon_category": "Sainsbury's Local",
"group": "Groceries",
"emoji": "🛒",
},
@ -36,12 +40,45 @@ def test_transform_grocery_retail_points_outputs_chain_categories():
"id": "glx-103",
"name": "Co-op Test",
"category": "Co-op",
"icon_category": "Co-op",
"group": "Groceries",
"emoji": "🛒",
},
]
def test_transform_grocery_retail_points_keeps_fascia_icon_category():
raw = pl.DataFrame(
{
"id": [101, 102, 103, 104],
"retailer": ["Tesco", "Iceland", "Waitrose", "Morrisons"],
"fascia": [
"Tesco Express Esso",
"The Food Warehouse",
"Little Waitrose Shell",
"Morrisons Daily",
],
"store_name": [
"Tesco Test Express",
"Iceland Test Food Warehouse",
"Little Waitrose Test",
"Morrisons Daily Test",
],
"long_wgs": [-0.141, -0.142, -0.143, -0.144],
"lat_wgs": [51.515, 51.516, 51.517, 51.518],
}
)
pois = transform_grocery_retail_points(raw)
assert pois.select("category", "icon_category").to_dicts() == [
{"category": "Tesco", "icon_category": "Tesco Express"},
{"category": "Iceland", "icon_category": "The Food Warehouse"},
{"category": "Waitrose", "icon_category": "Little Waitrose"},
{"category": "Morrisons", "icon_category": "Morrisons Daily"},
]
def test_transform_grocery_retail_points_drops_invalid_rows():
raw = pl.DataFrame(
{

View file

@ -1086,12 +1086,56 @@ GROCERY_RETAILER_DISPLAY_NAMES: dict[str, str] = {
}
GROCERY_FASCIA_ICON_NAMES: dict[str, str] = {
"Aldi Local": "Aldi",
"Asda Express": "Asda Express",
"Asda Living": "Asda Living",
"Asda PFS": "Asda PFS",
"Cooltrader": "Heron Foods",
"Cook": "COOK",
"Eurospar": "Spar",
"Eurospar PFS": "Spar",
"Heron": "Heron Foods",
"Little Waitrose": "Little Waitrose",
"Little Waitrose Shell": "Little Waitrose",
"Marks and Spencer": "M&S",
"Marks and Spencer BP": "M&S Food",
"Marks and Spencer Clothing": "M&S Clothing",
"Marks and Spencer Food To Go": "M&S Food",
"Marks and Spencer Food Outlet": "M&S Outlet",
"Marks and Spencer Foodhall": "M&S Food",
"Marks and Spencer Hospital": "M&S Hospital",
"Marks and Spencer MSA": "M&S MSA",
"Marks and Spencer Outlet": "M&S Outlet",
"Marks and Spencer Simply Food": "M&S Food",
"Marks and Spencer Travel SF": "M&S Food",
"Morrisons Daily": "Morrisons Daily",
"Morrisons Select": "Morrisons",
"Sainsburys": "Sainsbury's",
"Sainsburys Local": "Sainsbury's Local",
"Spar PFS": "Spar",
"Tesco Express": "Tesco Express",
"Tesco Express Esso": "Tesco Express",
"Tesco Extra": "Tesco Extra",
"The Co-operative Food": "Co-op",
"The Co-operative Food PFS": "Co-op",
"The Food Warehouse": "The Food Warehouse",
"Waitrose MSA": "Waitrose",
}
def normalize_grocery_retailer(retailer: str | None) -> str:
if retailer is None:
return ""
return GROCERY_RETAILER_DISPLAY_NAMES.get(retailer, retailer)
def normalize_grocery_icon_category(fascia: str | None, retailer: str | None) -> str:
if fascia:
return GROCERY_FASCIA_ICON_NAMES.get(fascia, normalize_grocery_retailer(fascia))
return normalize_grocery_retailer(retailer)
def transform_grocery_retail_points(
grocery_df: pl.DataFrame,
boundary_path: Path | None = None,
@ -1133,9 +1177,15 @@ def transform_grocery_retail_points(
pl.col("retailer")
.map_elements(normalize_grocery_retailer, return_dtype=pl.String)
.alias("category"),
pl.struct(["fascia", "retailer"])
.map_elements(
lambda row: normalize_grocery_icon_category(row["fascia"], row["retailer"]),
return_dtype=pl.String,
)
.alias("icon_category"),
pl.lit("Groceries").alias("group"),
pl.lit("🛒").alias("emoji"),
).select("id", "name", "category", "group", "lat", "lng", "emoji")
).select("id", "name", "category", "icon_category", "group", "lat", "lng", "emoji")
def transform(
@ -1189,6 +1239,7 @@ def transform(
lf = lf.with_columns(
pl.col("category").replace_strict(group_mapping).alias("group"),
pl.col("category").replace_strict(name_mapping).alias("category"),
pl.col("category").replace_strict(name_mapping).alias("icon_category"),
pl.col("category").replace_strict(emoji_mapping).alias("emoji"),
)
@ -1203,6 +1254,7 @@ def transform(
naptan = naptan_df.lazy().with_columns(
pl.col("category").replace_strict(NAPTAN_EMOJIS).alias("emoji"),
pl.lit("Public Transport").alias("group"),
pl.col("category").alias("icon_category"),
)
frames = [lf, naptan]