From bd0dd34b6e2d987faddb5daccde8715b55f25e39 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 27 Jan 2026 22:06:45 +0000 Subject: [PATCH] Improve journey time fetching --- pipeline/journey_times/__init__.py | 2 - pipeline/journey_times/__main__.py | 7 +- pipeline/journey_times/models.py | 6 +- pipeline/journey_times/rate_limiter.py | 6 +- pipeline/journey_times/results.py | 6 +- pipeline/journey_times/tfl_client.py | 113 ++++++++++++------ .../processors/journey_times_aggregator.py | 79 ++++++++++++ 7 files changed, 165 insertions(+), 54 deletions(-) create mode 100644 pipeline/processors/journey_times_aggregator.py diff --git a/pipeline/journey_times/__init__.py b/pipeline/journey_times/__init__.py index e6cf5ed..678959a 100644 --- a/pipeline/journey_times/__init__.py +++ b/pipeline/journey_times/__init__.py @@ -14,7 +14,6 @@ from .results import results_to_dataframe, save_results from .tfl_client import fetch_journey_times __all__ = [ - # Config "DATA_DIR", "OUTPUT_DIR", "MAX_DELAY", @@ -24,7 +23,6 @@ __all__ = [ "DESTINATIONS", "Destination", "JourneyResult", - "load_all_postcodes", "fetch_journey_times", "results_to_dataframe", "save_results", diff --git a/pipeline/journey_times/__main__.py b/pipeline/journey_times/__main__.py index a201ba1..47a8a79 100644 --- a/pipeline/journey_times/__main__.py +++ b/pipeline/journey_times/__main__.py @@ -10,7 +10,6 @@ from .results import results_to_dataframe, save_results from .tfl_client import fetch_journey_times - def main(): destination = DESTINATIONS["bank"] @@ -18,7 +17,7 @@ def main(): today = date.today() days_until_monday = (7 - today.weekday()) % 7 or 7 journey_date = today + timedelta(days=days_until_monday) - journey_time = "0800" + journey_time = "0845" print(f"Destination: {destination.name}") print( @@ -26,7 +25,7 @@ def main(): f"at {journey_time[:2]}:{journey_time[2:]}" ) - postcodes_df = pl.read_parquet(OUTPUT_DIR / "postcodes_h3.parquet") + postcodes_df = pl.read_parquet(OUTPUT_DIR / "postcodes_h3.parquet") print(f"Loaded {postcodes_df.height:,} postcodes") postcode_data = list( @@ -67,7 +66,7 @@ def main(): pl.lit(f"{journey_time[:2]}:{journey_time[2:]}").alias("journey_time"), ) - successful = results_df.filter(pl.col("fastest_minutes").is_not_null()).height + successful = results_df.filter(pl.col("cycling").is_not_null()).height print(f"Completed: {successful}/{len(results)} successful") parquet_path = save_results(results_df, destination.name) diff --git a/pipeline/journey_times/models.py b/pipeline/journey_times/models.py index ca33e3f..9261357 100644 --- a/pipeline/journey_times/models.py +++ b/pipeline/journey_times/models.py @@ -24,9 +24,7 @@ class JourneyResult: """Result of a journey time calculation for a postcode.""" postcode: str - walking_minutes: int | None = None + public_transport_easy_minutes: int | None = None cycling_minutes: int | None = None - public_transport_minutes: int | None = None - fastest_minutes: int | None = None - fastest_mode: str | None = None + public_transport_quick_minutes: int | None = None error: str | None = None diff --git a/pipeline/journey_times/rate_limiter.py b/pipeline/journey_times/rate_limiter.py index 98a333a..0f8103c 100644 --- a/pipeline/journey_times/rate_limiter.py +++ b/pipeline/journey_times/rate_limiter.py @@ -17,10 +17,12 @@ class RateLimiter: """Wait until we can make a request within rate limits.""" async with self._lock: now = asyncio.get_event_loop().time() - cutoff = now - 60.0 + cutoff = now - 10.0 # 10 seconds self.request_times = [t for t in self.request_times if t > cutoff] - if len(self.request_times) >= REQUESTS_PER_MIN: + if ( + len(self.request_times) >= REQUESTS_PER_MIN // 6 + ): # we look at it every 10 seconds instead of minutes wait_time = self.request_times[0] - cutoff if wait_time > 0: warnings.warn( diff --git a/pipeline/journey_times/results.py b/pipeline/journey_times/results.py index cef02cb..4d4d54f 100644 --- a/pipeline/journey_times/results.py +++ b/pipeline/journey_times/results.py @@ -11,11 +11,9 @@ def results_to_dataframe(results: list[JourneyResult]) -> pl.DataFrame: [ { "postcode": r.postcode, - "walking_minutes": r.walking_minutes, + "public_transport_easy_minutes": r.public_transport_easy_minutes, + "public_transport_quick_minutes": r.public_transport_quick_minutes, "cycling_minutes": r.cycling_minutes, - "public_transport_minutes": r.public_transport_minutes, - "fastest_minutes": r.fastest_minutes, - "fastest_mode": r.fastest_mode, "error": r.error, } for r in results diff --git a/pipeline/journey_times/tfl_client.py b/pipeline/journey_times/tfl_client.py index b976e42..94ee88e 100644 --- a/pipeline/journey_times/tfl_client.py +++ b/pipeline/journey_times/tfl_client.py @@ -1,10 +1,10 @@ -"""TfL API client for fetching journey times.""" - import asyncio +from typing import Literal import warnings from collections.abc import Callable from http import HTTPStatus +from httpx import Timeout from journey_client import Client from journey_client.api.journey import ( journey_journey_results_by_path_from_path_to_query_via_query_national_search_query_date_qu as journey_api, @@ -12,7 +12,16 @@ from journey_client.api.journey import ( from journey_client.models import ( JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuTimeIs as TimeIs, ) -from journey_client.types import UNSET, Unset +from journey_client.models import ( + JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuJourneyPreference as JourneyPreference, +) +from journey_client.models import ( + JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuCyclePreference as CyclePreference, +) +from journey_client.models import ( + JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuBikeProficiency as BikeProficiency, +) +from journey_client.types import Unset from .config import MAX_DELAY from .models import Destination, JourneyResult @@ -24,25 +33,66 @@ async def fetch_journey_for_mode( rate_limiter: RateLimiter, from_location: str, to_location: str, - mode: list[str] | Unset, journey_date: str, journey_time: str, + journey_type: Literal["quick"] | Literal["easy"] | Literal["cycle"], retry_count: int = 5, ) -> int | None: """Fetch journey time for a specific mode with rate limiting.""" - mode_name = ",".join(mode) if not isinstance(mode, Unset) else "public_transport" backoff = 1.0 for attempt in range(retry_count): try: await rate_limiter.acquire() + cycle_preference = { + "quick": CyclePreference.TAKEONTRANSPORT, + "easy": CyclePreference.NONE, + "cycle": CyclePreference.ALLTHEWAY, + }[journey_type] + + # options: public-bus,overground,train,tube,coach,dlr,cablecar,tram,river,walking,cycle + mode = { + "quick": [ + "public-bus", + "overground", + "train", + "tube", + "coach", + "dlr", + "cablecar", + "tram", + "river", + "walking", + "cycle", + ], + "easy": [ + "public-bus", + "overground", + "train", + "tube", + "coach", + "dlr", + "cablecar", + "tram", + "river", + ], + "cycle": ["cycle"], + }[journey_type] + response = await journey_api.asyncio_detailed( from_=from_location, to=to_location, client=client, date=journey_date, time=journey_time, - time_is=TimeIs.DEPARTING, + national_search=True, + time_is=TimeIs.ARRIVING, + journey_preference=JourneyPreference.LEASTINTERCHANGE + if journey_type == "easy" + else JourneyPreference.LEASTINTERCHANGE, + cycle_preference=cycle_preference, + bike_proficiency=BikeProficiency.FAST, + walking_optimization=journey_type == "quick", mode=mode, ) @@ -65,7 +115,7 @@ async def fetch_journey_for_mode( HTTPStatus.GATEWAY_TIMEOUT, ): warnings.warn( - f"HTTP {response.status_code.value} for {mode_name} from {from_location}, " + f"HTTP {response.status_code.value} for {journey_type} from {from_location}, " f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})", stacklevel=2, ) @@ -76,7 +126,7 @@ async def fetch_journey_for_mode( return None except Exception as e: warnings.warn( - f"Network error for {mode_name} from {from_location}: {e}, " + f"Network error for {journey_type} from {from_location}: {e}, " f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})", stacklevel=2, ) @@ -84,7 +134,7 @@ async def fetch_journey_for_mode( backoff = min(backoff * 2, MAX_DELAY) continue warnings.warn( - f"Failed to fetch {mode_name} from {from_location} after {retry_count} attempts", + f"Failed to fetch {journey_type} from {from_location} after {retry_count} attempts", stacklevel=2, ) return None @@ -106,55 +156,42 @@ async def fetch_all_modes( try: from_location = f"{lat},{lon}" - walking = await fetch_journey_for_mode( + easy = await fetch_journey_for_mode( client, rate_limiter, from_location, to_location, - ["walking"], journey_date, journey_time, + journey_type="easy", + ) + quick = await fetch_journey_for_mode( + client, + rate_limiter, + from_location, + to_location, + journey_date, + journey_time, + journey_type="quick", ) cycling = await fetch_journey_for_mode( client, rate_limiter, from_location, to_location, - ["cycle"], journey_date, journey_time, + journey_type="cycle", ) - public = await fetch_journey_for_mode( - client, - rate_limiter, - from_location, - to_location, - UNSET, - journey_date, - journey_time, - ) - - options = [ - ("walking", walking), - ("cycling", cycling), - ("public_transport", public), - ] - valid_options = [(mode, time) for mode, time in options if time is not None] - - if valid_options: - fastest_mode, fastest_time = min(valid_options, key=lambda x: x[1]) - else: - fastest_mode, fastest_time = None, None return JourneyResult( postcode=postcode, - walking_minutes=walking, + public_transport_easy_minutes=easy, + public_transport_quick_minutes=quick, cycling_minutes=cycling, - public_transport_minutes=public, - fastest_minutes=fastest_time, - fastest_mode=fastest_mode, ) except Exception as e: + print(f"Error: {e}") return JourneyResult(postcode=postcode, error=str(e)) @@ -183,7 +220,7 @@ async def fetch_journey_times( to_location = dest.to_tfl_location() rate_limiter = RateLimiter() - client = Client(base_url="https://api.tfl.gov.uk", timeout=30.0) + client = Client(base_url="https://api.tfl.gov.uk").with_timeout(Timeout(30)) async with client as client: tasks = [ fetch_all_modes( diff --git a/pipeline/processors/journey_times_aggregator.py b/pipeline/processors/journey_times_aggregator.py new file mode 100644 index 0000000..ffb6b6f --- /dev/null +++ b/pipeline/processors/journey_times_aggregator.py @@ -0,0 +1,79 @@ +"""Aggregate journey times data by H3 hexagonal cells.""" + +from pathlib import Path + +import polars as pl + +from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS, PROCESSED_DIR + + +def aggregate_journey_times( + journey_times_path: Path | None = None, + postcodes_h3_path: Path | None = None, + output_dir: Path | None = None, +) -> list[Path]: + """ + Aggregate journey times by H3 cells at all resolutions. + + Joins journey_times_bank.parquet with postcodes_h3.parquet on postcode, + then groups by H3 cell to compute median journey time. + """ + journey_times_path = journey_times_path or PROCESSED_DIR / "journey_times_bank.parquet" + postcodes_h3_path = postcodes_h3_path or PROCESSED_DIR / "postcodes_h3.parquet" + output_dir = output_dir or AGGREGATES_DIR + + output_dir.mkdir(parents=True, exist_ok=True) + + # Load journey times data + journey_df = pl.read_parquet(journey_times_path).select( + ["postcode", "public_transport_minutes"] + ) + + # Filter out null journey times + journey_df = journey_df.filter(pl.col("public_transport_minutes").is_not_null()) + + if journey_df.height == 0: + print("No valid journey times found") + return [] + + # Load postcodes with H3 indices + postcodes_df = pl.read_parquet(postcodes_h3_path) + + # Join on postcode to get H3 indices + joined_df = journey_df.join(postcodes_df, on="postcode", how="inner") + + if joined_df.height == 0: + print("No matching postcodes found") + return [] + + print(f"Joined {joined_df.height} postcodes with journey times") + + saved_paths = [] + + for resolution in H3_RESOLUTIONS: + h3_col = f"h3_res{resolution}" + + if h3_col not in joined_df.columns: + print(f"Skipping resolution {resolution} - column {h3_col} not found") + continue + + # Aggregate by H3 cell - compute median journey time + agg_df = ( + joined_df.group_by(h3_col) + .agg( + pl.col("public_transport_minutes").median().alias("median_journey_minutes"), + pl.col("public_transport_minutes").count().alias("journey_count"), + ) + .rename({h3_col: "h3"}) + ) + + output_path = output_dir / f"journey_times_res{resolution}.parquet" + agg_df.write_parquet(output_path) + saved_paths.append(output_path) + print(f"Saved {agg_df.height} cells to {output_path}") + + return saved_paths + + +if __name__ == "__main__": + aggregate_journey_times()