"""Download INSPIRE Index Polygons from HM Land Registry. Downloads GML files for all local authorities from the INSPIRE download page. Each ZIP contains a GML file with title extent polygons for that authority. Source: https://use-land-property-data.service.gov.uk/datasets/inspire/download License: INSPIRE End User Licence """ import argparse from concurrent.futures import ThreadPoolExecutor, as_completed from html.parser import HTMLParser from pathlib import Path import time import zipfile from urllib.parse import urljoin, urlparse import httpx from tqdm import tqdm BASE_URL = "https://use-land-property-data.service.gov.uk" INDEX_URL = f"{BASE_URL}/datasets/inspire/download" HEADERS = { "User-Agent": "Mozilla/5.0 (compatible; perfect-postcode-data-pipeline/1.0)" } CHUNK_SIZE = 1024 * 1024 MAX_ATTEMPTS = 5 BACKOFF_BASE = 2.0 class ZipLinkParser(HTMLParser): """Collect links to Land Registry INSPIRE ZIP downloads.""" def __init__(self, base_url: str) -> None: super().__init__() self.base_url = base_url self.base_netloc = urlparse(base_url).netloc self.urls: set[str] = set() def handle_starttag( self, tag: str, attrs: list[tuple[str, str | None]] ) -> None: if tag != "a": return href = dict(attrs).get("href") if not href: return url = urljoin(self.base_url, href) parsed = urlparse(url) if ( parsed.scheme in {"http", "https"} and parsed.netloc == self.base_netloc and parsed.path.startswith("/datasets/inspire/download/") and parsed.path.endswith(".zip") ): self.urls.add(parsed._replace(query="", fragment="").geturl()) def parse_zip_urls(html: str, base_url: str = BASE_URL) -> list[str]: """Parse the INSPIRE download page for all council ZIP URLs.""" parser = ZipLinkParser(base_url) parser.feed(html) return sorted(parser.urls) def get_zip_urls() -> list[str]: """Scrape the INSPIRE download page for all .zip hrefs.""" # The site requires a cookie jar to avoid redirect loops. with httpx.Client( follow_redirects=True, timeout=httpx.Timeout(30.0, read=60), headers={**HEADERS, "Accept": "text/html"}, ) as client: resp = client.get(INDEX_URL) resp.raise_for_status() html = resp.text urls = parse_zip_urls(html) if not urls: raise RuntimeError(f"No INSPIRE ZIP links found at {INDEX_URL}") return urls def _is_valid_zip(path: Path) -> bool: return path.exists() and zipfile.is_zipfile(path) def _stream_download(url: str, output_path: Path, *, timeout: float) -> None: with httpx.stream( "GET", url, follow_redirects=True, timeout=httpx.Timeout(30.0, read=timeout), headers=HEADERS, ) as response: response.raise_for_status() with output_path.open("wb") as out: for chunk in response.iter_bytes(chunk_size=CHUNK_SIZE): out.write(chunk) def download_one( url: str, output_dir: Path, *, force: bool = False, timeout: float = 600, ) -> str: """Download a single ZIP file. Returns the filename.""" name = Path(urlparse(url).path).name if not name.endswith(".zip"): raise ValueError(f"Expected a ZIP download URL, got {url}") output_dir.mkdir(parents=True, exist_ok=True) dest = output_dir / name if not force and _is_valid_zip(dest): return f"{name} (skipped, valid ZIP exists)" tmp = dest.with_suffix(dest.suffix + ".tmp") last_exc: Exception | None = None try: for attempt in range(1, MAX_ATTEMPTS + 1): tmp.unlink(missing_ok=True) try: _stream_download(url, tmp, timeout=timeout) if not _is_valid_zip(tmp): raise RuntimeError( f"{name} did not download as a valid ZIP" ) tmp.replace(dest) return name except (httpx.HTTPError, OSError) as exc: last_exc = exc if attempt < MAX_ATTEMPTS: time.sleep(BACKOFF_BASE ** (attempt - 1)) finally: tmp.unlink(missing_ok=True) raise RuntimeError( f"{name} failed after {MAX_ATTEMPTS} attempts" ) from last_exc def main() -> None: parser = argparse.ArgumentParser( description="Download INSPIRE Index Polygon GML files" ) parser.add_argument( "--output", type=Path, required=True, help="Output directory for downloaded ZIPs", ) parser.add_argument( "--workers", type=int, default=8, help="Number of parallel downloads (default: 8)", ) parser.add_argument( "--force", action="store_true", help="Re-download files even when a valid ZIP already exists", ) parser.add_argument( "--timeout", type=float, default=600, help="Per-file read timeout in seconds (default: 600)", ) args = parser.parse_args() if args.workers < 1: raise SystemExit("--workers must be at least 1") args.output.mkdir(parents=True, exist_ok=True) print("Fetching download index...") urls = get_zip_urls() print(f"Found {len(urls)} files to download") failures: list[tuple[str, Exception]] = [] with tqdm(total=len(urls), unit="file") as pbar: with ThreadPoolExecutor(max_workers=args.workers) as pool: futures = { pool.submit( download_one, url, args.output, force=args.force, timeout=args.timeout, ): url for url in urls } for future in as_completed(futures): try: result = future.result() pbar.set_postfix_str(result[:40]) except Exception as exc: # noqa: BLE001 failures.append((futures[future], exc)) pbar.set_postfix_str("FAILED") pbar.update(1) succeeded = len(urls) - len(failures) print(f"Done. {succeeded}/{len(urls)} files in {args.output}") if failures: print(f"{len(failures)} file(s) failed:") for url, exc in failures: name = Path(urlparse(url).path).name print(f" - {name}: {exc}") raise SystemExit( f"{len(failures)} INSPIRE download(s) failed; " "re-run to retry only the missing files" ) if __name__ == "__main__": main()