attempt to make updating clients via websocket faster

This commit is contained in:
zadam 2019-12-02 22:27:06 +01:00
parent dc063983ea
commit eb8c296e62
5 changed files with 73 additions and 35 deletions

View File

@ -795,7 +795,7 @@ ws.subscribeToOutsideSyncMessages(async syncData => {
syncData.filter(sync => sync.entityName === 'attributes').forEach(sync => {
const note = treeCache.notes[sync.noteId];
if (note && note.attributeCache) {
if (note && note.__attributeCache) {
noteIdsToRefresh.add(sync.entityId);
}
});
@ -869,7 +869,7 @@ async function reloadNotes(noteIds, activateNotePath = null) {
if (activateNotePath) {
const node = await getNodeFromPath(activateNotePath);
if (node) {
if (node && !node.isActive()) {
await node.setActive(true);
}
}

View File

@ -8,7 +8,8 @@ const outsideSyncMessageHandlers = [];
const messageHandlers = [];
let ws;
let lastSyncId = window.glob.maxSyncIdAtLoad;
let lastAcceptedSyncId = window.glob.maxSyncIdAtLoad;
let lastProcessedSyncId = window.glob.maxSyncIdAtLoad;
let lastPingTs;
let syncDataQueue = [];
@ -84,7 +85,7 @@ async function handleMessage(event) {
let syncIdReachedListeners = [];
function waitForSyncId(desiredSyncId) {
if (desiredSyncId <= lastSyncId) {
if (desiredSyncId <= lastProcessedSyncId) {
return Promise.resolve();
}
@ -99,14 +100,14 @@ function waitForSyncId(desiredSyncId) {
function checkSyncIdListeners() {
syncIdReachedListeners
.filter(l => l.desiredSyncId <= lastSyncId)
.filter(l => l.desiredSyncId <= lastProcessedSyncId)
.forEach(l => l.resolvePromise());
syncIdReachedListeners = syncIdReachedListeners
.filter(l => l.desiredSyncId > lastSyncId);
.filter(l => l.desiredSyncId > lastProcessedSyncId);
syncIdReachedListeners.filter(l => Date.now() > l.start - 60000)
.forEach(l => console.log(`Waiting for syncId ${l.desiredSyncId} while current is ${lastSyncId} for ${Math.floor((Date.now() - l.start) / 1000)}s`));
.forEach(l => console.log(`Waiting for syncId ${l.desiredSyncId} while current is ${lastProcessedSyncId} for ${Math.floor((Date.now() - l.start) / 1000)}s`));
}
async function consumeSyncData() {
@ -116,13 +117,17 @@ async function consumeSyncData() {
const outsideSyncData = allSyncData.filter(sync => sync.sourceId !== glob.sourceId);
// we set lastAcceptedSyncId even before sync processing and send ping so that backend can start sending more updates
lastAcceptedSyncId = Math.max(lastAcceptedSyncId, allSyncData[allSyncData.length - 1].id);
sendPing();
// the update process should be synchronous as a whole but individual handlers can run in parallel
await Promise.all([
...allSyncMessageHandlers.map(syncHandler => syncHandler(allSyncData)),
...outsideSyncMessageHandlers.map(syncHandler => syncHandler(outsideSyncData))
]);
lastSyncId = allSyncData[allSyncData.length - 1].id;
lastProcessedSyncId = Math.max(lastProcessedSyncId, allSyncData[allSyncData.length - 1].id);
}
}
@ -140,29 +145,32 @@ function connectWebSocket() {
return ws;
}
async function sendPing() {
if (Date.now() - lastPingTs > 30000) {
console.log(utils.now(), "Lost connection to server");
}
if (ws.readyState === ws.OPEN) {
ws.send(JSON.stringify({
type: 'ping',
lastSyncId: lastAcceptedSyncId
}));
}
else if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
console.log(utils.now(), "WS closed or closing, trying to reconnect");
ws = connectWebSocket();
}
}
setTimeout(() => {
ws = connectWebSocket();
lastSyncId = glob.maxSyncIdAtLoad;
lastAcceptedSyncId = glob.maxSyncIdAtLoad;
lastProcessedSyncId = glob.maxSyncIdAtLoad;
lastPingTs = Date.now();
setInterval(async () => {
if (Date.now() - lastPingTs > 30000) {
console.log(utils.now(), "Lost connection to server");
}
if (ws.readyState === ws.OPEN) {
ws.send(JSON.stringify({
type: 'ping',
lastSyncId: lastSyncId
}));
}
else if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
console.log(utils.now(), "WS closed or closing, trying to reconnect");
ws = connectWebSocket();
}
}, 1000);
setInterval(sendPing, 1000);
}, 0);
subscribeToMessages(message => {

View File

@ -125,7 +125,8 @@ async function getEditedNotesOnDate(req) {
SELECT noteId FROM note_revisions
WHERE note_revisions.dateLastEdited LIKE '${date}%'
)
ORDER BY isDeleted`);
ORDER BY isDeleted
LIMIT 50`);
for (const note of notes) {
const notePath = noteCacheService.getNotePath(note.noteId);

View File

@ -4,13 +4,25 @@ const dateUtils = require('./date_utils');
const log = require('./log');
const cls = require('./cls');
let syncs = [];
async function addEntitySync(entityName, entityId, sourceId) {
await sql.replace("sync", {
const sync = {
entityName: entityName,
entityId: entityId,
utcSyncDate: dateUtils.utcNowDateTime(),
sourceId: sourceId || cls.getSourceId() || sourceIdService.getCurrentSourceId()
});
};
sync.id = await sql.replace("sync", sync);
syncs.push(sync);
setTimeout(() => require('./ws').sendPingToAllClients(), 50);
}
function getEntitySyncsNewerThan(syncId) {
return syncs.filter(s => s.id > syncId);
}
async function cleanupSyncRowsForMissingEntities(entityName, entityKey) {
@ -83,5 +95,6 @@ module.exports = {
addAttributeSync: async (attributeId, sourceId) => await addEntitySync("attributes", attributeId, sourceId),
addApiTokenSync: async (apiTokenId, sourceId) => await addEntitySync("api_tokens", apiTokenId, sourceId),
addEntitySync,
fillAllSyncRows
fillAllSyncRows,
getEntitySyncsNewerThan
};

View File

@ -5,6 +5,7 @@ const sql = require('./sql');
const syncMutexService = require('./sync_mutex');
let webSocketServer;
let lastAcceptedSyncIds = {};
function init(httpServer, sessionParser) {
webSocketServer = new WebSocket.Server({
@ -23,7 +24,11 @@ function init(httpServer, sessionParser) {
});
webSocketServer.on('connection', (ws, req) => {
console.log("websocket client connected");
ws.id = utils.randomString(10);
lastAcceptedSyncIds[ws.id] = 0;
console.log(`websocket client connected`);
ws.on('message', messageJson => {
const message = JSON.parse(messageJson);
@ -32,7 +37,9 @@ function init(httpServer, sessionParser) {
log.error('JS Error: ' + message.error);
}
else if (message.type === 'ping') {
syncMutexService.doExclusively(async () => await sendPing(ws, message.lastSyncId));
lastAcceptedSyncIds[ws.id] = message.lastSyncId;
syncMutexService.doExclusively(async () => await sendPing(ws));
}
else {
log.error('Unrecognized message: ');
@ -64,8 +71,8 @@ function sendMessageToAllClients(message) {
}
}
async function sendPing(client, lastSentSyncId) {
const syncData = await sql.getRows("SELECT * FROM sync WHERE id > ?", [lastSentSyncId]);
async function sendPing(client) {
const syncData = require('./sync_table').getEntitySyncsNewerThan(lastAcceptedSyncIds[client.id]);
for (const sync of syncData) {
// fill in some extra data needed by the frontend
@ -92,6 +99,14 @@ async function sendPing(client, lastSentSyncId) {
});
}
function sendPingToAllClients() {
if (webSocketServer) {
webSocketServer.clients.forEach(function each(client) {
sendPing(client);
});
}
}
function refreshTree() {
sendMessageToAllClients({ type: 'refresh-tree' });
}
@ -109,5 +124,6 @@ module.exports = {
sendMessageToAllClients,
refreshTree,
syncPullInProgress,
syncPullFinished
syncPullFinished,
sendPingToAllClients
};