mirror of
https://github.com/zadam/trilium.git
synced 2025-12-05 06:54:23 +01:00
feat(llm): remove overly complex circuit breaker
This commit is contained in:
parent
16622f43e3
commit
d38ca72e08
@ -79,51 +79,7 @@ router.get('/llm/metrics/summary', (req: Request, res: Response) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
/**
 * GET /api/llm/circuit-breaker/status
 *
 * Returns the circuit breaker status reported by the provider factory.
 *
 * Responses:
 * - 200 with whatever `factory.getCircuitBreakerStatus()` returns
 * - 503 when no provider factory is available, or when it reports no status
 * - 500 when the lookup throws
 */
router.get('/llm/circuit-breaker/status', (req: Request, res: Response) => {
    try {
        const factory = getProviderFactory();

        if (!factory) {
            return res.status(503).json({ error: 'LLM service not initialized' });
        }

        const status = factory.getCircuitBreakerStatus();

        if (!status) {
            return res.status(503).json({ error: 'Circuit breaker not enabled' });
        }

        res.json(status);
    } catch (error: unknown) {
        // Anything can be thrown (including null); only Error instances are
        // guaranteed to carry a `message`, so narrow before reading it.
        const message = error instanceof Error ? error.message : String(error);
        log.error(`[LLM Metrics API] Error getting circuit breaker status: ${message}`);
        res.status(500).json({ error: 'Internal server error' });
    }
});
|
|
||||||
|
|
||||||
/**
 * POST /api/llm/circuit-breaker/reset/:provider
 *
 * Resets the circuit breaker of the provider named in the URL.
 *
 * Responses:
 * - 200 with a confirmation message once `factory.resetCircuitBreaker` returns
 * - 503 when no provider factory is available
 * - 500 when the reset throws
 */
router.post('/llm/circuit-breaker/reset/:provider', (req: Request, res: Response) => {
    try {
        const { provider } = req.params;
        const factory = getProviderFactory();

        if (!factory) {
            return res.status(503).json({ error: 'LLM service not initialized' });
        }

        // NOTE(review): `provider` comes straight from the URL and is cast to
        // `any`; confirm the factory tolerates unknown provider names.
        factory.resetCircuitBreaker(provider as any);
        res.json({ message: `Circuit breaker reset for provider: ${provider}` });
    } catch (error: unknown) {
        // Anything can be thrown (including null); only Error instances are
        // guaranteed to carry a `message`, so narrow before reading it.
        const message = error instanceof Error ? error.message : String(error);
        log.error(`[LLM Metrics API] Error resetting circuit breaker: ${message}`);
        res.status(500).json({ error: 'Internal server error' });
    }
});
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* GET /api/llm/health
|
* GET /api/llm/health
|
||||||
@ -140,16 +96,28 @@ router.get('/llm/health', (req: Request, res: Response) => {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
const circuitStatus = factory.getCircuitBreakerStatus();
|
|
||||||
const metrics = factory.getMetricsSummary();
|
const metrics = factory.getMetricsSummary();
|
||||||
const statistics = factory.getStatistics();
|
const statistics = factory.getStatistics();
|
||||||
|
const healthStatuses = factory.getAllHealthStatuses();
|
||||||
|
|
||||||
|
// Get available/unavailable providers from health statuses
|
||||||
|
const available: string[] = [];
|
||||||
|
const unavailable: string[] = [];
|
||||||
|
|
||||||
|
for (const [provider, status] of healthStatuses) {
|
||||||
|
if (status.healthy) {
|
||||||
|
available.push(provider);
|
||||||
|
} else {
|
||||||
|
unavailable.push(provider);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const health = {
|
const health = {
|
||||||
status: 'healthy',
|
status: 'healthy',
|
||||||
timestamp: new Date().toISOString(),
|
timestamp: new Date().toISOString(),
|
||||||
providers: {
|
providers: {
|
||||||
available: circuitStatus?.summary?.availableProviders || [],
|
available,
|
||||||
unavailable: circuitStatus?.summary?.unavailableProviders || [],
|
unavailable,
|
||||||
cached: statistics?.cachedProviders || 0,
|
cached: statistics?.cachedProviders || 0,
|
||||||
healthy: statistics?.healthyProviders || 0,
|
healthy: statistics?.healthyProviders || 0,
|
||||||
unhealthy: statistics?.unhealthyProviders || 0
|
unhealthy: statistics?.unhealthyProviders || 0
|
||||||
@ -158,8 +126,7 @@ router.get('/llm/health', (req: Request, res: Response) => {
|
|||||||
totalRequests: metrics?.system?.totalRequests || 0,
|
totalRequests: metrics?.system?.totalRequests || 0,
|
||||||
totalFailures: metrics?.system?.totalFailures || 0,
|
totalFailures: metrics?.system?.totalFailures || 0,
|
||||||
uptime: metrics?.system?.uptime || 0
|
uptime: metrics?.system?.uptime || 0
|
||||||
},
|
}
|
||||||
circuitBreakers: circuitStatus?.summary || {}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Determine overall health
|
// Determine overall health
|
||||||
|
|||||||
@ -13,13 +13,6 @@ import { ExportFormat } from '../metrics/metrics_exporter.js';
|
|||||||
* LLM configuration options
|
* LLM configuration options
|
||||||
*/
|
*/
|
||||||
export interface LLMOptions {
|
export interface LLMOptions {
|
||||||
// Circuit Breaker Configuration
|
|
||||||
circuitBreakerEnabled: boolean;
|
|
||||||
circuitBreakerFailureThreshold: number;
|
|
||||||
circuitBreakerFailureWindow: number;
|
|
||||||
circuitBreakerCooldownPeriod: number;
|
|
||||||
circuitBreakerSuccessThreshold: number;
|
|
||||||
|
|
||||||
// Metrics Configuration
|
// Metrics Configuration
|
||||||
metricsEnabled: boolean;
|
metricsEnabled: boolean;
|
||||||
metricsExportFormat: ExportFormat;
|
metricsExportFormat: ExportFormat;
|
||||||
@ -43,13 +36,6 @@ export interface LLMOptions {
|
|||||||
* Default LLM options
|
* Default LLM options
|
||||||
*/
|
*/
|
||||||
const DEFAULT_OPTIONS: LLMOptions = {
|
const DEFAULT_OPTIONS: LLMOptions = {
|
||||||
// Circuit Breaker Defaults
|
|
||||||
circuitBreakerEnabled: true,
|
|
||||||
circuitBreakerFailureThreshold: 5,
|
|
||||||
circuitBreakerFailureWindow: 60000, // 1 minute
|
|
||||||
circuitBreakerCooldownPeriod: 30000, // 30 seconds
|
|
||||||
circuitBreakerSuccessThreshold: 2,
|
|
||||||
|
|
||||||
// Metrics Defaults
|
// Metrics Defaults
|
||||||
metricsEnabled: true,
|
metricsEnabled: true,
|
||||||
metricsExportFormat: 'prometheus' as ExportFormat,
|
metricsExportFormat: 'prometheus' as ExportFormat,
|
||||||
@ -70,13 +56,6 @@ const DEFAULT_OPTIONS: LLMOptions = {
|
|||||||
* Option keys in Trilium's option system
|
* Option keys in Trilium's option system
|
||||||
*/
|
*/
|
||||||
export const LLM_OPTION_KEYS = {
|
export const LLM_OPTION_KEYS = {
|
||||||
// Circuit Breaker
|
|
||||||
CIRCUIT_BREAKER_ENABLED: 'llmCircuitBreakerEnabled' as const,
|
|
||||||
CIRCUIT_BREAKER_FAILURE_THRESHOLD: 'llmCircuitBreakerFailureThreshold' as const,
|
|
||||||
CIRCUIT_BREAKER_FAILURE_WINDOW: 'llmCircuitBreakerFailureWindow' as const,
|
|
||||||
CIRCUIT_BREAKER_COOLDOWN_PERIOD: 'llmCircuitBreakerCooldownPeriod' as const,
|
|
||||||
CIRCUIT_BREAKER_SUCCESS_THRESHOLD: 'llmCircuitBreakerSuccessThreshold' as const,
|
|
||||||
|
|
||||||
// Metrics
|
// Metrics
|
||||||
METRICS_ENABLED: 'llmMetricsEnabled' as const,
|
METRICS_ENABLED: 'llmMetricsEnabled' as const,
|
||||||
METRICS_EXPORT_FORMAT: 'llmMetricsExportFormat' as const,
|
METRICS_EXPORT_FORMAT: 'llmMetricsExportFormat' as const,
|
||||||
@ -110,28 +89,6 @@ export function getLLMOptions(): LLMOptions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
// Circuit Breaker
|
|
||||||
circuitBreakerEnabled: getOptionSafe(
|
|
||||||
() => optionService.getOptionBool(LLM_OPTION_KEYS.CIRCUIT_BREAKER_ENABLED),
|
|
||||||
DEFAULT_OPTIONS.circuitBreakerEnabled
|
|
||||||
),
|
|
||||||
circuitBreakerFailureThreshold: getOptionSafe(
|
|
||||||
() => optionService.getOptionInt(LLM_OPTION_KEYS.CIRCUIT_BREAKER_FAILURE_THRESHOLD),
|
|
||||||
DEFAULT_OPTIONS.circuitBreakerFailureThreshold
|
|
||||||
),
|
|
||||||
circuitBreakerFailureWindow: getOptionSafe(
|
|
||||||
() => optionService.getOptionInt(LLM_OPTION_KEYS.CIRCUIT_BREAKER_FAILURE_WINDOW),
|
|
||||||
DEFAULT_OPTIONS.circuitBreakerFailureWindow
|
|
||||||
),
|
|
||||||
circuitBreakerCooldownPeriod: getOptionSafe(
|
|
||||||
() => optionService.getOptionInt(LLM_OPTION_KEYS.CIRCUIT_BREAKER_COOLDOWN_PERIOD),
|
|
||||||
DEFAULT_OPTIONS.circuitBreakerCooldownPeriod
|
|
||||||
),
|
|
||||||
circuitBreakerSuccessThreshold: getOptionSafe(
|
|
||||||
() => optionService.getOptionInt(LLM_OPTION_KEYS.CIRCUIT_BREAKER_SUCCESS_THRESHOLD),
|
|
||||||
DEFAULT_OPTIONS.circuitBreakerSuccessThreshold
|
|
||||||
),
|
|
||||||
|
|
||||||
// Metrics
|
// Metrics
|
||||||
metricsEnabled: getOptionSafe(
|
metricsEnabled: getOptionSafe(
|
||||||
() => optionService.getOptionBool(LLM_OPTION_KEYS.METRICS_ENABLED),
|
() => optionService.getOptionBool(LLM_OPTION_KEYS.METRICS_ENABLED),
|
||||||
@ -269,14 +226,6 @@ export function createProviderFactoryOptions() {
|
|||||||
enableCaching: options.providerCachingEnabled,
|
enableCaching: options.providerCachingEnabled,
|
||||||
cacheTimeout: options.providerCacheTimeout,
|
cacheTimeout: options.providerCacheTimeout,
|
||||||
enableMetrics: options.metricsEnabled,
|
enableMetrics: options.metricsEnabled,
|
||||||
enableCircuitBreaker: options.circuitBreakerEnabled,
|
|
||||||
circuitBreakerConfig: {
|
|
||||||
failureThreshold: options.circuitBreakerFailureThreshold,
|
|
||||||
failureWindow: options.circuitBreakerFailureWindow,
|
|
||||||
cooldownPeriod: options.circuitBreakerCooldownPeriod,
|
|
||||||
successThreshold: options.circuitBreakerSuccessThreshold,
|
|
||||||
enableLogging: true
|
|
||||||
},
|
|
||||||
metricsExporterConfig: {
|
metricsExporterConfig: {
|
||||||
enabled: options.metricsEnabled,
|
enabled: options.metricsEnabled,
|
||||||
format: options.metricsExportFormat,
|
format: options.metricsExportFormat,
|
||||||
|
|||||||
@ -1,319 +0,0 @@
|
|||||||
/**
|
|
||||||
* Circuit Breaker Tests
|
|
||||||
*/
|
|
||||||
|
|
||||||
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
|
||||||
import {
|
|
||||||
CircuitBreaker,
|
|
||||||
CircuitBreakerManager,
|
|
||||||
CircuitState,
|
|
||||||
CircuitOpenError
|
|
||||||
} from '../circuit_breaker.js';
|
|
||||||
|
|
||||||
// Unit tests for a single CircuitBreaker instance.
//
// The suite runs on fake timers: cooldown, failure-window and half-open
// timeout behaviour is driven by advancing the mocked clock instead of
// sleeping, which keeps every test fast and independent of the runner's
// per-test timeout.
describe('CircuitBreaker', () => {
    let breaker: CircuitBreaker;

    /** Move the mocked clock past the 500ms cooldown configured in beforeEach. */
    const elapseCooldown = (): Promise<unknown> => vi.advanceTimersByTimeAsync(600);

    beforeEach(() => {
        vi.useFakeTimers();
        breaker = new CircuitBreaker('test-provider', {
            failureThreshold: 3,
            failureWindow: 1000,
            cooldownPeriod: 500,
            successThreshold: 2,
            enableLogging: false
        });
    });

    afterEach(() => {
        breaker.dispose();
        vi.useRealTimers();
    });

    describe('State Transitions', () => {
        it('should start in CLOSED state', () => {
            expect(breaker.getState()).toBe(CircuitState.CLOSED);
        });

        it('should open after failure threshold', async () => {
            const failingFn = () => Promise.reject(new Error('Test failure'));

            // First two failures - should remain closed
            await expect(breaker.execute(failingFn)).rejects.toThrow('Test failure');
            expect(breaker.getState()).toBe(CircuitState.CLOSED);

            await expect(breaker.execute(failingFn)).rejects.toThrow('Test failure');
            expect(breaker.getState()).toBe(CircuitState.CLOSED);

            // Third failure - should open
            await expect(breaker.execute(failingFn)).rejects.toThrow('Test failure');
            expect(breaker.getState()).toBe(CircuitState.OPEN);
        });

        it('should reject requests when open', async () => {
            breaker.forceOpen('Test');

            const fn = () => Promise.resolve('success');
            await expect(breaker.execute(fn)).rejects.toThrow(CircuitOpenError);
        });

        it('should transition to HALF_OPEN after cooldown', async () => {
            breaker.forceOpen('Test');
            expect(breaker.getState()).toBe(CircuitState.OPEN);

            await elapseCooldown();
            expect(breaker.getState()).toBe(CircuitState.HALF_OPEN);
        });

        it('should close after success threshold in HALF_OPEN', async () => {
            breaker.forceOpen('Test');
            await elapseCooldown();
            expect(breaker.getState()).toBe(CircuitState.HALF_OPEN);

            const successFn = () => Promise.resolve('success');

            // First success
            await breaker.execute(successFn);
            expect(breaker.getState()).toBe(CircuitState.HALF_OPEN);

            // Second success - should close
            await breaker.execute(successFn);
            expect(breaker.getState()).toBe(CircuitState.CLOSED);
        });

        it('should reopen on failure in HALF_OPEN', async () => {
            breaker.forceOpen('Test');
            await elapseCooldown();
            expect(breaker.getState()).toBe(CircuitState.HALF_OPEN);

            const failingFn = () => Promise.reject(new Error('Test failure'));

            // Failure in half-open should immediately open
            await expect(breaker.execute(failingFn)).rejects.toThrow('Test failure');
            expect(breaker.getState()).toBe(CircuitState.OPEN);
        });
    });

    describe('Failure Window', () => {
        it('should reset failures outside window', async () => {
            const failingFn = () => Promise.reject(new Error('Test failure'));

            // Two failures
            await expect(breaker.execute(failingFn)).rejects.toThrow();
            await expect(breaker.execute(failingFn)).rejects.toThrow();
            expect(breaker.getState()).toBe(CircuitState.CLOSED);

            // Let the 1000ms failure window expire on the mocked clock
            await vi.advanceTimersByTimeAsync(1100);

            // Two more failures - should still be closed (window reset)
            await expect(breaker.execute(failingFn)).rejects.toThrow();
            await expect(breaker.execute(failingFn)).rejects.toThrow();
            expect(breaker.getState()).toBe(CircuitState.CLOSED);
        });
    });

    describe('Statistics', () => {
        it('should track statistics', async () => {
            const successFn = () => Promise.resolve('success');
            const failingFn = () => Promise.reject(new Error('Test failure'));

            await breaker.execute(successFn);
            await expect(breaker.execute(failingFn)).rejects.toThrow();

            const stats = breaker.getStats();
            expect(stats.totalRequests).toBe(2);
            expect(stats.successes).toBe(1);
            expect(stats.failures).toBe(1);
            expect(stats.rejectedRequests).toBe(0);
        });

        it('should track rejected requests', async () => {
            breaker.forceOpen('Test');

            const fn = () => Promise.resolve('success');
            await expect(breaker.execute(fn)).rejects.toThrow(CircuitOpenError);
            await expect(breaker.execute(fn)).rejects.toThrow(CircuitOpenError);

            const stats = breaker.getStats();
            expect(stats.rejectedRequests).toBe(2);
        });

        it('should track state history', async () => {
            breaker.forceOpen('Test open');
            breaker.forceClose('Test close');

            const stats = breaker.getStats();
            expect(stats.stateHistory).toHaveLength(2);
            expect(stats.stateHistory[0].state).toBe(CircuitState.OPEN);
            expect(stats.stateHistory[1].state).toBe(CircuitState.CLOSED);
        });
    });

    describe('Timeout', () => {
        it('should apply timeout in HALF_OPEN state', async () => {
            breaker.forceOpen('Test');
            await elapseCooldown();

            const slowFn = () => new Promise(resolve =>
                setTimeout(() => resolve('success'), 10000)
            );

            // Attach the expectation before advancing the clock so the
            // rejection always has a handler when the timeout fires.
            const outcome = expect(breaker.execute(slowFn)).rejects.toThrow(/timed out/);

            // No halfOpenTimeout is configured above, so the breaker's
            // default of 5000ms applies.
            await vi.advanceTimersByTimeAsync(5000);
            await outcome;
        });
    });
});
|
|
||||||
|
|
||||||
// Tests for CircuitBreakerManager: per-provider breaker lookup, the
// aggregated health summary, and operations that span every breaker.
describe('CircuitBreakerManager', () => {
    let manager: CircuitBreakerManager;

    /** Operation that always resolves, shared by the tests below. */
    const succeed = () => Promise.resolve('success');

    beforeEach(() => {
        manager = new CircuitBreakerManager({
            failureThreshold: 2,
            failureWindow: 1000,
            cooldownPeriod: 500,
            enableLogging: false
        });
    });

    afterEach(() => manager.dispose());

    describe('Breaker Management', () => {
        it('should create breakers on demand', () => {
            const first = manager.getBreaker('provider1');
            const second = manager.getBreaker('provider2');

            expect(first).toBeDefined();
            expect(second).toBeDefined();
            expect(first).not.toBe(second);
        });

        it('should return same breaker for same provider', () => {
            const initial = manager.getBreaker('provider1');
            const repeated = manager.getBreaker('provider1');

            expect(initial).toBe(repeated);
        });

        it('should execute with breaker protection', async () => {
            const outcome = await manager.execute('provider1', succeed);

            expect(outcome).toBe('success');
        });
    });

    describe('Health Summary', () => {
        it('should provide health summary', async () => {
            // provider1 gets a breaker through a normal successful call...
            await manager.execute('provider1', succeed);

            // ...while provider2's breaker is forced open.
            manager.getBreaker('provider2').forceOpen('Test');

            const summary = manager.getHealthSummary();
            expect(summary.total).toBe(2);
            expect(summary.closed).toBe(1);
            expect(summary.open).toBe(1);
            expect(summary.availableProviders).toContain('provider1');
            expect(summary.unavailableProviders).toContain('provider2');
        });
    });

    describe('Global Operations', () => {
        it('should reset all breakers', () => {
            const opened = ['provider1', 'provider2'].map(name => manager.getBreaker(name));
            for (const breaker of opened) {
                breaker.forceOpen('Test');
            }

            manager.resetAll();

            for (const breaker of opened) {
                expect(breaker.getState()).toBe(CircuitState.CLOSED);
            }
        });

        it('should get all stats', async () => {
            await manager.execute('provider1', succeed);
            await manager.execute('provider2', succeed);

            const statsByProvider = manager.getAllStats();
            expect(statsByProvider.size).toBe(2);
            expect(statsByProvider.has('provider1')).toBe(true);
            expect(statsByProvider.has('provider2')).toBe(true);
        });
    });
});
|
|
||||||
|
|
||||||
// End-to-end style checks of a standalone CircuitBreaker under bursts of
// calls.
describe('Circuit Breaker Integration', () => {
    it('should handle rapid failures gracefully', async () => {
        const breaker = new CircuitBreaker('rapid-test', {
            failureThreshold: 3,
            failureWindow: 1000,
            cooldownPeriod: 100,
            enableLogging: false
        });

        try {
            const failingFn = () => Promise.reject(new Error('Rapid failure'));
            const results: string[] = [];

            // Issue the 10 requests back to back but one at a time. execute()
            // checks the circuit state synchronously and records a failure
            // only after the awaited call settles, so requests started in the
            // same tick would all pass the OPEN check before any failure is
            // counted.
            for (let i = 0; i < 10; i++) {
                results.push(
                    await breaker.execute(failingFn).catch((err: Error) => err.message)
                );
            }

            // First 3 should be actual failures
            expect(results.slice(0, 3)).toEqual([
                'Rapid failure',
                'Rapid failure',
                'Rapid failure'
            ]);

            // Rest should be circuit open errors
            const openErrors = results.slice(3).filter(msg =>
                msg.includes('Circuit breaker is OPEN')
            );
            expect(openErrors.length).toBeGreaterThan(0);
        } finally {
            // Always clear the pending cooldown timer, even when an
            // expectation above fails.
            breaker.dispose();
        }
    });

    it('should handle concurrent successes correctly', async () => {
        const breaker = new CircuitBreaker('concurrent-test', {
            failureThreshold: 3,
            enableLogging: false
        });

        try {
            let counter = 0;
            const successFn = () => {
                counter++;
                return Promise.resolve(counter);
            };

            const promises: Promise<number>[] = [];
            for (let i = 0; i < 5; i++) {
                promises.push(breaker.execute(successFn));
            }

            const results = await Promise.all(promises);
            expect(results).toEqual([1, 2, 3, 4, 5]);
            expect(breaker.getState()).toBe(CircuitState.CLOSED);
        } finally {
            breaker.dispose();
        }
    });
});
|
|
||||||
@ -1,457 +0,0 @@
|
|||||||
/**
|
|
||||||
* Circuit Breaker Pattern Implementation for LLM Providers
|
|
||||||
*
|
|
||||||
* Implements a circuit breaker to prevent hammering failing providers.
|
|
||||||
* States:
|
|
||||||
* - CLOSED: Normal operation, requests pass through
|
|
||||||
* - OPEN: Provider is failing, requests are rejected immediately
|
|
||||||
* - HALF_OPEN: Testing if provider has recovered
|
|
||||||
*/
|
|
||||||
|
|
||||||
import log from '../../log.js';
|
|
||||||
|
|
||||||
/**
 * The three states a circuit breaker can be in.
 *
 * Each member's value is its own name as a string.
 */
export enum CircuitState {
    /** Normal operation: requests pass through. */
    CLOSED = 'CLOSED',

    /** Provider is failing: requests are rejected immediately. */
    OPEN = 'OPEN',

    /** Probing: testing whether the provider has recovered. */
    HALF_OPEN = 'HALF_OPEN'
}
|
|
||||||
|
|
||||||
/**
 * Tunables for a circuit breaker. All durations are in milliseconds.
 */
export interface CircuitBreakerConfig {
    /** How many failures open the circuit. */
    failureThreshold: number;

    /** Length of the window over which failures are counted (ms). */
    failureWindow: number;

    /** How long the circuit stays open before half-open is attempted (ms). */
    cooldownPeriod: number;

    /** How many successes in half-open close the circuit again. */
    successThreshold: number;

    /** Timeout applied to requests made in the half-open state (ms). */
    halfOpenTimeout: number;

    /** Whether state transitions are logged. */
    enableLogging: boolean;
}
|
|
||||||
|
|
||||||
/**
 * Counters and state information describing a circuit breaker.
 */
export interface CircuitBreakerStats {
    /** State of the circuit. */
    state: CircuitState;

    /** Number of failures counted. */
    failures: number;

    /** Number of successes counted. */
    successes: number;

    /** Time of the last failure, if there has been one. */
    lastFailureTime?: Date;

    /** Time of the last success, if there has been one. */
    lastSuccessTime?: Date;

    /** Time of the last state change. */
    lastStateChange: Date;

    /** Total number of requests. */
    totalRequests: number;

    /** Number of rejected requests. */
    rejectedRequests: number;

    /** State transitions, each with its time and reason. */
    stateHistory: {
        state: CircuitState;
        timestamp: Date;
        reason: string;
    }[];
}
|
|
||||||
|
|
||||||
/**
 * Error describing a request rejected because a provider's circuit is open.
 *
 * Carries the provider name and the time after which a retry is expected to
 * be allowed, so callers can surface or schedule around it.
 */
export class CircuitOpenError extends Error {
    /** Name of the provider whose circuit is open. */
    public readonly providerName: string;

    /** Retry time reported in the error message. */
    public readonly nextRetryTime: Date;

    constructor(providerName: string, nextRetryTime: Date) {
        const retryAt = nextRetryTime.toISOString();
        super(`Circuit breaker is OPEN for provider ${providerName}. Will retry after ${retryAt}`);

        this.providerName = providerName;
        this.nextRetryTime = nextRetryTime;
        this.name = 'CircuitOpenError';
    }
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Circuit Breaker implementation
|
|
||||||
*/
|
|
||||||
export class CircuitBreaker {
|
|
||||||
private state: CircuitState = CircuitState.CLOSED;
|
|
||||||
private failures: number = 0;
|
|
||||||
private successes: number = 0;
|
|
||||||
private failureTimestamps: Date[] = [];
|
|
||||||
private lastStateChangeTime: Date = new Date();
|
|
||||||
private cooldownTimer?: NodeJS.Timeout;
|
|
||||||
private stats: CircuitBreakerStats;
|
|
||||||
private readonly config: CircuitBreakerConfig;
|
|
||||||
|
|
||||||
constructor(
|
|
||||||
private readonly name: string,
|
|
||||||
config?: Partial<CircuitBreakerConfig>
|
|
||||||
) {
|
|
||||||
this.config = {
|
|
||||||
failureThreshold: config?.failureThreshold ?? 5,
|
|
||||||
failureWindow: config?.failureWindow ?? 60000, // 1 minute
|
|
||||||
cooldownPeriod: config?.cooldownPeriod ?? 30000, // 30 seconds
|
|
||||||
successThreshold: config?.successThreshold ?? 2,
|
|
||||||
halfOpenTimeout: config?.halfOpenTimeout ?? 5000, // 5 seconds
|
|
||||||
enableLogging: config?.enableLogging ?? true
|
|
||||||
};
|
|
||||||
|
|
||||||
this.stats = {
|
|
||||||
state: this.state,
|
|
||||||
failures: 0,
|
|
||||||
successes: 0,
|
|
||||||
lastStateChange: this.lastStateChangeTime,
|
|
||||||
totalRequests: 0,
|
|
||||||
rejectedRequests: 0,
|
|
||||||
stateHistory: []
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Execute a function with circuit breaker protection
|
|
||||||
*/
|
|
||||||
public async execute<T>(
|
|
||||||
fn: () => Promise<T>,
|
|
||||||
timeout?: number
|
|
||||||
): Promise<T> {
|
|
||||||
this.stats.totalRequests++;
|
|
||||||
|
|
||||||
// Check if circuit is open
|
|
||||||
if (this.state === CircuitState.OPEN) {
|
|
||||||
this.stats.rejectedRequests++;
|
|
||||||
const nextRetryTime = new Date(this.lastStateChangeTime.getTime() + this.config.cooldownPeriod);
|
|
||||||
throw new CircuitOpenError(this.name, nextRetryTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply timeout for half-open state
|
|
||||||
const executionTimeout = this.state === CircuitState.HALF_OPEN
|
|
||||||
? this.config.halfOpenTimeout
|
|
||||||
: timeout;
|
|
||||||
|
|
||||||
try {
|
|
||||||
const result = await this.executeWithTimeout(fn, executionTimeout);
|
|
||||||
this.onSuccess();
|
|
||||||
return result;
|
|
||||||
} catch (error) {
|
|
||||||
this.onFailure(error);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Execute function with timeout
|
|
||||||
*/
|
|
||||||
private async executeWithTimeout<T>(
|
|
||||||
fn: () => Promise<T>,
|
|
||||||
timeout?: number
|
|
||||||
): Promise<T> {
|
|
||||||
if (!timeout) {
|
|
||||||
return fn();
|
|
||||||
}
|
|
||||||
|
|
||||||
return Promise.race([
|
|
||||||
fn(),
|
|
||||||
new Promise<T>((_, reject) =>
|
|
||||||
setTimeout(() => reject(new Error(`Operation timed out after ${timeout}ms`)), timeout)
|
|
||||||
)
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handle successful execution
|
|
||||||
*/
|
|
||||||
private onSuccess(): void {
|
|
||||||
this.successes++;
|
|
||||||
this.stats.successes++;
|
|
||||||
this.stats.lastSuccessTime = new Date();
|
|
||||||
|
|
||||||
switch (this.state) {
|
|
||||||
case CircuitState.HALF_OPEN:
|
|
||||||
if (this.successes >= this.config.successThreshold) {
|
|
||||||
this.transitionTo(CircuitState.CLOSED, 'Success threshold reached');
|
|
||||||
this.reset();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case CircuitState.CLOSED:
|
|
||||||
// Clear old failure timestamps
|
|
||||||
this.cleanupFailureTimestamps();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handle failed execution
|
|
||||||
*/
|
|
||||||
private onFailure(error: any): void {
|
|
||||||
const now = new Date();
|
|
||||||
this.failures++;
|
|
||||||
this.stats.failures++;
|
|
||||||
this.stats.lastFailureTime = now;
|
|
||||||
this.failureTimestamps.push(now);
|
|
||||||
|
|
||||||
switch (this.state) {
|
|
||||||
case CircuitState.HALF_OPEN:
|
|
||||||
// Immediately open on failure in half-open state
|
|
||||||
this.transitionTo(CircuitState.OPEN, `Failure in HALF_OPEN state: ${error.message}`);
|
|
||||||
this.scheduleCooldown();
|
|
||||||
break;
|
|
||||||
|
|
||||||
case CircuitState.CLOSED:
|
|
||||||
// Check if we've exceeded failure threshold
|
|
||||||
this.cleanupFailureTimestamps();
|
|
||||||
if (this.failureTimestamps.length >= this.config.failureThreshold) {
|
|
||||||
this.transitionTo(CircuitState.OPEN, `Failure threshold exceeded: ${this.failures} failures in ${this.config.failureWindow}ms`);
|
|
||||||
this.scheduleCooldown();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clean up old failure timestamps outside the window
|
|
||||||
*/
|
|
||||||
private cleanupFailureTimestamps(): void {
|
|
||||||
const now = Date.now();
|
|
||||||
const windowStart = now - this.config.failureWindow;
|
|
||||||
this.failureTimestamps = this.failureTimestamps.filter(
|
|
||||||
timestamp => timestamp.getTime() > windowStart
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Transition to a new state
|
|
||||||
*/
|
|
||||||
private transitionTo(newState: CircuitState, reason: string): void {
|
|
||||||
const oldState = this.state;
|
|
||||||
this.state = newState;
|
|
||||||
this.lastStateChangeTime = new Date();
|
|
||||||
this.stats.state = newState;
|
|
||||||
this.stats.lastStateChange = this.lastStateChangeTime;
|
|
||||||
|
|
||||||
// Add to state history
|
|
||||||
this.stats.stateHistory.push({
|
|
||||||
state: newState,
|
|
||||||
timestamp: this.lastStateChangeTime,
|
|
||||||
reason
|
|
||||||
});
|
|
||||||
|
|
||||||
// Keep only last 100 state transitions
|
|
||||||
if (this.stats.stateHistory.length > 100) {
|
|
||||||
this.stats.stateHistory = this.stats.stateHistory.slice(-100);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.config.enableLogging) {
|
|
||||||
log.info(`[CircuitBreaker:${this.name}] State transition: ${oldState} -> ${newState}. Reason: ${reason}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Schedule cooldown period
|
|
||||||
*/
|
|
||||||
private scheduleCooldown(): void {
|
|
||||||
if (this.cooldownTimer) {
|
|
||||||
clearTimeout(this.cooldownTimer);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.cooldownTimer = setTimeout(() => {
|
|
||||||
if (this.state === CircuitState.OPEN) {
|
|
||||||
this.transitionTo(CircuitState.HALF_OPEN, 'Cooldown period expired');
|
|
||||||
this.successes = 0; // Reset success counter for half-open state
|
|
||||||
}
|
|
||||||
}, this.config.cooldownPeriod);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reset counters
|
|
||||||
*/
|
|
||||||
private reset(): void {
|
|
||||||
this.failures = 0;
|
|
||||||
this.successes = 0;
|
|
||||||
this.failureTimestamps = [];
|
|
||||||
|
|
||||||
if (this.cooldownTimer) {
|
|
||||||
clearTimeout(this.cooldownTimer);
|
|
||||||
this.cooldownTimer = undefined;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get current state
|
|
||||||
*/
|
|
||||||
public getState(): CircuitState {
|
|
||||||
return this.state;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get statistics
|
|
||||||
*/
|
|
||||||
public getStats(): CircuitBreakerStats {
|
|
||||||
return { ...this.stats };
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Force open the circuit (for testing or manual intervention)
|
|
||||||
*/
|
|
||||||
public forceOpen(reason: string = 'Manual intervention'): void {
|
|
||||||
this.transitionTo(CircuitState.OPEN, reason);
|
|
||||||
this.scheduleCooldown();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Force close the circuit (for testing or manual intervention)
|
|
||||||
*/
|
|
||||||
public forceClose(reason: string = 'Manual intervention'): void {
|
|
||||||
this.transitionTo(CircuitState.CLOSED, reason);
|
|
||||||
this.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
 * Tell whether the circuit currently lets requests through.
 *
 * @returns `false` only while the circuit is OPEN; any other state counts
 *          as available
 */
public isAvailable(): boolean {
    const blocked = this.state === CircuitState.OPEN;
    return !blocked;
}
|
|
||||||
|
|
||||||
/**
 * When the next attempt is expected to be allowed.
 *
 * @returns for an OPEN circuit, the time of the last state change plus
 *          `config.cooldownPeriod` milliseconds; `null` in any other state
 */
public getNextRetryTime(): Date | null {
    if (this.state === CircuitState.OPEN) {
        const lastChangeMs = this.lastStateChangeTime.getTime();
        return new Date(lastChangeMs + this.config.cooldownPeriod);
    }
    return null;
}
|
|
||||||
|
|
||||||
/**
 * Release resources held by the breaker: cancels the cooldown timer if one
 * is set and forgets its handle.
 */
public dispose(): void {
    const timer = this.cooldownTimer;
    if (!timer) {
        return;
    }
    clearTimeout(timer);
    this.cooldownTimer = undefined;
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
 * Registry that owns one circuit breaker per provider name.
 *
 * Can be used as a plain instance or through the process-wide singleton
 * returned by {@link CircuitBreakerManager.getInstance}.
 */
export class CircuitBreakerManager {
    private static instance: CircuitBreakerManager | null = null;
    private readonly breakers: Map<string, CircuitBreaker> = new Map();
    private readonly defaultConfig: Partial<CircuitBreakerConfig>;

    /**
     * @param defaultConfig - settings applied to every breaker this manager
     *                        creates, unless overridden per breaker
     */
    constructor(defaultConfig?: Partial<CircuitBreakerConfig>) {
        this.defaultConfig = defaultConfig ?? {};
    }

    /**
     * Get the singleton instance, creating it on first use.
     *
     * @param defaultConfig - only honoured by the call that creates the
     *                        singleton; ignored once an instance exists
     */
    public static getInstance(defaultConfig?: Partial<CircuitBreakerConfig>): CircuitBreakerManager {
        if (!CircuitBreakerManager.instance) {
            CircuitBreakerManager.instance = new CircuitBreakerManager(defaultConfig);
        }
        return CircuitBreakerManager.instance;
    }

    /**
     * Get the circuit breaker for a provider, creating it if needed.
     *
     * @param providerName - key identifying the breaker
     * @param config - per-breaker overrides merged over the manager defaults;
     *                 only used when the breaker is first created
     */
    public getBreaker(
        providerName: string,
        config?: Partial<CircuitBreakerConfig>
    ): CircuitBreaker {
        // Single lookup; avoids the has()/get()! pair
        let breaker = this.breakers.get(providerName);
        if (!breaker) {
            breaker = new CircuitBreaker(providerName, { ...this.defaultConfig, ...config });
            this.breakers.set(providerName, breaker);
        }
        return breaker;
    }

    /**
     * Run `fn` under the protection of the provider's circuit breaker.
     *
     * @param providerName - key identifying the breaker
     * @param fn - the operation to execute
     * @param config - forwarded to {@link getBreaker}; only used when the
     *                 breaker does not exist yet
     * @returns whatever the breaker's `execute` resolves to
     */
    public async execute<T>(
        providerName: string,
        fn: () => Promise<T>,
        config?: Partial<CircuitBreakerConfig>
    ): Promise<T> {
        return this.getBreaker(providerName, config).execute(fn);
    }

    /**
     * Collect the statistics of every registered breaker.
     *
     * @returns a new map from provider name to that breaker's stats
     */
    public getAllStats(): Map<string, CircuitBreakerStats> {
        const stats = new Map<string, CircuitBreakerStats>();
        for (const [name, breaker] of this.breakers) {
            stats.set(name, breaker.getStats());
        }
        return stats;
    }

    /**
     * Summarise breaker states across all providers.
     *
     * CLOSED and HALF_OPEN providers are listed as available, OPEN providers
     * as unavailable.
     */
    public getHealthSummary(): {
        total: number;
        closed: number;
        open: number;
        halfOpen: number;
        availableProviders: string[];
        unavailableProviders: string[];
    } {
        const summary = {
            total: this.breakers.size,
            closed: 0,
            open: 0,
            halfOpen: 0,
            availableProviders: [] as string[],
            unavailableProviders: [] as string[]
        };

        for (const [name, breaker] of this.breakers) {
            switch (breaker.getState()) {
                case CircuitState.CLOSED:
                    summary.closed++;
                    summary.availableProviders.push(name);
                    break;
                case CircuitState.OPEN:
                    summary.open++;
                    summary.unavailableProviders.push(name);
                    break;
                case CircuitState.HALF_OPEN:
                    summary.halfOpen++;
                    summary.availableProviders.push(name);
                    break;
            }
        }

        return summary;
    }

    /**
     * Force every registered breaker closed, with reason 'Global reset'.
     */
    public resetAll(): void {
        for (const breaker of this.breakers.values()) {
            breaker.forceClose('Global reset');
        }
    }

    /**
     * Dispose every breaker and empty the registry.
     *
     * The singleton reference is dropped only when this manager IS the
     * singleton, so disposing a standalone manager does not discard the
     * shared instance other callers still hold.
     */
    public dispose(): void {
        for (const breaker of this.breakers.values()) {
            breaker.dispose();
        }
        this.breakers.clear();

        if (CircuitBreakerManager.instance === this) {
            CircuitBreakerManager.instance = null;
        }
    }
}
|
|
||||||
|
|
||||||
/**
 * Convenience accessor for the shared CircuitBreakerManager.
 *
 * @param config - forwarded unchanged to `CircuitBreakerManager.getInstance`
 * @returns the manager returned by `CircuitBreakerManager.getInstance(config)`
 */
export const getCircuitBreakerManager = (config?: Partial<CircuitBreakerConfig>): CircuitBreakerManager =>
    CircuitBreakerManager.getInstance(config);
|
|
||||||
@ -22,11 +22,6 @@ import {
|
|||||||
getAnthropicOptions,
|
getAnthropicOptions,
|
||||||
getOllamaOptions
|
getOllamaOptions
|
||||||
} from './providers.js';
|
} from './providers.js';
|
||||||
import {
|
|
||||||
CircuitBreakerManager,
|
|
||||||
CircuitOpenError,
|
|
||||||
type CircuitBreakerConfig
|
|
||||||
} from './circuit_breaker.js';
|
|
||||||
import {
|
import {
|
||||||
MetricsExporter,
|
MetricsExporter,
|
||||||
ExportFormat,
|
ExportFormat,
|
||||||
@ -96,8 +91,6 @@ export interface ProviderFactoryOptions {
|
|||||||
enableCaching?: boolean;
|
enableCaching?: boolean;
|
||||||
cacheTimeout?: number;
|
cacheTimeout?: number;
|
||||||
enableMetrics?: boolean;
|
enableMetrics?: boolean;
|
||||||
enableCircuitBreaker?: boolean;
|
|
||||||
circuitBreakerConfig?: Partial<CircuitBreakerConfig>;
|
|
||||||
metricsExporterConfig?: Partial<ExporterConfig>;
|
metricsExporterConfig?: Partial<ExporterConfig>;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,7 +121,8 @@ export class ProviderFactory {
|
|||||||
private options: ProviderFactoryOptions;
|
private options: ProviderFactoryOptions;
|
||||||
private healthCheckTimer?: NodeJS.Timeout;
|
private healthCheckTimer?: NodeJS.Timeout;
|
||||||
private disposed: boolean = false;
|
private disposed: boolean = false;
|
||||||
private circuitBreakerManager?: CircuitBreakerManager;
|
private retryCount: Map<string, number> = new Map();
|
||||||
|
private lastRetryTime: Map<string, number> = new Map();
|
||||||
private metricsExporter?: MetricsExporter;
|
private metricsExporter?: MetricsExporter;
|
||||||
|
|
||||||
constructor(options: ProviderFactoryOptions = {}) {
|
constructor(options: ProviderFactoryOptions = {}) {
|
||||||
@ -140,20 +134,11 @@ export class ProviderFactory {
|
|||||||
enableCaching: options.enableCaching ?? true,
|
enableCaching: options.enableCaching ?? true,
|
||||||
cacheTimeout: options.cacheTimeout ?? 300000, // 5 minutes
|
cacheTimeout: options.cacheTimeout ?? 300000, // 5 minutes
|
||||||
enableMetrics: options.enableMetrics ?? true,
|
enableMetrics: options.enableMetrics ?? true,
|
||||||
enableCircuitBreaker: options.enableCircuitBreaker ?? true,
|
|
||||||
circuitBreakerConfig: options.circuitBreakerConfig,
|
|
||||||
metricsExporterConfig: options.metricsExporterConfig
|
metricsExporterConfig: options.metricsExporterConfig
|
||||||
};
|
};
|
||||||
|
|
||||||
this.initializeCapabilities();
|
this.initializeCapabilities();
|
||||||
|
|
||||||
// Initialize circuit breaker if enabled
|
|
||||||
if (this.options.enableCircuitBreaker) {
|
|
||||||
this.circuitBreakerManager = CircuitBreakerManager.getInstance(
|
|
||||||
this.options.circuitBreakerConfig
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize metrics exporter if enabled
|
// Initialize metrics exporter if enabled
|
||||||
if (this.options.enableMetrics) {
|
if (this.options.enableMetrics) {
|
||||||
this.metricsExporter = MetricsExporter.getInstance({
|
this.metricsExporter = MetricsExporter.getInstance({
|
||||||
@ -292,7 +277,7 @@ export class ProviderFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiate a specific provider
|
* Instantiate a specific provider with retry and fallback logic
|
||||||
*/
|
*/
|
||||||
private async instantiateProvider(
|
private async instantiateProvider(
|
||||||
type: ProviderType,
|
type: ProviderType,
|
||||||
@ -300,56 +285,35 @@ export class ProviderFactory {
|
|||||||
options?: ChatCompletionOptions
|
options?: ChatCompletionOptions
|
||||||
): Promise<AIService | null> {
|
): Promise<AIService | null> {
|
||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
|
const maxRetries = 3;
|
||||||
|
const baseDelay = 1000; // 1 second
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Use circuit breaker if enabled
|
// Try to create the provider
|
||||||
if (this.circuitBreakerManager) {
|
const service = await this.createProviderByType(type, config, options);
|
||||||
const breaker = this.circuitBreakerManager.getBreaker(type);
|
|
||||||
|
|
||||||
// Check if circuit is open
|
if (service && service.isAvailable()) {
|
||||||
if (!breaker.isAvailable()) {
|
// Record success metric
|
||||||
const nextRetry = breaker.getNextRetryTime();
|
|
||||||
log.info(`[ProviderFactory] Circuit breaker OPEN for ${type}. Next retry: ${nextRetry?.toISOString()}`);
|
|
||||||
|
|
||||||
// Record metric
|
|
||||||
if (this.metricsExporter) {
|
if (this.metricsExporter) {
|
||||||
this.metricsExporter.getCollector().recordError(type, 'Circuit breaker open');
|
const latency = Date.now() - startTime;
|
||||||
|
this.metricsExporter.getCollector().recordLatency(type, latency);
|
||||||
|
this.metricsExporter.getCollector().recordRequest(type, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try fallback immediately
|
// Reset retry count on success
|
||||||
|
this.retryCount.delete(type);
|
||||||
|
this.lastRetryTime.delete(type);
|
||||||
|
|
||||||
|
return service;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If not available, try fallback
|
||||||
if (this.options.enableFallback && this.options.fallbackProviders?.length) {
|
if (this.options.enableFallback && this.options.fallbackProviders?.length) {
|
||||||
|
log.info(`[ProviderFactory] Provider ${type} not available, trying fallback`);
|
||||||
return this.tryFallbackProvider(options);
|
return this.tryFallbackProvider(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new CircuitOpenError(type, nextRetry!);
|
return null;
|
||||||
}
|
|
||||||
|
|
||||||
// Execute with circuit breaker protection
|
|
||||||
return await breaker.execute(async () => {
|
|
||||||
const service = await this.createProviderInternal(type, config, options);
|
|
||||||
|
|
||||||
// Record success metric
|
|
||||||
if (this.metricsExporter && service) {
|
|
||||||
const latency = Date.now() - startTime;
|
|
||||||
this.metricsExporter.getCollector().recordLatency(type, latency);
|
|
||||||
this.metricsExporter.getCollector().recordRequest(type, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
return service;
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
// No circuit breaker, create directly
|
|
||||||
const service = await this.createProviderInternal(type, config, options);
|
|
||||||
|
|
||||||
// Record metrics
|
|
||||||
if (this.metricsExporter && service) {
|
|
||||||
const latency = Date.now() - startTime;
|
|
||||||
this.metricsExporter.getCollector().recordLatency(type, latency);
|
|
||||||
this.metricsExporter.getCollector().recordRequest(type, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
return service;
|
|
||||||
}
|
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
log.error(`[ProviderFactory] Error creating ${type} provider: ${error.message}`);
|
log.error(`[ProviderFactory] Error creating ${type} provider: ${error.message}`);
|
||||||
|
|
||||||
@ -359,10 +323,24 @@ export class ProviderFactory {
|
|||||||
this.metricsExporter.getCollector().recordError(type, error.message);
|
this.metricsExporter.getCollector().recordError(type, error.message);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try fallback if enabled and not a circuit breaker error
|
// Simple exponential backoff for retries
|
||||||
if (!(error instanceof CircuitOpenError) &&
|
if (this.shouldRetry(type, error, maxRetries)) {
|
||||||
this.options.enableFallback &&
|
const retryDelay = await this.getRetryDelay(type, baseDelay, error);
|
||||||
this.options.fallbackProviders?.length) {
|
log.info(`[ProviderFactory] Retrying ${type} after ${retryDelay}ms`);
|
||||||
|
|
||||||
|
await new Promise(resolve => setTimeout(resolve, retryDelay));
|
||||||
|
|
||||||
|
// Increment retry count
|
||||||
|
const currentRetries = this.retryCount.get(type) || 0;
|
||||||
|
this.retryCount.set(type, currentRetries + 1);
|
||||||
|
this.lastRetryTime.set(type, Date.now());
|
||||||
|
|
||||||
|
return this.instantiateProvider(type, config, options);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try fallback on failure
|
||||||
|
if (this.options.enableFallback && this.options.fallbackProviders?.length) {
|
||||||
|
log.info(`[ProviderFactory] Max retries reached for ${type}, trying fallback`);
|
||||||
return this.tryFallbackProvider(options);
|
return this.tryFallbackProvider(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -371,9 +349,9 @@ export class ProviderFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal provider creation logic
|
* Create provider by type
|
||||||
*/
|
*/
|
||||||
private async createProviderInternal(
|
private async createProviderByType(
|
||||||
type: ProviderType,
|
type: ProviderType,
|
||||||
config?: Partial<ProviderConfig>,
|
config?: Partial<ProviderConfig>,
|
||||||
options?: ChatCompletionOptions
|
options?: ChatCompletionOptions
|
||||||
@ -397,6 +375,65 @@ export class ProviderFactory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Decide whether a failed provider creation is worth retrying.
 *
 * A retry is allowed while fewer than `maxRetries` retries are recorded for
 * the provider AND the failure looks transient:
 *  - HTTP 429 (rate limited) or any status >= 500;
 *  - a refused or timed-out connection (`ECONNREFUSED` / `ETIMEDOUT`), read
 *    from `error.code` or, for wrapped network failures, `error.cause.code`.
 *
 * @param type - provider whose retry counter is consulted
 * @param error - the caught value; may be anything, including null/undefined
 * @param maxRetries - upper bound on recorded retries for the provider
 * @returns `true` when the caller should retry
 */
private shouldRetry(type: ProviderType, error: any, maxRetries: number): boolean {
    const attemptsSoFar = this.retryCount.get(type) ?? 0;
    if (attemptsSoFar >= maxRetries) {
        return false;
    }

    // Anything can be thrown; a null/undefined rejection carries no signal
    // and must not crash the error handler with a TypeError
    if (error == null) {
        return false;
    }

    // Number() keeps the original's tolerance for string statuses and maps
    // a missing status to NaN, which fails both comparisons
    const status = Number(error.status);
    if (status === 429 || status >= 500) {
        return true;
    }

    // Node's fetch reports network failures as "fetch failed" with the
    // system error attached as `cause`
    const code = error.code ?? error.cause?.code;
    return code === 'ECONNREFUSED' || code === 'ETIMEDOUT';
}
|
||||||
|
|
||||||
|
/**
 * Work out how long to wait before the next retry of a provider.
 *
 * For HTTP 429 errors that carry headers, the server's hint wins:
 *  - `Retry-After`, given as seconds or as an HTTP-date, capped at 60 s;
 *  - otherwise `X-RateLimit-Reset`, read as a Unix timestamp in seconds,
 *    capped at 60 s.
 * A hint that cannot be parsed is ignored rather than turned into `NaN`,
 * which `setTimeout` would treat as "fire immediately" and so defeat the
 * backoff entirely.
 *
 * Without a usable hint the delay is `baseDelay * 2^retries` plus up to 30 %
 * random jitter, capped at 30 s.
 *
 * @param type - provider whose retry counter selects the backoff exponent
 * @param baseDelay - delay in milliseconds used for the first retry
 * @param error - the failure being retried; only `status` and `headers` are read
 * @returns the delay in milliseconds
 */
private async getRetryDelay(type: ProviderType, baseDelay: number, error: any): Promise<number> {
    const currentRetries = this.retryCount.get(type) ?? 0;
    const headers = error?.status === 429 ? error.headers : undefined;

    if (headers) {
        // Accept both plain header objects and Headers/Map-like containers
        const readHeader = (name: string): string | undefined => {
            const raw = typeof headers.get === 'function' ? headers.get(name) : headers[name];
            const first = Array.isArray(raw) ? raw[0] : raw;
            return first == null ? undefined : String(first).trim();
        };

        const retryAfter = readHeader('retry-after');
        if (retryAfter) {
            // Retry-After is either delta-seconds or an HTTP-date
            const seconds = Number(retryAfter);
            const delay = Number.isFinite(seconds)
                ? seconds * 1000
                : Date.parse(retryAfter) - Date.now();
            if (Number.isFinite(delay)) {
                return Math.min(Math.max(0, delay), 60000); // Cap at 60 seconds
            }
        }

        const resetTime = readHeader('x-ratelimit-reset');
        if (resetTime) {
            // NOTE(review): treated as epoch seconds, as before; some APIs send a
            // relative duration instead - confirm per provider
            const delay = Number.parseInt(resetTime, 10) * 1000 - Date.now();
            if (Number.isFinite(delay)) {
                return Math.min(Math.max(0, delay), 60000);
            }
        }
    }

    // Exponential backoff: baseDelay * (2 ^ retries)
    const backoff = baseDelay * Math.pow(2, currentRetries);

    // Jitter spreads out clients that failed at the same moment
    const jitter = Math.random() * 0.3 * backoff;

    return Math.min(backoff + jitter, 30000); // Cap at 30 seconds
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create OpenAI provider
|
* Create OpenAI provider
|
||||||
*/
|
*/
|
||||||
@ -469,7 +506,7 @@ export class ProviderFactory {
|
|||||||
for (const fallbackType of this.options.fallbackProviders) {
|
for (const fallbackType of this.options.fallbackProviders) {
|
||||||
try {
|
try {
|
||||||
log.info(`[ProviderFactory] Trying fallback provider: ${fallbackType}`);
|
log.info(`[ProviderFactory] Trying fallback provider: ${fallbackType}`);
|
||||||
const service = await this.instantiateProvider(fallbackType, undefined, options);
|
const service = await this.createProviderByType(fallbackType, undefined, options);
|
||||||
|
|
||||||
if (service && service.isAvailable()) {
|
if (service && service.isAvailable()) {
|
||||||
log.info(`[ProviderFactory] Fallback to ${fallbackType} successful`);
|
log.info(`[ProviderFactory] Fallback to ${fallbackType} successful`);
|
||||||
@ -553,20 +590,14 @@ export class ProviderFactory {
|
|||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const service = await this.createProvider(type);
|
// Just try to create the provider and check if it's available
|
||||||
|
const service = await this.createProviderByType(type);
|
||||||
// Try a simple completion to test the service
|
const isHealthy = service ? service.isAvailable() : false;
|
||||||
const testMessages = [{ role: 'user' as const, content: 'Hi' }];
|
|
||||||
const response = await service.generateChatCompletion(testMessages, {
|
|
||||||
maxTokens: 1,
|
|
||||||
temperature: 0
|
|
||||||
});
|
|
||||||
|
|
||||||
const latency = Date.now() - startTime;
|
const latency = Date.now() - startTime;
|
||||||
|
|
||||||
const status: ProviderHealthStatus = {
|
const status: ProviderHealthStatus = {
|
||||||
provider: type,
|
provider: type,
|
||||||
healthy: true,
|
healthy: isHealthy,
|
||||||
lastChecked: new Date(),
|
lastChecked: new Date(),
|
||||||
latency
|
latency
|
||||||
};
|
};
|
||||||
@ -771,22 +802,6 @@ export class ProviderFactory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get circuit breaker status
|
|
||||||
*/
|
|
||||||
public getCircuitBreakerStatus(): any {
|
|
||||||
if (!this.circuitBreakerManager) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
summary: this.circuitBreakerManager.getHealthSummary(),
|
|
||||||
details: Array.from(this.circuitBreakerManager.getAllStats().entries()).map(([name, stats]) => ({
|
|
||||||
provider: name,
|
|
||||||
...stats
|
|
||||||
}))
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get metrics summary
|
* Get metrics summary
|
||||||
@ -821,18 +836,6 @@ export class ProviderFactory {
|
|||||||
return this.metricsExporter.export(exportFormat);
|
return this.metricsExporter.export(exportFormat);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Reset circuit breaker for a specific provider
|
|
||||||
*/
|
|
||||||
public resetCircuitBreaker(provider: ProviderType): void {
|
|
||||||
if (!this.circuitBreakerManager) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const breaker = this.circuitBreakerManager.getBreaker(provider);
|
|
||||||
breaker.forceClose('Manual reset');
|
|
||||||
log.info(`[ProviderFactory] Circuit breaker reset for ${provider}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configure metrics export
|
* Configure metrics export
|
||||||
@ -862,10 +865,6 @@ export class ProviderFactory {
|
|||||||
this.healthCheckTimer = undefined;
|
this.healthCheckTimer = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dispose circuit breaker manager
|
|
||||||
if (this.circuitBreakerManager) {
|
|
||||||
this.circuitBreakerManager.dispose();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dispose metrics exporter
|
// Dispose metrics exporter
|
||||||
if (this.metricsExporter) {
|
if (this.metricsExporter) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user