"""OpenRent (openrent.co.uk) scraper — rental properties only.

OpenRent is behind AWS WAF, so we use FlareSolverr to solve the challenge and
get valid cookies. Then we use curl_cffi with Chrome TLS impersonation to make
requests with those cookies (same pattern as homecouk.py).

OpenRent is a rental-only platform, so this scraper only handles RENT channel.

HTML parsing notes:
    OpenRent server-renders property cards in the search results page.
    Property cards use class "pli" (property list item). Each card contains
    a link to the property detail page, price, bedrooms, and address info.
    The CSS selectors below may need updating if OpenRent changes their markup.
"""

import logging
import os
import re
import time

import httpx
from bs4 import BeautifulSoup
from curl_cffi.requests import Session
from curl_cffi.requests.errors import RequestsError

from constants import (
    DELAY_BETWEEN_PAGES,
    OPENRENT_BASE,
    PROPERTY_TYPE_MAP,
    RETRY_BASE_DELAY,
)
from metrics import (
    flaresolverr_attempts_total,
    openrent_errors_total,
    openrent_properties_scraped,
    openrent_requests_total,
)
from spatial import PostcodeSpatialIndex

log = logging.getLogger("openrent")


class WafChallengeError(Exception):
    """Raised when OpenRent returns a WAF challenge, indicating cookies need refresh."""


FLARESOLVERR_URL = os.environ.get("FLARESOLVERR_URL", "http://flaresolverr:8191")

# HTTP statuses that fetch_page() treats as transient and retries with backoff.
_RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})

# "£1,500" → "1,500". A leading digit is required so that a stray "£," can
# never reach int() as an empty string.
_PRICE_RE = re.compile(r"£(\d[\d,]*)")

# Rent-frequency tokens. The (?<![a-z]) look-behind and trailing \b stop the
# short abbreviations from firing inside ordinary words ("pa" in "apartment",
# "spacious", "Paddington"), while still accepting a token glued to the amount
# ("£350pw"). The name of the group that matched is the returned frequency.
_FREQUENCY_RE = re.compile(
    r"(?P<weekly>(?<![a-z])(?:p\.?w|per\s+week)\b|/\s*(?:w|wk|week)\b)"
    r"|(?P<yearly>(?<![a-z])(?:p\.?a|per\s+(?:annum|year))\b|/\s*(?:y|yr|year)\b)"
    r"|(?P<monthly>(?<![a-z])(?:p\.?c\.?m|per\s+(?:calendar\s+)?month)\b"
    r"|/\s*(?:m|mo|month)\b)",
    re.IGNORECASE,
)

_BEDROOMS_RE = re.compile(r"(\d+)\s*bed", re.IGNORECASE)
_STUDIO_RE = re.compile(r"\bstudio\b", re.IGNORECASE)

# Deliberately not anchored with \b: titles built with get_text(strip=True)
# can have the postcode glued to the preceding text.
_POSTCODE_RE = re.compile(r"([A-Z]{1,2}\d[A-Z0-9]?\s*\d[A-Z]{2})", re.IGNORECASE)

# Numeric listing ID at the end of a property URL path (optional trailing "/").
_PROPERTY_ID_RE = re.compile(r"/(\d+)/?(?:\?|#|$)")
_PROPERTY_HREF_RE = re.compile(r"/property-to-rent/")

_FIRST_INT_RE = re.compile(r"(\d+)")
_JSON_LAT_RE = re.compile(r'"latitude"\s*:\s*([\d.-]+)')
_JSON_LNG_RE = re.compile(r'"longitude"\s*:\s*([\d.-]+)')

# Floor-area patterns; the leading \d keeps a lone "," from reaching float().
_SQFT_RE = re.compile(r"(\d[\d,]*(?:\.\d+)?)\s*sq\.?\s*ft", re.IGNORECASE)
_SQM_RE = re.compile(r"(\d[\d,]*(?:\.\d+)?)\s*sq\.?\s*m", re.IGNORECASE)
_SQFT_TO_SQM = 0.092903


# ---------------------------------------------------------------------------
# Cookie / session management (mirrors homecouk.py pattern)
# ---------------------------------------------------------------------------


