perfect-postcode/scripts/zoopla_experiment.py

281 lines
10 KiB
Python
Executable file

#!/usr/bin/env -S uv run --project ../finder
"""Zoopla scraping experiment — working prototype using Camoufox.

Key findings:
- Zoopla uses Cloudflare Turnstile (managed interactive challenge)
- Playwright headless Chromium + stealth patches CANNOT beat it
- Camoufox (anti-fingerprinting Firefox fork) PASSES Cloudflare
- Zoopla uses Next.js App Router with React Server Components (RSC)
- Listing data is NOT in __NEXT_DATA__ — it's server-rendered in RSC stream
- URL-based location slugs (e.g. /properties/london/) return 0 results
- Must use the search autocomplete (GraphQL: getGeoSuggestion) to resolve
  a location, then submit the form to get results
- GraphQL endpoint: api-graphql-lambda.prod.zoopla.co.uk/graphql
- Listings loaded via getTopLeadListingIds + getRareFindLeadListingIds ops

Usage:
    uv run --project finder scripts/zoopla_experiment.py [LOCATION]
    uv run --project finder scripts/zoopla_experiment.py "Tower Hamlets"
"""
import json
import logging
import re
import sys
import time
# Configure the root logger at import time: INFO and above, one line per
# record in the form "HH:MM:SS LEVEL    message".
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)

# Named logger shared by every function in this script.
log = logging.getLogger("zoopla-exp")
# Matches a results counter such as "1,234 results".  The number has to start
# with a digit so that ordinary prose like "Sorry, results ..." cannot match
# a lone comma and crash the int() conversion.
_RESULT_COUNT_RE = re.compile(r"(\d[\d,]*)\s+results?")

# Run in the page: click the "Accept" button inside the Usercentrics shadow
# root if there is one, otherwise remove the banner element.
_DISMISS_COOKIES_JS = """() => {
    const aside = document.querySelector('#usercentrics-cmp-ui');
    if (aside && aside.shadowRoot) {
        const btns = aside.shadowRoot.querySelectorAll('button');
        for (const btn of btns) {
            if (btn.innerText.includes('Accept')) { btn.click(); return; }
        }
    }
    aside?.remove();
}"""

# Run in the page: collect one record per unique listing id found in links to
# listing detail pages, parsing price/beds/etc. out of the surrounding card's
# visible text.  Raw string so the regex backslashes reach JavaScript intact.
_EXTRACT_LISTINGS_JS = r"""() => {
    const links = Array.from(document.querySelectorAll(
        'a[href*="/for-sale/details/"], a[href*="/new-homes/details/"], a[href*="/to-rent/details/"]'
    ));
    const seen = new Set();
    const results = [];
    for (const link of links) {
        const href = link.href;
        const match = href.match(/\/details\/(\d+)\//);
        if (!match) continue;
        const id = match[1];
        if (seen.has(id)) continue;
        seen.add(id);
        // Walk up to find the listing card container
        let card = link;
        for (let j = 0; j < 10; j++) {
            card = card.parentElement;
            if (!card) break;
            const text = card.innerText || '';
            // A listing card should have a price and at least beds or area
            if (text.includes('£') && (text.includes('bed') || text.includes('sq ft'))) {
                break;
            }
        }
        if (!card) continue;
        const text = card.innerText || '';
        const lines = text.split('\n').map(l => l.trim()).filter(Boolean);
        const priceMatch = text.match(/£([\d,]+)/);
        const bedsMatch = text.match(/(\d+)\s*beds?/i);
        const bathsMatch = text.match(/(\d+)\s*baths?/i);
        const recMatch = text.match(/(\d+)\s*reception/i);
        const areaMatch = text.match(/([\d,]+)\s*sq\s*ft/i);
        // Try to find address — usually a line with a postcode or comma-separated location
        let address = '';
        for (const line of lines) {
            if (/[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i.test(line) ||
                (line.includes(',') && !line.includes('£') && !line.match(/^\d+ beds?/i))) {
                address = line;
                break;
            }
        }
        // Tenure
        let tenure = '';
        if (/freehold/i.test(text)) tenure = 'Freehold';
        else if (/leasehold/i.test(text)) tenure = 'Leasehold';
        results.push({
            id: id,
            url: href.replace(window.location.origin, ''),
            price: priceMatch ? parseInt(priceMatch[1].replace(/,/g, '')) : null,
            beds: bedsMatch ? parseInt(bedsMatch[1]) : null,
            baths: bathsMatch ? parseInt(bathsMatch[1]) : null,
            receptions: recMatch ? parseInt(recMatch[1]) : null,
            floor_area_sqft: areaMatch ? parseInt(areaMatch[1].replace(/,/g, '')) : null,
            address: address,
            tenure: tenure,
            text_preview: lines.slice(0, 10).join(' | '),
        });
    }
    return results;
}"""


