chore(core): basic integration of SQL + CLS + log

This commit is contained in:
Elian Doran 2026-01-05 15:45:45 +02:00
parent 62803a1817
commit ea31d2f446
No known key found for this signature in database
12 changed files with 526 additions and 503 deletions

View File

@ -0,0 +1,24 @@
import { ExecutionContext } from "@triliumnext/core";
import clsHooked from "cls-hooked";
const namespace = clsHooked.createNamespace("trilium");
export default class ClsHookedExecutionContext implements ExecutionContext {
get<T = any>(key: string): T | undefined {
return namespace.get(key);
}
set(key: string, value: any): void {
namespace.set(key, value);
}
reset(): void {
clsHooked.reset();
}
init<T>(callback: () => T): T {
return namespace.runAndReturn(callback);
}
}

View File

@ -3,10 +3,41 @@
* are loaded later and will result in an empty string.
*/
import { initializeCore } from "@triliumnext/core";
import { initializeTranslations } from "./services/i18n.js";
import BetterSqlite3Provider from "./sql_provider.js";
async function startApplication() {
await initializeTranslations();
const config = (await import("./services/config.js")).default;
initializeCore({
dbConfig: {
provider: new BetterSqlite3Provider(),
isReadOnly: config.General.readOnly,
async onTransactionCommit() {
const ws = (await import("./services/ws.js")).default;
ws.sendTransactionEntityChangesToAllClients();
},
async onTransactionRollback() {
const cls = (await import("./services/cls.js")).default;
const becca_loader = (await import("./becca/becca_loader.js")).default;
const entity_changes = (await import("./services/entity_changes.js")).default;
const log = (await import("./services/log")).default;
const entityChangeIds = cls.getAndClearEntityChangeIds();
if (entityChangeIds.length > 0) {
log.info("Transaction rollback dirtied the becca, forcing reload.");
becca_loader.load();
}
// the maxEntityChangeId has been incremented during failed transaction, need to recalculate
entity_changes.recalculateMaxEntityChangeId();
}
}
});
const startTriliumServer = (await import("./www.js")).default;
await startTriliumServer();
}

View File

