This commit is contained in:
Andras Schmelczer 2026-04-25 21:59:32 +01:00
parent a5b3cc5f3a
commit d23750f15b
6 changed files with 90 additions and 17 deletions

View file

@ -15,6 +15,10 @@ import {
} from "./consts";
import { randomUUID } from "node:crypto";
class ConflictFilesDetectedError extends Error {
public override readonly name = "ConflictFilesDetectedError";
}
export class TestRunner {
private agents: DeterministicAgent[] = [];
private readonly serverControl: ServerControl;
@ -224,6 +228,9 @@ export class TestRunner {
this.logger.info("Barrier complete: all clients converged");
return;
} catch (error) {
if (error instanceof ConflictFilesDetectedError) {
throw error;
}
lastError =
error instanceof Error ? error : new Error(String(error));
this.logger.info("Barrier: not yet converged, retrying...");
@ -289,6 +296,25 @@ export class TestRunner {
clientFiles.push(fileMap);
}
const conflictsByClient = clientFiles.map((files) =>
Array.from(files.keys()).filter((path) =>
CONFLICT_PATH_REGEX.test(path)
)
);
if (conflictsByClient.some((conflicts) => conflicts.length > 0)) {
const summary = conflictsByClient
.map((conflicts, i) =>
conflicts.length > 0
? `client ${i}: [${conflicts.join(", ")}]`
: null
)
.filter((s): s is string => s !== null)
.join("; ");
throw new ConflictFilesDetectedError(
`Found local conflict file(s): ${summary}`
);
}
const referenceFiles = Array.from(clientFiles[0].keys());
this.logger.info(
@ -327,15 +353,6 @@ export class TestRunner {
this.logger.info("✓ All clients are consistent");
const conflictFiles = referenceFiles.filter((path) =>
CONFLICT_PATH_REGEX.test(path)
);
if (conflictFiles.length > 0) {
throw new Error(
`Found ${conflictFiles.length} conflict file(s) — local displacements indicate a reconciliation regression: [${conflictFiles.join(", ")}]`
);
}
if (verify) {
this.logger.info("Running custom verification...");
try {

View file

@ -9,6 +9,7 @@ import type { TextWithCursors } from "reconcile-text";
import type { ServerConfig, ServerConfigData } from "../services/server-config";
import { CONFLICT_PATH_REGEX } from "../sync-operations/conflict-path";
import { removeFromArray } from "../utils/remove-from-array";
import { ExpectedFsEvents } from "../sync-operations/expected-fs-events";
class MockServerConfig implements Pick<ServerConfig, "getConfig"> {
public async getConfig(): Promise<ServerConfigData> {
@ -72,7 +73,8 @@ function makeOps(): {
const ops = new FileOperations(
new Logger(),
fs,
new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
new MockServerConfig() as ServerConfig, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
new ExpectedFsEvents()
);
return { fs, ops };
}

View file

@ -9,6 +9,7 @@ import { isBinary } from "../utils/is-binary";
import { buildConflictFileName } from "../sync-operations/conflict-path";
import type { ServerConfig } from "../services/server-config";
import { FileNotFoundError } from "../errors/file-not-found-error";
import type { ExpectedFsEvents } from "../sync-operations/expected-fs-events";
export enum MoveOnConflict {
EXISTING = "EXISTING",
@ -22,6 +23,7 @@ export class FileOperations {
private readonly logger: Logger,
fs: FileSystemOperations,
private readonly serverConfig: ServerConfig,
private readonly expectedFsEvents: ExpectedFsEvents,
private readonly nativeLineEndings = "\n"
) {
this.fs = new SafeFileSystemOperations(fs, logger);
@ -74,6 +76,10 @@ export class FileOperations {
moveOnConflict: MoveOnConflict
): Promise<RelativePath> {
const actualPath = await this.ensureClearPath(path, moveOnConflict);
// ensureClearPath leaves actualPath empty: either the file never
// existed, or it was just renamed away. The upcoming write therefore
// looks like a fresh create to the watcher.
this.expectedFsEvents.expectCreate(actualPath);
await this.fs.write(actualPath, this.toNativeLineEndings(newContent));
return actualPath;
}
@ -114,6 +120,7 @@ export class FileOperations {
this.logger.debug(
`The expected content is not mergable, so we won't perform a 3-way merge, just overwrite it`
);
this.expectedFsEvents.expectUpdate(path);
await this.fs.write(
path,
// `newContent` might not be binary so we still have to ensure the line endings are correct
@ -135,10 +142,12 @@ export class FileOperations {
this.logger.warn(
`3-way merge aborted for ${path}: one of expected/new is not valid UTF-8 (${decodeError}); falling back to overwrite`
);
this.expectedFsEvents.expectUpdate(path);
await this.fs.write(path, this.toNativeLineEndings(newContent));
return;
}
this.expectedFsEvents.expectUpdate(path);
await this.fs.atomicUpdateText(
path,
({ text, cursors }: TextWithCursors): TextWithCursors => {
@ -177,6 +186,7 @@ export class FileOperations {
public async delete(path: RelativePath): Promise<void> {
if (await this.exists(path)) {
this.expectedFsEvents.expectDelete(path);
await this.fs.delete(path);
await this.deletingEmptyParentDirectoriesOfDeletedFile(path);
} else {
@ -203,6 +213,7 @@ export class FileOperations {
}
const actualPath = await this.ensureClearPath(newPath, moveOnConflict);
this.expectedFsEvents.expectRename(oldPath, actualPath);
await this.fs.rename(oldPath, actualPath);
await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath);
return actualPath;
@ -223,6 +234,7 @@ export class FileOperations {
`Displacing existing file at ${path} to '${conflictPath}' to make room`
);
this.expectedFsEvents.expectRename(path, conflictPath);
await this.fs.rename(path, conflictPath);
return path;
}

View file

@ -27,6 +27,7 @@ import { DIFF_CACHE_SIZE_MB } from "./consts";
import { ServerConfig } from "./services/server-config";
import type { EventListeners } from "./utils/data-structures/event-listeners";
import { Lock } from "./utils/data-structures/locks";
import { ExpectedFsEvents } from "./sync-operations/expected-fs-events";
export class SyncClient {
private hasFinishedOfflineSync = false;
@ -50,6 +51,7 @@ export class SyncClient {
private readonly contentCache: FixedSizeDocumentCache,
private readonly serverConfig: ServerConfig,
private readonly syncService: SyncService,
private readonly expectedFsEvents: ExpectedFsEvents,
private readonly persistence: PersistenceProvider<
Partial<{
settings: Partial<SyncSettings>;
@ -178,10 +180,13 @@ export class SyncClient {
const serverConfig = new ServerConfig(syncService);
const expectedFsEvents = new ExpectedFsEvents();
const fileOperations = new FileOperations(
logger,
fs,
serverConfig,
expectedFsEvents,
nativeLineEndings
);
@ -229,6 +234,7 @@ export class SyncClient {
contentCache,
serverConfig,
syncService,
expectedFsEvents,
persistence
);
@ -363,14 +369,22 @@ export class SyncClient {
public syncLocallyCreatedFile(relativePath: RelativePath): void {
this.checkIfDestroyed("syncLocallyCreatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchCreate(relativePath)) {
return;
}
this.syncer.syncLocallyCreatedFile(relativePath);
}
public syncLocallyDeletedFile(relativePath: RelativePath): void {
this.checkIfDestroyed("syncLocallyDeletedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchDelete(relativePath)) {
return;
}
this.syncer.syncLocallyDeletedFile(relativePath);
}
@ -383,7 +397,11 @@ export class SyncClient {
}): void {
this.checkIfDestroyed("syncLocallyUpdatedFile");
this.fileChangeNotifier.notifyOfFileChange(relativePath);
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
if (this.expectedFsEvents.matchUpdate(relativePath, oldPath)) {
return;
}
this.syncer.syncLocallyUpdatedFile({
oldPath,
relativePath
@ -485,6 +503,15 @@ export class SyncClient {
this.syncService.stop();
await this.webSocketManager.stop();
await this.waitUntilFinished();
// Clear the offline-scan gate so a subsequent `startSyncing()`
// re-runs the scan; otherwise any local changes made while sync was
// paused (offline edits, deletes, renames) wouldn't be detected, and
// an incoming remote update would silently overwrite them.
this.syncer.clearOfflineScanGate();
// Drop any expected fs events that were registered but never matched
// (e.g. an op aborted by SyncResetError). Otherwise a real user edit
// at the same path after re-enable would be swallowed.
this.expectedFsEvents.clear();
}
private resetInMemoryState(): void {

View file

@ -24,15 +24,19 @@ export async function scheduleOfflineChanges(
}) => void,
enqueueDelete: (path: RelativePath) => void
): Promise<void> {
const allLocalFiles = await operations.listFilesRecursively();
logger.info(`Scheduling sync for ${allLocalFiles.length} local files`);
const allLocalFiles = new Set(await operations.listFilesRecursively());
logger.info(`Scheduling sync for ${allLocalFiles.size} local files`);
const allDocuments = queue.allSettledDocuments();
// A doc is "possibly deleted" only if it has no local file. Including
// docs that still exist locally would queue a spurious delete alongside
// the update below.
const locallyPossiblyDeletedFiles: DocumentWithPath[] = [];
for (const [path, record] of allDocuments.entries()) {
if (!allLocalFiles.has(path)) {
locallyPossiblyDeletedFiles.push({ path, record });
}
}
const locallyPossibleCreatedFiles: RelativePath[] = [];
const syncedLocalFiles: RelativePath[] = [];

View file

@ -154,6 +154,17 @@ export class Syncer {
public reset(): void {
this.queue.clearPending();
this.clearOfflineScanGate();
}
/**
* Reset the "have we already scanned this session" gate so a later
* `scheduleSyncForOfflineChanges()` actually performs a fresh scan
* instead of returning the previous (resolved) promise. Called when
* sync is paused so the next start picks up any offline edits made
* while sync was off.
*/
public clearOfflineScanGate(): void {
const current = this.runningScheduleSyncForOfflineChanges;
if (current !== undefined) {
void current.finally(() => {