Extarct utils

This commit is contained in:
Andras Schmelczer 2026-01-31 10:49:43 +00:00
parent 0153e46478
commit e1b38a1b95
8 changed files with 458 additions and 25 deletions

View file

@ -0,0 +1,5 @@
from .fuzzy_join import fuzzy_join_on_postcode
from .haversine import haversine_km, haversine_km_expr
from .poi_counts import POI_GROUPS, count_pois_within_radius
__all__ = ["fuzzy_join_on_postcode", "haversine_km", "haversine_km_expr", "POI_GROUPS", "count_pois_within_radius"]

View file

@ -0,0 +1,179 @@
import re
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor
from os import cpu_count
from pathlib import Path
import polars as pl
from thefuzz import fuzz
from tqdm import tqdm
_NUMBER_RE = re.compile(r'\d+')
def _normalize(s: pl.Expr) -> pl.Expr:
return (
s.str.to_uppercase()
.str.replace_all(r'[,.\-]', ' ')
.str.replace_all(r'\s+', ' ')
.str.strip_chars()
)
def fuzzy_join_on_postcode(
left: pl.LazyFrame,
right: pl.LazyFrame,
left_address_col: str,
right_address_col: str,
left_postcode_col: str,
right_postcode_col: str,
) -> pl.LazyFrame:
"""Fuzzy join two LazyFrames by matching addresses within postcode buckets.
Sinks each side to a temporary parquet file so the upstream pipeline
executes only once. The matching phase collects just three narrow
columns (index, address, postcode) via projection pushdown, and the
final join reads the remaining columns lazily.
Returns a LazyFrame with all left and right columns. Unmatched rows
have null right columns.
"""
tmpdir = tempfile.mkdtemp(prefix='fuzzy_join_')
left_path = Path(tmpdir) / 'left.parquet'
right_path = Path(tmpdir) / 'right.parquet'
try:
# Materialise each side exactly once, with a row index, to temp parquet.
left.with_row_index('_left_idx').sink_parquet(left_path)
right.with_row_index('_right_idx').sink_parquet(right_path)
# Collect only the narrow columns needed for matching (projection pushdown).
left_match = (
pl.scan_parquet(left_path)
.select(
'_left_idx',
_normalize(pl.col(left_address_col)).alias('_left_address'),
pl.col(left_postcode_col).str.strip_chars().str.to_uppercase().alias('_left_postcode'),
)
.collect()
)
right_match = (
pl.scan_parquet(right_path)
.select(
'_right_idx',
_normalize(pl.col(right_address_col)).alias('_right_address'),
pl.col(right_postcode_col).str.strip_chars().str.to_uppercase().alias('_right_postcode'),
)
.unique(subset=['_right_address', '_right_postcode'], keep='first')
.collect()
)
# Group right side by postcode for fast lookup
right_by_postcode: dict[str, list[tuple[int, str]]] = {}
for idx, postcode, address in zip(
right_match['_right_idx'], right_match['_right_postcode'], right_match['_right_address']
):
if postcode is not None:
right_by_postcode.setdefault(postcode, []).append((idx, address))
# Group left side by postcode
left_by_postcode: dict[str, list[tuple[int, str]]] = {}
for idx, postcode, address in zip(
left_match['_left_idx'], left_match['_left_postcode'], left_match['_left_address']
):
if address is not None and postcode is not None:
left_by_postcode.setdefault(postcode, []).append((idx, address))
del left_match, right_match
# Build tasks for each postcode bucket
tasks = [
(left_entries, right_by_postcode[postcode])
for postcode, left_entries in left_by_postcode.items()
if postcode in right_by_postcode
]
# Score all pairwise matches in parallel, then greedily assign from
# highest score downward so best pairs lock in first.
all_pairs: list[tuple[int, int, int]] = [] # (score, left_idx, right_idx)
with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
for pairs in tqdm(
executor.map(_score_bucket, tasks, chunksize=64),
total=len(tasks),
desc='Fuzzy matching',
):
all_pairs.extend(pairs)
del tasks, left_by_postcode, right_by_postcode
# Sort descending by score so best matches are assigned first
all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True)
matches: list[tuple[int, int]] = []
matched_left: set[int] = set()
matched_right: set[int] = set()
for _score, left_idx, right_idx in all_pairs:
if left_idx in matched_left or right_idx in matched_right:
continue
matches.append((left_idx, right_idx))
matched_left.add(left_idx)
matched_right.add(right_idx)
del all_pairs, matched_left, matched_right
# Build a small mapping LazyFrame and join back to the cached parquets.
if matches:
mapping = pl.LazyFrame({
'_left_idx': pl.Series([m[0] for m in matches], dtype=pl.UInt32),
'_right_idx': pl.Series([m[1] for m in matches], dtype=pl.UInt32),
})
else:
mapping = pl.LazyFrame({
'_left_idx': pl.Series([], dtype=pl.UInt32),
'_right_idx': pl.Series([], dtype=pl.UInt32),
})
left_cached = pl.scan_parquet(left_path)
right_cached = pl.scan_parquet(right_path)
return (
left_cached
.join(mapping, on='_left_idx', how='left')
.join(right_cached, on='_right_idx', how='left')
.drop('_left_idx', '_right_idx')
)
except BaseException:
shutil.rmtree(tmpdir, ignore_errors=True)
raise
def _numbers_compatible(a: str, b: str) -> bool:
"""Check that numeric tokens (flat/house numbers) in the shorter set are a subset of the longer.
Returns False if one address has numbers and the other doesn't.
"""
nums_a = set(_NUMBER_RE.findall(a))
nums_b = set(_NUMBER_RE.findall(b))
smaller, larger = (nums_a, nums_b) if len(nums_a) <= len(nums_b) else (nums_b, nums_a)
if not smaller and larger:
return False
return smaller.issubset(larger)
def _score_bucket(
args: tuple[list[tuple[int, str]], list[tuple[int, str]], int],
) -> list[tuple[int, int, int]]:
"""Score all address pairs within a single postcode bucket."""
left_entries, right_entries = args
pairs = []
for left_row, left_address in left_entries:
for right_row, right_address in right_entries:
if not _numbers_compatible(left_address, right_address):
continue
score = fuzz.token_sort_ratio(left_address, right_address)
pairs.append((score, left_row, right_row))
return pairs

View file

@ -0,0 +1,36 @@
import math
import numpy as np
import polars as pl
_EARTH_RADIUS_KM = 6371.0
def haversine_km(lat1: np.ndarray, lon1: np.ndarray, lat2: float, lon2: float) -> np.ndarray:
"""Compute haversine distance in km between arrays (lat1, lon1) and a single point (lat2, lon2)."""
lat1_rad = np.radians(lat1)
lon1_rad = np.radians(lon1)
lat2_rad = np.radians(lat2)
lon2_rad = np.radians(lon2)
dlat = lat2_rad - lat1_rad
dlon = lon2_rad - lon1_rad
a = np.sin(dlat / 2) ** 2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2
c = 2 * np.arcsin(np.sqrt(a))
return _EARTH_RADIUS_KM * c
def haversine_km_expr(
lat_col: str, lon_col: str, dest_lat: float, dest_lon: float
) -> pl.Expr:
"""Polars expression computing haversine distance in km to a fixed point."""
dest_lat_rad = math.radians(dest_lat)
dest_lon_rad = math.radians(dest_lon)
lat_rad = pl.col(lat_col).radians()
lon_rad = pl.col(lon_col).radians()
dlat = pl.lit(dest_lat_rad) - lat_rad
dlon = pl.lit(dest_lon_rad) - lon_rad
a = (dlat / 2).sin() ** 2 + pl.lit(dest_lat_rad).cos() * lat_rad.cos() * (dlon / 2).sin() ** 2
return 2 * _EARTH_RADIUS_KM * a.sqrt().arcsin()

View file

