From a9ce09b59d398b739cdc00ddf6c7d79ab357ff31 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Fri, 8 May 2026 21:35:18 +0100 Subject: [PATCH] split: server foundation (Cargo, config, errors, utils, main) Cargo.{toml,lock} bumps, build.rs, config-e2e.yml, rust-toolchain.toml, src/config/* (database/logging/server/user configs), src/consts.rs, src/errors.rs, src/main.rs, and src/utils/* (dedup_paths, find_first_available_path, rotating_file_writer, sanitize_path). --- sync-server/Cargo.lock | 193 ++++++++++++++++-- sync-server/Cargo.toml | 10 +- sync-server/build.rs | 13 +- sync-server/config-e2e.yml | 40 ++-- sync-server/rust-toolchain.toml | 2 +- sync-server/src/config.rs | 42 ++-- sync-server/src/config/database_config.rs | 19 ++ sync-server/src/config/logging_config.rs | 19 +- sync-server/src/config/server_config.rs | 75 ++++++- sync-server/src/config/user_config.rs | 20 +- sync-server/src/consts.rs | 22 +- sync-server/src/errors.rs | 45 +++- sync-server/src/main.rs | 28 ++- sync-server/src/utils/dedup_paths.rs | 16 +- .../src/utils/find_first_available_path.rs | 11 +- sync-server/src/utils/rotating_file_writer.rs | 23 ++- sync-server/src/utils/sanitize_path.rs | 56 ++++- 17 files changed, 535 insertions(+), 99 deletions(-) diff --git a/sync-server/Cargo.lock b/sync-server/Cargo.lock index b3da1486..82a7ce92 100644 --- a/sync-server/Cargo.lock +++ b/sync-server/Cargo.lock @@ -337,10 +337,11 @@ checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "cc" -version = "1.2.2" +version = "1.2.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" +checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" dependencies = [ + "find-msvc-tools", "shlex", ] @@ -456,6 +457,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -533,6 +543,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.10.7" @@ -624,6 +643,12 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + [[package]] name = "flume" version = "0.11.1" @@ -1272,6 +1297,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -1335,6 +1370,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" + [[package]] name = "num-integer" version = "0.1.46" @@ -1463,6 +1504,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -1582,12 +1629,12 @@ dependencies = [ [[package]] name = "reconcile-text" -version = "0.8.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "599cf9539996a2a19e501110404c59ba62f4974009f8fb864a8b7151c15ee5a5" +checksum = "52e0cf361887ea64c479ca871c1170dda761f84e122f2616b5579906a38d7557" dependencies = [ "serde", - "thiserror 2.0.17", + "thiserror 2.0.18", ] [[package]] @@ -1648,6 +1695,40 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rust-embed" +version = "8.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04113cb9355a377d83f06ef1f0a45b8ab8cd7d8b1288160717d66df5c7988d27" +dependencies = [ + "rust-embed-impl", + "rust-embed-utils", + "walkdir", +] + +[[package]] +name = "rust-embed-impl" +version = "8.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0902e4c7c8e997159ab384e6d0fc91c221375f6894346ae107f47dd0f3ccaa" +dependencies = [ + "proc-macro2", + "quote", + "rust-embed-utils", + "syn 2.0.90", + "walkdir", +] + +[[package]] +name = "rust-embed-utils" +version = "8.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bcdef0be6fe7f6fa333b1073c949729274b05f123a0ad7efcb8efd878e5c3b1" +dependencies = [ + "sha2", + "walkdir", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1679,6 +1760,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "sanitize-filename" version = "0.6.0" @@ -1916,7 +2006,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror 2.0.17", + "thiserror 2.0.18", "tokio", "tokio-stream", "tracing", @@ -2000,7 +2090,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.17", + "thiserror 2.0.18", "tracing", "uuid", "whoami", @@ -2039,7 +2129,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 2.0.17", + "thiserror 2.0.18", "tracing", "uuid", "whoami", @@ -2065,7 +2155,7 @@ dependencies = [ "serde", "serde_urlencoded", "sqlx-core", - "thiserror 2.0.17", + "thiserror 2.0.18", "tracing", "url", "uuid", @@ -2100,6 +2190,12 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "symlink" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" + [[package]] name = "syn" version = "1.0.109" @@ -2136,18 +2232,22 @@ dependencies = [ "futures", "humantime-serde", "log", + "mime_guess", "rand 0.9.0", "reconcile-text", "regex", + "rust-embed", "sanitize-filename", "serde", "serde_json", "serde_yaml", "sqlx", - "thiserror 2.0.17", + "subtle", + "thiserror 2.0.18", "tokio", "tower-http", "tracing", + "tracing-appender", "tracing-subscriber", "ts-rs", "uuid", @@ -2203,11 +2303,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.17", + "thiserror-impl 2.0.18", ] [[package]] @@ -2223,9 +2323,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", @@ -2242,6 +2342,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde_core", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + +[[package]] +name = "time-macros" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -2276,7 +2407,6 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -2376,6 +2506,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c" +dependencies = [ + "crossbeam-channel", + "symlink", + "thiserror 2.0.18", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.28" @@ -2434,7 +2577,7 @@ checksum = "e640d9b0964e9d39df633548591090ab92f7a4567bc31d3891af23471a3365c6" dependencies = [ "chrono", "lazy_static", - "thiserror 2.0.17", + "thiserror 2.0.18", "ts-rs-macros", "uuid", ] @@ -2481,6 +2624,12 @@ version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f720def6ce1ee2fc44d40ac9ed6d3a59c361c80a75a7aa8e75bb9baed31cf2ea" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-bidi" version = "0.3.17" @@ -2577,6 +2726,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/sync-server/Cargo.toml b/sync-server/Cargo.toml index fac06efa..6de17653 100644 --- a/sync-server/Cargo.toml +++ b/sync-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sync_server" -rust-version = "1.89.0" +rust-version = "1.94.0" authors = ["Andras Schmelczer "] edition = "2024" license = "MIT" @@ -10,7 +10,7 @@ version = "0.14.0" [dependencies] serde = { version = "1.0.219", default-features = false, features = ["derive"] } thiserror = { version = "2.0.12", default-features = false } -tokio = { version = "1.48.0", features = ["full"]} +tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "sync", "time", "net", "fs", "signal"]} uuid = { version = "1.16.0", features = ["v4", "serde"] } log = { version = "0.4.28" } anyhow = { version = "1.0.100", features = ["backtrace"] } @@ -20,6 +20,7 @@ axum_typed_multipart = "0.11.0" tower-http = { version = "0.6.1", features = ["cors", "trace", "limit", "timeout"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.20", features = ["fmt", "env-filter"]} +tracing-appender = "0.2.5" humantime-serde = "1.1.1" sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio", "uuid", "chrono"] } chrono = { version = "0.4.41", features = ["serde"] } @@ -33,7 +34,10 @@ serde_json = "1.0.140" bimap = "0.6.3" ts-rs = { version = "10.1", features = ["uuid-impl", "chrono-impl"] } base64 = "0.22.1" -reconcile-text = { version = "0.8.0", features = ["serde"] } +reconcile-text = { version = "0.11.0", features = ["serde"] } +rust-embed = "8.5" +mime_guess = "2.0" +subtle = "2.6.1" [profile.release] codegen-units = 1 diff --git a/sync-server/build.rs b/sync-server/build.rs index d5068697..53bd111b 100644 --- a/sync-server/build.rs +++ b/sync-server/build.rs @@ -1,5 +1,16 @@ -// generated by `sqlx migrate build-script` fn main() { // trigger recompilation when a new migration is added println!("cargo:rerun-if-changed=migrations"); + + // Ensure the history-ui dist directory exists so rust-embed can compile + // even when the frontend hasn't been built yet. + let dist_path = std::path::Path::new("../frontend/history-ui/dist"); + if !dist_path.exists() { + std::fs::create_dir_all(dist_path).expect("Failed to create history-ui dist directory"); + std::fs::write( + dist_path.join("index.html"), + "

Run npm run build -w history-ui first.

", + ) + .expect("Failed to write placeholder index.html"); + } } diff --git a/sync-server/config-e2e.yml b/sync-server/config-e2e.yml index 1f235b01..03b860b7 100644 --- a/sync-server/config-e2e.yml +++ b/sync-server/config-e2e.yml @@ -1,32 +1,34 @@ database: - databases_directory_path: databases - max_connections_per_vault: 12 + databases_directory_path: /host/tmp/vaultlink-e2e-databases + max_connections_per_vault: 8 cursor_timeout: 1m server: host: 0.0.0.0 - port: 3000 + port: 3010 max_body_size_mb: 512 max_clients_per_vault: 256 + max_pending_websocket_connections: 4096 + broadcast_channel_capacity: 1024 response_timeout: 30m mergeable_file_extensions: - - md - - txt + - md + - txt users: user_configs: - - name: admin - token: test-token-change-me - vault_access: - type: allow_access_to_all - - name: other-admin - token: test-token-change-me2 - vault_access: - type: allow_access_to_all - - name: test - token: other-test-token - vault_access: - type: allow_list - allowed: - - default + - name: admin + token: test-token-change-me + vault_access: + type: allow_access_to_all + - name: other-admin + token: test-token-change-me2 + vault_access: + type: allow_access_to_all + - name: test + token: other-test-token + vault_access: + type: allow_list + allowed: + - default logging: log_directory: logs log_rotation: 7days diff --git a/sync-server/rust-toolchain.toml b/sync-server/rust-toolchain.toml index 010956cc..567721ef 100644 --- a/sync-server/rust-toolchain.toml +++ b/sync-server/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.89.0" +channel = "1.94.0" targets = [ "x86_64-unknown-linux-gnu", "x86_64-unknown-linux-musl", diff --git a/sync-server/src/config.rs b/sync-server/src/config.rs index 6a003d2e..26b11a4c 100644 --- a/sync-server/src/config.rs +++ b/sync-server/src/config.rs @@ -27,24 +27,34 @@ pub struct Config { } impl Config { + pub fn validate(&self) -> Result<()> { + self.server + .validate() + .context("Invalid server configuration")?; + self.logging + .validate() + .context("Invalid logging configuration")?; + self.database + .validate() + .context("Invalid database configuration")?; + Ok(()) + } + pub async fn read_or_create(path: &Path) -> Result { - let config = if path.exists() { - info!( - "Loading configuration from `{}`", - path.canonicalize().unwrap().display() - ); - Self::load_from_file(path).await? + let display_path = path.canonicalize().unwrap_or_else(|_| path.to_path_buf()); + + if path.exists() { + info!("Loading configuration from `{}`", display_path.display()); + Self::load_from_file(path).await } else { - Self::default() - }; - - config.write(path).await?; - info!( - "Updated configuration at `{}`", - path.canonicalize().unwrap().display() - ); - - Ok(config) + let config = Self::default(); + config.write(path).await?; + info!( + "Created default configuration at `{}`", + display_path.display() + ); + Ok(config) + } } pub async fn load_from_file(path: &Path) -> Result { diff --git a/sync-server/src/config/database_config.rs b/sync-server/src/config/database_config.rs index 20a9a21e..a6f57e1f 100644 --- a/sync-server/src/config/database_config.rs +++ b/sync-server/src/config/database_config.rs @@ -1,5 +1,6 @@ use std::{path::PathBuf, time::Duration}; +use anyhow::{Result, ensure}; use log::debug; use serde::{Deserialize, Serialize}; @@ -34,6 +35,24 @@ fn default_cursor_timeout() -> Duration { DEFAULT_CURSOR_TIMEOUT } +impl DatabaseConfig { + pub fn validate(&self) -> Result<()> { + ensure!( + !self.databases_directory_path.as_os_str().is_empty(), + "databases_directory_path must not be empty" + ); + ensure!( + self.max_connections_per_vault > 0, + "max_connections_per_vault must be greater than 0" + ); + ensure!( + !self.cursor_timeout.is_zero(), + "cursor_timeout must be greater than 0" + ); + Ok(()) + } +} + impl Default for DatabaseConfig { fn default() -> Self { Self { diff --git a/sync-server/src/config/logging_config.rs b/sync-server/src/config/logging_config.rs index ad449d1a..dae67288 100644 --- a/sync-server/src/config/logging_config.rs +++ b/sync-server/src/config/logging_config.rs @@ -1,10 +1,13 @@ use std::time::Duration; +use anyhow::{Result, ensure}; use log::debug; use serde::{Deserialize, Serialize}; use crate::{ - consts::{DEFAULT_LOG_DIRECTORY, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_INTERVAL}, + consts::{ + DEFAULT_LOG_DIRECTORY, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_INTERVAL, DURATION_ZERO, + }, utils::log_level::LogLevel, }; @@ -20,6 +23,20 @@ pub struct LoggingConfig { pub log_level: LogLevel, } +impl LoggingConfig { + pub fn validate(&self) -> Result<()> { + ensure!( + !self.log_directory.is_empty(), + "log_directory must not be an empty string" + ); + ensure!( + self.log_rotation > DURATION_ZERO, + "log_rotation must be greater than 0" + ); + Ok(()) + } +} + impl Default for LoggingConfig { fn default() -> Self { Self { diff --git a/sync-server/src/config/server_config.rs b/sync-server/src/config/server_config.rs index 4a9da0f4..715d216c 100644 --- a/sync-server/src/config/server_config.rs +++ b/sync-server/src/config/server_config.rs @@ -1,10 +1,13 @@ +use anyhow::{Result, ensure}; use log::debug; use serde::{Deserialize, Serialize}; use std::time::Duration; use crate::consts::{ - DEFAULT_HOST, DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT, - DEFAULT_MERGEABLE_FILE_EXTENSIONS, DEFAULT_PORT, DEFAULT_RESPONSE_TIMEOUT_SECONDS, + DEFAULT_ALLOWED_ORIGINS, DEFAULT_BROADCAST_CHANNEL_CAPACITY, DEFAULT_HOST, + DEFAULT_MAX_BODY_SIZE_MB, DEFAULT_MAX_CLIENTS_PER_VAULT, DEFAULT_MAX_PENDING_WS_CONNECTIONS, + DEFAULT_MERGEABLE_FILE_EXTENSIONS, DEFAULT_PORT, DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND, + DEFAULT_RESPONSE_TIMEOUT_SECONDS, DURATION_ZERO, }; #[derive(Debug, Deserialize, Serialize, Clone, Default)] @@ -21,11 +24,56 @@ pub struct ServerConfig { #[serde(default = "default_max_clients_per_vault")] pub max_clients_per_vault: usize, + #[serde(default = "default_broadcast_channel_capacity")] + pub broadcast_channel_capacity: usize, + #[serde(default = "default_response_timeout", with = "humantime_serde")] pub response_timeout: Duration, #[serde(default = "default_mergeable_file_extensions")] pub mergeable_file_extensions: Vec, + + /// Per-user maximum requests per second (keyed by bearer token). + /// `None` disables rate limiting. + #[serde(default = "default_rate_limit_per_user_per_second")] + pub rate_limit_per_user_per_second: Option, + + /// Allowed CORS origins. Default: `["*"]` (allow all). + #[serde(default = "default_allowed_origins")] + pub allowed_origins: Vec, + + /// Maximum concurrent unauthenticated WebSocket connections waiting for + /// handshake. Limits resource consumption from clients that connect but + /// never authenticate. + #[serde(default = "default_max_pending_websocket_connections")] + pub max_pending_websocket_connections: usize, +} + +impl ServerConfig { + pub fn validate(&self) -> Result<()> { + ensure!( + self.response_timeout > DURATION_ZERO, + "response_timeout must be greater than 0" + ); + ensure!( + self.max_body_size_mb > 0, + "max_body_size_mb must be greater than 0" + ); + ensure!( + self.max_clients_per_vault > 0, + "max_clients_per_vault must be greater than 0" + ); + ensure!( + self.broadcast_channel_capacity > 0, + "broadcast_channel_capacity must be greater than 0" + ); + ensure!( + self.max_pending_websocket_connections > 0, + "max_pending_websocket_connections must be greater than 0" + ); + + Ok(()) + } } fn default_host() -> String { @@ -48,6 +96,11 @@ fn default_max_clients_per_vault() -> usize { DEFAULT_MAX_CLIENTS_PER_VAULT } +fn default_broadcast_channel_capacity() -> usize { + debug!("Using default broadcast channel capacity: {DEFAULT_BROADCAST_CHANNEL_CAPACITY}"); + DEFAULT_BROADCAST_CHANNEL_CAPACITY +} + fn default_response_timeout() -> Duration { debug!("Using default response timeout: {DEFAULT_RESPONSE_TIMEOUT_SECONDS:?}"); DEFAULT_RESPONSE_TIMEOUT_SECONDS @@ -60,3 +113,21 @@ fn default_mergeable_file_extensions() -> Vec { .map(|s| (*s).to_owned()) .collect() } + +fn default_rate_limit_per_user_per_second() -> Option { + debug!("Using default rate limit per second: {DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND:?}"); + DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND +} + +fn default_allowed_origins() -> Vec { + debug!("Using default allowed origins: {DEFAULT_ALLOWED_ORIGINS:?}"); + DEFAULT_ALLOWED_ORIGINS + .iter() + .map(|s| (*s).to_owned()) + .collect() +} + +fn default_max_pending_websocket_connections() -> usize { + debug!("Using default max pending WebSocket connections: {DEFAULT_MAX_PENDING_WS_CONNECTIONS}"); + DEFAULT_MAX_PENDING_WS_CONNECTIONS +} diff --git a/sync-server/src/config/user_config.rs b/sync-server/src/config/user_config.rs index 8b2537f0..fd824f39 100644 --- a/sync-server/src/config/user_config.rs +++ b/sync-server/src/config/user_config.rs @@ -1,6 +1,7 @@ use bimap::BiHashMap; use rand::{Rng, distr::Alphanumeric, rng}; use serde::{Deserialize, Deserializer, Serialize, de::Error}; +use subtle::ConstantTimeEq; use crate::app_state::database::models::VaultId; @@ -19,10 +20,19 @@ where let mut user_token_map = BiHashMap::new(); for user in &users { if let Some(existing_name) = user_token_map.get_by_right(&user.token) { + let redacted = if user.token.len() > 6 { + format!( + "{}...{}", + &user.token[..3], + &user.token[user.token.len() - 3..] + ) + } else { + "***".to_owned() + }; return Err(D::Error::custom(format!( - "Duplicate user token found: `{}` for users `{}` and `{}`. User tokens must be \ - unique.", - user.token, existing_name, user.name + "Duplicate user token found: `{redacted}` for users `{}` and `{}`. User tokens \ + must be unique.", + existing_name, user.name ))); } @@ -41,7 +51,9 @@ where impl UserConfig { pub fn get_user(&self, token: &str) -> Option<&User> { - self.user_configs.iter().find(|u| u.token == token) + self.user_configs + .iter() + .find(|u| u.token.as_bytes().ct_eq(token.as_bytes()).into()) } } diff --git a/sync-server/src/consts.rs b/sync-server/src/consts.rs index 98ed1c1f..e03b848f 100644 --- a/sync-server/src/consts.rs +++ b/sync-server/src/consts.rs @@ -2,22 +2,36 @@ use std::time::Duration; use crate::utils::log_level::LogLevel; +pub const DURATION_ZERO: Duration = Duration::from_secs(0); + pub const DEFAULT_CONFIG_PATH: &str = "config.yml"; pub const DEFAULT_DATABASES_DIRECTORY_PATH: &str = "databases"; -pub const DEFAULT_MAX_CONNECTIONS_PER_VAULT: u32 = 12; +pub const DEFAULT_MAX_CONNECTIONS_PER_VAULT: u32 = 6; pub const DEFAULT_CURSOR_TIMEOUT: Duration = Duration::from_secs(60); pub const DEFAULT_HOST: &str = "127.0.0.1"; pub const DEFAULT_PORT: u16 = 3000; pub const DEFAULT_MAX_BODY_SIZE_MB: usize = 4096; -pub const DEFAULT_RESPONSE_TIMEOUT_SECONDS: Duration = Duration::from_secs(1800); +pub const DEFAULT_RESPONSE_TIMEOUT_SECONDS: Duration = Duration::from_mins(30); pub const DEFAULT_MAX_CLIENTS_PER_VAULT: usize = 256; +pub const DEFAULT_BROADCAST_CHANNEL_CAPACITY: usize = 4096; +pub const DEFAULT_MAX_PENDING_WS_CONNECTIONS: usize = 128; pub const DEFAULT_LOG_DIRECTORY: &str = "logs"; -pub const DEFAULT_LOG_ROTATION_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24); // 1 day +pub const DEFAULT_LOG_ROTATION_INTERVAL: Duration = Duration::from_hours(24); +pub const IDLE_POOL_TIMEOUT: Duration = Duration::from_mins(5); +pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); +pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); + +pub const MAX_CURSOR_DOCUMENTS: usize = 1000; +pub const MAX_CURSORS_PER_DOCUMENT: usize = 100; +pub const MAX_RELATIVE_PATH_LEN: usize = 4096; + pub const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Info; pub const DEFAULT_MERGEABLE_FILE_EXTENSIONS: &[&str] = &["md", "txt"]; -pub const SUPPORTED_API_VERSION: u32 = 2; +pub const DEFAULT_RATE_LIMIT_PER_USER_PER_SECOND: Option = None; +pub const DEFAULT_ALLOWED_ORIGINS: &[&str] = &["*"]; +pub const SUPPORTED_API_VERSION: u32 = 3; diff --git a/sync-server/src/errors.rs b/sync-server/src/errors.rs index 831b0e86..892db36f 100644 --- a/sync-server/src/errors.rs +++ b/sync-server/src/errors.rs @@ -5,7 +5,7 @@ use axum::{ http::StatusCode, response::{IntoResponse, Response}, }; -use log::{debug, error}; +use log::{debug, error, warn}; use serde::Serialize; use thiserror::Error; use ts_rs::TS; @@ -29,6 +29,9 @@ pub enum SyncServerError { #[error("Permission denied error: {0}")] PermissionDeniedError(#[source] anyhow::Error), + + #[error("Too many requests: {0}")] + TooManyRequests(#[source] anyhow::Error), } impl SyncServerError { @@ -39,7 +42,8 @@ impl SyncServerError { | Self::ServerError(error) | Self::NotFound(error) | Self::Unauthenticated(error) - | Self::PermissionDeniedError(error) => error.into(), + | Self::PermissionDeniedError(error) + | Self::TooManyRequests(error) => error.into(), } } } @@ -69,7 +73,22 @@ impl Display for SerializedError { impl IntoResponse for SyncServerError { fn into_response(self) -> Response { - let body = Json(self.serialize()); + let serialized = self.serialize(); + + match &self { + Self::InitError(_) | Self::ServerError(_) => { + error!("{serialized}"); + } + Self::ClientError(_) | Self::NotFound(_) => { + warn!("{serialized}"); + } + Self::TooManyRequests(_) => { + warn!("{serialized}"); + } + Self::Unauthenticated(_) | Self::PermissionDeniedError(_) => {} + } + + let body = Json(serialized); match self { Self::InitError(_) | Self::ServerError(_) => { @@ -79,6 +98,7 @@ impl IntoResponse for SyncServerError { Self::NotFound(_) => (StatusCode::NOT_FOUND, body).into_response(), Self::Unauthenticated(_) => (StatusCode::UNAUTHORIZED, body).into_response(), Self::PermissionDeniedError(_) => (StatusCode::FORBIDDEN, body).into_response(), + Self::TooManyRequests(_) => (StatusCode::TOO_MANY_REQUESTS, body).into_response(), } } } @@ -102,6 +122,7 @@ impl From<&anyhow::Error> for SerializedError { SyncServerError::NotFound(_) => "NotFound", SyncServerError::Unauthenticated(_) => "Unauthenticated", SyncServerError::PermissionDeniedError(_) => "PermissionDeniedError", + SyncServerError::TooManyRequests(_) => "TooManyRequests", }, ), message: error.to_string(), @@ -139,3 +160,21 @@ pub fn permission_denied_error(error: anyhow::Error) -> SyncServerError { debug!("Permission denied: {error:?}"); SyncServerError::PermissionDeniedError(error) } + +pub fn too_many_requests_error(error: anyhow::Error) -> SyncServerError { + debug!("Too many requests: {error:?}"); + SyncServerError::TooManyRequests(error) +} + +/// Maps a `create_write_transaction` error to 429 if the database is busy, +/// or 500 for all other failures. +pub fn write_transaction_error(error: anyhow::Error) -> SyncServerError { + if error + .downcast_ref::() + .is_some() + { + too_many_requests_error(error) + } else { + server_error(error) + } +} diff --git a/sync-server/src/main.rs b/sync-server/src/main.rs index 1285ed7b..dc00d4d5 100644 --- a/sync-server/src/main.rs +++ b/sync-server/src/main.rs @@ -16,6 +16,7 @@ use consts::DEFAULT_CONFIG_PATH; use errors::{SyncServerError, init_error}; use log::info; use server::create_server; +use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{EnvFilter, fmt::format, layer::SubscriberExt, util::SubscriberInitExt}; use utils::rotating_file_writer::RotatingFileWriter; @@ -41,11 +42,14 @@ async fn main() -> ExitCode { } }; - let mut result = set_up_logging(&args, &config.logging); - - if result.is_ok() { - result = start_server(config).await; + let result = async { + config.validate().map_err(init_error)?; + // Hold the non-blocking writer guards until shutdown so the + // dedicated writer threads stay alive and flush queued log lines. + let _log_guards = set_up_logging(&args, &config.logging)?; + start_server(config).await } + .await; match result { Ok(()) => ExitCode::SUCCESS, @@ -59,7 +63,7 @@ async fn main() -> ExitCode { fn set_up_logging( args: &Args, logging_config: &config::logging_config::LoggingConfig, -) -> Result<(), SyncServerError> { +) -> Result<[WorkerGuard; 2], SyncServerError> { let level_filter = logging_config.log_level.as_tracing_level(); let env_filter = EnvFilter::builder() @@ -80,6 +84,14 @@ fn set_up_logging( .context("Failed to create rotating file writer") .map_err(init_error)?; + // Decouple log emission from disk/stderr I/O. Without this, a tokio + // worker that holds the writer's std::sync::Mutex while a `write(2)` + // is throttled by the kernel (e.g. btrfs writeback) cascades the + // stall to every other worker that tries to log, freezing the whole + // runtime. The guards must outlive every emitter. + let (file_writer, file_guard) = tracing_appender::non_blocking(file_appender); + let (stderr_writer, stderr_guard) = tracing_appender::non_blocking(std::io::stderr()); + let format = format() .with_target(is_debug_mode) .with_line_number(is_debug_mode) @@ -87,12 +99,12 @@ fn set_up_logging( let stderr_layer = tracing_subscriber::fmt::layer() .with_ansi(use_colors) - .with_writer(std::io::stderr) + .with_writer(stderr_writer) .event_format(format.clone()); let file_layer = tracing_subscriber::fmt::layer() .with_ansi(false) - .with_writer(file_appender) + .with_writer(file_writer) .event_format(format); tracing_subscriber::registry() @@ -103,7 +115,7 @@ fn set_up_logging( .context("Failed to initialise tracing") .map_err(init_error)?; - Ok(()) + Ok([file_guard, stderr_guard]) } async fn start_server(config: Config) -> Result<(), SyncServerError> { diff --git a/sync-server/src/utils/dedup_paths.rs b/sync-server/src/utils/dedup_paths.rs index bc687f6a..0baf8ba8 100644 --- a/sync-server/src/utils/dedup_paths.rs +++ b/sync-server/src/utils/dedup_paths.rs @@ -1,8 +1,17 @@ +use std::sync::LazyLock; + use regex::Regex; +static DEDUP_SUFFIX_REGEX: LazyLock = + LazyLock::new(|| Regex::new(r" \((\d+)\)$").expect("invalid regex")); + pub fn dedup_paths(path: &str) -> impl Iterator { let mut path_parts = path.split('/').collect::>(); - let file_name = path_parts.pop().unwrap().to_owned(); + let file_name = path_parts + .pop() + .filter(|s| !s.is_empty()) + .unwrap_or(path) + .to_owned(); let mut directory = path_parts.join("/"); if !directory.is_empty() { @@ -29,14 +38,13 @@ pub fn dedup_paths(path: &str) -> impl Iterator { } }; - let regex = Regex::new(r" \((\d+)\)$").unwrap(); - let start_number = regex + let start_number = DEDUP_SUFFIX_REGEX .captures(&stem) .and_then(|caps| caps.get(1)) .and_then(|m| m.as_str().parse::().ok()) .unwrap_or(0); - let clean_stem = regex.replace(&stem, "").to_string(); + let clean_stem = DEDUP_SUFFIX_REGEX.replace(&stem, "").to_string(); (start_number..).map(move |dedup_number| { if dedup_number == 0 { diff --git a/sync-server/src/utils/find_first_available_path.rs b/sync-server/src/utils/find_first_available_path.rs index 7629d8f1..eddd81d2 100644 --- a/sync-server/src/utils/find_first_available_path.rs +++ b/sync-server/src/utils/find_first_available_path.rs @@ -1,25 +1,30 @@ use crate::app_state::database::models::VaultId; -use crate::{app_state::database::Transaction, utils::dedup_paths::dedup_paths}; +use crate::utils::dedup_paths::dedup_paths; use anyhow::Result; use log::{debug, info}; +use sqlx::sqlite::SqliteConnection; pub async fn find_first_available_path( vault_id: &VaultId, sanitized_relative_path: &str, database: &crate::app_state::database::Database, - transaction: &mut Transaction<'_>, + connection: &mut SqliteConnection, ) -> Result { info!("Finding first available path for `{sanitized_relative_path}` in vault `{vault_id}`"); for candidate in dedup_paths(sanitized_relative_path) { debug!("Checking candidate path for deconflicting names: `{candidate}`"); if database - .get_latest_document_by_path(vault_id, &candidate, Some(transaction)) + .get_latest_non_deleted_document_by_path(vault_id, &candidate, Some(connection)) .await? .is_none() { info!("Selected available path: `{candidate}`"); return Ok(candidate); } + + info!( + "Finding first available path for `{sanitized_relative_path}` in vault `{vault_id}` as `{candidate}` is already taken" + ); } unreachable!("dedup_paths produces infinite paths"); diff --git a/sync-server/src/utils/rotating_file_writer.rs b/sync-server/src/utils/rotating_file_writer.rs index f04f9ba9..1c5c86c5 100644 --- a/sync-server/src/utils/rotating_file_writer.rs +++ b/sync-server/src/utils/rotating_file_writer.rs @@ -6,7 +6,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use chrono::{Local, NaiveDateTime}; +use chrono::NaiveDateTime; use tracing_subscriber::fmt::MakeWriter; #[derive(Clone)] @@ -55,7 +55,7 @@ impl RotatingFileWriter { let timestamp_str = filename.get(prefix_len..filename.len().checked_sub(4)?)?; let dt = NaiveDateTime::parse_from_str(timestamp_str, "%Y-%m-%d_%H-%M-%S").ok()?; - let timestamp = dt.and_local_timezone(Local).single()?; + let timestamp = dt.and_utc(); let secs: u64 = timestamp.timestamp().try_into().ok()?; Some(UNIX_EPOCH + Duration::from_secs(secs)) @@ -114,7 +114,7 @@ impl RotatingFileWriter { } fn rotate(inner: &mut RotatingFileWriterInner) -> io::Result<()> { - let timestamp = Local::now().format("%Y-%m-%d_%H-%M-%S"); + let timestamp = chrono::Utc::now().format("%Y-%m-%d_%H-%M-%S"); let filename = format!("{}.{}.log", inner.file_prefix, timestamp); let filepath = inner.directory.join(filename); @@ -132,8 +132,14 @@ impl RotatingFileWriter { impl Write for RotatingFileWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock().unwrap_or_else(|poisoned| { + eprintln!("RotatingFileWriter mutex was poisoned, recovering"); + poisoned.into_inner() + }); + // Reset file handle after poison recovery so the next branch + // re-opens a valid file rather than writing to a potentially + // half-closed handle. if inner.current_file.is_none() { Self::open_or_create_log_file(&mut inner)?; } else if Self::should_rotate(&inner) { @@ -148,7 +154,10 @@ impl Write for RotatingFileWriter { } fn flush(&mut self) -> io::Result<()> { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock().unwrap_or_else(|poisoned| { + eprintln!("RotatingFileWriter mutex was poisoned, recovering"); + poisoned.into_inner() + }); if let Some(ref mut file) = inner.current_file { file.flush() } else { @@ -267,7 +276,7 @@ mod tests { // Parse the expected time let expected_dt = NaiveDateTime::parse_from_str(timestamp_str, "%Y-%m-%d_%H-%M-%S").unwrap(); - let expected_timestamp = expected_dt.and_local_timezone(Local).single().unwrap(); + let expected_timestamp = expected_dt.and_utc(); let expected_duration = Duration::from_secs(expected_timestamp.timestamp().try_into().unwrap()); let expected_next = UNIX_EPOCH + expected_duration + rotation_duration; @@ -306,7 +315,7 @@ mod tests { // Should use the latest file (2025-10-26_14-00-00) let expected_dt = NaiveDateTime::parse_from_str("2025-10-26_14-00-00", "%Y-%m-%d_%H-%M-%S").unwrap(); - let expected_timestamp = expected_dt.and_local_timezone(Local).single().unwrap(); + let expected_timestamp = expected_dt.and_utc(); let expected_duration = Duration::from_secs(expected_timestamp.timestamp().try_into().unwrap()); let expected_next = UNIX_EPOCH + expected_duration + rotation_duration; diff --git a/sync-server/src/utils/sanitize_path.rs b/sync-server/src/utils/sanitize_path.rs index 9703225c..05100f68 100644 --- a/sync-server/src/utils/sanitize_path.rs +++ b/sync-server/src/utils/sanitize_path.rs @@ -1,14 +1,28 @@ +use anyhow::{Result, ensure}; + +use crate::consts::MAX_RELATIVE_PATH_LEN; + /// Sanitize the document's path to allow all clients to create the same path in /// their filesystem. If we didn't do this server-side, client's would need to /// deal with mapping invalid names to valid ones and then back. -pub fn sanitize_path(path: &str) -> String { +pub fn sanitize_path(path: &str) -> Result { + // Enforce the length cap at the single chokepoint every create/update + // handler goes through, so clients can't blow up axum's JSON/multipart + // parser with a 1 MB `relative_path` before the handler ever runs. + // The WebSocket cursor handler enforces this separately. + ensure!( + path.len() <= MAX_RELATIVE_PATH_LEN, + "Relative path exceeds the maximum length of {MAX_RELATIVE_PATH_LEN} bytes" + ); + let options = sanitize_filename::Options { truncate: true, windows: true, // Windows is the lowest common denominator replacement: "", }; - path.split('/') + let result = path + .split('/') .map(|part| { let proposal = sanitize_filename::sanitize_with_options(part, options.clone()); if !part.is_empty() && proposal.is_empty() { @@ -18,7 +32,13 @@ pub fn sanitize_path(path: &str) -> String { } }) .collect::>() - .join("/") + .join("/"); + + ensure!( + !result.is_empty(), + "Relative path is empty after sanitization" + ); + Ok(result) } #[cfg(test)] @@ -27,8 +47,32 @@ mod test { #[test] fn test_sanitize_path() { - assert_eq!(sanitize_path("/my/path/what?"), "/my/path/what"); - assert_eq!(sanitize_path("file (1).md"), "file (1).md"); - assert_eq!(sanitize_path("/my/path/\\\\:?"), "/my/path/_"); + assert_eq!(sanitize_path("/my/path/what?").unwrap(), "/my/path/what"); + assert_eq!(sanitize_path("file (1).md").unwrap(), "file (1).md"); + assert_eq!(sanitize_path("/my/path/\\\\:?").unwrap(), "/my/path/_"); + } + + #[test] + fn test_sanitize_path_empty() { + assert!(sanitize_path("").is_err()); + } + + #[test] + fn test_sanitize_path_idempotent_simple() { + let mut result = sanitize_path("notes/my file.md").unwrap(); + for _ in 0..5 { + result = sanitize_path(&result).unwrap(); + } + assert_eq!(result, "notes/my file.md"); + } + + #[test] + fn test_sanitize_path_idempotent_special_chars() { + let first = sanitize_path("/my/path/what?/file:name<>.md").unwrap(); + let mut result = first.clone(); + for _ in 0..5 { + result = sanitize_path(&result).unwrap(); + } + assert_eq!(result, first); } }