diff --git a/pipeline/download/noise.py b/pipeline/download/noise.py new file mode 100644 index 0000000..ba91ff7 --- /dev/null +++ b/pipeline/download/noise.py @@ -0,0 +1,279 @@ +"""Download Defra Round 4 (2022) strategic noise data for England. + +Downloads modelled noise levels (road, rail, airport) as GeoTIFF rasters via +WCS, then samples noise values at postcode centroids. Outputs a parquet file +with postcode-level noise in dB for each source. + +Uses 100km tiles (~42 per source) to balance request size vs count. The server +times out on tiles larger than ~150km at 100m resolution. + +Data source: Defra Strategic Noise Mapping Round 4 (2022) + - Lden = day-evening-night 24h weighted average (the EU standard metric) + - 10m grid, modelled at 4m above ground +License: Open Government Licence v3.0 + +Note: Road/rail use WCS 1.0.0; airport requires WCS 2.0.1 (Defra's 1.0.0 +endpoint is broken for that coverage). +""" + +import argparse +import tempfile +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path + +import httpx +import numpy as np +import polars as pl +import rasterio +from pyproj import Transformer +from rasterio.merge import merge +from rasterio.transform import rowcol + +# Noise sources: (label, column_name, WCS base URL, coverage ID, WCS version) +# Road/rail work with WCS 1.0.0; airport requires WCS 2.0.1. +NOISE_SOURCES = [ + ( + "Road", + "road_noise_lden_db", + "https://environment.data.gov.uk/spatialdata/road-noise-all-metrics-england-round-4/wcs", + "Road_Noise_Lden_England_Round_4_All", + "1.0.0", + ), + ( + "Rail", + "rail_noise_lden_db", + "https://environment.data.gov.uk/spatialdata/noise-data/wcs", + "Rail_Noise_Lden_England_Round_4_All", + "1.0.0", + ), + ( + "Airport", + "airport_noise_lden_db", + "https://environment.data.gov.uk/spatialdata/airport-noise-all-metrics-england-round-4/wcs", + "dac9cba4-abe7-43bd-b8e9-8a83da52edd8__Airport_Noise_ALL_Lden", + "2.0.1", + ), +] + +# England extent in EPSG:27700 (British National Grid), rounded outward +BNG_MIN_E = 80_000 +BNG_MAX_E = 660_000 +BNG_MIN_N = 0 +BNG_MAX_N = 660_000 + +# Tile size in metres (100km balances request size vs count; 300km causes 504s) +TILE_SIZE = 100_000 + +# Max concurrent tile downloads +MAX_WORKERS = 4 + +# Native raster resolution (10m grid) +NATIVE_RESOLUTION = 10 + +# Request pixel resolution in metres (100m is sufficient for postcode-level data +# and keeps download size ~100x smaller than native 10m) +RESOLUTION = 100 + + +def _wcs_get_coverage_url( + wcs_base: str, + coverage_id: str, + min_e: int, + min_n: int, + max_e: int, + max_n: int, + wcs_version: str = "1.0.0", +) -> str: + """Build a WCS GetCoverage URL for a BNG bounding box.""" + if wcs_version == "2.0.1": + return ( + f"{wcs_base}?" + f"service=WCS&version=2.0.1&request=GetCoverage" + f"&coverageId={coverage_id}" + f"&format=image/tiff" + f"&subsettingCRS=EPSG:27700" + f"&subset=E({min_e},{max_e})" + f"&subset=N({min_n},{max_n})" + f"&scaleFactor={NATIVE_RESOLUTION / RESOLUTION}" + ) + width = (max_e - min_e) // RESOLUTION + height = (max_n - min_n) // RESOLUTION + return ( + f"{wcs_base}?" + f"service=WCS&version=1.0.0&request=GetCoverage" + f"&coverage={coverage_id}" + f"&CRS=EPSG:27700" + f"&BBOX={min_e},{min_n},{max_e},{max_n}" + f"&width={width}&height={height}" + f"&format=GeoTIFF" + ) + + +_TO_BNG = Transformer.from_crs("EPSG:4326", "EPSG:27700", always_xy=True) + + +def _bng_from_latlon(lat: np.ndarray, lon: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """Convert WGS84 lat/lon to British National Grid easting/northing.""" + return _TO_BNG.transform(lon, lat) # pyproj takes (x=lon, y=lat) + + +def _download_tile( + wcs_base: str, + coverage_id: str, + min_e: int, + min_n: int, + max_e: int, + max_n: int, + tile_path: Path, + wcs_version: str = "1.0.0", +) -> Path | None: + """Download a single WCS tile. Returns path if successful, None otherwise.""" + url = _wcs_get_coverage_url(wcs_base, coverage_id, min_e, min_n, max_e, max_n, wcs_version) + try: + with httpx.Client(timeout=300, follow_redirects=True) as client: + resp = client.get(url) + resp.raise_for_status() + + content_type = resp.headers.get("content-type", "") + if "tiff" not in content_type and resp.content[:4] not in (b"II*\x00", b"MM\x00*"): + return None + + tile_path.write_bytes(resp.content) + return tile_path + except (httpx.HTTPStatusError, httpx.TimeoutException, httpx.ConnectError) as e: + print(f" Failed to download tile ({min_e},{min_n})-({max_e},{max_n}): {e}") + return None + + +def download_raster( + tile_dir: Path, wcs_base: str, coverage_id: str, label: str, wcs_version: str = "1.0.0" +) -> list[Path]: + """Download noise GeoTIFF raster covering England, returning paths to saved files.""" + tiles = [] + for min_e in range(BNG_MIN_E, BNG_MAX_E, TILE_SIZE): + for min_n in range(BNG_MIN_N, BNG_MAX_N, TILE_SIZE): + max_e = min(min_e + TILE_SIZE, BNG_MAX_E) + max_n = min(min_n + TILE_SIZE, BNG_MAX_N) + tiles.append((min_e, min_n, max_e, max_n)) + + print(f"[{label}] Downloading {len(tiles)} tiles at {RESOLUTION}m resolution ({MAX_WORKERS} workers)...") + paths = [] + completed = 0 + + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = {} + for min_e, min_n, max_e, max_n in tiles: + tile_path = tile_dir / f"tile_{min_e}_{min_n}.tif" + fut = executor.submit( + _download_tile, wcs_base, coverage_id, + min_e, min_n, max_e, max_n, tile_path, wcs_version, + ) + futures[fut] = (min_e, min_n) + + for fut in as_completed(futures): + completed += 1 + result = fut.result() + if result is not None: + paths.append(result) + print( + f"\r [{completed}/{len(tiles)}] Downloaded {len(paths)} valid tiles", + end="", + flush=True, + ) + + print(f"\n[{label}] Downloaded {len(paths)}/{len(tiles)} tiles") + return paths + + +def sample_noise_at_postcodes( + tile_paths: list[Path], + easting: np.ndarray, + northing: np.ndarray, + label: str, +) -> np.ndarray: + """Sample noise values from merged tiles at given BNG coordinates.""" + print(f"[{label}] Merging {len(tile_paths)} tiles...") + datasets = [rasterio.open(p) for p in tile_paths] + raster_nodata = datasets[0].nodata + mosaic, mosaic_transform = merge(datasets) + for ds in datasets: + ds.close() + + noise_grid = mosaic[0] + + print(f"[{label}] Sampling noise values at postcode centroids...") + rows, cols = rowcol(mosaic_transform, easting, northing) + rows = np.asarray(rows) + cols = np.asarray(cols) + + h, w = noise_grid.shape + in_bounds = (rows >= 0) & (rows < h) & (cols >= 0) & (cols < w) + + noise_db = np.full(len(easting), np.nan, dtype=np.float32) + valid_rows = rows[in_bounds] + valid_cols = cols[in_bounds] + sampled = noise_grid[valid_rows, valid_cols].astype(np.float32) + + # Mark nodata and zero (unmapped areas) as NaN. + # Road/rail use nodata=-96, airport uses nodata=3.4e38. + if raster_nodata is not None: + sampled[np.isclose(sampled, np.float32(raster_nodata), rtol=1e-5)] = np.nan + sampled[sampled == 0] = np.nan + noise_db[in_bounds] = sampled + + valid_count = int(np.sum(~np.isnan(noise_db))) + print(f"[{label}] Sampled {valid_count:,} / {len(easting):,} postcodes with noise data") + return noise_db + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download Defra noise data (road, rail, airport) and sample at postcode centroids" + ) + parser.add_argument( + "--arcgis", + type=Path, + required=True, + help="ArcGIS postcode data parquet (for lat/lon coordinates)", + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) + args = parser.parse_args() + + args.output.parent.mkdir(parents=True, exist_ok=True) + + print("Loading postcode coordinates...") + postcodes = pl.read_parquet( + args.arcgis, columns=["pcds", "lat", "long"] + ).rename({"pcds": "postcode", "long": "lon"}) + + lat = postcodes["lat"].to_numpy() + lon = postcodes["lon"].to_numpy() + + print("Converting lat/lon to BNG...") + easting, northing = _bng_from_latlon(lat, lon) + + result = postcodes.select("postcode") + + with tempfile.TemporaryDirectory() as tmp: + for label, col_name, wcs_base, coverage_id, wcs_version in NOISE_SOURCES: + tile_dir = Path(tmp) / label.lower() + tile_dir.mkdir() + tile_paths = download_raster(tile_dir, wcs_base, coverage_id, label, wcs_version) + + if not tile_paths: + print(f"[{label}] WARNING: No tiles downloaded — column will be all null") + noise_db = np.full(len(lat), np.nan, dtype=np.float32) + else: + noise_db = sample_noise_at_postcodes(tile_paths, easting, northing, label) + + result = result.with_columns(pl.Series(col_name, noise_db)) + + result.write_parquet(args.output, compression="zstd") + size_mb = args.output.stat().st_size / (1024 * 1024) + print(f"Wrote {args.output} ({size_mb:.1f} MB)") + + +if __name__ == "__main__": + main()