def solve_waf() -> tuple[dict[str, str], str] | None:
    """Use FlareSolverr to solve the AWS WAF challenge.

    Every outcome is recorded on ``flaresolverr_attempts_total`` with one of
    the result labels ``success``, ``no_cookies``, ``unavailable`` or ``error``.

    Returns:
        ``(cookies_dict, user_agent)`` on success, or ``None`` on any failure.
    """
    log.info("Solving AWS WAF challenge via FlareSolverr at %s", FLARESOLVERR_URL)
    try:
        with httpx.Client(timeout=120) as client:
            resp = client.post(
                f"{FLARESOLVERR_URL}/v1",
                json={
                    "cmd": "request.get",
                    "url": f"{OPENRENT_BASE}/properties-to-rent/?term=london&isLive=true",
                    "maxTimeout": 60000,
                },
            )

        if resp.status_code != 200:
            log.error("FlareSolverr returned HTTP %d", resp.status_code)
            flaresolverr_attempts_total.labels(result="error").inc()
            return None

        data = resp.json()
        if data.get("status") != "ok":
            log.error("FlareSolverr error: %s", data.get("message", "unknown"))
            flaresolverr_attempts_total.labels(result="error").inc()
            return None

        solution = data["solution"]
        user_agent = solution.get("userAgent") or ""
        # A cookie entry without a "value" raises KeyError, which the generic
        # handler below reports as result="error".
        cookies = {
            c["name"]: c["value"]
            for c in solution.get("cookies", [])
            if c.get("name")
        }

        if not cookies:
            log.error("FlareSolverr solved but returned no cookies")
            flaresolverr_attempts_total.labels(result="no_cookies").inc()
            return None

        log.info(
            "AWS WAF solved — got %d cookies, UA: %s",
            len(cookies),
            user_agent[:60],
        )
        flaresolverr_attempts_total.labels(result="success").inc()
        return cookies, user_agent

    except (httpx.ConnectError, httpx.ReadTimeout) as e:
        log.warning("FlareSolverr not available: %s", e)
        flaresolverr_attempts_total.labels(result="unavailable").inc()
        return None
    except Exception as e:
        # Boundary handler: the scraper must degrade gracefully (env-var
        # fallback in load_cookies) rather than crash on a malformed reply.
        log.error("FlareSolverr error: %s", e)
        flaresolverr_attempts_total.labels(result="error").inc()
        return None


def load_cookies() -> tuple[dict[str, str], str] | None:
    """Get OpenRent cookies + user-agent.

    Tries FlareSolverr first, then falls back to the ``OPENRENT_WAF_TOKEN``
    (and optional ``OPENRENT_USER_AGENT``) environment variables.

    Returns:
        ``(cookies_dict, user_agent)``, or ``None`` when neither source works.
    """
    result = solve_waf()
    if result:
        return result

    # Fall back to env vars
    waf_token = os.environ.get("OPENRENT_WAF_TOKEN", "")
    if not waf_token:
        return None

    user_agent = os.environ.get(
        "OPENRENT_USER_AGENT",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/145.0.0.0 Safari/537.36",
    )
    return {"aws-waf-token": waf_token}, user_agent


def make_client(cookies: dict[str, str], user_agent: str) -> Session:
    """Create a curl_cffi Session configured for OpenRent.

    Uses Chrome TLS impersonation so AWS WAF cookies remain valid.

    Args:
        cookies: Cookie name → value, set for the ``openrent.co.uk`` domain.
        user_agent: User-Agent header to send with every request.
    """
    session = Session(impersonate="chrome")
    session.headers.update({
        "User-Agent": user_agent,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-GB,en;q=0.9",
    })
    for name, value in cookies.items():
        session.cookies.set(name, value, domain="openrent.co.uk")
    return session


# ---------------------------------------------------------------------------
# HTTP fetch with retry
# ---------------------------------------------------------------------------


def _status_label(code: int) -> str:
    """Collapse all 5xx codes into one metric label; pass others through."""
    if code >= 500:
        return "5xx"
    return str(code)


def _backoff(reason: str, url: str, attempt: int, max_retries: int) -> None:
    """Log a transient failure and sleep with exponential backoff.

    The sleep is skipped on the final attempt: nothing will be retried, so
    waiting would only delay the caller.
    """
    if attempt + 1 >= max_retries:
        log.warning(
            "%s from %s on final attempt %d/%d",
            reason, url, attempt + 1, max_retries,
        )
        return
    delay = RETRY_BASE_DELAY * (2 ** attempt)
    log.warning(
        "%s from %s, retry %d/%d in %.1fs",
        reason, url, attempt + 1, max_retries, delay,
    )
    time.sleep(delay)


def fetch_page(
    client: Session,
    url: str,
    max_retries: int = 3,
) -> str | None:
    """GET HTML with retries on 429/5xx and transport errors.

    Args:
        client: Session carrying the WAF cookies (see make_client()).
        url: Absolute URL to fetch.
        max_retries: Total number of attempts before giving up.

    Returns:
        The response body, or ``None`` on permanent failure (non-retryable
        status, or all attempts exhausted).

    Raises:
        WafChallengeError: On a WAF challenge (HTTP 202/403, or a 200 whose
            body is the challenge page); the caller should refresh cookies.
    """
    for attempt in range(max_retries):
        # Only the network call can raise RequestsError, so keep the try tight.
        try:
            resp = client.get(url, timeout=30)
        except RequestsError as e:
            openrent_errors_total.labels(type=type(e).__name__).inc()
            _backoff(type(e).__name__, url, attempt, max_retries)
            continue

        status = resp.status_code
        openrent_requests_total.labels(status=_status_label(status)).inc()

        if status == 200:
            html = resp.text
            # Detect WAF challenge page masquerading as 200
            if "AwsWafIntegration" in html and "challenge.js" in html:
                raise WafChallengeError("Got AWS WAF challenge page — cookies expired")
            return html

        if status in (202, 403):
            raise WafChallengeError(f"HTTP {status} — cookies likely expired")

        if status in _RETRYABLE_STATUSES:
            _backoff(f"HTTP {status}", url, attempt, max_retries)
            continue

        log.error("HTTP %d from %s (non-retryable)", status, url)
        return None

    openrent_errors_total.labels(type="retry_exhausted").inc()
    log.error("All %d retries exhausted for %s", max_retries, url)
    return None


# ---------------------------------------------------------------------------
# HTML parsing
# ---------------------------------------------------------------------------


def _extract_price(text: str) -> tuple[int, str] | None:
    """Extract price and frequency from text like '£1,500 pcm' or '£350 pw'.

    The frequency token is looked for after the price first, then anywhere in
    the text; tokens only count as whole words, so "2 Bed Apartment" is not
    mistaken for "pa".

    Returns:
        ``(price_int, frequency)`` with frequency one of ``"weekly"``,
        ``"monthly"``, ``"yearly"``; or ``None`` when no price is present.
    """
    price_match = _PRICE_RE.search(text)
    if not price_match:
        return None
    price = int(price_match.group(1).replace(",", ""))

    freq_match = (
        _FREQUENCY_RE.search(text, price_match.end())
        or _FREQUENCY_RE.search(text)
    )
    if freq_match:
        return price, freq_match.lastgroup
    # OpenRent defaults to pcm (per calendar month)
    return price, "monthly"


def _extract_bedrooms_from_title(title: str) -> int | None:
    """Extract bedroom count from title like '2 Bed Flat, Pimlico'.

    A studio counts as 0 bedrooms. Returns ``None`` when nothing matches.
    """
    match = _BEDROOMS_RE.search(title)
    if match:
        return int(match.group(1))
    if _STUDIO_RE.search(title):
        return 0
    return None


def _extract_postcode(text: str) -> str | None:
    """Extract full UK postcode from text like '2 Bed Flat, Pimlico, SW1V 2AA'.

    The match is upper-cased; internal spacing is returned as found.
    """
    match = _POSTCODE_RE.search(text)
    if match:
        return match.group(1).upper().strip()
    return None


def _parse_coords(lat, lng) -> tuple[float, float] | None:
    """Convert a raw lat/lng pair to floats.

    Both values are parsed before anything is returned, so callers never end
    up with a latitude but no longitude. Returns ``None`` when either value is
    missing or not a number.
    """
    if not lat or not lng:
        return None
    try:
        return float(lat), float(lng)
    except (TypeError, ValueError):
        return None


def _coords_from_attrs(el) -> tuple[float, float] | None:
    """Read coordinates from an element's data-lat/-lng style attributes."""
    lat = el.get("data-lat") or el.get("data-latitude")
    lng = (
        el.get("data-lng")
        or el.get("data-longitude")
        or el.get("data-lon")
    )
    return _parse_coords(lat, lng)


def _card_link(card) -> tuple[str, str] | None:
    """Return ``(href, property_id)`` for a property card.

    The card itself is tried first (the usual ``a.pli`` case). The fallback
    selectors can match non-anchor containers, which carry no href, so
    descendant links are tried next.
    """
    for el in [card, *card.select("a[href]")]:
        href = el.get("href") or ""
        id_match = _PROPERTY_ID_RE.search(href)
        if id_match:
            return href, id_match.group(1)
    return None


def parse_search_results(html: str) -> list[dict]:
    """Parse property data from OpenRent search results HTML.

    Uses multiple fallback selectors for resilience against markup changes.

    Returns:
        A list of raw property dicts. Each has ``url``, ``id`` and ``title``;
        ``price``/``frequency``, ``bedrooms``, ``postcode`` and ``lat``/``lng``
        are present only when they could be extracted. Cards without a
        numeric ID in their link are skipped.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Try known selectors for property cards (most specific first)
    cards = []
    for selector in ("a.pli", ".pli.clearfix", "[class*='propertyListing']"):
        cards = soup.select(selector)
        if cards:
            break
    if not cards:
        # Last resort: look for links that match property URL pattern
        cards = soup.find_all("a", href=_PROPERTY_HREF_RE)

    if not cards:
        log.warning(
            "No property cards found in search HTML (%d bytes). "
            "CSS selectors may need updating.",
            len(html),
        )
        return []

    properties = []
    for card in cards:
        link = _card_link(card)
        if link is None:
            continue  # can't use a property without a URL and an ID
        href, prop_id = link

        prop: dict = {
            "url": href if href.startswith("http") else OPENRENT_BASE + href,
            "id": prop_id,
        }

        # Extract card text for parsing
        card_text = card.get_text(" ", strip=True)

        # Price
        price_result = _extract_price(card_text)
        if price_result:
            prop["price"], prop["frequency"] = price_result

        # Title / address — try specific elements first, fall back to card text
        title_el = card.select_one(
            ".listing-title, .banda, h2, h3, [class*='title']"
        )
        prop["title"] = (
            title_el.get_text(strip=True) if title_el else card_text[:200]
        )

        # Bedrooms from title text
        beds = _extract_bedrooms_from_title(prop["title"])
        if beds is not None:
            prop["bedrooms"] = beds

        # Postcode from title
        postcode = _extract_postcode(prop["title"])
        if postcode:
            prop["postcode"] = postcode

        # Coordinates from data attributes (if present on card or child elements)
        for el in [card, *card.select("[data-lat], [data-latitude]")]:
            coords = _coords_from_attrs(el)
            if coords:
                prop["lat"], prop["lng"] = coords
                break

        properties.append(prop)

    log.debug("Parsed %d property cards from search HTML", len(properties))
    return properties


def parse_property_detail(html: str) -> dict:
    """Parse a single property detail page for additional data.

    Returns:
        A dict holding whichever of these could be extracted: ``bedrooms``,
        ``bathrooms``, ``price``, ``property_type``, ``available_date``,
        ``furnished``, ``title``, ``postcode``, ``lat``/``lng`` (from map
        data attributes or inline script JSON) and ``description`` (used for
        floor-area parsing).
    """
    soup = BeautifulSoup(html, "html.parser")
    details: dict = {}

    # Parse structured data tables (class "table table-striped")
    for table in soup.select("table.table-striped, table.table"):
        for row in table.select("tr"):
            cells = row.select("td, th")
            if len(cells) < 2:
                continue
            label = cells[0].get_text(strip=True).lower()
            value = cells[1].get_text(strip=True)

            # Branch order matters: the first matching label keyword wins.
            if "bedroom" in label:
                match = _FIRST_INT_RE.search(value)
                if match:
                    details["bedrooms"] = int(match.group(1))
            elif "bathroom" in label:
                match = _FIRST_INT_RE.search(value)
                if match:
                    details["bathrooms"] = int(match.group(1))
            elif "rent" in label or "price" in label:
                match = _PRICE_RE.search(value)
                if match:
                    details["price"] = int(match.group(1).replace(",", ""))
            elif "type" in label:
                details["property_type"] = value
            elif "available" in label or "move" in label:
                details["available_date"] = value
            elif "furnish" in label:
                details["furnished"] = value

    # Extract postcode from page title / address heading
    title_tag = soup.select_one("h1, .property-title, [class*='title']")
    if title_tag:
        title_text = title_tag.get_text(strip=True)
        details["title"] = title_text
        postcode = _extract_postcode(title_text)
        if postcode:
            details["postcode"] = postcode

    # Extract coordinates from map element data attributes
    map_el = soup.select_one("[data-lat], [data-latitude]")
    coords = _coords_from_attrs(map_el) if map_el else None

    # Also check for coordinates in JSON-LD or inline JavaScript
    if coords is None:
        for script in soup.select("script"):
            text = script.string or ""
            lat_match = _JSON_LAT_RE.search(text)
            lng_match = _JSON_LNG_RE.search(text)
            if lat_match and lng_match:
                coords = _parse_coords(lat_match.group(1), lng_match.group(1))
                if coords:
                    break

    if coords:
        details["lat"], details["lng"] = coords

    # Extract description for floor area parsing
    desc_el = soup.select_one(
        ".description, [class*='description'], #description"
    )
    if desc_el:
        details["description"] = desc_el.get_text(strip=True)

    return details


# ---------------------------------------------------------------------------
# Property type mapping & floor area
# ---------------------------------------------------------------------------


def map_property_type(raw_type: str | None) -> str:
    """Map OpenRent property type to canonical type.

    An exact ``PROPERTY_TYPE_MAP`` hit wins; otherwise the type is inferred
    from keywords, falling back to ``"Other"``.
    """
    if not raw_type:
        return "Other"

    canonical = PROPERTY_TYPE_MAP.get(raw_type)
    if canonical:
        return canonical

    lower = raw_type.lower()
    # Order matters: rooms/shares first, and "semi" must be ruled out before
    # a bare "detached" match.
    if "room" in lower or "shared" in lower:
        return "Other"
    if any(word in lower for word in ("flat", "apartment", "maisonette", "studio")):
        return "Flats/Maisonettes"
    if "detached" in lower and "semi" not in lower:
        return "Detached"
    if "semi" in lower:
        return "Semi-Detached"
    if "terrace" in lower or "mews" in lower:
        return "Terraced"
    if "house" in lower:
        return "Detached"

    log.debug("Unknown property type: %r — mapping to Other", raw_type)
    return "Other"


def parse_floor_area(description: str | None) -> float | None:
    """Try to extract floor area from description text.

    Square feet are preferred when both units appear.

    Returns:
        The area in square metres rounded to one decimal place, or ``None``
        when no area is found.
    """
    if not description:
        return None

    m = _SQFT_RE.search(description)
    if m:
        sqft = float(m.group(1).replace(",", ""))
        return round(sqft * _SQFT_TO_SQM, 1)

    m = _SQM_RE.search(description)
    if m:
        return round(float(m.group(1).replace(",", "")), 1)

    return None


# ---------------------------------------------------------------------------
# Transform & search
# ---------------------------------------------------------------------------


def _first_not_none(*values):
    """Return the first argument that is not None (0 and 0.0 are kept)."""
    for value in values:
        if value is not None:
            return value
    return None


def transform_property(
    search_data: dict,
    detail_data: dict | None,
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]],
) -> dict | None:
    """Transform OpenRent property data into our output schema.

    Merges data from the search results page and (optionally) the detail page.
    Uses pc_coords (postcode → lat/lng) as a fallback when coordinates are
    missing but a postcode is available.

    Args:
        search_data: Raw dict from parse_search_results().
        detail_data: Raw dict from parse_property_detail(), or ``None``.
        pc_index: Spatial index used to find the nearest postcode when only
            coordinates are known.
        pc_coords: Postcode → ``(lat, lng)`` lookup used when only a postcode
            is known.

    Returns:
        The output-schema dict, or ``None`` when the listing has no price,
        cannot be located, or lies outside the accepted coordinate range.
    """
    detail = detail_data or {}

    # Merge: detail page data takes precedence. Compare against None rather
    # than truthiness — longitude 0.0 (the Greenwich meridian) is valid.
    lat = _first_not_none(detail.get("lat"), search_data.get("lat"))
    lng = _first_not_none(detail.get("lng"), search_data.get("lng"))

    price = detail.get("price") or search_data.get("price")
    if not price:
        return None

    frequency = search_data.get("frequency", "monthly")

    # Get postcode: detail page > search card > spatial index
    postcode = detail.get("postcode") or search_data.get("postcode")

    if lat is not None and lng is not None:
        # Validate coordinates are in England
        if not (49 <= lat <= 56 and -7 <= lng <= 2):
            log.debug("Coords outside England: lat=%.4f lng=%.4f — skipping", lat, lng)
            return None
        if not postcode:
            postcode = pc_index.nearest(lat, lng)
    elif postcode:
        # Have postcode but no coordinates — look up centroid from arcgis data
        coords = pc_coords.get(postcode)
        if not coords:
            log.debug("Postcode %s not in arcgis data — skipping", postcode)
            return None
        lat, lng = coords
    else:
        return None

    if not postcode:
        log.debug("No postcode for property — skipping")
        return None

    bedrooms = detail.get("bedrooms") or search_data.get("bedrooms", 0) or 0
    bathrooms = detail.get("bathrooms", 0) or 0

    title = detail.get("title") or search_data.get("title", "")
    address = title.split(",")[0].strip() if title else ""

    property_type = detail.get("property_type", "")
    # Infer from title if not found in detail page
    if not property_type and title:
        lower = title.lower()
        if "flat" in lower or "apartment" in lower:
            property_type = "Flat"
        elif "studio" in lower:
            property_type = "Studio"
        elif "maisonette" in lower:
            property_type = "Maisonette"
        elif "house" in lower:
            property_type = "House"
        elif "room" in lower:
            property_type = "Room"

    prop_id = search_data.get("id", "")
    listing_url = search_data.get(
        "url",
        f"{OPENRENT_BASE}/{prop_id}" if prop_id else "",
    )

    description = detail.get("description", "")

    return {
        "id": f"or_{prop_id}",
        "Bedrooms": bedrooms,
        "Bathrooms": bathrooms,
        # NOTE(review): this sums bedrooms and bathrooms although the key says
        # "living rooms" — kept as-is; confirm against the schema consumers.
        "Number of bedrooms & living rooms": bedrooms + bathrooms,
        "lon": lng,
        "lat": lat,
        "Postcode": postcode,
        "Address per Property Register": address,
        "Leasehold/Freehold": None,
        "Property type": map_property_type(property_type),
        "Property sub-type": property_type or "Unknown",
        "price": int(price),
        "price_frequency": frequency,
        "Price qualifier": "",
        "Total floor area (sqm)": parse_floor_area(description),
        "Listing URL": listing_url,
        "Listing features": [],
        "first_visible_date": detail.get("available_date", ""),
    }


def search_outcode(
    client: Session,
    outcode: str,
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]],
    fetch_details: bool = True,
) -> list[dict]:
    """Search OpenRent for rental properties in one outcode.

    1. Fetches the search results page for the outcode
    2. Parses property cards from the HTML
    3. Optionally fetches each property's detail page for full data
    4. Transforms to common output schema

    Args:
        client: Session carrying the WAF cookies (see make_client()).
        outcode: Search term placed in the ``term`` query parameter.
        pc_index: Passed through to transform_property().
        pc_coords: Passed through to transform_property().
        fetch_details: If True, visits each property's detail page for
            coordinates and extra data. Slower but more complete.
            If False, relies only on search card data + postcode lookup.

    Returns:
        Output-schema dicts for every listing that could be transformed.

    Raises:
        WafChallengeError: Propagated from fetch_page() when cookies expire.
    """
    search_url = f"{OPENRENT_BASE}/properties-to-rent/?term={outcode}&isLive=true"
    html = fetch_page(client, search_url)
    if not html:
        return []

    search_results = parse_search_results(html)
    if not search_results:
        return []

    properties = []
    for search_data in search_results:
        detail_data = None
        if fetch_details and search_data.get("url"):
            detail_html = fetch_page(client, search_data["url"])
            if detail_html:
                detail_data = parse_property_detail(detail_html)
            # Shorter delay for detail pages (within same outcode)
            time.sleep(DELAY_BETWEEN_PAGES * 0.5)

        transformed = transform_property(
            search_data, detail_data, pc_index, pc_coords,
        )
        if transformed:
            properties.append(transformed)
            openrent_properties_scraped.labels(channel="rent").inc()

    return properties