Rerun prepare script

This commit is contained in:
Andras Schmelczer 2026-04-06 11:13:52 +01:00
parent 349a6c1d53
commit 8614acdfae
24 changed files with 1132 additions and 226 deletions

View file

@ -5,11 +5,26 @@ from pathlib import Path
from pipeline.utils import download, extract_zip
URL = "https://www.arcgis.com/sharing/rest/content/items/077631e063eb4e1ab43575d01381ec33/data"
URL = "https://www.arcgis.com/sharing/rest/content/items/36b718ad00de49afb9ad364f8b815b9e/data"
def convert_to_parquet(data_path: Path, parquet_path: Path) -> None:
df = pl.scan_csv(data_path / "Data/NSPL_MAY_2025_UK.csv", try_parse_dates=True)
# Classification code columns (ruc21ind, oac11ind, imd20ind) look numeric
# in early rows but contain string codes like "UN1" (Unclassified) later
# on. Force them to String to avoid mid-stream dtype inference failures.
# Note: NSPL renames these year suffixes as new releases roll in (e.g.
# Feb 2026 bumped oac from oac21ind → oac11ind, imd from imd19ind →
# imd20ind), so keep this dict in sync with the current CSV headers —
# polars silently ignores overrides for missing columns, masking drift.
df = pl.scan_csv(
data_path / "Data/NSPL_FEB_2026_UK.csv",
try_parse_dates=True,
schema_overrides={
"ruc21ind": pl.String,
"oac11ind": pl.String,
"imd20ind": pl.String,
},
)
print(f"Columns: {df.collect_schema().names()}")
parquet_path.parent.mkdir(parents=True, exist_ok=True)
df.sink_parquet(parquet_path, compression="zstd")

View file

@ -1,14 +1,53 @@
import argparse
import shutil
import sys
import tempfile
import polars as pl
from pathlib import Path
import httpx
from pipeline.utils import download, extract_zip
# Ofcom Connected Nations 2025 - Fixed broadband performance (output area & local authority level)
# Source: https://www.ofcom.org.uk/phones-and-broadband/coverage-and-speeds/connected-nations-20252/data-downloads-2025
PERFORMANCE_URL = "https://www.ofcom.org.uk/siteassets/resources/documents/research-and-data/multi-sector/infrastructure-research/connected-nations-2025/202507_fixed_broadband_coverage_r01.zip?v=407830"
# Pre-staged file path. Ofcom put the entire ofcom.org.uk domain behind
# Cloudflare's Managed Challenge in 2026, which requires a JS-executing
# browser to pass — no amount of User-Agent / TLS-impersonation spoofing
# (curl_cffi chrome120..131, safari17, firefox133, chrome_android) gets
# past it. When the automated download fails, the user must download the
# zip manually from the Source URL above and place it at this path.
MANUAL_ZIP_PATH = Path("manual-data/fixed_broadband_coverage.zip")
def _manual_download_instructions() -> str:
    """Build the help text explaining how to stage the Ofcom zip by hand.

    The message names the download URL (``PERFORMANCE_URL``) and the
    absolute form of ``MANUAL_ZIP_PATH`` where the file must be saved,
    followed by the make command to re-run.

    Returns:
        The multi-line instruction text, starting with a blank line and
        ending with a newline.
    """
    # Only the two pieces that interpolate a value need to be f-strings.
    pieces = (
        "\nOfcom has blocked automated downloads via Cloudflare's Managed\n",
        "Challenge. Download the zip manually and re-run:\n\n",
        " 1. Open in a browser:\n",
        f" {PERFORMANCE_URL}\n",
        " 2. Save the downloaded zip to:\n",
        f" {MANUAL_ZIP_PATH.resolve()}\n",
        " 3. Re-run `make -f Makefile.data property-data/broadband.parquet`\n",
    )
    return "".join(pieces)
def _obtain_zip(dest: Path) -> None:
    """Put the Ofcom coverage zip at ``dest``.

    A zip already staged at ``MANUAL_ZIP_PATH`` takes priority and is
    copied to ``dest``; only when it is absent is ``PERFORMANCE_URL``
    downloaded.

    Args:
        dest: Path the zip should be written to.

    Raises:
        httpx.HTTPStatusError: Re-raised from the download. For a 403
            response the manual-download instructions are printed to
            stderr first.
    """
    if not MANUAL_ZIP_PATH.exists():
        try:
            download(PERFORMANCE_URL, dest)
        except httpx.HTTPStatusError as err:
            blocked = err.response.status_code == 403
            if blocked:
                # Tell the user how to stage the file by hand before failing.
                print(_manual_download_instructions(), file=sys.stderr)
            raise
        return

    print(f"Using pre-staged zip: {MANUAL_ZIP_PATH}")
    shutil.copyfile(MANUAL_ZIP_PATH, dest)
def convert_to_parquet(extract_dir: Path, parquet_path: Path) -> None:
# Find CSV files in the extracted directory
@ -51,7 +90,7 @@ def main() -> None:
extract_dir = cache / "extracted"
extracted_again_dir = cache / "extracted-again"
download(PERFORMANCE_URL, zip_path)
_obtain_zip(zip_path)
extract_zip(zip_path, extract_dir)
extract_zip(
extract_dir

View file

@ -41,9 +41,6 @@ def download_and_convert(output_path: Path) -> None:
winners = df.filter(pl.col("Candidate result position") == 1).select(
pl.col("Constituency geographic code").alias("pcon"),
pl.col("party_group").alias("winning_party"),
(pl.col("Majority") / pl.col("Election valid vote count") * 100)
.round(1)
.alias("majority_pct"),
(pl.col("Election valid vote count") / pl.col("Electorate") * 100)
.round(1)
.alias("turnout_pct"),

View file

@ -5,9 +5,9 @@ from pathlib import Path
from pipeline.utils import download
# Management information - state-funded schools - latest inspections (as at 30 Apr 2025)
# Management information - state-funded schools - latest inspections (as at 28 Feb 2026)
# Source: https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes
URL = "https://assets.publishing.service.gov.uk/media/681cd390275cb67b18d870fc/Management_information_-_state-funded_schools_-_latest_inspections_as_at_30_Apr_2025.csv"
URL = "https://assets.publishing.service.gov.uk/media/69c5269b4a06660f0854427b/Management_information_-_state-funded_schools_-_latest_inspections_as_at_28_Feb_2026.csv"
def convert_to_parquet(csv_path: Path, parquet_path: Path) -> None:

View file

@ -1,125 +1,91 @@
"""Download ONS Price Index of Private Rents (PIPR) monthly price statistics.
Provides mean monthly private rent by local authority and bedroom count.
Replaces the discontinued Private Rental Market Summary Statistics.
Source: https://www.ons.gov.uk/economy/inflationandpriceindices/datasets/priceindexofprivaterentsukmonthlypricestatistics
License: Open Government Licence v3.0
"""
import argparse
import tempfile
from pathlib import Path
import polars as pl
from pathlib import Path
from pipeline.utils import download
URL = "https://www.ons.gov.uk/file?uri=/peoplepopulationandcommunity/housing/datasets/privaterentalmarketsummarystatisticsinengland/october2022toseptember2023/privaterentalmarketstatistics231220.xls"
URL = "https://www.ons.gov.uk/file?uri=/economy/inflationandpriceindices/datasets/priceindexofprivaterentsukmonthlypricestatistics/25march2026/priceindexofprivaterentsukmonthlypricestatistics.xlsx"
# Sheets 12-16 are LA-level breakdowns: Studio, 1 Bed, 2 Bed, 3 Bed, 4+ Bed
# (Sheet 11 is "Room" — shared house rooms, not self-contained, so skip it)
BEDROOM_SHEETS = {
12: 0, # Studio
13: 1, # One Bedroom
14: 2, # Two Bedrooms
15: 3, # Three Bedrooms
16: 4, # Four or more Bedrooms
}
# Local authority district codes in England, https://en.wikipedia.org/wiki/ONS_coding_system
# Local authority district codes in England
LA_PREFIXES = ("E06", "E07", "E08", "E09")
# April 2021 + April 2023 LA reorganizations: old district codes → new unitary authority codes.
# The ONS rental data (Oct 2022 – Sep 2023) uses the old codes; IoD 2025 uses the new ones.
# We remap old → new and average the medians so the join in merge.py works.
LA_CONSOLIDATION = {
# North Northamptonshire (April 2021)
"E07000150": "E06000061", # Corby
"E07000152": "E06000061", # East Northamptonshire
"E07000153": "E06000061", # Kettering
"E07000156": "E06000061", # Wellingborough
# West Northamptonshire (April 2021)
"E07000151": "E06000062", # Daventry
"E07000154": "E06000062", # Northampton
"E07000155": "E06000062", # South Northamptonshire
# Cumberland (April 2023)
"E07000026": "E06000063", # Allerdale
"E07000028": "E06000063", # Carlisle
"E07000029": "E06000063", # Copeland
# Westmorland and Furness (April 2023)
"E07000027": "E06000064", # Barrow-in-Furness
"E07000030": "E06000064", # Eden
"E07000031": "E06000064", # South Lakeland
# North Yorkshire (April 2023)
"E07000163": "E06000065", # Craven
"E07000164": "E06000065", # Hambleton
"E07000165": "E06000065", # Harrogate
"E07000166": "E06000065", # Richmondshire
"E07000167": "E06000065", # Ryedale
"E07000168": "E06000065", # Scarborough
"E07000169": "E06000065", # Selby
# Somerset (April 2023)
"E07000187": "E06000066", # Mendip
"E07000188": "E06000066", # Sedgemoor
"E07000189": "E06000066", # South Somerset
"E07000246": "E06000066", # Somerset West and Taunton
}
def convert_to_parquet(xlsx_path: Path, parquet_path: Path) -> None:
print("Reading PIPR Excel file (Table 1)...")
def _read_sheet(xls_path: Path, sheet_id: int, bedrooms: int) -> pl.DataFrame:
"""Read one bedroom category sheet, extract LA-level median rents."""
df = pl.read_excel(xls_path, sheet_id=sheet_id)
# Table 1 layout: row 0 = title, row 1 = column headers, row 2+ = data.
# 40 columns in repeating blocks of 4 (index, monthly change, annual change,
# rental price) for each category. Rental price columns (0-indexed):
# 7 = All categories, 11 = One bed, 15 = Two bed, 19 = Three bed,
# 23 = Four or more bed
df = pl.read_excel(xlsx_path, sheet_name="Table 1", has_header=False)
df = df.slice(2) # Skip title and header rows
# Columns are unnamed; positional:
# 0=LA Code, 1=Area Code, 2=Area Name, 3=Count, 4=Mean, 5=LQ, 6=Median, 7=UQ
# First 4 rows are headers (title, notes, bedroom label, column headers)
df = df.slice(4)
df = df.select(
pl.col("column_1").alias("time_period"),
pl.col("column_2").alias("area_code"),
pl.col("column_12").cast(pl.Float32, strict=False).alias("rent_1bed"),
pl.col("column_16").cast(pl.Float32, strict=False).alias("rent_2bed"),
pl.col("column_20").cast(pl.Float32, strict=False).alias("rent_3bed"),
pl.col("column_24").cast(pl.Float32, strict=False).alias("rent_4plus"),
)
area_code_col = df.columns[1]
median_col = df.columns[6]
return (
df.select(
pl.col(area_code_col).alias("area_code"),
pl.col(median_col).alias("median_monthly_rent"),
)
.filter(
pl.col("area_code").is_not_null()
& pl.any_horizontal(
pl.col("area_code").str.starts_with(p) for p in LA_PREFIXES
)
)
.with_columns(
# Suppressed values are ".." — cast will turn them to null
pl.col("median_monthly_rent").cast(pl.Float32, strict=False),
pl.lit(bedrooms).cast(pl.UInt8).alias("bedrooms"),
# Filter to English local authorities
df = df.filter(
pl.any_horizontal(
pl.col("area_code").str.starts_with(p) for p in LA_PREFIXES
)
)
# Use only the latest month
latest = df["time_period"].max()
print(f"Latest month in data: {latest}")
df = df.filter(pl.col("time_period") == latest)
print(f"LAs in latest month: {df.height}")
def convert_to_parquet(xls_path: Path, parquet_path: Path) -> None:
# Melt to long format: one row per area x bedroom count.
# PIPR has no Studio category — one-bed rent used as proxy for bedrooms=0.
frames = []
for sheet_id, bedrooms in BEDROOM_SHEETS.items():
df = _read_sheet(xls_path, sheet_id, bedrooms)
print(f" Sheet {sheet_id} (bedrooms={bedrooms}): {df.height} rows")
frames.append(df)
for col, bedrooms in [
("rent_1bed", 0), # Studio (proxy)
("rent_1bed", 1),
("rent_2bed", 2),
("rent_3bed", 3),
("rent_4plus", 4),
]:
frames.append(
df.select(
pl.col("area_code"),
pl.col(col).alias("mean_monthly_rent"),
pl.lit(bedrooms).cast(pl.UInt8).alias("bedrooms"),
)
)
combined = pl.concat(frames)
# Remap old LA codes to new unitary authority codes and average medians
combined = (
combined.with_columns(
pl.col("area_code").replace(LA_CONSOLIDATION),
)
.group_by("area_code", "bedrooms")
.agg(
pl.col("median_monthly_rent").mean(),
)
)
print(f"Combined: {combined.shape}")
print(f"Non-null medians: {combined['median_monthly_rent'].drop_nulls().len()}")
print(f"Non-null rents: {combined['mean_monthly_rent'].drop_nulls().len()}")
print(combined.head(10))
parquet_path.parent.mkdir(parents=True, exist_ok=True)
combined.write_parquet(parquet_path, compression="zstd")
print(f"Saved to {parquet_path}")
def main() -> None:
parser = argparse.ArgumentParser(
description="Download and convert ONS private rental market statistics"
description="Download ONS private rent monthly price statistics"
)
parser.add_argument(
"--output", type=Path, required=True, help="Output parquet file path"
@ -127,9 +93,9 @@ def main() -> None:
args = parser.parse_args()
with tempfile.TemporaryDirectory() as cache_dir:
xls_path = Path(cache_dir) / "rental_prices.xls"
download(URL, xls_path, timeout=60)
convert_to_parquet(xls_path, args.output)
xlsx_path = Path(cache_dir) / "pipr_monthly.xlsx"
download(URL, xlsx_path, timeout=120)
convert_to_parquet(xlsx_path, args.output)
if __name__ == "__main__":

View file

@ -65,7 +65,6 @@ _AREA_COLUMNS = [
# Politics
"Winning party",
"Voter turnout (%)",
"Majority (%)",
"% Labour",
"% Conservative",
"% Liberal Democrat",
@ -116,15 +115,19 @@ def _build(
arcgis = (
pl.scan_parquet(arcgis_path)
.filter(pl.col("ctry") == "E92000001") # England only
.filter(pl.col("ctry25cd") == "E92000001") # England only
.filter(pl.col("doterm").is_null()) # Active postcodes only
# NSPL Feb 2026 renamed geographic code columns to {field}{year}cd.
# Alias them back to the short canonical names used across the
# pipeline so downstream joins don't need to know about NSPL's
# versioning scheme.
.select(
pl.col("pcds").alias("postcode"),
"lat",
pl.col("long").alias("lon"),
"lsoa21",
"oa21",
"pcon",
pl.col("lsoa21cd").alias("lsoa21"),
pl.col("oa21cd").alias("oa21"),
pl.col("pcon24cd").alias("pcon"),
)
)
wide = wide.join(arcgis, on="postcode", how="left")
@ -354,13 +357,12 @@ def _build(
"minor_crime_avg_yr": "Minor crime (avg/yr)",
"serious_crime_per_1k": "Serious crime per 1k residents (avg/yr)",
"minor_crime_per_1k": "Minor crime per 1k residents (avg/yr)",
"median_monthly_rent": "Estimated monthly rent",
"mean_monthly_rent": "Estimated monthly rent",
"floor_height": "Interior height (m)",
"was_council_house": "Former council house",
"median_age": "Median age",
"winning_party": "Winning party",
"turnout_pct": "Voter turnout (%)",
"majority_pct": "Majority (%)",
}
)
)

View file

@ -28,10 +28,14 @@ def main():
)
args = parser.parse_args()
# Load Ofsted data: filter to good+ (1, 2) primary/secondary schools
# Load Ofsted data: filter to good+ (1, 2) primary/secondary schools.
# Post-2025 reform the single "Overall effectiveness" grade was retired;
# the legacy 1–4 scale is now carried forward under "Latest OEIF overall
# effectiveness" (OEIF = the previous Ofsted Education Inspection
# Framework). The new report-card columns use text judgements instead.
ofsted = pl.read_parquet(args.ofsted).filter(
pl.col("Ofsted phase").is_in(["Primary", "Secondary"])
& pl.col("Overall effectiveness").is_in(["1", "2"])
& pl.col("Latest OEIF overall effectiveness").is_in(["1", "2"])
)
print(f"Good+ schools: {len(ofsted):,}")

View file

@ -10,19 +10,19 @@ from scipy.spatial import cKDTree
def build_postcode_mapping(arcgis_path: Path) -> pl.DataFrame:
"""Build a mapping from terminated England postcodes to their nearest active postcode.
Uses OS National Grid coordinates (oseast1m, osnrth1m) which are Cartesian metres,
Uses OS National Grid coordinates (east1m, north1m) which are Cartesian metres,
so Euclidean distance via cKDTree gives accurate results without projection.
"""
arcgis = pl.scan_parquet(arcgis_path).filter(pl.col("ctry") == "E92000001")
arcgis = pl.scan_parquet(arcgis_path).filter(pl.col("ctry25cd") == "E92000001")
active = (
arcgis.filter(pl.col("doterm").is_null())
.select("pcds", "oseast1m", "osnrth1m")
.select("pcds", "east1m", "north1m")
.collect()
)
terminated = (
arcgis.filter(pl.col("doterm").is_not_null())
.select("pcds", "oseast1m", "osnrth1m")
.select("pcds", "east1m", "north1m")
.collect()
)
@ -39,10 +39,10 @@ def build_postcode_mapping(arcgis_path: Path) -> pl.DataFrame:
)
active_coords = np.column_stack(
[active["oseast1m"].to_numpy(), active["osnrth1m"].to_numpy()]
[active["east1m"].to_numpy(), active["north1m"].to_numpy()]
)
terminated_coords = np.column_stack(
[terminated["oseast1m"].to_numpy(), terminated["osnrth1m"].to_numpy()]
[terminated["east1m"].to_numpy(), terminated["north1m"].to_numpy()]
)
tree = cKDTree(active_coords)