diff --git a/backend/sync_server/src/app_state/cursors.rs b/backend/sync_server/src/app_state/cursors.rs index a48aceec..a2dc6807 100644 --- a/backend/sync_server/src/app_state/cursors.rs +++ b/backend/sync_server/src/app_state/cursors.rs @@ -34,6 +34,7 @@ impl Cursors { pub async fn update_cursors( &self, vault_id: VaultId, + user_name: String, device_id: &DeviceId, document_to_cursors: HashMap>, ) { @@ -43,6 +44,7 @@ impl Cursors { all_device_cursors.retain(|c| &c.client_cursors.device_id != device_id); all_device_cursors.push(ClientCursorsWithTimeToLive::new(ClientCursors { + user_name, device_id: device_id.to_string(), cursors: document_to_cursors, })); diff --git a/backend/sync_server/src/app_state/websocket/models.rs b/backend/sync_server/src/app_state/websocket/models.rs index 0b8e1828..6bb4f4e1 100644 --- a/backend/sync_server/src/app_state/websocket/models.rs +++ b/backend/sync_server/src/app_state/websocket/models.rs @@ -31,6 +31,7 @@ pub struct CursorPositionFromClient { #[derive(TS, Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct ClientCursors { + pub user_name: String, pub device_id: DeviceId, pub cursors: HashMap>, } diff --git a/backend/sync_server/src/app_state/websocket/utils.rs b/backend/sync_server/src/app_state/websocket/utils.rs index cf337e39..1e0dd243 100644 --- a/backend/sync_server/src/app_state/websocket/utils.rs +++ b/backend/sync_server/src/app_state/websocket/utils.rs @@ -8,15 +8,21 @@ use crate::{ AppState, database::models::{DocumentVersionWithoutContent, VaultId, VaultUpdateId}, }, + config::user_config::User, errors::{SyncServerError, server_error, unauthenticated_error}, server::auth::auth, }; +pub struct AuthenticatedWebSocketHandshake { + pub handshake: WebSocketHandshake, + pub user: User, +} + pub fn get_authenticated_handshake( state: &AppState, vault_id: &VaultId, message: Option, -) -> Result { +) -> Result { if let Some(Message::Text(message)) = message { let message: WebSocketClientMessage = serde_json::from_str(&message) .context("Failed to parse message") @@ -24,8 +30,8 @@ pub fn get_authenticated_handshake( match message { WebSocketClientMessage::Handshake(handshake) => { - auth(state, handshake.token.trim(), vault_id)?; - Ok(handshake) + let user = auth(state, handshake.token.trim(), vault_id)?; + Ok(AuthenticatedWebSocketHandshake { handshake, user }) } WebSocketClientMessage::CursorPositions(_) => Err(unauthenticated_error( anyhow::anyhow!("Expected a handshake message"), diff --git a/backend/sync_server/src/server/websocket.rs b/backend/sync_server/src/server/websocket.rs index cfe7f8f9..e9dd8867 100644 --- a/backend/sync_server/src/server/websocket.rs +++ b/backend/sync_server/src/server/websocket.rs @@ -52,6 +52,7 @@ async fn websocket_wrapped(state: AppState, stream: WebSocket, vault_id: VaultId } } +#[allow(clippy::too_many_lines)] async fn websocket( state: AppState, stream: WebSocket, @@ -59,7 +60,7 @@ async fn websocket( ) -> Result<(), SyncServerError> { let (mut sender, mut websocket_receiver) = stream.split(); - let handshake = get_authenticated_handshake( + let authed_handshake = get_authenticated_handshake( &state, &vault_id, websocket_receiver @@ -71,15 +72,19 @@ async fn websocket( info!( "WebSocket handshake successful for vault '{vault_id}' for '{}'", - handshake.device_id + authed_handshake.handshake.device_id ); let mut broadcast_receiver = state.broadcasts.get_receiver(vault_id.clone()).await; send_update_over_websocket( &WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { - documents: get_unseen_documents(&state, &vault_id, handshake.last_seen_vault_update_id) - .await?, + documents: get_unseen_documents( + &state, + &vault_id, + authed_handshake.handshake.last_seen_vault_update_id, + ) + .await?, is_initial_sync: true, }), &mut sender, @@ -94,7 +99,7 @@ async fn websocket( ) .await?; - let device_id = handshake.device_id.clone(); + let device_id = authed_handshake.handshake.device_id.clone(); let mut send_task = tokio::spawn(async move { while let Ok(update) = broadcast_receiver.recv().await { if Some(&device_id) == update.origin_device_id.as_ref() { @@ -107,7 +112,7 @@ async fn websocket( Ok::<(), SyncServerError>(()) }); - let device_id = handshake.device_id.clone(); + let device_id = authed_handshake.handshake.device_id.clone(); let vault_id_clone = vault_id.clone(); let cursor_manager = state.cursors.clone(); let mut receive_task = tokio::spawn(async move { @@ -126,6 +131,7 @@ async fn websocket( cursor_manager .update_cursors( vault_id_clone.clone(), + authed_handshake.user.name.clone(), &device_id, cursors.document_to_cursors, ) @@ -161,13 +167,13 @@ async fn websocket( state .cursors - .remove_cursors_of_device(&vault_id, &handshake.device_id) + .remove_cursors_of_device(&vault_id, &authed_handshake.handshake.device_id) .await; if result.is_err() { info!( "WebSocket disconnected on vault '{vault_id}' for '{}'", - handshake.device_id + authed_handshake.handshake.device_id ); } diff --git a/frontend/sync-client/src/services/types/ClientCursors.ts b/frontend/sync-client/src/services/types/ClientCursors.ts index 87ebebdf..a70420b8 100644 --- a/frontend/sync-client/src/services/types/ClientCursors.ts +++ b/frontend/sync-client/src/services/types/ClientCursors.ts @@ -1,4 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { CursorSpan } from "./CursorSpan"; -export type ClientCursors = { deviceId: string, cursors: { [key in string]?: Array }, }; +export type ClientCursors = { userName: string, deviceId: string, cursors: { [key in string]?: Array }, };