From d38ca72e085e850504a1744b45a54289705388ce Mon Sep 17 00:00:00 2001 From: perfectra1n Date: Sat, 9 Aug 2025 13:40:17 -0700 Subject: [PATCH] feat(llm): remove overly complex circuit breaker --- apps/server/src/routes/api/llm_metrics.ts | 65 +-- .../src/services/llm/config/llm_options.ts | 51 -- .../__tests__/circuit_breaker.spec.ts | 319 ------------ .../services/llm/providers/circuit_breaker.ts | 457 ------------------ .../llm/providers/provider_factory.ts | 215 ++++---- 5 files changed, 123 insertions(+), 984 deletions(-) delete mode 100644 apps/server/src/services/llm/providers/__tests__/circuit_breaker.spec.ts delete mode 100644 apps/server/src/services/llm/providers/circuit_breaker.ts diff --git a/apps/server/src/routes/api/llm_metrics.ts b/apps/server/src/routes/api/llm_metrics.ts index 213a79288..26a55cec5 100644 --- a/apps/server/src/routes/api/llm_metrics.ts +++ b/apps/server/src/routes/api/llm_metrics.ts @@ -79,51 +79,7 @@ router.get('/llm/metrics/summary', (req: Request, res: Response) => { } }); -/** - * GET /api/llm/circuit-breaker/status - * Returns circuit breaker status for all providers - */ -router.get('/llm/circuit-breaker/status', (req: Request, res: Response) => { - try { - const factory = getProviderFactory(); - - if (!factory) { - return res.status(503).json({ error: 'LLM service not initialized' }); - } - const status = factory.getCircuitBreakerStatus(); - - if (!status) { - return res.status(503).json({ error: 'Circuit breaker not enabled' }); - } - - res.json(status); - } catch (error: any) { - log.error(`[LLM Metrics API] Error getting circuit breaker status: ${error.message}`); - res.status(500).json({ error: 'Internal server error' }); - } -}); - -/** - * POST /api/llm/circuit-breaker/reset/:provider - * Reset circuit breaker for a specific provider - */ -router.post('/llm/circuit-breaker/reset/:provider', (req: Request, res: Response) => { - try { - const { provider } = req.params; - const factory = getProviderFactory(); - - if (!factory) { - return res.status(503).json({ error: 'LLM service not initialized' }); - } - - factory.resetCircuitBreaker(provider as any); - res.json({ message: `Circuit breaker reset for provider: ${provider}` }); - } catch (error: any) { - log.error(`[LLM Metrics API] Error resetting circuit breaker: ${error.message}`); - res.status(500).json({ error: 'Internal server error' }); - } -}); /** * GET /api/llm/health @@ -140,16 +96,28 @@ router.get('/llm/health', (req: Request, res: Response) => { }); } - const circuitStatus = factory.getCircuitBreakerStatus(); const metrics = factory.getMetricsSummary(); const statistics = factory.getStatistics(); + const healthStatuses = factory.getAllHealthStatuses(); + + // Get available/unavailable providers from health statuses + const available: string[] = []; + const unavailable: string[] = []; + + for (const [provider, status] of healthStatuses) { + if (status.healthy) { + available.push(provider); + } else { + unavailable.push(provider); + } + } const health = { status: 'healthy', timestamp: new Date().toISOString(), providers: { - available: circuitStatus?.summary?.availableProviders || [], - unavailable: circuitStatus?.summary?.unavailableProviders || [], + available, + unavailable, cached: statistics?.cachedProviders || 0, healthy: statistics?.healthyProviders || 0, unhealthy: statistics?.unhealthyProviders || 0 @@ -158,8 +126,7 @@ router.get('/llm/health', (req: Request, res: Response) => { totalRequests: metrics?.system?.totalRequests || 0, totalFailures: metrics?.system?.totalFailures || 0, uptime: metrics?.system?.uptime || 0 - }, - circuitBreakers: circuitStatus?.summary || {} + } }; // Determine overall health diff --git a/apps/server/src/services/llm/config/llm_options.ts b/apps/server/src/services/llm/config/llm_options.ts index db42975cc..77bd76970 100644 --- a/apps/server/src/services/llm/config/llm_options.ts +++ b/apps/server/src/services/llm/config/llm_options.ts @@ -13,13 +13,6 @@ import { ExportFormat } from '../metrics/metrics_exporter.js'; * LLM configuration options */ export interface LLMOptions { - // Circuit Breaker Configuration - circuitBreakerEnabled: boolean; - circuitBreakerFailureThreshold: number; - circuitBreakerFailureWindow: number; - circuitBreakerCooldownPeriod: number; - circuitBreakerSuccessThreshold: number; - // Metrics Configuration metricsEnabled: boolean; metricsExportFormat: ExportFormat; @@ -43,13 +36,6 @@ export interface LLMOptions { * Default LLM options */ const DEFAULT_OPTIONS: LLMOptions = { - // Circuit Breaker Defaults - circuitBreakerEnabled: true, - circuitBreakerFailureThreshold: 5, - circuitBreakerFailureWindow: 60000, // 1 minute - circuitBreakerCooldownPeriod: 30000, // 30 seconds - circuitBreakerSuccessThreshold: 2, - // Metrics Defaults metricsEnabled: true, metricsExportFormat: 'prometheus' as ExportFormat, @@ -70,13 +56,6 @@ const DEFAULT_OPTIONS: LLMOptions = { * Option keys in Trilium's option system */ export const LLM_OPTION_KEYS = { - // Circuit Breaker - CIRCUIT_BREAKER_ENABLED: 'llmCircuitBreakerEnabled' as const, - CIRCUIT_BREAKER_FAILURE_THRESHOLD: 'llmCircuitBreakerFailureThreshold' as const, - CIRCUIT_BREAKER_FAILURE_WINDOW: 'llmCircuitBreakerFailureWindow' as const, - CIRCUIT_BREAKER_COOLDOWN_PERIOD: 'llmCircuitBreakerCooldownPeriod' as const, - CIRCUIT_BREAKER_SUCCESS_THRESHOLD: 'llmCircuitBreakerSuccessThreshold' as const, - // Metrics METRICS_ENABLED: 'llmMetricsEnabled' as const, METRICS_EXPORT_FORMAT: 'llmMetricsExportFormat' as const, @@ -110,28 +89,6 @@ export function getLLMOptions(): LLMOptions { } return { - // Circuit Breaker - circuitBreakerEnabled: getOptionSafe( - () => optionService.getOptionBool(LLM_OPTION_KEYS.CIRCUIT_BREAKER_ENABLED), - DEFAULT_OPTIONS.circuitBreakerEnabled - ), - circuitBreakerFailureThreshold: getOptionSafe( - () => optionService.getOptionInt(LLM_OPTION_KEYS.CIRCUIT_BREAKER_FAILURE_THRESHOLD), - DEFAULT_OPTIONS.circuitBreakerFailureThreshold - ), - circuitBreakerFailureWindow: getOptionSafe( - () => optionService.getOptionInt(LLM_OPTION_KEYS.CIRCUIT_BREAKER_FAILURE_WINDOW), - DEFAULT_OPTIONS.circuitBreakerFailureWindow - ), - circuitBreakerCooldownPeriod: getOptionSafe( - () => optionService.getOptionInt(LLM_OPTION_KEYS.CIRCUIT_BREAKER_COOLDOWN_PERIOD), - DEFAULT_OPTIONS.circuitBreakerCooldownPeriod - ), - circuitBreakerSuccessThreshold: getOptionSafe( - () => optionService.getOptionInt(LLM_OPTION_KEYS.CIRCUIT_BREAKER_SUCCESS_THRESHOLD), - DEFAULT_OPTIONS.circuitBreakerSuccessThreshold - ), - // Metrics metricsEnabled: getOptionSafe( () => optionService.getOptionBool(LLM_OPTION_KEYS.METRICS_ENABLED), @@ -269,14 +226,6 @@ export function createProviderFactoryOptions() { enableCaching: options.providerCachingEnabled, cacheTimeout: options.providerCacheTimeout, enableMetrics: options.metricsEnabled, - enableCircuitBreaker: options.circuitBreakerEnabled, - circuitBreakerConfig: { - failureThreshold: options.circuitBreakerFailureThreshold, - failureWindow: options.circuitBreakerFailureWindow, - cooldownPeriod: options.circuitBreakerCooldownPeriod, - successThreshold: options.circuitBreakerSuccessThreshold, - enableLogging: true - }, metricsExporterConfig: { enabled: options.metricsEnabled, format: options.metricsExportFormat, diff --git a/apps/server/src/services/llm/providers/__tests__/circuit_breaker.spec.ts b/apps/server/src/services/llm/providers/__tests__/circuit_breaker.spec.ts deleted file mode 100644 index e6f4f531f..000000000 --- a/apps/server/src/services/llm/providers/__tests__/circuit_breaker.spec.ts +++ /dev/null @@ -1,319 +0,0 @@ -/** - * Circuit Breaker Tests - */ - -import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; -import { - CircuitBreaker, - CircuitBreakerManager, - CircuitState, - CircuitOpenError -} from '../circuit_breaker.js'; - -describe('CircuitBreaker', () => { - let breaker: CircuitBreaker; - - beforeEach(() => { - breaker = new CircuitBreaker('test-provider', { - failureThreshold: 3, - failureWindow: 1000, - cooldownPeriod: 500, - successThreshold: 2, - enableLogging: false - }); - }); - - afterEach(() => { - breaker.dispose(); - }); - - describe('State Transitions', () => { - it('should start in CLOSED state', () => { - expect(breaker.getState()).toBe(CircuitState.CLOSED); - }); - - it('should open after failure threshold', async () => { - const failingFn = () => Promise.reject(new Error('Test failure')); - - // First two failures - should remain closed - await expect(breaker.execute(failingFn)).rejects.toThrow('Test failure'); - expect(breaker.getState()).toBe(CircuitState.CLOSED); - - await expect(breaker.execute(failingFn)).rejects.toThrow('Test failure'); - expect(breaker.getState()).toBe(CircuitState.CLOSED); - - // Third failure - should open - await expect(breaker.execute(failingFn)).rejects.toThrow('Test failure'); - expect(breaker.getState()).toBe(CircuitState.OPEN); - }); - - it('should reject requests when open', async () => { - // Force open - breaker.forceOpen('Test'); - - const fn = () => Promise.resolve('success'); - await expect(breaker.execute(fn)).rejects.toThrow(CircuitOpenError); - }); - - it('should transition to HALF_OPEN after cooldown', async () => { - // Force open - breaker.forceOpen('Test'); - expect(breaker.getState()).toBe(CircuitState.OPEN); - - // Wait for cooldown - await new Promise(resolve => setTimeout(resolve, 600)); - expect(breaker.getState()).toBe(CircuitState.HALF_OPEN); - }); - - it('should close after success threshold in HALF_OPEN', async () => { - // Force to half-open - breaker.forceOpen('Test'); - await new Promise(resolve => setTimeout(resolve, 600)); - expect(breaker.getState()).toBe(CircuitState.HALF_OPEN); - - const successFn = () => Promise.resolve('success'); - - // First success - await breaker.execute(successFn); - expect(breaker.getState()).toBe(CircuitState.HALF_OPEN); - - // Second success - should close - await breaker.execute(successFn); - expect(breaker.getState()).toBe(CircuitState.CLOSED); - }); - - it('should reopen on failure in HALF_OPEN', async () => { - // Force to half-open - breaker.forceOpen('Test'); - await new Promise(resolve => setTimeout(resolve, 600)); - expect(breaker.getState()).toBe(CircuitState.HALF_OPEN); - - const failingFn = () => Promise.reject(new Error('Test failure')); - - // Failure in half-open should immediately open - await expect(breaker.execute(failingFn)).rejects.toThrow('Test failure'); - expect(breaker.getState()).toBe(CircuitState.OPEN); - }); - }); - - describe('Failure Window', () => { - it('should reset failures outside window', async () => { - const failingFn = () => Promise.reject(new Error('Test failure')); - - // Two failures - await expect(breaker.execute(failingFn)).rejects.toThrow(); - await expect(breaker.execute(failingFn)).rejects.toThrow(); - expect(breaker.getState()).toBe(CircuitState.CLOSED); - - // Wait for window to expire - await new Promise(resolve => setTimeout(resolve, 1100)); - - // Two more failures - should still be closed (window reset) - await expect(breaker.execute(failingFn)).rejects.toThrow(); - await expect(breaker.execute(failingFn)).rejects.toThrow(); - expect(breaker.getState()).toBe(CircuitState.CLOSED); - }); - }); - - describe('Statistics', () => { - it('should track statistics', async () => { - const successFn = () => Promise.resolve('success'); - const failingFn = () => Promise.reject(new Error('Test failure')); - - await breaker.execute(successFn); - await expect(breaker.execute(failingFn)).rejects.toThrow(); - - const stats = breaker.getStats(); - expect(stats.totalRequests).toBe(2); - expect(stats.successes).toBe(1); - expect(stats.failures).toBe(1); - expect(stats.rejectedRequests).toBe(0); - }); - - it('should track rejected requests', async () => { - breaker.forceOpen('Test'); - - const fn = () => Promise.resolve('success'); - await expect(breaker.execute(fn)).rejects.toThrow(CircuitOpenError); - await expect(breaker.execute(fn)).rejects.toThrow(CircuitOpenError); - - const stats = breaker.getStats(); - expect(stats.rejectedRequests).toBe(2); - }); - - it('should track state history', async () => { - breaker.forceOpen('Test open'); - breaker.forceClose('Test close'); - - const stats = breaker.getStats(); - expect(stats.stateHistory).toHaveLength(2); - expect(stats.stateHistory[0].state).toBe(CircuitState.OPEN); - expect(stats.stateHistory[1].state).toBe(CircuitState.CLOSED); - }); - }); - - describe('Timeout', () => { - it('should apply timeout in HALF_OPEN state', async () => { - // Force to half-open - breaker.forceOpen('Test'); - await new Promise(resolve => setTimeout(resolve, 600)); - - const slowFn = () => new Promise(resolve => - setTimeout(() => resolve('success'), 10000) - ); - - // Should timeout (half-open timeout is 5000ms in config) - await expect(breaker.execute(slowFn)).rejects.toThrow(/timed out/); - }); - }); -}); - -describe('CircuitBreakerManager', () => { - let manager: CircuitBreakerManager; - - beforeEach(() => { - manager = new CircuitBreakerManager({ - failureThreshold: 2, - failureWindow: 1000, - cooldownPeriod: 500, - enableLogging: false - }); - }); - - afterEach(() => { - manager.dispose(); - }); - - describe('Breaker Management', () => { - it('should create breakers on demand', () => { - const breaker1 = manager.getBreaker('provider1'); - const breaker2 = manager.getBreaker('provider2'); - - expect(breaker1).toBeDefined(); - expect(breaker2).toBeDefined(); - expect(breaker1).not.toBe(breaker2); - }); - - it('should return same breaker for same provider', () => { - const breaker1 = manager.getBreaker('provider1'); - const breaker2 = manager.getBreaker('provider1'); - - expect(breaker1).toBe(breaker2); - }); - - it('should execute with breaker protection', async () => { - const fn = () => Promise.resolve('success'); - const result = await manager.execute('provider1', fn); - - expect(result).toBe('success'); - }); - }); - - describe('Health Summary', () => { - it('should provide health summary', async () => { - const failingFn = () => Promise.reject(new Error('Test')); - const successFn = () => Promise.resolve('success'); - - // Create some breakers in different states - await manager.execute('provider1', successFn); - - // Force provider2 to open - const breaker2 = manager.getBreaker('provider2'); - breaker2.forceOpen('Test'); - - const summary = manager.getHealthSummary(); - expect(summary.total).toBe(2); - expect(summary.closed).toBe(1); - expect(summary.open).toBe(1); - expect(summary.availableProviders).toContain('provider1'); - expect(summary.unavailableProviders).toContain('provider2'); - }); - }); - - describe('Global Operations', () => { - it('should reset all breakers', () => { - const breaker1 = manager.getBreaker('provider1'); - const breaker2 = manager.getBreaker('provider2'); - - breaker1.forceOpen('Test'); - breaker2.forceOpen('Test'); - - manager.resetAll(); - - expect(breaker1.getState()).toBe(CircuitState.CLOSED); - expect(breaker2.getState()).toBe(CircuitState.CLOSED); - }); - - it('should get all stats', async () => { - await manager.execute('provider1', () => Promise.resolve('success')); - await manager.execute('provider2', () => Promise.resolve('success')); - - const allStats = manager.getAllStats(); - expect(allStats.size).toBe(2); - expect(allStats.has('provider1')).toBe(true); - expect(allStats.has('provider2')).toBe(true); - }); - }); -}); - -describe('Circuit Breaker Integration', () => { - it('should handle rapid failures gracefully', async () => { - const breaker = new CircuitBreaker('rapid-test', { - failureThreshold: 3, - failureWindow: 1000, - cooldownPeriod: 100, - enableLogging: false - }); - - const failingFn = () => Promise.reject(new Error('Rapid failure')); - const promises: Promise[] = []; - - // Fire 10 rapid requests - for (let i = 0; i < 10; i++) { - promises.push( - breaker.execute(failingFn).catch(err => err.message) - ); - } - - const results = await Promise.all(promises); - - // First 3 should be actual failures - expect(results.slice(0, 3)).toEqual([ - 'Rapid failure', - 'Rapid failure', - 'Rapid failure' - ]); - - // Rest should be circuit open errors - const openErrors = results.slice(3).filter((msg: string) => - msg.includes('Circuit breaker is OPEN') - ); - expect(openErrors.length).toBeGreaterThan(0); - - breaker.dispose(); - }); - - it('should handle concurrent successes correctly', async () => { - const breaker = new CircuitBreaker('concurrent-test', { - failureThreshold: 3, - enableLogging: false - }); - - let counter = 0; - const successFn = () => { - counter++; - return Promise.resolve(counter); - }; - - const promises: Promise[] = []; - for (let i = 0; i < 5; i++) { - promises.push(breaker.execute(successFn)); - } - - const results = await Promise.all(promises); - expect(results).toEqual([1, 2, 3, 4, 5]); - expect(breaker.getState()).toBe(CircuitState.CLOSED); - - breaker.dispose(); - }); -}); \ No newline at end of file diff --git a/apps/server/src/services/llm/providers/circuit_breaker.ts b/apps/server/src/services/llm/providers/circuit_breaker.ts deleted file mode 100644 index 069264adf..000000000 --- a/apps/server/src/services/llm/providers/circuit_breaker.ts +++ /dev/null @@ -1,457 +0,0 @@ -/** - * Circuit Breaker Pattern Implementation for LLM Providers - * - * Implements a circuit breaker to prevent hammering failing providers. - * States: - * - CLOSED: Normal operation, requests pass through - * - OPEN: Provider is failing, requests are rejected immediately - * - HALF_OPEN: Testing if provider has recovered - */ - -import log from '../../log.js'; - -/** - * Circuit breaker states - */ -export enum CircuitState { - CLOSED = 'CLOSED', - OPEN = 'OPEN', - HALF_OPEN = 'HALF_OPEN' -} - -/** - * Circuit breaker configuration - */ -export interface CircuitBreakerConfig { - /** Number of failures before opening circuit */ - failureThreshold: number; - /** Time window for counting failures (ms) */ - failureWindow: number; - /** Cooldown period before attempting half-open (ms) */ - cooldownPeriod: number; - /** Number of successes in half-open to close circuit */ - successThreshold: number; - /** Request timeout for half-open state (ms) */ - halfOpenTimeout: number; - /** Whether to log state transitions */ - enableLogging: boolean; -} - -/** - * Circuit breaker statistics - */ -export interface CircuitBreakerStats { - state: CircuitState; - failures: number; - successes: number; - lastFailureTime?: Date; - lastSuccessTime?: Date; - lastStateChange: Date; - totalRequests: number; - rejectedRequests: number; - stateHistory: Array<{ - state: CircuitState; - timestamp: Date; - reason: string; - }>; -} - -/** - * Error type for circuit breaker rejections - */ -export class CircuitOpenError extends Error { - constructor(public readonly providerName: string, public readonly nextRetryTime: Date) { - super(`Circuit breaker is OPEN for provider ${providerName}. Will retry after ${nextRetryTime.toISOString()}`); - this.name = 'CircuitOpenError'; - } -} - -/** - * Circuit Breaker implementation - */ -export class CircuitBreaker { - private state: CircuitState = CircuitState.CLOSED; - private failures: number = 0; - private successes: number = 0; - private failureTimestamps: Date[] = []; - private lastStateChangeTime: Date = new Date(); - private cooldownTimer?: NodeJS.Timeout; - private stats: CircuitBreakerStats; - private readonly config: CircuitBreakerConfig; - - constructor( - private readonly name: string, - config?: Partial - ) { - this.config = { - failureThreshold: config?.failureThreshold ?? 5, - failureWindow: config?.failureWindow ?? 60000, // 1 minute - cooldownPeriod: config?.cooldownPeriod ?? 30000, // 30 seconds - successThreshold: config?.successThreshold ?? 2, - halfOpenTimeout: config?.halfOpenTimeout ?? 5000, // 5 seconds - enableLogging: config?.enableLogging ?? true - }; - - this.stats = { - state: this.state, - failures: 0, - successes: 0, - lastStateChange: this.lastStateChangeTime, - totalRequests: 0, - rejectedRequests: 0, - stateHistory: [] - }; - } - - /** - * Execute a function with circuit breaker protection - */ - public async execute( - fn: () => Promise, - timeout?: number - ): Promise { - this.stats.totalRequests++; - - // Check if circuit is open - if (this.state === CircuitState.OPEN) { - this.stats.rejectedRequests++; - const nextRetryTime = new Date(this.lastStateChangeTime.getTime() + this.config.cooldownPeriod); - throw new CircuitOpenError(this.name, nextRetryTime); - } - - // Apply timeout for half-open state - const executionTimeout = this.state === CircuitState.HALF_OPEN - ? this.config.halfOpenTimeout - : timeout; - - try { - const result = await this.executeWithTimeout(fn, executionTimeout); - this.onSuccess(); - return result; - } catch (error) { - this.onFailure(error); - throw error; - } - } - - /** - * Execute function with timeout - */ - private async executeWithTimeout( - fn: () => Promise, - timeout?: number - ): Promise { - if (!timeout) { - return fn(); - } - - return Promise.race([ - fn(), - new Promise((_, reject) => - setTimeout(() => reject(new Error(`Operation timed out after ${timeout}ms`)), timeout) - ) - ]); - } - - /** - * Handle successful execution - */ - private onSuccess(): void { - this.successes++; - this.stats.successes++; - this.stats.lastSuccessTime = new Date(); - - switch (this.state) { - case CircuitState.HALF_OPEN: - if (this.successes >= this.config.successThreshold) { - this.transitionTo(CircuitState.CLOSED, 'Success threshold reached'); - this.reset(); - } - break; - - case CircuitState.CLOSED: - // Clear old failure timestamps - this.cleanupFailureTimestamps(); - break; - } - } - - /** - * Handle failed execution - */ - private onFailure(error: any): void { - const now = new Date(); - this.failures++; - this.stats.failures++; - this.stats.lastFailureTime = now; - this.failureTimestamps.push(now); - - switch (this.state) { - case CircuitState.HALF_OPEN: - // Immediately open on failure in half-open state - this.transitionTo(CircuitState.OPEN, `Failure in HALF_OPEN state: ${error.message}`); - this.scheduleCooldown(); - break; - - case CircuitState.CLOSED: - // Check if we've exceeded failure threshold - this.cleanupFailureTimestamps(); - if (this.failureTimestamps.length >= this.config.failureThreshold) { - this.transitionTo(CircuitState.OPEN, `Failure threshold exceeded: ${this.failures} failures in ${this.config.failureWindow}ms`); - this.scheduleCooldown(); - } - break; - } - } - - /** - * Clean up old failure timestamps outside the window - */ - private cleanupFailureTimestamps(): void { - const now = Date.now(); - const windowStart = now - this.config.failureWindow; - this.failureTimestamps = this.failureTimestamps.filter( - timestamp => timestamp.getTime() > windowStart - ); - } - - /** - * Transition to a new state - */ - private transitionTo(newState: CircuitState, reason: string): void { - const oldState = this.state; - this.state = newState; - this.lastStateChangeTime = new Date(); - this.stats.state = newState; - this.stats.lastStateChange = this.lastStateChangeTime; - - // Add to state history - this.stats.stateHistory.push({ - state: newState, - timestamp: this.lastStateChangeTime, - reason - }); - - // Keep only last 100 state transitions - if (this.stats.stateHistory.length > 100) { - this.stats.stateHistory = this.stats.stateHistory.slice(-100); - } - - if (this.config.enableLogging) { - log.info(`[CircuitBreaker:${this.name}] State transition: ${oldState} -> ${newState}. Reason: ${reason}`); - } - } - - /** - * Schedule cooldown period - */ - private scheduleCooldown(): void { - if (this.cooldownTimer) { - clearTimeout(this.cooldownTimer); - } - - this.cooldownTimer = setTimeout(() => { - if (this.state === CircuitState.OPEN) { - this.transitionTo(CircuitState.HALF_OPEN, 'Cooldown period expired'); - this.successes = 0; // Reset success counter for half-open state - } - }, this.config.cooldownPeriod); - } - - /** - * Reset counters - */ - private reset(): void { - this.failures = 0; - this.successes = 0; - this.failureTimestamps = []; - - if (this.cooldownTimer) { - clearTimeout(this.cooldownTimer); - this.cooldownTimer = undefined; - } - } - - /** - * Get current state - */ - public getState(): CircuitState { - return this.state; - } - - /** - * Get statistics - */ - public getStats(): CircuitBreakerStats { - return { ...this.stats }; - } - - /** - * Force open the circuit (for testing or manual intervention) - */ - public forceOpen(reason: string = 'Manual intervention'): void { - this.transitionTo(CircuitState.OPEN, reason); - this.scheduleCooldown(); - } - - /** - * Force close the circuit (for testing or manual intervention) - */ - public forceClose(reason: string = 'Manual intervention'): void { - this.transitionTo(CircuitState.CLOSED, reason); - this.reset(); - } - - /** - * Check if circuit allows requests - */ - public isAvailable(): boolean { - return this.state !== CircuitState.OPEN; - } - - /** - * Get time until next retry (if circuit is open) - */ - public getNextRetryTime(): Date | null { - if (this.state !== CircuitState.OPEN) { - return null; - } - return new Date(this.lastStateChangeTime.getTime() + this.config.cooldownPeriod); - } - - /** - * Cleanup resources - */ - public dispose(): void { - if (this.cooldownTimer) { - clearTimeout(this.cooldownTimer); - this.cooldownTimer = undefined; - } - } -} - -/** - * Circuit Breaker Manager for managing multiple circuit breakers - */ -export class CircuitBreakerManager { - private static instance: CircuitBreakerManager | null = null; - private breakers: Map = new Map(); - private defaultConfig: Partial; - - constructor(defaultConfig?: Partial) { - this.defaultConfig = defaultConfig || {}; - } - - /** - * Get singleton instance - */ - public static getInstance(defaultConfig?: Partial): CircuitBreakerManager { - if (!CircuitBreakerManager.instance) { - CircuitBreakerManager.instance = new CircuitBreakerManager(defaultConfig); - } - return CircuitBreakerManager.instance; - } - - /** - * Get or create a circuit breaker for a provider - */ - public getBreaker( - providerName: string, - config?: Partial - ): CircuitBreaker { - if (!this.breakers.has(providerName)) { - const breakerConfig = { ...this.defaultConfig, ...config }; - this.breakers.set(providerName, new CircuitBreaker(providerName, breakerConfig)); - } - return this.breakers.get(providerName)!; - } - - /** - * Execute with circuit breaker protection - */ - public async execute( - providerName: string, - fn: () => Promise, - config?: Partial - ): Promise { - const breaker = this.getBreaker(providerName, config); - return breaker.execute(fn); - } - - /** - * Get all circuit breaker stats - */ - public getAllStats(): Map { - const stats = new Map(); - for (const [name, breaker] of this.breakers) { - stats.set(name, breaker.getStats()); - } - return stats; - } - - /** - * Get health summary - */ - public getHealthSummary(): { - total: number; - closed: number; - open: number; - halfOpen: number; - availableProviders: string[]; - unavailableProviders: string[]; - } { - const summary = { - total: this.breakers.size, - closed: 0, - open: 0, - halfOpen: 0, - availableProviders: [] as string[], - unavailableProviders: [] as string[] - }; - - for (const [name, breaker] of this.breakers) { - const state = breaker.getState(); - switch (state) { - case CircuitState.CLOSED: - summary.closed++; - summary.availableProviders.push(name); - break; - case CircuitState.OPEN: - summary.open++; - summary.unavailableProviders.push(name); - break; - case CircuitState.HALF_OPEN: - summary.halfOpen++; - summary.availableProviders.push(name); - break; - } - } - - return summary; - } - - /** - * Reset all circuit breakers - */ - public resetAll(): void { - for (const breaker of this.breakers.values()) { - breaker.forceClose('Global reset'); - } - } - - /** - * Dispose all circuit breakers - */ - public dispose(): void { - for (const breaker of this.breakers.values()) { - breaker.dispose(); - } - this.breakers.clear(); - CircuitBreakerManager.instance = null; - } -} - -// Export singleton getter -export const getCircuitBreakerManager = (config?: Partial): CircuitBreakerManager => { - return CircuitBreakerManager.getInstance(config); -}; \ No newline at end of file diff --git a/apps/server/src/services/llm/providers/provider_factory.ts b/apps/server/src/services/llm/providers/provider_factory.ts index b9f630eeb..c0432c915 100644 --- a/apps/server/src/services/llm/providers/provider_factory.ts +++ b/apps/server/src/services/llm/providers/provider_factory.ts @@ -22,11 +22,6 @@ import { getAnthropicOptions, getOllamaOptions } from './providers.js'; -import { - CircuitBreakerManager, - CircuitOpenError, - type CircuitBreakerConfig -} from './circuit_breaker.js'; import { MetricsExporter, ExportFormat, @@ -96,8 +91,6 @@ export interface ProviderFactoryOptions { enableCaching?: boolean; cacheTimeout?: number; enableMetrics?: boolean; - enableCircuitBreaker?: boolean; - circuitBreakerConfig?: Partial; metricsExporterConfig?: Partial; } @@ -128,7 +121,8 @@ export class ProviderFactory { private options: ProviderFactoryOptions; private healthCheckTimer?: NodeJS.Timeout; private disposed: boolean = false; - private circuitBreakerManager?: CircuitBreakerManager; + private retryCount: Map = new Map(); + private lastRetryTime: Map = new Map(); private metricsExporter?: MetricsExporter; constructor(options: ProviderFactoryOptions = {}) { @@ -140,19 +134,10 @@ export class ProviderFactory { enableCaching: options.enableCaching ?? true, cacheTimeout: options.cacheTimeout ?? 300000, // 5 minutes enableMetrics: options.enableMetrics ?? true, - enableCircuitBreaker: options.enableCircuitBreaker ?? true, - circuitBreakerConfig: options.circuitBreakerConfig, metricsExporterConfig: options.metricsExporterConfig }; this.initializeCapabilities(); - - // Initialize circuit breaker if enabled - if (this.options.enableCircuitBreaker) { - this.circuitBreakerManager = CircuitBreakerManager.getInstance( - this.options.circuitBreakerConfig - ); - } // Initialize metrics exporter if enabled if (this.options.enableMetrics) { @@ -292,7 +277,7 @@ export class ProviderFactory { } /** - * Instantiate a specific provider + * Instantiate a specific provider with retry and fallback logic */ private async instantiateProvider( type: ProviderType, @@ -300,56 +285,35 @@ export class ProviderFactory { options?: ChatCompletionOptions ): Promise { const startTime = Date.now(); + const maxRetries = 3; + const baseDelay = 1000; // 1 second try { - // Use circuit breaker if enabled - if (this.circuitBreakerManager) { - const breaker = this.circuitBreakerManager.getBreaker(type); - - // Check if circuit is open - if (!breaker.isAvailable()) { - const nextRetry = breaker.getNextRetryTime(); - log.info(`[ProviderFactory] Circuit breaker OPEN for ${type}. Next retry: ${nextRetry?.toISOString()}`); - - // Record metric - if (this.metricsExporter) { - this.metricsExporter.getCollector().recordError(type, 'Circuit breaker open'); - } - - // Try fallback immediately - if (this.options.enableFallback && this.options.fallbackProviders?.length) { - return this.tryFallbackProvider(options); - } - - throw new CircuitOpenError(type, nextRetry!); - } - - // Execute with circuit breaker protection - return await breaker.execute(async () => { - const service = await this.createProviderInternal(type, config, options); - - // Record success metric - if (this.metricsExporter && service) { - const latency = Date.now() - startTime; - this.metricsExporter.getCollector().recordLatency(type, latency); - this.metricsExporter.getCollector().recordRequest(type, true); - } - - return service; - }); - } else { - // No circuit breaker, create directly - const service = await this.createProviderInternal(type, config, options); - - // Record metrics - if (this.metricsExporter && service) { + // Try to create the provider + const service = await this.createProviderByType(type, config, options); + + if (service && service.isAvailable()) { + // Record success metric + if (this.metricsExporter) { const latency = Date.now() - startTime; this.metricsExporter.getCollector().recordLatency(type, latency); this.metricsExporter.getCollector().recordRequest(type, true); } + // Reset retry count on success + this.retryCount.delete(type); + this.lastRetryTime.delete(type); + return service; } + + // If not available, try fallback + if (this.options.enableFallback && this.options.fallbackProviders?.length) { + log.info(`[ProviderFactory] Provider ${type} not available, trying fallback`); + return this.tryFallbackProvider(options); + } + + return null; } catch (error: any) { log.error(`[ProviderFactory] Error creating ${type} provider: ${error.message}`); @@ -359,10 +323,24 @@ export class ProviderFactory { this.metricsExporter.getCollector().recordError(type, error.message); } - // Try fallback if enabled and not a circuit breaker error - if (!(error instanceof CircuitOpenError) && - this.options.enableFallback && - this.options.fallbackProviders?.length) { + // Simple exponential backoff for retries + if (this.shouldRetry(type, error, maxRetries)) { + const retryDelay = await this.getRetryDelay(type, baseDelay, error); + log.info(`[ProviderFactory] Retrying ${type} after ${retryDelay}ms`); + + await new Promise(resolve => setTimeout(resolve, retryDelay)); + + // Increment retry count + const currentRetries = this.retryCount.get(type) || 0; + this.retryCount.set(type, currentRetries + 1); + this.lastRetryTime.set(type, Date.now()); + + return this.instantiateProvider(type, config, options); + } + + // Try fallback on failure + if (this.options.enableFallback && this.options.fallbackProviders?.length) { + log.info(`[ProviderFactory] Max retries reached for ${type}, trying fallback`); return this.tryFallbackProvider(options); } @@ -371,9 +349,9 @@ export class ProviderFactory { } /** - * Internal provider creation logic + * Create provider by type */ - private async createProviderInternal( + private async createProviderByType( type: ProviderType, config?: Partial, options?: ChatCompletionOptions @@ -397,6 +375,65 @@ export class ProviderFactory { } } + /** + * Check if we should retry a failed request + */ + private shouldRetry(type: ProviderType, error: any, maxRetries: number): boolean { + const currentRetries = this.retryCount.get(type) || 0; + + if (currentRetries >= maxRetries) { + return false; + } + + // Check for retryable errors + if (error.status === 429) { // Rate limit + return true; + } + + if (error.status >= 500) { // Server errors + return true; + } + + if (error.code === 'ECONNREFUSED' || error.code === 'ETIMEDOUT') { + return true; + } + + return false; + } + + /** + * Calculate retry delay with exponential backoff + */ + private async getRetryDelay(type: ProviderType, baseDelay: number, error: any): Promise { + const currentRetries = this.retryCount.get(type) || 0; + + // Check for rate limit headers + if (error.status === 429 && error.headers) { + // Check for Retry-After header + const retryAfter = error.headers['retry-after']; + if (retryAfter) { + const delay = parseInt(retryAfter) * 1000; + return Math.min(delay, 60000); // Cap at 60 seconds + } + + // Check for X-RateLimit-Reset header + const resetTime = error.headers['x-ratelimit-reset']; + if (resetTime) { + const delay = Math.max(0, parseInt(resetTime) * 1000 - Date.now()); + return Math.min(delay, 60000); + } + } + + // Exponential backoff: baseDelay * (2 ^ retries) + const delay = baseDelay * Math.pow(2, currentRetries); + + // Add jitter to prevent thundering herd + const jitter = Math.random() * 0.3 * delay; + + return Math.min(delay + jitter, 30000); // Cap at 30 seconds + } + + /** * Create OpenAI provider */ @@ -469,7 +506,7 @@ export class ProviderFactory { for (const fallbackType of this.options.fallbackProviders) { try { log.info(`[ProviderFactory] Trying fallback provider: ${fallbackType}`); - const service = await this.instantiateProvider(fallbackType, undefined, options); + const service = await this.createProviderByType(fallbackType, undefined, options); if (service && service.isAvailable()) { log.info(`[ProviderFactory] Fallback to ${fallbackType} successful`); @@ -553,20 +590,14 @@ export class ProviderFactory { const startTime = Date.now(); try { - const service = await this.createProvider(type); - - // Try a simple completion to test the service - const testMessages = [{ role: 'user' as const, content: 'Hi' }]; - const response = await service.generateChatCompletion(testMessages, { - maxTokens: 1, - temperature: 0 - }); - + // Just try to create the provider and check if it's available + const service = await this.createProviderByType(type); + const isHealthy = service ? service.isAvailable() : false; const latency = Date.now() - startTime; const status: ProviderHealthStatus = { provider: type, - healthy: true, + healthy: isHealthy, lastChecked: new Date(), latency }; @@ -771,22 +802,6 @@ export class ProviderFactory { } } - /** - * Get circuit breaker status - */ - public getCircuitBreakerStatus(): any { - if (!this.circuitBreakerManager) { - return null; - } - - return { - summary: this.circuitBreakerManager.getHealthSummary(), - details: Array.from(this.circuitBreakerManager.getAllStats().entries()).map(([name, stats]) => ({ - provider: name, - ...stats - })) - }; - } /** * Get metrics summary @@ -821,18 +836,6 @@ export class ProviderFactory { return this.metricsExporter.export(exportFormat); } - /** - * Reset circuit breaker for a specific provider - */ - public resetCircuitBreaker(provider: ProviderType): void { - if (!this.circuitBreakerManager) { - return; - } - - const breaker = this.circuitBreakerManager.getBreaker(provider); - breaker.forceClose('Manual reset'); - log.info(`[ProviderFactory] Circuit breaker reset for ${provider}`); - } /** * Configure metrics export @@ -862,10 +865,6 @@ export class ProviderFactory { this.healthCheckTimer = undefined; } - // Dispose circuit breaker manager - if (this.circuitBreakerManager) { - this.circuitBreakerManager.dispose(); - } // Dispose metrics exporter if (this.metricsExporter) {