@ -1,11 +1,10 @@
import clsHooked from "cls-hooked";
import type { EntityChange } from "@triliumnext/commons";
const namespace = clsHooked.createNamespace("trilium");
import { getContext } from "@triliumnext/core/src/services/context";
type Callback = (...args: any[]) => any;
function init(callback: Callback) {
return namespace.runAndReturn(callback);
function init(callback: () => void) {
getContext().init(callback);
}
function wrap(callback: Callback) {
@ -18,81 +17,73 @@ function wrap(callback: Callback) {
};
}
function get(key: string) {
return namespace.get(key);
}
function set(key: string, value: any) {
namespace.set(key, value);
}
function getHoistedNoteId() {
return namespace.get("hoistedNoteId") || "root";
return getContext().get("hoistedNoteId") || "root";
}
function getComponentId() {
return namespace.get("componentId");
return getContext().get("componentId");
}
function getLocalNowDateTime() {
return namespace.get("localNowDateTime");
return getContext().get("localNowDateTime");
}
function disableEntityEvents() {
namespace.set("disableEntityEvents", true);
getContext().set("disableEntityEvents", true);
}
function enableEntityEvents() {
namespace.set("disableEntityEvents", false);
getContext().set("disableEntityEvents", false);
}
function isEntityEventsDisabled() {
return !!namespace.get("disableEntityEvents");
return !!getContext().get("disableEntityEvents");
}
function setMigrationRunning(running: boolean) {
namespace.set("migrationRunning", !!running);
getContext().set("migrationRunning", !!running);
}
function isMigrationRunning() {
return !!namespace.get("migrationRunning");
}
function disableSlowQueryLogging(disable: boolean) {
namespace.set("disableSlowQueryLogging", disable);
}
function isSlowQueryLoggingDisabled() {
return !!namespace.get("disableSlowQueryLogging");
return !!getContext().get("migrationRunning");
}
function getAndClearEntityChangeIds() {
const entityChangeIds = namespace.get("entityChangeIds") || [];
const entityChangeIds = getContext().get("entityChangeIds") || [];
namespace.set("entityChangeIds", []);
getContext().set("entityChangeIds", []);
return entityChangeIds;
}
function putEntityChange(entityChange: EntityChange) {
if (namespace.get("ignoreEntityChangeIds")) {
if (getContext().get("ignoreEntityChangeIds")) {
return;
}
const entityChangeIds = namespace.get("entityChangeIds") || [];
const entityChangeIds = getContext().get("entityChangeIds") || [];
// store only ID since the record can be modified (e.g., in erase)
entityChangeIds.push(entityChange.id);
namespace.set("entityChangeIds", entityChangeIds);
}
function reset() {
clsHooked.reset();
getContext().set("entityChangeIds", entityChangeIds);
}
function ignoreEntityChangeIds() {
namespace.set("ignoreEntityChangeIds", true);
getContext().set("ignoreEntityChangeIds", true);
}
function get(key: string) {
return getContext().get(key);
}
function set(key: string, value: unknown) {
getContext().set(key, value);
}
function reset() {
getContext().reset();
}
export default {
@ -100,7 +91,6 @@ export default {
wrap,
get,
set,
namespace,
getHoistedNoteId,
getComponentId,
getLocalNowDateTime,
@ -111,8 +101,6 @@ export default {
getAndClearEntityChangeIds,
putEntityChange,
ignoreEntityChangeIds,
disableSlowQueryLogging,
isSlowQueryLoggingDisabled,
setMigrationRunning,
isMigrationRunning
};

View File

@ -1,12 +1,12 @@
"use strict";
import { getLog } from "@triliumnext/core/src/services/log.js";
import type { Request, Response } from "express";
import fs from "fs";
import path from "path";
import { EOL } from "os";
import dataDir from "./data_dir.js";
import path from "path";
import cls from "./cls.js";
import config, { LOGGING_DEFAULT_RETENTION_DAYS } from "./config.js";
import dataDir from "./data_dir.js";
if (!fs.existsSync(dataDir.LOG_DIR)) {
fs.mkdirSync(dataDir.LOG_DIR, 0o700);
@ -40,7 +40,7 @@ async function cleanupOldLogFiles() {
retentionDays = customRetentionDays;
} else if (customRetentionDays <= -1){
info(`Log cleanup: keeping all log files, as specified by configuration.`);
return
return;
}
const cutoffDate = new Date();
@ -150,11 +150,11 @@ function log(str: string | Error) {
}
function info(message: string | Error) {
log(message);
getLog().info(message);
}
function error(message: string | Error | unknown) {
log(`ERROR: ${message}`);
getLog().error(message);
}
const requestBlacklist = ["/app", "/images", "/stylesheets", "/api/recent-notes"];
@ -170,7 +170,7 @@ function request(req: Request, res: Response, timeMs: number, responseLength: nu
return;
}
info((timeMs >= 10 ? "Slow " : "") + `${res.statusCode} ${req.method} ${req.url} with ${responseLength} bytes took ${timeMs}ms`);
info(`${timeMs >= 10 ? "Slow " : "" }${res.statusCode} ${req.method} ${req.url} with ${responseLength} bytes took ${timeMs}ms`);
}
function pad(num: number) {
@ -184,9 +184,9 @@ function padMilli(num: number) {
return `00${num}`;
} else if (num < 100) {
return `0${num}`;
} else {
return num.toString();
}
return num.toString();
}
function formatTime(millisSinceMidnight: number) {

View File

@ -1,443 +1,3 @@
import fs from "fs";
import { getSql } from "@triliumnext/core";
import becca_loader from "../becca/becca_loader.js";
import cls from "./cls.js";
import config from "./config.js";
import dataDir from "./data_dir.js";
import entity_changes from "./entity_changes.js";
import log from "./log.js";
import BetterSqlite3Provider from "./sql_nodejs.js";
import ws from "./ws.js";
type Params = any;
export interface Statement {
run(...params: Params): RunResult;
get(params: Params): unknown;
all(...params: Params): unknown[];
iterate(...params: Params): IterableIterator<unknown>;
raw(toggleState?: boolean): this;
pluck(toggleState?: boolean): this;
}
export interface Transaction {
deferred(): void;
}
export interface RunResult {
changes: number;
lastInsertRowid: number | bigint;
}
export interface DatabaseProvider {
loadFromFile(path: string, isReadOnly: boolean): void;
loadFromMemory(): void;
loadFromBuffer(buffer: NonSharedBuffer): void;
backup(destinationFile: string): void;
prepare(query: string): Statement;
transaction<T>(func: (statement: Statement) => T): Transaction;
get inTransaction(): boolean;
exec(query: string): void;
close(): void;
}
const dbConnection: DatabaseProvider = new BetterSqlite3Provider();
let statementCache: Record<string, Statement> = {};
function buildDatabase() {
// for integration tests, ignore the config's readOnly setting
if (process.env.TRILIUM_INTEGRATION_TEST === "memory") {
buildIntegrationTestDatabase();
} else if (process.env.TRILIUM_INTEGRATION_TEST === "memory-no-store") {
dbConnection.loadFromMemory();
}
dbConnection.loadFromFile(dataDir.DOCUMENT_PATH, config.General.readOnly);
}
buildDatabase();
function buildIntegrationTestDatabase(dbPath?: string) {
const buffer = fs.readFileSync(dbPath ?? dataDir.DOCUMENT_PATH);
dbConnection.loadFromBuffer(buffer);
}
function rebuildIntegrationTestDatabase(dbPath?: string) {
if (dbConnection) {
dbConnection.close();
}
// This allows a database that is read normally but is kept in memory and discards all modifications.
buildIntegrationTestDatabase(dbPath);
statementCache = {};
}
const LOG_ALL_QUERIES = false;
function insert<T extends {}>(tableName: string, rec: T, replace = false) {
const keys = Object.keys(rec || {});
if (keys.length === 0) {
log.error(`Can't insert empty object into table ${tableName}`);
return;
}
const columns = keys.join(", ");
const questionMarks = keys.map((p) => "?").join(", ");
const query = `INSERT
${replace ? "OR REPLACE" : ""} INTO
${tableName}
(
${columns}
)
VALUES (${questionMarks})`;
const res = execute(query, Object.values(rec));
return res ? res.lastInsertRowid : null;
}
function replace<T extends {}>(tableName: string, rec: T): number | null {
return insert(tableName, rec, true) as number | null;
}
function upsert<T extends {}>(tableName: string, primaryKey: string, rec: T) {
const keys = Object.keys(rec || {});
if (keys.length === 0) {
log.error(`Can't upsert empty object into table ${tableName}`);
return;
}
const columns = keys.join(", ");
const questionMarks = keys.map((colName) => `@${colName}`).join(", ");
const updateMarks = keys.map((colName) => `${colName} = @${colName}`).join(", ");
const query = `INSERT INTO ${tableName} (${columns}) VALUES (${questionMarks})
ON CONFLICT (${primaryKey}) DO UPDATE SET ${updateMarks}`;
for (const idx in rec) {
if (rec[idx] === true || rec[idx] === false) {
(rec as any)[idx] = rec[idx] ? 1 : 0;
}
}
execute(query, rec);
}
/**
* For the given SQL query, returns a prepared statement. For the same query (string comparison), the same statement is returned.
*
* @param sql the SQL query for which to return a prepared statement.
* @param isRaw indicates whether `.raw()` is going to be called on the prepared statement in order to return the raw rows (e.g. via {@link getRawRows()}). The reason is that the raw state is preserved in the saved statement and would break non-raw calls for the same query.
* @returns the corresponding {@link Statement}.
*/
function stmt(sql: string, isRaw?: boolean) {
const key = (isRaw ? `raw/${sql}` : sql);
if (!(key in statementCache)) {
statementCache[key] = dbConnection.prepare(sql);
}
return statementCache[key];
}
function getRow<T>(query: string, params: Params = []): T {
return wrap(query, (s) => s.get(params)) as T;
}
function getRowOrNull<T>(query: string, params: Params = []): T | null {
const all = getRows(query, params);
if (!all) {
return null;
}
return (all.length > 0 ? all[0] : null) as T | null;
}
function getValue<T>(query: string, params: Params = []): T {
return wrap(query, (s) => s.pluck().get(params)) as T;
}
// smaller values can result in better performance due to better usage of statement cache
const PARAM_LIMIT = 100;
function getManyRows<T>(query: string, params: Params): T[] {
let results: unknown[] = [];
while (params.length > 0) {
const curParams = params.slice(0, Math.min(params.length, PARAM_LIMIT));
params = params.slice(curParams.length);
const curParamsObj: Record<string, any> = {};
let j = 1;
for (const param of curParams) {
curParamsObj[`param${j++}`] = param;
}
let i = 1;
const questionMarks = curParams.map(() => `:param${i++}`).join(",");
const curQuery = query.replace(/\?\?\?/g, questionMarks);
const statement = curParams.length === PARAM_LIMIT ? stmt(curQuery) : dbConnection.prepare(curQuery);
const subResults = statement.all(curParamsObj);
results = results.concat(subResults);
}
return (results as T[] | null) || [];
}
function getRows<T>(query: string, params: Params = []): T[] {
return wrap(query, (s) => s.all(params)) as T[];
}
function getRawRows<T extends {} | unknown[]>(query: string, params: Params = []): T[] {
return (wrap(query, (s) => s.raw().all(params), true) as T[]) || [];
}
function iterateRows<T>(query: string, params: Params = []): IterableIterator<T> {
if (LOG_ALL_QUERIES) {
console.log(query);
}
return stmt(query).iterate(params) as IterableIterator<T>;
}
function getMap<K extends string | number | symbol, V>(query: string, params: Params = []) {
const map: Record<K, V> = {} as Record<K, V>;
const results = getRawRows<[K, V]>(query, params);
for (const row of results || []) {
map[row[0]] = row[1];
}
return map;
}
function getColumn<T>(query: string, params: Params = []): T[] {
return wrap(query, (s) => s.pluck().all(params)) as T[];
}
function execute(query: string, params: Params = []): RunResult {
if (config.General.readOnly && (query.startsWith("UPDATE") || query.startsWith("INSERT") || query.startsWith("DELETE"))) {
log.error(`read-only DB ignored: ${query} with parameters ${JSON.stringify(params)}`);
return {
changes: 0,
lastInsertRowid: 0
};
}
return wrap(query, (s) => s.run(params)) as RunResult;
}
function executeMany(query: string, params: Params) {
if (LOG_ALL_QUERIES) {
console.log(query);
}
while (params.length > 0) {
const curParams = params.slice(0, Math.min(params.length, PARAM_LIMIT));
params = params.slice(curParams.length);
const curParamsObj: Record<string, any> = {};
let j = 1;
for (const param of curParams) {
curParamsObj[`param${j++}`] = param;
}
let i = 1;
const questionMarks = curParams.map(() => `:param${i++}`).join(",");
const curQuery = query.replace(/\?\?\?/g, questionMarks);
dbConnection.prepare(curQuery).run(curParamsObj);
}
}
function executeScript(query: string) {
if (LOG_ALL_QUERIES) {
console.log(query);
}
dbConnection.exec(query);
}
/**
* @param isRaw indicates whether `.raw()` is going to be called on the prepared statement in order to return the raw rows (e.g. via {@link getRawRows()}). The reason is that the raw state is preserved in the saved statement and would break non-raw calls for the same query.
*/
function wrap(query: string, func: (statement: Statement) => unknown, isRaw?: boolean): unknown {
const startTimestamp = Date.now();
let result;
if (LOG_ALL_QUERIES) {
console.log(query);
}
try {
result = func(stmt(query, isRaw));
} catch (e: any) {
if (e.message.includes("The database connection is not open")) {
// this often happens on killing the app which puts these alerts in front of user
// in these cases error should be simply ignored.
console.log(e.message);
return null;
}
throw e;
}
const milliseconds = Date.now() - startTimestamp;
if (milliseconds >= 20 && !cls.isSlowQueryLoggingDisabled()) {
if (query.includes("WITH RECURSIVE")) {
log.info(`Slow recursive query took ${milliseconds}ms.`);
} else {
log.info(`Slow query took ${milliseconds}ms: ${query.trim().replace(/\s+/g, " ")}`);
}
}
return result;
}
function transactional<T>(func: (statement: Statement) => T) {
try {
const ret = (dbConnection.transaction(func) as any).deferred();
if (!dbConnection.inTransaction) {
// i.e. transaction was really committed (and not just savepoint released)
ws.sendTransactionEntityChangesToAllClients();
}
return ret as T;
} catch (e) {
console.warn("Got error ", e);
const entityChangeIds = cls.getAndClearEntityChangeIds();
if (entityChangeIds.length > 0) {
log.info("Transaction rollback dirtied the becca, forcing reload.");
becca_loader.load();
}
// the maxEntityChangeId has been incremented during failed transaction, need to recalculate
entity_changes.recalculateMaxEntityChangeId();
throw e;
}
}
function fillParamList(paramIds: string[] | Set<string>, truncate = true) {
if ("length" in paramIds && paramIds.length === 0) {
return;
}
if (truncate) {
execute("DELETE FROM param_list");
}
paramIds = Array.from(new Set(paramIds));
if (paramIds.length > 30000) {
fillParamList(paramIds.slice(30000), false);
paramIds = paramIds.slice(0, 30000);
}
// doing it manually to avoid this showing up on the slow query list
const s = stmt(`INSERT INTO param_list VALUES ${paramIds.map((paramId) => `(?)`).join(",")}`);
s.run(paramIds);
}
async function copyDatabase(targetFilePath: string) {
try {
fs.unlinkSync(targetFilePath);
} catch (e) { } // unlink throws exception if the file did not exist
await dbConnection.backup(targetFilePath);
}
function disableSlowQueryLogging<T>(cb: () => T) {
const orig = cls.isSlowQueryLoggingDisabled();
try {
cls.disableSlowQueryLogging(true);
return cb();
} finally {
cls.disableSlowQueryLogging(orig);
}
}
export default {
insert,
replace,
/**
* Get single value from the given query - first column from first returned row.
*
* @param query - SQL query with ? used as parameter placeholder
* @param params - array of params if needed
* @returns single value
*/
getValue,
/**
* Get first returned row.
*
* @param query - SQL query with ? used as parameter placeholder
* @param params - array of params if needed
* @returns - map of column name to column value
*/
getRow,
getRowOrNull,
/**
* Get all returned rows.
*
* @param query - SQL query with ? used as parameter placeholder
* @param params - array of params if needed
* @returns - array of all rows, each row is a map of column name to column value
*/
getRows,
getRawRows,
iterateRows,
getManyRows,
/**
* Get a map of first column mapping to second column.
*
* @param query - SQL query with ? used as parameter placeholder
* @param params - array of params if needed
* @returns - map of first column to second column
*/
getMap,
/**
* Get a first column in an array.
*
* @param query - SQL query with ? used as parameter placeholder
* @param params - array of params if needed
* @returns array of first column of all returned rows
*/
getColumn,
/**
* Execute SQL
*
* @param query - SQL query with ? used as parameter placeholder
* @param params - array of params if needed
*/
execute,
executeMany,
executeScript,
transactional,
upsert,
fillParamList,
copyDatabase,
disableSlowQueryLogging,
rebuildIntegrationTestDatabase
};
export default getSql();

View File

@ -1,8 +1,6 @@
import type { DatabaseProvider, Statement, Transaction } from "@triliumnext/core";
import Database, { type Database as DatabaseType } from "better-sqlite3";
import { readFileSync } from "fs";
import dataDirs from "./data_dir";
import type { DatabaseProvider, Statement, Transaction } from "./sql";
import { unlinkSync } from "fs";
const dbOpts: Database.Options = {
nativeBinding: process.env.BETTERSQLITE3_NATIVE_PATH || undefined
@ -37,6 +35,10 @@ export default class BetterSqlite3Provider implements DatabaseProvider {
}
backup(destinationFile: string) {
try {
unlinkSync(destinationFile);
} catch (e) { } // unlink throws exception if the file did not exist
this.dbConnection?.backup(destinationFile);
}
@ -47,7 +49,7 @@ export default class BetterSqlite3Provider implements DatabaseProvider {
transaction<T>(func: (statement: Statement) => T): Transaction {
if (!this.dbConnection) throw new Error("DB not open.");
return this.dbConnection.transaction(func);
return this.dbConnection.transaction(func) as any;
}
get inTransaction() {
@ -64,8 +66,3 @@ export default class BetterSqlite3Provider implements DatabaseProvider {
}
}
function buildIntegrationTestDatabase(dbPath?: string) {
const dbBuffer = readFileSync(dbPath ?? dataDirs.DOCUMENT_PATH);
return new Database(dbBuffer, dbOpts);
}

View File

@ -0,0 +1,17 @@
import { ExecutionContext, initContext } from "./services/context";
import { getLog, initLog } from "./services/log";
import { initSql } from "./services/sql/index";
import { SqlService, SqlServiceParams } from "./services/sql/sql";
export type * from "./services/sql/types";
export * from "./services/sql/index";
export type { ExecutionContext } from "./services/context";
export function initializeCore({ dbConfig, executionContext }: {
dbConfig: SqlServiceParams,
executionContext: ExecutionContext
}) {
initLog();
initSql(new SqlService(dbConfig, getLog()));
initContext(executionContext);
};

View File

@ -0,0 +1,18 @@
export interface ExecutionContext {
init<T>(fn: () => T): T;
get<T = any>(key: string): T | undefined;
set(key: string, value: any): void;
reset(): void;
}
let ctx: ExecutionContext | null = null;
export function initContext(context: ExecutionContext) {
if (ctx) throw new Error("Context already initialized");
ctx = context;
}
export function getContext(): ExecutionContext {
if (!ctx) throw new Error("Context not initialized");
return ctx;
}

View File

@ -0,0 +1,26 @@
export default class LogService {
log(message: string | Error) {
console.log(message);
}
info(message: string | Error) {
this.log(message);
}
error(message: string | Error | unknown) {
this.log(`ERROR: ${message}`);
}
}
let log: LogService;
export function initLog() {
log = new LogService();
}
export function getLog() {
if (!log) throw new Error("Log service not initialized.");
return log;
}

View File

@ -0,0 +1,13 @@
import type { SqlService } from "./sql";
let sql: SqlService | null = null;
export function initSql(instance: SqlService) {
if (sql) throw new Error("SQL already initialized");
sql = instance;
}
export function getSql(): SqlService {
if (!sql) throw new Error("SQL not initialized");
return sql;
}

View File

@ -0,0 +1,318 @@
import { getContext } from "../context.js";
import type LogService from "../log.js";
import type { DatabaseProvider, Params, RunResult, Statement } from "./types.js";
const LOG_ALL_QUERIES = false;
// smaller values can result in better performance due to better usage of statement cache
const PARAM_LIMIT = 100;
export interface SqlServiceParams {
provider: DatabaseProvider;
onTransactionRollback: () => void;
onTransactionCommit: () => void;
isReadOnly: boolean;
}
export class SqlService {
private dbConnection: DatabaseProvider;
private statementCache: Record<string, Statement> = {};
private params: Omit<SqlServiceParams, "provider">;
constructor({ provider, ...restParams }: SqlServiceParams,
private log: LogService
) {
this.dbConnection = provider;
this.params = restParams;
}
insert<T extends {}>(tableName: string, rec: T, replace = false) {
const keys = Object.keys(rec || {});
if (keys.length === 0) {
this.log.error(`Can't insert empty object into table ${tableName}`);
return;
}
const columns = keys.join(", ");
const questionMarks = keys.map((p) => "?").join(", ");
const query = `INSERT
${replace ? "OR REPLACE" : ""} INTO
${tableName}
(
${columns}
)
VALUES (${questionMarks})`;
const res = this.execute(query, Object.values(rec));
return res ? res.lastInsertRowid : null;
}
replace<T extends {}>(tableName: string, rec: T): number | null {
return this.insert(tableName, rec, true) as number | null;
}
upsert<T extends {}>(tableName: string, primaryKey: string, rec: T) {
const keys = Object.keys(rec || {});
if (keys.length === 0) {
this.log.error(`Can't upsert empty object into table ${tableName}`);
return;
}
const columns = keys.join(", ");
const questionMarks = keys.map((colName) => `@${colName}`).join(", ");
const updateMarks = keys.map((colName) => `${colName} = @${colName}`).join(", ");
const query = `INSERT INTO ${tableName} (${columns}) VALUES (${questionMarks})
ON CONFLICT (${primaryKey}) DO UPDATE SET ${updateMarks}`;
for (const idx in rec) {
if (rec[idx] === true || rec[idx] === false) {
(rec as any)[idx] = rec[idx] ? 1 : 0;
}
}
this.execute(query, rec);
}
/**
* For the given SQL query, returns a prepared statement. For the same query (string comparison), the same statement is returned.
*
* @param sql the SQL query for which to return a prepared statement.
* @param isRaw indicates whether `.raw()` is going to be called on the prepared statement in order to return the raw rows (e.g. via {@link getRawRows()}). The reason is that the raw state is preserved in the saved statement and would break non-raw calls for the same query.
* @returns the corresponding {@link Statement}.
*/
stmt(sql: string, isRaw?: boolean) {
const key = (isRaw ? `raw/${sql}` : sql);
if (!(key in this.statementCache)) {
this.statementCache[key] = this.dbConnection.prepare(sql);
}
return this.statementCache[key];
}
getRow<T>(query: string, params: Params = []): T {
return this.wrap(query, (s) => s.get(params)) as T;
}
getRowOrNull<T>(query: string, params: Params = []): T | null {
const all = this.getRows(query, params);
if (!all) {
return null;
}
return (all.length > 0 ? all[0] : null) as T | null;
}
getValue<T>(query: string, params: Params = []): T {
return this.wrap(query, (s) => s.pluck().get(params)) as T;
}
getManyRows<T>(query: string, params: Params): T[] {
let results: unknown[] = [];
while (params.length > 0) {
const curParams = params.slice(0, Math.min(params.length, PARAM_LIMIT));
params = params.slice(curParams.length);
const curParamsObj: Record<string, any> = {};
let j = 1;
for (const param of curParams) {
curParamsObj[`param${j++}`] = param;
}
let i = 1;
const questionMarks = curParams.map(() => `:param${i++}`).join(",");
const curQuery = query.replace(/\?\?\?/g, questionMarks);
const statement = curParams.length === PARAM_LIMIT ? this.stmt(curQuery) : this.dbConnection.prepare(curQuery);
const subResults = statement.all(curParamsObj);
results = results.concat(subResults);
}
return (results as T[] | null) || [];
}
getRows<T>(query: string, params: Params = []): T[] {
return this.wrap(query, (s) => s.all(params)) as T[];
}
getRawRows<T extends {} | unknown[]>(query: string, params: Params = []): T[] {
return (this.wrap(query, (s) => s.raw().all(params), true) as T[]) || [];
}
iterateRows<T>(query: string, params: Params = []): IterableIterator<T> {
if (LOG_ALL_QUERIES) {
console.log(query);
}
return this.stmt(query).iterate(params) as IterableIterator<T>;
}
getMap<K extends string | number | symbol, V>(query: string, params: Params = []) {
const map: Record<K, V> = {} as Record<K, V>;
const results = this.getRawRows<[K, V]>(query, params);
for (const row of results || []) {
map[row[0]] = row[1];
}
return map;
}
getColumn<T>(query: string, params: Params = []): T[] {
return this.wrap(query, (s) => s.pluck().all(params)) as T[];
}
execute(query: string, params: Params = []): RunResult {
if (this.params.isReadOnly && (query.startsWith("UPDATE") || query.startsWith("INSERT") || query.startsWith("DELETE"))) {
this.log.error(`read-only DB ignored: ${query} with parameters ${JSON.stringify(params)}`);
return {
changes: 0,
lastInsertRowid: 0
};
}
return this.wrap(query, (s) => s.run(params)) as RunResult;
}
executeMany(query: string, params: Params) {
if (LOG_ALL_QUERIES) {
console.log(query);
}
while (params.length > 0) {
const curParams = params.slice(0, Math.min(params.length, PARAM_LIMIT));
params = params.slice(curParams.length);
const curParamsObj: Record<string, any> = {};
let j = 1;
for (const param of curParams) {
curParamsObj[`param${j++}`] = param;
}
let i = 1;
const questionMarks = curParams.map(() => `:param${i++}`).join(",");
const curQuery = query.replace(/\?\?\?/g, questionMarks);
this.dbConnection.prepare(curQuery).run(curParamsObj);
}
}
executeScript(query: string) {
if (LOG_ALL_QUERIES) {
console.log(query);
}
this.dbConnection.exec(query);
}
/**
* @param isRaw indicates whether `.raw()` is going to be called on the prepared statement in order to return the raw rows (e.g. via {@link getRawRows()}). The reason is that the raw state is preserved in the saved statement and would break non-raw calls for the same query.
*/
wrap(query: string, func: (statement: Statement) => unknown, isRaw?: boolean): unknown {
const startTimestamp = Date.now();
let result;
if (LOG_ALL_QUERIES) {
console.log(query);
}
try {
result = func(this.stmt(query, isRaw));
} catch (e: any) {
if (e.message.includes("The database connection is not open")) {
// this often happens on killing the app which puts these alerts in front of user
// in these cases error should be simply ignored.
console.log(e.message);
return null;
}
throw e;
}
const milliseconds = Date.now() - startTimestamp;
if (milliseconds >= 20 && !isSlowQueryLoggingDisabled()) {
if (query.includes("WITH RECURSIVE")) {
this.log.info(`Slow recursive query took ${milliseconds}ms.`);
} else {
this.log.info(`Slow query took ${milliseconds}ms: ${query.trim().replace(/\s+/g, " ")}`);
}
}
return result;
}
transactional<T>(func: (statement: Statement) => T) {
try {
const ret = (this.dbConnection.transaction(func) as any).deferred();
if (!this.dbConnection.inTransaction) {
// i.e. transaction was really committed (and not just savepoint released)
this.params.onTransactionCommit();
}
return ret as T;
} catch (e) {
console.warn("Got error ", e);
this.params.onTransactionRollback();
throw e;
}
}
fillParamList(paramIds: string[] | Set<string>, truncate = true) {
if ("length" in paramIds && paramIds.length === 0) {
return;
}
if (truncate) {
this.execute("DELETE FROM param_list");
}
paramIds = Array.from(new Set(paramIds));
if (paramIds.length > 30000) {
this.fillParamList(paramIds.slice(30000), false);
paramIds = paramIds.slice(0, 30000);
}
// doing it manually to avoid this showing up on the slow query list
const s = this.stmt(`INSERT INTO param_list VALUES ${paramIds.map((paramId) => `(?)`).join(",")}`);
s.run(paramIds);
}
async copyDatabase(targetFilePath: string) {
await this.dbConnection.backup(targetFilePath);
}
disableSlowQueryLogging<T>(cb: () => T) {
const orig = isSlowQueryLoggingDisabled();
try {
disableSlowQueryLogging(true);
return cb();
} finally {
disableSlowQueryLogging(orig);
}
}
}
function disableSlowQueryLogging(disable: boolean) {
getContext().set("disableSlowQueryLogging", disable);
}
function isSlowQueryLoggingDisabled() {
return !!getContext().get("disableSlowQueryLogging");
}

View File

@ -0,0 +1,31 @@
export type Params = any;
export interface Statement {
run(...params: Params): RunResult;
get(params: Params): unknown;
all(...params: Params): unknown[];
iterate(...params: Params): IterableIterator<unknown>;
raw(toggleState?: boolean): this;
pluck(toggleState?: boolean): this;
}
export interface Transaction {
deferred(): void;
}
export interface RunResult {
changes: number;
lastInsertRowid: number | bigint;
}
export interface DatabaseProvider {
loadFromFile(path: string, isReadOnly: boolean): void;
loadFromMemory(): void;
loadFromBuffer(buffer: NonSharedBuffer): void;
backup(destinationFile: string): void;
prepare(query: string): Statement;
transaction<T>(func: (statement: Statement) => T): Transaction;
get inTransaction(): boolean;
exec(query: string): void;
close(): void;
}