1 db per vault
This commit is contained in:
parent
c62957087f
commit
c49ee759ac
16 changed files with 151 additions and 99 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -10,6 +10,7 @@ backend/target
|
||||||
frontend/*/dist
|
frontend/*/dist
|
||||||
|
|
||||||
backend/db.sqlite3*
|
backend/db.sqlite3*
|
||||||
|
backend/databases
|
||||||
backend/config.yml
|
backend/config.yml
|
||||||
|
|
||||||
*.log
|
*.log
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ RUN apk add --no-cache curl
|
||||||
|
|
||||||
COPY --from=builder /usr/src/backend/target/x86_64-unknown-linux-musl/release/sync_server /app/sync_server
|
COPY --from=builder /usr/src/backend/target/x86_64-unknown-linux-musl/release/sync_server /app/sync_server
|
||||||
|
|
||||||
VOLUME /data
|
VOLUME /data/databases
|
||||||
EXPOSE 3000/tcp
|
EXPOSE 3000/tcp
|
||||||
|
|
||||||
WORKDIR /data
|
WORKDIR /data
|
||||||
|
|
|
||||||
|
|
@ -1,31 +1,33 @@
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::consts::{DEFAULT_MAX_CONNECTIONS, DEFAULT_SQLITE_URL};
|
use crate::consts::{DEFAULT_DATABASES_DIRECTORY_PATH, DEFAULT_MAX_CONNECTIONS};
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||||
pub struct DatabaseConfig {
|
pub struct DatabaseConfig {
|
||||||
#[serde(default = "default_sqlite_url")]
|
#[serde(default = "default_databases_directory_path")]
|
||||||
pub sqlite_url: String,
|
pub databases_directory_path: PathBuf,
|
||||||
|
|
||||||
#[serde(default = "default_max_connections")]
|
#[serde(default = "default_max_connections")]
|
||||||
pub max_connections: u32,
|
pub max_connections: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_sqlite_url() -> String {
|
fn default_databases_directory_path() -> PathBuf {
|
||||||
debug!("Using default sqlite url: {}", DEFAULT_SQLITE_URL);
|
debug!("Using default databases directory path: {DEFAULT_DATABASES_DIRECTORY_PATH:?}");
|
||||||
DEFAULT_SQLITE_URL.to_owned()
|
PathBuf::from(DEFAULT_DATABASES_DIRECTORY_PATH)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_max_connections() -> u32 {
|
fn default_max_connections() -> u32 {
|
||||||
debug!("Using default max connections: {}", DEFAULT_MAX_CONNECTIONS);
|
debug!("Using default max connections: {DEFAULT_MAX_CONNECTIONS}");
|
||||||
DEFAULT_MAX_CONNECTIONS
|
DEFAULT_MAX_CONNECTIONS
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for DatabaseConfig {
|
impl Default for DatabaseConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
sqlite_url: default_sqlite_url(),
|
databases_directory_path: default_databases_directory_path(),
|
||||||
max_connections: default_max_connections(),
|
max_connections: default_max_connections(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,20 +15,17 @@ pub struct ServerConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_host() -> String {
|
fn default_host() -> String {
|
||||||
debug!("Using default server host: {}", DEFAULT_HOST);
|
debug!("Using default server host: {DEFAULT_HOST}");
|
||||||
DEFAULT_HOST.to_owned()
|
DEFAULT_HOST.to_owned()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_port() -> u16 {
|
fn default_port() -> u16 {
|
||||||
debug!("Using default server port: {}", DEFAULT_PORT);
|
debug!("Using default server port: {DEFAULT_PORT}");
|
||||||
DEFAULT_PORT
|
DEFAULT_PORT
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_max_body_size_mb() -> usize {
|
fn default_max_body_size_mb() -> usize {
|
||||||
debug!(
|
debug!("Using default max body size (MB): {DEFAULT_MAX_BODY_SIZE_MB}");
|
||||||
"Using default max body size (MB): {}",
|
|
||||||
DEFAULT_MAX_BODY_SIZE_MB
|
|
||||||
);
|
|
||||||
DEFAULT_MAX_BODY_SIZE_MB
|
DEFAULT_MAX_BODY_SIZE_MB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
pub const CONFIG_PATH: &str = "config.yml";
|
pub const CONFIG_PATH: &str = "config.yml";
|
||||||
pub const DEFAULT_SQLITE_URL: &str = "db.sqlite3";
|
pub const DEFAULT_DATABASES_DIRECTORY_PATH: &str = "databases";
|
||||||
pub const DEFAULT_HOST: &str = "127.0.0.1";
|
pub const DEFAULT_HOST: &str = "127.0.0.1";
|
||||||
pub const DEFAULT_PORT: u16 = 3000;
|
pub const DEFAULT_PORT: u16 = 3000;
|
||||||
pub const DEFAULT_MAX_CONNECTIONS: u32 = 12;
|
pub const DEFAULT_MAX_CONNECTIONS: u32 = 12;
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
use core::{str::FromStr as _, time::Duration};
|
use core::time::Duration;
|
||||||
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
use anyhow::{Context as _, Result};
|
use anyhow::{Context as _, Result};
|
||||||
use models::{
|
use models::{
|
||||||
|
|
@ -7,20 +8,68 @@ use models::{
|
||||||
use sqlx::{sqlite::SqliteConnectOptions, types::chrono::Utc};
|
use sqlx::{sqlite::SqliteConnectOptions, types::chrono::Utc};
|
||||||
pub mod models;
|
pub mod models;
|
||||||
use sqlx::{Pool, Sqlite, sqlite::SqlitePoolOptions};
|
use sqlx::{Pool, Sqlite, sqlite::SqlitePoolOptions};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
use uuid::fmt::Hyphenated;
|
use uuid::fmt::Hyphenated;
|
||||||
|
|
||||||
use crate::config::database_config::DatabaseConfig;
|
use crate::config::database_config::DatabaseConfig;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Database {
|
pub struct Database {
|
||||||
connection_pool: Pool<Sqlite>,
|
config: DatabaseConfig,
|
||||||
|
connection_pools: Arc<Mutex<HashMap<VaultId, Pool<Sqlite>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;
|
pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
pub async fn try_new(config: &DatabaseConfig) -> Result<Self> {
|
pub async fn try_new(config: &DatabaseConfig) -> Result<Self> {
|
||||||
let connection_options = SqliteConnectOptions::from_str(&config.sqlite_url)?
|
// Create the databases directory if it doesn't exist
|
||||||
|
tokio::fs::create_dir_all(&config.databases_directory_path)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"Failed to create databases directory: {}",
|
||||||
|
config.databases_directory_path.to_string_lossy()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let mut connection_pools = std::collections::HashMap::new();
|
||||||
|
|
||||||
|
let mut entries = tokio::fs::read_dir(&config.databases_directory_path).await?;
|
||||||
|
while let Some(entry) = entries.next_entry().await? {
|
||||||
|
if !entry.file_name().to_string_lossy().ends_with(".sqlite") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let vault: VaultId = entry
|
||||||
|
.file_name()
|
||||||
|
.to_string_lossy()
|
||||||
|
.trim_end_matches(".sqlite")
|
||||||
|
.to_owned();
|
||||||
|
|
||||||
|
connection_pools.insert(
|
||||||
|
vault.clone(),
|
||||||
|
Self::create_vault_database(config, &vault).await?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
config: config.clone(),
|
||||||
|
connection_pools: Arc::new(Mutex::new(connection_pools)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_vault_database(
|
||||||
|
config: &DatabaseConfig,
|
||||||
|
vault: &VaultId,
|
||||||
|
) -> Result<Pool<Sqlite>> {
|
||||||
|
let file_name = config
|
||||||
|
.databases_directory_path
|
||||||
|
.join(format!("{vault}.sqlite"));
|
||||||
|
|
||||||
|
// Continue with database connection setup
|
||||||
|
let connection_options = SqliteConnectOptions::new()
|
||||||
|
.filename(file_name.clone())
|
||||||
.create_if_missing(true)
|
.create_if_missing(true)
|
||||||
.busy_timeout(Duration::from_secs(3600))
|
.busy_timeout(Duration::from_secs(3600))
|
||||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
|
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
|
||||||
|
|
@ -30,18 +79,11 @@ impl Database {
|
||||||
.test_before_acquire(true)
|
.test_before_acquire(true)
|
||||||
.connect_with(connection_options)
|
.connect_with(connection_options)
|
||||||
.await
|
.await
|
||||||
.with_context(|| {
|
.with_context(|| format!("Cannot open database at '{file_name:?}'"))?;
|
||||||
format!(
|
|
||||||
"Cannot connect to database with url: {}",
|
|
||||||
&config.sqlite_url
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Self::run_migrations(&pool).await?;
|
Self::run_migrations(&pool).await?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(pool)
|
||||||
connection_pool: pool,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_migrations(pool: &Pool<Sqlite>) -> Result<()> {
|
async fn run_migrations(pool: &Pool<Sqlite>) -> Result<()> {
|
||||||
|
|
@ -51,17 +93,38 @@ impl Database {
|
||||||
.context("Cannot check for pending migrations")
|
.context("Cannot check for pending migrations")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_connection_pool(&mut self, vault: &VaultId) -> Result<Pool<Sqlite>> {
|
||||||
|
let mut pools = self.connection_pools.lock().await;
|
||||||
|
if !pools.contains_key(vault) {
|
||||||
|
let pool = Self::create_vault_database(&self.config, vault).await?;
|
||||||
|
pools.insert(vault.clone(), pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
let pool = pools
|
||||||
|
.get(vault)
|
||||||
|
.expect("Pool was just inserted or already exists");
|
||||||
|
|
||||||
|
Ok(pool.clone())
|
||||||
|
}
|
||||||
|
|
||||||
/// Attempting to write from this transaction might result in a
|
/// Attempting to write from this transaction might result in a
|
||||||
/// database locked error. Use this transaction for read-only operations.
|
/// database locked error. Use this transaction for read-only operations.
|
||||||
pub async fn create_readonly_transaction(&self) -> Result<Transaction<'_>> {
|
pub async fn create_readonly_transaction(
|
||||||
self.connection_pool
|
&mut self,
|
||||||
|
vault: &VaultId,
|
||||||
|
) -> Result<Transaction<'static>> {
|
||||||
|
self.get_connection_pool(vault)
|
||||||
|
.await?
|
||||||
.begin()
|
.begin()
|
||||||
.await
|
.await
|
||||||
.context("Cannot create transaction")
|
.context("Cannot create transaction")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_write_transaction(&self) -> Result<Transaction<'_>> {
|
pub async fn create_write_transaction(
|
||||||
let mut transaction = self.create_readonly_transaction().await?;
|
&mut self,
|
||||||
|
vault: &VaultId,
|
||||||
|
) -> Result<Transaction<'static>> {
|
||||||
|
let mut transaction = self.create_readonly_transaction(vault).await?;
|
||||||
|
|
||||||
// sqlx doesn't support immediate transactions for sqlite: https://github.com/launchbadge/sqlx/issues/481
|
// sqlx doesn't support immediate transactions for sqlite: https://github.com/launchbadge/sqlx/issues/481
|
||||||
sqlx::query!("END; BEGIN IMMEDIATE;")
|
sqlx::query!("END; BEGIN IMMEDIATE;")
|
||||||
|
|
@ -73,7 +136,7 @@ impl Database {
|
||||||
|
|
||||||
/// Return the latest state of all documents in the vault
|
/// Return the latest state of all documents in the vault
|
||||||
pub async fn get_latest_documents(
|
pub async fn get_latest_documents(
|
||||||
&self,
|
&mut self,
|
||||||
vault: &VaultId,
|
vault: &VaultId,
|
||||||
transaction: Option<&mut Transaction<'_>>,
|
transaction: Option<&mut Transaction<'_>>,
|
||||||
) -> Result<Vec<DocumentVersionWithoutContent>> {
|
) -> Result<Vec<DocumentVersionWithoutContent>> {
|
||||||
|
|
@ -81,23 +144,22 @@ impl Database {
|
||||||
DocumentVersionWithoutContent,
|
DocumentVersionWithoutContent,
|
||||||
r#"
|
r#"
|
||||||
select
|
select
|
||||||
vault_id,
|
|
||||||
vault_update_id,
|
vault_update_id,
|
||||||
document_id as "document_id: Hyphenated",
|
document_id as "document_id: Hyphenated",
|
||||||
relative_path,
|
relative_path,
|
||||||
updated_date as "updated_date: chrono::DateTime<Utc>",
|
updated_date as "updated_date: chrono::DateTime<Utc>",
|
||||||
is_deleted
|
is_deleted
|
||||||
from latest_document_versions
|
from latest_document_versions
|
||||||
where vault_id = ?
|
|
||||||
order by vault_update_id desc
|
order by vault_update_id desc
|
||||||
"#,
|
"#,
|
||||||
vault,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(transaction) = transaction {
|
if let Some(transaction) = transaction {
|
||||||
query.fetch_all(&mut **transaction).await
|
query.fetch_all(&mut **transaction).await
|
||||||
} else {
|
} else {
|
||||||
query.fetch_all(&self.connection_pool).await
|
query
|
||||||
|
.fetch_all(&self.get_connection_pool(vault).await?)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
.context("Cannot fetch latest documents")
|
.context("Cannot fetch latest documents")
|
||||||
}
|
}
|
||||||
|
|
@ -105,7 +167,7 @@ impl Database {
|
||||||
/// Return the latest state of all documents (including deleted) in the
|
/// Return the latest state of all documents (including deleted) in the
|
||||||
/// vault which have changed since the given update id
|
/// vault which have changed since the given update id
|
||||||
pub async fn get_latest_documents_since(
|
pub async fn get_latest_documents_since(
|
||||||
&self,
|
&mut self,
|
||||||
vault: &VaultId,
|
vault: &VaultId,
|
||||||
vault_update_id: VaultUpdateId,
|
vault_update_id: VaultUpdateId,
|
||||||
transaction: Option<&mut Transaction<'_>>,
|
transaction: Option<&mut Transaction<'_>>,
|
||||||
|
|
@ -114,24 +176,24 @@ impl Database {
|
||||||
DocumentVersionWithoutContent,
|
DocumentVersionWithoutContent,
|
||||||
r#"
|
r#"
|
||||||
select
|
select
|
||||||
vault_id,
|
|
||||||
vault_update_id,
|
vault_update_id,
|
||||||
document_id as "document_id: Hyphenated",
|
document_id as "document_id: Hyphenated",
|
||||||
relative_path,
|
relative_path,
|
||||||
updated_date as "updated_date: chrono::DateTime<Utc>",
|
updated_date as "updated_date: chrono::DateTime<Utc>",
|
||||||
is_deleted
|
is_deleted
|
||||||
from latest_document_versions
|
from latest_document_versions
|
||||||
where vault_id = ? and vault_update_id > ?
|
where vault_update_id > ?
|
||||||
order by vault_update_id desc
|
order by vault_update_id desc
|
||||||
"#,
|
"#,
|
||||||
vault,
|
|
||||||
vault_update_id
|
vault_update_id
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(transaction) = transaction {
|
if let Some(transaction) = transaction {
|
||||||
query.fetch_all(&mut **transaction).await
|
query.fetch_all(&mut **transaction).await
|
||||||
} else {
|
} else {
|
||||||
query.fetch_all(&self.connection_pool).await
|
query
|
||||||
|
.fetch_all(&self.get_connection_pool(vault).await?)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
.with_context(|| {
|
.with_context(|| {
|
||||||
format!("Cannot fetch latest documents since vault_update_id {vault_update_id}")
|
format!("Cannot fetch latest documents since vault_update_id {vault_update_id}")
|
||||||
|
|
@ -139,7 +201,7 @@ impl Database {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_max_update_id_in_vault(
|
pub async fn get_max_update_id_in_vault(
|
||||||
&self,
|
&mut self,
|
||||||
vault: &VaultId,
|
vault: &VaultId,
|
||||||
transaction: Option<&mut Transaction<'_>>,
|
transaction: Option<&mut Transaction<'_>>,
|
||||||
) -> Result<i64> {
|
) -> Result<i64> {
|
||||||
|
|
@ -147,22 +209,22 @@ impl Database {
|
||||||
r#"
|
r#"
|
||||||
select coalesce(max(vault_update_id), 0) as max_vault_update_id
|
select coalesce(max(vault_update_id), 0) as max_vault_update_id
|
||||||
from documents
|
from documents
|
||||||
where vault_id = ?
|
|
||||||
"#,
|
"#,
|
||||||
vault
|
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(transaction) = transaction {
|
if let Some(transaction) = transaction {
|
||||||
query.fetch_one(&mut **transaction).await
|
query.fetch_one(&mut **transaction).await
|
||||||
} else {
|
} else {
|
||||||
query.fetch_one(&self.connection_pool).await
|
query
|
||||||
|
.fetch_one(&self.get_connection_pool(vault).await?)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
.map(|row| row.max_vault_update_id)
|
.map(|row| row.max_vault_update_id)
|
||||||
.context("Cannot fetch max update id in vault")
|
.context("Cannot fetch max update id in vault")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_latest_document_by_path(
|
pub async fn get_latest_document_by_path(
|
||||||
&self,
|
&mut self,
|
||||||
vault: &VaultId,
|
vault: &VaultId,
|
||||||
relative_path: &str,
|
relative_path: &str,
|
||||||
transaction: Option<&mut Transaction<'_>>,
|
transaction: Option<&mut Transaction<'_>>,
|
||||||
|
|
@ -171,7 +233,6 @@ impl Database {
|
||||||
StoredDocumentVersion,
|
StoredDocumentVersion,
|
||||||
r#"
|
r#"
|
||||||
select
|
select
|
||||||
vault_id,
|
|
||||||
vault_update_id,
|
vault_update_id,
|
||||||
document_id as "document_id: Hyphenated",
|
document_id as "document_id: Hyphenated",
|
||||||
relative_path,
|
relative_path,
|
||||||
|
|
@ -179,26 +240,27 @@ impl Database {
|
||||||
content,
|
content,
|
||||||
is_deleted
|
is_deleted
|
||||||
from latest_document_versions
|
from latest_document_versions
|
||||||
where vault_id = ? and relative_path = ?
|
where relative_path = ?
|
||||||
order by vault_update_id desc -- `latest_document_versions` only contains a single latest version of each document, however,
|
order by vault_update_id desc -- `latest_document_versions` only contains a single latest version of each document, however,
|
||||||
-- multiple documents can have the same `relative_path`, if they have been deleted. That's
|
-- multiple documents can have the same `relative_path`, if they have been deleted. That's
|
||||||
-- why we only care about the latest version of the document with the given relative path.
|
-- why we only care about the latest version of the document with the given relative path.
|
||||||
limit 1
|
limit 1
|
||||||
"#,
|
"#,
|
||||||
vault,
|
|
||||||
relative_path
|
relative_path
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(transaction) = transaction {
|
if let Some(transaction) = transaction {
|
||||||
query.fetch_optional(&mut **transaction).await
|
query.fetch_optional(&mut **transaction).await
|
||||||
} else {
|
} else {
|
||||||
query.fetch_optional(&self.connection_pool).await
|
query
|
||||||
|
.fetch_optional(&self.get_connection_pool(vault).await?)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
.context("Cannot fetch latest document version")
|
.context("Cannot fetch latest document version")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_latest_document(
|
pub async fn get_latest_document(
|
||||||
&self,
|
&mut self,
|
||||||
vault: &VaultId,
|
vault: &VaultId,
|
||||||
document_id: &DocumentId,
|
document_id: &DocumentId,
|
||||||
transaction: Option<&mut Transaction<'_>>,
|
transaction: Option<&mut Transaction<'_>>,
|
||||||
|
|
@ -208,7 +270,6 @@ impl Database {
|
||||||
StoredDocumentVersion,
|
StoredDocumentVersion,
|
||||||
r#"
|
r#"
|
||||||
select
|
select
|
||||||
vault_id,
|
|
||||||
vault_update_id,
|
vault_update_id,
|
||||||
document_id as "document_id: Hyphenated",
|
document_id as "document_id: Hyphenated",
|
||||||
relative_path,
|
relative_path,
|
||||||
|
|
@ -216,22 +277,23 @@ impl Database {
|
||||||
content,
|
content,
|
||||||
is_deleted
|
is_deleted
|
||||||
from latest_document_versions
|
from latest_document_versions
|
||||||
where vault_id = ? and document_id = ?
|
where document_id = ?
|
||||||
"#,
|
"#,
|
||||||
vault,
|
|
||||||
document_id
|
document_id
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(transaction) = transaction {
|
if let Some(transaction) = transaction {
|
||||||
query.fetch_optional(&mut **transaction).await
|
query.fetch_optional(&mut **transaction).await
|
||||||
} else {
|
} else {
|
||||||
query.fetch_optional(&self.connection_pool).await
|
query
|
||||||
|
.fetch_optional(&self.get_connection_pool(vault).await?)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
.context("Cannot fetch latest document version")
|
.context("Cannot fetch latest document version")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_document_version(
|
pub async fn get_document_version(
|
||||||
&self,
|
&mut self,
|
||||||
vault: &VaultId,
|
vault: &VaultId,
|
||||||
vault_update_id: VaultUpdateId,
|
vault_update_id: VaultUpdateId,
|
||||||
transaction: Option<&mut Transaction<'_>>,
|
transaction: Option<&mut Transaction<'_>>,
|
||||||
|
|
@ -240,7 +302,6 @@ impl Database {
|
||||||
StoredDocumentVersion,
|
StoredDocumentVersion,
|
||||||
r#"
|
r#"
|
||||||
select
|
select
|
||||||
vault_id,
|
|
||||||
vault_update_id,
|
vault_update_id,
|
||||||
document_id as "document_id: Hyphenated",
|
document_id as "document_id: Hyphenated",
|
||||||
relative_path,
|
relative_path,
|
||||||
|
|
@ -248,21 +309,23 @@ impl Database {
|
||||||
content,
|
content,
|
||||||
is_deleted
|
is_deleted
|
||||||
from documents
|
from documents
|
||||||
where vault_id = ? and vault_update_id = ?"#,
|
where vault_update_id = ?"#,
|
||||||
vault,
|
|
||||||
vault_update_id
|
vault_update_id
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some(transaction) = transaction {
|
if let Some(transaction) = transaction {
|
||||||
query.fetch_optional(&mut **transaction).await
|
query.fetch_optional(&mut **transaction).await
|
||||||
} else {
|
} else {
|
||||||
query.fetch_optional(&self.connection_pool).await
|
query
|
||||||
|
.fetch_optional(&self.get_connection_pool(vault).await?)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
.context("Cannot fetch document version")
|
.context("Cannot fetch document version")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn insert_document_version(
|
pub async fn insert_document_version(
|
||||||
&self,
|
&mut self,
|
||||||
|
vault: &VaultId,
|
||||||
version: &StoredDocumentVersion,
|
version: &StoredDocumentVersion,
|
||||||
transaction: Option<&mut Transaction<'_>>,
|
transaction: Option<&mut Transaction<'_>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
|
@ -270,7 +333,6 @@ impl Database {
|
||||||
let query = sqlx::query!(
|
let query = sqlx::query!(
|
||||||
r#"
|
r#"
|
||||||
insert into documents (
|
insert into documents (
|
||||||
vault_id,
|
|
||||||
vault_update_id,
|
vault_update_id,
|
||||||
document_id,
|
document_id,
|
||||||
relative_path,
|
relative_path,
|
||||||
|
|
@ -278,9 +340,8 @@ impl Database {
|
||||||
content,
|
content,
|
||||||
is_deleted
|
is_deleted
|
||||||
)
|
)
|
||||||
values (?, ?, ?, ?, ?, ?, ?)
|
values (?, ?, ?, ?, ?, ?)
|
||||||
"#,
|
"#,
|
||||||
version.vault_id,
|
|
||||||
version.vault_update_id,
|
version.vault_update_id,
|
||||||
document_id,
|
document_id,
|
||||||
version.relative_path,
|
version.relative_path,
|
||||||
|
|
@ -292,7 +353,7 @@ impl Database {
|
||||||
if let Some(transaction) = transaction {
|
if let Some(transaction) = transaction {
|
||||||
query.execute(&mut **transaction).await
|
query.execute(&mut **transaction).await
|
||||||
} else {
|
} else {
|
||||||
query.execute(&self.connection_pool).await
|
query.execute(&self.get_connection_pool(vault).await?).await
|
||||||
}
|
}
|
||||||
.context("Cannot insert document version")?;
|
.context("Cannot insert document version")?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,24 +1,21 @@
|
||||||
CREATE TABLE IF NOT EXISTS documents (
|
CREATE TABLE IF NOT EXISTS documents (
|
||||||
vault_id TEXT NOT NULL,
|
vault_update_id INTEGER NOT NULL PRIMARY KEY,
|
||||||
vault_update_id INTEGER NOT NULL,
|
|
||||||
document_id TEXT NOT NULL,
|
document_id TEXT NOT NULL,
|
||||||
relative_path TEXT NOT NULL,
|
relative_path TEXT NOT NULL,
|
||||||
updated_date TIMESTAMP NOT NULL,
|
updated_date TIMESTAMP NOT NULL,
|
||||||
content BLOB NOT NULL,
|
content BLOB NOT NULL,
|
||||||
is_deleted BOOLEAN NOT NULL,
|
is_deleted BOOLEAN NOT NULL
|
||||||
PRIMARY KEY (vault_id, vault_update_id)
|
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE VIEW IF NOT EXISTS latest_document_versions AS
|
CREATE VIEW IF NOT EXISTS latest_document_versions AS
|
||||||
SELECT d.*
|
SELECT d.*
|
||||||
FROM documents d
|
FROM documents d
|
||||||
INNER JOIN (
|
INNER JOIN (
|
||||||
SELECT vault_id, MAX(vault_update_id) AS max_version_id
|
SELECT MAX(vault_update_id) AS max_version_id
|
||||||
FROM documents
|
FROM documents
|
||||||
GROUP BY vault_id, document_id
|
GROUP BY document_id
|
||||||
) max_versions
|
) max_versions
|
||||||
ON d.vault_id = max_versions.vault_id
|
ON d.vault_update_id = max_versions.max_version_id;
|
||||||
AND d.vault_update_id = max_versions.max_version_id;
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_documents_vault_id_relative_path
|
CREATE INDEX IF NOT EXISTS idx_documents_vault_id_relative_path
|
||||||
ON documents (vault_id, relative_path);
|
ON documents (relative_path);
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,6 @@ pub type DocumentId = uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct StoredDocumentVersion {
|
pub struct StoredDocumentVersion {
|
||||||
pub vault_id: VaultId,
|
|
||||||
pub vault_update_id: VaultUpdateId,
|
pub vault_update_id: VaultUpdateId,
|
||||||
pub document_id: DocumentId,
|
pub document_id: DocumentId,
|
||||||
pub relative_path: String,
|
pub relative_path: String,
|
||||||
|
|
@ -19,15 +18,12 @@ pub struct StoredDocumentVersion {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PartialEq<Self> for StoredDocumentVersion {
|
impl PartialEq<Self> for StoredDocumentVersion {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool { self.vault_update_id == other.vault_update_id }
|
||||||
self.vault_id == other.vault_id && self.vault_update_id == other.vault_update_id
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, JsonSchema)]
|
#[derive(Debug, Clone, Serialize, JsonSchema)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct DocumentVersionWithoutContent {
|
pub struct DocumentVersionWithoutContent {
|
||||||
pub vault_id: VaultId,
|
|
||||||
pub vault_update_id: VaultUpdateId,
|
pub vault_update_id: VaultUpdateId,
|
||||||
pub document_id: DocumentId,
|
pub document_id: DocumentId,
|
||||||
pub relative_path: String,
|
pub relative_path: String,
|
||||||
|
|
@ -38,7 +34,6 @@ pub struct DocumentVersionWithoutContent {
|
||||||
impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
|
impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
|
||||||
fn from(value: StoredDocumentVersion) -> Self {
|
fn from(value: StoredDocumentVersion) -> Self {
|
||||||
Self {
|
Self {
|
||||||
vault_id: value.vault_id,
|
|
||||||
vault_update_id: value.vault_update_id,
|
vault_update_id: value.vault_update_id,
|
||||||
document_id: value.document_id,
|
document_id: value.document_id,
|
||||||
relative_path: value.relative_path,
|
relative_path: value.relative_path,
|
||||||
|
|
@ -51,7 +46,6 @@ impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
|
||||||
#[derive(Debug, Clone, Serialize, JsonSchema)]
|
#[derive(Debug, Clone, Serialize, JsonSchema)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct DocumentVersion {
|
pub struct DocumentVersion {
|
||||||
pub vault_id: VaultId,
|
|
||||||
pub vault_update_id: VaultUpdateId,
|
pub vault_update_id: VaultUpdateId,
|
||||||
pub document_id: DocumentId,
|
pub document_id: DocumentId,
|
||||||
pub relative_path: String,
|
pub relative_path: String,
|
||||||
|
|
@ -63,7 +57,6 @@ pub struct DocumentVersion {
|
||||||
impl From<StoredDocumentVersion> for DocumentVersion {
|
impl From<StoredDocumentVersion> for DocumentVersion {
|
||||||
fn from(value: StoredDocumentVersion) -> Self {
|
fn from(value: StoredDocumentVersion) -> Self {
|
||||||
Self {
|
Self {
|
||||||
vault_id: value.vault_id,
|
|
||||||
vault_update_id: value.vault_update_id,
|
vault_update_id: value.vault_update_id,
|
||||||
document_id: value.document_id,
|
document_id: value.document_id,
|
||||||
relative_path: value.relative_path,
|
relative_path: value.relative_path,
|
||||||
|
|
|
||||||
|
|
@ -77,7 +77,7 @@ pub async fn create_document_json(
|
||||||
|
|
||||||
async fn internal_create_document(
|
async fn internal_create_document(
|
||||||
auth_header: Authorization<Bearer>,
|
auth_header: Authorization<Bearer>,
|
||||||
state: AppState,
|
mut state: AppState,
|
||||||
vault_id: VaultId,
|
vault_id: VaultId,
|
||||||
document_id: Option<DocumentId>,
|
document_id: Option<DocumentId>,
|
||||||
relative_path: String,
|
relative_path: String,
|
||||||
|
|
@ -87,7 +87,7 @@ async fn internal_create_document(
|
||||||
|
|
||||||
let mut transaction = state
|
let mut transaction = state
|
||||||
.database
|
.database
|
||||||
.create_write_transaction()
|
.create_write_transaction(&vault_id)
|
||||||
.await
|
.await
|
||||||
.map_err(server_error)?;
|
.map_err(server_error)?;
|
||||||
|
|
||||||
|
|
@ -119,7 +119,6 @@ async fn internal_create_document(
|
||||||
let sanitized_relative_path = sanitize_path(&relative_path);
|
let sanitized_relative_path = sanitize_path(&relative_path);
|
||||||
|
|
||||||
let new_version = StoredDocumentVersion {
|
let new_version = StoredDocumentVersion {
|
||||||
vault_id,
|
|
||||||
vault_update_id: last_update_id + 1,
|
vault_update_id: last_update_id + 1,
|
||||||
document_id,
|
document_id,
|
||||||
relative_path: sanitized_relative_path,
|
relative_path: sanitized_relative_path,
|
||||||
|
|
@ -130,7 +129,7 @@ async fn internal_create_document(
|
||||||
|
|
||||||
state
|
state
|
||||||
.database
|
.database
|
||||||
.insert_document_version(&new_version, Some(&mut transaction))
|
.insert_document_version(&vault_id, &new_version, Some(&mut transaction))
|
||||||
.await
|
.await
|
||||||
.map_err(server_error)?;
|
.map_err(server_error)?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,14 +29,14 @@ pub async fn delete_document(
|
||||||
vault_id,
|
vault_id,
|
||||||
document_id,
|
document_id,
|
||||||
}): Path<PathParams>,
|
}): Path<PathParams>,
|
||||||
State(state): State<AppState>,
|
State(mut state): State<AppState>,
|
||||||
Json(request): Json<DeleteDocumentVersion>,
|
Json(request): Json<DeleteDocumentVersion>,
|
||||||
) -> Result<Json<DocumentVersionWithoutContent>, SyncServerError> {
|
) -> Result<Json<DocumentVersionWithoutContent>, SyncServerError> {
|
||||||
auth(&state, auth_header.token())?;
|
auth(&state, auth_header.token())?;
|
||||||
|
|
||||||
let mut transaction = state
|
let mut transaction = state
|
||||||
.database
|
.database
|
||||||
.create_write_transaction()
|
.create_write_transaction(&vault_id)
|
||||||
.await
|
.await
|
||||||
.map_err(server_error)?;
|
.map_err(server_error)?;
|
||||||
|
|
||||||
|
|
@ -47,7 +47,6 @@ pub async fn delete_document(
|
||||||
.map_err(server_error)?;
|
.map_err(server_error)?;
|
||||||
|
|
||||||
let new_version = StoredDocumentVersion {
|
let new_version = StoredDocumentVersion {
|
||||||
vault_id,
|
|
||||||
vault_update_id: last_update_id + 1,
|
vault_update_id: last_update_id + 1,
|
||||||
document_id,
|
document_id,
|
||||||
relative_path: sanitize_path(&request.relative_path),
|
relative_path: sanitize_path(&request.relative_path),
|
||||||
|
|
@ -58,7 +57,7 @@ pub async fn delete_document(
|
||||||
|
|
||||||
state
|
state
|
||||||
.database
|
.database
|
||||||
.insert_document_version(&new_version, Some(&mut transaction))
|
.insert_document_version(&vault_id, &new_version, Some(&mut transaction))
|
||||||
.await
|
.await
|
||||||
.map_err(server_error)?;
|
.map_err(server_error)?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ pub async fn fetch_document_version(
|
||||||
document_id,
|
document_id,
|
||||||
vault_update_id,
|
vault_update_id,
|
||||||
}): Path<PathParams>,
|
}): Path<PathParams>,
|
||||||
State(state): State<AppState>,
|
State(mut state): State<AppState>,
|
||||||
) -> Result<Json<DocumentVersion>, SyncServerError> {
|
) -> Result<Json<DocumentVersion>, SyncServerError> {
|
||||||
auth(&state, auth_header.token())?;
|
auth(&state, auth_header.token())?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ pub async fn fetch_document_version_content(
|
||||||
document_id,
|
document_id,
|
||||||
vault_update_id,
|
vault_update_id,
|
||||||
}): Path<PathParams>,
|
}): Path<PathParams>,
|
||||||
State(state): State<AppState>,
|
State(mut state): State<AppState>,
|
||||||
) -> Result<Bytes, SyncServerError> {
|
) -> Result<Bytes, SyncServerError> {
|
||||||
auth(&state, auth_header.token())?;
|
auth(&state, auth_header.token())?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ pub async fn fetch_latest_document_version(
|
||||||
vault_id,
|
vault_id,
|
||||||
document_id,
|
document_id,
|
||||||
}): Path<PathParams>,
|
}): Path<PathParams>,
|
||||||
State(state): State<AppState>,
|
State(mut state): State<AppState>,
|
||||||
) -> Result<Json<DocumentVersion>, SyncServerError> {
|
) -> Result<Json<DocumentVersion>, SyncServerError> {
|
||||||
auth(&state, auth_header.token())?;
|
auth(&state, auth_header.token())?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ pub async fn fetch_latest_documents(
|
||||||
TypedHeader(auth_header): TypedHeader<Authorization<Bearer>>,
|
TypedHeader(auth_header): TypedHeader<Authorization<Bearer>>,
|
||||||
Path(PathParams { vault_id }): Path<PathParams>,
|
Path(PathParams { vault_id }): Path<PathParams>,
|
||||||
Query(QueryParams { since_update_id }): Query<QueryParams>,
|
Query(QueryParams { since_update_id }): Query<QueryParams>,
|
||||||
State(state): State<AppState>,
|
State(mut state): State<AppState>,
|
||||||
) -> Result<Json<FetchLatestDocumentsResponse>, SyncServerError> {
|
) -> Result<Json<FetchLatestDocumentsResponse>, SyncServerError> {
|
||||||
auth(&state, auth_header.token())?;
|
auth(&state, auth_header.token())?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -83,7 +83,7 @@ pub async fn update_document_json(
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn internal_update_document(
|
async fn internal_update_document(
|
||||||
auth_header: Authorization<Bearer>,
|
auth_header: Authorization<Bearer>,
|
||||||
state: AppState,
|
mut state: AppState,
|
||||||
vault_id: VaultId,
|
vault_id: VaultId,
|
||||||
document_id: DocumentId,
|
document_id: DocumentId,
|
||||||
parent_version_id: VaultUpdateId,
|
parent_version_id: VaultUpdateId,
|
||||||
|
|
@ -110,7 +110,7 @@ async fn internal_update_document(
|
||||||
|
|
||||||
let mut transaction = state
|
let mut transaction = state
|
||||||
.database
|
.database
|
||||||
.create_write_transaction()
|
.create_write_transaction(&vault_id)
|
||||||
.await
|
.await
|
||||||
.map_err(server_error)?;
|
.map_err(server_error)?;
|
||||||
|
|
||||||
|
|
@ -196,7 +196,6 @@ async fn internal_update_document(
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_version = StoredDocumentVersion {
|
let new_version = StoredDocumentVersion {
|
||||||
vault_id,
|
|
||||||
document_id,
|
document_id,
|
||||||
vault_update_id: last_update_id + 1,
|
vault_update_id: last_update_id + 1,
|
||||||
relative_path: new_relative_path,
|
relative_path: new_relative_path,
|
||||||
|
|
@ -207,7 +206,7 @@ async fn internal_update_document(
|
||||||
|
|
||||||
state
|
state
|
||||||
.database
|
.database
|
||||||
.insert_document_version(&new_version, Some(&mut transaction))
|
.insert_document_version(&vault_id, &new_version, Some(&mut transaction))
|
||||||
.await
|
.await
|
||||||
.map_err(server_error)?;
|
.map_err(server_error)?;
|
||||||
|
|
||||||
|
|
|
||||||
4
clean-up.sh
Normal file
4
clean-up.sh
Normal file
|
|
@ -0,0 +1,4 @@
|
||||||
|
#!/bin/sh
|
||||||
|
|
||||||
|
rm -rf backend/databases
|
||||||
|
rm -rf frontend/test-client/logs
|
||||||
Loading…
Add table
Add a link
Reference in a new issue