fix conflict path handling
This commit is contained in:
parent
321b503379
commit
081e35be5c
6 changed files with 91 additions and 184 deletions
|
|
@ -87,7 +87,6 @@ function makeOps(): {
|
|||
const fs = new FakeFileSystemOperations();
|
||||
const ops = new FileOperations(
|
||||
new Logger(),
|
||||
new MockQueue() as SyncEventQueue, // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
|
||||
fs,
|
||||
new MockServerConfig() as ServerConfig // eslint-disable-line @typescript-eslint/no-unsafe-type-assertion
|
||||
);
|
||||
|
|
|
|||
|
|
@ -10,12 +10,17 @@ import { isBinary } from "../utils/is-binary";
|
|||
import { buildConflictFileName } from "../sync-operations/conflict-path";
|
||||
import type { ServerConfig } from "../services/server-config";
|
||||
|
||||
|
||||
export enum MoveOnConflict {
|
||||
EXISTING = "EXISTING",
|
||||
NEW = "NEW",
|
||||
}
|
||||
|
||||
export class FileOperations {
|
||||
private readonly fs: SafeFileSystemOperations;
|
||||
|
||||
public constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly queue: SyncEventQueue,
|
||||
fs: FileSystemOperations,
|
||||
private readonly serverConfig: ServerConfig,
|
||||
private readonly nativeLineEndings = "\n"
|
||||
|
|
@ -50,44 +55,44 @@ export class FileOperations {
|
|||
*
|
||||
* If a file with the same name already exists, it is moved before creating the new one.
|
||||
* Parent directories are created if necessary.
|
||||
*
|
||||
* Returns the actual path the file was created at.
|
||||
*/
|
||||
public async create(
|
||||
path: RelativePath,
|
||||
newContent: Uint8Array
|
||||
): Promise<void> {
|
||||
await this.ensureClearPath(path);
|
||||
return this.fs.write(path, this.toNativeLineEndings(newContent));
|
||||
newContent: Uint8Array,
|
||||
moveOnConflict: MoveOnConflict
|
||||
): Promise<RelativePath> {
|
||||
const actualPath = await this.ensureClearPath(path, moveOnConflict);
|
||||
await this.fs.write(actualPath, this.toNativeLineEndings(newContent));
|
||||
return actualPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure nothing sits at `path` so the caller can write to it.
|
||||
*
|
||||
* If a file is already there, it is moved aside to a `conflict-<uuid>-<name>`
|
||||
* path in the same directory. The sync layer treats conflict-named files
|
||||
* as invisible (see `CONFLICT_PATH_REGEX`), so no events are enqueued and no
|
||||
* document records are touched — any pre-existing record or pending
|
||||
* events for the displaced path are left behind for the caller to
|
||||
* overwrite as part of whatever operation prompted the displacement.
|
||||
*
|
||||
* Returns the conflict path the existing file was moved to, or `undefined`
|
||||
* if the path was already clear.
|
||||
*/
|
||||
public async ensureClearPath(
|
||||
path: RelativePath
|
||||
): Promise<RelativePath | undefined> {
|
||||
private async ensureClearPath(
|
||||
path: RelativePath,
|
||||
moveOnConflict: MoveOnConflict
|
||||
): Promise<RelativePath> {
|
||||
if (await this.fs.exists(path)) {
|
||||
const conflictPath = FileOperations.buildConflictPath(path);
|
||||
|
||||
if (moveOnConflict === MoveOnConflict.NEW) {
|
||||
return conflictPath;
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Displacing existing file at ${path} to '${conflictPath}' to make room`
|
||||
);
|
||||
|
||||
this.queue.moveDocument(path, conflictPath);
|
||||
await this.fs.rename(path, conflictPath, true);
|
||||
await this.fs.rename(path, conflictPath);
|
||||
return conflictPath;
|
||||
}
|
||||
|
||||
this.logger.debug(`No existing file at ${path}, creating parent directories if needed`);
|
||||
await this.createParentDirectories(path);
|
||||
return undefined;
|
||||
return path;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -188,32 +193,23 @@ export class FileOperations {
|
|||
return this.fs.exists(path);
|
||||
}
|
||||
|
||||
// Returns the conflict path a displaced file was moved to, or undefined.
|
||||
// Returns the actual path the file got moved to.
|
||||
public async move(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath
|
||||
): Promise<RelativePath | undefined> {
|
||||
newPath: RelativePath,
|
||||
moveOnConflict: MoveOnConflict
|
||||
): Promise<RelativePath> {
|
||||
if (oldPath === newPath) {
|
||||
return undefined;
|
||||
return oldPath;
|
||||
}
|
||||
|
||||
const conflictPath = await this.ensureClearPath(newPath);
|
||||
// Do the disk rename *before* updating the queue. If the rename
|
||||
// throws (permissions, concurrent deletion, …), the queue still
|
||||
// reflects the actual on-disk state instead of claiming the doc
|
||||
// has already moved.
|
||||
await this.fs.rename(oldPath, newPath);
|
||||
this.queue.moveDocument(oldPath, newPath);
|
||||
|
||||
const actualPath = await this.ensureClearPath(newPath, moveOnConflict);
|
||||
await this.fs.rename(oldPath, actualPath);
|
||||
await this.deletingEmptyParentDirectoriesOfDeletedFile(oldPath);
|
||||
return conflictPath;
|
||||
return actualPath;
|
||||
}
|
||||
|
||||
|
||||
public reset(): void {
|
||||
this.fs.reset();
|
||||
}
|
||||
|
||||
private async deletingEmptyParentDirectoriesOfDeletedFile(
|
||||
path: RelativePath
|
||||
): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -1,24 +1,18 @@
|
|||
import type { RelativePath } from "../sync-operations/types";
|
||||
import type { FileSystemOperations } from "./filesystem-operations";
|
||||
import type { Logger } from "../tracing/logger";
|
||||
import { Locks } from "../utils/data-structures/locks";
|
||||
import { FileNotFoundError } from "../errors/file-not-found-error";
|
||||
import type { TextWithCursors } from "reconcile-text";
|
||||
|
||||
/**
|
||||
* Decorates `FileSystemOperations` to replace errors with `FileNotFoundError`
|
||||
* if the accessed file doesn't exist. It also ensures that there's at most a
|
||||
* single request in-flight for any one file through the use of locks.
|
||||
* if the accessed file doesn't exist.
|
||||
*/
|
||||
export class SafeFileSystemOperations implements FileSystemOperations {
|
||||
private readonly locks: Locks<RelativePath>;
|
||||
|
||||
public constructor(
|
||||
private readonly fs: FileSystemOperations,
|
||||
private readonly logger: Logger
|
||||
) {
|
||||
this.locks = new Locks(SafeFileSystemOperations.name, logger);
|
||||
}
|
||||
) {}
|
||||
|
||||
public async listFilesRecursively(
|
||||
root: RelativePath | undefined
|
||||
|
|
@ -31,19 +25,12 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
|
||||
public async read(path: RelativePath): Promise<Uint8Array> {
|
||||
this.logger.debug(`Reading file '${path}'`);
|
||||
return this.safeOperation(
|
||||
path,
|
||||
async () =>
|
||||
this.locks.withLock(path, async () => this.fs.read(path)),
|
||||
"read"
|
||||
);
|
||||
return this.safeOperation(path, async () => this.fs.read(path), "read");
|
||||
}
|
||||
|
||||
public async write(path: RelativePath, content: Uint8Array): Promise<void> {
|
||||
this.logger.debug(`Writing to file '${path}'`);
|
||||
return this.locks.withLock(path, async () =>
|
||||
this.fs.write(path, content)
|
||||
);
|
||||
return this.fs.write(path, content);
|
||||
}
|
||||
|
||||
public async atomicUpdateText(
|
||||
|
|
@ -53,10 +40,7 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
this.logger.debug(`Atomically updating file '${path}'`);
|
||||
return this.safeOperation(
|
||||
path,
|
||||
async () =>
|
||||
this.locks.withLock(path, async () =>
|
||||
this.fs.atomicUpdateText(path, updater)
|
||||
),
|
||||
async () => this.fs.atomicUpdateText(path, updater),
|
||||
"atomicUpdateText"
|
||||
);
|
||||
}
|
||||
|
|
@ -65,75 +49,38 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
// Logging this would be too noisy
|
||||
return this.safeOperation(
|
||||
path,
|
||||
async () =>
|
||||
this.locks.withLock(path, async () =>
|
||||
this.fs.getFileSize(path)
|
||||
),
|
||||
async () => this.fs.getFileSize(path),
|
||||
"getFileSize"
|
||||
);
|
||||
}
|
||||
|
||||
public async exists(
|
||||
path: RelativePath,
|
||||
skipLock = false
|
||||
): Promise<boolean> {
|
||||
public async exists(path: RelativePath): Promise<boolean> {
|
||||
this.logger.debug(`Checking if file '${path}' exists`);
|
||||
if (skipLock) {
|
||||
return this.fs.exists(path);
|
||||
} else {
|
||||
return this.locks.withLock(path, async () => this.fs.exists(path));
|
||||
}
|
||||
return this.fs.exists(path);
|
||||
}
|
||||
|
||||
public async createDirectory(path: RelativePath): Promise<void> {
|
||||
this.logger.debug(`Creating directory '${path}'`);
|
||||
return this.locks.withLock(path, async () =>
|
||||
this.fs.createDirectory(path)
|
||||
);
|
||||
return this.fs.createDirectory(path);
|
||||
}
|
||||
|
||||
public async delete(path: RelativePath): Promise<void> {
|
||||
this.logger.debug(`Deleting file '${path}'`);
|
||||
return this.locks.withLock(path, async () => this.fs.delete(path));
|
||||
return this.fs.delete(path);
|
||||
}
|
||||
|
||||
public async rename(
|
||||
oldPath: RelativePath,
|
||||
newPath: RelativePath,
|
||||
skipLock = false
|
||||
newPath: RelativePath
|
||||
): Promise<void> {
|
||||
this.logger.debug(`Renaming file '${oldPath}' to '${newPath}'`);
|
||||
return this.safeOperation(
|
||||
oldPath,
|
||||
async () => {
|
||||
if (skipLock) {
|
||||
return this.fs.rename(oldPath, newPath);
|
||||
} else {
|
||||
return this.locks.withLock([oldPath, newPath], async () =>
|
||||
this.fs.rename(oldPath, newPath)
|
||||
);
|
||||
}
|
||||
},
|
||||
async () => this.fs.rename(oldPath, newPath),
|
||||
"rename"
|
||||
);
|
||||
}
|
||||
|
||||
public tryLock(path: RelativePath): boolean {
|
||||
return this.locks.tryLock(path);
|
||||
}
|
||||
|
||||
public async waitForLock(path: RelativePath): Promise<void> {
|
||||
return this.locks.waitForLock(path);
|
||||
}
|
||||
|
||||
public unlock(path: RelativePath): void {
|
||||
this.locks.unlock(path);
|
||||
}
|
||||
|
||||
public reset(): void {
|
||||
this.locks.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
* Decorate an operation to ensure that the file exists before running it.
|
||||
* If the operation fails, it will check if the file still exists and throw
|
||||
|
|
@ -154,9 +101,6 @@ export class SafeFileSystemOperations implements FileSystemOperations {
|
|||
try {
|
||||
return await operation();
|
||||
} catch (error) {
|
||||
// Without locking the file, this isn't atomic, however, it's good enough in practice.
|
||||
// This will only break if the file exists, gets deleted and then immediately
|
||||
// recreated while `operation` is running.
|
||||
if (await this.fs.exists(path)) {
|
||||
throw error;
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -172,7 +172,6 @@ export class SyncClient {
|
|||
|
||||
const fileOperations = new FileOperations(
|
||||
logger,
|
||||
syncEventQueue,
|
||||
fs,
|
||||
serverConfig,
|
||||
nativeLineEndings
|
||||
|
|
@ -489,7 +488,6 @@ export class SyncClient {
|
|||
this.contentCache.reset();
|
||||
this.cursorTracker.reset();
|
||||
this.syncer.reset();
|
||||
this.fileOperations.reset();
|
||||
}
|
||||
|
||||
private async onSettingsChange(
|
||||
|
|
|
|||
|
|
@ -221,7 +221,7 @@ export class SyncEventQueue {
|
|||
...record
|
||||
})
|
||||
),
|
||||
lastSeenUpdateId: this._lastSeenUpdateId
|
||||
lastSeenUpdateId: this.lastSeenUpdateId
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -230,6 +230,7 @@ export class SyncEventQueue {
|
|||
return this.documents.get(path);
|
||||
}
|
||||
|
||||
|
||||
public allSettledDocuments(): Map<RelativePath, DocumentRecord> {
|
||||
return new Map(this.documents.entries());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import {
|
|||
import type { Logger } from "../tracing/logger";
|
||||
import { hash } from "../utils/hash";
|
||||
import type { Settings } from "../persistence/settings";
|
||||
import type { FileOperations } from "../file-operations/file-operations";
|
||||
import { MoveOnConflict, type FileOperations } from "../file-operations/file-operations";
|
||||
import { scheduleOfflineChanges } from "./offline-change-detector";
|
||||
import { SyncResetError } from "../errors/sync-reset-error";
|
||||
import type { DocumentVersionWithoutContent } from "../services/types/DocumentVersionWithoutContent";
|
||||
|
|
@ -523,7 +523,9 @@ export class Syncer {
|
|||
}
|
||||
|
||||
if (createEvent === undefined) {
|
||||
this.ensurePath(path, response.relativePath, Move.Existing);
|
||||
// a http response will always be more up-to-date than any queued remote update
|
||||
this.operations.move(path, response.relativePath, MoveOnConflict.EXISTING);
|
||||
|
||||
await this.queue.setDocument(response.relativePath, {
|
||||
...record,
|
||||
remoteHash
|
||||
|
|
@ -633,22 +635,23 @@ export class Syncer {
|
|||
|
||||
|
||||
// wait for a local edit to do the actual updating here, so we can't even update the lastSeenUpdateId here
|
||||
this.ensurePath(path, remoteVersion.relativePath);
|
||||
const conflictingDoc = this.queue.getSettledDocumentByPath(remoteVersion.relativePath);
|
||||
const actualRelativePath = await this.operations.move(path, remoteVersion.relativePath, conflictingDoc?.parentVersionId ?? 0 < remoteVersion.vaultUpdateId ? MoveOnConflict.EXISTING : MoveOnConflict.NEW);
|
||||
|
||||
this.queue.setDocument(remoteVersion.relativePath, {
|
||||
this.queue.setDocument(actualRelativePath, {
|
||||
...record,
|
||||
remoteRelativePath: remoteVersion.relativePath
|
||||
remoteRelativePath: actualRelativePath
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: {
|
||||
type: SyncType.MOVE,
|
||||
relativePath: remoteVersion.relativePath,
|
||||
relativePath: actualRelativePath,
|
||||
movedFrom: path
|
||||
},
|
||||
// todo: eh
|
||||
message: `File was renamed remotely from ${path} to ${remoteVersion.relativePath}`,
|
||||
message: `File was renamed remotely from ${path} to ${actualRelativePath}`,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -658,19 +661,22 @@ export class Syncer {
|
|||
vaultUpdateId: remoteVersion.vaultUpdateId
|
||||
});
|
||||
|
||||
await this.operations.create(
|
||||
const conflictingDoc = this.queue.getSettledDocumentByPath(remoteVersion.relativePath);
|
||||
|
||||
const actualPath = await this.operations.create(
|
||||
remoteVersion.relativePath,
|
||||
remoteContent
|
||||
remoteContent,
|
||||
conflictingDoc?.parentVersionId ?? 0 < remoteVersion.vaultUpdateId ? MoveOnConflict.EXISTING : MoveOnConflict.NEW
|
||||
);
|
||||
|
||||
await this.updateCache(
|
||||
remoteVersion.vaultUpdateId,
|
||||
remoteContent,
|
||||
remoteVersion.relativePath
|
||||
actualPath
|
||||
);
|
||||
|
||||
const contentHash = await hash(remoteContent);
|
||||
await this.queue.setDocument(remoteVersion.relativePath, {
|
||||
await this.queue.setDocument(actualPath, {
|
||||
documentId: remoteVersion.documentId,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
remoteHash: contentHash,
|
||||
|
|
@ -683,7 +689,7 @@ export class Syncer {
|
|||
status: SyncStatus.SUCCESS,
|
||||
details: {
|
||||
type: SyncType.CREATE,
|
||||
relativePath: remoteVersion.relativePath
|
||||
relativePath: actualPath
|
||||
},
|
||||
message:
|
||||
"Successfully downloaded remote file which hadn't existed locally",
|
||||
|
|
@ -706,72 +712,35 @@ export class Syncer {
|
|||
const remoteHash = await hash(remoteContent);
|
||||
|
||||
const path = remoteVersion.relativePath;
|
||||
const localContent = await this.operations.read(path);
|
||||
const currentContent = await this.operations.read(pendingCreateEvent.path);
|
||||
|
||||
const canMergeText =
|
||||
isFileTypeMergable(
|
||||
path,
|
||||
(await this.serverConfig.getConfig()).mergeableFileExtensions
|
||||
) &&
|
||||
!isBinary(localContent) &&
|
||||
!isBinary(remoteContent);
|
||||
await this.operations.write(path, currentContent, remoteContent);
|
||||
await this.updateCache(
|
||||
remoteVersion.vaultUpdateId,
|
||||
remoteContent,
|
||||
path
|
||||
);
|
||||
|
||||
if (canMergeText) {
|
||||
const currentContent = await this.operations.read(pendingCreateEvent.path);
|
||||
await this.queue.resolveCreate(pendingCreateEvent, {
|
||||
documentId: remoteVersion.documentId,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
remoteHash,
|
||||
remoteRelativePath: path
|
||||
});
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: {
|
||||
type: SyncType.UPDATE,
|
||||
relativePath: path
|
||||
},
|
||||
message:
|
||||
`Adopted remote create at ${path}`,
|
||||
author: remoteVersion.userId,
|
||||
timestamp: new Date(remoteVersion.updatedDate)
|
||||
});
|
||||
|
||||
|
||||
const merged = reconcile("", new TextDecoder().decode(currentContent), new TextDecoder().decode(remoteContent)).text;
|
||||
await this.operations.write(path, currentContent, new TextEncoder().encode(merged));
|
||||
await this.updateCache(
|
||||
remoteVersion.vaultUpdateId,
|
||||
remoteContent,
|
||||
path
|
||||
);
|
||||
|
||||
await this.queue.resolveCreate(pendingCreateEvent, {
|
||||
documentId: remoteVersion.documentId,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
remoteHash,
|
||||
remoteRelativePath: path
|
||||
});
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: {
|
||||
type: SyncType.UPDATE,
|
||||
relativePath: path
|
||||
},
|
||||
message:
|
||||
`Adopted remote create at ${path}`,
|
||||
author: remoteVersion.userId,
|
||||
timestamp: new Date(remoteVersion.updatedDate)
|
||||
});
|
||||
return;
|
||||
} else {
|
||||
await this.operations.ensureClearPath(path);
|
||||
await this.operations.create(path, remoteContent);
|
||||
await this.queue.setDocument(path, {
|
||||
documentId: remoteVersion.documentId,
|
||||
parentVersionId: remoteVersion.vaultUpdateId,
|
||||
remoteHash,
|
||||
remoteRelativePath: path
|
||||
});
|
||||
this.queue.lastSeenUpdateId = remoteVersion.vaultUpdateId;
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
details: {
|
||||
type: SyncType.CREATE,
|
||||
relativePath: path
|
||||
},
|
||||
message:
|
||||
`Created remotly created file at ${path}`,
|
||||
author: remoteVersion.userId,
|
||||
timestamp: new Date(remoteVersion.updatedDate)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue