This commit is contained in:
Andras Schmelczer 2026-03-15 17:38:26 +00:00
parent 80c093b7ba
commit f72c43a9fa
101 changed files with 2168 additions and 1177 deletions

View file

@ -7,7 +7,7 @@ use std::path::Path;
use rustc_hash::FxHashMap;
use crate::consts::{H3_PRECOMPUTE_MAX, HISTOGRAM_BINS};
use crate::consts::{H3_PRECOMPUTE_MAX, HISTOGRAM_BINS, NAN_U16, QUANT_SCALE};
use crate::features::{self, Bounds};
fn is_numeric_dtype(dtype: &DataType) -> bool {
@ -47,6 +47,38 @@ pub struct Histogram {
pub counts: Vec<u64>,
}
impl Histogram {
/// Return the bin index for a given value using the outlier-bracket layout.
#[cfg(test)]
pub fn bin_for_value(&self, value: f32) -> usize {
let num_bins = self.counts.len();
if value < self.p1 {
0
} else if value >= self.p99 {
num_bins - 1
} else {
let middle_bins = num_bins.saturating_sub(2);
if middle_bins > 0 && self.p99 > self.p1 {
let width = (self.p99 - self.p1) / middle_bins as f32;
let middle_bin = ((value - self.p1) / width) as usize;
(1 + middle_bin).min(num_bins - 2)
} else {
num_bins / 2
}
}
}
/// Width of a single middle bin (bins 1..n-2).
#[cfg(test)]
pub fn middle_bin_width(&self) -> f32 {
let middle_bins = self.counts.len().saturating_sub(2);
if middle_bins > 0 && self.p99 > self.p1 {
(self.p99 - self.p1) / middle_bins as f32
} else {
0.0
}
}
}
pub struct FeatureStats {
pub slider_min: f32,
@ -60,14 +92,67 @@ pub struct RenovationEvent {
pub event: String,
}
/// Lightweight reference to quantization parameters for decoding u16 feature data.
pub struct QuantRef<'a> {
pub dequant_a: &'a [f32],
pub quant_min: &'a [f32],
pub quant_range: &'a [f32],
pub num_numeric: usize,
}
impl QuantRef<'_> {
/// Decode a raw u16 value back to f32.
#[inline]
pub fn decode(&self, feat_idx: usize, raw: u16) -> f32 {
if raw == NAN_U16 {
return f32::NAN;
}
if feat_idx >= self.num_numeric {
raw as f32
} else {
raw as f32 * self.dequant_a[feat_idx] + self.quant_min[feat_idx]
}
}
/// Encode a filter minimum bound to u16 (floors to include boundary values).
#[inline]
pub fn encode_min(&self, feat_idx: usize, value: f32) -> u16 {
if !value.is_finite() || self.quant_range[feat_idx] == 0.0 {
return 0;
}
let norm = (value - self.quant_min[feat_idx]) / self.quant_range[feat_idx];
(norm * QUANT_SCALE).floor().clamp(0.0, QUANT_SCALE) as u16
}
/// Encode a filter maximum bound to u16 (ceils to include boundary values).
#[inline]
pub fn encode_max(&self, feat_idx: usize, value: f32) -> u16 {
if !value.is_finite() || self.quant_range[feat_idx] == 0.0 {
return QUANT_SCALE as u16;
}
let norm = (value - self.quant_min[feat_idx]) / self.quant_range[feat_idx];
(norm * QUANT_SCALE).ceil().clamp(0.0, QUANT_SCALE) as u16
}
}
pub struct PropertyData {
pub lat: Vec<f32>,
pub lon: Vec<f32>,
pub feature_names: Vec<String>,
pub num_features: usize,
/// Number of numeric features (enum features start at this index).
pub num_numeric: usize,
/// Row-major flat array: feature_data[row * num_features + feat_idx].
/// NaN = null. For enum features, stores the index as f32 (0.0, 1.0, etc).
pub feature_data: Vec<f32>,
/// Quantized to u16. NaN sentinel = u16::MAX (65535).
/// Numeric features: encoded via (val - min) / range * 65534.
/// Enum features: stored directly as u16 cast of the f32 index.
pub feature_data: Vec<u16>,
/// Per-feature: range / QUANT_SCALE for fast decode.
dequant_a: Vec<f32>,
/// Per-feature: minimum value (offset for dequantization).
quant_min: Vec<f32>,
/// Per-feature: max - min (for encoding filter bounds).
quant_range: Vec<f32>,
pub feature_stats: Vec<FeatureStats>,
/// Contiguous buffer holding all address strings end-to-end.
address_buffer: String,
@ -79,7 +164,7 @@ pub struct PropertyData {
postcode_interner: lasso::RodeoReader,
postcode_keys: Vec<lasso::Spur>,
/// For enum features: maps feature index to list of possible string values.
/// Index in values list corresponds to the f32 value stored in feature_data.
/// Index in values list corresponds to the u16 value stored in feature_data.
pub enum_values: rustc_hash::FxHashMap<usize, Vec<String>>,
/// Per-row flag: true = construction date is approximate (from EPC band),
/// false = exact (from new-build transaction date).
@ -91,10 +176,11 @@ pub struct PropertyData {
/// Per-row listing features (key feature bullet points from online listings).
/// Only rows with features are present in the map.
listing_features: FxHashMap<u32, Vec<String>>,
/// Per-row optional string columns from online listings.
listing_url: Vec<Option<String>>,
property_sub_type: Vec<Option<String>>,
price_qualifier: Vec<Option<String>>,
/// Sparse per-row optional string columns from online listings.
/// Only rows with non-empty values are stored (saves ~1 GB vs Vec<Option<String>>).
listing_url: FxHashMap<u32, String>,
property_sub_type: FxHashMap<u32, String>,
price_qualifier: FxHashMap<u32, String>,
}
impl PropertyData {
@ -139,17 +225,43 @@ impl PropertyData {
/// Get listing URL for a given row.
pub fn listing_url(&self, row: usize) -> Option<&str> {
self.listing_url[row].as_deref()
self.listing_url.get(&(row as u32)).map(String::as_str)
}
/// Get property sub-type for a given row.
pub fn property_sub_type(&self, row: usize) -> Option<&str> {
self.property_sub_type[row].as_deref()
self.property_sub_type
.get(&(row as u32))
.map(String::as_str)
}
/// Get price qualifier for a given row.
pub fn price_qualifier(&self, row: usize) -> Option<&str> {
self.price_qualifier[row].as_deref()
self.price_qualifier.get(&(row as u32)).map(String::as_str)
}
/// Decode a single feature value from quantized u16 storage.
#[inline]
pub fn get_feature(&self, row: usize, feat_idx: usize) -> f32 {
let raw = self.feature_data[row * self.num_features + feat_idx];
if raw == NAN_U16 {
return f32::NAN;
}
if feat_idx >= self.num_numeric {
raw as f32
} else {
raw as f32 * self.dequant_a[feat_idx] + self.quant_min[feat_idx]
}
}
/// Get a QuantRef for passing to aggregation/filter functions.
pub fn quant_ref(&self) -> QuantRef<'_> {
QuantRef {
dequant_a: &self.dequant_a,
quant_min: &self.quant_min,
quant_range: &self.quant_range,
num_numeric: self.num_numeric,
}
}
}
@ -355,13 +467,12 @@ pub fn precompute_h3(lat: &[f32], lon: &[f32]) -> anyhow::Result<Vec<u64>> {
.zip(lon.par_iter())
.enumerate()
.map(|(i, (&latitude, &longitude))| {
let coord = h3o::LatLng::new(latitude as f64, longitude as f64)
.unwrap_or_else(|err| {
panic!(
"Invalid coordinates at row {}: lat={}, lon={}: {}",
i, latitude, longitude, err
)
});
let coord = h3o::LatLng::new(latitude as f64, longitude as f64).unwrap_or_else(|err| {
panic!(
"Invalid coordinates at row {}: lat={}, lon={}: {}",
i, latitude, longitude, err
)
});
u64::from(coord.to_cell(h3_res))
})
.collect();
@ -378,7 +489,10 @@ impl PropertyData {
listings_rent_path: &Path,
) -> anyhow::Result<Self> {
// Load postcode.parquet
tracing::info!("Loading postcode features from {:?}", postcode_features_path);
tracing::info!(
"Loading postcode features from {:?}",
postcode_features_path
);
let postcode_df = LazyFrame::scan_parquet(postcode_features_path, Default::default())
.context("Failed to scan postcode parquet")?
.collect()
@ -623,6 +737,16 @@ impl PropertyData {
})
.collect::<anyhow::Result<Vec<_>>>()?;
// Compute quantization parameters from feature stats (numeric features)
let mut quant_min = Vec::with_capacity(num_features);
let mut quant_range = Vec::with_capacity(num_features);
for stats in &numeric_feature_stats {
let min = stats.histogram.min;
let max = stats.histogram.max;
quant_min.push(min);
quant_range.push(if max > min { max - min } else { 0.0 });
}
tracing::info!("Extracting string columns");
let extract_string_col = |df: &DataFrame, name: &str| -> anyhow::Result<Vec<String>> {
let column = df
@ -928,19 +1052,34 @@ impl PropertyData {
map
};
// Permute optional string columns
let listing_url: Vec<Option<String>> = perm
.iter()
.map(|&old_row| listing_url_raw[old_row as usize].clone())
.collect();
let property_sub_type: Vec<Option<String>> = perm
.iter()
.map(|&old_row| property_sub_type_raw[old_row as usize].clone())
.collect();
let price_qualifier: Vec<Option<String>> = perm
.iter()
.map(|&old_row| price_qualifier_raw[old_row as usize].clone())
.collect();
// Permute optional string columns into sparse HashMaps
let listing_url: FxHashMap<u32, String> = {
let mut map = FxHashMap::default();
for (new_row, &old_row) in perm.iter().enumerate() {
if let Some(ref s) = listing_url_raw[old_row as usize] {
map.insert(new_row as u32, s.clone());
}
}
map
};
let property_sub_type: FxHashMap<u32, String> = {
let mut map = FxHashMap::default();
for (new_row, &old_row) in perm.iter().enumerate() {
if let Some(ref s) = property_sub_type_raw[old_row as usize] {
map.insert(new_row as u32, s.clone());
}
}
map
};
let price_qualifier: FxHashMap<u32, String> = {
let mut map = FxHashMap::default();
for (new_row, &old_row) in perm.iter().enumerate() {
if let Some(ref s) = price_qualifier_raw[old_row as usize] {
map.insert(new_row as u32, s.clone());
}
}
map
};
// Build enum_values map: feature_index -> list of string values
let mut enum_values: rustc_hash::FxHashMap<usize, Vec<String>> =
@ -967,24 +1106,47 @@ impl PropertyData {
counts: vec![0; num_values.max(1)],
},
});
// Enum features: not quantized, stored directly as u16
quant_min.push(0.0);
quant_range.push(0.0);
}
let dequant_a: Vec<f32> = quant_range
.iter()
.map(|&r| if r > 0.0 { r / QUANT_SCALE } else { 0.0 })
.collect();
// Transpose to row-major AND apply spatial permutation in one pass.
// Combines numeric and enum features into a single feature_data array.
tracing::info!("Transposing to row-major layout (spatially sorted)");
let mut feature_data = vec![f32::NAN; row_count * num_features];
// Combines numeric and enum features into a single feature_data array, quantized to u16.
tracing::info!("Transposing to row-major layout (spatially sorted, quantized to u16)");
let mut feature_data = vec![NAN_U16; row_count * num_features];
feature_data
.par_chunks_mut(num_features)
.enumerate()
.for_each(|(new_row, row_slice)| {
let old_index = perm[new_row] as usize;
// Numeric features
// Numeric features: quantize to u16
for (feat_idx, col_vec) in numeric_col_major.iter().enumerate() {
row_slice[feat_idx] = col_vec[old_index];
let value = col_vec[old_index];
row_slice[feat_idx] = if value.is_finite() {
let range = quant_range[feat_idx];
if range > 0.0 {
let normalized = (value - quant_min[feat_idx]) / range;
(normalized * QUANT_SCALE).round().clamp(0.0, QUANT_SCALE) as u16
} else {
0
}
} else {
NAN_U16
};
}
// Enum features (stored as f32 indices)
// Enum features: store as u16 directly
for (enum_idx, (_, encoded)) in enum_col_major.iter().enumerate() {
row_slice[num_numeric + enum_idx] = encoded[old_index];
let value = encoded[old_index];
row_slice[num_numeric + enum_idx] = if value.is_finite() {
value as u16
} else {
NAN_U16
};
}
});
@ -995,7 +1157,11 @@ impl PropertyData {
lon,
feature_names,
num_features,
num_numeric,
feature_data,
dequant_a,
quant_min,
quant_range,
feature_stats,
address_buffer,
address_offsets,