diff --git a/pipeline/epc_pp.py b/pipeline/epc_pp.py
new file mode 100644
index 0000000..1c50ec2
--- /dev/null
+++ b/pipeline/epc_pp.py
@@ -0,0 +1,131 @@
+"""Build the joined price-paid / EPC dataset.
+
+Loads the EPC certificates CSV and the price-paid parquet, reduces each to one
+row per property, fuzzy-matches addresses within each postcode and writes the
+result to ``data_sources/processed/epc_pp.parquet``.
+
+The pipeline only runs when the module is executed as a script: the fuzzy join
+starts a process pool, and workers created with the ``spawn`` or ``forkserver``
+start methods import the main module again.
+"""
+import polars as pl
+
+from .fuzzy_join import fuzzy_join_on_postcode
+
+# https://www.gov.uk/guidance/about-the-price-paid-data
+property_type_map = {"D": "Detached", "S": "Semi-Detached", "T": "Terraced", "F": "Flats/Maisonettes", "O": "Other"}
+duration_map = {"F": "Freehold", "L": "Leasehold"}
+
+
+def load_epc() -> pl.LazyFrame:
+    """Return a lazy query for the latest EPC certificate of each property."""
+    return (
+        pl.scan_csv('data_sources/epc/certificates.csv')
+        .select(
+            pl.col('ADDRESS').alias('epc_address'),
+            'POSTCODE',
+            'CURRENT_ENERGY_RATING',
+            'POTENTIAL_ENERGY_RATING',
+            pl.col('PROPERTY_TYPE').alias('epc_property_type'),
+            'BUILT_FORM',
+            'INSPECTION_DATE',
+            'TOTAL_FLOOR_AREA',
+            'NUMBER_HABITABLE_ROOMS',
+            'FLOOR_HEIGHT',
+            'CONSTRUCTION_AGE_BAND',
+        )
+        # Newest inspection first; a missing date must never rank as newest.
+        .sort('INSPECTION_DATE', descending=True, nulls_last=True)
+        # Key on the postcode as well: the same address text can occur in
+        # several postcodes, and those are different properties.
+        .group_by('epc_address', 'POSTCODE')
+        .first()
+    )
+
+
+def load_price_paid() -> pl.LazyFrame:
+    """Return a lazy query with one row per price-paid address and postcode.
+
+    Transactions of property type "Other" are dropped. Each row keeps the
+    full (year, price) history plus the attributes of the most recent sale.
+    """
+    return (
+        pl.scan_parquet('data_sources/pp-complete.parquet')
+        .select(
+            "price",
+            "date_of_transfer",
+            pl.col('property_type').alias("pp_property_type").replace(property_type_map),
+            "postcode",
+            'paon',
+            'saon',
+            'street',
+            'locality',
+            'town_city',
+            pl.col('duration').replace(duration_map),
+        )
+        .filter(pl.col('pp_property_type') != 'Other')
+        .with_columns(
+            pl.concat_str(
+                [pl.col('saon'), pl.col('paon'), pl.col('street')],
+                separator=' ',
+                ignore_nulls=True,
+            ).alias('pp_address'),
+        )
+        # Oldest sale first so that ``last()`` below picks the latest one.
+        .sort('date_of_transfer')
+        .group_by('pp_address', 'postcode', maintain_order=True)
+        .agg(
+            pl.struct(
+                pl.col('date_of_transfer').dt.year().alias('year'),
+                'price',
+            ).alias('historical_prices'),
+            pl.col('pp_property_type').last(),
+            pl.col('duration').last(),
+            pl.col('price').last().alias('latest_price'),
+            pl.col('date_of_transfer').last(),
+        )
+    )
+
+
+def main() -> None:
+    """Run the pipeline: load, fuzzy-join, report the match rate, write parquet."""
+    pl.Config.set_tbl_cols(-1)
+
+    # Collect each query once and preview the materialised frame, instead of
+    # executing the full query a second time just to print a few rows.
+    epc_df = load_epc().collect()
+    print("EPC dataset")
+    print(epc_df.head())
+
+    price_paid_df = load_price_paid().collect()
+    print("Price paid dataset")
+    print(price_paid_df.head())
+
+    joined = fuzzy_join_on_postcode(
+        left=price_paid_df,
+        right=epc_df,
+        left_address_col='pp_address',
+        right_address_col='epc_address',
+        left_postcode_col='postcode',
+        right_postcode_col='POSTCODE',
+        score_threshold=80,
+    ).drop('POSTCODE')
+
+    total = price_paid_df.height
+    matched_count = joined.filter(
+        pl.col('epc_address').is_not_null() & pl.col('pp_address').is_not_null()
+    ).height
+    # Guard the percentage against an empty price-paid extract.
+    matched_pct = 100 * matched_count / total if total else 0.0
+    print(f"Unique properties: {total}")
+    print(f"Matched: {matched_count} ({matched_pct:.1f}%)")
+    print(f"Unmatched: {total - matched_count}")
+
+    joined = joined.rename({col: col.lower() for col in joined.columns})
+
+    print(joined.head())
+    joined.write_parquet('data_sources/processed/epc_pp.parquet')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/pipeline/fuzzy_join.py b/pipeline/fuzzy_join.py
new file mode 100644
index 0000000..ed85286
--- /dev/null
+++ b/pipeline/fuzzy_join.py
@@ -0,0 +1,179 @@
+"""Fuzzy address matching between two DataFrames, bucketed by postcode."""
+import re
+from concurrent.futures import ProcessPoolExecutor
+from os import cpu_count
+
+import polars as pl
+from thefuzz import fuzz
+from tqdm import tqdm
+
+_NUMBER_RE = re.compile(r'\d+')
+
+
+def fuzzy_join_on_postcode(
+    left: pl.DataFrame,
+    right: pl.DataFrame,
+    left_address_col: str,
+    right_address_col: str,
+    left_postcode_col: str,
+    right_postcode_col: str,
+    score_threshold: int = 80,
+) -> pl.DataFrame:
+    """Fuzzy join two DataFrames by matching addresses within postcode buckets.
+
+    Addresses are upper-cased, with commas, dots, hyphens and runs of
+    whitespace collapsed to single spaces; postcodes are stripped and
+    upper-cased. Every left/right pair sharing a postcode is scored, and pairs
+    are then assigned greedily from the highest score down, so each left row
+    and each right row is used at most once.
+
+    Args:
+        left: Frame whose rows are all kept, in their original order.
+        right: Frame to match against; rows repeating an earlier row's
+            normalised address and postcode are dropped.
+        left_address_col: Name of the address column in ``left``.
+        right_address_col: Name of the address column in ``right``.
+        left_postcode_col: Name of the postcode column in ``left``.
+        right_postcode_col: Name of the postcode column in ``right``.
+        score_threshold: Minimum ``fuzz.token_sort_ratio`` score for a pair to
+            count as a match.
+
+    Returns:
+        The left DataFrame with all right columns appended.
+        Unmatched rows have null right columns.
+    """
+
+    def _normalize(s: pl.Expr) -> pl.Expr:
+        return (
+            s.str.to_uppercase()
+            .str.replace_all(r'[,.\-]', ' ')
+            .str.replace_all(r'\s+', ' ')
+            .str.strip_chars()
+        )
+
+    left = left.with_columns(
+        _normalize(pl.col(left_address_col)).alias('_left_address'),
+        pl.col(left_postcode_col).str.strip_chars().str.to_uppercase().alias('_left_postcode'),
+    )
+    right = right.with_columns(
+        _normalize(pl.col(right_address_col)).alias('_right_address'),
+        pl.col(right_postcode_col).str.strip_chars().str.to_uppercase().alias('_right_postcode'),
+    )
+
+    # Deduplicate right side on normalized address + postcode so that
+    # variant spellings of the same address don't consume multiple slots.
+    # ``maintain_order`` keeps the row numbering, and therefore the
+    # tie-breaking below, reproducible between runs.
+    right = right.unique(
+        subset=['_right_address', '_right_postcode'],
+        keep='first',
+        maintain_order=True,
+    )
+
+    # Group both sides by postcode for fast lookup
+    right_by_postcode = _bucket_by_postcode(right['_right_postcode'], right['_right_address'])
+    left_by_postcode = _bucket_by_postcode(left['_left_postcode'], left['_left_address'])
+
+    # Build tasks for each postcode bucket
+    tasks = [
+        (left_entries, right_by_postcode[postcode], score_threshold)
+        for postcode, left_entries in left_by_postcode.items()
+        if postcode in right_by_postcode
+    ]
+
+    # Score all pairwise matches in parallel, then greedily assign from
+    # highest score downward so best pairs lock in first.
+    all_pairs: list[tuple[int, int, int]] = []  # (score, left_row, right_row)
+    if tasks:  # don't start worker processes when no postcode is shared
+        with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
+            for pairs in tqdm(
+                executor.map(_score_bucket, tasks, chunksize=64),
+                total=len(tasks),
+                desc='Fuzzy matching',
+            ):
+                all_pairs.extend(pairs)
+
+    # Best score first; ties go to the lower left row, then (the sort is
+    # stable) to the order in which the pairs were scored.
+    all_pairs.sort(key=lambda t: (-t[0], t[1]))
+
+    match_indices: list[int | None] = [None] * len(left)
+    matched_left: set[int] = set()
+    matched_right: set[int] = set()
+
+    for _score, left_row, right_row in all_pairs:
+        if left_row in matched_left or right_row in matched_right:
+            continue
+        match_indices[left_row] = right_row
+        matched_left.add(left_row)
+        matched_right.add(right_row)
+
+    # Select right columns (excluding internal helpers)
+    right_cols = right.select(pl.exclude('_right_address', '_right_postcode'))
+    if not matched_left:
+        # Nothing matched (possibly because one side is empty): there is no
+        # row to gather, so build the all-null right-hand side directly.
+        right_matched = right_cols.clear(n=len(left))
+    else:
+        # Gather row 0 as a placeholder for unmatched rows, then null it out.
+        right_matched = right_cols[
+            [i if i is not None else 0 for i in match_indices]
+        ]
+        mask = pl.Series('_matched', [i is not None for i in match_indices])
+        right_matched = right_matched.with_columns(
+            pl.when(mask).then(pl.col(c)).otherwise(pl.lit(None)).alias(c)
+            for c in right_matched.columns
+        )
+
+    left_clean = left.select(pl.exclude('_left_address', '_left_postcode'))
+    return pl.concat([left_clean, right_matched], how='horizontal')
+
+
+def _bucket_by_postcode(
+    postcodes: pl.Series,
+    addresses: pl.Series,
+) -> dict[str, list[tuple[int, str]]]:
+    """Map each postcode to its ``(row_index, address)`` entries.
+
+    Rows with a null postcode or a null address are skipped: they cannot be
+    scored, and a null address cannot be passed to the number regex.
+    """
+    buckets: dict[str, list[tuple[int, str]]] = {}
+    for row, (postcode, address) in enumerate(zip(postcodes, addresses)):
+        if postcode is not None and address is not None:
+            buckets.setdefault(postcode, []).append((row, address))
+    return buckets
+
+
+def _numbers_compatible(a: str, b: str) -> bool:
+    """Check that numeric tokens (flat/house numbers) in the shorter set are a subset of the longer.
+
+    Returns False if one address has numbers and the other doesn't.
+    """
+    nums_a = set(_NUMBER_RE.findall(a))
+    nums_b = set(_NUMBER_RE.findall(b))
+    smaller, larger = (nums_a, nums_b) if len(nums_a) <= len(nums_b) else (nums_b, nums_a)
+    if not smaller and larger:
+        return False
+    return smaller.issubset(larger)
+
+
+def _score_bucket(
+    args: tuple[list[tuple[int, str]], list[tuple[int, str]], int],
+) -> list[tuple[int, int, int]]:
+    """Score all address pairs within a single postcode bucket.
+
+    Takes one ``(left_entries, right_entries, score_threshold)`` task and
+    returns ``(score, left_row, right_row)`` for every number-compatible pair
+    scoring at least the threshold.
+    """
+    left_entries, right_entries, score_threshold = args
+    pairs = []
+    for left_row, left_address in left_entries:
+        for right_row, right_address in right_entries:
+            if not _numbers_compatible(left_address, right_address):
+                continue
+            score = fuzz.token_sort_ratio(left_address, right_address)
+            if score >= score_threshold:
+                pairs.append((score, left_row, right_row))
+    return pairs
diff --git a/pipeline/test_fuzzy_join.py b/pipeline/test_fuzzy_join.py
new file mode 100644
index 0000000..7197009
--- /dev/null
+++ b/pipeline/test_fuzzy_join.py
@@ -0,0 +1,65 @@
+"""Print a snapshot of the fuzzy join for a single postcode.
+
+Lists each price-paid address next to the EPC address it was matched to, so
+the pairing can be reviewed by eye.
+"""
+import polars as pl
+
+from fuzzy_join import fuzzy_join_on_postcode
+
+POSTCODE = "E14 2DG"
+
+
+def main() -> None:
+    """Load both sources for ``POSTCODE``, join them and print the pairs."""
+    # Price paid: unique addresses for this postcode
+    pp = (
+        pl.scan_parquet("data_sources/pp-complete.parquet")
+        .filter(pl.col("postcode") == POSTCODE)
+        .select("paon", "saon", "street", "postcode")
+        .collect()
+        .unique()
+        .sort("saon")
+        .with_columns(
+            pl.concat_str(
+                [pl.col("saon"), pl.col("paon"), pl.col("street")],
+                separator=" ",
+                ignore_nulls=True,
+            ).alias("pp_address"),
+        )
+    )
+
+    # EPC: latest inspection per address for this postcode
+    epc = (
+        pl.scan_csv("data_sources/epc/certificates.csv")
+        .select("ADDRESS", "POSTCODE", "INSPECTION_DATE")
+        .filter(pl.col("POSTCODE").str.strip_chars() == POSTCODE)
+        .sort("INSPECTION_DATE", descending=True, nulls_last=True)
+        .collect()
+        # keep="first" on the date-sorted frame is what makes this the
+        # latest inspection; the default keeps an arbitrary duplicate.
+        .unique("ADDRESS", keep="first", maintain_order=True)
+        .sort("ADDRESS")
+    )
+
+    print(f"Price paid: {len(pp)} unique addresses")
+    print(f"EPC: {len(epc)} unique addresses")
+
+    result = fuzzy_join_on_postcode(
+        left=pp,
+        right=epc,
+        left_address_col="pp_address",
+        right_address_col="ADDRESS",
+        left_postcode_col="postcode",
+        right_postcode_col="POSTCODE",
+        score_threshold=80,
+    )
+
+    snapshot = result.select("pp_address", "ADDRESS").sort("pp_address")
+
+    with pl.Config(tbl_rows=-1, tbl_cols=-1, fmt_str_lengths=80):
+        print(snapshot)
+
+
+if __name__ == "__main__":
+    main()