missing ensure and covered
This commit is contained in:
parent
b52c09fecc
commit
addaa1699f
2 changed files with 23 additions and 33 deletions
|
|
@ -39,7 +39,6 @@ export class SyncEventQueue {
|
|||
// file creations for paths matching any of these patterns will be ignored
|
||||
private ignorePatterns: RegExp[];
|
||||
|
||||
private savePending = false;
|
||||
|
||||
|
||||
public readonly lastSeenUpdateId: VaultUpdateId;
|
||||
|
|
@ -85,7 +84,7 @@ export class SyncEventQueue {
|
|||
return this.documents.size;
|
||||
}
|
||||
|
||||
public enqueue(input: FileSyncEvent): void {
|
||||
public async enqueue(input: FileSyncEvent): Promise<void> {
|
||||
const path = (input.type === SyncEventType.RemoteChange) ? input.remoteVersion.relativePath : input.path;
|
||||
|
||||
if (this.ignorePatterns.some((pattern) => pattern.test(path))) {
|
||||
|
|
@ -108,21 +107,30 @@ export class SyncEventQueue {
|
|||
|
||||
const lookupPath = (input.type === SyncEventType.LocalUpdate && input.oldPath) ? input.oldPath : path;
|
||||
const record = this.documents.get(lookupPath);
|
||||
const documentId: DocumentId | Promise<DocumentId> | undefined =
|
||||
this.findLatestCreateForPath(lookupPath)?.resolvers.promise ?? record?.documentId;
|
||||
|
||||
if (documentId === undefined) {
|
||||
// latest creation must take precedence as it's from the doc's latest generation
|
||||
const pendingDocumentId: Promise<DocumentId> | undefined =
|
||||
this.findLatestCreateForPath(lookupPath)?.resolvers.promise;
|
||||
|
||||
const documentId: DocumentId | undefined =
|
||||
record?.documentId;
|
||||
|
||||
|
||||
if (pendingDocumentId === undefined && documentId === undefined) {
|
||||
// we can get here when deleting a local document after a remote update
|
||||
return;
|
||||
}
|
||||
|
||||
if (input.type === SyncEventType.LocalDelete) {
|
||||
this.events.push({ type: SyncEventType.LocalDelete, documentId });
|
||||
this.events.push({ type: SyncEventType.LocalDelete, documentId: pendingDocumentId ?? documentId! });
|
||||
return;
|
||||
}
|
||||
|
||||
if (input.oldPath !== undefined) {
|
||||
if (typeof documentId === "string") {
|
||||
if (pendingDocumentId !== undefined) {
|
||||
this.updatePendingCreatePath(input.oldPath, path);
|
||||
this.events.push({ type: SyncEventType.LocalUpdate, documentId: pendingDocumentId, path, originalPath: path });
|
||||
} else {
|
||||
this.documents.delete(input.oldPath);
|
||||
this.documents.set(path, record!);
|
||||
for (const e of this.events) {
|
||||
|
|
@ -131,12 +139,11 @@ export class SyncEventQueue {
|
|||
e.path = path;
|
||||
}
|
||||
}
|
||||
this.saveInTheBackground();
|
||||
} else {
|
||||
this.updatePendingCreatePath(input.oldPath, path);
|
||||
this.events.push({ type: SyncEventType.LocalUpdate, documentId: documentId!, path, originalPath: path });
|
||||
await this.save();
|
||||
|
||||
}
|
||||
}
|
||||
this.events.push({ type: SyncEventType.LocalUpdate, documentId, path, originalPath: path });
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -312,15 +319,4 @@ export class SyncEventQueue {
|
|||
}
|
||||
|
||||
|
||||
// Coalesce bursts of mutations into one persist per microtask. A drain
|
||||
// iteration can easily produce 10+ mutations; without this, we'd fire
|
||||
// 10 overlapping `save()` calls racing on the persistence backend.
|
||||
private saveInTheBackground(): void {
|
||||
if (this.savePending) return;
|
||||
this.savePending = true;
|
||||
queueMicrotask(() => {
|
||||
this.savePending = false;
|
||||
this.save();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ export class Syncer {
|
|||
}
|
||||
|
||||
public syncLocallyCreatedFile(relativePath: RelativePath): void {
|
||||
this.queue.enqueue({ type: SyncEventType.LocalCreate, path: relativePath });
|
||||
void this.queue.enqueue({ type: SyncEventType.LocalCreate, path: relativePath });
|
||||
this.ensureDraining();
|
||||
}
|
||||
|
||||
|
|
@ -90,12 +90,12 @@ export class Syncer {
|
|||
oldPath?: RelativePath;
|
||||
relativePath: RelativePath;
|
||||
}): void {
|
||||
this.queue.enqueue({ type: SyncEventType.LocalUpdate, path: relativePath, oldPath });
|
||||
void this.queue.enqueue({ type: SyncEventType.LocalUpdate, path: relativePath, oldPath });
|
||||
this.ensureDraining();
|
||||
}
|
||||
|
||||
public syncLocallyDeletedFile(relativePath: RelativePath): void {
|
||||
this.queue.enqueue({
|
||||
void this.queue.enqueue({
|
||||
type: SyncEventType.LocalDelete,
|
||||
path: relativePath,
|
||||
});
|
||||
|
|
@ -107,7 +107,7 @@ export class Syncer {
|
|||
): Promise<void> {
|
||||
await this.scheduleSyncForOfflineChanges();
|
||||
|
||||
this.queue.enqueue({
|
||||
void this.queue.enqueue({
|
||||
type: SyncEventType.RemoteChange,
|
||||
remoteVersion: message.document
|
||||
});
|
||||
|
|
@ -189,18 +189,13 @@ export class Syncer {
|
|||
|
||||
|
||||
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
|
||||
// Offline scan wipes the event queue via `queue.clear()` and then
|
||||
// rebuilds events from disk. That MUST NOT race against an
|
||||
// in-flight drain iteration that may already hold a reference to
|
||||
// a freshly-cleared event — wait for any drain to finish, and
|
||||
// suppress new drains for the duration of the scan.
|
||||
this.isScanning = true;
|
||||
try {
|
||||
while (this.drainPromise !== undefined) {
|
||||
await this.drainPromise;
|
||||
}
|
||||
await scheduleOfflineChanges(
|
||||
{ logger: this.logger, operations: this.operations, queue: this.queue },
|
||||
this.logger, this.operations, this.queue,
|
||||
(path) => { this.syncLocallyCreatedFile(path); },
|
||||
(args) => { this.syncLocallyUpdatedFile(args); },
|
||||
(path) => { this.syncLocallyDeletedFile(path); },
|
||||
|
|
@ -210,7 +205,6 @@ export class Syncer {
|
|||
}
|
||||
|
||||
this.ensureDraining();
|
||||
await this.drainPromise;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue