Remove journey times

This commit is contained in:
Andras Schmelczer 2026-02-22 21:32:47 +00:00
parent 7a12e6c09a
commit 9da2db707f
7 changed files with 0 additions and 616 deletions

View file

@ -1,25 +0,0 @@
"""Journey times calculation module for TfL transit data."""
from .config import (
DESTINATIONS,
MAX_CONCURRENT,
MAX_DELAY,
MAX_POSTCODES,
REQUESTS_PER_MIN,
)
from .models import Destination, JourneyResult
from .results import results_to_dataframe, save_results
from .tfl_client import fetch_journey_times
__all__ = [
"MAX_DELAY",
"REQUESTS_PER_MIN",
"MAX_POSTCODES",
"MAX_CONCURRENT",
"DESTINATIONS",
"Destination",
"JourneyResult",
"fetch_journey_times",
"results_to_dataframe",
"save_results",
]

View file

@ -1,167 +0,0 @@
import argparse
import asyncio
import random
from datetime import date, timedelta
from pathlib import Path
import polars as pl
from tqdm import tqdm
from .config import (
DESTINATIONS,
MAX_CONCURRENT,
MAX_POSTCODES,
MAX_DISTANCE_KM,
)
from .models import JourneyResult
from .results import CheckpointSaver, results_to_dataframe, save_results
from .tfl_client import fetch_journey_times
from pipeline.utils import haversine_km_expr
def main():
parser = argparse.ArgumentParser(description="Fetch TfL journey times")
parser.add_argument(
"--destination",
required=True,
choices=list(DESTINATIONS.keys()),
help="Destination key",
)
parser.add_argument(
"--output-dir",
required=True,
type=Path,
help="Directory for output and checkpoint files",
)
parser.add_argument(
"--postcodes",
required=True,
type=Path,
help="ArcGIS postcode parquet file",
)
args = parser.parse_args()
destination = DESTINATIONS[args.destination]
output_dir = args.output_dir
# Calculate next Monday at 8am
today = date.today()
days_until_monday = (7 - today.weekday()) % 7 or 7
journey_date = today + timedelta(days=days_until_monday)
journey_time = "0845"
print(f"Destination: {destination.name}")
print(
f"Journey: {journey_date.strftime('%A %Y-%m-%d')} "
f"at {journey_time[:2]}:{journey_time[2:]}"
)
postcodes_df = pl.read_parquet(args.postcodes).select(
pl.col("pcds").alias("postcode"),
"lat",
"long",
)
print(f"Loaded {postcodes_df.height:,} postcodes")
# Filter to postcodes within range of destination
postcodes_df = postcodes_df.with_columns(
haversine_km_expr("lat", "long", destination.lat, destination.lon).alias(
"distance_km"
)
).filter(pl.col("distance_km") <= MAX_DISTANCE_KM)
print(f"Filtered to {postcodes_df.height:,} postcodes within {MAX_DISTANCE_KM}km")
postcode_data = list(
zip(
postcodes_df["postcode"].to_list(),
postcodes_df["lat"].to_list(),
postcodes_df["long"].to_list(),
)
)
if MAX_POSTCODES is not None and len(postcode_data) > MAX_POSTCODES:
postcode_data = random.sample(postcode_data, MAX_POSTCODES)
print(f"Randomly sampled {MAX_POSTCODES} postcodes")
checkpoint_saver = CheckpointSaver(
destination_name=destination.name,
output_dir=output_dir,
on_save=lambda path, count: print(
f"Checkpoint saved: {count:,} results to {path}"
),
)
# Resume from checkpoint if one exists
checkpoint_path = checkpoint_saver._checkpoint_path()
prior_results: list[JourneyResult] = []
if checkpoint_path.exists():
checkpoint_df = pl.read_parquet(checkpoint_path)
# Deduplicate checkpoint rows per postcode, preferring rows with data
checkpoint_df = checkpoint_df.sort(
"public_transport_quick_minutes", nulls_last=True
).unique(subset=["postcode"], keep="first")
completed_postcodes = set(checkpoint_df["postcode"].to_list())
prior_results = [
JourneyResult(
postcode=row["postcode"],
public_transport_easy_minutes=row["public_transport_easy_minutes"],
public_transport_quick_minutes=row["public_transport_quick_minutes"],
cycling_minutes=row["cycling_minutes"],
error=row["error"],
)
for row in checkpoint_df.iter_rows(named=True)
]
checkpoint_saver.results = prior_results
checkpoint_saver._last_save_count = len(prior_results)
postcode_data = [
(pc, lat, lon)
for pc, lat, lon in postcode_data
if pc not in completed_postcodes
]
print(
f"Resumed from checkpoint: {len(prior_results):,} already done, "
f"{len(postcode_data):,} remaining"
)
def on_result(result):
pbar.update(1)
checkpoint_saver.add_result(result)
with tqdm(total=len(postcode_data), desc="Fetching journeys") as pbar:
new_results = asyncio.run(
fetch_journey_times(
postcode_data,
destination,
journey_date.strftime("%Y%m%d"),
journey_time,
MAX_CONCURRENT,
progress_callback=on_result,
)
)
all_results = prior_results + new_results
results_df = results_to_dataframe(all_results)
all_postcodes = {r.postcode for r in all_results}
coords_df = postcodes_df.filter(pl.col("postcode").is_in(all_postcodes)).select(
["postcode", "lat", "long"]
)
results_df = coords_df.join(results_df, on="postcode", how="left")
results_df = results_df.with_columns(
pl.lit(destination.name).alias("destination"),
pl.lit(journey_date.strftime("%Y-%m-%d")).alias("journey_date"),
pl.lit(f"{journey_time[:2]}:{journey_time[2:]}").alias("journey_time"),
)
successful = results_df.filter(pl.col("cycling_minutes").is_not_null()).height
print(f"Completed: {successful}/{len(all_results)} successful")
parquet_path = save_results(results_df, destination.name, output_dir)
checkpoint_saver.cleanup_checkpoint()
print(f"Saved to {parquet_path}")
if __name__ == "__main__":
main()

