Compare commits

...
Sign in to create a new pull request.

7 commits

Author SHA1 Message Date
9d99a4ac23 split: sync-client utils and errors reorganization
Move error classes from services/ and file-operations/ into a new errors/
directory (authentication-error, server-version-mismatch-error,
sync-reset-error, file-not-found-error), plus add file-already-exists-error
and http-client-error. Update consts.ts and utils/* (await-all,
create-client-id, hash, rate-limit, find-matching-file). Replace
data-structures (locks, min-covered, event-listeners, fix-sized-cache) and
add debugging utilities (in-memory-file-system, log-to-console,
slow-web-socket-factory). Removes utils/create-promise.ts.
2026-05-08 21:36:29 +01:00
f7beb31d8f split: regenerated TS API type bindings
Auto-generated TS types regenerated from Rust ts-rs derives, mirrored into
frontend/sync-client/src/services/types/ and frontend/history-ui/src/lib/types/.
Adds ListVaultsResponse, VaultHistoryResponse, VaultInfo and updates several
existing types; removes DeleteDocumentVersion and UpdateDocumentVersion.
2026-05-08 21:36:13 +01:00
042233c4d7 split: server websocket + cursors
src/server/websocket.rs handshake/catch-up rewrite, app_state/cursors.rs,
app_state/websocket/{broadcasts,models,utils}.rs.
2026-05-08 21:35:52 +01:00
4ba439b874 split: server REST endpoints + rate limiting
server.rs router rewrite, auth.rs, device_id_header.rs, requests.rs,
responses.rs, plus per-endpoint changes: create/update/delete_document,
fetch_document_version{,_content,s}, fetch_latest_documents, index.rs.
Adds: fetch_vault_history, list_vaults, rate_limit (new files).
2026-05-08 21:35:41 +01:00
2d5edc6ec5 split: server database (app_state, migrations, models)
src/app_state.rs, src/app_state/database.rs (large schema/query rewrite),
two new migrations (add_idempotency_key, add_creation_vault_update_id),
and src/app_state/database/models.rs.
2026-05-08 21:35:30 +01:00
a9ce09b59d split: server foundation (Cargo, config, errors, utils, main)
Cargo.{toml,lock} bumps, build.rs, config-e2e.yml, rust-toolchain.toml,
src/config/* (database/logging/server/user configs), src/consts.rs,
src/errors.rs, src/main.rs, and src/utils/* (dedup_paths,
find_first_available_path, rotating_file_writer, sanitize_path).
2026-05-08 21:35:18 +01:00
70f97c4b16 split: CI workflows, scripts, root tooling, and docs
Some checks failed
Check / build (pull_request) Has been cancelled
E2E tests / build (pull_request) Has been cancelled
Publish CLI / publish-docker (pull_request) Has been cancelled
Publish server Docker image / publish-docker (pull_request) Has been cancelled
Forgejo workflows (new), GitHub workflow tweaks, .gitignore/.vscode, root
package-lock, rustfmt.toml, scripts/* updates, docs/ updates including
data-flow / authentication / server-setup, CLAUDE.md and README updates.
2026-05-08 21:35:07 +01:00
125 changed files with 6874 additions and 4049 deletions

View file

@ -0,0 +1,35 @@
name: Check
on:
push:
branches: ["main"]
pull_request:
branches: ["main"]
workflow_dispatch:
env:
CARGO_TERM_COLOR: always
RUSTFLAGS: "-Dwarnings"
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup Node.js environment
uses: actions/setup-node@v4
with:
node-version: "25.x"
- name: Setup Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: "1.92.0"
components: clippy, rustfmt
- name: Lint & test
run: scripts/check.sh

View file

@ -0,0 +1,38 @@
name: Deploy Documentation
on:
push:
branches:
- main
paths:
- "docs/**"
- ".forgejo/workflows/deploy-docs.yml"
workflow_dispatch:
concurrency:
group: pages
cancel-in-progress: false
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup Node.js environment
uses: actions/setup-node@v4
with:
node-version: "25.x"
- name: Build docs
run: scripts/build-docs.sh
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: docs
path: docs/.vitepress/dist

View file

@ -0,0 +1,71 @@
name: E2E tests
on:
push:
branches: ["main"]
pull_request:
branches: ["main"]
schedule:
- cron: "0 * * * *"
workflow_dispatch:
concurrency:
group: e2e-tests
cancel-in-progress: false
env:
RUSTFLAGS: "-Dwarnings"
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup Node.js environment
uses: actions/setup-node@v4
with:
node-version: "25.x"
- name: Setup Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: "1.92.0"
components: clippy, rustfmt
- name: Setup rust
run: |
which sqlx || cargo install sqlx-cli
cd sync-server
sqlx database create --database-url sqlite://db.sqlite3
sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3
- name: E2E tests
run: |
cd sync-server
cargo run config-e2e.yml --color never &
SERVER_PID=$!
cd ..
scripts/e2e.sh 8
EXIT_CODE=$?
kill $SERVER_PID 2>/dev/null || true
wait $SERVER_PID 2>/dev/null || true
exit $EXIT_CODE
- name: Upload e2e logs
if: always()
uses: actions/upload-artifact@v4
with:
name: e2e-logs
path: logs/
retention-days: 30
- name: Cleanup
if: always()
run: scripts/clean-up.sh

View file

@ -0,0 +1,51 @@
name: Publish CLI
on:
push:
branches: ["main"]
tags: ["*"]
pull_request:
branches: ["main"]
jobs:
publish-docker:
runs-on: ubuntu-docker
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Extract registry hostname
id: registry
run: echo "host=$(echo '${{ github.server_url }}' | sed 's|https\?://||')" >> "$GITHUB_OUTPUT"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log into container registry
uses: docker/login-action@v3
with:
registry: ${{ steps.registry.outputs.host }}
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ steps.registry.outputs.host }}/${{ github.repository }}-cli
- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@v5
with:
context: frontend
file: frontend/local-client-cli/Dockerfile
platforms: linux/amd64
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=registry,ref=${{ steps.registry.outputs.host }}/${{ github.repository }}-cli:buildcache
cache-to: type=registry,ref=${{ steps.registry.outputs.host }}/${{ github.repository }}-cli:buildcache,mode=max

View file

@ -0,0 +1,71 @@
name: Publish Obsidian plugin
on:
push:
tags: ["*"]
env:
CARGO_TERM_COLOR: always
jobs:
publish-plugin:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup Node.js environment
uses: actions/setup-node@v4
with:
node-version: "25.x"
- name: Build plugin
run: |
cd frontend
npm ci
npm run build
- name: Setup Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: "1.92.0"
components: clippy, rustfmt
- name: Install cross-compilation tools
run: |
apt update
apt install -y gcc-aarch64-linux-gnu musl-tools gcc-mingw-w64-x86-64 jq
- name: Build Linux and Windows binaries
run: ./scripts/build-sync-server-binaries.sh
- name: Create release
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SERVER_URL: ${{ github.server_url }}
REPO: ${{ github.repository }}
run: |
tag="${GITHUB_REF#refs/tags/}"
mkdir -p release
cp frontend/obsidian-plugin/dist/* release/
cp sync-server/artifacts/sync-server-* release/
# Create draft release via Forgejo API
RELEASE_ID=$(curl -s -X POST \
"${SERVER_URL}/api/v1/repos/${REPO}/releases" \
-H "Authorization: token ${GITHUB_TOKEN}" \
-H "Content-Type: application/json" \
-d "{\"tag_name\": \"${tag}\", \"name\": \"${tag}\", \"draft\": true}" \
| jq -r '.id')
# Upload release assets
for file in release/*; do
filename=$(basename "$file")
curl -s -X POST \
"${SERVER_URL}/api/v1/repos/${REPO}/releases/${RELEASE_ID}/assets?name=${filename}" \
-H "Authorization: token ${GITHUB_TOKEN}" \
-F "attachment=@${file}"
done

View file

@ -0,0 +1,51 @@
name: Publish server Docker image
on:
push:
branches: ["main"]
tags: ["*"]
pull_request:
branches: ["main"]
jobs:
publish-docker:
runs-on: ubuntu-docker
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Extract registry hostname
id: registry
run: echo "host=$(echo '${{ github.server_url }}' | sed 's|https\?://||')" >> "$GITHUB_OUTPUT"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log into container registry
if: github.ref_type == 'tag'
uses: docker/login-action@v3
with:
registry: ${{ steps.registry.outputs.host }}
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ steps.registry.outputs.host }}/${{ github.repository }}
- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@v5
with:
context: sync-server
platforms: linux/amd64,linux/arm64
push: ${{ github.ref_type == 'tag' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=registry,ref=${{ steps.registry.outputs.host }}/${{ github.repository }}:buildcache
cache-to: type=registry,ref=${{ steps.registry.outputs.host }}/${{ github.repository }}:buildcache,mode=max

View file

@ -23,13 +23,13 @@ jobs:
- name: Setup Node.js environment
uses: actions/setup-node@v4.2.0
with:
node-version: "22.x"
node-version: "25.x"
check-latest: true
- name: Setup Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: "1.89.0"
toolchain: "1.92.0"
components: clippy, rustfmt
- name: Lint & test

View file

@ -5,8 +5,8 @@ on:
branches:
- main
paths:
- 'docs/**'
- '.github/workflows/deploy-docs.yml'
- "docs/**"
- ".github/workflows/deploy-docs.yml"
workflow_dispatch:
permissions:
@ -28,12 +28,11 @@ jobs:
with:
fetch-depth: 0
- name: Setup Node
uses: actions/setup-node@v4
- name: Setup Node.js environment
uses: actions/setup-node@v4.2.0
with:
node-version: 22
cache: npm
cache-dependency-path: docs/package-lock.json
node-version: "25.x"
check-latest: true
- name: Setup Pages
uses: actions/configure-pages@v4

View file

@ -6,7 +6,7 @@ on:
pull_request:
branches: ["main"]
schedule:
- cron: '0 * * * *'
- cron: "0 * * * *"
workflow_dispatch:
concurrency:
@ -28,13 +28,13 @@ jobs:
- name: Setup Node.js environment
uses: actions/setup-node@v4.2.0
with:
node-version: "22.x"
node-version: "25.x"
check-latest: true
- name: Setup Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: "1.89.0"
toolchain: "1.92.0"
components: clippy, rustfmt
- name: Setup rust

View file

@ -19,7 +19,7 @@ jobs:
- name: Setup Node.js environment
uses: actions/setup-node@v4.2.0
with:
node-version: "22.x"
node-version: "25.x"
check-latest: true
- name: Build plugin
@ -31,7 +31,7 @@ jobs:
- name: Setup Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: "1.89.0"
toolchain: "1.92.0"
components: clippy, rustfmt
- name: Install cross-compilation tools

9
.gitignore vendored
View file

@ -7,15 +7,18 @@ node_modules
# Frontend build folders
frontend/*/dist
sync-server/db.sqlite3*
sync-server/databases
# Rust build folders
sync-server/target
sync-server/artifacts
sync-server/bindings/*.ts
# build folders
sync-server/db.sqlite3*
**/databases
*.log
*.sqlx
target
.task

View file

@ -5,6 +5,6 @@
"**/dist": true,
"**/node_modules": true,
"**/.sqlx": true,
"**/target": true,
},
"**/target": true
}
}

195
CLAUDE.md
View file

