Remove finder
This commit is contained in:
parent
55238f59aa
commit
cd778dd088
26 changed files with 0 additions and 57826 deletions
|
|
@ -1,993 +0,0 @@
|
|||
import json
|
||||
import logging
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import polars as pl
|
||||
|
||||
import httpx
|
||||
|
||||
from constants import (
|
||||
ARCGIS_PATH,
|
||||
CHANNELS,
|
||||
CHECKPOINT_INTERVAL,
|
||||
DATA_DIR,
|
||||
DELAY_BETWEEN_OUTCODES,
|
||||
HOMECOUK_CONCURRENCY,
|
||||
RELOAD_URL,
|
||||
SCRAPE_HOMECOUK,
|
||||
SCRAPE_OPENRENT,
|
||||
SCRAPE_RIGHTMOVE,
|
||||
SCRAPE_ZOOPLA,
|
||||
SEED,
|
||||
)
|
||||
from homecouk import CookiesExpiredError
|
||||
from homecouk import load_cookies as load_homecouk_cookies
|
||||
from homecouk import make_client as make_homecouk_client
|
||||
from homecouk import search_outcode as homecouk_search_outcode
|
||||
from http_client import make_client
|
||||
from metrics import (
|
||||
cookie_refreshes_total,
|
||||
cross_source_dedup_total,
|
||||
homecouk_enabled,
|
||||
openrent_enabled,
|
||||
scrape_elapsed_seconds,
|
||||
scrape_errors_total,
|
||||
scrape_outcodes_done,
|
||||
scrape_outcodes_total,
|
||||
scrape_properties_total,
|
||||
scrape_state,
|
||||
zoopla_enabled,
|
||||
)
|
||||
from openrent import WafChallengeError
|
||||
from openrent import load_cookies as load_openrent_cookies
|
||||
from openrent import make_client as make_openrent_client
|
||||
from openrent import search_outcode as openrent_search_outcode
|
||||
from rightmove import resolve_outcode_id, search_outcode
|
||||
from zoopla import TurnstileError
|
||||
from zoopla import launch_browser as launch_zoopla_browser
|
||||
from zoopla import search_outcode as zoopla_search_outcode
|
||||
from spatial import PostcodeSpatialIndex
|
||||
from storage import write_parquet
|
||||
|
||||
# Module-level logger. NOTE(review): it is registered under the name "rightmove"
# even though this module also logs for home.co.uk, OpenRent and Zoopla.
log = logging.getLogger("rightmove")
|
||||
|
||||
|
||||
@dataclass
class ScrapeStatus:
    """Mutable snapshot of the current (or most recent) scrape run.

    A single module-level instance is updated by ``run_scrape`` and mirrored
    into Prometheus gauges by ``_sync_gauges``.
    """

    state: str = "idle"  # idle | running | done | error
    # Reset to "" at the start of a run; not otherwise written in this module.
    channel: str = ""
    outcode: str = ""
    # Progress counters; outcodes_done is the minimum across active sources
    # while a run is in flight.
    outcodes_done: int = 0
    outcodes_total: int = 0
    # Listing counts per channel (raw while running, deduplicated after merge).
    properties_buy: int = 0
    properties_rent: int = 0
    # Per-source counts (combined across channels)
    rm_properties: int = 0
    hk_properties: int = 0
    or_properties: int = 0
    zp_properties: int = 0
    # Human-readable messages for fatal or disabling errors.
    errors: list[str] = field(default_factory=list)
    # time.time() timestamps; 0.0 means "not set".
    started_at: float = 0.0
    finished_at: float = 0.0
|
||||
|
||||
|
||||
# Process-wide scrape status shared between the orchestrator and its workers.
status = ScrapeStatus()
# Guards reads/writes of ``status``; _sync_gauges() expects it to be held.
status_lock = threading.Lock()
|
||||
|
||||
|
||||
def _sync_gauges() -> None:
    """Mirror the module-level ``status`` into the Prometheus gauges.

    The caller must already hold ``status_lock``.
    """
    # Exactly one state label is 1; all the others are 0.
    for candidate in ("idle", "running", "done", "error"):
        flag = 1 if status.state == candidate else 0
        scrape_state.labels(state=candidate).set(flag)

    scrape_outcodes_done.set(status.outcodes_done)
    scrape_outcodes_total.set(status.outcodes_total)

    # Channel totals across every source.
    channel_totals = (
        ("buy", status.properties_buy),
        ("rent", status.properties_rent),
    )
    for channel, count in channel_totals:
        scrape_properties_total.labels(channel=channel, source="total").set(count)

    # Per-source totals are tracked combined across channels, so the same
    # value is published under both channel labels.
    source_counts = (
        ("rightmove", status.rm_properties),
        ("homecouk", status.hk_properties),
        ("openrent", status.or_properties),
        ("zoopla", status.zp_properties),
    )
    for channel in ("buy", "rent"):
        for source, count in source_counts:
            scrape_properties_total.labels(channel=channel, source=source).set(count)

    if not status.started_at:
        scrape_elapsed_seconds.set(0)
        return

    # While the run is still in flight, measure against the current time.
    end = status.finished_at or time.time()
    scrape_elapsed_seconds.set(end - status.started_at)
