This commit is contained in:
Andras Schmelczer 2026-04-24 20:56:03 +01:00
parent a7b588da97
commit 19d5dc1999
11 changed files with 358 additions and 355 deletions

View file

@ -67,27 +67,6 @@ export class Syncer {
this.webSocketManager.onWebSocketStatusChanged.add((isConnected) => {
if (isConnected) {
this.sendHandshakeMessage();
} else {
// Don't null the reference synchronously — if the scan is
// still in flight, the next reconnect would spawn a second
// concurrent scan racing on the same queue. Defer the
// clear until the in-flight task actually resolves, so a
// fresh scan can only start once the prior one is done.
const current = this.runningScheduleSyncForOfflineChanges;
if (current === undefined) return;
current
.catch(() => {
/* swallow — internal error already logged */
})
.finally(() => {
if (
this.runningScheduleSyncForOfflineChanges ===
current
) {
this.runningScheduleSyncForOfflineChanges =
undefined;
}
});
}
});
this.webSocketManager.onRemoteVaultUpdateReceived.add(
@ -102,20 +81,8 @@ export class Syncer {
return this._isFirstSyncComplete;
}
public hasPendingOperationsForDocument(relativePath: string): boolean {
return this.queue.hasPendingEventsForPath(relativePath);
}
public syncLocallyCreatedFile(relativePath: RelativePath): void {
this.queue.enqueue({ type: SyncEventType.Create, path: relativePath });
this.ensureDraining();
}
public syncLocallyDeletedFile(relativePath: RelativePath): void {
this.queue.enqueue({
type: SyncEventType.Delete,
path: relativePath,
});
this.queue.enqueue({ type: SyncEventType.LocalCreate, path: relativePath });
this.ensureDraining();
}
@ -126,10 +93,78 @@ export class Syncer {
oldPath?: RelativePath;
relativePath: RelativePath;
}): void {
this.queue.enqueue({ type: SyncEventType.SyncLocal, path: relativePath, oldPath });
this.queue.enqueue({ type: SyncEventType.LocalUpdate, path: relativePath, oldPath });
this.ensureDraining();
}
public syncLocallyDeletedFile(relativePath: RelativePath): void {
this.queue.enqueue({
type: SyncEventType.LocalDelete,
path: relativePath,
});
this.ensureDraining();
}
public async syncRemotelyUpdatedFile(
message: WebSocketVaultUpdate
): Promise<void> {
await this.scheduleSyncForOfflineChanges();
for (const remoteVersion of message.documents) {
this.queue.enqueue({
type: SyncEventType.RemoteUpdate,
remoteVersion
});
}
if (message.isInitialSync) {
this._isFirstSyncComplete = true;
}
this.ensureDraining();
}
// A PathChange notifies us that a document now lives at a new server-
// canonical path. It's delivered to every client (origin included)
// because the create/update HTTP response no longer carries the path,
// so the only way the origin learns about dedupe or first-rename-wins
// is via this event.
//
// Algorithmic assumptions:
// (1) Per-vault broadcast ordering is preserved by the server, so if
// the same write produced a `VaultUpdate` (content change) and a
// `PathChange` (path change), the `VaultUpdate` is handled first
// — that's what lets us skip advancing `parentVersionId` here
// without risking a stuck "already up-to-date" check later.
// (2) On a lag-induced disconnect (`broadcast::error::Lagged`) the
// server disconnects the client for a full resync, so out-of-
// order delivery across a reconnect boundary can't leave us with
// a stale PathChange overwriting a newer one.
public async syncRemotelyChangedPath(
pathChange: WebSocketVaultPathChange
): Promise<void> {
try {
await this.scheduleSyncForOfflineChanges();
this.queue.enqueue({
type: SyncEventType.RemotePathChange,
pathChange
});
await this.scheduleDrain();
} catch (e) {
if (e instanceof SyncResetError) {
this.logger.info(
"Failed to apply remote path change due to a reset"
);
return;
}
this.logger.error(`Failed to apply remote path change: ${e}`);
}
}
public async scheduleSyncForOfflineChanges(): Promise<void> {
if (this.runningScheduleSyncForOfflineChanges !== undefined) {
this.logger.debug("Uploading local changes is already in progress");
@ -167,114 +202,27 @@ export class Syncer {
}
}
public async syncRemotelyUpdatedFile(
message: WebSocketVaultUpdate
): Promise<void> {
try {
await this.scheduleSyncForOfflineChanges();
for (const remoteVersion of message.documents) {
this.queue.enqueue({
type: SyncEventType.SyncRemote,
remoteVersion
});
}
if (message.isInitialSync) {
this._isFirstSyncComplete = true;
}
await this.scheduleDrain();
} catch (e) {
if (e instanceof SyncResetError) {
this.logger.info(
"Failed to sync remotely updated file due to a reset"
);
return;
}
this.logger.error(`Failed to sync remotely updated file: ${e}`);
}
}
// A PathChange notifies us that a document now lives at a new server-
// canonical path. It's delivered to every client (origin included)
// because the create/update HTTP response no longer carries the path,
// so the only way the origin learns about dedupe or first-rename-wins
// is via this event.
//
// Algorithmic assumptions:
// (1) Per-vault broadcast ordering is preserved by the server, so if
// the same write produced a `VaultUpdate` (content change) and a
// `PathChange` (path change), the `VaultUpdate` is handled first
// — that's what lets us skip advancing `parentVersionId` here
// without risking a stuck "already up-to-date" check later.
// (2) On a lag-induced disconnect (`broadcast::error::Lagged`) the
// server disconnects the client for a full resync, so out-of-
// order delivery across a reconnect boundary can't leave us with
// a stale PathChange overwriting a newer one.
public async syncRemotelyChangedPath(
pathChange: WebSocketVaultPathChange
): Promise<void> {
// Serialize onto the drain chain so this handler can't race against
// an in-flight `processSyncRemote` / `processSyncLocal` etc. that
// captured the old path before our move.
try {
await this.chainOntoDrain(async () => {
const existing = this.queue.getDocumentByDocumentId(
pathChange.documentId
);
if (existing === undefined) {
throw new Error(
`Received path change for unknown document ${pathChange.documentId}`
);
}
const { path: currentPath, record } = existing;
const newPath = pathChange.relativePath;
if (currentPath !== newPath) {
await this.operations.move(currentPath, newPath);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: newPath,
movedFrom: currentPath
},
message: "Applied remote path change",
author: pathChange.userId,
timestamp: new Date(pathChange.updatedDate)
});
}
// `operations.move` updates the queue's path index, but
// doesn't touch `remoteRelativePath`. Refresh it so offline
// change detection compares against the server's path.
// parentVersionId intentionally stays at its prior value:
// if the write also changed content, the corresponding
// VaultUpdate handles that; advancing it here would make us
// skip fetching content we don't yet have.
this.queue.setDocument(newPath, {
...record,
remoteRelativePath: newPath
});
});
} catch (e) {
if (e instanceof SyncResetError) {
this.logger.info(
"Failed to apply remote path change due to a reset"
);
return;
}
this.logger.error(`Failed to apply remote path change: ${e}`);
}
}
public reset(): void {
this._isFirstSyncComplete = false;
this.queue.clear();
this.runningScheduleSyncForOfflineChanges = undefined;
// Don't null the reference synchronously — if the scan is
// still in flight, the next reconnect would spawn a second
// concurrent scan racing on the same queue. Defer the
// clear until the in-flight task actually resolves, so a
// fresh scan can only start once the prior one is done.
const current = this.runningScheduleSyncForOfflineChanges;
if (current !== undefined) {
current.finally(() => {
if (
this.runningScheduleSyncForOfflineChanges ===
current
) {
this.runningScheduleSyncForOfflineChanges =
undefined;
}
});
}
// Do not set this.draining = undefined — the in-flight drain will
// exit naturally (SyncResetError or empty queue) and the promise
// chain stays intact, preventing concurrent drain invocations
@ -372,17 +320,20 @@ export class Syncer {
try {
switch (event.type) {
case SyncEventType.Create:
case SyncEventType.LocalCreate:
await this.processCreate(event);
break;
case SyncEventType.Delete:
case SyncEventType.LocalDelete:
await this.processDelete(event);
break;
case SyncEventType.SyncLocal:
case SyncEventType.LocalUpdate:
await this.processSyncLocal(event);
break;
case SyncEventType.SyncRemote:
await this.processSyncRemote(event);
case SyncEventType.RemoteUpdate:
await this.processSyncRemoteContent(event);
break;
case SyncEventType.RemotePathChange:
await this.processSyncRemotePath(event);
break;
}
} catch (e) {
@ -390,7 +341,7 @@ export class Syncer {
this.logger.info(
`Skipping sync event '${event.type}' because the file no longer exists`
);
if (event.type === SyncEventType.Create) {
if (event.type === SyncEventType.LocalCreate) {
event.resolvers?.promise.catch(() => { });
event.resolvers?.reject(new Error("Create was cancelled"));
}
@ -404,7 +355,7 @@ export class Syncer {
// `processEvent` ran; if it was a Create, its resolver
// promise would otherwise hang forever, blocking any
// queued Delete / SyncLocal that `await`s it.
if (event.type === SyncEventType.Create) {
if (event.type === SyncEventType.LocalCreate) {
event.resolvers?.promise.catch(() => {
/* suppressed */
});
@ -423,7 +374,7 @@ export class Syncer {
private async processCreate(
event: Extract<SyncEvent, { type: SyncEventType.Create }>
event: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>
): Promise<void> {
const effectivePath = event.path;
const contentBytes = await this.operations.read(effectivePath);
@ -487,7 +438,7 @@ export class Syncer {
}
private async processDelete(
event: Extract<SyncEvent, { type: SyncEventType.Delete }>
event: Extract<SyncEvent, { type: SyncEventType.LocalDelete }>
): Promise<void> {
let documentId: DocumentId;
if (typeof event.documentId === "string") {
@ -531,7 +482,7 @@ export class Syncer {
}
private async processSyncLocal(
event: Extract<SyncEvent, { type: SyncEventType.SyncLocal }>
event: Extract<SyncEvent, { type: SyncEventType.LocalUpdate }>
): Promise<void> {
let documentId: DocumentId;
if (typeof event.documentId === "string") {
@ -606,8 +557,8 @@ export class Syncer {
});
}
private async processSyncRemote(
event: Extract<SyncEvent, { type: SyncEventType.SyncRemote }>
private async processSyncRemoteContent(
event: Extract<SyncEvent, { type: SyncEventType.RemoteUpdate }>
): Promise<void> {
const { remoteVersion } = event;
const existingDoc = this.queue.getDocumentByDocumentId(
@ -643,6 +594,51 @@ export class Syncer {
await this.processRemoteUpdateForNewDocument(remoteVersion);
}
private async processSyncRemotePath(
event: Extract<SyncEvent, { type: SyncEventType.RemotePathChange }>
): Promise<void> {
const { pathChange } = event;
const existing = this.queue.getDocumentByDocumentId(
pathChange.documentId
);
if (existing === undefined) {
throw new Error(
`Received path change for unknown document ${pathChange.documentId}`
);
}
const { path: currentPath, record } = existing;
const newPath = pathChange.relativePath;
if (currentPath !== newPath) {
await this.operations.move(currentPath, newPath);
this.history.addHistoryEntry({
status: SyncStatus.SUCCESS,
details: {
type: SyncType.MOVE,
relativePath: newPath,
movedFrom: currentPath
},
message: "Applied remote path change",
author: pathChange.userId,
timestamp: new Date(pathChange.updatedDate)
});
}
// `operations.move` updates the queue's path index, but doesn't
// touch `remoteRelativePath`. Refresh it so offline change
// detection compares against the server's path. parentVersionId
// intentionally stays at its prior value: if the write also
// changed content, the corresponding VaultUpdate handles that;
// advancing it here would make us skip fetching content we don't
// yet have.
this.queue.setDocument(newPath, {
...record,
remoteRelativePath: newPath
});
}
private async processRemoteUpdateForExistingDocument(
currentPath: RelativePath,
record: DocumentRecord,
@ -793,14 +789,14 @@ export class Syncer {
details:
targetPath !== currentPath
? {
type: SyncType.MOVE,
relativePath: targetPath,
movedFrom: currentPath
}
type: SyncType.MOVE,
relativePath: targetPath,
movedFrom: currentPath
}
: {
type: SyncType.UPDATE,
relativePath: targetPath
},
type: SyncType.UPDATE,
relativePath: targetPath
},
message:
"Successfully downloaded remotely updated file from the server",
author: fullVersion.userId,
@ -1063,7 +1059,7 @@ export class Syncer {
// `resolvers` promise can be fulfilled (or rejected, on a deleted
// response). Dependent SyncLocal/Delete events are chained through
// that promise and would otherwise `await` forever.
createEvent?: Extract<SyncEvent, { type: SyncEventType.Create }>;
createEvent?: Extract<SyncEvent, { type: SyncEventType.LocalCreate }>;
}): Promise<void> {
if (response.isDeleted) {
// A Create that the server returned as already-deleted means
@ -1222,7 +1218,7 @@ export class Syncer {
}
private notifyRemainingOperationsChanged(): void {
const currentCount = this.queue.size;
const currentCount = this.queue.pendingUpdateCount;
if (this.previousRemainingOperationsCount !== currentCount) {
this.previousRemainingOperationsCount = currentCount;
this.onRemainingOperationsCountChanged.trigger(currentCount);