"""Zoopla (zoopla.co.uk) scraper — sale properties.

Zoopla is behind Cloudflare Turnstile (managed interactive challenge), which
blocks all HTTP clients (curl_cffi, httpx) and even Playwright with stealth
patches. Only Camoufox (an anti-fingerprinting Firefox fork) passes reliably.

Zoopla uses Next.js App Router with React Server Components (RSC). Search
result data is server-rendered in an RSC stream, not available via
__NEXT_DATA__ or a JSON API.

Architecture:
    Unlike the other scrapers which use HTTP clients per outcode, Zoopla keeps
    a single Camoufox browser alive for the entire scrape. For each outcode, it:
      1. Navigates directly to the sale search URL
      2. Extracts listing data from the rendered DOM
      3. Handles pagination via ?pn=N parameter

The browser session replaces the cookie/client pattern used by other scrapers.
"""

import logging
import re
import time

from constants import DELAY_BETWEEN_PAGES, MAX_BEDROOMS, PROPERTY_TYPE_MAP, ZOOPLA_BASE
from spatial import PostcodeSpatialIndex
from transform import normalize_sub_type, validate_floor_area

log = logging.getLogger("zoopla")


class TurnstileError(Exception):
    """Raised when Cloudflare Turnstile challenge cannot be passed."""


class _ManagedCamoufoxBrowser:
    """Pair a browser with the context manager that produced it.

    ``close()`` closes the browser and then exits the context manager, so
    callers holding only this wrapper can release both. Every attribute not
    defined here is forwarded to the wrapped browser. The wrapper can also be
    used in a ``with`` statement, which calls ``close()`` on exit.
    """

    def __init__(self, context_manager, browser):
        self._context_manager = context_manager
        self._browser = browser
        self._closed = False

    def close(self) -> None:
        """Close the browser and exit its context manager (idempotent)."""
        if self._closed:
            return
        # Mark closed first so a failure below cannot trigger a second attempt.
        self._closed = True
        try:
            self._browser.close()
        finally:
            # Always exit the context manager, even if browser.close() raised.
            self._context_manager.__exit__(None, None, None)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. Reading
        # ``self._browser`` here would recurse forever on an instance whose
        # __init__ never ran (copy, pickle, __new__), so go through __dict__.
        try:
            browser = self.__dict__["_browser"]
        except KeyError:
            raise AttributeError(name) from None
        return getattr(browser, name)


# Maximum search result pages to scrape per outcode (25 listings/page)
MAX_PAGES_PER_OUTCODE = 40

