diff --git a/Taskfile.yml b/Taskfile.yml index 7c96b25..52a0dde 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -12,9 +12,10 @@ tasks: deps: - install cmds: - - uv run python download_land_registry.py - - uv run python download_arcgis_data.py - - uv run python -m pipeline.pois + - uv run -m pipeline.download.arcgis + - uv run -m pipeline.download.pois + - uv run -m pipeline.download.deprivation_data + - uv run -m pipeline.download.price_paid pipeline: desc: Run data processing pipeline diff --git a/download_arcgis_data.py b/download_arcgis_data.py deleted file mode 100644 index 34b8eff..0000000 --- a/download_arcgis_data.py +++ /dev/null @@ -1,129 +0,0 @@ -#!/usr/bin/env python3 -"""Download ArcGIS data and convert to Parquet.""" - -# Run it with: -# uv run download_arcgis_data.py - -import time -import zipfile -import httpx -import polars as pl -from pathlib import Path -from tqdm import tqdm - -URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data" - -BASE_DATA_PATH = Path("./data_sources") -BASE_DATA_PATH.mkdir(exist_ok=True) -DOWNLOAD_PATH = BASE_DATA_PATH / "arcgis_data.zip" -EXTRACT_PATH = BASE_DATA_PATH / "arcgis_extracted" -PARQUET_PATH = BASE_DATA_PATH / "arcgis_data.parquet" - -MAX_RETRIES = 3 - - -def download_with_progress(url: str, output_path: Path) -> None: - """Download a file with progress bar and retry logic.""" - for attempt in range(1, MAX_RETRIES + 1): - try: - with httpx.stream( - "GET", - url, - follow_redirects=True, - timeout=httpx.Timeout(30.0, read=None), - ) as response: - response.raise_for_status() # pyright: ignore[reportUnusedCallResult] - total = int(response.headers.get("content-length", 0)) - - with ( - open(output_path, "wb") as f, - tqdm( - total=total, - unit="B", - unit_scale=True, - unit_divisor=1024, - desc="Downloading", - ) as pbar, - ): - for chunk in response.iter_bytes(chunk_size=8192): - f.write(chunk) - pbar.update(len(chunk)) - return # Success - except (httpx.ConnectError, 
httpx.ReadTimeout) as e: - if attempt < MAX_RETRIES: - wait = 2**attempt - print(f"Attempt {attempt} failed: {e}. Retrying in {wait}s...") - time.sleep(wait) - else: - raise - - -def extract_zip(zip_path: Path, extract_path: Path) -> list[Path]: - """Extract ZIP file and return list of extracted files.""" - print("Extracting ZIP file...") - extract_path.mkdir(exist_ok=True) - - with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(extract_path) - return [extract_path / name for name in zf.namelist()] - - -def find_data_file(extract_path: Path) -> Path: - """Find the main data file (CSV, XLSX, or similar) in extracted files.""" - # Look for common data file extensions - for ext in ["*.csv", "*.xlsx", "*.xls", "*.json", "*.geojson"]: - files = list(extract_path.rglob(ext)) - if files: - # Return the largest file if multiple found - return max(files, key=lambda f: f.stat().st_size) - - raise FileNotFoundError(f"No data file found in {extract_path}") - - -def convert_to_parquet(data_path: Path, parquet_path: Path) -> None: - """Convert data file to Parquet using Polars.""" - print(f"Converting {data_path.name} to Parquet...") - - suffix = data_path.suffix.lower() - - if suffix == ".csv": - df = pl.read_csv(data_path, try_parse_dates=True) - elif suffix in [".xlsx", ".xls"]: - df = pl.read_excel(data_path) - elif suffix in [".json", ".geojson"]: - df = pl.read_json(data_path) - else: - raise ValueError(f"Unsupported file format: {suffix}") - - df.write_parquet(parquet_path, compression="zstd") - print(f"Saved to {parquet_path}") - print(f"Rows: {df.height:,}") - print(f"Columns: {df.columns}") - print(f"Original size: {data_path.stat().st_size / 1024**2:.1f} MB") - print(f"Parquet size: {parquet_path.stat().st_size / 1024**2:.1f} MB") - - -def main() -> None: - if PARQUET_PATH.exists(): - print(f"Parquet already exists at {PARQUET_PATH}, skipping") - return - - if not DOWNLOAD_PATH.exists(): - download_with_progress(URL, DOWNLOAD_PATH) - else: - print(f"File 
already exists at {DOWNLOAD_PATH}, skipping download") - - # Check if it's a ZIP file - if zipfile.is_zipfile(DOWNLOAD_PATH): - extracted_files = extract_zip(DOWNLOAD_PATH, EXTRACT_PATH) - print(f"Extracted {len(extracted_files)} files") - data_file = find_data_file(EXTRACT_PATH) - else: - # Not a ZIP, treat as direct data file - data_file = DOWNLOAD_PATH - - convert_to_parquet(data_file, PARQUET_PATH) - - -if __name__ == "__main__": - main() diff --git a/download_land_registry.py b/download_land_registry.py deleted file mode 100644 index c07a640..0000000 --- a/download_land_registry.py +++ /dev/null @@ -1,114 +0,0 @@ -#!/usr/bin/env python3 -"""Download Land Registry price paid data and convert to Parquet.""" - -# Run it with: -# uv run download_land_registry.py - -# The download failed in this environment due to network restrictions, but the script will work on your local machine. The ~5GB CSV should compress to roughly ~1GB in Parquet format with ZSTD compression. - -import time -import httpx -import polars as pl -from pathlib import Path -from tqdm import tqdm - -URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv" - -BASE_DATA_PATH = Path("./data_sources") -BASE_DATA_PATH.mkdir(exist_ok=True) -CSV_PATH = BASE_DATA_PATH / "pp-complete.csv" -PARQUET_PATH = BASE_DATA_PATH / "pp-complete.parquet" - -MAX_RETRIES = 3 - - -def download_with_progress(url: str, output_path: Path) -> None: - """Download a file with progress bar and retry logic.""" - for attempt in range(1, MAX_RETRIES + 1): - try: - with httpx.stream( - "GET", - url, - follow_redirects=True, - timeout=httpx.Timeout(30.0, read=None), - ) as response: - response.raise_for_status() # pyright: ignore[reportUnusedCallResult] - total = int(response.headers.get("content-length", 0)) - - with ( - open(output_path, "wb") as f, - tqdm( - total=total, - unit="B", - unit_scale=True, - unit_divisor=1024, - desc="Downloading", - ) as pbar, - ): - for chunk in 
response.iter_bytes(chunk_size=8192):
-                        f.write(chunk)
-                        pbar.update(len(chunk))
-            return  # Success
-        except (httpx.ConnectError, httpx.ReadTimeout) as e:
-            if attempt < MAX_RETRIES:
-                wait = 2**attempt
-                print(f"Attempt {attempt} failed: {e}. Retrying in {wait}s...")
-                time.sleep(wait)
-            else:
-                raise
-
-
-def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
-    """Convert CSV to Parquet using Polars."""
-    print("Converting to Parquet...")
-
-    # https://www.gov.uk/guidance/about-the-price-paid-data
-    # Land Registry CSV columns
-    columns = [
-        "transaction_id",
-        "price",
-        "date_of_transfer",
-        "postcode",
-        "property_type",
-        "old_new",
-        "duration",
-        "paon",
-        "saon",
-        "street",
-        "locality",
-        "town_city",
-        "district",
-        "county",
-        "ppd_category",
-        "record_status",
-    ]
-
-    df = pl.read_csv(
-        csv_path,
-        has_header=False,
-        new_columns=columns,
-        try_parse_dates=True,
-    )
-
-    df.write_parquet(parquet_path, compression="zstd")
-    print(f"Saved to {parquet_path}")
-    print(f"Rows: {df.height:,}")
-    print(f"CSV size: {csv_path.stat().st_size / 1024**2:.1f} MB")
-    print(f"Parquet size: {parquet_path.stat().st_size / 1024**2:.1f} MB")
-
-
-def main() -> None:
-    if PARQUET_PATH.exists():
-        print(f"Parquet already exists at {PARQUET_PATH}, skipping")
-        return
-
-    if not CSV_PATH.exists():
-        download_with_progress(URL, CSV_PATH)
-    else:
-        print(f"CSV already exists at {CSV_PATH}, skipping download")
-
-    convert_to_parquet(CSV_PATH, PARQUET_PATH)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/pipeline/download/arcgis.py b/pipeline/download/arcgis.py
new file mode 100644
index 0000000..cdd284c
--- /dev/null
+++ b/pipeline/download/arcgis.py
@@ -0,0 +1,93 @@
+import zipfile
+import httpx
+import polars as pl
+from pathlib import Path
+from tqdm import tqdm
+
+URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data"
+
+BASE_DATA_PATH = Path("./data_sources")
+BASE_DATA_PATH.mkdir(exist_ok=True)
+DOWNLOAD_PATH = BASE_DATA_PATH / "arcgis_data.zip"
+EXTRACT_PATH = BASE_DATA_PATH / "arcgis_extracted"
+PARQUET_PATH = BASE_DATA_PATH / "arcgis_data.parquet"
+
+
+def download_with_progress(url: str, output_path: Path) -> None:
+    """Stream ``url`` to ``output_path`` while showing a progress bar.
+
+    The body is written to a sibling ``.part`` file and renamed onto
+    ``output_path`` only after the transfer finishes, so an interrupted
+    download never leaves a truncated file that the ``exists()`` check in
+    ``main`` would mistake for a complete one.
+
+    Raises:
+        httpx.HTTPStatusError: the server answered with a 4xx/5xx status.
+    """
+    partial_path = output_path.with_name(output_path.name + ".part")
+    with httpx.stream(
+        "GET",
+        url,
+        follow_redirects=True,
+        timeout=httpx.Timeout(30.0, read=None),
+    ) as response:
+        response.raise_for_status()  # pyright: ignore[reportUnusedCallResult]
+        # Falls back to 0 when the server sends no content-length header.
+        total = int(response.headers.get("content-length", 0))
+
+        with (
+            open(partial_path, "wb") as f,
+            tqdm(
+                total=total,
+                unit="B",
+                unit_scale=True,
+                unit_divisor=1024,
+                desc="Downloading",
+            ) as pbar,
+        ):
+            for chunk in response.iter_bytes(chunk_size=8192):
+                f.write(chunk)
+                pbar.update(len(chunk))
+    # Publish the file only after every chunk has been written and the file closed.
+    partial_path.replace(output_path)
+
+
+def extract_zip(zip_path: Path, extract_path: Path) -> None:
+    """Extract every member of ``zip_path`` into ``extract_path``."""
+    extract_path.mkdir(exist_ok=True)
+
+    with zipfile.ZipFile(zip_path, "r") as zf:
+        zf.extractall(extract_path)
+
+
+def convert_to_parquet(data_path: Path, parquet_path: Path) -> None:
+    """Convert the extracted NSPL CSV to a zstd-compressed Parquet file.
+
+    ``data_path`` is the extraction directory; the CSV inside it is scanned
+    lazily and written with ``sink_parquet``.
+    """
+    lf = pl.scan_csv(data_path / "Data/NSPL_MAY_2025_UK.csv", try_parse_dates=True)
+    # LazyFrame.columns has to resolve the schema and warns about it in recent
+    # Polars releases; request the schema explicitly instead.
+    print(f"Columns: {lf.collect_schema().names()}")
+    lf.sink_parquet(parquet_path, compression="zstd")
+    print(f"Saved to {parquet_path}")
+
+
+def main() -> None:
+    """Download, extract and convert the ArcGIS archive, skipping finished steps."""
+    if PARQUET_PATH.exists():
+        print(f"Parquet already exists at {PARQUET_PATH}, skipping")
+        return
+
+    if not DOWNLOAD_PATH.exists():
+        download_with_progress(URL, DOWNLOAD_PATH)
+    else:
+        print(f"File already exists at {DOWNLOAD_PATH}, skipping download")
+
+    extract_zip(DOWNLOAD_PATH, EXTRACT_PATH)
+    convert_to_parquet(EXTRACT_PATH, PARQUET_PATH)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/download_deprivation_data.py b/pipeline/download/deprivation_data.py
similarity index 90%
rename from download_deprivation_data.py
rename to pipeline/download/deprivation_data.py
index 5094002..6d42180 100644
--- a/download_deprivation_data.py
+++ b/pipeline/download/deprivation_data.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python3
-"""Download IoD2025 Deprivation Scores and convert to
Parquet."""
-
 import httpx
 import polars as pl
 from pathlib import Path
@@ -14,8 +11,6 @@ PARQUET_PATH = BASE_DATA_PATH / "IoD2025_Scores.parquet"
 
 
 def download_file(url: str, output_path: Path) -> None:
-    """Download file from URL."""
-    print(f"Downloading from {url}...")
     with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response:
         response.raise_for_status()
         total = int(response.headers.get("content-length", 0))
@@ -33,7 +28,6 @@ def download_file(url: str, output_path: Path) -> None:
 
 
 def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None:
-    """Convert Excel sheet 2 to Parquet."""
     print("Reading Excel file (sheet 2)...")
 
     # Read the 2nd sheet (index 1) - IoD2025 Scores
diff --git a/pipeline/download/price_paid.py b/pipeline/download/price_paid.py
new file mode 100644
index 0000000..38e0f69
--- /dev/null
+++ b/pipeline/download/price_paid.py
@@ -0,0 +1,112 @@
+import httpx
+import polars as pl
+from pathlib import Path
+from tqdm import tqdm
+
+URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv"
+
+BASE_DATA_PATH = Path("./data_sources")
+BASE_DATA_PATH.mkdir(exist_ok=True)
+CSV_PATH = BASE_DATA_PATH / "price-paid-complete.csv"
+PARQUET_PATH = BASE_DATA_PATH / "price-paid-complete.parquet"
+
+
+def download_with_progress(url: str, output_path: Path) -> None:
+    """Stream ``url`` to ``output_path`` while showing a progress bar.
+
+    The body is written to a sibling ``.part`` file and renamed onto
+    ``output_path`` only after the transfer finishes, so an interrupted
+    download never leaves a truncated file that the ``exists()`` check in
+    ``main`` would mistake for a complete one.
+
+    Raises:
+        httpx.HTTPStatusError: the server answered with a 4xx/5xx status.
+    """
+    partial_path = output_path.with_name(output_path.name + ".part")
+    with httpx.stream(
+        "GET",
+        url,
+        follow_redirects=True,
+        timeout=httpx.Timeout(30.0, read=None),
+    ) as response:
+        response.raise_for_status()  # pyright: ignore[reportUnusedCallResult]
+        # Falls back to 0 when the server sends no content-length header.
+        total = int(response.headers.get("content-length", 0))
+
+        with (
+            open(partial_path, "wb") as f,
+            tqdm(
+                total=total,
+                unit="B",
+                unit_scale=True,
+                unit_divisor=1024,
+                desc="Downloading",
+            ) as pbar,
+        ):
+            for chunk in response.iter_bytes(chunk_size=8192):
+                f.write(chunk)
+                pbar.update(len(chunk))
+    # Publish the file only after every chunk has been written and the file closed.
+    partial_path.replace(output_path)
+
+
+def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
+    """Convert the headerless Land Registry CSV to zstd-compressed Parquet.
+
+    The CSV is scanned lazily and written with ``sink_parquet`` rather than
+    loaded whole by ``read_csv``; the complete price-paid file is several
+    gigabytes.
+    """
+    print("Converting to Parquet...")
+
+    # https://www.gov.uk/guidance/about-the-price-paid-data
+    # Land Registry CSV columns
+    columns = [
+        "transaction_id",
+        "price",
+        "date_of_transfer",
+        "postcode",
+        "property_type",
+        "old_new",
+        "duration",
+        "paon",
+        "saon",
+        "street",
+        "locality",
+        "town_city",
+        "district",
+        "county",
+        "ppd_category",
+        "record_status",
+    ]
+
+    lf = pl.scan_csv(
+        csv_path,
+        has_header=False,
+        new_columns=columns,
+        try_parse_dates=True,
+    )
+
+    lf.sink_parquet(parquet_path, compression="zstd")
+    print(f"Saved to {parquet_path}")
+    # Count the rows from the Parquet file that was just written.
+    rows = pl.scan_parquet(parquet_path).select(pl.len()).collect().item()
+    print(f"Rows: {rows:,}")
+
+
+def main() -> None:
+    """Download the price-paid CSV and convert it, skipping finished steps."""
+    if PARQUET_PATH.exists():
+        print(f"Parquet already exists at {PARQUET_PATH}, skipping")
+        return
+
+    if not CSV_PATH.exists():
+        download_with_progress(URL, CSV_PATH)
+    else:
+        print(f"CSV already exists at {CSV_PATH}, skipping download")
+
+    convert_to_parquet(CSV_PATH, PARQUET_PATH)
+
+
+if __name__ == "__main__":
+    main()