Improve journey time fetching

This commit is contained in:
Andras Schmelczer 2026-01-27 22:06:45 +00:00
parent 31bfc76b58
commit bd0dd34b6e
7 changed files with 165 additions and 54 deletions

View file

@ -14,7 +14,6 @@ from .results import results_to_dataframe, save_results
from .tfl_client import fetch_journey_times from .tfl_client import fetch_journey_times
__all__ = [ __all__ = [
# Config
"DATA_DIR", "DATA_DIR",
"OUTPUT_DIR", "OUTPUT_DIR",
"MAX_DELAY", "MAX_DELAY",
@ -24,7 +23,6 @@ __all__ = [
"DESTINATIONS", "DESTINATIONS",
"Destination", "Destination",
"JourneyResult", "JourneyResult",
"load_all_postcodes",
"fetch_journey_times", "fetch_journey_times",
"results_to_dataframe", "results_to_dataframe",
"save_results", "save_results",

View file

@ -10,7 +10,6 @@ from .results import results_to_dataframe, save_results
from .tfl_client import fetch_journey_times from .tfl_client import fetch_journey_times
def main(): def main():
destination = DESTINATIONS["bank"] destination = DESTINATIONS["bank"]
@ -18,7 +17,7 @@ def main():
today = date.today() today = date.today()
days_until_monday = (7 - today.weekday()) % 7 or 7 days_until_monday = (7 - today.weekday()) % 7 or 7
journey_date = today + timedelta(days=days_until_monday) journey_date = today + timedelta(days=days_until_monday)
journey_time = "0800" journey_time = "0845"
print(f"Destination: {destination.name}") print(f"Destination: {destination.name}")
print( print(
@ -26,7 +25,7 @@ def main():
f"at {journey_time[:2]}:{journey_time[2:]}" f"at {journey_time[:2]}:{journey_time[2:]}"
) )
postcodes_df = pl.read_parquet(OUTPUT_DIR / "postcodes_h3.parquet") postcodes_df = pl.read_parquet(OUTPUT_DIR / "postcodes_h3.parquet")
print(f"Loaded {postcodes_df.height:,} postcodes") print(f"Loaded {postcodes_df.height:,} postcodes")
postcode_data = list( postcode_data = list(
@ -67,7 +66,7 @@ def main():
pl.lit(f"{journey_time[:2]}:{journey_time[2:]}").alias("journey_time"), pl.lit(f"{journey_time[:2]}:{journey_time[2:]}").alias("journey_time"),
) )
successful = results_df.filter(pl.col("fastest_minutes").is_not_null()).height successful = results_df.filter(pl.col("cycling").is_not_null()).height
print(f"Completed: {successful}/{len(results)} successful") print(f"Completed: {successful}/{len(results)} successful")
parquet_path = save_results(results_df, destination.name) parquet_path = save_results(results_df, destination.name)

View file

@ -24,9 +24,7 @@ class JourneyResult:
"""Result of a journey time calculation for a postcode.""" """Result of a journey time calculation for a postcode."""
postcode: str postcode: str
walking_minutes: int | None = None public_transport_easy_minutes: int | None = None
cycling_minutes: int | None = None cycling_minutes: int | None = None
public_transport_minutes: int | None = None public_transport_quick_minutes: int | None = None
fastest_minutes: int | None = None
fastest_mode: str | None = None
error: str | None = None error: str | None = None

View file

@ -17,10 +17,12 @@ class RateLimiter:
"""Wait until we can make a request within rate limits.""" """Wait until we can make a request within rate limits."""
async with self._lock: async with self._lock:
now = asyncio.get_event_loop().time() now = asyncio.get_event_loop().time()
cutoff = now - 60.0 cutoff = now - 10.0 # 10 seconds
self.request_times = [t for t in self.request_times if t > cutoff] self.request_times = [t for t in self.request_times if t > cutoff]
if len(self.request_times) >= REQUESTS_PER_MIN: if (
len(self.request_times) >= REQUESTS_PER_MIN // 6
): # we look at it every 10 seconds instead of minutes
wait_time = self.request_times[0] - cutoff wait_time = self.request_times[0] - cutoff
if wait_time > 0: if wait_time > 0:
warnings.warn( warnings.warn(

View file

@ -11,11 +11,9 @@ def results_to_dataframe(results: list[JourneyResult]) -> pl.DataFrame:
[ [
{ {
"postcode": r.postcode, "postcode": r.postcode,
"walking_minutes": r.walking_minutes, "public_transport_easy_minutes": r.public_transport_easy_minutes,
"public_transport_quick_minutes": r.public_transport_quick_minutes,
"cycling_minutes": r.cycling_minutes, "cycling_minutes": r.cycling_minutes,
"public_transport_minutes": r.public_transport_minutes,
"fastest_minutes": r.fastest_minutes,
"fastest_mode": r.fastest_mode,
"error": r.error, "error": r.error,
} }
for r in results for r in results

View file

@ -1,10 +1,10 @@
"""TfL API client for fetching journey times."""
import asyncio import asyncio
from typing import Literal
import warnings import warnings
from collections.abc import Callable from collections.abc import Callable
from http import HTTPStatus from http import HTTPStatus
from httpx import Timeout
from journey_client import Client from journey_client import Client
from journey_client.api.journey import ( from journey_client.api.journey import (
journey_journey_results_by_path_from_path_to_query_via_query_national_search_query_date_qu as journey_api, journey_journey_results_by_path_from_path_to_query_via_query_national_search_query_date_qu as journey_api,
@ -12,7 +12,16 @@ from journey_client.api.journey import (
from journey_client.models import ( from journey_client.models import (
JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuTimeIs as TimeIs, JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuTimeIs as TimeIs,
) )
from journey_client.types import UNSET, Unset from journey_client.models import (
JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuJourneyPreference as JourneyPreference,
)
from journey_client.models import (
JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuCyclePreference as CyclePreference,
)
from journey_client.models import (
JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuBikeProficiency as BikeProficiency,
)
from journey_client.types import Unset
from .config import MAX_DELAY from .config import MAX_DELAY
from .models import Destination, JourneyResult from .models import Destination, JourneyResult
@ -24,25 +33,66 @@ async def fetch_journey_for_mode(
rate_limiter: RateLimiter, rate_limiter: RateLimiter,
from_location: str, from_location: str,
to_location: str, to_location: str,
mode: list[str] | Unset,
journey_date: str, journey_date: str,
journey_time: str, journey_time: str,
journey_type: Literal["quick"] | Literal["easy"] | Literal["cycle"],
retry_count: int = 5, retry_count: int = 5,
) -> int | None: ) -> int | None:
"""Fetch journey time for a specific mode with rate limiting.""" """Fetch journey time for a specific mode with rate limiting."""
mode_name = ",".join(mode) if not isinstance(mode, Unset) else "public_transport"
backoff = 1.0 backoff = 1.0
for attempt in range(retry_count): for attempt in range(retry_count):
try: try:
await rate_limiter.acquire() await rate_limiter.acquire()
cycle_preference = {
"quick": CyclePreference.TAKEONTRANSPORT,
"easy": CyclePreference.NONE,
"cycle": CyclePreference.ALLTHEWAY,
}[journey_type]
# options: public-bus,overground,train,tube,coach,dlr,cablecar,tram,river,walking,cycle
mode = {
"quick": [
"public-bus",
"overground",
"train",
"tube",
"coach",
"dlr",
"cablecar",
"tram",
"river",
"walking",
"cycle",
],
"easy": [
"public-bus",
"overground",
"train",
"tube",
"coach",
"dlr",
"cablecar",
"tram",
"river",
],
"cycle": ["cycle"],
}[journey_type]
response = await journey_api.asyncio_detailed( response = await journey_api.asyncio_detailed(
from_=from_location, from_=from_location,
to=to_location, to=to_location,
client=client, client=client,
date=journey_date, date=journey_date,
time=journey_time, time=journey_time,
time_is=TimeIs.DEPARTING, national_search=True,
time_is=TimeIs.ARRIVING,
journey_preference=JourneyPreference.LEASTINTERCHANGE
if journey_type == "easy"
else JourneyPreference.LEASTINTERCHANGE,
cycle_preference=cycle_preference,
bike_proficiency=BikeProficiency.FAST,
walking_optimization=journey_type == "quick",
mode=mode, mode=mode,
) )
@ -65,7 +115,7 @@ async def fetch_journey_for_mode(
HTTPStatus.GATEWAY_TIMEOUT, HTTPStatus.GATEWAY_TIMEOUT,
): ):
warnings.warn( warnings.warn(
f"HTTP {response.status_code.value} for {mode_name} from {from_location}, " f"HTTP {response.status_code.value} for {journey_type} from {from_location}, "
f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})", f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})",
stacklevel=2, stacklevel=2,
) )
@ -76,7 +126,7 @@ async def fetch_journey_for_mode(
return None return None
except Exception as e: except Exception as e:
warnings.warn( warnings.warn(
f"Network error for {mode_name} from {from_location}: {e}, " f"Network error for {journey_type} from {from_location}: {e}, "
f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})", f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})",
stacklevel=2, stacklevel=2,
) )
@ -84,7 +134,7 @@ async def fetch_journey_for_mode(
backoff = min(backoff * 2, MAX_DELAY) backoff = min(backoff * 2, MAX_DELAY)
continue continue
warnings.warn( warnings.warn(
f"Failed to fetch {mode_name} from {from_location} after {retry_count} attempts", f"Failed to fetch {journey_type} from {from_location} after {retry_count} attempts",
stacklevel=2, stacklevel=2,
) )
return None return None
@ -106,55 +156,42 @@ async def fetch_all_modes(
try: try:
from_location = f"{lat},{lon}" from_location = f"{lat},{lon}"
walking = await fetch_journey_for_mode( easy = await fetch_journey_for_mode(
client, client,
rate_limiter, rate_limiter,
from_location, from_location,
to_location, to_location,
["walking"],
journey_date, journey_date,
journey_time, journey_time,
journey_type="easy",
)
quick = await fetch_journey_for_mode(
client,
rate_limiter,
from_location,
to_location,
journey_date,
journey_time,
journey_type="quick",
) )
cycling = await fetch_journey_for_mode( cycling = await fetch_journey_for_mode(
client, client,
rate_limiter, rate_limiter,
from_location, from_location,
to_location, to_location,
["cycle"],
journey_date, journey_date,
journey_time, journey_time,
journey_type="cycle",
) )
public = await fetch_journey_for_mode(
client,
rate_limiter,
from_location,
to_location,
UNSET,
journey_date,
journey_time,
)
options = [
("walking", walking),
("cycling", cycling),
("public_transport", public),
]
valid_options = [(mode, time) for mode, time in options if time is not None]
if valid_options:
fastest_mode, fastest_time = min(valid_options, key=lambda x: x[1])
else:
fastest_mode, fastest_time = None, None
return JourneyResult( return JourneyResult(
postcode=postcode, postcode=postcode,
walking_minutes=walking, public_transport_easy_minutes=easy,
public_transport_quick_minutes=quick,
cycling_minutes=cycling, cycling_minutes=cycling,
public_transport_minutes=public,
fastest_minutes=fastest_time,
fastest_mode=fastest_mode,
) )
except Exception as e: except Exception as e:
print(f"Error: {e}")
return JourneyResult(postcode=postcode, error=str(e)) return JourneyResult(postcode=postcode, error=str(e))
@ -183,7 +220,7 @@ async def fetch_journey_times(
to_location = dest.to_tfl_location() to_location = dest.to_tfl_location()
rate_limiter = RateLimiter() rate_limiter = RateLimiter()
client = Client(base_url="https://api.tfl.gov.uk", timeout=30.0) client = Client(base_url="https://api.tfl.gov.uk").with_timeout(Timeout(30))
async with client as client: async with client as client:
tasks = [ tasks = [
fetch_all_modes( fetch_all_modes(

View file

@ -0,0 +1,79 @@
"""Aggregate journey times data by H3 hexagonal cells."""
from pathlib import Path
import polars as pl
from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS, PROCESSED_DIR
def aggregate_journey_times(
journey_times_path: Path | None = None,
postcodes_h3_path: Path | None = None,
output_dir: Path | None = None,
) -> list[Path]:
"""
Aggregate journey times by H3 cells at all resolutions.
Joins journey_times_bank.parquet with postcodes_h3.parquet on postcode,
then groups by H3 cell to compute median journey time.
"""
journey_times_path = journey_times_path or PROCESSED_DIR / "journey_times_bank.parquet"
postcodes_h3_path = postcodes_h3_path or PROCESSED_DIR / "postcodes_h3.parquet"
output_dir = output_dir or AGGREGATES_DIR
output_dir.mkdir(parents=True, exist_ok=True)
# Load journey times data
journey_df = pl.read_parquet(journey_times_path).select(
["postcode", "public_transport_minutes"]
)
# Filter out null journey times
journey_df = journey_df.filter(pl.col("public_transport_minutes").is_not_null())
if journey_df.height == 0:
print("No valid journey times found")
return []
# Load postcodes with H3 indices
postcodes_df = pl.read_parquet(postcodes_h3_path)
# Join on postcode to get H3 indices
joined_df = journey_df.join(postcodes_df, on="postcode", how="inner")
if joined_df.height == 0:
print("No matching postcodes found")
return []
print(f"Joined {joined_df.height} postcodes with journey times")
saved_paths = []
for resolution in H3_RESOLUTIONS:
h3_col = f"h3_res{resolution}"
if h3_col not in joined_df.columns:
print(f"Skipping resolution {resolution} - column {h3_col} not found")
continue
# Aggregate by H3 cell - compute median journey time
agg_df = (
joined_df.group_by(h3_col)
.agg(
pl.col("public_transport_minutes").median().alias("median_journey_minutes"),
pl.col("public_transport_minutes").count().alias("journey_count"),
)
.rename({h3_col: "h3"})
)
output_path = output_dir / f"journey_times_res{resolution}.parquet"
agg_df.write_parquet(output_path)
saved_paths.append(output_path)
print(f"Saved {agg_df.height} cells to {output_path}")
return saved_paths
if __name__ == "__main__":
aggregate_journey_times()