"""Manual sale-listings scrape across Rightmove, OnTheMarket and Zoopla.

Outcodes and scraped properties are restricted to the "Greater London-ish"
postcode areas listed in ``LONDON_OUTCODE_PREFIXES``; the merged result is
written to a single parquet file by :func:`run_scrape`.
"""

import logging
import os
import re
import signal
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Iterable

import polars as pl

from constants import (
    ARCGIS_PATH,
    CHANNELS,
    DATA_DIR,
    DELAY_BETWEEN_OUTCODES,
    LONDON_OUTCODE_PREFIXES,
)
from http_client import make_client
from onthemarket import search_outcode as onthemarket_search_outcode
from rightmove import resolve_outcode_id
from rightmove import search_outcode as rightmove_search_outcode
from spatial import PostcodeSpatialIndex
from storage import write_parquet
from zoopla import TurnstileError
from zoopla import launch_browser as launch_zoopla_browser
from zoopla import search_outcode as zoopla_search_outcode

log = logging.getLogger("rightmove")

# Scrape and merge order; when the same listing appears in several sources the
# earliest source in this tuple is the one kept by _merge_properties.
SOURCE_ORDER = ("rightmove", "onthemarket", "zoopla")
SALE_CHANNEL = CHANNELS[0]
# Upper-cased postcode-area prefixes accepted by the London-ish filter.
LONDON_AREAS = sorted({prefix.upper() for prefix in LONDON_OUTCODE_PREFIXES})
# Captures the outcode (area letters, district digit, optional suffix) at the
# start of an upper-case postcode.
OUTCODE_RE = re.compile(r"^([A-Z]{1,2}\d[A-Z0-9]?)")
# Value of the ARCGIS country column used to keep only England rows.
ENGLAND_COUNTRY_CODE = "E92000001"


def _arcgis_columns() -> tuple[str, str]:
    """Return postcode and country column names for supported ARCGIS schemas.

    Raises:
        ValueError: if the parquet file has neither ``pcd``/``pcds`` nor
            ``ctry``/``ctry25cd``.
    """
    columns = set(pl.scan_parquet(ARCGIS_PATH).collect_schema().names())
    if "pcd" in columns:
        postcode_col = "pcd"
    elif "pcds" in columns:
        postcode_col = "pcds"
    else:
        raise ValueError(f"{ARCGIS_PATH} has no supported postcode column")
    if "ctry" in columns:
        country_col = "ctry"
    elif "ctry25cd" in columns:
        country_col = "ctry25cd"
    else:
        raise ValueError(f"{ARCGIS_PATH} has no supported country column")
    return postcode_col, country_col


def _normalize_postcode(postcode: str) -> str:
    """Upper-case *postcode*, drop all whitespace and re-insert one space
    before the final three characters.

    Inputs shorter than five characters (after compaction) are returned
    compacted, without a space.
    """
    compact = "".join(str(postcode).upper().split())
    if len(compact) < 5:
        return compact
    return compact[:-3] + " " + compact[-3:]


def _londonish_postcode_expr(postcode_col: str) -> pl.Expr:
    """Build a polars predicate: the leading 1-2 letters of *postcode_col*
    (upper-cased) are one of ``LONDON_AREAS``."""
    return (
        pl.col(postcode_col)
        .str.to_uppercase()
        .str.extract(r"^([A-Z]{1,2})", 1)
        .is_in(LONDON_AREAS)
    )


def _read_londonish_england(
    extra_columns: Iterable[str] = (),
) -> tuple[pl.DataFrame, str]:
    """Read ARCGIS rows that are in England and in a London-ish postcode area.

    Args:
        extra_columns: columns to load in addition to the postcode and
            country columns.

    Returns:
        ``(frame, postcode_col)`` where *postcode_col* is the name of the
        postcode column present in this ARCGIS schema.
    """
    postcode_col, country_col = _arcgis_columns()
    df = pl.read_parquet(
        ARCGIS_PATH, columns=[postcode_col, country_col, *extra_columns]
    )
    england = df.filter(
        (pl.col(country_col) == ENGLAND_COUNTRY_CODE)
        & _londonish_postcode_expr(postcode_col)
    )
    return england, postcode_col


def _outcode_area(outcode: str) -> str:
    """Return the leading run of alphabetic characters of *outcode*, upper-cased."""
    chars = []
    for ch in outcode.upper():
        if not ch.isalpha():
            break
        chars.append(ch)
    return "".join(chars)


def is_londonish_outcode(outcode: str) -> bool:
    """Return True when *outcode* itself, or its leading letters, is listed in
    ``LONDON_AREAS`` (case-insensitive)."""
    normalized = outcode.upper()
    return normalized in LONDON_AREAS or _outcode_area(normalized) in LONDON_AREAS


def _property_is_londonish(prop: dict) -> bool:
    """Return True when the property's ``Postcode`` starts with a London-ish
    outcode; a missing or unparseable postcode yields False."""
    postcode = str(prop.get("Postcode") or "").upper().strip()
    match = OUTCODE_RE.match(postcode)
    return bool(match and is_londonish_outcode(match.group(1)))


def filter_londonish_outcodes(outcodes: Iterable[str]) -> list[str]:
    """Return the sorted, de-duplicated, upper-cased London-ish subset of *outcodes*."""
    return sorted(
        {outcode.upper() for outcode in outcodes if is_londonish_outcode(outcode)}
    )


def load_outcodes() -> list[str]:
    """Load England outcodes from ARCGIS and keep only Greater London-ish areas."""
    log.info("Loading outcodes from %s", ARCGIS_PATH)
    england, postcode_col = _read_londonish_england()
    outcodes = (
        england.select(
            # Same pattern as OUTCODE_RE so both filters agree on what an
            # outcode is.
            pl.col(postcode_col).str.extract(OUTCODE_RE.pattern, 1).alias("outcode")
        )
        .drop_nulls()
        .get_column("outcode")
        .unique()
        .sort()
        .to_list()
    )
    londonish = filter_londonish_outcodes(outcodes)
    log.info("Greater London-ish outcodes: %d", len(londonish))
    return londonish


def build_postcode_coords() -> dict[str, tuple[float, float]]:
    """Build postcode -> (lat, lng) lookup from ARCGIS England postcodes.

    Rows with a null ``lat`` or ``long`` are skipped; keys are normalised with
    :func:`_normalize_postcode`.
    """
    log.info("Building postcode coords lookup from %s", ARCGIS_PATH)
    england, postcode_col = _read_londonish_england(("lat", "long"))
    england = england.drop_nulls(subset=["lat", "long"])
    coords: dict[str, tuple[float, float]] = {}
    for pcd, lat, lng in zip(
        england.get_column(postcode_col).to_list(),
        england.get_column("lat").to_list(),
        england.get_column("long").to_list(),
    ):
        coords[_normalize_postcode(pcd)] = (lat, lng)
    log.info("Postcode coords lookup: %d postcodes", len(coords))
    return coords


def build_postcode_index() -> PostcodeSpatialIndex:
    """Build spatial index from ARCGIS England postcodes.

    Rows with a null ``lat`` or ``long`` are skipped; postcodes are normalised
    with :func:`_normalize_postcode`.
    """
    log.info("Building postcode spatial index from %s", ARCGIS_PATH)
    england, postcode_col = _read_londonish_england(("lat", "long"))
    england = england.drop_nulls(subset=["lat", "long"])
    return PostcodeSpatialIndex(
        england.get_column("lat").to_list(),
        england.get_column("long").to_list(),
        [
            _normalize_postcode(pcd)
            for pcd in england.get_column(postcode_col).to_list()
        ],
    )


def _source_names(sources: str | Iterable[str] | None) -> list[str]:
    """Resolve a user-supplied source selection to names in ``SOURCE_ORDER``.

    Args:
        sources: ``None`` (all sources), a comma-separated string, or an
            iterable of names. Names are stripped and lower-cased; empty
            entries are ignored; ``"all"`` selects every source.

    Returns:
        The selected sources, always in ``SOURCE_ORDER`` order. May be empty
        when the selection contained only blank entries.

    Raises:
        ValueError: if any name is neither a known source nor ``"all"``.
    """
    if sources is None:
        return list(SOURCE_ORDER)
    if isinstance(sources, str):
        requested = [part.strip().lower() for part in sources.split(",")]
    else:
        requested = [str(source).strip().lower() for source in sources]
    requested = [source for source in requested if source]
    unknown = sorted(set(requested) - set(SOURCE_ORDER) - {"all"})
    if unknown:
        raise ValueError(f"Unknown source(s): {', '.join(unknown)}")
    if "all" in requested:
        return list(SOURCE_ORDER)
    return [source for source in SOURCE_ORDER if source in requested]


def _dedup_key(prop: dict) -> tuple | None:
    """Return ``(postcode, bedrooms, price)`` for cross-source matching, or
    ``None`` when the postcode is missing or the price is not positive."""
    postcode = str(prop.get("Postcode") or "").strip().upper()
    price = int(prop.get("price") or 0)
    if not postcode or price <= 0:
        return None
    return (postcode, int(prop.get("Bedrooms") or 0), price)


def _merge_properties(
    source_results: dict[str, list[dict]],
) -> tuple[list[dict], dict, int]:
    """Merge per-source listings into one list, dropping duplicates.

    Sources are processed in ``SOURCE_ORDER``. A listing is dropped when:

    * it has an ``id`` that was already kept, or
    * it has an ``id`` and its dedup key was first kept from a *different*
      source, or
    * it has no ``id`` and its dedup key was already kept from any source.

    Listings without a dedup key (see :func:`_dedup_key`) are never matched by
    key.

    Returns:
        ``(merged, counts, deduped)``: the kept listings, the number kept per
        source, and the number dropped as duplicates.
    """
    merged: dict[str, dict] = {}
    seen_keys: dict[tuple, str] = {}  # dedup key -> source that first kept it
    seen_ids: set[str] = set()
    counts = {source: 0 for source in SOURCE_ORDER}
    deduped = 0
    for source in SOURCE_ORDER:
        for prop in source_results.get(source, []):
            key = _dedup_key(prop)
            prop_id_raw = prop.get("id")
            prop_id = str(prop_id_raw).strip() if prop_id_raw is not None else None
            if prop_id:
                if prop_id in seen_ids:
                    deduped += 1
                    continue
                if key is not None:
                    previous_source = seen_keys.get(key)
                    # Same key within one source is kept: distinct ids there
                    # are treated as distinct listings.
                    if previous_source is not None and previous_source != source:
                        deduped += 1
                        continue
                seen_ids.add(prop_id)
                storage_key = prop_id
            else:
                if key is not None and key in seen_keys:
                    deduped += 1
                    continue
                # No id to key on: synthesise a unique slot.
                storage_key = f"{source}:{len(merged)}"
            if key is not None:
                seen_keys.setdefault(key, source)
            merged[storage_key] = prop
            counts[source] += 1
    return list(merged.values()), counts, deduped


def _source_total(
    results: dict[str, list[dict]],
    source: str,
) -> int:
    """Return how many properties have been stored for *source* so far."""
    return len(results[source])


def _source_remaining(
    results: dict[str, list[dict]],
    source: str,
    max_properties_per_source: int | None,
) -> int | None:
    """Return how many more properties *source* may store, or ``None`` when
    there is no cap. Never negative."""
    if max_properties_per_source is None:
        return None
    return max(max_properties_per_source - _source_total(results, source), 0)


def _store_properties(
    results: dict[str, list[dict]],
    source: str,
    props: list[dict],
    max_properties_per_source: int | None,
) -> int:
    """Append the London-ish subset of *props* to ``results[source]``,
    respecting the per-source cap.

    Returns:
        The number of properties actually appended.
    """
    remaining = _source_remaining(results, source, max_properties_per_source)
    if remaining == 0:
        return 0
    londonish = [prop for prop in props if _property_is_londonish(prop)]
    dropped_outside_area = len(props) - len(londonish)
    if dropped_outside_area:
        log.debug(
            "%s dropped %d properties outside the Greater London-ish postcode filter",
            source,
            dropped_outside_area,
        )
    selected = londonish if remaining is None else londonish[:remaining]
    results[source].extend(selected)
    return len(selected)


def _exception_detail(exc: BaseException) -> str:
    """Format *exc* as ``"TypeName: message"`` on a single line.

    Whitespace runs are collapsed, an empty message falls back to ``repr``,
    and the message is truncated to 300 characters plus ``...``.
    """
    detail = " ".join(str(exc).split())
    if not detail:
        detail = repr(exc)
    if len(detail) > 300:
        detail = f"{detail[:300]}..."
    return f"{type(exc).__name__}: {detail}"


def _record_error(
    errors: list[str], source: str, outcode: str, exc: BaseException
) -> None:
    """Append a ``"<source> <outcode>: <detail>"`` entry to *errors* and log
    it as a warning."""
    detail = _exception_detail(exc)
    message = f"{source} {outcode}: {detail}"
    errors.append(message)
    log.warning(message)


def _scrape_rightmove(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    results: dict[str, list[dict]],
    errors: list[str],
    max_properties_per_source: int | None,
) -> None:
    """Scrape Rightmove sale listings for each outcode into ``results``.

    Per-outcode failures are recorded in *errors* and do not stop the run.
    Stops early once the per-source cap is reached. The HTTP client is always
    closed.
    """
    client = make_client()
    try:
        for outcode in outcodes:
            remaining = _source_remaining(
                results, "rightmove", max_properties_per_source
            )
            if remaining == 0:
                log.info("Rightmove cap reached")
                return
            try:
                outcode_id = resolve_outcode_id(client, outcode)
            except Exception as exc:
                _record_error(errors, "rightmove", outcode, exc)
                time.sleep(DELAY_BETWEEN_OUTCODES)
                continue
            if not outcode_id:
                log.debug("No Rightmove outcode ID for %s", outcode)
                time.sleep(DELAY_BETWEEN_OUTCODES)
                continue
            try:
                props = rightmove_search_outcode(
                    client,
                    outcode_id,
                    outcode,
                    SALE_CHANNEL,
                    pc_index,
                    max_properties=remaining,
                )
                added = _store_properties(
                    results,
                    "rightmove",
                    props,
                    max_properties_per_source,
                )
                log.info("Rightmove %s: +%d", outcode, added)
            except Exception as exc:
                _record_error(errors, "rightmove", outcode, exc)
            time.sleep(DELAY_BETWEEN_OUTCODES)
    finally:
        client.close()


class OutcodeTimeout(BaseException):
    """Raised when a single outcode exceeds the wall-clock budget.

    Inherits BaseException (not Exception) so the SIGALRM-triggered raise
    can't be silently swallowed by any of the broad `except Exception:`
    handlers inside zoopla.py — the signal may fire at any bytecode boundary,
    including inside those handlers.
    """


def _zoopla_outcode_timeout_seconds() -> int:
    """Return the per-outcode Zoopla budget in seconds.

    Read from ``ZOOPLA_OUTCODE_TIMEOUT_SECONDS``; defaults to 300 when unset.

    Raises:
        ValueError: if the variable is set but is not an integer >= 1.
    """
    raw = os.environ.get("ZOOPLA_OUTCODE_TIMEOUT_SECONDS")
    if raw is None:
        return 300
    try:
        timeout = int(raw)
    except ValueError as exc:
        raise ValueError("ZOOPLA_OUTCODE_TIMEOUT_SECONDS must be an integer") from exc
    if timeout < 1:
        raise ValueError("ZOOPLA_OUTCODE_TIMEOUT_SECONDS must be greater than zero")
    return timeout


@contextmanager
def _wall_clock_timeout(seconds: int, label: str):
    """SIGALRM-based wall-clock guard (POSIX). Raises OutcodeTimeout on expiry.

    Interrupts a hung Playwright IPC by delivering SIGALRM to the main thread;
    socket waits return EINTR and the handler raises into the caller. The
    browser is presumed unhealthy afterwards — caller must relaunch it.

    A non-positive *seconds* disables the guard. The previous SIGALRM handler
    is restored and any pending alarm cancelled on exit.
    """
    if seconds <= 0:
        yield
        return

    def _handler(signum, frame):
        raise OutcodeTimeout(f"{label} exceeded {seconds}s budget")

    old_handler = signal.signal(signal.SIGALRM, _handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Cancel first so the alarm cannot fire while the handler is swapped.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)


