Use websocket on the FE

This commit is contained in:
Andras Schmelczer 2025-03-25 22:26:08 +00:00
parent 48ca6e7f7e
commit 0320308f1a
No known key found for this signature in database
GPG key ID: FC8F2C3D3D1A718C
2 changed files with 81 additions and 66 deletions

View file

@ -152,7 +152,12 @@ export class SyncClient {
await this.syncer.scheduleSyncForOfflineChanges();
}
public stop(): void {
this.syncer.stop();
}
public async waitAndStop(): Promise<void> {
this.stop();
await this.syncer.waitUntilFinished();
}
@ -160,6 +165,7 @@ export class SyncClient {
/// and the local database but retain the settings.
/// The SyncClient can be used again after calling this method.
public async reset(): Promise<void> {
this.stop();
this.connectionStatus.startReset();
await this.syncer.reset();
this.history.reset();

View file

@ -5,7 +5,7 @@ import PQueue from "p-queue";
import { hash } from "../utils/hash";
import { v4 as uuidv4 } from "uuid";
import type { components } from "../services/types";
import type { Settings } from "../persistence/settings";
import type { Settings, SyncSettings } from "../persistence/settings";
import type { FileOperations } from "../file-operations/file-operations";
import { findMatchingFile } from "../utils/find-matching-file";
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
@ -19,12 +19,15 @@ export class Syncer {
private readonly syncQueue: PQueue;
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
private runningApplyRemoteChangesLocally: Promise<void> | undefined;
private refreshApplyRemoteChangesWebSocketInterval:
| NodeJS.Timeout
| undefined;
private applyRemoteChangesWebSocket: WebSocket | undefined;
public constructor(
private readonly logger: Logger,
private readonly database: Database,
settings: Settings,
private readonly settings: Settings,
private readonly syncService: SyncService,
private readonly operations: FileOperations,
private readonly internalSyncer: UnrestrictedSyncer
@ -33,11 +36,21 @@ export class Syncer {
concurrency: settings.getSettings().syncConcurrency
});
this.updateWebSocket(settings.getSettings());
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
if (newSettings.syncConcurrency === oldSettings.syncConcurrency) {
return;
if (
newSettings.remoteUri !== oldSettings.remoteUri ||
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.token !== oldSettings.token ||
newSettings.isSyncEnabled !== oldSettings.isSyncEnabled
) {
this.updateWebSocket(newSettings);
}
if (newSettings.syncConcurrency !== oldSettings.syncConcurrency) {
this.syncQueue.concurrency = newSettings.syncConcurrency;
}
this.syncQueue.concurrency = newSettings.syncConcurrency;
});
this.syncQueue.on("active", () => {
@ -45,6 +58,20 @@ export class Syncer {
listener(this.syncQueue.size);
});
});
this.setWebSocketRefreshInterval();
}
public async reset(): Promise<void> {
await this.waitUntilFinished();
this.internalSyncer.reset();
this.setWebSocketRefreshInterval();
this.updateWebSocket(this.settings.getSettings());
}
public stop(): void {
clearInterval(this.refreshApplyRemoteChangesWebSocketInterval);
this.applyRemoteChangesWebSocket?.close();
}
public addRemainingOperationsListener(
@ -206,78 +233,60 @@ export class Syncer {
}
}
public async applyRemoteChangesLocally(): Promise<void> {
if (this.runningApplyRemoteChangesLocally !== undefined) {
this.logger.debug(
"Applying remote changes locally is already in progress"
);
return this.runningApplyRemoteChangesLocally;
}
try {
this.runningApplyRemoteChangesLocally =
this.internalApplyRemoteChangesLocally();
await this.runningApplyRemoteChangesLocally;
this.logger.info("All remote changes have been applied locally");
} catch (e) {
if (e instanceof SyncResetError) {
this.logger.info(
"Failed to apply remote changes locally due to a reset"
);
return;
}
this.logger.error(`Failed to apply remote changes locally: ${e}`);
throw e;
} finally {
this.runningApplyRemoteChangesLocally = undefined;
}
}
public async reset(): Promise<void> {
await this.waitUntilFinished();
this.internalSyncer.reset();
}
public async waitUntilFinished(): Promise<void> {
await Promise.allSettled([
this.runningScheduleSyncForOfflineChanges,
this.runningApplyRemoteChangesLocally
]);
await this.runningScheduleSyncForOfflineChanges;
return this.syncQueue.onEmpty();
}
private async internalApplyRemoteChangesLocally(): Promise<void> {
const remote = await this.syncQueue.add(async () =>
this.syncService.getAll(this.database.getLastSeenUpdateId())
);
private updateWebSocket(settings: SyncSettings): void {
this.applyRemoteChangesWebSocket?.close();
if (!remote) {
throw new Error("Failed to fetch remote changes");
}
if (remote.latestDocuments.length === 0) {
this.logger.debug("No remote changes to apply");
if (!settings.isSyncEnabled) {
this.applyRemoteChangesWebSocket = undefined;
return;
}
this.logger.info("Applying remote changes locally");
const wsUri = new URL(settings.remoteUri);
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
wsUri.pathname = `/vaults/${settings.vaultName}/ws`;
this.applyRemoteChangesWebSocket = new WebSocket(wsUri);
await Promise.all(
remote.latestDocuments.map(this.syncRemotelyUpdatedFile.bind(this))
);
this.applyRemoteChangesWebSocket.onmessage = (event): void =>
void this.syncRemotelyUpdatedFile(event.data).catch(
(e: unknown) => {
this.logger.error(
`Failed to sync remotely updated file: ${e}`
);
}
);
const lastSeenUpdateId = this.database.getLastSeenUpdateId();
if (
lastSeenUpdateId === undefined ||
lastSeenUpdateId < remote.lastUpdateId
) {
this.database.setLastSeenUpdateId(remote.lastUpdateId);
}
this.applyRemoteChangesWebSocket.onerror = (event): void => {
console.error(event);
this.logger.error(`WebSocket error`);
};
// The JS WebSocket API doesn't support setting headers, so we have to send the token as a message
this.applyRemoteChangesWebSocket.onopen = (): void =>
this.applyRemoteChangesWebSocket?.send(settings.token);
}
private async syncRemotelyUpdatedFile(
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
): Promise<void> {
private setWebSocketRefreshInterval(): void {
this.refreshApplyRemoteChangesWebSocketInterval = setInterval(() => {
if (
this.applyRemoteChangesWebSocket?.readyState === WebSocket.OPEN
) {
return;
}
this.updateWebSocket(this.settings.getSettings());
}, 5000);
}
private async syncRemotelyUpdatedFile(message: string): Promise<void> {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const remoteVersion = JSON.parse(
message
) as components["schemas"]["DocumentVersionWithoutContent"];
let document = this.database.getDocumentByDocumentId(
remoteVersion.documentId
);