perfect-postcode/finder/scraper.py

689 lines
25 KiB
Python

import logging
import random
import threading
import time
from dataclasses import dataclass, field
import polars as pl
import httpx
from constants import (
ARCGIS_PATH,
CHANNELS,
DATA_DIR,
DELAY_BETWEEN_OUTCODES,
RELOAD_URL,
SCRAPE_HOMECOUK,
SCRAPE_OPENRENT,
SCRAPE_RIGHTMOVE,
SCRAPE_ZOOPLA,
SEED,
)
from homecouk import CookiesExpiredError
from homecouk import load_cookies as load_homecouk_cookies
from homecouk import make_client as make_homecouk_client
from homecouk import search_outcode as homecouk_search_outcode
from http_client import make_client
from metrics import (
cookie_refreshes_total,
cross_source_dedup_total,
homecouk_enabled,
openrent_enabled,
scrape_elapsed_seconds,
scrape_errors_total,
scrape_outcodes_done,
scrape_outcodes_total,
scrape_properties_total,
scrape_state,
zoopla_enabled,
)
from openrent import WafChallengeError
from openrent import load_cookies as load_openrent_cookies
from openrent import make_client as make_openrent_client
from openrent import search_outcode as openrent_search_outcode
from rightmove import resolve_outcode_id, search_outcode
from zoopla import TurnstileError
from zoopla import launch_browser as launch_zoopla_browser
from zoopla import search_outcode as zoopla_search_outcode
from spatial import PostcodeSpatialIndex
from storage import write_parquet
# Module-wide logger shared by the orchestrator and every source worker.
# NOTE(review): the logger is named "rightmove" even though this module also
# drives home.co.uk, OpenRent and Zoopla — confirm the name is intentional.
log = logging.getLogger("rightmove")
@dataclass
class ScrapeStatus:
    """Mutable record describing the current (or most recent) scrape run.

    The module keeps one shared instance (``status``) that is mutated by
    ``run_scrape`` and its worker threads under ``status_lock`` and mirrored
    into Prometheus gauges by ``_sync_gauges``.
    """

    state: str = "idle"  # idle | running | done | error
    # Reset to "" at the start of a run; not otherwise written in this module.
    channel: str = ""
    outcode: str = ""
    # Outcode progress: the slowest active source's count while running.
    outcodes_done: int = 0
    outcodes_total: int = 0
    # Raw listing counts per channel while running; replaced by the
    # deduplicated counts once the merge step has written each channel.
    properties_buy: int = 0
    properties_rent: int = 0
    # Per-source counts (combined across channels)
    rm_properties: int = 0
    hk_properties: int = 0
    or_properties: int = 0
    zp_properties: int = 0
    # Human-readable failure messages collected during the run.
    errors: list[str] = field(default_factory=list)
    # Wall-clock timestamps from time.time(); 0.0 means "not set".
    started_at: float = 0.0
    finished_at: float = 0.0
# Shared run status, mutated in place by run_scrape() and its worker threads.
status = ScrapeStatus()
# Guards every write to `status` and every call to _sync_gauges().
status_lock = threading.Lock()
def _sync_gauges() -> None:
    """Mirror the shared ScrapeStatus into the Prometheus gauges.

    Must hold status_lock. Publishes the one-hot state gauge, outcode
    progress, per-channel totals, per-source totals and elapsed time.
    """
    # One-hot encoding of the state: exactly one label carries the value 1.
    for name in ("idle", "running", "done", "error"):
        flag = 1 if status.state == name else 0
        scrape_state.labels(state=name).set(flag)

    scrape_outcodes_done.set(status.outcodes_done)
    scrape_outcodes_total.set(status.outcodes_total)

    channel_totals = (
        ("buy", status.properties_buy),
        ("rent", status.properties_rent),
    )
    for channel, count in channel_totals:
        scrape_properties_total.labels(channel=channel, source="total").set(count)

    # Per-source counts are tracked across both channels, so the same
    # combined figure is published under each channel label.
    source_totals = (
        ("rightmove", status.rm_properties),
        ("homecouk", status.hk_properties),
        ("openrent", status.or_properties),
        ("zoopla", status.zp_properties),
    )
    for channel in ("buy", "rent"):
        for source, count in source_totals:
            scrape_properties_total.labels(channel=channel, source=source).set(
                count
            )

    if not status.started_at:
        scrape_elapsed_seconds.set(0)
        return
    # While the run is still in flight (finished_at unset) measure up to now.
    end = status.finished_at or time.time()
    scrape_elapsed_seconds.set(end - status.started_at)
