"""Aggregate journey times data by H3 hexagonal cells."""

from pathlib import Path
|
|
|
|
import polars as pl
|
|
|
|
from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS, PROCESSED_DIR
|
|
|
|
|
|
def aggregate_journey_times(
    journey_times_path: Path | None = None,
    postcodes_h3_path: Path | None = None,
    output_dir: Path | None = None,
) -> list[Path]:
    """Aggregate journey times by H3 cell at every configured resolution.

    Joins the journey-times table with the postcode-to-H3 lookup on
    ``postcode``. Then, for each resolution in ``H3_RESOLUTIONS``, groups the
    joined rows by the ``h3_res<resolution>`` column and writes one parquet
    file containing, per cell, the median ``public_transport_minutes``
    (``median_journey_minutes``) and the number of contributing rows
    (``journey_count``). The cell column is renamed to ``h3`` in the output.

    Args:
        journey_times_path: Parquet file providing the ``postcode`` and
            ``public_transport_minutes`` columns. Defaults to
            ``PROCESSED_DIR / "journey_times_bank.parquet"``.
        postcodes_h3_path: Parquet file providing a ``postcode`` column and
            the ``h3_res<resolution>`` columns. Defaults to
            ``PROCESSED_DIR / "postcodes_h3.parquet"``.
        output_dir: Directory the per-resolution files are written to; it is
            created if missing. Defaults to ``AGGREGATES_DIR``.

    Returns:
        Paths of the parquet files written, in ``H3_RESOLUTIONS`` order. The
        list is empty when there are no non-null journey times or no postcode
        matches; resolutions whose column is absent (or entirely null) are
        skipped.
    """
    if journey_times_path is None:
        journey_times_path = PROCESSED_DIR / "journey_times_bank.parquet"
    if postcodes_h3_path is None:
        postcodes_h3_path = PROCESSED_DIR / "postcodes_h3.parquet"
    # Path() is a no-op for Path inputs and lets callers pass a plain string.
    output_dir = Path(AGGREGATES_DIR if output_dir is None else output_dir)

    output_dir.mkdir(parents=True, exist_ok=True)

    # Load journey times, keeping only rows that have a usable value.
    journey_df = (
        pl.read_parquet(journey_times_path)
        .select(["postcode", "public_transport_minutes"])
        .filter(pl.col("public_transport_minutes").is_not_null())
    )

    if journey_df.height == 0:
        print("No valid journey times found")
        return []

    # Load postcodes with H3 indices and attach them to the journey times.
    postcodes_df = pl.read_parquet(postcodes_h3_path)
    joined_df = journey_df.join(postcodes_df, on="postcode", how="inner")

    if joined_df.height == 0:
        print("No matching postcodes found")
        return []

    print(f"Joined {joined_df.height} postcodes with journey times")

    saved_paths: list[Path] = []

    for resolution in H3_RESOLUTIONS:
        h3_col = f"h3_res{resolution}"

        if h3_col not in joined_df.columns:
            print(f"Skipping resolution {resolution} - column {h3_col} not found")
            continue

        agg_df = (
            # group_by treats null as a key of its own; drop those rows so the
            # output never contains a meaningless ``h3 = null`` cell.
            joined_df.filter(pl.col(h3_col).is_not_null())
            .group_by(h3_col)
            .agg(
                pl.col("public_transport_minutes").median().alias("median_journey_minutes"),
                pl.col("public_transport_minutes").count().alias("journey_count"),
            )
            .rename({h3_col: "h3"})
            # group_by does not guarantee row order; sort so that repeated
            # runs over the same inputs write identical files.
            .sort("h3")
        )

        if agg_df.height == 0:
            print(f"Skipping resolution {resolution} - no rows with a {h3_col} value")
            continue

        output_path = output_dir / f"journey_times_res{resolution}.parquet"
        agg_df.write_parquet(output_path)
        saved_paths.append(output_path)
        print(f"Saved {agg_df.height} cells to {output_path}")

    return saved_paths
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the aggregation with all default arguments.
    aggregate_journey_times()