fix zoopla bug
This commit is contained in:
parent
96dfdd7491
commit
1dfa0e0009
4 changed files with 215 additions and 4 deletions
|
|
@ -10,6 +10,7 @@ MAX_RETRIES = 3
|
|||
RETRY_BASE_DELAY = 2.0
|
||||
GRID_CELL_SIZE = 0.01 # degrees for postcode spatial index
|
||||
SEED = 42
|
||||
CHECKPOINT_INTERVAL = int(os.environ.get("CHECKPOINT_INTERVAL", "900")) # seconds
|
||||
|
||||
# Schedule: hour of day (UTC) to auto-run scrape. Set to -1 to disable.
|
||||
SCHEDULE_HOUR = int(os.environ.get("SCHEDULE_HOUR", "3"))
|
||||
|
|
|
|||
|
|
@ -624,9 +624,13 @@ def _resolve_outcode_postcodes(
|
|||
pc_coords: dict[str, tuple[float, float]],
|
||||
) -> list[str]:
|
||||
"""Get all postcodes for an outcode from the postcode coordinates lookup."""
|
||||
# ONSPD 7-char format: 4-char outcodes have no space before incode
|
||||
# (e.g., "BH191AB"), while shorter outcodes do (e.g., "E14 5AB").
|
||||
prefix = outcode + " "
|
||||
# Also try without space for non-standard format (e.g. "SW1Y" matches "SW1Y 4AA")
|
||||
return [pcd for pcd in pc_coords if pcd.startswith(prefix)]
|
||||
results = [pcd for pcd in pc_coords if pcd.startswith(prefix)]
|
||||
if not results and len(outcode) >= 4:
|
||||
results = [pcd for pcd in pc_coords if pcd.startswith(outcode) and len(pcd) > len(outcode)]
|
||||
return results
|
||||
|
||||
|
||||
def transform_property(
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import json
|
||||
import logging
|
||||
import random
|
||||
import threading
|
||||
|
|
@ -11,6 +12,7 @@ import httpx
|
|||
from constants import (
|
||||
ARCGIS_PATH,
|
||||
CHANNELS,
|
||||
CHECKPOINT_INTERVAL,
|
||||
DATA_DIR,
|
||||
DELAY_BETWEEN_OUTCODES,
|
||||
RELOAD_URL,
|
||||
|
|
@ -233,6 +235,135 @@ def _merge_channel(
|
|||
return all_properties, counts, total_dedup
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Checkpointing — save/resume partial results across crashes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _checkpoint_meta_path():
|
||||
return DATA_DIR / "checkpoint.json"
|
||||
|
||||
|
||||
def _checkpoint_results_path(source: str, channel: str):
|
||||
return DATA_DIR / f"checkpoint_{source}_{channel}.json"
|
||||
|
||||
|
||||
def _save_checkpoint(
|
||||
shuffled: list[str],
|
||||
progress: _Progress,
|
||||
source_results: dict[str, dict[str, list]],
|
||||
active_sources: list[str],
|
||||
) -> None:
|
||||
"""Save per-source progress indices and partial results to disk.
|
||||
|
||||
Writes atomically (temp + rename) so a crash mid-write leaves the previous
|
||||
checkpoint intact.
|
||||
"""
|
||||
snap = progress.snapshot()
|
||||
|
||||
meta = {
|
||||
"seed": SEED,
|
||||
"num_outcodes": len(shuffled),
|
||||
"sources": {s: snap.get(s, 0) for s in active_sources},
|
||||
"timestamp": time.time(),
|
||||
}
|
||||
|
||||
# Write result files per source per channel
|
||||
for source in active_sources:
|
||||
results = source_results.get(source, {})
|
||||
for ch_key in ("BUY", "RENT"):
|
||||
props = results.get(ch_key, [])
|
||||
path = _checkpoint_results_path(source, ch_key.lower())
|
||||
tmp = path.with_suffix(".tmp")
|
||||
try:
|
||||
with open(tmp, "w") as f:
|
||||
json.dump(props, f, default=str)
|
||||
tmp.rename(path)
|
||||
except Exception as e:
|
||||
log.warning("Failed to write checkpoint %s: %s", path.name, e)
|
||||
|
||||
# Write metadata atomically
|
||||
tmp = _checkpoint_meta_path().with_suffix(".tmp")
|
||||
try:
|
||||
with open(tmp, "w") as f:
|
||||
json.dump(meta, f)
|
||||
tmp.rename(_checkpoint_meta_path())
|
||||
except Exception as e:
|
||||
log.warning("Failed to write checkpoint metadata: %s", e)
|
||||
return
|
||||
|
||||
total = sum(len(source_results.get(s, {}).get(ch, []))
|
||||
for s in active_sources for ch in ("BUY", "RENT"))
|
||||
log.info(
|
||||
"Checkpoint saved: %s (%d properties)",
|
||||
{s: snap.get(s, 0) for s in active_sources},
|
||||
total,
|
||||
)
|
||||
|
||||
|
||||
def _load_checkpoint(
|
||||
shuffled: list[str],
|
||||
) -> tuple[dict[str, int], dict[str, dict[str, list]]] | None:
|
||||
"""Load checkpoint if it exists and matches the current outcode list.
|
||||
|
||||
Returns (start_indices, loaded_results) or None if no valid checkpoint.
|
||||
"""
|
||||
path = _checkpoint_meta_path()
|
||||
if not path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
with open(path) as f:
|
||||
meta = json.load(f)
|
||||
except Exception:
|
||||
log.warning("Checkpoint file corrupt, starting fresh")
|
||||
_clear_checkpoint()
|
||||
return None
|
||||
|
||||
if meta.get("seed") != SEED or meta.get("num_outcodes") != len(shuffled):
|
||||
log.info("Checkpoint from different run configuration, discarding")
|
||||
_clear_checkpoint()
|
||||
return None
|
||||
|
||||
start_indices: dict[str, int] = {}
|
||||
loaded_results: dict[str, dict[str, list]] = {}
|
||||
|
||||
for source, completed in meta.get("sources", {}).items():
|
||||
start_indices[source] = completed
|
||||
loaded_results[source] = {"BUY": [], "RENT": []}
|
||||
for channel in ("buy", "rent"):
|
||||
rpath = _checkpoint_results_path(source, channel)
|
||||
if rpath.exists():
|
||||
try:
|
||||
with open(rpath) as f:
|
||||
loaded_results[source][channel.upper()] = json.load(f)
|
||||
except Exception:
|
||||
log.warning(
|
||||
"Checkpoint results for %s/%s corrupt, restarting %s",
|
||||
source, channel, source,
|
||||
)
|
||||
start_indices[source] = 0
|
||||
loaded_results[source] = {"BUY": [], "RENT": []}
|
||||
break
|
||||
|
||||
elapsed_since = time.time() - meta.get("timestamp", 0)
|
||||
log.info(
|
||||
"Resuming from checkpoint (saved %.0fm ago): %s",
|
||||
elapsed_since / 60,
|
||||
start_indices,
|
||||
)
|
||||
return start_indices, loaded_results
|
||||
|
||||
|
||||
def _clear_checkpoint() -> None:
|
||||
"""Remove all checkpoint files after successful completion."""
|
||||
for path in DATA_DIR.glob("checkpoint*"):
|
||||
try:
|
||||
path.unlink()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def run_scrape(
|
||||
outcodes: list[str],
|
||||
pc_index: PostcodeSpatialIndex,
|
||||
|
|
@ -293,15 +424,40 @@ def run_scrape(
|
|||
|
||||
progress = _Progress()
|
||||
|
||||
# --- Resume from checkpoint if available ---
|
||||
start_indices: dict[str, int] = {}
|
||||
checkpoint = _load_checkpoint(shuffled)
|
||||
if checkpoint:
|
||||
start_indices, loaded = checkpoint
|
||||
source_to_results = {"rm": rm_results, "hk": hk_results, "or": or_results, "zp": zp_results}
|
||||
for src, data in loaded.items():
|
||||
if src in source_to_results:
|
||||
for ch in ("BUY", "RENT"):
|
||||
source_to_results[src][ch] = data.get(ch, [])
|
||||
# Reassign in case references changed
|
||||
rm_results = source_to_results["rm"]
|
||||
hk_results = source_to_results["hk"]
|
||||
or_results = source_to_results["or"]
|
||||
zp_results = source_to_results["zp"]
|
||||
# Pre-set progress for resumed sources
|
||||
for src, idx in start_indices.items():
|
||||
if idx > 0:
|
||||
progress.update(src, idx)
|
||||
|
||||
# --- Source worker closures ---
|
||||
# Each worker owns its client lifecycle and iterates all outcodes for both
|
||||
# channels. On auth failure, it refreshes cookies and continues. On fatal
|
||||
# failure, it marks itself as done and returns partial results.
|
||||
|
||||
def rm_worker():
|
||||
rm_start = start_indices.get("rm", 0)
|
||||
if rm_start > 0:
|
||||
log.info("Rightmove resuming from outcode %d/%d", rm_start, len(shuffled))
|
||||
client = make_client()
|
||||
try:
|
||||
for i, outcode in enumerate(shuffled):
|
||||
if i < rm_start:
|
||||
continue
|
||||
try:
|
||||
outcode_id = resolve_outcode_id(client, outcode)
|
||||
except Exception as e:
|
||||
|
|
@ -344,11 +500,16 @@ def run_scrape(
|
|||
homecouk_enabled.set(0)
|
||||
progress.update("hk", len(shuffled))
|
||||
return
|
||||
hk_start = start_indices.get("hk", 0)
|
||||
if hk_start > 0:
|
||||
log.info("home.co.uk resuming from outcode %d/%d", hk_start, len(shuffled))
|
||||
client = make_homecouk_client(*hk_result)
|
||||
log.info("home.co.uk scraping ENABLED")
|
||||
homecouk_enabled.set(1)
|
||||
try:
|
||||
for i, outcode in enumerate(shuffled):
|
||||
if i < hk_start:
|
||||
continue
|
||||
for ch_cfg in CHANNELS:
|
||||
ch = ch_cfg["channel"]
|
||||
try:
|
||||
|
|
@ -403,11 +564,16 @@ def run_scrape(
|
|||
openrent_enabled.set(0)
|
||||
progress.update("or", len(shuffled))
|
||||
return
|
||||
or_start = start_indices.get("or", 0)
|
||||
if or_start > 0:
|
||||
log.info("OpenRent resuming from outcode %d/%d", or_start, len(shuffled))
|
||||
client = make_openrent_client(*or_result)
|
||||
log.info("OpenRent scraping ENABLED")
|
||||
openrent_enabled.set(1)
|
||||
try:
|
||||
for i, outcode in enumerate(shuffled):
|
||||
if i < or_start:
|
||||
continue
|
||||
# OpenRent is RENT-only
|
||||
try:
|
||||
props = openrent_search_outcode(
|
||||
|
|
@ -470,8 +636,14 @@ def run_scrape(
|
|||
progress.update("zp", len(shuffled))
|
||||
return
|
||||
|
||||
zp_start = start_indices.get("zp", 0)
|
||||
if zp_start > 0:
|
||||
log.info("Zoopla resuming from outcode %d/%d", zp_start, len(shuffled))
|
||||
|
||||
try:
|
||||
for i, outcode in enumerate(shuffled):
|
||||
if i < zp_start:
|
||||
continue
|
||||
search_url = None
|
||||
for ch_cfg in CHANNELS:
|
||||
ch = ch_cfg["channel"]
|
||||
|
|
@ -559,8 +731,15 @@ def run_scrape(
|
|||
|
||||
# --- Monitor progress while workers run ---
|
||||
|
||||
# Map source names to result dicts for checkpointing
|
||||
source_results_map = {
|
||||
"rm": rm_results, "hk": hk_results,
|
||||
"or": or_results, "zp": zp_results,
|
||||
}
|
||||
|
||||
scrape_start = time.time()
|
||||
last_log = 0.0
|
||||
last_checkpoint = time.time()
|
||||
|
||||
try:
|
||||
while any(t.is_alive() for t in threads):
|
||||
|
|
@ -588,8 +767,9 @@ def run_scrape(
|
|||
status.zp_properties = len(zp_results["BUY"]) + len(zp_results["RENT"])
|
||||
_sync_gauges()
|
||||
|
||||
# Log progress every 30 seconds
|
||||
now = time.time()
|
||||
|
||||
# Log progress every 30 seconds
|
||||
if now - last_log >= 30:
|
||||
elapsed = now - scrape_start
|
||||
per_source = ", ".join(
|
||||
|
|
@ -606,10 +786,26 @@ def run_scrape(
|
|||
)
|
||||
last_log = now
|
||||
|
||||
# Save checkpoint periodically
|
||||
if now - last_checkpoint >= CHECKPOINT_INTERVAL:
|
||||
try:
|
||||
_save_checkpoint(
|
||||
shuffled, progress, source_results_map, active_sources,
|
||||
)
|
||||
except Exception as e:
|
||||
log.warning("Checkpoint save failed: %s", e)
|
||||
last_checkpoint = now
|
||||
|
||||
time.sleep(5)
|
||||
except Exception as e:
|
||||
log.exception("Monitor loop error: %s", e)
|
||||
|
||||
# Save final checkpoint before joining (in case merge/write fails)
|
||||
try:
|
||||
_save_checkpoint(shuffled, progress, source_results_map, active_sources)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
|
|
@ -656,6 +852,9 @@ def run_scrape(
|
|||
total_dedup,
|
||||
)
|
||||
|
||||
# Scrape completed successfully — clear checkpoint
|
||||
_clear_checkpoint()
|
||||
|
||||
with status_lock:
|
||||
status.state = "done"
|
||||
status.finished_at = time.time()
|
||||
|
|
|
|||
|
|
@ -626,9 +626,16 @@ def transform_property(
|
|||
# Try outcode-level fallback
|
||||
outcode = _extract_outcode(address)
|
||||
if outcode:
|
||||
# ONSPD 7-char format: 4-char outcodes have no space before incode
|
||||
# (e.g., "BH191AB"), while shorter outcodes do (e.g., "E14 5AB").
|
||||
# Check both formats to handle all outcode lengths.
|
||||
prefix = outcode + " "
|
||||
for pcd, coords in pc_coords.items():
|
||||
if pcd.startswith(prefix):
|
||||
if pcd.startswith(prefix) or (
|
||||
len(outcode) >= 4
|
||||
and pcd.startswith(outcode)
|
||||
and len(pcd) > len(outcode)
|
||||
):
|
||||
postcode = pcd
|
||||
lat, lng = coords
|
||||
break
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue