From 38b0cf1ea110f6953ef5c8f7424059b586f3be8a Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 31 Jan 2026 12:48:29 +0000 Subject: [PATCH] Move transform logic around --- Taskfile.yml | 155 ++++- pipeline/base.py | 22 - pipeline/config.py | 12 - .../download/{pois/__main__.py => pois.py} | 39 +- pipeline/download/pois/__init__.py | 0 pipeline/download/pois/config.py | 32 - pipeline/epc_pp.py | 85 --- pipeline/run.py | 6 - pipeline/transform/join_epc_pp.py | 98 +++ pipeline/transform/merge.py | 127 ++++ pipeline/transform/poi_proximity.py | 42 ++ pipeline/transform/transform_poi.py | 644 ++++++++++++++++++ pipeline/utils/test_fuzzy_join.py | 4 +- pipeline/wide.py | 143 ---- 14 files changed, 1073 insertions(+), 336 deletions(-) delete mode 100644 pipeline/base.py delete mode 100644 pipeline/config.py rename pipeline/download/{pois/__main__.py => pois.py} (90%) delete mode 100644 pipeline/download/pois/__init__.py delete mode 100644 pipeline/download/pois/config.py delete mode 100644 pipeline/epc_pp.py delete mode 100644 pipeline/run.py create mode 100644 pipeline/transform/join_epc_pp.py create mode 100644 pipeline/transform/merge.py create mode 100644 pipeline/transform/poi_proximity.py create mode 100644 pipeline/transform/transform_poi.py delete mode 100644 pipeline/wide.py diff --git a/Taskfile.yml b/Taskfile.yml index 52a0dde..ceb4a47 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -1,38 +1,136 @@ version: '3' +vars: + DATA_DIR: data + ARCGIS_OUTPUT: "{{.DATA_DIR}}/arcgis_data.parquet" + PRICE_PAID_OUTPUT: "{{.DATA_DIR}}/price-paid-complete.parquet" + IOD_OUTPUT: "{{.DATA_DIR}}/IoD2025_Scores.parquet" + POIS_RAW_OUTPUT: "{{.DATA_DIR}}/uk_pois.parquet" + POIS_FILTERED_OUTPUT: "{{.DATA_DIR}}/filtered_uk_pois.parquet" + POI_PROXIMITY_OUTPUT: "{{.DATA_DIR}}/poi_proximity.parquet" + EPC_PP_OUTPUT: "{{.DATA_DIR}}/epc_pp.parquet" + WIDE_OUTPUT: "{{.DATA_DIR}}/wide.parquet" + EPC_CSV: "{{.DATA_DIR}}/epc/certificates.csv" + JOURNEY_TIMES: "{{.DATA_DIR}}/journey_times_bank_checkpoint.parquet" + tasks: install: - desc: Install dependencies, generate client, and download data + desc: Install dependencies cmds: - uv sync - cd frontend && npm install - download: - desc: Download data - deps: - - install + download:arcgis: + desc: Download and convert ArcGIS postcode data + sources: + - pipeline/download/arcgis.py + generates: + - "{{.ARCGIS_OUTPUT}}" cmds: - - uv run -m pipeline.download.arcgis - - uv run -m pipeline.download.pois - - uv run -m pipeline.download.deprivation_data - - uv run -m pipeline.download.price_paid + - uv run python -m pipeline.download.arcgis --output {{.ARCGIS_OUTPUT}} - pipeline: - desc: Run data processing pipeline - deps: - - download + download:price-paid: + desc: Download and convert Land Registry price-paid data + sources: + - pipeline/download/price_paid.py + generates: + - "{{.PRICE_PAID_OUTPUT}}" cmds: - - uv run python -m pipeline.run + - uv run python -m pipeline.download.price_paid --output {{.PRICE_PAID_OUTPUT}} + + download:deprivation: + desc: Download and convert Index of Deprivation data + sources: + - pipeline/download/deprivation_data.py + generates: + - "{{.IOD_OUTPUT}}" + cmds: + - uv run python -m pipeline.download.deprivation_data --output {{.IOD_OUTPUT}} + + download:pois: + desc: Download and extract POIs from OpenStreetMap + sources: + - pipeline/download/pois.py + generates: + - "{{.POIS_RAW_OUTPUT}}" + cmds: + - uv run python -m pipeline.download.pois --output {{.POIS_RAW_OUTPUT}} + + transform:pois: + desc: Transform raw POIs to filtered version with friendly names + deps: + - download:pois + sources: + - pipeline/transform/transform_poi.py + - "{{.POIS_RAW_OUTPUT}}" + generates: + - "{{.POIS_FILTERED_OUTPUT}}" + cmds: + - uv run python -m pipeline.transform.transform_poi --input {{.POIS_RAW_OUTPUT}} --output {{.POIS_FILTERED_OUTPUT}} + + transform:epc-pp: + desc: Fuzzy join EPC and Price Paid data + deps: + - download:price-paid + sources: + - pipeline/transform/join_epc_pp.py + - pipeline/utils/fuzzy_join.py + - "{{.PRICE_PAID_OUTPUT}}" + - "{{.EPC_CSV}}" + generates: + - "{{.EPC_PP_OUTPUT}}" + cmds: + - uv run python -m pipeline.transform.join_epc_pp --epc {{.EPC_CSV}} --price-paid {{.PRICE_PAID_OUTPUT}} --output {{.EPC_PP_OUTPUT}} + + transform:poi-proximity: + desc: Compute POI proximity counts per postcode + deps: + - download:arcgis + - transform:pois + sources: + - pipeline/transform/poi_proximity.py + - pipeline/utils/poi_counts.py + - "{{.ARCGIS_OUTPUT}}" + - "{{.POIS_FILTERED_OUTPUT}}" + generates: + - "{{.POI_PROXIMITY_OUTPUT}}" + cmds: + - uv run python -m pipeline.transform.poi_proximity --arcgis {{.ARCGIS_OUTPUT}} --pois {{.POIS_FILTERED_OUTPUT}} --output {{.POI_PROXIMITY_OUTPUT}} + + transform:wide: + desc: Build wide property dataframe with all joins + deps: + - join:epc-pp + - download:arcgis + - download:deprivation + - transform:poi-proximity + sources: + - pipeline/transform/merge.py + - "{{.EPC_PP_OUTPUT}}" + - "{{.ARCGIS_OUTPUT}}" + - "{{.IOD_OUTPUT}}" + - "{{.POI_PROXIMITY_OUTPUT}}" + generates: + - "{{.WIDE_OUTPUT}}" + cmds: + - uv run python -m pipeline.transform.merge --epc-pp {{.EPC_PP_OUTPUT}} --arcgis {{.ARCGIS_OUTPUT}} --iod {{.IOD_OUTPUT}} --poi-proximity {{.POI_PROXIMITY_OUTPUT}} --journey-times {{.JOURNEY_TIMES}} --output {{.WIDE_OUTPUT}} prepare: desc: Prepare the application (install, download data, run pipeline) deps: - - pipeline + - transform:wide + + test: + cmds: + - uv run -m pipeline.utils.test_fuzzy_join + - uv run pytest pipeline/utils/test_haversine.py + - uv run pytest pipeline/utils/test_poi_counts.py server: - desc: Run FastAPI backend on port 8001 + desc: Run Rust backend on port 8001 + dir: server-rs cmds: - - uv run fastapi dev server/main.py --port 8001 + - cargo run --release -- {{.WIDE_OUTPUT}} frontend: desc: Run frontend dev server on port 3030 (proxies /api to :8001) @@ -46,16 +144,13 @@ tasks: cmds: - npm run build - prod: - desc: Run production server (serves built frontend) - cmds: - - uv run fastapi run server/main.py --port 8001 lint: - desc: Lint all code (Python and TypeScript) + desc: Lint all code (Python, TypeScript, and Rust) cmds: - task: lint:python - task: lint:frontend + - task: lint:rust lint:python: desc: Lint Python code with ruff @@ -69,11 +164,19 @@ tasks: - npm run lint - npm run format:check + lint:rust: + desc: Lint Rust code with clippy and check formatting + dir: server-rs + cmds: + - cargo clippy -- -D warnings + - cargo fmt --check + format: - desc: Format all code (Python and TypeScript) + desc: Format all code (Python, TypeScript, and Rust) cmds: - task: format:python - task: format:frontend + - task: format:rust format:python: desc: Format Python code with ruff @@ -88,6 +191,12 @@ tasks: - npm run lint:fix - npm run format + format:rust: + desc: Format Rust code with cargo fmt + dir: server-rs + cmds: + - cargo fmt + check: desc: Run all checks (lint, typecheck, build) cmds: diff --git a/pipeline/base.py b/pipeline/base.py deleted file mode 100644 index 8394792..0000000 --- a/pipeline/base.py +++ /dev/null @@ -1,22 +0,0 @@ -from abc import ABC, abstractmethod -import polars as pl - - -class DataSource(ABC): - """Base class for all data sources.""" - - @property - @abstractmethod - def name(self) -> str: - """Unique identifier for this data source.""" - pass - - @abstractmethod - def load(self) -> pl.LazyFrame: - """Load raw data as LazyFrame.""" - pass - - @abstractmethod - def process(self, postcodes: pl.LazyFrame) -> pl.LazyFrame: - """Process and join with postcode coordinates.""" - pass diff --git a/pipeline/config.py b/pipeline/config.py deleted file mode 100644 index ca7ca24..0000000 --- a/pipeline/config.py +++ /dev/null @@ -1,12 +0,0 @@ -"""Shared configuration for the pipeline and server.""" - -from pathlib import Path - -DATA_DIR = Path(__file__).parent.parent / "data_sources" -PROCESSED_DIR = DATA_DIR / "processed" -AGGREGATES_DIR = PROCESSED_DIR / "aggregates" - -# H3 resolutions to generate and serve -# https://h3geo.org/docs/core-library/restable/#average-area-in-m2 -H3_RESOLUTIONS = [7, 8, 9, 10, 11] -DEFAULT_H3_RESOLUTION = 8 diff --git a/pipeline/download/pois/__main__.py b/pipeline/download/pois.py similarity index 90% rename from pipeline/download/pois/__main__.py rename to pipeline/download/pois.py index 7b06c06..2694fc2 100644 --- a/pipeline/download/pois/__main__.py +++ b/pipeline/download/pois.py @@ -8,17 +8,35 @@ import osmium import polars as pl from tqdm import tqdm -from .config import ( - BATCH_SIZE, - GEOFABRIK_GB_URL, - MIN_OCCURENCE_COUNT, - POI_TAG_KEYS, - UK_BBOX_EAST, - UK_BBOX_NORTH, - UK_BBOX_SOUTH, - UK_BBOX_WEST, +from pathlib import Path + + +BATCH_SIZE = 50_000 + +MIN_OCCURENCE_COUNT = 20 + +GEOFABRIK_GB_URL = ( + "https://download.geofabrik.de/europe/great-britain-latest.osm.pbf" ) +UK_BBOX_WEST = -7.57 +UK_BBOX_SOUTH = 49.96 +UK_BBOX_EAST = 1.68 +UK_BBOX_NORTH = 58.64 + +POI_TAG_KEYS: list[str] = [ + "amenity", + "building", + "craft", + "emergency", + "healthcare", + "leisure", + "office", + "shop", + "tourism", + "public_transport", +] + def download_pbf(pbf_file: Path) -> None: @@ -144,10 +162,9 @@ def main() -> None: ) df = df.join(valid_categories.select("category"), on="category", how="semi") - args.output.parent.mkdir(parents=True, exist_ok=True) + print(f"Total POIs: {handler.poi_count:,}") df.sink_parquet(args.output) print(f"Saved to {args.output}") - print(f"Total POIs: {handler.poi_count:,}") if __name__ == "__main__": diff --git a/pipeline/download/pois/__init__.py b/pipeline/download/pois/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipeline/download/pois/config.py b/pipeline/download/pois/config.py deleted file mode 100644 index eb677a6..0000000 --- a/pipeline/download/pois/config.py +++ /dev/null @@ -1,32 +0,0 @@ -from pathlib import Path - -DATA_DIR = Path("./data_sources") -GB_PBF_FILE = DATA_DIR / "great-britain-latest.osm.pbf" -OUTPUT_FILE = DATA_DIR / "uk_pois.parquet" - - -BATCH_SIZE = 50_000 - -MIN_OCCURENCE_COUNT = 20 - -GEOFABRIK_GB_URL = ( - "https://download.geofabrik.de/europe/great-britain-latest.osm.pbf" -) - -UK_BBOX_WEST = -7.57 -UK_BBOX_SOUTH = 49.96 -UK_BBOX_EAST = 1.68 -UK_BBOX_NORTH = 58.64 - -POI_TAG_KEYS: list[str] = [ - "amenity", - "building", - "craft", - "emergency", - "healthcare", - "leisure", - "office", - "shop", - "tourism", - "public_transport", -] diff --git a/pipeline/epc_pp.py b/pipeline/epc_pp.py deleted file mode 100644 index 9d7d480..0000000 --- a/pipeline/epc_pp.py +++ /dev/null @@ -1,85 +0,0 @@ -import polars as pl -from .fuzzy_join import fuzzy_join_on_postcode - - -pl.Config.set_tbl_cols(-1) - - - -epc = pl.scan_csv('data_sources/epc/certificates.csv').select( - pl.col('ADDRESS').alias('epc_address'), - 'POSTCODE', - 'CURRENT_ENERGY_RATING', - 'POTENTIAL_ENERGY_RATING', - pl.col('PROPERTY_TYPE').alias('epc_property_type'), - 'BUILT_FORM', - 'INSPECTION_DATE', - 'TOTAL_FLOOR_AREA', - 'NUMBER_HABITABLE_ROOMS', - 'FLOOR_HEIGHT', - 'CONSTRUCTION_AGE_BAND' -).filter(pl.col('epc_address').is_not_null()).sort('INSPECTION_DATE', descending=True).group_by('epc_address', 'POSTCODE').first() - - -print("EPC dataset") -print(epc.head().collect()) - -# https://www.gov.uk/guidance/about-the-price-paid-data -property_type_map = {"D": "Detached", "S": "Semi-Detached", "T": "Terraced", "F": "Flats/Maisonettes", "O": "Other"} -duration_map = {"F": "Freehold", "L": "Leasehold"} - -price_paid = (pl.scan_parquet('data_sources/pp-complete.parquet').select( - "price", - "date_of_transfer", - pl.col('property_type').alias("pp_property_type").replace(property_type_map), - "postcode", - 'paon', - 'saon', - 'street', - 'locality', - 'town_city', - pl.col('duration').replace(duration_map) -) -.filter(pl.col('pp_property_type') != 'Other').with_columns( - pl.concat_str( - [pl.col('saon'), pl.col('paon'), pl.col('street')], - separator=' ', - ignore_nulls=True, - ).alias('pp_address'), - ) - .sort('date_of_transfer') - .group_by('pp_address', 'postcode', maintain_order=True) - .agg( - pl.struct( - pl.col('date_of_transfer').dt.year().alias('year'), - 'price', - ).alias('historical_prices'), - pl.col('pp_property_type').last(), - pl.col('duration').last(), - pl.col('price').last().alias('latest_price'), - pl.col('date_of_transfer').last(), - ) -).filter(pl.col('pp_address').is_not_null()) - -print("Price paid dataset") -print(price_paid.head().collect()) - -joined = fuzzy_join_on_postcode( - left=price_paid, - right=epc, - left_address_col='pp_address', - right_address_col='epc_address', - left_postcode_col='postcode', - right_postcode_col='POSTCODE', -).drop('POSTCODE').collect() - -matched = joined.filter(pl.col('epc_address').is_not_null() & pl.col('pp_address').is_not_null()) -total = joined.height -print(f"Unique properties: {total}") -print(f"Matched: {matched.height} ({100 * matched.height / total:.1f}%)") -print(f"Unmatched: {total - matched.height}") - -matched = matched.rename({col: col.lower() for col in joined.columns}) - -print(matched.head()) -matched.write_parquet('data_sources/processed/epc_pp.parquet') diff --git a/pipeline/run.py b/pipeline/run.py deleted file mode 100644 index 6a03ed6..0000000 --- a/pipeline/run.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Pipeline CLI to process property data with H3 spatial indexing.""" - -from pipeline.wide import run - -if __name__ == "__main__": - run() diff --git a/pipeline/transform/join_epc_pp.py b/pipeline/transform/join_epc_pp.py new file mode 100644 index 0000000..8717dc4 --- /dev/null +++ b/pipeline/transform/join_epc_pp.py @@ -0,0 +1,98 @@ +import argparse +import polars as pl +from pathlib import Path +from ..utils import fuzzy_join_on_postcode + + +pl.Config.set_tbl_cols(-1) + + +def main(): + parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data") + parser.add_argument("--epc", type=Path, required=True, help="EPC certificates CSV file") + parser.add_argument("--price-paid", type=Path, required=True, help="Price paid parquet file") + parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + args = parser.parse_args() + + epc = pl.scan_csv(args.epc).select( + pl.col('ADDRESS').alias('epc_address'), + 'POSTCODE', + 'CURRENT_ENERGY_RATING', + 'POTENTIAL_ENERGY_RATING', + pl.col('PROPERTY_TYPE').alias('epc_property_type'), + 'BUILT_FORM', + 'INSPECTION_DATE', + 'TOTAL_FLOOR_AREA', + 'NUMBER_HABITABLE_ROOMS', + 'FLOOR_HEIGHT', + 'CONSTRUCTION_AGE_BAND' + ).filter(pl.col('epc_address').is_not_null()).sort('INSPECTION_DATE', descending=True).group_by('epc_address', 'POSTCODE').first() + + + print("EPC dataset") + print(epc.head().collect()) + + # https://www.gov.uk/guidance/about-the-price-paid-data + property_type_map = {"D": "Detached", "S": "Semi-Detached", "T": "Terraced", "F": "Flats/Maisonettes", "O": "Other"} + duration_map = {"F": "Freehold", "L": "Leasehold"} + + price_paid = (pl.scan_parquet(args.price_paid).select( + "price", + "date_of_transfer", + pl.col('property_type').alias("pp_property_type").replace(property_type_map), + "postcode", + 'paon', + 'saon', + 'street', + 'locality', + 'town_city', + pl.col('duration').replace(duration_map) + ) + .filter(pl.col('pp_property_type') != 'Other').with_columns( + pl.concat_str( + [pl.col('saon'), pl.col('paon'), pl.col('street')], + separator=' ', + ignore_nulls=True, + ).alias('pp_address'), + ) + .sort('date_of_transfer') + .group_by('pp_address', 'postcode', maintain_order=True) + .agg( + pl.struct( + pl.col('date_of_transfer').dt.year().alias('year'), + 'price', + ).alias('historical_prices'), + pl.col('pp_property_type').last(), + pl.col('duration').last(), + pl.col('price').last().alias('latest_price'), + pl.col('date_of_transfer').last(), + ) + ).filter(pl.col('pp_address').is_not_null()) + + print("Price paid dataset") + print(price_paid.head().collect()) + + joined = fuzzy_join_on_postcode( + left=price_paid, + right=epc, + left_address_col='pp_address', + right_address_col='epc_address', + left_postcode_col='postcode', + right_postcode_col='POSTCODE', + ).drop('POSTCODE').collect() + + matched = joined.filter(pl.col('epc_address').is_not_null() & pl.col('pp_address').is_not_null()) + total = joined.height + print(f"Unique properties: {total}") + print(f"Matched: {matched.height} ({100 * matched.height / total:.1f}%)") + print(f"Unmatched: {total - matched.height}") + + matched = matched.rename({col: col.lower() for col in joined.columns}) + + print(matched.head()) + matched.write_parquet(args.output) + print(f"Wrote {args.output}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py new file mode 100644 index 0000000..1ea4459 --- /dev/null +++ b/pipeline/transform/merge.py @@ -0,0 +1,127 @@ +import argparse +import polars as pl +from pathlib import Path + + +def _build_wide( + epc_pp_path: Path, + arcgis_path: Path, + iod_path: Path | None, + poi_proximity_path: Path | None, + journey_times_path: Path | None, +) -> pl.DataFrame: + """Build the wide dataframe by joining epc_pp with all auxiliary data.""" + print("Loading epc_pp...") + wide = pl.read_parquet(epc_pp_path) + print(f" {wide.shape[0]:,} rows, {wide.estimated_size('mb'):.1f} MB") + + # GPS coordinates + LSOA from ArcGIS + print("Joining GPS coordinates...") + arcgis = pl.read_parquet(arcgis_path).select( + pl.col("pcds").alias("postcode"), + "lat", + pl.col("long").alias("lon"), + "lsoa21", + ) + wide = wide.join(arcgis, on="postcode", how="inner") + print(f" {wide.shape[0]:,} rows after GPS join, {wide.estimated_size('mb'):.1f} MB") + + # Journey times (optional) + if journey_times_path and journey_times_path.exists(): + print("Joining journey times...") + journey_times = pl.read_parquet(journey_times_path).select( + "postcode", + "public_transport_easy_minutes", + "public_transport_quick_minutes", + "cycling_minutes", + ) + wide = wide.join(journey_times, on="postcode", how="left") + print(f" {wide.estimated_size('mb'):.1f} MB after journey times") + + # Index of Deprivation + if iod_path and iod_path.exists(): + print("Joining IoD scores...") + iod = pl.read_parquet(iod_path) + wide = wide.join( + iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left" + ) + print(f" {wide.estimated_size('mb'):.1f} MB after IoD") + + # POI proximity counts (pre-computed per postcode) + if poi_proximity_path and poi_proximity_path.exists(): + print("Joining POI proximity counts...") + poi_counts = pl.read_parquet(poi_proximity_path) + wide = wide.join(poi_counts, on="postcode", how="left") + print(f" {wide.estimated_size('mb'):.1f} MB after POI counts") + + # Convert construction_age_band to numeric year + if "construction_age_band" in wide.columns: + wide = wide.with_columns( + pl.col("construction_age_band") + .str.replace("England and Wales: ", "") + .str.replace(" onwards", "") + .str.extract(r"(\d{4})", 1) + .cast(pl.UInt16, strict=False) + .alias("construction_age_band"), + ) + + # Derived columns + wide = wide.with_columns( + (pl.col("latest_price") / pl.col("total_floor_area")).alias("Price per sqm"), + ).drop( + 'date_of_transfer', + 'inspection_date', + 'floor_height', + 'lsoa21', + 'LSOA code (2021)', + 'Local Authority District code (2024)', + 'Local Authority District name (2024)', + 'imd_score', + 'housing_barriers_score', + 'idaci_score', + 'idaopi_score', + 'children_young_people_score', + 'adult_skills_score', + 'geographical_barriers_score', + 'wider_barriers_score', + ).rename({ + 'construction_age_band': "Approximate construction age", + "income_score": "Income Score (rate)", + "employment_score": "Employment Score (rate)", + "education_score": "Education, Skills and Training Score", + "health_score": "Health Deprivation and Disability Score", + "crime_score": "Crime Score", + }) + + return wide + + +def main(): + parser = argparse.ArgumentParser(description="Build wide property dataframe with all joins") + parser.add_argument("--epc-pp", type=Path, required=True, help="EPC-Price Paid joined parquet file") + parser.add_argument("--arcgis", type=Path, required=True, help="ArcGIS postcode data parquet file") + parser.add_argument("--iod", type=Path, help="Index of Deprivation parquet file (optional)") + parser.add_argument("--poi-proximity", type=Path, help="POI proximity counts parquet file (optional)") + parser.add_argument("--journey-times", type=Path, help="Journey times parquet file (optional)") + parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + args = parser.parse_args() + + wide = _build_wide( + epc_pp_path=args.epc_pp, + arcgis_path=args.arcgis, + iod_path=args.iod, + poi_proximity_path=args.poi_proximity, + journey_times_path=args.journey_times, + ) + + print(f"Columns: {wide.columns}") + print(f"Rows: {wide.height}") + + wide.write_parquet(args.output) + size_mb = args.output.stat().st_size / (1024 * 1024) + + print(f"Wrote {args.output} ({size_mb:.1f} MB)") + + +if __name__ == "__main__": + main() diff --git a/pipeline/transform/poi_proximity.py b/pipeline/transform/poi_proximity.py new file mode 100644 index 0000000..dce09cc --- /dev/null +++ b/pipeline/transform/poi_proximity.py @@ -0,0 +1,42 @@ +"""Compute POI proximity counts per postcode from ArcGIS + filtered POIs.""" + +import argparse +from pathlib import Path + +import polars as pl + +from pipeline.utils.poi_counts import _count_pois_per_postcode + + +def main(): + parser = argparse.ArgumentParser( + description="Count POIs within radius per postcode" + ) + parser.add_argument( + "--arcgis", type=Path, required=True, help="ArcGIS postcode parquet" + ) + parser.add_argument( + "--pois", type=Path, required=True, help="Filtered POIs parquet" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet path" + ) + args = parser.parse_args() + + postcodes = pl.read_parquet(args.arcgis).select( + pl.col("pcds").alias("postcode"), + "lat", + pl.col("long").alias("lon"), + ) + + pois = pl.read_parquet(args.pois) + + result = _count_pois_per_postcode(postcodes, pois, radius_km=2) + + result.write_parquet(args.output) + size_mb = args.output.stat().st_size / (1024 * 1024) + print(f"Wrote {args.output} ({size_mb:.1f} MB)") + + +if __name__ == "__main__": + main() diff --git a/pipeline/transform/transform_poi.py b/pipeline/transform/transform_poi.py new file mode 100644 index 0000000..d9caeb5 --- /dev/null +++ b/pipeline/transform/transform_poi.py @@ -0,0 +1,644 @@ +import argparse +from pathlib import Path + +import polars as pl + + +DROP_CATEGORIES = { + "amenity/advice", + "amenity/atm", + "amenity/bbq", + "amenity/bench", + "amenity/bicycle_parking", + "amenity/clock", + "amenity/fixme", + "amenity/grit_bin", + "amenity/hunting_stand", + "amenity/motorcycle_parking", + "amenity/notice_board", + "amenity/parking", + "amenity/parking_entrance", + "amenity/parking_space", + "amenity/post_box", + "amenity/telephone", + "amenity/toilets", + "amenity/vacuum_cleaner", + "amenity/waste_basket", + "building/air_shaft", + "building/apartments", + "building/detached", + "building/entrance", + "building/entry", + "building/garage", + "building/garages", + "building/house", + "building/hut", + "building/no", + "building/office", + "building/public", + "building/residential", + "building/roof", + "building/shed", + "building/terrace", + "building/yes", + "emergency/access_point", + "emergency/ambulance_station", + "emergency/assembly_point", + "emergency/bleed_control_kit", + "emergency/defibrillator", + "emergency/designated", + "emergency/dry_riser_inlet", + "emergency/emergency_ward_entrance", + "emergency/fire_alarm_box", + "emergency/fire_extinguisher", + "emergency/fire_hydrant", + "emergency/fire_service_inlet", + "emergency/first_aid_kit", + "emergency/life_ring", + "emergency/lifeguard", + "emergency/no", + "emergency/phone", + "emergency/rescue_equipment", + "emergency/siren", + "emergency/throw_bag", + "emergency/water_rescue", + "emergency/yes", + "leisure/firepit", + "leisure/fishing", + "leisure/picnic_table", + "office/company", + "office/yes", + "tourism/apartment", + "tourism/apartments", + "tourism/camp_pitch", + "tourism/information", + "tourism/village_sign", + "tourism/yes", +} + +# (friendly_name, emoji) for every category we keep +CATEGORY_MAP: dict[str, tuple[str, str]] = { + # amenity + "amenity/animal_boarding": ("Animal Boarding", "๐Ÿพ"), + "amenity/animal_breeding": ("Animal Breeding", "๐Ÿฃ"), + "amenity/animal_shelter": ("Animal Shelter", "๐Ÿ "), + "amenity/arts_centre": ("Arts Centre", "๐ŸŽจ"), + "amenity/bank": ("Bank", "๐Ÿฆ"), + "amenity/bar": ("Bar", "๐Ÿธ"), + "amenity/bicycle_rental": ("Bike Rental", "๐Ÿšฒ"), + "amenity/bicycle_repair_station": ("Bike Repair", "๐Ÿ”ง"), + "amenity/binoculars": ("Public Binoculars", "๐Ÿ”ญ"), + "amenity/boat_rental": ("Boat Rental", "โ›ต"), + "amenity/boat_storage": ("Boat Storage", "๐Ÿšข"), + "amenity/boot_scraper": ("Boot Scraper", "๐Ÿฅพ"), + "amenity/bureau_de_change": ("Currency Exchange", "๐Ÿ’ฑ"), + "amenity/bus_station": ("Bus Station", "๐ŸšŒ"), + "amenity/cafe": ("Cafรฉ", "โ˜•"), + "amenity/car_rental": ("Car Rental", "๐Ÿš—"), + "amenity/car_sharing": ("Car Sharing", "๐Ÿš™"), + "amenity/car_wash": ("Car Wash", "๐Ÿงฝ"), + "amenity/care_home": ("Care Home", "๐Ÿฅ"), + "amenity/casino": ("Casino", "๐ŸŽฐ"), + "amenity/charging_station": ("EV Charging", "๐Ÿ”Œ"), + "amenity/check_in": ("Check-In Point", "โœ…"), + "amenity/childcare": ("Childcare", "๐Ÿ‘ถ"), + "amenity/cinema": ("Cinema", "๐ŸŽฌ"), + "amenity/clinic": ("Clinic", "๐Ÿฉบ"), + "amenity/club": ("Club", "๐Ÿ›๏ธ"), + "amenity/college": ("College", "๐ŸŽ“"), + "amenity/community_centre": ("Community Centre", "๐Ÿค"), + "amenity/compressed_air": ("Compressed Air", "๐Ÿ’จ"), + "amenity/conference_centre": ("Conference Centre", "๐Ÿ“‹"), + "amenity/courthouse": ("Courthouse", "โš–๏ธ"), + "amenity/coworking_space": ("Co-working Space", "๐Ÿ’ป"), + "amenity/crematorium": ("Crematorium", "๐Ÿ•ฏ๏ธ"), + "amenity/dancing_school": ("Dance School", "๐Ÿ’ƒ"), + "amenity/dentist": ("Dentist", "๐Ÿฆท"), + "amenity/doctors": ("Doctor", "๐Ÿ‘จโ€โš•๏ธ"), + "amenity/dojo": ("Dojo", "๐Ÿฅ‹"), + "amenity/donation_box": ("Donation Box", "๐Ÿ“ฆ"), + "amenity/dressing_room": ("Dressing Room", "๐Ÿ‘—"), + "amenity/drinking_water": ("Drinking Water", "๐Ÿšฐ"), + "amenity/driving_school": ("Driving School", "๐Ÿšฆ"), + "amenity/escooter_rental": ("E-Scooter Rental", "๐Ÿ›ด"), + "amenity/events_venue": ("Events Venue", "๐ŸŽช"), + "amenity/fast_food": ("Fast Food", "๐Ÿ”"), + "amenity/feeding_place": ("Feeding Place", "๐Ÿฝ๏ธ"), + "amenity/ferry_terminal": ("Ferry Terminal", "โ›ด๏ธ"), + "amenity/fire_station": ("Fire Station", "๐Ÿš’"), + "amenity/food_court": ("Food Court", "๐Ÿด"), + "amenity/fountain": ("Fountain", "โ›ฒ"), + "amenity/fuel": ("Fuel Station", "โ›ฝ"), + "amenity/gambling": ("Gambling", "๐ŸŽฒ"), + "amenity/grave_yard": ("Graveyard", "๐Ÿชฆ"), + "amenity/hall": ("Hall", "๐Ÿ›๏ธ"), + "amenity/hookah_lounge": ("Hookah Lounge", "๐Ÿ’จ"), + "amenity/hospital": ("Hospital", "๐Ÿฅ"), + "amenity/ice_cream": ("Ice Cream", "๐Ÿฆ"), + "amenity/internet_cafe": ("Internet Cafรฉ", "๐ŸŒ"), + "amenity/kick-scooter_rental": ("Kick Scooter Rental", "๐Ÿ›ด"), + "amenity/kindergarten": ("Kindergarten", "๐Ÿ’’"), + "amenity/language_school": ("Language School", "๐Ÿ—ฃ๏ธ"), + "amenity/letter_box": ("Letter Box", "๐Ÿ“ฎ"), + "amenity/library": ("Library", "๐Ÿ“š"), + "amenity/loading_dock": ("Loading Dock", "๐Ÿ“ฅ"), + "amenity/lounge": ("Lounge", "๐Ÿ›‹๏ธ"), + "amenity/lounger": ("Public Lounger", "๐Ÿช‘"), + "amenity/marketplace": ("Market", "๐Ÿ›’"), + "amenity/money_transfer": ("Money Transfer", "๐Ÿ’ธ"), + "amenity/mounting_block": ("Mounting Block", "๐Ÿด"), + "amenity/music_school": ("Music School", "๐ŸŽต"), + "amenity/music_venue": ("Music Venue", "๐ŸŽถ"), + "amenity/nightclub": ("Nightclub", "๐Ÿชฉ"), + "amenity/nursing_home": ("Nursing Home", "๐Ÿ "), + "amenity/parcel_locker": ("Parcel Locker", "๐Ÿ“ฆ"), + "amenity/payment_terminal": ("Payment Terminal", "๐Ÿ’ณ"), + "amenity/pharmacy": ("Pharmacy", "๐Ÿ’Š"), + "amenity/photo_booth": ("Photo Booth", "๐Ÿ“ธ"), + "amenity/piano": ("Public Piano", "๐ŸŽน"), + "amenity/place_of_worship": ("Place of Worship", "โ›ช"), + "amenity/police": ("Police Station", "๐Ÿš”"), + "amenity/post_depot": ("Post Depot", "๐Ÿ“ฌ"), + "amenity/post_office": ("Post Office", "๐Ÿค"), + "amenity/prep_school": ("Prep School", "๐Ÿ“–"), + "amenity/pub": ("Pub", "๐Ÿบ"), + "amenity/public_bookcase": ("Public Bookcase", "๐Ÿ“•"), + "amenity/public_building": ("Public Building", "๐Ÿข"), + "amenity/reception_desk": ("Reception Desk", "๐Ÿ›Ž๏ธ"), + "amenity/recycling": ("Recycling", "โ™ป๏ธ"), + "amenity/restaurant": ("Restaurant", "๐Ÿฝ๏ธ"), + "amenity/sanitary_dump_station": ("Sanitary Dump Station", "๐Ÿšฟ"), + "amenity/school": ("School", "๐Ÿซ"), + "amenity/scout_hut": ("Scout Hut", "โšœ๏ธ"), + "amenity/shelter": ("Shelter", "๐Ÿ›–"), + "amenity/shower": ("Public Shower", "๐Ÿšฟ"), + "amenity/smoking_area": ("Smoking Area", "๐Ÿšฌ"), + "amenity/social_centre": ("Social Centre", "๐Ÿ˜๏ธ"), + "amenity/social_club": ("Social Club", "๐Ÿค"), + "amenity/social_facility": ("Social Facility", "๐Ÿซ‚"), + "amenity/stripclub": ("Strip Club", "๐Ÿ”ž"), + "amenity/studio": ("Studio", "๐ŸŽ™๏ธ"), + "amenity/table": ("Public Table", "๐Ÿช‘"), + "amenity/taxi": ("Taxi Stand", "๐Ÿš•"), + "amenity/telescope": ("Public Telescope", "๐Ÿ”ญ"), + "amenity/theatre": ("Theatre", "๐ŸŽญ"), + "amenity/ticket_validator": ("Ticket Validator", "๐ŸŽซ"), + "amenity/townhall": ("Town Hall", "๐Ÿ›๏ธ"), + "amenity/training": ("Training Centre", "๐Ÿ“"), + "amenity/trolley_bay": ("Trolley Bay", "๐Ÿ›’"), + "amenity/university": ("University", "๐Ÿซ"), + "amenity/vehicle_inspection": ("Vehicle Inspection", "๐Ÿ”"), + "amenity/vending_machine": ("Vending Machine", "๐Ÿง"), + "amenity/veterinary": ("Vet", "๐Ÿ•"), + "amenity/washing_machine": ("Washing Machine", "๐Ÿงบ"), + "amenity/washingline": ("Washing Line", "๐Ÿ‘•"), + "amenity/waste_disposal": ("Waste Disposal", "๐Ÿ—‘๏ธ"), + "amenity/waste_transfer_station": ("Waste Transfer Station", "๐Ÿš›"), + "amenity/water_point": ("Water Point", "๐Ÿ’ง"), + "amenity/watering_place": ("Watering Place", "๐Ÿšฐ"), + "amenity/weighbridge": ("Weighbridge", "โš–๏ธ"), + # building + "building/barn": ("Barn", "๐Ÿš๏ธ"), + "building/bunker": ("Bunker", "๐Ÿ—๏ธ"), + "building/chapel": ("Chapel", "โ›ช"), + "building/church": ("Church", "โ›ช"), + "building/commercial": ("Commercial Building", "๐Ÿฌ"), + "building/construction": ("Construction Site", "๐Ÿšง"), + "building/farm": ("Farmhouse", "๐ŸŒพ"), + "building/greenhouse": ("Greenhouse", "๐ŸŒฟ"), + "building/industrial": ("Industrial Building", "๐Ÿญ"), + "building/kiosk": ("Kiosk", "๐Ÿช"), + "building/retail": ("Retail Building", "๐Ÿฌ"), + "building/ruins": ("Ruins", "๐Ÿš๏ธ"), + "building/school": ("School Building", "๐Ÿซ"), + "building/semidetached_house": ("Semi-Detached House", "๐Ÿ "), + "building/service": ("Service Building", "๐Ÿ”ง"), + "building/university": ("University Building", "๐ŸŽ“"), + "building/warehouse": ("Warehouse", "๐Ÿญ"), + # craft + "craft/agricultural_engines": ("Agricultural Engines", "๐Ÿšœ"), + "craft/atelier": ("Atelier", "๐ŸŽจ"), + "craft/blacksmith": ("Blacksmith", "๐Ÿ”จ"), + "craft/bookbinder": ("Bookbinder", "๐Ÿ“–"), + "craft/brewery": ("Brewery", "๐Ÿบ"), + "craft/builder": ("Builder", "๐Ÿงฑ"), + "craft/carpenter": ("Carpenter", "๐Ÿชš"), + "craft/caterer": ("Caterer", "๐Ÿฑ"), + "craft/cleaning": ("Cleaning Service", "๐Ÿงน"), + "craft/confectionery": ("Confectioner", "๐Ÿฌ"), + "craft/distillery": ("Distillery", "๐Ÿฅƒ"), + "craft/dressmaker": ("Dressmaker", "๐Ÿ‘—"), + "craft/electrician": ("Electrician", "โšก"), + "craft/electronics_repair": ("Electronics Repair", "๐Ÿ”Œ"), + "craft/floorer": ("Flooring Specialist", "๐Ÿชต"), + "craft/gardener": ("Gardener", "๐ŸŒฑ"), + "craft/glaziery": ("Glazier", "๐ŸชŸ"), + "craft/handicraft": ("Handicraft", "โœ‚๏ธ"), + "craft/hvac": ("HVAC", "โ„๏ธ"), + "craft/jeweller": ("Jeweller", "๐Ÿ’Ž"), + "craft/joiner": ("Joiner", "๐Ÿชš"), + "craft/key_cutter": ("Key Cutter", "๐Ÿ”‘"), + "craft/locksmith": ("Locksmith", "๐Ÿ”"), + "craft/metal_construction": ("Metal Fabrication", "๐Ÿ”ฉ"), + "craft/painter": ("Painter & Decorator", "๐Ÿ–Œ๏ธ"), + "craft/photographer": ("Photographer", "๐Ÿ“ท"), + "craft/photographic_laboratory": ("Photo Lab", "๐Ÿ–ผ๏ธ"), + "craft/plumber": ("Plumber", "๐Ÿ”ง"), + "craft/pottery": ("Pottery", "๐Ÿบ"), + "craft/printer": ("Printer", "๐Ÿ–จ๏ธ"), + "craft/roofer": ("Roofer", "๐Ÿ "), + "craft/sawmill": ("Sawmill", "๐Ÿชต"), + "craft/scaffolder": ("Scaffolder", "๐Ÿ—๏ธ"), + "craft/sculptor": ("Sculptor", "๐Ÿ—ฟ"), + "craft/shoemaker": ("Shoemaker", "๐Ÿ‘ž"), + "craft/signmaker": ("Sign Maker", "๐Ÿชง"), + "craft/stonemason": ("Stonemason", "๐Ÿชจ"), + "craft/tailor": ("Tailor", "๐Ÿงต"), + "craft/upholsterer": ("Upholsterer", "๐Ÿ›‹๏ธ"), + "craft/watchmaker": ("Watchmaker", "โŒš"), + "craft/window_construction": ("Window Fitter", "๐ŸชŸ"), + "craft/winery": ("Winery", "๐Ÿท"), + "craft/yes": ("Craft Workshop", "๐Ÿ› ๏ธ"), + # healthcare + "healthcare/alternative": ("Alternative Medicine", "๐ŸŒฟ"), + "healthcare/audiologist": ("Audiologist", "๐Ÿ‘‚"), + "healthcare/centre": ("Health Centre", "๐Ÿฅ"), + "healthcare/clinic": ("Health Clinic", "๐Ÿฉบ"), + "healthcare/counselling": ("Counselling", "๐Ÿง "), + "healthcare/dentist": ("Dental Practice", "๐Ÿฆท"), + "healthcare/doctor": ("GP Surgery", "๐Ÿ‘จโ€โš•๏ธ"), + "healthcare/hospital": ("Hospital", "๐Ÿฅ"), + "healthcare/laboratory": ("Medical Lab", "๐Ÿ”ฌ"), + "healthcare/optometrist": ("Optometrist", "๐Ÿ‘๏ธ"), + "healthcare/pharmacy": ("Pharmacy", "๐Ÿ’Š"), + "healthcare/physiotherapist": ("Physiotherapist", "๐Ÿƒ"), + "healthcare/podiatrist": ("Podiatrist", "๐Ÿฆถ"), + "healthcare/psychotherapist": ("Psychotherapist", "๐Ÿง "), + "healthcare/rehabilitation": ("Rehabilitation Centre", "โ™ฟ"), + "healthcare/vaccination_centre": ("Vaccination Centre", "๐Ÿ’‰"), + "healthcare/yes": ("Healthcare Facility", "๐Ÿฅ"), + # leisure + "leisure/adult_gaming_centre": ("Adult Gaming Centre", "๐ŸŽฎ"), + "leisure/amusement_arcade": ("Amusement Arcade", "๐Ÿ•น๏ธ"), + "leisure/bandstand": ("Bandstand", "๐ŸŽบ"), + "leisure/bathing_place": ("Bathing Spot", "๐Ÿ–๏ธ"), + "leisure/bird_hide": ("Bird Hide", "๐Ÿฆ"), + "leisure/bowling_alley": ("Bowling Alley", "๐ŸŽณ"), + "leisure/common": ("Common Land", "๐ŸŒณ"), + "leisure/dance": ("Dance Venue", "๐Ÿ’ƒ"), + "leisure/dog_park": ("Dog Park", "๐Ÿ•"), + "leisure/escape_game": ("Escape Room", "๐Ÿ”“"), + "leisure/fitness_centre": ("Gym", "๐Ÿ‹๏ธ"), + "leisure/fitness_station": ("Outdoor Gym", "๐Ÿ’ช"), + "leisure/garden": ("Garden", "๐ŸŒท"), + "leisure/golf_course": ("Golf Course", "โ›ณ"), + "leisure/hackerspace": ("Hackerspace", "๐Ÿ’ป"), + "leisure/horse_riding": ("Horse Riding", "๐ŸŽ"), + "leisure/indoor_play": ("Indoor Play Area", "๐Ÿง’"), + "leisure/marina": ("Marina", "โš“"), + "leisure/miniature_golf": ("Mini Golf", "โ›ณ"), + "leisure/nature_reserve": ("Nature Reserve", "๐Ÿฆ”"), + "leisure/outdoor_seating": ("Outdoor Seating", "๐Ÿช‘"), + "leisure/park": ("Park", "๐ŸŒณ"), + "leisure/pitch": ("Sports Pitch", "โšฝ"), + "leisure/playground": ("Playground", "๐Ÿ›"), + "leisure/sauna": ("Sauna", "๐Ÿง–"), + "leisure/slipway": ("Slipway", "๐Ÿšค"), + "leisure/social_club": ("Social Club", "๐Ÿป"), + "leisure/sports_centre": ("Sports Centre", "๐ŸŸ๏ธ"), + "leisure/sports_hall": ("Sports Hall", "๐Ÿ€"), + "leisure/swimming_pool": ("Swimming Pool", "๐ŸŠ"), + "leisure/tanning_salon": ("Tanning Salon", "โ˜€๏ธ"), + "leisure/track": ("Running Track", "๐Ÿƒ"), + "leisure/trampoline_park": ("Trampoline Park", "๐Ÿคธ"), + "leisure/water_park": ("Water Park", "๐ŸŒŠ"), + "leisure/wildlife_hide": ("Wildlife Hide", "๐ŸฆŒ"), + "leisure/yes": ("Leisure Facility", "๐ŸŽ‰"), + # office + "office/accountant": ("Accountant", "๐Ÿงฎ"), + "office/advertising_agency": ("Advertising Agency", "๐Ÿ“ข"), + "office/architect": ("Architect", "๐Ÿ“"), + "office/association": ("Association", "๐Ÿ›๏ธ"), + "office/charity": ("Charity", "โค๏ธ"), + "office/construction_company": ("Construction Company", "๐Ÿ—๏ธ"), + "office/consulting": ("Consulting Firm", "๐Ÿ“Š"), + "office/courier": ("Courier Service", "๐Ÿ“ฆ"), + "office/coworking": ("Co-working Space", "๐Ÿ’ป"), + "office/design": ("Design Studio", "๐ŸŽจ"), + "office/diplomatic": ("Diplomatic Office", "๐Ÿ›๏ธ"), + "office/educational_institution": ("Education Office", "๐ŸŽ“"), + "office/employment_agency": ("Employment Agency", "๐Ÿ’ผ"), + "office/energy_supplier": ("Energy Supplier", "โšก"), + "office/engineer": ("Engineering Firm", "โš™๏ธ"), + "office/estate_agent": ("Estate Agent", "๐Ÿ "), + "office/financial": ("Financial Services", "๐Ÿ’ฐ"), + "office/financial_advisor": ("Financial Advisor", "๐Ÿ“ˆ"), + "office/foundation": ("Foundation", "๐Ÿ›๏ธ"), + "office/government": ("Government Office", "๐Ÿ›๏ธ"), + "office/graphic_design": ("Graphic Design", "๐Ÿ–Œ๏ธ"), + "office/healthcare": ("Healthcare Office", "๐Ÿฅ"), + "office/home_care": ("Home Care Service", "๐Ÿ "), + "office/insurance": ("Insurance", "๐Ÿ›ก๏ธ"), + "office/interior_design": ("Interior Design", "๐Ÿ›‹๏ธ"), + "office/it": ("IT Company", "๐Ÿ’ป"), + "office/lawyer": ("Lawyer", "โš–๏ธ"), + "office/logistics": ("Logistics", "๐Ÿšš"), + "office/marketing": ("Marketing Agency", "๐Ÿ“ฃ"), + "office/mortgage": ("Mortgage Broker", "๐Ÿฆ"), + "office/moving_company": ("Moving Company", "๐Ÿ“ฆ"), + "office/newspaper": ("Newspaper Office", "๐Ÿ“ฐ"), + "office/ngo": ("NGO", "๐ŸŒ"), + "office/notary": ("Notary", "๐Ÿ“œ"), + "office/political_party": ("Political Party", "๐Ÿ—ณ๏ธ"), + "office/politician": ("Politician Office", "๐Ÿ›๏ธ"), + "office/property_management": ("Property Management", "๐Ÿ˜๏ธ"), + "office/recruitment": ("Recruitment Agency", "๐Ÿ‘ฅ"), + "office/religion": ("Religious Office", "โœ๏ธ"), + "office/research": ("Research Office", "๐Ÿ”ฌ"), + "office/security": ("Security Company", "๐Ÿ”’"), + "office/solicitor": ("Solicitor", "โš–๏ธ"), + "office/surveyor": ("Surveyor", "๐Ÿ“"), + "office/tax_advisor": ("Tax Advisor", "๐Ÿงพ"), + "office/taxi": ("Taxi Office", "๐Ÿš•"), + "office/telecommunication": ("Telecoms Office", "๐Ÿ“ก"), + "office/therapist": ("Therapist", "๐Ÿง "), + "office/travel_agent": ("Travel Agent", "โœˆ๏ธ"), + "office/union": ("Trade Union", "โœŠ"), + "office/university": ("University Office", "๐ŸŽ“"), + "office/vacant": ("Vacant Office", "๐Ÿš๏ธ"), + "office/web_design": ("Web Design", "๐ŸŒ"), + # public_transport + "public_transport/entrance": ("Transport Entrance", "๐Ÿšช"), + "public_transport/platform": ("Platform", "๐Ÿš‰"), + "public_transport/station": ("Station", "๐Ÿš‰"), + "public_transport/stop_position": ("Stop", "๐Ÿš"), + # shop + "shop/accessories": ("Accessories Shop", "๐Ÿ‘œ"), + "shop/agrarian": ("Farm Supply Shop", "๐ŸŒพ"), + "shop/alcohol": ("Off-Licence", "๐Ÿท"), + "shop/antiques": ("Antiques Shop", "๐Ÿบ"), + "shop/appliance": ("Appliance Shop", "๐Ÿ”Œ"), + "shop/art": ("Art Shop", "๐ŸŽจ"), + "shop/baby_goods": ("Baby Shop", "๐Ÿผ"), + "shop/bag": ("Bag Shop", "๐Ÿ‘œ"), + "shop/bakery": ("Bakery", "๐Ÿฅ"), + "shop/bathroom": ("Bathroom Shop", "๐Ÿ›"), + "shop/bathroom_furnishing": ("Bathroom Furnishings", "๐Ÿšฟ"), + "shop/beauty": ("Beauty Shop", "๐Ÿ’„"), + "shop/bed": ("Bed Shop", "๐Ÿ›๏ธ"), + "shop/beverages": ("Drinks Shop", "๐Ÿฅค"), + "shop/bicycle": ("Bike Shop", "๐Ÿšฒ"), + "shop/boat": ("Boat Shop", "โ›ต"), + "shop/bookmaker": ("Bookmaker", "๐Ÿ‡"), + "shop/books": ("Bookshop", "๐Ÿ“š"), + "shop/boutique": ("Boutique", "๐Ÿ‘—"), + "shop/building_materials": ("Building Materials", "๐Ÿงฑ"), + "shop/butcher": ("Butcher", "๐Ÿฅฉ"), + "shop/camera": ("Camera Shop", "๐Ÿ“ท"), + "shop/candles": ("Candle Shop", "๐Ÿ•ฏ๏ธ"), + "shop/car": ("Car Dealership", "๐Ÿš—"), + "shop/car;car_repair": ("Car Sales & Repair", "๐Ÿš—"), + "shop/car_parts": ("Car Parts", "๐Ÿ”ฉ"), + "shop/car_repair": ("Car Repair", "๐Ÿ”ง"), + "shop/caravan": ("Caravan Dealer", "๐Ÿš"), + "shop/carpet": ("Carpet Shop", "๐Ÿงถ"), + "shop/catalogue": ("Catalogue Shop", "๐Ÿ“‹"), + "shop/charity": ("Charity Shop", "โค๏ธ"), + "shop/cheese": ("Cheese Shop", "๐Ÿง€"), + "shop/chemist": ("Chemist", "๐Ÿงช"), + "shop/chocolate": ("Chocolate Shop", "๐Ÿซ"), + "shop/clothes": ("Clothes Shop", "๐Ÿ‘•"), + "shop/coffee": ("Coffee Shop", "โ˜•"), + "shop/collector": ("Collector Shop", "๐Ÿ†"), + "shop/computer": ("Computer Shop", "๐Ÿ–ฅ๏ธ"), + "shop/confectionery": ("Sweet Shop", "๐Ÿฌ"), + "shop/convenience": ("Convenience Store", "๐Ÿช"), + "shop/copyshop": ("Copy Shop", "๐Ÿ–จ๏ธ"), + "shop/cosmetics": ("Cosmetics Shop", "๐Ÿ’…"), + "shop/country_store": ("Country Store", "๐Ÿก"), + "shop/craft": ("Craft Shop", "โœ‚๏ธ"), + "shop/curtain": ("Curtain Shop", "๐ŸชŸ"), + "shop/dairy": ("Dairy Shop", "๐Ÿฅ›"), + "shop/deli": ("Delicatessen", "๐Ÿง†"), + "shop/department_store": ("Department Store", "๐Ÿฌ"), + "shop/discount": ("Discount Store", "๐Ÿ’ฒ"), + "shop/doityourself": ("DIY Store", "๐Ÿ”จ"), + "shop/doors": ("Door Shop", "๐Ÿšช"), + "shop/dry_cleaning": ("Dry Cleaner", "๐Ÿ‘”"), + "shop/e-cigarette": ("Vape Shop", "๐Ÿ’จ"), + "shop/electrical": ("Electrical Shop", "โšก"), + "shop/electronics": ("Electronics Shop", "๐Ÿ“ฑ"), + "shop/erotic": ("Adult Shop", "๐Ÿ”ž"), + "shop/esoteric": ("Esoteric Shop", "๐Ÿ”ฎ"), + "shop/estate_agent": ("Estate Agent", "๐Ÿ "), + "shop/fabric": ("Fabric Shop", "๐Ÿงต"), + "shop/fan": ("Fan Shop", "๐Ÿ…"), + "shop/farm": ("Farm Shop", "๐Ÿฅ•"), + "shop/fashion_accessories": ("Fashion Accessories", "๐Ÿ‘’"), + "shop/fireplace": ("Fireplace Shop", "๐Ÿ”ฅ"), + "shop/fishing": ("Fishing Shop", "๐ŸŽฃ"), + "shop/flooring": ("Flooring Shop", "๐Ÿชต"), + "shop/florist": ("Florist", "๐Ÿ’"), + "shop/food": ("Food Shop", "๐Ÿž"), + "shop/frame": ("Framing Shop", "๐Ÿ–ผ๏ธ"), + "shop/frozen_food": ("Frozen Food Shop", "๐ŸงŠ"), + "shop/fuel": ("Fuel Shop", "โ›ฝ"), + "shop/funeral_directors": ("Funeral Director", "โšฐ๏ธ"), + "shop/furniture": ("Furniture Shop", "๐Ÿช‘"), + "shop/games": ("Games Shop", "๐ŸŽฎ"), + "shop/garden_centre": ("Garden Centre", "๐ŸŒป"), + "shop/gas": ("Gas Shop", "๐Ÿ”ฅ"), + "shop/general": ("General Store", "๐Ÿช"), + "shop/gift": ("Gift Shop", "๐ŸŽ"), + "shop/glaziery": ("Glazier", "๐ŸชŸ"), + "shop/greengrocer": ("Greengrocer", "๐Ÿฅฌ"), + "shop/grocery": ("Grocery Shop", "๐Ÿ›’"), + "shop/haberdashery": ("Haberdashery", "๐Ÿงต"), + "shop/hairdresser": ("Hairdresser", "๐Ÿ’‡"), + "shop/hairdresser_supply": ("Hairdresser Supply", "๐Ÿ’‡"), + "shop/hardware": ("Hardware Shop", "๐Ÿ”ฉ"), + "shop/health": ("Health Shop", "๐ŸŒฟ"), + "shop/health_food": ("Health Food Shop", "๐Ÿฅ—"), + "shop/hearing_aids": ("Hearing Aid Shop", "๐Ÿ‘‚"), + "shop/herbalist": ("Herbalist", "๐ŸŒฟ"), + "shop/hifi": ("Hi-Fi Shop", "๐Ÿ”Š"), + "shop/household": ("Household Shop", "๐Ÿ "), + "shop/household_linen": ("Linen Shop", "๐Ÿ›๏ธ"), + "shop/houseware": ("Houseware Shop", "๐Ÿณ"), + "shop/ice_cream": ("Ice Cream Shop", "๐Ÿฆ"), + "shop/interior_decoration": ("Interior Decoration", "๐Ÿ–ผ๏ธ"), + "shop/jewelry": ("Jewellery Shop", "๐Ÿ’"), + "shop/kiosk": ("Kiosk", "๐Ÿช"), + "shop/kitchen": ("Kitchen Shop", "๐Ÿณ"), + "shop/laundry": ("Laundry", "๐Ÿงบ"), + "shop/leather": ("Leather Shop", "๐Ÿงณ"), + "shop/lighting": ("Lighting Shop", "๐Ÿ’ก"), + "shop/locksmith": ("Locksmith", "๐Ÿ”"), + "shop/mall": ("Shopping Centre", "๐Ÿฌ"), + "shop/massage": ("Massage Parlour", "๐Ÿ’†"), + "shop/medical_supply": ("Medical Supply", "๐Ÿฉบ"), + "shop/military_surplus": ("Military Surplus", "๐ŸŽ–๏ธ"), + "shop/mobile_phone": ("Mobile Phone Shop", "๐Ÿ“ฑ"), + "shop/mobile_phone_accessories": ("Phone Accessories", "๐Ÿ“ฑ"), + "shop/mobility": ("Mobility Shop", "โ™ฟ"), + "shop/mobility_scooter": ("Mobility Scooter Shop", "๐Ÿฆฝ"), + "shop/model": ("Model Shop", "โœˆ๏ธ"), + "shop/money_lender": ("Money Lender", "๐Ÿ’ฐ"), + "shop/motorcycle": ("Motorcycle Shop", "๐Ÿ๏ธ"), + "shop/motorcycle_repair": ("Motorcycle Repair", "๐Ÿ”ง"), + "shop/music": ("Music Shop", "๐ŸŽต"), + "shop/musical_instrument": ("Musical Instrument Shop", "๐ŸŽธ"), + "shop/newsagent": ("Newsagent", "๐Ÿ“ฐ"), + "shop/nutrition_supplements": ("Nutrition Shop", "๐Ÿ’ช"), + "shop/optician": ("Optician", "๐Ÿ‘“"), + "shop/outdoor": ("Outdoor Shop", "๐Ÿ•๏ธ"), + "shop/outpost": ("Outpost", "๐Ÿ“ฆ"), + "shop/paint": ("Paint Shop", "๐ŸŽจ"), + "shop/party": ("Party Shop", "๐ŸŽˆ"), + "shop/pastry": ("Pastry Shop", "๐Ÿฅ"), + "shop/pawnbroker": ("Pawnbroker", "๐Ÿ’ฐ"), + "shop/perfumery": ("Perfumery", "๐ŸŒธ"), + "shop/pet": ("Pet Shop", "๐Ÿพ"), + "shop/pet_grooming": ("Pet Grooming", "๐Ÿฉ"), + "shop/photo": ("Photo Shop", "๐Ÿ“ธ"), + "shop/piercing": ("Piercing Studio", "๐Ÿ’Ž"), + "shop/plant_hire": ("Plant Hire", "๐Ÿšœ"), + "shop/pottery": ("Pottery Shop", "๐Ÿบ"), + "shop/printer_ink": ("Ink & Toner Shop", "๐Ÿ–จ๏ธ"), + "shop/printing": ("Print Shop", "๐Ÿ–จ๏ธ"), + "shop/psychic": ("Psychic", "๐Ÿ”ฎ"), + "shop/pyrotechnics": ("Fireworks Shop", "๐ŸŽ†"), + "shop/religion": ("Religious Shop", "โœ๏ธ"), + "shop/rental": ("Rental Shop", "๐Ÿ”‘"), + "shop/repair": ("Repair Shop", "๐Ÿ”ง"), + "shop/scuba_diving": ("Scuba Diving Shop", "๐Ÿคฟ"), + "shop/seafood": ("Fishmonger", "๐ŸŸ"), + "shop/second_hand": ("Second-Hand Shop", "โ™ป๏ธ"), + "shop/security": ("Security Shop", "๐Ÿ”’"), + "shop/sewing": ("Sewing Shop", "๐Ÿชก"), + "shop/shoe_repair": ("Shoe Repair", "๐Ÿ‘ž"), + "shop/shoes": ("Shoe Shop", "๐Ÿ‘Ÿ"), + "shop/sports": ("Sports Shop", "โšฝ"), + "shop/stationery": ("Stationery Shop", "โœ๏ธ"), + "shop/storage_rental": ("Self Storage", "๐Ÿ“ฆ"), + "shop/supermarket": ("Supermarket", "๐Ÿ›’"), + "shop/swimming_pool": ("Pool Supplies", "๐ŸŠ"), + "shop/tailor": ("Tailor", "๐Ÿงต"), + "shop/tattoo": ("Tattoo Studio", "๐Ÿ–‹๏ธ"), + "shop/taxi": ("Taxi Booking", "๐Ÿš•"), + "shop/tea": ("Tea Shop", "๐Ÿซ–"), + "shop/telecommunication": ("Telecoms Shop", "๐Ÿ“ก"), + "shop/ticket": ("Ticket Office", "๐ŸŽซ"), + "shop/tiles": ("Tile Shop", "๐Ÿ”ฒ"), + "shop/tobacco": ("Tobacconist", "๐Ÿšฌ"), + "shop/tool_hire": ("Tool Hire", "๐Ÿงฐ"), + "shop/toys": ("Toy Shop", "๐Ÿงธ"), + "shop/trade": ("Trade Supplier", "๐Ÿญ"), + "shop/travel_agency": ("Travel Agency", "โœˆ๏ธ"), + "shop/trophy": ("Trophy Shop", "๐Ÿ†"), + "shop/tyres": ("Tyre Shop", "๐Ÿ›ž"), + "shop/vacant": ("Vacant Shop", "๐Ÿš๏ธ"), + "shop/variety_store": ("Variety Store", "๐Ÿช"), + "shop/video": ("Video Shop", "๐Ÿ“€"), + "shop/video_games": ("Video Game Shop", "๐ŸŽฎ"), + "shop/watches": ("Watch Shop", "โŒš"), + "shop/water_sports": ("Water Sports Shop", "๐Ÿ„"), + "shop/weapons": ("Weapons Shop", "๐Ÿ—ก๏ธ"), + "shop/wedding": ("Wedding Shop", "๐Ÿ’’"), + "shop/wholesale": ("Wholesaler", "๐Ÿ“ฆ"), + "shop/wigs": ("Wig Shop", "๐Ÿ’‡"), + "shop/window_blind": ("Blinds Shop", "๐ŸชŸ"), + "shop/windows": ("Window Shop", "๐ŸชŸ"), + "shop/wine": ("Wine Shop", "๐Ÿท"), + "shop/wool": ("Wool Shop", "๐Ÿงถ"), + "shop/yes": ("Shop", "๐Ÿ›๏ธ"), + # tourism + "tourism/artwork": ("Public Artwork", "๐ŸŽจ"), + "tourism/attraction": ("Tourist Attraction", "๐Ÿ“ธ"), + "tourism/camp_site": ("Campsite", "โ›บ"), + "tourism/caravan_site": ("Caravan Site", "๐Ÿš"), + "tourism/chalet": ("Chalet", "๐Ÿ”๏ธ"), + "tourism/gallery": ("Gallery", "๐Ÿ–ผ๏ธ"), + "tourism/guest_house": ("Guest House", "๐Ÿก"), + "tourism/hostel": ("Hostel", "๐Ÿ›๏ธ"), + "tourism/hotel": ("Hotel", "๐Ÿจ"), + "tourism/motel": ("Motel", "๐Ÿจ"), + "tourism/museum": ("Museum", "๐Ÿ›๏ธ"), + "tourism/picnic_site": ("Picnic Site", "๐Ÿงบ"), + "tourism/preserved_railway": ("Heritage Railway", "๐Ÿš‚"), + "tourism/theme_park": ("Theme Park", "๐ŸŽข"), + "tourism/viewpoint": ("Viewpoint", "๐Ÿ”ญ"), + "tourism/zoo": ("Zoo", "๐Ÿฆ"), +} + + +def transform(input_path: Path) -> pl.LazyFrame: + lf = pl.scan_parquet(input_path) + + # Get all unique categories present in the data + all_categories = lf.select("category").unique().collect().to_series().to_list() + + # Verify every non-dropped category has a mapping + unmapped = [] + for cat in all_categories: + if cat not in DROP_CATEGORIES and cat not in CATEGORY_MAP: + unmapped.append(cat) + if unmapped: + raise ValueError( + f"Categories missing from CATEGORY_MAP: {sorted(unmapped)}" + ) + + # Verify every CATEGORY_MAP key actually exists in the data (catch typos) + mapped_but_absent = [] + all_set = set(all_categories) + for cat in CATEGORY_MAP: + if cat not in all_set: + mapped_but_absent.append(cat) + if mapped_but_absent: + raise ValueError( + f"CATEGORY_MAP contains categories not in data: {sorted(mapped_but_absent)}" + ) + + # Drop unwanted categories + lf = lf.filter(~pl.col("category").is_in(list(DROP_CATEGORIES))) + + # Build name and emoji lookup expressions + name_mapping = {k: v[0] for k, v in CATEGORY_MAP.items()} + emoji_mapping = {k: v[1] for k, v in CATEGORY_MAP.items()} + + # Check no friendly names are missing (defensive) + missing_names = [k for k, v in CATEGORY_MAP.items() if not v[0]] + if missing_names: + raise ValueError(f"Empty friendly names for: {missing_names}") + missing_emojis = [k for k, v in CATEGORY_MAP.items() if not v[1]] + if missing_emojis: + raise ValueError(f"Empty emojis for: {missing_emojis}") + + lf = lf.with_columns( + pl.col("category").replace_strict(name_mapping).alias("category"), + pl.col("category").replace_strict(emoji_mapping).alias("emoji"), + ) + + return lf + + +def main(): + parser = argparse.ArgumentParser(description="Transform raw POIs to filtered version with friendly names") + parser.add_argument("--input", type=Path, required=True, help="Raw POIs parquet file") + parser.add_argument("--output", type=Path, required=True, help="Output filtered POIs parquet file") + args = parser.parse_args() + + df = transform(args.input).collect() + + df.write_parquet(args.output) + + size_mb = args.output.stat().st_size / (1024 * 1024) + print(f"Wrote {args.output} ({size_mb:.1f} MB, {len(df):,} POIs)") + print(f"\nCategories ({df['category'].n_unique()}):") + counts = df.group_by("category", "emoji").len().sort("len", descending=True) + for row in counts.iter_rows(named=True): + print(f" {row['emoji']} {row['category']}: {row['len']:,}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/utils/test_fuzzy_join.py b/pipeline/utils/test_fuzzy_join.py index e9a1cb5..93c573c 100644 --- a/pipeline/utils/test_fuzzy_join.py +++ b/pipeline/utils/test_fuzzy_join.py @@ -6,7 +6,7 @@ POSTCODE = "E14 2DG" # Price paid: unique addresses for this postcode pp = ( - pl.scan_parquet("data_sources/pp-complete.parquet") + pl.scan_parquet("data/price-paid-complete.parquet") .filter(pl.col("postcode") == POSTCODE) .select("paon", "saon", "street", "postcode") .unique() @@ -22,7 +22,7 @@ pp = ( # EPC: latest inspection per address for this postcode epc = ( - pl.scan_csv("data_sources/epc/certificates.csv") + pl.scan_csv("data/epc/certificates.csv") .select("ADDRESS", "POSTCODE", "INSPECTION_DATE") .filter(pl.col("POSTCODE").str.strip_chars() == POSTCODE) .sort("INSPECTION_DATE", descending=True) diff --git a/pipeline/wide.py b/pipeline/wide.py deleted file mode 100644 index 3e170d9..0000000 --- a/pipeline/wide.py +++ /dev/null @@ -1,143 +0,0 @@ -"""Build a wide property dataframe and H3 aggregates from epc_pp output.""" - -import polars as pl -import h3 - -from pipeline.config import AGGREGATES_DIR, H3_RESOLUTIONS, DATA_DIR, PROCESSED_DIR - - -def _build_wide() -> pl.DataFrame: - """Build the wide dataframe by joining epc_pp with all auxiliary data.""" - print("Loading epc_pp...") - wide = pl.read_parquet(PROCESSED_DIR / "epc_pp.parquet") - print(f" {wide.shape[0]:,} rows") - - # GPS coordinates + LSOA from ArcGIS - print("Joining GPS coordinates...") - arcgis = pl.read_parquet(DATA_DIR / "arcgis_data.parquet").select( - pl.col("pcds").alias("postcode"), - "lat", - pl.col("long").alias("lon"), - "lsoa21", - ) - wide = wide.join(arcgis, on="postcode", how="inner") - print(f" {wide.shape[0]:,} rows after GPS join") - - # Journey times (optional) - journey_path = PROCESSED_DIR / "journey_times_bank_checkpoint.parquet" - if journey_path.exists(): - print("Joining journey times...") - journey_times = pl.read_parquet(journey_path).select( - "postcode", - "public_transport_easy_minutes", - "public_transport_quick_minutes", - "cycling_minutes", - ) - wide = wide.join(journey_times, on="postcode", how="left") - - # Index of Deprivation - iod_path = DATA_DIR / "IoD2025_Scores.parquet" - if iod_path.exists(): - print("Joining IoD scores...") - iod = pl.read_parquet(iod_path).drop( - "LSOA name (2021)", - "Local Authority District code (2024)", - "Local Authority District name (2024)", - ) - # Rename IoD columns to clean snake_case - iod = iod.rename(_IOD_RENAMES) - wide = wide.join( - iod, left_on="lsoa21", right_on="lsoa_code", how="left" - ) - - return wide - - -_IOD_RENAMES = { - "LSOA code (2021)": "lsoa_code", - "Index of Multiple Deprivation (IMD) Score": "imd_score", - "Income Score (rate)": "income_score", - "Employment Score (rate)": "employment_score", - "Education, Skills and Training Score": "education_score", - "Health Deprivation and Disability Score": "health_score", - "Crime Score": "crime_score", - "Barriers to Housing and Services Score": "housing_barriers_score", - "Living Environment Score": "living_environment_score", - "Income Deprivation Affecting Children Index (IDACI) Score (rate)": "idaci_score", - "Income Deprivation Affecting Older People (IDAOPI) Score (rate)": "idaopi_score", - "Children and Young People Sub-domain Score": "children_young_people_score", - "Adult Skills Sub-domain Score": "adult_skills_score", - "Geographical Barriers Sub-domain Score": "geographical_barriers_score", - "Wider Barriers Sub-domain Score": "wider_barriers_score", - "Indoors Sub-domain Score": "indoors_score", - "Outdoors Sub-domain Score": "outdoors_score", -} - - -def _add_h3_indices(df: pl.DataFrame) -> pl.DataFrame: - """Compute H3 indices from lat/lon for all configured resolutions.""" - print("Computing H3 indices...") - # Compute per unique postcode for efficiency, then join back - postcodes = df.select("postcode", "lat", "lon").unique(subset=["postcode"]) - - for res in H3_RESOLUTIONS: - col_name = f"h3_res{res}" - postcodes = postcodes.with_columns( - pl.struct(["lat", "lon"]) - .map_elements( - lambda x, r=res: h3.latlng_to_cell(x["lat"], x["lon"], r), - return_dtype=pl.Utf8, - ) - .alias(col_name) - ) - print(f" res{res}: {postcodes[col_name].n_unique():,} unique cells") - - h3_cols = [f"h3_res{res}" for res in H3_RESOLUTIONS] - return df.join( - postcodes.select("postcode", *h3_cols), on="postcode", how="left" - ) - - -def _aggregate_to_h3(df: pl.DataFrame) -> None: - """Aggregate min/max of every numeric feature per H3 cell at each resolution.""" - AGGREGATES_DIR.mkdir(parents=True, exist_ok=True) - - exclude = {"lat", "lon"} - numeric_cols = [ - col - for col, dtype in zip(df.columns, df.dtypes) - if dtype.is_numeric() and not col.startswith("h3_res") and col not in exclude - ] - - agg_exprs = [pl.len().alias("count")] - for col in numeric_cols: - agg_exprs.append(pl.col(col).min().alias(f"min_{col}")) - agg_exprs.append(pl.col(col).max().alias(f"max_{col}")) - - print("Aggregating to H3 cells...") - for res in H3_RESOLUTIONS: - h3_col = f"h3_res{res}" - result = df.group_by(h3_col).agg(agg_exprs).rename({h3_col: "h3"}) - path = AGGREGATES_DIR / f"res{res}.parquet" - result.write_parquet(path) - size_mb = path.stat().st_size / (1024 * 1024) - print(f" {path.name}: {result.shape[0]:,} cells ({size_mb:.1f} MB)") - - -def run(): - """Run the full wide pipeline: build wide df, compute H3, aggregate.""" - wide = _build_wide() - - wide_path = PROCESSED_DIR / "wide.parquet" - wide.write_parquet(wide_path) - size_mb = wide_path.stat().st_size / (1024 * 1024) - print(f"Wrote {wide_path} ({size_mb:.1f} MB)") - - wide = _add_h3_indices(wide) - _aggregate_to_h3(wide) - - print("Done.") - - -if __name__ == "__main__": - run()