"""Scrape orchestration: pulls listings from Rightmove, home.co.uk and OpenRent
for every England outcode, merges them per channel, writes one parquet file per
channel, and exposes progress via ``status`` and Prometheus gauges."""

import logging
import random
import threading
import time
from dataclasses import dataclass, field

import polars as pl

from constants import (
    ARCGIS_PATH,
    CHANNELS,
    DATA_DIR,
    DELAY_BETWEEN_OUTCODES,
    SCRAPE_HOMECOUK,
    SCRAPE_OPENRENT,
    SCRAPE_RIGHTMOVE,
    SEED,
)
from homecouk import CookiesExpiredError
from homecouk import load_cookies as load_homecouk_cookies
from homecouk import make_client as make_homecouk_client
from homecouk import search_outcode as homecouk_search_outcode
from http_client import make_client
from metrics import (
    cookie_refreshes_total,
    cross_source_dedup_total,
    homecouk_enabled,
    openrent_enabled,
    scrape_elapsed_seconds,
    scrape_errors_total,
    scrape_outcodes_done,
    scrape_outcodes_total,
    scrape_properties_total,
    scrape_state,
)
from openrent import WafChallengeError
from openrent import load_cookies as load_openrent_cookies
from openrent import make_client as make_openrent_client
from openrent import search_outcode as openrent_search_outcode
from rightmove import resolve_outcode_id, search_outcode
from spatial import PostcodeSpatialIndex
from storage import write_parquet

log = logging.getLogger("rightmove")

# "ctry" value that marks English postcodes in the arcgis parquet.
_ENGLAND_CTRY = "E92000001"
# Leading outcode of a postcode, e.g. "SW1A" from "SW1A 1AA".
_OUTCODE_PATTERN = r"^([A-Z]{1,2}\d[A-Z0-9]?)"


@dataclass
class ScrapeStatus:
    """Progress snapshot of the current/last scrape; guard access with ``status_lock``."""

    state: str = "idle"  # idle | running | done | error
    channel: str = ""  # channel currently being scraped (e.g. "BUY")
    outcode: str = ""  # outcode currently being scraped
    outcodes_done: int = 0
    outcodes_total: int = 0
    properties_buy: int = 0  # unique merged properties for the buy channel
    properties_rent: int = 0  # unique merged properties for the rent channel
    # Per-source counts for current channel
    rm_properties: int = 0
    hk_properties: int = 0
    or_properties: int = 0
    errors: list[str] = field(default_factory=list)
    started_at: float = 0.0  # time.time() at run start; 0 = never started
    finished_at: float = 0.0  # time.time() at run end; 0 = not finished


status = ScrapeStatus()
status_lock = threading.Lock()


def _sync_gauges() -> None:
    """Push current ScrapeStatus values into Prometheus gauges.

    Must hold status_lock."""
    for state in ("idle", "running", "done", "error"):
        scrape_state.labels(state=state).set(1 if status.state == state else 0)
    scrape_outcodes_done.set(status.outcodes_done)
    scrape_outcodes_total.set(status.outcodes_total)

    # Total properties (all sources combined)
    scrape_properties_total.labels(channel="buy", source="total").set(status.properties_buy)
    scrape_properties_total.labels(channel="rent", source="total").set(status.properties_rent)

    # Per-source breakdown for current channel
    ch = "buy" if status.channel == "BUY" else "rent"
    scrape_properties_total.labels(channel=ch, source="rightmove").set(status.rm_properties)
    scrape_properties_total.labels(channel=ch, source="homecouk").set(status.hk_properties)
    scrape_properties_total.labels(channel=ch, source="openrent").set(status.or_properties)

    if status.started_at:
        # While running, finished_at is 0 and elapsed is measured up to "now".
        end = status.finished_at if status.finished_at else time.time()
        scrape_elapsed_seconds.set(end - status.started_at)
    else:
        scrape_elapsed_seconds.set(0)


def _record_error(source: str, msg: str) -> None:
    """Log *msg*, bump the per-source error counter and append it to status.errors."""
    log.error(msg)
    scrape_errors_total.labels(source=source).inc()
    with status_lock:
        status.errors.append(msg)


def _read_england_postcodes(*, with_coords: bool) -> pl.DataFrame:
    """Read England rows from the arcgis parquet.

    With *with_coords*, also load "lat"/"long" and drop rows missing either;
    otherwise only "pcd" and "ctry" are read.
    """
    columns = ["pcd", "ctry", "lat", "long"] if with_coords else ["pcd", "ctry"]
    england = pl.read_parquet(ARCGIS_PATH, columns=columns).filter(pl.col("ctry") == _ENGLAND_CTRY)
    if with_coords:
        england = england.drop_nulls(subset=["lat", "long"])
    return england


def load_outcodes() -> list[str]:
    """Load England-only outcodes from arcgis parquet (unique, sorted)."""
    log.info("Loading outcodes from %s", ARCGIS_PATH)
    england = _read_england_postcodes(with_coords=False)
    log.info("England postcodes: %d", len(england))

    outcodes = (
        england.select(pl.col("pcd").str.extract(_OUTCODE_PATTERN, 1).alias("outcode"))
        .drop_nulls()
        .get_column("outcode")
        .unique()
        .sort()
        .to_list()
    )
    log.info("Unique England outcodes: %d", len(outcodes))
    return outcodes


def build_postcode_index() -> PostcodeSpatialIndex:
    """Build spatial index from arcgis England postcodes that have coordinates."""
    log.info("Building postcode spatial index from %s", ARCGIS_PATH)
    england = _read_england_postcodes(with_coords=True)
    return PostcodeSpatialIndex(
        england.get_column("lat").to_list(),
        england.get_column("long").to_list(),
        england.get_column("pcd").to_list(),
    )


def build_postcode_coords() -> dict[str, tuple[float, float]]:
    """Build postcode → (lat, lng) lookup from arcgis England postcodes.

    Used by OpenRent scraper to resolve coordinates from postcodes. If a
    postcode appears more than once, the last row wins."""
    log.info("Building postcode coords lookup from %s", ARCGIS_PATH)
    england = _read_england_postcodes(with_coords=True)
    coords: dict[str, tuple[float, float]] = dict(
        zip(
            england.get_column("pcd").to_list(),
            zip(england.get_column("lat").to_list(), england.get_column("long").to_list()),
        )
    )
    log.info("Postcode coords lookup: %d postcodes", len(coords))
    return coords


def _dedup_key(p: dict) -> tuple | None:
    """Composite key for cross-source deduplication: (postcode, bedrooms, price).

    Two listings on different portals for the same physical property will
    share these attributes even though their IDs differ. Returns None when the
    listing has no postcode: (beds, price) alone would wrongly merge unrelated
    properties, so such listings are deduplicated by id only."""
    postcode = p.get("Postcode", "")
    if not postcode:
        return None
    return (postcode, p.get("Bedrooms", 0), p.get("price", 0))


def _merge_listings(
    props: list[dict],
    source: str,
    all_properties: dict[str, dict],
    key_sources: dict[tuple, set[str]],
    *,
    cross_dedup: bool,
) -> tuple[int, int]:
    """Merge *props* from *source* into *all_properties*.

    A listing is skipped if its id is already present, or — when
    *cross_dedup* is true — if its _dedup_key was already contributed by a
    *different* source. Listings sharing a key with earlier listings from the
    same source are kept (they are distinct listings on one portal).

    Returns:
        (added, skipped) counts.
    """
    added = skipped = 0
    for p in props:
        pid = p["id"]
        if pid in all_properties:
            skipped += 1
            continue
        key = _dedup_key(p)
        if cross_dedup and key is not None and key_sources.get(key, set()) - {source}:
            skipped += 1
            continue
        all_properties[pid] = p
        if key is not None:
            key_sources.setdefault(key, set()).add(source)
        added += 1
    return added, skipped


def _init_optional_client(*, label, enabled, flag_name, loader, maker, gauge, missing_hint):
    """Create a cookie-authenticated client for an optional source.

    Returns the client, or None when the source is disabled by config, its
    cookies are unavailable, or setup raises (logged; the scrape continues
    without this source). Sets *gauge* to 1/0 to reflect the outcome.
    """
    if not enabled:
        log.info("%s scraping DISABLED (%s=false)", label, flag_name)
        gauge.set(0)
        return None
    try:
        result = loader()
        client = maker(*result) if result else None
    except Exception:
        log.exception("%s client setup failed", label)
        client = None
    if client:
        log.info("%s scraping ENABLED", label)
        gauge.set(1)
        return client
    log.info("%s scraping DISABLED (%s)", label, missing_hint)
    gauge.set(0)
    return None


def _refresh_client(old_client, *, label, cookie_desc, loader, maker, gauge):
    """Close an expired client and try to build a fresh one from new cookies.

    Returns the new client, or None when the refresh fails (including when it
    raises), in which case the source is disabled for the rest of the scrape
    and a status error is recorded. Never raises, so a refresh problem cannot
    abort the other sources.
    """
    log.warning("%s %s expired — attempting refresh via FlareSolverr", label, cookie_desc)
    new_client = None
    try:
        old_client.close()
        result = loader()
        new_client = maker(*result) if result else None
    except Exception:
        log.exception("%s cookie refresh raised", label)
    if new_client:
        log.info("%s cookies refreshed, continuing", label)
        cookie_refreshes_total.labels(result="success").inc()
        return new_client
    log.warning("Cookie refresh failed, disabling %s for rest of scrape", label)
    gauge.set(0)
    cookie_refreshes_total.labels(result="failure").inc()
    with status_lock:
        status.errors.append(f"{label} {cookie_desc} expired and refresh failed")
    return None


def run_scrape(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]] | None = None,
) -> None:
    """Main scrape loop — runs in background thread.

    Scrapes Rightmove, home.co.uk, and OpenRent (rent channel only), merging
    them into one dataset per entry of CHANNELS and writing
    ``DATA_DIR / online_listings_<buy|rent>.parquet``. Progress and errors are
    published through ``status``. Failures after start-up are caught and
    reported by setting ``status.state = "error"``.

    Args:
        outcodes: Outcodes to scrape, visited in a SEED-determined shuffled order.
        pc_index: Postcode spatial index passed to every source scraper.
        pc_coords: Postcode → (lat, lng) lookup for OpenRent; built on demand
            when OpenRent is active and this is None.
    """
    with status_lock:
        status.state = "running"
        status.started_at = time.time()
        # Clear the previous run's end time; otherwise _sync_gauges reports
        # (old finished_at - new started_at), i.e. a negative elapsed time.
        status.finished_at = 0.0
        status.errors = []
        status.outcode = ""
        status.outcodes_done = 0
        status.outcodes_total = 0
        status.properties_buy = 0
        status.properties_rent = 0
        _sync_gauges()

    # Shuffle for geographic diversity. A private RNG yields the same
    # SEED-determined order without reseeding the process-wide random module.
    shuffled = list(outcodes)
    random.Random(SEED).shuffle(shuffled)

    if not (SCRAPE_RIGHTMOVE or SCRAPE_HOMECOUK or SCRAPE_OPENRENT):
        log.warning("All scrapers disabled — nothing to do")
        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            _sync_gauges()
        return

    client = hk_client = or_client = None
    try:
        # Setup lives inside the try so a failure here still marks the run as
        # errored and closes any client that was already opened.
        if SCRAPE_RIGHTMOVE:
            client = make_client()
        else:
            log.info("Rightmove scraping DISABLED (SCRAPE_RIGHTMOVE=false)")

        # Optional sources: enabled by flag AND usable cookies.
        hk_client = _init_optional_client(
            label="home.co.uk",
            enabled=SCRAPE_HOMECOUK,
            flag_name="SCRAPE_HOMECOUK",
            loader=load_homecouk_cookies,
            maker=make_homecouk_client,
            gauge=homecouk_enabled,
            missing_hint="need FlareSolverr or HOMECOUK_CF_CLEARANCE + HOMECOUK_SESSION",
        )
        or_client = _init_optional_client(
            label="OpenRent",
            enabled=SCRAPE_OPENRENT,
            flag_name="SCRAPE_OPENRENT",
            loader=load_openrent_cookies,
            maker=make_openrent_client,
            gauge=openrent_enabled,
            missing_hint="need FlareSolverr or OPENRENT_WAF_TOKEN",
        )

        # Build postcode coords if OpenRent is active and caller didn't provide them
        if or_client and pc_coords is None:
            pc_coords = build_postcode_coords()

        for channel_cfg in CHANNELS:
            channel_name = channel_cfg["channel"]
            file_suffix = "buy" if channel_name == "BUY" else "rent"
            all_properties: dict[str, dict] = {}  # dedup by id
            key_sources: dict[tuple, set[str]] = {}  # dedup key -> sources that contributed it
            rm_count = hk_count = or_count = 0  # properties added per source this channel
            hk_dedup_count = or_dedup_count = 0  # duplicates skipped per source this channel

            with status_lock:
                status.channel = channel_name
                status.outcodes_done = 0
                status.outcodes_total = len(shuffled)
                status.rm_properties = 0
                status.hk_properties = 0
                status.or_properties = 0

            log.info("=== Starting %s channel (%d outcodes) ===", channel_name, len(shuffled))

            for i, outcode in enumerate(shuffled):
                with status_lock:
                    status.outcode = outcode
                    status.outcodes_done = i

                log.debug(
                    "Outcode %s (%d/%d) — %d properties so far",
                    outcode, i + 1, len(shuffled), len(all_properties),
                )
                made_requests = False

                # --- Rightmove (only deduplicated by id) ---
                if SCRAPE_RIGHTMOVE:
                    made_requests = True
                    try:
                        outcode_id = resolve_outcode_id(client, outcode)
                        if not outcode_id:
                            log.debug("No Rightmove ID for outcode %s, skipping", outcode)
                        else:
                            props = search_outcode(client, outcode_id, outcode, channel_cfg, pc_index)
                            added, _ = _merge_listings(
                                props, "rightmove", all_properties, key_sources, cross_dedup=False
                            )
                            rm_count += added
                    except Exception as e:
                        _record_error("rightmove", f"Error scraping Rightmove {outcode}/{channel_name}: {e}")

                # --- home.co.uk ---
                if hk_client:
                    made_requests = True
                    try:
                        hk_props = homecouk_search_outcode(hk_client, outcode, channel_name, pc_index)
                        added, skipped = _merge_listings(
                            hk_props, "homecouk", all_properties, key_sources, cross_dedup=True
                        )
                        hk_count += added
                        hk_dedup_count += skipped
                        if skipped:
                            cross_source_dedup_total.labels(channel=file_suffix).inc(skipped)
                        if hk_props:
                            log.info(
                                "home.co.uk %s: +%d properties (%d duplicates skipped)",
                                outcode, added, skipped,
                            )
                    except CookiesExpiredError:
                        hk_client = _refresh_client(
                            hk_client,
                            label="home.co.uk",
                            cookie_desc="cookies",
                            loader=load_homecouk_cookies,
                            maker=make_homecouk_client,
                            gauge=homecouk_enabled,
                        )
                    except Exception as e:
                        _record_error("homecouk", f"Error scraping home.co.uk {outcode}/{channel_name}: {e}")

                # --- OpenRent (RENT channel only) ---
                if or_client and channel_name == "RENT":
                    made_requests = True
                    try:
                        or_props = openrent_search_outcode(or_client, outcode, pc_index, pc_coords)
                        added, skipped = _merge_listings(
                            or_props, "openrent", all_properties, key_sources, cross_dedup=True
                        )
                        or_count += added
                        or_dedup_count += skipped
                        if skipped:
                            cross_source_dedup_total.labels(channel="rent").inc(skipped)
                        if or_props:
                            log.info(
                                "OpenRent %s: +%d properties (%d duplicates skipped)",
                                outcode, added, skipped,
                            )
                    except WafChallengeError:
                        or_client = _refresh_client(
                            or_client,
                            label="OpenRent",
                            cookie_desc="WAF cookies",
                            loader=load_openrent_cookies,
                            maker=make_openrent_client,
                            gauge=openrent_enabled,
                        )
                    except Exception as e:
                        _record_error("openrent", f"Error scraping OpenRent {outcode}/{channel_name}: {e}")

                with status_lock:
                    if channel_name == "BUY":
                        status.properties_buy = len(all_properties)
                    else:
                        status.properties_rent = len(all_properties)
                    status.rm_properties = rm_count
                    status.hk_properties = hk_count
                    status.or_properties = or_count
                    _sync_gauges()

                log.info(
                    "Outcode %s: total %d (rm: %d, hk: %d, or: %d)",
                    outcode, len(all_properties), rm_count, hk_count, or_count,
                )

                # Throttle between outcodes, but not after the last one or
                # when no source issued a request.
                if made_requests and i < len(shuffled) - 1:
                    time.sleep(DELAY_BETWEEN_OUTCODES)

            # Write parquet
            deduped = list(all_properties.values())
            output_path = DATA_DIR / f"online_listings_{file_suffix}.parquet"
            write_parquet(deduped, output_path, channel=file_suffix)

            with status_lock:
                if channel_name == "BUY":
                    status.properties_buy = len(deduped)
                else:
                    status.properties_rent = len(deduped)
                status.outcodes_done = len(shuffled)
                _sync_gauges()

            log.info(
                "=== %s channel complete: %d unique (rm: %d, hk: %d, or: %d, cross-dedup: %d) ===",
                channel_name, len(deduped), rm_count, hk_count, or_count,
                hk_dedup_count + or_dedup_count,
            )

        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            _sync_gauges()
            elapsed = status.finished_at - status.started_at
            total_buy, total_rent = status.properties_buy, status.properties_rent
        log.info("Scrape complete in %.0fs — buy: %d, rent: %d", elapsed, total_buy, total_rent)

    except Exception as e:
        log.exception("Fatal scrape error")
        with status_lock:
            status.state = "error"
            status.errors.append(f"Fatal: {e}")
            status.finished_at = time.time()
            _sync_gauges()
    finally:
        # Refresh helpers set a client to None after closing it, so nothing
        # is closed twice here.
        if client:
            client.close()
        if hk_client:
            hk_client.close()
        if or_client:
            or_client.close()