Push down db error returning

This commit is contained in:
Andras Schmelczer 2026-05-11 20:25:37 +01:00
parent ce995cdc33
commit 2d69d4b26d
11 changed files with 172 additions and 129 deletions

View file

@ -5,13 +5,15 @@ use std::{
sync::atomic::{AtomicU64, Ordering}, sync::atomic::{AtomicU64, Ordering},
}; };
use anyhow::{Context as _, Result, anyhow}; use anyhow::{Context as _, Result};
use log::info; use log::info;
use models::{ use models::{
DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId, DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId,
}; };
use sqlx::{ConnectOptions, Connection, sqlite::SqliteConnectOptions, types::chrono::Utc}; use sqlx::{ConnectOptions, Connection, sqlite::SqliteConnectOptions, types::chrono::Utc};
use crate::errors::{SyncServerError, database_error, server_error};
pub mod models; pub mod models;
/// Sentinel error indicating the `SQLite` database is busy (`SQLITE_BUSY`). /// Sentinel error indicating the `SQLite` database is busy (`SQLITE_BUSY`).
@ -20,6 +22,24 @@ pub mod models;
#[error("Database is busy")] #[error("Database is busy")]
pub struct WriteBusyError; pub struct WriteBusyError;
/// Detects whether a `sqlx::Error` indicates the database is currently
/// unavailable for a retryable reason: a `SQLITE_BUSY` from the engine, or
/// a `PoolTimedOut` from our short acquire timeout. Both should surface as
/// 429 so the client retries instead of treating it as a server fault.
pub fn is_sqlite_busy_error(err: &sqlx::Error) -> bool {
match err {
sqlx::Error::Database(db_err) => {
// SQLITE_BUSY base code is 5. Extended codes share base 5.
let busy_by_code = db_err
.code()
.is_some_and(|c| c.parse::<u32>().is_ok_and(|n| n & 0xFF == 5));
busy_by_code || db_err.message().contains("database is locked")
}
sqlx::Error::PoolTimedOut => true,
_ => false,
}
}
use sqlx::{ use sqlx::{
Pool, Sqlite, pool::PoolConnection, sqlite::SqliteConnection, sqlite::SqlitePoolOptions, Pool, Sqlite, pool::PoolConnection, sqlite::SqliteConnection, sqlite::SqlitePoolOptions,
}; };
@ -32,7 +52,7 @@ use super::websocket::{
models::{WebSocketServerMessage, WebSocketVaultUpdate}, models::{WebSocketServerMessage, WebSocketVaultUpdate},
}; };
use crate::config::database_config::DatabaseConfig; use crate::config::database_config::DatabaseConfig;
use crate::consts::IDLE_POOL_TIMEOUT; use crate::consts::{IDLE_POOL_TIMEOUT, POOL_ACQUIRE_TIMEOUT};
fn duration_millis_u64(duration: Duration) -> u64 { fn duration_millis_u64(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
@ -87,22 +107,16 @@ impl WriteTransaction {
pool: &Pool<Sqlite>, pool: &Pool<Sqlite>,
write_guard: tokio::sync::OwnedMutexGuard<()>, write_guard: tokio::sync::OwnedMutexGuard<()>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = pool let mut conn = match pool.acquire().await {
.acquire() Ok(conn) => conn,
.await Err(e) if is_sqlite_busy_error(&e) => return Err(WriteBusyError.into()),
.context("Cannot acquire connection for write transaction")?; Err(e) => {
return Err(anyhow::Error::from(e)
.context("Cannot acquire connection for write transaction"));
}
};
if let Err(e) = sqlx::query("BEGIN IMMEDIATE").execute(&mut *conn).await { if let Err(e) = sqlx::query("BEGIN IMMEDIATE").execute(&mut *conn).await {
let is_busy = match &e { if is_sqlite_busy_error(&e) {
sqlx::Error::Database(db_err) => {
// SQLITE_BUSY base code is 5. Extended codes share base 5.
let busy_by_code = db_err
.code()
.is_some_and(|c| c.parse::<u32>().is_ok_and(|n| n & 0xFF == 5));
busy_by_code || db_err.message().contains("database is locked")
}
_ => false,
};
if is_busy {
return Err(WriteBusyError.into()); return Err(WriteBusyError.into());
} }
return Err(e).context("Cannot begin immediate transaction"); return Err(e).context("Cannot begin immediate transaction");
@ -113,22 +127,24 @@ impl WriteTransaction {
}) })
} }
pub async fn commit(mut self) -> Result<()> { pub async fn commit(mut self) -> Result<(), SyncServerError> {
if let Some(mut conn) = self.conn.take() { if let Some(mut conn) = self.conn.take() {
sqlx::query("COMMIT") sqlx::query("COMMIT")
.execute(&mut *conn) .execute(&mut *conn)
.await .await
.context("Failed to commit transaction")?; .context("Failed to commit transaction")
.map_err(database_error)?;
} }
Ok(()) Ok(())
} }
pub async fn rollback(mut self) -> Result<()> { pub async fn rollback(mut self) -> Result<(), SyncServerError> {
if let Some(mut conn) = self.conn.take() { if let Some(mut conn) = self.conn.take() {
sqlx::query("ROLLBACK") sqlx::query("ROLLBACK")
.execute(&mut *conn) .execute(&mut *conn)
.await .await
.context("Failed to rollback transaction")?; .context("Failed to rollback transaction")
.map_err(database_error)?;
} }
Ok(()) Ok(())
} }
@ -265,10 +281,14 @@ impl Database {
drop(init_conn); drop(init_conn);
// Per-connection PRAGMAs shared by both reader and writer pools. // Per-connection PRAGMAs shared by both reader and writer pools.
// journal_mode = WAL is a no-op on an already-WAL database. // Database-level PRAGMAs (auto_vacuum, journal_mode) are deliberately
// omitted here: they require a write lock to verify or set, so issuing
// them on every new pool connection blocks behind any in-flight writer
// and can fail with SQLITE_BUSY just to open a connection. The init
// connection above set them once; the WAL mode persists in the database
// header, so subsequent opens pick it up automatically.
let base_options = SqliteConnectOptions::new() let base_options = SqliteConnectOptions::new()
.filename(file_name.clone()) .filename(file_name.clone())
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.busy_timeout(Duration::from_secs(30)) .busy_timeout(Duration::from_secs(30))
.log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(30)) .log_slow_statements(log::LevelFilter::Warn, Duration::from_secs(30))
// In WAL mode, NORMAL is safe: data survives OS crashes, only the // In WAL mode, NORMAL is safe: data survives OS crashes, only the
@ -292,6 +312,7 @@ impl Database {
// Reader pool: multiple connections for concurrent reads. // Reader pool: multiple connections for concurrent reads.
let reader = SqlitePoolOptions::new() let reader = SqlitePoolOptions::new()
.max_connections(config.max_connections_per_vault) .max_connections(config.max_connections_per_vault)
.acquire_timeout(POOL_ACQUIRE_TIMEOUT)
.acquire_slow_threshold(Duration::from_secs(30)) .acquire_slow_threshold(Duration::from_secs(30))
// Disabled: the health-check query is subject to busy_timeout // Disabled: the health-check query is subject to busy_timeout
// and blocks all connection checkouts when a write is active, // and blocks all connection checkouts when a write is active,
@ -309,6 +330,7 @@ impl Database {
// reader pool ensures writes never compete with reads for pool slots. // reader pool ensures writes never compete with reads for pool slots.
let writer = SqlitePoolOptions::new() let writer = SqlitePoolOptions::new()
.max_connections(1) .max_connections(1)
.acquire_timeout(POOL_ACQUIRE_TIMEOUT)
.acquire_slow_threshold(Duration::from_secs(30)) .acquire_slow_threshold(Duration::from_secs(30))
.test_before_acquire(false) .test_before_acquire(false)
.before_acquire(rollback_before_acquire) .before_acquire(rollback_before_acquire)
@ -375,7 +397,10 @@ impl Database {
Ok(self.get_vault_pools(vault).await?.reader) Ok(self.get_vault_pools(vault).await?.reader)
} }
pub async fn create_write_transaction(&self, vault: &VaultId) -> Result<WriteTransaction> { pub async fn create_write_transaction(
&self,
vault: &VaultId,
) -> Result<WriteTransaction, SyncServerError> {
let write_lock = { let write_lock = {
let mut locks = self.write_locks.lock().await; let mut locks = self.write_locks.lock().await;
locks locks
@ -384,8 +409,10 @@ impl Database {
.clone() .clone()
}; };
let write_guard = write_lock.lock_owned().await; let write_guard = write_lock.lock_owned().await;
let pools = self.get_vault_pools(vault).await?; let pools = self.get_vault_pools(vault).await.map_err(database_error)?;
WriteTransaction::new(&pools.writer, write_guard).await WriteTransaction::new(&pools.writer, write_guard)
.await
.map_err(database_error)
} }
/// Return the latest state of all documents in the vault, optionally /// Return the latest state of all documents in the vault, optionally
@ -397,7 +424,7 @@ impl Database {
vault: &VaultId, vault: &VaultId,
up_to_vault_update_id: Option<VaultUpdateId>, up_to_vault_update_id: Option<VaultUpdateId>,
connection: Option<&mut SqliteConnection>, connection: Option<&mut SqliteConnection>,
) -> Result<Vec<DocumentVersionWithoutContent>> { ) -> Result<Vec<DocumentVersionWithoutContent>, SyncServerError> {
// `i64::MAX` makes the upper bound a no-op for callers that don't // `i64::MAX` makes the upper bound a no-op for callers that don't
// care about an exact snapshot (they pass `None`). // care about an exact snapshot (they pass `None`).
let upper = up_to_vault_update_id.unwrap_or(i64::MAX); let upper = up_to_vault_update_id.unwrap_or(i64::MAX);
@ -423,7 +450,12 @@ impl Database {
query.fetch_all(&mut *conn).await query.fetch_all(&mut *conn).await
} else { } else {
query query
.fetch_all(&self.get_connection_pool(vault).await?) .fetch_all(
&self
.get_connection_pool(vault)
.await
.map_err(database_error)?,
)
.await .await
} }
.context("Cannot fetch latest documents") .context("Cannot fetch latest documents")
@ -441,6 +473,7 @@ impl Database {
}) })
.collect() .collect()
}) })
.map_err(database_error)
} }
/// Return the latest state of all documents (including deleted) in the /// Return the latest state of all documents (including deleted) in the
@ -454,7 +487,7 @@ impl Database {
vault_update_id: VaultUpdateId, vault_update_id: VaultUpdateId,
up_to_vault_update_id: Option<VaultUpdateId>, up_to_vault_update_id: Option<VaultUpdateId>,
connection: Option<&mut SqliteConnection>, connection: Option<&mut SqliteConnection>,
) -> Result<Vec<DocumentVersionWithoutContent>> { ) -> Result<Vec<DocumentVersionWithoutContent>, SyncServerError> {
// `i64::MAX` makes the upper bound a no-op for callers that don't // `i64::MAX` makes the upper bound a no-op for callers that don't
// care about an exact snapshot (they pass `None`). // care about an exact snapshot (they pass `None`).
let upper = up_to_vault_update_id.unwrap_or(i64::MAX); let upper = up_to_vault_update_id.unwrap_or(i64::MAX);
@ -499,7 +532,12 @@ impl Database {
query.fetch_all(&mut *conn).await query.fetch_all(&mut *conn).await
} else { } else {
query query
.fetch_all(&self.get_connection_pool(vault).await?) .fetch_all(
&self
.get_connection_pool(vault)
.await
.map_err(database_error)?,
)
.await .await
} }
.with_context(|| { .with_context(|| {
@ -519,13 +557,14 @@ impl Database {
}) })
.collect() .collect()
}) })
.map_err(database_error)
} }
pub async fn get_max_update_id_in_vault( pub async fn get_max_update_id_in_vault(
&self, &self,
vault: &VaultId, vault: &VaultId,
connection: Option<&mut SqliteConnection>, connection: Option<&mut SqliteConnection>,
) -> Result<i64> { ) -> Result<i64, SyncServerError> {
let query = sqlx::query!( let query = sqlx::query!(
r#" r#"
select coalesce(max(vault_update_id), 0) as max_vault_update_id select coalesce(max(vault_update_id), 0) as max_vault_update_id
@ -537,11 +576,17 @@ impl Database {
query.fetch_one(&mut *conn).await query.fetch_one(&mut *conn).await
} else { } else {
query query
.fetch_one(&self.get_connection_pool(vault).await?) .fetch_one(
&self
.get_connection_pool(vault)
.await
.map_err(database_error)?,
)
.await .await
} }
.map(|row| row.max_vault_update_id) .map(|row| row.max_vault_update_id)
.context("Cannot fetch max update id in vault") .context("Cannot fetch max update id in vault")
.map_err(database_error)
} }
pub async fn get_latest_non_deleted_document_by_path( pub async fn get_latest_non_deleted_document_by_path(
@ -549,7 +594,7 @@ impl Database {
vault: &VaultId, vault: &VaultId,
relative_path: &str, relative_path: &str,
connection: Option<&mut SqliteConnection>, connection: Option<&mut SqliteConnection>,
) -> Result<Option<StoredDocumentVersion>> { ) -> Result<Option<StoredDocumentVersion>, SyncServerError> {
let query = sqlx::query_as!( let query = sqlx::query_as!(
StoredDocumentVersion, StoredDocumentVersion,
r#" r#"
@ -578,10 +623,16 @@ impl Database {
query.fetch_optional(&mut *conn).await query.fetch_optional(&mut *conn).await
} else { } else {
query query
.fetch_optional(&self.get_connection_pool(vault).await?) .fetch_optional(
&self
.get_connection_pool(vault)
.await
.map_err(database_error)?,
)
.await .await
} }
.context("Cannot fetch latest document version") .context("Cannot fetch latest document version")
.map_err(database_error)
} }
/// Find a doc whose CREATE was authored by this device with /// Find a doc whose CREATE was authored by this device with
@ -611,7 +662,7 @@ impl Database {
last_seen_vault_update_id: VaultUpdateId, last_seen_vault_update_id: VaultUpdateId,
content: &[u8], content: &[u8],
connection: Option<&mut SqliteConnection>, connection: Option<&mut SqliteConnection>,
) -> Result<Option<StoredDocumentVersion>> { ) -> Result<Option<StoredDocumentVersion>, SyncServerError> {
let query = sqlx::query_as!( let query = sqlx::query_as!(
StoredDocumentVersion, StoredDocumentVersion,
r#" r#"
@ -646,10 +697,16 @@ impl Database {
query.fetch_optional(&mut *conn).await query.fetch_optional(&mut *conn).await
} else { } else {
query query
.fetch_optional(&self.get_connection_pool(vault).await?) .fetch_optional(
&self
.get_connection_pool(vault)
.await
.map_err(database_error)?,
)
.await .await
} }
.context("Cannot fetch lost-create candidate") .context("Cannot fetch lost-create candidate")
.map_err(database_error)
} }
pub async fn get_latest_document( pub async fn get_latest_document(
@ -657,7 +714,7 @@ impl Database {
vault: &VaultId, vault: &VaultId,
document_id: &DocumentId, document_id: &DocumentId,
connection: Option<&mut SqliteConnection>, connection: Option<&mut SqliteConnection>,
) -> Result<Option<StoredDocumentVersion>> { ) -> Result<Option<StoredDocumentVersion>, SyncServerError> {
let document_id = document_id.as_hyphenated(); let document_id = document_id.as_hyphenated();
let query = sqlx::query_as!( let query = sqlx::query_as!(
StoredDocumentVersion, StoredDocumentVersion,
@ -683,10 +740,16 @@ impl Database {
query.fetch_optional(&mut *conn).await query.fetch_optional(&mut *conn).await
} else { } else {
query query
.fetch_optional(&self.get_connection_pool(vault).await?) .fetch_optional(
&self
.get_connection_pool(vault)
.await
.map_err(database_error)?,
)
.await .await
} }
.context("Cannot fetch latest document version") .context("Cannot fetch latest document version")
.map_err(database_error)
} }
pub async fn get_document_version( pub async fn get_document_version(
@ -694,7 +757,7 @@ impl Database {
vault: &VaultId, vault: &VaultId,
vault_update_id: VaultUpdateId, vault_update_id: VaultUpdateId,
connection: Option<&mut SqliteConnection>, connection: Option<&mut SqliteConnection>,
) -> Result<Option<StoredDocumentVersion>> { ) -> Result<Option<StoredDocumentVersion>, SyncServerError> {
let query = sqlx::query_as!( let query = sqlx::query_as!(
StoredDocumentVersion, StoredDocumentVersion,
r#" r#"
@ -718,10 +781,16 @@ impl Database {
query.fetch_optional(&mut *conn).await query.fetch_optional(&mut *conn).await
} else { } else {
query query
.fetch_optional(&self.get_connection_pool(vault).await?) .fetch_optional(
&self
.get_connection_pool(vault)
.await
.map_err(database_error)?,
)
.await .await
} }
.context("Cannot fetch document version") .context("Cannot fetch document version")
.map_err(database_error)
} }
// inserting the document must be the last step of the transaction // inserting the document must be the last step of the transaction
@ -730,7 +799,7 @@ impl Database {
vault_id: &VaultId, vault_id: &VaultId,
version: &StoredDocumentVersion, version: &StoredDocumentVersion,
mut transaction: WriteTransaction, mut transaction: WriteTransaction,
) -> Result<()> { ) -> Result<(), SyncServerError> {
let document_id = version.document_id.as_hyphenated(); let document_id = version.document_id.as_hyphenated();
let query = sqlx::query!( let query = sqlx::query!(
r#" r#"
@ -766,14 +835,12 @@ impl Database {
let _send_guard = self.broadcasts.acquire_send_lock(vault_id).await; let _send_guard = self.broadcasts.acquire_send_lock(vault_id).await;
query query
.execute(transaction.connection_mut()?) .execute(transaction.connection_mut().map_err(server_error)?)
.await .await
.context("Cannot insert document version")?; .context("Cannot insert document version")
.map_err(database_error)?;
transaction transaction.commit().await?;
.commit()
.await
.context("Failed to commit transaction")?;
// Broadcast every commit to every connected client, including // Broadcast every commit to every connected client, including
// the originator. The HTTP response is the originator's normal // the originator. The HTTP response is the originator's normal

View file

@ -65,13 +65,11 @@ pub async fn get_unseen_documents(
.database .database
.get_latest_documents_since(vault_id, update_id, Some(up_to_vault_update_id), None) .get_latest_documents_since(vault_id, update_id, Some(up_to_vault_update_id), None)
.await .await
.map_err(server_error)
} else { } else {
state state
.database .database
.get_latest_documents(vault_id, Some(up_to_vault_update_id), None) .get_latest_documents(vault_id, Some(up_to_vault_update_id), None)
.await .await
.map_err(server_error)
} }
} }

View file

@ -21,6 +21,10 @@ pub const DEFAULT_MAX_PENDING_WS_CONNECTIONS: usize = 128;
pub const DEFAULT_LOG_DIRECTORY: &str = "logs"; pub const DEFAULT_LOG_DIRECTORY: &str = "logs";
pub const DEFAULT_LOG_ROTATION_INTERVAL: Duration = Duration::from_hours(24); pub const DEFAULT_LOG_ROTATION_INTERVAL: Duration = Duration::from_hours(24);
pub const IDLE_POOL_TIMEOUT: Duration = Duration::from_mins(5); pub const IDLE_POOL_TIMEOUT: Duration = Duration::from_mins(5);
/// Fail fast on pool acquire so a transiently locked database surfaces as
/// a 429 in seconds, not after a 30s busy_timeout. Callers retry.
pub const POOL_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(5);
pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);

View file

@ -163,15 +163,26 @@ pub fn too_many_requests_error(error: anyhow::Error) -> SyncServerError {
SyncServerError::TooManyRequests(error) SyncServerError::TooManyRequests(error)
} }
/// Maps a `create_write_transaction` error to 429 if the database is busy, /// Maps a database-operation error to 429 if the database is busy or the
/// or 500 for all other failures. /// pool acquire timed out (both retryable), or 500 for all other failures.
pub fn write_transaction_error(error: anyhow::Error) -> SyncServerError { pub fn database_error(error: anyhow::Error) -> SyncServerError {
if error if is_database_busy(&error) {
.downcast_ref::<crate::app_state::database::WriteBusyError>()
.is_some()
{
too_many_requests_error(error) too_many_requests_error(error)
} else { } else {
server_error(error) server_error(error)
} }
} }
fn is_database_busy(error: &anyhow::Error) -> bool {
if error
.downcast_ref::<crate::app_state::database::WriteBusyError>()
.is_some()
{
return true;
}
error.chain().any(|cause| {
cause
.downcast_ref::<sqlx::Error>()
.is_some_and(crate::app_state::database::is_sqlite_busy_error)
})
}

View file

@ -14,7 +14,7 @@ use crate::{
database::models::{StoredDocumentVersion, VaultId}, database::models::{StoredDocumentVersion, VaultId},
}, },
config::user_config::User, config::user_config::User,
errors::{SyncServerError, client_error, server_error, write_transaction_error}, errors::{SyncServerError, client_error, server_error},
server::{responses::DocumentUpdateResponse, update_document}, server::{responses::DocumentUpdateResponse, update_document},
utils::{ utils::{
find_first_available_path::find_first_available_path, is_binary::is_binary, find_first_available_path::find_first_available_path, is_binary::is_binary,
@ -49,8 +49,7 @@ pub async fn create_document(
let mut transaction = state let mut transaction = state
.database .database
.create_write_transaction(&vault_id) .create_write_transaction(&vault_id)
.await .await?;
.map_err(write_transaction_error)?;
let sanitized_relative_path = sanitize_path(&request.relative_path).map_err(client_error)?; let sanitized_relative_path = sanitize_path(&request.relative_path).map_err(client_error)?;
let new_content = request.content.contents.to_vec(); let new_content = request.content.contents.to_vec();
@ -62,8 +61,7 @@ pub async fn create_document(
&sanitized_relative_path, &sanitized_relative_path,
Some(transaction.connection_mut().map_err(server_error)?), Some(transaction.connection_mut().map_err(server_error)?),
) )
.await .await?;
.map_err(server_error)?;
if let Some(latest_version) = latest_version { if let Some(latest_version) = latest_version {
// Only merge with an existing document the client couldn't have // Only merge with an existing document the client couldn't have
@ -131,8 +129,7 @@ pub async fn create_document(
&new_content, &new_content,
Some(transaction.connection_mut().map_err(server_error)?), Some(transaction.connection_mut().map_err(server_error)?),
) )
.await .await?
.map_err(server_error)?
{ {
info!( info!(
"Lost-create recovery: binding retry at `{sanitized_relative_path}` to existing doc {} (was at `{}`) in vault `{vault_id}` for device `{}`", "Lost-create recovery: binding retry at `{sanitized_relative_path}` to existing doc {} (was at `{}`) in vault `{vault_id}` for device `{}`",
@ -161,8 +158,7 @@ pub async fn create_document(
&vault_id, &vault_id,
Some(transaction.connection_mut().map_err(server_error)?), Some(transaction.connection_mut().map_err(server_error)?),
) )
.await .await?;
.map_err(server_error)?;
let deduped_path = find_first_available_path( let deduped_path = find_first_available_path(
&vault_id, &vault_id,
@ -170,8 +166,7 @@ pub async fn create_document(
&state.database, &state.database,
&mut transaction, &mut transaction,
) )
.await .await?;
.map_err(server_error)?;
if deduped_path != sanitized_relative_path { if deduped_path != sanitized_relative_path {
info!( info!(
@ -198,8 +193,7 @@ pub async fn create_document(
state state
.database .database
.insert_document_version(&vault_id, &new_version, transaction) .insert_document_version(&vault_id, &new_version, transaction)
.await .await?;
.map_err(server_error)?;
Ok(Json(DocumentUpdateResponse::FastForwardUpdate( Ok(Json(DocumentUpdateResponse::FastForwardUpdate(
new_version.into(), new_version.into(),

View file

@ -1,4 +1,4 @@
use anyhow::{Context, anyhow}; use anyhow::anyhow;
use axum::{ use axum::{
Extension, Json, Extension, Json,
extract::{Path, State}, extract::{Path, State},
@ -16,7 +16,7 @@ use crate::{
}, },
}, },
config::user_config::User, config::user_config::User,
errors::{SyncServerError, not_found_error, server_error, write_transaction_error}, errors::{SyncServerError, not_found_error, server_error},
utils::normalize::normalize, utils::normalize::normalize,
}; };
@ -43,8 +43,7 @@ pub async fn delete_document(
let mut transaction = state let mut transaction = state
.database .database
.create_write_transaction(&vault_id) .create_write_transaction(&vault_id)
.await .await?;
.map_err(write_transaction_error)?;
let last_update_id = state let last_update_id = state
.database .database
@ -52,8 +51,7 @@ pub async fn delete_document(
&vault_id, &vault_id,
Some(transaction.connection_mut().map_err(server_error)?), Some(transaction.connection_mut().map_err(server_error)?),
) )
.await .await?;
.map_err(server_error)?;
let latest_version = state let latest_version = state
.database .database
@ -62,26 +60,17 @@ pub async fn delete_document(
&document_id, &document_id,
Some(transaction.connection_mut().map_err(server_error)?), Some(transaction.connection_mut().map_err(server_error)?),
) )
.await .await?;
.map_err(server_error)?;
let Some(latest_version) = latest_version else { let Some(latest_version) = latest_version else {
transaction transaction.rollback().await?;
.rollback()
.await
.context("Failed to roll back transaction")
.map_err(server_error)?;
return Err(not_found_error(anyhow!( return Err(not_found_error(anyhow!(
"Document `{document_id}` not found in vault `{vault_id}`" "Document `{document_id}` not found in vault `{vault_id}`"
))); )));
}; };
if latest_version.is_deleted { if latest_version.is_deleted {
transaction transaction.rollback().await?;
.rollback()
.await
.context("Failed to roll back transaction")
.map_err(server_error)?;
info!("Document `{document_id}` has already been deleted",); info!("Document `{document_id}` has already been deleted",);
return Ok(Json(latest_version.into())); return Ok(Json(latest_version.into()));
@ -110,8 +99,7 @@ pub async fn delete_document(
state state
.database .database
.insert_document_version(&vault_id, &new_version, transaction) .insert_document_version(&vault_id, &new_version, transaction)
.await .await?;
.map_err(server_error)?;
Ok(Json(new_version.into())) Ok(Json(new_version.into()))
} }

View file

@ -11,7 +11,7 @@ use crate::{
AppState, AppState,
database::models::{DocumentId, DocumentVersion, VaultId, VaultUpdateId}, database::models::{DocumentId, DocumentVersion, VaultId, VaultUpdateId},
}, },
errors::{SyncServerError, not_found_error, server_error}, errors::{SyncServerError, not_found_error},
utils::normalize::normalize, utils::normalize::normalize,
}; };
@ -40,8 +40,7 @@ pub async fn fetch_document_version(
let result = state let result = state
.database .database
.get_document_version(&vault_id, vault_update_id, None) .get_document_version(&vault_id, vault_update_id, None)
.await .await?
.map_err(server_error)?
.map_or_else( .map_or_else(
|| { || {
Err(not_found_error(anyhow!( Err(not_found_error(anyhow!(

View file

@ -11,7 +11,7 @@ use crate::{
AppState, AppState,
database::models::{DocumentId, VaultId, VaultUpdateId}, database::models::{DocumentId, VaultId, VaultUpdateId},
}, },
errors::{SyncServerError, not_found_error, server_error}, errors::{SyncServerError, not_found_error},
utils::normalize::normalize, utils::normalize::normalize,
}; };
@ -40,8 +40,7 @@ pub async fn fetch_document_version_content(
let result = state let result = state
.database .database
.get_document_version(&vault_id, vault_update_id, None) .get_document_version(&vault_id, vault_update_id, None)
.await .await?
.map_err(server_error)?
.map_or_else( .map_or_else(
|| { || {
Err(not_found_error(anyhow!( Err(not_found_error(anyhow!(

View file

@ -22,9 +22,7 @@ use crate::{
}, },
}, },
config::user_config::User, config::user_config::User,
errors::{ errors::{SyncServerError, client_error, not_found_error, server_error},
SyncServerError, client_error, not_found_error, server_error, write_transaction_error,
},
server::requests::UpdateBinaryDocumentVersion, server::requests::UpdateBinaryDocumentVersion,
utils::{ utils::{
find_first_available_path::find_first_available_path, is_binary::as_non_binary_text, find_first_available_path::find_first_available_path, is_binary::as_non_binary_text,
@ -58,8 +56,7 @@ pub async fn update_binary(
let transaction = state let transaction = state
.database .database
.create_write_transaction(&vault_id) .create_write_transaction(&vault_id)
.await .await?;
.map_err(write_transaction_error)?;
update_document( update_document(
&parent_document.relative_path, &parent_document.relative_path,
@ -104,8 +101,7 @@ pub async fn update_text(
let transaction = state let transaction = state
.database .database
.create_write_transaction(&vault_id) .create_write_transaction(&vault_id)
.await .await?;
.map_err(write_transaction_error)?;
update_document( update_document(
&parent_document.relative_path, &parent_document.relative_path,
@ -131,8 +127,7 @@ async fn get_parent_document(
let parent = state let parent = state
.database .database
.get_document_version(vault_id, parent_version_id, None) .get_document_version(vault_id, parent_version_id, None)
.await .await?
.map_err(server_error)?
.map_or_else( .map_or_else(
|| { || {
Err(not_found_error(anyhow!( Err(not_found_error(anyhow!(
@ -177,8 +172,7 @@ pub async fn update_document(
&vault_id, &vault_id,
Some(transaction.connection_mut().map_err(server_error)?), Some(transaction.connection_mut().map_err(server_error)?),
) )
.await .await?;
.map_err(server_error)?;
let latest_version = state let latest_version = state
.database .database
@ -187,8 +181,7 @@ pub async fn update_document(
&document_id, &document_id,
Some(transaction.connection_mut().map_err(server_error)?), Some(transaction.connection_mut().map_err(server_error)?),
) )
.await .await?
.map_err(server_error)?
.map_or_else( .map_or_else(
|| { || {
Err(not_found_error(anyhow!( Err(not_found_error(anyhow!(
@ -199,11 +192,7 @@ pub async fn update_document(
)?; )?;
if latest_version.is_deleted { if latest_version.is_deleted {
transaction transaction.rollback().await?;
.rollback()
.await
.context("Failed to roll back transaction")
.map_err(server_error)?;
info!("Document `{document_id}` has been deleted, ignoring update to it",); info!("Document `{document_id}` has been deleted, ignoring update to it",);
return Ok(Json(DocumentUpdateResponse::FastForwardUpdate( return Ok(Json(DocumentUpdateResponse::FastForwardUpdate(
@ -221,11 +210,7 @@ pub async fn update_document(
info!( info!(
"Document content is the same as the latest version for `{document_id}`, skipping update" "Document content is the same as the latest version for `{document_id}`, skipping update"
); );
transaction transaction.rollback().await?;
.rollback()
.await
.context("Failed to roll back transaction")
.map_err(server_error)?;
return Ok(Json(DocumentUpdateResponse::FastForwardUpdate( return Ok(Json(DocumentUpdateResponse::FastForwardUpdate(
latest_version.into(), latest_version.into(),
@ -289,8 +274,7 @@ pub async fn update_document(
{ {
let new_path = let new_path =
find_first_available_path(&vault_id, requested, &state.database, &mut transaction) find_first_available_path(&vault_id, requested, &state.database, &mut transaction)
.await .await?;
.map_err(server_error)?;
if new_path != requested { if new_path != requested {
info!( info!(
@ -321,8 +305,7 @@ pub async fn update_document(
state state
.database .database
.insert_document_version(&vault_id, &new_version, transaction) .insert_document_version(&vault_id, &new_version, transaction)
.await .await?;
.map_err(server_error)?;
Ok(Json(if is_same_as_request { Ok(Json(if is_same_as_request {
DocumentUpdateResponse::FastForwardUpdate(new_version.into()) DocumentUpdateResponse::FastForwardUpdate(new_version.into())

View file

@ -162,8 +162,7 @@ async fn websocket(
let cursor = state let cursor = state
.database .database
.get_max_update_id_in_vault(&vault_id, None) .get_max_update_id_in_vault(&vault_id, None)
.await .await?;
.map_err(server_error)?;
drop(send_guard); drop(send_guard);
// Catch-up on versions committed while this client was offline, // Catch-up on versions committed while this client was offline,

View file

@ -1,6 +1,7 @@
use crate::app_state::database::{WriteTransaction, models::VaultId}; use crate::app_state::database::{WriteTransaction, models::VaultId};
use crate::errors::{SyncServerError, server_error};
use crate::utils::dedup_paths::dedup_paths; use crate::utils::dedup_paths::dedup_paths;
use anyhow::{Result, anyhow}; use anyhow::anyhow;
use log::{debug, info}; use log::{debug, info};
pub async fn find_first_available_path( pub async fn find_first_available_path(
@ -8,7 +9,7 @@ pub async fn find_first_available_path(
sanitized_relative_path: &str, sanitized_relative_path: &str,
database: &crate::app_state::database::Database, database: &crate::app_state::database::Database,
transaction: &mut WriteTransaction, transaction: &mut WriteTransaction,
) -> Result<String> { ) -> Result<String, SyncServerError> {
info!("Finding first available path for `{sanitized_relative_path}` in vault `{vault_id}`"); info!("Finding first available path for `{sanitized_relative_path}` in vault `{vault_id}`");
for candidate in dedup_paths(sanitized_relative_path) { for candidate in dedup_paths(sanitized_relative_path) {
debug!("Checking candidate path for deconflicting names: `{candidate}`"); debug!("Checking candidate path for deconflicting names: `{candidate}`");
@ -16,7 +17,7 @@ pub async fn find_first_available_path(
.get_latest_non_deleted_document_by_path( .get_latest_non_deleted_document_by_path(
vault_id, vault_id,
&candidate, &candidate,
Some(transaction.connection_mut()?), Some(transaction.connection_mut().map_err(server_error)?),
) )
.await? .await?
.is_none() .is_none()
@ -30,7 +31,7 @@ pub async fn find_first_available_path(
); );
} }
Err(anyhow!( Err(server_error(anyhow!(
"No available path candidates produced for `{sanitized_relative_path}` in vault `{vault_id}`" "No available path candidates produced for `{sanitized_relative_path}` in vault `{vault_id}`"
)) )))
} }