Keep updating the last seen id correctly
This commit is contained in:
parent
f63d3dd830
commit
96bf542b91
3 changed files with 66 additions and 41 deletions
|
|
@ -1,5 +1,6 @@
|
|||
import type { Logger } from "../tracing/logger";
|
||||
import { EMPTY_HASH } from "../utils/hash";
|
||||
import { CoveredValues } from "../utils/min-covered";
|
||||
|
||||
export type VaultUpdateId = number;
|
||||
export type DocumentId = string;
|
||||
|
|
@ -40,7 +41,7 @@ export interface DocumentRecord {
|
|||
|
||||
export class Database {
|
||||
private documents: DocumentRecord[];
|
||||
private lastSeenUpdateId: VaultUpdateId | undefined;
|
||||
private lastSeenUpdateIds: CoveredValues;
|
||||
private hasInitialSyncCompleted: boolean;
|
||||
|
||||
public constructor(
|
||||
|
|
@ -65,9 +66,10 @@ export class Database {
|
|||
this.ensureConsistency();
|
||||
this.logger.debug(`Loaded ${this.documents.length} documents`);
|
||||
|
||||
this.lastSeenUpdateId = initialState.lastSeenUpdateId;
|
||||
this.logger.debug(
|
||||
`Loaded last seen update id: ${this.lastSeenUpdateId}`
|
||||
const { lastSeenUpdateId } = initialState;
|
||||
this.logger.debug(`Loaded last seen update id: ${lastSeenUpdateId}`);
|
||||
this.lastSeenUpdateIds = new CoveredValues(
|
||||
Math.max(0, lastSeenUpdateId ?? 0) // the first updateId will be 1 which is the first integer after -1
|
||||
);
|
||||
|
||||
this.hasInitialSyncCompleted =
|
||||
|
|
@ -286,18 +288,28 @@ export class Database {
|
|||
this.save();
|
||||
}
|
||||
|
||||
public getLastSeenUpdateId(): VaultUpdateId | undefined {
|
||||
return this.lastSeenUpdateId;
|
||||
public getLastSeenUpdateId(): VaultUpdateId {
|
||||
return this.lastSeenUpdateIds.min;
|
||||
}
|
||||
|
||||
public setLastSeenUpdateId(value: VaultUpdateId | undefined): void {
|
||||
this.lastSeenUpdateId = value;
|
||||
public addLastSeenUpdateId(value: number): void {
|
||||
const previousMin = this.lastSeenUpdateIds.min;
|
||||
this.lastSeenUpdateIds.add(value);
|
||||
if (previousMin !== this.lastSeenUpdateIds.min) {
|
||||
this.save();
|
||||
}
|
||||
}
|
||||
|
||||
public setLastSeenUpdateId(value: number): void {
|
||||
this.lastSeenUpdateIds.min = value;
|
||||
this.save();
|
||||
}
|
||||
|
||||
public reset(): void {
|
||||
this.documents = [];
|
||||
this.lastSeenUpdateId = 0;
|
||||
this.lastSeenUpdateIds = new CoveredValues(
|
||||
0 // the first updateId will be 1 which is the first integer after -1
|
||||
);
|
||||
this.hasInitialSyncCompleted = false;
|
||||
this.save();
|
||||
}
|
||||
|
|
@ -313,7 +325,7 @@ export class Database {
|
|||
...metadata! // `resolvedDocuments` only returns docs with metadata set
|
||||
})
|
||||
),
|
||||
lastSeenUpdateId: this.lastSeenUpdateId,
|
||||
lastSeenUpdateId: this.lastSeenUpdateIds.min,
|
||||
hasInitialSyncCompleted: this.hasInitialSyncCompleted
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,11 @@ import { createPromise } from "../utils/create-promise";
|
|||
import { SyncResetError } from "../services/sync-reset-error";
|
||||
import { Locks } from "../utils/locks";
|
||||
|
||||
interface WebsocketVaultUpdate {
|
||||
documents: components["schemas"]["DocumentVersionWithoutContent"][];
|
||||
isInitialSync: boolean;
|
||||
}
|
||||
|
||||
export class Syncer {
|
||||
private readonly remoteDocumentsLock: Locks<DocumentId>;
|
||||
private readonly remainingOperationsListeners: ((
|
||||
|
|
@ -273,6 +278,7 @@ export class Syncer {
|
|||
|
||||
public stop(): void {
|
||||
clearInterval(this.refreshApplyRemoteChangesWebSocketInterval);
|
||||
|
||||
try {
|
||||
this.applyRemoteChangesWebSocket?.close();
|
||||
} catch (e) {
|
||||
|
|
@ -302,14 +308,30 @@ export class Syncer {
|
|||
wsUri
|
||||
);
|
||||
|
||||
this.applyRemoteChangesWebSocket.onmessage = (event): void =>
|
||||
void this.syncRemotelyUpdatedFile(event.data).catch(
|
||||
(e: unknown) => {
|
||||
this.logger.error(
|
||||
`Failed to sync remotely updated file: ${e}`
|
||||
this.applyRemoteChangesWebSocket.onmessage = async (
|
||||
event
|
||||
): Promise<void> => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const message = JSON.parse(event.data) as WebsocketVaultUpdate;
|
||||
|
||||
try {
|
||||
await Promise.all(
|
||||
message.documents.map(async (document) =>
|
||||
this.syncRemotelyUpdatedFile(document)
|
||||
)
|
||||
);
|
||||
|
||||
if (message.isInitialSync && message.documents.length > 0) {
|
||||
this.database.setLastSeenUpdateId(
|
||||
message.documents
|
||||
.map((document) => document.vaultUpdateId)
|
||||
.reduce((a, b) => Math.max(a, b))
|
||||
);
|
||||
}
|
||||
);
|
||||
} catch (e) {
|
||||
this.logger.error(`Failed to sync remotely updated file: ${e}`);
|
||||
}
|
||||
};
|
||||
|
||||
// The JS WebSocket API doesn't support setting headers, so we have to send the token as a message
|
||||
this.applyRemoteChangesWebSocket.onopen = (): void => {
|
||||
|
|
@ -317,7 +339,8 @@ export class Syncer {
|
|||
this.applyRemoteChangesWebSocket?.send(
|
||||
JSON.stringify({
|
||||
deviceId: this.deviceId,
|
||||
token: settings.token
|
||||
token: settings.token,
|
||||
lastSeenVaultUpdateId: this.database.getLastSeenUpdateId()
|
||||
})
|
||||
);
|
||||
this.webSocketStatusChangeListeners.forEach((listener) => {
|
||||
|
|
@ -327,7 +350,7 @@ export class Syncer {
|
|||
|
||||
this.applyRemoteChangesWebSocket.onclose = (event): void => {
|
||||
this.logger.warn(
|
||||
`WebSocket closed with code ${event.code}: ${event.reason}`
|
||||
`WebSocket closed with code ${event.code} (${event.reason == "" ? "unknown reason" : event.reason})`
|
||||
);
|
||||
this.webSocketStatusChangeListeners.forEach((listener) => {
|
||||
listener();
|
||||
|
|
@ -347,12 +370,9 @@ export class Syncer {
|
|||
}, 5000);
|
||||
}
|
||||
|
||||
private async syncRemotelyUpdatedFile(message: string): Promise<void> {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-type-assertion
|
||||
const remoteVersion = JSON.parse(
|
||||
message
|
||||
) as components["schemas"]["DocumentVersionWithoutContent"];
|
||||
|
||||
private async syncRemotelyUpdatedFile(
|
||||
remoteVersion: components["schemas"]["DocumentVersionWithoutContent"]
|
||||
): Promise<void> {
|
||||
let document = this.database.getDocumentByDocumentId(
|
||||
remoteVersion.documentId
|
||||
);
|
||||
|
|
@ -400,6 +420,8 @@ export class Syncer {
|
|||
this.database.removeDocumentPromise(promise);
|
||||
}
|
||||
}
|
||||
|
||||
this.database.addLastSeenUpdateId(remoteVersion.vaultUpdateId);
|
||||
} finally {
|
||||
if (hasLockToRelease) {
|
||||
this.remoteDocumentsLock.unlock(remoteVersion.documentId);
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ export class UnrestrictedSyncer {
|
|||
document
|
||||
);
|
||||
|
||||
this.tryIncrementVaultUpdateId(response.vaultUpdateId);
|
||||
this.database.addLastSeenUpdateId(response.vaultUpdateId);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -90,6 +90,8 @@ export class UnrestrictedSyncer {
|
|||
},
|
||||
document
|
||||
);
|
||||
|
||||
this.database.addLastSeenUpdateId(response.vaultUpdateId);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -156,6 +158,7 @@ export class UnrestrictedSyncer {
|
|||
this.logger.info(
|
||||
`Document ${document.relativePath} has been deleted before we could finish updating it`
|
||||
);
|
||||
this.database.addLastSeenUpdateId(response.vaultUpdateId);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -174,6 +177,7 @@ export class UnrestrictedSyncer {
|
|||
this.logger.debug(
|
||||
`Document ${document.relativePath} is already more up to date than the fetched version`
|
||||
);
|
||||
this.database.addLastSeenUpdateId(response.vaultUpdateId); // in case the previous `vaultUpdateId` update hasn't made it through
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -206,7 +210,7 @@ export class UnrestrictedSyncer {
|
|||
|
||||
await this.operations.delete(document.relativePath);
|
||||
|
||||
this.tryIncrementVaultUpdateId(response.vaultUpdateId);
|
||||
this.database.addLastSeenUpdateId(response.vaultUpdateId);
|
||||
|
||||
return;
|
||||
}
|
||||
|
|
@ -221,14 +225,6 @@ export class UnrestrictedSyncer {
|
|||
); // this can throw FileNotFoundError
|
||||
}
|
||||
|
||||
this.database.updateDocumentMetadata(
|
||||
{
|
||||
parentVersionId: response.vaultUpdateId,
|
||||
hash: contentHash
|
||||
},
|
||||
document
|
||||
);
|
||||
|
||||
if (
|
||||
!("type" in response) ||
|
||||
response.type === "MergingUpdate"
|
||||
|
|
@ -268,7 +264,7 @@ export class UnrestrictedSyncer {
|
|||
);
|
||||
}
|
||||
|
||||
this.tryIncrementVaultUpdateId(response.vaultUpdateId);
|
||||
this.database.addLastSeenUpdateId(response.vaultUpdateId);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -291,6 +287,7 @@ export class UnrestrictedSyncer {
|
|||
this.logger.debug(
|
||||
`Document ${remoteVersion.relativePath} is already at least as up to date as the fetched version`
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -425,10 +422,4 @@ export class UnrestrictedSyncer {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private tryIncrementVaultUpdateId(responseVaultUpdateId: number): void {
|
||||
if (this.database.getLastSeenUpdateId() === responseVaultUpdateId - 1) {
|
||||
this.database.setLastSeenUpdateId(responseVaultUpdateId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue