diff --git a/frontend/sync-client/src/file-operations/file-operations.test.ts b/frontend/sync-client/src/file-operations/file-operations.test.ts index 9d2945d5..b1299098 100644 --- a/frontend/sync-client/src/file-operations/file-operations.test.ts +++ b/frontend/sync-client/src/file-operations/file-operations.test.ts @@ -2,6 +2,7 @@ import type { FileSystemOperations } from "sync-client"; import type { Database, DocumentMetadata, + DocumentRecord, RelativePath } from "../persistence/database"; import { FileOperations } from "./file-operations"; @@ -10,16 +11,16 @@ import { assertSetContainsExactly } from "../utils/assert-set-contains-exactly"; describe("File operations", () => { class MockDatabase { - public async move( + public move( _oldRelativePath: RelativePath, _newRelativePath: RelativePath - ): Promise { + ): void { // this is called but irrelevant for this mock } - public getResolvedDocument( - _relativePath: RelativePath | undefined - ): DocumentMetadata | undefined { + public getDocumentByRelativePath( + _find: RelativePath + ): DocumentRecord | undefined { return undefined; } } diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index 73818786..34ed19d9 100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -71,27 +71,30 @@ export class FileOperations { `Didn't expect ${path} to exist, deconflicting by moving it to '${deconflictedPath}'` ); - const existingMetadata = this.database.getResolvedDocument(path); + const document = this.database.getDocumentByRelativePath(path); this.logger.debug( - `Existing metadata for ${path}: ${JSON.stringify(existingMetadata)}` + `Existing metadata for ${path}: ${JSON.stringify(document?.metadata)}` ); + + this.logger.debug( + `We need to save what's at ${path} to ${deconflictedPath}` + ); + if ( - existingMetadata === undefined || - existingMetadata.isDeleted || - existingMetadata.documentId !== documentId || - !documentId + document?.metadata !== undefined && + document.metadata.documentId === documentId ) { - this.logger.debug( - `We need to save what's at ${path} to ${deconflictedPath}` - ); - await this.move(path, deconflictedPath, documentId); - await this.database.move(path, deconflictedPath); - } else { // This can happen if the document got moved both locally and remotely // to the same file path. In this case, we shouldn't deconflict, however, // we also can't overwrite otherwise we'd lose changes. throw new FileNotFoundError(path); } + + this.logger.debug( + `We need to save what's at ${path} to ${deconflictedPath}` + ); + await this.move(path, deconflictedPath, documentId); + // this.database.move(path, deconflictedPath); } else { await this.createParentDirectories(path); } @@ -135,7 +138,7 @@ export class FileOperations { currentText = currentText.replace(/\r\n/g, "\n"); if (currentText !== expectedText) { this.logger.debug( - `Performing a 3-way merge for ${path} with the expected content` + `Performing a 3-way merge for ${path} with the expected content:\n${expectedText}` ); return mergeText(expectedText, currentText, newText); @@ -174,21 +177,21 @@ export class FileOperations { this.logger.debug( `Conflict when moving '${oldPath}' to '${newPath}', the latter already exists, deconflicting by moving it to '${deconflictedPath}'` ); - const existingMetadata = this.database.getResolvedDocument(newPath); + + const document = this.database.getDocumentByRelativePath(newPath); + if ( - existingMetadata === undefined || - existingMetadata.isDeleted || - existingMetadata.documentId !== documentId || - !documentId + document?.metadata !== undefined && + document.metadata.documentId === documentId ) { - await this.move(newPath, deconflictedPath, documentId); - await this.database.move(oldPath, newPath); - } else { // This can happen if the document got moved both locally and remotely // to the same file path. In this case, we shouldn't deconflict, however, // we also can't overwrite otherwise we'd lose changes. throw new FileNotFoundError(newPath); } + + await this.move(newPath, deconflictedPath, documentId); + // this.database.move(oldPath, newPath); } else { await this.createParentDirectories(newPath); } diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 705f3aea..c529889c 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -1,3 +1,5 @@ +import type { Logger } from "../tracing/logger"; + export type VaultUpdateId = number; export type DocumentId = string; export type RelativePath = string; @@ -8,20 +10,28 @@ export interface DocumentMetadata { hash: string; isDeleted: boolean; } - -import type { Logger } from "src/tracing/logger"; +export interface StoredDocumentMetadata { + relativePath: RelativePath; + parentVersionId: VaultUpdateId; + documentId: DocumentId; + hash: string; + isDeleted: boolean; +} export interface StoredDatabase { - documents: Record; + documents: StoredDocumentMetadata[]; lastSeenUpdateId: VaultUpdateId | undefined; } -export class Database { - private documents = new Map< - RelativePath, - DocumentMetadata | Promise - >(); +export interface DocumentRecord { + identity: symbol; + relativePath: RelativePath; + metadata: DocumentMetadata | undefined; + updates: Promise[]; +} +export class Database { + private documents: DocumentRecord[]; private lastSeenUpdateId: VaultUpdateId | undefined; public constructor( @@ -30,16 +40,17 @@ export class Database { private readonly saveData: (data: StoredDatabase) => Promise ) { initialState ??= {}; - if (initialState.documents) { - for (const [relativePath, metadata] of Object.entries( - initialState.documents - )) { - this.documents.set(relativePath, metadata); - } - } - this.ensureConsistency(); - this.logger.debug(`Loaded ${this.documents.size} documents`); + this.documents = + initialState.documents?.map(({ relativePath, ...metadata }) => ({ + relativePath, + identity: Symbol(), + metadata, + updates: [] + })) ?? []; + + this.ensureConsistency(); + this.logger.debug(`Loaded ${this.documents.length} documents`); this.lastSeenUpdateId = initialState.lastSeenUpdateId; this.logger.debug( @@ -48,62 +59,29 @@ export class Database { } public get length(): number { - return this.documents.size; + return this.documents.length; } - public get resolvedDocuments(): [RelativePath, DocumentMetadata][] { - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - return Array.from(this.documents.entries()).filter( - ([_, metadata]) => !(metadata instanceof Promise) - ) as [RelativePath, DocumentMetadata][]; + public get resolvedDocuments(): DocumentRecord[] { + return this.documents.filter(({ metadata }) => metadata !== undefined); } public getLastSeenUpdateId(): VaultUpdateId | undefined { return this.lastSeenUpdateId; } - public async setLastSeenUpdateId( - value: VaultUpdateId | undefined - ): Promise { + public setLastSeenUpdateId(value: VaultUpdateId | undefined): void { this.lastSeenUpdateId = value; - await this.save(); + this.save(); } - public async resetSyncState(): Promise { - this.documents = new Map(); + public resetSyncState(): void { + this.documents = []; this.lastSeenUpdateId = 0; - await this.save(); + this.save(); } - public getDocumentByDocumentId( - documentId: DocumentId - ): [RelativePath, DocumentMetadata] | undefined { - return this.resolvedDocuments.find( - ([_, metadata]) => metadata.documentId === documentId - ); - } - - public getDocumentByIdentity( - document: - | DocumentMetadata - | Promise - | undefined - ): - | [ - RelativePath, - DocumentMetadata | Promise - ] - | undefined { - if (document === undefined) { - return undefined; - } - - return Array.from(this.documents.entries()).find( - ([_, metadata]) => metadata === document - ); - } - - public async setDocument({ + public setDocument({ documentId, relativePath, parentVersionId, @@ -115,84 +93,142 @@ export class Database { parentVersionId: VaultUpdateId; hash: string; isDeleted: boolean; - }): Promise { - this.documents.set(relativePath, { - documentId, - parentVersionId, - hash, - isDeleted - }); - await this.save(); - } + }): void { + const entry = this.getDocumentByRelativePath(relativePath); - public async setDocumentPromise({ - relativePath, - promise - }: { - relativePath: RelativePath; - promise: Promise; - }): Promise { - this.documents.set(relativePath, promise); - // No need to save as Promises don't get serialized - // and a crash would only result in the document being - // creatied again. - } - - public getResolvedDocument( - relativePath: RelativePath | undefined - ): DocumentMetadata | undefined { - if (relativePath == undefined) { - return undefined; - } - - const metadata = this.documents.get(relativePath); - if (metadata instanceof Promise) { - return undefined; - } - - return metadata; - } - - public getDocument( - relativePath: RelativePath | undefined - ): Promise | DocumentMetadata | undefined { - if (relativePath == undefined) { - return undefined; - } - - return this.documents.get(relativePath); - } - - public async move( - oldRelativePath: RelativePath, - newRelativePath: RelativePath - ): Promise { - const document = this.documents.get(oldRelativePath); - if (!document) { - return; - } - - const resolvedDocument = this.getResolvedDocument(oldRelativePath); - if ( - this.documents.has(newRelativePath) && - resolvedDocument != undefined && - resolvedDocument.isDeleted - ) { - throw new Error( - `Cannot update physical path to path that is already in use: ${newRelativePath}` + if (entry !== undefined) { + this.documents = this.documents.filter( + ({ identity }) => identity !== entry.identity ); } - this.documents.delete(oldRelativePath); - this.documents.set(newRelativePath, document); + this.documents.push({ + // `entry` might be undefined if the document is new + identity: entry?.identity ?? Symbol(), + relativePath, + metadata: { + documentId, + parentVersionId, + hash, + isDeleted + }, + updates: entry?.updates ?? [] + }); - await this.save(); + this.save(); } - private async save(): Promise { + public removeDocumentPromise(promise: Promise): void { + const entry = this.getDocumentByUpdatePromise(promise); + entry.updates = entry.updates.filter((update) => update !== promise); + // No need to save as Promises don't get serialized + } + + public getDocumentByRelativePath( + find: RelativePath + ): DocumentRecord | undefined { + return this.documents.find(({ relativePath }) => relativePath === find); + } + + public async getResolvedDocumentByRelativePath( + relativePath: RelativePath, + promise: Promise + ): Promise { + let entry = this.getDocumentByRelativePath(relativePath); + + if (entry === undefined) { + entry = { + relativePath, + identity: Symbol(), + metadata: undefined, + updates: [] + }; + + this.documents.push(entry); + } + + const currentPromises = entry.updates; + entry.updates = [...currentPromises, promise]; + await Promise.all(currentPromises); + + // Refetch the document as it might have been updated + return this.getDocumentByIdentity(entry.identity); + } + + public getDocumentByUpdatePromise(promise: Promise): DocumentRecord { + const result = this.documents.find(({ updates }) => + updates.includes(promise) + ); + + if (result === undefined) { + throw new Error("Document not found by update promise"); + } + + return result; + } + + public getDocumentByDocumentId( + documentId: DocumentId + ): DocumentRecord | undefined { + return this.documents.find( + ({ metadata }) => metadata?.documentId === documentId + ); + } + + public getDocumentByIdentity(find: symbol): DocumentRecord { + const result = this.documents.find(({ identity }) => identity === find); + + if (result === undefined) { + throw new Error("Document not found by identity symbol"); + } + + return result; + } + + public move( + oldRelativePath: RelativePath, + newRelativePath: RelativePath + ): void { + const oldDocument = this.getDocumentByRelativePath(oldRelativePath); + if (oldDocument === undefined) { + throw new Error( + `Document to be moved not found: ${oldRelativePath}` + ); + } + + const newDocument = this.getDocumentByRelativePath(newRelativePath); + if ( + newDocument !== undefined && + newDocument.metadata?.isDeleted === false + ) { + throw new Error( + `Cannot move document to existing path: ${newRelativePath}` + ); + } + + this.documents = this.documents.filter( + ({ identity }) => + identity !== oldDocument.identity && + identity !== newDocument?.identity + ); + + this.documents.push({ + ...oldDocument, + relativePath: newRelativePath + }); + + this.save(); + } + + private save(): void { this.ensureConsistency(); - await this.saveData({ - documents: Object.fromEntries(this.resolvedDocuments), + void this.saveData({ + documents: this.resolvedDocuments.map( + ({ relativePath, metadata }) => ({ + relativePath, + ...metadata + }) + ) as StoredDocumentMetadata[], lastSeenUpdateId: this.lastSeenUpdateId }); } @@ -200,12 +236,16 @@ export class Database { private ensureConsistency(): void { const idToPath = new Map(); - this.resolvedDocuments.forEach(([name, metadata]) => { - idToPath.set(metadata.documentId, [ - ...(idToPath.get(metadata.documentId) ?? []), - name - ]); - }); + this.resolvedDocuments + .filter(({ metadata }) => metadata !== undefined) + .forEach(({ metadata, relativePath }) => { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + idToPath.set(metadata!.documentId, [ + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ...(idToPath.get(metadata!.documentId) ?? []), + relativePath + ]); + }); const duplicates = Array.from(idToPath.entries()) .filter(([_, paths]) => paths.length > 1) diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index 6f5b52e7..ff7ec8fd 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -109,6 +109,9 @@ export class SyncService { contentBytes: Uint8Array; createdDate: Date; }): Promise { + this.logger.debug( + `Updating document ${documentId} with parent version ${parentVersionId} & ${new TextDecoder().decode(contentBytes)} & ${relativePath}` + ); const formData = new FormData(); formData.append("parent_version_id", parentVersionId.toString()); formData.append("created_date", createdDate.toISOString()); diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 54ba6b45..b4d6118a 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -148,7 +148,7 @@ export class SyncClient { this.stop(); await this._syncer.reset(); this._history.reset(); - await this._database.resetSyncState(); + this._database.resetSyncState(); this.logger.reset(); } diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 193848c4..2b86fdbc 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -12,9 +12,10 @@ import { hash } from "src/utils/hash"; import type { components } from "src/services/types"; import type { Settings } from "src/persistence/settings"; import type { FileOperations } from "src/file-operations/file-operations"; -import { findMatchingFileBasedOnHash } from "src/utils/find-matching-file-based-on-hash"; +import { findMatchingFile } from "src/utils/find-matching-file"; import { UnrestrictedSyncer } from "./unrestricted-syncer"; import { FileNotFoundError } from "src/file-operations/safe-filesystem-operations"; +import { createPromise } from "src/utils/create-promise"; export class Syncer { private readonly remainingOperationsListeners: (( @@ -74,9 +75,10 @@ export class Syncer { logger.debug( `File has been deleted or moved before we had a chance to inspect it, skipping` ); - } else { - throw e; + return undefined; } + + throw e; } } @@ -88,77 +90,95 @@ export class Syncer { public async syncLocallyCreatedFile( relativePath: RelativePath, - updateTime: Date + updateTime?: Date ): Promise { - let resolve: - | undefined - | ((metadata: DocumentMetadata | undefined) => void) = undefined; + const [promise, resolve, reject] = createPromise(); - const creationPromise = new Promise( - (r) => (resolve = r) + // Most likely, we're waiting for the previous delete to finish on the file at this path + const document = await this.database.getResolvedDocumentByRelativePath( + relativePath, + promise ); - await this.database.setDocumentPromise({ - relativePath, - promise: creationPromise - }); - - await this.syncQueue.add(async () => { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - resolve!( - await this.internalSyncer.unrestrictedSyncLocallyCreatedFile( - relativePath, + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncLocallyCreatedFile( + () => + this.database.getDocumentByIdentity(document.identity), updateTime ) ); - }); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } } public async syncLocallyDeletedFile( relativePath: RelativePath ): Promise { - let metadata = this.database.getDocument(relativePath); - if (metadata !== undefined && !(metadata instanceof Promise)) { - metadata = Promise.resolve(metadata); - } + const [promise, resolve, reject] = createPromise(); - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyDeletedFile( - relativePath, - metadata - ) + const document = await this.database.getResolvedDocumentByRelativePath( + relativePath, + promise ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() => + this.database.getDocumentByIdentity(document.identity) + ) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } } public async syncLocallyUpdatedFile(args: { oldPath?: RelativePath; relativePath: RelativePath; - updateTime: Date; + updateTime?: Date; }): Promise { - if (args.oldPath === args.relativePath) { - throw new Error( - `Old path and new path are the same: ${args.oldPath}` - ); - } - if (args.oldPath !== undefined) { - await this.database.move(args.oldPath, args.relativePath); + if (args.oldPath === args.relativePath) { + throw new Error( + `Old path and new path are the same: ${args.oldPath}` + ); + } + + this.database.move(args.oldPath, args.relativePath); } - let metadata = this.database.getDocument(args.relativePath); - if (metadata !== undefined && !(metadata instanceof Promise)) { - metadata = Promise.resolve(metadata); - } - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ - ...args, - metadata - }) + const [promise, resolve, reject] = createPromise(); + + const metadata = await this.database.getResolvedDocumentByRelativePath( + args.relativePath, + promise ); - } - public async waitForSyncQueue(): Promise { - return this.syncQueue.onEmpty(); + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ + ...args, + getLatestDocument: () => + this.database.getDocumentByIdentity(metadata.identity) + }) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } } public async scheduleSyncForOfflineChanges(): Promise { @@ -217,6 +237,10 @@ export class Syncer { } } + public async waitForSyncQueue(): Promise { + return this.syncQueue.onEmpty(); + } + public async reset(): Promise { this.syncQueue.clear(); await this.syncQueue.onEmpty(); @@ -229,53 +253,67 @@ export class Syncer { private async syncRemotelyUpdatedFile( remoteVersion: components["schemas"]["DocumentVersionWithoutContent"] ): Promise { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion - ) + let document = this.database.getDocumentByDocumentId( + remoteVersion.documentId ); + + if (document === undefined) { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion + ) + ); + + return; + } + + const [promise, resolve, reject] = createPromise(); + + document = await this.database.getResolvedDocumentByRelativePath( + document.relativePath, + promise + ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion, + () => this.database.getDocumentByIdentity(document.identity) + ) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } } private async internalScheduleSyncForOfflineChanges(): Promise { const allLocalFiles = await this.operations.listAllFiles(); - // This includes renamed files for now let locallyPossiblyDeletedFiles = [ ...this.database.resolvedDocuments - ].filter(([path, _]) => !allLocalFiles.includes(path)); + ].filter(({ relativePath }) => !allLocalFiles.includes(relativePath)); const updates = Promise.all( - allLocalFiles.map(async (relativePath) => - this.syncQueue.add(async () => { - const metadata = - this.database.getResolvedDocument(relativePath); + allLocalFiles.map(async (relativePath) => { + if ( + this.database.getDocumentByRelativePath(relativePath) + ?.metadata !== undefined + ) { + this.logger.debug( + `Document ${relativePath} might have been updated locally, scheduling sync to validate and update it` + ); - if (metadata) { - this.logger.debug( - `Document ${relativePath} might have been updated locally, scheduling sync to validate and update it` - ); - const updateTime = - await Syncer.forgivingFileNotFoundWrapper( - async () => - this.operations.getModificationTime( - relativePath - ), - this.logger - ); - if (updateTime === undefined) { - return; - } + return this.syncLocallyUpdatedFile({ + relativePath + }); + } - return this.internalSyncer.unrestrictedSyncLocallyUpdatedFile( - { - relativePath, - updateTime, - metadata: Promise.resolve(metadata) - } - ); - } - - // Perhaps the file has been moved. Let's check by looking at the deleted files + // Perhaps the file has been moved; let's check by looking at the deleted files + const contentHash = await this.syncQueue.add(async () => { const contentBytes = await Syncer.forgivingFileNotFoundWrapper( async () => this.operations.read(relativePath), @@ -284,90 +322,51 @@ export class Syncer { if (contentBytes === undefined) { return; } + return hash(contentBytes); + }); - const contentHash = hash(contentBytes); + if (contentHash == undefined) { + // The file was deleted before we had a chance to read it, no need to sync it here + return; + } - // todo: make this smarter so that offline files can be renamed & edited at the same time - const originalFile = findMatchingFileBasedOnHash( - contentHash, - locallyPossiblyDeletedFiles - ); - if (originalFile !== undefined) { - // `originalFile` hasn't been deleted but it got moved instead - locallyPossiblyDeletedFiles = - locallyPossiblyDeletedFiles.filter( - (item) => item[0] !== originalFile[0] - ); - - this.logger.debug( - `Document '${originalFile[0]}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it` + const originalFile = findMatchingFile( + contentHash, + locallyPossiblyDeletedFiles + ); + if (originalFile !== undefined) { + // `originalFile` hasn't been deleted but it got moved instead + locallyPossiblyDeletedFiles = + locallyPossiblyDeletedFiles.filter( + (item) => + item.relativePath !== originalFile.relativePath ); - const updateTime = - await Syncer.forgivingFileNotFoundWrapper( - async () => - this.operations.getModificationTime( - relativePath - ), - this.logger - ); - if (updateTime === undefined) { - return; - } - - return this.internalSyncer.unrestrictedSyncLocallyUpdatedFile( - { - oldPath: originalFile[0], - relativePath, - updateTime, - metadata: Promise.resolve( - this.database.getResolvedDocument( - relativePath - ) - ), - optimisations: { - contentBytes, - contentHash - } - } - ); - } - this.logger.debug( - `Document ${relativePath} not found in database, scheduling sync to create it` + `Document '${originalFile.relativePath}' was not found under its current path in the database but was found under a different path (${relativePath}), scheduling sync to move it` ); - const updateTime = - await Syncer.forgivingFileNotFoundWrapper( - async () => - this.operations.getModificationTime( - relativePath - ), - this.logger - ); - if (updateTime === undefined) { - return; - } - return this.internalSyncer.unrestrictedSyncLocallyCreatedFile( - relativePath, - updateTime - ); - }) - ) + + // We're outside of the pqueue, so we need to call the public wrapper + return this.syncLocallyUpdatedFile({ + oldPath: originalFile.relativePath, + relativePath + }); + } + + this.logger.debug( + `Document ${relativePath} not found in database, scheduling sync to create it` + ); + // We're outside of the pqueue, so we need to call the public wrapper + return this.syncLocallyCreatedFile(relativePath); + }) ); const deletes = Promise.all( - locallyPossiblyDeletedFiles.map(async ([relativePath, _]) => { + locallyPossiblyDeletedFiles.map(async ({ relativePath }) => { this.logger.debug( `Document ${relativePath} has been deleted locally, scheduling sync to delete it` ); - if (await this.operations.exists(relativePath)) { - this.logger.debug( - `Document ${relativePath} actually exists locally, skipping` - ); - return Promise.resolve(); - } - // We're outside of the pqueue, so we need to call the public wrapper return this.syncLocallyDeletedFile(relativePath); }) @@ -389,15 +388,7 @@ export class Syncer { this.logger.info("Applying remote changes locally"); await Promise.all( - remote.latestDocuments - .filter( - (remoteDocument) => - remoteDocument.vaultUpdateId > - (this.database.getDocumentByDocumentId( - remoteDocument.documentId - )?.[1].parentVersionId ?? -1) - ) - .map(this.syncRemotelyUpdatedFile.bind(this)) + remote.latestDocuments.map(this.syncRemotelyUpdatedFile.bind(this)) ); const lastSeenUpdateId = this.database.getLastSeenUpdateId(); @@ -405,7 +396,7 @@ export class Syncer { lastSeenUpdateId === undefined || remote.lastUpdateId > lastSeenUpdateId ) { - await this.database.setLastSeenUpdateId(remote.lastUpdateId); + this.database.setLastSeenUpdateId(remote.lastUpdateId); } } diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index c7b6f044..b1951f89 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -1,11 +1,12 @@ import type { Database, DocumentMetadata, + DocumentRecord, RelativePath } from "../persistence/database"; import type { SyncService } from "src/services/sync-service"; -import type { Logger } from "src/tracing/logger"; +import { Logger } from "src/tracing/logger"; import type { SyncHistory } from "src/tracing/sync-history"; import { SyncSource, SyncStatus, SyncType } from "src/tracing/sync-history"; import { EMPTY_HASH, hash } from "src/utils/hash"; @@ -31,37 +32,28 @@ export class UnrestrictedSyncer { } public async unrestrictedSyncLocallyCreatedFile( - relativePath: RelativePath, - updateTime: Date, - optimisations?: { - contentBytes?: Uint8Array; - contentHash?: string; - } - ): Promise { + getLatestDocument: () => DocumentRecord, + updateTime?: Date + ): Promise { + const { relativePath, metadata } = getLatestDocument(); + return this.executeSync( [relativePath], SyncType.CREATE, SyncSource.PUSH, async () => { - const localMetadata = this.database.getDocument(relativePath); - - if ( - !(localMetadata instanceof Promise) && - localMetadata && - !localMetadata.isDeleted - ) { + if (metadata !== undefined && !metadata.isDeleted) { this.logger.debug( - `Document metadata already exists for ${relativePath}, it must have been downloaded from the server` + `Document ${relativePath} already exists in the database, no need to create it again` ); - return; } - const contentBytes = - optimisations?.contentBytes ?? - (await this.operations.read(relativePath)); // this can throw FileNotFoundError - const contentHash = - optimisations?.contentHash ?? hash(contentBytes); + const contentBytes = await this.operations.read(relativePath); // this can throw FileNotFoundError + const contentHash = hash(contentBytes); + + updateTime ??= + await this.operations.getModificationTime(relativePath); // this can throw FileNotFoundError const response = await this.syncService.create({ relativePath, @@ -69,95 +61,71 @@ export class UnrestrictedSyncer { createdDate: updateTime }); - const currentMetadata = - this.database.getDocumentByIdentity(localMetadata); - if (!currentMetadata) { - throw new Error( - `Document metadata for ${relativePath} not found after creation` - ); - } + const { relativePath: currentRelativePath } = + getLatestDocument(); this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, source: SyncSource.PUSH, - relativePath: currentMetadata[0], + relativePath, message: `Successfully uploaded locally created file`, type: SyncType.CREATE }); const newMetadata = { + relativePath: currentRelativePath, documentId: response.documentId, parentVersionId: response.vaultUpdateId, hash: contentHash, isDeleted: false }; - await this.database.setDocument({ - relativePath: currentMetadata[0], - ...newMetadata - }); + this.database.setDocument(newMetadata); - await this.tryIncrementVaultUpdateId(response.vaultUpdateId); - - return newMetadata; + this.tryIncrementVaultUpdateId(response.vaultUpdateId); } ); } public async unrestrictedSyncLocallyDeletedFile( - relativePath: RelativePath, - metadata: Promise | undefined + getLatestDocument: () => DocumentRecord ): Promise { + let document = getLatestDocument(); await this.executeSync( - [relativePath], + [document.relativePath], SyncType.DELETE, SyncSource.PUSH, async () => { - const localMetadata = - metadata !== undefined - ? await metadata - : this.database.getResolvedDocument(relativePath); - - if (!localMetadata || localMetadata.isDeleted) { - this.logger.info( - `Locally deleted file hasn't been uploaded yet, so there's no need to delete it on the remote server` + if ( + document.metadata === undefined || + document.metadata.isDeleted + ) { + this.logger.debug( + `Document ${document.relativePath} has been already deleted, no need to delete it again` ); - return; } const response = await this.syncService.delete({ - documentId: localMetadata.documentId, - relativePath, - createdDate: new Date() // We got the event now, so it must have been deleted just now + documentId: document.metadata.documentId, + relativePath: document.relativePath, + createdDate: new Date() // We've got the event now, so it must have been deleted just now }); this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, source: SyncSource.PUSH, - relativePath, + relativePath: document.relativePath, message: `Successfully deleted locally deleted file on the remote server`, type: SyncType.DELETE }); - const currentMetadata = this.database.getDocumentByDocumentId( - localMetadata.documentId - ); - - if (!currentMetadata || currentMetadata[1].isDeleted) { - this.logger.info( - `No metadata found for deleted file, '${relativePath}' must have been deleted by another operation` - ); - - return; - } - - await this.operations.delete(currentMetadata[0]); + document = getLatestDocument(); // We have to have a record of the delete in case there's an in-flight update for the same // document which finishes after the delete has succeeded and would introduce a phantom metadata record. - await this.database.setDocument({ - relativePath: currentMetadata[0], + this.database.setDocument({ + relativePath: document.relativePath, documentId: response.documentId, parentVersionId: response.vaultUpdateId, hash: EMPTY_HASH, @@ -169,84 +137,64 @@ export class UnrestrictedSyncer { public async unrestrictedSyncLocallyUpdatedFile({ oldPath, - relativePath, - metadata, - updateTime, - optimisations + getLatestDocument, + updateTime }: { oldPath?: RelativePath; - relativePath: RelativePath; - metadata: Promise | undefined; - updateTime: Date; - optimisations?: { - contentBytes?: Uint8Array; - contentHash?: string; - }; + getLatestDocument: () => DocumentRecord; + updateTime?: Date; }): Promise { + let document = getLatestDocument(); + await this.executeSync( - [oldPath, relativePath].filter((path) => path !== undefined), + [oldPath, document.relativePath].filter( + (path) => path !== undefined + ), SyncType.UPDATE, SyncSource.PUSH, async () => { - const localMetadata = - metadata !== undefined - ? await metadata - : this.database.getResolvedDocument(relativePath); - - if (!localMetadata || localMetadata.isDeleted) { - // It's fine, a subsequent sync operation must have dealt with this - return; - } - - const contentBytes = - optimisations?.contentBytes ?? - (await this.operations.read(relativePath)); // this can throw FileNotFoundError - - let contentHash = - optimisations?.contentHash ?? hash(contentBytes); - if ( - localMetadata.hash === contentHash && - oldPath === undefined + document.metadata === undefined || + document.metadata.isDeleted ) { this.logger.debug( - `File hash of ${relativePath} matches with last synced version and the path hasn't changed; no need to sync` + `Document ${document.relativePath} has been already deleted, no need to update it, ${JSON.stringify(document)}, ${document.metadata?.isDeleted}` ); return; } - // Re-fetch based on the documentId instead of the relativePath because - // the relativePath might have changed since this operation was scheduled - let latestMetadata = this.database.getDocumentByDocumentId( - localMetadata.documentId - ); - if (!latestMetadata || latestMetadata[1].isDeleted) { - // It's fine, a subsequent sync operation must have dealt with this + const contentBytes = await this.operations.read( + document.relativePath + ); // this can throw FileNotFoundError + let contentHash = hash(contentBytes); + + if ( + document.metadata.hash === contentHash && + oldPath === undefined + ) { + this.logger.debug( + `File hash of ${document.relativePath} matches with last synced version and the path hasn't changed; no need to sync` + ); return; } + updateTime ??= await this.operations.getModificationTime( + document.relativePath + ); // this can throw FileNotFoundError; + const response = await this.syncService.put({ - documentId: latestMetadata[1].documentId, - parentVersionId: latestMetadata[1].parentVersionId, - relativePath: latestMetadata[0], + documentId: document.metadata.documentId, + parentVersionId: document.metadata.parentVersionId, + relativePath: document.relativePath, contentBytes, createdDate: updateTime }); - latestMetadata = this.database.getDocumentByDocumentId( - response.documentId - ); - - if (!latestMetadata || latestMetadata[1].isDeleted) { - // The document has been deleted since this operation was scheduled - return; - } - if ( - latestMetadata[1].parentVersionId >= response.vaultUpdateId + document.metadata.parentVersionId >= response.vaultUpdateId ) { this.logger.debug( - `Document ${relativePath} is already more up to date than the fetched version` + `Document ${document.relativePath} is already more up to date than the fetched version` ); return; } @@ -254,50 +202,42 @@ export class UnrestrictedSyncer { this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, source: SyncSource.PUSH, - relativePath, + relativePath: document.relativePath, message: `Successfully uploaded locally updated file to the remote server`, type: SyncType.UPDATE }); + // Update relativePath which is the only property that can change while this is running (due to a move) + document = getLatestDocument(); + if (response.isDeleted) { - await this.operations.delete(relativePath); + await this.operations.delete(document.relativePath); this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, source: SyncSource.PULL, - relativePath, + relativePath: document.relativePath, message: "The file we tried to update had been deleted remotely, therefore, we have deleted it locally", type: SyncType.DELETE }); - await this.database.setDocument({ + this.database.setDocument({ documentId: response.documentId, - relativePath: latestMetadata[0], + relativePath: document.relativePath, parentVersionId: response.vaultUpdateId, hash: EMPTY_HASH, isDeleted: true }); - await this.tryIncrementVaultUpdateId( - response.vaultUpdateId - ); + this.tryIncrementVaultUpdateId(response.vaultUpdateId); return; } - if ( - latestMetadata[1].parentVersionId >= response.vaultUpdateId - ) { - this.logger.debug( - `Document ${relativePath} is already more up to date than the fetched version` - ); - return; - } - - if (response.relativePath != relativePath) { + if (response.relativePath != document.relativePath) { await this.operations.move( - latestMetadata[0], + document.relativePath, response.relativePath, response.documentId ); // this can throw FileNotFoundError @@ -316,155 +256,85 @@ export class UnrestrictedSyncer { this.history.addHistoryEntry({ status: SyncStatus.SUCCESS, source: SyncSource.PULL, - relativePath, + relativePath: document.relativePath, message: `The file we updated had been updated remotely, so we downloaded the merged version`, type: SyncType.UPDATE }); } - await this.database.setDocument({ + this.database.setDocument({ documentId: response.documentId, relativePath: - response.relativePath != relativePath + response.relativePath != document.relativePath ? response.relativePath - : latestMetadata[0], + : document.relativePath, parentVersionId: response.vaultUpdateId, hash: contentHash, isDeleted: response.isDeleted }); - await this.tryIncrementVaultUpdateId(response.vaultUpdateId); + this.tryIncrementVaultUpdateId(response.vaultUpdateId); } ); } public async unrestrictedSyncRemotelyUpdatedFile( - remoteVersion: components["schemas"]["DocumentVersionWithoutContent"] + remoteVersion: components["schemas"]["DocumentVersionWithoutContent"], + getLatestDocument?: () => DocumentRecord ): Promise { await this.executeSync( [remoteVersion.relativePath], SyncType.UPDATE, SyncSource.PULL, async () => { + const localMetadata = + getLatestDocument?.() ?? + this.database.getDocumentByDocumentId( + remoteVersion.documentId + ); + + if ( + localMetadata?.metadata !== undefined && + !localMetadata.metadata.isDeleted + ) { + // If the file exists locally, let's pretend the user has updated it + // and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile` + if ( + localMetadata.metadata.parentVersionId >= + remoteVersion.vaultUpdateId + ) { + this.logger.debug( + `Document ${remoteVersion.relativePath} is already more up to date than the fetched version` + ); + return; + } + + return this.unrestrictedSyncLocallyUpdatedFile({ + getLatestDocument: () => + this.database.getDocumentByIdentity( + localMetadata.identity + ) + }); + } + const content = ( await this.syncService.get({ documentId: remoteVersion.documentId }) ).contentBase64; const contentBytes = deserialize(content); - const contentHash = hash(contentBytes); - const localMetadata = this.database.getDocumentByDocumentId( - remoteVersion.documentId - ); - if ( - localMetadata?.[1].documentId === - remoteVersion.documentId && - localMetadata[1].parentVersionId > - remoteVersion.vaultUpdateId - ) { - this.logger.info( - `Document ${remoteVersion.relativePath} is already up to date` - ); - return; - } - - const localBytes = await this.operations.read( - remoteVersion.relativePath - ); // this can throw FileNotFoundError - const localHash = hash(localBytes); - - if (localHash !== localMetadata?.[1].hash) { - this.logger.info( - `Document ${remoteVersion.relativePath} has pending local changes, so we shouldn't update it here` - ); - return; - } - - if (!localMetadata || localMetadata[1].isDeleted) { - if (remoteVersion.isDeleted) { - this.logger.info( - `Remotely deleted file hasn't been synced yet, so there's no need to delete it locally` - ); - return; - } - - await this.operations.create( - remoteVersion.relativePath, - contentBytes, - remoteVersion.documentId - ); - - await this.database.setDocument({ - documentId: remoteVersion.documentId, - relativePath: remoteVersion.relativePath, - parentVersionId: remoteVersion.vaultUpdateId, - hash: hash(contentBytes), - isDeleted: remoteVersion.isDeleted - }); - - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - source: SyncSource.PULL, - relativePath: remoteVersion.relativePath, - message: `Successfully downloaded remote file which hadn't existed locally`, - type: SyncType.CREATE - }); - return; - } - - const [relativePath, metadata] = localMetadata; - if (remoteVersion.vaultUpdateId <= metadata.parentVersionId) { - this.logger.debug( - `Document ${relativePath} is already up to date` - ); - return; - } - - if (remoteVersion.isDeleted) { - await this.operations.delete(relativePath); - - this.history.addHistoryEntry({ - status: SyncStatus.SUCCESS, - source: SyncSource.PULL, - relativePath: remoteVersion.relativePath, - message: `Successfully deleted remotely deleted file locally`, - type: SyncType.DELETE - }); - - await this.database.setDocument({ - documentId: remoteVersion.documentId, - relativePath: relativePath, - parentVersionId: remoteVersion.vaultUpdateId, - hash: EMPTY_HASH, - isDeleted: true - }); - - return; - } - - if (relativePath !== remoteVersion.relativePath) { - // TODO: this can fail, that's bad - await this.operations.move( - // this can throw FileNotFoundError - relativePath, - remoteVersion.relativePath, - remoteVersion.documentId - ); - } - - // todo: why await this.operations.create( remoteVersion.relativePath, contentBytes, remoteVersion.documentId ); - await this.database.setDocument({ + this.database.setDocument({ documentId: remoteVersion.documentId, relativePath: remoteVersion.relativePath, parentVersionId: remoteVersion.vaultUpdateId, - hash: contentHash, + hash: hash(contentBytes), isDeleted: remoteVersion.isDeleted }); @@ -472,8 +342,8 @@ export class UnrestrictedSyncer { status: SyncStatus.SUCCESS, source: SyncSource.PULL, relativePath: remoteVersion.relativePath, - message: `Successfully updated remotely updated file locally`, - type: SyncType.UPDATE + message: `Successfully downloaded remote file which hadn't existed locally`, + type: SyncType.CREATE }); } ); @@ -551,11 +421,9 @@ export class UnrestrictedSyncer { this.locks.reset(); } - private async tryIncrementVaultUpdateId( - responseVaultUpdateId: number - ): Promise { + private tryIncrementVaultUpdateId(responseVaultUpdateId: number): void { if (this.database.getLastSeenUpdateId() === responseVaultUpdateId - 1) { - await this.database.setLastSeenUpdateId(responseVaultUpdateId); + this.database.setLastSeenUpdateId(responseVaultUpdateId); } } } diff --git a/frontend/sync-client/src/utils/create-promise.ts b/frontend/sync-client/src/utils/create-promise.ts new file mode 100644 index 00000000..056c169c --- /dev/null +++ b/frontend/sync-client/src/utils/create-promise.ts @@ -0,0 +1,15 @@ +export function createPromise(): [ + Promise, + (value: T) => void, + (error: unknown) => void +] { + let resolve: undefined | ((resolved: T) => void) = undefined; + let reject: undefined | ((error: unknown) => void) = undefined; + + const creationPromise = new Promise( + (resolve_, reject_) => ((resolve = resolve_), (reject = reject_)) + ); + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return [creationPromise, resolve!, reject!]; +} diff --git a/frontend/sync-client/src/utils/find-matching-file.ts b/frontend/sync-client/src/utils/find-matching-file.ts index 27a4c875..10545f2c 100644 --- a/frontend/sync-client/src/utils/find-matching-file.ts +++ b/frontend/sync-client/src/utils/find-matching-file.ts @@ -1,14 +1,14 @@ -import type { DocumentMetadata, RelativePath } from "../persistence/database"; +import type { DocumentRecord } from "../persistence/database"; import { EMPTY_HASH } from "./hash"; // TODO: make this smarter so that offline files can be renamed & edited at the same time export function findMatchingFile( contentHash: string, - candidates: [RelativePath, DocumentMetadata][] -): [RelativePath, DocumentMetadata] | undefined { + candidates: DocumentRecord[] +): DocumentRecord | undefined { if (contentHash === EMPTY_HASH) { return undefined; } - return candidates.find(([_, metadata]) => metadata.hash === contentHash); + return candidates.find(({ metadata }) => metadata?.hash === contentHash); } diff --git a/frontend/test-client/run.sh b/frontend/test-client/run.sh index 892be317..6effc147 100755 --- a/frontend/test-client/run.sh +++ b/frontend/test-client/run.sh @@ -1,21 +1,71 @@ #!/bin/bash set -e +set -o pipefail + +# Check if the argument is provided +if [ $# -eq 0 ]; then + echo "Usage: $0 " + exit 1 +fi + +# Get the number of processes from the first argument +process_count=$1 npm run build pids=() -for i in {1..10}; do +for i in $(seq 1 $process_count); do node dist/cli.js 2>&1 | tee "log_${i}.log" & pids+=($!) done -trap 'kill ${pids[@]} 2>/dev/null' SIGINT SIGTERM +print_failed_log() { + for i in $(seq 1 $process_count); do + if [ -n "${pids[$i-1]}" ] && ! kill -0 ${pids[$i-1]} 2>/dev/null; then + # Get the exit code of the process + wait ${pids[$i-1]} + exit_code=$? + + # Only consider non-zero exit codes as failures + if [ $exit_code -ne 0 ]; then + echo "Process ${pids[$i-1]} failed with exit code $exit_code. Log file: $(pwd)/log_${i}.log" + return 0 + else + echo "Process ${pids[$i-1]} completed successfully with exit code 0" + # Mark this PID as processed by setting it to empty + pids[$i-1]="" + fi + fi + done + return 1 +} -for pid in ${pids[@]}; do - if ! wait $pid; then - kill ${pids[@]} 2>/dev/null - echo "Process $pid failed, see log_$(echo ${pids[@]} | tr ' ' '\n' | grep -n "^$pid$" | cut -d: -f1).log" +# Monitor processes +while true; do + if print_failed_log; then + # Kill remaining processes + for pid in "${pids[@]}"; do + if [ -n "$pid" ]; then + kill $pid 2>/dev/null || true + fi + done exit 1 fi + + # Check if all processes have completed + all_done=true + for pid in "${pids[@]}"; do + if [ -n "$pid" ] && kill -0 $pid 2>/dev/null; then + all_done=false + break + fi + done + + if $all_done; then + echo "All processes completed successfully" + exit 0 + fi + + sleep 0.2 done diff --git a/frontend/test-client/src/agent/mock-agent.ts b/frontend/test-client/src/agent/mock-agent.ts index fad989fb..06c0f10e 100644 --- a/frontend/test-client/src/agent/mock-agent.ts +++ b/frontend/test-client/src/agent/mock-agent.ts @@ -64,7 +64,7 @@ export class MockAgent extends MockClient { // Let's not ignore errors // eslint-disable-next-line @typescript-eslint/no-floating-promises - sleep(1000).then(() => process.exit(1)); + sleep(100).then(() => process.exit(1)); break; case LogLevel.WARNING: diff --git a/frontend/test-client/src/cli.ts b/frontend/test-client/src/cli.ts index 26a0f23f..e87666f3 100644 --- a/frontend/test-client/src/cli.ts +++ b/frontend/test-client/src/cli.ts @@ -38,7 +38,9 @@ async function runTest({ ) ); } + // for debugging + // eslint-disable-next-line (globalThis as any).clients = clients; try { @@ -88,11 +90,11 @@ async function runTest({ } async function runTests(): Promise { - const agentCounts = [2, 10]; - const jitterScaleInSeconds = [0, 0.5, 3]; - const concurrencies = [1, 16]; - const iterations = [50, 300]; - const doDeletes = [false]; + const agentCounts = [2, 8]; + const jitterScaleInSeconds = [0.5, 0, 2]; + const concurrencies = [1]; + const iterations = [50, 200]; + const doDeletes = [true, false]; for (const agentCount of agentCounts) { for (const concurrency of concurrencies) { @@ -106,6 +108,7 @@ async function runTests(): Promise { doDeletes: deleteFiles, jitterScaleInSeconds: jitter }); + return; } } } @@ -113,15 +116,13 @@ async function runTests(): Promise { } } -process.on("uncaughtException", async (error) => { +process.on("uncaughtException", (error) => { console.error("Uncaught Exception:", error); - await sleep(1000); process.exit(1); }); -process.on("unhandledRejection", async (reason, promise) => { +process.on("unhandledRejection", (reason, _promise) => { console.error("Unhandled Rejection:", reason); - await sleep(1000); process.exit(1); }); @@ -131,6 +132,5 @@ runTests() }) .catch(async (err: unknown) => { console.error(err); - await sleep(1000); process.exit(1); });