From 852bb3f3a7d8bffb1fa280227cb2272075b4741b Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Fri, 20 Mar 2026 07:52:22 +0000 Subject: [PATCH] Faster scraping --- docker-compose.yml | 14 +- finder/constants.py | 22 +- finder/openrent.py | 19 +- finder/scraper.py | 706 ++++++++++++++++++++++++-------------------- 4 files changed, 437 insertions(+), 324 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c8126ae..0b0f525 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -146,6 +146,12 @@ services: # networks: # - dev-network # restart: unless-stopped + # healthcheck: + # test: ["CMD", "curl", "-f", "http://localhost:8191/health"] + # interval: 30s + # timeout: 5s + # retries: 3 + # start_period: 30s # finder: # build: @@ -161,8 +167,14 @@ services: # gluetun: # condition: service_healthy # flaresolverr: - # condition: service_started + # condition: service_healthy # restart: unless-stopped + # healthcheck: + # test: ["CMD", "python3", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:1234/health')"] + # interval: 30s + # timeout: 5s + # retries: 3 + # start_period: 60s volumes: diff --git a/finder/constants.py b/finder/constants.py index 3c75f9d..2985486 100644 --- a/finder/constants.py +++ b/finder/constants.py @@ -4,8 +4,8 @@ from pathlib import Path ARCGIS_PATH = os.environ.get("ARCGIS_PATH", "/data/arcgis_data.parquet") DATA_DIR = Path("/app/data") PAGE_SIZE = 24 -DELAY_BETWEEN_PAGES = 1.0 -DELAY_BETWEEN_OUTCODES = 2.0 +DELAY_BETWEEN_PAGES = 0.5 +DELAY_BETWEEN_OUTCODES = 1.0 MAX_RETRIES = 3 RETRY_BASE_DELAY = 2.0 GRID_CELL_SIZE = 0.01 # degrees for postcode spatial index @@ -67,6 +67,7 @@ PROPERTY_TYPE_MAP = { "Apartment": "Flats/Maisonettes", "Penthouse": "Flats/Maisonettes", "Ground Flat": "Flats/Maisonettes", + "Duplex": "Flats/Maisonettes", "Detached Bungalow": "Detached", "Semi-Detached Bungalow": "Semi-Detached", "Town House": "Terraced", @@ -75,9 +76,15 @@ PROPERTY_TYPE_MAP = { "Bungalow": "Other", "Cottage": "Other", "Park Home": "Other", + "Mobile Home": "Other", + "Caravan": "Other", + "Lodge": "Other", "Land": "Other", "Farm / Barn": "Other", + "Farm House": "Other", "House": "Detached", + "House of Multiple Occupation": "Flats/Maisonettes", + "House Share": "Other", "Not Specified": "Other", "Chalet": "Other", "Barn Conversion": "Other", @@ -85,9 +92,20 @@ PROPERTY_TYPE_MAP = { "Character Property": "Other", "Cluster House": "Other", "Retirement Property": "Flats/Maisonettes", + "Parking": "Other", "Plot": "Other", "Garages": "Other", "Mews": "Terraced", + "Property": "Other", + # Lowercase variants (from home.co.uk / Rightmove APIs) + "house": "Detached", + "bungalow": "Other", + "townhouse": "Terraced", + "land": "Other", + "other": "Other", + "not-specified": "Other", + "retirement-property": "Flats/Maisonettes", + "equestrian-facility": "Other", } CHANNELS = [ diff --git a/finder/openrent.py b/finder/openrent.py index f7da645..c96dd44 100644 --- a/finder/openrent.py +++ b/finder/openrent.py @@ -788,7 +788,24 @@ def search_outcode( for search_data in search_results: detail_data = None - if fetch_details and search_data.get("url"): + # Skip detail page if we already have coordinates or a resolvable postcode + has_coords = ( + search_data.get("lat") is not None + and search_data.get("lng") is not None + ) + has_resolvable_pc = ( + search_data.get("postcode") + and pc_coords + and search_data["postcode"] in pc_coords + ) + needs_detail = ( + fetch_details + and search_data.get("url") + and not has_coords + and not has_resolvable_pc + ) + + if needs_detail: detail_html = fetch_page(client, search_data["url"]) if detail_html: detail_data = parse_property_detail(detail_html) diff --git a/finder/scraper.py b/finder/scraper.py index b5e9d3e..88c3dd2 100644 --- a/finder/scraper.py +++ b/finder/scraper.py @@ -61,7 +61,7 @@ class ScrapeStatus: outcodes_total: int = 0 properties_buy: int = 0 properties_rent: int = 0 - # Per-source counts for current channel + # Per-source counts (combined across channels) rm_properties: int = 0 hk_properties: int = 0 or_properties: int = 0 @@ -81,27 +81,26 @@ def _sync_gauges() -> None: scrape_state.labels(state=state).set(1 if status.state == state else 0) scrape_outcodes_done.set(status.outcodes_done) scrape_outcodes_total.set(status.outcodes_total) - # Total properties (both sources combined) scrape_properties_total.labels(channel="buy", source="total").set( status.properties_buy ) scrape_properties_total.labels(channel="rent", source="total").set( status.properties_rent ) - # Per-source breakdown for current channel - ch = "buy" if status.channel == "BUY" else "rent" - scrape_properties_total.labels(channel=ch, source="rightmove").set( - status.rm_properties - ) - scrape_properties_total.labels(channel=ch, source="homecouk").set( - status.hk_properties - ) - scrape_properties_total.labels(channel=ch, source="openrent").set( - status.or_properties - ) - scrape_properties_total.labels(channel=ch, source="zoopla").set( - status.zp_properties - ) + # Per-source totals (across both channels) + for ch in ("buy", "rent"): + scrape_properties_total.labels(channel=ch, source="rightmove").set( + status.rm_properties + ) + scrape_properties_total.labels(channel=ch, source="homecouk").set( + status.hk_properties + ) + scrape_properties_total.labels(channel=ch, source="openrent").set( + status.or_properties + ) + scrape_properties_total.labels(channel=ch, source="zoopla").set( + status.zp_properties + ) if status.started_at: end = status.finished_at if status.finished_at else time.time() scrape_elapsed_seconds.set(end - status.started_at) @@ -179,28 +178,89 @@ def _dedup_key(p: dict) -> tuple: return (p.get("Postcode", ""), p.get("Bedrooms", 0), p.get("price", 0)) +class _Progress: + """Thread-safe progress tracker for parallel source workers.""" + + def __init__(self): + self._counts: dict[str, int] = {} + self._lock = threading.Lock() + + def update(self, source: str, done: int) -> None: + with self._lock: + self._counts[source] = done + + def snapshot(self) -> dict[str, int]: + with self._lock: + return dict(self._counts) + + +def _merge_channel( + rm_props: list[dict], + hk_props: list[dict], + or_props: list[dict], + zp_props: list[dict], +) -> tuple[dict[str, dict], dict[str, int], int]: + """Merge properties from all sources for one channel with cross-source dedup. + + Rightmove has priority; other sources are checked for duplicates. + Returns (all_properties_by_id, per_source_counts, total_dedup_count). + """ + all_properties: dict[str, dict] = {} + seen_keys: set[tuple] = set() + counts = {"rm": 0, "hk": 0, "or": 0, "zp": 0} + total_dedup = 0 + + # Rightmove first (priority source) + for p in rm_props: + pid = p["id"] + if pid not in all_properties: + all_properties[pid] = p + seen_keys.add(_dedup_key(p)) + counts["rm"] += 1 + + # Other sources (check for cross-source duplicates) + for source, props in [("hk", hk_props), ("or", or_props), ("zp", zp_props)]: + for p in props: + pid = p["id"] + key = _dedup_key(p) + if pid in all_properties or key in seen_keys: + total_dedup += 1 + continue + all_properties[pid] = p + seen_keys.add(key) + counts[source] += 1 + + return all_properties, counts, total_dedup + + def run_scrape( outcodes: list[str], pc_index: PostcodeSpatialIndex, pc_coords: dict[str, tuple[float, float]] | None = None, ) -> None: - """Main scrape loop — runs in background thread. - Scrapes Rightmove, home.co.uk, and OpenRent, merging into one dataset.""" + """Main scrape orchestrator — runs all sources in parallel threads. + + Each source (Rightmove, home.co.uk, OpenRent, Zoopla) gets its own thread + that iterates all outcodes for both BUY and RENT channels. Results are + merged with cross-source deduplication after all workers complete. + """ global status with status_lock: status.state = "running" status.started_at = time.time() + status.finished_at = 0.0 status.errors = [] status.properties_buy = 0 status.properties_rent = 0 + status.channel = "" + status.outcode = "" _sync_gauges() - # Shuffle for geographic diversity shuffled = list(outcodes) random.seed(SEED) random.shuffle(shuffled) - if not SCRAPE_RIGHTMOVE and not SCRAPE_HOMECOUK and not SCRAPE_OPENRENT and not SCRAPE_ZOOPLA: + if not any([SCRAPE_RIGHTMOVE, SCRAPE_HOMECOUK, SCRAPE_OPENRENT, SCRAPE_ZOOPLA]): log.warning("All scrapers disabled — nothing to do") with status_lock: status.state = "done" @@ -208,373 +268,387 @@ def run_scrape( _sync_gauges() return - client = make_client() if SCRAPE_RIGHTMOVE else None if not SCRAPE_RIGHTMOVE: log.info("Rightmove scraping DISABLED (SCRAPE_RIGHTMOVE=false)") - - # home.co.uk: must be enabled via SCRAPE_HOMECOUK + cookies available - hk_client = None - hk_failed = False if not SCRAPE_HOMECOUK: log.info("home.co.uk scraping DISABLED (SCRAPE_HOMECOUK=false)") homecouk_enabled.set(0) - else: - hk_result = load_homecouk_cookies() - hk_client = make_homecouk_client(*hk_result) if hk_result else None - if hk_client: - log.info("home.co.uk scraping ENABLED") - homecouk_enabled.set(1) - else: - log.info( - "home.co.uk scraping DISABLED (need FlareSolverr or HOMECOUK_CF_CLEARANCE + HOMECOUK_SESSION)" - ) - homecouk_enabled.set(0) - - # OpenRent: must be enabled via SCRAPE_OPENRENT + cookies available - or_client = None - or_failed = False if not SCRAPE_OPENRENT: log.info("OpenRent scraping DISABLED (SCRAPE_OPENRENT=false)") openrent_enabled.set(0) - else: - or_result = load_openrent_cookies() - or_client = make_openrent_client(*or_result) if or_result else None - if or_client: - log.info("OpenRent scraping ENABLED") - openrent_enabled.set(1) - else: - log.info( - "OpenRent scraping DISABLED (need FlareSolverr or OPENRENT_WAF_TOKEN)" - ) - openrent_enabled.set(0) - - # Zoopla: uses Camoufox browser (no cookies/client pattern) - zp_browser = None - zp_page = None - zp_failed = False if not SCRAPE_ZOOPLA: log.info("Zoopla scraping DISABLED (SCRAPE_ZOOPLA=false)") zoopla_enabled.set(0) - else: - try: - zp_browser, zp_page = launch_zoopla_browser() - log.info("Zoopla scraping ENABLED (Camoufox browser launched)") - zoopla_enabled.set(1) - except TurnstileError: - log.warning("Zoopla Cloudflare Turnstile failed — disabling Zoopla") - zoopla_enabled.set(0) - except Exception as e: - log.warning("Zoopla browser launch failed: %s — disabling Zoopla", e) - zoopla_enabled.set(0) - # Build postcode coords if OpenRent/Zoopla is active and caller didn't provide them - if (or_client or zp_page) and pc_coords is None: + # Build postcode coords if needed for OpenRent/Zoopla + if (SCRAPE_OPENRENT or SCRAPE_ZOOPLA) and pc_coords is None: pc_coords = build_postcode_coords() - try: - for channel_cfg in CHANNELS: - channel_name = channel_cfg["channel"] - file_suffix = "buy" if channel_name == "BUY" else "rent" - all_properties: dict[str, dict] = {} # dedup by id - seen_dedup_keys: set[tuple] = ( - set() - ) # cross-source dedup by (postcode, beds, price) - rm_count = 0 # Rightmove properties this channel - hk_count = 0 # home.co.uk properties this channel - hk_dedup_count = 0 # home.co.uk skipped as cross-source duplicates - or_count = 0 # OpenRent properties this channel - or_dedup_count = 0 # OpenRent skipped as cross-source duplicates - zp_count = 0 # Zoopla properties this channel - zp_dedup_count = 0 # Zoopla skipped as cross-source duplicates + # Per-source result containers: {channel_name: [properties]} + # Each list is only written by its owning source thread. + rm_results: dict[str, list] = {"BUY": [], "RENT": []} + hk_results: dict[str, list] = {"BUY": [], "RENT": []} + or_results: dict[str, list] = {"BUY": [], "RENT": []} + zp_results: dict[str, list] = {"BUY": [], "RENT": []} - with status_lock: - status.channel = channel_name - status.outcodes_done = 0 - status.outcodes_total = len(shuffled) - status.rm_properties = 0 - status.hk_properties = 0 - status.or_properties = 0 - status.zp_properties = 0 + progress = _Progress() - channel_start = time.time() - prev_prop_milestone = 0 # last 10k milestone we logged - log.info( - "=== Starting %s channel (%d outcodes) ===", channel_name, len(shuffled) - ) + # --- Source worker closures --- + # Each worker owns its client lifecycle and iterates all outcodes for both + # channels. On auth failure, it refreshes cookies and continues. On fatal + # failure, it marks itself as done and returns partial results. + def rm_worker(): + client = make_client() + try: for i, outcode in enumerate(shuffled): - with status_lock: - status.outcode = outcode - status.outcodes_done = i + try: + outcode_id = resolve_outcode_id(client, outcode) + except Exception as e: + log.error("Rightmove %s ID lookup: %s", outcode, e) + scrape_errors_total.labels(source="rightmove").inc() + progress.update("rm", i + 1) + time.sleep(DELAY_BETWEEN_OUTCODES) + continue - made_requests = False + if not outcode_id: + log.debug("No Rightmove ID for %s, skipping", outcode) + progress.update("rm", i + 1) + time.sleep(DELAY_BETWEEN_OUTCODES) + continue - # --- Rightmove --- - if SCRAPE_RIGHTMOVE: - made_requests = True + for ch_cfg in CHANNELS: + ch = ch_cfg["channel"] try: - outcode_id = resolve_outcode_id(client, outcode) - if not outcode_id: - log.debug( - "No Rightmove ID for outcode %s, skipping", outcode - ) - else: - props = search_outcode( - client, outcode_id, outcode, channel_cfg, pc_index - ) - for p in props: - pid = p["id"] - if pid not in all_properties: - all_properties[pid] = p - seen_dedup_keys.add(_dedup_key(p)) - rm_count += 1 - except Exception as e: - msg = f"Error scraping Rightmove {outcode}/{channel_name}: {e}" - log.error(msg) - scrape_errors_total.labels(source="rightmove").inc() - with status_lock: - status.errors.append(msg) - - # --- home.co.uk --- - if hk_client and not hk_failed: - made_requests = True - try: - hk_props = homecouk_search_outcode( - hk_client, - outcode, - channel_name, - pc_index, + props = search_outcode( + client, outcode_id, outcode, ch_cfg, pc_index ) - for p in hk_props: - pid = p["id"] - key = _dedup_key(p) - if pid in all_properties or key in seen_dedup_keys: - hk_dedup_count += 1 - cross_source_dedup_total.labels( - channel="buy" if channel_name == "BUY" else "rent", - ).inc() - continue - all_properties[pid] = p - seen_dedup_keys.add(key) - hk_count += 1 - if hk_props: - log.info( - "home.co.uk %s: +%d properties", outcode, len(hk_props) - ) + rm_results[ch].extend(props) + except Exception as e: + log.error("Rightmove %s/%s: %s", outcode, ch, e) + scrape_errors_total.labels(source="rightmove").inc() + + progress.update("rm", i + 1) + time.sleep(DELAY_BETWEEN_OUTCODES) + except Exception as e: + log.exception("Fatal Rightmove error: %s", e) + with status_lock: + status.errors.append(f"Fatal Rightmove: {e}") + finally: + client.close() + + def hk_worker(): + hk_result = load_homecouk_cookies() + if not hk_result: + log.info("home.co.uk DISABLED (no cookies available)") + homecouk_enabled.set(0) + progress.update("hk", len(shuffled)) + return + client = make_homecouk_client(*hk_result) + log.info("home.co.uk scraping ENABLED") + homecouk_enabled.set(1) + try: + for i, outcode in enumerate(shuffled): + for ch_cfg in CHANNELS: + ch = ch_cfg["channel"] + try: + props = homecouk_search_outcode( + client, outcode, ch, pc_index + ) + hk_results[ch].extend(props) + if props: + log.info("home.co.uk %s: +%d properties", outcode, len(props)) except CookiesExpiredError: log.warning( - "home.co.uk cookies expired — attempting refresh via FlareSolverr" + "home.co.uk cookies expired — attempting refresh" ) - hk_client.close() - hk_result = load_homecouk_cookies() - if hk_result: - hk_client = make_homecouk_client(*hk_result) + client.close() + hk_new = load_homecouk_cookies() + if hk_new: + client = make_homecouk_client(*hk_new) log.info("home.co.uk cookies refreshed, continuing") cookie_refreshes_total.labels(result="success").inc() else: log.warning( - "Cookie refresh failed, disabling home.co.uk for rest of scrape" + "Cookie refresh failed, disabling home.co.uk" ) - hk_client = None - hk_failed = True homecouk_enabled.set(0) cookie_refreshes_total.labels(result="failure").inc() with status_lock: status.errors.append( "home.co.uk cookies expired and refresh failed" ) + progress.update("hk", len(shuffled)) + return except Exception as e: - msg = f"Error scraping home.co.uk {outcode}/{channel_name}: {e}" - log.error(msg) + log.error("home.co.uk %s/%s: %s", outcode, ch, e) scrape_errors_total.labels(source="homecouk").inc() - with status_lock: - status.errors.append(msg) - # --- OpenRent (RENT channel only) --- - if or_client and not or_failed and channel_name == "RENT": - made_requests = True - try: - or_props = openrent_search_outcode( - or_client, - outcode, - pc_index, - pc_coords, - ) - for p in or_props: - pid = p["id"] - key = _dedup_key(p) - if pid in all_properties or key in seen_dedup_keys: - or_dedup_count += 1 - cross_source_dedup_total.labels(channel="rent").inc() - continue - all_properties[pid] = p - seen_dedup_keys.add(key) - or_count += 1 - if or_props: - log.info( - "OpenRent %s: +%d properties", outcode, len(or_props) - ) - except WafChallengeError: + progress.update("hk", i + 1) + time.sleep(DELAY_BETWEEN_OUTCODES) + except Exception as e: + log.exception("Fatal home.co.uk error: %s", e) + with status_lock: + status.errors.append(f"Fatal home.co.uk: {e}") + finally: + try: + client.close() + except Exception: + pass + + def or_worker(): + or_result = load_openrent_cookies() + if not or_result: + log.info("OpenRent DISABLED (no cookies available)") + openrent_enabled.set(0) + progress.update("or", len(shuffled)) + return + client = make_openrent_client(*or_result) + log.info("OpenRent scraping ENABLED") + openrent_enabled.set(1) + try: + for i, outcode in enumerate(shuffled): + # OpenRent is RENT-only + try: + props = openrent_search_outcode( + client, outcode, pc_index, pc_coords + ) + or_results["RENT"].extend(props) + if props: + log.info("OpenRent %s: +%d properties", outcode, len(props)) + except WafChallengeError: + log.warning( + "OpenRent WAF cookies expired — attempting refresh" + ) + client.close() + or_new = load_openrent_cookies() + if or_new: + client = make_openrent_client(*or_new) + log.info("OpenRent cookies refreshed, continuing") + cookie_refreshes_total.labels(result="success").inc() + else: log.warning( - "OpenRent WAF cookies expired — attempting refresh via FlareSolverr" + "Cookie refresh failed, disabling OpenRent" ) - or_client.close() - or_result = load_openrent_cookies() - if or_result: - or_client = make_openrent_client(*or_result) - log.info("OpenRent cookies refreshed, continuing") - cookie_refreshes_total.labels(result="success").inc() - else: - log.warning( - "Cookie refresh failed, disabling OpenRent for rest of scrape" - ) - or_client = None - or_failed = True - openrent_enabled.set(0) - cookie_refreshes_total.labels(result="failure").inc() - with status_lock: - status.errors.append( - "OpenRent WAF cookies expired and refresh failed" - ) - except Exception as e: - msg = f"Error scraping OpenRent {outcode}/{channel_name}: {e}" - log.error(msg) - scrape_errors_total.labels(source="openrent").inc() + openrent_enabled.set(0) + cookie_refreshes_total.labels(result="failure").inc() with status_lock: - status.errors.append(msg) - - # --- Zoopla --- - if zp_page and not zp_failed: - made_requests = True - try: - zp_props = zoopla_search_outcode( - zp_page, - outcode, - channel_name, - pc_index, - pc_coords, - ) - for p in zp_props: - pid = p["id"] - key = _dedup_key(p) - if pid in all_properties or key in seen_dedup_keys: - zp_dedup_count += 1 - cross_source_dedup_total.labels( - channel="buy" if channel_name == "BUY" else "rent", - ).inc() - continue - all_properties[pid] = p - seen_dedup_keys.add(key) - zp_count += 1 - if zp_props: - log.info( - "Zoopla %s: +%d properties", outcode, len(zp_props) + status.errors.append( + "OpenRent WAF cookies expired and refresh failed" ) + progress.update("or", len(shuffled)) + return + except Exception as e: + log.error("OpenRent %s: %s", outcode, e) + scrape_errors_total.labels(source="openrent").inc() + + progress.update("or", i + 1) + time.sleep(DELAY_BETWEEN_OUTCODES) + except Exception as e: + log.exception("Fatal OpenRent error: %s", e) + with status_lock: + status.errors.append(f"Fatal OpenRent: {e}") + finally: + try: + client.close() + except Exception: + pass + + def zp_worker(): + try: + browser, page = launch_zoopla_browser() + log.info("Zoopla scraping ENABLED (Camoufox browser launched)") + zoopla_enabled.set(1) + except TurnstileError: + log.warning("Zoopla Cloudflare Turnstile failed — disabling Zoopla") + zoopla_enabled.set(0) + progress.update("zp", len(shuffled)) + return + except Exception as e: + log.warning("Zoopla browser launch failed: %s — disabling Zoopla", e) + zoopla_enabled.set(0) + progress.update("zp", len(shuffled)) + return + + try: + for i, outcode in enumerate(shuffled): + for ch_cfg in CHANNELS: + ch = ch_cfg["channel"] + try: + props = zoopla_search_outcode( + page, outcode, ch, pc_index, pc_coords + ) + zp_results[ch].extend(props) + if props: + log.info("Zoopla %s: +%d properties", outcode, len(props)) except TurnstileError: log.warning( - "Zoopla Cloudflare challenge failed — attempting browser relaunch" + "Zoopla Turnstile challenge — relaunching browser" ) try: - zp_browser.close() + browser.close() except Exception: pass try: - zp_browser, zp_page = launch_zoopla_browser() + browser, page = launch_zoopla_browser() log.info("Zoopla browser relaunched, continuing") except Exception: log.warning( - "Browser relaunch failed, disabling Zoopla for rest of scrape" + "Browser relaunch failed, disabling Zoopla" ) - zp_page = None - zp_browser = None - zp_failed = True zoopla_enabled.set(0) with status_lock: status.errors.append( - "Zoopla Cloudflare challenge failed and browser relaunch failed" + "Zoopla Cloudflare challenge failed and relaunch failed" ) + progress.update("zp", len(shuffled)) + return except Exception as e: - msg = f"Error scraping Zoopla {outcode}/{channel_name}: {e}" - log.error(msg) + log.error("Zoopla %s/%s: %s", outcode, ch, e) scrape_errors_total.labels(source="zoopla").inc() - with status_lock: - status.errors.append(msg) - with status_lock: - if channel_name == "BUY": - status.properties_buy = len(all_properties) - else: - status.properties_rent = len(all_properties) - status.rm_properties = rm_count - status.hk_properties = hk_count - status.or_properties = or_count - status.zp_properties = zp_count - _sync_gauges() + progress.update("zp", i + 1) + time.sleep(DELAY_BETWEEN_OUTCODES) + except Exception as e: + log.exception("Fatal Zoopla error: %s", e) + with status_lock: + status.errors.append(f"Fatal Zoopla: {e}") + finally: + try: + browser.close() + except Exception: + pass - # Log progress every 100 outcodes - done = i + 1 - elapsed = time.time() - channel_start - if done % 100 == 0 or done == len(shuffled): - pct = done * 100 // len(shuffled) - rate = done / elapsed if elapsed > 0 else 0 - log.info( - "%s %d/%d (%d%%) — %d props, %s elapsed, %.1f outcodes/min", - channel_name, - done, - len(shuffled), - pct, - len(all_properties), - _fmt_elapsed(elapsed), - rate * 60, - ) + # --- Launch worker threads --- - # Log when crossing a 10k property milestone - current_milestone = len(all_properties) // 10_000 - if current_milestone > prev_prop_milestone: - prev_prop_milestone = current_milestone - log.info( - "%s %dk properties (rm: %d, hk: %d, or: %d, zp: %d) at outcode %d/%d [%s]", - channel_name, - current_milestone * 10, - rm_count, - hk_count, - or_count, - zp_count, - done, - len(shuffled), - _fmt_elapsed(elapsed), - ) + active_sources: list[str] = [] + threads: list[threading.Thread] = [] - if made_requests and i < len(shuffled) - 1: - time.sleep(DELAY_BETWEEN_OUTCODES) + if SCRAPE_RIGHTMOVE: + threads.append(threading.Thread(target=rm_worker, name="scrape-rm", daemon=True)) + active_sources.append("rm") + if SCRAPE_HOMECOUK: + threads.append(threading.Thread(target=hk_worker, name="scrape-hk", daemon=True)) + active_sources.append("hk") + if SCRAPE_OPENRENT: + threads.append(threading.Thread(target=or_worker, name="scrape-or", daemon=True)) + active_sources.append("or") + if SCRAPE_ZOOPLA: + threads.append(threading.Thread(target=zp_worker, name="scrape-zp", daemon=True)) + active_sources.append("zp") - # Write parquet - deduped = list(all_properties.values()) + log.info( + "=== Starting scrape: %d outcodes, sources: %s ===", + len(shuffled), + ", ".join(active_sources), + ) + + for t in threads: + t.start() + + # --- Monitor progress while workers run --- + + scrape_start = time.time() + last_log = 0.0 + + try: + while any(t.is_alive() for t in threads): + snap = progress.snapshot() + min_done = min( + (snap.get(s, 0) for s in active_sources), default=0 + ) + + # Count properties across sources (safe: only one thread writes each list) + total_buy = sum( + len(r["BUY"]) for r in [rm_results, hk_results, or_results, zp_results] + ) + total_rent = sum( + len(r["RENT"]) for r in [rm_results, hk_results, or_results, zp_results] + ) + + with status_lock: + status.outcodes_done = min_done + status.outcodes_total = len(shuffled) + status.properties_buy = total_buy + status.properties_rent = total_rent + status.rm_properties = len(rm_results["BUY"]) + len(rm_results["RENT"]) + status.hk_properties = len(hk_results["BUY"]) + len(hk_results["RENT"]) + status.or_properties = len(or_results["RENT"]) + status.zp_properties = len(zp_results["BUY"]) + len(zp_results["RENT"]) + _sync_gauges() + + # Log progress every 30 seconds + now = time.time() + if now - last_log >= 30: + elapsed = now - scrape_start + per_source = ", ".join( + f"{s}:{snap.get(s, 0)}" for s in active_sources + ) + log.info( + "Progress: %d/%d outcodes (%s), %d buy + %d rent props, %s elapsed", + min_done, + len(shuffled), + per_source, + total_buy, + total_rent, + _fmt_elapsed(elapsed), + ) + last_log = now + + time.sleep(5) + except Exception as e: + log.exception("Monitor loop error: %s", e) + + for t in threads: + t.join() + + log.info("All source workers completed") + + # --- Merge results per channel and write parquet --- + + try: + for ch_cfg in CHANNELS: + ch = ch_cfg["channel"] + file_suffix = "buy" if ch == "BUY" else "rent" + + merged, counts, total_dedup = _merge_channel( + rm_results[ch], + hk_results[ch], + or_results[ch], + zp_results[ch], + ) + + # Update cross-source dedup counter + ch_label = "buy" if ch == "BUY" else "rent" + if total_dedup: + cross_source_dedup_total.labels(channel=ch_label).inc(total_dedup) + + deduped = list(merged.values()) output_path = DATA_DIR / f"online_listings_{file_suffix}.parquet" write_parquet(deduped, output_path, channel=file_suffix) with status_lock: - if channel_name == "BUY": + if ch == "BUY": status.properties_buy = len(deduped) else: status.properties_rent = len(deduped) - status.outcodes_done = len(shuffled) _sync_gauges() log.info( - "=== %s channel complete: %d unique (rm: %d, hk: %d, or: %d, zp: %d, cross-dedup: %d) ===", - channel_name, + "=== %s complete: %d unique (rm:%d hk:%d or:%d zp:%d, cross-dedup:%d) ===", + ch, len(deduped), - rm_count, - hk_count, - or_count, - zp_count, - hk_dedup_count + or_dedup_count + zp_dedup_count, + counts["rm"], + counts["hk"], + counts["or"], + counts["zp"], + total_dedup, ) with status_lock: status.state = "done" status.finished_at = time.time() + status.outcodes_done = len(shuffled) _sync_gauges() elapsed = status.finished_at - status.started_at log.info( @@ -598,26 +672,18 @@ def run_scrape( body.get("elapsed_ms", 0), ) else: - log.warning("Server reload failed (%d): %s", resp.status_code, resp.text[:200]) + log.warning( + "Server reload failed (%d): %s", + resp.status_code, + resp.text[:200], + ) except Exception as e: log.warning("Server reload request failed: %s", e) except Exception as e: - log.exception("Fatal scrape error") + log.exception("Fatal scrape error during merge/write") with status_lock: status.state = "error" status.errors.append(f"Fatal: {e}") status.finished_at = time.time() _sync_gauges() - finally: - if client: - client.close() - if hk_client: - hk_client.close() - if or_client: - or_client.close() - if zp_browser: - try: - zp_browser.close() - except Exception: - pass