Fix resetting

This commit is contained in:
Andras Schmelczer 2025-11-23 16:41:42 +00:00
parent 99d90d2e0c
commit cf68ff0ec1
11 changed files with 161 additions and 56 deletions

View file

@ -254,4 +254,8 @@ export class FileOperations {
return newName; return newName;
} }
public reset(): void {
this.fs.reset();
}
} }

View file

@ -138,4 +138,8 @@ export class SafeFileSystemOperations implements FileSystemOperations {
} }
} }
} }
public reset(): void {
this.locks.reset();
}
} }

View file

@ -133,7 +133,7 @@ export class Database {
toUpdate.metadata = metadata; toUpdate.metadata = metadata;
this.save(); this.saveInTheBackground();
} }
public removeDocumentPromise(promise: Promise<unknown>): void { public removeDocumentPromise(promise: Promise<unknown>): void {
@ -153,7 +153,7 @@ export class Database {
public removeDocument(find: DocumentRecord): void { public removeDocument(find: DocumentRecord): void {
this.documents = this.documents.filter((document) => document !== find); this.documents = this.documents.filter((document) => document !== find);
this.save(); this.saveInTheBackground();
} }
public getLatestDocumentByRelativePath( public getLatestDocumentByRelativePath(
@ -210,7 +210,7 @@ export class Database {
}; };
this.documents.push(entry); this.documents.push(entry);
this.save(); this.saveInTheBackground();
return entry; return entry;
} }
@ -234,7 +234,7 @@ export class Database {
}; };
this.documents.push(entry); this.documents.push(entry);
this.save(); this.saveInTheBackground();
return entry; return entry;
} }
@ -271,7 +271,7 @@ export class Database {
oldDocument.parallelVersion = oldDocument.parallelVersion =
newDocument !== undefined ? newDocument.parallelVersion + 1 : 0; newDocument !== undefined ? newDocument.parallelVersion + 1 : 0;
this.save(); this.saveInTheBackground();
} }
public delete(relativePath: RelativePath): void { public delete(relativePath: RelativePath): void {
@ -290,7 +290,7 @@ export class Database {
public setHasInitialSyncCompleted(value: boolean): void { public setHasInitialSyncCompleted(value: boolean): void {
this.hasInitialSyncCompleted = value; this.hasInitialSyncCompleted = value;
this.save(); this.saveInTheBackground();
} }
public getLastSeenUpdateId(): VaultUpdateId { public getLastSeenUpdateId(): VaultUpdateId {
@ -301,13 +301,13 @@ export class Database {
const previousMin = this.lastSeenUpdateIds.min; const previousMin = this.lastSeenUpdateIds.min;
this.lastSeenUpdateIds.add(value); this.lastSeenUpdateIds.add(value);
if (previousMin !== this.lastSeenUpdateIds.min) { if (previousMin !== this.lastSeenUpdateIds.min) {
this.save(); this.saveInTheBackground();
} }
} }
public setLastSeenUpdateId(value: number): void { public setLastSeenUpdateId(value: number): void {
this.lastSeenUpdateIds.min = value; this.lastSeenUpdateIds.min = value;
this.save(); this.saveInTheBackground();
} }
public reset(): void { public reset(): void {
@ -316,12 +316,18 @@ export class Database {
0 // the first updateId will be 1 which is the first integer after -1 0 // the first updateId will be 1 which is the first integer after -1
); );
this.hasInitialSyncCompleted = false; this.hasInitialSyncCompleted = false;
this.save(); this.saveInTheBackground();
} }
private save(): void { private saveInTheBackground(): void {
this.ensureConsistency(); this.ensureConsistency();
void this.saveData({ void this.save().catch((error: unknown) => {
this.logger.error(`Error saving data: ${error}`);
});
}
public save(): Promise<void> {
return this.saveData({
documents: this.resolvedDocuments.map( documents: this.resolvedDocuments.map(
({ relativePath, documentId, metadata }) => ({ ({ relativePath, documentId, metadata }) => ({
documentId, documentId,
@ -332,8 +338,6 @@ export class Database {
), ),
lastSeenUpdateId: this.lastSeenUpdateIds.min, lastSeenUpdateId: this.lastSeenUpdateIds.min,
hasInitialSyncCompleted: this.hasInitialSyncCompleted hasInitialSyncCompleted: this.hasInitialSyncCompleted
}).catch((error: unknown) => {
this.logger.error(`Error saving data: ${error}`);
}); });
} }

View file

@ -77,7 +77,7 @@ export class FetchController {
*/ */
public finishReset(): void { public finishReset(): void {
if (!this.isResetting) { if (!this.isResetting) {
throw new Error("Cannot finish reset when not resetting"); return;
} }
this.isResetting = false; this.isResetting = false;

View file

@ -21,13 +21,13 @@ export class WebSocketManager {
cursors: ClientCursors[] cursors: ClientCursors[]
) => Promise<void>)[] = []; ) => Promise<void>)[] = [];
private webSocket: WebSocket | undefined;
private isStopped = true; private isStopped = true;
private resolveDisconnectingPromise: null | (() => unknown) = null; private resolveDisconnectingPromise: null | (() => unknown) = null;
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined; private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
private readonly outstandingPromises: Promise<unknown>[] = []; private readonly outstandingPromises: Promise<unknown>[] = [];
private webSocket: WebSocket | undefined;
private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket; private readonly webSocketFactoryImplementation: typeof globalThis.WebSocket;
public constructor( public constructor(

View file

@ -29,6 +29,8 @@ import { DIFF_CACHE_SIZE_MB } from "./consts";
export class SyncClient { export class SyncClient {
private hasStartedOfflineSync = false; private hasStartedOfflineSync = false;
private hasFinishedOfflineSync = false; private hasFinishedOfflineSync = false;
private hasStarted = false;
private hasBeenDestroyed = false;
private unloadTelemetry?: () => void; private unloadTelemetry?: () => void;
private constructor( private constructor(
@ -43,6 +45,7 @@ export class SyncClient {
private readonly cursorTracker: CursorTracker, private readonly cursorTracker: CursorTracker,
private readonly fileChangeNotifier: FileChangeNotifier, private readonly fileChangeNotifier: FileChangeNotifier,
private readonly contentCache: FixedSizeDocumentCache, private readonly contentCache: FixedSizeDocumentCache,
private readonly fileOperations: FileOperations,
private readonly persistence: PersistenceProvider< private readonly persistence: PersistenceProvider<
Partial<{ Partial<{
settings: Partial<SyncSettings>; settings: Partial<SyncSettings>;
@ -52,7 +55,17 @@ export class SyncClient {
) {} ) {}
public async start(): Promise<void> { public async start(): Promise<void> {
if (this.settings.getSettings().enableTelemetry) { this.checkIfDestroyed();
if (this.hasStarted) {
throw new Error("SyncClient has already been started");
}
this.hasStarted = true;
if (
!this.unloadTelemetry &&
this.settings.getSettings().enableTelemetry
) {
this.unloadTelemetry = setUpTelemetry(); this.unloadTelemetry = setUpTelemetry();
} }
@ -73,10 +86,14 @@ export class SyncClient {
} }
} }
// Reload settings from disk overriding current in-memory settings. /**
// Missing values will be filled in from DEFAULT_SETTINGS rather than * Reload settings from disk overriding current in-memory settings.
// retaining current in-memory settings. * Missing values will be filled in from DEFAULT_SETTINGS rather than
* retaining current in-memory settings.
*/
public async reloadSettings(): Promise<void> { public async reloadSettings(): Promise<void> {
this.checkIfDestroyed();
const state = (await this.persistence.load()) ?? { const state = (await this.persistence.load()) ?? {
settings: undefined settings: undefined
}; };
@ -93,15 +110,20 @@ export class SyncClient {
newSettings: SyncSettings, newSettings: SyncSettings,
oldSettings: SyncSettings oldSettings: SyncSettings
): Promise<void> { ): Promise<void> {
if (newSettings.vaultName !== oldSettings.vaultName) { this.checkIfDestroyed();
await this.reset();
if (
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.remoteUri !== oldSettings.remoteUri
) {
await this.applyChangedConnectionSettings();
} }
if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) { if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
if (newSettings.isSyncEnabled) { if (newSettings.isSyncEnabled) {
await this.startSyncing(); await this.startSyncing();
} else { } else {
this.stop(); await this.pause();
} }
} }
@ -119,10 +141,14 @@ export class SyncClient {
} }
public get documentCount(): number { public get documentCount(): number {
this.checkIfDestroyed();
return this.database.length; return this.database.length;
} }
public get isWebSocketConnected(): boolean { public get isWebSocketConnected(): boolean {
this.checkIfDestroyed();
return this.webSocketManager.isWebSocketConnected; return this.webSocketManager.isWebSocketConnected;
} }
@ -203,7 +229,6 @@ export class SyncClient {
const fileOperations = new FileOperations( const fileOperations = new FileOperations(
logger, logger,
database, database,
settings,
fs, fs,
nativeLineEndings nativeLineEndings
); );
@ -258,6 +283,7 @@ export class SyncClient {
cursorTracker, cursorTracker,
fileChangeNotifier, fileChangeNotifier,
contentCache, contentCache,
fileOperations,
persistence persistence
); );
@ -267,6 +293,8 @@ export class SyncClient {
} }
public async checkConnection(): Promise<NetworkConnectionStatus> { public async checkConnection(): Promise<NetworkConnectionStatus> {
this.checkIfDestroyed();
const server = await this.syncService.checkConnection(); const server = await this.syncService.checkConnection();
return { return {
isSuccessful: server.isSuccessful, isSuccessful: server.isSuccessful,
@ -276,59 +304,94 @@ export class SyncClient {
} }
public getHistoryEntries(): readonly HistoryEntry[] { public getHistoryEntries(): readonly HistoryEntry[] {
this.checkIfDestroyed();
return this.history.entries; return this.history.entries;
} }
public addSyncHistoryUpdateListener( public addSyncHistoryUpdateListener(
listener: (stats: HistoryStats) => unknown listener: (stats: HistoryStats) => unknown
): void { ): void {
this.checkIfDestroyed();
this.history.addSyncHistoryUpdateListener(listener); this.history.addSyncHistoryUpdateListener(listener);
} }
private async startSyncing(): Promise<void> { private async startSyncing(): Promise<void> {
this.checkIfDestroyed();
if (!this.hasStartedOfflineSync) { if (!this.hasStartedOfflineSync) {
this.hasStartedOfflineSync = true; this.hasStartedOfflineSync = true;
await this.syncer.scheduleSyncForOfflineChanges(); await this.syncer.scheduleSyncForOfflineChanges();
} }
this.hasFinishedOfflineSync = true; this.hasFinishedOfflineSync = true;
this.webSocketManager.start();
}
private stop(): void {
this.hasFinishedOfflineSync = false;
this.webSocketManager.stop();
this.unloadTelemetry?.();
}
public async waitUntilStopped(): Promise<void> {
await this.syncer.waitUntilFinished();
}
public async applyChangedConnectionSettings(): Promise<void> {
this.fetchController.startReset();
this.webSocketManager.stop();
this.webSocketManager.start();
this.fetchController.finishReset(); this.fetchController.finishReset();
this.webSocketManager.start();
} }
/// Wait for the in-flight operations to finish, reset all tracking, /**
/// and the local database but retain the settings. * Wait for the in-flight operations to finish, reset all tracking,
/// The SyncClient can be used again after calling this method. * and the local database but retain the settings.
private async reset(): Promise<void> { * The SyncClient can be used again after calling this method.
this.stop(); */
this.fetchController.startReset(); public async applyChangedConnectionSettings(): Promise<void> {
this.contentCache.clear(); this.checkIfDestroyed();
await this.syncer.reset();
this.history.reset(); this.logger.info(
"Stopping SyncClient to apply changed connection settings"
);
await this.pause();
// clear all local state
this.logger.info("Resetting SyncClient's local state");
this.database.reset(); this.database.reset();
this.logger.reset(); await this.database.save(); // ensure the new database reads as empty
this.resetInMemoryState();
this.hasStartedOfflineSync = false;
this.hasFinishedOfflineSync = false;
// restart syncing
this.fetchController.finishReset(); this.fetchController.finishReset();
await this.startSyncing(); await this.startSyncing();
} }
/**
* Completely destroy the SyncClient, cancelling all in-progress operations.
* After calling this method, the SyncClient cannot be used again.
*/
public async destroy(): Promise<void> {
this.checkIfDestroyed();
// cancel everything that's in progress
this.fetchController.startReset();
await this.pause();
// clean-up memory early
this.resetInMemoryState();
this.logger.info("SyncClient has been successfully disposed");
this.unloadTelemetry?.();
}
private async pause(): Promise<void> {
this.checkIfDestroyed();
this.fetchController.startReset();
await this.webSocketManager.stop();
await this.syncer.waitUntilFinished();
await this.database.save(); // flush all changes to disk
}
private resetInMemoryState(): void {
this.history.reset();
this.contentCache.reset();
this.logger.reset();
this.cursorTracker.reset();
this.syncer.reset();
this.fileOperations.reset();
}
public getSettings(): SyncSettings { public getSettings(): SyncSettings {
return this.settings.getSettings(); return this.settings.getSettings();
} }
@ -420,4 +483,12 @@ export class SyncClient {
): void { ): void {
this.cursorTracker.addRemoteCursorsUpdateListener(listener); this.cursorTracker.addRemoteCursorsUpdateListener(listener);
} }
private checkIfDestroyed(): void {
if (this.hasBeenDestroyed) {
throw new Error(
"SyncClient has been destroyed and can no longer be used."
);
}
}
} }

View file

@ -250,4 +250,11 @@ export class CursorTracker {
? DocumentUpToDateness.UpToDate ? DocumentUpToDateness.UpToDate
: DocumentUpToDateness.Prior; : DocumentUpToDateness.Prior;
} }
public reset(): void {
this.knownRemoteCursors = [];
this.lastLocalCursorState = [];
this.lastLocalCursorStateWithoutDirtyDocuments = [];
this.updateLock.reset();
}
} }

View file

@ -32,7 +32,6 @@ export class Syncer {
private readonly syncQueue: PQueue; private readonly syncQueue: PQueue;
private _isFirstSyncComplete = false; private _isFirstSyncComplete = false;
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined; private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
public constructor( public constructor(
@ -514,4 +513,11 @@ export class Syncer {
this.database.setHasInitialSyncCompleted(true); this.database.setHasInitialSyncCompleted(true);
} }
public reset(): void {
this._isFirstSyncComplete = false;
this.syncQueue.clear();
this.remoteDocumentsLock.reset();
this.runningScheduleSyncForOfflineChanges = undefined;
}
} }

View file

@ -89,7 +89,7 @@ describe("fixedSizeDocumentCache", () => {
assert.equal(cache.get(1), doc1); assert.equal(cache.get(1), doc1);
assert.equal(cache.get(2), doc2); assert.equal(cache.get(2), doc2);
cache.clear(); cache.reset();
assert.equal(cache.get(1), undefined); assert.equal(cache.get(1), undefined);
assert.equal(cache.get(2), undefined); assert.equal(cache.get(2), undefined);

View file

@ -57,7 +57,7 @@ export class FixedSizeDocumentCache {
this.fitBelowMaxSize(); this.fitBelowMaxSize();
} }
public clear(): void { public reset(): void {
this.cache.clear(); this.cache.clear();
this.head = null; this.head = null;
this.tail = null; this.tail = null;

View file

@ -131,6 +131,11 @@ export class Locks<T> {
this.locked.delete(key); this.locked.delete(key);
} }
} }
public reset(): void {
this.locked.clear();
this.waiters.clear();
}
} }
export class Lock { export class Lock {
@ -143,4 +148,8 @@ export class Lock {
public async withLock<R>(fn: () => R | Promise<R>): Promise<R> { public async withLock<R>(fn: () => R | Promise<R>): Promise<R> {
return this.locks.withLock(true, fn); return this.locks.withLock(true, fn);
} }
public reset(): void {
this.locks.reset();
}
} }