Refactor and improve Syncer API

This commit is contained in:
Andras Schmelczer 2025-02-22 13:03:11 +00:00
parent b0192aae23
commit 8b07507090
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
2 changed files with 165 additions and 165 deletions

View file

@ -1,64 +0,0 @@
import type { Database } from "../persistence/database";
import type { SyncService } from "src/services/sync-service";
import { Logger } from "src/tracing/logger";
import type { Syncer } from "./syncer";
import type { Settings } from "src/persistence/settings";
let isRunning = false;
export async function applyRemoteChangesLocally({
settings,
database,
syncService,
syncer
}: {
settings: Settings;
database: Database;
syncService: SyncService;
syncer: Syncer;
}): Promise<void> {
if (!settings.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not fetching remote changes`
);
return;
} else if (isRunning) {
Logger.getInstance().debug(
"Applying remote changes locally is already in progress, skipping invocation"
);
return;
}
isRunning = true;
try {
const remote = await syncService.getAll(database.getLastSeenUpdateId());
if (remote.latestDocuments.length === 0) {
Logger.getInstance().debug("No remote changes to apply");
return;
}
Logger.getInstance().info("Applying remote changes locally");
await Promise.all(
remote.latestDocuments.map(async (remoteDocument) =>
syncer.syncRemotelyUpdatedFile(remoteDocument)
)
);
const lastSeenUpdateId = database.getLastSeenUpdateId();
if (
lastSeenUpdateId === undefined ||
remote.lastUpdateId > lastSeenUpdateId
) {
await database.setLastSeenUpdateId(remote.lastUpdateId);
}
} catch (e) {
Logger.getInstance().error(
`Failed to apply remote changes locally: ${e}`
);
} finally {
isRunning = false;
}
}

View file

@ -4,7 +4,6 @@ import type {
RelativePath
} from "../persistence/database";
import type { FileOperations } from "src/file-operations";
import type { SyncService } from "src/services/sync-service";
import { Logger } from "src/tracing/logger";
import type { SyncHistory } from "src/tracing/sync-history";
@ -15,6 +14,7 @@ import { EMPTY_HASH, hash } from "src/utils/hash";
import type { components } from "src/services/types";
import { deserialize } from "src/utils/deserialize";
import type { Settings } from "src/persistence/settings";
import { FileOperations } from "src/file-operations/file-operations";
export class Syncer {
private readonly remainingOperationsListeners: ((
@ -23,7 +23,10 @@ export class Syncer {
private readonly syncQueue: PQueue;
private isRunningOfflineSync = false;
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined =
undefined;
private runningApplyRemoteChangesLocally: Promise<void> | undefined =
undefined;
public constructor(
private readonly database: Database,
@ -78,7 +81,7 @@ export class Syncer {
);
}
public async syncRemotelyUpdatedFile(
private async syncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
await this.syncQueue.add(async () =>
@ -87,13 +90,6 @@ export class Syncer {
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
if (this.isRunningOfflineSync) {
Logger.getInstance().warn(
"Uploading local changes is already in progress, skipping"
);
return;
}
if (!this.settings.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not uploading local changes`
@ -101,98 +97,17 @@ export class Syncer {
return;
}
this.isRunningOfflineSync = true;
if (this.runningScheduleSyncForOfflineChanges != null) {
Logger.getInstance().debug(
"Uploading local changes is already in progress"
);
return this.runningScheduleSyncForOfflineChanges;
}
try {
const allLocalFiles = await this.operations.listAllFiles();
let locallyDeletedFiles = [
...this.database.getDocuments().entries()
].filter(([path, _]) => !allLocalFiles.includes(path));
await Promise.all(
allLocalFiles.map(async (relativePath) =>
this.syncQueue.add(async () => {
const metadata =
this.database.getDocument(relativePath);
// If there's no metadata, it must be a new file
if (!metadata) {
// Perhaps the file has been moved. Let's check by looking at the deleted files
const contentBytes =
await this.operations.read(relativePath);
const contentHash = hash(contentBytes);
const originalFile =
await this.findMatchingFileBasedOnHash(
contentHash,
locallyDeletedFiles
);
if (originalFile !== undefined) {
// `originalFile` hasn't been deleted but it got moved instead
locallyDeletedFiles =
locallyDeletedFiles.filter(
(item) => item != originalFile
);
Logger.getInstance().debug(
`Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it`
);
return this.internalSyncLocallyUpdatedFile({
oldPath: originalFile[0],
relativePath: relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
),
optimisations: {
contentBytes,
contentHash
}
});
}
Logger.getInstance().debug(
`Document ${relativePath} not found in database, scheduling sync to create it`
);
return this.internalSyncLocallyCreatedFile(
relativePath,
await this.operations.getModificationTime(
relativePath
)
);
}
Logger.getInstance().debug(
`Document ${relativePath} has been updated locally, scheduling sync to update it`
);
return this.internalSyncLocallyUpdatedFile({
relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
)
});
})
)
);
await Promise.all(
locallyDeletedFiles.map(async ([relativePath, _]) => {
Logger.getInstance().debug(
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
);
if (await this.operations.exists(relativePath)) {
Logger.getInstance().debug(
`Document ${relativePath} actually exists locally, skipping`
);
return Promise.resolve();
}
return this.internalSyncLocallyDeletedFile(relativePath);
})
);
this.runningScheduleSyncForOfflineChanges =
this.internalScheduleSyncForOfflineChanges();
await this.runningScheduleSyncForOfflineChanges;
Logger.getInstance().info(
`All local changes have been applied remotely`
);
@ -200,8 +115,157 @@ export class Syncer {
Logger.getInstance().error(
`Not all local changes have been applied remotely: ${e}`
);
throw e;
} finally {
this.isRunningOfflineSync = false;
this.runningScheduleSyncForOfflineChanges = undefined;
}
}
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
const allLocalFiles = await this.operations.listAllFiles();
let locallyDeletedFiles = [
...this.database.getDocuments().entries()
].filter(([path, _]) => !allLocalFiles.includes(path));
await Promise.all(
allLocalFiles.map(async (relativePath) =>
this.syncQueue.add(async () => {
const metadata = this.database.getDocument(relativePath);
// If there's no metadata, it must be a new file
if (!metadata) {
// Perhaps the file has been moved. Let's check by looking at the deleted files
const contentBytes =
await this.operations.read(relativePath);
const contentHash = hash(contentBytes);
const originalFile =
await this.findMatchingFileBasedOnHash(
contentHash,
locallyDeletedFiles
);
if (originalFile !== undefined) {
// `originalFile` hasn't been deleted but it got moved instead
locallyDeletedFiles = locallyDeletedFiles.filter(
(item) => item != originalFile
);
Logger.getInstance().debug(
`Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it`
);
return this.internalSyncLocallyUpdatedFile({
oldPath: originalFile[0],
relativePath: relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
),
optimisations: {
contentBytes,
contentHash
}
});
}
Logger.getInstance().debug(
`Document ${relativePath} not found in database, scheduling sync to create it`
);
return this.internalSyncLocallyCreatedFile(
relativePath,
await this.operations.getModificationTime(
relativePath
)
);
}
Logger.getInstance().debug(
`Document ${relativePath} has been updated locally, scheduling sync to update it`
);
return this.internalSyncLocallyUpdatedFile({
relativePath,
updateTime:
await this.operations.getModificationTime(
relativePath
)
});
})
)
);
await Promise.all(
locallyDeletedFiles.map(async ([relativePath, _]) => {
Logger.getInstance().debug(
`Document ${relativePath} has been deleted locally, scheduling sync to delete it`
);
if (await this.operations.exists(relativePath)) {
Logger.getInstance().debug(
`Document ${relativePath} actually exists locally, skipping`
);
return Promise.resolve();
}
return this.internalSyncLocallyDeletedFile(relativePath);
})
);
}
public async applyRemoteChangesLocally(): Promise<void> {
if (!this.settings.getSettings().isSyncEnabled) {
Logger.getInstance().debug(
`Syncing is disabled, not fetching remote changes`
);
return;
}
if (this.runningApplyRemoteChangesLocally != null) {
Logger.getInstance().debug(
"Applying remote changes locally is already in progress"
);
return this.runningApplyRemoteChangesLocally;
}
try {
this.runningApplyRemoteChangesLocally =
this.internalApplyRemoteChangesLocally();
await this.runningApplyRemoteChangesLocally;
Logger.getInstance().info(
"All remote changes have been applied locally"
);
} catch (e) {
Logger.getInstance().error(
`Failed to apply remote changes locally: ${e}`
);
throw e;
} finally {
this.runningApplyRemoteChangesLocally = undefined;
}
}
private async internalApplyRemoteChangesLocally(): Promise<void> {
const remote = await this.syncService.getAll(
this.database.getLastSeenUpdateId()
);
if (remote.latestDocuments.length === 0) {
Logger.getInstance().debug("No remote changes to apply");
return;
}
Logger.getInstance().info("Applying remote changes locally");
await Promise.all(
remote.latestDocuments.map(async (remoteDocument) =>
this.syncRemotelyUpdatedFile(remoteDocument)
)
);
const lastSeenUpdateId = this.database.getLastSeenUpdateId();
if (
lastSeenUpdateId === undefined ||
remote.lastUpdateId > lastSeenUpdateId
) {
await this.database.setLastSeenUpdateId(remote.lastUpdateId);
}
}