Add event handler class
This commit is contained in:
parent
1ed22c72d7
commit
ad3191957a
14 changed files with 2428 additions and 2309 deletions
|
|
@ -1,113 +1,94 @@
|
|||
import type { Logger } from "../tracing/logger";
|
||||
import { awaitAll } from "../utils/await-all";
|
||||
import { Lock } from "../utils/data-structures/locks";
|
||||
import { removeFromArray } from "../utils/remove-from-array";
|
||||
import { EventListeners } from "../utils/data-structures/event-listeners";
|
||||
|
||||
export interface SyncSettings {
|
||||
remoteUri: string;
|
||||
token: string;
|
||||
vaultName: string;
|
||||
syncConcurrency: number;
|
||||
isSyncEnabled: boolean;
|
||||
maxFileSizeMB: number;
|
||||
ignorePatterns: string[];
|
||||
webSocketRetryIntervalMs: number;
|
||||
diffCacheSizeMB: number;
|
||||
enableTelemetry: boolean;
|
||||
networkRetryIntervalMs: number;
|
||||
minimumSaveIntervalMs: number;
|
||||
remoteUri: string;
|
||||
token: string;
|
||||
vaultName: string;
|
||||
syncConcurrency: number;
|
||||
isSyncEnabled: boolean;
|
||||
maxFileSizeMB: number;
|
||||
ignorePatterns: string[];
|
||||
webSocketRetryIntervalMs: number;
|
||||
diffCacheSizeMB: number;
|
||||
enableTelemetry: boolean;
|
||||
networkRetryIntervalMs: number;
|
||||
minimumSaveIntervalMs: number;
|
||||
}
|
||||
|
||||
export const DEFAULT_SETTINGS: SyncSettings = {
|
||||
remoteUri: "",
|
||||
token: "",
|
||||
vaultName: "default",
|
||||
syncConcurrency: 1,
|
||||
isSyncEnabled: false,
|
||||
maxFileSizeMB: 10,
|
||||
ignorePatterns: [],
|
||||
webSocketRetryIntervalMs: 3500,
|
||||
diffCacheSizeMB: 4,
|
||||
enableTelemetry: false,
|
||||
networkRetryIntervalMs: 1000,
|
||||
minimumSaveIntervalMs: 1000
|
||||
remoteUri: "",
|
||||
token: "",
|
||||
vaultName: "default",
|
||||
syncConcurrency: 1,
|
||||
isSyncEnabled: false,
|
||||
maxFileSizeMB: 10,
|
||||
ignorePatterns: [],
|
||||
webSocketRetryIntervalMs: 3500,
|
||||
diffCacheSizeMB: 4,
|
||||
enableTelemetry: false,
|
||||
networkRetryIntervalMs: 1000,
|
||||
minimumSaveIntervalMs: 1000
|
||||
};
|
||||
|
||||
export class Settings {
|
||||
private settings: SyncSettings;
|
||||
private readonly lock: Lock = new Lock();
|
||||
private settings: SyncSettings;
|
||||
private readonly lock: Lock = new Lock();
|
||||
|
||||
private readonly onSettingsChangeHandlers: ((
|
||||
newSettings: SyncSettings,
|
||||
oldSettings: SyncSettings
|
||||
) => unknown)[] = [];
|
||||
public readonly onSettingsChanged = new EventListeners<
|
||||
(newSettings: SyncSettings, oldSettings: SyncSettings) => unknown
|
||||
>();
|
||||
|
||||
public constructor(
|
||||
private readonly logger: Logger,
|
||||
initialState: Partial<SyncSettings> | undefined,
|
||||
private readonly saveData: (data: SyncSettings) => Promise<void>
|
||||
) {
|
||||
this.settings = {
|
||||
...DEFAULT_SETTINGS,
|
||||
...(initialState ?? {})
|
||||
};
|
||||
public constructor(
|
||||
private readonly logger: Logger,
|
||||
initialState: Partial<SyncSettings> | undefined,
|
||||
private readonly saveData: (data: SyncSettings) => Promise<void>
|
||||
) {
|
||||
this.settings = {
|
||||
...DEFAULT_SETTINGS,
|
||||
...(initialState ?? {})
|
||||
};
|
||||
|
||||
this.logger.debug(
|
||||
`Loaded settings: ${JSON.stringify(this.settings, null, 2)}`
|
||||
);
|
||||
}
|
||||
this.logger.debug(
|
||||
`Loaded settings: ${JSON.stringify(this.settings, null, 2)}`
|
||||
);
|
||||
}
|
||||
|
||||
public getSettings(): SyncSettings {
|
||||
return this.settings;
|
||||
}
|
||||
public getSettings(): SyncSettings {
|
||||
return this.settings;
|
||||
}
|
||||
|
||||
public addOnSettingsChangeListener(
|
||||
listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown
|
||||
): void {
|
||||
this.onSettingsChangeHandlers.push(listener);
|
||||
}
|
||||
public async setSetting<T extends keyof SyncSettings>(
|
||||
key: T,
|
||||
value: SyncSettings[T]
|
||||
): Promise<void> {
|
||||
await this.setSettings({
|
||||
[key]: value
|
||||
});
|
||||
}
|
||||
|
||||
public removeOnSettingsChangeListener(
|
||||
listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown
|
||||
): void {
|
||||
removeFromArray(this.onSettingsChangeHandlers, listener);
|
||||
}
|
||||
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
|
||||
await this.lock.withLock(async () => {
|
||||
this.logger.debug(
|
||||
`Updating settings with: ${JSON.stringify(value)}`
|
||||
);
|
||||
const oldSettings = this.settings;
|
||||
this.settings = {
|
||||
...this.settings,
|
||||
...value
|
||||
};
|
||||
|
||||
public async setSetting<T extends keyof SyncSettings>(
|
||||
key: T,
|
||||
value: SyncSettings[T]
|
||||
): Promise<void> {
|
||||
await this.setSettings({
|
||||
[key]: value
|
||||
});
|
||||
}
|
||||
await this.onSettingsChanged.triggerAsync(
|
||||
this.settings,
|
||||
oldSettings
|
||||
);
|
||||
|
||||
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
|
||||
await this.lock.withLock(async () => {
|
||||
this.logger.debug(
|
||||
`Updating settings with: ${JSON.stringify(value)}`
|
||||
);
|
||||
const oldSettings = this.settings;
|
||||
this.settings = {
|
||||
...this.settings,
|
||||
...value
|
||||
};
|
||||
await this.save();
|
||||
});
|
||||
}
|
||||
|
||||
await awaitAll(
|
||||
this.onSettingsChangeHandlers
|
||||
.map((handler) => {
|
||||
return handler(this.settings, oldSettings);
|
||||
})
|
||||
.filter((result): result is Promise<unknown> => {
|
||||
return result instanceof Promise;
|
||||
})
|
||||
);
|
||||
|
||||
await this.save();
|
||||
});
|
||||
}
|
||||
|
||||
private async save(): Promise<void> {
|
||||
await this.saveData(this.settings);
|
||||
}
|
||||
private async save(): Promise<void> {
|
||||
await this.saveData(this.settings);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -122,7 +122,7 @@ describe("WebSocketManager", () => {
|
|||
MockWebSocket as unknown as typeof WebSocket
|
||||
);
|
||||
|
||||
manager.addRemoteVaultUpdateListener(async () => {
|
||||
manager.onRemoteVaultUpdateReceived.add(async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
});
|
||||
manager.start();
|
||||
|
|
@ -152,7 +152,7 @@ describe("WebSocketManager", () => {
|
|||
MockWebSocket as unknown as typeof WebSocket
|
||||
);
|
||||
|
||||
manager.addRemoteCursorsUpdateListener(async () => {
|
||||
manager.onRemoteCursorsUpdateReceived.add(async () => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
});
|
||||
manager.start();
|
||||
|
|
@ -227,7 +227,7 @@ describe("WebSocketManager", () => {
|
|||
);
|
||||
|
||||
let statusChangeCount = 0;
|
||||
manager.addWebSocketStatusChangeListener(() => {
|
||||
manager.onWebSocketStatusChanged.add(() => {
|
||||
statusChangeCount++;
|
||||
});
|
||||
|
||||
|
|
@ -269,7 +269,7 @@ describe("WebSocketManager", () => {
|
|||
resolveListener = resolve;
|
||||
});
|
||||
|
||||
manager.addRemoteVaultUpdateListener(async () => {
|
||||
manager.onRemoteVaultUpdateReceived.add(async () => {
|
||||
await listenerPromise;
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -6,295 +6,260 @@ import type { CursorPositionFromClient } from "./types/CursorPositionFromClient"
|
|||
import type { ClientCursors } from "./types/ClientCursors";
|
||||
import { createPromise } from "../utils/create-promise";
|
||||
import type { WebSocketVaultUpdate } from "./types/WebSocketVaultUpdate";
|
||||
import { awaitAll } from "../utils/await-all";
|
||||
import { WEBSOCKET_DISCONNECT_TIMEOUT_IN_S } from "../consts";
|
||||
import { removeFromArray } from "../utils/remove-from-array";
|
||||
import { EventListeners } from "../utils/data-structures/event-listeners";
|
||||
import { awaitAll } from "../utils/await-all";
|
||||
|
||||
export class WebSocketManager {
|
||||
private readonly webSocketStatusChangeListeners: ((
|
||||
isConnected: boolean
|
||||
) => unknown)[] = [];
|
||||
public readonly onWebSocketStatusChanged = new EventListeners<
|
||||
(isConnected: boolean) => unknown
|
||||
>();
|
||||
|
||||
private readonly remoteVaultUpdateListeners: ((
|
||||
update: WebSocketVaultUpdate
|
||||
) => Promise<void>)[] = [];
|
||||
public readonly onRemoteVaultUpdateReceived = new EventListeners<
|
||||
(update: WebSocketVaultUpdate) => Promise<void>
|
||||
>();
|
||||
|
||||
private readonly remoteCursorsUpdateListeners: ((
|
||||
cursors: ClientCursors[]
|
||||
) => Promise<void>)[] = [];
|
||||
public readonly onRemoteCursorsUpdateReceived = new EventListeners<
|
||||
(cursors: ClientCursors[]) => Promise<void>
|
||||
>();
|
||||
|
||||
private isStopped = true;
|
||||
private resolveDisconnectingPromise: null | (() => unknown) = null;
|
||||
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
private isStopped = true;
|
||||
private resolveDisconnectingPromise: null | (() => unknown) = null;
|
||||
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
|
||||
private readonly outstandingPromises: Promise<unknown>[] = [];
|
||||
private readonly outstandingPromises: Promise<unknown>[] = [];
|
||||
|
||||
private webSocket: WebSocket | undefined;
|
||||
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
|
||||
private webSocket: WebSocket | undefined;
|
||||
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
|
||||
|
||||
public constructor(
|
||||
private readonly deviceId: string,
|
||||
private readonly logger: Logger,
|
||||
private readonly settings: Settings,
|
||||
webSocketImplementation?: typeof globalThis.WebSocket
|
||||
) {
|
||||
if (webSocketImplementation) {
|
||||
this.webSocketFactoryImplementation = webSocketImplementation;
|
||||
} else {
|
||||
if (
|
||||
typeof globalThis !== "undefined" &&
|
||||
typeof globalThis.WebSocket === "undefined"
|
||||
) {
|
||||
// eslint-disable-next-line
|
||||
this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js
|
||||
} else {
|
||||
this.webSocketFactoryImplementation = WebSocket;
|
||||
}
|
||||
}
|
||||
}
|
||||
public constructor(
|
||||
private readonly deviceId: string,
|
||||
private readonly logger: Logger,
|
||||
private readonly settings: Settings,
|
||||
webSocketImplementation?: typeof globalThis.WebSocket
|
||||
) {
|
||||
if (webSocketImplementation) {
|
||||
this.webSocketFactoryImplementation = webSocketImplementation;
|
||||
} else {
|
||||
if (
|
||||
typeof globalThis !== "undefined" &&
|
||||
typeof globalThis.WebSocket === "undefined"
|
||||
) {
|
||||
// eslint-disable-next-line
|
||||
this.webSocketFactoryImplementation = require("ws"); // polyfill for WebSocket in Node.js
|
||||
} else {
|
||||
this.webSocketFactoryImplementation = WebSocket;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public get isWebSocketConnected(): boolean {
|
||||
return (
|
||||
this.webSocket?.readyState ===
|
||||
this.webSocketFactoryImplementation.OPEN
|
||||
);
|
||||
}
|
||||
public get isWebSocketConnected(): boolean {
|
||||
return (
|
||||
this.webSocket?.readyState ===
|
||||
this.webSocketFactoryImplementation.OPEN
|
||||
);
|
||||
}
|
||||
|
||||
public addWebSocketStatusChangeListener(
|
||||
listener: (isConnected: boolean) => unknown
|
||||
): void {
|
||||
this.webSocketStatusChangeListeners.push(listener);
|
||||
}
|
||||
public start(): void {
|
||||
this.isStopped = false;
|
||||
this.initializeWebSocket();
|
||||
}
|
||||
|
||||
public addRemoteCursorsUpdateListener(
|
||||
listener: (cursors: ClientCursors[]) => Promise<void>
|
||||
): void {
|
||||
this.remoteCursorsUpdateListeners.push(listener);
|
||||
}
|
||||
public async stop(): Promise<void> {
|
||||
const [promise, resolve] = createPromise();
|
||||
this.resolveDisconnectingPromise = resolve;
|
||||
|
||||
public addRemoteVaultUpdateListener(
|
||||
listener: (update: WebSocketVaultUpdate) => Promise<void>
|
||||
): void {
|
||||
this.remoteVaultUpdateListeners.push(listener);
|
||||
}
|
||||
this.isStopped = true;
|
||||
|
||||
public start(): void {
|
||||
this.isStopped = false;
|
||||
this.initializeWebSocket();
|
||||
}
|
||||
if (this.reconnectTimeoutId !== undefined) {
|
||||
clearTimeout(this.reconnectTimeoutId);
|
||||
this.reconnectTimeoutId = undefined;
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
const [promise, resolve] = createPromise();
|
||||
this.resolveDisconnectingPromise = resolve;
|
||||
this.webSocket?.close(1000, "WebSocketManager has been stopped");
|
||||
|
||||
this.isStopped = true;
|
||||
// eslint-disable-next-line @typescript-eslint/init-declarations
|
||||
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
const timeoutPromise = new Promise<void>((_, reject) => {
|
||||
timeoutId = setTimeout(() => {
|
||||
reject(
|
||||
new Error(
|
||||
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds`
|
||||
)
|
||||
);
|
||||
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000);
|
||||
});
|
||||
|
||||
if (this.reconnectTimeoutId !== undefined) {
|
||||
clearTimeout(this.reconnectTimeoutId);
|
||||
this.reconnectTimeoutId = undefined;
|
||||
}
|
||||
try {
|
||||
while (this.isWebSocketConnected) {
|
||||
await Promise.race([promise, timeoutPromise]);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error while waiting for WebSocket to close: ${String(error)}`
|
||||
);
|
||||
// Force cleanup even if close didn't work
|
||||
this.resolveDisconnectingPromise();
|
||||
this.resolveDisconnectingPromise = null;
|
||||
} finally {
|
||||
// Clear timeout to prevent unhandled rejection
|
||||
if (timeoutId !== undefined) {
|
||||
clearTimeout(timeoutId);
|
||||
}
|
||||
}
|
||||
|
||||
this.webSocket?.close(1000, "WebSocketManager has been stopped");
|
||||
await this.waitUntilFinished();
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/init-declarations
|
||||
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
const timeoutPromise = new Promise<void>((_, reject) => {
|
||||
timeoutId = setTimeout(() => {
|
||||
reject(
|
||||
new Error(
|
||||
`Timeout waiting for WebSocket to close after ${WEBSOCKET_DISCONNECT_TIMEOUT_IN_S} seconds`
|
||||
)
|
||||
);
|
||||
}, WEBSOCKET_DISCONNECT_TIMEOUT_IN_S * 1000);
|
||||
});
|
||||
public async waitUntilFinished(): Promise<void> {
|
||||
await awaitAll(this.outstandingPromises);
|
||||
}
|
||||
|
||||
try {
|
||||
while (this.isWebSocketConnected) {
|
||||
await Promise.race([promise, timeoutPromise]);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error while waiting for WebSocket to close: ${String(error)}`
|
||||
);
|
||||
// Force cleanup even if close didn't work
|
||||
this.resolveDisconnectingPromise();
|
||||
this.resolveDisconnectingPromise = null;
|
||||
} finally {
|
||||
// Clear timeout to prevent unhandled rejection
|
||||
if (timeoutId !== undefined) {
|
||||
clearTimeout(timeoutId);
|
||||
}
|
||||
}
|
||||
public sendHandshakeMessage(
|
||||
message: WebSocketClientMessage & { type: "handshake" }
|
||||
): void {
|
||||
const { webSocket } = this;
|
||||
if (!webSocket) {
|
||||
throw new Error(
|
||||
"WebSocket is not connected, cannot send handshake message"
|
||||
);
|
||||
}
|
||||
|
||||
await this.waitUntilFinished();
|
||||
}
|
||||
try {
|
||||
webSocket.send(JSON.stringify(message));
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Failed to send handshake message: ${String(error)}`
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
public async waitUntilFinished(): Promise<void> {
|
||||
await awaitAll(this.outstandingPromises);
|
||||
}
|
||||
public updateLocalCursors(cursorPositions: CursorPositionFromClient): void {
|
||||
if (!this.isWebSocketConnected || !this.webSocket) {
|
||||
// A missing cursor update is fine, we can just skip it if needed
|
||||
this.logger.warn(
|
||||
"WebSocket is not connected, cannot send cursor positions"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
public sendHandshakeMessage(
|
||||
message: WebSocketClientMessage & { type: "handshake" }
|
||||
): void {
|
||||
const { webSocket } = this;
|
||||
if (!webSocket) {
|
||||
throw new Error(
|
||||
"WebSocket is not connected, cannot send handshake message"
|
||||
);
|
||||
}
|
||||
const message: WebSocketClientMessage = {
|
||||
type: "cursorPositions",
|
||||
...cursorPositions
|
||||
};
|
||||
|
||||
try {
|
||||
webSocket.send(JSON.stringify(message));
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Failed to send handshake message: ${String(error)}`
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
try {
|
||||
this.webSocket.send(JSON.stringify(message));
|
||||
this.logger.debug(
|
||||
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Failed to send cursor positions: ${String(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public updateLocalCursors(cursorPositions: CursorPositionFromClient): void {
|
||||
if (!this.isWebSocketConnected || !this.webSocket) {
|
||||
// A missing cursor update is fine, we can just skip it if needed
|
||||
this.logger.warn(
|
||||
"WebSocket is not connected, cannot send cursor positions"
|
||||
);
|
||||
return;
|
||||
}
|
||||
private initializeWebSocket(): void {
|
||||
// Clean up old WebSocket handlers to prevent race conditions
|
||||
if (this.webSocket) {
|
||||
try {
|
||||
// Remove handlers to prevent them from firing after new connection
|
||||
this.webSocket.onopen = null;
|
||||
this.webSocket.onclose = null;
|
||||
this.webSocket.onmessage = null;
|
||||
this.webSocket.onerror = null;
|
||||
this.webSocket.close();
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Failed to close previous WebSocket connection: ${e}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const message: WebSocketClientMessage = {
|
||||
type: "cursorPositions",
|
||||
...cursorPositions
|
||||
};
|
||||
const wsUri = new URL(this.settings.getSettings().remoteUri);
|
||||
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
|
||||
wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`;
|
||||
|
||||
try {
|
||||
this.webSocket.send(JSON.stringify(message));
|
||||
this.logger.debug(
|
||||
`Sent cursor positions: ${JSON.stringify(cursorPositions)}`
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Failed to send cursor positions: ${String(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
|
||||
|
||||
private initializeWebSocket(): void {
|
||||
// Clean up old WebSocket handlers to prevent race conditions
|
||||
if (this.webSocket) {
|
||||
try {
|
||||
// Remove handlers to prevent them from firing after new connection
|
||||
this.webSocket.onopen = null;
|
||||
this.webSocket.onclose = null;
|
||||
this.webSocket.onmessage = null;
|
||||
this.webSocket.onerror = null;
|
||||
this.webSocket.close();
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Failed to close previous WebSocket connection: ${e}`
|
||||
);
|
||||
}
|
||||
}
|
||||
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
|
||||
|
||||
const wsUri = new URL(this.settings.getSettings().remoteUri);
|
||||
wsUri.protocol = wsUri.protocol === "https" ? "wss" : "ws";
|
||||
wsUri.pathname = `/vaults/${this.settings.getSettings().vaultName}/ws`;
|
||||
this.webSocket.onopen = (): void => {
|
||||
this.logger.info("WebSocket connection opened");
|
||||
this.onWebSocketStatusChanged.trigger(true);
|
||||
};
|
||||
|
||||
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
|
||||
this.webSocket.onmessage = (event): void => {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const message = JSON.parse(
|
||||
event.data
|
||||
) as WebSocketServerMessage;
|
||||
|
||||
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
|
||||
// Track the message handling promise
|
||||
const messageHandlingPromise = this.handleWebSocketMessage(
|
||||
message
|
||||
)
|
||||
.catch((error: unknown) => {
|
||||
this.logger.error(
|
||||
`Error handling WebSocket message: ${String(error)}`
|
||||
);
|
||||
})
|
||||
.finally(() => {
|
||||
removeFromArray(
|
||||
this.outstandingPromises,
|
||||
messageHandlingPromise
|
||||
);
|
||||
});
|
||||
|
||||
this.webSocket.onopen = (): void => {
|
||||
this.logger.info("WebSocket connection opened");
|
||||
this.webSocketStatusChangeListeners.forEach((listener) =>
|
||||
listener(true)
|
||||
);
|
||||
};
|
||||
void this.outstandingPromises.push(messageHandlingPromise); // ignore the returned promise
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error parsing WebSocket message: ${String(error)}`
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
this.webSocket.onmessage = (event): void => {
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const message = JSON.parse(
|
||||
event.data
|
||||
) as WebSocketServerMessage;
|
||||
this.webSocket.onclose = (event): void => {
|
||||
this.logger.warn(
|
||||
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
|
||||
);
|
||||
this.onWebSocketStatusChanged.trigger(false);
|
||||
|
||||
// Track the message handling promise
|
||||
const messageHandlingPromise = this.handleWebSocketMessage(
|
||||
message
|
||||
)
|
||||
.catch((error: unknown) => {
|
||||
this.logger.error(
|
||||
`Error handling WebSocket message: ${String(error)}`
|
||||
);
|
||||
})
|
||||
.finally(() => {
|
||||
removeFromArray(
|
||||
this.outstandingPromises,
|
||||
messageHandlingPromise
|
||||
);
|
||||
});
|
||||
if (this.isStopped) {
|
||||
this.resolveDisconnectingPromise?.();
|
||||
this.resolveDisconnectingPromise = null;
|
||||
} else {
|
||||
this.reconnectTimeoutId = setTimeout(() => {
|
||||
this.reconnectTimeoutId = undefined;
|
||||
this.initializeWebSocket();
|
||||
}, this.settings.getSettings().webSocketRetryIntervalMs);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
void this.outstandingPromises.push(messageHandlingPromise); // ignore the returned promise
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error parsing WebSocket message: ${String(error)}`
|
||||
);
|
||||
}
|
||||
};
|
||||
private async handleWebSocketMessage(
|
||||
message: WebSocketServerMessage
|
||||
): Promise<void> {
|
||||
if (message.type === "vaultUpdate") {
|
||||
await this.onRemoteVaultUpdateReceived.triggerAsync(message);
|
||||
|
||||
this.webSocket.onclose = (event): void => {
|
||||
this.logger.warn(
|
||||
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
|
||||
);
|
||||
this.webSocketStatusChangeListeners.forEach((listener) =>
|
||||
listener(false)
|
||||
);
|
||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||
} else if (message.type === "cursorPositions") {
|
||||
this.logger.debug(
|
||||
`Received cursor positions for ${JSON.stringify(message.clients)}`
|
||||
);
|
||||
|
||||
if (this.isStopped) {
|
||||
this.resolveDisconnectingPromise?.();
|
||||
this.resolveDisconnectingPromise = null;
|
||||
} else {
|
||||
this.reconnectTimeoutId = setTimeout(() => {
|
||||
this.reconnectTimeoutId = undefined;
|
||||
this.initializeWebSocket();
|
||||
}, this.settings.getSettings().webSocketRetryIntervalMs);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private async handleWebSocketMessage(
|
||||
message: WebSocketServerMessage
|
||||
): Promise<void> {
|
||||
if (message.type === "vaultUpdate") {
|
||||
await awaitAll(
|
||||
this.remoteVaultUpdateListeners.map(async (listener) => {
|
||||
await listener(message).catch((error: unknown) => {
|
||||
this.logger.error(
|
||||
`Error in vault update listener: ${String(error)}`
|
||||
);
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||
} else if (message.type === "cursorPositions") {
|
||||
this.logger.debug(
|
||||
`Received cursor positions for ${JSON.stringify(message.clients)}`
|
||||
);
|
||||
|
||||
await awaitAll(
|
||||
this.remoteCursorsUpdateListeners.map(async (listener) => {
|
||||
await listener(message.clients).catch((error: unknown) => {
|
||||
this.logger.error(
|
||||
`Error in cursor positions listener: ${String(error)}`
|
||||
);
|
||||
});
|
||||
})
|
||||
);
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`Received unknown message type: ${JSON.stringify(message)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
await this.onRemoteCursorsUpdateReceived.triggerAsync(
|
||||
message.clients
|
||||
);
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`Received unknown message type: ${JSON.stringify(message)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,495 +26,497 @@ import { FixedSizeDocumentCache } from "./utils/data-structures/fix-sized-cache"
|
|||
import { setUpTelemetry } from "./utils/set-up-telemetry";
|
||||
import { DIFF_CACHE_SIZE_MB } from "./consts";
|
||||
import { ServerConfig } from "./services/server-config";
|
||||
import { EventListeners } from "./utils/data-structures/event-listeners";
|
||||
|
||||
export class SyncClient {
|
||||
private hasStartedOfflineSync = false;
|
||||
private hasFinishedOfflineSync = false;
|
||||
private hasStarted = false;
|
||||
private hasBeenDestroyed = false;
|
||||
private unloadTelemetry?: () => void;
|
||||
|
||||
private constructor(
|
||||
private readonly history: SyncHistory,
|
||||
private readonly settings: Settings,
|
||||
private readonly database: Database,
|
||||
private readonly syncer: Syncer,
|
||||
private readonly webSocketManager: WebSocketManager,
|
||||
public readonly logger: Logger,
|
||||
private readonly fetchController: FetchController,
|
||||
private readonly cursorTracker: CursorTracker,
|
||||
private readonly fileChangeNotifier: FileChangeNotifier,
|
||||
private readonly contentCache: FixedSizeDocumentCache,
|
||||
private readonly fileOperations: FileOperations,
|
||||
private readonly serverConfig: ServerConfig,
|
||||
private readonly persistence: PersistenceProvider<
|
||||
Partial<{
|
||||
settings: Partial<SyncSettings>;
|
||||
database: Partial<StoredDatabase>;
|
||||
}>
|
||||
>
|
||||
) {}
|
||||
|
||||
public get documentCount(): number {
|
||||
return this.database.length;
|
||||
}
|
||||
|
||||
public get isWebSocketConnected(): boolean {
|
||||
return this.webSocketManager.isWebSocketConnected;
|
||||
}
|
||||
public static async create({
|
||||
fs,
|
||||
persistence,
|
||||
fetch,
|
||||
webSocket,
|
||||
nativeLineEndings = "\n"
|
||||
}: {
|
||||
fs: FileSystemOperations;
|
||||
persistence: PersistenceProvider<
|
||||
Partial<{
|
||||
settings: Partial<SyncSettings>;
|
||||
database: Partial<StoredDatabase>;
|
||||
}>
|
||||
>;
|
||||
fetch?: typeof globalThis.fetch;
|
||||
webSocket?: typeof globalThis.WebSocket;
|
||||
nativeLineEndings?: string;
|
||||
}): Promise<SyncClient> {
|
||||
const logger = new Logger();
|
||||
|
||||
const deviceId = createClientId();
|
||||
|
||||
logger.info(`Creating SyncClient with client id ${deviceId}`);
|
||||
|
||||
const history = new SyncHistory(logger);
|
||||
|
||||
let state = (await persistence.load()) ?? {
|
||||
settings: undefined,
|
||||
database: undefined
|
||||
};
|
||||
|
||||
const settings = new Settings(
|
||||
logger,
|
||||
state.settings,
|
||||
async (data): Promise<void> => {
|
||||
state = { ...state, settings: data };
|
||||
// we're not rate-limiting settings saves as (1) we need to initialise the settings to know the rate limit
|
||||
// and (2) settings changes are infrequent enough that rate-limiting is not necessary
|
||||
await persistence.save(state);
|
||||
}
|
||||
);
|
||||
|
||||
const rateLimitedSave = rateLimit(
|
||||
persistence.save,
|
||||
() => settings.getSettings().minimumSaveIntervalMs
|
||||
);
|
||||
|
||||
const database = new Database(
|
||||
logger,
|
||||
state.database,
|
||||
async (data): Promise<void> => {
|
||||
state = { ...state, database: data };
|
||||
await rateLimitedSave(state);
|
||||
}
|
||||
);
|
||||
|
||||
const fetchController = new FetchController(
|
||||
settings.getSettings().isSyncEnabled,
|
||||
logger
|
||||
);
|
||||
settings.addOnSettingsChangeListener((newSettings, oldSettings) => {
|
||||
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
|
||||
fetchController.canFetch = newSettings.isSyncEnabled;
|
||||
}
|
||||
});
|
||||
|
||||
const syncService = new SyncService(
|
||||
deviceId,
|
||||
fetchController,
|
||||
settings,
|
||||
logger,
|
||||
fetch
|
||||
);
|
||||
|
||||
const serverConfig = new ServerConfig(syncService);
|
||||
|
||||
const fileOperations = new FileOperations(
|
||||
logger,
|
||||
database,
|
||||
fs,
|
||||
serverConfig,
|
||||
nativeLineEndings
|
||||
);
|
||||
|
||||
const contentCache = new FixedSizeDocumentCache(
|
||||
1024 * 1024 * DIFF_CACHE_SIZE_MB
|
||||
);
|
||||
const unrestrictedSyncer = new UnrestrictedSyncer(
|
||||
logger,
|
||||
database,
|
||||
settings,
|
||||
syncService,
|
||||
fileOperations,
|
||||
history,
|
||||
contentCache,
|
||||
serverConfig
|
||||
);
|
||||
|
||||
const webSocketManager = new WebSocketManager(
|
||||
deviceId,
|
||||
logger,
|
||||
settings,
|
||||
webSocket
|
||||
);
|
||||
|
||||
const syncer = new Syncer(
|
||||
deviceId,
|
||||
logger,
|
||||
database,
|
||||
settings,
|
||||
syncService,
|
||||
webSocketManager,
|
||||
fileOperations,
|
||||
unrestrictedSyncer
|
||||
);
|
||||
|
||||
const fileChangeNotifier = new FileChangeNotifier();
|
||||
const cursorTracker = new CursorTracker(
|
||||
database,
|
||||
webSocketManager,
|
||||
fileOperations,
|
||||
fileChangeNotifier
|
||||
);
|
||||
const client = new SyncClient(
|
||||
history,
|
||||
settings,
|
||||
database,
|
||||
syncer,
|
||||
webSocketManager,
|
||||
logger,
|
||||
fetchController,
|
||||
cursorTracker,
|
||||
fileChangeNotifier,
|
||||
contentCache,
|
||||
fileOperations,
|
||||
serverConfig,
|
||||
persistence
|
||||
);
|
||||
|
||||
logger.info("SyncClient created successfully");
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
public async start(): Promise<void> {
|
||||
this.checkIfDestroyed("start");
|
||||
|
||||
if (this.hasStarted) {
|
||||
throw new Error("SyncClient has already been started");
|
||||
}
|
||||
this.hasStarted = true;
|
||||
|
||||
if (
|
||||
!this.unloadTelemetry &&
|
||||
this.settings.getSettings().enableTelemetry
|
||||
) {
|
||||
this.unloadTelemetry = setUpTelemetry();
|
||||
}
|
||||
|
||||
this.logger.addOnMessageListener((log): void => {
|
||||
if (log.level === LogLevel.ERROR && Sentry.isInitialized()) {
|
||||
Sentry.captureMessage(log.message);
|
||||
}
|
||||
});
|
||||
|
||||
this.settings.addOnSettingsChangeListener(
|
||||
this.onSettingsChange.bind(this)
|
||||
);
|
||||
|
||||
if (this.settings.getSettings().isSyncEnabled) {
|
||||
this.logger.info("Starting SyncClient");
|
||||
await this.startSyncing();
|
||||
this.logger.info("SyncClient has successfully started");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reload settings from disk overriding current in-memory settings.
|
||||
* Missing values will be filled in from DEFAULT_SETTINGS rather than
|
||||
* retaining current in-memory settings.
|
||||
*/
|
||||
public async reloadSettings(): Promise<void> {
|
||||
this.checkIfDestroyed("reloadSettings");
|
||||
|
||||
const state = (await this.persistence.load()) ?? {
|
||||
settings: undefined
|
||||
};
|
||||
|
||||
const settings = {
|
||||
...DEFAULT_SETTINGS,
|
||||
...(state.settings ?? {})
|
||||
};
|
||||
|
||||
await this.setSettings(settings);
|
||||
}
|
||||
|
||||
public async checkConnection(): Promise<NetworkConnectionStatus> {
|
||||
this.checkIfDestroyed("checkConnection");
|
||||
|
||||
const server = await this.serverConfig.checkConnection(true);
|
||||
return {
|
||||
isSuccessful: server.isSuccessful,
|
||||
serverMessage: server.message,
|
||||
isWebSocketConnected: this.webSocketManager.isWebSocketConnected
|
||||
};
|
||||
}
|
||||
|
||||
public getHistoryEntries(): readonly HistoryEntry[] {
|
||||
return this.history.entries;
|
||||
}
|
||||
|
||||
public addSyncHistoryUpdateListener(
|
||||
listener: (stats: HistoryStats) => unknown
|
||||
): void {
|
||||
this.checkIfDestroyed("addSyncHistoryUpdateListener");
|
||||
|
||||
this.history.addSyncHistoryUpdateListener(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the in-flight operations to finish, reset all tracking,
|
||||
* and the local database but retain the settings.
|
||||
* The SyncClient can be used again after calling this method.
|
||||
*/
|
||||
public async reset(): Promise<void> {
|
||||
this.checkIfDestroyed("reset");
|
||||
|
||||
this.logger.info(
|
||||
"Stopping SyncClient to apply changed connection settings"
|
||||
);
|
||||
await this.pause();
|
||||
|
||||
// clear all local state
|
||||
this.logger.info("Resetting SyncClient's local state");
|
||||
this.database.reset();
|
||||
await this.database.save(); // ensure the new database reads as empty
|
||||
this.resetInMemoryState();
|
||||
this.hasStartedOfflineSync = false;
|
||||
this.hasFinishedOfflineSync = false;
|
||||
this.serverConfig.reset();
|
||||
|
||||
await this.startSyncing();
|
||||
}
|
||||
|
||||
public getSettings(): SyncSettings {
|
||||
return this.settings.getSettings();
|
||||
}
|
||||
|
||||
public async setSetting<T extends keyof SyncSettings>(
|
||||
key: T,
|
||||
value: SyncSettings[T]
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("setSetting");
|
||||
|
||||
await this.settings.setSetting(key, value);
|
||||
}
|
||||
|
||||
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
|
||||
this.checkIfDestroyed("setSettings");
|
||||
|
||||
await this.settings.setSettings(value);
|
||||
}
|
||||
|
||||
public addOnSettingsChangeListener(
|
||||
listener: (settings: SyncSettings, oldSettings: SyncSettings) => unknown
|
||||
): void {
|
||||
this.checkIfDestroyed("addOnSettingsChangeListener");
|
||||
|
||||
this.settings.addOnSettingsChangeListener(listener);
|
||||
}
|
||||
|
||||
public addRemainingSyncOperationsListener(
|
||||
listener: (remainingOperations: number) => unknown
|
||||
): void {
|
||||
this.checkIfDestroyed("addRemainingSyncOperationsListener");
|
||||
|
||||
this.syncer.addRemainingOperationsListener(listener);
|
||||
}
|
||||
|
||||
public addWebSocketStatusChangeListener(listener: () => unknown): void {
|
||||
this.checkIfDestroyed("addWebSocketStatusChangeListener");
|
||||
|
||||
this.webSocketManager.addWebSocketStatusChangeListener(listener);
|
||||
}
|
||||
|
||||
public async syncLocallyCreatedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("syncLocallyCreatedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyCreatedFile(relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("syncLocallyDeletedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyDeletedFile(relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath
|
||||
}: {
|
||||
oldPath?: RelativePath;
|
||||
relativePath: RelativePath;
|
||||
}): Promise<void> {
|
||||
this.checkIfDestroyed("syncLocallyUpdatedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath
|
||||
});
|
||||
}
|
||||
|
||||
public getDocumentSyncingStatus(
|
||||
relativePath: RelativePath
|
||||
): DocumentSyncStatus {
|
||||
this.checkIfDestroyed("getDocumentSyncingStatus");
|
||||
|
||||
if (!this.settings.getSettings().isSyncEnabled) {
|
||||
return DocumentSyncStatus.SYNCING_IS_DISABLED;
|
||||
}
|
||||
|
||||
if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) {
|
||||
return DocumentSyncStatus.SYNCING;
|
||||
}
|
||||
|
||||
const document =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
if (document === undefined) {
|
||||
return DocumentSyncStatus.SYNCING;
|
||||
}
|
||||
return document.updates.length > 0
|
||||
? DocumentSyncStatus.SYNCING
|
||||
: DocumentSyncStatus.UP_TO_DATE;
|
||||
}
|
||||
|
||||
public async updateLocalCursors(
|
||||
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("updateLocalCursors");
|
||||
|
||||
await this.cursorTracker.sendLocalCursorsToServer(documentToCursors);
|
||||
}
|
||||
|
||||
public addRemoteCursorsUpdateListener(
|
||||
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
|
||||
): void {
|
||||
this.checkIfDestroyed("addRemoteCursorsUpdateListener");
|
||||
|
||||
this.cursorTracker.addRemoteCursorsUpdateListener(listener);
|
||||
}
|
||||
|
||||
public async waitUntilFinished(): Promise<void> {
|
||||
this.checkIfDestroyed("waitUntilIdle");
|
||||
await this.syncer.waitUntilFinished();
|
||||
await this.webSocketManager.waitUntilFinished();
|
||||
await this.database.save(); // flush all changes to disk
|
||||
}
|
||||
|
||||
/**
|
||||
* Completely destroy the SyncClient, cancelling all in-progress operations.
|
||||
* After calling this method, the SyncClient cannot be used again.
|
||||
*/
|
||||
public async destroy(): Promise<void> {
|
||||
this.checkIfDestroyed("destroy");
|
||||
|
||||
// cancel everything that's in progress
|
||||
await this.pause();
|
||||
|
||||
this.hasBeenDestroyed = true;
|
||||
|
||||
this.resetInMemoryState();
|
||||
|
||||
this.logger.info("SyncClient has been successfully disposed");
|
||||
|
||||
this.unloadTelemetry?.();
|
||||
}
|
||||
|
||||
private async startSyncing(): Promise<void> {
|
||||
this.checkIfDestroyed("startSyncing");
|
||||
this.fetchController.finishReset();
|
||||
|
||||
await this.serverConfig.initialize();
|
||||
this.webSocketManager.start();
|
||||
|
||||
if (!this.hasStartedOfflineSync) {
|
||||
this.hasStartedOfflineSync = true;
|
||||
await this.syncer.scheduleSyncForOfflineChanges();
|
||||
}
|
||||
|
||||
this.hasFinishedOfflineSync = true;
|
||||
}
|
||||
|
||||
private async pause(): Promise<void> {
|
||||
this.fetchController.startReset();
|
||||
await this.webSocketManager.stop();
|
||||
await this.waitUntilFinished();
|
||||
}
|
||||
|
||||
private resetInMemoryState(): void {
|
||||
this.history.reset();
|
||||
this.contentCache.reset();
|
||||
// don't reset the logger
|
||||
this.cursorTracker.reset();
|
||||
this.syncer.reset();
|
||||
this.fileOperations.reset();
|
||||
}
|
||||
|
||||
private async onSettingsChange(
|
||||
newSettings: SyncSettings,
|
||||
oldSettings: SyncSettings
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("onSettingsChange");
|
||||
|
||||
if (
|
||||
newSettings.vaultName !== oldSettings.vaultName ||
|
||||
newSettings.remoteUri !== oldSettings.remoteUri
|
||||
) {
|
||||
await this.reset();
|
||||
}
|
||||
|
||||
if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
|
||||
if (newSettings.isSyncEnabled) {
|
||||
await this.startSyncing();
|
||||
} else {
|
||||
await this.pause();
|
||||
}
|
||||
}
|
||||
|
||||
if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) {
|
||||
this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024);
|
||||
}
|
||||
|
||||
if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) {
|
||||
if (newSettings.enableTelemetry) {
|
||||
this.unloadTelemetry = setUpTelemetry();
|
||||
} else {
|
||||
this.unloadTelemetry?.();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private checkIfDestroyed(origin: string): void {
|
||||
if (this.hasBeenDestroyed) {
|
||||
throw new Error(
|
||||
`SyncClient has been destroyed and can no longer be used; called from ${origin}`
|
||||
);
|
||||
}
|
||||
}
|
||||
private hasStartedOfflineSync = false;
|
||||
private hasFinishedOfflineSync = false;
|
||||
private hasStarted = false;
|
||||
private hasBeenDestroyed = false;
|
||||
private unloadTelemetry?: () => void;
|
||||
|
||||
private constructor(
|
||||
private readonly history: SyncHistory,
|
||||
private readonly settings: Settings,
|
||||
private readonly database: Database,
|
||||
private readonly syncer: Syncer,
|
||||
private readonly webSocketManager: WebSocketManager,
|
||||
public readonly logger: Logger,
|
||||
private readonly fetchController: FetchController,
|
||||
private readonly cursorTracker: CursorTracker,
|
||||
private readonly fileChangeNotifier: FileChangeNotifier,
|
||||
private readonly contentCache: FixedSizeDocumentCache,
|
||||
private readonly fileOperations: FileOperations,
|
||||
private readonly serverConfig: ServerConfig,
|
||||
private readonly persistence: PersistenceProvider<
|
||||
Partial<{
|
||||
settings: Partial<SyncSettings>;
|
||||
database: Partial<StoredDatabase>;
|
||||
}>
|
||||
>
|
||||
) { }
|
||||
|
||||
public get documentCount(): number {
|
||||
return this.database.length;
|
||||
}
|
||||
|
||||
public get isWebSocketConnected(): boolean {
|
||||
return this.webSocketManager.isWebSocketConnected;
|
||||
}
|
||||
public static async create({
|
||||
fs,
|
||||
persistence,
|
||||
fetch,
|
||||
webSocket,
|
||||
nativeLineEndings = "\n"
|
||||
}: {
|
||||
fs: FileSystemOperations;
|
||||
persistence: PersistenceProvider<
|
||||
Partial<{
|
||||
settings: Partial<SyncSettings>;
|
||||
database: Partial<StoredDatabase>;
|
||||
}>
|
||||
>;
|
||||
fetch?: typeof globalThis.fetch;
|
||||
webSocket?: typeof globalThis.WebSocket;
|
||||
nativeLineEndings?: string;
|
||||
}): Promise<SyncClient> {
|
||||
const logger = new Logger();
|
||||
|
||||
const deviceId = createClientId();
|
||||
|
||||
logger.info(`Creating SyncClient with client id ${deviceId}`);
|
||||
|
||||
const history = new SyncHistory(logger);
|
||||
|
||||
let state = (await persistence.load()) ?? {
|
||||
settings: undefined,
|
||||
database: undefined
|
||||
};
|
||||
|
||||
const settings = new Settings(
|
||||
logger,
|
||||
state.settings,
|
||||
async (data): Promise<void> => {
|
||||
state = { ...state, settings: data };
|
||||
// we're not rate-limiting settings saves as (1) we need to initialise the settings to know the rate limit
|
||||
// and (2) settings changes are infrequent enough that rate-limiting is not necessary
|
||||
await persistence.save(state);
|
||||
}
|
||||
);
|
||||
|
||||
const rateLimitedSave = rateLimit(
|
||||
persistence.save,
|
||||
() => settings.getSettings().minimumSaveIntervalMs
|
||||
);
|
||||
|
||||
const database = new Database(
|
||||
logger,
|
||||
state.database,
|
||||
async (data): Promise<void> => {
|
||||
state = { ...state, database: data };
|
||||
await rateLimitedSave(state);
|
||||
}
|
||||
);
|
||||
|
||||
const fetchController = new FetchController(
|
||||
settings.getSettings().isSyncEnabled,
|
||||
logger
|
||||
);
|
||||
settings.onSettingsChanged.add((newSettings, oldSettings) => {
|
||||
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
|
||||
fetchController.canFetch = newSettings.isSyncEnabled;
|
||||
}
|
||||
});
|
||||
|
||||
const syncService = new SyncService(
|
||||
deviceId,
|
||||
fetchController,
|
||||
settings,
|
||||
logger,
|
||||
fetch
|
||||
);
|
||||
|
||||
const serverConfig = new ServerConfig(syncService);
|
||||
|
||||
const fileOperations = new FileOperations(
|
||||
logger,
|
||||
database,
|
||||
fs,
|
||||
serverConfig,
|
||||
nativeLineEndings
|
||||
);
|
||||
|
||||
const contentCache = new FixedSizeDocumentCache(
|
||||
1024 * 1024 * DIFF_CACHE_SIZE_MB
|
||||
);
|
||||
const unrestrictedSyncer = new UnrestrictedSyncer(
|
||||
logger,
|
||||
database,
|
||||
settings,
|
||||
syncService,
|
||||
fileOperations,
|
||||
history,
|
||||
contentCache,
|
||||
serverConfig
|
||||
);
|
||||
|
||||
const webSocketManager = new WebSocketManager(
|
||||
deviceId,
|
||||
logger,
|
||||
settings,
|
||||
webSocket
|
||||
);
|
||||
|
||||
const syncer = new Syncer(
|
||||
deviceId,
|
||||
logger,
|
||||
database,
|
||||
settings,
|
||||
syncService,
|
||||
webSocketManager,
|
||||
fileOperations,
|
||||
unrestrictedSyncer
|
||||
);
|
||||
|
||||
const fileChangeNotifier = new FileChangeNotifier();
|
||||
const cursorTracker = new CursorTracker(
|
||||
database,
|
||||
webSocketManager,
|
||||
fileOperations,
|
||||
fileChangeNotifier
|
||||
);
|
||||
const client = new SyncClient(
|
||||
history,
|
||||
settings,
|
||||
database,
|
||||
syncer,
|
||||
webSocketManager,
|
||||
logger,
|
||||
fetchController,
|
||||
cursorTracker,
|
||||
fileChangeNotifier,
|
||||
contentCache,
|
||||
fileOperations,
|
||||
serverConfig,
|
||||
persistence
|
||||
);
|
||||
|
||||
logger.info("SyncClient created successfully");
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
public async start(): Promise<void> {
|
||||
this.checkIfDestroyed("start");
|
||||
|
||||
if (this.hasStarted) {
|
||||
throw new Error("SyncClient has already been started");
|
||||
}
|
||||
this.hasStarted = true;
|
||||
|
||||
if (
|
||||
!this.unloadTelemetry &&
|
||||
this.settings.getSettings().enableTelemetry
|
||||
) {
|
||||
this.unloadTelemetry = setUpTelemetry();
|
||||
}
|
||||
|
||||
this.logger.onLogEmitted.add((log): void => {
|
||||
if (log.level === LogLevel.ERROR && Sentry.isInitialized()) {
|
||||
Sentry.captureMessage(log.message);
|
||||
}
|
||||
});
|
||||
|
||||
this.settings.onSettingsChanged.add(
|
||||
this.onSettingsChange.bind(this)
|
||||
);
|
||||
|
||||
if (this.settings.getSettings().isSyncEnabled) {
|
||||
this.logger.info("Starting SyncClient");
|
||||
await this.startSyncing();
|
||||
this.logger.info("SyncClient has successfully started");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reload settings from disk overriding current in-memory settings.
|
||||
* Missing values will be filled in from DEFAULT_SETTINGS rather than
|
||||
* retaining current in-memory settings.
|
||||
*/
|
||||
public async reloadSettings(): Promise<void> {
|
||||
this.checkIfDestroyed("reloadSettings");
|
||||
|
||||
const state = (await this.persistence.load()) ?? {
|
||||
settings: undefined
|
||||
};
|
||||
|
||||
const settings = {
|
||||
...DEFAULT_SETTINGS,
|
||||
...(state.settings ?? {})
|
||||
};
|
||||
|
||||
await this.setSettings(settings);
|
||||
}
|
||||
|
||||
public async checkConnection(): Promise<NetworkConnectionStatus> {
|
||||
this.checkIfDestroyed("checkConnection");
|
||||
|
||||
const server = await this.serverConfig.checkConnection(true);
|
||||
return {
|
||||
isSuccessful: server.isSuccessful,
|
||||
serverMessage: server.message,
|
||||
isWebSocketConnected: this.webSocketManager.isWebSocketConnected
|
||||
};
|
||||
}
|
||||
|
||||
public getHistoryEntries(): readonly HistoryEntry[] {
|
||||
return this.history.entries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the in-flight operations to finish, reset all tracking,
|
||||
* and the local database but retain the settings.
|
||||
* The SyncClient can be used again after calling this method.
|
||||
*/
|
||||
public async reset(): Promise<void> {
|
||||
this.checkIfDestroyed("reset");
|
||||
|
||||
this.logger.info(
|
||||
"Stopping SyncClient to apply changed connection settings"
|
||||
);
|
||||
await this.pause();
|
||||
|
||||
// clear all local state
|
||||
this.logger.info("Resetting SyncClient's local state");
|
||||
this.database.reset();
|
||||
await this.database.save(); // ensure the new database reads as empty
|
||||
this.resetInMemoryState();
|
||||
this.hasStartedOfflineSync = false;
|
||||
this.hasFinishedOfflineSync = false;
|
||||
this.serverConfig.reset();
|
||||
|
||||
await this.startSyncing();
|
||||
}
|
||||
|
||||
public getSettings(): SyncSettings {
|
||||
return this.settings.getSettings();
|
||||
}
|
||||
|
||||
public async setSetting<T extends keyof SyncSettings>(
|
||||
key: T,
|
||||
value: SyncSettings[T]
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("setSetting");
|
||||
|
||||
await this.settings.setSetting(key, value);
|
||||
}
|
||||
|
||||
public async setSettings(value: Partial<SyncSettings>): Promise<void> {
|
||||
this.checkIfDestroyed("setSettings");
|
||||
|
||||
await this.settings.setSettings(value);
|
||||
}
|
||||
|
||||
public get onSyncHistoryUpdated(): EventListeners<
|
||||
(stats: HistoryStats) => unknown
|
||||
> {
|
||||
this.checkIfDestroyed("onSyncHistoryUpdated getter");
|
||||
return this.history.onHistoryUpdated;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public get onSettingsChanged(): EventListeners<
|
||||
(newSettings: SyncSettings, oldSettings: SyncSettings) => unknown
|
||||
> {
|
||||
this.checkIfDestroyed("onSettingsChanged getter");
|
||||
return this.settings.onSettingsChanged;
|
||||
}
|
||||
|
||||
public get onRemainingOperationsCountChanged(): EventListeners<
|
||||
(remainingOperationsCount: number) => unknown
|
||||
> {
|
||||
this.checkIfDestroyed("onRemainingOperationsCountChanged getter");
|
||||
return this.syncer.onRemainingOperationsCountChanged;
|
||||
}
|
||||
|
||||
public get onWebSocketStatusChanged(): EventListeners<
|
||||
(isConnected: boolean) => unknown
|
||||
> {
|
||||
this.checkIfDestroyed("onWebSocketStatusChanged getter");
|
||||
return this.webSocketManager.onWebSocketStatusChanged;
|
||||
}
|
||||
|
||||
public async syncLocallyCreatedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("syncLocallyCreatedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyCreatedFile(relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("syncLocallyDeletedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyDeletedFile(relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath
|
||||
}: {
|
||||
oldPath?: RelativePath;
|
||||
relativePath: RelativePath;
|
||||
}): Promise<void> {
|
||||
this.checkIfDestroyed("syncLocallyUpdatedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath
|
||||
});
|
||||
}
|
||||
|
||||
public getDocumentSyncingStatus(
|
||||
relativePath: RelativePath
|
||||
): DocumentSyncStatus {
|
||||
this.checkIfDestroyed("getDocumentSyncingStatus");
|
||||
|
||||
if (!this.settings.getSettings().isSyncEnabled) {
|
||||
return DocumentSyncStatus.SYNCING_IS_DISABLED;
|
||||
}
|
||||
|
||||
if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) {
|
||||
return DocumentSyncStatus.SYNCING;
|
||||
}
|
||||
|
||||
const document =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
if (document === undefined) {
|
||||
return DocumentSyncStatus.SYNCING;
|
||||
}
|
||||
return document.updates.length > 0
|
||||
? DocumentSyncStatus.SYNCING
|
||||
: DocumentSyncStatus.UP_TO_DATE;
|
||||
}
|
||||
|
||||
public async updateLocalCursors(
|
||||
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("updateLocalCursors");
|
||||
|
||||
await this.cursorTracker.sendLocalCursorsToServer(documentToCursors);
|
||||
}
|
||||
|
||||
|
||||
public get onRemoteCursorsUpdated(): EventListeners<
|
||||
(cursors: MaybeOutdatedClientCursors[]) => unknown
|
||||
> {
|
||||
this.checkIfDestroyed("onRemoteCursorsUpdated getter");
|
||||
return this.cursorTracker.onRemoteCursorsUpdated;
|
||||
}
|
||||
|
||||
public async waitUntilFinished(): Promise<void> {
|
||||
this.checkIfDestroyed("waitUntilIdle");
|
||||
await this.syncer.waitUntilFinished();
|
||||
await this.webSocketManager.waitUntilFinished();
|
||||
await this.database.save(); // flush all changes to disk
|
||||
}
|
||||
|
||||
/**
|
||||
* Completely destroy the SyncClient, cancelling all in-progress operations.
|
||||
* After calling this method, the SyncClient cannot be used again.
|
||||
*/
|
||||
public async destroy(): Promise<void> {
|
||||
this.checkIfDestroyed("destroy");
|
||||
|
||||
// cancel everything that's in progress
|
||||
await this.pause();
|
||||
|
||||
this.hasBeenDestroyed = true;
|
||||
|
||||
this.resetInMemoryState();
|
||||
|
||||
this.logger.info("SyncClient has been successfully disposed");
|
||||
|
||||
this.unloadTelemetry?.();
|
||||
}
|
||||
|
||||
private async startSyncing(): Promise<void> {
|
||||
this.checkIfDestroyed("startSyncing");
|
||||
this.fetchController.finishReset();
|
||||
|
||||
await this.serverConfig.initialize();
|
||||
this.webSocketManager.start();
|
||||
|
||||
if (!this.hasStartedOfflineSync) {
|
||||
this.hasStartedOfflineSync = true;
|
||||
await this.syncer.scheduleSyncForOfflineChanges();
|
||||
}
|
||||
|
||||
this.hasFinishedOfflineSync = true;
|
||||
}
|
||||
|
||||
private async pause(): Promise<void> {
|
||||
this.fetchController.startReset();
|
||||
await this.webSocketManager.stop();
|
||||
await this.waitUntilFinished();
|
||||
}
|
||||
|
||||
private resetInMemoryState(): void {
|
||||
this.history.reset();
|
||||
this.contentCache.reset();
|
||||
// don't reset the logger
|
||||
this.cursorTracker.reset();
|
||||
this.syncer.reset();
|
||||
this.fileOperations.reset();
|
||||
}
|
||||
|
||||
private async onSettingsChange(
|
||||
newSettings: SyncSettings,
|
||||
oldSettings: SyncSettings
|
||||
): Promise<void> {
|
||||
this.checkIfDestroyed("onSettingsChange");
|
||||
|
||||
if (
|
||||
newSettings.vaultName !== oldSettings.vaultName ||
|
||||
newSettings.remoteUri !== oldSettings.remoteUri
|
||||
) {
|
||||
await this.reset();
|
||||
}
|
||||
|
||||
if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
|
||||
if (newSettings.isSyncEnabled) {
|
||||
await this.startSyncing();
|
||||
} else {
|
||||
await this.pause();
|
||||
}
|
||||
}
|
||||
|
||||
if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) {
|
||||
this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024);
|
||||
}
|
||||
|
||||
if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) {
|
||||
if (newSettings.enableTelemetry) {
|
||||
this.unloadTelemetry = setUpTelemetry();
|
||||
} else {
|
||||
this.unloadTelemetry?.();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private checkIfDestroyed(origin: string): void {
|
||||
if (this.hasBeenDestroyed) {
|
||||
throw new Error(
|
||||
`SyncClient has been destroyed and can no longer be used; called from ${origin}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,252 +9,252 @@ import { DocumentUpToDateness } from "../types/document-up-to-dateness";
|
|||
import { hash } from "../utils/hash";
|
||||
import type { FileChangeNotifier } from "./file-change-notifier";
|
||||
import { Lock } from "../utils/data-structures/locks";
|
||||
import { EventListeners } from "../utils/data-structures/event-listeners";
|
||||
|
||||
// Cursor positions are updated separately from documents. However, a given cursor position is only
|
||||
// valid within a certain version of the document it belongs to. This class tracks previous and the latest
|
||||
// known remote cursor positions, and for each document, tries to return the latest cursor positions that are
|
||||
// not from the future.
|
||||
export class CursorTracker {
|
||||
private readonly updateLock = new Lock();
|
||||
private readonly updateLock = new Lock();
|
||||
|
||||
private knownRemoteCursors: (ClientCursors & {
|
||||
upToDateness: DocumentUpToDateness;
|
||||
})[] = [];
|
||||
// The returned position may be accurate, if it matches the document version, or outdated, in which case
|
||||
// the client has to heuristically guess it's current position based on the local edits.
|
||||
public readonly onRemoteCursorsUpdated = new EventListeners<
|
||||
(cursors: MaybeOutdatedClientCursors[]) => unknown
|
||||
>();
|
||||
|
||||
private lastLocalCursorState: DocumentWithCursors[] = [];
|
||||
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
|
||||
[];
|
||||
private knownRemoteCursors: (ClientCursors & {
|
||||
upToDateness: DocumentUpToDateness;
|
||||
})[] = [];
|
||||
|
||||
public constructor(
|
||||
private readonly database: Database,
|
||||
private readonly webSocketManager: WebSocketManager,
|
||||
private readonly fileOperations: FileOperations,
|
||||
private readonly fileChangeNotifier: FileChangeNotifier
|
||||
) {
|
||||
this.webSocketManager.addRemoteCursorsUpdateListener(
|
||||
async (clientCursors) => {
|
||||
await this.updateLock.withLock(async () => {
|
||||
// The latest message will contain all active clients, so we can delete the ones
|
||||
// from the local list which are no longer active.
|
||||
const allIds = new Set(
|
||||
clientCursors.map((c) => c.deviceId)
|
||||
);
|
||||
const updatedKnownRemoteCursors =
|
||||
this.knownRemoteCursors.filter((c) =>
|
||||
allIds.has(c.deviceId)
|
||||
);
|
||||
private lastLocalCursorState: DocumentWithCursors[] = [];
|
||||
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
|
||||
[];
|
||||
|
||||
for (const cursor of clientCursors.filter((client) =>
|
||||
client.documentsWithCursors.every(
|
||||
(doc) => doc.vault_update_id != null
|
||||
)
|
||||
)) {
|
||||
updatedKnownRemoteCursors.push({
|
||||
...cursor,
|
||||
upToDateness:
|
||||
await this.getDocumentsUpToDateness(cursor)
|
||||
});
|
||||
}
|
||||
public constructor(
|
||||
private readonly database: Database,
|
||||
private readonly webSocketManager: WebSocketManager,
|
||||
private readonly fileOperations: FileOperations,
|
||||
private readonly fileChangeNotifier: FileChangeNotifier
|
||||
) {
|
||||
this.webSocketManager.onRemoteCursorsUpdateReceived.add(
|
||||
async (clientCursors) => {
|
||||
await this.updateLock.withLock(async () => {
|
||||
// The latest message will contain all active clients, so we can delete the ones
|
||||
// from the local list which are no longer active.
|
||||
const allIds = new Set(
|
||||
clientCursors.map((c) => c.deviceId)
|
||||
);
|
||||
const updatedKnownRemoteCursors =
|
||||
this.knownRemoteCursors.filter((c) =>
|
||||
allIds.has(c.deviceId)
|
||||
);
|
||||
|
||||
this.knownRemoteCursors = updatedKnownRemoteCursors;
|
||||
});
|
||||
}
|
||||
);
|
||||
for (const cursor of clientCursors.filter((client) =>
|
||||
client.documentsWithCursors.every(
|
||||
(doc) => doc.vault_update_id != null
|
||||
)
|
||||
)) {
|
||||
updatedKnownRemoteCursors.push({
|
||||
...cursor,
|
||||
upToDateness:
|
||||
await this.getDocumentsUpToDateness(cursor)
|
||||
});
|
||||
}
|
||||
|
||||
this.fileChangeNotifier.addFileChangeListener(async (relativePath) =>
|
||||
this.updateLock.withLock(async () => {
|
||||
for (const clientCursor of this.knownRemoteCursors) {
|
||||
if (
|
||||
clientCursor.documentsWithCursors.some(
|
||||
(document) =>
|
||||
document.relative_path === relativePath
|
||||
)
|
||||
) {
|
||||
clientCursor.upToDateness =
|
||||
await this.getDocumentsUpToDateness(clientCursor);
|
||||
}
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
this.knownRemoteCursors = updatedKnownRemoteCursors;
|
||||
});
|
||||
|
||||
/// Update the local cursors for the given documents.
|
||||
/// Can be called frequently as it only emits an event
|
||||
/// if the state has actually changed.
|
||||
public async sendLocalCursorsToServer(
|
||||
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||
): Promise<void> {
|
||||
const documentsWithCursors: DocumentWithCursors[] = [];
|
||||
this.onRemoteCursorsUpdated.trigger(
|
||||
this.getRelevantAndPruneKnownClientCursors()
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
for (const [relativePath, cursors] of Object.entries(
|
||||
documentToCursors
|
||||
)) {
|
||||
const record =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
if (!record) {
|
||||
continue; // Let's wait for the file to be created before sending cursors
|
||||
}
|
||||
this.fileChangeNotifier.onFileChanged.add(async (relativePath) =>
|
||||
this.updateLock.withLock(async () => {
|
||||
for (const clientCursor of this.knownRemoteCursors) {
|
||||
if (
|
||||
clientCursor.documentsWithCursors.some(
|
||||
(document) =>
|
||||
document.relative_path === relativePath
|
||||
)
|
||||
) {
|
||||
clientCursor.upToDateness =
|
||||
await this.getDocumentsUpToDateness(clientCursor);
|
||||
}
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
if (!record.metadata) {
|
||||
continue; // this is a new document, no need to sync the cursors
|
||||
}
|
||||
/// Update the local cursors for the given documents.
|
||||
/// Can be called frequently as it only emits an event
|
||||
/// if the state has actually changed.
|
||||
public async sendLocalCursorsToServer(
|
||||
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||
): Promise<void> {
|
||||
const documentsWithCursors: DocumentWithCursors[] = [];
|
||||
|
||||
documentsWithCursors.push({
|
||||
relative_path: relativePath,
|
||||
document_id: record.documentId,
|
||||
vault_update_id: record.metadata.parentVersionId,
|
||||
cursors: cursors.map(({ start, end }) => ({
|
||||
start: Math.min(start, end),
|
||||
end: Math.max(start, end)
|
||||
})) // the client might send directional selections
|
||||
});
|
||||
}
|
||||
for (const [relativePath, cursors] of Object.entries(
|
||||
documentToCursors
|
||||
)) {
|
||||
const record =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
if (
|
||||
JSON.stringify(this.lastLocalCursorState) ===
|
||||
JSON.stringify(documentsWithCursors)
|
||||
) {
|
||||
// Caching step to avoid reading the edited files all the time
|
||||
return;
|
||||
}
|
||||
this.lastLocalCursorState = documentsWithCursors;
|
||||
if (!record) {
|
||||
continue; // Let's wait for the file to be created before sending cursors
|
||||
}
|
||||
|
||||
for (const doc of documentsWithCursors) {
|
||||
const readContent = await this.fileOperations.read(
|
||||
doc.relative_path
|
||||
);
|
||||
const record = this.database.getLatestDocumentByRelativePath(
|
||||
doc.relative_path
|
||||
);
|
||||
if (record?.metadata?.hash !== hash(readContent)) {
|
||||
doc.vault_update_id = null;
|
||||
}
|
||||
}
|
||||
if (!record.metadata) {
|
||||
continue; // this is a new document, no need to sync the cursors
|
||||
}
|
||||
|
||||
if (
|
||||
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
|
||||
JSON.stringify(documentsWithCursors)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
documentsWithCursors.push({
|
||||
relative_path: relativePath,
|
||||
document_id: record.documentId,
|
||||
vault_update_id: record.metadata.parentVersionId,
|
||||
cursors: cursors.map(({ start, end }) => ({
|
||||
start: Math.min(start, end),
|
||||
end: Math.max(start, end)
|
||||
})) // the client might send directional selections
|
||||
});
|
||||
}
|
||||
|
||||
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
|
||||
if (
|
||||
JSON.stringify(this.lastLocalCursorState) ===
|
||||
JSON.stringify(documentsWithCursors)
|
||||
) {
|
||||
// Caching step to avoid reading the edited files all the time
|
||||
return;
|
||||
}
|
||||
this.lastLocalCursorState = documentsWithCursors;
|
||||
|
||||
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
|
||||
}
|
||||
for (const doc of documentsWithCursors) {
|
||||
const readContent = await this.fileOperations.read(
|
||||
doc.relative_path
|
||||
);
|
||||
const record = this.database.getLatestDocumentByRelativePath(
|
||||
doc.relative_path
|
||||
);
|
||||
if (record?.metadata?.hash !== hash(readContent)) {
|
||||
doc.vault_update_id = null;
|
||||
}
|
||||
}
|
||||
|
||||
// The returned position may be accurate, if it matches the document version, or outdated, in which case
|
||||
// the client has to heuristically guess it's current position based on the local edits.
|
||||
public addRemoteCursorsUpdateListener(
|
||||
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
|
||||
): void {
|
||||
// CursorTracker registers its own event listener in the constructor so it must have been called before this
|
||||
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
|
||||
await this.updateLock.withLock(() =>
|
||||
listener(this.getRelevantAndPruneKnownClientCursors())
|
||||
);
|
||||
});
|
||||
}
|
||||
if (
|
||||
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
|
||||
JSON.stringify(documentsWithCursors)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
public reset(): void {
|
||||
this.knownRemoteCursors = [];
|
||||
this.lastLocalCursorState = [];
|
||||
this.lastLocalCursorStateWithoutDirtyDocuments = [];
|
||||
this.updateLock.reset();
|
||||
}
|
||||
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
|
||||
|
||||
private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] {
|
||||
const result: MaybeOutdatedClientCursors[] = [];
|
||||
const included = new Set<string>();
|
||||
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
|
||||
}
|
||||
|
||||
const relevantCursors = [];
|
||||
for (const clientCursors of [...this.knownRemoteCursors].reverse()) {
|
||||
if (included.has(clientCursors.deviceId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (clientCursors.upToDateness === DocumentUpToDateness.Later) {
|
||||
continue;
|
||||
}
|
||||
public reset(): void {
|
||||
this.knownRemoteCursors = [];
|
||||
this.lastLocalCursorState = [];
|
||||
this.lastLocalCursorStateWithoutDirtyDocuments = [];
|
||||
this.updateLock.reset();
|
||||
}
|
||||
|
||||
result.push({
|
||||
...clientCursors,
|
||||
isOutdated:
|
||||
clientCursors.upToDateness === DocumentUpToDateness.Prior
|
||||
});
|
||||
private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] {
|
||||
const result: MaybeOutdatedClientCursors[] = [];
|
||||
const included = new Set<string>();
|
||||
|
||||
included.add(clientCursors.deviceId);
|
||||
relevantCursors.unshift(clientCursors); // to reverse order back to normal
|
||||
}
|
||||
const relevantCursors = [];
|
||||
for (const clientCursors of [...this.knownRemoteCursors].reverse()) {
|
||||
if (included.has(clientCursors.deviceId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
this.knownRemoteCursors = relevantCursors;
|
||||
if (clientCursors.upToDateness === DocumentUpToDateness.Later) {
|
||||
continue;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
result.push({
|
||||
...clientCursors,
|
||||
isOutdated:
|
||||
clientCursors.upToDateness === DocumentUpToDateness.Prior
|
||||
});
|
||||
|
||||
// We store up-to-dateness on a per-client basis to simplify the implementation.
|
||||
// An individual client won't have too many documents open at once, so this is a reasonable trade-off.
|
||||
private async getDocumentsUpToDateness(
|
||||
clientCursor: ClientCursors
|
||||
): Promise<DocumentUpToDateness> {
|
||||
const results = [];
|
||||
for (const document of clientCursor.documentsWithCursors) {
|
||||
results.push(await this.getDocumentUpToDateness(document));
|
||||
}
|
||||
included.add(clientCursors.deviceId);
|
||||
relevantCursors.unshift(clientCursors); // to reverse order back to normal
|
||||
}
|
||||
|
||||
if (
|
||||
results.every((result) => result === DocumentUpToDateness.UpToDate)
|
||||
) {
|
||||
return DocumentUpToDateness.UpToDate;
|
||||
}
|
||||
this.knownRemoteCursors = relevantCursors;
|
||||
|
||||
if (
|
||||
results.every(
|
||||
(result) =>
|
||||
result === DocumentUpToDateness.UpToDate ||
|
||||
result === DocumentUpToDateness.Prior
|
||||
)
|
||||
) {
|
||||
return DocumentUpToDateness.Prior;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
return DocumentUpToDateness.Later;
|
||||
}
|
||||
// We store up-to-dateness on a per-client basis to simplify the implementation.
|
||||
// An individual client won't have too many documents open at once, so this is a reasonable trade-off.
|
||||
private async getDocumentsUpToDateness(
|
||||
clientCursor: ClientCursors
|
||||
): Promise<DocumentUpToDateness> {
|
||||
const results = [];
|
||||
for (const document of clientCursor.documentsWithCursors) {
|
||||
results.push(await this.getDocumentUpToDateness(document));
|
||||
}
|
||||
|
||||
private async getDocumentUpToDateness(
|
||||
document: DocumentWithCursors
|
||||
): Promise<DocumentUpToDateness> {
|
||||
const record = this.database.getLatestDocumentByRelativePath(
|
||||
document.relative_path
|
||||
);
|
||||
if (
|
||||
results.every((result) => result === DocumentUpToDateness.UpToDate)
|
||||
) {
|
||||
return DocumentUpToDateness.UpToDate;
|
||||
}
|
||||
|
||||
if (!record) {
|
||||
// the document of the cursor must be from the future
|
||||
return DocumentUpToDateness.Later;
|
||||
}
|
||||
if (
|
||||
results.every(
|
||||
(result) =>
|
||||
result === DocumentUpToDateness.UpToDate ||
|
||||
result === DocumentUpToDateness.Prior
|
||||
)
|
||||
) {
|
||||
return DocumentUpToDateness.Prior;
|
||||
}
|
||||
|
||||
if (
|
||||
(record.metadata?.parentVersionId ?? 0) <
|
||||
(document.vault_update_id ?? 0)
|
||||
) {
|
||||
return DocumentUpToDateness.Later;
|
||||
} else if (
|
||||
(document.vault_update_id ?? 0) <
|
||||
(record.metadata?.parentVersionId ?? 0)
|
||||
) {
|
||||
// the document of the cursor must be from the past
|
||||
return DocumentUpToDateness.Prior;
|
||||
}
|
||||
return DocumentUpToDateness.Later;
|
||||
}
|
||||
|
||||
const currentContent = await this.fileOperations.read(
|
||||
document.relative_path
|
||||
);
|
||||
private async getDocumentUpToDateness(
|
||||
document: DocumentWithCursors
|
||||
): Promise<DocumentUpToDateness> {
|
||||
const record = this.database.getLatestDocumentByRelativePath(
|
||||
document.relative_path
|
||||
);
|
||||
|
||||
return this.database.getLatestDocumentByRelativePath(
|
||||
document.relative_path
|
||||
)?.metadata?.hash === hash(currentContent)
|
||||
? DocumentUpToDateness.UpToDate
|
||||
: DocumentUpToDateness.Prior;
|
||||
}
|
||||
if (!record) {
|
||||
// the document of the cursor must be from the future
|
||||
return DocumentUpToDateness.Later;
|
||||
}
|
||||
|
||||
if (
|
||||
(record.metadata?.parentVersionId ?? 0) <
|
||||
(document.vault_update_id ?? 0)
|
||||
) {
|
||||
return DocumentUpToDateness.Later;
|
||||
} else if (
|
||||
(document.vault_update_id ?? 0) <
|
||||
(record.metadata?.parentVersionId ?? 0)
|
||||
) {
|
||||
// the document of the cursor must be from the past
|
||||
return DocumentUpToDateness.Prior;
|
||||
}
|
||||
|
||||
const currentContent = await this.fileOperations.read(
|
||||
document.relative_path
|
||||
);
|
||||
|
||||
return this.database.getLatestDocumentByRelativePath(
|
||||
document.relative_path
|
||||
)?.metadata?.hash === hash(currentContent)
|
||||
? DocumentUpToDateness.UpToDate
|
||||
: DocumentUpToDateness.Prior;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,22 +1,12 @@
|
|||
import type { RelativePath } from "../persistence/database";
|
||||
import { removeFromArray } from "../utils/remove-from-array";
|
||||
import { EventListeners } from "../utils/data-structures/event-listeners";
|
||||
|
||||
export class FileChangeNotifier {
|
||||
private readonly listeners: ((filePath: RelativePath) => unknown)[] = [];
|
||||
public readonly onFileChanged = new EventListeners<
|
||||
(filePath: RelativePath) => unknown
|
||||
>();
|
||||
|
||||
public addFileChangeListener(
|
||||
listener: (filePath: RelativePath) => unknown
|
||||
): void {
|
||||
this.listeners.push(listener);
|
||||
}
|
||||
|
||||
public removeFileChangeListener(
|
||||
listener: (filePath: RelativePath) => unknown
|
||||
): void {
|
||||
removeFromArray(this.listeners, listener);
|
||||
}
|
||||
|
||||
public notifyOfFileChange(filePath: RelativePath): void {
|
||||
this.listeners.forEach((listener) => listener(filePath));
|
||||
}
|
||||
public notifyOfFileChange(filePath: RelativePath): void {
|
||||
this.onFileChanged.trigger(filePath);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
File diff suppressed because it is too large
Load diff
|
|
@ -1,87 +1,72 @@
|
|||
import { MAX_LOG_MESSAGE_COUNT } from "../consts";
|
||||
import { removeFromArray } from "../utils/remove-from-array";
|
||||
import { EventListeners } from "../utils/data-structures/event-listeners";
|
||||
|
||||
export enum LogLevel {
|
||||
DEBUG = "DEBUG",
|
||||
INFO = "INFO",
|
||||
WARNING = "WARNING",
|
||||
ERROR = "ERROR"
|
||||
DEBUG = "DEBUG",
|
||||
INFO = "INFO",
|
||||
WARNING = "WARNING",
|
||||
ERROR = "ERROR"
|
||||
}
|
||||
|
||||
const LOG_LEVEL_ORDER = {
|
||||
[LogLevel.DEBUG]: 0,
|
||||
[LogLevel.INFO]: 1,
|
||||
[LogLevel.WARNING]: 2,
|
||||
[LogLevel.ERROR]: 3
|
||||
[LogLevel.DEBUG]: 0,
|
||||
[LogLevel.INFO]: 1,
|
||||
[LogLevel.WARNING]: 2,
|
||||
[LogLevel.ERROR]: 3
|
||||
};
|
||||
|
||||
export class LogLine {
|
||||
public timestamp = new Date();
|
||||
public constructor(
|
||||
public level: LogLevel,
|
||||
public message: string
|
||||
) {}
|
||||
public timestamp = new Date();
|
||||
public constructor(
|
||||
public level: LogLevel,
|
||||
public message: string
|
||||
) { }
|
||||
}
|
||||
|
||||
export class Logger {
|
||||
private readonly messages: LogLine[] = [];
|
||||
private readonly onMessageListeners: ((message: LogLine) => unknown)[] = [];
|
||||
private readonly messages: LogLine[] = [];
|
||||
public readonly onLogEmitted = new EventListeners<
|
||||
(message: LogLine) => unknown
|
||||
>();
|
||||
|
||||
public constructor(
|
||||
...onMessageListeners: ((message: LogLine) => unknown)[]
|
||||
) {
|
||||
this.onMessageListeners = onMessageListeners;
|
||||
}
|
||||
|
||||
public debug(message: string): void {
|
||||
this.pushMessage(message, LogLevel.DEBUG);
|
||||
}
|
||||
public debug(message: string): void {
|
||||
this.pushMessage(message, LogLevel.DEBUG);
|
||||
}
|
||||
|
||||
public info(message: string): void {
|
||||
this.pushMessage(message, LogLevel.INFO);
|
||||
}
|
||||
public info(message: string): void {
|
||||
this.pushMessage(message, LogLevel.INFO);
|
||||
}
|
||||
|
||||
public warn(message: string): void {
|
||||
this.pushMessage(message, LogLevel.WARNING);
|
||||
}
|
||||
public warn(message: string): void {
|
||||
this.pushMessage(message, LogLevel.WARNING);
|
||||
}
|
||||
|
||||
public error(message: string): void {
|
||||
this.pushMessage(message, LogLevel.ERROR);
|
||||
}
|
||||
public error(message: string): void {
|
||||
this.pushMessage(message, LogLevel.ERROR);
|
||||
}
|
||||
|
||||
public getMessages(mininumSeverity: LogLevel): LogLine[] {
|
||||
return this.messages.filter(
|
||||
(message) =>
|
||||
LOG_LEVEL_ORDER[message.level] >=
|
||||
LOG_LEVEL_ORDER[mininumSeverity]
|
||||
);
|
||||
}
|
||||
public getMessages(mininumSeverity: LogLevel): LogLine[] {
|
||||
return this.messages.filter(
|
||||
(message) =>
|
||||
LOG_LEVEL_ORDER[message.level] >=
|
||||
LOG_LEVEL_ORDER[mininumSeverity]
|
||||
);
|
||||
}
|
||||
|
||||
public addOnMessageListener(listener: (message: LogLine) => unknown): void {
|
||||
this.onMessageListeners.push(listener);
|
||||
}
|
||||
public reset(): void {
|
||||
this.messages.length = 0;
|
||||
this.debug("Logger has been reset");
|
||||
}
|
||||
|
||||
public removeOnMessageListener(
|
||||
listener: (message: LogLine) => unknown
|
||||
): void {
|
||||
removeFromArray(this.onMessageListeners, listener);
|
||||
}
|
||||
private pushMessage(message: string, level: LogLevel): void {
|
||||
const logLine = new LogLine(level, message);
|
||||
this.messages.push(logLine);
|
||||
|
||||
public reset(): void {
|
||||
this.messages.length = 0;
|
||||
this.debug("Logger has been reset");
|
||||
}
|
||||
while (this.messages.length > MAX_LOG_MESSAGE_COUNT) {
|
||||
this.messages.shift();
|
||||
}
|
||||
|
||||
private pushMessage(message: string, level: LogLevel): void {
|
||||
const logLine = new LogLine(level, message);
|
||||
this.messages.push(logLine);
|
||||
|
||||
while (this.messages.length > MAX_LOG_MESSAGE_COUNT) {
|
||||
this.messages.shift();
|
||||
}
|
||||
|
||||
this.onMessageListeners.forEach((listener) => {
|
||||
listener(logLine);
|
||||
});
|
||||
}
|
||||
this.onLogEmitted.trigger(logLine);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,183 +1,169 @@
|
|||
import {
|
||||
MAX_HISTORY_ENTRY_COUNT,
|
||||
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS
|
||||
MAX_HISTORY_ENTRY_COUNT,
|
||||
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS
|
||||
} from "../consts";
|
||||
import type { RelativePath } from "../persistence/database";
|
||||
import type { Logger } from "./logger";
|
||||
import { removeFromArray } from "../utils/remove-from-array";
|
||||
import { EventListeners } from "../utils/data-structures/event-listeners";
|
||||
|
||||
export interface SyncCreateDetails {
|
||||
type: SyncType.CREATE;
|
||||
relativePath: RelativePath;
|
||||
type: SyncType.CREATE;
|
||||
relativePath: RelativePath;
|
||||
}
|
||||
|
||||
export interface SyncUpdateDetails {
|
||||
type: SyncType.UPDATE;
|
||||
relativePath: RelativePath;
|
||||
type: SyncType.UPDATE;
|
||||
relativePath: RelativePath;
|
||||
}
|
||||
|
||||
export interface SyncMovedDetails {
|
||||
type: SyncType.MOVE;
|
||||
relativePath: RelativePath;
|
||||
movedFrom: RelativePath;
|
||||
type: SyncType.MOVE;
|
||||
relativePath: RelativePath;
|
||||
movedFrom: RelativePath;
|
||||
}
|
||||
|
||||
export interface SyncDeleteDetails {
|
||||
type: SyncType.DELETE;
|
||||
relativePath: RelativePath;
|
||||
type: SyncType.DELETE;
|
||||
relativePath: RelativePath;
|
||||
}
|
||||
|
||||
export interface SyncSkippedDetails {
|
||||
type: SyncType.SKIPPED;
|
||||
relativePath: RelativePath;
|
||||
type: SyncType.SKIPPED;
|
||||
relativePath: RelativePath;
|
||||
}
|
||||
|
||||
export type SyncDetails =
|
||||
| SyncCreateDetails
|
||||
| SyncUpdateDetails
|
||||
| SyncDeleteDetails
|
||||
| SyncMovedDetails
|
||||
| SyncSkippedDetails;
|
||||
| SyncCreateDetails
|
||||
| SyncUpdateDetails
|
||||
| SyncDeleteDetails
|
||||
| SyncMovedDetails
|
||||
| SyncSkippedDetails;
|
||||
|
||||
export interface CommonHistoryEntry {
|
||||
status: SyncStatus;
|
||||
message: string;
|
||||
details: SyncDetails;
|
||||
author?: string;
|
||||
timestamp?: Date;
|
||||
status: SyncStatus;
|
||||
message: string;
|
||||
details: SyncDetails;
|
||||
author?: string;
|
||||
timestamp?: Date;
|
||||
}
|
||||
|
||||
export enum SyncType {
|
||||
CREATE = "CREATE",
|
||||
UPDATE = "UPDATE",
|
||||
DELETE = "DELETE",
|
||||
MOVE = "MOVE",
|
||||
SKIPPED = "SKIPPED"
|
||||
CREATE = "CREATE",
|
||||
UPDATE = "UPDATE",
|
||||
DELETE = "DELETE",
|
||||
MOVE = "MOVE",
|
||||
SKIPPED = "SKIPPED"
|
||||
}
|
||||
|
||||
export enum SyncStatus {
|
||||
SUCCESS = "SUCCESS",
|
||||
ERROR = "ERROR",
|
||||
SKIPPED = "SKIPPED"
|
||||
SUCCESS = "SUCCESS",
|
||||
ERROR = "ERROR",
|
||||
SKIPPED = "SKIPPED"
|
||||
}
|
||||
|
||||
export type HistoryEntry = CommonHistoryEntry & { timestamp: Date };
|
||||
|
||||
export interface HistoryStats {
|
||||
success: number;
|
||||
error: number;
|
||||
success: number;
|
||||
error: number;
|
||||
}
|
||||
|
||||
export class SyncHistory {
|
||||
private readonly _entries: HistoryEntry[] = [];
|
||||
private readonly _entries: HistoryEntry[] = [];
|
||||
|
||||
private readonly syncHistoryUpdateListeners: ((
|
||||
status: HistoryStats
|
||||
) => unknown)[] = [];
|
||||
public readonly onHistoryUpdated = new EventListeners<
|
||||
(status: HistoryStats) => unknown
|
||||
>();
|
||||
|
||||
private status: HistoryStats = {
|
||||
success: 0,
|
||||
error: 0
|
||||
};
|
||||
private status: HistoryStats = {
|
||||
success: 0,
|
||||
error: 0
|
||||
};
|
||||
|
||||
public constructor(private readonly logger: Logger) {}
|
||||
public constructor(private readonly logger: Logger) { }
|
||||
|
||||
public get entries(): readonly HistoryEntry[] {
|
||||
return this._entries;
|
||||
}
|
||||
public get entries(): readonly HistoryEntry[] {
|
||||
return this._entries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert the entry at the beginning of the history list. If the entry
|
||||
* already in the list, it will get moved to the beginning and updated.
|
||||
*
|
||||
* If the entry list is too long, the oldest entry will be removed.
|
||||
*/
|
||||
public addHistoryEntry(entry: CommonHistoryEntry): void {
|
||||
const historyEntry = {
|
||||
...entry,
|
||||
timestamp: entry.timestamp ?? new Date()
|
||||
};
|
||||
/**
|
||||
* Insert the entry at the beginning of the history list. If the entry
|
||||
* already in the list, it will get moved to the beginning and updated.
|
||||
*
|
||||
* If the entry list is too long, the oldest entry will be removed.
|
||||
*/
|
||||
public addHistoryEntry(entry: CommonHistoryEntry): void {
|
||||
const historyEntry = {
|
||||
...entry,
|
||||
timestamp: entry.timestamp ?? new Date()
|
||||
};
|
||||
|
||||
const candidate = this.findSimilarRecentUpdateEntry(historyEntry);
|
||||
if (candidate !== undefined) {
|
||||
removeFromArray(this._entries, candidate);
|
||||
}
|
||||
const candidate = this.findSimilarRecentUpdateEntry(historyEntry);
|
||||
if (candidate !== undefined) {
|
||||
removeFromArray(this._entries, candidate);
|
||||
}
|
||||
|
||||
// Insert the entry at the beginning
|
||||
this._entries.unshift(historyEntry);
|
||||
// Insert the entry at the beginning
|
||||
this._entries.unshift(historyEntry);
|
||||
|
||||
if (this._entries.length > MAX_HISTORY_ENTRY_COUNT) {
|
||||
this._entries.pop();
|
||||
}
|
||||
if (this._entries.length > MAX_HISTORY_ENTRY_COUNT) {
|
||||
this._entries.pop();
|
||||
}
|
||||
|
||||
this.updateSuccessCount(historyEntry);
|
||||
}
|
||||
this.updateSuccessCount(historyEntry);
|
||||
}
|
||||
|
||||
public addSyncHistoryUpdateListener(
|
||||
listener: (stats: HistoryStats) => unknown
|
||||
): void {
|
||||
this.syncHistoryUpdateListeners.push(listener);
|
||||
listener({ ...this.status });
|
||||
}
|
||||
|
||||
public removeSyncHistoryUpdateListener(
|
||||
listener: (stats: HistoryStats) => unknown
|
||||
): void {
|
||||
removeFromArray(this.syncHistoryUpdateListeners, listener);
|
||||
}
|
||||
|
||||
public reset(): void {
|
||||
this._entries.length = 0;
|
||||
this.status = {
|
||||
success: 0,
|
||||
error: 0
|
||||
};
|
||||
this.syncHistoryUpdateListeners.forEach((listener) => {
|
||||
listener(this.status);
|
||||
});
|
||||
}
|
||||
public reset(): void {
|
||||
this._entries.length = 0;
|
||||
this.status = {
|
||||
success: 0,
|
||||
error: 0
|
||||
};
|
||||
this.onHistoryUpdated.trigger(this.status);
|
||||
}
|
||||
|
||||
private findSimilarRecentUpdateEntry(
|
||||
entry: HistoryEntry
|
||||
): HistoryEntry | undefined {
|
||||
if (entry.details.type !== SyncType.UPDATE) {
|
||||
return;
|
||||
}
|
||||
private findSimilarRecentUpdateEntry(
|
||||
entry: HistoryEntry
|
||||
): HistoryEntry | undefined {
|
||||
if (entry.details.type !== SyncType.UPDATE) {
|
||||
return;
|
||||
}
|
||||
|
||||
const candidate = this._entries.find(
|
||||
(e) =>
|
||||
e.details.type === SyncType.UPDATE &&
|
||||
e.details.relativePath === entry.details.relativePath
|
||||
);
|
||||
if (
|
||||
candidate !== undefined &&
|
||||
(this._entries[0] === candidate ||
|
||||
candidate.timestamp.getTime() +
|
||||
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS * 1000 >
|
||||
entry.timestamp.getTime())
|
||||
) {
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
const candidate = this._entries.find(
|
||||
(e) =>
|
||||
e.details.type === SyncType.UPDATE &&
|
||||
e.details.relativePath === entry.details.relativePath
|
||||
);
|
||||
if (
|
||||
candidate !== undefined &&
|
||||
(this._entries[0] === candidate ||
|
||||
candidate.timestamp.getTime() +
|
||||
TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS * 1000 >
|
||||
entry.timestamp.getTime())
|
||||
) {
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
|
||||
private updateSuccessCount(entry: HistoryEntry): void {
|
||||
const message = `${entry.details.relativePath} - ${entry.message} (${entry.details.type.toLocaleLowerCase()})`;
|
||||
switch (entry.status) {
|
||||
case SyncStatus.SUCCESS:
|
||||
this.status.success++;
|
||||
this.logger.info(`History entry: ${message}`);
|
||||
break;
|
||||
case SyncStatus.ERROR:
|
||||
this.status.error++;
|
||||
this.logger.error(`Cannot sync file: ${message}`);
|
||||
break;
|
||||
case SyncStatus.SKIPPED:
|
||||
this.logger.warn(`Skipping file: ${message}`);
|
||||
break;
|
||||
}
|
||||
private updateSuccessCount(entry: HistoryEntry): void {
|
||||
const message = `${entry.details.relativePath} - ${entry.message} (${entry.details.type.toLocaleLowerCase()})`;
|
||||
switch (entry.status) {
|
||||
case SyncStatus.SUCCESS:
|
||||
this.status.success++;
|
||||
this.logger.info(`History entry: ${message}`);
|
||||
break;
|
||||
case SyncStatus.ERROR:
|
||||
this.status.error++;
|
||||
this.logger.error(`Cannot sync file: ${message}`);
|
||||
break;
|
||||
case SyncStatus.SKIPPED:
|
||||
this.logger.warn(`Skipping file: ${message}`);
|
||||
break;
|
||||
}
|
||||
|
||||
this.syncHistoryUpdateListeners.forEach((listener) => {
|
||||
listener(this.status);
|
||||
});
|
||||
}
|
||||
this.onHistoryUpdated.trigger(this.status);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,147 @@
|
|||
import { describe, it } from "node:test";
|
||||
import assert from "node:assert";
|
||||
import { EventListeners } from "./event-listeners";
|
||||
|
||||
describe("EventListeners", () => {
|
||||
it("should add & remove listeners", () => {
|
||||
const listeners = new EventListeners<() => void>();
|
||||
const listener = () => { };
|
||||
|
||||
listeners.add(listener);
|
||||
|
||||
assert.strictEqual(listeners.count, 1);
|
||||
|
||||
const removed = listeners.remove(listener);
|
||||
assert.strictEqual(removed, true);
|
||||
assert.strictEqual(listeners.count, 0);
|
||||
});
|
||||
|
||||
|
||||
it("should remove listeners using unsubscribe function", () => {
|
||||
const listeners = new EventListeners<() => void>();
|
||||
const listener = () => { };
|
||||
|
||||
const unsubscribe = listeners.add(listener);
|
||||
unsubscribe();
|
||||
|
||||
assert.strictEqual(listeners.count, 0);
|
||||
});
|
||||
|
||||
it("should return false when removing non-existent listener", () => {
|
||||
const listeners = new EventListeners<() => void>();
|
||||
const listener = () => { };
|
||||
|
||||
const removed = listeners.remove(listener);
|
||||
|
||||
assert.strictEqual(removed, false);
|
||||
});
|
||||
|
||||
it("should handle multiple listeners", () => {
|
||||
const listeners = new EventListeners<() => void>();
|
||||
const listener1 = () => { };
|
||||
const listener2 = () => { };
|
||||
const listener3 = () => { };
|
||||
|
||||
listeners.add(listener1);
|
||||
listeners.add(listener2);
|
||||
listeners.add(listener3);
|
||||
|
||||
assert.strictEqual(listeners.count, 3);
|
||||
|
||||
listeners.remove(listener2);
|
||||
|
||||
assert.strictEqual(listeners.count, 2);
|
||||
});
|
||||
|
||||
it("should trigger all listeners synchronously", () => {
|
||||
const listeners = new EventListeners<(value: string) => void>();
|
||||
const calls: string[] = [];
|
||||
|
||||
listeners.add((value) => calls.push(`listener1-${value}`));
|
||||
listeners.add((value) => calls.push(`listener2-${value}`));
|
||||
|
||||
listeners.trigger("test");
|
||||
|
||||
assert.deepStrictEqual(calls, ["listener1-test", "listener2-test"]);
|
||||
});
|
||||
|
||||
it("should trigger listeners with multiple arguments", () => {
|
||||
const listeners = new EventListeners<
|
||||
(a: number, b: string, c: boolean) => void
|
||||
>();
|
||||
const calls: [number, string, boolean][] = [];
|
||||
|
||||
listeners.add((a, b, c) => calls.push([a, b, c]));
|
||||
listeners.trigger(42, "hello", true);
|
||||
|
||||
assert.deepStrictEqual(calls, [[42, "hello", true]]);
|
||||
});
|
||||
|
||||
it("should not trigger removed listeners", () => {
|
||||
const listeners = new EventListeners<() => void>();
|
||||
let count1 = 0;
|
||||
let count2 = 0;
|
||||
|
||||
const listener1 = () => {
|
||||
count1++;
|
||||
};
|
||||
const listener2 = () => {
|
||||
count2++;
|
||||
};
|
||||
|
||||
listeners.add(listener1);
|
||||
const unsubscribe = listeners.add(listener2);
|
||||
|
||||
unsubscribe();
|
||||
listeners.trigger();
|
||||
|
||||
assert.strictEqual(count1, 1);
|
||||
assert.strictEqual(count2, 0);
|
||||
});
|
||||
|
||||
it("should trigger all listeners and await promises", async () => {
|
||||
const listeners = new EventListeners<
|
||||
(value: string) => Promise<void> | void
|
||||
>();
|
||||
const results: string[] = [];
|
||||
|
||||
listeners.add(async (value) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
results.push(`async1-${value}`);
|
||||
});
|
||||
|
||||
listeners.add((value) => {
|
||||
results.push(`sync-${value}`);
|
||||
});
|
||||
|
||||
listeners.add(async (value) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 5));
|
||||
results.push(`async2-${value}`);
|
||||
});
|
||||
|
||||
await listeners.triggerAsync("test");
|
||||
|
||||
assert.ok(results.includes("async1-test"));
|
||||
assert.ok(results.includes("sync-test"));
|
||||
assert.ok(results.includes("async2-test"));
|
||||
assert.strictEqual(results.length, 3);
|
||||
});
|
||||
|
||||
|
||||
|
||||
it("should not trigger cleared listeners", () => {
|
||||
const listeners = new EventListeners<() => void>();
|
||||
let called = false;
|
||||
const listener = () => {
|
||||
called = true;
|
||||
};
|
||||
|
||||
listeners.add(listener);
|
||||
listeners.clear();
|
||||
|
||||
assert.strictEqual(listeners.count, 0);
|
||||
listeners.trigger();
|
||||
|
||||
assert.strictEqual(called, false);
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,71 @@
|
|||
import { removeFromArray } from "../remove-from-array";
|
||||
import { awaitAll } from "../await-all";
|
||||
|
||||
/**
|
||||
* A utility class for managing event listeners with type-safe add/remove operations.
|
||||
*/
|
||||
export class EventListeners<TListener extends (...args: any[]) => any> {
|
||||
private readonly listeners: TListener[] = [];
|
||||
|
||||
/**
|
||||
* Adds a new listener to the collection.
|
||||
*
|
||||
* @param listener The listener callback to add
|
||||
* @returns An unsubscribe function that removes this listener when called
|
||||
*/
|
||||
public add(listener: TListener): () => void {
|
||||
this.listeners.push(listener);
|
||||
return () => this.remove(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a listener from the collection.
|
||||
*
|
||||
* @param listener The listener callback to remove
|
||||
* @returns true if the listener was found and removed, false otherwise
|
||||
*/
|
||||
public remove(listener: TListener): boolean {
|
||||
return removeFromArray(this.listeners, listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Triggers all listeners synchronously with the provided arguments.
|
||||
* Any returned promises are ignored. Use triggerAsync() to await them.
|
||||
*
|
||||
* @param args The arguments to pass to each listener
|
||||
*/
|
||||
public trigger(...args: Parameters<TListener>): void {
|
||||
this.listeners.forEach((listener) => {
|
||||
listener(...args);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Triggers all listeners and awaits any promises they return.
|
||||
* Synchronous listeners are called immediately, and any async listeners
|
||||
* are awaited in parallel.
|
||||
*
|
||||
* @param args The arguments to pass to each listener
|
||||
*/
|
||||
public async triggerAsync(...args: Parameters<TListener>): Promise<void> {
|
||||
await awaitAll(
|
||||
this.listeners
|
||||
.map((listener) => {
|
||||
return listener(...args);
|
||||
})
|
||||
.filter((result): result is Promise<unknown> => {
|
||||
return result instanceof Promise;
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
public clear(): void {
|
||||
this.listeners.length = 0;
|
||||
}
|
||||
|
||||
public get count(): number {
|
||||
return this.listeners.length;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -3,7 +3,7 @@ import type { LogLine } from "../../tracing/logger";
|
|||
import { LogLevel } from "../../tracing/logger";
|
||||
|
||||
export function logToConsole(client: SyncClient): void {
|
||||
client.logger.addOnMessageListener((logLine: LogLine) => {
|
||||
client.logger.onLogEmitted.add((logLine: LogLine) => {
|
||||
const formatted = `${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`;
|
||||
|
||||
switch (logLine.level) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue