From 31bfc76b58fefdf5d3610ab68654b0e65bd914a3 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 27 Jan 2026 21:34:09 +0000 Subject: [PATCH] Add initial version of TFL isochrones --- README.md | 1 + pipeline/journey_times/__init__.py | 31 ++++ pipeline/journey_times/__main__.py | 78 +++++++++ pipeline/journey_times/config.py | 24 +++ pipeline/journey_times/data.py | 0 pipeline/journey_times/models.py | 32 ++++ pipeline/journey_times/rate_limiter.py | 33 ++++ pipeline/journey_times/results.py | 38 +++++ pipeline/journey_times/tfl_client.py | 211 +++++++++++++++++++++++++ 9 files changed, 448 insertions(+) create mode 100644 pipeline/journey_times/__init__.py create mode 100644 pipeline/journey_times/__main__.py create mode 100644 pipeline/journey_times/config.py create mode 100644 pipeline/journey_times/data.py create mode 100644 pipeline/journey_times/models.py create mode 100644 pipeline/journey_times/rate_limiter.py create mode 100644 pipeline/journey_times/results.py create mode 100644 pipeline/journey_times/tfl_client.py diff --git a/README.md b/README.md index 7755e41..1b5f622 100644 --- a/README.md +++ b/README.md @@ -69,3 +69,4 @@ Nice to haves? - [Local Autheority (Upper Tier)](https://communitiesopendata-communities.hub.arcgis.com/datasets/6e8edb2974da4834bbafa09644a5b02d_0/explore?location=52.684195%2C-2.489482%2C7.17) - [Open Geography](https://geoportal.statistics.gov.uk/) - [CommunitiesOpenData](https://communitiesopendata-communities.hub.arcgis.com/) +- [PlanetOSM](https://planet.openstreetmap.org/) for open street map POI diff --git a/pipeline/journey_times/__init__.py b/pipeline/journey_times/__init__.py new file mode 100644 index 0000000..e6cf5ed --- /dev/null +++ b/pipeline/journey_times/__init__.py @@ -0,0 +1,31 @@ +"""Journey times calculation module for TfL transit data.""" + +from .config import ( + DATA_DIR, + DESTINATIONS, + MAX_CONCURRENT, + MAX_DELAY, + MAX_POSTCODES, + OUTPUT_DIR, + REQUESTS_PER_MIN, +) +from .models import Destination, JourneyResult +from .results import results_to_dataframe, save_results +from .tfl_client import fetch_journey_times + +__all__ = [ + # Config + "DATA_DIR", + "OUTPUT_DIR", + "MAX_DELAY", + "REQUESTS_PER_MIN", + "MAX_POSTCODES", + "MAX_CONCURRENT", + "DESTINATIONS", + "Destination", + "JourneyResult", + "load_all_postcodes", + "fetch_journey_times", + "results_to_dataframe", + "save_results", +] diff --git a/pipeline/journey_times/__main__.py b/pipeline/journey_times/__main__.py new file mode 100644 index 0000000..a201ba1 --- /dev/null +++ b/pipeline/journey_times/__main__.py @@ -0,0 +1,78 @@ +import asyncio +import random +from datetime import date, timedelta + +import polars as pl +from tqdm import tqdm + +from .config import DESTINATIONS, MAX_CONCURRENT, MAX_POSTCODES, OUTPUT_DIR +from .results import results_to_dataframe, save_results +from .tfl_client import fetch_journey_times + + + +def main(): + destination = DESTINATIONS["bank"] + + # Calculate next Monday at 8am + today = date.today() + days_until_monday = (7 - today.weekday()) % 7 or 7 + journey_date = today + timedelta(days=days_until_monday) + journey_time = "0800" + + print(f"Destination: {destination.name}") + print( + f"Journey: {journey_date.strftime('%A %Y-%m-%d')} " + f"at {journey_time[:2]}:{journey_time[2:]}" + ) + + postcodes_df = pl.read_parquet(OUTPUT_DIR / "postcodes_h3.parquet") + print(f"Loaded {postcodes_df.height:,} postcodes") + + postcode_data = list( + zip( + postcodes_df["postcode"].to_list(), + postcodes_df["lat"].to_list(), + postcodes_df["long"].to_list(), + ) + ) + + if MAX_POSTCODES is not None and len(postcode_data) > MAX_POSTCODES: + postcode_data = random.sample(postcode_data, MAX_POSTCODES) + print(f"Randomly sampled {MAX_POSTCODES} postcodes") + + with tqdm(total=len(postcode_data), desc="Fetching journeys") as pbar: + results = asyncio.run( + fetch_journey_times( + postcode_data, + destination, + journey_date.strftime("%Y%m%d"), + journey_time, + MAX_CONCURRENT, + progress_callback=lambda _: pbar.update(1), + ) + ) + + results_df = results_to_dataframe(results) + + postcodes_processed = [pc for pc, _, _ in postcode_data] + coords_df = postcodes_df.filter( + pl.col("postcode").is_in(postcodes_processed) + ).select(["postcode", "lat", "long"]) + results_df = coords_df.join(results_df, on="postcode", how="left") + + results_df = results_df.with_columns( + pl.lit(destination.name).alias("destination"), + pl.lit(journey_date.strftime("%Y-%m-%d")).alias("journey_date"), + pl.lit(f"{journey_time[:2]}:{journey_time[2:]}").alias("journey_time"), + ) + + successful = results_df.filter(pl.col("fastest_minutes").is_not_null()).height + print(f"Completed: {successful}/{len(results)} successful") + + parquet_path = save_results(results_df, destination.name) + print(f"Saved to {parquet_path}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/journey_times/config.py b/pipeline/journey_times/config.py new file mode 100644 index 0000000..3b54517 --- /dev/null +++ b/pipeline/journey_times/config.py @@ -0,0 +1,24 @@ +"""Configuration constants for journey times processing.""" + +from pathlib import Path + +from .models import Destination + +DATA_DIR = Path("./data_sources") +OUTPUT_DIR = DATA_DIR / "processed" + +MAX_DELAY = 10 +REQUESTS_PER_MIN = 50 +MAX_POSTCODES = 100 # Set to None to process all postcodes +MAX_CONCURRENT = 5 + +DESTINATIONS = { + "bank": Destination(51.5133, -0.0886, "Bank", "940GZZLUBNK"), + "waterloo": Destination(51.5031, -0.1132, "Waterloo", "940GZZLUWLO"), + "kings-cross": Destination(51.5308, -0.1238, "King's Cross", "940GZZLUKSX"), + "liverpool-street": Destination( + 51.5178, -0.0823, "Liverpool Street", "940GZZLULVS" + ), + "paddington": Destination(51.5154, -0.1755, "Paddington", "940GZZLUPAC"), + "victoria": Destination(51.4965, -0.1447, "Victoria", "940GZZLUVIC"), +} diff --git a/pipeline/journey_times/data.py b/pipeline/journey_times/data.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline/journey_times/models.py b/pipeline/journey_times/models.py new file mode 100644 index 0000000..ca33e3f --- /dev/null +++ b/pipeline/journey_times/models.py @@ -0,0 +1,32 @@ +"""Data models for journey times processing.""" + +from dataclasses import dataclass + + +@dataclass +class Destination: + """A destination point for journey planning.""" + + lat: float + lon: float + name: str + naptan_id: str | None = None + + def to_tfl_location(self) -> str: + """Convert to TfL API location string.""" + if self.naptan_id: + return self.naptan_id + return f"{self.lat},{self.lon}" + + +@dataclass +class JourneyResult: + """Result of a journey time calculation for a postcode.""" + + postcode: str + walking_minutes: int | None = None + cycling_minutes: int | None = None + public_transport_minutes: int | None = None + fastest_minutes: int | None = None + fastest_mode: str | None = None + error: str | None = None diff --git a/pipeline/journey_times/rate_limiter.py b/pipeline/journey_times/rate_limiter.py new file mode 100644 index 0000000..98a333a --- /dev/null +++ b/pipeline/journey_times/rate_limiter.py @@ -0,0 +1,33 @@ +"""Rate limiting for TfL API requests.""" + +import asyncio +import warnings + +from .config import REQUESTS_PER_MIN + + +class RateLimiter: + """Rate limiter enforcing max requests per minute.""" + + def __init__(self): + self.request_times: list[float] = [] + self._lock = asyncio.Lock() + + async def acquire(self): + """Wait until we can make a request within rate limits.""" + async with self._lock: + now = asyncio.get_event_loop().time() + cutoff = now - 60.0 + self.request_times = [t for t in self.request_times if t > cutoff] + + if len(self.request_times) >= REQUESTS_PER_MIN: + wait_time = self.request_times[0] - cutoff + if wait_time > 0: + warnings.warn( + f"Rate limit reached ({REQUESTS_PER_MIN}/min), " + f"waiting {wait_time:.1f}s", + stacklevel=2, + ) + await asyncio.sleep(wait_time) + + self.request_times.append(asyncio.get_event_loop().time()) diff --git a/pipeline/journey_times/results.py b/pipeline/journey_times/results.py new file mode 100644 index 0000000..cef02cb --- /dev/null +++ b/pipeline/journey_times/results.py @@ -0,0 +1,38 @@ +from pathlib import Path + +import polars as pl + +from .config import OUTPUT_DIR +from .models import JourneyResult + + +def results_to_dataframe(results: list[JourneyResult]) -> pl.DataFrame: + return pl.DataFrame( + [ + { + "postcode": r.postcode, + "walking_minutes": r.walking_minutes, + "cycling_minutes": r.cycling_minutes, + "public_transport_minutes": r.public_transport_minutes, + "fastest_minutes": r.fastest_minutes, + "fastest_mode": r.fastest_mode, + "error": r.error, + } + for r in results + ] + ) + + +def save_results( + results: pl.DataFrame, + destination_name: str, + output_dir: Path | None = None, +) -> Path: + if output_dir is None: + output_dir = OUTPUT_DIR + + safe_name = destination_name.lower().replace(" ", "-") + parquet_path = output_dir / f"journey_times_{safe_name}.parquet" + results.write_parquet(parquet_path) + + return parquet_path diff --git a/pipeline/journey_times/tfl_client.py b/pipeline/journey_times/tfl_client.py new file mode 100644 index 0000000..b976e42 --- /dev/null +++ b/pipeline/journey_times/tfl_client.py @@ -0,0 +1,211 @@ +"""TfL API client for fetching journey times.""" + +import asyncio +import warnings +from collections.abc import Callable +from http import HTTPStatus + +from journey_client import Client +from journey_client.api.journey import ( + journey_journey_results_by_path_from_path_to_query_via_query_national_search_query_date_qu as journey_api, +) +from journey_client.models import ( + JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuTimeIs as TimeIs, +) +from journey_client.types import UNSET, Unset + +from .config import MAX_DELAY +from .models import Destination, JourneyResult +from .rate_limiter import RateLimiter + + +async def fetch_journey_for_mode( + client: Client, + rate_limiter: RateLimiter, + from_location: str, + to_location: str, + mode: list[str] | Unset, + journey_date: str, + journey_time: str, + retry_count: int = 5, +) -> int | None: + """Fetch journey time for a specific mode with rate limiting.""" + mode_name = ",".join(mode) if not isinstance(mode, Unset) else "public_transport" + backoff = 1.0 + for attempt in range(retry_count): + try: + await rate_limiter.acquire() + + response = await journey_api.asyncio_detailed( + from_=from_location, + to=to_location, + client=client, + date=journey_date, + time=journey_time, + time_is=TimeIs.DEPARTING, + mode=mode, + ) + + if response.status_code == HTTPStatus.OK and response.parsed: + journeys = response.parsed.journeys + if not isinstance(journeys, Unset) and journeys: + durations = [ + j.duration + for j in journeys + if not isinstance(j.duration, Unset) + ] + if durations: + return min(durations) + return None + elif response.status_code in ( + HTTPStatus.TOO_MANY_REQUESTS, + HTTPStatus.INTERNAL_SERVER_ERROR, + HTTPStatus.BAD_GATEWAY, + HTTPStatus.SERVICE_UNAVAILABLE, + HTTPStatus.GATEWAY_TIMEOUT, + ): + warnings.warn( + f"HTTP {response.status_code.value} for {mode_name} from {from_location}, " + f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})", + stacklevel=2, + ) + await asyncio.sleep(backoff) + backoff = min(backoff * 2, MAX_DELAY) + continue + else: + return None + except Exception as e: + warnings.warn( + f"Network error for {mode_name} from {from_location}: {e}, " + f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})", + stacklevel=2, + ) + await asyncio.sleep(backoff) + backoff = min(backoff * 2, MAX_DELAY) + continue + warnings.warn( + f"Failed to fetch {mode_name} from {from_location} after {retry_count} attempts", + stacklevel=2, + ) + return None + + +async def fetch_all_modes( + client: Client, + rate_limiter: RateLimiter, + postcode: str, + lat: float, + lon: float, + to_location: str, + journey_date: str, + journey_time: str, + semaphore: asyncio.Semaphore, +) -> JourneyResult: + """Fetch journey times for all transport modes using coordinates.""" + async with semaphore: + try: + from_location = f"{lat},{lon}" + + walking = await fetch_journey_for_mode( + client, + rate_limiter, + from_location, + to_location, + ["walking"], + journey_date, + journey_time, + ) + cycling = await fetch_journey_for_mode( + client, + rate_limiter, + from_location, + to_location, + ["cycle"], + journey_date, + journey_time, + ) + public = await fetch_journey_for_mode( + client, + rate_limiter, + from_location, + to_location, + UNSET, + journey_date, + journey_time, + ) + + options = [ + ("walking", walking), + ("cycling", cycling), + ("public_transport", public), + ] + valid_options = [(mode, time) for mode, time in options if time is not None] + + if valid_options: + fastest_mode, fastest_time = min(valid_options, key=lambda x: x[1]) + else: + fastest_mode, fastest_time = None, None + + return JourneyResult( + postcode=postcode, + walking_minutes=walking, + cycling_minutes=cycling, + public_transport_minutes=public, + fastest_minutes=fastest_time, + fastest_mode=fastest_mode, + ) + except Exception as e: + return JourneyResult(postcode=postcode, error=str(e)) + + +async def fetch_journey_times( + postcode_data: list[tuple[str, float, float]], + dest: Destination, + journey_date: str, + journey_time: str, + max_concurrent: int = 2, + progress_callback: Callable[[JourneyResult], None] | None = None, +) -> list[JourneyResult]: + """Fetch journey times for all postcodes with rate limiting. + + Args: + postcode_data: List of (postcode, lat, lon) tuples + dest: Destination for journey planning + journey_date: Date in YYYYMMDD format + journey_time: Time in HHMM format + max_concurrent: Maximum concurrent API requests + progress_callback: Optional callback called with each result + + Returns: + List of JourneyResult objects in the same order as postcode_data + """ + semaphore = asyncio.Semaphore(max_concurrent) + to_location = dest.to_tfl_location() + rate_limiter = RateLimiter() + + client = Client(base_url="https://api.tfl.gov.uk", timeout=30.0) + async with client as client: + tasks = [ + fetch_all_modes( + client, + rate_limiter, + pc, + lat, + lon, + to_location, + journey_date, + journey_time, + semaphore, + ) + for pc, lat, lon in postcode_data + ] + + results = [] + for coro in asyncio.as_completed(tasks): + result = await coro + results.append(result) + if progress_callback: + progress_callback(result) + + postcode_to_result = {r.postcode: r for r in results} + return [postcode_to_result[pc] for pc, _, _ in postcode_data]