diff --git a/pipeline/download/broadband.py b/pipeline/download/broadband.py new file mode 100644 index 0000000..eab0e44 --- /dev/null +++ b/pipeline/download/broadband.py @@ -0,0 +1,80 @@ +import argparse +import zipfile +import httpx +import polars as pl +from pathlib import Path +import tempfile +from tqdm import tqdm + +# Ofcom Connected Nations 2025 - Fixed broadband performance (output area & local authority level) +# Source: https://www.ofcom.org.uk/phones-and-broadband/coverage-and-speeds/connected-nations-20252/data-downloads-2025 +PERFORMANCE_URL = "https://www.ofcom.org.uk/siteassets/resources/documents/research-and-data/multi-sector/infrastructure-research/connected-nations-2025/202507_fixed_broadband_performance_r01.zip" + + +def download_with_progress(url: str, output_path: Path) -> None: + with httpx.stream("GET", url, follow_redirects=True, timeout=120) as response: + response.raise_for_status() # pyright: ignore[reportUnusedCallResult] + total = int(response.headers.get("content-length", 0)) + with ( + open(output_path, "wb") as f, + tqdm(total=total, unit="B", unit_scale=True, desc="Downloading") as pbar, + ): + for chunk in response.iter_bytes(chunk_size=8192): + f.write(chunk) + pbar.update(len(chunk)) + + +def extract_zip(zip_path: Path, extract_dir: Path) -> None: + with zipfile.ZipFile(zip_path, "r") as z: + z.extractall(extract_dir) + + +def convert_to_parquet(extract_dir: Path, parquet_path: Path) -> None: + # Find CSV files in the extracted directory + csv_files = list(extract_dir.rglob("*.csv")) + if not csv_files: + raise FileNotFoundError(f"No CSV files found in {extract_dir}") + + print(f"Found CSV files: {[f.name for f in csv_files]}") + + # Read and concatenate all CSVs (typically split by geography level) + frames = [] + for csv_file in sorted(csv_files): + print(f"Reading {csv_file.name}...") + df = pl.read_csv(csv_file, infer_schema_length=10000, encoding="utf8-lossy") + print(f" Shape: {df.shape}, Columns: {df.columns}") + frames.append((csv_file.stem, df)) + + # Save each CSV as a separate parquet file in the output directory + parquet_path.mkdir(parents=True, exist_ok=True) + for name, df in frames: + out = parquet_path / f"{name}.parquet" + df.write_parquet(out, compression="zstd") + print(f"Saved {out} ({df.shape[0]} rows)") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download Ofcom broadband performance data" + ) + parser.add_argument( + "--output", + type=Path, + required=True, + help="Output directory for parquet files", + ) + args = parser.parse_args() + + with tempfile.TemporaryDirectory() as cache_dir: + cache = Path(cache_dir) + zip_path = cache / "broadband_performance.zip" + extract_dir = cache / "extracted" + extract_dir.mkdir() + + download_with_progress(PERFORMANCE_URL, zip_path) + extract_zip(zip_path, extract_dir) + convert_to_parquet(extract_dir, args.output) + + +if __name__ == "__main__": + main() diff --git a/pipeline/download/naptan.py b/pipeline/download/naptan.py new file mode 100644 index 0000000..52f823a --- /dev/null +++ b/pipeline/download/naptan.py @@ -0,0 +1,57 @@ +"""Download NaPTAN data and extract railway/metro station POIs.""" + +import argparse +import io +import urllib.request +from pathlib import Path + +import polars as pl + +NAPTAN_CSV_URL = "https://naptan.api.dft.gov.uk/v1/access-nodes?dataFormat=csv" + + +def download_naptan(output: Path) -> None: + output.parent.mkdir(parents=True, exist_ok=True) + + print(f"Downloading NaPTAN data from {NAPTAN_CSV_URL}") + with urllib.request.urlopen(NAPTAN_CSV_URL) as resp: + raw = resp.read() + + print(f"Downloaded {len(raw) / (1024 * 1024):.1f} MB") + + df = ( + pl.read_csv(io.BytesIO(raw), infer_schema_length=0) + .with_columns( + pl.col("Latitude").cast(pl.Float64, strict=False), + pl.col("Longitude").cast(pl.Float64, strict=False), + ) + .drop_nulls(subset=["Latitude", "Longitude"]) + .select( + pl.col("ATCOCode").alias("id"), + pl.col("CommonName").alias("name"), + pl.col("StopType").alias("category"), + pl.col("Latitude").alias("lat"), + pl.col("Longitude").alias("lng"), + ) + ) + + df.write_parquet(output) + size_mb = output.stat().st_size / (1024 * 1024) + print(f"Wrote {output} ({size_mb:.1f} MB, {len(df):,} stations)") + + counts = df.group_by("category").len().sort("len", descending=True) + for row in counts.iter_rows(named=True): + print(f" {row['category']}: {row['len']:,}") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Download NaPTAN station data") + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) + args = parser.parse_args() + download_naptan(args.output) + + +if __name__ == "__main__": + main() diff --git a/pipeline/download/ofsted.py b/pipeline/download/ofsted.py new file mode 100644 index 0000000..b783ae9 --- /dev/null +++ b/pipeline/download/ofsted.py @@ -0,0 +1,62 @@ +import argparse +import httpx +import polars as pl +from pathlib import Path +import tempfile + +# Management information - state-funded schools - latest inspections (as at 30 Apr 2025) +# Source: https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes +URL = "https://assets.publishing.service.gov.uk/media/681cd390275cb67b18d870fc/Management_information_-_state-funded_schools_-_latest_inspections_as_at_30_Apr_2025.csv" + + +def download_file(url: str, output_path: Path) -> None: + with httpx.stream("GET", url, follow_redirects=True, timeout=60) as response: + response.raise_for_status() # pyright: ignore[reportUnusedCallResult] + total = int(response.headers.get("content-length", 0)) + downloaded = 0 + with open(output_path, "wb") as f: + for chunk in response.iter_bytes(chunk_size=8192): + f.write(chunk) + downloaded += len(chunk) + if total: + print( + f"\rDownloaded {downloaded / 1024 / 1024:.1f} MB / {total / 1024 / 1024:.1f} MB", + end="", + ) + print(f"\nSaved to {output_path}") + + +def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None: + print("Reading CSV...") + + df = pl.read_csv( + csv_path, + infer_schema_length=10000, + encoding="utf8-lossy", + null_values=["NULL", "Not applicable"], + ) + + print(f"Shape: {df.shape}") + print(f"Columns: {df.columns}") + + df.write_parquet(parquet_path, compression="zstd") + print(f"Saved to {parquet_path}") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Download Ofsted school inspection outcomes data" + ) + parser.add_argument( + "--output", type=Path, required=True, help="Output parquet file path" + ) + args = parser.parse_args() + + with tempfile.TemporaryDirectory() as cache_dir: + csv_path = Path(cache_dir) / "ofsted_latest_inspections.csv" + download_file(URL, csv_path) + convert_to_parquet(csv_path, args.output) + + +if __name__ == "__main__": + main() diff --git a/pipeline/transform/transform_poi.py b/pipeline/transform/transform_poi.py index de92d24..ac6d98f 100644 --- a/pipeline/transform/transform_poi.py +++ b/pipeline/transform/transform_poi.py @@ -1,4 +1,5 @@ import argparse +import warnings from pathlib import Path import polars as pl