500 lines
15 KiB
Python
500 lines
15 KiB
Python
import logging
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
import polars as pl
|
|
|
|
from constants import (
|
|
ARCGIS_PATH,
|
|
CHANNELS,
|
|
DATA_DIR,
|
|
DELAY_BETWEEN_OUTCODES,
|
|
LONDON_OUTCODE_PREFIXES,
|
|
)
|
|
|
|
from http_client import make_client
|
|
from rightmove import resolve_outcode_id
|
|
from rightmove import search_outcode as rightmove_search_outcode
|
|
from spatial import PostcodeSpatialIndex
|
|
from storage import write_parquet
|
|
from zoopla import TurnstileError
|
|
from zoopla import launch_browser as launch_zoopla_browser
|
|
from zoopla import search_outcode as zoopla_search_outcode
|
|
|
|
log = logging.getLogger("rightmove")
|
|
|
|
SOURCE_ORDER = ("rightmove", "zoopla")
|
|
SALE_CHANNEL = CHANNELS[0]
|
|
LONDON_AREAS = sorted({prefix.upper() for prefix in LONDON_OUTCODE_PREFIXES})
|
|
OUTCODE_RE = re.compile(r"^([A-Z]{1,2}\d[A-Z0-9]?)")
|
|
|
|
|
|
def _arcgis_columns() -> tuple[str, str]:
|
|
"""Return postcode and country column names for supported ARCGIS schemas."""
|
|
columns = set(pl.scan_parquet(ARCGIS_PATH).collect_schema().names())
|
|
|
|
if "pcd" in columns:
|
|
postcode_col = "pcd"
|
|
elif "pcds" in columns:
|
|
postcode_col = "pcds"
|
|
else:
|
|
raise ValueError(f"{ARCGIS_PATH} has no supported postcode column")
|
|
|
|
if "ctry" in columns:
|
|
country_col = "ctry"
|
|
elif "ctry25cd" in columns:
|
|
country_col = "ctry25cd"
|
|
else:
|
|
raise ValueError(f"{ARCGIS_PATH} has no supported country column")
|
|
|
|
return postcode_col, country_col
|
|
|
|
|
|
def _normalize_postcode(postcode: str) -> str:
|
|
compact = "".join(str(postcode).upper().split())
|
|
if len(compact) < 5:
|
|
return compact
|
|
return compact[:-3] + " " + compact[-3:]
|
|
|
|
|
|
def _londonish_postcode_expr(postcode_col: str) -> pl.Expr:
|
|
return (
|
|
pl.col(postcode_col)
|
|
.str.to_uppercase()
|
|
.str.extract(r"^([A-Z]{1,2})", 1)
|
|
.is_in(LONDON_AREAS)
|
|
)
|
|
|
|
|
|
def _outcode_area(outcode: str) -> str:
|
|
chars = []
|
|
for ch in outcode.upper():
|
|
if not ch.isalpha():
|
|
break
|
|
chars.append(ch)
|
|
return "".join(chars)
|
|
|
|
|
|
def is_londonish_outcode(outcode: str) -> bool:
|
|
normalized = outcode.upper()
|
|
return normalized in LONDON_AREAS or _outcode_area(normalized) in LONDON_AREAS
|
|
|
|
|
|
def _property_is_londonish(prop: dict) -> bool:
|
|
postcode = str(prop.get("Postcode") or "").upper().strip()
|
|
match = OUTCODE_RE.match(postcode)
|
|
return bool(match and is_londonish_outcode(match.group(1)))
|
|
|
|
|
|
def filter_londonish_outcodes(outcodes: Iterable[str]) -> list[str]:
|
|
return sorted(
|
|
{outcode.upper() for outcode in outcodes if is_londonish_outcode(outcode)}
|
|
)
|
|
|
|
|
|
def load_outcodes() -> list[str]:
|
|
"""Load England outcodes from ARCGIS and keep only Greater London-ish areas."""
|
|
log.info("Loading outcodes from %s", ARCGIS_PATH)
|
|
postcode_col, country_col = _arcgis_columns()
|
|
df = pl.read_parquet(ARCGIS_PATH, columns=[postcode_col, country_col])
|
|
england = df.filter(
|
|
(pl.col(country_col) == "E92000001")
|
|
& _londonish_postcode_expr(postcode_col)
|
|
)
|
|
|
|
outcodes = (
|
|
england.select(
|
|
pl.col(postcode_col)
|
|
.str.extract(r"^([A-Z]{1,2}\d[A-Z0-9]?)", 1)
|
|
.alias("outcode")
|
|
)
|
|
.drop_nulls()
|
|
.get_column("outcode")
|
|
.unique()
|
|
.sort()
|
|
.to_list()
|
|
)
|
|
londonish = filter_londonish_outcodes(outcodes)
|
|
log.info("Greater London-ish outcodes: %d", len(londonish))
|
|
return londonish
|
|
|
|
|
|
def build_postcode_index() -> PostcodeSpatialIndex:
|
|
"""Build spatial index from ARCGIS England postcodes."""
|
|
log.info("Building postcode spatial index from %s", ARCGIS_PATH)
|
|
postcode_col, country_col = _arcgis_columns()
|
|
df = pl.read_parquet(
|
|
ARCGIS_PATH, columns=[postcode_col, country_col, "lat", "long"]
|
|
)
|
|
england = df.filter(
|
|
(pl.col(country_col) == "E92000001")
|
|
& _londonish_postcode_expr(postcode_col)
|
|
).drop_nulls(
|
|
subset=["lat", "long"]
|
|
)
|
|
return PostcodeSpatialIndex(
|
|
england.get_column("lat").to_list(),
|
|
england.get_column("long").to_list(),
|
|
[
|
|
_normalize_postcode(pcd)
|
|
for pcd in england.get_column(postcode_col).to_list()
|
|
],
|
|
)
|
|
|
|
|
|
def build_postcode_coords() -> dict[str, tuple[float, float]]:
|
|
"""Build postcode -> (lat, lng) lookup from ARCGIS England postcodes."""
|
|
log.info("Building postcode coords lookup from %s", ARCGIS_PATH)
|
|
postcode_col, country_col = _arcgis_columns()
|
|
df = pl.read_parquet(
|
|
ARCGIS_PATH, columns=[postcode_col, country_col, "lat", "long"]
|
|
)
|
|
england = df.filter(
|
|
(pl.col(country_col) == "E92000001")
|
|
& _londonish_postcode_expr(postcode_col)
|
|
).drop_nulls(
|
|
subset=["lat", "long"]
|
|
)
|
|
coords: dict[str, tuple[float, float]] = {}
|
|
for pcd, lat, lng in zip(
|
|
england.get_column(postcode_col).to_list(),
|
|
england.get_column("lat").to_list(),
|
|
england.get_column("long").to_list(),
|
|
):
|
|
coords[_normalize_postcode(pcd)] = (lat, lng)
|
|
log.info("Postcode coords lookup: %d postcodes", len(coords))
|
|
return coords
|
|
|
|
|
|
def _source_names(sources: str | Iterable[str] | None) -> list[str]:
|
|
if sources is None:
|
|
return list(SOURCE_ORDER)
|
|
if isinstance(sources, str):
|
|
requested = [part.strip().lower() for part in sources.split(",")]
|
|
else:
|
|
requested = [str(source).strip().lower() for source in sources]
|
|
|
|
requested = [source for source in requested if source]
|
|
unknown = sorted(set(requested) - set(SOURCE_ORDER) - {"all"})
|
|
if unknown:
|
|
raise ValueError(f"Unknown source(s): {', '.join(unknown)}")
|
|
if "all" in requested:
|
|
return list(SOURCE_ORDER)
|
|
return [source for source in SOURCE_ORDER if source in requested]
|
|
|
|
|
|
def _dedup_key(prop: dict) -> tuple:
|
|
return (prop.get("Postcode", ""), prop.get("Bedrooms", 0), prop.get("price", 0))
|
|
|
|
|
|
def _merge_properties(source_results: dict[str, list[dict]]) -> tuple[list[dict], dict, int]:
|
|
merged: dict[str, dict] = {}
|
|
seen_keys: set[tuple] = set()
|
|
seen_ids: set[str] = set()
|
|
counts = {source: 0 for source in SOURCE_ORDER}
|
|
deduped = 0
|
|
|
|
for source in SOURCE_ORDER:
|
|
for prop in source_results.get(source, []):
|
|
prop_id = prop.get("id")
|
|
if prop_id is not None:
|
|
prop_id = str(prop_id)
|
|
if prop_id in seen_ids:
|
|
deduped += 1
|
|
continue
|
|
seen_ids.add(prop_id)
|
|
storage_key = prop_id
|
|
else:
|
|
key = _dedup_key(prop)
|
|
if key in seen_keys:
|
|
deduped += 1
|
|
continue
|
|
seen_keys.add(key)
|
|
storage_key = f"{source}:{len(merged)}"
|
|
merged[storage_key] = prop
|
|
counts[source] += 1
|
|
|
|
return list(merged.values()), counts, deduped
|
|
|
|
|
|
def _source_total(
|
|
results: dict[str, list[dict]],
|
|
source: str,
|
|
) -> int:
|
|
return len(results[source])
|
|
|
|
|
|
def _source_remaining(
|
|
results: dict[str, list[dict]],
|
|
source: str,
|
|
max_properties_per_source: int | None,
|
|
) -> int | None:
|
|
if max_properties_per_source is None:
|
|
return None
|
|
return max(max_properties_per_source - _source_total(results, source), 0)
|
|
|
|
|
|
def _store_properties(
|
|
results: dict[str, list[dict]],
|
|
source: str,
|
|
props: list[dict],
|
|
max_properties_per_source: int | None,
|
|
) -> int:
|
|
remaining = _source_remaining(results, source, max_properties_per_source)
|
|
if remaining == 0:
|
|
return 0
|
|
|
|
londonish = [prop for prop in props if _property_is_londonish(prop)]
|
|
dropped_outside_area = len(props) - len(londonish)
|
|
if dropped_outside_area:
|
|
log.debug(
|
|
"%s dropped %d properties outside the Greater London-ish postcode filter",
|
|
source,
|
|
dropped_outside_area,
|
|
)
|
|
|
|
selected = londonish if remaining is None else londonish[:remaining]
|
|
results[source].extend(selected)
|
|
return len(selected)
|
|
|
|
|
|
def _record_error(
|
|
errors: list[str], source: str, outcode: str, exc: Exception
|
|
) -> None:
|
|
detail = " ".join(str(exc).split())
|
|
if len(detail) > 300:
|
|
detail = f"{detail[:300]}..."
|
|
message = f"{source} {outcode}: {detail}"
|
|
errors.append(message)
|
|
log.warning(message)
|
|
|
|
|
|
def _launch_zoopla_with_retries(attempts: int = 3):
|
|
last_error: Exception | None = None
|
|
for attempt in range(1, attempts + 1):
|
|
try:
|
|
return launch_zoopla_browser()
|
|
except TurnstileError:
|
|
raise
|
|
except Exception as exc:
|
|
last_error = exc
|
|
log.warning(
|
|
"Zoopla browser launch failed (%d/%d): %s",
|
|
attempt,
|
|
attempts,
|
|
exc,
|
|
)
|
|
time.sleep(5)
|
|
|
|
assert last_error is not None
|
|
raise last_error
|
|
|
|
|
|
def _scrape_rightmove(
|
|
outcodes: list[str],
|
|
pc_index: PostcodeSpatialIndex,
|
|
results: dict[str, list[dict]],
|
|
errors: list[str],
|
|
max_properties_per_source: int | None,
|
|
) -> None:
|
|
client = make_client()
|
|
try:
|
|
for outcode in outcodes:
|
|
if _source_remaining(results, "rightmove", max_properties_per_source) == 0:
|
|
log.info("Rightmove cap reached")
|
|
return
|
|
|
|
try:
|
|
outcode_id = resolve_outcode_id(client, outcode)
|
|
except Exception as exc:
|
|
_record_error(errors, "rightmove", outcode, exc)
|
|
time.sleep(DELAY_BETWEEN_OUTCODES)
|
|
continue
|
|
|
|
if not outcode_id:
|
|
log.debug("No Rightmove outcode ID for %s", outcode)
|
|
time.sleep(DELAY_BETWEEN_OUTCODES)
|
|
continue
|
|
|
|
remaining = _source_remaining(
|
|
results, "rightmove", max_properties_per_source
|
|
)
|
|
if remaining == 0:
|
|
log.info("Rightmove cap reached")
|
|
return
|
|
|
|
try:
|
|
props = rightmove_search_outcode(
|
|
client,
|
|
outcode_id,
|
|
outcode,
|
|
SALE_CHANNEL,
|
|
pc_index,
|
|
max_properties=remaining,
|
|
)
|
|
added = _store_properties(
|
|
results,
|
|
"rightmove",
|
|
props,
|
|
max_properties_per_source,
|
|
)
|
|
log.info("Rightmove %s: +%d", outcode, added)
|
|
except Exception as exc:
|
|
_record_error(errors, "rightmove", outcode, exc)
|
|
|
|
time.sleep(DELAY_BETWEEN_OUTCODES)
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
def _scrape_zoopla(
|
|
outcodes: list[str],
|
|
pc_index: PostcodeSpatialIndex,
|
|
pc_coords: dict[str, tuple[float, float]],
|
|
results: dict[str, list[dict]],
|
|
errors: list[str],
|
|
max_properties_per_source: int | None,
|
|
) -> None:
|
|
try:
|
|
browser, page = _launch_zoopla_with_retries()
|
|
except Exception as exc:
|
|
errors.append(f"zoopla: browser launch failed: {exc}")
|
|
log.warning("Zoopla skipped: browser launch failed: %s", exc)
|
|
return
|
|
|
|
try:
|
|
for outcode in outcodes:
|
|
if _source_remaining(results, "zoopla", max_properties_per_source) == 0:
|
|
log.info("Zoopla cap reached")
|
|
return
|
|
|
|
for attempt in range(2):
|
|
try:
|
|
# Fetch the outcode page set first; _store_properties applies
|
|
# the London-ish postcode filter and source cap after transformation.
|
|
props, _ = zoopla_search_outcode(
|
|
page,
|
|
outcode,
|
|
pc_index,
|
|
pc_coords,
|
|
max_properties=None,
|
|
)
|
|
added = _store_properties(
|
|
results,
|
|
"zoopla",
|
|
props,
|
|
max_properties_per_source,
|
|
)
|
|
log.info("Zoopla %s: +%d", outcode, added)
|
|
break
|
|
except Exception as exc:
|
|
if attempt == 1:
|
|
_record_error(errors, "zoopla", outcode, exc)
|
|
if isinstance(exc, TurnstileError):
|
|
return
|
|
break
|
|
|
|
log.warning("Zoopla %s failed; relaunching browser and retrying", outcode)
|
|
try:
|
|
browser.close()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
browser, page = _launch_zoopla_with_retries()
|
|
except Exception as relaunch_exc:
|
|
_record_error(errors, "zoopla", outcode, relaunch_exc)
|
|
return
|
|
|
|
time.sleep(DELAY_BETWEEN_OUTCODES)
|
|
finally:
|
|
browser.close()
|
|
|
|
|
|
def run_scrape(
|
|
outcodes: list[str],
|
|
pc_index: PostcodeSpatialIndex,
|
|
pc_coords: dict[str, tuple[float, float]] | None = None,
|
|
sources: str | Iterable[str] | None = None,
|
|
output_dir: str | Path | None = None,
|
|
max_properties_per_source: int | None = None,
|
|
) -> dict:
|
|
"""Run one manual sale-listings scrape and write a parquet output."""
|
|
selected_sources = _source_names(sources)
|
|
selected_outcodes = filter_londonish_outcodes(outcodes)
|
|
if not selected_sources:
|
|
raise ValueError("No sources selected")
|
|
if not selected_outcodes:
|
|
raise ValueError("No Greater London-ish outcodes selected")
|
|
|
|
output_base = Path(output_dir) if output_dir is not None else DATA_DIR
|
|
output_base.mkdir(parents=True, exist_ok=True)
|
|
|
|
errors: list[str] = []
|
|
results = {source: [] for source in SOURCE_ORDER}
|
|
started_at = time.time()
|
|
|
|
log.info(
|
|
"Starting manual sale scrape: %d outcodes, sources=%s, source_cap=%s",
|
|
len(selected_outcodes),
|
|
",".join(selected_sources),
|
|
max_properties_per_source,
|
|
)
|
|
|
|
if "rightmove" in selected_sources:
|
|
_scrape_rightmove(
|
|
selected_outcodes,
|
|
pc_index,
|
|
results,
|
|
errors,
|
|
max_properties_per_source,
|
|
)
|
|
|
|
if "zoopla" in selected_sources:
|
|
if pc_coords is None:
|
|
pc_coords = build_postcode_coords()
|
|
_scrape_zoopla(
|
|
selected_outcodes,
|
|
pc_index,
|
|
pc_coords,
|
|
results,
|
|
errors,
|
|
max_properties_per_source,
|
|
)
|
|
|
|
merged, source_counts, deduped = _merge_properties(results)
|
|
output_path = output_base / "online_listings_buy.parquet"
|
|
if merged:
|
|
write_parquet(merged, output_path)
|
|
else:
|
|
if output_path.exists():
|
|
output_path.unlink()
|
|
log.warning("No London-ish properties to write to %s", output_path)
|
|
|
|
counts = {
|
|
"total": len(merged),
|
|
"deduped": deduped,
|
|
"sources": source_counts,
|
|
}
|
|
source_summary = " ".join(
|
|
f"{source}:{source_counts[source]}" for source in SOURCE_ORDER
|
|
)
|
|
log.info(
|
|
"Sale scrape complete: %d unique (%s deduped:%d)",
|
|
len(merged),
|
|
source_summary,
|
|
deduped,
|
|
)
|
|
|
|
return {
|
|
"outcodes": len(selected_outcodes),
|
|
"sources": selected_sources,
|
|
"source_totals": {
|
|
source: _source_total(results, source) for source in selected_sources
|
|
},
|
|
"counts": counts,
|
|
"path": str(output_path),
|
|
"errors": errors,
|
|
"elapsed_seconds": round(time.time() - started_at, 3),
|
|
}
|