# JavaScript to extract listings from the rendered DOM.
# Uses data-testid attributes as primary selectors (stable across deployments),
# then falls back to href-based link matching with parent-walking.
_EXTRACT_LISTINGS_JS = r"""() => {
    const seen = new Set();
    const results = [];

    // Strategy 1: Use data-testid selectors (post-2025 redesign)
    const listingCards = document.querySelectorAll(
        '[data-testid="regular-listings"] > div, [data-testid="search-content"] li'
    );
    for (const card of listingCards) {
        const link = card.querySelector(
            'a[href*="/for-sale/details/"], a[href*="/new-homes/details/"]'
        );
        if (!link) continue;
        const href = link.href;
        const match = href.match(/\/details\/(\d+)\//);
        if (!match) continue;
        const id = match[1];
        if (seen.has(id)) continue;
        seen.add(id);

        const text = card.innerText || '';

        // Try data-testid price element first, then regex
        const priceEl = card.querySelector('[data-testid="listing-price"]');
        const priceText = priceEl ? priceEl.innerText : text;
        const priceMatch = priceText.match(/\u00a3([\d,]+)/);

        // Try address element first, then regex
        const addressEl = card.querySelector('address');
        let address = addressEl ? addressEl.innerText.trim() : '';
        if (!address) {
            const lines = text.split('\n').map(l => l.trim()).filter(Boolean);
            for (const line of lines) {
                if (/[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i.test(line) ||
                    (line.includes(',') && !line.includes('\u00a3') && !/^\d+ beds?/i.test(line))) {
                    address = line;
                    break;
                }
            }
        }

        const bedsMatch = text.match(/(\d+)\s*beds?/i);
        const bathsMatch = text.match(/(\d+)\s*baths?/i);
        const recMatch = text.match(/(\d+)\s*reception/i);
        const areaMatch = text.match(/([\d,]+)\s*sq\.?\s*ft/i);

        let tenure = '';
        if (/leasehold/i.test(text)) tenure = 'Leasehold';
        else if (/freehold/i.test(text)) tenure = 'Freehold';

        // Extract property type (e.g., "2 bed flat for sale" → "flat")
        let property_type = '';
        const ptMatch = text.match(/\d+\s*(?:beds?|bedrooms?)\s+([\w\s-]+?)\s+for\s+sale/i);
        if (ptMatch) property_type = ptMatch[1].trim();
        else if (/\bstudio\s*(?:flat|apartment)?\s+for\s+sale/i.test(text)) property_type = 'Studio';

        // Keyword fallback when regex doesn't match current DOM format
        if (!property_type) {
            const lower = text.toLowerCase();
            if (/\bstudio\b/.test(lower)) property_type = 'Studio';
            else if (/\bpenthouse\b/.test(lower)) property_type = 'Penthouse';
            else if (/\bmaisonette\b/.test(lower)) property_type = 'Maisonette';
            else if (/\bapartment\b/.test(lower)) property_type = 'Apartment';
            else if (/\bflat\b/.test(lower)) property_type = 'Flat';
            else if (/\bsemi[- ]?detached\b/.test(lower)) property_type = 'Semi-Detached';
            else if (/\bdetached\b/.test(lower)) property_type = 'Detached';
            else if (/\bterraced?\b/.test(lower)) property_type = 'Terraced';
            else if (/\bbungalow\b/.test(lower)) property_type = 'Bungalow';
            else if (/\bcottage\b/.test(lower)) property_type = 'Cottage';
            else if (/\bhouse\b/.test(lower)) property_type = 'House';
        }

        results.push({
            id,
            url: href.replace(window.location.origin, ''),
            price: priceMatch ? parseInt(priceMatch[1].replace(/,/g, '')) : null,
            price_text: priceText.trim(),
            beds: bedsMatch && parseInt(bedsMatch[1]) <= 20 ? parseInt(bedsMatch[1]) : null,
            baths: bathsMatch && parseInt(bathsMatch[1]) <= 20 ? parseInt(bathsMatch[1]) : null,
            receptions: recMatch && parseInt(recMatch[1]) <= 20 ? parseInt(recMatch[1]) : null,
            floor_area_sqft: areaMatch ? parseInt(areaMatch[1].replace(/,/g, '')) : null,
            address,
            tenure,
            property_type,
        });
    }

    // Strategy 2: Fall back to href-based link matching with parent-walking
    if (results.length === 0) {
        const links = Array.from(document.querySelectorAll(
            'a[href*="/for-sale/details/"], a[href*="/new-homes/details/"]'
        ));
        for (const link of links) {
            const href = link.href;
            const match = href.match(/\/details\/(\d+)\//);
            if (!match) continue;
            const id = match[1];
            if (seen.has(id)) continue;
            seen.add(id);

            let card = link;
            for (let j = 0; j < 15; j++) {
                card = card.parentElement;
                if (!card) break;
                const t = card.innerText || '';
                if (t.includes('\u00a3') && (t.includes('bed') || t.includes('Bath') || t.includes('sq ft'))) {
                    break;
                }
            }
            if (!card) continue;

            const text = card.innerText || '';
            const lines = text.split('\n').map(l => l.trim()).filter(Boolean);

            const priceEl2 = card.querySelector('[data-testid="listing-price"]');
            const priceText2 = priceEl2 ? priceEl2.innerText : text;
            const priceMatch = priceText2.match(/\u00a3([\d,]+)/);
            const bedsMatch = text.match(/(\d+)\s*beds?/i);
            const bathsMatch = text.match(/(\d+)\s*baths?/i);
            const recMatch = text.match(/(\d+)\s*reception/i);
            const areaMatch = text.match(/([\d,]+)\s*sq\.?\s*ft/i);

            let address = '';
            for (const line of lines) {
                if (/[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i.test(line) ||
                    (line.includes(',') && !line.includes('\u00a3') && !/^\d+ beds?/i.test(line))) {
                    address = line;
                    break;
                }
            }

            let tenure = '';
            if (/leasehold/i.test(text)) tenure = 'Leasehold';
            else if (/freehold/i.test(text)) tenure = 'Freehold';

            // Extract property type
            let property_type = '';
            const ptMatch2 = text.match(/\d+\s*(?:beds?|bedrooms?)\s+([\w\s-]+?)\s+for\s+sale/i);
            if (ptMatch2) property_type = ptMatch2[1].trim();
            else if (/\bstudio\s*(?:flat|apartment)?\s+for\s+sale/i.test(text)) property_type = 'Studio';

            // Keyword fallback when regex doesn't match current DOM format
            if (!property_type) {
                const lower = text.toLowerCase();
                if (/\bstudio\b/.test(lower)) property_type = 'Studio';
                else if (/\bpenthouse\b/.test(lower)) property_type = 'Penthouse';
                else if (/\bmaisonette\b/.test(lower)) property_type = 'Maisonette';
                else if (/\bapartment\b/.test(lower)) property_type = 'Apartment';
                else if (/\bflat\b/.test(lower)) property_type = 'Flat';
                else if (/\bsemi[- ]?detached\b/.test(lower)) property_type = 'Semi-Detached';
                else if (/\bdetached\b/.test(lower)) property_type = 'Detached';
                else if (/\bterraced?\b/.test(lower)) property_type = 'Terraced';
                else if (/\bbungalow\b/.test(lower)) property_type = 'Bungalow';
                else if (/\bcottage\b/.test(lower)) property_type = 'Cottage';
                else if (/\bhouse\b/.test(lower)) property_type = 'House';
            }

            results.push({
                id,
                url: href.replace(window.location.origin, ''),
                price: priceMatch ? parseInt(priceMatch[1].replace(/,/g, '')) : null,
                price_text: priceText2.trim(),
                beds: bedsMatch && parseInt(bedsMatch[1]) <= 20 ? parseInt(bedsMatch[1]) : null,
                baths: bathsMatch && parseInt(bathsMatch[1]) <= 20 ? parseInt(bathsMatch[1]) : null,
                receptions: recMatch && parseInt(recMatch[1]) <= 20 ? parseInt(recMatch[1]) : null,
                floor_area_sqft: areaMatch ? parseInt(areaMatch[1].replace(/,/g, '')) : null,
                address,
                tenure,
                property_type,
            });
        }
    }

    return results;
}"""