def load_outcodes() -> list[str]:
    """Load England-only outcodes from arcgis parquet.

    Reads the postcode file at ARCGIS_PATH, keeps rows whose ``ctry`` is
    "E92000001", extracts the leading outcode from each ``pcd`` value and
    returns the distinct outcodes in sorted order. Postcodes that do not
    match the outcode pattern are dropped.
    """
    log.info("Loading outcodes from %s", ARCGIS_PATH)
    # Only the postcode and country columns are used here; leaving out the
    # coordinate columns avoids reading data that is never touched.
    df = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry"])
    england = df.filter(pl.col("ctry") == "E92000001")
    log.info("England postcodes: %d", len(england))
    outcodes = (
        england.select(
            pl.col("pcd").str.extract(r"^([A-Z]{1,2}\d[A-Z0-9]?)", 1).alias("outcode")
        )
        .drop_nulls()
        .get_column("outcode")
        .unique()
        .sort()
        .to_list()
    )
    log.info("Unique England outcodes: %d", len(outcodes))
    return outcodes
def build_postcode_index() -> PostcodeSpatialIndex:
    """Build the spatial index over England postcodes from the arcgis parquet.

    Rows without a latitude or longitude are excluded before the index is
    constructed from parallel lat / long / postcode lists.
    """
    log.info("Building postcode spatial index from %s", ARCGIS_PATH)
    frame = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry", "lat", "long"])
    in_england = pl.col("ctry") == "E92000001"
    located = frame.filter(in_england).drop_nulls(subset=["lat", "long"])

    lats = located.get_column("lat").to_list()
    lngs = located.get_column("long").to_list()
    postcodes = located.get_column("pcd").to_list()
    return PostcodeSpatialIndex(lats, lngs, postcodes)
def build_postcode_coords() -> dict[str, tuple[float, float]]:
    """Build a postcode → (lat, lng) lookup from arcgis England postcodes.

    Used by the OpenRent scraper to resolve coordinates from postcodes.
    Rows missing either coordinate are skipped; if a postcode appears more
    than once the last row wins.
    """
    log.info("Building postcode coords lookup from %s", ARCGIS_PATH)
    frame = pl.read_parquet(ARCGIS_PATH, columns=["pcd", "ctry", "lat", "long"])
    in_england = pl.col("ctry") == "E92000001"
    located = frame.filter(in_england).drop_nulls(subset=["lat", "long"])

    postcodes = located.get_column("pcd").to_list()
    lats = located.get_column("lat").to_list()
    lngs = located.get_column("long").to_list()
    coords: dict[str, tuple[float, float]] = {
        pcd: (lat, lng) for pcd, lat, lng in zip(postcodes, lats, lngs)
    }
    log.info("Postcode coords lookup: %d postcodes", len(coords))
    return coords
def _fmt_elapsed(seconds: float) -> str:
"""Format seconds as e.g. '2h13m' or '5m32s'."""
h, rem = divmod(int(seconds), 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h{m:02d}m"
return f"{m}m{s:02d}s"
def _dedup_key(p: dict) -> tuple:
"""Composite key for cross-source deduplication: (postcode, bedrooms, price).
Two listings on different portals for the same physical property will share
these attributes even though their IDs differ."""
return (p.get("Postcode", ""), p.get("Bedrooms", 0), p.get("price", 0))
class _Progress:
"""Thread-safe progress tracker for parallel source workers."""
def __init__(self):
self._counts: dict[str, int] = {}
self._lock = threading.Lock()
def update(self, source: str, done: int) -> None:
with self._lock:
self._counts[source] = done
def snapshot(self) -> dict[str, int]:
with self._lock:
return dict(self._counts)
def _merge_channel(
rm_props: list[dict],
hk_props: list[dict],
or_props: list[dict],
zp_props: list[dict],
) -> tuple[dict[str, dict], dict[str, int], int]:
"""Merge properties from all sources for one channel with cross-source dedup.
Rightmove has priority; other sources are checked for duplicates.
Returns (all_properties_by_id, per_source_counts, total_dedup_count).
"""
all_properties: dict[str, dict] = {}
seen_keys: set[tuple] = set()
counts = {"rm": 0, "hk": 0, "or": 0, "zp": 0}
total_dedup = 0
# Rightmove first (priority source)
for p in rm_props:
pid = p["id"]
if pid not in all_properties:
all_properties[pid] = p
seen_keys.add(_dedup_key(p))
counts["rm"] += 1
# Other sources (check for cross-source duplicates)
for source, props in [("hk", hk_props), ("or", or_props), ("zp", zp_props)]:
for p in props:
pid = p["id"]
key = _dedup_key(p)
if pid in all_properties or key in seen_keys:
total_dedup += 1
continue
all_properties[pid] = p
seen_keys.add(key)
counts[source] += 1
return all_properties, counts, total_dedup
def run_scrape(
    outcodes: list[str],
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]] | None = None,
) -> None:
    """Main scrape orchestrator — runs all sources in parallel threads.

    Each enabled source (Rightmove, home.co.uk, OpenRent, Zoopla) gets its own
    thread that iterates all outcodes. Rightmove, home.co.uk and Zoopla query
    every channel in CHANNELS; OpenRent is RENT-only. Results are merged with
    cross-source deduplication after all workers complete, written to one
    parquet file per channel, and the server reload hook is called when
    RELOAD_URL is set.

    Args:
        outcodes: Outcodes to scrape. They are copied and shuffled with a
            fixed seed, so the caller's list is not modified.
        pc_index: Spatial index handed to every source's search function.
        pc_coords: Postcode → (lat, lng) lookup for OpenRent/Zoopla. Built on
            demand when omitted and one of those sources is enabled.

    Progress and failures are reported through the shared ``status`` object
    and the Prometheus gauges. Worker and merge failures are caught, logged
    and recorded in ``status.errors`` rather than propagated to the caller.
    """
    global status
    # Reset the shared status for a fresh run.
    # NOTE(review): outcodes_done and the per-source counters are not reset
    # here; they keep the previous run's values until the monitor loop below
    # writes them for the first time.
    with status_lock:
        status.state = "running"
        status.started_at = time.time()
        status.finished_at = 0.0
        status.errors = []
        status.properties_buy = 0
        status.properties_rent = 0
        status.channel = ""
        status.outcode = ""
        _sync_gauges()
    # Shuffle a copy so the caller's list is untouched. Seeding makes the
    # order the same on every run; note this reseeds the module-level RNG.
    shuffled = list(outcodes)
    random.seed(SEED)
    random.shuffle(shuffled)
    if not any([SCRAPE_RIGHTMOVE, SCRAPE_HOMECOUK, SCRAPE_OPENRENT, SCRAPE_ZOOPLA]):
        log.warning("All scrapers disabled — nothing to do")
        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            _sync_gauges()
        return
    if not SCRAPE_RIGHTMOVE:
        log.info("Rightmove scraping DISABLED (SCRAPE_RIGHTMOVE=false)")
    if not SCRAPE_HOMECOUK:
        log.info("home.co.uk scraping DISABLED (SCRAPE_HOMECOUK=false)")
        homecouk_enabled.set(0)
    if not SCRAPE_OPENRENT:
        log.info("OpenRent scraping DISABLED (SCRAPE_OPENRENT=false)")
        openrent_enabled.set(0)
    if not SCRAPE_ZOOPLA:
        log.info("Zoopla scraping DISABLED (SCRAPE_ZOOPLA=false)")
        zoopla_enabled.set(0)
    # Build postcode coords if needed for OpenRent/Zoopla
    if (SCRAPE_OPENRENT or SCRAPE_ZOOPLA) and pc_coords is None:
        pc_coords = build_postcode_coords()
    # Per-source result containers: {channel_name: [properties]}
    # Each list is only written by its owning source thread.
    rm_results: dict[str, list] = {"BUY": [], "RENT": []}
    hk_results: dict[str, list] = {"BUY": [], "RENT": []}
    or_results: dict[str, list] = {"BUY": [], "RENT": []}
    zp_results: dict[str, list] = {"BUY": [], "RENT": []}
    progress = _Progress()

    # --- Source worker closures ---
    # Each worker owns its client lifecycle and iterates all outcodes for both
    # channels. On auth failure, it refreshes cookies and continues. On fatal
    # failure, it marks itself as done and returns partial results.
    def rm_worker():
        """Scrape Rightmove: resolve each outcode to an ID, then search each channel."""
        client = make_client()
        try:
            for i, outcode in enumerate(shuffled):
                try:
                    outcode_id = resolve_outcode_id(client, outcode)
                except Exception as e:
                    # A failed lookup only skips this outcode; still report
                    # progress and pause so the monitor and pacing stay correct.
                    log.error("Rightmove %s ID lookup: %s", outcode, e)
                    scrape_errors_total.labels(source="rightmove").inc()
                    progress.update("rm", i + 1)
                    time.sleep(DELAY_BETWEEN_OUTCODES)
                    continue
                if not outcode_id:
                    log.debug("No Rightmove ID for %s, skipping", outcode)
                    progress.update("rm", i + 1)
                    time.sleep(DELAY_BETWEEN_OUTCODES)
                    continue
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = search_outcode(
                            client, outcode_id, outcode, ch_cfg, pc_index
                        )
                        rm_results[ch].extend(props)
                    except Exception as e:
                        # One failed channel does not stop the other channel.
                        log.error("Rightmove %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="rightmove").inc()
                progress.update("rm", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal Rightmove error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal Rightmove: {e}")
        finally:
            client.close()

    def hk_worker():
        """Scrape home.co.uk for every outcode and channel, refreshing cookies on expiry."""
        hk_result = load_homecouk_cookies()
        if not hk_result:
            log.info("home.co.uk DISABLED (no cookies available)")
            homecouk_enabled.set(0)
            # Report full progress so this source does not hold back the
            # monitor's "slowest source" figure.
            progress.update("hk", len(shuffled))
            return
        client = make_homecouk_client(*hk_result)
        log.info("home.co.uk scraping ENABLED")
        homecouk_enabled.set(1)
        try:
            for i, outcode in enumerate(shuffled):
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = homecouk_search_outcode(
                            client, outcode, ch, pc_index
                        )
                        hk_results[ch].extend(props)
                        if props:
                            log.info("home.co.uk %s: +%d properties", outcode, len(props))
                    except CookiesExpiredError:
                        # Swap in a client built from freshly loaded cookies.
                        # The request that hit the expiry is not retried; the
                        # loop carries on with the next channel/outcode.
                        log.warning(
                            "home.co.uk cookies expired — attempting refresh"
                        )
                        client.close()
                        hk_new = load_homecouk_cookies()
                        if hk_new:
                            client = make_homecouk_client(*hk_new)
                            log.info("home.co.uk cookies refreshed, continuing")
                            cookie_refreshes_total.labels(result="success").inc()
                        else:
                            log.warning(
                                "Cookie refresh failed, disabling home.co.uk"
                            )
                            homecouk_enabled.set(0)
                            cookie_refreshes_total.labels(result="failure").inc()
                            with status_lock:
                                status.errors.append(
                                    "home.co.uk cookies expired and refresh failed"
                                )
                            progress.update("hk", len(shuffled))
                            return
                    except Exception as e:
                        log.error("home.co.uk %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="homecouk").inc()
                progress.update("hk", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal home.co.uk error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal home.co.uk: {e}")
        finally:
            # The client may already have been closed in the refresh path
            # above, so a failure to close here is deliberately ignored.
            try:
                client.close()
            except Exception:
                pass

    def or_worker():
        """Scrape OpenRent (RENT only) for every outcode, refreshing cookies on a WAF challenge."""
        or_result = load_openrent_cookies()
        if not or_result:
            log.info("OpenRent DISABLED (no cookies available)")
            openrent_enabled.set(0)
            progress.update("or", len(shuffled))
            return
        client = make_openrent_client(*or_result)
        log.info("OpenRent scraping ENABLED")
        openrent_enabled.set(1)
        try:
            for i, outcode in enumerate(shuffled):
                # OpenRent is RENT-only
                try:
                    props = openrent_search_outcode(
                        client, outcode, pc_index, pc_coords
                    )
                    or_results["RENT"].extend(props)
                    if props:
                        log.info("OpenRent %s: +%d properties", outcode, len(props))
                except WafChallengeError:
                    # Same recovery as home.co.uk: rebuild the client from
                    # reloaded cookies; the challenged outcode is not retried.
                    log.warning(
                        "OpenRent WAF cookies expired — attempting refresh"
                    )
                    client.close()
                    or_new = load_openrent_cookies()
                    if or_new:
                        client = make_openrent_client(*or_new)
                        log.info("OpenRent cookies refreshed, continuing")
                        cookie_refreshes_total.labels(result="success").inc()
                    else:
                        log.warning(
                            "Cookie refresh failed, disabling OpenRent"
                        )
                        openrent_enabled.set(0)
                        cookie_refreshes_total.labels(result="failure").inc()
                        with status_lock:
                            status.errors.append(
                                "OpenRent WAF cookies expired and refresh failed"
                            )
                        progress.update("or", len(shuffled))
                        return
                except Exception as e:
                    log.error("OpenRent %s: %s", outcode, e)
                    scrape_errors_total.labels(source="openrent").inc()
                progress.update("or", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal OpenRent error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal OpenRent: {e}")
        finally:
            # May be a second close after the refresh path; ignore failures.
            try:
                client.close()
            except Exception:
                pass

    def zp_worker():
        """Scrape Zoopla through a browser session, relaunching it on a Turnstile challenge."""
        # Any launch failure disables Zoopla for this run instead of failing it.
        try:
            browser, page = launch_zoopla_browser()
            log.info("Zoopla scraping ENABLED (Camoufox browser launched)")
            zoopla_enabled.set(1)
        except TurnstileError:
            log.warning("Zoopla Cloudflare Turnstile failed — disabling Zoopla")
            zoopla_enabled.set(0)
            progress.update("zp", len(shuffled))
            return
        except Exception as e:
            log.warning("Zoopla browser launch failed: %s — disabling Zoopla", e)
            zoopla_enabled.set(0)
            progress.update("zp", len(shuffled))
            return
        try:
            for i, outcode in enumerate(shuffled):
                for ch_cfg in CHANNELS:
                    ch = ch_cfg["channel"]
                    try:
                        props = zoopla_search_outcode(
                            page, outcode, ch, pc_index, pc_coords
                        )
                        zp_results[ch].extend(props)
                        if props:
                            log.info("Zoopla %s: +%d properties", outcode, len(props))
                    except TurnstileError:
                        # Replace the whole browser session; the challenged
                        # request itself is not retried.
                        log.warning(
                            "Zoopla Turnstile challenge — relaunching browser"
                        )
                        try:
                            browser.close()
                        except Exception:
                            pass
                        try:
                            browser, page = launch_zoopla_browser()
                            log.info("Zoopla browser relaunched, continuing")
                        except Exception:
                            log.warning(
                                "Browser relaunch failed, disabling Zoopla"
                            )
                            zoopla_enabled.set(0)
                            with status_lock:
                                status.errors.append(
                                    "Zoopla Cloudflare challenge failed and relaunch failed"
                                )
                            progress.update("zp", len(shuffled))
                            return
                    except Exception as e:
                        log.error("Zoopla %s/%s: %s", outcode, ch, e)
                        scrape_errors_total.labels(source="zoopla").inc()
                progress.update("zp", i + 1)
                time.sleep(DELAY_BETWEEN_OUTCODES)
        except Exception as e:
            log.exception("Fatal Zoopla error: %s", e)
            with status_lock:
                status.errors.append(f"Fatal Zoopla: {e}")
        finally:
            # The browser may already be closed after a failed relaunch.
            try:
                browser.close()
            except Exception:
                pass

    # --- Launch worker threads ---
    # One daemon thread per source enabled in configuration. active_sources
    # lists the progress keys the monitor loop should expect.
    active_sources: list[str] = []
    threads: list[threading.Thread] = []
    if SCRAPE_RIGHTMOVE:
        threads.append(threading.Thread(target=rm_worker, name="scrape-rm", daemon=True))
        active_sources.append("rm")
    if SCRAPE_HOMECOUK:
        threads.append(threading.Thread(target=hk_worker, name="scrape-hk", daemon=True))
        active_sources.append("hk")
    if SCRAPE_OPENRENT:
        threads.append(threading.Thread(target=or_worker, name="scrape-or", daemon=True))
        active_sources.append("or")
    if SCRAPE_ZOOPLA:
        threads.append(threading.Thread(target=zp_worker, name="scrape-zp", daemon=True))
        active_sources.append("zp")
    log.info(
        "=== Starting scrape: %d outcodes, sources: %s ===",
        len(shuffled),
        ", ".join(active_sources),
    )
    for t in threads:
        t.start()

    # --- Monitor progress while workers run ---
    scrape_start = time.time()
    last_log = 0.0
    try:
        while any(t.is_alive() for t in threads):
            snap = progress.snapshot()
            # Overall progress is that of the slowest active source; a source
            # that has not reported yet counts as 0.
            min_done = min(
                (snap.get(s, 0) for s in active_sources), default=0
            )
            # Count properties across sources (safe: only one thread writes each list)
            total_buy = sum(
                len(r["BUY"]) for r in [rm_results, hk_results, or_results, zp_results]
            )
            total_rent = sum(
                len(r["RENT"]) for r in [rm_results, hk_results, or_results, zp_results]
            )
            # These are raw, pre-dedup counts; the merge step below replaces
            # the channel totals with deduplicated figures.
            with status_lock:
                status.outcodes_done = min_done
                status.outcodes_total = len(shuffled)
                status.properties_buy = total_buy
                status.properties_rent = total_rent
                status.rm_properties = len(rm_results["BUY"]) + len(rm_results["RENT"])
                status.hk_properties = len(hk_results["BUY"]) + len(hk_results["RENT"])
                status.or_properties = len(or_results["RENT"])
                status.zp_properties = len(zp_results["BUY"]) + len(zp_results["RENT"])
                _sync_gauges()
            # Log progress every 30 seconds
            now = time.time()
            if now - last_log >= 30:
                elapsed = now - scrape_start
                per_source = ", ".join(
                    f"{s}:{snap.get(s, 0)}" for s in active_sources
                )
                log.info(
                    "Progress: %d/%d outcodes (%s), %d buy + %d rent props, %s elapsed",
                    min_done,
                    len(shuffled),
                    per_source,
                    total_buy,
                    total_rent,
                    _fmt_elapsed(elapsed),
                )
                last_log = now
            time.sleep(5)
    except Exception as e:
        # A monitoring failure must not abandon the workers; fall through to
        # the join below so their results are still merged.
        log.exception("Monitor loop error: %s", e)
    for t in threads:
        t.join()
    log.info("All source workers completed")

    # --- Merge results per channel and write parquet ---
    try:
        for ch_cfg in CHANNELS:
            ch = ch_cfg["channel"]
            file_suffix = "buy" if ch == "BUY" else "rent"
            merged, counts, total_dedup = _merge_channel(
                rm_results[ch],
                hk_results[ch],
                or_results[ch],
                zp_results[ch],
            )
            # Update cross-source dedup counter
            ch_label = "buy" if ch == "BUY" else "rent"
            if total_dedup:
                cross_source_dedup_total.labels(channel=ch_label).inc(total_dedup)
            deduped = list(merged.values())
            output_path = DATA_DIR / f"online_listings_{file_suffix}.parquet"
            write_parquet(deduped, output_path, channel=file_suffix)
            # Replace the raw running total with the deduplicated count.
            with status_lock:
                if ch == "BUY":
                    status.properties_buy = len(deduped)
                else:
                    status.properties_rent = len(deduped)
                _sync_gauges()
            log.info(
                "=== %s complete: %d unique (rm:%d hk:%d or:%d zp:%d, cross-dedup:%d) ===",
                ch,
                len(deduped),
                counts["rm"],
                counts["hk"],
                counts["or"],
                counts["zp"],
                total_dedup,
            )
        with status_lock:
            status.state = "done"
            status.finished_at = time.time()
            status.outcodes_done = len(shuffled)
            _sync_gauges()
        elapsed = status.finished_at - status.started_at
        log.info(
            "Scrape complete in %s — buy: %d, rent: %d",
            _fmt_elapsed(elapsed),
            status.properties_buy,
            status.properties_rent,
        )
        # Trigger server data reload
        if RELOAD_URL:
            # Best effort: the run is already marked "done", so a reload
            # failure is only logged and never turns the run into an error.
            try:
                log.info("Triggering server reload at %s", RELOAD_URL)
                resp = httpx.post(RELOAD_URL, timeout=300)
                if resp.is_success:
                    body = resp.json()
                    log.info(
                        "Server reload complete: %d rows, %d features, %dms",
                        body.get("rows", 0),
                        body.get("features", 0),
                        body.get("elapsed_ms", 0),
                    )
                else:
                    log.warning(
                        "Server reload failed (%d): %s",
                        resp.status_code,
                        resp.text[:200],
                    )
            except Exception as e:
                log.warning("Server reload request failed: %s", e)
    except Exception as e:
        # Any merge or write failure marks the whole run as failed.
        log.exception("Fatal scrape error during merge/write")
        with status_lock:
            status.state = "error"
            status.errors.append(f"Fatal: {e}")
            status.finished_at = time.time()
            _sync_gauges()