From a3b2e705ce153f672fe7a4f61a363ba6779556f7 Mon Sep 17 00:00:00 2001 From: azivner Date: Thu, 9 Nov 2017 20:52:47 -0500 Subject: [PATCH] refactoring of sync code --- routes/api/sync.js | 13 +- services/sync.js | 430 ++++++++++++---------------------------- services/sync_update.js | 123 ++++++++++++ 3 files changed, 262 insertions(+), 304 deletions(-) create mode 100644 services/sync_update.js diff --git a/routes/api/sync.js b/routes/api/sync.js index 55fa0365b..6ab0be33e 100644 --- a/routes/api/sync.js +++ b/routes/api/sync.js @@ -4,6 +4,7 @@ const express = require('express'); const router = express.Router(); const auth = require('../../services/auth'); const sync = require('../../services/sync'); +const syncUpdate = require('../../services/sync_update'); const sql = require('../../services/sql'); const options = require('../../services/options'); @@ -66,37 +67,37 @@ router.get('/recent_notes/:noteId', auth.checkApiAuth, async (req, res, next) => }); router.put('/notes', auth.checkApiAuth, async (req, res, next) => { - await sync.updateNote(req.body.entity, req.body.links, req.body.sourceId); + await syncUpdate.updateNote(req.body.entity, req.body.links, req.body.sourceId); res.send({}); }); router.put('/notes_tree', auth.checkApiAuth, async (req, res, next) => { - await sync.updateNoteTree(req.body.entity, req.body.sourceId); + await syncUpdate.updateNoteTree(req.body.entity, req.body.sourceId); res.send({}); }); router.put('/notes_history', auth.checkApiAuth, async (req, res, next) => { - await sync.updateNoteHistory(req.body.entity, req.body.sourceId); + await syncUpdate.updateNoteHistory(req.body.entity, req.body.sourceId); res.send({}); }); router.put('/notes_reordering', auth.checkApiAuth, async (req, res, next) => { - await sync.updateNoteReordering(req.body.entity, req.body.sourceId); + await syncUpdate.updateNoteReordering(req.body.entity, req.body.sourceId); res.send({}); }); router.put('/options', auth.checkApiAuth, async (req, res, next) => { - await sync.updateOptions(req.body.entity, req.body.sourceId); + await syncUpdate.updateOptions(req.body.entity, req.body.sourceId); res.send({}); }); router.put('/recent_notes', auth.checkApiAuth, async (req, res, next) => { - await sync.updateRecentNotes(req.body.entity, req.body.sourceId); + await syncUpdate.updateRecentNotes(req.body.entity, req.body.sourceId); res.send({}); }); diff --git a/services/sync.js b/services/sync.js index 90b01bc7e..045ecdcf6 100644 --- a/services/sync.js +++ b/services/sync.js @@ -8,9 +8,8 @@ const migration = require('./migration'); const utils = require('./utils'); const config = require('./config'); const SOURCE_ID = require('./source_id'); -const audit_category = require('./audit_category'); -const eventLog = require('./event_log'); const notes = require('./notes'); +const syncUpdate = require('./sync_update'); const SYNC_SERVER = config['Sync']['syncServerHost']; const isSyncSetup = !!SYNC_SERVER; @@ -18,92 +17,146 @@ const SYNC_TIMEOUT = config['Sync']['syncServerTimeout'] || 5000; let syncInProgress = false; +async function sync() { + if (syncInProgress) { + log.info("Sync already in progress"); + + return { + success: false, + message: "Sync already in progress" + }; + } + + syncInProgress = true; + + try { + if (!await migration.isDbUpToDate()) { + log.info("DB not up to date"); + + return { + success: false, + message: "DB not up to date" + }; + } + + const syncContext = await login(); + + await pushSync(syncContext); + + await pullSync(syncContext); + + await pushSync(syncContext); + + return { + success: true + }; + } + catch (e) { + if (e.message.indexOf('ECONNREFUSED') !== -1) { + log.info("No connection to sync server."); + + return { + success: false, + message: "No connection to sync server." + }; + } + else { + log.info("sync failed: " + e.stack); + + return { + success: false, + message: e.message + } + } + } + finally { + syncInProgress = false; + } +} + +async function login() { + const timestamp = utils.nowTimestamp(); + + const documentSecret = await options.getOption('document_secret'); + const hash = utils.hmac(documentSecret, timestamp); + + const syncContext = { cookieJar: rp.jar() }; + + const resp = await syncRequest(syncContext, 'POST', '/api/login', { + timestamp: timestamp, + dbVersion: migration.APP_DB_VERSION, + hash: hash + }); + + syncContext.sourceId = resp.sourceId; + + return syncContext; +} + async function pullSync(syncContext) { const lastSyncedPull = parseInt(await options.getOption('last_synced_pull')); - let syncRows; + const changesUri = '/api/sync/changed?lastSyncId=' + lastSyncedPull + "&sourceId=" + SOURCE_ID; - try { - logSync("Pulling changes: " + SYNC_SERVER + '/api/sync/changed?lastSyncId=' + lastSyncedPull + "&sourceId=" + SOURCE_ID); + const syncRows = await syncRequest(syncContext, 'GET', changesUri); - syncRows = await rp({ - uri: SYNC_SERVER + '/api/sync/changed?lastSyncId=' + lastSyncedPull + "&sourceId=" + SOURCE_ID, - jar: syncContext.cookieJar, - json: true, - timeout: SYNC_TIMEOUT - }); - - logSync("Pulled " + syncRows.length + " changes"); - } - catch (e) { - logSyncError("Can't pull changes, inner exception: ", e); - } + log.info("Pulled " + syncRows.length + " changes from " + changesUri); for (const sync of syncRows) { - let resp; - - try { - resp = await rp({ - uri: SYNC_SERVER + "/api/sync/" + sync.entity_name + "/" + sync.entity_id, - json: true, - jar: syncContext.cookieJar, - timeout: SYNC_TIMEOUT - }); - } - catch (e) { - logSyncError("Can't pull " + sync.entity_name + " " + sync.entity_id, e); - } + const resp = await syncRequest(syncContext, 'GET', "/api/sync/" + sync.entity_name + "/" + sync.entity_id); if (sync.entity_name === 'notes') { - await updateNote(resp.entity, resp.links, syncContext.sourceId); + await syncUpdate.updateNote(resp.entity, resp.links, syncContext.sourceId); } else if (sync.entity_name === 'notes_tree') { - await updateNoteTree(resp, syncContext.sourceId); + await syncUpdate.updateNoteTree(resp, syncContext.sourceId); } else if (sync.entity_name === 'notes_history') { - await updateNoteHistory(resp, syncContext.sourceId); + await syncUpdate.updateNoteHistory(resp, syncContext.sourceId); } else if (sync.entity_name === 'notes_reordering') { - await updateNoteReordering(resp, syncContext.sourceId); + await syncUpdate.updateNoteReordering(resp, syncContext.sourceId); } else if (sync.entity_name === 'options') { - await updateOptions(resp, syncContext.sourceId); + await syncUpdate.updateOptions(resp, syncContext.sourceId); } else if (sync.entity_name === 'recent_notes') { - await updateRecentNotes(resp, syncContext.sourceId); + await syncUpdate.updateRecentNotes(resp, syncContext.sourceId); } else { - logSyncError("Unrecognized entity type " + sync.entity_name, e); + throw new Error("Unrecognized entity type " + sync.entity_name); } await options.setOption('last_synced_pull', sync.id); } - logSync("Finished pull"); + log.info("Finished pull"); } -async function sendEntity(entity, entityName, cookieJar) { - try { - const payload = { - sourceId: SOURCE_ID, - entity: entity - }; +async function pushSync(syncContext) { + let lastSyncedPush = parseInt(await options.getOption('last_synced_push')); - if (entityName === 'notes') { - payload.links = await sql.getResults('select * from links where note_id = ?', [entity.note_id]); + while (true) { + const sync = await sql.getSingleResultOrNull('SELECT * FROM sync WHERE id > ? LIMIT 1', [lastSyncedPush]); + + if (sync === null) { + // nothing to sync + + log.info("Nothing to push"); + + break; } - await rp({ - method: 'PUT', - uri: SYNC_SERVER + '/api/sync/' + entityName, - body: payload, - json: true, - timeout: SYNC_TIMEOUT, - jar: cookieJar - }); - } - catch (e) { - logSyncError("Failed sending update for entity " + entityName, e); + if (sync.source_id === syncContext.sourceId) { + log.info("Skipping sync " + sync.entity_name + " " + sync.entity_id + " because it originates from sync target"); + } + else { + await readAndPushEntity(sync, syncContext); + } + + lastSyncedPush = sync.id; + + await options.setOption('last_synced_push', lastSyncedPush); } } @@ -132,260 +185,47 @@ async function readAndPushEntity(sync, syncContext) { entity = await sql.getSingleResult('SELECT * FROM recent_notes WHERE note_id = ?', [sync.entity_id]); } else { - logSyncError("Unrecognized entity type " + sync.entity_name, null); + throw new Error("Unrecognized entity type " + sync.entity_name); } if (!entity) { - logSync("Sync entity for " + sync.entity_name + " " + sync.entity_id + " doesn't exist. Skipping."); + log.info("Sync entity for " + sync.entity_name + " " + sync.entity_id + " doesn't exist. Skipping."); return; } - logSync("Pushing changes in " + sync.entity_name + " " + sync.entity_id); + log.info("Pushing changes in " + sync.entity_name + " " + sync.entity_id); - await sendEntity(entity, sync.entity_name, syncContext.cookieJar); + await sendEntity(syncContext, entity, sync.entity_name); } -async function pushSync(syncContext) { - let lastSyncedPush = parseInt(await options.getOption('last_synced_push')); +async function sendEntity(syncContext, entity, entityName) { + const payload = { + sourceId: SOURCE_ID, + entity: entity + }; - while (true) { - const sync = await sql.getSingleResultOrNull('SELECT * FROM sync WHERE id > ? LIMIT 1', [lastSyncedPush]); - - if (sync === null) { - // nothing to sync - - logSync("Nothing to push"); - - break; - } - - if (sync.source_id === syncContext.sourceId) { - logSync("Skipping sync " + sync.entity_name + " " + sync.entity_id + " because it originates from sync target"); - } - else { - await readAndPushEntity(sync, syncContext); - } - - lastSyncedPush = sync.id; - - await options.setOption('last_synced_push', lastSyncedPush); + if (entityName === 'notes') { + payload.links = await sql.getResults('select * from links where note_id = ?', [entity.note_id]); } + + await syncRequest(syncContext, 'PUT', '/api/sync/' + entityName, payload); } -async function login() { - const timestamp = utils.nowTimestamp(); - - const documentSecret = await options.getOption('document_secret'); - const hash = utils.hmac(documentSecret, timestamp); - - const cookieJar = rp.jar(); +async function syncRequest(syncContext, method, uri, body) { + const fullUri = SYNC_SERVER + uri; try { - const resp = await rp({ - method: 'POST', - uri: SYNC_SERVER + '/api/login', - body: { - timestamp: timestamp, - dbVersion: migration.APP_DB_VERSION, - hash: hash - }, + return await rp({ + method: method, + uri: fullUri, + jar: syncContext.cookieJar, json: true, - timeout: SYNC_TIMEOUT, - jar: cookieJar + body: body, + timeout: SYNC_TIMEOUT }); - - return { - cookieJar: cookieJar, - sourceId: resp.sourceId - }; } catch (e) { - logSyncError("Can't login to API for sync, inner exception: ", e); - } -} - -async function sync() { - if (syncInProgress) { - logSyncError("Sync already in progress"); - - return { - success: false, - message: "Sync already in progress" - }; - } - - syncInProgress = true; - - try { - if (!await migration.isDbUpToDate()) { - logSyncError("DB not up to date"); - - return { - success: false, - message: "DB not up to date" - }; - } - - let syncContext; - - try { - syncContext = await login(); - } - catch (e) { - if (e.message.indexOf('ECONNREFUSED') !== -1) { - logSync("No connection to sync server."); - - return { - success: false, - message: "No connection to sync server." - }; - } - else { - throw e; - } - } - - await pushSync(syncContext); - - await pullSync(syncContext); - - await pushSync(syncContext); - - return { - success: true - }; - } - catch (e) { - logSync("sync failed: " + e.stack); - - return { - success: false, - message: e.message - } - } - finally { - syncInProgress = false; - } -} - -function logSync(message) { - log.info(message); -} - -function logSyncError(message, e) { - let completeMessage = message; - - if (e) { - completeMessage += ", inner exception: " + e.stack; - } - - throw new Error(completeMessage); -} - -async function updateNote(entity, links, sourceId) { - const origNote = await sql.getSingleResult("select * from notes where note_id = ?", [entity.note_id]); - - if (!origNote || origNote.date_modified <= entity.date_modified) { - await sql.doInTransaction(async () => { - await sql.replace("notes", entity); - - await sql.remove("links", entity.note_id); - - for (const link of links) { - delete link['lnk_id']; - - await sql.insert('link', link); - } - - await sql.addNoteSync(entity.note_id, sourceId); - await notes.addNoteAudits(origNote, entity, sourceId); - await eventLog.addNoteEvent(entity.note_id, "Synced note "); - }); - - logSync("Update/sync note " + entity.note_id); - } - else { - await eventLog.addNoteEvent(entity.note_id, "Sync conflict in note , " + utils.formatTwoTimestamps(origNote.date_modified, entity.date_modified)); - } -} - -async function updateNoteTree(entity, sourceId) { - const orig = await sql.getSingleResultOrNull("select * from notes_tree where note_id = ?", [entity.note_id]); - - if (orig === null || orig.date_modified < entity.date_modified) { - await sql.doInTransaction(async () => { - await sql.replace('notes_tree', entity); - - await sql.addNoteTreeSync(entity.note_id, sourceId); - - await sql.addAudit(audit_category.UPDATE_TITLE, sourceId, entity.note_id); - }); - - logSync("Update/sync note tree " + entity.note_id); - } - else { - await eventLog.addNoteEvent(entity.note_id, "Sync conflict in note tree , " + utils.formatTwoTimestamps(orig.date_modified, entity.date_modified)); - } -} - -async function updateNoteHistory(entity, sourceId) { - const orig = await sql.getSingleResultOrNull("select * from notes_history where note_history_id = ?", [entity.note_history_id]); - - if (orig === null || orig.date_modified_to < entity.date_modified_to) { - await sql.doInTransaction(async () => { - await sql.replace('notes_history', entity); - - await sql.addNoteHistorySync(entity.note_history_id, sourceId); - }); - - logSync("Update/sync note history " + entity.note_history_id); - } - else { - await eventLog.addNoteEvent(entity.note_id, "Sync conflict in note history for , " + utils.formatTwoTimestamps(orig.date_modified_to, entity.date_modified_to)); - } -} - -async function updateNoteReordering(entity, sourceId) { - await sql.doInTransaction(async () => { - Object.keys(entity.ordering).forEach(async key => { - await sql.execute("UPDATE notes_tree SET note_pos = ? WHERE note_id = ?", [entity.ordering[key], key]); - }); - - await sql.addNoteReorderingSync(entity.note_pid, sourceId); - await sql.addAudit(audit_category.CHANGE_POSITION, sourceId, entity.note_pid); - }); -} - -async function updateOptions(entity, sourceId) { - if (!options.SYNCED_OPTIONS.includes(entity.opt_name)) { - return; - } - - const orig = await sql.getSingleResultOrNull("select * from options where opt_name = ?", [entity.opt_name]); - - if (orig === null || orig.date_modified < entity.date_modified) { - await sql.doInTransaction(async () => { - await sql.replace('options', entity); - - await sql.addOptionsSync(entity.opt_name, sourceId); - }); - - await eventLog.addEvent("Synced option " + entity.opt_name); - } - else { - await eventLog.addEvent("Sync conflict in options for " + entity.opt_name + ", " + utils.formatTwoTimestamps(orig.date_modified, entity.date_modified)); - } -} - -async function updateRecentNotes(entity, sourceId) { - const orig = await sql.getSingleResultOrNull("select * from recent_notes where note_id = ?", [entity.note_id]); - - if (orig === null || orig.date_accessed < entity.date_accessed) { - await sql.doInTransaction(async () => { - await sql.replace('recent_notes', entity); - - await sql.addRecentNoteSync(entity.note_id, sourceId); - }); + throw new Error("Request to " + method + " " + fullUri + " failed, inner exception: " + e.stack); } } @@ -403,11 +243,5 @@ else { module.exports = { sync, - updateNote, - updateNoteTree, - updateNoteHistory, - updateNoteReordering, - updateOptions, - updateRecentNotes, isSyncSetup }; \ No newline at end of file diff --git a/services/sync_update.js b/services/sync_update.js new file mode 100644 index 000000000..6b7bef571 --- /dev/null +++ b/services/sync_update.js @@ -0,0 +1,123 @@ +const sql = require('./sql'); +const log = require('./log'); +const options = require('./options'); +const utils = require('./utils'); +const audit_category = require('./audit_category'); +const eventLog = require('./event_log'); +const notes = require('./notes'); + +async function updateNote(entity, links, sourceId) { + const origNote = await sql.getSingleResult("select * from notes where note_id = ?", [entity.note_id]); + + if (!origNote || origNote.date_modified <= entity.date_modified) { + await sql.doInTransaction(async () => { + await sql.replace("notes", entity); + + await sql.remove("links", entity.note_id); + + for (const link of links) { + delete link['lnk_id']; + + await sql.insert('link', link); + } + + await sql.addNoteSync(entity.note_id, sourceId); + await notes.addNoteAudits(origNote, entity, sourceId); + await eventLog.addNoteEvent(entity.note_id, "Synced note "); + }); + + log.info("Update/sync note " + entity.note_id); + } + else { + await eventLog.addNoteEvent(entity.note_id, "Sync conflict in note , " + utils.formatTwoTimestamps(origNote.date_modified, entity.date_modified)); + } +} + +async function updateNoteTree(entity, sourceId) { + const orig = await sql.getSingleResultOrNull("select * from notes_tree where note_id = ?", [entity.note_id]); + + if (orig === null || orig.date_modified < entity.date_modified) { + await sql.doInTransaction(async () => { + await sql.replace('notes_tree', entity); + + await sql.addNoteTreeSync(entity.note_id, sourceId); + + await sql.addAudit(audit_category.UPDATE_TITLE, sourceId, entity.note_id); + }); + + log.info("Update/sync note tree " + entity.note_id); + } + else { + await eventLog.addNoteEvent(entity.note_id, "Sync conflict in note tree , " + utils.formatTwoTimestamps(orig.date_modified, entity.date_modified)); + } +} + +async function updateNoteHistory(entity, sourceId) { + const orig = await sql.getSingleResultOrNull("select * from notes_history where note_history_id = ?", [entity.note_history_id]); + + if (orig === null || orig.date_modified_to < entity.date_modified_to) { + await sql.doInTransaction(async () => { + await sql.replace('notes_history', entity); + + await sql.addNoteHistorySync(entity.note_history_id, sourceId); + }); + + log.info("Update/sync note history " + entity.note_history_id); + } + else { + await eventLog.addNoteEvent(entity.note_id, "Sync conflict in note history for , " + utils.formatTwoTimestamps(orig.date_modified_to, entity.date_modified_to)); + } +} + +async function updateNoteReordering(entity, sourceId) { + await sql.doInTransaction(async () => { + Object.keys(entity.ordering).forEach(async key => { + await sql.execute("UPDATE notes_tree SET note_pos = ? WHERE note_id = ?", [entity.ordering[key], key]); + }); + + await sql.addNoteReorderingSync(entity.note_pid, sourceId); + await sql.addAudit(audit_category.CHANGE_POSITION, sourceId, entity.note_pid); + }); +} + +async function updateOptions(entity, sourceId) { + if (!options.SYNCED_OPTIONS.includes(entity.opt_name)) { + return; + } + + const orig = await sql.getSingleResultOrNull("select * from options where opt_name = ?", [entity.opt_name]); + + if (orig === null || orig.date_modified < entity.date_modified) { + await sql.doInTransaction(async () => { + await sql.replace('options', entity); + + await sql.addOptionsSync(entity.opt_name, sourceId); + }); + + await eventLog.addEvent("Synced option " + entity.opt_name); + } + else { + await eventLog.addEvent("Sync conflict in options for " + entity.opt_name + ", " + utils.formatTwoTimestamps(orig.date_modified, entity.date_modified)); + } +} + +async function updateRecentNotes(entity, sourceId) { + const orig = await sql.getSingleResultOrNull("select * from recent_notes where note_id = ?", [entity.note_id]); + + if (orig === null || orig.date_accessed < entity.date_accessed) { + await sql.doInTransaction(async () => { + await sql.replace('recent_notes', entity); + + await sql.addRecentNoteSync(entity.note_id, sourceId); + }); + } +} + +module.exports = { + updateNote, + updateNoteTree, + updateNoteHistory, + updateNoteReordering, + updateOptions, + updateRecentNotes +}; \ No newline at end of file