no remote path chacnge

This commit is contained in:
Andras Schmelczer 2026-04-24 21:33:00 +01:00
parent 19d5dc1999
commit 17a1f4d060
16 changed files with 93 additions and 314 deletions

View file

@ -1,6 +1,5 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorPositionFromServer } from "./CursorPositionFromServer";
import type { WebSocketVaultPathChange } from "./WebSocketVaultPathChange";
import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate";
export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "pathChange" } & WebSocketVaultPathChange | { "type": "cursorPositions" } & CursorPositionFromServer;
export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer;

View file

@ -1,3 +0,0 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type WebSocketVaultPathChange = { vaultUpdateId: number, documentId: string, relativePath: string, };

View file

@ -1,4 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
export type WebSocketVaultUpdate = { documents: Array<DocumentVersionWithoutContent>, isInitialSync: boolean, };
export type WebSocketVaultUpdate = { document: DocumentVersionWithoutContent, };

View file

@ -1,6 +1,5 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorPositionFromServer } from "./CursorPositionFromServer";
import type { WebSocketVaultPathChange } from "./WebSocketVaultPathChange";
import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate";
export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "pathChange" } & WebSocketVaultPathChange | { "type": "cursorPositions" } & CursorPositionFromServer;
export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer;

View file

@ -1,3 +0,0 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface WebSocketVaultPathChange { vaultUpdateId: number, documentId: string, relativePath: string, }

View file

@ -1,4 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
export interface WebSocketVaultUpdate { documents: DocumentVersionWithoutContent[], isInitialSync: boolean, }
export interface WebSocketVaultUpdate { document: DocumentVersionWithoutContent, }

View file

@ -5,7 +5,6 @@ import type { WebSocketClientMessage } from "./types/WebSocketClientMessage";
import type { CursorPositionFromClient } from "./types/CursorPositionFromClient";
import type { ClientCursors } from "./types/ClientCursors";
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
import type { WebSocketVaultPathChange } from "./types/WebSocketVaultPathChange";
import {
WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS,
WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS
@ -23,10 +22,6 @@ export class WebSocketManager {
(update: WebSocketVaultUpdate) => Promise<void>
>();
public readonly onRemotePathChangeReceived = new EventListeners<
(pathChange: WebSocketVaultPathChange) => Promise<void>
>();
public readonly onRemoteCursorsUpdateReceived = new EventListeners<
(cursors: ClientCursors[]) => Promise<void>
>();
@ -295,12 +290,6 @@ export class WebSocketManager {
case "vaultUpdate":
await this.onRemoteVaultUpdateReceived.triggerAsync(message);
return;
case "pathChange":
this.logger.debug(
`Received path change for document ${message.documentId}${message.relativePath}`
);
await this.onRemotePathChangeReceived.triggerAsync(message);
return;
case "cursorPositions":
this.logger.debug(
`Received cursor positions for ${JSON.stringify(message.clients)}`

View file

@ -44,7 +44,7 @@ export class SyncEventQueue {
private savePending = false;
private lastSeenUpdateId: VaultUpdateId;
private readonly lastSeenUpdateId: VaultUpdateId;
public constructor(
private readonly settings: Settings,
@ -250,9 +250,7 @@ export class SyncEventQueue {
e.documentId === docId) ||
(e.type === SyncEventType.RemoteUpdate &&
// we care about the local path not the remote
this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path) ||
(e.type === SyncEventType.RemotePathChange &&
this.getDocumentByDocumentId(e.pathChange.documentId)?.path === path)
this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path)
);
}
@ -280,10 +278,7 @@ export class SyncEventQueue {
}
public enqueue(input: FileSyncEvent): void {
if (
input.type === SyncEventType.RemoteUpdate ||
input.type === SyncEventType.RemotePathChange
) {
if (input.type === SyncEventType.RemoteUpdate) {
this.events.push(input);
return;
}
@ -414,31 +409,15 @@ export class SyncEventQueue {
return result;
}
// Coalesce multiple events of the same remote kind for the same
// documentId to the last one. Kinds are coalesced independently so
// that an interleaved content+path stream (e.g. VaultUpdate →
// PathChange) still preserves the VaultUpdate-before-PathChange
// ordering invariant the syncer relies on.
if (first.type === SyncEventType.RemoteUpdate) {
const { documentId } = first.remoteVersion;
const matching = this.events.filter(
(e) =>
e.type === SyncEventType.RemoteUpdate &&
e.remoteVersion.documentId === documentId
);
const result = matching[matching.length - 1];
for (const item of matching) {
removeFromArray(this.events, item);
}
return result;
}
// SyncRemotePath
const { documentId } = first.pathChange;
// Coalesce multiple RemoteUpdate events for the same documentId
// down to the last one — the `.next` walk already short-circuits
// on obsolete versions via `parentVersionId` checks, but compacting
// here keeps the queue bounded under burst remote activity.
const { documentId } = first.remoteVersion;
const matching = this.events.filter(
(e) =>
e.type === SyncEventType.RemotePathChange &&
e.pathChange.documentId === documentId
e.type === SyncEventType.RemoteUpdate &&
e.remoteVersion.documentId === documentId
);
const result = matching[matching.length - 1];
for (const item of matching) {
@ -463,8 +442,6 @@ export class SyncEventQueue {
e.documentId === documentId) ||
(e.type === SyncEventType.RemoteUpdate &&
e.remoteVersion.documentId === documentId) ||
(e.type === SyncEventType.RemotePathChange &&
e.pathChange.documentId === documentId) ||
(e.type === SyncEventType.LocalDelete &&
e.documentId === documentId)
) {

View file

@ -14,7 +14,6 @@ import { scheduleOfflineChanges } from "./offline-change-detector";
import { SyncResetError } from "../errors/sync-reset-error";
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate";
import type { WebSocketVaultPathChange } from "../services/types/WebSocketVaultPathChange";
import type { WebSocketManager } from "../services/websocket-manager";
import type { WebSocketClientMessage } from "../services/types/WebSocketClientMessage";
import { EventListeners } from "../utils/data-structures/event-listeners";
@ -67,14 +66,19 @@ export class Syncer {
this.webSocketManager.onWebSocketStatusChanged.add((isConnected) => {
if (isConnected) {
this.sendHandshakeMessage();
// The server no longer carries an `is_initial_sync`
// terminator: it streams missed versions as individual
// VaultUpdates and then behaves like a live subscription.
// Mark first-sync as complete once we've observed the
// transition to "connected" — per-path sync status still
// relies on `hasPendingEventsForPath`, which correctly
// shows SYNCING while catch-up events are in flight.
this._isFirstSyncComplete = true;
}
});
this.webSocketManager.onRemoteVaultUpdateReceived.add(
this.syncRemotelyUpdatedFile.bind(this)
);
this.webSocketManager.onRemotePathChangeReceived.add(
this.syncRemotelyChangedPath.bind(this)
);
}
public get isFirstSyncComplete(): boolean {
@ -106,63 +110,22 @@ export class Syncer {
}
// Handler for every `WebSocketVaultUpdate` the server emits. The
// server filters out messages authored by this device, so every
// update here comes from a peer (or is part of the catch-up stream
// the server replays on connect for versions we missed while
// offline).
public async syncRemotelyUpdatedFile(
message: WebSocketVaultUpdate
): Promise<void> {
await this.scheduleSyncForOfflineChanges();
for (const remoteVersion of message.documents) {
this.queue.enqueue({
type: SyncEventType.RemoteUpdate,
remoteVersion
});
}
if (message.isInitialSync) {
this._isFirstSyncComplete = true;
}
this.queue.enqueue({
type: SyncEventType.RemoteUpdate,
remoteVersion: message.document
});
this.ensureDraining();
}
// A PathChange notifies us that a document now lives at a new server-
// canonical path. It's delivered to every client (origin included)
// because the create/update HTTP response no longer carries the path,
// so the only way the origin learns about dedupe or first-rename-wins
// is via this event.
//
// Algorithmic assumptions:
// (1) Per-vault broadcast ordering is preserved by the server, so if
// the same write produced a `VaultUpdate` (content change) and a
// `PathChange` (path change), the `VaultUpdate` is handled first
// — that's what lets us skip advancing `parentVersionId` here
// without risking a stuck "already up-to-date" check later.
// (2) On a lag-induced disconnect (`broadcast::error::Lagged`) the
// server disconnects the client for a full resync, so out-of-
// order delivery across a reconnect boundary can't leave us with
// a stale PathChange overwriting a newer one.
public async syncRemotelyChangedPath(
pathChange: WebSocketVaultPathChange
): Promise<void> {
try {
await this.scheduleSyncForOfflineChanges();
this.queue.enqueue({
type: SyncEventType.RemotePathChange,
pathChange
});
await this.scheduleDrain();
} catch (e) {
if (e instanceof SyncResetError) {
this.logger.info(
"Failed to apply remote path change due to a reset"
);
return;
}
this.logger.error(`Failed to apply remote path change: ${e}`);
}
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
@ -332,9 +295,6 @@ export class Syncer {
case SyncEventType.RemoteUpdate:
await this.processSyncRemoteContent(event);
break;
case SyncEventType.RemotePathChange:
await this.processSyncRemotePath(event);
break;
}
} catch (e) {
if (e instanceof FileNotFoundError) {
@ -594,51 +554,6 @@ export class Syncer {
await this.processRemoteUpdateForNewDocument(remoteVersion);
}
private async processSyncRemotePath(
event: Extract<SyncEvent, { type: SyncEventType.RemotePathChange }>
): Promise<void> {
const { pathChange } = event;
const existing = this.queue.getDocumentByDocumentId(
pathChange.documentId
);
if (existing === undefined) {
throw new Error(
`Received path change for unknown document ${pathChange.documentId}`
);
}
const { path: currentPath, record } = existing;
const newPath = pathChange.relativePath;
if (currentPath !== newPath) {
await this.operations.move(currentPath, newPath);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: newPath,
movedFrom: currentPath
},
message: "Applied remote path change",
author: pathChange.userId,
timestamp: new Date(pathChange.updatedDate)
});
}
// `operations.move` updates the queue's path index, but doesn't
// touch `remoteRelativePath`. Refresh it so offline change
// detection compares against the server's path. parentVersionId
// intentionally stays at its prior value: if the write also
// changed content, the corresponding VaultUpdate handles that;
// advancing it here would make us skip fetching content we don't
// yet have.
this.queue.setDocument(newPath, {
...record,
remoteRelativePath: newPath
});
}
private async processRemoteUpdateForExistingDocument(
currentPath: RelativePath,
record: DocumentRecord,
@ -734,14 +649,14 @@ export class Syncer {
// Path reconciliation fallback for the reconnect case.
//
// In steady-state streaming, server-initiated renames arrive as
// dedicated `PathChange` WebSocket events and are handled by
// `syncRemotelyChangedPath`. But the reconnect catch-up path
// (`get_unseen_documents` → `VaultUpdate(is_initial_sync=…)`)
// replays *versions* from the DB — `PathChange` is emission-
// only and not replayed. Without this branch, a pure rename
// that happened while we were disconnected would leave our
// local file stuck at its old path forever.
// In steady-state streaming, server-initiated renames arrive
// as `VaultUpdate` events with `originatesFromSelf=true` for
// the author and drive `processSyncRemotePath`. The reconnect
// catch-up (`get_unseen_documents` → `is_initial_sync=true`)
// replays versions authored by any device with
// `originatesFromSelf=false`, so those take the full remote-
// sync branch and we need this in-branch path reconciliation
// to avoid leaving the local file stuck at its old path.
//
// Only apply the server's path when the record's
// `remoteRelativePath` still matches `currentPath` — that means
@ -1107,8 +1022,8 @@ export class Syncer {
}
}
// Only delete on disk if the record at `path` is still the one
// we expected — if a PathChange moved another doc here, we
// shouldn't delete its file.
// we expected — if a self-origin path-change moved another doc
// here, we shouldn't delete its file.
const finalRecord = this.queue.getSettledDocumentByPath(path);
if (
finalRecord === undefined ||
@ -1121,9 +1036,10 @@ export class Syncer {
}
// The response carries content only — path reconciliation is the
// sole responsibility of the `PathChange` WebSocket event, which
// fires independently for renames/dedupes. We therefore always
// record the current local `path` here; an in-flight `PathChange`
// sole responsibility of the self-origin `VaultUpdate` echo (the
// `originatesFromSelf=true` branch of `syncRemoteVaultUpdate`),
// which fires independently for renames/dedupes. We therefore
// always record the current local `path` here; an in-flight echo
// will move the file and fix `remoteRelativePath` if the server
// placed the document somewhere else.
const existingRecord = this.queue.getSettledDocumentByPath(path);

View file

@ -1,5 +1,4 @@
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
import type { WebSocketVaultPathChange } from "../services/types/WebSocketVaultPathChange";
export type VaultUpdateId = number;
export type DocumentId = string;
@ -25,16 +24,14 @@ export enum SyncEventType {
LocalCreate = "local-create",
LocalUpdate = "local-update", // includes both content and path changes
LocalDelete = "local-delete",
RemoteUpdate = "remote-update",
RemotePathChange = "remote-path-change",
RemoteUpdate = "remote-update", // includes every type of update coming from the server
}
export type FileSyncEvent =
| { type: SyncEventType.LocalCreate; path: RelativePath }
| { type: SyncEventType.LocalUpdate; path: RelativePath; oldPath?: RelativePath }
| { type: SyncEventType.LocalDelete; path: RelativePath }
| { type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent }
| { type: SyncEventType.RemotePathChange; pathChange: WebSocketVaultPathChange };
| { type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent };
export type SyncEvent =
| {
@ -57,8 +54,4 @@ export type SyncEvent =
| {
type: SyncEventType.RemoteUpdate;
remoteVersion: DocumentVersionWithoutContent;
}
| {
type: SyncEventType.RemotePathChange;
pathChange: WebSocketVaultPathChange;
};

View file

@ -20,24 +20,6 @@ pub mod models;
#[error("Database is busy")]
pub struct WriteBusyError;
/// Tells [`Database::insert_document_version`] which WebSocket events the
/// just-committed version should produce. The caller is the only party
/// with enough context to decide this (the DB layer has no access to
/// "what the client sent" or "what the prior version looked like").
#[derive(Debug, Clone, Copy, Default)]
pub struct InsertBroadcast {
/// Emit a `VaultUpdate` (filtered from the origin device). Set when
/// the stored bytes differ from the prior version's bytes — i.e.
/// peers need to pull new content.
pub content_changed: bool,
/// Emit a `PathChange` (delivered to every client, origin included).
/// Set when the stored path differs from the prior stored path *or*
/// from the path the origin client sent — i.e. someone needs to
/// reconcile a dedupe, rename, or first-rename-wins outcome.
pub path_changed: bool,
}
use sqlx::{
Pool, Sqlite, pool::PoolConnection, sqlite::SqliteConnection, sqlite::SqlitePoolOptions,
};
@ -47,10 +29,7 @@ use uuid::fmt::Hyphenated;
use super::websocket::{
broadcasts::Broadcasts,
models::{
WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultPathChange,
WebSocketVaultUpdate,
},
models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultUpdate},
};
use crate::config::database_config::DatabaseConfig;
use crate::consts::IDLE_POOL_TIMEOUT;
@ -693,7 +672,6 @@ impl Database {
vault_id: &VaultId,
version: &StoredDocumentVersion,
mut transaction: WriteTransaction,
broadcast: InsertBroadcast,
) -> Result<()> {
let document_id = version.document_id.as_hyphenated();
let query = sqlx::query!(
@ -739,39 +717,19 @@ impl Database {
.await
.context("Failed to commit transaction")?;
if broadcast.content_changed {
// Content events are filtered out for the origin device — the
// origin already has the content (or learns about the merge
// via the HTTP response).
self.broadcasts.send_document_update(
vault_id.clone(),
WebSocketServerMessageWithOrigin::with_origin(
version.device_id.clone(),
WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
documents: vec![version.clone().into()],
is_initial_sync: false,
}),
),
);
}
if broadcast.path_changed {
// Path change events intentionally carry no origin so *every*
// connected client (including the one that made the write)
// receives them. The create/update HTTP response no longer
// carries `relative_path`, so the origin device relies on this
// event to learn the server-canonical path.
self.broadcasts.send_document_update(
vault_id.clone(),
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::PathChange(
WebSocketVaultPathChange {
vault_update_id: version.vault_update_id,
document_id: version.document_id,
relative_path: version.relative_path.clone(),
},
)),
);
}
// The broadcast is delivered to every connected client except the
// author — the send task filters on `origin_device_id` (see
// `websocket.rs`). The origin already has authoritative state
// from the HTTP response that triggered this write.
self.broadcasts.send_document_update(
vault_id.clone(),
WebSocketServerMessageWithOrigin::with_origin(
version.device_id.clone(),
WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
document: version.clone().into(),
}),
),
);
Ok(())
}

View file

@ -58,23 +58,15 @@ pub struct CursorPositionFromServer {
pub clients: Vec<ClientCursors>,
}
// Clients only get notified of other clients' updates through WebSocketVaultUpdate.
// One committed version, broadcast to every connected client *except*
// the device that authored it — that device already has the new state
// via its HTTP response. The server also emits these one-at-a-time to
// catch up a freshly-connected client on versions committed while it
// was offline, in ascending `vault_update_id` order.
#[derive(TS, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WebSocketVaultUpdate {
pub documents: Vec<DocumentVersionWithoutContent>,
pub is_initial_sync: bool,
}
// Clients get notified of both their own and other clients' path changes through WebSocketVaultPathChange.
// This is becuase we must absolutely order path updates as they may all depend on all previous updates.
#[derive(TS, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WebSocketVaultPathChange {
#[ts(type = "number")]
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub relative_path: String,
pub document: DocumentVersionWithoutContent,
}
#[derive(TS, Deserialize, Clone, Debug)]
@ -90,10 +82,13 @@ pub enum WebSocketClientMessage {
#[ts(export)]
pub enum WebSocketServerMessage {
VaultUpdate(WebSocketVaultUpdate),
PathChange(WebSocketVaultPathChange),
CursorPositions(CursorPositionFromServer),
}
/// Broadcast envelope carrying the message plus the device that produced
/// it. The per-recipient send task compares `origin_device_id` against
/// its own device id to fill in `originates_from_self` before the message
/// is serialized on the wire.
#[derive(Clone, Debug)]
pub struct WebSocketServerMessageWithOrigin {
pub origin_device_id: Option<DeviceId>,

View file

@ -11,10 +11,7 @@ use super::{device_id_header::DeviceIdHeader, requests::CreateDocumentVersion};
use crate::{
app_state::{
AppState,
database::{
InsertBroadcast,
models::{StoredDocumentVersion, VaultId},
},
database::models::{StoredDocumentVersion, VaultId},
},
config::user_config::User,
errors::{SyncServerError, client_error, server_error, write_transaction_error},
@ -128,8 +125,6 @@ pub async fn create_document(
);
}
let path_changed = deduped_path != sanitized_relative_path;
let new_vault_update_id = last_update_id + 1;
let new_version = StoredDocumentVersion {
vault_update_id: new_vault_update_id,
@ -146,17 +141,7 @@ pub async fn create_document(
state
.database
.insert_document_version(
&vault_id,
&new_version,
transaction,
InsertBroadcast {
// A brand-new document is always a content change for peers.
content_changed: true,
// Origin needs to know if the server deduped its requested path.
path_changed,
},
)
.insert_document_version(&vault_id, &new_version, transaction)
.await
.map_err(server_error)?;

View file

@ -11,10 +11,7 @@ use super::device_id_header::DeviceIdHeader;
use crate::{
app_state::{
AppState,
database::{
InsertBroadcast,
models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId},
},
database::models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId},
},
config::user_config::User,
errors::{SyncServerError, not_found_error, server_error, write_transaction_error},
@ -101,17 +98,7 @@ pub async fn delete_document(
state
.database
.insert_document_version(
&vault_id,
&new_version,
transaction,
InsertBroadcast {
// Deletion is a content change peers must learn about.
content_changed: true,
// Delete never renames.
path_changed: false,
},
)
.insert_document_version(&vault_id, &new_version, transaction)
.await
.map_err(server_error)?;

View file

@ -17,7 +17,7 @@ use crate::{
app_state::{
AppState,
database::{
InsertBroadcast, WriteTransaction,
WriteTransaction,
models::{DocumentId, StoredDocumentVersion, VaultId, VaultUpdateId},
},
},
@ -292,14 +292,6 @@ pub async fn update_document(
latest_version.relative_path.clone()
};
let content_changed = merged_content != latest_version.content;
// Stored path differs from either the prior stored path (peers need
// to learn about the rename) or from the path the origin sent
// (origin needs to learn if its rename was deduped or rejected by
// first-rename-wins).
let path_changed = new_relative_path != latest_version.relative_path
|| new_relative_path != sanitized_relative_path;
let new_version = StoredDocumentVersion {
document_id,
vault_update_id: last_update_id + 1,
@ -315,15 +307,7 @@ pub async fn update_document(
state
.database
.insert_document_version(
&vault_id,
&new_version,
transaction,
InsertBroadcast {
content_changed,
path_changed,
},
)
.insert_document_version(&vault_id, &new_version, transaction)
.await
.map_err(server_error)?;

View file

@ -134,19 +134,21 @@ async fn websocket(
}
};
send_update_over_websocket(
&WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
documents: get_unseen_documents(
&state,
&vault_id,
authed_handshake.handshake.last_seen_vault_update_id,
)
.await?,
is_initial_sync: true,
}),
&mut sender,
// Catch-up on versions committed while this client was offline,
// streamed one-at-a-time in ascending `vault_update_id` order
let unseen_documents = get_unseen_documents(
&state,
&vault_id,
authed_handshake.handshake.last_seen_vault_update_id,
)
.await?;
for document in unseen_documents {
send_update_over_websocket(
&WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { document }),
&mut sender,
)
.await?;
}
send_update_over_websocket(
&WebSocketServerMessage::CursorPositions(CursorPositionFromServer {
@ -161,6 +163,8 @@ async fn websocket(
loop {
match broadcast_receiver.recv().await {
Ok(update) => {
// Drop messages this device authored because the HTTP
// response already carried authoritative state back.
if Some(&device_id) == update.origin_device_id.as_ref() {
continue;
}
@ -174,8 +178,7 @@ async fn websocket(
.filter(|client| client.device_id != device_id)
.collect(),
}),
WebSocketServerMessage::VaultUpdate(_)
| WebSocketServerMessage::PathChange(_) => update.message,
WebSocketServerMessage::VaultUpdate(_) => update.message,
};
send_update_over_websocket(&message, &mut sender).await?;