diff --git a/apps/server/src/services/llm/chat/rest_chat_service.ts b/apps/server/src/services/llm/chat/rest_chat_service.ts
index 1d5434ac1..53ea457a1 100644
--- a/apps/server/src/services/llm/chat/rest_chat_service.ts
+++ b/apps/server/src/services/llm/chat/rest_chat_service.ts
@@ -237,14 +237,6 @@ class RestChatService {
 
             // Send WebSocket message
             wsService.sendMessageToAllClients(message);
-
-            // Send SSE response for compatibility
-            const responseData: any = { content: data, done };
-            if (rawChunk?.toolExecution) {
-                responseData.toolExecution = rawChunk.toolExecution;
-            }
-
-            res.write(`data: ${JSON.stringify(responseData)}\n\n`);
 
             // When streaming is complete, save the accumulated content to the chat note
             if (done) {
@@ -266,8 +258,8 @@ class RestChatService {
                     log.error(`Error saving streaming response: ${error}`);
                 }
 
-                // End the response
-                res.end();
+                // Note: For WebSocket-only streaming, we don't end the HTTP response here
+                // since it was already handled by the calling endpoint
             }
         }
diff --git a/apps/server/src/services/llm/pipeline/chat_pipeline.ts b/apps/server/src/services/llm/pipeline/chat_pipeline.ts
index 947a562e6..50671a809 100644
--- a/apps/server/src/services/llm/pipeline/chat_pipeline.ts
+++ b/apps/server/src/services/llm/pipeline/chat_pipeline.ts
@@ -298,6 +298,9 @@ export class ChatPipeline {
             this.updateStageMetrics('llmCompletion', llmStartTime);
             log.info(`Received LLM response from model: ${completion.response.model}, provider: ${completion.response.provider}`);
 
+            // Track whether content has been streamed to prevent duplication
+            let hasStreamedContent = false;
+
             // Handle streaming if enabled and available
             // Use shouldEnableStream variable which contains our streaming decision
             if (shouldEnableStream && completion.response.stream && streamCallback) {
@@ -311,6 +314,9 @@ export class ChatPipeline {
 
                     // Forward to callback with original chunk data in case it contains additional information
                     streamCallback(processedChunk.text, processedChunk.done, chunk);
+
+                    // Mark that we have streamed content to prevent duplication
+                    hasStreamedContent = true;
                 });
             }
@@ -767,11 +773,15 @@ export class ChatPipeline {
                     const responseText = currentResponse.text || "";
                     log.info(`Resuming streaming with final response: ${responseText.length} chars`);
 
-                    if (responseText.length > 0) {
-                        // Resume streaming with the final response text
+                    if (responseText.length > 0 && !hasStreamedContent) {
+                        // Resume streaming with the final response text only if we haven't already streamed content
                         // This is where we send the definitive done:true signal with the complete content
                         streamCallback(responseText, true);
                         log.info(`Sent final response with done=true signal and text content`);
+                    } else if (hasStreamedContent) {
+                        log.info(`Content already streamed, sending done=true signal only after tool execution`);
+                        // Just send the done signal without duplicating content
+                        streamCallback('', true);
                     } else {
                         // For Anthropic, sometimes text is empty but response is in stream
                         if ((currentResponse.provider === 'Anthropic' || currentResponse.provider === 'OpenAI') && currentResponse.stream) {
@@ -803,13 +813,17 @@ export class ChatPipeline {
                 log.info(`LLM response did not contain any tool calls, skipping tool execution`);
 
                 // Handle streaming for responses without tool calls
-                if (shouldEnableStream && streamCallback) {
+                if (shouldEnableStream && streamCallback && !hasStreamedContent) {
                     log.info(`Sending final streaming response without tool calls: ${currentResponse.text.length} chars`);
 
                     // Send the final response with done=true to complete the streaming
                     streamCallback(currentResponse.text, true);
                     log.info(`Sent final non-tool response with done=true signal`);
+                } else if (shouldEnableStream && streamCallback && hasStreamedContent) {
+                    log.info(`Content already streamed, sending done=true signal only`);
+                    // Just send the done signal without duplicating content
+                    streamCallback('', true);
                 }
             }
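
The rest_chat_service.ts hunks remove the SSE write path entirely, which means the HTTP endpoint that kicks off streaming now owns the response lifecycle instead of this handler. A minimal sketch of that split, assuming an Express-style handler; the endpoint and helper names below are illustrative, not the actual code:

import type { Request, Response } from 'express';

// Hypothetical endpoint: it acknowledges the request over HTTP and leaves
// all chunk delivery to the WebSocket path shown in the diff above.
async function postStreamingChatMessage(req: Request, res: Response): Promise<void> {
    // Fire-and-forget; chunks go out via wsService.sendMessageToAllClients.
    startPipelineStreaming(req.body);

    // The streaming handler in the diff no longer calls res.end(), so the
    // endpoint must settle the HTTP response itself.
    res.status(200).json({ success: true, streaming: 'websocket' });
}

// Stand-in for the pipeline entry point; not part of the diff.
declare function startPipelineStreaming(payload: unknown): void;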
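The chat_pipeline.ts hunks all hinge on one flag: once any chunk has gone out through streamCallback, the final done:true signal is sent with an empty string rather than the full response text, so clients that accumulate chunks do not render the answer twice. A condensed sketch of the pattern with simplified types; the names here are illustrative, and the real pipeline also threads the flag through its tool-execution loop:

type StreamCallback = (text: string, done: boolean, chunk?: unknown) => void;

interface Completion {
    text: string;
    // Present when the provider supports chunked output.
    stream?: (handler: (chunk: { text: string; done: boolean }) => void) => Promise<void>;
}

async function deliver(completion: Completion, streamCallback: StreamCallback): Promise<void> {
    // Track whether content has already been streamed to the client.
    let hasStreamedContent = false;

    if (completion.stream) {
        await completion.stream((chunk) => {
            streamCallback(chunk.text, chunk.done, chunk);
            hasStreamedContent = true; // anything sent here must not be resent below
        });
    }

    if (!hasStreamedContent) {
        // Nothing went out chunk-by-chunk, so the final callback carries the text.
        streamCallback(completion.text, true);
    } else {
        // Content already streamed: emit only the done signal, with no text,
        // so accumulating clients do not duplicate the response.
        streamCallback('', true);
    }
}

The design choice worth noting is that done:true still always fires exactly once; only the payload of that final call changes depending on whether chunked delivery happened.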