From 3b9ad11d71c6c8365210fad6aad8d60ee97ff8f8 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 31 Jan 2026 19:52:21 +0000 Subject: [PATCH] Extract common download utils --- pipeline/download/arcgis.py | 40 ++------------ pipeline/download/broadband.py | 58 +++++++------------- pipeline/download/deprivation_data.py | 24 ++------- pipeline/download/naptan.py | 14 ++++- pipeline/download/ofsted.py | 24 ++------- pipeline/download/price_paid.py | 32 ++--------- pipeline/transform/merge.py | 78 +++++++++++++++++++++------ pipeline/utils/__init__.py | 3 ++ pipeline/utils/download.py | 40 ++++++++++++++ 9 files changed, 152 insertions(+), 161 deletions(-) create mode 100644 pipeline/utils/download.py diff --git a/pipeline/download/arcgis.py b/pipeline/download/arcgis.py index de0fa0c..40fb0d0 100644 --- a/pipeline/download/arcgis.py +++ b/pipeline/download/arcgis.py @@ -1,47 +1,13 @@ import argparse import tempfile -import zipfile -import httpx import polars as pl from pathlib import Path -from tqdm import tqdm + +from pipeline.utils import download, extract_zip URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data" -def download_with_progress(url: str, output_path: Path) -> None: - with httpx.stream( - "GET", - url, - follow_redirects=True, - timeout=httpx.Timeout(30.0, read=None), - ) as response: - response.raise_for_status() # pyright: ignore[reportUnusedCallResult] - total = int(response.headers.get("content-length", 0)) - - with ( - open(output_path, "wb") as f, - tqdm( - total=total, - unit="B", - unit_scale=True, - unit_divisor=1024, - desc="Downloading", - ) as pbar, - ): - for chunk in response.iter_bytes(chunk_size=8192): - f.write(chunk) - pbar.update(len(chunk)) - return - - -def extract_zip(zip_path: Path, extract_path: Path) -> None: - extract_path.mkdir(exist_ok=True) - - with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(extract_path) - - def convert_to_parquet(data_path: Path, parquet_path: Path) -> None: df = pl.scan_csv(data_path / "Data/NSPL_MAY_2025_UK.csv", try_parse_dates=True) print(f"Columns: {df.collect_schema().names()}") @@ -63,7 +29,7 @@ def main() -> None: download_path = Path(cache_dir) / "arcgis_data.zip" extract_path = Path(cache_dir) / "arcgis_extracted" - download_with_progress(URL, download_path) + download(URL, download_path) extract_zip(download_path, extract_path) convert_to_parquet(extract_path, args.output) diff --git a/pipeline/download/broadband.py b/pipeline/download/broadband.py index eab0e44..cc70577 100644 --- a/pipeline/download/broadband.py +++ b/pipeline/download/broadband.py @@ -1,32 +1,13 @@ import argparse -import zipfile -import httpx +import tempfile import polars as pl from pathlib import Path -import tempfile -from tqdm import tqdm + +from pipeline.utils import download, extract_zip # Ofcom Connected Nations 2025 - Fixed broadband performance (output area & local authority level) # Source: https://www.ofcom.org.uk/phones-and-broadband/coverage-and-speeds/connected-nations-20252/data-downloads-2025 -PERFORMANCE_URL = "https://www.ofcom.org.uk/siteassets/resources/documents/research-and-data/multi-sector/infrastructure-research/connected-nations-2025/202507_fixed_broadband_performance_r01.zip" - - -def download_with_progress(url: str, output_path: Path) -> None: - with httpx.stream("GET", url, follow_redirects=True, timeout=120) as response: - response.raise_for_status() # pyright: ignore[reportUnusedCallResult] - total = int(response.headers.get("content-length", 0)) - with ( - open(output_path, "wb") as f, - tqdm(total=total, unit="B", unit_scale=True, desc="Downloading") as pbar, - ): - for chunk in response.iter_bytes(chunk_size=8192): - f.write(chunk) - pbar.update(len(chunk)) - - -def extract_zip(zip_path: Path, extract_dir: Path) -> None: - with zipfile.ZipFile(zip_path, "r") as z: - z.extractall(extract_dir) +PERFORMANCE_URL = "https://www.ofcom.org.uk/siteassets/resources/documents/research-and-data/multi-sector/infrastructure-research/connected-nations-2025/202507_fixed_broadband_coverage_r01.zip?v=407830" def convert_to_parquet(extract_dir: Path, parquet_path: Path) -> None: @@ -35,22 +16,21 @@ def convert_to_parquet(extract_dir: Path, parquet_path: Path) -> None: if not csv_files: raise FileNotFoundError(f"No CSV files found in {extract_dir}") - print(f"Found CSV files: {[f.name for f in csv_files]}") + print(f"Found {len(csv_files)} CSV files: {[f.name for f in csv_files]}") - # Read and concatenate all CSVs (typically split by geography level) frames = [] for csv_file in sorted(csv_files): print(f"Reading {csv_file.name}...") df = pl.read_csv(csv_file, infer_schema_length=10000, encoding="utf8-lossy") - print(f" Shape: {df.shape}, Columns: {df.columns}") - frames.append((csv_file.stem, df)) + print(f" Shape: {df.shape}") + frames.append(df) - # Save each CSV as a separate parquet file in the output directory - parquet_path.mkdir(parents=True, exist_ok=True) - for name, df in frames: - out = parquet_path / f"{name}.parquet" - df.write_parquet(out, compression="zstd") - print(f"Saved {out} ({df.shape[0]} rows)") + combined = pl.concat(frames, how="diagonal_relaxed") + print(f"Combined shape: {combined.shape}") + + parquet_path.parent.mkdir(parents=True, exist_ok=True) + combined.write_parquet(parquet_path, compression="zstd") + print(f"Saved {parquet_path} ({combined.shape[0]} rows)") def main() -> None: @@ -61,20 +41,22 @@ def main() -> None: "--output", type=Path, required=True, - help="Output directory for parquet files", + help="Output parquet file path", ) args = parser.parse_args() - with tempfile.TemporaryDirectory() as cache_dir: + with tempfile.TemporaryDirectory(delete=False) as cache_dir: cache = Path(cache_dir) zip_path = cache / "broadband_performance.zip" extract_dir = cache / "extracted" - extract_dir.mkdir() + extracted_again_dir = cache / "extracted-again" - download_with_progress(PERFORMANCE_URL, zip_path) + download(PERFORMANCE_URL, zip_path) extract_zip(zip_path, extract_dir) - convert_to_parquet(extract_dir, args.output) + print(list((extract_dir / "202507_fixed_coverage_r01").glob("*"))) + extract_zip(extract_dir / "202507_fixed_coverage_r01" / "202507_fixed_pc_coverage_r01.zip", extracted_again_dir) + convert_to_parquet(extracted_again_dir, args.output) if __name__ == "__main__": main() diff --git a/pipeline/download/deprivation_data.py b/pipeline/download/deprivation_data.py index db75969..b4ad3eb 100644 --- a/pipeline/download/deprivation_data.py +++ b/pipeline/download/deprivation_data.py @@ -1,29 +1,13 @@ import argparse -import httpx +import tempfile import polars as pl from pathlib import Path -import tempfile + +from pipeline.utils import download URL = "https://assets.publishing.service.gov.uk/media/691ded34513046b952c500bd/File_5_IoD2025_Scores_for_the_Indices_of_Deprivation.xlsx" -def download_file(url: str, output_path: Path) -> None: - with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response: - response.raise_for_status() - total = int(response.headers.get("content-length", 0)) - downloaded = 0 - with open(output_path, "wb") as f: - for chunk in response.iter_bytes(chunk_size=8192): - f.write(chunk) - downloaded += len(chunk) - if total: - print( - f"\rDownloaded {downloaded / 1024 / 1024:.1f} MB / {total / 1024 / 1024:.1f} MB", - end="", - ) - print(f"\nSaved to {output_path}") - - def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None: print("Reading Excel file (sheet 2)...") @@ -51,7 +35,7 @@ def main() -> None: with tempfile.TemporaryDirectory() as cache_dir: xlsx_path = Path(cache_dir) / "IoD2025_Scores.xlsx" - download_file(URL, xlsx_path) + download(URL, xlsx_path, timeout=60) convert_to_parquet(xlsx_path, args.output) diff --git a/pipeline/download/naptan.py b/pipeline/download/naptan.py index 52f823a..3935c21 100644 --- a/pipeline/download/naptan.py +++ b/pipeline/download/naptan.py @@ -10,6 +10,17 @@ import polars as pl NAPTAN_CSV_URL = "https://naptan.api.dft.gov.uk/v1/access-nodes?dataFormat=csv" +STOP_TYPES = { + 'AIR': "Airport", + 'FTD': "Ferry", + "RSE": "Rail station", + "BCT": "Bus stop", + "BCE": "Bus station", + "TXR": "Taxi rank", + "TMU": "Metro or Tram stop", +} + + def download_naptan(output: Path) -> None: output.parent.mkdir(parents=True, exist_ok=True) @@ -26,10 +37,11 @@ def download_naptan(output: Path) -> None: pl.col("Longitude").cast(pl.Float64, strict=False), ) .drop_nulls(subset=["Latitude", "Longitude"]) + .filter(pl.col("StopType").is_in(list(STOP_TYPES.keys()))) .select( pl.col("ATCOCode").alias("id"), pl.col("CommonName").alias("name"), - pl.col("StopType").alias("category"), + pl.col("StopType").replace(STOP_TYPES).alias("category"), pl.col("Latitude").alias("lat"), pl.col("Longitude").alias("lng"), ) diff --git a/pipeline/download/ofsted.py b/pipeline/download/ofsted.py index b783ae9..f86c471 100644 --- a/pipeline/download/ofsted.py +++ b/pipeline/download/ofsted.py @@ -1,31 +1,15 @@ import argparse -import httpx +import tempfile import polars as pl from pathlib import Path -import tempfile + +from pipeline.utils import download # Management information - state-funded schools - latest inspections (as at 30 Apr 2025) # Source: https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes URL = "https://assets.publishing.service.gov.uk/media/681cd390275cb67b18d870fc/Management_information_-_state-funded_schools_-_latest_inspections_as_at_30_Apr_2025.csv" -def download_file(url: str, output_path: Path) -> None: - with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response: - response.raise_for_status() # pyright: ignore[reportUnusedCallResult] - total = int(response.headers.get("content-length", 0)) - downloaded = 0 - with open(output_path, "wb") as f: - for chunk in response.iter_bytes(chunk_size=8192): - f.write(chunk) - downloaded += len(chunk) - if total: - print( - f"\rDownloaded {downloaded / 1024 / 1024:.1f} MB / {total / 1024 / 1024:.1f} MB", - end="", - ) - print(f"\nSaved to {output_path}") - - def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: print("Reading CSV...") @@ -54,7 +38,7 @@ def main() -> None: with tempfile.TemporaryDirectory() as cache_dir: csv_path = Path(cache_dir) / "ofsted_latest_inspections.csv" - download_file(URL, csv_path) + download(URL, csv_path, timeout=60) convert_to_parquet(csv_path, args.output) diff --git a/pipeline/download/price_paid.py b/pipeline/download/price_paid.py index 56ebc4e..a186636 100644 --- a/pipeline/download/price_paid.py +++ b/pipeline/download/price_paid.py @@ -1,39 +1,13 @@ import argparse import tempfile -import httpx import polars as pl from pathlib import Path -from tqdm import tqdm + +from pipeline.utils import download URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv" -def download_with_progress(url: str, output_path: Path) -> None: - with httpx.stream( - "GET", - url, - follow_redirects=True, - timeout=httpx.Timeout(30.0, read=None), - ) as response: - response.raise_for_status() # pyright: ignore[reportUnusedCallResult] - total = int(response.headers.get("content-length", 0)) - - with ( - open(output_path, "wb") as f, - tqdm( - total=total, - unit="B", - unit_scale=True, - unit_divisor=1024, - desc="Downloading", - ) as pbar, - ): - for chunk in response.iter_bytes(chunk_size=8192): - f.write(chunk) - pbar.update(len(chunk)) - return - - def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: """Convert CSV to Parquet using Polars.""" print("Converting to Parquet...") @@ -84,7 +58,7 @@ def main() -> None: with tempfile.TemporaryDirectory() as cache_dir: csv_path = Path(cache_dir) / "price-paid-complete.csv" - download_with_progress(URL, csv_path) + download(URL, csv_path) convert_to_parquet(csv_path, args.output) diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index b5081b6..e672b78 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -6,11 +6,14 @@ from pathlib import Path def _build_wide( epc_pp_path: Path, arcgis_path: Path, - iod_path: Path | None, - poi_proximity_path: Path | None, - journey_times_path: Path | None, - ethnicity_path: Path | None, - crime_path: Path | None, + iod_path: Path, + poi_proximity_path: Path, + journey_times_path: Path, + ethnicity_path: Path, + crime_path: Path , + noise_path: Path , + ofsted_path: Path, + broadband_path: Path, ) -> pl.DataFrame: """Build the wide dataframe by joining epc_pp with all auxiliary data.""" print("Scanning epc_pp...") @@ -23,19 +26,18 @@ def _build_wide( "lat", pl.col("long").alias("lon"), "lsoa21", + "oa21", ) wide = wide.join(arcgis, on="postcode", how="inner") - # Journey times (optional) - if journey_times_path and journey_times_path.exists(): - print("Joining journey times...") - journey_times = pl.scan_parquet(journey_times_path).select( - "postcode", - "public_transport_easy_minutes", - "public_transport_quick_minutes", - "cycling_minutes", - ) - wide = wide.join(journey_times, on="postcode", how="left") + print("Joining journey times...") + journey_times = pl.scan_parquet(journey_times_path).select( + "postcode", + "public_transport_easy_minutes", + "public_transport_quick_minutes", + "cycling_minutes", + ) + wide = wide.join(journey_times, on="postcode", how="left") print("Joining IoD scores...") iod = pl.scan_parquet(iod_path) @@ -60,6 +62,32 @@ def _build_wide( poi_counts = pl.scan_parquet(poi_proximity_path) wide = wide.join(poi_counts, on="postcode", how="left") + noise = pl.scan_parquet(noise_path).select("postcode", "road_noise_lden_db") + wide = wide.join(noise, on="postcode", how="left") + + print("Joining Ofsted school ratings...") + ofsted = ( + pl.scan_parquet(ofsted_path) + .filter(pl.col("Overall effectiveness").is_in(["1", "2", "3", "4"])) + .select( + pl.col("Postcode").alias("ofsted_postcode"), + pl.col("Overall effectiveness").cast(pl.UInt8).alias("ofsted_rating"), + ) + .group_by("ofsted_postcode") + .agg(pl.col("ofsted_rating").mean().round(1).alias("ofsted_avg_rating")) + ) + wide = wide.join(ofsted, left_on="postcode", right_on="ofsted_postcode", how="left") + + print("Joining broadband performance...") + broadband = pl.scan_parquet(broadband_path).select( + "output_area", + pl.col("Average max download speed (Mbit/s) for lines >=900Mbit/s").alias("broadband_max_download_900plus"), + pl.col("Average max download speed (Mbit/s) for lines 100<300Mbit/s").alias("broadband_max_download_100_300"), + pl.col("Average max download speed (Mbit/s) for lines 30<100Mbit/s").alias("broadband_max_download_30_100"), + pl.col("Average max upload speed (Mbit/s) for lines >=900Mbit/s").alias("broadband_max_upload_900plus"), + ) + wide = wide.join(broadband, left_on="oa21", right_on="output_area", how="left") + # Convert construction_age_band to numeric year wide = wide.with_columns( pl.col("construction_age_band") @@ -103,6 +131,7 @@ def _build_wide( "Income Deprivation Affecting Children Index (IDACI) Score (rate)", "Barriers to Housing and Services Score", "lsoa21", + "oa21", "pp_property_type", "built_form", ) @@ -124,6 +153,12 @@ def _build_wide( "public_transport_2km": "Public transport within 2km", "latest_price": "Last known price", "number_habitable_rooms": "Rooms (including bedrooms & bathrooms)", + "road_noise_lden_db": "Road noise Lden (dB)", + "ofsted_avg_rating": "Ofsted avg rating (1=Outstanding, 4=Inadequate)" + "broadband_max_download_900plus": "Broadband download speed 900+ Mbps", + "broadband_max_download_100_300": "Broadband download speed 100-300 Mbps", + "broadband_max_download_30_100": "Broadband download speed 30-100 Mbps", + "broadband_max_upload_900plus": "Broadband upload speed 900+ Mbps", } ) ) @@ -131,7 +166,6 @@ def _build_wide( print("Collecting with streaming engine...") return wide.collect(engine="streaming") - def main(): parser = argparse.ArgumentParser( description="Build wide property dataframe with all joins" @@ -159,6 +193,15 @@ def main(): parser.add_argument( "--crime", type=Path, required=True, help="Crime by LSOA parquet file (optional)" ) + parser.add_argument( + "--noise", type=Path, required=True, help="Road noise by postcode parquet file" + ) + parser.add_argument( + "--ofsted", type=Path, required=True, help="Ofsted school inspection parquet file" + ) + parser.add_argument( + "--broadband", type=Path, required=True, help="Broadband performance by output area parquet file" + ) parser.add_argument( "--output", type=Path, required=True, help="Output parquet file path" ) @@ -172,6 +215,9 @@ def main(): journey_times_path=args.journey_times, ethnicity_path=args.ethnicity, crime_path=args.crime, + noise_path=args.noise, + ofsted_path=args.ofsted, + broadband_path=args.broadband, ) print(f"Columns: {wide.columns}") diff --git a/pipeline/utils/__init__.py b/pipeline/utils/__init__.py index cb7663a..772d4ae 100644 --- a/pipeline/utils/__init__.py +++ b/pipeline/utils/__init__.py @@ -1,8 +1,11 @@ +from .download import download, extract_zip from .fuzzy_join import fuzzy_join_on_postcode from .haversine import haversine_km, haversine_km_expr from .poi_counts import POI_GROUPS, count_pois_within_radius __all__ = [ + "download", + "extract_zip", "fuzzy_join_on_postcode", "haversine_km", "haversine_km_expr", diff --git a/pipeline/utils/download.py b/pipeline/utils/download.py new file mode 100644 index 0000000..558184f --- /dev/null +++ b/pipeline/utils/download.py @@ -0,0 +1,40 @@ +"""Shared download and extraction helpers for pipeline scripts.""" + +import zipfile +from pathlib import Path + +import httpx +from tqdm import tqdm + + +def download(url: str, output_path: Path, *, timeout: float = 120) -> None: + """Stream-download a URL to a local file with a tqdm progress bar.""" + with httpx.stream( + "GET", + url, + follow_redirects=True, + timeout=httpx.Timeout(30.0, read=timeout), + ) as response: + response.raise_for_status() # pyright: ignore[reportUnusedCallResult] + total = int(response.headers.get("content-length", 0)) + + with ( + open(output_path, "wb") as f, + tqdm( + total=total or None, + unit="B", + unit_scale=True, + unit_divisor=1024, + desc=output_path.name, + ) as pbar, + ): + for chunk in response.iter_bytes(chunk_size=8192): + f.write(chunk) + pbar.update(len(chunk)) + + +def extract_zip(zip_path: Path, extract_dir: Path) -> None: + """Extract a ZIP archive into the given directory.""" + extract_dir.mkdir(parents=True, exist_ok=True) + with zipfile.ZipFile(zip_path, "r") as zf: + zf.extractall(extract_dir)