Add initial version of TFL isochrones
This commit is contained in:
parent
433fca64ad
commit
31bfc76b58
9 changed files with 448 additions and 0 deletions
|
|
@ -69,3 +69,4 @@ Nice to haves?
|
|||
- [Local Authority (Upper Tier)](https://communitiesopendata-communities.hub.arcgis.com/datasets/6e8edb2974da4834bbafa09644a5b02d_0/explore?location=52.684195%2C-2.489482%2C7.17)
|
||||
- [Open Geography](https://geoportal.statistics.gov.uk/)
|
||||
- [CommunitiesOpenData](https://communitiesopendata-communities.hub.arcgis.com/)
|
||||
- [PlanetOSM](https://planet.openstreetmap.org/) for open street map POI
|
||||
|
|
|
|||
31
pipeline/journey_times/__init__.py
Normal file
31
pipeline/journey_times/__init__.py
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
"""Journey times calculation module for TfL transit data."""
|
||||
|
||||
from .config import (
|
||||
DATA_DIR,
|
||||
DESTINATIONS,
|
||||
MAX_CONCURRENT,
|
||||
MAX_DELAY,
|
||||
MAX_POSTCODES,
|
||||
OUTPUT_DIR,
|
||||
REQUESTS_PER_MIN,
|
||||
)
|
||||
from .models import Destination, JourneyResult
|
||||
from .results import results_to_dataframe, save_results
|
||||
from .tfl_client import fetch_journey_times
|
||||
|
||||
# Names exported by `from <package> import *`. Every entry must be bound in
# this module by the imports at the top of the file; a listed name that is
# not bound makes the star-import raise AttributeError.
__all__ = [
    # Config
    "DATA_DIR",
    "OUTPUT_DIR",
    "MAX_DELAY",
    "REQUESTS_PER_MIN",
    "MAX_POSTCODES",
    "MAX_CONCURRENT",
    "DESTINATIONS",
    # Models
    "Destination",
    "JourneyResult",
    # Fetching and result handling
    "fetch_journey_times",
    "results_to_dataframe",
    "save_results",
]
|
||||
78
pipeline/journey_times/__main__.py
Normal file
78
pipeline/journey_times/__main__.py
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
import asyncio
|
||||
import random
|
||||
from datetime import date, timedelta
|
||||
|
||||
import polars as pl
|
||||
from tqdm import tqdm
|
||||
|
||||
from .config import DESTINATIONS, MAX_CONCURRENT, MAX_POSTCODES, OUTPUT_DIR
|
||||
from .results import results_to_dataframe, save_results
|
||||
from .tfl_client import fetch_journey_times
|
||||
|
||||
|
||||
|
||||
def main():
    """Fetch journey times to Bank for the loaded postcodes and save them.

    Reads ``postcodes_h3.parquet`` from ``OUTPUT_DIR``, optionally samples
    ``MAX_POSTCODES`` rows at random, queries journey times for a departure
    at 08:00 on the next Monday, joins the results back onto the postcode
    coordinates and writes them out via ``save_results``.
    """
    target = DESTINATIONS["bank"]

    # Next Monday at 8am; `or 7` pushes a Monday run to the following week.
    current_day = date.today()
    offset_days = (7 - current_day.weekday()) % 7 or 7
    journey_date = current_day + timedelta(days=offset_days)
    journey_time = "0800"
    clock_label = f"{journey_time[:2]}:{journey_time[2:]}"

    print(f"Destination: {target.name}")
    print(f"Journey: {journey_date.strftime('%A %Y-%m-%d')} at {clock_label}")

    postcodes_df = pl.read_parquet(OUTPUT_DIR / "postcodes_h3.parquet")
    print(f"Loaded {postcodes_df.height:,} postcodes")

    # One (postcode, lat, long) tuple per row, in frame order.
    column_values = [
        postcodes_df[column].to_list() for column in ("postcode", "lat", "long")
    ]
    postcode_data = list(zip(*column_values))

    over_limit = MAX_POSTCODES is not None and len(postcode_data) > MAX_POSTCODES
    if over_limit:
        postcode_data = random.sample(postcode_data, MAX_POSTCODES)
        print(f"Randomly sampled {MAX_POSTCODES} postcodes")

    with tqdm(total=len(postcode_data), desc="Fetching journeys") as pbar:

        def advance(_result):
            # Tick the bar once per finished postcode.
            pbar.update(1)

        results = asyncio.run(
            fetch_journey_times(
                postcode_data,
                target,
                journey_date.strftime("%Y%m%d"),
                journey_time,
                MAX_CONCURRENT,
                progress_callback=advance,
            )
        )

    # Left-join so postcodes keep their coordinates even without a result row.
    requested_postcodes = [row[0] for row in postcode_data]
    coords_df = postcodes_df.filter(
        pl.col("postcode").is_in(requested_postcodes)
    ).select(["postcode", "lat", "long"])
    results_df = coords_df.join(
        results_to_dataframe(results), on="postcode", how="left"
    )

    # Stamp every row with the query parameters used for this run.
    results_df = results_df.with_columns(
        pl.lit(target.name).alias("destination"),
        pl.lit(journey_date.strftime("%Y-%m-%d")).alias("journey_date"),
        pl.lit(clock_label).alias("journey_time"),
    )

    with_time = results_df.filter(pl.col("fastest_minutes").is_not_null())
    successful = with_time.height
    print(f"Completed: {successful}/{len(results)} successful")

    parquet_path = save_results(results_df, target.name)
    print(f"Saved to {parquet_path}")


if __name__ == "__main__":
    main()
|
||||
24
pipeline/journey_times/config.py
Normal file
24
pipeline/journey_times/config.py
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
"""Configuration constants for journey times processing."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from .models import Destination
|
||||
|
||||
# Input data root and the folder that processed outputs are written to.
DATA_DIR = Path("./data_sources")
OUTPUT_DIR = DATA_DIR / "processed"

# Upper bound, in seconds, for the exponential retry backoff.
MAX_DELAY = 10
# Request budget enforced by the rate limiter over a 60-second window.
REQUESTS_PER_MIN = 50
# Set to None to process all postcodes
MAX_POSTCODES = 100
# Number of postcodes that may be processed at the same time.
MAX_CONCURRENT = 5

# Known destinations keyed by slug; each carries coordinates, a display name
# and the station identifier that is sent to TfL in place of the coordinates.
DESTINATIONS = {
    "bank": Destination(
        lat=51.5133, lon=-0.0886, name="Bank", naptan_id="940GZZLUBNK"
    ),
    "waterloo": Destination(
        lat=51.5031, lon=-0.1132, name="Waterloo", naptan_id="940GZZLUWLO"
    ),
    "kings-cross": Destination(
        lat=51.5308, lon=-0.1238, name="King's Cross", naptan_id="940GZZLUKSX"
    ),
    "liverpool-street": Destination(
        lat=51.5178, lon=-0.0823, name="Liverpool Street", naptan_id="940GZZLULVS"
    ),
    "paddington": Destination(
        lat=51.5154, lon=-0.1755, name="Paddington", naptan_id="940GZZLUPAC"
    ),
    "victoria": Destination(
        lat=51.4965, lon=-0.1447, name="Victoria", naptan_id="940GZZLUVIC"
    ),
}
|
||||
0
pipeline/journey_times/data.py
Normal file
0
pipeline/journey_times/data.py
Normal file
32
pipeline/journey_times/models.py
Normal file
32
pipeline/journey_times/models.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
"""Data models for journey times processing."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Destination:
|
||||
"""A destination point for journey planning."""
|
||||
|
||||
lat: float
|
||||
lon: float
|
||||
name: str
|
||||
naptan_id: str | None = None
|
||||
|
||||
def to_tfl_location(self) -> str:
|
||||
"""Convert to TfL API location string."""
|
||||
if self.naptan_id:
|
||||
return self.naptan_id
|
||||
return f"{self.lat},{self.lon}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class JourneyResult:
|
||||
"""Result of a journey time calculation for a postcode."""
|
||||
|
||||
postcode: str
|
||||
walking_minutes: int | None = None
|
||||
cycling_minutes: int | None = None
|
||||
public_transport_minutes: int | None = None
|
||||
fastest_minutes: int | None = None
|
||||
fastest_mode: str | None = None
|
||||
error: str | None = None
|
||||
33
pipeline/journey_times/rate_limiter.py
Normal file
33
pipeline/journey_times/rate_limiter.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
"""Rate limiting for TfL API requests."""
|
||||
|
||||
import asyncio
|
||||
import warnings
|
||||
|
||||
from .config import REQUESTS_PER_MIN
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
"""Rate limiter enforcing max requests per minute."""
|
||||
|
||||
def __init__(self):
|
||||
self.request_times: list[float] = []
|
||||
self._lock = asyncio.Lock()
|
||||
|
||||
async def acquire(self):
|
||||
"""Wait until we can make a request within rate limits."""
|
||||
async with self._lock:
|
||||
now = asyncio.get_event_loop().time()
|
||||
cutoff = now - 60.0
|
||||
self.request_times = [t for t in self.request_times if t > cutoff]
|
||||
|
||||
if len(self.request_times) >= REQUESTS_PER_MIN:
|
||||
wait_time = self.request_times[0] - cutoff
|
||||
if wait_time > 0:
|
||||
warnings.warn(
|
||||
f"Rate limit reached ({REQUESTS_PER_MIN}/min), "
|
||||
f"waiting {wait_time:.1f}s",
|
||||
stacklevel=2,
|
||||
)
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
self.request_times.append(asyncio.get_event_loop().time())
|
||||
38
pipeline/journey_times/results.py
Normal file
38
pipeline/journey_times/results.py
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
from pathlib import Path
|
||||
|
||||
import polars as pl
|
||||
|
||||
from .config import OUTPUT_DIR
|
||||
from .models import JourneyResult
|
||||
|
||||
|
||||
def results_to_dataframe(results: list[JourneyResult]) -> pl.DataFrame:
    """Convert journey results into a DataFrame with a fixed schema.

    The schema is given explicitly rather than inferred, so the frame has
    the same columns and dtypes for an empty ``results`` list and when a
    column contains only ``None`` values.

    Args:
        results: Journey results, one per postcode.

    Returns:
        A DataFrame with one row per result and the columns ``postcode``,
        ``walking_minutes``, ``cycling_minutes``,
        ``public_transport_minutes``, ``fastest_minutes``, ``fastest_mode``
        and ``error``.
    """
    # Column names match the JourneyResult attribute names one-to-one.
    schema = {
        "postcode": pl.Utf8,
        "walking_minutes": pl.Int64,
        "cycling_minutes": pl.Int64,
        "public_transport_minutes": pl.Int64,
        "fastest_minutes": pl.Int64,
        "fastest_mode": pl.Utf8,
        "error": pl.Utf8,
    }
    rows = [
        {column: getattr(result, column) for column in schema}
        for result in results
    ]
    return pl.DataFrame(rows, schema=schema)
|
||||
|
||||
|
||||
def save_results(
    results: pl.DataFrame,
    destination_name: str,
    output_dir: Path | None = None,
) -> Path:
    """Write journey results to a parquet file named after the destination.

    Args:
        results: DataFrame to persist.
        destination_name: Destination label; lower-cased, with spaces
            replaced by hyphens, to form the file name.
        output_dir: Directory to write into. Defaults to ``OUTPUT_DIR``.
            The directory is created if it does not exist.

    Returns:
        Path of the written ``journey_times_<name>.parquet`` file.
    """
    target_dir = Path(OUTPUT_DIR if output_dir is None else output_dir)
    # Writing into a missing directory fails, so create it up front.
    target_dir.mkdir(parents=True, exist_ok=True)

    safe_name = destination_name.lower().replace(" ", "-")
    parquet_path = target_dir / f"journey_times_{safe_name}.parquet"
    results.write_parquet(parquet_path)

    return parquet_path
|
||||
211
pipeline/journey_times/tfl_client.py
Normal file
211
pipeline/journey_times/tfl_client.py
Normal file
|
|
@ -0,0 +1,211 @@
|
|||
"""TfL API client for fetching journey times."""
|
||||
|
||||
import asyncio
|
||||
import warnings
|
||||
from collections.abc import Callable
|
||||
from http import HTTPStatus
|
||||
|
||||
from journey_client import Client
|
||||
from journey_client.api.journey import (
|
||||
journey_journey_results_by_path_from_path_to_query_via_query_national_search_query_date_qu as journey_api,
|
||||
)
|
||||
from journey_client.models import (
|
||||
JourneyJourneyResultsByPathFromPathToQueryViaQueryNationalSearchQueryDateQuTimeIs as TimeIs,
|
||||
)
|
||||
from journey_client.types import UNSET, Unset
|
||||
|
||||
from .config import MAX_DELAY
|
||||
from .models import Destination, JourneyResult
|
||||
from .rate_limiter import RateLimiter
|
||||
|
||||
|
||||
async def fetch_journey_for_mode(
    client: Client,
    rate_limiter: RateLimiter,
    from_location: str,
    to_location: str,
    mode: list[str] | Unset,
    journey_date: str,
    journey_time: str,
    retry_count: int = 5,
) -> int | None:
    """Fetch journey time for a specific mode with rate limiting.

    Each attempt first waits on ``rate_limiter``. Responses with status
    429, 500, 502, 503 or 504, and any exception raised during an attempt,
    are retried with exponential backoff starting at one second and capped
    at ``MAX_DELAY``. The backoff sleep is skipped after the final attempt
    because no retry follows it.

    Args:
        client: Journey API client used for the request.
        rate_limiter: Limiter awaited before every attempt.
        from_location: Origin location string.
        to_location: Destination location string.
        mode: Mode names to request, or ``UNSET`` to leave the mode filter
            off (reported as ``public_transport`` in warnings).
        journey_date: Journey date string passed through to the API.
        journey_time: Journey time string passed through to the API.
        retry_count: Maximum number of attempts.

    Returns:
        The smallest ``duration`` among the returned journeys, or ``None``
        when there are no usable journeys, the response status is not
        retryable, or all attempts failed.
    """
    mode_name = ",".join(mode) if not isinstance(mode, Unset) else "public_transport"
    retryable_statuses = (
        HTTPStatus.TOO_MANY_REQUESTS,
        HTTPStatus.INTERNAL_SERVER_ERROR,
        HTTPStatus.BAD_GATEWAY,
        HTTPStatus.SERVICE_UNAVAILABLE,
        HTTPStatus.GATEWAY_TIMEOUT,
    )
    backoff = 1.0
    last_failure = None

    for attempt in range(retry_count):
        try:
            await rate_limiter.acquire()

            response = await journey_api.asyncio_detailed(
                from_=from_location,
                to=to_location,
                client=client,
                date=journey_date,
                time=journey_time,
                time_is=TimeIs.DEPARTING,
                mode=mode,
            )
            status = response.status_code

            if status == HTTPStatus.OK and response.parsed:
                journeys = response.parsed.journeys
                if isinstance(journeys, Unset) or not journeys:
                    return None
                durations = [
                    j.duration for j in journeys if not isinstance(j.duration, Unset)
                ]
                return min(durations) if durations else None

            if status not in retryable_statuses:
                # Covers other error statuses and a 200 without a parsed body.
                return None

            last_failure = f"HTTP {status.value} for {mode_name} from {from_location}"
        except Exception as e:
            # Deliberately broad: any failure during an attempt is retried.
            last_failure = f"Network error for {mode_name} from {from_location}: {e}"

        if attempt + 1 >= retry_count:
            # Out of attempts: do not sleep or claim that a retry follows.
            break

        warnings.warn(
            f"{last_failure}, "
            f"retrying in {backoff:.1f}s (attempt {attempt + 1}/{retry_count})",
            stacklevel=2,
        )
        await asyncio.sleep(backoff)
        backoff = min(backoff * 2, MAX_DELAY)

    detail = f": {last_failure}" if last_failure else ""
    warnings.warn(
        f"Failed to fetch {mode_name} from {from_location} "
        f"after {retry_count} attempts{detail}",
        stacklevel=2,
    )
    return None
|
||||
|
||||
|
||||
async def fetch_all_modes(
    client: Client,
    rate_limiter: RateLimiter,
    postcode: str,
    lat: float,
    lon: float,
    to_location: str,
    journey_date: str,
    journey_time: str,
    semaphore: asyncio.Semaphore,
) -> JourneyResult:
    """Look up walking, cycling and public-transport times for one postcode.

    The origin is sent as a ``"<lat>,<lon>"`` string. The three modes are
    queried one after another while ``semaphore`` is held. The fastest mode
    is the one with the smallest non-``None`` duration, with ties going to
    the earlier mode in the order walking, cycling, public transport. Any
    exception is captured in the ``error`` field of the returned result.
    """
    async with semaphore:
        try:
            origin = f"{lat},{lon}"

            # Result label -> mode argument for the request; UNSET leaves
            # the mode filter off for the public-transport lookup.
            mode_requests = (
                ("walking", ["walking"]),
                ("cycling", ["cycle"]),
                ("public_transport", UNSET),
            )

            minutes_by_mode = {}
            for label, tfl_mode in mode_requests:
                minutes_by_mode[label] = await fetch_journey_for_mode(
                    client,
                    rate_limiter,
                    origin,
                    to_location,
                    tfl_mode,
                    journey_date,
                    journey_time,
                )

            # Dict order follows mode_requests, so min() breaks ties in
            # that same order.
            candidates = [
                (label, minutes)
                for label, minutes in minutes_by_mode.items()
                if minutes is not None
            ]
            if candidates:
                fastest_mode, fastest_time = min(candidates, key=lambda pair: pair[1])
            else:
                fastest_mode, fastest_time = None, None

            return JourneyResult(
                postcode=postcode,
                walking_minutes=minutes_by_mode["walking"],
                cycling_minutes=minutes_by_mode["cycling"],
                public_transport_minutes=minutes_by_mode["public_transport"],
                fastest_minutes=fastest_time,
                fastest_mode=fastest_mode,
            )
        except Exception as exc:
            return JourneyResult(postcode=postcode, error=str(exc))
|
||||
|
||||
|
||||
async def fetch_journey_times(
    postcode_data: list[tuple[str, float, float]],
    dest: Destination,
    journey_date: str,
    journey_time: str,
    max_concurrent: int = 2,
    progress_callback: Callable[[JourneyResult], None] | None = None,
) -> list[JourneyResult]:
    """Fetch journey times for every postcode, sharing one rate limiter.

    Args:
        postcode_data: List of (postcode, lat, lon) tuples
        dest: Destination for journey planning
        journey_date: Date in YYYYMMDD format
        journey_time: Time in HHMM format
        max_concurrent: Maximum number of postcodes processed concurrently
        progress_callback: Optional callback invoked with each result as
            soon as it completes

    Returns:
        List of JourneyResult objects in the same order as postcode_data
    """
    gate = asyncio.Semaphore(max_concurrent)
    target = dest.to_tfl_location()
    limiter = RateLimiter()

    async with Client(base_url="https://api.tfl.gov.uk", timeout=30.0) as api_client:
        pending = [
            fetch_all_modes(
                api_client,
                limiter,
                postcode,
                lat,
                lon,
                target,
                journey_date,
                journey_time,
                gate,
            )
            for postcode, lat, lon in postcode_data
        ]

        # Results arrive in completion order; index them by postcode so
        # the input order can be restored afterwards.
        by_postcode = {}
        for finished in asyncio.as_completed(pending):
            outcome = await finished
            by_postcode[outcome.postcode] = outcome
            if progress_callback:
                progress_callback(outcome)

        return [by_postcode[postcode] for postcode, _, _ in postcode_data]
|
||||
Loading…
Add table
Add a link
Reference in a new issue