bulk push sync

azivner 2018-04-07 22:25:28 -04:00
parent 64336ffbee
commit 13f71f8967
5 changed files with 144 additions and 151 deletions


@@ -11,7 +11,7 @@ const log = require('../../services/log');
 async function checkSync() {
     return {
         'hashes': await contentHashService.getHashes(),
-        'max_sync_id': await sql.getValue('SELECT MAX(id) FROM sync')
+        'maxSyncId': await sql.getValue('SELECT MAX(id) FROM sync')
     };
 }
@@ -58,51 +58,17 @@ async function forceNoteSync(req) {
 async function getChanged(req) {
     const lastSyncId = parseInt(req.query.lastSyncId);

-    const records = [];
-    let length = 0;
-
-    for (const sync of await sql.getRows("SELECT * FROM sync WHERE id > ?", [lastSyncId])) {
-        const record = {
-            sync: sync,
-            entity: await getEntityRow(sync.entityName, sync.entityId)
-        };
-
-        records.push(record);
-
-        length += JSON.stringify(record).length;
-
-        if (length > 1000000) {
-            break;
-        }
-    }
-
-    return records;
-}
-
-const primaryKeys = {
-    "notes": "noteId",
-    "branches": "branchId",
-    "note_revisions": "noteRevisionId",
-    "option": "name",
-    "recent_notes": "branchId",
-    "images": "imageId",
-    "note_images": "noteImageId",
-    "labels": "labelId",
-    "api_tokens": "apiTokenId"
-};
-
-async function getEntityRow(entityName, entityId) {
-    if (entityName === 'note_reordering') {
-        return await getNoteReordering(entityId);
-    }
-    else {
-        const primaryKey = primaryKeys[entityName];
-
-        if (!primaryKey) {
-            throw new Error("Unknown entity " + entityName);
-        }
-
-        return await sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]);
-    }
+    const syncs = await sql.getRows("SELECT * FROM sync WHERE id > ? LIMIT 1000", [lastSyncId]);
+
+    return await syncService.getSyncRecords(syncs);
+}
+
+async function update(req) {
+    const sourceId = req.body.sourceId;
+    const entities = req.body.entities;
+
+    for (const {sync, entity} of entities) {
+        await syncUpdateService.updateEntity(sync.entityName, entity, sourceId);
+    }
 }
@@ -141,10 +107,6 @@ async function getOption(req) {
     }
 }

-async function getNoteReordering(parentNoteId) {
-    return await sql.getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [parentNoteId])
-}
-
 async function getRecentNote(req) {
     const branchId = req.params.branchId;
@@ -231,7 +193,6 @@ module.exports = {
     getBranch,
     getImage,
     getNoteImage,
-    getNoteReordering,
     getNoteRevision,
     getRecentNote,
     getOption,
@@ -246,5 +207,6 @@ module.exports = {
     updateRecentNote,
     updateOption,
     updateLabel,
-    updateApiToken
+    updateApiToken,
+    update
 };


@@ -147,6 +147,7 @@ function register(app) {
     apiRoute(POST, '/api/sync/force-full-sync', syncApiRoute.forceFullSync);
     apiRoute(POST, '/api/sync/force-note-sync/:noteId', syncApiRoute.forceNoteSync);
     apiRoute(GET, '/api/sync/changed', syncApiRoute.getChanged);
+    apiRoute(PUT, '/api/sync/update', syncApiRoute.update);
     apiRoute(GET, '/api/sync/notes/:noteId', syncApiRoute.getNote);
     apiRoute(GET, '/api/sync/branches/:branchId', syncApiRoute.getBranch);
     apiRoute(GET, '/api/sync/note_revisions/:noteRevisionId', syncApiRoute.getNoteRevision);
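Not part of the commit, but for orientation: the single PUT /api/sync/update route added above replaces the per-entity PUT pushes that pushSync used to issue one request at a time. A rough sketch of the body shape the new update handler destructures; the sourceId/entities field names come from the handler, while the ids and row contents below are made up:

// Illustrative only: a bulk push body as the new update handler expects it.
// The sync rows mirror the server's sync log; the entity rows are abbreviated.
const exampleBulkPushBody = {
    sourceId: "AAAABBBB",    // pushing instance's source id, so the receiver can skip echoing these rows back
    entities: [
        {
            sync: {id: 123, entityName: "notes", entityId: "note1"},
            entity: {noteId: "note1", title: "example note", content: "...", isDeleted: 0}
        },
        {
            sync: {id: 124, entityName: "branches", entityId: "branch1"},
            entity: {branchId: "branch1", noteId: "note1", parentNoteId: "root", notePosition: 1}
        }
    ]
};

console.log(JSON.stringify(exampleBulkPushBody, null, 2));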


@@ -120,38 +120,8 @@ async function pullSync(syncContext) {
         if (!entity) {
             log.error(`Empty response to pull for sync #${sync.id} ${sync.entityName}, id=${sync.entityId}`);
         }
-        else if (sync.entityName === 'notes') {
-            await syncUpdateService.updateNote(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'branches') {
-            await syncUpdateService.updateBranch(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'note_revisions') {
-            await syncUpdateService.updateNoteRevision(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'note_reordering') {
-            await syncUpdateService.updateNoteReordering(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'options') {
-            await syncUpdateService.updateOptions(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'recent_notes') {
-            await syncUpdateService.updateRecentNotes(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'images') {
-            await syncUpdateService.updateImage(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'note_images') {
-            await syncUpdateService.updateNoteImage(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'labels') {
-            await syncUpdateService.updateLabel(entity, syncContext.sourceId);
-        }
-        else if (sync.entityName === 'api_tokens') {
-            await syncUpdateService.updateApiToken(entity, syncContext.sourceId);
-        }
         else {
-            throw new Error(`Unrecognized entity type ${sync.entityName} in sync #${sync.id}`);
+            await syncUpdateService.updateEntity(sync.entityName, entity, syncContext.sourceId);
         }

         await setLastSyncedPull(sync.id);
@@ -172,90 +142,47 @@ async function pushSync(syncContext) {
     let lastSyncedPush = await getLastSyncedPush();

     while (true) {
-        const sync = await sql.getRowOrNull('SELECT * FROM sync WHERE id > ? LIMIT 1', [lastSyncedPush]);
+        const syncs = await sql.getRows('SELECT * FROM sync WHERE id > ? LIMIT 1000', [lastSyncedPush]);

-        if (sync === null) {
+        const filteredSyncs = syncs.filter(sync => {
+            if (sync.sourceId === syncContext.sourceId) {
+                log.info(`Skipping push #${sync.id} ${sync.entityName} ${sync.entityId} because it originates from sync target`);
+
+                // this may set lastSyncedPush beyond what's actually sent (because of size limit)
+                // so this is applied to the database only if there's no actual update
+                // TODO: it would be better to simplify this somehow
+                lastSyncedPush = sync.id;
+
+                return false;
+            }
+            else {
+                return true;
+            }
+        });
+
+        if (filteredSyncs.length === 0) {
             // nothing to sync

             log.info("Nothing to push");

+            await setLastSyncedPush(lastSyncedPush);
+
             break;
         }

-        if (sync.sourceId === syncContext.sourceId) {
-            log.info(`Skipping push #${sync.id} ${sync.entityName} ${sync.entityId} because it originates from sync target`);
-        }
-        else {
-            await pushEntity(sync, syncContext);
-        }
+        const syncRecords = await getSyncRecords(filteredSyncs);

-        lastSyncedPush = sync.id;
+        await syncRequest(syncContext, 'PUT', '/api/sync/update', {
+            sourceId: sourceIdService.getCurrentSourceId(),
+            entities: syncRecords
+        });
+
+        lastSyncedPush = syncRecords[syncRecords.length - 1].sync.id;

         await setLastSyncedPush(lastSyncedPush);
     }
 }

-async function pushEntity(sync, syncContext) {
-    let entity;
-
-    if (sync.entityName === 'notes') {
-        entity = await sql.getRow('SELECT * FROM notes WHERE noteId = ?', [sync.entityId]);
-
-        serializeNoteContentBuffer(entity);
-    }
-    else if (sync.entityName === 'branches') {
-        entity = await sql.getRow('SELECT * FROM branches WHERE branchId = ?', [sync.entityId]);
-    }
-    else if (sync.entityName === 'note_revisions') {
-        entity = await sql.getRow('SELECT * FROM note_revisions WHERE noteRevisionId = ?', [sync.entityId]);
-    }
-    else if (sync.entityName === 'note_reordering') {
-        entity = {
-            parentNoteId: sync.entityId,
-            ordering: await sql.getMap('SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0', [sync.entityId])
-        };
-    }
-    else if (sync.entityName === 'options') {
-        entity = await sql.getRow('SELECT * FROM options WHERE name = ?', [sync.entityId]);
-    }
-    else if (sync.entityName === 'recent_notes') {
-        entity = await sql.getRow('SELECT * FROM recent_notes WHERE branchId = ?', [sync.entityId]);
-    }
-    else if (sync.entityName === 'images') {
-        entity = await sql.getRow('SELECT * FROM images WHERE imageId = ?', [sync.entityId]);
-
-        if (entity.data !== null) {
-            entity.data = entity.data.toString('base64');
-        }
-    }
-    else if (sync.entityName === 'note_images') {
-        entity = await sql.getRow('SELECT * FROM note_images WHERE noteImageId = ?', [sync.entityId]);
-    }
-    else if (sync.entityName === 'labels') {
-        entity = await sql.getRow('SELECT * FROM labels WHERE labelId = ?', [sync.entityId]);
-    }
-    else if (sync.entityName === 'api_tokens') {
-        entity = await sql.getRow('SELECT * FROM api_tokens WHERE apiTokenId = ?', [sync.entityId]);
-    }
-    else {
-        throw new Error(`Unrecognized entity type ${sync.entityName} in sync #${sync.id}`);
-    }
-
-    if (!entity) {
-        log.info(`Sync #${sync.id} entity for ${sync.entityName} ${sync.entityId} doesn't exist. Skipping.`);
-        return;
-    }
-
-    log.info(`Pushing changes in sync #${sync.id} ${sync.entityName} ${sync.entityId}`);
-
-    const payload = {
-        sourceId: sourceIdService.getCurrentSourceId(),
-        entity: entity
-    };
-
-    await syncRequest(syncContext, 'PUT', '/api/sync/' + sync.entityName, payload);
-}
-
 function serializeNoteContentBuffer(note) {
     if (note.type === 'file') {
         note.content = note.content.toString("binary");
@@ -265,7 +192,7 @@ function serializeNoteContentBuffer(note) {
 async function checkContentHash(syncContext) {
     const resp = await syncRequest(syncContext, 'GET', '/api/sync/check');

-    if (await getLastSyncedPull() < resp.max_sync_id) {
+    if (await getLastSyncedPull() < resp.maxSyncId) {
         log.info("There are some outstanding pulls, skipping content check.");

         return;
@@ -329,6 +256,68 @@ async function syncRequest(syncContext, method, uri, body) {
     }
 }

+const primaryKeys = {
+    "notes": "noteId",
+    "branches": "branchId",
+    "note_revisions": "noteRevisionId",
+    "option": "name",
+    "recent_notes": "branchId",
+    "images": "imageId",
+    "note_images": "noteImageId",
+    "labels": "labelId",
+    "api_tokens": "apiTokenId"
+};
+
+async function getEntityRow(entityName, entityId) {
+    if (entityName === 'note_reordering') {
+        return await getNoteReordering(entityId);
+    }
+    else {
+        const primaryKey = primaryKeys[entityName];
+
+        if (!primaryKey) {
+            throw new Error("Unknown entity " + entityName);
+        }
+
+        const entityRow = await sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]);
+
+        if (entityName === 'notes') {
+            serializeNoteContentBuffer(entityRow);
+        }
+        else if (entityName === 'images') {
+            entityRow.data = entityRow.data.toString('base64');
+        }
+
+        return entityRow;
+    }
+}
+
+async function getSyncRecords(syncs) {
+    const records = [];
+    let length = 0;
+
+    for (const sync of syncs) {
+        const record = {
+            sync: sync,
+            entity: await getEntityRow(sync.entityName, sync.entityId)
+        };
+
+        records.push(record);
+
+        length += JSON.stringify(record).length;
+
+        if (length > 1000000) {
+            break;
+        }
+    }
+
+    return records;
+}
+
+async function getNoteReordering(parentNoteId) {
+    return await sql.getMap("SELECT branchId, notePosition FROM branches WHERE parentNoteId = ? AND isDeleted = 0", [parentNoteId])
+}
+
 sqlInit.dbReady.then(() => {
     if (syncSetup.isSyncSetup) {
         log.info("Setting up sync to " + syncSetup.SYNC_SERVER + " with timeout " + syncSetup.SYNC_TIMEOUT);
@@ -355,5 +344,7 @@ sqlInit.dbReady.then(() => {

 module.exports = {
     sync,
-    serializeNoteContentBuffer
+    serializeNoteContentBuffer,
+    getEntityRow,
+    getSyncRecords
 };
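An aside on the batching above (an illustration, not code from the commit): getSyncRecords stops appending once the serialized batch grows past roughly 1 MB, and pushSync then advances lastSyncedPush only to the sync id of the last record actually included, so anything cut off by the cap is picked up on the next loop iteration. The size-capping loop, extracted as a standalone sketch:

// Standalone sketch of the ~1 MB cap used by getSyncRecords above.
// Records are appended until the serialized size crosses the limit;
// the record that crosses the cap is still included, then the loop stops.
function takeUpToPayloadLimit(records, limitBytes = 1000000) {
    const batch = [];
    let length = 0;

    for (const record of records) {
        batch.push(record);

        length += JSON.stringify(record).length;

        if (length > limitBytes) {
            break;
        }
    }

    return batch;
}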


@@ -91,6 +91,8 @@ async function fillSyncRows(entityName, entityKey) {
 }

 async function fillAllSyncRows() {
+    await sql.execute("DELETE FROM sync");
+
     await fillSyncRows("notes", "noteId");
     await fillSyncRows("branches", "branchId");
     await fillSyncRows("note_revisions", "noteRevisionId");


@@ -3,6 +3,42 @@ const log = require('./log');
 const eventLogService = require('./event_log');
 const syncTableService = require('./sync_table');

+async function updateEntity(entityName, entity, sourceId) {
+    if (entityName === 'notes') {
+        await updateNote(entity, sourceId);
+    }
+    else if (entityName === 'branches') {
+        await updateBranch(entity, sourceId);
+    }
+    else if (entityName === 'note_revisions') {
+        await updateNoteRevision(entity, sourceId);
+    }
+    else if (entityName === 'note_reordering') {
+        await updateNoteReordering(entity, sourceId);
+    }
+    else if (entityName === 'options') {
+        await updateOptions(entity, sourceId);
+    }
+    else if (entityName === 'recent_notes') {
+        await updateRecentNotes(entity, sourceId);
+    }
+    else if (entityName === 'images') {
+        await updateImage(entity, sourceId);
+    }
+    else if (entityName === 'note_images') {
+        await updateNoteImage(entity, sourceId);
+    }
+    else if (entityName === 'labels') {
+        await updateLabel(entity, sourceId);
+    }
+    else if (entityName === 'api_tokens') {
+        await updateApiToken(entity, sourceId);
+    }
+    else {
+        throw new Error(`Unrecognized entity type ${entityName}`);
+    }
+}
+
 function deserializeNoteContentBuffer(note) {
     if (note.type === 'file') {
         note.content = new Buffer(note.content, 'binary');
@@ -159,6 +195,7 @@ async function updateApiToken(entity, sourceId) {
 }

 module.exports = {
+    updateEntity,
     updateNote,
     updateBranch,
     updateNoteRevision,
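For illustration only (nothing below is in the diff): any caller, whether the new update route handler or pullSync, now goes through the single updateEntity dispatcher, e.g.:

// Hypothetical usage of the dispatcher above; the label row is a made-up, abbreviated example.
async function demo(syncUpdateService, sourceId) {
    // routed to updateLabel() based on the entity name
    await syncUpdateService.updateEntity("labels", {
        labelId: "lbl1",
        noteId: "note1",
        name: "todo",
        value: "",
        isDeleted: 0
    }, sourceId);

    // unknown entity names fail fast instead of being silently dropped
    await syncUpdateService.updateEntity("bogus_table", {}, sourceId)
        .catch(e => console.log(e.message));    // "Unrecognized entity type bogus_table"
}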