From cf7449e38b51492b552c81cf2c4e492537495409 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sun, 1 Feb 2026 08:49:40 +0000 Subject: [PATCH] Fix tfl scraping --- pipeline/journey_times/__main__.py | 44 ++++++++++++++++++++++++---- pipeline/journey_times/tfl_client.py | 4 ++- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/pipeline/journey_times/__main__.py b/pipeline/journey_times/__main__.py index 5a5a944..cde9480 100644 --- a/pipeline/journey_times/__main__.py +++ b/pipeline/journey_times/__main__.py @@ -12,6 +12,7 @@ from .config import ( OUTPUT_DIR, MAX_DISTANCE_KM, ) +from .models import JourneyResult from .results import CheckpointSaver, results_to_dataframe, save_results from .tfl_client import fetch_journey_times from pipeline.utils import haversine_km_expr @@ -63,12 +64,44 @@ def main(): ), ) + # 25556/76273 + + # Resume from checkpoint if one exists + checkpoint_path = checkpoint_saver._checkpoint_path() + prior_results: list[JourneyResult] = [] + if checkpoint_path.exists(): + checkpoint_df = pl.read_parquet(checkpoint_path) + completed_postcodes = set( + checkpoint_df.filter(pl.col("public_transport_easy_minutes").is_not_null())["postcode"].to_list() + ) + prior_results = [ + JourneyResult( + postcode=row["postcode"], + public_transport_easy_minutes=row["public_transport_easy_minutes"], + public_transport_quick_minutes=row["public_transport_quick_minutes"], + cycling_minutes=row["cycling_minutes"], + error=row["error"], + ) + for row in checkpoint_df.iter_rows(named=True) + ] + checkpoint_saver.results = prior_results + checkpoint_saver._last_save_count = len(prior_results) + postcode_data = [ + (pc, lat, lon) + for pc, lat, lon in postcode_data + if pc not in completed_postcodes + ] + print( + f"Resumed from checkpoint: {len(prior_results):,} already done, " + f"{len(postcode_data):,} remaining" + ) + def on_result(result): pbar.update(1) checkpoint_saver.add_result(result) with tqdm(total=len(postcode_data), desc="Fetching journeys") as pbar: - results = asyncio.run( + new_results = asyncio.run( fetch_journey_times( postcode_data, destination, @@ -79,11 +112,12 @@ def main(): ) ) - results_df = results_to_dataframe(results) + all_results = prior_results + new_results + results_df = results_to_dataframe(all_results) - postcodes_processed = [pc for pc, _, _ in postcode_data] + all_postcodes = {r.postcode for r in all_results} coords_df = postcodes_df.filter( - pl.col("postcode").is_in(postcodes_processed) + pl.col("postcode").is_in(all_postcodes) ).select(["postcode", "lat", "long"]) results_df = coords_df.join(results_df, on="postcode", how="left") @@ -94,7 +128,7 @@ def main(): ) successful = results_df.filter(pl.col("cycling_minutes").is_not_null()).height - print(f"Completed: {successful}/{len(results)} successful") + print(f"Completed: {successful}/{len(all_results)} successful") parquet_path = save_results(results_df, destination.name) checkpoint_saver.cleanup_checkpoint() diff --git a/pipeline/journey_times/tfl_client.py b/pipeline/journey_times/tfl_client.py index 33fc794..4ddf634 100644 --- a/pipeline/journey_times/tfl_client.py +++ b/pipeline/journey_times/tfl_client.py @@ -219,7 +219,9 @@ async def fetch_journey_times( # TFL API authentication via app_key query parameter tfl_token = os.environ.get("TFL_TOKEN") - params = {"app_key": tfl_token} if tfl_token else {} + if not tfl_token: + raise RuntimeError("TFL_TOKEN environment variable not set") + params = {"app_key": tfl_token} async with httpx.AsyncClient( base_url=BASE_URL,