Extract common download utils

This commit is contained in:
Andras Schmelczer 2026-01-31 19:52:21 +00:00
parent 6ddb3d2121
commit 3b9ad11d71
9 changed files with 152 additions and 161 deletions

View file

@ -1,47 +1,13 @@
import argparse import argparse
import tempfile import tempfile
import zipfile
import httpx
import polars as pl import polars as pl
from pathlib import Path from pathlib import Path
from tqdm import tqdm
from pipeline.utils import download, extract_zip
URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data" URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data"
def download_with_progress(url: str, output_path: Path) -> None:
with httpx.stream(
"GET",
url,
follow_redirects=True,
timeout=httpx.Timeout(30.0, read=None),
) as response:
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
total = int(response.headers.get("content-length", 0))
with (
open(output_path, "wb") as f,
tqdm(
total=total,
unit="B",
unit_scale=True,
unit_divisor=1024,
desc="Downloading",
) as pbar,
):
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
pbar.update(len(chunk))
return
def extract_zip(zip_path: Path, extract_path: Path) -> None:
extract_path.mkdir(exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_path)
def convert_to_parquet(data_path: Path, parquet_path: Path) -> None: def convert_to_parquet(data_path: Path, parquet_path: Path) -> None:
df = pl.scan_csv(data_path / "Data/NSPL_MAY_2025_UK.csv", try_parse_dates=True) df = pl.scan_csv(data_path / "Data/NSPL_MAY_2025_UK.csv", try_parse_dates=True)
print(f"Columns: {df.collect_schema().names()}") print(f"Columns: {df.collect_schema().names()}")
@ -63,7 +29,7 @@ def main() -> None:
download_path = Path(cache_dir) / "arcgis_data.zip" download_path = Path(cache_dir) / "arcgis_data.zip"
extract_path = Path(cache_dir) / "arcgis_extracted" extract_path = Path(cache_dir) / "arcgis_extracted"
download_with_progress(URL, download_path) download(URL, download_path)
extract_zip(download_path, extract_path) extract_zip(download_path, extract_path)
convert_to_parquet(extract_path, args.output) convert_to_parquet(extract_path, args.output)

View file

@ -1,32 +1,13 @@
import argparse import argparse
import zipfile import tempfile
import httpx
import polars as pl import polars as pl
from pathlib import Path from pathlib import Path
import tempfile
from tqdm import tqdm from pipeline.utils import download, extract_zip
# Ofcom Connected Nations 2025 - Fixed broadband performance (output area & local authority level) # Ofcom Connected Nations 2025 - Fixed broadband performance (output area & local authority level)
# Source: https://www.ofcom.org.uk/phones-and-broadband/coverage-and-speeds/connected-nations-20252/data-downloads-2025 # Source: https://www.ofcom.org.uk/phones-and-broadband/coverage-and-speeds/connected-nations-20252/data-downloads-2025
PERFORMANCE_URL = "https://www.ofcom.org.uk/siteassets/resources/documents/research-and-data/multi-sector/infrastructure-research/connected-nations-2025/202507_fixed_broadband_performance_r01.zip" PERFORMANCE_URL = "https://www.ofcom.org.uk/siteassets/resources/documents/research-and-data/multi-sector/infrastructure-research/connected-nations-2025/202507_fixed_broadband_coverage_r01.zip?v=407830"
def download_with_progress(url: str, output_path: Path) -> None:
with httpx.stream("GET", url, follow_redirects=True, timeout=120) as response:
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
total = int(response.headers.get("content-length", 0))
with (
open(output_path, "wb") as f,
tqdm(total=total, unit="B", unit_scale=True, desc="Downloading") as pbar,
):
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
pbar.update(len(chunk))
def extract_zip(zip_path: Path, extract_dir: Path) -> None:
with zipfile.ZipFile(zip_path, "r") as z:
z.extractall(extract_dir)
def convert_to_parquet(extract_dir: Path, parquet_path: Path) -> None: def convert_to_parquet(extract_dir: Path, parquet_path: Path) -> None:
@ -35,22 +16,21 @@ def convert_to_parquet(extract_dir: Path, parquet_path: Path) -> None:
if not csv_files: if not csv_files:
raise FileNotFoundError(f"No CSV files found in {extract_dir}") raise FileNotFoundError(f"No CSV files found in {extract_dir}")
print(f"Found CSV files: {[f.name for f in csv_files]}") print(f"Found {len(csv_files)} CSV files: {[f.name for f in csv_files]}")
# Read and concatenate all CSVs (typically split by geography level)
frames = [] frames = []
for csv_file in sorted(csv_files): for csv_file in sorted(csv_files):
print(f"Reading {csv_file.name}...") print(f"Reading {csv_file.name}...")
df = pl.read_csv(csv_file, infer_schema_length=10000, encoding="utf8-lossy") df = pl.read_csv(csv_file, infer_schema_length=10000, encoding="utf8-lossy")
print(f" Shape: {df.shape}, Columns: {df.columns}") print(f" Shape: {df.shape}")
frames.append((csv_file.stem, df)) frames.append(df)
# Save each CSV as a separate parquet file in the output directory combined = pl.concat(frames, how="diagonal_relaxed")
parquet_path.mkdir(parents=True, exist_ok=True) print(f"Combined shape: {combined.shape}")
for name, df in frames:
out = parquet_path / f"{name}.parquet" parquet_path.parent.mkdir(parents=True, exist_ok=True)
df.write_parquet(out, compression="zstd") combined.write_parquet(parquet_path, compression="zstd")
print(f"Saved {out} ({df.shape[0]} rows)") print(f"Saved {parquet_path} ({combined.shape[0]} rows)")
def main() -> None: def main() -> None:
@ -61,20 +41,22 @@ def main() -> None:
"--output", "--output",
type=Path, type=Path,
required=True, required=True,
help="Output directory for parquet files", help="Output parquet file path",
) )
args = parser.parse_args() args = parser.parse_args()
with tempfile.TemporaryDirectory() as cache_dir: with tempfile.TemporaryDirectory(delete=False) as cache_dir:
cache = Path(cache_dir) cache = Path(cache_dir)
zip_path = cache / "broadband_performance.zip" zip_path = cache / "broadband_performance.zip"
extract_dir = cache / "extracted" extract_dir = cache / "extracted"
extract_dir.mkdir() extracted_again_dir = cache / "extracted-again"
download_with_progress(PERFORMANCE_URL, zip_path) download(PERFORMANCE_URL, zip_path)
extract_zip(zip_path, extract_dir) extract_zip(zip_path, extract_dir)
convert_to_parquet(extract_dir, args.output) print(list((extract_dir / "202507_fixed_coverage_r01").glob("*")))
extract_zip(extract_dir / "202507_fixed_coverage_r01" / "202507_fixed_pc_coverage_r01.zip", extracted_again_dir)
convert_to_parquet(extracted_again_dir, args.output)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View file

@ -1,29 +1,13 @@
import argparse import argparse
import httpx import tempfile
import polars as pl import polars as pl
from pathlib import Path from pathlib import Path
import tempfile
from pipeline.utils import download
URL = "https://assets.publishing.service.gov.uk/media/691ded34513046b952c500bd/File_5_IoD2025_Scores_for_the_Indices_of_Deprivation.xlsx" URL = "https://assets.publishing.service.gov.uk/media/691ded34513046b952c500bd/File_5_IoD2025_Scores_for_the_Indices_of_Deprivation.xlsx"
def download_file(url: str, output_path: Path) -> None:
with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response:
response.raise_for_status()
total = int(response.headers.get("content-length", 0))
downloaded = 0
with open(output_path, "wb") as f:
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if total:
print(
f"\rDownloaded {downloaded / 1024 / 1024:.1f} MB / {total / 1024 / 1024:.1f} MB",
end="",
)
print(f"\nSaved to {output_path}")
def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None: def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None:
print("Reading Excel file (sheet 2)...") print("Reading Excel file (sheet 2)...")
@ -51,7 +35,7 @@ def main() -> None:
with tempfile.TemporaryDirectory() as cache_dir: with tempfile.TemporaryDirectory() as cache_dir:
xlsx_path = Path(cache_dir) / "IoD2025_Scores.xlsx" xlsx_path = Path(cache_dir) / "IoD2025_Scores.xlsx"
download_file(URL, xlsx_path) download(URL, xlsx_path, timeout=60)
convert_to_parquet(xlsx_path, args.output) convert_to_parquet(xlsx_path, args.output)

View file

@ -10,6 +10,17 @@ import polars as pl
NAPTAN_CSV_URL = "https://naptan.api.dft.gov.uk/v1/access-nodes?dataFormat=csv" NAPTAN_CSV_URL = "https://naptan.api.dft.gov.uk/v1/access-nodes?dataFormat=csv"
STOP_TYPES = {
'AIR': "Airport",
'FTD': "Ferry",
"RSE": "Rail station",
"BCT": "Bus stop",
"BCE": "Bus station",
"TXR": "Taxi rank",
"TMU": "Metro or Tram stop",
}
def download_naptan(output: Path) -> None: def download_naptan(output: Path) -> None:
output.parent.mkdir(parents=True, exist_ok=True) output.parent.mkdir(parents=True, exist_ok=True)
@ -26,10 +37,11 @@ def download_naptan(output: Path) -> None:
pl.col("Longitude").cast(pl.Float64, strict=False), pl.col("Longitude").cast(pl.Float64, strict=False),
) )
.drop_nulls(subset=["Latitude", "Longitude"]) .drop_nulls(subset=["Latitude", "Longitude"])
.filter(pl.col("StopType").is_in(list(STOP_TYPES.keys())))
.select( .select(
pl.col("ATCOCode").alias("id"), pl.col("ATCOCode").alias("id"),
pl.col("CommonName").alias("name"), pl.col("CommonName").alias("name"),
pl.col("StopType").alias("category"), pl.col("StopType").replace(STOP_TYPES).alias("category"),
pl.col("Latitude").alias("lat"), pl.col("Latitude").alias("lat"),
pl.col("Longitude").alias("lng"), pl.col("Longitude").alias("lng"),
) )

View file

@ -1,31 +1,15 @@
import argparse import argparse
import httpx import tempfile
import polars as pl import polars as pl
from pathlib import Path from pathlib import Path
import tempfile
from pipeline.utils import download
# Management information - state-funded schools - latest inspections (as at 30 Apr 2025) # Management information - state-funded schools - latest inspections (as at 30 Apr 2025)
# Source: https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes # Source: https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes
URL = "https://assets.publishing.service.gov.uk/media/681cd390275cb67b18d870fc/Management_information_-_state-funded_schools_-_latest_inspections_as_at_30_Apr_2025.csv" URL = "https://assets.publishing.service.gov.uk/media/681cd390275cb67b18d870fc/Management_information_-_state-funded_schools_-_latest_inspections_as_at_30_Apr_2025.csv"
def download_file(url: str, output_path: Path) -> None:
with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response:
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
total = int(response.headers.get("content-length", 0))
downloaded = 0
with open(output_path, "wb") as f:
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if total:
print(
f"\rDownloaded {downloaded / 1024 / 1024:.1f} MB / {total / 1024 / 1024:.1f} MB",
end="",
)
print(f"\nSaved to {output_path}")
def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
print("Reading CSV...") print("Reading CSV...")
@ -54,7 +38,7 @@ def main() -> None:
with tempfile.TemporaryDirectory() as cache_dir: with tempfile.TemporaryDirectory() as cache_dir:
csv_path = Path(cache_dir) / "ofsted_latest_inspections.csv" csv_path = Path(cache_dir) / "ofsted_latest_inspections.csv"
download_file(URL, csv_path) download(URL, csv_path, timeout=60)
convert_to_parquet(csv_path, args.output) convert_to_parquet(csv_path, args.output)

View file

@ -1,39 +1,13 @@
import argparse import argparse
import tempfile import tempfile
import httpx
import polars as pl import polars as pl
from pathlib import Path from pathlib import Path
from tqdm import tqdm
from pipeline.utils import download
URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv" URL = "http://prod.publicdata.landregistry.gov.uk.s3-website-eu-west-1.amazonaws.com/pp-complete.csv"
def download_with_progress(url: str, output_path: Path) -> None:
with httpx.stream(
"GET",
url,
follow_redirects=True,
timeout=httpx.Timeout(30.0, read=None),
) as response:
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
total = int(response.headers.get("content-length", 0))
with (
open(output_path, "wb") as f,
tqdm(
total=total,
unit="B",
unit_scale=True,
unit_divisor=1024,
desc="Downloading",
) as pbar,
):
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
pbar.update(len(chunk))
return
def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:
"""Convert CSV to Parquet using Polars.""" """Convert CSV to Parquet using Polars."""
print("Converting to Parquet...") print("Converting to Parquet...")
@ -84,7 +58,7 @@ def main() -> None:
with tempfile.TemporaryDirectory() as cache_dir: with tempfile.TemporaryDirectory() as cache_dir:
csv_path = Path(cache_dir) / "price-paid-complete.csv" csv_path = Path(cache_dir) / "price-paid-complete.csv"
download_with_progress(URL, csv_path) download(URL, csv_path)
convert_to_parquet(csv_path, args.output) convert_to_parquet(csv_path, args.output)