View file

@ -1,23 +0,0 @@
"""Configuration constants for journey times processing."""
from .models import Destination
MAX_DELAY = 10
REQUESTS_PER_MIN = 500
MAX_POSTCODES = None
MAX_CONCURRENT = 80
MAX_DISTANCE_KM = 110
CHECKPOINT_INTERVAL = 10000
DESTINATIONS = {
"bank": Destination(51.5133, -0.0886, "Bank", "940GZZLUBNK"),
"waterloo": Destination(51.5031, -0.1132, "Waterloo", "940GZZLUWLO"),
"kings-cross": Destination(51.5308, -0.1238, "King's Cross", "940GZZLUKSX"),
"liverpool-street": Destination(
51.5178, -0.0823, "Liverpool Street", "940GZZLULVS"
),
"paddington": Destination(51.5154, -0.1755, "Paddington", "940GZZLUPAC"),
"victoria": Destination(51.4965, -0.1447, "Victoria", "940GZZLUVIC"),
"fitzrovia": Destination(51.5165, -0.1310, "Fitzrovia", "940GZZLUTCR"),
}

View file

@ -1,30 +0,0 @@
"""Data models for journey times processing."""
from dataclasses import dataclass
@dataclass
class Destination:
"""A destination point for journey planning."""
lat: float
lon: float
name: str
naptan_id: str | None = None
def to_tfl_location(self) -> str:
"""Convert to TfL API location string."""
if self.naptan_id:
return self.naptan_id
return f"{self.lat},{self.lon}"
@dataclass
class JourneyResult:
"""Result of a journey time calculation for a postcode."""
postcode: str
public_transport_easy_minutes: int | None = None
cycling_minutes: int | None = None
public_transport_quick_minutes: int | None = None
error: str | None = None

View file

@ -1,35 +0,0 @@
"""Rate limiting for TfL API requests."""
import asyncio
import warnings
from .config import REQUESTS_PER_MIN
class RateLimiter:
"""Rate limiter enforcing max requests per minute."""
def __init__(self):
self.request_times: list[float] = []
self._lock = asyncio.Lock()
async def acquire(self):
"""Wait until we can make a request within rate limits."""
async with self._lock:
now = asyncio.get_event_loop().time()
cutoff = now - 10.0 # 10 seconds
self.request_times = [t for t in self.request_times if t > cutoff]
if (
len(self.request_times) >= REQUESTS_PER_MIN // 6
): # we look at it every 10 seconds instead of minutes
wait_time = self.request_times[0] - cutoff
if wait_time > 0:
warnings.warn(
f"Rate limit reached ({REQUESTS_PER_MIN}/min), "
f"waiting {wait_time:.1f}s",
stacklevel=1,
)
await asyncio.sleep(wait_time)
self.request_times.append(asyncio.get_event_loop().time())

