This commit is contained in:
Andras Schmelczer 2026-05-10 15:16:40 +01:00
parent 0329fc29f2
commit d8b6ec5b77
5 changed files with 21 additions and 36 deletions

View file

@ -7,10 +7,7 @@ use super::{
database::models::{DeviceId, VaultId}, database::models::{DeviceId, VaultId},
websocket::{ websocket::{
broadcasts::Broadcasts, broadcasts::Broadcasts,
models::{ models::{ClientCursors, CursorPositionFromServer, WebSocketServerMessage},
ClientCursors, CursorPositionFromServer, WebSocketServerMessage,
WebSocketServerMessageWithOrigin,
},
}, },
}; };
use crate::{ use crate::{
@ -126,11 +123,9 @@ impl Cursors {
self.broadcasts.send_document_update( self.broadcasts.send_document_update(
vault_id, vault_id,
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions( WebSocketServerMessage::CursorPositions(CursorPositionFromServer {
CursorPositionFromServer { clients: client_cursors,
clients: client_cursors, }),
},
)),
) )
} }

View file

@ -29,7 +29,7 @@ use uuid::fmt::Hyphenated;
use super::websocket::{ use super::websocket::{
broadcasts::Broadcasts, broadcasts::Broadcasts,
models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultUpdate}, models::{WebSocketServerMessage, WebSocketVaultUpdate},
}; };
use crate::config::database_config::DatabaseConfig; use crate::config::database_config::DatabaseConfig;
use crate::consts::IDLE_POOL_TIMEOUT; use crate::consts::IDLE_POOL_TIMEOUT;
@ -800,11 +800,12 @@ impl Database {
// delivery channel — and the client-side `parentVersionId` // delivery channel — and the client-side `parentVersionId`
// dedup absorbs the redundant message when the response made it // dedup absorbs the redundant message when the response made it
// through. // through.
let envelope = WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { self.broadcasts.send_document_update(
document: version.clone().into(), vault_id,
}); WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
self.broadcasts document: version.clone().into(),
.send_document_update(vault_id, WebSocketServerMessageWithOrigin::new(envelope))?; }),
)?;
Ok(()) Ok(())
} }

View file

@ -6,7 +6,7 @@ use std::{
use log::{debug, info, warn}; use log::{debug, info, warn};
use tokio::sync::{Mutex, broadcast}; use tokio::sync::{Mutex, broadcast};
use super::models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin}; use super::models::WebSocketServerMessage;
use crate::{ use crate::{
app_state::database::models::VaultId, app_state::database::models::VaultId,
config::server_config::ServerConfig, config::server_config::ServerConfig,
@ -21,11 +21,11 @@ pub struct Broadcasts {
// this non-async lets `send_document_update` run without an `.await`, // this non-async lets `send_document_update` run without an `.await`,
// so an axum handler that is cancelled between `transaction.commit()` // so an axum handler that is cancelled between `transaction.commit()`
// and the broadcast can never drop the notification mid-flight. // and the broadcast can never drop the notification mid-flight.
tx: Arc<StdMutex<HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>>>, tx: Arc<StdMutex<HashMap<VaultId, broadcast::Sender<WebSocketServerMessage>>>>,
send_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>, send_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>,
} }
type TxMap = HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>; type TxMap = HashMap<VaultId, broadcast::Sender<WebSocketServerMessage>>;
impl Broadcasts { impl Broadcasts {
pub fn new(server_config: &ServerConfig) -> Self { pub fn new(server_config: &ServerConfig) -> Self {
@ -66,7 +66,7 @@ impl Broadcasts {
&self, &self,
vault: &VaultId, vault: &VaultId,
max_clients: usize, max_clients: usize,
) -> Result<broadcast::Receiver<WebSocketServerMessageWithOrigin>, SyncServerError> { ) -> Result<broadcast::Receiver<WebSocketServerMessage>, SyncServerError> {
let mut tx_map = self let mut tx_map = self
.tx .tx
.lock() .lock()
@ -110,13 +110,13 @@ impl Broadcasts {
pub fn send_document_update( pub fn send_document_update(
&self, &self,
vault: &str, vault: &str,
document: WebSocketServerMessageWithOrigin, document: WebSocketServerMessage,
) -> Result<(), SyncServerError> { ) -> Result<(), SyncServerError> {
let vault_update_id = match &document.message { let vault_update_id = match &document {
WebSocketServerMessage::VaultUpdate(u) => Some(u.document.vault_update_id), WebSocketServerMessage::VaultUpdate(u) => Some(u.document.vault_update_id),
WebSocketServerMessage::CursorPositions(_) => None, WebSocketServerMessage::CursorPositions(_) => None,
}; };
let is_deleted = match &document.message { let is_deleted = match &document {
WebSocketServerMessage::VaultUpdate(u) => Some(u.document.is_deleted), WebSocketServerMessage::VaultUpdate(u) => Some(u.document.is_deleted),
WebSocketServerMessage::CursorPositions(_) => None, WebSocketServerMessage::CursorPositions(_) => None,
}; };

View file

@ -79,14 +79,3 @@ pub enum WebSocketServerMessage {
VaultUpdate(WebSocketVaultUpdate), VaultUpdate(WebSocketVaultUpdate),
CursorPositions(CursorPositionFromServer), CursorPositions(CursorPositionFromServer),
} }
#[derive(Clone, Debug)]
pub struct WebSocketServerMessageWithOrigin {
pub message: WebSocketServerMessage,
}
impl WebSocketServerMessageWithOrigin {
pub fn new(message: WebSocketServerMessage) -> Self {
Self { message }
}
}

View file

@ -238,13 +238,13 @@ async fn websocket(
// Cursor messages aren't versioned and are always // Cursor messages aren't versioned and are always
// forwarded. // forwarded.
if let WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { document }) = if let WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { document }) =
&update.message &update
&& document.vault_update_id <= cursor && document.vault_update_id <= cursor
{ {
continue; continue;
} }
let message = match update.message { let message = match update {
WebSocketServerMessage::CursorPositions(CursorPositionFromServer { WebSocketServerMessage::CursorPositions(CursorPositionFromServer {
clients, clients,
}) => WebSocketServerMessage::CursorPositions(CursorPositionFromServer { }) => WebSocketServerMessage::CursorPositions(CursorPositionFromServer {
@ -253,7 +253,7 @@ async fn websocket(
.filter(|client| client.device_id != device_id) .filter(|client| client.device_id != device_id)
.collect(), .collect(),
}), }),
WebSocketServerMessage::VaultUpdate(_) => update.message, update @ WebSocketServerMessage::VaultUpdate(_) => update,
}; };
send_update_over_websocket(&message, &mut sender).await?; send_update_over_websocket(&message, &mut sender).await?;