mirror of
				https://github.com/zadam/trilium.git
				synced 2025-11-04 13:39:01 +01:00 
			
		
		
		
	incremental push sync
This commit is contained in:
		
							parent
							
								
									de3d1b3e39
								
							
						
					
					
						commit
						8630b3685d
					
				@ -35,8 +35,20 @@ router.get('/note/:noteId/:since', auth.checkApiAuth, async (req, res, next) =>
 | 
			
		||||
    res.send(await sync.getNoteSince(noteId, since));
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
router.put('/note', auth.checkApiAuth, async (req, res, next) => {
 | 
			
		||||
    await sync.putNote(req.body);
 | 
			
		||||
router.put('/notes', auth.checkApiAuth, async (req, res, next) => {
 | 
			
		||||
    await sync.updateNote(req.body);
 | 
			
		||||
 | 
			
		||||
    res.send({});
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
router.put('/notes_tree', auth.checkApiAuth, async (req, res, next) => {
 | 
			
		||||
    await sync.updateNoteTree(req.body);
 | 
			
		||||
 | 
			
		||||
    res.send({});
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
router.put('/notes_history', auth.checkApiAuth, async (req, res, next) => {
 | 
			
		||||
    await sync.updateNoteHistory(req.body);
 | 
			
		||||
 | 
			
		||||
    res.send({});
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,10 @@ async function insert(table_name, rec, replace = false) {
 | 
			
		||||
    return res.lastID;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function replace(table_name, rec) {
 | 
			
		||||
    return await insert(table_name, rec, true);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function beginTransaction() {
 | 
			
		||||
    return await db.run("BEGIN");
 | 
			
		||||
}
 | 
			
		||||
@ -56,7 +60,17 @@ async function getSingleResult(query, params = []) {
 | 
			
		||||
async function getSingleResultOrNull(query, params = []) {
 | 
			
		||||
    const all = await db.all(query, ...params);
 | 
			
		||||
 | 
			
		||||
    return all ? all[0] : null;
 | 
			
		||||
    return all.length > 0 ? all[0] : null;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function getSingleValue(query, params = []) {
 | 
			
		||||
    const row = await getSingleResultOrNull(query, params);
 | 
			
		||||
 | 
			
		||||
    if (!row) {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return row[Object.keys(row)[0]];
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function getResults(query, params = []) {
 | 
			
		||||
@ -126,7 +140,10 @@ async function doInTransaction(func) {
 | 
			
		||||
 | 
			
		||||
module.exports = {
 | 
			
		||||
    insert,
 | 
			
		||||
    replace,
 | 
			
		||||
    getSingleValue,
 | 
			
		||||
    getSingleResult,
 | 
			
		||||
    getSingleResultOrNull,
 | 
			
		||||
    getResults,
 | 
			
		||||
    getFlattenedResults,
 | 
			
		||||
    execute,
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										160
									
								
								services/sync.js
									
									
									
									
									
								
							
							
						
						
									
										160
									
								
								services/sync.js
									
									
									
									
									
								
							@ -69,57 +69,69 @@ async function pullSync(cookieJar, syncLog) {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function pushSync(cookieJar, syncLog) {
 | 
			
		||||
    const lastSyncedPush = parseInt(await sql.getOption('last_synced_push'));
 | 
			
		||||
    const syncStarted = utils.nowTimestamp();
 | 
			
		||||
 | 
			
		||||
    const changed = await getChangedSince(lastSyncedPush);
 | 
			
		||||
 | 
			
		||||
    if (changed.tree.length > 0 || changed.audit_log.length > 0) {
 | 
			
		||||
        logSync("Sending " + changed.tree.length + " tree changes and " + changed.audit_log.length + " audit changes", syncLog);
 | 
			
		||||
 | 
			
		||||
async function syncEntity(entity, entityName, cookieJar, syncLog) {
 | 
			
		||||
    try {
 | 
			
		||||
        const payload = {
 | 
			
		||||
            entity: entity
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        if (entityName === 'notes') {
 | 
			
		||||
            payload.links = await sql.getResults('select * from links where note_id = ?', [entity.note_id]);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        await rp({
 | 
			
		||||
            method: 'PUT',
 | 
			
		||||
                uri: SYNC_SERVER + '/api/sync/changed',
 | 
			
		||||
                headers: {
 | 
			
		||||
                    auth: 'sync'
 | 
			
		||||
                },
 | 
			
		||||
                body: changed,
 | 
			
		||||
                json: true,
 | 
			
		||||
                timeout: 300 * 1000, // this can take long time
 | 
			
		||||
                jar: cookieJar
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
        catch (e) {
 | 
			
		||||
            throw new Error("Can't send tree changes and audit, inner exception: " + e.stack);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for (const noteId of changed.notes) {
 | 
			
		||||
        logSync("Sending note " + noteId, syncLog);
 | 
			
		||||
 | 
			
		||||
        const note = await getNoteSince(noteId);
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            await rp({
 | 
			
		||||
                method: 'PUT',
 | 
			
		||||
                uri: SYNC_SERVER + '/api/sync/note',
 | 
			
		||||
                headers: {
 | 
			
		||||
                    auth: 'sync'
 | 
			
		||||
                },
 | 
			
		||||
                body: note,
 | 
			
		||||
            uri: SYNC_SERVER + '/api/sync/' + entityName,
 | 
			
		||||
            body: payload,
 | 
			
		||||
            json: true,
 | 
			
		||||
            timeout: 60 * 1000,
 | 
			
		||||
            jar: cookieJar
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
    catch (e) {
 | 
			
		||||
            throw new Error("Can't send note update, inner exception: " + e.stack);
 | 
			
		||||
        throw new Error("Failed sending update for entity " + entityName + ", inner exception: " + e.stack);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function syncEntities(entities, entityName, cookieJar, syncLog) {
 | 
			
		||||
    for (const entity of entities) {
 | 
			
		||||
        await syncEntity(entity, entityName, cookieJar, syncLog);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function pushSync(cookieJar, syncLog) {
 | 
			
		||||
    let lastSyncedPush = parseInt(await sql.getOption('last_synced_push'));
 | 
			
		||||
    const syncStarted = utils.nowTimestamp();
 | 
			
		||||
 | 
			
		||||
    while (true) {
 | 
			
		||||
        const oldestUnsyncedDateModified = await sql.getSingleValue(`
 | 
			
		||||
    SELECT MIN(date_modified) FROM (
 | 
			
		||||
        SELECT MIN(date_modified) AS date_modified FROM notes_tree WHERE date_modified > ?
 | 
			
		||||
        UNION
 | 
			
		||||
        SELECT MIN(date_modified) AS date_modified FROM notes WHERE date_modified > ?
 | 
			
		||||
        UNION
 | 
			
		||||
        SELECT MIN(date_modified_to) AS date_modified FROM notes_history WHERE date_modified_to > ?
 | 
			
		||||
    )`, [lastSyncedPush, lastSyncedPush, lastSyncedPush]);
 | 
			
		||||
 | 
			
		||||
        if (oldestUnsyncedDateModified === null) {
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    await sql.setOption('last_synced_push', syncStarted);
 | 
			
		||||
        await sql.doInTransaction(async () => {
 | 
			
		||||
            const notesTree = await sql.getResults('SELECT * FROM notes_tree WHERE date_modified = ?', [oldestUnsyncedDateModified]);
 | 
			
		||||
            await syncEntities(notesTree, 'notes_tree', cookieJar, syncLog);
 | 
			
		||||
 | 
			
		||||
            const notes = await sql.getResults('SELECT * FROM notes WHERE date_modified = ?', [oldestUnsyncedDateModified]);
 | 
			
		||||
            await syncEntities(notes, 'notes', cookieJar, syncLog);
 | 
			
		||||
 | 
			
		||||
            const notesHistory = await sql.getResults('SELECT * FROM notes_history WHERE date_modified_to = ?', [oldestUnsyncedDateModified]);
 | 
			
		||||
            await syncEntities(notesHistory, 'notes_history', cookieJar, syncLog);
 | 
			
		||||
 | 
			
		||||
            lastSyncedPush = oldestUnsyncedDateModified;
 | 
			
		||||
 | 
			
		||||
            await sql.setOption('last_synced_push', lastSyncedPush);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function login() {
 | 
			
		||||
@ -173,7 +185,7 @@ async function sync() {
 | 
			
		||||
 | 
			
		||||
        await pushSync(cookieJar, syncLog);
 | 
			
		||||
 | 
			
		||||
        await pullSync(cookieJar, syncLog);
 | 
			
		||||
        //await pullSync(cookieJar, syncLog);
 | 
			
		||||
    }
 | 
			
		||||
    catch (e) {
 | 
			
		||||
        logSync("sync failed: " + e.stack, syncLog);
 | 
			
		||||
@ -234,34 +246,62 @@ async function putChanged(changed, syncLog) {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function putNote(note, syncLog) {
 | 
			
		||||
    const origNote = await sql.getSingleResult("select * from notes where note_id = ?", [note.detail.note_id]);
 | 
			
		||||
async function updateNote(body, syncLog) {
 | 
			
		||||
    const entity = body.entity;
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
        if (origNote !== null && origNote.date_modified >= note.detail.date_modified) {
 | 
			
		||||
            // version we have in DB is actually newer than the one we're getting from sync
 | 
			
		||||
            // so we'll leave the current state as it is. The synced version should be stored in the history
 | 
			
		||||
    const origNote = await sql.getSingleResult("select * from notes where note_id = ?", [entity.note_id]);
 | 
			
		||||
 | 
			
		||||
    if (origNote === null || origNote.date_modified <= entity.date_modified) {
 | 
			
		||||
        await sql.doInTransaction(async () => {
 | 
			
		||||
            await sql.replace("notes", entity);
 | 
			
		||||
 | 
			
		||||
            await sql.remove("links", entity.note_id);
 | 
			
		||||
 | 
			
		||||
            for (const link of body.links) {
 | 
			
		||||
                delete link['lnk_id'];
 | 
			
		||||
 | 
			
		||||
                await sql.insert('link', link);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        logSync("Update/sync note " + entity.note_id, syncLog);
 | 
			
		||||
    }
 | 
			
		||||
    else {
 | 
			
		||||
            await sql.insert("notes", note.detail, true);
 | 
			
		||||
        logSync("Sync conflict in note " + entity.note_id, syncLog);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
        await sql.remove("images", note.detail.note_id);
 | 
			
		||||
async function updateNoteTree(body, syncLog) {
 | 
			
		||||
    const entity = body.entity;
 | 
			
		||||
 | 
			
		||||
        for (const image of note.images) {
 | 
			
		||||
            await sql.insert("images", image);
 | 
			
		||||
    const orig = await sql.getSingleResultOrNull("select * from notes_tree where note_id = ?", [entity.note_id]);
 | 
			
		||||
 | 
			
		||||
    if (orig === null || orig.date_modified < entity.date_modified) {
 | 
			
		||||
        await sql.replace('notes_tree', entity);
 | 
			
		||||
 | 
			
		||||
        logSync("Update/sync note tree " + entity.note_id, syncLog);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        for (const history of note.history) {
 | 
			
		||||
            delete history['id'];
 | 
			
		||||
 | 
			
		||||
            await sql.insert("notes_history", history, true);
 | 
			
		||||
    else {
 | 
			
		||||
        logSync("Sync conflict in note tree " + entity.note_id, syncLog);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
        logSync("Update/sync note " + note.detail.note_id, syncLog);
 | 
			
		||||
async function updateNoteHistory(body, syncLog) {
 | 
			
		||||
    const entity = body.entity;
 | 
			
		||||
 | 
			
		||||
    const orig = await sql.getSingleResultOrNull("select * from notes_history where note_id = ? and date_modified_from = ?", [entity.note_id, entity.date_modified_from]);
 | 
			
		||||
 | 
			
		||||
    if (orig === null || orig.date_modified_to < entity.date_modified_to) {
 | 
			
		||||
        await sql.execute("delete from notes_history where note_id = ? and date_modified_from = ?", [entity.note_id, entity.date_modified_from]);
 | 
			
		||||
 | 
			
		||||
        delete entity['id'];
 | 
			
		||||
 | 
			
		||||
        await sql.insert('notes_history', entity);
 | 
			
		||||
 | 
			
		||||
        logSync("Update/sync note history " + entity.note_id, syncLog);
 | 
			
		||||
    }
 | 
			
		||||
    catch (e) {
 | 
			
		||||
        throw new Error("Update note " + note.detail.note_id + " failed, inner exception: " + e.stack);
 | 
			
		||||
    else {
 | 
			
		||||
        logSync("Sync conflict in note history for " + entity.note_id + ", from=" + entity.date_modified_from + ", to=" + entity.date_modified_to, syncLog);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -282,5 +322,7 @@ module.exports = {
 | 
			
		||||
    getChangedSince,
 | 
			
		||||
    getNoteSince,
 | 
			
		||||
    putChanged,
 | 
			
		||||
    putNote
 | 
			
		||||
    updateNote,
 | 
			
		||||
    updateNoteTree,
 | 
			
		||||
    updateNoteHistory
 | 
			
		||||
};
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user