split: server foundation (Cargo, config, errors, utils, main)

Cargo.{toml,lock} bumps, build.rs, config-e2e.yml, rust-toolchain.toml,
src/config/* (database/logging/server/user configs), src/consts.rs,
src/errors.rs, src/main.rs, and src/utils/* (dedup_paths,
find_first_available_path, rotating_file_writer, sanitize_path).
This commit is contained in:
Andras Schmelczer 2026-05-08 21:35:18 +01:00
parent 70f97c4b16
commit a9ce09b59d
17 changed files with 535 additions and 99 deletions

193
sync-server/Cargo.lock generated
View file

@ -337,10 +337,11 @@ checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
[[package]]
name = "cc"
version = "1.2.2"
version = "1.2.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc"
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
dependencies = [
"find-msvc-tools",
"shlex",
]
@ -456,6 +457,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
@ -533,6 +543,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "deranged"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
dependencies = [
"powerfmt",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -624,6 +643,12 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
[[package]]
name = "find-msvc-tools"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "flume"
version = "0.11.1"
@ -1272,6 +1297,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.8.0"
@ -1335,6 +1370,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "num-conv"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967"
[[package]]
name = "num-integer"
version = "0.1.46"
@ -1463,6 +1504,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.20"
@ -1582,12 +1629,12 @@ dependencies = [
[[package]]
name = "reconcile-text"
version = "0.8.0"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "599cf9539996a2a19e501110404c59ba62f4974009f8fb864a8b7151c15ee5a5"
checksum = "52e0cf361887ea64c479ca871c1170dda761f84e122f2616b5579906a38d7557"
dependencies = [
"serde",
"thiserror 2.0.17",
"thiserror 2.0.18",
]
[[package]]
@ -1648,6 +1695,40 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rust-embed"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04113cb9355a377d83f06ef1f0a45b8ab8cd7d8b1288160717d66df5c7988d27"
dependencies = [
"rust-embed-impl",
"rust-embed-utils",
"walkdir",
]
[[package]]
name = "rust-embed-impl"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0902e4c7c8e997159ab384e6d0fc91c221375f6894346ae107f47dd0f3ccaa"
dependencies = [
"proc-macro2",
"quote",
"rust-embed-utils",
"syn 2.0.90",
"walkdir",
]
[[package]]
name = "rust-embed-utils"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bcdef0be6fe7f6fa333b1073c949729274b05f123a0ad7efcb8efd878e5c3b1"
dependencies = [
"sha2",
"walkdir",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -1679,6 +1760,15 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "sanitize-filename"
version = "0.6.0"
@ -1916,7 +2006,7 @@ dependencies = [
"serde_json",
"sha2",
"smallvec",
"thiserror 2.0.17",
"thiserror 2.0.18",
"tokio",
"tokio-stream",
"tracing",
@ -2000,7 +2090,7 @@ dependencies = [
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.17",
"thiserror 2.0.18",
"tracing",
"uuid",
"whoami",
@ -2039,7 +2129,7 @@ dependencies = [
"smallvec",
"sqlx-core",
"stringprep",
"thiserror 2.0.17",
"thiserror 2.0.18",
"tracing",
"uuid",
"whoami",
@ -2065,7 +2155,7 @@ dependencies = [
"serde",
"serde_urlencoded",
"sqlx-core",
"thiserror 2.0.17",
"thiserror 2.0.18",
"tracing",
"url",
"uuid",
@ -2100,6 +2190,12 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "symlink"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a"
[[package]]
name = "syn"
version = "1.0.109"
@ -2136,18 +2232,22 @@ dependencies = [
"futures",
"humantime-serde",
"log",
"mime_guess",
"rand 0.9.0",
"reconcile-text",
"regex",
"rust-embed",
"sanitize-filename",
"serde",
"serde_json",
"serde_yaml",
"sqlx",
"thiserror 2.0.17",
"subtle",
"thiserror 2.0.18",
"tokio",
"tower-http",
"tracing",
"tracing-appender",
"tracing-subscriber",
"ts-rs",
"uuid",
@ -2203,11 +2303,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.17"
version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
dependencies = [
"thiserror-impl 2.0.17",
"thiserror-impl 2.0.18",
]
[[package]]
@ -2223,9 +2323,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.17"
version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5"
dependencies = [
"proc-macro2",
"quote",
@ -2242,6 +2342,37 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.3.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde_core",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
[[package]]
name = "time-macros"
version = "0.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215"
dependencies = [
"num-conv",
"time-core",
]
[[package]]
name = "tinystr"
version = "0.7.6"
@ -2276,7 +2407,6 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@ -2376,6 +2506,19 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c"
dependencies = [
"crossbeam-channel",
"symlink",
"thiserror 2.0.18",
"time",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.28"
@ -2434,7 +2577,7 @@ checksum = "e640d9b0964e9d39df633548591090ab92f7a4567bc31d3891af23471a3365c6"
dependencies = [
"chrono",
"lazy_static",
"thiserror 2.0.17",
"thiserror 2.0.18",
"ts-rs-macros",
"uuid",
]
@ -2481,6 +2624,12 @@ version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f720def6ce1ee2fc44d40ac9ed6d3a59c361c80a75a7aa8e75bb9baed31cf2ea"
[[package]]
name = "unicase"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
[[package]]
name = "unicode-bidi"
version = "0.3.17"
@ -2577,6 +2726,16 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

View file

@ -1,6 +1,6 @@
[package]
name = "sync_server"
rust-version = "1.89.0"
rust-version = "1.94.0"
authors = ["Andras Schmelczer <andras@schmelczer.dev>"]
edition = "2024"
license = "MIT"
@ -10,7 +10,7 @@ version = "0.14.0"
[dependencies]
serde = { version = "1.0.219", default-features = false, features = ["derive"] }
thiserror = { version = "2.0.12", default-features = false }
tokio = { version = "1.48.0", features = ["full"]}
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "sync", "time", "net", "fs", "signal"]}
uuid = { version = "1.16.0", features = ["v4", "serde"] }
log = { version = "0.4.28" }
anyhow = { version = "1.0.100", features = ["backtrace"] }
@ -20,6 +20,7 @@ axum_typed_multipart = "0.11.0"
tower-http = { version = "0.6.1", features = ["cors", "trace", "limit", "timeout"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["fmt", "env-filter"]}
tracing-appender = "0.2.5"
humantime-serde = "1.1.1"
sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] }
chrono = { version = "0.4.41", features = ["serde"] }
@ -33,7 +34,10 @@ serde_json = "1.0.140"
bimap = "0.6.3"
ts-rs = { version = "10.1", features = ["uuid-impl", "chrono-impl"] }
base64 = "0.22.1"
reconcile-text = { version = "0.8.0", features = ["serde"] }
reconcile-text = { version = "0.11.0", features = ["serde"] }
rust-embed = "8.5"
mime_guess = "2.0"
subtle = "2.6.1"
[profile.release]
codegen-units = 1

View file

@ -1,5 +1,16 @@
// generated by `sqlx migrate build-script`
fn main() {
// trigger recompilation when a new migration is added
println!("cargo:rerun-if-changed=migrations");
// Ensure the history-ui dist directory exists so rust-embed can compile
// even when the frontend hasn't been built yet.
let dist_path = std::path::Path::new("../frontend/history-ui/dist");
if !dist_path.exists() {
std::fs::create_dir_all(dist_path).expect("Failed to create history-ui dist directory");
std::fs::write(
dist_path.join("index.html"),
"<!DOCTYPE html><html><body><p>Run <code>npm run build -w history-ui</code> first.</p></body></html>",
)
.expect("Failed to write placeholder index.html");
}
}

View file

@ -1,32 +1,34 @@
database:
databases_directory_path: databases
max_connections_per_vault: 12
databases_directory_path: /host/tmp/vaultlink-e2e-databases
max_connections_per_vault: 8
cursor_timeout: 1m
server:
host: 0.0.0.0
port: 3000
port: 3010
max_body_size_mb: 512
max_clients_per_vault: 256
max_pending_websocket_connections: 4096
broadcast_channel_capacity: 1024
response_timeout: 30m
mergeable_file_extensions:
- md
- txt
- md
- txt
users:
user_configs:
- name: admin
token: test-token-change-me
vault_access:
type: allow_access_to_all
- name: other-admin
token: test-token-change-me2
vault_access:
type: allow_access_to_all
- name: test
token: other-test-token
vault_access:
type: allow_list
allowed:
- default
- name: admin
token: test-token-change-me
vault_access:
type: allow_access_to_all
- name: other-admin
token: test-token-change-me2
vault_access:
type: allow_access_to_all
- name: test
token: other-test-token
vault_access:
type: allow_list
allowed:
- default
logging:
log_directory: logs
log_rotation: 7days

View file

@ -1,5 +1,5 @@
[toolchain]
channel = "1.89.0"
channel = "1.94.0"
targets = [
"x86_64-unknown-linux-gnu",
"x86_64-unknown-linux-musl",

View file

@ -27,24 +27,34 @@ pub struct Config {
}
impl Config {
pub fn validate(&self) -> Result<()> {
self.server
.validate()
.context("Invalid server configuration")?;
self.logging
.validate()
.context("Invalid logging configuration")?;
self.database
.validate()
.context("Invalid database configuration")?;
Ok(())
}
pub async fn read_or_create(path: &Path) -> Result<Self> {
let config = if path.exists() {
info!(
"Loading configuration from `{}`",
path.canonicalize().unwrap().display()
);
Self::load_from_file(path).await?
let display_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
if path.exists() {
info!("Loading configuration from `{}`", display_path.display());
Self::load_from_file(path).await
} else {
Self::default()
};
config.write(path).await?;
info!(
"Updated configuration at `{}`",
path.canonicalize().unwrap().display()
);
Ok(config)
let config = Self::default();
config.write(path).await?;
info!(
"Created default configuration at `{}`",
display_path.display()
);
Ok(config)
}
}
pub async fn load_from_file(path: &Path) -> Result<Self> {

View file

@ -1,5 +1,6 @@
use std::{path::PathBuf, time::Duration};
use anyhow::{Result, ensure};
use log::debug;
use serde::{Deserialize, Serialize};
@ -34,6 +35,24 @@ fn default_cursor_timeout() -> Duration {
DEFAULT_CURSOR_TIMEOUT
}
impl DatabaseConfig {
pub fn validate(&self) -> Result<()> {
ensure!(
!self.databases_directory_path.as_os_str().is_empty(),
"databases_directory_path must not be empty"
);
ensure!(
self.max_connections_per_vault > 0,
"max_connections_per_vault must be greater than 0"
);
ensure!(
!self.cursor_timeout.is_zero(),
"cursor_timeout must be greater than 0"
);
Ok(())
}
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {

View file

@ -1,10 +1,13 @@
use std::time::Duration;
use anyhow::{Result, ensure};
use log::debug;
use serde::{Deserialize, Serialize};
use crate::{
consts::{DEFAULT_LOG_DIRECTORY, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_INTERVAL},
consts::{
DEFAULT_LOG_DIRECTORY, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_INTERVAL, DURATION_ZERO,
},
utils::log_level::LogLevel,
};
@ -20,6 +23,20 @@ pub struct LoggingConfig {
pub log_level: LogLevel,
}
impl LoggingConfig {
pub fn validate(&self) -> Result<()> {
ensure!(
!self.log_directory.is_empty(),
"log_directory must not be an empty string"
);
ensure!(
self.log_rotation > DURATION_ZERO,
"log_rotation must be greater than 0"
);
Ok(())
}
}
impl Default for LoggingConfig {
fn default() -> Self {
Self {

View file

@ -1,10 +1,13 @@
use anyhow::{Result, ensure};
use log::debug;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use crate::consts::{
DEFAULT_HOST, DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT,
DEFAULT_MERGEABLE_FILE_EXTENSIONS, DEFAULT_PORT, DEFAULT_RESPONSE_TIMEOUT_SECONDS,
DEFAULT_ALLOWED_ORIGINS, DEFAULT_BROADCAST_CHANNEL_CAPACITY, DEFAULT_HOST,
DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT, DEFAULT_MAX_PENDING_WS_CONNECTIONS,
DEFAULT_MERGEABLE_FILE_EXTENSIONS, DEFAULT_PORT, DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND,
DEFAULT_RESPONSE_TIMEOUT_SECONDS, DURATION_ZERO,
};
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
@ -21,11 +24,56 @@ pub struct ServerConfig {
#[serde(default = "default_max_clients_per_vault")]
pub max_clients_per_vault: usize,
#[serde(default = "default_broadcast_channel_capacity")]
pub broadcast_channel_capacity: usize,
#[serde(default = "default_response_timeout", with = "humantime_serde")]
pub response_timeout: Duration,
#[serde(default = "default_mergeable_file_extensions")]
pub mergeable_file_extensions: Vec<String>,
/// Per-user maximum requests per second (keyed by bearer token).
/// `None` disables rate limiting.
#[serde(default = "default_rate_limit_per_user_per_second")]
pub rate_limit_per_user_per_second: Option<u64>,
/// Allowed CORS origins. Default: `["*"]` (allow all).
#[serde(default = "default_allowed_origins")]
pub allowed_origins: Vec<String>,
/// Maximum concurrent unauthenticated WebSocket connections waiting for
/// handshake. Limits resource consumption from clients that connect but
/// never authenticate.
#[serde(default = "default_max_pending_websocket_connections")]
pub max_pending_websocket_connections: usize,
}
impl ServerConfig {
pub fn validate(&self) -> Result<()> {
ensure!(
self.response_timeout > DURATION_ZERO,
"response_timeout must be greater than 0"
);
ensure!(
self.max_body_size_mb > 0,
"max_body_size_mb must be greater than 0"
);
ensure!(
self.max_clients_per_vault > 0,
"max_clients_per_vault must be greater than 0"
);
ensure!(
self.broadcast_channel_capacity > 0,
"broadcast_channel_capacity must be greater than 0"
);
ensure!(
self.max_pending_websocket_connections > 0,
"max_pending_websocket_connections must be greater than 0"
);
Ok(())
}
}
fn default_host() -> String {
@ -48,6 +96,11 @@ fn default_max_clients_per_vault() -> usize {
DEFAULT_MAX_CLIENTS_PER_VAULT
}
fn default_broadcast_channel_capacity() -> usize {
debug!("Using default broadcast channel capacity: {DEFAULT_BROADCAST_CHANNEL_CAPACITY}");
DEFAULT_BROADCAST_CHANNEL_CAPACITY
}
fn default_response_timeout() -> Duration {
debug!("Using default response timeout: {DEFAULT_RESPONSE_TIMEOUT_SECONDS:?}");
DEFAULT_RESPONSE_TIMEOUT_SECONDS
@ -60,3 +113,21 @@ fn default_mergeable_file_extensions() -> Vec<String> {
.map(|s| (*s).to_owned())
.collect()
}
fn default_rate_limit_per_user_per_second() -> Option<u64> {
debug!("Using default rate limit per second: {DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND:?}");
DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND
}
fn default_allowed_origins() -> Vec<String> {
debug!("Using default allowed origins: {DEFAULT_ALLOWED_ORIGINS:?}");
DEFAULT_ALLOWED_ORIGINS
.iter()
.map(|s| (*s).to_owned())
.collect()
}
fn default_max_pending_websocket_connections() -> usize {
debug!("Using default max pending WebSocket connections: {DEFAULT_MAX_PENDING_WS_CONNECTIONS}");
DEFAULT_MAX_PENDING_WS_CONNECTIONS
}

View file

@ -1,6 +1,7 @@
use bimap::BiHashMap;
use rand::{Rng, distr::Alphanumeric, rng};
use serde::{Deserialize, Deserializer, Serialize, de::Error};
use subtle::ConstantTimeEq;
use crate::app_state::database::models::VaultId;
@ -19,10 +20,19 @@ where
let mut user_token_map = BiHashMap::new();
for user in &users {
if let Some(existing_name) = user_token_map.get_by_right(&user.token) {
let redacted = if user.token.len() > 6 {
format!(
"{}...{}",
&user.token[..3],
&user.token[user.token.len() - 3..]
)
} else {
"***".to_owned()
};
return Err(D::Error::custom(format!(
"Duplicate user token found: `{}` for users `{}` and `{}`. User tokens must be \
unique.",
user.token, existing_name, user.name
"Duplicate user token found: `{redacted}` for users `{}` and `{}`. User tokens \
must be unique.",
existing_name, user.name
)));
}
@ -41,7 +51,9 @@ where
impl UserConfig {
pub fn get_user(&self, token: &str) -> Option<&User> {
self.user_configs.iter().find(|u| u.token == token)
self.user_configs
.iter()
.find(|u| u.token.as_bytes().ct_eq(token.as_bytes()).into())
}
}

View file

@ -2,22 +2,36 @@ use std::time::Duration;
use crate::utils::log_level::LogLevel;
pub const DURATION_ZERO: Duration = Duration::from_secs(0);
pub const DEFAULT_CONFIG_PATH: &str = "config.yml";
pub const DEFAULT_DATABASES_DIRECTORY_PATH: &str = "databases";
pub const DEFAULT_MAX_CONNECTIONS_PER_VAULT: u32 = 12;
pub const DEFAULT_MAX_CONNECTIONS_PER_VAULT: u32 = 6;
pub const DEFAULT_CURSOR_TIMEOUT: Duration = Duration::from_secs(60);
pub const DEFAULT_HOST: &str = "127.0.0.1";
pub const DEFAULT_PORT: u16 = 3000;
pub const DEFAULT_MAX_BODY_SIZE_MB: usize = 4096;
pub const DEFAULT_RESPONSE_TIMEOUT_SECONDS: Duration = Duration::from_secs(1800);
pub const DEFAULT_RESPONSE_TIMEOUT_SECONDS: Duration = Duration::from_mins(30);
pub const DEFAULT_MAX_CLIENTS_PER_VAULT: usize = 256;
pub const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096;
pub const DEFAULT_MAX_PENDING_WS_CONNECTIONS: usize = 128;
pub const DEFAULT_LOG_DIRECTORY: &str = "logs";
pub const DEFAULT_LOG_ROTATION_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24); // 1 day
pub const DEFAULT_LOG_ROTATION_INTERVAL: Duration = Duration::from_hours(24);
pub const IDLE_POOL_TIMEOUT: Duration = Duration::from_mins(5);
pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
pub const MAX_CURSOR_DOCUMENTS: usize = 1000;
pub const MAX_CURSORS_PER_DOCUMENT: usize = 100;
pub const MAX_RELATIVE_PATH_LEN: usize = 4096;
pub const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Info;
pub const DEFAULT_MERGEABLE_FILE_EXTENSIONS: &[&str] = &["md", "txt"];
pub const SUPPORTED_API_VERSION: u32 = 2;
pub const DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND: Option<u64> = None;
pub const DEFAULT_ALLOWED_ORIGINS: &[&str] = &["*"];
pub const SUPPORTED_API_VERSION: u32 = 3;

View file

@ -5,7 +5,7 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use log::{debug, error};
use log::{debug, error, warn};
use serde::Serialize;
use thiserror::Error;
use ts_rs::TS;
@ -29,6 +29,9 @@ pub enum SyncServerError {
#[error("Permission denied error: {0}")]
PermissionDeniedError(#[source] anyhow::Error),
#[error("Too many requests: {0}")]
TooManyRequests(#[source] anyhow::Error),
}
impl SyncServerError {
@ -39,7 +42,8 @@ impl SyncServerError {
| Self::ServerError(error)
| Self::NotFound(error)
| Self::Unauthenticated(error)
| Self::PermissionDeniedError(error) => error.into(),
| Self::PermissionDeniedError(error)
| Self::TooManyRequests(error) => error.into(),
}
}
}
@ -69,7 +73,22 @@ impl Display for SerializedError {
impl IntoResponse for SyncServerError {
fn into_response(self) -> Response {
let body = Json(self.serialize());
let serialized = self.serialize();
match &self {
Self::InitError(_) | Self::ServerError(_) => {
error!("{serialized}");
}
Self::ClientError(_) | Self::NotFound(_) => {
warn!("{serialized}");
}
Self::TooManyRequests(_) => {
warn!("{serialized}");
}
Self::Unauthenticated(_) | Self::PermissionDeniedError(_) => {}
}
let body = Json(serialized);
match self {
Self::InitError(_) | Self::ServerError(_) => {
@ -79,6 +98,7 @@ impl IntoResponse for SyncServerError {
Self::NotFound(_) => (StatusCode::NOT_FOUND, body).into_response(),
Self::Unauthenticated(_) => (StatusCode::UNAUTHORIZED, body).into_response(),
Self::PermissionDeniedError(_) => (StatusCode::FORBIDDEN, body).into_response(),
Self::TooManyRequests(_) => (StatusCode::TOO_MANY_REQUESTS, body).into_response(),
}
}
}
@ -102,6 +122,7 @@ impl From<&anyhow::Error> for SerializedError {
SyncServerError::NotFound(_) => "NotFound",
SyncServerError::Unauthenticated(_) => "Unauthenticated",
SyncServerError::PermissionDeniedError(_) => "PermissionDeniedError",
SyncServerError::TooManyRequests(_) => "TooManyRequests",
},
),
message: error.to_string(),
@ -139,3 +160,21 @@ pub fn permission_denied_error(error: anyhow::Error) -> SyncServerError {
debug!("Permission denied: {error:?}");
SyncServerError::PermissionDeniedError(error)
}
pub fn too_many_requests_error(error: anyhow::Error) -> SyncServerError {
debug!("Too many requests: {error:?}");
SyncServerError::TooManyRequests(error)
}
/// Maps a `create_write_transaction` error to 429 if the database is busy,
/// or 500 for all other failures.
pub fn write_transaction_error(error: anyhow::Error) -> SyncServerError {
if error
.downcast_ref::<crate::app_state::database::WriteBusyError>()
.is_some()
{
too_many_requests_error(error)
} else {
server_error(error)
}
}

View file

@ -16,6 +16,7 @@ use consts::DEFAULT_CONFIG_PATH;
use errors::{SyncServerError, init_error};
use log::info;
use server::create_server;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{EnvFilter, fmt::format, layer::SubscriberExt, util::SubscriberInitExt};
use utils::rotating_file_writer::RotatingFileWriter;
@ -41,11 +42,14 @@ async fn main() -> ExitCode {
}
};
let mut result = set_up_logging(&args, &config.logging);
if result.is_ok() {
result = start_server(config).await;
let result = async {
config.validate().map_err(init_error)?;
// Hold the non-blocking writer guards until shutdown so the
// dedicated writer threads stay alive and flush queued log lines.
let _log_guards = set_up_logging(&args, &config.logging)?;
start_server(config).await
}
.await;
match result {
Ok(()) => ExitCode::SUCCESS,
@ -59,7 +63,7 @@ async fn main() -> ExitCode {
fn set_up_logging(
args: &Args,
logging_config: &config::logging_config::LoggingConfig,
) -> Result<(), SyncServerError> {
) -> Result<[WorkerGuard; 2], SyncServerError> {
let level_filter = logging_config.log_level.as_tracing_level();
let env_filter = EnvFilter::builder()
@ -80,6 +84,14 @@ fn set_up_logging(
.context("Failed to create rotating file writer")
.map_err(init_error)?;
// Decouple log emission from disk/stderr I/O. Without this, a tokio
// worker that holds the writer's std::sync::Mutex while a `write(2)`
// is throttled by the kernel (e.g. btrfs writeback) cascades the
// stall to every other worker that tries to log, freezing the whole
// runtime. The guards must outlive every emitter.
let (file_writer, file_guard) = tracing_appender::non_blocking(file_appender);
let (stderr_writer, stderr_guard) = tracing_appender::non_blocking(std::io::stderr());
let format = format()
.with_target(is_debug_mode)
.with_line_number(is_debug_mode)
@ -87,12 +99,12 @@ fn set_up_logging(
let stderr_layer = tracing_subscriber::fmt::layer()
.with_ansi(use_colors)
.with_writer(std::io::stderr)
.with_writer(stderr_writer)
.event_format(format.clone());
let file_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(file_appender)
.with_writer(file_writer)
.event_format(format);
tracing_subscriber::registry()
@ -103,7 +115,7 @@ fn set_up_logging(
.context("Failed to initialise tracing")
.map_err(init_error)?;
Ok(())
Ok([file_guard, stderr_guard])
}
async fn start_server(config: Config) -> Result<(), SyncServerError> {

View file

@ -1,8 +1,17 @@
use std::sync::LazyLock;
use regex::Regex;
static DEDUP_SUFFIX_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r" \((\d+)\)$").expect("invalid regex"));
pub fn dedup_paths(path: &str) -> impl Iterator<Item = String> {
let mut path_parts = path.split('/').collect::<Vec<_>>();
let file_name = path_parts.pop().unwrap().to_owned();
let file_name = path_parts
.pop()
.filter(|s| !s.is_empty())
.unwrap_or(path)
.to_owned();
let mut directory = path_parts.join("/");
if !directory.is_empty() {
@ -29,14 +38,13 @@ pub fn dedup_paths(path: &str) -> impl Iterator<Item = String> {
}
};
let regex = Regex::new(r" \((\d+)\)$").unwrap();
let start_number = regex
let start_number = DEDUP_SUFFIX_REGEX
.captures(&stem)
.and_then(|caps| caps.get(1))
.and_then(|m| m.as_str().parse::<u32>().ok())
.unwrap_or(0);
let clean_stem = regex.replace(&stem, "").to_string();
let clean_stem = DEDUP_SUFFIX_REGEX.replace(&stem, "").to_string();
(start_number..).map(move |dedup_number| {
if dedup_number == 0 {

View file

@ -1,25 +1,30 @@
use crate::app_state::database::models::VaultId;
use crate::{app_state::database::Transaction, utils::dedup_paths::dedup_paths};
use crate::utils::dedup_paths::dedup_paths;
use anyhow::Result;
use log::{debug, info};
use sqlx::sqlite::SqliteConnection;
pub async fn find_first_available_path(
vault_id: &VaultId,
sanitized_relative_path: &str,
database: &crate::app_state::database::Database,
transaction: &mut Transaction<'_>,
connection: &mut SqliteConnection,
) -> Result<String> {
info!("Finding first available path for `{sanitized_relative_path}` in vault `{vault_id}`");
for candidate in dedup_paths(sanitized_relative_path) {
debug!("Checking candidate path for deconflicting names: `{candidate}`");
if database
.get_latest_document_by_path(vault_id, &candidate, Some(transaction))
.get_latest_non_deleted_document_by_path(vault_id, &candidate, Some(connection))
.await?
.is_none()
{
info!("Selected available path: `{candidate}`");
return Ok(candidate);
}
info!(
"Finding first available path for `{sanitized_relative_path}` in vault `{vault_id}` as `{candidate}` is already taken"
);
}
unreachable!("dedup_paths produces infinite paths");

View file

@ -6,7 +6,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};
use chrono::{Local, NaiveDateTime};
use chrono::NaiveDateTime;
use tracing_subscriber::fmt::MakeWriter;
#[derive(Clone)]
@ -55,7 +55,7 @@ impl RotatingFileWriter {
let timestamp_str = filename.get(prefix_len..filename.len().checked_sub(4)?)?;
let dt = NaiveDateTime::parse_from_str(timestamp_str, "%Y-%m-%d_%H-%M-%S").ok()?;
let timestamp = dt.and_local_timezone(Local).single()?;
let timestamp = dt.and_utc();
let secs: u64 = timestamp.timestamp().try_into().ok()?;
Some(UNIX_EPOCH + Duration::from_secs(secs))
@ -114,7 +114,7 @@ impl RotatingFileWriter {
}
fn rotate(inner: &mut RotatingFileWriterInner) -> io::Result<()> {
let timestamp = Local::now().format("%Y-%m-%d_%H-%M-%S");
let timestamp = chrono::Utc::now().format("%Y-%m-%d_%H-%M-%S");
let filename = format!("{}.{}.log", inner.file_prefix, timestamp);
let filepath = inner.directory.join(filename);
@ -132,8 +132,14 @@ impl RotatingFileWriter {
impl Write for RotatingFileWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut inner = self.inner.lock().unwrap();
let mut inner = self.inner.lock().unwrap_or_else(|poisoned| {
eprintln!("RotatingFileWriter mutex was poisoned, recovering");
poisoned.into_inner()
});
// Reset file handle after poison recovery so the next branch
// re-opens a valid file rather than writing to a potentially
// half-closed handle.
if inner.current_file.is_none() {
Self::open_or_create_log_file(&mut inner)?;
} else if Self::should_rotate(&inner) {
@ -148,7 +154,10 @@ impl Write for RotatingFileWriter {
}
fn flush(&mut self) -> io::Result<()> {
let mut inner = self.inner.lock().unwrap();
let mut inner = self.inner.lock().unwrap_or_else(|poisoned| {
eprintln!("RotatingFileWriter mutex was poisoned, recovering");
poisoned.into_inner()
});
if let Some(ref mut file) = inner.current_file {
file.flush()
} else {
@ -267,7 +276,7 @@ mod tests {
// Parse the expected time
let expected_dt =
NaiveDateTime::parse_from_str(timestamp_str, "%Y-%m-%d_%H-%M-%S").unwrap();
let expected_timestamp = expected_dt.and_local_timezone(Local).single().unwrap();
let expected_timestamp = expected_dt.and_utc();
let expected_duration =
Duration::from_secs(expected_timestamp.timestamp().try_into().unwrap());
let expected_next = UNIX_EPOCH + expected_duration + rotation_duration;
@ -306,7 +315,7 @@ mod tests {
// Should use the latest file (2025-10-26_14-00-00)
let expected_dt =
NaiveDateTime::parse_from_str("2025-10-26_14-00-00", "%Y-%m-%d_%H-%M-%S").unwrap();
let expected_timestamp = expected_dt.and_local_timezone(Local).single().unwrap();
let expected_timestamp = expected_dt.and_utc();
let expected_duration =
Duration::from_secs(expected_timestamp.timestamp().try_into().unwrap());
let expected_next = UNIX_EPOCH + expected_duration + rotation_duration;

View file

@ -1,14 +1,28 @@
use anyhow::{Result, ensure};
use crate::consts::MAX_RELATIVE_PATH_LEN;
/// Sanitize the document's path to allow all clients to create the same path in
/// their filesystem. If we didn't do this server-side, client's would need to
/// deal with mapping invalid names to valid ones and then back.
pub fn sanitize_path(path: &str) -> String {
pub fn sanitize_path(path: &str) -> Result<String> {
// Enforce the length cap at the single chokepoint every create/update
// handler goes through, so clients can't blow up axum's JSON/multipart
// parser with a 1 MB `relative_path` before the handler ever runs.
// The WebSocket cursor handler enforces this separately.
ensure!(
path.len() <= MAX_RELATIVE_PATH_LEN,
"Relative path exceeds the maximum length of {MAX_RELATIVE_PATH_LEN} bytes"
);
let options = sanitize_filename::Options {
truncate: true,
windows: true, // Windows is the lowest common denominator
replacement: "",
};
path.split('/')
let result = path
.split('/')
.map(|part| {
let proposal = sanitize_filename::sanitize_with_options(part, options.clone());
if !part.is_empty() && proposal.is_empty() {
@ -18,7 +32,13 @@ pub fn sanitize_path(path: &str) -> String {
}
})
.collect::<Vec<_>>()
.join("/")
.join("/");
ensure!(
!result.is_empty(),
"Relative path is empty after sanitization"
);
Ok(result)
}
#[cfg(test)]
@ -27,8 +47,32 @@ mod test {
#[test]
fn test_sanitize_path() {
assert_eq!(sanitize_path("/my/path/what?"), "/my/path/what");
assert_eq!(sanitize_path("file (1).md"), "file (1).md");
assert_eq!(sanitize_path("/my/path/\\\\:?"), "/my/path/_");
assert_eq!(sanitize_path("/my/path/what?").unwrap(), "/my/path/what");
assert_eq!(sanitize_path("file (1).md").unwrap(), "file (1).md");
assert_eq!(sanitize_path("/my/path/\\\\:?").unwrap(), "/my/path/_");
}
#[test]
fn test_sanitize_path_empty() {
assert!(sanitize_path("").is_err());
}
#[test]
fn test_sanitize_path_idempotent_simple() {
let mut result = sanitize_path("notes/my file.md").unwrap();
for _ in 0..5 {
result = sanitize_path(&result).unwrap();
}
assert_eq!(result, "notes/my file.md");
}
#[test]
fn test_sanitize_path_idempotent_special_chars() {
let first = sanitize_path("/my/path/what?/file:name<>.md").unwrap();
let mut result = first.clone();
for _ in 0..5 {
result = sanitize_path(&result).unwrap();
}
assert_eq!(result, first);
}
}