diff --git a/plugin/src/plugin.ts b/plugin/src/plugin.ts index afd3f63..3b7be81 100644 --- a/plugin/src/plugin.ts +++ b/plugin/src/plugin.ts @@ -43,12 +43,12 @@ export default class SyncPlugin extends Plugin { const syncService = new SyncService(database); - const syncer = new Syncer({ + const syncer = new Syncer( database, - operations: this.operations, syncService, - history: this.history, - }); + this.operations, + this.history + ); const statusDescription = new StatusDescription( database, diff --git a/plugin/src/sync-operations/syncer.ts b/plugin/src/sync-operations/syncer.ts index 8d80438..efa42de 100644 --- a/plugin/src/sync-operations/syncer.ts +++ b/plugin/src/sync-operations/syncer.ts @@ -15,55 +15,31 @@ import { EMPTY_HASH, hash } from "src/utils/hash"; import type { components } from "src/services/types.js"; export class Syncer { - private readonly database: Database; - private readonly syncService: SyncService; - private readonly operations: FileOperations; - private readonly history: SyncHistory; - - private isRunningOfflineSync = false; - - // The offline sync methods call file sync methods, however, we can't preempt promises so we 2 queues to avoid a deadlock - private readonly offlineSyncQueue: PQueue; - private readonly fileSyncQueue: PQueue; - private readonly remainingOperationsListeners: (( remainingOperations: number ) => void)[] = []; - public constructor({ - database, - syncService, - operations, - history, - }: { - database: Database; - syncService: SyncService; - operations: FileOperations; - history: SyncHistory; - }) { - this.database = database; - this.syncService = syncService; - this.operations = operations; - this.history = history; + private readonly syncQueue: PQueue; - this.fileSyncQueue = new PQueue({ - concurrency: database.getSettings().syncConcurrency, - }); - this.offlineSyncQueue = new PQueue({ + private isRunningOfflineSync = false; + + public constructor( + private readonly database: Database, + private readonly syncService: SyncService, + private readonly operations: FileOperations, + private readonly history: SyncHistory + ) { + this.syncQueue = new PQueue({ concurrency: database.getSettings().syncConcurrency, }); database.addOnSettingsChangeHandlers((settings) => { - this.fileSyncQueue.concurrency = settings.syncConcurrency; - this.offlineSyncQueue.concurrency = settings.syncConcurrency; + this.syncQueue.concurrency = settings.syncConcurrency; }); - const updateRemainingOperations = () => - this.emitRemainingOperationsChange( - this.fileSyncQueue.size + this.offlineSyncQueue.size - ); - this.fileSyncQueue.on("active", updateRemainingOperations); - this.offlineSyncQueue.on("active", updateRemainingOperations); + this.syncQueue.on("active", () => { + this.emitRemainingOperationsChange(this.syncQueue.size); + }); } public addRemainingOperationsListener( @@ -76,8 +52,171 @@ export class Syncer { relativePath: RelativePath, updateTime: Date ): Promise { - await this.safelySync(async () => { - try { + await this.syncQueue.add(async () => + this.internalSyncLocallyCreatedFile(relativePath, updateTime) + ); + } + + public async syncLocallyUpdatedFile(args: { + oldPath?: RelativePath; + relativePath: RelativePath; + updateTime: Date; + }): Promise { + await this.syncQueue.add(async () => + this.internalSyncLocallyUpdatedFile(args) + ); + } + + public async syncLocallyDeletedFile( + relativePath: RelativePath + ): Promise { + await this.syncQueue.add(async () => + this.internalSyncLocallyDeletedFile(relativePath) + ); + } + + public async syncRemotelyUpdatedFile( + remoteVersion: components["schemas"]["DocumentVersionWithoutContent"] + ): Promise { + await this.syncQueue.add(async () => + this.internalSyncRemotelyUpdatedFile(remoteVersion) + ); + } + + public async scheduleSyncForOfflineChanges(): Promise { + if (this.isRunningOfflineSync) { + Logger.getInstance().warn( + "Uploading local changes is already in progress, skipping" + ); + return; + } + + if (!this.database.getSettings().isSyncEnabled) { + Logger.getInstance().debug( + `Syncing is disabled, not uploading local changes` + ); + return; + } + + this.isRunningOfflineSync = true; + + try { + const allLocalFiles = await this.operations.listAllFiles(); + const locallyDeletedFiles = [ + ...this.database.getDocuments().entries(), + ].filter(([path, _]) => !allLocalFiles.includes(path)); + + await Promise.all( + allLocalFiles.map(async (relativePath) => + this.syncQueue.add(async () => { + const metadata = + this.database.getDocument(relativePath); + + // If there's no metadata, it must be a new file + if (!metadata) { + // Perhaps the file has been moved. Let's check by looking at the deleted files + const originalFile = + await this.findMatchingFileBasedOnHash( + relativePath, + locallyDeletedFiles + ); + if (originalFile !== undefined) { + // `originalFile` hasn't been deleted but it got moved instead + locallyDeletedFiles.remove(originalFile); + + Logger.getInstance().debug( + `Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it` + ); + return this.internalSyncLocallyUpdatedFile({ + oldPath: originalFile[0], + relativePath: relativePath, + updateTime: + await this.operations.getModificationTime( + relativePath + ), + }); + } + + Logger.getInstance().debug( + `Document ${relativePath} not found in database, scheduling sync to create it` + ); + return this.internalSyncLocallyCreatedFile( + relativePath, + await this.operations.getModificationTime( + relativePath + ) + ); + } + + const content = await this.operations.read( + relativePath + ); + if (metadata.hash !== hash(content)) { + Logger.getInstance().debug( + `Document ${relativePath} has been updated locally, scheduling sync to update it` + ); + return this.internalSyncLocallyUpdatedFile({ + relativePath: relativePath, + updateTime: + await this.operations.getModificationTime( + relativePath + ), + }); + } + + this.history.addHistoryEntry({ + status: SyncStatus.NO_OP, + source: SyncSource.PUSH, + relativePath, + message: + "Document hasn't been updated locally, no need to sync", + }); + return Promise.resolve(); + }) + ) + ); + + await Promise.all( + locallyDeletedFiles.map(async ([relativePath, _]) => { + Logger.getInstance().debug( + `Document ${relativePath} has been deleted locally, scheduling sync to delete it` + ); + + return this.internalSyncLocallyDeletedFile(relativePath); + }) + ); + + Logger.getInstance().info( + `All local changes have been applied remotely` + ); + } catch (e) { + Logger.getInstance().error( + `Not all local changes have been applied remotely: ${e}` + ); + } finally { + this.isRunningOfflineSync = false; + } + } + + public async reset(): Promise { + this.syncQueue.clear(); + await this.syncQueue.onEmpty(); + await this.database.resetSyncState(); + this.history.reset(); + this.remainingOperationsListeners.forEach((listener) => { + listener(0); + }); + } + + private async internalSyncLocallyCreatedFile( + relativePath: RelativePath, + updateTime: Date + ): Promise { + await this.executeWhileHoldingFileLock( + relativePath, + SyncType.CREATE, + SyncSource.PUSH, + async () => { const contentBytes = await this.operations.read(relativePath); const contentHash = hash(contentBytes); @@ -138,19 +277,11 @@ export class Syncer { }); await this.tryIncrementVaultUpdateId(response.vaultUpdateId); - } catch (e) { - this.history.addHistoryEntry({ - status: SyncStatus.ERROR, - relativePath, - message: `Failed to reconcile locally created file: ${e}`, - type: SyncType.CREATE, - }); - throw e; } - }, relativePath); + ); } - public async syncLocallyUpdatedFile({ + private async internalSyncLocallyUpdatedFile({ oldPath, relativePath, updateTime, @@ -159,8 +290,11 @@ export class Syncer { relativePath: RelativePath; updateTime: Date; }): Promise { - await this.safelySync(async () => { - try { + await this.executeWhileHoldingFileLock( + relativePath, + SyncType.UPDATE, + SyncSource.PUSH, + async () => { const metadata = this.database.getDocument( oldPath ?? relativePath ); @@ -262,23 +396,18 @@ export class Syncer { }); await this.tryIncrementVaultUpdateId(response.vaultUpdateId); - } catch (e) { - this.history.addHistoryEntry({ - status: SyncStatus.ERROR, - relativePath, - message: `Failed to reconcile locally updated file: ${e}`, - type: SyncType.UPDATE, - }); - throw e; } - }, relativePath); + ); } - public async syncLocallyDeletedFile( + private async internalSyncLocallyDeletedFile( relativePath: RelativePath ): Promise { - await this.safelySync(async () => { - try { + await this.executeWhileHoldingFileLock( + relativePath, + SyncType.DELETE, + SyncSource.PUSH, + async () => { const metadata = this.database.getDocument(relativePath); if (!metadata) { this.history.addHistoryEntry({ @@ -305,138 +434,18 @@ export class Syncer { }); await this.database.removeDocument(relativePath); - } catch (e) { - this.history.addHistoryEntry({ - status: SyncStatus.ERROR, - relativePath, - message: `Failed to remotely delete locally deleted file: ${e}`, - type: SyncType.DELETE, - }); - throw e; } - }, relativePath); + ); } - public async scheduleSyncForOfflineChanges(): Promise { - if (this.isRunningOfflineSync) { - Logger.getInstance().warn( - "Uploading local changes is already in progress, skipping" - ); - return; - } - - if (!this.database.getSettings().isSyncEnabled) { - Logger.getInstance().debug( - `Syncing is disabled, not uploading local changes` - ); - return; - } - - this.isRunningOfflineSync = true; - - try { - const allLocalFiles = await this.operations.listAllFiles(); - const locallyDeletedFiles = [ - ...this.database.getDocuments().entries(), - ].filter(([path, _]) => !allLocalFiles.includes(path)); - - await Promise.all( - allLocalFiles.map(async (relativePath) => - this.offlineSyncQueue.add(async () => { - const metadata = - this.database.getDocument(relativePath); - - // If there's no metadata, it must be a new file - if (!metadata) { - // Perhaps the file has been moved. Let's check by looking at the deleted files - const originalFile = - await this.findMatchingFileBasedOnHash( - relativePath, - locallyDeletedFiles - ); - if (originalFile !== undefined) { - // `originalFile` hasn't been deleted but it got moved instead - locallyDeletedFiles.remove(originalFile); - - Logger.getInstance().debug( - `Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it` - ); - return this.syncLocallyUpdatedFile({ - oldPath: originalFile[0], - relativePath: relativePath, - updateTime: - await this.operations.getModificationTime( - relativePath - ), - }); - } - - Logger.getInstance().debug( - `Document ${relativePath} not found in database, scheduling sync to create it` - ); - return this.syncLocallyCreatedFile( - relativePath, - await this.operations.getModificationTime( - relativePath - ) - ); - } - - const content = await this.operations.read( - relativePath - ); - if (metadata.hash !== hash(content)) { - Logger.getInstance().debug( - `Document ${relativePath} has been updated locally, scheduling sync to update it` - ); - return this.syncLocallyUpdatedFile({ - relativePath: relativePath, - updateTime: - await this.operations.getModificationTime( - relativePath - ), - }); - } - - this.history.addHistoryEntry({ - status: SyncStatus.NO_OP, - source: SyncSource.PUSH, - relativePath, - message: - "Document hasn't been updated locally, no need to sync", - }); - return Promise.resolve(); - }) - ) - ); - - await Promise.all( - locallyDeletedFiles.map(async ([relativePath, _]) => { - Logger.getInstance().debug( - `Document ${relativePath} has been deleted locally, scheduling sync to delete it` - ); - - return this.syncLocallyDeletedFile(relativePath); - }) - ); - - Logger.getInstance().info( - `All local changes have been applied remotely` - ); - } catch (e) { - Logger.getInstance().error( - `Not all local changes have been applied remotely: ${e}` - ); - } finally { - this.isRunningOfflineSync = false; - } - } - - public async syncRemotelyUpdatedFile( + private async internalSyncRemotelyUpdatedFile( remoteVersion: components["schemas"]["DocumentVersionWithoutContent"] ): Promise { - await this.safelySync(async () => { - try { + await this.executeWhileHoldingFileLock( + remoteVersion.relativePath, + SyncType.UPDATE, + SyncSource.PULL, + async () => { const currentVersion = this.database.getDocumentByDocumentId( remoteVersion.documentId ); @@ -559,31 +568,15 @@ export class Syncer { unlockDocument(relativePath); } } - } catch (e) { - this.history.addHistoryEntry({ - status: SyncStatus.ERROR, - source: SyncSource.PULL, - relativePath: remoteVersion.relativePath, - message: `Failed to reconcile remotely updated file: ${e}`, - }); - throw e; } - }, remoteVersion.relativePath); + ); } - public async reset(): Promise { - this.fileSyncQueue.clear(); - await this.fileSyncQueue.onEmpty(); - await this.database.resetSyncState(); - this.history.reset(); - this.remainingOperationsListeners.forEach((listener) => { - listener(0); - }); - } - - private async safelySync( - fn: () => Promise, - relativePath: RelativePath + private async executeWhileHoldingFileLock( + relativePath: RelativePath, + syncType: SyncType, + syncSource: SyncSource, + fn: () => Promise ): Promise { if (!this.database.getSettings().isSyncEnabled) { Logger.getInstance().info( @@ -595,7 +588,16 @@ export class Syncer { await waitForDocumentLock(relativePath); try { - await this.fileSyncQueue.add(fn); + await fn(); + } catch (e) { + this.history.addHistoryEntry({ + status: SyncStatus.ERROR, + relativePath, + message: `Failed to ${syncSource.toLocaleLowerCase()} file ${e} when trying to ${syncType.toLocaleLowerCase()} it`, + type: syncType, + source: syncSource, + }); + throw e; } finally { unlockDocument(relativePath); }