From 5ee9db00076b4c367e209b248cf715f74741d333 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 21 Apr 2026 20:30:04 +0100 Subject: [PATCH] store creation id and implement moves --- .../src/services/websocket-manager.ts | 43 ++++++++----- .../sync-client/src/sync-operations/syncer.ts | 61 +++++++++++++++++++ sync-server/src/app_state/database.rs | 7 ++- ...421000000_add_creation_vault_update_id.sql | 20 ++++++ sync-server/src/app_state/database/models.rs | 1 + sync-server/src/server/create_document.rs | 4 +- sync-server/src/server/delete_document.rs | 13 ++-- .../src/server/restore_document_version.rs | 1 + sync-server/src/server/update_document.rs | 1 + 9 files changed, 128 insertions(+), 23 deletions(-) create mode 100644 sync-server/src/app_state/database/migrations/20260421000000_add_creation_vault_update_id.sql diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 9263142a..5ec40e49 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -5,6 +5,7 @@ import type { WebSocketClientMessage } from "./types/WebSocketClientMessage"; import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"; import type { ClientCursors } from "./types/ClientCursors"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; +import type { WebSocketVaultPathChange } from "./types/WebSocketVaultPathChange"; import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS @@ -22,6 +23,10 @@ export class WebSocketManager { (update: WebSocketVaultUpdate) => Promise >(); + public readonly onRemotePathChangeReceived = new EventListeners< + (pathChange: WebSocketVaultPathChange) => Promise + >(); + public readonly onRemoteCursorsUpdateReceived = new EventListeners< (cursors: ClientCursors[]) => Promise >(); @@ -280,22 +285,28 @@ export class WebSocketManager { private async handleWebSocketMessage( message: WebSocketServerMessage ): Promise { - if (message.type === "vaultUpdate") { - await this.onRemoteVaultUpdateReceived.triggerAsync(message); - - - } else if (message.type === "cursorPositions") { - this.logger.debug( - `Received cursor positions for ${JSON.stringify(message.clients)}` - ); - - await this.onRemoteCursorsUpdateReceived.triggerAsync( - message.clients - ); - } else { - this.logger.warn( - `Received unknown message type: ${JSON.stringify(message)}` - ); + switch (message.type) { + case "vaultUpdate": + await this.onRemoteVaultUpdateReceived.triggerAsync(message); + return; + case "pathChange": + this.logger.debug( + `Received path change for document ${message.documentId} → ${message.relativePath}` + ); + await this.onRemotePathChangeReceived.triggerAsync(message); + return; + case "cursorPositions": + this.logger.debug( + `Received cursor positions for ${JSON.stringify(message.clients)}` + ); + await this.onRemoteCursorsUpdateReceived.triggerAsync( + message.clients + ); + return; + default: + this.logger.warn( + `Received unknown message type: ${JSON.stringify(message)}` + ); } } } diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 17458cbe..7e772432 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -14,6 +14,7 @@ import { scheduleOfflineChanges } from "./offline-change-detector"; import { SyncResetError } from "../errors/sync-reset-error"; import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent"; import type { WebSocketVaultUpdate } from "../services/types/WebSocketVaultUpdate"; +import type { WebSocketVaultPathChange } from "../services/types/WebSocketVaultPathChange"; import type { WebSocketManager } from "../services/websocket-manager"; import type { WebSocketClientMessage } from "../services/types/WebSocketClientMessage"; import { EventListeners } from "../utils/data-structures/event-listeners"; @@ -73,6 +74,9 @@ export class Syncer { this.webSocketManager.onRemoteVaultUpdateReceived.add( this.syncRemotelyUpdatedFile.bind(this) ); + this.webSocketManager.onRemotePathChangeReceived.add( + this.syncRemotelyChangedPath.bind(this) + ); } public get isFirstSyncComplete(): boolean { @@ -173,6 +177,63 @@ export class Syncer { } } + // A PathChange notifies us that a document now lives at a new server- + // canonical path. It's delivered to every client (origin included) + // because the create/update HTTP response no longer carries the path, + // so the only way the origin learns about dedupe or first-rename-wins + // is via this event. + public async syncRemotelyChangedPath( + pathChange: WebSocketVaultPathChange + ): Promise { + try { + const existing = this.queue.getDocumentByDocumentId( + pathChange.documentId + ); + if (existing === undefined) { + throw new Error( + `Received path change for unknown document ${pathChange.documentId}` + ); + } + + const { path: currentPath, record } = existing; + const newPath = pathChange.relativePath; + + if (currentPath !== newPath) { + await this.operations.move(currentPath, newPath); + + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.MOVE, + relativePath: newPath, + movedFrom: currentPath + }, + message: "Applied remote path change" + }); + } + + // `operations.move` updates the queue's path index, but + // doesn't touch `remoteRelativePath`. Refresh it so offline + // change detection compares against the server's path. + // parentVersionId intentionally stays at its prior value: + // if the write also changed content, the corresponding + // VaultUpdate handles that; advancing it here would make us + // skip fetching content we don't yet have. + this.queue.setDocument(newPath, { + ...record, + remoteRelativePath: newPath + }); + } catch (e) { + if (e instanceof SyncResetError) { + this.logger.info( + "Failed to apply remote path change due to a reset" + ); + return; + } + this.logger.error(`Failed to apply remote path change: ${e}`); + } + } + public reset(): void { this._isFirstSyncComplete = false; this.queue.clear(); diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index b9bf8df1..a249dadc 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -586,6 +586,7 @@ impl Database { r#" select vault_update_id, + creation_vault_update_id, document_id as "document_id: Hyphenated", relative_path, updated_date as "updated_date: chrono::DateTime", @@ -626,6 +627,7 @@ impl Database { r#" select vault_update_id, + creation_vault_update_id, document_id as "document_id: Hyphenated", relative_path, updated_date as "updated_date: chrono::DateTime", @@ -661,6 +663,7 @@ impl Database { r#" select vault_update_id, + creation_vault_update_id, document_id as "document_id: Hyphenated", relative_path, updated_date as "updated_date: chrono::DateTime", @@ -697,6 +700,7 @@ impl Database { r#" insert into documents ( vault_update_id, + creation_vault_update_id, document_id, relative_path, updated_date, @@ -706,9 +710,10 @@ impl Database { device_id, has_been_merged ) - values (?, ?, ?, ?, ?, ?, ?, ?, ?) + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "#, version.vault_update_id, + version.creation_vault_update_id, document_id, version.relative_path, version.updated_date, diff --git a/sync-server/src/app_state/database/migrations/20260421000000_add_creation_vault_update_id.sql b/sync-server/src/app_state/database/migrations/20260421000000_add_creation_vault_update_id.sql new file mode 100644 index 00000000..40dc85fb --- /dev/null +++ b/sync-server/src/app_state/database/migrations/20260421000000_add_creation_vault_update_id.sql @@ -0,0 +1,20 @@ +ALTER TABLE documents ADD COLUMN creation_vault_update_id INTEGER NOT NULL DEFAULT 0; + +UPDATE documents +SET creation_vault_update_id = ( + SELECT MIN(d2.vault_update_id) + FROM documents d2 + WHERE d2.document_id = documents.document_id +); + +DROP VIEW latest_document_versions; + +CREATE VIEW IF NOT EXISTS latest_document_versions AS --recreate view as it now includes one more field +SELECT d.* +FROM documents d +INNER JOIN ( + SELECT MAX(vault_update_id) AS max_version_id + FROM documents + GROUP BY document_id +) max_versions +ON d.vault_update_id = max_versions.max_version_id; diff --git a/sync-server/src/app_state/database/models.rs b/sync-server/src/app_state/database/models.rs index 9812703d..80b628e8 100644 --- a/sync-server/src/app_state/database/models.rs +++ b/sync-server/src/app_state/database/models.rs @@ -13,6 +13,7 @@ pub type DeviceId = String; #[derive(Debug, Clone)] pub struct StoredDocumentVersion { pub vault_update_id: VaultUpdateId, + pub creation_vault_update_id: VaultUpdateId, pub document_id: DocumentId, pub relative_path: String, pub updated_date: DateTime, diff --git a/sync-server/src/server/create_document.rs b/sync-server/src/server/create_document.rs index 51ed5b47..7f0f6b60 100644 --- a/sync-server/src/server/create_document.rs +++ b/sync-server/src/server/create_document.rs @@ -121,8 +121,10 @@ pub async fn create_document( let path_changed = deduped_path != sanitized_relative_path; + let new_vault_update_id = last_update_id + 1; let new_version = StoredDocumentVersion { - vault_update_id: last_update_id + 1, + vault_update_id: new_vault_update_id, + creation_vault_update_id: new_vault_update_id, document_id, relative_path: deduped_path, content: new_content, diff --git a/sync-server/src/server/delete_document.rs b/sync-server/src/server/delete_document.rs index 48872e34..a523c499 100644 --- a/sync-server/src/server/delete_document.rs +++ b/sync-server/src/server/delete_document.rs @@ -73,13 +73,16 @@ pub async fn delete_document( return Ok(Json(latest_version.clone().into())); } - let (latest_relative_path, latest_content) = latest_version.map_or_else( - || (String::new(), Vec::new()), - |version| (version.relative_path, version.content), - ); + let new_vault_update_id = last_update_id + 1; + let (latest_relative_path, latest_content, creation_vault_update_id) = + latest_version.map_or_else( + || (String::new(), Vec::new(), new_vault_update_id), + |version| (version.relative_path, version.content, version.creation_vault_update_id), + ); let new_version = StoredDocumentVersion { - vault_update_id: last_update_id + 1, + vault_update_id: new_vault_update_id, + creation_vault_update_id, document_id, relative_path: latest_relative_path, content: latest_content, // copy the content from the latest version diff --git a/sync-server/src/server/restore_document_version.rs b/sync-server/src/server/restore_document_version.rs index fadbdb5a..bb4e6775 100644 --- a/sync-server/src/server/restore_document_version.rs +++ b/sync-server/src/server/restore_document_version.rs @@ -133,6 +133,7 @@ pub async fn restore_document_version( let new_version = StoredDocumentVersion { vault_update_id: last_update_id + 1, + creation_vault_update_id: target_version.creation_vault_update_id, document_id, relative_path: restore_path, content: target_version.content, diff --git a/sync-server/src/server/update_document.rs b/sync-server/src/server/update_document.rs index 998c2dd4..1963310a 100644 --- a/sync-server/src/server/update_document.rs +++ b/sync-server/src/server/update_document.rs @@ -303,6 +303,7 @@ pub async fn update_document( let new_version = StoredDocumentVersion { document_id, vault_update_id: last_update_id + 1, + creation_vault_update_id: latest_version.creation_vault_update_id, relative_path: new_relative_path, content: merged_content, updated_date: chrono::Utc::now(),