diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index cb54ca89..0f0d18e1 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -30,32 +30,5 @@ jobs: sqlx database create --database-url sqlite://db.sqlite3 sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3 - - name: Lint sync-server - run: | - cd sync-server - cargo clippy --all-targets --all-features - cargo fmt --all -- --check - cargo machete - - - name: Test sync-server - run: | - cd sync-server - cargo test --verbose - - - name: Lint frontend - run: | - cd frontend - npm ci - npm run build - npm run lint - if [[ $(git status --porcelain) ]]; then - git status --porcelain - echo "Failing CI because the working directory is not clean after linting" - exit 1 - fi - - - name: Test frontend - run: | - cd frontend - npm ci - npm run test + - name: Lint & test + run: scripts/check.sh diff --git a/frontend/obsidian-plugin/package.json b/frontend/obsidian-plugin/package.json index 24a95e03..0d004408 100644 --- a/frontend/obsidian-plugin/package.json +++ b/frontend/obsidian-plugin/package.json @@ -34,6 +34,7 @@ "url": "^0.11.4", "virtual-scroller": "^1.13.1", "webpack": "^5.99.9", - "webpack-cli": "^6.0.1" + "webpack-cli": "^6.0.1", + "reconcile-text": "^0.5.0" } } diff --git a/frontend/obsidian-plugin/src/debugging/flaky-websocket-factory.ts b/frontend/obsidian-plugin/src/debugging/flaky-websocket-factory.ts index 56310aa6..f59cce19 100644 --- a/frontend/obsidian-plugin/src/debugging/flaky-websocket-factory.ts +++ b/frontend/obsidian-plugin/src/debugging/flaky-websocket-factory.ts @@ -24,15 +24,18 @@ export function flakyWebSocketFactory( public set onmessage(callback: (event: MessageEvent) => void) { super.onmessage = async (event: MessageEvent): Promise => { - await this.locks.waitForLock(FlakyWebSocket.RECEIVE_KEY); + await this.locks.withLock( + FlakyWebSocket.RECEIVE_KEY, + async () => { + if (jitterScaleInSeconds > 0) { + await sleep( + Math.random() * jitterScaleInSeconds * 1000 + ); + } - if (jitterScaleInSeconds > 0) { - await sleep(Math.random() * jitterScaleInSeconds * 1000); - } - - callback(event); - - this.locks.unlock(FlakyWebSocket.RECEIVE_KEY); + callback(event); + } + ); }; } @@ -66,15 +69,12 @@ export function flakyWebSocketFactory( data: string | ArrayBufferLike | Blob | ArrayBufferView ): Promise { // maintain message order - await this.locks.waitForLock(FlakyWebSocket.SEND_KEY); - - if (jitterScaleInSeconds > 0) { - await sleep(Math.random() * jitterScaleInSeconds * 1000); - } - - super.send(data); - - this.locks.unlock(FlakyWebSocket.SEND_KEY); + await this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => { + if (jitterScaleInSeconds > 0) { + await sleep(Math.random() * jitterScaleInSeconds * 1000); + } + super.send(data); + }); } } as unknown as typeof WebSocket; } diff --git a/frontend/obsidian-plugin/src/vault-link-plugin.ts b/frontend/obsidian-plugin/src/vault-link-plugin.ts index 0d4c1a1d..7e0eff1b 100644 --- a/frontend/obsidian-plugin/src/vault-link-plugin.ts +++ b/frontend/obsidian-plugin/src/vault-link-plugin.ts @@ -19,7 +19,7 @@ import { updateEditorStatusDisplay } from "./views/editor-sync-line/editor-sync- import { remoteCursorsTheme } from "./views/cursors/remote-cursor-theme"; import { remoteCursorsPlugin, - setCursors + RemoteCursorsPluginValue } from "./views/cursors/remote-cursors-plugin"; import { LocalCursorUpdateListener } from "./views/cursors/local-cursor-update-listener"; import { slowFetchFactory } from "./debugging/slow-fetch-factory"; @@ -93,7 +93,7 @@ export default class VaultLinkPlugin extends Plugin { this.registerEditorExtension([remoteCursorsTheme, remoteCursorsPlugin]); this.client.addRemoteCursorsUpdateListener((cursors) => { - setCursors(cursors, this.app); + RemoteCursorsPluginValue.setCursors(cursors, this.app); }); const cursorListener = new LocalCursorUpdateListener( diff --git a/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts b/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts index 661aa452..5dff2c59 100644 --- a/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts +++ b/frontend/obsidian-plugin/src/views/cursors/remote-cursors-plugin.ts @@ -9,104 +9,201 @@ import type { ViewUpdate } from "@codemirror/view"; import { RemoteCursorWidget } from "./remote-cursor-widget"; -import type { - CursorSpan, - DocumentWithMaybeOutdatedClientCursors -} from "sync-client"; +import type { CursorSpan, MaybeOutdatedClientCursors } from "sync-client"; import type { App } from "obsidian"; import { MarkdownView } from "obsidian"; -let cursors: { - name: string; - path: string; - span: CursorSpan; - deviceId: string; -}[] = []; - import { StateEffect } from "@codemirror/state"; import { getRandomColor } from "src/utils/get-random-color"; -import { updateSelection } from "./update-selection"; +import type { SpanWithHistory } from "reconcile-text"; +import { reconcileWithHistory } from "reconcile-text"; + +function findWhereToMoveCursor( + cursor: number, + spans: SpanWithHistory[] +): number | null { + let position = 0; + for (const span of spans) { + // left and origin are the same + if (position === cursor && span.history === "AddedFromRight") { + return position + span.text.length; + } + position += span.text.length; + if (position === cursor && span.history === "RemovedFromRight") { + return position - span.text.length; + } + } + + return null; +} const forceUpdate = StateEffect.define(); export class RemoteCursorsPluginValue implements PluginValue { + private static cursors: { + name: string; + path: string; + span: CursorSpan; + deviceId: string; + isOutdated: boolean; + }[] = []; + public decorations: DecorationSet = RangeSet.of([]); - public update(update: ViewUpdate): void { - update.changes.iterChanges((fromA, toA, fromB, toB, _inserted) => { - const spans = cursors.map((cursor) => cursor.span); - updateSelection({ - fromA, - toA, - fromB, - toB, - spans + public static setCursors( + clients: MaybeOutdatedClientCursors[], + app: App + ): void { + RemoteCursorsPluginValue.cursors = [ + ...RemoteCursorsPluginValue.cursors.filter(({ deviceId }) => + clients.some( + (client) => + client.deviceId === deviceId && client.isOutdated + ) + ), + ...clients + .filter( + ({ isOutdated, deviceId }) => + !isOutdated || + RemoteCursorsPluginValue.cursors.every( + (c) => deviceId !== c.deviceId + ) + ) + .flatMap((client) => { + const clientCursors = client.documentsWithCursors; + return clientCursors.flatMap((cursor) => + cursor.cursors.map((span) => ({ + name: client.userName, + path: cursor.relative_path, + deviceId: client.deviceId, + isOutdated: client.isOutdated, + span: { ...span } + })) + ); + }) + ]; + + app.workspace + .getLeavesOfType("markdown") + .map((leaf) => leaf.view) + .filter((view) => view instanceof MarkdownView) + .forEach((view) => { + // @ts-expect-error, not typed + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + const editor = view.editor.cm as EditorView; + + editor.dispatch({ + effects: [forceUpdate.of(null)] + }); }); + } + + public update(update: ViewUpdate): void { + const original = update.startState.doc.toString(); + const edited = update.state.doc.toString(); + + const updatedPositions: number[] = []; + const reconciled = reconcileWithHistory( + original, + { + text: original, + cursors: RemoteCursorsPluginValue.cursors.flatMap( + ({ span }, i) => [ + { id: i * 2, position: span.start }, + { id: i * 2 + 1, position: span.end } + ] + ) + }, + edited, + "Character" + ); + + reconciled.cursors.forEach(({ id, position }) => { + const whereToJump = findWhereToMoveCursor( + position, + reconciled.history + ); + if (whereToJump !== null) { + updatedPositions[id] = whereToJump; + } else { + updatedPositions[id] = position; + } + }); + + RemoteCursorsPluginValue.cursors.forEach(({ span }, i) => { + span.start = updatedPositions[i * 2]; + span.end = updatedPositions[i * 2 + 1]; }); const decorations: Range[] = []; - cursors.forEach(({ name, span: { start, end } }) => { - const color = getRandomColor(name); - const startLine = update.view.state.doc.lineAt(start); - const endLine = update.view.state.doc.lineAt(end); + RemoteCursorsPluginValue.cursors.forEach( + ({ name, span: { start, end } }) => { + const color = getRandomColor(name); + const startLine = update.view.state.doc.lineAt(start); + const endLine = update.view.state.doc.lineAt(end); - const attributes = { - style: `background-color: ${color};` - }; + const attributes = { + style: `background-color: ${color};` + }; - if (startLine.number === endLine.number) { - // selected content in a single line. - decorations.push({ - from: start, - to: end, - value: Decoration.mark({ - attributes - }) - }); - } else { - // selected content in multiple lines - // first, render text-selection in the first line - decorations.push({ - from: start, - to: startLine.from + startLine.length, - value: Decoration.mark({ - attributes - }) - }); - - // render text-selection in the lines between the first and last line - for (let i = startLine.number + 1; i < endLine.number; i++) { - const currentLine = update.view.state.doc.line(i); + if (startLine.number === endLine.number) { + // selected content in a single line. decorations.push({ - from: currentLine.from, - to: currentLine.to, + from: start, + to: end, + value: Decoration.mark({ + attributes + }) + }); + } else { + // selected content in multiple lines + // first, render text-selection in the first line + decorations.push({ + from: start, + to: startLine.from + startLine.length, + value: Decoration.mark({ + attributes + }) + }); + + // render text-selection in the lines between the first and last line + for ( + let i = startLine.number + 1; + i < endLine.number; + i++ + ) { + const currentLine = update.view.state.doc.line(i); + decorations.push({ + from: currentLine.from, + to: currentLine.to, + value: Decoration.mark({ + attributes + }) + }); + } + + // render text-selection in the last line + decorations.push({ + from: endLine.from, + to: end, value: Decoration.mark({ attributes }) }); } - // render text-selection in the last line decorations.push({ - from: endLine.from, + from: end, to: end, - value: Decoration.mark({ - attributes + value: Decoration.widget({ + side: end - start > 0 ? -1 : 1, // the local cursor should be rendered outside the remote selection + block: false, + widget: new RemoteCursorWidget(color, name) }) }); } - - decorations.push({ - from: end, - to: end, - value: Decoration.widget({ - side: end - start > 0 ? -1 : 1, // the local cursor should be rendered outside the remote selection - block: false, - widget: new RemoteCursorWidget(color, name) - }) - }); - }); + ); this.decorations = Decoration.set(decorations, true); } @@ -118,43 +215,3 @@ export const remoteCursorsPlugin = ViewPlugin.fromClass( decorations: (v) => v.decorations } ); - -export function setCursors( - clients: DocumentWithMaybeOutdatedClientCursors[], - app: App -): void { - cursors = [ - ...cursors.filter(({ deviceId }) => - clients.some( - (client) => client.deviceId === deviceId && client.isOutdated - ) - ), - ...clients - .filter(({ isOutdated }) => !isOutdated) - .flatMap((client) => { - const clientCursors = client.documentsWithCursors; - return clientCursors.flatMap((cursor) => - cursor.cursors.map((span) => ({ - name: client.userName, - path: cursor.relative_path, - deviceId: client.deviceId, - span - })) - ); - }) - ]; - - app.workspace - .getLeavesOfType("markdown") - .map((leaf) => leaf.view) - .filter((view) => view instanceof MarkdownView) - .forEach((view) => { - // @ts-expect-error, not typed - // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion - const editor = view.editor.cm as EditorView; - - editor.dispatch({ - effects: [forceUpdate.of(null)] - }); - }); -} diff --git a/frontend/obsidian-plugin/src/views/cursors/update-selection.test.ts b/frontend/obsidian-plugin/src/views/cursors/update-selection.test.ts deleted file mode 100644 index 991ff76b..00000000 --- a/frontend/obsidian-plugin/src/views/cursors/update-selection.test.ts +++ /dev/null @@ -1,111 +0,0 @@ -import { updateSelection } from "./update-selection"; - -describe("Selection update", () => { - it("should handle span fully before - insert", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 0, - toA: 0, - fromB: 0, - toB: 2, - spans - }); - expect(spans).toEqual([{ start: 5, end: 7 }]); - }); - - it("should handle span fully before - delete", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 0, - toA: 2, - fromB: 0, - toB: 0, - spans - }); - expect(spans).toEqual([{ start: 1, end: 3 }]); - }); - - it("should handle span fully after - insert", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 6, - toA: 6, - fromB: 6, - toB: 10, - spans - }); - expect(spans).toEqual([{ start: 3, end: 5 }]); - }); - - it("should handle span fully after - delete", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 6, - toA: 10, - fromB: 6, - toB: 6, - spans - }); - expect(spans).toEqual([{ start: 3, end: 5 }]); - }); - - it("should handle span fully within - insert", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 4, - toA: 4, - fromB: 4, - toB: 6, - spans - }); - expect(spans).toEqual([{ start: 3, end: 7 }]); - }); - - it("should handle span fully within - delete", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 4, - toA: 5, - fromB: 4, - toB: 4, - spans - }); - expect(spans).toEqual([{ start: 3, end: 4 }]); - }); - - it("should handle span overlapping with start", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 2, - toA: 4, - fromB: 2, - toB: 2, - spans - }); - expect(spans).toEqual([{ start: 2, end: 4 }]); - }); - - it("should handle span overlapping with end", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 4, - toA: 6, - fromB: 4, - toB: 4, - spans - }); - expect(spans).toEqual([{ start: 3, end: 4 }]); - }); - - it("delete entire selection", () => { - const spans = [{ start: 3, end: 5 }]; - updateSelection({ - fromA: 0, - toA: 10, - fromB: 0, - toB: 0, - spans - }); - expect(spans).toEqual([{ start: 0, end: 0 }]); - }); -}); diff --git a/frontend/obsidian-plugin/src/views/cursors/update-selection.ts b/frontend/obsidian-plugin/src/views/cursors/update-selection.ts deleted file mode 100644 index 9ff7c207..00000000 --- a/frontend/obsidian-plugin/src/views/cursors/update-selection.ts +++ /dev/null @@ -1,40 +0,0 @@ -import type { CursorSpan } from "sync-client"; - -export const updateSelection = ({ - fromA, - toA, - toB, - spans -}: { - fromA: number; - toA: number; - fromB: number; - toB: number; - spans: CursorSpan[]; -}): void => { - spans.forEach((span) => { - if (fromA <= span.start) { - // the change covers the entirety of the selection - if (toA > span.end) { - span.start = toB; - span.end = toB; - return; - } - - let change = toB - toA; - if (change < 0) { - // it's a deletion - // if overlaps with the start, we can't move it back more than the deleted range - change = Math.max(change, fromA - span.start); - } - - span.start += change; - span.end += change; - } else if (toA <= span.end) { - span.end += toB - toA; - } else if (toB <= span.end) { - // a deletion overlaps with the end, so we move the end - span.end = toB; - } - }); -}; diff --git a/frontend/obsidian-plugin/src/views/editor-sync-line/editor-sync-line.ts b/frontend/obsidian-plugin/src/views/editor-sync-line/editor-sync-line.ts index 67750687..78ef1bd8 100644 --- a/frontend/obsidian-plugin/src/views/editor-sync-line/editor-sync-line.ts +++ b/frontend/obsidian-plugin/src/views/editor-sync-line/editor-sync-line.ts @@ -1,7 +1,7 @@ import type { Workspace } from "obsidian"; import { FileView, setIcon } from "obsidian"; import type { SyncClient } from "sync-client"; -import { DocumentUpdateStatus } from "sync-client"; +import { DocumentSyncStatus } from "sync-client"; import "./editor-sync-line.scss"; export function updateEditorStatusDisplay( @@ -35,7 +35,7 @@ export function updateEditorStatusDisplay( const isLoading = client.getDocumentSyncingStatus(filePath) == - DocumentUpdateStatus.SYNCING; + DocumentSyncStatus.SYNCING; if (isLoading) { element.classList.add("loading"); diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 36221400..f73ac157 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -6373,6 +6373,7 @@ "jest": "^29.7.0", "mini-css-extract-plugin": "^2.9.2", "obsidian": "1.8.7", + "reconcile-text": "^0.5.0", "resolve-url-loader": "^5.0.0", "sass": "^1.89.1", "sass-loader": "^16.0.5", diff --git a/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts b/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts index 214f9f6e..2b1f908a 100644 --- a/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts +++ b/frontend/sync-client/src/file-operations/safe-filesystem-operations.ts @@ -31,16 +31,17 @@ export class SafeFileSystemOperations implements FileSystemOperations { this.logger.debug(`Reading file '${path}'`); return this.safeOperation( path, - this.decorateToHoldLock(path, async () => this.fs.read(path)), + async () => + this.locks.withLock(path, async () => this.fs.read(path)), "read" ); } public async write(path: RelativePath, content: Uint8Array): Promise { this.logger.debug(`Writing to file '${path}'`); - return this.decorateToHoldLock(path, async () => + return this.locks.withLock(path, async () => this.fs.write(path, content) - )(); + ); } public async atomicUpdateText( @@ -50,9 +51,10 @@ export class SafeFileSystemOperations implements FileSystemOperations { this.logger.debug(`Atomically updating file '${path}'`); return this.safeOperation( path, - this.decorateToHoldLock(path, async () => - this.fs.atomicUpdateText(path, updater) - ), + async () => + this.locks.withLock(path, async () => + this.fs.atomicUpdateText(path, updater) + ), "atomicUpdateText" ); } @@ -61,32 +63,29 @@ export class SafeFileSystemOperations implements FileSystemOperations { // Logging this would be too noisy return this.safeOperation( path, - this.decorateToHoldLock(path, async () => - this.fs.getFileSize(path) - ), + async () => + this.locks.withLock(path, async () => + this.fs.getFileSize(path) + ), "getFileSize" ); } public async exists(path: RelativePath): Promise { this.logger.debug(`Checking if file '${path}' exists`); - return this.decorateToHoldLock(path, async () => - this.fs.exists(path) - )(); + return this.locks.withLock(path, async () => this.fs.exists(path)); } public async createDirectory(path: RelativePath): Promise { this.logger.debug(`Creating directory '${path}'`); - return this.decorateToHoldLock(path, async () => + return this.locks.withLock(path, async () => this.fs.createDirectory(path) - )(); + ); } public async delete(path: RelativePath): Promise { this.logger.debug(`Deleting file '${path}'`); - return this.decorateToHoldLock(path, async () => - this.fs.delete(path) - )(); + return this.locks.withLock(path, async () => this.fs.delete(path)); } public async rename( @@ -96,43 +95,14 @@ export class SafeFileSystemOperations implements FileSystemOperations { this.logger.debug(`Renaming file '${oldPath}' to '${newPath}'`); return this.safeOperation( oldPath, - this.decorateToHoldLock([oldPath, newPath], async () => - this.fs.rename(oldPath, newPath) - ), + async () => + this.locks.withLock([oldPath, newPath], async () => + this.fs.rename(oldPath, newPath) + ), "rename" ); } - /** - * Decorate an operation to ensure that the file is locked before running it - * and that the lock is released afterwards. This results in at-most one - * concurrent operation running per file. - */ - private decorateToHoldLock( - pathOrPaths: RelativePath | RelativePath[], - operation: () => Promise - ): () => Promise { - return async () => { - const paths = Array.isArray(pathOrPaths) - ? pathOrPaths - : [pathOrPaths]; - - await Promise.all( - paths.map(async (path) => this.locks.waitForLock(path)) - ); - - try { - return await operation(); - } finally { - await Promise.all( - paths.map((path) => { - this.locks.unlock(path); - }) - ); - } - }; - } - /** * Decorate an operation to ensure that the file exists before running it. * If the operation fails, it will check if the file still exists and throw diff --git a/frontend/sync-client/src/index.ts b/frontend/sync-client/src/index.ts index ce903f25..00b19940 100644 --- a/frontend/sync-client/src/index.ts +++ b/frontend/sync-client/src/index.ts @@ -18,8 +18,8 @@ export type { PersistenceProvider } from "./persistence/persistence"; export type { CursorSpan } from "./services/types/CursorSpan"; export type { ClientCursors } from "./services/types/ClientCursors"; export type { NetworkConnectionStatus } from "./types/network-connection-status"; -export type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; -export { DocumentUpdateStatus } from "./types/document-update-status"; +export type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; +export { DocumentSyncStatus } from "./types/document-sync-status"; export { SyncClient } from "./sync-client"; import { Locks } from "./utils/locks"; diff --git a/frontend/sync-client/src/persistence/database.ts b/frontend/sync-client/src/persistence/database.ts index 0abefd4f..9425c629 100644 --- a/frontend/sync-client/src/persistence/database.ts +++ b/frontend/sync-client/src/persistence/database.ts @@ -37,7 +37,7 @@ export interface DocumentRecord { documentId: DocumentId; metadata: DocumentMetadata | undefined; isDeleted: boolean; - updates: Promise[]; + updates: Promise[]; parallelVersion: number; } @@ -135,7 +135,7 @@ export class Database { this.save(); } - public removeDocumentPromise(promise: Promise): void { + public removeDocumentPromise(promise: Promise): void { const entry = this.documents.find(({ updates }) => updates.includes(promise) ); @@ -167,7 +167,7 @@ export class Database { public async getResolvedDocumentByRelativePath( relativePath: RelativePath, - promise: Promise + promise: Promise ): Promise { const entry = this.getLatestDocumentByRelativePath(relativePath); @@ -191,7 +191,7 @@ export class Database { public createNewPendingDocument( documentId: DocumentId, relativePath: RelativePath, - promise: Promise + promise: Promise ): DocumentRecord { const previousEntry = this.getLatestDocumentByRelativePath(relativePath); diff --git a/frontend/sync-client/src/services/types/DocumentWithCursors.ts b/frontend/sync-client/src/services/types/DocumentWithCursors.ts index cbe56399..dae654c7 100644 --- a/frontend/sync-client/src/services/types/DocumentWithCursors.ts +++ b/frontend/sync-client/src/services/types/DocumentWithCursors.ts @@ -2,7 +2,7 @@ import type { CursorSpan } from "./CursorSpan"; export interface DocumentWithCursors { - vault_update_id: number; + vault_update_id: number | null; document_id: string; relative_path: string; cursors: CursorSpan[]; diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index 3a5b32b4..dde8f068 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -152,7 +152,7 @@ export class WebSocketManager { } // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition } else if (message.type === "cursorPositions") { - this.logger.info( + this.logger.debug( `Received cursor positions for ${JSON.stringify(message.clients)}` ); this.remoteCursorsUpdateListeners.forEach((listener) => { diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index c7f0ea1b..ddab8860 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -14,26 +14,16 @@ import { ConnectionStatus } from "./services/connection-status"; import { UnrestrictedSyncer } from "./sync-operations/unrestricted-syncer"; import { rateLimit } from "./utils/rate-limit"; import type { NetworkConnectionStatus } from "./types/network-connection-status"; -import { DocumentUpdateStatus } from "./types/document-update-status"; +import { DocumentSyncStatus } from "./types/document-sync-status"; import { WebSocketManager } from "./services/websocket-manager"; import { createClientId } from "./utils/create-client-id"; +import { CursorTracker } from "./sync-operations/cursor-tracker"; import type { CursorSpan } from "./services/types/CursorSpan"; -import type { ClientCursors } from "./services/types/ClientCursors"; -import type { DocumentWithCursors } from "./services/types/DocumentWithCursors"; -import { hash } from "./utils/hash"; -import type { DocumentWithMaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; - -enum DocumentUpToDateness { - UpToDate = "UpToDate", - Prior = "Prior", - Later = "Later" -} +import type { MaybeOutdatedClientCursors } from "./types/maybe-outdated-client-cursors"; +import { FileChangeNotifier } from "./sync-operations/file-change-notifier"; export class SyncClient { private static readonly MINIMUM_SAVE_INTERVAL_MS = 1000; - private lastCursorState: DocumentWithCursors[] = []; - - private readonly knownClientCursors: ClientCursors[] = []; // eslint-disable-next-line @typescript-eslint/max-params private constructor( @@ -45,7 +35,8 @@ export class SyncClient { private readonly webSocketManager: WebSocketManager, private readonly _logger: Logger, private readonly connectionStatus: ConnectionStatus, - private readonly fileOperations: FileOperations + private readonly cursorTracker: CursorTracker, + private readonly fileChangeNotifier: FileChangeNotifier ) { this.settings.addOnSettingsChangeListener( async (newSettings, oldSettings) => { @@ -54,10 +45,6 @@ export class SyncClient { } } ); - - this.webSocketManager.addRemoteCursorsUpdateListener((cursors) => { - this.knownClientCursors.push(...cursors); - }); } public get logger(): Logger { @@ -148,7 +135,6 @@ export class SyncClient { ); const syncer = new Syncer( - deviceId, logger, database, settings, @@ -166,6 +152,13 @@ export class SyncClient { webSocket ); + const fileChangeNotifier = new FileChangeNotifier(); + const cursorTracker = new CursorTracker( + database, + webSocketManager, + fileOperations, + fileChangeNotifier + ); const client = new SyncClient( history, settings, @@ -175,7 +168,8 @@ export class SyncClient { webSocketManager, logger, connectionStatus, - fileOperations + cursorTracker, + fileChangeNotifier ); logger.info("SyncClient initialised"); @@ -264,12 +258,14 @@ export class SyncClient { public async syncLocallyCreatedFile( relativePath: RelativePath ): Promise { + this.fileChangeNotifier.notifyOfFileChange(relativePath); return this.syncer.syncLocallyCreatedFile(relativePath); } public async syncLocallyDeletedFile( relativePath: RelativePath ): Promise { + this.fileChangeNotifier.notifyOfFileChange(relativePath); return this.syncer.syncLocallyDeletedFile(relativePath); } @@ -280,6 +276,7 @@ export class SyncClient { oldPath?: RelativePath; relativePath: RelativePath; }): Promise { + this.fileChangeNotifier.notifyOfFileChange(relativePath); return this.syncer.syncLocallyUpdatedFile({ oldPath, relativePath @@ -288,154 +285,26 @@ export class SyncClient { public getDocumentSyncingStatus( relativePath: RelativePath - ): DocumentUpdateStatus { + ): DocumentSyncStatus { const document = this.database.getLatestDocumentByRelativePath(relativePath); if (document === undefined) { - return DocumentUpdateStatus.SYNCING; + return DocumentSyncStatus.SYNCING; } return document.updates.length > 0 - ? DocumentUpdateStatus.SYNCING - : DocumentUpdateStatus.UP_TO_DATE; + ? DocumentSyncStatus.SYNCING + : DocumentSyncStatus.UP_TO_DATE; } - /// Update the local cursors for the given documents. - /// Can be called frequently as it only emits an event - // if the state has actually changed. public async updateLocalCursors( documentToCursors: Record ): Promise { - const documentsWithCursors: DocumentWithCursors[] = []; - - for (const [relativePath, cursors] of Object.entries( - documentToCursors - )) { - const record = - this.database.getLatestDocumentByRelativePath(relativePath); - - if (!record) { - continue; // Let's wait for the file to be created before sending cursors - } - - const readContent = await this.fileOperations.read(relativePath); - - if (record.metadata?.hash !== hash(readContent)) { - continue; // Wouldn't make sense to sync the positions in a dirty file - } - - documentsWithCursors.push({ - relative_path: relativePath, - document_id: record.documentId, - vault_update_id: record.metadata.parentVersionId, - cursors - }); - } - - if ( - JSON.stringify(this.lastCursorState) === - JSON.stringify(documentsWithCursors) - ) { - return; - } - - this.lastCursorState = documentsWithCursors; - - this.webSocketManager.updateLocalCursors({ documentsWithCursors }); + await this.cursorTracker.sendLocalCursorsToServer(documentToCursors); } public addRemoteCursorsUpdateListener( - listener: (cursors: DocumentWithMaybeOutdatedClientCursors[]) => unknown + listener: (cursors: MaybeOutdatedClientCursors[]) => unknown ): void { - this.webSocketManager.addRemoteCursorsUpdateListener(async () => { - listener(await this.getRelevantClientCursors()); - }); - } - - private async getRelevantClientCursors(): Promise< - DocumentWithMaybeOutdatedClientCursors[] - > { - const result: DocumentWithMaybeOutdatedClientCursors[] = []; - const included = new Set(); - for (const clientCursors of [...this.knownClientCursors].reverse()) { - if (included.has(clientCursors.deviceId)) { - continue; - } - - const upToDateness = - await this.getDocumentsUpToDateness(clientCursors); - if (upToDateness == DocumentUpToDateness.Later) { - continue; - } - - result.push({ - ...clientCursors, - isOutdated: upToDateness == DocumentUpToDateness.Prior - }); - - included.add(clientCursors.deviceId); - } - - return result; - } - - private async getDocumentsUpToDateness( - clientCursor: ClientCursors - ): Promise { - const results = []; - for (const document of clientCursor.documentsWithCursors) { - results.push(await this.getDocumentUpToDateness(document)); - } - - if ( - results.every((result) => result === DocumentUpToDateness.UpToDate) - ) { - return DocumentUpToDateness.UpToDate; - } - - if ( - results.every( - (result) => - result === DocumentUpToDateness.UpToDate || - result === DocumentUpToDateness.Prior - ) - ) { - return DocumentUpToDateness.Prior; - } - - return DocumentUpToDateness.Later; - } - - private async getDocumentUpToDateness( - document: DocumentWithCursors - ): Promise { - const record = this.database.getLatestDocumentByRelativePath( - document.relative_path - ); - - if (!record) { - // the document of the cursor must be from the future - return DocumentUpToDateness.Later; - } - - if ( - (record.metadata?.parentVersionId ?? 0) < document.vault_update_id - ) { - return DocumentUpToDateness.Later; - } else if ( - document.vault_update_id < (record.metadata?.parentVersionId ?? 0) - ) { - // the document of the cursor must be from the past - return DocumentUpToDateness.Prior; - } - - const currentContent = await this.fileOperations.read( - document.relative_path - ); - - return this.database.getLatestDocumentByRelativePath( - document.relative_path - )?.metadata?.hash === hash(currentContent) - ? DocumentUpToDateness.UpToDate - : DocumentUpToDateness.Prior; + this.cursorTracker.addRemoteCursorsUpdateListener(listener); } } diff --git a/frontend/sync-client/src/sync-operations/cursor-tracker.ts b/frontend/sync-client/src/sync-operations/cursor-tracker.ts new file mode 100644 index 00000000..17f166c4 --- /dev/null +++ b/frontend/sync-client/src/sync-operations/cursor-tracker.ts @@ -0,0 +1,253 @@ +import type { FileOperations } from "../file-operations/file-operations"; +import type { Database, RelativePath } from "../persistence/database"; +import type { ClientCursors } from "../services/types/ClientCursors"; +import type { CursorSpan } from "../services/types/CursorSpan"; +import type { DocumentWithCursors } from "../services/types/DocumentWithCursors"; +import type { WebSocketManager } from "../services/websocket-manager"; +import type { MaybeOutdatedClientCursors } from "../types/maybe-outdated-client-cursors"; +import { DocumentUpToDateness } from "../types/document-up-to-dateness"; +import { hash } from "../utils/hash"; +import type { FileChangeNotifier } from "./file-change-notifier"; +import { Lock } from "../utils/locks"; + +// Cursor positions are updated separately from documents. However, a given cursor position is only +// valid within a certain version of the document it belongs to. This class tracks previous and the latest +// known remote cursor positions, and for each document, tries to return the latest cursor positions that are +// not from the future. +export class CursorTracker { + private readonly updateLock = new Lock(); + + private knownRemoteCursors: (ClientCursors & { + upToDateness: DocumentUpToDateness; + })[] = []; + + private lastLocalCursorState: DocumentWithCursors[] = []; + private lastLocalCursorStateWithoutDirtyDocuments: DocumentWithCursors[] = + []; + + public constructor( + private readonly database: Database, + private readonly webSocketManager: WebSocketManager, + private readonly fileOperations: FileOperations, + private readonly fileChangeNotifier: FileChangeNotifier + ) { + this.webSocketManager.addRemoteCursorsUpdateListener( + async (clientCursors) => { + await this.updateLock.withLock(async () => { + // The latest message will contain all active clients, so we can delete the ones + // from the local list which are no longer active. + const allIds = new Set( + clientCursors.map((c) => c.deviceId) + ); + const updatedKnownRemoteCursors = + this.knownRemoteCursors.filter((c) => + allIds.has(c.deviceId) + ); + + for (const cursor of clientCursors.filter((client) => + client.documentsWithCursors.every( + (doc) => doc.vault_update_id != null + ) + )) { + updatedKnownRemoteCursors.push({ + ...cursor, + upToDateness: + await this.getDocumentsUpToDateness(cursor) + }); + } + + this.knownRemoteCursors = updatedKnownRemoteCursors; + }); + } + ); + + this.fileChangeNotifier.addFileChangeListener(async (relativePath) => + this.updateLock.withLock(async () => { + for (const clientCursor of this.knownRemoteCursors) { + if ( + clientCursor.documentsWithCursors.some( + (document) => + document.relative_path === relativePath + ) + ) { + clientCursor.upToDateness = + await this.getDocumentsUpToDateness(clientCursor); + } + } + }) + ); + } + + /// Update the local cursors for the given documents. + /// Can be called frequently as it only emits an event + /// if the state has actually changed. + public async sendLocalCursorsToServer( + documentToCursors: Record + ): Promise { + const documentsWithCursors: DocumentWithCursors[] = []; + + for (const [relativePath, cursors] of Object.entries( + documentToCursors + )) { + const record = + this.database.getLatestDocumentByRelativePath(relativePath); + + if (!record) { + continue; // Let's wait for the file to be created before sending cursors + } + + if (!record.metadata) { + continue; // this is a new document, no need to sync the cursors + } + + documentsWithCursors.push({ + relative_path: relativePath, + document_id: record.documentId, + vault_update_id: record.metadata.parentVersionId, + cursors: cursors.map(({ start, end }) => ({ + start: Math.min(start, end), + end: Math.max(start, end) + })) // the client might send directional selections + }); + } + + if ( + JSON.stringify(this.lastLocalCursorState) === + JSON.stringify(documentsWithCursors) + ) { + // Caching step to avoid reading the edited files all the time + return; + } + this.lastLocalCursorState = documentsWithCursors; + + for (const doc of documentsWithCursors) { + const readContent = await this.fileOperations.read( + doc.relative_path + ); + const record = this.database.getLatestDocumentByRelativePath( + doc.relative_path + ); + if (record?.metadata?.hash !== hash(readContent)) { + doc.vault_update_id = null; + } + } + + if ( + JSON.stringify(this.lastLocalCursorStateWithoutDirtyDocuments) === + JSON.stringify(documentsWithCursors) + ) { + return; + } + + this.lastLocalCursorStateWithoutDirtyDocuments = documentsWithCursors; + + this.webSocketManager.updateLocalCursors({ documentsWithCursors }); + } + + // The returned position may be accurate, if it matches the document version, or outdated, in which case + // the client has to heuristically guess it's current position based on the local edits. + public addRemoteCursorsUpdateListener( + listener: (cursors: MaybeOutdatedClientCursors[]) => unknown + ): void { + // CursorTracker registers its own event listener in the constructor so it must have been called before this + this.webSocketManager.addRemoteCursorsUpdateListener(async () => { + await this.updateLock.withLock(() => + listener(this.getRelevantAndPruneKnownClientCursors()) + ); + }); + } + + private getRelevantAndPruneKnownClientCursors(): MaybeOutdatedClientCursors[] { + const result: MaybeOutdatedClientCursors[] = []; + const included = new Set(); + + const relevantCursors = []; + for (const clientCursors of [...this.knownRemoteCursors].reverse()) { + if (included.has(clientCursors.deviceId)) { + continue; + } + + if (clientCursors.upToDateness == DocumentUpToDateness.Later) { + continue; + } + + result.push({ + ...clientCursors, + isOutdated: + clientCursors.upToDateness == DocumentUpToDateness.Prior + }); + + included.add(clientCursors.deviceId); + relevantCursors.unshift(clientCursors); // to reverse order back to normal + } + + this.knownRemoteCursors = relevantCursors; + + return result; + } + + // We store up-to-dateness on a per-client basis to simplify the implementation. + // An individual client won't have too many documents open at once, so this is a reasonable trade-off. + private async getDocumentsUpToDateness( + clientCursor: ClientCursors + ): Promise { + const results = []; + for (const document of clientCursor.documentsWithCursors) { + results.push(await this.getDocumentUpToDateness(document)); + } + + if ( + results.every((result) => result === DocumentUpToDateness.UpToDate) + ) { + return DocumentUpToDateness.UpToDate; + } + + if ( + results.every( + (result) => + result === DocumentUpToDateness.UpToDate || + result === DocumentUpToDateness.Prior + ) + ) { + return DocumentUpToDateness.Prior; + } + + return DocumentUpToDateness.Later; + } + + private async getDocumentUpToDateness( + document: DocumentWithCursors + ): Promise { + const record = this.database.getLatestDocumentByRelativePath( + document.relative_path + ); + + if (!record) { + // the document of the cursor must be from the future + return DocumentUpToDateness.Later; + } + + if ( + (record.metadata?.parentVersionId ?? 0) < + (document.vault_update_id ?? 0) + ) { + return DocumentUpToDateness.Later; + } else if ( + (document.vault_update_id ?? 0) < + (record.metadata?.parentVersionId ?? 0) + ) { + // the document of the cursor must be from the past + return DocumentUpToDateness.Prior; + } + + const currentContent = await this.fileOperations.read( + document.relative_path + ); + + return this.database.getLatestDocumentByRelativePath( + document.relative_path + )?.metadata?.hash === hash(currentContent) + ? DocumentUpToDateness.UpToDate + : DocumentUpToDateness.Prior; + } +} diff --git a/frontend/sync-client/src/sync-operations/file-change-notifier.ts b/frontend/sync-client/src/sync-operations/file-change-notifier.ts new file mode 100644 index 00000000..8a7af66c --- /dev/null +++ b/frontend/sync-client/src/sync-operations/file-change-notifier.ts @@ -0,0 +1,15 @@ +import type { RelativePath } from "../persistence/database"; + +export class FileChangeNotifier { + private readonly listeners: ((filePath: RelativePath) => unknown)[] = []; + + public addFileChangeListener( + listener: (filePath: RelativePath) => unknown + ): void { + this.listeners.push(listener); + } + + public notifyOfFileChange(filePath: RelativePath): void { + this.listeners.forEach((listener) => listener(filePath)); + } +} diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 7e9301a5..186b9a9b 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -9,7 +9,7 @@ import type { Logger } from "../tracing/logger"; import PQueue from "p-queue"; import { hash } from "../utils/hash"; import { v4 as uuidv4 } from "uuid"; -import type { Settings, SyncSettings } from "../persistence/settings"; +import type { Settings } from "../persistence/settings"; import type { FileOperations } from "../file-operations/file-operations"; import { findMatchingFile } from "../utils/find-matching-file"; import type { UnrestrictedSyncer } from "./unrestricted-syncer"; @@ -27,12 +27,10 @@ export class Syncer { private runningScheduleSyncForOfflineChanges: Promise | undefined; - // eslint-disable-next-line @typescript-eslint/max-params public constructor( - private readonly deviceId: string, private readonly logger: Logger, private readonly database: Database, - private readonly settings: Settings, + settings: Settings, private readonly syncService: SyncService, private readonly operations: FileOperations, private readonly internalSyncer: UnrestrictedSyncer @@ -261,58 +259,77 @@ export class Syncer { remoteVersion.documentId ); - let hasLockToRelease = false; if (document === undefined) { // Let's avoid the same documents getting created in parallel multiple times. // There might be multiple tasks waiting for the lock - await this.remoteDocumentsLock.waitForLock( - remoteVersion.documentId - ); - hasLockToRelease = true; - document = this.database.getDocumentByDocumentId( - remoteVersion.documentId + return this.remoteDocumentsLock.withLock( + remoteVersion.documentId, + async () => { + document = this.database.getDocumentByDocumentId( + remoteVersion.documentId + ); + + // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` + if (document === undefined) { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion + ) + ); + } else { + const [promise, resolve, reject] = createPromise(); + + document = + await this.database.getResolvedDocumentByRelativePath( + document.relativePath, + promise + ); + + try { + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion, + document + ) + ); + + resolve(); + } catch (e) { + reject(e); + } finally { + this.database.removeDocumentPromise(promise); + } + } + + this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); + } ); } + // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` + const [promise, resolve, reject] = createPromise(); + + document = await this.database.getResolvedDocumentByRelativePath( + document.relativePath, + promise + ); + try { - // We're either the first one to get the lock, so we have to create the document in `unrestrictedSyncRemotelyUpdatedFile` - if (document === undefined) { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion - ) - ); - } else { - const [promise, resolve, reject] = createPromise(); + await this.syncQueue.add(async () => + this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( + remoteVersion, + document + ) + ); - document = - await this.database.getResolvedDocumentByRelativePath( - document.relativePath, - promise - ); - - try { - await this.syncQueue.add(async () => - this.internalSyncer.unrestrictedSyncRemotelyUpdatedFile( - remoteVersion, - document - ) - ); - - resolve(); - } catch (e) { - reject(e); - } finally { - this.database.removeDocumentPromise(promise); - } - } - - this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); + resolve(); + } catch (e) { + reject(e); } finally { - if (hasLockToRelease) { - this.remoteDocumentsLock.unlock(remoteVersion.documentId); - } + this.database.removeDocumentPromise(promise); } + + this.database.addSeenUpdateId(remoteVersion.vaultUpdateId); } private async internalScheduleSyncForOfflineChanges(): Promise { diff --git a/frontend/sync-client/src/types/document-update-status.ts b/frontend/sync-client/src/types/document-sync-status.ts similarity index 59% rename from frontend/sync-client/src/types/document-update-status.ts rename to frontend/sync-client/src/types/document-sync-status.ts index 7fa1c888..a2ec01c2 100644 --- a/frontend/sync-client/src/types/document-update-status.ts +++ b/frontend/sync-client/src/types/document-sync-status.ts @@ -1,4 +1,4 @@ -export enum DocumentUpdateStatus { +export enum DocumentSyncStatus { UP_TO_DATE = "UP_TO_DATE", SYNCING = "SYNCING" } diff --git a/frontend/sync-client/src/types/document-up-to-dateness.ts b/frontend/sync-client/src/types/document-up-to-dateness.ts new file mode 100644 index 00000000..2f93f9b4 --- /dev/null +++ b/frontend/sync-client/src/types/document-up-to-dateness.ts @@ -0,0 +1,5 @@ +export enum DocumentUpToDateness { + UpToDate = "UpToDate", // easiest case, the client can just show the cursors as-is + Prior = "Prior", // The cursors are outdated, so the client has to guess the cursor positions based on local updates. This is only possible if this client's cursor has once been up-to-date in a given document. + Later = "Later" // The cursors are from a future version of a document, there's no way we can accuratly show them locally. +} diff --git a/frontend/sync-client/src/types/maybe-outdated-client-cursors.ts b/frontend/sync-client/src/types/maybe-outdated-client-cursors.ts index acced952..e062f84e 100644 --- a/frontend/sync-client/src/types/maybe-outdated-client-cursors.ts +++ b/frontend/sync-client/src/types/maybe-outdated-client-cursors.ts @@ -1,5 +1,5 @@ import type { ClientCursors } from "../services/types/ClientCursors"; -export interface DocumentWithMaybeOutdatedClientCursors extends ClientCursors { +export interface MaybeOutdatedClientCursors extends ClientCursors { isOutdated: boolean; } diff --git a/frontend/sync-client/src/utils/create-promise.ts b/frontend/sync-client/src/utils/create-promise.ts index 959183f1..3099f0da 100644 --- a/frontend/sync-client/src/utils/create-promise.ts +++ b/frontend/sync-client/src/utils/create-promise.ts @@ -1,17 +1,25 @@ +type ResolveFunction = undefined extends T + ? (value?: T) => unknown + : (value: T) => unknown; + /** * A type-safe utility function to create a Promise with resolve and reject functions. * @returns A tuple containing a Promise, a resolve function, and a reject function. */ export function createPromise(): [ Promise, - (value: T) => unknown, + ResolveFunction, (error: unknown) => unknown ] { - let resolve: undefined | ((resolved: T) => unknown) = undefined; + let resolve: undefined | ResolveFunction = undefined; let reject: undefined | ((error: unknown) => unknown) = undefined; const creationPromise = new Promise( - (resolve_, reject_) => ((resolve = resolve_), (reject = reject_)) + (resolve_, reject_) => + ( + // eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion + (resolve = resolve_ as ResolveFunction), (reject = reject_) + ) ); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion diff --git a/frontend/sync-client/src/utils/locks.test.ts b/frontend/sync-client/src/utils/locks.test.ts index 33d99da9..1e6bd38b 100644 --- a/frontend/sync-client/src/utils/locks.test.ts +++ b/frontend/sync-client/src/utils/locks.test.ts @@ -2,8 +2,9 @@ import { Logger } from "../tracing/logger"; import type { RelativePath } from "../persistence/database"; import { Locks } from "./locks"; -describe("Document lock", () => { +describe("withLock", () => { const testPath: RelativePath = "test/document/path"; + const testPath2: RelativePath = "test/document/path2"; const logger = new Logger(); // eslint-disable-next-line @typescript-eslint/init-declarations @@ -13,77 +14,211 @@ describe("Document lock", () => { locks = new Locks(logger); }); - test("should lock a document successfully", () => { - const result = locks.tryLock(testPath); - expect(result).toBe(true); - }); - - test("should not lock a document that is already locked", () => { - locks.tryLock(testPath); - const result = locks.tryLock(testPath); - expect(result).toBe(false); - }); - - test("should unlock a locked document", () => { - locks.tryLock(testPath); - locks.unlock(testPath); - const result = locks.tryLock(testPath); - expect(result).toBe(true); - locks.unlock(testPath); - }); - - test("should throw an error when unlocking a document that is not locked", () => { - expect(() => { - locks.unlock(testPath); - }).toThrow(`Key '${testPath}' is not locked, cannot unlock`); - }); - - test("should wait for a document lock and resolve when unlocked", async () => { - locks.tryLock(testPath); - - let resolved = false; - const waitPromise = locks.waitForLock(testPath).then(() => { - resolved = true; + test("should execute function with single key lock", async () => { + let executionCount = 0; + const result = await locks.withLock(testPath, () => { + executionCount++; + return "success"; }); - locks.unlock(testPath); - await waitPromise; - - expect(resolved).toBe(true); + expect(result).toBe("success"); + expect(executionCount).toBe(1); }); - test("should resolve multiple waiters in FIFO order", async () => { - locks.tryLock(testPath); - - let firstResolved = false; - let secondResolved = false; - let thirdResolved = false; - - const firstWaitPromise = locks.waitForLock(testPath).then(() => { - firstResolved = true; + test("should execute async function with single key lock", async () => { + let executionCount = 0; + const result = await locks.withLock(testPath, async () => { + executionCount++; + await new Promise((resolve) => setTimeout(resolve, 10)); + return "async-success"; }); - const secondWaitPromise = locks.waitForLock(testPath).then(() => { - secondResolved = true; + expect(result).toBe("async-success"); + expect(executionCount).toBe(1); + }); + + test("should execute function with multiple key locks", async () => { + let executionCount = 0; + const result = await locks.withLock([testPath, testPath2], () => { + executionCount++; + return "multi-success"; }); - const thirdWaitPromise = locks.waitForLock(testPath).then(() => { - thirdResolved = true; + expect(result).toBe("multi-success"); + expect(executionCount).toBe(1); + }); + + test("should sort multiple keys to prevent deadlocks", async () => { + const executionOrder: string[] = []; + + // Start two concurrent operations with keys in different orders + const promise1 = locks.withLock([testPath2, testPath], async () => { + executionOrder.push("operation1-start"); + await new Promise((resolve) => setTimeout(resolve, 50)); + executionOrder.push("operation1-end"); + return "result1"; }); - locks.unlock(testPath); - await firstWaitPromise; - expect(firstResolved).toBe(true); - expect(secondResolved).toBe(false); - expect(thirdResolved).toBe(false); + const promise2 = locks.withLock([testPath, testPath2], async () => { + executionOrder.push("operation2-start"); + await new Promise((resolve) => setTimeout(resolve, 50)); + executionOrder.push("operation2-end"); + return "result2"; + }); - locks.unlock(testPath); - await secondWaitPromise; - expect(secondResolved).toBe(true); - expect(thirdResolved).toBe(false); + const [result1, result2] = await Promise.all([promise1, promise2]); - locks.unlock(testPath); - await thirdWaitPromise; - expect(thirdResolved).toBe(true); + expect(result1).toBe("result1"); + expect(result2).toBe("result2"); + // One operation should complete entirely before the other starts + expect(executionOrder).toEqual([ + "operation1-start", + "operation1-end", + "operation2-start", + "operation2-end" + ]); + }); + + test("should serialize access to same key", async () => { + const executionOrder: string[] = []; + + const promise1 = locks.withLock(testPath, async () => { + executionOrder.push("operation1-start"); + await new Promise((resolve) => setTimeout(resolve, 50)); + executionOrder.push("operation1-end"); + return "result1"; + }); + + const promise2 = locks.withLock(testPath, async () => { + executionOrder.push("operation2-start"); + await new Promise((resolve) => setTimeout(resolve, 30)); + executionOrder.push("operation2-end"); + return "result2"; + }); + + const [result1, result2] = await Promise.all([promise1, promise2]); + + expect(result1).toBe("result1"); + expect(result2).toBe("result2"); + expect(executionOrder).toEqual([ + "operation1-start", + "operation1-end", + "operation2-start", + "operation2-end" + ]); + }); + + test("should allow concurrent access to different keys", async () => { + const executionOrder: string[] = []; + + const promise1 = locks.withLock(testPath, async () => { + executionOrder.push("operation1-start"); + await new Promise((resolve) => setTimeout(resolve, 50)); + executionOrder.push("operation1-end"); + return "result1"; + }); + + const promise2 = locks.withLock(testPath2, async () => { + executionOrder.push("operation2-start"); + await new Promise((resolve) => setTimeout(resolve, 30)); + executionOrder.push("operation2-end"); + return "result2"; + }); + + const [result1, result2] = await Promise.all([promise1, promise2]); + + expect(result1).toBe("result1"); + expect(result2).toBe("result2"); + // Both operations should run concurrently + expect(executionOrder[0]).toBe("operation1-start"); + expect(executionOrder[1]).toBe("operation2-start"); + }); + + test("should release locks even if function throws", async () => { + const error = new Error("test error"); + + await expect( + locks.withLock(testPath, () => { + throw error; + }) + ).rejects.toThrow("test error"); + + // Lock should be released, allowing another operation + const result = await locks.withLock( + testPath, + () => "success-after-error" + ); + expect(result).toBe("success-after-error"); + }); + + test("should release locks even if async function throws", async () => { + const error = new Error("async test error"); + + await expect( + locks.withLock(testPath, async () => { + await new Promise((resolve) => setTimeout(resolve, 10)); + throw error; + }) + ).rejects.toThrow("async test error"); + + // Lock should be released, allowing another operation + const result = await locks.withLock( + testPath, + () => "success-after-async-error" + ); + expect(result).toBe("success-after-async-error"); + }); + + test("should handle empty array of keys", async () => { + const result = await locks.withLock([], () => "empty-keys"); + expect(result).toBe("empty-keys"); + }); + + test("should maintain FIFO order for multiple waiters", async () => { + const executionOrder: string[] = []; + + // Start first operation that holds the lock + const firstPromise = locks.withLock(testPath, async () => { + executionOrder.push("first-start"); + await new Promise((resolve) => setTimeout(resolve, 100)); + executionOrder.push("first-end"); + return "first"; + }); + + // Small delay to ensure first operation starts + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Queue second and third operations + const secondPromise = locks.withLock(testPath, async () => { + executionOrder.push("second-start"); + await new Promise((resolve) => setTimeout(resolve, 30)); + executionOrder.push("second-end"); + return "second"; + }); + + const thirdPromise = locks.withLock(testPath, async () => { + executionOrder.push("third-start"); + await new Promise((resolve) => setTimeout(resolve, 20)); + executionOrder.push("third-end"); + return "third"; + }); + + const [first, second, third] = await Promise.all([ + firstPromise, + secondPromise, + thirdPromise + ]); + + expect(first).toBe("first"); + expect(second).toBe("second"); + expect(third).toBe("third"); + expect(executionOrder).toEqual([ + "first-start", + "first-end", + "second-start", + "second-end", + "third-start", + "third-end" + ]); }); }); diff --git a/frontend/sync-client/src/utils/locks.ts b/frontend/sync-client/src/utils/locks.ts index 77b3b767..e09da236 100644 --- a/frontend/sync-client/src/utils/locks.ts +++ b/frontend/sync-client/src/utils/locks.ts @@ -13,7 +13,54 @@ export class Locks { /** Queue of resolve functions waiting for each key */ private readonly waiters = new Map unknown)[]>(); - public constructor(private readonly logger: Logger) {} + public constructor(private readonly logger?: Logger) {} + + /** + * Executes a function while holding exclusive locks on one or more keys. + * + * This method ensures that the provided function runs with exclusive access to the + * specified key(s). Multiple keys are sorted to prevent deadlocks when different + * operations request the same keys in different orders. + * + * @template R The return type of the function to execute + * @param keyOrKeys A single key or array of keys to lock during function execution + * @param fn The function to execute while holding the lock(s). Can be sync or async. + * @returns A Promise that resolves to the return value of the executed function + * + * @example + * ```typescript + * // Lock a single key + * const result = await locks.withLock('file1', () => { + * // Critical section - only one operation can access 'file1' at a time + * return processFile('file1'); + * }); + * + * // Lock multiple keys (prevents deadlocks through consistent ordering) + * await locks.withLock(['file1', 'file2'], async () => { + * // Critical section - exclusive access to both files + * await moveFile('file1', 'file2'); + * }); + * ``` + * + * @throws Any error thrown by the provided function will be propagated after locks are released + */ + public async withLock( + keyOrKeys: T | T[], + fn: () => R | Promise + ): Promise { + const keys = Array.isArray(keyOrKeys) ? keyOrKeys : [keyOrKeys]; + keys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks + + await Promise.all(keys.map(async (key) => this.waitForLock(key))); + + try { + return await fn(); + } finally { + keys.forEach((key) => { + this.unlock(key); + }); + } + } /** * Attempts to acquire a lock immediately without waiting. @@ -22,7 +69,7 @@ export class Locks { * @param key The key to lock * @returns `true` if lock acquired, `false` if already locked */ - public tryLock(key: T): boolean { + private tryLock(key: T): boolean { if (this.locked.has(key)) { return false; } @@ -39,12 +86,12 @@ export class Locks { * @param key The key to wait for and lock * @returns Promise that resolves when lock is acquired */ - public async waitForLock(key: T): Promise { + private async waitForLock(key: T): Promise { if (this.tryLock(key)) { return Promise.resolve(); } - this.logger.debug(`Waiting for lock on ${key}`); + this.logger?.debug(`Waiting for lock on ${key}`); return new Promise((resolve) => { // DefaultDict behavior @@ -65,7 +112,7 @@ export class Locks { * @param key The key to unlock * @throws {Error} If key is not currently locked */ - public unlock(key: T): void { + private unlock(key: T): void { if (!this.locked.has(key)) { throw new Error(`Key '${key}' is not locked, cannot unlock`); } @@ -74,19 +121,22 @@ export class Locks { const nextWaiting = this.waiters.get(key)?.shift(); if (nextWaiting) { - this.logger.debug(`Granted lock on ${key}`); + this.logger?.debug(`Granted lock on ${key}`); nextWaiting(); } else { this.locked.delete(key); } } +} - /** - * Clears all locks and waiters. Causes waiting operations to hang indefinitely. - * Use with caution. - */ - public reset(): void { - this.locked.clear(); - this.waiters.clear(); +export class Lock { + private readonly locks: Locks; + + public constructor(logger?: Logger) { + this.locks = new Locks(logger); + } + + public async withLock(fn: () => R | Promise): Promise { + return this.locks.withLock(true, fn); } } diff --git a/frontend/test-client/src/agent/mock-client.ts b/frontend/test-client/src/agent/mock-client.ts index 2833ba29..3ef55c8f 100644 --- a/frontend/test-client/src/agent/mock-client.ts +++ b/frontend/test-client/src/agent/mock-client.ts @@ -37,7 +37,7 @@ export class MockClient implements FileSystemOperations { fs: this, persistence: { load: async () => this.data, - save: async (data) => (this.data = data) + save: async (data) => void (this.data = data) }, fetch: fetchImplementation, webSocket: webSocketImplementation diff --git a/frontend/test-client/src/utils/flaky-websocket-factory.ts b/frontend/test-client/src/utils/flaky-websocket-factory.ts index 6a146de2..c2c13525 100644 --- a/frontend/test-client/src/utils/flaky-websocket-factory.ts +++ b/frontend/test-client/src/utils/flaky-websocket-factory.ts @@ -25,15 +25,18 @@ export function flakyWebSocketFactory( public set onmessage(callback: (event: MessageEvent) => void) { super.onmessage = async (event: MessageEvent): Promise => { - await this.locks.waitForLock(FlakyWebSocket.RECEIVE_KEY); + return this.locks.withLock( + FlakyWebSocket.RECEIVE_KEY, + async () => { + if (jitterScaleInSeconds > 0) { + await sleep( + Math.random() * jitterScaleInSeconds * 1000 + ); + } - if (jitterScaleInSeconds > 0) { - await sleep(Math.random() * jitterScaleInSeconds * 1000); - } - - callback(event); - - this.locks.unlock(FlakyWebSocket.RECEIVE_KEY); + callback(event); + } + ); }; } @@ -67,15 +70,13 @@ export function flakyWebSocketFactory( data: string | ArrayBufferLike | Blob | ArrayBufferView ): Promise { // maintain message order - await this.locks.waitForLock(FlakyWebSocket.SEND_KEY); + return this.locks.withLock(FlakyWebSocket.SEND_KEY, async () => { + if (jitterScaleInSeconds > 0) { + await sleep(Math.random() * jitterScaleInSeconds * 1000); + } - if (jitterScaleInSeconds > 0) { - await sleep(Math.random() * jitterScaleInSeconds * 1000); - } - - super.send(data); - - this.locks.unlock(FlakyWebSocket.SEND_KEY); + super.send(data); + }); } } as unknown as typeof WebSocket; } diff --git a/scripts/check.sh b/scripts/check.sh new file mode 100755 index 00000000..f807d2c8 --- /dev/null +++ b/scripts/check.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash + +set -e + +echo "Running checks in sync-server" +cd sync-server +cargo clippy --all-targets --all-features +cargo fmt --all -- --check +cargo machete +cargo test --verbose + +echo "Running checks in frontend" +cd ../frontend +npm ci +npm run build +npm run lint +npm run test + +if [[ $(git status --porcelain) ]]; then + git status --porcelain + echo "Failing CI because the working directory is not clean after linting" + exit 1 +fi + +echo "Success" + +cd .. diff --git a/sync-server/Cargo.toml b/sync-server/Cargo.toml index 94539a48..cbbaa14c 100644 --- a/sync-server/Cargo.toml +++ b/sync-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sync_server" -rust-version = "1.87.0" +rust-version = "1.89.0" authors = ["Andras Schmelczer "] edition = "2024" license = "MIT" diff --git a/sync-server/rust-toolchain.toml b/sync-server/rust-toolchain.toml index 0d5c6104..ed32db00 100644 --- a/sync-server/rust-toolchain.toml +++ b/sync-server/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "nightly-2025-06-06" +channel = "1.89.0" targets = [ "x86_64-unknown-linux-gnu", "x86_64-unknown-linux-musl" ] profile = "default" diff --git a/sync-server/rustfmt.toml b/sync-server/rustfmt.toml deleted file mode 100644 index 6640f544..00000000 --- a/sync-server/rustfmt.toml +++ /dev/null @@ -1,8 +0,0 @@ -imports_granularity = "crate" -condense_wildcard_suffixes = true -fn_single_line = true -format_strings = true -reorder_impl_items = true -group_imports = "StdExternalCrate" -use_field_init_shorthand = true -wrap_comments=true diff --git a/sync-server/src/app_state/cursors.rs b/sync-server/src/app_state/cursors.rs index 1e6509c7..d083e1ac 100644 --- a/sync-server/src/app_state/cursors.rs +++ b/sync-server/src/app_state/cursors.rs @@ -47,7 +47,7 @@ impl Cursors { all_device_cursors.retain(|c| &c.client_cursors.device_id != device_id); all_device_cursors.push(ClientCursorsWithTimeToLive::new(ClientCursors { user_name, - device_id: device_id.to_string(), + device_id: device_id.clone(), documents_with_cursors: document_to_cursors, })); @@ -126,5 +126,7 @@ impl ClientCursorsWithTimeToLive { } } - pub fn is_expired(&self, ttl: Duration) -> bool { self.last_updated.elapsed() > ttl } + pub fn is_expired(&self, ttl: Duration) -> bool { + self.last_updated.elapsed() > ttl + } } diff --git a/sync-server/src/app_state/database/models.rs b/sync-server/src/app_state/database/models.rs index 7796f627..24c0c370 100644 --- a/sync-server/src/app_state/database/models.rs +++ b/sync-server/src/app_state/database/models.rs @@ -23,7 +23,9 @@ pub struct StoredDocumentVersion { } impl PartialEq for StoredDocumentVersion { - fn eq(&self, other: &Self) -> bool { self.vault_update_id == other.vault_update_id } + fn eq(&self, other: &Self) -> bool { + self.vault_update_id == other.vault_update_id + } } #[derive(TS, Debug, Clone, Serialize)] diff --git a/sync-server/src/app_state/websocket/models.rs b/sync-server/src/app_state/websocket/models.rs index ed61177c..e037fb7e 100644 --- a/sync-server/src/app_state/websocket/models.rs +++ b/sync-server/src/app_state/websocket/models.rs @@ -23,8 +23,13 @@ pub struct CursorPositionFromClient { #[derive(TS, Serialize, Deserialize, Clone, Debug)] pub struct DocumentWithCursors { - #[ts(as = "u32")] - pub vault_update_id: VaultUpdateId, + // It's None in case the document is dirty. + // We still want to sync the cursor to mark + // that it exists and can be client-side + // interpolated. However, the actual + // position is meaningless. + #[ts(as = "Option")] + pub vault_update_id: Option, pub document_id: DocumentId, pub relative_path: String, diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 3f659c97..cddcc1b5 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -179,6 +179,10 @@ async fn shutdown_signal() { } } -async fn handle_404() -> impl IntoResponse { not_found_error(anyhow!("Page not found")) } +async fn handle_404() -> impl IntoResponse { + not_found_error(anyhow!("Page not found")) +} -async fn handle_405() -> impl IntoResponse { client_error(anyhow!("Method not allowed")) } +async fn handle_405() -> impl IntoResponse { + client_error(anyhow!("Method not allowed")) +} diff --git a/sync-server/src/server/device_id_header.rs b/sync-server/src/server/device_id_header.rs index be36c8d8..af9d6413 100644 --- a/sync-server/src/server/device_id_header.rs +++ b/sync-server/src/server/device_id_header.rs @@ -6,7 +6,9 @@ pub struct DeviceIdHeader(pub String); pub static DEVICE_ID_HEADER_NAME: HeaderName = HeaderName::from_static("device-id"); impl Header for DeviceIdHeader { - fn name() -> &'static HeaderName { &DEVICE_ID_HEADER_NAME } + fn name() -> &'static HeaderName { + &DEVICE_ID_HEADER_NAME + } fn decode<'i, I>(values: &mut I) -> Result where @@ -26,7 +28,7 @@ impl Header for DeviceIdHeader { where E: Extend, { - let value = HeaderValue::from_static(Box::leak(self.0.to_string().into_boxed_str())); + let value = HeaderValue::from_static(Box::leak(self.0.clone().into_boxed_str())); values.extend(std::iter::once(value)); } diff --git a/sync-server/src/utils/normalize.rs b/sync-server/src/utils/normalize.rs index adb83ac1..6553dd25 100644 --- a/sync-server/src/utils/normalize.rs +++ b/sync-server/src/utils/normalize.rs @@ -8,4 +8,6 @@ where Ok(normalize_string(&s)) } -pub fn normalize_string(s: &str) -> String { s.trim().to_lowercase() } +pub fn normalize_string(s: &str) -> String { + s.trim().to_lowercase() +}