diff --git a/pipeline/journey_times/__init__.py b/pipeline/journey_times/__init__.py deleted file mode 100644 index 799c14b..0000000 --- a/pipeline/journey_times/__init__.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Journey times calculation module for TfL transit data.""" - -from .config import ( - DESTINATIONS, - MAX_CONCURRENT, - MAX_DELAY, - MAX_POSTCODES, - REQUESTS_PER_MIN, -) -from .models import Destination, JourneyResult -from .results import results_to_dataframe, save_results -from .tfl_client import fetch_journey_times - -__all__ = [ - "MAX_DELAY", - "REQUESTS_PER_MIN", - "MAX_POSTCODES", - "MAX_CONCURRENT", - "DESTINATIONS", - "Destination", - "JourneyResult", - "fetch_journey_times", - "results_to_dataframe", - "save_results", -] diff --git a/pipeline/journey_times/__main__.py b/pipeline/journey_times/__main__.py deleted file mode 100644 index 96494f5..0000000 --- a/pipeline/journey_times/__main__.py +++ /dev/null @@ -1,167 +0,0 @@ -import argparse -import asyncio -import random -from datetime import date, timedelta -from pathlib import Path - -import polars as pl -from tqdm import tqdm - -from .config import ( - DESTINATIONS, - MAX_CONCURRENT, - MAX_POSTCODES, - MAX_DISTANCE_KM, -) -from .models import JourneyResult -from .results import CheckpointSaver, results_to_dataframe, save_results -from .tfl_client import fetch_journey_times -from pipeline.utils import haversine_km_expr - - -def main(): - parser = argparse.ArgumentParser(description="Fetch TfL journey times") - parser.add_argument( - "--destination", - required=True, - choices=list(DESTINATIONS.keys()), - help="Destination key", - ) - parser.add_argument( - "--output-dir", - required=True, - type=Path, - help="Directory for output and checkpoint files", - ) - parser.add_argument( - "--postcodes", - required=True, - type=Path, - help="ArcGIS postcode parquet file", - ) - args = parser.parse_args() - - destination = DESTINATIONS[args.destination] - output_dir = args.output_dir - - # Calculate next Monday at 8am - today = date.today() - days_until_monday = (7 - today.weekday()) % 7 or 7 - journey_date = today + timedelta(days=days_until_monday) - journey_time = "0845" - - print(f"Destination: {destination.name}") - print( - f"Journey: {journey_date.strftime('%A %Y-%m-%d')} " - f"at {journey_time[:2]}:{journey_time[2:]}" - ) - - postcodes_df = pl.read_parquet(args.postcodes).select( - pl.col("pcds").alias("postcode"), - "lat", - "long", - ) - print(f"Loaded {postcodes_df.height:,} postcodes") - - # Filter to postcodes within range of destination - postcodes_df = postcodes_df.with_columns( - haversine_km_expr("lat", "long", destination.lat, destination.lon).alias( - "distance_km" - ) - ).filter(pl.col("distance_km") <= MAX_DISTANCE_KM) - - print(f"Filtered to {postcodes_df.height:,} postcodes within {MAX_DISTANCE_KM}km") - - postcode_data = list( - zip( - postcodes_df["postcode"].to_list(), - postcodes_df["lat"].to_list(), - postcodes_df["long"].to_list(), - ) - ) - - if MAX_POSTCODES is not None and len(postcode_data) > MAX_POSTCODES: - postcode_data = random.sample(postcode_data, MAX_POSTCODES) - print(f"Randomly sampled {MAX_POSTCODES} postcodes") - - checkpoint_saver = CheckpointSaver( - destination_name=destination.name, - output_dir=output_dir, - on_save=lambda path, count: print( - f"Checkpoint saved: {count:,} results to {path}" - ), - ) - - # Resume from checkpoint if one exists - checkpoint_path = checkpoint_saver._checkpoint_path() - prior_results: list[JourneyResult] = [] - if checkpoint_path.exists(): - checkpoint_df = pl.read_parquet(checkpoint_path) - # Deduplicate checkpoint rows per postcode, preferring rows with data - checkpoint_df = checkpoint_df.sort( - "public_transport_quick_minutes", nulls_last=True - ).unique(subset=["postcode"], keep="first") - completed_postcodes = set(checkpoint_df["postcode"].to_list()) - prior_results = [ - JourneyResult( - postcode=row["postcode"], - public_transport_easy_minutes=row["public_transport_easy_minutes"], - public_transport_quick_minutes=row["public_transport_quick_minutes"], - cycling_minutes=row["cycling_minutes"], - error=row["error"], - ) - for row in checkpoint_df.iter_rows(named=True) - ] - checkpoint_saver.results = prior_results - checkpoint_saver._last_save_count = len(prior_results) - postcode_data = [ - (pc, lat, lon) - for pc, lat, lon in postcode_data - if pc not in completed_postcodes - ] - print( - f"Resumed from checkpoint: {len(prior_results):,} already done, " - f"{len(postcode_data):,} remaining" - ) - - def on_result(result): - pbar.update(1) - checkpoint_saver.add_result(result) - - with tqdm(total=len(postcode_data), desc="Fetching journeys") as pbar: - new_results = asyncio.run( - fetch_journey_times( - postcode_data, - destination, - journey_date.strftime("%Y%m%d"), - journey_time, - MAX_CONCURRENT, - progress_callback=on_result, - ) - ) - - all_results = prior_results + new_results - results_df = results_to_dataframe(all_results) - - all_postcodes = {r.postcode for r in all_results} - coords_df = postcodes_df.filter(pl.col("postcode").is_in(all_postcodes)).select( - ["postcode", "lat", "long"] - ) - results_df = coords_df.join(results_df, on="postcode", how="left") - - results_df = results_df.with_columns( - pl.lit(destination.name).alias("destination"), - pl.lit(journey_date.strftime("%Y-%m-%d")).alias("journey_date"), - pl.lit(f"{journey_time[:2]}:{journey_time[2:]}").alias("journey_time"), - ) - - successful = results_df.filter(pl.col("cycling_minutes").is_not_null()).height - print(f"Completed: {successful}/{len(all_results)} successful") - - parquet_path = save_results(results_df, destination.name, output_dir) - checkpoint_saver.cleanup_checkpoint() - print(f"Saved to {parquet_path}") - - -if __name__ == "__main__": - main() diff --git a/pipeline/journey_times/config.py b/pipeline/journey_times/config.py deleted file mode 100644 index ed36d58..0000000 --- a/pipeline/journey_times/config.py +++ /dev/null @@ -1,23 +0,0 @@ -"""Configuration constants for journey times processing.""" - -from .models import Destination - -MAX_DELAY = 10 -REQUESTS_PER_MIN = 500 -MAX_POSTCODES = None -MAX_CONCURRENT = 80 -MAX_DISTANCE_KM = 110 -CHECKPOINT_INTERVAL = 10000 - - -DESTINATIONS = { - "bank": Destination(51.5133, -0.0886, "Bank", "940GZZLUBNK"), - "waterloo": Destination(51.5031, -0.1132, "Waterloo", "940GZZLUWLO"), - "kings-cross": Destination(51.5308, -0.1238, "King's Cross", "940GZZLUKSX"), - "liverpool-street": Destination( - 51.5178, -0.0823, "Liverpool Street", "940GZZLULVS" - ), - "paddington": Destination(51.5154, -0.1755, "Paddington", "940GZZLUPAC"), - "victoria": Destination(51.4965, -0.1447, "Victoria", "940GZZLUVIC"), - "fitzrovia": Destination(51.5165, -0.1310, "Fitzrovia", "940GZZLUTCR"), -} diff --git a/pipeline/journey_times/models.py b/pipeline/journey_times/models.py deleted file mode 100644 index 9261357..0000000 --- a/pipeline/journey_times/models.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Data models for journey times processing.""" - -from dataclasses import dataclass - - -@dataclass -class Destination: - """A destination point for journey planning.""" - - lat: float - lon: float - name: str - naptan_id: str | None = None - - def to_tfl_location(self) -> str: - """Convert to TfL API location string.""" - if self.naptan_id: - return self.naptan_id - return f"{self.lat},{self.lon}" - - -@dataclass -class JourneyResult: - """Result of a journey time calculation for a postcode.""" - - postcode: str - public_transport_easy_minutes: int | None = None - cycling_minutes: int | None = None - public_transport_quick_minutes: int | None = None - error: str | None = None diff --git a/pipeline/journey_times/rate_limiter.py b/pipeline/journey_times/rate_limiter.py deleted file mode 100644 index eba2d1a..0000000 --- a/pipeline/journey_times/rate_limiter.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Rate limiting for TfL API requests.""" - -import asyncio -import warnings - -from .config import REQUESTS_PER_MIN - - -class RateLimiter: - """Rate limiter enforcing max requests per minute.""" - - def __init__(self): - self.request_times: list[float] = [] - self._lock = asyncio.Lock() - - async def acquire(self): - """Wait until we can make a request within rate limits.""" - async with self._lock: - now = asyncio.get_event_loop().time() - cutoff = now - 10.0 # 10 seconds - self.request_times = [t for t in self.request_times if t > cutoff] - - if ( - len(self.request_times) >= REQUESTS_PER_MIN // 6 - ): # we look at it every 10 seconds instead of minutes - wait_time = self.request_times[0] - cutoff - if wait_time > 0: - warnings.warn( - f"Rate limit reached ({REQUESTS_PER_MIN}/min), " - f"waiting {wait_time:.1f}s", - stacklevel=1, - ) - await asyncio.sleep(wait_time) - - self.request_times.append(asyncio.get_event_loop().time()) diff --git a/pipeline/journey_times/results.py b/pipeline/journey_times/results.py deleted file mode 100644 index 293e28f..0000000 --- a/pipeline/journey_times/results.py +++ /dev/null @@ -1,82 +0,0 @@ -from pathlib import Path -from typing import Callable - -import polars as pl - -from .config import CHECKPOINT_INTERVAL -from .models import JourneyResult - - -def results_to_dataframe(results: list[JourneyResult]) -> pl.DataFrame: - return pl.DataFrame( - [ - { - "postcode": r.postcode, - "public_transport_easy_minutes": r.public_transport_easy_minutes, - "public_transport_quick_minutes": r.public_transport_quick_minutes, - "cycling_minutes": r.cycling_minutes, - "error": r.error, - } - for r in results - ] - ) - - -class CheckpointSaver: - """Collects results and saves checkpoints at regular intervals.""" - - def __init__( - self, - destination_name: str, - output_dir: Path, - interval: int = CHECKPOINT_INTERVAL, - on_save: Callable[[Path, int], None] | None = None, - ): - self.destination_name = destination_name - self.output_dir = output_dir - self.interval = interval - self.on_save = on_save - self.results: list[JourneyResult] = [] - self._last_save_count = 0 - - def add_result(self, result: JourneyResult) -> None: - """Add a result and save checkpoint if interval is reached.""" - self.results.append(result) - if len(self.results) - self._last_save_count >= self.interval: - self.save_checkpoint() - - def save_checkpoint(self) -> Path: - """Save current results to checkpoint file.""" - df = results_to_dataframe(self.results) - path = self._checkpoint_path() - df.write_parquet(path) - self._last_save_count = len(self.results) - if self.on_save: - self.on_save(path, len(self.results)) - return path - - def _checkpoint_path(self) -> Path: - safe_name = self.destination_name.lower().replace(" ", "-") - return self.output_dir / f"journey_times_{safe_name}_checkpoint.parquet" - - def get_results(self) -> list[JourneyResult]: - """Return all collected results.""" - return self.results - - def cleanup_checkpoint(self) -> None: - """Remove the checkpoint file after successful completion.""" - path = self._checkpoint_path() - if path.exists(): - path.unlink() - - -def save_results( - results: pl.DataFrame, - destination_name: str, - output_dir: Path, -) -> Path: - safe_name = destination_name.lower().replace(" ", "-") - parquet_path = output_dir / f"journey_times_{safe_name}.parquet" - results.write_parquet(parquet_path) - - return parquet_path diff --git a/pipeline/journey_times/tfl_client.py b/pipeline/journey_times/tfl_client.py deleted file mode 100644 index 4ddf634..0000000 --- a/pipeline/journey_times/tfl_client.py +++ /dev/null @@ -1,254 +0,0 @@ -import asyncio -import os -from typing import Literal -import warnings -from collections.abc import Callable -from http import HTTPStatus - -import httpx - -from .config import MAX_DELAY -from .models import Destination, JourneyResult -from .rate_limiter import RateLimiter - - -BASE_URL = "https://api.tfl.gov.uk" - - -async def fetch_journey_for_mode( - client: httpx.AsyncClient, - rate_limiter: RateLimiter, - from_location: str, - to_location: str, - journey_date: str, - journey_time: str, - journey_type: Literal["quick"] | Literal["easy"] | Literal["cycle"], - retry_count: int = 5, -) -> int | None: - """Fetch journey time for a specific mode with rate limiting.""" - backoff = 1.0 - for attempt in range(retry_count): - try: - await rate_limiter.acquire() - - journey_preference = { - "quick": "LeastTime", - "easy": "LeastInterchange", - "cycle": None, - }[journey_type] - - cycle_preference = { - "quick": None, - "easy": None, - "cycle": "AllTheWay", - }[journey_type] - - # curl -s "https://api.tfl.gov.uk/Journey/Meta/Modes" | jq '.[].modeName' - mode = { - "quick": [ - "bus", - "overground", - "national-rail", - "international-rail", - "elizabeth-line", - "tube", - "coach", - "dlr", - "cable-car", - "replacement-bus", - "tram", - "river-bus", - "walking", - "cycle", - ], - "easy": [ - "bus", - "overground", - "national-rail", - "international-rail", - "elizabeth-line", - "replacement-bus", - "tube", - "coach", - "dlr", - "cable-car", - "tram", - "river-bus", - ], - "cycle": ["cycle"], - }[journey_type] - - params: dict = { - "date": journey_date, - "time": journey_time, - "nationalSearch": "true", - "timeIs": "Arriving", - "cyclePreference": cycle_preference, - "bikeProficiency": "Fast", - "walkingOptimization": str(journey_type == "quick").lower(), - "mode": ",".join(mode), - } - if journey_preference: - params["journeyPreference"] = journey_preference - - url = f"/Journey/JourneyResults/{from_location}/to/{to_location}" - response = await client.get(url, params=params) - - if response.status_code == HTTPStatus.OK: - data = response.json() - journeys = data.get("journeys", []) - if journeys: - durations = [ - j["duration"] for j in journeys if j.get("duration") is not None - ] - if durations: - return min(durations) - return None - elif response.status_code in ( - HTTPStatus.TOO_MANY_REQUESTS, - HTTPStatus.INTERNAL_SERVER_ERROR, - HTTPStatus.BAD_GATEWAY, - HTTPStatus.SERVICE_UNAVAILABLE, - HTTPStatus.GATEWAY_TIMEOUT, - ): - warnings.warn( - f"HTTP {response.status_code} for {journey_type} from {from_location}, " - f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})", - stacklevel=2, - ) - await asyncio.sleep(backoff) - backoff = min(backoff * 2, MAX_DELAY) - continue - else: - return None - except Exception as e: - warnings.warn( - f"Network error for {journey_type} from {from_location}: {e}, " - f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})", - stacklevel=2, - ) - await asyncio.sleep(backoff) - backoff = min(backoff * 2, MAX_DELAY) - continue - warnings.warn( - f"Failed to fetch {journey_type} from {from_location} after {retry_count} attempts", - stacklevel=2, - ) - return None - - -async def fetch_all_modes( - client: httpx.AsyncClient, - rate_limiter: RateLimiter, - postcode: str, - lat: float, - lon: float, - to_location: str, - journey_date: str, - journey_time: str, - semaphore: asyncio.Semaphore, -) -> JourneyResult: - """Fetch journey times for all transport modes using coordinates.""" - async with semaphore: - try: - from_location = f"{lat},{lon}" - - easy = await fetch_journey_for_mode( - client, - rate_limiter, - from_location, - to_location, - journey_date, - journey_time, - journey_type="easy", - ) - quick = await fetch_journey_for_mode( - client, - rate_limiter, - from_location, - to_location, - journey_date, - journey_time, - journey_type="quick", - ) - cycling = await fetch_journey_for_mode( - client, - rate_limiter, - from_location, - to_location, - journey_date, - journey_time, - journey_type="cycle", - ) - - return JourneyResult( - postcode=postcode, - public_transport_easy_minutes=easy, - public_transport_quick_minutes=quick, - cycling_minutes=cycling, - ) - except Exception as e: - print(f"Error: {e}") - return JourneyResult(postcode=postcode, error=str(e)) - - -async def fetch_journey_times( - postcode_data: list[tuple[str, float, float]], - dest: Destination, - journey_date: str, - journey_time: str, - max_concurrent: int = 2, - progress_callback: Callable[[JourneyResult], None] | None = None, -) -> list[JourneyResult]: - """Fetch journey times for all postcodes with rate limiting. - - Args: - postcode_data: List of (postcode, lat, lon) tuples - dest: Destination for journey planning - journey_date: Date in YYYYMMDD format - journey_time: Time in HHMM format - max_concurrent: Maximum concurrent API requests - progress_callback: Optional callback called with each result - - Returns: - List of JourneyResult objects in the same order as postcode_data - """ - semaphore = asyncio.Semaphore(max_concurrent) - to_location = dest.to_tfl_location() - rate_limiter = RateLimiter() - - # TFL API authentication via app_key query parameter - tfl_token = os.environ.get("TFL_TOKEN") - if not tfl_token: - raise RuntimeError("TFL_TOKEN environment variable not set") - params = {"app_key": tfl_token} - - async with httpx.AsyncClient( - base_url=BASE_URL, - params=params, - timeout=httpx.Timeout(30), - ) as client: - tasks = [ - fetch_all_modes( - client, - rate_limiter, - pc, - lat, - lon, - to_location, - journey_date, - journey_time, - semaphore, - ) - for pc, lat, lon in postcode_data - ] - - results = [] - for coro in asyncio.as_completed(tasks): - result = await coro - results.append(result) - if progress_callback: - progress_callback(result) - - postcode_to_result = {r.postcode: r for r in results} - return [postcode_to_result[pc] for pc, _, _ in postcode_data]