diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index cf890830..9aa71fb4 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -5,6 +5,7 @@ on: branches: ["main"] pull_request: branches: ["main"] + workflow_dispatch: env: CARGO_TERM_COLOR: always @@ -31,12 +32,5 @@ jobs: toolchain: "1.89.0" components: clippy, rustfmt - - name: Setup rust - run: | - which sqlx || cargo install sqlx-cli - cd sync-server - sqlx database create --database-url sqlite://db.sqlite3 - sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3 - - name: Lint & test run: scripts/check.sh diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 0e437cbd..7d0a2a0f 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -7,6 +7,7 @@ on: branches: ["main"] schedule: - cron: '0 * * * *' + workflow_dispatch: concurrency: group: e2e-tests @@ -47,9 +48,24 @@ jobs: run: | cd sync-server cargo run config-e2e.yml --color never & + SERVER_PID=$! cd .. scripts/e2e.sh 8 + EXIT_CODE=$? + + kill $SERVER_PID 2>/dev/null || true + wait $SERVER_PID 2>/dev/null || true + + exit $EXIT_CODE + + - name: Upload e2e logs + if: always() + uses: actions/upload-artifact@v4 + with: + name: e2e-logs + path: logs/ + retention-days: 30 - name: Cleanup if: always() diff --git a/.github/workflows/publish-plugin.yml b/.github/workflows/publish-plugin.yml index 9e74c60d..92dd199b 100644 --- a/.github/workflows/publish-plugin.yml +++ b/.github/workflows/publish-plugin.yml @@ -3,8 +3,6 @@ name: Publish Obsidian plugin on: push: tags: ["*"] - pull_request: - branches: ["main"] env: CARGO_TERM_COLOR: always diff --git a/frontend/local-client-cli/src/cli.ts b/frontend/local-client-cli/src/cli.ts index 0f8262f7..48fd8954 100644 --- a/frontend/local-client-cli/src/cli.ts +++ b/frontend/local-client-cli/src/cli.ts @@ -35,6 +35,8 @@ const LOG_LEVEL_ORDER = { [LogLevel.ERROR]: 3 }; +const HEALTH_CHECK_INTERVAL_MS = 30 * 1000; + async function main(): Promise { const args = parseArgs(process.argv); const absolutePath = path.resolve(args.localPath); @@ -147,7 +149,7 @@ async function main(): Promise { void client.checkConnection().then((status) => { writeHealthStatus(healthFile, status); }); - }, 30 * 1000); // every 30 seconds + }, HEALTH_CHECK_INTERVAL_MS); const clearHealthInterval = (): void => { clearInterval(healthInterval); }; diff --git a/frontend/sync-client/src/services/websocket-manager.ts b/frontend/sync-client/src/services/websocket-manager.ts index f8dc59d4..09787bce 100644 --- a/frontend/sync-client/src/services/websocket-manager.ts +++ b/frontend/sync-client/src/services/websocket-manager.ts @@ -188,6 +188,14 @@ export class WebSocketManager { this.webSocket = new this.webSocketFactoryImplementation(wsUri); this.webSocket.onopen = (): void => { + // Check if we've been stopped while connecting + if (this.isStopped) { + this.webSocket?.close( + 1000, + "WebSocketManager was stopped during connection" + ); + return; + } this.logger.info("WebSocket connection opened"); this.onWebSocketStatusChanged.trigger(true); }; diff --git a/frontend/sync-client/src/sync-client.ts b/frontend/sync-client/src/sync-client.ts index 1544a1e0..2a272c86 100644 --- a/frontend/sync-client/src/sync-client.ts +++ b/frontend/sync-client/src/sync-client.ts @@ -34,6 +34,8 @@ export class SyncClient { private hasStarted = false; private hasBeenDestroyed = false; private unloadTelemetry?: () => void; + private isDestroying = false; + private readonly eventUnsubscribers: (() => void)[] = []; private constructor( private readonly history: SyncHistory, @@ -159,11 +161,6 @@ export class SyncClient { settings.getSettings().isSyncEnabled, logger ); - settings.onSettingsChanged.add((newSettings, oldSettings) => { - if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) { - fetchController.canFetch = newSettings.isSyncEnabled; - } - }); const syncService = new SyncService( deviceId, @@ -258,13 +255,27 @@ export class SyncClient { this.unloadTelemetry = setUpTelemetry(); } - this.logger.onLogEmitted.add((log): void => { - if (log.level === LogLevel.ERROR && Sentry.isInitialized()) { - Sentry.captureMessage(log.message); - } - }); + this.eventUnsubscribers.push( + this.settings.onSettingsChanged.add((newSettings, oldSettings) => { + if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) { + this.fetchController.canFetch = newSettings.isSyncEnabled; + } + }) + ); - this.settings.onSettingsChanged.add(this.onSettingsChange.bind(this)); + this.eventUnsubscribers.push( + this.logger.onLogEmitted.add((log): void => { + if (log.level === LogLevel.ERROR && Sentry.isInitialized()) { + Sentry.captureMessage(log.message); + } + }) + ); + + this.eventUnsubscribers.push( + this.settings.onSettingsChanged.add( + this.onSettingsChange.bind(this) + ) + ); if (this.settings.getSettings().isSyncEnabled) { this.logger.info("Starting SyncClient"); @@ -431,6 +442,15 @@ export class SyncClient { public async destroy(): Promise { this.checkIfDestroyed("destroy"); + // Prevent concurrent destroy calls + if (this.isDestroying) { + this.logger.warn( + "destroy() called while already destroying, ignoring" + ); + return; + } + this.isDestroying = true; + // cancel everything that's in progress await this.pause(); @@ -438,6 +458,12 @@ export class SyncClient { this.resetInMemoryState(); + // Clean up event listeners to prevent memory leaks + this.eventUnsubscribers.forEach((unsubscribe) => { + unsubscribe(); + }); + this.eventUnsubscribers.length = 0; + this.logger.info("SyncClient has been successfully disposed"); this.unloadTelemetry?.(); diff --git a/frontend/sync-client/src/sync-operations/syncer.ts b/frontend/sync-client/src/sync-operations/syncer.ts index 78cef699..71dedd85 100644 --- a/frontend/sync-client/src/sync-operations/syncer.ts +++ b/frontend/sync-client/src/sync-operations/syncer.ts @@ -264,7 +264,7 @@ export class Syncer { public async waitUntilFinished(): Promise { await this.runningScheduleSyncForOfflineChanges; - await this.syncQueue.onEmpty(); + await this.syncQueue.onIdle(); // Wait for queue to be empty and running tasks to finish } public async syncRemotelyUpdatedFile( diff --git a/frontend/sync-client/src/utils/data-structures/locks.test.ts b/frontend/sync-client/src/utils/data-structures/locks.test.ts index 0c09c062..9beb867a 100644 --- a/frontend/sync-client/src/utils/data-structures/locks.test.ts +++ b/frontend/sync-client/src/utils/data-structures/locks.test.ts @@ -5,6 +5,7 @@ import type { RelativePath } from "../../persistence/database"; import { Locks } from "./locks"; import { awaitAll } from "../await-all"; import { sleep } from "../sleep"; +import { SyncResetError } from "../../services/sync-reset-error"; describe("withLock", () => { const testPath: RelativePath = "test/document/path"; @@ -230,3 +231,62 @@ describe("withLock", () => { ]); }); }); + +describe("reset", () => { + const testPath: RelativePath = "test/document/path"; + const logger = new Logger(); + + // eslint-disable-next-line @typescript-eslint/init-declarations + let locks: Locks; + + beforeEach(() => { + locks = new Locks(logger); + }); + + it("should reject pending waiters with SyncResetError while running operation completes", async () => { + const firstPromise = locks.withLock(testPath, async () => { + await sleep(2); + return "first"; + }); + + await sleep(1); + + const secondPromise = locks.withLock(testPath, async () => "second"); + void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function + + locks.reset(); + + assert.strictEqual(await firstPromise, "first"); + + await assert.rejects(secondPromise, (err: Error) => { + assert.ok(err instanceof SyncResetError); + return true; + }); + }); + + it("should allow locks to work normally after reset", async () => { + const firstPromise = locks.withLock(testPath, async () => { + await sleep(1); + return "first"; + }); + + await sleep(1); + + const secondPromise = locks.withLock(testPath, async () => "second"); + void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function + + locks.reset(); + + await firstPromise; + + const result = await locks.withLock(testPath, () => "after-reset"); + assert.strictEqual(result, "after-reset"); + }); + + it("should handle reset with no pending operations", async () => { + locks.reset(); + + const result = await locks.withLock(testPath, () => "success"); + assert.strictEqual(result, "success"); + }); +}); diff --git a/frontend/sync-client/src/utils/data-structures/locks.ts b/frontend/sync-client/src/utils/data-structures/locks.ts index 8ad60429..e55c76b0 100644 --- a/frontend/sync-client/src/utils/data-structures/locks.ts +++ b/frontend/sync-client/src/utils/data-structures/locks.ts @@ -1,3 +1,4 @@ +import { SyncResetError } from "../../services/sync-reset-error"; import type { Logger } from "../../tracing/logger"; import { awaitAll } from "../await-all"; @@ -12,7 +13,10 @@ export class Locks { private readonly locked = new Set(); /** Queue of resolve functions waiting for each key */ - private readonly waiters = new Map unknown)[]>(); + private readonly waiters = new Map< + T, + [() => unknown, (err: unknown) => unknown][] + >(); public constructor(private readonly logger?: Logger) {} @@ -67,6 +71,13 @@ export class Locks { } public reset(): void { + // Resolve all waiting promises before clearing to prevent deadlock + // Any operation waiting for a lock will be granted access immediately + for (const waiting of this.waiters.values()) { + for (const [_, reject] of waiting) { + reject(new SyncResetError()); + } + } this.locked.clear(); this.waiters.clear(); } @@ -102,7 +113,7 @@ export class Locks { this.logger?.debug(`Waiting for lock on ${key}`); - return new Promise((resolve) => { + return new Promise((resolve, reject) => { // DefaultDict behavior let waiting = this.waiters.get(key); if (!waiting) { @@ -110,7 +121,7 @@ export class Locks { this.waiters.set(key, waiting); } - waiting.push(resolve); + waiting.push([resolve, reject]); }); } @@ -127,11 +138,11 @@ export class Locks { } // Remove first waiter to ensure FIFO order - const nextWaiting = this.waiters.get(key)?.shift(); + const [resolveNextWaiting, _] = this.waiters.get(key)?.shift() ?? []; - if (nextWaiting) { + if (resolveNextWaiting) { this.logger?.debug(`Granted lock on ${key}`); - nextWaiting(); + resolveNextWaiting(); } else { this.locked.delete(key); } diff --git a/scripts/build-docs.sh b/scripts/build-docs.sh index 9f3c76d4..c87144a9 100755 --- a/scripts/build-docs.sh +++ b/scripts/build-docs.sh @@ -2,6 +2,8 @@ set -e +./scripts/utils/check-node.sh + cd docs npm ci diff --git a/scripts/check.sh b/scripts/check.sh index 2a13953a..7c3c87e5 100755 --- a/scripts/check.sh +++ b/scripts/check.sh @@ -8,8 +8,15 @@ if [[ "$1" == "--fix" ]]; then echo "Running in fix mode - will automatically fix linting and formatting issues" fi +./scripts/utils/check-node.sh + echo "Running checks in sync-server" + cd sync-server +which sqlx || cargo install sqlx-cli +sqlx database create --database-url sqlite://db.sqlite3 +sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3 + cargo test --verbose if [[ "$FIX_MODE" == true ]]; then @@ -51,8 +58,4 @@ fi cd .. -if [[ "$FIX_MODE" == true ]]; then - $0 -else - echo "Success" -fi +echo "Success" diff --git a/scripts/e2e.sh b/scripts/e2e.sh index 49f320a0..6c66e835 100755 --- a/scripts/e2e.sh +++ b/scripts/e2e.sh @@ -6,11 +6,7 @@ set -o pipefail NO_COLOR=1 FORCE_COLOR=0 -node_version=$(node -v | sed 's/^v\([0-9]*\).*/\1/') -if [ "$node_version" != "22" ]; then - echo "Error: This script requires Node.js version 22, found: $node_version" - exit 1 -fi +./scripts/utils/check-node.sh # Check if the argument is provided if [ $# -eq 0 ]; then @@ -51,7 +47,7 @@ for i in $(seq 1 $process_count); do echo "Started process $i with PID: $pid" # Read from pipe, prefix with PID - (sed "s/^/[PID $pid] /" < "$pipe" | tee "../logs/log_${i}.log"; rm "$pipe") & + (sed "s/^/[PID $pid] /" < "$pipe" > "../logs/log_${i}.log"; rm "$pipe") & done cd .. diff --git a/scripts/utils/check-node.sh b/scripts/utils/check-node.sh new file mode 100755 index 00000000..c9ede47e --- /dev/null +++ b/scripts/utils/check-node.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -e + +node_version=$(node -v | sed 's/^v\([0-9]*\).*/\1/') +if [ "$node_version" != "22" ]; then + echo "Error: This script requires Node.js version 22, found: $node_version" + exit 1 +fi