From e3196c2dc0988b4ddd0fbbd85c74576a54a85f77 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 15 Mar 2025 09:25:09 +0000 Subject: [PATCH] works --- .../src/file-operations/file-operations.ts | 24 +- .../sync-client/src/persistence/database.ts | 116 ++++++---- .../src/services/connected-state.ts | 52 +++++ .../sync-client/src/services/sync-service.ts | 38 +-- frontend/sync-client/src/sync-client.ts | 24 +- .../sync-client/src/sync-operations/syncer.ts | 218 ++++++++---------- .../sync-operations/unrestricted-syncer.ts | 137 +++++------ frontend/test-client/src/agent/mock-agent.ts | 2 - frontend/test-client/src/agent/mock-client.ts | 13 ++ frontend/test-client/src/cli.ts | 15 +- 10 files changed, 338 insertions(+), 301 deletions(-) create mode 100644 frontend/sync-client/src/services/connected-state.ts diff --git a/frontend/sync-client/src/file-operations/file-operations.ts b/frontend/sync-client/src/file-operations/file-operations.ts index c3584de5..8071a0f5 100644 --- a/frontend/sync-client/src/file-operations/file-operations.ts +++ b/frontend/sync-client/src/file-operations/file-operations.ts @@ -38,7 +38,7 @@ export class FileOperations { const decoder = new TextDecoder("utf-8"); - // Normalize line endings to LF on Windows + // Normalize line-endings to LF on Windows let text = decoder.decode(content); text = text.replace(/\r\n/g, "\n"); @@ -53,7 +53,7 @@ export class FileOperations { return this.fs.exists(path); } - // Create and write the file if it doesn't exist. Otherwise, it has the same behavior as write. + // Create and write the file if it doesn't exist.Otherwise, it has the same behavior as write. // All parent directories are created if they don't exist. public async create( path: RelativePath, @@ -73,20 +73,15 @@ export class FileOperations { `Existing metadata for ${path}: ${JSON.stringify(document?.metadata)}` ); - if ( - document?.metadata !== undefined && - document.metadata.documentId === documentId - ) { + if (document !== undefined && document.documentId === documentId) { // This can happen if the document got moved both locally and remotely // to the same file path. In this case, we shouldn't deconflict, however, // we also can't overwrite otherwise we'd lose changes. throw new FileNotFoundError(path); } - this.logger.debug( - `We need to save what's at ${path} to ${deconflictedPath}` - ); - await this.move(path, deconflictedPath, documentId); + this.database.move(path, deconflictedPath); + await this.fs.rename(path, deconflictedPath); } else { await this.createParentDirectories(path); } @@ -147,7 +142,7 @@ export class FileOperations { } public async delete(path: RelativePath): Promise { - if (!(await this.exists(path))) { + if (await this.exists(path)) { this.logger.debug(`Deleting file: ${path}`); return this.fs.delete(path); } else { @@ -175,7 +170,7 @@ export class FileOperations { if ( document?.metadata !== undefined && - document.metadata.documentId === documentId + document.documentId === documentId ) { // This can happen if the document got moved both locally and remotely // to the same file path. In this case, we shouldn't deconflict, however, @@ -183,12 +178,13 @@ export class FileOperations { throw new FileNotFoundError(newPath); } - await this.move(newPath, deconflictedPath, documentId); - // this.database.move(oldPath, newPath); + this.database.move(newPath, deconflictedPath); + await this.fs.rename(newPath, deconflictedPath); } else { await this.createParentDirectories(newPath); } + this.database.move(oldPath, newPath); await this.fs.rename(oldPath, newPath); } diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 4418f55c..0987c0de 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -6,14 +6,13 @@ export type RelativePath = string; export interface DocumentMetadata { parentVersionId: VaultUpdateId; - documentId: DocumentId; hash: string; } export interface StoredDocumentMetadata { relativePath: RelativePath; - parentVersionId: VaultUpdateId; documentId: DocumentId; + parentVersionId: VaultUpdateId; hash: string; } @@ -25,6 +24,7 @@ export interface StoredDatabase { export interface DocumentRecord { identity: symbol; relativePath: RelativePath; + documentId: DocumentId; metadata: DocumentMetadata | undefined; isDeleted: boolean; updates: Promise[]; @@ -43,14 +43,17 @@ export class Database { initialState ??= {}; this.documents = - initialState.documents?.map(({ relativePath, ...metadata }) => ({ - relativePath, - identity: Symbol(), - metadata, - isDeleted: false, - updates: [], - parallelVersion: 0 - })) ?? []; + initialState.documents?.map( + ({ relativePath, documentId, ...metadata }) => ({ + relativePath, + documentId, + identity: Symbol(), + metadata, + isDeleted: false, + updates: [], + parallelVersion: 0 + }) + ) ?? []; this.ensureConsistency(); this.logger.debug(`Loaded ${this.documents.length} documents`); @@ -135,11 +138,17 @@ export class Database { ({ identity }) => identity !== entry.identity ); + if (entry.relativePath !== relativePath) { + throw new Error( + "Document identity does not match the relative path" + ); + } + this.documents.push({ ...entry, relativePath, + documentId, metadata: { - documentId, parentVersionId, hash } @@ -153,13 +162,13 @@ export class Database { // meaning that two documents occupy the same path in terms of in-flight requests so we // need to create a new parallel version. entry = this.getLatestDocumentByRelativePath(relativePath); - if (entry && entry.metadata?.documentId !== documentId) { + if (entry && entry.documentId !== documentId) { this.documents.push({ // `entry` might be undefined if the document is new identity: Symbol(), relativePath, + documentId, metadata: { - documentId, parentVersionId, hash }, @@ -174,8 +183,8 @@ export class Database { this.documents.push({ identity: Symbol(), relativePath, + documentId, metadata: { - documentId, parentVersionId, hash }, @@ -210,16 +219,13 @@ export class Database { let entry = this.getLatestDocumentByRelativePath(relativePath); if (entry === undefined) { - entry = { - relativePath, - identity: Symbol(), - metadata: undefined, - isDeleted: false, - updates: [], - parallelVersion: 0 - }; - - this.documents.push(entry); + throw new Error( + `Document not found by relative path: ${relativePath}, ${JSON.stringify( + this.documents, + null, + 2 + )}` + ); } const currentPromises = entry.updates; @@ -227,6 +233,30 @@ export class Database { await Promise.all(currentPromises); } + public getNewResolvedDocumentByRelativePath( + documentId: DocumentId, + relativePath: RelativePath, + promise: Promise + ): void { + let previousEntry = this.getLatestDocumentByRelativePath(relativePath); + + const entry = { + relativePath, + documentId, + identity: Symbol(), + metadata: undefined, + isDeleted: false, + updates: [promise], + parallelVersion: + previousEntry?.parallelVersion === undefined + ? 0 + : previousEntry.parallelVersion + 1 + }; + + this.documents.push(entry); + this.save(); + } + public getDocumentByUpdatePromise(promise: Promise): DocumentRecord { const result = this.documents.find(({ updates }) => updates.includes(promise) @@ -240,11 +270,9 @@ export class Database { } public getDocumentByDocumentId( - documentId: DocumentId + find: DocumentId ): DocumentRecord | undefined { - return this.documents.find( - ({ metadata }) => metadata?.documentId === documentId - ); + return this.documents.find(({ documentId }) => documentId === find); } public getDocumentByIdentity(find: symbol): DocumentRecord { @@ -263,9 +291,8 @@ export class Database { ): void { const oldDocument = this.getLatestDocumentByRelativePath(oldRelativePath); + if (oldDocument === undefined) { - // We can try moving a non-existent document if it hasn't yet got created becasue it's - // the result of an offline event while this move happens online before. return; } @@ -275,13 +302,11 @@ export class Database { let newDocument = this.getLatestDocumentByRelativePath(newRelativePath); - // It's either an invalid state of newDocument is pending deletion and we have to wait for it to complete + // It's either an invalid state of newDocument is pending deletion and we have + // to wait for it to complete. this.documents.push({ - identity: oldDocument.identity, - metadata: oldDocument.metadata, + ...oldDocument, relativePath: newRelativePath, - isDeleted: oldDocument.isDeleted, - updates: oldDocument.updates, // We're in a strange state where the target of the move has just got deleted, // however, its metadata might already have a bunch of updates queued up for // the document at the new location. We need to keep these updates. @@ -295,8 +320,9 @@ export class Database { public delete(relativePath: RelativePath): void { const candidate = this.getLatestDocumentByRelativePath(relativePath); if (candidate === undefined) { - // it's fine because the document to be deleted might not have been created yet - return; + throw new Error( + `Document not found by relative path: ${relativePath}` + ); } candidate.isDeleted = true; } @@ -319,16 +345,12 @@ export class Database { private ensureConsistency(): void { const idToPath = new Map(); - this.resolvedDocuments - .filter(({ metadata }) => metadata !== undefined) - .forEach(({ metadata, relativePath }) => { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - idToPath.set(metadata!.documentId, [ - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - ...(idToPath.get(metadata!.documentId) ?? []), - relativePath - ]); - }); + this.resolvedDocuments.forEach(({ relativePath, documentId }) => { + idToPath.set(documentId, [ + ...(idToPath.get(documentId) ?? []), + relativePath + ]); + }); const duplicates = Array.from(idToPath.entries()) .filter(([_, paths]) => paths.length > 1) diff --git a/frontend/sync-client/src/services/connected-state.ts b/frontend/sync-client/src/services/connected-state.ts new file mode 100644 index 00000000..da522816 --- /dev/null +++ b/frontend/sync-client/src/services/connected-state.ts @@ -0,0 +1,52 @@ +import { Syncer } from "../sync-operations/syncer"; +import { Settings } from "../persistence/settings"; +import { Logger } from "../tracing/logger"; +import { createPromise } from "../utils/create-promise"; +import { retriedFetchFactory } from "../utils/retried-fetch"; + +export class ConnectedState { + private resolveIsSyncEnabled: (() => void) | undefined; + private syncIsEnabled: Promise | undefined; + + public constructor( + settings: Settings, + private readonly logger: Logger + ) { + settings.addOnSettingsChangeHandlers((newSettings, oldSettings) => { + if (!oldSettings.isSyncEnabled && newSettings.isSyncEnabled) { + this.handleComingOnline(); + } else if ( + oldSettings.isSyncEnabled && + !newSettings.isSyncEnabled + ) { + this.handleGoingOffline(); + } + }); + } + + private handleComingOnline() { + this.logger.debug("Sync is enabled"); + this.resolveIsSyncEnabled?.(); + } + + private handleGoingOffline() { + this.logger.debug("Sync is disabled"); + [this.syncIsEnabled, this.resolveIsSyncEnabled] = createPromise(); + } + + public getFetchImplementation( + fetch: typeof globalThis.fetch, + { doRetries = true }: { doRetries: boolean } = { doRetries: true } + ): typeof globalThis.fetch { + const retriedFetch = doRetries + ? retriedFetchFactory(this.logger, fetch) + : fetch; + + return async (input: RequestInfo | URL): Promise => { + if (this.syncIsEnabled !== undefined) { + await this.syncIsEnabled; + } + return retriedFetch(input); + }; + } +} diff --git a/frontend/sync-client/src/services/sync-service.ts b/frontend/sync-client/src/services/sync-service.ts index ff7ec8fd..d784aa0d 100644 --- a/frontend/sync-client/src/services/sync-service.ts +++ b/frontend/sync-client/src/services/sync-service.ts @@ -6,20 +6,22 @@ import type { RelativePath, VaultUpdateId } from "../persistence/database"; -import type { Logger } from "src/tracing/logger"; -import { retriedFetchFactory } from "src/utils/retried-fetch"; -import type { Settings } from "src/persistence/settings"; +import type { Logger } from "../tracing/logger"; +import type { Settings } from "../persistence/settings"; +import { ConnectedState } from "./connected-state"; export interface CheckConnectionResult { isSuccessful: boolean; message: string; } + export class SyncService { private client!: Client; private clientWithoutRetries!: Client; private _fetchImplementation: typeof globalThis.fetch = globalThis.fetch; public constructor( + private readonly connectedState: ConnectedState, private readonly settings: Settings, private readonly logger: Logger ) { @@ -52,17 +54,19 @@ export class SyncService { } public async create({ + documentId, relativePath, - contentBytes, - createdDate + contentBytes }: { + documentId?: DocumentId; relativePath: RelativePath; contentBytes: Uint8Array; - createdDate: Date; }): Promise { const formData = new FormData(); + if (documentId !== undefined) { + formData.append("document_id", documentId); + } formData.append("relative_path", relativePath); - formData.append("created_date", createdDate.toISOString()); formData.append("content", new Blob([contentBytes])); const response = await this.client.POST( @@ -100,21 +104,18 @@ export class SyncService { parentVersionId, documentId, relativePath, - contentBytes, - createdDate + contentBytes }: { parentVersionId: VaultUpdateId; documentId: DocumentId; relativePath: RelativePath; contentBytes: Uint8Array; - createdDate: Date; }): Promise { this.logger.debug( `Updating document ${documentId} with parent version ${parentVersionId} & ${new TextDecoder().decode(contentBytes)} & ${relativePath}` ); const formData = new FormData(); formData.append("parent_version_id", parentVersionId.toString()); - formData.append("created_date", createdDate.toISOString()); formData.append("relative_path", relativePath); formData.append("content", new Blob([contentBytes])); @@ -152,12 +153,10 @@ export class SyncService { public async delete({ documentId, - relativePath, - createdDate + relativePath }: { documentId: DocumentId; relativePath: RelativePath; - createdDate: Date; }): Promise { const response = await this.client.DELETE( "/vaults/{vault_id}/documents/{document_id}", @@ -172,7 +171,6 @@ export class SyncService { } }, body: { - createdDate: createdDate.toISOString(), relativePath } } @@ -298,11 +296,17 @@ export class SyncService { private createClient(remoteUri: string): void { this.client = createClient({ baseUrl: remoteUri, - fetch: retriedFetchFactory(this.logger, this._fetchImplementation) + fetch: this.connectedState.getFetchImplementation( + this._fetchImplementation + ) }); this.clientWithoutRetries = createClient({ - baseUrl: remoteUri + baseUrl: remoteUri, + fetch: this.connectedState.getFetchImplementation( + this._fetchImplementation, + { doRetries: false } + ) }); } } diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index b4d6118a..dfd366ca 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -12,6 +12,7 @@ import { SyncService } from "./services/sync-service"; import { Syncer } from "./sync-operations/syncer"; import type { FileSystemOperations } from "./file-operations/filesystem-operations"; import { FileOperations } from "./file-operations/file-operations"; +import { ConnectedState } from "./services/connected-state"; export class SyncClient { private remoteListenerIntervalId: NodeJS.Timeout | null = null; @@ -90,7 +91,9 @@ export class SyncClient { } ); - const syncService = new SyncService(settings, logger); + const connectedState = new ConnectedState(settings, logger); + + const syncService = new SyncService(connectedState, settings, logger); const syncer = new Syncer( logger, @@ -117,18 +120,13 @@ export class SyncClient { ); settings.addOnSettingsChangeHandlers((newSettings, oldSettings) => { - client.registerRemoteEventListener( - newSettings.fetchChangesUpdateIntervalMs - ); - - if (!oldSettings.isSyncEnabled && newSettings.isSyncEnabled) { - syncer - .scheduleSyncForOfflineChanges() - .catch((_error: unknown) => { - logger.error( - "Failed to schedule sync for offline changes" - ); - }); + if ( + newSettings.fetchChangesUpdateIntervalMs !== + oldSettings.fetchChangesUpdateIntervalMs + ) { + client.registerRemoteEventListener( + newSettings.fetchChangesUpdateIntervalMs + ); } }); diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 531e735a..7cb31aac 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -3,6 +3,7 @@ import type { SyncService } from "src/services/sync-service"; import type { Logger } from "src/tracing/logger"; import type { SyncHistory } from "src/tracing/sync-history"; import PQueue from "p-queue"; +import { v4 as uuidv4 } from "uuid"; import { hash } from "src/utils/hash"; import type { components } from "src/services/types"; import type { Settings } from "src/persistence/settings"; @@ -27,7 +28,7 @@ export class Syncer { public constructor( private readonly logger: Logger, private readonly database: Database, - private readonly settings: Settings, + settings: Settings, private readonly syncService: SyncService, private readonly operations: FileOperations, history: SyncHistory @@ -43,9 +44,11 @@ export class Syncer { this.syncQueue.concurrency = newSettings.syncConcurrency; }); - this.syncQueue.on("active", () => { - this.emitRemainingOperationsChange(this.syncQueue.size); - }); + this.syncQueue.on("active", () => + this.remainingOperationsListeners.forEach((listener) => + listener(this.syncQueue.size) + ) + ); this.internalSyncer = new UnrestrictedSyncer( logger, @@ -82,29 +85,32 @@ export class Syncer { } public async syncLocallyCreatedFile( - relativePath: RelativePath, - updateTime?: Date + relativePath: RelativePath ): Promise { - if (!this.settings.getSettings().isSyncEnabled) { - this.logger.info( - `Syncing is disabled, not syncing '${relativePath}'` + if ( + this.database.getLatestDocumentByRelativePath(relativePath) + ?.isDeleted === false + ) { + this.logger.debug( + `Document ${relativePath} already exists in the database, skipping` ); return; } const [promise, resolve, reject] = createPromise(); + const proposedDocumentId = uuidv4(); - // Most likely, we're waiting for the previous delete to finish on the file at this path - await this.database.getResolvedDocumentByRelativePath( + this.database.getNewResolvedDocumentByRelativePath( + proposedDocumentId, relativePath, promise ); try { - await this.syncQueue.add(async () => + await this.syncQueue.add(() => this.internalSyncer.unrestrictedSyncLocallyCreatedFile( - () => this.database.getDocumentByUpdatePromise(promise), - updateTime + proposedDocumentId, + () => this.database.getDocumentByUpdatePromise(promise) ) ); @@ -119,13 +125,8 @@ export class Syncer { public async syncLocallyDeletedFile( relativePath: RelativePath ): Promise { - if (!this.settings.getSettings().isSyncEnabled) { - this.logger.info( - `Syncing is disabled, not syncing '${relativePath}'` - ); - return; - } - + // We have to have a record of the delete in case there's an in-flight update for the same + // document which finishes after the delete has succeeded and would introduce a phantom metadata record. this.database.delete(relativePath); const [promise, resolve, reject] = createPromise(); @@ -137,13 +138,9 @@ export class Syncer { try { await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() => { - this.logger.debug( - `aaaahg ${relativePath} has been deleted locally, syncing to delete it` - ); - - return this.database.getDocumentByUpdatePromise(promise); - }) + this.internalSyncer.unrestrictedSyncLocallyDeletedFile(() => + this.database.getDocumentByUpdatePromise(promise) + ) ); resolve(); @@ -156,23 +153,22 @@ export class Syncer { public async syncLocallyUpdatedFile({ oldPath, - relativePath, - updateTime + relativePath }: { oldPath?: RelativePath; relativePath: RelativePath; - updateTime?: Date; }): Promise { - if (!this.settings.getSettings().isSyncEnabled) { - this.logger.info( - `Syncing is disabled, not syncing '${relativePath}'` - ); - return; - } - - const [promise, resolve, reject] = createPromise(); - if (oldPath !== undefined) { + if ( + this.database.getLatestDocumentByRelativePath(oldPath) + ?.isDeleted === true + ) { + this.logger.debug( + `Document ${oldPath} has been deleted locally, skipping` + ); + return; + } + if (oldPath === relativePath) { throw new Error( `Old path and new path are the same: ${oldPath}` @@ -182,6 +178,18 @@ export class Syncer { this.database.move(oldPath, relativePath); } + if ( + this.database.getLatestDocumentByRelativePath(relativePath) + ?.isDeleted === true + ) { + this.logger.debug( + `Document ${relativePath} has been deleted locally, skipping` + ); + return; + } + + const [promise, resolve, reject] = createPromise(); + await this.database.getResolvedDocumentByRelativePath( relativePath, promise @@ -191,7 +199,6 @@ export class Syncer { await this.syncQueue.add(async () => this.internalSyncer.unrestrictedSyncLocallyUpdatedFile({ oldPath, - updateTime, getLatestDocument: () => this.database.getDocumentByUpdatePromise(promise) }) @@ -206,13 +213,6 @@ export class Syncer { } public async scheduleSyncForOfflineChanges(): Promise { - if (!this.settings.getSettings().isSyncEnabled) { - this.logger.debug( - `Syncing is disabled, not uploading local changes` - ); - return; - } - if (this.runningScheduleSyncForOfflineChanges !== undefined) { this.logger.debug("Uploading local changes is already in progress"); return this.runningScheduleSyncForOfflineChanges; @@ -234,13 +234,6 @@ export class Syncer { } public async applyRemoteChangesLocally(): Promise { - if (!this.settings.getSettings().isSyncEnabled) { - this.logger.debug( - `Syncing is disabled, not fetching remote changes` - ); - return; - } - if (this.runningApplyRemoteChangesLocally != null) { this.logger.debug( "Applying remote changes locally is already in progress" @@ -272,6 +265,35 @@ export class Syncer { this.internalSyncer.reset(); } + private async internalApplyRemoteChangesLocally(): Promise { + const remote = await this.syncQueue.add(async () => + this.syncService.getAll(this.database.getLastSeenUpdateId()) + ); + + if (!remote) { + throw new Error("Failed to fetch remote changes"); + } + + if (remote.latestDocuments.length === 0) { + this.logger.debug("No remote changes to apply"); + return; + } + + this.logger.info("Applying remote changes locally"); + + await Promise.all( + remote.latestDocuments.map(this.syncRemotelyUpdatedFile.bind(this)) + ); + + const lastSeenUpdateId = this.database.getLastSeenUpdateId(); + if ( + lastSeenUpdateId === undefined || + remote.lastUpdateId > lastSeenUpdateId + ) { + this.database.setLastSeenUpdateId(remote.lastUpdateId); + } + } + private async syncRemotelyUpdatedFile( remoteVersion: components["schemas"]["DocumentVersionWithoutContent"] ): Promise { @@ -279,45 +301,38 @@ export class Syncer { remoteVersion.documentId ); - if (document === undefined) { - const candidate = this.database.getLatestDocumentByRelativePath( - remoteVersion.relativePath - ); - if (candidate !== undefined && candidate.metadata === undefined) { - document = candidate; - } - } - - if (document === undefined) { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion - ) - ); - - return; - } - const [promise, resolve, reject] = createPromise(); - await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { + if (document === undefined) { await this.syncQueue.add(async () => this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( remoteVersion, - () => this.database.getDocumentByUpdatePromise(promise) + () => + this.database.getDocumentByDocumentId( + remoteVersion.documentId + ) ) ); + } else { + await this.database.getResolvedDocumentByRelativePath( + document.relativePath, + promise + ); - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion, + () => this.database.getDocumentByUpdatePromise(promise) + ) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } } } @@ -405,35 +420,4 @@ export class Syncer { await Promise.all([updates, deletes]); } - - private async internalApplyRemoteChangesLocally(): Promise { - const remote = await this.syncService.getAll( - this.database.getLastSeenUpdateId() - ); - - if (remote.latestDocuments.length === 0) { - this.logger.debug("No remote changes to apply"); - return; - } - - this.logger.info("Applying remote changes locally"); - - await Promise.all( - remote.latestDocuments.map(this.syncRemotelyUpdatedFile.bind(this)) - ); - - const lastSeenUpdateId = this.database.getLastSeenUpdateId(); - if ( - lastSeenUpdateId === undefined || - remote.lastUpdateId > lastSeenUpdateId - ) { - this.database.setLastSeenUpdateId(remote.lastUpdateId); - } - } - - private emitRemainingOperationsChange(remainingOperations: number): void { - this.remainingOperationsListeners.forEach((listener) => { - listener(remainingOperations); - }); - } } diff --git a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts index 5e4cf754..84b4f070 100644 --- a/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts +++ b/frontend/sync-client/src/sync-operations/unrestricted-syncer.ts @@ -1,5 +1,6 @@ import type { Database, + DocumentId, DocumentRecord, RelativePath } from "../persistence/database"; @@ -15,6 +16,7 @@ import type { Settings } from "src/persistence/settings"; import type { FileOperations } from "src/file-operations/file-operations"; import { FileNotFoundError } from "src/file-operations/safe-filesystem-operations"; import { DocumentLocks } from "../file-operations/document-locks"; +import { createPromise } from "src/utils/create-promise"; export class UnrestrictedSyncer { private readonly locks: DocumentLocks; @@ -31,8 +33,8 @@ export class UnrestrictedSyncer { } public async unrestrictedSyncLocallyCreatedFile( - getLatestDocument: () => DocumentRecord, - updateTime?: Date + proposedDocumentId: DocumentId, + getLatestDocument: () => DocumentRecord ): Promise { let latestDocument = getLatestDocument(); @@ -41,29 +43,15 @@ export class UnrestrictedSyncer { SyncType.CREATE, SyncSource.PUSH, async () => { - if ( - latestDocument.metadata !== undefined && - !latestDocument.isDeleted - ) { - this.logger.debug( - `Document ${latestDocument.relativePath} already exists in the database, no need to create it again` - ); - return; - } - const contentBytes = await this.operations.read( latestDocument.relativePath ); // this can throw FileNotFoundError const contentHash = hash(contentBytes); - updateTime ??= await this.operations.getModificationTime( - latestDocument.relativePath - ); // this can throw FileNotFoundError - const response = await this.syncService.create({ + documentId: proposedDocumentId, relativePath: latestDocument.relativePath, - contentBytes, - createdDate: updateTime + contentBytes }); latestDocument = getLatestDocument(); @@ -76,15 +64,15 @@ export class UnrestrictedSyncer { type: SyncType.CREATE }); - const newMetadata = { - relativePath: latestDocument.relativePath, - documentId: response.documentId, - parentVersionId: response.vaultUpdateId, - hash: contentHash, - isDeleted: false - }; - - this.database.setDocument(newMetadata, latestDocument.identity); + this.database.setDocument( + { + relativePath: latestDocument.relativePath, + documentId: response.documentId, + parentVersionId: response.vaultUpdateId, + hash: contentHash + }, + latestDocument.identity + ); this.tryIncrementVaultUpdateId(response.vaultUpdateId); } @@ -100,17 +88,9 @@ export class UnrestrictedSyncer { SyncType.DELETE, SyncSource.PUSH, async () => { - if (document.metadata === undefined) { - this.logger.debug( - `Document '${document.relativePath}' has been created yet so deleting it remotely can be skipped` - ); - return; - } - const response = await this.syncService.delete({ - documentId: document.metadata.documentId, - relativePath: document.relativePath, - createdDate: new Date() // We've got the event now, so it must have been deleted just now + documentId: document.documentId, + relativePath: document.relativePath }); this.history.addHistoryEntry({ @@ -123,8 +103,6 @@ export class UnrestrictedSyncer { document = getLatestDocument(); - // We have to have a record of the delete in case there's an in-flight update for the same - // document which finishes after the delete has succeeded and would introduce a phantom metadata record. this.database.setDocument( { relativePath: document.relativePath, @@ -140,12 +118,10 @@ export class UnrestrictedSyncer { public async unrestrictedSyncLocallyUpdatedFile({ oldPath, - getLatestDocument, - updateTime + getLatestDocument }: { oldPath?: RelativePath; getLatestDocument: () => DocumentRecord; - updateTime?: Date; }): Promise { let document = getLatestDocument(); @@ -178,19 +154,13 @@ export class UnrestrictedSyncer { return; } - updateTime ??= await this.operations.getModificationTime( - document.relativePath - ); // this can throw FileNotFoundError; - const response = await this.syncService.put({ - documentId: document.metadata.documentId, + documentId: document.documentId, parentVersionId: document.metadata.parentVersionId, relativePath: document.relativePath, - contentBytes, - createdDate: updateTime + contentBytes }); - // Update relativePath which is the only property that can change while this is running (due to a move) document = getLatestDocument(); if (document.isDeleted) { @@ -252,6 +222,11 @@ export class UnrestrictedSyncer { } if (response.relativePath != document.relativePath) { + // this.database.getNewResolvedDocumentByRelativePath( + // response.relativePath, + // promise + // ); + await this.operations.move( document.relativePath, response.relativePath, @@ -283,10 +258,7 @@ export class UnrestrictedSyncer { this.database.setDocument( { documentId: response.documentId, - relativePath: - response.relativePath != document.relativePath - ? response.relativePath - : document.relativePath, + relativePath: document.relativePath, parentVersionId: response.vaultUpdateId, hash: contentHash }, @@ -300,20 +272,19 @@ export class UnrestrictedSyncer { public async unrestrictedSyncRemotelyUpdatedFile( remoteVersion: components["schemas"]["DocumentVersionWithoutContent"], - getLatestDocument?: () => DocumentRecord + getLatestDocument: () => DocumentRecord | undefined ): Promise { await this.executeSync( [remoteVersion.relativePath], SyncType.UPDATE, SyncSource.PULL, async () => { - const localMetadata = - getLatestDocument?.() ?? - this.database.getDocumentByDocumentId( - remoteVersion.documentId - ); + let localMetadata = getLatestDocument(); - if (localMetadata?.metadata !== undefined) { + if ( + localMetadata !== undefined && + localMetadata?.metadata !== undefined + ) { // If the file exists locally, let's pretend the user has updated it // and deal with remote update/deletion within `unrestrictedSyncLocallyUpdatedFile` if ( @@ -329,7 +300,7 @@ export class UnrestrictedSyncer { return this.unrestrictedSyncLocallyUpdatedFile({ getLatestDocument: () => this.database.getDocumentByIdentity( - localMetadata.identity + localMetadata!.identity ) }); } else if (remoteVersion.isDeleted) { @@ -347,27 +318,26 @@ export class UnrestrictedSyncer { }) ).contentBase64; - const latestDocument = - getLatestDocument?.() ?? - this.database.getDocumentByDocumentId( - remoteVersion.documentId - ); + localMetadata = getLatestDocument(); - if (latestDocument?.isDeleted) { + if (localMetadata?.isDeleted === true) { this.logger.info( `Document ${remoteVersion.relativePath} has been deleted locally before we could finish updating it` ); return; } + if ( + localMetadata?.metadata?.parentVersionId ?? + -1 >= remoteVersion.vaultUpdateId + ) { + this.logger.debug( + `Document ${remoteVersion.relativePath} is already more up to date than the fetched version` + ); + return; + } const contentBytes = deserialize(content); - await this.operations.create( - remoteVersion.relativePath, - contentBytes, - remoteVersion.documentId - ); - this.database.setDocument( { documentId: remoteVersion.documentId, @@ -375,7 +345,13 @@ export class UnrestrictedSyncer { parentVersionId: remoteVersion.vaultUpdateId, hash: hash(contentBytes) }, - latestDocument?.identity + localMetadata?.identity + ); + + await this.operations.create( + remoteVersion.relativePath, + contentBytes, + remoteVersion.documentId ); this.history.addHistoryEntry({ @@ -390,19 +366,12 @@ export class UnrestrictedSyncer { } public async executeSync( - lockedPaths: RelativePath[], + paths: RelativePath[], syncType: SyncType, syncSource: SyncSource, fn: () => Promise ): Promise { - const relativePath = lockedPaths[lockedPaths.length - 1]; - - if (!this.settings.getSettings().isSyncEnabled) { - this.logger.info( - `Syncing is disabled, not syncing '${relativePath}'` - ); - return; - } + const relativePath = paths[paths.length - 1]; if (!this.operations.isFileEligibleForSync(relativePath)) { this.history.addHistoryEntry({ diff --git a/frontend/test-client/src/agent/mock-agent.ts b/frontend/test-client/src/agent/mock-agent.ts index 06c0f10e..94c106da 100644 --- a/frontend/test-client/src/agent/mock-agent.ts +++ b/frontend/test-client/src/agent/mock-agent.ts @@ -83,8 +83,6 @@ export class MockAgent extends MockClient { } public async act(): Promise { - this.assertAllContentIsPresentOnce(); - const options: (() => Promise)[] = [ this.createFileAction.bind(this), this.changeFetchChangesUpdateIntervalMsAction.bind(this) diff --git a/frontend/test-client/src/agent/mock-client.ts b/frontend/test-client/src/agent/mock-client.ts index ec077778..814d3fc1 100644 --- a/frontend/test-client/src/agent/mock-client.ts +++ b/frontend/test-client/src/agent/mock-client.ts @@ -1,3 +1,4 @@ +import { assert } from "../utils/assert"; import type { RelativePath, FileSystemOperations, @@ -81,6 +82,18 @@ export class MockClient implements FileSystemOperations { const newContentUint8Array = new TextEncoder().encode(newContent); this.localFiles.set(path, newContentUint8Array); + const existingPats = currentContent + .split(" ") + .map((part) => part.trim()); + const newParts = newContent.split(" ").map((part) => part.trim()); + existingPats.forEach((part) => + // all changes should be additive + assert( + newParts.includes(part), + `Part ${part} not found in new content` + ) + ); + this.client.logger.info( `Updated file ${path} with:\n current content: ${currentContent}\n new content: ${newContent}` ); diff --git a/frontend/test-client/src/cli.ts b/frontend/test-client/src/cli.ts index 27c59f75..e1a6fd5b 100644 --- a/frontend/test-client/src/cli.ts +++ b/frontend/test-client/src/cli.ts @@ -92,16 +92,16 @@ async function runTest({ async function runTests(): Promise { const agentCounts = [2, 8]; const jitterScaleInSeconds = [0.5, 0, 2]; - const concurrencies = [1]; + const concurrencies = [16, 1]; const iterations = [50, 200]; const doDeletes = [true, false]; - for (let i = 0; i < 10; i++) { - for (const agentCount of agentCounts) { - for (const concurrency of concurrencies) { - for (const jitter of jitterScaleInSeconds) { - for (const iteration of iterations) { - for (const deleteFiles of doDeletes) { + for (const agentCount of agentCounts) { + for (const concurrency of concurrencies) { + for (const jitter of jitterScaleInSeconds) { + for (const iteration of iterations) { + for (const deleteFiles of doDeletes) { + for (let i = 0; i < 10; i++) { await runTest({ agentCount, concurrency, @@ -110,6 +110,7 @@ async function runTests(): Promise { jitterScaleInSeconds: jitter }); } + return; } } }