import logging
import math
import re

from constants import MAX_BEDROOMS, PROPERTY_TYPE_MAP, RIGHTMOVE_BASE
from spatial import PostcodeSpatialIndex

log = logging.getLogger("rightmove")

# Floor area bounds (sqm). Values outside this range are almost certainly
# data errors: sub-5 sqm catches garbled extractions (e.g., 0.1 sqm for a
# detached house), and >2000 sqm (~21,500 sq ft) exceeds even the largest
# UK mansions.
MIN_FLOOR_AREA_SQM = 5.0
MAX_FLOOR_AREA_SQM = 2000.0

# Conversion factor applied to square-foot figures.
_SQM_PER_SQFT = 0.092903

# The numeric group must start with a digit so that stray punctuation
# (e.g. the comma in "Ask agent, sq ft") is never captured as a "number".
_SQFT_RE = re.compile(
    r"(\d[\d,]*(?:\.\d+)?)\s*(?:sq\.?\s*ft|square\s+feet|ft(?:\^?2|²))",
    re.IGNORECASE,
)
_SQM_RE = re.compile(
    r"(\d[\d,]*(?:\.\d+)?)\s*(?:sq\.?\s*m|square\s+met(?:er|re)s?|m(?:\^?2|²))",
    re.IGNORECASE,
)


def validate_floor_area(sqm: float | None) -> float | None:
    """Validate a floor area value. Returns None for nonsensical values.

    Rejects values below MIN_FLOOR_AREA_SQM and above MAX_FLOOR_AREA_SQM,
    which catches parsing errors where prices or other large numbers are
    mistakenly extracted as floor area from free-text descriptions or DOM
    text. NaN is rejected as well.
    """
    if sqm is None:
        return None
    # A chained comparison is False for NaN, so NaN falls through to None
    # instead of slipping past two separate "<" / ">" checks.
    if not (MIN_FLOOR_AREA_SQM <= sqm <= MAX_FLOOR_AREA_SQM):
        return None
    return sqm


def parse_int_value(value) -> int | None:
    """Parse an integer-like API value without truncating decimals.

    Accepts ints, integral finite floats, and digit-only strings (after
    stripping whitespace, commas and a pound sign). Returns None for
    everything else, including bools, None, and non-integral floats.
    """
    # bool is a subclass of int, so it has to be excluded explicitly.
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        if not math.isfinite(value) or not value.is_integer():
            return None
        return int(value)
    if isinstance(value, str):
        cleaned = value.strip().replace(",", "").replace("£", "")
        if not re.fullmatch(r"\d+", cleaned):
            return None
        return int(cleaned)
    return None


def parse_display_size(display_size: str | None) -> float | None:
    """Parse displaySize like '499 sq. ft.' or '4,124 sq. ft.' to sqm.

    A square-foot figure is preferred when present; otherwise a
    square-metre figure is used. The result is rounded to one decimal
    place and passed through validate_floor_area, so out-of-range values
    and strings without a recognisable size yield None.
    """
    if not display_size:
        return None

    # Try sq. ft. first
    m = _SQFT_RE.search(display_size)
    if m:
        sqft = float(m.group(1).replace(",", ""))
        return validate_floor_area(round(sqft * _SQM_PER_SQFT, 1))

    # Try sq. m.
    m = _SQM_RE.search(display_size)
    if m:
        return validate_floor_area(round(float(m.group(1).replace(",", "")), 1))

    return None


def normalize_sub_type(sub_type: str | None) -> str:
    """Normalize property sub-type for consistent storage.

    Replaces underscores with spaces, collapses runs of whitespace and
    title-cases the result. Hyphens are left untouched (so
    "semi-detached" becomes "Semi-Detached"). Empty input, and values
    longer than 40 characters (description fragments accidentally
    captured as sub-types), are returned as "Unknown".
    """
    if not sub_type:
        return "Unknown"
    cleaned = sub_type.replace("_", " ").strip()
    # Description fragments captured as sub-types are much longer than any
    # real property type name (longest canonical is ~25 chars)
    if len(cleaned) > 40:
        return "Unknown"
    # Collapse multiple spaces
    cleaned = re.sub(r"\s+", " ", cleaned)
    return cleaned.title()


