#!/usr/bin/env -S uv run --project ../finder
"""Zoopla scraping experiment — working prototype using Camoufox.

Key findings:
- Zoopla uses Cloudflare Turnstile (managed interactive challenge)
- Playwright headless Chromium + stealth patches CANNOT beat it
- Camoufox (anti-fingerprinting Firefox fork) PASSES Cloudflare
- Zoopla uses Next.js App Router with React Server Components (RSC)
- Listing data is NOT in __NEXT_DATA__ — it's server-rendered in RSC stream
- URL-based location slugs (e.g. /properties/london/) return 0 results
- Must use the search autocomplete (GraphQL: getGeoSuggestion) to resolve
  a location, then submit the form to get results
- GraphQL endpoint: api-graphql-lambda.prod.zoopla.co.uk/graphql
- Listings loaded via getTopLeadListingIds + getRareFindLeadListingIds ops

Usage:
    uv run --project finder scripts/zoopla_experiment.py [LOCATION]
    uv run --project finder scripts/zoopla_experiment.py "Tower Hamlets"
"""

import json
import logging
import re
import statistics
import sys
import time

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("zoopla-exp")

HOMEPAGE_URL = "https://www.zoopla.co.uk/"

# Page title shown while the Cloudflare challenge is still pending.
CLOUDFLARE_TITLE = "Just a moment"

# The count must start with a digit so that a bare comma followed by the word
# "results" (e.g. "homes, results") is not mistaken for a number.
RESULT_COUNT_RE = re.compile(r"(\d[\d,]*)\s+results?")

# Clicks the "Accept" button inside the consent widget's shadow DOM; if no
# such button is found the widget element is removed instead.
DISMISS_COOKIES_JS = """() => {
    const aside = document.querySelector('#usercentrics-cmp-ui');
    if (aside && aside.shadowRoot) {
        const btns = aside.shadowRoot.querySelectorAll('button');
        for (const btn of btns) {
            if (btn.innerText.includes('Accept')) { btn.click(); return; }
        }
    }
    aside?.remove();
}"""

# Collects one record per unique listing id found in detail-page links and
# parses price/beds/baths/etc. out of the surrounding card's visible text.
EXTRACT_LISTINGS_JS = r"""() => {
    const links = Array.from(document.querySelectorAll(
        'a[href*="/for-sale/details/"], a[href*="/new-homes/details/"], a[href*="/to-rent/details/"]'
    ));
    const seen = new Set();
    const results = [];

    for (const link of links) {
        const href = link.href;
        const match = href.match(/\/details\/(\d+)\//);
        if (!match) continue;
        const id = match[1];
        if (seen.has(id)) continue;
        seen.add(id);

        // Walk up to find the listing card container
        let card = link;
        for (let j = 0; j < 10; j++) {
            card = card.parentElement;
            if (!card) break;
            const text = card.innerText || '';
            // A listing card should have a price and at least beds or area
            if (text.includes('£') && (text.includes('bed') || text.includes('sq ft'))) {
                break;
            }
        }
        if (!card) continue;

        const text = card.innerText || '';
        const lines = text.split('\n').map(l => l.trim()).filter(Boolean);

        // Numbers must start with a digit so a stray comma never parses to NaN
        const priceMatch = text.match(/£(\d[\d,]*)/);
        const bedsMatch = text.match(/(\d+)\s*beds?/i);
        const bathsMatch = text.match(/(\d+)\s*baths?/i);
        const recMatch = text.match(/(\d+)\s*reception/i);
        const areaMatch = text.match(/(\d[\d,]*)\s*sq\s*ft/i);

        // Try to find address — usually a line with a postcode or comma-separated location
        let address = '';
        for (const line of lines) {
            if (/[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i.test(line) ||
                (line.includes(',') && !line.includes('£') && !line.match(/^\d+ beds?/i))) {
                address = line;
                break;
            }
        }

        // Tenure
        let tenure = '';
        if (/freehold/i.test(text)) tenure = 'Freehold';
        else if (/leasehold/i.test(text)) tenure = 'Leasehold';

        results.push({
            id: id,
            url: href.replace(window.location.origin, ''),
            price: priceMatch ? parseInt(priceMatch[1].replace(/,/g, '')) : null,
            beds: bedsMatch ? parseInt(bedsMatch[1]) : null,
            baths: bathsMatch ? parseInt(bathsMatch[1]) : null,
            receptions: recMatch ? parseInt(recMatch[1]) : null,
            floor_area_sqft: areaMatch ? parseInt(areaMatch[1].replace(/,/g, '')) : null,
            address: address,
            tenure: tenure,
            text_preview: lines.slice(0, 10).join(' | '),
        });
    }
    return results;
}"""


def parse_total_results(body_text: str, fallback: int) -> int:
    """Return the "<N> results" count found in *body_text*, else *fallback*.

    Thousands separators are stripped before conversion. The first match in
    the text wins.
    """
    found = RESULT_COUNT_RE.search(body_text)
    if found is None:
        return fallback
    return int(found.group(1).replace(",", ""))


def graphql_operation_names(graphql_responses) -> list:
    """Return the ``operationName`` of each captured GraphQL request.

    Each entry is a dict whose ``"req"`` value is the raw POST body. Bodies
    that are empty, not valid JSON, or not a JSON object are skipped; a JSON
    object without an ``operationName`` key yields ``"?"``.
    """
    names = []
    for captured in graphql_responses:
        raw = captured.get("req") or ""
        try:
            payload = json.loads(raw)
        except (TypeError, ValueError):
            log.debug("Skipping GraphQL request with unparseable body")
            continue
        if not isinstance(payload, dict):
            continue
        names.append(payload.get("operationName", "?"))
    return names


def select_session_cookies(cookies) -> dict:
    """Map name -> value for cookies worth reusing in a later session.

    A cookie is kept when its domain contains "zoopla" or its name contains
    "cf" (case-insensitive).
    """
    return {
        cookie["name"]: cookie["value"]
        for cookie in cookies
        if "zoopla" in cookie.get("domain", "")
        or "cf" in cookie.get("name", "").lower()
    }


def summary_lines(listings) -> list:
    """Build the price/bedroom summary lines printed after the listings.

    Listings whose price (or beds) is missing or zero are ignored for the
    respective statistic. The median is the true median: for an even number
    of prices it is the mean of the two middle values.
    """
    prices = [item["price"] for item in listings if item.get("price")]
    beds = [item["beds"] for item in listings if item.get("beds")]

    lines = []
    if prices:
        lines.append(f"Price range: £{min(prices):,} - £{max(prices):,}")
        lines.append(f"Median: £{statistics.median(prices):,.0f}")
    if beds:
        lines.append(f"Bedrooms: {min(beds)}-{max(beds)}")
    return lines


def wait_for_cloudflare(page, attempts: int = 20, delay: float = 3.0) -> bool:
    """Poll the page title until the Cloudflare interstitial is gone.

    Returns True as soon as the title no longer contains the challenge text,
    or False after *attempts* checks spaced *delay* seconds apart.
    """
    for _ in range(attempts):
        if CLOUDFLARE_TITLE not in page.title():
            return True
        time.sleep(delay)
    return False


def scrape_zoopla(location: str = "London", channel: str = "BUY"):
    """Search Zoopla for *location* and scrape the first page of results.

    Drives a headless Camoufox browser through the homepage search form
    (typing the location, picking the first autocomplete suggestion and
    submitting), then extracts listing data from the rendered DOM.

    Args:
        location: Free-text location typed into the search box.
        channel: "BUY" or "RENT"; "RENT" switches to the Rent tab first.

    Returns:
        On success, a dict with keys ``url``, ``title``, ``total_results``,
        ``listings``, ``cookies`` and ``user_agent``. On failure (the
        Cloudflare challenge never clears, or the search input cannot be
        found) an empty list, so callers can simply test truthiness.
    """
    # Imported lazily so the module can be imported without camoufox installed.
    from camoufox.sync_api import Camoufox

    tab_label = "Buy" if channel == "BUY" else "Rent"
    log.info("Scraping Zoopla: location=%s channel=%s", location, channel)

    with Camoufox(headless=True) as browser:
        page = browser.new_page()

        # Intercept GraphQL responses
        graphql_responses = []

        def on_resp(response):
            url = response.url
            content_type = response.headers.get("content-type", "")
            if "json" not in content_type or "graphql" not in url:
                return
            try:
                body = response.json()
                req = response.request.post_data or ""
            except Exception as exc:
                # Best-effort capture: a response we cannot read must not
                # abort the scrape, but leave a trace for debugging.
                log.debug("Could not read GraphQL response from %s: %s", url, exc)
                return
            graphql_responses.append({"body": body, "req": req})

        page.on("response", on_resp)

        # Step 1: Load homepage and pass Cloudflare
        log.info("Loading Zoopla homepage...")
        page.goto(HOMEPAGE_URL, wait_until="domcontentloaded", timeout=60000)
        if not wait_for_cloudflare(page):
            log.error("Cloudflare did not resolve after 60s")
            return []

        log.info("Homepage loaded: %s", page.title())
        time.sleep(3)

        # Step 2: Dismiss cookie consent (shadow DOM)
        page.evaluate(DISMISS_COOKIES_JS)
        time.sleep(2)

        # Step 3: Select Buy/Rent tab if needed
        if channel == "RENT":
            rent_tab = page.query_selector(
                'button:has-text("Rent")'
            ) or page.query_selector(f'[role="tab"]:has-text("{tab_label}")')
            if rent_tab:
                rent_tab.click()
                time.sleep(1)

        # Step 4: Type location into search and select autocomplete suggestion
        log.info("Searching for '%s'...", location)
        search_input = page.query_selector(
            'input[name="autosuggest-input"]'
        ) or page.query_selector('input[type="text"]')
        if not search_input:
            log.error("Could not find search input")
            return []

        search_input.click()
        time.sleep(0.5)
        search_input.fill("")  # Clear any existing text
        search_input.type(location, delay=80)
        time.sleep(3)

        # Select first autocomplete suggestion
        first_option = page.query_selector('[role="option"]')
        if first_option:
            suggestion_text = first_option.inner_text()
            log.info("Selecting suggestion: %s", suggestion_text)
            first_option.click()
            time.sleep(1)
        else:
            log.warning("No autocomplete suggestions appeared")

        # Step 5: Submit search
        search_btn = page.query_selector('button:has-text("Search")')
        if search_btn:
            search_btn.click()
        else:
            search_input.press("Enter")

        log.info("Waiting for results...")
        time.sleep(10)

        final_url = page.url
        final_title = page.title()
        log.info("URL: %s", final_url)
        log.info("Title: %s", final_title)

        # Step 6: Extract listings from rendered DOM
        listings = page.evaluate(EXTRACT_LISTINGS_JS)
        log.info("Extracted %d unique listings from page 1", len(listings))

        # Step 7: Check for results count and pagination
        total_results = parse_total_results(page.inner_text("body"), len(listings))
        log.info("Total results: %d", total_results)

        # Step 8: Log GraphQL operations we saw
        log.info("GraphQL operations intercepted:")
        for op in graphql_operation_names(graphql_responses):
            log.info("  - %s", op)

        # Step 9: Extract cookies for potential curl_cffi reuse
        session_cookies = select_session_cookies(page.context.cookies())
        ua = page.evaluate("navigator.userAgent")

        return {
            "url": final_url,
            "title": final_title,
            "total_results": total_results,
            "listings": listings,
            "cookies": session_cookies,
            "user_agent": ua,
        }


def main():
    """Scrape the location given on the command line and print a report.

    Uses "London" when no argument is supplied. Exits with status 1 when
    scraping fails.
    """
    location = sys.argv[1] if len(sys.argv) > 1 else "London"
    result = scrape_zoopla(location, channel="BUY")

    if not result:
        log.error("Scraping failed")
        sys.exit(1)

    listings = result["listings"]
    rule = "=" * 60
    print(f"\n{rule}")
    print(f"  Zoopla: {result['title']}")
    print(f"  URL:    {result['url']}")
    print(f"  Total:  {result['total_results']} results, {len(listings)} extracted")
    print(f"{rule}\n")

    for number, listing in enumerate(listings, start=1):
        print(f"--- Listing {number}: {listing['url']} ---")
        # Hide the debugging preview and any empty fields.
        display = {k: v for k, v in listing.items() if k != "text_preview" and v}
        print(json.dumps(display, indent=2, ensure_ascii=False))
        print()

    # Summary stats
    for line in summary_lines(listings):
        print(line)

    # Cookie info for reuse
    print(f"\nSession cookies ({len(result['cookies'])} cookies)")
    print(f"User-Agent: {result['user_agent']}")


if __name__ == "__main__":
    main()