import logging
from datetime import datetime, timezone
from pathlib import Path

import polars as pl

from transform import normalize_price

log = logging.getLogger("rightmove")

# Columns copied verbatim from each property dict (dict key == column name),
# mapped to their parquet dtype. Kept in one place so the data and the schema
# cannot drift apart; insertion order is the output column order.
_PASSTHROUGH_SCHEMA = {
    "Bedrooms": pl.Int32,
    "Bathrooms": pl.Int32,
    "Number of bedrooms & living rooms": pl.Int32,
    "lon": pl.Float64,
    "lat": pl.Float64,
    "Postcode": pl.Utf8,
    "Address per Property Register": pl.Utf8,
    "Leasehold/Freehold": pl.Utf8,
    "Property type": pl.Utf8,
    "Property sub-type": pl.Utf8,
    "Price qualifier": pl.Utf8,
    "Total floor area (sqm)": pl.Float64,
    "Listing URL": pl.Utf8,
    "Listing features": pl.List(pl.Utf8),
}

# Columns computed inside write_parquet, appended after the pass-through ones.
_DERIVED_SCHEMA = {
    "Listing date": pl.Datetime("us"),
    "Listing status": pl.Utf8,
    "Asking price": pl.Int64,
    "Asking rent (monthly)": pl.Int64,
}


def _parse_listing_date(raw) -> "datetime | None":
    """Parse an ISO-8601 ``first_visible_date`` into a naive UTC datetime.

    Returns None for empty, malformed, or non-string values so that one bad
    record never aborts the whole write.
    """
    if not raw:
        return None
    try:
        # A trailing "Z" is rewritten because datetime.fromisoformat only
        # accepts it natively on newer Python versions.
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except (ValueError, TypeError, AttributeError):
        # AttributeError covers truthy non-string values (no .replace method).
        return None
    if parsed.tzinfo is not None:
        # Normalise to UTC *before* dropping tzinfo; otherwise timestamps
        # carrying different offsets would be stored as incomparable
        # wall-clock values in the naive "Listing date" column.
        parsed = parsed.astimezone(timezone.utc)
    return parsed.replace(tzinfo=None)


def write_parquet(properties: list[dict], path: Path, channel: str) -> None:
    """Write properties list to parquet with server-ready column names.

    Args:
        properties: One dict per listing. Each must contain every key of the
            pass-through columns plus ``"price"`` (and ``"price_frequency"``
            when the channel is not ``"buy"``); ``"first_visible_date"`` is
            optional.
        path: Destination parquet file.
        channel: ``"buy"`` or ``"rent"``. Only ``"buy"`` is checked
            explicitly; any other value is handled as a rental listing.

    Nothing is written (only a warning is logged) when ``properties`` is
    empty. A missing required key raises ``KeyError``.
    """
    if not properties:
        log.warning("No properties to write to %s", path)
        return

    count = len(properties)

    listing_dates = [
        _parse_listing_date(p.get("first_visible_date", "")) for p in properties
    ]

    # Exactly one of asking price / asking rent is populated per channel;
    # the other column is all nulls.
    if channel == "buy":
        asking_prices = [p["price"] for p in properties]
        asking_rents = [None] * count
        listing_status = "For sale"
    else:
        asking_prices = [None] * count
        # NOTE(review): normalize_price presumably converts the price to a
        # monthly figure based on its frequency - confirm in transform.
        asking_rents = [
            normalize_price(p["price"], p["price_frequency"]) for p in properties
        ]
        listing_status = "For rent"

    data = {
        name: [p[name] for p in properties] for name in _PASSTHROUGH_SCHEMA
    }
    data["Listing date"] = listing_dates
    data["Listing status"] = [listing_status] * count
    data["Asking price"] = asking_prices
    data["Asking rent (monthly)"] = asking_rents

    df = pl.DataFrame(data, schema={**_PASSTHROUGH_SCHEMA, **_DERIVED_SCHEMA})

    # Derive asking price per sqm for buy listings only; other channels do
    # not get this column at all.
    if channel == "buy":
        df = df.with_columns(
            (pl.col("Asking price") / pl.col("Total floor area (sqm)"))
            .round(0)
            # Non-strict cast: values that cannot be represented as Int32
            # become null instead of raising.
            .cast(pl.Int32, strict=False)
            .alias("Asking price per sqm"),
        )

    df.write_parquet(path)
    log.info("Wrote %d properties to %s", len(df), path)