@ -0,0 +1,192 @@
"""Count POIs within a radius of properties, optimized via postcode deduplication."""
import os
import tempfile
import numpy as np
import polars as pl
from .haversine import haversine_km
# POI category groups for proximity counting
POI_GROUPS = {
"restaurants": ["Restaurant", "Fast Food"],
"groceries": ["Greengrocer", "Grocery Shop", "Supermarket", "Convenience Store"],
"parks": ["Park", "Garden", "Nature Reserve"],
"public_transport": ["Station", "Stop", "Bus Station"],
}
def _count_pois_per_postcode(
postcodes_df: pl.DataFrame, pois: pl.DataFrame, radius_km: float = 2.0
) -> pl.DataFrame:
"""
For each unique postcode, count POIs within radius_km by category group.
Uses spatial grid with vectorized distance calculations.
"""
print(f"Counting POIs within {radius_km}km per postcode...")
n_postcodes = len(postcodes_df)
n_pois = len(pois)
print(f" {n_postcodes:,} postcodes, {n_pois:,} POIs")
# Build spatial grid for POIs (0.05 degree cells ~5.5km)
grid_size = 0.05
print(" Building POI spatial grid...")
# Convert to numpy arrays
poi_lats = pois["lat"].to_numpy()
poi_lngs = pois["lng"].to_numpy()
poi_cats = pois["category"].to_numpy()
# Compute grid coordinates for all POIs
poi_grid_lats = np.floor(poi_lats / grid_size).astype(np.int32)
poi_grid_lngs = np.floor(poi_lngs / grid_size).astype(np.int32)
# Build grid cell lookup using numpy indexing
poi_grid = {}
for i in range(n_pois):
key = (poi_grid_lats[i], poi_grid_lngs[i])
if key not in poi_grid:
poi_grid[key] = []
poi_grid[key].append(i)
# Convert grid values to numpy arrays for faster indexing
for key in poi_grid:
poi_grid[key] = np.array(poi_grid[key], dtype=np.int32)
print(f" POI grid has {len(poi_grid):,} occupied cells")
# Pre-compute category masks
category_masks = {}
for group, categories in POI_GROUPS.items():
mask = np.isin(poi_cats, categories)
category_masks[group] = mask
print(f" {group}: {mask.sum():,} POIs")
# Extract postcode coordinates as numpy arrays
pc_lats = postcodes_df["lat"].to_numpy()
pc_lons = postcodes_df["lon"].to_numpy()
pc_codes = postcodes_df["postcode"].to_list()
# Initialize result arrays
result_counts = {group: np.zeros(n_postcodes, dtype=np.int32) for group in POI_GROUPS}
# Process in batches with progress
batch_size = 50000
n_batches = (n_postcodes + batch_size - 1) // batch_size
print(f" Processing {n_postcodes:,} postcodes in {n_batches} batches...")
for batch_idx in range(n_batches):
start_idx = batch_idx * batch_size
end_idx = min(start_idx + batch_size, n_postcodes)
if batch_idx % 5 == 0:
print(f" Batch {batch_idx + 1}/{n_batches}: postcodes {start_idx:,} - {end_idx:,}")
# Process batch
for i in range(start_idx, end_idx):
pc_lat = pc_lats[i]
pc_lon = pc_lons[i]
# Find grid cells to check (3x3 grid)
grid_lat = int(np.floor(pc_lat / grid_size))
grid_lng = int(np.floor(pc_lon / grid_size))
# Collect nearby POI indices
nearby_indices = []
for dlat in [-1, 0, 1]:
for dlng in [-1, 0, 1]:
cell_key = (grid_lat + dlat, grid_lng + dlng)
if cell_key in poi_grid:
nearby_indices.append(poi_grid[cell_key])
if not nearby_indices:
continue
# Concatenate all nearby POI indices
nearby = np.concatenate(nearby_indices)
# Vectorized distance calculation for all nearby POIs
distances = haversine_km(
poi_lats[nearby],
poi_lngs[nearby],
pc_lat,
pc_lon
)
# Filter by radius
within_mask = distances <= radius_km
within_indices = nearby[within_mask]
if len(within_indices) == 0:
continue
# Count by category group using pre-computed masks
for group, cat_mask in category_masks.items():
result_counts[group][i] = cat_mask[within_indices].sum()
# Build result dataframe
result_data = {"postcode": pc_codes}
for group in POI_GROUPS:
result_data[f"{group}_{int(radius_km)}km"] = result_counts[group]
result = pl.DataFrame(result_data)
print(" Completed POI counting")
return result
def count_pois_within_radius(
properties: pl.DataFrame, pois: pl.DataFrame, radius_km: float = 2.0
) -> dict[str, pl.Series]:
"""
Count POIs within radius for properties, optimized by deduplicating postcodes.
Returns dict of {column_name: count_series} aligned to properties dataframe.
"""
# Get unique postcodes with coordinates
print("Deduplicating postcodes...")
unique_postcodes = (
properties
.select(["postcode", "lat", "lon"])
.unique(subset=["postcode"])
)
print(f" {len(properties):,} properties → {len(unique_postcodes):,} unique postcodes")
# Count POIs per postcode
postcode_counts = _count_pois_per_postcode(unique_postcodes, pois, radius_km)
# Write to temp file to avoid memory duplication during join
print(" Writing postcode counts to temp file...")
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
tmp_path = tmp.name
postcode_counts.write_parquet(tmp_path)
del postcode_counts # Free memory
# Join using lazy evaluation
print(" Joining counts back to properties (lazy)...")
count_cols = [f"{group}_{int(radius_km)}km" for group in POI_GROUPS]
# Convert properties to lazy frame, join, then collect
result_lazy = (
properties.lazy()
.select("postcode")
.join(
pl.scan_parquet(tmp_path),
on="postcode",
how="left"
)
.select(count_cols)
.fill_null(0)
)
result_df = result_lazy.collect()
# Clean up temp file
os.unlink(tmp_path)
# Extract as dict of Series
return {col: result_df[col] for col in count_cols}

