From d8b6ec5b77447c09047c7cc50fc965d6e8893683 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sun, 10 May 2026 15:16:40 +0100 Subject: [PATCH] Simplify --- sync-server/src/app_state/cursors.rs | 13 ++++--------- sync-server/src/app_state/database.rs | 13 +++++++------ sync-server/src/app_state/websocket/broadcasts.rs | 14 +++++++------- sync-server/src/app_state/websocket/models.rs | 11 ----------- sync-server/src/server/websocket.rs | 6 +++--- 5 files changed, 21 insertions(+), 36 deletions(-) diff --git a/sync-server/src/app_state/cursors.rs b/sync-server/src/app_state/cursors.rs index d3ea0602..130da680 100644 --- a/sync-server/src/app_state/cursors.rs +++ b/sync-server/src/app_state/cursors.rs @@ -7,10 +7,7 @@ use super::{ database::models::{DeviceId, VaultId}, websocket::{ broadcasts::Broadcasts, - models::{ - ClientCursors, CursorPositionFromServer, WebSocketServerMessage, - WebSocketServerMessageWithOrigin, - }, + models::{ClientCursors, CursorPositionFromServer, WebSocketServerMessage}, }, }; use crate::{ @@ -126,11 +123,9 @@ impl Cursors { self.broadcasts.send_document_update( vault_id, - WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions( - CursorPositionFromServer { - clients: client_cursors, - }, - )), + WebSocketServerMessage::CursorPositions(CursorPositionFromServer { + clients: client_cursors, + }), ) } diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index 1a2483a2..ace07de3 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -29,7 +29,7 @@ use uuid::fmt::Hyphenated; use super::websocket::{ broadcasts::Broadcasts, - models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultUpdate}, + models::{WebSocketServerMessage, WebSocketVaultUpdate}, }; use crate::config::database_config::DatabaseConfig; use crate::consts::IDLE_POOL_TIMEOUT; @@ -800,11 +800,12 @@ impl Database { // delivery channel — and the client-side `parentVersionId` // dedup absorbs the redundant message when the response made it // through. - let envelope = WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { - document: version.clone().into(), - }); - self.broadcasts - .send_document_update(vault_id, WebSocketServerMessageWithOrigin::new(envelope))?; + self.broadcasts.send_document_update( + vault_id, + WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { + document: version.clone().into(), + }), + )?; Ok(()) } diff --git a/sync-server/src/app_state/websocket/broadcasts.rs b/sync-server/src/app_state/websocket/broadcasts.rs index 5dec6221..0ef21e4e 100644 --- a/sync-server/src/app_state/websocket/broadcasts.rs +++ b/sync-server/src/app_state/websocket/broadcasts.rs @@ -6,7 +6,7 @@ use std::{ use log::{debug, info, warn}; use tokio::sync::{Mutex, broadcast}; -use super::models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin}; +use super::models::WebSocketServerMessage; use crate::{ app_state::database::models::VaultId, config::server_config::ServerConfig, @@ -21,11 +21,11 @@ pub struct Broadcasts { // this non-async lets `send_document_update` run without an `.await`, // so an axum handler that is cancelled between `transaction.commit()` // and the broadcast can never drop the notification mid-flight. - tx: Arc>>>, + tx: Arc>>>, send_locks: Arc>>>>, } -type TxMap = HashMap>; +type TxMap = HashMap>; impl Broadcasts { pub fn new(server_config: &ServerConfig) -> Self { @@ -66,7 +66,7 @@ impl Broadcasts { &self, vault: &VaultId, max_clients: usize, - ) -> Result, SyncServerError> { + ) -> Result, SyncServerError> { let mut tx_map = self .tx .lock() @@ -110,13 +110,13 @@ impl Broadcasts { pub fn send_document_update( &self, vault: &str, - document: WebSocketServerMessageWithOrigin, + document: WebSocketServerMessage, ) -> Result<(), SyncServerError> { - let vault_update_id = match &document.message { + let vault_update_id = match &document { WebSocketServerMessage::VaultUpdate(u) => Some(u.document.vault_update_id), WebSocketServerMessage::CursorPositions(_) => None, }; - let is_deleted = match &document.message { + let is_deleted = match &document { WebSocketServerMessage::VaultUpdate(u) => Some(u.document.is_deleted), WebSocketServerMessage::CursorPositions(_) => None, }; diff --git a/sync-server/src/app_state/websocket/models.rs b/sync-server/src/app_state/websocket/models.rs index ebe4018b..60d690cd 100644 --- a/sync-server/src/app_state/websocket/models.rs +++ b/sync-server/src/app_state/websocket/models.rs @@ -79,14 +79,3 @@ pub enum WebSocketServerMessage { VaultUpdate(WebSocketVaultUpdate), CursorPositions(CursorPositionFromServer), } - -#[derive(Clone, Debug)] -pub struct WebSocketServerMessageWithOrigin { - pub message: WebSocketServerMessage, -} - -impl WebSocketServerMessageWithOrigin { - pub fn new(message: WebSocketServerMessage) -> Self { - Self { message } - } -} diff --git a/sync-server/src/server/websocket.rs b/sync-server/src/server/websocket.rs index e2db6ca9..379f68fd 100644 --- a/sync-server/src/server/websocket.rs +++ b/sync-server/src/server/websocket.rs @@ -238,13 +238,13 @@ async fn websocket( // Cursor messages aren't versioned and are always // forwarded. if let WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { document }) = - &update.message + &update && document.vault_update_id <= cursor { continue; } - let message = match update.message { + let message = match update { WebSocketServerMessage::CursorPositions(CursorPositionFromServer { clients, }) => WebSocketServerMessage::CursorPositions(CursorPositionFromServer { @@ -253,7 +253,7 @@ async fn websocket( .filter(|client| client.device_id != device_id) .collect(), }), - WebSocketServerMessage::VaultUpdate(_) => update.message, + update @ WebSocketServerMessage::VaultUpdate(_) => update, }; send_update_over_websocket(&message, &mut sender).await?;