Fix OOMs
This commit is contained in:
parent
b580c51b6d
commit
7ca29c2d81
2 changed files with 58 additions and 62 deletions
|
|
@ -133,12 +133,49 @@ pub struct PostcodeData {
|
||||||
pub centroids: Vec<(f32, f32)>,
|
pub centroids: Vec<(f32, f32)>,
|
||||||
/// Precomputed AABB per postcode: (south, west, north, east) as f32
|
/// Precomputed AABB per postcode: (south, west, north, east) as f32
|
||||||
pub aabbs: Vec<(f32, f32, f32, f32)>,
|
pub aabbs: Vec<(f32, f32, f32, f32)>,
|
||||||
/// Precomputed GeoJSON geometry Value per postcode
|
/// Compact polygon storage: outer Vec is per-postcode, inner Vecs are rings of [lon, lat] f32 pairs.
|
||||||
pub geometries: Vec<serde_json::Value>,
|
/// Held as raw f32 to keep boundary memory ~10x smaller than serde_json::Value form.
|
||||||
|
pub polygons: Vec<Vec<Vec<[f32; 2]>>>,
|
||||||
/// Lookup from postcode string to index
|
/// Lookup from postcode string to index
|
||||||
pub postcode_to_idx: FxHashMap<String, usize>,
|
pub postcode_to_idx: FxHashMap<String, usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PostcodeData {
|
||||||
|
/// Build the GeoJSON Value for a postcode polygon on demand.
|
||||||
|
pub fn geometry_geojson(&self, idx: usize) -> serde_json::Value {
|
||||||
|
let rings = &self.polygons[idx];
|
||||||
|
if rings.len() == 1 {
|
||||||
|
let coords: Vec<serde_json::Value> = rings[0]
|
||||||
|
.iter()
|
||||||
|
.map(|[lon, lat]| {
|
||||||
|
serde_json::Value::Array(vec![
|
||||||
|
serde_json::Value::from(*lon as f64),
|
||||||
|
serde_json::Value::from(*lat as f64),
|
||||||
|
])
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
serde_json::json!({"type": "Polygon", "coordinates": [coords]})
|
||||||
|
} else {
|
||||||
|
let polys: Vec<serde_json::Value> = rings
|
||||||
|
.iter()
|
||||||
|
.map(|ring| {
|
||||||
|
let coords: Vec<serde_json::Value> = ring
|
||||||
|
.iter()
|
||||||
|
.map(|[lon, lat]| {
|
||||||
|
serde_json::Value::Array(vec![
|
||||||
|
serde_json::Value::from(*lon as f64),
|
||||||
|
serde_json::Value::from(*lat as f64),
|
||||||
|
])
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
serde_json::Value::Array(vec![serde_json::Value::Array(coords)])
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
serde_json::json!({"type": "MultiPolygon", "coordinates": polys})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl PostcodeData {
|
impl PostcodeData {
|
||||||
/// Load postcode boundaries from a directory of GeoJSON files.
|
/// Load postcode boundaries from a directory of GeoJSON files.
|
||||||
/// Expects the directory to have a `units/` subdirectory containing .geojson files.
|
/// Expects the directory to have a `units/` subdirectory containing .geojson files.
|
||||||
|
|
@ -295,49 +332,13 @@ impl PostcodeData {
|
||||||
postcode_to_idx.insert(postcode.clone(), idx);
|
postcode_to_idx.insert(postcode.clone(), idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Precompute GeoJSON geometry for each postcode
|
|
||||||
let geometries: Vec<serde_json::Value> = polygons
|
|
||||||
.iter()
|
|
||||||
.map(|rings| {
|
|
||||||
if rings.len() == 1 {
|
|
||||||
let coords: Vec<serde_json::Value> = rings[0]
|
|
||||||
.iter()
|
|
||||||
.map(|[lon, lat]| {
|
|
||||||
serde_json::Value::Array(vec![
|
|
||||||
serde_json::Value::from(*lon as f64),
|
|
||||||
serde_json::Value::from(*lat as f64),
|
|
||||||
])
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
serde_json::json!({"type": "Polygon", "coordinates": [coords]})
|
|
||||||
} else {
|
|
||||||
let polys: Vec<serde_json::Value> = rings
|
|
||||||
.iter()
|
|
||||||
.map(|ring| {
|
|
||||||
let coords: Vec<serde_json::Value> = ring
|
|
||||||
.iter()
|
|
||||||
.map(|[lon, lat]| {
|
|
||||||
serde_json::Value::Array(vec![
|
|
||||||
serde_json::Value::from(*lon as f64),
|
|
||||||
serde_json::Value::from(*lat as f64),
|
|
||||||
])
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
serde_json::Value::Array(vec![serde_json::Value::Array(coords)])
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
serde_json::json!({"type": "MultiPolygon", "coordinates": polys})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
info!(postcodes = postcodes.len(), "Postcode boundary data ready");
|
info!(postcodes = postcodes.len(), "Postcode boundary data ready");
|
||||||
|
|
||||||
Ok(PostcodeData {
|
Ok(PostcodeData {
|
||||||
postcodes,
|
postcodes,
|
||||||
centroids,
|
centroids,
|
||||||
aabbs,
|
aabbs,
|
||||||
geometries,
|
polygons,
|
||||||
postcode_to_idx,
|
postcode_to_idx,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1282,23 +1282,20 @@ impl PropertyData {
|
||||||
};
|
};
|
||||||
let mut poi_metrics = PostcodePoiMetrics::from_postcode_df(&postcode_df, poi_metric_names)?;
|
let mut poi_metrics = PostcodePoiMetrics::from_postcode_df(&postcode_df, poi_metric_names)?;
|
||||||
|
|
||||||
// Load properties.parquet and join with postcode data for lat/lon + area features
|
// Load properties.parquet and join with postcode data lazily so the
|
||||||
|
// wide combined frame is never fully materialized — projection is
|
||||||
|
// pushed down into the join, keeping peak memory bounded.
|
||||||
tracing::info!("Loading properties from {:?}", properties_path);
|
tracing::info!("Loading properties from {:?}", properties_path);
|
||||||
let properties_path = PlRefPath::try_from_path(properties_path)
|
let properties_path = PlRefPath::try_from_path(properties_path)
|
||||||
.context("Failed to normalize properties parquet path")?;
|
.context("Failed to normalize properties parquet path")?;
|
||||||
let properties_lf = LazyFrame::scan_parquet(properties_path, Default::default())
|
let properties_lf = LazyFrame::scan_parquet(properties_path, Default::default())
|
||||||
.context("Failed to scan properties parquet")?;
|
.context("Failed to scan properties parquet")?;
|
||||||
let combined = properties_lf
|
let combined_lf = properties_lf.join(
|
||||||
.join(
|
postcode_df.lazy(),
|
||||||
postcode_df.clone().lazy(),
|
[col("Postcode")],
|
||||||
[col("Postcode")],
|
[col("Postcode")],
|
||||||
[col("Postcode")],
|
JoinArgs::new(JoinType::Left),
|
||||||
JoinArgs::new(JoinType::Left),
|
);
|
||||||
)
|
|
||||||
.collect()
|
|
||||||
.context("Failed to join properties with postcodes")?;
|
|
||||||
let total_rows = combined.height();
|
|
||||||
tracing::info!(rows = total_rows, "Properties joined with postcodes");
|
|
||||||
|
|
||||||
// Get configured feature/enum names in config order. Dynamic POI
|
// Get configured feature/enum names in config order. Dynamic POI
|
||||||
// metrics live in a postcode-level side table so they do not widen the
|
// metrics live in a postcode-level side table so they do not widen the
|
||||||
|
|
@ -1306,7 +1303,10 @@ impl PropertyData {
|
||||||
let configured_numeric_names = features::all_numeric_feature_names();
|
let configured_numeric_names = features::all_numeric_feature_names();
|
||||||
let enum_names = features::all_enum_feature_names();
|
let enum_names = features::all_enum_feature_names();
|
||||||
|
|
||||||
let schema = combined.schema();
|
let schema = combined_lf
|
||||||
|
.clone()
|
||||||
|
.collect_schema()
|
||||||
|
.context("Failed to collect joined schema")?;
|
||||||
let numeric_names: Vec<String> = configured_numeric_names
|
let numeric_names: Vec<String> = configured_numeric_names
|
||||||
.iter()
|
.iter()
|
||||||
.map(|name| (*name).to_string())
|
.map(|name| (*name).to_string())
|
||||||
|
|
@ -1402,24 +1402,16 @@ impl PropertyData {
|
||||||
if has_renovation_history {
|
if has_renovation_history {
|
||||||
select_exprs.push(col("renovation_history"));
|
select_exprs.push(col("renovation_history"));
|
||||||
}
|
}
|
||||||
let df = combined
|
let df = combined_lf
|
||||||
.lazy()
|
|
||||||
.filter(col("lat").is_not_null().and(col("lon").is_not_null()))
|
.filter(col("lat").is_not_null().and(col("lon").is_not_null()))
|
||||||
.select(select_exprs)
|
.select(select_exprs)
|
||||||
.collect()
|
.collect()
|
||||||
.context("Failed to select columns from combined data")?;
|
.context("Failed to select columns from joined frame")?;
|
||||||
|
|
||||||
let row_count = df.height();
|
let row_count = df.height();
|
||||||
if row_count == 0 {
|
if row_count == 0 {
|
||||||
bail!("No property rows have usable coordinates after joining postcode data");
|
bail!("No property rows have usable coordinates after joining postcode data");
|
||||||
}
|
}
|
||||||
let dropped_coordinate_rows = total_rows.saturating_sub(row_count);
|
|
||||||
if dropped_coordinate_rows > 0 {
|
|
||||||
tracing::warn!(
|
|
||||||
rows = dropped_coordinate_rows,
|
|
||||||
"Dropped properties with missing postcode coordinates"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
tracing::info!(rows = row_count, "Combined data selected");
|
tracing::info!(rows = row_count, "Combined data selected");
|
||||||
|
|
||||||
let lat_series = df
|
let lat_series = df
|
||||||
|
|
@ -1692,6 +1684,9 @@ impl PropertyData {
|
||||||
FxHashMap::default()
|
FxHashMap::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Free the projected joined frame before building the row-major matrix.
|
||||||
|
drop(df);
|
||||||
|
|
||||||
// Sort all rows by spatial locality so that grid queries access
|
// Sort all rows by spatial locality so that grid queries access
|
||||||
// contiguous memory (sequential reads instead of random DRAM accesses).
|
// contiguous memory (sequential reads instead of random DRAM accesses).
|
||||||
tracing::info!("Sorting rows by spatial locality");
|
tracing::info!("Sorting rows by spatial locality");
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue