840 lines
31 KiB
Python
840 lines
31 KiB
Python
"""Zoopla (zoopla.co.uk) scraper — buy and rental properties.

Zoopla is behind Cloudflare Turnstile (managed interactive challenge), which
blocks all HTTP clients (curl_cffi, httpx) and even Playwright with stealth
patches. Only Camoufox (an anti-fingerprinting Firefox fork) passes reliably.

Zoopla uses Next.js App Router with React Server Components (RSC). Search
result data is server-rendered in an RSC stream, not available via
__NEXT_DATA__ or a JSON API. URL-based location slugs return 0 results —
the working flow requires typing into the autocomplete input, selecting a
suggestion, and clicking Search.

Architecture:
    Unlike the other scrapers, which use HTTP clients per outcode, Zoopla keeps
    a single Camoufox browser alive for the entire scrape. For each outcode, it:
    1. Clears and types the outcode into the search input
    2. Selects the first autocomplete suggestion
    3. Clicks Search
    4. Extracts listing data from the rendered DOM
    5. Handles pagination via the ?pn=N parameter

The browser session replaces the cookie/client pattern used by other scrapers.
"""
|
|
|
|
import logging
|
|
import re
|
|
import time
|
|
|
|
from constants import DELAY_BETWEEN_PAGES, MAX_BEDROOMS, PROPERTY_TYPE_MAP, ZOOPLA_BASE
|
|
from metrics import zoopla_errors_total, zoopla_pages_scraped, zoopla_properties_scraped
|
|
from spatial import PostcodeSpatialIndex
|
|
from transform import validate_floor_area
|
|
|
|
log = logging.getLogger("zoopla")
|
|
|
|
|
|
class TurnstileError(Exception):
    """Signal that a Cloudflare Turnstile challenge could not be cleared."""
|
|
|
|
|
|
# Upper bound on search-result pages scraped per outcode. At 25 listings per
# page this caps a single outcode/channel at 1,000 listings.
MAX_PAGES_PER_OUTCODE = 40
|
|
|
|
# JavaScript (evaluated in the page) that extracts listings from the rendered DOM
# and returns an array of plain objects with the keys: id, url, price,
# price_text, beds, baths, receptions, floor_area_sqft, address, tenure,
# property_type.
#
# Strategy 1 uses data-testid attributes as primary selectors (stable across
# deployments). Strategy 2 only runs when Strategy 1 produced nothing: it matches
# detail links by href and walks up to 15 parent elements looking for an
# ancestor whose text contains a pound sign plus a bed/bath/area marker.
#
# NOTE(review): in Strategy 2, when no ancestor matches within 15 levels the
# loop falls through with `card` set to the 15th ancestor, so fields are then
# parsed from that (possibly very large) element — confirm this is intended.
_EXTRACT_LISTINGS_JS = r"""() => {
    const seen = new Set();
    const results = [];

    // Strategy 1: Use data-testid selectors (post-2025 redesign)
    const listingCards = document.querySelectorAll(
        '[data-testid="regular-listings"] > div, [data-testid="search-content"] li'
    );

    for (const card of listingCards) {
        const link = card.querySelector(
            'a[href*="/for-sale/details/"], a[href*="/new-homes/details/"], a[href*="/to-rent/details/"]'
        );
        if (!link) continue;

        const href = link.href;
        const match = href.match(/\/details\/(\d+)\//);
        if (!match) continue;

        const id = match[1];
        if (seen.has(id)) continue;
        seen.add(id);

        const text = card.innerText || '';

        // Try data-testid price element first, then regex
        const priceEl = card.querySelector('[data-testid="listing-price"]');
        const priceText = priceEl ? priceEl.innerText : text;
        const priceMatch = priceText.match(/\u00a3([\d,]+)/);

        // Try address element first, then regex
        const addressEl = card.querySelector('address');
        let address = addressEl ? addressEl.innerText.trim() : '';

        if (!address) {
            const lines = text.split('\n').map(l => l.trim()).filter(Boolean);
            for (const line of lines) {
                if (/[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i.test(line) ||
                    (line.includes(',') && !line.includes('\u00a3') && !/^\d+ beds?/i.test(line))) {
                    address = line;
                    break;
                }
            }
        }

        const bedsMatch = text.match(/(\d+)\s*beds?/i);
        const bathsMatch = text.match(/(\d+)\s*baths?/i);
        const recMatch = text.match(/(\d+)\s*reception/i);
        const areaMatch = text.match(/([\d,]+)\s*sq\.?\s*ft/i);

        let tenure = '';
        if (/leasehold/i.test(text)) tenure = 'Leasehold';
        else if (/freehold/i.test(text)) tenure = 'Freehold';

        // Extract property type (e.g., "2 bed flat for sale" → "flat")
        let property_type = '';
        const ptMatch = text.match(/\d+\s*(?:beds?|bedrooms?)\s+([\w\s-]+?)\s+(?:for\s+sale|to\s+(?:rent|let)|for\s+rent)/i);
        if (ptMatch) property_type = ptMatch[1].trim();
        else if (/\bstudio\s*(?:flat|apartment)?\s+(?:for\s+sale|to\s+(?:rent|let)|for\s+rent)/i.test(text)) property_type = 'Studio';

        results.push({
            id, url: href.replace(window.location.origin, ''),
            price: priceMatch ? parseInt(priceMatch[1].replace(/,/g, '')) : null,
            price_text: priceText.trim(),
            beds: bedsMatch && parseInt(bedsMatch[1]) <= 20 ? parseInt(bedsMatch[1]) : null,
            baths: bathsMatch && parseInt(bathsMatch[1]) <= 20 ? parseInt(bathsMatch[1]) : null,
            receptions: recMatch && parseInt(recMatch[1]) <= 20 ? parseInt(recMatch[1]) : null,
            floor_area_sqft: areaMatch ? parseInt(areaMatch[1].replace(/,/g, '')) : null,
            address, tenure, property_type,
        });
    }

    // Strategy 2: Fall back to href-based link matching with parent-walking
    if (results.length === 0) {
        const links = Array.from(document.querySelectorAll(
            'a[href*="/for-sale/details/"], a[href*="/new-homes/details/"], a[href*="/to-rent/details/"]'
        ));

        for (const link of links) {
            const href = link.href;
            const match = href.match(/\/details\/(\d+)\//);
            if (!match) continue;

            const id = match[1];
            if (seen.has(id)) continue;
            seen.add(id);

            let card = link;
            for (let j = 0; j < 15; j++) {
                card = card.parentElement;
                if (!card) break;
                const t = card.innerText || '';
                if (t.includes('\u00a3') && (t.includes('bed') || t.includes('Bath') || t.includes('sq ft'))) {
                    break;
                }
            }
            if (!card) continue;

            const text = card.innerText || '';
            const lines = text.split('\n').map(l => l.trim()).filter(Boolean);

            const priceEl2 = card.querySelector('[data-testid="listing-price"]');
            const priceText2 = priceEl2 ? priceEl2.innerText : text;
            const priceMatch = priceText2.match(/\u00a3([\d,]+)/);
            const bedsMatch = text.match(/(\d+)\s*beds?/i);
            const bathsMatch = text.match(/(\d+)\s*baths?/i);
            const recMatch = text.match(/(\d+)\s*reception/i);
            const areaMatch = text.match(/([\d,]+)\s*sq\.?\s*ft/i);

            let address = '';
            for (const line of lines) {
                if (/[A-Z]{1,2}\d[A-Z\d]?\s*\d[A-Z]{2}/i.test(line) ||
                    (line.includes(',') && !line.includes('\u00a3') && !/^\d+ beds?/i.test(line))) {
                    address = line;
                    break;
                }
            }

            let tenure = '';
            if (/leasehold/i.test(text)) tenure = 'Leasehold';
            else if (/freehold/i.test(text)) tenure = 'Freehold';

            // Extract property type
            let property_type = '';
            const ptMatch2 = text.match(/\d+\s*(?:beds?|bedrooms?)\s+([\w\s-]+?)\s+(?:for\s+sale|to\s+(?:rent|let)|for\s+rent)/i);
            if (ptMatch2) property_type = ptMatch2[1].trim();
            else if (/\bstudio\s*(?:flat|apartment)?\s+(?:for\s+sale|to\s+(?:rent|let)|for\s+rent)/i.test(text)) property_type = 'Studio';

            results.push({
                id, url: href.replace(window.location.origin, ''),
                price: priceMatch ? parseInt(priceMatch[1].replace(/,/g, '')) : null,
                price_text: priceText2.trim(),
                beds: bedsMatch && parseInt(bedsMatch[1]) <= 20 ? parseInt(bedsMatch[1]) : null,
                baths: bathsMatch && parseInt(bathsMatch[1]) <= 20 ? parseInt(bathsMatch[1]) : null,
                receptions: recMatch && parseInt(recMatch[1]) <= 20 ? parseInt(recMatch[1]) : null,
                floor_area_sqft: areaMatch ? parseInt(areaMatch[1].replace(/,/g, '')) : null,
                address, tenure, property_type,
            });
        }
    }

    return results;
}"""
|
|
|
|
# JavaScript (evaluated in the page) to dismiss the Usercentrics cookie consent
# overlay, which lives in a shadow DOM under #usercentrics-cmp-ui.
# It first tries to click a shadow-root button whose text includes "Accept";
# failing that, it removes the overlay element outright. Returns true when the
# overlay was found and handled, false when no overlay element exists.
_DISMISS_COOKIES_JS = """() => {
    const aside = document.querySelector('#usercentrics-cmp-ui');
    if (aside && aside.shadowRoot) {
        const btns = aside.shadowRoot.querySelectorAll('button');
        for (const btn of btns) {
            if (btn.innerText.includes('Accept')) { btn.click(); return true; }
        }
    }
    if (aside) { aside.remove(); return true; }
    return false;
}"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Browser lifecycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def launch_browser():
    """Launch Camoufox, open the Zoopla homepage, pass Cloudflare Turnstile,
    and dismiss cookie consent.

    Returns:
        A ``(browser, page)`` tuple. The caller must close the browser when done.

    Raises:
        TurnstileError: if the page title still shows the Cloudflare
            interstitial after 20 polls spaced 3 seconds apart (~60 s).
        Exception: any error raised by Camoufox/Playwright while launching or
            navigating. The browser is closed before the error propagates.
    """
    from camoufox.pkgman import camoufox_path

    # Verify camoufox is pre-installed — never download at runtime
    camoufox_path(download_if_missing=False)

    from camoufox.sync_api import Camoufox

    log.info("Launching Camoufox browser for Zoopla...")
    # NOTE(review): the Camoufox context manager is entered manually and never
    # exited; confirm that browser.close() alone also stops the underlying
    # Playwright driver.
    browser = Camoufox(headless=True).__enter__()

    try:
        page = browser.new_page()

        log.info("Navigating to Zoopla homepage...")
        page.goto(f"{ZOOPLA_BASE}/", wait_until="domcontentloaded", timeout=60000)

        # Wait for Cloudflare Turnstile to resolve.
        # Try clicking the Turnstile checkbox if present (helps in some cases).
        for _ in range(20):
            if "Just a moment" not in page.title():
                break
            # Attempt to click the Turnstile checkbox in the challenge iframe
            for frame in page.frames:
                if "challenges.cloudflare.com" in frame.url:
                    try:
                        iframe_el = page.query_selector(
                            'iframe[src*="challenges.cloudflare"]'
                        )
                        if iframe_el:
                            box = iframe_el.bounding_box()
                            if box:
                                # The checkbox sits near the left edge of the
                                # widget, vertically centred.
                                page.mouse.click(
                                    box["x"] + 30, box["y"] + box["height"] / 2
                                )
                    except Exception as e:
                        # Best-effort nudge only; the title poll decides success.
                        log.debug("Turnstile checkbox click failed: %s", e)
                    break
            time.sleep(3)
        else:
            raise TurnstileError("Cloudflare Turnstile did not resolve after 60s")

        log.info("Cloudflare passed — title: %s", page.title())
        time.sleep(2)

        # Dismiss cookie consent
        page.evaluate(_DISMISS_COOKIES_JS)
        time.sleep(1)
    except BaseException:
        # Never leak a running browser when setup fails part-way through
        # (navigation timeout, Turnstile failure, interrupted run, ...).
        try:
            browser.close()
        except Exception as close_err:
            log.debug("Error while closing browser after failed launch: %s", close_err)
        raise

    return browser, page
|
|
|
|
|
|
def _ensure_not_challenged(page) -> None:
|
|
"""Check if current page is a Cloudflare challenge and wait/raise."""
|
|
if "Just a moment" not in page.title():
|
|
return
|
|
|
|
log.warning("Cloudflare challenge detected mid-session, waiting...")
|
|
for i in range(20):
|
|
time.sleep(3)
|
|
if "Just a moment" not in page.title():
|
|
log.info("Cloudflare challenge resolved")
|
|
return
|
|
|
|
raise TurnstileError("Cloudflare re-challenge did not resolve")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Search navigation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _navigate_direct(page, url: str) -> bool:
    """Load a Zoopla search URL directly, bypassing the homepage search flow.

    Intended for loading the second channel (e.g., RENT after BUY) of an
    outcode whose search URL is already known.

    Returns:
        False when the navigation itself raises, or when listing content never
        appears and the page holds no detail links at all — the caller can
        then retry through the full search flow. True otherwise.

    Raises:
        TurnstileError: propagated from the Cloudflare challenge check.
    """
    try:
        page.goto(url, wait_until="domcontentloaded", timeout=30000)
    except Exception as e:
        log.debug("Direct navigation failed: %s", e)
        return False
    _ensure_not_challenged(page)

    # Resolves once at least one listing card contains a pound sign and more
    # than 50 characters of text, i.e. the card content has hydrated.
    cards_hydrated_js = """() => {
                const cards = document.querySelectorAll(
                    '[data-testid="regular-listings"] > div'
                );
                if (cards.length === 0) return false;
                for (const card of cards) {
                    const t = card.innerText || '';
                    if (t.includes('\\u00a3') && t.length > 50) return true;
                }
                return false;
            }"""

    try:
        page.wait_for_function(cards_hydrated_js, timeout=8000)
    except Exception:
        # Hydration wait failed: give up only if there is no detail link at all.
        if not page.query_selector('a[href*="/details/"]'):
            return False
        time.sleep(1.5)

    return True
|
|
|
|
|
|
def _navigate_search(page, outcode: str, channel: str) -> bool:
    """Navigate to search results for an outcode via the homepage search flow.

    Steps: load the homepage, dismiss cookie consent, pick the Rent tab when
    ``channel == "RENT"``, type the outcode, choose the first autocomplete
    suggestion, submit, then wait for listing cards to render and hydrate.

    Returns:
        True once the search was submitted (even if the hydration wait timed
        out — extraction is still attempted by the caller). False if the
        homepage could not be loaded, the search input was not found, or the
        autocomplete offered no suggestion.

    Raises:
        TurnstileError: if Cloudflare blocks us.
    """
    # Navigate to homepage to reset search state. A failed navigation is
    # reported as False, matching the documented contract and _navigate_direct.
    try:
        page.goto(f"{ZOOPLA_BASE}/", wait_until="domcontentloaded", timeout=30000)
    except Exception as e:
        # Warning level so a dead browser session stays visible in the logs.
        log.warning("Homepage navigation failed for outcode %s: %s", outcode, e)
        return False
    time.sleep(0.5)
    _ensure_not_challenged(page)

    # Dismiss cookie consent (may reappear after navigation)
    page.evaluate(_DISMISS_COOKIES_JS)
    time.sleep(0.3)

    # Select Buy/Rent tab
    if channel == "RENT":
        rent_tab = page.query_selector(
            'button:has-text("Rent"), [role="tab"]:has-text("Rent")'
        )
        if rent_tab:
            rent_tab.click()
            time.sleep(0.2)

    # Find and fill search input
    search_input = page.query_selector(
        'input[name="autosuggest-input"]'
    ) or page.query_selector('input[type="text"]')
    if not search_input:
        log.warning("Could not find search input on homepage")
        return False

    search_input.click()
    time.sleep(0.1)
    search_input.fill("")
    # Type key-by-key (60 ms apart) so the autocomplete handlers fire.
    search_input.type(outcode, delay=60)
    time.sleep(1.2)

    # Select first autocomplete suggestion
    first_option = page.query_selector('[role="option"]')
    if not first_option:
        log.debug("No autocomplete suggestions for outcode %s", outcode)
        return False

    first_option.click()
    time.sleep(0.2)

    # Click search button
    search_btn = page.query_selector('button:has-text("Search")')
    if search_btn:
        search_btn.click()
    else:
        search_input.press("Enter")

    # Wait for results to load — try waiting for listings container, fall back to fixed wait
    try:
        page.wait_for_selector(
            '[data-testid="regular-listings"], a[href*="/details/"]',
            timeout=10000,
        )
    except Exception:
        time.sleep(4)
    _ensure_not_challenged(page)

    # Wait for client-side hydration to populate listing content (prices, addresses).
    # The structural container appears in server-rendered HTML before React hydrates
    # the actual card content — extracting too early yields empty price/address fields.
    try:
        page.wait_for_function(
            """() => {
                const cards = document.querySelectorAll(
                    '[data-testid="regular-listings"] > div'
                );
                if (cards.length === 0) return false;
                for (const card of cards) {
                    const t = card.innerText || '';
                    if (t.includes('\\u00a3') && t.length > 50) return true;
                }
                return false;
            }""",
            timeout=8000,
        )
    except Exception:
        # Content never appeared — extraction will likely fail but let it try
        log.debug("Listing content hydration wait timed out — prices may not have rendered")
        time.sleep(2)

    return True
|
|
|
|
|
|
def _get_result_count(page) -> int:
|
|
"""Extract the total results count from the page.
|
|
|
|
Tries __ZAD_TARGETING__ JSON first (most reliable), then body text regex
|
|
matching both "N results" and "N properties" patterns."""
|
|
try:
|
|
# Try the ZAD targeting JSON script tag first
|
|
count = page.evaluate("""() => {
|
|
const s = document.querySelector('#__ZAD_TARGETING__');
|
|
if (s) {
|
|
try {
|
|
const d = JSON.parse(s.textContent);
|
|
if (d.search_results_count != null) return d.search_results_count;
|
|
} catch(e) {}
|
|
}
|
|
return null;
|
|
}""")
|
|
if count is not None and count > 0:
|
|
return count
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
body = page.inner_text("body")
|
|
match = re.search(r"([\d,]+)\s+(?:results?|properties)", body)
|
|
if match:
|
|
return int(match.group(1).replace(",", ""))
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Extraction and pagination
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Set to True after the first extraction so the DOM diagnostic is logged once.
_first_extraction_logged = False


def _extract_listings(page) -> list[dict]:
    """Run the listing-extraction script against the current results page.

    On the first call in the process, also logs a one-off diagnostic (URL,
    title, number of detail links, data-testid names, body snippet) plus the
    number of listings found.

    Returns:
        The list produced by ``_EXTRACT_LISTINGS_JS``, or an empty list if
        evaluation fails (the failure is logged and counted in
        ``zoopla_errors_total`` with ``type="extract_failed"``).
    """
    global _first_extraction_logged
    try:
        found = page.evaluate(_EXTRACT_LISTINGS_JS)
        if _first_extraction_logged:
            return found

        # Log diagnostic info on the very first extraction attempt
        _first_extraction_logged = True
        diagnostic_js = """() => {
                        const details = document.querySelectorAll('a[href*="/details/"]');
                        const testids = document.querySelectorAll('[data-testid]');
                        const testidNames = [...new Set([...testids].map(e => e.dataset.testid))];
                        return {
                            url: location.href,
                            title: document.title,
                            detailLinks: details.length,
                            testids: testidNames.slice(0, 30),
                            bodySnippet: document.body?.innerText?.slice(0, 500) || '',
                        };
                    }"""
        try:
            info = page.evaluate(diagnostic_js)
            log.info(
                "Zoopla first-page diagnostic: url=%s title=%s detailLinks=%d "
                "testids=%s bodySnippet=%.200s",
                info.get("url"),
                info.get("title"),
                info.get("detailLinks", 0),
                info.get("testids", []),
                info.get("bodySnippet", ""),
            )
        except Exception:
            # The diagnostic is optional; never let it affect extraction.
            pass
        log.info("Zoopla first extraction: %d listings found", len(found))
        return found
    except Exception as e:
        log.warning("Failed to extract listings from DOM: %s", e)
        zoopla_errors_total.labels(type="extract_failed").inc()
        return []
|
|
|
|
|
|
def _paginate(page, total_results: int, channel: str) -> list[dict]:
    """Extract listings from all pages of search results.

    Page 1 must already be loaded. Subsequent pages are loaded by navigating
    to the current URL with its ``pn`` query parameter set to the page number.
    Pagination stops when enough listings were collected, when
    ``MAX_PAGES_PER_OUTCODE`` is reached, when a navigation fails, or when a
    page yields no listings or no previously unseen listing ids.

    Args:
        page: live browser page showing the first results page.
        total_results: number of listings the search is expected to contain.
        channel: "BUY" or anything else (treated as rent) — used only for the
            ``zoopla_pages_scraped`` metric label.

    Returns:
        Listings from all visited pages, de-duplicated by ``id``.

    Raises:
        TurnstileError: if a Cloudflare challenge does not resolve mid-way.
    """
    all_listings = _extract_listings(page)
    channel_label = "buy" if channel == "BUY" else "rent"
    zoopla_pages_scraped.labels(channel=channel_label).inc()

    if not all_listings or total_results <= len(all_listings):
        return all_listings

    seen_ids = {listing["id"] for listing in all_listings}

    # Build the page-number-free base URL once. Any fragment is dropped, and an
    # existing pn parameter is removed while keeping the query string valid
    # whether pn was the first, a middle, or the last parameter
    # ("?pn=1&q=x" -> "?q=x", "?q=x&pn=1" -> "?q=x").
    current_url = page.url.partition("#")[0]
    base_url = re.sub(
        r"([?&])pn=\d+(&|$)",
        lambda m: m.group(1) if m.group(2) else "",
        current_url,
    )
    separator = "&" if "?" in base_url else "?"
    page_num = 2

    while len(all_listings) < total_results and page_num <= MAX_PAGES_PER_OUTCODE:
        time.sleep(DELAY_BETWEEN_PAGES)

        next_url = f"{base_url}{separator}pn={page_num}"

        try:
            page.goto(next_url, wait_until="domcontentloaded", timeout=30000)
            _ensure_not_challenged(page)
            # Wait for listing content instead of fixed sleep
            try:
                page.wait_for_function(
                    """() => {
                        const cards = document.querySelectorAll(
                            '[data-testid="regular-listings"] > div'
                        );
                        if (cards.length === 0) return false;
                        for (const card of cards) {
                            const t = card.innerText || '';
                            if (t.includes('\\u00a3') && t.length > 50) return true;
                        }
                        return false;
                    }""",
                    timeout=8000,
                )
            except Exception:
                time.sleep(1.5)
        except TurnstileError:
            raise
        except Exception as e:
            log.debug("Pagination navigation failed at page %d: %s", page_num, e)
            break

        page_listings = _extract_listings(page)
        if not page_listings:
            break

        # Deduplicate within this outcode
        new_count = 0
        for listing in page_listings:
            if listing["id"] not in seen_ids:
                seen_ids.add(listing["id"])
                all_listings.append(listing)
                new_count += 1

        zoopla_pages_scraped.labels(channel=channel_label).inc()

        if new_count == 0:
            break  # No new listings on this page

        page_num += 1

    return all_listings
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property transformation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Cached outcode → (postcode, lat, lng) lookups to avoid repeated O(n) scans
|
|
# over 2.26M postcodes. Populated lazily on first lookup per outcode.
|
|
_outcode_coords_cache: dict[str, tuple[str, float, float] | None] = {}
|
|
|
|
|
|
def _resolve_outcode_coords(
|
|
outcode: str, pc_coords: dict[str, tuple[float, float]]
|
|
) -> tuple[str, float, float] | None:
|
|
"""Find first postcode + coords for an outcode. Result is cached."""
|
|
if outcode in _outcode_coords_cache:
|
|
return _outcode_coords_cache[outcode]
|
|
|
|
prefix = outcode + " "
|
|
for pcd, (lat, lng) in pc_coords.items():
|
|
if pcd.startswith(prefix) or (
|
|
len(outcode) >= 4
|
|
and pcd.startswith(outcode)
|
|
and len(pcd) > len(outcode)
|
|
):
|
|
_outcode_coords_cache[outcode] = (pcd, lat, lng)
|
|
return (pcd, lat, lng)
|
|
|
|
_outcode_coords_cache[outcode] = None
|
|
return None
|
|
|
|
|
|
def _extract_postcode(text: str) -> str | None:
|
|
"""Extract a full UK postcode from text like 'Dollar Bay Place, Canary Wharf E14 9SS'."""
|
|
match = re.search(r"([A-Z]{1,2}\d[A-Z0-9]?\s*\d[A-Z]{2})", text, re.IGNORECASE)
|
|
if match:
|
|
return match.group(1).upper().strip()
|
|
return None
|
|
|
|
|
|
def _extract_outcode(text: str) -> str | None:
|
|
"""Extract a UK outcode from address text like 'Whitechapel Road, London E1'."""
|
|
# Look for outcode at end of string or after last comma
|
|
match = re.search(r"\b([A-Z]{1,2}\d[A-Z0-9]?)\s*$", text.strip(), re.IGNORECASE)
|
|
if match:
|
|
return match.group(1).upper()
|
|
# Try after comma
|
|
parts = text.split(",")
|
|
if len(parts) > 1:
|
|
last = parts[-1].strip()
|
|
match = re.match(r"^([A-Z]{1,2}\d[A-Z0-9]?)$", last, re.IGNORECASE)
|
|
if match:
|
|
return match.group(1).upper()
|
|
return None
|
|
|
|
|
|
def _map_property_type(raw_type: str | None) -> str:
|
|
"""Map Zoopla property type text to canonical type."""
|
|
if not raw_type:
|
|
return "Other"
|
|
# Exact match (handles Rightmove-style capitalised values)
|
|
canonical = PROPERTY_TYPE_MAP.get(raw_type)
|
|
if canonical:
|
|
return canonical
|
|
# Title-case match (handles regex-extracted lowercase like "town house" → "Town House")
|
|
canonical = PROPERTY_TYPE_MAP.get(raw_type.title())
|
|
if canonical:
|
|
return canonical
|
|
# Keyword fallback
|
|
lower = raw_type.lower()
|
|
if "flat" in lower or "apartment" in lower or "maisonette" in lower or "studio" in lower or "penthouse" in lower:
|
|
return "Flats/Maisonettes"
|
|
if "detached" in lower and "semi" not in lower:
|
|
return "Detached"
|
|
if "semi" in lower:
|
|
return "Semi-Detached"
|
|
if "terrace" in lower or "mews" in lower:
|
|
return "Terraced"
|
|
if "house" in lower:
|
|
return "Detached"
|
|
return "Other"
|
|
|
|
|
|
def _detect_rent_frequency(price_text: str) -> str:
|
|
"""Detect rent frequency from Zoopla price text.
|
|
|
|
Zoopla price elements contain text like '£1,500 pcm', '£350 pw',
|
|
'£18,000 pa'. Defaults to 'monthly' if no frequency indicator found.
|
|
"""
|
|
lower = price_text.lower()
|
|
if "pw" in lower or "per week" in lower or "/w" in lower:
|
|
return "weekly"
|
|
if "pa" in lower or "per annum" in lower or "/y" in lower or "per year" in lower:
|
|
return "yearly"
|
|
# pcm, per month, /m, or no indicator — default monthly
|
|
return "monthly"
|
|
|
|
|
|
def transform_property(
    raw: dict,
    channel: str,
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]],
    search_outcode: str | None = None,
) -> dict | None:
    """Convert one raw Zoopla listing dict into the standard output schema.

    Search cards carry no coordinates, so lat/lng are resolved in order from:
    the full postcode in the address, the outcode ending the address, and
    finally ``search_outcode``. For the two outcode fallbacks the listing's
    postcode is replaced by the first postcode found for that outcode.

    Args:
        raw: listing dict as produced by the DOM extraction script.
        channel: "BUY" leaves ``price_frequency`` empty; any other value
            derives it from ``raw["price_text"]``.
        pc_index: accepted for interface compatibility; not used here.
        pc_coords: postcode → (lat, lng) lookup.
        search_outcode: outcode being searched, used as the last fallback.

    Returns:
        The transformed dict, or None when the price is missing or
        non-positive, no postcode/coordinates could be resolved, or the
        coordinates fall outside the 49–56°N / 7°W–2°E bounding box.
    """
    amount = raw.get("price")
    if not amount or int(amount) <= 0:
        return None

    address = raw.get("address", "")

    # 1) Exact postcode taken from the address text.
    postcode = _extract_postcode(address)
    lat = lng = None
    exact = pc_coords.get(postcode) if postcode else None
    if exact:
        lat, lng = exact

    # 2) Outcode-level fallback from the address text.
    if lat is None:
        addr_outcode = _extract_outcode(address)
        located = (
            _resolve_outcode_coords(addr_outcode, pc_coords) if addr_outcode else None
        )
        if located:
            postcode, lat, lng = located

    # 3) Final fallback: the outcode we know we are searching.
    if lat is None and search_outcode:
        located = _resolve_outcode_coords(search_outcode, pc_coords)
        if located:
            postcode, lat, lng = located

    if lat is None or lng is None or not postcode:
        return None

    # Validate coordinates are in England
    if not (49 <= lat <= 56) or not (-7 <= lng <= 2):
        return None

    beds_seen = raw.get("beds") or 0
    baths_seen = raw.get("baths") or 0
    beds_plausible = beds_seen <= MAX_BEDROOMS
    baths_plausible = baths_seen <= MAX_BEDROOMS
    if not (beds_plausible and baths_plausible):
        log.warning(
            "Zoopla %s: implausible beds=%d baths=%d (capped to 0)",
            raw.get("id", "?"), beds_seen, baths_seen,
        )
    bedrooms = beds_seen if beds_plausible else 0
    bathrooms = baths_seen if baths_plausible else 0
    receptions = raw.get("receptions") or 0

    # Floor area: convert sq ft to sq m
    sqft = raw.get("floor_area_sqft")
    floor_area_sqm = validate_floor_area(round(sqft * 0.092903, 1)) if sqft else None

    listing_id = raw.get("id", "")
    listing_url = raw.get("url", "")
    if listing_url and not listing_url.startswith("http"):
        # Relative URL from the DOM: make it absolute.
        listing_url = ZOOPLA_BASE + listing_url

    # Detect rent frequency from price text (e.g. "£1,500 pcm" vs "£350 pw")
    frequency = (
        "" if channel == "BUY" else _detect_rent_frequency(raw.get("price_text", ""))
    )

    record = {
        "id": f"zp_{listing_id}",
        "Bedrooms": bedrooms,
        "Bathrooms": bathrooms,
        "Number of bedrooms & living rooms": bedrooms + receptions,
        "lon": lng,
        "lat": lat,
        "Postcode": postcode,
        "Address per Property Register": address,
        "Leasehold/Freehold": raw.get("tenure") or None,
        "Property type": _map_property_type(raw.get("property_type")),
        "Property sub-type": raw.get("property_type") or "",
        "price": int(amount),
        "price_frequency": frequency,
        "Price qualifier": "",
        "Total floor area (sqm)": floor_area_sqm,
        "Listing URL": listing_url,
        "Listing features": [],
        "first_visible_date": "",
    }
    return record
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Top-level search function (called by scraper.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def search_outcode(
    page,
    outcode: str,
    channel: str,
    pc_index: PostcodeSpatialIndex,
    pc_coords: dict[str, tuple[float, float]],
    base_search_url: str | None = None,
) -> tuple[list[dict], str | None]:
    """Search Zoopla for properties in one outcode.

    Takes a live Camoufox Page (from launch_browser). Navigates through the
    search flow, extracts listings from rendered DOM, and transforms to the
    standard output schema.

    If base_search_url is provided (from a previous channel search for the same
    outcode), tries direct URL navigation first — skipping the slow homepage
    search flow. Falls back to full navigation if direct fails.

    Returns:
        ``(properties, search_url)``. ``search_url`` is the URL of the FIRST
        results page (captured before pagination), suitable for passing to the
        next channel call for this outcode. It is None when navigation failed
        or no listings were extracted.

    Raises:
        TurnstileError: if Cloudflare blocks us mid-session.
    """
    navigated = False
    if base_search_url:
        navigated = _navigate_direct(page, base_search_url)
        if navigated:
            log.debug("Zoopla %s %s: used direct URL navigation", outcode, channel)

    if not navigated:
        if not _navigate_search(page, outcode, channel):
            return [], None

    # Remember the first-page URL now: pagination navigates the page onwards,
    # after which page.url would carry the last page's ?pn=N parameter.
    search_url = page.url

    total_results = _get_result_count(page)

    # Always try extraction even if result count is 0 — the count regex may
    # not match Zoopla's current text format, but listings may still be in DOM
    raw_listings = _paginate(page, max(total_results, 25), channel)
    if not raw_listings:
        if total_results > 0:
            log.debug(
                "Zoopla %s %s: page claims %d results but extraction found 0 — "
                "DOM selectors may need updating",
                outcode, channel, total_results,
            )
        return [], None

    channel_label = "buy" if channel == "BUY" else "rent"
    properties = []
    dropped = 0
    for raw in raw_listings:
        transformed = transform_property(raw, channel, pc_index, pc_coords, search_outcode=outcode)
        if transformed:
            properties.append(transformed)
            zoopla_properties_scraped.labels(channel=channel_label).inc()
        else:
            dropped += 1

    if dropped and not properties:
        # Log a sample raw listing to diagnose which fields are missing
        sample = raw_listings[0] if raw_listings else {}
        log.debug(
            "Zoopla %s %s: extracted %d raw listings but all %d dropped in transform "
            "(no price/postcode/coords). Sample raw: price=%s address=%r",
            outcode, channel, len(raw_listings), dropped,
            sample.get("price"), sample.get("address", ""),
        )
    elif dropped > len(raw_listings) // 2:
        log.debug(
            "Zoopla %s %s: %d/%d listings dropped in transform",
            outcode, channel, dropped, len(raw_listings),
        )

    return properties, search_url
|