perfect-postcode/pipeline/transform/join_epc_pp.py

98 lines
3.3 KiB
Python

import argparse
import polars as pl
from pathlib import Path
from ..utils import fuzzy_join_on_postcode
# Widen polars' table rendering for the preview prints in main(); per the
# polars Config docs, a negative value means "display all columns".
pl.Config.set_tbl_cols(-1)
# Code -> label mappings for the Price Paid dataset.
# https://www.gov.uk/guidance/about-the-price-paid-data
_PROPERTY_TYPE_MAP = {
    "D": "Detached",
    "S": "Semi-Detached",
    "T": "Terraced",
    "F": "Flats/Maisonettes",
    "O": "Other",
}
_DURATION_MAP = {"F": "Freehold", "L": "Leasehold"}


def _load_epc(path: Path) -> "pl.LazyFrame":
    """Lazily load the EPC CSV, keeping one certificate per (address, postcode)."""
    return (
        pl.scan_csv(path)
        .select(
            pl.col('ADDRESS').alias('epc_address'),
            'POSTCODE',
            'CURRENT_ENERGY_RATING',
            'POTENTIAL_ENERGY_RATING',
            pl.col('PROPERTY_TYPE').alias('epc_property_type'),
            'BUILT_FORM',
            'INSPECTION_DATE',
            'TOTAL_FLOOR_AREA',
            'NUMBER_HABITABLE_ROOMS',
            'FLOOR_HEIGHT',
            'CONSTRUCTION_AGE_BAND',
        )
        .filter(pl.col('epc_address').is_not_null())
        # Newest inspection first so `.first()` picks the latest certificate.
        # nulls_last=True: by default nulls sort *first*, which would let a
        # certificate with no inspection date shadow the dated ones.
        .sort('INSPECTION_DATE', descending=True, nulls_last=True)
        .group_by('epc_address', 'POSTCODE')
        .first()
    )


def _load_price_paid(path: Path) -> "pl.LazyFrame":
    """Lazily load Price Paid data, aggregated to one row per (address, postcode).

    Each output row carries the full price history as a list of
    ``{year, price}`` structs plus the values from the most recent transfer.
    """
    return (
        pl.scan_parquet(path)
        .select(
            "price",
            "date_of_transfer",
            pl.col('property_type').alias("pp_property_type").replace(_PROPERTY_TYPE_MAP),
            "postcode",
            'paon',
            'saon',
            'street',
            'locality',
            'town_city',
            pl.col('duration').replace(_DURATION_MAP),
        )
        .filter(pl.col('pp_property_type') != 'Other')
        .with_columns(
            pl.concat_str(
                [pl.col('saon'), pl.col('paon'), pl.col('street')],
                separator=' ',
                ignore_nulls=True,
            ).alias('pp_address'),
        )
        # With ignore_nulls=True an all-null address becomes "" rather than
        # null, so a bare null check would keep address-less sales and lump
        # them together per postcode. Drop both cases before grouping.
        .filter(pl.col('pp_address').is_not_null() & (pl.col('pp_address') != ''))
        # Chronological order so `.last()` below is the most recent transfer.
        .sort('date_of_transfer')
        .group_by('pp_address', 'postcode', maintain_order=True)
        .agg(
            pl.struct(
                pl.col('date_of_transfer').dt.year().alias('year'),
                'price',
            ).alias('historical_prices'),
            pl.col('pp_property_type').last(),
            pl.col('duration').last(),
            pl.col('price').last().alias('latest_price'),
            pl.col('date_of_transfer').last(),
        )
    )


def main() -> None:
    """Fuzzy-join Price Paid properties to EPC certificates and write the matches.

    Command-line arguments:
        --epc:        EPC certificates CSV file.
        --price-paid: Price Paid parquet file.
        --output:     Destination parquet file for the matched rows.

    Prints a preview of both inputs, match statistics, and a preview of the
    output. Only rows with both a Price Paid and an EPC address are written,
    with all column names lower-cased.
    """
    parser = argparse.ArgumentParser(description="Fuzzy join EPC and Price Paid data")
    parser.add_argument("--epc", type=Path, required=True, help="EPC certificates CSV file")
    parser.add_argument("--price-paid", type=Path, required=True, help="Price paid parquet file")
    parser.add_argument("--output", type=Path, required=True, help="Output parquet file path")
    args = parser.parse_args()

    epc = _load_epc(args.epc)
    print("EPC dataset")
    print(epc.head().collect())

    price_paid = _load_price_paid(args.price_paid)
    print("Price paid dataset")
    print(price_paid.head().collect())

    # The helper's result is treated as lazy here (it is dropped/collected);
    # its matching rules live in ..utils and are not visible from this file.
    joined = fuzzy_join_on_postcode(
        left=price_paid,
        right=epc,
        left_address_col='pp_address',
        right_address_col='epc_address',
        left_postcode_col='postcode',
        right_postcode_col='POSTCODE',
    ).drop('POSTCODE').collect()

    matched = joined.filter(
        pl.col('epc_address').is_not_null() & pl.col('pp_address').is_not_null()
    )
    total = joined.height
    # Guard against an empty join result, which would otherwise divide by zero.
    match_pct = 100 * matched.height / total if total else 0.0
    print(f"Unique properties: {total}")
    print(f"Matched: {matched.height} ({match_pct:.1f}%)")
    print(f"Unmatched: {total - matched.height}")

    # Normalise the upper-case EPC column names to match the Price Paid ones.
    matched = matched.rename({col: col.lower() for col in joined.columns})
    print(matched.head())
    matched.write_parquet(args.output)
    print(f"Wrote {args.output}")
# Run the pipeline only when this file is executed as the entry point,
# not when it is imported by another module.
if __name__ == "__main__":
    main()