diff --git a/src/routes/api/sync.js b/src/routes/api/sync.js index 453e7dfa7..d67b4f5a0 100644 --- a/src/routes/api/sync.js +++ b/src/routes/api/sync.js @@ -11,7 +11,7 @@ const log = require('../../services/log'); async function checkSync() { return { 'hashes': await contentHashService.getHashes(), - 'max_sync_id': await sql.getValue('SELECT MAX(id) FROM sync') + 'maxSyncId': await sql.getValue('SELECT MAX(id) FROM sync') }; } @@ -58,51 +58,17 @@ async function forceNoteSync(req) { async function getChanged(req) { const lastSyncId = parseInt(req.query.lastSyncId); - const records = []; - let length = 0; + const syncs = await sql.getRows("SELECT * FROM sync WHERE id > ? LIMIT 1000", [lastSyncId]); - for (const sync of await sql.getRows("SELECT * FROM sync WHERE id > ?", [lastSyncId])) { - const record = { - sync: sync, - entity: await getEntityRow(sync.entityName, sync.entityId) - }; - - records.push(record); - - length += JSON.stringify(record).length; - - if (length > 1000000) { - break; - } - } - - return records; + return await syncService.getSyncRecords(syncs); } -const primaryKeys = { - "notes": "noteId", - "branches": "branchId", - "note_revisions": "noteRevisionId", - "option": "name", - "recent_notes": "branchId", - "images": "imageId", - "note_images": "noteImageId", - "labels": "labelId", - "api_tokens": "apiTokenId" -}; +async function update(req) { + const sourceId = req.body.sourceId; + const entities = req.body.entities; -async function getEntityRow(entityName, entityId) { - if (entityName === 'note_reordering') { - return await getNoteReordering(entityId); - } - else { - const primaryKey = primaryKeys[entityName]; - - if (!primaryKey) { - throw new Error("Unknown entity " + entityName); - } - - return await sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]); + for (const {sync, entity} of entities) { + await syncUpdateService.updateEntity(sync.entityName, entity, sourceId); } } @@ -141,10 +107,6 @@ async function getOption(req) { } } -async function getNoteReordering(parentNoteId) { - return await sql.getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [parentNoteId]) -} - async function getRecentNote(req) { const branchId = req.params.branchId; @@ -231,7 +193,6 @@ module.exports = { getBranch, getImage, getNoteImage, - getNoteReordering, getNoteRevision, getRecentNote, getOption, @@ -246,5 +207,6 @@ module.exports = { updateRecentNote, updateOption, updateLabel, - updateApiToken + updateApiToken, + update }; \ No newline at end of file diff --git a/src/routes/routes.js b/src/routes/routes.js index 156d190a7..83e58bed1 100644 --- a/src/routes/routes.js +++ b/src/routes/routes.js @@ -147,6 +147,7 @@ function register(app) { apiRoute(POST, '/api/sync/force-full-sync', syncApiRoute.forceFullSync); apiRoute(POST, '/api/sync/force-note-sync/:noteId', syncApiRoute.forceNoteSync); apiRoute(GET, '/api/sync/changed', syncApiRoute.getChanged); + apiRoute(PUT, '/api/sync/update', syncApiRoute.update); apiRoute(GET, '/api/sync/notes/:noteId', syncApiRoute.getNote); apiRoute(GET, '/api/sync/branches/:branchId', syncApiRoute.getBranch); apiRoute(GET, '/api/sync/note_revisions/:noteRevisionId', syncApiRoute.getNoteRevision); diff --git a/src/services/sync.js b/src/services/sync.js index b9bf1f4ea..685451a28 100644 --- a/src/services/sync.js +++ b/src/services/sync.js @@ -120,38 +120,8 @@ async function pullSync(syncContext) { if (!entity) { log.error(`Empty response to pull for sync #${sync.id} ${sync.entityName}, id=${sync.entityId}`); } - else if (sync.entityName === 'notes') { - await syncUpdateService.updateNote(entity, syncContext.sourceId); - } - else if (sync.entityName === 'branches') { - await syncUpdateService.updateBranch(entity, syncContext.sourceId); - } - else if (sync.entityName === 'note_revisions') { - await syncUpdateService.updateNoteRevision(entity, syncContext.sourceId); - } - else if (sync.entityName === 'note_reordering') { - await syncUpdateService.updateNoteReordering(entity, syncContext.sourceId); - } - else if (sync.entityName === 'options') { - await syncUpdateService.updateOptions(entity, syncContext.sourceId); - } - else if (sync.entityName === 'recent_notes') { - await syncUpdateService.updateRecentNotes(entity, syncContext.sourceId); - } - else if (sync.entityName === 'images') { - await syncUpdateService.updateImage(entity, syncContext.sourceId); - } - else if (sync.entityName === 'note_images') { - await syncUpdateService.updateNoteImage(entity, syncContext.sourceId); - } - else if (sync.entityName === 'labels') { - await syncUpdateService.updateLabel(entity, syncContext.sourceId); - } - else if (sync.entityName === 'api_tokens') { - await syncUpdateService.updateApiToken(entity, syncContext.sourceId); - } else { - throw new Error(`Unrecognized entity type ${sync.entityName} in sync #${sync.id}`); + await syncUpdateService.updateEntity(sync.entityName, entity, syncContext.sourceId); } await setLastSyncedPull(sync.id); @@ -172,90 +142,47 @@ async function pushSync(syncContext) { let lastSyncedPush = await getLastSyncedPush(); while (true) { - const sync = await sql.getRowOrNull('SELECT * FROM sync WHERE id > ? LIMIT 1', [lastSyncedPush]); + const syncs = await sql.getRows('SELECT * FROM sync WHERE id > ? LIMIT 1000', [lastSyncedPush]); - if (sync === null) { + const filteredSyncs = syncs.filter(sync => { + if (sync.sourceId === syncContext.sourceId) { + log.info(`Skipping push #${sync.id} ${sync.entityName} ${sync.entityId} because it originates from sync target`); + + // this may set lastSyncedPush beyond what's actually sent (because of size limit) + // so this is applied to the database only if there's no actual update + // TODO: it would be better to simplify this somehow + lastSyncedPush = sync.id; + + return false; + } + else { + return true; + } + }); + + if (filteredSyncs.length === 0) { // nothing to sync log.info("Nothing to push"); + await setLastSyncedPush(lastSyncedPush); + break; } - if (sync.sourceId === syncContext.sourceId) { - log.info(`Skipping push #${sync.id} ${sync.entityName} ${sync.entityId} because it originates from sync target`); - } - else { - await pushEntity(sync, syncContext); - } + const syncRecords = await getSyncRecords(filteredSyncs); - lastSyncedPush = sync.id; + await syncRequest(syncContext, 'PUT', '/api/sync/update', { + sourceId: sourceIdService.getCurrentSourceId(), + entities: syncRecords + }); + + lastSyncedPush = syncRecords[syncRecords.length - 1].sync.id; await setLastSyncedPush(lastSyncedPush); } } -async function pushEntity(sync, syncContext) { - let entity; - - if (sync.entityName === 'notes') { - entity = await sql.getRow('SELECT * FROM notes WHERE noteId = ?', [sync.entityId]); - - serializeNoteContentBuffer(entity); - } - else if (sync.entityName === 'branches') { - entity = await sql.getRow('SELECT * FROM branches WHERE branchId = ?', [sync.entityId]); - } - else if (sync.entityName === 'note_revisions') { - entity = await sql.getRow('SELECT * FROM note_revisions WHERE noteRevisionId = ?', [sync.entityId]); - } - else if (sync.entityName === 'note_reordering') { - entity = { - parentNoteId: sync.entityId, - ordering: await sql.getMap('SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0', [sync.entityId]) - }; - } - else if (sync.entityName === 'options') { - entity = await sql.getRow('SELECT * FROM options WHERE name = ?', [sync.entityId]); - } - else if (sync.entityName === 'recent_notes') { - entity = await sql.getRow('SELECT * FROM recent_notes WHERE branchId = ?', [sync.entityId]); - } - else if (sync.entityName === 'images') { - entity = await sql.getRow('SELECT * FROM images WHERE imageId = ?', [sync.entityId]); - - if (entity.data !== null) { - entity.data = entity.data.toString('base64'); - } - } - else if (sync.entityName === 'note_images') { - entity = await sql.getRow('SELECT * FROM note_images WHERE noteImageId = ?', [sync.entityId]); - } - else if (sync.entityName === 'labels') { - entity = await sql.getRow('SELECT * FROM labels WHERE labelId = ?', [sync.entityId]); - } - else if (sync.entityName === 'api_tokens') { - entity = await sql.getRow('SELECT * FROM api_tokens WHERE apiTokenId = ?', [sync.entityId]); - } - else { - throw new Error(`Unrecognized entity type ${sync.entityName} in sync #${sync.id}`); - } - - if (!entity) { - log.info(`Sync #${sync.id} entity for ${sync.entityName} ${sync.entityId} doesn't exist. Skipping.`); - return; - } - - log.info(`Pushing changes in sync #${sync.id} ${sync.entityName} ${sync.entityId}`); - - const payload = { - sourceId: sourceIdService.getCurrentSourceId(), - entity: entity - }; - - await syncRequest(syncContext, 'PUT', '/api/sync/' + sync.entityName, payload); -} - function serializeNoteContentBuffer(note) { if (note.type === 'file') { note.content = note.content.toString("binary"); @@ -265,7 +192,7 @@ function serializeNoteContentBuffer(note) { async function checkContentHash(syncContext) { const resp = await syncRequest(syncContext, 'GET', '/api/sync/check'); - if (await getLastSyncedPull() < resp.max_sync_id) { + if (await getLastSyncedPull() < resp.maxSyncId) { log.info("There are some outstanding pulls, skipping content check."); return; @@ -329,6 +256,68 @@ async function syncRequest(syncContext, method, uri, body) { } } +const primaryKeys = { + "notes": "noteId", + "branches": "branchId", + "note_revisions": "noteRevisionId", + "option": "name", + "recent_notes": "branchId", + "images": "imageId", + "note_images": "noteImageId", + "labels": "labelId", + "api_tokens": "apiTokenId" +}; + +async function getEntityRow(entityName, entityId) { + if (entityName === 'note_reordering') { + return await getNoteReordering(entityId); + } + else { + const primaryKey = primaryKeys[entityName]; + + if (!primaryKey) { + throw new Error("Unknown entity " + entityName); + } + + const entityRow = await sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]); + + if (entityName === 'notes') { + serializeNoteContentBuffer(entityRow); + } + else if (entityName === 'images') { + entityRow.data = entityRow.data.toString('base64'); + } + + return entityRow; + } +} + +async function getSyncRecords(syncs) { + const records = []; + let length = 0; + + for (const sync of syncs) { + const record = { + sync: sync, + entity: await getEntityRow(sync.entityName, sync.entityId) + }; + + records.push(record); + + length += JSON.stringify(record).length; + + if (length > 1000000) { + break; + } + } + + return records; +} + +async function getNoteReordering(parentNoteId) { + return await sql.getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [parentNoteId]) +} + sqlInit.dbReady.then(() => { if (syncSetup.isSyncSetup) { log.info("Setting up sync to " + syncSetup.SYNC_SERVER + " with timeout " + syncSetup.SYNC_TIMEOUT); @@ -355,5 +344,7 @@ sqlInit.dbReady.then(() => { module.exports = { sync, - serializeNoteContentBuffer + serializeNoteContentBuffer, + getEntityRow, + getSyncRecords }; \ No newline at end of file diff --git a/src/services/sync_table.js b/src/services/sync_table.js index cf80522c8..7ba8f5212 100644 --- a/src/services/sync_table.js +++ b/src/services/sync_table.js @@ -91,6 +91,8 @@ async function fillSyncRows(entityName, entityKey) { } async function fillAllSyncRows() { + await sql.execute("DELETE FROM sync"); + await fillSyncRows("notes", "noteId"); await fillSyncRows("branches", "branchId"); await fillSyncRows("note_revisions", "noteRevisionId"); diff --git a/src/services/sync_update.js b/src/services/sync_update.js index d33c9009e..2858561a5 100644 --- a/src/services/sync_update.js +++ b/src/services/sync_update.js @@ -3,6 +3,42 @@ const log = require('./log'); const eventLogService = require('./event_log'); const syncTableService = require('./sync_table'); +async function updateEntity(entityName, entity, sourceId) { + if (entityName === 'notes') { + await updateNote(entity, sourceId); + } + else if (entityName === 'branches') { + await updateBranch(entity, sourceId); + } + else if (entityName === 'note_revisions') { + await updateNoteRevision(entity, sourceId); + } + else if (entityName === 'note_reordering') { + await updateNoteReordering(entity, sourceId); + } + else if (entityName === 'options') { + await updateOptions(entity, sourceId); + } + else if (entityName === 'recent_notes') { + await updateRecentNotes(entity, sourceId); + } + else if (entityName === 'images') { + await updateImage(entity, sourceId); + } + else if (entityName === 'note_images') { + await updateNoteImage(entity, sourceId); + } + else if (entityName === 'labels') { + await updateLabel(entity, sourceId); + } + else if (entityName === 'api_tokens') { + await updateApiToken(entity, sourceId); + } + else { + throw new Error(`Unrecognized entity type ${entityName}`); + } +} + function deserializeNoteContentBuffer(note) { if (note.type === 'file') { note.content = new Buffer(note.content, 'binary'); @@ -159,6 +195,7 @@ async function updateApiToken(entity, sourceId) { } module.exports = { + updateEntity, updateNote, updateBranch, updateNoteRevision,