well, at least resolve these issues?

This commit is contained in:
perf3ct 2025-04-16 20:33:04 +00:00
parent 8f1723c386
commit ec6214bee0
No known key found for this signature in database
GPG Key ID: 569C4EEC436F5232
5 changed files with 122 additions and 55 deletions

View File

@ -410,7 +410,15 @@ export class ChatPipeline {
streamingPaused = true;
// Send a dedicated message with a specific type for tool execution
streamCallback('', false, {
type: 'tool_execution_start'
text: '',
done: false,
toolExecution: {
type: 'start',
tool: {
name: 'tool_execution',
arguments: {}
}
}
});
}
@ -470,11 +478,14 @@ export class ChatPipeline {
// Send the structured tool result directly so the client has the raw data
streamCallback('', false, {
type: 'tool_result',
text: '',
done: false,
toolExecution: {
action: 'result',
tool: toolName,
toolCallId: msg.tool_call_id,
type: 'complete',
tool: {
name: toolName,
arguments: {}
},
result: parsedContent
}
});
@ -485,13 +496,15 @@ export class ChatPipeline {
log.error(`Error sending structured tool result: ${err}`);
// Use structured format here too instead of falling back to text format
streamCallback('', false, {
type: 'tool_result',
text: '',
done: false,
toolExecution: {
action: 'result',
tool: toolName || 'unknown',
toolCallId: msg.tool_call_id,
result: msg.content,
error: String(err)
type: 'complete',
tool: {
name: toolName || 'unknown',
arguments: {}
},
result: msg.content
}
});
}
@ -509,7 +522,15 @@ export class ChatPipeline {
// If streaming, show progress to the user
if (isStreaming && streamCallback) {
streamCallback('', false, {
type: 'tool_completion_processing'
text: '',
done: false,
toolExecution: {
type: 'update',
tool: {
name: 'tool_processing',
arguments: {}
}
}
});
}
@ -587,10 +608,15 @@ export class ChatPipeline {
// If streaming, show error to the user
if (isStreaming && streamCallback) {
streamCallback('', false, {
type: 'tool_execution_error',
text: '',
done: false,
toolExecution: {
action: 'error',
error: error.message || 'unknown error'
type: 'error',
tool: {
name: 'unknown',
arguments: {}
},
result: error.message || 'unknown error'
}
});
}

View File

@ -154,12 +154,19 @@ export class ToolCallingStage extends BasePipelineStage<ToolExecutionInput, { re
if (streamCallback) {
const toolExecutionData = {
action: 'start',
tool: toolCall.function.name,
args: args
tool: {
name: toolCall.function.name,
arguments: args
},
type: 'start' as const
};
// Don't wait for this to complete, but log any errors
const callbackResult = streamCallback('', false, { toolExecution: toolExecutionData });
const callbackResult = streamCallback('', false, {
text: '',
done: false,
toolExecution: toolExecutionData
});
if (callbackResult instanceof Promise) {
callbackResult.catch((e: Error) => log.error(`Error sending tool execution start event: ${e.message}`));
}
@ -172,7 +179,7 @@ export class ToolCallingStage extends BasePipelineStage<ToolExecutionInput, { re
result = await tool.execute(args);
const executionTime = Date.now() - executionStart;
log.info(`================ TOOL EXECUTION COMPLETED in ${executionTime}ms ================`);
// Record this successful tool execution if there's a sessionId available
if (input.options?.sessionId) {
try {
@ -188,17 +195,25 @@ export class ToolCallingStage extends BasePipelineStage<ToolExecutionInput, { re
log.error(`Failed to record tool execution in chat storage: ${storageError}`);
}
}
// Emit tool completion event if streaming is enabled
if (streamCallback) {
const toolExecutionData = {
action: 'complete',
tool: toolCall.function.name,
result: result
tool: {
name: toolCall.function.name,
arguments: {} as Record<string, unknown>
},
result: typeof result === 'string' ? result : result as Record<string, unknown>,
type: 'complete' as const
};
// Don't wait for this to complete, but log any errors
const callbackResult = streamCallback('', false, { toolExecution: toolExecutionData });
const callbackResult = streamCallback('', false, {
text: '',
done: false,
toolExecution: toolExecutionData
});
if (callbackResult instanceof Promise) {
callbackResult.catch((e: Error) => log.error(`Error sending tool execution complete event: ${e.message}`));
}
@ -206,7 +221,7 @@ export class ToolCallingStage extends BasePipelineStage<ToolExecutionInput, { re
} catch (execError: any) {
const executionTime = Date.now() - executionStart;
log.error(`================ TOOL EXECUTION FAILED in ${executionTime}ms: ${execError.message} ================`);
// Record this failed tool execution if there's a sessionId available
if (input.options?.sessionId) {
try {
@ -222,22 +237,30 @@ export class ToolCallingStage extends BasePipelineStage<ToolExecutionInput, { re
log.error(`Failed to record tool execution error in chat storage: ${storageError}`);
}
}
// Emit tool error event if streaming is enabled
if (streamCallback) {
const toolExecutionData = {
action: 'error',
tool: toolCall.function.name,
error: execError.message || String(execError)
tool: {
name: toolCall.function.name,
arguments: {} as Record<string, unknown>
},
error: execError.message || String(execError),
type: 'error' as const
};
// Don't wait for this to complete, but log any errors
const callbackResult = streamCallback('', false, { toolExecution: toolExecutionData });
const callbackResult = streamCallback('', false, {
text: '',
done: false,
toolExecution: toolExecutionData
});
if (callbackResult instanceof Promise) {
callbackResult.catch((e: Error) => log.error(`Error sending tool execution error event: ${e.message}`));
}
}
throw execError;
}
@ -262,12 +285,20 @@ export class ToolCallingStage extends BasePipelineStage<ToolExecutionInput, { re
if (streamCallback && error.name !== "ExecutionError") {
const toolExecutionData = {
action: 'error',
tool: toolCall.function.name,
error: error.message || String(error)
tool: {
name: toolCall.function.name,
arguments: {} as Record<string, unknown>
},
error: error.message || String(error),
type: 'error' as const
};
// Don't wait for this to complete, but log any errors
const callbackResult = streamCallback('', false, { toolExecution: toolExecutionData });
const callbackResult = streamCallback('', false, {
text: '',
done: false,
toolExecution: toolExecutionData
});
if (callbackResult instanceof Promise) {
callbackResult.catch((e: Error) => log.error(`Error sending tool execution error event: ${e.message}`));
}

View File

@ -96,9 +96,9 @@ export class AnthropicService extends BaseAIService {
// Add tool_choice parameter if specified
if (opts.tool_choice) {
if (opts.tool_choice === 'auto') {
if (typeof opts.tool_choice === 'string' && opts.tool_choice === 'auto') {
requestParams.tool_choice = 'auto';
} else if (opts.tool_choice === 'none') {
} else if (typeof opts.tool_choice === 'string' && opts.tool_choice === 'none') {
requestParams.tool_choice = 'none';
} else if (typeof opts.tool_choice === 'object' && opts.tool_choice.function) {
// Map from OpenAI format to Anthropic format
@ -252,9 +252,12 @@ export class AnthropicService extends BaseAIService {
done: false,
toolExecution: {
type: 'start',
tool: toolCall
tool: {
name: toolCall.function.name,
arguments: JSON.parse(toolCall.function.arguments || '{}')
}
},
raw: block
raw: { ...block } as unknown as Record<string, unknown>
});
}
});
@ -282,7 +285,7 @@ export class AnthropicService extends BaseAIService {
type: 'update',
tool: toolCall
},
raw: { type: 'json_fragment', data: jsonFragment }
raw: { type: 'json_fragment', data: jsonFragment } as Record<string, unknown>
});
}
}
@ -353,10 +356,14 @@ export class AnthropicService extends BaseAIService {
// Send final completion with full text and all tool calls
await callback({
text: fullText,
done: true,
tool_calls: toolCalls.length > 0 ? toolCalls : undefined,
raw: message
text: typeof message.content === 'string' ?
message.content :
message.content
.filter((block: any) => block.type === 'text')
.map((block: any) => block.text)
.join(''),
done: message.role === 'assistant',
raw: { ...message } as unknown as Record<string, unknown>
});
});

View File

@ -135,21 +135,21 @@ export class OpenAIService extends BaseAIService {
}
// Send the chunk to the caller with raw data and any accumulated tool calls
const streamChunk: StreamChunk & { raw: any } = {
const streamChunk: StreamChunk = {
text: content,
done: isDone,
raw: chunk
raw: chunk as unknown as Record<string, unknown>
};
// Add accumulated tool calls to raw data for compatibility with tool execution display
if (accumulatedToolCalls.length > 0) {
// Add tool calls to raw data for proper display
streamChunk.raw = {
...streamChunk.raw,
...streamChunk.raw as object,
tool_calls: accumulatedToolCalls.filter(Boolean)
};
} as Record<string, unknown>;
}
await callback(streamChunk);
if (isDone) {
@ -173,7 +173,7 @@ export class OpenAIService extends BaseAIService {
await callback({
text: content,
done: true,
raw: stream,
raw: stream as unknown as Record<string, unknown>,
tool_calls: toolCalls
});
}

View File

@ -187,9 +187,12 @@ export function createStreamHandler(
// Pass each chunk directly to the callback as it arrives
// without modifying or accumulating its content
await callback({
text: chunk.text || '',
done: !!chunk.done, // Ensure done is boolean
raw: chunk.raw || chunk // Include raw data
text: chunk.text,
done: true,
tool_calls: chunk.tool_calls,
raw: typeof chunk.raw === 'object' ?
chunk.raw as Record<string, unknown> :
{ data: chunk.raw } as Record<string, unknown> // Include raw data
});
});
} catch (error) {