fix greenspace and add more schools

This commit is contained in:
Andras Schmelczer 2026-03-25 08:06:16 +00:00
parent 3a3e249bdd
commit 9cd2b8849c
4 changed files with 161 additions and 61 deletions

View file

@ -1,8 +1,13 @@
"""Download OS Open Greenspace and extract site centroids. """Download OS Open Greenspace and extract access points.
Downloads the OS Open Greenspace dataset as ESRI Shapefile, computes Downloads the OS Open Greenspace dataset as ESRI Shapefile and extracts
WGS84 centroids for each greenspace site polygon, and outputs a parquet access point locations (park entrances). Each access point is tagged with
with lat/lng/category columns compatible with the POI proximity pipeline. its parent site's function type (e.g. Public Park Or Garden). Sites without
access points fall back to polygon centroids.
Using access points rather than polygon centroids gives much more accurate
distance calculations a property next to Hyde Park won't show 400m just
because the centroid is in the middle of the park.
Source: https://osdatahub.os.uk/downloads/open/OpenGreenspace Source: https://osdatahub.os.uk/downloads/open/OpenGreenspace
License: Open Government Licence v3.0 License: Open Government Licence v3.0
@ -25,61 +30,99 @@ URL = "https://api.os.uk/downloads/v1/products/OpenGreenspace/downloads?area=GB&
_to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True) _to_wgs84 = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True)
def download_greenspace(output: Path) -> None: def _find_field(field_names: list[str], *needles: str) -> int | None:
output.parent.mkdir(parents=True, exist_ok=True) """Find the index of the first field whose lowercased name contains any needle."""
for i, name in enumerate(field_names):
lower = name.lower()
for needle in needles:
if needle in lower:
return i
return None
with tempfile.TemporaryDirectory() as cache_dir:
zip_path = Path(cache_dir) / "greenspace.zip"
extract_dir = Path(cache_dir) / "extracted"
download(URL, zip_path, timeout=300) def _read_site_functions(shp_path: Path) -> dict[str, str]:
extract_zip(zip_path, extract_dir) """Build a mapping from site ID → function type from the GreenspaceSite shapefile."""
reader = shp.Reader(str(shp_path), encoding="latin-1")
field_names = [f[0] for f in reader.fields[1:]]
# Find the GreenspaceSite shapefile (not the AccessPoint one) id_idx = _find_field(field_names, "id")
shp_files = list(extract_dir.rglob("*GreenspaceSite*.shp")) func_idx = _find_field(field_names, "funct")
if not shp_files: if id_idx is None or func_idx is None:
shp_files = [ raise ValueError(f"Missing id/function fields. Available: {field_names}")
f
for f in extract_dir.rglob("*.shp")
if "AccessPoint" not in f.name
]
if not shp_files:
raise FileNotFoundError(
"No GreenspaceSite shapefile found in download"
)
print(f"Reading {shp_files[0].name}...") site_funcs = {}
reader = shp.Reader(str(shp_files[0]), encoding="latin-1") for rec in reader.iterRecords():
site_funcs[rec[id_idx]] = rec[func_idx]
# Find the "function" field (greenspace type) print(f" Loaded {len(site_funcs):,} site function mappings")
field_names = [f[0] for f in reader.fields[1:]] # skip deletion flag return site_funcs
func_field = None
for name in field_names:
if "funct" in name.lower(): def _read_access_points(
func_field = name shp_path: Path, site_funcs: dict[str, str]
break ) -> tuple[list[float], list[float], list[str]]:
if func_field is None: """Read access points, tagging each with its parent site's function."""
reader = shp.Reader(str(shp_path), encoding="latin-1")
field_names = [f[0] for f in reader.fields[1:]]
# The access point shapefile has a reference field linking to the parent site
ref_idx = _find_field(field_names, "refto", "ref_to", "greensp")
if ref_idx is None:
raise ValueError( raise ValueError(
f"No 'function' field found. Available: {field_names}" f"No site reference field found in access points. Available: {field_names}"
) )
func_idx = field_names.index(func_field)
# Find a name field if available lats: list[float] = []
name_idx = None lngs: list[float] = []
for name in field_names: categories: list[str] = []
if "distname" in name.lower(): skipped = 0
name_idx = field_names.index(name)
break
lats = []
lngs = []
categories = []
names = []
for sr in reader.shapeRecords(): for sr in reader.shapeRecords():
func = sr.record[func_idx] site_id = sr.record[ref_idx]
site_name = sr.record[name_idx] if name_idx is not None else "" func = site_funcs.get(site_id)
if func is None:
skipped += 1
continue
try:
geom = to_shapely(sr.shape.__geo_interface__)
if geom.is_empty:
continue
lng, lat = _to_wgs84.transform(geom.x, geom.y)
except Exception:
continue
lats.append(lat)
lngs.append(lng)
categories.append(func)
if skipped:
print(f" Skipped {skipped:,} access points with unknown site ID")
return lats, lngs, categories
def _read_site_centroids(
shp_path: Path, site_funcs: dict[str, str], covered_ids: set[str]
) -> tuple[list[float], list[float], list[str]]:
"""Read polygon centroids for sites that have no access points (fallback)."""
reader = shp.Reader(str(shp_path), encoding="latin-1")
field_names = [f[0] for f in reader.fields[1:]]
id_idx = _find_field(field_names, "id")
func_idx = _find_field(field_names, "funct")
if id_idx is None or func_idx is None:
return [], [], []
lats: list[float] = []
lngs: list[float] = []
categories: list[str] = []
for sr in reader.shapeRecords():
site_id = sr.record[id_idx]
if site_id in covered_ids:
continue
func = sr.record[func_idx]
try: try:
geom = to_shapely(sr.shape.__geo_interface__) geom = to_shapely(sr.shape.__geo_interface__)
if geom.is_empty or not geom.is_valid: if geom.is_empty or not geom.is_valid:
@ -92,20 +135,68 @@ def download_greenspace(output: Path) -> None:
lats.append(lat) lats.append(lat)
lngs.append(lng) lngs.append(lng)
categories.append(func) categories.append(func)
names.append(site_name or "")
return lats, lngs, categories
def download_greenspace(output: Path) -> None:
output.parent.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory() as cache_dir:
zip_path = Path(cache_dir) / "greenspace.zip"
extract_dir = Path(cache_dir) / "extracted"
download(URL, zip_path, timeout=300)
extract_zip(zip_path, extract_dir)
# Find both shapefiles
site_shps = list(extract_dir.rglob("*GreenspaceSite*.shp"))
access_shps = list(extract_dir.rglob("*AccessPoint*.shp"))
if not site_shps:
raise FileNotFoundError("No GreenspaceSite shapefile found")
if not access_shps:
raise FileNotFoundError("No AccessPoint shapefile found")
# Step 1: Build site ID → function mapping
print(f"Reading {site_shps[0].name} for function types...")
site_funcs = _read_site_functions(site_shps[0])
# Step 2: Read access points (primary — park entrances)
print(f"Reading {access_shps[0].name}...")
ap_lats, ap_lngs, ap_cats = _read_access_points(access_shps[0], site_funcs)
print(f" {len(ap_lats):,} access points loaded")
# Step 3: Fall back to centroids for sites without any access points
covered_ids = set()
reader = shp.Reader(str(access_shps[0]), encoding="latin-1")
field_names = [f[0] for f in reader.fields[1:]]
ref_idx = _find_field(field_names, "refto", "ref_to", "greensp")
if ref_idx is not None:
for rec in reader.iterRecords():
covered_ids.add(rec[ref_idx])
print("Adding centroids for sites without access points...")
fb_lats, fb_lngs, fb_cats = _read_site_centroids(
site_shps[0], site_funcs, covered_ids
)
print(f" {len(fb_lats):,} centroid fallbacks added")
lats = ap_lats + fb_lats
lngs = ap_lngs + fb_lngs
categories = ap_cats + fb_cats
df = pl.DataFrame( df = pl.DataFrame(
{ {
"lat": np.array(lats, dtype=np.float64), "lat": np.array(lats, dtype=np.float64),
"lng": np.array(lngs, dtype=np.float64), "lng": np.array(lngs, dtype=np.float64),
"category": categories, "category": categories,
"name": names,
} }
) )
df.write_parquet(output) df.write_parquet(output)
size_mb = output.stat().st_size / (1024 * 1024) size_mb = output.stat().st_size / (1024 * 1024)
print(f"Wrote {output} ({size_mb:.1f} MB, {len(df):,} greenspace sites)") print(f"Wrote {output} ({size_mb:.1f} MB, {len(df):,} points)")
counts = df.group_by("category").len().sort("len", descending=True) counts = df.group_by("category").len().sort("len", descending=True)
for row in counts.iter_rows(named=True): for row in counts.iter_rows(named=True):
@ -114,7 +205,7 @@ def download_greenspace(output: Path) -> None:
def main() -> None: def main() -> None:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Download OS Open Greenspace site centroids" description="Download OS Open Greenspace access points"
) )
parser.add_argument( parser.add_argument(
"--output", type=Path, required=True, help="Output parquet file path" "--output", type=Path, required=True, help="Output parquet file path"

View file

@ -12,7 +12,7 @@ from io import BytesIO
from pathlib import Path from pathlib import Path
PROTOMAPS_BASE = "https://build.protomaps.com" PROTOMAPS_BASE = "https://build.protomaps.com"
UK_BBOX = "-10.5,49.5,2.5,61" UK_BBOX = "-10.5,49,5,61"
MAX_AGE_DAYS = 14 MAX_AGE_DAYS = 14

View file

@ -59,6 +59,8 @@ _AREA_COLUMNS = [
# Schools # Schools
"Good+ primary schools within 5km", "Good+ primary schools within 5km",
"Good+ secondary schools within 5km", "Good+ secondary schools within 5km",
"Good+ primary schools within 2km",
"Good+ secondary schools within 2km",
# Demographics # Demographics
"Median age", "Median age",
] ]
@ -331,6 +333,8 @@ def _build(
"noise_lden_db": "Noise (dB)", "noise_lden_db": "Noise (dB)",
"good_primary_5km": "Good+ primary schools within 5km", "good_primary_5km": "Good+ primary schools within 5km",
"good_secondary_5km": "Good+ secondary schools within 5km", "good_secondary_5km": "Good+ secondary schools within 5km",
"good_primary_2km": "Good+ primary schools within 2km",
"good_secondary_2km": "Good+ secondary schools within 2km",
"max_download_speed": "Max available download speed (Mbps)", "max_download_speed": "Max available download speed (Mbps)",
"serious_crime_avg_yr": "Serious crime (avg/yr)", "serious_crime_avg_yr": "Serious crime (avg/yr)",
"minor_crime_avg_yr": "Minor crime (avg/yr)", "minor_crime_avg_yr": "Minor crime (avg/yr)",

View file

@ -60,9 +60,14 @@ def main():
# Load all postcodes for proximity counting # Load all postcodes for proximity counting
postcodes = arcgis.rename({"lng": "lon"}) postcodes = arcgis.rename({"lng": "lon"})
result = count_pois_per_postcode( counts_5km = count_pois_per_postcode(
postcodes, schools, radius_km=5, groups=SCHOOL_GROUPS postcodes, schools, radius_km=5, groups=SCHOOL_GROUPS
) )
counts_2km = count_pois_per_postcode(
postcodes, schools, radius_km=2, groups=SCHOOL_GROUPS
)
result = counts_5km.join(counts_2km, on="postcode")
result.write_parquet(args.output) result.write_parquet(args.output)
size_mb = args.output.stat().st_size / (1024 * 1024) size_mb = args.output.stat().st_size / (1024 * 1024)