This commit is contained in:
Andras Schmelczer 2026-04-21 22:35:30 +01:00
parent 5ee9db0007
commit 6a8c7635f1
8 changed files with 122 additions and 78 deletions

View file

@ -117,16 +117,14 @@ impl Cursors {
.unwrap_or_default() .unwrap_or_default()
}; };
self.broadcasts self.broadcasts.send_document_update(
.send_document_update( vault_id.clone(),
vault_id.clone(), WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions(
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions( CursorPositionFromServer {
CursorPositionFromServer { clients: client_cursors,
clients: client_cursors, },
}, )),
)), );
)
.await;
} }
pub async fn remove_cursors_of_device(&self, vault_id: &VaultId, device_id: &DeviceId) { pub async fn remove_cursors_of_device(&self, vault_id: &VaultId, device_id: &DeviceId) {

View file

@ -739,22 +739,23 @@ impl Database {
.await .await
.context("Failed to commit transaction")?; .context("Failed to commit transaction")?;
// Both sends are synchronous: there's no `.await` between the
// `commit()` above and function return, so a task cancellation
// can't drop the broadcast and leave peers permanently behind.
if broadcast.content_changed { if broadcast.content_changed {
// Content events are filtered out for the origin device — the // Content events are filtered out for the origin device — the
// origin already has the content (or learns about the merge // origin already has the content (or learns about the merge
// via the HTTP response). // via the HTTP response).
self.broadcasts self.broadcasts.send_document_update(
.send_document_update( vault_id.clone(),
vault_id.clone(), WebSocketServerMessageWithOrigin::with_origin(
WebSocketServerMessageWithOrigin::with_origin( version.device_id.clone(),
version.device_id.clone(), WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { documents: vec![version.clone().into()],
documents: vec![version.clone().into()], is_initial_sync: false,
is_initial_sync: false, }),
}), ),
), );
)
.await;
} }
if broadcast.path_changed { if broadcast.path_changed {
@ -763,18 +764,19 @@ impl Database {
// receives them. The create/update HTTP response no longer // receives them. The create/update HTTP response no longer
// carries `relative_path`, so the origin device relies on this // carries `relative_path`, so the origin device relies on this
// event to learn the server-canonical path. // event to learn the server-canonical path.
self.broadcasts self.broadcasts.send_document_update(
.send_document_update( vault_id.clone(),
vault_id.clone(), WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::PathChange(
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::PathChange( WebSocketVaultPathChange {
WebSocketVaultPathChange { vault_update_id: version.vault_update_id,
vault_update_id: version.vault_update_id, document_id: version.document_id,
document_id: version.document_id, relative_path: version.relative_path.clone(),
relative_path: version.relative_path.clone(), updated_date: version.updated_date,
}, user_id: version.user_id.clone(),
)), device_id: version.device_id.clone(),
) },
.await; )),
);
} }
Ok(()) Ok(())

View file

@ -1,4 +1,7 @@
use std::{collections::HashMap, sync::Arc}; use std::{
collections::HashMap,
sync::{Arc, Mutex as StdMutex},
};
use log::{debug, warn}; use log::{debug, warn};
use tokio::sync::{Mutex, broadcast}; use tokio::sync::{Mutex, broadcast};
@ -9,7 +12,12 @@ use crate::{app_state::database::models::VaultId, config::server_config::ServerC
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Broadcasts { pub struct Broadcasts {
broadcast_channel_capacity: usize, broadcast_channel_capacity: usize,
tx: Arc<Mutex<HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>>>, // `tx` uses a blocking std::sync::Mutex because the critical section is
// a HashMap lookup plus a synchronous `broadcast::Sender::send`. Making
// this non-async lets `send_document_update` run without an `.await`,
// so an axum handler that is cancelled between `transaction.commit()`
// and the broadcast can never drop the notification mid-flight.
tx: Arc<StdMutex<HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>>>,
send_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>, send_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>,
} }
@ -19,7 +27,7 @@ impl Broadcasts {
pub fn new(server_config: &ServerConfig) -> Self { pub fn new(server_config: &ServerConfig) -> Self {
Self { Self {
broadcast_channel_capacity: server_config.broadcast_channel_capacity, broadcast_channel_capacity: server_config.broadcast_channel_capacity,
tx: Arc::new(Mutex::new(HashMap::new())), tx: Arc::new(StdMutex::new(HashMap::new())),
send_locks: Arc::new(Mutex::new(HashMap::new())), send_locks: Arc::new(Mutex::new(HashMap::new())),
} }
} }
@ -42,19 +50,25 @@ impl Broadcasts {
tx_map.retain(|_, sender| sender.receiver_count() > 0); tx_map.retain(|_, sender| sender.receiver_count() > 0);
} }
pub async fn get_receiver( pub fn get_receiver(
&self, &self,
vault: VaultId, vault: VaultId,
max_clients: usize, max_clients: usize,
) -> Result<broadcast::Receiver<WebSocketServerMessageWithOrigin>, crate::errors::SyncServerError> ) -> Result<broadcast::Receiver<WebSocketServerMessageWithOrigin>, crate::errors::SyncServerError>
{ {
let mut tx_map = self.tx.lock().await; let mut tx_map = self
.tx
.lock()
.expect("broadcasts.tx mutex poisoned — a previous holder panicked");
Self::prune_inactive_vaults(&mut tx_map); Self::prune_inactive_vaults(&mut tx_map);
let sender = tx_map let sender = tx_map
.entry(vault) .entry(vault)
.or_insert_with(|| broadcast::channel(self.broadcast_channel_capacity).0); .or_insert_with(|| broadcast::channel(self.broadcast_channel_capacity).0);
// Hold the lock across the count check *and* the subscribe so the
// `max_clients` cap is atomic: two concurrent callers can't both
// observe `receiver_count() < max_clients` and both subscribe.
if sender.receiver_count() >= max_clients { if sender.receiver_count() >= max_clients {
return Err(crate::errors::client_error(anyhow::anyhow!( return Err(crate::errors::client_error(anyhow::anyhow!(
"Vault has reached the maximum number of clients ({max_clients})" "Vault has reached the maximum number of clients ({max_clients})"
@ -65,13 +79,18 @@ impl Broadcasts {
} }
/// Notify all clients (who are subscribed to the vault) about an update. /// Notify all clients (who are subscribed to the vault) about an update.
/// We only log failures and don't propagate them. /// Synchronous: safe to invoke from a handler between `commit()` and
pub async fn send_document_update( /// function return without worrying about task cancellation dropping
/// the broadcast mid-flight. Failures are logged, never propagated.
pub fn send_document_update(
&self, &self,
vault: VaultId, vault: VaultId,
document: WebSocketServerMessageWithOrigin, document: WebSocketServerMessageWithOrigin,
) { ) {
let mut tx_map = self.tx.lock().await; let mut tx_map = self
.tx
.lock()
.expect("broadcasts.tx mutex poisoned — a previous holder panicked");
Self::prune_inactive_vaults(&mut tx_map); Self::prune_inactive_vaults(&mut tx_map);
let sender = tx_map let sender = tx_map

View file

@ -69,31 +69,40 @@ pub async fn create_document(
.map_err(server_error)?; .map_err(server_error)?;
if let Some(latest_version) = latest_version { if let Some(latest_version) = latest_version {
let is_mergeable_text = is_file_type_mergable( // Only merge with an existing document the client couldn't have
&sanitized_relative_path, // known about: its creation is newer than the client's last seen
&state.config.server.mergeable_file_extensions, // vault update to avoid creating cycles by merging two documents into one.
) && !is_binary(&latest_version.content) // This could happen if both clients know of document A at path P1,
&& !is_binary(&new_content); // but client 2 moves it to P2 while client 1 creates a new document at P2,
// then client 1 would merge its new document with the moved version of A at P2
if is_mergeable_text || new_content == latest_version.content { // that client 2 resulting in two files (P1 and P2) with the same doc id (A).
return update_document::update_document( if latest_version.creation_vault_update_id > request.last_seen_vault_update_id {
let is_mergeable_text = is_file_type_mergable(
&sanitized_relative_path, &sanitized_relative_path,
Vec::new(), &state.config.server.mergeable_file_extensions,
vault_id, ) && !is_binary(&latest_version.content)
latest_version.document_id, && !is_binary(&new_content);
&request.relative_path,
new_content,
user,
device_id,
state,
transaction,
)
.await;
}
// For non-mergeable (binary) files with different content, don't if is_mergeable_text || new_content == latest_version.content {
// merge, create a separate document at a deconflicted path so return update_document::update_document(
// neither client's data is silently overwritten. &sanitized_relative_path,
Vec::new(),
vault_id,
latest_version.document_id,
&request.relative_path,
new_content,
user,
device_id,
state,
transaction,
)
.await;
}
// For non-mergeable (binary) files with different content, don't
// merge, create a separate document at a deconflicted path so
// neither client's data is silently overwritten.
}
} }
let document_id = uuid::Uuid::new_v4(); let document_id = uuid::Uuid::new_v4();

View file

@ -1,4 +1,4 @@
use anyhow::Context; use anyhow::{Context, anyhow};
use axum::{ use axum::{
Extension, Json, Extension, Json,
extract::{Path, State}, extract::{Path, State},
@ -17,7 +17,7 @@ use crate::{
}, },
}, },
config::user_config::User, config::user_config::User,
errors::{SyncServerError, server_error, write_transaction_error}, errors::{SyncServerError, not_found_error, server_error, write_transaction_error},
utils::normalize::normalize, utils::normalize::normalize,
}; };
@ -60,9 +60,18 @@ pub async fn delete_document(
.await .await
.map_err(server_error)?; .map_err(server_error)?;
if let Some(latest_version) = &latest_version let Some(latest_version) = latest_version else {
&& latest_version.is_deleted transaction
{ .rollback()
.await
.context("Failed to roll back transaction")
.map_err(server_error)?;
return Err(not_found_error(anyhow!(
"Document `{document_id}` not found in vault `{vault_id}`"
)));
};
if latest_version.is_deleted {
transaction transaction
.rollback() .rollback()
.await .await
@ -70,15 +79,13 @@ pub async fn delete_document(
.map_err(server_error)?; .map_err(server_error)?;
info!("Document `{document_id}` has already been deleted",); info!("Document `{document_id}` has already been deleted",);
return Ok(Json(latest_version.clone().into())); return Ok(Json(latest_version.into()));
} }
let new_vault_update_id = last_update_id + 1; let new_vault_update_id = last_update_id + 1;
let (latest_relative_path, latest_content, creation_vault_update_id) = let latest_relative_path = latest_version.relative_path;
latest_version.map_or_else( let latest_content = latest_version.content;
|| (String::new(), Vec::new(), new_vault_update_id), let creation_vault_update_id = latest_version.creation_vault_update_id;
|version| (version.relative_path, version.content, version.creation_vault_update_id),
);
let new_version = StoredDocumentVersion { let new_version = StoredDocumentVersion {
vault_update_id: new_vault_update_id, vault_update_id: new_vault_update_id,

View file

@ -11,6 +11,9 @@ use crate::app_state::database::models::VaultUpdateId;
pub struct CreateDocumentVersion { pub struct CreateDocumentVersion {
pub relative_path: String, pub relative_path: String,
#[ts(type = "number")]
pub last_seen_vault_update_id: VaultUpdateId,
#[ts(as = "Vec<u8>")] #[ts(as = "Vec<u8>")]
#[form_data(limit = "unlimited")] #[form_data(limit = "unlimited")]
pub content: FieldData<Bytes>, pub content: FieldData<Bytes>,

View file

@ -147,7 +147,14 @@ pub async fn restore_document_version(
let (content_changed, path_changed) = match &current_latest { let (content_changed, path_changed) = match &current_latest {
Some(prev) => ( Some(prev) => (
prev.content != new_version.content || prev.is_deleted, prev.content != new_version.content || prev.is_deleted,
prev.relative_path != new_version.relative_path, // Mirror `update_document`: `path_changed` is true when the
// stored path differs from either the prior stored path (peers
// need to learn about the move) *or* from the path the caller
// implicitly requested (`target_version.relative_path`, so the
// origin learns if the server deduped its requested restore
// path).
prev.relative_path != new_version.relative_path
|| target_version.relative_path != new_version.relative_path,
), ),
// No prior version (shouldn't happen in practice — target_version // No prior version (shouldn't happen in practice — target_version
// already proved the document exists — but treat defensively). // already proved the document exists — but treat defensively).

View file

@ -115,7 +115,6 @@ async fn websocket(
let mut broadcast_receiver = match state let mut broadcast_receiver = match state
.broadcasts .broadcasts
.get_receiver(vault_id.clone(), max_clients) .get_receiver(vault_id.clone(), max_clients)
.await
{ {
Ok(receiver) => receiver, Ok(receiver) => receiver,
Err(err) => { Err(err) => {