Add vault listing endpoint
This commit is contained in:
parent
9ae1a5e09e
commit
44947dc3a5
6 changed files with 314 additions and 5 deletions
|
|
@ -6,7 +6,7 @@ use log::info;
|
|||
use models::{
|
||||
DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId,
|
||||
};
|
||||
use sqlx::{ConnectOptions, sqlite::SqliteConnectOptions, types::chrono::Utc};
|
||||
use sqlx::{ConnectOptions, Connection, sqlite::SqliteConnectOptions, types::chrono::Utc};
|
||||
|
||||
pub mod models;
|
||||
|
||||
|
|
@ -171,6 +171,45 @@ fn rollback_before_acquire(
|
|||
}
|
||||
|
||||
impl Database {
|
||||
/// Lists all vault IDs that exist on disk (have a `.sqlite` file).
|
||||
pub async fn list_vaults(&self) -> Result<Vec<VaultId>> {
|
||||
let mut vaults = Vec::new();
|
||||
let mut entries = tokio::fs::read_dir(&self.config.databases_directory_path)
|
||||
.await
|
||||
.context("Failed to read databases directory")?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
if let Some(vault) = name.strip_suffix(".sqlite") {
|
||||
vaults.push(vault.to_owned());
|
||||
}
|
||||
}
|
||||
vaults.sort();
|
||||
Ok(vaults)
|
||||
}
|
||||
|
||||
pub async fn get_vault_stats(
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
) -> Result<models::VaultStats> {
|
||||
let pool = self.get_connection_pool(vault).await?;
|
||||
let row = sqlx::query!(
|
||||
r#"
|
||||
SELECT
|
||||
(SELECT MIN(updated_date) FROM documents)
|
||||
AS "created_at: chrono::DateTime<Utc>",
|
||||
(SELECT COUNT(DISTINCT document_id) FROM latest_document_versions
|
||||
WHERE is_deleted = false)
|
||||
AS "document_count!: u32"
|
||||
"#,
|
||||
)
|
||||
.fetch_one(&pool)
|
||||
.await?;
|
||||
Ok(models::VaultStats {
|
||||
created_at: row.created_at,
|
||||
document_count: row.document_count,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn try_new(
|
||||
config: &DatabaseConfig,
|
||||
broadcasts: &Broadcasts,
|
||||
|
|
@ -683,6 +722,140 @@ impl Database {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Return all versions (without content) of a specific document, ordered by `vault_update_id`
|
||||
pub async fn get_document_versions(
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
document_id: &DocumentId,
|
||||
connection: Option<&mut SqliteConnection>,
|
||||
) -> Result<Vec<DocumentVersionWithoutContent>> {
|
||||
let document_id = document_id.as_hyphenated();
|
||||
let query = sqlx::query!(
|
||||
r#"
|
||||
select
|
||||
vault_update_id,
|
||||
document_id as "document_id: Hyphenated",
|
||||
relative_path,
|
||||
updated_date as "updated_date: chrono::DateTime<Utc>",
|
||||
is_deleted,
|
||||
user_id,
|
||||
device_id,
|
||||
length(content) as "content_size: u64"
|
||||
from documents
|
||||
where document_id = ?
|
||||
order by vault_update_id
|
||||
"#,
|
||||
document_id,
|
||||
);
|
||||
|
||||
if let Some(conn) = connection {
|
||||
query.fetch_all(&mut *conn).await
|
||||
} else {
|
||||
query
|
||||
.fetch_all(&self.get_connection_pool(vault).await?)
|
||||
.await
|
||||
}
|
||||
.with_context(|| format!("Cannot fetch document versions for document `{document_id}`"))
|
||||
.map(|rows| {
|
||||
rows.into_iter()
|
||||
.map(|row| DocumentVersionWithoutContent {
|
||||
vault_update_id: row.vault_update_id,
|
||||
document_id: row.document_id.into(),
|
||||
relative_path: row.relative_path,
|
||||
updated_date: row.updated_date,
|
||||
is_deleted: row.is_deleted,
|
||||
user_id: row.user_id,
|
||||
device_id: row.device_id,
|
||||
content_size: row.content_size.unwrap_or(0),
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
/// Return all versions across all documents, paginated, ordered by `vault_update_id` DESC
|
||||
pub async fn get_vault_history(
|
||||
&self,
|
||||
vault: &VaultId,
|
||||
limit: i64,
|
||||
before_update_id: Option<VaultUpdateId>,
|
||||
connection: Option<&mut SqliteConnection>,
|
||||
) -> Result<Vec<DocumentVersionWithoutContent>> {
|
||||
let map_row = |row: models::VaultHistoryRow| DocumentVersionWithoutContent {
|
||||
vault_update_id: row.vault_update_id,
|
||||
document_id: row.document_id,
|
||||
relative_path: row.relative_path,
|
||||
updated_date: row.updated_date,
|
||||
is_deleted: row.is_deleted,
|
||||
user_id: row.user_id,
|
||||
device_id: row.device_id,
|
||||
content_size: row.content_size.unwrap_or(0),
|
||||
};
|
||||
|
||||
if let Some(before) = before_update_id {
|
||||
let query = sqlx::query_as!(
|
||||
models::VaultHistoryRow,
|
||||
r#"
|
||||
select
|
||||
vault_update_id,
|
||||
document_id as "document_id: Hyphenated",
|
||||
relative_path,
|
||||
updated_date as "updated_date: chrono::DateTime<Utc>",
|
||||
is_deleted,
|
||||
user_id,
|
||||
device_id,
|
||||
length(content) as "content_size: u64"
|
||||
from documents
|
||||
where vault_update_id < ?
|
||||
order by vault_update_id desc
|
||||
limit ?
|
||||
"#,
|
||||
before,
|
||||
limit,
|
||||
);
|
||||
|
||||
let rows = if let Some(conn) = connection {
|
||||
query.fetch_all(&mut *conn).await
|
||||
} else {
|
||||
query
|
||||
.fetch_all(&self.get_connection_pool(vault).await?)
|
||||
.await
|
||||
}
|
||||
.context("Cannot fetch vault history")?;
|
||||
|
||||
Ok(rows.into_iter().map(map_row).collect())
|
||||
} else {
|
||||
let query = sqlx::query_as!(
|
||||
models::VaultHistoryRow,
|
||||
r#"
|
||||
select
|
||||
vault_update_id,
|
||||
document_id as "document_id: Hyphenated",
|
||||
relative_path,
|
||||
updated_date as "updated_date: chrono::DateTime<Utc>",
|
||||
is_deleted,
|
||||
user_id,
|
||||
device_id,
|
||||
length(content) as "content_size: u64"
|
||||
from documents
|
||||
order by vault_update_id desc
|
||||
limit ?
|
||||
"#,
|
||||
limit,
|
||||
);
|
||||
|
||||
let rows = if let Some(conn) = connection {
|
||||
query.fetch_all(&mut *conn).await
|
||||
} else {
|
||||
query
|
||||
.fetch_all(&self.get_connection_pool(vault).await?)
|
||||
.await
|
||||
}
|
||||
.context("Cannot fetch vault history")?;
|
||||
|
||||
Ok(rows.into_iter().map(map_row).collect())
|
||||
}
|
||||
}
|
||||
|
||||
/// Cleanup idle connection pools that haven't been accessed in more than 5 minutes
|
||||
async fn cleanup_idle_pools(&self) {
|
||||
// Collect idle vaults and remove them from the map while holding
|
||||
|
|
|
|||
|
|
@ -77,6 +77,24 @@ pub struct DocumentVersion {
|
|||
pub device_id: DeviceId,
|
||||
}
|
||||
|
||||
/// Row struct for vault history queries (used by `sqlx::query_as!`)
|
||||
#[derive(Debug)]
|
||||
pub struct VaultHistoryRow {
|
||||
pub vault_update_id: VaultUpdateId,
|
||||
pub document_id: DocumentId,
|
||||
pub relative_path: String,
|
||||
pub updated_date: DateTime<Utc>,
|
||||
pub is_deleted: bool,
|
||||
pub user_id: String,
|
||||
pub device_id: String,
|
||||
pub content_size: Option<u64>,
|
||||
}
|
||||
|
||||
pub struct VaultStats {
|
||||
pub created_at: Option<DateTime<Utc>>,
|
||||
pub document_count: u32,
|
||||
}
|
||||
|
||||
impl From<StoredDocumentVersion> for DocumentVersion {
|
||||
fn from(value: StoredDocumentVersion) -> Self {
|
||||
Self {
|
||||
|
|
|
|||
|
|
@ -55,6 +55,8 @@ pub async fn create_server(config: Config) -> Result<()> {
|
|||
let app = Router::new()
|
||||
.nest("/", get_authed_routes(app_state.clone()))
|
||||
.route("/", get(index::index))
|
||||
.route("/assets/*path", get(index::spa_assets))
|
||||
.route("/vaults", get(list_vaults::list_vaults))
|
||||
.route("/vaults/:vault_id/ping", get(ping::ping))
|
||||
.route("/vaults/:vault_id/ws", get(websocket::websocket_handler))
|
||||
.fallback(index::spa_fallback);
|
||||
|
|
@ -106,7 +108,7 @@ fn build_cors_layer(server_config: &ServerConfig) -> Result<CorsLayer> {
|
|||
let origins = &server_config.allowed_origins;
|
||||
|
||||
let cors = if origins.len() == 1 && origins[0] == "*" {
|
||||
info!("CORS: allowing all origins (wildcard)");
|
||||
info!("CORS: allowing all origins");
|
||||
let header: HeaderValue = "*"
|
||||
.parse()
|
||||
.context("Failed to parse wildcard CORS origin")?;
|
||||
|
|
|
|||
|
|
@ -41,13 +41,17 @@ pub async fn auth_middleware(
|
|||
Ok(next.run(req).await)
|
||||
}
|
||||
|
||||
pub fn auth(state: &AppState, token: &str, vault_id: &VaultId) -> Result<User, SyncServerError> {
|
||||
let user = state
|
||||
pub fn authenticate(state: &AppState, token: &str) -> Result<User, SyncServerError> {
|
||||
state
|
||||
.config
|
||||
.users
|
||||
.get_user(token)
|
||||
.cloned()
|
||||
.ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Invalid token")))?;
|
||||
.ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Invalid token")))
|
||||
}
|
||||
|
||||
pub fn auth(state: &AppState, token: &str, vault_id: &VaultId) -> Result<User, SyncServerError> {
|
||||
let user = authenticate(state, token)?;
|
||||
|
||||
if match user.vault_access {
|
||||
VaultAccess::AllowAccessToAll => true,
|
||||
|
|
|
|||
82
sync-server/src/server/list_vaults.rs
Normal file
82
sync-server/src/server/list_vaults.rs
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
use axum::{
|
||||
Json,
|
||||
extract::{Query, State},
|
||||
};
|
||||
use axum_extra::{
|
||||
TypedHeader,
|
||||
headers::{Authorization, authorization::Bearer},
|
||||
};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
|
||||
use super::{
|
||||
auth::authenticate,
|
||||
responses::{ListVaultsResponse, VaultInfo},
|
||||
};
|
||||
use crate::{
|
||||
app_state::AppState,
|
||||
config::user_config::{AllowListedVaults, VaultAccess},
|
||||
errors::{SyncServerError, server_error, unauthenticated_error},
|
||||
};
|
||||
|
||||
const DEFAULT_LIMIT: usize = 50;
|
||||
const MAX_LIMIT: usize = 200;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct QueryParams {
|
||||
limit: Option<usize>,
|
||||
after: Option<String>,
|
||||
}
|
||||
|
||||
#[axum::debug_handler]
|
||||
pub async fn list_vaults(
|
||||
auth_header: Option<TypedHeader<Authorization<Bearer>>>,
|
||||
Query(QueryParams { limit, after }): Query<QueryParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Json<ListVaultsResponse>, SyncServerError> {
|
||||
let auth_header = auth_header
|
||||
.ok_or_else(|| unauthenticated_error(anyhow::anyhow!("Missing Authorization header")))?;
|
||||
|
||||
let user = authenticate(&state, auth_header.token().trim())?;
|
||||
|
||||
debug!("User `{}` listing accessible vaults", user.name);
|
||||
|
||||
let existing_vaults = state.database.list_vaults().await.map_err(server_error)?;
|
||||
|
||||
let mut accessible: Vec<String> = match user.vault_access {
|
||||
VaultAccess::AllowAccessToAll => existing_vaults,
|
||||
VaultAccess::AllowList(AllowListedVaults { ref allowed }) => existing_vaults
|
||||
.into_iter()
|
||||
.filter(|v| allowed.contains(v))
|
||||
.collect(),
|
||||
};
|
||||
|
||||
// Cursor-based pagination: skip vaults up to and including `after`
|
||||
if let Some(ref cursor) = after {
|
||||
accessible.retain(|v| v.as_str() > cursor.as_str());
|
||||
}
|
||||
|
||||
let limit = limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT);
|
||||
let has_more = accessible.len() > limit;
|
||||
accessible.truncate(limit);
|
||||
|
||||
let mut vaults = Vec::with_capacity(accessible.len());
|
||||
for name in accessible {
|
||||
let stats = state
|
||||
.database
|
||||
.get_vault_stats(&name)
|
||||
.await
|
||||
.map_err(server_error)?;
|
||||
vaults.push(VaultInfo {
|
||||
name,
|
||||
document_count: stats.document_count,
|
||||
created_at: stats.created_at,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(Json(ListVaultsResponse {
|
||||
vaults,
|
||||
has_more,
|
||||
user_name: user.name,
|
||||
}))
|
||||
}
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
use chrono::{DateTime, Utc};
|
||||
use serde::{self, Serialize};
|
||||
use ts_rs::TS;
|
||||
|
||||
|
|
@ -36,6 +37,35 @@ pub struct FetchLatestDocumentsResponse {
|
|||
pub last_update_id: VaultUpdateId,
|
||||
}
|
||||
|
||||
/// Response to a vault history request (paginated).
|
||||
#[derive(TS, Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct VaultHistoryResponse {
|
||||
pub versions: Vec<DocumentVersionWithoutContent>,
|
||||
pub has_more: bool,
|
||||
}
|
||||
|
||||
/// Summary of a single vault returned by the list-vaults endpoint.
|
||||
#[derive(TS, Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct VaultInfo {
|
||||
pub name: String,
|
||||
pub document_count: u32,
|
||||
pub created_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
/// Response to listing vaults accessible to the authenticated user.
|
||||
#[derive(TS, Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct ListVaultsResponse {
|
||||
pub vaults: Vec<VaultInfo>,
|
||||
pub has_more: bool,
|
||||
pub user_name: String,
|
||||
}
|
||||
|
||||
/// Response to an update document request.
|
||||
#[derive(TS, Debug, Clone, Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue