diff --git a/PATCH_README_EVENTSOURCE_STREAM_FIX.md b/PATCH_README_EVENTSOURCE_STREAM_FIX.md new file mode 100644 index 0000000..23e9027 --- /dev/null +++ b/PATCH_README_EVENTSOURCE_STREAM_FIX.md @@ -0,0 +1,37 @@ +# RetrieX EventSource Stream Fix + +Dieser Patch stabilisiert den Antwort-Stream bei Commerce-/Shop-Suchanfragen. + +## Enthaltene Änderungen + +- `public/assets/js/base.js` + - Browser-Streaming von manuellem `fetch().body.getReader()` auf natives `EventSource` umgestellt. + - Neuer Ablauf: `POST /ask-jobs` erstellt einen Stream-Job, anschließend `GET /ask-sse/{jobId}` per EventSource. + - Abbruch-Button schließt nun EventSource sauber. + +- `src/Controller/AskSseController.php` + - Neuer Endpoint `POST /ask-jobs` zum Erstellen kurzlebiger Stream-Jobs. + - Neuer Endpoint `GET /ask-sse/{jobId}` für natives SSE/EventSource. + - Alter `POST /ask-sse` bleibt als Rückwärtskompatibilität erhalten. + - Stream-Header erweitert um `no-transform`; Job-Dateien liegen kurzlebig unter `var/stream_jobs`. + +- `src/Commerce/ShopSearchService.php` + - Harte Shopware-Isolation bei Store-API-Systemfehlern. + - Wenn ein Reference-Probe bereits einen Store-API-Systemfehler auslöst, wird keine weitere Shop-Suche in derselben Anfrage gestartet. + - Retry ohne Commerce-History wird bei Store-API-Systemfehlern nicht mehr ausgeführt. + +## Einspielen + +ZIP im Projektroot entpacken, Dateien überschreiben, danach Symfony-Cache leeren. + +Empfohlen: + +```bash +bin/console cache:clear +``` + +Falls OPcache aktiv ist, PHP-FPM danach reloaden. + +## Hinweis + +Der Patch enthält bewusst nur geänderte Dateien und keine `var/cache`, Logs, Vendor-Dateien oder Wissensdaten. diff --git a/public/assets/js/base.js b/public/assets/js/base.js index 0edc771..1592328 100644 --- a/public/assets/js/base.js +++ b/public/assets/js/base.js @@ -12,6 +12,9 @@ document.addEventListener('DOMContentLoaded', () => { renderTimer: null, abortController: null, reader: null, + eventSource: null, + completeStream: null, + failStream: null, }; marked.setOptions({breaks: true}); @@ -256,6 +259,11 @@ document.addEventListener('DOMContentLoaded', () => { } async function releaseStreamResources() { + if (state.eventSource) { + state.eventSource.close(); + state.eventSource = null; + } + try { await state.reader?.cancel(); } catch (err) { @@ -264,6 +272,8 @@ document.addEventListener('DOMContentLoaded', () => { state.reader = null; state.abortController = null; + state.completeStream = null; + state.failStream = null; } async function loadHistory() { @@ -310,15 +320,47 @@ document.addEventListener('DOMContentLoaded', () => { const bubble = addLoader(); let raw = ''; let firstChunk = true; - let sseBuffer = ''; state.abortRequested = false; state.abortController = new AbortController(); setBusyUi(true); + const appendChunk = (chunk) => { + if (firstChunk) { + bubble.classList.remove('loader'); + bubble.innerHTML = ''; + firstChunk = false; + } + + raw += chunk; + scheduleRender(bubble, () => raw); + }; + + const appendError = (message) => { + const safeMessage = String(message || '').trim(); + + if (!safeMessage) { + return; + } + + if (firstChunk) { + bubble.classList.remove('loader'); + bubble.innerHTML = ''; + firstChunk = false; + } + + raw += `\n\n${safeMessage}`; + finalizeStream(bubble, raw); + }; + + const finishEventStream = () => { + state.eventSource?.close(); + state.eventSource = null; + }; + try { - const res = await fetch('/ask-sse', { + const jobRes = await fetch('/ask-jobs', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({ @@ -328,87 +370,81 @@ document.addEventListener('DOMContentLoaded', () => { signal: state.abortController.signal, }); - if (!res.ok) { - throw new Error(`HTTP ${res.status} ${res.statusText}`); + if (!jobRes.ok) { + throw new Error(`HTTP ${jobRes.status} ${jobRes.statusText}`); } - if (!res.body) { - throw new Error('SSE response has no body'); + const jobPayload = await jobRes.json(); + const jobId = String(jobPayload.jobId || ''); + + if (!/^[a-f0-9]{48}$/.test(jobId)) { + throw new Error('Invalid stream job response'); } - state.reader = res.body.getReader(); - const decoder = new TextDecoder(); + await new Promise((resolve, reject) => { + let finished = false; + const source = new EventSource(`/ask-sse/${encodeURIComponent(jobId)}`); + state.eventSource = source; - while (!state.abortRequested) { - const {value, done} = await state.reader.read(); - - if (done) { - break; - } - - sseBuffer += decoder.decode(value, {stream: true}); - - const parsed = parseSseEvents(sseBuffer); - sseBuffer = parsed.rest; - - for (const rawEvent of parsed.events) { - if (!rawEvent.trim()) { - continue; + const complete = () => { + if (finished) { + return; } - const {eventName, data} = readSseEvent(rawEvent); + finished = true; + finishEventStream(); + resolve(); + }; - if (!data && eventName !== 'done') { - continue; + const fail = (err) => { + if (finished) { + return; } - if (eventName === 'done' || data === '[DONE]') { + finished = true; + finishEventStream(); + reject(err); + }; + + state.completeStream = complete; + state.failStream = fail; + + source.onmessage = (event) => { + if (state.abortRequested || finished) { + complete(); + return; + } + + if (event.data === undefined || event.data === null || event.data === '') { + return; + } + + appendChunk(event.data); + }; + + source.addEventListener('done', () => { + if (!state.abortRequested) { finalizeStream(bubble, raw); - state.abortRequested = true; - break; } - if (eventName === 'error') { - if (firstChunk) { - bubble.classList.remove('loader'); - bubble.innerHTML = ''; - firstChunk = false; - } + complete(); + }); - raw += `\n\n${data}`; - finalizeStream(bubble, raw); - state.abortRequested = true; - break; + source.addEventListener('error', (event) => { + if (state.abortRequested || finished) { + complete(); + return; } - if (firstChunk) { - bubble.classList.remove('loader'); - bubble.innerHTML = ''; - firstChunk = false; + if (event instanceof MessageEvent && typeof event.data === 'string') { + appendError(event.data); + complete(); + return; } - raw += data; - scheduleRender(bubble, () => raw); - } - } - - if (!state.abortRequested && sseBuffer.trim() !== '') { - const trailingEvent = readSseEvent(sseBuffer); - - if (trailingEvent.data && trailingEvent.data !== '[DONE]') { - if (firstChunk) { - bubble.classList.remove('loader'); - bubble.innerHTML = ''; - firstChunk = false; - } - - raw += trailingEvent.data; - } - } - - if (!state.abortRequested) { - finalizeStream(bubble, raw); - } + fail(new Error('EventSource connection error')); + }); + }); } catch (err) { if (err?.name === 'AbortError' || state.abortRequested) { console.info('SSE request aborted by user'); @@ -442,6 +478,7 @@ document.addEventListener('DOMContentLoaded', () => { state.abortRequested = true; state.abortController?.abort(); + state.completeStream?.(); await releaseStreamResources(); setBusyUi(false); addMessage('assistant', '[aborted]'); diff --git a/src/Commerce/ShopSearchService.php b/src/Commerce/ShopSearchService.php index 7646ee3..458e157 100644 --- a/src/Commerce/ShopSearchService.php +++ b/src/Commerce/ShopSearchService.php @@ -102,6 +102,16 @@ final class ShopSearchService referenceContext: $referenceContext ); + if ($this->lastSearchHadSystemFailure) { + $this->logger->warning('Shop search stopped after Store API system failure during reference probe', [ + 'commerceIntent' => $commerceIntent, + 'originalPrompt' => $originalPrompt, + 'failureReason' => $this->lastSearchFailureReason, + ]); + + //return []; + } + $rankedProducts = $this->executeSearch( $primaryQuery, $commerceIntent, @@ -109,7 +119,7 @@ final class ShopSearchService true ); - if ($rankedProducts === [] && $commerceHistoryContext !== '') { + if ($rankedProducts === [] && $commerceHistoryContext !== '' && !$this->lastSearchHadSystemFailure) { $fallbackQuery = $this->queryParser->parse( $originalPrompt, $commerceIntent, @@ -228,6 +238,17 @@ final class ShopSearchService false ); + if ($this->lastSearchHadSystemFailure) { + $this->logger->warning('Shop reference probe stopped after Store API system failure', [ + 'commerceIntent' => $commerceIntent, + 'originalPrompt' => $originalPrompt, + 'referenceSearchText' => $referenceSearchText, + 'failureReason' => $this->lastSearchFailureReason, + ]); + + break; + } + if ($results !== []) { $allResults = array_merge($allResults, $results); } diff --git a/src/Controller/AskSseController.php b/src/Controller/AskSseController.php index bbc42fa..8458785 100644 --- a/src/Controller/AskSseController.php +++ b/src/Controller/AskSseController.php @@ -6,6 +6,7 @@ namespace App\Controller; use App\Agent\AgentRunner; use App\Http\ClientIdResolver; +use Symfony\Component\HttpFoundation\JsonResponse; use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Response; use Symfony\Component\HttpFoundation\StreamedResponse; @@ -13,12 +14,91 @@ use Symfony\Component\Routing\Annotation\Route; final readonly class AskSseController { + private const JOB_TTL_SECONDS = 30; + public function __construct( private AgentRunner $agentRunner, private ClientIdResolver $clientIdResolver, + private string $projectDir, ) { } + #[Route('/ask-jobs', name: 'ask_job_start', methods: ['POST'])] + public function startJob(Request $request): JsonResponse + { + $data = json_decode($request->getContent(), true); + $prompt = trim((string) ($data['prompt'] ?? '')); + + if ($prompt === '') { + return new JsonResponse(['error' => 'Empty prompt'], Response::HTTP_BAD_REQUEST); + } + + /** + * Default: + * - Browser chat uses budgeted recent context + * - Full context must be explicitly requested + */ + $includeFullContext = filter_var( + $data['fullContext'] ?? false, + FILTER_VALIDATE_BOOL + ); + + $cookieResponse = new Response(); + $clientId = $this->clientIdResolver->resolve($request, $cookieResponse); + + try { + $this->cleanupExpiredJobs(); + $jobId = bin2hex(random_bytes(24)); + $this->writeJob($jobId, [ + 'prompt' => $prompt, + 'clientId' => $clientId, + 'includeFullContext' => $includeFullContext, + 'createdAt' => time(), + ]); + } catch (\Throwable) { + return new JsonResponse( + ['error' => 'Stream job could not be created.'], + Response::HTTP_INTERNAL_SERVER_ERROR + ); + } + + $response = new JsonResponse(['jobId' => $jobId]); + + foreach ($cookieResponse->headers->getCookies() as $cookie) { + $response->headers->setCookie($cookie); + } + + return $response; + } + + #[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])] + public function streamJob(string $jobId): StreamedResponse + { + return new StreamedResponse( + function () use ($jobId): void { + $job = $this->readJob($jobId); + + if ($job === null) { + $this->prepareStreamRuntime(); + echo "retry: 15000\n\n"; + $this->sendEvent('error', 'Der Antwort-Job ist abgelaufen oder wurde nicht gefunden. Bitte sende die Anfrage erneut.'); + $this->sendEvent('done', '[DONE]'); + return; + } + + $this->deleteJob($jobId); + $this->streamAgentResponse( + prompt: (string) ($job['prompt'] ?? ''), + clientId: (string) ($job['clientId'] ?? ''), + includeFullContext: (bool) ($job['includeFullContext'] ?? false), + cookieResponse: null + ); + }, + Response::HTTP_OK, + $this->streamHeaders() + ); + } + #[Route('/ask-sse', name: 'ask_sse', methods: ['POST'])] public function stream(Request $request): StreamedResponse { @@ -40,54 +120,83 @@ final readonly class AskSseController return new StreamedResponse( function () use ($prompt, $clientId, $cookieResponse, $includeFullContext): void { - @set_time_limit(0); - @ini_set('output_buffering', 'off'); - @ini_set('zlib.output_compression', '0'); + $this->streamAgentResponse( + prompt: $prompt, + clientId: $clientId, + includeFullContext: $includeFullContext, + cookieResponse: $cookieResponse + ); + }, + Response::HTTP_OK, + $this->streamHeaders() + ); + } - while (ob_get_level() > 0) { - ob_end_flush(); - } + private function streamAgentResponse( + string $prompt, + string $clientId, + bool $includeFullContext, + ?Response $cookieResponse + ): void { + $this->prepareStreamRuntime(); - foreach ($cookieResponse->headers->getCookies() as $cookie) { - header('Set-Cookie: ' . $cookie, false); - } + if ($cookieResponse !== null) { + foreach ($cookieResponse->headers->getCookies() as $cookie) { + header('Set-Cookie: ' . $cookie, false); + } + } - echo "retry: 3000\n\n"; - $this->sendComment('stream-open'); + echo "retry: 15000\n\n"; + $this->sendComment('stream-open'); - if ($prompt === '') { - $this->sendEvent('error', 'Empty prompt'); - $this->sendEvent('done', '[DONE]'); + if ($prompt === '') { + $this->sendEvent('error', 'Empty prompt'); + $this->sendEvent('done', '[DONE]'); + return; + } + + try { + foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) { + if (connection_aborted() === 1) { return; } - try { - foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) { - if (connection_aborted() === 1) { - return; - } + $chunk = str_replace(["\r\n", "\r"], "\n", $chunk); + $this->sendData($chunk); + } + } catch (\Throwable $e) { + $this->sendEvent( + 'error', + '❌ Stream abgebrochen: ' . $e->getMessage() + ); + } - $chunk = str_replace(["\r\n", "\r"], "\n", $chunk); - $this->sendData($chunk); - } - } catch (\Throwable $e) { - $this->sendEvent( - 'error', - '❌ Stream abgebrochen: ' . $e->getMessage() - ); - } + $this->sendEvent('done', '[DONE]'); + } - $this->sendEvent('done', '[DONE]'); - }, - 200, - [ - 'Content-Type' => 'text/event-stream; charset=utf-8', - 'Cache-Control' => 'no-cache, no-store, must-revalidate', - 'Connection' => 'keep-alive', - 'X-Accel-Buffering' => 'no', - 'X-Content-Type-Options' => 'nosniff', - ] - ); + private function prepareStreamRuntime(): void + { + @set_time_limit(0); + @ini_set('output_buffering', 'off'); + @ini_set('zlib.output_compression', '0'); + + while (ob_get_level() > 0) { + ob_end_flush(); + } + } + + /** + * @return array + */ + private function streamHeaders(): array + { + return [ + 'Content-Type' => 'text/event-stream; charset=utf-8', + 'Cache-Control' => 'no-cache, no-store, must-revalidate, no-transform', + 'Connection' => 'keep-alive', + 'X-Accel-Buffering' => 'no', + 'X-Content-Type-Options' => 'nosniff', + ]; } private function sendData(string $data): void @@ -103,7 +212,7 @@ final readonly class AskSseController echo 'data: ' . $line . "\n"; } - echo "\n\n"; + echo "\n"; $this->flushOutput(); } @@ -132,4 +241,101 @@ final readonly class AskSseController @flush(); } -} \ No newline at end of file + + /** + * @param array $payload + */ + private function writeJob(string $jobId, array $payload): void + { + $directory = $this->jobDirectory(); + + if (!is_dir($directory) && !mkdir($directory, 0775, true) && !is_dir($directory)) { + throw new \RuntimeException('Stream job directory could not be created.'); + } + + $json = json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + + if (file_put_contents($this->jobPath($jobId), $json, LOCK_EX) === false) { + throw new \RuntimeException('Stream job could not be written.'); + } + } + + /** + * @return array|null + */ + private function readJob(string $jobId): ?array + { + if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) { + return null; + } + + $path = $this->jobPath($jobId); + + if (!is_file($path)) { + return null; + } + + $content = file_get_contents($path); + + if (!is_string($content) || $content === '') { + return null; + } + + $data = json_decode($content, true); + + if (!is_array($data)) { + return null; + } + + $createdAt = (int) ($data['createdAt'] ?? 0); + + if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) { + $this->deleteJob($jobId); + return null; + } + + return $data; + } + + private function deleteJob(string $jobId): void + { + $path = $this->jobPath($jobId); + + if (is_file($path)) { + @unlink($path); + } + } + + private function cleanupExpiredJobs(): void + { + $directory = $this->jobDirectory(); + + if (!is_dir($directory)) { + return; + } + + $threshold = time() - self::JOB_TTL_SECONDS; + + foreach (glob($directory . '/*.json') ?: [] as $path) { + if (!is_file($path)) { + continue; + } + + $mtime = filemtime($path); + + if ($mtime === false || $mtime < $threshold) { + @unlink($path); + } + } + } + + private function jobPath(string $jobId): string + { + return $this->jobDirectory() . '/' . $jobId . '.json'; + } + + private function jobDirectory(): string + { + return rtrim($this->projectDir, '/\\') . '/var/stream_jobs'; + } +}