Close unsued databases
This commit is contained in:
parent
10fdc938c5
commit
e635e84aa4
1 changed files with 85 additions and 8 deletions
|
|
@ -2,6 +2,7 @@ use core::time::Duration;
|
||||||
use std::{collections::HashMap, sync::Arc};
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
use anyhow::{Context as _, Result};
|
use anyhow::{Context as _, Result};
|
||||||
|
use log::info;
|
||||||
use models::{
|
use models::{
|
||||||
DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId,
|
DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId,
|
||||||
};
|
};
|
||||||
|
|
@ -10,6 +11,7 @@ use sqlx::{sqlite::SqliteConnectOptions, types::chrono::Utc};
|
||||||
pub mod models;
|
pub mod models;
|
||||||
use sqlx::{Pool, Sqlite, sqlite::SqlitePoolOptions};
|
use sqlx::{Pool, Sqlite, sqlite::SqlitePoolOptions};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
use tokio::time::Instant;
|
||||||
use uuid::fmt::Hyphenated;
|
use uuid::fmt::Hyphenated;
|
||||||
|
|
||||||
use super::websocket::{
|
use super::websocket::{
|
||||||
|
|
@ -18,11 +20,26 @@ use super::websocket::{
|
||||||
};
|
};
|
||||||
use crate::config::database_config::DatabaseConfig;
|
use crate::config::database_config::DatabaseConfig;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct PoolWithTimestamp {
|
||||||
|
pool: Pool<Sqlite>,
|
||||||
|
last_accessed: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for PoolWithTimestamp {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("PoolWithTimestamp")
|
||||||
|
.field("pool", &"Pool<Sqlite>")
|
||||||
|
.field("last_accessed", &self.last_accessed)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
config: DatabaseConfig,
|
config: DatabaseConfig,
|
||||||
broadcasts: Broadcasts,
|
broadcasts: Broadcasts,
|
||||||
connection_pools: Arc<Mutex<HashMap<VaultId, Pool<Sqlite>>>>,
|
connection_pools: Arc<Mutex<HashMap<VaultId, PoolWithTimestamp>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;
|
pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;
|
||||||
|
|
@ -52,17 +69,26 @@ impl Database {
|
||||||
.trim_end_matches(".sqlite")
|
.trim_end_matches(".sqlite")
|
||||||
.to_owned();
|
.to_owned();
|
||||||
|
|
||||||
|
let pool = Self::create_vault_database(config, &vault).await?;
|
||||||
connection_pools.insert(
|
connection_pools.insert(
|
||||||
vault.clone(),
|
vault.clone(),
|
||||||
Self::create_vault_database(config, &vault).await?,
|
PoolWithTimestamp {
|
||||||
|
pool,
|
||||||
|
last_accessed: Instant::now(),
|
||||||
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Self {
|
let database = Self {
|
||||||
config: config.clone(),
|
config: config.clone(),
|
||||||
connection_pools: Arc::new(Mutex::new(connection_pools)),
|
connection_pools: Arc::new(Mutex::new(connection_pools)),
|
||||||
broadcasts: broadcasts.clone(),
|
broadcasts: broadcasts.clone(),
|
||||||
})
|
};
|
||||||
|
|
||||||
|
// Start background task to cleanup idle connection pools
|
||||||
|
database.start_idle_pool_cleanup();
|
||||||
|
|
||||||
|
Ok(database)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_vault_database(
|
async fn create_vault_database(
|
||||||
|
|
@ -100,16 +126,26 @@ impl Database {
|
||||||
|
|
||||||
async fn get_connection_pool(&self, vault: &VaultId) -> Result<Pool<Sqlite>> {
|
async fn get_connection_pool(&self, vault: &VaultId) -> Result<Pool<Sqlite>> {
|
||||||
let mut pools = self.connection_pools.lock().await;
|
let mut pools = self.connection_pools.lock().await;
|
||||||
|
|
||||||
if !pools.contains_key(vault) {
|
if !pools.contains_key(vault) {
|
||||||
let pool = Self::create_vault_database(&self.config, vault).await?;
|
let pool = Self::create_vault_database(&self.config, vault).await?;
|
||||||
pools.insert(vault.clone(), pool);
|
pools.insert(
|
||||||
|
vault.clone(),
|
||||||
|
PoolWithTimestamp {
|
||||||
|
pool,
|
||||||
|
last_accessed: Instant::now(),
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let pool = pools
|
let pool_with_timestamp = pools
|
||||||
.get(vault)
|
.get_mut(vault)
|
||||||
.expect("Pool was just inserted or already exists");
|
.expect("Pool was just inserted or already exists");
|
||||||
|
|
||||||
Ok(pool.clone())
|
// Update last accessed time
|
||||||
|
pool_with_timestamp.last_accessed = Instant::now();
|
||||||
|
|
||||||
|
Ok(pool_with_timestamp.pool.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempting to write from this transaction might result in a
|
/// Attempting to write from this transaction might result in a
|
||||||
|
|
@ -434,4 +470,45 @@ impl Database {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Cleanup idle connection pools that haven't been accessed in more than 5 minutes
|
||||||
|
async fn cleanup_idle_pools(&self) {
|
||||||
|
let mut pools = self.connection_pools.lock().await;
|
||||||
|
let now = Instant::now();
|
||||||
|
let idle_timeout = Duration::from_secs(5 * 60); // 5 minutes
|
||||||
|
|
||||||
|
// Collect vaults to remove
|
||||||
|
let vaults_to_remove: Vec<VaultId> = pools
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, pool_with_timestamp)| {
|
||||||
|
now.duration_since(pool_with_timestamp.last_accessed) > idle_timeout
|
||||||
|
})
|
||||||
|
.map(|(vault_id, _)| vault_id.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Close and remove idle pools
|
||||||
|
for vault_id in &vaults_to_remove {
|
||||||
|
if let Some(pool_with_timestamp) = pools.remove(vault_id) {
|
||||||
|
info!(
|
||||||
|
"Closing idle database connection pool for vault {}",
|
||||||
|
vault_id
|
||||||
|
);
|
||||||
|
pool_with_timestamp.pool.close().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start a background task that periodically cleans up idle connection pools
|
||||||
|
fn start_idle_pool_cleanup(&self) {
|
||||||
|
let database = self.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(60)); // Check every minute
|
||||||
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
database.cleanup_idle_pools().await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue