Add noise data

This commit is contained in:
Andras Schmelczer 2026-02-01 11:08:49 +00:00
parent 5f311233e4
commit a45495269e

279
pipeline/download/noise.py Normal file
View file

@ -0,0 +1,279 @@
"""Download Defra Round 4 (2022) strategic noise data for England.
Downloads modelled noise levels (road, rail, airport) as GeoTIFF rasters via
WCS, then samples noise values at postcode centroids. Outputs a parquet file
with postcode-level noise in dB for each source.
Uses 100km tiles (~42 per source) to balance request size vs count. The server
times out on tiles larger than ~150km at 100m resolution.
Data source: Defra Strategic Noise Mapping Round 4 (2022)
- Lden = day-evening-night 24h weighted average (the EU standard metric)
- 10m grid, modelled at 4m above ground
License: Open Government Licence v3.0
Note: Road/rail use WCS 1.0.0; airport requires WCS 2.0.1 (Defra's 1.0.0
endpoint is broken for that coverage).
"""
import argparse
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import httpx
import numpy as np
import polars as pl
import rasterio
from pyproj import Transformer
from rasterio.merge import merge
from rasterio.transform import rowcol
# Noise sources: (label, column_name, WCS base URL, coverage ID, WCS version)
# Road/rail work with WCS 1.0.0; airport requires WCS 2.0.1.
NOISE_SOURCES = [
(
"Road",
"road_noise_lden_db",
"https://environment.data.gov.uk/spatialdata/road-noise-all-metrics-england-round-4/wcs",
"Road_Noise_Lden_England_Round_4_All",
"1.0.0",
),
(
"Rail",
"rail_noise_lden_db",
"https://environment.data.gov.uk/spatialdata/noise-data/wcs",
"Rail_Noise_Lden_England_Round_4_All",
"1.0.0",
),
(
"Airport",
"airport_noise_lden_db",
"https://environment.data.gov.uk/spatialdata/airport-noise-all-metrics-england-round-4/wcs",
"dac9cba4-abe7-43bd-b8e9-8a83da52edd8__Airport_Noise_ALL_Lden",
"2.0.1",
),
]
# England extent in EPSG:27700 (British National Grid), rounded outward
BNG_MIN_E = 80_000
BNG_MAX_E = 660_000
BNG_MIN_N = 0
BNG_MAX_N = 660_000
# Tile size in metres (100km balances request size vs count; 300km causes 504s)
TILE_SIZE = 100_000
# Max concurrent tile downloads
MAX_WORKERS = 4
# Native raster resolution (10m grid)
NATIVE_RESOLUTION = 10
# Request pixel resolution in metres (100m is sufficient for postcode-level data
# and keeps download size ~100x smaller than native 10m)
RESOLUTION = 100
def _wcs_get_coverage_url(
wcs_base: str,
coverage_id: str,
min_e: int,
min_n: int,
max_e: int,
max_n: int,
wcs_version: str = "1.0.0",
) -> str:
"""Build a WCS GetCoverage URL for a BNG bounding box."""
if wcs_version == "2.0.1":
return (
f"{wcs_base}?"
f"service=WCS&version=2.0.1&request=GetCoverage"
f"&coverageId={coverage_id}"
f"&format=image/tiff"
f"&subsettingCRS=EPSG:27700"
f"&subset=E({min_e},{max_e})"
f"&subset=N({min_n},{max_n})"
f"&scaleFactor={NATIVE_RESOLUTION / RESOLUTION}"
)
width = (max_e - min_e) // RESOLUTION
height = (max_n - min_n) // RESOLUTION
return (
f"{wcs_base}?"
f"service=WCS&version=1.0.0&request=GetCoverage"
f"&coverage={coverage_id}"
f"&CRS=EPSG:27700"
f"&BBOX={min_e},{min_n},{max_e},{max_n}"
f"&width={width}&height={height}"
f"&format=GeoTIFF"
)
_TO_BNG = Transformer.from_crs("EPSG:4326", "EPSG:27700", always_xy=True)
def _bng_from_latlon(lat: np.ndarray, lon: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""Convert WGS84 lat/lon to British National Grid easting/northing."""
return _TO_BNG.transform(lon, lat) # pyproj takes (x=lon, y=lat)
def _download_tile(
wcs_base: str,
coverage_id: str,
min_e: int,
min_n: int,
max_e: int,
max_n: int,
tile_path: Path,
wcs_version: str = "1.0.0",
) -> Path | None:
"""Download a single WCS tile. Returns path if successful, None otherwise."""
url = _wcs_get_coverage_url(wcs_base, coverage_id, min_e, min_n, max_e, max_n, wcs_version)
try:
with httpx.Client(timeout=300, follow_redirects=True) as client:
resp = client.get(url)
resp.raise_for_status()
content_type = resp.headers.get("content-type", "")
if "tiff" not in content_type and resp.content[:4] not in (b"II*\x00", b"MM\x00*"):
return None
tile_path.write_bytes(resp.content)
return tile_path
except (httpx.HTTPStatusError, httpx.TimeoutException, httpx.ConnectError) as e:
print(f" Failed to download tile ({min_e},{min_n})-({max_e},{max_n}): {e}")
return None
def download_raster(
tile_dir: Path, wcs_base: str, coverage_id: str, label: str, wcs_version: str = "1.0.0"
) -> list[Path]:
"""Download noise GeoTIFF raster covering England, returning paths to saved files."""
tiles = []
for min_e in range(BNG_MIN_E, BNG_MAX_E, TILE_SIZE):
for min_n in range(BNG_MIN_N, BNG_MAX_N, TILE_SIZE):
max_e = min(min_e + TILE_SIZE, BNG_MAX_E)
max_n = min(min_n + TILE_SIZE, BNG_MAX_N)
tiles.append((min_e, min_n, max_e, max_n))
print(f"[{label}] Downloading {len(tiles)} tiles at {RESOLUTION}m resolution ({MAX_WORKERS} workers)...")
paths = []
completed = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {}
for min_e, min_n, max_e, max_n in tiles:
tile_path = tile_dir / f"tile_{min_e}_{min_n}.tif"
fut = executor.submit(
_download_tile, wcs_base, coverage_id,
min_e, min_n, max_e, max_n, tile_path, wcs_version,
)
futures[fut] = (min_e, min_n)
for fut in as_completed(futures):
completed += 1
result = fut.result()
if result is not None:
paths.append(result)
print(
f"\r [{completed}/{len(tiles)}] Downloaded {len(paths)} valid tiles",
end="",
flush=True,
)
print(f"\n[{label}] Downloaded {len(paths)}/{len(tiles)} tiles")
return paths
def sample_noise_at_postcodes(
tile_paths: list[Path],
easting: np.ndarray,
northing: np.ndarray,
label: str,
) -> np.ndarray:
"""Sample noise values from merged tiles at given BNG coordinates."""
print(f"[{label}] Merging {len(tile_paths)} tiles...")
datasets = [rasterio.open(p) for p in tile_paths]
raster_nodata = datasets[0].nodata
mosaic, mosaic_transform = merge(datasets)
for ds in datasets:
ds.close()
noise_grid = mosaic[0]
print(f"[{label}] Sampling noise values at postcode centroids...")
rows, cols = rowcol(mosaic_transform, easting, northing)
rows = np.asarray(rows)
cols = np.asarray(cols)
h, w = noise_grid.shape
in_bounds = (rows >= 0) & (rows < h) & (cols >= 0) & (cols < w)
noise_db = np.full(len(easting), np.nan, dtype=np.float32)
valid_rows = rows[in_bounds]
valid_cols = cols[in_bounds]
sampled = noise_grid[valid_rows, valid_cols].astype(np.float32)
# Mark nodata and zero (unmapped areas) as NaN.
# Road/rail use nodata=-96, airport uses nodata=3.4e38.
if raster_nodata is not None:
sampled[np.isclose(sampled, np.float32(raster_nodata), rtol=1e-5)] = np.nan
sampled[sampled == 0] = np.nan
noise_db[in_bounds] = sampled
valid_count = int(np.sum(~np.isnan(noise_db)))
print(f"[{label}] Sampled {valid_count:,} / {len(easting):,} postcodes with noise data")
return noise_db
def main() -> None:
parser = argparse.ArgumentParser(
description="Download Defra noise data (road, rail, airport) and sample at postcode centroids"
)
parser.add_argument(
"--arcgis",
type=Path,
required=True,
help="ArcGIS postcode data parquet (for lat/lon coordinates)",
)
parser.add_argument(
"--output", type=Path, required=True, help="Output parquet file path"
)
args = parser.parse_args()
args.output.parent.mkdir(parents=True, exist_ok=True)
print("Loading postcode coordinates...")
postcodes = pl.read_parquet(
args.arcgis, columns=["pcds", "lat", "long"]
).rename({"pcds": "postcode", "long": "lon"})
lat = postcodes["lat"].to_numpy()
lon = postcodes["lon"].to_numpy()
print("Converting lat/lon to BNG...")
easting, northing = _bng_from_latlon(lat, lon)
result = postcodes.select("postcode")
with tempfile.TemporaryDirectory() as tmp:
for label, col_name, wcs_base, coverage_id, wcs_version in NOISE_SOURCES:
tile_dir = Path(tmp) / label.lower()
tile_dir.mkdir()
tile_paths = download_raster(tile_dir, wcs_base, coverage_id, label, wcs_version)
if not tile_paths:
print(f"[{label}] WARNING: No tiles downloaded — column will be all null")
noise_db = np.full(len(lat), np.nan, dtype=np.float32)
else:
noise_db = sample_noise_at_postcodes(tile_paths, easting, northing, label)
result = result.with_columns(pl.Series(col_name, noise_db))
result.write_parquet(args.output, compression="zstd")
size_mb = args.output.stat().st_size / (1024 * 1024)
print(f"Wrote {args.output} ({size_mb:.1f} MB)")
if __name__ == "__main__":
main()