This commit is contained in:
Andras Schmelczer 2026-04-25 19:13:26 +01:00
parent 7f62273e72
commit bff3f5a5e9
8 changed files with 167 additions and 79 deletions

View file

@ -42,7 +42,7 @@ Clients always start with syncing disabled.
```sh
# Build server first
cd sync-server && cargo build --release
cd sync-server && cargo build --release && cd -
# Run all tests
cd frontend && npm run test -w deterministic-tests

View file

@ -246,6 +246,67 @@ describe("WebSocketManager", () => {
await manager.stop();
});
it("handles concurrent stop() calls without stranding either caller", async () => {
// Real WebSocket.close() doesn't fire onclose synchronously, and the
// socket stays reachable across the close handshake. Model that
// here so the manager's `while (isWebSocketConnected)` loop is
// actually awaiting when the second stop() races in. Static OPEN
// is required because the manager compares readyState against
// `factory.OPEN`.
class AsyncCloseWebSocket extends MockWebSocket {
public static readonly OPEN = WebSocket.OPEN;
public override close(code?: number, reason?: string): void {
if (
this.readyState === WebSocket.CLOSED ||
(this as { _closing?: boolean })._closing === true
) {
return;
}
(this as { _closing?: boolean })._closing = true;
setTimeout(() => {
this.readyState = WebSocket.CLOSED;
this.onclose?.(
new MockCloseEvent("close", {
code: code ?? 1000,
reason: reason ?? ""
})
);
}, 5);
}
}
const manager = new WebSocketManager(
mockLogger,
mockSettings,
AsyncCloseWebSocket as unknown as typeof WebSocket
);
manager.start();
await new Promise((resolve) => setTimeout(resolve, 50));
const start = Date.now();
// Two concurrent stops mimic destroy() racing onSettingsChange.
await Promise.all([manager.stop(), manager.stop()]);
const elapsed = Date.now() - start;
// Both should resolve via the normal close path; if the second call
// had clobbered the first's resolver, the first would have been
// stranded until the 10s disconnect timeout.
assert.ok(
elapsed < 1000,
`concurrent stop() took ${elapsed}ms — expected fast resolution`
);
const errorCalls = (
mockLogger.error as unknown as { calls: unknown[] }
).calls;
assert.strictEqual(
errorCalls.length,
0,
"no timeout-recovery error should be logged"
);
});
it("tracks message handling promises", async () => {
const manager = new WebSocketManager(
mockLogger,

View file

@ -28,6 +28,7 @@ export class WebSocketManager {
private isStopped = true;
private resolveDisconnectingPromise: null | (() => unknown) = null;
private stopPromise: Promise<void> | null = null;
private reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined;
private connectionTimeoutId: ReturnType<typeof setTimeout> | undefined;
@ -58,6 +59,17 @@ export class WebSocketManager {
}
public async stop(): Promise<void> {
// Concurrent callers (e.g. destroy() and onSettingsChange) must share
// the same disconnect; otherwise the second call would overwrite
// resolveDisconnectingPromise and strand the first caller's await
// until the timeout rejects.
this.stopPromise ??= this.performStop().finally(() => {
this.stopPromise = null;
});
await this.stopPromise;
}
private async performStop(): Promise<void> {
const { promise, resolve } = Promise.withResolvers<undefined>();
this.resolveDisconnectingPromise = (): void => {
resolve(undefined);
@ -98,7 +110,7 @@ export class WebSocketManager {
`Error while waiting for WebSocket to close: ${String(error)}`
);
// Force cleanup even if close didn't work
this.resolveDisconnectingPromise();
this.resolveDisconnectingPromise?.();
this.resolveDisconnectingPromise = null;
} finally {
// Clear timeout to prevent unhandled rejection

View file

@ -26,6 +26,7 @@ import { setUpTelemetry } from "./utils/set-up-telemetry";
import { DIFF_CACHE_SIZE_MB } from "./consts";
import { ServerConfig } from "./services/server-config";
import type { EventListeners } from "./utils/data-structures/event-listeners";
import { Lock } from "./utils/data-structures/locks";
export class SyncClient {
private hasFinishedOfflineSync = false;
@ -34,6 +35,7 @@ export class SyncClient {
private unloadTelemetry?: () => void;
private isDestroying = false;
private readonly eventUnsubscribers: (() => void)[] = [];
private readonly settingsChangeLock = new Lock("SyncClient.onSettingsChange");
private constructor(
public readonly logger: Logger,
@ -490,32 +492,45 @@ export class SyncClient {
): Promise<void> {
this.checkIfDestroyed("onSettingsChange");
if (
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.remoteUri !== oldSettings.remoteUri
) {
await this.reset();
}
if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
if (newSettings.isSyncEnabled) {
await this.startSyncing();
} else {
await this.pause();
// Serialize listener invocations so back-to-back settings updates
// can't run reset()/pause()/startSyncing() concurrently.
await this.settingsChangeLock.withLock(async () => {
// The lock is FIFO, so by the time we run the client may have
// been destroyed in a queued invocation ahead of us.
if (this.hasBeenDestroyed) {
return;
}
}
if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) {
this.contentCache.resize(newSettings.diffCacheSizeMB * 1024 * 1024);
}
const connectionChanged =
newSettings.vaultName !== oldSettings.vaultName ||
newSettings.remoteUri !== oldSettings.remoteUri;
if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) {
if (newSettings.enableTelemetry) {
this.unloadTelemetry = setUpTelemetry();
} else {
this.unloadTelemetry?.();
if (connectionChanged) {
// reset() pauses, clears state, then starts iff isSyncEnabled
// — so any concurrent isSyncEnabled change is already applied.
await this.reset();
} else if (newSettings.isSyncEnabled !== oldSettings.isSyncEnabled) {
if (newSettings.isSyncEnabled) {
await this.startSyncing();
} else {
await this.pause();
}
}
}
if (newSettings.diffCacheSizeMB !== oldSettings.diffCacheSizeMB) {
this.contentCache.resize(
newSettings.diffCacheSizeMB * 1024 * 1024
);
}
if (newSettings.enableTelemetry !== oldSettings.enableTelemetry) {
if (newSettings.enableTelemetry) {
this.unloadTelemetry = setUpTelemetry();
} else {
this.unloadTelemetry?.();
}
}
});
}
private checkIfDestroyed(origin: string): void {

View file

@ -45,6 +45,7 @@ export async function scheduleOfflineChanges(
}
}
const renamedPaths = new Set<RelativePath>();
for (const path of locallyPossibleCreatedFiles) {
const content = await operations.read(path);
const contentHash = await hash(content);
@ -62,11 +63,12 @@ export async function scheduleOfflineChanges(
relativePath: path
});
removeFromArray(locallyPossiblyDeletedFiles, matchingDeletedFile);
removeFromArray(locallyPossibleCreatedFiles, path);
renamedPaths.add(path);
}
}
for (const path of locallyPossibleCreatedFiles) {
if (renamedPaths.has(path)) continue;
logger.debug(
`File ${path} was created while offline, scheduling sync to create it`
);

View file

@ -141,13 +141,9 @@ export class SyncEventQueue {
}
if (input.type === SyncEventType.LocalDelete) {
const deleteId = pendingDocumentId ?? documentId;
if (deleteId === undefined) {
throw new Error("Unreachable: deleteId must be defined here");
}
this.events.push({
type: SyncEventType.LocalDelete,
documentId: deleteId
documentId: (pendingDocumentId ?? documentId)!
});
return;
}
@ -174,16 +170,11 @@ export class SyncEventQueue {
}
await this.save();
}
return;
}
const updateId = pendingDocumentId ?? documentId;
if (updateId === undefined) {
throw new Error("Unreachable: updateId must be defined here");
}
this.events.push({
type: SyncEventType.LocalUpdate,
documentId: updateId,
documentId: (pendingDocumentId ?? documentId)!,
path,
originalPath: path
});

View file

@ -162,11 +162,6 @@ export class Syncer {
public reset(): void {
this._isFirstSyncStarted = false;
this.queue.clearPending();
// Don't null the reference synchronously — if the scan is
// still in flight, the next reconnect would spawn a second
// concurrent scan racing on the same queue. Defer the
// clear until the in-flight task actually resolves, so a
// fresh scan can only start once the prior one is done.
const current = this.runningScheduleSyncForOfflineChanges;
if (current !== undefined) {
void current.finally(() => {
@ -619,11 +614,17 @@ export class Syncer {
record: DocumentRecord,
remoteVersion: DocumentVersionWithoutContent
): Promise<void> {
if (record.parentVersionId >= remoteVersion.vaultUpdateId) {
this.logger.debug(`Document ${path} is already up-to-date`);
return;
}
// wait for a local edit to do the actual updating here, so we can't even update the lastSeenUpdateId here
const conflictingDoc = this.queue.getSettledDocumentByPath(
remoteVersion.relativePath
);
const actualPath = await this.operations.move(
path,
remoteVersion.relativePath,
(conflictingDoc?.parentVersionId ?? 0) < remoteVersion.vaultUpdateId
? MoveOnConflict.EXISTING
: MoveOnConflict.NEW
);
if (
!this.queue.hasPendingLocalEventsForDocumentId(
remoteVersion.documentId
@ -645,39 +646,45 @@ export class Syncer {
);
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
} // else we don't need to update the content, a subsequent local update will do that
else {
void this.syncRemotelyUpdatedFile({
// schedule it so that the lastSeenUpdateId remains consistent
document: remoteVersion
});
void this.syncRemotelyUpdatedFile({
// schedule it so that the lastSeenUpdateId remains consistent
document: remoteVersion
});
// wait for a local edit to do the actual updating here, so we can't even update the lastSeenUpdateId here
const conflictingDoc = this.queue.getSettledDocumentByPath(
remoteVersion.relativePath
);
const actualRelativePath = await this.operations.move(
path,
remoteVersion.relativePath,
(conflictingDoc?.parentVersionId ?? 0) < remoteVersion.vaultUpdateId
? MoveOnConflict.EXISTING
: MoveOnConflict.NEW
);
await this.queue.setDocument(actualRelativePath, {
...record,
remoteRelativePath: actualRelativePath
});
await this.queue.setDocument(actualPath, {
...record,
remoteRelativePath: actualPath
});
}
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: actualRelativePath,
movedFrom: path
},
// todo: eh
message: `File was renamed remotely from ${path} to ${actualRelativePath}`
});
if (actualPath !== path) {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: actualPath,
movedFrom: path
},
message: `File was renamed remotely from ${path} to ${actualPath}`,
author: remoteVersion.userId,
timestamp: new Date(remoteVersion.updatedDate)
});
} else {
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.UPDATE,
relativePath: actualPath
},
message: "Successfully applied remote update",
author: remoteVersion.userId,
timestamp: new Date(remoteVersion.updatedDate)
});
}
}
private async processRemoteCreateForNewDocument(

View file

@ -1,8 +1,8 @@
export async function hash(content: Uint8Array): Promise<string> {
// Copy into a fresh ArrayBuffer-backed Uint8Array so the buffer type
// matches `BufferSource`/`Uint8Array<ArrayBuffer>` expected by digest.
const owned = new Uint8Array(content);
const digest = await crypto.subtle.digest("SHA-256", owned);
const digest = await crypto.subtle.digest(
"SHA-256",
content as Uint8Array<ArrayBuffer>
);
const bytes = new Uint8Array(digest);
return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join("");
}