mirror of
https://github.com/zadam/trilium.git
synced 2025-03-01 14:22:32 +01:00
347 lines
11 KiB
JavaScript
347 lines
11 KiB
JavaScript
import utils from './utils.js';
|
|
import toastService from "./toast.js";
|
|
import server from "./server.js";
|
|
import LoadResults from "./load_results.js";
|
|
import Branch from "../entities/branch.js";
|
|
import Attribute from "../entities/attribute.js";
|
|
import options from "./options.js";
|
|
import treeCache from "./tree_cache.js";
|
|
|
|
// element whose content is replaced with the outstanding sync count on every 'sync' message
const $outstandingSyncsCount = $("#outstanding-syncs-count");

// callbacks registered through subscribeToMessages(), each is called with every parsed message
const messageHandlers = [];

// current WebSocket connection, assigned at startup and replaced on reconnect
let ws;

// highest sync id received from the backend so far; reported back in every ping
let lastAcceptedSyncId = window.glob.maxSyncIdAtLoad;

// highest sync id whose rows have already gone through processSyncRows()
let lastProcessedSyncId = window.glob.maxSyncIdAtLoad;

// timestamp (ms) of startup or of the last received 'sync' message, whichever is later
let lastPingTs;

// sync rows received but not yet processed
let syncDataQueue = [];
|
|
|
|
/**
 * Logs an error to the browser console (with a stack trace) and, when the websocket
 * is open, also reports it to the backend as a 'log-error' message.
 *
 * @param {string} message - error description to log and report
 */
function logError(message) {
    console.log(utils.now(), message); // needs to be separate from .trace()
    console.trace();

    // readyState 1 means the connection is open; otherwise there's nobody to report to
    if (!ws || ws.readyState !== 1) {
        return;
    }

    const payload = JSON.stringify({
        type: 'log-error',
        error: message
    });

    ws.send(payload);
}
|
|
|
|
/**
 * Registers a callback which will be invoked with every parsed websocket message.
 *
 * @param {function(object): void} messageHandler - receives the parsed message object
 */
function subscribeToMessages(messageHandler) {
    messageHandlers.push(messageHandler);
}
|
|
|
|
// used to serialize sync operations
let consumeQueuePromise = null;

/**
 * Handles a single websocket message event: parses the JSON payload, passes it to all
 * subscribed message handlers and then reacts to the message types known to this module.
 *
 * For 'sync' messages the received rows are queued, acknowledged to the backend via ping
 * and then consumed - only one consumer runs at a time.
 *
 * @param {MessageEvent} event - websocket message event, event.data holds a JSON string
 */
async function handleMessage(event) {
    const message = JSON.parse(event.data);

    for (const messageHandler of messageHandlers) {
        messageHandler(message);
    }

    if (message.type === 'sync-hash-check-failed') {
        toastService.showError("Sync check failed!", 60000);
        return;
    }

    if (message.type === 'consistency-checks-failed') {
        toastService.showError("Consistency checks failed! See logs for details.", 50 * 60000);
        return;
    }

    if (message.type !== 'sync') {
        return;
    }

    const syncRows = message.data;
    lastPingTs = Date.now();

    $outstandingSyncsCount.html(message.outstandingSyncs);

    if (syncRows.length === 0) {
        return;
    }

    console.debug(utils.now(), "Sync data: ", syncRows);

    syncDataQueue.push(...syncRows);

    // lastAcceptedSyncId is set (and ping sent) even before the rows are processed
    // so that backend can start sending more updates
    const newestRow = syncRows[syncRows.length - 1];
    lastAcceptedSyncId = Math.max(lastAcceptedSyncId, newestRow.id);
    sendPing();

    // wait until all the preceding consumers are finished
    while (consumeQueuePromise) {
        await consumeQueuePromise;
    }

    try {
        // nobody else is consuming now, so it's this call's turn
        consumeQueuePromise = consumeSyncData();

        await consumeQueuePromise;
    }
    finally {
        // reset to null to signal that somebody else can pick it up
        consumeQueuePromise = null;
    }
}
|
|
|
|
// pending waitForSyncId() requests: {desiredSyncId, resolvePromise, start}
let syncIdReachedListeners = [];

/**
 * Returns a promise which is resolved once the given sync id has been processed by the frontend.
 * Resolves immediately when that sync id has been processed already.
 *
 * @param {number} desiredSyncId - sync id to wait for
 * @returns {Promise<void>}
 */
function waitForSyncId(desiredSyncId) {
    const alreadyProcessed = desiredSyncId <= lastProcessedSyncId;

    if (alreadyProcessed) {
        return Promise.resolve();
    }

    console.debug("Waiting for", desiredSyncId, 'current is', lastProcessedSyncId);

    // the promise is resolved later from checkSyncIdListeners()
    return new Promise(resolvePromise => {
        syncIdReachedListeners.push({
            desiredSyncId,
            resolvePromise,
            start: Date.now()
        });
    });
}
|
|
|
|
/**
 * Waits until the frontend has processed the sync id returned by server.getMaxKnownSyncId().
 *
 * @returns {Promise<void>}
 */
function waitForMaxKnownSyncId() {
    const maxKnownSyncId = server.getMaxKnownSyncId();

    return waitForSyncId(maxKnownSyncId);
}
|
|
|
|
/**
 * Resolves all listeners whose desired sync id has already been processed, keeps the rest
 * registered and logs those which have been waiting for more than a minute.
 */
function checkSyncIdListeners() {
    // listeners pending for longer than this are reported to the console
    const LONG_WAIT_MS = 60000;
    const now = Date.now();

    syncIdReachedListeners
        .filter(l => l.desiredSyncId <= lastProcessedSyncId)
        .forEach(l => l.resolvePromise());

    syncIdReachedListeners = syncIdReachedListeners
        .filter(l => l.desiredSyncId > lastProcessedSyncId);

    // the original condition (now > start - 60000) was always true and so logged every
    // pending listener on each call; only the ones waiting for over a minute are of interest
    syncIdReachedListeners
        .filter(l => now - l.start > LONG_WAIT_MS)
        .forEach(l => console.log(`Waiting for syncId ${l.desiredSyncId} while current is ${lastProcessedSyncId} for ${Math.floor((now - l.start) / 1000)}s`));
}
|
|
|
|
/**
 * Runs the given sync handler and logs any failure instead of propagating it.
 *
 * @param {function(*): *} syncHandler - sync or async handler to invoke
 * @param {*} syncData - argument passed to the handler
 * @returns {Promise<*>} result of the handler, or undefined when the handler failed
 */
async function runSafely(syncHandler, syncData) {
    try {
        return await syncHandler(syncData);
    }
    catch (e) {
        // anything can be thrown in JS (including null/undefined or a plain string),
        // so the thrown value must not be dereferenced blindly - otherwise the catch
        // block itself would throw and this function would no longer be "safe"
        const isObject = e !== null && typeof e === 'object';
        const message = isObject && e.message !== undefined ? e.message : String(e);
        const stack = isObject ? e.stack : undefined;

        console.log(`Sync handler failed with ${message}: ${stack}`);
    }
}
|
|
|
|
/**
 * Takes everything currently queued in syncDataQueue, processes it and advances
 * lastProcessedSyncId. Listeners waiting for a sync id are checked at the end in any case.
 *
 * @returns {Promise<void>}
 */
async function consumeSyncData() {
    const queuedRows = syncDataQueue;

    if (queuedRows.length > 0) {
        // rows arriving during the processing below are collected into a fresh queue
        syncDataQueue = [];

        try {
            await processSyncRows(queuedRows);
        }
        catch (e) {
            logError(`Encountered error ${e.message}: ${e.stack}, reloading frontend.`);

            // if there's an error in updating the frontend then the easy option to recover is to reload the frontend completely
            utils.reloadApp();
        }

        const lastRow = queuedRows[queuedRows.length - 1];

        lastProcessedSyncId = Math.max(lastProcessedSyncId, lastRow.id);
    }

    checkSyncIdListeners();
}
|
|
|
|
/**
 * Opens a new WebSocket connection to the host and path of the current page
 * and attaches the open and message handlers.
 *
 * @returns {WebSocket} the newly created socket
 */
