From eb8c296e62fa9c7d05ef3225de14a7e2745c4557 Mon Sep 17 00:00:00 2001 From: zadam Date: Mon, 2 Dec 2019 22:27:06 +0100 Subject: [PATCH] attempt to make updating clients via websocket faster --- src/public/javascripts/services/tree.js | 4 +- src/public/javascripts/services/ws.js | 56 ++++++++++++++----------- src/routes/api/note_revisions.js | 3 +- src/services/sync_table.js | 19 +++++++-- src/services/ws.js | 26 +++++++++--- 5 files changed, 73 insertions(+), 35 deletions(-) diff --git a/src/public/javascripts/services/tree.js b/src/public/javascripts/services/tree.js index fad10a435..28920c6d2 100644 --- a/src/public/javascripts/services/tree.js +++ b/src/public/javascripts/services/tree.js @@ -795,7 +795,7 @@ ws.subscribeToOutsideSyncMessages(async syncData => { syncData.filter(sync => sync.entityName === 'attributes').forEach(sync => { const note = treeCache.notes[sync.noteId]; - if (note && note.attributeCache) { + if (note && note.__attributeCache) { noteIdsToRefresh.add(sync.entityId); } }); @@ -869,7 +869,7 @@ async function reloadNotes(noteIds, activateNotePath = null) { if (activateNotePath) { const node = await getNodeFromPath(activateNotePath); - if (node) { + if (node && !node.isActive()) { await node.setActive(true); } } diff --git a/src/public/javascripts/services/ws.js b/src/public/javascripts/services/ws.js index 4264cf55c..7e90e23dc 100644 --- a/src/public/javascripts/services/ws.js +++ b/src/public/javascripts/services/ws.js @@ -8,7 +8,8 @@ const outsideSyncMessageHandlers = []; const messageHandlers = []; let ws; -let lastSyncId = window.glob.maxSyncIdAtLoad; +let lastAcceptedSyncId = window.glob.maxSyncIdAtLoad; +let lastProcessedSyncId = window.glob.maxSyncIdAtLoad; let lastPingTs; let syncDataQueue = []; @@ -84,7 +85,7 @@ async function handleMessage(event) { let syncIdReachedListeners = []; function waitForSyncId(desiredSyncId) { - if (desiredSyncId <= lastSyncId) { + if (desiredSyncId <= lastProcessedSyncId) { return Promise.resolve(); } @@ -99,14 +100,14 @@ function waitForSyncId(desiredSyncId) { function checkSyncIdListeners() { syncIdReachedListeners - .filter(l => l.desiredSyncId <= lastSyncId) + .filter(l => l.desiredSyncId <= lastProcessedSyncId) .forEach(l => l.resolvePromise()); syncIdReachedListeners = syncIdReachedListeners - .filter(l => l.desiredSyncId > lastSyncId); + .filter(l => l.desiredSyncId > lastProcessedSyncId); syncIdReachedListeners.filter(l => Date.now() > l.start - 60000) - .forEach(l => console.log(`Waiting for syncId ${l.desiredSyncId} while current is ${lastSyncId} for ${Math.floor((Date.now() - l.start) / 1000)}s`)); + .forEach(l => console.log(`Waiting for syncId ${l.desiredSyncId} while current is ${lastProcessedSyncId} for ${Math.floor((Date.now() - l.start) / 1000)}s`)); } async function consumeSyncData() { @@ -116,13 +117,17 @@ async function consumeSyncData() { const outsideSyncData = allSyncData.filter(sync => sync.sourceId !== glob.sourceId); + // we set lastAcceptedSyncId even before sync processing and send ping so that backend can start sending more updates + lastAcceptedSyncId = Math.max(lastAcceptedSyncId, allSyncData[allSyncData.length - 1].id); + sendPing(); + // the update process should be synchronous as a whole but individual handlers can run in parallel await Promise.all([ ...allSyncMessageHandlers.map(syncHandler => syncHandler(allSyncData)), ...outsideSyncMessageHandlers.map(syncHandler => syncHandler(outsideSyncData)) ]); - lastSyncId = allSyncData[allSyncData.length - 1].id; + lastProcessedSyncId = Math.max(lastProcessedSyncId, allSyncData[allSyncData.length - 1].id); } } @@ -140,29 +145,32 @@ function connectWebSocket() { return ws; } +async function sendPing() { + if (Date.now() - lastPingTs > 30000) { + console.log(utils.now(), "Lost connection to server"); + } + + if (ws.readyState === ws.OPEN) { + ws.send(JSON.stringify({ + type: 'ping', + lastSyncId: lastAcceptedSyncId + })); + } + else if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) { + console.log(utils.now(), "WS closed or closing, trying to reconnect"); + + ws = connectWebSocket(); + } +} + setTimeout(() => { ws = connectWebSocket(); - lastSyncId = glob.maxSyncIdAtLoad; + lastAcceptedSyncId = glob.maxSyncIdAtLoad; + lastProcessedSyncId = glob.maxSyncIdAtLoad; lastPingTs = Date.now(); - setInterval(async () => { - if (Date.now() - lastPingTs > 30000) { - console.log(utils.now(), "Lost connection to server"); - } - - if (ws.readyState === ws.OPEN) { - ws.send(JSON.stringify({ - type: 'ping', - lastSyncId: lastSyncId - })); - } - else if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) { - console.log(utils.now(), "WS closed or closing, trying to reconnect"); - - ws = connectWebSocket(); - } - }, 1000); + setInterval(sendPing, 1000); }, 0); subscribeToMessages(message => { diff --git a/src/routes/api/note_revisions.js b/src/routes/api/note_revisions.js index a6922e0b3..c4a8091b3 100644 --- a/src/routes/api/note_revisions.js +++ b/src/routes/api/note_revisions.js @@ -125,7 +125,8 @@ async function getEditedNotesOnDate(req) { SELECT noteId FROM note_revisions WHERE note_revisions.dateLastEdited LIKE '${date}%' ) - ORDER BY isDeleted`); + ORDER BY isDeleted + LIMIT 50`); for (const note of notes) { const notePath = noteCacheService.getNotePath(note.noteId); diff --git a/src/services/sync_table.js b/src/services/sync_table.js index 90ac3a6ba..687368442 100644 --- a/src/services/sync_table.js +++ b/src/services/sync_table.js @@ -4,13 +4,25 @@ const dateUtils = require('./date_utils'); const log = require('./log'); const cls = require('./cls'); +let syncs = []; + async function addEntitySync(entityName, entityId, sourceId) { - await sql.replace("sync", { + const sync = { entityName: entityName, entityId: entityId, utcSyncDate: dateUtils.utcNowDateTime(), sourceId: sourceId || cls.getSourceId() || sourceIdService.getCurrentSourceId() - }); + }; + + sync.id = await sql.replace("sync", sync); + + syncs.push(sync); + + setTimeout(() => require('./ws').sendPingToAllClients(), 50); +} + +function getEntitySyncsNewerThan(syncId) { + return syncs.filter(s => s.id > syncId); } async function cleanupSyncRowsForMissingEntities(entityName, entityKey) { @@ -83,5 +95,6 @@ module.exports = { addAttributeSync: async (attributeId, sourceId) => await addEntitySync("attributes", attributeId, sourceId), addApiTokenSync: async (apiTokenId, sourceId) => await addEntitySync("api_tokens", apiTokenId, sourceId), addEntitySync, - fillAllSyncRows + fillAllSyncRows, + getEntitySyncsNewerThan }; \ No newline at end of file diff --git a/src/services/ws.js b/src/services/ws.js index 7661afca5..1a1a45831 100644 --- a/src/services/ws.js +++ b/src/services/ws.js @@ -5,6 +5,7 @@ const sql = require('./sql'); const syncMutexService = require('./sync_mutex'); let webSocketServer; +let lastAcceptedSyncIds = {}; function init(httpServer, sessionParser) { webSocketServer = new WebSocket.Server({ @@ -23,7 +24,11 @@ function init(httpServer, sessionParser) { }); webSocketServer.on('connection', (ws, req) => { - console.log("websocket client connected"); + ws.id = utils.randomString(10); + + lastAcceptedSyncIds[ws.id] = 0; + + console.log(`websocket client connected`); ws.on('message', messageJson => { const message = JSON.parse(messageJson); @@ -32,7 +37,9 @@ function init(httpServer, sessionParser) { log.error('JS Error: ' + message.error); } else if (message.type === 'ping') { - syncMutexService.doExclusively(async () => await sendPing(ws, message.lastSyncId)); + lastAcceptedSyncIds[ws.id] = message.lastSyncId; + + syncMutexService.doExclusively(async () => await sendPing(ws)); } else { log.error('Unrecognized message: '); @@ -64,8 +71,8 @@ function sendMessageToAllClients(message) { } } -async function sendPing(client, lastSentSyncId) { - const syncData = await sql.getRows("SELECT * FROM sync WHERE id > ?", [lastSentSyncId]); +async function sendPing(client) { + const syncData = require('./sync_table').getEntitySyncsNewerThan(lastAcceptedSyncIds[client.id]); for (const sync of syncData) { // fill in some extra data needed by the frontend @@ -92,6 +99,14 @@ async function sendPing(client, lastSentSyncId) { }); } +function sendPingToAllClients() { + if (webSocketServer) { + webSocketServer.clients.forEach(function each(client) { + sendPing(client); + }); + } +} + function refreshTree() { sendMessageToAllClients({ type: 'refresh-tree' }); } @@ -109,5 +124,6 @@ module.exports = { sendMessageToAllClients, refreshTree, syncPullInProgress, - syncPullFinished + syncPullFinished, + sendPingToAllClients }; \ No newline at end of file