perfect-postcode/pipeline/download/transit_network.py
2026-02-22 22:36:46 +00:00

696 lines
26 KiB
Python

"""Download and prepare transit network data for R5 routing.
Downloads:
- England OSM PBF from Geofabrik (~1.5GB)
- BODS GTFS from Bus Open Data Service (~1.5GB, all England bus/tram/ferry)
- TfL TransXChange timetables → converted to GTFS
- National Rail CIF timetable → converted to GTFS (requires credentials)
Then processes for R5 compatibility:
- Cleans BODS GTFS (fixes stop_times >72h, feed_info year >2100)
- Converts TfL TransXChange to GTFS via transxchange2gtfs
- Converts National Rail CIF to GTFS via dtd2mysql (requires MariaDB Docker)
Requires: osmium-tool, Node.js (npx), Docker (for national rail)
Output directory: property-data/transit/
raw/england.osm.pbf + bods_gtfs.zip + tfl_gtfs.zip + national_rail_gtfs.zip
"""
import argparse
import json
import os
import subprocess
import tempfile
import time
import urllib.parse
import urllib.request
import zipfile
from pathlib import Path
from tqdm import tqdm
ENGLAND_PBF_URL = (
"https://download.geofabrik.de/europe/united-kingdom/england-latest.osm.pbf"
)
# Bus Open Data Service — pre-converted GTFS covering all England bus/tram/ferry
BODS_GTFS_URL = "https://data.bus-data.dft.gov.uk/timetable/download/gtfs-file/all/"
# TfL TransXChange timetables (tube, DLR, tram, buses, river bus, cable car)
TFL_TRANSXCHANGE_URL = (
"https://tfl.gov.uk/cdn/static/cms/documents/journey-planner-timetables.zip"
)
# NaPTAN stops data — needed by transxchange2gtfs (its built-in URL is broken)
NAPTAN_URL = "https://naptan.api.dft.gov.uk/v1/access-nodes?dataFormat=csv"
# National Rail Open Data API
NR_AUTH_URL = "https://opendata.nationalrail.co.uk/authenticate"
NR_TIMETABLE_URL = "https://opendata.nationalrail.co.uk/api/staticfeeds/3.0/timetable"
USER_AGENT = "property-map-pipeline/1.0 (https://github.com)"
def _download_http(url: str, dest: Path, *, desc: str, headers: dict | None = None) -> None:
"""Stream-download a URL to a file with progress bar."""
dest.parent.mkdir(parents=True, exist_ok=True)
tmp = dest.with_suffix(dest.suffix + ".tmp")
req_headers = {"User-Agent": USER_AGENT}
if headers:
req_headers.update(headers)
req = urllib.request.Request(url, headers=req_headers)
with (
tqdm(unit="B", unit_scale=True, desc=desc) as bar,
urllib.request.urlopen(req) as resp,
open(tmp, "wb") as f,
):
length = resp.headers.get("Content-Length")
if length:
bar.total = int(length)
while chunk := resp.read(1 << 20):
f.write(chunk)
bar.update(len(chunk))
tmp.rename(dest)
print(f" Saved to {dest}")
def download_osm_pbf(output_dir: Path) -> Path:
"""Download England OSM PBF extract from Geofabrik."""
dest = output_dir / "england.osm.pbf"
if dest.exists():
print(f"OSM PBF already exists: {dest}")
return dest
print("Downloading England OSM PBF (~1.5 GB)...")
_download_http(ENGLAND_PBF_URL, dest, desc="england.osm.pbf")
return dest
def download_bods_gtfs(output_dir: Path) -> Path:
"""Download BODS GTFS (all England bus/tram/ferry timetables)."""
dest = output_dir / "bods_gtfs_raw.zip"
if dest.exists():
print(f"BODS GTFS already exists: {dest}")
return dest
print("Downloading BODS GTFS (~1.5 GB)...")
_download_http(BODS_GTFS_URL, dest, desc="bods_gtfs_raw.zip")
return dest
def clean_gtfs(src: Path, dst: Path) -> None:
"""Fix R5-incompatible entries in GTFS.
- Removes stop_times with arrival/departure hour > 72
- Caps feed_info end_date year to 2099
"""
if dst.exists():
print(f"Cleaned GTFS already exists: {dst}")
return
print("Cleaning GTFS for R5 compatibility...")
with zipfile.ZipFile(src, "r") as zin, zipfile.ZipFile(
dst, "w", zipfile.ZIP_DEFLATED
) as zout:
for info in zin.infolist():
if info.filename == "stop_times.txt":
dropped = 0
with zin.open(info) as f:
header = f.readline()
header_str = header.decode("utf-8").strip()
cols = header_str.split(",")
arr_idx = cols.index("arrival_time") if "arrival_time" in cols else -1
dep_idx = (
cols.index("departure_time") if "departure_time" in cols else -1
)
tmp = tempfile.NamedTemporaryFile(
mode="wb", delete=False, suffix=".txt"
)
tmp.write(header)
for line in f:
line_str = line.decode("utf-8", errors="replace").strip()
if not line_str:
continue
parts = line_str.split(",")
skip = False
for idx in [arr_idx, dep_idx]:
if 0 <= idx < len(parts):
time_val = parts[idx].strip('"')
if ":" in time_val:
try:
hour = int(time_val.split(":")[0])
if hour > 72:
skip = True
break
except ValueError:
pass
if skip:
dropped += 1
else:
tmp.write(line)
tmp.close()
print(f" stop_times: dropped {dropped} rows with hours > 72")
zout.write(tmp.name, "stop_times.txt")
os.unlink(tmp.name)
elif info.filename == "feed_info.txt":
data = zin.read(info).decode("utf-8")
lines = data.strip().split("\n")
header_line = lines[0]
feed_cols = header_line.split(",")
fixed_lines = [header_line]
for line in lines[1:]:
parts = line.split(",")
for i, col_name in enumerate(feed_cols):
if "end_date" in col_name.lower() and i < len(parts):
date_val = parts[i].strip('"')
if len(date_val) == 8:
year = int(date_val[:4])
if year > 2100:
parts[i] = "20991231"
print(f" feed_info: capped end_date {date_val} → 20991231")
fixed_lines.append(",".join(parts))
zout.writestr("feed_info.txt", "\n".join(fixed_lines) + "\n")
else:
zout.writestr(info, zin.read(info))
print(f" Saved to {dst}")
def download_tfl_transxchange(raw_dir: Path) -> Path:
"""Download TfL TransXChange timetable bundle."""
dest = raw_dir / "tfl_transxchange.zip"
if dest.exists():
print(f"TfL TransXChange already exists: {dest}")
return dest
print("Downloading TfL TransXChange timetables...")
_download_http(TFL_TRANSXCHANGE_URL, dest, desc="tfl_transxchange.zip")
return dest
def download_naptan() -> None:
"""Download NaPTAN stops to /tmp/Stops.csv (needed by transxchange2gtfs)."""
dest = Path("/tmp/Stops.csv")
if dest.exists():
print(f"NaPTAN Stops.csv already exists: {dest}")
return
print("Downloading NaPTAN stops data...")
_download_http(NAPTAN_URL, dest, desc="Stops.csv")
def convert_tfl_to_gtfs(raw_dir: Path, output_dir: Path) -> Path:
"""Convert TfL TransXChange to GTFS using transxchange2gtfs."""
dest = output_dir / "tfl_gtfs.zip"
if dest.exists():
print(f"TfL GTFS already exists: {dest}")
return dest
txc_path = raw_dir / "tfl_transxchange.zip"
# Ensure NaPTAN is available (transxchange2gtfs has a broken download URL)
download_naptan()
print("Converting TfL TransXChange → GTFS...")
subprocess.run(
["npx", "--yes", "transxchange2gtfs", str(txc_path), str(dest)],
check=True,
)
size_mb = dest.stat().st_size / (1024 * 1024)
print(f" Saved to {dest} ({size_mb:.1f} MB)")
return dest
def download_national_rail_cif(raw_dir: Path) -> Path | None:
"""Download National Rail CIF timetable (requires credentials)."""
dest = raw_dir / "national_rail_cif.zip"
if dest.exists():
print(f"National Rail CIF already exists: {dest}")
return dest
email = os.environ.get("NATIONAL_RAIL_EMAIL")
password = os.environ.get("NATIONAL_RAIL_PASSWORD")
if not email or not password:
print("Warning: NATIONAL_RAIL_EMAIL/NATIONAL_RAIL_PASSWORD not set, skipping national rail")
return None
print("Authenticating with National Rail Open Data...")
auth_data = urllib.parse.urlencode({"username": email, "password": password}).encode()
auth_req = urllib.request.Request(
NR_AUTH_URL,
data=auth_data,
headers={"User-Agent": USER_AGENT, "Content-Type": "application/x-www-form-urlencoded"},
)
with urllib.request.urlopen(auth_req) as resp:
token_data = json.loads(resp.read())
token = token_data["token"]
print(" Authenticated successfully")
print("Downloading National Rail CIF timetable...")
_download_http(
NR_TIMETABLE_URL,
dest,
desc="national_rail_cif.zip",
headers={"X-Auth-Token": token},
)
return dest
def clean_national_rail_gtfs(src: Path, dst: Path) -> None:
"""Fix R5-incompatible entries in dtd2mysql-generated National Rail GTFS.
Fixes:
- Interior pass-through stops (pickup_type=1, drop_off_type=1) → normal stops.
R5 builds TripPatterns from the full stop sequence but may build shorter
TripSchedules when stops are non-boarding, causing ArrayIndexOutOfBoundsException.
- Removes stop_times referencing stops not in stops.txt.
- Removes trips with backwards travel times.
- Converts route_type=714 (rail replacement bus) to 3 (bus) for R5 compatibility.
- Removes non-standard links.txt file.
- Renumbers stop_sequence to 0-based (R5/BODS convention).
- Fixes bogus coordinates (lat < 0) on Irish CIE stations.
"""
if dst.exists():
print(f"Cleaned National Rail GTFS already exists: {dst}")
return
print("Cleaning National Rail GTFS for R5 compatibility...")
# First pass: collect valid stop IDs and find bad trips
stop_ids: set[str] = set()
bad_trip_ids: set[str] = set()
with zipfile.ZipFile(src, "r") as zin:
# Load valid stop IDs
with zin.open("stops.txt") as f:
header = f.readline().decode("utf-8").strip()
stop_id_idx = header.split(",").index("stop_id")
lat_idx = header.split(",").index("stop_lat")
for line in f:
parts = line.decode("utf-8", errors="replace").strip().split(",")
if parts:
stop_ids.add(parts[stop_id_idx])
# Find trips with backwards travel times
with zin.open("stop_times.txt") as f:
st_header = f.readline().decode("utf-8").strip()
st_cols = st_header.split(",")
trip_id_idx = st_cols.index("trip_id")
dep_idx = st_cols.index("departure_time")
prev_trip = ""
prev_dep_secs = -1
for line in f:
parts = line.decode("utf-8", errors="replace").strip().split(",")
if not parts:
continue
trip_id = parts[trip_id_idx].strip('"')
if trip_id != prev_trip:
prev_trip = trip_id
prev_dep_secs = -1
dep_str = parts[dep_idx].strip('"')
if ":" in dep_str:
try:
h, m, s = dep_str.split(":")
dep_secs = int(h) * 3600 + int(m) * 60 + int(s)
if dep_secs < prev_dep_secs:
bad_trip_ids.add(trip_id)
prev_dep_secs = dep_secs
except ValueError:
pass
print(f" Found {len(bad_trip_ids)} trips with backwards travel times")
# Second pass: write cleaned zip
passthrough_fixed = 0
orphan_stops_removed = 0
bad_trips_removed = 0
seqs_renumbered = 0
coords_fixed = 0
route_types_fixed = 0
with zipfile.ZipFile(src, "r") as zin, zipfile.ZipFile(
dst, "w", zipfile.ZIP_DEFLATED
) as zout:
for info in zin.infolist():
# Skip non-standard links.txt
if info.filename == "links.txt":
continue
if info.filename == "stop_times.txt":
with zin.open(info) as f:
header = f.readline()
header_str = header.decode("utf-8").strip()
cols = header_str.split(",")
trip_id_idx = cols.index("trip_id")
stop_id_idx = cols.index("stop_id")
seq_idx = cols.index("stop_sequence")
pickup_idx = cols.index("pickup_type") if "pickup_type" in cols else -1
dropoff_idx = cols.index("drop_off_type") if "drop_off_type" in cols else -1
tmp = tempfile.NamedTemporaryFile(
mode="wb", delete=False, suffix=".txt"
)
tmp.write(header)
prev_trip = ""
seq_counter = 0
for line in f:
line_str = line.decode("utf-8", errors="replace").strip()
if not line_str:
continue
parts = line_str.split(",")
trip_id = parts[trip_id_idx].strip('"')
stop_id = parts[stop_id_idx].strip('"')
# Skip trips with backwards times
if trip_id in bad_trip_ids:
bad_trips_removed += 1
continue
# Skip stop_times referencing missing stops
if stop_id not in stop_ids:
orphan_stops_removed += 1
continue
# Fix pass-through stops: set pickup/dropoff to 0 (normal)
if pickup_idx >= 0 and dropoff_idx >= 0:
pickup = parts[pickup_idx].strip('"')
dropoff = parts[dropoff_idx].strip('"')
if pickup == "1" and dropoff == "1":
parts[pickup_idx] = "0"
parts[dropoff_idx] = "0"
passthrough_fixed += 1
# Renumber stop_sequence to 0-based
if trip_id != prev_trip:
prev_trip = trip_id
seq_counter = 0
else:
seq_counter += 1
old_seq = parts[seq_idx].strip('"')
parts[seq_idx] = str(seq_counter)
if old_seq != str(seq_counter):
seqs_renumbered += 1
tmp.write((",".join(parts) + "\n").encode("utf-8"))
tmp.close()
zout.write(tmp.name, "stop_times.txt")
os.unlink(tmp.name)
elif info.filename == "stops.txt":
with zin.open(info) as f:
header = f.readline()
header_str = header.decode("utf-8").strip()
cols = header_str.split(",")
lat_idx = cols.index("stop_lat")
lon_idx = cols.index("stop_lon")
tmp = tempfile.NamedTemporaryFile(
mode="wb", delete=False, suffix=".txt"
)
tmp.write(header)
for line in f:
line_str = line.decode("utf-8", errors="replace").strip()
if not line_str:
continue
parts = line_str.split(",")
try:
lat = float(parts[lat_idx])
# Fix bogus Irish CIE coordinates (South Atlantic)
if lat < 0:
# Set to a neutral UK coordinate that won't be routed to
parts[lat_idx] = "54.0"
parts[lon_idx] = "-2.0"
coords_fixed += 1
except ValueError:
pass
tmp.write((",".join(parts) + "\n").encode("utf-8"))
tmp.close()
zout.write(tmp.name, "stops.txt")
os.unlink(tmp.name)
elif info.filename == "routes.txt":
with zin.open(info) as f:
header = f.readline()
header_str = header.decode("utf-8").strip()
cols = header_str.split(",")
rt_idx = cols.index("route_type")
tmp = tempfile.NamedTemporaryFile(
mode="wb", delete=False, suffix=".txt"
)
tmp.write(header)
for line in f:
line_str = line.decode("utf-8", errors="replace").strip()
if not line_str:
continue
parts = line_str.split(",")
if parts[rt_idx].strip('"') == "714":
parts[rt_idx] = "3"
route_types_fixed += 1
tmp.write((",".join(parts) + "\n").encode("utf-8"))
tmp.close()
zout.write(tmp.name, "routes.txt")
os.unlink(tmp.name)
elif info.filename == "trips.txt":
# Remove trips that have backwards travel times
with zin.open(info) as f:
header = f.readline()
header_str = header.decode("utf-8").strip()
cols = header_str.split(",")
trip_id_idx = cols.index("trip_id")
tmp = tempfile.NamedTemporaryFile(
mode="wb", delete=False, suffix=".txt"
)
tmp.write(header)
for line in f:
line_str = line.decode("utf-8", errors="replace").strip()
if not line_str:
continue
parts = line_str.split(",")
if parts[trip_id_idx].strip('"') not in bad_trip_ids:
tmp.write(line)
tmp.close()
zout.write(tmp.name, "trips.txt")
os.unlink(tmp.name)
elif info.filename == "calendar.txt":
# Cap end_date year to 2099
with zin.open(info) as f:
header = f.readline()
header_str = header.decode("utf-8").strip()
cols = header_str.split(",")
end_idx = cols.index("end_date")
tmp = tempfile.NamedTemporaryFile(
mode="wb", delete=False, suffix=".txt"
)
tmp.write(header)
for line in f:
line_str = line.decode("utf-8", errors="replace").strip()
if not line_str:
continue
parts = line_str.split(",")
date_val = parts[end_idx].strip('"')
if len(date_val) == 8:
try:
year = int(date_val[:4])
if year > 2099:
parts[end_idx] = "20991231"
except ValueError:
pass
tmp.write((",".join(parts) + "\n").encode("utf-8"))
tmp.close()
zout.write(tmp.name, "calendar.txt")
os.unlink(tmp.name)
else:
zout.writestr(info, zin.read(info))
print(f" Pass-through stops fixed: {passthrough_fixed}")
print(f" Orphan stop references removed: {orphan_stops_removed}")
print(f" Bad trip stop_times removed: {bad_trips_removed}")
print(f" Stop sequences renumbered: {seqs_renumbered}")
print(f" Bogus coordinates fixed: {coords_fixed}")
print(f" Route types 714→3 fixed: {route_types_fixed}")
print(f" Saved to {dst}")
def _docker_run_dtd2mysql(
network: str, db_container: str, volumes: list[str], args: list[str]
) -> None:
"""Run dtd2mysql in a Node.js container on the same Docker network as MariaDB."""
cmd = [
"docker", "run", "--rm", "--network", network,
"-e", f"DATABASE_HOSTNAME={db_container}",
"-e", "DATABASE_USERNAME=root",
"-e", "DATABASE_PASSWORD=root",
"-e", "DATABASE_NAME=dtd",
]
for v in volumes:
cmd.extend(["-v", v])
# Install zip (needed for --gtfs-zip) then run dtd2mysql
inner = "apt-get update -qq && apt-get install -y -qq zip > /dev/null 2>&1 && npx --yes dtd2mysql " + " ".join(args)
cmd.extend(["node:20", "bash", "-c", inner])
subprocess.run(cmd, check=True)
def convert_national_rail_to_gtfs(raw_dir: Path, output_dir: Path) -> Path:
"""Convert National Rail CIF to GTFS using dtd2mysql + MariaDB Docker.
Runs both MariaDB and dtd2mysql as Docker containers on a shared network,
since Docker port forwarding is not available in all environments.
Then cleans the output for R5 compatibility.
"""
dest = output_dir / "national_rail_gtfs.zip"
if dest.exists():
print(f"National Rail GTFS already exists: {dest}")
return dest
raw_dest = raw_dir / "national_rail_gtfs_raw.zip"
if not raw_dest.exists():
db_container = "propertymap-mariadb-temp"
network = "propertymap-dtd-net"
print("Creating Docker network and starting MariaDB...")
subprocess.run(["docker", "network", "create", network], capture_output=True)
subprocess.run(
[
"docker", "run", "-d",
"--name", db_container,
"--network", network,
"-e", "MARIADB_ROOT_PASSWORD=root",
"-e", "MARIADB_DATABASE=dtd",
"mariadb:latest",
],
check=True,
)
try:
# Wait for MariaDB to be ready
print(" Waiting for MariaDB to be ready...")
for attempt in range(30):
result = subprocess.run(
["docker", "exec", db_container, "mariadb", "-uroot", "-proot", "-e", "SELECT 1"],
capture_output=True,
)
if result.returncode == 0:
break
time.sleep(2)
else:
raise RuntimeError("MariaDB did not become ready in time")
raw_abs = str(raw_dir.resolve())
print("Importing CIF timetable into MariaDB...")
_docker_run_dtd2mysql(
network, db_container,
volumes=[f"{raw_abs}:/data:ro"],
args=["--timetable", "/data/national_rail_cif.zip"],
)
print("Exporting GTFS from MariaDB...")
_docker_run_dtd2mysql(
network, db_container,
volumes=[f"{raw_abs}:/output"],
args=["--gtfs-zip", "/output/national_rail_gtfs_raw.zip"],
)
finally:
print("Cleaning up Docker resources...")
subprocess.run(["docker", "stop", db_container], capture_output=True)
subprocess.run(["docker", "rm", db_container], capture_output=True)
subprocess.run(["docker", "network", "rm", network], capture_output=True)
# Clean the raw GTFS for R5 compatibility
clean_national_rail_gtfs(raw_dest, dest)
return dest
def main() -> None:
parser = argparse.ArgumentParser(
description="Download and prepare transit network data for R5 routing engine"
)
parser.add_argument(
"--output",
type=Path,
required=True,
help="Output directory for transit data",
)
parser.add_argument(
"--skip-tfl",
action="store_true",
help="Skip TfL TransXChange download and conversion",
)
parser.add_argument(
"--skip-national-rail",
action="store_true",
help="Skip National Rail CIF download and conversion",
)
args = parser.parse_args()
output_dir: Path = args.output
raw_dir = output_dir / "raw"
raw_dir.mkdir(parents=True, exist_ok=True)
# 1. Download and clean BODS GTFS
download_osm_pbf(raw_dir)
bods_raw = download_bods_gtfs(raw_dir)
bods_clean = output_dir / "bods_gtfs.zip"
clean_gtfs(bods_raw, bods_clean)
# 2. TfL TransXChange → GTFS
if args.skip_tfl:
print("Skipping TfL (--skip-tfl)")
else:
download_tfl_transxchange(raw_dir)
convert_tfl_to_gtfs(raw_dir, output_dir)
# 3. National Rail CIF → GTFS
if args.skip_national_rail:
print("Skipping National Rail (--skip-national-rail)")
else:
cif = download_national_rail_cif(raw_dir)
if cif is not None:
convert_national_rail_to_gtfs(raw_dir, output_dir)
# Summary
print()
print("Transit data ready for R5:")
for f in sorted(output_dir.iterdir()):
if f.is_dir() or f.name.startswith("."):
continue
size_mb = f.stat().st_size / (1024 * 1024)
print(f" {f.name}: {size_mb:.1f} MB")
print()
print("IMPORTANT: If you previously built a network from London-only data,")
print("delete the stale cache before running R5:")
print(" rm -f property-data/r5-network/network.dat")
if __name__ == "__main__":
main()