perfect-postcode/finder/scraper.py
2026-03-24 22:30:49 +00:00

899 lines
32 KiB
Python

import json
import logging
import random
import threading
import time
from dataclasses import dataclass, field
import polars as pl
import httpx
from constants import (
ARCGIS_PATH,
CHANNELS,
CHECKPOINT_INTERVAL,
DATA_DIR,
DELAY_BETWEEN_OUTCODES,
RELOAD_URL,
SCRAPE_HOMECOUK,
SCRAPE_OPENRENT,
SCRAPE_RIGHTMOVE,
SCRAPE_ZOOPLA,
SEED,
)
from homecouk import CookiesExpiredError
from homecouk import load_cookies as load_homecouk_cookies
from homecouk import make_client as make_homecouk_client
from homecouk import search_outcode as homecouk_search_outcode
from http_client import make_client
from metrics import (
cookie_refreshes_total,
cross_source_dedup_total,
homecouk_enabled,
openrent_enabled,
scrape_elapsed_seconds,
scrape_errors_total,
scrape_outcodes_done,
scrape_outcodes_total,
scrape_properties_total,
scrape_state,
zoopla_enabled,
)
from openrent import WafChallengeError
from openrent import load_cookies as load_openrent_cookies
from openrent import make_client as make_openrent_client
from openrent import search_outcode as openrent_search_outcode
from rightmove import resolve_outcode_id, search_outcode
from zoopla import TurnstileError
from zoopla import launch_browser as launch_zoopla_browser
from zoopla import search_outcode as zoopla_search_outcode
from spatial import PostcodeSpatialIndex
from storage import write_parquet
# Module-wide logger. Note that the logger name is "rightmove" even though this
# module orchestrates every source (Rightmove, home.co.uk, OpenRent, Zoopla).
log = logging.getLogger("rightmove")
@dataclass
class ScrapeStatus:
    """Mutable record of scrape progress that is mirrored into Prometheus gauges.

    Field order is part of the interface (positional construction), so new
    fields should only ever be appended.
    """

    # Lifecycle state: one of "idle", "running", "done" or "error".
    state: str = "idle"
    # Free-form channel / outcode markers; both are reset to "" when a run starts.
    channel: str = ""
    outcode: str = ""

    # Outcode progress counters.
    outcodes_done: int = 0
    outcodes_total: int = 0

    # Property counts per channel.
    properties_buy: int = 0
    properties_rent: int = 0

    # Property counts per source, combined across channels
    # (rm = Rightmove, hk = home.co.uk, or = OpenRent, zp = Zoopla).
    rm_properties: int = 0
    hk_properties: int = 0
    or_properties: int = 0
    zp_properties: int = 0

    # Human-readable error messages collected during a run.
    errors: list[str] = field(default_factory=list)

    # Epoch timestamps (time.time()); 0.0 means "not set".
    started_at: float = 0.0
    finished_at: float = 0.0
# Process-wide scrape status. run_scrape and its worker threads mutate this
# instance while holding status_lock.
status = ScrapeStatus()
# Guards `status`; _sync_gauges() expects the caller to hold it.
status_lock = threading.Lock()
def _sync_gauges() -> None:
    """Mirror the current ScrapeStatus into the Prometheus gauges.

    The caller must already hold status_lock.
    """
    # One-hot encode the lifecycle state across the four state labels.
    current_state = status.state
    for name in ("idle", "running", "done", "error"):
        scrape_state.labels(state=name).set(int(current_state == name))

    scrape_outcodes_done.set(status.outcodes_done)
    scrape_outcodes_total.set(status.outcodes_total)

    # Per-channel totals across all sources.
    channel_totals = (
        ("buy", status.properties_buy),
        ("rent", status.properties_rent),
    )
    for channel, total in channel_totals:
        scrape_properties_total.labels(channel=channel, source="total").set(total)

    # Per-source counts are combined across channels, so the same value is
    # published under both channel labels.
    source_counts = (
        ("rightmove", status.rm_properties),
        ("homecouk", status.hk_properties),
        ("openrent", status.or_properties),
        ("zoopla", status.zp_properties),
    )
    for channel in ("buy", "rent"):
        for source, count in source_counts:
            scrape_properties_total.labels(channel=channel, source=source).set(count)

    if not status.started_at:
        scrape_elapsed_seconds.set(0)
        return

    # While a run is in flight (finished_at unset) measure against "now".
    end = status.finished_at or time.time()
    scrape_elapsed_seconds.set(end - status.started_at)
def load_outcodes() -> list[str]:
    """Load England-only outcodes from the arcgis parquet file.

    Returns:
        Sorted list of unique outcodes, extracted from the leading part of
        each England postcode (country code "E92000001"). Postcodes that do
        not match the outcode pattern are dropped.
    """
    log.info("Loading outcodes from %s", ARCGIS_PATH)
    # Only the postcode and country columns are used here; the coordinate
    # columns are not needed, so they are not read from disk.
    df = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry"])
    england = df.filter(pl.col("ctry") == "E92000001")
    log.info("England postcodes: %d", len(england))
    outcodes = (
        england.select(
            pl.col("pcd").str.extract(r"^([A-Z]{1,2}\d[A-Z0-9]?)", 1).alias("outcode")
        )
        .drop_nulls()
        .get_column("outcode")
        .unique()
        .sort()
        .to_list()
    )
    log.info("Unique England outcodes: %d", len(outcodes))
    return outcodes
def build_postcode_index() -> PostcodeSpatialIndex:
    """Create a PostcodeSpatialIndex over England postcodes that have coordinates.

    Rows with a null latitude or longitude are excluded before the index is built.
    """
    log.info("Building postcode spatial index from %s", ARCGIS_PATH)
    frame = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry", "lat", "long"])
    is_england = pl.col("ctry") == "E92000001"
    located = frame.filter(is_england).drop_nulls(subset=["lat", "long"])
    lats = located.get_column("lat").to_list()
    lngs = located.get_column("long").to_list()
    postcodes = located.get_column("pcd").to_list()
    return PostcodeSpatialIndex(lats, lngs, postcodes)
def build_postcode_coords() -> dict[str, tuple[float, float]]:
    """Return a postcode → (lat, lng) mapping for England postcodes.

    Rows without a latitude or longitude are skipped. run_scrape builds this
    lookup when OpenRent or Zoopla scraping is enabled and no mapping was
    supplied by the caller.
    """
    log.info("Building postcode coords lookup from %s", ARCGIS_PATH)
    frame = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry", "lat", "long"])
    located = frame.filter(pl.col("ctry") == "E92000001").drop_nulls(
        subset=["lat", "long"]
    )
    postcodes = located.get_column("pcd").to_list()
    lats = located.get_column("lat").to_list()
    lngs = located.get_column("long").to_list()
    # If a postcode appears more than once, the last row wins.
    coords: dict[str, tuple[float, float]] = dict(zip(postcodes, zip(lats, lngs)))
    log.info("Postcode coords lookup: %d postcodes", len(coords))
    return coords
def _fmt_elapsed(seconds: float) -> str:
"""Format seconds as e.g. '2h13m' or '5m32s'."""
h, rem = divmod(int(seconds), 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h{m:02d}m"
return f"{m}m{s:02d}s"
def _dedup_key(p: dict) -> tuple:
"""Composite key for cross-source deduplication: (postcode, bedrooms, price).
Two listings on different portals for the same physical property will share
these attributes even though their IDs differ."""
return (p.get("Postcode", ""), p.get("Bedrooms", 0), p.get("price", 0))
class _Progress:
"""Thread-safe progress tracker for parallel source workers."""
def __init__(self):
self._counts: dict[str, int] = {}
self._lock = threading.Lock()
def update(self, source: str, done: int) -> None:
with self._lock:
self._counts[source] = done
def snapshot(self) -> dict[str, int]:
with self._lock:
return dict(self._counts)
def _merge_channel(
    rm_props: list[dict],
    hk_props: list[dict],
    or_props: list[dict],
    zp_props: list[dict],
) -> tuple[dict[str, dict], dict[str, int], int]:
    """Merge properties from all sources for one channel with cross-source dedup.

    Rightmove has priority: its listings are added first (deduplicated by ID
    only). Listings from the other sources are then dropped when their ID is
    already present or when their composite key (see _dedup_key) matches a
    listing that was already kept.

    The composite key is only used when the listing has a postcode. Without
    one, the key degenerates to the defaults ("" plus bedrooms/price), and
    unrelated listings that merely share bedrooms and price would otherwise be
    discarded as "duplicates" of each other.

    Args:
        rm_props: Rightmove listings (each dict must have an "id").
        hk_props: home.co.uk listings.
        or_props: OpenRent listings.
        zp_props: Zoopla listings.

    Returns:
        (all_properties_by_id, per_source_counts, total_dedup_count) where
        per_source_counts has keys "rm", "hk", "or", "zp" and
        total_dedup_count counts dropped non-Rightmove listings.
    """
    all_properties: dict[str, dict] = {}
    seen_keys: set[tuple] = set()
    counts = {"rm": 0, "hk": 0, "or": 0, "zp": 0}
    total_dedup = 0

    # Rightmove first (priority source)
    for p in rm_props:
        pid = p["id"]
        if pid in all_properties:
            continue
        all_properties[pid] = p
        key = _dedup_key(p)
        # key[0] is the postcode; keys without one identify nothing.
        if key[0]:
            seen_keys.add(key)
        counts["rm"] += 1

    # Other sources (check for cross-source duplicates)
    for source, props in (("hk", hk_props), ("or", or_props), ("zp", zp_props)):
        for p in props:
            pid = p["id"]
            key = _dedup_key(p)
            matchable = bool(key[0])
            if pid in all_properties or (matchable and key in seen_keys):
                total_dedup += 1
                continue
            all_properties[pid] = p
            if matchable:
                seen_keys.add(key)
            counts[source] += 1

    return all_properties, counts, total_dedup
# ---------------------------------------------------------------------------
# Checkpointing — save/resume partial results across crashes
# ---------------------------------------------------------------------------
def _checkpoint_meta_path():
    """Return the location of the checkpoint metadata file inside DATA_DIR."""
    meta_filename = "checkpoint.json"
    return DATA_DIR / meta_filename
def _checkpoint_results_path(source: str, channel: str):
    """Return the checkpoint results file for one source/channel pair in DATA_DIR."""
    filename = "checkpoint_{}_{}.json".format(source, channel)
    return DATA_DIR / filename
def _save_checkpoint(
    shuffled: list[str],
    progress: _Progress,
    source_results: dict[str, dict[str, list]],
    active_sources: list[str],
) -> None:
    """Save per-source progress indices and partial results to disk.

    Each file is written to a temporary sibling and then moved over the real
    path, so a crash mid-write leaves the previous checkpoint intact.

    Ordering matters: the progress snapshot is taken first, result files are
    written next, and the metadata (which carries the indices) is written
    last. The stored results therefore always cover at least the outcodes the
    indices claim. If any result file cannot be written, the metadata is left
    untouched, so the indices on disk never run ahead of the data on disk.

    Args:
        shuffled: Outcode list in scrape order (only its length is stored).
        progress: Tracker holding the number of completed outcodes per source.
        source_results: source → {"BUY": [...], "RENT": [...]} partial results.
        active_sources: Sources to include in this checkpoint.
    """
    snap = progress.snapshot()
    completed = {s: snap.get(s, 0) for s in active_sources}
    meta = {
        "seed": SEED,
        "num_outcodes": len(shuffled),
        "sources": completed,
        "timestamp": time.time(),
    }

    # Write result files per source per channel
    results_ok = True
    for source in active_sources:
        results = source_results.get(source, {})
        for ch_key in ("BUY", "RENT"):
            props = results.get(ch_key, [])
            path = _checkpoint_results_path(source, ch_key.lower())
            tmp = path.with_suffix(".tmp")
            try:
                with open(tmp, "w") as f:
                    # default=str stringifies any value json cannot encode.
                    json.dump(props, f, default=str)
                # replace() overwrites an existing target on every platform;
                # rename() raises FileExistsError on Windows.
                tmp.replace(path)
            except Exception as e:
                log.warning("Failed to write checkpoint %s: %s", path.name, e)
                results_ok = False

    if not results_ok:
        log.warning(
            "Checkpoint metadata not updated because some result files failed to write"
        )
        return

    # Write metadata atomically
    meta_path = _checkpoint_meta_path()
    tmp = meta_path.with_suffix(".tmp")
    try:
        with open(tmp, "w") as f:
            json.dump(meta, f)
        tmp.replace(meta_path)
    except Exception as e:
        log.warning("Failed to write checkpoint metadata: %s", e)
        return

    total = sum(
        len(source_results.get(s, {}).get(ch, []))
        for s in active_sources
        for ch in ("BUY", "RENT")
    )
    log.info(
        "Checkpoint saved: %s (%d properties)",
        completed,
        total,
    )
def _load_checkpoint(
    shuffled: list[str],
) -> tuple[dict[str, int], dict[str, dict[str, list]]] | None:
    """Load checkpoint if it exists and matches the current outcode list.

    A checkpoint is discarded (and its files removed) when the metadata is
    unreadable, is not a JSON object, or was produced with a different SEED or
    outcode count. A single source is restarted from index 0 with empty
    results when its stored index is invalid, when a result file is unreadable
    or not a JSON list, or when a result file is missing although the index
    says outcodes were already completed. Resuming in that last case would
    silently drop the listings of the completed outcodes.

    Args:
        shuffled: Outcode list in scrape order for the current run.

    Returns:
        (start_indices, loaded_results) or None if no valid checkpoint, where
        start_indices maps source → number of outcodes already completed and
        loaded_results maps source → {"BUY": [...], "RENT": [...]}.
    """
    path = _checkpoint_meta_path()
    if not path.exists():
        return None

    try:
        with open(path) as f:
            meta = json.load(f)
    except Exception:
        log.warning("Checkpoint file corrupt, starting fresh")
        _clear_checkpoint()
        return None

    # Valid JSON of the wrong shape (e.g. a list) is just as unusable.
    sources = meta.get("sources", {}) if isinstance(meta, dict) else None
    if not isinstance(sources, dict):
        log.warning("Checkpoint file corrupt, starting fresh")
        _clear_checkpoint()
        return None

    if meta.get("seed") != SEED or meta.get("num_outcodes") != len(shuffled):
        log.info("Checkpoint from different run configuration, discarding")
        _clear_checkpoint()
        return None

    start_indices: dict[str, int] = {}
    loaded_results: dict[str, dict[str, list]] = {}

    for source, completed in sources.items():
        start_indices[source] = 0
        loaded_results[source] = {"BUY": [], "RENT": []}

        if not isinstance(completed, int) or completed < 0:
            log.warning(
                "Checkpoint index for %s invalid, restarting %s", source, source
            )
            continue

        results: dict[str, list] = {"BUY": [], "RENT": []}
        usable = True
        for channel in ("buy", "rent"):
            rpath = _checkpoint_results_path(source, channel)
            if not rpath.exists():
                if completed > 0:
                    log.warning(
                        "Checkpoint results for %s/%s missing, restarting %s",
                        source, channel, source,
                    )
                    usable = False
                    break
                continue
            try:
                with open(rpath) as f:
                    data = json.load(f)
                if not isinstance(data, list):
                    raise ValueError("checkpoint results are not a list")
            except Exception:
                log.warning(
                    "Checkpoint results for %s/%s corrupt, restarting %s",
                    source, channel, source,
                )
                usable = False
                break
            results[channel.upper()] = data

        if usable:
            start_indices[source] = completed
            loaded_results[source] = results

    elapsed_since = time.time() - meta.get("timestamp", 0)
    log.info(
        "Resuming from checkpoint (saved %.0fm ago): %s",
        elapsed_since / 60,
        start_indices,
    )
    return start_indices, loaded_results
def _clear_checkpoint() -> None:
    """Remove every checkpoint file (metadata, results, temp files) from DATA_DIR.

    Called after a successful run and when a checkpoint is found to be corrupt
    or from a different run configuration. Deletion is best-effort, but
    failures are logged: a checkpoint left behind from a completed run would
    make the next run resume from it and skip the outcodes it records as done.
    """
    for path in DATA_DIR.glob("checkpoint*"):
        try:
            path.unlink()
        except FileNotFoundError:
            # Already gone; nothing to do.
            continue
        except OSError as e:
            log.warning("Failed to remove checkpoint file %s: %s", path.name, e)
def run_scrape(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]] | None = None,
) -> None:
    """Main scrape orchestrator — runs all sources in parallel threads.

    Each enabled source (Rightmove, home.co.uk, OpenRent, Zoopla) gets its own
    thread that iterates all outcodes in a SEED-determined shuffled order.
    While the workers run, this function publishes progress to `status` and
    the Prometheus gauges, logs progress every 30 seconds and saves a
    checkpoint every CHECKPOINT_INTERVAL seconds. After all workers finish,
    results are merged per channel with cross-source deduplication, written
    to parquet, the checkpoint is cleared and, if RELOAD_URL is set, a server
    reload is triggered.

    Args:
        outcodes: Outcodes to scrape. The list is copied, not mutated.
        pc_index: Spatial index passed through to the per-source search
            functions.
        pc_coords: Optional postcode → (lat, lng) lookup for OpenRent/Zoopla;
            built on demand when one of them is enabled and this is None.

    Errors inside workers are recorded in `status.errors` and the per-source
    error counters; they do not raise out of this function. A failure during
    merge/write sets `status.state` to "error".
    """
    # `status` is only mutated here, never rebound, so no `global` is needed.
    with status_lock:
        status.state = "running"
        status.started_at = time.time()
        status.finished_at = 0.0
        status.errors = []
        status.properties_buy = 0
        status.properties_rent = 0
        status.channel = ""
        status.outcode = ""
        # Clear counters left over from a previous run so the gauges do not
        # show stale values while this run is still setting up.
        status.outcodes_done = 0
        status.outcodes_total = len(outcodes)
        status.rm_properties = 0
        status.hk_properties = 0
        status.or_properties = 0
        status.zp_properties = 0
        _sync_gauges()

    shuffled = list(outcodes)
    # A private Random seeded with SEED gives the same permutation as
    # random.seed(SEED) + random.shuffle, which checkpoint indices rely on,
    # without reseeding the process-wide generator.
    random.Random(SEED).shuffle(shuffled)

    if not any([SCRAPE_RIGHTMOVE, SCRAPE_HOMECOUK, SCRAPE_OPENRENT, SCRAPE_ZOOPLA]):
        log.warning("All scrapers disabled — nothing to do")
        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            _sync_gauges()
        return

    if not SCRAPE_RIGHTMOVE:
        log.info("Rightmove scraping DISABLED (SCRAPE_RIGHTMOVE=false)")
    if not SCRAPE_HOMECOUK:
        log.info("home.co.uk scraping DISABLED (SCRAPE_HOMECOUK=false)")
        homecouk_enabled.set(0)
    if not SCRAPE_OPENRENT:
        log.info("OpenRent scraping DISABLED (SCRAPE_OPENRENT=false)")
        openrent_enabled.set(0)
    if not SCRAPE_ZOOPLA:
        log.info("Zoopla scraping DISABLED (SCRAPE_ZOOPLA=false)")
        zoopla_enabled.set(0)

    # Build postcode coords if needed for OpenRent/Zoopla
    if (SCRAPE_OPENRENT or SCRAPE_ZOOPLA) and pc_coords is None:
        pc_coords = build_postcode_coords()

    # Per-source result containers: {channel_name: [properties]}
    # Each list is only written by its owning source thread.
    rm_results: dict[str, list] = {"BUY": [], "RENT": []}
    hk_results: dict[str, list] = {"BUY": [], "RENT": []}
    or_results: dict[str, list] = {"BUY": [], "RENT": []}
    zp_results: dict[str, list] = {"BUY": [], "RENT": []}

    # Source name → result dict, used for resuming and for checkpointing.
    source_results_map = {
        "rm": rm_results, "hk": hk_results,
        "or": or_results, "zp": zp_results,
    }

    progress = _Progress()

    # --- Resume from checkpoint if available ---
    start_indices: dict[str, int] = {}
    checkpoint = _load_checkpoint(shuffled)
    if checkpoint is not None:
        start_indices, loaded = checkpoint
        for src, data in loaded.items():
            if src in source_results_map:
                # Replace the lists inside the existing dicts so the worker
                # closures (which captured those dicts) see the loaded data.
                for ch in ("BUY", "RENT"):
                    source_results_map[src][ch] = data.get(ch, [])
        # Pre-set progress for resumed sources
        for src, idx in start_indices.items():
            if idx > 0:
                progress.update(src, idx)

    # --- Source worker closures ---
    # Each worker owns its client lifecycle and iterates all outcodes for both
    # channels. On auth failure, it refreshes cookies and continues. On fatal
    # failure, it marks itself as done and returns partial results.

    def rm_worker():
        """Scrape Rightmove for every outcode, both channels."""
        rm_start = start_indices.get("rm", 0)
        if rm_start > 0:
            log.info("Rightmove resuming from outcode %d/%d", rm_start, len(shuffled))
        client = make_client()
        try:
            for i, outcode in enumerate(shuffled):
                if i < rm_start:
                    continue
                try:
                    outcode_id = resolve_outcode_id(client, outcode)
                except Exception as e:
                    log.error("Rightmove %s ID lookup: %s", outcode, e)
                    scrape_errors_total.labels(source="rightmove").inc()
                    progress.update("rm", i + 1)
                    time.sleep(DELAY_BETWEEN_OUTCODES)
                    continue
                if not outcode_id:
                    log.debug("No Rightmove ID for %s, skipping", outcode)
                    progress.update("rm", i + 1)
                    time.sleep(DELAY_BETWEEN_OUTCODES)
                    continue
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = search_outcode(
                            client, outcode_id, outcode, ch_cfg, pc_index
                        )
                        rm_results[ch].extend(props)
                    except Exception as e:
                        log.error("Rightmove %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="rightmove").inc()
                progress.update("rm", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal Rightmove error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal Rightmove: {e}")
        finally:
            client.close()

    def hk_worker():
        """Scrape home.co.uk for every outcode, refreshing cookies on expiry."""
        hk_result = load_homecouk_cookies()
        if not hk_result:
            log.info("home.co.uk DISABLED (no cookies available)")
            homecouk_enabled.set(0)
            # Report the source as finished so it does not hold back min_done.
            progress.update("hk", len(shuffled))
            return
        hk_start = start_indices.get("hk", 0)
        if hk_start > 0:
            log.info("home.co.uk resuming from outcode %d/%d", hk_start, len(shuffled))
        client = make_homecouk_client(*hk_result)
        log.info("home.co.uk scraping ENABLED")
        homecouk_enabled.set(1)
        try:
            for i, outcode in enumerate(shuffled):
                if i < hk_start:
                    continue
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = homecouk_search_outcode(
                            client, outcode, ch, pc_index
                        )
                        hk_results[ch].extend(props)
                        if props:
                            log.info("home.co.uk %s: +%d properties", outcode, len(props))
                    except CookiesExpiredError:
                        log.warning(
                            "home.co.uk cookies expired — attempting refresh"
                        )
                        client.close()
                        hk_new = load_homecouk_cookies()
                        if hk_new:
                            # NOTE: the request that triggered the refresh is
                            # not retried; scraping continues with the next one.
                            client = make_homecouk_client(*hk_new)
                            log.info("home.co.uk cookies refreshed, continuing")
                            cookie_refreshes_total.labels(result="success").inc()
                        else:
                            log.warning(
                                "Cookie refresh failed, disabling home.co.uk"
                            )
                            homecouk_enabled.set(0)
                            cookie_refreshes_total.labels(result="failure").inc()
                            with status_lock:
                                status.errors.append(
                                    "home.co.uk cookies expired and refresh failed"
                                )
                            progress.update("hk", len(shuffled))
                            return
                    except Exception as e:
                        log.error("home.co.uk %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="homecouk").inc()
                progress.update("hk", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal home.co.uk error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal home.co.uk: {e}")
        finally:
            # The client may already be closed (refresh-failure path above).
            try:
                client.close()
            except Exception:
                pass

    def or_worker():
        """Scrape OpenRent (RENT only) for every outcode, refreshing cookies on WAF challenge."""
        or_result = load_openrent_cookies()
        if not or_result:
            log.info("OpenRent DISABLED (no cookies available)")
            openrent_enabled.set(0)
            progress.update("or", len(shuffled))
            return
        or_start = start_indices.get("or", 0)
        if or_start > 0:
            log.info("OpenRent resuming from outcode %d/%d", or_start, len(shuffled))
        client = make_openrent_client(*or_result)
        log.info("OpenRent scraping ENABLED")
        openrent_enabled.set(1)
        try:
            for i, outcode in enumerate(shuffled):
                if i < or_start:
                    continue
                # OpenRent is RENT-only
                try:
                    props = openrent_search_outcode(
                        client, outcode, pc_index, pc_coords
                    )
                    or_results["RENT"].extend(props)
                    if props:
                        log.info("OpenRent %s: +%d properties", outcode, len(props))
                except WafChallengeError:
                    log.warning(
                        "OpenRent WAF cookies expired — attempting refresh"
                    )
                    client.close()
                    or_new = load_openrent_cookies()
                    if or_new:
                        # NOTE: the outcode that triggered the refresh is not
                        # retried; scraping continues with the next one.
                        client = make_openrent_client(*or_new)
                        log.info("OpenRent cookies refreshed, continuing")
                        cookie_refreshes_total.labels(result="success").inc()
                    else:
                        log.warning(
                            "Cookie refresh failed, disabling OpenRent"
                        )
                        openrent_enabled.set(0)
                        cookie_refreshes_total.labels(result="failure").inc()
                        with status_lock:
                            status.errors.append(
                                "OpenRent WAF cookies expired and refresh failed"
                            )
                        progress.update("or", len(shuffled))
                        return
                except Exception as e:
                    log.error("OpenRent %s: %s", outcode, e)
                    scrape_errors_total.labels(source="openrent").inc()
                progress.update("or", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal OpenRent error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal OpenRent: {e}")
        finally:
            # The client may already be closed (refresh-failure path above).
            try:
                client.close()
            except Exception:
                pass

    def zp_worker():
        """Scrape Zoopla through a browser, relaunching it on Turnstile challenges."""
        try:
            browser, page = launch_zoopla_browser()
            log.info("Zoopla scraping ENABLED (Camoufox browser launched)")
            zoopla_enabled.set(1)
        except TurnstileError:
            log.warning("Zoopla Cloudflare Turnstile failed — disabling Zoopla")
            zoopla_enabled.set(0)
            progress.update("zp", len(shuffled))
            return
        except Exception as e:
            log.warning("Zoopla browser launch failed: %s — disabling Zoopla", e)
            zoopla_enabled.set(0)
            progress.update("zp", len(shuffled))
            return
        zp_start = start_indices.get("zp", 0)
        if zp_start > 0:
            log.info("Zoopla resuming from outcode %d/%d", zp_start, len(shuffled))
        try:
            for i, outcode in enumerate(shuffled):
                if i < zp_start:
                    continue
                # URL returned by the first channel's search for this outcode;
                # reused (with the path swapped) for the second channel.
                search_url = None
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    # Build direct URL for second channel by swapping path
                    direct_url = None
                    if search_url:
                        if ch == "BUY":
                            direct_url = search_url.replace("/to-rent/", "/for-sale/")
                        else:
                            direct_url = search_url.replace("/for-sale/", "/to-rent/")
                    try:
                        props, result_url = zoopla_search_outcode(
                            page, outcode, ch, pc_index, pc_coords,
                            base_search_url=direct_url,
                        )
                        if result_url:
                            search_url = result_url
                        zp_results[ch].extend(props)
                        if props:
                            log.info("Zoopla %s: +%d properties", outcode, len(props))
                    except TurnstileError:
                        log.warning(
                            "Zoopla Turnstile challenge — relaunching browser"
                        )
                        try:
                            browser.close()
                        except Exception:
                            pass
                        try:
                            browser, page = launch_zoopla_browser()
                            log.info("Zoopla browser relaunched, continuing")
                        except Exception:
                            log.warning(
                                "Browser relaunch failed, disabling Zoopla"
                            )
                            zoopla_enabled.set(0)
                            with status_lock:
                                status.errors.append(
                                    "Zoopla Cloudflare challenge failed and relaunch failed"
                                )
                            progress.update("zp", len(shuffled))
                            return
                    except Exception as e:
                        log.error("Zoopla %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="zoopla").inc()
                progress.update("zp", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal Zoopla error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal Zoopla: {e}")
        finally:
            # The browser may already be closed (relaunch-failure path above).
            try:
                browser.close()
            except Exception:
                pass

    # --- Launch worker threads ---
    active_sources: list[str] = []
    threads: list[threading.Thread] = []
    if SCRAPE_RIGHTMOVE:
        threads.append(threading.Thread(target=rm_worker, name="scrape-rm", daemon=True))
        active_sources.append("rm")
    if SCRAPE_HOMECOUK:
        threads.append(threading.Thread(target=hk_worker, name="scrape-hk", daemon=True))
        active_sources.append("hk")
    if SCRAPE_OPENRENT:
        threads.append(threading.Thread(target=or_worker, name="scrape-or", daemon=True))
        active_sources.append("or")
    if SCRAPE_ZOOPLA:
        threads.append(threading.Thread(target=zp_worker, name="scrape-zp", daemon=True))
        active_sources.append("zp")

    log.info(
        "=== Starting scrape: %d outcodes, sources: %s ===",
        len(shuffled),
        ", ".join(active_sources),
    )
    for t in threads:
        t.start()

    # --- Monitor progress while workers run ---
    scrape_start = time.time()
    last_log = 0.0
    last_checkpoint = time.time()
    try:
        while any(t.is_alive() for t in threads):
            snap = progress.snapshot()
            # Overall progress is that of the slowest active source.
            min_done = min(
                (snap.get(s, 0) for s in active_sources), default=0
            )
            # Count properties across sources (safe: only one thread writes each list)
            total_buy = sum(
                len(r["BUY"]) for r in [rm_results, hk_results, or_results, zp_results]
            )
            total_rent = sum(
                len(r["RENT"]) for r in [rm_results, hk_results, or_results, zp_results]
            )
            with status_lock:
                status.outcodes_done = min_done
                status.outcodes_total = len(shuffled)
                status.properties_buy = total_buy
                status.properties_rent = total_rent
                status.rm_properties = len(rm_results["BUY"]) + len(rm_results["RENT"])
                status.hk_properties = len(hk_results["BUY"]) + len(hk_results["RENT"])
                status.or_properties = len(or_results["RENT"])
                status.zp_properties = len(zp_results["BUY"]) + len(zp_results["RENT"])
                _sync_gauges()

            now = time.time()
            # Log progress every 30 seconds
            if now - last_log >= 30:
                elapsed = now - scrape_start
                per_source = ", ".join(
                    f"{s}:{snap.get(s, 0)}" for s in active_sources
                )
                log.info(
                    "Progress: %d/%d outcodes (%s), %d buy + %d rent props, %s elapsed",
                    min_done,
                    len(shuffled),
                    per_source,
                    total_buy,
                    total_rent,
                    _fmt_elapsed(elapsed),
                )
                last_log = now

            # Save checkpoint periodically
            if now - last_checkpoint >= CHECKPOINT_INTERVAL:
                try:
                    _save_checkpoint(
                        shuffled, progress, source_results_map, active_sources,
                    )
                except Exception as e:
                    log.warning("Checkpoint save failed: %s", e)
                last_checkpoint = now

            time.sleep(5)
    except Exception as e:
        log.exception("Monitor loop error: %s", e)

    # Save final checkpoint before joining (in case merge/write fails)
    try:
        _save_checkpoint(shuffled, progress, source_results_map, active_sources)
    except Exception as e:
        log.warning("Final checkpoint save failed: %s", e)

    for t in threads:
        t.join()
    log.info("All source workers completed")

    # --- Merge results per channel and write parquet ---
    try:
        for ch_cfg in CHANNELS:
            ch = ch_cfg["channel"]
            # Lower-case channel name, used for the file name, the parquet
            # channel argument and the metric label.
            ch_label = "buy" if ch == "BUY" else "rent"
            merged, counts, total_dedup = _merge_channel(
                rm_results[ch],
                hk_results[ch],
                or_results[ch],
                zp_results[ch],
            )
            # Update cross-source dedup counter
            if total_dedup:
                cross_source_dedup_total.labels(channel=ch_label).inc(total_dedup)

            deduped = list(merged.values())
            output_path = DATA_DIR / f"online_listings_{ch_label}.parquet"
            write_parquet(deduped, output_path, channel=ch_label)

            with status_lock:
                if ch == "BUY":
                    status.properties_buy = len(deduped)
                else:
                    status.properties_rent = len(deduped)
                _sync_gauges()

            log.info(
                "=== %s complete: %d unique (rm:%d hk:%d or:%d zp:%d, cross-dedup:%d) ===",
                ch,
                len(deduped),
                counts["rm"],
                counts["hk"],
                counts["or"],
                counts["zp"],
                total_dedup,
            )

        # Scrape completed successfully — clear checkpoint
        _clear_checkpoint()

        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            status.outcodes_done = len(shuffled)
            _sync_gauges()
            # Read the final figures while still holding the lock.
            elapsed = status.finished_at - status.started_at
            final_buy = status.properties_buy
            final_rent = status.properties_rent

        log.info(
            "Scrape complete in %s — buy: %d, rent: %d",
            _fmt_elapsed(elapsed),
            final_buy,
            final_rent,
        )

        # Trigger server data reload
        if RELOAD_URL:
            try:
                log.info("Triggering server reload at %s", RELOAD_URL)
                resp = httpx.post(RELOAD_URL, timeout=300)
                if resp.is_success:
                    body = resp.json()
                    log.info(
                        "Server reload complete: %d rows, %d features, %dms",
                        body.get("rows", 0),
                        body.get("features", 0),
                        body.get("elapsed_ms", 0),
                    )
                else:
                    log.warning(
                        "Server reload failed (%d): %s",
                        resp.status_code,
                        resp.text[:200],
                    )
            except Exception as e:
                # A failed reload does not turn a successful scrape into an error.
                log.warning("Server reload request failed: %s", e)
    except Exception as e:
        log.exception("Fatal scrape error during merge/write")
        with status_lock:
            status.state = "error"
            status.errors.append(f"Fatal: {e}")
            status.finished_at = time.time()
            _sync_gauges()