Stop doing manual caching

This commit is contained in:
Andras Schmelczer 2026-01-31 10:18:31 +00:00
parent 5c39f31283
commit 0cea9b873c
6 changed files with 82 additions and 191 deletions

View file

@ -1,54 +0,0 @@
"""Download POI data for the UK from Overture Maps."""
from pathlib import Path
import overturemaps
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm
# UK bounding box (west, south, east, north)
UK_BBOX = (-8.65, 49.86, 1.77, 60.86)
OUTPUT_DIR = Path("data_sources")
OUTPUT_FILE = OUTPUT_DIR / "uk_pois.parquet"
def main():
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
if OUTPUT_FILE.exists():
print(f"POI file already exists: {OUTPUT_FILE}")
print("Delete it manually to re-download.")
return
print("Downloading UK POI data from Overture Maps...")
print(f"Bounding box: {UK_BBOX}")
print("This may take several minutes...")
reader = overturemaps.record_batch_reader("place", bbox=UK_BBOX)
# Read all batches
batches = []
with tqdm(desc="Downloading batches", unit=" batches") as pbar:
for batch in reader:
batches.append(batch)
pbar.update(1)
pbar.set_postfix(rows=sum(b.num_rows for b in batches))
if not batches:
print("No data found in bounding box!")
return
# Combine batches into a table and write
table = pa.Table.from_batches(batches, schema=reader.schema)
print(f"\nWriting {table.num_rows:,} POIs to {OUTPUT_FILE}...")
pq.write_table(table, OUTPUT_FILE)
print(f"Download complete: {OUTPUT_FILE}")
print(f"File size: {OUTPUT_FILE.stat().st_size / 1024 / 1024:.1f} MB")
if __name__ == "__main__":
main()

View file

@ -1,3 +1,5 @@
import argparse
import tempfile
import zipfile import zipfile
import httpx import httpx
import polars as pl import polars as pl
@ -6,12 +8,6 @@ from tqdm import tqdm
URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data" URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data"
BASE_DATA_PATH = Path("./data_sources")
BASE_DATA_PATH.mkdir(exist_ok=True)
DOWNLOAD_PATH = BASE_DATA_PATH / "arcgis_data.zip"
EXTRACT_PATH = BASE_DATA_PATH / "arcgis_extracted"
PARQUET_PATH = BASE_DATA_PATH / "arcgis_data.parquet"
def download_with_progress(url: str, output_path: Path) -> None: def download_with_progress(url: str, output_path: Path) -> None:
with httpx.stream( with httpx.stream(
@ -36,8 +32,8 @@ def download_with_progress(url: str, output_path: Path) -> None:
for chunk in response.iter_bytes(chunk_size=8192): for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk) f.write(chunk)
pbar.update(len(chunk)) pbar.update(len(chunk))
return return
def extract_zip(zip_path: Path, extract_path: Path) -> None: def extract_zip(zip_path: Path, extract_path: Path) -> None:
@ -49,23 +45,24 @@ def extract_zip(zip_path: Path, extract_path: Path) -> None:
def convert_to_parquet(data_path: Path, parquet_path: Path) -> None: def convert_to_parquet(data_path: Path, parquet_path: Path) -> None:
df = pl.scan_csv(data_path / 'Data/NSPL_MAY_2025_UK.csv', try_parse_dates=True) df = pl.scan_csv(data_path / 'Data/NSPL_MAY_2025_UK.csv', try_parse_dates=True)
print(f"Columns: {df.columns}") print(f"Columns: {df.collect_schema().names()}")
parquet_path.parent.mkdir(parents=True, exist_ok=True)
df.sink_parquet(parquet_path, compression="zstd") df.sink_parquet(parquet_path, compression="zstd")
print(f"Saved to {parquet_path}") print(f"Saved to {parquet_path}")
def main() -> None: def main() -> None:
if PARQUET_PATH.exists(): parser = argparse.ArgumentParser(description="Download and convert ArcGIS postcode data")
print(f"Parquet already exists at {PARQUET_PATH}, skipping") parser.add_argument("--output", type=Path, required=True, help="Output parquet file path")
return args = parser.parse_args()
if not DOWNLOAD_PATH.exists(): with tempfile.TemporaryDirectory() as cache_dir:
download_with_progress(URL, DOWNLOAD_PATH) download_path = Path(cache_dir) / "arcgis_data.zip"
else: extract_path = Path(cache_dir) / "arcgis_extracted"
print(f"File already exists at {DOWNLOAD_PATH}, skipping download")
extract_zip(DOWNLOAD_PATH, EXTRACT_PATH) download_with_progress(URL, download_path)
convert_to_parquet(EXTRACT_PATH, PARQUET_PATH) extract_zip(download_path, extract_path)
convert_to_parquet(extract_path, args.output)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View file

@ -1,14 +1,11 @@
import argparse
import httpx import httpx
import polars as pl import polars as pl
from pathlib import Path from pathlib import Path
import tempfile
URL = "https://assets.publishing.service.gov.uk/media/691ded34513046b952c500bd/File_5_IoD2025_Scores_for_the_Indices_of_Deprivation.xlsx" URL = "https://assets.publishing.service.gov.uk/media/691ded34513046b952c500bd/File_5_IoD2025_Scores_for_the_Indices_of_Deprivation.xlsx"
BASE_DATA_PATH = Path("./data_sources")
BASE_DATA_PATH.mkdir(exist_ok=True)
XLSX_PATH = BASE_DATA_PATH / "IoD2025_Scores.xlsx"
PARQUET_PATH = BASE_DATA_PATH / "IoD2025_Scores.parquet"
def download_file(url: str, output_path: Path) -> None: def download_file(url: str, output_path: Path) -> None:
with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response: with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response:
@ -41,17 +38,17 @@ def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None:
df.write_parquet(parquet_path, compression="zstd") df.write_parquet(parquet_path, compression="zstd")
print(f"Saved to {parquet_path}") print(f"Saved to {parquet_path}")
print(f"Excel size: {xlsx_path.stat().st_size / 1024 / 1024:.1f} MB")
print(f"Parquet size: {parquet_path.stat().st_size / 1024 / 1024:.1f} MB")
def main() -> None: def main() -> None:
if not XLSX_PATH.exists(): parser = argparse.ArgumentParser(description="Download and convert Index of Deprivation data")
download_file(URL, XLSX_PATH) parser.add_argument("--output", type=Path, required=True, help="Output parquet file path")
else: args = parser.parse_args()
print(f"Excel file already exists at {XLSX_PATH}, skipping download")
convert_to_parquet(XLSX_PATH, PARQUET_PATH) with tempfile.TemporaryDirectory() as cache_dir:
xlsx_path = Path(cache_dir) / "IoD2025_Scores.xlsx"
download_file(URL, xlsx_path)
convert_to_parquet(xlsx_path, args.output)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -1,5 +1,5 @@
import json import argparse
import shutil import tempfile
import urllib.request import urllib.request
from pathlib import Path from pathlib import Path
from tempfile import mkdtemp from tempfile import mkdtemp
@ -9,9 +9,9 @@ import polars as pl
from tqdm import tqdm from tqdm import tqdm
from .config import ( from .config import (
GB_PBF_FILE, BATCH_SIZE,
GEOFABRIK_GB_URL, GEOFABRIK_GB_URL,
OUTPUT_FILE, MIN_OCCURENCE_COUNT,
POI_TAG_KEYS, POI_TAG_KEYS,
UK_BBOX_EAST, UK_BBOX_EAST,
UK_BBOX_NORTH, UK_BBOX_NORTH,
@ -19,12 +19,11 @@ from .config import (
UK_BBOX_WEST, UK_BBOX_WEST,
) )
BATCH_SIZE = 50_000
def download_pbf() -> None: def download_pbf(pbf_file: Path) -> None:
GB_PBF_FILE.parent.mkdir(parents=True, exist_ok=True) pbf_file.parent.mkdir(parents=True, exist_ok=True)
tmp = GB_PBF_FILE.with_suffix(".pbf.tmp") tmp = pbf_file.with_suffix(".pbf.tmp")
print(f"Downloading {GEOFABRIK_GB_URL}") print(f"Downloading {GEOFABRIK_GB_URL}")
with ( with (
@ -39,13 +38,11 @@ def download_pbf() -> None:
f.write(chunk) f.write(chunk)
bar.update(len(chunk)) bar.update(len(chunk))
tmp.rename(GB_PBF_FILE) tmp.rename(pbf_file)
print(f"Saved to {GB_PBF_FILE}") print(f"Saved to {pbf_file}")
class POIHandler(osmium.SimpleHandler): class POIHandler(osmium.SimpleHandler):
"""Streams OSM data, filters to UK bbox, extracts matching POIs in batches."""
def __init__(self, progress: tqdm, tmp_dir: Path) -> None: def __init__(self, progress: tqdm, tmp_dir: Path) -> None:
super().__init__() super().__init__()
self._batch: list[dict] = [] self._batch: list[dict] = []
@ -60,16 +57,12 @@ class POIHandler(osmium.SimpleHandler):
and UK_BBOX_WEST <= lon <= UK_BBOX_EAST and UK_BBOX_WEST <= lon <= UK_BBOX_EAST
) )
def _match_tags(self, tags: osmium.osm.TagList) -> str | None: def _match_tags(self, tags: osmium.osm.TagList) -> list[str]:
parts = [tags[key] for key in POI_TAG_KEYS if key in tags] return [f"{key}/{tags[key]}" for key in POI_TAG_KEYS if key in tags]
return " / ".join(parts) if parts else None
def _get_name(self, tags: osmium.osm.TagList) -> str: def _get_name(self, tags: osmium.osm.TagList) -> str:
return tags.get("name:en", tags.get("name", "")) return tags.get("name:en", tags.get("name", ""))
def _tags_to_json(self, tags: osmium.osm.TagList) -> str:
return json.dumps({tag.k: tag.v for tag in tags})
def _flush_batch(self) -> None: def _flush_batch(self) -> None:
if not self._batch: if not self._batch:
return return
@ -89,7 +82,6 @@ class POIHandler(osmium.SimpleHandler):
"category": category, "category": category,
"lat": lat, "lat": lat,
"lng": lng, "lng": lng,
"osm_tags": self._tags_to_json(tags),
} }
) )
self.poi_count += 1 self.poi_count += 1
@ -107,54 +99,27 @@ class POIHandler(osmium.SimpleHandler):
lat, lon = n.location.lat, n.location.lon lat, lon = n.location.lat, n.location.lon
if not self._in_uk(lat, lon): if not self._in_uk(lat, lon):
return return
category = self._match_tags(n.tags) categories = self._match_tags(n.tags)
if category: for category in categories:
self._add_poi(f"n{n.id}", n.tags, category, lat, lon) self._add_poi(f"n{n.id}", n.tags, category, lat, lon)
def way(self, w: osmium.osm.Way) -> None:
self._tick()
category = self._match_tags(w.tags)
if not category:
return
lats = []
lons = []
for node in w.nodes:
try:
lats.append(node.location.lat)
lons.append(node.location.lon)
except osmium.InvalidLocationError:
continue
if not lats:
return
centroid_lat = sum(lats) / len(lats)
centroid_lng = sum(lons) / len(lons)
if not self._in_uk(centroid_lat, centroid_lng):
return
self._add_poi(f"w{w.id}", w.tags, category, centroid_lat, centroid_lng)
def main() -> None: def main() -> None:
if not GB_PBF_FILE.exists(): parser = argparse.ArgumentParser(description="Download and extract POIs from OpenStreetMap")
download_pbf() parser.add_argument("--output", type=Path, required=True, help="Output parquet file path")
args = parser.parse_args()
print(f"=== POI Extraction from {GB_PBF_FILE} ===") with tempfile.TemporaryDirectory() as cache_dir:
print( pbf_file = Path(cache_dir) / "great-britain-latest.osm.pbf"
f"UK bbox: ({UK_BBOX_WEST}, {UK_BBOX_SOUTH}, {UK_BBOX_EAST}, {UK_BBOX_NORTH})"
)
print(f"Tag keys: {POI_TAG_KEYS}")
print()
if OUTPUT_FILE.exists(): if not pbf_file.exists():
print("POIs are up-to-date") download_pbf(pbf_file)
return else:
print(f"Using cached PBF file at {pbf_file}")
tmp_dir = Path(mkdtemp(prefix="pois_")) print(f"Tag keys: {POI_TAG_KEYS}")
try:
tmp_dir = Path(mkdtemp(prefix="pois_"))
with tqdm( with tqdm(
unit=" elements", unit=" elements",
unit_scale=True, unit_scale=True,
@ -163,35 +128,26 @@ def main() -> None:
mininterval=1.0, mininterval=1.0,
) as progress: ) as progress:
handler = POIHandler(progress, tmp_dir) handler = POIHandler(progress, tmp_dir)
handler.apply_file(str(GB_PBF_FILE), locations=True) handler.apply_file(str(pbf_file), locations=True)
handler._flush_batch() # write any remaining POIs handler._flush_batch() # write any remaining POIs
print(f"Extracted {handler.poi_count:,} POIs") print(f"Extracted {handler.poi_count:,} POIs")
batch_files = sorted(tmp_dir.glob("batch_*.parquet")) batch_files = sorted(tmp_dir.glob("batch_*.parquet"))
if not batch_files:
print("No POIs found.")
return
df = pl.concat([pl.scan_parquet(f) for f in batch_files]) df = pl.concat([pl.scan_parquet(f) for f in batch_files])
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) # Only keep categories with enough occurrences
df.sink_parquet(OUTPUT_FILE) valid_categories = (
print(f"Saved to {OUTPUT_FILE}")
print("\n=== Summary ===")
print(f"Total POIs: {handler.poi_count:,}")
print("\nPOIs by category:")
category_counts = (
df.group_by("category") df.group_by("category")
.agg(pl.len().alias("count")) .agg(pl.len().alias("count"))
.sort("count", descending=True) .filter(pl.col("count") >= MIN_OCCURENCE_COUNT)
.collect()
) )
for row in category_counts.iter_rows(named=True): df = df.join(valid_categories.select("category"), on="category", how="semi")
print(f" {row['category']}: {row['count']:,}")
finally: args.output.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(tmp_dir, ignore_errors=True) df.sink_parquet(args.output)
print(f"Saved to {args.output}")
print(f"Total POIs: {handler.poi_count:,}")
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -4,31 +4,29 @@ DATA_DIR = Path("./data_sources")
GB_PBF_FILE = DATA_DIR / "great-britain-latest.osm.pbf" GB_PBF_FILE = DATA_DIR / "great-britain-latest.osm.pbf"
OUTPUT_FILE = DATA_DIR / "uk_pois.parquet" OUTPUT_FILE = DATA_DIR / "uk_pois.parquet"
BATCH_SIZE = 50_000
MIN_OCCURENCE_COUNT = 20
GEOFABRIK_GB_URL = ( GEOFABRIK_GB_URL = (
"https://download.geofabrik.de/europe/great-britain-latest.osm.pbf" "https://download.geofabrik.de/europe/great-britain-latest.osm.pbf"
) )
# UK bounding box (west, south, east, north) — used for way centroid filtering
UK_BBOX_WEST = -7.57 UK_BBOX_WEST = -7.57
UK_BBOX_SOUTH = 49.96 UK_BBOX_SOUTH = 49.96
UK_BBOX_EAST = 1.68 UK_BBOX_EAST = 1.68
UK_BBOX_NORTH = 58.64 UK_BBOX_NORTH = 58.64
# OSM tag keys that indicate a POI. Any element with one of these keys is kept,
# regardless of the specific value. When multiple keys match, their values are
# concatenated with " / ".
POI_TAG_KEYS: list[str] = [ POI_TAG_KEYS: list[str] = [
"amenity", "amenity",
"shop",
"leisure",
"tourism",
"railway",
"aeroway",
"highway",
"public_transport",
"station",
"building", "building",
"military", "craft",
"emergency", "emergency",
"healthcare", "healthcare",
"leisure",
"office",
"shop",
"tourism",
"public_transport",
] ]

View file

@ -1,3 +1,5 @@
import argparse
import tempfile
import httpx import httpx
import polars as pl import polars as pl
from pathlib import Path from pathlib import Path
@ -5,11 +7,6 @@ from tqdm import tqdm
URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv" URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv"
BASE_DATA_PATH = Path("./data_sources")
BASE_DATA_PATH.mkdir(exist_ok=True)
CSV_PATH = BASE_DATA_PATH / "price-paid-complete.csv"
PARQUET_PATH = BASE_DATA_PATH / "price-paid-complete.parquet"
def download_with_progress(url: str, output_path: Path) -> None: def download_with_progress(url: str, output_path: Path) -> None:
with httpx.stream( with httpx.stream(
@ -36,7 +33,7 @@ def download_with_progress(url: str, output_path: Path) -> None:
pbar.update(len(chunk)) pbar.update(len(chunk))
return return
def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
"""Convert CSV to Parquet using Polars.""" """Convert CSV to Parquet using Polars."""
print("Converting to Parquet...") print("Converting to Parquet...")
@ -69,22 +66,22 @@ def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
try_parse_dates=True, try_parse_dates=True,
) )
parquet_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Columns: {df.collect_schema().names()}")
df.write_parquet(parquet_path, compression="zstd") df.write_parquet(parquet_path, compression="zstd")
print(f"Saved to {parquet_path}") print(f"Saved to {parquet_path}")
print(f"Rows: {df.height:,}")
def main() -> None: def main() -> None:
if PARQUET_PATH.exists(): parser = argparse.ArgumentParser(description="Download and convert Land Registry price-paid data")
print(f"Parquet already exists at {PARQUET_PATH}, skipping") parser.add_argument("--output", type=Path, required=True, help="Output parquet file path")
return args = parser.parse_args()
if not CSV_PATH.exists(): with tempfile.TemporaryDirectory() as cache_dir:
download_with_progress(URL, CSV_PATH) csv_path = Path(cache_dir) / "price-paid-complete.csv"
else:
print(f"CSV already exists at {CSV_PATH}, skipping download")
convert_to_parquet(CSV_PATH, PARQUET_PATH) download_with_progress(URL, csv_path)
convert_to_parquet(csv_path, args.output)
if __name__ == "__main__": if __name__ == "__main__":