Fix cursors
This commit is contained in:
parent
44f642f649
commit
fc40d3eccf
12 changed files with 711 additions and 529 deletions
|
|
@ -34,6 +34,7 @@
|
|||
"url": "^0.11.4",
|
||||
"virtual-scroller": "^1.13.1",
|
||||
"webpack": "^5.99.9",
|
||||
"webpack-cli": "^6.0.1"
|
||||
"webpack-cli": "^6.0.1",
|
||||
"reconcile-text": "^0.5.0"
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import { updateEditorStatusDisplay } from "./views/editor-sync-line/editor-sync-
|
|||
import { remoteCursorsTheme } from "./views/cursors/remote-cursor-theme";
|
||||
import {
|
||||
remoteCursorsPlugin,
|
||||
setCursors
|
||||
RemoteCursorsPluginValue
|
||||
} from "./views/cursors/remote-cursors-plugin";
|
||||
import { LocalCursorUpdateListener } from "./views/cursors/local-cursor-update-listener";
|
||||
import { slowFetchFactory } from "./debugging/slow-fetch-factory";
|
||||
|
|
@ -93,7 +93,7 @@ export default class VaultLinkPlugin extends Plugin {
|
|||
this.registerEditorExtension([remoteCursorsTheme, remoteCursorsPlugin]);
|
||||
|
||||
this.client.addRemoteCursorsUpdateListener((cursors) => {
|
||||
setCursors(cursors, this.app);
|
||||
RemoteCursorsPluginValue.setCursors(cursors, this.app);
|
||||
});
|
||||
|
||||
const cursorListener = new LocalCursorUpdateListener(
|
||||
|
|
|
|||
|
|
@ -13,37 +13,131 @@ import type { CursorSpan, MaybeOutdatedClientCursors } from "sync-client";
|
|||
import type { App } from "obsidian";
|
||||
import { MarkdownView } from "obsidian";
|
||||
|
||||
let cursors: {
|
||||
name: string;
|
||||
path: string;
|
||||
span: CursorSpan;
|
||||
deviceId: string;
|
||||
}[] = [];
|
||||
|
||||
import { StateEffect } from "@codemirror/state";
|
||||
import { getRandomColor } from "src/utils/get-random-color";
|
||||
import { updateSelection } from "./update-selection";
|
||||
import { reconcileWithHistory, SpanWithHistory } from "reconcile-text";
|
||||
|
||||
function findWhereToMoveCursor(
|
||||
cursor: number,
|
||||
spans: SpanWithHistory[]
|
||||
): number | null {
|
||||
let position = 0;
|
||||
for (const span of spans) {
|
||||
// left and origin are the same
|
||||
if (position === cursor && span.history === "AddedFromRight") {
|
||||
return position + span.text.length;
|
||||
}
|
||||
position += span.text.length;
|
||||
if (position === cursor && span.history === "RemovedFromRight") {
|
||||
return position - span.text.length;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
const forceUpdate = StateEffect.define();
|
||||
|
||||
export class RemoteCursorsPluginValue implements PluginValue {
|
||||
public decorations: DecorationSet = RangeSet.of([]);
|
||||
|
||||
public update(update: ViewUpdate): void {
|
||||
update.changes.iterChanges((fromA, toA, fromB, toB, _inserted) => {
|
||||
const spans = cursors.map((cursor) => cursor.span);
|
||||
updateSelection({
|
||||
fromA,
|
||||
toA,
|
||||
fromB,
|
||||
toB,
|
||||
spans
|
||||
private static cursors: {
|
||||
name: string;
|
||||
path: string;
|
||||
span: CursorSpan;
|
||||
deviceId: string;
|
||||
isOutdated: boolean;
|
||||
}[] = [];
|
||||
|
||||
public static setCursors(
|
||||
clients: MaybeOutdatedClientCursors[],
|
||||
app: App
|
||||
): void {
|
||||
RemoteCursorsPluginValue.cursors = [
|
||||
...RemoteCursorsPluginValue.cursors.filter(({ deviceId }) =>
|
||||
clients.some(
|
||||
(client) =>
|
||||
client.deviceId === deviceId && client.isOutdated
|
||||
)
|
||||
),
|
||||
...clients
|
||||
.filter(
|
||||
({ isOutdated, deviceId }) =>
|
||||
!isOutdated ||
|
||||
RemoteCursorsPluginValue.cursors.every(
|
||||
(c) => deviceId !== c.deviceId
|
||||
)
|
||||
)
|
||||
.flatMap((client) => {
|
||||
const clientCursors = client.documentsWithCursors;
|
||||
return clientCursors.flatMap((cursor) =>
|
||||
cursor.cursors.map((span) => ({
|
||||
name: client.userName,
|
||||
path: cursor.relative_path,
|
||||
deviceId: client.deviceId,
|
||||
isOutdated: client.isOutdated,
|
||||
span: { ...span }
|
||||
}))
|
||||
);
|
||||
})
|
||||
];
|
||||
|
||||
app.workspace
|
||||
.getLeavesOfType("markdown")
|
||||
.map((leaf) => leaf.view)
|
||||
.filter((view) => view instanceof MarkdownView)
|
||||
.forEach((view) => {
|
||||
// @ts-expect-error, not typed
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const editor = view.editor.cm as EditorView;
|
||||
|
||||
editor.dispatch({
|
||||
effects: [forceUpdate.of(null)]
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public update(update: ViewUpdate): void {
|
||||
const original = update.startState.doc.toString();
|
||||
const edited = update.state.doc.toString();
|
||||
|
||||
let updatedPositions: number[] = [];
|
||||
const reconciled = reconcileWithHistory(
|
||||
original,
|
||||
{
|
||||
text: original,
|
||||
cursors: RemoteCursorsPluginValue.cursors.flatMap(
|
||||
({ span }, i) => [
|
||||
{ id: i * 2, position: span.start },
|
||||
{ id: i * 2 + 1, position: span.end }
|
||||
]
|
||||
)
|
||||
},
|
||||
edited,
|
||||
"Character"
|
||||
);
|
||||
|
||||
reconciled.cursors.forEach(({ id, position }) => {
|
||||
const whereToJump = findWhereToMoveCursor(
|
||||
position,
|
||||
reconciled.history
|
||||
);
|
||||
if (whereToJump !== null) {
|
||||
updatedPositions[id] = whereToJump;
|
||||
} else {
|
||||
updatedPositions[id] = position;
|
||||
}
|
||||
});
|
||||
|
||||
RemoteCursorsPluginValue.cursors.forEach(({ span }, i) => {
|
||||
span.start = updatedPositions[i * 2];
|
||||
span.end = updatedPositions[i * 2 + 1];
|
||||
});
|
||||
|
||||
const decorations: Range<Decoration>[] = [];
|
||||
|
||||
cursors.forEach(({ name, span: { start, end } }) => {
|
||||
RemoteCursorsPluginValue.cursors.forEach(
|
||||
({ name, span: { start, end } }) => {
|
||||
const color = getRandomColor(name);
|
||||
const startLine = update.view.state.doc.lineAt(start);
|
||||
const endLine = update.view.state.doc.lineAt(end);
|
||||
|
|
@ -73,7 +167,11 @@ export class RemoteCursorsPluginValue implements PluginValue {
|
|||
});
|
||||
|
||||
// render text-selection in the lines between the first and last line
|
||||
for (let i = startLine.number + 1; i < endLine.number; i++) {
|
||||
for (
|
||||
let i = startLine.number + 1;
|
||||
i < endLine.number;
|
||||
i++
|
||||
) {
|
||||
const currentLine = update.view.state.doc.line(i);
|
||||
decorations.push({
|
||||
from: currentLine.from,
|
||||
|
|
@ -103,7 +201,8 @@ export class RemoteCursorsPluginValue implements PluginValue {
|
|||
widget: new RemoteCursorWidget(color, name)
|
||||
})
|
||||
});
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
this.decorations = Decoration.set(decorations, true);
|
||||
}
|
||||
|
|
@ -115,43 +214,3 @@ export const remoteCursorsPlugin = ViewPlugin.fromClass(
|
|||
decorations: (v) => v.decorations
|
||||
}
|
||||
);
|
||||
|
||||
export function setCursors(
|
||||
clients: MaybeOutdatedClientCursors[],
|
||||
app: App
|
||||
): void {
|
||||
cursors = [
|
||||
...cursors.filter(({ deviceId }) =>
|
||||
clients.some(
|
||||
(client) => client.deviceId === deviceId && client.isOutdated
|
||||
)
|
||||
),
|
||||
...clients
|
||||
.filter(({ isOutdated }) => !isOutdated)
|
||||
.flatMap((client) => {
|
||||
const clientCursors = client.documentsWithCursors;
|
||||
return clientCursors.flatMap((cursor) =>
|
||||
cursor.cursors.map((span) => ({
|
||||
name: client.userName,
|
||||
path: cursor.relative_path,
|
||||
deviceId: client.deviceId,
|
||||
span
|
||||
}))
|
||||
);
|
||||
})
|
||||
];
|
||||
|
||||
app.workspace
|
||||
.getLeavesOfType("markdown")
|
||||
.map((leaf) => leaf.view)
|
||||
.filter((view) => view instanceof MarkdownView)
|
||||
.forEach((view) => {
|
||||
// @ts-expect-error, not typed
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const editor = view.editor.cm as EditorView;
|
||||
|
||||
editor.dispatch({
|
||||
effects: [forceUpdate.of(null)]
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,111 +0,0 @@
|
|||
import { updateSelection } from "./update-selection";
|
||||
|
||||
describe("Selection update", () => {
|
||||
it("should handle span fully before - insert", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 0,
|
||||
toA: 0,
|
||||
fromB: 0,
|
||||
toB: 2,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 5, end: 7 }]);
|
||||
});
|
||||
|
||||
it("should handle span fully before - delete", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 0,
|
||||
toA: 2,
|
||||
fromB: 0,
|
||||
toB: 0,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 1, end: 3 }]);
|
||||
});
|
||||
|
||||
it("should handle span fully after - insert", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 6,
|
||||
toA: 6,
|
||||
fromB: 6,
|
||||
toB: 10,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 3, end: 5 }]);
|
||||
});
|
||||
|
||||
it("should handle span fully after - delete", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 6,
|
||||
toA: 10,
|
||||
fromB: 6,
|
||||
toB: 6,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 3, end: 5 }]);
|
||||
});
|
||||
|
||||
it("should handle span fully within - insert", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 4,
|
||||
toA: 4,
|
||||
fromB: 4,
|
||||
toB: 6,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 3, end: 7 }]);
|
||||
});
|
||||
|
||||
it("should handle span fully within - delete", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 4,
|
||||
toA: 5,
|
||||
fromB: 4,
|
||||
toB: 4,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 3, end: 4 }]);
|
||||
});
|
||||
|
||||
it("should handle span overlapping with start", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 2,
|
||||
toA: 4,
|
||||
fromB: 2,
|
||||
toB: 2,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 2, end: 4 }]);
|
||||
});
|
||||
|
||||
it("should handle span overlapping with end", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 4,
|
||||
toA: 6,
|
||||
fromB: 4,
|
||||
toB: 4,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 3, end: 4 }]);
|
||||
});
|
||||
|
||||
it("delete entire selection", () => {
|
||||
const spans = [{ start: 3, end: 5 }];
|
||||
updateSelection({
|
||||
fromA: 0,
|
||||
toA: 10,
|
||||
fromB: 0,
|
||||
toB: 0,
|
||||
spans
|
||||
});
|
||||
expect(spans).toEqual([{ start: 0, end: 0 }]);
|
||||
});
|
||||
});
|
||||
|
|
@ -1,40 +0,0 @@
|
|||
import type { CursorSpan } from "sync-client";
|
||||
|
||||
export const updateSelection = ({
|
||||
fromA,
|
||||
toA,
|
||||
toB,
|
||||
spans
|
||||
}: {
|
||||
fromA: number;
|
||||
toA: number;
|
||||
fromB: number;
|
||||
toB: number;
|
||||
spans: CursorSpan[];
|
||||
}): void => {
|
||||
spans.forEach((span) => {
|
||||
if (fromA <= span.start) {
|
||||
// the change covers the entirety of the selection
|
||||
if (toA > span.end) {
|
||||
span.start = toB;
|
||||
span.end = toB;
|
||||
return;
|
||||
}
|
||||
|
||||
let change = toB - toA;
|
||||
if (change < 0) {
|
||||
// it's a deletion
|
||||
// if overlaps with the start, we can't move it back more than the deleted range
|
||||
change = Math.max(change, fromA - span.start);
|
||||
}
|
||||
|
||||
span.start += change;
|
||||
span.end += change;
|
||||
} else if (toA <= span.end) {
|
||||
span.end += toB - toA;
|
||||
} else if (toB <= span.end) {
|
||||
// a deletion overlaps with the end, so we move the end
|
||||
span.end = toB;
|
||||
}
|
||||
});
|
||||
};
|
||||
1
frontend/package-lock.json
generated
1
frontend/package-lock.json
generated
|
|
@ -6373,6 +6373,7 @@
|
|||
"jest": "^29.7.0",
|
||||
"mini-css-extract-plugin": "^2.9.2",
|
||||
"obsidian": "1.8.7",
|
||||
"reconcile-text": "^0.5.0",
|
||||
"resolve-url-loader": "^5.0.0",
|
||||
"sass": "^1.89.1",
|
||||
"sass-loader": "^16.0.5",
|
||||
|
|
|
|||
|
|
@ -14,26 +14,16 @@ import { ConnectionStatus } from "./services/connection-status";
|
|||
import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer";
|
||||
import { rateLimit } from "./utils/rate-limit";
|
||||
import type { NetworkConnectionStatus } from "./types/network-connection-status";
|
||||
import { DocumentUpdateStatus } from "./types/document-update-status";
|
||||
import { DocumentSyncStatus } from "./types/document-sync-status";
|
||||
import { WebSocketManager } from "./services/websocket-manager";
|
||||
import { createClientId } from "./utils/create-client-id";
|
||||
import { CursorTracker } from "./sync-operations/cursor-tracker";
|
||||
import type { CursorSpan } from "./services/types/CursorSpan";
|
||||
import type { ClientCursors } from "./services/types/ClientCursors";
|
||||
import type { DocumentWithCursors } from "./services/types/DocumentWithCursors";
|
||||
import { hash } from "./utils/hash";
|
||||
import type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
||||
|
||||
enum DocumentUpToDateness {
|
||||
UpToDate = "UpToDate",
|
||||
Prior = "Prior",
|
||||
Later = "Later"
|
||||
}
|
||||
import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
||||
import { FileChangeNotifier } from "./sync-operations/file-change-notifier";
|
||||
|
||||
export class SyncClient {
|
||||
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
|
||||
private lastCursorState: DocumentWithCursors[] = [];
|
||||
|
||||
private readonly knownClientCursors: ClientCursors[] = [];
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/max-params
|
||||
private constructor(
|
||||
|
|
@ -45,7 +35,8 @@ export class SyncClient {
|
|||
private readonly webSocketManager: WebSocketManager,
|
||||
private readonly _logger: Logger,
|
||||
private readonly connectionStatus: ConnectionStatus,
|
||||
private readonly fileOperations: FileOperations
|
||||
private readonly cursorTracker: CursorTracker,
|
||||
private readonly fileChangeNotifier: FileChangeNotifier
|
||||
) {
|
||||
this.settings.addOnSettingsChangeListener(
|
||||
async (newSettings, oldSettings) => {
|
||||
|
|
@ -54,10 +45,6 @@ export class SyncClient {
|
|||
}
|
||||
}
|
||||
);
|
||||
|
||||
this.webSocketManager.addRemoteCursorsUpdateListener((cursors) => {
|
||||
this.knownClientCursors.push(...cursors);
|
||||
});
|
||||
}
|
||||
|
||||
public get logger(): Logger {
|
||||
|
|
@ -148,7 +135,6 @@ export class SyncClient {
|
|||
);
|
||||
|
||||
const syncer = new Syncer(
|
||||
deviceId,
|
||||
logger,
|
||||
database,
|
||||
settings,
|
||||
|
|
@ -166,6 +152,13 @@ export class SyncClient {
|
|||
webSocket
|
||||
);
|
||||
|
||||
const fileChangeNotifier = new FileChangeNotifier();
|
||||
const cursorTracker = new CursorTracker(
|
||||
database,
|
||||
webSocketManager,
|
||||
fileOperations,
|
||||
fileChangeNotifier
|
||||
);
|
||||
const client = new SyncClient(
|
||||
history,
|
||||
settings,
|
||||
|
|
@ -175,7 +168,8 @@ export class SyncClient {
|
|||
webSocketManager,
|
||||
logger,
|
||||
connectionStatus,
|
||||
fileOperations
|
||||
cursorTracker,
|
||||
fileChangeNotifier
|
||||
);
|
||||
|
||||
logger.info("SyncClient initialised");
|
||||
|
|
@ -264,12 +258,14 @@ export class SyncClient {
|
|||
public async syncLocallyCreatedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyCreatedFile(relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyDeletedFile(relativePath);
|
||||
}
|
||||
|
||||
|
|
@ -280,6 +276,7 @@ export class SyncClient {
|
|||
oldPath?: RelativePath;
|
||||
relativePath: RelativePath;
|
||||
}): Promise<void> {
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||
return this.syncer.syncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath
|
||||
|
|
@ -288,154 +285,26 @@ export class SyncClient {
|
|||
|
||||
public getDocumentSyncingStatus(
|
||||
relativePath: RelativePath
|
||||
): DocumentUpdateStatus {
|
||||
): DocumentSyncStatus {
|
||||
const document =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
if (document === undefined) {
|
||||
return DocumentUpdateStatus.SYNCING;
|
||||
return DocumentSyncStatus.SYNCING;
|
||||
}
|
||||
return document.updates.length > 0
|
||||
? DocumentUpdateStatus.SYNCING
|
||||
: DocumentUpdateStatus.UP_TO_DATE;
|
||||
? DocumentSyncStatus.SYNCING
|
||||
: DocumentSyncStatus.UP_TO_DATE;
|
||||
}
|
||||
|
||||
/// Update the local cursors for the given documents.
|
||||
/// Can be called frequently as it only emits an event
|
||||
// if the state has actually changed.
|
||||
public async updateLocalCursors(
|
||||
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||
): Promise<void> {
|
||||
const documentsWithCursors: DocumentWithCursors[] = [];
|
||||
|
||||
for (const [relativePath, cursors] of Object.entries(
|
||||
documentToCursors
|
||||
)) {
|
||||
const record =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
if (!record) {
|
||||
continue; // Let's wait for the file to be created before sending cursors
|
||||
}
|
||||
|
||||
const readContent = await this.fileOperations.read(relativePath);
|
||||
|
||||
if (record.metadata?.hash !== hash(readContent)) {
|
||||
continue; // Wouldn't make sense to sync the positions in a dirty file
|
||||
}
|
||||
|
||||
documentsWithCursors.push({
|
||||
relative_path: relativePath,
|
||||
document_id: record.documentId,
|
||||
vault_update_id: record.metadata.parentVersionId,
|
||||
cursors
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
JSON.stringify(this.lastCursorState) ===
|
||||
JSON.stringify(documentsWithCursors)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.lastCursorState = documentsWithCursors;
|
||||
|
||||
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
|
||||
await this.cursorTracker.sendLocalCursorsToServer(documentToCursors);
|
||||
}
|
||||
|
||||
public addRemoteCursorsUpdateListener(
|
||||
listener: (cursors: DocumentWithMaybeOutdatedClientCursors[]) => unknown
|
||||
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
|
||||
): void {
|
||||
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
|
||||
listener(await this.getRelevantClientCursors());
|
||||
});
|
||||
}
|
||||
|
||||
private async getRelevantClientCursors(): Promise<
|
||||
DocumentWithMaybeOutdatedClientCursors[]
|
||||
> {
|
||||
const result: DocumentWithMaybeOutdatedClientCursors[] = [];
|
||||
const included = new Set<string>();
|
||||
for (const clientCursors of [...this.knownClientCursors].reverse()) {
|
||||
if (included.has(clientCursors.deviceId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const upToDateness =
|
||||
await this.getDocumentsUpToDateness(clientCursors);
|
||||
if (upToDateness == DocumentUpToDateness.Later) {
|
||||
continue;
|
||||
}
|
||||
|
||||
result.push({
|
||||
...clientCursors,
|
||||
isOutdated: upToDateness == DocumentUpToDateness.Prior
|
||||
});
|
||||
|
||||
included.add(clientCursors.deviceId);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private async getDocumentsUpToDateness(
|
||||
clientCursor: ClientCursors
|
||||
): Promise<DocumentUpToDateness> {
|
||||
const results = [];
|
||||
for (const document of clientCursor.documentsWithCursors) {
|
||||
results.push(await this.getDocumentUpToDateness(document));
|
||||
}
|
||||
|
||||
if (
|
||||
results.every((result) => result === DocumentUpToDateness.UpToDate)
|
||||
) {
|
||||
return DocumentUpToDateness.UpToDate;
|
||||
}
|
||||
|
||||
if (
|
||||
results.every(
|
||||
(result) =>
|
||||
result === DocumentUpToDateness.UpToDate ||
|
||||
result === DocumentUpToDateness.Prior
|
||||
)
|
||||
) {
|
||||
return DocumentUpToDateness.Prior;
|
||||
}
|
||||
|
||||
return DocumentUpToDateness.Later;
|
||||
}
|
||||
|
||||
private async getDocumentUpToDateness(
|
||||
document: DocumentWithCursors
|
||||
): Promise<DocumentUpToDateness> {
|
||||
const record = this.database.getLatestDocumentByRelativePath(
|
||||
document.relative_path
|
||||
);
|
||||
|
||||
if (!record) {
|
||||
// the document of the cursor must be from the future
|
||||
return DocumentUpToDateness.Later;
|
||||
}
|
||||
|
||||
if (
|
||||
(record.metadata?.parentVersionId ?? 0) < document.vault_update_id
|
||||
) {
|
||||
return DocumentUpToDateness.Later;
|
||||
} else if (
|
||||
document.vault_update_id < (record.metadata?.parentVersionId ?? 0)
|
||||
) {
|
||||
// the document of the cursor must be from the past
|
||||
return DocumentUpToDateness.Prior;
|
||||
}
|
||||
|
||||
const currentContent = await this.fileOperations.read(
|
||||
document.relative_path
|
||||
);
|
||||
|
||||
return this.database.getLatestDocumentByRelativePath(
|
||||
document.relative_path
|
||||
)?.metadata?.hash === hash(currentContent)
|
||||
? DocumentUpToDateness.UpToDate
|
||||
: DocumentUpToDateness.Prior;
|
||||
this.cursorTracker.addRemoteCursorsUpdateListener(listener);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
250
frontend/sync-client/src/sync-operations/cursor-tracker.ts
Normal file
250
frontend/sync-client/src/sync-operations/cursor-tracker.ts
Normal file
|
|
@ -0,0 +1,250 @@
|
|||
import type { FileOperations } from "../file-operations/file-operations";
|
||||
import type { Database, RelativePath } from "../persistence/database";
|
||||
import type { ClientCursors } from "../services/types/ClientCursors";
|
||||
import type { CursorSpan } from "../services/types/CursorSpan";
|
||||
import type { DocumentWithCursors } from "../services/types/DocumentWithCursors";
|
||||
import type { WebSocketManager } from "../services/websocket-manager";
|
||||
import type { MaybeOutdatedClientCursors } from "../types/maybe-outdated-client-cursors";
|
||||
import { DocumentUpToDateness } from "../types/document-up-to-dateness";
|
||||
import { hash } from "../utils/hash";
|
||||
import type { FileChangeNotifier } from "./file-change-notifier";
|
||||
import { Lock } from "../utils/locks";
|
||||
|
||||
// Cursor positions are updated separately from documents. However, a given cursor position is only
|
||||
// valid within a certain version of the document it belongs to. This class tracks previous and the latest
|
||||
// known remote cursor positions, and for each document, tries to return the latest cursor positions that are
|
||||
// not from the future.
|
||||
export class CursorTracker {
|
||||
private readonly updateLock = new Lock();
|
||||
|
||||
private knownRemoteCursors: (ClientCursors & {
|
||||
upToDateness: DocumentUpToDateness;
|
||||
})[] = [];
|
||||
|
||||
private lastLocalCursorState: DocumentWithCursors[] = [];
|
||||
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
|
||||
[];
|
||||
|
||||
public constructor(
|
||||
private readonly database: Database,
|
||||
private readonly webSocketManager: WebSocketManager,
|
||||
private readonly fileOperations: FileOperations,
|
||||
private readonly fileChangeNotifier: FileChangeNotifier
|
||||
) {
|
||||
this.webSocketManager.addRemoteCursorsUpdateListener(
|
||||
async (clientCursors) => {
|
||||
await this.updateLock.withLock(async () => {
|
||||
// The latest message will contain all active clients, so we can delete the ones
|
||||
// from the local list which are no longer active.
|
||||
const allIds = new Set(
|
||||
clientCursors.map((c) => c.deviceId)
|
||||
);
|
||||
const updatedKnownRemoteCursors =
|
||||
this.knownRemoteCursors.filter((c) =>
|
||||
allIds.has(c.deviceId)
|
||||
);
|
||||
|
||||
for (const cursor of clientCursors.filter((client) =>
|
||||
client.documentsWithCursors.every(
|
||||
(doc) => doc.vault_update_id != null
|
||||
)
|
||||
)) {
|
||||
updatedKnownRemoteCursors.push({
|
||||
...cursor,
|
||||
upToDateness:
|
||||
await this.getDocumentsUpToDateness(cursor)
|
||||
});
|
||||
}
|
||||
|
||||
this.knownRemoteCursors = updatedKnownRemoteCursors;
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
this.fileChangeNotifier.addFileChangeListener(async (relativePath) => {
|
||||
this.updateLock.withLock(async () => {
|
||||
for (const clientCursor of this.knownRemoteCursors) {
|
||||
if (
|
||||
clientCursor.documentsWithCursors.some(
|
||||
(document) =>
|
||||
document.relative_path === relativePath
|
||||
)
|
||||
) {
|
||||
clientCursor.upToDateness =
|
||||
await this.getDocumentsUpToDateness(clientCursor);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Update the local cursors for the given documents.
|
||||
/// Can be called frequently as it only emits an event
|
||||
/// if the state has actually changed.
|
||||
public async sendLocalCursorsToServer(
|
||||
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||
): Promise<void> {
|
||||
const documentsWithCursors: DocumentWithCursors[] = [];
|
||||
|
||||
for (const [relativePath, cursors] of Object.entries(
|
||||
documentToCursors
|
||||
)) {
|
||||
const record =
|
||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||
|
||||
if (!record) {
|
||||
continue; // Let's wait for the file to be created before sending cursors
|
||||
}
|
||||
|
||||
if (!record.metadata) {
|
||||
continue; // this is a new document, no need to sync the cursors
|
||||
}
|
||||
|
||||
documentsWithCursors.push({
|
||||
relative_path: relativePath,
|
||||
document_id: record.documentId,
|
||||
vault_update_id: record.metadata.parentVersionId,
|
||||
cursors
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
JSON.stringify(this.lastLocalCursorState) ===
|
||||
JSON.stringify(documentsWithCursors)
|
||||
) {
|
||||
// Caching step to avoid reading the edited files all the time
|
||||
return;
|
||||
}
|
||||
this.lastLocalCursorState = documentsWithCursors;
|
||||
|
||||
for (const doc of documentsWithCursors) {
|
||||
const readContent = await this.fileOperations.read(
|
||||
doc.relative_path
|
||||
);
|
||||
const record = this.database.getLatestDocumentByRelativePath(
|
||||
doc.relative_path
|
||||
);
|
||||
if (record?.metadata?.hash !== hash(readContent)) {
|
||||
doc.vault_update_id = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (
|
||||
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
|
||||
JSON.stringify(documentsWithCursors)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
|
||||
|
||||
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
|
||||
}
|
||||
|
||||
// The returned position may be accurate, if it matches the document version, or outdated, in which case
|
||||
// the client has to heuristically guess it's current position based on the local edits.
|
||||
public addRemoteCursorsUpdateListener(
|
||||
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
|
||||
): void {
|
||||
// CursorTracker registers its own event listener in the constructor so it must get called first
|
||||
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
|
||||
await this.updateLock.withLock(() =>
|
||||
listener(this.getRelevantAndPruneKnownClientCursors())
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] {
|
||||
const result: MaybeOutdatedClientCursors[] = [];
|
||||
const included = new Set<string>();
|
||||
|
||||
const relevantCursors = [];
|
||||
for (const clientCursors of [...this.knownRemoteCursors].reverse()) {
|
||||
if (included.has(clientCursors.deviceId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (clientCursors.upToDateness == DocumentUpToDateness.Later) {
|
||||
continue;
|
||||
}
|
||||
|
||||
result.push({
|
||||
...clientCursors,
|
||||
isOutdated:
|
||||
clientCursors.upToDateness == DocumentUpToDateness.Prior
|
||||
});
|
||||
|
||||
included.add(clientCursors.deviceId);
|
||||
relevantCursors.unshift(clientCursors); // to reverse order back to normal
|
||||
}
|
||||
|
||||
this.knownRemoteCursors = relevantCursors;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// We store up-to-dateness on a per-client basis to simplify the implementation.
|
||||
// An individual client won't have too many documents open at once, so this is a reasonable trade-off.
|
||||
private async getDocumentsUpToDateness(
|
||||
clientCursor: ClientCursors
|
||||
): Promise<DocumentUpToDateness> {
|
||||
const results = [];
|
||||
for (const document of clientCursor.documentsWithCursors) {
|
||||
results.push(await this.getDocumentUpToDateness(document));
|
||||
}
|
||||
|
||||
if (
|
||||
results.every((result) => result === DocumentUpToDateness.UpToDate)
|
||||
) {
|
||||
return DocumentUpToDateness.UpToDate;
|
||||
}
|
||||
|
||||
if (
|
||||
results.every(
|
||||
(result) =>
|
||||
result === DocumentUpToDateness.UpToDate ||
|
||||
result === DocumentUpToDateness.Prior
|
||||
)
|
||||
) {
|
||||
return DocumentUpToDateness.Prior;
|
||||
}
|
||||
|
||||
return DocumentUpToDateness.Later;
|
||||
}
|
||||
|
||||
private async getDocumentUpToDateness(
|
||||
document: DocumentWithCursors
|
||||
): Promise<DocumentUpToDateness> {
|
||||
const record = this.database.getLatestDocumentByRelativePath(
|
||||
document.relative_path
|
||||
);
|
||||
|
||||
if (!record) {
|
||||
// the document of the cursor must be from the future
|
||||
return DocumentUpToDateness.Later;
|
||||
}
|
||||
|
||||
if (
|
||||
(record.metadata?.parentVersionId ?? 0) <
|
||||
(document.vault_update_id ?? 0)
|
||||
) {
|
||||
return DocumentUpToDateness.Later;
|
||||
} else if (
|
||||
(document.vault_update_id ?? 0) <
|
||||
(record.metadata?.parentVersionId ?? 0)
|
||||
) {
|
||||
// the document of the cursor must be from the past
|
||||
return DocumentUpToDateness.Prior;
|
||||
}
|
||||
|
||||
const currentContent = await this.fileOperations.read(
|
||||
document.relative_path
|
||||
);
|
||||
|
||||
return this.database.getLatestDocumentByRelativePath(
|
||||
document.relative_path
|
||||
)?.metadata?.hash === hash(currentContent)
|
||||
? DocumentUpToDateness.UpToDate
|
||||
: DocumentUpToDateness.Prior;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
import type { RelativePath } from "../persistence/database";
|
||||
|
||||
export class FileChangeNotifier {
|
||||
private readonly listeners: ((filePath: RelativePath) => unknown)[] = [];
|
||||
|
||||
public addFileChangeListener(
|
||||
listener: (filePath: RelativePath) => unknown
|
||||
): void {
|
||||
this.listeners.push(listener);
|
||||
}
|
||||
|
||||
public notifyOfFileChange(filePath: RelativePath): void {
|
||||
this.listeners.forEach((listener) => listener(filePath));
|
||||
}
|
||||
}
|
||||
|
|
@ -9,7 +9,7 @@ import type { Logger } from "../tracing/logger";
|
|||
import PQueue from "p-queue";
|
||||
import { hash } from "../utils/hash";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
import type { Settings, SyncSettings } from "../persistence/settings";
|
||||
import type { Settings } from "../persistence/settings";
|
||||
import type { FileOperations } from "../file-operations/file-operations";
|
||||
import { findMatchingFile } from "../utils/find-matching-file";
|
||||
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
|
||||
|
|
@ -27,12 +27,10 @@ export class Syncer {
|
|||
|
||||
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/max-params
|
||||
public constructor(
|
||||
private readonly deviceId: string,
|
||||
private readonly logger: Logger,
|
||||
private readonly database: Database,
|
||||
private readonly settings: Settings,
|
||||
settings: Settings,
|
||||
private readonly syncService: SyncService,
|
||||
private readonly operations: FileOperations,
|
||||
private readonly internalSyncer: UnrestrictedSyncer
|
||||
|
|
@ -261,20 +259,16 @@ export class Syncer {
|
|||
remoteVersion.documentId
|
||||
);
|
||||
|
||||
let hasLockToRelease = false;
|
||||
if (document === undefined) {
|
||||
// Let's avoid the same documents getting created in parallel multiple times.
|
||||
// There might be multiple tasks waiting for the lock
|
||||
await this.remoteDocumentsLock.waitForLock(
|
||||
remoteVersion.documentId
|
||||
);
|
||||
hasLockToRelease = true;
|
||||
return this.remoteDocumentsLock.withLock(
|
||||
remoteVersion.documentId,
|
||||
async () => {
|
||||
document = this.database.getDocumentByDocumentId(
|
||||
remoteVersion.documentId
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
|
||||
if (document === undefined) {
|
||||
await this.syncQueue.add(async () =>
|
||||
|
|
@ -308,11 +302,34 @@ export class Syncer {
|
|||
}
|
||||
|
||||
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
|
||||
const [promise, resolve, reject] = createPromise();
|
||||
|
||||
document = await this.database.getResolvedDocumentByRelativePath(
|
||||
document.relativePath,
|
||||
promise
|
||||
);
|
||||
|
||||
try {
|
||||
await this.syncQueue.add(async () =>
|
||||
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||
remoteVersion,
|
||||
document
|
||||
)
|
||||
);
|
||||
|
||||
resolve();
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
} finally {
|
||||
if (hasLockToRelease) {
|
||||
this.remoteDocumentsLock.unlock(remoteVersion.documentId);
|
||||
}
|
||||
this.database.removeDocumentPromise(promise);
|
||||
}
|
||||
|
||||
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
||||
}
|
||||
|
||||
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ type ResolveFunction<T> = undefined extends T
|
|||
* A type-safe utility function to create a Promise with resolve and reject functions.
|
||||
* @returns A tuple containing a Promise, a resolve function, and a reject function.
|
||||
*/
|
||||
export function createPromise<T = unknown | undefined>(): [
|
||||
export function createPromise<T = unknown>(): [
|
||||
Promise<T>,
|
||||
ResolveFunction<T>,
|
||||
(error: unknown) => unknown
|
||||
|
|
|
|||
|
|
@ -2,8 +2,9 @@ import { Logger } from "../tracing/logger";
|
|||
import type { RelativePath } from "../persistence/database";
|
||||
import { Locks } from "./locks";
|
||||
|
||||
describe("Document lock", () => {
|
||||
describe("withLock", () => {
|
||||
const testPath: RelativePath = "test/document/path";
|
||||
const testPath2: RelativePath = "test/document/path2";
|
||||
const logger = new Logger();
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/init-declarations
|
||||
|
|
@ -13,77 +14,197 @@ describe("Document lock", () => {
|
|||
locks = new Locks<RelativePath>(logger);
|
||||
});
|
||||
|
||||
test("should lock a document successfully", () => {
|
||||
const result = locks.tryLock(testPath);
|
||||
expect(result).toBe(true);
|
||||
test("should execute function with single key lock", async () => {
|
||||
let executionCount = 0;
|
||||
const result = await locks.withLock(testPath, () => {
|
||||
executionCount++;
|
||||
return "success";
|
||||
});
|
||||
|
||||
test("should not lock a document that is already locked", () => {
|
||||
locks.tryLock(testPath);
|
||||
const result = locks.tryLock(testPath);
|
||||
expect(result).toBe(false);
|
||||
expect(result).toBe("success");
|
||||
expect(executionCount).toBe(1);
|
||||
});
|
||||
|
||||
test("should unlock a locked document", () => {
|
||||
locks.tryLock(testPath);
|
||||
locks.unlock(testPath);
|
||||
const result = locks.tryLock(testPath);
|
||||
expect(result).toBe(true);
|
||||
locks.unlock(testPath);
|
||||
test("should execute async function with single key lock", async () => {
|
||||
let executionCount = 0;
|
||||
const result = await locks.withLock(testPath, async () => {
|
||||
executionCount++;
|
||||
await new Promise(resolve => setTimeout(resolve, 10));
|
||||
return "async-success";
|
||||
});
|
||||
|
||||
test("should throw an error when unlocking a document that is not locked", () => {
|
||||
expect(() => {
|
||||
locks.unlock(testPath);
|
||||
}).toThrow(`Key '${testPath}' is not locked, cannot unlock`);
|
||||
expect(result).toBe("async-success");
|
||||
expect(executionCount).toBe(1);
|
||||
});
|
||||
|
||||
test("should wait for a document lock and resolve when unlocked", async () => {
|
||||
locks.tryLock(testPath);
|
||||
|
||||
let resolved = false;
|
||||
const waitPromise = locks.waitForLock(testPath).then(() => {
|
||||
resolved = true;
|
||||
test("should execute function with multiple key locks", async () => {
|
||||
let executionCount = 0;
|
||||
const result = await locks.withLock([testPath, testPath2], () => {
|
||||
executionCount++;
|
||||
return "multi-success";
|
||||
});
|
||||
|
||||
locks.unlock(testPath);
|
||||
await waitPromise;
|
||||
|
||||
expect(resolved).toBe(true);
|
||||
expect(result).toBe("multi-success");
|
||||
expect(executionCount).toBe(1);
|
||||
});
|
||||
|
||||
test("should resolve multiple waiters in FIFO order", async () => {
|
||||
locks.tryLock(testPath);
|
||||
test("should sort multiple keys to prevent deadlocks", async () => {
|
||||
const executionOrder: string[] = [];
|
||||
|
||||
let firstResolved = false;
|
||||
let secondResolved = false;
|
||||
let thirdResolved = false;
|
||||
|
||||
const firstWaitPromise = locks.waitForLock(testPath).then(() => {
|
||||
firstResolved = true;
|
||||
// Start two concurrent operations with keys in different orders
|
||||
const promise1 = locks.withLock([testPath2, testPath], async () => {
|
||||
executionOrder.push("operation1-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 50));
|
||||
executionOrder.push("operation1-end");
|
||||
return "result1";
|
||||
});
|
||||
|
||||
const secondWaitPromise = locks.waitForLock(testPath).then(() => {
|
||||
secondResolved = true;
|
||||
const promise2 = locks.withLock([testPath, testPath2], async () => {
|
||||
executionOrder.push("operation2-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 50));
|
||||
executionOrder.push("operation2-end");
|
||||
return "result2";
|
||||
});
|
||||
|
||||
const thirdWaitPromise = locks.waitForLock(testPath).then(() => {
|
||||
thirdResolved = true;
|
||||
const [result1, result2] = await Promise.all([promise1, promise2]);
|
||||
|
||||
expect(result1).toBe("result1");
|
||||
expect(result2).toBe("result2");
|
||||
// One operation should complete entirely before the other starts
|
||||
expect(executionOrder).toEqual([
|
||||
"operation1-start",
|
||||
"operation1-end",
|
||||
"operation2-start",
|
||||
"operation2-end"
|
||||
]);
|
||||
});
|
||||
|
||||
locks.unlock(testPath);
|
||||
await firstWaitPromise;
|
||||
expect(firstResolved).toBe(true);
|
||||
expect(secondResolved).toBe(false);
|
||||
expect(thirdResolved).toBe(false);
|
||||
test("should serialize access to same key", async () => {
|
||||
const executionOrder: string[] = [];
|
||||
|
||||
locks.unlock(testPath);
|
||||
await secondWaitPromise;
|
||||
expect(secondResolved).toBe(true);
|
||||
expect(thirdResolved).toBe(false);
|
||||
const promise1 = locks.withLock(testPath, async () => {
|
||||
executionOrder.push("operation1-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 50));
|
||||
executionOrder.push("operation1-end");
|
||||
return "result1";
|
||||
});
|
||||
|
||||
locks.unlock(testPath);
|
||||
await thirdWaitPromise;
|
||||
expect(thirdResolved).toBe(true);
|
||||
const promise2 = locks.withLock(testPath, async () => {
|
||||
executionOrder.push("operation2-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 30));
|
||||
executionOrder.push("operation2-end");
|
||||
return "result2";
|
||||
});
|
||||
|
||||
const [result1, result2] = await Promise.all([promise1, promise2]);
|
||||
|
||||
expect(result1).toBe("result1");
|
||||
expect(result2).toBe("result2");
|
||||
expect(executionOrder).toEqual([
|
||||
"operation1-start",
|
||||
"operation1-end",
|
||||
"operation2-start",
|
||||
"operation2-end"
|
||||
]);
|
||||
});
|
||||
|
||||
test("should allow concurrent access to different keys", async () => {
|
||||
const executionOrder: string[] = [];
|
||||
|
||||
const promise1 = locks.withLock(testPath, async () => {
|
||||
executionOrder.push("operation1-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 50));
|
||||
executionOrder.push("operation1-end");
|
||||
return "result1";
|
||||
});
|
||||
|
||||
const promise2 = locks.withLock(testPath2, async () => {
|
||||
executionOrder.push("operation2-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 30));
|
||||
executionOrder.push("operation2-end");
|
||||
return "result2";
|
||||
});
|
||||
|
||||
const [result1, result2] = await Promise.all([promise1, promise2]);
|
||||
|
||||
expect(result1).toBe("result1");
|
||||
expect(result2).toBe("result2");
|
||||
// Both operations should run concurrently
|
||||
expect(executionOrder[0]).toBe("operation1-start");
|
||||
expect(executionOrder[1]).toBe("operation2-start");
|
||||
});
|
||||
|
||||
test("should release locks even if function throws", async () => {
|
||||
const error = new Error("test error");
|
||||
|
||||
await expect(locks.withLock(testPath, () => {
|
||||
throw error;
|
||||
})).rejects.toThrow("test error");
|
||||
|
||||
// Lock should be released, allowing another operation
|
||||
const result = await locks.withLock(testPath, () => "success-after-error");
|
||||
expect(result).toBe("success-after-error");
|
||||
});
|
||||
|
||||
test("should release locks even if async function throws", async () => {
|
||||
const error = new Error("async test error");
|
||||
|
||||
await expect(locks.withLock(testPath, async () => {
|
||||
await new Promise(resolve => setTimeout(resolve, 10));
|
||||
throw error;
|
||||
})).rejects.toThrow("async test error");
|
||||
|
||||
// Lock should be released, allowing another operation
|
||||
const result = await locks.withLock(testPath, () => "success-after-async-error");
|
||||
expect(result).toBe("success-after-async-error");
|
||||
});
|
||||
|
||||
test("should handle empty array of keys", async () => {
|
||||
const result = await locks.withLock([], () => "empty-keys");
|
||||
expect(result).toBe("empty-keys");
|
||||
});
|
||||
|
||||
test("should maintain FIFO order for multiple waiters", async () => {
|
||||
const executionOrder: string[] = [];
|
||||
|
||||
// Start first operation that holds the lock
|
||||
const firstPromise = locks.withLock(testPath, async () => {
|
||||
executionOrder.push("first-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
executionOrder.push("first-end");
|
||||
return "first";
|
||||
});
|
||||
|
||||
// Small delay to ensure first operation starts
|
||||
await new Promise(resolve => setTimeout(resolve, 10));
|
||||
|
||||
// Queue second and third operations
|
||||
const secondPromise = locks.withLock(testPath, async () => {
|
||||
executionOrder.push("second-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 30));
|
||||
executionOrder.push("second-end");
|
||||
return "second";
|
||||
});
|
||||
|
||||
const thirdPromise = locks.withLock(testPath, async () => {
|
||||
executionOrder.push("third-start");
|
||||
await new Promise(resolve => setTimeout(resolve, 20));
|
||||
executionOrder.push("third-end");
|
||||
return "third";
|
||||
});
|
||||
|
||||
const [first, second, third] = await Promise.all([firstPromise, secondPromise, thirdPromise]);
|
||||
|
||||
expect(first).toBe("first");
|
||||
expect(second).toBe("second");
|
||||
expect(third).toBe("third");
|
||||
expect(executionOrder).toEqual([
|
||||
"first-start",
|
||||
"first-end",
|
||||
"second-start",
|
||||
"second-end",
|
||||
"third-start",
|
||||
"third-end"
|
||||
]);
|
||||
});
|
||||
});
|
||||
Loading…
Add table
Add a link
Reference in a new issue