opening transactions only on write operations which enforces exclusive lock only there to improve concurrency, custom handling of sync request timeouts, #1093, #1018

This commit is contained in:
zadam 2020-06-13 10:23:36 +02:00
parent d09b021487
commit 5d47c2b23e
10 changed files with 95 additions and 58 deletions

View File

@ -8,8 +8,8 @@ async function syncNow() {
toastService.showMessage("Sync finished successfully."); toastService.showMessage("Sync finished successfully.");
} }
else { else {
if (result.message.length > 50) { if (result.message.length > 100) {
result.message = result.message.substr(0, 50); result.message = result.message.substr(0, 100);
} }
toastService.showError("Sync failed: " + result.message); toastService.showError("Sync failed: " + result.message);

View File

@ -316,11 +316,11 @@ function dynamicRequire(moduleName) {
} }
} }
function timeLimit(cb, limitMs) { function timeLimit(promise, limitMs) {
return new Promise((res, rej) => { return new Promise((res, rej) => {
let resolved = false; let resolved = false;
cb().then(() => { promise.then(() => {
resolved = true; resolved = true;
res(); res();
@ -328,7 +328,7 @@ function timeLimit(cb, limitMs) {
setTimeout(() => { setTimeout(() => {
if (!resolved) { if (!resolved) {
rej('Process exceeded time limit ' + limitMs); rej(new Error('Process exceeded time limit ' + limitMs));
} }
}, limitMs); }, limitMs);
}); });

View File

@ -157,7 +157,7 @@ async function consumeSyncData() {
const nonProcessedSyncRows = allSyncRows.filter(sync => !processedSyncIds.has(sync.id)); const nonProcessedSyncRows = allSyncRows.filter(sync => !processedSyncIds.has(sync.id));
try { try {
await utils.timeLimit(async () => await processSyncRows(nonProcessedSyncRows), 5000); await utils.timeLimit(processSyncRows(nonProcessedSyncRows), 5000);
} }
catch (e) { catch (e) {
logError(`Encountered error ${e.message}: ${e.stack}, reloading frontend.`); logError(`Encountered error ${e.message}: ${e.stack}, reloading frontend.`);

View File

@ -35,7 +35,7 @@ const TPL = `
<a class="dropdown-item sync-now-button" title="Trigger sync"> <a class="dropdown-item sync-now-button" title="Trigger sync">
<span class="bx bx-refresh"></span> <span class="bx bx-refresh"></span>
Sync (<span id="outstanding-syncs-count">0</span>) Sync now (<span id="outstanding-syncs-count">0</span>)
</a> </a>
<a class="dropdown-item" data-trigger-command="openNewWindow"> <a class="dropdown-item" data-trigger-command="openNewWindow">

View File

@ -55,6 +55,8 @@ async function checkSync() {
} }
async function syncNow() { async function syncNow() {
log.info("Received request to trigger sync now.");
return await syncService.sync(); return await syncService.sync();
} }

View File

@ -40,7 +40,7 @@ function exec(opts) {
host: parsedTargetUrl.hostname, host: parsedTargetUrl.hostname,
port: parsedTargetUrl.port, port: parsedTargetUrl.port,
path: parsedTargetUrl.path, path: parsedTargetUrl.path,
timeout: opts.timeout, timeout: opts.timeout, // works only for node.js client
headers, headers,
agent: proxyAgent agent: proxyAgent
}); });
@ -104,7 +104,7 @@ async function getImage(imageUrl) {
host: parsedTargetUrl.hostname, host: parsedTargetUrl.hostname,
port: parsedTargetUrl.port, port: parsedTargetUrl.port,
path: parsedTargetUrl.path, path: parsedTargetUrl.path,
timeout: opts.timeout, timeout: opts.timeout, // works only for node client
headers: {}, headers: {},
agent: proxyAgent agent: proxyAgent
}); });

View File

@ -6,6 +6,7 @@ const optionService = require('./options');
const syncOptions = require('./sync_options'); const syncOptions = require('./sync_options');
const request = require('./request'); const request = require('./request');
const appInfo = require('./app_info'); const appInfo = require('./app_info');
const utils = require('./utils');
async function hasSyncServerSchemaAndSeed() { async function hasSyncServerSchemaAndSeed() {
const response = await requestToSyncServer('GET', '/api/setup/status'); const response = await requestToSyncServer('GET', '/api/setup/status');
@ -43,13 +44,15 @@ async function sendSeedToSyncServer() {
} }
async function requestToSyncServer(method, path, body = null) { async function requestToSyncServer(method, path, body = null) {
return await request.exec({ const timeout = await syncOptions.getSyncTimeout();
return utils.timeLimit(request.exec({
method, method,
url: await syncOptions.getSyncServerHost() + path, url: await syncOptions.getSyncServerHost() + path,
body, body,
proxy: await syncOptions.getSyncProxy(), proxy: await syncOptions.getSyncProxy(),
timeout: await syncOptions.getSyncTimeout() timeout: timeout
}); }), timeout);
} }
async function setupSyncFromSyncServer(syncServerHost, syncProxy, username, password) { async function setupSyncFromSyncServer(syncServerHost, syncProxy, username, password) {

View File

@ -64,15 +64,15 @@ async function upsert(tableName, primaryKey, rec) {
} }
async function beginTransaction() { async function beginTransaction() {
return await execute("BEGIN"); return await dbConnection.run("BEGIN");
} }
async function commit() { async function commit() {
return await execute("COMMIT"); return await dbConnection.run("COMMIT");
} }
async function rollback() { async function rollback() {
return await execute("ROLLBACK"); return await dbConnection.run("ROLLBACK");
} }
async function getRow(query, params = []) { async function getRow(query, params = []) {
@ -150,6 +150,8 @@ async function getColumn(query, params = []) {
} }
async function execute(query, params = []) { async function execute(query, params = []) {
await startTransactionIfNecessary();
return await wrap(async db => db.run(query, ...params), query); return await wrap(async db => db.run(query, ...params), query);
} }
@ -158,11 +160,15 @@ async function executeNoWrap(query, params = []) {
} }
async function executeMany(query, params) { async function executeMany(query, params) {
await startTransactionIfNecessary();
// essentially just alias // essentially just alias
await getManyRows(query, params); await getManyRows(query, params);
} }
async function executeScript(query) { async function executeScript(query) {
await startTransactionIfNecessary();
return await wrap(async db => db.exec(query), query); return await wrap(async db => db.exec(query), query);
} }
@ -199,61 +205,65 @@ async function wrap(func, query) {
} }
} }
// true if transaction is active globally.
// cls.namespace.get('isTransactional') OTOH indicates active transaction in active CLS
let transactionActive = false; let transactionActive = false;
// resolves when current transaction ends with either COMMIT or ROLLBACK
let transactionPromise = null; let transactionPromise = null;
let transactionPromiseResolve = null;
async function transactional(func) { async function startTransactionIfNecessary() {
if (cls.namespace.get('isInTransaction')) { if (!cls.namespace.get('isTransactional')
return await func(); || cls.namespace.get('isInTransaction')) {
return;
} }
while (transactionActive) { while (transactionActive) {
await transactionPromise; await transactionPromise;
} }
let ret = null;
const thisError = new Error(); // to capture correct stack trace in case of exception
transactionActive = true;
transactionPromise = new Promise(async (resolve, reject) => {
try {
await beginTransaction(); await beginTransaction();
cls.namespace.set('isInTransaction', true); cls.namespace.set('isInTransaction', true);
transactionActive = true;
transactionPromise = new Promise(res => transactionPromiseResolve = res);
}
ret = await func(); async function transactional(func) {
// if the CLS is already transactional then the whole transaction is handled by higher level transactional() call
if (cls.namespace.get('isTransactional')) {
return await func();
}
cls.namespace.set('isTransactional', true); // we will need a transaction if there's a write operation
try {
const ret = await func();
if (cls.namespace.get('isInTransaction')) {
await commit(); await commit();
// note that sync rows sent from this action will be sent again by scheduled periodic ping // note that sync rows sent from this action will be sent again by scheduled periodic ping
require('./ws.js').sendPingToAllClients(); require('./ws.js').sendPingToAllClients();
transactionActive = false; transactionActive = false;
resolve();
setTimeout(() => require('./ws').sendPingToAllClients(), 50);
}
catch (e) {
if (transactionActive) {
log.error("Error executing transaction, executing rollback. Inner stack: " + e.stack + "\nOutside stack: " + thisError.stack);
await rollback();
transactionActive = false;
}
reject(e);
}
finally {
cls.namespace.set('isInTransaction', false); cls.namespace.set('isInTransaction', false);
} transactionPromiseResolve();
});
if (transactionActive) {
await transactionPromise;
} }
return ret; return ret;
}
catch (e) {
if (transactionActive) {
await rollback();
transactionActive = false;
cls.namespace.set('isInTransaction', false);
// resolving since this is just semaphore for allowing another write transaction to proceed
transactionPromiseResolve();
}
throw e;
}
} }
module.exports = { module.exports = {

View File

@ -70,7 +70,7 @@ async function sync() {
}; };
} }
else { else {
log.info("sync failed: " + e.message + e.stack); log.info("sync failed: " + e.message + "\nstack: " + e.stack);
return { return {
success: false, success: false,
@ -97,7 +97,6 @@ async function doLogin() {
const hash = utils.hmac(documentSecret, timestamp); const hash = utils.hmac(documentSecret, timestamp);
const syncContext = { cookieJar: {} }; const syncContext = { cookieJar: {} };
const resp = await syncRequest(syncContext, 'POST', '/api/login/sync', { const resp = await syncRequest(syncContext, 'POST', '/api/login/sync', {
timestamp: timestamp, timestamp: timestamp,
syncVersion: appInfo.syncVersion, syncVersion: appInfo.syncVersion,
@ -259,14 +258,18 @@ async function checkContentHash(syncContext) {
} }
async function syncRequest(syncContext, method, requestPath, body) { async function syncRequest(syncContext, method, requestPath, body) {
return await request.exec({ const timeout = await syncOptions.getSyncTimeout();
const opts = {
method, method,
url: await syncOptions.getSyncServerHost() + requestPath, url: await syncOptions.getSyncServerHost() + requestPath,
cookieJar: syncContext.cookieJar, cookieJar: syncContext.cookieJar,
timeout: await syncOptions.getSyncTimeout(), timeout: timeout,
body, body,
proxy: proxyToggle ? await syncOptions.getSyncProxy() : null proxy: proxyToggle ? await syncOptions.getSyncProxy() : null
}); };
return await utils.timeLimit(request.exec(opts), timeout);
} }
const primaryKeys = { const primaryKeys = {

View File

@ -217,6 +217,24 @@ function formatDownloadTitle(filename, type, mime) {
} }
} }
function timeLimit(promise, limitMs) {
return new Promise((res, rej) => {
let resolved = false;
promise.then(() => {
resolved = true;
res();
});
setTimeout(() => {
if (!resolved) {
rej(new Error('Process exceeded time limit ' + limitMs));
}
}, limitMs);
});
}
module.exports = { module.exports = {
randomSecureToken, randomSecureToken,
randomString, randomString,
@ -245,5 +263,6 @@ module.exports = {
isStringNote, isStringNote,
quoteRegex, quoteRegex,
replaceAll, replaceAll,
formatDownloadTitle formatDownloadTitle,
timeLimit
}; };