Refactor and remove extra sync step
This commit is contained in:
parent
fb8badae44
commit
62c41e6ecd
1 changed files with 105 additions and 92 deletions
|
|
@ -1,5 +1,8 @@
|
|||
import type { Database } from "src/database/database";
|
||||
import type { RelativePath } from "src/database/document-metadata";
|
||||
import type {
|
||||
DocumentMetadata,
|
||||
RelativePath,
|
||||
} from "src/database/document-metadata";
|
||||
import type { FileOperations } from "src/file-operations/file-operations";
|
||||
import * as lib from "../../../backend/sync_lib/pkg/sync_lib.js";
|
||||
import type { SyncService } from "src/services/sync-service";
|
||||
|
|
@ -19,8 +22,10 @@ export class Syncer {
|
|||
|
||||
private isRunningOfflineSync = false;
|
||||
|
||||
// The offline sync methods call file sync methods, however, we can't preempt promises so we 2 queues to avoid a deadlock
|
||||
private readonly offlineSyncQueue: PQueue;
|
||||
private readonly fileSyncQueue: PQueue;
|
||||
|
||||
private readonly remainingOperationsListeners: ((
|
||||
remainingOperations: number
|
||||
) => void)[] = [];
|
||||
|
|
@ -53,16 +58,12 @@ export class Syncer {
|
|||
this.offlineSyncQueue.concurrency = settings.syncConcurrency;
|
||||
});
|
||||
|
||||
this.fileSyncQueue.on("active", () => {
|
||||
const updateRemainingOperations = () =>
|
||||
this.emitRemainingOperationsChange(
|
||||
this.fileSyncQueue.size + this.offlineSyncQueue.size
|
||||
);
|
||||
});
|
||||
this.offlineSyncQueue.on("active", () => {
|
||||
this.emitRemainingOperationsChange(
|
||||
this.fileSyncQueue.size + this.offlineSyncQueue.size
|
||||
);
|
||||
});
|
||||
this.fileSyncQueue.on("active", updateRemainingOperations);
|
||||
this.offlineSyncQueue.on("active", updateRemainingOperations);
|
||||
}
|
||||
|
||||
public addRemainingOperationsListener(
|
||||
|
|
@ -77,15 +78,25 @@ export class Syncer {
|
|||
): Promise<void> {
|
||||
await this.safelySync(async () => {
|
||||
try {
|
||||
const contentBytes = await this.operations.read(relativePath);
|
||||
const contentHash = hash(contentBytes);
|
||||
|
||||
const metadata = this.database.getDocument(relativePath);
|
||||
if (metadata) {
|
||||
Logger.getInstance().debug(
|
||||
`Document metadata already exists for ${relativePath}, it must have been downloaded from the server`
|
||||
);
|
||||
}
|
||||
|
||||
const contentBytes = await this.operations.read(relativePath);
|
||||
const contentHash = hash(contentBytes);
|
||||
if (metadata.hash === contentHash) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.NO_OP,
|
||||
relativePath,
|
||||
message: `File hash matches with last synced version, no need to sync`,
|
||||
type: SyncType.UPDATE,
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const response = await this.syncService.create({
|
||||
relativePath,
|
||||
|
|
@ -126,14 +137,7 @@ export class Syncer {
|
|||
hash: responseHash,
|
||||
});
|
||||
|
||||
if (
|
||||
this.database.getLastSeenUpdateId() ===
|
||||
response.vaultUpdateId - 1
|
||||
) {
|
||||
await this.database.setLastSeenUpdateId(
|
||||
response.vaultUpdateId
|
||||
);
|
||||
}
|
||||
await this.tryIncrementVaultUpdateId(response.vaultUpdateId);
|
||||
} catch (e) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
|
|
@ -146,50 +150,6 @@ export class Syncer {
|
|||
}, relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
await this.safelySync(async () => {
|
||||
try {
|
||||
const metadata = this.database.getDocument(relativePath);
|
||||
if (!metadata) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.NO_OP,
|
||||
relativePath,
|
||||
message: `Locally deleted file hasn't been uploaded yet, so there's no need to delete it on the remote server`,
|
||||
type: SyncType.DELETE,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await this.syncService.delete({
|
||||
documentId: metadata.documentId,
|
||||
relativePath,
|
||||
// We got the event now, so it must have been deleted just now
|
||||
createdDate: new Date(),
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PUSH,
|
||||
relativePath,
|
||||
message: `Successfully deleted locally deleted file on the remote server`,
|
||||
type: SyncType.DELETE,
|
||||
});
|
||||
|
||||
await this.database.removeDocument(relativePath);
|
||||
} catch (e) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
relativePath,
|
||||
message: `Failed to remotely delete locally deleted file: ${e}`,
|
||||
type: SyncType.DELETE,
|
||||
});
|
||||
throw e;
|
||||
}
|
||||
}, relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyUpdatedFile({
|
||||
oldPath,
|
||||
relativePath,
|
||||
|
|
@ -206,7 +166,7 @@ export class Syncer {
|
|||
);
|
||||
if (!metadata) {
|
||||
throw new Error(
|
||||
`Document metadata not found for ${relativePath}. Consider resetting the plugin's sync history.`
|
||||
`Document metadata not found for ${relativePath}. This implies a corrupt local database. Consider resetting the plugin's sync history.`
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -242,15 +202,9 @@ export class Syncer {
|
|||
if (response.isDeleted) {
|
||||
await this.operations.remove(oldPath ?? relativePath);
|
||||
await this.database.removeDocument(oldPath ?? relativePath);
|
||||
|
||||
if (
|
||||
this.database.getLastSeenUpdateId() ===
|
||||
response.vaultUpdateId - 1
|
||||
) {
|
||||
await this.database.setLastSeenUpdateId(
|
||||
response.vaultUpdateId
|
||||
);
|
||||
}
|
||||
await this.tryIncrementVaultUpdateId(
|
||||
response.vaultUpdateId
|
||||
);
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
|
|
@ -307,14 +261,7 @@ export class Syncer {
|
|||
hash: responseHash,
|
||||
});
|
||||
|
||||
if (
|
||||
this.database.getLastSeenUpdateId() ===
|
||||
response.vaultUpdateId - 1
|
||||
) {
|
||||
await this.database.setLastSeenUpdateId(
|
||||
response.vaultUpdateId
|
||||
);
|
||||
}
|
||||
await this.tryIncrementVaultUpdateId(response.vaultUpdateId);
|
||||
} catch (e) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
|
|
@ -327,6 +274,49 @@ export class Syncer {
|
|||
}, relativePath);
|
||||
}
|
||||
|
||||
public async syncLocallyDeletedFile(
|
||||
relativePath: RelativePath
|
||||
): Promise<void> {
|
||||
await this.safelySync(async () => {
|
||||
try {
|
||||
const metadata = this.database.getDocument(relativePath);
|
||||
if (!metadata) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.NO_OP,
|
||||
relativePath,
|
||||
message: `Locally deleted file hasn't been uploaded yet, so there's no need to delete it on the remote server`,
|
||||
type: SyncType.DELETE,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await this.syncService.delete({
|
||||
documentId: metadata.documentId,
|
||||
relativePath,
|
||||
createdDate: new Date(), // We got the event now, so it must have been deleted just now
|
||||
});
|
||||
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.SUCCESS,
|
||||
source: SyncSource.PUSH,
|
||||
relativePath,
|
||||
message: `Successfully deleted locally deleted file on the remote server`,
|
||||
type: SyncType.DELETE,
|
||||
});
|
||||
|
||||
await this.database.removeDocument(relativePath);
|
||||
} catch (e) {
|
||||
this.history.addHistoryEntry({
|
||||
status: SyncStatus.ERROR,
|
||||
relativePath,
|
||||
message: `Failed to remotely delete locally deleted file: ${e}`,
|
||||
type: SyncType.DELETE,
|
||||
});
|
||||
throw e;
|
||||
}
|
||||
}, relativePath);
|
||||
}
|
||||
|
||||
public async scheduleSyncForOfflineChanges(): Promise<void> {
|
||||
if (this.isRunningOfflineSync) {
|
||||
Logger.getInstance().warn(
|
||||
|
|
@ -355,22 +345,24 @@ export class Syncer {
|
|||
this.offlineSyncQueue.add(async () => {
|
||||
const metadata =
|
||||
this.database.getDocument(relativePath);
|
||||
if (!metadata) {
|
||||
const contentHash = hash(
|
||||
await this.operations.read(relativePath)
|
||||
);
|
||||
const match = locallyDeletedFiles.find(
|
||||
([_, document]) => document.hash === contentHash
|
||||
);
|
||||
|
||||
if (contentHash != EMPTY_HASH && match) {
|
||||
locallyDeletedFiles.remove(match);
|
||||
// If there's no metadata, it must be a new file
|
||||
if (!metadata) {
|
||||
// Perhaps the file has been moved. Let's check by looking at the deleted files
|
||||
const originalFile =
|
||||
await findMatchingFileBasedOnHash(
|
||||
relativePath,
|
||||
locallyDeletedFiles
|
||||
);
|
||||
if (originalFile !== undefined) {
|
||||
// `originalFile` hasn't been deleted but it got moved instead
|
||||
locallyDeletedFiles.remove(originalFile);
|
||||
|
||||
Logger.getInstance().debug(
|
||||
`Document ${relativePath} not found in database but found under a different path ${match[0]}, scheduling sync to move it`
|
||||
`Document ${relativePath} was not found under its current path in the database but was found under a different path ${originalFile[0]}, scheduling sync to move it`
|
||||
);
|
||||
return this.syncLocallyUpdatedFile({
|
||||
oldPath: match[0],
|
||||
oldPath: originalFile[0],
|
||||
relativePath: relativePath,
|
||||
updateTime:
|
||||
await this.operations.getModificationTime(
|
||||
|
|
@ -614,4 +606,25 @@ export class Syncer {
|
|||
listener(remainingOperations);
|
||||
});
|
||||
}
|
||||
|
||||
private async tryIncrementVaultUpdateId(
|
||||
responseVaultUpdateId: number
|
||||
): Promise<void> {
|
||||
if (this.database.getLastSeenUpdateId() === responseVaultUpdateId - 1) {
|
||||
await this.database.setLastSeenUpdateId(responseVaultUpdateId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function findMatchingFileBasedOnHash(
|
||||
filePath: RelativePath,
|
||||
candidates: [RelativePath, DocumentMetadata][]
|
||||
): Promise<[RelativePath, DocumentMetadata] | undefined> {
|
||||
const contentHash = hash(await this.operations.read(filePath));
|
||||
|
||||
if (contentHash != EMPTY_HASH) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return candidates.find(([_, document]) => document.hash === contentHash);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue