"""Aggregate journey times data by H3 hexagonal cells."""

from pathlib import Path

import polars as pl

from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS, PROCESSED_DIR


def aggregate_journey_times(
    journey_times_path: Path | None = None,
    postcodes_h3_path: Path | None = None,
    output_dir: Path | None = None,
) -> list[Path]:
    """
    Aggregate journey times by H3 cells at all resolutions.

    Joins journey_times_bank.parquet with postcodes_h3.parquet on postcode,
    then groups by H3 cell to compute median journey time.

    Args:
        journey_times_path: Parquet file containing ``postcode`` and
            ``public_transport_minutes`` columns. Defaults to
            ``PROCESSED_DIR / "journey_times_bank.parquet"``.
        postcodes_h3_path: Parquet file containing a ``postcode`` column plus
            one ``h3_res{N}`` column per resolution. Defaults to
            ``PROCESSED_DIR / "postcodes_h3.parquet"``.
        output_dir: Directory that receives the per-resolution outputs; it is
            created if missing. Defaults to ``AGGREGATES_DIR``.

    Returns:
        Paths of the ``journey_times_res{N}.parquet`` files written. There is
        one per resolution in ``H3_RESOLUTIONS`` whose ``h3_res{N}`` column
        exists after the join. Each file has the columns ``h3``,
        ``median_journey_minutes`` and ``journey_count`` and is sorted by
        ``h3``. The list is empty when no valid journey times or no matching
        postcodes are found.
    """
    journey_times_path = journey_times_path or PROCESSED_DIR / "journey_times_bank.parquet"
    postcodes_h3_path = postcodes_h3_path or PROCESSED_DIR / "postcodes_h3.parquet"
    output_dir = output_dir or AGGREGATES_DIR
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load only the two columns we need rather than the whole file.
    journey_df = pl.read_parquet(
        journey_times_path,
        columns=["postcode", "public_transport_minutes"],
    )

    # Postcodes without a journey time cannot contribute to any median.
    journey_df = journey_df.filter(pl.col("public_transport_minutes").is_not_null())

    if journey_df.height == 0:
        print("No valid journey times found")
        return []

    # Load postcodes with their H3 indices (one column per resolution).
    postcodes_df = pl.read_parquet(postcodes_h3_path)

    # Inner join: keep only postcodes present in both datasets.
    joined_df = journey_df.join(postcodes_df, on="postcode", how="inner")

    if joined_df.height == 0:
        print("No matching postcodes found")
        return []

    print(f"Joined {joined_df.height} postcodes with journey times")

    saved_paths: list[Path] = []

    for resolution in H3_RESOLUTIONS:
        h3_col = f"h3_res{resolution}"

        if h3_col not in joined_df.columns:
            print(f"Skipping resolution {resolution} - column {h3_col} not found")
            continue

        # Aggregate by H3 cell: median journey time plus sample size.
        agg_df = (
            joined_df
            # Postcodes lacking an H3 index would otherwise collapse into a
            # single null-keyed "cell" that cannot be placed on a map.
            .filter(pl.col(h3_col).is_not_null())
            .group_by(h3_col)
            .agg(
                pl.col("public_transport_minutes").median().alias("median_journey_minutes"),
                pl.col("public_transport_minutes").count().alias("journey_count"),
            )
            .rename({h3_col: "h3"})
            # group_by does not preserve row order by default; sort so that
            # reruns on the same input produce identical output files.
            .sort("h3")
        )

        output_path = output_dir / f"journey_times_res{resolution}.parquet"
        agg_df.write_parquet(output_path)
        saved_paths.append(output_path)
        print(f"Saved {agg_df.height} cells to {output_path}")

    return saved_paths


if __name__ == "__main__":
    aggregate_journey_times()