Take last_seen_vault_update_id as a WS message instead of query parameter

This commit is contained in:
Andras Schmelczer 2025-04-08 22:22:37 +01:00
parent bda5f37385
commit 33fd127cf6
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C

View file

@ -1,7 +1,7 @@
use anyhow::Context;
use axum::{
extract::{
Path, Query, State,
Path, State,
ws::{Message, WebSocket, WebSocketUpgrade},
},
response::Response,
@ -21,7 +21,7 @@ use crate::{
database::models::{DeviceId, DocumentVersionWithoutContent, VaultId, VaultUpdateId},
},
errors::{SyncServerError, server_error, unauthenticated_error},
utils::normalize::{normalize, normalize_string},
utils::normalize::normalize,
};
// This is required for aide to infer the path parameter types and names
@ -31,30 +31,18 @@ pub struct WebsocketPathParams {
vault_id: VaultId,
}
// This is required for aide to infer the path parameter types and names
#[derive(Deserialize, JsonSchema)]
pub struct QueryParams {
since_update_id: Option<VaultUpdateId>,
}
pub async fn websocket_handler(
ws: WebSocketUpgrade,
Path(WebsocketPathParams { vault_id }): Path<WebsocketPathParams>,
Query(QueryParams { since_update_id }): Query<QueryParams>,
State(state): State<AppState>,
) -> Result<Response, SyncServerError> {
Ok(ws.on_upgrade(move |socket| websocket_wrapped(state, socket, vault_id, since_update_id)))
Ok(ws.on_upgrade(move |socket| websocket_wrapped(state, socket, vault_id)))
}
async fn websocket_wrapped(
state: AppState,
stream: WebSocket,
vault_id: VaultId,
since_update_id: Option<VaultUpdateId>,
) {
async fn websocket_wrapped(state: AppState, stream: WebSocket, vault_id: VaultId) {
info!("Websocket connection opened on vault '{vault_id}'");
let result = websocket(state, stream, vault_id.clone(), since_update_id).await;
let result = websocket(state, stream, vault_id.clone()).await;
if let Err(err) = result {
error!("Websocket connection error on vault '{vault_id}': {err}");
@ -68,13 +56,13 @@ async fn websocket_wrapped(
struct WebsocketHandshake {
pub token: String,
pub device_id: DeviceId,
pub last_seen_vault_update_id: Option<VaultUpdateId>,
}
async fn websocket(
state: AppState,
stream: WebSocket,
vault_id: VaultId,
since_update_id: Option<VaultUpdateId>,
) -> Result<(), SyncServerError> {
let (mut sender, mut receiver) = stream.split();
@ -83,7 +71,7 @@ async fn websocket(
.context("Failed to parse token")
.map_err(server_error)?;
auth(&state, handshake.token.trim(), &normalize_string(&vault_id))?;
auth(&state, handshake.token.trim(), &vault_id)?;
handshake
} else {
@ -94,10 +82,10 @@ async fn websocket(
let mut rx = state.broadcasts.get_receiver(vault_id.clone()).await;
let documents = if let Some(since_update_id) = since_update_id {
let documents = if let Some(update_id) = handshake.last_seen_vault_update_id {
state
.database
.get_latest_documents_since(&vault_id, since_update_id, None)
.get_latest_documents_since(&vault_id, update_id, None)
.await
.map_err(server_error)
} else {