Try fixing hanging e2e tests #183
13 changed files with 164 additions and 39 deletions
8
.github/workflows/check.yml
vendored
8
.github/workflows/check.yml
vendored
|
|
@ -5,6 +5,7 @@ on:
|
|||
branches: ["main"]
|
||||
pull_request:
|
||||
branches: ["main"]
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
|
|
@ -31,12 +32,5 @@ jobs:
|
|||
toolchain: "1.89.0"
|
||||
components: clippy, rustfmt
|
||||
|
||||
- name: Setup rust
|
||||
run: |
|
||||
which sqlx || cargo install sqlx-cli
|
||||
cd sync-server
|
||||
sqlx database create --database-url sqlite://db.sqlite3
|
||||
sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3
|
||||
|
||||
- name: Lint & test
|
||||
run: scripts/check.sh
|
||||
|
|
|
|||
16
.github/workflows/e2e.yml
vendored
16
.github/workflows/e2e.yml
vendored
|
|
@ -7,6 +7,7 @@ on:
|
|||
branches: ["main"]
|
||||
schedule:
|
||||
- cron: '0 * * * *'
|
||||
workflow_dispatch:
|
||||
|
||||
concurrency:
|
||||
group: e2e-tests
|
||||
|
|
@ -47,9 +48,24 @@ jobs:
|
|||
run: |
|
||||
cd sync-server
|
||||
cargo run config-e2e.yml --color never &
|
||||
SERVER_PID=$!
|
||||
cd ..
|
||||
|
||||
scripts/e2e.sh 8
|
||||
EXIT_CODE=$?
|
||||
|
||||
kill $SERVER_PID 2>/dev/null || true
|
||||
wait $SERVER_PID 2>/dev/null || true
|
||||
|
||||
exit $EXIT_CODE
|
||||
|
||||
- name: Upload e2e logs
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: e2e-logs
|
||||
path: logs/
|
||||
retention-days: 30
|
||||
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
|
|
|
|||
2
.github/workflows/publish-plugin.yml
vendored
2
.github/workflows/publish-plugin.yml
vendored
|
|
@ -3,8 +3,6 @@ name: Publish Obsidian plugin
|
|||
on:
|
||||
push:
|
||||
tags: ["*"]
|
||||
pull_request:
|
||||
branches: ["main"]
|
||||
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
|
|
|
|||
|
|
@ -35,6 +35,8 @@ const LOG_LEVEL_ORDER = {
|
|||
[LogLevel.ERROR]: 3
|
||||
};
|
||||
|
||||
const HEALTH_CHECK_INTERVAL_MS = 30 * 1000;
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const args = parseArgs(process.argv);
|
||||
const absolutePath = path.resolve(args.localPath);
|
||||
|
|
@ -147,7 +149,7 @@ async function main(): Promise<void> {
|
|||
void client.checkConnection().then((status) => {
|
||||
writeHealthStatus(healthFile, status);
|
||||
});
|
||||
}, 30 * 1000); // every 30 seconds
|
||||
}, HEALTH_CHECK_INTERVAL_MS);
|
||||
const clearHealthInterval = (): void => {
|
||||
clearInterval(healthInterval);
|
||||
};
|
||||
|
|
|
|||
|
|
@ -188,6 +188,14 @@ export class WebSocketManager {
|
|||
this.webSocket = new this.webSocketFactoryImplementation(wsUri);
|
||||
|
||||
this.webSocket.onopen = (): void => {
|
||||
// Check if we've been stopped while connecting
|
||||
if (this.isStopped) {
|
||||
this.webSocket?.close(
|
||||
1000,
|
||||
"WebSocketManager was stopped during connection"
|
||||
);
|
||||
return;
|
||||
}
|
||||
this.logger.info("WebSocket connection opened");
|
||||
this.onWebSocketStatusChanged.trigger(true);
|
||||
};
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ export class SyncClient {
|
|||
private hasStarted = false;
|
||||
private hasBeenDestroyed = false;
|
||||
private unloadTelemetry?: () => void;
|
||||
private isDestroying = false;
|
||||
private readonly eventUnsubscribers: (() => void)[] = [];
|
||||
|
||||
private constructor(
|
||||
private readonly history: SyncHistory,
|
||||
|
|
@ -159,11 +161,6 @@ export class SyncClient {
|
|||
settings.getSettings().isSyncEnabled,
|
||||
logger
|
||||
);
|
||||
settings.onSettingsChanged.add((newSettings, oldSettings) => {
|
||||
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
|
||||
fetchController.canFetch = newSettings.isSyncEnabled;
|
||||
}
|
||||
});
|
||||
|
||||
const syncService = new SyncService(
|
||||
deviceId,
|
||||
|
|
@ -258,13 +255,27 @@ export class SyncClient {
|
|||
this.unloadTelemetry = setUpTelemetry();
|
||||
}
|
||||
|
||||
this.logger.onLogEmitted.add((log): void => {
|
||||
if (log.level === LogLevel.ERROR && Sentry.isInitialized()) {
|
||||
Sentry.captureMessage(log.message);
|
||||
}
|
||||
});
|
||||
this.eventUnsubscribers.push(
|
||||
this.settings.onSettingsChanged.add((newSettings, oldSettings) => {
|
||||
if (oldSettings.isSyncEnabled != newSettings.isSyncEnabled) {
|
||||
this.fetchController.canFetch = newSettings.isSyncEnabled;
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
this.settings.onSettingsChanged.add(this.onSettingsChange.bind(this));
|
||||
this.eventUnsubscribers.push(
|
||||
this.logger.onLogEmitted.add((log): void => {
|
||||
if (log.level === LogLevel.ERROR && Sentry.isInitialized()) {
|
||||
Sentry.captureMessage(log.message);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
this.eventUnsubscribers.push(
|
||||
this.settings.onSettingsChanged.add(
|
||||
this.onSettingsChange.bind(this)
|
||||
)
|
||||
);
|
||||
|
||||
if (this.settings.getSettings().isSyncEnabled) {
|
||||
this.logger.info("Starting SyncClient");
|
||||
|
|
@ -431,6 +442,15 @@ export class SyncClient {
|
|||
public async destroy(): Promise<void> {
|
||||
this.checkIfDestroyed("destroy");
|
||||
|
||||
// Prevent concurrent destroy calls
|
||||
if (this.isDestroying) {
|
||||
this.logger.warn(
|
||||
"destroy() called while already destroying, ignoring"
|
||||
);
|
||||
return;
|
||||
}
|
||||
this.isDestroying = true;
|
||||
|
||||
// cancel everything that's in progress
|
||||
await this.pause();
|
||||
|
||||
|
|
@ -438,6 +458,12 @@ export class SyncClient {
|
|||
|
||||
this.resetInMemoryState();
|
||||
|
||||
// Clean up event listeners to prevent memory leaks
|
||||
this.eventUnsubscribers.forEach((unsubscribe) => {
|
||||
unsubscribe();
|
||||
});
|
||||
this.eventUnsubscribers.length = 0;
|
||||
|
||||
this.logger.info("SyncClient has been successfully disposed");
|
||||
|
||||
this.unloadTelemetry?.();
|
||||
|
|
|
|||
|
|
@ -264,7 +264,7 @@ export class Syncer {
|
|||
|
||||
public async waitUntilFinished(): Promise<void> {
|
||||
await this.runningScheduleSyncForOfflineChanges;
|
||||
await this.syncQueue.onEmpty();
|
||||
await this.syncQueue.onIdle(); // Wait for queue to be empty and running tasks to finish
|
||||
}
|
||||
|
||||
public async syncRemotelyUpdatedFile(
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import type { RelativePath } from "../../persistence/database";
|
|||
import { Locks } from "./locks";
|
||||
import { awaitAll } from "../await-all";
|
||||
import { sleep } from "../sleep";
|
||||
import { SyncResetError } from "../../services/sync-reset-error";
|
||||
|
||||
describe("withLock", () => {
|
||||
const testPath: RelativePath = "test/document/path";
|
||||
|
|
@ -230,3 +231,62 @@ describe("withLock", () => {
|
|||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("reset", () => {
|
||||
const testPath: RelativePath = "test/document/path";
|
||||
const logger = new Logger();
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/init-declarations
|
||||
let locks: Locks<RelativePath>;
|
||||
|
||||
beforeEach(() => {
|
||||
locks = new Locks<RelativePath>(logger);
|
||||
});
|
||||
|
||||
it("should reject pending waiters with SyncResetError while running operation completes", async () => {
|
||||
const firstPromise = locks.withLock(testPath, async () => {
|
||||
await sleep(2);
|
||||
return "first";
|
||||
});
|
||||
|
||||
await sleep(1);
|
||||
|
||||
const secondPromise = locks.withLock(testPath, async () => "second");
|
||||
void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function
|
||||
|
||||
locks.reset();
|
||||
|
||||
assert.strictEqual(await firstPromise, "first");
|
||||
|
||||
await assert.rejects(secondPromise, (err: Error) => {
|
||||
assert.ok(err instanceof SyncResetError);
|
||||
return true;
|
||||
});
|
||||
});
|
||||
|
||||
it("should allow locks to work normally after reset", async () => {
|
||||
const firstPromise = locks.withLock(testPath, async () => {
|
||||
await sleep(1);
|
||||
return "first";
|
||||
});
|
||||
|
||||
await sleep(1);
|
||||
|
||||
const secondPromise = locks.withLock(testPath, async () => "second");
|
||||
void secondPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function
|
||||
|
||||
locks.reset();
|
||||
|
||||
await firstPromise;
|
||||
|
||||
const result = await locks.withLock(testPath, () => "after-reset");
|
||||
assert.strictEqual(result, "after-reset");
|
||||
});
|
||||
|
||||
it("should handle reset with no pending operations", async () => {
|
||||
locks.reset();
|
||||
|
||||
const result = await locks.withLock(testPath, () => "success");
|
||||
assert.strictEqual(result, "success");
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import { SyncResetError } from "../../services/sync-reset-error";
|
||||
import type { Logger } from "../../tracing/logger";
|
||||
import { awaitAll } from "../await-all";
|
||||
|
||||
|
|
@ -12,7 +13,10 @@ export class Locks<T> {
|
|||
private readonly locked = new Set<T>();
|
||||
|
||||
/** Queue of resolve functions waiting for each key */
|
||||
private readonly waiters = new Map<T, (() => unknown)[]>();
|
||||
private readonly waiters = new Map<
|
||||
T,
|
||||
[() => unknown, (err: unknown) => unknown][]
|
||||
>();
|
||||
|
||||
public constructor(private readonly logger?: Logger) {}
|
||||
|
||||
|
|
@ -67,6 +71,13 @@ export class Locks<T> {
|
|||
}
|
||||
|
||||
public reset(): void {
|
||||
// Resolve all waiting promises before clearing to prevent deadlock
|
||||
// Any operation waiting for a lock will be granted access immediately
|
||||
for (const waiting of this.waiters.values()) {
|
||||
for (const [_, reject] of waiting) {
|
||||
reject(new SyncResetError());
|
||||
}
|
||||
}
|
||||
this.locked.clear();
|
||||
this.waiters.clear();
|
||||
}
|
||||
|
|
@ -102,7 +113,7 @@ export class Locks<T> {
|
|||
|
||||
this.logger?.debug(`Waiting for lock on ${key}`);
|
||||
|
||||
return new Promise((resolve) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
// DefaultDict behavior
|
||||
let waiting = this.waiters.get(key);
|
||||
if (!waiting) {
|
||||
|
|
@ -110,7 +121,7 @@ export class Locks<T> {
|
|||
this.waiters.set(key, waiting);
|
||||
}
|
||||
|
||||
waiting.push(resolve);
|
||||
waiting.push([resolve, reject]);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -127,11 +138,11 @@ export class Locks<T> {
|
|||
}
|
||||
|
||||
// Remove first waiter to ensure FIFO order
|
||||
const nextWaiting = this.waiters.get(key)?.shift();
|
||||
const [resolveNextWaiting, _] = this.waiters.get(key)?.shift() ?? [];
|
||||
|
||||
if (nextWaiting) {
|
||||
if (resolveNextWaiting) {
|
||||
this.logger?.debug(`Granted lock on ${key}`);
|
||||
nextWaiting();
|
||||
resolveNextWaiting();
|
||||
} else {
|
||||
this.locked.delete(key);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
set -e
|
||||
|
||||
./scripts/utils/check-node.sh
|
||||
|
||||
cd docs
|
||||
|
||||
npm ci
|
||||
|
|
|
|||
|
|
@ -8,8 +8,15 @@ if [[ "$1" == "--fix" ]]; then
|
|||
echo "Running in fix mode - will automatically fix linting and formatting issues"
|
||||
fi
|
||||
|
||||
./scripts/utils/check-node.sh
|
||||
|
||||
echo "Running checks in sync-server"
|
||||
|
||||
cd sync-server
|
||||
which sqlx || cargo install sqlx-cli
|
||||
sqlx database create --database-url sqlite://db.sqlite3
|
||||
sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3
|
||||
|
||||
cargo test --verbose
|
||||
|
||||
if [[ "$FIX_MODE" == true ]]; then
|
||||
|
|
@ -51,8 +58,4 @@ fi
|
|||
|
||||
cd ..
|
||||
|
||||
if [[ "$FIX_MODE" == true ]]; then
|
||||
$0
|
||||
else
|
||||
echo "Success"
|
||||
fi
|
||||
echo "Success"
|
||||
|
|
|
|||
|
|
@ -6,11 +6,7 @@ set -o pipefail
|
|||
NO_COLOR=1
|
||||
FORCE_COLOR=0
|
||||
|
||||
node_version=$(node -v | sed 's/^v\([0-9]*\).*/\1/')
|
||||
if [ "$node_version" != "22" ]; then
|
||||
echo "Error: This script requires Node.js version 22, found: $node_version"
|
||||
exit 1
|
||||
fi
|
||||
./scripts/utils/check-node.sh
|
||||
|
||||
# Check if the argument is provided
|
||||
if [ $# -eq 0 ]; then
|
||||
|
|
@ -51,7 +47,7 @@ for i in $(seq 1 $process_count); do
|
|||
echo "Started process $i with PID: $pid"
|
||||
|
||||
# Read from pipe, prefix with PID
|
||||
(sed "s/^/[PID $pid] /" < "$pipe" | tee "../logs/log_${i}.log"; rm "$pipe") &
|
||||
(sed "s/^/[PID $pid] /" < "$pipe" > "../logs/log_${i}.log"; rm "$pipe") &
|
||||
done
|
||||
|
||||
cd ..
|
||||
|
|
|
|||
9
scripts/utils/check-node.sh
Executable file
9
scripts/utils/check-node.sh
Executable file
|
|
@ -0,0 +1,9 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
node_version=$(node -v | sed 's/^v\([0-9]*\).*/\1/')
|
||||
if [ "$node_version" != "22" ]; then
|
||||
echo "Error: This script requires Node.js version 22, found: $node_version"
|
||||
exit 1
|
||||
fi
|
||||
Loading…
Add table
Add a link
Reference in a new issue