From 44947dc3a52eb353290cc32dc7d3f37e88a555c1 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Sat, 28 Mar 2026 18:15:43 +0000 Subject: [PATCH] Add vault listing endpoint --- sync-server/src/app_state/database.rs | 175 ++++++++++++++++++- sync-server/src/app_state/database/models.rs | 18 ++ sync-server/src/server.rs | 4 +- sync-server/src/server/auth.rs | 10 +- sync-server/src/server/list_vaults.rs | 82 +++++++++ sync-server/src/server/responses.rs | 30 ++++ 6 files changed, 314 insertions(+), 5 deletions(-) create mode 100644 sync-server/src/server/list_vaults.rs diff --git a/sync-server/src/app_state/database.rs b/sync-server/src/app_state/database.rs index b0ef0ee7..eb450c56 100644 --- a/sync-server/src/app_state/database.rs +++ b/sync-server/src/app_state/database.rs @@ -6,7 +6,7 @@ use log::info; use models::{ DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId, }; -use sqlx::{ConnectOptions, sqlite::SqliteConnectOptions, types::chrono::Utc}; +use sqlx::{ConnectOptions, Connection, sqlite::SqliteConnectOptions, types::chrono::Utc}; pub mod models; @@ -171,6 +171,45 @@ fn rollback_before_acquire( } impl Database { + /// Lists all vault IDs that exist on disk (have a `.sqlite` file). + pub async fn list_vaults(&self) -> Result> { + let mut vaults = Vec::new(); + let mut entries = tokio::fs::read_dir(&self.config.databases_directory_path) + .await + .context("Failed to read databases directory")?; + while let Some(entry) = entries.next_entry().await? { + let name = entry.file_name().to_string_lossy().to_string(); + if let Some(vault) = name.strip_suffix(".sqlite") { + vaults.push(vault.to_owned()); + } + } + vaults.sort(); + Ok(vaults) + } + + pub async fn get_vault_stats( + &self, + vault: &VaultId, + ) -> Result { + let pool = self.get_connection_pool(vault).await?; + let row = sqlx::query!( + r#" + SELECT + (SELECT MIN(updated_date) FROM documents) + AS "created_at: chrono::DateTime", + (SELECT COUNT(DISTINCT document_id) FROM latest_document_versions + WHERE is_deleted = false) + AS "document_count!: u32" + "#, + ) + .fetch_one(&pool) + .await?; + Ok(models::VaultStats { + created_at: row.created_at, + document_count: row.document_count, + }) + } + pub async fn try_new( config: &DatabaseConfig, broadcasts: &Broadcasts, @@ -683,6 +722,140 @@ impl Database { Ok(()) } + /// Return all versions (without content) of a specific document, ordered by `vault_update_id` + pub async fn get_document_versions( + &self, + vault: &VaultId, + document_id: &DocumentId, + connection: Option<&mut SqliteConnection>, + ) -> Result> { + let document_id = document_id.as_hyphenated(); + let query = sqlx::query!( + r#" + select + vault_update_id, + document_id as "document_id: Hyphenated", + relative_path, + updated_date as "updated_date: chrono::DateTime", + is_deleted, + user_id, + device_id, + length(content) as "content_size: u64" + from documents + where document_id = ? + order by vault_update_id + "#, + document_id, + ); + + if let Some(conn) = connection { + query.fetch_all(&mut *conn).await + } else { + query + .fetch_all(&self.get_connection_pool(vault).await?) + .await + } + .with_context(|| format!("Cannot fetch document versions for document `{document_id}`")) + .map(|rows| { + rows.into_iter() + .map(|row| DocumentVersionWithoutContent { + vault_update_id: row.vault_update_id, + document_id: row.document_id.into(), + relative_path: row.relative_path, + updated_date: row.updated_date, + is_deleted: row.is_deleted, + user_id: row.user_id, + device_id: row.device_id, + content_size: row.content_size.unwrap_or(0), + }) + .collect() + }) + } + + /// Return all versions across all documents, paginated, ordered by `vault_update_id` DESC + pub async fn get_vault_history( + &self, + vault: &VaultId, + limit: i64, + before_update_id: Option, + connection: Option<&mut SqliteConnection>, + ) -> Result> { + let map_row = |row: models::VaultHistoryRow| DocumentVersionWithoutContent { + vault_update_id: row.vault_update_id, + document_id: row.document_id, + relative_path: row.relative_path, + updated_date: row.updated_date, + is_deleted: row.is_deleted, + user_id: row.user_id, + device_id: row.device_id, + content_size: row.content_size.unwrap_or(0), + }; + + if let Some(before) = before_update_id { + let query = sqlx::query_as!( + models::VaultHistoryRow, + r#" + select + vault_update_id, + document_id as "document_id: Hyphenated", + relative_path, + updated_date as "updated_date: chrono::DateTime", + is_deleted, + user_id, + device_id, + length(content) as "content_size: u64" + from documents + where vault_update_id < ? + order by vault_update_id desc + limit ? + "#, + before, + limit, + ); + + let rows = if let Some(conn) = connection { + query.fetch_all(&mut *conn).await + } else { + query + .fetch_all(&self.get_connection_pool(vault).await?) + .await + } + .context("Cannot fetch vault history")?; + + Ok(rows.into_iter().map(map_row).collect()) + } else { + let query = sqlx::query_as!( + models::VaultHistoryRow, + r#" + select + vault_update_id, + document_id as "document_id: Hyphenated", + relative_path, + updated_date as "updated_date: chrono::DateTime", + is_deleted, + user_id, + device_id, + length(content) as "content_size: u64" + from documents + order by vault_update_id desc + limit ? + "#, + limit, + ); + + let rows = if let Some(conn) = connection { + query.fetch_all(&mut *conn).await + } else { + query + .fetch_all(&self.get_connection_pool(vault).await?) + .await + } + .context("Cannot fetch vault history")?; + + Ok(rows.into_iter().map(map_row).collect()) + } + } + /// Cleanup idle connection pools that haven't been accessed in more than 5 minutes async fn cleanup_idle_pools(&self) { // Collect idle vaults and remove them from the map while holding diff --git a/sync-server/src/app_state/database/models.rs b/sync-server/src/app_state/database/models.rs index 59d08c82..7aea3358 100644 --- a/sync-server/src/app_state/database/models.rs +++ b/sync-server/src/app_state/database/models.rs @@ -77,6 +77,24 @@ pub struct DocumentVersion { pub device_id: DeviceId, } +/// Row struct for vault history queries (used by `sqlx::query_as!`) +#[derive(Debug)] +pub struct VaultHistoryRow { + pub vault_update_id: VaultUpdateId, + pub document_id: DocumentId, + pub relative_path: String, + pub updated_date: DateTime, + pub is_deleted: bool, + pub user_id: String, + pub device_id: String, + pub content_size: Option, +} + +pub struct VaultStats { + pub created_at: Option>, + pub document_count: u32, +} + impl From for DocumentVersion { fn from(value: StoredDocumentVersion) -> Self { Self { diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 95b0038b..48191893 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -55,6 +55,8 @@ pub async fn create_server(config: Config) -> Result<()> { let app = Router::new() .nest("/", get_authed_routes(app_state.clone())) .route("/", get(index::index)) + .route("/assets/*path", get(index::spa_assets)) + .route("/vaults", get(list_vaults::list_vaults)) .route("/vaults/:vault_id/ping", get(ping::ping)) .route("/vaults/:vault_id/ws", get(websocket::websocket_handler)) .fallback(index::spa_fallback); @@ -106,7 +108,7 @@ fn build_cors_layer(server_config: &ServerConfig) -> Result { let origins = &server_config.allowed_origins; let cors = if origins.len() == 1 && origins[0] == "*" { - info!("CORS: allowing all origins (wildcard)"); + info!("CORS: allowing all origins"); let header: HeaderValue = "*" .parse() .context("Failed to parse wildcard CORS origin")?; diff --git a/sync-server/src/server/auth.rs b/sync-server/src/server/auth.rs index 3b5474d4..7fa45abd 100644 --- a/sync-server/src/server/auth.rs +++ b/sync-server/src/server/auth.rs @@ -41,13 +41,17 @@ pub async fn auth_middleware( Ok(next.run(req).await) } -pub fn auth(state: &AppState, token: &str, vault_id: &VaultId) -> Result { - let user = state +pub fn authenticate(state: &AppState, token: &str) -> Result { + state .config .users .get_user(token) .cloned() - .ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Invalid token")))?; + .ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Invalid token"))) +} + +pub fn auth(state: &AppState, token: &str, vault_id: &VaultId) -> Result { + let user = authenticate(state, token)?; if match user.vault_access { VaultAccess::AllowAccessToAll => true, diff --git a/sync-server/src/server/list_vaults.rs b/sync-server/src/server/list_vaults.rs new file mode 100644 index 00000000..7ef23405 --- /dev/null +++ b/sync-server/src/server/list_vaults.rs @@ -0,0 +1,82 @@ +use axum::{ + Json, + extract::{Query, State}, +}; +use axum_extra::{ + TypedHeader, + headers::{Authorization, authorization::Bearer}, +}; +use log::debug; +use serde::Deserialize; + +use super::{ + auth::authenticate, + responses::{ListVaultsResponse, VaultInfo}, +}; +use crate::{ + app_state::AppState, + config::user_config::{AllowListedVaults, VaultAccess}, + errors::{SyncServerError, server_error, unauthenticated_error}, +}; + +const DEFAULT_LIMIT: usize = 50; +const MAX_LIMIT: usize = 200; + +#[derive(Deserialize)] +pub struct QueryParams { + limit: Option, + after: Option, +} + +#[axum::debug_handler] +pub async fn list_vaults( + auth_header: Option>>, + Query(QueryParams { limit, after }): Query, + State(state): State, +) -> Result, SyncServerError> { + let auth_header = auth_header + .ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Missing Authorization header")))?; + + let user = authenticate(&state, auth_header.token().trim())?; + + debug!("User `{}` listing accessible vaults", user.name); + + let existing_vaults = state.database.list_vaults().await.map_err(server_error)?; + + let mut accessible: Vec = match user.vault_access { + VaultAccess::AllowAccessToAll => existing_vaults, + VaultAccess::AllowList(AllowListedVaults { ref allowed }) => existing_vaults + .into_iter() + .filter(|v| allowed.contains(v)) + .collect(), + }; + + // Cursor-based pagination: skip vaults up to and including `after` + if let Some(ref cursor) = after { + accessible.retain(|v| v.as_str() > cursor.as_str()); + } + + let limit = limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT); + let has_more = accessible.len() > limit; + accessible.truncate(limit); + + let mut vaults = Vec::with_capacity(accessible.len()); + for name in accessible { + let stats = state + .database + .get_vault_stats(&name) + .await + .map_err(server_error)?; + vaults.push(VaultInfo { + name, + document_count: stats.document_count, + created_at: stats.created_at, + }); + } + + Ok(Json(ListVaultsResponse { + vaults, + has_more, + user_name: user.name, + })) +} diff --git a/sync-server/src/server/responses.rs b/sync-server/src/server/responses.rs index a8b3fcd7..f393747d 100644 --- a/sync-server/src/server/responses.rs +++ b/sync-server/src/server/responses.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use serde::{self, Serialize}; use ts_rs::TS; @@ -36,6 +37,35 @@ pub struct FetchLatestDocumentsResponse { pub last_update_id: VaultUpdateId, } +/// Response to a vault history request (paginated). +#[derive(TS, Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +#[ts(export)] +pub struct VaultHistoryResponse { + pub versions: Vec, + pub has_more: bool, +} + +/// Summary of a single vault returned by the list-vaults endpoint. +#[derive(TS, Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +#[ts(export)] +pub struct VaultInfo { + pub name: String, + pub document_count: u32, + pub created_at: Option>, +} + +/// Response to listing vaults accessible to the authenticated user. +#[derive(TS, Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +#[ts(export)] +pub struct ListVaultsResponse { + pub vaults: Vec, + pub has_more: bool, + pub user_name: String, +} + /// Response to an update document request. #[derive(TS, Debug, Clone, Serialize)] #[serde(tag = "type")]