"""Add online buy/rent listings to wide.parquet as new rows. Matches online listings to existing historical rows by postcode + fuzzy address, carrying over historical prices and area-level data for matched properties. Unmatched listings get area data from any same-postcode row in wide. Modifies wide.parquet in-place, adding: - A `Listing status` column to all rows ("Historical sale" / "For sale" / "For rent") - New columns: Asking price, Asking rent (monthly), Bedrooms, Bathrooms, Listing date, Property sub-type, Listing URL, Price qualifier """ import argparse import re from concurrent.futures import ProcessPoolExecutor from os import cpu_count from pathlib import Path import polars as pl from thefuzz import fuzz from tqdm import tqdm from pipeline.utils.fuzzy_join import _numbers_compatible _NORMALIZE_RE = re.compile(r"[,.\-]") _WHITESPACE_RE = re.compile(r"\s+") # Columns that are property-specific (carried from matched historical row only) _PROPERTY_COLUMNS = [ "Last known price", "Date of last transaction", "historical_prices", "renovation_history", "Construction age", "Is construction date approximate", "Current energy rating", "Potential energy rating", "Address per EPC", "Interior height (m)", "Number of bedrooms & living rooms", "Price per sqm", "Estimated current price", "Est. price per sqm", ] # Columns that are area-level (carried from matched row, or any same-postcode row) _AREA_COLUMNS = [ "Public transport to Bank (mins)", "Cycling to Bank (mins)", "Public transport to Fitzrovia (mins)", "Cycling to Fitzrovia (mins)", "Income Score (rate)", "Employment Score (rate)", "Education, Skills and Training Score", "Health Deprivation and Disability Score", "Living Environment Score", "Indoors Sub-domain Score", "Outdoors Sub-domain Score", "% Asian", "% Black", "% Mixed", "% White", "% Other", "Estimated monthly rent", "Criminal damage and arson (avg/yr)", "Violence and sexual offences (avg/yr)", "Drugs (avg/yr)", "Anti-social behaviour (avg/yr)", "Public order (avg/yr)", "Other crime (avg/yr)", "Burglary (avg/yr)", "Vehicle crime (avg/yr)", "Theft from the person (avg/yr)", "Possession of weapons (avg/yr)", "Other theft (avg/yr)", "Shoplifting (avg/yr)", "Bicycle theft (avg/yr)", "Robbery (avg/yr)", "Serious crime (avg/yr)", "Minor crime (avg/yr)", "Number of restaurants within 2km", "Number of grocery shops and supermarkets within 2km", "Number of parks within 2km", "Number of public transport stations within 2km", "Noise (dB)", "Good+ primary schools within 5km", "Good+ secondary schools within 5km", "Max available download speed (Mbps)", "Collapsible deposits risk", "Compressible ground risk", "Landslide risk", "Running sand risk", "Shrink-swell risk", "Soluble rocks risk", "Environmental risk", ] def _normalize(s: str) -> str: return _WHITESPACE_RE.sub(" ", _NORMALIZE_RE.sub(" ", s.upper())).strip() def _score_bucket( args: tuple[list[tuple[int, str]], list[tuple[int, str]]], ) -> list[tuple[int, int, int]]: """Score all address pairs within a single postcode bucket.""" wide_entries, online_entries = args pairs = [] for wide_idx, wide_address in wide_entries: for online_idx, online_address in online_entries: if not _numbers_compatible(wide_address, online_address): continue score = fuzz.token_sort_ratio(wide_address, online_address) pairs.append((score, online_idx, wide_idx)) return pairs def _load_online(buy_path: Path, rent_path: Path) -> pl.DataFrame: """Load buy + rent parquets, tag with channel, normalize rent to monthly.""" buy = pl.scan_parquet(buy_path).with_columns( pl.lit("For sale").alias("_channel"), ) rent = pl.scan_parquet(rent_path).with_columns( pl.lit("For rent").alias("_channel"), ) online = pl.concat([buy, rent]).collect() # Normalize rent prices to monthly freq = online["price_frequency"] price = online["price"].cast(pl.Float64) monthly_price = ( pl.when(freq == "weekly") .then(price * 52.0 / 12.0) .when(freq == "yearly") .then(price / 12.0) .when(freq == "daily") .then(price * 365.25 / 12.0) .when(freq == "quarterly") .then(price / 3.0) .otherwise(price) # monthly, not specified .round(0) .cast(pl.Int64) ) online = online.with_columns( pl.when(pl.col("_channel") == "For sale") .then(pl.col("price")) .otherwise(None) .alias("Asking price"), pl.when(pl.col("_channel") == "For rent") .then(monthly_price) .otherwise(None) .alias("Asking rent (monthly)"), ) return online def _match_online_to_wide( wide: pl.DataFrame, online: pl.DataFrame, ) -> dict[int, int]: """Match online listings to wide rows by postcode + fuzzy address. Returns dict mapping online row index → wide row index. """ # Build postcode → [(row_idx, normalized_address)] for wide wide_postcodes = wide["Postcode"] wide_addresses = wide["Address per Property Register"] wide_by_postcode: dict[str, list[tuple[int, str]]] = {} for i in range(wide.height): pc = wide_postcodes[i] addr = wide_addresses[i] if pc is not None and addr is not None: pc_upper = pc.strip().upper() wide_by_postcode.setdefault(pc_upper, []).append((i, _normalize(addr))) # Build postcode → [(row_idx, normalized_address)] for online online_postcodes = online["postcode"] online_addresses = online["address"] online_by_postcode: dict[str, list[tuple[int, str]]] = {} for i in range(online.height): pc = online_postcodes[i] addr = online_addresses[i] if pc is not None and addr is not None: pc_upper = pc.strip().upper() online_by_postcode.setdefault(pc_upper, []).append((i, _normalize(addr))) # Build tasks: only postcodes present in both tasks = [ (wide_by_postcode[pc], online_entries) for pc, online_entries in online_by_postcode.items() if pc in wide_by_postcode ] # Score in parallel all_pairs: list[tuple[int, int, int]] = [] with ProcessPoolExecutor(max_workers=cpu_count()) as executor: for pairs in tqdm( executor.map(_score_bucket, tasks, chunksize=64), total=len(tasks), desc="Matching online listings", ): all_pairs.extend(pairs) del tasks, wide_by_postcode, online_by_postcode # Greedy assignment: best score first, one-to-one all_pairs.sort(key=lambda t: (t[0], -t[1]), reverse=True) matches: dict[int, int] = {} # online_idx → wide_idx matched_wide: set[int] = set() for _score, online_idx, wide_idx in all_pairs: if online_idx in matches or wide_idx in matched_wide: continue matches[online_idx] = wide_idx matched_wide.add(wide_idx) return matches def _build_postcode_area_lookup(wide: pl.DataFrame) -> dict[str, int]: """Build postcode → first row index for area data fallback.""" postcodes = wide["Postcode"] lookup: dict[str, int] = {} for i in range(wide.height): pc = postcodes[i] if pc is not None: pc_upper = pc.strip().upper() if pc_upper not in lookup: lookup[pc_upper] = i return lookup def _build_online_rows( wide: pl.DataFrame, online: pl.DataFrame, matches: dict[int, int], postcode_lookup: dict[str, int], ) -> pl.DataFrame: """Build a DataFrame of online listing rows with all wide.parquet columns.""" wide_schema = wide.schema n = online.height # Initialize all columns as null lists columns: dict[str, list] = {col: [None] * n for col in wide_schema} # Add new columns columns["Listing status"] = [None] * n columns["Asking price"] = [None] * n columns["Asking rent (monthly)"] = [None] * n columns["Bedrooms"] = [None] * n columns["Bathrooms"] = [None] * n columns["Listing date"] = [None] * n columns["Property sub-type"] = [None] * n columns["Listing URL"] = [None] * n columns["Price qualifier"] = [None] * n for i in range(n): # Direct mappings from online listing columns["Address per Property Register"][i] = online["address"][i] columns["Postcode"][i] = online["postcode"][i] columns["lat"][i] = online["latitude"][i] columns["lon"][i] = online["longitude"][i] columns["Property type"][i] = online["property_type"][i] columns["Leashold/Freehold"][i] = online["tenure"][i] columns["Total floor area (sqm)"][i] = online["floorspace_sqm"][i] # New columns columns["Listing status"][i] = online["_channel"][i] columns["Asking price"][i] = online["Asking price"][i] columns["Asking rent (monthly)"][i] = online["Asking rent (monthly)"][i] columns["Bedrooms"][i] = online["bedrooms"][i] columns["Bathrooms"][i] = online["bathrooms"][i] columns["Property sub-type"][i] = online["property_sub_type"][i] columns["Listing URL"][i] = online["url"][i] columns["Price qualifier"][i] = online["price_qualifier"][i] # Parse listing date fvd = online["first_visible_date"][i] if fvd is not None: try: from datetime import datetime dt = datetime.fromisoformat(fvd.replace("Z", "+00:00")) columns["Listing date"][i] = dt.replace(tzinfo=None) except (ValueError, TypeError): pass # Determine source row for carried data matched_wide_idx = matches.get(i) postcode = online["postcode"][i] pc_upper = postcode.strip().upper() if postcode else None area_source_idx = matched_wide_idx if area_source_idx is None and pc_upper is not None: area_source_idx = postcode_lookup.get(pc_upper) # Copy property-specific columns from matched row only if matched_wide_idx is not None: for col in _PROPERTY_COLUMNS: if col in wide_schema: columns[col][i] = wide[col][matched_wide_idx] # Copy area columns from matched row or same-postcode fallback if area_source_idx is not None: for col in _AREA_COLUMNS: if col in wide_schema: columns[col][i] = wide[col][area_source_idx] # Build DataFrame with correct types series_list = [] for col_name, dtype in wide_schema.items(): series_list.append(pl.Series(col_name, columns[col_name], dtype=dtype)) # New columns with their types series_list.append( pl.Series("Listing status", columns["Listing status"], dtype=pl.String) ) series_list.append( pl.Series("Asking price", columns["Asking price"], dtype=pl.Int64) ) series_list.append( pl.Series( "Asking rent (monthly)", columns["Asking rent (monthly)"], dtype=pl.Int64 ) ) series_list.append(pl.Series("Bedrooms", columns["Bedrooms"], dtype=pl.Int32)) series_list.append(pl.Series("Bathrooms", columns["Bathrooms"], dtype=pl.Int32)) series_list.append( pl.Series("Listing date", columns["Listing date"], dtype=pl.Datetime("us")) ) series_list.append( pl.Series("Property sub-type", columns["Property sub-type"], dtype=pl.String) ) series_list.append( pl.Series("Listing URL", columns["Listing URL"], dtype=pl.String) ) series_list.append( pl.Series("Price qualifier", columns["Price qualifier"], dtype=pl.String) ) return pl.DataFrame(series_list) def main(): parser = argparse.ArgumentParser( description="Add online buy/rent listings to wide.parquet" ) parser.add_argument( "--input", type=Path, required=True, help="wide.parquet path (modified in-place)", ) parser.add_argument( "--buy", type=Path, required=True, help="rightmove_buy.parquet path" ) parser.add_argument( "--rent", type=Path, required=True, help="rightmove_rent.parquet path" ) args = parser.parse_args() print("Loading wide.parquet...") wide = pl.read_parquet(args.input) print(f" {wide.height} rows, {wide.width} columns") print("Loading online listings...") online = _load_online(args.buy, args.rent) print( f" {online.height} online listings ({online.filter(pl.col('_channel') == 'For sale').height} buy, {online.filter(pl.col('_channel') == 'For rent').height} rent)" ) print("Matching online listings to historical rows...") matches = _match_online_to_wide(wide, online) print(f" {len(matches)} online listings matched to historical rows") print("Building postcode area lookup...") postcode_lookup = _build_postcode_area_lookup(wide) print("Building online listing rows...") online_rows = _build_online_rows(wide, online, matches, postcode_lookup) print(f" {online_rows.height} online rows built") # Add Listing status + new columns to existing wide rows wide = wide.with_columns( pl.lit("Historical sale").alias("Listing status"), pl.lit(None, dtype=pl.Int64).alias("Asking price"), pl.lit(None, dtype=pl.Int64).alias("Asking rent (monthly)"), pl.lit(None, dtype=pl.Int32).alias("Bedrooms"), pl.lit(None, dtype=pl.Int32).alias("Bathrooms"), pl.lit(None, dtype=pl.Datetime("us")).alias("Listing date"), pl.lit(None, dtype=pl.String).alias("Property sub-type"), pl.lit(None, dtype=pl.String).alias("Listing URL"), pl.lit(None, dtype=pl.String).alias("Price qualifier"), ) # Concat result = pl.concat([wide, online_rows], how="diagonal_relaxed") print(f"Final: {result.height} rows, {result.width} columns") # Verify status_counts = ( result["Listing status"].value_counts().sort("count", descending=True) ) print(f"Listing status distribution:\n{status_counts}") result.write_parquet(args.input) size_mb = args.input.stat().st_size / (1024 * 1024) print(f"Wrote {args.input} ({size_mb:.1f} MB)") if __name__ == "__main__": main()