Fix syncing when network latency is present (#4)

* WIP

* Add debug

* Dedupe inserts

* Add deterministic ordering

* Fix whitespaces

* Update insta

* Add integration test script

* Rename

* Add test

* Working for non-deletes

* omg it mostly works for deletes

* Isdeleted fix

* remove created dates

* update api

* Take document id

* No max attempt

* works

* Use string uuids

* .

* working!!!! (hopefully)

* Improve bundling

* Add module

* lint

* .

* lint

* Fix CI

* use toolchain

* clean up

* Add useSlowFileEvents

* Delete fuzz

* Fix CI

* use docker

* fix script

* clean up

* Clean up

* change node version

* Build docker image on every commit

* fix ci

* 1 db per vault

* Add scritps folder

* Bump versions

* Lint

* .

* Fix tests for real

* Style

* .

* try

* Consistent ordering

* Fix tests

* hmm

* .

* Clean up diff

* Fixes

* .

* Fix version bump

* .

* .

* .
This commit is contained in:
Andras Schmelczer 2025-03-16 20:13:49 +00:00 committed by GitHub
parent bcf48c428d
commit 8b8f1d91d9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
91 changed files with 2252 additions and 1586 deletions

35
backend/Cargo.lock generated
View file

@ -105,12 +105,6 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arbitrary"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dde20b3d026af13f561bdd0f15edf01fc734f0dafcedbaf42bba506a9517f223"
[[package]]
name = "async-trait"
version = "0.1.85"
@ -381,8 +375,6 @@ version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc"
dependencies = [
"jobserver",
"libc",
"shlex",
]
@ -1228,15 +1220,6 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "jobserver"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.76"
@ -1290,16 +1273,6 @@ version = "0.2.167"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc"
[[package]]
name = "libfuzzer-sys"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b9569d2f74e257076d8c6bfa73fb505b46b851e51ddaecc825944aa3bed17fa"
dependencies = [
"arbitrary",
"cc",
]
[[package]]
name = "libm"
version = "0.2.11"
@ -1781,14 +1754,6 @@ dependencies = [
"test-case",
]
[[package]]
name = "reconcile-fuzz"
version = "0.0.30"
dependencies = [
"libfuzzer-sys",
"reconcile",
]
[[package]]
name = "redox_syscall"
version = "0.5.7"

View file

@ -2,7 +2,6 @@
resolver = "2"
members = [
"reconcile",
"fuzz",
"sync_server",
"sync_lib"
]
@ -57,3 +56,19 @@ uninlined_format_args = "warn"
unnested_or_patterns = "warn"
unused_self = "warn"
verbose_file_reads = "warn"
cast_possible_truncation = { level = "allow", priority = 1 }
doc_link_with_quotes = { level = "allow", priority = 1 }
cast_sign_loss = { level = "allow", priority = 1 }
cast_possible_wrap = { level = "allow", priority = 1 }
struct_field_names = { level = "allow", priority = 1 }
single_call_fn = { level = "allow", priority = 1 }
absolute_paths = { level = "allow", priority = 1 }
arithmetic_side_effects = { level = "allow", priority = 1 }
similar_names = { level = "allow", priority = 1 }
self_named_module_files = { level = "allow", priority = 1 }
single_char_lifetime_names = { level = "allow", priority = 1 }
missing_docs_in_private_items = { level = "allow", priority = 1 }
question_mark_used = { level = "allow", priority = 1 }
implicit_return = { level = "allow", priority = 1 }
pedantic = { level = "warn", priority = 0 }

View file

@ -3,8 +3,6 @@ FROM rust:1.83 AS builder
WORKDIR /usr/src/backend
RUN apt update && apt install -y musl-tools
RUN rustup install nightly && rustup default nightly
RUN rustup target add x86_64-unknown-linux-musl
RUN cargo install sqlx-cli
COPY . .
@ -23,7 +21,7 @@ RUN apk add --no-cache curl
COPY --from=builder /usr/src/backend/target/x86_64-unknown-linux-musl/release/sync_server /app/sync_server
VOLUME /data
VOLUME /data/databases
EXPOSE 3000/tcp
WORKDIR /data

View file

@ -1,4 +0,0 @@
target
corpus
artifacts
coverage

View file

@ -1,25 +0,0 @@
[package]
name = "reconcile-fuzz"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
publish = false
[package.metadata]
cargo-fuzz = true
[dependencies]
libfuzzer-sys = "0.4"
reconcile = { path = "../reconcile" }
[[bin]]
name = "reconcile"
path = "fuzz_targets/reconcile.rs"
test = false
doc = false
bench = false
[lints]
workspace = true

View file

@ -1,8 +0,0 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
fuzz_target!(|texts: (String, String, String)| {
let (original, left, right) = texts;
let _ = reconcile::reconcile(&original, &left, &right);
});

View file

@ -38,7 +38,7 @@ use crate::{
/// execution time permitted before it bails and falls back to an approximation.
pub fn diff<T>(old: &[Token<T>], new: &[Token<T>]) -> Vec<RawOperation<T>>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
let max_d = (old.len() + new.len()).div_ceil(2) + 1;
let mut vb = V::new(max_d);
@ -99,7 +99,6 @@ impl IndexMut<isize> for V {
}
}
#[inline(always)]
fn split_at(range: Range<usize>, at: usize) -> (Range<usize>, Range<usize>) {
(range.start..at, at..range.end)
}
@ -124,7 +123,7 @@ fn find_middle_snake<T>(
vb: &mut V,
) -> Option<(usize, usize)>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
let n = old_range.len();
let m = new_range.len();
@ -230,7 +229,7 @@ fn conquer<T>(
vb: &mut V,
result: &mut Vec<RawOperation<T>>,
) where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
// Check for common prefix
let common_prefix_len = common_prefix_len(old, old_range.clone(), new, new_range.clone());

View file

@ -3,7 +3,7 @@ use crate::tokenizer::token::Token;
#[derive(Debug, Clone, PartialEq)]
pub enum RawOperation<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
Insert(Vec<Token<T>>),
Delete(Vec<Token<T>>),
@ -12,13 +12,13 @@ where
impl<T> RawOperation<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
pub fn tokens(&self) -> &Vec<Token<T>> {
match self {
RawOperation::Insert(tokens) => tokens,
RawOperation::Delete(tokens) => tokens,
RawOperation::Equal(tokens) => tokens,
RawOperation::Insert(tokens)
| RawOperation::Delete(tokens)
| RawOperation::Equal(tokens) => tokens,
}
}

View file

@ -37,7 +37,7 @@ pub fn reconcile_with_tokenizer<F, T>(
tokenizer: &Tokenizer<T>,
) -> String
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
let left_operations = EditedText::from_strings_with_tokenizer(original, left, tokenizer);
let right_operations = EditedText::from_strings_with_tokenizer(original, right, tokenizer);
@ -73,7 +73,8 @@ mod test {
"original_1 edit_1 original_3",
);
// One deleted a large range, the other deleted subranges and inserted as well
// One deleted a large range, the other deleted subranges and inserted as
// well
test_merge_both_ways(
"original_1 original_2 original_3 original_4 original_5",
"original_1 original_5",
@ -120,9 +121,6 @@ mod test {
"hi, my friend!",
);
// test_merge_both_ways("hello world", "world !", "hi hello world", "hi world
// !");
test_merge_both_ways(
"both delete the same word",
"both the same word",
@ -147,7 +145,33 @@ mod test {
);
}
#[ignore = "it's too slow"]
#[test]
fn test_reconcile_idempotent_inserts() {
// Both inserted the same prefix; this should get deduped
test_merge_both_ways(
"hi ",
"hi there ",
"hi there my friend ",
"hi there my friend ",
);
// The prefix of the 2nd appears on the 1st so it shouldn't get duplicated
test_merge_both_ways(
"hi ",
"hi there you ",
"hi there my friend ",
"hi there my friend you ",
);
test_merge_both_ways("a", "a b c", "a b c d", "a b c d");
test_merge_both_ways(
" |7ca2b36d-6ee7-49eb-8eb1-d77e4cc1a001| ",
" |7ca2b36d-6ee7-49eb-8eb1-d77e4cc1a001| |cd9195cc-103a-4f13-90c8-4fba0ba421ee| |d39156cc-cfd6-42a8-b70a-75020896069d| |fbad794c-9c47-41f2-a343-490284ecb5a0| |dup| ",
" |7ca2b36d-6ee7-49eb-8eb1-d77e4cc1a001| |cd9195cc-103a-4f13-90c8-4fba0ba421ee| |dup| ",
" |7ca2b36d-6ee7-49eb-8eb1-d77e4cc1a001| |cd9195cc-103a-4f13-90c8-4fba0ba421ee| |d39156cc-cfd6-42a8-b70a-75020896069d| |fbad794c-9c47-41f2-a343-490284ecb5a0| |dup| |dup| ");
}
#[test_matrix( [
"pride_and_prejudice.txt",
"romeo_and_juliet.txt",

View file

@ -25,7 +25,7 @@ use crate::{
#[derive(Debug, Clone, PartialEq, Default)]
pub struct EditedText<'a, T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
text: &'a str,
operations: Vec<OrderedOperation<T>>,
@ -46,7 +46,7 @@ impl<'a> EditedText<'a, String> {
impl<'a, T> EditedText<'a, T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
/// Create an `EditedText` from the given original (old) and updated (new)
/// strings. The returned `EditedText` represents the changes from the
@ -65,7 +65,6 @@ where
Self::new(
original,
// Self::cook_operations(diff),
Self::cook_operations(Self::elongate_operations(diff)).collect(),
)
}
@ -191,7 +190,7 @@ where
pub fn merge(self, other: Self) -> Self {
debug_assert_eq!(
self.text, other.text,
"EditedText-s must be derived from the same text to be mergable"
"`EditedText`-s must be derived from the same text to be mergable"
);
let mut left_merge_context = MergeContext::default();
@ -207,9 +206,21 @@ where
|(operation, _)| {
(
operation.order,
// Operations on left and right must come in the same order so that
// Operations on the left and right must come in the same order so that
// inserts can be merged with other inserts and deletes with deletes.
usize::from(matches!(operation.operation, Operation::Delete { .. })),
// Make sure that the ordering is deterministic regardless which text
// is left or right.
match &operation.operation {
Operation::Insert { text, .. } => text
.iter()
.map(super::super::tokenizer::token::Token::original)
.collect::<String>(),
Operation::Delete {
deleted_character_count,
..
} => deleted_character_count.to_string(),
},
)
},
)
@ -232,6 +243,7 @@ where
}
/// Apply the operations to the text and return the resulting text.
#[must_use]
pub fn apply(&self) -> String {
let mut builder: StringBuilder<'_> = StringBuilder::new(self.text);
@ -282,7 +294,7 @@ mod tests {
let original = "hello world! ...";
let left = "Hello world! I'm Andras.";
let right = "Hello world! How are you?";
let expected = "Hello world! I'm Andras.How are you?";
let expected = "Hello world! How are you? I'm Andras.";
let operations_1 = EditedText::from_strings(original, left);
let operations_2 = EditedText::from_strings(original, right);

View file

@ -5,7 +5,7 @@ use crate::operation_transformation::Operation;
#[derive(Clone)]
pub struct MergeContext<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
last_operation: Option<Operation<T>>,
pub shift: i64,
@ -13,7 +13,7 @@ where
impl<T> Default for MergeContext<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
fn default() -> Self {
MergeContext {
@ -25,7 +25,7 @@ where
impl<T> Debug for MergeContext<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("MergeContext")
@ -37,7 +37,7 @@ where
impl<T> MergeContext<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
pub fn last_operation(&self) -> Option<&Operation<T>> { self.last_operation.as_ref() }

View file

@ -1,7 +1,5 @@
use core::{
fmt::{Debug, Display},
ops::Range,
};
use core::fmt::{Debug, Display};
use std::ops::Range;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
@ -9,7 +7,10 @@ use serde::{Deserialize, Serialize};
use super::merge_context::MergeContext;
use crate::{
Token,
utils::{find_common_overlap::find_common_overlap, string_builder::StringBuilder},
utils::{
find_longest_prefix_contained_within::find_longest_prefix_contained_within,
string_builder::StringBuilder,
},
};
/// Represents a change that can be applied to a text document.
@ -19,7 +20,7 @@ use crate::{
#[derive(Clone, PartialEq)]
pub enum Operation<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
Insert {
index: usize,
@ -37,7 +38,7 @@ where
impl<T> Operation<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
/// Creates an insert operation with the given index and text.
/// If the text is empty (meaning that the operation would be a no-op),
@ -81,15 +82,8 @@ where
})
}
/// Tries to apply the operation to the given `ropey::Rope` text, returning
/// the modified text.
///
/// # Errors
///
/// Returns a `SyncLibError::OperationApplicationError` if the operation
/// cannot be applied.
///
/// # Panics
/// Applies the operation to the given `StringBuilder`, returning the
/// modified `StringBuilder`.
///
/// When compiled in debug mode, panics if a delete operation is attempted
/// on a range of text that does not match the text to be deleted.
@ -114,7 +108,7 @@ where
builder.delete(self.range());
}
};
}
builder
}
@ -122,8 +116,7 @@ where
/// Returns the index of the first character that the operation affects.
pub fn start_index(&self) -> usize {
match self {
Operation::Insert { index, .. } => *index,
Operation::Delete { index, .. } => *index,
Operation::Insert { index, .. } | Operation::Delete { index, .. } => *index,
}
}
@ -137,6 +130,7 @@ where
}
/// Returns the range of indices of characters that the operation affects.
#[allow(clippy::range_plus_one)]
pub fn range(&self) -> Range<usize> { self.start_index()..self.end_index() + 1 }
/// Returns the number of affected characters. It is always greater than 0
@ -212,17 +206,20 @@ where
..
}),
) => {
let offset_in_tokens = find_common_overlap(previous_inserted_text, &text);
let trimmed_length_in_tokens = previous_inserted_text.len() - offset_in_tokens;
let trimmed_length = previous_inserted_text
// In case the current insert's prefix appears in the previously inserted text,
// we can trim the current insert to only include the non-overlapping part.
// This way, we don't end up duplicating text.
let offset_in_tokens =
find_longest_prefix_contained_within(previous_inserted_text, &text);
let offset_in_length = text
.iter()
.skip(offset_in_tokens)
.take(offset_in_tokens)
.map(Token::get_original_length)
.sum::<usize>();
let trimmed_operation =
Operation::create_insert(index, text[trimmed_length_in_tokens..].to_vec());
Operation::create_insert(index, text[offset_in_tokens..].to_vec());
affecting_context.shift -= trimmed_length as i64;
affecting_context.shift -= offset_in_length as i64;
produced_context.shift += trimmed_operation
.as_ref()
.map(Operation::len)
@ -297,7 +294,7 @@ where
impl<T> Display for Operation<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
@ -341,7 +338,7 @@ where
impl<T> Debug for Operation<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { write!(f, "{self}") }
}
@ -353,7 +350,7 @@ mod tests {
use super::*;
#[test]
#[should_panic]
#[should_panic(expected = "Shifted index must be non-negative")]
fn test_shifting_error() {
insta::assert_debug_snapshot!(
Operation::create_insert(1, vec!["hi".into()])

View file

@ -8,19 +8,19 @@ EditedText {
operations: [
OrderedOperation {
order: 0,
operation: <insert 'Hello, my friend! ' from index 0>,
operation: <insert 'Hello, my friend!' from index 0>,
},
OrderedOperation {
order: 0,
operation: <delete 'hello world! ' from index 18>,
operation: <delete 'hello world!' from index 17>,
},
OrderedOperation {
order: 21,
operation: <insert 'you doing? Albert' from index 26>,
order: 20,
operation: <insert ' you doing? Albert' from index 25>,
},
OrderedOperation {
order: 21,
operation: <delete 'you? Adam' from index 43>,
order: 20,
operation: <delete ' you? Adam' from index 43>,
},
],
}

View file

@ -0,0 +1,6 @@
---
source: reconcile/src/tokenizer/word_tokenizer.rs
expression: "word_tokenizer(\"\")"
snapshot_kind: text
---
[]

View file

@ -0,0 +1,15 @@
---
source: reconcile/src/tokenizer/word_tokenizer.rs
expression: "word_tokenizer(\" what? \")"
snapshot_kind: text
---
[
Token {
normalised: "what?",
original: " what?",
},
Token {
normalised: "",
original: " ",
},
]

View file

@ -0,0 +1,23 @@
---
source: reconcile/src/tokenizer/word_tokenizer.rs
expression: "word_tokenizer(\" hello, \\nwhere are you?\")"
snapshot_kind: text
---
[
Token {
normalised: "hello,",
original: " hello,",
},
Token {
normalised: "where",
original: " \nwhere",
},
Token {
normalised: "are",
original: " are",
},
Token {
normalised: "you?",
original: " you?",
},
]

View file

@ -0,0 +1,15 @@
---
source: reconcile/src/tokenizer/word_tokenizer.rs
expression: "word_tokenizer(\"Hi there!\")"
snapshot_kind: text
---
[
Token {
normalised: "Hi",
original: "Hi",
},
Token {
normalised: "there!",
original: " there!",
},
]

View file

@ -8,24 +8,19 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone)]
pub struct Token<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
normalised: T,
original: String,
}
impl From<&str> for Token<String> {
fn from(s: &str) -> Self {
Token {
normalised: s.to_owned(),
original: s.to_owned(),
}
}
fn from(s: &str) -> Self { Token::new(s.trim().to_owned(), s.to_owned()) }
}
impl<T> Token<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
pub fn new(normalised: T, original: String) -> Self {
Token {
@ -43,7 +38,7 @@ where
impl<T> PartialEq for Token<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
fn eq(&self, other: &Self) -> bool { self.normalised == other.normalised }
}

View file

@ -1,7 +1,48 @@
use super::token::Token;
/// Splits on whitespace keeping the leading whitespace.
///
///
/// ## Example
///
/// "Hi there!" -> ["Hi", " there!"]
pub fn word_tokenizer(text: &str) -> Vec<Token<String>> {
text.split_inclusive(char::is_whitespace)
.map(|s| Token::new(s.to_owned(), s.to_owned()))
.collect()
let mut result: Vec<Token<String>> = Vec::new();
let mut last_whitespace = 0;
let mut previous_char_is_whitespace = true;
for (i, c) in text.char_indices() {
let is_current_char_whitespace = c.is_whitespace();
if !previous_char_is_whitespace && is_current_char_whitespace {
result.push(text[last_whitespace..i].into());
last_whitespace = i;
}
previous_char_is_whitespace = is_current_char_whitespace;
}
if last_whitespace < text.len() {
result.push(text[last_whitespace..].into());
}
result
}
#[cfg(test)]
mod tests {
use insta::assert_debug_snapshot;
use super::*;
#[test]
fn test_with_snapshots() {
assert_debug_snapshot!(word_tokenizer("Hi there!"));
assert_debug_snapshot!(word_tokenizer(""));
assert_debug_snapshot!(word_tokenizer(" what? "));
assert_debug_snapshot!(word_tokenizer(" hello, \nwhere are you?"));
}
}

View file

@ -1,6 +1,6 @@
pub mod common_prefix_len;
pub mod common_suffix_len;
pub mod find_common_overlap;
pub mod find_longest_prefix_contained_within;
pub mod merge_iters;
pub mod ordered_operation;
pub mod side;

View file

@ -1,71 +0,0 @@
use crate::Token;
/// Given two lists of tokens, returns the offset in the first (old) list from
/// which the two lists have the same tokens until the end of the first list.
/// Thus, the suffix of the old list from the offset to the end is equal to a
/// prefix of the new list.
///
/// If there is no overlap, the function returns the maxmium offset, the length
/// of the old list.
///
/// ## Example
///
/// ```not_rust
/// old: [0, 1, 9, 0, 2, 5]
/// new: [9, 0, 2, 5, 1]
/// ```
/// > results in an offset of 2
pub fn find_common_overlap<T>(old: &[Token<T>], new: &[Token<T>]) -> usize
where
T: PartialEq + Clone,
{
let minimum_offset = old.len().saturating_sub(new.len());
for offset in minimum_offset..old.len() {
if old.iter().skip(offset).zip(new.iter()).all(|(a, b)| a == b) {
return offset;
}
}
old.len()
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn test_common_overlap() {
assert_eq!(find_common_overlap(&["".into()], &["".into()]), 0);
assert_eq!(
find_common_overlap(
&["a".into(), "b".into(), "c".into()],
&["b".into(), "c".into(), "a".into()]
),
1
);
assert_eq!(
find_common_overlap(
&["a".into(), "a".into(), "a".into()],
&["a".into(), "b".into(), "c".into()]
),
2
);
assert_eq!(
find_common_overlap(
&["a".into(), "b".into(), "c".into()],
&["d".into(), "e".into(), "a".into()]
),
3
);
assert_eq!(
find_common_overlap(&["a".into(), "a".into()], &["a".into()]),
1
);
}
}

View file

@ -0,0 +1,103 @@
use crate::Token;
/// Given two lists of tokens, returns `length` where `old` list somewhere
/// within contains the `length` prefix of the `new` list.
///
/// ## Example
///
/// ```not_rust
/// old: [0, 1, 9, 0, 2, 5]
/// new: [9, 0, 2, 5, 1]
/// ```
/// > results in an length of 4
///
///
/// ```not_rust
/// old: [0, 1, 9, 0, 2, 5]
/// new: [0, 2]
/// ```
/// > results in an length of 2
///
/// ```not_rust
/// old: [0, 1, 9, 0, 2, 5]
/// new: [0, 4]
/// ```
/// > results in an length of 1
pub fn find_longest_prefix_contained_within<T>(old: &[Token<T>], new: &[Token<T>]) -> usize
where
T: PartialEq + Clone + std::fmt::Debug,
{
let max_possible = new.len().min(old.len());
for len in (1..=max_possible).rev() {
let prefix = &new[..len];
if old.windows(len).any(|window| window == prefix) {
return len;
}
}
0
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn test_common_overlap() {
assert_eq!(
find_longest_prefix_contained_within(&["".into()], &["".into()]),
1
);
assert_eq!(
find_longest_prefix_contained_within(
&["a".into(), "b".into(), "c".into()],
&["b".into(), "c".into(), "a".into()]
),
2
);
assert_eq!(
find_longest_prefix_contained_within(
&["a".into(), "b".into(), "c".into()],
&["b".into(), "c".into()]
),
2
);
assert_eq!(
find_longest_prefix_contained_within(
&["a".into(), "b".into(), "c".into()],
&["b".into()]
),
1
);
assert_eq!(
find_longest_prefix_contained_within(
&["a".into(), "b".into(), "c".into(), "b".into(), "a".into()],
&["b".into(), "a".into()]
),
2
);
assert_eq!(
find_longest_prefix_contained_within(
&["a".into(), "a".into(), "a".into()],
&["a".into(), "b".into(), "c".into()]
),
1
);
assert_eq!(
find_longest_prefix_contained_within(
&["a".into(), "b".into(), "c".into()],
&["d".into(), "e".into(), "a".into()]
),
0
);
}
}

View file

@ -46,8 +46,7 @@ where
};
match order {
Some(Ordering::Less) | None => self.left.next(),
Some(Ordering::Equal) => self.left.next(),
Some(Ordering::Less | Ordering::Equal) | None => self.left.next(),
Some(Ordering::Greater) => self.right.next(),
}
}

