More fixes
This commit is contained in:
parent
3d285b0b6e
commit
039affff09
10 changed files with 91 additions and 135 deletions
|
|
@ -29,8 +29,7 @@ export const offlineMoveThenRemoteDeleteTest: TestDefinition = {
|
|||
{
|
||||
type: "assert-consistent",
|
||||
verify: (s: AssertableState): void => {
|
||||
s.assertFileNotExists("A.md")
|
||||
.assertFileNotExists("B.md")
|
||||
s
|
||||
.assertFileCount(0);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,8 +9,7 @@ export const renameCreateConflictTest: TestDefinition = {
|
|||
{ type: "enable-sync", client: 0 },
|
||||
{ type: "enable-sync", client: 1 },
|
||||
{ type: "create", client: 0, path: "A.md", content: "hi" },
|
||||
{ type: "sync", client: 0 },
|
||||
{ type: "sync", client: 1 },
|
||||
{ type: "barrier" },
|
||||
{
|
||||
type: "assert-consistent",
|
||||
verify: (s: AssertableState): void => {
|
||||
|
|
@ -26,7 +25,7 @@ export const renameCreateConflictTest: TestDefinition = {
|
|||
{
|
||||
type: "assert-consistent",
|
||||
verify: (s: AssertableState): void => {
|
||||
s.assertFileNotExists("A.md").assertContent("B.md", "hi");
|
||||
s.assertFileCount(2).assertContent("B.md", "hi").assertContent("B (1).md", "hi");
|
||||
}
|
||||
}
|
||||
]
|
||||
|
|
|
|||
|
|
@ -31,14 +31,7 @@ export const simultaneousCreateDeleteSamePathTest: TestDefinition = {
|
|||
{
|
||||
type: "assert-consistent",
|
||||
verify: (s: AssertableState): void => {
|
||||
s.ifFileExists("A.md", (inner) =>
|
||||
inner
|
||||
.assertFileCount(1)
|
||||
.assertContent("A.md", "modified by 1 while offline")
|
||||
);
|
||||
if (!s.files.has("A.md")) {
|
||||
s.assertFileCount(0);
|
||||
}
|
||||
s.assertFileCount(0);
|
||||
}
|
||||
}
|
||||
]
|
||||
|
|
|
|||
|
|
@ -373,17 +373,6 @@ export class SyncClient {
|
|||
this.syncer.syncLocallyCreatedFile(relativePath);
|
||||
}
|
||||
|
||||
public syncLocallyDeletedFile(relativePath: RelativePath): void {
|
||||
this.checkIfDestroyed("syncLocallyDeletedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
|
||||
if (this.expectedFsEvents.matchDelete(relativePath)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.syncer.syncLocallyDeletedFile(relativePath);
|
||||
}
|
||||
|
||||
public syncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath
|
||||
|
|
@ -404,6 +393,19 @@ export class SyncClient {
|
|||
});
|
||||
}
|
||||
|
||||
public syncLocallyDeletedFile(relativePath: RelativePath): void {
|
||||
this.checkIfDestroyed("syncLocallyDeletedFile");
|
||||
|
||||
this.fileChangeNotifier.notifyOfFileChange(relativePath); // this is for updating cursors
|
||||
if (this.expectedFsEvents.matchDelete(relativePath)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.syncer.syncLocallyDeletedFile(relativePath);
|
||||
}
|
||||
|
||||
|
||||
|
||||
public getDocumentSyncingStatus(
|
||||
relativePath: RelativePath
|
||||
): DocumentSyncStatus {
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ function truncateFileNameToByteLimit(
|
|||
maxBytes: number
|
||||
): string {
|
||||
const encoder = new TextEncoder();
|
||||
if (encoder.encode(fileName).byteLength <= maxBytes) {return fileName;}
|
||||
if (encoder.encode(fileName).byteLength <= maxBytes) { return fileName; }
|
||||
|
||||
const dotIndex = fileName.lastIndexOf(".");
|
||||
// Dotfile (starts with "." and nothing else) → no extension to preserve.
|
||||
|
|
@ -35,7 +35,7 @@ function truncateFileNameToByteLimit(
|
|||
let usedBytes = 0;
|
||||
for (const { segment } of segmenter.segment(stem)) {
|
||||
const segmentBytes = encoder.encode(segment).byteLength;
|
||||
if (usedBytes + segmentBytes > stemBudget) {break;}
|
||||
if (usedBytes + segmentBytes > stemBudget) { break; }
|
||||
truncatedStem += segment;
|
||||
usedBytes += segmentBytes;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,14 +38,6 @@ export class SyncEventQueue {
|
|||
// It maps pending changes onto the local filesystem.
|
||||
private readonly events: SyncEvent[] = [];
|
||||
|
||||
// Tombstones: documents we deleted along with the vaultUpdateId at
|
||||
// which the delete committed. After we delete, the server may still
|
||||
// send us older broadcasts for that document (e.g. a backlog update
|
||||
// committed before the delete from another client). Without these
|
||||
// entries, the syncer would resurrect the doc by treating an old
|
||||
// update as a brand-new create.
|
||||
private readonly deletedDocuments = new Map<DocumentId, VaultUpdateId>();
|
||||
|
||||
// file creations for paths matching any of these patterns are ignored
|
||||
// because the user explicitly told us to ignore them.
|
||||
private userIgnorePatterns: RegExp[];
|
||||
|
|
@ -302,42 +294,6 @@ export class SyncEventQueue {
|
|||
return this.save();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a document as deleted at a given vault-update version. Used by
|
||||
* the syncer after a successful local or remote delete so future
|
||||
* obsolete broadcasts for that doc (older parents that arrive late)
|
||||
* don't resurrect it as a brand-new create.
|
||||
*/
|
||||
public recordDeletion(
|
||||
documentId: DocumentId,
|
||||
deletedAtVaultUpdateId: VaultUpdateId
|
||||
): void {
|
||||
const existing = this.deletedDocuments.get(documentId);
|
||||
if (existing !== undefined && existing >= deletedAtVaultUpdateId) {
|
||||
return;
|
||||
}
|
||||
this.deletedDocuments.set(documentId, deletedAtVaultUpdateId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the vault-update version at which we last saw this document
|
||||
* deleted, or `undefined` if we have no record of its deletion.
|
||||
*/
|
||||
public getDeletionVersion(
|
||||
documentId: DocumentId
|
||||
): VaultUpdateId | undefined {
|
||||
return this.deletedDocuments.get(documentId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Forget a doc's tombstone — used when a doc with the same id is
|
||||
* re-introduced (e.g. via a remote create whose server-side state
|
||||
* surpasses the previous delete).
|
||||
*/
|
||||
public clearDeletion(documentId: DocumentId): void {
|
||||
this.deletedDocuments.delete(documentId);
|
||||
}
|
||||
|
||||
public getDocumentByDocumentId(
|
||||
target: DocumentId
|
||||
): DocumentWithPath | undefined {
|
||||
|
|
@ -436,7 +392,7 @@ export class SyncEventQueue {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
private updatePendingCreatePath(
|
||||
public updatePendingCreatePath(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
): void {
|
||||
|
|
|
|||
|
|
@ -217,8 +217,8 @@ export class Syncer {
|
|||
}
|
||||
|
||||
private ensureDraining(): void {
|
||||
if (this.drainPromise !== undefined) {return;}
|
||||
if (this.isScanning) {return;}
|
||||
if (this.drainPromise !== undefined) { return; }
|
||||
if (this.isScanning) { return; }
|
||||
this.drainPromise = this.drain().finally(() => {
|
||||
this.drainPromise = undefined;
|
||||
});
|
||||
|
|
@ -318,7 +318,7 @@ export class Syncer {
|
|||
|
||||
private async skipIfOversized(event: SyncEvent): Promise<boolean> {
|
||||
let sizeInBytes = 0;
|
||||
let relativePath: RelativePath = "";
|
||||
let relativePath: RelativePath;
|
||||
|
||||
switch (event.type) {
|
||||
case SyncEventType.LocalDelete:
|
||||
|
|
@ -329,7 +329,7 @@ export class Syncer {
|
|||
relativePath = event.path;
|
||||
break;
|
||||
case SyncEventType.RemoteChange:
|
||||
if (event.remoteVersion.isDeleted) {return false;}
|
||||
if (event.remoteVersion.isDeleted) { return false; }
|
||||
sizeInBytes = event.remoteVersion.contentSize;
|
||||
({ relativePath } = event.remoteVersion);
|
||||
break;
|
||||
|
|
@ -339,7 +339,7 @@ export class Syncer {
|
|||
sizeInBytes,
|
||||
relativePath
|
||||
);
|
||||
if (oversizedEntry === undefined) {return false;}
|
||||
if (oversizedEntry === undefined) { return false; }
|
||||
|
||||
this.history.addHistoryEntry(oversizedEntry);
|
||||
|
||||
|
|
@ -417,22 +417,24 @@ export class Syncer {
|
|||
);
|
||||
return;
|
||||
}
|
||||
const relativePath = doc.path;
|
||||
|
||||
const response = await this.syncService.delete({
|
||||
documentId,
|
||||
relativePath
|
||||
relativePath: doc.path
|
||||
});
|
||||
|
||||
await this.queue.removeDocument(doc.path);
|
||||
this.queue.recordDeletion(documentId, response.vaultUpdateId);
|
||||
this.queue.lastSeenUpdateId = response.vaultUpdateId;
|
||||
|
||||
// Don't remove the doc from the queue or advance lastSeenUpdateId
|
||||
// here. The server broadcasts the delete back to us over the
|
||||
// WebSocket; that receipt drives `processRemoteDelete`'s cleanup
|
||||
// and history entry. Keeping the entry in the map until then lets
|
||||
// late remote updates be recognised as "file is missing" and
|
||||
// skipped, instead of resurrecting the doc.
|
||||
//
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: {
|
||||
type: SyncType.DELETE,
|
||||
relativePath
|
||||
relativePath: doc.path
|
||||
},
|
||||
message: "Successfully deleted file on the server",
|
||||
author: response.userId
|
||||
|
|
@ -520,7 +522,7 @@ export class Syncer {
|
|||
parentVersionId: response.vaultUpdateId,
|
||||
remoteRelativePath: response.relativePath
|
||||
};
|
||||
let remoteHash = "";
|
||||
let remoteHash: string;
|
||||
|
||||
if ("type" in response && response.type === "MergingUpdate") {
|
||||
const responseBytes = base64ToBytes(response.contentBase64);
|
||||
|
|
@ -565,16 +567,16 @@ export class Syncer {
|
|||
`Document ${response.documentId} is no longer tracked after update; cannot reconcile potential rename`
|
||||
);
|
||||
} else {
|
||||
const currentPath = tracked.path ?? path;
|
||||
const currentPath = tracked.path;
|
||||
if (currentPath === path) {
|
||||
// a http response will always be more up-to-date than any queued remote update
|
||||
// move will always move to the relative path when MoveOnConflict.EXISTING is given
|
||||
await this.operations.move(
|
||||
path,
|
||||
currentPath,
|
||||
response.relativePath,
|
||||
MoveOnConflict.EXISTING
|
||||
);
|
||||
|
||||
this.queue.updatePendingCreatePath(currentPath, response.relativePath);
|
||||
await this.queue.setDocument(response.relativePath, {
|
||||
...record,
|
||||
remoteHash
|
||||
|
|
@ -597,13 +599,13 @@ export class Syncer {
|
|||
// consistent. Without this, a later remote create at the
|
||||
// originally-requested path would see a phantom local conflict
|
||||
// and stash the new file under a `conflict-<uuid>-` path.
|
||||
if (response.relativePath !== createEvent.path) {
|
||||
if (response.relativePath !== createEvent.originalPath) {
|
||||
await this.operations.move(
|
||||
createEvent.path,
|
||||
response.relativePath,
|
||||
MoveOnConflict.EXISTING
|
||||
);
|
||||
createEvent.path = response.relativePath;
|
||||
this.queue.updatePendingCreatePath(createEvent.path, response.relativePath);
|
||||
}
|
||||
await this.queue.resolveCreate(createEvent, {
|
||||
...record,
|
||||
|
|
@ -624,7 +626,12 @@ export class Syncer {
|
|||
|
||||
if (remoteVersion.isDeleted) {
|
||||
if (documentWithPath === undefined) {
|
||||
// trying to delete a document we've already scheduled for deletion locally
|
||||
// The doc isn't tracked locally — either we never had
|
||||
// it (joined the vault after the delete) or a previous
|
||||
// delete already cleaned it up. Just advance
|
||||
// `lastSeenUpdateId` so we don't replay this on the
|
||||
// next reconnect.
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
return;
|
||||
}
|
||||
return this.processRemoteDelete(
|
||||
|
|
@ -633,24 +640,6 @@ export class Syncer {
|
|||
);
|
||||
}
|
||||
|
||||
// The doc was deleted at-or-after the version this broadcast
|
||||
// describes (e.g. another client's update committed before our
|
||||
// local delete; the server's backlog is replaying it now). Apply
|
||||
// would resurrect a doc we deliberately removed.
|
||||
const deletedAt = this.queue.getDeletionVersion(
|
||||
remoteVersion.documentId
|
||||
);
|
||||
if (
|
||||
deletedAt !== undefined &&
|
||||
deletedAt >= remoteVersion.vaultUpdateId
|
||||
) {
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
this.logger.debug(
|
||||
`Skipping obsolete remote update for already-deleted document ${remoteVersion.documentId} (V=${remoteVersion.vaultUpdateId} <= deleted V=${deletedAt})`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
(documentWithPath?.record.parentVersionId ?? 0) >=
|
||||
remoteVersion.vaultUpdateId
|
||||
|
|
@ -663,7 +652,22 @@ export class Syncer {
|
|||
}
|
||||
|
||||
if (documentWithPath !== undefined) {
|
||||
// must be the update to an existing doc
|
||||
// The doc is tracked. If the local file backing it has
|
||||
// gone missing — e.g. the user deleted it and the
|
||||
// LocalDelete hasn't drained yet, or our HTTP DELETE just
|
||||
// landed and we're still waiting on the WebSocket receipt
|
||||
// — ignore the update. Otherwise we'd try to operate on a
|
||||
// vanished file (or recreate one we're tearing down).
|
||||
const fileExists = await this.operations.exists(
|
||||
documentWithPath.path
|
||||
);
|
||||
if (!fileExists) {
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
this.logger.debug(
|
||||
`Ignoring remote update for ${remoteVersion.documentId}: local file at ${documentWithPath.path} is missing`
|
||||
);
|
||||
return;
|
||||
}
|
||||
return this.processRemoteUpdate(
|
||||
documentWithPath.path,
|
||||
documentWithPath.record,
|
||||
|
|
@ -671,10 +675,6 @@ export class Syncer {
|
|||
);
|
||||
}
|
||||
|
||||
const pendingCreate = this.queue.findLatestCreateForPath(
|
||||
remoteVersion.relativePath
|
||||
);
|
||||
|
||||
return this.processRemoteCreateForNewDocument(remoteVersion);
|
||||
}
|
||||
|
||||
|
|
@ -684,10 +684,6 @@ export class Syncer {
|
|||
): Promise<void> {
|
||||
await this.operations.delete(path);
|
||||
await this.queue.removeDocument(path);
|
||||
this.queue.recordDeletion(
|
||||
remoteVersion.documentId,
|
||||
remoteVersion.vaultUpdateId
|
||||
);
|
||||
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
|
||||
|
|
|
|||
|
|
@ -717,19 +717,23 @@ impl Database {
|
|||
.await
|
||||
.context("Failed to commit transaction")?;
|
||||
|
||||
// The broadcast is delivered to every connected client except the
|
||||
// author — the send task filters on `origin_device_id` (see
|
||||
// `websocket.rs`). The origin already has authoritative state
|
||||
// from the HTTP response that triggered this write.
|
||||
self.broadcasts.send_document_update(
|
||||
vault_id.clone(),
|
||||
WebSocketServerMessageWithOrigin::with_origin(
|
||||
version.device_id.clone(),
|
||||
WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
|
||||
document: version.clone().into(),
|
||||
}),
|
||||
),
|
||||
);
|
||||
// For non-delete writes the originating device already has
|
||||
// authoritative state from its HTTP response, so we tag the
|
||||
// broadcast with `origin_device_id` and the send task in
|
||||
// `websocket.rs` filters it out for that device. Deletes are
|
||||
// delivered to *every* connected client including the author —
|
||||
// the originator only removes the document from its sync queue
|
||||
// once it receives this receipt.
|
||||
let envelope = WebSocketServerMessage::VaultUpdate(WebSocketVaultUpdate {
|
||||
document: version.clone().into(),
|
||||
});
|
||||
let with_origin = if version.is_deleted {
|
||||
WebSocketServerMessageWithOrigin::new(envelope)
|
||||
} else {
|
||||
WebSocketServerMessageWithOrigin::with_origin(version.device_id.clone(), envelope)
|
||||
};
|
||||
self.broadcasts
|
||||
.send_document_update(vault_id.clone(), with_origin);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -58,11 +58,15 @@ pub struct CursorPositionFromServer {
|
|||
pub clients: Vec<ClientCursors>,
|
||||
}
|
||||
|
||||
// One committed version, broadcast to every connected client *except*
|
||||
// the device that authored it — that device already has the new state
|
||||
// via its HTTP response. The server also emits these one-at-a-time to
|
||||
// catch up a freshly-connected client on versions committed while it
|
||||
// was offline, in ascending `vault_update_id` order.
|
||||
// One committed version. Non-delete updates are broadcast to every
|
||||
// connected client *except* the device that authored them — that
|
||||
// device already has the new state via its HTTP response. Deletes are
|
||||
// broadcast to every client including the author: the author keeps
|
||||
// the document in its sync queue until this receipt arrives so a late
|
||||
// remote update can't sneak in between the HTTP response and the
|
||||
// queue cleanup. The server also emits these one-at-a-time to catch
|
||||
// up a freshly-connected client on versions committed while it was
|
||||
// offline, in ascending `vault_update_id` order.
|
||||
#[derive(TS, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WebSocketVaultUpdate {
|
||||
|
|
|
|||
|
|
@ -165,6 +165,9 @@ async fn websocket(
|
|||
Ok(update) => {
|
||||
// Drop messages this device authored because the HTTP
|
||||
// response already carried authoritative state back.
|
||||
// Delete broadcasts are sent without an origin so the
|
||||
// author also receives them — that's the receipt the
|
||||
// client needs to drop the doc from its sync queue.
|
||||
if Some(&device_id) == update.origin_device_id.as_ref() {
|
||||
continue;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue