Refactor the server

This commit is contained in:
Andras Schmelczer 2026-01-31 20:25:54 +00:00
parent 3b9ad11d71
commit 01ec17ff04
15 changed files with 939 additions and 1226 deletions

View file

@ -1,555 +0,0 @@
use polars::lazy::frame::LazyFrame;
use polars::prelude::*;
use rayon::prelude::*;
use serde::Serialize;
use std::path::Path;
use crate::consts::{
EXCLUDED_COLUMNS, FEATURE_PERCENTILE_HIGH, FEATURE_PERCENTILE_LOW, H3_PRECOMPUTE_MAX,
H3_PRECOMPUTE_MIN, HISTOGRAM_BINS,
};
/// Returns true if the polars DataType is numeric (integer or float)
fn is_numeric_dtype(dtype: &DataType) -> bool {
matches!(
dtype,
DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float32
| DataType::Float64
)
}
/// Histogram for a single feature column
#[derive(Serialize, Clone)]
pub struct Histogram {
/// Left edge of first bin
pub min: f64,
/// Right edge of last bin
pub max: f64,
/// Width of each bin
pub bin_width: f64,
/// Count of values in each bin
pub counts: Vec<u64>,
}
/// Precomputed statistics for a single feature
pub struct FeatureStats {
pub p_low: f64,
pub p_high: f64,
pub histogram: Histogram,
}
/// Columnar storage for all property data.
/// Feature values use NaN as the null sentinel.
pub struct PropertyData {
pub lat: Vec<f64>,
pub lon: Vec<f64>,
/// Dynamically discovered numeric feature column names
pub feature_names: Vec<String>,
/// Number of feature columns
pub num_features: usize,
/// Row-major flat array: feature_data[row * num_features + feat_idx].
/// NaN = null. Contiguous layout for cache-friendly per-row access.
pub feature_data: Vec<f64>,
/// Precomputed stats (percentiles + histogram) for each feature
pub feature_stats: Vec<FeatureStats>,
/// String fields for property details
pub address: Vec<String>,
pub postcode: Vec<String>,
pub property_type: Vec<String>,
pub built_form: Vec<String>,
pub current_energy_rating: Vec<String>,
pub potential_energy_rating: Vec<String>,
}
/// Approximate a percentile from a histogram using linear interpolation.
/// `p` is in [0, 100]. `total` is the sum of all bin counts.
fn percentile_from_histogram(
counts: &[u64],
min: f64,
bin_width: f64,
total: usize,
p: f64,
) -> f64 {
let target = (p / 100.0) * (total as f64 - 1.0);
let mut cumulative = 0u64;
for (i, &c) in counts.iter().enumerate() {
let prev = cumulative;
cumulative += c;
if cumulative as f64 > target {
// Interpolate within this bin
let frac = if c > 0 {
(target - prev as f64) / c as f64
} else {
0.0
};
return min + (i as f64 + frac) * bin_width;
}
}
// Fallback: right edge of last bin
min + counts.len() as f64 * bin_width
}
/// Build a histogram and compute approximate percentiles in O(n) — no sort needed.
fn compute_feature_stats(vals: &[f64]) -> FeatureStats {
// Single pass: min, max, count (skipping NaN)
let mut min = f64::INFINITY;
let mut max = f64::NEG_INFINITY;
let mut count = 0usize;
for &v in vals {
if !v.is_nan() {
if v < min {
min = v;
}
if v > max {
max = v;
}
count += 1;
}
}
if count == 0 {
return FeatureStats {
p_low: 0.0,
p_high: 0.0,
histogram: Histogram {
min: 0.0,
max: 0.0,
bin_width: 1.0,
counts: vec![0; HISTOGRAM_BINS],
},
};
}
// Build histogram over full range (second pass, no sort)
let range = if max == min { 1.0 } else { max - min };
let bin_max = min + range * (1.0 + 1e-9);
let bin_width = (bin_max - min) / HISTOGRAM_BINS as f64;
let mut counts = vec![0u64; HISTOGRAM_BINS];
for &v in vals {
if !v.is_nan() {
let bin = ((v - min) / bin_width) as usize;
counts[bin.min(HISTOGRAM_BINS - 1)] += 1;
}
}
// Approximate percentiles from the histogram
let p_low = percentile_from_histogram(&counts, min, bin_width, count, FEATURE_PERCENTILE_LOW);
let p_high = percentile_from_histogram(&counts, min, bin_width, count, FEATURE_PERCENTILE_HIGH);
FeatureStats {
p_low,
p_high,
histogram: Histogram {
min,
max,
bin_width,
counts,
},
}
}
/// Convert a polars Column to Vec<f64> using NaN for null values
fn column_to_f64_vec(c: &Column) -> Vec<f64> {
let s = c.cast(&DataType::Float64).unwrap();
let ca = s.f64().unwrap();
ca.into_iter().map(|v| v.unwrap_or(f64::NAN)).collect()
}
/// Precompute H3 cell IDs for all rows at commonly used resolutions.
/// Returns a Vec indexed by resolution (0..16), where non-precomputed
/// resolutions have an empty Vec.
pub fn precompute_h3(lat: &[f64], lon: &[f64]) -> Vec<Vec<u64>> {
eprintln!(
"Precomputing H3 cells for resolutions {}..{}...",
H3_PRECOMPUTE_MIN, H3_PRECOMPUTE_MAX
);
let resolutions: Vec<u8> = (H3_PRECOMPUTE_MIN..=H3_PRECOMPUTE_MAX).collect();
let computed: Vec<(u8, Vec<u64>)> = resolutions
.into_par_iter()
.map(|res| {
let h3_res = h3o::Resolution::try_from(res).unwrap();
let cells: Vec<u64> = lat
.iter()
.zip(lon.iter())
.map(|(&la, &lo)| {
h3o::LatLng::new(la, lo)
.map(|c| u64::from(c.to_cell(h3_res)))
.unwrap_or(0)
})
.collect();
eprintln!(" Resolution {} done ({} cells)", res, cells.len());
(res, cells)
})
.collect();
let mut result: Vec<Vec<u64>> = (0..16).map(|_| Vec::new()).collect();
for (res, cells) in computed {
result[res as usize] = cells;
}
eprintln!("H3 precomputation complete.");
result
}
impl PropertyData {
pub fn load(parquet_path: &Path) -> Self {
eprintln!("Loading parquet from {:?}...", parquet_path);
// Scan schema to discover numeric feature columns
let mut lf = LazyFrame::scan_parquet(parquet_path, Default::default())
.expect("Failed to scan parquet");
let schema = lf.collect_schema().expect("Failed to read schema");
let feature_names: Vec<String> = schema
.iter()
.filter(|(name, dtype)| {
is_numeric_dtype(dtype) && !EXCLUDED_COLUMNS.contains(&name.as_str())
})
.map(|(name, _)| name.to_string())
.collect();
let num_features = feature_names.len();
eprintln!("Discovered {} numeric feature columns", num_features);
// Read only the columns we need
let mut cols_needed: Vec<String> = vec!["lat".into(), "lon".into()];
cols_needed.extend(feature_names.iter().cloned());
// Add string columns (using actual column names from parquet)
let string_cols = vec![
"pp_address",
"postcode",
"pp_property_type",
"built_form",
"current_energy_rating",
"potential_energy_rating",
];
// Build selection with proper casting
let mut select_exprs: Vec<polars::prelude::Expr> = vec![];
// lat/lon as f64
select_exprs.push(col("lat").cast(DataType::Float64));
select_exprs.push(col("lon").cast(DataType::Float64));
// numeric features as f64
for name in &feature_names {
select_exprs.push(col(name.as_str()).cast(DataType::Float64));
}
// string columns as string (check if they exist in schema)
for &s_col in &string_cols {
if schema.get(s_col).is_some() {
select_exprs.push(col(s_col).cast(DataType::String));
}
}
let df = LazyFrame::scan_parquet(parquet_path, Default::default())
.expect("Failed to scan parquet")
.select(select_exprs)
.collect()
.expect("Failed to read parquet");
let row_count = df.height();
eprintln!("Loaded {} rows", row_count);
// Extract lat/lon using bulk iterator
let lat_series = df.column("lat").unwrap().cast(&DataType::Float64).unwrap();
let lat: Vec<f64> = lat_series
.f64()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or(0.0))
.collect();
let lon_series = df.column("lon").unwrap().cast(&DataType::Float64).unwrap();
let lon: Vec<f64> = lon_series
.f64()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or(0.0))
.collect();
// Extract feature columns (column-major, for cache-friendly histogram computation)
eprintln!("Extracting feature columns...");
let col_major: Vec<Vec<f64>> = feature_names
.iter()
.map(|name| {
let s = df.column(name.as_str()).unwrap();
column_to_f64_vec(s)
})
.collect();
// Compute histograms in parallel (column-major is ideal for per-column iteration)
eprintln!("Computing histograms...");
let feature_stats: Vec<FeatureStats> = col_major
.par_iter()
.enumerate()
.map(|(i, vals)| {
let stats = compute_feature_stats(vals);
eprintln!(
" {}: p{}={:.2}, p{}={:.2}, {} bins",
feature_names[i],
FEATURE_PERCENTILE_LOW,
stats.p_low,
FEATURE_PERCENTILE_HIGH,
stats.p_high,
stats.histogram.counts.len()
);
stats
})
.collect();
// Extract string columns (before permutation)
eprintln!("Extracting string columns...");
let address_raw: Vec<String> = if let Ok(col) = df.column("pp_address") {
col.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect()
} else {
vec![String::new(); row_count]
};
let postcode_raw: Vec<String> = if let Ok(col) = df.column("postcode") {
col.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect()
} else {
vec![String::new(); row_count]
};
let property_type_raw: Vec<String> = if let Ok(col) = df.column("pp_property_type") {
col.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect()
} else {
vec![String::new(); row_count]
};
let built_form_raw: Vec<String> = if let Ok(col) = df.column("built_form") {
col.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect()
} else {
vec![String::new(); row_count]
};
let current_energy_rating_raw: Vec<String> =
if let Ok(col) = df.column("current_energy_rating") {
col.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect()
} else {
vec![String::new(); row_count]
};
let potential_energy_rating_raw: Vec<String> =
if let Ok(col) = df.column("potential_energy_rating") {
col.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect()
} else {
vec![String::new(); row_count]
};
// Sort all rows by spatial locality so that grid queries access
// contiguous memory (sequential reads instead of random DRAM accesses).
// Uses the same 0.01° grid cell as the spatial index for the sort key.
eprintln!("Sorting rows by spatial locality...");
let grid_cell_size = 0.01_f64;
let min_lat_val = lat.iter().cloned().fold(f64::INFINITY, f64::min) - grid_cell_size;
let min_lon_val = lon.iter().cloned().fold(f64::INFINITY, f64::min) - grid_cell_size;
let max_lon_val = lon.iter().cloned().fold(f64::NEG_INFINITY, f64::max) + grid_cell_size;
let grid_cols = ((max_lon_val - min_lon_val) / grid_cell_size).ceil() as u64 + 1;
let mut perm: Vec<u32> = (0..row_count as u32).collect();
perm.sort_unstable_by_key(|&i| {
let r = ((lat[i as usize] - min_lat_val) / grid_cell_size) as u64;
let c = ((lon[i as usize] - min_lon_val) / grid_cell_size) as u64;
r * grid_cols + c
});
// Apply permutation to lat/lon
let lat: Vec<f64> = perm.iter().map(|&i| lat[i as usize]).collect();
let lon: Vec<f64> = perm.iter().map(|&i| lon[i as usize]).collect();
// Apply permutation to string columns
let address: Vec<String> = perm
.iter()
.map(|&i| address_raw[i as usize].clone())
.collect();
let postcode: Vec<String> = perm
.iter()
.map(|&i| postcode_raw[i as usize].clone())
.collect();
let property_type: Vec<String> = perm
.iter()
.map(|&i| property_type_raw[i as usize].clone())
.collect();
let built_form: Vec<String> = perm
.iter()
.map(|&i| built_form_raw[i as usize].clone())
.collect();
let current_energy_rating: Vec<String> = perm
.iter()
.map(|&i| current_energy_rating_raw[i as usize].clone())
.collect();
let potential_energy_rating: Vec<String> = perm
.iter()
.map(|&i| potential_energy_rating_raw[i as usize].clone())
.collect();
// Transpose to row-major AND apply spatial permutation in one pass.
// Result: all features for one row are contiguous, and spatially
// nearby rows are adjacent in memory.
eprintln!("Transposing to row-major layout (spatially sorted)...");
let mut feature_data = vec![f64::NAN; row_count * num_features];
for (new_row, &old_row) in perm.iter().enumerate() {
let old = old_row as usize;
let dst_base = new_row * num_features;
for (feat_idx, col_vec) in col_major.iter().enumerate() {
feature_data[dst_base + feat_idx] = col_vec[old];
}
}
eprintln!("Data loading complete.");
PropertyData {
lat,
lon,
feature_names,
num_features,
feature_data,
feature_stats,
address,
postcode,
property_type,
built_form,
current_energy_rating,
potential_energy_rating,
}
}
}
/// Point of Interest data
#[derive(Serialize)]
pub struct POI {
pub id: String,
pub name: String,
pub category: String,
pub lat: f64,
pub lng: f64,
pub emoji: String,
}
/// Columnar storage for POI data
pub struct POIData {
pub id: Vec<String>,
pub name: Vec<String>,
pub category: Vec<String>,
pub lat: Vec<f64>,
pub lng: Vec<f64>,
pub emoji: Vec<String>,
}
impl POIData {
pub fn load(parquet_path: &Path) -> Self {
eprintln!("Loading POI data from {:?}...", parquet_path);
let df = LazyFrame::scan_parquet(parquet_path, Default::default())
.expect("Failed to scan POI parquet")
.collect()
.expect("Failed to read POI parquet");
let row_count = df.height();
eprintln!("Loaded {} POIs", row_count);
// Extract columns
let id: Vec<String> = df
.column("id")
.unwrap()
.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect();
let name: Vec<String> = df
.column("name")
.unwrap()
.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect();
let category: Vec<String> = df
.column("category")
.unwrap()
.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect();
let lat: Vec<f64> = df
.column("lat")
.unwrap()
.f64()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or(0.0))
.collect();
let lng: Vec<f64> = df
.column("lng")
.unwrap()
.f64()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or(0.0))
.collect();
let emoji: Vec<String> = df
.column("emoji")
.unwrap()
.str()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or("").to_string())
.collect();
eprintln!("POI data loading complete.");
POIData {
id,
name,
category,
lat,
lng,
emoji,
}
}
}