Update H3 pipeline

This commit is contained in:
Andras Schmelczer 2026-01-30 18:33:48 +00:00
parent 68b6dcf65e
commit 6122ee44da
13 changed files with 291 additions and 420 deletions

View file

@ -1,6 +1,9 @@
import re
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor
from os import cpu_count
from pathlib import Path
import polars as pl
from thefuzz import fuzz
@ -9,105 +12,143 @@ from tqdm import tqdm
# Precompiled pattern matching a run of one or more consecutive digits.
_NUMBER_RE = re.compile(r'\d+')
def _normalize(s: pl.Expr) -> pl.Expr:
    """Build an expression that canonicalises a string column for comparison.

    The text is upper-cased, commas, full stops and hyphens are turned into
    spaces, every whitespace run is squeezed to a single space, and leading
    and trailing whitespace is stripped.
    """
    upper = s.str.to_uppercase()
    without_punctuation = upper.str.replace_all(r'[,.\-]', ' ')
    single_spaced = without_punctuation.str.replace_all(r'\s+', ' ')
    return single_spaced.str.strip_chars()
def fuzzy_join_on_postcode(
    left: pl.LazyFrame,
    right: pl.LazyFrame,
    left_address_col: str,
    right_address_col: str,
    left_postcode_col: str,
    right_postcode_col: str,
    score_threshold: int = 80,
) -> pl.LazyFrame:
    """Fuzzy join two LazyFrames by matching addresses within postcode buckets.

    Sinks each side to a temporary parquet file so the upstream pipeline
    executes only once. The matching phase collects just three narrow
    columns (index, address, postcode) via projection pushdown, and the
    final join reads the remaining columns lazily.

    Candidate pairs are scored per postcode bucket in worker processes and
    then assigned greedily from the highest score downward, so each left row
    and each right row takes part in at most one match.

    Args:
        left: Left-hand frame; all of its rows are kept.
        right: Right-hand frame to match against. Rows that share the same
            normalised address and postcode are considered only once.
        left_address_col: Name of the address column in ``left``.
        right_address_col: Name of the address column in ``right``.
        left_postcode_col: Name of the postcode column in ``left``.
        right_postcode_col: Name of the postcode column in ``right``.
        score_threshold: Minimum score a pair must reach to be eligible
            for matching.

    Returns:
        A LazyFrame with all left and right columns. Unmatched rows have
        null right columns. The frame reads from temporary parquet files
        that are removed when the interpreter exits, so it must be
        collected or sunk within the current process.
    """
    # Local import: only needed for the deferred temp-dir cleanup below.
    import atexit

    tmpdir = tempfile.mkdtemp(prefix='fuzzy_join_')
    left_path = Path(tmpdir) / 'left.parquet'
    right_path = Path(tmpdir) / 'right.parquet'

    try:
        # Materialise each side exactly once, with a row index, to temp parquet.
        left.with_row_index('_left_idx').sink_parquet(left_path)
        right.with_row_index('_right_idx').sink_parquet(right_path)

        # Collect only the narrow columns needed for matching (projection pushdown).
        left_match = (
            pl.scan_parquet(left_path)
            .select(
                '_left_idx',
                _normalize(pl.col(left_address_col)).alias('_left_address'),
                pl.col(left_postcode_col).str.strip_chars().str.to_uppercase().alias('_left_postcode'),
            )
            .collect()
        )
        # Deduplicate the right side on normalised address + postcode so that
        # variant spellings of the same address don't consume multiple slots.
        right_match = (
            pl.scan_parquet(right_path)
            .select(
                '_right_idx',
                _normalize(pl.col(right_address_col)).alias('_right_address'),
                pl.col(right_postcode_col).str.strip_chars().str.to_uppercase().alias('_right_postcode'),
            )
            .unique(subset=['_right_address', '_right_postcode'], keep='first')
            .collect()
        )

        # Group right side by postcode for fast lookup.
        right_by_postcode: dict[str, list[tuple[int, str]]] = {}
        for idx, postcode, address in zip(
            right_match['_right_idx'], right_match['_right_postcode'], right_match['_right_address']
        ):
            if postcode is not None:
                right_by_postcode.setdefault(postcode, []).append((idx, address))

        # Group left side by postcode; rows without an address cannot be scored.
        left_by_postcode: dict[str, list[tuple[int, str]]] = {}
        for idx, postcode, address in zip(
            left_match['_left_idx'], left_match['_left_postcode'], left_match['_left_address']
        ):
            if address is not None and postcode is not None:
                left_by_postcode.setdefault(postcode, []).append((idx, address))

        # Release the collected frames before the memory-heavy scoring phase.
        del left_match, right_match

        # One task per postcode present on both sides.
        tasks = [
            (left_entries, right_by_postcode[postcode])
            for postcode, left_entries in left_by_postcode.items()
            if postcode in right_by_postcode
        ]

        # Score all pairwise matches in parallel, keeping only the pairs that
        # reach the caller's threshold.
        all_pairs: list[tuple[int, int, int]] = []  # (score, left_idx, right_idx)
        with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
            for pairs in tqdm(
                executor.map(_score_bucket, tasks, chunksize=64),
                total=len(tasks),
                desc='Fuzzy matching',
            ):
                all_pairs.extend(pair for pair in pairs if pair[0] >= score_threshold)

        del tasks, left_by_postcode, right_by_postcode

        # Sort descending by score (ties: lowest left index first) so the
        # best matches are assigned first.
        all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True)

        # Greedy one-to-one assignment; the dict keeps insertion order.
        assignment: dict[int, int] = {}
        matched_right: set[int] = set()
        for _score, left_idx, right_idx in all_pairs:
            if left_idx in assignment or right_idx in matched_right:
                continue
            assignment[left_idx] = right_idx
            matched_right.add(right_idx)

        del all_pairs, matched_right

        # Build a small mapping LazyFrame and join back to the cached parquets.
        # Explicit dtypes keep the schema stable even when nothing matched.
        mapping = pl.LazyFrame({
            '_left_idx': pl.Series(list(assignment.keys()), dtype=pl.UInt32),
            '_right_idx': pl.Series(list(assignment.values()), dtype=pl.UInt32),
        })

        result = (
            pl.scan_parquet(left_path)
            .join(mapping, on='_left_idx', how='left')
            .join(pl.scan_parquet(right_path), on='_right_idx', how='left')
            .drop('_left_idx', '_right_idx')
        )
    except BaseException:
        # Nothing will read the temp files after a failure; remove them now.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise

    # The returned LazyFrame still scans the temp parquet files, so they
    # cannot be deleted here. Defer removal to interpreter exit instead of
    # leaking the directory.
    atexit.register(shutil.rmtree, tmpdir, ignore_errors=True)
    return result
def _numbers_compatible(a: str, b: str) -> bool:
@ -127,13 +168,12 @@ def _score_bucket(
args: tuple[list[tuple[int, str]], list[tuple[int, str]], int],
) -> list[tuple[int, int, int]]:
"""Score all address pairs within a single postcode bucket."""
left_entries, right_entries, score_threshold = args
left_entries, right_entries = args
pairs = []
for left_row, left_address in left_entries:
for right_row, right_address in right_entries:
if not _numbers_compatible(left_address, right_address):
continue
score = fuzz.token_sort_ratio(left_address, right_address)
if score >= score_threshold:
pairs.append((score, left_row, right_row))
pairs.append((score, left_row, right_row))
return pairs