View file

@ -1,82 +0,0 @@
from pathlib import Path
from typing import Callable
import polars as pl
from .config import CHECKPOINT_INTERVAL
from .models import JourneyResult
def results_to_dataframe(results: list[JourneyResult]) -> pl.DataFrame:
return pl.DataFrame(
[
{
"postcode": r.postcode,
"public_transport_easy_minutes": r.public_transport_easy_minutes,
"public_transport_quick_minutes": r.public_transport_quick_minutes,
"cycling_minutes": r.cycling_minutes,
"error": r.error,
}
for r in results
]
)
class CheckpointSaver:
"""Collects results and saves checkpoints at regular intervals."""
def __init__(
self,
destination_name: str,
output_dir: Path,
interval: int = CHECKPOINT_INTERVAL,
on_save: Callable[[Path, int], None] | None = None,
):
self.destination_name = destination_name
self.output_dir = output_dir
self.interval = interval
self.on_save = on_save
self.results: list[JourneyResult] = []
self._last_save_count = 0
def add_result(self, result: JourneyResult) -> None:
"""Add a result and save checkpoint if interval is reached."""
self.results.append(result)
if len(self.results) - self._last_save_count >= self.interval:
self.save_checkpoint()
def save_checkpoint(self) -> Path:
"""Save current results to checkpoint file."""
df = results_to_dataframe(self.results)
path = self._checkpoint_path()
df.write_parquet(path)
self._last_save_count = len(self.results)
if self.on_save:
self.on_save(path, len(self.results))
return path
def _checkpoint_path(self) -> Path:
safe_name = self.destination_name.lower().replace(" ", "-")
return self.output_dir / f"journey_times_{safe_name}_checkpoint.parquet"
def get_results(self) -> list[JourneyResult]:
"""Return all collected results."""
return self.results
def cleanup_checkpoint(self) -> None:
"""Remove the checkpoint file after successful completion."""
path = self._checkpoint_path()
if path.exists():
path.unlink()
def save_results(
results: pl.DataFrame,
destination_name: str,
output_dir: Path,
) -> Path:
safe_name = destination_name.lower().replace(" ", "-")
parquet_path = output_dir / f"journey_times_{safe_name}.parquet"
results.write_parquet(parquet_path)
return parquet_path

View file

