diff --git a/src/app.js b/src/app.js index c6afc27f8..2dad38c7d 100644 --- a/src/app.js +++ b/src/app.js @@ -43,7 +43,7 @@ require('./routes/custom.js').register(app); require('./routes/error_handlers.js').register(app); // triggers sync timer -require('./services/sync.js'); +require('./services/sync'); // triggers backup timer require('./services/backup'); diff --git a/src/routes/api/script.js b/src/routes/api/script.js index 2dac2ca64..f7903f411 100644 --- a/src/routes/api/script.js +++ b/src/routes/api/script.js @@ -3,7 +3,7 @@ const scriptService = require('../../services/script.js'); const attributeService = require('../../services/attributes'); const becca = require('../../becca/becca'); -const syncService = require('../../services/sync.js'); +const syncService = require('../../services/sync'); const sql = require('../../services/sql'); // The async/await here is very confusing, because the body.script may, but may not be async. If it is async, then we diff --git a/src/routes/api/sync.js b/src/routes/api/sync.js index 0d68827e1..c1939552e 100644 --- a/src/routes/api/sync.js +++ b/src/routes/api/sync.js @@ -1,6 +1,6 @@ "use strict"; -const syncService = require('../../services/sync.js'); +const syncService = require('../../services/sync'); const syncUpdateService = require('../../services/sync_update'); const entityChangesService = require('../../services/entity_changes'); const sql = require('../../services/sql'); diff --git a/src/routes/routes.js b/src/routes/routes.js index 7c5ea7e0a..0b906f9f1 100644 --- a/src/routes/routes.js +++ b/src/routes/routes.js @@ -32,7 +32,7 @@ const revisionsApiRoute = require('./api/revisions'); const recentChangesApiRoute = require('./api/recent_changes.js'); const optionsApiRoute = require('./api/options.js'); const passwordApiRoute = require('./api/password'); -const syncApiRoute = require('./api/sync.js'); +const syncApiRoute = require('./api/sync'); const loginApiRoute = require('./api/login.js'); const recentNotesRoute = require('./api/recent_notes.js'); const appInfoRoute = require('./api/app_info'); diff --git a/src/services/entity_changes_interface.ts b/src/services/entity_changes_interface.ts index 745a8a8fa..a0e6bb4bc 100644 --- a/src/services/entity_changes_interface.ts +++ b/src/services/entity_changes_interface.ts @@ -12,4 +12,14 @@ export interface EntityChange { componentId?: string | null; changeId?: string | null; instanceId?: string | null; -} \ No newline at end of file +} + +export interface EntityRow { + isDeleted?: boolean; + content?: Buffer | string; +} + +export interface EntityChangeRecord { + entityChange: EntityChange; + entity?: EntityRow; +} diff --git a/src/services/request.ts b/src/services/request.ts index 47be2d2ef..9922e4b2d 100644 --- a/src/services/request.ts +++ b/src/services/request.ts @@ -4,29 +4,11 @@ import utils = require('./utils'); import log = require('./log'); import url = require('url'); import syncOptions = require('./sync_options'); +import { ExecOpts } from './request_interface'; // this service provides abstraction over node's HTTP/HTTPS and electron net.client APIs // this allows supporting system proxy -interface ExecOpts { - proxy: "noproxy" | null; - method: string; - url: string; - paging?: { - pageCount: number; - pageIndex: number; - requestId: string; - }; - cookieJar?: { - header?: string; - }; - auth?: { - password?: string; - }, - timeout: number; - body: string; -} - interface ClientOpts { method: string; url: string; diff --git a/src/services/request_interface.ts b/src/services/request_interface.ts new file mode 100644 index 000000000..4ba1da0a7 --- /dev/null +++ b/src/services/request_interface.ts @@ -0,0 +1,20 @@ +export interface CookieJar { + header?: string; +} + +export interface ExecOpts { + proxy: "noproxy" | null; + method: string; + url: string; + paging?: { + pageCount: number; + pageIndex: number; + requestId: string; + }; + cookieJar?: CookieJar; + auth?: { + password?: string; + }, + timeout: number; + body: string; +} \ No newline at end of file diff --git a/src/services/setup.js b/src/services/setup.js index 55e559e87..9c5d0c0c4 100644 --- a/src/services/setup.js +++ b/src/services/setup.js @@ -1,4 +1,4 @@ -const syncService = require('./sync.js'); +const syncService = require('./sync'); const log = require('./log'); const sqlInit = require('./sql_init'); const optionService = require('./options'); diff --git a/src/services/sync.js b/src/services/sync.ts similarity index 75% rename from src/services/sync.js rename to src/services/sync.ts index a935e1de8..da559e36a 100644 --- a/src/services/sync.js +++ b/src/services/sync.ts @@ -1,27 +1,50 @@ "use strict"; -const log = require('./log'); -const sql = require('./sql'); -const optionService = require('./options'); -const utils = require('./utils'); -const instanceId = require('./instance_id'); -const dateUtils = require('./date_utils'); -const syncUpdateService = require('./sync_update'); -const contentHashService = require('./content_hash'); -const appInfo = require('./app_info'); -const syncOptions = require('./sync_options'); -const syncMutexService = require('./sync_mutex'); -const cls = require('./cls'); -const request = require('./request'); -const ws = require('./ws'); -const entityChangesService = require('./entity_changes'); -const entityConstructor = require('../becca/entity_constructor'); -const becca = require('../becca/becca'); +import log = require('./log'); +import sql = require('./sql'); +import optionService = require('./options'); +import utils = require('./utils'); +import instanceId = require('./instance_id'); +import dateUtils = require('./date_utils'); +import syncUpdateService = require('./sync_update'); +import contentHashService = require('./content_hash'); +import appInfo = require('./app_info'); +import syncOptions = require('./sync_options'); +import syncMutexService = require('./sync_mutex'); +import cls = require('./cls'); +import request = require('./request'); +import ws = require('./ws'); +import entityChangesService = require('./entity_changes'); +import entityConstructor = require('../becca/entity_constructor'); +import becca = require('../becca/becca'); +import { EntityChange, EntityChangeRecord, EntityRow } from './entity_changes_interface'; +import { CookieJar, ExecOpts } from './request_interface'; let proxyToggle = true; let outstandingPullCount = 0; +interface CheckResponse { + maxEntityChangeId: number; + entityHashes: Record> +} + +interface SyncResponse { + instanceId: string; + maxEntityChangeId: number; +} + +interface ChangesResponse { + entityChanges: EntityChangeRecord[]; + lastEntityChangeId: number; + outstandingPullCount: number; +} + +interface SyncContext { + cookieJar: CookieJar; + instanceId?: string; +} + async function sync() { try { return await syncMutexService.doExclusively(async () => { @@ -53,7 +76,7 @@ async function sync() { }; }); } - catch (e) { + catch (e: any) { // we're dynamically switching whether we're using proxy or not based on whether we encountered error with the current method proxyToggle = !proxyToggle; @@ -93,19 +116,23 @@ async function login() { return await doLogin(); } -async function doLogin() { +async function doLogin(): Promise { const timestamp = dateUtils.utcNowDateTime(); const documentSecret = optionService.getOption('documentSecret'); const hash = utils.hmac(documentSecret, timestamp); - const syncContext = { cookieJar: {} }; - const resp = await syncRequest(syncContext, 'POST', '/api/login/sync', { + const syncContext: SyncContext = { cookieJar: {} }; + const resp = await syncRequest(syncContext, 'POST', '/api/login/sync', { timestamp: timestamp, syncVersion: appInfo.syncVersion, hash: hash }); + if (!resp) { + throw new Error("Got no response."); + } + if (resp.instanceId === instanceId) { throw new Error(`Sync server has instance ID '${resp.instanceId}' which is also local. This usually happens when the sync client is (mis)configured to sync with itself (URL points back to client) instead of the correct sync server.`); } @@ -125,7 +152,7 @@ async function doLogin() { return syncContext; } -async function pullChanges(syncContext) { +async function pullChanges(syncContext: SyncContext) { while (true) { const lastSyncedPull = getLastSyncedPull(); const logMarkerId = utils.randomString(10); // to easily pair sync events between client and server logs @@ -133,7 +160,10 @@ async function pullChanges(syncContext) { const startDate = Date.now(); - const resp = await syncRequest(syncContext, 'GET', changesUri); + const resp = await syncRequest(syncContext, 'GET', changesUri); + if (!resp) { + throw new Error("Request failed."); + } const {entityChanges, lastEntityChangeId} = resp; outstandingPullCount = resp.outstandingPullCount; @@ -141,7 +171,9 @@ async function pullChanges(syncContext) { const pulledDate = Date.now(); sql.transactional(() => { - syncUpdateService.updateEntities(entityChanges, syncContext.instanceId); + if (syncContext.instanceId) { + syncUpdateService.updateEntities(entityChanges, syncContext.instanceId); + } if (lastSyncedPull !== lastEntityChangeId) { setLastSyncedPull(lastEntityChangeId); @@ -156,7 +188,7 @@ async function pullChanges(syncContext) { log.info(`Sync ${logMarkerId}: Pulled ${entityChanges.length} changes in ${sizeInKb} KB, starting at entityChangeId=${lastSyncedPull} in ${pulledDate - startDate}ms and applied them in ${Date.now() - pulledDate}ms, ${outstandingPullCount} outstanding pulls`); } - catch (e) { + catch (e: any) { log.error(`Error occurred ${e.message} ${e.stack}`); } } @@ -165,11 +197,11 @@ async function pullChanges(syncContext) { log.info("Finished pull"); } -async function pushChanges(syncContext) { - let lastSyncedPush = getLastSyncedPush(); +async function pushChanges(syncContext: SyncContext) { + let lastSyncedPush: number | null | undefined = getLastSyncedPush(); while (true) { - const entityChanges = sql.getRows('SELECT * FROM entity_changes WHERE isSynced = 1 AND id > ? LIMIT 1000', [lastSyncedPush]); + const entityChanges = sql.getRows('SELECT * FROM entity_changes WHERE isSynced = 1 AND id > ? LIMIT 1000', [lastSyncedPush]); if (entityChanges.length === 0) { log.info("Nothing to push"); @@ -190,7 +222,7 @@ async function pushChanges(syncContext) { } }); - if (filteredEntityChanges.length === 0) { + if (filteredEntityChanges.length === 0 && lastSyncedPush) { // there still might be more sync changes (because of batch limit), just all the current batch // has been filtered out setLastSyncedPush(lastSyncedPush); @@ -214,16 +246,22 @@ async function pushChanges(syncContext) { lastSyncedPush = entityChangesRecords[entityChangesRecords.length - 1].entityChange.id; - setLastSyncedPush(lastSyncedPush); + if (lastSyncedPush) { + setLastSyncedPush(lastSyncedPush); + } } } -async function syncFinished(syncContext) { +async function syncFinished(syncContext: SyncContext) { await syncRequest(syncContext, 'POST', '/api/sync/finished'); } -async function checkContentHash(syncContext) { - const resp = await syncRequest(syncContext, 'GET', '/api/sync/check'); +async function checkContentHash(syncContext: SyncContext) { + const resp = await syncRequest(syncContext, 'GET', '/api/sync/check'); + if (!resp) { + throw new Error("Got no response."); + } + const lastSyncedPullId = getLastSyncedPull(); if (lastSyncedPullId < resp.maxEntityChangeId) { @@ -261,8 +299,12 @@ async function checkContentHash(syncContext) { const PAGE_SIZE = 1000000; -async function syncRequest(syncContext, method, requestPath, body) { - body = body ? JSON.stringify(body) : ''; +interface SyncContext { + cookieJar: CookieJar +} + +async function syncRequest(syncContext: SyncContext, method: string, requestPath: string, _body?: {}) { + const body = _body ? JSON.stringify(_body) : ''; const timeout = syncOptions.getSyncTimeout(); @@ -272,7 +314,7 @@ async function syncRequest(syncContext, method, requestPath, body) { const pageCount = Math.max(1, Math.ceil(body.length / PAGE_SIZE)); for (let pageIndex = 0; pageIndex < pageCount; pageIndex++) { - const opts = { + const opts: ExecOpts = { method, url: syncOptions.getSyncServerHost() + requestPath, cookieJar: syncContext.cookieJar, @@ -286,13 +328,13 @@ async function syncRequest(syncContext, method, requestPath, body) { proxy: proxyToggle ? syncOptions.getSyncProxy() : null }; - response = await utils.timeLimit(request.exec(opts), timeout); + response = await utils.timeLimit(request.exec(opts), timeout) as T; } return response; } -function getEntityChangeRow(entityChange) { +function getEntityChangeRow(entityChange: EntityChange) { const {entityName, entityId} = entityChange; if (entityName === 'note_reordering') { @@ -305,7 +347,7 @@ function getEntityChangeRow(entityChange) { throw new Error(`Unknown entity for entity change ${JSON.stringify(entityChange)}`); } - const entityRow = sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]); + const entityRow = sql.getRow(`SELECT * FROM ${entityName} WHERE ${primaryKey} = ?`, [entityId]); if (!entityRow) { log.error(`Cannot find entity for entity change ${JSON.stringify(entityChange)}`); @@ -317,15 +359,17 @@ function getEntityChangeRow(entityChange) { entityRow.content = Buffer.from(entityRow.content, 'utf-8'); } - entityRow.content = entityRow.content.toString("base64"); + if (entityRow.content) { + entityRow.content = entityRow.content.toString("base64"); + } } return entityRow; } } -function getEntityChangeRecords(entityChanges) { - const records = []; +function getEntityChangeRecords(entityChanges: EntityChange[]) { + const records: EntityChangeRecord[] = []; let length = 0; for (const entityChange of entityChanges) { @@ -340,7 +384,7 @@ function getEntityChangeRecords(entityChanges) { continue; } - const record = { entityChange, entity }; + const record: EntityChangeRecord = { entityChange, entity }; records.push(record); @@ -359,7 +403,7 @@ function getLastSyncedPull() { return parseInt(optionService.getOption('lastSyncedPull')); } -function setLastSyncedPull(entityChangeId) { +function setLastSyncedPull(entityChangeId: number) { const lastSyncedPullOption = becca.getOption('lastSyncedPull'); if (lastSyncedPullOption) { // might be null in initial sync when becca is not loaded @@ -378,7 +422,7 @@ function getLastSyncedPush() { return lastSyncedPush; } -function setLastSyncedPush(entityChangeId) { +function setLastSyncedPush(entityChangeId: number) { ws.setLastSyncedPush(entityChangeId); const lastSyncedPushOption = becca.getOption('lastSyncedPush'); @@ -409,7 +453,7 @@ require('../becca/becca_loader').beccaLoaded.then(() => { getLastSyncedPush(); }); -module.exports = { +export = { sync, login, getEntityChangeRecords, diff --git a/src/services/sync_update.ts b/src/services/sync_update.ts index bed9dcaf6..888947b8b 100644 --- a/src/services/sync_update.ts +++ b/src/services/sync_update.ts @@ -4,17 +4,7 @@ import entityChangesService = require('./entity_changes'); import eventService = require('./events'); import entityConstructor = require('../becca/entity_constructor'); import ws = require('./ws'); -import { EntityChange } from './entity_changes_interface'; - -interface EntityRow { - isDeleted?: boolean; - content: Buffer | string; -} - -interface EntityChangeInput { - entityChange: EntityChange; - entity: EntityRow; -} +import { EntityChange, EntityChangeRecord, EntityRow } from './entity_changes_interface'; interface UpdateContext { alreadyErased: number; @@ -22,7 +12,7 @@ interface UpdateContext { updated: Record } -function updateEntities(entityChanges: EntityChangeInput[], instanceId: string) { +function updateEntities(entityChanges: EntityChangeRecord[], instanceId: string) { if (entityChanges.length === 0) { return; } @@ -51,7 +41,9 @@ function updateEntities(entityChanges: EntityChangeInput[], instanceId: string) atLeastOnePullApplied = true; } - updateEntity(entityChange, entity, instanceId, updateContext); + if (entity) { + updateEntity(entityChange, entity, instanceId, updateContext); + } } logUpdateContext(updateContext); diff --git a/src/services/utils.ts b/src/services/utils.ts index 4cbb96d2f..852b9a1df 100644 --- a/src/services/utils.ts +++ b/src/services/utils.ts @@ -238,7 +238,7 @@ function getNoteTitle(filePath: string, replaceUnderscoresWithSpaces: boolean, n } } -function timeLimit(promise: Promise, limitMs: number, errorMessage: string): Promise { +function timeLimit(promise: Promise, limitMs: number, errorMessage?: string): Promise { if (!promise || !promise.then) { // it's not actually a promise return promise; }