View file

@ -0,0 +1,46 @@
import polars as pl
from pipeline.utils import fuzzy_join_on_postcode
POSTCODE = "E14 2DG"
# Price paid: unique addresses for this postcode
pp = (
pl.scan_parquet("data_sources/pp-complete.parquet")
.filter(pl.col("postcode") == POSTCODE)
.select("paon", "saon", "street", "postcode")
.unique()
.sort("saon")
.with_columns(
pl.concat_str(
[pl.col("saon"), pl.col("paon"), pl.col("street")],
separator=" ",
ignore_nulls=True,
).alias("pp_address"),
)
)
# EPC: latest inspection per address for this postcode
epc = (
pl.scan_csv("data_sources/epc/certificates.csv")
.select("ADDRESS", "POSTCODE", "INSPECTION_DATE")
.filter(pl.col("POSTCODE").str.strip_chars() == POSTCODE)
.sort("INSPECTION_DATE", descending=True)
.unique("ADDRESS")
.sort("ADDRESS")
)
result = fuzzy_join_on_postcode(
left=pp,
right=epc,
left_address_col="pp_address",
right_address_col="ADDRESS",
left_postcode_col="postcode",
right_postcode_col="POSTCODE",
).collect()
snapshot = result.select("pp_address", "ADDRESS").sort("pp_address")
print('Testing the matching between EPC and PP addresses')
with pl.Config(tbl_rows=-1, tbl_cols=-1, fmt_str_lengths=80):
print(snapshot)

View file

@ -0,0 +1,135 @@
import numpy as np
import polars as pl
import pytest
from pipeline.utils.haversine import haversine_km, haversine_km_expr
class TestHaversineKm:
"""Test numpy-based haversine distance calculation."""
def test_same_point(self):
"""Distance from a point to itself should be zero."""
lat = np.array([51.5074])
lon = np.array([-0.1278])
dist = haversine_km(lat, lon, 51.5074, -0.1278)
assert np.allclose(dist, 0.0, atol=1e-10)
def test_known_distance_london_to_paris(self):
"""Test distance from London to Paris (~344 km)."""
# London coordinates
london_lat = np.array([51.5074])
london_lon = np.array([-0.1278])
# Paris coordinates
paris_lat = 48.8566
paris_lon = 2.3522
dist = haversine_km(london_lat, london_lon, paris_lat, paris_lon)
# Expected distance is approximately 344 km
assert np.allclose(dist[0], 344, rtol=0.01)
def test_known_distance_new_york_to_london(self):
"""Test distance from New York to London (~5570 km)."""
ny_lat = np.array([40.7128])
ny_lon = np.array([-74.0060])
london_lat = 51.5074
london_lon = -0.1278
dist = haversine_km(ny_lat, ny_lon, london_lat, london_lon)
# Expected distance is approximately 5570 km
assert np.allclose(dist[0], 5570, rtol=0.01)
def test_multiple_points(self):
"""Test calculating distances from multiple points to a single destination."""
lats = np.array([51.5074, 48.8566, 40.7128]) # London, Paris, NYC
lons = np.array([-0.1278, 2.3522, -74.0060])
# Distance to Edinburgh
edinburgh_lat = 55.9533
edinburgh_lon = -3.1883
dists = haversine_km(lats, lons, edinburgh_lat, edinburgh_lon)
# All distances should be positive
assert np.all(dists > 0)
# London to Edinburgh should be shortest (~530 km)
assert dists[0] < dists[1] < dists[2]
assert np.allclose(dists[0], 530, rtol=0.02)
def test_equator_points(self):
"""Test distance along the equator."""
# Two points on the equator, 1 degree apart
lat = np.array([0.0])
lon1 = np.array([0.0])
lon2 = 1.0
dist = haversine_km(lat, lon1, 0.0, lon2)
# 1 degree at equator ≈ 111 km
assert np.allclose(dist[0], 111.2, rtol=0.01)
class TestHaversineKmExpr:
"""Test Polars expression-based haversine distance calculation."""
def test_same_point(self):
"""Distance from a point to itself should be zero."""
df = pl.DataFrame({"lat": [51.5074], "lon": [-0.1278]})
result = df.select(haversine_km_expr("lat", "lon", 51.5074, -0.1278).alias("dist"))
assert result["dist"][0] == pytest.approx(0.0, abs=1e-10)
def test_known_distance_london_to_paris(self):
"""Test distance from London to Paris (~344 km)."""
df = pl.DataFrame({"lat": [51.5074], "lon": [-0.1278]})
result = df.select(haversine_km_expr("lat", "lon", 48.8566, 2.3522).alias("dist"))
assert result["dist"][0] == pytest.approx(344, rel=0.01)
def test_known_distance_new_york_to_london(self):
"""Test distance from New York to London (~5570 km)."""
df = pl.DataFrame({"lat": [40.7128], "lon": [-74.0060]})
result = df.select(haversine_km_expr("lat", "lon", 51.5074, -0.1278).alias("dist"))
assert result["dist"][0] == pytest.approx(5570, rel=0.01)
def test_multiple_points(self):
"""Test calculating distances from multiple points to a single destination."""
df = pl.DataFrame({
"lat": [51.5074, 48.8566, 40.7128], # London, Paris, NYC
"lon": [-0.1278, 2.3522, -74.0060],
})
# Distance to Edinburgh
result = df.select(haversine_km_expr("lat", "lon", 55.9533, -3.1883).alias("dist"))
dists = result["dist"].to_numpy()
# All distances should be positive
assert np.all(dists > 0)
# London to Edinburgh should be shortest (~530 km)
assert dists[0] < dists[1] < dists[2]
assert dists[0] == pytest.approx(530, rel=0.02)
def test_equator_points(self):
"""Test distance along the equator."""
df = pl.DataFrame({"lat": [0.0], "lon": [0.0]})
result = df.select(haversine_km_expr("lat", "lon", 0.0, 1.0).alias("dist"))
# 1 degree at equator ≈ 111 km
assert result["dist"][0] == pytest.approx(111.2, rel=0.01)
class TestHaversineConsistency:
"""Test that both implementations give consistent results."""
def test_numpy_and_polars_match(self):
"""Both implementations should give identical results."""
# Test data
lats = np.array([51.5074, 48.8566, 40.7128, 55.9533, 52.5200])
lons = np.array([-0.1278, 2.3522, -74.0060, -3.1883, 13.4050])
dest_lat = 41.9028 # Rome
dest_lon = 12.4964
# Numpy version
numpy_dists = haversine_km(lats, lons, dest_lat, dest_lon)
# Polars version
df = pl.DataFrame({"lat": lats, "lon": lons})
polars_result = df.select(haversine_km_expr("lat", "lon", dest_lat, dest_lon).alias("dist"))
polars_dists = polars_result["dist"].to_numpy()
# Should be identical (or at least very close due to floating point)
assert np.allclose(numpy_dists, polars_dists, rtol=1e-10)

