diff --git a/sync-server/src/app_state/cursors.rs b/sync-server/src/app_state/cursors.rs index 4d01995a..e17fb4f7 100644 --- a/sync-server/src/app_state/cursors.rs +++ b/sync-server/src/app_state/cursors.rs @@ -117,16 +117,14 @@ impl Cursors { .unwrap_or_default() }; - self.broadcasts - .send_document_update( - vault_id.clone(), - WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions( - CursorPositionFromServer { - clients: client_cursors, - }, - )), - ) - .await; + self.broadcasts.send_document_update( + vault_id.clone(), + WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions( + CursorPositionFromServer { + clients: client_cursors, + }, + )), + ); } pub async fn remove_cursors_of_device(&self, vault_id: &VaultId, device_id: &DeviceId) { diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index a249dadc..e8c02b31 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -739,22 +739,23 @@ impl Database { .await .context("Failed to commit transaction")?; + // Both sends are synchronous: there's no `.await` between the + // `commit()` above and function return, so a task cancellation + // can't drop the broadcast and leave peers permanently behind. if broadcast.content_changed { // Content events are filtered out for the origin device — the // origin already has the content (or learns about the merge // via the HTTP response). 
- self.broadcasts - .send_document_update( - vault_id.clone(), - WebSocketServerMessageWithOrigin::with_origin( - version.device_id.clone(), - WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { - documents: vec![version.clone().into()], - is_initial_sync: false, - }), - ), - ) - .await; + self.broadcasts.send_document_update( + vault_id.clone(), + WebSocketServerMessageWithOrigin::with_origin( + version.device_id.clone(), + WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { + documents: vec![version.clone().into()], + is_initial_sync: false, + }), + ), + ); } if broadcast.path_changed { @@ -763,18 +764,19 @@ impl Database { // receives them. The create/update HTTP response no longer // carries `relative_path`, so the origin device relies on this // event to learn the server-canonical path. - self.broadcasts - .send_document_update( - vault_id.clone(), - WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::PathChange( - WebSocketVaultPathChange { - vault_update_id: version.vault_update_id, - document_id: version.document_id, - relative_path: version.relative_path.clone(), - }, - )), - ) - .await; + self.broadcasts.send_document_update( + vault_id.clone(), + WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::PathChange( + WebSocketVaultPathChange { + vault_update_id: version.vault_update_id, + document_id: version.document_id, + relative_path: version.relative_path.clone(), + updated_date: version.updated_date, + user_id: version.user_id.clone(), + device_id: version.device_id.clone(), + }, + )), + ); } Ok(()) diff --git a/sync-server/src/app_state/websocket/broadcasts.rs b/sync-server/src/app_state/websocket/broadcasts.rs index 91183970..0b49fa27 100644 --- a/sync-server/src/app_state/websocket/broadcasts.rs +++ b/sync-server/src/app_state/websocket/broadcasts.rs @@ -1,4 +1,7 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex as StdMutex}, +}; use log::{debug, warn}; use 
tokio::sync::{Mutex, broadcast}; @@ -9,7 +12,12 @@ use crate::{app_state::database::models::VaultId, config::server_config::ServerC #[derive(Debug, Clone)] pub struct Broadcasts { broadcast_channel_capacity: usize, - tx: Arc>>>, + // `tx` uses a blocking std::sync::Mutex because the critical section is + // a HashMap lookup plus a synchronous `broadcast::Sender::send`. Making + // this non-async lets `send_document_update` run without an `.await`, + // so an axum handler that is cancelled between `transaction.commit()` + // and the broadcast can never drop the notification mid-flight. + tx: Arc>>>, send_locks: Arc>>>>, } @@ -19,7 +27,7 @@ impl Broadcasts { pub fn new(server_config: &ServerConfig) -> Self { Self { broadcast_channel_capacity: server_config.broadcast_channel_capacity, - tx: Arc::new(Mutex::new(HashMap::new())), + tx: Arc::new(StdMutex::new(HashMap::new())), send_locks: Arc::new(Mutex::new(HashMap::new())), } } @@ -42,19 +50,25 @@ impl Broadcasts { tx_map.retain(|_, sender| sender.receiver_count() > 0); } - pub async fn get_receiver( + pub fn get_receiver( &self, vault: VaultId, max_clients: usize, ) -> Result, crate::errors::SyncServerError> { - let mut tx_map = self.tx.lock().await; + let mut tx_map = self + .tx + .lock() + .expect("broadcasts.tx mutex poisoned — a previous holder panicked"); Self::prune_inactive_vaults(&mut tx_map); let sender = tx_map .entry(vault) .or_insert_with(|| broadcast::channel(self.broadcast_channel_capacity).0); + // Hold the lock across the count check *and* the subscribe so the + // `max_clients` cap is atomic: two concurrent callers can't both + // observe `receiver_count() < max_clients` and both subscribe. if sender.receiver_count() >= max_clients { return Err(crate::errors::client_error(anyhow::anyhow!( "Vault has reached the maximum number of clients ({max_clients})" @@ -65,13 +79,18 @@ impl Broadcasts { } /// Notify all clients (who are subscribed to the vault) about an update. 
- /// We only log failures and don't propagate them. - pub async fn send_document_update( + /// Synchronous: safe to invoke from a handler between `commit()` and + /// function return without worrying about task cancellation dropping + /// the broadcast mid-flight. Failures are logged, never propagated. + pub fn send_document_update( &self, vault: VaultId, document: WebSocketServerMessageWithOrigin, ) { - let mut tx_map = self.tx.lock().await; + let mut tx_map = self + .tx + .lock() + .expect("broadcasts.tx mutex poisoned — a previous holder panicked"); Self::prune_inactive_vaults(&mut tx_map); let sender = tx_map diff --git a/sync-server/src/server/create_document.rs b/sync-server/src/server/create_document.rs index 7f0f6b60..64c3c5fe 100644 --- a/sync-server/src/server/create_document.rs +++ b/sync-server/src/server/create_document.rs @@ -69,31 +69,40 @@ pub async fn create_document( .map_err(server_error)?; if let Some(latest_version) = latest_version { - let is_mergeable_text = is_file_type_mergable( - &sanitized_relative_path, - &state.config.server.mergeable_file_extensions, - ) && !is_binary(&latest_version.content) - && !is_binary(&new_content); - - if is_mergeable_text || new_content == latest_version.content { - return update_document::update_document( + // Only merge with an existing document the client couldn't have + // known about, i.e. one created after the client's last seen vault + // update. This avoids creating cycles by merging two documents into one. + // That could happen if both clients know of document A at path P1, + // but client 2 moves it to P2 while client 1 creates a new document at P2: + // client 1 would then merge its new document with the moved version of A at P2 + // that client 2 created, resulting in two files (P1 and P2) with the same doc id (A). 
+ if latest_version.creation_vault_update_id > request.last_seen_vault_update_id { + let is_mergeable_text = is_file_type_mergable( &sanitized_relative_path, - Vec::new(), - vault_id, - latest_version.document_id, - &request.relative_path, - new_content, - user, - device_id, - state, - transaction, - ) - .await; - } + &state.config.server.mergeable_file_extensions, + ) && !is_binary(&latest_version.content) + && !is_binary(&new_content); - // For non-mergeable (binary) files with different content, don't - // merge, create a separate document at a deconflicted path so - // neither client's data is silently overwritten. + if is_mergeable_text || new_content == latest_version.content { + return update_document::update_document( + &sanitized_relative_path, + Vec::new(), + vault_id, + latest_version.document_id, + &request.relative_path, + new_content, + user, + device_id, + state, + transaction, + ) + .await; + } + + // For non-mergeable (binary) files with different content, don't + // merge, create a separate document at a deconflicted path so + // neither client's data is silently overwritten. 
+ } } let document_id = uuid::Uuid::new_v4(); diff --git a/sync-server/src/server/delete_document.rs b/sync-server/src/server/delete_document.rs index a523c499..3057bd6e 100644 --- a/sync-server/src/server/delete_document.rs +++ b/sync-server/src/server/delete_document.rs @@ -1,4 +1,4 @@ -use anyhow::Context; +use anyhow::{Context, anyhow}; use axum::{ Extension, Json, extract::{Path, State}, @@ -17,7 +17,7 @@ use crate::{ }, }, config::user_config::User, - errors::{SyncServerError, server_error, write_transaction_error}, + errors::{SyncServerError, not_found_error, server_error, write_transaction_error}, utils::normalize::normalize, }; @@ -60,9 +60,18 @@ pub async fn delete_document( .await .map_err(server_error)?; - if let Some(latest_version) = &latest_version - && latest_version.is_deleted - { + let Some(latest_version) = latest_version else { + transaction + .rollback() + .await + .context("Failed to roll back transaction") + .map_err(server_error)?; + return Err(not_found_error(anyhow!( + "Document `{document_id}` not found in vault `{vault_id}`" + ))); + }; + + if latest_version.is_deleted { transaction .rollback() .await @@ -70,15 +79,13 @@ pub async fn delete_document( .map_err(server_error)?; info!("Document `{document_id}` has already been deleted",); - return Ok(Json(latest_version.clone().into())); + return Ok(Json(latest_version.into())); } let new_vault_update_id = last_update_id + 1; - let (latest_relative_path, latest_content, creation_vault_update_id) = - latest_version.map_or_else( - || (String::new(), Vec::new(), new_vault_update_id), - |version| (version.relative_path, version.content, version.creation_vault_update_id), - ); + let latest_relative_path = latest_version.relative_path; + let latest_content = latest_version.content; + let creation_vault_update_id = latest_version.creation_vault_update_id; let new_version = StoredDocumentVersion { vault_update_id: new_vault_update_id, diff --git a/sync-server/src/server/requests.rs 
b/sync-server/src/server/requests.rs index 107c998c..250c65d7 100644 --- a/sync-server/src/server/requests.rs +++ b/sync-server/src/server/requests.rs @@ -11,6 +11,9 @@ use crate::app_state::database::models::VaultUpdateId; pub struct CreateDocumentVersion { pub relative_path: String, + #[ts(type = "number")] + pub last_seen_vault_update_id: VaultUpdateId, + #[ts(as = "Vec")] #[form_data(limit = "unlimited")] pub content: FieldData, diff --git a/sync-server/src/server/restore_document_version.rs b/sync-server/src/server/restore_document_version.rs index bb4e6775..7522e73c 100644 --- a/sync-server/src/server/restore_document_version.rs +++ b/sync-server/src/server/restore_document_version.rs @@ -147,7 +147,14 @@ pub async fn restore_document_version( let (content_changed, path_changed) = match &current_latest { Some(prev) => ( prev.content != new_version.content || prev.is_deleted, - prev.relative_path != new_version.relative_path, + // Mirror `update_document`: `path_changed` is true when the + // stored path differs from either the prior stored path (peers + // need to learn about the move) *or* from the path the caller + // implicitly requested (`target_version.relative_path`, so the + // origin learns if the server deduped its requested restore + // path). + prev.relative_path != new_version.relative_path + || target_version.relative_path != new_version.relative_path, ), // No prior version (shouldn't happen in practice — target_version // already proved the document exists — but treat defensively). diff --git a/sync-server/src/server/websocket.rs b/sync-server/src/server/websocket.rs index ffac8d38..41bf4754 100644 --- a/sync-server/src/server/websocket.rs +++ b/sync-server/src/server/websocket.rs @@ -115,7 +115,6 @@ async fn websocket( let mut broadcast_receiver = match state .broadcasts .get_receiver(vault_id.clone(), max_clients) - .await { Ok(receiver) => receiver, Err(err) => {