View file

@ -7,7 +7,7 @@ use crate::operation_transformation::Operation;
#[derive(Debug, Clone, PartialEq)]
pub struct OrderedOperation<T>
where
T: PartialEq + Clone,
T: PartialEq + Clone + std::fmt::Debug,
{
pub order: usize,
pub operation: Operation<T>,

View file

@ -0,0 +1,4 @@
[toolchain]
channel = "nightly-2025-03-14"
targets = [ "x86_64-unknown-linux-gnu", "x86_64-unknown-linux-musl" ]
profile = "default"

View file

@ -18,7 +18,20 @@ pub mod errors;
/// Encode binary data for easy transport over HTTP. Inverse of
/// `base64_to_bytes`.
///
/// # Arguments
///
/// - `input`: The binary data to encode.
///
/// # Returns
///
/// The base64-encoded string.
///
/// # Panics
///
/// If the input is not valid UTF-8.
#[wasm_bindgen(js_name = bytesToBase64)]
#[must_use]
pub fn bytes_to_base64(input: &[u8]) -> String {
set_panic_hook();
@ -26,6 +39,19 @@ pub fn bytes_to_base64(input: &[u8]) -> String {
}
/// Inverse of `bytes_to_base64`.
/// Decode base64-encoded data into binary data.
///
/// # Arguments
///
/// - `input`: The base64-encoded string.
///
/// # Returns
///
/// The decoded binary data.
///
/// # Errors
///
/// If the input is not valid base64.
#[wasm_bindgen(js_name = base64ToBytes)]
pub fn base64_to_bytes(input: &str) -> Result<Vec<u8>, SyncLibError> {
set_panic_hook();
@ -36,7 +62,22 @@ pub fn base64_to_bytes(input: &str) -> Result<Vec<u8>, SyncLibError> {
/// Merge two documents with a common parent. Relies on `reconcile::reconcile`
/// for texts and returns the right document as-is if either of the updated
/// documents is binary.
///
/// # Arguments
///
/// - `parent`: The common parent document.
/// - `left`: The left document updated by one user.
/// - `right`: The right document updated by another user.
///
/// # Returns
///
/// The merged document.
///
/// # Panics
///
/// If any of the input documents are not valid UTF-8 strings.
#[wasm_bindgen]
#[must_use]
pub fn merge(parent: &[u8], left: &[u8], right: &[u8]) -> Vec<u8> {
set_panic_hook();
@ -54,6 +95,7 @@ pub fn merge(parent: &[u8], left: &[u8], right: &[u8]) -> Vec<u8> {
/// WASM wrapper around `reconcile::reconcile` for text merging.
#[wasm_bindgen(js_name = mergeText)]
#[must_use]
pub fn merge_text(parent: &str, left: &str, right: &str) -> String {
set_panic_hook();
@ -63,10 +105,11 @@ pub fn merge_text(parent: &str, left: &str, right: &str) -> String {
/// Heuristically determine if the given data is a binary or a text file's
/// content.
#[wasm_bindgen(js_name = isBinary)]
#[must_use]
pub fn is_binary(data: &[u8]) -> bool {
set_panic_hook();
if data.iter().any(|&b| b == 0) {
if data.contains(&0) {
// Even though the NUL character is valid in UTF-8, it's highly suspicious in
// human-readable text.
return true;
@ -77,6 +120,7 @@ pub fn is_binary(data: &[u8]) -> bool {
/// We don't want to support merging structured data like JSON, YAML, etc.
#[wasm_bindgen(js_name = isFileTypeMergable)]
#[must_use]
pub fn is_file_type_mergable(path_or_file_name: &str) -> bool {
set_panic_hook();

View file

@ -26,7 +26,8 @@ fn test_base64_to_bytes_error() {
fn merge_text() {
let left = b"hello ";
let right = b"world";
assert_eq!(merge(b"", left, right), b"hello world".to_vec());
let result = merge(b"", left, right);
assert_eq!(result, b"hello world");
}
#[wasm_bindgen_test(unsupported = test)]

View file

@ -42,9 +42,12 @@ impl Config {
}
pub async fn load_from_file(path: &Path) -> Result<Self> {
let contents = fs::read_to_string(path)
.await
.with_context(|| format!("Cannot load configuration from disk from ({path:?})"))?;
let contents = fs::read_to_string(path).await.with_context(|| {
format!(
"Cannot load configuration from disk from {}",
path.display()
)
})?;
let config = serde_yaml::from_str(&contents).context("Failed to parse configuration")?;

View file

@ -1,31 +1,33 @@
use std::path::PathBuf;
use log::debug;
use serde::{Deserialize, Serialize};
use crate::consts::{DEFAULT_MAX_CONNECTIONS, DEFAULT_SQLITE_URL};
use crate::consts::{DEFAULT_DATABASES_DIRECTORY_PATH, DEFAULT_MAX_CONNECTIONS};
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DatabaseConfig {
#[serde(default = "default_sqlite_url")]
pub sqlite_url: String,
#[serde(default = "default_databases_directory_path")]
pub databases_directory_path: PathBuf,
#[serde(default = "default_max_connections")]
pub max_connections: u32,
}
fn default_sqlite_url() -> String {
debug!("Using default sqlite url: {}", DEFAULT_SQLITE_URL);
DEFAULT_SQLITE_URL.to_owned()
fn default_databases_directory_path() -> PathBuf {
debug!("Using default databases directory path: {DEFAULT_DATABASES_DIRECTORY_PATH:?}");
PathBuf::from(DEFAULT_DATABASES_DIRECTORY_PATH)
}
fn default_max_connections() -> u32 {
debug!("Using default max connections: {}", DEFAULT_MAX_CONNECTIONS);
debug!("Using default max connections: {DEFAULT_MAX_CONNECTIONS}");
DEFAULT_MAX_CONNECTIONS
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
sqlite_url: default_sqlite_url(),
databases_directory_path: default_databases_directory_path(),
max_connections: default_max_connections(),
}
}

View file

@ -15,20 +15,17 @@ pub struct ServerConfig {
}
fn default_host() -> String {
debug!("Using default server host: {}", DEFAULT_HOST);
debug!("Using default server host: {DEFAULT_HOST}");
DEFAULT_HOST.to_owned()
}
fn default_port() -> u16 {
debug!("Using default server port: {}", DEFAULT_PORT);
debug!("Using default server port: {DEFAULT_PORT}");
DEFAULT_PORT
}
fn default_max_body_size_mb() -> usize {
debug!(
"Using default max body size (MB): {}",
DEFAULT_MAX_BODY_SIZE_MB
);
debug!("Using default max body size (MB): {DEFAULT_MAX_BODY_SIZE_MB}");
DEFAULT_MAX_BODY_SIZE_MB
}

View file

@ -1,5 +1,5 @@
pub const CONFIG_PATH: &str = "config.yml";
pub const DEFAULT_SQLITE_URL: &str = "db.sqlite3";
pub const DEFAULT_DATABASES_DIRECTORY_PATH: &str = "databases";
pub const DEFAULT_HOST: &str = "127.0.0.1";
pub const DEFAULT_PORT: u16 = 3000;
pub const DEFAULT_MAX_CONNECTIONS: u32 = 12;

View file

@ -1,4 +1,5 @@
use core::{str::FromStr as _, time::Duration};
use core::time::Duration;
use std::{collections::HashMap, sync::Arc};
use anyhow::{Context as _, Result};
use models::{
@ -7,19 +8,66 @@ use models::{
use sqlx::{sqlite::SqliteConnectOptions, types::chrono::Utc};
pub mod models;
use sqlx::{Pool, Sqlite, sqlite::SqlitePoolOptions};
use tokio::sync::Mutex;
use uuid::fmt::Hyphenated;
use crate::config::database_config::DatabaseConfig;
#[derive(Clone, Debug)]
pub struct Database {
connection_pool: Pool<Sqlite>,
config: DatabaseConfig,
connection_pools: Arc<Mutex<HashMap<VaultId, Pool<Sqlite>>>>,
}
pub type Transaction<'a> = sqlx::Transaction<'a, Sqlite>;
impl Database {
pub async fn try_new(config: &DatabaseConfig) -> Result<Self> {
let connection_options = SqliteConnectOptions::from_str(&config.sqlite_url)?
tokio::fs::create_dir_all(&config.databases_directory_path)
.await
.with_context(|| {
format!(
"Failed to create databases directory: {}",
config.databases_directory_path.to_string_lossy()
)
})?;
let mut connection_pools = std::collections::HashMap::new();
let mut entries = tokio::fs::read_dir(&config.databases_directory_path).await?;
while let Some(entry) = entries.next_entry().await? {
if !entry.file_name().to_string_lossy().ends_with(".sqlite") {
continue;
}
let vault: VaultId = entry
.file_name()
.to_string_lossy()
.trim_end_matches(".sqlite")
.to_owned();
connection_pools.insert(
vault.clone(),
Self::create_vault_database(config, &vault).await?,
);
}
Ok(Self {
config: config.clone(),
connection_pools: Arc::new(Mutex::new(connection_pools)),
})
}
async fn create_vault_database(
config: &DatabaseConfig,
vault: &VaultId,
) -> Result<Pool<Sqlite>> {
let file_name = config
.databases_directory_path
.join(format!("{vault}.sqlite"));
let connection_options = SqliteConnectOptions::new()
.filename(file_name.clone())
.create_if_missing(true)
.busy_timeout(Duration::from_secs(3600))
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
@ -29,18 +77,11 @@ impl Database {
.test_before_acquire(true)
.connect_with(connection_options)
.await
.with_context(|| {
format!(
"Cannot connect to database with url: {}",
&config.sqlite_url
)
})?;
.with_context(|| format!("Cannot open database at {}", file_name.display()))?;
Self::run_migrations(&pool).await?;
Ok(Self {
connection_pool: pool,
})
Ok(pool)
}
async fn run_migrations(pool: &Pool<Sqlite>) -> Result<()> {
@ -50,17 +91,38 @@ impl Database {
.context("Cannot check for pending migrations")
}
async fn get_connection_pool(&mut self, vault: &VaultId) -> Result<Pool<Sqlite>> {
let mut pools = self.connection_pools.lock().await;
if !pools.contains_key(vault) {
let pool = Self::create_vault_database(&self.config, vault).await?;
pools.insert(vault.clone(), pool);
}
let pool = pools
.get(vault)
.expect("Pool was just inserted or already exists");
Ok(pool.clone())
}
/// Attempting to write from this transaction might result in a
/// database locked error. Use this transaction for read-only operations.
pub async fn create_readonly_transaction(&self) -> Result<Transaction<'_>> {
self.connection_pool
pub async fn create_readonly_transaction(
&mut self,
vault: &VaultId,
) -> Result<Transaction<'static>> {
self.get_connection_pool(vault)
.await?
.begin()
.await
.context("Cannot create transaction")
}
pub async fn create_write_transaction(&self) -> Result<Transaction<'_>> {
let mut transaction = self.create_readonly_transaction().await?;
pub async fn create_write_transaction(
&mut self,
vault: &VaultId,
) -> Result<Transaction<'static>> {
let mut transaction = self.create_readonly_transaction(vault).await?;
// sqlx doesn't support immediate transactions for sqlite: https://github.com/launchbadge/sqlx/issues/481
sqlx::query!("END; BEGIN IMMEDIATE;")
@ -72,7 +134,7 @@ impl Database {
/// Return the latest state of all documents in the vault
pub async fn get_latest_documents(
&self,
&mut self,
vault: &VaultId,
transaction: Option<&mut Transaction<'_>>,
) -> Result<Vec<DocumentVersionWithoutContent>> {
@ -80,24 +142,22 @@ impl Database {
DocumentVersionWithoutContent,
r#"
select
vault_id,
vault_update_id,
document_id as "document_id: uuid::Uuid",
document_id as "document_id: Hyphenated",
relative_path,
created_date as "created_date: chrono::DateTime<Utc>",
updated_date as "updated_date: chrono::DateTime<Utc>",
is_deleted
from latest_document_versions
where vault_id = ?
order by vault_update_id desc
"#,
vault,
);
if let Some(transaction) = transaction {
query.fetch_all(&mut **transaction).await
} else {
query.fetch_all(&self.connection_pool).await
query
.fetch_all(&self.get_connection_pool(vault).await?)
.await
}
.context("Cannot fetch latest documents")
}
@ -105,7 +165,7 @@ impl Database {
/// Return the latest state of all documents (including deleted) in the
/// vault which have changed since the given update id
pub async fn get_latest_documents_since(
&self,
&mut self,
vault: &VaultId,
vault_update_id: VaultUpdateId,
transaction: Option<&mut Transaction<'_>>,
@ -114,25 +174,24 @@ impl Database {
DocumentVersionWithoutContent,
r#"
select
vault_id,
vault_update_id,
document_id as "document_id: uuid::Uuid",
document_id as "document_id: Hyphenated",
relative_path,
created_date as "created_date: chrono::DateTime<Utc>",
updated_date as "updated_date: chrono::DateTime<Utc>",
is_deleted
from latest_document_versions
where vault_id = ? and vault_update_id > ?
where vault_update_id > ?
order by vault_update_id desc
"#,
vault,
vault_update_id
);
if let Some(transaction) = transaction {
query.fetch_all(&mut **transaction).await
} else {
query.fetch_all(&self.connection_pool).await
query
.fetch_all(&self.get_connection_pool(vault).await?)
.await
}
.with_context(|| {
format!("Cannot fetch latest documents since vault_update_id {vault_update_id}")
@ -140,7 +199,7 @@ impl Database {
}
pub async fn get_max_update_id_in_vault(
&self,
&mut self,
vault: &VaultId,
transaction: Option<&mut Transaction<'_>>,
) -> Result<i64> {
@ -148,22 +207,22 @@ impl Database {
r#"
select coalesce(max(vault_update_id), 0) as max_vault_update_id
from documents
where vault_id = ?
"#,
vault
);
if let Some(transaction) = transaction {
query.fetch_one(&mut **transaction).await
} else {
query.fetch_one(&self.connection_pool).await
query
.fetch_one(&self.get_connection_pool(vault).await?)
.await
}
.map(|row| row.max_vault_update_id)
.context("Cannot fetch max update id in vault")
}
pub async fn get_latest_document_by_path(
&self,
&mut self,
vault: &VaultId,
relative_path: &str,
transaction: Option<&mut Transaction<'_>>,
@ -172,68 +231,67 @@ impl Database {
StoredDocumentVersion,
r#"
select
vault_id,
vault_update_id,
document_id as "document_id: uuid::Uuid",
document_id as "document_id: Hyphenated",
relative_path,
created_date as "created_date: chrono::DateTime<Utc>",
updated_date as "updated_date: chrono::DateTime<Utc>",
content,
is_deleted
from latest_document_versions
where vault_id = ? and relative_path = ?
where relative_path = ?
order by vault_update_id desc -- `latest_document_versions` only contains a single latest version of each document, however,
-- multiple documents can have the same `relative_path`, if they have been deleted. That's
-- why we only care about the latest version of the document with the given relative path.
limit 1
"#,
vault,
relative_path
);
if let Some(transaction) = transaction {
query.fetch_optional(&mut **transaction).await
} else {
query.fetch_optional(&self.connection_pool).await
query
.fetch_optional(&self.get_connection_pool(vault).await?)
.await
}
.context("Cannot fetch latest document version")
}
pub async fn get_latest_document(
&self,
&mut self,
vault: &VaultId,
document_id: &DocumentId,
transaction: Option<&mut Transaction<'_>>,
) -> Result<Option<StoredDocumentVersion>> {
let document_id = document_id.as_hyphenated();
let query = sqlx::query_as!(
StoredDocumentVersion,
r#"
select
vault_id,
vault_update_id,
document_id as "document_id: uuid::Uuid",
document_id as "document_id: Hyphenated",
relative_path,
created_date as "created_date: chrono::DateTime<Utc>",
updated_date as "updated_date: chrono::DateTime<Utc>",
content,
is_deleted
from latest_document_versions
where vault_id = ? and document_id = ?
where document_id = ?
"#,
vault,
document_id
);
if let Some(transaction) = transaction {
query.fetch_optional(&mut **transaction).await
} else {
query.fetch_optional(&self.connection_pool).await
query
.fetch_optional(&self.get_connection_pool(vault).await?)
.await
}
.context("Cannot fetch latest document version")
}
pub async fn get_document_version(
&self,
&mut self,
vault: &VaultId,
vault_update_id: VaultUpdateId,
transaction: Option<&mut Transaction<'_>>,
@ -242,52 +300,49 @@ impl Database {
StoredDocumentVersion,
r#"
select
vault_id,
vault_update_id,
document_id as "document_id: uuid::Uuid",
document_id as "document_id: Hyphenated",
relative_path,
created_date as "created_date: chrono::DateTime<Utc>",
updated_date as "updated_date: chrono::DateTime<Utc>",
content,
is_deleted
from documents
where vault_id = ? and vault_update_id = ?"#,
vault,
where vault_update_id = ?"#,
vault_update_id
);
if let Some(transaction) = transaction {
query.fetch_optional(&mut **transaction).await
} else {
query.fetch_optional(&self.connection_pool).await
query
.fetch_optional(&self.get_connection_pool(vault).await?)
.await
}
.context("Cannot fetch document version")
}
pub async fn insert_document_version(
&self,
&mut self,
vault: &VaultId,
version: &StoredDocumentVersion,
transaction: Option<&mut Transaction<'_>>,
) -> Result<()> {
let document_id = version.document_id.as_hyphenated();
let query = sqlx::query!(
r#"
insert into documents (
vault_id,
vault_update_id,
document_id,
relative_path,
created_date,
updated_date,
content,
is_deleted
)
values (?, ?, ?, ?, ?, ?, ?, ?)
values (?, ?, ?, ?, ?, ?)
"#,
version.vault_id,
version.vault_update_id,
version.document_id,
document_id,
version.relative_path,
version.created_date,
version.updated_date,
version.content,
version.is_deleted
@ -296,7 +351,7 @@ impl Database {
if let Some(transaction) = transaction {
query.execute(&mut **transaction).await
} else {
query.execute(&self.connection_pool).await
query.execute(&self.get_connection_pool(vault).await?).await
}
.context("Cannot insert document version")?;

View file

@ -1,25 +1,21 @@
CREATE TABLE IF NOT EXISTS documents (
vault_id TEXT NOT NULL,
vault_update_id INTEGER NOT NULL,
vault_update_id INTEGER NOT NULL PRIMARY KEY,
document_id TEXT NOT NULL,
relative_path TEXT NOT NULL,
created_date TIMESTAMP NOT NULL,
updated_date TIMESTAMP NOT NULL,
content BLOB NOT NULL,
is_deleted BOOLEAN NOT NULL,
PRIMARY KEY (vault_id, vault_update_id)
is_deleted BOOLEAN NOT NULL
);
CREATE VIEW IF NOT EXISTS latest_document_versions AS
SELECT d.*
FROM documents d
INNER JOIN (
SELECT vault_id, MAX(vault_update_id) AS max_version_id
SELECT MAX(vault_update_id) AS max_version_id
FROM documents
GROUP BY vault_id, document_id
GROUP BY document_id
) max_versions
ON d.vault_id = max_versions.vault_id
AND d.vault_update_id = max_versions.max_version_id;
ON d.vault_update_id = max_versions.max_version_id;
CREATE INDEX IF NOT EXISTS idx_documents_vault_id_relative_path
ON documents (vault_id, relative_path);
ON documents (relative_path);

View file

@ -9,30 +9,24 @@ pub type DocumentId = uuid::Uuid;
#[derive(Debug, Clone)]
pub struct StoredDocumentVersion {
pub vault_id: VaultId,
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub relative_path: String,
pub created_date: DateTime<Utc>,
pub updated_date: DateTime<Utc>,
pub content: Vec<u8>,
pub is_deleted: bool,
}
impl PartialEq<Self> for StoredDocumentVersion {
fn eq(&self, other: &Self) -> bool {
self.vault_id == other.vault_id && self.vault_update_id == other.vault_update_id
}
fn eq(&self, other: &Self) -> bool { self.vault_update_id == other.vault_update_id }
}
#[derive(Debug, Clone, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DocumentVersionWithoutContent {
pub vault_id: VaultId,
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub relative_path: String,
pub created_date: DateTime<Utc>,
pub updated_date: DateTime<Utc>,
pub is_deleted: bool,
}
@ -40,11 +34,9 @@ pub struct DocumentVersionWithoutContent {
impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
fn from(value: StoredDocumentVersion) -> Self {
Self {
vault_id: value.vault_id,
vault_update_id: value.vault_update_id,
document_id: value.document_id,
relative_path: value.relative_path,
created_date: value.created_date,
updated_date: value.updated_date,
is_deleted: value.is_deleted,
}
@ -54,11 +46,9 @@ impl From<StoredDocumentVersion> for DocumentVersionWithoutContent {
#[derive(Debug, Clone, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DocumentVersion {
pub vault_id: VaultId,
pub vault_update_id: VaultUpdateId,
pub document_id: DocumentId,
pub relative_path: String,
pub created_date: DateTime<Utc>,
pub updated_date: DateTime<Utc>,
pub content_base64: String,
pub is_deleted: bool,
@ -67,11 +57,9 @@ pub struct DocumentVersion {
impl From<StoredDocumentVersion> for DocumentVersion {
fn from(value: StoredDocumentVersion) -> Self {
Self {
vault_id: value.vault_id,
vault_update_id: value.vault_update_id,
document_id: value.document_id,
relative_path: value.relative_path,
created_date: value.created_date,
updated_date: value.updated_date,
content_base64: bytes_to_base64(&value.content),
is_deleted: value.is_deleted,

View file

@ -33,12 +33,12 @@ pub enum SyncServerError {
impl SyncServerError {
pub fn serialize(&self) -> SerializedError {
match self {
Self::InitError(error) => error.into(),
Self::ClientError(error) => error.into(),
Self::ServerError(error) => error.into(),
Self::NotFound(error) => error.into(),
Self::Unauthorized(error) => error.into(),
Self::PermissionDeniedError(error) => error.into(),
Self::InitError(error)
| Self::ClientError(error)
| Self::ServerError(error)
| Self::NotFound(error)
| Self::Unauthorized(error)
| Self::PermissionDeniedError(error) => error.into(),
}
}
}
@ -48,9 +48,10 @@ impl IntoResponse for SyncServerError {
let body = Json(self.serialize());
match self {
Self::InitError(_) => (StatusCode::INTERNAL_SERVER_ERROR, body).into_response(),
Self::InitError(_) | Self::ServerError(_) => {
(StatusCode::INTERNAL_SERVER_ERROR, body).into_response()
}
Self::ClientError(_) => (StatusCode::BAD_REQUEST, body).into_response(),
Self::ServerError(_) => (StatusCode::INTERNAL_SERVER_ERROR, body).into_response(),
Self::NotFound(_) => (StatusCode::NOT_FOUND, body).into_response(),
Self::Unauthorized(_) => (StatusCode::UNAUTHORIZED, body).into_response(),
Self::PermissionDeniedError(_) => (StatusCode::FORBIDDEN, body).into_response(),

View file

@ -16,6 +16,7 @@ use axum::{
extract::{DefaultBodyLimit, Request},
http::{self, HeaderValue, Method},
response::IntoResponse,
routing::IntoMakeService,
};
use log::{error, info};
use tokio::signal;
@ -30,7 +31,10 @@ use tower_http::{
};
use tracing::{Level, info_span};
use crate::errors::{SerializedError, not_found_error};
use crate::{
config::server_config::ServerConfig,
errors::{SerializedError, not_found_error},
};
mod app_state;
mod auth;
mod create_document;
@ -52,24 +56,9 @@ pub async fn create_server() -> Result<()> {
.await
.context("Failed to initialise app state")?;
let address = format!(
"{}:{}",
&app_state.config.server.host, &app_state.config.server.port
);
let mut api = OpenApi {
info: Info {
title: "VaultLink sync server".to_owned(),
summary: Some(
"Simple API for syncing documents between concurrent clients.".to_owned(),
),
description: Some(include_str!("../README.md").to_owned()),
version: env!("CARGO_PKG_VERSION").to_owned(),
..Info::default()
},
..OpenApi::default()
};
let server_config = app_state.config.server.clone();
let mut api = create_open_api();
let app = ApiRouter::new()
.api_route("/ping", get(ping::ping))
.api_route(
@ -140,11 +129,42 @@ pub async fn create_server() -> Result<()> {
.allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE]),
)
.with_state(app_state)
.finish_api_with(&mut api, api_docs)
.finish_api_with(&mut api, add_api_docs_error_example)
.layer(Extension(Arc::new(api))) // https://github.com/tamasfe/aide/blob/507f4a8822bc0c13cbda0f589da1e0f4cbcdb812/examples/example-axum/src/main.rs#L39
.fallback(handler_404)
.into_make_service();
start_server(app, &server_config).await
}
async fn serve_api(Extension(api): Extension<Arc<OpenApi>>) -> impl IntoResponse { Json(api) }
fn create_open_api() -> OpenApi {
OpenApi {
info: Info {
title: "VaultLink sync server".to_owned(),
summary: Some(
"Simple API for syncing documents between concurrent clients.".to_owned(),
),
description: Some(include_str!("../README.md").to_owned()),
version: env!("CARGO_PKG_VERSION").to_owned(),
..Info::default()
},
..OpenApi::default()
}
}
fn add_api_docs_error_example(api: TransformOpenApi<'_>) -> TransformOpenApi<'_> {
api.default_response_with::<Json<SerializedError>, _>(|res| {
res.example(SerializedError {
message: "An error has occurred".to_owned(),
causes: vec![],
})
})
}
async fn start_server(app: IntoMakeService<axum::Router>, config: &ServerConfig) -> Result<()> {
let address = format!("{}:{}", config.host, config.port);
let listener = tokio::net::TcpListener::bind(address.clone())
.await
.with_context(|| format!("Failed to bind to address: {address}"))?;
@ -163,17 +183,6 @@ pub async fn create_server() -> Result<()> {
.context("Failed to start server")
}
async fn serve_api(Extension(api): Extension<Arc<OpenApi>>) -> impl IntoResponse { Json(api) }
fn api_docs(api: TransformOpenApi<'_>) -> TransformOpenApi<'_> {
api.default_response_with::<Json<SerializedError>, _>(|res| {
res.example(SerializedError {
message: "An error has occurred".to_owned(),
causes: vec![],
})
})
}
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
@ -193,8 +202,8 @@ async fn shutdown_signal() {
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
() = ctrl_c => {},
() = terminate => {},
}
}

View file

@ -6,7 +6,6 @@ use axum_extra::{
headers::{Authorization, authorization::Bearer},
};
use axum_jsonschema::Json;
use chrono::{DateTime, Utc};
use schemars::JsonSchema;
use serde::Deserialize;
use sync_lib::base64_to_bytes;
@ -17,7 +16,7 @@ use super::{
requests::{CreateDocumentVersion, CreateDocumentVersionMultipart},
};
use crate::{
database::models::{DocumentVersionWithoutContent, StoredDocumentVersion, VaultId},
database::models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId},
errors::{SyncServerError, client_error, server_error},
utils::sanitize_path,
};
@ -44,8 +43,8 @@ pub async fn create_document_multipart(
auth_header,
state,
vault_id,
request.document_id,
request.relative_path,
request.created_date,
request.content.contents.to_vec(),
)
.await
@ -69,8 +68,8 @@ pub async fn create_document_json(
auth_header,
state,
vault_id,
request.document_id,
request.relative_path,
request.created_date,
content_bytes,
)
.await
@ -78,20 +77,39 @@ pub async fn create_document_json(
async fn internal_create_document(
auth_header: Authorization<Bearer>,
state: AppState,
mut state: AppState,
vault_id: VaultId,
document_id: Option<DocumentId>,
relative_path: String,
created_date: DateTime<Utc>,
content: Vec<u8>,
) -> Result<Json<DocumentVersionWithoutContent>, SyncServerError> {
auth(&state, auth_header.token())?;
let mut transaction = state
.database
.create_write_transaction()
.create_write_transaction(&vault_id)
.await
.map_err(server_error)?;
let document_id = match document_id {
Some(document_id) => {
let existing_version = state
.database
.get_latest_document(&vault_id, &document_id, Some(&mut transaction))
.await
.map_err(server_error)?;
if existing_version.is_some() {
return Err(client_error(anyhow::anyhow!(
"Document with the same ID already exists"
)));
}
document_id
}
None => uuid::Uuid::new_v4(),
};
let last_update_id = state
.database
.get_max_update_id_in_vault(&vault_id, Some(&mut transaction))
@ -101,19 +119,17 @@ async fn internal_create_document(
let sanitized_relative_path = sanitize_path(&relative_path);
let new_version = StoredDocumentVersion {
vault_id,
vault_update_id: last_update_id + 1,
document_id: uuid::Uuid::new_v4(),
document_id,
relative_path: sanitized_relative_path,
content,
created_date,
updated_date: chrono::Utc::now(),
is_deleted: false,
};
state
.database
.insert_document_version(&new_version, Some(&mut transaction))
.insert_document_version(&vault_id, &new_version, Some(&mut transaction))
.await
.map_err(server_error)?;

View file

@ -10,7 +10,7 @@ use serde::Deserialize;
use super::{app_state::AppState, auth::auth, requests::DeleteDocumentVersion};
use crate::{
database::models::{DocumentId, StoredDocumentVersion, VaultId},
database::models::{DocumentId, DocumentVersionWithoutContent, StoredDocumentVersion, VaultId},
errors::{SyncServerError, server_error},
utils::sanitize_path,
};
@ -29,14 +29,14 @@ pub async fn delete_document(
vault_id,
document_id,
}): Path<PathParams>,
State(state): State<AppState>,
State(mut state): State<AppState>,
Json(request): Json<DeleteDocumentVersion>,
) -> Result<(), SyncServerError> {
) -> Result<Json<DocumentVersionWithoutContent>, SyncServerError> {
auth(&state, auth_header.token())?;
let mut transaction = state
.database
.create_write_transaction()
.create_write_transaction(&vault_id)
.await
.map_err(server_error)?;
@ -47,19 +47,17 @@ pub async fn delete_document(
.map_err(server_error)?;
let new_version = StoredDocumentVersion {
vault_id,
vault_update_id: last_update_id + 1,
document_id,
relative_path: sanitize_path(&request.relative_path),
content: vec![],
created_date: request.created_date,
updated_date: chrono::Utc::now(),
is_deleted: true,
};
state
.database
.insert_document_version(&new_version, Some(&mut transaction))
.insert_document_version(&vault_id, &new_version, Some(&mut transaction))
.await
.map_err(server_error)?;
@ -69,5 +67,5 @@ pub async fn delete_document(
.context("Failed to commit successful transaction")
.map_err(server_error)?;
Ok(())
Ok(Json(new_version.into()))
}

View file

@ -30,7 +30,7 @@ pub async fn fetch_document_version(
document_id,
vault_update_id,
}): Path<PathParams>,
State(state): State<AppState>,
State(mut state): State<AppState>,
) -> Result<Json<DocumentVersion>, SyncServerError> {
auth(&state, auth_header.token())?;
@ -39,12 +39,14 @@ pub async fn fetch_document_version(
.get_document_version(&vault_id, vault_update_id, None)
.await
.map_err(server_error)?
.map(Ok)
.unwrap_or_else(|| {
Err(not_found_error(anyhow!(
"Document with vault update id `{vault_update_id}` not found",
)))
})?;
.map_or_else(
|| {
Err(not_found_error(anyhow!(
"Document with vault update id `{vault_update_id}` not found",
)))
},
Ok,
)?;
if result.document_id != document_id {
return Err(not_found_error(anyhow!(

View file

@ -32,7 +32,7 @@ pub async fn fetch_document_version_content(
document_id,
vault_update_id,
}): Path<PathParams>,
State(state): State<AppState>,
State(mut state): State<AppState>,
) -> Result<Bytes, SyncServerError> {
auth(&state, auth_header.token())?;
@ -41,12 +41,14 @@ pub async fn fetch_document_version_content(
.get_document_version(&vault_id, vault_update_id, None)
.await
.map_err(server_error)?
.map(Ok)
.unwrap_or_else(|| {
Err(not_found_error(anyhow!(
"Document with vault update id `{vault_update_id}` not found",
)))
})?;
.map_or_else(
|| {
Err(not_found_error(anyhow!(
"Document with vault update id `{vault_update_id}` not found",
)))
},
Ok,
)?;
if result.document_id != document_id {
return Err(not_found_error(anyhow!(

View file

@ -28,7 +28,7 @@ pub async fn fetch_latest_document_version(
vault_id,
document_id,
}): Path<PathParams>,
State(state): State<AppState>,
State(mut state): State<AppState>,
) -> Result<Json<DocumentVersion>, SyncServerError> {
auth(&state, auth_header.token())?;
@ -37,12 +37,14 @@ pub async fn fetch_latest_document_version(
.get_latest_document(&vault_id, &document_id, None)
.await
.map_err(server_error)?
.map(Ok)
.unwrap_or_else(|| {
Err(not_found_error(anyhow!(
"Document with id `{document_id}` not found",
)))
})?;
.map_or_else(
|| {
Err(not_found_error(anyhow!(
"Document with id `{document_id}` not found",
)))
},
Ok,
)?;
Ok(Json(latest_version.into()))
}

View file

@ -30,7 +30,7 @@ pub async fn fetch_latest_documents(
TypedHeader(auth_header): TypedHeader<Authorization<Bearer>>,
Path(PathParams { vault_id }): Path<PathParams>,
Query(QueryParams { since_update_id }): Query<QueryParams>,
State(state): State<AppState>,
State(mut state): State<AppState>,
) -> Result<Json<FetchLatestDocumentsResponse>, SyncServerError> {
auth(&state, auth_header.token())?;

View file

@ -1,24 +1,27 @@
use aide_axum_typed_multipart::FieldData;
use axum::body::Bytes;
use axum_typed_multipart::TryFromMultipart;
use chrono::{DateTime, Utc};
use schemars::JsonSchema;
use serde::{self, Deserialize};
use crate::database::models::VaultUpdateId;
use crate::database::models::{DocumentId, VaultUpdateId};
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateDocumentVersion {
/// The client can decide the document id (if it wishes to) in order
/// to help with syncing. If the client does not provide a document id,
/// the server will generate one. If the client provides a document id
/// it must not already exist in the database.
pub document_id: Option<DocumentId>,
pub relative_path: String,
pub created_date: DateTime<Utc>,
pub content_base64: String,
}
#[derive(Debug, TryFromMultipart, JsonSchema)]
pub struct CreateDocumentVersionMultipart {
pub document_id: Option<DocumentId>,
pub relative_path: String,
pub created_date: DateTime<Utc>,
#[form_data(limit = "unlimited")]
pub content: FieldData<Bytes>,
}
@ -28,7 +31,6 @@ pub struct CreateDocumentVersionMultipart {
pub struct UpdateDocumentVersion {
pub parent_version_id: VaultUpdateId,
pub relative_path: String,
pub created_date: DateTime<Utc>,
pub content_base64: String,
}
@ -37,7 +39,6 @@ pub struct UpdateDocumentVersion {
pub struct UpdateDocumentVersionMultipart {
pub parent_version_id: VaultUpdateId,
pub relative_path: String,
pub created_date: DateTime<Utc>,
#[form_data(limit = "unlimited")]
pub content: FieldData<Bytes>,
}
@ -46,5 +47,4 @@ pub struct UpdateDocumentVersionMultipart {
#[serde(rename_all = "camelCase")]
pub struct DeleteDocumentVersion {
pub relative_path: String,
pub created_date: DateTime<Utc>,
}

View file

@ -6,7 +6,6 @@ use axum_extra::{
headers::{Authorization, authorization::Bearer},
};
use axum_jsonschema::Json;
use chrono::{DateTime, Utc};
use log::info;
use schemars::JsonSchema;
use serde::Deserialize;
@ -50,7 +49,6 @@ pub async fn update_document_multipart(
document_id,
request.parent_version_id,
request.relative_path,
request.created_date,
request.content.contents.to_vec(),
)
.await
@ -77,21 +75,19 @@ pub async fn update_document_json(
document_id,
request.parent_version_id,
request.relative_path,
request.created_date,
content_bytes,
)
.await
}
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn internal_update_document(
auth_header: Authorization<Bearer>,
state: AppState,
mut state: AppState,
vault_id: VaultId,
document_id: DocumentId,
parent_version_id: VaultUpdateId,
relative_path: String,
created_date: DateTime<Utc>,
content: Vec<u8>,
) -> Result<Json<DocumentUpdateResponse>, SyncServerError> {
auth(&state, auth_header.token())?;
@ -114,7 +110,7 @@ async fn internal_update_document(
let mut transaction = state
.database
.create_write_transaction()
.create_write_transaction(&vault_id)
.await
.map_err(server_error)?;
@ -138,6 +134,18 @@ async fn internal_update_document(
Ok,
)?;
if latest_version.is_deleted {
transaction
.rollback()
.await
.context("Failed to roll back transaction")
.map_err(server_error)?;
return Ok(Json(DocumentUpdateResponse::FastForwardUpdate(
latest_version.into(),
)));
}
let sanitized_relative_path = sanitize_path(&relative_path);
// Return the latest version if the content and path are the same as the latest
@ -168,7 +176,7 @@ async fn internal_update_document(
let new_relative_path = if parent_document.relative_path == latest_version.relative_path
&& latest_version.relative_path != sanitized_relative_path
{
let mut new_relative_path = Default::default();
let mut new_relative_path = String::default();
for candidate in deduped_file_paths(&sanitized_relative_path) {
if state
.database
@ -188,19 +196,17 @@ async fn internal_update_document(
};
let new_version = StoredDocumentVersion {
vault_id,
document_id,
vault_update_id: last_update_id + 1,
relative_path: new_relative_path,
content: merged_content,
created_date,
updated_date: chrono::Utc::now(),
is_deleted: latest_version.is_deleted,
is_deleted: false,
};
state
.database
.insert_document_version(&new_version, Some(&mut transaction))
.insert_document_version(&vault_id, &new_version, Some(&mut transaction))
.await
.map_err(server_error)?;