"""Scraper helpers for home.co.uk search results.

Obtains Cloudflare cookies (via FlareSolverr or environment variables),
builds a curl_cffi session, pages through the search API for an outcode and
transforms each raw property dict into the project's output schema.
"""

import json
import logging
import os
import random
import re
import time
from urllib.parse import unquote

import httpx
from curl_cffi.requests import Session
from curl_cffi.requests.errors import RequestsError

from constants import (
    DELAY_BETWEEN_PAGES,
    HOMECOUK_API_BASE,
    HOMECOUK_BASE,
    HOMECOUK_PER_PAGE,
    MAX_BEDROOMS,
    PROPERTY_TYPE_MAP,
    RETRY_BASE_DELAY,
)
from metrics import (
    flaresolverr_attempts_total,
    homecouk_errors_total,
    homecouk_properties_scraped,
    homecouk_requests_total,
)
from spatial import PostcodeSpatialIndex
from transform import validate_floor_area

log = logging.getLogger("homecouk")


class CookiesExpiredError(Exception):
    """Raised when home.co.uk returns 403, indicating cookies need refresh."""


# Channel mapping: internal name → URL path segment
HOMECOUK_CHANNELS = {
    "BUY": "for-sale",
    "RENT": "to-rent",
}

FLARESOLVERR_URL = os.environ.get("FLARESOLVERR_URL", "http://flaresolverr:8191")

# HTTP statuses that fetch_page retries with exponential backoff.
_RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})

# Floor-area patterns. The number must start with a digit so that stray
# punctuation before "sq ft" (e.g. "garden, sq ft") cannot match as a number.
_SQFT_RE = re.compile(r"(\d[\d,]*(?:\.\d+)?)\s*sq\.?\s*ft", re.IGNORECASE)
_SQM_RE = re.compile(r"(\d[\d,]*(?:\.\d+)?)\s*sq\.?\s*m", re.IGNORECASE)
_SQFT_TO_SQM = 0.092903


def solve_cloudflare() -> tuple[dict[str, str], str] | None:
    """Use FlareSolverr to solve the Cloudflare challenge.

    Posts a ``request.get`` command for a home.co.uk search page to the
    FlareSolverr instance at ``FLARESOLVERR_URL`` and collects the cookies and
    user agent from its solution. Every outcome increments
    ``flaresolverr_attempts_total`` with a ``result`` label.

    Returns (cookies_dict, user_agent) or None on failure.
    """
    log.info("Solving Cloudflare challenge via FlareSolverr at %s", FLARESOLVERR_URL)
    try:
        with httpx.Client(timeout=120) as client:
            resp = client.post(
                f"{FLARESOLVERR_URL}/v1",
                json={
                    "cmd": "request.get",
                    "url": f"{HOMECOUK_BASE}/for-sale/e1/",
                    "maxTimeout": 60000,
                },
            )

        if resp.status_code != 200:
            log.error("FlareSolverr returned HTTP %d", resp.status_code)
            flaresolverr_attempts_total.labels(result="error").inc()
            return None

        data = resp.json()
        if data.get("status") != "ok":
            log.error("FlareSolverr error: %s", data.get("message", "unknown"))
            flaresolverr_attempts_total.labels(result="error").inc()
            return None

        solution = data["solution"]
        raw_cookies = solution.get("cookies", [])
        user_agent = solution.get("userAgent", "")

        # Pass through ALL cookies from FlareSolverr — different Cloudflare
        # configurations set different cookies (cf_clearance only appears when
        # a challenge is triggered; it's not needed if no challenge was detected)
        cookies = {
            c["name"]: c.get("value", "")
            for c in raw_cookies
            if c.get("name")
        }

        if not cookies:
            log.error("FlareSolverr solved but returned no cookies at all")
            flaresolverr_attempts_total.labels(result="no_cookies").inc()
            return None

        log.info(
            "Cloudflare solved — got %d cookies, UA: %s",
            len(cookies),
            user_agent[:60],
        )
        flaresolverr_attempts_total.labels(result="success").inc()
        return cookies, user_agent

    except (httpx.ConnectError, httpx.ReadTimeout) as e:
        log.warning("FlareSolverr not available: %s", e)
        flaresolverr_attempts_total.labels(result="unavailable").inc()
        return None
    except Exception as e:
        # Boundary handler: a malformed FlareSolverr payload must not crash
        # the caller, which can still fall back to other cookie sources.
        log.error("FlareSolverr error: %s", e)
        flaresolverr_attempts_total.labels(result="error").inc()
        return None


def load_cookies() -> tuple[dict[str, str], str] | None:
    """Get home.co.uk cookies + user-agent.

    Tries FlareSolverr first, then falls back to the environment variables
    ``HOMECOUK_CF_CLEARANCE`` and ``HOMECOUK_SESSION`` (both required), with
    ``HOMECOUK_USER_AGENT`` as an optional override of the default Chrome UA.

    Returns (cookies_dict, user_agent) or None if not configured.
    """
    solved = solve_cloudflare()
    if solved:
        return solved

    # Fall back to env vars
    cf_clearance = os.environ.get("HOMECOUK_CF_CLEARANCE", "")
    session = os.environ.get("HOMECOUK_SESSION", "")
    if not cf_clearance or not session:
        return None

    user_agent = os.environ.get(
        "HOMECOUK_USER_AGENT",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/145.0.0.0 Safari/537.36",
    )
    return {"cf_clearance": cf_clearance, "homecouk_session": session}, user_agent


def make_client(cookies: dict[str, str], user_agent: str) -> Session:
    """Create a curl_cffi Session configured for home.co.uk API calls.

    Uses Chrome TLS impersonation so cf_clearance cookies (which are bound to
    Chrome's JA3 fingerprint from FlareSolverr) remain valid.

    Every entry of ``cookies`` is set on the session for the ``home.co.uk``
    domain, and ``user_agent`` is sent as the User-Agent header.
    """
    session = Session(impersonate="chrome")
    session.headers.update(
        {
            "User-Agent": user_agent,
            "Accept": "application/json, text/plain, */*",
            "x-requested-with": "XMLHttpRequest",
        }
    )

    # Laravel CSRF: the XSRF-TOKEN cookie value must also be sent as the
    # X-XSRF-TOKEN request header (URL-decoded). Without this header, the
    # server rejects every request with 419/403.
    xsrf = cookies.get("XSRF-TOKEN")
    if xsrf:
        session.headers["X-XSRF-TOKEN"] = unquote(xsrf)

    for name, value in cookies.items():
        session.cookies.set(name, value, domain="home.co.uk")
    return session


def _status_label(code: int) -> str:
    """Return the metrics label for an HTTP status: all 5xx collapse to "5xx"."""
    if code >= 500:
        return "5xx"
    return str(code)


def _wait_before_retry(what: str, url: str, attempt: int, max_retries: int) -> None:
    """Log a retryable failure and sleep with exponential backoff plus jitter.

    ``attempt`` is zero-based. When it was the last allowed attempt nothing
    follows, so the sleep is skipped instead of delaying the caller for no
    benefit.
    """
    if attempt + 1 >= max_retries:
        log.warning("%s from %s, no retries left", what, url)
        return
    delay = RETRY_BASE_DELAY * (2**attempt) + random.uniform(0, 1)
    log.warning(
        "%s from %s, retry %d/%d in %.1fs",
        what,
        url,
        attempt + 1,
        max_retries,
        delay,
    )
    time.sleep(delay)


def fetch_page(
    client: Session, url: str, params: dict, max_retries: int = 3
) -> dict | None:
    """GET JSON with retries on 429/5xx. Returns None on permanent failure.

    Transport errors (``RequestsError``) and statuses 429/500/502/503/504 are
    retried up to ``max_retries`` attempts with exponential backoff. Any other
    non-200 status, or a 200 whose body is not valid JSON, returns None
    without retrying.

    Raises:
        CookiesExpiredError: on HTTP 403, which means cookies expired; raised
            immediately, without retrying.
    """
    for attempt in range(max_retries):
        try:
            resp = client.get(url, params=params, timeout=30)
        except RequestsError as e:
            error_name = type(e).__name__
            homecouk_errors_total.labels(type=error_name).inc()
            _wait_before_retry(error_name, url, attempt, max_retries)
            continue

        status = resp.status_code
        homecouk_requests_total.labels(status=_status_label(status)).inc()

        if status == 200:
            try:
                return resp.json()
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Typically an HTML challenge/error page served with a 200.
                homecouk_errors_total.labels(type="json_decode").inc()
                log.error(
                    "Non-JSON response from %s (got %s)",
                    url,
                    resp.headers.get("content-type", "?"),
                )
                return None

        if status == 403:
            raise CookiesExpiredError("HTTP 403 — cookies likely expired")

        if status in _RETRYABLE_STATUSES:
            _wait_before_retry(f"HTTP {status}", url, attempt, max_retries)
            continue

        log.error("HTTP %d from %s (non-retryable)", status, url)
        return None

    homecouk_errors_total.labels(type="retry_exhausted").inc()
    log.error("All %d retries exhausted for %s", max_retries, url)
    return None


def parse_floor_area(description: str | None) -> float | None:
    """Try to extract floor area from description text like '789 sq.ft.' or '73 sq.m.'.

    A square-feet figure takes precedence and is converted to square metres;
    otherwise a square-metre figure is used as is. The result is rounded to
    one decimal place and passed through ``validate_floor_area``. Returns None
    when the description is empty or contains neither pattern.
    """
    if not description:
        return None

    m = _SQFT_RE.search(description)
    if m:
        sqft = float(m.group(1).replace(",", ""))
        return validate_floor_area(round(sqft * _SQFT_TO_SQM, 1))

    m = _SQM_RE.search(description)
    if m:
        sqm = float(m.group(1).replace(",", ""))
        return validate_floor_area(round(sqm, 1))

    return None


def _tenure_from_substring(text: str) -> str | None:
    """Return the tenure named anywhere in ``text`` (leasehold checked first)."""
    lowered = text.lower()
    if "leasehold" in lowered:
        return "Leasehold"
    if "freehold" in lowered:
        return "Freehold"
    return None


def parse_tenure(prop: dict) -> str | None:
    """Extract tenure from home.co.uk property data.

    Checks multiple sources in priority order:
    1. Dedicated 'tenure' or 'tenure_type' field in the API response
    2. Free-text search in the description for 'freehold' / 'leasehold'
    3. Free-text search in features lists

    home.co.uk aggregates listings from estate agents, so tenure is often
    embedded in the description text rather than a structured field.

    Returns "Leasehold", "Freehold", or None when no source mentions either.
    """
    # 1. Check dedicated tenure fields (in case the API adds them)
    for key in ("tenure", "tenure_type", "tenureType"):
        val = prop.get(key)
        if val and isinstance(val, str):
            tenure = _tenure_from_substring(val)
            if tenure:
                return tenure

    # 2. Check description text — estate agents often include tenure here.
    # Whole-word matching is used here, unlike the substring checks elsewhere.
    description = prop.get("description") or ""
    if description:
        lower_desc = description.lower()
        if re.search(r"\bleasehold\b", lower_desc):
            return "Leasehold"
        if re.search(r"\bfreehold\b", lower_desc):
            # Matches "Freehold" and "Share of Freehold" (both = freehold ownership)
            return "Freehold"

    # 3. Check features / key_features lists if present
    for key in ("features", "key_features", "keyFeatures"):
        features = prop.get(key)
        if not features or not isinstance(features, list):
            continue
        for feat in features:
            if not isinstance(feat, str):
                continue
            tenure = _tenure_from_substring(feat)
            if tenure:
                return tenure

    return None


def map_property_type(raw_type: str | None) -> str:
    """Map home.co.uk property type to canonical type.

    An exact match in ``PROPERTY_TYPE_MAP`` wins; otherwise the type is
    classified by keyword. Empty or unrecognised types map to "Other".
    """
    if not raw_type:
        return "Other"

    canonical = PROPERTY_TYPE_MAP.get(raw_type)
    if canonical:
        return canonical

    # Home.co.uk uses types like "House", "Flat", "Apartment", "Detached", etc.
    # Try common patterns
    lower = raw_type.lower()
    if any(word in lower for word in ("flat", "apartment", "maisonette", "studio")):
        return "Flats/Maisonettes"
    # "semi" is tested before "detached" so "semi-detached" is not misfiled.
    if "semi" in lower:
        return "Semi-Detached"
    if "detached" in lower:
        return "Detached"
    if "terrace" in lower or "mews" in lower:
        return "Terraced"

    log.debug("Unknown property type: %r — mapping to Other", raw_type)
    return "Other"


def transform_property(
    prop: dict,
    channel: str,
    pc_index: PostcodeSpatialIndex,
) -> dict | None:
    """Transform a raw home.co.uk property dict into our output schema.

    Returns None (the property is skipped) when coordinates are missing,
    non-numeric or outside the accepted bounding box, when the price is
    missing, non-numeric or not positive, or when no postcode can be found.
    Malformed values skip the single record rather than raising, so one bad
    listing cannot abort a whole search.
    """
    lat = prop.get("latitude")
    lng = prop.get("longitude")
    if lat is None or lng is None:
        return None
    try:
        lat, lng = float(lat), float(lng)
    except (TypeError, ValueError):
        log.debug("Non-numeric coords: lat=%r lng=%r — skipping", lat, lng)
        return None

    # Validate coordinates are in England
    if not (49 <= lat <= 56 and -7 <= lng <= 2):
        log.debug("Coords outside England: lat=%.4f lng=%.4f — skipping", lat, lng)
        return None

    raw_price = prop.get("price") or prop.get("latest_price")
    if not raw_price:
        return None
    try:
        price = int(raw_price)
    except (TypeError, ValueError, OverflowError):
        log.debug("Non-numeric price %r — skipping", raw_price)
        return None
    if price <= 0:
        return None

    # Home.co.uk provides postcodes directly, but fall back to spatial index
    postcode = prop.get("postcode")
    if not postcode:
        postcode = pc_index.nearest(lat, lng)
    if not postcode:
        log.debug("No postcode for property at %.4f, %.4f — skipping", lat, lng)
        return None

    listing_id = prop.get("listing_id") or prop.get("property_id") or ""

    # Counts above MAX_BEDROOMS are treated as bad data and reported as 0.
    raw_beds = prop.get("bedrooms", 0) or 0
    raw_baths = prop.get("bathrooms", 0) or 0
    bedrooms = raw_beds if raw_beds <= MAX_BEDROOMS else 0
    bathrooms = raw_baths if raw_baths <= MAX_BEDROOMS else 0
    if raw_beds > MAX_BEDROOMS or raw_baths > MAX_BEDROOMS:
        log.warning(
            "home.co.uk %s: implausible beds=%d baths=%d (capped to 0)",
            listing_id or "?",
            raw_beds,
            raw_baths,
        )

    listing_type = prop.get("listing_property_type") or prop.get("property_type") or ""
    address = prop.get("display_address") or prop.get("address") or ""

    # Derive price qualifier from reduction info
    price_qualifier = ""
    if prop.get("is_reduced"):
        pct = prop.get("reduction_percent", 0)
        price_qualifier = f"Reduced by {pct}%" if pct else "Reduced"

    return {
        "id": f"hk_{listing_id}",  # prefix to avoid collision with Rightmove IDs
        "Bedrooms": bedrooms,
        "Bathrooms": bathrooms,
        "Number of bedrooms & living rooms": bedrooms + bathrooms,
        "lon": lng,
        "lat": lat,
        "Postcode": postcode,
        "Address per Property Register": address,
        "Leasehold/Freehold": parse_tenure(prop),
        "Property type": map_property_type(listing_type),
        "Property sub-type": listing_type.title() if listing_type else "Unknown",
        "price": price,
        "price_frequency": "" if channel == "BUY" else "monthly",
        "Price qualifier": price_qualifier,
        "Total floor area (sqm)": parse_floor_area(prop.get("description")),
        "Listing URL": f"{HOMECOUK_BASE}/property/{listing_id}",
        "Listing features": [],  # not available from home.co.uk
        "first_visible_date": prop.get("added_date") or "",
    }


def search_outcode(
    client: Session,
    outcode: str,
    channel: str,
    pc_index: PostcodeSpatialIndex,
) -> list[dict]:
    """Paginate through search results for one outcode+channel.

    channel: "BUY" or "RENT".

    Requests pages in ``date_desc`` order until a page fails, comes back
    empty, or the reported last page is reached, sleeping
    ``DELAY_BETWEEN_PAGES`` between pages. The client's ``referer`` header is
    overwritten on every request. ``CookiesExpiredError`` from ``fetch_page``
    propagates to the caller.

    Returns transformed properties.
    """
    url_segment = HOMECOUK_CHANNELS[channel]
    outcode_slug = outcode.lower()
    url = f"{HOMECOUK_API_BASE}/{url_segment}/{outcode_slug}/"
    channel_label = "buy" if channel == "BUY" else "rent"

    properties = []
    page = 1

    while True:
        params = {
            "page": str(page),
            "sort": "date_desc",
            "per_page": str(HOMECOUK_PER_PAGE),
        }

        # Set referer to match the page URL pattern
        client.headers["referer"] = (
            f"https://home.co.uk/{url_segment}/{outcode_slug}/"
            f"?page={page}&sort=date_desc&per_page={HOMECOUK_PER_PAGE}"
        )

        data = fetch_page(client, url, params)
        if not data:
            break

        raw_props = data.get("properties") or []
        if not raw_props:
            break

        for prop in raw_props:
            transformed = transform_property(prop, channel, pc_index)
            if transformed:
                properties.append(transformed)
                homecouk_properties_scraped.labels(channel=channel_label).inc()

        # Check pagination. `or` guards cover explicit nulls in the payload,
        # which would otherwise break the comparison below.
        pagination = data.get("pagination") or {}
        last_page = pagination.get("last_page") or 1
        if page >= last_page:
            break

        page += 1
        time.sleep(DELAY_BETWEEN_PAGES)

    return properties