Fun changes
This commit is contained in:
parent
cd778dd088
commit
349a6c1d53
60 changed files with 1260 additions and 2600 deletions
|
|
@ -166,6 +166,8 @@ pub struct PropertyData {
|
|||
/// For enum features: maps feature index to list of possible string values.
|
||||
/// Index in values list corresponds to the u16 value stored in feature_data.
|
||||
pub enum_values: rustc_hash::FxHashMap<usize, Vec<String>>,
|
||||
/// For enum features: maps feature index to per-value global counts (same order as enum_values).
|
||||
pub enum_counts: rustc_hash::FxHashMap<usize, Vec<u64>>,
|
||||
/// Per-row flag: true = construction date is approximate (from EPC band),
|
||||
/// false = exact (from new-build transaction date).
|
||||
/// Bit-packed: byte `row / 8`, bit `row % 8`. 8x smaller than Vec<bool>.
|
||||
|
|
@ -173,12 +175,6 @@ pub struct PropertyData {
|
|||
/// Per-row renovation events. Keyed by (permuted) row index.
|
||||
/// Only rows with events are present in the map.
|
||||
renovation_history: FxHashMap<u32, Vec<RenovationEvent>>,
|
||||
/// Per-row listing features (key feature bullet points from online listings).
|
||||
/// Only rows with features are present in the map.
|
||||
listing_features: FxHashMap<u32, Vec<String>>,
|
||||
/// Sparse per-row optional string columns from online listings.
|
||||
/// Only rows with non-empty values are stored (saves ~1 GB vs Vec<Option<String>>).
|
||||
listing_url: FxHashMap<u32, String>,
|
||||
property_sub_type: FxHashMap<u32, String>,
|
||||
price_qualifier: FxHashMap<u32, String>,
|
||||
}
|
||||
|
|
@ -215,19 +211,6 @@ impl PropertyData {
|
|||
.unwrap_or(&[])
|
||||
}
|
||||
|
||||
/// Get listing features for a given row (empty slice if none).
|
||||
pub fn listing_features(&self, row: usize) -> &[String] {
|
||||
self.listing_features
|
||||
.get(&(row as u32))
|
||||
.map(|v| v.as_slice())
|
||||
.unwrap_or(&[])
|
||||
}
|
||||
|
||||
/// Get listing URL for a given row.
|
||||
pub fn listing_url(&self, row: usize) -> Option<&str> {
|
||||
self.listing_url.get(&(row as u32)).map(String::as_str)
|
||||
}
|
||||
|
||||
/// Get property sub-type for a given row.
|
||||
pub fn property_sub_type(&self, row: usize) -> Option<&str> {
|
||||
self.property_sub_type
|
||||
|
|
@ -534,8 +517,6 @@ impl PropertyData {
|
|||
pub fn load(
|
||||
properties_path: &Path,
|
||||
postcode_features_path: &Path,
|
||||
listings_buy_path: &Path,
|
||||
listings_rent_path: &Path,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Load postcode.parquet
|
||||
tracing::info!(
|
||||
|
|
@ -551,9 +532,8 @@ impl PropertyData {
|
|||
// Load properties.parquet and join with postcode data for lat/lon + area features
|
||||
tracing::info!("Loading properties from {:?}", properties_path);
|
||||
let properties_lf = LazyFrame::scan_parquet(properties_path, Default::default())
|
||||
.context("Failed to scan properties parquet")?
|
||||
.with_columns([lit("Historical sale").alias("Listing status")]);
|
||||
let properties_joined = properties_lf
|
||||
.context("Failed to scan properties parquet")?;
|
||||
let combined = properties_lf
|
||||
.join(
|
||||
postcode_df.clone().lazy(),
|
||||
[col("Postcode")],
|
||||
|
|
@ -562,77 +542,8 @@ impl PropertyData {
|
|||
)
|
||||
.collect()
|
||||
.context("Failed to join properties with postcodes")?;
|
||||
let prop_count = properties_joined.height();
|
||||
tracing::info!(rows = prop_count, "Properties joined with postcodes");
|
||||
|
||||
// Load online listings (buy + rent) — these have their own lat/lon.
|
||||
// Expects the new finder parquet format with human-readable column names.
|
||||
let load_listings = |path: &Path, label: &str| -> anyhow::Result<DataFrame> {
|
||||
tracing::info!("Loading {} listings from {:?}", label, path);
|
||||
let lf = LazyFrame::scan_parquet(path, Default::default())
|
||||
.with_context(|| format!("Failed to scan {label} listings parquet"))?;
|
||||
|
||||
// Join with postcodes for area features (listings have their own lat/lon)
|
||||
let pc_no_coords = postcode_df.clone().lazy().drop(["lat", "lon"]);
|
||||
let joined = lf
|
||||
.join(
|
||||
pc_no_coords,
|
||||
[col("Postcode")],
|
||||
[col("Postcode")],
|
||||
JoinArgs::new(JoinType::Left),
|
||||
)
|
||||
.collect()
|
||||
.with_context(|| format!("Failed to join {label} listings with postcodes"))?;
|
||||
tracing::info!(rows = joined.height(), "{} listings joined", label);
|
||||
Ok(joined)
|
||||
};
|
||||
let listings_buy = load_listings(listings_buy_path, "buy")?;
|
||||
// Derive "Asking price per sqm" if not already present
|
||||
let listings_buy = if listings_buy.schema().get("Asking price per sqm").is_none() {
|
||||
listings_buy
|
||||
.lazy()
|
||||
.with_column(
|
||||
(col("Asking price").cast(DataType::Float64) / col("Total floor area (sqm)"))
|
||||
.round(0)
|
||||
.alias("Asking price per sqm"),
|
||||
)
|
||||
.collect()
|
||||
.context("Failed to derive Asking price per sqm")?
|
||||
} else {
|
||||
listings_buy
|
||||
};
|
||||
let listings_rent = load_listings(listings_rent_path, "rent")?;
|
||||
|
||||
// Concatenate all rows into a single DataFrame
|
||||
tracing::info!("Concatenating all data sources");
|
||||
let buy_count = listings_buy.height();
|
||||
let rent_count = listings_rent.height();
|
||||
let combined = concat(
|
||||
[
|
||||
properties_joined.lazy(),
|
||||
listings_buy.lazy(),
|
||||
listings_rent.lazy(),
|
||||
],
|
||||
UnionArgs {
|
||||
parallel: false,
|
||||
rechunk: true,
|
||||
to_supertypes: true,
|
||||
diagonal: true,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.context("Failed to concat data sources")?
|
||||
.collect()
|
||||
.context("Failed to collect combined data")?;
|
||||
|
||||
let total_rows = combined.height();
|
||||
tracing::info!(
|
||||
properties = prop_count,
|
||||
buy_listings = buy_count,
|
||||
rent_listings = rent_count,
|
||||
total = total_rows,
|
||||
"All data sources combined"
|
||||
);
|
||||
tracing::info!(rows = total_rows, "Properties joined with postcodes");
|
||||
|
||||
// Get configured feature/enum names in config order
|
||||
let numeric_names = features::all_numeric_feature_names();
|
||||
|
|
@ -703,12 +614,11 @@ impl PropertyData {
|
|||
}
|
||||
}
|
||||
|
||||
// String columns for address/postcode and online listing metadata
|
||||
// String columns for address/postcode and property metadata
|
||||
for &string_col_name in &[
|
||||
"Address per Property Register",
|
||||
"Address per EPC",
|
||||
"Postcode",
|
||||
"Listing URL",
|
||||
"Property sub-type",
|
||||
"Price qualifier",
|
||||
] {
|
||||
|
|
@ -731,11 +641,6 @@ impl PropertyData {
|
|||
if has_renovation_history {
|
||||
select_exprs.push(col("renovation_history"));
|
||||
}
|
||||
let has_listing_features = schema.get("Listing features").is_some();
|
||||
if has_listing_features {
|
||||
select_exprs.push(col("Listing features"));
|
||||
}
|
||||
|
||||
let df = combined
|
||||
.lazy()
|
||||
.select(select_exprs)
|
||||
|
|
@ -827,7 +732,7 @@ impl PropertyData {
|
|||
let address_raw = extract_string_col(&df, "Address per Property Register")?;
|
||||
let postcode_raw = extract_string_col(&df, "Postcode")?;
|
||||
|
||||
// Extract optional string columns for online listing metadata
|
||||
// Extract optional string columns
|
||||
let extract_optional_string_col =
|
||||
|df: &DataFrame, name: &str| -> anyhow::Result<Vec<Option<String>>> {
|
||||
if let Ok(column) = df.column(name) {
|
||||
|
|
@ -852,7 +757,6 @@ impl PropertyData {
|
|||
}
|
||||
};
|
||||
|
||||
let listing_url_raw = extract_optional_string_col(&df, "Listing URL")?;
|
||||
let property_sub_type_raw = extract_optional_string_col(&df, "Property sub-type")?;
|
||||
let price_qualifier_raw = extract_optional_string_col(&df, "Price qualifier")?;
|
||||
|
||||
|
|
@ -996,44 +900,6 @@ impl PropertyData {
|
|||
FxHashMap::default()
|
||||
};
|
||||
|
||||
// Extract listing features: List<String>
|
||||
let mut listing_features_raw: FxHashMap<u32, Vec<String>> = if has_listing_features {
|
||||
tracing::info!("Extracting listing features");
|
||||
let feat_col = df
|
||||
.column("Listing features")
|
||||
.context("Missing Listing features column")?;
|
||||
let list_ca = feat_col
|
||||
.list()
|
||||
.context("Listing features is not a list column")?;
|
||||
|
||||
let mut features_map: FxHashMap<u32, Vec<String>> = FxHashMap::default();
|
||||
for old_row in 0..row_count {
|
||||
if let Some(inner) = list_ca.get_as_series(old_row) {
|
||||
if inner.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let str_ca = inner
|
||||
.str()
|
||||
.context("Listing features inner is not a string series")?;
|
||||
let items: Vec<String> = str_ca
|
||||
.into_iter()
|
||||
.filter_map(|v| v.map(|s| s.to_string()))
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
if !items.is_empty() {
|
||||
features_map.insert(old_row as u32, items);
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!(
|
||||
properties_with_features = features_map.len(),
|
||||
"Listing features extracted"
|
||||
);
|
||||
features_map
|
||||
} else {
|
||||
FxHashMap::default()
|
||||
};
|
||||
|
||||
// Sort all rows by spatial locality so that grid queries access
|
||||
// contiguous memory (sequential reads instead of random DRAM accesses).
|
||||
tracing::info!("Sorting rows by spatial locality");
|
||||
|
|
@ -1103,28 +969,7 @@ impl PropertyData {
|
|||
map
|
||||
};
|
||||
|
||||
// Re-key listing_features by permuted row index
|
||||
let listing_features: FxHashMap<u32, Vec<String>> = {
|
||||
let mut map =
|
||||
FxHashMap::with_capacity_and_hasher(listing_features_raw.len(), Default::default());
|
||||
for (new_row, &old_row) in perm.iter().enumerate() {
|
||||
if let Some(items) = listing_features_raw.remove(&old_row) {
|
||||
map.insert(new_row as u32, items);
|
||||
}
|
||||
}
|
||||
map
|
||||
};
|
||||
|
||||
// Permute optional string columns into sparse HashMaps
|
||||
let listing_url: FxHashMap<u32, String> = {
|
||||
let mut map = FxHashMap::default();
|
||||
for (new_row, &old_row) in perm.iter().enumerate() {
|
||||
if let Some(ref s) = listing_url_raw[old_row as usize] {
|
||||
map.insert(new_row as u32, s.clone());
|
||||
}
|
||||
}
|
||||
map
|
||||
};
|
||||
let property_sub_type: FxHashMap<u32, String> = {
|
||||
let mut map = FxHashMap::default();
|
||||
for (new_row, &old_row) in perm.iter().enumerate() {
|
||||
|
|
@ -1145,11 +990,24 @@ impl PropertyData {
|
|||
};
|
||||
|
||||
// Build enum_values map: feature_index -> list of string values
|
||||
// and enum_counts map: feature_index -> per-value global counts
|
||||
let mut enum_values: rustc_hash::FxHashMap<usize, Vec<String>> =
|
||||
rustc_hash::FxHashMap::default();
|
||||
for (enum_idx, (values, _)) in enum_col_major.iter().enumerate() {
|
||||
let mut enum_counts: rustc_hash::FxHashMap<usize, Vec<u64>> =
|
||||
rustc_hash::FxHashMap::default();
|
||||
for (enum_idx, (values, encoded)) in enum_col_major.iter().enumerate() {
|
||||
let feature_idx = num_numeric + enum_idx;
|
||||
enum_values.insert(feature_idx, values.clone());
|
||||
let mut counts = vec![0u64; values.len()];
|
||||
for &val in encoded {
|
||||
if val.is_finite() {
|
||||
let idx = val as usize;
|
||||
if idx < counts.len() {
|
||||
counts[idx] += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
enum_counts.insert(feature_idx, counts);
|
||||
}
|
||||
|
||||
// Build feature_stats: numeric stats + placeholder stats for enums
|
||||
|
|
@ -1232,10 +1090,9 @@ impl PropertyData {
|
|||
postcode_interner,
|
||||
postcode_keys,
|
||||
enum_values,
|
||||
enum_counts,
|
||||
approx_build_date_bits,
|
||||
renovation_history,
|
||||
listing_features,
|
||||
listing_url,
|
||||
property_sub_type,
|
||||
price_qualifier,
|
||||
})
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue