import logging
import random
import threading
import time
from dataclasses import dataclass, field

import polars as pl
import httpx

from constants import (
    ARCGIS_PATH,
    CHANNELS,
    DATA_DIR,
    DELAY_BETWEEN_OUTCODES,
    RELOAD_URL,
    SCRAPE_HOMECOUK,
    SCRAPE_OPENRENT,
    SCRAPE_RIGHTMOVE,
    SCRAPE_ZOOPLA,
    SEED,
)
from homecouk import CookiesExpiredError
from homecouk import load_cookies as load_homecouk_cookies
from homecouk import make_client as make_homecouk_client
from homecouk import search_outcode as homecouk_search_outcode
from http_client import make_client
from metrics import (
    cookie_refreshes_total,
    cross_source_dedup_total,
    homecouk_enabled,
    openrent_enabled,
    scrape_elapsed_seconds,
    scrape_errors_total,
    scrape_outcodes_done,
    scrape_outcodes_total,
    scrape_properties_total,
    scrape_state,
    zoopla_enabled,
)
from openrent import WafChallengeError
from openrent import load_cookies as load_openrent_cookies
from openrent import make_client as make_openrent_client
from openrent import search_outcode as openrent_search_outcode
from rightmove import resolve_outcode_id, search_outcode
from zoopla import TurnstileError
from zoopla import launch_browser as launch_zoopla_browser
from zoopla import search_outcode as zoopla_search_outcode
from spatial import PostcodeSpatialIndex
from storage import write_parquet

log = logging.getLogger("rightmove")

# Value of the "ctry" column used to keep England-only rows of the arcgis
# postcode parquet.
_ENGLAND_CTRY_CODE = "E92000001"


@dataclass
class ScrapeStatus:
    """Mutable snapshot of the current scrape run.

    A single module-level instance (``status``) is shared between the
    orchestrator and the source worker threads; guard access with
    ``status_lock``.
    """

    state: str = "idle"  # idle | running | done | error
    channel: str = ""
    outcode: str = ""
    outcodes_done: int = 0
    outcodes_total: int = 0
    properties_buy: int = 0
    properties_rent: int = 0
    # Per-source counts (combined across channels)
    rm_properties: int = 0
    hk_properties: int = 0
    or_properties: int = 0
    zp_properties: int = 0
    errors: list[str] = field(default_factory=list)
    started_at: float = 0.0
    finished_at: float = 0.0


status = ScrapeStatus()
status_lock = threading.Lock()


def _sync_gauges() -> None:
    """Push current ScrapeStatus values into Prometheus gauges.

    Must hold status_lock.
    """
    # One-hot encoding of the state: exactly one label value is set to 1.
    for state in ("idle", "running", "done", "error"):
        scrape_state.labels(state=state).set(1 if status.state == state else 0)
    scrape_outcodes_done.set(status.outcodes_done)
    scrape_outcodes_total.set(status.outcodes_total)
    scrape_properties_total.labels(channel="buy", source="total").set(
        status.properties_buy
    )
    scrape_properties_total.labels(channel="rent", source="total").set(
        status.properties_rent
    )
    # Per-source totals (across both channels). The same combined count is
    # published under both channel labels because ScrapeStatus does not track
    # per-source counts per channel.
    for ch in ("buy", "rent"):
        scrape_properties_total.labels(channel=ch, source="rightmove").set(
            status.rm_properties
        )
        scrape_properties_total.labels(channel=ch, source="homecouk").set(
            status.hk_properties
        )
        scrape_properties_total.labels(channel=ch, source="openrent").set(
            status.or_properties
        )
        scrape_properties_total.labels(channel=ch, source="zoopla").set(
            status.zp_properties
        )
    if status.started_at:
        # While the run is still in progress (finished_at == 0) measure
        # against the current wall clock.
        end = status.finished_at if status.finished_at else time.time()
        scrape_elapsed_seconds.set(end - status.started_at)
    else:
        scrape_elapsed_seconds.set(0)


def _read_england_postcodes(columns: list[str]) -> pl.DataFrame:
    """Read *columns* from the arcgis parquet, keeping England rows only.

    *columns* must include ``"ctry"``, which is used for the filter.
    """
    df = pl.read_parquet(ARCGIS_PATH, columns=columns)
    return df.filter(pl.col("ctry") == _ENGLAND_CTRY_CODE)


