transaction handling fixes

This commit is contained in:
zadam 2020-06-14 00:35:53 +02:00
parent 0df7851214
commit e0b4b369dc
4 changed files with 21 additions and 22 deletions

View File

@ -47,7 +47,7 @@ async function copyFile(backupFile) {
for (; attemptCount < COPY_ATTEMPT_COUNT && !success; attemptCount++) { for (; attemptCount < COPY_ATTEMPT_COUNT && !success; attemptCount++) {
try { try {
await sql.executeNoWrap(`VACUUM INTO '${backupFile}'`); await sql.executeWithoutTransaction(`VACUUM INTO '${backupFile}'`);
success = true; success = true;
} catch (e) { } catch (e) {

View File

@ -140,10 +140,6 @@ async function updateEntity(entity) {
await eventService.emit(entity.isDeleted ? eventService.ENTITY_DELETED : eventService.ENTITY_CHANGED, eventPayload); await eventService.emit(entity.isDeleted ? eventService.ENTITY_DELETED : eventService.ENTITY_CHANGED, eventPayload);
} }
} }
if (entity.afterSaving) {
await entity.afterSaving();
}
}); });
} }

View File

@ -155,7 +155,7 @@ async function execute(query, params = []) {
return await wrap(async db => db.run(query, ...params), query); return await wrap(async db => db.run(query, ...params), query);
} }
async function executeNoWrap(query, params = []) { async function executeWithoutTransaction(query, params = []) {
await dbConnection.run(query, ...params); await dbConnection.run(query, ...params);
} }
@ -222,10 +222,12 @@ async function startTransactionIfNecessary() {
await transactionPromise; await transactionPromise;
} }
await beginTransaction(); // first set semaphore (atomic operation and only then start transaction
cls.namespace.set('isInTransaction', true);
transactionActive = true; transactionActive = true;
transactionPromise = new Promise(res => transactionPromiseResolve = res); transactionPromise = new Promise(res => transactionPromiseResolve = res);
cls.namespace.set('isInTransaction', true);
await beginTransaction();
} }
async function transactional(func) { async function transactional(func) {
@ -234,7 +236,7 @@ async function transactional(func) {
return await func(); return await func();
} }
cls.namespace.set('isTransactional', true); // we will need a transaction if there's a write operation cls.namespace.set('isTransactional', true); // this signals that transaction will be needed if there's a write operation
try { try {
const ret = await func(); const ret = await func();
@ -244,26 +246,27 @@ async function transactional(func) {
// note that sync rows sent from this action will be sent again by scheduled periodic ping // note that sync rows sent from this action will be sent again by scheduled periodic ping
require('./ws.js').sendPingToAllClients(); require('./ws.js').sendPingToAllClients();
transactionActive = false;
cls.namespace.set('isInTransaction', false);
transactionPromiseResolve();
} }
return ret; return ret;
} }
catch (e) { catch (e) {
if (transactionActive) { if (cls.namespace.get('isInTransaction')) {
await rollback(); await rollback();
transactionActive = false;
cls.namespace.set('isInTransaction', false);
// resolving since this is just semaphore for allowing another write transaction to proceed
transactionPromiseResolve();
} }
throw e; throw e;
} }
finally {
cls.namespace.set('isTransactional', false);
if (cls.namespace.get('isInTransaction')) {
transactionActive = false;
cls.namespace.set('isInTransaction', false);
// resolving even for rollback since this is just semaphore for allowing another write transaction to proceed
transactionPromiseResolve();
}
}
} }
module.exports = { module.exports = {
@ -278,7 +281,7 @@ module.exports = {
getMap, getMap,
getColumn, getColumn,
execute, execute,
executeNoWrap, executeWithoutTransaction,
executeMany, executeMany,
executeScript, executeScript,
transactional, transactional,

View File

@ -372,7 +372,7 @@ sqlInit.dbReady.then(async () => {
setInterval(cls.wrap(sync), 60000); setInterval(cls.wrap(sync), 60000);
// kickoff initial sync immediately // kickoff initial sync immediately
setTimeout(cls.wrap(sync), 1000); setTimeout(cls.wrap(sync), 3000);
setInterval(cls.wrap(updatePushStats), 1000); setInterval(cls.wrap(updatePushStats), 1000);
}); });