From 75c376c7a80556231a64538b5370544d121b337c Mon Sep 17 00:00:00 2001 From: team 1 Date: Mon, 27 Apr 2026 15:57:38 +0200 Subject: [PATCH] fix shop research --- ..._META_CONTEXT_AND_SSE_REPLAY_FIX_README.md | 84 +++++ ...P_META_CONTEXT_HINT_FRONTEND_FIX_README.md | 62 ++++ RETRIEX_SSE_RECONNECT_REPLAY_FIX_README.md | 43 +++ config/retriex/agent.yaml | 5 +- public/assets/js/base.js | 192 ++++++++++++ src/Agent/AgentRunner.php | 39 ++- src/Config/AgentRunnerConfig.php | 5 +- src/Controller/AskSseController.php | 296 ++++++++++++++++-- 8 files changed, 700 insertions(+), 26 deletions(-) create mode 100644 RETRIEX_SHOP_META_CONTEXT_AND_SSE_REPLAY_FIX_README.md create mode 100644 RETRIEX_SHOP_META_CONTEXT_HINT_FRONTEND_FIX_README.md create mode 100644 RETRIEX_SSE_RECONNECT_REPLAY_FIX_README.md diff --git a/RETRIEX_SHOP_META_CONTEXT_AND_SSE_REPLAY_FIX_README.md b/RETRIEX_SHOP_META_CONTEXT_AND_SSE_REPLAY_FIX_README.md new file mode 100644 index 0000000..68b4971 --- /dev/null +++ b/RETRIEX_SHOP_META_CONTEXT_AND_SSE_REPLAY_FIX_README.md @@ -0,0 +1,84 @@ +# RetrieX Shop-Meta-Context + SSE-Reconnect-Replay Fix + +## Problem + +Kurze referenzielle Shop-Befehle wie: + +```text +suche im shop +``` + +konnten trotz vorheriger Antwort scheitern mit: + +```text +Ich habe keine konkrete Shop-Suchanfrage erkannt. +``` + +Das passierte besonders nach langen oder reconnect-anfälligen SSE-Antworten, weil der Shop-Fallback vollständig von der serverseitig bereits geschriebenen History und einer stabilen Client-ID abhängig war. + +Zusätzlich war das Job-ID-/SSE-Verhalten fragil: ein automatischer `EventSource`-Reconnect auf einen bereits laufenden Job konnte weiterhin als Duplicate-Stream behandelt werden. + +## Lösung + +### 1. SSE-Reconnect-Replay + +- Antwort-Chunks werden pro Job mit fortlaufender SSE-`id` gesendet. +- Chunks werden zusätzlich in `var/stream_jobs/*.stream.ndjson` gepuffert. +- Reconnects mit `Last-Event-ID` replayen fehlende Chunks und tailen den laufenden Job. +- Doppelte Events werden im Frontend defensiv ignoriert. +- Completed/failed/interrupted Jobs werden sauber beendet. + +### 2. Ephemeral Client Context Hint + +- Das Frontend merkt sich den zuletzt vollständig abgeschlossenen Turn. +- `/ask-jobs` sendet diesen Turn als `contextHint` mit. +- Der Server speichert diesen Hint im Stream-Job und reicht ihn an `AgentRunner` weiter. +- `AgentRunner` hängt den Hint nur temporär an den Commerce-History-Kontext an. +- Der Hint wird nicht als eigener History-Eintrag persistiert. + +Damit kann `suche im shop` auch dann auf die letzte Nutzerfrage/Antwort zurückfallen, wenn die serverseitige History gerade noch nicht zuverlässig greift oder die Client-ID-/Cookie-Situation im Browser wackelt. + +### 3. Shop-Fallback-Tokenfilter geschärft + +- `messung` wird im Shop-Context-Fallback nicht mehr entfernt, weil Begriffe wie `redox messung` fachlich relevant sind. +- Zusätzliche Füllwörter wie `ist`, `sind`, `gut`, `geeignet` werden entfernt. + +Beispiel aus dem Fehlerfall: + +```text +welcher pockettester ist für Redox messung gut +suche im shop +``` + +Der zweite Befehl kann nun aus dem Kontext wieder auf `pockettester redox messung` bzw. eine vergleichbare Shop-Query kommen, statt ohne konkrete Shop-Suchanfrage abzubrechen. + +## Geänderte Dateien + +```text +src/Controller/AskSseController.php +src/Agent/AgentRunner.php +src/Config/AgentRunnerConfig.php +config/retriex/agent.yaml +public/assets/js/base.js +``` + +## Prüfung + +```bash +php -n -l src/Controller/AskSseController.php +php -n -l src/Agent/AgentRunner.php +php -n -l src/Config/AgentRunnerConfig.php +node --check public/assets/js/base.js +``` + +`vendor/autoload.php` war in der ZIP-Arbeitskopie nicht enthalten, daher wurde `bin/console mto:agent:config:validate` hier nicht ausgeführt. + +## Nach dem Einspielen + +```bash +php bin/console cache:clear +php bin/console mto:agent:config:validate +php bin/console mto:agent:regression:test +``` + +Danach den Browser hart neu laden, weil `public/assets/js/base.js` geändert wurde. diff --git a/RETRIEX_SHOP_META_CONTEXT_HINT_FRONTEND_FIX_README.md b/RETRIEX_SHOP_META_CONTEXT_HINT_FRONTEND_FIX_README.md new file mode 100644 index 0000000..1569886 --- /dev/null +++ b/RETRIEX_SHOP_META_CONTEXT_HINT_FRONTEND_FIX_README.md @@ -0,0 +1,62 @@ +# RetrieX Shop Meta Context Hint Frontend Fix + +Patch-only fix for the referential shop follow-up flow: + +```text +welcher pockettester ist fuer Redox messung gut +suche im shop +``` + +## Problem + +The backend fallback for meta-only shop prompts such as `suche im shop` can only resolve a concrete shop query when the previous turn is available through server history or through the frontend `contextHint`. + +The previous frontend hint used only the in-memory `lastCompletedUserPrompt` / `lastCompletedAssistantText` state. If that state was empty, overwritten, or lost after a reload/reconnect sequence, the next `/ask-jobs` request sent an empty hint. The backend then had no concrete product/search context and returned: + +```text +Ich habe keine konkrete Shop-Suchanfrage erkannt. Bitte nenne das Produkt, Zubehör oder die Artikelnummer. +``` + +A failed meta-only turn could also overwrite the frontend's last useful context, making immediate retries fragile. + +## Fix + +`public/assets/js/base.js` now builds the request `contextHint` more defensively: + +1. Reconstruct the latest completed visible chat turn from the DOM before sending a new prompt. +2. Persist the last completed turn in `sessionStorage` as a per-tab fallback. +3. Do not overwrite the useful last turn with the generic no-concrete-shop-query response for meta-only prompts. +4. Clear the stored fallback when the user clears the chat history. + +## Scope + +Changed file: + +```text +public/assets/js/base.js +``` + +No changes to retrieval, scoring, PromptBuilder, AgentRunner, ShopSearchService, SSE job replay, or YAML prompt logic. + +## Validation + +```bash +node --check public/assets/js/base.js +``` + +Expected regression flow: + +```text +Was ist der niedrigste Grenzwert fuer die Wasserhaerte, welcher mit einem Testomaten ueberwacht werden kann? +mit welchem indikator wird der wert gemessen +was kostet der indikator +``` + +Expected shop meta flow: + +```text +welcher pockettester ist fuer Redox messung gut +suche im shop +``` + +The second prompt should reuse the previous Redox/Pockettester context and no longer return the generic no-concrete-shop-query message. diff --git a/RETRIEX_SSE_RECONNECT_REPLAY_FIX_README.md b/RETRIEX_SSE_RECONNECT_REPLAY_FIX_README.md new file mode 100644 index 0000000..e6940c3 --- /dev/null +++ b/RETRIEX_SSE_RECONNECT_REPLAY_FIX_README.md @@ -0,0 +1,43 @@ +# RetrieX SSE reconnect replay fix + +Patch-only fix for fragile browser streaming after the job-based EventSource flow. + +## Problem + +EventSource is allowed to reconnect automatically when the browser, proxy or PHP/Nginx connection is interrupted. The previous job guard treated a second `/ask-sse/{jobId}` connection while the job was still `running` as an application error. In the UI this could append messages like: + +```text +event: error +data: Der Antwort-Stream läuft bereits oder wurde nach einem Verbindungsabbruch erneut geöffnet... +``` + +This happened especially in slower Shopware/search flows, but it could also happen in pure RAG answers. + +## Change + +- Streamed answer chunks now receive monotonically increasing SSE `id:` values. +- Each streamed chunk is additionally written to a per-job replay buffer under `var/stream_jobs/*.stream.ndjson`. +- If EventSource reconnects to a job that is already `running`, the controller no longer emits the misleading duplicate-stream error. +- Instead, the reconnecting request reads `Last-Event-ID`, replays missing chunks and tails the still-running job until it completes, fails or is marked interrupted. +- Completed jobs can replay any missing chunks and then emit `done`. +- The frontend tracks `event.lastEventId` and ignores duplicate/replayed chunks defensively. +- Expired job cleanup also removes the replay buffer sidecar file. + +## Changed files + +- `src/Controller/AskSseController.php` +- `public/assets/js/base.js` + +## Safety + +This patch does not change retrieval, PromptBuilder, AgentRunner, scoring, intent detection, Shopware query generation or RAG behavior. It only changes the transport/reconnect layer for SSE. + +## After installing + +```bash +php bin/console cache:clear +php bin/console mto:agent:config:validate +php bin/console mto:agent:regression:test +``` + +Hard-refresh the browser or clear browser cache because `public/assets/js/base.js` changed. diff --git a/config/retriex/agent.yaml b/config/retriex/agent.yaml index 59fecdd..6fbbd49 100644 --- a/config/retriex/agent.yaml +++ b/config/retriex/agent.yaml @@ -101,6 +101,10 @@ parameters: - welches - welchem - welchen + - ist + - sind + - gut + - geeignet - was - wie - wo @@ -123,7 +127,6 @@ parameters: - fuer - messen - gemessen - - messung meta_only_terms: - shop - shopsuche diff --git a/public/assets/js/base.js b/public/assets/js/base.js index 17d267e..13be49a 100644 --- a/public/assets/js/base.js +++ b/public/assets/js/base.js @@ -5,6 +5,7 @@ document.addEventListener('DOMContentLoaded', () => { const abortBtn = document.getElementById('abort'); const clearBtn = document.getElementById('clear'); const aiCloudEl = document.getElementById('ai-cloud'); + const LAST_TURN_STORAGE_KEY = 'retriex:lastCompletedTurn'; const state = { abortRequested: false, @@ -15,6 +16,8 @@ document.addEventListener('DOMContentLoaded', () => { eventSource: null, completeStream: null, failStream: null, + lastCompletedUserPrompt: '', + lastCompletedAssistantText: '', }; marked.setOptions({breaks: true}); @@ -23,6 +26,163 @@ document.addEventListener('DOMContentLoaded', () => { return DOMPurify.sanitize(marked.parse(text)); } + function normalizeContextHintText(value) { + return String(value || '') + .replace(/\r\n/g, '\n') + .replace(/\r/g, '\n') + .replace(/[\t ]+/g, ' ') + .replace(/\n{3,}/g, '\n\n') + .trim(); + } + + function tokenizeClientMetaGuardText(value) { + return normalizeContextHintText(value) + .toLowerCase() + .replace(/[-/_]/g, ' ') + .replace(/[^\p{L}\p{N}]+/gu, ' ') + .trim() + .split(/\s+/u) + .filter(Boolean); + } + + function isClientMetaOnlyShopPrompt(value) { + const tokens = tokenizeClientMetaGuardText(value); + + if (!tokens.length) { + return true; + } + + const metaTerms = new Set([ + 'shop', 'shopsuche', 'suche', 'suchen', 'such', 'finde', 'find', + 'zeige', 'zeig', 'bitte', 'mal', 'im', 'in', 'nach', 'den', 'die', + 'das', 'der', 'dem', + ]); + + return tokens.every((token) => metaTerms.has(token)); + } + + function isNoConcreteShopResponse(value) { + return normalizeContextHintText(value) + .toLowerCase() + .includes('keine konkrete shop-suchanfrage erkannt'); + } + + function rememberCompletedTurn(userPrompt, assistantText) { + const normalizedPrompt = normalizeContextHintText(userPrompt); + const normalizedAssistantText = normalizeContextHintText(assistantText); + + if (!normalizedPrompt) { + return; + } + + if (isClientMetaOnlyShopPrompt(normalizedPrompt) && isNoConcreteShopResponse(normalizedAssistantText)) { + return; + } + + state.lastCompletedUserPrompt = normalizedPrompt.slice(0, 800); + state.lastCompletedAssistantText = normalizedAssistantText.slice(0, 3000); + + try { + window.sessionStorage?.setItem(LAST_TURN_STORAGE_KEY, JSON.stringify({ + userPrompt: state.lastCompletedUserPrompt, + assistantText: state.lastCompletedAssistantText, + rememberedAt: Date.now(), + })); + } catch (err) { + console.debug('Could not persist last completed turn:', err); + } + } + + function loadStoredCompletedTurn() { + try { + const raw = window.sessionStorage?.getItem(LAST_TURN_STORAGE_KEY) || ''; + + if (!raw) { + return null; + } + + const data = JSON.parse(raw); + const userPrompt = normalizeContextHintText(data?.userPrompt || ''); + const assistantText = normalizeContextHintText(data?.assistantText || ''); + + if (!userPrompt) { + return null; + } + + return { + userPrompt: userPrompt.slice(0, 800), + assistantText: assistantText.slice(0, 3000), + }; + } catch (err) { + console.debug('Could not read last completed turn:', err); + return null; + } + } + + function extractLatestVisibleCompletedTurn() { + if (!chatEl) { + return null; + } + + const messages = Array.from(chatEl.querySelectorAll('.message')); + let pendingUserPrompt = ''; + let latestTurn = null; + + messages.forEach((message) => { + const bubble = message.querySelector('.bubble'); + const text = normalizeContextHintText(bubble?.innerText || bubble?.textContent || ''); + + if (!text) { + return; + } + + if (message.classList.contains('user')) { + pendingUserPrompt = text; + return; + } + + if (!message.classList.contains('assistant') || !pendingUserPrompt) { + return; + } + + if (bubble?.classList.contains('loader')) { + return; + } + + if (isClientMetaOnlyShopPrompt(pendingUserPrompt) && isNoConcreteShopResponse(text)) { + pendingUserPrompt = ''; + return; + } + + latestTurn = { + userPrompt: pendingUserPrompt, + assistantText: text, + }; + pendingUserPrompt = ''; + }); + + return latestTurn; + } + + function buildClientContextHint() { + const visibleTurn = extractLatestVisibleCompletedTurn(); + const storedTurn = loadStoredCompletedTurn(); + const userPrompt = visibleTurn?.userPrompt || state.lastCompletedUserPrompt || storedTurn?.userPrompt || ''; + const assistantText = visibleTurn?.assistantText || state.lastCompletedAssistantText || storedTurn?.assistantText || ''; + + if (!userPrompt) { + return ''; + } + + const lines = [`Question: ${userPrompt.slice(0, 800)}`]; + + if (assistantText) { + lines.push(assistantText.slice(0, 3000)); + } + + return normalizeContextHintText(lines.join('\n')).slice(0, 4000); + } + function scrollChatToBottom() { if (!chatEl) { return; @@ -368,10 +528,20 @@ document.addEventListener('DOMContentLoaded', () => { } const messages = await res.json(); + let latestLoadedUserPrompt = ''; messages.forEach((message) => { const bubble = addMessage(message.role); renderBubbleContent(bubble, message.text); + + if (message.role === 'user') { + latestLoadedUserPrompt = normalizeContextHintText(message.text); + return; + } + + if (message.role === 'assistant' && latestLoadedUserPrompt) { + rememberCompletedTurn(latestLoadedUserPrompt, message.text); + } }); enhanceChatLinks(chatEl); @@ -396,6 +566,8 @@ document.addEventListener('DOMContentLoaded', () => { return; } + const contextHint = buildClientContextHint(); + addMessage('user', renderMarkdown(prompt)); promptEl.value = ''; @@ -449,6 +621,7 @@ document.addEventListener('DOMContentLoaded', () => { body: JSON.stringify({ prompt, fullContext: false, + contextHint, }), signal: state.abortController.signal, }); @@ -466,6 +639,7 @@ document.addEventListener('DOMContentLoaded', () => { await new Promise((resolve, reject) => { let finished = false; + let lastSseEventId = 0; const source = new EventSource(`/ask-sse/${encodeURIComponent(jobId)}`); state.eventSource = source; @@ -521,12 +695,23 @@ document.addEventListener('DOMContentLoaded', () => { return; } + const numericEventId = Number.parseInt(event.lastEventId || '', 10); + + if (Number.isFinite(numericEventId) && numericEventId > 0) { + if (numericEventId <= lastSseEventId) { + return; + } + + lastSseEventId = numericEventId; + } + appendChunk(event.data); }; source.addEventListener('done', () => { if (!state.abortRequested) { finalizeStream(bubble, raw); + rememberCompletedTurn(prompt, raw); } complete(); @@ -609,6 +794,13 @@ document.addEventListener('DOMContentLoaded', () => { console.error('History delete failed:', err); } + state.lastCompletedUserPrompt = ''; + state.lastCompletedAssistantText = ''; + try { + window.sessionStorage?.removeItem(LAST_TURN_STORAGE_KEY); + } catch (err) { + console.debug('Could not clear last completed turn:', err); + } chatEl.innerHTML = ''; addMessage('assistant', 'History cleared.'); }); diff --git a/src/Agent/AgentRunner.php b/src/Agent/AgentRunner.php index 4e08bf6..c44d48a 100644 --- a/src/Agent/AgentRunner.php +++ b/src/Agent/AgentRunner.php @@ -39,7 +39,7 @@ final readonly class AgentRunner $this->systemMsgOn = true; } - public function run(string $prompt, string $userId, bool $forceFullContext = false): Generator + public function run(string $prompt, string $userId, bool $forceFullContext = false, string $requestContextHint = ''): Generator { $prompt = trim($prompt); @@ -109,7 +109,7 @@ final readonly class AgentRunner if ($this->isCommerceIntent($commerceIntent)) { yield $this->systemMsg($this->agentRunnerConfig->getOptimizeSearchMessage(), 'think'); - $commerceHistoryContext = $this->buildCommerceHistoryContext($userId); + $commerceHistoryContext = $this->buildCommerceHistoryContext($userId, $requestContextHint); if ($commerceHistoryContext !== '') { $this->addSource($sources, $this->agentRunnerConfig->getConversationHistorySourceLabel()); @@ -136,6 +136,7 @@ final readonly class AgentRunner 'optimizedShopQuery' => $optimizedShopQuery, 'hasCommerceHistoryContext' => $commerceHistoryContext !== '', 'commerceHistoryContextLength' => mb_strlen($commerceHistoryContext), + 'hasRequestContextHint' => trim($requestContextHint) !== '', ]); yield $this->systemMsg( @@ -925,12 +926,42 @@ final readonly class AgentRunner } } - private function buildCommerceHistoryContext(string $userId): string + private function buildCommerceHistoryContext(string $userId, string $requestContextHint = ''): string { - return $this->contextService->buildUserContextWithinBudget( + $history = $this->contextService->buildUserContextWithinBudget( $userId, $this->agentRunnerConfig->getCommerceHistoryBudgetChars() ); + + $requestContextHint = $this->sanitizeRequestContextHintForCommerce($requestContextHint); + + if ($requestContextHint === '') { + return $history; + } + + if ($history === '') { + return $requestContextHint; + } + + return trim($history) . "\n\n" . $requestContextHint; + } + + private function sanitizeRequestContextHintForCommerce(string $requestContextHint): string + { + $requestContextHint = str_replace(["\r\n", "\r"], "\n", $requestContextHint); + $requestContextHint = preg_replace('/[\t ]+/u', ' ', $requestContextHint) ?? $requestContextHint; + $requestContextHint = preg_replace('/\n{3,}/u', "\n\n", $requestContextHint) ?? $requestContextHint; + $requestContextHint = trim($requestContextHint); + + if ($requestContextHint === '') { + return ''; + } + + if (mb_strlen($requestContextHint, 'UTF-8') > 4000) { + $requestContextHint = mb_substr($requestContextHint, 0, 4000, 'UTF-8'); + } + + return trim($requestContextHint); } private function limitKnowledgeChunks(array $knowledgeChunks, string $commerceIntent): array diff --git a/src/Config/AgentRunnerConfig.php b/src/Config/AgentRunnerConfig.php index 50870aa..6c4919c 100644 --- a/src/Config/AgentRunnerConfig.php +++ b/src/Config/AgentRunnerConfig.php @@ -460,6 +460,10 @@ final class AgentRunnerConfig 'welches', 'welchem', 'welchen', + 'ist', + 'sind', + 'gut', + 'geeignet', 'was', 'wie', 'wo', @@ -482,7 +486,6 @@ final class AgentRunnerConfig 'fuer', 'messen', 'gemessen', - 'messung', ]); } diff --git a/src/Controller/AskSseController.php b/src/Controller/AskSseController.php index 6855877..6712d6d 100644 --- a/src/Controller/AskSseController.php +++ b/src/Controller/AskSseController.php @@ -51,6 +51,8 @@ final readonly class AskSseController FILTER_VALIDATE_BOOL ); + $requestContextHint = $this->sanitizeRequestContextHint((string) ($data['contextHint'] ?? '')); + $cookieResponse = new Response(); $clientId = $this->clientIdResolver->resolve($request, $cookieResponse); @@ -63,6 +65,7 @@ final readonly class AskSseController 'prompt' => $prompt, 'clientId' => $clientId, 'includeFullContext' => $includeFullContext, + 'requestContextHint' => $requestContextHint, 'createdAt' => $now, 'updatedAt' => $now, ]); @@ -83,19 +86,20 @@ final readonly class AskSseController } #[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])] - public function streamJob(string $jobId): StreamedResponse + public function streamJob(Request $request, string $jobId): StreamedResponse { + $lastEventId = $this->resolveLastEventId($request); + return new StreamedResponse( - function () use ($jobId): void { + function () use ($jobId, $lastEventId): void { $claimed = $this->claimJob($jobId); if (($claimed['ok'] ?? false) !== true) { $this->prepareStreamRuntime(); - echo "retry: 15000\n\n"; + echo "retry: 30000\n\n"; - if ($this->shouldSilentlyCloseDuplicateJobStream($claimed)) { - $this->sendComment('duplicate-or-finished-stream'); - $this->sendEvent('done', '[DONE]'); + if ($this->canReplayOrTailClaimedJob($claimed)) { + $this->streamStoredJobResponse($jobId, $lastEventId); return; } @@ -112,7 +116,8 @@ final readonly class AskSseController clientId: (string) ($job['clientId'] ?? ''), includeFullContext: (bool) ($job['includeFullContext'] ?? false), cookieResponse: null, - jobId: $jobId + jobId: $jobId, + requestContextHint: is_string($job['requestContextHint'] ?? null) ? (string) $job['requestContextHint'] : '' ); }, Response::HTTP_OK, @@ -136,17 +141,20 @@ final readonly class AskSseController FILTER_VALIDATE_BOOL ); + $requestContextHint = $this->sanitizeRequestContextHint((string) ($data['contextHint'] ?? '')); + $cookieResponse = new Response(); $clientId = $this->clientIdResolver->resolve($request, $cookieResponse); return new StreamedResponse( - function () use ($prompt, $clientId, $cookieResponse, $includeFullContext): void { + function () use ($prompt, $clientId, $cookieResponse, $includeFullContext, $requestContextHint): void { $this->streamAgentResponse( prompt: $prompt, clientId: $clientId, includeFullContext: $includeFullContext, cookieResponse: $cookieResponse, - jobId: null + jobId: null, + requestContextHint: $requestContextHint ); }, Response::HTTP_OK, @@ -159,7 +167,8 @@ final readonly class AskSseController string $clientId, bool $includeFullContext, ?Response $cookieResponse, - ?string $jobId = null + ?string $jobId = null, + string $requestContextHint = '' ): void { $this->prepareStreamRuntime(); $this->registerStreamShutdownErrorHandler($jobId); @@ -181,7 +190,7 @@ final readonly class AskSseController } try { - foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) { + foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext, $requestContextHint) as $chunk) { if (connection_aborted() === 1) { $this->markJobStatus( $jobId, @@ -192,7 +201,8 @@ final readonly class AskSseController } $chunk = str_replace(["\r\n", "\r"], "\n", $chunk); - $this->sendData($chunk); + $eventId = $this->appendJobOutput($jobId, $chunk); + $this->sendData($chunk, $eventId); } } catch (\Throwable $e) { $message = 'Stream abgebrochen: ' . $this->formatThrowableForClient($e); @@ -261,6 +271,24 @@ final readonly class AskSseController }); } + private function sanitizeRequestContextHint(string $contextHint): string + { + $contextHint = str_replace(["\r\n", "\r"], "\n", $contextHint); + $contextHint = preg_replace('/[\t ]+/u', ' ', $contextHint) ?? $contextHint; + $contextHint = preg_replace('/\n{3,}/u', "\n\n", $contextHint) ?? $contextHint; + $contextHint = trim($contextHint); + + if ($contextHint === '') { + return ''; + } + + if (mb_strlen($contextHint, 'UTF-8') > 4000) { + $contextHint = mb_substr($contextHint, 0, 4000, 'UTF-8'); + } + + return trim($contextHint); + } + private function formatThrowableForClient(\Throwable $e): string { $message = trim($e->getMessage()); @@ -297,13 +325,17 @@ final readonly class AskSseController ]; } - private function sendData(string $data): void + private function sendData(string $data, ?int $eventId = null): void { if ($data === '') { $this->sendComment('keepalive'); return; } + if ($eventId !== null && $eventId > 0) { + echo 'id: ' . $eventId . "\n"; + } + $lines = explode("\n", $data); foreach ($lines as $line) { @@ -511,22 +543,239 @@ final readonly class AskSseController } } } + private function resolveLastEventId(Request $request): int + { + $header = trim((string) $request->headers->get('Last-Event-ID', '')); + + if ($header === '' || !ctype_digit($header)) { + return 0; + } + + return max(0, (int) $header); + } + /** - * EventSource may reconnect to an already running or already completed job. - * Those duplicate connections should be closed quietly so the UI does not - * append a misleading error after the real stream already produced output. - * * @param array $claim */ - private function shouldSilentlyCloseDuplicateJobStream(array $claim): bool + private function canReplayOrTailClaimedJob(array $claim): bool { if (($claim['reason'] ?? null) !== 'not_pending') { return false; } - $status = (string) ($claim['status'] ?? ''); + return in_array( + (string) ($claim['status'] ?? ''), + [ + self::JOB_STATUS_RUNNING, + self::JOB_STATUS_COMPLETED, + self::JOB_STATUS_INTERRUPTED, + self::JOB_STATUS_FAILED, + ], + true + ); + } - return $status === self::JOB_STATUS_COMPLETED; + private function streamStoredJobResponse(string $jobId, int $lastEventId): void + { + $afterEventId = max(0, $lastEventId); + $lastKeepaliveAt = 0; + + while (true) { + if (connection_aborted() === 1) { + return; + } + + foreach ($this->readJobOutputAfter($jobId, $afterEventId) as $event) { + $eventId = (int) ($event['id'] ?? 0); + $data = is_string($event['data'] ?? null) ? (string) $event['data'] : ''; + + if ($eventId <= $afterEventId || $data === '') { + continue; + } + + $this->sendData($data, $eventId); + $afterEventId = $eventId; + } + + $job = $this->readJob($jobId); + $status = is_array($job) ? (string) ($job['status'] ?? '') : ''; + $message = is_array($job) && is_string($job['message'] ?? null) + ? trim((string) $job['message']) + : ''; + + if ($status === self::JOB_STATUS_COMPLETED) { + $this->sendComment('replayed-completed-stream'); + $this->sendEvent('done', '[DONE]'); + return; + } + + if ($status === self::JOB_STATUS_FAILED) { + $this->sendEvent( + 'error', + $message !== '' + ? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message + : 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.' + ); + $this->sendEvent('done', '[DONE]'); + return; + } + + if ($status === self::JOB_STATUS_INTERRUPTED) { + $this->sendEvent( + 'error', + $message !== '' + ? $message + : 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.' + ); + $this->sendEvent('done', '[DONE]'); + return; + } + + if ($status !== self::JOB_STATUS_RUNNING) { + $this->sendEvent('error', $this->jobClaimErrorMessage([ + 'reason' => 'not_pending', + 'status' => $status, + 'message' => $message, + ])); + $this->sendEvent('done', '[DONE]'); + return; + } + + if (time() - $lastKeepaliveAt >= 10) { + $this->sendComment('waiting-for-running-stream'); + $lastKeepaliveAt = time(); + } + + usleep(250000); + } + } + + /** + * @return list + */ + private function readJobOutputAfter(string $jobId, int $afterEventId): array + { + $path = $this->jobOutputPath($jobId); + + if (!is_file($path)) { + return []; + } + + $lines = @file($path, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES); + + if (!is_array($lines)) { + return []; + } + + $events = []; + + foreach ($lines as $line) { + if (!is_string($line) || trim($line) === '') { + continue; + } + + $decoded = json_decode($line, true); + + if (!is_array($decoded)) { + continue; + } + + $id = (int) ($decoded['id'] ?? 0); + $data = is_string($decoded['data'] ?? null) ? (string) $decoded['data'] : ''; + + if ($id > $afterEventId && $data !== '') { + $events[] = ['id' => $id, 'data' => $data]; + } + } + + return $events; + } + + private function appendJobOutput(?string $jobId, string $data): ?int + { + if ($jobId === null || $data === '' || !preg_match('/\A[a-f0-9]{48}\z/', $jobId)) { + return null; + } + + $eventId = null; + + try { + $this->mutateJobWithLock($jobId, function (?array $job) use (&$eventId): array { + if ($job === null) { + return [ + 'persist' => false, + 'result' => ['ok' => false], + ]; + } + + $eventId = max(0, (int) ($job['lastEventId'] ?? 0)) + 1; + $job['lastEventId'] = $eventId; + $job['updatedAt'] = time(); + + return [ + 'data' => $job, + 'result' => ['ok' => true], + ]; + }); + + if ($eventId === null || $eventId <= 0) { + return null; + } + + $line = json_encode( + ['id' => $eventId, 'data' => $data], + JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES + ) . "\n"; + + if (file_put_contents($this->jobOutputPath($jobId), $line, FILE_APPEND | LOCK_EX) === false) { + return null; + } + } catch (\Throwable) { + return null; + } + + return $eventId; + } + + /** + * @return array|null + */ + private function readJob(string $jobId): ?array + { + if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) { + return null; + } + + $path = $this->jobPath($jobId); + + if (!is_file($path)) { + return null; + } + + $handle = @fopen($path, 'r'); + + if ($handle === false) { + return null; + } + + try { + if (!flock($handle, LOCK_SH)) { + return null; + } + + $content = stream_get_contents($handle); + flock($handle, LOCK_UN); + } finally { + fclose($handle); + } + + if (!is_string($content) || trim($content) === '') { + return null; + } + + $decoded = json_decode($content, true); + + return is_array($decoded) ? $decoded : null; } /** @@ -590,7 +839,9 @@ final readonly class AskSseController $mtime = filemtime($path); if ($mtime === false || $mtime < $threshold) { + $base = preg_replace('/\.json\z/', '', $path) ?? $path; @unlink($path); + @unlink($base . '.stream.ndjson'); } } } @@ -600,6 +851,11 @@ final readonly class AskSseController return $this->jobDirectory() . '/' . $jobId . '.json'; } + private function jobOutputPath(string $jobId): string + { + return $this->jobDirectory() . '/' . $jobId . '.stream.ndjson'; + } + private function jobDirectory(): string { return rtrim($this->projectDir, '/\\') . '/var/stream_jobs';