This commit is contained in:
Andras Schmelczer 2026-05-12 22:30:36 +01:00
parent 81a16f543c
commit 63713c3a2b
15 changed files with 492 additions and 159 deletions

View file

@ -1,7 +1,8 @@
"""Extract place=* nodes and railway stations from OSM PBF → data/places.parquet.
"""Extract places, stations, and universities → data/places.parquet.
Extracts named place nodes and railway stations (tube, national rail, DLR,
etc.) for typeahead search.
etc.) for typeahead search. Official English university providers from the
Office for Students register can also be added as travel-time destinations.
Reuses the same england-latest.osm.pbf as pois.py.
"""
@ -53,6 +54,19 @@ _STATION_STRIP = (
)
_DLR_CODE_RE = re.compile(r"ZZDL([A-Z0-9]{3})")
_POSTCODE_RE = re.compile(r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b", re.I)
_NOISY_PROVIDER_SUFFIXES = (
" higher education corporation",
" limited",
" ltd",
)
_LEGAL_NAME_FALLBACK_MARKERS = (
"the chancellor",
"chancellor, masters",
"chancellor masters",
)
def _is_dlr_station(tags: dict[str, str]) -> bool:
@ -124,6 +138,170 @@ def _station_name_score(name: str) -> tuple[int, int]:
return (suffix_penalty, len(name))
def _cell_text(value: object) -> str:
if value is None:
return ""
return str(value).strip()
def _header_key(value: object) -> str:
return re.sub(r"[^a-z0-9]+", " ", _cell_text(value).lower()).strip()
def _find_header_row(rows: list[tuple]) -> int:
for idx, row in enumerate(rows):
keys = [_header_key(value) for value in row]
has_legal_name = any(
all(token in key for token in ("provider", "legal", "name"))
for key in keys
)
has_university_title = any(
all(token in key for token in ("right", "use", "university"))
for key in keys
)
if has_legal_name and has_university_title:
return idx
raise ValueError("Could not find the OfS register header row")
def _find_column(headers: list[object], *tokens: str) -> int:
for idx, header in enumerate(headers):
key = _header_key(header)
if all(token in key for token in tokens):
return idx
raise ValueError(f"Could not find OfS register column containing {tokens}")
def _normalize_postcode(postcode: str) -> str:
return re.sub(r"[^A-Z0-9]", "", postcode.upper())
def _extract_postcode(address: str) -> str | None:
match = _POSTCODE_RE.search(address)
if match is None:
return None
return _normalize_postcode(match.group(1))
def _clean_provider_name(name: str) -> str:
name = re.sub(r"\s+", " ", name).strip(" ,")
if name.lower().endswith(", the"):
name = f"The {name[:-5].strip(' ,')}"
for suffix in _NOISY_PROVIDER_SUFFIXES:
if name.lower().endswith(suffix):
name = name[: -len(suffix)].strip(" ,")
break
if name.startswith("The ") and name != "The Open University":
name = name[4:].strip()
return name
def _split_trading_names(trading_names: str) -> list[str]:
if not trading_names or trading_names.casefold() == "not applicable":
return []
return [
_clean_provider_name(name)
for name in trading_names.splitlines()
if _clean_provider_name(name)
]
def _needs_trading_name(legal_name: str) -> bool:
lower = legal_name.lower()
return any(marker in lower for marker in _LEGAL_NAME_FALLBACK_MARKERS) or any(
lower.endswith(suffix) for suffix in _NOISY_PROVIDER_SUFFIXES
)
def _select_university_name(legal_name: str, trading_names: str) -> str:
legal = _clean_provider_name(legal_name)
trading = _split_trading_names(trading_names)
if _needs_trading_name(legal_name):
for name in trading:
if "university" in name.lower() or "imperial college" in name.lower():
return name
if trading:
return trading[0]
return legal
def _slugify_name(name: str) -> str:
slug = name.lower()
slug = re.sub(r"[^a-z0-9 -]", "", slug)
return re.sub(r"\s+", "-", slug).strip("-")
def _postcode_lookup(postcodes_path: Path) -> dict[str, tuple[float, float]]:
df = pl.read_parquet(
postcodes_path,
columns=["pcds", "lat", "long", "ctry25cd", "doterm"],
).filter((pl.col("ctry25cd") == "E92000001") & pl.col("doterm").is_null())
return {
_normalize_postcode(postcode): (float(lat), float(lon))
for postcode, lat, lon in df.select(["pcds", "lat", "long"]).iter_rows()
}
def _ofs_universities(
raw: pl.DataFrame, postcode_coords: dict[str, tuple[float, float]]
) -> tuple[list[dict], int]:
rows = raw.rows()
header_idx = _find_header_row(rows)
headers = list(rows[header_idx])
legal_idx = _find_column(headers, "provider", "legal", "name")
trading_idx = _find_column(headers, "trading", "name")
address_idx = _find_column(headers, "contact", "address")
university_title_idx = _find_column(headers, "right", "use", "university")
universities: list[dict] = []
skipped = 0
for row in rows[header_idx + 1 :]:
if _cell_text(row[university_title_idx]).casefold() != "yes":
continue
name = _select_university_name(
_cell_text(row[legal_idx]), _cell_text(row[trading_idx])
)
postcode = _extract_postcode(_cell_text(row[address_idx]))
coords = postcode_coords.get(postcode or "")
if not name or coords is None:
skipped += 1
continue
lat, lon = coords
universities.append(
{
"name": name,
"place_type": "university",
"lat": lat,
"lon": lon,
"population": 0,
"travel_destination": True,
}
)
return universities, skipped
def _append_ofs_universities(
places: list[dict], register_path: Path, postcodes_path: Path
) -> tuple[int, int]:
postcode_coords = _postcode_lookup(postcodes_path)
raw = pl.read_excel(register_path, has_header=False)
universities, skipped = _ofs_universities(raw, postcode_coords)
existing_slugs = {_slugify_name(str(place["name"])) for place in places}
added = 0
for university in universities:
slug = _slugify_name(university["name"])
if slug in existing_slugs:
continue
places.append(university)
existing_slugs.add(slug)
added += 1
return added, skipped
def _naptan_dlr_stations(naptan_path: Path) -> list[dict]:
"""Extract station-level DLR destinations from NaPTAN access nodes."""
df = pl.read_parquet(naptan_path)
@ -293,6 +471,16 @@ def main() -> None:
type=Path,
help="Optional NaPTAN parquet file used to add DLR station destinations",
)
parser.add_argument(
"--university-register",
type=Path,
help="Optional OfS register spreadsheet used to add university destinations",
)
parser.add_argument(
"--postcodes",
type=Path,
help="Postcode parquet used to geocode OfS university contact postcodes",
)
args = parser.parse_args()
pbf_file = args.pbf
@ -313,6 +501,17 @@ def main() -> None:
if args.naptan:
added = _append_naptan_dlr_stations(handler.places, args.naptan)
print(f"Added {added:,} DLR station destinations from NaPTAN")
if args.university_register:
if not args.postcodes:
raise ValueError("--postcodes is required with --university-register")
added, skipped = _append_ofs_universities(
handler.places, args.university_register, args.postcodes
)
print(
f"Added {added:,} university travel destinations from the OfS register"
)
if skipped:
print(f"Skipped {skipped:,} OfS university rows without usable coordinates")
if handler.places:
df = pl.DataFrame(handler.places)