#!/usr/bin/env python3 """Download Land Registry price paid data and convert to Parquet.""" # Run it with: # uv run download_land_registry.py # The download failed in this environment due to network restrictions, but the script will work on your local machine. The ~5GB CSV should compress to roughly ~1GB in Parquet format with ZSTD compression. import time import httpx import polars as pl from pathlib import Path from tqdm import tqdm URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv" BASE_DATA_PATH = Path("./data_sources") BASE_DATA_PATH.mkdir(exist_ok=True) CSV_PATH = BASE_DATA_PATH / "pp-complete.csv" PARQUET_PATH = BASE_DATA_PATH / "pp-complete.parquet" MAX_RETRIES = 3 def download_with_progress(url: str, output_path: Path) -> None: """Download a file with progress bar and retry logic.""" for attempt in range(1, MAX_RETRIES + 1): try: with httpx.stream( "GET", url, follow_redirects=True, timeout=httpx.Timeout(30.0, read=None), ) as response: response.raise_for_status() # pyright: ignore[reportUnusedCallResult] total = int(response.headers.get("content-length", 0)) with ( open(output_path, "wb") as f, tqdm( total=total, unit="B", unit_scale=True, unit_divisor=1024, desc="Downloading", ) as pbar, ): for chunk in response.iter_bytes(chunk_size=8192): f.write(chunk) pbar.update(len(chunk)) return # Success except (httpx.ConnectError, httpx.ReadTimeout) as e: if attempt < MAX_RETRIES: wait = 2**attempt print(f"Attempt {attempt} failed: {e}. Retrying in {wait}s...") time.sleep(wait) else: raise def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: """Convert CSV to Parquet using Polars.""" print("Converting to Parquet...") # https://www.gov.uk/guidance/about-the-price-paid-data # Land Registry CSV columns columns = [ "transaction_id", "price", "date_of_transfer", "postcode", "property_type", "old_new", "duration", "paon", "saon", "street", "locality", "town_city", "district", "county", "ppd_category", "record_status", ] df = pl.read_csv( csv_path, has_header=False, new_columns=columns, try_parse_dates=True, ) df.write_parquet(parquet_path, compression="zstd") print(f"Saved to {parquet_path}") print(f"Rows: {df.height:,}") print(f"CSV size: {csv_path.stat().st_size / 1024**2:.1f} MB") print(f"Parquet size: {parquet_path.stat().st_size / 1024**2:.1f} MB") def main() -> None: if PARQUET_PATH.exists(): print(f"Parquet already exists at {PARQUET_PATH}, skipping") return if not CSV_PATH.exists(): download_with_progress(URL, CSV_PATH) else: print(f"CSV already exists at {CSV_PATH}, skipping download") convert_to_parquet(CSV_PATH, PARQUET_PATH) if __name__ == "__main__": main()