From 19e4c39f44afdb5570867b65944b50fe2496fde5 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Wed, 1 Apr 2026 21:46:29 +0100 Subject: [PATCH] Parallel clean up --- sync-server/src/app_state/database.rs | 85 +++++++++++++++++++-------- 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index eb450c56..195dc7e7 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -1,5 +1,5 @@ use core::time::Duration; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, sync::atomic::{AtomicU64, Ordering}}; use anyhow::{Context as _, Result}; use log::info; @@ -42,7 +42,8 @@ struct VaultPools { #[derive(Debug)] struct VaultPool { cell: Arc>, - last_accessed: Mutex, + /// Monotonic timestamp in milliseconds (from `Instant::now()` at server start) + last_accessed_ms: AtomicU64, } #[derive(Clone, Debug)] @@ -57,6 +58,8 @@ pub struct Database { /// This mutex moves the wait from the SQLite layer (where it holds a /// pool connection) to the Tokio layer (where it holds nothing). write_locks: Arc>>>>, + /// Monotonic epoch for lock-free `last_accessed_ms` timestamps + epoch: Instant, } /// A write transaction backed by a raw `BEGIN IMMEDIATE` instead of sqlx's @@ -171,6 +174,10 @@ fn rollback_before_acquire( } impl Database { + fn now_ms(&self) -> u64 { + self.epoch.elapsed().as_millis() as u64 + } + /// Lists all vault IDs that exist on disk (have a `.sqlite` file). pub async fn list_vaults(&self) -> Result> { let mut vaults = Vec::new(); @@ -248,7 +255,7 @@ impl Database { vault.clone(), Arc::new(VaultPool { cell, - last_accessed: Mutex::new(Instant::now()), + last_accessed_ms: AtomicU64::new(0), }), ); } @@ -259,6 +266,7 @@ impl Database { connection_pools: Arc::new(Mutex::new(connection_pools)), broadcasts: broadcasts.clone(), write_locks: Arc::new(Mutex::new(HashMap::new())), + epoch: Instant::now(), }; database.start_idle_pool_cleanup(shutdown); @@ -385,7 +393,7 @@ impl Database { .or_insert_with(|| { Arc::new(VaultPool { cell: Arc::new(OnceCell::new()), - last_accessed: Mutex::new(Instant::now()), + last_accessed_ms: AtomicU64::new(self.now_ms()), }) }) .clone() @@ -403,7 +411,7 @@ impl Database { }) .await?; - *vault_pool.last_accessed.lock().await = Instant::now(); + vault_pool.last_accessed_ms.store(self.now_ms(), Ordering::Relaxed); Ok(pools.clone()) } @@ -863,16 +871,14 @@ impl Database { // pool.close().await doesn't block other get_connection_pool calls. let idle_pools: Vec<(VaultId, Arc)> = { let mut pools = self.connection_pools.lock().await; - let now = Instant::now(); + let now_ms = self.now_ms(); + let idle_threshold_ms = IDLE_POOL_TIMEOUT.as_millis() as u64; let vaults_to_remove: Vec = pools .iter() .filter(|(_, vp)| { - // If the lock is contested, the pool is actively used — not idle. - let Ok(last) = vp.last_accessed.try_lock() else { - return false; - }; - now.duration_since(*last) > IDLE_POOL_TIMEOUT + let last = vp.last_accessed_ms.load(Ordering::Relaxed); + now_ms.saturating_sub(last) > idle_threshold_ms }) .map(|(vault_id, _)| vault_id.clone()) .collect(); @@ -883,21 +889,48 @@ impl Database { .collect() }; - for (vault_id, vault_pool) in idle_pools { - if let Some(pools) = vault_pool.cell.get() { - // Checkpoint the WAL before closing to reclaim disk space - // and ensure the next open doesn't need a large WAL replay. - // TRUNCATE mode resets the WAL file to zero bytes. - if let Err(e) = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)") - .execute(&pools.writer) - .await - { - log::warn!("WAL checkpoint failed for vault `{vault_id}`: {e}"); - } - info!("Closing idle database connection pools for vault `{vault_id}`"); - pools.reader.close().await; - pools.writer.close().await; - } + // Close pools concurrently so cleanup doesn't serialise across vaults + let closures: Vec<_> = idle_pools + .into_iter() + .filter_map(|(vault_id, vault_pool)| { + vault_pool.cell.get().cloned().map(|pools| (vault_id, pools)) + }) + .collect(); + + let handles: Vec<_> = closures + .into_iter() + .map(|(vault_id, pools)| { + tokio::spawn(async move { + // Checkpoint the WAL before closing to reclaim disk space. + // Run on the blocking pool so disk I/O doesn't starve the runtime + let writer_clone = pools.writer.clone(); + let ckpt_result = tokio::task::spawn_blocking(move || { + futures::executor::block_on( + sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)") + .execute(&writer_clone), + ) + }) + .await; + + match ckpt_result { + Ok(Err(e)) => { + log::warn!("WAL checkpoint failed for vault `{vault_id}`: {e}"); + } + Err(e) => { + log::warn!("WAL checkpoint task panicked for vault `{vault_id}`: {e}"); + } + _ => {} + } + + info!("Closing idle database connection pools for vault `{vault_id}`"); + pools.reader.close().await; + pools.writer.close().await; + }) + }) + .collect(); + + for handle in handles { + let _ = handle.await; } }