Improve logging
This commit is contained in:
parent
4456767ec4
commit
84f077f36b
16 changed files with 90 additions and 34 deletions
|
|
@ -50,7 +50,7 @@ impl Database {
|
|||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to create databases directory: {}",
|
||||
"Failed to create databases directory at `{}`",
|
||||
config.databases_directory_path.to_string_lossy()
|
||||
)
|
||||
})?;
|
||||
|
|
@ -110,7 +110,7 @@ impl Database {
|
|||
.test_before_acquire(true)
|
||||
.connect_with(connection_options)
|
||||
.await
|
||||
.with_context(|| format!("Cannot open database at {}", file_name.display()))?;
|
||||
.with_context(|| format!("Cannot open database at `{}`", file_name.display()))?;
|
||||
|
||||
Self::run_migrations(&pool).await?;
|
||||
|
||||
|
|
@ -254,7 +254,7 @@ impl Database {
|
|||
.await
|
||||
}
|
||||
.with_context(|| {
|
||||
format!("Cannot fetch latest documents since vault_update_id {vault_update_id}")
|
||||
format!("Cannot fetch latest documents since vault_update_id `{vault_update_id}`")
|
||||
})
|
||||
.map(|rows| {
|
||||
rows.into_iter()
|
||||
|
|
@ -489,10 +489,7 @@ impl Database {
|
|||
// Close and remove idle pools
|
||||
for vault_id in &vaults_to_remove {
|
||||
if let Some(pool_with_timestamp) = pools.remove(vault_id) {
|
||||
info!(
|
||||
"Closing idle database connection pool for vault {}",
|
||||
vault_id
|
||||
);
|
||||
info!("Closing idle database connection pool for vault `{vault_id}`");
|
||||
pool_with_timestamp.pool.close().await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use anyhow::Context;
|
||||
use log::warn;
|
||||
use tokio::sync::{Mutex, broadcast};
|
||||
|
||||
use super::models::WebSocketServerMessageWithOrigin;
|
||||
|
|
@ -32,7 +33,7 @@ impl Broadcasts {
|
|||
}
|
||||
|
||||
/// Notify all clients (who are subscribed to the vault) about an update.
|
||||
/// We only log failures.
|
||||
/// We only log failures and don't propagate them.
|
||||
pub async fn send_document_update(
|
||||
&self,
|
||||
vault: VaultId,
|
||||
|
|
@ -46,7 +47,7 @@ impl Broadcasts {
|
|||
.map_err(server_error);
|
||||
|
||||
if result.is_err() {
|
||||
log::debug!("Failed to send message: {result:?}");
|
||||
warn!("Failed to send message: {result:?}");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ impl Config {
|
|||
pub async fn read_or_create(path: &Path) -> Result<Self> {
|
||||
let config = if path.exists() {
|
||||
info!(
|
||||
"Loading configuration from '{}'",
|
||||
"Loading configuration from `{}`",
|
||||
path.canonicalize().unwrap().display()
|
||||
);
|
||||
Self::load_from_file(path).await?
|
||||
|
|
@ -40,7 +40,7 @@ impl Config {
|
|||
|
||||
config.write(path).await?;
|
||||
info!(
|
||||
"Updated configuration at '{}'",
|
||||
"Updated configuration at `{}`",
|
||||
path.canonicalize().unwrap().display()
|
||||
);
|
||||
|
||||
|
|
@ -50,14 +50,12 @@ impl Config {
|
|||
pub async fn load_from_file(path: &Path) -> Result<Self> {
|
||||
let contents = fs::read_to_string(path).await.with_context(|| {
|
||||
format!(
|
||||
"Cannot load configuration from disk from {}",
|
||||
"Cannot load configuration from disk from `{}`",
|
||||
path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let config = serde_yaml::from_str(&contents).context("Failed to parse configuration")?;
|
||||
|
||||
Ok(config)
|
||||
serde_yaml::from_str(&contents).context("Failed to parse configuration")
|
||||
}
|
||||
|
||||
pub async fn write(&self, path: &Path) -> Result<()> {
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ impl Default for LoggingConfig {
|
|||
}
|
||||
|
||||
fn default_log_directory() -> String {
|
||||
debug!("Using default log directory: {DEFAULT_LOG_DIRECTORY}");
|
||||
debug!("Using default log directory: `{DEFAULT_LOG_DIRECTORY}`");
|
||||
DEFAULT_LOG_DIRECTORY.to_owned()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ fn default_port() -> u16 {
|
|||
}
|
||||
|
||||
fn default_max_body_size_mb() -> usize {
|
||||
debug!("Using default max body size (MB): {DEFAULT_MAX_BODY_SIZE_MB}");
|
||||
debug!("Using default max body size {DEFAULT_MAX_BODY_SIZE_MB} MB");
|
||||
DEFAULT_MAX_BODY_SIZE_MB
|
||||
}
|
||||
|
||||
|
|
@ -48,7 +48,7 @@ fn default_max_clients_per_vault() -> usize {
|
|||
}
|
||||
|
||||
fn default_response_timeout_seconds() -> u64 {
|
||||
debug!("Using default response timeout (seconds): {DEFAULT_RESPONSE_TIMEOUT_SECONDS}");
|
||||
debug!("Using default response timeout: {DEFAULT_RESPONSE_TIMEOUT_SECONDS} seconds");
|
||||
DEFAULT_RESPONSE_TIMEOUT_SECONDS
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ where
|
|||
for user in &users {
|
||||
if let Some(existing_name) = user_token_map.get_by_right(&user.token) {
|
||||
return Err(D::Error::custom(format!(
|
||||
"Duplicate user token found: '{}' for users '{}' and '{}'. User tokens must be \
|
||||
"Duplicate user token found: `{}` for users `{}` and `{}`. User tokens must be \
|
||||
unique.",
|
||||
user.token, existing_name, user.name
|
||||
)));
|
||||
|
|
@ -28,7 +28,7 @@ where
|
|||
|
||||
if user_token_map.contains_left(&user.name) {
|
||||
return Err(D::Error::custom(format!(
|
||||
"Duplicate user name found: '{}'. User names must be unique.",
|
||||
"Duplicate user name found: `{}`. User names must be unique.",
|
||||
user.name
|
||||
)));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,14 +52,14 @@ pub fn auth(state: &AppState, token: &str, vault_id: &VaultId) -> Result<User, S
|
|||
VaultAccess::AllowList(AllowListedVaults { ref allowed }) => allowed.contains(vault_id),
|
||||
} {
|
||||
info!(
|
||||
"User '{}' is authenticated and is authorised to access to vault '{vault_id}'",
|
||||
"User `{}` is authenticated and is authorised to access to vault `{vault_id}`",
|
||||
user.name
|
||||
);
|
||||
|
||||
Ok(user)
|
||||
} else {
|
||||
info!(
|
||||
"User '{}' is authenticated but is not authorised to access vault '{vault_id}'",
|
||||
"User `{}` is authenticated but is not authorised to access vault `{vault_id}`",
|
||||
user.name
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ use axum::{
|
|||
};
|
||||
use axum_extra::TypedHeader;
|
||||
use axum_typed_multipart::TypedMultipart;
|
||||
use log::{debug, info};
|
||||
use serde::Deserialize;
|
||||
|
||||
use super::{device_id_header::DeviceIdHeader, requests::CreateDocumentVersion};
|
||||
|
|
@ -37,6 +38,8 @@ pub async fn create_document(
|
|||
State(state): State<AppState>,
|
||||
TypedMultipart(request): TypedMultipart<CreateDocumentVersion>,
|
||||
) -> Result<Json<DocumentVersionWithoutContent>, SyncServerError> {
|
||||
debug!("Creating document in vault `{vault_id}`");
|
||||
|
||||
let mut transaction = state
|
||||
.database
|
||||
.create_write_transaction(&vault_id)
|
||||
|
|
@ -53,7 +56,7 @@ pub async fn create_document(
|
|||
|
||||
if existing_version.is_some() {
|
||||
return Err(client_error(anyhow::anyhow!(
|
||||
"Document with the same ID already exists"
|
||||
"Document with the same ID `{document_id}` already exists"
|
||||
)));
|
||||
}
|
||||
|
||||
|
|
@ -78,6 +81,12 @@ pub async fn create_document(
|
|||
.await
|
||||
.map_err(server_error)?;
|
||||
|
||||
if deduped_path != sanitized_relative_path {
|
||||
info!(
|
||||
"Document already exists at new location: `{sanitized_relative_path}` when trying to create it in vault `{vault_id}`, deconflicting by creating at `{deduped_path}`"
|
||||
);
|
||||
}
|
||||
|
||||
let new_version = StoredDocumentVersion {
|
||||
vault_update_id: last_update_id + 1,
|
||||
document_id,
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
use anyhow::Context;
|
||||
use axum::{
|
||||
Extension, Json,
|
||||
extract::{Path, State},
|
||||
};
|
||||
use axum_extra::TypedHeader;
|
||||
use log::{debug, info};
|
||||
use serde::Deserialize;
|
||||
|
||||
use super::{device_id_header::DeviceIdHeader, requests::DeleteDocumentVersion};
|
||||
|
|
@ -37,6 +39,8 @@ pub async fn delete_document(
|
|||
State(state): State<AppState>,
|
||||
Json(request): Json<DeleteDocumentVersion>,
|
||||
) -> Result<Json<DocumentVersionWithoutContent>, SyncServerError> {
|
||||
debug!("Deleting document `{document_id}` in vault `{vault_id}`");
|
||||
|
||||
let mut transaction = state
|
||||
.database
|
||||
.create_write_transaction(&vault_id)
|
||||
|
|
@ -49,12 +53,26 @@ pub async fn delete_document(
|
|||
.await
|
||||
.map_err(server_error)?;
|
||||
|
||||
let latest_content = state
|
||||
let latest_version = state
|
||||
.database
|
||||
.get_latest_document(&vault_id, &document_id, Some(&mut transaction))
|
||||
.await
|
||||
.map_err(server_error)?
|
||||
.map_or_else(Vec::new, |version| version.content); // in case the document has never existed before deleting it
|
||||
.map_err(server_error)?;
|
||||
|
||||
if let Some(latest_version) = &latest_version
|
||||
&& latest_version.is_deleted
|
||||
{
|
||||
transaction
|
||||
.rollback()
|
||||
.await
|
||||
.context("Failed to roll back transaction")
|
||||
.map_err(server_error)?;
|
||||
|
||||
info!("Document `{document_id}` has already been deleted",);
|
||||
return Ok(Json(latest_version.clone().into()));
|
||||
}
|
||||
|
||||
let latest_content = latest_version.map_or_else(Vec::new, |version| version.content); // in case the document has never existed before deleting it
|
||||
|
||||
let new_version = StoredDocumentVersion {
|
||||
vault_update_id: last_update_id + 1,
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ use axum::{
|
|||
Json,
|
||||
extract::{Path, State},
|
||||
};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::{
|
||||
|
|
@ -32,6 +33,10 @@ pub async fn fetch_document_version(
|
|||
}): Path<FetchDocumentVersionPathParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Json<DocumentVersion>, SyncServerError> {
|
||||
debug!(
|
||||
"Fetching document version `{vault_update_id}` for document `{document_id}` in vault `{vault_id}`"
|
||||
);
|
||||
|
||||
let result = state
|
||||
.database
|
||||
.get_document_version(&vault_id, vault_update_id, None)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ use axum::{
|
|||
body::Bytes,
|
||||
extract::{Path, State},
|
||||
};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::{
|
||||
|
|
@ -32,6 +33,10 @@ pub async fn fetch_document_version_content(
|
|||
}): Path<FetchDocumentVersionContentPathParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Bytes, SyncServerError> {
|
||||
debug!(
|
||||
"Fetching document version `{vault_update_id}` for document `{document_id}` in vault `{vault_id}`"
|
||||
);
|
||||
|
||||
let result = state
|
||||
.database
|
||||
.get_document_version(&vault_id, vault_update_id, None)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ use axum::{
|
|||
Json,
|
||||
extract::{Path, State},
|
||||
};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::{
|
||||
|
|
@ -30,6 +31,8 @@ pub async fn fetch_latest_document_version(
|
|||
}): Path<FetchLatestDocumentVersionPathParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Json<DocumentVersion>, SyncServerError> {
|
||||
debug!("Fetching latest document version for document `{document_id}` in vault `{vault_id}`");
|
||||
|
||||
let latest_version = state
|
||||
.database
|
||||
.get_latest_document(&vault_id, &document_id, None)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use axum::{
|
|||
Json,
|
||||
extract::{Path, Query, State},
|
||||
};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
|
||||
use super::responses::FetchLatestDocumentsResponse;
|
||||
|
|
@ -31,6 +32,8 @@ pub async fn fetch_latest_documents(
|
|||
Query(QueryParams { since_update_id }): Query<QueryParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Json<FetchLatestDocumentsResponse>, SyncServerError> {
|
||||
debug!("Fetching latest documents in vault `{vault_id}` since update ID `{since_update_id:?}`");
|
||||
|
||||
let documents = if let Some(since_update_id) = since_update_id {
|
||||
state
|
||||
.database
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ use axum_extra::{
|
|||
TypedHeader,
|
||||
headers::{Authorization, authorization::Bearer},
|
||||
};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
|
||||
use super::{auth::auth, responses::PingResponse};
|
||||
|
|
@ -28,6 +29,8 @@ pub async fn ping(
|
|||
Path(PingPathParams { vault_id }): Path<PingPathParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Json<PingResponse>, SyncServerError> {
|
||||
debug!("Pinging vault `{vault_id}`");
|
||||
|
||||
let is_authenticated = maybe_auth_header
|
||||
.is_some_and(|auth_header| auth(&state, auth_header.token(), &vault_id).is_ok());
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use axum::{
|
|||
};
|
||||
use axum_extra::TypedHeader;
|
||||
use axum_typed_multipart::TypedMultipart;
|
||||
use log::info;
|
||||
use log::{debug, info};
|
||||
use reconcile_text::{BuiltinTokenizer, EditedText, reconcile};
|
||||
use serde::Deserialize;
|
||||
|
||||
|
|
@ -129,6 +129,8 @@ async fn update_document(
|
|||
relative_path: &str,
|
||||
content: Vec<u8>,
|
||||
) -> Result<Json<DocumentUpdateResponse>, SyncServerError> {
|
||||
debug!("Updating document `{document_id}` in vault `{vault_id}`");
|
||||
|
||||
let sanitized_relative_path = sanitize_path(relative_path);
|
||||
|
||||
let mut transaction = state
|
||||
|
|
@ -164,6 +166,7 @@ async fn update_document(
|
|||
.context("Failed to roll back transaction")
|
||||
.map_err(server_error)?;
|
||||
|
||||
info!("Document `{document_id}` has been deleted, ignoring update to it",);
|
||||
return Ok(Json(DocumentUpdateResponse::FastForwardUpdate(
|
||||
latest_version.into(),
|
||||
)));
|
||||
|
|
@ -173,7 +176,9 @@ async fn update_document(
|
|||
// version
|
||||
if content == latest_version.content && sanitized_relative_path == latest_version.relative_path
|
||||
{
|
||||
info!("Document content is the same as the latest version, skipping update");
|
||||
info!(
|
||||
"Document content is the same as the latest version for `{document_id}`, skipping update"
|
||||
);
|
||||
transaction
|
||||
.rollback()
|
||||
.await
|
||||
|
|
@ -193,6 +198,7 @@ async fn update_document(
|
|||
&& !is_binary(&content);
|
||||
|
||||
let merged_content = if are_all_participants_mergable {
|
||||
info!("Merging changes for document `{document_id}` in vault `{vault_id}`");
|
||||
reconcile(
|
||||
str::from_utf8(&parent_document.content)
|
||||
.expect("parent must be valid UTF-8 because it's not binary"),
|
||||
|
|
@ -217,14 +223,22 @@ async fn update_document(
|
|||
let new_relative_path = if parent_document.relative_path == latest_version.relative_path
|
||||
&& latest_version.relative_path != sanitized_relative_path
|
||||
{
|
||||
find_first_available_path(
|
||||
let new_path = find_first_available_path(
|
||||
&vault_id,
|
||||
&sanitized_relative_path,
|
||||
&state.database,
|
||||
&mut transaction,
|
||||
)
|
||||
.await
|
||||
.map_err(server_error)?
|
||||
.map_err(server_error)?;
|
||||
|
||||
if new_path != sanitized_relative_path {
|
||||
info!(
|
||||
"Document already exists at new location: `{sanitized_relative_path}` when trying to update it in vault `{vault_id}`, deconflicting by creating at `{new_path}`"
|
||||
);
|
||||
}
|
||||
|
||||
new_path
|
||||
} else {
|
||||
latest_version.relative_path.clone()
|
||||
};
|
||||
|
|
|
|||
|
|
@ -43,12 +43,12 @@ pub async fn websocket_handler(
|
|||
}
|
||||
|
||||
async fn websocket_wrapped(state: AppState, stream: WebSocket, vault_id: VaultId) {
|
||||
info!("WebSocket connection opened on vault '{vault_id}'");
|
||||
info!("WebSocket connection opened on vault `{vault_id}`");
|
||||
|
||||
let result = websocket(state, stream, vault_id.clone()).await;
|
||||
|
||||
if let Err(err) = result {
|
||||
debug!("WebSocket connection error on vault '{vault_id}': {err}");
|
||||
debug!("WebSocket connection error on vault `{vault_id}`: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -71,7 +71,7 @@ async fn websocket(
|
|||
)?;
|
||||
|
||||
info!(
|
||||
"WebSocket handshake successful for vault '{vault_id}' for '{}'",
|
||||
"WebSocket handshake successful for vault `{vault_id}` for `{}`",
|
||||
authed_handshake.handshake.device_id
|
||||
);
|
||||
|
||||
|
|
@ -184,7 +184,7 @@ async fn websocket(
|
|||
|
||||
if result.is_err() {
|
||||
info!(
|
||||
"WebSocket disconnected on vault '{vault_id}' for '{}'",
|
||||
"WebSocket disconnected on vault `{vault_id}` for `{}`",
|
||||
authed_handshake.handshake.device_id
|
||||
);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue