diff --git a/src/services/llm/pipeline/chat_pipeline.ts b/src/services/llm/pipeline/chat_pipeline.ts index 7638ebf20..fc630bca8 100644 --- a/src/services/llm/pipeline/chat_pipeline.ts +++ b/src/services/llm/pipeline/chat_pipeline.ts @@ -410,7 +410,15 @@ export class ChatPipeline { streamingPaused = true; // Send a dedicated message with a specific type for tool execution streamCallback('', false, { - type: 'tool_execution_start' + text: '', + done: false, + toolExecution: { + type: 'start', + tool: { + name: 'tool_execution', + arguments: {} + } + } }); } @@ -470,11 +478,14 @@ export class ChatPipeline { // Send the structured tool result directly so the client has the raw data streamCallback('', false, { - type: 'tool_result', + text: '', + done: false, toolExecution: { - action: 'result', - tool: toolName, - toolCallId: msg.tool_call_id, + type: 'complete', + tool: { + name: toolName, + arguments: {} + }, result: parsedContent } }); @@ -485,13 +496,15 @@ export class ChatPipeline { log.error(`Error sending structured tool result: ${err}`); // Use structured format here too instead of falling back to text format streamCallback('', false, { - type: 'tool_result', + text: '', + done: false, toolExecution: { - action: 'result', - tool: toolName || 'unknown', - toolCallId: msg.tool_call_id, - result: msg.content, - error: String(err) + type: 'complete', + tool: { + name: toolName || 'unknown', + arguments: {} + }, + result: msg.content } }); } @@ -509,7 +522,15 @@ export class ChatPipeline { // If streaming, show progress to the user if (isStreaming && streamCallback) { streamCallback('', false, { - type: 'tool_completion_processing' + text: '', + done: false, + toolExecution: { + type: 'update', + tool: { + name: 'tool_processing', + arguments: {} + } + } }); } @@ -587,10 +608,15 @@ export class ChatPipeline { // If streaming, show error to the user if (isStreaming && streamCallback) { streamCallback('', false, { - type: 'tool_execution_error', + text: '', + done: false, toolExecution: { - action: 'error', - error: error.message || 'unknown error' + type: 'error', + tool: { + name: 'unknown', + arguments: {} + }, + result: error.message || 'unknown error' } }); } diff --git a/src/services/llm/pipeline/stages/tool_calling_stage.ts b/src/services/llm/pipeline/stages/tool_calling_stage.ts index d42b18610..dc1fcdad0 100644 --- a/src/services/llm/pipeline/stages/tool_calling_stage.ts +++ b/src/services/llm/pipeline/stages/tool_calling_stage.ts @@ -154,12 +154,19 @@ export class ToolCallingStage extends BasePipelineStage log.error(`Error sending tool execution start event: ${e.message}`)); } @@ -172,7 +179,7 @@ export class ToolCallingStage extends BasePipelineStage + }, + result: typeof result === 'string' ? result : result as Record, + type: 'complete' as const }; - + // Don't wait for this to complete, but log any errors - const callbackResult = streamCallback('', false, { toolExecution: toolExecutionData }); + const callbackResult = streamCallback('', false, { + text: '', + done: false, + toolExecution: toolExecutionData + }); if (callbackResult instanceof Promise) { callbackResult.catch((e: Error) => log.error(`Error sending tool execution complete event: ${e.message}`)); } @@ -206,7 +221,7 @@ export class ToolCallingStage extends BasePipelineStage + }, + error: execError.message || String(execError), + type: 'error' as const }; - + // Don't wait for this to complete, but log any errors - const callbackResult = streamCallback('', false, { toolExecution: toolExecutionData }); + const callbackResult = streamCallback('', false, { + text: '', + done: false, + toolExecution: toolExecutionData + }); if (callbackResult instanceof Promise) { callbackResult.catch((e: Error) => log.error(`Error sending tool execution error event: ${e.message}`)); } } - + throw execError; } @@ -262,12 +285,20 @@ export class ToolCallingStage extends BasePipelineStage + }, + error: error.message || String(error), + type: 'error' as const }; - + // Don't wait for this to complete, but log any errors - const callbackResult = streamCallback('', false, { toolExecution: toolExecutionData }); + const callbackResult = streamCallback('', false, { + text: '', + done: false, + toolExecution: toolExecutionData + }); if (callbackResult instanceof Promise) { callbackResult.catch((e: Error) => log.error(`Error sending tool execution error event: ${e.message}`)); } diff --git a/src/services/llm/providers/anthropic_service.ts b/src/services/llm/providers/anthropic_service.ts index f97b7bc6a..955ef494a 100644 --- a/src/services/llm/providers/anthropic_service.ts +++ b/src/services/llm/providers/anthropic_service.ts @@ -96,9 +96,9 @@ export class AnthropicService extends BaseAIService { // Add tool_choice parameter if specified if (opts.tool_choice) { - if (opts.tool_choice === 'auto') { + if (typeof opts.tool_choice === 'string' && opts.tool_choice === 'auto') { requestParams.tool_choice = 'auto'; - } else if (opts.tool_choice === 'none') { + } else if (typeof opts.tool_choice === 'string' && opts.tool_choice === 'none') { requestParams.tool_choice = 'none'; } else if (typeof opts.tool_choice === 'object' && opts.tool_choice.function) { // Map from OpenAI format to Anthropic format @@ -252,9 +252,12 @@ export class AnthropicService extends BaseAIService { done: false, toolExecution: { type: 'start', - tool: toolCall + tool: { + name: toolCall.function.name, + arguments: JSON.parse(toolCall.function.arguments || '{}') + } }, - raw: block + raw: { ...block } as unknown as Record }); } }); @@ -282,7 +285,7 @@ export class AnthropicService extends BaseAIService { type: 'update', tool: toolCall }, - raw: { type: 'json_fragment', data: jsonFragment } + raw: { type: 'json_fragment', data: jsonFragment } as Record }); } } @@ -353,10 +356,14 @@ export class AnthropicService extends BaseAIService { // Send final completion with full text and all tool calls await callback({ - text: fullText, - done: true, - tool_calls: toolCalls.length > 0 ? toolCalls : undefined, - raw: message + text: typeof message.content === 'string' ? + message.content : + message.content + .filter((block: any) => block.type === 'text') + .map((block: any) => block.text) + .join(''), + done: message.role === 'assistant', + raw: { ...message } as unknown as Record }); }); diff --git a/src/services/llm/providers/openai_service.ts b/src/services/llm/providers/openai_service.ts index 7b462127e..fcf7d71f5 100644 --- a/src/services/llm/providers/openai_service.ts +++ b/src/services/llm/providers/openai_service.ts @@ -135,21 +135,21 @@ export class OpenAIService extends BaseAIService { } // Send the chunk to the caller with raw data and any accumulated tool calls - const streamChunk: StreamChunk & { raw: any } = { + const streamChunk: StreamChunk = { text: content, done: isDone, - raw: chunk + raw: chunk as unknown as Record }; - + // Add accumulated tool calls to raw data for compatibility with tool execution display if (accumulatedToolCalls.length > 0) { // Add tool calls to raw data for proper display streamChunk.raw = { - ...streamChunk.raw, + ...streamChunk.raw as object, tool_calls: accumulatedToolCalls.filter(Boolean) - }; + } as Record; } - + await callback(streamChunk); if (isDone) { @@ -173,7 +173,7 @@ export class OpenAIService extends BaseAIService { await callback({ text: content, done: true, - raw: stream, + raw: stream as unknown as Record, tool_calls: toolCalls }); } diff --git a/src/services/llm/providers/stream_handler.ts b/src/services/llm/providers/stream_handler.ts index d766ba251..cbc6e2bb8 100644 --- a/src/services/llm/providers/stream_handler.ts +++ b/src/services/llm/providers/stream_handler.ts @@ -187,9 +187,12 @@ export function createStreamHandler( // Pass each chunk directly to the callback as it arrives // without modifying or accumulating its content await callback({ - text: chunk.text || '', - done: !!chunk.done, // Ensure done is boolean - raw: chunk.raw || chunk // Include raw data + text: chunk.text, + done: true, + tool_calls: chunk.tool_calls, + raw: typeof chunk.raw === 'object' ? + chunk.raw as Record : + { data: chunk.raw } as Record // Include raw data }); }); } catch (error) {