fmt
This commit is contained in:
parent
d715d94b6d
commit
a7b588da97
14 changed files with 48 additions and 48 deletions
|
|
@ -1,3 +1,3 @@
|
||||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||||
|
|
||||||
export type WebSocketVaultPathChange = { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, userId: string, deviceId: string, };
|
export type WebSocketVaultPathChange = { vaultUpdateId: number, documentId: string, relativePath: string, };
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,3 @@
|
||||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||||
|
|
||||||
export interface WebSocketVaultPathChange { vaultUpdateId: number, documentId: string, relativePath: string, updatedDate: string, userId: string, deviceId: string, }
|
export interface WebSocketVaultPathChange { vaultUpdateId: number, documentId: string, relativePath: string, }
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ use sqlx::{ConnectOptions, Connection, sqlite::SqliteConnectOptions, types::chro
|
||||||
|
|
||||||
pub mod models;
|
pub mod models;
|
||||||
|
|
||||||
/// Sentinel error indicating the SQLite database is busy (SQLITE_BUSY).
|
/// Sentinel error indicating the `SQLite` database is busy (`SQLITE_BUSY`).
|
||||||
/// Handlers can downcast to this to return 429 instead of 500.
|
/// Handlers can downcast to this to return 429 instead of 500.
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
#[error("Database is busy")]
|
#[error("Database is busy")]
|
||||||
|
|
@ -76,11 +76,11 @@ pub struct Database {
|
||||||
config: DatabaseConfig,
|
config: DatabaseConfig,
|
||||||
broadcasts: Broadcasts,
|
broadcasts: Broadcasts,
|
||||||
connection_pools: Arc<Mutex<HashMap<VaultId, Arc<VaultPool>>>>,
|
connection_pools: Arc<Mutex<HashMap<VaultId, Arc<VaultPool>>>>,
|
||||||
/// Per-vault write serialization. SQLite allows only one writer at a
|
/// Per-vault write serialization. `SQLite` allows only one writer at a
|
||||||
/// time; `BEGIN IMMEDIATE` on a second connection blocks until the first
|
/// time; `BEGIN IMMEDIATE` on a second connection blocks until the first
|
||||||
/// commits (up to `busy_timeout`). Under concurrent load the blocked
|
/// commits (up to `busy_timeout`). Under concurrent load the blocked
|
||||||
/// connections consume the pool, starving even read-only requests.
|
/// connections consume the pool, starving even read-only requests.
|
||||||
/// This mutex moves the wait from the SQLite layer (where it holds a
|
/// This mutex moves the wait from the `SQLite` layer (where it holds a
|
||||||
/// pool connection) to the Tokio layer (where it holds nothing).
|
/// pool connection) to the Tokio layer (where it holds nothing).
|
||||||
write_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>,
|
write_locks: Arc<Mutex<HashMap<VaultId, Arc<tokio::sync::Mutex<()>>>>>,
|
||||||
/// Monotonic epoch for lock-free `last_accessed_ms` timestamps
|
/// Monotonic epoch for lock-free `last_accessed_ms` timestamps
|
||||||
|
|
@ -768,9 +768,6 @@ impl Database {
|
||||||
vault_update_id: version.vault_update_id,
|
vault_update_id: version.vault_update_id,
|
||||||
document_id: version.document_id,
|
document_id: version.document_id,
|
||||||
relative_path: version.relative_path.clone(),
|
relative_path: version.relative_path.clone(),
|
||||||
updated_date: version.updated_date,
|
|
||||||
user_id: version.user_id.clone(),
|
|
||||||
device_id: version.device_id.clone(),
|
|
||||||
},
|
},
|
||||||
)),
|
)),
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -82,11 +82,7 @@ impl Broadcasts {
|
||||||
/// Synchronous: safe to invoke from a handler between `commit()` and
|
/// Synchronous: safe to invoke from a handler between `commit()` and
|
||||||
/// function return without worrying about task cancellation dropping
|
/// function return without worrying about task cancellation dropping
|
||||||
/// the broadcast mid-flight. Failures are logged, never propagated.
|
/// the broadcast mid-flight. Failures are logged, never propagated.
|
||||||
pub fn send_document_update(
|
pub fn send_document_update(&self, vault: VaultId, document: WebSocketServerMessageWithOrigin) {
|
||||||
&self,
|
|
||||||
vault: VaultId,
|
|
||||||
document: WebSocketServerMessageWithOrigin,
|
|
||||||
) {
|
|
||||||
let mut tx_map = self
|
let mut tx_map = self
|
||||||
.tx
|
.tx
|
||||||
.lock()
|
.lock()
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
|
||||||
use ts_rs::TS;
|
use ts_rs::TS;
|
||||||
|
|
||||||
use crate::app_state::database::models::{
|
use crate::app_state::database::models::{
|
||||||
DeviceId, DocumentId, DocumentVersionWithoutContent, UserId, VaultUpdateId,
|
DeviceId, DocumentId, DocumentVersionWithoutContent, VaultUpdateId,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(TS, Deserialize, Clone, Debug)]
|
#[derive(TS, Deserialize, Clone, Debug)]
|
||||||
|
|
|
||||||
|
|
@ -33,9 +33,9 @@ pub fn get_authenticated_handshake(
|
||||||
let user = auth(state, handshake.token.trim(), vault_id)?;
|
let user = auth(state, handshake.token.trim(), vault_id)?;
|
||||||
Ok(AuthenticatedWebSocketHandshake { handshake, user })
|
Ok(AuthenticatedWebSocketHandshake { handshake, user })
|
||||||
}
|
}
|
||||||
WebSocketClientMessage::CursorPositions(_) => Err(
|
WebSocketClientMessage::CursorPositions(_) => Err(unauthenticated_error(
|
||||||
unauthenticated_error(anyhow::anyhow!("Expected a handshake message")),
|
anyhow::anyhow!("Expected a handshake message"),
|
||||||
),
|
)),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Err(unauthenticated_error(anyhow::anyhow!(
|
Err(unauthenticated_error(anyhow::anyhow!(
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ fn default_cursor_timeout() -> Duration {
|
||||||
impl DatabaseConfig {
|
impl DatabaseConfig {
|
||||||
pub fn validate(&self) -> Result<()> {
|
pub fn validate(&self) -> Result<()> {
|
||||||
ensure!(
|
ensure!(
|
||||||
self.databases_directory_path.as_os_str().len() > 0,
|
!self.databases_directory_path.as_os_str().is_empty(),
|
||||||
"databases_directory_path must not be empty"
|
"databases_directory_path must not be empty"
|
||||||
);
|
);
|
||||||
ensure!(
|
ensure!(
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,9 @@ use log::debug;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
consts::{DEFAULT_LOG_DIRECTORY, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_INTERVAL, DURATION_ZERO},
|
consts::{
|
||||||
|
DEFAULT_LOG_DIRECTORY, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_INTERVAL, DURATION_ZERO,
|
||||||
|
},
|
||||||
utils::log_level::LogLevel,
|
utils::log_level::LogLevel,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -27,7 +29,10 @@ impl LoggingConfig {
|
||||||
!self.log_directory.is_empty(),
|
!self.log_directory.is_empty(),
|
||||||
"log_directory must not be an empty string"
|
"log_directory must not be an empty string"
|
||||||
);
|
);
|
||||||
ensure!(self.log_rotation > DURATION_ZERO, "log_rotation must be greater than 0");
|
ensure!(
|
||||||
|
self.log_rotation > DURATION_ZERO,
|
||||||
|
"log_rotation must be greater than 0"
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -98,9 +98,7 @@ impl IntoResponse for SyncServerError {
|
||||||
Self::NotFound(_) => (StatusCode::NOT_FOUND, body).into_response(),
|
Self::NotFound(_) => (StatusCode::NOT_FOUND, body).into_response(),
|
||||||
Self::Unauthenticated(_) => (StatusCode::UNAUTHORIZED, body).into_response(),
|
Self::Unauthenticated(_) => (StatusCode::UNAUTHORIZED, body).into_response(),
|
||||||
Self::PermissionDeniedError(_) => (StatusCode::FORBIDDEN, body).into_response(),
|
Self::PermissionDeniedError(_) => (StatusCode::FORBIDDEN, body).into_response(),
|
||||||
Self::TooManyRequests(_) => {
|
Self::TooManyRequests(_) => (StatusCode::TOO_MANY_REQUESTS, body).into_response(),
|
||||||
(StatusCode::TOO_MANY_REQUESTS, body).into_response()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -171,7 +169,10 @@ pub fn too_many_requests_error(error: anyhow::Error) -> SyncServerError {
|
||||||
/// Maps a `create_write_transaction` error to 429 if the database is busy,
|
/// Maps a `create_write_transaction` error to 429 if the database is busy,
|
||||||
/// or 500 for all other failures.
|
/// or 500 for all other failures.
|
||||||
pub fn write_transaction_error(error: anyhow::Error) -> SyncServerError {
|
pub fn write_transaction_error(error: anyhow::Error) -> SyncServerError {
|
||||||
if error.downcast_ref::<crate::app_state::database::WriteBusyError>().is_some() {
|
if error
|
||||||
|
.downcast_ref::<crate::app_state::database::WriteBusyError>()
|
||||||
|
.is_some()
|
||||||
|
{
|
||||||
too_many_requests_error(error)
|
too_many_requests_error(error)
|
||||||
} else {
|
} else {
|
||||||
server_error(error)
|
server_error(error)
|
||||||
|
|
|
||||||
|
|
@ -40,4 +40,3 @@ pub struct UpdateTextDocumentVersion {
|
||||||
#[ts(type = "Array<number | string>")]
|
#[ts(type = "Array<number | string>")]
|
||||||
pub content: Vec<NumberOrText>,
|
pub content: Vec<NumberOrText>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,9 @@ use crate::{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
config::user_config::User,
|
config::user_config::User,
|
||||||
errors::{SyncServerError, client_error, not_found_error, server_error, write_transaction_error},
|
errors::{
|
||||||
|
SyncServerError, client_error, not_found_error, server_error, write_transaction_error,
|
||||||
|
},
|
||||||
utils::{find_first_available_path::find_first_available_path, normalize::normalize},
|
utils::{find_first_available_path::find_first_available_path, normalize::normalize},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,15 +1,3 @@
|
||||||
use anyhow::Context;
|
|
||||||
use axum::{
|
|
||||||
extract::{
|
|
||||||
Path, State,
|
|
||||||
ws::{Message, WebSocket, WebSocketUpgrade},
|
|
||||||
},
|
|
||||||
response::Response,
|
|
||||||
};
|
|
||||||
use futures::sink::SinkExt;
|
|
||||||
use futures::stream::StreamExt;
|
|
||||||
use log::{debug, info, warn};
|
|
||||||
use serde::Deserialize;
|
|
||||||
use crate::{
|
use crate::{
|
||||||
app_state::{
|
app_state::{
|
||||||
AppState,
|
AppState,
|
||||||
|
|
@ -25,12 +13,23 @@ use crate::{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
consts::{
|
consts::{
|
||||||
HANDSHAKE_TIMEOUT, MAX_CURSORS_PER_DOCUMENT, MAX_CURSOR_DOCUMENTS,
|
HANDSHAKE_TIMEOUT, MAX_CURSOR_DOCUMENTS, MAX_CURSORS_PER_DOCUMENT, MAX_RELATIVE_PATH_LEN,
|
||||||
MAX_RELATIVE_PATH_LEN,
|
|
||||||
},
|
},
|
||||||
errors::{SyncServerError, client_error, server_error},
|
errors::{SyncServerError, client_error, server_error},
|
||||||
utils::normalize::normalize,
|
utils::normalize::normalize,
|
||||||
};
|
};
|
||||||
|
use anyhow::Context;
|
||||||
|
use axum::{
|
||||||
|
extract::{
|
||||||
|
Path, State,
|
||||||
|
ws::{Message, WebSocket, WebSocketUpgrade},
|
||||||
|
},
|
||||||
|
response::Response,
|
||||||
|
};
|
||||||
|
use futures::sink::SinkExt;
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
use log::{debug, info, warn};
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
/// Tracks a pending (not yet authenticated) WebSocket connection.
|
/// Tracks a pending (not yet authenticated) WebSocket connection.
|
||||||
/// Decrements the counter when dropped, ensuring cleanup even if
|
/// Decrements the counter when dropped, ensuring cleanup even if
|
||||||
|
|
@ -39,8 +38,7 @@ struct PendingWsGuard(std::sync::Arc<std::sync::atomic::AtomicUsize>);
|
||||||
|
|
||||||
impl Drop for PendingWsGuard {
|
impl Drop for PendingWsGuard {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.0
|
self.0.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -112,9 +110,7 @@ async fn websocket(
|
||||||
drop(pending_guard);
|
drop(pending_guard);
|
||||||
|
|
||||||
let max_clients = state.config.server.max_clients_per_vault;
|
let max_clients = state.config.server.max_clients_per_vault;
|
||||||
let mut broadcast_receiver = match state
|
let mut broadcast_receiver = match state.broadcasts.get_receiver(vault_id.clone(), max_clients)
|
||||||
.broadcasts
|
|
||||||
.get_receiver(vault_id.clone(), max_clients)
|
|
||||||
{
|
{
|
||||||
Ok(receiver) => receiver,
|
Ok(receiver) => receiver,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
|
@ -229,7 +225,9 @@ async fn websocket(
|
||||||
&& doc.relative_path.len() <= MAX_RELATIVE_PATH_LEN
|
&& doc.relative_path.len() <= MAX_RELATIVE_PATH_LEN
|
||||||
});
|
});
|
||||||
if !valid {
|
if !valid {
|
||||||
warn!("Cursor update rejected: a document exceeds cursor or path length limits");
|
warn!(
|
||||||
|
"Cursor update rejected: a document exceeds cursor or path length limits"
|
||||||
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ use anyhow::Result;
|
||||||
use log::{debug, info};
|
use log::{debug, info};
|
||||||
use sqlx::sqlite::SqliteConnection;
|
use sqlx::sqlite::SqliteConnection;
|
||||||
|
|
||||||
|
|
||||||
pub async fn find_first_available_path(
|
pub async fn find_first_available_path(
|
||||||
vault_id: &VaultId,
|
vault_id: &VaultId,
|
||||||
sanitized_relative_path: &str,
|
sanitized_relative_path: &str,
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,10 @@ pub fn sanitize_path(path: &str) -> Result<String> {
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join("/");
|
.join("/");
|
||||||
|
|
||||||
ensure!(!result.is_empty(), "Relative path is empty after sanitization");
|
ensure!(
|
||||||
|
!result.is_empty(),
|
||||||
|
"Relative path is empty after sanitization"
|
||||||
|
);
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue