96 lines
3.5 KiB
Python
96 lines
3.5 KiB
Python
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import polars as pl
|
|
|
|
from transform import normalize_price
|
|
|
|
log = logging.getLogger("rightmove")
|
|
|
|
|
|
# Columns copied unchanged from each property dict into the output frame,
# listed in output order. Each name is both the input key and the column name.
_PASSTHROUGH_COLUMNS = (
    "Bedrooms",
    "Bathrooms",
    "Number of bedrooms & living rooms",
    "lon",
    "lat",
    "Postcode",
    "Address per Property Register",
    "Leashold/Freehold",  # (sic) spelling kept byte-for-byte: input key and output column
    "Property type",
    "Property sub-type",
    "Price qualifier",
    "Total floor area (sqm)",
    "Listing URL",
    "Listing features",
)


def _parse_listing_date(value: object) -> "datetime | None":
    """Return *value* parsed as a naive UTC datetime, or None if unusable.

    Falsy values, non-string values and strings that are not valid ISO-8601
    all yield None so that one malformed record cannot abort the whole write.
    """
    if not value:
        return None
    try:
        # datetime.fromisoformat() only understands a trailing "Z" from
        # Python 3.11 onwards, so spell the UTC designator as an offset.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # AttributeError: value has no .replace (e.g. an int timestamp).
        # TypeError / ValueError: wrong argument type or unparseable text.
        return None

    offset = parsed.utcoffset()
    if offset is not None:
        # Shift to UTC *before* discarding tzinfo; simply dropping the offset
        # would store the sender's local wall-clock time as if it were UTC.
        parsed = parsed.replace(tzinfo=None) - offset
    return parsed


def write_parquet(properties: list[dict], path: Path, channel: str) -> None:
    """Write properties list to parquet with server-ready column names.

    Args:
        properties: One dict per listing. Each dict must contain every key in
            ``_PASSTHROUGH_COLUMNS`` plus ``"price"`` (and ``"price_frequency"``
            when ``channel`` is not ``"buy"``); ``"first_visible_date"`` is
            optional.
        path: Destination parquet file.
        channel: "buy" or "rent". ``"buy"`` fills "Asking price" and marks
            rows "For sale"; any other value is treated as rent, filling
            "Asking rent (monthly)" via ``normalize_price`` and marking rows
            "For rent".

    Raises:
        KeyError: If a property dict lacks one of the required keys.

    If ``properties`` is empty a warning is logged and no file is written.
    """
    if not properties:
        log.warning("No properties to write to %s", path)
        return

    count = len(properties)

    # Timestamps that are missing or unparseable become nulls in the column.
    listing_dates = [
        _parse_listing_date(p.get("first_visible_date", "")) for p in properties
    ]

    # Exactly one of asking price / asking rent is populated per channel.
    if channel == "buy":
        asking_prices = [p["price"] for p in properties]
        asking_rents = [None] * count
        listing_statuses = ["For sale"] * count
    else:
        asking_prices = [None] * count
        asking_rents = [
            normalize_price(p["price"], p["price_frequency"]) for p in properties
        ]
        listing_statuses = ["For rent"] * count

    # Insertion order here matches the schema below, which fixes column order.
    columns = {
        name: [p[name] for p in properties] for name in _PASSTHROUGH_COLUMNS
    }
    columns["Listing date"] = listing_dates
    columns["Listing status"] = listing_statuses
    columns["Asking price"] = asking_prices
    columns["Asking rent (monthly)"] = asking_rents

    df = pl.DataFrame(
        columns,
        schema={
            "Bedrooms": pl.Int32,
            "Bathrooms": pl.Int32,
            "Number of bedrooms & living rooms": pl.Int32,
            "lon": pl.Float64,
            "lat": pl.Float64,
            "Postcode": pl.Utf8,
            "Address per Property Register": pl.Utf8,
            "Leashold/Freehold": pl.Utf8,
            "Property type": pl.Utf8,
            "Property sub-type": pl.Utf8,
            "Price qualifier": pl.Utf8,
            "Total floor area (sqm)": pl.Float64,
            "Listing URL": pl.Utf8,
            "Listing features": pl.List(pl.Utf8),
            "Listing date": pl.Datetime("us"),
            "Listing status": pl.Utf8,
            "Asking price": pl.Int64,
            "Asking rent (monthly)": pl.Int64,
        },
    )

    df.write_parquet(path)
    log.info("Wrote %d properties to %s", len(df), path)
|