def _parse_total_results(body_text: str, fallback: int) -> int:
    """Return the count from the first "<N> results" in *body_text*, else *fallback*."""
    match = _RESULT_COUNT_RE.search(body_text)
    if match is None:
        return fallback
    return int(match.group(1).replace(",", ""))


def _graphql_operation_names(raw_request: str) -> list:
    """Return the ``operationName`` of each operation in a GraphQL POST body.

    An empty or non-JSON body yields an empty list.  A JSON array is treated
    as a batch of operations; entries that are not objects are ignored.
    """
    if not raw_request:
        return []
    try:
        payload = json.loads(raw_request)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return []
    operations = payload if isinstance(payload, list) else [payload]
    return [op.get("operationName", "?") for op in operations if isinstance(op, dict)]


def scrape_zoopla(location: str = "London", channel: str = "BUY"):
    """Search Zoopla for *location* in a headless Camoufox browser.

    The flow is: load the homepage and wait for the Cloudflare interstitial to
    clear, dismiss the cookie banner, optionally switch to the Rent tab, type
    the location, pick the first autocomplete suggestion, submit, then read
    listings out of the rendered DOM.

    Args:
        location: Free-text location typed into the search box.
        channel: "BUY" or "RENT".  Only "RENT" changes behaviour (it clicks
            the Rent tab before searching).

    Returns:
        On success, a dict with keys ``url``, ``title``, ``total_results``,
        ``listings`` (list of dicts built by the in-page extraction script),
        ``cookies`` (name -> value for Zoopla / "cf" cookies) and
        ``user_agent``.  On failure (Cloudflare never clears, or the search
        input cannot be found) an empty list, so callers can test the result
        for truthiness.
    """
    # Imported lazily so the module can be imported without Camoufox installed.
    from camoufox.sync_api import Camoufox

    log.info("Scraping Zoopla: location=%s channel=%s", location, channel)
    with Camoufox(headless=True) as browser:
        page = browser.new_page()

        # Intercept GraphQL responses
        graphql_responses = []

        def on_resp(response):
            url = response.url
            content_type = response.headers.get("content-type", "")
            if "json" not in content_type or "graphql" not in url:
                return
            try:
                body = response.json()
                req = response.request.post_data or ""
            except Exception as exc:
                # Best-effort capture inside an event callback: never let a
                # single undecodable response abort the scrape, but leave a
                # trace instead of swallowing the error silently.
                log.debug("Could not decode GraphQL response from %s: %s", url, exc)
                return
            graphql_responses.append({"body": body, "req": req})

        page.on("response", on_resp)

        # Step 1: Load homepage and pass Cloudflare
        log.info("Loading Zoopla homepage...")
        page.goto("https://www.zoopla.co.uk/", wait_until="domcontentloaded", timeout=60000)
        # Poll the title up to 20 times, 3s apart (60s total), until the
        # "Just a moment" interstitial is gone.
        for _ in range(20):
            if "Just a moment" not in page.title():
                break
            time.sleep(3)
        else:
            log.error("Cloudflare did not resolve after 60s")
            return []
        log.info("Homepage loaded: %s", page.title())
        time.sleep(3)

        # Step 2: Dismiss cookie consent (shadow DOM)
        page.evaluate(_DISMISS_COOKIES_JS)
        time.sleep(2)

        # Step 3: Select Buy/Rent tab if needed
        if channel == "RENT":
            rent_tab = (
                page.query_selector('button:has-text("Rent")')
                or page.query_selector('[role="tab"]:has-text("Rent")')
            )
            if rent_tab:
                rent_tab.click()
                time.sleep(1)

        # Step 4: Type location into search and select autocomplete suggestion
        log.info("Searching for '%s'...", location)
        search_input = (
            page.query_selector('input[name="autosuggest-input"]')
            or page.query_selector('input[type="text"]')
        )
        if not search_input:
            log.error("Could not find search input")
            return []
        search_input.click()
        time.sleep(0.5)
        search_input.fill("")  # Clear any existing text
        search_input.type(location, delay=80)
        time.sleep(3)

        # Select first autocomplete suggestion
        first_option = page.query_selector('[role="option"]')
        if first_option:
            suggestion_text = first_option.inner_text()
            log.info("Selecting suggestion: %s", suggestion_text)
            first_option.click()
            time.sleep(1)
        else:
            log.warning("No autocomplete suggestions appeared")

        # Step 5: Submit search
        search_btn = page.query_selector('button:has-text("Search")')
        if search_btn:
            search_btn.click()
        else:
            search_input.press("Enter")
        log.info("Waiting for results...")
        time.sleep(10)
        final_url = page.url
        final_title = page.title()
        log.info("URL: %s", final_url)
        log.info("Title: %s", final_title)

        # Step 6: Extract listings from rendered DOM
        listings = page.evaluate(_EXTRACT_LISTINGS_JS)
        log.info("Extracted %d unique listings from page 1", len(listings))

        # Step 7: Check for results count and pagination
        # Falls back to the number of extracted listings when the page shows
        # no "<N> results" text.
        total_results = _parse_total_results(page.inner_text("body"), len(listings))
        log.info("Total results: %d", total_results)

        # Step 8: Log GraphQL operations we saw
        log.info("GraphQL operations intercepted:")
        for captured in graphql_responses:
            for op_name in _graphql_operation_names(captured["req"]):
                log.info(" - %s", op_name)

        # Step 9: Extract cookies for potential curl_cffi reuse
        session_cookies = {
            cookie["name"]: cookie["value"]
            for cookie in page.context.cookies()
            if "zoopla" in cookie.get("domain", "") or "cf" in cookie.get("name", "").lower()
        }
        ua = page.evaluate("navigator.userAgent")
        return {
            "url": final_url,
            "title": final_title,
            "total_results": total_results,
            "listings": listings,
            "cookies": session_cookies,
            "user_agent": ua,
        }
def main():
    """Scrape one location from the command line and print a report.

    The location is the first command-line argument and defaults to "London";
    the search always uses the "BUY" channel.  Prints every extracted listing
    as JSON (minus ``text_preview`` and empty fields), then price and bedroom
    summary statistics and the captured session details.  Exits with status 1
    when ``scrape_zoopla`` returns a falsy result.
    """
    # Local import: only the summary below needs it.
    import statistics

    location = sys.argv[1] if len(sys.argv) > 1 else "London"
    result = scrape_zoopla(location, channel="BUY")
    if not result:
        log.error("Scraping failed")
        sys.exit(1)

    listings = result["listings"]
    banner = "=" * 60
    print(f"\n{banner}")
    print(f" Zoopla: {result['title']}")
    print(f" URL: {result['url']}")
    print(f" Total: {result['total_results']} results, {len(listings)} extracted")
    print(f"{banner}\n")

    for index, listing in enumerate(listings, start=1):
        print(f"--- Listing {index}: {listing['url']} ---")
        display = {k: v for k, v in listing.items() if k != "text_preview" and v}
        print(json.dumps(display, indent=2, ensure_ascii=False))
        print()

    # Summary stats
    prices = [item["price"] for item in listings if item["price"]]
    # Keep 0 as a legitimate bedroom count; only drop listings with no value.
    beds = [item["beds"] for item in listings if item["beds"] is not None]
    if prices:
        print(f"Price range: £{min(prices):,} - £{max(prices):,}")
        # statistics.median averages the two middle values for even-length
        # input, instead of reporting the upper-middle element as the median.
        print(f"Median: £{statistics.median(prices):,.0f}")
    if beds:
        print(f"Bedrooms: {min(beds)}-{max(beds)}")

    # Cookie info for reuse
    print(f"\nSession cookies ({len(result['cookies'])} cookies)")
    print(f"User-Agent: {result['user_agent']}")
# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()