# JavaScript to dismiss the Usercentrics cookie consent overlay (shadow DOM).
_DISMISS_COOKIES_JS = """() => {
    const aside = document.querySelector('#usercentrics-cmp-ui');
    if (aside && aside.shadowRoot) {
        const btns = aside.shadowRoot.querySelectorAll('button');
        for (const btn of btns) {
            if (btn.innerText.includes('Accept')) {
                btn.click();
                return true;
            }
        }
    }
    if (aside) {
        aside.remove();
        return true;
    }
    return false;
}"""

# Title text of the Cloudflare interstitial, and how long to poll for it to
# clear: 40 attempts x 3 s = 120 s.
_CHALLENGE_TITLE_MARKER = "Just a moment"
_CHALLENGE_POLL_ATTEMPTS = 40
_CHALLENGE_POLL_SECONDS = 3


# ---------------------------------------------------------------------------
# Browser lifecycle
# ---------------------------------------------------------------------------


def _click_turnstile_checkbox(page) -> None:
    """Best-effort click on the Turnstile checkbox inside the challenge iframe.

    Only the first frame served from challenges.cloudflare.com is tried.
    Failures are logged at debug level and otherwise ignored; the caller keeps
    polling the page title either way.
    """
    for frame in page.frames:
        if "challenges.cloudflare.com" not in frame.url:
            continue
        try:
            iframe_el = page.query_selector('iframe[src*="challenges.cloudflare"]')
            if iframe_el:
                box = iframe_el.bounding_box()
                if box:
                    # Click 30px in from the left edge, vertically centred.
                    page.mouse.click(box["x"] + 30, box["y"] + box["height"] / 2)
        except Exception as exc:
            log.debug("Turnstile checkbox click failed: %s", exc)
        break


def launch_browser():
    """Launch Camoufox, open the Zoopla homepage and pass Cloudflare Turnstile.

    Also dismisses the cookie consent overlay (best effort).

    Returns:
        (browser, page) tuple. The caller must close the browser when done.

    Raises:
        TurnstileError: if the challenge page has not cleared after 120s.
        Exception: any error raised while opening the page or navigating is
            re-raised after the browser has been closed, so a failed launch
            does not leave a browser process behind.
    """
    from camoufox.pkgman import camoufox_path

    # Standalone local runs should not require the old container image to have
    # pre-fetched Camoufox.
    camoufox_path(download_if_missing=True)

    from camoufox.sync_api import Camoufox

    log.info("Launching Camoufox browser for Zoopla...")
    camoufox = Camoufox(headless=True)
    raw_browser = camoufox.__enter__()
    browser = _ManagedCamoufoxBrowser(camoufox, raw_browser)

    try:
        page = browser.new_page()

        log.info("Navigating to Zoopla homepage...")
        page.goto(f"{ZOOPLA_BASE}/", wait_until="domcontentloaded", timeout=60000)

        # Wait for Cloudflare Turnstile to resolve.
        # Try clicking the Turnstile checkbox if present (helps in some cases).
        for _ in range(_CHALLENGE_POLL_ATTEMPTS):
            if _CHALLENGE_TITLE_MARKER not in page.title():
                break
            _click_turnstile_checkbox(page)
            time.sleep(_CHALLENGE_POLL_SECONDS)
        else:
            raise TurnstileError("Cloudflare Turnstile did not resolve after 120s")

        log.info("Cloudflare passed — title: %s", page.title())
        time.sleep(2)

        # Dismiss cookie consent. Best effort, as in _navigate_search: a failed
        # dismissal should not abort an otherwise successful launch.
        try:
            page.evaluate(_DISMISS_COOKIES_JS)
        except Exception as exc:
            log.debug("Cookie consent dismissal failed: %s", exc)
        time.sleep(1)
    except BaseException:
        # The Camoufox context manager was entered by hand, so nothing else
        # will exit it if we fail here. Closing the browser also closes pages.
        try:
            browser.close()
        except Exception as close_exc:
            log.debug("Browser cleanup after failed launch raised: %s", close_exc)
        raise

    return browser, page


def _ensure_not_challenged(page) -> None:
    """Return once the page is not a Cloudflare challenge.

    If the current title shows the challenge interstitial, poll every 3s for
    up to 120s for it to clear.

    Raises:
        TurnstileError: if the challenge is still showing after 120s.
    """
    if _CHALLENGE_TITLE_MARKER not in page.title():
        return
    log.warning("Cloudflare challenge detected mid-session, waiting...")
    for _ in range(_CHALLENGE_POLL_ATTEMPTS):
        time.sleep(_CHALLENGE_POLL_SECONDS)
        if _CHALLENGE_TITLE_MARKER not in page.title():
            log.info("Cloudflare challenge resolved")
            return
    raise TurnstileError("Cloudflare re-challenge did not resolve after 120s")


# ---------------------------------------------------------------------------
# Search
navigation # --------------------------------------------------------------------------- def _wait_for_listing_content(page) -> None: """Wait for rendered listing cards to contain usable text.""" try: page.wait_for_function( """() => { const cards = document.querySelectorAll( '[data-testid="regular-listings"] > div' ); if (cards.length === 0) return false; for (const card of cards) { const t = card.innerText || ''; if (t.includes('\\u00a3') && t.length > 50) return true; } return false; }""", timeout=8000, ) except Exception: time.sleep(1.5) def _navigate_search(page, outcode: str) -> bool: """Navigate directly to sale search results for an outcode. Returns True if results were found, False if no results or navigation failed. Raises TurnstileError if Cloudflare blocks us.""" url = ( f"{ZOOPLA_BASE}/for-sale/property/{outcode.lower()}/" f"?q={outcode}&search_source=home" ) try: page.goto(url, wait_until="domcontentloaded", timeout=30000) except Exception as exc: log.debug("Zoopla direct navigation failed for %s: %s", outcode, exc) return False _ensure_not_challenged(page) # Dismiss cookie consent (may reappear after navigation) try: page.evaluate(_DISMISS_COOKIES_JS) except Exception: pass try: page.wait_for_selector( '[data-testid="regular-listings"], a[href*="/for-sale/details/"], a[href*="/new-homes/details/"]', timeout=10000, ) except Exception: if not page.query_selector('a[href*="/details/"]'): return False _wait_for_listing_content(page) return True def _get_result_count(page) -> int: """Extract the total results count from the page. 
Tries __ZAD_TARGETING__ JSON first (most reliable), then body text regex matching both "N results" and "N properties" patterns.""" try: # Try the ZAD targeting JSON script tag first count = page.evaluate("""() => { const s = document.querySelector('#__ZAD_TARGETING__'); if (s) { try { const d = JSON.parse(s.textContent); if (d.search_results_count != null) return d.search_results_count; } catch(e) {} } return null; }""") if count is not None and count > 0: return count except Exception: pass try: body = page.inner_text("body") match = re.search(r"([\d,]+)\s+(?:results?|properties)", body) if match: return int(match.group(1).replace(",", "")) except Exception: pass return 0 # --------------------------------------------------------------------------- # Extraction and pagination # --------------------------------------------------------------------------- _first_extraction_logged = False def _extract_listings(page) -> list[dict]: """Extract listing data from the current search results page DOM.""" global _first_extraction_logged try: listings = page.evaluate(_EXTRACT_LISTINGS_JS) # Log diagnostic info on the very first extraction attempt if not _first_extraction_logged: _first_extraction_logged = True try: diag = page.evaluate("""() => { const details = document.querySelectorAll('a[href*="/details/"]'); const testids = document.querySelectorAll('[data-testid]'); const testidNames = [...new Set([...testids].map(e => e.dataset.testid))]; return { url: location.href, title: document.title, detailLinks: details.length, testids: testidNames.slice(0, 30), bodySnippet: document.body?.innerText?.slice(0, 500) || '', }; }""") log.info( "Zoopla first-page diagnostic: url=%s title=%s detailLinks=%d " "testids=%s bodySnippet=%.200s", diag.get("url"), diag.get("title"), diag.get("detailLinks", 0), diag.get("testids", []), diag.get("bodySnippet", ""), ) except Exception: pass log.info("Zoopla first extraction: %d listings found", len(listings)) return listings except Exception as 
e: log.warning("Failed to extract listings from DOM: %s", e) return [] def _paginate( page, total_results: int, max_properties: int | None = None, ) -> list[dict]: """Extract listings from all pages of search results. Page 1 is already loaded. For subsequent pages, clicks the Next button or navigates via URL parameter ?pn=N.""" all_listings = _extract_listings(page) if max_properties is not None and len(all_listings) >= max_properties: return all_listings[:max_properties] if not all_listings or total_results <= len(all_listings): return all_listings seen_ids = {listing["id"] for listing in all_listings} current_url = page.url page_num = 2 while len(all_listings) < total_results and page_num <= MAX_PAGES_PER_OUTCODE: time.sleep(DELAY_BETWEEN_PAGES) # Try navigating via URL parameter if "?" in current_url: next_url = re.sub(r"[?&]pn=\d+", "", current_url) separator = "&" if "?" in next_url else "?" next_url = f"{next_url}{separator}pn={page_num}" else: next_url = f"{current_url}?pn={page_num}" try: page.goto(next_url, wait_until="domcontentloaded", timeout=30000) _ensure_not_challenged(page) _wait_for_listing_content(page) except TurnstileError: raise except Exception as e: log.debug("Pagination navigation failed at page %d: %s", page_num, e) break page_listings = _extract_listings(page) if not page_listings: break # Deduplicate within this outcode new_count = 0 for listing in page_listings: if listing["id"] not in seen_ids: seen_ids.add(listing["id"]) all_listings.append(listing) new_count += 1 if max_properties is not None and len(all_listings) >= max_properties: return all_listings[:max_properties] if new_count == 0: break # No new listings on this page page_num += 1 return all_listings # --------------------------------------------------------------------------- # Property transformation # --------------------------------------------------------------------------- # Cached outcode → (postcode, lat, lng) lookups to avoid repeated O(n) scans # over 2.26M postcodes. 
Populated lazily on first lookup per outcode. _outcode_coords_cache: dict[str, tuple[str, float, float] | None] = {} def _resolve_outcode_coords( outcode: str, pc_coords: dict[str, tuple[float, float]] ) -> tuple[str, float, float] | None: """Find first postcode + coords for an outcode. Result is cached.""" if outcode in _outcode_coords_cache: return _outcode_coords_cache[outcode] prefix = outcode + " " for pcd, (lat, lng) in pc_coords.items(): if pcd.startswith(prefix) or ( len(outcode) >= 4 and pcd.startswith(outcode) and len(pcd) > len(outcode) ): _outcode_coords_cache[outcode] = (pcd, lat, lng) return (pcd, lat, lng) _outcode_coords_cache[outcode] = None return None def _extract_postcode(text: str) -> str | None: """Extract a full UK postcode from text like 'Dollar Bay Place, Canary Wharf E14 9SS'. Normalizes to include a space before the 3-char incode.""" match = re.search(r"([A-Z]{1,2}\d[A-Z0-9]?\s*\d[A-Z]{2})", text, re.IGNORECASE) if match: raw = match.group(1).upper().strip() # Ensure space before incode (last 3 chars): "SW1A1AA" → "SW1A 1AA" if " " not in raw and len(raw) >= 5: return raw[:-3] + " " + raw[-3:] return raw return None def _extract_outcode(text: str) -> str | None: """Extract a UK outcode from address text like 'Whitechapel Road, London E1'.""" # Look for outcode at end of string or after last comma match = re.search(r"\b([A-Z]{1,2}\d[A-Z0-9]?)\s*$", text.strip(), re.IGNORECASE) if match: return match.group(1).upper() # Try after comma parts = text.split(",") if len(parts) > 1: last = parts[-1].strip() match = re.match(r"^([A-Z]{1,2}\d[A-Z0-9]?)$", last, re.IGNORECASE) if match: return match.group(1).upper() return None def _map_property_type(raw_type: str | None) -> str: """Map Zoopla property type text to canonical type.""" if not raw_type: return "Other" # Exact match (handles Rightmove-style capitalised values) canonical = PROPERTY_TYPE_MAP.get(raw_type) if canonical: return canonical # Title-case match (handles regex-extracted lowercase 
like "town house" → "Town House") canonical = PROPERTY_TYPE_MAP.get(raw_type.title()) if canonical: return canonical # Lowercase match (e.g., "Townhouse" → "townhouse") canonical = PROPERTY_TYPE_MAP.get(raw_type.lower()) if canonical: return canonical # Normalize delimiters (underscores/hyphens → spaces) and try again normalized = re.sub(r"[-_]+", " ", raw_type).strip().title() canonical = PROPERTY_TYPE_MAP.get(normalized) if canonical: return canonical # Keyword fallback lower = raw_type.lower() if "flat" in lower or "apartment" in lower or "maisonette" in lower or "studio" in lower or "penthouse" in lower: return "Flats/Maisonettes" if "semi" in lower and "detach" in lower: return "Semi-Detached" if "detach" in lower: return "Detached" if "terrace" in lower or "mews" in lower: return "Terraced" if "house" in lower: return "Detached" return "Other" def transform_property( raw: dict, pc_index: PostcodeSpatialIndex, pc_coords: dict[str, tuple[float, float]], search_outcode: str | None = None, ) -> dict | None: """Transform a raw Zoopla listing dict into the standard output schema. 
Zoopla search cards do not include coordinates, so we resolve lat/lng from postcodes extracted from the address text.""" price = raw.get("price") if not price or int(price) <= 0: return None address = raw.get("address", "") # Resolve postcode and coordinates from address postcode = _extract_postcode(address) lat = lng = None if postcode: coords = pc_coords.get(postcode) if coords: lat, lng = coords if lat is None: # Try outcode-level fallback from address text addr_outcode = _extract_outcode(address) if addr_outcode: result = _resolve_outcode_coords(addr_outcode, pc_coords) if result: postcode, lat, lng = result # Final fallback: use the outcode we know we're searching if lat is None and search_outcode: result = _resolve_outcode_coords(search_outcode, pc_coords) if result: postcode, lat, lng = result if lat is None or lng is None or not postcode: return None # Validate coordinates are in England if not (49 <= lat <= 56 and -7 <= lng <= 2): return None raw_beds = raw.get("beds") or 0 raw_baths = raw.get("baths") or 0 bedrooms = raw_beds if raw_beds <= MAX_BEDROOMS else 0 bathrooms = raw_baths if raw_baths <= MAX_BEDROOMS else 0 if raw_beds > MAX_BEDROOMS or raw_baths > MAX_BEDROOMS: log.warning( "Zoopla %s: implausible beds=%d baths=%d (capped to 0)", raw.get("id", "?"), raw_beds, raw_baths, ) receptions = raw.get("receptions") or 0 # Floor area: convert sq ft to sq m floor_area_sqm = None sqft = raw.get("floor_area_sqft") if sqft: floor_area_sqm = validate_floor_area(round(sqft * 0.092903, 1)) listing_id = raw.get("id", "") listing_url = raw.get("url", "") if listing_url and not listing_url.startswith("http"): listing_url = ZOOPLA_BASE + listing_url return { "id": f"zp_{listing_id}", "Bedrooms": bedrooms, "Bathrooms": bathrooms, "Number of bedrooms & living rooms": bedrooms + receptions, "lon": lng, "lat": lat, "Postcode": postcode, "Address per Property Register": address, "Leasehold/Freehold": raw.get("tenure") or None, "Property type": 
_map_property_type(raw.get("property_type")), "Property sub-type": normalize_sub_type(raw.get("property_type")), "price": int(price), "price_frequency": "", "Price qualifier": "", "Total floor area (sqm)": floor_area_sqm, "Listing URL": listing_url, "Listing features": [], "first_visible_date": "", } # --------------------------------------------------------------------------- # Top-level search function (called by scraper.py) # --------------------------------------------------------------------------- def search_outcode( page, outcode: str, pc_index: PostcodeSpatialIndex, pc_coords: dict[str, tuple[float, float]], max_properties: int | None = None, ) -> tuple[list[dict], str | None]: """Search Zoopla for properties in one outcode. Takes a live Camoufox Page (from launch_browser). Navigates through the search flow, extracts listings from rendered DOM, and transforms to the standard output schema. Returns (properties, search_url). Raises TurnstileError if Cloudflare blocks us mid-session. 
""" if not _navigate_search(page, outcode): return [], None total_results = _get_result_count(page) # Always try extraction even if result count is 0 — the count regex may # not match Zoopla's current text format, but listings may still be in DOM raw_listings = _paginate( page, max(total_results, 25), max_properties=max_properties, ) if not raw_listings: if total_results > 0: log.debug( "Zoopla %s %s: page claims %d results but extraction found 0 — " "DOM selectors may need updating", outcode, "BUY", total_results, ) return [], None properties = [] dropped = 0 for raw in raw_listings: transformed = transform_property(raw, pc_index, pc_coords, search_outcode=outcode) if transformed: properties.append(transformed) else: dropped += 1 if dropped and not properties: # Log a sample raw listing to diagnose which fields are missing sample = raw_listings[0] if raw_listings else {} log.debug( "Zoopla %s %s: extracted %d raw listings but all %d dropped in transform " "(no price/postcode/coords). Sample raw: price=%s address=%r", outcode, "BUY", len(raw_listings), dropped, sample.get("price"), sample.get("address", ""), ) elif dropped > len(raw_listings) // 2: log.debug( "Zoopla %s %s: %d/%d listings dropped in transform", outcode, "BUY", dropped, len(raw_listings), ) return properties, page.url