View file

@ -0,0 +1,85 @@
import polars as pl
import pytest
from pipeline.utils.poi_counts import POI_GROUPS, count_pois_within_radius
@pytest.fixture
def pois():
"""POIs clustered around two locations: central London and 10km away."""
return pl.DataFrame({
"lat": [51.5074, 51.5075, 51.5080, 51.5076, 51.5073, 51.60],
"lng": [-0.1278, -0.1280, -0.1275, -0.1279, -0.1277, -0.20],
"category": [
"Restaurant",
"Fast Food",
"Supermarket",
"Park",
"Station",
"Restaurant", # too far from any property
],
})
@pytest.fixture
def properties():
"""Two properties at the same postcode near central London, one at a distant postcode."""
return pl.DataFrame({
"postcode": ["EC1A 1BB", "EC1A 1BB", "ZZ99 9ZZ"],
"lat": [51.5074, 51.5074, 55.0],
"lon": [-0.1278, -0.1278, -3.0],
})
def test_counts_pois_within_radius(properties, pois):
result = count_pois_within_radius(properties, pois, radius_km=2.0)
assert set(result.keys()) == {f"{g}_2km" for g in POI_GROUPS}
# Result Series must be aligned to properties (3 rows)
for col, series in result.items():
assert len(series) == 3, f"{col} has {len(series)} rows, expected 3"
# First two rows share a postcode near the central London cluster
assert result["restaurants_2km"][0] == 2 # Restaurant + Fast Food
assert result["groceries_2km"][0] == 1 # Supermarket
assert result["parks_2km"][0] == 1 # Park
assert result["public_transport_2km"][0] == 1 # Station
# Second row is the same postcode, so same counts
assert result["restaurants_2km"][1] == result["restaurants_2km"][0]
# Third row (ZZ99 9ZZ) is far from all POIs → zero counts
for group in POI_GROUPS:
assert result[f"{group}_2km"][2] == 0
def test_no_pois_returns_zeros(properties):
empty_pois = pl.DataFrame({
"lat": pl.Series([], dtype=pl.Float64),
"lng": pl.Series([], dtype=pl.Float64),
"category": pl.Series([], dtype=pl.String),
})
result = count_pois_within_radius(properties, empty_pois, radius_km=2.0)
for group in POI_GROUPS:
col = f"{group}_2km"
assert col in result
assert result[col].to_list() == [0, 0, 0]
def test_custom_radius(pois):
"""A tiny radius should exclude POIs that are even slightly away."""
properties = pl.DataFrame({
"postcode": ["EC1A 1BB"],
"lat": [51.5074],
"lon": [-0.1278],
})
# 0.01 km = 10m — only the POI at the exact same location should match
result = count_pois_within_radius(properties, pois, radius_km=0.01)
# The Restaurant at (51.5074, -0.1278) is at distance 0
assert result["restaurants_0km"][0] >= 1
# POIs >100m away should not be counted
total = sum(result[f"{g}_0km"][0] for g in POI_GROUPS)
assert total <= 2 # at most the co-located POIs