def _launch_zoopla_with_retries(attempts: int = 3):
    """Launch the Zoopla browser, retrying on failure.

    Args:
        attempts: maximum number of launch attempts; must be at least 1.

    Returns:
        Whatever ``launch_zoopla_browser()`` returns.

    Raises:
        ValueError: if *attempts* is less than 1.
        TurnstileError: immediately, without retrying.
        Exception: the error from the final attempt when every attempt fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return launch_zoopla_browser()
        except TurnstileError:
            raise
        except Exception as exc:
            log.warning(
                "Zoopla browser launch failed (%d/%d): %s",
                attempt,
                attempts,
                exc,
            )
            if attempt == attempts:
                # No further attempt follows, so don't waste time backing off.
                raise
            time.sleep(5)


def _close_zoopla_browser(browser, label: str) -> None:
    """Close *browser* within 15 seconds, falling back to its ``force_close``
    hook (if it has one). Never raises ``Exception``; failures are logged."""
    try:
        with _wall_clock_timeout(15, f"{label} browser close"):
            browser.close()
        return
    except (OutcodeTimeout, Exception) as exc:
        log.warning(
            "%s browser close failed: %s; force-closing",
            label,
            _exception_detail(exc),
        )
    force_close = getattr(browser, "force_close", None)
    if not callable(force_close):
        log.warning("%s browser has no force-close hook", label)
        return
    try:
        force_close()
    except Exception as exc:
        log.warning("%s browser force-close failed: %s", label, _exception_detail(exc))


def _scrape_zoopla(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]],
    results: dict[str, list[dict]],
    errors: list[str],
    max_properties_per_source: int | None,
) -> None:
    """Scrape Zoopla sale listings for each outcode into ``results``.

    Each outcode gets up to two attempts under a wall-clock budget; after a
    failed first attempt the browser is closed and relaunched. A failed second
    attempt is recorded in *errors*; a ``TurnstileError`` on the second
    attempt, or a failed relaunch, ends the Zoopla run. The browser that is
    live when the function exits is always closed.

    Raises:
        ValueError: if ``ZOOPLA_OUTCODE_TIMEOUT_SECONDS`` is invalid.
    """
    try:
        browser, page = _launch_zoopla_with_retries()
    except Exception as exc:
        errors.append(f"zoopla: browser launch failed: {exc}")
        log.warning("Zoopla skipped: browser launch failed: %s", exc)
        return

    try:
        # Resolved inside the guarded region so that an invalid setting still
        # closes the browser that was just launched.
        outcode_timeout = _zoopla_outcode_timeout_seconds()
        for outcode in outcodes:
            if _source_remaining(results, "zoopla", max_properties_per_source) == 0:
                log.info("Zoopla cap reached")
                return
            for attempt in range(2):
                try:
                    with _wall_clock_timeout(outcode_timeout, f"zoopla {outcode}"):
                        props, _ = zoopla_search_outcode(
                            page,
                            outcode,
                            pc_index,
                            pc_coords,
                            max_properties=None,
                        )
                    added = _store_properties(
                        results,
                        "zoopla",
                        props,
                        max_properties_per_source,
                    )
                    log.info("Zoopla %s: +%d", outcode, added)
                    break
                except (OutcodeTimeout, Exception) as exc:
                    if attempt == 1:
                        _record_error(errors, "zoopla", outcode, exc)
                        if isinstance(exc, TurnstileError):
                            return
                        break
                    log.warning(
                        "Zoopla %s attempt %d/2 failed: %s; relaunching browser "
                        "and retrying",
                        outcode,
                        attempt + 1,
                        _exception_detail(exc),
                    )
                    _close_zoopla_browser(browser, f"zoopla {outcode}")
                    # Already closed: make sure the final cleanup does not try
                    # to close this same browser again if the relaunch fails.
                    browser = None
                    try:
                        browser, page = _launch_zoopla_with_retries()
                    except Exception as relaunch_exc:
                        _record_error(errors, "zoopla", outcode, relaunch_exc)
                        return
                    log.info("Zoopla %s retrying with fresh browser", outcode)
            time.sleep(DELAY_BETWEEN_OUTCODES)
    finally:
        if browser is not None:
            _close_zoopla_browser(browser, "zoopla final")


def _scrape_onthemarket(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    results: dict[str, list[dict]],
    errors: list[str],
    max_properties_per_source: int | None,
) -> None:
    """Scrape OnTheMarket sale listings for each outcode into ``results``.

    Per-outcode failures are recorded in *errors* and do not stop the run.
    Stops early once the per-source cap is reached. The HTTP client is always
    closed.
    """
    client = make_client()
    try:
        for outcode in outcodes:
            remaining = _source_remaining(
                results, "onthemarket", max_properties_per_source
            )
            if remaining == 0:
                log.info("OnTheMarket cap reached")
                return
            try:
                props = onthemarket_search_outcode(
                    client,
                    outcode,
                    pc_index,
                    max_properties=remaining,
                )
                added = _store_properties(
                    results,
                    "onthemarket",
                    props,
                    max_properties_per_source,
                )
                log.info("OnTheMarket %s: +%d", outcode, added)
            except Exception as exc:
                _record_error(errors, "onthemarket", outcode, exc)
            time.sleep(DELAY_BETWEEN_OUTCODES)
    finally:
        client.close()


def run_scrape(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]] | None = None,
    sources: str | Iterable[str] | None = None,
    output_dir: str | Path | None = None,
    max_properties_per_source: int | None = None,
) -> dict:
    """Run one manual sale-listings scrape and write a parquet output.

    Args:
        outcodes: candidate outcodes; only London-ish ones are scraped.
        pc_index: postcode spatial index passed through to each source.
        pc_coords: postcode -> (lat, lng) lookup for Zoopla; built on demand
            when Zoopla is selected and this is ``None``.
        sources: source selection, see :func:`_source_names`.
        output_dir: directory for the parquet file; defaults to ``DATA_DIR``.
            Created if missing.
        max_properties_per_source: optional cap on stored properties per
            source.

    Returns:
        A summary dict with keys ``outcodes``, ``sources``, ``source_totals``,
        ``counts``, ``path``, ``errors`` and ``elapsed_seconds``.

    Raises:
        ValueError: if no sources or no London-ish outcodes are selected.
    """
    selected_sources = _source_names(sources)
    selected_outcodes = filter_londonish_outcodes(outcodes)
    if not selected_sources:
        raise ValueError("No sources selected")
    if not selected_outcodes:
        raise ValueError("No Greater London-ish outcodes selected")

    output_base = Path(output_dir) if output_dir is not None else DATA_DIR
    output_base.mkdir(parents=True, exist_ok=True)

    errors: list[str] = []
    results: dict[str, list[dict]] = {source: [] for source in SOURCE_ORDER}
    # Monotonic clock: the elapsed figure must not jump if the system clock
    # is adjusted during a long scrape.
    started_at = time.monotonic()
    log.info(
        "Starting manual sale scrape: %d outcodes, sources=%s, source_cap=%s",
        len(selected_outcodes),
        ",".join(selected_sources),
        max_properties_per_source,
    )

    if "rightmove" in selected_sources:
        _scrape_rightmove(
            selected_outcodes,
            pc_index,
            results,
            errors,
            max_properties_per_source,
        )
    if "onthemarket" in selected_sources:
        _scrape_onthemarket(
            selected_outcodes,
            pc_index,
            results,
            errors,
            max_properties_per_source,
        )
    if "zoopla" in selected_sources:
        if pc_coords is None:
            pc_coords = build_postcode_coords()
        _scrape_zoopla(
            selected_outcodes,
            pc_index,
            pc_coords,
            results,
            errors,
            max_properties_per_source,
        )

    merged, source_counts, deduped = _merge_properties(results)
    output_path = output_base / "online_listings_buy.parquet"
    if merged:
        write_parquet(merged, output_path)
    else:
        # Remove any output left by an earlier run so it is not mistaken for
        # the result of this one.
        output_path.unlink(missing_ok=True)
        log.warning("No London-ish properties to write to %s", output_path)

    counts = {
        "total": len(merged),
        "deduped": deduped,
        "sources": source_counts,
    }
    source_summary = " ".join(
        f"{source}:{source_counts[source]}" for source in SOURCE_ORDER
    )
    log.info(
        "Sale scrape complete: %d unique (%s deduped:%d)",
        len(merged),
        source_summary,
        deduped,
    )
    return {
        "outcodes": len(selected_outcodes),
        "sources": selected_sources,
        "source_totals": {
            source: _source_total(results, source) for source in selected_sources
        },
        "counts": counts,
        "path": str(output_path),
        "errors": errors,
        "elapsed_seconds": round(time.monotonic() - started_at, 3),
    }