From e1b38a1b954f847c4ff558c869192bdd8c1936ee Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 31 Jan 2026 10:49:43 +0000 Subject: [PATCH] Extarct utils --- pipeline/journey_times/__main__.py | 27 +--- pipeline/utils/__init__.py | 5 + pipeline/{ => utils}/fuzzy_join.py | 0 pipeline/utils/haversine.py | 36 +++++ pipeline/utils/poi_counts.py | 192 ++++++++++++++++++++++++ pipeline/{ => utils}/test_fuzzy_join.py | 3 +- pipeline/utils/test_haversine.py | 135 +++++++++++++++++ pipeline/utils/test_poi_counts.py | 85 +++++++++++ 8 files changed, 458 insertions(+), 25 deletions(-) create mode 100644 pipeline/utils/__init__.py rename pipeline/{ => utils}/fuzzy_join.py (100%) create mode 100644 pipeline/utils/haversine.py create mode 100644 pipeline/utils/poi_counts.py rename pipeline/{ => utils}/test_fuzzy_join.py (91%) create mode 100644 pipeline/utils/test_haversine.py create mode 100644 pipeline/utils/test_poi_counts.py diff --git a/pipeline/journey_times/__main__.py b/pipeline/journey_times/__main__.py index 0951ae4..40e8452 100644 --- a/pipeline/journey_times/__main__.py +++ b/pipeline/journey_times/__main__.py @@ -8,6 +8,7 @@ from tqdm import tqdm from .config import DESTINATIONS, MAX_CONCURRENT, MAX_POSTCODES, OUTPUT_DIR, MAX_DISTANCE_KM from .results import CheckpointSaver, results_to_dataframe, save_results from .tfl_client import fetch_journey_times +from pipeline.utils import haversine_km_expr def main(): @@ -28,31 +29,9 @@ def main(): postcodes_df = pl.read_parquet(OUTPUT_DIR / "postcodes_h3.parquet") print(f"Loaded {postcodes_df.height:,} postcodes") - # Filter to postcodes within 150km of destination using Haversine formula - earth_radius_km = 6371 - - dest_lat_rad = destination.lat * 3.14159265359 / 180 - dest_lon_rad = destination.lon * 3.14159265359 / 180 - + # Filter to postcodes within range of destination postcodes_df = postcodes_df.with_columns( - ( - 2 - * earth_radius_km - * ( - ( - ((pl.lit(dest_lat_rad) - pl.col("lat") * 3.14159265359 / 180) / 2).sin() - ** 2 - + pl.lit(dest_lat_rad).cos() - * (pl.col("lat") * 3.14159265359 / 180).cos() - * ( - (pl.lit(dest_lon_rad) - pl.col("long") * 3.14159265359 / 180) / 2 - ).sin() - ** 2 - ) - .sqrt() - .arcsin() - ) - ).alias("distance_km") + haversine_km_expr("lat", "long", destination.lat, destination.lon).alias("distance_km") ).filter(pl.col("distance_km") <= MAX_DISTANCE_KM) print(f"Filtered to {postcodes_df.height:,} postcodes within {MAX_DISTANCE_KM}km") diff --git a/pipeline/utils/__init__.py b/pipeline/utils/__init__.py new file mode 100644 index 0000000..8f435ac --- /dev/null +++ b/pipeline/utils/__init__.py @@ -0,0 +1,5 @@ +from .fuzzy_join import fuzzy_join_on_postcode +from .haversine import haversine_km, haversine_km_expr +from .poi_counts import POI_GROUPS, count_pois_within_radius + +__all__ = ["fuzzy_join_on_postcode", "haversine_km", "haversine_km_expr", "POI_GROUPS", "count_pois_within_radius"] diff --git a/pipeline/fuzzy_join.py b/pipeline/utils/fuzzy_join.py similarity index 100% rename from pipeline/fuzzy_join.py rename to pipeline/utils/fuzzy_join.py diff --git a/pipeline/utils/haversine.py b/pipeline/utils/haversine.py new file mode 100644 index 0000000..ff4a4bf --- /dev/null +++ b/pipeline/utils/haversine.py @@ -0,0 +1,36 @@ +import math + +import numpy as np +import polars as pl + +_EARTH_RADIUS_KM = 6371.0 + + +def haversine_km(lat1: np.ndarray, lon1: np.ndarray, lat2: float, lon2: float) -> np.ndarray: + """Compute haversine distance in km between arrays (lat1, lon1) and a single point (lat2, lon2).""" + lat1_rad = np.radians(lat1) + lon1_rad = np.radians(lon1) + lat2_rad = np.radians(lat2) + lon2_rad = np.radians(lon2) + dlat = lat2_rad - lat1_rad + dlon = lon2_rad - lon1_rad + a = np.sin(dlat / 2) ** 2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2 + c = 2 * np.arcsin(np.sqrt(a)) + return _EARTH_RADIUS_KM * c + + +def haversine_km_expr( + lat_col: str, lon_col: str, dest_lat: float, dest_lon: float +) -> pl.Expr: + """Polars expression computing haversine distance in km to a fixed point.""" + dest_lat_rad = math.radians(dest_lat) + dest_lon_rad = math.radians(dest_lon) + + lat_rad = pl.col(lat_col).radians() + lon_rad = pl.col(lon_col).radians() + + dlat = pl.lit(dest_lat_rad) - lat_rad + dlon = pl.lit(dest_lon_rad) - lon_rad + + a = (dlat / 2).sin() ** 2 + pl.lit(dest_lat_rad).cos() * lat_rad.cos() * (dlon / 2).sin() ** 2 + return 2 * _EARTH_RADIUS_KM * a.sqrt().arcsin() diff --git a/pipeline/utils/poi_counts.py b/pipeline/utils/poi_counts.py new file mode 100644 index 0000000..6ab700c --- /dev/null +++ b/pipeline/utils/poi_counts.py @@ -0,0 +1,192 @@ +"""Count POIs within a radius of properties, optimized via postcode deduplication.""" + +import os +import tempfile + +import numpy as np +import polars as pl + +from .haversine import haversine_km + +# POI category groups for proximity counting +POI_GROUPS = { + "restaurants": ["Restaurant", "Fast Food"], + "groceries": ["Greengrocer", "Grocery Shop", "Supermarket", "Convenience Store"], + "parks": ["Park", "Garden", "Nature Reserve"], + "public_transport": ["Station", "Stop", "Bus Station"], +} + + +def _count_pois_per_postcode( + postcodes_df: pl.DataFrame, pois: pl.DataFrame, radius_km: float = 2.0 +) -> pl.DataFrame: + """ + For each unique postcode, count POIs within radius_km by category group. + Uses spatial grid with vectorized distance calculations. + """ + print(f"Counting POIs within {radius_km}km per postcode...") + + n_postcodes = len(postcodes_df) + n_pois = len(pois) + print(f" {n_postcodes:,} postcodes, {n_pois:,} POIs") + + # Build spatial grid for POIs (0.05 degree cells ~5.5km) + grid_size = 0.05 + print(" Building POI spatial grid...") + + # Convert to numpy arrays + poi_lats = pois["lat"].to_numpy() + poi_lngs = pois["lng"].to_numpy() + poi_cats = pois["category"].to_numpy() + + # Compute grid coordinates for all POIs + poi_grid_lats = np.floor(poi_lats / grid_size).astype(np.int32) + poi_grid_lngs = np.floor(poi_lngs / grid_size).astype(np.int32) + + # Build grid cell lookup using numpy indexing + poi_grid = {} + for i in range(n_pois): + key = (poi_grid_lats[i], poi_grid_lngs[i]) + if key not in poi_grid: + poi_grid[key] = [] + poi_grid[key].append(i) + + # Convert grid values to numpy arrays for faster indexing + for key in poi_grid: + poi_grid[key] = np.array(poi_grid[key], dtype=np.int32) + + print(f" POI grid has {len(poi_grid):,} occupied cells") + + # Pre-compute category masks + category_masks = {} + for group, categories in POI_GROUPS.items(): + mask = np.isin(poi_cats, categories) + category_masks[group] = mask + print(f" {group}: {mask.sum():,} POIs") + + # Extract postcode coordinates as numpy arrays + pc_lats = postcodes_df["lat"].to_numpy() + pc_lons = postcodes_df["lon"].to_numpy() + pc_codes = postcodes_df["postcode"].to_list() + + # Initialize result arrays + result_counts = {group: np.zeros(n_postcodes, dtype=np.int32) for group in POI_GROUPS} + + # Process in batches with progress + batch_size = 50000 + n_batches = (n_postcodes + batch_size - 1) // batch_size + + print(f" Processing {n_postcodes:,} postcodes in {n_batches} batches...") + + for batch_idx in range(n_batches): + start_idx = batch_idx * batch_size + end_idx = min(start_idx + batch_size, n_postcodes) + + if batch_idx % 5 == 0: + print(f" Batch {batch_idx + 1}/{n_batches}: postcodes {start_idx:,} - {end_idx:,}") + + # Process batch + for i in range(start_idx, end_idx): + pc_lat = pc_lats[i] + pc_lon = pc_lons[i] + + # Find grid cells to check (3x3 grid) + grid_lat = int(np.floor(pc_lat / grid_size)) + grid_lng = int(np.floor(pc_lon / grid_size)) + + # Collect nearby POI indices + nearby_indices = [] + for dlat in [-1, 0, 1]: + for dlng in [-1, 0, 1]: + cell_key = (grid_lat + dlat, grid_lng + dlng) + if cell_key in poi_grid: + nearby_indices.append(poi_grid[cell_key]) + + if not nearby_indices: + continue + + # Concatenate all nearby POI indices + nearby = np.concatenate(nearby_indices) + + # Vectorized distance calculation for all nearby POIs + distances = haversine_km( + poi_lats[nearby], + poi_lngs[nearby], + pc_lat, + pc_lon + ) + + # Filter by radius + within_mask = distances <= radius_km + within_indices = nearby[within_mask] + + if len(within_indices) == 0: + continue + + # Count by category group using pre-computed masks + for group, cat_mask in category_masks.items(): + result_counts[group][i] = cat_mask[within_indices].sum() + + # Build result dataframe + result_data = {"postcode": pc_codes} + for group in POI_GROUPS: + result_data[f"{group}_{int(radius_km)}km"] = result_counts[group] + + result = pl.DataFrame(result_data) + print(" Completed POI counting") + return result + + +def count_pois_within_radius( + properties: pl.DataFrame, pois: pl.DataFrame, radius_km: float = 2.0 +) -> dict[str, pl.Series]: + """ + Count POIs within radius for properties, optimized by deduplicating postcodes. + + Returns dict of {column_name: count_series} aligned to properties dataframe. + """ + # Get unique postcodes with coordinates + print("Deduplicating postcodes...") + unique_postcodes = ( + properties + .select(["postcode", "lat", "lon"]) + .unique(subset=["postcode"]) + ) + + print(f" {len(properties):,} properties → {len(unique_postcodes):,} unique postcodes") + + # Count POIs per postcode + postcode_counts = _count_pois_per_postcode(unique_postcodes, pois, radius_km) + + # Write to temp file to avoid memory duplication during join + print(" Writing postcode counts to temp file...") + with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp: + tmp_path = tmp.name + postcode_counts.write_parquet(tmp_path) + + del postcode_counts # Free memory + + # Join using lazy evaluation + print(" Joining counts back to properties (lazy)...") + count_cols = [f"{group}_{int(radius_km)}km" for group in POI_GROUPS] + + # Convert properties to lazy frame, join, then collect + result_lazy = ( + properties.lazy() + .select("postcode") + .join( + pl.scan_parquet(tmp_path), + on="postcode", + how="left" + ) + .select(count_cols) + .fill_null(0) + ) + + result_df = result_lazy.collect() + + # Clean up temp file + os.unlink(tmp_path) + + # Extract as dict of Series + return {col: result_df[col] for col in count_cols} diff --git a/pipeline/test_fuzzy_join.py b/pipeline/utils/test_fuzzy_join.py similarity index 91% rename from pipeline/test_fuzzy_join.py rename to pipeline/utils/test_fuzzy_join.py index 7a73a73..e9a1cb5 100644 --- a/pipeline/test_fuzzy_join.py +++ b/pipeline/utils/test_fuzzy_join.py @@ -1,6 +1,6 @@ import polars as pl -from fuzzy_join import fuzzy_join_on_postcode +from pipeline.utils import fuzzy_join_on_postcode POSTCODE = "E14 2DG" @@ -41,5 +41,6 @@ result = fuzzy_join_on_postcode( snapshot = result.select("pp_address", "ADDRESS").sort("pp_address") +print('Testing the matching between EPC and PP addresses') with pl.Config(tbl_rows=-1, tbl_cols=-1, fmt_str_lengths=80): print(snapshot) diff --git a/pipeline/utils/test_haversine.py b/pipeline/utils/test_haversine.py new file mode 100644 index 0000000..639eba7 --- /dev/null +++ b/pipeline/utils/test_haversine.py @@ -0,0 +1,135 @@ +import numpy as np +import polars as pl +import pytest + +from pipeline.utils.haversine import haversine_km, haversine_km_expr + + +class TestHaversineKm: + """Test numpy-based haversine distance calculation.""" + + def test_same_point(self): + """Distance from a point to itself should be zero.""" + lat = np.array([51.5074]) + lon = np.array([-0.1278]) + dist = haversine_km(lat, lon, 51.5074, -0.1278) + assert np.allclose(dist, 0.0, atol=1e-10) + + def test_known_distance_london_to_paris(self): + """Test distance from London to Paris (~344 km).""" + # London coordinates + london_lat = np.array([51.5074]) + london_lon = np.array([-0.1278]) + # Paris coordinates + paris_lat = 48.8566 + paris_lon = 2.3522 + + dist = haversine_km(london_lat, london_lon, paris_lat, paris_lon) + # Expected distance is approximately 344 km + assert np.allclose(dist[0], 344, rtol=0.01) + + def test_known_distance_new_york_to_london(self): + """Test distance from New York to London (~5570 km).""" + ny_lat = np.array([40.7128]) + ny_lon = np.array([-74.0060]) + london_lat = 51.5074 + london_lon = -0.1278 + + dist = haversine_km(ny_lat, ny_lon, london_lat, london_lon) + # Expected distance is approximately 5570 km + assert np.allclose(dist[0], 5570, rtol=0.01) + + def test_multiple_points(self): + """Test calculating distances from multiple points to a single destination.""" + lats = np.array([51.5074, 48.8566, 40.7128]) # London, Paris, NYC + lons = np.array([-0.1278, 2.3522, -74.0060]) + # Distance to Edinburgh + edinburgh_lat = 55.9533 + edinburgh_lon = -3.1883 + + dists = haversine_km(lats, lons, edinburgh_lat, edinburgh_lon) + + # All distances should be positive + assert np.all(dists > 0) + # London to Edinburgh should be shortest (~530 km) + assert dists[0] < dists[1] < dists[2] + assert np.allclose(dists[0], 530, rtol=0.02) + + def test_equator_points(self): + """Test distance along the equator.""" + # Two points on the equator, 1 degree apart + lat = np.array([0.0]) + lon1 = np.array([0.0]) + lon2 = 1.0 + + dist = haversine_km(lat, lon1, 0.0, lon2) + # 1 degree at equator ≈ 111 km + assert np.allclose(dist[0], 111.2, rtol=0.01) + + +class TestHaversineKmExpr: + """Test Polars expression-based haversine distance calculation.""" + + def test_same_point(self): + """Distance from a point to itself should be zero.""" + df = pl.DataFrame({"lat": [51.5074], "lon": [-0.1278]}) + result = df.select(haversine_km_expr("lat", "lon", 51.5074, -0.1278).alias("dist")) + assert result["dist"][0] == pytest.approx(0.0, abs=1e-10) + + def test_known_distance_london_to_paris(self): + """Test distance from London to Paris (~344 km).""" + df = pl.DataFrame({"lat": [51.5074], "lon": [-0.1278]}) + result = df.select(haversine_km_expr("lat", "lon", 48.8566, 2.3522).alias("dist")) + assert result["dist"][0] == pytest.approx(344, rel=0.01) + + def test_known_distance_new_york_to_london(self): + """Test distance from New York to London (~5570 km).""" + df = pl.DataFrame({"lat": [40.7128], "lon": [-74.0060]}) + result = df.select(haversine_km_expr("lat", "lon", 51.5074, -0.1278).alias("dist")) + assert result["dist"][0] == pytest.approx(5570, rel=0.01) + + def test_multiple_points(self): + """Test calculating distances from multiple points to a single destination.""" + df = pl.DataFrame({ + "lat": [51.5074, 48.8566, 40.7128], # London, Paris, NYC + "lon": [-0.1278, 2.3522, -74.0060], + }) + # Distance to Edinburgh + result = df.select(haversine_km_expr("lat", "lon", 55.9533, -3.1883).alias("dist")) + + dists = result["dist"].to_numpy() + # All distances should be positive + assert np.all(dists > 0) + # London to Edinburgh should be shortest (~530 km) + assert dists[0] < dists[1] < dists[2] + assert dists[0] == pytest.approx(530, rel=0.02) + + def test_equator_points(self): + """Test distance along the equator.""" + df = pl.DataFrame({"lat": [0.0], "lon": [0.0]}) + result = df.select(haversine_km_expr("lat", "lon", 0.0, 1.0).alias("dist")) + # 1 degree at equator ≈ 111 km + assert result["dist"][0] == pytest.approx(111.2, rel=0.01) + + +class TestHaversineConsistency: + """Test that both implementations give consistent results.""" + + def test_numpy_and_polars_match(self): + """Both implementations should give identical results.""" + # Test data + lats = np.array([51.5074, 48.8566, 40.7128, 55.9533, 52.5200]) + lons = np.array([-0.1278, 2.3522, -74.0060, -3.1883, 13.4050]) + dest_lat = 41.9028 # Rome + dest_lon = 12.4964 + + # Numpy version + numpy_dists = haversine_km(lats, lons, dest_lat, dest_lon) + + # Polars version + df = pl.DataFrame({"lat": lats, "lon": lons}) + polars_result = df.select(haversine_km_expr("lat", "lon", dest_lat, dest_lon).alias("dist")) + polars_dists = polars_result["dist"].to_numpy() + + # Should be identical (or at least very close due to floating point) + assert np.allclose(numpy_dists, polars_dists, rtol=1e-10) diff --git a/pipeline/utils/test_poi_counts.py b/pipeline/utils/test_poi_counts.py new file mode 100644 index 0000000..a7366f5 --- /dev/null +++ b/pipeline/utils/test_poi_counts.py @@ -0,0 +1,85 @@ +import polars as pl +import pytest + +from pipeline.utils.poi_counts import POI_GROUPS, count_pois_within_radius + + +@pytest.fixture +def pois(): + """POIs clustered around two locations: central London and 10km away.""" + return pl.DataFrame({ + "lat": [51.5074, 51.5075, 51.5080, 51.5076, 51.5073, 51.60], + "lng": [-0.1278, -0.1280, -0.1275, -0.1279, -0.1277, -0.20], + "category": [ + "Restaurant", + "Fast Food", + "Supermarket", + "Park", + "Station", + "Restaurant", # too far from any property + ], + }) + + +@pytest.fixture +def properties(): + """Two properties at the same postcode near central London, one at a distant postcode.""" + return pl.DataFrame({ + "postcode": ["EC1A 1BB", "EC1A 1BB", "ZZ99 9ZZ"], + "lat": [51.5074, 51.5074, 55.0], + "lon": [-0.1278, -0.1278, -3.0], + }) + + +def test_counts_pois_within_radius(properties, pois): + result = count_pois_within_radius(properties, pois, radius_km=2.0) + + assert set(result.keys()) == {f"{g}_2km" for g in POI_GROUPS} + + # Result Series must be aligned to properties (3 rows) + for col, series in result.items(): + assert len(series) == 3, f"{col} has {len(series)} rows, expected 3" + + # First two rows share a postcode near the central London cluster + assert result["restaurants_2km"][0] == 2 # Restaurant + Fast Food + assert result["groceries_2km"][0] == 1 # Supermarket + assert result["parks_2km"][0] == 1 # Park + assert result["public_transport_2km"][0] == 1 # Station + + # Second row is the same postcode, so same counts + assert result["restaurants_2km"][1] == result["restaurants_2km"][0] + + # Third row (ZZ99 9ZZ) is far from all POIs → zero counts + for group in POI_GROUPS: + assert result[f"{group}_2km"][2] == 0 + + +def test_no_pois_returns_zeros(properties): + empty_pois = pl.DataFrame({ + "lat": pl.Series([], dtype=pl.Float64), + "lng": pl.Series([], dtype=pl.Float64), + "category": pl.Series([], dtype=pl.String), + }) + result = count_pois_within_radius(properties, empty_pois, radius_km=2.0) + + for group in POI_GROUPS: + col = f"{group}_2km" + assert col in result + assert result[col].to_list() == [0, 0, 0] + + +def test_custom_radius(pois): + """A tiny radius should exclude POIs that are even slightly away.""" + properties = pl.DataFrame({ + "postcode": ["EC1A 1BB"], + "lat": [51.5074], + "lon": [-0.1278], + }) + + # 0.01 km = 10m — only the POI at the exact same location should match + result = count_pois_within_radius(properties, pois, radius_km=0.01) + # The Restaurant at (51.5074, -0.1278) is at distance 0 + assert result["restaurants_0km"][0] >= 1 + # POIs >100m away should not be counted + total = sum(result[f"{g}_0km"][0] for g in POI_GROUPS) + assert total <= 2 # at most the co-located POIs