diff --git a/backend/sync_server/src/database.rs b/backend/sync_server/src/database.rs index 2222ce3..0241651 100644 --- a/backend/sync_server/src/database.rs +++ b/backend/sync_server/src/database.rs @@ -1,7 +1,9 @@ use std::str::FromStr; use anyhow::{Context, Result}; -use models::{DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId}; +use models::{ + DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId, VaultUpdateId, +}; use sqlx::{sqlite::SqliteConnectOptions, types::chrono::Utc}; pub mod models; use sqlx::{sqlite::SqlitePoolOptions, Pool, Sqlite}; @@ -54,6 +56,7 @@ impl Database { .context("Cannot create transaction") } + /// Return the latest state of all non-deleted documents in the vault pub async fn get_latest_documents( &self, vault: &VaultId, @@ -65,6 +68,7 @@ impl Database { select vault_id, vault_update_id, + document_id as "document_id: uuid::Uuid", relative_path, created_date as "created_date: chrono::DateTime", updated_date as "updated_date: chrono::DateTime", @@ -83,6 +87,42 @@ impl Database { .context("Cannot fetch latest documents") } + /// Return the latest state of all documents (including deleted) in the + /// vault which have changed since the given update id + pub async fn get_latest_documents_since( + &self, + vault: &VaultId, + vault_update_id: VaultUpdateId, + transaction: Option<&mut Transaction<'_>>, + ) -> Result> { + let query = sqlx::query_as!( + DocumentVersionWithoutContent, + r#" + select + vault_id, + vault_update_id, + document_id as "document_id: uuid::Uuid", + relative_path, + created_date as "created_date: chrono::DateTime", + updated_date as "updated_date: chrono::DateTime", + is_deleted + from latest_document_versions + where vault_id = ? and vault_update_id > ? + "#, + vault, + vault_update_id + ); + + if let Some(transaction) = transaction { + query.fetch_all(&mut **transaction).await + } else { + query.fetch_all(&self.connection_pool).await + } + .with_context(|| { + format!("Cannot fetch latest documents since vault_update_id {vault_update_id}") + }) + } + pub async fn get_max_update_id_in_vault( &self, vault: &VaultId, @@ -106,40 +146,7 @@ impl Database { .context("Cannot fetch max update id in vault") } - pub async fn get_latest_documents_since( - &self, - vault: &VaultId, - vault_update_id: VaultUpdateId, - transaction: Option<&mut Transaction<'_>>, - ) -> Result> { - let query = sqlx::query_as!( - DocumentVersionWithoutContent, - r#" - select - vault_id, - vault_update_id, - relative_path, - created_date as "created_date: chrono::DateTime", - updated_date as "updated_date: chrono::DateTime", - is_deleted - from latest_document_versions - where vault_id = ? and vault_update_id > ? - "#, - vault, - vault_update_id - ); - - if let Some(transaction) = transaction { - query.fetch_all(&mut **transaction).await - } else { - query.fetch_all(&self.connection_pool).await - } - .with_context(|| { - format!("Cannot fetch latest documents since vault_update_id {vault_update_id}") - }) - } - - pub async fn get_latest_document( + pub async fn get_latest_document_by_path( &self, vault: &VaultId, relative_path: &str, @@ -151,6 +158,7 @@ impl Database { select vault_id, vault_update_id, + document_id as "document_id: uuid::Uuid", relative_path, created_date as "created_date: chrono::DateTime", updated_date as "updated_date: chrono::DateTime", @@ -171,6 +179,39 @@ impl Database { .context("Cannot fetch latest document version") } + pub async fn get_latest_document( + &self, + vault: &VaultId, + document_id: &DocumentId, + transaction: Option<&mut Transaction<'_>>, + ) -> Result> { + let query = sqlx::query_as!( + StoredDocumentVersion, + r#" + select + vault_id, + vault_update_id, + document_id as "document_id: uuid::Uuid", + relative_path, + created_date as "created_date: chrono::DateTime", + updated_date as "updated_date: chrono::DateTime", + content, + is_deleted + from latest_document_versions + where vault_id = ? and document_id = ? + "#, + vault, + document_id + ); + + if let Some(transaction) = transaction { + query.fetch_optional(&mut **transaction).await + } else { + query.fetch_optional(&self.connection_pool).await + } + .context("Cannot fetch latest document version") + } + pub async fn get_document_version( &self, vault: &VaultId, @@ -183,6 +224,7 @@ impl Database { select vault_id, vault_update_id, + document_id as "document_id: uuid::Uuid", relative_path, created_date as "created_date: chrono::DateTime", updated_date as "updated_date: chrono::DateTime", @@ -212,16 +254,18 @@ impl Database { insert into documents ( vault_id, vault_update_id, + document_id, relative_path, created_date, updated_date, content, is_deleted ) - values (?, ?, ?, ?, ?, ?, ?) + values (?, ?, ?, ?, ?, ?, ?, ?) "#, version.vault_id, version.vault_update_id, + version.document_id, version.relative_path, version.created_date, version.updated_date, diff --git a/backend/sync_server/src/database/migrations/20241207143519_bootstrap.sql b/backend/sync_server/src/database/migrations/20241207143519_bootstrap.sql index f1c7ac8..6ab89a4 100644 --- a/backend/sync_server/src/database/migrations/20241207143519_bootstrap.sql +++ b/backend/sync_server/src/database/migrations/20241207143519_bootstrap.sql @@ -1,6 +1,7 @@ CREATE TABLE IF NOT EXISTS documents ( vault_id TEXT NOT NULL, vault_update_id INTEGER NOT NULL, + document_id TEXT UNIQUE NOT NULL, relative_path TEXT NOT NULL, created_date TIMESTAMP NOT NULL, updated_date TIMESTAMP NOT NULL, @@ -15,10 +16,13 @@ FROM documents d INNER JOIN ( SELECT vault_id, MAX(vault_update_id) AS max_version_id FROM documents - GROUP BY vault_id, relative_path + GROUP BY vault_id, document_id ) max_versions ON d.vault_id = max_versions.vault_id AND d.vault_update_id = max_versions.max_version_id; -CREATE INDEX IF NOT EXISTS idx_documents_vault_doc +CREATE INDEX IF NOT EXISTS idx_documents_vault_id_relative_path ON documents (vault_id, relative_path); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_documents_vault_id_document_id +ON documents (vault_id, document_id); diff --git a/backend/sync_server/src/database/models.rs b/backend/sync_server/src/database/models.rs index d17cf4b..ba9d05f 100644 --- a/backend/sync_server/src/database/models.rs +++ b/backend/sync_server/src/database/models.rs @@ -5,11 +5,13 @@ use sync_lib::bytes_to_base64; pub type VaultId = String; pub type VaultUpdateId = i64; +pub type DocumentId = uuid::Uuid; #[derive(Debug, Clone)] pub struct StoredDocumentVersion { pub vault_id: VaultId, pub vault_update_id: VaultUpdateId, + pub document_id: DocumentId, pub relative_path: String, pub created_date: DateTime, pub updated_date: DateTime, @@ -17,11 +19,18 @@ pub struct StoredDocumentVersion { pub is_deleted: bool, } +impl PartialEq for StoredDocumentVersion { + fn eq(&self, other: &Self) -> bool { + self.vault_id == other.vault_id && self.vault_update_id == other.vault_update_id + } +} + #[derive(Debug, Clone, Serialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct DocumentVersionWithoutContent { pub vault_id: VaultId, pub vault_update_id: VaultUpdateId, + pub document_id: DocumentId, pub relative_path: String, pub created_date: DateTime, pub updated_date: DateTime, @@ -33,6 +42,7 @@ impl From for DocumentVersionWithoutContent { Self { vault_id: value.vault_id, vault_update_id: value.vault_update_id, + document_id: value.document_id, relative_path: value.relative_path, created_date: value.created_date, updated_date: value.updated_date, @@ -46,6 +56,7 @@ impl From for DocumentVersionWithoutContent { pub struct DocumentVersion { pub vault_id: VaultId, pub vault_update_id: VaultUpdateId, + pub document_id: DocumentId, pub relative_path: String, pub created_date: DateTime, pub updated_date: DateTime, @@ -58,6 +69,7 @@ impl From for DocumentVersion { Self { vault_id: value.vault_id, vault_update_id: value.vault_update_id, + document_id: value.document_id, relative_path: value.relative_path, created_date: value.created_date, updated_date: value.updated_date, diff --git a/backend/sync_server/src/server.rs b/backend/sync_server/src/server.rs index c0b8073..501b4cb 100644 --- a/backend/sync_server/src/server.rs +++ b/backend/sync_server/src/server.rs @@ -1,6 +1,6 @@ use aide::{ axum::{ - routing::{delete, get, put}, + routing::{delete, get, post, put}, ApiRouter, }, openapi::{Info, OpenApi}, @@ -18,6 +18,7 @@ use tower_http::cors::CorsLayer; use crate::app_state::AppState; mod auth; +mod create_document; mod delete_document; mod fetch_latest_document_version; mod fetch_latest_documents; @@ -47,15 +48,19 @@ pub async fn create_server(app_state: AppState) -> Result<()> { get(fetch_latest_documents::fetch_latest_documents), ) .api_route( - "/vaults/:vault_id/documents/:relative_path", + "/vaults/:vault_id/documents", + post(create_document::create_document), + ) + .api_route( + "/vaults/:vault_id/documents/:document_id", get(fetch_latest_document_version::fetch_latest_document_version), ) .api_route( - "/vaults/:vault_id/documents/:relative_path", + "/vaults/:vault_id/documents/:document_id", put(update_document::update_document), ) .api_route( - "/vaults/:vault_id/documents/:relative_path", + "/vaults/:vault_id/documents/:document_id", delete(delete_document::delete_document), ) .api_route("/ws", get(handler)) diff --git a/backend/sync_server/src/server/create_document.rs b/backend/sync_server/src/server/create_document.rs new file mode 100644 index 0000000..796443f --- /dev/null +++ b/backend/sync_server/src/server/create_document.rs @@ -0,0 +1,109 @@ +use anyhow::Context; +use axum::{ + extract::{Path, State}, + Json, +}; +use axum_extra::{ + headers::{authorization::Bearer, Authorization}, + TypedHeader, +}; +use schemars::JsonSchema; +use serde::Deserialize; +use sync_lib::{base64_to_bytes, merge}; + +use super::{auth::auth, requests::CreateDocumentVersion}; +use crate::{ + app_state::AppState, + database::models::{DocumentVersion, StoredDocumentVersion, VaultId}, + errors::{client_error, server_error, SyncServerError}, +}; + +// This is required for aide to infer the path parameter types and names +#[derive(Deserialize, JsonSchema)] +pub struct PathParams { + vault_id: VaultId, +} + +/// Create a new document in case a document with the same doesn't exist +/// already. If a document with the same path exists, a new version is created +/// with their content merged. +#[axum::debug_handler] +pub async fn create_document( + TypedHeader(auth_header): TypedHeader>, + Path(PathParams { vault_id }): Path, + State(state): State, + Json(request): Json, +) -> Result, SyncServerError> { + auth(&state, auth_header.token())?; + + let mut transaction = state + .database + .create_transaction() + .await + .map_err(server_error)?; + + let last_update_id = state + .database + .get_max_update_id_in_vault(&vault_id, Some(&mut transaction)) + .await + .map_err(server_error)?; + + let maybe_existing_version = state + .database + .get_latest_document_by_path(&vault_id, &request.relative_path, Some(&mut transaction)) + .await + .map_err(server_error)? + .and_then(|doc| if doc.is_deleted { None } else { Some(doc) }); + + let new_version = if let Some(existing_version) = maybe_existing_version { + let content_bytes = base64_to_bytes(&request.content_base64) + .context("Failed to decode base64 content in request") + .map_err(client_error)?; + + let merged_content = merge( + &[], // the empty string is the first common parent of the two documents, + &existing_version.content, + &content_bytes, + ) + .context("Failed to decode bytes as UTF-8") + .map_err(client_error)?; + + StoredDocumentVersion { + vault_id, + vault_update_id: last_update_id + 1, + relative_path: request.relative_path, + document_id: existing_version.document_id, + content: merged_content, + created_date: request.created_date, + updated_date: chrono::Utc::now(), + is_deleted: false, + } + } else { + StoredDocumentVersion { + vault_id, + vault_update_id: last_update_id + 1, + document_id: uuid::Uuid::new_v4(), + relative_path: request.relative_path, + content: base64_to_bytes(&request.content_base64) + .context("Cannot convert base64 encoded content to bytes") + .map_err(client_error)?, + created_date: request.created_date, + updated_date: chrono::Utc::now(), + is_deleted: false, + } + }; + + state + .database + .insert_document_version(&new_version, Some(&mut transaction)) + .await + .map_err(server_error)?; + + transaction + .commit() + .await + .context("Failed to commit successful transaction") + .map_err(server_error)?; + + Ok(Json(new_version.into())) +} diff --git a/backend/sync_server/src/server/delete_document.rs b/backend/sync_server/src/server/delete_document.rs index 0db07c7..a966a54 100644 --- a/backend/sync_server/src/server/delete_document.rs +++ b/backend/sync_server/src/server/delete_document.rs @@ -13,7 +13,7 @@ use serde::Deserialize; use super::{auth::auth, requests::DeleteDocumentVersion}; use crate::{ app_state::AppState, - database::models::{StoredDocumentVersion, VaultId}, + database::models::{DocumentId, StoredDocumentVersion, VaultId}, errors::{server_error, SyncServerError}, }; @@ -21,7 +21,7 @@ use crate::{ #[derive(Deserialize, JsonSchema)] pub struct PathParams { vault_id: VaultId, - relative_path: String, + document_id: DocumentId, } #[axum::debug_handler] @@ -29,7 +29,7 @@ pub async fn delete_document( TypedHeader(auth_header): TypedHeader>, Path(PathParams { vault_id, - relative_path, + document_id, }): Path, State(state): State, Json(request): Json, @@ -51,7 +51,8 @@ pub async fn delete_document( let new_version = StoredDocumentVersion { vault_id, vault_update_id: last_update_id + 1, - relative_path, + document_id, + relative_path: request.relative_path, content: vec![], created_date: request.created_date, updated_date: chrono::Utc::now(), diff --git a/backend/sync_server/src/server/fetch_latest_document_version.rs b/backend/sync_server/src/server/fetch_latest_document_version.rs index 0e84ff6..3cf617e 100644 --- a/backend/sync_server/src/server/fetch_latest_document_version.rs +++ b/backend/sync_server/src/server/fetch_latest_document_version.rs @@ -13,7 +13,7 @@ use serde::Deserialize; use super::auth::auth; use crate::{ app_state::AppState, - database::models::{DocumentVersion, VaultId}, + database::models::{DocumentId, DocumentVersion, VaultId}, errors::{not_found_error, server_error, SyncServerError}, }; @@ -21,7 +21,7 @@ use crate::{ #[derive(Deserialize, JsonSchema)] pub struct PathParams { vault_id: VaultId, - relative_path: String, + document_id: DocumentId, } #[axum::debug_handler] @@ -29,7 +29,7 @@ pub async fn fetch_latest_document_version( TypedHeader(auth_header): TypedHeader>, Path(PathParams { vault_id, - relative_path, + document_id, }): Path, State(state): State, ) -> Result, SyncServerError> { @@ -37,14 +37,13 @@ pub async fn fetch_latest_document_version( let latest_version = state .database - .get_latest_document(&vault_id, &relative_path, None) + .get_latest_document(&vault_id, &document_id, None) .await .map_err(server_error)? .map(Ok) .unwrap_or_else(|| { Err(not_found_error(anyhow!( - "Latest document version of document `{}` not found", - relative_path + "Document with id `{document_id}` not found", ))) })?; diff --git a/backend/sync_server/src/server/requests.rs b/backend/sync_server/src/server/requests.rs index 7c0e1e4..9c52bd9 100644 --- a/backend/sync_server/src/server/requests.rs +++ b/backend/sync_server/src/server/requests.rs @@ -4,10 +4,19 @@ use serde::{self, Deserialize}; use crate::database::models::VaultUpdateId; +#[derive(Debug, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateDocumentVersion { + pub relative_path: String, + pub created_date: DateTime, + pub content_base64: String, +} + #[derive(Debug, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct UpdateDocumentVersion { - pub parent_version_id: Option, + pub parent_version_id: VaultUpdateId, + pub relative_path: String, pub created_date: DateTime, pub content_base64: String, } @@ -15,5 +24,6 @@ pub struct UpdateDocumentVersion { #[derive(Debug, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct DeleteDocumentVersion { + pub relative_path: String, pub created_date: DateTime, } diff --git a/backend/sync_server/src/server/update_document.rs b/backend/sync_server/src/server/update_document.rs index 1c06c75..b13173c 100644 --- a/backend/sync_server/src/server/update_document.rs +++ b/backend/sync_server/src/server/update_document.rs @@ -15,7 +15,7 @@ use sync_lib::{base64_to_bytes, merge}; use super::{auth::auth, requests::UpdateDocumentVersion}; use crate::{ app_state::AppState, - database::models::{DocumentVersion, StoredDocumentVersion, VaultId}, + database::models::{DocumentId, DocumentVersion, StoredDocumentVersion, VaultId}, errors::{client_error, not_found_error, server_error, SyncServerError}, }; @@ -23,7 +23,7 @@ use crate::{ #[derive(Deserialize, JsonSchema)] pub struct PathParams { vault_id: VaultId, - relative_path: String, + document_id: DocumentId, } #[axum::debug_handler] @@ -31,35 +31,26 @@ pub async fn update_document( TypedHeader(auth_header): TypedHeader>, Path(PathParams { vault_id, - relative_path, + document_id, }): Path, State(state): State, Json(request): Json, ) -> Result, SyncServerError> { auth(&state, auth_header.token())?; - let parent_content = if let Some(parent_version_id) = request.parent_version_id { - state - .database - .get_document_version(&vault_id, parent_version_id, None) - .await - .map_err(server_error)? - .map(Ok) - .unwrap_or_else(|| { - Err(not_found_error(anyhow!( - "Parent version with id `{}` not found", - parent_version_id - ))) - }) - .map(|version| version.content) - } else { - // the empty string is the first common parent of the two documents - Ok(Vec::default()) - }?; - - let content_bytes = base64_to_bytes(&request.content_base64) - .context("Failed to decode base64 content in request") - .map_err(client_error)?; + // No need for a transaction as document versions are immutable + let parent_document = state + .database + .get_document_version(&vault_id, request.parent_version_id, None) + .await + .map_err(server_error)? + .map(Ok) + .unwrap_or_else(|| { + Err(not_found_error(anyhow!( + "Parent version with id `{}` not found", + request.parent_version_id + ))) + })?; let mut transaction = state .database @@ -67,7 +58,7 @@ pub async fn update_document( .await .map_err(server_error)?; - let mut last_update_id = state + let last_update_id = state .database .get_max_update_id_in_vault(&vault_id, Some(&mut transaction)) .await @@ -75,57 +66,59 @@ pub async fn update_document( let latest_version = state .database - .get_latest_document(&vault_id, &relative_path, Some(&mut transaction)) + .get_latest_document(&vault_id, &document_id, Some(&mut transaction)) .await - .map_err(server_error)?; + .map_err(server_error)? + .map(Ok) + .unwrap_or_else(|| { + Err(not_found_error(anyhow!( + "Document with id `{document_id}` not found", + ))) + })?; - if let Some(latest) = latest_version.as_ref() { - if content_bytes == latest.content && relative_path == latest.relative_path { - info!("Document content is the same as the latest version, skipping update"); - transaction - .rollback() - .await - .context("Failed to rollback transaction") - .map_err(server_error)?; + let content_bytes = base64_to_bytes(&request.content_base64) + .context("Failed to decode base64 content in request") + .map_err(client_error)?; - return Ok(Json(latest.clone().into())); - } else if relative_path != latest.relative_path { - let delete_at_previous_path = StoredDocumentVersion { - vault_id: vault_id.clone(), - vault_update_id: last_update_id + 1, - relative_path: latest.relative_path.clone(), - content: vec![], - created_date: request.created_date, - updated_date: chrono::Utc::now(), - is_deleted: true, - }; + // Return the latest version if the content and path are the same as the latest + // version + if content_bytes == latest_version.content + && request.relative_path == latest_version.relative_path + { + info!("Document content is the same as the latest version, skipping update"); + transaction + .rollback() + .await + .context("Failed to rollback transaction") + .map_err(server_error)?; - last_update_id += 1; - - state - .database - .insert_document_version(&delete_at_previous_path, Some(&mut transaction)) - .await - .map_err(server_error)?; - } + return Ok(Json(latest_version.into())); } - let latest_version_content = latest_version - .map(|v| v.content) - .unwrap_or_else(Vec::default); + let merged_content = merge( + &parent_document.content, + &latest_version.content, + &content_bytes, + ) + .context("Failed to decode bytes as UTF-8") + .map_err(client_error)?; - let merged_content = merge(&parent_content, &latest_version_content, &content_bytes) - .context("Failed to decode bytes as UTF-8") - .map_err(client_error)?; + // We can only update the relative path if we're the first one to do so + let new_relative_path = if parent_document.relative_path == latest_version.relative_path { + request.relative_path.clone() + } else { + latest_version.relative_path.clone() + }; let new_version = StoredDocumentVersion { vault_id, + document_id, vault_update_id: last_update_id + 1, - relative_path, + relative_path: new_relative_path, content: merged_content, created_date: request.created_date, updated_date: chrono::Utc::now(), - is_deleted: false, + is_deleted: parent_document.is_deleted, }; state