Add API for propagating cursor locations #61

Merged
schmelczer merged 30 commits from asch/show-cursors into main 2025-06-08 20:20:53 +01:00
5 changed files with 27 additions and 12 deletions
Showing only changes of commit 02f32e894a - Show all commits

View file

@ -34,6 +34,7 @@ impl Cursors {
pub async fn update_cursors(
&self,
vault_id: VaultId,
user_name: String,
device_id: &DeviceId,
document_to_cursors: HashMap<String, Vec<CursorSpan>>,
) {
@ -43,6 +44,7 @@ impl Cursors {
all_device_cursors.retain(|c| &c.client_cursors.device_id != device_id);
all_device_cursors.push(ClientCursorsWithTimeToLive::new(ClientCursors {
user_name,
device_id: device_id.to_string(),
cursors: document_to_cursors,
}));

View file

@ -31,6 +31,7 @@ pub struct CursorPositionFromClient {
#[derive(TS, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ClientCursors {
pub user_name: String,
pub device_id: DeviceId,
pub cursors: HashMap<String, Vec<CursorSpan>>,
}

View file

@ -8,15 +8,21 @@ use crate::{
AppState,
database::models::{DocumentVersionWithoutContent, VaultId, VaultUpdateId},
},
config::user_config::User,
errors::{SyncServerError, server_error, unauthenticated_error},
server::auth::auth,
};
pub struct AuthenticatedWebSocketHandshake {
pub handshake: WebSocketHandshake,
pub user: User,
}
pub fn get_authenticated_handshake(
state: &AppState,
vault_id: &VaultId,
message: Option<Message>,
) -> Result<WebSocketHandshake, SyncServerError> {
) -> Result<AuthenticatedWebSocketHandshake, SyncServerError> {
if let Some(Message::Text(message)) = message {
let message: WebSocketClientMessage = serde_json::from_str(&message)
.context("Failed to parse message")
@ -24,8 +30,8 @@ pub fn get_authenticated_handshake(
match message {
WebSocketClientMessage::Handshake(handshake) => {
auth(state, handshake.token.trim(), vault_id)?;
Ok(handshake)
let user = auth(state, handshake.token.trim(), vault_id)?;
Ok(AuthenticatedWebSocketHandshake { handshake, user })
}
WebSocketClientMessage::CursorPositions(_) => Err(unauthenticated_error(
anyhow::anyhow!("Expected a handshake message"),

View file

@ -52,6 +52,7 @@ async fn websocket_wrapped(state: AppState, stream: WebSocket, vault_id: VaultId
}
}
#[allow(clippy::too_many_lines)]
async fn websocket(
state: AppState,
stream: WebSocket,
@ -59,7 +60,7 @@ async fn websocket(
) -> Result<(), SyncServerError> {
let (mut sender, mut websocket_receiver) = stream.split();
let handshake = get_authenticated_handshake(
let authed_handshake = get_authenticated_handshake(
&state,
&vault_id,
websocket_receiver
@ -71,15 +72,19 @@ async fn websocket(
info!(
"WebSocket handshake successful for vault '{vault_id}' for '{}'",
handshake.device_id
authed_handshake.handshake.device_id
);
let mut broadcast_receiver = state.broadcasts.get_receiver(vault_id.clone()).await;
send_update_over_websocket(
&WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
documents: get_unseen_documents(&state, &vault_id, handshake.last_seen_vault_update_id)
.await?,
documents: get_unseen_documents(
&state,
&vault_id,
authed_handshake.handshake.last_seen_vault_update_id,
)
.await?,
is_initial_sync: true,
}),
&mut sender,
@ -94,7 +99,7 @@ async fn websocket(
)
.await?;
let device_id = handshake.device_id.clone();
let device_id = authed_handshake.handshake.device_id.clone();
let mut send_task = tokio::spawn(async move {
while let Ok(update) = broadcast_receiver.recv().await {
if Some(&device_id) == update.origin_device_id.as_ref() {
@ -107,7 +112,7 @@ async fn websocket(
Ok::<(), SyncServerError>(())
});
let device_id = handshake.device_id.clone();
let device_id = authed_handshake.handshake.device_id.clone();
let vault_id_clone = vault_id.clone();
let cursor_manager = state.cursors.clone();
let mut receive_task = tokio::spawn(async move {
@ -126,6 +131,7 @@ async fn websocket(
cursor_manager
.update_cursors(
vault_id_clone.clone(),
authed_handshake.user.name.clone(),
&device_id,
cursors.document_to_cursors,
)
@ -161,13 +167,13 @@ async fn websocket(
state
.cursors
.remove_cursors_of_device(&vault_id, &handshake.device_id)
.remove_cursors_of_device(&vault_id, &authed_handshake.handshake.device_id)
.await;
if result.is_err() {
info!(
"WebSocket disconnected on vault '{vault_id}' for '{}'",
handshake.device_id
authed_handshake.handshake.device_id
);
}

View file

@ -1,4 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorSpan } from "./CursorSpan";
export type ClientCursors = { deviceId: string, cursors: { [key in string]?: Array<CursorSpan> }, };
export type ClientCursors = { userName: string, deviceId: string, cursors: { [key in string]?: Array<CursorSpan> }, };