From 15abee3ed0fd54a4d53a19b81810908c4cf5253c Mon Sep 17 00:00:00 2001 From: azivner Date: Tue, 31 Oct 2017 19:34:58 -0400 Subject: [PATCH] incremental pull implementation --- routes/api/sync.js | 34 +++++--- services/sync.js | 198 +++++++++++++++++---------------------------- 2 files changed, 95 insertions(+), 137 deletions(-) diff --git a/routes/api/sync.js b/routes/api/sync.js index 672a57393..4ab84ebf8 100644 --- a/routes/api/sync.js +++ b/routes/api/sync.js @@ -4,6 +4,7 @@ const express = require('express'); const router = express.Router(); const auth = require('../../services/auth'); const sync = require('../../services/sync'); +const sql = require('../../services/sql'); router.post('/now', auth.checkApiAuth, async (req, res, next) => { const log = await sync.sync(); @@ -14,41 +15,50 @@ router.post('/now', auth.checkApiAuth, async (req, res, next) => { }); }); -router.get('/changed/:since', auth.checkApiAuth, async (req, res, next) => { - const since = parseInt(req.params.since); +router.get('/changed', auth.checkApiAuth, async (req, res, next) => { + const lastSyncId = parseInt(req.query.lastSyncId); + const sourceId = parseInt(req.query.sourceId); - const result = await sync.getChangedSince(since); + const result = await sync.getChanged(lastSyncId, sourceId); res.send(result); }); -router.put('/changed', auth.checkApiAuth, async (req, res, next) => { - await sync.putChanged(req.body); +router.get('/notes/:noteId', auth.checkApiAuth, async (req, res, next) => { + const noteId = req.params.noteId; - res.send({}); + res.send({ + entity: await sql.getSingleResult("SELECT * FROM notes WHERE note_id = ?", [noteId]), + links: await sql.getResults("SELECT * FROM links WHERE note_id = ?", [noteId]) + }); }); -router.get('/note/:noteId/:since', auth.checkApiAuth, async (req, res, next) => { +router.get('/notes_tree/:noteId', auth.checkApiAuth, async (req, res, next) => { const noteId = req.params.noteId; - const since = parseInt(req.params.since); - res.send(await sync.getNoteSince(noteId, since)); + res.send(await sql.getSingleResult("SELECT * FROM notes_tree WHERE note_id = ?", [noteId])); +}); + +router.get('/notes_history/:noteHistoryId', auth.checkApiAuth, async (req, res, next) => { + const noteHistoryId = req.params.noteHistoryId; + + res.send(await sql.getSingleResult("SELECT * FROM notes_history WHERE note_history_id = ?", [noteHistoryId])); }); router.put('/notes', auth.checkApiAuth, async (req, res, next) => { - await sync.updateNote(req.body); + await sync.updateNote(req.body.entity, req.body.links, req.body.source_id); res.send({}); }); router.put('/notes_tree', auth.checkApiAuth, async (req, res, next) => { - await sync.updateNoteTree(req.body); + await sync.updateNoteTree(req.body.entity, req.body.source_id); res.send({}); }); router.put('/notes_history', auth.checkApiAuth, async (req, res, next) => { - await sync.updateNoteHistory(req.body); + await sync.updateNoteHistory(req.body.entity, req.body.source_id); res.send({}); }); diff --git a/services/sync.js b/services/sync.js index a4b6ae411..d04901b5f 100644 --- a/services/sync.js +++ b/services/sync.js @@ -6,8 +6,6 @@ const sql = require('./sql'); const migration = require('./migration'); const utils = require('./utils'); const config = require('./config'); -const audit_category = require('./audit_category'); -const crypto = require('crypto'); const SOURCE_ID = require('./source_id'); const SYNC_SERVER = config['Sync']['syncServerHost']; @@ -18,56 +16,56 @@ let syncInProgress = false; async function pullSync(cookieJar, syncLog) { const lastSyncedPull = parseInt(await sql.getOption('last_synced_pull')); - let resp; + let syncRows; try { - resp = await rp({ - uri: SYNC_SERVER + '/api/sync/changed/' + lastSyncedPull, - headers: { - auth: 'sync' - }, + syncRows = await rp({ + uri: SYNC_SERVER + '/api/sync/changed?lastSyncId=' + lastSyncedPull + "&sourceId=" + SOURCE_ID, jar: cookieJar, json: true }); + + logSync("Pulled " + syncRows.length + " changes"); } catch (e) { - throw new Error("Can't pull changed, inner exception: " + e.stack); + throw new Error("Can't pull changes, inner exception: " + e.stack); } - try { + for (const sync of syncRows) { + let resp; + + try { + resp = await rp({ + uri: SYNC_SERVER + "/api/sync/" + sync.entity_name + "/" + sync.entity_id, + json: true, + jar: cookieJar + }); + } + catch (e) { + throw new Error("Can't pull " + sync.entity_name + " " + sync.entity_id + ", inner exception: " + e.stack); + } + await sql.doInTransaction(async () => { - await putChanged(resp, syncLog); + if (sync.entity_name === 'notes') { + await updateNote(resp.entity, resp.links, sync.source_id, syncLog) + } + else if (sync.entity_name === 'notes_tree') { + await updateNoteTree(resp.entity, sync.source_id, syncLog) + } + else if (sync.entity_name === 'notes_history') { + await updateNoteHistory(resp.entity, sync.source_id, syncLog) + } + else { + logSync("Unrecognized entity type " + sync.entity_name, syncLog); - for (const noteId of resp.notes) { - let note; - - try { - note = await rp({ - uri: SYNC_SERVER + "/api/sync/note/" + noteId + "/" + lastSyncedPull, - headers: { - auth: 'sync' - }, - json: true, - jar: cookieJar - }); - } - catch (e) { - throw new Error("Can't pull note " + noteId + ", inner exception: " + e.stack); - } - - await putNote(note, syncLog); + throw new Error("Unrecognized entity type " + sync.entity_name); } - if (resp.notes.length > 0) { - await sql.addAudit(audit_category.SYNC); - } - - await sql.setOption('last_synced_pull', resp.syncTimestamp); + await sql.setOption('last_synced_pull', sync.id); }); } - catch (e) { - throw e; - } + + logSync("Finished pull"); } async function syncEntity(entity, entityName, cookieJar, syncLog) { @@ -91,52 +89,47 @@ async function syncEntity(entity, entityName, cookieJar, syncLog) { }); } catch (e) { - throw new Error("Failed sending update for entity " + entityName + ", inner exception: " + e.stack); - } -} + logSync("Failed sending update for entity " + entityName + ", inner exception: " + e.stack, syncLog); -async function syncEntities(entities, entityName, cookieJar, syncLog) { - for (const entity of entities) { - await syncEntity(entity, entityName, cookieJar, syncLog); + throw new Error("Failed sending update for entity " + entityName + ", inner exception: " + e.stack); } } async function pushSync(cookieJar, syncLog) { let lastSyncedPush = parseInt(await sql.getOption('last_synced_push')); - const syncStarted = utils.nowTimestamp(); while (true) { - const oldestUnsyncedDateModified = await sql.getSingleValue(` - SELECT MIN(date_modified) FROM ( - SELECT MIN(date_modified) AS date_modified FROM notes_tree WHERE date_modified > ? - UNION - SELECT MIN(date_modified) AS date_modified FROM notes WHERE date_modified > ? - UNION - SELECT MIN(date_modified_to) AS date_modified FROM notes_history WHERE date_modified_to > ? - )`, [lastSyncedPush, lastSyncedPush, lastSyncedPush]); + const sync = await sql.getSingleResultOrNull('SELECT * FROM sync WHERE id > ? LIMIT 1', [lastSyncedPush]); + + if (sync === null) { + // nothing to sync + + logSync("Nothing to push", syncLog); - if (oldestUnsyncedDateModified === null) { break; } await sql.doInTransaction(async () => { - const notesTree = await sql.getResults('SELECT * FROM notes_tree WHERE date_modified = ?', [oldestUnsyncedDateModified]); - await syncEntities(notesTree, 'notes_tree', cookieJar, syncLog); + let entity; - const notes = await sql.getResults('SELECT * FROM notes WHERE date_modified = ?', [oldestUnsyncedDateModified]); - await syncEntities(notes, 'notes', cookieJar, syncLog); - - const notesHistory = await sql.getResults('SELECT * FROM notes_history WHERE date_modified_to = ?', [oldestUnsyncedDateModified]); - await syncEntities(notesHistory, 'notes_history', cookieJar, syncLog); - - lastSyncedPush = oldestUnsyncedDateModified; - - // if the sync started in the same second as the last changes then it's possible we synced only parts - // of this second's changes. In that case we'll leave the last_synced_push as it is and stop the sync - // so next time we'll re-do this second again, this guaranteeing all changes have been pushed - if (lastSyncedPush === syncStarted) { - return; + if (sync.entity_name === 'notes') { + entity = await sql.getSingleResult('SELECT * FROM notes WHERE note_id = ?', [sync.entity_id]); } + else if (sync.entity_name === 'notes_tree') { + entity = await sql.getSingleResult('SELECT * FROM notes_tree WHERE note_id = ?', [sync.entity_id]); + } + else if (sync.entity_name === 'notes_history') { + entity = await sql.getSingleResult('SELECT * FROM notes_history WHERE note_history_id = ?', [sync.entity_id]); + } + else { + logSync("Unrecognized entity type " + sync.entity_name, syncLog); + + throw new Error("Unrecognized entity type " + sync.entity_name); + } + + await syncEntity(entity, sync.entity_name, cookieJar, syncLog); + + lastSyncedPush = sync.id; await sql.setOption('last_synced_push', lastSyncedPush); }); @@ -216,48 +209,11 @@ function logSync(message, syncLog) { console.log(message); } -async function getChangedSince(since) { - return { - 'syncTimestamp': utils.nowTimestamp(), - 'tree': await sql.getResults("select * from notes_tree where date_modified >= ?", [since]), - 'notes': await sql.getFlattenedResults('note_id', "select note_id from notes where date_modified >= ?", [since]), - 'audit_log': await sql.getResults("select * from audit_log where category != 'SYNC' and date_modified >= ?", [since]) - }; +async function getChanged(lastSyncId, sourceId) { + return await sql.getResults("SELECT * FROM sync WHERE id > ? AND source_id != ?", [lastSyncId, sourceId]); } -async function getNoteSince(noteId, since) { - return { - 'detail': await sql.getSingleResult("select * from notes where note_id = ?", [noteId]), - 'images': await sql.getResults("select * from images where note_id = ? order by note_offset", [noteId]), - 'history': await sql.getResults("select * from notes_history where note_id = ? and date_modified_to >= ?", [noteId, since]) - }; -} - -async function putChanged(changed, syncLog) { - for (const treeItem of changed.tree) { - delete treeItem['id']; - - await sql.insert("notes_tree", treeItem, true); - - logSync("Update/sync notes_tree " + treeItem.note_id, syncLog); - } - - for (const audit of changed.audit_log) { - await sql.insert("audit_log", audit, true); - - logSync("Update/sync audit_log for category=" + audit.category + ", noteId=" + audit.note_id, syncLog); - } - - if (changed.tree.length > 0 || changed.audit_log.length > 0) { - logSync("Added final audit", syncLog); - - await sql.addAudit(audit_category.SYNC); - } -} - -async function updateNote(body, syncLog) { - const entity = body.entity; - +async function updateNote(entity, links, sourceId, syncLog) { const origNote = await sql.getSingleResult("select * from notes where note_id = ?", [entity.note_id]); if (origNote === null || origNote.date_modified <= entity.date_modified) { @@ -266,13 +222,13 @@ async function updateNote(body, syncLog) { await sql.remove("links", entity.note_id); - for (const link of body.links) { + for (const link of links) { delete link['lnk_id']; await sql.insert('link', link); } - await sql.addNoteSync(entity.note_id, body.source_id); + await sql.addNoteSync(entity.note_id, sourceId); }); logSync("Update/sync note " + entity.note_id, syncLog); @@ -282,16 +238,14 @@ async function updateNote(body, syncLog) { } } -async function updateNoteTree(body, syncLog) { - const entity = body.entity; - +async function updateNoteTree(entity, sourceId, syncLog) { const orig = await sql.getSingleResultOrNull("select * from notes_tree where note_id = ?", [entity.note_id]); if (orig === null || orig.date_modified < entity.date_modified) { await sql.doInTransaction(async () => { await sql.replace('notes_tree', entity); - await sql.addNoteTreeSync(entity.note_id, body.source_id); + await sql.addNoteTreeSync(entity.note_id, sourceId); }); logSync("Update/sync note tree " + entity.note_id, syncLog); @@ -301,20 +255,16 @@ async function updateNoteTree(body, syncLog) { } } -async function updateNoteHistory(body, syncLog) { - const entity = body.entity; - - const orig = await sql.getSingleResultOrNull("select * from notes_history where note_history_id", [entity.note_history_id]); +async function updateNoteHistory(entity, sourceId, syncLog) { + const orig = await sql.getSingleResultOrNull("select * from notes_history where note_history_id = ?", [entity.note_history_id]); if (orig === null || orig.date_modified_to < entity.date_modified_to) { await sql.doInTransaction(async () => { - await sql.execute("delete from notes_history where note_history_id", [entity.note_history_id]); - delete entity['id']; - await sql.insert('notes_history', entity); + await sql.replace('notes_history', entity); - await sql.addNoteHistorySync(entity.note_history_id, body.source_id); + await sql.addNoteHistorySync(entity.note_history_id, sourceId); }); logSync("Update/sync note history " + entity.note_id, syncLog); @@ -338,9 +288,7 @@ else { module.exports = { sync, - getChangedSince, - getNoteSince, - putChanged, + getChanged, updateNote, updateNoteTree, updateNoteHistory