fixes of recent entity changes

zadam 2020-12-14 14:17:51 +01:00
parent 6d7b9e0db3
commit 8b99f065d5
11 changed files with 96 additions and 65 deletions

View File

@@ -12,7 +12,7 @@ INSERT INTO mig_entity_changes (entityName, entityId, hash, sourceId, isSynced,
SELECT entityName, entityId, '', sourceId, isSynced, 'now', 0 FROM entity_changes;
UPDATE mig_entity_changes SET isErased = (SELECT isErased FROM notes WHERE noteId = entityId) WHERE entityName = 'notes';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateModified FROM notes WHERE noteId = entityId) WHERE entityName = 'notes';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateModified FROM notes WHERE noteId = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'notes';
UPDATE mig_entity_changes SET isErased = (
SELECT isErased
@@ -20,7 +20,7 @@ UPDATE mig_entity_changes SET isErased = (
JOIN notes USING(noteId)
WHERE attributeId = entityId
) WHERE entityName = 'attributes';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateModified FROM attributes WHERE attributeId = entityId) WHERE entityName = 'attributes';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateModified FROM attributes WHERE attributeId = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'attributes';
UPDATE mig_entity_changes SET isErased = (
SELECT isErased
@@ -28,24 +28,24 @@ UPDATE mig_entity_changes SET isErased = (
JOIN notes USING(noteId)
WHERE branchId = entityId
) WHERE entityName = 'branches';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateModified FROM branches WHERE branchId = entityId) WHERE entityName = 'branches';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateModified FROM branches WHERE branchId = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'branches';
UPDATE mig_entity_changes SET isErased = (
SELECT isErased
FROM note_revisions
WHERE noteRevisionId = entityId
) WHERE entityName = 'note_revisions';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateModified FROM note_revisions WHERE noteRevisionId = entityId) WHERE entityName = 'note_revisions';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateModified FROM note_revisions WHERE noteRevisionId = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'note_revisions';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateCreated FROM api_tokens WHERE apiTokenId = entityId) WHERE entityName = 'api_tokens';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateCreated FROM api_tokens WHERE apiTokenId = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'api_tokens';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateModified FROM note_contents WHERE noteId = entityId) WHERE entityName = 'note_contents';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateModified FROM note_contents WHERE noteId = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'note_contents';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateModified FROM note_revision_contents WHERE noteRevisionId = entityId) WHERE entityName = 'note_revision_contents';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateModified FROM note_revision_contents WHERE noteRevisionId = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'note_revision_contents';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateModified FROM options WHERE name = entityId) WHERE entityName = 'options';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateModified FROM options WHERE name = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'options';
-UPDATE mig_entity_changes SET utcDateChanged = (SELECT utcDateCreated FROM recent_notes WHERE noteId = entityId) WHERE entityName = 'options';
+UPDATE mig_entity_changes SET utcDateChanged = COALESCE((SELECT utcDateCreated FROM recent_notes WHERE noteId = entityId), '2020-12-14 14:07:05.165Z') WHERE entityName = 'options';
DROP TABLE entity_changes;
ALTER TABLE mig_entity_changes RENAME TO entity_changes;

View File

@@ -1 +1 @@
-UPDATE options SET name = 'eraseNotesAfterTimeInSeconds' WHERE name = 'eraseNotesAfterTimeInSeconds';
+UPDATE options SET name = 'eraseEntitiesAfterTimeInSeconds' WHERE name = 'eraseNotesAfterTimeInSeconds';

View File

@@ -0,0 +1,25 @@
CREATE TABLE IF NOT EXISTS "mig_attributes"
(
attributeId TEXT not null primary key,
noteId TEXT not null,
type TEXT not null,
name TEXT not null,
value TEXT default '' not null,
position INT default 0 not null,
utcDateModified TEXT not null,
isDeleted INT not null,
`deleteId` TEXT DEFAULT NULL,
isInheritable int DEFAULT 0 NULL);
INSERT INTO mig_attributes (attributeId, noteId, type, name, value, position, utcDateModified, isDeleted, deleteId, isInheritable)
SELECT attributeId, noteId, type, name, value, position, utcDateModified, isDeleted, deleteId, isInheritable FROM attributes;
DROP TABLE attributes;
ALTER TABLE mig_attributes RENAME TO attributes;
CREATE INDEX IDX_attributes_name_value
on attributes (name, value);
CREATE INDEX IDX_attributes_noteId_index
on attributes (noteId);
CREATE INDEX IDX_attributes_value_index
on attributes (value);

View File

@@ -18,7 +18,6 @@ const promotedAttributeDefinitionParser = require("../services/promoted_attribut
* @property {boolean} isInheritable - immutable
* @property {boolean} isDeleted
* @property {string|null} deleteId - ID identifying delete transaction
-* @property {string} utcDateCreated
* @property {string} utcDateModified
*
* @extends Entity
@@ -26,7 +25,7 @@ const promotedAttributeDefinitionParser = require("../services/promoted_attribut
class Attribute extends Entity {
static get entityName() { return "attributes"; }
static get primaryKeyName() { return "attributeId"; }
-static get hashedProperties() { return ["attributeId", "noteId", "type", "name", "value", "isInheritable", "isDeleted", "utcDateCreated"]; }
+static get hashedProperties() { return ["attributeId", "noteId", "type", "name", "value", "isInheritable", "isDeleted"]; }
constructor(row) {
super(row);
@@ -103,10 +102,6 @@ class Attribute extends Entity {
this.isDeleted = false;
}
-if (!this.utcDateCreated) {
-this.utcDateCreated = dateUtils.utcNowDateTime();
-}
super.beforeSaving();
this.utcDateModified = dateUtils.utcNowDateTime();
@@ -121,7 +116,6 @@
position: this.position,
isInheritable: isInheritable,
isDeleted: false,
-utcDateCreated: this.utcDateCreated,
utcDateModified: this.utcDateModified
});
}

View File

@@ -40,6 +40,10 @@ class Entity {
return utils.hash(contentToHash).substr(0, 10);
}
+getUtcDateChanged() {
+return this.utcDateModified || this.utcDateCreated;
+}
get repository() {
if (!repo) {
repo = require('../services/repository');

View File

@@ -40,8 +40,8 @@ let consumeQueuePromise = null;
// to make sure each change event is processed only once. Not clear if this is still necessary
const processedEntityChangeIds = new Set();
-function logRows(syncRows) {
-const filteredRows = syncRows.filter(row =>
+function logRows(entityChanges) {
+const filteredRows = entityChanges.filter(row =>
!processedEntityChangeIds.has(row.id)
&& row.entityName !== 'recent_notes'
&& (row.entityName !== 'options' || row.entityId !== 'openTabs'));
@@ -59,16 +59,16 @@ async function handleMessage(event) {
}
if (message.type === 'sync') {
-let syncRows = message.data;
+let entityChanges = message.data;
lastPingTs = Date.now();
-if (syncRows.length > 0) {
-logRows(syncRows);
+if (entityChanges.length > 0) {
+logRows(entityChanges);
-syncDataQueue.push(...syncRows);
+syncDataQueue.push(...entityChanges);
// we set lastAcceptedEntityChangeId even before sync processing and send ping so that backend can start sending more updates
-lastAcceptedEntityChangeId = Math.max(lastAcceptedEntityChangeId, syncRows[syncRows.length - 1].id);
+lastAcceptedEntityChangeId = Math.max(lastAcceptedEntityChangeId, entityChanges[entityChanges.length - 1].id);
sendPing();
// first wait for all the preceding consumers to finish
@@ -141,13 +141,13 @@ async function runSafely(syncHandler, syncData) {
async function consumeSyncData() {
if (syncDataQueue.length > 0) {
-const allSyncRows = syncDataQueue;
+const allEntityChanges = syncDataQueue;
syncDataQueue = [];
-const nonProcessedSyncRows = allSyncRows.filter(sync => !processedEntityChangeIds.has(sync.id));
+const nonProcessedEntityChanges = allEntityChanges.filter(sync => !processedEntityChangeIds.has(sync.id));
try {
-await utils.timeLimit(processSyncRows(nonProcessedSyncRows), 30000);
+await utils.timeLimit(processEntityChanges(nonProcessedEntityChanges), 30000);
}
catch (e) {
logError(`Encountered error ${e.message}: ${e.stack}, reloading frontend.`);
@@ -158,17 +158,17 @@ async function consumeSyncData() {
utils.reloadApp();
}
else {
console.log("nonProcessedSyncRows causing the timeout", nonProcessedSyncRows);
console.log("nonProcessedEntityChanges causing the timeout", nonProcessedEntityChanges);
alert(`Encountered error "${e.message}", check out the console.`);
}
}
-for (const syncRow of nonProcessedSyncRows) {
-processedEntityChangeIds.add(syncRow.id);
+for (const entityChange of nonProcessedEntityChanges) {
+processedEntityChangeIds.add(entityChange.id);
}
-lastProcessedEntityChangeId = Math.max(lastProcessedEntityChangeId, allSyncRows[allSyncRows.length - 1].id);
+lastProcessedEntityChangeId = Math.max(lastProcessedEntityChangeId, allEntityChanges[allEntityChanges.length - 1].id);
}
checkEntityChangeIdListeners();
@@ -229,10 +229,10 @@ subscribeToMessages(message => {
}
});
-async function processSyncRows(syncRows) {
+async function processEntityChanges(entityChanges) {
const missingNoteIds = [];
-for (const {entityName, entity} of syncRows) {
+for (const {entityName, entity} of entityChanges) {
if (entityName === 'branches' && !(entity.parentNoteId in treeCache.notes)) {
missingNoteIds.push(entity.parentNoteId);
}
@@ -251,7 +251,7 @@ async function processSyncRows(syncRows) {
const loadResults = new LoadResults(treeCache);
-for (const sync of syncRows.filter(sync => sync.entityName === 'notes')) {
+for (const sync of entityChanges.filter(sync => sync.entityName === 'notes')) {
const note = treeCache.notes[sync.entityId];
if (note) {
@@ -260,7 +260,7 @@
}
}
-for (const sync of syncRows.filter(sync => sync.entityName === 'branches')) {
+for (const sync of entityChanges.filter(sync => sync.entityName === 'branches')) {
let branch = treeCache.branches[sync.entityId];
const childNote = treeCache.notes[sync.entity.noteId];
const parentNote = treeCache.notes[sync.entity.parentNoteId];
@@ -308,7 +308,7 @@
}
}
-for (const sync of syncRows.filter(sync => sync.entityName === 'note_reordering')) {
+for (const sync of entityChanges.filter(sync => sync.entityName === 'note_reordering')) {
const parentNoteIdsToSort = new Set();
for (const branchId in sync.positions) {
@@ -333,7 +333,7 @@
}
// missing reloading the relation target note
-for (const sync of syncRows.filter(sync => sync.entityName === 'attributes')) {
+for (const sync of entityChanges.filter(sync => sync.entityName === 'attributes')) {
let attribute = treeCache.attributes[sync.entityId];
const sourceNote = treeCache.notes[sync.entity.noteId];
const targetNote = sync.entity.type === 'relation' && treeCache.notes[sync.entity.value];
@@ -371,17 +371,17 @@
}
}
-for (const sync of syncRows.filter(sync => sync.entityName === 'note_contents')) {
+for (const sync of entityChanges.filter(sync => sync.entityName === 'note_contents')) {
delete treeCache.noteComplementPromises[sync.entityId];
loadResults.addNoteContent(sync.entityId, sync.sourceId);
}
-for (const sync of syncRows.filter(sync => sync.entityName === 'note_revisions')) {
+for (const sync of entityChanges.filter(sync => sync.entityName === 'note_revisions')) {
loadResults.addNoteRevision(sync.entityId, sync.noteId, sync.sourceId);
}
-for (const sync of syncRows.filter(sync => sync.entityName === 'options')) {
+for (const sync of entityChanges.filter(sync => sync.entityName === 'options')) {
if (sync.entity.name === 'openTabs') {
continue; // only noise
}

View File

@@ -4,7 +4,7 @@ const build = require('./build');
const packageJson = require('../../package');
const {TRILIUM_DATA_DIR} = require('./data_dir');
-const APP_DB_VERSION = 175;
+const APP_DB_VERSION = 176;
const SYNC_VERSION = 17;
const CLIPPER_PROTOCOL_VERSION = "1.0";

View File

@@ -44,20 +44,20 @@ function isEntityEventsDisabled() {
return !!namespace.get('disableEntityEvents');
}
-function getAndClearSyncRows() {
-const syncRows = namespace.get('syncRows') || [];
+function getAndClearEntityChanges() {
+const entityChanges = namespace.get('entityChanges') || [];
-namespace.set('syncRows', []);
+namespace.set('entityChanges', []);
-return syncRows;
+return entityChanges;
}
-function addSyncRow(syncRow) {
-const syncRows = namespace.get('syncRows') || [];
+function addEntityChange(entityChange) {
+const entityChanges = namespace.get('entityChanges') || [];
-syncRows.push(syncRow);
+entityChanges.push(entityChange);
-namespace.set('syncRows', syncRows);
+namespace.set('entityChanges', entityChanges);
}
function reset() {
@@ -84,8 +84,8 @@ module.exports = {
disableEntityEvents,
isEntityEventsDisabled,
reset,
-getAndClearSyncRows,
-addSyncRow,
+getAndClearEntityChanges,
+addEntityChange,
getEntityFromCache,
setEntityToCache
};

View File

@@ -6,13 +6,15 @@ const cls = require('./cls');
let maxEntityChangeId = 0;
-function insertEntityChange(entityName, entityId, hash, sourceId = null, isSynced = true) {
+function insertEntityChange(entityName, entityId, hash, isErased, utcDateChanged, sourceId = null, isSynced = true) {
const entityChange = {
entityName: entityName,
entityId: entityId,
hash: hash,
sourceId: sourceId || cls.getSourceId() || sourceIdService.getCurrentSourceId(),
-isSynced: isSynced ? 1 : 0
+isSynced: isSynced ? 1 : 0,
+isErased: isErased ? 1 : 0,
+utcDateChanged: utcDateChanged
};
entityChange.id = sql.replace("entity_changes", entityChange);
@@ -23,9 +25,9 @@ function insertEntityChange(entityName, entityId, hash, sourceId = null, isSynce
}
function addEntityChange(entityChange, sourceId, isSynced) {
-const sync = insertEntityChange(entityChange.entityName, entityChange.entityId, entityChange.hash, sourceId, isSynced);
+const localEntityChange = insertEntityChange(entityChange.entityName, entityChange.entityId, entityChange.hash, entityChange.isErased, entityChange.utcDateChanged, sourceId, isSynced);
-cls.addSyncRow(sync);
+cls.addEntityChange(localEntityChange);
}
function moveEntityChangeToTop(entityName, entityId) {
@@ -48,14 +50,14 @@ function addEntityChangesForSector(entityName, entityPrimaryKey, sector) {
continue
}
-insertEntityChange(entityName, entityId, entity.generateHash(), 'content-check', true);
+insertEntityChange(entityName, entityId, entity.generateHash(), false, entity.getUtcDateChanged(), 'content-check', true);
}
});
log.info(`Added sector ${sector} of ${entityName} to sync queue in ${Date.now() - startTime}ms.`);
}
-function cleanupSyncRowsForMissingEntities(entityName, entityPrimaryKey) {
+function cleanupEntityChangesForMissingEntities(entityName, entityPrimaryKey) {
sql.execute(`
DELETE
FROM entity_changes
@@ -67,7 +69,7 @@ function cleanupSyncRowsForMissingEntities(entityName, entityPrimaryKey) {
function fillEntityChanges(entityName, entityPrimaryKey, condition = '') {
try {
-cleanupSyncRowsForMissingEntities(entityName, entityPrimaryKey);
+cleanupEntityChangesForMissingEntities(entityName, entityPrimaryKey);
const entityIds = sql.getColumn(`SELECT ${entityPrimaryKey} FROM ${entityName}`
+ (condition ? ` WHERE ${condition}` : ''));

View File

@@ -111,11 +111,17 @@ function updateEntity(entity) {
sql.transactional(() => {
sql.upsert(entityName, primaryKeyName, clone);
-const primaryKey = entity[primaryKeyName];
+const entityId = entity[primaryKeyName];
const isSynced = entityName !== 'options' || entity.isSynced;
-entityChangesService.addEntityChange(entityName, primaryKey, entity.generateHash(), null, isSynced);
+entityChangesService.addEntityChange({
+entityName,
+entityId,
+hash: entity.generateHash(),
+isErased: false,
+utcDateChanged: entity.getUtcDateChanged()
+}, null, isSynced);
if (!cls.isEntityEventsDisabled()) {
const eventPayload = {

View File

@@ -95,8 +95,8 @@ function fillInAdditionalProperties(sync) {
}
}
-function sendPing(client, syncRows = []) {
-for (const sync of syncRows) {
+function sendPing(client, entityChanges = []) {
+for (const sync of entityChanges) {
try {
fillInAdditionalProperties(sync);
}
@@ -110,16 +110,16 @@ function sendPing(client, syncRows = []) {
sendMessage(client, {
type: 'sync',
-data: syncRows
+data: entityChanges
});
}
function sendTransactionSyncsToAllClients() {
if (webSocketServer) {
-const syncRows = cls.getAndClearSyncRows();
+const entityChanges = cls.getAndClearEntityChanges();
webSocketServer.clients.forEach(function each(client) {
-sendPing(client, syncRows);
+sendPing(client, entityChanges);
});
}
}