From 4c258018c319cbcc6055791dbb37569a4ea802c1 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 31 Jan 2026 13:07:09 +0000 Subject: [PATCH] Format python --- Taskfile.yml | 54 ++++------ pipeline/download/arcgis.py | 12 ++- pipeline/download/deprivation_data.py | 8 +- pipeline/download/pois.py | 22 ++-- pipeline/download/price_paid.py | 8 +- pipeline/journey_times/config.py | 4 - pipeline/journey_times/tfl_client.py | 4 +- pipeline/transform/join_epc_pp.py | 139 ++++++++++++++++---------- pipeline/transform/merge.py | 100 +++++++++++------- pipeline/transform/transform_poi.py | 16 +-- pipeline/utils/__init__.py | 8 +- pipeline/utils/fuzzy_join.py | 77 ++++++++------ pipeline/utils/haversine.py | 13 ++- pipeline/utils/poi_counts.py | 31 +++--- pipeline/utils/test_fuzzy_join.py | 2 +- pipeline/utils/test_haversine.py | 30 ++++-- pipeline/utils/test_poi_counts.py | 68 +++++++------ 17 files changed, 348 insertions(+), 248 deletions(-) diff --git a/Taskfile.yml b/Taskfile.yml index ceb4a47..b6334c6 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -21,47 +21,43 @@ tasks: - cd frontend && npm install download:arcgis: + internal: true desc: Download and convert ArcGIS postcode data - sources: - - pipeline/download/arcgis.py generates: - "{{.ARCGIS_OUTPUT}}" cmds: - uv run python -m pipeline.download.arcgis --output {{.ARCGIS_OUTPUT}} download:price-paid: + internal: true desc: Download and convert Land Registry price-paid data - sources: - - pipeline/download/price_paid.py generates: - "{{.PRICE_PAID_OUTPUT}}" cmds: - uv run python -m pipeline.download.price_paid --output {{.PRICE_PAID_OUTPUT}} download:deprivation: + internal: true desc: Download and convert Index of Deprivation data - sources: - - pipeline/download/deprivation_data.py generates: - "{{.IOD_OUTPUT}}" cmds: - uv run python -m pipeline.download.deprivation_data --output {{.IOD_OUTPUT}} download:pois: + internal: true desc: Download and extract POIs from OpenStreetMap - sources: - - pipeline/download/pois.py generates: - "{{.POIS_RAW_OUTPUT}}" cmds: - uv run python -m pipeline.download.pois --output {{.POIS_RAW_OUTPUT}} transform:pois: + internal: true desc: Transform raw POIs to filtered version with friendly names deps: - download:pois sources: - - pipeline/transform/transform_poi.py - "{{.POIS_RAW_OUTPUT}}" generates: - "{{.POIS_FILTERED_OUTPUT}}" @@ -69,12 +65,11 @@ tasks: - uv run python -m pipeline.transform.transform_poi --input {{.POIS_RAW_OUTPUT}} --output {{.POIS_FILTERED_OUTPUT}} transform:epc-pp: + internal: true desc: Fuzzy join EPC and Price Paid data deps: - download:price-paid sources: - - pipeline/transform/join_epc_pp.py - - pipeline/utils/fuzzy_join.py - "{{.PRICE_PAID_OUTPUT}}" - "{{.EPC_CSV}}" generates: @@ -83,13 +78,12 @@ tasks: - uv run python -m pipeline.transform.join_epc_pp --epc {{.EPC_CSV}} --price-paid {{.PRICE_PAID_OUTPUT}} --output {{.EPC_PP_OUTPUT}} transform:poi-proximity: + internal: true desc: Compute POI proximity counts per postcode deps: - download:arcgis - transform:pois sources: - - pipeline/transform/poi_proximity.py - - pipeline/utils/poi_counts.py - "{{.ARCGIS_OUTPUT}}" - "{{.POIS_FILTERED_OUTPUT}}" generates: @@ -97,7 +91,7 @@ tasks: cmds: - uv run python -m pipeline.transform.poi_proximity --arcgis {{.ARCGIS_OUTPUT}} --pois {{.POIS_FILTERED_OUTPUT}} --output {{.POI_PROXIMITY_OUTPUT}} - transform:wide: + prepare: desc: Build wide property dataframe with all joins deps: - join:epc-pp @@ -105,7 +99,6 @@ tasks: - download:deprivation - transform:poi-proximity sources: - - pipeline/transform/merge.py - "{{.EPC_PP_OUTPUT}}" - "{{.ARCGIS_OUTPUT}}" - "{{.IOD_OUTPUT}}" @@ -115,36 +108,37 @@ tasks: cmds: - uv run python -m pipeline.transform.merge --epc-pp {{.EPC_PP_OUTPUT}} --arcgis {{.ARCGIS_OUTPUT}} --iod {{.IOD_OUTPUT}} --poi-proximity {{.POI_PROXIMITY_OUTPUT}} --journey-times {{.JOURNEY_TIMES}} --output {{.WIDE_OUTPUT}} - prepare: - desc: Prepare the application (install, download data, run pipeline) - deps: - - transform:wide - test: cmds: - uv run -m pipeline.utils.test_fuzzy_join - uv run pytest pipeline/utils/test_haversine.py - uv run pytest pipeline/utils/test_poi_counts.py - server: + dev:server: desc: Run Rust backend on port 8001 dir: server-rs cmds: - cargo run --release -- {{.WIDE_OUTPUT}} - frontend: + dev:frontend: desc: Run frontend dev server on port 3030 (proxies /api to :8001) dir: frontend cmds: - npm run dev - build: + build:server: + desc: Build server for production + dir: frontend + cmds: + - cargo build --release + + build:frontend: desc: Build frontend for production dir: frontend cmds: + - npm run typecheck - npm run build - lint: desc: Lint all code (Python, TypeScript, and Rust) cmds: @@ -195,17 +189,13 @@ tasks: desc: Format Rust code with cargo fmt dir: server-rs cmds: - - cargo fmt + - cargo fmt --all check: desc: Run all checks (lint, typecheck, build) cmds: - task: lint - - task: typecheck - - task: build + - task: build:server + - task: build:frontend + - task: test - typecheck: - desc: Type check frontend TypeScript code - dir: frontend - cmds: - - npm run typecheck diff --git a/pipeline/download/arcgis.py b/pipeline/download/arcgis.py index 1277fc6..de0fa0c 100644 --- a/pipeline/download/arcgis.py +++ b/pipeline/download/arcgis.py @@ -35,7 +35,6 @@ def download_with_progress(url: str, output_path: Path) -> None: return - def extract_zip(zip_path: Path, extract_path: Path) -> None: extract_path.mkdir(exist_ok=True) @@ -44,7 +43,7 @@ def extract_zip(zip_path: Path, extract_path: Path) -> None: def convert_to_parquet(data_path: Path, parquet_path: Path) -> None: - df = pl.scan_csv(data_path / 'Data/NSPL_MAY_2025_UK.csv', try_parse_dates=True) + df = pl.scan_csv(data_path / "Data/NSPL_MAY_2025_UK.csv", try_parse_dates=True) print(f"Columns: {df.collect_schema().names()}") parquet_path.parent.mkdir(parents=True, exist_ok=True) df.sink_parquet(parquet_path, compression="zstd") @@ -52,8 +51,12 @@ def convert_to_parquet(data_path: Path, parquet_path: Path) -> None: def main() -> None: - parser = argparse.ArgumentParser(description="Download and convert ArcGIS postcode data") - parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + parser = argparse.ArgumentParser( + description="Download and convert ArcGIS postcode data" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) args = parser.parse_args() with tempfile.TemporaryDirectory() as cache_dir: @@ -64,5 +67,6 @@ def main() -> None: extract_zip(download_path, extract_path) convert_to_parquet(extract_path, args.output) + if __name__ == "__main__": main() diff --git a/pipeline/download/deprivation_data.py b/pipeline/download/deprivation_data.py index e73cb96..db75969 100644 --- a/pipeline/download/deprivation_data.py +++ b/pipeline/download/deprivation_data.py @@ -41,8 +41,12 @@ def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None: def main() -> None: - parser = argparse.ArgumentParser(description="Download and convert Index of Deprivation data") - parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + parser = argparse.ArgumentParser( + description="Download and convert Index of Deprivation data" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) args = parser.parse_args() with tempfile.TemporaryDirectory() as cache_dir: diff --git a/pipeline/download/pois.py b/pipeline/download/pois.py index 2694fc2..cd7345d 100644 --- a/pipeline/download/pois.py +++ b/pipeline/download/pois.py @@ -8,16 +8,12 @@ import osmium import polars as pl from tqdm import tqdm -from pathlib import Path - BATCH_SIZE = 50_000 MIN_OCCURENCE_COUNT = 20 -GEOFABRIK_GB_URL = ( - "https://download.geofabrik.de/europe/great-britain-latest.osm.pbf" -) +GEOFABRIK_GB_URL = "https://download.geofabrik.de/europe/great-britain-latest.osm.pbf" UK_BBOX_WEST = -7.57 UK_BBOX_SOUTH = 49.96 @@ -38,7 +34,6 @@ POI_TAG_KEYS: list[str] = [ ] - def download_pbf(pbf_file: Path) -> None: pbf_file.parent.mkdir(parents=True, exist_ok=True) tmp = pbf_file.with_suffix(".pbf.tmp") @@ -91,7 +86,12 @@ class POIHandler(osmium.SimpleHandler): self._batch.clear() def _add_poi( - self, osm_id: str, tags: osmium.osm.TagList, category: str, lat: float, lng: float + self, + osm_id: str, + tags: osmium.osm.TagList, + category: str, + lat: float, + lng: float, ) -> None: self._batch.append( { @@ -123,8 +123,12 @@ class POIHandler(osmium.SimpleHandler): def main() -> None: - parser = argparse.ArgumentParser(description="Download and extract POIs from OpenStreetMap") - parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + parser = argparse.ArgumentParser( + description="Download and extract POIs from OpenStreetMap" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) args = parser.parse_args() with tempfile.TemporaryDirectory() as cache_dir: diff --git a/pipeline/download/price_paid.py b/pipeline/download/price_paid.py index fac4ec1..56ebc4e 100644 --- a/pipeline/download/price_paid.py +++ b/pipeline/download/price_paid.py @@ -73,8 +73,12 @@ def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: def main() -> None: - parser = argparse.ArgumentParser(description="Download and convert Land Registry price-paid data") - parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + parser = argparse.ArgumentParser( + description="Download and convert Land Registry price-paid data" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) args = parser.parse_args() with tempfile.TemporaryDirectory() as cache_dir: diff --git a/pipeline/journey_times/config.py b/pipeline/journey_times/config.py index f2401cc..f86a422 100644 --- a/pipeline/journey_times/config.py +++ b/pipeline/journey_times/config.py @@ -1,11 +1,7 @@ """Configuration constants for journey times processing.""" -from pathlib import Path - from .models import Destination -DATA_DIR = Path("./data_sources") -OUTPUT_DIR = DATA_DIR / "processed" MAX_DELAY = 10 REQUESTS_PER_MIN = 500 diff --git a/pipeline/journey_times/tfl_client.py b/pipeline/journey_times/tfl_client.py index f214f7c..33fc794 100644 --- a/pipeline/journey_times/tfl_client.py +++ b/pipeline/journey_times/tfl_client.py @@ -99,9 +99,7 @@ async def fetch_journey_for_mode( journeys = data.get("journeys", []) if journeys: durations = [ - j["duration"] - for j in journeys - if j.get("duration") is not None + j["duration"] for j in journeys if j.get("duration") is not None ] if durations: return min(durations) diff --git a/pipeline/transform/join_epc_pp.py b/pipeline/transform/join_epc_pp.py index 8717dc4..741848a 100644 --- a/pipeline/transform/join_epc_pp.py +++ b/pipeline/transform/join_epc_pp.py @@ -9,79 +9,108 @@ pl.Config.set_tbl_cols(-1) def main(): parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data") - parser.add_argument("--epc", type=Path, required=True, help="EPC certificates CSV file") - parser.add_argument("--price-paid", type=Path, required=True, help="Price paid parquet file") - parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + parser.add_argument( + "--epc", type=Path, required=True, help="EPC certificates CSV file" + ) + parser.add_argument( + "--price-paid", type=Path, required=True, help="Price paid parquet file" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) args = parser.parse_args() - epc = pl.scan_csv(args.epc).select( - pl.col('ADDRESS').alias('epc_address'), - 'POSTCODE', - 'CURRENT_ENERGY_RATING', - 'POTENTIAL_ENERGY_RATING', - pl.col('PROPERTY_TYPE').alias('epc_property_type'), - 'BUILT_FORM', - 'INSPECTION_DATE', - 'TOTAL_FLOOR_AREA', - 'NUMBER_HABITABLE_ROOMS', - 'FLOOR_HEIGHT', - 'CONSTRUCTION_AGE_BAND' - ).filter(pl.col('epc_address').is_not_null()).sort('INSPECTION_DATE', descending=True).group_by('epc_address', 'POSTCODE').first() - + epc = ( + pl.scan_csv(args.epc) + .select( + pl.col("ADDRESS").alias("epc_address"), + "POSTCODE", + "CURRENT_ENERGY_RATING", + "POTENTIAL_ENERGY_RATING", + pl.col("PROPERTY_TYPE").alias("epc_property_type"), + "BUILT_FORM", + "INSPECTION_DATE", + "TOTAL_FLOOR_AREA", + "NUMBER_HABITABLE_ROOMS", + "FLOOR_HEIGHT", + "CONSTRUCTION_AGE_BAND", + ) + .filter(pl.col("epc_address").is_not_null()) + .sort("INSPECTION_DATE", descending=True) + .group_by("epc_address", "POSTCODE") + .first() + ) print("EPC dataset") print(epc.head().collect()) # https://www.gov.uk/guidance/about-the-price-paid-data - property_type_map = {"D": "Detached", "S": "Semi-Detached", "T": "Terraced", "F": "Flats/Maisonettes", "O": "Other"} + property_type_map = { + "D": "Detached", + "S": "Semi-Detached", + "T": "Terraced", + "F": "Flats/Maisonettes", + "O": "Other", + } duration_map = {"F": "Freehold", "L": "Leasehold"} - price_paid = (pl.scan_parquet(args.price_paid).select( - "price", - "date_of_transfer", - pl.col('property_type').alias("pp_property_type").replace(property_type_map), - "postcode", - 'paon', - 'saon', - 'street', - 'locality', - 'town_city', - pl.col('duration').replace(duration_map) - ) - .filter(pl.col('pp_property_type') != 'Other').with_columns( - pl.concat_str( - [pl.col('saon'), pl.col('paon'), pl.col('street')], - separator=' ', - ignore_nulls=True, - ).alias('pp_address'), + price_paid = ( + pl.scan_parquet(args.price_paid) + .select( + "price", + "date_of_transfer", + pl.col("property_type") + .alias("pp_property_type") + .replace(property_type_map), + "postcode", + "paon", + "saon", + "street", + "locality", + "town_city", + pl.col("duration").replace(duration_map), ) - .sort('date_of_transfer') - .group_by('pp_address', 'postcode', maintain_order=True) + .filter(pl.col("pp_property_type") != "Other") + .with_columns( + pl.concat_str( + [pl.col("saon"), pl.col("paon"), pl.col("street")], + separator=" ", + ignore_nulls=True, + ).alias("pp_address"), + ) + .sort("date_of_transfer") + .group_by("pp_address", "postcode", maintain_order=True) .agg( pl.struct( - pl.col('date_of_transfer').dt.year().alias('year'), - 'price', - ).alias('historical_prices'), - pl.col('pp_property_type').last(), - pl.col('duration').last(), - pl.col('price').last().alias('latest_price'), - pl.col('date_of_transfer').last(), + pl.col("date_of_transfer").dt.year().alias("year"), + "price", + ).alias("historical_prices"), + pl.col("pp_property_type").last(), + pl.col("duration").last(), + pl.col("price").last().alias("latest_price"), + pl.col("date_of_transfer").last(), ) - ).filter(pl.col('pp_address').is_not_null()) + ).filter(pl.col("pp_address").is_not_null()) print("Price paid dataset") print(price_paid.head().collect()) - joined = fuzzy_join_on_postcode( - left=price_paid, - right=epc, - left_address_col='pp_address', - right_address_col='epc_address', - left_postcode_col='postcode', - right_postcode_col='POSTCODE', - ).drop('POSTCODE').collect() + joined = ( + fuzzy_join_on_postcode( + left=price_paid, + right=epc, + left_address_col="pp_address", + right_address_col="epc_address", + left_postcode_col="postcode", + right_postcode_col="POSTCODE", + ) + .drop("POSTCODE") + .collect() + ) - matched = joined.filter(pl.col('epc_address').is_not_null() & pl.col('pp_address').is_not_null()) + matched = joined.filter( + pl.col("epc_address").is_not_null() & pl.col("pp_address").is_not_null() + ) total = joined.height print(f"Unique properties: {total}") print(f"Matched: {matched.height} ({100 * matched.height / total:.1f}%)") diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index 1ea4459..26b4076 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -24,7 +24,9 @@ def _build_wide( "lsoa21", ) wide = wide.join(arcgis, on="postcode", how="inner") - print(f" {wide.shape[0]:,} rows after GPS join, {wide.estimated_size('mb'):.1f} MB") + print( + f" {wide.shape[0]:,} rows after GPS join, {wide.estimated_size('mb'):.1f} MB" + ) # Journey times (optional) if journey_times_path and journey_times_path.exists(): @@ -42,9 +44,7 @@ def _build_wide( if iod_path and iod_path.exists(): print("Joining IoD scores...") iod = pl.read_parquet(iod_path) - wide = wide.join( - iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left" - ) + wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left") print(f" {wide.estimated_size('mb'):.1f} MB after IoD") # POI proximity counts (pre-computed per postcode) @@ -66,44 +66,68 @@ def _build_wide( ) # Derived columns - wide = wide.with_columns( - (pl.col("latest_price") / pl.col("total_floor_area")).alias("Price per sqm"), - ).drop( - 'date_of_transfer', - 'inspection_date', - 'floor_height', - 'lsoa21', - 'LSOA code (2021)', - 'Local Authority District code (2024)', - 'Local Authority District name (2024)', - 'imd_score', - 'housing_barriers_score', - 'idaci_score', - 'idaopi_score', - 'children_young_people_score', - 'adult_skills_score', - 'geographical_barriers_score', - 'wider_barriers_score', - ).rename({ - 'construction_age_band': "Approximate construction age", - "income_score": "Income Score (rate)", - "employment_score": "Employment Score (rate)", - "education_score": "Education, Skills and Training Score", - "health_score": "Health Deprivation and Disability Score", - "crime_score": "Crime Score", - }) + wide = ( + wide.with_columns( + (pl.col("latest_price") / pl.col("total_floor_area")).alias( + "Price per sqm" + ), + ) + .drop( + "date_of_transfer", + "inspection_date", + "floor_height", + "lsoa21", + "LSOA code (2021)", + "Local Authority District code (2024)", + "Local Authority District name (2024)", + "imd_score", + "housing_barriers_score", + "idaci_score", + "idaopi_score", + "children_young_people_score", + "adult_skills_score", + "geographical_barriers_score", + "wider_barriers_score", + ) + .rename( + { + "construction_age_band": "Approximate construction age", + "income_score": "Income Score (rate)", + "employment_score": "Employment Score (rate)", + "education_score": "Education, Skills and Training Score", + "health_score": "Health Deprivation and Disability Score", + "crime_score": "Crime Score", + } + ) + ) return wide def main(): - parser = argparse.ArgumentParser(description="Build wide property dataframe with all joins") - parser.add_argument("--epc-pp", type=Path, required=True, help="EPC-Price Paid joined parquet file") - parser.add_argument("--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file") - parser.add_argument("--iod", type=Path, help="Index of Deprivation parquet file (optional)") - parser.add_argument("--poi-proximity", type=Path, help="POI proximity counts parquet file (optional)") - parser.add_argument("--journey-times", type=Path, help="Journey times parquet file (optional)") - parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + parser = argparse.ArgumentParser( + description="Build wide property dataframe with all joins" + ) + parser.add_argument( + "--epc-pp", type=Path, required=True, help="EPC-Price Paid joined parquet file" + ) + parser.add_argument( + "--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file" + ) + parser.add_argument( + "--iod", type=Path, help="Index of Deprivation parquet file (optional)" + ) + parser.add_argument( + "--poi-proximity", + type=Path, + help="POI proximity counts parquet file (optional)", + ) + parser.add_argument( + "--journey-times", type=Path, help="Journey times parquet file (optional)" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) args = parser.parse_args() wide = _build_wide( @@ -119,7 +143,7 @@ def main(): wide.write_parquet(args.output) size_mb = args.output.stat().st_size / (1024 * 1024) - + print(f"Wrote {args.output} ({size_mb:.1f} MB)") diff --git a/pipeline/transform/transform_poi.py b/pipeline/transform/transform_poi.py index d9caeb5..5a4608b 100644 --- a/pipeline/transform/transform_poi.py +++ b/pipeline/transform/transform_poi.py @@ -584,9 +584,7 @@ def transform(input_path: Path) -> pl.LazyFrame: if cat not in DROP_CATEGORIES and cat not in CATEGORY_MAP: unmapped.append(cat) if unmapped: - raise ValueError( - f"Categories missing from CATEGORY_MAP: {sorted(unmapped)}" - ) + raise ValueError(f"Categories missing from CATEGORY_MAP: {sorted(unmapped)}") # Verify every CATEGORY_MAP key actually exists in the data (catch typos) mapped_but_absent = [] @@ -623,9 +621,15 @@ def transform(input_path: Path) -> pl.LazyFrame: def main(): - parser = argparse.ArgumentParser(description="Transform raw POIs to filtered version with friendly names") - parser.add_argument("--input", type=Path, required=True, help="Raw POIs parquet file") - parser.add_argument("--output", type=Path, required=True, help="Output filtered POIs parquet file") + parser = argparse.ArgumentParser( + description="Transform raw POIs to filtered version with friendly names" + ) + parser.add_argument( + "--input", type=Path, required=True, help="Raw POIs parquet file" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output filtered POIs parquet file" + ) args = parser.parse_args() df = transform(args.input).collect() diff --git a/pipeline/utils/__init__.py b/pipeline/utils/__init__.py index 8f435ac..cb7663a 100644 --- a/pipeline/utils/__init__.py +++ b/pipeline/utils/__init__.py @@ -2,4 +2,10 @@ from .fuzzy_join import fuzzy_join_on_postcode from .haversine import haversine_km, haversine_km_expr from .poi_counts import POI_GROUPS, count_pois_within_radius -__all__ = ["fuzzy_join_on_postcode", "haversine_km", "haversine_km_expr", "POI_GROUPS", "count_pois_within_radius"] +__all__ = [ + "fuzzy_join_on_postcode", + "haversine_km", + "haversine_km_expr", + "POI_GROUPS", + "count_pois_within_radius", +] diff --git a/pipeline/utils/fuzzy_join.py b/pipeline/utils/fuzzy_join.py index f516f22..4ddd602 100644 --- a/pipeline/utils/fuzzy_join.py +++ b/pipeline/utils/fuzzy_join.py @@ -9,14 +9,14 @@ import polars as pl from thefuzz import fuzz from tqdm import tqdm -_NUMBER_RE = re.compile(r'\d+') +_NUMBER_RE = re.compile(r"\d+") def _normalize(s: pl.Expr) -> pl.Expr: return ( s.str.to_uppercase() - .str.replace_all(r'[,.\-]', ' ') - .str.replace_all(r'\s+', ' ') + .str.replace_all(r"[,.\-]", " ") + .str.replace_all(r"\s+", " ") .str.strip_chars() ) @@ -40,22 +40,25 @@ def fuzzy_join_on_postcode( have null right columns. """ - tmpdir = tempfile.mkdtemp(prefix='fuzzy_join_') - left_path = Path(tmpdir) / 'left.parquet' - right_path = Path(tmpdir) / 'right.parquet' + tmpdir = tempfile.mkdtemp(prefix="fuzzy_join_") + left_path = Path(tmpdir) / "left.parquet" + right_path = Path(tmpdir) / "right.parquet" try: # Materialise each side exactly once, with a row index, to temp parquet. - left.with_row_index('_left_idx').sink_parquet(left_path) - right.with_row_index('_right_idx').sink_parquet(right_path) + left.with_row_index("_left_idx").sink_parquet(left_path) + right.with_row_index("_right_idx").sink_parquet(right_path) # Collect only the narrow columns needed for matching (projection pushdown). left_match = ( pl.scan_parquet(left_path) .select( - '_left_idx', - _normalize(pl.col(left_address_col)).alias('_left_address'), - pl.col(left_postcode_col).str.strip_chars().str.to_uppercase().alias('_left_postcode'), + "_left_idx", + _normalize(pl.col(left_address_col)).alias("_left_address"), + pl.col(left_postcode_col) + .str.strip_chars() + .str.to_uppercase() + .alias("_left_postcode"), ) .collect() ) @@ -63,18 +66,23 @@ def fuzzy_join_on_postcode( right_match = ( pl.scan_parquet(right_path) .select( - '_right_idx', - _normalize(pl.col(right_address_col)).alias('_right_address'), - pl.col(right_postcode_col).str.strip_chars().str.to_uppercase().alias('_right_postcode'), + "_right_idx", + _normalize(pl.col(right_address_col)).alias("_right_address"), + pl.col(right_postcode_col) + .str.strip_chars() + .str.to_uppercase() + .alias("_right_postcode"), ) - .unique(subset=['_right_address', '_right_postcode'], keep='first') + .unique(subset=["_right_address", "_right_postcode"], keep="first") .collect() ) # Group right side by postcode for fast lookup right_by_postcode: dict[str, list[tuple[int, str]]] = {} for idx, postcode, address in zip( - right_match['_right_idx'], right_match['_right_postcode'], right_match['_right_address'] + right_match["_right_idx"], + right_match["_right_postcode"], + right_match["_right_address"], ): if postcode is not None: right_by_postcode.setdefault(postcode, []).append((idx, address)) @@ -82,7 +90,9 @@ def fuzzy_join_on_postcode( # Group left side by postcode left_by_postcode: dict[str, list[tuple[int, str]]] = {} for idx, postcode, address in zip( - left_match['_left_idx'], left_match['_left_postcode'], left_match['_left_address'] + left_match["_left_idx"], + left_match["_left_postcode"], + left_match["_left_address"], ): if address is not None and postcode is not None: left_by_postcode.setdefault(postcode, []).append((idx, address)) @@ -103,7 +113,7 @@ def fuzzy_join_on_postcode( for pairs in tqdm( executor.map(_score_bucket, tasks, chunksize=64), total=len(tasks), - desc='Fuzzy matching', + desc="Fuzzy matching", ): all_pairs.extend(pairs) @@ -127,24 +137,27 @@ def fuzzy_join_on_postcode( # Build a small mapping LazyFrame and join back to the cached parquets. if matches: - mapping = pl.LazyFrame({ - '_left_idx': pl.Series([m[0] for m in matches], dtype=pl.UInt32), - '_right_idx': pl.Series([m[1] for m in matches], dtype=pl.UInt32), - }) + mapping = pl.LazyFrame( + { + "_left_idx": pl.Series([m[0] for m in matches], dtype=pl.UInt32), + "_right_idx": pl.Series([m[1] for m in matches], dtype=pl.UInt32), + } + ) else: - mapping = pl.LazyFrame({ - '_left_idx': pl.Series([], dtype=pl.UInt32), - '_right_idx': pl.Series([], dtype=pl.UInt32), - }) + mapping = pl.LazyFrame( + { + "_left_idx": pl.Series([], dtype=pl.UInt32), + "_right_idx": pl.Series([], dtype=pl.UInt32), + } + ) left_cached = pl.scan_parquet(left_path) right_cached = pl.scan_parquet(right_path) return ( - left_cached - .join(mapping, on='_left_idx', how='left') - .join(right_cached, on='_right_idx', how='left') - .drop('_left_idx', '_right_idx') + left_cached.join(mapping, on="_left_idx", how="left") + .join(right_cached, on="_right_idx", how="left") + .drop("_left_idx", "_right_idx") ) except BaseException: shutil.rmtree(tmpdir, ignore_errors=True) @@ -158,7 +171,9 @@ def _numbers_compatible(a: str, b: str) -> bool: """ nums_a = set(_NUMBER_RE.findall(a)) nums_b = set(_NUMBER_RE.findall(b)) - smaller, larger = (nums_a, nums_b) if len(nums_a) <= len(nums_b) else (nums_b, nums_a) + smaller, larger = ( + (nums_a, nums_b) if len(nums_a) <= len(nums_b) else (nums_b, nums_a) + ) if not smaller and larger: return False return smaller.issubset(larger) diff --git a/pipeline/utils/haversine.py b/pipeline/utils/haversine.py index ff4a4bf..96fc02a 100644 --- a/pipeline/utils/haversine.py +++ b/pipeline/utils/haversine.py @@ -6,7 +6,9 @@ import polars as pl _EARTH_RADIUS_KM = 6371.0 -def haversine_km(lat1: np.ndarray, lon1: np.ndarray, lat2: float, lon2: float) -> np.ndarray: +def haversine_km( + lat1: np.ndarray, lon1: np.ndarray, lat2: float, lon2: float +) -> np.ndarray: """Compute haversine distance in km between arrays (lat1, lon1) and a single point (lat2, lon2).""" lat1_rad = np.radians(lat1) lon1_rad = np.radians(lon1) @@ -14,7 +16,10 @@ def haversine_km(lat1: np.ndarray, lon1: np.ndarray, lat2: float, lon2: float) - lon2_rad = np.radians(lon2) dlat = lat2_rad - lat1_rad dlon = lon2_rad - lon1_rad - a = np.sin(dlat / 2) ** 2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2 + a = ( + np.sin(dlat / 2) ** 2 + + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2 + ) c = 2 * np.arcsin(np.sqrt(a)) return _EARTH_RADIUS_KM * c @@ -32,5 +37,7 @@ def haversine_km_expr( dlat = pl.lit(dest_lat_rad) - lat_rad dlon = pl.lit(dest_lon_rad) - lon_rad - a = (dlat / 2).sin() ** 2 + pl.lit(dest_lat_rad).cos() * lat_rad.cos() * (dlon / 2).sin() ** 2 + a = (dlat / 2).sin() ** 2 + pl.lit(dest_lat_rad).cos() * lat_rad.cos() * ( + dlon / 2 + ).sin() ** 2 return 2 * _EARTH_RADIUS_KM * a.sqrt().arcsin() diff --git a/pipeline/utils/poi_counts.py b/pipeline/utils/poi_counts.py index 6ab700c..b97df5e 100644 --- a/pipeline/utils/poi_counts.py +++ b/pipeline/utils/poi_counts.py @@ -70,7 +70,9 @@ def _count_pois_per_postcode( pc_codes = postcodes_df["postcode"].to_list() # Initialize result arrays - result_counts = {group: np.zeros(n_postcodes, dtype=np.int32) for group in POI_GROUPS} + result_counts = { + group: np.zeros(n_postcodes, dtype=np.int32) for group in POI_GROUPS + } # Process in batches with progress batch_size = 50000 @@ -83,7 +85,9 @@ def _count_pois_per_postcode( end_idx = min(start_idx + batch_size, n_postcodes) if batch_idx % 5 == 0: - print(f" Batch {batch_idx + 1}/{n_batches}: postcodes {start_idx:,} - {end_idx:,}") + print( + f" Batch {batch_idx + 1}/{n_batches}: postcodes {start_idx:,} - {end_idx:,}" + ) # Process batch for i in range(start_idx, end_idx): @@ -109,12 +113,7 @@ def _count_pois_per_postcode( nearby = np.concatenate(nearby_indices) # Vectorized distance calculation for all nearby POIs - distances = haversine_km( - poi_lats[nearby], - poi_lngs[nearby], - pc_lat, - pc_lon - ) + distances = haversine_km(poi_lats[nearby], poi_lngs[nearby], pc_lat, pc_lon) # Filter by radius within_mask = distances <= radius_km @@ -147,13 +146,13 @@ def count_pois_within_radius( """ # Get unique postcodes with coordinates print("Deduplicating postcodes...") - unique_postcodes = ( - properties - .select(["postcode", "lat", "lon"]) - .unique(subset=["postcode"]) + unique_postcodes = properties.select(["postcode", "lat", "lon"]).unique( + subset=["postcode"] ) - print(f" {len(properties):,} properties → {len(unique_postcodes):,} unique postcodes") + print( + f" {len(properties):,} properties → {len(unique_postcodes):,} unique postcodes" + ) # Count POIs per postcode postcode_counts = _count_pois_per_postcode(unique_postcodes, pois, radius_km) @@ -174,11 +173,7 @@ def count_pois_within_radius( result_lazy = ( properties.lazy() .select("postcode") - .join( - pl.scan_parquet(tmp_path), - on="postcode", - how="left" - ) + .join(pl.scan_parquet(tmp_path), on="postcode", how="left") .select(count_cols) .fill_null(0) ) diff --git a/pipeline/utils/test_fuzzy_join.py b/pipeline/utils/test_fuzzy_join.py index 93c573c..bd3f03b 100644 --- a/pipeline/utils/test_fuzzy_join.py +++ b/pipeline/utils/test_fuzzy_join.py @@ -41,6 +41,6 @@ result = fuzzy_join_on_postcode( snapshot = result.select("pp_address", "ADDRESS").sort("pp_address") -print('Testing the matching between EPC and PP addresses') +print("Testing the matching between EPC and PP addresses") with pl.Config(tbl_rows=-1, tbl_cols=-1, fmt_str_lengths=80): print(snapshot) diff --git a/pipeline/utils/test_haversine.py b/pipeline/utils/test_haversine.py index 639eba7..d1c46af 100644 --- a/pipeline/utils/test_haversine.py +++ b/pipeline/utils/test_haversine.py @@ -73,29 +73,39 @@ class TestHaversineKmExpr: def test_same_point(self): """Distance from a point to itself should be zero.""" df = pl.DataFrame({"lat": [51.5074], "lon": [-0.1278]}) - result = df.select(haversine_km_expr("lat", "lon", 51.5074, -0.1278).alias("dist")) + result = df.select( + haversine_km_expr("lat", "lon", 51.5074, -0.1278).alias("dist") + ) assert result["dist"][0] == pytest.approx(0.0, abs=1e-10) def test_known_distance_london_to_paris(self): """Test distance from London to Paris (~344 km).""" df = pl.DataFrame({"lat": [51.5074], "lon": [-0.1278]}) - result = df.select(haversine_km_expr("lat", "lon", 48.8566, 2.3522).alias("dist")) + result = df.select( + haversine_km_expr("lat", "lon", 48.8566, 2.3522).alias("dist") + ) assert result["dist"][0] == pytest.approx(344, rel=0.01) def test_known_distance_new_york_to_london(self): """Test distance from New York to London (~5570 km).""" df = pl.DataFrame({"lat": [40.7128], "lon": [-74.0060]}) - result = df.select(haversine_km_expr("lat", "lon", 51.5074, -0.1278).alias("dist")) + result = df.select( + haversine_km_expr("lat", "lon", 51.5074, -0.1278).alias("dist") + ) assert result["dist"][0] == pytest.approx(5570, rel=0.01) def test_multiple_points(self): """Test calculating distances from multiple points to a single destination.""" - df = pl.DataFrame({ - "lat": [51.5074, 48.8566, 40.7128], # London, Paris, NYC - "lon": [-0.1278, 2.3522, -74.0060], - }) + df = pl.DataFrame( + { + "lat": [51.5074, 48.8566, 40.7128], # London, Paris, NYC + "lon": [-0.1278, 2.3522, -74.0060], + } + ) # Distance to Edinburgh - result = df.select(haversine_km_expr("lat", "lon", 55.9533, -3.1883).alias("dist")) + result = df.select( + haversine_km_expr("lat", "lon", 55.9533, -3.1883).alias("dist") + ) dists = result["dist"].to_numpy() # All distances should be positive @@ -128,7 +138,9 @@ class TestHaversineConsistency: # Polars version df = pl.DataFrame({"lat": lats, "lon": lons}) - polars_result = df.select(haversine_km_expr("lat", "lon", dest_lat, dest_lon).alias("dist")) + polars_result = df.select( + haversine_km_expr("lat", "lon", dest_lat, dest_lon).alias("dist") + ) polars_dists = polars_result["dist"].to_numpy() # Should be identical (or at least very close due to floating point) diff --git a/pipeline/utils/test_poi_counts.py b/pipeline/utils/test_poi_counts.py index a7366f5..a5786b6 100644 --- a/pipeline/utils/test_poi_counts.py +++ b/pipeline/utils/test_poi_counts.py @@ -7,28 +7,32 @@ from pipeline.utils.poi_counts import POI_GROUPS, count_pois_within_radius @pytest.fixture def pois(): """POIs clustered around two locations: central London and 10km away.""" - return pl.DataFrame({ - "lat": [51.5074, 51.5075, 51.5080, 51.5076, 51.5073, 51.60], - "lng": [-0.1278, -0.1280, -0.1275, -0.1279, -0.1277, -0.20], - "category": [ - "Restaurant", - "Fast Food", - "Supermarket", - "Park", - "Station", - "Restaurant", # too far from any property - ], - }) + return pl.DataFrame( + { + "lat": [51.5074, 51.5075, 51.5080, 51.5076, 51.5073, 51.60], + "lng": [-0.1278, -0.1280, -0.1275, -0.1279, -0.1277, -0.20], + "category": [ + "Restaurant", + "Fast Food", + "Supermarket", + "Park", + "Station", + "Restaurant", # too far from any property + ], + } + ) @pytest.fixture def properties(): """Two properties at the same postcode near central London, one at a distant postcode.""" - return pl.DataFrame({ - "postcode": ["EC1A 1BB", "EC1A 1BB", "ZZ99 9ZZ"], - "lat": [51.5074, 51.5074, 55.0], - "lon": [-0.1278, -0.1278, -3.0], - }) + return pl.DataFrame( + { + "postcode": ["EC1A 1BB", "EC1A 1BB", "ZZ99 9ZZ"], + "lat": [51.5074, 51.5074, 55.0], + "lon": [-0.1278, -0.1278, -3.0], + } + ) def test_counts_pois_within_radius(properties, pois): @@ -41,9 +45,9 @@ def test_counts_pois_within_radius(properties, pois): assert len(series) == 3, f"{col} has {len(series)} rows, expected 3" # First two rows share a postcode near the central London cluster - assert result["restaurants_2km"][0] == 2 # Restaurant + Fast Food - assert result["groceries_2km"][0] == 1 # Supermarket - assert result["parks_2km"][0] == 1 # Park + assert result["restaurants_2km"][0] == 2 # Restaurant + Fast Food + assert result["groceries_2km"][0] == 1 # Supermarket + assert result["parks_2km"][0] == 1 # Park assert result["public_transport_2km"][0] == 1 # Station # Second row is the same postcode, so same counts @@ -55,11 +59,13 @@ def test_counts_pois_within_radius(properties, pois): def test_no_pois_returns_zeros(properties): - empty_pois = pl.DataFrame({ - "lat": pl.Series([], dtype=pl.Float64), - "lng": pl.Series([], dtype=pl.Float64), - "category": pl.Series([], dtype=pl.String), - }) + empty_pois = pl.DataFrame( + { + "lat": pl.Series([], dtype=pl.Float64), + "lng": pl.Series([], dtype=pl.Float64), + "category": pl.Series([], dtype=pl.String), + } + ) result = count_pois_within_radius(properties, empty_pois, radius_km=2.0) for group in POI_GROUPS: @@ -70,11 +76,13 @@ def test_no_pois_returns_zeros(properties): def test_custom_radius(pois): """A tiny radius should exclude POIs that are even slightly away.""" - properties = pl.DataFrame({ - "postcode": ["EC1A 1BB"], - "lat": [51.5074], - "lon": [-0.1278], - }) + properties = pl.DataFrame( + { + "postcode": ["EC1A 1BB"], + "lat": [51.5074], + "lon": [-0.1278], + } + ) # 0.01 km = 10m — only the POI at the exact same location should match result = count_pois_within_radius(properties, pois, radius_km=0.01)