diff --git a/pipeline/config.py b/pipeline/config.py index 116a078..ca7ca24 100644 --- a/pipeline/config.py +++ b/pipeline/config.py @@ -10,13 +10,3 @@ AGGREGATES_DIR = PROCESSED_DIR / "aggregates" # https://h3geo.org/docs/core-library/restable/#average-area-in-m2 H3_RESOLUTIONS = [7, 8, 9, 10, 11] DEFAULT_H3_RESOLUTION = 8 - -# Year filters -MIN_YEAR = 1995 -MAX_YEAR = 2024 -DEFAULT_MIN_YEAR = 2020 -DEFAULT_MAX_YEAR = 2024 - -# Price filters -DEFAULT_MIN_PRICE = 0 -DEFAULT_MAX_PRICE = 100_000_000 diff --git a/pipeline/epc_pp.py b/pipeline/epc_pp.py index 1c50ec2..9d7d480 100644 --- a/pipeline/epc_pp.py +++ b/pipeline/epc_pp.py @@ -18,7 +18,7 @@ epc = pl.scan_csv('data_sources/epc/certificates.csv').select( 'NUMBER_HABITABLE_ROOMS', 'FLOOR_HEIGHT', 'CONSTRUCTION_AGE_BAND' -).sort('INSPECTION_DATE', descending=True).group_by('epc_address').first() +).filter(pl.col('epc_address').is_not_null()).sort('INSPECTION_DATE', descending=True).group_by('epc_address', 'POSTCODE').first() print("EPC dataset") @@ -39,7 +39,8 @@ price_paid = (pl.scan_parquet('data_sources/pp-complete.parquet').select( 'locality', 'town_city', pl.col('duration').replace(duration_map) -).filter(pl.col('pp_property_type') != 'Other').with_columns( +) +.filter(pl.col('pp_property_type') != 'Other').with_columns( pl.concat_str( [pl.col('saon'), pl.col('paon'), pl.col('street')], separator=' ', @@ -58,30 +59,27 @@ price_paid = (pl.scan_parquet('data_sources/pp-complete.parquet').select( pl.col('price').last().alias('latest_price'), pl.col('date_of_transfer').last(), ) -) +).filter(pl.col('pp_address').is_not_null()) print("Price paid dataset") print(price_paid.head().collect()) -price_paid_df = price_paid.collect() -epc_df = epc.collect() - joined = fuzzy_join_on_postcode( - left=price_paid_df, - right=epc_df, + left=price_paid, + right=epc, left_address_col='pp_address', right_address_col='epc_address', left_postcode_col='postcode', right_postcode_col='POSTCODE', - score_threshold=80, -).drop('POSTCODE') +).drop('POSTCODE').collect() -matched_count = joined.filter(pl.col('epc_address').is_not_null() & pl.col('pp_address').is_not_null()).height -print(f"Unique properties: {price_paid_df.height}") -print(f"Matched: {matched_count} ({100 * matched_count / price_paid_df.height:.1f}%)") -print(f"Unmatched: {price_paid_df.height - matched_count}") +matched = joined.filter(pl.col('epc_address').is_not_null() & pl.col('pp_address').is_not_null()) +total = joined.height +print(f"Unique properties: {total}") +print(f"Matched: {matched.height} ({100 * matched.height / total:.1f}%)") +print(f"Unmatched: {total - matched.height}") -joined = joined.rename({col: col.lower() for col in joined.columns}) +matched = matched.rename({col: col.lower() for col in joined.columns}) -print(joined.head()) -joined.write_parquet('data_sources/processed/epc_pp.parquet') +print(matched.head()) +matched.write_parquet('data_sources/processed/epc_pp.parquet') diff --git a/pipeline/fuzzy_join.py b/pipeline/fuzzy_join.py index ed85286..f516f22 100644 --- a/pipeline/fuzzy_join.py +++ b/pipeline/fuzzy_join.py @@ -1,6 +1,9 @@ import re +import shutil +import tempfile from concurrent.futures import ProcessPoolExecutor from os import cpu_count +from pathlib import Path import polars as pl from thefuzz import fuzz @@ -9,105 +12,143 @@ from tqdm import tqdm _NUMBER_RE = re.compile(r'\d+') +def _normalize(s: pl.Expr) -> pl.Expr: + return ( + s.str.to_uppercase() + .str.replace_all(r'[,.\-]', ' ') + .str.replace_all(r'\s+', ' ') + .str.strip_chars() + ) + + def fuzzy_join_on_postcode( - left: pl.DataFrame, - right: pl.DataFrame, + left: pl.LazyFrame, + right: pl.LazyFrame, left_address_col: str, right_address_col: str, left_postcode_col: str, right_postcode_col: str, - score_threshold: int = 80, -) -> pl.DataFrame: - """Fuzzy join two DataFrames by matching addresses within postcode buckets. +) -> pl.LazyFrame: + """Fuzzy join two LazyFrames by matching addresses within postcode buckets. - Returns the left DataFrame with all right columns appended. - Unmatched rows have null right columns. + Sinks each side to a temporary parquet file so the upstream pipeline + executes only once. The matching phase collects just three narrow + columns (index, address, postcode) via projection pushdown, and the + final join reads the remaining columns lazily. + + Returns a LazyFrame with all left and right columns. Unmatched rows + have null right columns. """ - def _normalize(s: pl.Expr) -> pl.Expr: - return ( - s.str.to_uppercase() - .str.replace_all(r'[,.\-]', ' ') - .str.replace_all(r'\s+', ' ') - .str.strip_chars() + tmpdir = tempfile.mkdtemp(prefix='fuzzy_join_') + left_path = Path(tmpdir) / 'left.parquet' + right_path = Path(tmpdir) / 'right.parquet' + + try: + # Materialise each side exactly once, with a row index, to temp parquet. + left.with_row_index('_left_idx').sink_parquet(left_path) + right.with_row_index('_right_idx').sink_parquet(right_path) + + # Collect only the narrow columns needed for matching (projection pushdown). + left_match = ( + pl.scan_parquet(left_path) + .select( + '_left_idx', + _normalize(pl.col(left_address_col)).alias('_left_address'), + pl.col(left_postcode_col).str.strip_chars().str.to_uppercase().alias('_left_postcode'), + ) + .collect() ) - left = left.with_columns( - _normalize(pl.col(left_address_col)).alias('_left_address'), - pl.col(left_postcode_col).str.strip_chars().str.to_uppercase().alias('_left_postcode'), - ) - right = right.with_columns( - _normalize(pl.col(right_address_col)).alias('_right_address'), - pl.col(right_postcode_col).str.strip_chars().str.to_uppercase().alias('_right_postcode'), - ) + right_match = ( + pl.scan_parquet(right_path) + .select( + '_right_idx', + _normalize(pl.col(right_address_col)).alias('_right_address'), + pl.col(right_postcode_col).str.strip_chars().str.to_uppercase().alias('_right_postcode'), + ) + .unique(subset=['_right_address', '_right_postcode'], keep='first') + .collect() + ) - # Deduplicate right side on normalized address + postcode so that - # variant spellings of the same address don't consume multiple slots. - right = right.unique(subset=['_right_address', '_right_postcode'], keep='first') - - # Group right side by postcode for fast lookup - right_by_postcode: dict[str, list[tuple[int, str]]] = {} - for i, (postcode, address) in enumerate( - zip(right['_right_postcode'], right['_right_address']) - ): - if postcode is not None: - right_by_postcode.setdefault(postcode, []).append((i, address)) - - # Group left side by postcode - left_by_postcode: dict[str, list[tuple[int, str]]] = {} - for left_row, (postcode, address) in enumerate( - zip(left['_left_postcode'], left['_left_address']) - ): - if address is not None and postcode is not None: - left_by_postcode.setdefault(postcode, []).append((left_row, address)) - - # Build tasks for each postcode bucket - tasks = [ - (left_entries, right_by_postcode[postcode], score_threshold) - for postcode, left_entries in left_by_postcode.items() - if postcode in right_by_postcode - ] - - # Score all pairwise matches in parallel, then greedily assign from - # highest score downward so best pairs lock in first. - all_pairs: list[tuple[int, int, int]] = [] # (score, left_row, right_row) - with ProcessPoolExecutor(max_workers=cpu_count()) as executor: - for pairs in tqdm( - executor.map(_score_bucket, tasks, chunksize=64), - total=len(tasks), - desc='Fuzzy matching', + # Group right side by postcode for fast lookup + right_by_postcode: dict[str, list[tuple[int, str]]] = {} + for idx, postcode, address in zip( + right_match['_right_idx'], right_match['_right_postcode'], right_match['_right_address'] ): - all_pairs.extend(pairs) + if postcode is not None: + right_by_postcode.setdefault(postcode, []).append((idx, address)) - # Sort descending by score so best matches are assigned first - all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True) + # Group left side by postcode + left_by_postcode: dict[str, list[tuple[int, str]]] = {} + for idx, postcode, address in zip( + left_match['_left_idx'], left_match['_left_postcode'], left_match['_left_address'] + ): + if address is not None and postcode is not None: + left_by_postcode.setdefault(postcode, []).append((idx, address)) - match_indices: list[int | None] = [None] * len(left) - matched_left: set[int] = set() - matched_right: set[int] = set() + del left_match, right_match - for score, left_row, right_row in all_pairs: - if left_row in matched_left or right_row in matched_right: - continue - match_indices[left_row] = right_row - matched_left.add(left_row) - matched_right.add(right_row) + # Build tasks for each postcode bucket + tasks = [ + (left_entries, right_by_postcode[postcode]) + for postcode, left_entries in left_by_postcode.items() + if postcode in right_by_postcode + ] - # Select right columns (excluding internal helpers) - right_cols = right.select(pl.exclude('_right_address', '_right_postcode')) - right_matched = right_cols[ - [i if i is not None else 0 for i in match_indices] - ] + # Score all pairwise matches in parallel, then greedily assign from + # highest score downward so best pairs lock in first. + all_pairs: list[tuple[int, int, int]] = [] # (score, left_idx, right_idx) + with ProcessPoolExecutor(max_workers=cpu_count()) as executor: + for pairs in tqdm( + executor.map(_score_bucket, tasks, chunksize=64), + total=len(tasks), + desc='Fuzzy matching', + ): + all_pairs.extend(pairs) - # Null out unmatched rows - mask = pl.Series('_matched', [i is not None for i in match_indices]) - right_matched = right_matched.with_columns( - pl.when(mask).then(pl.col(c)).otherwise(pl.lit(None)).alias(c) - for c in right_matched.columns - ) + del tasks, left_by_postcode, right_by_postcode - left_clean = left.select(pl.exclude('_left_address', '_left_postcode')) - return pl.concat([left_clean, right_matched], how='horizontal') + # Sort descending by score so best matches are assigned first + all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True) + + matches: list[tuple[int, int]] = [] + matched_left: set[int] = set() + matched_right: set[int] = set() + + for _score, left_idx, right_idx in all_pairs: + if left_idx in matched_left or right_idx in matched_right: + continue + matches.append((left_idx, right_idx)) + matched_left.add(left_idx) + matched_right.add(right_idx) + + del all_pairs, matched_left, matched_right + + # Build a small mapping LazyFrame and join back to the cached parquets. + if matches: + mapping = pl.LazyFrame({ + '_left_idx': pl.Series([m[0] for m in matches], dtype=pl.UInt32), + '_right_idx': pl.Series([m[1] for m in matches], dtype=pl.UInt32), + }) + else: + mapping = pl.LazyFrame({ + '_left_idx': pl.Series([], dtype=pl.UInt32), + '_right_idx': pl.Series([], dtype=pl.UInt32), + }) + + left_cached = pl.scan_parquet(left_path) + right_cached = pl.scan_parquet(right_path) + + return ( + left_cached + .join(mapping, on='_left_idx', how='left') + .join(right_cached, on='_right_idx', how='left') + .drop('_left_idx', '_right_idx') + ) + except BaseException: + shutil.rmtree(tmpdir, ignore_errors=True) + raise def _numbers_compatible(a: str, b: str) -> bool: @@ -127,13 +168,12 @@ def _score_bucket( args: tuple[list[tuple[int, str]], list[tuple[int, str]], int], ) -> list[tuple[int, int, int]]: """Score all address pairs within a single postcode bucket.""" - left_entries, right_entries, score_threshold = args + left_entries, right_entries = args pairs = [] for left_row, left_address in left_entries: for right_row, right_address in right_entries: if not _numbers_compatible(left_address, right_address): continue score = fuzz.token_sort_ratio(left_address, right_address) - if score >= score_threshold: - pairs.append((score, left_row, right_row)) + pairs.append((score, left_row, right_row)) return pairs diff --git a/pipeline/processors/__init__.py b/pipeline/processors/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipeline/processors/h3_aggregator.py b/pipeline/processors/h3_aggregator.py deleted file mode 100644 index 94653e9..0000000 --- a/pipeline/processors/h3_aggregator.py +++ /dev/null @@ -1,42 +0,0 @@ -from pathlib import Path -import polars as pl - -from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS - - -def aggregate(df: pl.LazyFrame, resolution: int) -> pl.LazyFrame: - """Aggregate property data by H3 cell and year.""" - h3_col = f"h3_res{resolution}" - - return ( - df.group_by(h3_col, "year") - .agg( - pl.len().alias("count"), - pl.col("price").mean().alias("avg_price"), - pl.col("price").median().alias("median_price"), - pl.col("price").min().alias("min_price"), - pl.col("price").max().alias("max_price"), - ) - .rename({h3_col: "h3"}) - ) - - -def aggregate_all(df: pl.LazyFrame) -> dict[int, pl.LazyFrame]: - """Aggregate at all H3 resolutions.""" - return {res: aggregate(df, res) for res in H3_RESOLUTIONS} - - -def save_aggregates(df: pl.LazyFrame, output_dir: Path | None = None) -> list[Path]: - """Aggregate and save at all H3 resolutions.""" - output_dir = output_dir or AGGREGATES_DIR - output_dir.mkdir(parents=True, exist_ok=True) - - saved_paths = [] - aggregates = aggregate_all(df) - - for res, agg_df in aggregates.items(): - output_path = output_dir / f"res{res}.parquet" - agg_df.collect().write_parquet(output_path) - saved_paths.append(output_path) - - return saved_paths diff --git a/pipeline/processors/journey_times_aggregator.py b/pipeline/processors/journey_times_aggregator.py deleted file mode 100644 index 5f6dfb9..0000000 --- a/pipeline/processors/journey_times_aggregator.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Aggregate journey times data by H3 hexagonal cells.""" - -from pathlib import Path - -import polars as pl - -from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS, PROCESSED_DIR - -JOURNEY_COLS = [ - "public_transport_easy_minutes", - "public_transport_quick_minutes", - "cycling_minutes", -] - -AGGREGATE_COLS = [ - "median_pt_easy_minutes", - "median_pt_quick_minutes", - "median_cycling_minutes", - "median_journey_minutes", -] - - -def aggregate_journey_times( - journey_times_path: Path | None = None, - postcodes_h3_path: Path | None = None, - aggregates_dir: Path | None = None, -) -> list[Path]: - """ - Add journey times to existing H3 aggregate parquet files. - - Joins journey_times_bank_checkpoint.parquet with postcodes_h3.parquet on postcode, - aggregates by H3 cell, then merges into existing res{N}.parquet files. - """ - journey_times_path = ( - journey_times_path - or PROCESSED_DIR / "journey_times_bank_checkpoint.parquet" - ) - postcodes_h3_path = postcodes_h3_path or PROCESSED_DIR / "postcodes_h3.parquet" - aggregates_dir = aggregates_dir or AGGREGATES_DIR - - # Load journey times data - journey_df = pl.read_parquet(journey_times_path).select( - ["postcode"] + JOURNEY_COLS - ) - - # Filter out rows where all journey time columns are null - journey_df = journey_df.filter( - pl.any_horizontal(pl.col(c).is_not_null() for c in JOURNEY_COLS) - ) - - if journey_df.height == 0: - print("No valid journey times found") - return [] - - # Load postcodes with H3 indices - postcodes_df = pl.read_parquet(postcodes_h3_path) - - # Join on postcode to get H3 indices - joined_df = journey_df.join(postcodes_df, on="postcode", how="inner") - - if joined_df.height == 0: - print("No matching postcodes found") - return [] - - print(f"Joined {joined_df.height} postcodes with journey times") - - updated_paths = [] - - for resolution in H3_RESOLUTIONS: - h3_col = f"h3_res{resolution}" - parquet_path = aggregates_dir / f"res{resolution}.parquet" - - if not parquet_path.exists(): - print(f"Skipping resolution {resolution} - {parquet_path} not found") - continue - - if h3_col not in joined_df.columns: - print(f"Skipping resolution {resolution} - column {h3_col} not found") - continue - - # Aggregate journey times by H3 cell - journey_agg = ( - joined_df.group_by(h3_col) - .agg( - pl.col("public_transport_easy_minutes") - .median() - .alias("median_pt_easy_minutes"), - pl.col("public_transport_quick_minutes") - .median() - .alias("median_pt_quick_minutes"), - pl.col("cycling_minutes") - .median() - .alias("median_cycling_minutes"), - pl.col("public_transport_quick_minutes") - .median() - .alias("median_journey_minutes"), - ) - .rename({h3_col: "h3"}) - ) - - # Load existing parquet - existing_df = pl.read_parquet(parquet_path) - - # Drop existing journey time columns if present - existing_df = existing_df.drop( - [c for c in AGGREGATE_COLS if c in existing_df.columns] - ) - - # Left join journey times onto existing data - updated_df = existing_df.join(journey_agg, on="h3", how="left") - - # Save back to parquet - updated_df.write_parquet(parquet_path) - updated_paths.append(parquet_path) - matched = updated_df.filter( - pl.col("median_journey_minutes").is_not_null() - ).height - print( - f"Updated {parquet_path.name}: {matched} rows with journey times " - f"(out of {updated_df.height} total)" - ) - - return updated_paths - - -if __name__ == "__main__": - aggregate_journey_times() diff --git a/pipeline/run.py b/pipeline/run.py index 630a892..6a03ed6 100644 --- a/pipeline/run.py +++ b/pipeline/run.py @@ -1,45 +1,6 @@ """Pipeline CLI to process property data with H3 spatial indexing.""" -import polars as pl - -from pipeline.sources.postcodes import save_postcodes -from pipeline.sources.property_prices import PropertyPricesSource -from pipeline.processors.h3_aggregator import save_aggregates -from pipeline.processors.journey_times_aggregator import aggregate_journey_times - - -def run_pipeline(): - """Run the full data processing pipeline.""" - print("=" * 60) - print("Property Map Data Pipeline") - print("=" * 60) - - # Step 1: Process postcodes with H3 indices - print("\n[1/4] Processing postcodes with H3 indices...") - postcodes_path = save_postcodes() - print(f" Saved: {postcodes_path}") - - print("\n[2/4] Processing property prices...") - postcodes = pl.scan_parquet(postcodes_path) - property_source = PropertyPricesSource() - properties = property_source.process(postcodes) - print(" Joined property prices with postcodes") - - print("\n[3/4] Aggregating at H3 resolutions...") - saved_paths = save_aggregates(properties) - for path in saved_paths: - size_mb = path.stat().st_size / (1024 * 1024) - print(f" Saved: {path.name} ({size_mb:.1f} MB)") - - print("\n[4/4] Adding journey times to aggregates...") - updated_paths = aggregate_journey_times() - if updated_paths: - for path in updated_paths: - size_mb = path.stat().st_size / (1024 * 1024) - print(f" Updated: {path.name} ({size_mb:.1f} MB)") - else: - print(" Skipped (no journey time data found)") - +from pipeline.wide import run if __name__ == "__main__": - run_pipeline() + run() diff --git a/pipeline/sources/__init__.py b/pipeline/sources/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipeline/sources/postcodes.py b/pipeline/sources/postcodes.py deleted file mode 100644 index d0105b5..0000000 --- a/pipeline/sources/postcodes.py +++ /dev/null @@ -1,49 +0,0 @@ -from pathlib import Path -import polars as pl -import h3 - -from pipeline.config import DATA_DIR, H3_RESOLUTIONS, PROCESSED_DIR - - -def lat_long_to_h3(lat: float, long: float, resolution: int) -> str: - """Convert lat/long to H3 index at given resolution.""" - return h3.latlng_to_cell(lat, long, resolution) - - -def load_postcodes() -> pl.LazyFrame: - """Load postcode data from arcgis parquet file.""" - return pl.scan_parquet(DATA_DIR / "arcgis_data.parquet").select( - pl.col("pcds").alias("postcode"), - pl.col("lat"), - pl.col("long"), - ) - - -def process_postcodes() -> pl.LazyFrame: - """Process postcodes and add H3 indices at multiple resolutions.""" - df = load_postcodes().collect() - - for res in H3_RESOLUTIONS: - col_name = f"h3_res{res}" - df = df.with_columns( - pl.struct(["lat", "long"]) - .map_elements( - # Capture res by value using default argument to avoid closure bug - lambda x, res=res: lat_long_to_h3(x["lat"], x["long"], res), - return_dtype=pl.Utf8, - ) - .alias(col_name) - ) - - return df.lazy() - - -def save_postcodes(output_path: Path | None = None) -> Path: - """Process and save postcodes with H3 indices.""" - output_path = output_path or PROCESSED_DIR / "postcodes_h3.parquet" - output_path.parent.mkdir(parents=True, exist_ok=True) - - df = process_postcodes().collect() - df.write_parquet(output_path) - - return output_path diff --git a/pipeline/sources/property_prices.py b/pipeline/sources/property_prices.py deleted file mode 100644 index 4d61b0c..0000000 --- a/pipeline/sources/property_prices.py +++ /dev/null @@ -1,41 +0,0 @@ -import polars as pl - -from pipeline.base import DataSource -from pipeline.config import DATA_DIR, H3_RESOLUTIONS - - -class PropertyPricesSource(DataSource): - """Land Registry property prices data source.""" - - @property - def name(self) -> str: - return "property_prices" - - def load(self) -> pl.LazyFrame: - """Load raw property prices data.""" - return pl.scan_parquet(DATA_DIR / "pp-complete.parquet") - - def process(self, postcodes: pl.LazyFrame) -> pl.LazyFrame: - """Process and join with postcode coordinates and H3 indices.""" - prices = self.load().select( - pl.col("price"), - pl.col("date_of_transfer").dt.year().alias("year"), - pl.col("property_type"), - pl.col("postcode"), - ) - - joined = prices.join( - postcodes, - on="postcode", - how="inner", - ) - - h3_cols = [pl.col(f"h3_res{res}") for res in H3_RESOLUTIONS] - return joined.select( - pl.col("price"), - pl.col("year"), - pl.col("property_type"), - pl.col("lat"), - pl.col("long"), - *h3_cols, - ) diff --git a/pipeline/test_fuzzy_join.py b/pipeline/test_fuzzy_join.py index 7197009..7a73a73 100644 --- a/pipeline/test_fuzzy_join.py +++ b/pipeline/test_fuzzy_join.py @@ -9,7 +9,6 @@ pp = ( pl.scan_parquet("data_sources/pp-complete.parquet") .filter(pl.col("postcode") == POSTCODE) .select("paon", "saon", "street", "postcode") - .collect() .unique() .sort("saon") .with_columns( @@ -27,14 +26,10 @@ epc = ( .select("ADDRESS", "POSTCODE", "INSPECTION_DATE") .filter(pl.col("POSTCODE").str.strip_chars() == POSTCODE) .sort("INSPECTION_DATE", descending=True) - .collect() .unique("ADDRESS") .sort("ADDRESS") ) -print(f"Price paid: {len(pp)} unique addresses") -print(f"EPC: {len(epc)} unique addresses") - result = fuzzy_join_on_postcode( left=pp, right=epc, @@ -42,9 +37,7 @@ result = fuzzy_join_on_postcode( right_address_col="ADDRESS", left_postcode_col="postcode", right_postcode_col="POSTCODE", - score_threshold=80, - -) +).collect() snapshot = result.select("pp_address", "ADDRESS").sort("pp_address") diff --git a/pipeline/wide.py b/pipeline/wide.py new file mode 100644 index 0000000..3e170d9 --- /dev/null +++ b/pipeline/wide.py @@ -0,0 +1,143 @@ +"""Build a wide property dataframe and H3 aggregates from epc_pp output.""" + +import polars as pl +import h3 + +from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS, DATA_DIR, PROCESSED_DIR + + +def _build_wide() -> pl.DataFrame: + """Build the wide dataframe by joining epc_pp with all auxiliary data.""" + print("Loading epc_pp...") + wide = pl.read_parquet(PROCESSED_DIR / "epc_pp.parquet") + print(f" {wide.shape[0]:,} rows") + + # GPS coordinates + LSOA from ArcGIS + print("Joining GPS coordinates...") + arcgis = pl.read_parquet(DATA_DIR / "arcgis_data.parquet").select( + pl.col("pcds").alias("postcode"), + "lat", + pl.col("long").alias("lon"), + "lsoa21", + ) + wide = wide.join(arcgis, on="postcode", how="inner") + print(f" {wide.shape[0]:,} rows after GPS join") + + # Journey times (optional) + journey_path = PROCESSED_DIR / "journey_times_bank_checkpoint.parquet" + if journey_path.exists(): + print("Joining journey times...") + journey_times = pl.read_parquet(journey_path).select( + "postcode", + "public_transport_easy_minutes", + "public_transport_quick_minutes", + "cycling_minutes", + ) + wide = wide.join(journey_times, on="postcode", how="left") + + # Index of Deprivation + iod_path = DATA_DIR / "IoD2025_Scores.parquet" + if iod_path.exists(): + print("Joining IoD scores...") + iod = pl.read_parquet(iod_path).drop( + "LSOA name (2021)", + "Local Authority District code (2024)", + "Local Authority District name (2024)", + ) + # Rename IoD columns to clean snake_case + iod = iod.rename(_IOD_RENAMES) + wide = wide.join( + iod, left_on="lsoa21", right_on="lsoa_code", how="left" + ) + + return wide + + +_IOD_RENAMES = { + "LSOA code (2021)": "lsoa_code", + "Index of Multiple Deprivation (IMD) Score": "imd_score", + "Income Score (rate)": "income_score", + "Employment Score (rate)": "employment_score", + "Education, Skills and Training Score": "education_score", + "Health Deprivation and Disability Score": "health_score", + "Crime Score": "crime_score", + "Barriers to Housing and Services Score": "housing_barriers_score", + "Living Environment Score": "living_environment_score", + "Income Deprivation Affecting Children Index (IDACI) Score (rate)": "idaci_score", + "Income Deprivation Affecting Older People (IDAOPI) Score (rate)": "idaopi_score", + "Children and Young People Sub-domain Score": "children_young_people_score", + "Adult Skills Sub-domain Score": "adult_skills_score", + "Geographical Barriers Sub-domain Score": "geographical_barriers_score", + "Wider Barriers Sub-domain Score": "wider_barriers_score", + "Indoors Sub-domain Score": "indoors_score", + "Outdoors Sub-domain Score": "outdoors_score", +} + + +def _add_h3_indices(df: pl.DataFrame) -> pl.DataFrame: + """Compute H3 indices from lat/lon for all configured resolutions.""" + print("Computing H3 indices...") + # Compute per unique postcode for efficiency, then join back + postcodes = df.select("postcode", "lat", "lon").unique(subset=["postcode"]) + + for res in H3_RESOLUTIONS: + col_name = f"h3_res{res}" + postcodes = postcodes.with_columns( + pl.struct(["lat", "lon"]) + .map_elements( + lambda x, r=res: h3.latlng_to_cell(x["lat"], x["lon"], r), + return_dtype=pl.Utf8, + ) + .alias(col_name) + ) + print(f" res{res}: {postcodes[col_name].n_unique():,} unique cells") + + h3_cols = [f"h3_res{res}" for res in H3_RESOLUTIONS] + return df.join( + postcodes.select("postcode", *h3_cols), on="postcode", how="left" + ) + + +def _aggregate_to_h3(df: pl.DataFrame) -> None: + """Aggregate min/max of every numeric feature per H3 cell at each resolution.""" + AGGREGATES_DIR.mkdir(parents=True, exist_ok=True) + + exclude = {"lat", "lon"} + numeric_cols = [ + col + for col, dtype in zip(df.columns, df.dtypes) + if dtype.is_numeric() and not col.startswith("h3_res") and col not in exclude + ] + + agg_exprs = [pl.len().alias("count")] + for col in numeric_cols: + agg_exprs.append(pl.col(col).min().alias(f"min_{col}")) + agg_exprs.append(pl.col(col).max().alias(f"max_{col}")) + + print("Aggregating to H3 cells...") + for res in H3_RESOLUTIONS: + h3_col = f"h3_res{res}" + result = df.group_by(h3_col).agg(agg_exprs).rename({h3_col: "h3"}) + path = AGGREGATES_DIR / f"res{res}.parquet" + result.write_parquet(path) + size_mb = path.stat().st_size / (1024 * 1024) + print(f" {path.name}: {result.shape[0]:,} cells ({size_mb:.1f} MB)") + + +def run(): + """Run the full wide pipeline: build wide df, compute H3, aggregate.""" + wide = _build_wide() + + wide_path = PROCESSED_DIR / "wide.parquet" + wide.write_parquet(wide_path) + size_mb = wide_path.stat().st_size / (1024 * 1024) + print(f"Wrote {wide_path} ({size_mb:.1f} MB)") + + wide = _add_h3_indices(wide) + _aggregate_to_h3(wide) + + print("Done.") + + +if __name__ == "__main__": + run() diff --git a/pyproject.toml b/pyproject.toml index 9a1cd5c..a0e6a9f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,10 +24,15 @@ dependencies = [ "fastexcel>=0.19.0", "osmium>=4.0.0", "matplotlib>=3.10.8", + "thefuzz>=0.22.1", + "python-levenshtein>=0.27.3", ] [tool.uv] environments = ["sys_platform == 'linux' and python_version < '3.14'"] [dependency-groups] -dev = ["ruff>=0.8.0"] +dev = [ + "pytest>=9.0.2", + "ruff>=0.8.0", +]