View file

@ -6,11 +6,14 @@ from pathlib import Path
def _build_wide( def _build_wide(
epc_pp_path: Path, epc_pp_path: Path,
arcgis_path: Path, arcgis_path: Path,
iod_path: Path | None, iod_path: Path,
poi_proximity_path: Path | None, poi_proximity_path: Path,
journey_times_path: Path | None, journey_times_path: Path,
ethnicity_path: Path | None, ethnicity_path: Path,
crime_path: Path | None, crime_path: Path ,
noise_path: Path ,
ofsted_path: Path,
broadband_path: Path,
) -> pl.DataFrame: ) -> pl.DataFrame:
"""Build the wide dataframe by joining epc_pp with all auxiliary data.""" """Build the wide dataframe by joining epc_pp with all auxiliary data."""
print("Scanning epc_pp...") print("Scanning epc_pp...")
@ -23,19 +26,18 @@ def _build_wide(
"lat", "lat",
pl.col("long").alias("lon"), pl.col("long").alias("lon"),
"lsoa21", "lsoa21",
"oa21",
) )
wide = wide.join(arcgis, on="postcode", how="inner") wide = wide.join(arcgis, on="postcode", how="inner")
# Journey times (optional) print("Joining journey times...")
if journey_times_path and journey_times_path.exists(): journey_times = pl.scan_parquet(journey_times_path).select(
print("Joining journey times...") "postcode",
journey_times = pl.scan_parquet(journey_times_path).select( "public_transport_easy_minutes",
"postcode", "public_transport_quick_minutes",
"public_transport_easy_minutes", "cycling_minutes",
"public_transport_quick_minutes", )
"cycling_minutes", wide = wide.join(journey_times, on="postcode", how="left")
)
wide = wide.join(journey_times, on="postcode", how="left")
print("Joining IoD scores...") print("Joining IoD scores...")
iod = pl.scan_parquet(iod_path) iod = pl.scan_parquet(iod_path)
@ -60,6 +62,32 @@ def _build_wide(
poi_counts = pl.scan_parquet(poi_proximity_path) poi_counts = pl.scan_parquet(poi_proximity_path)
wide = wide.join(poi_counts, on="postcode", how="left") wide = wide.join(poi_counts, on="postcode", how="left")
noise = pl.scan_parquet(noise_path).select("postcode", "road_noise_lden_db")
wide = wide.join(noise, on="postcode", how="left")
print("Joining Ofsted school ratings...")
ofsted = (
pl.scan_parquet(ofsted_path)
.filter(pl.col("Overall effectiveness").is_in(["1", "2", "3", "4"]))
.select(
pl.col("Postcode").alias("ofsted_postcode"),
pl.col("Overall effectiveness").cast(pl.UInt8).alias("ofsted_rating"),
)
.group_by("ofsted_postcode")
.agg(pl.col("ofsted_rating").mean().round(1).alias("ofsted_avg_rating"))
)
wide = wide.join(ofsted, left_on="postcode", right_on="ofsted_postcode", how="left")
print("Joining broadband performance...")
broadband = pl.scan_parquet(broadband_path).select(
"output_area",
pl.col("Average max download speed (Mbit/s) for lines >=900Mbit/s").alias("broadband_max_download_900plus"),
pl.col("Average max download speed (Mbit/s) for lines 100<300Mbit/s").alias("broadband_max_download_100_300"),
pl.col("Average max download speed (Mbit/s) for lines 30<100Mbit/s").alias("broadband_max_download_30_100"),
pl.col("Average max upload speed (Mbit/s) for lines >=900Mbit/s").alias("broadband_max_upload_900plus"),
)
wide = wide.join(broadband, left_on="oa21", right_on="output_area", how="left")
# Convert construction_age_band to numeric year # Convert construction_age_band to numeric year
wide = wide.with_columns( wide = wide.with_columns(
pl.col("construction_age_band") pl.col("construction_age_band")
@ -103,6 +131,7 @@ def _build_wide(
"Income Deprivation Affecting Children Index (IDACI) Score (rate)", "Income Deprivation Affecting Children Index (IDACI) Score (rate)",
"Barriers to Housing and Services Score", "Barriers to Housing and Services Score",
"lsoa21", "lsoa21",
"oa21",
"pp_property_type", "pp_property_type",
"built_form", "built_form",
) )
@ -124,6 +153,12 @@ def _build_wide(
"public_transport_2km": "Public transport within 2km", "public_transport_2km": "Public transport within 2km",
"latest_price": "Last known price", "latest_price": "Last known price",
"number_habitable_rooms": "Rooms (including bedrooms & bathrooms)", "number_habitable_rooms": "Rooms (including bedrooms & bathrooms)",
"road_noise_lden_db": "Road noise Lden (dB)",
"ofsted_avg_rating": "Ofsted avg rating (1=Outstanding, 4=Inadequate)"
"broadband_max_download_900plus": "Broadband download speed 900+ Mbps",
"broadband_max_download_100_300": "Broadband download speed 100-300 Mbps",
"broadband_max_download_30_100": "Broadband download speed 30-100 Mbps",
"broadband_max_upload_900plus": "Broadband upload speed 900+ Mbps",
} }
) )
) )
@ -131,7 +166,6 @@ def _build_wide(
print("Collecting with streaming engine...") print("Collecting with streaming engine...")
return wide.collect(engine="streaming") return wide.collect(engine="streaming")
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Build wide property dataframe with all joins" description="Build wide property dataframe with all joins"
@ -159,6 +193,15 @@ def main():
parser.add_argument( parser.add_argument(
"--crime", type=Path, required=True, help="Crime by LSOA parquet file (optional)" "--crime", type=Path, required=True, help="Crime by LSOA parquet file (optional)"
) )
parser.add_argument(
"--noise", type=Path, required=True, help="Road noise by postcode parquet file"
)
parser.add_argument(
"--ofsted", type=Path, required=True, help="Ofsted school inspection parquet file"
)
parser.add_argument(
"--broadband", type=Path, required=True, help="Broadband performance by output area parquet file"
)
parser.add_argument( parser.add_argument(
"--output", type=Path, required=True, help="Output parquet file path" "--output", type=Path, required=True, help="Output parquet file path"
) )
@ -172,6 +215,9 @@ def main():
journey_times_path=args.journey_times, journey_times_path=args.journey_times,
ethnicity_path=args.ethnicity, ethnicity_path=args.ethnicity,
crime_path=args.crime, crime_path=args.crime,
noise_path=args.noise,
ofsted_path=args.ofsted,
broadband_path=args.broadband,
) )
print(f"Columns: {wide.columns}") print(f"Columns: {wide.columns}")

View file

@ -1,8 +1,11 @@
from .download import download, extract_zip
from .fuzzy_join import fuzzy_join_on_postcode from .fuzzy_join import fuzzy_join_on_postcode
from .haversine import haversine_km, haversine_km_expr from .haversine import haversine_km, haversine_km_expr
from .poi_counts import POI_GROUPS, count_pois_within_radius from .poi_counts import POI_GROUPS, count_pois_within_radius
__all__ = [ __all__ = [
"download",
"extract_zip",
"fuzzy_join_on_postcode", "fuzzy_join_on_postcode",
"haversine_km", "haversine_km",
"haversine_km_expr", "haversine_km_expr",

View file

@ -0,0 +1,40 @@
"""Shared download and extraction helpers for pipeline scripts."""
import zipfile
from pathlib import Path
import httpx
from tqdm import tqdm
def download(url: str, output_path: Path, *, timeout: float = 120) -> None:
"""Stream-download a URL to a local file with a tqdm progress bar."""
with httpx.stream(
"GET",
url,
follow_redirects=True,
timeout=httpx.Timeout(30.0, read=timeout),
) as response:
response.raise_for_status() # pyright: ignore[reportUnusedCallResult]
total = int(response.headers.get("content-length", 0))
with (
open(output_path, "wb") as f,
tqdm(
total=total or None,
unit="B",
unit_scale=True,
unit_divisor=1024,
desc=output_path.name,
) as pbar,
):
for chunk in response.iter_bytes(chunk_size=8192):
f.write(chunk)
pbar.update(len(chunk))
def extract_zip(zip_path: Path, extract_dir: Path) -> None:
"""Extract a ZIP archive into the given directory."""
extract_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)