From d23c1a8dbc5d96c2afd195d2d476095a08f45644 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Mon, 10 Mar 2025 22:49:51 +0000 Subject: [PATCH] omg it mostly works for deletes --- .../file-operations/file-operations.test.ts | 10 +- .../src/file-operations/file-operations.ts | 11 +- .../sync-client/src/persistence/database.ts | 157 +++++++++++++----- .../sync-client/src/sync-operations/syncer.ts | 103 +++++++----- .../sync-operations/unrestricted-syncer.ts | 99 ++++++----- frontend/test-client/run.sh | 2 +- 6 files changed, 243 insertions(+), 139 deletions(-) diff --git a/frontend/sync-client/src/file-operations/file-operations.test.ts b/frontend/sync-client/src/file-operations/file-operations.test.ts index b1299098..26ae3267 100644 --- a/frontend/sync-client/src/file-operations/file-operations.test.ts +++ b/frontend/sync-client/src/file-operations/file-operations.test.ts @@ -1,7 +1,6 @@ import type { FileSystemOperations } from "sync-client"; import type { Database, - DocumentMetadata, DocumentRecord, RelativePath } from "../persistence/database"; @@ -11,14 +10,7 @@ import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly"; describe("File operations", () => { class MockDatabase { - public move( - _oldRelativePath: RelativePath, - _newRelativePath: RelativePath - ): void { - // this is called but irrelevant for this mock - } - - public getDocumentByRelativePath( + public getLatestDocumentByRelativePath( _find: RelativePath ): DocumentRecord | undefined { return undefined; diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index 34ed19d9..ef9dda55 100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -71,15 +71,12 @@ export class FileOperations { `Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'` ); - const document = this.database.getDocumentByRelativePath(path); + const document = + this.database.getLatestDocumentByRelativePath(path); this.logger.debug( `Existing metadata for ${path}: ${JSON.stringify(document?.metadata)}` ); - this.logger.debug( - `We need to save what's at ${path} to ${deconflictedPath}` - ); - if ( document?.metadata !== undefined && document.metadata.documentId === documentId @@ -94,7 +91,6 @@ export class FileOperations { `We need to save what's at ${path} to ${deconflictedPath}` ); await this.move(path, deconflictedPath, documentId); - // this.database.move(path, deconflictedPath); } else { await this.createParentDirectories(path); } @@ -178,7 +174,8 @@ export class FileOperations { `Conflict when moving '${oldPath}' to '${newPath}', the latter already exists, deconflicting by moving it to '${deconflictedPath}'` ); - const document = this.database.getDocumentByRelativePath(newPath); + const document = + this.database.getLatestDocumentByRelativePath(newPath); if ( document?.metadata !== undefined && diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index c529889c..0bbbd5b1 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -10,6 +10,7 @@ export interface DocumentMetadata { hash: string; isDeleted: boolean; } + export interface StoredDocumentMetadata { relativePath: RelativePath; parentVersionId: VaultUpdateId; @@ -25,6 +26,7 @@ export interface StoredDatabase { export interface DocumentRecord { identity: symbol; + parallelVersion: number; relativePath: RelativePath; metadata: DocumentMetadata | undefined; updates: Promise[]; @@ -46,7 +48,8 @@ export class Database { relativePath, identity: Symbol(), metadata, - updates: [] + updates: [], + parallelVersion: 0 })) ?? []; this.ensureConsistency(); @@ -63,7 +66,38 @@ export class Database { } public get resolvedDocuments(): DocumentRecord[] { - return this.documents.filter(({ metadata }) => metadata !== undefined); + const paths = new Map(); + this.documents + .filter( + ({ metadata }) => metadata !== undefined && !metadata.isDeleted + ) + .forEach((record) => + paths.set(record.relativePath, [ + record, + ...(paths.get(record.relativePath) ?? []) + ]) + ); + + return Array.from(paths.values()).map((records) => { + records.sort( + (a, b) => b.parallelVersion - a.parallelVersion // descending + ); + + if ( + records.length > 1 && + records.some((current, i) => + i === 0 + ? false + : records[i - 1].parallelVersion === + current.parallelVersion + ) + ) { + throw new Error( + `Multiple documents with the same parallel version and path at ${records[0].relativePath}` + ); + } + return records[0]; + }); } public getLastSeenUpdateId(): VaultUpdateId | undefined { @@ -81,25 +115,53 @@ export class Database { this.save(); } - public setDocument({ - documentId, - relativePath, - parentVersionId, - hash, - isDeleted - }: { - documentId: DocumentId; - relativePath: RelativePath; - parentVersionId: VaultUpdateId; - hash: string; - isDeleted: boolean; - }): void { - const entry = this.getDocumentByRelativePath(relativePath); + public setDocument( + { + documentId, + relativePath, + parentVersionId, + hash, + isDeleted + }: { + documentId: DocumentId; + relativePath: RelativePath; + parentVersionId: VaultUpdateId; + hash: string; + isDeleted: boolean; + }, + identity?: symbol + ): void { + let entry: DocumentRecord | undefined; + if (identity !== undefined) { + entry = this.getDocumentByIdentity(identity); - if (entry !== undefined) { - this.documents = this.documents.filter( - ({ identity }) => identity !== entry.identity - ); + if (entry !== undefined) { + this.documents = this.documents.filter( + ({ identity }) => identity !== entry!.identity + ); + } + } else { + entry = this.getLatestDocumentByRelativePath(relativePath); + if ( + entry?.metadata?.documentId !== undefined && + entry.metadata.documentId !== documentId + ) { + this.documents.push({ + // `entry` might be undefined if the document is new + identity: Symbol(), + relativePath, + metadata: { + documentId, + parentVersionId, + hash, + isDeleted + }, + updates: [], + parallelVersion: entry?.parallelVersion + 1 + }); + } + this.save(); + return; } this.documents.push({ @@ -112,7 +174,8 @@ export class Database { hash, isDeleted }, - updates: entry?.updates ?? [] + updates: entry?.updates ?? [], + parallelVersion: entry?.parallelVersion ?? 0 }); this.save(); @@ -124,24 +187,29 @@ export class Database { // No need to save as Promises don't get serialized } - public getDocumentByRelativePath( + public getLatestDocumentByRelativePath( find: RelativePath ): DocumentRecord | undefined { - return this.documents.find(({ relativePath }) => relativePath === find); + const candidates = this.documents.filter( + ({ relativePath }) => relativePath === find + ); + candidates.sort((a, b) => b.parallelVersion - a.parallelVersion); // descending + return candidates[0]; } public async getResolvedDocumentByRelativePath( relativePath: RelativePath, promise: Promise - ): Promise { - let entry = this.getDocumentByRelativePath(relativePath); + ): Promise { + let entry = this.getLatestDocumentByRelativePath(relativePath); if (entry === undefined) { entry = { relativePath, identity: Symbol(), metadata: undefined, - updates: [] + updates: [], + parallelVersion: 0 }; this.documents.push(entry); @@ -150,9 +218,6 @@ export class Database { const currentPromises = entry.updates; entry.updates = [...currentPromises, promise]; await Promise.all(currentPromises); - - // Refetch the document as it might have been updated - return this.getDocumentByIdentity(entry.identity); } public getDocumentByUpdatePromise(promise: Promise): DocumentRecord { @@ -189,38 +254,40 @@ export class Database { oldRelativePath: RelativePath, newRelativePath: RelativePath ): void { - const oldDocument = this.getDocumentByRelativePath(oldRelativePath); + const oldDocument = + this.getLatestDocumentByRelativePath(oldRelativePath); if (oldDocument === undefined) { + return; throw new Error( `Document to be moved not found: ${oldRelativePath}` ); } - const newDocument = this.getDocumentByRelativePath(newRelativePath); - if ( - newDocument !== undefined && - newDocument.metadata?.isDeleted === false - ) { - throw new Error( - `Cannot move document to existing path: ${newRelativePath}` - ); - } - this.documents = this.documents.filter( - ({ identity }) => - identity !== oldDocument.identity && - identity !== newDocument?.identity + ({ identity }) => identity !== oldDocument.identity ); + let newDocument = this.getLatestDocumentByRelativePath(newRelativePath); + + // It's either an invalid state of newDocument is pending deletion and we have to wait for it to complete this.documents.push({ - ...oldDocument, - relativePath: newRelativePath + identity: oldDocument.identity, + metadata: oldDocument.metadata, + relativePath: newRelativePath, + updates: oldDocument.updates, + // We're in a strange state where the target of the move has just got deleted, + // however, its metadata might already have a bunch of updates queued up for + // the document at the new location. We need to keep these updates. + parallelVersion: + newDocument !== undefined ? newDocument.parallelVersion + 1 : 0 }); this.save(); } private save(): void { + this.logger.debug(JSON.stringify(this.documents, null, 2)); + this.ensureConsistency(); void this.saveData({ documents: this.resolvedDocuments.map( diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 2b86fdbc..e61e7c45 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -1,9 +1,4 @@ -import type { - Database, - DocumentMetadata, - RelativePath -} from "../persistence/database"; - +import type { Database, RelativePath } from "../persistence/database"; import type { SyncService } from "src/services/sync-service"; import type { Logger } from "src/tracing/logger"; import type { SyncHistory } from "src/tracing/sync-history"; @@ -24,10 +19,8 @@ export class Syncer { private readonly syncQueue: PQueue; - private runningScheduleSyncForOfflineChanges: Promise | undefined = - undefined; - private runningApplyRemoteChangesLocally: Promise | undefined = - undefined; + private runningScheduleSyncForOfflineChanges: Promise | undefined; + private runningApplyRemoteChangesLocally: Promise | undefined; private readonly internalSyncer: UnrestrictedSyncer; @@ -92,10 +85,17 @@ export class Syncer { relativePath: RelativePath, updateTime?: Date ): Promise { + if (!this.settings.getSettings().isSyncEnabled) { + this.logger.info( + `Syncing is disabled, not syncing '${relativePath}'` + ); + return; + } + const [promise, resolve, reject] = createPromise(); // Most likely, we're waiting for the previous delete to finish on the file at this path - const document = await this.database.getResolvedDocumentByRelativePath( + await this.database.getResolvedDocumentByRelativePath( relativePath, promise ); @@ -103,8 +103,7 @@ export class Syncer { try { await this.syncQueue.add(async () => this.internalSyncer.unrestrictedSyncLocallyCreatedFile( - () => - this.database.getDocumentByIdentity(document.identity), + () => this.database.getDocumentByUpdatePromise(promise), updateTime ) ); @@ -120,18 +119,29 @@ export class Syncer { public async syncLocallyDeletedFile( relativePath: RelativePath ): Promise { + if (!this.settings.getSettings().isSyncEnabled) { + this.logger.info( + `Syncing is disabled, not syncing '${relativePath}'` + ); + return; + } + const [promise, resolve, reject] = createPromise(); - const document = await this.database.getResolvedDocumentByRelativePath( + await this.database.getResolvedDocumentByRelativePath( relativePath, promise ); try { await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() => - this.database.getDocumentByIdentity(document.identity) - ) + this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() => { + this.logger.debug( + `aaaahg ${relativePath} has been deleted locally, syncing to delete it` + ); + + return this.database.getDocumentByUpdatePromise(promise); + }) ); resolve(); @@ -142,34 +152,46 @@ export class Syncer { } } - public async syncLocallyUpdatedFile(args: { + public async syncLocallyUpdatedFile({ + oldPath, + relativePath, + updateTime + }: { oldPath?: RelativePath; relativePath: RelativePath; updateTime?: Date; }): Promise { - if (args.oldPath !== undefined) { - if (args.oldPath === args.relativePath) { - throw new Error( - `Old path and new path are the same: ${args.oldPath}` - ); - } - - this.database.move(args.oldPath, args.relativePath); + if (!this.settings.getSettings().isSyncEnabled) { + this.logger.info( + `Syncing is disabled, not syncing '${relativePath}'` + ); + return; } const [promise, resolve, reject] = createPromise(); - const metadata = await this.database.getResolvedDocumentByRelativePath( - args.relativePath, + if (oldPath !== undefined) { + if (oldPath === relativePath) { + throw new Error( + `Old path and new path are the same: ${oldPath}` + ); + } + + this.database.move(oldPath, relativePath); + } + + await this.database.getResolvedDocumentByRelativePath( + relativePath, promise ); try { await this.syncQueue.add(async () => this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ - ...args, + oldPath, + updateTime, getLatestDocument: () => - this.database.getDocumentByIdentity(metadata.identity) + this.database.getDocumentByUpdatePromise(promise) }) ); @@ -189,7 +211,7 @@ export class Syncer { return; } - if (this.runningScheduleSyncForOfflineChanges != null) { + if (this.runningScheduleSyncForOfflineChanges !== undefined) { this.logger.debug("Uploading local changes is already in progress"); return this.runningScheduleSyncForOfflineChanges; } @@ -244,9 +266,7 @@ export class Syncer { public async reset(): Promise { this.syncQueue.clear(); await this.syncQueue.onEmpty(); - this.remainingOperationsListeners.forEach((listener) => { - listener(0); - }); + this.remainingOperationsListeners.forEach((listener) => listener(0)); this.internalSyncer.reset(); } @@ -257,6 +277,15 @@ export class Syncer { remoteVersion.documentId ); + if (document === undefined) { + const candidate = this.database.getLatestDocumentByRelativePath( + remoteVersion.relativePath + ); + if (candidate !== undefined && candidate.metadata === undefined) { + document = candidate; + } + } + if (document === undefined) { await this.syncQueue.add(async () => this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( @@ -269,7 +298,7 @@ export class Syncer { const [promise, resolve, reject] = createPromise(); - document = await this.database.getResolvedDocumentByRelativePath( + await this.database.getResolvedDocumentByRelativePath( document.relativePath, promise ); @@ -278,7 +307,7 @@ export class Syncer { await this.syncQueue.add(async () => this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( remoteVersion, - () => this.database.getDocumentByIdentity(document.identity) + () => this.database.getDocumentByUpdatePromise(promise) ) ); @@ -300,7 +329,7 @@ export class Syncer { const updates = Promise.all( allLocalFiles.map(async (relativePath) => { if ( - this.database.getDocumentByRelativePath(relativePath) + this.database.getLatestDocumentByRelativePath(relativePath) ?.metadata !== undefined ) { this.logger.debug( diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index b1951f89..109cc673 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -1,6 +1,5 @@ import type { Database, - DocumentMetadata, DocumentRecord, RelativePath } from "../persistence/database"; @@ -61,7 +60,7 @@ export class UnrestrictedSyncer { createdDate: updateTime }); - const { relativePath: currentRelativePath } = + const { relativePath: currentRelativePath, identity } = getLatestDocument(); this.history.addHistoryEntry({ @@ -80,7 +79,7 @@ export class UnrestrictedSyncer { isDeleted: false }; - this.database.setDocument(newMetadata); + this.database.setDocument(newMetadata, identity); this.tryIncrementVaultUpdateId(response.vaultUpdateId); } @@ -101,7 +100,7 @@ export class UnrestrictedSyncer { document.metadata.isDeleted ) { this.logger.debug( - `Document ${document.relativePath} has been already deleted, no need to delete it again` + `Document '${document.relativePath}' has been already deleted, no need to delete it again` ); return; } @@ -124,13 +123,16 @@ export class UnrestrictedSyncer { // We have to have a record of the delete in case there's an in-flight update for the same // document which finishes after the delete has succeeded and would introduce a phantom metadata record. - this.database.setDocument({ - relativePath: document.relativePath, - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - hash: EMPTY_HASH, - isDeleted: true - }); + this.database.setDocument( + { + relativePath: document.relativePath, + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: EMPTY_HASH, + isDeleted: true + }, + document.identity + ); } ); } @@ -222,13 +224,16 @@ export class UnrestrictedSyncer { type: SyncType.DELETE }); - this.database.setDocument({ - documentId: response.documentId, - relativePath: document.relativePath, - parentVersionId: response.vaultUpdateId, - hash: EMPTY_HASH, - isDeleted: true - }); + this.database.setDocument( + { + documentId: response.documentId, + relativePath: document.relativePath, + parentVersionId: response.vaultUpdateId, + hash: EMPTY_HASH, + isDeleted: true + }, + document.identity + ); this.tryIncrementVaultUpdateId(response.vaultUpdateId); @@ -262,16 +267,19 @@ export class UnrestrictedSyncer { }); } - this.database.setDocument({ - documentId: response.documentId, - relativePath: - response.relativePath != document.relativePath - ? response.relativePath - : document.relativePath, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - isDeleted: response.isDeleted - }); + this.database.setDocument( + { + documentId: response.documentId, + relativePath: + response.relativePath != document.relativePath + ? response.relativePath + : document.relativePath, + parentVersionId: response.vaultUpdateId, + hash: contentHash, + isDeleted: response.isDeleted + }, + document.identity + ); this.tryIncrementVaultUpdateId(response.vaultUpdateId); } @@ -293,10 +301,7 @@ export class UnrestrictedSyncer { remoteVersion.documentId ); - if ( - localMetadata?.metadata !== undefined && - !localMetadata.metadata.isDeleted - ) { + if (localMetadata?.metadata !== undefined) { // If the file exists locally, let's pretend the user has updated it // and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile` if ( @@ -315,6 +320,11 @@ export class UnrestrictedSyncer { localMetadata.identity ) }); + } else if (remoteVersion.isDeleted) { + this.logger.debug( + `Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync` + ); + return; } const content = ( @@ -330,13 +340,22 @@ export class UnrestrictedSyncer { remoteVersion.documentId ); - this.database.setDocument({ - documentId: remoteVersion.documentId, - relativePath: remoteVersion.relativePath, - parentVersionId: remoteVersion.vaultUpdateId, - hash: hash(contentBytes), - isDeleted: remoteVersion.isDeleted - }); + this.database.setDocument( + { + documentId: remoteVersion.documentId, + relativePath: remoteVersion.relativePath, + parentVersionId: remoteVersion.vaultUpdateId, + hash: hash(contentBytes), + isDeleted: remoteVersion.isDeleted + }, + getLatestDocument?.()?.identity ?? + this.database.getDocumentByDocumentId( + remoteVersion.documentId + )?.identity ?? + this.database.getLatestDocumentByRelativePath( + remoteVersion.relativePath + )?.identity + ); this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, @@ -359,7 +378,7 @@ export class UnrestrictedSyncer { if (!this.settings.getSettings().isSyncEnabled) { this.logger.info( - `Syncing is disabled, not syncing ${relativePath}` + `Syncing is disabled, not syncing '${relativePath}'` ); return; } diff --git a/frontend/test-client/run.sh b/frontend/test-client/run.sh index 6effc147..5bf19a63 100755 --- a/frontend/test-client/run.sh +++ b/frontend/test-client/run.sh @@ -16,7 +16,7 @@ npm run build pids=() for i in $(seq 1 $process_count); do - node dist/cli.js 2>&1 | tee "log_${i}.log" & + node dist/cli.js 2>&1 > "log_${i}.log" & pids+=($!) done