From 1dfa0e0009faefed8331b33333c3630da38f328d Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 24 Mar 2026 22:30:49 +0000 Subject: [PATCH] fix zoopla bug --- finder/constants.py | 1 + finder/openrent.py | 8 +- finder/scraper.py | 201 +++++++++++++++++++++++++++++++++++++++++++- finder/zoopla.py | 9 +- 4 files changed, 215 insertions(+), 4 deletions(-) diff --git a/finder/constants.py b/finder/constants.py index 09f9fd8..f10d5da 100644 --- a/finder/constants.py +++ b/finder/constants.py @@ -10,6 +10,7 @@ MAX_RETRIES = 3 RETRY_BASE_DELAY = 2.0 GRID_CELL_SIZE = 0.01 # degrees for postcode spatial index SEED = 42 +CHECKPOINT_INTERVAL = int(os.environ.get("CHECKPOINT_INTERVAL", "900")) # seconds # Schedule: hour of day (UTC) to auto-run scrape. Set to -1 to disable. SCHEDULE_HOUR = int(os.environ.get("SCHEDULE_HOUR", "3")) diff --git a/finder/openrent.py b/finder/openrent.py index 1737745..791e79c 100644 --- a/finder/openrent.py +++ b/finder/openrent.py @@ -624,9 +624,13 @@ def _resolve_outcode_postcodes( pc_coords: dict[str, tuple[float, float]], ) -> list[str]: """Get all postcodes for an outcode from the postcode coordinates lookup.""" + # ONSPD 7-char format: 4-char outcodes have no space before incode + # (e.g., "BH191AB"), while shorter outcodes do (e.g., "E14 5AB"). prefix = outcode + " " - # Also try without space for non-standard format (e.g. "SW1Y" matches "SW1Y 4AA") - return [pcd for pcd in pc_coords if pcd.startswith(prefix)] + results = [pcd for pcd in pc_coords if pcd.startswith(prefix)] + if not results and len(outcode) >= 4: + results = [pcd for pcd in pc_coords if pcd.startswith(outcode) and len(pcd) > len(outcode)] + return results def transform_property( diff --git a/finder/scraper.py b/finder/scraper.py index 1d2ccf4..4f81aee 100644 --- a/finder/scraper.py +++ b/finder/scraper.py @@ -1,3 +1,4 @@ +import json import logging import random import threading @@ -11,6 +12,7 @@ import httpx from constants import ( ARCGIS_PATH, CHANNELS, + CHECKPOINT_INTERVAL, DATA_DIR, DELAY_BETWEEN_OUTCODES, RELOAD_URL, @@ -233,6 +235,135 @@ def _merge_channel( return all_properties, counts, total_dedup +# --------------------------------------------------------------------------- +# Checkpointing — save/resume partial results across crashes +# --------------------------------------------------------------------------- + + +def _checkpoint_meta_path(): + return DATA_DIR / "checkpoint.json" + + +def _checkpoint_results_path(source: str, channel: str): + return DATA_DIR / f"checkpoint_{source}_{channel}.json" + + +def _save_checkpoint( + shuffled: list[str], + progress: _Progress, + source_results: dict[str, dict[str, list]], + active_sources: list[str], +) -> None: + """Save per-source progress indices and partial results to disk. + + Writes atomically (temp + rename) so a crash mid-write leaves the previous + checkpoint intact. + """ + snap = progress.snapshot() + + meta = { + "seed": SEED, + "num_outcodes": len(shuffled), + "sources": {s: snap.get(s, 0) for s in active_sources}, + "timestamp": time.time(), + } + + # Write result files per source per channel + for source in active_sources: + results = source_results.get(source, {}) + for ch_key in ("BUY", "RENT"): + props = results.get(ch_key, []) + path = _checkpoint_results_path(source, ch_key.lower()) + tmp = path.with_suffix(".tmp") + try: + with open(tmp, "w") as f: + json.dump(props, f, default=str) + tmp.rename(path) + except Exception as e: + log.warning("Failed to write checkpoint %s: %s", path.name, e) + + # Write metadata atomically + tmp = _checkpoint_meta_path().with_suffix(".tmp") + try: + with open(tmp, "w") as f: + json.dump(meta, f) + tmp.rename(_checkpoint_meta_path()) + except Exception as e: + log.warning("Failed to write checkpoint metadata: %s", e) + return + + total = sum(len(source_results.get(s, {}).get(ch, [])) + for s in active_sources for ch in ("BUY", "RENT")) + log.info( + "Checkpoint saved: %s (%d properties)", + {s: snap.get(s, 0) for s in active_sources}, + total, + ) + + +def _load_checkpoint( + shuffled: list[str], +) -> tuple[dict[str, int], dict[str, dict[str, list]]] | None: + """Load checkpoint if it exists and matches the current outcode list. + + Returns (start_indices, loaded_results) or None if no valid checkpoint. + """ + path = _checkpoint_meta_path() + if not path.exists(): + return None + + try: + with open(path) as f: + meta = json.load(f) + except Exception: + log.warning("Checkpoint file corrupt, starting fresh") + _clear_checkpoint() + return None + + if meta.get("seed") != SEED or meta.get("num_outcodes") != len(shuffled): + log.info("Checkpoint from different run configuration, discarding") + _clear_checkpoint() + return None + + start_indices: dict[str, int] = {} + loaded_results: dict[str, dict[str, list]] = {} + + for source, completed in meta.get("sources", {}).items(): + start_indices[source] = completed + loaded_results[source] = {"BUY": [], "RENT": []} + for channel in ("buy", "rent"): + rpath = _checkpoint_results_path(source, channel) + if rpath.exists(): + try: + with open(rpath) as f: + loaded_results[source][channel.upper()] = json.load(f) + except Exception: + log.warning( + "Checkpoint results for %s/%s corrupt, restarting %s", + source, channel, source, + ) + start_indices[source] = 0 + loaded_results[source] = {"BUY": [], "RENT": []} + break + + elapsed_since = time.time() - meta.get("timestamp", 0) + log.info( + "Resuming from checkpoint (saved %.0fm ago): %s", + elapsed_since / 60, + start_indices, + ) + return start_indices, loaded_results + + +def _clear_checkpoint() -> None: + """Remove all checkpoint files after successful completion.""" + for path in DATA_DIR.glob("checkpoint*"): + try: + path.unlink() + except Exception: + pass + + def run_scrape( outcodes: list[str], pc_index: PostcodeSpatialIndex, @@ -293,15 +424,40 @@ def run_scrape( progress = _Progress() + # --- Resume from checkpoint if available --- + start_indices: dict[str, int] = {} + checkpoint = _load_checkpoint(shuffled) + if checkpoint: + start_indices, loaded = checkpoint + source_to_results = {"rm": rm_results, "hk": hk_results, "or": or_results, "zp": zp_results} + for src, data in loaded.items(): + if src in source_to_results: + for ch in ("BUY", "RENT"): + source_to_results[src][ch] = data.get(ch, []) + # Reassign in case references changed + rm_results = source_to_results["rm"] + hk_results = source_to_results["hk"] + or_results = source_to_results["or"] + zp_results = source_to_results["zp"] + # Pre-set progress for resumed sources + for src, idx in start_indices.items(): + if idx > 0: + progress.update(src, idx) + # --- Source worker closures --- # Each worker owns its client lifecycle and iterates all outcodes for both # channels. On auth failure, it refreshes cookies and continues. On fatal # failure, it marks itself as done and returns partial results. def rm_worker(): + rm_start = start_indices.get("rm", 0) + if rm_start > 0: + log.info("Rightmove resuming from outcode %d/%d", rm_start, len(shuffled)) client = make_client() try: for i, outcode in enumerate(shuffled): + if i < rm_start: + continue try: outcode_id = resolve_outcode_id(client, outcode) except Exception as e: @@ -344,11 +500,16 @@ def run_scrape( homecouk_enabled.set(0) progress.update("hk", len(shuffled)) return + hk_start = start_indices.get("hk", 0) + if hk_start > 0: + log.info("home.co.uk resuming from outcode %d/%d", hk_start, len(shuffled)) client = make_homecouk_client(*hk_result) log.info("home.co.uk scraping ENABLED") homecouk_enabled.set(1) try: for i, outcode in enumerate(shuffled): + if i < hk_start: + continue for ch_cfg in CHANNELS: ch = ch_cfg["channel"] try: @@ -403,11 +564,16 @@ def run_scrape( openrent_enabled.set(0) progress.update("or", len(shuffled)) return + or_start = start_indices.get("or", 0) + if or_start > 0: + log.info("OpenRent resuming from outcode %d/%d", or_start, len(shuffled)) client = make_openrent_client(*or_result) log.info("OpenRent scraping ENABLED") openrent_enabled.set(1) try: for i, outcode in enumerate(shuffled): + if i < or_start: + continue # OpenRent is RENT-only try: props = openrent_search_outcode( @@ -470,8 +636,14 @@ def run_scrape( progress.update("zp", len(shuffled)) return + zp_start = start_indices.get("zp", 0) + if zp_start > 0: + log.info("Zoopla resuming from outcode %d/%d", zp_start, len(shuffled)) + try: for i, outcode in enumerate(shuffled): + if i < zp_start: + continue search_url = None for ch_cfg in CHANNELS: ch = ch_cfg["channel"] @@ -559,8 +731,15 @@ def run_scrape( # --- Monitor progress while workers run --- + # Map source names to result dicts for checkpointing + source_results_map = { + "rm": rm_results, "hk": hk_results, + "or": or_results, "zp": zp_results, + } + scrape_start = time.time() last_log = 0.0 + last_checkpoint = time.time() try: while any(t.is_alive() for t in threads): @@ -588,8 +767,9 @@ def run_scrape( status.zp_properties = len(zp_results["BUY"]) + len(zp_results["RENT"]) _sync_gauges() - # Log progress every 30 seconds now = time.time() + + # Log progress every 30 seconds if now - last_log >= 30: elapsed = now - scrape_start per_source = ", ".join( @@ -606,10 +786,26 @@ def run_scrape( ) last_log = now + # Save checkpoint periodically + if now - last_checkpoint >= CHECKPOINT_INTERVAL: + try: + _save_checkpoint( + shuffled, progress, source_results_map, active_sources, + ) + except Exception as e: + log.warning("Checkpoint save failed: %s", e) + last_checkpoint = now + time.sleep(5) except Exception as e: log.exception("Monitor loop error: %s", e) + # Save final checkpoint before joining (in case merge/write fails) + try: + _save_checkpoint(shuffled, progress, source_results_map, active_sources) + except Exception: + pass + for t in threads: t.join() @@ -656,6 +852,9 @@ def run_scrape( total_dedup, ) + # Scrape completed successfully — clear checkpoint + _clear_checkpoint() + with status_lock: status.state = "done" status.finished_at = time.time() diff --git a/finder/zoopla.py b/finder/zoopla.py index dbb59cc..59372ad 100644 --- a/finder/zoopla.py +++ b/finder/zoopla.py @@ -626,9 +626,16 @@ def transform_property( # Try outcode-level fallback outcode = _extract_outcode(address) if outcode: + # ONSPD 7-char format: 4-char outcodes have no space before incode + # (e.g., "BH191AB"), while shorter outcodes do (e.g., "E14 5AB"). + # Check both formats to handle all outcode lengths. prefix = outcode + " " for pcd, coords in pc_coords.items(): - if pcd.startswith(prefix): + if pcd.startswith(prefix) or ( + len(outcode) >= 4 + and pcd.startswith(outcode) + and len(pcd) > len(outcode) + ): postcode = pcd lat, lng = coords break