def map_property_type(sub_type: str | None) -> str:
    """Map propertySubType to canonical type.

    Looks the value up in PROPERTY_TYPE_MAP (as given, title-cased,
    lower-cased, then with underscores/hyphens turned into spaces) and
    falls back to keyword matching. Anything unrecognised is logged and
    mapped to "Other".
    """
    if not sub_type:
        return "Other"

    canonical = PROPERTY_TYPE_MAP.get(sub_type)
    if canonical:
        return canonical

    # Try title-case variant (e.g., "country house" → "Country House")
    canonical = PROPERTY_TYPE_MAP.get(sub_type.title())
    if canonical:
        return canonical

    # Try lowercase variant (e.g., "Townhouse" → "townhouse")
    canonical = PROPERTY_TYPE_MAP.get(sub_type.lower())
    if canonical:
        return canonical

    # Normalize delimiters (underscores/hyphens → spaces) and try again
    normalized = re.sub(r"[-_]+", " ", sub_type).strip().title()
    canonical = PROPERTY_TYPE_MAP.get(normalized)
    if canonical:
        return canonical

    # Keyword fallback for compound types not in the map
    lower = sub_type.lower()

    # These contain flat-like keywords but must not be classed as flats,
    # so they are checked before the flat/apartment test below.
    excluded_flat_like = (
        "block of apartment",
        "house of multiple occupation",
        "private halls",
        "retirement",
        "serviced apartment",
    )
    if any(term in lower for term in excluded_flat_like):
        return "Other"

    if (
        "flat" in lower
        or "apartment" in lower
        or "maisonette" in lower
        or "studio" in lower
    ):
        return "Flats/Maisonettes"
    # "semi" must be tested before the plain "detach" check.
    if "semi" in lower and "detach" in lower:
        return "Semi-Detached"
    if "detach" in lower:
        return "Detached"
    if "terrace" in lower or "mews" in lower:
        return "Terraced"
    if "house" in lower or "cottage" in lower:
        return "Detached"

    log.warning("Unknown propertySubType: %r — mapping to Other", sub_type)
    return "Other"


def extract_tenure(tenure_obj: dict | None) -> str | None:
    """Extract tenure string from tenure object.

    Returns "Freehold" or "Leasehold" for the exact tenureType values
    "FREEHOLD" / "LEASEHOLD", and None for anything else.
    """
    if not tenure_obj:
        return None
    tt = tenure_obj.get("tenureType", "")
    if tt == "FREEHOLD":
        return "Freehold"
    if tt == "LEASEHOLD":
        return "Leasehold"
    return None


def fix_coords(lat: float, lng: float) -> tuple[float, float]:
    """Swap lat/lng if they look reversed. England: lat ~49–56, lng ~-7–2.

    Coordinates that fit neither orientation are returned unchanged after
    logging a warning.
    """
    if 49 <= lat <= 56 and -7 <= lng <= 2:
        return lat, lng
    if 49 <= lng <= 56 and -7 <= lat <= 2:
        log.debug(
            "Swapping reversed coords: lat=%.4f lng=%.4f → lat=%.4f lng=%.4f",
            lat,
            lng,
            lng,
            lat,
        )
        return lng, lat
    log.warning(
        "Coords outside England bounds even after swap attempt: lat=%.4f lng=%.4f",
        lat,
        lng,
    )
    return lat, lng


def normalize_postcode(postcode: str) -> str:
    """Ensure UK postcode has exactly one space before the 3-char incode.

    E.g., 'SW1A1AA' → 'SW1A 1AA', 'N4 2HA' → 'N4 2HA', 'E1 4AB' unchanged.
    Inputs shorter than 5 characters (ignoring whitespace) are returned
    upper-cased with whitespace removed and no space inserted.
    """
    # Strip all whitespace then re-insert the single canonical space
    compact = re.sub(r"\s+", "", postcode).upper()
    if len(compact) < 5:
        return compact
    return compact[:-3] + " " + compact[-3:]


