From 392a00ac179dd61b28d98b85a0fb626471ff5d9e Mon Sep 17 00:00:00 2001 From: zadam Date: Sun, 21 Mar 2021 22:43:41 +0100 Subject: [PATCH] sync status widget --- src/public/app/services/ws.js | 68 ++++-------- src/public/app/widgets/global_menu.js | 8 -- src/public/app/widgets/sync_status.js | 147 ++++++++++++++++++-------- src/routes/index.js | 1 + src/services/sql.js | 2 +- src/services/sync.js | 13 ++- src/services/ws.js | 27 +++-- src/views/desktop.ejs | 1 + src/views/mobile.ejs | 1 + 9 files changed, 156 insertions(+), 112 deletions(-) diff --git a/src/public/app/services/ws.js b/src/public/app/services/ws.js index 82f56a346..e2b171566 100644 --- a/src/public/app/services/ws.js +++ b/src/public/app/services/ws.js @@ -12,9 +12,10 @@ const messageHandlers = []; let ws; let lastAcceptedEntityChangeId = window.glob.maxEntityChangeIdAtLoad; +let lastAcceptedEntityChangeSyncId = window.glob.maxEntityChangeSyncIdAtLoad; let lastProcessedEntityChangeId = window.glob.maxEntityChangeIdAtLoad; let lastPingTs; -let syncDataQueue = []; +let frontendUpdateDataQueue = []; function logError(message) { console.error(utils.now(), message); // needs to be separate from .trace() @@ -34,7 +35,7 @@ function subscribeToMessages(messageHandler) { messageHandlers.push(messageHandler); } -// used to serialize sync operations +// used to serialize frontend update operations let consumeQueuePromise = null; // to make sure each change event is processed only once. Not clear if this is still necessary @@ -46,7 +47,7 @@ function logRows(entityChanges) { && (row.entityName !== 'options' || row.entityId !== 'openTabs')); if (filteredRows.length > 0) { - console.debug(utils.now(), "Sync data: ", filteredRows); + console.debug(utils.now(), "Frontend update data: ", filteredRows); } } @@ -57,17 +58,24 @@ async function handleMessage(event) { messageHandler(message); } - if (message.type === 'sync') { - let entityChanges = message.data; + if (message.type === 'frontend-update') { + let {entityChanges, lastSyncedPush} = message.data; lastPingTs = Date.now(); if (entityChanges.length > 0) { logRows(entityChanges); - syncDataQueue.push(...entityChanges); + frontendUpdateDataQueue.push(...entityChanges); - // we set lastAcceptedEntityChangeId even before sync processing and send ping so that backend can start sending more updates + // we set lastAcceptedEntityChangeId even before frontend update processing and send ping so that backend can start sending more updates lastAcceptedEntityChangeId = Math.max(lastAcceptedEntityChangeId, entityChanges[entityChanges.length - 1].id); + + const lastSyncEntityChange = entityChanges.slice().reverse().find(ec => ec.isSynced); + + if (lastSyncEntityChange) { + lastAcceptedEntityChangeSyncId = Math.max(lastAcceptedEntityChangeSyncId, lastSyncEntityChange.id); + } + sendPing(); // first wait for all the preceding consumers to finish @@ -77,7 +85,7 @@ async function handleMessage(event) { try { // it's my turn so start it up - consumeQueuePromise = consumeSyncData(); + consumeQueuePromise = consumeFrontendUpdateData(); await consumeQueuePromise; } @@ -129,19 +137,10 @@ function checkEntityChangeIdListeners() { .forEach(l => console.log(`Waiting for entityChangeId ${l.desiredEntityChangeId} while last processed is ${lastProcessedEntityChangeId} (last accepted ${lastAcceptedEntityChangeId}) for ${Math.floor((Date.now() - l.start) / 1000)}s`)); } -async function runSafely(syncHandler, syncData) { - try { - return await syncHandler(syncData); - } - catch (e) { - console.log(`Sync handler failed with ${e.message}: ${e.stack}`); - } -} - -async function consumeSyncData() { - if (syncDataQueue.length > 0) { - const allEntityChanges = syncDataQueue; - syncDataQueue = []; +async function consumeFrontendUpdateData() { + if (frontendUpdateDataQueue.length > 0) { + const allEntityChanges = frontendUpdateDataQueue; + frontendUpdateDataQueue = []; const nonProcessedEntityChanges = allEntityChanges.filter(ec => !processedEntityChangeIds.has(ec.id)); @@ -213,30 +212,6 @@ setTimeout(() => { setInterval(sendPing, 1000); }, 0); -subscribeToMessages(async message => { - const appContext = (await import("./app_context.js")).default; - - if (message.type === 'sync-pull-in-progress') { - toastService.showPersistent({ - id: 'sync', - title: "Sync status", - message: "Sync update in progress", - icon: "refresh" - }); - - appContext.triggerEvent('syncInProgress'); - } - else if (message.type === 'sync-finished') { - // this gives user a chance to see the toast in case of fast sync finish - setTimeout(() => toastService.closePersistent('sync'), 1000); - - appContext.triggerEvent('syncFinished'); - } - else if (message.type === 'sync-failed') { - appContext.triggerEvent('syncFailed'); - } -}); - async function processEntityChanges(entityChanges) { const loadResults = new LoadResults(treeCache); @@ -413,5 +388,6 @@ export default { logError, subscribeToMessages, waitForEntityChangeId, - waitForMaxKnownEntityChangeId + waitForMaxKnownEntityChangeId, + getMaxKnownEntityChangeSyncId: () => lastAcceptedEntityChangeSyncId }; diff --git a/src/public/app/widgets/global_menu.js b/src/public/app/widgets/global_menu.js index 260e6bd52..4565039cb 100644 --- a/src/public/app/widgets/global_menu.js +++ b/src/public/app/widgets/global_menu.js @@ -1,6 +1,5 @@ import BasicWidget from "./basic_widget.js"; import utils from "../services/utils.js"; -import syncService from "../services/sync.js"; const TPL = `