Join epc & pp
This commit is contained in:
parent
2131da96aa
commit
68b6dcf65e
3 changed files with 278 additions and 0 deletions
87
pipeline/epc_pp.py
Normal file
87
pipeline/epc_pp.py
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
import polars as pl
|
||||
from .fuzzy_join import fuzzy_join_on_postcode
|
||||
|
||||
|
||||
pl.Config.set_tbl_cols(-1)
|
||||
|
||||
|
||||
|
||||
epc = pl.scan_csv('data_sources/epc/certificates.csv').select(
|
||||
pl.col('ADDRESS').alias('epc_address'),
|
||||
'POSTCODE',
|
||||
'CURRENT_ENERGY_RATING',
|
||||
'POTENTIAL_ENERGY_RATING',
|
||||
pl.col('PROPERTY_TYPE').alias('epc_property_type'),
|
||||
'BUILT_FORM',
|
||||
'INSPECTION_DATE',
|
||||
'TOTAL_FLOOR_AREA',
|
||||
'NUMBER_HABITABLE_ROOMS',
|
||||
'FLOOR_HEIGHT',
|
||||
'CONSTRUCTION_AGE_BAND'
|
||||
).sort('INSPECTION_DATE', descending=True).group_by('epc_address').first()
|
||||
|
||||
|
||||
print("EPC dataset")
|
||||
print(epc.head().collect())
|
||||
|
||||
# https://www.gov.uk/guidance/about-the-price-paid-data
|
||||
property_type_map = {"D": "Detached", "S": "Semi-Detached", "T": "Terraced", "F": "Flats/Maisonettes", "O": "Other"}
|
||||
duration_map = {"F": "Freehold", "L": "Leasehold"}
|
||||
|
||||
price_paid = (pl.scan_parquet('data_sources/pp-complete.parquet').select(
|
||||
"price",
|
||||
"date_of_transfer",
|
||||
pl.col('property_type').alias("pp_property_type").replace(property_type_map),
|
||||
"postcode",
|
||||
'paon',
|
||||
'saon',
|
||||
'street',
|
||||
'locality',
|
||||
'town_city',
|
||||
pl.col('duration').replace(duration_map)
|
||||
).filter(pl.col('pp_property_type') != 'Other').with_columns(
|
||||
pl.concat_str(
|
||||
[pl.col('saon'), pl.col('paon'), pl.col('street')],
|
||||
separator=' ',
|
||||
ignore_nulls=True,
|
||||
).alias('pp_address'),
|
||||
)
|
||||
.sort('date_of_transfer')
|
||||
.group_by('pp_address', 'postcode', maintain_order=True)
|
||||
.agg(
|
||||
pl.struct(
|
||||
pl.col('date_of_transfer').dt.year().alias('year'),
|
||||
'price',
|
||||
).alias('historical_prices'),
|
||||
pl.col('pp_property_type').last(),
|
||||
pl.col('duration').last(),
|
||||
pl.col('price').last().alias('latest_price'),
|
||||
pl.col('date_of_transfer').last(),
|
||||
)
|
||||
)
|
||||
|
||||
print("Price paid dataset")
|
||||
print(price_paid.head().collect())
|
||||
|
||||
price_paid_df = price_paid.collect()
|
||||
epc_df = epc.collect()
|
||||
|
||||
joined = fuzzy_join_on_postcode(
|
||||
left=price_paid_df,
|
||||
right=epc_df,
|
||||
left_address_col='pp_address',
|
||||
right_address_col='epc_address',
|
||||
left_postcode_col='postcode',
|
||||
right_postcode_col='POSTCODE',
|
||||
score_threshold=80,
|
||||
).drop('POSTCODE')
|
||||
|
||||
matched_count = joined.filter(pl.col('epc_address').is_not_null() & pl.col('pp_address').is_not_null()).height
|
||||
print(f"Unique properties: {price_paid_df.height}")
|
||||
print(f"Matched: {matched_count} ({100 * matched_count / price_paid_df.height:.1f}%)")
|
||||
print(f"Unmatched: {price_paid_df.height - matched_count}")
|
||||
|
||||
joined = joined.rename({col: col.lower() for col in joined.columns})
|
||||
|
||||
print(joined.head())
|
||||
joined.write_parquet('data_sources/processed/epc_pp.parquet')
|
||||
139
pipeline/fuzzy_join.py
Normal file
139
pipeline/fuzzy_join.py
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
import re
|
||||
from concurrent.futures import ProcessPoolExecutor
|
||||
from os import cpu_count
|
||||
|
||||
import polars as pl
|
||||
from thefuzz import fuzz
|
||||
from tqdm import tqdm
|
||||
|
||||
_NUMBER_RE = re.compile(r'\d+')
|
||||
|
||||
|
||||
def fuzzy_join_on_postcode(
|
||||
left: pl.DataFrame,
|
||||
right: pl.DataFrame,
|
||||
left_address_col: str,
|
||||
right_address_col: str,
|
||||
left_postcode_col: str,
|
||||
right_postcode_col: str,
|
||||
score_threshold: int = 80,
|
||||
) -> pl.DataFrame:
|
||||
"""Fuzzy join two DataFrames by matching addresses within postcode buckets.
|
||||
|
||||
Returns the left DataFrame with all right columns appended.
|
||||
Unmatched rows have null right columns.
|
||||
"""
|
||||
|
||||
def _normalize(s: pl.Expr) -> pl.Expr:
|
||||
return (
|
||||
s.str.to_uppercase()
|
||||
.str.replace_all(r'[,.\-]', ' ')
|
||||
.str.replace_all(r'\s+', ' ')
|
||||
.str.strip_chars()
|
||||
)
|
||||
|
||||
left = left.with_columns(
|
||||
_normalize(pl.col(left_address_col)).alias('_left_address'),
|
||||
pl.col(left_postcode_col).str.strip_chars().str.to_uppercase().alias('_left_postcode'),
|
||||
)
|
||||
right = right.with_columns(
|
||||
_normalize(pl.col(right_address_col)).alias('_right_address'),
|
||||
pl.col(right_postcode_col).str.strip_chars().str.to_uppercase().alias('_right_postcode'),
|
||||
)
|
||||
|
||||
# Deduplicate right side on normalized address + postcode so that
|
||||
# variant spellings of the same address don't consume multiple slots.
|
||||
right = right.unique(subset=['_right_address', '_right_postcode'], keep='first')
|
||||
|
||||
# Group right side by postcode for fast lookup
|
||||
right_by_postcode: dict[str, list[tuple[int, str]]] = {}
|
||||
for i, (postcode, address) in enumerate(
|
||||
zip(right['_right_postcode'], right['_right_address'])
|
||||
):
|
||||
if postcode is not None:
|
||||
right_by_postcode.setdefault(postcode, []).append((i, address))
|
||||
|
||||
# Group left side by postcode
|
||||
left_by_postcode: dict[str, list[tuple[int, str]]] = {}
|
||||
for left_row, (postcode, address) in enumerate(
|
||||
zip(left['_left_postcode'], left['_left_address'])
|
||||
):
|
||||
if address is not None and postcode is not None:
|
||||
left_by_postcode.setdefault(postcode, []).append((left_row, address))
|
||||
|
||||
# Build tasks for each postcode bucket
|
||||
tasks = [
|
||||
(left_entries, right_by_postcode[postcode], score_threshold)
|
||||
for postcode, left_entries in left_by_postcode.items()
|
||||
if postcode in right_by_postcode
|
||||
]
|
||||
|
||||
# Score all pairwise matches in parallel, then greedily assign from
|
||||
# highest score downward so best pairs lock in first.
|
||||
all_pairs: list[tuple[int, int, int]] = [] # (score, left_row, right_row)
|
||||
with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
|
||||
for pairs in tqdm(
|
||||
executor.map(_score_bucket, tasks, chunksize=64),
|
||||
total=len(tasks),
|
||||
desc='Fuzzy matching',
|
||||
):
|
||||
all_pairs.extend(pairs)
|
||||
|
||||
# Sort descending by score so best matches are assigned first
|
||||
all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True)
|
||||
|
||||
match_indices: list[int | None] = [None] * len(left)
|
||||
matched_left: set[int] = set()
|
||||
matched_right: set[int] = set()
|
||||
|
||||
for score, left_row, right_row in all_pairs:
|
||||
if left_row in matched_left or right_row in matched_right:
|
||||
continue
|
||||
match_indices[left_row] = right_row
|
||||
matched_left.add(left_row)
|
||||
matched_right.add(right_row)
|
||||
|
||||
# Select right columns (excluding internal helpers)
|
||||
right_cols = right.select(pl.exclude('_right_address', '_right_postcode'))
|
||||
right_matched = right_cols[
|
||||
[i if i is not None else 0 for i in match_indices]
|
||||
]
|
||||
|
||||
# Null out unmatched rows
|
||||
mask = pl.Series('_matched', [i is not None for i in match_indices])
|
||||
right_matched = right_matched.with_columns(
|
||||
pl.when(mask).then(pl.col(c)).otherwise(pl.lit(None)).alias(c)
|
||||
for c in right_matched.columns
|
||||
)
|
||||
|
||||
left_clean = left.select(pl.exclude('_left_address', '_left_postcode'))
|
||||
return pl.concat([left_clean, right_matched], how='horizontal')
|
||||
|
||||
|
||||
def _numbers_compatible(a: str, b: str) -> bool:
|
||||
"""Check that numeric tokens (flat/house numbers) in the shorter set are a subset of the longer.
|
||||
|
||||
Returns False if one address has numbers and the other doesn't.
|
||||
"""
|
||||
nums_a = set(_NUMBER_RE.findall(a))
|
||||
nums_b = set(_NUMBER_RE.findall(b))
|
||||
smaller, larger = (nums_a, nums_b) if len(nums_a) <= len(nums_b) else (nums_b, nums_a)
|
||||
if not smaller and larger:
|
||||
return False
|
||||
return smaller.issubset(larger)
|
||||
|
||||
|
||||
def _score_bucket(
|
||||
args: tuple[list[tuple[int, str]], list[tuple[int, str]], int],
|
||||
) -> list[tuple[int, int, int]]:
|
||||
"""Score all address pairs within a single postcode bucket."""
|
||||
left_entries, right_entries, score_threshold = args
|
||||
pairs = []
|
||||
for left_row, left_address in left_entries:
|
||||
for right_row, right_address in right_entries:
|
||||
if not _numbers_compatible(left_address, right_address):
|
||||
continue
|
||||
score = fuzz.token_sort_ratio(left_address, right_address)
|
||||
if score >= score_threshold:
|
||||
pairs.append((score, left_row, right_row))
|
||||
return pairs
|
||||
52
pipeline/test_fuzzy_join.py
Normal file
52
pipeline/test_fuzzy_join.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
import polars as pl
|
||||
|
||||
from fuzzy_join import fuzzy_join_on_postcode
|
||||
|
||||
POSTCODE = "E14 2DG"
|
||||
|
||||
# Price paid: unique addresses for this postcode
|
||||
pp = (
|
||||
pl.scan_parquet("data_sources/pp-complete.parquet")
|
||||
.filter(pl.col("postcode") == POSTCODE)
|
||||
.select("paon", "saon", "street", "postcode")
|
||||
.collect()
|
||||
.unique()
|
||||
.sort("saon")
|
||||
.with_columns(
|
||||
pl.concat_str(
|
||||
[pl.col("saon"), pl.col("paon"), pl.col("street")],
|
||||
separator=" ",
|
||||
ignore_nulls=True,
|
||||
).alias("pp_address"),
|
||||
)
|
||||
)
|
||||
|
||||
# EPC: latest inspection per address for this postcode
|
||||
epc = (
|
||||
pl.scan_csv("data_sources/epc/certificates.csv")
|
||||
.select("ADDRESS", "POSTCODE", "INSPECTION_DATE")
|
||||
.filter(pl.col("POSTCODE").str.strip_chars() == POSTCODE)
|
||||
.sort("INSPECTION_DATE", descending=True)
|
||||
.collect()
|
||||
.unique("ADDRESS")
|
||||
.sort("ADDRESS")
|
||||
)
|
||||
|
||||
print(f"Price paid: {len(pp)} unique addresses")
|
||||
print(f"EPC: {len(epc)} unique addresses")
|
||||
|
||||
result = fuzzy_join_on_postcode(
|
||||
left=pp,
|
||||
right=epc,
|
||||
left_address_col="pp_address",
|
||||
right_address_col="ADDRESS",
|
||||
left_postcode_col="postcode",
|
||||
right_postcode_col="POSTCODE",
|
||||
score_threshold=80,
|
||||
|
||||
)
|
||||
|
||||
snapshot = result.select("pp_address", "ADDRESS").sort("pp_address")
|
||||
|
||||
with pl.Config(tbl_rows=-1, tbl_cols=-1, fmt_str_lengths=80):
|
||||
print(snapshot)
|
||||
Loading…
Add table
Add a link
Reference in a new issue