Try fixing hanging e2e tests #183

Merged
schmelczer merged 11 commits from asch/dead-locks into main 2025-12-14 17:19:25 +00:00
13 changed files with 164 additions and 39 deletions

View file

@ -5,6 +5,7 @@ on:
branches: ["main"]
pull_request:
branches: ["main"]
workflow_dispatch:
env:
CARGO_TERM_COLOR: always
@ -31,12 +32,5 @@ jobs:
toolchain: "1.89.0"
components: clippy, rustfmt
- name: Setup rust
run: |
which sqlx || cargo install sqlx-cli
cd sync-server
sqlx database create --database-url sqlite://db.sqlite3
sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3
- name: Lint & test
run: scripts/check.sh

View file

@ -7,6 +7,7 @@ on:
branches: ["main"]
schedule:
- cron: '0 * * * *'
workflow_dispatch:
concurrency:
group: e2e-tests
@ -47,9 +48,24 @@ jobs:
run: |
cd sync-server
cargo run config-e2e.yml --color never &
SERVER_PID=$!
cd ..
scripts/e2e.sh 8
EXIT_CODE=$?
kill $SERVER_PID 2>/dev/null || true
wait $SERVER_PID 2>/dev/null || true
exit $EXIT_CODE
- name: Upload e2e logs
if: always()
uses: actions/upload-artifact@v4
with:
name: e2e-logs
path: logs/
retention-days: 30
- name: Cleanup
if: always()

View file

@ -3,8 +3,6 @@ name: Publish Obsidian plugin
on:
push:
tags: ["*"]
pull_request:
branches: ["main"]
env:
CARGO_TERM_COLOR: always

View file

@ -35,6 +35,8 @@ const LOG_LEVEL_ORDER = {
[LogLevel.ERROR]: 3
};
const HEALTH_CHECK_INTERVAL_MS = 30 * 1000;
async function main(): Promise<void> {
const args = parseArgs(process.argv);
const absolutePath = path.resolve(args.localPath);
@ -147,7 +149,7 @@ async function main(): Promise<void> {
void client.checkConnection().then((status) => {
writeHealthStatus(healthFile, status);
});
}, 30 * 1000); // every 30 seconds
}, HEALTH_CHECK_INTERVAL_MS);
const clearHealthInterval = (): void => {
clearInterval(healthInterval);
};

View file

