diff --git a/download_pois.py b/download_pois.py deleted file mode 100644 index fd7a06d..0000000 --- a/download_pois.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Download POI data for the UK from Overture Maps.""" - -from pathlib import Path - -import overturemaps -import pyarrow as pa -import pyarrow.parquet as pq -from tqdm import tqdm - -# UK bounding box (west, south, east, north) -UK_BBOX = (-8.65, 49.86, 1.77, 60.86) - -OUTPUT_DIR = Path("data_sources") -OUTPUT_FILE = OUTPUT_DIR / "uk_pois.parquet" - - -def main(): - OUTPUT_DIR.mkdir(parents=True, exist_ok=True) - - if OUTPUT_FILE.exists(): - print(f"POI file already exists: {OUTPUT_FILE}") - print("Delete it manually to re-download.") - return - - print("Downloading UK POI data from Overture Maps...") - print(f"Bounding box: {UK_BBOX}") - print("This may take several minutes...") - - reader = overturemaps.record_batch_reader("place", bbox=UK_BBOX) - - # Read all batches - batches = [] - with tqdm(desc="Downloading batches", unit=" batches") as pbar: - for batch in reader: - batches.append(batch) - pbar.update(1) - pbar.set_postfix(rows=sum(b.num_rows for b in batches)) - - if not batches: - print("No data found in bounding box!") - return - - # Combine batches into a table and write - table = pa.Table.from_batches(batches, schema=reader.schema) - - print(f"\nWriting {table.num_rows:,} POIs to {OUTPUT_FILE}...") - pq.write_table(table, OUTPUT_FILE) - - print(f"Download complete: {OUTPUT_FILE}") - print(f"File size: {OUTPUT_FILE.stat().st_size / 1024 / 1024:.1f} MB") - - -if __name__ == "__main__": - main() diff --git a/pipeline/download/arcgis.py b/pipeline/download/arcgis.py index cdd284c..1277fc6 100644 --- a/pipeline/download/arcgis.py +++ b/pipeline/download/arcgis.py @@ -1,3 +1,5 @@ +import argparse +import tempfile import zipfile import httpx import polars as pl @@ -6,12 +8,6 @@ from tqdm import tqdm URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data" -BASE_DATA_PATH = Path("./data_sources") -BASE_DATA_PATH.mkdir(exist_ok=True) -DOWNLOAD_PATH = BASE_DATA_PATH / "arcgis_data.zip" -EXTRACT_PATH = BASE_DATA_PATH / "arcgis_extracted" -PARQUET_PATH = BASE_DATA_PATH / "arcgis_data.parquet" - def download_with_progress(url: str, output_path: Path) -> None: with httpx.stream( @@ -36,8 +32,8 @@ def download_with_progress(url: str, output_path: Path) -> None: for chunk in response.iter_bytes(chunk_size=8192): f.write(chunk) pbar.update(len(chunk)) - return - + return + def extract_zip(zip_path: Path, extract_path: Path) -> None: @@ -49,23 +45,24 @@ def extract_zip(zip_path: Path, extract_path: Path) -> None: def convert_to_parquet(data_path: Path, parquet_path: Path) -> None: df = pl.scan_csv(data_path / 'Data/NSPL_MAY_2025_UK.csv', try_parse_dates=True) - print(f"Columns: {df.columns}") + print(f"Columns: {df.collect_schema().names()}") + parquet_path.parent.mkdir(parents=True, exist_ok=True) df.sink_parquet(parquet_path, compression="zstd") print(f"Saved to {parquet_path}") def main() -> None: - if PARQUET_PATH.exists(): - print(f"Parquet already exists at {PARQUET_PATH}, skipping") - return + parser = argparse.ArgumentParser(description="Download and convert ArcGIS postcode data") + parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + args = parser.parse_args() - if not DOWNLOAD_PATH.exists(): - download_with_progress(URL, DOWNLOAD_PATH) - else: - print(f"File already exists at {DOWNLOAD_PATH}, skipping download") + with tempfile.TemporaryDirectory() as cache_dir: + download_path = Path(cache_dir) / "arcgis_data.zip" + extract_path = Path(cache_dir) / "arcgis_extracted" - extract_zip(DOWNLOAD_PATH, EXTRACT_PATH) - convert_to_parquet(EXTRACT_PATH, PARQUET_PATH) + download_with_progress(URL, download_path) + extract_zip(download_path, extract_path) + convert_to_parquet(extract_path, args.output) if __name__ == "__main__": main() diff --git a/pipeline/download/deprivation_data.py b/pipeline/download/deprivation_data.py index 6d42180..e73cb96 100644 --- a/pipeline/download/deprivation_data.py +++ b/pipeline/download/deprivation_data.py @@ -1,14 +1,11 @@ +import argparse import httpx import polars as pl from pathlib import Path +import tempfile URL = "https://assets.publishing.service.gov.uk/media/691ded34513046b952c500bd/File_5_IoD2025_Scores_for_the_Indices_of_Deprivation.xlsx" -BASE_DATA_PATH = Path("./data_sources") -BASE_DATA_PATH.mkdir(exist_ok=True) -XLSX_PATH = BASE_DATA_PATH / "IoD2025_Scores.xlsx" -PARQUET_PATH = BASE_DATA_PATH / "IoD2025_Scores.parquet" - def download_file(url: str, output_path: Path) -> None: with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response: @@ -41,17 +38,17 @@ def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None: df.write_parquet(parquet_path, compression="zstd") print(f"Saved to {parquet_path}") - print(f"Excel size: {xlsx_path.stat().st_size / 1024 / 1024:.1f} MB") - print(f"Parquet size: {parquet_path.stat().st_size / 1024 / 1024:.1f} MB") def main() -> None: - if not XLSX_PATH.exists(): - download_file(URL, XLSX_PATH) - else: - print(f"Excel file already exists at {XLSX_PATH}, skipping download") + parser = argparse.ArgumentParser(description="Download and convert Index of Deprivation data") + parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + args = parser.parse_args() - convert_to_parquet(XLSX_PATH, PARQUET_PATH) + with tempfile.TemporaryDirectory() as cache_dir: + xlsx_path = Path(cache_dir) / "IoD2025_Scores.xlsx" + download_file(URL, xlsx_path) + convert_to_parquet(xlsx_path, args.output) if __name__ == "__main__": diff --git a/pipeline/download/pois/__main__.py b/pipeline/download/pois/__main__.py index 091eae3..7b06c06 100644 --- a/pipeline/download/pois/__main__.py +++ b/pipeline/download/pois/__main__.py @@ -1,5 +1,5 @@ -import json -import shutil +import argparse +import tempfile import urllib.request from pathlib import Path from tempfile import mkdtemp @@ -9,9 +9,9 @@ import polars as pl from tqdm import tqdm from .config import ( - GB_PBF_FILE, + BATCH_SIZE, GEOFABRIK_GB_URL, - OUTPUT_FILE, + MIN_OCCURENCE_COUNT, POI_TAG_KEYS, UK_BBOX_EAST, UK_BBOX_NORTH, @@ -19,12 +19,11 @@ from .config import ( UK_BBOX_WEST, ) -BATCH_SIZE = 50_000 -def download_pbf() -> None: - GB_PBF_FILE.parent.mkdir(parents=True, exist_ok=True) - tmp = GB_PBF_FILE.with_suffix(".pbf.tmp") +def download_pbf(pbf_file: Path) -> None: + pbf_file.parent.mkdir(parents=True, exist_ok=True) + tmp = pbf_file.with_suffix(".pbf.tmp") print(f"Downloading {GEOFABRIK_GB_URL}") with ( @@ -39,13 +38,11 @@ def download_pbf() -> None: f.write(chunk) bar.update(len(chunk)) - tmp.rename(GB_PBF_FILE) - print(f"Saved to {GB_PBF_FILE}") + tmp.rename(pbf_file) + print(f"Saved to {pbf_file}") class POIHandler(osmium.SimpleHandler): - """Streams OSM data, filters to UK bbox, extracts matching POIs in batches.""" - def __init__(self, progress: tqdm, tmp_dir: Path) -> None: super().__init__() self._batch: list[dict] = [] @@ -60,16 +57,12 @@ class POIHandler(osmium.SimpleHandler): and UK_BBOX_WEST <= lon <= UK_BBOX_EAST ) - def _match_tags(self, tags: osmium.osm.TagList) -> str | None: - parts = [tags[key] for key in POI_TAG_KEYS if key in tags] - return " / ".join(parts) if parts else None + def _match_tags(self, tags: osmium.osm.TagList) -> list[str]: + return [f"{key}/{tags[key]}" for key in POI_TAG_KEYS if key in tags] def _get_name(self, tags: osmium.osm.TagList) -> str: return tags.get("name:en", tags.get("name", "")) - def _tags_to_json(self, tags: osmium.osm.TagList) -> str: - return json.dumps({tag.k: tag.v for tag in tags}) - def _flush_batch(self) -> None: if not self._batch: return @@ -89,7 +82,6 @@ class POIHandler(osmium.SimpleHandler): "category": category, "lat": lat, "lng": lng, - "osm_tags": self._tags_to_json(tags), } ) self.poi_count += 1 @@ -107,54 +99,27 @@ class POIHandler(osmium.SimpleHandler): lat, lon = n.location.lat, n.location.lon if not self._in_uk(lat, lon): return - category = self._match_tags(n.tags) - if category: + categories = self._match_tags(n.tags) + for category in categories: self._add_poi(f"n{n.id}", n.tags, category, lat, lon) - def way(self, w: osmium.osm.Way) -> None: - self._tick() - category = self._match_tags(w.tags) - if not category: - return - - lats = [] - lons = [] - for node in w.nodes: - try: - lats.append(node.location.lat) - lons.append(node.location.lon) - except osmium.InvalidLocationError: - continue - - if not lats: - return - - centroid_lat = sum(lats) / len(lats) - centroid_lng = sum(lons) / len(lons) - - if not self._in_uk(centroid_lat, centroid_lng): - return - - self._add_poi(f"w{w.id}", w.tags, category, centroid_lat, centroid_lng) - def main() -> None: - if not GB_PBF_FILE.exists(): - download_pbf() + parser = argparse.ArgumentParser(description="Download and extract POIs from OpenStreetMap") + parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + args = parser.parse_args() - print(f"=== POI Extraction from {GB_PBF_FILE} ===") - print( - f"UK bbox: ({UK_BBOX_WEST}, {UK_BBOX_SOUTH}, {UK_BBOX_EAST}, {UK_BBOX_NORTH})" - ) - print(f"Tag keys: {POI_TAG_KEYS}") - print() + with tempfile.TemporaryDirectory() as cache_dir: + pbf_file = Path(cache_dir) / "great-britain-latest.osm.pbf" - if OUTPUT_FILE.exists(): - print("POIs are up-to-date") - return + if not pbf_file.exists(): + download_pbf(pbf_file) + else: + print(f"Using cached PBF file at {pbf_file}") - tmp_dir = Path(mkdtemp(prefix="pois_")) - try: + print(f"Tag keys: {POI_TAG_KEYS}") + + tmp_dir = Path(mkdtemp(prefix="pois_")) with tqdm( unit=" elements", unit_scale=True, @@ -163,35 +128,26 @@ def main() -> None: mininterval=1.0, ) as progress: handler = POIHandler(progress, tmp_dir) - handler.apply_file(str(GB_PBF_FILE), locations=True) + handler.apply_file(str(pbf_file), locations=True) handler._flush_batch() # write any remaining POIs print(f"Extracted {handler.poi_count:,} POIs") batch_files = sorted(tmp_dir.glob("batch_*.parquet")) - if not batch_files: - print("No POIs found.") - return - df = pl.concat([pl.scan_parquet(f) for f in batch_files]) - OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) - df.sink_parquet(OUTPUT_FILE) - print(f"Saved to {OUTPUT_FILE}") - - print("\n=== Summary ===") - print(f"Total POIs: {handler.poi_count:,}") - print("\nPOIs by category:") - category_counts = ( + # Only keep categories with enough occurrences + valid_categories = ( df.group_by("category") .agg(pl.len().alias("count")) - .sort("count", descending=True) - .collect() + .filter(pl.col("count") >= MIN_OCCURENCE_COUNT) ) - for row in category_counts.iter_rows(named=True): - print(f" {row['category']}: {row['count']:,}") - finally: - shutil.rmtree(tmp_dir, ignore_errors=True) + df = df.join(valid_categories.select("category"), on="category", how="semi") + + args.output.parent.mkdir(parents=True, exist_ok=True) + df.sink_parquet(args.output) + print(f"Saved to {args.output}") + print(f"Total POIs: {handler.poi_count:,}") if __name__ == "__main__": diff --git a/pipeline/download/pois/config.py b/pipeline/download/pois/config.py index 2a063c6..eb677a6 100644 --- a/pipeline/download/pois/config.py +++ b/pipeline/download/pois/config.py @@ -4,31 +4,29 @@ DATA_DIR = Path("./data_sources") GB_PBF_FILE = DATA_DIR / "great-britain-latest.osm.pbf" OUTPUT_FILE = DATA_DIR / "uk_pois.parquet" + +BATCH_SIZE = 50_000 + +MIN_OCCURENCE_COUNT = 20 + GEOFABRIK_GB_URL = ( "https://download.geofabrik.de/europe/great-britain-latest.osm.pbf" ) -# UK bounding box (west, south, east, north) — used for way centroid filtering UK_BBOX_WEST = -7.57 UK_BBOX_SOUTH = 49.96 UK_BBOX_EAST = 1.68 UK_BBOX_NORTH = 58.64 -# OSM tag keys that indicate a POI. Any element with one of these keys is kept, -# regardless of the specific value. When multiple keys match, their values are -# concatenated with " / ". POI_TAG_KEYS: list[str] = [ "amenity", - "shop", - "leisure", - "tourism", - "railway", - "aeroway", - "highway", - "public_transport", - "station", "building", - "military", + "craft", "emergency", "healthcare", + "leisure", + "office", + "shop", + "tourism", + "public_transport", ] diff --git a/pipeline/download/price_paid.py b/pipeline/download/price_paid.py index 38e0f69..fac4ec1 100644 --- a/pipeline/download/price_paid.py +++ b/pipeline/download/price_paid.py @@ -1,3 +1,5 @@ +import argparse +import tempfile import httpx import polars as pl from pathlib import Path @@ -5,11 +7,6 @@ from tqdm import tqdm URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv" -BASE_DATA_PATH = Path("./data_sources") -BASE_DATA_PATH.mkdir(exist_ok=True) -CSV_PATH = BASE_DATA_PATH / "price-paid-complete.csv" -PARQUET_PATH = BASE_DATA_PATH / "price-paid-complete.parquet" - def download_with_progress(url: str, output_path: Path) -> None: with httpx.stream( @@ -36,7 +33,7 @@ def download_with_progress(url: str, output_path: Path) -> None: pbar.update(len(chunk)) return - + def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: """Convert CSV to Parquet using Polars.""" print("Converting to Parquet...") @@ -69,22 +66,22 @@ def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: try_parse_dates=True, ) + parquet_path.parent.mkdir(parents=True, exist_ok=True) + print(f"Columns: {df.collect_schema().names()}") df.write_parquet(parquet_path, compression="zstd") print(f"Saved to {parquet_path}") - print(f"Rows: {df.height:,}") def main() -> None: - if PARQUET_PATH.exists(): - print(f"Parquet already exists at {PARQUET_PATH}, skipping") - return + parser = argparse.ArgumentParser(description="Download and convert Land Registry price-paid data") + parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + args = parser.parse_args() - if not CSV_PATH.exists(): - download_with_progress(URL, CSV_PATH) - else: - print(f"CSV already exists at {CSV_PATH}, skipping download") + with tempfile.TemporaryDirectory() as cache_dir: + csv_path = Path(cache_dir) / "price-paid-complete.csv" - convert_to_parquet(CSV_PATH, PARQUET_PATH) + download_with_progress(URL, csv_path) + convert_to_parquet(csv_path, args.output) if __name__ == "__main__":