import polars as pl

from pipeline.base import DataSource
from pipeline.config import DATA_DIR, H3_RESOLUTIONS


class PropertyPricesSource(DataSource):
    """Land Registry property prices data source.

    Reads ``pp-complete.parquet`` from ``DATA_DIR`` and, in :meth:`process`,
    attaches coordinates and H3 cell indices by joining on postcode.
    """

    @property
    def name(self) -> str:
        """Identifier of this data source."""
        return "property_prices"

    def load(self) -> pl.LazyFrame:
        """Lazily scan the raw property prices data.

        Returns:
            A ``LazyFrame`` over ``DATA_DIR / "pp-complete.parquet"``. Nothing
            is read from disk until the frame (or a derived one) is collected.
        """
        return pl.scan_parquet(DATA_DIR / "pp-complete.parquet")

    def process(self, postcodes: pl.LazyFrame) -> pl.LazyFrame:
        """Process sales and join them with postcode coordinates and H3 indices.

        Args:
            postcodes: Lookup frame keyed by ``postcode``. It must provide the
                columns ``lat``, ``long`` and ``h3_res{res}`` for every
                ``res`` in ``H3_RESOLUTIONS``, because those are selected from
                the joined result.

        Returns:
            A ``LazyFrame`` with columns ``price``, ``year``,
            ``property_type``, ``lat``, ``long`` followed by one
            ``h3_res{res}`` column per configured resolution. The
            ``postcode`` join key itself is not kept.
        """
        prices = self.load().select(
            pl.col("price"),
            # ``.dt.year()`` requires ``date_of_transfer`` to be a temporal
            # (Date/Datetime) column in the parquet file.
            pl.col("date_of_transfer").dt.year().alias("year"),
            pl.col("property_type"),
            pl.col("postcode"),
        )

        # Inner join: sales whose postcode has no match in ``postcodes`` are
        # dropped. Null keys do not match under polars' default join settings.
        # NOTE(review): if ``postcodes`` contains duplicate postcode rows, each
        # matching sale is emitted once per duplicate. Confirm that the lookup
        # is unique per postcode.
        joined = prices.join(
            postcodes,
            on="postcode",
            how="inner",
        )

        h3_cols = [pl.col(f"h3_res{res}") for res in H3_RESOLUTIONS]

        return joined.select(
            pl.col("price"),
            pl.col("year"),
            pl.col("property_type"),
            pl.col("lat"),
            pl.col("long"),
            *h3_cols,
        )