def transform_property(
    prop: dict, outcode: str, pc_index: PostcodeSpatialIndex
) -> dict | None:
    """Transform a raw Rightmove property dict into our output schema.

    Args:
        prop: Raw property dict from Rightmove.
        outcode: Not used by this function; kept for the existing call
            signature.
        pc_index: Spatial index queried via ``nearest(lat, lng)`` to
            resolve the property's postcode.

    Returns:
        The output record, or None when the property has no location, no
        latitude/longitude, no postcode from ``pc_index``, or neither an
        id nor a property URL.
    """
    loc = prop.get("location")
    if not loc:
        return None
    raw_lat = loc.get("latitude")
    raw_lng = loc.get("longitude")
    if raw_lat is None or raw_lng is None:
        return None
    lat, lng = fix_coords(raw_lat, raw_lng)

    # "or {}" / "or []" rather than a .get() default: the default only
    # applies when the key is absent, whereas an explicit null in the
    # payload would otherwise crash on .get() / iteration below.
    price_obj = prop.get("price") or {}
    amount = parse_int_value(price_obj.get("amount"))
    price = amount or 0
    display_prices = price_obj.get("displayPrices") or []
    price_qualifier = (
        display_prices[0].get("displayPriceQualifier", "") if display_prices else ""
    )

    sub_type = prop.get("propertySubType", "")

    raw_beds = parse_int_value(prop.get("bedrooms")) or 0
    raw_baths = parse_int_value(prop.get("bathrooms")) or 0
    # Out-of-range counts (negative or above MAX_BEDROOMS) are reset to 0.
    bedrooms = raw_beds if 0 <= raw_beds <= MAX_BEDROOMS else 0
    bathrooms = raw_baths if 0 <= raw_baths <= MAX_BEDROOMS else 0
    if raw_beds > MAX_BEDROOMS or raw_baths > MAX_BEDROOMS:
        log.warning(
            "Rightmove %s: implausible beds=%d baths=%d (capped to 0)",
            prop.get("id", "?"),
            raw_beds,
            raw_baths,
        )

    # Keep only features that carry a non-empty description.
    key_features = [
        kf.get("description", "")
        for kf in (prop.get("keyFeatures") or [])
        if kf.get("description")
    ]

    postcode = pc_index.nearest(lat, lng)
    if not postcode:
        log.debug("No England postcode for property at %.4f, %.4f — skipping", lat, lng)
        return None

    property_url = prop.get("propertyUrl") or ""
    if not isinstance(property_url, str):
        property_url = ""

    # Fall back to the URL as an identifier when the listing has no id.
    listing_id = prop.get("id") or property_url
    if not listing_id:
        return None

    return {
        "id": listing_id,
        "Bedrooms": bedrooms,
        "Bathrooms": bathrooms,
        # NOTE(review): the key says "living rooms" but the value is
        # bedrooms + bathrooms — confirm this is intended by consumers.
        "Number of bedrooms & living rooms": bedrooms + bathrooms,
        "lon": lng,
        "lat": lat,
        "Postcode": postcode,
        "Address per Property Register": prop.get("displayAddress", ""),
        "Leasehold/Freehold": extract_tenure(prop.get("tenure")),
        "Property type": map_property_type(sub_type),
        "Property sub-type": normalize_sub_type(sub_type),
        "price": price,
        "price_frequency": "",
        "Price qualifier": price_qualifier,
        "Total floor area (sqm)": parse_display_size(prop.get("displaySize")),
        "Listing URL": RIGHTMOVE_BASE + property_url if property_url else "",
        "Listing features": key_features,
        "first_visible_date": prop.get("firstVisibleDate", ""),
    }