function connectWebSocket() {
    const {protocol, host, pathname} = window.location;

    // use wss for secure messaging
    const scheme = protocol === "https:" ? "wss:" : "ws:";
    const webSocketUri = `${scheme}//${host}${pathname}`;

    const socket = new WebSocket(webSocketUri);
    socket.onopen = () => console.debug(utils.now(), `Connected to server ${webSocketUri} with WebSocket`);
    socket.onmessage = handleMessage;
    // we're not handling onclose here because reconnection is done in sendPing()

    return socket;
}
|
|
|
|
/**
 * Sends a ping carrying the last accepted sync id to the backend when the socket is open,
 * or opens a new connection when the socket is closed or closing.
 * Also logs a warning when no sync message has arrived for over 30 seconds.
 *
 * @returns {Promise<void>}
 */
async function sendPing() {
    const msSinceLastPing = Date.now() - lastPingTs;

    if (msSinceLastPing > 30000) {
        console.log(utils.now(), "Lost websocket connection to the backend");
    }

    switch (ws.readyState) {
        case ws.OPEN:
            ws.send(JSON.stringify({
                type: 'ping',
                lastSyncId: lastAcceptedSyncId
            }));
            break;

        case ws.CLOSED:
        case ws.CLOSING:
            console.log(utils.now(), "WS closed or closing, trying to reconnect");

            ws = connectWebSocket();
            break;
    }
}
|
|
|
|
// the connection is opened in a zero-delay timeout, i.e. after the module finished evaluating;
// from then on a ping is sent every second (sendPing() also takes care of reconnecting)
setTimeout(function startWebSocket() {
    ws = connectWebSocket();

    lastPingTs = Date.now();

    setInterval(sendPing, 1000);
}, 0);

// shows a persistent toast while a sync pull is in progress
subscribeToMessages(({type}) => {
    switch (type) {
        case 'sync-pull-in-progress':
            toastService.showPersistent({
                id: 'sync',
                title: "Sync status",
                message: "Sync update in progress",
                icon: "refresh"
            });
            break;

        case 'sync-pull-finished':
            // this gives user a chance to see the toast in case of fast sync finish
            setTimeout(() => toastService.closePersistent('sync'), 1000);
            break;
    }
});
|
|
|
|
/**
 * Applies a batch of sync rows to the frontend tree cache and records what has changed
 * into a LoadResults instance. When anything changed, the 'entitiesReloaded' event is triggered.
 *
 * Rows are handled in passes by entity name, in this order: notes, branches, note_reordering,
 * attributes, note_contents, note_revisions, options.
 *
 * @param {object[]} syncRows - sync rows as received in the 'sync' websocket message
 * @returns {Promise<void>}
 */
async function processSyncRows(syncRows) {
    const loadResults = new LoadResults(treeCache);
    const rowsOf = entityName => syncRows.filter(row => row.entityName === entityName);

    // notes: only notes already present in the cache are updated
    for (const row of rowsOf('notes')) {
        const cachedNote = treeCache.notes[row.entityId];

        if (!cachedNote) {
            continue;
        }

        cachedNote.update(row.entity);
        loadResults.addNote(row.entityId, row.sourceId);
    }

    // branches: keep the parent/child links of the cached notes in sync
    for (const row of rowsOf('branches')) {
        const {entity} = row;
        const cachedBranch = treeCache.branches[row.entityId];
        const childNote = treeCache.notes[entity.noteId];
        const parentNote = treeCache.notes[entity.parentNoteId];

        if (!cachedBranch) {
            // unknown branch is created only if it's alive and at least one of its ends is cached
            if (entity.isDeleted || !(childNote || parentNote)) {
                continue;
            }

            const newBranch = new Branch(treeCache, entity);
            treeCache.branches[newBranch.branchId] = newBranch;

            loadResults.addBranch(row.entityId, row.sourceId);

            if (childNote) {
                childNote.addParent(newBranch.parentNoteId, newBranch.branchId);
            }

            if (parentNote) {
                parentNote.addChild(newBranch.noteId, newBranch.branchId);
            }

            continue;
        }

        cachedBranch.update(entity);
        loadResults.addBranch(row.entityId, row.sourceId);

        if (!entity.isDeleted) {
            if (childNote) {
                childNote.addParent(cachedBranch.parentNoteId, cachedBranch.branchId);
            }

            if (parentNote) {
                parentNote.addChild(cachedBranch.noteId, cachedBranch.branchId);
            }

            continue;
        }

        // deleted branch: unlink the child and the parent from each other
        if (childNote) {
            childNote.parents = childNote.parents.filter(parentNoteId => parentNoteId !== entity.parentNoteId);
            delete childNote.parentToBranch[entity.parentNoteId];
        }

        if (parentNote) {
            parentNote.children = parentNote.children.filter(childNoteId => childNoteId !== entity.noteId);
            delete parentNote.childToBranch[entity.noteId];
        }
    }

    // note reordering: new positions are applied to the cached branches
    for (const row of rowsOf('note_reordering')) {
        for (const branchId in row.positions) {
            const cachedBranch = treeCache.branches[branchId];

            if (cachedBranch) {
                cachedBranch.notePosition = row.positions[branchId];
            }
        }

        loadResults.addNoteReordering(row.entityId, row.sourceId);
    }

    // attributes
    // missing reloading the relation target note
    for (const row of rowsOf('attributes')) {
        const {entity} = row;
        const cachedAttribute = treeCache.attributes[row.entityId];
        const sourceNote = treeCache.notes[entity.noteId];
        const targetNote = entity.type === 'relation' && treeCache.notes[entity.value];

        if (!cachedAttribute) {
            // unknown attribute is created only if it's alive and one of the involved notes is cached
            if (entity.isDeleted || !(sourceNote || targetNote)) {
                continue;
            }

            const newAttribute = new Attribute(treeCache, entity);

            treeCache.attributes[newAttribute.attributeId] = newAttribute;

            loadResults.addAttribute(row.entityId, row.sourceId);

            if (sourceNote && !sourceNote.attributes.includes(newAttribute.attributeId)) {
                sourceNote.attributes.push(newAttribute.attributeId);
            }

            if (targetNote && !targetNote.targetRelations.includes(newAttribute.attributeId)) {
                targetNote.targetRelations.push(newAttribute.attributeId);
            }

            continue;
        }

        cachedAttribute.update(entity);
        loadResults.addAttribute(row.entityId, row.sourceId);

        if (!entity.isDeleted) {
            continue;
        }

        // deleted attribute: remove the references from the involved notes
        if (sourceNote) {
            sourceNote.attributes = sourceNote.attributes.filter(attributeId => attributeId !== cachedAttribute.attributeId);
        }

        if (targetNote) {
            targetNote.targetRelations = targetNote.targetRelations.filter(attributeId => attributeId !== cachedAttribute.attributeId);
        }
    }

    // note contents: the cached complement promise is dropped
    for (const row of rowsOf('note_contents')) {
        delete treeCache.noteComplementPromises[row.entityId];

        loadResults.addNoteContent(row.entityId, row.sourceId);
    }

    // note revisions are only recorded
    for (const row of rowsOf('note_revisions')) {
        loadResults.addNoteRevision(row.entityId, row.noteId, row.sourceId);
    }

    // options
    for (const row of rowsOf('options')) {
        const {name, value} = row.entity;

        if (name === 'openTabs') {
            continue; // only noise
        }

        options.set(name, value);

        loadResults.addOption(name);
    }

    if (loadResults.isEmpty()) {
        return;
    }

    // imported dynamically, only when there is something to announce
    const {default: appContext} = await import("./app_context.js");
    await appContext.triggerEvent('entitiesReloaded', {loadResults});
}
|
|
|
|
// public API of the websocket service
export default {
    logError,
    subscribeToMessages,
    waitForSyncId,
    waitForMaxKnownSyncId
};