Fix tfl scraping
This commit is contained in:
parent
ac45af8514
commit
cf7449e38b
2 changed files with 42 additions and 6 deletions
|
|
@ -12,6 +12,7 @@ from .config import (
|
||||||
OUTPUT_DIR,
|
OUTPUT_DIR,
|
||||||
MAX_DISTANCE_KM,
|
MAX_DISTANCE_KM,
|
||||||
)
|
)
|
||||||
|
from .models import JourneyResult
|
||||||
from .results import CheckpointSaver, results_to_dataframe, save_results
|
from .results import CheckpointSaver, results_to_dataframe, save_results
|
||||||
from .tfl_client import fetch_journey_times
|
from .tfl_client import fetch_journey_times
|
||||||
from pipeline.utils import haversine_km_expr
|
from pipeline.utils import haversine_km_expr
|
||||||
|
|
@ -63,12 +64,44 @@ def main():
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# 25556/76273
|
||||||
|
|
||||||
|
# Resume from checkpoint if one exists
|
||||||
|
checkpoint_path = checkpoint_saver._checkpoint_path()
|
||||||
|
prior_results: list[JourneyResult] = []
|
||||||
|
if checkpoint_path.exists():
|
||||||
|
checkpoint_df = pl.read_parquet(checkpoint_path)
|
||||||
|
completed_postcodes = set(
|
||||||
|
checkpoint_df.filter(pl.col("public_transport_easy_minutes").is_not_null())["postcode"].to_list()
|
||||||
|
)
|
||||||
|
prior_results = [
|
||||||
|
JourneyResult(
|
||||||
|
postcode=row["postcode"],
|
||||||
|
public_transport_easy_minutes=row["public_transport_easy_minutes"],
|
||||||
|
public_transport_quick_minutes=row["public_transport_quick_minutes"],
|
||||||
|
cycling_minutes=row["cycling_minutes"],
|
||||||
|
error=row["error"],
|
||||||
|
)
|
||||||
|
for row in checkpoint_df.iter_rows(named=True)
|
||||||
|
]
|
||||||
|
checkpoint_saver.results = prior_results
|
||||||
|
checkpoint_saver._last_save_count = len(prior_results)
|
||||||
|
postcode_data = [
|
||||||
|
(pc, lat, lon)
|
||||||
|
for pc, lat, lon in postcode_data
|
||||||
|
if pc not in completed_postcodes
|
||||||
|
]
|
||||||
|
print(
|
||||||
|
f"Resumed from checkpoint: {len(prior_results):,} already done, "
|
||||||
|
f"{len(postcode_data):,} remaining"
|
||||||
|
)
|
||||||
|
|
||||||
def on_result(result):
|
def on_result(result):
|
||||||
pbar.update(1)
|
pbar.update(1)
|
||||||
checkpoint_saver.add_result(result)
|
checkpoint_saver.add_result(result)
|
||||||
|
|
||||||
with tqdm(total=len(postcode_data), desc="Fetching journeys") as pbar:
|
with tqdm(total=len(postcode_data), desc="Fetching journeys") as pbar:
|
||||||
results = asyncio.run(
|
new_results = asyncio.run(
|
||||||
fetch_journey_times(
|
fetch_journey_times(
|
||||||
postcode_data,
|
postcode_data,
|
||||||
destination,
|
destination,
|
||||||
|
|
@ -79,11 +112,12 @@ def main():
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
results_df = results_to_dataframe(results)
|
all_results = prior_results + new_results
|
||||||
|
results_df = results_to_dataframe(all_results)
|
||||||
|
|
||||||
postcodes_processed = [pc for pc, _, _ in postcode_data]
|
all_postcodes = {r.postcode for r in all_results}
|
||||||
coords_df = postcodes_df.filter(
|
coords_df = postcodes_df.filter(
|
||||||
pl.col("postcode").is_in(postcodes_processed)
|
pl.col("postcode").is_in(all_postcodes)
|
||||||
).select(["postcode", "lat", "long"])
|
).select(["postcode", "lat", "long"])
|
||||||
results_df = coords_df.join(results_df, on="postcode", how="left")
|
results_df = coords_df.join(results_df, on="postcode", how="left")
|
||||||
|
|
||||||
|
|
@ -94,7 +128,7 @@ def main():
|
||||||
)
|
)
|
||||||
|
|
||||||
successful = results_df.filter(pl.col("cycling_minutes").is_not_null()).height
|
successful = results_df.filter(pl.col("cycling_minutes").is_not_null()).height
|
||||||
print(f"Completed: {successful}/{len(results)} successful")
|
print(f"Completed: {successful}/{len(all_results)} successful")
|
||||||
|
|
||||||
parquet_path = save_results(results_df, destination.name)
|
parquet_path = save_results(results_df, destination.name)
|
||||||
checkpoint_saver.cleanup_checkpoint()
|
checkpoint_saver.cleanup_checkpoint()
|
||||||
|
|
|
||||||
|
|
@ -219,7 +219,9 @@ async def fetch_journey_times(
|
||||||
|
|
||||||
# TFL API authentication via app_key query parameter
|
# TFL API authentication via app_key query parameter
|
||||||
tfl_token = os.environ.get("TFL_TOKEN")
|
tfl_token = os.environ.get("TFL_TOKEN")
|
||||||
params = {"app_key": tfl_token} if tfl_token else {}
|
if not tfl_token:
|
||||||
|
raise RuntimeError("TFL_TOKEN environment variable not set")
|
||||||
|
params = {"app_key": tfl_token}
|
||||||
|
|
||||||
async with httpx.AsyncClient(
|
async with httpx.AsyncClient(
|
||||||
base_url=BASE_URL,
|
base_url=BASE_URL,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue