Add hexagon backend

This commit is contained in:
Andras Schmelczer 2026-01-25 21:07:05 +00:00
parent a7cc4d9b2b
commit ab704c0dc0
18 changed files with 1443 additions and 0 deletions

0
pipeline/__init__.py Normal file
View file

22
pipeline/base.py Normal file
View file

@ -0,0 +1,22 @@
from abc import ABC, abstractmethod
import polars as pl
class DataSource(ABC):
"""Base class for all data sources."""
@property
@abstractmethod
def name(self) -> str:
"""Unique identifier for this data source."""
pass
@abstractmethod
def load(self) -> pl.LazyFrame:
"""Load raw data as LazyFrame."""
pass
@abstractmethod
def process(self, postcodes: pl.LazyFrame) -> pl.LazyFrame:
"""Process and join with postcode coordinates."""
pass

23
pipeline/config.py Normal file
View file

@ -0,0 +1,23 @@
"""Shared configuration for the pipeline and server."""
from pathlib import Path
# Data directories
DATA_DIR = Path(__file__).parent.parent / "data_sources"
PROCESSED_DIR = DATA_DIR / "processed"
AGGREGATES_DIR = PROCESSED_DIR / "aggregates"
# H3 resolutions to generate and serve
# https://h3geo.org/docs/core-library/restable/#average-area-in-m2
H3_RESOLUTIONS = [6, 7, 8, 9, 10, 11, 12]
DEFAULT_H3_RESOLUTION = 8
# Year filters
MIN_YEAR = 1995
MAX_YEAR = 2024
DEFAULT_MIN_YEAR = 2020
DEFAULT_MAX_YEAR = 2024
# Price filters
DEFAULT_MIN_PRICE = 0
DEFAULT_MAX_PRICE = 2_000_000

View file

View file

@ -0,0 +1,42 @@
from pathlib import Path
import polars as pl
from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS
def aggregate(df: pl.LazyFrame, resolution: int) -> pl.LazyFrame:
"""Aggregate property data by H3 cell and year."""
h3_col = f"h3_res{resolution}"
return (
df.group_by(h3_col, "year")
.agg(
pl.len().alias("count"),
pl.col("price").mean().alias("avg_price"),
pl.col("price").median().alias("median_price"),
pl.col("price").min().alias("min_price"),
pl.col("price").max().alias("max_price"),
)
.rename({h3_col: "h3"})
)
def aggregate_all(df: pl.LazyFrame) -> dict[int, pl.LazyFrame]:
"""Aggregate at all H3 resolutions."""
return {res: aggregate(df, res) for res in H3_RESOLUTIONS}
def save_aggregates(df: pl.LazyFrame, output_dir: Path | None = None) -> list[Path]:
"""Aggregate and save at all H3 resolutions."""
output_dir = output_dir or AGGREGATES_DIR
output_dir.mkdir(parents=True, exist_ok=True)
saved_paths = []
aggregates = aggregate_all(df)
for res, agg_df in aggregates.items():
output_path = output_dir / f"res{res}.parquet"
agg_df.collect().write_parquet(output_path)
saved_paths.append(output_path)
return saved_paths

36
pipeline/run.py Normal file
View file

@ -0,0 +1,36 @@
"""Pipeline CLI to process property data with H3 spatial indexing."""
from pathlib import Path
import polars as pl
from tqdm import tqdm
from pipeline.sources.postcodes import save_postcodes, DATA_DIR
from pipeline.sources.property_prices import PropertyPricesSource
from pipeline.processors.h3_aggregator import save_aggregates
def run_pipeline():
"""Run the full data processing pipeline."""
print("=" * 60)
print("Property Map Data Pipeline")
print("=" * 60)
# Step 1: Process postcodes with H3 indices
print("\n[1/3] Processing postcodes with H3 indices...")
postcodes_path = save_postcodes()
print(f" Saved: {postcodes_path}")
print("\n[2/3] Processing property prices...")
postcodes = pl.scan_parquet(postcodes_path)
property_source = PropertyPricesSource()
properties = property_source.process(postcodes)
print(" Joined property prices with postcodes")
print("\n[3/3] Aggregating at H3 resolutions...")
saved_paths = save_aggregates(properties)
for path in saved_paths:
size_mb = path.stat().st_size / (1024 * 1024)
print(f" Saved: {path.name} ({size_mb:.1f} MB)")
if __name__ == "__main__":
run_pipeline()

View file

View file

@ -0,0 +1,48 @@
from pathlib import Path
import polars as pl
import h3
from pipeline.config import DATA_DIR, H3_RESOLUTIONS, PROCESSED_DIR
def lat_long_to_h3(lat: float, long: float, resolution: int) -> str:
"""Convert lat/long to H3 index at given resolution."""
return h3.latlng_to_cell(lat, long, resolution)
def load_postcodes() -> pl.LazyFrame:
"""Load postcode data from arcgis parquet file."""
return pl.scan_parquet(DATA_DIR / "arcgis_data.parquet").select(
pl.col("pcds").alias("postcode"),
pl.col("lat"),
pl.col("long"),
)
def process_postcodes() -> pl.LazyFrame:
"""Process postcodes and add H3 indices at multiple resolutions."""
df = load_postcodes().collect()
for res in H3_RESOLUTIONS:
col_name = f"h3_res{res}"
df = df.with_columns(
pl.struct(["lat", "long"])
.map_elements(
lambda x: lat_long_to_h3(x["lat"], x["long"], res),
return_dtype=pl.Utf8,
)
.alias(col_name)
)
return df.lazy()
def save_postcodes(output_path: Path | None = None) -> Path:
"""Process and save postcodes with H3 indices."""
output_path = output_path or PROCESSED_DIR / "postcodes_h3.parquet"
output_path.parent.mkdir(parents=True, exist_ok=True)
df = process_postcodes().collect()
df.write_parquet(output_path)
return output_path

View file

@ -0,0 +1,41 @@
import polars as pl
from pipeline.base import DataSource
from pipeline.config import DATA_DIR, H3_RESOLUTIONS
class PropertyPricesSource(DataSource):
"""Land Registry property prices data source."""
@property
def name(self) -> str:
return "property_prices"
def load(self) -> pl.LazyFrame:
"""Load raw property prices data."""
return pl.scan_parquet(DATA_DIR / "pp-complete.parquet")
def process(self, postcodes: pl.LazyFrame) -> pl.LazyFrame:
"""Process and join with postcode coordinates and H3 indices."""
prices = self.load().select(
pl.col("price"),
pl.col("date_of_transfer").dt.year().alias("year"),
pl.col("property_type"),
pl.col("postcode"),
)
joined = prices.join(
postcodes,
on="postcode",
how="inner",
)
h3_cols = [pl.col(f"h3_res{res}") for res in H3_RESOLUTIONS]
return joined.select(
pl.col("price"),
pl.col("year"),
pl.col("property_type"),
pl.col("lat"),
pl.col("long"),
*h3_cols,
)