Compare commits
16 commits
main
...
asch/fix-m
| Author | SHA1 | Date | |
|---|---|---|---|
| c462994af9 | |||
| 02ec098283 | |||
| 491a601ad2 | |||
| fc40d3eccf | |||
| 44f642f649 | |||
| e81d521c9c | |||
| 022c57e88a | |||
| 115c1067f9 | |||
| 49f2d69e59 | |||
| 26cd95cfda | |||
| 063ee2a2c0 | |||
| 7ea8a77403 | |||
| a813ed9f73 | |||
| fc4ddb8f1f | |||
| 8442520ce3 | |||
| efcdb4b9de |
36 changed files with 926 additions and 686 deletions
31
.github/workflows/check.yml
vendored
31
.github/workflows/check.yml
vendored
|
|
@ -30,32 +30,5 @@ jobs:
|
||||||
sqlx database create --database-url sqlite://db.sqlite3
|
sqlx database create --database-url sqlite://db.sqlite3
|
||||||
sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3
|
sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3
|
||||||
|
|
||||||
- name: Lint sync-server
|
- name: Lint & test
|
||||||
run: |
|
run: scripts/check.sh
|
||||||
cd sync-server
|
|
||||||
cargo clippy --all-targets --all-features
|
|
||||||
cargo fmt --all -- --check
|
|
||||||
cargo machete
|
|
||||||
|
|
||||||
- name: Test sync-server
|
|
||||||
run: |
|
|
||||||
cd sync-server
|
|
||||||
cargo test --verbose
|
|
||||||
|
|
||||||
- name: Lint frontend
|
|
||||||
run: |
|
|
||||||
cd frontend
|
|
||||||
npm ci
|
|
||||||
npm run build
|
|
||||||
npm run lint
|
|
||||||
if [[ $(git status --porcelain) ]]; then
|
|
||||||
git status --porcelain
|
|
||||||
echo "Failing CI because the working directory is not clean after linting"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
- name: Test frontend
|
|
||||||
run: |
|
|
||||||
cd frontend
|
|
||||||
npm ci
|
|
||||||
npm run test
|
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@
|
||||||
"url": "^0.11.4",
|
"url": "^0.11.4",
|
||||||
"virtual-scroller": "^1.13.1",
|
"virtual-scroller": "^1.13.1",
|
||||||
"webpack": "^5.99.9",
|
"webpack": "^5.99.9",
|
||||||
"webpack-cli": "^6.0.1"
|
"webpack-cli": "^6.0.1",
|
||||||
|
"reconcile-text": "^0.5.0"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,15 +24,18 @@ export function flakyWebSocketFactory(
|
||||||
|
|
||||||
public set onmessage(callback: (event: MessageEvent) => void) {
|
public set onmessage(callback: (event: MessageEvent) => void) {
|
||||||
super.onmessage = async (event: MessageEvent): Promise<void> => {
|
super.onmessage = async (event: MessageEvent): Promise<void> => {
|
||||||
await this.locks.waitForLock(FlakyWebSocket.RECEIVE_KEY);
|
await this.locks.withLock(
|
||||||
|
FlakyWebSocket.RECEIVE_KEY,
|
||||||
|
async () => {
|
||||||
|
if (jitterScaleInSeconds > 0) {
|
||||||
|
await sleep(
|
||||||
|
Math.random() * jitterScaleInSeconds * 1000
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if (jitterScaleInSeconds > 0) {
|
callback(event);
|
||||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
}
|
||||||
}
|
);
|
||||||
|
|
||||||
callback(event);
|
|
||||||
|
|
||||||
this.locks.unlock(FlakyWebSocket.RECEIVE_KEY);
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -66,15 +69,12 @@ export function flakyWebSocketFactory(
|
||||||
data: string | ArrayBufferLike | Blob | ArrayBufferView
|
data: string | ArrayBufferLike | Blob | ArrayBufferView
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
// maintain message order
|
// maintain message order
|
||||||
await this.locks.waitForLock(FlakyWebSocket.SEND_KEY);
|
await this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => {
|
||||||
|
if (jitterScaleInSeconds > 0) {
|
||||||
if (jitterScaleInSeconds > 0) {
|
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
||||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
}
|
||||||
}
|
super.send(data);
|
||||||
|
});
|
||||||
super.send(data);
|
|
||||||
|
|
||||||
this.locks.unlock(FlakyWebSocket.SEND_KEY);
|
|
||||||
}
|
}
|
||||||
} as unknown as typeof WebSocket;
|
} as unknown as typeof WebSocket;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ import { updateEditorStatusDisplay } from "./views/editor-sync-line/editor-sync-
|
||||||
import { remoteCursorsTheme } from "./views/cursors/remote-cursor-theme";
|
import { remoteCursorsTheme } from "./views/cursors/remote-cursor-theme";
|
||||||
import {
|
import {
|
||||||
remoteCursorsPlugin,
|
remoteCursorsPlugin,
|
||||||
setCursors
|
RemoteCursorsPluginValue
|
||||||
} from "./views/cursors/remote-cursors-plugin";
|
} from "./views/cursors/remote-cursors-plugin";
|
||||||
import { LocalCursorUpdateListener } from "./views/cursors/local-cursor-update-listener";
|
import { LocalCursorUpdateListener } from "./views/cursors/local-cursor-update-listener";
|
||||||
import { slowFetchFactory } from "./debugging/slow-fetch-factory";
|
import { slowFetchFactory } from "./debugging/slow-fetch-factory";
|
||||||
|
|
@ -93,7 +93,7 @@ export default class VaultLinkPlugin extends Plugin {
|
||||||
this.registerEditorExtension([remoteCursorsTheme, remoteCursorsPlugin]);
|
this.registerEditorExtension([remoteCursorsTheme, remoteCursorsPlugin]);
|
||||||
|
|
||||||
this.client.addRemoteCursorsUpdateListener((cursors) => {
|
this.client.addRemoteCursorsUpdateListener((cursors) => {
|
||||||
setCursors(cursors, this.app);
|
RemoteCursorsPluginValue.setCursors(cursors, this.app);
|
||||||
});
|
});
|
||||||
|
|
||||||
const cursorListener = new LocalCursorUpdateListener(
|
const cursorListener = new LocalCursorUpdateListener(
|
||||||
|
|
|
||||||
|
|
@ -9,104 +9,201 @@ import type {
|
||||||
ViewUpdate
|
ViewUpdate
|
||||||
} from "@codemirror/view";
|
} from "@codemirror/view";
|
||||||
import { RemoteCursorWidget } from "./remote-cursor-widget";
|
import { RemoteCursorWidget } from "./remote-cursor-widget";
|
||||||
import type {
|
import type { CursorSpan, MaybeOutdatedClientCursors } from "sync-client";
|
||||||
CursorSpan,
|
|
||||||
DocumentWithMaybeOutdatedClientCursors
|
|
||||||
} from "sync-client";
|
|
||||||
import type { App } from "obsidian";
|
import type { App } from "obsidian";
|
||||||
import { MarkdownView } from "obsidian";
|
import { MarkdownView } from "obsidian";
|
||||||
|
|
||||||
let cursors: {
|
|
||||||
name: string;
|
|
||||||
path: string;
|
|
||||||
span: CursorSpan;
|
|
||||||
deviceId: string;
|
|
||||||
}[] = [];
|
|
||||||
|
|
||||||
import { StateEffect } from "@codemirror/state";
|
import { StateEffect } from "@codemirror/state";
|
||||||
import { getRandomColor } from "src/utils/get-random-color";
|
import { getRandomColor } from "src/utils/get-random-color";
|
||||||
import { updateSelection } from "./update-selection";
|
import type { SpanWithHistory } from "reconcile-text";
|
||||||
|
import { reconcileWithHistory } from "reconcile-text";
|
||||||
|
|
||||||
|
function findWhereToMoveCursor(
|
||||||
|
cursor: number,
|
||||||
|
spans: SpanWithHistory[]
|
||||||
|
): number | null {
|
||||||
|
let position = 0;
|
||||||
|
for (const span of spans) {
|
||||||
|
// left and origin are the same
|
||||||
|
if (position === cursor && span.history === "AddedFromRight") {
|
||||||
|
return position + span.text.length;
|
||||||
|
}
|
||||||
|
position += span.text.length;
|
||||||
|
if (position === cursor && span.history === "RemovedFromRight") {
|
||||||
|
return position - span.text.length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
const forceUpdate = StateEffect.define();
|
const forceUpdate = StateEffect.define();
|
||||||
|
|
||||||
export class RemoteCursorsPluginValue implements PluginValue {
|
export class RemoteCursorsPluginValue implements PluginValue {
|
||||||
|
private static cursors: {
|
||||||
|
name: string;
|
||||||
|
path: string;
|
||||||
|
span: CursorSpan;
|
||||||
|
deviceId: string;
|
||||||
|
isOutdated: boolean;
|
||||||
|
}[] = [];
|
||||||
|
|
||||||
public decorations: DecorationSet = RangeSet.of([]);
|
public decorations: DecorationSet = RangeSet.of([]);
|
||||||
|
|
||||||
public update(update: ViewUpdate): void {
|
public static setCursors(
|
||||||
update.changes.iterChanges((fromA, toA, fromB, toB, _inserted) => {
|
clients: MaybeOutdatedClientCursors[],
|
||||||
const spans = cursors.map((cursor) => cursor.span);
|
app: App
|
||||||
updateSelection({
|
): void {
|
||||||
fromA,
|
RemoteCursorsPluginValue.cursors = [
|
||||||
toA,
|
...RemoteCursorsPluginValue.cursors.filter(({ deviceId }) =>
|
||||||
fromB,
|
clients.some(
|
||||||
toB,
|
(client) =>
|
||||||
spans
|
client.deviceId === deviceId && client.isOutdated
|
||||||
|
)
|
||||||
|
),
|
||||||
|
...clients
|
||||||
|
.filter(
|
||||||
|
({ isOutdated, deviceId }) =>
|
||||||
|
!isOutdated ||
|
||||||
|
RemoteCursorsPluginValue.cursors.every(
|
||||||
|
(c) => deviceId !== c.deviceId
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.flatMap((client) => {
|
||||||
|
const clientCursors = client.documentsWithCursors;
|
||||||
|
return clientCursors.flatMap((cursor) =>
|
||||||
|
cursor.cursors.map((span) => ({
|
||||||
|
name: client.userName,
|
||||||
|
path: cursor.relative_path,
|
||||||
|
deviceId: client.deviceId,
|
||||||
|
isOutdated: client.isOutdated,
|
||||||
|
span: { ...span }
|
||||||
|
}))
|
||||||
|
);
|
||||||
|
})
|
||||||
|
];
|
||||||
|
|
||||||
|
app.workspace
|
||||||
|
.getLeavesOfType("markdown")
|
||||||
|
.map((leaf) => leaf.view)
|
||||||
|
.filter((view) => view instanceof MarkdownView)
|
||||||
|
.forEach((view) => {
|
||||||
|
// @ts-expect-error, not typed
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||||
|
const editor = view.editor.cm as EditorView;
|
||||||
|
|
||||||
|
editor.dispatch({
|
||||||
|
effects: [forceUpdate.of(null)]
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public update(update: ViewUpdate): void {
|
||||||
|
const original = update.startState.doc.toString();
|
||||||
|
const edited = update.state.doc.toString();
|
||||||
|
|
||||||
|
const updatedPositions: number[] = [];
|
||||||
|
const reconciled = reconcileWithHistory(
|
||||||
|
original,
|
||||||
|
{
|
||||||
|
text: original,
|
||||||
|
cursors: RemoteCursorsPluginValue.cursors.flatMap(
|
||||||
|
({ span }, i) => [
|
||||||
|
{ id: i * 2, position: span.start },
|
||||||
|
{ id: i * 2 + 1, position: span.end }
|
||||||
|
]
|
||||||
|
)
|
||||||
|
},
|
||||||
|
edited,
|
||||||
|
"Character"
|
||||||
|
);
|
||||||
|
|
||||||
|
reconciled.cursors.forEach(({ id, position }) => {
|
||||||
|
const whereToJump = findWhereToMoveCursor(
|
||||||
|
position,
|
||||||
|
reconciled.history
|
||||||
|
);
|
||||||
|
if (whereToJump !== null) {
|
||||||
|
updatedPositions[id] = whereToJump;
|
||||||
|
} else {
|
||||||
|
updatedPositions[id] = position;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
RemoteCursorsPluginValue.cursors.forEach(({ span }, i) => {
|
||||||
|
span.start = updatedPositions[i * 2];
|
||||||
|
span.end = updatedPositions[i * 2 + 1];
|
||||||
});
|
});
|
||||||
|
|
||||||
const decorations: Range<Decoration>[] = [];
|
const decorations: Range<Decoration>[] = [];
|
||||||
|
|
||||||
cursors.forEach(({ name, span: { start, end } }) => {
|
RemoteCursorsPluginValue.cursors.forEach(
|
||||||
const color = getRandomColor(name);
|
({ name, span: { start, end } }) => {
|
||||||
const startLine = update.view.state.doc.lineAt(start);
|
const color = getRandomColor(name);
|
||||||
const endLine = update.view.state.doc.lineAt(end);
|
const startLine = update.view.state.doc.lineAt(start);
|
||||||
|
const endLine = update.view.state.doc.lineAt(end);
|
||||||
|
|
||||||
const attributes = {
|
const attributes = {
|
||||||
style: `background-color: ${color};`
|
style: `background-color: ${color};`
|
||||||
};
|
};
|
||||||
|
|
||||||
if (startLine.number === endLine.number) {
|
if (startLine.number === endLine.number) {
|
||||||
// selected content in a single line.
|
// selected content in a single line.
|
||||||
decorations.push({
|
|
||||||
from: start,
|
|
||||||
to: end,
|
|
||||||
value: Decoration.mark({
|
|
||||||
attributes
|
|
||||||
})
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
// selected content in multiple lines
|
|
||||||
// first, render text-selection in the first line
|
|
||||||
decorations.push({
|
|
||||||
from: start,
|
|
||||||
to: startLine.from + startLine.length,
|
|
||||||
value: Decoration.mark({
|
|
||||||
attributes
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
// render text-selection in the lines between the first and last line
|
|
||||||
for (let i = startLine.number + 1; i < endLine.number; i++) {
|
|
||||||
const currentLine = update.view.state.doc.line(i);
|
|
||||||
decorations.push({
|
decorations.push({
|
||||||
from: currentLine.from,
|
from: start,
|
||||||
to: currentLine.to,
|
to: end,
|
||||||
|
value: Decoration.mark({
|
||||||
|
attributes
|
||||||
|
})
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// selected content in multiple lines
|
||||||
|
// first, render text-selection in the first line
|
||||||
|
decorations.push({
|
||||||
|
from: start,
|
||||||
|
to: startLine.from + startLine.length,
|
||||||
|
value: Decoration.mark({
|
||||||
|
attributes
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
// render text-selection in the lines between the first and last line
|
||||||
|
for (
|
||||||
|
let i = startLine.number + 1;
|
||||||
|
i < endLine.number;
|
||||||
|
i++
|
||||||
|
) {
|
||||||
|
const currentLine = update.view.state.doc.line(i);
|
||||||
|
decorations.push({
|
||||||
|
from: currentLine.from,
|
||||||
|
to: currentLine.to,
|
||||||
|
value: Decoration.mark({
|
||||||
|
attributes
|
||||||
|
})
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// render text-selection in the last line
|
||||||
|
decorations.push({
|
||||||
|
from: endLine.from,
|
||||||
|
to: end,
|
||||||
value: Decoration.mark({
|
value: Decoration.mark({
|
||||||
attributes
|
attributes
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// render text-selection in the last line
|
|
||||||
decorations.push({
|
decorations.push({
|
||||||
from: endLine.from,
|
from: end,
|
||||||
to: end,
|
to: end,
|
||||||
value: Decoration.mark({
|
value: Decoration.widget({
|
||||||
attributes
|
side: end - start > 0 ? -1 : 1, // the local cursor should be rendered outside the remote selection
|
||||||
|
block: false,
|
||||||
|
widget: new RemoteCursorWidget(color, name)
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
);
|
||||||
decorations.push({
|
|
||||||
from: end,
|
|
||||||
to: end,
|
|
||||||
value: Decoration.widget({
|
|
||||||
side: end - start > 0 ? -1 : 1, // the local cursor should be rendered outside the remote selection
|
|
||||||
block: false,
|
|
||||||
widget: new RemoteCursorWidget(color, name)
|
|
||||||
})
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
this.decorations = Decoration.set(decorations, true);
|
this.decorations = Decoration.set(decorations, true);
|
||||||
}
|
}
|
||||||
|
|
@ -118,43 +215,3 @@ export const remoteCursorsPlugin = ViewPlugin.fromClass(
|
||||||
decorations: (v) => v.decorations
|
decorations: (v) => v.decorations
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
export function setCursors(
|
|
||||||
clients: DocumentWithMaybeOutdatedClientCursors[],
|
|
||||||
app: App
|
|
||||||
): void {
|
|
||||||
cursors = [
|
|
||||||
...cursors.filter(({ deviceId }) =>
|
|
||||||
clients.some(
|
|
||||||
(client) => client.deviceId === deviceId && client.isOutdated
|
|
||||||
)
|
|
||||||
),
|
|
||||||
...clients
|
|
||||||
.filter(({ isOutdated }) => !isOutdated)
|
|
||||||
.flatMap((client) => {
|
|
||||||
const clientCursors = client.documentsWithCursors;
|
|
||||||
return clientCursors.flatMap((cursor) =>
|
|
||||||
cursor.cursors.map((span) => ({
|
|
||||||
name: client.userName,
|
|
||||||
path: cursor.relative_path,
|
|
||||||
deviceId: client.deviceId,
|
|
||||||
span
|
|
||||||
}))
|
|
||||||
);
|
|
||||||
})
|
|
||||||
];
|
|
||||||
|
|
||||||
app.workspace
|
|
||||||
.getLeavesOfType("markdown")
|
|
||||||
.map((leaf) => leaf.view)
|
|
||||||
.filter((view) => view instanceof MarkdownView)
|
|
||||||
.forEach((view) => {
|
|
||||||
// @ts-expect-error, not typed
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
|
||||||
const editor = view.editor.cm as EditorView;
|
|
||||||
|
|
||||||
editor.dispatch({
|
|
||||||
effects: [forceUpdate.of(null)]
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -1,111 +0,0 @@
|
||||||
import { updateSelection } from "./update-selection";
|
|
||||||
|
|
||||||
describe("Selection update", () => {
|
|
||||||
it("should handle span fully before - insert", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 0,
|
|
||||||
toA: 0,
|
|
||||||
fromB: 0,
|
|
||||||
toB: 2,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 5, end: 7 }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("should handle span fully before - delete", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 0,
|
|
||||||
toA: 2,
|
|
||||||
fromB: 0,
|
|
||||||
toB: 0,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 1, end: 3 }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("should handle span fully after - insert", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 6,
|
|
||||||
toA: 6,
|
|
||||||
fromB: 6,
|
|
||||||
toB: 10,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 3, end: 5 }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("should handle span fully after - delete", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 6,
|
|
||||||
toA: 10,
|
|
||||||
fromB: 6,
|
|
||||||
toB: 6,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 3, end: 5 }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("should handle span fully within - insert", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 4,
|
|
||||||
toA: 4,
|
|
||||||
fromB: 4,
|
|
||||||
toB: 6,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 3, end: 7 }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("should handle span fully within - delete", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 4,
|
|
||||||
toA: 5,
|
|
||||||
fromB: 4,
|
|
||||||
toB: 4,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 3, end: 4 }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("should handle span overlapping with start", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 2,
|
|
||||||
toA: 4,
|
|
||||||
fromB: 2,
|
|
||||||
toB: 2,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 2, end: 4 }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("should handle span overlapping with end", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 4,
|
|
||||||
toA: 6,
|
|
||||||
fromB: 4,
|
|
||||||
toB: 4,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 3, end: 4 }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("delete entire selection", () => {
|
|
||||||
const spans = [{ start: 3, end: 5 }];
|
|
||||||
updateSelection({
|
|
||||||
fromA: 0,
|
|
||||||
toA: 10,
|
|
||||||
fromB: 0,
|
|
||||||
toB: 0,
|
|
||||||
spans
|
|
||||||
});
|
|
||||||
expect(spans).toEqual([{ start: 0, end: 0 }]);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
@ -1,40 +0,0 @@
|
||||||
import type { CursorSpan } from "sync-client";
|
|
||||||
|
|
||||||
export const updateSelection = ({
|
|
||||||
fromA,
|
|
||||||
toA,
|
|
||||||
toB,
|
|
||||||
spans
|
|
||||||
}: {
|
|
||||||
fromA: number;
|
|
||||||
toA: number;
|
|
||||||
fromB: number;
|
|
||||||
toB: number;
|
|
||||||
spans: CursorSpan[];
|
|
||||||
}): void => {
|
|
||||||
spans.forEach((span) => {
|
|
||||||
if (fromA <= span.start) {
|
|
||||||
// the change covers the entirety of the selection
|
|
||||||
if (toA > span.end) {
|
|
||||||
span.start = toB;
|
|
||||||
span.end = toB;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let change = toB - toA;
|
|
||||||
if (change < 0) {
|
|
||||||
// it's a deletion
|
|
||||||
// if overlaps with the start, we can't move it back more than the deleted range
|
|
||||||
change = Math.max(change, fromA - span.start);
|
|
||||||
}
|
|
||||||
|
|
||||||
span.start += change;
|
|
||||||
span.end += change;
|
|
||||||
} else if (toA <= span.end) {
|
|
||||||
span.end += toB - toA;
|
|
||||||
} else if (toB <= span.end) {
|
|
||||||
// a deletion overlaps with the end, so we move the end
|
|
||||||
span.end = toB;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
import type { Workspace } from "obsidian";
|
import type { Workspace } from "obsidian";
|
||||||
import { FileView, setIcon } from "obsidian";
|
import { FileView, setIcon } from "obsidian";
|
||||||
import type { SyncClient } from "sync-client";
|
import type { SyncClient } from "sync-client";
|
||||||
import { DocumentUpdateStatus } from "sync-client";
|
import { DocumentSyncStatus } from "sync-client";
|
||||||
import "./editor-sync-line.scss";
|
import "./editor-sync-line.scss";
|
||||||
|
|
||||||
export function updateEditorStatusDisplay(
|
export function updateEditorStatusDisplay(
|
||||||
|
|
@ -35,7 +35,7 @@ export function updateEditorStatusDisplay(
|
||||||
|
|
||||||
const isLoading =
|
const isLoading =
|
||||||
client.getDocumentSyncingStatus(filePath) ==
|
client.getDocumentSyncingStatus(filePath) ==
|
||||||
DocumentUpdateStatus.SYNCING;
|
DocumentSyncStatus.SYNCING;
|
||||||
|
|
||||||
if (isLoading) {
|
if (isLoading) {
|
||||||
element.classList.add("loading");
|
element.classList.add("loading");
|
||||||
|
|
|
||||||
1
frontend/package-lock.json
generated
1
frontend/package-lock.json
generated
|
|
@ -6373,6 +6373,7 @@
|
||||||
"jest": "^29.7.0",
|
"jest": "^29.7.0",
|
||||||
"mini-css-extract-plugin": "^2.9.2",
|
"mini-css-extract-plugin": "^2.9.2",
|
||||||
"obsidian": "1.8.7",
|
"obsidian": "1.8.7",
|
||||||
|
"reconcile-text": "^0.5.0",
|
||||||
"resolve-url-loader": "^5.0.0",
|
"resolve-url-loader": "^5.0.0",
|
||||||
"sass": "^1.89.1",
|
"sass": "^1.89.1",
|
||||||
"sass-loader": "^16.0.5",
|
"sass-loader": "^16.0.5",
|
||||||
|
|
|
||||||
|
|
@ -31,16 +31,17 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
||||||
this.logger.debug(`Reading file '${path}'`);
|
this.logger.debug(`Reading file '${path}'`);
|
||||||
return this.safeOperation(
|
return this.safeOperation(
|
||||||
path,
|
path,
|
||||||
this.decorateToHoldLock(path, async () => this.fs.read(path)),
|
async () =>
|
||||||
|
this.locks.withLock(path, async () => this.fs.read(path)),
|
||||||
"read"
|
"read"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
|
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
|
||||||
this.logger.debug(`Writing to file '${path}'`);
|
this.logger.debug(`Writing to file '${path}'`);
|
||||||
return this.decorateToHoldLock(path, async () =>
|
return this.locks.withLock(path, async () =>
|
||||||
this.fs.write(path, content)
|
this.fs.write(path, content)
|
||||||
)();
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async atomicUpdateText(
|
public async atomicUpdateText(
|
||||||
|
|
@ -50,9 +51,10 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
||||||
this.logger.debug(`Atomically updating file '${path}'`);
|
this.logger.debug(`Atomically updating file '${path}'`);
|
||||||
return this.safeOperation(
|
return this.safeOperation(
|
||||||
path,
|
path,
|
||||||
this.decorateToHoldLock(path, async () =>
|
async () =>
|
||||||
this.fs.atomicUpdateText(path, updater)
|
this.locks.withLock(path, async () =>
|
||||||
),
|
this.fs.atomicUpdateText(path, updater)
|
||||||
|
),
|
||||||
"atomicUpdateText"
|
"atomicUpdateText"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -61,32 +63,29 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
||||||
// Logging this would be too noisy
|
// Logging this would be too noisy
|
||||||
return this.safeOperation(
|
return this.safeOperation(
|
||||||
path,
|
path,
|
||||||
this.decorateToHoldLock(path, async () =>
|
async () =>
|
||||||
this.fs.getFileSize(path)
|
this.locks.withLock(path, async () =>
|
||||||
),
|
this.fs.getFileSize(path)
|
||||||
|
),
|
||||||
"getFileSize"
|
"getFileSize"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async exists(path: RelativePath): Promise<boolean> {
|
public async exists(path: RelativePath): Promise<boolean> {
|
||||||
this.logger.debug(`Checking if file '${path}' exists`);
|
this.logger.debug(`Checking if file '${path}' exists`);
|
||||||
return this.decorateToHoldLock(path, async () =>
|
return this.locks.withLock(path, async () => this.fs.exists(path));
|
||||||
this.fs.exists(path)
|
|
||||||
)();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async createDirectory(path: RelativePath): Promise<void> {
|
public async createDirectory(path: RelativePath): Promise<void> {
|
||||||
this.logger.debug(`Creating directory '${path}'`);
|
this.logger.debug(`Creating directory '${path}'`);
|
||||||
return this.decorateToHoldLock(path, async () =>
|
return this.locks.withLock(path, async () =>
|
||||||
this.fs.createDirectory(path)
|
this.fs.createDirectory(path)
|
||||||
)();
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async delete(path: RelativePath): Promise<void> {
|
public async delete(path: RelativePath): Promise<void> {
|
||||||
this.logger.debug(`Deleting file '${path}'`);
|
this.logger.debug(`Deleting file '${path}'`);
|
||||||
return this.decorateToHoldLock(path, async () =>
|
return this.locks.withLock(path, async () => this.fs.delete(path));
|
||||||
this.fs.delete(path)
|
|
||||||
)();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async rename(
|
public async rename(
|
||||||
|
|
@ -96,43 +95,14 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
||||||
this.logger.debug(`Renaming file '${oldPath}' to '${newPath}'`);
|
this.logger.debug(`Renaming file '${oldPath}' to '${newPath}'`);
|
||||||
return this.safeOperation(
|
return this.safeOperation(
|
||||||
oldPath,
|
oldPath,
|
||||||
this.decorateToHoldLock([oldPath, newPath], async () =>
|
async () =>
|
||||||
this.fs.rename(oldPath, newPath)
|
this.locks.withLock([oldPath, newPath], async () =>
|
||||||
),
|
this.fs.rename(oldPath, newPath)
|
||||||
|
),
|
||||||
"rename"
|
"rename"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Decorate an operation to ensure that the file is locked before running it
|
|
||||||
* and that the lock is released afterwards. This results in at-most one
|
|
||||||
* concurrent operation running per file.
|
|
||||||
*/
|
|
||||||
private decorateToHoldLock<T>(
|
|
||||||
pathOrPaths: RelativePath | RelativePath[],
|
|
||||||
operation: () => Promise<T>
|
|
||||||
): () => Promise<T> {
|
|
||||||
return async () => {
|
|
||||||
const paths = Array.isArray(pathOrPaths)
|
|
||||||
? pathOrPaths
|
|
||||||
: [pathOrPaths];
|
|
||||||
|
|
||||||
await Promise.all(
|
|
||||||
paths.map(async (path) => this.locks.waitForLock(path))
|
|
||||||
);
|
|
||||||
|
|
||||||
try {
|
|
||||||
return await operation();
|
|
||||||
} finally {
|
|
||||||
await Promise.all(
|
|
||||||
paths.map((path) => {
|
|
||||||
this.locks.unlock(path);
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decorate an operation to ensure that the file exists before running it.
|
* Decorate an operation to ensure that the file exists before running it.
|
||||||
* If the operation fails, it will check if the file still exists and throw
|
* If the operation fails, it will check if the file still exists and throw
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,8 @@ export type { PersistenceProvider } from "./persistence/persistence";
|
||||||
export type { CursorSpan } from "./services/types/CursorSpan";
|
export type { CursorSpan } from "./services/types/CursorSpan";
|
||||||
export type { ClientCursors } from "./services/types/ClientCursors";
|
export type { ClientCursors } from "./services/types/ClientCursors";
|
||||||
export type { NetworkConnectionStatus } from "./types/network-connection-status";
|
export type { NetworkConnectionStatus } from "./types/network-connection-status";
|
||||||
export type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
||||||
export { DocumentUpdateStatus } from "./types/document-update-status";
|
export { DocumentSyncStatus } from "./types/document-sync-status";
|
||||||
export { SyncClient } from "./sync-client";
|
export { SyncClient } from "./sync-client";
|
||||||
|
|
||||||
import { Locks } from "./utils/locks";
|
import { Locks } from "./utils/locks";
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ export interface DocumentRecord {
|
||||||
documentId: DocumentId;
|
documentId: DocumentId;
|
||||||
metadata: DocumentMetadata | undefined;
|
metadata: DocumentMetadata | undefined;
|
||||||
isDeleted: boolean;
|
isDeleted: boolean;
|
||||||
updates: Promise<void>[];
|
updates: Promise<unknown>[];
|
||||||
parallelVersion: number;
|
parallelVersion: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -135,7 +135,7 @@ export class Database {
|
||||||
this.save();
|
this.save();
|
||||||
}
|
}
|
||||||
|
|
||||||
public removeDocumentPromise(promise: Promise<void>): void {
|
public removeDocumentPromise(promise: Promise<unknown>): void {
|
||||||
const entry = this.documents.find(({ updates }) =>
|
const entry = this.documents.find(({ updates }) =>
|
||||||
updates.includes(promise)
|
updates.includes(promise)
|
||||||
);
|
);
|
||||||
|
|
@ -167,7 +167,7 @@ export class Database {
|
||||||
|
|
||||||
public async getResolvedDocumentByRelativePath(
|
public async getResolvedDocumentByRelativePath(
|
||||||
relativePath: RelativePath,
|
relativePath: RelativePath,
|
||||||
promise: Promise<void>
|
promise: Promise<unknown>
|
||||||
): Promise<DocumentRecord> {
|
): Promise<DocumentRecord> {
|
||||||
const entry = this.getLatestDocumentByRelativePath(relativePath);
|
const entry = this.getLatestDocumentByRelativePath(relativePath);
|
||||||
|
|
||||||
|
|
@ -191,7 +191,7 @@ export class Database {
|
||||||
public createNewPendingDocument(
|
public createNewPendingDocument(
|
||||||
documentId: DocumentId,
|
documentId: DocumentId,
|
||||||
relativePath: RelativePath,
|
relativePath: RelativePath,
|
||||||
promise: Promise<void>
|
promise: Promise<unknown>
|
||||||
): DocumentRecord {
|
): DocumentRecord {
|
||||||
const previousEntry =
|
const previousEntry =
|
||||||
this.getLatestDocumentByRelativePath(relativePath);
|
this.getLatestDocumentByRelativePath(relativePath);
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@
|
||||||
import type { CursorSpan } from "./CursorSpan";
|
import type { CursorSpan } from "./CursorSpan";
|
||||||
|
|
||||||
export interface DocumentWithCursors {
|
export interface DocumentWithCursors {
|
||||||
vault_update_id: number;
|
vault_update_id: number | null;
|
||||||
document_id: string;
|
document_id: string;
|
||||||
relative_path: string;
|
relative_path: string;
|
||||||
cursors: CursorSpan[];
|
cursors: CursorSpan[];
|
||||||
|
|
|
||||||
|
|
@ -152,7 +152,7 @@ export class WebSocketManager {
|
||||||
}
|
}
|
||||||
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
|
||||||
} else if (message.type === "cursorPositions") {
|
} else if (message.type === "cursorPositions") {
|
||||||
this.logger.info(
|
this.logger.debug(
|
||||||
`Received cursor positions for ${JSON.stringify(message.clients)}`
|
`Received cursor positions for ${JSON.stringify(message.clients)}`
|
||||||
);
|
);
|
||||||
this.remoteCursorsUpdateListeners.forEach((listener) => {
|
this.remoteCursorsUpdateListeners.forEach((listener) => {
|
||||||
|
|
|
||||||
|
|
@ -14,26 +14,16 @@ import { ConnectionStatus } from "./services/connection-status";
|
||||||
import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer";
|
import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer";
|
||||||
import { rateLimit } from "./utils/rate-limit";
|
import { rateLimit } from "./utils/rate-limit";
|
||||||
import type { NetworkConnectionStatus } from "./types/network-connection-status";
|
import type { NetworkConnectionStatus } from "./types/network-connection-status";
|
||||||
import { DocumentUpdateStatus } from "./types/document-update-status";
|
import { DocumentSyncStatus } from "./types/document-sync-status";
|
||||||
import { WebSocketManager } from "./services/websocket-manager";
|
import { WebSocketManager } from "./services/websocket-manager";
|
||||||
import { createClientId } from "./utils/create-client-id";
|
import { createClientId } from "./utils/create-client-id";
|
||||||
|
import { CursorTracker } from "./sync-operations/cursor-tracker";
|
||||||
import type { CursorSpan } from "./services/types/CursorSpan";
|
import type { CursorSpan } from "./services/types/CursorSpan";
|
||||||
import type { ClientCursors } from "./services/types/ClientCursors";
|
import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
||||||
import type { DocumentWithCursors } from "./services/types/DocumentWithCursors";
|
import { FileChangeNotifier } from "./sync-operations/file-change-notifier";
|
||||||
import { hash } from "./utils/hash";
|
|
||||||
import type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors";
|
|
||||||
|
|
||||||
enum DocumentUpToDateness {
|
|
||||||
UpToDate = "UpToDate",
|
|
||||||
Prior = "Prior",
|
|
||||||
Later = "Later"
|
|
||||||
}
|
|
||||||
|
|
||||||
export class SyncClient {
|
export class SyncClient {
|
||||||
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
|
private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000;
|
||||||
private lastCursorState: DocumentWithCursors[] = [];
|
|
||||||
|
|
||||||
private readonly knownClientCursors: ClientCursors[] = [];
|
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/max-params
|
// eslint-disable-next-line @typescript-eslint/max-params
|
||||||
private constructor(
|
private constructor(
|
||||||
|
|
@ -45,7 +35,8 @@ export class SyncClient {
|
||||||
private readonly webSocketManager: WebSocketManager,
|
private readonly webSocketManager: WebSocketManager,
|
||||||
private readonly _logger: Logger,
|
private readonly _logger: Logger,
|
||||||
private readonly connectionStatus: ConnectionStatus,
|
private readonly connectionStatus: ConnectionStatus,
|
||||||
private readonly fileOperations: FileOperations
|
private readonly cursorTracker: CursorTracker,
|
||||||
|
private readonly fileChangeNotifier: FileChangeNotifier
|
||||||
) {
|
) {
|
||||||
this.settings.addOnSettingsChangeListener(
|
this.settings.addOnSettingsChangeListener(
|
||||||
async (newSettings, oldSettings) => {
|
async (newSettings, oldSettings) => {
|
||||||
|
|
@ -54,10 +45,6 @@ export class SyncClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
this.webSocketManager.addRemoteCursorsUpdateListener((cursors) => {
|
|
||||||
this.knownClientCursors.push(...cursors);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public get logger(): Logger {
|
public get logger(): Logger {
|
||||||
|
|
@ -148,7 +135,6 @@ export class SyncClient {
|
||||||
);
|
);
|
||||||
|
|
||||||
const syncer = new Syncer(
|
const syncer = new Syncer(
|
||||||
deviceId,
|
|
||||||
logger,
|
logger,
|
||||||
database,
|
database,
|
||||||
settings,
|
settings,
|
||||||
|
|
@ -166,6 +152,13 @@ export class SyncClient {
|
||||||
webSocket
|
webSocket
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const fileChangeNotifier = new FileChangeNotifier();
|
||||||
|
const cursorTracker = new CursorTracker(
|
||||||
|
database,
|
||||||
|
webSocketManager,
|
||||||
|
fileOperations,
|
||||||
|
fileChangeNotifier
|
||||||
|
);
|
||||||
const client = new SyncClient(
|
const client = new SyncClient(
|
||||||
history,
|
history,
|
||||||
settings,
|
settings,
|
||||||
|
|
@ -175,7 +168,8 @@ export class SyncClient {
|
||||||
webSocketManager,
|
webSocketManager,
|
||||||
logger,
|
logger,
|
||||||
connectionStatus,
|
connectionStatus,
|
||||||
fileOperations
|
cursorTracker,
|
||||||
|
fileChangeNotifier
|
||||||
);
|
);
|
||||||
|
|
||||||
logger.info("SyncClient initialised");
|
logger.info("SyncClient initialised");
|
||||||
|
|
@ -264,12 +258,14 @@ export class SyncClient {
|
||||||
public async syncLocallyCreatedFile(
|
public async syncLocallyCreatedFile(
|
||||||
relativePath: RelativePath
|
relativePath: RelativePath
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||||
return this.syncer.syncLocallyCreatedFile(relativePath);
|
return this.syncer.syncLocallyCreatedFile(relativePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async syncLocallyDeletedFile(
|
public async syncLocallyDeletedFile(
|
||||||
relativePath: RelativePath
|
relativePath: RelativePath
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||||
return this.syncer.syncLocallyDeletedFile(relativePath);
|
return this.syncer.syncLocallyDeletedFile(relativePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -280,6 +276,7 @@ export class SyncClient {
|
||||||
oldPath?: RelativePath;
|
oldPath?: RelativePath;
|
||||||
relativePath: RelativePath;
|
relativePath: RelativePath;
|
||||||
}): Promise<void> {
|
}): Promise<void> {
|
||||||
|
this.fileChangeNotifier.notifyOfFileChange(relativePath);
|
||||||
return this.syncer.syncLocallyUpdatedFile({
|
return this.syncer.syncLocallyUpdatedFile({
|
||||||
oldPath,
|
oldPath,
|
||||||
relativePath
|
relativePath
|
||||||
|
|
@ -288,154 +285,26 @@ export class SyncClient {
|
||||||
|
|
||||||
public getDocumentSyncingStatus(
|
public getDocumentSyncingStatus(
|
||||||
relativePath: RelativePath
|
relativePath: RelativePath
|
||||||
): DocumentUpdateStatus {
|
): DocumentSyncStatus {
|
||||||
const document =
|
const document =
|
||||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||||
if (document === undefined) {
|
if (document === undefined) {
|
||||||
return DocumentUpdateStatus.SYNCING;
|
return DocumentSyncStatus.SYNCING;
|
||||||
}
|
}
|
||||||
return document.updates.length > 0
|
return document.updates.length > 0
|
||||||
? DocumentUpdateStatus.SYNCING
|
? DocumentSyncStatus.SYNCING
|
||||||
: DocumentUpdateStatus.UP_TO_DATE;
|
: DocumentSyncStatus.UP_TO_DATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update the local cursors for the given documents.
|
|
||||||
/// Can be called frequently as it only emits an event
|
|
||||||
// if the state has actually changed.
|
|
||||||
public async updateLocalCursors(
|
public async updateLocalCursors(
|
||||||
documentToCursors: Record<RelativePath, CursorSpan[]>
|
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const documentsWithCursors: DocumentWithCursors[] = [];
|
await this.cursorTracker.sendLocalCursorsToServer(documentToCursors);
|
||||||
|
|
||||||
for (const [relativePath, cursors] of Object.entries(
|
|
||||||
documentToCursors
|
|
||||||
)) {
|
|
||||||
const record =
|
|
||||||
this.database.getLatestDocumentByRelativePath(relativePath);
|
|
||||||
|
|
||||||
if (!record) {
|
|
||||||
continue; // Let's wait for the file to be created before sending cursors
|
|
||||||
}
|
|
||||||
|
|
||||||
const readContent = await this.fileOperations.read(relativePath);
|
|
||||||
|
|
||||||
if (record.metadata?.hash !== hash(readContent)) {
|
|
||||||
continue; // Wouldn't make sense to sync the positions in a dirty file
|
|
||||||
}
|
|
||||||
|
|
||||||
documentsWithCursors.push({
|
|
||||||
relative_path: relativePath,
|
|
||||||
document_id: record.documentId,
|
|
||||||
vault_update_id: record.metadata.parentVersionId,
|
|
||||||
cursors
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
JSON.stringify(this.lastCursorState) ===
|
|
||||||
JSON.stringify(documentsWithCursors)
|
|
||||||
) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.lastCursorState = documentsWithCursors;
|
|
||||||
|
|
||||||
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public addRemoteCursorsUpdateListener(
|
public addRemoteCursorsUpdateListener(
|
||||||
listener: (cursors: DocumentWithMaybeOutdatedClientCursors[]) => unknown
|
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
|
||||||
): void {
|
): void {
|
||||||
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
|
this.cursorTracker.addRemoteCursorsUpdateListener(listener);
|
||||||
listener(await this.getRelevantClientCursors());
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private async getRelevantClientCursors(): Promise<
|
|
||||||
DocumentWithMaybeOutdatedClientCursors[]
|
|
||||||
> {
|
|
||||||
const result: DocumentWithMaybeOutdatedClientCursors[] = [];
|
|
||||||
const included = new Set<string>();
|
|
||||||
for (const clientCursors of [...this.knownClientCursors].reverse()) {
|
|
||||||
if (included.has(clientCursors.deviceId)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const upToDateness =
|
|
||||||
await this.getDocumentsUpToDateness(clientCursors);
|
|
||||||
if (upToDateness == DocumentUpToDateness.Later) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
result.push({
|
|
||||||
...clientCursors,
|
|
||||||
isOutdated: upToDateness == DocumentUpToDateness.Prior
|
|
||||||
});
|
|
||||||
|
|
||||||
included.add(clientCursors.deviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async getDocumentsUpToDateness(
|
|
||||||
clientCursor: ClientCursors
|
|
||||||
): Promise<DocumentUpToDateness> {
|
|
||||||
const results = [];
|
|
||||||
for (const document of clientCursor.documentsWithCursors) {
|
|
||||||
results.push(await this.getDocumentUpToDateness(document));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
results.every((result) => result === DocumentUpToDateness.UpToDate)
|
|
||||||
) {
|
|
||||||
return DocumentUpToDateness.UpToDate;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
results.every(
|
|
||||||
(result) =>
|
|
||||||
result === DocumentUpToDateness.UpToDate ||
|
|
||||||
result === DocumentUpToDateness.Prior
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
return DocumentUpToDateness.Prior;
|
|
||||||
}
|
|
||||||
|
|
||||||
return DocumentUpToDateness.Later;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async getDocumentUpToDateness(
|
|
||||||
document: DocumentWithCursors
|
|
||||||
): Promise<DocumentUpToDateness> {
|
|
||||||
const record = this.database.getLatestDocumentByRelativePath(
|
|
||||||
document.relative_path
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!record) {
|
|
||||||
// the document of the cursor must be from the future
|
|
||||||
return DocumentUpToDateness.Later;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
(record.metadata?.parentVersionId ?? 0) < document.vault_update_id
|
|
||||||
) {
|
|
||||||
return DocumentUpToDateness.Later;
|
|
||||||
} else if (
|
|
||||||
document.vault_update_id < (record.metadata?.parentVersionId ?? 0)
|
|
||||||
) {
|
|
||||||
// the document of the cursor must be from the past
|
|
||||||
return DocumentUpToDateness.Prior;
|
|
||||||
}
|
|
||||||
|
|
||||||
const currentContent = await this.fileOperations.read(
|
|
||||||
document.relative_path
|
|
||||||
);
|
|
||||||
|
|
||||||
return this.database.getLatestDocumentByRelativePath(
|
|
||||||
document.relative_path
|
|
||||||
)?.metadata?.hash === hash(currentContent)
|
|
||||||
? DocumentUpToDateness.UpToDate
|
|
||||||
: DocumentUpToDateness.Prior;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
253
frontend/sync-client/src/sync-operations/cursor-tracker.ts
Normal file
253
frontend/sync-client/src/sync-operations/cursor-tracker.ts
Normal file
|
|
@ -0,0 +1,253 @@
|
||||||
|
import type { FileOperations } from "../file-operations/file-operations";
|
||||||
|
import type { Database, RelativePath } from "../persistence/database";
|
||||||
|
import type { ClientCursors } from "../services/types/ClientCursors";
|
||||||
|
import type { CursorSpan } from "../services/types/CursorSpan";
|
||||||
|
import type { DocumentWithCursors } from "../services/types/DocumentWithCursors";
|
||||||
|
import type { WebSocketManager } from "../services/websocket-manager";
|
||||||
|
import type { MaybeOutdatedClientCursors } from "../types/maybe-outdated-client-cursors";
|
||||||
|
import { DocumentUpToDateness } from "../types/document-up-to-dateness";
|
||||||
|
import { hash } from "../utils/hash";
|
||||||
|
import type { FileChangeNotifier } from "./file-change-notifier";
|
||||||
|
import { Lock } from "../utils/locks";
|
||||||
|
|
||||||
|
// Cursor positions are updated separately from documents. However, a given cursor position is only
|
||||||
|
// valid within a certain version of the document it belongs to. This class tracks previous and the latest
|
||||||
|
// known remote cursor positions, and for each document, tries to return the latest cursor positions that are
|
||||||
|
// not from the future.
|
||||||
|
export class CursorTracker {
|
||||||
|
private readonly updateLock = new Lock();
|
||||||
|
|
||||||
|
private knownRemoteCursors: (ClientCursors & {
|
||||||
|
upToDateness: DocumentUpToDateness;
|
||||||
|
})[] = [];
|
||||||
|
|
||||||
|
private lastLocalCursorState: DocumentWithCursors[] = [];
|
||||||
|
private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] =
|
||||||
|
[];
|
||||||
|
|
||||||
|
public constructor(
|
||||||
|
private readonly database: Database,
|
||||||
|
private readonly webSocketManager: WebSocketManager,
|
||||||
|
private readonly fileOperations: FileOperations,
|
||||||
|
private readonly fileChangeNotifier: FileChangeNotifier
|
||||||
|
) {
|
||||||
|
this.webSocketManager.addRemoteCursorsUpdateListener(
|
||||||
|
async (clientCursors) => {
|
||||||
|
await this.updateLock.withLock(async () => {
|
||||||
|
// The latest message will contain all active clients, so we can delete the ones
|
||||||
|
// from the local list which are no longer active.
|
||||||
|
const allIds = new Set(
|
||||||
|
clientCursors.map((c) => c.deviceId)
|
||||||
|
);
|
||||||
|
const updatedKnownRemoteCursors =
|
||||||
|
this.knownRemoteCursors.filter((c) =>
|
||||||
|
allIds.has(c.deviceId)
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const cursor of clientCursors.filter((client) =>
|
||||||
|
client.documentsWithCursors.every(
|
||||||
|
(doc) => doc.vault_update_id != null
|
||||||
|
)
|
||||||
|
)) {
|
||||||
|
updatedKnownRemoteCursors.push({
|
||||||
|
...cursor,
|
||||||
|
upToDateness:
|
||||||
|
await this.getDocumentsUpToDateness(cursor)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
this.knownRemoteCursors = updatedKnownRemoteCursors;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.fileChangeNotifier.addFileChangeListener(async (relativePath) =>
|
||||||
|
this.updateLock.withLock(async () => {
|
||||||
|
for (const clientCursor of this.knownRemoteCursors) {
|
||||||
|
if (
|
||||||
|
clientCursor.documentsWithCursors.some(
|
||||||
|
(document) =>
|
||||||
|
document.relative_path === relativePath
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
clientCursor.upToDateness =
|
||||||
|
await this.getDocumentsUpToDateness(clientCursor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update the local cursors for the given documents.
|
||||||
|
/// Can be called frequently as it only emits an event
|
||||||
|
/// if the state has actually changed.
|
||||||
|
public async sendLocalCursorsToServer(
|
||||||
|
documentToCursors: Record<RelativePath, CursorSpan[]>
|
||||||
|
): Promise<void> {
|
||||||
|
const documentsWithCursors: DocumentWithCursors[] = [];
|
||||||
|
|
||||||
|
for (const [relativePath, cursors] of Object.entries(
|
||||||
|
documentToCursors
|
||||||
|
)) {
|
||||||
|
const record =
|
||||||
|
this.database.getLatestDocumentByRelativePath(relativePath);
|
||||||
|
|
||||||
|
if (!record) {
|
||||||
|
continue; // Let's wait for the file to be created before sending cursors
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!record.metadata) {
|
||||||
|
continue; // this is a new document, no need to sync the cursors
|
||||||
|
}
|
||||||
|
|
||||||
|
documentsWithCursors.push({
|
||||||
|
relative_path: relativePath,
|
||||||
|
document_id: record.documentId,
|
||||||
|
vault_update_id: record.metadata.parentVersionId,
|
||||||
|
cursors: cursors.map(({ start, end }) => ({
|
||||||
|
start: Math.min(start, end),
|
||||||
|
end: Math.max(start, end)
|
||||||
|
})) // the client might send directional selections
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
JSON.stringify(this.lastLocalCursorState) ===
|
||||||
|
JSON.stringify(documentsWithCursors)
|
||||||
|
) {
|
||||||
|
// Caching step to avoid reading the edited files all the time
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.lastLocalCursorState = documentsWithCursors;
|
||||||
|
|
||||||
|
for (const doc of documentsWithCursors) {
|
||||||
|
const readContent = await this.fileOperations.read(
|
||||||
|
doc.relative_path
|
||||||
|
);
|
||||||
|
const record = this.database.getLatestDocumentByRelativePath(
|
||||||
|
doc.relative_path
|
||||||
|
);
|
||||||
|
if (record?.metadata?.hash !== hash(readContent)) {
|
||||||
|
doc.vault_update_id = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) ===
|
||||||
|
JSON.stringify(documentsWithCursors)
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors;
|
||||||
|
|
||||||
|
this.webSocketManager.updateLocalCursors({ documentsWithCursors });
|
||||||
|
}
|
||||||
|
|
||||||
|
// The returned position may be accurate, if it matches the document version, or outdated, in which case
|
||||||
|
// the client has to heuristically guess it's current position based on the local edits.
|
||||||
|
public addRemoteCursorsUpdateListener(
|
||||||
|
listener: (cursors: MaybeOutdatedClientCursors[]) => unknown
|
||||||
|
): void {
|
||||||
|
// CursorTracker registers its own event listener in the constructor so it must have been called before this
|
||||||
|
this.webSocketManager.addRemoteCursorsUpdateListener(async () => {
|
||||||
|
await this.updateLock.withLock(() =>
|
||||||
|
listener(this.getRelevantAndPruneKnownClientCursors())
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] {
|
||||||
|
const result: MaybeOutdatedClientCursors[] = [];
|
||||||
|
const included = new Set<string>();
|
||||||
|
|
||||||
|
const relevantCursors = [];
|
||||||
|
for (const clientCursors of [...this.knownRemoteCursors].reverse()) {
|
||||||
|
if (included.has(clientCursors.deviceId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (clientCursors.upToDateness == DocumentUpToDateness.Later) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
result.push({
|
||||||
|
...clientCursors,
|
||||||
|
isOutdated:
|
||||||
|
clientCursors.upToDateness == DocumentUpToDateness.Prior
|
||||||
|
});
|
||||||
|
|
||||||
|
included.add(clientCursors.deviceId);
|
||||||
|
relevantCursors.unshift(clientCursors); // to reverse order back to normal
|
||||||
|
}
|
||||||
|
|
||||||
|
this.knownRemoteCursors = relevantCursors;
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We store up-to-dateness on a per-client basis to simplify the implementation.
|
||||||
|
// An individual client won't have too many documents open at once, so this is a reasonable trade-off.
|
||||||
|
private async getDocumentsUpToDateness(
|
||||||
|
clientCursor: ClientCursors
|
||||||
|
): Promise<DocumentUpToDateness> {
|
||||||
|
const results = [];
|
||||||
|
for (const document of clientCursor.documentsWithCursors) {
|
||||||
|
results.push(await this.getDocumentUpToDateness(document));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
results.every((result) => result === DocumentUpToDateness.UpToDate)
|
||||||
|
) {
|
||||||
|
return DocumentUpToDateness.UpToDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
results.every(
|
||||||
|
(result) =>
|
||||||
|
result === DocumentUpToDateness.UpToDate ||
|
||||||
|
result === DocumentUpToDateness.Prior
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
return DocumentUpToDateness.Prior;
|
||||||
|
}
|
||||||
|
|
||||||
|
return DocumentUpToDateness.Later;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async getDocumentUpToDateness(
|
||||||
|
document: DocumentWithCursors
|
||||||
|
): Promise<DocumentUpToDateness> {
|
||||||
|
const record = this.database.getLatestDocumentByRelativePath(
|
||||||
|
document.relative_path
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!record) {
|
||||||
|
// the document of the cursor must be from the future
|
||||||
|
return DocumentUpToDateness.Later;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
(record.metadata?.parentVersionId ?? 0) <
|
||||||
|
(document.vault_update_id ?? 0)
|
||||||
|
) {
|
||||||
|
return DocumentUpToDateness.Later;
|
||||||
|
} else if (
|
||||||
|
(document.vault_update_id ?? 0) <
|
||||||
|
(record.metadata?.parentVersionId ?? 0)
|
||||||
|
) {
|
||||||
|
// the document of the cursor must be from the past
|
||||||
|
return DocumentUpToDateness.Prior;
|
||||||
|
}
|
||||||
|
|
||||||
|
const currentContent = await this.fileOperations.read(
|
||||||
|
document.relative_path
|
||||||
|
);
|
||||||
|
|
||||||
|
return this.database.getLatestDocumentByRelativePath(
|
||||||
|
document.relative_path
|
||||||
|
)?.metadata?.hash === hash(currentContent)
|
||||||
|
? DocumentUpToDateness.UpToDate
|
||||||
|
: DocumentUpToDateness.Prior;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,15 @@
|
||||||
|
import type { RelativePath } from "../persistence/database";
|
||||||
|
|
||||||
|
export class FileChangeNotifier {
|
||||||
|
private readonly listeners: ((filePath: RelativePath) => unknown)[] = [];
|
||||||
|
|
||||||
|
public addFileChangeListener(
|
||||||
|
listener: (filePath: RelativePath) => unknown
|
||||||
|
): void {
|
||||||
|
this.listeners.push(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public notifyOfFileChange(filePath: RelativePath): void {
|
||||||
|
this.listeners.forEach((listener) => listener(filePath));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -9,7 +9,7 @@ import type { Logger } from "../tracing/logger";
|
||||||
import PQueue from "p-queue";
|
import PQueue from "p-queue";
|
||||||
import { hash } from "../utils/hash";
|
import { hash } from "../utils/hash";
|
||||||
import { v4 as uuidv4 } from "uuid";
|
import { v4 as uuidv4 } from "uuid";
|
||||||
import type { Settings, SyncSettings } from "../persistence/settings";
|
import type { Settings } from "../persistence/settings";
|
||||||
import type { FileOperations } from "../file-operations/file-operations";
|
import type { FileOperations } from "../file-operations/file-operations";
|
||||||
import { findMatchingFile } from "../utils/find-matching-file";
|
import { findMatchingFile } from "../utils/find-matching-file";
|
||||||
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
|
import type { UnrestrictedSyncer } from "./unrestricted-syncer";
|
||||||
|
|
@ -27,12 +27,10 @@ export class Syncer {
|
||||||
|
|
||||||
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
|
private runningScheduleSyncForOfflineChanges: Promise<void> | undefined;
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/max-params
|
|
||||||
public constructor(
|
public constructor(
|
||||||
private readonly deviceId: string,
|
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
private readonly database: Database,
|
private readonly database: Database,
|
||||||
private readonly settings: Settings,
|
settings: Settings,
|
||||||
private readonly syncService: SyncService,
|
private readonly syncService: SyncService,
|
||||||
private readonly operations: FileOperations,
|
private readonly operations: FileOperations,
|
||||||
private readonly internalSyncer: UnrestrictedSyncer
|
private readonly internalSyncer: UnrestrictedSyncer
|
||||||
|
|
@ -261,58 +259,77 @@ export class Syncer {
|
||||||
remoteVersion.documentId
|
remoteVersion.documentId
|
||||||
);
|
);
|
||||||
|
|
||||||
let hasLockToRelease = false;
|
|
||||||
if (document === undefined) {
|
if (document === undefined) {
|
||||||
// Let's avoid the same documents getting created in parallel multiple times.
|
// Let's avoid the same documents getting created in parallel multiple times.
|
||||||
// There might be multiple tasks waiting for the lock
|
// There might be multiple tasks waiting for the lock
|
||||||
await this.remoteDocumentsLock.waitForLock(
|
return this.remoteDocumentsLock.withLock(
|
||||||
remoteVersion.documentId
|
remoteVersion.documentId,
|
||||||
);
|
async () => {
|
||||||
hasLockToRelease = true;
|
document = this.database.getDocumentByDocumentId(
|
||||||
document = this.database.getDocumentByDocumentId(
|
remoteVersion.documentId
|
||||||
remoteVersion.documentId
|
);
|
||||||
|
|
||||||
|
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
|
||||||
|
if (document === undefined) {
|
||||||
|
await this.syncQueue.add(async () =>
|
||||||
|
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||||
|
remoteVersion
|
||||||
|
)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
const [promise, resolve, reject] = createPromise();
|
||||||
|
|
||||||
|
document =
|
||||||
|
await this.database.getResolvedDocumentByRelativePath(
|
||||||
|
document.relativePath,
|
||||||
|
promise
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.syncQueue.add(async () =>
|
||||||
|
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||||
|
remoteVersion,
|
||||||
|
document
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
resolve();
|
||||||
|
} catch (e) {
|
||||||
|
reject(e);
|
||||||
|
} finally {
|
||||||
|
this.database.removeDocumentPromise(promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
||||||
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
|
||||||
|
const [promise, resolve, reject] = createPromise();
|
||||||
|
|
||||||
|
document = await this.database.getResolvedDocumentByRelativePath(
|
||||||
|
document.relativePath,
|
||||||
|
promise
|
||||||
|
);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile`
|
await this.syncQueue.add(async () =>
|
||||||
if (document === undefined) {
|
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
||||||
await this.syncQueue.add(async () =>
|
remoteVersion,
|
||||||
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
document
|
||||||
remoteVersion
|
)
|
||||||
)
|
);
|
||||||
);
|
|
||||||
} else {
|
|
||||||
const [promise, resolve, reject] = createPromise();
|
|
||||||
|
|
||||||
document =
|
resolve();
|
||||||
await this.database.getResolvedDocumentByRelativePath(
|
} catch (e) {
|
||||||
document.relativePath,
|
reject(e);
|
||||||
promise
|
|
||||||
);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await this.syncQueue.add(async () =>
|
|
||||||
this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile(
|
|
||||||
remoteVersion,
|
|
||||||
document
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
resolve();
|
|
||||||
} catch (e) {
|
|
||||||
reject(e);
|
|
||||||
} finally {
|
|
||||||
this.database.removeDocumentPromise(promise);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
|
||||||
} finally {
|
} finally {
|
||||||
if (hasLockToRelease) {
|
this.database.removeDocumentPromise(promise);
|
||||||
this.remoteDocumentsLock.unlock(remoteVersion.documentId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.database.addSeenUpdateId(remoteVersion.vaultUpdateId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
|
private async internalScheduleSyncForOfflineChanges(): Promise<void> {
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
export enum DocumentUpdateStatus {
|
export enum DocumentSyncStatus {
|
||||||
UP_TO_DATE = "UP_TO_DATE",
|
UP_TO_DATE = "UP_TO_DATE",
|
||||||
SYNCING = "SYNCING"
|
SYNCING = "SYNCING"
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
export enum DocumentUpToDateness {
|
||||||
|
UpToDate = "UpToDate", // easiest case, the client can just show the cursors as-is
|
||||||
|
Prior = "Prior", // The cursors are outdated, so the client has to guess the cursor positions based on local updates. This is only possible if this client's cursor has once been up-to-date in a given document.
|
||||||
|
Later = "Later" // The cursors are from a future version of a document, there's no way we can accuratly show them locally.
|
||||||
|
}
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
import type { ClientCursors } from "../services/types/ClientCursors";
|
import type { ClientCursors } from "../services/types/ClientCursors";
|
||||||
|
|
||||||
export interface DocumentWithMaybeOutdatedClientCursors extends ClientCursors {
|
export interface MaybeOutdatedClientCursors extends ClientCursors {
|
||||||
isOutdated: boolean;
|
isOutdated: boolean;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,25 @@
|
||||||
|
type ResolveFunction<T> = undefined extends T
|
||||||
|
? (value?: T) => unknown
|
||||||
|
: (value: T) => unknown;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A type-safe utility function to create a Promise with resolve and reject functions.
|
* A type-safe utility function to create a Promise with resolve and reject functions.
|
||||||
* @returns A tuple containing a Promise, a resolve function, and a reject function.
|
* @returns A tuple containing a Promise, a resolve function, and a reject function.
|
||||||
*/
|
*/
|
||||||
export function createPromise<T = unknown>(): [
|
export function createPromise<T = unknown>(): [
|
||||||
Promise<T>,
|
Promise<T>,
|
||||||
(value: T) => unknown,
|
ResolveFunction<T>,
|
||||||
(error: unknown) => unknown
|
(error: unknown) => unknown
|
||||||
] {
|
] {
|
||||||
let resolve: undefined | ((resolved: T) => unknown) = undefined;
|
let resolve: undefined | ResolveFunction<T> = undefined;
|
||||||
let reject: undefined | ((error: unknown) => unknown) = undefined;
|
let reject: undefined | ((error: unknown) => unknown) = undefined;
|
||||||
|
|
||||||
const creationPromise = new Promise<T>(
|
const creationPromise = new Promise<T>(
|
||||||
(resolve_, reject_) => ((resolve = resolve_), (reject = reject_))
|
(resolve_, reject_) =>
|
||||||
|
(
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||||
|
(resolve = resolve_ as ResolveFunction<T>), (reject = reject_)
|
||||||
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,9 @@ import { Logger } from "../tracing/logger";
|
||||||
import type { RelativePath } from "../persistence/database";
|
import type { RelativePath } from "../persistence/database";
|
||||||
import { Locks } from "./locks";
|
import { Locks } from "./locks";
|
||||||
|
|
||||||
describe("Document lock", () => {
|
describe("withLock", () => {
|
||||||
const testPath: RelativePath = "test/document/path";
|
const testPath: RelativePath = "test/document/path";
|
||||||
|
const testPath2: RelativePath = "test/document/path2";
|
||||||
const logger = new Logger();
|
const logger = new Logger();
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/init-declarations
|
// eslint-disable-next-line @typescript-eslint/init-declarations
|
||||||
|
|
@ -13,77 +14,211 @@ describe("Document lock", () => {
|
||||||
locks = new Locks<RelativePath>(logger);
|
locks = new Locks<RelativePath>(logger);
|
||||||
});
|
});
|
||||||
|
|
||||||
test("should lock a document successfully", () => {
|
test("should execute function with single key lock", async () => {
|
||||||
const result = locks.tryLock(testPath);
|
let executionCount = 0;
|
||||||
expect(result).toBe(true);
|
const result = await locks.withLock(testPath, () => {
|
||||||
});
|
executionCount++;
|
||||||
|
return "success";
|
||||||
test("should not lock a document that is already locked", () => {
|
|
||||||
locks.tryLock(testPath);
|
|
||||||
const result = locks.tryLock(testPath);
|
|
||||||
expect(result).toBe(false);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should unlock a locked document", () => {
|
|
||||||
locks.tryLock(testPath);
|
|
||||||
locks.unlock(testPath);
|
|
||||||
const result = locks.tryLock(testPath);
|
|
||||||
expect(result).toBe(true);
|
|
||||||
locks.unlock(testPath);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should throw an error when unlocking a document that is not locked", () => {
|
|
||||||
expect(() => {
|
|
||||||
locks.unlock(testPath);
|
|
||||||
}).toThrow(`Key '${testPath}' is not locked, cannot unlock`);
|
|
||||||
});
|
|
||||||
|
|
||||||
test("should wait for a document lock and resolve when unlocked", async () => {
|
|
||||||
locks.tryLock(testPath);
|
|
||||||
|
|
||||||
let resolved = false;
|
|
||||||
const waitPromise = locks.waitForLock(testPath).then(() => {
|
|
||||||
resolved = true;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
locks.unlock(testPath);
|
expect(result).toBe("success");
|
||||||
await waitPromise;
|
expect(executionCount).toBe(1);
|
||||||
|
|
||||||
expect(resolved).toBe(true);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
test("should resolve multiple waiters in FIFO order", async () => {
|
test("should execute async function with single key lock", async () => {
|
||||||
locks.tryLock(testPath);
|
let executionCount = 0;
|
||||||
|
const result = await locks.withLock(testPath, async () => {
|
||||||
let firstResolved = false;
|
executionCount++;
|
||||||
let secondResolved = false;
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||||
let thirdResolved = false;
|
return "async-success";
|
||||||
|
|
||||||
const firstWaitPromise = locks.waitForLock(testPath).then(() => {
|
|
||||||
firstResolved = true;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const secondWaitPromise = locks.waitForLock(testPath).then(() => {
|
expect(result).toBe("async-success");
|
||||||
secondResolved = true;
|
expect(executionCount).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should execute function with multiple key locks", async () => {
|
||||||
|
let executionCount = 0;
|
||||||
|
const result = await locks.withLock([testPath, testPath2], () => {
|
||||||
|
executionCount++;
|
||||||
|
return "multi-success";
|
||||||
});
|
});
|
||||||
|
|
||||||
const thirdWaitPromise = locks.waitForLock(testPath).then(() => {
|
expect(result).toBe("multi-success");
|
||||||
thirdResolved = true;
|
expect(executionCount).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should sort multiple keys to prevent deadlocks", async () => {
|
||||||
|
const executionOrder: string[] = [];
|
||||||
|
|
||||||
|
// Start two concurrent operations with keys in different orders
|
||||||
|
const promise1 = locks.withLock([testPath2, testPath], async () => {
|
||||||
|
executionOrder.push("operation1-start");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||||
|
executionOrder.push("operation1-end");
|
||||||
|
return "result1";
|
||||||
});
|
});
|
||||||
|
|
||||||
locks.unlock(testPath);
|
const promise2 = locks.withLock([testPath, testPath2], async () => {
|
||||||
await firstWaitPromise;
|
executionOrder.push("operation2-start");
|
||||||
expect(firstResolved).toBe(true);
|
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||||
expect(secondResolved).toBe(false);
|
executionOrder.push("operation2-end");
|
||||||
expect(thirdResolved).toBe(false);
|
return "result2";
|
||||||
|
});
|
||||||
|
|
||||||
locks.unlock(testPath);
|
const [result1, result2] = await Promise.all([promise1, promise2]);
|
||||||
await secondWaitPromise;
|
|
||||||
expect(secondResolved).toBe(true);
|
|
||||||
expect(thirdResolved).toBe(false);
|
|
||||||
|
|
||||||
locks.unlock(testPath);
|
expect(result1).toBe("result1");
|
||||||
await thirdWaitPromise;
|
expect(result2).toBe("result2");
|
||||||
expect(thirdResolved).toBe(true);
|
// One operation should complete entirely before the other starts
|
||||||
|
expect(executionOrder).toEqual([
|
||||||
|
"operation1-start",
|
||||||
|
"operation1-end",
|
||||||
|
"operation2-start",
|
||||||
|
"operation2-end"
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should serialize access to same key", async () => {
|
||||||
|
const executionOrder: string[] = [];
|
||||||
|
|
||||||
|
const promise1 = locks.withLock(testPath, async () => {
|
||||||
|
executionOrder.push("operation1-start");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||||
|
executionOrder.push("operation1-end");
|
||||||
|
return "result1";
|
||||||
|
});
|
||||||
|
|
||||||
|
const promise2 = locks.withLock(testPath, async () => {
|
||||||
|
executionOrder.push("operation2-start");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||||
|
executionOrder.push("operation2-end");
|
||||||
|
return "result2";
|
||||||
|
});
|
||||||
|
|
||||||
|
const [result1, result2] = await Promise.all([promise1, promise2]);
|
||||||
|
|
||||||
|
expect(result1).toBe("result1");
|
||||||
|
expect(result2).toBe("result2");
|
||||||
|
expect(executionOrder).toEqual([
|
||||||
|
"operation1-start",
|
||||||
|
"operation1-end",
|
||||||
|
"operation2-start",
|
||||||
|
"operation2-end"
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should allow concurrent access to different keys", async () => {
|
||||||
|
const executionOrder: string[] = [];
|
||||||
|
|
||||||
|
const promise1 = locks.withLock(testPath, async () => {
|
||||||
|
executionOrder.push("operation1-start");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||||
|
executionOrder.push("operation1-end");
|
||||||
|
return "result1";
|
||||||
|
});
|
||||||
|
|
||||||
|
const promise2 = locks.withLock(testPath2, async () => {
|
||||||
|
executionOrder.push("operation2-start");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||||
|
executionOrder.push("operation2-end");
|
||||||
|
return "result2";
|
||||||
|
});
|
||||||
|
|
||||||
|
const [result1, result2] = await Promise.all([promise1, promise2]);
|
||||||
|
|
||||||
|
expect(result1).toBe("result1");
|
||||||
|
expect(result2).toBe("result2");
|
||||||
|
// Both operations should run concurrently
|
||||||
|
expect(executionOrder[0]).toBe("operation1-start");
|
||||||
|
expect(executionOrder[1]).toBe("operation2-start");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should release locks even if function throws", async () => {
|
||||||
|
const error = new Error("test error");
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
locks.withLock(testPath, () => {
|
||||||
|
throw error;
|
||||||
|
})
|
||||||
|
).rejects.toThrow("test error");
|
||||||
|
|
||||||
|
// Lock should be released, allowing another operation
|
||||||
|
const result = await locks.withLock(
|
||||||
|
testPath,
|
||||||
|
() => "success-after-error"
|
||||||
|
);
|
||||||
|
expect(result).toBe("success-after-error");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should release locks even if async function throws", async () => {
|
||||||
|
const error = new Error("async test error");
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
locks.withLock(testPath, async () => {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||||
|
throw error;
|
||||||
|
})
|
||||||
|
).rejects.toThrow("async test error");
|
||||||
|
|
||||||
|
// Lock should be released, allowing another operation
|
||||||
|
const result = await locks.withLock(
|
||||||
|
testPath,
|
||||||
|
() => "success-after-async-error"
|
||||||
|
);
|
||||||
|
expect(result).toBe("success-after-async-error");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should handle empty array of keys", async () => {
|
||||||
|
const result = await locks.withLock([], () => "empty-keys");
|
||||||
|
expect(result).toBe("empty-keys");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("should maintain FIFO order for multiple waiters", async () => {
|
||||||
|
const executionOrder: string[] = [];
|
||||||
|
|
||||||
|
// Start first operation that holds the lock
|
||||||
|
const firstPromise = locks.withLock(testPath, async () => {
|
||||||
|
executionOrder.push("first-start");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
executionOrder.push("first-end");
|
||||||
|
return "first";
|
||||||
|
});
|
||||||
|
|
||||||
|
// Small delay to ensure first operation starts
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||||
|
|
||||||
|
// Queue second and third operations
|
||||||
|
const secondPromise = locks.withLock(testPath, async () => {
|
||||||
|
executionOrder.push("second-start");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 30));
|
||||||
|
executionOrder.push("second-end");
|
||||||
|
return "second";
|
||||||
|
});
|
||||||
|
|
||||||
|
const thirdPromise = locks.withLock(testPath, async () => {
|
||||||
|
executionOrder.push("third-start");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||||
|
executionOrder.push("third-end");
|
||||||
|
return "third";
|
||||||
|
});
|
||||||
|
|
||||||
|
const [first, second, third] = await Promise.all([
|
||||||
|
firstPromise,
|
||||||
|
secondPromise,
|
||||||
|
thirdPromise
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(first).toBe("first");
|
||||||
|
expect(second).toBe("second");
|
||||||
|
expect(third).toBe("third");
|
||||||
|
expect(executionOrder).toEqual([
|
||||||
|
"first-start",
|
||||||
|
"first-end",
|
||||||
|
"second-start",
|
||||||
|
"second-end",
|
||||||
|
"third-start",
|
||||||
|
"third-end"
|
||||||
|
]);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,54 @@ export class Locks<T> {
|
||||||
/** Queue of resolve functions waiting for each key */
|
/** Queue of resolve functions waiting for each key */
|
||||||
private readonly waiters = new Map<T, (() => unknown)[]>();
|
private readonly waiters = new Map<T, (() => unknown)[]>();
|
||||||
|
|
||||||
public constructor(private readonly logger: Logger) {}
|
public constructor(private readonly logger?: Logger) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes a function while holding exclusive locks on one or more keys.
|
||||||
|
*
|
||||||
|
* This method ensures that the provided function runs with exclusive access to the
|
||||||
|
* specified key(s). Multiple keys are sorted to prevent deadlocks when different
|
||||||
|
* operations request the same keys in different orders.
|
||||||
|
*
|
||||||
|
* @template R The return type of the function to execute
|
||||||
|
* @param keyOrKeys A single key or array of keys to lock during function execution
|
||||||
|
* @param fn The function to execute while holding the lock(s). Can be sync or async.
|
||||||
|
* @returns A Promise that resolves to the return value of the executed function
|
||||||
|
*
|
||||||
|
* @example
|
||||||
|
* ```typescript
|
||||||
|
* // Lock a single key
|
||||||
|
* const result = await locks.withLock('file1', () => {
|
||||||
|
* // Critical section - only one operation can access 'file1' at a time
|
||||||
|
* return processFile('file1');
|
||||||
|
* });
|
||||||
|
*
|
||||||
|
* // Lock multiple keys (prevents deadlocks through consistent ordering)
|
||||||
|
* await locks.withLock(['file1', 'file2'], async () => {
|
||||||
|
* // Critical section - exclusive access to both files
|
||||||
|
* await moveFile('file1', 'file2');
|
||||||
|
* });
|
||||||
|
* ```
|
||||||
|
*
|
||||||
|
* @throws Any error thrown by the provided function will be propagated after locks are released
|
||||||
|
*/
|
||||||
|
public async withLock<R>(
|
||||||
|
keyOrKeys: T | T[],
|
||||||
|
fn: () => R | Promise<R>
|
||||||
|
): Promise<R> {
|
||||||
|
const keys = Array.isArray(keyOrKeys) ? keyOrKeys : [keyOrKeys];
|
||||||
|
keys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
|
||||||
|
|
||||||
|
await Promise.all(keys.map(async (key) => this.waitForLock(key)));
|
||||||
|
|
||||||
|
try {
|
||||||
|
return await fn();
|
||||||
|
} finally {
|
||||||
|
keys.forEach((key) => {
|
||||||
|
this.unlock(key);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Attempts to acquire a lock immediately without waiting.
|
* Attempts to acquire a lock immediately without waiting.
|
||||||
|
|
@ -22,7 +69,7 @@ export class Locks<T> {
|
||||||
* @param key The key to lock
|
* @param key The key to lock
|
||||||
* @returns `true` if lock acquired, `false` if already locked
|
* @returns `true` if lock acquired, `false` if already locked
|
||||||
*/
|
*/
|
||||||
public tryLock(key: T): boolean {
|
private tryLock(key: T): boolean {
|
||||||
if (this.locked.has(key)) {
|
if (this.locked.has(key)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
@ -39,12 +86,12 @@ export class Locks<T> {
|
||||||
* @param key The key to wait for and lock
|
* @param key The key to wait for and lock
|
||||||
* @returns Promise that resolves when lock is acquired
|
* @returns Promise that resolves when lock is acquired
|
||||||
*/
|
*/
|
||||||
public async waitForLock(key: T): Promise<void> {
|
private async waitForLock(key: T): Promise<void> {
|
||||||
if (this.tryLock(key)) {
|
if (this.tryLock(key)) {
|
||||||
return Promise.resolve();
|
return Promise.resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.debug(`Waiting for lock on ${key}`);
|
this.logger?.debug(`Waiting for lock on ${key}`);
|
||||||
|
|
||||||
return new Promise((resolve) => {
|
return new Promise((resolve) => {
|
||||||
// DefaultDict behavior
|
// DefaultDict behavior
|
||||||
|
|
@ -65,7 +112,7 @@ export class Locks<T> {
|
||||||
* @param key The key to unlock
|
* @param key The key to unlock
|
||||||
* @throws {Error} If key is not currently locked
|
* @throws {Error} If key is not currently locked
|
||||||
*/
|
*/
|
||||||
public unlock(key: T): void {
|
private unlock(key: T): void {
|
||||||
if (!this.locked.has(key)) {
|
if (!this.locked.has(key)) {
|
||||||
throw new Error(`Key '${key}' is not locked, cannot unlock`);
|
throw new Error(`Key '${key}' is not locked, cannot unlock`);
|
||||||
}
|
}
|
||||||
|
|
@ -74,19 +121,22 @@ export class Locks<T> {
|
||||||
const nextWaiting = this.waiters.get(key)?.shift();
|
const nextWaiting = this.waiters.get(key)?.shift();
|
||||||
|
|
||||||
if (nextWaiting) {
|
if (nextWaiting) {
|
||||||
this.logger.debug(`Granted lock on ${key}`);
|
this.logger?.debug(`Granted lock on ${key}`);
|
||||||
nextWaiting();
|
nextWaiting();
|
||||||
} else {
|
} else {
|
||||||
this.locked.delete(key);
|
this.locked.delete(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
export class Lock {
|
||||||
* Clears all locks and waiters. Causes waiting operations to hang indefinitely.
|
private readonly locks: Locks<boolean>;
|
||||||
* Use with caution.
|
|
||||||
*/
|
public constructor(logger?: Logger) {
|
||||||
public reset(): void {
|
this.locks = new Locks(logger);
|
||||||
this.locked.clear();
|
}
|
||||||
this.waiters.clear();
|
|
||||||
|
public async withLock<R>(fn: () => R | Promise<R>): Promise<R> {
|
||||||
|
return this.locks.withLock(true, fn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ export class MockClient implements FileSystemOperations {
|
||||||
fs: this,
|
fs: this,
|
||||||
persistence: {
|
persistence: {
|
||||||
load: async () => this.data,
|
load: async () => this.data,
|
||||||
save: async (data) => (this.data = data)
|
save: async (data) => void (this.data = data)
|
||||||
},
|
},
|
||||||
fetch: fetchImplementation,
|
fetch: fetchImplementation,
|
||||||
webSocket: webSocketImplementation
|
webSocket: webSocketImplementation
|
||||||
|
|
|
||||||
|
|
@ -25,15 +25,18 @@ export function flakyWebSocketFactory(
|
||||||
|
|
||||||
public set onmessage(callback: (event: MessageEvent) => void) {
|
public set onmessage(callback: (event: MessageEvent) => void) {
|
||||||
super.onmessage = async (event: MessageEvent): Promise<void> => {
|
super.onmessage = async (event: MessageEvent): Promise<void> => {
|
||||||
await this.locks.waitForLock(FlakyWebSocket.RECEIVE_KEY);
|
return this.locks.withLock(
|
||||||
|
FlakyWebSocket.RECEIVE_KEY,
|
||||||
|
async () => {
|
||||||
|
if (jitterScaleInSeconds > 0) {
|
||||||
|
await sleep(
|
||||||
|
Math.random() * jitterScaleInSeconds * 1000
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if (jitterScaleInSeconds > 0) {
|
callback(event);
|
||||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
}
|
||||||
}
|
);
|
||||||
|
|
||||||
callback(event);
|
|
||||||
|
|
||||||
this.locks.unlock(FlakyWebSocket.RECEIVE_KEY);
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -67,15 +70,13 @@ export function flakyWebSocketFactory(
|
||||||
data: string | ArrayBufferLike | Blob | ArrayBufferView
|
data: string | ArrayBufferLike | Blob | ArrayBufferView
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
// maintain message order
|
// maintain message order
|
||||||
await this.locks.waitForLock(FlakyWebSocket.SEND_KEY);
|
return this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => {
|
||||||
|
if (jitterScaleInSeconds > 0) {
|
||||||
|
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
if (jitterScaleInSeconds > 0) {
|
super.send(data);
|
||||||
await sleep(Math.random() * jitterScaleInSeconds * 1000);
|
});
|
||||||
}
|
|
||||||
|
|
||||||
super.send(data);
|
|
||||||
|
|
||||||
this.locks.unlock(FlakyWebSocket.SEND_KEY);
|
|
||||||
}
|
}
|
||||||
} as unknown as typeof WebSocket;
|
} as unknown as typeof WebSocket;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
27
scripts/check.sh
Executable file
27
scripts/check.sh
Executable file
|
|
@ -0,0 +1,27 @@
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
echo "Running checks in sync-server"
|
||||||
|
cd sync-server
|
||||||
|
cargo clippy --all-targets --all-features
|
||||||
|
cargo fmt --all -- --check
|
||||||
|
cargo machete
|
||||||
|
cargo test --verbose
|
||||||
|
|
||||||
|
echo "Running checks in frontend"
|
||||||
|
cd ../frontend
|
||||||
|
npm ci
|
||||||
|
npm run build
|
||||||
|
npm run lint
|
||||||
|
npm run test
|
||||||
|
|
||||||
|
if [[ $(git status --porcelain) ]]; then
|
||||||
|
git status --porcelain
|
||||||
|
echo "Failing CI because the working directory is not clean after linting"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "Success"
|
||||||
|
|
||||||
|
cd ..
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "sync_server"
|
name = "sync_server"
|
||||||
rust-version = "1.87.0"
|
rust-version = "1.89.0"
|
||||||
authors = ["Andras Schmelczer <andras@schmelczer.dev>"]
|
authors = ["Andras Schmelczer <andras@schmelczer.dev>"]
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
[toolchain]
|
[toolchain]
|
||||||
channel = "nightly-2025-06-06"
|
channel = "1.89.0"
|
||||||
targets = [ "x86_64-unknown-linux-gnu", "x86_64-unknown-linux-musl" ]
|
targets = [ "x86_64-unknown-linux-gnu", "x86_64-unknown-linux-musl" ]
|
||||||
profile = "default"
|
profile = "default"
|
||||||
|
|
|
||||||
|
|
@ -1,8 +0,0 @@
|
||||||
imports_granularity = "crate"
|
|
||||||
condense_wildcard_suffixes = true
|
|
||||||
fn_single_line = true
|
|
||||||
format_strings = true
|
|
||||||
reorder_impl_items = true
|
|
||||||
group_imports = "StdExternalCrate"
|
|
||||||
use_field_init_shorthand = true
|
|
||||||
wrap_comments=true
|
|
||||||
|
|
@ -47,7 +47,7 @@ impl Cursors {
|
||||||
all_device_cursors.retain(|c| &c.client_cursors.device_id != device_id);
|
all_device_cursors.retain(|c| &c.client_cursors.device_id != device_id);
|
||||||
all_device_cursors.push(ClientCursorsWithTimeToLive::new(ClientCursors {
|
all_device_cursors.push(ClientCursorsWithTimeToLive::new(ClientCursors {
|
||||||
user_name,
|
user_name,
|
||||||
device_id: device_id.to_string(),
|
device_id: device_id.clone(),
|
||||||
documents_with_cursors: document_to_cursors,
|
documents_with_cursors: document_to_cursors,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
|
@ -126,5 +126,7 @@ impl ClientCursorsWithTimeToLive {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_expired(&self, ttl: Duration) -> bool { self.last_updated.elapsed() > ttl }
|
pub fn is_expired(&self, ttl: Duration) -> bool {
|
||||||
|
self.last_updated.elapsed() > ttl
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,9 @@ pub struct StoredDocumentVersion {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PartialEq<Self> for StoredDocumentVersion {
|
impl PartialEq<Self> for StoredDocumentVersion {
|
||||||
fn eq(&self, other: &Self) -> bool { self.vault_update_id == other.vault_update_id }
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.vault_update_id == other.vault_update_id
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(TS, Debug, Clone, Serialize)]
|
#[derive(TS, Debug, Clone, Serialize)]
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,13 @@ pub struct CursorPositionFromClient {
|
||||||
|
|
||||||
#[derive(TS, Serialize, Deserialize, Clone, Debug)]
|
#[derive(TS, Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct DocumentWithCursors {
|
pub struct DocumentWithCursors {
|
||||||
#[ts(as = "u32")]
|
// It's None in case the document is dirty.
|
||||||
pub vault_update_id: VaultUpdateId,
|
// We still want to sync the cursor to mark
|
||||||
|
// that it exists and can be client-side
|
||||||
|
// interpolated. However, the actual
|
||||||
|
// position is meaningless.
|
||||||
|
#[ts(as = "Option<u32>")]
|
||||||
|
pub vault_update_id: Option<VaultUpdateId>,
|
||||||
|
|
||||||
pub document_id: DocumentId,
|
pub document_id: DocumentId,
|
||||||
pub relative_path: String,
|
pub relative_path: String,
|
||||||
|
|
|
||||||
|
|
@ -179,6 +179,10 @@ async fn shutdown_signal() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_404() -> impl IntoResponse { not_found_error(anyhow!("Page not found")) }
|
async fn handle_404() -> impl IntoResponse {
|
||||||
|
not_found_error(anyhow!("Page not found"))
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_405() -> impl IntoResponse { client_error(anyhow!("Method not allowed")) }
|
async fn handle_405() -> impl IntoResponse {
|
||||||
|
client_error(anyhow!("Method not allowed"))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,9 @@ pub struct DeviceIdHeader(pub String);
|
||||||
pub static DEVICE_ID_HEADER_NAME: HeaderName = HeaderName::from_static("device-id");
|
pub static DEVICE_ID_HEADER_NAME: HeaderName = HeaderName::from_static("device-id");
|
||||||
|
|
||||||
impl Header for DeviceIdHeader {
|
impl Header for DeviceIdHeader {
|
||||||
fn name() -> &'static HeaderName { &DEVICE_ID_HEADER_NAME }
|
fn name() -> &'static HeaderName {
|
||||||
|
&DEVICE_ID_HEADER_NAME
|
||||||
|
}
|
||||||
|
|
||||||
fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
|
fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
|
||||||
where
|
where
|
||||||
|
|
@ -26,7 +28,7 @@ impl Header for DeviceIdHeader {
|
||||||
where
|
where
|
||||||
E: Extend<HeaderValue>,
|
E: Extend<HeaderValue>,
|
||||||
{
|
{
|
||||||
let value = HeaderValue::from_static(Box::leak(self.0.to_string().into_boxed_str()));
|
let value = HeaderValue::from_static(Box::leak(self.0.clone().into_boxed_str()));
|
||||||
|
|
||||||
values.extend(std::iter::once(value));
|
values.extend(std::iter::once(value));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,4 +8,6 @@ where
|
||||||
Ok(normalize_string(&s))
|
Ok(normalize_string(&s))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn normalize_string(s: &str) -> String { s.trim().to_lowercase() }
|
pub fn normalize_string(s: &str) -> String {
|
||||||
|
s.trim().to_lowercase()
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue