diff --git a/pipeline/transform/join_epc_pp.py b/pipeline/transform/join_epc_pp.py index 0859ebc..f0aba76 100644 --- a/pipeline/transform/join_epc_pp.py +++ b/pipeline/transform/join_epc_pp.py @@ -37,6 +37,7 @@ def main(): "NUMBER_HABITABLE_ROOMS", "FLOOR_HEIGHT", "CONSTRUCTION_AGE_BAND", + "TENURE", ) .filter(pl.col("epc_address").is_not_null()) .with_columns( @@ -52,6 +53,7 @@ def main(): epc_base.sort("INSPECTION_DATE", descending=True) .group_by("epc_address", "POSTCODE") .first() + .drop("TENURE") ) # Events fork: detect renovation events between consecutive certificates @@ -124,11 +126,29 @@ def main(): print(f"Renovation events: {events.height} properties with events") print(event_counts) - # Left-join events back onto dedup EPC + # Social tenure fork: flag properties that were ever social housing + social_tenure = ( + epc_base.filter( + pl.col("TENURE").str.to_lowercase().str.contains("social") + ) + .select("epc_address", "POSTCODE") + .unique() + .with_columns(pl.lit("Yes").alias("was_council_house")) + .collect() + ) + print(f"Former council houses (EPC social tenure): {social_tenure.height}") + + # Left-join events and social tenure back onto dedup EPC epc = epc.join( events.lazy(), on=["epc_address", "POSTCODE"], how="left", + ).join( + social_tenure.lazy(), + on=["epc_address", "POSTCODE"], + how="left", + ).with_columns( + pl.col("was_council_house").fill_null("No"), ) print("EPC dataset") diff --git a/pipeline/transform/merge.py b/pipeline/transform/merge.py index a8d4f4b..f22fb9b 100644 --- a/pipeline/transform/merge.py +++ b/pipeline/transform/merge.py @@ -52,6 +52,7 @@ _AREA_COLUMNS = [ "Number of parks within 2km", "Train or tube stations within 1km", "Distance to nearest train or tube station (km)", + "Distance to nearest park (km)", # Environment "Noise (dB)", "Max available download speed (Mbps)", @@ -73,9 +74,9 @@ def _build( noise_path: Path, school_proximity_path: Path, broadband_path: Path, - geosure_path: Path, rental_prices_path: Path, lsoa_population_path: Path, + median_age_path: Path, ) -> tuple[pl.DataFrame, pl.DataFrame]: """Build postcode and properties dataframes from epc_pp + auxiliary data. @@ -188,6 +189,9 @@ def _build( .alias("minor_crime_per_1k"), ).drop("population") + median_age = pl.scan_parquet(median_age_path) + wide = wide.join(median_age, on="lsoa21", how="left") + poi_counts = pl.scan_parquet(poi_proximity_path) wide = wide.join(poi_counts, on="postcode", how="left") @@ -233,9 +237,6 @@ def _build( ) wide = wide.join(broadband, left_on="postcode", right_on="bb_postcode", how="left") - geosure = pl.scan_parquet(geosure_path) - wide = wide.join(geosure, on="postcode", how="left") - # Derive property_type: prefer EPC data, fall back to price-paid. # For Houses, use built_form (e.g. Semi-Detached, Mid-Terrace) for finer detail. bad_built_form = pl.col("built_form").is_null() | pl.col("built_form").is_in( @@ -324,6 +325,7 @@ def _build( "parks_2km": "Number of parks within 2km", "train_tube_1km": "Train or tube stations within 1km", "train_tube_nearest_km": "Distance to nearest train or tube station (km)", + "parks_nearest_km": "Distance to nearest park (km)", "latest_price": "Last known price", "number_habitable_rooms": "Number of bedrooms & living rooms", "noise_lden_db": "Noise (dB)", @@ -334,15 +336,10 @@ def _build( "minor_crime_avg_yr": "Minor crime (avg/yr)", "serious_crime_per_1k": "Serious crime per 1k residents (avg/yr)", "minor_crime_per_1k": "Minor crime per 1k residents (avg/yr)", - "environmental_risk": "Environmental risk", - "collapsible_deposits_risk": "Collapsible deposits risk", - "compressible_ground_risk": "Compressible ground risk", - "landslide_risk": "Landslide risk", - "running_sand_risk": "Running sand risk", - "shrink_swell_risk": "Shrink-swell risk", - "soluble_rocks_risk": "Soluble rocks risk", "median_monthly_rent": "Estimated monthly rent", "floor_height": "Interior height (m)", + "was_council_house": "Former council house", + "median_age": "Median age", } ) ) @@ -410,12 +407,6 @@ def main(): required=True, help="Broadband performance by output area parquet file", ) - parser.add_argument( - "--geosure", - type=Path, - required=True, - help="GeoSure ground stability parquet file", - ) parser.add_argument( "--rental-prices", type=Path, @@ -428,6 +419,12 @@ def main(): required=True, help="Census 2021 population by LSOA parquet file", ) + parser.add_argument( + "--median-age", + type=Path, + required=True, + help="Census 2021 median age by LSOA parquet file", + ) parser.add_argument( "--output-postcodes", type=Path, @@ -452,9 +449,9 @@ def main(): noise_path=args.noise, school_proximity_path=args.school_proximity, broadband_path=args.broadband, - geosure_path=args.geosure, rental_prices_path=args.rental_prices, lsoa_population_path=args.lsoa_population, + median_age_path=args.median_age, ) print(f"\nPostcode columns: {postcode_df.columns}")