def load_outcodes() -> list[str]:
    """Load England-only outcodes from arcgis parquet.

    Returns the sorted list of unique outcodes extracted from the ``pcd``
    column; rows whose postcode does not match the outcode pattern are
    dropped.
    """
    log.info("Loading outcodes from %s", ARCGIS_PATH)
    # Only the postcode and country columns are needed here; coordinates are
    # not read.
    england = _read_england_postcodes(["pcd", "ctry"])
    log.info("England postcodes: %d", len(england))
    outcodes = (
        england.select(
            pl.col("pcd").str.extract(r"^([A-Z]{1,2}\d[A-Z0-9]?)", 1).alias("outcode")
        )
        .drop_nulls()
        .get_column("outcode")
        .unique()
        .sort()
        .to_list()
    )
    log.info("Unique England outcodes: %d", len(outcodes))
    return outcodes


def build_postcode_index() -> PostcodeSpatialIndex:
    """Build spatial index from arcgis England postcodes.

    Rows without a latitude or longitude are excluded.
    """
    log.info("Building postcode spatial index from %s", ARCGIS_PATH)
    england = _read_england_postcodes(["pcd", "ctry", "lat", "long"]).drop_nulls(
        subset=["lat", "long"]
    )
    return PostcodeSpatialIndex(
        england.get_column("lat").to_list(),
        england.get_column("long").to_list(),
        england.get_column("pcd").to_list(),
    )


def build_postcode_coords() -> dict[str, tuple[float, float]]:
    """Build postcode → (lat, lng) lookup from arcgis England postcodes.

    Used by OpenRent scraper to resolve coordinates from postcodes.
    Rows without a latitude or longitude are excluded; if a postcode occurs
    more than once, the last occurrence wins.
    """
    log.info("Building postcode coords lookup from %s", ARCGIS_PATH)
    england = _read_england_postcodes(["pcd", "ctry", "lat", "long"]).drop_nulls(
        subset=["lat", "long"]
    )
    coords: dict[str, tuple[float, float]] = dict(
        zip(
            england.get_column("pcd").to_list(),
            zip(
                england.get_column("lat").to_list(),
                england.get_column("long").to_list(),
            ),
        )
    )
    log.info("Postcode coords lookup: %d postcodes", len(coords))
    return coords


def _fmt_elapsed(seconds: float) -> str:
    """Format seconds as e.g. '2h13m' or '5m32s'."""
    h, rem = divmod(int(seconds), 3600)
    m, s = divmod(rem, 60)
    if h:
        return f"{h}h{m:02d}m"
    return f"{m}m{s:02d}s"


def _dedup_key(p: dict) -> tuple:
    """Composite key for cross-source deduplication: (postcode, bedrooms, price).

    Two listings on different portals for the same physical property will
    share these attributes even though their IDs differ.
    """
    # NOTE(review): listings missing all three fields share the key
    # ("", 0, 0) and are therefore treated as duplicates of one another by
    # _merge_channel — confirm this is acceptable.
    return (p.get("Postcode", ""), p.get("Bedrooms", 0), p.get("price", 0))


class _Progress:
    """Thread-safe progress tracker for parallel source workers."""

    def __init__(self):
        self._counts: dict[str, int] = {}
        self._lock = threading.Lock()

    def update(self, source: str, done: int) -> None:
        """Record that *source* has processed *done* outcodes."""
        with self._lock:
            self._counts[source] = done

    def snapshot(self) -> dict[str, int]:
        """Return a copy of the per-source progress counters."""
        with self._lock:
            return dict(self._counts)


def _merge_channel(
    rm_props: list[dict],
    hk_props: list[dict],
    or_props: list[dict],
    zp_props: list[dict],
) -> tuple[dict[str, dict], dict[str, int], int]:
    """Merge properties from all sources for one channel with cross-source dedup.

    Rightmove has priority; other sources are checked for duplicates.

    Returns (all_properties_by_id, per_source_counts, total_dedup_count).
    """
    all_properties: dict[str, dict] = {}
    seen_keys: set[tuple] = set()
    counts = {"rm": 0, "hk": 0, "or": 0, "zp": 0}
    total_dedup = 0

    # Rightmove first (priority source). Only repeated IDs are skipped here;
    # Rightmove listings are never dropped on the composite key.
    for p in rm_props:
        pid = p["id"]
        if pid not in all_properties:
            all_properties[pid] = p
            seen_keys.add(_dedup_key(p))
            counts["rm"] += 1

    # Other sources (check for cross-source duplicates)
    for source, props in [("hk", hk_props), ("or", or_props), ("zp", zp_props)]:
        for p in props:
            pid = p["id"]
            key = _dedup_key(p)
            if pid in all_properties or key in seen_keys:
                total_dedup += 1
                continue
            all_properties[pid] = p
            seen_keys.add(key)
            counts[source] += 1

    return all_properties, counts, total_dedup


def run_scrape(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]] | None = None,
) -> None:
    """Main scrape orchestrator — runs all sources in parallel threads.

    Each source (Rightmove, home.co.uk, OpenRent, Zoopla) gets its own thread
    that iterates all outcodes for both BUY and RENT channels. Results are
    merged with cross-source deduplication after all workers complete.

    Args:
        outcodes: Outcodes to scrape; processed in a SEED-determined shuffled
            order. The caller's list is not modified.
        pc_index: Spatial index passed through to every source's search call.
        pc_coords: Optional postcode → (lat, lng) lookup. Built on demand via
            build_postcode_coords() when OpenRent or Zoopla is enabled and no
            lookup is supplied.

    Progress and the final outcome are reported through the module-level
    ``status`` object and the Prometheus gauges; nothing is returned.
    """
    with status_lock:
        status.state = "running"
        status.started_at = time.time()
        status.finished_at = 0.0
        status.errors = []
        status.properties_buy = 0
        status.properties_rent = 0
        # Also clear the counters left over from a previous run so the gauges
        # do not show stale values until the monitor loop first updates them
        # (or forever, when every scraper is disabled).
        status.outcodes_done = 0
        status.outcodes_total = len(outcodes)
        status.rm_properties = 0
        status.hk_properties = 0
        status.or_properties = 0
        status.zp_properties = 0
        status.channel = ""
        status.outcode = ""
        _sync_gauges()

    shuffled = list(outcodes)
    # A private Random instance gives the same SEED-determined order as
    # seeding the module-level generator, without reseeding the process-wide
    # RNG that other modules and threads share.
    random.Random(SEED).shuffle(shuffled)

    if not any([SCRAPE_RIGHTMOVE, SCRAPE_HOMECOUK, SCRAPE_OPENRENT, SCRAPE_ZOOPLA]):
        log.warning("All scrapers disabled — nothing to do")
        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            _sync_gauges()
        return

    if not SCRAPE_RIGHTMOVE:
        log.info("Rightmove scraping DISABLED (SCRAPE_RIGHTMOVE=false)")
    if not SCRAPE_HOMECOUK:
        log.info("home.co.uk scraping DISABLED (SCRAPE_HOMECOUK=false)")
        homecouk_enabled.set(0)
    if not SCRAPE_OPENRENT:
        log.info("OpenRent scraping DISABLED (SCRAPE_OPENRENT=false)")
        openrent_enabled.set(0)
    if not SCRAPE_ZOOPLA:
        log.info("Zoopla scraping DISABLED (SCRAPE_ZOOPLA=false)")
        zoopla_enabled.set(0)

    # Build postcode coords if needed for OpenRent/Zoopla
    if (SCRAPE_OPENRENT or SCRAPE_ZOOPLA) and pc_coords is None:
        pc_coords = build_postcode_coords()

    # Per-source result containers: {channel_name: [properties]}
    # Each list is only written by its owning source thread.
    rm_results: dict[str, list] = {"BUY": [], "RENT": []}
    hk_results: dict[str, list] = {"BUY": [], "RENT": []}
    or_results: dict[str, list] = {"BUY": [], "RENT": []}
    zp_results: dict[str, list] = {"BUY": [], "RENT": []}

    progress = _Progress()

    # --- Source worker closures ---
    # Each worker owns its client lifecycle and iterates all outcodes for both
    # channels. On auth failure, it refreshes cookies and continues. On fatal
    # failure, it marks itself as done and returns partial results.

    def rm_worker() -> None:
        """Scrape every outcode from Rightmove for all channels."""
        client = make_client()
        try:
            for i, outcode in enumerate(shuffled):
                try:
                    outcode_id = resolve_outcode_id(client, outcode)
                except Exception as e:
                    log.error("Rightmove %s ID lookup: %s", outcode, e)
                    scrape_errors_total.labels(source="rightmove").inc()
                    progress.update("rm", i + 1)
                    time.sleep(DELAY_BETWEEN_OUTCODES)
                    continue

                if not outcode_id:
                    log.debug("No Rightmove ID for %s, skipping", outcode)
                    progress.update("rm", i + 1)
                    time.sleep(DELAY_BETWEEN_OUTCODES)
                    continue

                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = search_outcode(
                            client, outcode_id, outcode, ch_cfg, pc_index
                        )
                        rm_results[ch].extend(props)
                    except Exception as e:
                        # A failure for one outcode/channel must not stop the
                        # remaining work; count it and move on.
                        log.error("Rightmove %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="rightmove").inc()

                progress.update("rm", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal Rightmove error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal Rightmove: {e}")
        finally:
            client.close()

    def hk_worker() -> None:
        """Scrape every outcode from home.co.uk for all channels."""
        hk_result = load_homecouk_cookies()
        if not hk_result:
            log.info("home.co.uk DISABLED (no cookies available)")
            homecouk_enabled.set(0)
            # Report full progress so this source does not hold back the
            # overall "outcodes done" minimum.
            progress.update("hk", len(shuffled))
            return

        client = make_homecouk_client(*hk_result)
        log.info("home.co.uk scraping ENABLED")
        homecouk_enabled.set(1)

        try:
            for i, outcode in enumerate(shuffled):
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = homecouk_search_outcode(
                            client, outcode, ch, pc_index
                        )
                        hk_results[ch].extend(props)
                        if props:
                            log.info(
                                "home.co.uk %s: +%d properties", outcode, len(props)
                            )
                    except CookiesExpiredError:
                        # NOTE(review): after a successful refresh the
                        # outcode/channel that raised is not retried; the
                        # loop simply moves on.
                        log.warning(
                            "home.co.uk cookies expired — attempting refresh"
                        )
                        client.close()
                        hk_new = load_homecouk_cookies()
                        if hk_new:
                            client = make_homecouk_client(*hk_new)
                            log.info("home.co.uk cookies refreshed, continuing")
                            cookie_refreshes_total.labels(result="success").inc()
                        else:
                            log.warning(
                                "Cookie refresh failed, disabling home.co.uk"
                            )
                            homecouk_enabled.set(0)
                            cookie_refreshes_total.labels(result="failure").inc()
                            with status_lock:
                                status.errors.append(
                                    "home.co.uk cookies expired and refresh failed"
                                )
                            progress.update("hk", len(shuffled))
                            return
                    except Exception as e:
                        log.error("home.co.uk %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="homecouk").inc()

                progress.update("hk", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal home.co.uk error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal home.co.uk: {e}")
        finally:
            # Best-effort: the client may already have been closed by the
            # refresh path above.
            try:
                client.close()
            except Exception:
                pass

    def or_worker() -> None:
        """Scrape every outcode from OpenRent (RENT channel only)."""
        or_result = load_openrent_cookies()
        if not or_result:
            log.info("OpenRent DISABLED (no cookies available)")
            openrent_enabled.set(0)
            progress.update("or", len(shuffled))
            return

        client = make_openrent_client(*or_result)
        log.info("OpenRent scraping ENABLED")
        openrent_enabled.set(1)

        try:
            for i, outcode in enumerate(shuffled):
                # OpenRent is RENT-only
                try:
                    props = openrent_search_outcode(
                        client, outcode, pc_index, pc_coords
                    )
                    or_results["RENT"].extend(props)
                    if props:
                        log.info("OpenRent %s: +%d properties", outcode, len(props))
                except WafChallengeError:
                    # NOTE(review): after a successful refresh the outcode
                    # that raised is not retried.
                    log.warning(
                        "OpenRent WAF cookies expired — attempting refresh"
                    )
                    client.close()
                    or_new = load_openrent_cookies()
                    if or_new:
                        client = make_openrent_client(*or_new)
                        log.info("OpenRent cookies refreshed, continuing")
                        cookie_refreshes_total.labels(result="success").inc()
                    else:
                        log.warning(
                            "Cookie refresh failed, disabling OpenRent"
                        )
                        openrent_enabled.set(0)
                        cookie_refreshes_total.labels(result="failure").inc()
                        with status_lock:
                            status.errors.append(
                                "OpenRent WAF cookies expired and refresh failed"
                            )
                        progress.update("or", len(shuffled))
                        return
                except Exception as e:
                    log.error("OpenRent %s: %s", outcode, e)
                    scrape_errors_total.labels(source="openrent").inc()

                progress.update("or", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal OpenRent error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal OpenRent: {e}")
        finally:
            # Best-effort: the client may already have been closed by the
            # refresh path above.
            try:
                client.close()
            except Exception:
                pass

    def zp_worker() -> None:
        """Scrape every outcode from Zoopla for all channels via a browser."""
        try:
            browser, page = launch_zoopla_browser()
            log.info("Zoopla scraping ENABLED (Camoufox browser launched)")
            zoopla_enabled.set(1)
        except TurnstileError:
            log.warning("Zoopla Cloudflare Turnstile failed — disabling Zoopla")
            zoopla_enabled.set(0)
            progress.update("zp", len(shuffled))
            return
        except Exception as e:
            log.warning("Zoopla browser launch failed: %s — disabling Zoopla", e)
            zoopla_enabled.set(0)
            progress.update("zp", len(shuffled))
            return

        try:
            for i, outcode in enumerate(shuffled):
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = zoopla_search_outcode(
                            page, outcode, ch, pc_index, pc_coords
                        )
                        zp_results[ch].extend(props)
                        if props:
                            log.info(
                                "Zoopla %s: +%d properties", outcode, len(props)
                            )
                    except TurnstileError:
                        # NOTE(review): after a successful relaunch the
                        # outcode/channel that raised is not retried.
                        log.warning(
                            "Zoopla Turnstile challenge — relaunching browser"
                        )
                        try:
                            browser.close()
                        except Exception:
                            pass
                        try:
                            browser, page = launch_zoopla_browser()
                            log.info("Zoopla browser relaunched, continuing")
                        except Exception:
                            log.warning(
                                "Browser relaunch failed, disabling Zoopla"
                            )
                            zoopla_enabled.set(0)
                            with status_lock:
                                status.errors.append(
                                    "Zoopla Cloudflare challenge failed and relaunch failed"
                                )
                            progress.update("zp", len(shuffled))
                            return
                    except Exception as e:
                        log.error("Zoopla %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="zoopla").inc()

                progress.update("zp", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal Zoopla error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal Zoopla: {e}")
        finally:
            # Best-effort: the browser may already have been closed above.
            try:
                browser.close()
            except Exception:
                pass

    # --- Launch worker threads ---
    active_sources: list[str] = []
    threads: list[threading.Thread] = []

    if SCRAPE_RIGHTMOVE:
        threads.append(
            threading.Thread(target=rm_worker, name="scrape-rm", daemon=True)
        )
        active_sources.append("rm")
    if SCRAPE_HOMECOUK:
        threads.append(
            threading.Thread(target=hk_worker, name="scrape-hk", daemon=True)
        )
        active_sources.append("hk")
    if SCRAPE_OPENRENT:
        threads.append(
            threading.Thread(target=or_worker, name="scrape-or", daemon=True)
        )
        active_sources.append("or")
    if SCRAPE_ZOOPLA:
        threads.append(
            threading.Thread(target=zp_worker, name="scrape-zp", daemon=True)
        )
        active_sources.append("zp")

    log.info(
        "=== Starting scrape: %d outcodes, sources: %s ===",
        len(shuffled),
        ", ".join(active_sources),
    )

    for t in threads:
        t.start()

    # --- Monitor progress while workers run ---
    scrape_start = time.time()
    last_log = 0.0  # forces a progress line on the first iteration

    try:
        while any(t.is_alive() for t in threads):
            snap = progress.snapshot()
            # Overall progress is that of the slowest active source.
            min_done = min(
                (snap.get(s, 0) for s in active_sources), default=0
            )

            # Count properties across sources (safe: only one thread writes each list)
            total_buy = sum(
                len(r["BUY"])
                for r in [rm_results, hk_results, or_results, zp_results]
            )
            total_rent = sum(
                len(r["RENT"])
                for r in [rm_results, hk_results, or_results, zp_results]
            )

            with status_lock:
                status.outcodes_done = min_done
                status.outcodes_total = len(shuffled)
                status.properties_buy = total_buy
                status.properties_rent = total_rent
                status.rm_properties = len(rm_results["BUY"]) + len(rm_results["RENT"])
                status.hk_properties = len(hk_results["BUY"]) + len(hk_results["RENT"])
                status.or_properties = len(or_results["RENT"])
                status.zp_properties = len(zp_results["BUY"]) + len(zp_results["RENT"])
                _sync_gauges()

            # Log progress every 30 seconds
            now = time.time()
            if now - last_log >= 30:
                elapsed = now - scrape_start
                per_source = ", ".join(
                    f"{s}:{snap.get(s, 0)}" for s in active_sources
                )
                log.info(
                    "Progress: %d/%d outcodes (%s), %d buy + %d rent props, %s elapsed",
                    min_done,
                    len(shuffled),
                    per_source,
                    total_buy,
                    total_rent,
                    _fmt_elapsed(elapsed),
                )
                last_log = now

            time.sleep(5)
    except Exception as e:
        # Monitoring is best-effort; still wait for the workers below.
        log.exception("Monitor loop error: %s", e)

    for t in threads:
        t.join()

    log.info("All source workers completed")

    # --- Merge results per channel and write parquet ---
    try:
        for ch_cfg in CHANNELS:
            ch = ch_cfg["channel"]
            # Lower-case channel name, used for both the output file name and
            # the metric label.
            file_suffix = "buy" if ch == "BUY" else "rent"

            merged, counts, total_dedup = _merge_channel(
                rm_results[ch],
                hk_results[ch],
                or_results[ch],
                zp_results[ch],
            )

            # Update cross-source dedup counter
            if total_dedup:
                cross_source_dedup_total.labels(channel=file_suffix).inc(total_dedup)

            deduped = list(merged.values())
            output_path = DATA_DIR / f"online_listings_{file_suffix}.parquet"
            write_parquet(deduped, output_path, channel=file_suffix)

            with status_lock:
                if ch == "BUY":
                    status.properties_buy = len(deduped)
                else:
                    status.properties_rent = len(deduped)
                _sync_gauges()

            log.info(
                "=== %s complete: %d unique (rm:%d hk:%d or:%d zp:%d, cross-dedup:%d) ===",
                ch,
                len(deduped),
                counts["rm"],
                counts["hk"],
                counts["or"],
                counts["zp"],
                total_dedup,
            )

        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            status.outcodes_done = len(shuffled)
            _sync_gauges()
            # Take the summary values while still holding the lock.
            elapsed = status.finished_at - status.started_at
            final_buy = status.properties_buy
            final_rent = status.properties_rent

        log.info(
            "Scrape complete in %s — buy: %d, rent: %d",
            _fmt_elapsed(elapsed),
            final_buy,
            final_rent,
        )

        # Trigger server data reload
        if RELOAD_URL:
            try:
                log.info("Triggering server reload at %s", RELOAD_URL)
                resp = httpx.post(RELOAD_URL, timeout=300)
                if resp.is_success:
                    body = resp.json()
                    log.info(
                        "Server reload complete: %d rows, %d features, %dms",
                        body.get("rows", 0),
                        body.get("features", 0),
                        body.get("elapsed_ms", 0),
                    )
                else:
                    log.warning(
                        "Server reload failed (%d): %s",
                        resp.status_code,
                        resp.text[:200],
                    )
            except Exception as e:
                # A failed reload must not turn a successful scrape into an
                # error; it is only logged.
                log.warning("Server reload request failed: %s", e)

    except Exception as e:
        log.exception("Fatal scrape error during merge/write")
        with status_lock:
            status.state = "error"
            status.errors.append(f"Fatal: {e}")
            status.finished_at = time.time()
            _sync_gauges()