perfect-postcode/finder/homecouk.py
2026-03-15 21:22:28 +00:00

372 lines
13 KiB
Python

import json
import logging
import os
import random
import re
import time
from urllib.parse import unquote
import httpx
from curl_cffi.requests import Session
from curl_cffi.requests.errors import RequestsError
from constants import (
DELAY_BETWEEN_PAGES,
HOMECOUK_API_BASE,
HOMECOUK_BASE,
HOMECOUK_PER_PAGE,
PROPERTY_TYPE_MAP,
RETRY_BASE_DELAY,
)
from metrics import (
flaresolverr_attempts_total,
homecouk_errors_total,
homecouk_properties_scraped,
homecouk_requests_total,
)
from spatial import PostcodeSpatialIndex
# Module-level logger shared by every function in this file; the logger name
# is the fixed string "homecouk" rather than being derived from __name__.
log = logging.getLogger("homecouk")
class CookiesExpiredError(Exception):
    """Signal that the home.co.uk cookies are no longer accepted.

    Raised when home.co.uk answers with HTTP 403, which this module treats
    as "the cookies need to be refreshed".
    """
# Internal channel name -> path segment used in home.co.uk URLs.
HOMECOUK_CHANNELS = {"BUY": "for-sale", "RENT": "to-rent"}

# Base URL of the FlareSolverr service; overridable via the environment.
FLARESOLVERR_URL = os.getenv("FLARESOLVERR_URL", "http://flaresolverr:8191")
def solve_cloudflare() -> tuple[dict[str, str], str] | None:
    """Use FlareSolverr to solve the Cloudflare challenge.

    Asks the FlareSolverr service at ``FLARESOLVERR_URL`` to load a
    home.co.uk search page and collects the cookies and user agent it
    reports back.

    Every outcome is counted in ``flaresolverr_attempts_total`` under one of
    the ``result`` labels ``success``, ``no_cookies``, ``unavailable`` or
    ``error`` so that no failure path is invisible in the metrics.

    Returns:
        ``(cookies_dict, user_agent)`` on success, or ``None`` on any
        failure. This function never raises; failures are logged instead.
    """
    log.info("Solving Cloudflare challenge via FlareSolverr at %s", FLARESOLVERR_URL)
    try:
        # The HTTP timeout (seconds) is deliberately longer than the solver's
        # own maxTimeout so FlareSolverr gets the chance to report its result.
        with httpx.Client(timeout=120) as client:
            resp = client.post(
                f"{FLARESOLVERR_URL}/v1",
                json={
                    "cmd": "request.get",
                    "url": f"{HOMECOUK_BASE}/for-sale/e1/",
                    "maxTimeout": 60000,
                },
            )
            if resp.status_code != 200:
                log.error("FlareSolverr returned HTTP %d", resp.status_code)
                flaresolverr_attempts_total.labels(result="error").inc()
                return None

            data = resp.json()
            if data.get("status") != "ok":
                log.error("FlareSolverr error: %s", data.get("message", "unknown"))
                flaresolverr_attempts_total.labels(result="error").inc()
                return None

            solution = data["solution"]
            # ``or`` guards against explicit JSON nulls as well as missing keys.
            raw_cookies = solution.get("cookies") or []
            user_agent = solution.get("userAgent") or ""

            # Pass through ALL cookies from FlareSolverr — different Cloudflare
            # configurations set different cookies (cf_clearance only appears
            # when a challenge is triggered; it's not needed if no challenge
            # was detected).
            cookies = {c["name"]: c["value"] for c in raw_cookies if c.get("name")}
            if not cookies:
                log.error("FlareSolverr solved but returned no cookies at all")
                flaresolverr_attempts_total.labels(result="no_cookies").inc()
                return None

            log.info(
                "Cloudflare solved — got %d cookies, UA: %s",
                len(cookies),
                user_agent[:60],
            )
            flaresolverr_attempts_total.labels(result="success").inc()
            return cookies, user_agent
    except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout) as e:
        # Service unreachable or too slow: expected when FlareSolverr is not
        # deployed, so this is a warning rather than an error.
        log.warning("FlareSolverr not available: %s", e)
        flaresolverr_attempts_total.labels(result="unavailable").inc()
        return None
    except Exception as e:
        # Best-effort boundary: any malformed response must not crash the
        # caller, which falls back to other cookie sources.
        log.error("FlareSolverr error: %s", e)
        flaresolverr_attempts_total.labels(result="error").inc()
        return None
def load_cookies() -> tuple[dict[str, str], str] | None:
    """Obtain home.co.uk cookies together with a matching user agent.

    FlareSolverr is asked first. If that yields nothing, the cookies are read
    from ``HOMECOUK_CF_CLEARANCE`` and ``HOMECOUK_SESSION`` (both required),
    with ``HOMECOUK_USER_AGENT`` optionally overriding the built-in Chrome
    user-agent string.

    Returns:
        ``(cookies_dict, user_agent)``, or ``None`` when neither source is
        configured.
    """
    solved = solve_cloudflare()
    if solved:
        return solved

    # Environment-variable fallback.
    env = os.environ
    clearance = env.get("HOMECOUK_CF_CLEARANCE", "")
    session_cookie = env.get("HOMECOUK_SESSION", "")
    if not (clearance and session_cookie):
        return None

    default_user_agent = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/145.0.0.0 Safari/537.36"
    )
    env_cookies = {"cf_clearance": clearance, "homecouk_session": session_cookie}
    return env_cookies, env.get("HOMECOUK_USER_AGENT", default_user_agent)
def make_client(cookies: dict[str, str], user_agent: str) -> Session:
    """Build a curl_cffi ``Session`` set up for home.co.uk API requests.

    The session impersonates Chrome's TLS handshake so that cf_clearance
    cookies (bound to Chrome's JA3 fingerprint from FlareSolverr) stay valid.

    Args:
        cookies: Cookie name -> value pairs to install for ``home.co.uk``.
        user_agent: Value sent as the ``User-Agent`` header.

    Returns:
        The configured session.
    """
    client = Session(impersonate="chrome")

    request_headers = {
        "User-Agent": user_agent,
        "Accept": "application/json, text/plain, */*",
        "x-requested-with": "XMLHttpRequest",
    }
    # Laravel CSRF: the XSRF-TOKEN cookie must be echoed (URL-decoded) in the
    # X-XSRF-TOKEN header, otherwise every request is rejected with 419/403.
    xsrf_token = cookies.get("XSRF-TOKEN")
    if xsrf_token:
        request_headers["X-XSRF-TOKEN"] = unquote(xsrf_token)
    client.headers.update(request_headers)

    for cookie_name, cookie_value in cookies.items():
        client.cookies.set(cookie_name, cookie_value, domain="home.co.uk")
    return client
def _status_label(code: int) -> str:
if code >= 500:
return "5xx"
return str(code)
# HTTP statuses that are worth retrying with exponential backoff.
_RETRYABLE_STATUSES = (429, 500, 502, 503, 504)


def _log_and_backoff(what: str, url: str, attempt: int, max_retries: int) -> None:
    """Log a failed attempt and sleep before the next one, if there is one.

    ``attempt`` is zero-based. On the final attempt nothing is slept, because
    no retry follows and the wait would only delay the caller.
    """
    if attempt + 1 >= max_retries:
        log.warning(
            "%s from %s on final attempt %d/%d",
            what,
            url,
            attempt + 1,
            max_retries,
        )
        return
    # Exponential backoff with up to one second of jitter.
    delay = RETRY_BASE_DELAY * (2**attempt) + random.uniform(0, 1)
    log.warning(
        "%s from %s, retry %d/%d in %.1fs",
        what,
        url,
        attempt + 1,
        max_retries,
        delay,
    )
    time.sleep(delay)


def fetch_page(
    client: Session, url: str, params: dict, max_retries: int = 3
) -> dict | None:
    """GET JSON with retries on 429/5xx and on transport errors.

    Args:
        client: Session used to issue the request.
        url: Endpoint to fetch.
        params: Query-string parameters.
        max_retries: Total number of attempts before giving up.

    Returns:
        The decoded JSON body on HTTP 200, or ``None`` on permanent failure
        (non-JSON body, non-retryable status, or all attempts exhausted).

    Raises:
        CookiesExpiredError: Immediately on HTTP 403, which means the cookies
            have expired; retrying with the same cookies would be pointless.
    """
    for attempt in range(max_retries):
        # Only the network call can raise RequestsError, so only it is guarded.
        try:
            resp = client.get(url, params=params, timeout=30)
        except RequestsError as e:
            error_name = type(e).__name__
            homecouk_errors_total.labels(type=error_name).inc()
            _log_and_backoff(error_name, url, attempt, max_retries)
            continue

        status = resp.status_code
        homecouk_requests_total.labels(status=_status_label(status)).inc()

        if status == 200:
            try:
                return resp.json()
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass; catching the
                # parent also covers decoders that raise plain ValueError.
                homecouk_errors_total.labels(type="json_decode").inc()
                log.error(
                    "Non-JSON response from %s (got %s)",
                    url,
                    resp.headers.get("content-type", "?"),
                )
                return None

        if status == 403:
            raise CookiesExpiredError("HTTP 403 — cookies likely expired")

        if status in _RETRYABLE_STATUSES:
            _log_and_backoff(f"HTTP {status}", url, attempt, max_retries)
            continue

        log.error("HTTP %d from %s (non-retryable)", status, url)
        return None

    homecouk_errors_total.labels(type="retry_exhausted").inc()
    log.error("All %d retries exhausted for %s", max_retries, url)
    return None
def parse_floor_area(description: str | None) -> float | None:
"""Try to extract floor area from description text like '789 sq.ft.' or '73 sq.m.'."""
if not description:
return None
m = re.search(r"([\d,]+(?:\.\d+)?)\s*sq\.?\s*ft", description, re.IGNORECASE)
if m:
sqft = float(m.group(1).replace(",", ""))
return round(sqft * 0.092903, 1)
m = re.search(r"([\d,]+(?:\.\d+)?)\s*sq\.?\s*m", description, re.IGNORECASE)
if m:
return round(float(m.group(1).replace(",", "")), 1)
return None
def map_property_type(raw_type: str | None) -> str:
    """Translate a home.co.uk property type into the canonical type.

    An exact match in ``PROPERTY_TYPE_MAP`` wins. Otherwise the lower-cased
    text is checked for keywords, in this order: flat-like, detached (not
    semi), semi, terraced. Anything else, including a missing type, becomes
    ``"Other"``.
    """
    if not raw_type:
        return "Other"

    mapped = PROPERTY_TYPE_MAP.get(raw_type)
    if mapped:
        return mapped

    # Home.co.uk uses types like "House", "Flat", "Apartment", "Detached", etc.
    # so fall back to keyword matching.
    text = raw_type.lower()
    if any(word in text for word in ("flat", "apartment", "maisonette", "studio")):
        return "Flats/Maisonettes"

    is_semi = "semi" in text
    if "detached" in text and not is_semi:
        return "Detached"
    if is_semi:
        return "Semi-Detached"

    if any(word in text for word in ("terrace", "mews")):
        return "Terraced"

    log.debug("Unknown property type: %r — mapping to Other", raw_type)
    return "Other"
def transform_property(
    prop: dict,
    channel: str,
    pc_index: PostcodeSpatialIndex,
) -> dict | None:
    """Transform a raw home.co.uk property dict into our output schema.

    Args:
        prop: One raw property record from the search response.
        channel: ``"BUY"`` or anything else (treated as a rental, priced
            monthly).
        pc_index: Spatial index used to look up the nearest postcode when the
            record carries none.

    Returns:
        The transformed record, or ``None`` when the record is unusable:
        missing or non-numeric coordinates, coordinates outside the accepted
        bounding box, missing or non-numeric price, or no postcode.
    """
    raw_lat = prop.get("latitude")
    raw_lng = prop.get("longitude")
    if raw_lat is None or raw_lng is None:
        return None
    # Coerce defensively: a string or otherwise malformed coordinate must only
    # drop this one record, not raise and abort the whole outcode scrape.
    try:
        lat = float(raw_lat)
        lng = float(raw_lng)
    except (TypeError, ValueError):
        log.debug("Non-numeric coords: lat=%r lng=%r — skipping", raw_lat, raw_lng)
        return None

    # Validate coordinates are in England (rough bounding box).
    if not (49 <= lat <= 56 and -7 <= lng <= 2):
        log.debug("Coords outside England: lat=%.4f lng=%.4f — skipping", lat, lng)
        return None

    raw_price = prop.get("price") or prop.get("latest_price")
    if not raw_price:
        return None
    try:
        # Strings go through float() first so "250000.00" is accepted too.
        if isinstance(raw_price, str):
            price = int(float(raw_price))
        else:
            price = int(raw_price)
    except (TypeError, ValueError, OverflowError):
        log.debug("Non-numeric price %r — skipping", raw_price)
        return None

    # Home.co.uk provides postcodes directly, but fall back to spatial index
    postcode = prop.get("postcode")
    if not postcode:
        postcode = pc_index.nearest(lat, lng)
    if not postcode:
        log.debug("No postcode for property at %.4f, %.4f — skipping", lat, lng)
        return None

    bedrooms = prop.get("bedrooms", 0) or 0
    bathrooms = prop.get("bathrooms", 0) or 0
    listing_type = prop.get("listing_property_type") or prop.get("property_type") or ""
    address = prop.get("display_address") or prop.get("address") or ""

    # Derive price qualifier from reduction info
    price_qualifier = ""
    if prop.get("is_reduced"):
        pct = prop.get("reduction_percent", 0)
        price_qualifier = f"Reduced by {pct}%" if pct else "Reduced"

    # NOTE(review): when neither id field is present this is "", so the record
    # gets the id "hk_" and a listing URL without an id — confirm that is
    # acceptable downstream.
    listing_id = prop.get("listing_id") or prop.get("property_id") or ""

    return {
        "id": f"hk_{listing_id}",  # prefix to avoid collision with Rightmove IDs
        "Bedrooms": bedrooms,
        "Bathrooms": bathrooms,
        # NOTE(review): this sums bedrooms and bathrooms although the field is
        # named for living rooms — presumably a deliberate proxy; confirm.
        "Number of bedrooms & living rooms": bedrooms + bathrooms,
        "lon": lng,
        "lat": lat,
        "Postcode": postcode,
        "Address per Property Register": address,
        "Leasehold/Freehold": None,  # not available from home.co.uk
        "Property type": map_property_type(listing_type),
        "Property sub-type": listing_type or "Unknown",
        "price": price,
        "price_frequency": "" if channel == "BUY" else "monthly",
        "Price qualifier": price_qualifier,
        "Total floor area (sqm)": parse_floor_area(prop.get("description")),
        "Listing URL": f"{HOMECOUK_BASE}/property/{listing_id}",
        "Listing features": [],  # not available from home.co.uk
        "first_visible_date": prop.get("added_date") or "",
    }
def search_outcode(
    client: Session,
    outcode: str,
    channel: str,
    pc_index: PostcodeSpatialIndex,
) -> list[dict]:
    """Collect every listing for one outcode and channel, page by page.

    Args:
        client: Session used for the requests; its ``referer`` header is
            rewritten before each page fetch.
        outcode: Outcode to search; lower-cased for the URL.
        channel: ``"BUY"`` or ``"RENT"`` (a key of ``HOMECOUK_CHANNELS``).
        pc_index: Passed through to ``transform_property``.

    Returns:
        The transformed properties gathered up to the point where paging
        stopped: a failed or empty page, or the reported last page.
    """
    segment = HOMECOUK_CHANNELS[channel]
    area = outcode.lower()
    endpoint = f"{HOMECOUK_API_BASE}/{segment}/{area}/"
    metric_channel = "buy" if channel == "BUY" else "rent"

    results = []
    page_no = 1
    while True:
        # Set referer to match the page URL pattern
        client.headers["referer"] = (
            f"https://home.co.uk/{segment}/{area}/"
            f"?page={page_no}&sort=date_desc&per_page={HOMECOUK_PER_PAGE}"
        )
        query = {
            "page": str(page_no),
            "sort": "date_desc",
            "per_page": str(HOMECOUK_PER_PAGE),
        }
        payload = fetch_page(client, endpoint, query)
        if not payload:
            break

        listings = payload.get("properties", [])
        if not listings:
            break

        for listing in listings:
            row = transform_property(listing, channel, pc_index)
            if row:
                results.append(row)
                homecouk_properties_scraped.labels(channel=metric_channel).inc()

        # Stop once the reported last page has been consumed.
        final_page = payload.get("pagination", {}).get("last_page", 1)
        if page_no >= final_page:
            break

        page_no += 1
        time.sleep(DELAY_BETWEEN_PAGES)

    return results