split: sync-engine rewrite (sync-operations + sync-client.ts)

Replace the single unrestricted-syncer.ts with a two-loop architecture:
- syncer.ts drains the FIFO wire queue (HTTP + WS handlers).
- reconciler.ts moves files to make localPath match remoteRelativePath
  (topo-sorted move graph, in-memory cycle resolution with crash-safe
  swap markers).
- sync-event-queue.ts holds the byDocId / byLocalPath indexes and the
  pending-create promise chain.
- offline-change-detector.ts, expected-fs-events.ts, types.ts, and a
  rewritten cursor-tracker.ts / file-change-notifier.ts round it out.
Plus sync-client.ts wiring, tracing/sync-history.ts updates, index.ts
re-exports, and sync-client tsconfig/webpack/package.json.
This commit is contained in:
Andras Schmelczer 2026-05-08 21:37:26 +01:00
parent 0fda95ff8e
commit 42c9d55489
18 changed files with 5068 additions and 1257 deletions

View file

@ -2,8 +2,12 @@ import type { PersistenceProvider } from "./persistence/persistence";
import type { HistoryEntry, HistoryStats } from "./tracing/sync-history";
import { SyncHistory } from "./tracing/sync-history";
import { Logger, LogLevel, LogLine } from "./tracing/logger";
import type { RelativePath, StoredDatabase } from "./persistence/database";
import { Database } from "./persistence/database";
import type {
DocumentId,
RelativePath,
StoredSyncState
} from "./sync-operations/types";
import { SyncEventQueue } from "./sync-operations/sync-event-queue";
import * as Sentry from "@sentry/browser";
import type { SyncSettings } from "./persistence/settings";
import { DEFAULT_SETTINGS, Settings } from "./persistence/settings";
@ -12,7 +16,6 @@ import { Syncer } from "./sync-operations/syncer";
import type { FileSystemOperations } from "./file-operations/filesystem-operations";
import { FileOperations } from "./file-operations/file-operations";
import { FetchController } from "./services/fetch-controller";
import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer";
import { rateLimit } from "./utils/rate-limit";
import type { NetworkConnectionStatus } from "./types/network-connection-status";
import { DocumentSyncStatus } from "./types/document-sync-status";
@ -24,42 +27,46 @@ import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-c
import { FileChangeNotifier } from "./sync-operations/file-change-notifier";
import { FixedSizeDocumentCache } from "./utils/data-structures/fix-sized-cache";
import { setUpTelemetry } from "./utils/set-up-telemetry";
import { DIFF_CACHE_SIZE_MB } from "./consts";
import { ServerConfig } from "./services/server-config";
import type { EventListeners } from "./utils/data-structures/event-listeners";
import { Lock } from "./utils/data-structures/locks";
import { ExpectedFsEvents } from "./sync-operations/expected-fs-events";
export class SyncClient {
private hasStartedOfflineSync = false;
private hasFinishedOfflineSync = false;
private hasStarted = false;
private hasBeenDestroyed = false;
private unloadTelemetry?: () => void;
private isDestroying = false;
private readonly eventUnsubscribers: (() => void)[] = [];
private readonly settingsChangeLock = new Lock(
"SyncClient.onSettingsChange"
);
private constructor(
public readonly logger: Logger,
private readonly history: SyncHistory,
private readonly settings: Settings,
private readonly database: Database,
private readonly syncEventQueue: SyncEventQueue,
private readonly syncer: Syncer,
private readonly webSocketManager: WebSocketManager,
public readonly logger: Logger,
private readonly fetchController: FetchController,
private readonly cursorTracker: CursorTracker,
private readonly fileChangeNotifier: FileChangeNotifier,
private readonly contentCache: FixedSizeDocumentCache,
private readonly fileOperations: FileOperations,
private readonly serverConfig: ServerConfig,
private readonly syncService: SyncService,
private readonly expectedFsEvents: ExpectedFsEvents,
private readonly persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
database: Partial<StoredSyncState>;
}>
>
) {}
public get documentCount(): number {
return this.database.length;
public get syncedDocumentCount(): number {
return this.syncEventQueue.syncedDocumentCount;
}
public get isWebSocketConnected(): boolean {
@ -73,6 +80,27 @@ export class SyncClient {
return this.history.onHistoryUpdated;
}
/**
* Fires whenever a tracked document's local file moves on disk
* watcher-driven user renames, post-create deconflicts placed by
* the reconciler, lost-rename replays in offline scan, slot
* displacements when another record claims a path. Both
* `oldPath` and `newPath` may be `undefined` (placement-pending
* state). Useful for callers that mirror disk-side path state
* e.g. test harnesses tracking which paths are safe to mutate
* and need a signal beyond the user-facing history.
*/
public get onDocumentPathChanged(): EventListeners<
(
documentId: DocumentId,
oldPath: RelativePath | undefined,
newPath: RelativePath | undefined
) => unknown
> {
this.checkIfDestroyed("onDocumentPathChanged getter");
return this.syncEventQueue.onDocumentPathChanged;
}
public get onSettingsChanged(): EventListeners<
(newSettings: SyncSettings, oldSettings: SyncSettings) => unknown
> {
@ -101,6 +129,13 @@ export class SyncClient {
return this.cursorTracker.onRemoteCursorsUpdated;
}
public get hasPendingWork(): boolean {
return (
this.syncEventQueue.pendingUpdateCount > 0 ||
this.webSocketManager.hasOutstandingWork
);
}
public static async create({
fs,
persistence,
@ -112,7 +147,8 @@ export class SyncClient {
persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
database: Partial<StoredDatabase>;
database: Partial<StoredSyncState>;
deviceId: string;
}>
>;
fetch?: typeof globalThis.fetch;
@ -121,39 +157,46 @@ export class SyncClient {
}): Promise<SyncClient> {
const logger = new Logger();
const deviceId = createClientId();
logger.info(`Creating SyncClient with client id ${deviceId}`);
const history = new SyncHistory(logger);
let state = (await persistence.load()) ?? {
settings: undefined,
database: undefined
database: undefined,
deviceId: undefined
};
// Persist deviceId across destroy + init so the server's
// lost-create dedup (which scopes by device_id) can recognise
// a retry as belonging to the same client. Without this,
// every fresh `SyncClient` after a destroy would generate a
// new deviceId, the server-side query would miss, and the
// pending-but-lost create would deconflict instead of
// binding to the doc its content was already absorbed into.
let deviceId = state.deviceId;
if (deviceId === undefined) {
deviceId = createClientId();
state = { ...state, deviceId };
await persistence.save(state);
}
logger.info(`Creating SyncClient with client id ${deviceId}`);
const settings = new Settings(
logger,
state.settings,
async (data): Promise<void> => {
state = { ...state, settings: data };
// we're not rate-limiting settings saves as (1) we need to initialise the settings to know the rate limit
// and (2) settings changes are infrequent enough that rate-limiting is not necessary
await persistence.save(state);
}
);
const rateLimitedSave = rateLimit(
persistence.save,
() => settings.getSettings().minimumSaveIntervalMs
);
const database = new Database(
const syncEventQueue = new SyncEventQueue(
settings,
logger,
state.database,
async (data): Promise<void> => {
state = { ...state, database: data };
await rateLimitedSave(state);
await persistence.save(state);
}
);
@ -170,32 +213,23 @@ export class SyncClient {
fetch
);
const serverConfig = new ServerConfig(syncService);
const serverConfig = new ServerConfig(syncService, settings);
const expectedFsEvents = new ExpectedFsEvents();
const fileOperations = new FileOperations(
logger,
database,
fs,
serverConfig,
expectedFsEvents,
nativeLineEndings
);
const contentCache = new FixedSizeDocumentCache(
1024 * 1024 * DIFF_CACHE_SIZE_MB
);
const unrestrictedSyncer = new UnrestrictedSyncer(
logger,
database,
settings,
syncService,
fileOperations,
history,
contentCache,
serverConfig
1024 * 1024 * settings.getSettings().diffCacheSizeMB
);
const webSocketManager = new WebSocketManager(
deviceId,
logger,
settings,
webSocket
@ -204,34 +238,38 @@ export class SyncClient {
const syncer = new Syncer(
deviceId,
logger,
database,
settings,
syncService,
webSocketManager,
fileOperations,
unrestrictedSyncer
syncService,
history,
contentCache,
serverConfig,
syncEventQueue
);
const fileChangeNotifier = new FileChangeNotifier();
const cursorTracker = new CursorTracker(
database,
logger,
syncEventQueue,
webSocketManager,
fileOperations,
fileChangeNotifier
);
const client = new SyncClient(
logger,
history,
settings,
database,
syncEventQueue,
syncer,
webSocketManager,
logger,
fetchController,
cursorTracker,
fileChangeNotifier,
contentCache,
fileOperations,
serverConfig,
syncService,
expectedFsEvents,
persistence
);
@ -285,10 +323,10 @@ export class SyncClient {
}
/**
* Reload settings from disk overriding current in-memory settings.
* Missing values will be filled in from DEFAULT_SETTINGS rather than
* retaining current in-memory settings.
*/
* Reload settings from disk overriding current in-memory settings.
* Missing values will be filled in from DEFAULT_SETTINGS rather than
* retaining current in-memory settings.
*/
public async reloadSettings(): Promise<void> {
this.checkIfDestroyed("reloadSettings");
@ -320,10 +358,10 @@ export class SyncClient {
}
/**
* Wait for the in-flight operations to finish, reset all tracking,
* and the local database but retain the settings.
* The SyncClient can be used again after calling this method.
*/
* Wait for the in-flight operations to finish, reset all tracking,
* and the local state but retain the settings.
* The SyncClient can be used again after calling this method.
*/
public async reset(): Promise<void> {
this.checkIfDestroyed("reset");
@ -332,16 +370,16 @@ export class SyncClient {
);
await this.pause();
// clear all local state
this.logger.info("Resetting SyncClient's local state");
this.database.reset();
await this.database.save(); // ensure the new database reads as empty
await this.syncEventQueue.clearAllState();
await this.syncEventQueue.save();
this.resetInMemoryState();
this.hasStartedOfflineSync = false;
this.hasFinishedOfflineSync = false;
this.serverConfig.reset();
await this.startSyncing();
if (this.settings.getSettings().isSyncEnabled) {
await this.startSyncing();
}
}
public getSettings(): SyncSettings {
@ -363,40 +401,48 @@ export class SyncClient {
await this.settings.setSettings(value);
}
public async syncLocallyCreatedFile(
relativePath: RelativePath
): Promise<void> {
public syncLocallyCreatedFile(relativePath: RelativePath): void {
this.checkIfDestroyed("syncLocallyCreatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyCreatedFile(relativePath);
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchCreate(relativePath)) {
return;
}
this.syncer.syncLocallyCreatedFile(relativePath);
}
public async syncLocallyDeletedFile(
relativePath: RelativePath
): Promise<void> {
this.checkIfDestroyed("syncLocallyDeletedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyDeletedFile(relativePath);
}
public async syncLocallyUpdatedFile({
public syncLocallyUpdatedFile({
oldPath,
relativePath
}: {
oldPath?: RelativePath;
relativePath: RelativePath;
}): Promise<void> {
}): void {
this.checkIfDestroyed("syncLocallyUpdatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
return this.syncer.syncLocallyUpdatedFile({
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchUpdate(relativePath, oldPath)) {
return;
}
this.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath
});
}
public syncLocallyDeletedFile(relativePath: RelativePath): void {
this.checkIfDestroyed("syncLocallyDeletedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchDelete(relativePath)) {
return;
}
this.syncer.syncLocallyDeletedFile(relativePath);
}
public getDocumentSyncingStatus(
relativePath: RelativePath
): DocumentSyncStatus {
@ -406,16 +452,11 @@ export class SyncClient {
return DocumentSyncStatus.SYNCING_IS_DISABLED;
}
if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) {
if (!this.hasFinishedOfflineSync) {
return DocumentSyncStatus.SYNCING;
}
const document =
this.database.getLatestDocumentByRelativePath(relativePath);
if (document === undefined) {
return DocumentSyncStatus.SYNCING;
}
return document.updates.length > 0
return this.syncEventQueue.hasPendingEventsForPath(relativePath)
? DocumentSyncStatus.SYNCING
: DocumentSyncStatus.UP_TO_DATE;
}
@ -429,20 +470,20 @@ export class SyncClient {
}
public async waitUntilFinished(): Promise<void> {
this.checkIfDestroyed("waitUntilIdle");
await this.syncer.waitUntilFinished();
await this.webSocketManager.waitUntilFinished();
await this.database.save(); // flush all changes to disk
this.checkIfDestroyed("waitUntilFinished");
await this.waitUntilFinishedInternal();
}
/**
* Completely destroy the SyncClient, cancelling all in-progress operations.
* After calling this method, the SyncClient cannot be used again.
*/
* Completely destroy the SyncClient, cancelling all in-progress operations.
* After calling this method, the SyncClient cannot be used again.
*/
public async destroy(): Promise<void> {
this.checkIfDestroyed("destroy");
// Prevent concurrent destroy calls
if (this.hasBeenDestroyed) {
throw new Error(
"SyncClient has been destroyed and can no longer be used; called from destroy"
);
}
if (this.isDestroying) {
this.logger.warn(
"destroy() called while already destroying, ignoring"
@ -451,52 +492,92 @@ export class SyncClient {
}
this.isDestroying = true;
// cancel everything that's in progress
await this.pause();
// Run cleanup in `finally` so a thrown pause() — or anything else
// mid-shutdown — still leaves the client in the disposed state
// instead of bricked with subscribers/telemetry hanging on.
try {
await this.pause();
} finally {
this.hasBeenDestroyed = true;
this.hasBeenDestroyed = true;
this.resetInMemoryState();
this.resetInMemoryState();
this.eventUnsubscribers.forEach((unsubscribe) => {
unsubscribe();
});
this.eventUnsubscribers.length = 0;
// Clean up event listeners to prevent memory leaks
this.eventUnsubscribers.forEach((unsubscribe) => {
unsubscribe();
});
this.eventUnsubscribers.length = 0;
this.logger.info("SyncClient has been successfully disposed");
this.logger.info("SyncClient has been successfully disposed");
this.unloadTelemetry?.();
}
}
this.unloadTelemetry?.();
/**
* The actual drain separated from `waitUntilFinished` so internal
* shutdown paths (`pause` / `destroy`) can wait for in-flight work
* without tripping the public `checkIfDestroyed` guard, which exists
* only to keep external callers from continuing to use a disposed
* client.
*
* Loops because a WebSocket message handler completing is what enqueues
* a `RemoteChange` into the syncer; if we awaited the syncer first and
* the WS handler second, a message arriving mid-wait would leave a fresh
* drain pending while `save()` ran. Each iteration waits for both, then
* re-checks; we exit only once both report idle in the same pass.
*/
private async waitUntilFinishedInternal(): Promise<void> {
while (
this.webSocketManager.hasOutstandingWork ||
this.syncer.hasPendingWork
) {
await this.webSocketManager.waitUntilFinished();
await this.syncer.waitUntilFinished();
}
await this.syncEventQueue.save();
}
private async startSyncing(): Promise<void> {
this.checkIfDestroyed("startSyncing");
this.fetchController.finishReset();
// Undo any earlier `pause()` stop so retryForever keeps retrying.
this.syncService.resume();
await this.serverConfig.initialize();
await this.serverConfig.getConfig();
await this.syncer.scheduleSyncForOfflineChanges();
this.syncer.resumeDraining();
this.webSocketManager.start();
if (!this.hasStartedOfflineSync) {
this.hasStartedOfflineSync = true;
await this.syncer.scheduleSyncForOfflineChanges();
}
this.hasFinishedOfflineSync = true;
}
private async pause(): Promise<void> {
this.hasFinishedOfflineSync = false;
this.syncer.pauseDraining();
this.fetchController.startReset();
// Signal the service so any `retryForever` loop exits at its next
// iteration instead of continuing to retry a network request while
// the rest of the client is winding down.
this.syncService.stop();
await this.webSocketManager.stop();
await this.waitUntilFinished();
await this.waitUntilFinishedInternal();
// Clear the offline-scan gate so a subsequent `startSyncing()`
// re-runs the scan; otherwise any local changes made while sync was
// paused (offline edits, deletes, renames) wouldn't be detected, and
// an incoming remote update would silently overwrite them.
this.syncer.clearOfflineScanGate();
// Drop any expected fs events that were registered but never matched
// (e.g. an op aborted by SyncResetError). Otherwise a real user edit
// at the same path after re-enable would be swallowed.
this.expectedFsEvents.clear();
}
private resetInMemoryState(): void {
this.history.reset();
this.contentCache.reset();
// don't reset the logger
this.cursorTracker.reset();
this.syncer.reset();
this.fileOperations.reset();
}
private async onSettingsChange(
@ -505,36 +586,55 @@ export class SyncClient {
): Promise<void> {
this.checkIfDestroyed("onSettingsChange");
if (
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.remoteUri !== oldSettings.remoteUri
) {
await this.reset();
}
if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
if (newSettings.isSyncEnabled) {
await this.startSyncing();
} else {
await this.pause();
// Serialize listener invocations so back-to-back settings updates
// can't run reset()/pause()/startSyncing() concurrently.
await this.settingsChangeLock.withLock(async () => {
// The lock is FIFO, so by the time we run the client may have
// been destroyed in a queued invocation ahead of us.
if (this.hasBeenDestroyed) {
return;
}
}
if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) {
this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024);
}
const connectionChanged =
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.remoteUri !== oldSettings.remoteUri;
if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) {
if (newSettings.enableTelemetry) {
this.unloadTelemetry = setUpTelemetry();
} else {
this.unloadTelemetry?.();
if (connectionChanged) {
// reset() pauses, clears state, then starts iff isSyncEnabled
// — so any concurrent isSyncEnabled change is already applied.
await this.reset();
} else if (
newSettings.isSyncEnabled !== oldSettings.isSyncEnabled
) {
if (newSettings.isSyncEnabled) {
await this.startSyncing();
} else {
await this.pause();
}
}
}
if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) {
this.contentCache.resize(
newSettings.diffCacheSizeMB * 1024 * 1024
);
}
if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) {
if (newSettings.enableTelemetry) {
this.unloadTelemetry = setUpTelemetry();
} else {
this.unloadTelemetry?.();
}
}
});
}
private checkIfDestroyed(origin: string): void {
if (this.hasBeenDestroyed) {
// Reject new public-API entries the moment destroy() is called,
// not after `pause()` returns. Otherwise an external caller could
// pass the guard and start mutating state while destroy() is
// tearing down the websocket / clearing caches.
if (this.hasBeenDestroyed || this.isDestroying) {
throw new Error(
`SyncClient has been destroyed and can no longer be used; called from ${origin}`
);