|
||||
|
||||
|
||||
def load_outcodes() -> list[str]:
    """Load England-only outcodes from arcgis parquet.

    Returns:
        Sorted list of unique outcodes extracted from the ``pcd`` column of
        rows whose ``ctry`` equals ``E92000001``.
    """
    log.info("Loading outcodes from %s", ARCGIS_PATH)
    # Only the postcode and country columns are used here, so the coordinate
    # columns are not read from disk.
    df = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry"])
    england = df.filter(pl.col("ctry") == "E92000001")
    log.info("England postcodes: %d", len(england))

    outcodes = (
        england.select(
            # Leading letters + digit (+ optional letter/digit) at the start of
            # the postcode; rows that do not match become null and are dropped.
            pl.col("pcd").str.extract(r"^([A-Z]{1,2}\d[A-Z0-9]?)", 1).alias("outcode")
        )
        .drop_nulls()
        .get_column("outcode")
        .unique()
        .sort()
        .to_list()
    )
    log.info("Unique England outcodes: %d", len(outcodes))
    return outcodes
|
||||
|
||||
|
||||
def build_postcode_index() -> PostcodeSpatialIndex:
    """Create the postcode spatial index from England rows of the arcgis parquet.

    Rows without a latitude or longitude are excluded.
    """
    log.info("Building postcode spatial index from %s", ARCGIS_PATH)
    frame = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry", "lat", "long"])
    is_england = pl.col("ctry") == "E92000001"
    located = frame.filter(is_england).drop_nulls(subset=["lat", "long"])

    latitudes = located.get_column("lat").to_list()
    longitudes = located.get_column("long").to_list()
    postcodes = located.get_column("pcd").to_list()
    return PostcodeSpatialIndex(latitudes, longitudes, postcodes)
|
||||
|
||||
|
||||
def build_postcode_coords() -> dict[str, tuple[float, float]]:
    """Map each England postcode in the arcgis parquet to its (lat, lng) pair.

    Rows without coordinates are skipped. The OpenRent scraper uses this
    lookup to turn postcodes into coordinates.
    """
    log.info("Building postcode coords lookup from %s", ARCGIS_PATH)
    frame = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry", "lat", "long"])
    is_england = pl.col("ctry") == "E92000001"
    located = frame.filter(is_england).drop_nulls(subset=["lat", "long"])

    postcodes = located.get_column("pcd").to_list()
    latitudes = located.get_column("lat").to_list()
    longitudes = located.get_column("long").to_list()

    # Later rows overwrite earlier ones when a postcode repeats.
    coords: dict[str, tuple[float, float]] = dict(
        zip(postcodes, zip(latitudes, longitudes))
    )
    log.info("Postcode coords lookup: %d postcodes", len(coords))
    return coords
|
||||
|
||||
|
||||
def _fmt_elapsed(seconds: float) -> str:
|
||||
"""Format seconds as e.g. '2h13m' or '5m32s'."""
|
||||
h, rem = divmod(int(seconds), 3600)
|
||||
m, s = divmod(rem, 60)
|
||||
if h:
|
||||
return f"{h}h{m:02d}m"
|
||||
return f"{m}m{s:02d}s"
|
||||
|
||||
|
||||
def _dedup_key(p: dict) -> tuple:
|
||||
"""Composite key for cross-source deduplication: (postcode, bedrooms, price).
|
||||
Two listings on different portals for the same physical property will share
|
||||
these attributes even though their IDs differ."""
|
||||
return (p.get("Postcode", ""), p.get("Bedrooms", 0), p.get("price", 0))
|
||||
|
||||
|
||||
class _Progress:
|
||||
"""Thread-safe progress tracker for parallel source workers."""
|
||||
|
||||
def __init__(self):
|
||||
self._counts: dict[str, int] = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def update(self, source: str, done: int) -> None:
|
||||
with self._lock:
|
||||
self._counts[source] = done
|
||||
|
||||
def snapshot(self) -> dict[str, int]:
|
||||
with self._lock:
|
||||
return dict(self._counts)
|
||||
|
||||
|
||||
def _merge_channel(
    rm_props: list[dict],
    hk_props: list[dict],
    or_props: list[dict],
    zp_props: list[dict],
) -> tuple[dict[str, dict], dict[str, int], int]:
    """Combine one channel's listings from every source into a single ID-keyed map.

    Rightmove listings are taken first and are only de-duplicated by ID. A
    listing from any other source is dropped when either its ID or its
    ``_dedup_key`` has already been seen.

    Returns (merged_by_id, kept_count_per_source, dropped_count).
    """
    merged: dict[str, dict] = {}
    fingerprints: set[tuple] = set()
    kept = dict.fromkeys(("rm", "hk", "or", "zp"), 0)
    dropped = 0

    # Priority source: always wins, and seeds the fingerprint set.
    for listing in rm_props:
        listing_id = listing["id"]
        if listing_id in merged:
            continue
        merged[listing_id] = listing
        fingerprints.add(_dedup_key(listing))
        kept["rm"] += 1

    # Remaining sources, in fixed order, checked against everything kept so far.
    secondary = (("hk", hk_props), ("or", or_props), ("zp", zp_props))
    for tag, listings in secondary:
        for listing in listings:
            listing_id = listing["id"]
            fingerprint = _dedup_key(listing)
            if listing_id not in merged and fingerprint not in fingerprints:
                merged[listing_id] = listing
                fingerprints.add(fingerprint)
                kept[tag] += 1
            else:
                dropped += 1

    return merged, kept, dropped
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Checkpointing — save/resume partial results across crashes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _checkpoint_meta_path():
    """Location of the checkpoint metadata file inside DATA_DIR."""
    return DATA_DIR.joinpath("checkpoint.json")
|
||||
|
||||
|
||||
def _checkpoint_results_path(source: str, channel: str):
    """Location of the partial-results file for one source/channel pair inside DATA_DIR."""
    filename = f"checkpoint_{source}_{channel}.json"
    return DATA_DIR.joinpath(filename)
|
||||
|
||||
|
||||
def _write_json_atomic(path, payload, **dump_kwargs) -> None:
    """Write ``payload`` as JSON to ``path`` via a sibling ``.tmp`` file.

    The temp file is moved over the target with ``Path.replace``, which
    overwrites an existing file on every platform. Any failure propagates to
    the caller.
    """
    tmp = path.with_suffix(".tmp")
    with open(tmp, "w") as f:
        json.dump(payload, f, **dump_kwargs)
    tmp.replace(path)


def _save_checkpoint(
    shuffled: list[str],
    progress: _Progress,
    source_results: dict[str, dict[str, list]],
    active_sources: list[str],
) -> None:
    """Save per-source progress indices and partial results to disk.

    Writes atomically (temp + replace) so a crash mid-write leaves the previous
    checkpoint intact.

    Result files are written first and the metadata last. If any result file
    cannot be written, the metadata is left untouched, so the recorded progress
    index never gets ahead of the results that are actually on disk.

    Args:
        shuffled: Outcode list for this run; only its length is recorded.
        progress: Tracker whose snapshot supplies the per-source indices.
        source_results: ``{source: {"BUY": [...], "RENT": [...]}}``.
        active_sources: Sources to include in the checkpoint.

    Failures are logged as warnings and never raised.
    """
    # Take the progress snapshot BEFORE copying results, so the saved index is
    # never ahead of the saved results.
    snap = progress.snapshot()

    meta = {
        "seed": SEED,
        "num_outcodes": len(shuffled),
        "sources": {s: snap.get(s, 0) for s in active_sources},
        "timestamp": time.time(),
    }

    # Write result files per source per channel
    results_ok = True
    for source in active_sources:
        results = source_results.get(source, {})
        for ch_key in ("BUY", "RENT"):
            # Shallow copy: worker threads may still be extending the live list.
            props = list(results.get(ch_key, []))
            path = _checkpoint_results_path(source, ch_key.lower())
            try:
                _write_json_atomic(path, props, default=str)
            except Exception as e:
                log.warning("Failed to write checkpoint %s: %s", path.name, e)
                results_ok = False

    if not results_ok:
        log.warning(
            "Checkpoint metadata left unchanged: not all result files were written"
        )
        return

    # Write metadata atomically
    try:
        _write_json_atomic(_checkpoint_meta_path(), meta)
    except Exception as e:
        log.warning("Failed to write checkpoint metadata: %s", e)
        return

    total = sum(
        len(source_results.get(s, {}).get(ch, []))
        for s in active_sources
        for ch in ("BUY", "RENT")
    )
    log.info(
        "Checkpoint saved: %s (%d properties)",
        {s: snap.get(s, 0) for s in active_sources},
        total,
    )
