Rewrite server in rust
This commit is contained in:
parent
0cea9b873c
commit
bf2d5de156
13 changed files with 3875 additions and 547 deletions
2740
server-rs/Cargo.lock
generated
Normal file
2740
server-rs/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
19
server-rs/Cargo.toml
Normal file
19
server-rs/Cargo.toml
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
[package]
|
||||
name = "property-map-server"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
axum = "0.8"
|
||||
tower-http = { version = "0.6", features = ["cors", "fs", "compression-gzip"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
polars = { version = "0.46", features = ["parquet", "lazy", "dtype-struct", "dtype-u8", "dtype-u16", "dtype-i8", "dtype-i16"] }
|
||||
h3o = "0.7"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
rayon = "1"
|
||||
rustc-hash = "2"
|
||||
|
||||
[profile.release]
|
||||
opt-level = 3
|
||||
lto = true
|
||||
11
server-rs/src/consts.rs
Normal file
11
server-rs/src/consts.rs
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
/// Lower percentile for feature range reporting
|
||||
pub const FEATURE_PERCENTILE_LOW: f64 = 2.0;
|
||||
|
||||
/// Upper percentile for feature range reporting
|
||||
pub const FEATURE_PERCENTILE_HIGH: f64 = 98.0;
|
||||
|
||||
pub const HISTOGRAM_BINS: usize = 100;
|
||||
|
||||
/// H3 resolutions to precompute at startup (covers typical zoom levels)
|
||||
pub const H3_PRECOMPUTE_MIN: u8 = 4;
|
||||
pub const H3_PRECOMPUTE_MAX: u8 = 12;
|
||||
405
server-rs/src/data.rs
Normal file
405
server-rs/src/data.rs
Normal file
|
|
@ -0,0 +1,405 @@
|
|||
use polars::prelude::*;
|
||||
use polars::lazy::frame::LazyFrame;
|
||||
use rayon::prelude::*;
|
||||
use serde::Serialize;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::consts::{FEATURE_PERCENTILE_LOW, FEATURE_PERCENTILE_HIGH, HISTOGRAM_BINS, H3_PRECOMPUTE_MIN, H3_PRECOMPUTE_MAX};
|
||||
|
||||
/// Columns to exclude from feature discovery (not numeric features)
|
||||
const EXCLUDED_COLUMNS: &[&str] = &["lat", "lon"];
|
||||
|
||||
/// H3 valid resolution range (0-15)
|
||||
pub const MIN_RESOLUTION: u8 = 0;
|
||||
pub const MAX_RESOLUTION: u8 = 15;
|
||||
pub const DEFAULT_RESOLUTION: u8 = 8;
|
||||
|
||||
/// Returns true if the polars DataType is numeric (integer or float)
|
||||
fn is_numeric_dtype(dtype: &DataType) -> bool {
|
||||
matches!(
|
||||
dtype,
|
||||
DataType::Int8
|
||||
| DataType::Int16
|
||||
| DataType::Int32
|
||||
| DataType::Int64
|
||||
| DataType::UInt8
|
||||
| DataType::UInt16
|
||||
| DataType::UInt32
|
||||
| DataType::UInt64
|
||||
| DataType::Float32
|
||||
| DataType::Float64
|
||||
)
|
||||
}
|
||||
|
||||
/// Histogram for a single feature column
|
||||
#[derive(Serialize, Clone)]
|
||||
pub struct Histogram {
|
||||
/// Left edge of first bin
|
||||
pub min: f64,
|
||||
/// Right edge of last bin
|
||||
pub max: f64,
|
||||
/// Width of each bin
|
||||
pub bin_width: f64,
|
||||
/// Count of values in each bin
|
||||
pub counts: Vec<u64>,
|
||||
}
|
||||
|
||||
/// Precomputed statistics for a single feature
|
||||
pub struct FeatureStats {
|
||||
pub p_low: f64,
|
||||
pub p_high: f64,
|
||||
pub histogram: Histogram,
|
||||
}
|
||||
|
||||
/// Columnar storage for all property data.
|
||||
/// Feature values use NaN as the null sentinel.
|
||||
pub struct PropertyData {
|
||||
pub lat: Vec<f64>,
|
||||
pub lon: Vec<f64>,
|
||||
/// Dynamically discovered numeric feature column names
|
||||
pub feature_names: Vec<String>,
|
||||
/// Number of feature columns
|
||||
pub num_features: usize,
|
||||
/// Row-major flat array: feature_data[row * num_features + feat_idx].
|
||||
/// NaN = null. Contiguous layout for cache-friendly per-row access.
|
||||
pub feature_data: Vec<f64>,
|
||||
/// Precomputed stats (percentiles + histogram) for each feature
|
||||
pub feature_stats: Vec<FeatureStats>,
|
||||
}
|
||||
|
||||
/// Approximate a percentile from a histogram using linear interpolation.
|
||||
/// `p` is in [0, 100]. `total` is the sum of all bin counts.
|
||||
fn percentile_from_histogram(counts: &[u64], min: f64, bin_width: f64, total: usize, p: f64) -> f64 {
|
||||
let target = (p / 100.0) * (total as f64 - 1.0);
|
||||
let mut cumulative = 0u64;
|
||||
for (i, &c) in counts.iter().enumerate() {
|
||||
let prev = cumulative;
|
||||
cumulative += c;
|
||||
if cumulative as f64 > target {
|
||||
// Interpolate within this bin
|
||||
let frac = if c > 0 {
|
||||
(target - prev as f64) / c as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
return min + (i as f64 + frac) * bin_width;
|
||||
}
|
||||
}
|
||||
// Fallback: right edge of last bin
|
||||
min + counts.len() as f64 * bin_width
|
||||
}
|
||||
|
||||
/// Build a histogram and compute approximate percentiles in O(n) — no sort needed.
|
||||
fn compute_feature_stats(vals: &[f64]) -> FeatureStats {
|
||||
// Single pass: min, max, count (skipping NaN)
|
||||
let mut min = f64::INFINITY;
|
||||
let mut max = f64::NEG_INFINITY;
|
||||
let mut count = 0usize;
|
||||
for &v in vals {
|
||||
if !v.is_nan() {
|
||||
if v < min { min = v; }
|
||||
if v > max { max = v; }
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if count == 0 {
|
||||
return FeatureStats {
|
||||
p_low: 0.0,
|
||||
p_high: 0.0,
|
||||
histogram: Histogram {
|
||||
min: 0.0,
|
||||
max: 0.0,
|
||||
bin_width: 1.0,
|
||||
counts: vec![0; HISTOGRAM_BINS],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// Build histogram over full range (second pass, no sort)
|
||||
let range = if max == min { 1.0 } else { max - min };
|
||||
let bin_max = min + range * (1.0 + 1e-9);
|
||||
let bin_width = (bin_max - min) / HISTOGRAM_BINS as f64;
|
||||
|
||||
let mut counts = vec![0u64; HISTOGRAM_BINS];
|
||||
for &v in vals {
|
||||
if !v.is_nan() {
|
||||
let bin = ((v - min) / bin_width) as usize;
|
||||
counts[bin.min(HISTOGRAM_BINS - 1)] += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Approximate percentiles from the histogram
|
||||
let p_low = percentile_from_histogram(&counts, min, bin_width, count, FEATURE_PERCENTILE_LOW);
|
||||
let p_high = percentile_from_histogram(&counts, min, bin_width, count, FEATURE_PERCENTILE_HIGH);
|
||||
|
||||
FeatureStats {
|
||||
p_low,
|
||||
p_high,
|
||||
histogram: Histogram {
|
||||
min,
|
||||
max,
|
||||
bin_width,
|
||||
counts,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a polars Column to Vec<f64> using NaN for null values
|
||||
fn column_to_f64_vec(c: &Column) -> Vec<f64> {
|
||||
let s = c.cast(&DataType::Float64).unwrap();
|
||||
let ca = s.f64().unwrap();
|
||||
ca.into_iter().map(|v| v.unwrap_or(f64::NAN)).collect()
|
||||
}
|
||||
|
||||
/// Precompute H3 cell IDs for all rows at commonly used resolutions.
|
||||
/// Returns a Vec indexed by resolution (0..16), where non-precomputed
|
||||
/// resolutions have an empty Vec.
|
||||
pub fn precompute_h3(lat: &[f64], lon: &[f64]) -> Vec<Vec<u64>> {
|
||||
eprintln!(
|
||||
"Precomputing H3 cells for resolutions {}..{}...",
|
||||
H3_PRECOMPUTE_MIN, H3_PRECOMPUTE_MAX
|
||||
);
|
||||
|
||||
let resolutions: Vec<u8> = (H3_PRECOMPUTE_MIN..=H3_PRECOMPUTE_MAX).collect();
|
||||
let computed: Vec<(u8, Vec<u64>)> = resolutions
|
||||
.into_par_iter()
|
||||
.map(|res| {
|
||||
let h3_res = h3o::Resolution::try_from(res).unwrap();
|
||||
let cells: Vec<u64> = lat
|
||||
.iter()
|
||||
.zip(lon.iter())
|
||||
.map(|(&la, &lo)| {
|
||||
h3o::LatLng::new(la, lo)
|
||||
.map(|c| u64::from(c.to_cell(h3_res)))
|
||||
.unwrap_or(0)
|
||||
})
|
||||
.collect();
|
||||
eprintln!(" Resolution {} done ({} cells)", res, cells.len());
|
||||
(res, cells)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut result: Vec<Vec<u64>> = (0..16).map(|_| Vec::new()).collect();
|
||||
for (res, cells) in computed {
|
||||
result[res as usize] = cells;
|
||||
}
|
||||
|
||||
eprintln!("H3 precomputation complete.");
|
||||
result
|
||||
}
|
||||
|
||||
impl PropertyData {
|
||||
pub fn load(parquet_path: &Path) -> Self {
|
||||
eprintln!("Loading parquet from {:?}...", parquet_path);
|
||||
|
||||
// Scan schema to discover numeric feature columns
|
||||
let mut lf = LazyFrame::scan_parquet(parquet_path, Default::default())
|
||||
.expect("Failed to scan parquet");
|
||||
let schema = lf.collect_schema().expect("Failed to read schema");
|
||||
|
||||
let feature_names: Vec<String> = schema
|
||||
.iter()
|
||||
.filter(|(name, dtype)| {
|
||||
is_numeric_dtype(dtype) && !EXCLUDED_COLUMNS.contains(&name.as_str())
|
||||
})
|
||||
.map(|(name, _)| name.to_string())
|
||||
.collect();
|
||||
|
||||
let num_features = feature_names.len();
|
||||
eprintln!("Discovered {} numeric feature columns", num_features);
|
||||
|
||||
// Read only the columns we need
|
||||
let mut cols_needed: Vec<String> = vec!["lat".into(), "lon".into()];
|
||||
cols_needed.extend(feature_names.iter().cloned());
|
||||
|
||||
let df = LazyFrame::scan_parquet(parquet_path, Default::default())
|
||||
.expect("Failed to scan parquet")
|
||||
.select(
|
||||
cols_needed
|
||||
.iter()
|
||||
.map(|c| col(c.as_str()).cast(DataType::Float64))
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.collect()
|
||||
.expect("Failed to read parquet");
|
||||
|
||||
let row_count = df.height();
|
||||
eprintln!("Loaded {} rows", row_count);
|
||||
|
||||
// Extract lat/lon using bulk iterator
|
||||
let lat_series = df.column("lat").unwrap().cast(&DataType::Float64).unwrap();
|
||||
let lat: Vec<f64> = lat_series.f64().unwrap().into_iter().map(|v| v.unwrap_or(0.0)).collect();
|
||||
|
||||
let lon_series = df.column("lon").unwrap().cast(&DataType::Float64).unwrap();
|
||||
let lon: Vec<f64> = lon_series.f64().unwrap().into_iter().map(|v| v.unwrap_or(0.0)).collect();
|
||||
|
||||
// Extract feature columns (column-major, for cache-friendly histogram computation)
|
||||
eprintln!("Extracting feature columns...");
|
||||
let col_major: Vec<Vec<f64>> = feature_names
|
||||
.iter()
|
||||
.map(|name| {
|
||||
let s = df.column(name.as_str()).unwrap();
|
||||
column_to_f64_vec(s)
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Compute histograms in parallel (column-major is ideal for per-column iteration)
|
||||
eprintln!("Computing histograms...");
|
||||
let feature_stats: Vec<FeatureStats> = col_major
|
||||
.par_iter()
|
||||
.enumerate()
|
||||
.map(|(i, vals)| {
|
||||
let stats = compute_feature_stats(vals);
|
||||
eprintln!(
|
||||
" {}: p{}={:.2}, p{}={:.2}, {} bins",
|
||||
feature_names[i],
|
||||
FEATURE_PERCENTILE_LOW, stats.p_low,
|
||||
FEATURE_PERCENTILE_HIGH, stats.p_high,
|
||||
stats.histogram.counts.len()
|
||||
);
|
||||
stats
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Sort all rows by spatial locality so that grid queries access
|
||||
// contiguous memory (sequential reads instead of random DRAM accesses).
|
||||
// Uses the same 0.01° grid cell as the spatial index for the sort key.
|
||||
eprintln!("Sorting rows by spatial locality...");
|
||||
let grid_cell_size = 0.01_f64;
|
||||
let min_lat_val = lat.iter().cloned().fold(f64::INFINITY, f64::min) - grid_cell_size;
|
||||
let min_lon_val = lon.iter().cloned().fold(f64::INFINITY, f64::min) - grid_cell_size;
|
||||
let max_lon_val = lon.iter().cloned().fold(f64::NEG_INFINITY, f64::max) + grid_cell_size;
|
||||
let grid_cols = ((max_lon_val - min_lon_val) / grid_cell_size).ceil() as u64 + 1;
|
||||
|
||||
let mut perm: Vec<u32> = (0..row_count as u32).collect();
|
||||
perm.sort_unstable_by_key(|&i| {
|
||||
let r = ((lat[i as usize] - min_lat_val) / grid_cell_size) as u64;
|
||||
let c = ((lon[i as usize] - min_lon_val) / grid_cell_size) as u64;
|
||||
r * grid_cols + c
|
||||
});
|
||||
|
||||
// Apply permutation to lat/lon
|
||||
let lat: Vec<f64> = perm.iter().map(|&i| lat[i as usize]).collect();
|
||||
let lon: Vec<f64> = perm.iter().map(|&i| lon[i as usize]).collect();
|
||||
|
||||
// Transpose to row-major AND apply spatial permutation in one pass.
|
||||
// Result: all features for one row are contiguous, and spatially
|
||||
// nearby rows are adjacent in memory.
|
||||
eprintln!("Transposing to row-major layout (spatially sorted)...");
|
||||
let mut feature_data = vec![f64::NAN; row_count * num_features];
|
||||
for (new_row, &old_row) in perm.iter().enumerate() {
|
||||
let old = old_row as usize;
|
||||
let dst_base = new_row * num_features;
|
||||
for (feat_idx, col_vec) in col_major.iter().enumerate() {
|
||||
feature_data[dst_base + feat_idx] = col_vec[old];
|
||||
}
|
||||
}
|
||||
|
||||
eprintln!("Data loading complete.");
|
||||
|
||||
PropertyData {
|
||||
lat,
|
||||
lon,
|
||||
feature_names,
|
||||
num_features,
|
||||
feature_data,
|
||||
feature_stats,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Point of Interest data
|
||||
#[derive(Serialize)]
|
||||
pub struct POI {
|
||||
pub id: String,
|
||||
pub name: String,
|
||||
pub category: String,
|
||||
pub lat: f64,
|
||||
pub lng: f64,
|
||||
pub emoji: String,
|
||||
}
|
||||
|
||||
/// Columnar storage for POI data
|
||||
pub struct POIData {
|
||||
pub id: Vec<String>,
|
||||
pub name: Vec<String>,
|
||||
pub category: Vec<String>,
|
||||
pub lat: Vec<f64>,
|
||||
pub lng: Vec<f64>,
|
||||
pub emoji: Vec<String>,
|
||||
}
|
||||
|
||||
impl POIData {
|
||||
pub fn load(parquet_path: &Path) -> Self {
|
||||
eprintln!("Loading POI data from {:?}...", parquet_path);
|
||||
|
||||
let df = LazyFrame::scan_parquet(parquet_path, Default::default())
|
||||
.expect("Failed to scan POI parquet")
|
||||
.collect()
|
||||
.expect("Failed to read POI parquet");
|
||||
|
||||
let row_count = df.height();
|
||||
eprintln!("Loaded {} POIs", row_count);
|
||||
|
||||
// Extract columns
|
||||
let id: Vec<String> = df.column("id")
|
||||
.unwrap()
|
||||
.str()
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|v| v.unwrap_or("").to_string())
|
||||
.collect();
|
||||
|
||||
let name: Vec<String> = df.column("name")
|
||||
.unwrap()
|
||||
.str()
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|v| v.unwrap_or("").to_string())
|
||||
.collect();
|
||||
|
||||
let category: Vec<String> = df.column("category")
|
||||
.unwrap()
|
||||
.str()
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|v| v.unwrap_or("").to_string())
|
||||
.collect();
|
||||
|
||||
let lat: Vec<f64> = df.column("lat")
|
||||
.unwrap()
|
||||
.f64()
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|v| v.unwrap_or(0.0))
|
||||
.collect();
|
||||
|
||||
let lng: Vec<f64> = df.column("lng")
|
||||
.unwrap()
|
||||
.f64()
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|v| v.unwrap_or(0.0))
|
||||
.collect();
|
||||
|
||||
let emoji: Vec<String> = df.column("emoji")
|
||||
.unwrap()
|
||||
.str()
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|v| v.unwrap_or("").to_string())
|
||||
.collect();
|
||||
|
||||
eprintln!("POI data loading complete.");
|
||||
|
||||
POIData {
|
||||
id,
|
||||
name,
|
||||
category,
|
||||
lat,
|
||||
lng,
|
||||
emoji,
|
||||
}
|
||||
}
|
||||
}
|
||||
130
server-rs/src/index.rs
Normal file
130
server-rs/src/index.rs
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
/// Grid-based spatial index for fast rectangle queries over property rows.
|
||||
///
|
||||
/// Divides the UK bounding box into cells of ~0.01 degrees (~1km),
|
||||
/// each storing indices of rows whose lat/lon falls within that cell.
|
||||
|
||||
pub struct GridIndex {
|
||||
min_lat: f64,
|
||||
min_lon: f64,
|
||||
cell_size: f64,
|
||||
cols: usize,
|
||||
rows: usize,
|
||||
/// cells[row * cols + col] = vec of row indices
|
||||
cells: Vec<Vec<u32>>,
|
||||
}
|
||||
|
||||
impl GridIndex {
|
||||
/// Build the grid index from lat/lon arrays.
|
||||
pub fn build(lat: &[f64], lon: &[f64], cell_size: f64) -> Self {
|
||||
// Compute bounding box with a small margin
|
||||
let mut min_lat = f64::INFINITY;
|
||||
let mut max_lat = f64::NEG_INFINITY;
|
||||
let mut min_lon = f64::INFINITY;
|
||||
let mut max_lon = f64::NEG_INFINITY;
|
||||
|
||||
for i in 0..lat.len() {
|
||||
let la = lat[i];
|
||||
let lo = lon[i];
|
||||
if la < min_lat {
|
||||
min_lat = la;
|
||||
}
|
||||
if la > max_lat {
|
||||
max_lat = la;
|
||||
}
|
||||
if lo < min_lon {
|
||||
min_lon = lo;
|
||||
}
|
||||
if lo > max_lon {
|
||||
max_lon = lo;
|
||||
}
|
||||
}
|
||||
|
||||
// Add margin
|
||||
min_lat -= cell_size;
|
||||
min_lon -= cell_size;
|
||||
max_lat += cell_size;
|
||||
max_lon += cell_size;
|
||||
|
||||
let rows = ((max_lat - min_lat) / cell_size).ceil() as usize + 1;
|
||||
let cols = ((max_lon - min_lon) / cell_size).ceil() as usize + 1;
|
||||
|
||||
eprintln!(
|
||||
"Building grid index: {}x{} cells ({} total), cell_size={}",
|
||||
rows,
|
||||
cols,
|
||||
rows * cols,
|
||||
cell_size
|
||||
);
|
||||
|
||||
let mut cells: Vec<Vec<u32>> = vec![Vec::new(); rows * cols];
|
||||
|
||||
for i in 0..lat.len() {
|
||||
let r = ((lat[i] - min_lat) / cell_size) as usize;
|
||||
let c = ((lon[i] - min_lon) / cell_size) as usize;
|
||||
let idx = r * cols + c;
|
||||
cells[idx].push(i as u32);
|
||||
}
|
||||
|
||||
eprintln!("Grid index built.");
|
||||
|
||||
GridIndex {
|
||||
min_lat,
|
||||
min_lon,
|
||||
cell_size,
|
||||
cols,
|
||||
rows,
|
||||
cells,
|
||||
}
|
||||
}
|
||||
|
||||
/// Query all row indices within the given bounding box.
|
||||
pub fn query(&self, south: f64, west: f64, north: f64, east: f64) -> Vec<u32> {
|
||||
let (r_min, r_max, c_min, c_max) = self.clamp_bounds(south, west, north, east);
|
||||
|
||||
let mut result = Vec::new();
|
||||
for r in r_min..=r_max {
|
||||
let row_start = r * self.cols;
|
||||
for c in c_min..=c_max {
|
||||
result.extend_from_slice(&self.cells[row_start + c]);
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Iterate all row indices in bounds without allocating a Vec.
|
||||
#[inline]
|
||||
pub fn for_each_in_bounds(
|
||||
&self,
|
||||
south: f64,
|
||||
west: f64,
|
||||
north: f64,
|
||||
east: f64,
|
||||
mut f: impl FnMut(u32),
|
||||
) {
|
||||
let (r_min, r_max, c_min, c_max) = self.clamp_bounds(south, west, north, east);
|
||||
|
||||
for r in r_min..=r_max {
|
||||
let row_start = r * self.cols;
|
||||
for c in c_min..=c_max {
|
||||
for &row_idx in &self.cells[row_start + c] {
|
||||
f(row_idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn clamp_bounds(&self, south: f64, west: f64, north: f64, east: f64) -> (usize, usize, usize, usize) {
|
||||
let r_min = ((south - self.min_lat) / self.cell_size) as isize;
|
||||
let r_max = ((north - self.min_lat) / self.cell_size) as isize;
|
||||
let c_min = ((west - self.min_lon) / self.cell_size) as isize;
|
||||
let c_max = ((east - self.min_lon) / self.cell_size) as isize;
|
||||
|
||||
let r_min = r_min.max(0) as usize;
|
||||
let r_max = (r_max.min(self.rows as isize - 1)).max(0) as usize;
|
||||
let c_min = c_min.max(0) as usize;
|
||||
let c_max = (c_max.min(self.cols as isize - 1)).max(0) as usize;
|
||||
|
||||
(r_min, r_max, c_min, c_max)
|
||||
}
|
||||
}
|
||||
109
server-rs/src/main.rs
Normal file
109
server-rs/src/main.rs
Normal file
|
|
@ -0,0 +1,109 @@
|
|||
mod consts;
|
||||
mod data;
|
||||
mod index;
|
||||
mod routes;
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::routing::get;
|
||||
use axum::Router;
|
||||
use tower_http::compression::CompressionLayer;
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
use tower_http::services::ServeDir;
|
||||
|
||||
use routes::AppState;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let parquet_path = PathBuf::from(
|
||||
std::env::args()
|
||||
.nth(1)
|
||||
.unwrap_or_else(|| "data_sources/processed/wide.parquet".to_string()),
|
||||
);
|
||||
if !parquet_path.exists() {
|
||||
eprintln!("Error: {} not found.", parquet_path.display());
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
// Load property data and build indices
|
||||
let property_data = data::PropertyData::load(&parquet_path);
|
||||
let grid = index::GridIndex::build(&property_data.lat, &property_data.lon, 0.01);
|
||||
let h3_cells = data::precompute_h3(&property_data.lat, &property_data.lon);
|
||||
|
||||
// Load POI data and build spatial index
|
||||
// Derive POI path from the data parquet path (same directory)
|
||||
let poi_path = parquet_path
|
||||
.parent()
|
||||
.and_then(|p| p.parent())
|
||||
.map(|p| p.join("filtered_uk_pois.parquet"))
|
||||
.unwrap_or_else(|| PathBuf::from("data_sources/filtered_uk_pois.parquet"));
|
||||
|
||||
let poi_data = if poi_path.exists() {
|
||||
data::POIData::load(&poi_path)
|
||||
} else {
|
||||
eprintln!("Warning: {} not found. POI endpoints will be unavailable.", poi_path.display());
|
||||
data::POIData {
|
||||
id: Vec::new(),
|
||||
name: Vec::new(),
|
||||
category: Vec::new(),
|
||||
lat: Vec::new(),
|
||||
lng: Vec::new(),
|
||||
emoji: Vec::new(),
|
||||
}
|
||||
};
|
||||
let poi_grid = index::GridIndex::build(&poi_data.lat, &poi_data.lng, 0.01);
|
||||
|
||||
let state = Arc::new(AppState {
|
||||
data: property_data,
|
||||
grid,
|
||||
h3_cells,
|
||||
poi_data,
|
||||
poi_grid,
|
||||
});
|
||||
|
||||
let cors = CorsLayer::new()
|
||||
.allow_origin(Any)
|
||||
.allow_methods(Any)
|
||||
.allow_headers(Any);
|
||||
|
||||
// API routes
|
||||
let state_features = state.clone();
|
||||
let state_hexagons = state.clone();
|
||||
let state_pois = state.clone();
|
||||
let state_poi_categories = state.clone();
|
||||
|
||||
let api = Router::new()
|
||||
.route(
|
||||
"/api/features",
|
||||
get(move || routes::get_features(state_features.clone())),
|
||||
)
|
||||
.route(
|
||||
"/api/hexagons",
|
||||
get(move |query| routes::get_hexagons(state_hexagons.clone(), query)),
|
||||
)
|
||||
.route(
|
||||
"/api/pois",
|
||||
get(move |query| routes::get_pois(state_pois.clone(), query)),
|
||||
)
|
||||
.route(
|
||||
"/api/poi-categories",
|
||||
get(move || routes::get_poi_categories(state_poi_categories.clone())),
|
||||
);
|
||||
|
||||
// Static file serving for frontend
|
||||
let frontend_dist = PathBuf::from("frontend/dist");
|
||||
let app = if frontend_dist.exists() {
|
||||
api.fallback_service(ServeDir::new(frontend_dist))
|
||||
} else {
|
||||
api
|
||||
};
|
||||
|
||||
let app = app.layer(cors).layer(CompressionLayer::new().gzip(true));
|
||||
|
||||
let addr = "0.0.0.0:8001";
|
||||
eprintln!("Server listening on {}", addr);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
}
|
||||
461
server-rs/src/routes.rs
Normal file
461
server-rs/src/routes.rs
Normal file
|
|
@ -0,0 +1,461 @@
|
|||
use std::fmt::Write;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::Query;
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Json};
|
||||
use rustc_hash::FxHashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::data::{Histogram, PropertyData, POIData, POI, DEFAULT_RESOLUTION, MAX_RESOLUTION, MIN_RESOLUTION};
|
||||
use crate::index::GridIndex;
|
||||
|
||||
/// Shared application state
|
||||
pub struct AppState {
|
||||
pub data: PropertyData,
|
||||
pub grid: GridIndex,
|
||||
/// h3_cells[resolution][row_idx] = precomputed H3 cell ID.
|
||||
/// Empty Vec for resolutions not precomputed.
|
||||
pub h3_cells: Vec<Vec<u64>>,
|
||||
pub poi_data: POIData,
|
||||
pub poi_grid: GridIndex,
|
||||
}
|
||||
|
||||
const BOUNDS_BUFFER_PERCENT: f64 = 0.2;
|
||||
|
||||
// ── /api/features ──
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct FeatureInfo {
|
||||
name: String,
|
||||
min: f64,
|
||||
max: f64,
|
||||
label: String,
|
||||
histogram: Histogram,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct FeaturesResponse {
|
||||
features: Vec<FeatureInfo>,
|
||||
}
|
||||
|
||||
fn snake_to_label(name: &str) -> String {
|
||||
name.split('_')
|
||||
.map(|word| {
|
||||
let mut chars = word.chars();
|
||||
match chars.next() {
|
||||
None => String::new(),
|
||||
Some(c) => {
|
||||
let mut s = c.to_uppercase().to_string();
|
||||
s.extend(chars);
|
||||
s
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join(" ")
|
||||
}
|
||||
|
||||
pub async fn get_features(state: Arc<AppState>) -> Json<FeaturesResponse> {
|
||||
let features = state
|
||||
.data
|
||||
.feature_names
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, name): (usize, &String)| {
|
||||
let stats = &state.data.feature_stats[i];
|
||||
FeatureInfo {
|
||||
name: name.clone(),
|
||||
min: stats.p_low,
|
||||
max: stats.p_high,
|
||||
label: snake_to_label(name),
|
||||
histogram: stats.histogram.clone(),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Json(FeaturesResponse { features })
|
||||
}
|
||||
|
||||
// ── /api/hexagons ──
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct HexagonParams {
|
||||
resolution: Option<u8>,
|
||||
bounds: Option<String>,
|
||||
/// Comma-separated filters: `name:min:max,...`
|
||||
/// Rows must have non-NaN values within [min,max] for each filter.
|
||||
filters: Option<String>,
|
||||
}
|
||||
|
||||
struct ParsedFilter {
|
||||
feat_idx: usize,
|
||||
min: f64,
|
||||
max: f64,
|
||||
}
|
||||
|
||||
/// Per-cell accumulator for aggregating features
|
||||
struct CellAgg {
|
||||
count: u32,
|
||||
mins: Vec<f64>,
|
||||
maxs: Vec<f64>,
|
||||
}
|
||||
|
||||
impl CellAgg {
|
||||
fn new(num_features: usize) -> Self {
|
||||
CellAgg {
|
||||
count: 0,
|
||||
mins: vec![f64::INFINITY; num_features],
|
||||
maxs: vec![f64::NEG_INFINITY; num_features],
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a row using row-major feature_data layout.
|
||||
/// feature_data[row * num_features + feat_idx] — all features for one row
|
||||
/// are contiguous, so this reads a single cache line per ~8 features.
|
||||
#[inline]
|
||||
fn add_row(&mut self, feature_data: &[f64], row: usize, num_features: usize) {
|
||||
self.count += 1;
|
||||
let base = row * num_features;
|
||||
let row_slice = &feature_data[base..base + num_features];
|
||||
for (i, &v) in row_slice.iter().enumerate() {
|
||||
if v.is_finite() {
|
||||
if v < self.mins[i] {
|
||||
self.mins[i] = v;
|
||||
}
|
||||
if v > self.maxs[i] {
|
||||
self.maxs[i] = v;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Write the hexagons JSON response directly to a String buffer,
|
||||
/// avoiding serde_json::Value allocations entirely.
|
||||
fn write_hexagons_json(
|
||||
buf: &mut String,
|
||||
groups: &FxHashMap<u64, CellAgg>,
|
||||
min_keys: &[String],
|
||||
max_keys: &[String],
|
||||
num_features: usize,
|
||||
) {
|
||||
buf.push_str("{\"features\":[");
|
||||
let mut first = true;
|
||||
for (&cell_id, agg) in groups {
|
||||
if !first {
|
||||
buf.push(',');
|
||||
}
|
||||
first = false;
|
||||
|
||||
let cell = h3o::CellIndex::try_from(cell_id).unwrap();
|
||||
write!(buf, "{{\"h3\":\"{}\",\"count\":{}", cell, agg.count).unwrap();
|
||||
|
||||
for i in 0..num_features {
|
||||
if agg.mins[i] != f64::INFINITY {
|
||||
write!(
|
||||
buf,
|
||||
",\"{}\":{},\"{}\":{}",
|
||||
min_keys[i], agg.mins[i], max_keys[i], agg.maxs[i]
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
buf.push('}');
|
||||
}
|
||||
buf.push_str("]}");
|
||||
}
|
||||
|
||||
pub async fn get_hexagons(
|
||||
state: Arc<AppState>,
|
||||
Query(params): Query<HexagonParams>,
|
||||
) -> Result<impl IntoResponse, (StatusCode, String)> {
|
||||
let resolution = params.resolution.unwrap_or(DEFAULT_RESOLUTION);
|
||||
if resolution > MAX_RESOLUTION {
|
||||
return Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!(
|
||||
"resolution must be between {} and {}",
|
||||
MIN_RESOLUTION, MAX_RESOLUTION
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let bounds_str = params
|
||||
.bounds
|
||||
.ok_or((StatusCode::BAD_REQUEST, "bounds parameter is required".into()))?;
|
||||
|
||||
let parts: Vec<f64> = bounds_str
|
||||
.split(',')
|
||||
.map(|s| s.trim().parse::<f64>())
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|_| {
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"Invalid bounds format. Use: south,west,north,east".into(),
|
||||
)
|
||||
})?;
|
||||
|
||||
if parts.len() != 4 {
|
||||
return Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
"Invalid bounds format. Use: south,west,north,east".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let (mut south, mut west, mut north, mut east) = (parts[0], parts[1], parts[2], parts[3]);
|
||||
|
||||
// Apply bounds buffer (20%)
|
||||
let lat_range = north - south;
|
||||
let lng_range = east - west;
|
||||
south -= lat_range * BOUNDS_BUFFER_PERCENT;
|
||||
north += lat_range * BOUNDS_BUFFER_PERCENT;
|
||||
west -= lng_range * BOUNDS_BUFFER_PERCENT;
|
||||
east += lng_range * BOUNDS_BUFFER_PERCENT;
|
||||
|
||||
// Quantize to 0.01 degree precision
|
||||
let precision = 0.01;
|
||||
south = (south / precision).floor() * precision;
|
||||
west = (west / precision).floor() * precision;
|
||||
north = (north / precision).ceil() * precision;
|
||||
east = (east / precision).ceil() * precision;
|
||||
|
||||
// Parse filters: `name:min:max,...`
|
||||
let parsed_filters: Vec<ParsedFilter> = params
|
||||
.filters
|
||||
.as_deref()
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(|s| {
|
||||
s.split(',')
|
||||
.filter_map(|entry| {
|
||||
let parts: Vec<&str> = entry.splitn(3, ':').collect();
|
||||
if parts.len() != 3 {
|
||||
return None;
|
||||
}
|
||||
let name = parts[0].trim();
|
||||
let min = parts[1].trim().parse::<f64>().ok()?;
|
||||
let max = parts[2].trim().parse::<f64>().ok()?;
|
||||
let feat_idx = state.data.feature_names.iter().position(|n| n == name)?;
|
||||
Some(ParsedFilter { feat_idx, min, max })
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
// Move CPU-heavy work off the async executor
|
||||
let json_body = tokio::task::spawn_blocking(move || {
|
||||
let t0 = std::time::Instant::now();
|
||||
|
||||
let num_features = state.data.num_features;
|
||||
let feature_data = &state.data.feature_data;
|
||||
|
||||
// Pre-compute JSON key strings once
|
||||
let min_keys: Vec<String> = state
|
||||
.data
|
||||
.feature_names
|
||||
.iter()
|
||||
.map(|n| format!("min_{}", n))
|
||||
.collect();
|
||||
let max_keys: Vec<String> = state
|
||||
.data
|
||||
.feature_names
|
||||
.iter()
|
||||
.map(|n| format!("max_{}", n))
|
||||
.collect();
|
||||
|
||||
// Use precomputed H3 cells if available
|
||||
let h3_cells_for_res: Option<&[u64]> = state
|
||||
.h3_cells
|
||||
.get(resolution as usize)
|
||||
.filter(|v| !v.is_empty())
|
||||
.map(|v| v.as_slice());
|
||||
|
||||
// Aggregate using FxHashMap (fast non-crypto hash for integer keys)
|
||||
// and grid visitor (no intermediate Vec<u32> allocation)
|
||||
let mut groups: FxHashMap<u64, CellAgg> = FxHashMap::default();
|
||||
|
||||
// Row-level filter check: value must be non-NaN and within [min, max]
|
||||
let row_passes = |row: usize| -> bool {
|
||||
parsed_filters.iter().all(|f| {
|
||||
let v = feature_data[row * num_features + f.feat_idx];
|
||||
v.is_finite() && v >= f.min && v <= f.max
|
||||
})
|
||||
};
|
||||
|
||||
if let Some(precomputed) = h3_cells_for_res {
|
||||
// Fast path: precomputed H3 + visitor pattern
|
||||
state.grid.for_each_in_bounds(south, west, north, east, |row_idx| {
|
||||
let row = row_idx as usize;
|
||||
if !row_passes(row) {
|
||||
return;
|
||||
}
|
||||
let cell_id = precomputed[row];
|
||||
groups
|
||||
.entry(cell_id)
|
||||
.or_insert_with(|| CellAgg::new(num_features))
|
||||
.add_row(feature_data, row, num_features);
|
||||
});
|
||||
} else {
|
||||
// Fallback: compute H3 on-the-fly
|
||||
let h3_res = h3o::Resolution::try_from(resolution).unwrap();
|
||||
state.grid.for_each_in_bounds(south, west, north, east, |row_idx| {
|
||||
let row = row_idx as usize;
|
||||
if !row_passes(row) {
|
||||
return;
|
||||
}
|
||||
let cell_id = h3o::LatLng::new(state.data.lat[row], state.data.lon[row])
|
||||
.map(|c| u64::from(c.to_cell(h3_res)))
|
||||
.unwrap_or(0);
|
||||
groups
|
||||
.entry(cell_id)
|
||||
.or_insert_with(|| CellAgg::new(num_features))
|
||||
.add_row(feature_data, row, num_features);
|
||||
});
|
||||
}
|
||||
|
||||
let t_agg = t0.elapsed();
|
||||
|
||||
// Write JSON directly (no serde_json::Value allocation overhead)
|
||||
let mut json_buf = String::with_capacity(groups.len() * 128);
|
||||
write_hexagons_json(
|
||||
&mut json_buf,
|
||||
&groups,
|
||||
&min_keys,
|
||||
&max_keys,
|
||||
num_features,
|
||||
);
|
||||
|
||||
let t_total = t0.elapsed();
|
||||
eprintln!(
|
||||
"hexagons: res={} cells={} agg={:?} json={:?} total={:?} bytes={}",
|
||||
resolution,
|
||||
groups.len(),
|
||||
t_agg,
|
||||
t_total - t_agg,
|
||||
t_total,
|
||||
json_buf.len()
|
||||
);
|
||||
|
||||
json_buf
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Ok(([("content-type", "application/json")], json_body))
|
||||
}
|
||||
|
||||
// ── /api/pois ──
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct POIParams {
|
||||
bounds: Option<String>,
|
||||
/// Comma-separated list of categories to filter by
|
||||
categories: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct POIsResponse {
|
||||
pois: Vec<POI>,
|
||||
}
|
||||
|
||||
pub async fn get_pois(
|
||||
state: Arc<AppState>,
|
||||
Query(params): Query<POIParams>,
|
||||
) -> Result<Json<POIsResponse>, (StatusCode, String)> {
|
||||
let bounds_str = params
|
||||
.bounds
|
||||
.ok_or((StatusCode::BAD_REQUEST, "bounds parameter is required".into()))?;
|
||||
|
||||
let parts: Vec<f64> = bounds_str
|
||||
.split(',')
|
||||
.map(|s| s.trim().parse::<f64>())
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|_| {
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"Invalid bounds format. Use: south,west,north,east".into(),
|
||||
)
|
||||
})?;
|
||||
|
||||
if parts.len() != 4 {
|
||||
return Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
"Invalid bounds format. Use: south,west,north,east".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let (south, west, north, east) = (parts[0], parts[1], parts[2], parts[3]);
|
||||
|
||||
// Parse category filter if provided
|
||||
let category_filter: Option<Vec<String>> = params
|
||||
.categories
|
||||
.as_deref()
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(|s| s.split(',').map(|c| c.trim().to_string()).collect());
|
||||
|
||||
// Move CPU-heavy work off the async executor
|
||||
let result = tokio::task::spawn_blocking(move || {
|
||||
// Spatial query using grid index
|
||||
let row_indices = state.poi_grid.query(south, west, north, east);
|
||||
|
||||
let pois: Vec<POI> = row_indices
|
||||
.iter()
|
||||
.filter_map(|&row_idx| {
|
||||
let row = row_idx as usize;
|
||||
|
||||
// Apply category filter if specified
|
||||
if let Some(ref categories) = category_filter {
|
||||
if !categories.contains(&state.poi_data.category[row]) {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
Some(POI {
|
||||
id: state.poi_data.id[row].clone(),
|
||||
name: state.poi_data.name[row].clone(),
|
||||
category: state.poi_data.category[row].clone(),
|
||||
lat: state.poi_data.lat[row],
|
||||
lng: state.poi_data.lng[row],
|
||||
emoji: state.poi_data.emoji[row].clone(),
|
||||
})
|
||||
})
|
||||
.take(5000)
|
||||
.collect();
|
||||
|
||||
POIsResponse { pois }
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Ok(Json(result))
|
||||
}
|
||||
|
||||
// ── /api/poi-categories ──
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct POICategoriesResponse {
|
||||
categories: Vec<String>,
|
||||
}
|
||||
|
||||
pub async fn get_poi_categories(state: Arc<AppState>) -> Json<POICategoriesResponse> {
|
||||
// Compute unique categories
|
||||
let result = tokio::task::spawn_blocking(move || {
|
||||
let mut categories: Vec<String> = state
|
||||
.poi_data
|
||||
.category
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<std::collections::HashSet<_>>()
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
categories.sort();
|
||||
|
||||
POICategoriesResponse { categories }
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Json(result)
|
||||
}
|
||||
|
|
@ -1,18 +0,0 @@
|
|||
"""Server configuration - imports shared values from pipeline config."""
|
||||
|
||||
from pipeline.config import (
|
||||
AGGREGATES_DIR,
|
||||
H3_RESOLUTIONS as VALID_RESOLUTIONS,
|
||||
DEFAULT_H3_RESOLUTION as DEFAULT_RESOLUTION,
|
||||
)
|
||||
|
||||
# Extra area to return beyond requested bounds (0.2 = 20%)
|
||||
# Makes panning smoother by preloading nearby hexagons
|
||||
BOUNDS_BUFFER_PERCENT = 0.2
|
||||
|
||||
__all__ = [
|
||||
"AGGREGATES_DIR",
|
||||
"VALID_RESOLUTIONS",
|
||||
"DEFAULT_RESOLUTION",
|
||||
"BOUNDS_BUFFER_PERCENT",
|
||||
]
|
||||
|
|
@ -1,35 +0,0 @@
|
|||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
|
||||
from server.routes import hexagons, pois
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
# Startup: preload all parquet files
|
||||
hexagons.preload_dataframes()
|
||||
pois.preload_pois()
|
||||
yield
|
||||
# Shutdown: nothing to clean up
|
||||
|
||||
|
||||
app = FastAPI(title="Property Map API", lifespan=lifespan)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=False, # Cannot use True with wildcard origins
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
app.include_router(hexagons.router, prefix="/api")
|
||||
app.include_router(pois.router, prefix="/api")
|
||||
|
||||
# Mount static files for production (frontend build)
|
||||
frontend_dist = Path(__file__).parent.parent / "frontend" / "dist"
|
||||
if frontend_dist.exists():
|
||||
app.mount("/", StaticFiles(directory=frontend_dist, html=True), name="static")
|
||||
|
|
@ -1,172 +0,0 @@
|
|||
import math
|
||||
from functools import lru_cache
|
||||
from fastapi import APIRouter, Query, HTTPException
|
||||
import polars as pl
|
||||
import h3
|
||||
|
||||
from tqdm import tqdm
|
||||
|
||||
from server.config import (
|
||||
AGGREGATES_DIR,
|
||||
VALID_RESOLUTIONS,
|
||||
DEFAULT_RESOLUTION,
|
||||
BOUNDS_BUFFER_PERCENT,
|
||||
)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
# Cache loaded dataframes in memory (one per resolution)
|
||||
_df_cache: dict[int, pl.DataFrame] = {}
|
||||
|
||||
# Discovered features (computed once on first load)
|
||||
_features_cache: list[dict] | None = None
|
||||
|
||||
|
||||
def _snake_to_label(name: str) -> str:
|
||||
"""Convert snake_case feature name to a human-readable label."""
|
||||
return name.replace("_", " ").title()
|
||||
|
||||
|
||||
def _discover_features(df: pl.DataFrame) -> list[dict]:
|
||||
"""Discover features from column pairs min_X / max_X."""
|
||||
features = []
|
||||
seen = set()
|
||||
for col in df.columns:
|
||||
if col.startswith("min_"):
|
||||
name = col[4:]
|
||||
max_col = f"max_{name}"
|
||||
if max_col in df.columns and name not in seen:
|
||||
seen.add(name)
|
||||
global_min = df[col].min()
|
||||
global_max = df[max_col].max()
|
||||
if global_min is not None and global_max is not None:
|
||||
features.append(
|
||||
{
|
||||
"name": name,
|
||||
"min": float(global_min),
|
||||
"max": float(global_max),
|
||||
"label": _snake_to_label(name),
|
||||
}
|
||||
)
|
||||
return features
|
||||
|
||||
|
||||
def preload_dataframes() -> None:
|
||||
"""Load all resolution dataframes into cache on startup."""
|
||||
for resolution in tqdm(VALID_RESOLUTIONS, desc="Loading parquet files"):
|
||||
get_cached_df(resolution)
|
||||
|
||||
|
||||
def get_cached_df(resolution: int) -> pl.DataFrame | None:
|
||||
"""Get cached dataframe for resolution, loading from disk if needed."""
|
||||
if resolution not in _df_cache:
|
||||
parquet_path = AGGREGATES_DIR / f"res{resolution}.parquet"
|
||||
if not parquet_path.exists():
|
||||
return None
|
||||
# Load and add H3 cell centroids for fast bbox filtering
|
||||
df = pl.read_parquet(parquet_path)
|
||||
|
||||
# Pre-compute cell centroids for bbox filtering
|
||||
centroids = [h3.cell_to_latlng(cell) for cell in df["h3"].to_list()]
|
||||
df = df.with_columns(
|
||||
[
|
||||
pl.Series("_lat", [c[0] for c in centroids]),
|
||||
pl.Series("_lng", [c[1] for c in centroids]),
|
||||
]
|
||||
)
|
||||
_df_cache[resolution] = df
|
||||
return _df_cache[resolution]
|
||||
|
||||
|
||||
def get_features() -> list[dict]:
|
||||
"""Get discovered features, computing from the first available resolution."""
|
||||
global _features_cache
|
||||
if _features_cache is None:
|
||||
for resolution in VALID_RESOLUTIONS:
|
||||
df = get_cached_df(resolution)
|
||||
if df is not None:
|
||||
_features_cache = _discover_features(df)
|
||||
break
|
||||
if _features_cache is None:
|
||||
_features_cache = []
|
||||
return _features_cache
|
||||
|
||||
|
||||
@router.get("/features")
|
||||
async def get_features_endpoint() -> dict:
|
||||
"""Return discovered feature metadata with global min/max ranges."""
|
||||
return {"features": get_features()}
|
||||
|
||||
|
||||
@lru_cache(maxsize=128)
|
||||
def query_hexagons_cached(
|
||||
resolution: int,
|
||||
bounds_tuple: tuple[float, float, float, float],
|
||||
) -> list[dict]:
|
||||
"""Cached query - returns features list."""
|
||||
south, west, north, east = bounds_tuple
|
||||
|
||||
df = get_cached_df(resolution)
|
||||
if df is None:
|
||||
return []
|
||||
|
||||
# Fast bbox filter using pre-computed centroids
|
||||
df = df.filter(
|
||||
(pl.col("_lat") >= south)
|
||||
& (pl.col("_lat") <= north)
|
||||
& (pl.col("_lng") >= west)
|
||||
& (pl.col("_lng") <= east)
|
||||
)
|
||||
|
||||
# Drop internal centroid columns before returning
|
||||
df = df.drop("_lat", "_lng")
|
||||
|
||||
return df.to_dicts()
|
||||
|
||||
|
||||
@router.get("/hexagons")
|
||||
async def get_hexagons(
|
||||
resolution: int = Query(
|
||||
DEFAULT_RESOLUTION,
|
||||
ge=min(VALID_RESOLUTIONS),
|
||||
le=max(VALID_RESOLUTIONS),
|
||||
description=f"H3 resolution ({min(VALID_RESOLUTIONS)}-{max(VALID_RESOLUTIONS)})",
|
||||
),
|
||||
bounds: str | None = Query(None, description="Bounding box: south,west,north,east"),
|
||||
) -> dict:
|
||||
"""Get aggregated property data as hexagons within bounds."""
|
||||
if resolution not in VALID_RESOLUTIONS:
|
||||
resolution = DEFAULT_RESOLUTION
|
||||
|
||||
if not bounds:
|
||||
raise HTTPException(status_code=400, detail="bounds parameter is required")
|
||||
|
||||
try:
|
||||
south, west, north, east = map(float, bounds.split(","))
|
||||
except ValueError:
|
||||
raise HTTPException(
|
||||
status_code=400, detail="Invalid bounds format. Use: south,west,north,east"
|
||||
)
|
||||
|
||||
# Expand bounds by buffer percentage for smoother panning
|
||||
lat_range = north - south
|
||||
lng_range = east - west
|
||||
lat_buffer = lat_range * BOUNDS_BUFFER_PERCENT
|
||||
lng_buffer = lng_range * BOUNDS_BUFFER_PERCENT
|
||||
south -= lat_buffer
|
||||
north += lat_buffer
|
||||
west -= lng_buffer
|
||||
east += lng_buffer
|
||||
|
||||
# Round bounds to reduce cache misses (0.01 degree ~ 1km precision)
|
||||
precision = 0.01
|
||||
bounds_tuple = (
|
||||
math.floor(south / precision) * precision,
|
||||
math.floor(west / precision) * precision,
|
||||
math.ceil(north / precision) * precision,
|
||||
math.ceil(east / precision) * precision,
|
||||
)
|
||||
|
||||
features = query_hexagons_cached(resolution, bounds_tuple)
|
||||
|
||||
return {"features": features}
|
||||
|
|
@ -1,322 +0,0 @@
|
|||
"""POI (Points of Interest) API endpoint."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import APIRouter, Query
|
||||
import polars as pl
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
DATA_FILE = Path("data_sources/uk_pois.parquet")
|
||||
|
||||
# Group definitions: maps a group key to its display metadata and the
|
||||
# individual POI categories it contains. Categories are matched against
|
||||
# the values that actually exist in the loaded parquet so that the
|
||||
# selector only shows groups with real data.
|
||||
_GROUP_DEFS: dict[str, dict] = {
|
||||
"schools": {
|
||||
"emoji": "🏫",
|
||||
"label": "Schools",
|
||||
"categories": ["school", "preschool", "college_university", "library"],
|
||||
},
|
||||
"healthcare": {
|
||||
"emoji": "🏥",
|
||||
"label": "Healthcare",
|
||||
"categories": [
|
||||
"doctor",
|
||||
"dentist",
|
||||
"pharmacy",
|
||||
"hospital",
|
||||
"public_health_clinic",
|
||||
"veterinary",
|
||||
"nursing_home",
|
||||
"social_facility",
|
||||
],
|
||||
},
|
||||
"transport": {
|
||||
"emoji": "🚉",
|
||||
"label": "Transport",
|
||||
"categories": [
|
||||
"train_station",
|
||||
"bus_station",
|
||||
"bus_stop",
|
||||
"metro_station",
|
||||
"light_rail_station",
|
||||
"tram_stop",
|
||||
"ferry_terminal",
|
||||
"airport",
|
||||
],
|
||||
},
|
||||
"parks": {
|
||||
"emoji": "🌳",
|
||||
"label": "Parks & Leisure",
|
||||
"categories": [
|
||||
"park",
|
||||
"national_park",
|
||||
"nature_reserve",
|
||||
"dog_park",
|
||||
"playground",
|
||||
"garden",
|
||||
"sports_centre",
|
||||
"swimming_pool",
|
||||
"gym",
|
||||
"golf_course",
|
||||
"marina",
|
||||
],
|
||||
},
|
||||
"emergency": {
|
||||
"emoji": "🚨",
|
||||
"label": "Emergency",
|
||||
"categories": ["police_department", "fire_department"],
|
||||
},
|
||||
"supermarkets": {
|
||||
"emoji": "🛒",
|
||||
"label": "Supermarkets & Grocery",
|
||||
"categories": [
|
||||
"supermarket",
|
||||
"grocery_store",
|
||||
"convenience_store",
|
||||
"bakery",
|
||||
"butcher",
|
||||
"greengrocer",
|
||||
"deli",
|
||||
],
|
||||
},
|
||||
"shopping": {
|
||||
"emoji": "🛍️",
|
||||
"label": "Shopping",
|
||||
"categories": [
|
||||
"department_store",
|
||||
"clothing_store",
|
||||
"shoe_store",
|
||||
"electronics_store",
|
||||
"hardware_store",
|
||||
"furniture_store",
|
||||
"bookshop",
|
||||
"newsagent",
|
||||
"charity_shop",
|
||||
"shopping_centre",
|
||||
"optician",
|
||||
"off_licence",
|
||||
],
|
||||
},
|
||||
"food_drink": {
|
||||
"emoji": "🍽️",
|
||||
"label": "Food & Drink",
|
||||
"categories": [
|
||||
"restaurant",
|
||||
"cafe",
|
||||
"pub",
|
||||
"bar",
|
||||
"fast_food",
|
||||
"food_court",
|
||||
"ice_cream",
|
||||
"beer_garden",
|
||||
],
|
||||
},
|
||||
"personal_care": {
|
||||
"emoji": "💇",
|
||||
"label": "Personal Care",
|
||||
"categories": [
|
||||
"hairdresser",
|
||||
"beauty_salon",
|
||||
"laundry",
|
||||
"dry_cleaning",
|
||||
],
|
||||
},
|
||||
"finance": {
|
||||
"emoji": "🏦",
|
||||
"label": "Finance",
|
||||
"categories": ["bank", "atm", "bureau_de_change"],
|
||||
},
|
||||
"entertainment": {
|
||||
"emoji": "🎭",
|
||||
"label": "Entertainment & Culture",
|
||||
"categories": [
|
||||
"cinema",
|
||||
"theatre",
|
||||
"nightclub",
|
||||
"community_centre",
|
||||
"arts_centre",
|
||||
"museum",
|
||||
"gallery",
|
||||
"attraction",
|
||||
"zoo",
|
||||
"theme_park",
|
||||
"viewpoint",
|
||||
],
|
||||
},
|
||||
"accommodation": {
|
||||
"emoji": "🏨",
|
||||
"label": "Accommodation",
|
||||
"categories": [
|
||||
"hotel",
|
||||
"hostel",
|
||||
"guest_house",
|
||||
"campsite",
|
||||
"caravan_site",
|
||||
],
|
||||
},
|
||||
"religion": {
|
||||
"emoji": "🛐",
|
||||
"label": "Places of Worship",
|
||||
"categories": ["place_of_worship"],
|
||||
},
|
||||
"government": {
|
||||
"emoji": "🏛️",
|
||||
"label": "Government & Public",
|
||||
"categories": [
|
||||
"town_hall",
|
||||
"courthouse",
|
||||
"post_office",
|
||||
"prison",
|
||||
"public_toilets",
|
||||
],
|
||||
},
|
||||
"automotive": {
|
||||
"emoji": "⛽",
|
||||
"label": "Automotive",
|
||||
"categories": [
|
||||
"petrol_station",
|
||||
"ev_charging",
|
||||
"car_dealer",
|
||||
"car_repair",
|
||||
"parking",
|
||||
"bicycle_parking",
|
||||
],
|
||||
},
|
||||
"recycling": {
|
||||
"emoji": "♻️",
|
||||
"label": "Recycling & Waste",
|
||||
"categories": ["recycling", "waste_disposal"],
|
||||
},
|
||||
}
|
||||
|
||||
# Built at startup from the data — only groups whose member categories
|
||||
# actually appear in the parquet file are included.
|
||||
_active_groups: dict[str, dict] = {}
|
||||
|
||||
# Reverse lookup: category value -> group key (built at startup)
|
||||
_cat_to_group: dict[str, str] = {}
|
||||
|
||||
# Cache the dataframe
|
||||
_df_cache: pl.DataFrame | None = None
|
||||
|
||||
|
||||
def _load_and_build() -> pl.DataFrame | None:
|
||||
"""Load the parquet, build category groups from actual data."""
|
||||
global _df_cache, _active_groups, _cat_to_group
|
||||
|
||||
if not DATA_FILE.exists():
|
||||
return None
|
||||
|
||||
df = pl.read_parquet(DATA_FILE).select("id", "name", "category", "lat", "lng")
|
||||
|
||||
# Distinct categories present in the data
|
||||
data_categories: set[str] = set(
|
||||
df.select("category").unique().to_series().to_list()
|
||||
)
|
||||
|
||||
# Per-category counts for the response
|
||||
counts: dict[str, int] = dict(
|
||||
df.group_by("category")
|
||||
.agg(pl.len().alias("n"))
|
||||
.iter_rows()
|
||||
)
|
||||
|
||||
# Build reverse map from every known category to its group
|
||||
cat_to_group: dict[str, str] = {}
|
||||
for key, gdef in _GROUP_DEFS.items():
|
||||
for cat in gdef["categories"]:
|
||||
cat_to_group[cat] = key
|
||||
|
||||
# Only keep categories that belong to a known group
|
||||
known_categories = data_categories & cat_to_group.keys()
|
||||
|
||||
# Build active groups — only those with at least one matching category
|
||||
active: dict[str, dict] = {}
|
||||
for key, gdef in _GROUP_DEFS.items():
|
||||
present = [c for c in gdef["categories"] if c in known_categories]
|
||||
if present:
|
||||
active[key] = {
|
||||
"emoji": gdef["emoji"],
|
||||
"label": gdef["label"],
|
||||
"categories": present,
|
||||
"count": sum(counts.get(c, 0) for c in present),
|
||||
}
|
||||
|
||||
_active_groups = active
|
||||
_cat_to_group = cat_to_group
|
||||
|
||||
# Filter dataframe to only known categories
|
||||
_df_cache = df.filter(pl.col("category").is_in(known_categories))
|
||||
return _df_cache
|
||||
|
||||
|
||||
def get_df() -> pl.DataFrame | None:
|
||||
"""Return cached POI dataframe, loading if necessary."""
|
||||
if _df_cache is None:
|
||||
return _load_and_build()
|
||||
return _df_cache
|
||||
|
||||
|
||||
def preload_pois() -> None:
|
||||
"""Preload POI data on startup."""
|
||||
df = _load_and_build()
|
||||
if df is not None:
|
||||
n_groups = len(_active_groups)
|
||||
print(f"Loaded {len(df):,} POIs across {n_groups} category groups")
|
||||
|
||||
|
||||
@router.get("/pois")
|
||||
async def get_pois(
|
||||
categories: str = Query(..., description="Comma-separated category groups"),
|
||||
bounds: str = Query(..., description="Bounding box: south,west,north,east"),
|
||||
) -> dict:
|
||||
"""Get POIs within bounds for specified category groups."""
|
||||
df = get_df()
|
||||
if df is None:
|
||||
return {"features": []}
|
||||
|
||||
try:
|
||||
south, west, north, east = map(float, bounds.split(","))
|
||||
except ValueError:
|
||||
return {"features": []}
|
||||
|
||||
requested_groups = [g.strip() for g in categories.split(",")]
|
||||
cats_to_include: set[str] = set()
|
||||
for group in requested_groups:
|
||||
if group in _active_groups:
|
||||
cats_to_include.update(_active_groups[group]["categories"])
|
||||
|
||||
if not cats_to_include:
|
||||
return {"features": []}
|
||||
|
||||
filtered = df.filter(
|
||||
(pl.col("lat") >= south)
|
||||
& (pl.col("lat") <= north)
|
||||
& (pl.col("lng") >= west)
|
||||
& (pl.col("lng") <= east)
|
||||
& (pl.col("category").is_in(cats_to_include))
|
||||
)
|
||||
|
||||
MAX_POIS = 5000
|
||||
if len(filtered) > MAX_POIS:
|
||||
filtered = filtered.sample(n=MAX_POIS, seed=42)
|
||||
|
||||
return {"features": filtered.to_dicts()}
|
||||
|
||||
|
||||
@router.get("/poi-categories")
|
||||
async def get_poi_categories() -> dict:
|
||||
"""Get available POI category groups derived from loaded data."""
|
||||
return {
|
||||
"categories": {
|
||||
key: {
|
||||
"emoji": group["emoji"],
|
||||
"label": group["label"],
|
||||
"count": group["count"],
|
||||
}
|
||||
for key, group in _active_groups.items()
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue