This commit is contained in:
Andras Schmelczer 2026-02-18 21:22:15 +00:00
parent 524580eb25
commit ffe080adef
82 changed files with 2652 additions and 2956 deletions

View file

@ -8,7 +8,7 @@ use std::path::Path;
use rustc_hash::FxHashMap;
use crate::consts::{H3_PRECOMPUTE_MAX, HISTOGRAM_BINS};
use crate::features::{self, Bounds, IGNORED_COLUMNS};
use crate::features::{self, Bounds};
fn is_numeric_dtype(dtype: &DataType) -> bool {
matches!(
@ -122,6 +122,13 @@ pub struct PropertyData {
/// Per-row renovation events. Keyed by (permuted) row index.
/// Only rows with events are present in the map.
renovation_history: FxHashMap<u32, Vec<RenovationEvent>>,
/// Per-row listing features (key feature bullet points from online listings).
/// Only rows with features are present in the map.
listing_features: FxHashMap<u32, Vec<String>>,
/// Per-row optional string columns from online listings.
listing_url: Vec<Option<String>>,
property_sub_type: Vec<Option<String>>,
price_qualifier: Vec<Option<String>>,
}
impl PropertyData {
@ -155,6 +162,29 @@ impl PropertyData {
.map(|v| v.as_slice())
.unwrap_or(&[])
}
/// Get listing features for a given row (empty slice if none).
pub fn listing_features(&self, row: usize) -> &[String] {
self.listing_features
.get(&(row as u32))
.map(|v| v.as_slice())
.unwrap_or(&[])
}
/// Get listing URL for a given row.
///
/// Returns `None` when the row has no URL, and also when `row` is past the
/// end of the column (checked lookup instead of a panicking index), matching
/// how `listing_features` treats rows it has no entry for.
pub fn listing_url(&self, row: usize) -> Option<&str> {
    self.listing_url.get(row).and_then(|url| url.as_deref())
}
/// Get property sub-type for a given row.
///
/// Returns `None` when the row has no sub-type, and also when `row` is past
/// the end of the column (checked lookup instead of a panicking index).
pub fn property_sub_type(&self, row: usize) -> Option<&str> {
    self.property_sub_type
        .get(row)
        .and_then(|sub_type| sub_type.as_deref())
}
/// Get price qualifier for a given row.
///
/// Returns `None` when the row has no qualifier, and also when `row` is past
/// the end of the column (checked lookup instead of a panicking index).
pub fn price_qualifier(&self, row: usize) -> Option<&str> {
    self.price_qualifier
        .get(row)
        .and_then(|qualifier| qualifier.as_deref())
}
}
/// Compute a percentile from a uniformly-binned histogram.
@ -375,73 +405,226 @@ pub fn precompute_h3(lat: &[f32], lon: &[f32]) -> anyhow::Result<Vec<u64>> {
}
impl PropertyData {
pub fn load(parquet_path: &Path) -> anyhow::Result<Self> {
tracing::info!("Loading parquet from {:?}", parquet_path);
pub fn load(
properties_path: &Path,
postcode_features_path: &Path,
listings_buy_path: &Path,
listings_rent_path: &Path,
) -> anyhow::Result<Self> {
// Load postcode.parquet
tracing::info!("Loading postcode features from {:?}", postcode_features_path);
let postcode_df = LazyFrame::scan_parquet(postcode_features_path, Default::default())
.context("Failed to scan postcode parquet")?
.collect()
.context("Failed to read postcode parquet")?;
tracing::info!(rows = postcode_df.height(), "Postcode features loaded");
let mut lf = LazyFrame::scan_parquet(parquet_path, Default::default())
.context("Failed to scan parquet")?;
let schema = lf.collect_schema().context("Failed to read schema")?;
// Load properties.parquet and join with postcode data for lat/lon + area features
tracing::info!("Loading properties from {:?}", properties_path);
let properties_lf = LazyFrame::scan_parquet(properties_path, Default::default())
.context("Failed to scan properties parquet")?
.with_columns([lit("Historical sale").alias("Listing status")]);
let properties_joined = properties_lf
.join(
postcode_df.clone().lazy(),
[col("Postcode")],
[col("Postcode")],
JoinArgs::new(JoinType::Left),
)
.collect()
.context("Failed to join properties with postcodes")?;
let prop_count = properties_joined.height();
tracing::info!(rows = prop_count, "Properties joined with postcodes");
// Load online listings (buy + rent) — these have their own lat/lon.
// Normalize column names from finder output to server-expected names.
// strict=false: columns already using the new name are silently skipped.
// Loads one online-listings parquet file and returns it as a collected DataFrame.
//
// Steps, in order: scan the file, read its schema, rename raw finder columns to
// the server-expected names, add "Listing status" and a price column when the
// original schema lacks them, then left-join the captured `postcode_df` (minus
// its lat/lon columns) on "Postcode".
//
// `label` appears in log/error messages and selects the derived values:
// "buy" gives "For sale" / "Asking price"; any other label gives "For rent" /
// "Asking rent (monthly)".
let load_listings = |path: &Path, label: &str| -> anyhow::Result<DataFrame> {
tracing::info!("Loading {} listings from {:?}", label, path);
let mut lf = LazyFrame::scan_parquet(path, Default::default())
.with_context(|| format!("Failed to scan {label} listings parquet"))?;
// The schema is captured BEFORE the rename below, so every `schema.get(..)`
// check further down sees the file's original column names.
let schema = lf
.collect_schema()
.with_context(|| format!("Failed to read {label} listings schema"))?;
// Rename raw finder columns → server-expected names (no-op if already renamed)
// NOTE(review): "Leashold/Freehold" is spelled this way here; confirm it matches
// the column name the rest of the pipeline expects before "fixing" the spelling.
let lf = lf.rename(
[
"postcode",
"address",
"latitude",
"longitude",
"bedrooms",
"bathrooms",
"total_rooms",
"tenure",
"property_type",
"property_sub_type",
"price_qualifier",
"floorspace_sqm",
"url",
"features",
],
[
"Postcode",
"Address per Property Register",
"lat",
"lon",
"Bedrooms",
"Bathrooms",
"Number of bedrooms & living rooms",
"Leashold/Freehold",
"Property type",
"Property sub-type",
"Price qualifier",
"Total floor area (sqm)",
"Listing URL",
"Listing features",
],
false,
);
// Derive missing columns for raw finder output that doesn't have them
let listing_status = if label == "buy" {
"For sale"
} else {
"For rent"
};
let lf = if schema.get("Listing status").is_none() {
lf.with_column(lit(listing_status).alias("Listing status"))
} else {
lf
};
// NOTE(review): this guard tests for "Asking price" on both paths, but the rent
// branch writes "Asking rent (monthly)". A rent file that already carries
// "Asking rent (monthly)" alongside a raw "price" column would have it
// recomputed — confirm that is intended.
let lf = if schema.get("Asking price").is_none() && schema.get("price").is_some() {
if label == "buy" {
lf.with_column(col("price").alias("Asking price"))
} else {
// Normalize rent to monthly: weekly×52/12, yearly÷12
// NOTE(review): `price_frequency` is referenced without a schema check; the
// guard above only verifies that `price` exists. Values other than "weekly"
// and "yearly" (presumably including null) take the `otherwise` branch and
// keep the price as-is.
// NOTE(review): the final cast to Int64 follows floating-point arithmetic;
// presumably this drops the fractional part rather than rounding — confirm.
lf.with_column(
when(col("price_frequency").eq(lit("weekly")))
.then(col("price").cast(DataType::Float64) * lit(52.0 / 12.0))
.when(col("price_frequency").eq(lit("yearly")))
.then(col("price").cast(DataType::Float64) / lit(12.0))
.otherwise(col("price").cast(DataType::Float64))
.cast(DataType::Int64)
.alias("Asking rent (monthly)"),
)
}
} else {
lf
};
// Join with postcodes for area features (listings have their own lat/lon)
// The postcode frame's lat/lon are dropped first, presumably so the listing's
// own coordinates remain the only lat/lon columns after the join.
let pc_no_coords = postcode_df.clone().lazy().drop(["lat", "lon"]);
let joined = lf
.join(
pc_no_coords,
[col("Postcode")],
[col("Postcode")],
JoinArgs::new(JoinType::Left),
)
.collect()
.with_context(|| format!("Failed to join {label} listings with postcodes"))?;
tracing::info!(rows = joined.height(), "{} listings joined", label);
Ok(joined)
};
let listings_buy = load_listings(listings_buy_path, "buy")?;
let listings_rent = load_listings(listings_rent_path, "rent")?;
// Concatenate all rows into a single DataFrame
tracing::info!("Concatenating all data sources");
let buy_count = listings_buy.height();
let rent_count = listings_rent.height();
let mut combined = concat(
[
properties_joined.lazy(),
listings_buy.lazy(),
listings_rent.lazy(),
],
UnionArgs {
parallel: false,
rechunk: true,
to_supertypes: true,
diagonal: true,
..Default::default()
},
)
.context("Failed to concat data sources")?
.collect()
.context("Failed to collect combined data")?;
let total_rows = combined.height();
tracing::info!(
properties = prop_count,
buy_listings = buy_count,
rent_listings = rent_count,
total = total_rows,
"All data sources combined"
);
// Get configured feature/enum names in config order
let numeric_names = features::all_numeric_feature_names();
let enum_names = features::all_enum_feature_names();
// Validate: every configured numeric feature must exist in parquet as numeric
// Fill in NaN/empty placeholder columns for features that don't exist in all
// sources (e.g. Listing date only comes from listings, Estimated current price
// only from properties). Without this, diagonal concat leaves them absent.
{
let schema = combined.schema();
let mut fill_exprs: Vec<Expr> = Vec::new();
for &name in &numeric_names {
if schema.get(name).is_none() {
tracing::info!(feature = %name, "Adding NaN placeholder for missing numeric feature");
fill_exprs.push(lit(f32::NAN).alias(name));
}
}
for &name in &enum_names {
if schema.get(name).is_none() {
tracing::info!(feature = %name, "Adding empty placeholder for missing enum feature");
fill_exprs.push(lit("").alias(name));
}
}
if !fill_exprs.is_empty() {
combined = combined
.lazy()
.with_columns(fill_exprs)
.collect()
.context("Failed to add placeholder columns for missing features")?;
}
}
let schema = combined.schema();
// Validate: every configured feature exists in combined schema
for name in &numeric_names {
match schema.get(name) {
Some(dtype) if is_numeric_dtype(dtype) => {}
Some(dtype) => bail!(
"Configured numeric feature '{}' has non-numeric type {:?} in parquet",
"Configured numeric feature '{}' has non-numeric type {:?}",
name,
dtype
),
None => bail!(
"Configured numeric feature '{}' not found in parquet schema",
"Configured numeric feature '{}' not found in combined schema",
name
),
}
}
// Validate: every configured enum feature must exist in parquet as string
for name in &enum_names {
match schema.get(name) {
Some(dtype) if matches!(dtype, DataType::String) || dtype.is_categorical() => {}
Some(dtype) => bail!(
"Configured enum feature '{}' has unexpected type {:?} in parquet",
"Configured enum feature '{}' has unexpected type {:?}",
name,
dtype
),
None => bail!(
"Configured enum feature '{}' not found in parquet schema",
"Configured enum feature '{}' not found in combined schema",
name
),
}
}
// Validate: every parquet column must be accounted for
let all_known: std::collections::HashSet<&str> = numeric_names
.iter()
.chain(enum_names.iter())
.copied()
.chain(IGNORED_COLUMNS.iter().copied())
.collect();
for (col_name, dtype) in schema.iter() {
let name = col_name.as_str();
if all_known.contains(name) {
continue;
}
// Skip non-simple types (List, Struct, etc.)
if matches!(dtype, DataType::List(_) | DataType::Struct(_)) {
tracing::debug!(column = %name, dtype = ?dtype, "Skipping complex-type column");
continue;
}
bail!(
"Unknown column '{}' (type {:?}) in parquet — add it to features.rs config or IGNORED_COLUMNS",
name, dtype
);
}
// Combine numeric and enum feature names (numeric first, then enum)
let feature_names: Vec<String> = numeric_names
.iter()
@ -457,7 +640,7 @@ impl PropertyData {
"Feature columns from config"
);
// Build select expressions
// Build select expressions for the combined DataFrame
let mut select_exprs: Vec<polars::prelude::Expr> = vec![];
select_exprs.push(col("lat").cast(DataType::Float32));
select_exprs.push(col("lon").cast(DataType::Float32));
@ -465,7 +648,6 @@ impl PropertyData {
// Select numeric features as Float32 (datetime columns → fractional year)
for &name in &numeric_names {
if is_datetime_dtype(schema.get(name).unwrap()) {
// Convert datetime to fractional year: year + (month - 1) / 12
select_exprs.push(
(col(name).dt().year().cast(DataType::Float32)
+ (col(name).dt().month().cast(DataType::Float32) - lit(1.0f32))
@ -477,42 +659,47 @@ impl PropertyData {
}
}
// String columns for address/postcode
// String columns for address/postcode and online listing metadata
for &string_col_name in &[
"Address per Property Register",
"Address per EPC",
"Postcode",
"Listing URL",
"Property sub-type",
"Price qualifier",
] {
if schema.get(string_col_name).is_some() {
select_exprs.push(col(string_col_name).cast(DataType::String));
}
}
// Enum features as String (will be encoded to f32 indices later)
// Enum features as String
for &name in &enum_names {
select_exprs.push(col(name).cast(DataType::String));
}
// Optional boolean column for construction date approximation
// Optional columns
let has_approx_col = schema.get("Is construction date approximate").is_some();
if has_approx_col {
select_exprs.push(col("Is construction date approximate").cast(DataType::Float32));
}
// Optional renovation history (List<Struct{year, event}>)
let has_renovation_history = schema.get("renovation_history").is_some();
if has_renovation_history {
select_exprs.push(col("renovation_history"));
}
let has_listing_features = schema.get("Listing features").is_some();
if has_listing_features {
select_exprs.push(col("Listing features"));
}
let df = LazyFrame::scan_parquet(parquet_path, Default::default())
.context("Failed to scan parquet")?
let df = combined
.lazy()
.select(select_exprs)
.collect()
.context("Failed to read parquet")?;
.context("Failed to select columns from combined data")?;
let row_count = df.height();
tracing::info!(rows = row_count, "Parquet loaded");
tracing::info!(rows = row_count, "Combined data selected");
let lat_series = df
.column("lat")
@ -586,6 +773,35 @@ impl PropertyData {
let address_raw = extract_string_col(&df, "Address per Property Register")?;
let postcode_raw = extract_string_col(&df, "Postcode")?;
// Read an optional string column as one `Option<String>` per row.
// A column that is absent from `df` yields `row_count` `None`s; a column that
// is present but not a string column is an error. Values are trimmed, and
// null or whitespace-only cells become `None`.
let extract_optional_string_col =
    |df: &DataFrame, name: &str| -> anyhow::Result<Vec<Option<String>>> {
        // Missing column: nothing to extract, every row is `None`.
        let column = match df.column(name) {
            Ok(found) => found,
            Err(_) => return Ok(vec![None; row_count]),
        };
        let strings = column
            .str()
            .with_context(|| format!("Column '{name}' is not a string column"))?;
        let cleaned: Vec<Option<String>> = strings
            .into_iter()
            .map(|cell| {
                cell.map(str::trim)
                    .filter(|text| !text.is_empty())
                    .map(str::to_owned)
            })
            .collect();
        Ok(cleaned)
    };
let listing_url_raw = extract_optional_string_col(&df, "Listing URL")?;
let property_sub_type_raw = extract_optional_string_col(&df, "Property sub-type")?;
let price_qualifier_raw = extract_optional_string_col(&df, "Price qualifier")?;
tracing::info!("Building enum features");
// enum_col_major: Vec<(values_list, encoded_as_f32)>
let mut enum_col_major: Vec<(Vec<String>, Vec<f32>)> = Vec::new();
@ -689,7 +905,7 @@ impl PropertyData {
let mut history: FxHashMap<u32, Vec<RenovationEvent>> = FxHashMap::default();
for old_row in 0..row_count {
if let Some(inner) = list_ca.get_as_series(old_row) {
if inner.len() == 0 {
if inner.is_empty() {
continue;
}
let structs = inner
@ -727,6 +943,44 @@ impl PropertyData {
FxHashMap::default()
};
// Extract listing features: List<String>
let mut listing_features_raw: FxHashMap<u32, Vec<String>> = if has_listing_features {
tracing::info!("Extracting listing features");
let feat_col = df
.column("Listing features")
.context("Missing Listing features column")?;
let list_ca = feat_col
.list()
.context("Listing features is not a list column")?;
let mut features_map: FxHashMap<u32, Vec<String>> = FxHashMap::default();
for old_row in 0..row_count {
if let Some(inner) = list_ca.get_as_series(old_row) {
if inner.is_empty() {
continue;
}
let str_ca = inner
.str()
.context("Listing features inner is not a string series")?;
let items: Vec<String> = str_ca
.into_iter()
.filter_map(|v| v.map(|s| s.to_string()))
.filter(|s| !s.is_empty())
.collect();
if !items.is_empty() {
features_map.insert(old_row as u32, items);
}
}
}
tracing::info!(
properties_with_features = features_map.len(),
"Listing features extracted"
);
features_map
} else {
FxHashMap::default()
};
// Sort all rows by spatial locality so that grid queries access
// contiguous memory (sequential reads instead of random DRAM accesses).
tracing::info!("Sorting rows by spatial locality");
@ -796,6 +1050,32 @@ impl PropertyData {
map
};
// Re-key listing_features by permuted row index
let listing_features: FxHashMap<u32, Vec<String>> = {
let mut map =
FxHashMap::with_capacity_and_hasher(listing_features_raw.len(), Default::default());
for (new_row, &old_row) in perm.iter().enumerate() {
if let Some(items) = listing_features_raw.remove(&old_row) {
map.insert(new_row as u32, items);
}
}
map
};
// Permute optional string columns
let listing_url: Vec<Option<String>> = perm
.iter()
.map(|&old_row| listing_url_raw[old_row as usize].clone())
.collect();
let property_sub_type: Vec<Option<String>> = perm
.iter()
.map(|&old_row| property_sub_type_raw[old_row as usize].clone())
.collect();
let price_qualifier: Vec<Option<String>> = perm
.iter()
.map(|&old_row| price_qualifier_raw[old_row as usize].clone())
.collect();
// Build enum_values map: feature_index -> list of string values
let mut enum_values: rustc_hash::FxHashMap<usize, Vec<String>> =
rustc_hash::FxHashMap::default();
@ -857,6 +1137,10 @@ impl PropertyData {
enum_values,
approx_build_date_bits,
renovation_history,
listing_features,
listing_url,
property_sub_type,
price_qualifier,
})
}
}