"""Download OS Open Greenspace and extract access points. Downloads the OS Open Greenspace dataset as ESRI Shapefile and extracts access point locations (park entrances). Each access point is tagged with its parent site's function type (e.g. Public Park Or Garden). Sites without access points fall back to polygon centroids. Using access points rather than polygon centroids gives much more accurate distance calculations — a property next to Hyde Park won't show 400m just because the centroid is in the middle of the park. Source: https://osdatahub.os.uk/downloads/open/OpenGreenspace License: Open Government Licence v3.0 """ import argparse import tempfile from pathlib import Path import numpy as np import polars as pl import shapefile as shp from pyproj import Transformer from shapely.geometry import shape as to_shapely from pipeline.utils.download import download, extract_zip URL = "https://api.os.uk/downloads/v1/products/OpenGreenspace/downloads?area=GB&format=ESRI%C2%AE+Shapefile&redirect" _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) def _find_field(field_names: list[str], *needles: str) -> int | None: """Find the index of the first field whose lowercased name contains any needle.""" for i, name in enumerate(field_names): lower = name.lower() for needle in needles: if needle in lower: return i return None def _read_site_functions(shp_path: Path) -> dict[str, str]: """Build a mapping from site ID → function type from the GreenspaceSite shapefile.""" reader = shp.Reader(str(shp_path), encoding="latin-1") field_names = [f[0] for f in reader.fields[1:]] id_idx = _find_field(field_names, "id") func_idx = _find_field(field_names, "funct") if id_idx is None or func_idx is None: raise ValueError(f"Missing id/function fields. Available: {field_names}") site_funcs = {} for rec in reader.iterRecords(): site_funcs[rec[id_idx]] = rec[func_idx] print(f" Loaded {len(site_funcs):,} site function mappings") return site_funcs def _read_access_points( shp_path: Path, site_funcs: dict[str, str] ) -> tuple[list[float], list[float], list[str]]: """Read access points, tagging each with its parent site's function.""" reader = shp.Reader(str(shp_path), encoding="latin-1") field_names = [f[0] for f in reader.fields[1:]] # The access point shapefile has a reference field linking to the parent site ref_idx = _find_field(field_names, "refto", "ref_to", "greensp") if ref_idx is None: raise ValueError( f"No site reference field found in access points. Available: {field_names}" ) lats: list[float] = [] lngs: list[float] = [] categories: list[str] = [] skipped = 0 for sr in reader.shapeRecords(): site_id = sr.record[ref_idx] func = site_funcs.get(site_id) if func is None: skipped += 1 continue try: geom = to_shapely(sr.shape.__geo_interface__) if geom.is_empty: continue lng, lat = _to_wgs84.transform(geom.x, geom.y) except Exception: continue lats.append(lat) lngs.append(lng) categories.append(func) if skipped: print(f" Skipped {skipped:,} access points with unknown site ID") return lats, lngs, categories def _read_site_centroids( shp_path: Path, site_funcs: dict[str, str], covered_ids: set[str] ) -> tuple[list[float], list[float], list[str]]: """Read polygon centroids for sites that have no access points (fallback).""" reader = shp.Reader(str(shp_path), encoding="latin-1") field_names = [f[0] for f in reader.fields[1:]] id_idx = _find_field(field_names, "id") func_idx = _find_field(field_names, "funct") if id_idx is None or func_idx is None: return [], [], [] lats: list[float] = [] lngs: list[float] = [] categories: list[str] = [] for sr in reader.shapeRecords(): site_id = sr.record[id_idx] if site_id in covered_ids: continue func = sr.record[func_idx] try: geom = to_shapely(sr.shape.__geo_interface__) if geom.is_empty or not geom.is_valid: continue centroid = geom.centroid lng, lat = _to_wgs84.transform(centroid.x, centroid.y) except Exception: continue lats.append(lat) lngs.append(lng) categories.append(func) return lats, lngs, categories def download_greenspace(output: Path) -> None: output.parent.mkdir(parents=True, exist_ok=True) with tempfile.TemporaryDirectory() as cache_dir: zip_path = Path(cache_dir) / "greenspace.zip" extract_dir = Path(cache_dir) / "extracted" download(URL, zip_path, timeout=300) extract_zip(zip_path, extract_dir) # Find both shapefiles site_shps = list(extract_dir.rglob("*GreenspaceSite*.shp")) access_shps = list(extract_dir.rglob("*AccessPoint*.shp")) if not site_shps: raise FileNotFoundError("No GreenspaceSite shapefile found") if not access_shps: raise FileNotFoundError("No AccessPoint shapefile found") # Step 1: Build site ID → function mapping print(f"Reading {site_shps[0].name} for function types...") site_funcs = _read_site_functions(site_shps[0]) # Step 2: Read access points (primary — park entrances) print(f"Reading {access_shps[0].name}...") ap_lats, ap_lngs, ap_cats = _read_access_points(access_shps[0], site_funcs) print(f" {len(ap_lats):,} access points loaded") # Step 3: Fall back to centroids for sites without any access points covered_ids = set() reader = shp.Reader(str(access_shps[0]), encoding="latin-1") field_names = [f[0] for f in reader.fields[1:]] ref_idx = _find_field(field_names, "refto", "ref_to", "greensp") if ref_idx is not None: for rec in reader.iterRecords(): covered_ids.add(rec[ref_idx]) print("Adding centroids for sites without access points...") fb_lats, fb_lngs, fb_cats = _read_site_centroids( site_shps[0], site_funcs, covered_ids ) print(f" {len(fb_lats):,} centroid fallbacks added") lats = ap_lats + fb_lats lngs = ap_lngs + fb_lngs categories = ap_cats + fb_cats df = pl.DataFrame( { "lat": np.array(lats, dtype=np.float64), "lng": np.array(lngs, dtype=np.float64), "category": categories, } ) df.write_parquet(output) size_mb = output.stat().st_size / (1024 * 1024) print(f"Wrote {output} ({size_mb:.1f} MB, {len(df):,} points)") counts = df.group_by("category").len().sort("len", descending=True) for row in counts.iter_rows(named=True): print(f" {row['category']}: {row['len']:,}") def main() -> None: parser = argparse.ArgumentParser( description="Download OS Open Greenspace access points" ) parser.add_argument( "--output", type=Path, required=True, help="Output parquet file path" ) args = parser.parse_args() download_greenspace(args.output) if __name__ == "__main__": main()