@ -1,254 +0,0 @@
import asyncio
import os
from typing import Literal
import warnings
from collections.abc import Callable
from http import HTTPStatus
import httpx
from .config import MAX_DELAY
from .models import Destination, JourneyResult
from .rate_limiter import RateLimiter
BASE_URL = "https://api.tfl.gov.uk"
async def fetch_journey_for_mode(
client: httpx.AsyncClient,
rate_limiter: RateLimiter,
from_location: str,
to_location: str,
journey_date: str,
journey_time: str,
journey_type: Literal["quick"] | Literal["easy"] | Literal["cycle"],
retry_count: int = 5,
) -> int | None:
"""Fetch journey time for a specific mode with rate limiting."""
backoff = 1.0
for attempt in range(retry_count):
try:
await rate_limiter.acquire()
journey_preference = {
"quick": "LeastTime",
"easy": "LeastInterchange",
"cycle": None,
}[journey_type]
cycle_preference = {
"quick": None,
"easy": None,
"cycle": "AllTheWay",
}[journey_type]
# curl -s "https://api.tfl.gov.uk/Journey/Meta/Modes" | jq '.[].modeName'
mode = {
"quick": [
"bus",
"overground",
"national-rail",
"international-rail",
"elizabeth-line",
"tube",
"coach",
"dlr",
"cable-car",
"replacement-bus",
"tram",
"river-bus",
"walking",
"cycle",
],
"easy": [
"bus",
"overground",
"national-rail",
"international-rail",
"elizabeth-line",
"replacement-bus",
"tube",
"coach",
"dlr",
"cable-car",
"tram",
"river-bus",
],
"cycle": ["cycle"],
}[journey_type]
params: dict = {
"date": journey_date,
"time": journey_time,
"nationalSearch": "true",
"timeIs": "Arriving",
"cyclePreference": cycle_preference,
"bikeProficiency": "Fast",
"walkingOptimization": str(journey_type == "quick").lower(),
"mode": ",".join(mode),
}
if journey_preference:
params["journeyPreference"] = journey_preference
url = f"/Journey/JourneyResults/{from_location}/to/{to_location}"
response = await client.get(url, params=params)
if response.status_code == HTTPStatus.OK:
data = response.json()
journeys = data.get("journeys", [])
if journeys:
durations = [
j["duration"] for j in journeys if j.get("duration") is not None
]
if durations:
return min(durations)
return None
elif response.status_code in (
HTTPStatus.TOO_MANY_REQUESTS,
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.SERVICE_UNAVAILABLE,
HTTPStatus.GATEWAY_TIMEOUT,
):
warnings.warn(
f"HTTP {response.status_code} for {journey_type} from {from_location}, "
f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})",
stacklevel=2,
)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, MAX_DELAY)
continue
else:
return None
except Exception as e:
warnings.warn(
f"Network error for {journey_type} from {from_location}: {e}, "
f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})",
stacklevel=2,
)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, MAX_DELAY)
continue
warnings.warn(
f"Failed to fetch {journey_type} from {from_location} after {retry_count} attempts",
stacklevel=2,
)
return None
async def fetch_all_modes(
client: httpx.AsyncClient,
rate_limiter: RateLimiter,
postcode: str,
lat: float,
lon: float,
to_location: str,
journey_date: str,
journey_time: str,
semaphore: asyncio.Semaphore,
) -> JourneyResult:
"""Fetch journey times for all transport modes using coordinates."""
async with semaphore:
try:
from_location = f"{lat},{lon}"
easy = await fetch_journey_for_mode(
client,
rate_limiter,
from_location,
to_location,
journey_date,
journey_time,
journey_type="easy",
)
quick = await fetch_journey_for_mode(
client,
rate_limiter,
from_location,
to_location,
journey_date,
journey_time,
journey_type="quick",
)
cycling = await fetch_journey_for_mode(
client,
rate_limiter,
from_location,
to_location,
journey_date,
journey_time,
journey_type="cycle",
)
return JourneyResult(
postcode=postcode,
public_transport_easy_minutes=easy,
public_transport_quick_minutes=quick,
cycling_minutes=cycling,
)
except Exception as e:
print(f"Error: {e}")
return JourneyResult(postcode=postcode, error=str(e))
async def fetch_journey_times(
postcode_data: list[tuple[str, float, float]],
dest: Destination,
journey_date: str,
journey_time: str,
max_concurrent: int = 2,
progress_callback: Callable[[JourneyResult], None] | None = None,
) -> list[JourneyResult]:
"""Fetch journey times for all postcodes with rate limiting.
Args:
postcode_data: List of (postcode, lat, lon) tuples
dest: Destination for journey planning
journey_date: Date in YYYYMMDD format
journey_time: Time in HHMM format
max_concurrent: Maximum concurrent API requests
progress_callback: Optional callback called with each result
Returns:
List of JourneyResult objects in the same order as postcode_data
"""
semaphore = asyncio.Semaphore(max_concurrent)
to_location = dest.to_tfl_location()
rate_limiter = RateLimiter()
# TFL API authentication via app_key query parameter
tfl_token = os.environ.get("TFL_TOKEN")
if not tfl_token:
raise RuntimeError("TFL_TOKEN environment variable not set")
params = {"app_key": tfl_token}
async with httpx.AsyncClient(
base_url=BASE_URL,
params=params,
timeout=httpx.Timeout(30),
) as client:
tasks = [
fetch_all_modes(
client,
rate_limiter,
pc,
lat,
lon,
to_location,
journey_date,
journey_time,
semaphore,
)
for pc, lat, lon in postcode_data
]
results = []
for coro in asyncio.as_completed(tasks):
result = await coro
results.append(result)
if progress_callback:
progress_callback(result)
postcode_to_result = {r.postcode: r for r in results}
return [postcode_to_result[pc] for pc, _, _ in postcode_data]