split: server database (app_state, migrations, models)
src/app_state.rs, src/app_state/database.rs (large schema/query rewrite), two new migrations (add_idempotency_key, add_creation_vault_update_id), and src/app_state/database/models.rs.
This commit is contained in:
parent
a9ce09b59d
commit
2d5edc6ec5
5 changed files with 827 additions and 178 deletions
|
|
@ -2,6 +2,8 @@ pub mod cursors;
|
||||||
pub mod database;
|
pub mod database;
|
||||||
pub mod websocket;
|
pub mod websocket;
|
||||||
|
|
||||||
|
use std::sync::{Arc, atomic::AtomicUsize};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use cursors::Cursors;
|
use cursors::Cursors;
|
||||||
use database::Database;
|
use database::Database;
|
||||||
|
|
@ -15,21 +17,42 @@ pub struct AppState {
|
||||||
pub database: Database,
|
pub database: Database,
|
||||||
pub cursors: Cursors,
|
pub cursors: Cursors,
|
||||||
pub broadcasts: Broadcasts,
|
pub broadcasts: Broadcasts,
|
||||||
|
/// Tracks WebSocket connections that have upgraded but not yet completed
|
||||||
|
/// the authentication handshake
|
||||||
|
pub pending_ws_connections: Arc<AtomicUsize>,
|
||||||
|
/// Send on this channel to stop background tasks (cursor cleanup,
|
||||||
|
/// idle-pool cleanup)
|
||||||
|
shutdown_tx: Arc<tokio::sync::watch::Sender<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
pub async fn try_new(config: Config) -> Result<Self> {
|
pub async fn try_new(config: Config) -> Result<Self> {
|
||||||
|
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
|
||||||
|
|
||||||
let broadcasts = Broadcasts::new(&config.server);
|
let broadcasts = Broadcasts::new(&config.server);
|
||||||
let database = Database::try_new(&config.database, &broadcasts).await?;
|
let database =
|
||||||
|
Database::try_new(&config.database, &broadcasts, shutdown_rx.clone()).await?;
|
||||||
let cursors: Cursors = Cursors::new(&config.database, &broadcasts);
|
let cursors: Cursors = Cursors::new(&config.database, &broadcasts);
|
||||||
|
|
||||||
Cursors::start_background_task(cursors.clone());
|
Cursors::start_background_task(cursors.clone(), shutdown_rx);
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
config,
|
config,
|
||||||
database,
|
database,
|
||||||
cursors,
|
cursors,
|
||||||
broadcasts,
|
broadcasts,
|
||||||
|
pending_ws_connections: Arc::new(AtomicUsize::new(0)),
|
||||||
|
shutdown_tx: Arc::new(shutdown_tx),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Signal all background tasks (idle pool cleanup, cursor cleanup) to stop
|
||||||
|
pub fn shutdown(&self) {
|
||||||
|
let _ = self.shutdown_tx.send(());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a receiver to be notified when shutdown is triggered
|
||||||
|
pub fn subscribe_shutdown(&self) -> tokio::sync::watch::Receiver<()> {
|
||||||
|
self.shutdown_tx.subscribe()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
|
|
@ -0,0 +1,2 @@
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_documents_document_id
|
||||||
|
ON documents (document_id, vault_update_id);
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
ALTER TABLE documents ADD COLUMN creation_vault_update_id INTEGER NOT NULL DEFAULT 0;
|
||||||
|
|
||||||
|
UPDATE documents
|
||||||
|
SET creation_vault_update_id = (
|
||||||
|
SELECT MIN(d2.vault_update_id)
|
||||||
|
FROM documents d2
|
||||||
|
WHERE d2.document_id = documents.document_id
|
||||||
|
);
|
||||||
|
|
||||||
|
DROP VIEW latest_document_versions;
|
||||||
|
|
||||||
|
CREATE VIEW IF NOT EXISTS latest_document_versions AS --recreate view as it now includes one more field
|
||||||
|
SELECT d.*
|
||||||
|
FROM documents d
|
||||||
|
INNER JOIN (
|
||||||
|
SELECT MAX(vault_update_id) AS max_version_id
|
||||||
|
FROM documents
|
||||||
|
GROUP BY document_id
|
||||||
|
) max_versions
|
||||||
|
ON d.vault_update_id = max_versions.max_version_id;
|
||||||
|
|
@ -13,6 +13,7 @@ pub type DeviceId = String;
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct StoredDocumentVersion {
|
pub struct StoredDocumentVersion {
|
||||||
pub vault_update_id: VaultUpdateId,
|
pub vault_update_id: VaultUpdateId,
|
||||||
|
pub creation_vault_update_id: VaultUpdateId,
|
||||||
pub document_id: DocumentId,
|
pub document_id: DocumentId,
|
||||||
pub relative_path: String,
|
pub relative_path: String,
|
||||||
pub updated_date: DateTime<Utc>,
|
pub updated_date: DateTime<Utc>,
|
||||||
|
|
@ -33,7 +34,7 @@ impl PartialEq<Self> for StoredDocumentVersion {
|
||||||
#[derive(TS, Debug, Clone, Serialize)]
|
#[derive(TS, Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct DocumentVersionWithoutContent {
|
pub struct DocumentVersionWithoutContent {
|
||||||
#[ts(as = "i32")]
|
#[ts(type = "number")]
|
||||||
pub vault_update_id: VaultUpdateId,
|
pub vault_update_id: VaultUpdateId,
|
||||||
|
|
||||||
pub document_id: DocumentId,
|
pub document_id: DocumentId,
|
||||||
|
|
@ -43,12 +44,16 @@ pub struct DocumentVersionWithoutContent {
|
||||||
pub user_id: UserId,
|
pub user_id: UserId,
|
||||||
pub device_id: DeviceId,
|
pub device_id: DeviceId,
|
||||||
|
|
||||||
#[ts(as = "i32")]
|
#[ts(type = "number")]
|
||||||
pub content_size: u64,
|
pub content_size: u64,
|
||||||
|
|
||||||
|
/// True iff this is the first version of the document
|
||||||
|
pub is_new_file: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
|
impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
|
||||||
fn from(value: StoredDocumentVersion) -> Self {
|
fn from(value: StoredDocumentVersion) -> Self {
|
||||||
|
let is_new_file = value.creation_vault_update_id == value.vault_update_id;
|
||||||
Self {
|
Self {
|
||||||
vault_update_id: value.vault_update_id,
|
vault_update_id: value.vault_update_id,
|
||||||
document_id: value.document_id,
|
document_id: value.document_id,
|
||||||
|
|
@ -58,6 +63,7 @@ impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
|
||||||
user_id: value.user_id,
|
user_id: value.user_id,
|
||||||
device_id: value.device_id,
|
device_id: value.device_id,
|
||||||
content_size: value.content.len() as u64,
|
content_size: value.content.len() as u64,
|
||||||
|
is_new_file,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -65,7 +71,7 @@ impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
|
||||||
#[derive(TS, Debug, Clone, Serialize)]
|
#[derive(TS, Debug, Clone, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct DocumentVersion {
|
pub struct DocumentVersion {
|
||||||
#[ts(as = "i32")]
|
#[ts(type = "number")]
|
||||||
pub vault_update_id: VaultUpdateId,
|
pub vault_update_id: VaultUpdateId,
|
||||||
|
|
||||||
pub document_id: DocumentId,
|
pub document_id: DocumentId,
|
||||||
|
|
@ -77,6 +83,25 @@ pub struct DocumentVersion {
|
||||||
pub device_id: DeviceId,
|
pub device_id: DeviceId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Row struct for vault history queries (used by `sqlx::query_as!`)
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct VaultHistoryRow {
|
||||||
|
pub vault_update_id: VaultUpdateId,
|
||||||
|
pub creation_vault_update_id: VaultUpdateId,
|
||||||
|
pub document_id: DocumentId,
|
||||||
|
pub relative_path: String,
|
||||||
|
pub updated_date: DateTime<Utc>,
|
||||||
|
pub is_deleted: bool,
|
||||||
|
pub user_id: String,
|
||||||
|
pub device_id: String,
|
||||||
|
pub content_size: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct VaultStats {
|
||||||
|
pub created_at: Option<DateTime<Utc>>,
|
||||||
|
pub document_count: u32,
|
||||||
|
}
|
||||||
|
|
||||||
impl From<StoredDocumentVersion> for DocumentVersion {
|
impl From<StoredDocumentVersion> for DocumentVersion {
|
||||||
fn from(value: StoredDocumentVersion) -> Self {
|
fn from(value: StoredDocumentVersion) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue