"""Build PMTiles raster tiles for the high-resolution Defra noise overlay. This keeps the native 10m strategic-noise rasters as the source of truth and renders transparent PNG XYZ tiles into MBTiles before converting to PMTiles. The dashboard serves the resulting archive through /api/overlays/noise. """ from __future__ import annotations import argparse import io import math import sqlite3 import subprocess import tempfile from dataclasses import dataclass from pathlib import Path import numpy as np import rasterio from PIL import Image from rasterio.enums import Resampling from rasterio.transform import from_bounds from rasterio.warp import reproject, transform_bounds from shapely import STRtree, box from pipeline.download.noise import ( BNG_MAX_E, BNG_MAX_N, BNG_MIN_E, BNG_MIN_N, NOISE_SOURCES, download_raster, ) from pipeline.download.tiles import ensure_pmtiles_cli from pipeline.local_temp import local_tmp_dir WEB_MERCATOR_CRS = "EPSG:3857" WEB_MERCATOR_EXTENT = 20_037_508.342789244 DEFAULT_SOURCE_NAMES = ("road", "rail", "airport") NOISE_COLOR_STOPS = np.array([45.0, 55.0, 65.0, 75.0], dtype=np.float32) NOISE_COLORS = np.array( [ [254, 240, 138], [251, 146, 60], [220, 38, 38], [127, 29, 29], ], dtype=np.float32, ) @dataclass(frozen=True) class RasterInfo: path: Path bounds_mercator: tuple[float, float, float, float] def _source_specs(source_names: tuple[str, ...]): requested = {name.lower() for name in source_names} if "all" in requested: requested = set(DEFAULT_SOURCE_NAMES) by_name = {label.lower(): spec for label, *spec in NOISE_SOURCES} unknown = sorted(requested - set(by_name)) if unknown: raise ValueError(f"Unknown noise source(s): {', '.join(unknown)}") return [ (name.title(), *by_name[name]) for name in DEFAULT_SOURCE_NAMES if name in requested ] def _download_source_rasters( raster_dir: Path, source_names: tuple[str, ...], ) -> list[Path]: paths: list[Path] = [] raster_dir.mkdir(parents=True, exist_ok=True) for ( label, _col_name, wcs_base, coverage_id, wcs_version, allow_missing_tiles, ) in _source_specs(source_names): tile_dir = raster_dir / label.lower() tile_dir.mkdir(parents=True, exist_ok=True) paths.extend( download_raster( tile_dir, wcs_base, coverage_id, label, wcs_version, allow_missing_tiles, ) ) return paths def _raster_infos(raster_paths: list[Path]) -> list[RasterInfo]: infos: list[RasterInfo] = [] for path in raster_paths: with rasterio.open(path) as dataset: if dataset.crs is None: raise ValueError(f"Raster has no CRS: {path}") bounds = transform_bounds( dataset.crs, WEB_MERCATOR_CRS, *dataset.bounds, densify_pts=21, ) infos.append(RasterInfo(path=path, bounds_mercator=bounds)) return infos def _england_bounds_wgs84() -> tuple[float, float, float, float]: return transform_bounds( "EPSG:27700", "EPSG:4326", BNG_MIN_E, BNG_MIN_N, BNG_MAX_E, BNG_MAX_N, densify_pts=21, ) def _lonlat_to_tile(lon: float, lat: float, zoom: int) -> tuple[int, int]: lat = max(min(lat, 85.05112878), -85.05112878) n = 1 << zoom x = int(math.floor((lon + 180.0) / 360.0 * n)) y = int( math.floor((1.0 - math.asinh(math.tan(math.radians(lat))) / math.pi) / 2.0 * n) ) return min(max(x, 0), n - 1), min(max(y, 0), n - 1) def _tile_bounds_mercator( zoom: int, x: int, y: int ) -> tuple[float, float, float, float]: n = 1 << zoom tile_size_m = WEB_MERCATOR_EXTENT * 2 / n left = -WEB_MERCATOR_EXTENT + x * tile_size_m right = left + tile_size_m top = WEB_MERCATOR_EXTENT - y * tile_size_m bottom = top - tile_size_m return left, bottom, right, top def _read_noise_tile( candidates: list[RasterInfo], bounds_mercator: tuple[float, float, float, float], tile_size: int, ) -> np.ndarray: left, bottom, right, top = bounds_mercator merged = np.full((tile_size, tile_size), np.nan, dtype=np.float32) for info in candidates: with rasterio.open(info.path) as source: tile = np.full((tile_size, tile_size), np.nan, dtype=np.float32) reproject( source=rasterio.band(source, 1), destination=tile, src_transform=source.transform, src_crs=source.crs, src_nodata=source.nodata if source.nodata is not None else 0, dst_transform=from_bounds( left, bottom, right, top, tile_size, tile_size ), dst_crs=WEB_MERCATOR_CRS, dst_nodata=np.nan, resampling=Resampling.bilinear, ) tile[~np.isfinite(tile) | (tile <= 0)] = np.nan merged = np.fmax(merged, tile) return merged def _encode_noise_png(noise_db: np.ndarray) -> bytes | None: valid = np.isfinite(noise_db) & (noise_db >= NOISE_COLOR_STOPS[0]) if not valid.any(): return None clipped = np.clip(noise_db, NOISE_COLOR_STOPS[0], NOISE_COLOR_STOPS[-1]) rgba = np.zeros((*noise_db.shape, 4), dtype=np.uint8) valid_values = clipped[valid] for channel in range(3): channel_values = np.interp( valid_values, NOISE_COLOR_STOPS, NOISE_COLORS[:, channel], ).astype(np.uint8) rgba[..., channel][valid] = channel_values alpha = np.interp( valid_values, [NOISE_COLOR_STOPS[0], NOISE_COLOR_STOPS[-1]], [70, 190], ).astype(np.uint8) rgba[..., 3][valid] = alpha output = io.BytesIO() Image.fromarray(rgba, mode="RGBA").save(output, format="PNG", optimize=True) return output.getvalue() def _tile_ranges( bounds_wgs84: tuple[float, float, float, float], zoom: int, ) -> tuple[range, range]: west, south, east, north = bounds_wgs84 min_x, min_y = _lonlat_to_tile(west, north, zoom) max_x, max_y = _lonlat_to_tile(east, south, zoom) return range(min_x, max_x + 1), range(min_y, max_y + 1) def _create_mbtiles( raster_infos: list[RasterInfo], mbtiles_path: Path, min_zoom: int, max_zoom: int, tile_size: int, ) -> int: if mbtiles_path.exists(): mbtiles_path.unlink() bounds_wgs84 = _england_bounds_wgs84() geometries = [box(*info.bounds_mercator) for info in raster_infos] tree = STRtree(geometries) conn = sqlite3.connect(mbtiles_path) conn.execute("CREATE TABLE metadata (name TEXT, value TEXT)") conn.execute( "CREATE TABLE tiles (zoom_level INTEGER, tile_column INTEGER, " "tile_row INTEGER, tile_data BLOB)" ) conn.execute( "CREATE UNIQUE INDEX tile_index ON tiles (zoom_level, tile_column, tile_row)" ) conn.executemany( "INSERT INTO metadata (name, value) VALUES (?, ?)", [ ("name", "Defra Lden noise overlay"), ("type", "overlay"), ("version", "1"), ("description", "Defra Round 4 10m strategic noise Lden overlay"), ("format", "png"), ( "attribution", "Contains public sector information licensed under the OGL v3.0", ), ("bounds", ",".join(f"{value:.6f}" for value in bounds_wgs84)), ("minzoom", str(min_zoom)), ("maxzoom", str(max_zoom)), ], ) total_tiles = 0 try: for zoom in range(min_zoom, max_zoom + 1): x_range, y_range = _tile_ranges(bounds_wgs84, zoom) zoom_tiles = 0 for x in x_range: for y in y_range: bounds_mercator = _tile_bounds_mercator(zoom, x, y) candidate_indexes = tree.query(box(*bounds_mercator)) if len(candidate_indexes) == 0: continue candidates = [ raster_infos[int(index)] for index in candidate_indexes ] tile = _read_noise_tile(candidates, bounds_mercator, tile_size) tile_png = _encode_noise_png(tile) if tile_png is None: continue tms_y = (1 << zoom) - 1 - y conn.execute( "INSERT INTO tiles VALUES (?, ?, ?, ?)", (zoom, x, tms_y, tile_png), ) zoom_tiles += 1 total_tiles += 1 conn.commit() print(f"Zoom {zoom}: wrote {zoom_tiles:,} PNG tiles") finally: conn.close() return total_tiles def build_noise_overlay_tiles( output_path: Path, raster_dir: Path, source_names: tuple[str, ...], input_rasters: tuple[Path, ...], pmtiles_bin: Path, pmtiles_version: str, min_zoom: int, max_zoom: int, tile_size: int, ) -> None: if min_zoom > max_zoom: raise ValueError("--min-zoom must be <= --max-zoom") raster_paths = list(input_rasters) or _download_source_rasters( raster_dir, source_names ) if not raster_paths: raise FileNotFoundError("No noise raster GeoTIFFs available") print(f"Preparing {len(raster_paths):,} noise raster tile(s)") raster_infos = _raster_infos(raster_paths) output_path.parent.mkdir(parents=True, exist_ok=True) ensure_pmtiles_cli(pmtiles_bin, pmtiles_version) with tempfile.TemporaryDirectory(dir=local_tmp_dir()) as tmp: mbtiles_path = Path(tmp) / "noise_lden_10m.mbtiles" tile_count = _create_mbtiles( raster_infos, mbtiles_path, min_zoom, max_zoom, tile_size ) if tile_count == 0: raise RuntimeError("Noise overlay generation produced no tiles") subprocess.run( [ str(pmtiles_bin), "convert", str(mbtiles_path), str(output_path), "--force", ], check=True, ) size_mb = output_path.stat().st_size / (1024 * 1024) print(f"Wrote {output_path} ({size_mb:.1f} MB)") def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--output", type=Path, required=True) parser.add_argument( "--raster-dir", type=Path, default=Path("property-data/noise_overlay_rasters"), help="Cache directory for downloaded Defra WCS GeoTIFF tiles", ) parser.add_argument( "--source", action="append", dest="sources", choices=("all", *DEFAULT_SOURCE_NAMES), help="Noise source to include; repeatable. Defaults to all.", ) parser.add_argument( "--input-raster", action="append", dest="input_rasters", type=Path, help="Existing GeoTIFF to render instead of downloading WCS rasters", ) parser.add_argument( "--pmtiles-bin", type=Path, default=Path("property-data/pmtiles") ) parser.add_argument("--pmtiles-version", default="1.22.3") parser.add_argument("--min-zoom", type=int, default=13) parser.add_argument("--max-zoom", type=int, default=14) parser.add_argument("--tile-size", type=int, default=256) args = parser.parse_args() build_noise_overlay_tiles( output_path=args.output, raster_dir=args.raster_dir, source_names=tuple(args.sources or ("all",)), input_rasters=tuple(args.input_rasters or ()), pmtiles_bin=args.pmtiles_bin, pmtiles_version=args.pmtiles_version, min_zoom=args.min_zoom, max_zoom=args.max_zoom, tile_size=args.tile_size, ) if __name__ == "__main__": main()