This commit is contained in:
Andras Schmelczer 2026-04-07 21:28:52 +01:00
parent 5a4723cd00
commit 53bfbfaa4a
11 changed files with 162 additions and 359 deletions

View file

@ -663,12 +663,12 @@ impl Database {
.context("Cannot fetch document version")
}
// inserting the document must be the last step of the transaction if there's one
// inserting the document must be the last step of the transaction
pub async fn insert_document_version(
&self,
vault_id: &VaultId,
version: &StoredDocumentVersion,
transaction: Option<WriteTransaction>,
mut transaction: WriteTransaction,
) -> Result<()> {
let document_id = version.document_id.as_hyphenated();
let query = sqlx::query!(
@ -697,22 +697,20 @@ impl Database {
version.has_been_merged
);
if let Some(mut transaction) = transaction {
query
.execute(&mut *transaction)
.await
.context("Cannot insert document version")?;
// Acquire the broadcast send lock before the insert so that
// broadcasts are serialized in vault_update_id order even after
// the write transaction (and its per-vault lock) is released.
let _send_guard = self.broadcasts.acquire_send_lock(vault_id).await;
transaction
.commit()
.await
.context("Failed to commit transaction")?;
} else {
query
.execute(&self.get_connection_pool(vault_id).await?)
.await
.context("Cannot insert document version")?;
}
query
.execute(&mut *transaction)
.await
.context("Cannot insert document version")?;
transaction
.commit()
.await
.context("Failed to commit transaction")?;
self.broadcasts
.send_document_update(

View file

@ -10,6 +10,7 @@ use crate::{app_state::database::models::VaultId, config::server_config::ServerC
pub struct Broadcasts {
broadcast_channel_capacity: usize,
tx: Arc<Mutex<HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>>>,
send_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>,
}
type TxMap = HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>;
@ -19,9 +20,23 @@ impl Broadcasts {
Self {
broadcast_channel_capacity: server_config.broadcast_channel_capacity,
tx: Arc::new(Mutex::new(HashMap::new())),
send_locks: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Acquire a per-vault lock that serializes broadcasts in commit order.
/// Must be acquired before the insert, held through commit and broadcast.
pub async fn acquire_send_lock(&self, vault: &VaultId) -> tokio::sync::OwnedMutexGuard<()> {
let lock = {
let mut locks = self.send_locks.lock().await;
locks
.entry(vault.clone())
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
.clone()
};
lock.lock_owned().await
}
/// Remove senders for vaults with no active receivers
fn prune_inactive_vaults(tx_map: &mut TxMap) {
tx_map.retain(|_, sender| sender.receiver_count() > 0);

View file

@ -130,7 +130,7 @@ pub async fn create_document(
state
.database
.insert_document_version(&vault_id, &new_version, Some(transaction))
.insert_document_version(&vault_id, &new_version, transaction)
.await
.map_err(server_error)?;

View file

@ -91,7 +91,7 @@ pub async fn delete_document(
state
.database
.insert_document_version(&vault_id, &new_version, Some(transaction))
.insert_document_version(&vault_id, &new_version, transaction)
.await
.map_err(server_error)?;

View file

@ -134,7 +134,7 @@ pub async fn restore_document_version(
state
.database
.insert_document_version(&vault_id, &new_version, Some(transaction))
.insert_document_version(&vault_id, &new_version, transaction)
.await
.map_err(server_error)?;

View file

@ -306,7 +306,7 @@ pub async fn update_document(
state
.database
.insert_document_version(&vault_id, &new_version, Some(transaction))
.insert_document_version(&vault_id, &new_version, transaction)
.await
.map_err(server_error)?;