|
||||
|
||||
|
||||
def _load_checkpoint(
    shuffled: list[str],
) -> tuple[dict[str, int], dict[str, dict[str, list]]] | None:
    """Read back a saved checkpoint, if one exists for this run configuration.

    A checkpoint is discarded (and its files removed) when the metadata cannot
    be parsed, or when its seed or outcode count differs from the current run.

    Returns (start_indices, loaded_results), or None when there is nothing
    usable to resume from.
    """
    meta_path = _checkpoint_meta_path()
    if not meta_path.exists():
        return None

    try:
        with open(meta_path) as fh:
            meta = json.load(fh)
    except Exception:
        log.warning("Checkpoint file corrupt, starting fresh")
        _clear_checkpoint()
        return None

    config_matches = (
        meta.get("seed") == SEED and meta.get("num_outcodes") == len(shuffled)
    )
    if not config_matches:
        log.info("Checkpoint from different run configuration, discarding")
        _clear_checkpoint()
        return None

    start_indices: dict[str, int] = {}
    loaded_results: dict[str, dict[str, list]] = {}

    for source, completed in meta.get("sources", {}).items():
        start_indices[source] = completed
        loaded_results[source] = {"BUY": [], "RENT": []}

        for channel in ("buy", "rent"):
            results_path = _checkpoint_results_path(source, channel)
            if not results_path.exists():
                continue
            try:
                with open(results_path) as fh:
                    raw = json.load(fh)

                # Keep only the first record per ID. Results of in-flight
                # outcodes can reach disk before their progress index does
                # (e.g. with hk_worker's thread pool), so a resumed run may
                # scrape them again and append the same listings twice.
                first_by_id: dict = {}
                for item in raw:
                    first_by_id.setdefault(item.get("id"), item)
                deduped = list(first_by_id.values())

                removed = len(raw) - len(deduped)
                if removed > 0:
                    log.info(
                        "Checkpoint %s/%s: deduped %d → %d (removed %d dupes)",
                        source, channel, len(raw), len(deduped),
                        removed,
                    )
                loaded_results[source][channel.upper()] = deduped
            except Exception:
                # One unreadable file invalidates the whole source: restart it
                # from the first outcode with empty results.
                log.warning(
                    "Checkpoint results for %s/%s corrupt, restarting %s",
                    source, channel, source,
                )
                start_indices[source] = 0
                loaded_results[source] = {"BUY": [], "RENT": []}
                break

    age_seconds = time.time() - meta.get("timestamp", 0)
    log.info(
        "Resuming from checkpoint (saved %.0fm ago): %s",
        age_seconds / 60,
        start_indices,
    )
    return start_indices, loaded_results
|
||||
|
||||
|
||||
def _clear_checkpoint() -> None:
    """Remove every ``checkpoint*`` file from DATA_DIR (best effort).

    Called after a successful run and whenever an existing checkpoint is found
    to be corrupt or to belong to a different run configuration. Deletion
    failures are logged rather than raised: a checkpoint file left behind can
    later be picked up as resumable state, so it must not go unnoticed.
    """
    for path in DATA_DIR.glob("checkpoint*"):
        try:
            path.unlink()
        except FileNotFoundError:
            # Already gone (e.g. removed concurrently) — nothing to do.
            continue
        except OSError as e:
            log.warning("Failed to remove checkpoint file %s: %s", path.name, e)
|
||||
|
||||
|
||||
def run_scrape(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]] | None = None,
) -> None:
    """Main scrape orchestrator — runs all sources in parallel threads.

    Each enabled source (Rightmove, home.co.uk, OpenRent, Zoopla) gets its own
    thread that iterates the shuffled outcodes. Rightmove, home.co.uk and
    Zoopla query every entry of CHANNELS; OpenRent only produces RENT results.
    Results are merged with cross-source deduplication after all workers
    complete, written to one parquet file per channel, and the server at
    RELOAD_URL (if set) is asked to reload.

    Args:
        outcodes: Outcodes to scrape; a copy is shuffled with SEED.
        pc_index: Spatial index handed through to the source scrapers.
        pc_coords: Optional postcode -> (lat, lng) lookup; built on demand when
            OpenRent or Zoopla is enabled and none is supplied.

    While running, the function updates the module-level ``status`` (under
    ``status_lock``), periodically saves checkpoints, and resumes from an
    existing checkpoint when one matches this run. Errors are recorded in
    ``status.errors`` rather than raised.
    """
    global status
    with status_lock:
        status.state = "running"
        status.started_at = time.time()
        status.finished_at = 0.0
        status.errors = []
        status.properties_buy = 0
        status.properties_rent = 0
        status.channel = ""
        status.outcode = ""
        # NOTE(review): the per-source counters (rm/hk/or/zp_properties) are
        # not reset here; they are overwritten by the monitor loop below.
        _sync_gauges()

    # Deterministic shuffle so a resumed run sees the same outcode order.
    # NOTE(review): random.seed() reseeds the process-wide generator.
    shuffled = list(outcodes)
    random.seed(SEED)
    random.shuffle(shuffled)

    if not any([SCRAPE_RIGHTMOVE, SCRAPE_HOMECOUK, SCRAPE_OPENRENT, SCRAPE_ZOOPLA]):
        log.warning("All scrapers disabled — nothing to do")
        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            _sync_gauges()
        return

    if not SCRAPE_RIGHTMOVE:
        log.info("Rightmove scraping DISABLED (SCRAPE_RIGHTMOVE=false)")
    if not SCRAPE_HOMECOUK:
        log.info("home.co.uk scraping DISABLED (SCRAPE_HOMECOUK=false)")
        homecouk_enabled.set(0)
    if not SCRAPE_OPENRENT:
        log.info("OpenRent scraping DISABLED (SCRAPE_OPENRENT=false)")
        openrent_enabled.set(0)
    if not SCRAPE_ZOOPLA:
        log.info("Zoopla scraping DISABLED (SCRAPE_ZOOPLA=false)")
        zoopla_enabled.set(0)

    # Build postcode coords if needed for OpenRent/Zoopla
    if (SCRAPE_OPENRENT or SCRAPE_ZOOPLA) and pc_coords is None:
        pc_coords = build_postcode_coords()

    # Per-source result containers: {channel_name: [properties]}
    # Each list is only written by its owning source thread (home.co.uk's pool
    # threads serialize their writes through results_lock).
    rm_results: dict[str, list] = {"BUY": [], "RENT": []}
    hk_results: dict[str, list] = {"BUY": [], "RENT": []}
    or_results: dict[str, list] = {"BUY": [], "RENT": []}
    zp_results: dict[str, list] = {"BUY": [], "RENT": []}

    progress = _Progress()

    # --- Resume from checkpoint if available ---
    start_indices: dict[str, int] = {}
    checkpoint = _load_checkpoint(shuffled)
    if checkpoint:
        start_indices, loaded = checkpoint
        source_to_results = {"rm": rm_results, "hk": hk_results, "or": or_results, "zp": zp_results}
        for src, data in loaded.items():
            if src in source_to_results:
                for ch in ("BUY", "RENT"):
                    source_to_results[src][ch] = data.get(ch, [])
        # Reassign in case references changed
        # (the dicts above are mutated in place, so these are the same objects)
        rm_results = source_to_results["rm"]
        hk_results = source_to_results["hk"]
        or_results = source_to_results["or"]
        zp_results = source_to_results["zp"]
        # Pre-set progress for resumed sources
        for src, idx in start_indices.items():
            if idx > 0:
                progress.update(src, idx)

    # --- Source worker closures ---
    # Each worker owns its client lifecycle and iterates all outcodes for both
    # channels. On auth failure, it refreshes cookies and continues. On fatal
    # failure, it marks itself as done and returns partial results.

    def rm_worker():
        # Rightmove: sequential; one outcode-ID lookup, then one search per channel.
        rm_start = start_indices.get("rm", 0)
        if rm_start > 0:
            log.info("Rightmove resuming from outcode %d/%d", rm_start, len(shuffled))
        client = make_client()
        try:
            for i, outcode in enumerate(shuffled):
                # Skip outcodes already covered by the checkpoint.
                if i < rm_start:
                    continue
                try:
                    outcode_id = resolve_outcode_id(client, outcode)
                except Exception as e:
                    # A failed lookup counts the outcode as processed.
                    log.error("Rightmove %s ID lookup: %s", outcode, e)
                    scrape_errors_total.labels(source="rightmove").inc()
                    progress.update("rm", i + 1)
                    time.sleep(DELAY_BETWEEN_OUTCODES)
                    continue

                if not outcode_id:
                    log.debug("No Rightmove ID for %s, skipping", outcode)
                    progress.update("rm", i + 1)
                    time.sleep(DELAY_BETWEEN_OUTCODES)
                    continue

                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = search_outcode(
                            client, outcode_id, outcode, ch_cfg, pc_index
                        )
                        rm_results[ch].extend(props)
                    except Exception as e:
                        # Per-channel failures are logged and the loop moves on.
                        log.error("Rightmove %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="rightmove").inc()

                progress.update("rm", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal Rightmove error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal Rightmove: {e}")
        finally:
            client.close()

    def hk_worker():
        # home.co.uk: outcodes are fanned out over a thread pool; cookies are
        # shared between pool threads and refreshed on expiry.
        hk_result = load_homecouk_cookies()
        if not hk_result:
            log.info("home.co.uk DISABLED (no cookies available)")
            homecouk_enabled.set(0)
            # Report the source as finished so it does not hold back min_done.
            progress.update("hk", len(shuffled))
            return
        hk_start = start_indices.get("hk", 0)
        if hk_start > 0:
            log.info("home.co.uk resuming from outcode %d/%d", hk_start, len(shuffled))
        log.info(
            "home.co.uk scraping ENABLED (concurrency=%d)", HOMECOUK_CONCURRENCY
        )
        homecouk_enabled.set(1)

        # Shared state across pool threads
        cookie_state = {
            "cookies": hk_result[0],
            "user_agent": hk_result[1],
            # Bumped on every successful refresh; clients compare against it.
            "generation": 0,
        }
        cookie_lock = threading.Lock()
        results_lock = threading.Lock()
        # Single-element lists act as mutable cells shared with the closures below.
        completed_count = [hk_start]
        disabled = [False]
        _local = threading.local()

        def _get_client():
            """Get or create a thread-local curl_cffi session."""
            with cookie_lock:
                gen = cookie_state["generation"]
                cookies = cookie_state["cookies"]
                ua = cookie_state["user_agent"]
            # Rebuild this thread's client when none exists yet or when the
            # cookies were refreshed since it was created.
            if not hasattr(_local, "client") or _local.gen != gen:
                if hasattr(_local, "client"):
                    try:
                        _local.client.close()
                    except Exception:
                        pass
                _local.client = make_homecouk_client(cookies, ua)
                _local.gen = gen
            return _local.client

        def _refresh_cookies():
            """Refresh cookies via FlareSolverr. Thread-safe with generation check."""
            with cookie_lock:
                pre_gen = cookie_state["generation"]
            # Loaded outside the lock so other threads are not blocked meanwhile.
            new = load_homecouk_cookies()
            if not new:
                return False
            with cookie_lock:
                # Only install the cookies if no other thread refreshed in the meantime.
                if cookie_state["generation"] == pre_gen:
                    cookie_state["cookies"] = new[0]
                    cookie_state["user_agent"] = new[1]
                    cookie_state["generation"] += 1
                    cookie_refreshes_total.labels(result="success").inc()
                    log.info("home.co.uk cookies refreshed")
            return True

        def _scrape_outcode(outcode):
            # Runs on a pool thread: scrape every channel for one outcode.
            if disabled[0]:
                return
            client = _get_client()
            for ch_cfg in CHANNELS:
                ch = ch_cfg["channel"]
                if disabled[0]:
                    return
                try:
                    props = homecouk_search_outcode(
                        client, outcode, ch, pc_index
                    )
                    if props:
                        with results_lock:
                            hk_results[ch].extend(props)
                        log.info(
                            "home.co.uk %s: +%d properties", outcode, len(props)
                        )
                except CookiesExpiredError:
                    log.warning(
                        "home.co.uk cookies expired — attempting refresh"
                    )
                    if _refresh_cookies():
                        # Retry this outcode/channel once with the new cookies.
                        client = _get_client()
                        try:
                            props = homecouk_search_outcode(
                                client, outcode, ch, pc_index
                            )
                            if props:
                                with results_lock:
                                    hk_results[ch].extend(props)
                                log.info(
                                    "home.co.uk %s: +%d properties",
                                    outcode,
                                    len(props),
                                )
                        except Exception as e:
                            log.error(
                                "home.co.uk %s/%s (after refresh): %s",
                                outcode,
                                ch,
                                e,
                            )
                            scrape_errors_total.labels(source="homecouk").inc()
                    else:
                        # No usable cookies: switch the source off for the
                        # remainder of the run; queued outcodes return early.
                        log.warning(
                            "Cookie refresh failed, disabling home.co.uk"
                        )
                        disabled[0] = True
                        homecouk_enabled.set(0)
                        cookie_refreshes_total.labels(result="failure").inc()
                        with status_lock:
                            status.errors.append(
                                "home.co.uk cookies expired and refresh failed"
                            )
                        return
                except Exception as e:
                    log.error("home.co.uk %s/%s: %s", outcode, ch, e)
                    scrape_errors_total.labels(source="homecouk").inc()

            # NOTE(review): this counts finished outcodes, which with a pool is
            # not necessarily a contiguous prefix of `shuffled`; a checkpoint
            # taken while an earlier outcode is still in flight may skip it on
            # resume — confirm whether that is acceptable.
            with results_lock:
                completed_count[0] += 1
                progress.update("hk", completed_count[0])
            time.sleep(DELAY_BETWEEN_OUTCODES)

        try:
            work = [oc for i, oc in enumerate(shuffled) if i >= hk_start]
            with ThreadPoolExecutor(
                max_workers=HOMECOUK_CONCURRENCY
            ) as pool:
                # list() drains the iterator so worker exceptions surface here.
                list(pool.map(_scrape_outcode, work))
        except Exception as e:
            log.exception("Fatal home.co.uk error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal home.co.uk: {e}")

        if disabled[0]:
            progress.update("hk", len(shuffled))

    def or_worker():
        # OpenRent: sequential, RENT results only.
        or_result = load_openrent_cookies()
        if not or_result:
            log.info("OpenRent DISABLED (no cookies available)")
            openrent_enabled.set(0)
            progress.update("or", len(shuffled))
            return
        or_start = start_indices.get("or", 0)
        if or_start > 0:
            log.info("OpenRent resuming from outcode %d/%d", or_start, len(shuffled))
        client = make_openrent_client(*or_result)
        log.info("OpenRent scraping ENABLED")
        openrent_enabled.set(1)
        try:
            for i, outcode in enumerate(shuffled):
                if i < or_start:
                    continue
                # OpenRent is RENT-only
                try:
                    props = openrent_search_outcode(
                        client, outcode, pc_index, pc_coords
                    )
                    or_results["RENT"].extend(props)
                    if props:
                        log.info("OpenRent %s: +%d properties", outcode, len(props))
                except WafChallengeError:
                    log.warning(
                        "OpenRent WAF cookies expired — attempting refresh"
                    )
                    client.close()
                    or_new = load_openrent_cookies()
                    if or_new:
                        # NOTE(review): the outcode that triggered the challenge
                        # is not retried; the loop moves on to the next one.
                        client = make_openrent_client(*or_new)
                        log.info("OpenRent cookies refreshed, continuing")
                        cookie_refreshes_total.labels(result="success").inc()
                    else:
                        log.warning(
                            "Cookie refresh failed, disabling OpenRent"
                        )
                        openrent_enabled.set(0)
                        cookie_refreshes_total.labels(result="failure").inc()
                        with status_lock:
                            status.errors.append(
                                "OpenRent WAF cookies expired and refresh failed"
                            )
                        progress.update("or", len(shuffled))
                        return
                except Exception as e:
                    log.error("OpenRent %s: %s", outcode, e)
                    scrape_errors_total.labels(source="openrent").inc()

                progress.update("or", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal OpenRent error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal OpenRent: {e}")
        finally:
            # The client may already have been closed in the refresh path above.
            try:
                client.close()
            except Exception:
                pass

    def zp_worker():
        # Zoopla: sequential, driven through a browser page.
        try:
            browser, page = launch_zoopla_browser()
            log.info("Zoopla scraping ENABLED (Camoufox browser launched)")
            zoopla_enabled.set(1)
        except TurnstileError:
            log.warning("Zoopla Cloudflare Turnstile failed — disabling Zoopla")
            zoopla_enabled.set(0)
            progress.update("zp", len(shuffled))
            return
        except Exception as e:
            log.warning("Zoopla browser launch failed: %s — disabling Zoopla", e)
            zoopla_enabled.set(0)
            progress.update("zp", len(shuffled))
            return

        zp_start = start_indices.get("zp", 0)
        if zp_start > 0:
            log.info("Zoopla resuming from outcode %d/%d", zp_start, len(shuffled))

        try:
            for i, outcode in enumerate(shuffled):
                if i < zp_start:
                    continue
                # URL returned by the first channel's search; reused to derive
                # the second channel's URL for the same outcode.
                search_url = None
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    # Build direct URL for second channel by swapping path
                    direct_url = None
                    if search_url:
                        if ch == "BUY":
                            direct_url = search_url.replace("/to-rent/", "/for-sale/")
                        else:
                            direct_url = search_url.replace("/for-sale/", "/to-rent/")
                    try:
                        props, result_url = zoopla_search_outcode(
                            page, outcode, ch, pc_index, pc_coords,
                            base_search_url=direct_url,
                        )
                        if result_url:
                            search_url = result_url
                        zp_results[ch].extend(props)
                        if props:
                            log.info("Zoopla %s: +%d properties", outcode, len(props))
                    except TurnstileError:
                        log.warning(
                            "Zoopla Turnstile challenge — relaunching browser"
                        )
                        try:
                            browser.close()
                        except Exception:
                            pass
                        try:
                            # NOTE(review): after a successful relaunch the
                            # failed outcode/channel is not retried.
                            browser, page = launch_zoopla_browser()
                            log.info("Zoopla browser relaunched, continuing")
                        except Exception:
                            log.warning(
                                "Browser relaunch failed, disabling Zoopla"
                            )
                            zoopla_enabled.set(0)
                            with status_lock:
                                status.errors.append(
                                    "Zoopla Cloudflare challenge failed and relaunch failed"
                                )
                            progress.update("zp", len(shuffled))
                            return
                    except Exception as e:
                        log.error("Zoopla %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="zoopla").inc()

                progress.update("zp", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal Zoopla error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal Zoopla: {e}")
        finally:
            try:
                browser.close()
            except Exception:
                pass

    # --- Launch worker threads ---

    active_sources: list[str] = []
    threads: list[threading.Thread] = []

    if SCRAPE_RIGHTMOVE:
        threads.append(threading.Thread(target=rm_worker, name="scrape-rm", daemon=True))
        active_sources.append("rm")
    if SCRAPE_HOMECOUK:
        threads.append(threading.Thread(target=hk_worker, name="scrape-hk", daemon=True))
        active_sources.append("hk")
    if SCRAPE_OPENRENT:
        threads.append(threading.Thread(target=or_worker, name="scrape-or", daemon=True))
        active_sources.append("or")
    if SCRAPE_ZOOPLA:
        threads.append(threading.Thread(target=zp_worker, name="scrape-zp", daemon=True))
        active_sources.append("zp")

    log.info(
        "=== Starting scrape: %d outcodes, sources: %s ===",
        len(shuffled),
        ", ".join(active_sources),
    )

    for t in threads:
        t.start()

    # --- Monitor progress while workers run ---

    # Map source names to result dicts for checkpointing
    source_results_map = {
        "rm": rm_results, "hk": hk_results,
        "or": or_results, "zp": zp_results,
    }

    scrape_start = time.time()
    # 0.0 makes the first loop iteration log progress immediately.
    last_log = 0.0
    last_checkpoint = time.time()

    try:
        while any(t.is_alive() for t in threads):
            snap = progress.snapshot()
            # Overall progress is that of the slowest active source.
            min_done = min(
                (snap.get(s, 0) for s in active_sources), default=0
            )

            # Count properties across sources (safe: only one thread writes each list)
            total_buy = sum(
                len(r["BUY"]) for r in [rm_results, hk_results, or_results, zp_results]
            )
            total_rent = sum(
                len(r["RENT"]) for r in [rm_results, hk_results, or_results, zp_results]
            )

            with status_lock:
                status.outcodes_done = min_done
                status.outcodes_total = len(shuffled)
                status.properties_buy = total_buy
                status.properties_rent = total_rent
                status.rm_properties = len(rm_results["BUY"]) + len(rm_results["RENT"])
                status.hk_properties = len(hk_results["BUY"]) + len(hk_results["RENT"])
                status.or_properties = len(or_results["RENT"])
                status.zp_properties = len(zp_results["BUY"]) + len(zp_results["RENT"])
                _sync_gauges()

            now = time.time()

            # Log progress every 30 seconds
            if now - last_log >= 30:
                elapsed = now - scrape_start
                per_source = ", ".join(
                    f"{s}:{snap.get(s, 0)}" for s in active_sources
                )
                log.info(
                    "Progress: %d/%d outcodes (%s), %d buy + %d rent props, %s elapsed",
                    min_done,
                    len(shuffled),
                    per_source,
                    total_buy,
                    total_rent,
                    _fmt_elapsed(elapsed),
                )
                last_log = now

            # Save checkpoint periodically
            if now - last_checkpoint >= CHECKPOINT_INTERVAL:
                try:
                    _save_checkpoint(
                        shuffled, progress, source_results_map, active_sources,
                    )
                except Exception as e:
                    log.warning("Checkpoint save failed: %s", e)
                last_checkpoint = now

            time.sleep(5)
    except Exception as e:
        # A monitor failure must not abandon the workers; fall through to join.
        log.exception("Monitor loop error: %s", e)

    # Save final checkpoint before joining (in case merge/write fails)
    try:
        _save_checkpoint(shuffled, progress, source_results_map, active_sources)
    except Exception:
        pass

    for t in threads:
        t.join()

    log.info("All source workers completed")

    # --- Merge results per channel and write parquet ---
    # NOTE(review): this runs, and the checkpoint is cleared below, even when a
    # worker stopped early after a fatal error — confirm this is intended.

    try:
        for ch_cfg in CHANNELS:
            ch = ch_cfg["channel"]
            file_suffix = "buy" if ch == "BUY" else "rent"

            merged, counts, total_dedup = _merge_channel(
                rm_results[ch],
                hk_results[ch],
                or_results[ch],
                zp_results[ch],
            )

            # Update cross-source dedup counter
            ch_label = "buy" if ch == "BUY" else "rent"
            if total_dedup:
                cross_source_dedup_total.labels(channel=ch_label).inc(total_dedup)

            deduped = list(merged.values())
            output_path = DATA_DIR / f"online_listings_{file_suffix}.parquet"
            write_parquet(deduped, output_path, channel=file_suffix)

            # From here on the channel total reflects the deduplicated count.
            with status_lock:
                if ch == "BUY":
                    status.properties_buy = len(deduped)
                else:
                    status.properties_rent = len(deduped)
                _sync_gauges()

            log.info(
                "=== %s complete: %d unique (rm:%d hk:%d or:%d zp:%d, cross-dedup:%d) ===",
                ch,
                len(deduped),
                counts["rm"],
                counts["hk"],
                counts["or"],
                counts["zp"],
                total_dedup,
            )

        # Scrape completed successfully — clear checkpoint
        _clear_checkpoint()

        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            status.outcodes_done = len(shuffled)
            _sync_gauges()
        elapsed = status.finished_at - status.started_at
        log.info(
            "Scrape complete in %s — buy: %d, rent: %d",
            _fmt_elapsed(elapsed),
            status.properties_buy,
            status.properties_rent,
        )

        # Trigger server data reload
        if RELOAD_URL:
            try:
                log.info("Triggering server reload at %s", RELOAD_URL)
                # Waits up to 300 seconds for the reload response.
                resp = httpx.post(RELOAD_URL, timeout=300)
                if resp.is_success:
                    body = resp.json()
                    log.info(
                        "Server reload complete: %d rows, %d features, %dms",
                        body.get("rows", 0),
                        body.get("features", 0),
                        body.get("elapsed_ms", 0),
                    )
                else:
                    log.warning(
                        "Server reload failed (%d): %s",
                        resp.status_code,
                        resp.text[:200],
                    )
            except Exception as e:
                # A reload failure is logged only; the scrape stays "done".
                log.warning("Server reload request failed: %s", e)

    except Exception as e:
        log.exception("Fatal scrape error during merge/write")
        with status_lock:
            status.state = "error"
            status.errors.append(f"Fatal: {e}")
            status.finished_at = time.time()
            _sync_gauges()
|
||||
Loading…
Add table
Add a link
Reference in a new issue