fmt
This commit is contained in:
parent
2f149503bb
commit
6ea544a0f6
10 changed files with 144 additions and 60 deletions
|
|
@ -33,6 +33,10 @@ class CookiesExpiredError(Exception):
|
|||
"""Raised when home.co.uk returns 403, indicating cookies need refresh."""
|
||||
|
||||
|
||||
class PaginationError(Exception):
|
||||
"""Raised when home.co.uk pagination cannot be completed."""
|
||||
|
||||
|
||||
# Channel mapping: internal name → URL path segment
|
||||
HOMECOUK_URL_SEGMENT = "for-sale"
|
||||
|
||||
|
|
@ -171,6 +175,25 @@ def fetch_page(
|
|||
return None
|
||||
|
||||
|
||||
def _coerce_positive_int(value) -> int | None:
|
||||
parsed = parse_int_value(value)
|
||||
if parsed is None or parsed <= 0:
|
||||
return None
|
||||
return parsed
|
||||
|
||||
|
||||
def _property_identity(prop: dict, page: int, index: int) -> str:
|
||||
for key in ("listing_id", "property_id", "id"):
|
||||
value = prop.get(key)
|
||||
if value:
|
||||
return f"{key}:{value}"
|
||||
return (
|
||||
f"page:{page}:index:{index}:"
|
||||
f"{prop.get('display_address') or prop.get('address') or ''}:"
|
||||
f"{prop.get('price') or prop.get('latest_price') or ''}"
|
||||
)
|
||||
|
||||
|
||||
def parse_floor_area(description: str | None) -> float | None:
|
||||
"""Try to extract floor area from description text like '789 sq.ft.' or '73 sq.m.'."""
|
||||
if not description:
|
||||
|
|
@ -363,6 +386,9 @@ def search_outcode(
|
|||
url = f"{HOMECOUK_API_BASE}/{url_segment}/{outcode.lower()}/"
|
||||
properties = []
|
||||
page = 1
|
||||
last_page: int | None = None
|
||||
total_results: int | None = None
|
||||
seen_ids: set[str] = set()
|
||||
|
||||
while True:
|
||||
params = {
|
||||
|
|
@ -379,12 +405,32 @@ def search_outcode(
|
|||
|
||||
data = fetch_page(client, url, params)
|
||||
if not data:
|
||||
break
|
||||
raise PaginationError(f"home.co.uk {outcode} page {page} failed to load")
|
||||
|
||||
pagination = data.get("pagination", {}) or {}
|
||||
if last_page is None:
|
||||
last_page = _coerce_positive_int(pagination.get("last_page"))
|
||||
if total_results is None:
|
||||
total_results = _coerce_positive_int(pagination.get("total"))
|
||||
|
||||
raw_props = data.get("properties", [])
|
||||
if not raw_props:
|
||||
if total_results and page <= (last_page or page):
|
||||
raise PaginationError(
|
||||
f"home.co.uk {outcode} page {page} returned no properties "
|
||||
f"before the advertised end"
|
||||
)
|
||||
break
|
||||
|
||||
page_ids = {
|
||||
_property_identity(prop, page, idx) for idx, prop in enumerate(raw_props)
|
||||
}
|
||||
if page_ids and page_ids.issubset(seen_ids):
|
||||
raise PaginationError(
|
||||
f"home.co.uk {outcode} page {page} repeated previously seen results"
|
||||
)
|
||||
seen_ids.update(page_ids)
|
||||
|
||||
for prop in raw_props:
|
||||
try:
|
||||
transformed = transform_property(prop, pc_index)
|
||||
|
|
@ -401,10 +447,12 @@ def search_outcode(
|
|||
if max_properties is not None and len(properties) >= max_properties:
|
||||
return properties
|
||||
|
||||
# Check pagination
|
||||
pagination = data.get("pagination", {})
|
||||
last_page = pagination.get("last_page", 1)
|
||||
if page >= last_page:
|
||||
if last_page is not None:
|
||||
if page >= last_page:
|
||||
break
|
||||
elif total_results is not None and len(seen_ids) >= total_results:
|
||||
break
|
||||
elif len(raw_props) < HOMECOUK_PER_PAGE:
|
||||
break
|
||||
|
||||
page += 1
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ Architecture:
|
|||
import logging
|
||||
import re
|
||||
import time
|
||||
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
|
||||
|
||||
from constants import DELAY_BETWEEN_PAGES, MAX_BEDROOMS, PROPERTY_TYPE_MAP, ZOOPLA_BASE
|
||||
from spatial import PostcodeSpatialIndex
|
||||
|
|
@ -52,9 +53,6 @@ class _ManagedCamoufoxBrowser:
|
|||
return getattr(self._browser, name)
|
||||
|
||||
|
||||
# Maximum search result pages to scrape per outcode (25 listings/page)
|
||||
MAX_PAGES_PER_OUTCODE = 40
|
||||
|
||||
# JavaScript to extract listings from the rendered DOM.
|
||||
# Uses data-testid attributes as primary selectors (stable across deployments),
|
||||
# then falls back to href-based link matching with parent-walking.
|
||||
|
|
@ -423,6 +421,45 @@ def _get_result_count(page) -> int:
|
|||
return 0
|
||||
|
||||
|
||||
def _url_with_page(url: str, page_num: int) -> str:
|
||||
parsed = urlparse(url)
|
||||
query = [(key, value) for key, value in parse_qsl(parsed.query) if key != "pn"]
|
||||
query.append(("pn", str(page_num)))
|
||||
return urlunparse(parsed._replace(query=urlencode(query)))
|
||||
|
||||
|
||||
def _find_next_page_url(page) -> str | None:
|
||||
"""Return the rendered pagination next URL, if Zoopla exposes one."""
|
||||
try:
|
||||
href = page.evaluate(
|
||||
"""() => {
|
||||
const links = Array.from(document.querySelectorAll('a[href]'));
|
||||
const next = links.find((link) => {
|
||||
const text = (link.innerText || link.textContent || '')
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
const label = (link.getAttribute('aria-label') || '').toLowerCase();
|
||||
const rel = (link.getAttribute('rel') || '').toLowerCase();
|
||||
return rel.includes('next')
|
||||
|| label.includes('next')
|
||||
|| text === 'next'
|
||||
|| text === 'next page';
|
||||
});
|
||||
if (!next) return null;
|
||||
const href = next.href || '';
|
||||
if (!href.includes('/for-sale/') && !href.includes('/new-homes/')) {
|
||||
return null;
|
||||
}
|
||||
return href;
|
||||
}"""
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
if not href:
|
||||
return None
|
||||
return urljoin(ZOOPLA_BASE, href)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Extraction and pagination
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -476,29 +513,27 @@ def _paginate(
|
|||
) -> list[dict]:
|
||||
"""Extract listings from all pages of search results.
|
||||
|
||||
Page 1 is already loaded. For subsequent pages, clicks the Next button
|
||||
or navigates via URL parameter ?pn=N."""
|
||||
Page 1 is already loaded. For subsequent pages, follow Zoopla's rendered
|
||||
next link when present, otherwise advance via the pn=N URL parameter while
|
||||
the advertised result count says more listings remain."""
|
||||
all_listings = _extract_listings(page)
|
||||
if max_properties is not None and len(all_listings) >= max_properties:
|
||||
return all_listings[:max_properties]
|
||||
|
||||
if not all_listings or total_results <= len(all_listings):
|
||||
if not all_listings:
|
||||
return all_listings
|
||||
|
||||
seen_ids = {listing["id"] for listing in all_listings}
|
||||
current_url = page.url
|
||||
page_num = 2
|
||||
|
||||
while len(all_listings) < total_results and page_num <= MAX_PAGES_PER_OUTCODE:
|
||||
time.sleep(DELAY_BETWEEN_PAGES)
|
||||
while True:
|
||||
next_url = _find_next_page_url(page)
|
||||
if not next_url:
|
||||
if total_results > 0 and len(all_listings) >= total_results:
|
||||
break
|
||||
next_url = _url_with_page(page.url, page_num)
|
||||
|
||||
# Try navigating via URL parameter
|
||||
if "?" in current_url:
|
||||
next_url = re.sub(r"[?&]pn=\d+", "", current_url)
|
||||
separator = "&" if "?" in next_url else "?"
|
||||
next_url = f"{next_url}{separator}pn={page_num}"
|
||||
else:
|
||||
next_url = f"{current_url}?pn={page_num}"
|
||||
time.sleep(DELAY_BETWEEN_PAGES)
|
||||
|
||||
try:
|
||||
page.goto(next_url, wait_until="domcontentloaded", timeout=30000)
|
||||
|
|
@ -512,6 +547,12 @@ def _paginate(
|
|||
|
||||
page_listings = _extract_listings(page)
|
||||
if not page_listings:
|
||||
if total_results > len(all_listings):
|
||||
raise RuntimeError(
|
||||
"Zoopla pagination stopped with no listings on page "
|
||||
f"{page_num}; collected {len(all_listings)} of "
|
||||
f"{total_results} advertised results"
|
||||
)
|
||||
break
|
||||
|
||||
# Deduplicate within this outcode
|
||||
|
|
@ -525,10 +566,20 @@ def _paginate(
|
|||
return all_listings[:max_properties]
|
||||
|
||||
if new_count == 0:
|
||||
break # No new listings on this page
|
||||
if total_results > len(all_listings):
|
||||
raise RuntimeError(
|
||||
"Zoopla pagination repeated results on page "
|
||||
f"{page_num}; collected {len(all_listings)} of "
|
||||
f"{total_results} advertised results"
|
||||
)
|
||||
break
|
||||
|
||||
page_num += 1
|
||||
|
||||
if total_results > 0 and len(all_listings) >= total_results:
|
||||
if not _find_next_page_url(page):
|
||||
break
|
||||
|
||||
return all_listings
|
||||
|
||||
|
||||
|
|
@ -768,7 +819,7 @@ def search_outcode(
|
|||
# not match Zoopla's current text format, but listings may still be in DOM
|
||||
raw_listings = _paginate(
|
||||
page,
|
||||
max(total_results, 25),
|
||||
total_results,
|
||||
max_properties=max_properties,
|
||||
)
|
||||
if not raw_listings:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue