diff --git a/frontend/history-ui/src/lib/types/WebSocketServerMessage.ts b/frontend/history-ui/src/lib/types/WebSocketServerMessage.ts index 09bd3e86..45e37358 100644 --- a/frontend/history-ui/src/lib/types/WebSocketServerMessage.ts +++ b/frontend/history-ui/src/lib/types/WebSocketServerMessage.ts @@ -1,6 +1,5 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { CursorPositionFromServer } from "./CursorPositionFromServer"; -import type { WebSocketVaultPathChange } from "./WebSocketVaultPathChange"; import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate"; -export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "pathChange" } & WebSocketVaultPathChange | { "type": "cursorPositions" } & CursorPositionFromServer; +export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer; diff --git a/frontend/history-ui/src/lib/types/WebSocketVaultPathChange.ts b/frontend/history-ui/src/lib/types/WebSocketVaultPathChange.ts deleted file mode 100644 index 6ae24f75..00000000 --- a/frontend/history-ui/src/lib/types/WebSocketVaultPathChange.ts +++ /dev/null @@ -1,3 +0,0 @@ -// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. - -export type WebSocketVaultPathChange = { vaultUpdateId: number, documentId: string, relativePath: string, }; diff --git a/frontend/history-ui/src/lib/types/WebSocketVaultUpdate.ts b/frontend/history-ui/src/lib/types/WebSocketVaultUpdate.ts index b627ac3c..fc10827f 100644 --- a/frontend/history-ui/src/lib/types/WebSocketVaultUpdate.ts +++ b/frontend/history-ui/src/lib/types/WebSocketVaultUpdate.ts @@ -1,4 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent"; -export type WebSocketVaultUpdate = { documents: Array, isInitialSync: boolean, }; +export type WebSocketVaultUpdate = { document: DocumentVersionWithoutContent, }; diff --git a/frontend/sync-client/src/services/types/WebSocketServerMessage.ts b/frontend/sync-client/src/services/types/WebSocketServerMessage.ts index 09bd3e86..45e37358 100644 --- a/frontend/sync-client/src/services/types/WebSocketServerMessage.ts +++ b/frontend/sync-client/src/services/types/WebSocketServerMessage.ts @@ -1,6 +1,5 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { CursorPositionFromServer } from "./CursorPositionFromServer"; -import type { WebSocketVaultPathChange } from "./WebSocketVaultPathChange"; import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate"; -export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "pathChange" } & WebSocketVaultPathChange | { "type": "cursorPositions" } & CursorPositionFromServer; +export type WebSocketServerMessage = { "type": "vaultUpdate" } & WebSocketVaultUpdate | { "type": "cursorPositions" } & CursorPositionFromServer; diff --git a/frontend/sync-client/src/services/types/WebSocketVaultPathChange.ts b/frontend/sync-client/src/services/types/WebSocketVaultPathChange.ts deleted file mode 100644 index f59ca5a5..00000000 --- a/frontend/sync-client/src/services/types/WebSocketVaultPathChange.ts +++ /dev/null @@ -1,3 +0,0 @@ -// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. - -export interface WebSocketVaultPathChange { vaultUpdateId: number, documentId: string, relativePath: string, } diff --git a/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts b/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts index 39e03b6f..5e7df8a5 100644 --- a/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts +++ b/frontend/sync-client/src/services/types/WebSocketVaultUpdate.ts @@ -1,4 +1,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent"; -export interface WebSocketVaultUpdate { documents: DocumentVersionWithoutContent[], isInitialSync: boolean, } +export interface WebSocketVaultUpdate { document: DocumentVersionWithoutContent, } diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 6c938dc7..970defb3 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -5,7 +5,6 @@ import type { WebSocketClientMessage } from "./types/WebSocketClientMessage"; import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"; import type { ClientCursors } from "./types/ClientCursors"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; -import type { WebSocketVaultPathChange } from "./types/WebSocketVaultPathChange"; import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS @@ -23,10 +22,6 @@ export class WebSocketManager { (update: WebSocketVaultUpdate) => Promise >(); - public readonly onRemotePathChangeReceived = new EventListeners< - (pathChange: WebSocketVaultPathChange) => Promise - >(); - public readonly onRemoteCursorsUpdateReceived = new EventListeners< (cursors: ClientCursors[]) => Promise >(); @@ -295,12 +290,6 @@ export class WebSocketManager { case "vaultUpdate": await this.onRemoteVaultUpdateReceived.triggerAsync(message); return; - case "pathChange": - this.logger.debug( - `Received path change for document ${message.documentId} → ${message.relativePath}` - ); - await this.onRemotePathChangeReceived.triggerAsync(message); - return; case "cursorPositions": this.logger.debug( `Received cursor positions for ${JSON.stringify(message.clients)}` diff --git a/frontend/sync-client/src/sync-operations/sync-event-queue.ts b/frontend/sync-client/src/sync-operations/sync-event-queue.ts index 8a19009a..d6859ead 100644 --- a/frontend/sync-client/src/sync-operations/sync-event-queue.ts +++ b/frontend/sync-client/src/sync-operations/sync-event-queue.ts @@ -44,7 +44,7 @@ export class SyncEventQueue { private savePending = false; - private lastSeenUpdateId: VaultUpdateId; + private readonly lastSeenUpdateId: VaultUpdateId; public constructor( private readonly settings: Settings, @@ -250,9 +250,7 @@ export class SyncEventQueue { e.documentId === docId) || (e.type === SyncEventType.RemoteUpdate && // we care about the local path not the remote - this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path) || - (e.type === SyncEventType.RemotePathChange && - this.getDocumentByDocumentId(e.pathChange.documentId)?.path === path) + this.getDocumentByDocumentId(e.remoteVersion.documentId)?.path === path) ); } @@ -280,10 +278,7 @@ export class SyncEventQueue { } public enqueue(input: FileSyncEvent): void { - if ( - input.type === SyncEventType.RemoteUpdate || - input.type === SyncEventType.RemotePathChange - ) { + if (input.type === SyncEventType.RemoteUpdate) { this.events.push(input); return; } @@ -414,31 +409,15 @@ export class SyncEventQueue { return result; } - // Coalesce multiple events of the same remote kind for the same - // documentId to the last one. Kinds are coalesced independently so - // that an interleaved content+path stream (e.g. VaultUpdate → - // PathChange) still preserves the VaultUpdate-before-PathChange - // ordering invariant the syncer relies on. - if (first.type === SyncEventType.RemoteUpdate) { - const { documentId } = first.remoteVersion; - const matching = this.events.filter( - (e) => - e.type === SyncEventType.RemoteUpdate && - e.remoteVersion.documentId === documentId - ); - const result = matching[matching.length - 1]; - for (const item of matching) { - removeFromArray(this.events, item); - } - return result; - } - - // SyncRemotePath - const { documentId } = first.pathChange; + // Coalesce multiple RemoteUpdate events for the same documentId + // down to the last one — the `.next` walk already short-circuits + // on obsolete versions via `parentVersionId` checks, but compacting + // here keeps the queue bounded under burst remote activity. + const { documentId } = first.remoteVersion; const matching = this.events.filter( (e) => - e.type === SyncEventType.RemotePathChange && - e.pathChange.documentId === documentId + e.type === SyncEventType.RemoteUpdate && + e.remoteVersion.documentId === documentId ); const result = matching[matching.length - 1]; for (const item of matching) { @@ -463,8 +442,6 @@ export class SyncEventQueue { e.documentId === documentId) || (e.type === SyncEventType.RemoteUpdate && e.remoteVersion.documentId === documentId) || - (e.type === SyncEventType.RemotePathChange && - e.pathChange.documentId === documentId) || (e.type === SyncEventType.LocalDelete && e.documentId === documentId) ) { diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 202499f8..9d009c3f 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -14,7 +14,6 @@ import { scheduleOfflineChanges } from "./offline-change-detector"; import { SyncResetError } from "../errors/sync-reset-error"; import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate"; -import type { WebSocketVaultPathChange } from "../services/types/WebSocketVaultPathChange"; import type { WebSocketManager } from "../services/websocket-manager"; import type { WebSocketClientMessage } from "../services/types/WebSocketClientMessage"; import { EventListeners } from "../utils/data-structures/event-listeners"; @@ -67,14 +66,19 @@ export class Syncer { this.webSocketManager.onWebSocketStatusChanged.add((isConnected) => { if (isConnected) { this.sendHandshakeMessage(); + // The server no longer carries an `is_initial_sync` + // terminator: it streams missed versions as individual + // VaultUpdates and then behaves like a live subscription. + // Mark first-sync as complete once we've observed the + // transition to "connected" — per-path sync status still + // relies on `hasPendingEventsForPath`, which correctly + // shows SYNCING while catch-up events are in flight. + this._isFirstSyncComplete = true; } }); this.webSocketManager.onRemoteVaultUpdateReceived.add( this.syncRemotelyUpdatedFile.bind(this) ); - this.webSocketManager.onRemotePathChangeReceived.add( - this.syncRemotelyChangedPath.bind(this) - ); } public get isFirstSyncComplete(): boolean { @@ -106,63 +110,22 @@ export class Syncer { } + // Handler for every `WebSocketVaultUpdate` the server emits. The + // server filters out messages authored by this device, so every + // update here comes from a peer (or is part of the catch-up stream + // the server replays on connect for versions we missed while + // offline). public async syncRemotelyUpdatedFile( message: WebSocketVaultUpdate ): Promise { await this.scheduleSyncForOfflineChanges(); - for (const remoteVersion of message.documents) { - this.queue.enqueue({ - type: SyncEventType.RemoteUpdate, - remoteVersion - }); - } - - if (message.isInitialSync) { - this._isFirstSyncComplete = true; - } + this.queue.enqueue({ + type: SyncEventType.RemoteUpdate, + remoteVersion: message.document + }); this.ensureDraining(); - - } - - // A PathChange notifies us that a document now lives at a new server- - // canonical path. It's delivered to every client (origin included) - // because the create/update HTTP response no longer carries the path, - // so the only way the origin learns about dedupe or first-rename-wins - // is via this event. - // - // Algorithmic assumptions: - // (1) Per-vault broadcast ordering is preserved by the server, so if - // the same write produced a `VaultUpdate` (content change) and a - // `PathChange` (path change), the `VaultUpdate` is handled first - // — that's what lets us skip advancing `parentVersionId` here - // without risking a stuck "already up-to-date" check later. - // (2) On a lag-induced disconnect (`broadcast::error::Lagged`) the - // server disconnects the client for a full resync, so out-of- - // order delivery across a reconnect boundary can't leave us with - // a stale PathChange overwriting a newer one. - public async syncRemotelyChangedPath( - pathChange: WebSocketVaultPathChange - ): Promise { - try { - await this.scheduleSyncForOfflineChanges(); - - this.queue.enqueue({ - type: SyncEventType.RemotePathChange, - pathChange - }); - - await this.scheduleDrain(); - } catch (e) { - if (e instanceof SyncResetError) { - this.logger.info( - "Failed to apply remote path change due to a reset" - ); - return; - } - this.logger.error(`Failed to apply remote path change: ${e}`); - } } public async scheduleSyncForOfflineChanges(): Promise { @@ -332,9 +295,6 @@ export class Syncer { case SyncEventType.RemoteUpdate: await this.processSyncRemoteContent(event); break; - case SyncEventType.RemotePathChange: - await this.processSyncRemotePath(event); - break; } } catch (e) { if (e instanceof FileNotFoundError) { @@ -594,51 +554,6 @@ export class Syncer { await this.processRemoteUpdateForNewDocument(remoteVersion); } - private async processSyncRemotePath( - event: Extract - ): Promise { - const { pathChange } = event; - const existing = this.queue.getDocumentByDocumentId( - pathChange.documentId - ); - if (existing === undefined) { - throw new Error( - `Received path change for unknown document ${pathChange.documentId}` - ); - } - - const { path: currentPath, record } = existing; - const newPath = pathChange.relativePath; - - if (currentPath !== newPath) { - await this.operations.move(currentPath, newPath); - - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.MOVE, - relativePath: newPath, - movedFrom: currentPath - }, - message: "Applied remote path change", - author: pathChange.userId, - timestamp: new Date(pathChange.updatedDate) - }); - } - - // `operations.move` updates the queue's path index, but doesn't - // touch `remoteRelativePath`. Refresh it so offline change - // detection compares against the server's path. parentVersionId - // intentionally stays at its prior value: if the write also - // changed content, the corresponding VaultUpdate handles that; - // advancing it here would make us skip fetching content we don't - // yet have. - this.queue.setDocument(newPath, { - ...record, - remoteRelativePath: newPath - }); - } - private async processRemoteUpdateForExistingDocument( currentPath: RelativePath, record: DocumentRecord, @@ -734,14 +649,14 @@ export class Syncer { // Path reconciliation fallback for the reconnect case. // - // In steady-state streaming, server-initiated renames arrive as - // dedicated `PathChange` WebSocket events and are handled by - // `syncRemotelyChangedPath`. But the reconnect catch-up path - // (`get_unseen_documents` → `VaultUpdate(is_initial_sync=…)`) - // replays *versions* from the DB — `PathChange` is emission- - // only and not replayed. Without this branch, a pure rename - // that happened while we were disconnected would leave our - // local file stuck at its old path forever. + // In steady-state streaming, server-initiated renames arrive + // as `VaultUpdate` events with `originatesFromSelf=true` for + // the author and drive `processSyncRemotePath`. The reconnect + // catch-up (`get_unseen_documents` → `is_initial_sync=true`) + // replays versions authored by any device with + // `originatesFromSelf=false`, so those take the full remote- + // sync branch and we need this in-branch path reconciliation + // to avoid leaving the local file stuck at its old path. // // Only apply the server's path when the record's // `remoteRelativePath` still matches `currentPath` — that means @@ -1107,8 +1022,8 @@ export class Syncer { } } // Only delete on disk if the record at `path` is still the one - // we expected — if a PathChange moved another doc here, we - // shouldn't delete its file. + // we expected — if a self-origin path-change moved another doc + // here, we shouldn't delete its file. const finalRecord = this.queue.getSettledDocumentByPath(path); if ( finalRecord === undefined || @@ -1121,9 +1036,10 @@ export class Syncer { } // The response carries content only — path reconciliation is the - // sole responsibility of the `PathChange` WebSocket event, which - // fires independently for renames/dedupes. We therefore always - // record the current local `path` here; an in-flight `PathChange` + // sole responsibility of the self-origin `VaultUpdate` echo (the + // `originatesFromSelf=true` branch of `syncRemoteVaultUpdate`), + // which fires independently for renames/dedupes. We therefore + // always record the current local `path` here; an in-flight echo // will move the file and fix `remoteRelativePath` if the server // placed the document somewhere else. const existingRecord = this.queue.getSettledDocumentByPath(path); diff --git a/frontend/sync-client/src/sync-operations/types.ts b/frontend/sync-client/src/sync-operations/types.ts index 22b82b3e..4a201ad9 100644 --- a/frontend/sync-client/src/sync-operations/types.ts +++ b/frontend/sync-client/src/sync-operations/types.ts @@ -1,5 +1,4 @@ import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; -import type { WebSocketVaultPathChange } from "../services/types/WebSocketVaultPathChange"; export type VaultUpdateId = number; export type DocumentId = string; @@ -25,16 +24,14 @@ export enum SyncEventType { LocalCreate = "local-create", LocalUpdate = "local-update", // includes both content and path changes LocalDelete = "local-delete", - RemoteUpdate = "remote-update", - RemotePathChange = "remote-path-change", + RemoteUpdate = "remote-update", // includes every type of update coming from the server } export type FileSyncEvent = | { type: SyncEventType.LocalCreate; path: RelativePath } | { type: SyncEventType.LocalUpdate; path: RelativePath; oldPath?: RelativePath } | { type: SyncEventType.LocalDelete; path: RelativePath } - | { type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent } - | { type: SyncEventType.RemotePathChange; pathChange: WebSocketVaultPathChange }; + | { type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent }; export type SyncEvent = | { @@ -57,8 +54,4 @@ export type SyncEvent = | { type: SyncEventType.RemoteUpdate; remoteVersion: DocumentVersionWithoutContent; - } - | { - type: SyncEventType.RemotePathChange; - pathChange: WebSocketVaultPathChange; }; diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index c9ea9746..8e505083 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -20,24 +20,6 @@ pub mod models; #[error("Database is busy")] pub struct WriteBusyError; -/// Tells [`Database::insert_document_version`] which WebSocket events the -/// just-committed version should produce. The caller is the only party -/// with enough context to decide this (the DB layer has no access to -/// "what the client sent" or "what the prior version looked like"). -#[derive(Debug, Clone, Copy, Default)] -pub struct InsertBroadcast { - /// Emit a `VaultUpdate` (filtered from the origin device). Set when - /// the stored bytes differ from the prior version's bytes — i.e. - /// peers need to pull new content. - pub content_changed: bool, - - /// Emit a `PathChange` (delivered to every client, origin included). - /// Set when the stored path differs from the prior stored path *or* - /// from the path the origin client sent — i.e. someone needs to - /// reconcile a dedupe, rename, or first-rename-wins outcome. - pub path_changed: bool, -} - use sqlx::{ Pool, Sqlite, pool::PoolConnection, sqlite::SqliteConnection, sqlite::SqlitePoolOptions, }; @@ -47,10 +29,7 @@ use uuid::fmt::Hyphenated; use super::websocket::{ broadcasts::Broadcasts, - models::{ - WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultPathChange, - WebSocketVaultUpdate, - }, + models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin, WebSocketVaultUpdate}, }; use crate::config::database_config::DatabaseConfig; use crate::consts::IDLE_POOL_TIMEOUT; @@ -693,7 +672,6 @@ impl Database { vault_id: &VaultId, version: &StoredDocumentVersion, mut transaction: WriteTransaction, - broadcast: InsertBroadcast, ) -> Result<()> { let document_id = version.document_id.as_hyphenated(); let query = sqlx::query!( @@ -739,39 +717,19 @@ impl Database { .await .context("Failed to commit transaction")?; - if broadcast.content_changed { - // Content events are filtered out for the origin device — the - // origin already has the content (or learns about the merge - // via the HTTP response). - self.broadcasts.send_document_update( - vault_id.clone(), - WebSocketServerMessageWithOrigin::with_origin( - version.device_id.clone(), - WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { - documents: vec![version.clone().into()], - is_initial_sync: false, - }), - ), - ); - } - - if broadcast.path_changed { - // Path change events intentionally carry no origin so *every* - // connected client (including the one that made the write) - // receives them. The create/update HTTP response no longer - // carries `relative_path`, so the origin device relies on this - // event to learn the server-canonical path. - self.broadcasts.send_document_update( - vault_id.clone(), - WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::PathChange( - WebSocketVaultPathChange { - vault_update_id: version.vault_update_id, - document_id: version.document_id, - relative_path: version.relative_path.clone(), - }, - )), - ); - } + // The broadcast is delivered to every connected client except the + // author — the send task filters on `origin_device_id` (see + // `websocket.rs`). The origin already has authoritative state + // from the HTTP response that triggered this write. + self.broadcasts.send_document_update( + vault_id.clone(), + WebSocketServerMessageWithOrigin::with_origin( + version.device_id.clone(), + WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { + document: version.clone().into(), + }), + ), + ); Ok(()) } diff --git a/sync-server/src/app_state/websocket/models.rs b/sync-server/src/app_state/websocket/models.rs index 73e81f26..983c0dad 100644 --- a/sync-server/src/app_state/websocket/models.rs +++ b/sync-server/src/app_state/websocket/models.rs @@ -58,23 +58,15 @@ pub struct CursorPositionFromServer { pub clients: Vec, } -// Clients only get notified of other clients' updates through WebSocketVaultUpdate. +// One committed version, broadcast to every connected client *except* +// the device that authored it — that device already has the new state +// via its HTTP response. The server also emits these one-at-a-time to +// catch up a freshly-connected client on versions committed while it +// was offline, in ascending `vault_update_id` order. #[derive(TS, Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct WebSocketVaultUpdate { - pub documents: Vec, - pub is_initial_sync: bool, -} - -// Clients get notified of both their own and other clients' path changes through WebSocketVaultPathChange. -// This is becuase we must absolutely order path updates as they may all depend on all previous updates. -#[derive(TS, Serialize, Clone, Debug)] -#[serde(rename_all = "camelCase")] -pub struct WebSocketVaultPathChange { - #[ts(type = "number")] - pub vault_update_id: VaultUpdateId, - pub document_id: DocumentId, - pub relative_path: String, + pub document: DocumentVersionWithoutContent, } #[derive(TS, Deserialize, Clone, Debug)] @@ -90,10 +82,13 @@ pub enum WebSocketClientMessage { #[ts(export)] pub enum WebSocketServerMessage { VaultUpdate(WebSocketVaultUpdate), - PathChange(WebSocketVaultPathChange), CursorPositions(CursorPositionFromServer), } +/// Broadcast envelope carrying the message plus the device that produced +/// it. The per-recipient send task compares `origin_device_id` against +/// its own device id to fill in `originates_from_self` before the message +/// is serialized on the wire. #[derive(Clone, Debug)] pub struct WebSocketServerMessageWithOrigin { pub origin_device_id: Option, diff --git a/sync-server/src/server/create_document.rs b/sync-server/src/server/create_document.rs index 64c3c5fe..84703139 100644 --- a/sync-server/src/server/create_document.rs +++ b/sync-server/src/server/create_document.rs @@ -11,10 +11,7 @@ use super::{device_id_header::DeviceIdHeader, requests::CreateDocumentVersion}; use crate::{ app_state::{ AppState, - database::{ - InsertBroadcast, - models::{StoredDocumentVersion, VaultId}, - }, + database::models::{StoredDocumentVersion, VaultId}, }, config::user_config::User, errors::{SyncServerError, client_error, server_error, write_transaction_error}, @@ -128,8 +125,6 @@ pub async fn create_document( ); } - let path_changed = deduped_path != sanitized_relative_path; - let new_vault_update_id = last_update_id + 1; let new_version = StoredDocumentVersion { vault_update_id: new_vault_update_id, @@ -146,17 +141,7 @@ pub async fn create_document( state .database - .insert_document_version( - &vault_id, - &new_version, - transaction, - InsertBroadcast { - // A brand-new document is always a content change for peers. - content_changed: true, - // Origin needs to know if the server deduped its requested path. - path_changed, - }, - ) + .insert_document_version(&vault_id, &new_version, transaction) .await .map_err(server_error)?; diff --git a/sync-server/src/server/delete_document.rs b/sync-server/src/server/delete_document.rs index aeec13d3..76f92b71 100644 --- a/sync-server/src/server/delete_document.rs +++ b/sync-server/src/server/delete_document.rs @@ -11,10 +11,7 @@ use super::device_id_header::DeviceIdHeader; use crate::{ app_state::{ AppState, - database::{ - InsertBroadcast, - models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId}, - }, + database::models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId}, }, config::user_config::User, errors::{SyncServerError, not_found_error, server_error, write_transaction_error}, @@ -101,17 +98,7 @@ pub async fn delete_document( state .database - .insert_document_version( - &vault_id, - &new_version, - transaction, - InsertBroadcast { - // Deletion is a content change peers must learn about. - content_changed: true, - // Delete never renames. - path_changed: false, - }, - ) + .insert_document_version(&vault_id, &new_version, transaction) .await .map_err(server_error)?; diff --git a/sync-server/src/server/update_document.rs b/sync-server/src/server/update_document.rs index 1963310a..8b9c3bf5 100644 --- a/sync-server/src/server/update_document.rs +++ b/sync-server/src/server/update_document.rs @@ -17,7 +17,7 @@ use crate::{ app_state::{ AppState, database::{ - InsertBroadcast, WriteTransaction, + WriteTransaction, models::{DocumentId, StoredDocumentVersion, VaultId, VaultUpdateId}, }, }, @@ -292,14 +292,6 @@ pub async fn update_document( latest_version.relative_path.clone() }; - let content_changed = merged_content != latest_version.content; - // Stored path differs from either the prior stored path (peers need - // to learn about the rename) or from the path the origin sent - // (origin needs to learn if its rename was deduped or rejected by - // first-rename-wins). - let path_changed = new_relative_path != latest_version.relative_path - || new_relative_path != sanitized_relative_path; - let new_version = StoredDocumentVersion { document_id, vault_update_id: last_update_id + 1, @@ -315,15 +307,7 @@ pub async fn update_document( state .database - .insert_document_version( - &vault_id, - &new_version, - transaction, - InsertBroadcast { - content_changed, - path_changed, - }, - ) + .insert_document_version(&vault_id, &new_version, transaction) .await .map_err(server_error)?; diff --git a/sync-server/src/server/websocket.rs b/sync-server/src/server/websocket.rs index 4540539a..46d67533 100644 --- a/sync-server/src/server/websocket.rs +++ b/sync-server/src/server/websocket.rs @@ -134,19 +134,21 @@ async fn websocket( } }; - send_update_over_websocket( - &WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { - documents: get_unseen_documents( - &state, - &vault_id, - authed_handshake.handshake.last_seen_vault_update_id, - ) - .await?, - is_initial_sync: true, - }), - &mut sender, + // Catch-up on versions committed while this client was offline, + // streamed one-at-a-time in ascending `vault_update_id` order + let unseen_documents = get_unseen_documents( + &state, + &vault_id, + authed_handshake.handshake.last_seen_vault_update_id, ) .await?; + for document in unseen_documents { + send_update_over_websocket( + &WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate { document }), + &mut sender, + ) + .await?; + } send_update_over_websocket( &WebSocketServerMessage::CursorPositions(CursorPositionFromServer { @@ -161,6 +163,8 @@ async fn websocket( loop { match broadcast_receiver.recv().await { Ok(update) => { + // Drop messages this device authored because the HTTP + // response already carried authoritative state back. if Some(&device_id) == update.origin_device_id.as_ref() { continue; } @@ -174,8 +178,7 @@ async fn websocket( .filter(|client| client.device_id != device_id) .collect(), }), - WebSocketServerMessage::VaultUpdate(_) - | WebSocketServerMessage::PathChange(_) => update.message, + WebSocketServerMessage::VaultUpdate(_) => update.message, }; send_update_over_websocket(&message, &mut sender).await?;