@ -188,6 +188,14 @@ export class WebSocketManager {
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
this.webSocket.onopen = (): void => {
// Check if we've been stopped while connecting
if (this.isStopped) {
this.webSocket?.close(
1000,
"WebSocketManager was stopped during connection"
);
return;
}
this.logger.info("WebSocket connection opened");
this.onWebSocketStatusChanged.trigger(true);
};

View file

@ -34,6 +34,8 @@ export class SyncClient {
private hasStarted = false;
private hasBeenDestroyed = false;
private unloadTelemetry?: () => void;
private isDestroying = false;
private readonly eventUnsubscribers: (() => void)[] = [];
private constructor(
private readonly history: SyncHistory,
@ -159,11 +161,6 @@ export class SyncClient {
settings.getSettings().isSyncEnabled,
logger
);
settings.onSettingsChanged.add((newSettings, oldSettings) => {
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
fetchController.canFetch = newSettings.isSyncEnabled;
}
});
const syncService = new SyncService(
deviceId,
@ -258,13 +255,27 @@ export class SyncClient {
this.unloadTelemetry = setUpTelemetry();
}
this.logger.onLogEmitted.add((log): void => {
if (log.level === LogLevel.ERROR && Sentry.isInitialized()) {
Sentry.captureMessage(log.message);
}
});
this.eventUnsubscribers.push(
this.settings.onSettingsChanged.add((newSettings, oldSettings) => {
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
this.fetchController.canFetch = newSettings.isSyncEnabled;
}
})
);
this.settings.onSettingsChanged.add(this.onSettingsChange.bind(this));
this.eventUnsubscribers.push(
this.logger.onLogEmitted.add((log): void => {
if (log.level === LogLevel.ERROR && Sentry.isInitialized()) {
Sentry.captureMessage(log.message);
}
})
);
this.eventUnsubscribers.push(
this.settings.onSettingsChanged.add(
this.onSettingsChange.bind(this)
)
);
if (this.settings.getSettings().isSyncEnabled) {
this.logger.info("Starting SyncClient");
@ -431,6 +442,15 @@ export class SyncClient {
public async destroy(): Promise<void> {
this.checkIfDestroyed("destroy");
// Prevent concurrent destroy calls
if (this.isDestroying) {
this.logger.warn(
"destroy() called while already destroying, ignoring"
);
return;
}
this.isDestroying = true;
// cancel everything that's in progress
await this.pause();
@ -438,6 +458,12 @@ export class SyncClient {
this.resetInMemoryState();
// Clean up event listeners to prevent memory leaks
this.eventUnsubscribers.forEach((unsubscribe) => {
unsubscribe();
});
this.eventUnsubscribers.length = 0;
this.logger.info("SyncClient has been successfully disposed");
this.unloadTelemetry?.();

View file

@ -264,7 +264,7 @@ export class Syncer {
public async waitUntilFinished(): Promise<void> {
await this.runningScheduleSyncForOfflineChanges;
await this.syncQueue.onEmpty();
await this.syncQueue.onIdle(); // Wait for queue to be empty and running tasks to finish
}
public async syncRemotelyUpdatedFile(

View file

@ -5,6 +5,7 @@ import type { RelativePath } from "../../persistence/database";
import { Locks } from "./locks";
import { awaitAll } from "../await-all";
import { sleep } from "../sleep";
import { SyncResetError } from "../../services/sync-reset-error";
describe("withLock", () => {
const testPath: RelativePath = "test/document/path";
@ -230,3 +231,62 @@ describe("withLock", () => {
]);
});
});
// Tests for Locks.reset(): waiters queued behind a held lock are rejected with
// SyncResetError, and the same Locks instance can be used again afterwards.
describe("reset", () => {
const testPath: RelativePath = "test/document/path";
const logger = new Logger();
// eslint-disable-next-line @typescript-eslint/init-declarations
let locks: Locks<RelativePath>;
// A fresh Locks instance per test so no lock state carries over between tests.
beforeEach(() => {
locks = new Locks<RelativePath>(logger);
});
it("should reject pending waiters with SyncResetError while running operation completes", async () => {
// The first operation acquires the lock and keeps holding it while it sleeps.
const firstPromise = locks.withLock(testPath, async () => {
await sleep(2);
return "first";
});
// Let the first operation start; it sleeps longer than this, so it is expected
// to still hold the lock when the second one is requested (timing-based — presumably
// sleep() takes milliseconds; confirm against the helper).
await sleep(1);
const secondPromise = locks.withLock(testPath, async () => "second");
// Attach a no-op handler immediately so the expected rejection is not reported as
// unhandled before assert.rejects below gets to observe it.
void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function
locks.reset();
// The operation that was already running still completes with its own result...
assert.strictEqual(await firstPromise, "first");
// ...while the one that was queued behind it is rejected with SyncResetError.
await assert.rejects(secondPromise, (err: Error) => {
assert.ok(err instanceof SyncResetError);
return true;
});
});
it("should allow locks to work normally after reset", async () => {
const firstPromise = locks.withLock(testPath, async () => {
await sleep(1);
return "first";
});
// NOTE(review): the operation above and this wait both sleep for 1, so whether the
// first operation still holds the lock when the second is requested looks
// timing-dependent. The test does not assert the second promise's outcome, so it
// passes either way — confirm this is intended.
await sleep(1);
const secondPromise = locks.withLock(testPath, async () => "second");
// Swallow a possible rejection caused by reset(); only the post-reset behavior is checked.
void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function
locks.reset();
await firstPromise;
// The same key must be lockable again once the reset has happened.
const result = await locks.withLock(testPath, () => "after-reset");
assert.strictEqual(result, "after-reset");
});
it("should handle reset with no pending operations", async () => {
// reset() on an instance with no held locks and no waiters must leave it usable.
locks.reset();
const result = await locks.withLock(testPath, () => "success");
assert.strictEqual(result, "success");
});
});

View file

@ -1,3 +1,4 @@
import { SyncResetError } from "../../services/sync-reset-error";
import type { Logger } from "../../tracing/logger";
import { awaitAll } from "../await-all";
@ -12,7 +13,10 @@ export class Locks<T> {
private readonly locked = new Set<T>();
/** Queue of resolve functions waiting for each key */
private readonly waiters = new Map<T, (() => unknown)[]>();
private readonly waiters = new Map<
T,
[() => unknown, (err: unknown) => unknown][]
>();
public constructor(private readonly logger?: Logger) {}
@ -67,6 +71,13 @@ export class Locks<T> {
}
/**
 * Drops all lock state held by this instance.
 *
 * Every waiter that is still queued for a key is rejected with a
 * `SyncResetError`; afterwards the set of locked keys and all waiter
 * queues are cleared. This method does nothing to operations that are
 * already running.
 */
public reset(): void {
  // Reject every queued waiter before clearing the queues: dropping the
  // stored callbacks without settling them would leave the corresponding
  // promises pending forever.
  for (const waiting of this.waiters.values()) {
    for (const [, reject] of waiting) {
      reject(new SyncResetError());
    }
  }

  this.locked.clear();
  this.waiters.clear();
}
@ -102,7 +113,7 @@ export class Locks<T> {
this.logger?.debug(`Waiting for lock on ${key}`);
return new Promise((resolve) => {
return new Promise((resolve, reject) => {
// DefaultDict behavior
let waiting = this.waiters.get(key);
if (!waiting) {
@ -110,7 +121,7 @@ export class Locks<T> {
this.waiters.set(key, waiting);
}
waiting.push(resolve);
waiting.push([resolve, reject]);
});
}
@ -127,11 +138,11 @@ export class Locks<T> {
}
// Remove first waiter to ensure FIFO order
const nextWaiting = this.waiters.get(key)?.shift();
const [resolveNextWaiting, _] = this.waiters.get(key)?.shift() ?? [];
if (nextWaiting) {
if (resolveNextWaiting) {
this.logger?.debug(`Granted lock on ${key}`);
nextWaiting();
resolveNextWaiting();
} else {
this.locked.delete(key);
}

View file

@ -2,6 +2,8 @@
set -e
./scripts/utils/check-node.sh
cd docs
npm ci

View file

@ -8,8 +8,15 @@ if [[ "$1" == "--fix" ]]; then
echo "Running in fix mode - will automatically fix linting and formatting issues"
fi
./scripts/utils/check-node.sh
echo "Running checks in sync-server"
cd sync-server
which sqlx || cargo install sqlx-cli
sqlx database create --database-url sqlite://db.sqlite3
sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3
cargo test --verbose
if [[ "$FIX_MODE" == true ]]; then
@ -51,8 +58,4 @@ fi
cd ..
if [[ "$FIX_MODE" == true ]]; then
$0
else
echo "Success"
fi
echo "Success"

View file

@ -6,11 +6,7 @@ set -o pipefail
NO_COLOR=1
FORCE_COLOR=0
node_version=$(node -v | sed 's/^v\([0-9]*\).*/\1/')
if [ "$node_version" != "22" ]; then
echo "Error: This script requires Node.js version 22, found: $node_version"
exit 1
fi
./scripts/utils/check-node.sh
# Check if the argument is provided
if [ $# -eq 0 ]; then
@ -51,7 +47,7 @@ for i in $(seq 1 $process_count); do
echo "Started process $i with PID: $pid"
# Read from pipe, prefix with PID
(sed "s/^/[PID $pid] /" < "$pipe" | tee "../logs/log_${i}.log"; rm "$pipe") &
(sed "s/^/[PID $pid] /" < "$pipe" > "../logs/log_${i}.log"; rm "$pipe") &
done
cd ..

9
scripts/utils/check-node.sh Executable file
View file

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Verifies that the `node` found on PATH has the required major version.
# Exits with status 1 and prints an error on stderr when it does not;
# otherwise exits successfully without printing anything.
set -e

required_major="22"

# Report a missing interpreter explicitly rather than an empty "found:" value.
if ! command -v node >/dev/null 2>&1; then
    echo "Error: This script requires Node.js version ${required_major}, but node was not found on PATH" >&2
    exit 1
fi

# Strip the leading "v" and everything after the first run of digits,
# leaving only the major version number.
node_version=$(node -v | sed 's/^v\([0-9]*\).*/\1/')

if [ "$node_version" != "$required_major" ]; then
    # Diagnostics go to stderr so they are not mixed into captured stdout.
    echo "Error: This script requires Node.js version ${required_major}, found: $node_version" >&2
    exit 1
fi