From dd7be3a330afb895fefee8e311622854ab16147e Mon Sep 17 00:00:00 2001 From: team 1 Date: Tue, 28 Apr 2026 08:19:53 +0200 Subject: [PATCH] optimize msg flow --- config/retriex/agent.yaml | 2 +- public/assets/js/base.js | 96 ++++++++++++++++++++++++++++- src/Controller/AskSseController.php | 53 ++++++++++++---- 3 files changed, 138 insertions(+), 13 deletions(-) diff --git a/config/retriex/agent.yaml b/config/retriex/agent.yaml index 908396b..43cd98b 100644 --- a/config/retriex/agent.yaml +++ b/config/retriex/agent.yaml @@ -16,7 +16,7 @@ parameters: no_concrete_shop_query: 'Ich kann die Shop-Suche noch nicht belastbar auflösen. Bitte nenne das Produkt, den Messparameter oder das Zubehör etwas konkreter.' fetch_search_data_template: 'Ich rufe Recherchedaten ab (type: %s)' analyze_all_information: 'Ich analysiere alle Informationen...' - thinking_while_streaming: 'Antwort wird generiert...' + thinking_while_streaming: 'Denke nach...' no_llm_data_received: '❌ Es wurden keine Daten vom LLM empfangen.' generic_internal_error: '❌ Bei der Verarbeitung der Anfrage ist ein interner Fehler aufgetreten.' debug_internal_error_prefix: '❌ Interner Fehler: ' diff --git a/public/assets/js/base.js b/public/assets/js/base.js index 28285ac..859c9dc 100644 --- a/public/assets/js/base.js +++ b/public/assets/js/base.js @@ -764,6 +764,8 @@ document.addEventListener('DOMContentLoaded', () => { state.eventSource = source; let networkErrorTimer = null; + let completionWatchdogTimer = null; + let completionWatchdogInFlight = false; const clearNetworkErrorTimer = () => { if (!networkErrorTimer) { @@ -774,6 +776,15 @@ document.addEventListener('DOMContentLoaded', () => { networkErrorTimer = null; }; + const clearCompletionWatchdog = () => { + if (!completionWatchdogTimer) { + return; + } + + clearInterval(completionWatchdogTimer); + completionWatchdogTimer = null; + }; + const complete = () => { if (finished) { return; @@ -781,6 +792,7 @@ document.addEventListener('DOMContentLoaded', () => { finished = true; clearNetworkErrorTimer(); + clearCompletionWatchdog(); finishEventStream(); resolve(); }; @@ -792,10 +804,73 @@ document.addEventListener('DOMContentLoaded', () => { finished = true; clearNetworkErrorTimer(); + clearCompletionWatchdog(); finishEventStream(); reject(err); }; + const finishFromStatus = (payload) => { + if (finished || state.abortRequested || !payload || typeof payload !== 'object') { + return false; + } + + const status = String(payload.status || ''); + const serverLastEventId = Number.parseInt(String(payload.lastEventId || '0'), 10) || 0; + + // Only finalize from the side-channel when all numbered content chunks are already visible. + // This prevents cutting off the final answer if the browser missed more than just the terminal done event. + if (serverLastEventId > lastSseEventId) { + return false; + } + + if (status === 'completed') { + finalizeStream(bubble, raw); + rememberCompletedTurn(prompt, raw); + complete(); + return true; + } + + if (status === 'failed' || status === 'interrupted') { + appendError(payload.message || 'Der Antwort-Stream wurde beendet, ohne ein Abschluss-Signal an den Browser zu senden.'); + complete(); + return true; + } + + return false; + }; + + const checkCompletionStatus = async () => { + if (finished || state.abortRequested || completionWatchdogInFlight) { + return false; + } + + completionWatchdogInFlight = true; + + try { + const statusRes = await fetch(`/ask-jobs/${encodeURIComponent(jobId)}`, { + method: 'GET', + cache: 'no-store', + signal: state.abortController?.signal, + }); + + if (!statusRes.ok) { + return false; + } + + return finishFromStatus(await statusRes.json()); + } catch (err) { + if (!state.abortRequested) { + console.debug('Stream completion watchdog failed:', err); + } + + return false; + } finally { + completionWatchdogInFlight = false; + } + }; + + completionWatchdogTimer = setInterval(checkCompletionStatus, 2500); + state.completeStream = complete; state.failStream = fail; @@ -815,6 +890,13 @@ document.addEventListener('DOMContentLoaded', () => { return; } + if (event.data === '[DONE]') { + finalizeStream(bubble, raw); + rememberCompletedTurn(prompt, raw); + complete(); + return; + } + const numericEventId = Number.parseInt(event.lastEventId || '', 10); if (Number.isFinite(numericEventId) && numericEventId > 0) { @@ -849,8 +931,20 @@ document.addEventListener('DOMContentLoaded', () => { return; } + void checkCompletionStatus(); + if (source.readyState === EventSource.CLOSED) { - fail(new Error('EventSource connection closed')); + window.setTimeout(async () => { + if (finished || state.abortRequested) { + return; + } + + const statusCompleted = await checkCompletionStatus(); + + if (!statusCompleted && !finished) { + fail(new Error('EventSource connection closed')); + } + }, 250); return; } diff --git a/src/Controller/AskSseController.php b/src/Controller/AskSseController.php index 6712d6d..0bf441f 100644 --- a/src/Controller/AskSseController.php +++ b/src/Controller/AskSseController.php @@ -85,6 +85,28 @@ final readonly class AskSseController return $response; } + #[Route('/ask-jobs/{jobId}', name: 'ask_job_status', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])] + public function jobStatus(string $jobId): JsonResponse + { + $job = $this->readJob($jobId); + + if (!is_array($job)) { + return new JsonResponse([ + 'status' => 'missing', + 'message' => 'Der Antwort-Job wurde nicht gefunden.', + 'lastEventId' => 0, + ], Response::HTTP_NOT_FOUND); + } + + return new JsonResponse([ + 'status' => (string) ($job['status'] ?? ''), + 'message' => is_string($job['message'] ?? null) ? (string) $job['message'] : '', + 'lastEventId' => max(0, (int) ($job['lastEventId'] ?? 0)), + 'updatedAt' => max(0, (int) ($job['updatedAt'] ?? 0)), + 'completedAt' => max(0, (int) ($job['completedAt'] ?? 0)), + ]); + } + #[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])] public function streamJob(Request $request, string $jobId): StreamedResponse { @@ -96,7 +118,7 @@ final readonly class AskSseController if (($claimed['ok'] ?? false) !== true) { $this->prepareStreamRuntime(); - echo "retry: 30000\n\n"; + echo "retry: 3000\n\n"; if ($this->canReplayOrTailClaimedJob($claimed)) { $this->streamStoredJobResponse($jobId, $lastEventId); @@ -104,7 +126,7 @@ final readonly class AskSseController } $this->sendEvent('error', $this->jobClaimErrorMessage($claimed)); - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); return; } @@ -179,13 +201,13 @@ final readonly class AskSseController } } - echo "retry: 15000\n\n"; + echo "retry: 3000\n\n"; $this->sendComment('stream-open'); if ($prompt === '') { $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, 'Empty prompt'); $this->sendEvent('error', 'Empty prompt'); - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); return; } @@ -216,12 +238,12 @@ final readonly class AskSseController $this->appendHistoryFailure($clientId, $prompt, $message); } - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); return; } $this->markJobStatus($jobId, self::JOB_STATUS_COMPLETED); - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); } private function appendHistoryFailure(string $clientId, string $prompt, string $message): void @@ -267,7 +289,7 @@ final readonly class AskSseController $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message); $this->sendEvent('error', $message); - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); }); } @@ -355,6 +377,15 @@ final readonly class AskSseController $this->flushOutput(); } + private function sendDoneEvent(): void + { + $this->sendEvent('done', '[DONE]'); + + if (function_exists('fastcgi_finish_request')) { + @fastcgi_finish_request(); + } + } + private function sendComment(string $comment): void { $safe = str_replace(["\r", "\n"], ' ', $comment); @@ -605,7 +636,7 @@ final readonly class AskSseController if ($status === self::JOB_STATUS_COMPLETED) { $this->sendComment('replayed-completed-stream'); - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); return; } @@ -616,7 +647,7 @@ final readonly class AskSseController ? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message : 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.' ); - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); return; } @@ -627,7 +658,7 @@ final readonly class AskSseController ? $message : 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.' ); - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); return; } @@ -637,7 +668,7 @@ final readonly class AskSseController 'status' => $status, 'message' => $message, ])); - $this->sendEvent('done', '[DONE]'); + $this->sendDoneEvent(); return; }