From 7c991c3b4d51bd05c84ff71bbbd3fb65ebb0074b Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sun, 4 Jan 2026 14:08:33 +0000 Subject: [PATCH] Fix syncing logic --- frontend/sync-client/src/consts.ts | 1 + .../sync-client/src/persistence/database.ts | 42 +-- .../src/services/websocket-manager.ts | 45 +++- frontend/sync-client/src/sync-client.ts | 6 +- .../sync-operations/unrestricted-syncer.ts | 247 +++++++++--------- sync-server/config-e2e.yml | 32 +-- sync-server/src/app_state/database.rs | 3 +- sync-server/src/server/create_document.rs | 5 +- sync-server/src/server/update_document.rs | 18 +- .../src/utils/find_first_available_path.rs | 8 +- 10 files changed, 223 insertions(+), 184 deletions(-) diff --git a/frontend/sync-client/src/consts.ts b/frontend/sync-client/src/consts.ts index e0a2d60e..9e4fa7d2 100644 --- a/frontend/sync-client/src/consts.ts +++ b/frontend/sync-client/src/consts.ts @@ -4,3 +4,4 @@ export const MAX_LOG_MESSAGE_COUNT = 100000; export const MAX_HISTORY_ENTRY_COUNT = 5000; export const SUPPORTED_API_VERSION = 3; export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS = 10; +export const WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS = 10; diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 5b4e943b..a5daf876 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -26,7 +26,6 @@ export interface StoredDocumentMetadata { export interface StoredDatabase { documents: StoredDocumentMetadata[]; lastSeenUpdateId: VaultUpdateId | undefined; - hasInitialSyncCompleted: boolean; } /** @@ -46,7 +45,6 @@ export interface DocumentRecord { export class Database { private documents: DocumentRecord[]; private lastSeenUpdateIds: CoveredValues; - private hasInitialSyncCompleted: boolean; public constructor( private readonly logger: Logger, @@ -56,15 +54,13 @@ export class Database { initialState ??= {}; this.documents = - initialState.documents?.map( - ({ relativePath, ...metadata }) => ({ - relativePath, - metadata, - isDeleted: false, - updates: [], - parallelVersion: 0 - }) - ) ?? []; + initialState.documents?.map(({ relativePath, ...metadata }) => ({ + relativePath, + metadata, + isDeleted: false, + updates: [], + parallelVersion: 0 + })) ?? []; this.ensureConsistency(); this.logger.debug(`Loaded ${this.documents.length} documents`); @@ -79,11 +75,6 @@ export class Database { this.lastSeenUpdateIds.add(doc.metadata?.parentVersionId); }); - this.hasInitialSyncCompleted = - initialState.hasInitialSyncCompleted ?? false; - this.logger.debug( - `Loaded hasInitialSyncCompleted: ${this.hasInitialSyncCompleted}` - ); } public get length(): number { @@ -199,15 +190,12 @@ export class Database { relativePath: RelativePath, promise: Promise ): DocumentRecord { - this.logger.debug( - `Creating new pending document: ${relativePath}` - ); + this.logger.debug(`Creating new pending document: ${relativePath}`); const previousEntry = this.getLatestDocumentByRelativePath(relativePath); const entry = { relativePath, - documentId: undefined, metadata: undefined, isDeleted: false, updates: [promise], @@ -250,7 +238,9 @@ export class Database { public getDocumentByDocumentId( find: DocumentId ): DocumentRecord | undefined { - return this.documents.find(({ metadata }) => metadata?.documentId === find); + return this.documents.find( + ({ metadata }) => metadata?.documentId === find + ); } public move( @@ -292,14 +282,6 @@ export class Database { candidate.isDeleted = true; } - public getHasInitialSyncCompleted(): boolean { - return this.hasInitialSyncCompleted; - } - - public setHasInitialSyncCompleted(value: boolean): void { - this.hasInitialSyncCompleted = value; - this.saveInTheBackground(); - } public getLastSeenUpdateId(): VaultUpdateId { return this.lastSeenUpdateIds.min; @@ -323,7 +305,6 @@ export class Database { this.lastSeenUpdateIds = new CoveredValues( 0 // the first updateId will be 1 which is the first integer after -1 ); - this.hasInitialSyncCompleted = false; this.saveInTheBackground(); } @@ -337,7 +318,6 @@ export class Database { }) ), lastSeenUpdateId: this.lastSeenUpdateIds.min, - hasInitialSyncCompleted: this.hasInitialSyncCompleted }); } diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 71bd083e..266047ce 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -6,7 +6,10 @@ import type { CursorPositionFromClient } from "./types/CursorPositionFromClient" import type { ClientCursors } from "./types/ClientCursors"; import { createPromise } from "../utils/create-promise"; import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate"; -import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS } from "../consts"; +import { + WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS, + WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS +} from "../consts"; import { removeFromArray } from "../utils/remove-from-array"; import { EventListeners } from "../utils/data-structures/event-listeners"; import { awaitAll } from "../utils/await-all"; @@ -27,6 +30,7 @@ export class WebSocketManager { private isStopped = true; private resolveDisconnectingPromise: null | (() => unknown) = null; private reconnectTimeoutId: ReturnType | undefined; + private connectionTimeoutId: ReturnType | undefined; private readonly outstandingPromises: Promise[] = []; @@ -36,7 +40,7 @@ export class WebSocketManager { private readonly logger: Logger, private readonly settings: Settings, private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket = WebSocket - ) {} + ) { } public get isWebSocketConnected(): boolean { return ( @@ -61,6 +65,11 @@ export class WebSocketManager { this.reconnectTimeoutId = undefined; } + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } + this.webSocket?.close(1000, "WebSocketManager has been stopped"); // eslint-disable-next-line @typescript-eslint/init-declarations @@ -171,7 +180,22 @@ export class WebSocketManager { this.webSocket = new this.webSocketFactoryImplementation(wsUri); + // Set connection timeout to handle cases where server is down and the WebSocket connection won't open + this.connectionTimeoutId = setTimeout(() => { + this.connectionTimeoutId = undefined; + this.logger.warn( + `WebSocket connection timeout after ${WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS} seconds` + ); + // Force close to trigger onclose handler which will schedule reconnection + this.webSocket?.close(); + }, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS * 1000); + this.webSocket.onopen = (): void => { + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } + // Check if we've been stopped while connecting if (this.isStopped) { this.webSocket?.close( @@ -215,7 +239,18 @@ export class WebSocketManager { } }; + this.webSocket.onerror = (error): void => { + this.logger.error( + `WebSocket error occurred: ${error instanceof ErrorEvent ? error.message : "Unknown error"}` + ); + }; + this.webSocket.onclose = (event): void => { + if (this.connectionTimeoutId !== undefined) { + clearTimeout(this.connectionTimeoutId); + this.connectionTimeoutId = undefined; + } + this.logger.warn( `WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})` ); @@ -225,10 +260,14 @@ export class WebSocketManager { this.resolveDisconnectingPromise?.(); this.resolveDisconnectingPromise = null; } else { + const delay = this.settings.getSettings().webSocketRetryIntervalMs; + this.logger.info( + `Reconnecting to WebSocket in ${delay}ms...` + ); this.reconnectTimeoutId = setTimeout(() => { this.reconnectTimeoutId = undefined; this.initializeWebSocket(); - }, this.settings.getSettings().webSocketRetryIntervalMs); + }, delay); } }; } diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 9e368a7b..dfcfc3e4 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -56,7 +56,7 @@ export class SyncClient { database: Partial; }> > - ) {} + ) { } public get documentCount(): number { return this.database.length; @@ -339,7 +339,9 @@ export class SyncClient { this.hasFinishedOfflineSync = false; this.serverConfig.reset(); - await this.startSyncing(); + if (this.settings.getSettings().isSyncEnabled) { + await this.startSyncing(); + } } public getSettings(): SyncSettings { diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index f277f637..4ea1eda0 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -84,36 +84,16 @@ export class UnrestrictedSyncer { const response = await this.syncService.create({ relativePath: originalRelativePath, contentBytes, - forceMerge: !this.database.getHasInitialSyncCompleted() // don't duplicate files on first sync + forceMerge: true }); - // In case a document with the same name (but different ID) had existed remotely that we haven't known about - if (response.relativePath != originalRelativePath) { - this.logger.debug( - `Document ${originalRelativePath} has been created remotely at a different path: ${response.relativePath}, moving it locally` - ); - await this.operations.move( - document.relativePath, - response.relativePath - ); // this can throw FileNotFoundError - } - - this.database.updateDocumentMetadata( - { - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - - this.database.addSeenUpdateId(response.vaultUpdateId); - await this.updateCache( - response.vaultUpdateId, - contentBytes, - response.relativePath - ); + this.handleMaybeMergingResponse({ + document, + response, + contentHash, + originalRelativePath, + originalContentBytes: contentBytes + }); this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, @@ -134,7 +114,7 @@ export class UnrestrictedSyncer { await this.executeSync(updateDetails, async () => { if (document.metadata === undefined) { this.logger.debug( - `Document ${document.relativePath} has no metadata, so it was never synced remotely` + `Document ${document.relativePath} has no metadata, so it has never got synced remotely; no need to delete it remotely` ); return; } @@ -254,69 +234,16 @@ export class UnrestrictedSyncer { }); } - // `document` is mutable and reflects the latest state in the local database - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (document.isDeleted) { - this.logger.info( - `Document ${document.relativePath} has been deleted before we could finish updating it` - ); - this.database.addSeenUpdateId(response.vaultUpdateId); - return; - } - if ( - // `Syncer` creates fake local document metadata for all remote docs with invalid hashes. The parent IDs will likely match - // the latest versions so we still need to update the local versions to turn the fakes into real metadata. - document.metadata.parentVersionId > response.vaultUpdateId - ) { - this.logger.debug( - `Document ${document.relativePath} is already more up to date than the fetched version` - ); - this.database.addSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through - return; - } - - if (response.isDeleted) { - return this.applyRemoteDeleteLocally(document, response); - } - - let actualPath = document.relativePath; - - if (response.relativePath != originalRelativePath) { - actualPath = response.relativePath; - // Make sure to update the remote relative path to avoid uploading - // the file as a result of this filesystem event. - document.metadata.remoteRelativePath = response.relativePath; - await this.operations.move( - document.relativePath, - response.relativePath - ); // this can throw FileNotFoundError - } + this.handleMaybeMergingResponse({ + document, + response: response!, + contentHash, + originalRelativePath, + originalContentBytes: contentBytes + }); if (!("type" in response) || response.type === "MergingUpdate") { - const responseBytes = base64ToBytes(response.contentBase64); - contentHash = hash(responseBytes); - - this.database.updateDocumentMetadata( - { - ...document.metadata, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - await this.operations.write( - actualPath, - contentBytes, - responseBytes - ); - await this.updateCache( - response.vaultUpdateId, - responseBytes, - actualPath - ); - if (!force) { this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, @@ -324,32 +251,15 @@ export class UnrestrictedSyncer { message: `The file we updated had been updated remotely, so we downloaded the merged version` }); } - } else { - this.database.updateDocumentMetadata( - { - ...document.metadata, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - remoteRelativePath: response.relativePath - }, - document - ); - await this.updateCache( - response.vaultUpdateId, - contentBytes, - actualPath - ); } - this.database.addSeenUpdateId(response.vaultUpdateId); - const actualUpdateDetails: SyncUpdateDetails | SyncMovedDetails = oldPath !== undefined || response.relativePath != originalRelativePath ? { type: SyncType.MOVE, relativePath: response.relativePath, - movedFrom: originalRelativePath + movedFrom: oldPath ?? originalRelativePath } : { type: SyncType.UPDATE, @@ -363,7 +273,7 @@ export class UnrestrictedSyncer { message: `Successfully uploaded locally updated file to the server`, author: response.userId }); - } else { + } else if (!response.isDeleted) { this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, details: actualUpdateDetails, @@ -371,6 +281,17 @@ export class UnrestrictedSyncer { author: response.userId, timestamp: new Date(response.updatedDate) }); + } else { + this.history.addHistoryEntry({ + status: SyncStatus.SUCCESS, + details: { + type: SyncType.DELETE, + relativePath: document.relativePath + }, + message: "File has been deleted remotely, so we deleted it locally", + author: response.userId, + timestamp: new Date(response.updatedDate) + }); } }); } @@ -539,6 +460,105 @@ export class UnrestrictedSyncer { } } + private async handleMaybeMergingResponse( + { + document, + response, + contentHash, + originalRelativePath, + originalContentBytes + }: { + document: DocumentRecord; + response: DocumentVersion | DocumentUpdateResponse, + contentHash: string, + originalRelativePath: string, + originalContentBytes: Uint8Array + } + ): Promise { + + // `document` is mutable and reflects the latest state in the local database + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (document.isDeleted) { + this.logger.info( + `Document ${document.relativePath} has been deleted before we could finish updating it` + ); + this.database.addSeenUpdateId(response.vaultUpdateId); + return; + } + + if ( + (document.metadata?.parentVersionId ?? 0) > response.vaultUpdateId + ) { + this.logger.debug( + `Document ${document.relativePath} is already more up to date than the fetched version` + ); + this.database.addSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through + return; + } + + if (response.isDeleted) { + return this.applyRemoteDeleteLocally(document, response); + } + + let actualPath = document.relativePath; + + // this can't happen on the creation path as we can only get a merging response if a document already exists remotely on the same path + if (response.relativePath != originalRelativePath) { + actualPath = response.relativePath; + // Make sure to update the remote relative path to avoid uploading + // the file as a result of this filesystem event. + if (document.metadata !== undefined) { + document.metadata.remoteRelativePath = response.relativePath; + } + await this.operations.move( + document.relativePath, + response.relativePath + ); // this can throw FileNotFoundError + } + + if (!("type" in response) || response.type === "MergingUpdate") { + const responseBytes = base64ToBytes(response.contentBase64); + contentHash = hash(responseBytes); + + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); + await this.operations.write( + actualPath, + originalContentBytes, + responseBytes + ); + await this.updateCache( + response.vaultUpdateId, + responseBytes, + actualPath + ); + } else { + this.database.updateDocumentMetadata( + { + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: contentHash, + remoteRelativePath: response.relativePath + }, + document + ); + await this.updateCache( + response.vaultUpdateId, + originalContentBytes, + actualPath + ); + } + + this.database.addSeenUpdateId(response.vaultUpdateId); + } + private getHistoryEntryForSkippedOversizedFile( sizeInBytes: number, relativePath: RelativePath @@ -578,16 +598,7 @@ export class UnrestrictedSyncer { document: DocumentRecord, response: DocumentVersion | DocumentUpdateResponse ): Promise { - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - details: { - type: SyncType.DELETE, - relativePath: document.relativePath - }, - message: "File has been deleted remotely, so we deleted it locally", - author: response.userId, - timestamp: new Date(response.updatedDate) - }); + this.database.delete(document.relativePath); this.database.updateDocumentMetadata( diff --git a/sync-server/config-e2e.yml b/sync-server/config-e2e.yml index e9d47559..1f235b01 100644 --- a/sync-server/config-e2e.yml +++ b/sync-server/config-e2e.yml @@ -9,24 +9,24 @@ server: max_clients_per_vault: 256 response_timeout: 30m mergeable_file_extensions: - - md - - txt + - md + - txt users: user_configs: - - name: admin - token: test-token-change-me - vault_access: - type: allow_access_to_all - - name: other-admin - token: test-token-change-me2 - vault_access: - type: allow_access_to_all - - name: test - token: other-test-token - vault_access: - type: allow_list - allowed: - - default + - name: admin + token: test-token-change-me + vault_access: + type: allow_access_to_all + - name: other-admin + token: test-token-change-me2 + vault_access: + type: allow_access_to_all + - name: test + token: other-test-token + vault_access: + type: allow_list + allowed: + - default logging: log_directory: logs log_rotation: 7days diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index 75ce6df4..135d93bf 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -79,6 +79,7 @@ impl Database { }, ); } + info!("Database migrations applied"); let database = Self { config: config.clone(), @@ -301,7 +302,7 @@ impl Database { .context("Cannot fetch max update id in vault") } - pub async fn get_latest_document_by_path( + pub async fn get_latest_non_deleted_document_by_path( &self, vault: &VaultId, relative_path: &str, diff --git a/sync-server/src/server/create_document.rs b/sync-server/src/server/create_document.rs index 20f67193..e4b3c055 100644 --- a/sync-server/src/server/create_document.rs +++ b/sync-server/src/server/create_document.rs @@ -52,7 +52,7 @@ pub async fn create_document( if request.force_merge.unwrap_or_default() { let latest_version = state .database - .get_latest_document_by_path( + .get_latest_non_deleted_document_by_path( &vault_id, &sanitized_relative_path, Some(&mut transaction), @@ -65,7 +65,8 @@ pub async fn create_document( ); return merge_with_stored_version( - latest_version.clone(), + &sanitized_relative_path, + &Vec::new(), latest_version, vault_id, user, diff --git a/sync-server/src/server/update_document.rs b/sync-server/src/server/update_document.rs index 2be40cd3..7bee7b60 100644 --- a/sync-server/src/server/update_document.rs +++ b/sync-server/src/server/update_document.rs @@ -172,7 +172,8 @@ async fn update_document( } merge_with_stored_version( - parent_document, + &parent_document.relative_path, + &parent_document.content, latest_version, vault_id, user, @@ -187,7 +188,8 @@ async fn update_document( #[allow(clippy::too_many_arguments)] pub async fn merge_with_stored_version( - parent_document: StoredDocumentVersion, + parent_document_path: &str, + parent_document_content: &[u8], latest_version: StoredDocumentVersion, vault_id: VaultId, user: User, @@ -203,7 +205,7 @@ pub async fn merge_with_stored_version( { info!( "Document content is the same as the latest version for `{}`, skipping update", - parent_document.document_id + latest_version.document_id ); transaction .rollback() @@ -219,17 +221,17 @@ pub async fn merge_with_stored_version( let are_all_participants_mergable = is_file_type_mergable( sanitized_relative_path, &state.config.server.mergeable_file_extensions, - ) && !is_binary(&parent_document.content) + ) && !is_binary(parent_document_content) && !is_binary(&latest_version.content) && !is_binary(&content); let merged_content = if are_all_participants_mergable { info!( "Merging changes for document `{}` in vault `{vault_id}`", - parent_document.document_id + latest_version.document_id ); reconcile( - str::from_utf8(&parent_document.content) + str::from_utf8(parent_document_content) .expect("parent must be valid UTF-8 because it's not binary"), &str::from_utf8(&latest_version.content) .expect("latest_version must be valid UTF-8 because it's not binary") @@ -247,7 +249,7 @@ pub async fn merge_with_stored_version( }; // We can only update the relative path if we're the first one to do so - let new_relative_path = if parent_document.relative_path == latest_version.relative_path + let new_relative_path = if parent_document_path == &latest_version.relative_path && latest_version.relative_path != sanitized_relative_path { let new_path = find_first_available_path( @@ -279,7 +281,7 @@ pub async fn merge_with_stored_version( let is_different_from_request_content = merged_content != content; let new_version = StoredDocumentVersion { - document_id: parent_document.document_id, + document_id: latest_version.document_id, vault_update_id: last_update_id + 1, relative_path: new_relative_path, content: merged_content, diff --git a/sync-server/src/utils/find_first_available_path.rs b/sync-server/src/utils/find_first_available_path.rs index 7629d8f1..937eecae 100644 --- a/sync-server/src/utils/find_first_available_path.rs +++ b/sync-server/src/utils/find_first_available_path.rs @@ -9,17 +9,19 @@ pub async fn find_first_available_path( database: &crate::app_state::database::Database, transaction: &mut Transaction<'_>, ) -> Result { - info!("Finding first available path for `{sanitized_relative_path}` in vault `{vault_id}`"); for candidate in dedup_paths(sanitized_relative_path) { - debug!("Checking candidate path for deconflicting names: `{candidate}`"); if database - .get_latest_document_by_path(vault_id, &candidate, Some(transaction)) + .get_latest_non_deleted_document_by_path(vault_id, &candidate, Some(transaction)) .await? .is_none() { info!("Selected available path: `{candidate}`"); return Ok(candidate); } + + info!( + "Finding first available path for `{sanitized_relative_path}` in vault `{vault_id}` as `{candidate}` is already taken" + ); } unreachable!("dedup_paths produces infinite paths");