"""Extract places, stations, and universities → data/places.parquet. Extracts named place nodes and railway stations (tube, national rail, DLR, etc.) for typeahead search. Official English university providers from the Office for Students register can also be added as travel-time destinations. Reuses the same england-latest.osm.pbf as pois.py. """ import argparse import re from pathlib import Path import osmium import polars as pl from shapely.geometry import Point from tqdm import tqdm from pipeline.utils.england_geometry import ( ENGLAND_BBOX_EAST, ENGLAND_BBOX_NORTH, ENGLAND_BBOX_SOUTH, ENGLAND_BBOX_WEST, load_england_polygon, ) # Search can use a wider set of OSM place nodes, but travel-time destinations # must remain restricted to the historical city/station origin set. SEARCH_PLACE_TYPES = { "city", "town", "village", "suburb", "neighbourhood", "quarter", "borough", "locality", "hamlet", "isolated_dwelling", "island", } TRAVEL_DESTINATION_PLACE_TYPES = {"city"} # Suffixes to strip from raw station names before appending the typed suffix. _STATION_STRIP = ( " tube station", " underground station", " railway station", " dlr station", " station dlr", " dlr", " overground station", " tram stop", " station", ) _DLR_CODE_RE = re.compile(r"ZZDL([A-Z0-9]{3})") _POSTCODE_RE = re.compile(r"\b([A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2})\b", re.I) _NOISY_PROVIDER_SUFFIXES = ( " higher education corporation", " limited", " ltd", ) _LEGAL_NAME_FALLBACK_MARKERS = ( "the chancellor", "chancellor, masters", "chancellor masters", ) def _is_dlr_station(tags: dict[str, str]) -> bool: name = tags.get("name", "").lower() network = tags.get("network", "").lower() operator = tags.get("operator", "").lower() return ( "docklands" in network or "dlr" in network or "docklands" in operator or "dlr" in operator or name.endswith(" dlr") or " dlr " in name ) def _is_tram_station(tags: dict[str, str]) -> bool: if _is_dlr_station(tags): return False station_tag = tags.get("station", "") network = tags.get("network", "").lower() return station_tag == "light_rail" or "tramlink" in network or "tram" in network def _station_display_name(name: str, tags: dict[str, str]) -> str: """Build a descriptive station name like 'Bank tube station'.""" station_tag = tags.get("station", "") network = tags.get("network", "").lower() if station_tag == "subway" or "underground" in network: suffix = "tube station" elif "docklands" in network or "dlr" in network: suffix = "DLR station" elif "overground" in network: suffix = "overground station" elif "elizabeth" in network: suffix = "Elizabeth line station" elif station_tag == "light_rail" or "tramlink" in network or "tram" in network: suffix = "tram stop" else: suffix = "railway station" # Strip any existing station suffix from the raw name lower = name.lower() for s in _STATION_STRIP: if lower.endswith(s): name = name[: len(name) - len(s)].rstrip() break return f"{name} {suffix}" def _station_name_score(name: str) -> tuple[int, int]: lower = name.lower() suffix_penalty = int( lower.endswith( ( " underground station", " tube station", " dlr station", " railway station", " rail station", " station dlr", " station", ) ) or lower.endswith(" dlr") ) return (suffix_penalty, len(name)) def _cell_text(value: object) -> str: if value is None: return "" return str(value).strip() def _header_key(value: object) -> str: return re.sub(r"[^a-z0-9]+", " ", _cell_text(value).lower()).strip() def _find_header_row(rows: list[tuple]) -> int: for idx, row in enumerate(rows): keys = [_header_key(value) for value in row] has_legal_name = any( all(token in key for token in ("provider", "legal", "name")) for key in keys ) has_university_title = any( all(token in key for token in ("right", "use", "university")) for key in keys ) if has_legal_name and has_university_title: return idx raise ValueError("Could not find the OfS register header row") def _find_column(headers: list[object], *tokens: str) -> int: for idx, header in enumerate(headers): key = _header_key(header) if all(token in key for token in tokens): return idx raise ValueError(f"Could not find OfS register column containing {tokens}") def _normalize_postcode(postcode: str) -> str: return re.sub(r"[^A-Z0-9]", "", postcode.upper()) def _extract_postcode(address: str) -> str | None: match = _POSTCODE_RE.search(address) if match is None: return None return _normalize_postcode(match.group(1)) def _clean_provider_name(name: str) -> str: name = re.sub(r"\s+", " ", name).strip(" ,") if name.lower().endswith(", the"): name = f"The {name[:-5].strip(' ,')}" for suffix in _NOISY_PROVIDER_SUFFIXES: if name.lower().endswith(suffix): name = name[: -len(suffix)].strip(" ,") break if name.startswith("The ") and name != "The Open University": name = name[4:].strip() return name def _split_trading_names(trading_names: str) -> list[str]: if not trading_names or trading_names.casefold() == "not applicable": return [] return [ _clean_provider_name(name) for name in trading_names.splitlines() if _clean_provider_name(name) ] def _needs_trading_name(legal_name: str) -> bool: lower = legal_name.lower() return any(marker in lower for marker in _LEGAL_NAME_FALLBACK_MARKERS) or any( lower.endswith(suffix) for suffix in _NOISY_PROVIDER_SUFFIXES ) def _select_university_name(legal_name: str, trading_names: str) -> str: legal = _clean_provider_name(legal_name) trading = _split_trading_names(trading_names) if _needs_trading_name(legal_name): for name in trading: if "university" in name.lower() or "imperial college" in name.lower(): return name if trading: return trading[0] return legal def _slugify_name(name: str) -> str: slug = name.lower() slug = re.sub(r"[^a-z0-9 -]", "", slug) return re.sub(r"\s+", "-", slug).strip("-") def _postcode_lookup(postcodes_path: Path) -> dict[str, tuple[float, float]]: df = pl.read_parquet( postcodes_path, columns=["pcds", "lat", "long", "ctry25cd", "doterm"], ).filter((pl.col("ctry25cd") == "E92000001") & pl.col("doterm").is_null()) return { _normalize_postcode(postcode): (float(lat), float(lon)) for postcode, lat, lon in df.select(["pcds", "lat", "long"]).iter_rows() } def _ofs_universities( raw: pl.DataFrame, postcode_coords: dict[str, tuple[float, float]] ) -> tuple[list[dict], int]: rows = raw.rows() header_idx = _find_header_row(rows) headers = list(rows[header_idx]) legal_idx = _find_column(headers, "provider", "legal", "name") trading_idx = _find_column(headers, "trading", "name") address_idx = _find_column(headers, "contact", "address") university_title_idx = _find_column(headers, "right", "use", "university") universities: list[dict] = [] skipped = 0 for row in rows[header_idx + 1 :]: if _cell_text(row[university_title_idx]).casefold() != "yes": continue name = _select_university_name( _cell_text(row[legal_idx]), _cell_text(row[trading_idx]) ) postcode = _extract_postcode(_cell_text(row[address_idx])) coords = postcode_coords.get(postcode or "") if not name or coords is None: skipped += 1 continue lat, lon = coords universities.append( { "name": name, "place_type": "university", "lat": lat, "lon": lon, "population": 0, "travel_destination": True, } ) return universities, skipped def _append_ofs_universities( places: list[dict], register_path: Path, postcodes_path: Path ) -> tuple[int, int]: postcode_coords = _postcode_lookup(postcodes_path) raw = pl.read_excel(register_path, has_header=False) universities, skipped = _ofs_universities(raw, postcode_coords) existing_slugs = {_slugify_name(str(place["name"])) for place in places} added = 0 for university in universities: slug = _slugify_name(university["name"]) if slug in existing_slugs: continue places.append(university) existing_slugs.add(slug) added += 1 return added, skipped def _naptan_dlr_stations(naptan_path: Path) -> list[dict]: """Extract station-level DLR destinations from NaPTAN access nodes.""" df = pl.read_parquet(naptan_path) required = {"id", "name", "category", "lat", "lng"} missing = required - set(df.columns) if missing: raise ValueError(f"NaPTAN file is missing columns: {sorted(missing)}") rows: dict[str, dict] = {} for row in df.iter_rows(named=True): atco_id = str(row["id"] or "") match = _DLR_CODE_RE.search(atco_id) if not match: continue if row["category"] not in {"Tube station", "Rail station"}: continue code = match.group(1) raw_name = str(row["name"] or "") if not raw_name: continue lat = float(row["lat"]) lon = float(row["lng"]) current = rows.get(code) if current is None: rows[code] = { "raw_name": raw_name, "lat_sum": lat, "lon_sum": lon, "count": 1, } continue current["lat_sum"] += lat current["lon_sum"] += lon current["count"] += 1 if _station_name_score(raw_name) < _station_name_score(current["raw_name"]): current["raw_name"] = raw_name stations = [] for station in rows.values(): count = station["count"] display_name = _station_display_name(station["raw_name"], {"network": "DLR"}) stations.append( { "name": display_name, "place_type": "station", "lat": station["lat_sum"] / count, "lon": station["lon_sum"] / count, "population": 0, "travel_destination": True, } ) return sorted(stations, key=lambda station: station["name"]) def _append_naptan_dlr_stations(places: list[dict], naptan_path: Path) -> int: existing_names = {str(place["name"]).casefold() for place in places} added = 0 for station in _naptan_dlr_stations(naptan_path): key = station["name"].casefold() if key in existing_names: continue places.append(station) existing_names.add(key) added += 1 return added class PlaceHandler(osmium.SimpleHandler): def __init__(self, progress: tqdm, england_polygon) -> None: super().__init__() self._progress = progress self.places: list[dict] = [] self._england = england_polygon def _add( self, name: str, place_type: str, lat: float, lon: float, population: int, travel_destination: bool, ) -> None: self.places.append( { "name": name, "place_type": place_type, "lat": lat, "lon": lon, "population": population, "travel_destination": travel_destination, } ) self._progress.set_postfix(places=f"{len(self.places):,}", refresh=False) def node(self, n: osmium.osm.Node) -> None: self._progress.update(1) if not n.location.valid: return lat, lon = n.location.lat, n.location.lon if not ( ENGLAND_BBOX_SOUTH <= lat <= ENGLAND_BBOX_NORTH and ENGLAND_BBOX_WEST <= lon <= ENGLAND_BBOX_EAST ): return if not self._england.contains(Point(lon, lat)): return name = n.tags.get("name:en", n.tags.get("name", "")) if not name: return pop_str = n.tags.get("population", "") try: population = int(pop_str) except ValueError: population = 0 # place=* nodes place_type = n.tags.get("place") if place_type in SEARCH_PLACE_TYPES: self._add( name, place_type, lat, lon, population, travel_destination=place_type in TRAVEL_DESTINATION_PLACE_TYPES, ) return # Railway stations (tube, national rail, DLR, overground, Elizabeth line) if n.tags.get("railway") == "station": tags = dict(n.tags) if _is_tram_station(tags): return display_name = _station_display_name(name, tags) self._add( display_name, "station", lat, lon, population, travel_destination=True, ) return def main() -> None: parser = argparse.ArgumentParser(description="Extract place names from OSM PBF") parser.add_argument( "--output", type=Path, required=True, help="Output parquet file path" ) parser.add_argument("--pbf", type=Path, required=True, help="Path to OSM PBF file") parser.add_argument( "--boundary", type=Path, required=True, help="England boundary GeoJSON file", ) parser.add_argument( "--naptan", type=Path, help="Optional NaPTAN parquet file used to add DLR station destinations", ) parser.add_argument( "--university-register", type=Path, help="Optional OfS register spreadsheet used to add university destinations", ) parser.add_argument( "--postcodes", type=Path, help="Postcode parquet used to geocode OfS university contact postcodes", ) args = parser.parse_args() pbf_file = args.pbf england_polygon = load_england_polygon(args.boundary) print("Extracting search place nodes + railway stations") with tqdm( unit=" elements", unit_scale=True, desc="Streaming", smoothing=0.05, mininterval=1.0, ) as progress: handler = PlaceHandler(progress, england_polygon) handler.apply_file(str(pbf_file), locations=True) print(f"Extracted {len(handler.places):,} place nodes") if args.naptan: added = _append_naptan_dlr_stations(handler.places, args.naptan) print(f"Added {added:,} DLR station destinations from NaPTAN") if args.university_register: if not args.postcodes: raise ValueError("--postcodes is required with --university-register") added, skipped = _append_ofs_universities( handler.places, args.university_register, args.postcodes ) print( f"Added {added:,} university travel destinations from the OfS register" ) if skipped: print(f"Skipped {skipped:,} OfS university rows without usable coordinates") if handler.places: df = pl.DataFrame(handler.places) args.output.parent.mkdir(parents=True, exist_ok=True) df.write_parquet(args.output) print(f"Saved to {args.output}") else: print("No places found — skipping output") if __name__ == "__main__": main()