diff --git a/Taskfile.data.yml b/Taskfile.data.yml
index 4e6bd4c..e60e8f5 100644
--- a/Taskfile.data.yml
+++ b/Taskfile.data.yml
@@ -11,7 +11,8 @@ vars:
   EPC_PP_OUTPUT: "{{.DATA_DIR}}/epc_pp.parquet"
   WIDE_OUTPUT: "{{.DATA_DIR}}/wide.parquet"
   EPC: "{{.DATA_DIR}}/certificates.csv"
-  JOURNEY_TIMES: "{{.DATA_DIR}}/journey_times.parquet"
+  JOURNEY_TIMES_BANK: "{{.DATA_DIR}}/journey_times_bank.parquet"
+  JOURNEY_TIMES_FITZROVIA: "{{.DATA_DIR}}/journey_times_fitzrovia.parquet"
   ETHNICITY_OUTPUT: "{{.DATA_DIR}}/ethnicity_by_la.parquet"
   CRIME_DIR: "{{.DATA_DIR}}/crime"
   CRIME_OUTPUT: "{{.DATA_DIR}}/crime_by_lsoa.parquet"
@@ -20,8 +21,6 @@ vars:
   NAPTAN_OUTPUT: "{{.DATA_DIR}}/naptan.parquet"
   BROADBAND_OUTPUT: "{{.DATA_DIR}}/broadband.parquet"
   SCHOOL_PROXIMITY_OUTPUT: "{{.DATA_DIR}}/school_proximity.parquet"
-  COUNCIL_TAX_OUTPUT: "{{.DATA_DIR}}/council_tax.parquet"
-  COUNCIL_TAX_BANDS_OUTPUT: "{{.DATA_DIR}}/council_tax_bands.parquet"
 
 tasks:
   prompt:epc:
@@ -41,7 +40,7 @@ tasks:
   prompt:journey-times:
     desc: Download TFL journey times if missing (requires API key registration)
     status:
-      - test -f {{.JOURNEY_TIMES}}
+      - test -f {{.JOURNEY_TIMES_BANK}} && test -f {{.JOURNEY_TIMES_FITZROVIA}}
     deps:
       - download:arcgis
     cmds:
@@ -49,7 +48,9 @@ tasks:
         echo ""
         echo "=== TFL journey times not found ==="
         echo "Register for a TFL API key at https://api-portal.tfl.gov.uk/signin"
-        echo "Then set the TFL_API_KEY environment variable and re-run this task."
+        echo "Then set the TFL_API_KEY environment variable and run:"
+        echo " task download:journey-times -- bank"
+        echo " task download:journey-times -- fitzrovia"
         echo ""
         exit 1
 
@@ -109,19 +110,6 @@ tasks:
     cmds:
       - uv run python -m pipeline.download.broadband --output {{.BROADBAND_OUTPUT}}
 
-  download:council-tax:
-    desc: Download council tax rates by local authority (GOV.UK Table 9)
-    status:
-      - test -f {{.COUNCIL_TAX_OUTPUT}}
-    cmds:
-      - uv run python -m pipeline.download.council_tax --output {{.COUNCIL_TAX_OUTPUT}}
-
-  download:council-tax-bands:
-    desc: Scrape individual property council tax bands from VOA
-    status:
-      - test -f {{.COUNCIL_TAX_BANDS_OUTPUT}}
-    cmds:
-      - uv run python -m pipeline.download.council_tax_bands --postcodes {{.WIDE_OUTPUT}} --output {{.COUNCIL_TAX_BANDS_OUTPUT}}
 
   download:noise:
     desc: Download Defra noise data (road, rail, airport) sampled at postcode centroids
@@ -199,7 +187,6 @@ tasks:
       - download:ethnicity
       - download:broadband
       - download:noise
-      - download:council-tax
       - transform:crime
       - transform:poi-proximity
       - transform:school-proximity
@@ -213,12 +200,11 @@ tasks:
         --arcgis {{.ARCGIS_OUTPUT}}
         --iod {{.IOD_OUTPUT}}
         --poi-proximity {{.POI_PROXIMITY_OUTPUT}}
-        --journey-times {{.JOURNEY_TIMES}}
+        --journey-times-bank {{.JOURNEY_TIMES_BANK}}
+        --journey-times-fitzrovia {{.JOURNEY_TIMES_FITZROVIA}}
         --ethnicity {{.ETHNICITY_OUTPUT}}
         --crime {{.CRIME_OUTPUT}}
         --noise {{.NOISE_OUTPUT}}
         --school-proximity {{.SCHOOL_PROXIMITY_OUTPUT}}
         --broadband {{.BROADBAND_OUTPUT}}
-        --council-tax {{.COUNCIL_TAX_OUTPUT}}
-        --council-tax-bands {{.COUNCIL_TAX_BANDS_OUTPUT}}
         --output {{.WIDE_OUTPUT}}
diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py
index 69a93e2..023be43 100644
--- a/pipeline/transform/merge.py
+++ b/pipeline/transform/merge.py
@@ -1,4 +1,5 @@
 import argparse
+
 import polars as pl
 from pathlib import Path
 
@@ -6,12 +7,35 @@ MIN_PRICE = 10_000
 MIN_FLOOR_AREA_M2 = 10
 
 
+def _join_journey_times(
+    wide: pl.LazyFrame,
+    journey_times_path: Path,
+    destination_name: str,
+) -> pl.LazyFrame:
+    """Join journey times for a single destination, renaming columns appropriately."""
+    journey_times = (
+        pl.scan_parquet(journey_times_path)
+        .select(
+            "postcode",
+            pl.col("public_transport_quick_minutes").alias(
+                f"Public transport to {destination_name} (mins)"
+            ),
+            pl.col("cycling_minutes").alias(f"Cycling to {destination_name} (mins)"),
+        )
+        .sort(f"Public transport to {destination_name} (mins)", nulls_last=True)
+        .group_by("postcode")
+        .first()
+    )
+    return wide.join(journey_times, on="postcode", how="left")
+
+
 def _build_wide(
     epc_pp_path: Path,
     arcgis_path: Path,
     iod_path: Path,
     poi_proximity_path: Path,
-    journey_times_path: Path,
+    journey_times_bank_path: Path,
+    journey_times_fitzrovia_path: Path,
     ethnicity_path: Path,
     crime_path: Path,
     noise_path: Path,
@@ -30,19 +54,8 @@ def _build_wide(
     )
     wide = wide.join(arcgis, on="postcode", how="inner")
 
-    journey_times = (
-        pl.scan_parquet(journey_times_path)
-        .select(
-            "postcode",
-            "public_transport_easy_minutes",
-            "public_transport_quick_minutes",
-            "cycling_minutes",
-        )
-        .sort("public_transport_quick_minutes", nulls_last=True)
-        .group_by("postcode")
-        .first()
-    )
-    wide = wide.join(journey_times, on="postcode", how="left")
+    wide = _join_journey_times(wide, journey_times_bank_path, "Bank")
+    wide = _join_journey_times(wide, journey_times_fitzrovia_path, "Fitzrovia")
 
     iod = pl.scan_parquet(iod_path)
     wide = wide.join(iod, left_on="lsoa21", right_on="LSOA code (2021)", how="left")
@@ -227,10 +240,16 @@ def main():
         help="POI proximity counts parquet file (optional)",
     )
     parser.add_argument(
-        "--journey-times",
-        required=True,
+        "--journey-times-bank",
         type=Path,
-        help="Journey times parquet file (optional)",
+        required=True,
+        help="Journey times to Bank parquet file",
+    )
+    parser.add_argument(
+        "--journey-times-fitzrovia",
+        type=Path,
+        required=True,
+        help="Journey times to Fitzrovia parquet file",
     )
     parser.add_argument(
         "--ethnicity",
@@ -269,7 +288,8 @@ def main():
         arcgis_path=args.arcgis,
         iod_path=args.iod,
         poi_proximity_path=args.poi_proximity,
-        journey_times_path=args.journey_times,
+        journey_times_bank_path=args.journey_times_bank,
+        journey_times_fitzrovia_path=args.journey_times_fitzrovia,
         ethnicity_path=args.ethnicity,
         crime_path=args.crime,
         noise_path=args.noise,