use std::sync::Arc; use axum::extract::Query; use axum::http::StatusCode; use axum::response::Json; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use tracing::{info, warn}; use crate::consts::{H3_PRECOMPUTE_MAX, H3_REQUEST_MAX, H3_REQUEST_MIN}; use crate::parsing::{bounds_intersect, h3_cell_bounds, parse_bounds, parse_filters, row_passes_filters}; use crate::state::AppState; #[derive(Serialize)] pub struct HexagonsResponse { features: Vec>, } #[derive(Deserialize)] pub struct HexagonParams { resolution: u8, bounds: Option, /// Comma-separated filters: `name:min:max,...` /// Rows must have non-NaN values within [min,max] for each filter. filters: Option, /// Comma-separated feature names to include in min/max aggregation. /// When present (even if empty), only listed features are aggregated and written. /// When absent, all features are included (backward compatible). fields: Option, } /// Per-cell accumulator for aggregating features. /// Uses Box<[T]> instead of Vec to avoid storing capacity (saves 8 bytes per field per cell). struct CellAgg { count: u32, mins: Box<[f32]>, maxs: Box<[f32]>, } impl CellAgg { fn new(num_features: usize) -> Self { CellAgg { count: 0, mins: vec![f32::INFINITY; num_features].into_boxed_slice(), maxs: vec![f32::NEG_INFINITY; num_features].into_boxed_slice(), } } /// Add a row using row-major feature_data layout. /// feature_data[row * num_features + feat_idx] — all features for one row /// are contiguous, so this reads a single cache line per ~8 features. #[inline] fn add_row(&mut self, feature_data: &[f32], row: usize, num_features: usize) { self.count += 1; let base = row * num_features; let row_slice = &feature_data[base..base + num_features]; for (feat_index, &value) in row_slice.iter().enumerate() { if value.is_finite() { if value < self.mins[feat_index] { self.mins[feat_index] = value; } if value > self.maxs[feat_index] { self.maxs[feat_index] = value; } } } } /// Add a row, only aggregating the features at the given indices. #[inline] fn add_row_selective( &mut self, feature_data: &[f32], row: usize, num_features: usize, indices: &[usize], ) { self.count += 1; let base = row * num_features; for &feat_index in indices { let value = feature_data[base + feat_index]; if value.is_finite() { if value < self.mins[feat_index] { self.mins[feat_index] = value; } if value > self.maxs[feat_index] { self.maxs[feat_index] = value; } } } } } /// Build feature maps from aggregated cell data, filtering to only cells that intersect the query bounds. fn build_feature_maps( groups: &FxHashMap, min_keys: &[String], max_keys: &[String], num_features: usize, indices: Option<&[usize]>, query_bounds: (f64, f64, f64, f64), // (south, west, north, east) ) -> Vec> { let mut features = Vec::with_capacity(groups.len()); let (q_south, q_west, q_north, q_east) = query_bounds; for (&cell_id, aggregation) in groups { let Some(cell) = h3o::CellIndex::try_from(cell_id).ok() else { continue; }; // Filter out cells that don't intersect the query bounds let (c_south, c_west, c_north, c_east) = h3_cell_bounds(cell, 0.0); if !bounds_intersect(c_south, c_west, c_north, c_east, q_south, q_west, q_north, q_east) { continue; } let mut map = Map::new(); map.insert("h3".into(), Value::String(cell.to_string())); map.insert("count".into(), Value::Number(aggregation.count.into())); let iter: Box> = if let Some(idx) = indices { Box::new(idx.iter().copied()) } else { Box::new(0..num_features) }; for feat_index in iter { if aggregation.mins[feat_index].is_finite() && aggregation.maxs[feat_index].is_finite() { if let (Some(min_num), Some(max_num)) = ( serde_json::Number::from_f64(aggregation.mins[feat_index] as f64), serde_json::Number::from_f64(aggregation.maxs[feat_index] as f64), ) { map.insert(min_keys[feat_index].clone(), Value::Number(min_num)); map.insert(max_keys[feat_index].clone(), Value::Number(max_num)); } } } features.push(map); } features } pub async fn get_hexagons( state: Arc, Query(params): Query, ) -> Result, (StatusCode, String)> { let resolution = params.resolution; if !(H3_REQUEST_MIN..=H3_REQUEST_MAX).contains(&resolution) { warn!( resolution, "Resolution out of range [{}, {}]", H3_REQUEST_MIN, H3_REQUEST_MAX ); return Err(( StatusCode::BAD_REQUEST, format!( "resolution must be between {} and {}", H3_REQUEST_MIN, H3_REQUEST_MAX ), )); } let bounds_str = params.bounds.ok_or(( StatusCode::BAD_REQUEST, "bounds parameter is required".into(), ))?; let (south, west, north, east) = parse_bounds(&bounds_str)?; let filters_str = params.filters.clone(); let (parsed_filters, parsed_enum_filters) = parse_filters( params.filters.as_deref(), &state.feature_name_to_index, &state.data.enum_values, ); let num_filters = parsed_filters.len() + parsed_enum_filters.len(); // Parse optional `fields` param into feature indices. // If `fields` is absent (None), all features are included. // If `fields` is present (even empty string), only listed features are included. let field_indices: Option> = params.fields.as_ref().map(|fields_str| { if fields_str.is_empty() { return Vec::new(); } fields_str .split(',') .filter_map(|name| { let name = name.trim(); if name.is_empty() { return None; } state.feature_name_to_index.get(name).copied() }) .collect() }); let response = tokio::task::spawn_blocking(move || -> Result { let t0 = std::time::Instant::now(); let num_features = state.data.num_features; let feature_data = &state.data.feature_data; let min_keys = &state.min_keys; let max_keys = &state.max_keys; let h3_res = h3o::Resolution::try_from(resolution) .map_err(|error| format!("Invalid H3 resolution {}: {}", resolution, error))?; let precomputed = &state.h3_cells; let need_parent = resolution < H3_PRECOMPUTE_MAX; let mut groups: FxHashMap = FxHashMap::default(); let cell_for_row = |row: usize| -> u64 { let max_cell = precomputed[row]; if !need_parent || max_cell == 0 { return max_cell; } h3o::CellIndex::try_from(max_cell) .ok() .and_then(|ci| ci.parent(h3_res)) .map(u64::from) .unwrap_or(0) }; // Hoist has_selective branch outside the hot loop to avoid per-row branching if let Some(sel_indices) = field_indices.as_deref() { state .grid .for_each_in_bounds(south, west, north, east, |row_idx| { let row = row_idx as usize; if !row_passes_filters( row, &parsed_filters, &parsed_enum_filters, feature_data, num_features, ) { return; } let cell_id = cell_for_row(row); let aggregation = groups .entry(cell_id) .or_insert_with(|| CellAgg::new(num_features)); aggregation.add_row_selective(feature_data, row, num_features, sel_indices); }); } else { state .grid .for_each_in_bounds(south, west, north, east, |row_idx| { let row = row_idx as usize; if !row_passes_filters( row, &parsed_filters, &parsed_enum_filters, feature_data, num_features, ) { return; } let cell_id = cell_for_row(row); let aggregation = groups .entry(cell_id) .or_insert_with(|| CellAgg::new(num_features)); aggregation.add_row(feature_data, row, num_features); }); } let t_agg = t0.elapsed(); let features = build_feature_maps( &groups, min_keys, max_keys, num_features, field_indices.as_deref(), (south, west, north, east), ); let t_total = t0.elapsed(); info!( resolution, cells_before_filter = groups.len(), cells_after_filter = features.len(), bounds = format_args!("{:.4},{:.4},{:.4},{:.4}", south, west, north, east), filters = num_filters, filters_raw = filters_str.as_deref().unwrap_or("-"), agg_ms = format_args!("{:.1}", t_agg.as_secs_f64() * 1000.0), total_ms = format_args!("{:.1}", t_total.as_secs_f64() * 1000.0), "GET /api/hexagons" ); Ok(HexagonsResponse { features }) }) .await .map_err(|error| (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()))? .map_err(|error| (StatusCode::INTERNAL_SERVER_ERROR, error))?; Ok(Json(response)) }