good stuff
This commit is contained in:
parent
ea8389ef40
commit
f4de0eeb9f
39 changed files with 5165 additions and 348 deletions
|
|
@ -4,9 +4,11 @@ use axum::extract::Query;
|
|||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Json};
|
||||
use axum::Extension;
|
||||
use rayon::prelude::*;
|
||||
use rustc_hash::FxHashMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Map, Value};
|
||||
use metrics::histogram;
|
||||
use tracing::info;
|
||||
|
||||
use crate::aggregation::Aggregator;
|
||||
|
|
@ -15,12 +17,15 @@ use crate::consts::{DEMO_BOUNDS, MAX_CELLS_PER_REQUEST};
|
|||
use crate::data::travel_time::TravelData;
|
||||
use crate::licensing::check_license_bounds;
|
||||
use crate::parsing::{
|
||||
bounds_intersect, cell_for_row, h3_cell_bounds, needs_parent, parse_field_indices,
|
||||
bounds_intersect, cell_for_row_cached, h3_cell_bounds, needs_parent, parse_field_indices,
|
||||
parse_filters, require_bounds, row_passes_filters, validate_h3_resolution,
|
||||
};
|
||||
use crate::routes::travel_time::{parse_optional_travel, TravelTimeAgg};
|
||||
use crate::state::AppState;
|
||||
|
||||
/// Row count threshold above which we use rayon parallel aggregation.
|
||||
const PARALLEL_THRESHOLD: usize = 50_000;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct HexagonsResponse {
|
||||
features: Vec<Map<String, Value>>,
|
||||
|
|
@ -202,11 +207,67 @@ pub async fn get_hexagons(
|
|||
.map(|_| FxHashMap::default())
|
||||
.collect();
|
||||
|
||||
// Main aggregation loop
|
||||
let aggregate_row =
|
||||
|row: usize,
|
||||
groups: &mut FxHashMap<u64, Aggregator>,
|
||||
travel_aggs: &mut [FxHashMap<u64, TravelTimeAgg>]| {
|
||||
// Collect row indices for threshold-based sequential/parallel aggregation
|
||||
let row_indices = state.grid.query(south, west, north, east);
|
||||
|
||||
if row_indices.len() >= PARALLEL_THRESHOLD && !has_travel {
|
||||
// Parallel path: split rows across rayon threads, each with local accumulators
|
||||
let chunk_size = (row_indices.len() / rayon::current_num_threads()).max(1000);
|
||||
|
||||
let thread_results: Vec<FxHashMap<u64, Aggregator>> = row_indices
|
||||
.par_chunks(chunk_size)
|
||||
.map(|chunk| {
|
||||
let mut local_groups: FxHashMap<u64, Aggregator> = FxHashMap::default();
|
||||
let mut h3_cache: FxHashMap<u64, u64> = FxHashMap::default();
|
||||
for &row_idx in chunk {
|
||||
let row = row_idx as usize;
|
||||
if !row_passes_filters(
|
||||
row,
|
||||
&parsed_filters,
|
||||
&parsed_enum_filters,
|
||||
feature_data,
|
||||
num_features,
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
let cell_id =
|
||||
cell_for_row_cached(row, precomputed, h3_res, need_parent, &mut h3_cache);
|
||||
let agg = local_groups
|
||||
.entry(cell_id)
|
||||
.or_insert_with(|| Aggregator::new(num_features));
|
||||
if let Some(sel_indices) = field_indices.as_deref() {
|
||||
agg.add_row_selective(
|
||||
feature_data,
|
||||
row,
|
||||
num_features,
|
||||
sel_indices,
|
||||
&quant,
|
||||
);
|
||||
} else {
|
||||
agg.add_row(feature_data, row, num_features, &quant);
|
||||
}
|
||||
}
|
||||
local_groups
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Merge thread-local results into the main groups map
|
||||
for local_groups in thread_results {
|
||||
for (cell_id, local_agg) in local_groups {
|
||||
let agg = groups
|
||||
.entry(cell_id)
|
||||
.or_insert_with(|| Aggregator::new(num_features));
|
||||
agg.merge(&local_agg);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Sequential path (also handles travel time which needs postcode lookups)
|
||||
let mut travel_minutes: Vec<Option<i16>> = Vec::with_capacity(travel_entries.len());
|
||||
let mut h3_cache: FxHashMap<u64, u64> = FxHashMap::default();
|
||||
|
||||
'row: for &row_idx in &row_indices {
|
||||
let row = row_idx as usize;
|
||||
|
||||
// Regular filters
|
||||
if !row_passes_filters(
|
||||
row,
|
||||
|
|
@ -215,14 +276,13 @@ pub async fn get_hexagons(
|
|||
feature_data,
|
||||
num_features,
|
||||
) {
|
||||
return;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Travel time filter: check each entry with a range
|
||||
let mut travel_minutes: Vec<Option<i16>> = Vec::new();
|
||||
if has_travel {
|
||||
travel_minutes.clear();
|
||||
let postcode = pc_interner.resolve(&pc_keys[row]);
|
||||
travel_minutes.reserve(travel_entries.len());
|
||||
for (ti, entry) in travel_entries.iter().enumerate() {
|
||||
let row_data = travel_data[ti].get(postcode);
|
||||
let minutes = row_data.map(|r| {
|
||||
|
|
@ -236,13 +296,14 @@ pub async fn get_hexagons(
|
|||
if let (Some(fmin), Some(fmax)) = (entry.filter_min, entry.filter_max) {
|
||||
match minutes {
|
||||
Some(mins) if (mins as f32) >= fmin && (mins as f32) <= fmax => {}
|
||||
_ => return, // Filtered out
|
||||
_ => continue 'row, // Filtered out (jump to next row_idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let cell_id = cell_for_row(row, precomputed, h3_res, need_parent);
|
||||
let cell_id =
|
||||
cell_for_row_cached(row, precomputed, h3_res, need_parent, &mut h3_cache);
|
||||
|
||||
// Aggregate regular features
|
||||
let aggregation = groups
|
||||
|
|
@ -269,13 +330,8 @@ pub async fn get_hexagons(
|
|||
agg.add(*mins as f32);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
state
|
||||
.grid
|
||||
.for_each_in_bounds(south, west, north, east, |row_idx| {
|
||||
aggregate_row(row_idx as usize, &mut groups, &mut travel_aggs);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let t_agg = t0.elapsed();
|
||||
|
||||
|
|
@ -296,9 +352,12 @@ pub async fn get_hexagons(
|
|||
features.truncate(MAX_CELLS_PER_REQUEST);
|
||||
}
|
||||
|
||||
let parallel = row_indices.len() >= PARALLEL_THRESHOLD && !has_travel;
|
||||
let t_total = t0.elapsed();
|
||||
info!(
|
||||
resolution,
|
||||
rows = row_indices.len(),
|
||||
parallel,
|
||||
cells_before_filter = groups.len(),
|
||||
cells_after_filter = features.len(),
|
||||
truncated,
|
||||
|
|
@ -311,6 +370,8 @@ pub async fn get_hexagons(
|
|||
"GET /api/hexagons"
|
||||
);
|
||||
|
||||
histogram!("hexagons_response_count").record(features.len() as f64);
|
||||
|
||||
Ok(HexagonsResponse { features })
|
||||
})
|
||||
.await
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue