From 72be6ba18b6e12e89df5c5bb1d7897474bd162c0 Mon Sep 17 00:00:00 2001 From: Andras Schmelczer Date: Tue, 7 Jan 2025 22:29:13 +0000 Subject: [PATCH] Implement multipart upload endpoints --- backend/sync_server/src/server.rs | 32 +++--- .../sync_server/src/server/create_document.rs | 80 +++++++++++---- backend/sync_server/src/server/requests.rs | 21 ++++ .../sync_server/src/server/update_document.rs | 98 ++++++++++++++----- 4 files changed, 176 insertions(+), 55 deletions(-) diff --git a/backend/sync_server/src/server.rs b/backend/sync_server/src/server.rs index ad7e603f..da809c8e 100644 --- a/backend/sync_server/src/server.rs +++ b/backend/sync_server/src/server.rs @@ -2,34 +2,35 @@ use std::sync::Arc; use aide::{ axum::{ - routing::{delete, get, post, put}, ApiRouter, + routing::{delete, get, post, put}, }, openapi::{Info, OpenApi}, scalar::Scalar, transform::TransformOpenApi, }; -use anyhow::{anyhow, Context as _, Result}; +use anyhow::{Context as _, Result, anyhow}; use app_state::AppState; use axum::{ + Extension, Json, extract::{DefaultBodyLimit, Request}, http::{self, HeaderValue, Method}, response::IntoResponse, - Extension, Json, }; use log::{error, info}; use tokio::signal; use tower_http::{ + LatencyUnit, cors::CorsLayer, + limit::RequestBodyLimitLayer, trace::{ DefaultOnBodyChunk, DefaultOnEos, DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer, }, - LatencyUnit, }; -use tracing::{info_span, Level}; +use tracing::{Level, info_span}; -use crate::errors::{not_found_error, SerializedError}; +use crate::errors::{SerializedError, not_found_error}; mod app_state; mod auth; mod create_document; @@ -42,8 +43,8 @@ mod responses; mod update_document; pub async fn create_server() -> Result<()> { - aide::gen::on_error(|err| error!("{err}")); - aide::gen::extract_schemas(true); + aide::r#gen::on_error(|err| error!("{err}")); + aide::r#gen::extract_schemas(true); let app_state = AppState::try_new() .await @@ -75,7 +76,11 @@ pub async fn create_server() -> Result<()> { ) .api_route( "/vaults/:vault_id/documents", - post(create_document::create_document), + post(create_document::create_document_multipart), + ) + .api_route( + "/vaults/:vault_id/documents/json", + post(create_document::create_document_json), ) .api_route( "/vaults/:vault_id/documents/:document_id", @@ -83,7 +88,11 @@ pub async fn create_server() -> Result<()> { ) .api_route( "/vaults/:vault_id/documents/:document_id", - put(update_document::update_document), + put(update_document::update_document_multipart), + ) + .api_route( + "/vaults/:vault_id/documents/:document_id/json", + put(update_document::update_document_json), ) .api_route( "/vaults/:vault_id/documents/:document_id", @@ -110,7 +119,8 @@ pub async fn create_server() -> Result<()> { .on_eos(DefaultOnEos::new()) .on_failure(DefaultOnFailure::new().level(Level::ERROR)), ) - .layer(DefaultBodyLimit::max( + .layer(DefaultBodyLimit::disable()) + .layer(RequestBodyLimitLayer::new( app_state.config.server.max_body_size_mb * 1024 * 1024, )) .layer( diff --git a/backend/sync_server/src/server/create_document.rs b/backend/sync_server/src/server/create_document.rs index ac186d0c..9dda4bcd 100644 --- a/backend/sync_server/src/server/create_document.rs +++ b/backend/sync_server/src/server/create_document.rs @@ -1,24 +1,26 @@ +use aide_axum_typed_multipart::TypedMultipart; use anyhow::Context as _; -use axum::{ - extract::{Path, State}, - Json, -}; +use axum::extract::{Path, State}; use axum_extra::{ - headers::{authorization::Bearer, Authorization}, TypedHeader, + headers::{Authorization, authorization::Bearer}, }; +use axum_jsonschema::Json; +use chrono::{DateTime, Utc}; use log::info; use schemars::JsonSchema; use serde::Deserialize; use sync_lib::{base64_to_bytes, merge}; use super::{ - app_state::AppState, auth::auth, requests::CreateDocumentVersion, + app_state::AppState, + auth::auth, + requests::{CreateDocumentVersion, CreateDocumentVersionMultipart}, responses::DocumentUpdateResponse, }; use crate::{ database::models::{StoredDocumentVersion, VaultId}, - errors::{client_error, server_error, SyncServerError}, + errors::{SyncServerError, client_error, server_error}, utils::sanitize_path, }; @@ -32,11 +34,57 @@ pub struct PathParams { /// already. If a document with the same path exists, a new version is created /// with their content merged. #[axum::debug_handler] -pub async fn create_document( +pub async fn create_document_multipart( + TypedHeader(auth_header): TypedHeader>, + Path(PathParams { vault_id }): Path, + State(state): State, + TypedMultipart(axum_typed_multipart::TypedMultipart(request)): TypedMultipart< + CreateDocumentVersionMultipart, + >, +) -> Result, SyncServerError> { + internal_create_document( + auth_header, + state, + vault_id, + request.relative_path, + request.created_date, + request.content.contents.to_vec(), + ) + .await +} + +/// Create a new document in case a document with the same doesn't exist +/// already. If a document with the same path exists, a new version is created +/// with their content merged. +#[axum::debug_handler] +pub async fn create_document_json( TypedHeader(auth_header): TypedHeader>, Path(PathParams { vault_id }): Path, State(state): State, Json(request): Json, +) -> Result, SyncServerError> { + let content_bytes = base64_to_bytes(&request.content_base64) + .context("Failed to decode base64 content in request") + .map_err(client_error)?; + + internal_create_document( + auth_header, + state, + vault_id, + request.relative_path, + request.created_date, + content_bytes, + ) + .await +} + +async fn internal_create_document( + auth_header: Authorization, + state: AppState, + vault_id: VaultId, + relative_path: String, + created_date: DateTime, + content: Vec, ) -> Result, SyncServerError> { auth(&state, auth_header.token())?; @@ -52,7 +100,7 @@ pub async fn create_document( .await .map_err(server_error)?; - let sanitized_relative_path = sanitize_path(&request.relative_path); + let sanitized_relative_path = sanitize_path(&relative_path); let maybe_existing_version = state .database @@ -61,12 +109,8 @@ pub async fn create_document( .map_err(server_error)? .and_then(|doc| if doc.is_deleted { None } else { Some(doc) }); - let content_bytes = base64_to_bytes(&request.content_base64) - .context("Failed to decode base64 content in request") - .map_err(client_error)?; - let response = if let Some(existing_version) = maybe_existing_version { - if content_bytes == existing_version.content { + if content == existing_version.content { info!( "Content of the new version is the same as the existing version. Not creating a \ new version." @@ -86,7 +130,7 @@ pub async fn create_document( let merged_content = merge( &[], // the empty string is the first common parent of the two documents, &existing_version.content, - &content_bytes, + &content, ); let new_version = StoredDocumentVersion { @@ -95,7 +139,7 @@ pub async fn create_document( relative_path: sanitized_relative_path, document_id: existing_version.document_id, content: merged_content, - created_date: request.created_date, + created_date, updated_date: chrono::Utc::now(), is_deleted: false, }; @@ -113,8 +157,8 @@ pub async fn create_document( vault_update_id: last_update_id + 1, document_id: uuid::Uuid::new_v4(), relative_path: sanitized_relative_path, - content: content_bytes, - created_date: request.created_date, + content, + created_date, updated_date: chrono::Utc::now(), is_deleted: false, }; diff --git a/backend/sync_server/src/server/requests.rs b/backend/sync_server/src/server/requests.rs index 9c52bd9c..1720f96f 100644 --- a/backend/sync_server/src/server/requests.rs +++ b/backend/sync_server/src/server/requests.rs @@ -1,3 +1,6 @@ +use aide_axum_typed_multipart::FieldData; +use axum::body::Bytes; +use axum_typed_multipart::TryFromMultipart; use chrono::{DateTime, Utc}; use schemars::JsonSchema; use serde::{self, Deserialize}; @@ -12,6 +15,14 @@ pub struct CreateDocumentVersion { pub content_base64: String, } +#[derive(Debug, TryFromMultipart, JsonSchema)] +pub struct CreateDocumentVersionMultipart { + pub relative_path: String, + pub created_date: DateTime, + #[form_data(limit = "unlimited")] + pub content: FieldData, +} + #[derive(Debug, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct UpdateDocumentVersion { @@ -21,6 +32,16 @@ pub struct UpdateDocumentVersion { pub content_base64: String, } +#[derive(Debug, TryFromMultipart, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct UpdateDocumentVersionMultipart { + pub parent_version_id: VaultUpdateId, + pub relative_path: String, + pub created_date: DateTime, + #[form_data(limit = "unlimited")] + pub content: FieldData, +} + #[derive(Debug, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct DeleteDocumentVersion { diff --git a/backend/sync_server/src/server/update_document.rs b/backend/sync_server/src/server/update_document.rs index 31ab4320..073744d4 100644 --- a/backend/sync_server/src/server/update_document.rs +++ b/backend/sync_server/src/server/update_document.rs @@ -1,24 +1,26 @@ -use anyhow::{anyhow, Context as _}; -use axum::{ - extract::{Path, State}, - Json, -}; +use aide_axum_typed_multipart::TypedMultipart; +use anyhow::{Context as _, anyhow}; +use axum::extract::{Path, State}; use axum_extra::{ - headers::{authorization::Bearer, Authorization}, TypedHeader, + headers::{Authorization, authorization::Bearer}, }; +use axum_jsonschema::Json; +use chrono::{DateTime, Utc}; use log::info; use schemars::JsonSchema; use serde::Deserialize; use sync_lib::{base64_to_bytes, merge}; use super::{ - app_state::AppState, auth::auth, requests::UpdateDocumentVersion, + app_state::AppState, + auth::auth, + requests::{UpdateDocumentVersion, UpdateDocumentVersionMultipart}, responses::DocumentUpdateResponse, }; use crate::{ - database::models::{DocumentId, StoredDocumentVersion, VaultId}, - errors::{client_error, not_found_error, server_error, SyncServerError}, + database::models::{DocumentId, StoredDocumentVersion, VaultId, VaultUpdateId}, + errors::{SyncServerError, client_error, not_found_error, server_error}, utils::sanitize_path, }; @@ -30,7 +32,32 @@ pub struct PathParams { } #[axum::debug_handler] -pub async fn update_document( +pub async fn update_document_multipart( + TypedHeader(auth_header): TypedHeader>, + Path(PathParams { + vault_id, + document_id, + }): Path, + State(state): State, + TypedMultipart(axum_typed_multipart::TypedMultipart(request)): TypedMultipart< + UpdateDocumentVersionMultipart, + >, +) -> Result, SyncServerError> { + internal_update_document( + auth_header, + state, + vault_id, + document_id, + request.parent_version_id, + request.relative_path, + request.created_date, + request.content.contents.to_vec(), + ) + .await +} + +#[axum::debug_handler] +pub async fn update_document_json( TypedHeader(auth_header): TypedHeader>, Path(PathParams { vault_id, @@ -38,20 +65,48 @@ pub async fn update_document( }): Path, State(state): State, Json(request): Json, +) -> Result, SyncServerError> { + let content_bytes = base64_to_bytes(&request.content_base64) + .context("Failed to decode base64 content in request") + .map_err(client_error)?; + + internal_update_document( + auth_header, + state, + vault_id, + document_id, + request.parent_version_id, + request.relative_path, + request.created_date, + content_bytes, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn internal_update_document( + auth_header: Authorization, + state: AppState, + vault_id: VaultId, + document_id: DocumentId, + parent_version_id: VaultUpdateId, + relative_path: String, + created_date: DateTime, + content: Vec, ) -> Result, SyncServerError> { auth(&state, auth_header.token())?; // No need for a transaction as document versions are immutable let parent_document = state .database - .get_document_version(&vault_id, request.parent_version_id, None) + .get_document_version(&vault_id, parent_version_id, None) .await .map_err(server_error)? .map_or_else( || { Err(not_found_error(anyhow!( "Parent version with id `{}` not found", - request.parent_version_id + parent_version_id ))) }, Ok, @@ -83,15 +138,10 @@ pub async fn update_document( Ok, )?; - let content_bytes = base64_to_bytes(&request.content_base64) - .context("Failed to decode base64 content in request") - .map_err(client_error)?; - - let sanitized_relative_path = sanitize_path(&request.relative_path); + let sanitized_relative_path = sanitize_path(&relative_path); // Return the latest version if the content and path are the same as the latest // version - if content_bytes == latest_version.content - && sanitized_relative_path == latest_version.relative_path + if content == latest_version.content && sanitized_relative_path == latest_version.relative_path { info!("Document content is the same as the latest version, skipping update"); transaction @@ -105,12 +155,8 @@ pub async fn update_document( ))); } - let merged_content = merge( - &parent_document.content, - &latest_version.content, - &content_bytes, - ); - let is_different_from_request_content = merged_content != content_bytes; + let merged_content = merge(&parent_document.content, &latest_version.content, &content); + let is_different_from_request_content = merged_content != content; // We can only update the relative path if we're the first one to do so let new_relative_path = if parent_document.relative_path == latest_version.relative_path { @@ -125,7 +171,7 @@ pub async fn update_document( vault_update_id: last_update_id + 1, relative_path: new_relative_path, content: merged_content, - created_date: request.created_date, + created_date, updated_date: chrono::Utc::now(), is_deleted: latest_version.is_deleted, };