@ -2,109 +2,154 @@
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
## Project shape
VaultLink is a self-hosted Obsidian plugin for real-time collaborative file syncing. The project consists of a Rust-based sync server and a TypeScript frontend with three main components: an Obsidian plugin, a sync client library, and a test client.
VaultLink is a self-hosted Obsidian file-sync system. Two halves of one repo:
## Architecture
- `sync-server/` — Rust (axum + sqlx/SQLite). Source of truth for vault state, broadcasts changes via WebSocket.
- `frontend/` — npm workspaces. The sync engine (`sync-client`) is consumed by an Obsidian plugin, a standalone CLI, a fuzz E2E harness, a scripted determinism harness, and a history UI.
### Core Components
The HTTP/WS API types are generated from Rust (`ts-rs`) and mirrored into the TS workspaces. **Never hand-edit files in `frontend/sync-client/src/services/types/` or `frontend/history-ui/src/lib/types/`** — run `scripts/update-api-types.sh` after changing anything Serde-derived in the server.
- **sync-server/**: Rust-based WebSocket server with SQLite database for document versioning and real-time synchronization
- **frontend/sync-client/**: TypeScript library providing core sync functionality, WebSocket management, and file operations
- **frontend/obsidian-plugin/**: Obsidian plugin that integrates the sync client with Obsidian's API
- **frontend/test-client/**: CLI testing tool for the sync functionality
### Frontend workspaces
### Key Technologies
- `sync-client` — the sync engine; published to consumers via `dist/`. All other TS workspaces depend on it via `file:../sync-client`.
- `obsidian-plugin` — Obsidian plugin built from `sync-client`.
- `local-client-cli` — same engine wrapped as a standalone CLI.
- `history-ui` — vault-history web UI.
- `test-client` — fuzz E2E harness (random ops across N processes).
- `deterministic-tests` — scripted multi-client tests with an in-memory FS, run against a real server.
- **Backend**: Rust with Axum framework, SQLite with SQLx, WebSockets for real-time sync
- **Frontend**: TypeScript, Webpack for bundling, Jest for testing
- **Sync Algorithm**: Uses reconcile-text library for operational transformation
## Common commands
## Development Commands
Pre-push hygiene (formats, lints, runs tests, requires clean git state):
### Server Development
```bash
cd sync-server
cargo run config-e2e.yml # Start development server
cargo test --verbose # Run Rust tests
cargo clippy --all-targets --all-features # Lint Rust code
cargo clippy --all-targets --all-features --fix --allow-dirty --allow-staged # Auto-fix clippy warnings
cargo fmt --all -- --check # Check Rust formatting
cargo fmt --all # Auto-format Rust code
cargo machete --with-metadata # Detect unused dependencies
```sh
scripts/check.sh --fix
```
### Frontend Development
```bash
Run the fuzz E2E (N parallel processes):
```sh
scripts/e2e.sh 12
# Logs land in logs/log_<i>.log. Clean with scripts/clean-up.sh
```
Run deterministic tests (require a release-built server in `sync-server/target/release/sync_server` — they spawn it themselves):
```sh
cd sync-server && cargo build --release && cd ..
cd frontend
npm run dev # Start development mode (watches sync-client and obsidian-plugin)
npm run build # Build all workspaces
npm run test # Run all tests
npm run lint # Lint and format TypeScript code
npm run build -w sync-client -w deterministic-tests
node deterministic-tests/dist/cli.js # all
node deterministic-tests/dist/cli.js --filter=rename # subset
node deterministic-tests/dist/cli.js --filter=… -j 4 # cap parallelism
```
### Database Setup (Development)
```bash
Run a single sync-client unit test by file:
```sh
cd frontend/sync-client && npx tsx --test 'src/**/sync-event-queue.test.ts'
```
Server: dev runs from `sync-server/` against `config-e2e.yml`:
```sh
cd sync-server
cargo run config-e2e.yml # dev
cargo build --release # used by both e2e harnesses
cargo test # unit + ts-rs binding export tests
```
Frontend dev (sync-client + obsidian-plugin watch in parallel):
```sh
cd frontend && npm install && npm run dev
```
Regenerate TS bindings from Rust types (touches `frontend/{sync-client,history-ui}/src/.../types/`):
```sh
scripts/update-api-types.sh
```
## SQLite / sqlx
The server uses `sqlx::query!` macros that need a prepared `.sqlx` cache to compile offline. Touching any SQL means regenerating it:
```sh
cd sync-server
sqlx database create --database-url sqlite://db.sqlite3
sqlx migrate run --source src/app_state/database/migrations --database-url sqlite://db.sqlite3
cargo sqlx prepare --workspace
```
### Initial Setup
```bash
# Install required cargo tools
cargo install sqlx-cli cargo-machete cargo-edit
New migrations: `sqlx migrate add --source src/app_state/database/migrations <name>`.
## Sync engine architecture
Read `frontend/sync-client/src/sync-operations/` to follow the sync engine; the rest of `sync-client` is plumbing (filesystem ops, persistence, services, telemetry).
The engine is **two independent loops with separate invariants**:
- **Wire loop** (`syncer.ts`) — drains the single-consumer FIFO queue. HTTP and WS handlers update record fields (`remoteRelativePath`, `parentVersionId`, `remoteHash`) and write content to the file at `record.localPath`. They never move files for path placement.
- **Path reconciler** (`reconciler.ts`) — runs after every drained event. Best-effort pass that moves files to make `localPath === remoteRelativePath`. The move graph is topologically sorted; cycles are resolved by reading every file in the cycle into memory and writing each back to its new slot (no tmp files). Records with pending local events are skipped on each pass — the reconciler operates only on settled records. Failures (slot occupied by an untracked file, etc.) are silent skips; the next pass retries.
**`SyncEventQueue`** (`sync-event-queue.ts`) holds:
- `byDocId: Map<DocumentId, DocumentRecord>` — primary record store.
- `byLocalPath: Map<RelativePath, DocumentRecord>` — derived index for path lookups, maintained at every mutation point.
- `events: SyncEvent[]` — pending wire ops in FIFO drain order.
```ts
DocumentRecord = {
documentId,
parentVersionId,
remoteHash?,
remoteRelativePath,
localPath: RelativePath | undefined
}
```
### Scripts
- `scripts/check.sh`: Full CI check (builds, lints, tests both server and frontend)
- `scripts/check.sh --fix`: Same as above but auto-fixes linting and formatting issues
- `scripts/e2e.sh`: End-to-end testing
- `scripts/clean-up.sh`: Clean logs and database files
- `scripts/bump-version.sh patch`: Publish new version
- `scripts/update-api-types.sh`: Update TypeScript bindings from Rust types
`localPath === undefined` means the doc has no local file yet — typically a remote create whose target slot was occupied at receive time; the reconciler will fetch and place when the slot frees (the bytes wait in `pendingPlacementContent`).
## Code Structure
Local FS events from the watcher update `localPath` synchronously at enqueue time via `setLocalPath` / `upsertRecord`. The wire loop never updates it for path placement; only the reconciler does. A user rename onto a tracked slot enqueues a `LocalDelete` for the displaced doc (the OS rename clobbered its content) and clears that doc's `localPath`.
### Workspace Configuration
The frontend uses npm workspaces with four packages:
- `sync-client`: Core synchronization logic
- `obsidian-plugin`: Obsidian-specific integration
- `test-client`: Testing utilities
- `local-client-cli`: Standalone CLI for VaultLink sync client
**Pending creates** use a `Promise<DocumentId>` chain to serialize dependent ops (`LocalUpdate`, `LocalDelete`) behind the still-in-flight `LocalCreate`. `resolveCreate` resolves the promise once the server returns a docId, and `replacePendingDocumentId` swaps the resolved id across already-queued events. `findLatestCreateForPath` is the lookup the watcher uses to attach dependents; `updatePendingCreatePath` rewrites a pending create's `event.path` in place when the user renames the file before its create has acked.
### Type Generation
Rust structs generate TypeScript types via ts-rs crate, stored in `sync-server/bindings/` and used by frontend packages.
**Watermark.** `lastSeenUpdateId` uses a `MinCovered` (a contiguous-prefix tracker over a stream of integers): we only advance the published min when the next consecutive id has been processed, so out-of-order RemoteChange ids don't fool the WebSocket handshake into requesting a too-recent catch-up.
### Key Files
- `sync-server/src/`: Rust server implementation with WebSocket handlers
- `frontend/sync-client/src/sync-client.ts`: Main sync client entry point
- `frontend/obsidian-plugin/src/vault-link-plugin.ts`: Main Obsidian plugin class
- `frontend/sync-client/src/services/sync-service.ts`: Core synchronization logic
**Server catch-up.** The server's WS handshake replays events newer than the client's `last_seen_vault_update_id` from the `latest_document_versions` view (one row per doc, the latest). On those replayed rows `is_new_file` means _new to this client_ (`creation_vault_update_id > last_seen_vault_update_id`), not "this row is the doc's first version" — necessary because the catch-up only carries the latest version; if a doc was created and updated past the watermark, the client never sees its create otherwise.
## Testing
## Edge-case patterns the sync engine has to survive
### Running Tests
- Server: `cargo test --verbose`
- Frontend: `npm run test` (runs Jest across all workspaces)
- E2E: `scripts/e2e.sh`
The two-loop split defuses most of the old race catalogue (slot-collision stashes, conflict-uuid divergence, `MoveOnConflict.NEW`/`EXISTING` policy choices) by separating wire transport from path placement. What's left:
### Test Structure
- Rust: Unit tests alongside source files
- TypeScript: `.test.ts` files using Jest
- E2E: Uses test-client to simulate multiple concurrent users
**Pending-create docId is a `Promise`, not a string, until the create acks.** Any `LocalUpdate` / `LocalDelete` queued behind a still-in-flight `LocalCreate` carries the create's `resolvers.promise` as its `documentId`. `replacePendingDocumentId` swaps the resolved id across queued events when the create resolves; `===` comparisons against the resolved string elsewhere will silently fail until that swap runs. Anything that walks `events[]` looking for a docId match must either run after the swap or be tolerant of `Promise`-typed ids.
## Code Style
**`processCreate` reads `event.path` live, not `event.originalPath`.** The watcher rewrites `event.path` in place via `updatePendingCreatePath` when the user renames a pending-create file. `originalPath` was removed from `LocalCreate` events specifically because reading it would send the stale pre-rename path to the server.
### Rust
- Uses extensive Clippy lints (see Cargo.toml)
- Follows pedantic linting rules
- Forbids unsafe code
- Uses cargo fmt with default settings
**`record.localPath` mutates in place across awaits.** When the watcher renames a doc while a drain handler is awaiting an HTTP roundtrip, the queue mutates the in-flight event's record so subsequent reads see the new path. Snapshotting `record.localPath` into a local at function entry and using it after an `await` reads/writes a now-vacated slot. Read `record.localPath` live; only snapshot for the deliberate "did it change while I was awaiting" comparison.
### TypeScript
- Prettier configuration: 4-space tabs, trailing commas removed, LF line endings
- ESLint with unused imports plugin
- Consistent across all three frontend packages
**Reconciler-defer is the wire-loop's contract with the reconciler.** The reconciler skips records where `hasPendingLocalEventsForDocumentId` returns true. Wire-loop handlers can therefore freely write `remoteRelativePath` to whatever the server returned — even if it disagrees with `localPath` — knowing the reconciler won't move the file out from under a queued user rename.
**Watermark advancement is load-bearing both ways.** Branches that _skip_ a remote event without advancing `lastSeenUpdateId` create permanent gaps that re-deliver forever. Branches that _advance_ without applying the content lose data: the server has no further event to re-deliver, the catch-up only carries the latest version, and any state in between is gone. Don't advance unless the event was actually applied (or deliberately discarded after weighing both halves).
**`isNewFile` semantics differ between catch-up and real-time.** On WS handshake replay it means _new to this client_ (`creation_vault_update_id > last_seen_vault_update_id`); on real-time broadcasts it means _this version is the create_ (`creation_vault_update_id == vault_update_id`). A handler that decides based on one interpretation will be wrong on the other channel; reasoning about fetch-and-treat-as-new vs. ignore needs to know which channel delivered the event.
**Pause / disable-sync mid-flight** is the one race the new model doesn't structurally fix. An HTTP that committed server-side but whose response was discarded leaves the server holding a doc the client has no record of. Resume → offline scan → server-side dedupe handles it (the server merges the duplicate create into the existing doc), but if the merge produces a deconflict, the client picks up an extra file. Out of scope for the two-loop split.
**Cycle reconciliation uses in-memory content swap.** When the move graph contains a cycle, the reconciler reads every file in the cycle into memory and writes each back to its new slot, with no tmp files. A write-ahead marker at `.vaultlink/swap-<uuid>.json` lists each leg; on startup the reconciler reads the marker, hashes each `from` to determine which legs ran, and replays the rest. The `.vaultlink/**` glob is hard-coded as an internal ignore pattern so swap markers don't get sync'd.
## Two complementary E2E harnesses
- **`test-client` (fuzz):** random ops across N parallel processes for many minutes. Used by `scripts/e2e.sh`. Catches bugs nobody thought to write a test for, but reproductions are noisy.
- **`deterministic-tests`:** scripted scenarios with an in-memory FS pinned to a real server. Used to _capture_ a fuzz-discovered bug as a minimal repro before fixing it. See `frontend/deterministic-tests/README.md` for the step grammar (`pause-server`, `pause-websocket`, `barrier`, `assert-consistent`, etc.).
When a fuzz failure surfaces, the workflow is: root-cause from logs → write a deterministic test that fails on the bug → fix → confirm both the deterministic test and `e2e.sh` pass.
## Style
- TS: 4-space indent, no tabs, LF, prettier (`trailingComma: "none"`). YAML/MD use 2-space indent.
- Rust: `rustfmt.toml` enforces 4-space spaces, LF.
- Lint: ESLint for TS, Clippy for Rust, `cargo machete` for unused deps. All wired into `scripts/check.sh`.

View file

@ -8,12 +8,12 @@
## Develop
### Install [nvm](https://github.com/nvm-sh/nvm)
### Set up Node.JS 25 with [nvm](https://github.com/nvm-sh/nvm)
- `curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.1/install.sh | bash`
- `nvm install 22`
- `nvm use 22`
- Optionally set the system-wide default: `nvm alias default 22`
- `nvm install 25`
- `nvm use 25`
- Optionally, set the system-wide default: `nvm alias default 25`
### Set up Rust

View file

@ -2,12 +2,7 @@
"version": "0.2",
"language": "en-GB",
"dictionaries": ["en-gb"],
"ignorePaths": [
"node_modules",
".vitepress/dist",
".vitepress/cache",
"package-lock.json"
],
"ignorePaths": ["node_modules", ".vitepress/dist", ".vitepress/cache", "package-lock.json"],
"words": [
"VaultLink",
"Obsidian",

View file

@ -361,11 +361,11 @@ VALUES (?, ?, ?);
```json
{
"type": "upload_file",
"path": "notes/example.md",
"content": "File content here...",
"base_version": 10,
"timestamp": "2024-01-01T12:00:00Z"
"type": "upload_file",
"path": "notes/example.md",
"content": "File content here...",
"base_version": 10,
"timestamp": "2024-01-01T12:00:00Z"
}
```
@ -373,8 +373,8 @@ VALUES (?, ?, ?);
```json
{
"type": "download_file",
"path": "notes/example.md"
"type": "download_file",
"path": "notes/example.md"
}
```
@ -382,8 +382,8 @@ VALUES (?, ?, ?);
```json
{
"type": "delete_file",
"path": "notes/old.md"
"type": "delete_file",
"path": "notes/old.md"
}
```
@ -391,8 +391,8 @@ VALUES (?, ?, ?);
```json
{
"type": "list_files",
"since_version": 0
"type": "list_files",
"since_version": 0
}
```
@ -402,11 +402,11 @@ VALUES (?, ?, ?);
```json
{
"type": "file_updated",
"path": "notes/example.md",
"version": 11,
"size": 1024,
"hash": "abc123..."
"type": "file_updated",
"path": "notes/example.md",
"version": 11,
"size": 1024,
"hash": "abc123..."
}
```
@ -414,10 +414,10 @@ VALUES (?, ?, ?);
```json
{
"type": "file_content",
"path": "notes/example.md",
"content": "Updated content...",
"version": 11
"type": "file_content",
"path": "notes/example.md",
"content": "Updated content...",
"version": 11
}
```
@ -425,9 +425,9 @@ VALUES (?, ?, ?);
```json
{
"type": "file_deleted",
"path": "notes/old.md",
"version": 12
"type": "file_deleted",
"path": "notes/old.md",
"version": 12
}
```
@ -435,9 +435,9 @@ VALUES (?, ?, ?);
```json
{
"type": "sync_complete",
"total_files": 150,
"current_version": 200
"type": "sync_complete",
"total_files": 150,
"current_version": 200
}
```
@ -445,9 +445,9 @@ VALUES (?, ?, ?);
```json
{
"type": "error",
"message": "File too large",
"code": "FILE_TOO_LARGE"
"type": "error",
"message": "File too large",
"code": "FILE_TOO_LARGE"
}
```

View file

@ -53,7 +53,7 @@ Central authority for synchronisation. Rust + Axum framework.
**Technology**:
- **Language**: Rust 1.89+
- **Language**: Rust 1.92+
- **Framework**: Axum (async web framework)
- **Database**: SQLite with SQLx
- **Protocol**: WebSockets for real-time communication

View file

@ -243,9 +243,9 @@ users:
2. Client sends authentication message:
```json
{
"type": "auth",
"token": "user-token",
"vault": "vault-name"
"type": "auth",
"token": "user-token",
"vault": "vault-name"
}
```
3. Server validates:

View file

@ -75,7 +75,7 @@ chmod +x sync_server-linux-x86_64
### Build from Source
Requirements: Rust 1.89.0+, SQLite development headers, SQLx CLI
Requirements: Rust 1.92.0+, SQLite development headers, SQLx CLI
```bash
# Clone the repository

5960
docs/package-lock.json generated

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,8 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export type ClientCursors = {
userName: string;
deviceId: string;
documentsWithCursors: Array<DocumentWithCursors>;
};

View file

@ -1,7 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface UpdateDocumentVersion {
parent_version_id: bigint;
export type CreateDocumentVersion = {
relative_path: string;
content: number[];
}
last_seen_vault_update_id: number;
content: Array<number>;
};

View file

@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentWithCursors } from "./DocumentWithCursors";
export type CursorPositionFromClient = {
documentsWithCursors: Array<DocumentWithCursors>;
};

View file

@ -0,0 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ClientCursors } from "./ClientCursors";
export type CursorPositionFromServer = { clients: Array<ClientCursors> };

View file

@ -1,5 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface DeleteDocumentVersion {
relativePath: string;
}
export type CursorSpan = { start: number; end: number };

View file

@ -0,0 +1,10 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersion } from "./DocumentVersion";
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
/**
* Response to a create/update document request.
*/
export type DocumentUpdateResponse =
| ({ type: "FastForwardUpdate" } & DocumentVersionWithoutContent)
| ({ type: "MergingUpdate" } & DocumentVersion);

View file

@ -0,0 +1,12 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type DocumentVersion = {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
contentBase64: string;
isDeleted: boolean;
userId: string;
deviceId: string;
};

View file

@ -0,0 +1,16 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type DocumentVersionWithoutContent = {
vaultUpdateId: number;
documentId: string;
relativePath: string;
updatedDate: string;
isDeleted: boolean;
userId: string;
deviceId: string;
contentSize: number;
/**
* True iff this is the first version of the document
*/
isNewFile: boolean;
};

View file

@ -0,0 +1,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorSpan } from "./CursorSpan";
export type DocumentWithCursors = {
vaultUpdateId: number | null;
documentId: string;
relativePath: string;
cursors: Array<CursorSpan>;
};

View file

@ -0,0 +1,13 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
/**
* Response to a fetch latest documents request.
*/
export type FetchLatestDocumentsResponse = {
latestDocuments: Array<DocumentVersionWithoutContent>;
/**
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint;
};

View file

@ -0,0 +1,11 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { VaultInfo } from "./VaultInfo";
/**
* Response to listing vaults accessible to the authenticated user.
*/
export type ListVaultsResponse = {
vaults: Array<VaultInfo>;
hasMore: boolean;
userName: string;
};

View file

@ -0,0 +1,25 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* Response to a ping request.
*/
export type PingResponse = {
/**
* Semantic version of the server.
*/
serverVersion: string;
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean;
/**
* List of file extensions that are allowed to be merged.
*/
mergeableFileExtensions: Array<string>;
/**
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
supportedApiVersion: number;
};

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type SerializedError = {
errorType: string;
message: string;
causes: Array<string>;
};

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type UpdateTextDocumentVersion = {
parentVersionId: number;
relativePath: string | null;
content: Array<number | string>;
};

View file

@ -0,0 +1,10 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
/**
* Response to a vault history request (paginated).
*/
export type VaultHistoryResponse = {
versions: Array<DocumentVersionWithoutContent>;
hasMore: boolean;
};

View file

@ -0,0 +1,10 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* Summary of a single vault returned by the list-vaults endpoint.
*/
export type VaultInfo = {
name: string;
documentCount: number;
createdAt: string | null;
};

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorPositionFromClient } from "./CursorPositionFromClient";
import type { WebSocketHandshake } from "./WebSocketHandshake";
export type WebSocketClientMessage =
| ({ type: "handshake" } & WebSocketHandshake)
| ({ type: "cursorPositions" } & CursorPositionFromClient);

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type WebSocketHandshake = {
token: string;
deviceId: string;
lastSeenVaultUpdateId: number | null;
};

View file

@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CursorPositionFromServer } from "./CursorPositionFromServer";
import type { WebSocketVaultUpdate } from "./WebSocketVaultUpdate";
export type WebSocketServerMessage =
| ({ type: "vaultUpdate" } & WebSocketVaultUpdate)
| ({ type: "cursorPositions" } & CursorPositionFromServer);

View file

@ -0,0 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
export type WebSocketVaultUpdate = { document: DocumentVersionWithoutContent };

View file

@ -1,6 +1,6 @@
export const TIMEOUT_FOR_MERGING_HISTORY_ENTRIES_IN_SECONDS = 60;
export const DIFF_CACHE_SIZE_MB = 2;
export const MAX_LOG_MESSAGE_COUNT = 100000;
export const MAX_HISTORY_ENTRY_COUNT = 5000;
export const SUPPORTED_API_VERSION = 2;
export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_S = 10;
export const SUPPORTED_API_VERSION = 3;
export const WEBSOCKET_DISCONNECT_TIMEOUT_IN_SECONDS = 10;
export const WEBSOCKET_CONNECTION_TIMEOUT_IN_SECONDS = 10;

View file

@ -0,0 +1,9 @@
export class FileAlreadyExistsError extends Error {
public constructor(
message: string,
public readonly filePath: string
) {
super(message);
this.name = "FileAlreadyExistsError";
}
}

View file

@ -0,0 +1,9 @@
export class HttpClientError extends Error {
public constructor(
public readonly statusCode: number,
message: string
) {
super(message);
this.name = "HttpClientError";
}
}

View file

@ -1,13 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export interface CreateDocumentVersion {
/**
* The client can decide the document id (if it wishes to) in order
* to help with syncing. If the client does not provide a document id,
* the server will generate one. If the client provides a document id
* it must not already exist in the database.
*/
document_id: string | null;
relative_path: string;
last_seen_vault_update_id: number;
content: number[];
}

View file

@ -3,7 +3,7 @@ import type { DocumentVersion } from "./DocumentVersion";
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
/**
* Response to an update document request.
* Response to a create/update document request.
*/
export type DocumentUpdateResponse =
| ({ type: "FastForwardUpdate" } & DocumentVersionWithoutContent)

View file

@ -9,4 +9,8 @@ export interface DocumentVersionWithoutContent {
userId: string;
deviceId: string;
contentSize: number;
/**
* True iff this is the first version of the document
*/
isNewFile: boolean;
}

View file

@ -2,8 +2,8 @@
import type { CursorSpan } from "./CursorSpan";
export interface DocumentWithCursors {
vault_update_id: number | null;
document_id: string;
relative_path: string;
vaultUpdateId: number | null;
documentId: string;
relativePath: string;
cursors: CursorSpan[];
}

View file

@ -7,7 +7,7 @@ import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutCont
export interface FetchLatestDocumentsResponse {
latestDocuments: DocumentVersionWithoutContent[];
/**
* The update ID of the latest document in the response.
*/
* The update ID of the latest document in the response.
*/
lastUpdateId: bigint;
}

View file

@ -0,0 +1,11 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { VaultInfo } from "./VaultInfo";
/**
* Response to listing vaults accessible to the authenticated user.
*/
export interface ListVaultsResponse {
vaults: VaultInfo[];
hasMore: boolean;
userName: string;
}

View file

@ -5,21 +5,21 @@
*/
export interface PingResponse {
/**
* Semantic version of the server.
*/
* Semantic version of the server.
*/
serverVersion: string;
/**
* Whether the client is authenticated based on the sent Authorization
* header.
*/
* Whether the client is authenticated based on the sent Authorization
* header.
*/
isAuthenticated: boolean;
/**
* List of file extensions that are allowed to be merged.
*/
* List of file extensions that are allowed to be merged.
*/
mergeableFileExtensions: string[];
/**
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
* API version ensuring backwards & forwards compatibility between the client
* and server.
*/
supportedApiVersion: number;
}

View file

@ -2,6 +2,6 @@
export interface UpdateTextDocumentVersion {
parentVersionId: number;
relativePath: string;
relativePath: string | null;
content: (number | string)[];
}

View file

@ -0,0 +1,10 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
/**
* Response to a vault history request (paginated).
*/
export interface VaultHistoryResponse {
versions: DocumentVersionWithoutContent[];
hasMore: boolean;
}

View file

@ -0,0 +1,10 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* Summary of a single vault returned by the list-vaults endpoint.
*/
export interface VaultInfo {
name: string;
documentCount: number;
createdAt: string | null;
}

View file

@ -2,6 +2,5 @@
import type { DocumentVersionWithoutContent } from "./DocumentVersionWithoutContent";
export interface WebSocketVaultUpdate {
documents: DocumentVersionWithoutContent[];
isInitialSync: boolean;
document: DocumentVersionWithoutContent;
}

View file

@ -9,7 +9,7 @@ type ResolvedTuple<T extends readonly unknown[]> = {
export const awaitAll = async <T extends readonly unknown[]>(
promises: PromiseTuple<T>
): Promise<ResolvedTuple<T>> => {
// eslint-disable-next-line no-restricted-properties
// eslint-disable-next-line no-restricted-properties, @typescript-eslint/await-thenable
const result = await Promise.allSettled(promises);
for (const res of result) {
if (res.status === "rejected") {

View file

@ -1,5 +1,3 @@
import { v4 as uuidv4 } from "uuid";
export function createClientId(): string {
// @ts-expect-error, injected by webpack
const packageVersion = __CURRENT_VERSION__; // eslint-disable-line
@ -8,8 +6,8 @@ export function createClientId(): string {
typeof navigator !== "undefined"
? navigator.platform // eslint-disable-line @typescript-eslint/no-deprecated
: typeof process !== "undefined"
? process.platform
: "unknown";
? process.platform
: "unknown";
return `vault-link/${packageVersion} (${uuidv4()}; ${platform})`;
return `vault-link/${packageVersion} (${Math.round(Math.random() * 1e10)}; ${platform})`;
}

View file

@ -1,25 +0,0 @@
type ResolveFunction<T> = undefined extends T
? (value?: T) => unknown
: (value: T) => unknown;
/**
* A type-safe utility function to create a Promise with resolve and reject functions.
* @returns A tuple containing a Promise, a resolve function, and a reject function.
*/
export function createPromise<T = unknown>(): [
Promise<T>,
ResolveFunction<T>,
(error: unknown) => unknown
] {
let resolve: undefined | ResolveFunction<T> = undefined;
let reject: undefined | ((error: unknown) => unknown) = undefined;
const creationPromise = new Promise<T>(
(resolve_, reject_) =>
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
((resolve = resolve_ as ResolveFunction<T>), (reject = reject_))
);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return [creationPromise, resolve!, reject!];
}

View file

@ -13,56 +13,64 @@ export class EventListeners<TListener extends (...args: any[]) => any> {
}
/**
* Adds a new listener to the collection.
*
* @param listener The listener callback to add
* @returns An unsubscribe function that removes this listener when called
*/
* Adds a new listener to the collection.
*
* @param listener The listener callback to add
* @returns An unsubscribe function that removes this listener when called
*/
public add(listener: TListener): () => void {
this.listeners.push(listener);
return () => this.remove(listener);
}
/**
* Removes a listener from the collection.
*
* @param listener The listener callback to remove
* @returns true if the listener was found and removed, false otherwise
*/
* Removes a listener from the collection.
*
* @param listener The listener callback to remove
* @returns true if the listener was found and removed, false otherwise
*/
public remove(listener: TListener): boolean {
return removeFromArray(this.listeners, listener);
}
/**
* Triggers all listeners synchronously with the provided arguments.
* Any returned promises are ignored. Use triggerAsync() to await them.
*
* @param args The arguments to pass to each listener
*/
* Triggers all listeners synchronously with the provided arguments.
* Any returned promises are ignored. Use triggerAsync() to await them.
*
* @param args The arguments to pass to each listener
*/
public trigger(...args: Parameters<TListener>): void {
this.listeners.forEach((listener) => {
const snapshot = this.listeners.slice();
for (const listener of snapshot) {
// allow removing listeners during the trigger loop
if (!this.listeners.includes(listener)) {
continue;
}
listener(...args);
});
}
}
/**
* Triggers all listeners and awaits any promises they return.
* Synchronous listeners are called immediately, and any async listeners
* are awaited in parallel.
*
* @param args The arguments to pass to each listener
*/
* Triggers all listeners and awaits any promises they return.
* Synchronous listeners are called immediately, and any async listeners
* are awaited in parallel.
*
* @param args The arguments to pass to each listener
*/
public async triggerAsync(...args: Parameters<TListener>): Promise<void> {
await awaitAll(
this.listeners
.map((listener) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return listener(...args);
})
.filter((result): result is Promise<unknown> => {
return result instanceof Promise;
})
);
const snapshot = this.listeners.slice();
const promises: Promise<unknown>[] = [];
for (const listener of snapshot) {
if (!this.listeners.includes(listener)) {
continue;
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const result = listener(...args);
if (result instanceof Promise) {
promises.push(result);
}
}
await awaitAll(promises);
}
public clear(): void {

View file

@ -1,6 +1,6 @@
// Implements an in-memory fixed-size cache for document contents,
import type { VaultUpdateId } from "../../persistence/database";
import type { VaultUpdateId } from "../../sync-operations/types";
// Doubly-linked list node for O(1) LRU operations
class LRUNode {

View file

@ -1,22 +1,24 @@
import { describe, it, beforeEach } from "node:test";
import assert from "node:assert";
import { Logger } from "../../tracing/logger";
import type { RelativePath } from "../../persistence/database";
import type { RelativePath } from "../../sync-operations/types";
import { Locks } from "./locks";
import { awaitAll } from "../await-all";
import { sleep } from "../sleep";
import { SyncResetError } from "../../services/sync-reset-error";
import { SyncResetError } from "../../errors/sync-reset-error";
describe("withLock", () => {
const testPath: RelativePath = "test/document/path";
const testPath2: RelativePath = "test/document/path2";
const testPath3: RelativePath = "test/document/path3";
const logger = new Logger();
// eslint-disable-next-line @typescript-eslint/init-declarations
let locks: Locks<RelativePath>;
beforeEach(() => {
locks = new Locks<RelativePath>(logger);
locks = new Locks<RelativePath>("locks-test", logger);
});
it("should execute function with single key lock", async () => {
@ -56,22 +58,32 @@ describe("withLock", () => {
it("should sort multiple keys to prevent deadlocks", async () => {
const executionOrder: string[] = [];
// Start two concurrent operations with keys in different orders
const promise1 = locks.withLock([testPath2, testPath], async () => {
executionOrder.push("operation1-start");
await sleep(50);
executionOrder.push("operation1-end");
return "result1";
});
await locks.waitForLock(testPath);
const promise2 = locks.withLock([testPath, testPath2], async () => {
executionOrder.push("operation2-start");
await sleep(50);
executionOrder.push("operation2-end");
return "result2";
});
const promise = awaitAll([
locks.withLock([testPath2, testPath3, testPath], async () => {
executionOrder.push("operation1-start");
executionOrder.push("operation1-end");
return "result1";
}),
const [result1, result2] = await awaitAll([promise1, promise2]);
locks.withLock([testPath3, testPath, testPath2], async () => {
executionOrder.push("operation2-start");
executionOrder.push("operation2-end");
return "result2";
})
]);
locks.unlock(testPath);
const [result1, result2] = await Promise.race([
promise,
new Promise<never>((_, reject) => {
setTimeout(() => {
reject(new Error("Deadlock detected"));
}, 1000);
})
]);
assert.strictEqual(result1, "result1");
assert.strictEqual(result2, "result2");
@ -234,13 +246,14 @@ describe("withLock", () => {
describe("reset", () => {
const testPath: RelativePath = "test/document/path";
const testPath2: RelativePath = "test/document/path2";
const logger = new Logger();
// eslint-disable-next-line @typescript-eslint/init-declarations
let locks: Locks<RelativePath>;
beforeEach(() => {
locks = new Locks<RelativePath>(logger);
locks = new Locks<RelativePath>("locks-test", logger);
});
it("should reject pending waiters with SyncResetError while running operation completes", async () => {
@ -289,4 +302,38 @@ describe("reset", () => {
const result = await locks.withLock(testPath, () => "success");
assert.strictEqual(result, "success");
});
it("should release partially acquired locks when reset interrupts multi-key acquisition", async () => {
// Hold testPath2 so multi-key acquisition will block on it
await locks.waitForLock(testPath2);
// Start multi-key lock that will acquire testPath first, then block on testPath2
const multiKeyPromise = locks.withLock(
[testPath, testPath2],
async () => "multi"
);
void multiKeyPromise.catch(() => {}); // eslint-disable-line @typescript-eslint/no-empty-function
// Wait for the multi-key operation to acquire testPath and start waiting on testPath2
await sleep(10);
// Reset should reject the waiting operation
locks.reset();
await assert.rejects(multiKeyPromise, (err: Error) => {
assert.ok(err instanceof SyncResetError);
return true;
});
// The key that was already acquired (testPath) should now be released
// This would hang/timeout if the lock was leaked
const result = await Promise.race([
locks.withLock(testPath, () => "success"),
sleep(100).then(() => {
throw new Error("Lock was not released - deadlock detected");
})
]);
assert.strictEqual(result, "success");
});
});

View file

@ -1,6 +1,5 @@
import { SyncResetError } from "../../services/sync-reset-error";
import { SyncResetError } from "../../errors/sync-reset-error";
import type { Logger } from "../../tracing/logger";
import { awaitAll } from "../await-all";
/**
* Manages exclusive locks on items to prevent concurrent modifications.
@ -8,47 +7,53 @@ import { awaitAll } from "../await-all";
*
* @template T The type of the key used for locking
*/
/** Waiter entry with callbacks */
interface WaiterEntry {
resolve: () => unknown;
reject: (err: unknown) => unknown;
}
export class Locks<T> {
/** Currently locked keys */
private readonly locked = new Set<T>();
/** Queue of resolve functions waiting for each key */
private readonly waiters = new Map<
T,
[() => unknown, (err: unknown) => unknown][]
>();
/** Queue of waiters for each key */
private readonly waiters = new Map<T, WaiterEntry[]>();
public constructor(private readonly logger?: Logger) {}
public constructor(
private readonly name: string,
private readonly logger?: Logger
) {}
/**
* Executes a function while holding exclusive locks on one or more keys.
*
* This method ensures that the provided function runs with exclusive access to the
* specified key(s). Multiple keys are sorted to prevent deadlocks when different
* operations request the same keys in different orders.
*
* @template R The return type of the function to execute
* @param keyOrKeys A single key or array of keys to lock during function execution
* @param fn The function to execute while holding the lock(s). Can be sync or async.
* @returns A Promise that resolves to the return value of the executed function
*
* @example
* ```typescript
* // Lock a single key
* const result = await locks.withLock('file1', () => {
* // Critical section - only one operation can access 'file1' at a time
* return processFile('file1');
* });
*
* // Lock multiple keys (prevents deadlocks through consistent ordering)
* await locks.withLock(['file1', 'file2'], async () => {
* // Critical section - exclusive access to both files
* await moveFile('file1', 'file2');
* });
* ```
*
* @throws Any error thrown by the provided function will be propagated after locks are released
*/
* Executes a function while holding exclusive locks on one or more keys.
*
* This method ensures that the provided function runs with exclusive access to the
* specified key(s). Multiple keys are sorted to prevent deadlocks when different
* operations request the same keys in different orders.
*
* @template R The return type of the function to execute
* @param keyOrKeys A single key or array of keys to lock during function execution
* @param fn The function to execute while holding the lock(s). Can be sync or async.
* @returns A Promise that resolves to the return value of the executed function
*
* @example
* ```typescript
* // Lock a single key
* const result = await locks.withLock('file1', () => {
* // Critical section - only one operation can access 'file1' at a time
* return processFile('file1');
* });
*
* // Lock multiple keys (prevents deadlocks through consistent ordering)
* await locks.withLock(['file1', 'file2'], async () => {
* // Critical section - exclusive access to both files
* await moveFile('file1', 'file2');
* });
* ```
*
* @throws Any error thrown by the provided function will be propagated after locks are released
*/
public async withLock<R>(
keyOrKeys: T | T[],
fn: () => R | Promise<R>
@ -59,12 +64,17 @@ export class Locks<T> {
const uniqueKeys = Array.from(new Set(keys));
uniqueKeys.sort((a, b) => String(a).localeCompare(String(b))); // Ensure consistent order to prevent deadlocks
await awaitAll(uniqueKeys.map(async (key) => this.waitForLock(key)));
const lockedKeys = [];
try {
for (const key of uniqueKeys) {
// Must acquire locks in-order (not concurrently) to prevent deadlocks
await this.waitForLock(key);
lockedKeys.push(key);
}
return await fn();
} finally {
uniqueKeys.forEach((key) => {
lockedKeys.forEach((key) => {
this.unlock(key);
});
}
@ -74,7 +84,7 @@ export class Locks<T> {
// Resolve all waiting promises before clearing to prevent deadlock
// Any operation waiting for a lock will be granted access immediately
for (const waiting of this.waiters.values()) {
for (const [_, reject] of waiting) {
for (const { reject } of waiting) {
reject(new SyncResetError());
}
}
@ -82,13 +92,17 @@ export class Locks<T> {
this.waiters.clear();
}
public isLocked(key: T): boolean {
return this.locked.has(key);
}
/**
* Attempts to acquire a lock immediately without waiting.
* Must call `unlock()` if successful.
*
* @param key The key to lock
* @returns `true` if lock acquired, `false` if already locked
*/
* Attempts to acquire a lock immediately without waiting.
* Must call `unlock()` if successful.
*
* @param key The key to lock
* @returns `true` if lock acquired, `false` if already locked
*/
public tryLock(key: T): boolean {
if (this.locked.has(key)) {
return false;
@ -100,18 +114,18 @@ export class Locks<T> {
}
/**
* Waits to acquire a lock, blocking until available.
* Operations are queued in FIFO order. Must call `unlock()` when done.
*
* @param key The key to wait for and lock
* @returns Promise that resolves when lock is acquired
*/
* Waits to acquire a lock, blocking until available.
* Operations are queued in FIFO order. Must call `unlock()` when done.
*
* @param key The key to wait for and lock
* @returns Promise that resolves when lock is acquired
*/
public async waitForLock(key: T): Promise<void> {
if (this.tryLock(key)) {
return Promise.resolve();
}
this.logger?.debug(`Waiting for lock on ${key}`);
this.logger?.debug(`Waiting for lock '${this.name}' on '${key}'`);
return new Promise((resolve, reject) => {
// DefaultDict behavior
@ -121,28 +135,36 @@ export class Locks<T> {
this.waiters.set(key, waiting);
}
waiting.push([resolve, reject]);
waiting.push({
resolve,
reject
});
});
}
/**
* Releases a lock and grants access to the next waiting operation in FIFO order.
* Removes the key from locked set if no waiters.
*
* @param key The key to unlock
* @throws {Error} If key is not currently locked
*/
* Releases a lock and grants access to the next waiting operation in FIFO order.
* Removes the key from locked set if no waiters.
*
* @param key The key to unlock
* @throws {Error} If key is not currently locked
*/
public unlock(key: T): void {
if (!this.locked.has(key)) {
this.logger?.debug(
`Attempted to unlock '${this.name}' on '${key}' which is not locked`
);
return;
}
// Remove first waiter to ensure FIFO order
const [resolveNextWaiting, _] = this.waiters.get(key)?.shift() ?? [];
this.logger?.debug(`Releasing lock '${this.name}' on '${key}'`);
if (resolveNextWaiting) {
this.logger?.debug(`Granted lock on ${key}`);
resolveNextWaiting();
// Remove first waiter to ensure FIFO order
const nextWaiter = this.waiters.get(key)?.shift();
if (nextWaiter) {
this.logger?.debug(`Granted lock '${this.name}' on '${key}'`);
nextWaiter.resolve();
} else {
this.locked.delete(key);
}
@ -152,8 +174,8 @@ export class Locks<T> {
export class Lock {
private readonly locks: Locks<boolean>;
public constructor(logger?: Logger) {
this.locks = new Locks(logger);
public constructor(name: string, logger?: Logger) {
this.locks = new Locks(name, logger);
}
public async withLock<R>(fn: () => R | Promise<R>): Promise<R> {

View file

@ -1,15 +1,15 @@
import { describe, it } from "node:test";
import assert from "node:assert";
import { CoveredValues } from "./min-covered";
import { MinCovered } from "./min-covered";
describe("CoveredValues", () => {
describe("MinCovered", () => {
it("should initialize with the given min value", () => {
const covered = new CoveredValues(5);
const covered = new MinCovered(5);
assert.strictEqual(covered.min, 5);
});
it("should add values greater than min", () => {
const covered = new CoveredValues(0);
const covered = new MinCovered(0);
covered.add(3);
assert.strictEqual(covered.min, 0);
covered.add(1);
@ -21,7 +21,7 @@ describe("CoveredValues", () => {
});
it("should ignore duplicate values", () => {
const covered = new CoveredValues(0);
const covered = new MinCovered(0);
covered.add(3);
covered.add(3);
covered.add(3);
@ -32,7 +32,7 @@ describe("CoveredValues", () => {
});
it("should handle multiple consecutive values", () => {
const covered = new CoveredValues(132);
const covered = new MinCovered(132);
for (let i = 250; i > 132; i--) {
assert.strictEqual(covered.min, 132);
covered.add(i);
@ -41,36 +41,32 @@ describe("CoveredValues", () => {
});
it("should handle adding values lower than current min", () => {
const covered = new CoveredValues(5);
const covered = new MinCovered(5);
covered.add(3);
assert.strictEqual(covered.min, 5);
covered.add(6);
assert.strictEqual(covered.min, 6);
});
it("should auto-advance when setting min value", () => {
const covered = new CoveredValues(5);
it("should auto-advance when adding the value that fills the next gap", () => {
const covered = new MinCovered(5);
covered.add(7);
covered.add(8);
covered.add(9);
assert.strictEqual(covered.min, 5);
// Setting min to 6 should auto-advance through 7, 8, 9
covered.min = 6;
// Adding 6 fills the gap and auto-advances through 7, 8, 9
covered.add(6);
assert.strictEqual(covered.min, 9);
covered.add(10);
assert.strictEqual(covered.min, 10);
});
it("should handle setting min value with no consecutive values", () => {
const covered = new CoveredValues(5);
covered.add(10);
covered.add(15);
assert.strictEqual(covered.min, 5);
// Setting min to 8 should not auto-advance (no consecutive values)
covered.min = 8;
assert.strictEqual(covered.min, 8);
// Add 9 to trigger auto-advance to 10
covered.add(9);
assert.strictEqual(covered.min, 10);
it("should rewind when reset is called explicitly", () => {
const covered = new MinCovered(5);
covered.add(7);
covered.reset(3);
assert.strictEqual(covered.min, 3);
covered.add(4);
assert.strictEqual(covered.min, 4);
});
});

View file

@ -7,13 +7,13 @@
*
* @example
* ```typescript
* const covered = new CoveredValues(0);
* const covered = new MinCovered(0);
* covered.add(2); // seenValues = [2], min = 0
* covered.add(1); // seenValues = [], min = 2
* covered.min; // returns 2
* ```
*/
export class CoveredValues {
export class MinCovered {
private seenValues: number[] = [];
public constructor(private minValue: number) {}
@ -22,12 +22,6 @@ export class CoveredValues {
return this.minValue;
}
public set min(value: number) {
this.minValue = Math.max(value, this.minValue);
this.seenValues = this.seenValues.filter((v) => v > this.minValue);
this.advanceMinWhilePossible();
}
public add(value: number | undefined): void {
if (value === undefined || value < this.minValue) {
return;
@ -49,6 +43,11 @@ export class CoveredValues {
this.advanceMinWhilePossible();
}
public reset(minValue?: number): void {
this.minValue = minValue ?? 0;
this.seenValues = [];
}
private advanceMinWhilePossible(): void {
while (
this.seenValues.length > 0 &&

View file

@ -0,0 +1,69 @@
import type { RelativePath } from "../../sync-operations/types";
import type { TextWithCursors } from "reconcile-text";
import type { FileSystemOperations } from "../../file-operations/filesystem-operations";
export class InMemoryFileSystem implements FileSystemOperations {
protected readonly files = new Map<string, Uint8Array>();
public async listFilesRecursively(
_root: RelativePath | undefined = undefined // we don't use multi-level paths during tests
): Promise<RelativePath[]> {
return Array.from(this.files.keys());
}
public async read(path: RelativePath): Promise<Uint8Array> {
const file = this.files.get(path);
if (!file) {
throw new Error(`File ${path} does not exist`);
}
return file;
}
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
this.files.set(path, content);
}
public async atomicUpdateText(
path: RelativePath,
updater: (current: TextWithCursors) => TextWithCursors
): Promise<string> {
const file = this.files.get(path);
if (!file) {
throw new Error(`File ${path} does not exist`);
}
const currentContent = new TextDecoder().decode(file);
const newContent = updater({ text: currentContent, cursors: [] }).text;
this.files.set(path, new TextEncoder().encode(newContent));
return newContent;
}
public async getFileSize(path: RelativePath): Promise<number> {
return (await this.read(path)).length;
}
public async exists(path: RelativePath): Promise<boolean> {
return this.files.has(path);
}
public async createDirectory(_path: RelativePath): Promise<void> {
// This doesn't mean anything in our virtual FS representation
}
public async delete(path: RelativePath): Promise<void> {
this.files.delete(path);
}
public async rename(
oldPath: RelativePath,
newPath: RelativePath
): Promise<void> {
const file = this.files.get(oldPath);
if (!file) {
throw new Error(`File ${oldPath} does not exist`);
}
this.files.set(newPath, file);
if (oldPath !== newPath) {
this.files.delete(oldPath);
}
}
}

View file

@ -1,10 +1,44 @@
import type { SyncClient } from "../../sync-client";
import type { LogLine } from "../../tracing/logger";
/* eslint-disable no-console */
import type { Logger, LogLine } from "../../tracing/logger";
import { LogLevel } from "../../tracing/logger";
export function logToConsole(client: SyncClient): void {
client.logger.onLogEmitted.add((logLine: LogLine) => {
const formatted = `${logLine.timestamp.toISOString()} ${logLine.level} ${logLine.message}`;
const COLORS = {
reset: "\x1b[0m",
red: "\x1b[31m",
yellow: "\x1b[33m",
blue: "\x1b[34m",
gray: "\x1b[90m"
};
export function logToConsole(
logger: Logger,
{ useColors = true }: { useColors?: boolean } = {}
): void {
logger.onLogEmitted.add((logLine: LogLine) => {
const timestamp = logLine.timestamp.toISOString();
const { message } = logLine;
let color = "";
let reset = "";
if (useColors) {
({ reset } = COLORS);
switch (logLine.level) {
case LogLevel.ERROR:
color = COLORS.red;
break;
case LogLevel.WARNING:
color = COLORS.yellow;
break;
case LogLevel.INFO:
color = COLORS.blue;
break;
case LogLevel.DEBUG:
color = COLORS.gray;
break;
}
}
const formatted = `${timestamp} ${color}${logLine.level}${reset} ${message}`;
switch (logLine.level) {
case LogLevel.ERROR:

View file

@ -11,7 +11,7 @@ export function slowWebSocketFactory(
private static readonly RECEIVE_KEY = "websocket-receive";
private static readonly SEND_KEY = "websocket-send";
private readonly locks = new Locks(logger);
private readonly locks = new Locks(FlakyWebSocket.name, logger);
public set onopen(callback: ((event: Event) => void) | null) {
super.onopen = async (event: Event): Promise<void> => {

View file

@ -1,14 +1,17 @@
import type { DocumentRecord } from "../persistence/database";
import type { DocumentRecord } from "../sync-operations/types";
import { EMPTY_HASH } from "./hash";
// TODO: make this smarter so that offline files can be renamed & edited at the same time
export function findMatchingFile(
export async function findMatchingFile(
contentHash: string,
candidates: DocumentRecord[]
): DocumentRecord | undefined {
if (contentHash === EMPTY_HASH) {
): Promise<DocumentRecord | undefined> {
if (contentHash === (await EMPTY_HASH)) {
return undefined;
}
return candidates.find(({ metadata }) => metadata?.hash === contentHash);
return candidates.find(
(record) =>
record.remoteHash !== undefined && record.remoteHash === contentHash
);
}

View file

@ -1,12 +1,14 @@
// https://stackoverflow.com/questions/7616461/generate-a-hash-from-string-in-javascript
export function hash(content: Uint8Array): string {
let result = 0;
// eslint-disable-next-line @typescript-eslint/prefer-for-of
for (let i = 0; i < content.length; i++) {
result = (result << 5) - result + content[i];
result |= 0; // Convert to 32bit integer
}
return Math.abs(result).toString(16).padStart(8, "0");
export async function hash(content: Uint8Array): Promise<string> {
// Re-wrap into a fresh Uint8Array<ArrayBuffer> so SubtleCrypto's
// BufferSource overload accepts it without an unsafe type assertion.
// The lib types require an ArrayBuffer-backed view; the source may
// be backed by SharedArrayBuffer in some runtimes.
const buffer = new ArrayBuffer(content.byteLength);
new Uint8Array(buffer).set(content);
const digest = await crypto.subtle.digest("SHA-256", buffer);
const bytes = new Uint8Array(digest);
return Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join("");
}
export const EMPTY_HASH = hash(new Uint8Array(0));
// SHA-256 of empty content, computed once at import time
export const EMPTY_HASH: Promise<string> = hash(new Uint8Array());

View file

@ -1,4 +1,4 @@
import { createPromise } from "./create-promise";
import { awaitAll } from "./await-all";
import { sleep } from "./sleep";
/**
@ -45,18 +45,16 @@ export function rateLimit<
newArgs = undefined;
}
const [promise, resolve] = createPromise();
running = promise;
sleep(
// `running` must signal both "minimum interval has elapsed" *and*
// "fn() has finished" — otherwise an `fn` that takes longer than
// the interval would let a queued waiter fire a concurrent `fn`
const interval =
typeof minIntervalMs === "function"
? minIntervalMs()
: minIntervalMs
)
.then(resolve)
.catch(() => {
// sleep cannot fail
});
return fn(...args);
: minIntervalMs;
const fnPromise = fn(...args);
running = awaitAll([fnPromise.catch(() => undefined), sleep(interval)]);
return fnPromise;
};
return decoratedFn;

6
package-lock.json generated Normal file
View file

@ -0,0 +1,6 @@
{
"name": "vault-link",
"lockfileVersion": 3,
"requires": true,
"packages": {}
}

11
rustfmt.toml Normal file
View file

@ -0,0 +1,11 @@
# Rustfmt configuration
# This should match the .editorconfig settings
# Use spaces for indentation (matches .editorconfig indent_style = space)
hard_tabs = false
# Use 4 spaces for indentation (matches .editorconfig indent_size = 4)
tab_spaces = 4
# Use Unix line endings (matches .editorconfig end_of_line = lf)
newline_style = "Unix"

View file

@ -35,7 +35,8 @@ cd ..
cp frontend/obsidian-plugin/manifest.json manifest.json # for BRAT, otherwise it wouldn't update
git ls-files | xargs npx eclint fix
# Format all files across the project (frontend and backend)
npx -C frontend prettier --write "**/*.{ts,js,json,md,yml,yaml}"
# Commit and tag
git add .

View file

@ -30,8 +30,11 @@ fi
which cargo-machete || cargo install cargo-machete
cargo machete --with-metadata
cd ..
scripts/update-api-types.sh # this will dirty up the git state if not up-to-date
echo "Running checks in frontend"
cd ../frontend
cd frontend
if [[ "$FIX_MODE" == true ]]; then
npm install
@ -45,10 +48,11 @@ cd frontend
npm run build
npm run test
npm run lint
cd ..
# Use git ls-files to only check tracked files, respecting .gitignore
# We always run in fix mode and then check with git status
git ls-files | xargs npx eclint fix
# Format all files across the project (frontend and backend)
# Prettier respects .gitignore by default
npx -C frontend prettier --write "**/*.{ts,js,json,md,yml,yaml}"
if [[ "$FIX_MODE" == false ]] && [[ $(git status --porcelain) ]]; then
git status --porcelain
@ -56,6 +60,4 @@ if [[ "$FIX_MODE" == false ]] && [[ $(git status --porcelain) ]]; then
exit 1
fi
cd ..
echo "Success"

View file

@ -1,4 +1,4 @@
#!/bin/bash
rm -rf sync-server/databases
rm -rf /host/tmp/vaultlink-e2e-databases
rm -rf logs

View file

@ -19,35 +19,51 @@ process_count=$1
mkdir -p logs
# Build and restart the server
echo "Building server..."
cd sync-server
cargo build --release
# Kill any existing server process
echo "Stopping existing server..."
pkill -f "sync_server" 2>/dev/null || true
sleep 1
# Clean databases (uses tmpfs via /dev/shm for zero disk I/O)
echo "Cleaning databases..."
rm -rf /host/tmp/vaultlink-e2e-databases
# Start the server in the background
echo "Starting server..."
./target/release/sync_server config-e2e.yml &
server_pid=$!
echo "Server started with PID: $server_pid"
# Ensure server is killed on script exit
cleanup_server() {
if [ -n "$server_pid" ]; then
echo "Stopping server (PID: $server_pid)..."
kill $server_pid 2>/dev/null || true
wait $server_pid 2>/dev/null || true
server_pid=""
fi
}
trap cleanup_server EXIT
cd ..
cd frontend
npm ci
npm run build
../scripts/utils/wait-for-server.sh
cd ..
scripts/update-api-types.sh
if [[ $(git status --porcelain) ]]; then
git status --porcelain
echo "Failing CI because the working directory is not clean after generating api types"
exit 1
fi
cd frontend
pids=()
for i in $(seq 1 $process_count); do
# Create a named pipe for this process
pipe="/tmp/vaultlink_pipe_$$_$i"
mkfifo "$pipe"
# Start the node process writing to the pipe
node test-client/dist/cli.js > "$pipe" 2>&1 &
node test-client/dist/cli.js > "../logs/log_${i}.log" 2>&1 &
pid=$!
pids+=($pid)
echo "Started process $i with PID: $pid"
# Read from pipe, prefix with PID
(sed "s/^/[PID $pid] /" < "$pipe" > "../logs/log_${i}.log"; rm "$pipe") &
echo "Started process $i with PID: $pid (log: logs/log_${i}.log)"
done
cd ..
@ -75,10 +91,25 @@ print_failed_log() {
return 1
}
echo "Monitoring $process_count processes"
E2E_TIMEOUT=${2:-3600}
start_time=$(date +%s)
echo "Monitoring $process_count processes (timeout: ${E2E_TIMEOUT}s)"
# Monitor processes
while true; do
# Script-level timeout to prevent indefinite hangs
current_time=$(date +%s)
elapsed=$((current_time - start_time))
if [ $elapsed -ge $E2E_TIMEOUT ]; then
echo "E2E timeout reached (${E2E_TIMEOUT}s). Killing remaining processes."
for pid in "${pids[@]}"; do
if [ -n "$pid" ]; then
kill $pid 2>/dev/null || true
fi
done
exit 1
fi
if print_failed_log; then
# Kill remaining processes
for pid in "${pids[@]}"; do
@ -99,6 +130,7 @@ while true; do
done
if $all_done; then
cleanup_server
echo "All processes completed successfully"
exit 0
fi

View file

@ -8,9 +8,15 @@ cd sync-server
cargo test export_bindings
cd -
# Both target directories contain only generated bindings — wipe and copy
rm -f frontend/sync-client/src/services/types/*.ts
rm -f frontend/history-ui/src/lib/types/*.ts
cp -r sync-server/bindings/* frontend/sync-client/src/services/types/
cp -r sync-server/bindings/* frontend/history-ui/src/lib/types/
cd frontend
npm run lint
git ls-files | xargs npx eclint fix
cd -
cd ..
# Format all files across the project (frontend and backend)
npx -C frontend prettier --write "**/*.{ts,js,json,md,yml,yaml}"

View file

@ -2,8 +2,10 @@
set -e
TARGET_NODE_VERSION=25
node_version=$(node -v | sed 's/^v\([0-9]*\).*/\1/')
if [ "$node_version" != "22" ]; then
echo "Error: This script requires Node.js version 22, found: $node_version"
if [ "$node_version" != "$TARGET_NODE_VERSION" ]; then
echo "Error: This script requires Node.js version $TARGET_NODE_VERSION, found: $node_version"
exit 1
fi

View file

@ -2,14 +2,14 @@
set -e
SERVER_URL="http://localhost:3000"
SERVER_URL="http://localhost:3010"
MAX_RETRIES=30
RETRY_INTERVAL_IN_SECONDS=5
echo "Waiting for $SERVER_URL to become available..."
count=0
while [ $count -lt $MAX_RETRIES ]; do
if curl -s -f -o /dev/null $SERVER_URL; then
if curl -s -o /dev/null $SERVER_URL; then
echo "$SERVER_URL is now available!"
break
fi

193
sync-server/Cargo.lock generated
View file

@ -337,10 +337,11 @@ checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
[[package]]
name = "cc"
version = "1.2.2"
version = "1.2.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc"
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
dependencies = [
"find-msvc-tools",
"shlex",
]
@ -456,6 +457,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
@ -533,6 +543,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "deranged"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
dependencies = [
"powerfmt",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -624,6 +643,12 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "flume"
version = "0.11.1"
@ -1272,6 +1297,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.8.0"
@ -1335,6 +1370,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "num-conv"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967"
[[package]]
name = "num-integer"
version = "0.1.46"
@ -1463,6 +1504,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.20"
@ -1582,12 +1629,12 @@ dependencies = [
[[package]]
name = "reconcile-text"
version = "0.8.0"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "599cf9539996a2a19e501110404c59ba62f4974009f8fb864a8b7151c15ee5a5"
checksum = "52e0cf361887ea64c479ca871c1170dda761f84e122f2616b5579906a38d7557"
dependencies = [
"serde",
"thiserror 2.0.17",
"thiserror 2.0.18",
]
[[package]]
@ -1648,6 +1695,40 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rust-embed"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04113cb9355a377d83f06ef1f0a45b8ab8cd7d8b1288160717d66df5c7988d27"
dependencies = [
"rust-embed-impl",
"rust-embed-utils",
"walkdir",
]
[[package]]
name = "rust-embed-impl"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0902e4c7c8e997159ab384e6d0fc91c221375f6894346ae107f47dd0f3ccaa"
dependencies = [
"proc-macro2",
"quote",
"rust-embed-utils",
"syn 2.0.90",
"walkdir",
]
[[package]]
name = "rust-embed-utils"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bcdef0be6fe7f6fa333b1073c949729274b05f123a0ad7efcb8efd878e5c3b1"
dependencies = [
"sha2",
"walkdir",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -1679,6 +1760,15 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "sanitize-filename"
version = "0.6.0"
@ -1916,7 +2006,7 @@ dependencies = [
"serde_json",
"sha2",
"smallvec",
"thiserror 2.0.17",
"thiserror 2.0.18",
"tokio",
"tokio-stream",
"tracing",
@ -2000,7 +2090,7 @@ dependencies = [
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.17",
"thiserror 2.0.18",
"tracing",
"uuid",
"whoami",
@ -2039,7 +2129,7 @@ dependencies = [
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.17",
"thiserror 2.0.18",
"tracing",
"uuid",
"whoami",
@ -2065,7 +2155,7 @@ dependencies = [
"serde",
"serde_urlencoded",
"sqlx-core",
"thiserror 2.0.17",
"thiserror 2.0.18",
"tracing",
"url",
"uuid",
@ -2100,6 +2190,12 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "symlink"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a"
[[package]]
name = "syn"
version = "1.0.109"
@ -2136,18 +2232,22 @@ dependencies = [
"futures",
"humantime-serde",
"log",
"mime_guess",
"rand 0.9.0",
"reconcile-text",
"regex",
"rust-embed",
"sanitize-filename",
"serde",
"serde_json",
"serde_yaml",
"sqlx",
"thiserror 2.0.17",
"subtle",
"thiserror 2.0.18",
"tokio",
"tower-http",
"tracing",
"tracing-appender",
"tracing-subscriber",
"ts-rs",
"uuid",
@ -2203,11 +2303,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.17"
version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
dependencies = [
"thiserror-impl 2.0.17",
"thiserror-impl 2.0.18",
]
[[package]]
@ -2223,9 +2323,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.17"
version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5"
dependencies = [
"proc-macro2",
"quote",
@ -2242,6 +2342,37 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.3.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde_core",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
[[package]]
name = "time-macros"
version = "0.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215"
dependencies = [
"num-conv",
"time-core",
]
[[package]]
name = "tinystr"
version = "0.7.6"
@ -2276,7 +2407,6 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@ -2376,6 +2506,19 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c"
dependencies = [
"crossbeam-channel",
"symlink",
"thiserror 2.0.18",
"time",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.28"
@ -2434,7 +2577,7 @@ checksum = "e640d9b0964e9d39df633548591090ab92f7a4567bc31d3891af23471a3365c6"
dependencies = [
"chrono",
"lazy_static",
"thiserror 2.0.17",
"thiserror 2.0.18",
"ts-rs-macros",
"uuid",
]
@ -2481,6 +2624,12 @@ version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f720def6ce1ee2fc44d40ac9ed6d3a59c361c80a75a7aa8e75bb9baed31cf2ea"
[[package]]
name = "unicase"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
[[package]]
name = "unicode-bidi"
version = "0.3.17"
@ -2577,6 +2726,16 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

View file

@ -1,6 +1,6 @@
[package]
name = "sync_server"
rust-version = "1.89.0"
rust-version = "1.94.0"
authors = ["Andras Schmelczer <andras@schmelczer.dev>"]
edition = "2024"
license = "MIT"
@ -10,7 +10,7 @@ version = "0.14.0"
[dependencies]
serde = { version = "1.0.219", default-features = false, features = ["derive"] }
thiserror = { version = "2.0.12", default-features = false }
tokio = { version = "1.48.0", features = ["full"]}
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "sync", "time", "net", "fs", "signal"]}
uuid = { version = "1.16.0", features = ["v4", "serde"] }
log = { version = "0.4.28" }
anyhow = { version = "1.0.100", features = ["backtrace"] }
@ -20,6 +20,7 @@ axum_typed_multipart = "0.11.0"
tower-http = { version = "0.6.1", features = ["cors", "trace", "limit", "timeout"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["fmt", "env-filter"]}
tracing-appender = "0.2.5"
humantime-serde = "1.1.1"
sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
chrono = { version = "0.4.41", features = ["serde"] }
@ -33,7 +34,10 @@ serde_json = "1.0.140"
bimap = "0.6.3"
ts-rs = { version = "10.1", features = ["uuid-impl", "chrono-impl"] }
base64 = "0.22.1"
reconcile-text = { version = "0.8.0", features = ["serde"] }
reconcile-text = { version = "0.11.0", features = ["serde"] }
rust-embed = "8.5"
mime_guess = "2.0"
subtle = "2.6.1"
[profile.release]
codegen-units = 1

View file

@ -1,5 +1,16 @@
// generated by `sqlx migrate build-script`
fn main() {
// trigger recompilation when a new migration is added
println!("cargo:rerun-if-changed=migrations");
// Ensure the history-ui dist directory exists so rust-embed can compile
// even when the frontend hasn't been built yet.
let dist_path = std::path::Path::new("../frontend/history-ui/dist");
if !dist_path.exists() {
std::fs::create_dir_all(dist_path).expect("Failed to create history-ui dist directory");
std::fs::write(
dist_path.join("index.html"),
"<!DOCTYPE html><html><body><p>Run <code>npm run build -w history-ui</code> first.</p></body></html>",
)
.expect("Failed to write placeholder index.html");
}
}

View file

@ -1,32 +1,34 @@
database:
databases_directory_path: databases
max_connections_per_vault: 12
databases_directory_path: /host/tmp/vaultlink-e2e-databases
max_connections_per_vault: 8
cursor_timeout: 1m
server:
host: 0.0.0.0
port: 3000
port: 3010
max_body_size_mb: 512
max_clients_per_vault: 256
max_pending_websocket_connections: 4096
broadcast_channel_capacity: 1024
response_timeout: 30m
mergeable_file_extensions:
- md
- txt
- md
- txt
users:
user_configs:
- name: admin
token: test-token-change-me
vault_access:
type: allow_access_to_all
- name: other-admin
token: test-token-change-me2
vault_access:
type: allow_access_to_all
- name: test
token: other-test-token
vault_access:
type: allow_list
allowed:
- default
- name: admin
token: test-token-change-me
vault_access:
type: allow_access_to_all
- name: other-admin
token: test-token-change-me2
vault_access:
type: allow_access_to_all
- name: test
token: other-test-token
vault_access:
type: allow_list
allowed:
- default
logging:
log_directory: logs
log_rotation: 7days

View file

@ -1,5 +1,5 @@
[toolchain]
channel = "1.89.0"
channel = "1.94.0"
targets = [
"x86_64-unknown-linux-gnu",
"x86_64-unknown-linux-musl",

View file

@ -2,6 +2,8 @@ pub mod cursors;
pub mod database;
pub mod websocket;
use std::sync::{Arc, atomic::AtomicUsize};
use anyhow::Result;
use cursors::Cursors;
use database::Database;
@ -15,21 +17,42 @@ pub struct AppState {
pub database: Database,
pub cursors: Cursors,
pub broadcasts: Broadcasts,
/// Tracks WebSocket connections that have upgraded but not yet completed
/// the authentication handshake
pub pending_ws_connections: Arc<AtomicUsize>,
/// Send on this channel to stop background tasks (cursor cleanup,
/// idle-pool cleanup)
shutdown_tx: Arc<tokio::sync::watch::Sender<()>>,
}
impl AppState {
pub async fn try_new(config: Config) -> Result<Self> {
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
let broadcasts = Broadcasts::new(&config.server);
let database = Database::try_new(&config.database, &broadcasts).await?;
let database =
Database::try_new(&config.database, &broadcasts, shutdown_rx.clone()).await?;
let cursors: Cursors = Cursors::new(&config.database, &broadcasts);
Cursors::start_background_task(cursors.clone());
Cursors::start_background_task(cursors.clone(), shutdown_rx);
Ok(Self {
config,
database,
cursors,
broadcasts,
pending_ws_connections: Arc::new(AtomicUsize::new(0)),
shutdown_tx: Arc::new(shutdown_tx),
})
}
/// Signal all background tasks (idle pool cleanup, cursor cleanup) to stop
pub fn shutdown(&self) {
let _ = self.shutdown_tx.send(());
}
/// Get a receiver to be notified when shutdown is triggered
pub fn subscribe_shutdown(&self) -> tokio::sync::watch::Receiver<()> {
self.shutdown_tx.subscribe()
}
}

View file

@ -42,7 +42,9 @@ impl Cursors {
) {
let mut vault_to_cursors = self.vault_to_cursors.lock().await;
let all_device_cursors = vault_to_cursors.entry(vault_id).or_insert_with(Vec::new);
let all_device_cursors = vault_to_cursors
.entry(vault_id.clone())
.or_insert_with(Vec::new);
all_device_cursors.retain(|c| &c.client_cursors.device_id != device_id);
all_device_cursors.push(ClientCursorsWithTimeToLive::new(ClientCursors {
@ -52,7 +54,7 @@ impl Cursors {
}));
drop(vault_to_cursors); // Explicitly drop the lock before broadcasting to avoid deadlock
self.broadcast_cursors().await;
self.broadcast_cursors_for_vault(&vault_id).await;
}
pub async fn get_cursors(&self, vault_id: &VaultId) -> Vec<ClientCursors> {
@ -69,45 +71,81 @@ impl Cursors {
.unwrap_or_default()
}
pub fn start_background_task(self) {
pub fn start_background_task(self, mut shutdown: tokio::sync::watch::Receiver<()>) {
tokio::spawn(async move {
loop {
self.remove_expired_cursors().await;
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::select! {
() = tokio::time::sleep(Duration::from_secs(1)) => {
self.remove_expired_cursors().await;
}
Ok(()) = shutdown.changed() => break,
}
}
});
}
async fn remove_expired_cursors(&self) {
let mut vault_to_cursors = self.vault_to_cursors.lock().await;
let changed_vaults: Vec<VaultId> = {
let mut vault_to_cursors = self.vault_to_cursors.lock().await;
for (_vault_id, cursors) in vault_to_cursors.iter_mut() {
cursors.retain(|cursor| !cursor.is_expired(self.config.cursor_timeout));
let mut changed = Vec::new();
for (vault_id, cursors) in vault_to_cursors.iter_mut() {
let before = cursors.len();
cursors.retain(|cursor| !cursor.is_expired(self.config.cursor_timeout));
if cursors.len() != before {
changed.push(vault_id.clone());
}
}
// Remove empty vault entries to prevent unbounded growth
vault_to_cursors.retain(|_, cursors| !cursors.is_empty());
changed
};
for vault_id in &changed_vaults {
self.broadcast_cursors_for_vault(vault_id).await;
}
}
async fn broadcast_cursors(&self) {
let vault_to_cursors = self.vault_to_cursors.lock().await;
async fn broadcast_cursors_for_vault(&self, vault_id: &VaultId) {
let client_cursors: Vec<ClientCursors> = {
let vault_to_cursors = self.vault_to_cursors.lock().await;
vault_to_cursors
.get(vault_id)
.map(|cursors| cursors.iter().map(|c| c.client_cursors.clone()).collect())
.unwrap_or_default()
};
for (vault_id, cursors) in vault_to_cursors.iter() {
self.broadcasts
.send_document_update(
vault_id.clone(),
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions(
CursorPositionFromServer {
clients: cursors.iter().map(|c| c.client_cursors.clone()).collect(),
},
)),
)
.await;
}
self.broadcasts.send_document_update(
vault_id.clone(),
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions(
CursorPositionFromServer {
clients: client_cursors,
},
)),
);
}
pub async fn remove_cursors_of_device(&self, vault_id: &str, device_id: &str) {
let mut vault_to_cursors = self.vault_to_cursors.lock().await;
pub async fn remove_cursors_of_device(&self, vault_id: &VaultId, device_id: &DeviceId) {
let changed = {
let mut vault_to_cursors = self.vault_to_cursors.lock().await;
if let Some(cursors) = vault_to_cursors.get_mut(vault_id) {
cursors.retain(|c| c.client_cursors.device_id != device_id);
if let Some(cursors) = vault_to_cursors.get_mut(vault_id) {
let before = cursors.len();
cursors.retain(|c| c.client_cursors.device_id != *device_id);
let changed = cursors.len() != before;
if cursors.is_empty() {
vault_to_cursors.remove(vault_id);
}
changed
} else {
false
}
};
if changed {
self.broadcast_cursors_for_vault(vault_id).await;
}
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,2 @@
CREATE INDEX IF NOT EXISTS idx_documents_document_id
ON documents (document_id, vault_update_id);

View file

@ -0,0 +1,20 @@
ALTER TABLE documents ADD COLUMN creation_vault_update_id INTEGER NOT NULL DEFAULT 0;
UPDATE documents
SET creation_vault_update_id = (
SELECT MIN(d2.vault_update_id)
FROM documents d2
WHERE d2.document_id = documents.document_id
);
DROP VIEW latest_document_versions;
CREATE VIEW IF NOT EXISTS latest_document_versions AS --recreate view as it now includes one more field
SELECT d.*
FROM documents d
INNER JOIN (
SELECT MAX(vault_update_id) AS max_version_id
FROM documents
GROUP BY document_id
) max_versions
ON d.vault_update_id = max_versions.max_version_id;

View file

@ -13,6 +13,7 @@ pub type DeviceId = String;
#[derive(Debug, Clone)]
pub struct StoredDocumentVersion {
pub vault_update_id: VaultUpdateId,
pub creation_vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub relative_path: String,
pub updated_date: DateTime<Utc>,
@ -33,7 +34,7 @@ impl PartialEq<Self> for StoredDocumentVersion {
#[derive(TS, Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentVersionWithoutContent {
#[ts(as = "i32")]
#[ts(type = "number")]
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
@ -43,12 +44,16 @@ pub struct DocumentVersionWithoutContent {
pub user_id: UserId,
pub device_id: DeviceId,
#[ts(as = "i32")]
#[ts(type = "number")]
pub content_size: u64,
/// True iff this is the first version of the document
pub is_new_file: bool,
}
impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
fn from(value: StoredDocumentVersion) -> Self {
let is_new_file = value.creation_vault_update_id == value.vault_update_id;
Self {
vault_update_id: value.vault_update_id,
document_id: value.document_id,
@ -58,6 +63,7 @@ impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
user_id: value.user_id,
device_id: value.device_id,
content_size: value.content.len() as u64,
is_new_file,
}
}
}
@ -65,7 +71,7 @@ impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
#[derive(TS, Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentVersion {
#[ts(as = "i32")]
#[ts(type = "number")]
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
@ -77,6 +83,25 @@ pub struct DocumentVersion {
pub device_id: DeviceId,
}
/// Row struct for vault history queries (used by `sqlx::query_as!`)
#[derive(Debug)]
pub struct VaultHistoryRow {
pub vault_update_id: VaultUpdateId,
pub creation_vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub relative_path: String,
pub updated_date: DateTime<Utc>,
pub is_deleted: bool,
pub user_id: String,
pub device_id: String,
pub content_size: Option<u64>,
}
pub struct VaultStats {
pub created_at: Option<DateTime<Utc>>,
pub document_count: u32,
}
impl From<StoredDocumentVersion> for DocumentVersion {
fn from(value: StoredDocumentVersion) -> Self {
Self {

View file

@ -1,69 +1,147 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{Arc, Mutex as StdMutex},
};
use anyhow::Context;
use log::{debug, warn};
use log::{debug, info, warn};
use tokio::sync::{Mutex, broadcast};
use super::models::WebSocketServerMessageWithOrigin;
use crate::{
app_state::database::models::VaultId, config::server_config::ServerConfig, errors::server_error,
};
use super::models::{WebSocketServerMessage, WebSocketServerMessageWithOrigin};
use crate::{app_state::database::models::VaultId, config::server_config::ServerConfig};
#[derive(Debug, Clone)]
pub struct Broadcasts {
max_clients_per_vault: usize,
tx: Arc<Mutex<HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>>>,
broadcast_channel_capacity: usize,
// `tx` uses a blocking std::sync::Mutex because the critical section is
// a HashMap lookup plus a synchronous `broadcast::Sender::send`. Making
// this non-async lets `send_document_update` run without an `.await`,
// so an axum handler that is cancelled between `transaction.commit()`
// and the broadcast can never drop the notification mid-flight.
tx: Arc<StdMutex<HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>>>,
send_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>,
}
type TxMap = HashMap<VaultId, broadcast::Sender<WebSocketServerMessageWithOrigin>>;
impl Broadcasts {
pub fn new(server_config: &ServerConfig) -> Self {
Self {
max_clients_per_vault: server_config.max_clients_per_vault,
tx: Arc::new(Mutex::new(HashMap::new())),
broadcast_channel_capacity: server_config.broadcast_channel_capacity,
tx: Arc::new(StdMutex::new(HashMap::new())),
send_locks: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn get_receiver(
/// Acquire a per-vault lock that serializes broadcasts in commit order.
/// Must be acquired before the insert, held through commit and broadcast.
pub async fn acquire_send_lock(&self, vault: &VaultId) -> tokio::sync::OwnedMutexGuard<()> {
let lock = {
let mut locks = self.send_locks.lock().await;
locks
.entry(vault.clone())
.or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
.clone()
};
lock.lock_owned().await
}
/// Remove senders for vaults with no active receivers
fn prune_inactive_vaults(tx_map: &mut TxMap) -> Vec<VaultId> {
let mut pruned = Vec::new();
tx_map.retain(|vault, sender| {
let alive = sender.receiver_count() > 0;
if !alive {
pruned.push(vault.clone());
}
alive
});
pruned
}
pub fn get_receiver(
&self,
vault: VaultId,
) -> broadcast::Receiver<WebSocketServerMessageWithOrigin> {
let tx = self.get_or_create(vault).await;
max_clients: usize,
) -> Result<broadcast::Receiver<WebSocketServerMessageWithOrigin>, crate::errors::SyncServerError>
{
let mut tx_map = self
.tx
.lock()
.expect("broadcasts.tx mutex poisoned — a previous holder panicked");
tx.subscribe()
let count_before_prune = tx_map
.get(&vault)
.map_or(0, tokio::sync::broadcast::Sender::receiver_count);
let pruned = Self::prune_inactive_vaults(&mut tx_map);
let pruned_self = pruned.contains(&vault);
let sender = tx_map
.entry(vault.clone())
.or_insert_with(|| broadcast::channel(self.broadcast_channel_capacity).0);
// Hold the lock across the count check *and* the subscribe so the
// `max_clients` cap is atomic: two concurrent callers can't both
// observe `receiver_count() < max_clients` and both subscribe.
if sender.receiver_count() >= max_clients {
return Err(crate::errors::client_error(anyhow::anyhow!(
"Vault has reached the maximum number of clients ({max_clients})"
)));
}
let receiver = sender.subscribe();
let count_after = sender.receiver_count();
info!(
"[BCAST] get_receiver vault={vault} count_before_prune={count_before_prune} pruned_self={pruned_self} pruned_total={} count_after_subscribe={count_after}",
pruned.len()
);
Ok(receiver)
}
/// Notify all clients (who are subscribed to the vault) about an update.
/// We only log failures and don't propagate them.
pub async fn send_document_update(
&self,
vault: VaultId,
document: WebSocketServerMessageWithOrigin,
) {
let tx = self.get_or_create(vault.clone()).await;
/// Synchronous: safe to invoke from a handler between `commit()` and
/// function return without worrying about task cancellation dropping
/// the broadcast mid-flight. Failures are logged, never propagated.
pub fn send_document_update(&self, vault: VaultId, document: WebSocketServerMessageWithOrigin) {
let vault_update_id = match &document.message {
WebSocketServerMessage::VaultUpdate(u) => Some(u.document.vault_update_id),
WebSocketServerMessage::CursorPositions(_) => None,
};
let is_deleted = match &document.message {
WebSocketServerMessage::VaultUpdate(u) => Some(u.document.is_deleted),
WebSocketServerMessage::CursorPositions(_) => None,
};
let mut tx_map = self
.tx
.lock()
.expect("broadcasts.tx mutex poisoned — a previous holder panicked");
let count_before_prune = tx_map
.get(&vault)
.map_or(0, tokio::sync::broadcast::Sender::receiver_count);
let pruned = Self::prune_inactive_vaults(&mut tx_map);
let pruned_self = pruned.contains(&vault);
if tx.receiver_count() == 0 {
let sender = tx_map
.entry(vault.clone())
.or_insert_with(|| broadcast::channel(self.broadcast_channel_capacity).0);
let count_before_send = sender.receiver_count();
if count_before_send == 0 {
info!(
"[BCAST] send_document_update vault={vault} vuid={vault_update_id:?} is_deleted={is_deleted:?} count_before_prune={count_before_prune} pruned_self={pruned_self} count_before_send=0 SKIPPED"
);
debug!("Skipping broadcast, no clients connected for vault `{vault}`");
return;
}
let result = tx
.send(document)
.context("Cannot broadcast server message to websocket listeners")
.map_err(server_error);
if result.is_err() {
warn!("Failed to send message: {result:?}");
let send_result = sender.send(document);
match &send_result {
Ok(n) => info!(
"[BCAST] send_document_update vault={vault} vuid={vault_update_id:?} is_deleted={is_deleted:?} count_before_prune={count_before_prune} pruned_self={pruned_self} count_before_send={count_before_send} SENT delivered_to={n}"
),
Err(e) => warn!(
"[BCAST] send_document_update vault={vault} vuid={vault_update_id:?} is_deleted={is_deleted:?} count_before_prune={count_before_prune} pruned_self={pruned_self} count_before_send={count_before_send} FAILED err={e}"
),
}
}
async fn get_or_create(
&self,
vault: VaultId,
) -> broadcast::Sender<WebSocketServerMessageWithOrigin> {
let mut tx = self.tx.lock().await;
tx.entry(vault)
.or_insert_with(|| broadcast::channel(self.max_clients_per_vault).0.clone())
.clone()
}
}

View file

@ -11,7 +11,7 @@ pub struct WebSocketHandshake {
pub token: String,
pub device_id: DeviceId,
#[ts(as = "Option<i32>")]
#[ts(type = "number | null")]
pub last_seen_vault_update_id: Option<VaultUpdateId>,
}
@ -22,13 +22,14 @@ pub struct CursorPositionFromClient {
}
#[derive(TS, Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DocumentWithCursors {
// It's None in case the document is dirty.
// We still want to sync the cursor to mark
// that it exists and can be client-side
// interpolated. However, the actual
// position is meaningless.
#[ts(as = "Option<u32>")]
#[ts(type = "number | null")]
pub vault_update_id: Option<VaultUpdateId>,
pub document_id: DocumentId,
@ -57,11 +58,19 @@ pub struct CursorPositionFromServer {
pub clients: Vec<ClientCursors>,
}
// One committed version. Non-delete updates are broadcast to every
// connected client *except* the device that authored them — that
// device already has the new state via its HTTP response. Deletes are
// broadcast to every client including the author: the author keeps
// the document in its sync queue until this receipt arrives so a late
// remote update can't sneak in between the HTTP response and the
// queue cleanup. The server also emits these one-at-a-time to catch
// up a freshly-connected client on versions committed while it was
// offline, in ascending `vault_update_id` order.
#[derive(TS, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WebSocketVaultUpdate {
pub documents: Vec<DocumentVersionWithoutContent>,
pub is_initial_sync: bool,
pub document: DocumentVersionWithoutContent,
}
#[derive(TS, Deserialize, Clone, Debug)]
@ -80,6 +89,10 @@ pub enum WebSocketServerMessage {
CursorPositions(CursorPositionFromServer),
}
/// Broadcast envelope carrying the message plus the device that produced
/// it. The per-recipient send task compares `origin_device_id` against
/// its own device id to fill in `originates_from_self` before the message
/// is serialized on the wire.
#[derive(Clone, Debug)]
pub struct WebSocketServerMessageWithOrigin {
pub origin_device_id: Option<DeviceId>,

View file

@ -9,7 +9,7 @@ use crate::{
database::models::{DocumentVersionWithoutContent, VaultId, VaultUpdateId},
},
config::user_config::User,
errors::{SyncServerError, server_error, unauthenticated_error},
errors::{SyncServerError, client_error, server_error, unauthenticated_error},
server::auth::auth,
};
@ -26,7 +26,7 @@ pub fn get_authenticated_handshake(
if let Some(Message::Text(message)) = message {
let message: WebSocketClientMessage = serde_json::from_str(&message)
.context("Failed to parse message")
.map_err(server_error)?;
.map_err(client_error)?;
match message {
WebSocketClientMessage::Handshake(handshake) => {
@ -44,21 +44,29 @@ pub fn get_authenticated_handshake(
}
}
/// Stream the documents the client missed while offline, bounded above
/// by `up_to_vault_update_id` so the catch-up is a stable snapshot at
/// exactly that cursor. The WebSocket handshake atomically subscribes
/// to the broadcast channel and snapshots this cursor under the per-
/// vault send lock; commits past the cursor are then delivered solely
/// through the broadcast channel (filtered by the same cursor on the
/// receive side), so every committed update is delivered exactly once.
pub async fn get_unseen_documents(
state: &AppState,
vault_id: &VaultId,
last_seen_vault_update_id: Option<VaultUpdateId>,
up_to_vault_update_id: VaultUpdateId,
) -> Result<Vec<DocumentVersionWithoutContent>, SyncServerError> {
if let Some(update_id) = last_seen_vault_update_id {
state
.database
.get_latest_documents_since(vault_id, update_id, None)
.get_latest_documents_since(vault_id, update_id, Some(up_to_vault_update_id), None)
.await
.map_err(server_error)
} else {
state
.database
.get_latest_documents(vault_id, None)
.get_latest_documents(vault_id, Some(up_to_vault_update_id), None)
.await
.map_err(server_error)
}

View file

@ -27,24 +27,34 @@ pub struct Config {
}
impl Config {
pub fn validate(&self) -> Result<()> {
self.server
.validate()
.context("Invalid server configuration")?;
self.logging
.validate()
.context("Invalid logging configuration")?;
self.database
.validate()
.context("Invalid database configuration")?;
Ok(())
}
pub async fn read_or_create(path: &Path) -> Result<Self> {
let config = if path.exists() {
info!(
"Loading configuration from `{}`",
path.canonicalize().unwrap().display()
);
Self::load_from_file(path).await?
let display_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
if path.exists() {
info!("Loading configuration from `{}`", display_path.display());
Self::load_from_file(path).await
} else {
Self::default()
};
config.write(path).await?;
info!(
"Updated configuration at `{}`",
path.canonicalize().unwrap().display()
);
Ok(config)
let config = Self::default();
config.write(path).await?;
info!(
"Created default configuration at `{}`",
display_path.display()
);
Ok(config)
}
}
pub async fn load_from_file(path: &Path) -> Result<Self> {

View file

@ -1,5 +1,6 @@
use std::{path::PathBuf, time::Duration};
use anyhow::{Result, ensure};
use log::debug;
use serde::{Deserialize, Serialize};
@ -34,6 +35,24 @@ fn default_cursor_timeout() -> Duration {
DEFAULT_CURSOR_TIMEOUT
}
impl DatabaseConfig {
pub fn validate(&self) -> Result<()> {
ensure!(
!self.databases_directory_path.as_os_str().is_empty(),
"databases_directory_path must not be empty"
);
ensure!(
self.max_connections_per_vault > 0,
"max_connections_per_vault must be greater than 0"
);
ensure!(
!self.cursor_timeout.is_zero(),
"cursor_timeout must be greater than 0"
);
Ok(())
}
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {

View file

@ -1,10 +1,13 @@
use std::time::Duration;
use anyhow::{Result, ensure};
use log::debug;
use serde::{Deserialize, Serialize};
use crate::{
consts::{DEFAULT_LOG_DIRECTORY, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_INTERVAL},
consts::{
DEFAULT_LOG_DIRECTORY, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_INTERVAL, DURATION_ZERO,
},
utils::log_level::LogLevel,
};
@ -20,6 +23,20 @@ pub struct LoggingConfig {
pub log_level: LogLevel,
}
impl LoggingConfig {
pub fn validate(&self) -> Result<()> {
ensure!(
!self.log_directory.is_empty(),
"log_directory must not be an empty string"
);
ensure!(
self.log_rotation > DURATION_ZERO,
"log_rotation must be greater than 0"
);
Ok(())
}
}
impl Default for LoggingConfig {
fn default() -> Self {
Self {

View file

@ -1,10 +1,13 @@
use anyhow::{Result, ensure};
use log::debug;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use crate::consts::{
DEFAULT_HOST, DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT,
DEFAULT_MERGEABLE_FILE_EXTENSIONS, DEFAULT_PORT, DEFAULT_RESPONSE_TIMEOUT_SECONDS,
DEFAULT_ALLOWED_ORIGINS, DEFAULT_BROADCAST_CHANNEL_CAPACITY, DEFAULT_HOST,
DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT, DEFAULT_MAX_PENDING_WS_CONNECTIONS,
DEFAULT_MERGEABLE_FILE_EXTENSIONS, DEFAULT_PORT, DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND,
DEFAULT_RESPONSE_TIMEOUT_SECONDS, DURATION_ZERO,
};
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
@ -21,11 +24,56 @@ pub struct ServerConfig {
#[serde(default = "default_max_clients_per_vault")]
pub max_clients_per_vault: usize,
#[serde(default = "default_broadcast_channel_capacity")]
pub broadcast_channel_capacity: usize,
#[serde(default = "default_response_timeout", with = "humantime_serde")]
pub response_timeout: Duration,
#[serde(default = "default_mergeable_file_extensions")]
pub mergeable_file_extensions: Vec<String>,
/// Per-user maximum requests per second (keyed by bearer token).
/// `None` disables rate limiting.
#[serde(default = "default_rate_limit_per_user_per_second")]
pub rate_limit_per_user_per_second: Option<u64>,
/// Allowed CORS origins. Default: `["*"]` (allow all).
#[serde(default = "default_allowed_origins")]
pub allowed_origins: Vec<String>,
/// Maximum concurrent unauthenticated WebSocket connections waiting for
/// handshake. Limits resource consumption from clients that connect but
/// never authenticate.
#[serde(default = "default_max_pending_websocket_connections")]
pub max_pending_websocket_connections: usize,
}
impl ServerConfig {
pub fn validate(&self) -> Result<()> {
ensure!(
self.response_timeout > DURATION_ZERO,
"response_timeout must be greater than 0"
);
ensure!(
self.max_body_size_mb > 0,
"max_body_size_mb must be greater than 0"
);
ensure!(
self.max_clients_per_vault > 0,
"max_clients_per_vault must be greater than 0"
);
ensure!(
self.broadcast_channel_capacity > 0,
"broadcast_channel_capacity must be greater than 0"
);
ensure!(
self.max_pending_websocket_connections > 0,
"max_pending_websocket_connections must be greater than 0"
);
Ok(())
}
}
fn default_host() -> String {
@ -48,6 +96,11 @@ fn default_max_clients_per_vault() -> usize {
DEFAULT_MAX_CLIENTS_PER_VAULT
}
fn default_broadcast_channel_capacity() -> usize {
debug!("Using default broadcast channel capacity: {DEFAULT_BROADCAST_CHANNEL_CAPACITY}");
DEFAULT_BROADCAST_CHANNEL_CAPACITY
}
fn default_response_timeout() -> Duration {
debug!("Using default response timeout: {DEFAULT_RESPONSE_TIMEOUT_SECONDS:?}");
DEFAULT_RESPONSE_TIMEOUT_SECONDS
@ -60,3 +113,21 @@ fn default_mergeable_file_extensions() -> Vec<String> {
.map(|s| (*s).to_owned())
.collect()
}
fn default_rate_limit_per_user_per_second() -> Option<u64> {
debug!("Using default rate limit per second: {DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND:?}");
DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND
}
fn default_allowed_origins() -> Vec<String> {
debug!("Using default allowed origins: {DEFAULT_ALLOWED_ORIGINS:?}");
DEFAULT_ALLOWED_ORIGINS
.iter()
.map(|s| (*s).to_owned())
.collect()
}
fn default_max_pending_websocket_connections() -> usize {
debug!("Using default max pending WebSocket connections: {DEFAULT_MAX_PENDING_WS_CONNECTIONS}");
DEFAULT_MAX_PENDING_WS_CONNECTIONS
}

Some files were not shown because too many files have changed in this diff Show more