This commit is contained in:
Andras Schmelczer 2026-04-07 21:03:21 +01:00
parent d5958fcbaa
commit 5a4723cd00
9 changed files with 163 additions and 697 deletions

View file

@ -19,7 +19,7 @@ class MockServerConfig implements Pick<ServerConfig, "getConfig"> {
}
class MockQueue implements Pick<SyncEventQueue, "getDocument" | "moveDocument"> {
public getDocument(
public getDocumentByPath(
_path: RelativePath
): DocumentRecord | undefined {
return undefined;

View file

@ -284,7 +284,7 @@ export class FileOperations {
// Avoid multiple deconflictPath calls returning the same path
await this.fs.waitForLock(newName);
const existingRecord = this.queue.getDocument(newName);
const existingRecord = this.queue.getSettledDocumentByPath(newName);
if (
existingRecord !== undefined || // the document might have been confirmed by the server at a new path but haven't yet moved there locally
(await this.fs.exists(newName, true))

View file

@ -105,7 +105,7 @@ export class CursorTracker {
for (const [relativePath, cursors] of Object.entries(
documentToCursors
)) {
const record = this.queue.getDocument(relativePath);
const record = this.queue.getSettledDocumentByPath(relativePath);
if (!record) {
continue; // Let's wait for the file to be created before sending cursors
@ -135,8 +135,8 @@ export class CursorTracker {
const readContent = await this.fileOperations.read(
doc.relative_path
);
const record = this.queue.getDocument(doc.relative_path);
if (record?.hash !== (await hash(readContent))) {
const record = this.queue.getSettledDocumentByPath(doc.relative_path);
if (record?.remoteHash !== (await hash(readContent))) {
doc.vault_update_id = null;
}
}
@ -221,7 +221,7 @@ export class CursorTracker {
private async getDocumentUpToDateness(
document: DocumentWithCursors
): Promise<DocumentUpToDateness> {
const record = this.queue.getDocument(document.relative_path);
const record = this.queue.getSettledDocumentByPath(document.relative_path);
if (!record) {
// the document of the cursor must be from the future
@ -243,8 +243,8 @@ export class CursorTracker {
document.relative_path
);
const currentRecord = this.queue.getDocument(document.relative_path);
return currentRecord?.hash === (await hash(currentContent))
const currentRecord = this.queue.getSettledDocumentByPath(document.relative_path);
return currentRecord?.remoteHash === (await hash(currentContent))
? DocumentUpToDateness.UpToDate
: DocumentUpToDateness.Prior;
}

View file

@ -14,20 +14,31 @@ import {
} from "./types";
export class SyncEventQueue {
// latest state of the filesystem as we know it, excluding
// unconfirmed creates but including pending deletes,
// it's always indexed by the latest path on disk
// Latest state of the filesystem as we know it, excluding
// unconfirmed creates but including pending deletes.
//
// It's always indexed by the latest path on disk.
//
// It maps a subset of the remote state onto the local filesystem.
private readonly documents = new Map<RelativePath, DocumentRecord>();
// all outstanding operations in order of occurrence,
// All outstanding operations in order of occurrence,
// can include multiple generations of the same document,
// e.g.: a create, delete, create sequence for the same path.
//
// The paths for the events must always correspond to the latest
// path on disk, so the path of each event may be updated multiple
// times.
//
// It maps pending changes onto the local filesystem.
private readonly events: SyncEvent[] = [];
// TODO: remove
// Log the last seen update before which we've seen all ids so that
// on the next startup, we can skip re-syncing what we have already
private lastSeenUpdateIds: CoveredValues;
// file creations for paths matching any of these patterns will be ignored
private ignorePatterns: RegExp[];
public constructor(
@ -57,6 +68,8 @@ export class SyncEventQueue {
}
const { lastSeenUpdateId } = initialState;
this.lastSeenUpdateIds = new CoveredValues(
Math.max(0, lastSeenUpdateId ?? 0)
);
@ -93,7 +106,7 @@ export class SyncEventQueue {
}
}
// todo: let's remove
public getSettledDocumentByPath(path: RelativePath): DocumentRecord | undefined {
return this.documents.get(path);
}
@ -120,9 +133,7 @@ export class SyncEventQueue {
}
/**
* Settle a Create event: add the document to the settled map,
* resolve the create promise, and replace promise-based documentId
* references in the event queue with the actual string documentId.
* Call once a create has been acknowledged by the server.
*/
public resolveCreate(
event: Extract<SyncEvent, { type: SyncEventType.Create }>,
@ -207,6 +218,7 @@ export class SyncEventQueue {
(e.type === SyncEventType.Delete &&
e.documentId === docId) ||
(e.type === SyncEventType.SyncRemote &&
// we care about the local path not the remote
this.getDocumentByDocumentId(e.remoteVersion.documentId as DocumentId)?.path === path)
);
}
@ -235,7 +247,8 @@ export class SyncEventQueue {
this.events.length = 0;
}
public enqueue(event: SyncEvent): void {
// todo: maybe move next() logic here to stop storing rubbish
public enqueue(event: SyncEvent): void { // new type
if (this.isIgnored(event)) return;
if (event.type === SyncEventType.SyncLocal) {
@ -309,8 +322,7 @@ export class SyncEventQueue {
e.documentId === documentId
);
if (deleteEvent !== undefined) {
this.removeAllSyncLocalsForDocumentId(await documentId);
removeFromArray(this.events, deleteEvent);
this.removeAllEventsForDocumentId(await documentId);
return deleteEvent;
}
@ -344,7 +356,9 @@ export class SyncEventQueue {
}
private isIgnored(event: SyncEvent): boolean {
if (event.type !== SyncEventType.Create) return false;
if (event.type !== SyncEventType.Create) {
return false;
}
return this.ignorePatterns.some((pattern) => pattern.test(event.path));
}
@ -365,19 +379,6 @@ export class SyncEventQueue {
}
}
private removeAllSyncLocalsForDocumentId(documentId: DocumentId): void {
for (let i = this.events.length - 1; i >= 0; i--) {
const e = this.events[i];
if (
e.type === SyncEventType.SyncLocal &&
e.documentId === documentId
) {
// eslint-disable-next-line no-restricted-syntax -- Bulk removal by predicate, not single-item removal
this.events.splice(i, 1);
}
}
}
public updatePendingCreatePath(
oldPath: RelativePath,
newPath: RelativePath

View file

@ -84,13 +84,15 @@ export class Syncer {
}
public syncLocallyCreatedFile(relativePath: RelativePath): void {
this.queue.enqueue({ type: SyncEventType.Create, path: relativePath });
this.queue.enqueue({ type: SyncEventType.Create, path: relativePath, originalPath: relativePath });
this.ensureDraining();
}
public syncLocallyDeletedFile(relativePath: RelativePath): void {
const record = this.queue.getDocument(relativePath);
const documentId = record?.documentId ?? "";
const record = this.queue.getSettledDocumentByPath(relativePath);
const documentId: DocumentId | Promise<DocumentId> | undefined =
record?.documentId ?? this.queue.getCreatePromise(relativePath);
if (documentId === undefined) return;
this.queue.enqueue({
type: SyncEventType.Delete,
documentId,
@ -107,7 +109,7 @@ export class Syncer {
relativePath: RelativePath;
}): void {
if (oldPath === undefined) {
const record = this.queue.getDocument(relativePath);
const record = this.queue.getSettledDocumentByPath(relativePath);
if (record === undefined) {
this.syncLocallyCreatedFile(relativePath);
return;
@ -115,17 +117,19 @@ export class Syncer {
this.queue.enqueue({
type: SyncEventType.SyncLocal,
documentId: record.documentId,
path: relativePath,
originalPath: relativePath,
});
this.ensureDraining();
return;
}
// Handle rename
const sourceRecord = this.queue.getDocument(oldPath);
const sourceRecord = this.queue.getSettledDocumentByPath(oldPath);
if (sourceRecord !== undefined) {
// Capture the displaced document's version before
// moveDocument removes it from the store
const displacedRecord = this.queue.getDocument(relativePath);
const displacedRecord = this.queue.getSettledDocumentByPath(relativePath);
const displacedDocumentId = this.queue.moveDocument(
oldPath,
relativePath
@ -141,17 +145,14 @@ export class Syncer {
this.queue.enqueue({
type: SyncEventType.SyncLocal,
documentId: sourceRecord.documentId,
path: relativePath,
originalPath: relativePath,
});
} else if (this.queue.hasCreateEvent(oldPath)) {
const updated = this.queue.updateCreatePath(oldPath, relativePath);
if (!updated) {
this.syncLocallyCreatedFile(relativePath);
}
} else {
// The create event may have already been dequeued and
// processed (e.g. skipped due to a concurrent rename
// deleting the file at the old path). Treat the file at
// the new path as a fresh create
// No settled document at the old path — enqueue a fresh
// create at the new path. If a Create for the old path is
// still in the queue it will fail with FileNotFoundError
// and reject its resolvers, cancelling any dependent events.
this.syncLocallyCreatedFile(relativePath);
}
@ -215,11 +216,9 @@ export class Syncer {
// past gaps — correct for incremental updates but wrong for a
// snapshot whose IDs are intentionally sparse
if (message.isInitialSync) {
this.queue.setLastSeenUpdateId(
Math.max(
...message.documents.map((d) => d.vaultUpdateId),
this.queue.getLastSeenUpdateId()
)
this.queue.lastSeenUpdateId = Math.max(
...message.documents.map((d) => d.vaultUpdateId),
this.queue.lastSeenUpdateId
);
this._isFirstSyncComplete = true;
}
@ -252,7 +251,7 @@ export class Syncer {
type: "handshake",
deviceId: this.deviceId,
token: this.settings.getSettings().token,
lastSeenVaultUpdateId: this.queue.getLastSeenUpdateId()
lastSeenVaultUpdateId: this.queue.lastSeenUpdateId
};
this.webSocketManager.sendHandshakeMessage(message);
}
@ -270,7 +269,7 @@ export class Syncer {
// Detect documents whose local path diverges from the server path.
// This happens when a rename was recorded while sync was disabled.
const allDocuments = this.queue.allDocuments();
const allDocuments = this.queue.allSettledDocuments();
const locallyRenamedPaths = new Set<RelativePath>();
for (const [path, record] of allDocuments) {
@ -285,6 +284,8 @@ export class Syncer {
this.queue.enqueue({
type: SyncEventType.SyncLocal,
documentId: record.documentId,
path,
originalPath: path,
});
locallyRenamedPaths.add(path);
}
@ -314,7 +315,7 @@ export class Syncer {
continue;
}
const existingRecord = this.queue.getDocument(relativePath);
const existingRecord = this.queue.getSettledDocumentByPath(relativePath);
if (existingRecord !== undefined) {
// Verify the content actually belongs to this document.
@ -331,7 +332,7 @@ export class Syncer {
throw e;
}
if (contentHash !== existingRecord.hash) {
if (contentHash !== existingRecord.remoteHash) {
const originalFile = await findMatchingFile(
contentHash,
locallyPossiblyDeletedFiles
@ -496,6 +497,10 @@ export class Syncer {
this.logger.info(
`Skipping sync event '${event.type}' because the file no longer exists`
);
if (event.type === SyncEventType.Create) {
event.resolvers?.promise.catch(() => { });
event.resolvers?.reject(new Error("Create was cancelled"));
}
return;
}
if (
@ -515,7 +520,7 @@ export class Syncer {
const localBytes =
await this.operations.read(eventPath);
const localHash = await hash(localBytes);
if (localHash !== record.hash) {
if (localHash !== record.remoteHash) {
this.logger.info(
`Server rejected update for ${eventPath} but local content changed, re-creating`
);
@ -559,14 +564,18 @@ export class Syncer {
);
if (oversizedEntry !== undefined) {
this.history.addHistoryEntry(oversizedEntry);
event.resolvers?.promise.catch(() => { });
event.resolvers?.reject(new Error("Create was cancelled"));
return;
}
const response = await this.syncService.create({
relativePath: effectivePath,
relativePath: event.originalPath,
contentBytes
});
event.resolvers?.resolve(response.documentId);
// Handle concurrent move & creation: the server merged our create
// with an existing document that we also have locally at a different path
const existingDoc = this.queue.getDocumentByDocumentId(
@ -586,7 +595,7 @@ export class Syncer {
// file AND the foreign document's record to the deconflicted path,
// then overwrite it — orphaning the foreign document. Handle this
// by writing directly to the deconflicted path instead of moving
const foreignRecord = this.queue.getDocument(effectivePath);
const foreignRecord = this.queue.getSettledDocumentByPath(effectivePath);
const pathOccupiedByForeignDocument =
response.relativePath !== effectivePath &&
foreignRecord !== undefined &&
@ -604,7 +613,7 @@ export class Syncer {
this.queue.setDocument(actualPath, {
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: afterWriteHash,
remoteHash: afterWriteHash,
remoteRelativePath: response.relativePath
});
await this.updateCache(
@ -617,7 +626,7 @@ export class Syncer {
this.queue.setDocument(actualPath, {
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteHash: contentHash,
remoteRelativePath: response.relativePath
});
await this.updateCache(
@ -651,22 +660,20 @@ export class Syncer {
private async processDelete(
event: Extract<SyncEvent, { type: SyncEventType.Delete }>
): Promise<void> {
let { documentId } = event;
const { path } = event;
// Empty string means the documentId wasn't known when the
// delete was enqueued (e.g. a create was still in flight).
// Try to resolve it from the store now that the create may
// have completed
if (documentId === "") {
const record = this.queue.getDocument(path);
if (record === undefined) {
let documentId: DocumentId;
if (typeof event.documentId === "string") {
documentId = event.documentId;
} else {
try {
documentId = await event.documentId;
} catch {
this.logger.debug(
"Skipping delete for a document whose create was cancelled"
);
return;
}
documentId = record.documentId;
}
// For displacement deletes (side effect of a rename), check
@ -681,9 +688,6 @@ export class Syncer {
this.logger.info(
`Skipping displacement delete for ${documentId} — document was updated by another client`
);
// Allow broadcasts for this document to be processed
// normally so the updated content is downloaded
this.queue.unmarkRecentlyDeleted(documentId);
return;
}
}
@ -721,40 +725,57 @@ export class Syncer {
private async processSyncLocal(
event: Extract<SyncEvent, { type: SyncEventType.SyncLocal }>
): Promise<void> {
const doc = this.queue.getDocumentByDocumentId(event.documentId);
let documentId: DocumentId;
if (typeof event.documentId === "string") {
documentId = event.documentId;
} else {
try {
documentId = await event.documentId;
} catch {
this.logger.debug(
"Skipping sync-local for a document whose create was cancelled"
);
return;
}
}
const doc = this.queue.getDocumentByDocumentId(documentId);
if (doc === undefined) {
this.logger.debug(
`Skipping sync-local for unknown document ${event.documentId}`
`Skipping sync-local for unknown document ${documentId}`
);
return;
}
const { path: eventPath, record } = doc;
const { path: diskPath, record } = doc;
// Read file and compare hash
const contentBytes = await this.operations.read(eventPath);
// Read file from the current disk path
const contentBytes = await this.operations.read(diskPath);
const contentHash = await hash(contentBytes);
// Upload using the original path
const uploadPath = event.originalPath;
const pathChanged =
record.remoteRelativePath !== undefined &&
record.remoteRelativePath !== eventPath;
record.remoteRelativePath !== uploadPath;
if (contentHash === record.hash && !pathChanged) {
if (contentHash === record.remoteHash && !pathChanged) {
this.logger.debug(
`File hash of ${eventPath} matches last synced version; no need to sync`
`File hash of ${diskPath} matches last synced version; no need to sync`
);
return;
}
const response = await this.sendUpdate(
record,
eventPath,
uploadPath,
contentBytes
);
await this.handleMaybeMergingResponse({
path: eventPath,
path: diskPath,
response,
contentHash,
originalContentBytes: contentBytes
@ -768,7 +789,7 @@ export class Syncer {
status: SyncStatus.SUCCESS,
details: {
type: SyncType.UPDATE,
relativePath: eventPath
relativePath: diskPath
},
message: isMerge
? "Updated file and merged with remote changes"
@ -806,14 +827,6 @@ export class Syncer {
return;
}
if (this.queue.wasRecentlyDeleted(remoteVersion.documentId)) {
this.logger.debug(
`Ignoring stale broadcast for recently-deleted document ${remoteVersion.documentId}`
);
this.queue.addSeenUpdateId(remoteVersion.vaultUpdateId);
return;
}
if (remoteVersion.isDeleted) {
this.logger.debug(
`Document ${remoteVersion.relativePath} has been deleted remotely, no need to sync`
@ -836,7 +849,7 @@ export class Syncer {
try {
const contentBytes = await this.operations.read(currentPath);
const contentHash = await hash(contentBytes);
hasLocalChanges = record.hash !== contentHash;
hasLocalChanges = record.remoteHash !== contentHash;
} catch (e) {
if (!(e instanceof FileNotFoundError)) throw e;
}
@ -877,7 +890,7 @@ export class Syncer {
if (fullVersion.isDeleted) {
const contentBytes = await this.operations.read(currentPath);
const localHash = await hash(contentBytes);
if (localHash !== record.hash) {
if (localHash !== record.remoteHash) {
this.queue.removeDocument(currentPath);
this.syncLocallyCreatedFile(currentPath);
} else {
@ -891,7 +904,7 @@ export class Syncer {
const contentBytes = await this.operations.read(currentPath);
const contentHash = await hash(contentBytes);
const hasLocalChanges = record.hash !== contentHash;
const hasLocalChanges = record.remoteHash !== contentHash;
if (hasLocalChanges) {
const response = await this.sendUpdate(
@ -949,7 +962,7 @@ export class Syncer {
this.queue.setDocument(actualPath, {
documentId: fullVersion.documentId,
parentVersionId: fullVersion.vaultUpdateId,
hash: afterWriteHash,
remoteHash: afterWriteHash,
remoteRelativePath: fullVersion.relativePath
});
@ -1030,7 +1043,7 @@ export class Syncer {
this.queue.setDocument(remoteVersion.relativePath, {
documentId: remoteVersion.documentId,
parentVersionId: remoteVersion.vaultUpdateId,
hash: contentHash,
remoteHash: contentHash,
remoteRelativePath: remoteVersion.relativePath
});
@ -1113,8 +1126,8 @@ export class Syncer {
if (await this.operations.exists(path)) {
const localBytes = await this.operations.read(path);
const localHash = await hash(localBytes);
const record = this.queue.getDocument(path);
if (record !== undefined && localHash !== record.hash) {
const record = this.queue.getSettledDocumentByPath(path);
if (record !== undefined && localHash !== record.remoteHash) {
this.queue.removeDocument(path);
this.queue.addSeenUpdateId(response.vaultUpdateId);
this.syncLocallyCreatedFile(path);
@ -1137,15 +1150,17 @@ export class Syncer {
);
if (displacedPath !== undefined) {
const displacedRecord =
this.queue.getDocument(displacedPath);
this.queue.getSettledDocumentByPath(displacedPath);
if (displacedRecord !== undefined) {
const displacedBytes =
await this.operations.read(displacedPath);
const displacedHash = await hash(displacedBytes);
if (displacedHash !== displacedRecord.hash) {
if (displacedHash !== displacedRecord.remoteHash) {
this.queue.enqueue({
type: SyncEventType.SyncLocal,
documentId: displacedRecord.documentId,
path: displacedPath,
originalPath: displacedPath,
});
}
}
@ -1169,7 +1184,7 @@ export class Syncer {
this.queue.setDocument(actualPath, {
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: afterWriteHash,
remoteHash: afterWriteHash,
remoteRelativePath: response.relativePath
});
@ -1184,7 +1199,7 @@ export class Syncer {
this.queue.setDocument(actualPath, {
documentId: response.documentId,
parentVersionId: response.vaultUpdateId,
hash: contentHash,
remoteHash: contentHash,
remoteRelativePath: response.relativePath
});

View file

@ -10,5 +10,5 @@ export async function findMatchingFile(
return undefined;
}
return candidates.find(({ record }) => record.hash === contentHash);
return candidates.find(({ record }) => record.remoteHash === contentHash);
}