Move pipelines
This commit is contained in:
parent
c0f88602bf
commit
2131da96aa
6 changed files with 166 additions and 252 deletions
|
|
@ -12,9 +12,10 @@ tasks:
|
||||||
deps:
|
deps:
|
||||||
- install
|
- install
|
||||||
cmds:
|
cmds:
|
||||||
- uv run python download_land_registry.py
|
- uv run -m pipeline.download.arcgis
|
||||||
- uv run python download_arcgis_data.py
|
- uv run -m pipeline.download.pois
|
||||||
- uv run python -m pipeline.pois
|
- uv run -m pipeline.download.deprivation_data
|
||||||
|
- uv run -m pipeline.download.price_paid
|
||||||
|
|
||||||
pipeline:
|
pipeline:
|
||||||
desc: Run data processing pipeline
|
desc: Run data processing pipeline
|
||||||
|
|
|
||||||
|
|
@ -1,129 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""Download ArcGIS data and convert to Parquet."""
|
|
||||||
|
|
||||||
# Run it with:
|
|
||||||
# uv run download_arcgis_data.py
|
|
||||||
|
|
||||||
import time
|
|
||||||
import zipfile
|
|
||||||
import httpx
|
|
||||||
import polars as pl
|
|
||||||
from pathlib import Path
|
|
||||||
from tqdm import tqdm
|
|
||||||
|
|
||||||
URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data"
|
|
||||||
|
|
||||||
BASE_DATA_PATH = Path("./data_sources")
|
|
||||||
BASE_DATA_PATH.mkdir(exist_ok=True)
|
|
||||||
DOWNLOAD_PATH = BASE_DATA_PATH / "arcgis_data.zip"
|
|
||||||
EXTRACT_PATH = BASE_DATA_PATH / "arcgis_extracted"
|
|
||||||
PARQUET_PATH = BASE_DATA_PATH / "arcgis_data.parquet"
|
|
||||||
|
|
||||||
MAX_RETRIES = 3
|
|
||||||
|
|
||||||
|
|
||||||
def download_with_progress(url: str, output_path: Path) -> None:
|
|
||||||
"""Download a file with progress bar and retry logic."""
|
|
||||||
for attempt in range(1, MAX_RETRIES + 1):
|
|
||||||
try:
|
|
||||||
with httpx.stream(
|
|
||||||
"GET",
|
|
||||||
url,
|
|
||||||
follow_redirects=True,
|
|
||||||
timeout=httpx.Timeout(30.0, read=None),
|
|
||||||
) as response:
|
|
||||||
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
|
|
||||||
total = int(response.headers.get("content-length", 0))
|
|
||||||
|
|
||||||
with (
|
|
||||||
open(output_path, "wb") as f,
|
|
||||||
tqdm(
|
|
||||||
total=total,
|
|
||||||
unit="B",
|
|
||||||
unit_scale=True,
|
|
||||||
unit_divisor=1024,
|
|
||||||
desc="Downloading",
|
|
||||||
) as pbar,
|
|
||||||
):
|
|
||||||
for chunk in response.iter_bytes(chunk_size=8192):
|
|
||||||
f.write(chunk)
|
|
||||||
pbar.update(len(chunk))
|
|
||||||
return # Success
|
|
||||||
except (httpx.ConnectError, httpx.ReadTimeout) as e:
|
|
||||||
if attempt < MAX_RETRIES:
|
|
||||||
wait = 2**attempt
|
|
||||||
print(f"Attempt {attempt} failed: {e}. Retrying in {wait}s...")
|
|
||||||
time.sleep(wait)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def extract_zip(zip_path: Path, extract_path: Path) -> list[Path]:
|
|
||||||
"""Extract ZIP file and return list of extracted files."""
|
|
||||||
print("Extracting ZIP file...")
|
|
||||||
extract_path.mkdir(exist_ok=True)
|
|
||||||
|
|
||||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
|
||||||
zf.extractall(extract_path)
|
|
||||||
return [extract_path / name for name in zf.namelist()]
|
|
||||||
|
|
||||||
|
|
||||||
def find_data_file(extract_path: Path) -> Path:
|
|
||||||
"""Find the main data file (CSV, XLSX, or similar) in extracted files."""
|
|
||||||
# Look for common data file extensions
|
|
||||||
for ext in ["*.csv", "*.xlsx", "*.xls", "*.json", "*.geojson"]:
|
|
||||||
files = list(extract_path.rglob(ext))
|
|
||||||
if files:
|
|
||||||
# Return the largest file if multiple found
|
|
||||||
return max(files, key=lambda f: f.stat().st_size)
|
|
||||||
|
|
||||||
raise FileNotFoundError(f"No data file found in {extract_path}")
|
|
||||||
|
|
||||||
|
|
||||||
def convert_to_parquet(data_path: Path, parquet_path: Path) -> None:
|
|
||||||
"""Convert data file to Parquet using Polars."""
|
|
||||||
print(f"Converting {data_path.name} to Parquet...")
|
|
||||||
|
|
||||||
suffix = data_path.suffix.lower()
|
|
||||||
|
|
||||||
if suffix == ".csv":
|
|
||||||
df = pl.read_csv(data_path, try_parse_dates=True)
|
|
||||||
elif suffix in [".xlsx", ".xls"]:
|
|
||||||
df = pl.read_excel(data_path)
|
|
||||||
elif suffix in [".json", ".geojson"]:
|
|
||||||
df = pl.read_json(data_path)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unsupported file format: {suffix}")
|
|
||||||
|
|
||||||
df.write_parquet(parquet_path, compression="zstd")
|
|
||||||
print(f"Saved to {parquet_path}")
|
|
||||||
print(f"Rows: {df.height:,}")
|
|
||||||
print(f"Columns: {df.columns}")
|
|
||||||
print(f"Original size: {data_path.stat().st_size / 1024**2:.1f} MB")
|
|
||||||
print(f"Parquet size: {parquet_path.stat().st_size / 1024**2:.1f} MB")
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
if PARQUET_PATH.exists():
|
|
||||||
print(f"Parquet already exists at {PARQUET_PATH}, skipping")
|
|
||||||
return
|
|
||||||
|
|
||||||
if not DOWNLOAD_PATH.exists():
|
|
||||||
download_with_progress(URL, DOWNLOAD_PATH)
|
|
||||||
else:
|
|
||||||
print(f"File already exists at {DOWNLOAD_PATH}, skipping download")
|
|
||||||
|
|
||||||
# Check if it's a ZIP file
|
|
||||||
if zipfile.is_zipfile(DOWNLOAD_PATH):
|
|
||||||
extracted_files = extract_zip(DOWNLOAD_PATH, EXTRACT_PATH)
|
|
||||||
print(f"Extracted {len(extracted_files)} files")
|
|
||||||
data_file = find_data_file(EXTRACT_PATH)
|
|
||||||
else:
|
|
||||||
# Not a ZIP, treat as direct data file
|
|
||||||
data_file = DOWNLOAD_PATH
|
|
||||||
|
|
||||||
convert_to_parquet(data_file, PARQUET_PATH)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
@ -1,114 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""Download Land Registry price paid data and convert to Parquet."""
|
|
||||||
|
|
||||||
# Run it with:
|
|
||||||
# uv run download_land_registry.py
|
|
||||||
|
|
||||||
# The download failed in this environment due to network restrictions, but the script will work on your local machine. The ~5GB CSV should compress to roughly ~1GB in Parquet format with ZSTD compression.
|
|
||||||
|
|
||||||
import time
|
|
||||||
import httpx
|
|
||||||
import polars as pl
|
|
||||||
from pathlib import Path
|
|
||||||
from tqdm import tqdm
|
|
||||||
|
|
||||||
URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv"
|
|
||||||
|
|
||||||
BASE_DATA_PATH = Path("./data_sources")
|
|
||||||
BASE_DATA_PATH.mkdir(exist_ok=True)
|
|
||||||
CSV_PATH = BASE_DATA_PATH / "pp-complete.csv"
|
|
||||||
PARQUET_PATH = BASE_DATA_PATH / "pp-complete.parquet"
|
|
||||||
|
|
||||||
MAX_RETRIES = 3
|
|
||||||
|
|
||||||
|
|
||||||
def download_with_progress(url: str, output_path: Path) -> None:
|
|
||||||
"""Download a file with progress bar and retry logic."""
|
|
||||||
for attempt in range(1, MAX_RETRIES + 1):
|
|
||||||
try:
|
|
||||||
with httpx.stream(
|
|
||||||
"GET",
|
|
||||||
url,
|
|
||||||
follow_redirects=True,
|
|
||||||
timeout=httpx.Timeout(30.0, read=None),
|
|
||||||
) as response:
|
|
||||||
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
|
|
||||||
total = int(response.headers.get("content-length", 0))
|
|
||||||
|
|
||||||
with (
|
|
||||||
open(output_path, "wb") as f,
|
|
||||||
tqdm(
|
|
||||||
total=total,
|
|
||||||
unit="B",
|
|
||||||
unit_scale=True,
|
|
||||||
unit_divisor=1024,
|
|
||||||
desc="Downloading",
|
|
||||||
) as pbar,
|
|
||||||
):
|
|
||||||
for chunk in response.iter_bytes(chunk_size=8192):
|
|
||||||
f.write(chunk)
|
|
||||||
pbar.update(len(chunk))
|
|
||||||
return # Success
|
|
||||||
except (httpx.ConnectError, httpx.ReadTimeout) as e:
|
|
||||||
if attempt < MAX_RETRIES:
|
|
||||||
wait = 2**attempt
|
|
||||||
print(f"Attempt {attempt} failed: {e}. Retrying in {wait}s...")
|
|
||||||
time.sleep(wait)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
|
|
||||||
"""Convert CSV to Parquet using Polars."""
|
|
||||||
print("Converting to Parquet...")
|
|
||||||
|
|
||||||
# https://www.gov.uk/guidance/about-the-price-paid-data
|
|
||||||
# Land Registry CSV columns
|
|
||||||
columns = [
|
|
||||||
"transaction_id",
|
|
||||||
"price",
|
|
||||||
"date_of_transfer",
|
|
||||||
"postcode",
|
|
||||||
"property_type",
|
|
||||||
"old_new",
|
|
||||||
"duration",
|
|
||||||
"paon",
|
|
||||||
"saon",
|
|
||||||
"street",
|
|
||||||
"locality",
|
|
||||||
"town_city",
|
|
||||||
"district",
|
|
||||||
"county",
|
|
||||||
"ppd_category",
|
|
||||||
"record_status",
|
|
||||||
]
|
|
||||||
|
|
||||||
df = pl.read_csv(
|
|
||||||
csv_path,
|
|
||||||
has_header=False,
|
|
||||||
new_columns=columns,
|
|
||||||
try_parse_dates=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
df.write_parquet(parquet_path, compression="zstd")
|
|
||||||
print(f"Saved to {parquet_path}")
|
|
||||||
print(f"Rows: {df.height:,}")
|
|
||||||
print(f"CSV size: {csv_path.stat().st_size / 1024**2:.1f} MB")
|
|
||||||
print(f"Parquet size: {parquet_path.stat().st_size / 1024**2:.1f} MB")
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
if PARQUET_PATH.exists():
|
|
||||||
print(f"Parquet already exists at {PARQUET_PATH}, skipping")
|
|
||||||
return
|
|
||||||
|
|
||||||
if not CSV_PATH.exists():
|
|
||||||
download_with_progress(URL, CSV_PATH)
|
|
||||||
else:
|
|
||||||
print(f"CSV already exists at {CSV_PATH}, skipping download")
|
|
||||||
|
|
||||||
convert_to_parquet(CSV_PATH, PARQUET_PATH)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
71
pipeline/download/arcgis.py
Normal file
71
pipeline/download/arcgis.py
Normal file
|
|
@ -0,0 +1,71 @@
|
||||||
|
import zipfile
|
||||||
|
import httpx
|
||||||
|
import polars as pl
|
||||||
|
from pathlib import Path
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
|
URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data"
|
||||||
|
|
||||||
|
BASE_DATA_PATH = Path("./data_sources")
|
||||||
|
BASE_DATA_PATH.mkdir(exist_ok=True)
|
||||||
|
DOWNLOAD_PATH = BASE_DATA_PATH / "arcgis_data.zip"
|
||||||
|
EXTRACT_PATH = BASE_DATA_PATH / "arcgis_extracted"
|
||||||
|
PARQUET_PATH = BASE_DATA_PATH / "arcgis_data.parquet"
|
||||||
|
|
||||||
|
|
||||||
|
def download_with_progress(url: str, output_path: Path) -> None:
|
||||||
|
with httpx.stream(
|
||||||
|
"GET",
|
||||||
|
url,
|
||||||
|
follow_redirects=True,
|
||||||
|
timeout=httpx.Timeout(30.0, read=None),
|
||||||
|
) as response:
|
||||||
|
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
|
||||||
|
total = int(response.headers.get("content-length", 0))
|
||||||
|
|
||||||
|
with (
|
||||||
|
open(output_path, "wb") as f,
|
||||||
|
tqdm(
|
||||||
|
total=total,
|
||||||
|
unit="B",
|
||||||
|
unit_scale=True,
|
||||||
|
unit_divisor=1024,
|
||||||
|
desc="Downloading",
|
||||||
|
) as pbar,
|
||||||
|
):
|
||||||
|
for chunk in response.iter_bytes(chunk_size=8192):
|
||||||
|
f.write(chunk)
|
||||||
|
pbar.update(len(chunk))
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def extract_zip(zip_path: Path, extract_path: Path) -> None:
|
||||||
|
extract_path.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||||
|
zf.extractall(extract_path)
|
||||||
|
|
||||||
|
|
||||||
|
def convert_to_parquet(data_path: Path, parquet_path: Path) -> None:
|
||||||
|
df = pl.scan_csv(data_path / 'Data/NSPL_MAY_2025_UK.csv', try_parse_dates=True)
|
||||||
|
print(f"Columns: {df.columns}")
|
||||||
|
df.sink_parquet(parquet_path, compression="zstd")
|
||||||
|
print(f"Saved to {parquet_path}")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
if PARQUET_PATH.exists():
|
||||||
|
print(f"Parquet already exists at {PARQUET_PATH}, skipping")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not DOWNLOAD_PATH.exists():
|
||||||
|
download_with_progress(URL, DOWNLOAD_PATH)
|
||||||
|
else:
|
||||||
|
print(f"File already exists at {DOWNLOAD_PATH}, skipping download")
|
||||||
|
|
||||||
|
extract_zip(DOWNLOAD_PATH, EXTRACT_PATH)
|
||||||
|
convert_to_parquet(EXTRACT_PATH, PARQUET_PATH)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -1,6 +1,3 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""Download IoD2025 Deprivation Scores and convert to Parquet."""
|
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
import polars as pl
|
import polars as pl
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
@ -14,8 +11,6 @@ PARQUET_PATH = BASE_DATA_PATH / "IoD2025_Scores.parquet"
|
||||||
|
|
||||||
|
|
||||||
def download_file(url: str, output_path: Path) -> None:
|
def download_file(url: str, output_path: Path) -> None:
|
||||||
"""Download file from URL."""
|
|
||||||
print(f"Downloading from {url}...")
|
|
||||||
with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response:
|
with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response:
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
total = int(response.headers.get("content-length", 0))
|
total = int(response.headers.get("content-length", 0))
|
||||||
|
|
@ -33,7 +28,6 @@ def download_file(url: str, output_path: Path) -> None:
|
||||||
|
|
||||||
|
|
||||||
def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None:
|
def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None:
|
||||||
"""Convert Excel sheet 2 to Parquet."""
|
|
||||||
print("Reading Excel file (sheet 2)...")
|
print("Reading Excel file (sheet 2)...")
|
||||||
|
|
||||||
# Read the 2nd sheet (index 1) - IoD2025 Scores
|
# Read the 2nd sheet (index 1) - IoD2025 Scores
|
||||||
91
pipeline/download/price_paid.py
Normal file
91
pipeline/download/price_paid.py
Normal file
|
|
@ -0,0 +1,91 @@
|
||||||
|
import httpx
|
||||||
|
import polars as pl
|
||||||
|
from pathlib import Path
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
|
URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv"
|
||||||
|
|
||||||
|
BASE_DATA_PATH = Path("./data_sources")
|
||||||
|
BASE_DATA_PATH.mkdir(exist_ok=True)
|
||||||
|
CSV_PATH = BASE_DATA_PATH / "price-paid-complete.csv"
|
||||||
|
PARQUET_PATH = BASE_DATA_PATH / "price-paid-complete.parquet"
|
||||||
|
|
||||||
|
|
||||||
|
def download_with_progress(url: str, output_path: Path) -> None:
|
||||||
|
with httpx.stream(
|
||||||
|
"GET",
|
||||||
|
url,
|
||||||
|
follow_redirects=True,
|
||||||
|
timeout=httpx.Timeout(30.0, read=None),
|
||||||
|
) as response:
|
||||||
|
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
|
||||||
|
total = int(response.headers.get("content-length", 0))
|
||||||
|
|
||||||
|
with (
|
||||||
|
open(output_path, "wb") as f,
|
||||||
|
tqdm(
|
||||||
|
total=total,
|
||||||
|
unit="B",
|
||||||
|
unit_scale=True,
|
||||||
|
unit_divisor=1024,
|
||||||
|
desc="Downloading",
|
||||||
|
) as pbar,
|
||||||
|
):
|
||||||
|
for chunk in response.iter_bytes(chunk_size=8192):
|
||||||
|
f.write(chunk)
|
||||||
|
pbar.update(len(chunk))
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
|
||||||
|
"""Convert CSV to Parquet using Polars."""
|
||||||
|
print("Converting to Parquet...")
|
||||||
|
|
||||||
|
# https://www.gov.uk/guidance/about-the-price-paid-data
|
||||||
|
# Land Registry CSV columns
|
||||||
|
columns = [
|
||||||
|
"transaction_id",
|
||||||
|
"price",
|
||||||
|
"date_of_transfer",
|
||||||
|
"postcode",
|
||||||
|
"property_type",
|
||||||
|
"old_new",
|
||||||
|
"duration",
|
||||||
|
"paon",
|
||||||
|
"saon",
|
||||||
|
"street",
|
||||||
|
"locality",
|
||||||
|
"town_city",
|
||||||
|
"district",
|
||||||
|
"county",
|
||||||
|
"ppd_category",
|
||||||
|
"record_status",
|
||||||
|
]
|
||||||
|
|
||||||
|
df = pl.read_csv(
|
||||||
|
csv_path,
|
||||||
|
has_header=False,
|
||||||
|
new_columns=columns,
|
||||||
|
try_parse_dates=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
df.write_parquet(parquet_path, compression="zstd")
|
||||||
|
print(f"Saved to {parquet_path}")
|
||||||
|
print(f"Rows: {df.height:,}")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
if PARQUET_PATH.exists():
|
||||||
|
print(f"Parquet already exists at {PARQUET_PATH}, skipping")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not CSV_PATH.exists():
|
||||||
|
download_with_progress(URL, CSV_PATH)
|
||||||
|
else:
|
||||||
|
print(f"CSV already exists at {CSV_PATH}, skipping download")
|
||||||
|
|
||||||
|
convert_to_parquet(CSV_PATH, PARQUET_PATH)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue