319 lines
12 KiB
Python
Executable file
319 lines
12 KiB
Python
Executable file
#!/usr/bin/env -S uv run --project ../finder
|
|
"""Zoopla scraping experiment — Playwright with stealth + network interception.
|
|
|
|
Zoopla uses Next.js App Router with React Server Components. The listing data
is NOT in __NEXT_DATA__ or the initial HTML — it's fetched client-side after
hydration. This means we need a real browser that:

  1. Passes Cloudflare's bot detection
  2. Executes JavaScript to trigger the client-side data fetch
  3. Intercepts the network response OR scrapes the rendered DOM
|
|
|
|
Usage:
    uv run --project finder scripts/zoopla_experiment.py [OUTCODE]
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
# Console logging for the experiment: short wall-clock timestamps and a
# fixed-width level column so multi-line runs stay aligned.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

# Logger shared by every function in this script.
log = logging.getLogger("zoopla-exp")
|
|
|
|
# Scheme + host that every search and detail URL is built on.
ZOOPLA_BASE = "https://www.zoopla.co.uk"

# Maps the channel name used by callers to the URL path segment Zoopla uses.
CHANNELS = dict(BUY="for-sale", RENT="to-rent")
|
|
|
|
|
|
# Substrings that mark a response URL as a candidate listing-data endpoint.
_API_URL_KEYWORDS = ("/api/", "graphql", "search", "listing", "property")

# Selectors tried (as one comma-separated group) while waiting for the
# search results to render.
_LISTING_CARD_SELECTOR = (
    '[data-testid="search-result"], [data-testid="regular-listings"], '
    '.listing-results, .css-kdnlof, [class*="ListingCard"], '
    '[class*="listing"], [class*="PropertyCard"]'
)

# Runs in the search page. Receives the channel's URL segment ("for-sale" or
# "to-rent") and returns a small sample of whichever selector matches first.
_SEARCH_PAGE_JS = """(segment) => {
    // Try to find listing cards via various selectors
    const selectors = [
        '[data-testid="search-result"]',
        '[data-testid="regular-listings"] > div',
        `a[href*="/${segment}/details/"]`,
        '[class*="ListingCard"]',
        '[class*="listing-result"]',
    ];

    for (const sel of selectors) {
        const elements = document.querySelectorAll(sel);
        if (elements.length > 2) {
            return {
                selector: sel,
                count: elements.length,
                // Get text and href from first 3
                samples: Array.from(elements).slice(0, 3).map(el => ({
                    text: el.innerText?.substring(0, 300),
                    href: el.href || el.querySelector('a')?.href || '',
                    html: el.outerHTML?.substring(0, 500),
                }))
            };
        }
    }

    // Fallback: find all links to listing detail pages
    const links = Array.from(document.querySelectorAll('a[href*="/details/"]'));
    if (links.length > 0) {
        return {
            selector: 'a[href*="/details/"]',
            count: links.length,
            samples: links.slice(0, 5).map(el => ({
                text: el.innerText?.substring(0, 300),
                href: el.href,
                parentText: el.closest('div, li, article')?.innerText?.substring(0, 500) || '',
            }))
        };
    }

    // Last resort: get page structure
    return {
        selector: 'none',
        count: 0,
        bodyText: document.body?.innerText?.substring(0, 2000),
        title: document.title,
    };
}"""

# Runs in a listing detail page and collects a best-effort summary of it.
_DETAIL_PAGE_JS = """() => {
    const result = {};

    // Price
    const priceEl = document.querySelector('[data-testid="price"]')
        || document.querySelector('[class*="price"]');
    result.price = priceEl?.innerText || '';

    // Address
    const addrEl = document.querySelector('[data-testid="address-label"]')
        || document.querySelector('h1') || document.querySelector('address');
    result.address = addrEl?.innerText || '';

    // Key features
    const features = Array.from(document.querySelectorAll('[data-testid="listing_feature"] li, [class*="feature"] li'));
    result.features = features.map(f => f.innerText).slice(0, 15);

    // Bedrooms/bathrooms from icons or text
    const specs = document.querySelectorAll('[data-testid="beds-label"], [data-testid="baths-label"], [class*="bed"], [class*="bath"]');
    result.specs = Array.from(specs).map(s => s.innerText).slice(0, 5);

    // Description
    const desc = document.querySelector('[data-testid="listing_description"], [class*="description"]');
    result.description = desc?.innerText?.substring(0, 500) || '';

    // Agent
    const agent = document.querySelector('[data-testid="agent-details"], [class*="agent"]');
    result.agent = agent?.innerText?.substring(0, 200) || '';

    // Full page text summary
    result.pageTitle = document.title;
    result.bodyPreview = document.body?.innerText?.substring(0, 1000);

    return result;
}"""


def _wait_for_cloudflare(page, attempts=20, delay=3.0):
    """Poll the page until the Cloudflare interstitial is gone; return False if it never clears."""
    for attempt in range(attempts):
        content = page.content()
        title = page.title()
        if "Just a moment" in content and "challenge" in content.lower():
            log.info(
                "  Cloudflare challenge still active (%d/%d) title=%s",
                attempt + 1,
                attempts,
                title,
            )
            time.sleep(delay)
        else:
            log.info("  Challenge resolved! title=%s", title)
            return True
    return False


def _print_intercepted(intercepted_data):
    """Print a structural summary and a truncated JSON dump of each captured response."""
    print(f"\n=== Intercepted {len(intercepted_data)} API responses ===")
    for item in intercepted_data:
        print(f"\nURL: {item['url'][:150]}")
        data = item["data"]
        if isinstance(data, dict):
            print(f"Keys: {list(data.keys())[:15]}")
            # Look for listings inside: any value that is a list of dicts.
            for k, v in data.items():
                if isinstance(v, list) and len(v) > 2 and isinstance(v[0], dict):
                    print(f"  {k}: list of {len(v)} items, [0] keys={list(v[0].keys())[:10]}")
        elif isinstance(data, list) and data:
            print(f"Array of {len(data)} items")
            if isinstance(data[0], dict):
                print(f"  [0] keys: {list(data[0].keys())[:15]}")
        print(json.dumps(data, indent=2, default=str, ensure_ascii=False)[:2000])


def run_playwright_stealth(outcode: str, channel: str = "BUY") -> None:
    """Use Playwright with stealth patches to scrape Zoopla.

    Strategy:
      1. Launch stealth browser to bypass Cloudflare
      2. Navigate to search page
      3. Wait for listings to render (client-side hydration)
      4. Try two extraction methods:
         a. Intercept network requests for API data (cleanest)
         b. Parse the rendered DOM (fallback)
      5. If any listing link was found, open the first detail page and
         print a summary of it.

    All results are printed to stdout / logged; nothing is returned.

    Args:
        outcode: Postcode outcode to search; lower-cased into the URL.
        channel: Key of ``CHANNELS`` ("BUY" or "RENT"). It selects the URL
            segment used for the search page and for matching listing links.

    Raises:
        KeyError: If ``channel`` is not a key of ``CHANNELS``.
    """
    from playwright.sync_api import sync_playwright
    from playwright_stealth import Stealth

    url_segment = CHANNELS[channel]
    search_url = f"{ZOOPLA_BASE}/{url_segment}/properties/{outcode.lower()}/"
    log.info("Target: %s", search_url)

    # Listing links follow the channel ("/for-sale/details/…" or
    # "/to-rent/details/…"), so every matcher is derived from url_segment.
    listing_link_selector = f'a[href*="/{url_segment}/details/"]'
    listing_href_re = rf'href="(/{re.escape(url_segment)}/details/\d+/[^"]*)"'

    intercepted_data = []

    def handle_response(response):
        """Capture any API responses that look like listing data."""
        url = response.url
        if not any(kw in url for kw in _API_URL_KEYWORDS):
            return
        try:
            if "application/json" in response.headers.get("content-type", ""):
                body = response.json()
                intercepted_data.append({"url": url, "data": body})
                log.info("  [intercepted] %s (%s)", url[:100], type(body).__name__)
        except Exception as exc:
            # Best effort: a response whose body cannot be read or parsed is
            # skipped, but leave a trace for debugging instead of hiding it.
            log.debug("  [skipped] %s: %s", url[:100], exc)

    with sync_playwright() as p:
        # Launch with stealth-friendly args
        # NOTE(review): "--disable-web-security" relaxes same-origin checks;
        # confirm it is really needed for this experiment.
        browser = p.chromium.launch(
            headless=True,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-web-security",
                "--lang=en-GB",
            ],
        )
        # try/finally guarantees the browser is closed on every exit path,
        # including early returns and unexpected exceptions.
        try:
            context = browser.new_context(
                locale="en-GB",
                timezone_id="Europe/London",
                viewport={"width": 1920, "height": 1080},
                user_agent=(
                    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                    "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
                ),
            )
            page = context.new_page()

            # Apply stealth patches (Linux platform, Chrome UA)
            stealth = Stealth(
                navigator_platform_override="Linux x86_64",
                navigator_languages_override=("en-GB", "en"),
            )
            stealth.apply_stealth_sync(page)

            # Listen for responses to intercept API data
            page.on("response", handle_response)

            # Navigate
            log.info("Navigating to %s ...", search_url)
            try:
                page.goto(search_url, wait_until="domcontentloaded", timeout=60000)
            except Exception as e:
                log.error("Navigation failed: %s", e)
                return

            # Wait for Cloudflare to resolve
            log.info("Waiting for Cloudflare challenge to resolve ...")
            if not _wait_for_cloudflare(page):
                log.error("Cloudflare challenge did not resolve")
                # Dump page content for debugging
                print("\n=== Cloudflare challenge page ===")
                print(page.content()[:3000])
                return

            # Wait for actual content to render
            log.info("Waiting for listing content to render ...")
            try:
                # Try waiting for property cards to appear
                page.wait_for_selector(_LISTING_CARD_SELECTOR, timeout=15000)
                log.info("Listing elements found in DOM!")
            except Exception:
                log.warning("No listing elements found by selector. Trying to wait for prices...")
                try:
                    page.wait_for_function(
                        "(sel) => document.querySelectorAll(sel).length > 0",
                        arg=listing_link_selector,
                        timeout=15000,
                    )
                    log.info("Listing links found in DOM!")
                except Exception:
                    log.warning("No listing links either. Page may still be loading or we're blocked.")

            # Give hydration a moment
            time.sleep(3)

            # --- Extraction Method A: Check intercepted network data ---
            if intercepted_data:
                _print_intercepted(intercepted_data)

            # --- Extraction Method B: Parse rendered DOM ---
            log.info("Extracting from rendered DOM ...")

            # Get full page content after hydration
            content = page.content()

            # Find listing URLs
            listing_urls = re.findall(listing_href_re, content)
            log.info("Found %d listing detail links", len(listing_urls))

            # Find prices
            prices = re.findall(r"£([\d,]+)", content)
            log.info("Found %d price strings", len(prices))
            if prices:
                log.info("Prices: %s", prices[:10])

            # Try to extract structured listing data from the page
            listings = page.evaluate(_SEARCH_PAGE_JS, url_segment)

            print("\n=== DOM Extraction Results ===")
            print(json.dumps(listings, indent=2, ensure_ascii=False)[:5000])

            # Also extract cookies for potential reuse
            cookies = context.cookies()
            zoopla_cookies = {
                c["name"]: c["value"]
                for c in cookies
                if ".zoopla.co.uk" in c.get("domain", "")
            }
            ua = page.evaluate("navigator.userAgent")

            print("\n=== Session Info ===")
            print(f"Cookies ({len(zoopla_cookies)}): {list(zoopla_cookies.keys())}")
            print(f"User-Agent: {ua}")

            if zoopla_cookies:
                # Show (truncated) cookie values for reuse
                print("\n=== Reusable cookie env vars ===")
                for name, value in zoopla_cookies.items():
                    print(f"  {name}={value[:50]}...")

            # --- Try a detail page if we found any listing URLs ---
            if listing_urls:
                detail_url = f"{ZOOPLA_BASE}{listing_urls[0]}"
                log.info("--- Fetching detail page: %s ---", detail_url)
                time.sleep(2)

                try:
                    page.goto(detail_url, wait_until="domcontentloaded", timeout=30000)
                except Exception as e:
                    # Same policy as the search-page navigation: report and stop.
                    log.error("Detail navigation failed: %s", e)
                    return
                time.sleep(5)  # Let it hydrate

                detail = page.evaluate(_DETAIL_PAGE_JS)

                print("\n=== Detail Page Data ===")
                print(json.dumps(detail, indent=2, ensure_ascii=False)[:3000])
        finally:
            browser.close()
|
|
|
|
|
|
def main():
    """Run the experiment for the outcode given on the command line.

    The first positional argument is used as the outcode; "E1" is used when
    none is supplied. The channel is always "BUY".
    """
    cli_args = sys.argv[1:]
    outcode = cli_args[0] if cli_args else "E1"
    channel = "BUY"

    log.info("=== Zoopla Scraping Experiment (Playwright Stealth) ===")
    log.info("Outcode: %s, Channel: %s", outcode, channel)

    run_playwright_stealth(outcode, channel)

    log.info("=== Done ===")
|
|
|
|
|
|
# Run the experiment only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
|