40 lines
1.3 KiB
Python
40 lines
1.3 KiB
Python
import sqlite3
|
|
from pathlib import Path
|
|
|
|
from shapely import wkb
|
|
from shapely.geometry import MultiPolygon, Polygon
|
|
|
|
_ENVELOPE_SIZES = {0: 0, 1: 32, 2: 48, 3: 48, 4: 64}
|
|
|
|
|
|
def parse_gpkg_geometry(blob: bytes):
|
|
"""Extract a Shapely geometry from a GeoPackage binary blob."""
|
|
flags = blob[3]
|
|
envelope_type = (flags >> 1) & 0x07
|
|
envelope_size = _ENVELOPE_SIZES.get(envelope_type)
|
|
if envelope_size is None:
|
|
raise ValueError(
|
|
f"Unknown GeoPackage envelope type {envelope_type} (expected 0-4)"
|
|
)
|
|
header_size = 8 + envelope_size
|
|
return wkb.loads(blob[header_size:])
|
|
|
|
|
|
def load_oa_boundaries(gpkg_path: Path) -> dict[str, Polygon | MultiPolygon]:
|
|
"""Load OA boundary polygons from a GeoPackage. Geometry is already in BNG."""
|
|
print("Loading OA boundaries...")
|
|
|
|
conn = sqlite3.connect(str(gpkg_path))
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT OA21CD, SHAPE FROM OA_2021_EW_BGC_V2")
|
|
|
|
oa_geoms: dict[str, Polygon | MultiPolygon] = {}
|
|
for oa_code, blob in cur:
|
|
geom = parse_gpkg_geometry(bytes(blob))
|
|
if geom.geom_type == "MultiPolygon" and len(geom.geoms) == 1:
|
|
geom = geom.geoms[0]
|
|
oa_geoms[oa_code] = geom
|
|
|
|
conn.close()
|
|
print(f" Loaded {len(oa_geoms)} OA boundaries")
|
|
return oa_geoms
|