Parallel clean up

This commit is contained in:
Andras Schmelczer 2026-04-01 21:46:29 +01:00
parent 03b5c223d6
commit 19e4c39f44

View file

@ -1,5 +1,5 @@
use core::time::Duration;
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, sync::atomic::{AtomicU64, Ordering}};
use anyhow::{Context as _, Result};
use log::info;
@ -42,7 +42,8 @@ struct VaultPools {
/// Per-vault entry pairing the vault's connection pools with the bookkeeping
/// used to decide when those pools are idle.
#[derive(Debug)]
struct VaultPool {
/// Shared once-initialised cell holding the vault's `VaultPools`.
cell: Arc<OnceCell<VaultPools>>,
last_accessed: Mutex<Instant>,
/// Time of the most recent access, expressed as milliseconds elapsed since
/// `Database::epoch` (see `Database::now_ms`). Stored atomically so it can be
/// updated and read without taking a lock; the idle-pool cleanup compares it
/// against `IDLE_POOL_TIMEOUT`.
last_accessed_ms: AtomicU64,
}
#[derive(Clone, Debug)]
@ -57,6 +58,8 @@ pub struct Database {
/// This mutex moves the wait from the SQLite layer (where it holds a
/// pool connection) to the Tokio layer (where it holds nothing).
write_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>,
/// Monotonic epoch for lock-free `last_accessed_ms` timestamps
epoch: Instant,
}
/// A write transaction backed by a raw `BEGIN IMMEDIATE` instead of sqlx's
@ -171,6 +174,10 @@ fn rollback_before_acquire(
}
impl Database {
fn now_ms(&self) -> u64 {
self.epoch.elapsed().as_millis() as u64
}
/// Lists all vault IDs that exist on disk (have a `.sqlite` file).
pub async fn list_vaults(&self) -> Result<Vec<VaultId>> {
let mut vaults = Vec::new();
@ -248,7 +255,7 @@ impl Database {
vault.clone(),
Arc::new(VaultPool {
cell,
last_accessed: Mutex::new(Instant::now()),
last_accessed_ms: AtomicU64::new(0),
}),
);
}
@ -259,6 +266,7 @@ impl Database {
connection_pools: Arc::new(Mutex::new(connection_pools)),
broadcasts: broadcasts.clone(),
write_locks: Arc::new(Mutex::new(HashMap::new())),
epoch: Instant::now(),
};
database.start_idle_pool_cleanup(shutdown);
@ -385,7 +393,7 @@ impl Database {
.or_insert_with(|| {
Arc::new(VaultPool {
cell: Arc::new(OnceCell::new()),
last_accessed: Mutex::new(Instant::now()),
last_accessed_ms: AtomicU64::new(self.now_ms()),
})
})
.clone()
@ -403,7 +411,7 @@ impl Database {
})
.await?;
*vault_pool.last_accessed.lock().await = Instant::now();
vault_pool.last_accessed_ms.store(self.now_ms(), Ordering::Relaxed);
Ok(pools.clone())
}
@ -863,16 +871,14 @@ impl Database {
// pool.close().await doesn't block other get_connection_pool calls.
let idle_pools: Vec<(VaultId, Arc<VaultPool>)> = {
let mut pools = self.connection_pools.lock().await;
let now = Instant::now();
let now_ms = self.now_ms();
let idle_threshold_ms = IDLE_POOL_TIMEOUT.as_millis() as u64;
let vaults_to_remove: Vec<VaultId> = pools
.iter()
.filter(|(_, vp)| {
// If the lock is contested, the pool is actively used — not idle.
let Ok(last) = vp.last_accessed.try_lock() else {
return false;
};
now.duration_since(*last) > IDLE_POOL_TIMEOUT
let last = vp.last_accessed_ms.load(Ordering::Relaxed);
now_ms.saturating_sub(last) > idle_threshold_ms
})
.map(|(vault_id, _)| vault_id.clone())
.collect();
@ -883,21 +889,48 @@ impl Database {
.collect()
};
for (vault_id, vault_pool) in idle_pools {
if let Some(pools) = vault_pool.cell.get() {
// Checkpoint the WAL before closing to reclaim disk space
// and ensure the next open doesn't need a large WAL replay.
// TRUNCATE mode resets the WAL file to zero bytes.
if let Err(e) = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
.execute(&pools.writer)
.await
{
log::warn!("WAL checkpoint failed for vault `{vault_id}`: {e}");
}
info!("Closing idle database connection pools for vault `{vault_id}`");
pools.reader.close().await;
pools.writer.close().await;
}
// Close pools concurrently so cleanup doesn't serialise across vaults.
// Each task handle is paired with its vault ID so a failed task can be
// reported by name.
let handles: Vec<_> = idle_pools
    .into_iter()
    .filter_map(|(vault_id, vault_pool)| {
        // Entries whose cell was never initialised have no pools to close.
        vault_pool.cell.get().cloned().map(|pools| (vault_id, pools))
    })
    .map(|(vault_id, pools)| {
        let task_vault_id = vault_id.clone();
        let handle = tokio::spawn(async move {
            // Checkpoint the WAL before closing to reclaim disk space.
            // TRUNCATE mode resets the WAL file to zero bytes.
            // The query is awaited directly: sqlx's SQLite driver already runs
            // statements on its own worker thread, so wrapping it in
            // `spawn_blocking` + `block_on` would only park a blocking-pool
            // thread without taking any I/O off the runtime.
            if let Err(e) = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
                .execute(&pools.writer)
                .await
            {
                log::warn!("WAL checkpoint failed for vault `{task_vault_id}`: {e}");
            }
            info!("Closing idle database connection pools for vault `{task_vault_id}`");
            pools.reader.close().await;
            pools.writer.close().await;
        });
        (vault_id, handle)
    })
    .collect();
for (vault_id, handle) in handles {
    // A join error means the task panicked or was cancelled; report it
    // instead of silently discarding it.
    if let Err(e) = handle.await {
        log::warn!("Idle pool cleanup task failed for vault `{vault_id}`: {e}");
    }
}
}