Refactor syncer and add internal, non-concurrency limited methods

This commit is contained in:
Andras Schmelczer 2025-01-03 14:45:26 +00:00
parent c733448a02
commit 5f5bdf75ea
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
2 changed files with 220 additions and 218 deletions

View file

@ -15,55 +15,31 @@ import { EMPTY_HASH, hash } from "src/utils/hash";
import type { components } from "src/services/types.js";
export class Syncer {
private readonly database: Database;
private readonly syncService: SyncService;
private readonly operations: FileOperations;
private readonly history: SyncHistory;
private isRunningOfflineSync = false;
// The offline sync methods call file sync methods, however, we can't preempt promises so we 2 queues to avoid a deadlock
private readonly offlineSyncQueue: PQueue;
private readonly fileSyncQueue: PQueue;
private readonly remainingOperationsListeners: ((
remainingOperations: number
) => void)[] = [];
public constructor({
database,
syncService,
operations,
history,
}: {
database: Database;
syncService: SyncService;
operations: FileOperations;
history: SyncHistory;
}) {
this.database = database;
this.syncService = syncService;
this.operations = operations;
this.history = history;
private readonly syncQueue: PQueue;
this.fileSyncQueue = new PQueue({
concurrency: database.getSettings().syncConcurrency,
});
this.offlineSyncQueue = new PQueue({
private isRunningOfflineSync = false;
public constructor(
private readonly database: Database,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly history: SyncHistory
) {
this.syncQueue = new PQueue({
concurrency: database.getSettings().syncConcurrency,
});
database.addOnSettingsChangeHandlers((settings) => {
this.fileSyncQueue.concurrency = settings.syncConcurrency;
this.offlineSyncQueue.concurrency = settings.syncConcurrency;
this.syncQueue.concurrency = settings.syncConcurrency;
});
const updateRemainingOperations = () =>
this.emitRemainingOperationsChange(
this.fileSyncQueue.size + this.offlineSyncQueue.size
);
this.fileSyncQueue.on("active", updateRemainingOperations);
this.offlineSyncQueue.on("active", updateRemainingOperations);
this.syncQueue.on("active", () => {
this.emitRemainingOperationsChange(this.syncQueue.size);
});
}
public addRemainingOperationsListener(
@ -76,8 +52,171 @@ export class Syncer {
relativePath: RelativePath,
updateTime: Date
): Promise<void> {
await this.safelySync(async () => {
try {
await this.syncQueue.add(async () =>
this.internalSyncLocallyCreatedFile(relativePath, updateTime)
);
}
public async syncLocallyUpdatedFile(args: {
oldPath?: RelativePath;
relativePath: RelativePath;
updateTime: Date;
}): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncLocallyUpdatedFile(args)
);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncLocallyDeletedFile(relativePath)
);
}
public async syncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
await this.syncQueue.add(async () =>
this.internalSyncRemotelyUpdatedFile(remoteVersion)
);
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
if (this.isRunningOfflineSync) {
Logger.getInstance().warn(
"Uploading local changes is already in progress, skipping"
);
return;
}
if (!this.database.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not uploading local changes`
);
return;
}
this.isRunningOfflineSync = true;
try {
const allLocalFiles = await this.operations.listAllFiles();
const locallyDeletedFiles = [
...this.database.getDocuments().entries(),
].filter(([path, _]) => !allLocalFiles.includes(path));
await Promise.all(
allLocalFiles.map(async (relativePath) =>
this.syncQueue.add(async () => {
const metadata =
this.database.getDocument(relativePath);
// If there's no metadata, it must be a new file
if (!metadata) {
// Perhaps the file has been moved. Let's check by looking at the deleted files
const originalFile =
await this.findMatchingFileBasedOnHash(
relativePath,
locallyDeletedFiles
);
if (originalFile !== undefined) {
// `originalFile` hasn't been deleted but it got moved instead
locallyDeletedFiles.remove(originalFile);
Logger.getInstance().debug(
`Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it`
);
return this.internalSyncLocallyUpdatedFile({
oldPath: originalFile[0],
relativePath: relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
),
});
}
Logger.getInstance().debug(
`Document ${relativePath} not found in database, scheduling sync to create it`
);
return this.internalSyncLocallyCreatedFile(
relativePath,
await this.operations.getModificationTime(
relativePath
)
);
}
const content = await this.operations.read(
relativePath
);
if (metadata.hash !== hash(content)) {
Logger.getInstance().debug(
`Document ${relativePath} has been updated locally, scheduling sync to update it`
);
return this.internalSyncLocallyUpdatedFile({
relativePath: relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
),
});
}
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
source: SyncSource.PUSH,
relativePath,
message:
"Document hasn't been updated locally, no need to sync",
});
return Promise.resolve();
})
)
);
await Promise.all(
locallyDeletedFiles.map(async ([relativePath, _]) => {
Logger.getInstance().debug(
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
);
return this.internalSyncLocallyDeletedFile(relativePath);
})
);
Logger.getInstance().info(
`All local changes have been applied remotely`
);
} catch (e) {
Logger.getInstance().error(
`Not all local changes have been applied remotely: ${e}`
);
} finally {
this.isRunningOfflineSync = false;
}
}
public async reset(): Promise<void> {
this.syncQueue.clear();
await this.syncQueue.onEmpty();
await this.database.resetSyncState();
this.history.reset();
this.remainingOperationsListeners.forEach((listener) => {
listener(0);
});
}
private async internalSyncLocallyCreatedFile(
relativePath: RelativePath,
updateTime: Date
): Promise<void> {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.CREATE,
SyncSource.PUSH,
async () => {
const contentBytes = await this.operations.read(relativePath);
const contentHash = hash(contentBytes);
@ -138,19 +277,11 @@ export class Syncer {
});
await this.tryIncrementVaultUpdateId(response.vaultUpdateId);
} catch (e) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to reconcile locally created file: ${e}`,
type: SyncType.CREATE,
});
throw e;
}
}, relativePath);
);
}
public async syncLocallyUpdatedFile({
private async internalSyncLocallyUpdatedFile({
oldPath,
relativePath,
updateTime,
@ -159,8 +290,11 @@ export class Syncer {
relativePath: RelativePath;
updateTime: Date;
}): Promise<void> {
await this.safelySync(async () => {
try {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.UPDATE,
SyncSource.PUSH,
async () => {
const metadata = this.database.getDocument(
oldPath ?? relativePath
);
@ -262,23 +396,18 @@ export class Syncer {
});
await this.tryIncrementVaultUpdateId(response.vaultUpdateId);
} catch (e) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to reconcile locally updated file: ${e}`,
type: SyncType.UPDATE,
});
throw e;
}
}, relativePath);
);
}
public async syncLocallyDeletedFile(
private async internalSyncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
await this.safelySync(async () => {
try {
await this.executeWhileHoldingFileLock(
relativePath,
SyncType.DELETE,
SyncSource.PUSH,
async () => {
const metadata = this.database.getDocument(relativePath);
if (!metadata) {
this.history.addHistoryEntry({
@ -305,138 +434,18 @@ export class Syncer {
});
await this.database.removeDocument(relativePath);
} catch (e) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to remotely delete locally deleted file: ${e}`,
type: SyncType.DELETE,
});
throw e;
}
}, relativePath);
);
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
if (this.isRunningOfflineSync) {
Logger.getInstance().warn(
"Uploading local changes is already in progress, skipping"
);
return;
}
if (!this.database.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not uploading local changes`
);
return;
}
this.isRunningOfflineSync = true;
try {
const allLocalFiles = await this.operations.listAllFiles();
const locallyDeletedFiles = [
...this.database.getDocuments().entries(),
].filter(([path, _]) => !allLocalFiles.includes(path));
await Promise.all(
allLocalFiles.map(async (relativePath) =>
this.offlineSyncQueue.add(async () => {
const metadata =
this.database.getDocument(relativePath);
// If there's no metadata, it must be a new file
if (!metadata) {
// Perhaps the file has been moved. Let's check by looking at the deleted files
const originalFile =
await this.findMatchingFileBasedOnHash(
relativePath,
locallyDeletedFiles
);
if (originalFile !== undefined) {
// `originalFile` hasn't been deleted but it got moved instead
locallyDeletedFiles.remove(originalFile);
Logger.getInstance().debug(
`Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it`
);
return this.syncLocallyUpdatedFile({
oldPath: originalFile[0],
relativePath: relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
),
});
}
Logger.getInstance().debug(
`Document ${relativePath} not found in database, scheduling sync to create it`
);
return this.syncLocallyCreatedFile(
relativePath,
await this.operations.getModificationTime(
relativePath
)
);
}
const content = await this.operations.read(
relativePath
);
if (metadata.hash !== hash(content)) {
Logger.getInstance().debug(
`Document ${relativePath} has been updated locally, scheduling sync to update it`
);
return this.syncLocallyUpdatedFile({
relativePath: relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
),
});
}
this.history.addHistoryEntry({
status: SyncStatus.NO_OP,
source: SyncSource.PUSH,
relativePath,
message:
"Document hasn't been updated locally, no need to sync",
});
return Promise.resolve();
})
)
);
await Promise.all(
locallyDeletedFiles.map(async ([relativePath, _]) => {
Logger.getInstance().debug(
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
);
return this.syncLocallyDeletedFile(relativePath);
})
);
Logger.getInstance().info(
`All local changes have been applied remotely`
);
} catch (e) {
Logger.getInstance().error(
`Not all local changes have been applied remotely: ${e}`
);
} finally {
this.isRunningOfflineSync = false;
}
}
public async syncRemotelyUpdatedFile(
private async internalSyncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
await this.safelySync(async () => {
try {
await this.executeWhileHoldingFileLock(
remoteVersion.relativePath,
SyncType.UPDATE,
SyncSource.PULL,
async () => {
const currentVersion = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);
@ -559,31 +568,15 @@ export class Syncer {
unlockDocument(relativePath);
}
}
} catch (e) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
source: SyncSource.PULL,
relativePath: remoteVersion.relativePath,
message: `Failed to reconcile remotely updated file: ${e}`,
});
throw e;
}
}, remoteVersion.relativePath);
);
}
public async reset(): Promise<void> {
this.fileSyncQueue.clear();
await this.fileSyncQueue.onEmpty();
await this.database.resetSyncState();
this.history.reset();
this.remainingOperationsListeners.forEach((listener) => {
listener(0);
});
}
private async safelySync(
fn: () => Promise<void>,
relativePath: RelativePath
private async executeWhileHoldingFileLock(
relativePath: RelativePath,
syncType: SyncType,
syncSource: SyncSource,
fn: () => Promise<void>
): Promise<void> {
if (!this.database.getSettings().isSyncEnabled) {
Logger.getInstance().info(
@ -595,7 +588,16 @@ export class Syncer {
await waitForDocumentLock(relativePath);
try {
await this.fileSyncQueue.add(fn);
await fn();
} catch (e) {
this.history.addHistoryEntry({
status: SyncStatus.ERROR,
relativePath,
message: `Failed to ${syncSource.toLocaleLowerCase()} file ${e} when trying to ${syncType.toLocaleLowerCase()} it`,
type: syncType,
source: syncSource,
});
throw e;
} finally {
unlockDocument(relativePath);
}