More fixes

This commit is contained in:
Andras Schmelczer 2026-03-18 22:46:08 +00:00
parent 15fa09430b
commit 6b12e21d50
54 changed files with 1665 additions and 630 deletions

View file

@ -1,15 +1,21 @@
#!/usr/bin/env -S uv run --project ../finder
"""Zoopla scraping experiment — Playwright with stealth + network interception.
"""Zoopla scraping experiment — working prototype using Camoufox.
Zoopla uses Next.js App Router with React Server Components. The listing data
is NOT in __NEXT_DATA__ or the initial HTML — it's fetched client-side after
hydration. This means we need a real browser that:
1. Passes Cloudflare's bot detection
2. Executes JavaScript to trigger the client-side data fetch
3. Intercepts the network response OR scrapes the rendered DOM
Key findings:
- Zoopla uses Cloudflare Turnstile (managed interactive challenge)
- Playwright headless Chromium + stealth patches CANNOT beat it
- Camoufox (anti-fingerprinting Firefox fork) PASSES Cloudflare
- Zoopla uses Next.js App Router with React Server Components (RSC)
- Listing data is NOT in __NEXT_DATA__ — it's server-rendered in the RSC stream
- URL-based location slugs (e.g. /properties/london/) return 0 results
- Must use the search autocomplete (GraphQL: getGeoSuggestion) to resolve
a location, then submit the form to get results
- GraphQL endpoint: api-graphql-lambda.prod.zoopla.co.uk/graphql
- Listings loaded via getTopLeadListingIds + getRareFindLeadListingIds ops
Usage:
uv run --project finder scripts/zoopla_experiment.py [OUTCODE]
uv run --project finder scripts/zoopla_experiment.py [LOCATION]
uv run --project finder scripts/zoopla_experiment.py "Tower Hamlets"
"""
import json
@ -25,294 +31,250 @@ logging.basicConfig(
)
log = logging.getLogger("zoopla-exp")
ZOOPLA_BASE = "https://www.zoopla.co.uk"
CHANNELS = {
"BUY": "for-sale",
"RENT": "to-rent",
}
def scrape_zoopla(location: str = "London", channel: str = "BUY"):
from camoufox.sync_api import Camoufox
tab_label = "Buy" if channel == "BUY" else "Rent"
log.info("Scraping Zoopla: location=%s channel=%s", location, channel)
def run_playwright_stealth(outcode: str, channel: str = "BUY"):
"""Use Playwright with stealth patches to scrape Zoopla.
with Camoufox(headless=True) as browser:
page = browser.new_page()
Strategy:
1. Launch stealth browser to bypass Cloudflare
2. Navigate to search page
3. Wait for listings to render (client-side hydration)
4. Try two extraction methods:
a. Intercept network requests for API data (cleanest)
b. Parse the rendered DOM (fallback)
"""
from playwright.sync_api import sync_playwright
from playwright_stealth import Stealth
# Intercept GraphQL responses
graphql_responses = []
url_segment = CHANNELS[channel]
search_url = f"{ZOOPLA_BASE}/{url_segment}/properties/{outcode.lower()}/"
log.info("Target: %s", search_url)
intercepted_data = []
def handle_response(response):
"""Capture any API responses that look like listing data."""
url = response.url
# Look for API/data endpoints
if any(kw in url for kw in ["/api/", "graphql", "search", "listing", "property"]):
try:
if "application/json" in (response.headers.get("content-type", "")):
def on_resp(response):
url = response.url
ct = response.headers.get("content-type", "")
if "json" in ct and "graphql" in url:
try:
body = response.json()
intercepted_data.append({"url": url, "data": body})
log.info(" [intercepted] %s (%s)", url[:100], type(body).__name__)
req = response.request.post_data or ""
graphql_responses.append({"body": body, "req": req})
except Exception:
pass
page.on("response", on_resp)
# Step 1: Load homepage and pass Cloudflare
log.info("Loading Zoopla homepage...")
page.goto("https://www.zoopla.co.uk/", wait_until="domcontentloaded", timeout=60000)
for i in range(20):
if "Just a moment" not in page.title():
break
time.sleep(3)
else:
log.error("Cloudflare did not resolve after 60s")
return []
log.info("Homepage loaded: %s", page.title())
time.sleep(3)
# Step 2: Dismiss cookie consent (shadow DOM)
page.evaluate("""() => {
const aside = document.querySelector('#usercentrics-cmp-ui');
if (aside && aside.shadowRoot) {
const btns = aside.shadowRoot.querySelectorAll('button');
for (const btn of btns) {
if (btn.innerText.includes('Accept')) { btn.click(); return; }
}
}
aside?.remove();
}""")
time.sleep(2)
# Step 3: Select Buy/Rent tab if needed
if channel == "RENT":
rent_tab = page.query_selector('button:has-text("Rent")') or page.query_selector(f'[role="tab"]:has-text("{tab_label}")')
if rent_tab:
rent_tab.click()
time.sleep(1)
# Step 4: Type location into search and select autocomplete suggestion
log.info("Searching for '%s'...", location)
search_input = (
page.query_selector('input[name="autosuggest-input"]')
or page.query_selector('input[type="text"]')
)
if not search_input:
log.error("Could not find search input")
return []
search_input.click()
time.sleep(0.5)
search_input.fill("") # Clear any existing text
search_input.type(location, delay=80)
time.sleep(3)
# Select first autocomplete suggestion
first_option = page.query_selector('[role="option"]')
if first_option:
suggestion_text = first_option.inner_text()
log.info("Selecting suggestion: %s", suggestion_text)
first_option.click()
time.sleep(1)
else:
log.warning("No autocomplete suggestions appeared")
# Step 5: Submit search
search_btn = page.query_selector('button:has-text("Search")')
if search_btn:
search_btn.click()
else:
search_input.press("Enter")
log.info("Waiting for results...")
time.sleep(10)
final_url = page.url
final_title = page.title()
log.info("URL: %s", final_url)
log.info("Title: %s", final_title)
# Step 6: Extract listings from rendered DOM
listings = page.evaluate(r"""() => {
const links = Array.from(document.querySelectorAll(
'a[href*="/for-sale/details/"], a[href*="/new-homes/details/"], a[href*="/to-rent/details/"]'
));
const seen = new Set();
const results = [];
for (const link of links) {
const href = link.href;
const match = href.match(/\/details\/(\d+)\//);
if (!match) continue;
const id = match[1];
if (seen.has(id)) continue;
seen.add(id);
// Walk up to find the listing card container
let card = link;
for (let j = 0; j < 10; j++) {
card = card.parentElement;
if (!card) break;
const text = card.innerText || '';
// A listing card should have a price and at least beds or area
if (text.includes('£') && (text.includes('bed') || text.includes('sq ft'))) {
break;
}
}
if (!card) continue;
const text = card.innerText || '';
const lines = text.split('\n').map(l => l.trim()).filter(Boolean);
const priceMatch = text.match(/£([\d,]+)/);
const bedsMatch = text.match(/(\d+)\s*beds?/i);
const bathsMatch = text.match(/(\d+)\s*baths?/i);
const recMatch = text.match(/(\d+)\s*reception/i);
const areaMatch = text.match(/([\d,]+)\s*sq\s*ft/i);
// Try to find address usually a line with a postcode or comma-separated location
let address = '';
for (const line of lines) {
if (/[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i.test(line) ||
(line.includes(',') && !line.includes('£') && !line.match(/^\d+ beds?/i))) {
address = line;
break;
}
}
// Tenure
let tenure = '';
if (/freehold/i.test(text)) tenure = 'Freehold';
else if (/leasehold/i.test(text)) tenure = 'Leasehold';
results.push({
id: id,
url: href.replace(window.location.origin, ''),
price: priceMatch ? parseInt(priceMatch[1].replace(/,/g, '')) : null,
beds: bedsMatch ? parseInt(bedsMatch[1]) : null,
baths: bathsMatch ? parseInt(bathsMatch[1]) : null,
receptions: recMatch ? parseInt(recMatch[1]) : null,
floor_area_sqft: areaMatch ? parseInt(areaMatch[1].replace(/,/g, '')) : null,
address: address,
tenure: tenure,
text_preview: lines.slice(0, 10).join(' | '),
});
}
return results;
}""")
log.info("Extracted %d unique listings from page 1", len(listings))
# Step 7: Check for results count and pagination
body_text = page.inner_text("body")
count_match = re.search(r"([\d,]+)\s+results?", body_text)
total_results = int(count_match.group(1).replace(",", "")) if count_match else len(listings)
log.info("Total results: %d", total_results)
# Step 8: Log GraphQL operations we saw
log.info("GraphQL operations intercepted:")
for gql in graphql_responses:
try:
req = json.loads(gql["req"])
op = req.get("operationName", "?")
log.info(" - %s", op)
except Exception:
pass
with sync_playwright() as p:
# Launch with stealth-friendly args
browser = p.chromium.launch(
headless=True,
args=[
"--disable-blink-features=AutomationControlled",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-web-security",
"--lang=en-GB",
],
)
context = browser.new_context(
locale="en-GB",
timezone_id="Europe/London",
viewport={"width": 1920, "height": 1080},
user_agent=(
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
),
)
page = context.new_page()
# Apply stealth patches (Linux platform, Chrome UA)
stealth = Stealth(
navigator_platform_override="Linux x86_64",
navigator_languages_override=("en-GB", "en"),
)
stealth.apply_stealth_sync(page)
# Listen for responses to intercept API data
page.on("response", handle_response)
# Navigate
log.info("Navigating to %s ...", search_url)
try:
page.goto(search_url, wait_until="domcontentloaded", timeout=60000)
except Exception as e:
log.error("Navigation failed: %s", e)
browser.close()
return
# Wait for Cloudflare to resolve
log.info("Waiting for Cloudflare challenge to resolve ...")
for attempt in range(20):
content = page.content()
title = page.title()
if "Just a moment" in content and "challenge" in content.lower():
log.info(" Cloudflare challenge still active (%d/20) title=%s", attempt + 1, title)
time.sleep(3)
else:
log.info(" Challenge resolved! title=%s", title)
break
else:
log.error("Cloudflare challenge did not resolve")
# Dump page content for debugging
print("\n=== Cloudflare challenge page ===")
print(page.content()[:3000])
browser.close()
return
# Wait for actual content to render
log.info("Waiting for listing content to render ...")
try:
# Try waiting for property cards to appear
page.wait_for_selector(
'[data-testid="search-result"], [data-testid="regular-listings"], '
'.listing-results, .css-kdnlof, [class*="ListingCard"], '
'[class*="listing"], [class*="PropertyCard"]',
timeout=15000,
)
log.info("Listing elements found in DOM!")
except Exception:
log.warning("No listing elements found by selector. Trying to wait for prices...")
try:
page.wait_for_function(
"document.querySelectorAll('a[href*=\"/for-sale/details/\"]').length > 0",
timeout=15000,
)
log.info("Listing links found in DOM!")
except Exception:
log.warning("No listing links either. Page may still be loading or we're blocked.")
# Give hydration a moment
time.sleep(3)
# --- Extraction Method A: Check intercepted network data ---
if intercepted_data:
print(f"\n=== Intercepted {len(intercepted_data)} API responses ===")
for item in intercepted_data:
print(f"\nURL: {item['url'][:150]}")
data = item["data"]
if isinstance(data, dict):
print(f"Keys: {list(data.keys())[:15]}")
# Look for listings inside
for k, v in data.items():
if isinstance(v, list) and len(v) > 2 and isinstance(v[0], dict):
print(f" {k}: list of {len(v)} items, [0] keys={list(v[0].keys())[:10]}")
elif isinstance(data, list) and data:
print(f"Array of {len(data)} items")
if isinstance(data[0], dict):
print(f" [0] keys: {list(data[0].keys())[:15]}")
print(json.dumps(data, indent=2, default=str, ensure_ascii=False)[:2000])
# --- Extraction Method B: Parse rendered DOM ---
log.info("Extracting from rendered DOM ...")
# Get full page content after hydration
content = page.content()
# Find listing URLs
listing_urls = re.findall(r'href="(/for-sale/details/\d+/[^"]*)"', content)
log.info("Found %d listing detail links", len(listing_urls))
# Find prices
prices = re.findall(r'£([\d,]+)', content)
log.info("Found %d price strings", len(prices))
if prices:
log.info("Prices: %s", prices[:10])
# Try to extract structured listing data from the page
listings = page.evaluate("""() => {
// Try to find listing cards via various selectors
const selectors = [
'[data-testid="search-result"]',
'[data-testid="regular-listings"] > div',
'a[href*="/for-sale/details/"]',
'[class*="ListingCard"]',
'[class*="listing-result"]',
];
for (const sel of selectors) {
const elements = document.querySelectorAll(sel);
if (elements.length > 2) {
return {
selector: sel,
count: elements.length,
// Get text and href from first 3
samples: Array.from(elements).slice(0, 3).map(el => ({
text: el.innerText?.substring(0, 300),
href: el.href || el.querySelector('a')?.href || '',
html: el.outerHTML?.substring(0, 500),
}))
};
}
}
// Fallback: find all links to listing detail pages
const links = Array.from(document.querySelectorAll('a[href*="/details/"]'));
if (links.length > 0) {
return {
selector: 'a[href*="/details/"]',
count: links.length,
samples: links.slice(0, 5).map(el => ({
text: el.innerText?.substring(0, 300),
href: el.href,
parentText: el.closest('div, li, article')?.innerText?.substring(0, 500) || '',
}))
};
}
// Last resort: get page structure
return {
selector: 'none',
count: 0,
bodyText: document.body?.innerText?.substring(0, 2000),
title: document.title,
};
}""")
print(f"\n=== DOM Extraction Results ===")
print(json.dumps(listings, indent=2, ensure_ascii=False)[:5000])
# Also extract cookies for potential reuse
cookies = context.cookies()
zoopla_cookies = {c["name"]: c["value"] for c in cookies if ".zoopla.co.uk" in c.get("domain", "")}
# Step 9: Extract cookies for potential curl_cffi reuse
cookies = page.context.cookies()
session_cookies = {
c["name"]: c["value"]
for c in cookies
if "zoopla" in c.get("domain", "") or "cf" in c.get("name", "").lower()
}
ua = page.evaluate("navigator.userAgent")
print(f"\n=== Session Info ===")
print(f"Cookies ({len(zoopla_cookies)}): {list(zoopla_cookies.keys())}")
print(f"User-Agent: {ua}")
if zoopla_cookies:
# Save cookies for reuse
print(f"\n=== Reusable cookie env vars ===")
for name, value in zoopla_cookies.items():
print(f" {name}={value[:50]}...")
# --- Try a detail page if we found any listing URLs ---
if listing_urls:
detail_path = listing_urls[0]
detail_url = f"{ZOOPLA_BASE}{detail_path}"
log.info("--- Fetching detail page: %s ---", detail_url)
time.sleep(2)
page.goto(detail_url, wait_until="domcontentloaded", timeout=30000)
time.sleep(5) # Let it hydrate
detail = page.evaluate("""() => {
const result = {};
// Price
const priceEl = document.querySelector('[data-testid="price"]')
|| document.querySelector('[class*="price"]');
result.price = priceEl?.innerText || '';
// Address
const addrEl = document.querySelector('[data-testid="address-label"]')
|| document.querySelector('h1') || document.querySelector('address');
result.address = addrEl?.innerText || '';
// Key features
const features = Array.from(document.querySelectorAll('[data-testid="listing_feature"] li, [class*="feature"] li'));
result.features = features.map(f => f.innerText).slice(0, 15);
// Bedrooms/bathrooms from icons or text
const specs = document.querySelectorAll('[data-testid="beds-label"], [data-testid="baths-label"], [class*="bed"], [class*="bath"]');
result.specs = Array.from(specs).map(s => s.innerText).slice(0, 5);
// Description
const desc = document.querySelector('[data-testid="listing_description"], [class*="description"]');
result.description = desc?.innerText?.substring(0, 500) || '';
// Agent
const agent = document.querySelector('[data-testid="agent-details"], [class*="agent"]');
result.agent = agent?.innerText?.substring(0, 200) || '';
// Full page text summary
result.pageTitle = document.title;
result.bodyPreview = document.body?.innerText?.substring(0, 1000);
return result;
}""")
print(f"\n=== Detail Page Data ===")
print(json.dumps(detail, indent=2, ensure_ascii=False)[:3000])
browser.close()
return {
"url": final_url,
"title": final_title,
"total_results": total_results,
"listings": listings,
"cookies": session_cookies,
"user_agent": ua,
}
def main():
outcode = sys.argv[1] if len(sys.argv) > 1 else "E1"
channel = "BUY"
log.info("=== Zoopla Scraping Experiment (Playwright Stealth) ===")
log.info("Outcode: %s, Channel: %s", outcode, channel)
run_playwright_stealth(outcode, channel)
log.info("=== Done ===")
location = sys.argv[1] if len(sys.argv) > 1 else "London"
result = scrape_zoopla(location, channel="BUY")
if not result:
log.error("Scraping failed")
sys.exit(1)
listings = result["listings"]
print(f"\n{'='*60}")
print(f" Zoopla: {result['title']}")
print(f" URL: {result['url']}")
print(f" Total: {result['total_results']} results, {len(listings)} extracted")
print(f"{'='*60}\n")
for i, listing in enumerate(listings):
print(f"--- Listing {i+1}: {listing['url']} ---")
display = {k: v for k, v in listing.items() if k != "text_preview" and v}
print(json.dumps(display, indent=2, ensure_ascii=False))
print()
# Summary stats
prices = [l["price"] for l in listings if l["price"]]
beds = [l["beds"] for l in listings if l["beds"]]
if prices:
print(f"Price range: £{min(prices):,} - £{max(prices):,}")
print(f"Median: £{sorted(prices)[len(prices)//2]:,}")
if beds:
print(f"Bedrooms: {min(beds)}-{max(beds)}")
# Cookie info for reuse
print(f"\nSession cookies ({len(result['cookies'])} cookies)")
print(f"User-Agent: {result['user_agent']}")
if __name__ == "__main__":