Fix reset logic for WS
This commit is contained in:
parent
9f1f4beae4
commit
cb2a1c0df1
1 changed files with 95 additions and 82 deletions
|
|
@ -1,31 +1,38 @@
|
|||
import type { Database } from "../persistence/database";
|
||||
import type { Logger } from "../tracing/logger";
|
||||
import type { Settings, SyncSettings } from "../persistence/settings";
|
||||
import type { Settings } from "../persistence/settings";
|
||||
import type { WebSocketServerMessage } from "./types/WebSocketServerMessage";
|
||||
import type { Syncer } from "../sync-operations/syncer";
|
||||
import type { WebSocketClientMessage } from "./types/WebSocketClientMessage";
|
||||
import type { CursorPositionFromClient } from "./types/CursorPositionFromClient";
|
||||
import type { ClientCursors } from "./types/ClientCursors";
|
||||
import { createPromise } from "../utils/create-promise";
|
||||
import { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
|
||||
|
||||
export class WebSocketManager {
|
||||
private readonly webSocketStatusChangeListeners: (() => unknown)[] = [];
|
||||
private readonly webSocketStatusChangeListeners: ((
|
||||
isConnected: boolean
|
||||
) => unknown)[] = [];
|
||||
|
||||
private readonly remoteVaultUpdateListeners: ((
|
||||
update: WebSocketVaultUpdate
|
||||
) => Promise<void>)[] = [];
|
||||
|
||||
private readonly remoteCursorsUpdateListeners: ((
|
||||
cursors: ClientCursors[]
|
||||
) => unknown)[] = [];
|
||||
) => Promise<void>)[] = [];
|
||||
|
||||
private webSocket: WebSocket | undefined;
|
||||
|
||||
private isStopped = true;
|
||||
private _isFirstSyncCompleted = false;
|
||||
private resolveDisconnectingPromise: null | (() => unknown) = null;
|
||||
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
|
||||
private readonly outstandingPromises: Array<Promise<unknown>> = [];
|
||||
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
|
||||
|
||||
public constructor(
|
||||
private readonly deviceId: string,
|
||||
private readonly logger: Logger,
|
||||
private readonly database: Database,
|
||||
private readonly settings: Settings,
|
||||
private readonly syncer: Syncer,
|
||||
webSocketImplementation?: typeof globalThis.WebSocket
|
||||
) {
|
||||
if (webSocketImplementation) {
|
||||
|
|
@ -41,16 +48,6 @@ export class WebSocketManager {
|
|||
this.webSocketFactoryImplementation = WebSocket;
|
||||
}
|
||||
}
|
||||
|
||||
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
|
||||
if (
|
||||
newSettings.remoteUri !== oldSettings.remoteUri ||
|
||||
newSettings.vaultName !== oldSettings.vaultName ||
|
||||
newSettings.token !== oldSettings.token
|
||||
) {
|
||||
this.initializeWebSocket(newSettings);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public get isWebSocketConnected(): boolean {
|
||||
|
|
@ -60,42 +57,66 @@ export class WebSocketManager {
|
|||
);
|
||||
}
|
||||
|
||||
public get isFirstSyncCompleted(): boolean {
|
||||
return this._isFirstSyncCompleted;
|
||||
}
|
||||
|
||||
public addWebSocketStatusChangeListener(listener: () => unknown): void {
|
||||
public addWebSocketStatusChangeListener(
|
||||
listener: (isConnected: boolean) => unknown
|
||||
): void {
|
||||
this.webSocketStatusChangeListeners.push(listener);
|
||||
}
|
||||
|
||||
public addRemoteCursorsUpdateListener(
|
||||
listener: (cursors: ClientCursors[]) => unknown
|
||||
listener: (cursors: ClientCursors[]) => Promise<void>
|
||||
): void {
|
||||
this.remoteCursorsUpdateListeners.push(listener);
|
||||
}
|
||||
|
||||
public removeRemoteCursorsUpdateListener(
|
||||
listener: (cursors: ClientCursors[]) => unknown
|
||||
public addRemoteVaultUpdateListener(
|
||||
listener: (update: WebSocketVaultUpdate) => Promise<void>
|
||||
): void {
|
||||
const index = this.remoteCursorsUpdateListeners.indexOf(listener);
|
||||
if (index !== -1) {
|
||||
this.remoteCursorsUpdateListeners.splice(index, 1);
|
||||
}
|
||||
this.remoteVaultUpdateListeners.push(listener);
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
this.isStopped = false;
|
||||
this._isFirstSyncCompleted = false;
|
||||
this.initializeWebSocket(this.settings.getSettings());
|
||||
this.initializeWebSocket();
|
||||
}
|
||||
|
||||
public stop(): void {
|
||||
public async stop(): Promise<void> {
|
||||
const [promise, resolve] = createPromise<void>();
|
||||
this.resolveDisconnectingPromise = resolve;
|
||||
|
||||
this.isStopped = true;
|
||||
|
||||
// Clear pending reconnect timeout
|
||||
if (this.reconnectTimeoutId !== undefined) {
|
||||
clearTimeout(this.reconnectTimeoutId);
|
||||
this.reconnectTimeoutId = undefined;
|
||||
}
|
||||
|
||||
this.webSocket?.close(1000, "WebSocketManager has been stopped");
|
||||
|
||||
while (this.isWebSocketConnected) {
|
||||
await promise;
|
||||
}
|
||||
|
||||
await Promise.allSettled(this.outstandingPromises).then(() => {});
|
||||
}
|
||||
|
||||
public sendHandshakeMessage(
|
||||
message: WebSocketClientMessage & { type: "handshake" }
|
||||
): void {
|
||||
const webSocket = this.webSocket;
|
||||
if (!webSocket) {
|
||||
throw new Error(
|
||||
"WebSocket is not connected, cannot send handshake message"
|
||||
);
|
||||
}
|
||||
|
||||
webSocket.send(JSON.stringify(message));
|
||||
}
|
||||
|
||||
public updateLocalCursors(cursorPositions: CursorPositionFromClient): void {
|
||||
if (!this.isWebSocketConnected) {
|
||||
// A missing cursor update is fine, we can just skip it if needed
|
||||
this.logger.warn(
|
||||
"WebSocket is not connected, cannot send cursor positions"
|
||||
);
|
||||
|
|
@ -105,43 +126,41 @@ export class WebSocketManager {
|
|||
type: "cursorPositions",
|
||||
...cursorPositions
|
||||
};
|
||||
this.webSocket?.send(JSON.stringify(message));
|
||||
const webSocket = this.webSocket;
|
||||
if (!webSocket) {
|
||||
this.logger.warn(
|
||||
"WebSocket is not connected, cannot send cursor positions"
|
||||
);
|
||||
return;
|
||||
}
|
||||
webSocket.send(JSON.stringify(message));
|
||||
this.logger.debug(
|
||||
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
|
||||
);
|
||||
}
|
||||
|
||||
private initializeWebSocket(settings: SyncSettings): void {
|
||||
if (this.isStopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
private initializeWebSocket(): void {
|
||||
try {
|
||||
this.webSocket?.close();
|
||||
} catch (e) {
|
||||
this.logger.warn(`Failed to close WebSocket: ${e}`);
|
||||
this.logger.error(
|
||||
`Failed to close previous WebSocket connection: ${e}`
|
||||
);
|
||||
}
|
||||
|
||||
const wsUri = new URL(settings.remoteUri);
|
||||
const wsUri = new URL(this.settings.getSettings().remoteUri);
|
||||
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
|
||||
wsUri.pathname = `/vaults/${settings.vaultName}/ws`;
|
||||
wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`;
|
||||
|
||||
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
|
||||
|
||||
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
|
||||
|
||||
// The JS WebSocket API doesn't support setting headers, so we have to send the token as a message
|
||||
this.webSocket.onopen = (): void => {
|
||||
this.logger.info("WebSocket connection opened");
|
||||
this.webSocketStatusChangeListeners.forEach((l) => l());
|
||||
|
||||
const message: WebSocketClientMessage = {
|
||||
type: "handshake",
|
||||
deviceId: this.deviceId,
|
||||
token: settings.token,
|
||||
lastSeenVaultUpdateId: this.database.getLastSeenUpdateId()
|
||||
};
|
||||
this.webSocket?.send(JSON.stringify(message));
|
||||
this.webSocketStatusChangeListeners.forEach((listener) =>
|
||||
listener(true)
|
||||
);
|
||||
};
|
||||
|
||||
this.webSocket.onmessage = async (event): Promise<void> => {
|
||||
|
|
@ -151,14 +170,20 @@ export class WebSocketManager {
|
|||
};
|
||||
|
||||
this.webSocket.onclose = (event): void => {
|
||||
this.logger.warn(
|
||||
this.logger.error(
|
||||
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
|
||||
);
|
||||
this.webSocketStatusChangeListeners.forEach((l) => l());
|
||||
this.webSocketStatusChangeListeners.forEach((listener) =>
|
||||
listener(false)
|
||||
);
|
||||
|
||||
if (!this.isStopped) {
|
||||
setTimeout(() => {
|
||||
this.initializeWebSocket(this.settings.getSettings());
|
||||
if (this.isStopped) {
|
||||
this.resolveDisconnectingPromise?.();
|
||||
this.resolveDisconnectingPromise = null;
|
||||
} else {
|
||||
this.reconnectTimeoutId = setTimeout(() => {
|
||||
this.reconnectTimeoutId = undefined;
|
||||
this.initializeWebSocket();
|
||||
}, this.settings.getSettings().webSocketRetryIntervalMs);
|
||||
}
|
||||
};
|
||||
|
|
@ -168,37 +193,25 @@ export class WebSocketManager {
|
|||
message: WebSocketServerMessage
|
||||
): Promise<void> {
|
||||
if (message.type === "vaultUpdate") {
|
||||
try {
|
||||
await Promise.all(
|
||||
message.documents.map(async (document) =>
|
||||
this.syncer.syncRemotelyUpdatedFile(document)
|
||||
)
|
||||
);
|
||||
|
||||
if (message.isInitialSync && message.documents.length > 0) {
|
||||
this.database.setLastSeenUpdateId(
|
||||
message.documents
|
||||
.map((document) => document.vaultUpdateId)
|
||||
.reduce((a, b) => Math.max(a, b))
|
||||
);
|
||||
}
|
||||
|
||||
this._isFirstSyncCompleted = true;
|
||||
} catch (e) {
|
||||
this.logger.error(`Failed to sync remotely updated file: ${e}`);
|
||||
}
|
||||
this.outstandingPromises.push(
|
||||
...this.remoteVaultUpdateListeners.map((listener) =>
|
||||
listener(message)
|
||||
)
|
||||
);
|
||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||
} else if (message.type === "cursorPositions") {
|
||||
this.logger.debug(
|
||||
`Received cursor positions for ${JSON.stringify(message.clients)}`
|
||||
);
|
||||
this.remoteCursorsUpdateListeners.forEach((listener) => {
|
||||
listener(
|
||||
message.clients.filter(
|
||||
(client) => client.deviceId !== this.deviceId
|
||||
this.outstandingPromises.push(
|
||||
...this.remoteCursorsUpdateListeners.map((listener) =>
|
||||
listener(
|
||||
message.clients.filter(
|
||||
(client) => client.deviceId !== this.deviceId
|
||||
)
|
||||
)
|
||||
);
|
||||
});
|
||||
)
|
||||
);
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`Received unknown message type: ${JSON.stringify(message)}`
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue