Fix lints & format

This commit is contained in:
Andras Schmelczer 2026-05-09 15:28:43 +01:00
parent 6d40097bcd
commit 792f57dc7e
36 changed files with 342 additions and 1687 deletions

View file

@ -118,7 +118,7 @@ impl Cursors {
};
self.broadcasts.send_document_update(
vault_id.clone(),
vault_id,
WebSocketServerMessageWithOrigin::new(WebSocketServerMessage::CursorPositions(
CursorPositionFromServer {
clients: client_cursors,

View file

@ -34,6 +34,10 @@ use super::websocket::{
use crate::config::database_config::DatabaseConfig;
use crate::consts::IDLE_POOL_TIMEOUT;
fn duration_millis_u64(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
/// Holds separate reader and writer pools for a single vault.
/// The writer pool has exactly 1 connection so writes never compete
/// with reads for pool slots.
@ -182,7 +186,7 @@ fn rollback_before_acquire(
impl Database {
fn now_ms(&self) -> u64 {
self.epoch.elapsed().as_millis() as u64
duration_millis_u64(self.epoch.elapsed())
}
pub async fn try_new(
@ -817,8 +821,7 @@ impl Database {
} else {
WebSocketServerMessageWithOrigin::with_origin(version.device_id.clone(), envelope)
};
self.broadcasts
.send_document_update(vault_id.clone(), with_origin);
self.broadcasts.send_document_update(vault_id, with_origin);
Ok(())
}
@ -831,7 +834,7 @@ impl Database {
let idle_pools: Vec<(VaultId, Arc<VaultPool>)> = {
let mut pools = self.connection_pools.lock().await;
let now_ms = self.now_ms();
let idle_threshold_ms = IDLE_POOL_TIMEOUT.as_millis() as u64;
let idle_threshold_ms = duration_millis_u64(IDLE_POOL_TIMEOUT);
let vaults_to_remove: Vec<VaultId> = pools
.iter()

View file

@ -83,7 +83,6 @@ pub struct DocumentVersion {
pub device_id: DeviceId,
}
impl From<StoredDocumentVersion> for DocumentVersion {
fn from(value: StoredDocumentVersion) -> Self {
Self {

View file

@ -10,7 +10,7 @@ use crate::{
},
config::user_config::User,
errors::{SyncServerError, client_error, server_error, unauthenticated_error},
server::auth::auth,
server::auth::authenticate_for_vault,
};
pub struct AuthenticatedWebSocketHandshake {
@ -30,7 +30,7 @@ pub fn get_authenticated_handshake(
match message {
WebSocketClientMessage::Handshake(handshake) => {
let user = auth(state, handshake.token.trim(), vault_id)?;
let user = authenticate_for_vault(state, handshake.token.trim(), vault_id)?;
Ok(AuthenticatedWebSocketHandshake { handshake, user })
}
WebSocketClientMessage::CursorPositions(_) => Err(unauthenticated_error(

View file

@ -79,10 +79,7 @@ impl IntoResponse for SyncServerError {
Self::InitError(_) | Self::ServerError(_) => {
error!("{serialized}");
}
Self::ClientError(_) | Self::NotFound(_) => {
warn!("{serialized}");
}
Self::TooManyRequests(_) => {
Self::ClientError(_) | Self::NotFound(_) | Self::TooManyRequests(_) => {
warn!("{serialized}");
}
Self::Unauthenticated(_) | Self::PermissionDeniedError(_) => {}

View file

@ -14,7 +14,7 @@ use cli::args::Args;
use config::Config;
use consts::DEFAULT_CONFIG_PATH;
use errors::{SyncServerError, init_error};
use log::info;
use log::{error, info, warn};
use server::create_server;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{EnvFilter, fmt::format, layer::SubscriberExt, util::SubscriberInitExt};
@ -36,30 +36,63 @@ async fn main() -> ExitCode {
.map_err(init_error)
{
Ok(config) => config,
Err(e) => {
eprintln!("{}", e.serialize());
return ExitCode::FAILURE;
Err(error) => {
return exit_with_startup_error(&args, &error);
}
};
let result = async {
config.validate().map_err(init_error)?;
// Hold the non-blocking writer guards until shutdown so the
// dedicated writer threads stay alive and flush queued log lines.
let _log_guards = set_up_logging(&args, &config.logging)?;
start_server(config).await
if let Err(error) = config.validate().map_err(init_error) {
return exit_with_startup_error(&args, &error);
}
.await;
match result {
// Hold the non-blocking writer guards until shutdown so the dedicated
// writer threads stay alive and flush queued log lines.
let _log_guards = match set_up_logging(&args, &config.logging) {
Ok(log_guards) => log_guards,
Err(error) => {
return exit_with_startup_error(&args, &error);
}
};
match start_server(config).await {
Ok(()) => ExitCode::SUCCESS,
Err(e) => {
eprintln!("{}", e.serialize());
Err(error) => {
let serialized = error.serialize();
warn!("{serialized}");
ExitCode::FAILURE
}
}
}
fn exit_with_startup_error(args: &Args, err: &SyncServerError) -> ExitCode {
let _ = set_up_stderr_logging(args);
let serialized = err.serialize();
error!("{serialized}");
ExitCode::FAILURE
}
fn set_up_stderr_logging(args: &Args) -> Result<(), SyncServerError> {
let env_filter = EnvFilter::builder()
.with_default_directive(tracing::Level::WARN.into())
.from_env()
.context("Failed to create logging env filter")
.map_err(init_error)?;
let stderr_layer = tracing_subscriber::fmt::layer()
.with_ansi(args.color.use_colors())
.with_writer(std::io::stderr)
.event_format(format().compact());
tracing_subscriber::registry()
.with(env_filter)
.with(stderr_layer)
.try_init()
.context("Failed to initialise fallback tracing")
.map_err(init_error)
}
fn set_up_logging(
args: &Args,
logging_config: &config::logging_config::LoggingConfig,

View file

@ -4,8 +4,6 @@ mod delete_document;
mod device_id_header;
mod fetch_document_version;
mod fetch_document_version_content;
mod fetch_latest_document_version;
mod fetch_latest_documents;
mod index;
mod ping;
mod rate_limit;
@ -14,13 +12,14 @@ mod responses;
mod update_document;
mod websocket;
use anyhow::{Context as _, Result};
use anyhow::{Context as _, Result, anyhow};
use auth::auth_middleware;
use axum::{
Router,
extract::{DefaultBodyLimit, Request},
http::{self, HeaderValue, Method},
middleware,
response::IntoResponse,
routing::{IntoMakeService, delete, get, post, put},
};
use device_id_header::DEVICE_ID_HEADER_NAME;
@ -42,6 +41,7 @@ use crate::{
app_state::AppState,
config::{Config, server_config::ServerConfig},
consts::GRACEFUL_SHUTDOWN_TIMEOUT,
errors::not_found_error,
};
pub async fn create_server(config: Config) -> Result<()> {
@ -95,6 +95,7 @@ pub async fn create_server(config: Config) -> Result<()> {
.on_failure(DefaultOnFailure::new().level(Level::ERROR)),
)
.with_state(app_state.clone())
.fallback(handle_404)
.into_make_service();
start_server(app, &server_config, app_state).await
@ -131,18 +132,10 @@ fn build_cors_layer(server_config: &ServerConfig) -> Result<CorsLayer> {
fn get_authed_routes(app_state: AppState) -> Router<AppState> {
Router::new()
.route(
"/vaults/:vault_id/documents",
get(fetch_latest_documents::fetch_latest_documents),
)
.route(
"/vaults/:vault_id/documents",
post(create_document::create_document),
)
.route(
"/vaults/:vault_id/documents/:document_id",
get(fetch_latest_document_version::fetch_latest_document_version),
)
.route(
"/vaults/:vault_id/documents/:document_id/binary",
put(update_document::update_binary),
@ -233,3 +226,7 @@ async fn shutdown_signal() {
() = terminate => {},
}
}
async fn handle_404() -> impl IntoResponse {
not_found_error(anyhow!("Endpoint not found"))
}

View file

@ -34,7 +34,7 @@ pub async fn auth_middleware(
.ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Missing vault_id")))?,
);
let user = auth(&state, token, &vault_id)?;
let user = authenticate_for_vault(&state, token, &vault_id)?;
req.extensions_mut().insert(user);
@ -50,7 +50,11 @@ pub fn authenticate(state: &AppState, token: &str) -> Result<User, SyncServerErr
.ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Invalid token")))
}
pub fn auth(state: &AppState, token: &str, vault_id: &VaultId) -> Result<User, SyncServerError> {
pub fn authenticate_for_vault(
state: &AppState,
token: &str,
vault_id: &VaultId,
) -> Result<User, SyncServerError> {
let user = authenticate(state, token)?;
if match user.vault_access {

View file

@ -136,9 +136,7 @@ pub async fn create_document(
{
info!(
"Lost-create recovery: binding retry at `{sanitized_relative_path}` to existing doc {} (was at `{}`) in vault `{vault_id}` for device `{}`",
lost_create.document_id,
lost_create.relative_path,
device_id.0
lost_create.document_id, lost_create.relative_path, device_id.0
);
return update_document::update_document(
&sanitized_relative_path,

View file

@ -136,8 +136,7 @@ async fn websocket(
// catch-up and in a contended-then-released broadcast is
// delivered exactly once (via the catch-up).
let send_guard = state.broadcasts.acquire_send_lock(&vault_id).await;
let mut broadcast_receiver = match state.broadcasts.get_receiver(vault_id.clone(), max_clients)
{
let mut broadcast_receiver = match state.broadcasts.get_receiver(&vault_id, max_clients) {
Ok(receiver) => receiver,
Err(err) => {
drop(send_guard);