more ai changes

This commit is contained in:
Andras Schmelczer 2026-04-25 20:29:38 +01:00
parent bff3f5a5e9
commit 8ce33541a3
11 changed files with 208 additions and 185 deletions

View file

@ -45,7 +45,7 @@ Clients always start with syncing disabled.
cd sync-server && cargo build --release && cd -
# Run all tests
cd frontend && npm run test -w deterministic-tests
cd frontend && npm run build -w sync-client && npm run test -w deterministic-tests
# Filter by name
npm run test -w deterministic-tests -- --filter=rename

View file

@ -8,6 +8,7 @@ import { isFileTypeMergable } from "../utils/is-file-type-mergable";
import { isBinary } from "../utils/is-binary";
import { buildConflictFileName } from "../sync-operations/conflict-path";
import type { ServerConfig } from "../services/server-config";
import { FileNotFoundError } from "../errors/file-not-found-error";
export enum MoveOnConflict {
EXISTING = "EXISTING",
@ -95,67 +96,83 @@ export class FileOperations {
return;
}
if (
!isFileTypeMergable(
path,
(await this.serverConfig.getConfig()).mergeableFileExtensions
) ||
isBinary(expectedContent) ||
isBinary(newContent)
) {
this.logger.debug(
`The expected content is not mergable, so we won't perform a 3-way merge, just overwrite it`
);
await this.fs.write(
path,
// `newContent` might not be binary so we still have to ensure the line endings are correct
this.toNativeLineEndings(newContent)
);
return;
}
let expectedText = "";
let newText = "";
// The exists() check above is racy: between it returning true and
// any of the writes below running, the file can be deleted. The
// safe wrapper around `atomicUpdateText` raises FileNotFoundError
// in that window — treat it the same as the upfront-missing case
// (skip silently) so callers see one consistent outcome regardless
// of when the deletion happened to occur.
try {
expectedText = new TextDecoder("utf-8", { fatal: true }).decode(
expectedContent
); // this comes from a previous read which must only have \n line endings
newText = new TextDecoder("utf-8", { fatal: true }).decode(
newContent
); // this comes from the server which stores text with \n line endings
} catch (decodeError) {
this.logger.warn(
`3-way merge aborted for ${path}: one of expected/new is not valid UTF-8 (${decodeError}); falling back to overwrite`
);
await this.fs.write(path, this.toNativeLineEndings(newContent));
return;
}
await this.fs.atomicUpdateText(
path,
({ text, cursors }: TextWithCursors): TextWithCursors => {
if (
!isFileTypeMergable(
path,
(await this.serverConfig.getConfig()).mergeableFileExtensions
) ||
isBinary(expectedContent) ||
isBinary(newContent)
) {
this.logger.debug(
`Performing a 3-way merge for ${path} with the expected content`
`The expected content is not mergable, so we won't perform a 3-way merge, just overwrite it`
);
text = text.replaceAll(this.nativeLineEndings, "\n");
const merged = reconcile(
expectedText,
{ text, cursors },
newText
await this.fs.write(
path,
// `newContent` might not be binary so we still have to ensure the line endings are correct
this.toNativeLineEndings(newContent)
);
const resultText = merged.text.replaceAll(
"\n",
this.nativeLineEndings
);
return {
text: resultText,
cursors: merged.cursors
};
return;
}
);
let expectedText = "";
let newText = "";
try {
expectedText = new TextDecoder("utf-8", { fatal: true }).decode(
expectedContent
); // this comes from a previous read which must only have \n line endings
newText = new TextDecoder("utf-8", { fatal: true }).decode(
newContent
); // this comes from the server which stores text with \n line endings
} catch (decodeError) {
this.logger.warn(
`3-way merge aborted for ${path}: one of expected/new is not valid UTF-8 (${decodeError}); falling back to overwrite`
);
await this.fs.write(path, this.toNativeLineEndings(newContent));
return;
}
await this.fs.atomicUpdateText(
path,
({ text, cursors }: TextWithCursors): TextWithCursors => {
this.logger.debug(
`Performing a 3-way merge for ${path} with the expected content`
);
text = text.replaceAll(this.nativeLineEndings, "\n");
const merged = reconcile(
expectedText,
{ text, cursors },
newText
);
const resultText = merged.text.replaceAll(
"\n",
this.nativeLineEndings
);
return {
text: resultText,
cursors: merged.cursors
};
}
);
} catch (e) {
if (e instanceof FileNotFoundError) {
this.logger.debug(
`File ${path} disappeared during write; not recreating`
);
return;
}
throw e;
}
}
public async delete(path: RelativePath): Promise<void> {

View file

@ -342,7 +342,7 @@ export class SyncService {
const url = new URL(this.getUrl("/documents"));
if (since !== undefined) {
url.searchParams.append("since", since.toString());
url.searchParams.append("since_update_id", since.toString());
}
const response = await this.client(url.toString(), {
headers: this.getDefaultHeaders()

View file

@ -204,19 +204,22 @@ export class WebSocketManager {
this.logger.info(`Connecting to WebSocket at ${wsUri.toString()}`);
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
const ws = new this.webSocketFactoryImplementation(wsUri);
this.webSocket = ws;
// Set connection timeout to handle cases where server is down and the WebSocket connection won't open
// Set connection timeout to handle cases where server is down and the WebSocket connection won't open.
// The callback closes the *captured* `ws` rather than `this.webSocket` so a delayed timeout cannot
// accidentally close a freshly-constructed replacement socket. (Closing the already-closed `ws` is a no-op.)
this.connectionTimeoutId = setTimeout(() => {
this.connectionTimeoutId = undefined;
this.logger.warn(
`WebSocket connection timeout after ${WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS} seconds`
);
// Force close to trigger onclose handler which will schedule reconnection
this.webSocket?.close(1000, "Connection timeout");
ws.close(1000, "Connection timeout");
}, WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS * 1000);
this.webSocket.onopen = (): void => {
ws.onopen = (): void => {
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;
@ -224,7 +227,7 @@ export class WebSocketManager {
// Check if we've been stopped while connecting
if (this.isStopped) {
this.webSocket?.close(
ws.close(
1000,
"WebSocketManager was stopped during connection"
);
@ -234,7 +237,7 @@ export class WebSocketManager {
this.onWebSocketStatusChanged.trigger(true);
};
this.webSocket.onmessage = (event): void => {
ws.onmessage = (event): void => {
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
const message = JSON.parse(
@ -265,13 +268,13 @@ export class WebSocketManager {
}
};
this.webSocket.onerror = (error): void => {
ws.onerror = (error): void => {
this.logger.warn(
`WebSocket error occurred: ${error instanceof ErrorEvent ? error.message : "Unknown error"}`
);
};
this.webSocket.onclose = (event): void => {
ws.onclose = (event): void => {
if (this.connectionTimeoutId !== undefined) {
clearTimeout(this.connectionTimeoutId);
this.connectionTimeoutId = undefined;

View file

@ -399,7 +399,7 @@ export class SyncClient {
return DocumentSyncStatus.SYNCING_IS_DISABLED;
}
if (!this.syncer.isFirstSyncComplete || !this.hasFinishedOfflineSync) {
if (!this.syncer.isFirstSyncStarted || !this.hasFinishedOfflineSync) {
return DocumentSyncStatus.SYNCING;
}
@ -428,8 +428,11 @@ export class SyncClient {
* After calling this method, the SyncClient cannot be used again.
*/
public async destroy(): Promise<void> {
this.checkIfDestroyed("destroy");
if (this.hasBeenDestroyed) {
throw new Error(
"SyncClient has been destroyed and can no longer be used; called from destroy"
);
}
if (this.isDestroying) {
this.logger.warn(
"destroy() called while already destroying, ignoring"
@ -534,7 +537,11 @@ export class SyncClient {
}
private checkIfDestroyed(origin: string): void {
if (this.hasBeenDestroyed) {
// Reject new public-API entries the moment destroy() is called,
// not after `pause()` returns. Otherwise an external caller could
// pass the guard and start mutating state while destroy() is
// tearing down the websocket / clearing caches.
if (this.hasBeenDestroyed || this.isDestroying) {
throw new Error(
`SyncClient has been destroyed and can no longer be used; called from ${origin}`
);

View file

@ -30,9 +30,13 @@ export class CursorTracker {
upToDateness: DocumentUpToDateness;
})[] = [];
private lastLocalCursorState: DocumentWithCursors[] = [];
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
[];
// Cache the previously sent state as a JSON string rather than as the
// array. We mutate `documentsWithCursors` in-place after the cache check
// (setting `vaultUpdateId = null` for dirty docs); storing the array would
// alias and the next call's equality check would compare against
// post-mutation state.
private lastLocalCursorStateJson = "[]";
private lastLocalCursorStateWithoutDirtyDocumentsJson = "[]";
public constructor(
logger: Logger,
@ -99,65 +103,65 @@ export class CursorTracker {
public async sendLocalCursorsToServer(
documentToCursors: Record<RelativePath, CursorSpan[]>
): Promise<void> {
const documentsWithCursors: DocumentWithCursors[] = [];
// Serialise concurrent senders so they don't interleave on the
// disk reads + state mutations and emit out-of-order cursor messages.
await this.updateLock.withLock(async () => {
const documentsWithCursors: DocumentWithCursors[] = [];
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record = this.queue.getSettledDocumentByPath(relativePath);
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record = this.queue.getSettledDocumentByPath(relativePath);
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
}
documentsWithCursors.push({
relativePath: relativePath,
documentId: record.documentId,
vaultUpdateId: record.parentVersionId,
cursors: cursors.map(({ start, end }) => ({
start: Math.min(start, end),
end: Math.max(start, end)
})) // the client might send directional selections
});
}
documentsWithCursors.push({
relativePath: relativePath,
documentId: record.documentId,
vaultUpdateId: record.parentVersionId,
cursors: cursors.map(({ start, end }) => ({
start: Math.min(start, end),
end: Math.max(start, end)
})) // the client might send directional selections
});
}
if (
JSON.stringify(this.lastLocalCursorState) ===
JSON.stringify(documentsWithCursors)
) {
// Caching step to avoid reading the edited files all the time
return;
}
this.lastLocalCursorState = documentsWithCursors;
for (const doc of documentsWithCursors) {
const readContent = await this.fileOperations.read(
doc.relativePath
);
const record = this.queue.getSettledDocumentByPath(
doc.relativePath
);
if (record?.remoteHash !== (await hash(readContent))) {
doc.vaultUpdateId = null;
const beforeJson = JSON.stringify(documentsWithCursors);
if (this.lastLocalCursorStateJson === beforeJson) {
// Caching step to avoid reading the edited files all the time
return;
}
}
this.lastLocalCursorStateJson = beforeJson;
if (
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
JSON.stringify(documentsWithCursors)
) {
return;
}
for (const doc of documentsWithCursors) {
const readContent = await this.fileOperations.read(
doc.relativePath
);
const record = this.queue.getSettledDocumentByPath(
doc.relativePath
);
if (record?.remoteHash !== (await hash(readContent))) {
doc.vaultUpdateId = null;
}
}
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
const afterJson = JSON.stringify(documentsWithCursors);
if (this.lastLocalCursorStateWithoutDirtyDocumentsJson === afterJson) {
return;
}
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
this.lastLocalCursorStateWithoutDirtyDocumentsJson = afterJson;
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
});
}
public reset(): void {
this.knownRemoteCursors = [];
this.lastLocalCursorState = [];
this.lastLocalCursorStateWithoutDirtyDocuments = [];
this.lastLocalCursorStateJson = "[]";
this.lastLocalCursorStateWithoutDirtyDocumentsJson = "[]";
this.updateLock.reset();
}

View file

@ -75,7 +75,7 @@ export class Syncer {
);
}
public get isFirstSyncComplete(): boolean {
public get isFirstSyncStarted(): boolean {
return this._isFirstSyncStarted;
}
@ -264,36 +264,34 @@ export class Syncer {
break;
}
} catch (e) {
// The currently-processed event was already shifted off the queue
// by drain() before processEvent ran. If it's a LocalCreate, any
// queued Delete/Update events whose `documentId` is this Create's
// resolvers.promise would `await` it forever once we return — so
// settle the resolvers on every failure path before
// dispatching/re-throwing. clearPending()'s rejectAllPendingCreates
// walks the queue and so cannot reach this in-flight event.
// Re-rejecting an already-resolved promise is a no-op, so it's
// safe to call this unconditionally on the LocalCreate branch.
if (event.type === SyncEventType.LocalCreate) {
event.resolvers.promise.catch(() => {
/* suppressed */
});
event.resolvers.reject(
new Error(`Create was cancelled: ${e}`)
);
}
if (e instanceof FileNotFoundError) {
this.logger.info(
`Skipping sync event '${event.type}' because the file no longer exists`
);
if (event.type === SyncEventType.LocalCreate) {
event.resolvers.promise.catch(() => {
/* suppressed */
});
event.resolvers.reject(new Error("Create was cancelled"));
}
return;
}
if (e instanceof HttpClientError) {
this.logger.error(
`Server rejected ${event.type} request: ${e.message}`
);
// The event was already shifted off the queue before
// `processEvent` ran; if it was a Create, its resolver
// promise would otherwise hang forever, blocking any
// queued Delete / SyncLocal that `await`s it.
if (event.type === SyncEventType.LocalCreate) {
event.resolvers.promise.catch(() => {
/* suppressed */
});
event.resolvers.reject(
new Error(
`Create was cancelled — server rejected the request (${e.message})`
)
);
}
return;
}
throw e;
@ -513,6 +511,7 @@ export class Syncer {
if (createEvent === undefined) {
// a http response will always be more up-to-date than any queued remote update
// move will always move to the relative path when MoveOnConflict.EXISTING is given
await this.operations.move(
path,
response.relativePath,

View file

@ -40,9 +40,12 @@ export class EventListeners<TListener extends (...args: any[]) => any> {
* @param args The arguments to pass to each listener
*/
public trigger(...args: Parameters<TListener>): void {
this.listeners.forEach((listener) => {
const snapshot = this.listeners.slice();
for (const listener of snapshot) {
// allow removing listeners during the trigger loop
if (!this.listeners.includes(listener)) continue;
listener(...args);
});
}
}
/**
@ -53,16 +56,17 @@ export class EventListeners<TListener extends (...args: any[]) => any> {
* @param args The arguments to pass to each listener
*/
public async triggerAsync(...args: Parameters<TListener>): Promise<void> {
await awaitAll(
this.listeners
.map((listener) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return listener(...args);
})
.filter((result): result is Promise<unknown> => {
return result instanceof Promise;
})
);
const snapshot = this.listeners.slice();
const promises: Promise<unknown>[] = [];
for (const listener of snapshot) {
if (!this.listeners.includes(listener)) continue;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const result = listener(...args);
if (result instanceof Promise) {
promises.push(result);
}
}
await awaitAll(promises);
}
public clear(): void {

View file

@ -48,29 +48,25 @@ describe("MinCovered", () => {
assert.strictEqual(covered.min, 6);
});
it("should auto-advance when setting min value", () => {
it("should auto-advance when adding the value that fills the next gap", () => {
const covered = new MinCovered(5);
covered.add(7);
covered.add(8);
covered.add(9);
assert.strictEqual(covered.min, 5);
// Setting min to 6 should auto-advance through 7, 8, 9
covered.min = 6;
// Adding 6 fills the gap and auto-advances through 7, 8, 9
covered.add(6);
assert.strictEqual(covered.min, 9);
covered.add(10);
assert.strictEqual(covered.min, 10);
});
it("should handle setting min value with no consecutive values", () => {
it("should rewind when reset is called explicitly", () => {
const covered = new MinCovered(5);
covered.add(10);
covered.add(15);
assert.strictEqual(covered.min, 5);
// Setting min to 8 should not auto-advance (no consecutive values)
covered.min = 8;
assert.strictEqual(covered.min, 8);
// Add 9 to trigger auto-advance to 10
covered.add(9);
assert.strictEqual(covered.min, 10);
covered.add(7);
covered.reset(3);
assert.strictEqual(covered.min, 3);
covered.add(4);
assert.strictEqual(covered.min, 4);
});
});

View file

@ -16,18 +16,12 @@
export class MinCovered {
private seenValues: number[] = [];
public constructor(private minValue: number) {}
public constructor(private minValue: number) { }
public get min(): number {
return this.minValue;
}
public set min(value: number) {
this.minValue = Math.max(value, this.minValue);
this.seenValues = this.seenValues.filter((v) => v > this.minValue);
this.advanceMinWhilePossible();
}
public add(value: number | undefined): void {
if (value === undefined || value < this.minValue) {
return;

View file

@ -44,20 +44,19 @@ export function rateLimit<
newArgs = undefined;
}
const { promise, resolve } = Promise.withResolvers<undefined>();
running = promise;
sleep(
// `running` must signal both "minimum interval has elapsed" *and*
// "fn() has finished" — otherwise an `fn` that takes longer than
// the interval would let a queued waiter fire a concurrent `fn`
const interval =
typeof minIntervalMs === "function"
? minIntervalMs()
: minIntervalMs
)
.then(() => {
resolve(undefined);
})
.catch(() => {
// sleep cannot fail
});
return fn(...args);
: minIntervalMs;
const fnPromise = fn(...args);
running = Promise.all([
fnPromise.catch(() => undefined),
sleep(interval)
]);
return fnPromise;
};
return decoratedFn;