107 lines
3.2 KiB
Python
107 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Download Land Registry price paid data and convert to Parquet."""
|
|
|
|
# Run it with:
|
|
# uv run download_land_registry.py
|
|
|
|
# The download failed in this environment due to network restrictions, but the script will work on your local machine. The ~5GB CSV should compress to roughly ~1GB in Parquet format with ZSTD compression.
|
|
|
|
import time
|
|
import httpx
|
|
import polars as pl
|
|
from pathlib import Path
|
|
from tqdm import tqdm
|
|
|
|
URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv"
|
|
|
|
BASE_DATA_PATH = Path("./data_sources")
|
|
BASE_DATA_PATH.mkdir(exist_ok=True)
|
|
CSV_PATH = BASE_DATA_PATH / "pp-complete.csv"
|
|
PARQUET_PATH = BASE_DATA_PATH / "pp-complete.parquet"
|
|
|
|
MAX_RETRIES = 3
|
|
|
|
|
|
def download_with_progress(url: str, output_path: Path) -> None:
|
|
"""Download a file with progress bar and retry logic."""
|
|
for attempt in range(1, MAX_RETRIES + 1):
|
|
try:
|
|
with httpx.stream(
|
|
"GET",
|
|
url,
|
|
follow_redirects=True,
|
|
timeout=httpx.Timeout(30.0, read=None),
|
|
) as response:
|
|
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
|
|
total = int(response.headers.get("content-length", 0))
|
|
|
|
with open(output_path, "wb") as f, tqdm(
|
|
total=total,
|
|
unit="B",
|
|
unit_scale=True,
|
|
unit_divisor=1024,
|
|
desc="Downloading",
|
|
) as pbar:
|
|
for chunk in response.iter_bytes(chunk_size=8192):
|
|
f.write(chunk)
|
|
pbar.update(len(chunk))
|
|
return # Success
|
|
except (httpx.ConnectError, httpx.ReadTimeout) as e:
|
|
if attempt < MAX_RETRIES:
|
|
wait = 2**attempt
|
|
print(f"Attempt {attempt} failed: {e}. Retrying in {wait}s...")
|
|
time.sleep(wait)
|
|
else:
|
|
raise
|
|
|
|
|
|
def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
|
|
"""Convert CSV to Parquet using Polars."""
|
|
print("Converting to Parquet...")
|
|
|
|
# https://www.gov.uk/guidance/about-the-price-paid-data
|
|
# Land Registry CSV columns
|
|
columns = [
|
|
"transaction_id",
|
|
"price",
|
|
"date_of_transfer",
|
|
"postcode",
|
|
"property_type",
|
|
"old_new",
|
|
"duration",
|
|
"paon",
|
|
"saon",
|
|
"street",
|
|
"locality",
|
|
"town_city",
|
|
"district",
|
|
"county",
|
|
"ppd_category",
|
|
"record_status",
|
|
]
|
|
|
|
df = pl.read_csv(
|
|
csv_path,
|
|
has_header=False,
|
|
new_columns=columns,
|
|
try_parse_dates=True,
|
|
)
|
|
|
|
df.write_parquet(parquet_path, compression="zstd")
|
|
print(f"Saved to {parquet_path}")
|
|
print(f"Rows: {df.height:,}")
|
|
print(f"CSV size: {csv_path.stat().st_size / 1024**2:.1f} MB")
|
|
print(f"Parquet size: {parquet_path.stat().st_size / 1024**2:.1f} MB")
|
|
|
|
|
|
def main() -> None:
|
|
if not CSV_PATH.exists():
|
|
download_with_progress(URL, CSV_PATH)
|
|
else:
|
|
print(f"CSV already exists at {CSV_PATH}, skipping download")
|
|
|
|
convert_to_parquet(CSV_PATH, PARQUET_PATH)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|