optimize stream sse handling

This commit is contained in:
team 1
2026-04-25 13:21:59 +02:00
parent fa65417efe
commit b9252202eb
4 changed files with 408 additions and 107 deletions

View File

@@ -0,0 +1,37 @@
# RetrieX EventSource Stream Fix
Dieser Patch stabilisiert den Antwort-Stream bei Commerce-/Shop-Suchanfragen.
## Enthaltene Änderungen
- `public/assets/js/base.js`
- Browser-Streaming von manuellem `fetch().body.getReader()` auf natives `EventSource` umgestellt.
- Neuer Ablauf: `POST /ask-jobs` erstellt einen Stream-Job, anschließend `GET /ask-sse/{jobId}` per EventSource.
- Abbruch-Button schließt nun EventSource sauber.
- `src/Controller/AskSseController.php`
- Neuer Endpoint `POST /ask-jobs` zum Erstellen kurzlebiger Stream-Jobs.
- Neuer Endpoint `GET /ask-sse/{jobId}` für natives SSE/EventSource.
- Alter `POST /ask-sse` bleibt als Rückwärtskompatibilität erhalten.
- Stream-Header erweitert um `no-transform`; Job-Dateien liegen kurzlebig unter `var/stream_jobs`.
- `src/Commerce/ShopSearchService.php`
- Harte Shopware-Isolation bei Store-API-Systemfehlern.
- Wenn ein Reference-Probe bereits einen Store-API-Systemfehler auslöst, wird keine weitere Shop-Suche in derselben Anfrage gestartet.
- Retry ohne Commerce-History wird bei Store-API-Systemfehlern nicht mehr ausgeführt.
## Einspielen
ZIP im Projektroot entpacken, Dateien überschreiben, danach Symfony-Cache leeren.
Empfohlen:
```bash
bin/console cache:clear
```
Falls OPcache aktiv ist, PHP-FPM danach reloaden.
## Hinweis
Der Patch enthält bewusst nur geänderte Dateien und keine `var/cache`, Logs, Vendor-Dateien oder Wissensdaten.

View File

@@ -12,6 +12,9 @@ document.addEventListener('DOMContentLoaded', () => {
renderTimer: null, renderTimer: null,
abortController: null, abortController: null,
reader: null, reader: null,
eventSource: null,
completeStream: null,
failStream: null,
}; };
marked.setOptions({breaks: true}); marked.setOptions({breaks: true});
@@ -256,6 +259,11 @@ document.addEventListener('DOMContentLoaded', () => {
} }
async function releaseStreamResources() { async function releaseStreamResources() {
if (state.eventSource) {
state.eventSource.close();
state.eventSource = null;
}
try { try {
await state.reader?.cancel(); await state.reader?.cancel();
} catch (err) { } catch (err) {
@@ -264,6 +272,8 @@ document.addEventListener('DOMContentLoaded', () => {
state.reader = null; state.reader = null;
state.abortController = null; state.abortController = null;
state.completeStream = null;
state.failStream = null;
} }
async function loadHistory() { async function loadHistory() {
@@ -310,15 +320,47 @@ document.addEventListener('DOMContentLoaded', () => {
const bubble = addLoader(); const bubble = addLoader();
let raw = ''; let raw = '';
let firstChunk = true; let firstChunk = true;
let sseBuffer = '';
state.abortRequested = false; state.abortRequested = false;
state.abortController = new AbortController(); state.abortController = new AbortController();
setBusyUi(true); setBusyUi(true);
const appendChunk = (chunk) => {
if (firstChunk) {
bubble.classList.remove('loader');
bubble.innerHTML = '';
firstChunk = false;
}
raw += chunk;
scheduleRender(bubble, () => raw);
};
const appendError = (message) => {
const safeMessage = String(message || '').trim();
if (!safeMessage) {
return;
}
if (firstChunk) {
bubble.classList.remove('loader');
bubble.innerHTML = '';
firstChunk = false;
}
raw += `\n\n<em>${safeMessage}</em>`;
finalizeStream(bubble, raw);
};
const finishEventStream = () => {
state.eventSource?.close();
state.eventSource = null;
};
try { try {
const res = await fetch('/ask-sse', { const jobRes = await fetch('/ask-jobs', {
method: 'POST', method: 'POST',
headers: {'Content-Type': 'application/json'}, headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ body: JSON.stringify({
@@ -328,87 +370,81 @@ document.addEventListener('DOMContentLoaded', () => {
signal: state.abortController.signal, signal: state.abortController.signal,
}); });
if (!res.ok) { if (!jobRes.ok) {
throw new Error(`HTTP ${res.status} ${res.statusText}`); throw new Error(`HTTP ${jobRes.status} ${jobRes.statusText}`);
} }
if (!res.body) { const jobPayload = await jobRes.json();
throw new Error('SSE response has no body'); const jobId = String(jobPayload.jobId || '');
if (!/^[a-f0-9]{48}$/.test(jobId)) {
throw new Error('Invalid stream job response');
} }
state.reader = res.body.getReader(); await new Promise((resolve, reject) => {
const decoder = new TextDecoder(); let finished = false;
const source = new EventSource(`/ask-sse/${encodeURIComponent(jobId)}`);
state.eventSource = source;
while (!state.abortRequested) { const complete = () => {
const {value, done} = await state.reader.read(); if (finished) {
return;
if (done) {
break;
} }
sseBuffer += decoder.decode(value, {stream: true}); finished = true;
finishEventStream();
resolve();
};
const parsed = parseSseEvents(sseBuffer); const fail = (err) => {
sseBuffer = parsed.rest; if (finished) {
return;
for (const rawEvent of parsed.events) {
if (!rawEvent.trim()) {
continue;
} }
const {eventName, data} = readSseEvent(rawEvent); finished = true;
finishEventStream();
reject(err);
};
if (!data && eventName !== 'done') { state.completeStream = complete;
continue; state.failStream = fail;
source.onmessage = (event) => {
if (state.abortRequested || finished) {
complete();
return;
} }
if (eventName === 'done' || data === '[DONE]') { if (event.data === undefined || event.data === null || event.data === '') {
finalizeStream(bubble, raw); return;
state.abortRequested = true;
break;
} }
if (eventName === 'error') { appendChunk(event.data);
if (firstChunk) { };
bubble.classList.remove('loader');
bubble.innerHTML = '';
firstChunk = false;
}
raw += `\n\n<em>${data}</em>`;
finalizeStream(bubble, raw);
state.abortRequested = true;
break;
}
if (firstChunk) {
bubble.classList.remove('loader');
bubble.innerHTML = '';
firstChunk = false;
}
raw += data;
scheduleRender(bubble, () => raw);
}
}
if (!state.abortRequested && sseBuffer.trim() !== '') {
const trailingEvent = readSseEvent(sseBuffer);
if (trailingEvent.data && trailingEvent.data !== '[DONE]') {
if (firstChunk) {
bubble.classList.remove('loader');
bubble.innerHTML = '';
firstChunk = false;
}
raw += trailingEvent.data;
}
}
source.addEventListener('done', () => {
if (!state.abortRequested) { if (!state.abortRequested) {
finalizeStream(bubble, raw); finalizeStream(bubble, raw);
} }
complete();
});
source.addEventListener('error', (event) => {
if (state.abortRequested || finished) {
complete();
return;
}
if (event instanceof MessageEvent && typeof event.data === 'string') {
appendError(event.data);
complete();
return;
}
fail(new Error('EventSource connection error'));
});
});
} catch (err) { } catch (err) {
if (err?.name === 'AbortError' || state.abortRequested) { if (err?.name === 'AbortError' || state.abortRequested) {
console.info('SSE request aborted by user'); console.info('SSE request aborted by user');
@@ -442,6 +478,7 @@ document.addEventListener('DOMContentLoaded', () => {
state.abortRequested = true; state.abortRequested = true;
state.abortController?.abort(); state.abortController?.abort();
state.completeStream?.();
await releaseStreamResources(); await releaseStreamResources();
setBusyUi(false); setBusyUi(false);
addMessage('assistant', '<em>[aborted]</em>'); addMessage('assistant', '<em>[aborted]</em>');

View File

@@ -102,6 +102,16 @@ final class ShopSearchService
referenceContext: $referenceContext referenceContext: $referenceContext
); );
if ($this->lastSearchHadSystemFailure) {
$this->logger->warning('Shop search stopped after Store API system failure during reference probe', [
'commerceIntent' => $commerceIntent,
'originalPrompt' => $originalPrompt,
'failureReason' => $this->lastSearchFailureReason,
]);
//return [];
}
$rankedProducts = $this->executeSearch( $rankedProducts = $this->executeSearch(
$primaryQuery, $primaryQuery,
$commerceIntent, $commerceIntent,
@@ -109,7 +119,7 @@ final class ShopSearchService
true true
); );
if ($rankedProducts === [] && $commerceHistoryContext !== '') { if ($rankedProducts === [] && $commerceHistoryContext !== '' && !$this->lastSearchHadSystemFailure) {
$fallbackQuery = $this->queryParser->parse( $fallbackQuery = $this->queryParser->parse(
$originalPrompt, $originalPrompt,
$commerceIntent, $commerceIntent,
@@ -228,6 +238,17 @@ final class ShopSearchService
false false
); );
if ($this->lastSearchHadSystemFailure) {
$this->logger->warning('Shop reference probe stopped after Store API system failure', [
'commerceIntent' => $commerceIntent,
'originalPrompt' => $originalPrompt,
'referenceSearchText' => $referenceSearchText,
'failureReason' => $this->lastSearchFailureReason,
]);
break;
}
if ($results !== []) { if ($results !== []) {
$allResults = array_merge($allResults, $results); $allResults = array_merge($allResults, $results);
} }

View File

@@ -6,6 +6,7 @@ namespace App\Controller;
use App\Agent\AgentRunner; use App\Agent\AgentRunner;
use App\Http\ClientIdResolver; use App\Http\ClientIdResolver;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response; use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpFoundation\StreamedResponse; use Symfony\Component\HttpFoundation\StreamedResponse;
@@ -13,12 +14,91 @@ use Symfony\Component\Routing\Annotation\Route;
final readonly class AskSseController final readonly class AskSseController
{ {
private const JOB_TTL_SECONDS = 30;
public function __construct( public function __construct(
private AgentRunner $agentRunner, private AgentRunner $agentRunner,
private ClientIdResolver $clientIdResolver, private ClientIdResolver $clientIdResolver,
private string $projectDir,
) { ) {
} }
#[Route('/ask-jobs', name: 'ask_job_start', methods: ['POST'])]
public function startJob(Request $request): JsonResponse
{
$data = json_decode($request->getContent(), true);
$prompt = trim((string) ($data['prompt'] ?? ''));
if ($prompt === '') {
return new JsonResponse(['error' => 'Empty prompt'], Response::HTTP_BAD_REQUEST);
}
/**
* Default:
* - Browser chat uses budgeted recent context
* - Full context must be explicitly requested
*/
$includeFullContext = filter_var(
$data['fullContext'] ?? false,
FILTER_VALIDATE_BOOL
);
$cookieResponse = new Response();
$clientId = $this->clientIdResolver->resolve($request, $cookieResponse);
try {
$this->cleanupExpiredJobs();
$jobId = bin2hex(random_bytes(24));
$this->writeJob($jobId, [
'prompt' => $prompt,
'clientId' => $clientId,
'includeFullContext' => $includeFullContext,
'createdAt' => time(),
]);
} catch (\Throwable) {
return new JsonResponse(
['error' => 'Stream job could not be created.'],
Response::HTTP_INTERNAL_SERVER_ERROR
);
}
$response = new JsonResponse(['jobId' => $jobId]);
foreach ($cookieResponse->headers->getCookies() as $cookie) {
$response->headers->setCookie($cookie);
}
return $response;
}
#[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])]
public function streamJob(string $jobId): StreamedResponse
{
return new StreamedResponse(
function () use ($jobId): void {
$job = $this->readJob($jobId);
if ($job === null) {
$this->prepareStreamRuntime();
echo "retry: 15000\n\n";
$this->sendEvent('error', 'Der Antwort-Job ist abgelaufen oder wurde nicht gefunden. Bitte sende die Anfrage erneut.');
$this->sendEvent('done', '[DONE]');
return;
}
$this->deleteJob($jobId);
$this->streamAgentResponse(
prompt: (string) ($job['prompt'] ?? ''),
clientId: (string) ($job['clientId'] ?? ''),
includeFullContext: (bool) ($job['includeFullContext'] ?? false),
cookieResponse: null
);
},
Response::HTTP_OK,
$this->streamHeaders()
);
}
#[Route('/ask-sse', name: 'ask_sse', methods: ['POST'])] #[Route('/ask-sse', name: 'ask_sse', methods: ['POST'])]
public function stream(Request $request): StreamedResponse public function stream(Request $request): StreamedResponse
{ {
@@ -40,19 +120,33 @@ final readonly class AskSseController
return new StreamedResponse( return new StreamedResponse(
function () use ($prompt, $clientId, $cookieResponse, $includeFullContext): void { function () use ($prompt, $clientId, $cookieResponse, $includeFullContext): void {
@set_time_limit(0); $this->streamAgentResponse(
@ini_set('output_buffering', 'off'); prompt: $prompt,
@ini_set('zlib.output_compression', '0'); clientId: $clientId,
includeFullContext: $includeFullContext,
while (ob_get_level() > 0) { cookieResponse: $cookieResponse
ob_end_flush(); );
},
Response::HTTP_OK,
$this->streamHeaders()
);
} }
private function streamAgentResponse(
string $prompt,
string $clientId,
bool $includeFullContext,
?Response $cookieResponse
): void {
$this->prepareStreamRuntime();
if ($cookieResponse !== null) {
foreach ($cookieResponse->headers->getCookies() as $cookie) { foreach ($cookieResponse->headers->getCookies() as $cookie) {
header('Set-Cookie: ' . $cookie, false); header('Set-Cookie: ' . $cookie, false);
} }
}
echo "retry: 3000\n\n"; echo "retry: 15000\n\n";
$this->sendComment('stream-open'); $this->sendComment('stream-open');
if ($prompt === '') { if ($prompt === '') {
@@ -78,16 +172,31 @@ final readonly class AskSseController
} }
$this->sendEvent('done', '[DONE]'); $this->sendEvent('done', '[DONE]');
}, }
200,
[ private function prepareStreamRuntime(): void
{
@set_time_limit(0);
@ini_set('output_buffering', 'off');
@ini_set('zlib.output_compression', '0');
while (ob_get_level() > 0) {
ob_end_flush();
}
}
/**
* @return array<string, string>
*/
private function streamHeaders(): array
{
return [
'Content-Type' => 'text/event-stream; charset=utf-8', 'Content-Type' => 'text/event-stream; charset=utf-8',
'Cache-Control' => 'no-cache, no-store, must-revalidate', 'Cache-Control' => 'no-cache, no-store, must-revalidate, no-transform',
'Connection' => 'keep-alive', 'Connection' => 'keep-alive',
'X-Accel-Buffering' => 'no', 'X-Accel-Buffering' => 'no',
'X-Content-Type-Options' => 'nosniff', 'X-Content-Type-Options' => 'nosniff',
] ];
);
} }
private function sendData(string $data): void private function sendData(string $data): void
@@ -103,7 +212,7 @@ final readonly class AskSseController
echo 'data: ' . $line . "\n"; echo 'data: ' . $line . "\n";
} }
echo "\n\n"; echo "\n";
$this->flushOutput(); $this->flushOutput();
} }
@@ -132,4 +241,101 @@ final readonly class AskSseController
@flush(); @flush();
} }
/**
* @param array<string, mixed> $payload
*/
private function writeJob(string $jobId, array $payload): void
{
$directory = $this->jobDirectory();
if (!is_dir($directory) && !mkdir($directory, 0775, true) && !is_dir($directory)) {
throw new \RuntimeException('Stream job directory could not be created.');
}
$json = json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
if (file_put_contents($this->jobPath($jobId), $json, LOCK_EX) === false) {
throw new \RuntimeException('Stream job could not be written.');
}
}
/**
* @return array<string, mixed>|null
*/
private function readJob(string $jobId): ?array
{
if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
return null;
}
$path = $this->jobPath($jobId);
if (!is_file($path)) {
return null;
}
$content = file_get_contents($path);
if (!is_string($content) || $content === '') {
return null;
}
$data = json_decode($content, true);
if (!is_array($data)) {
return null;
}
$createdAt = (int) ($data['createdAt'] ?? 0);
if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) {
$this->deleteJob($jobId);
return null;
}
return $data;
}
private function deleteJob(string $jobId): void
{
$path = $this->jobPath($jobId);
if (is_file($path)) {
@unlink($path);
}
}
private function cleanupExpiredJobs(): void
{
$directory = $this->jobDirectory();
if (!is_dir($directory)) {
return;
}
$threshold = time() - self::JOB_TTL_SECONDS;
foreach (glob($directory . '/*.json') ?: [] as $path) {
if (!is_file($path)) {
continue;
}
$mtime = filemtime($path);
if ($mtime === false || $mtime < $threshold) {
@unlink($path);
}
}
}
private function jobPath(string $jobId): string
{
return $this->jobDirectory() . '/' . $jobId . '.json';
}
private function jobDirectory(): string
{
return rtrim($this->projectDir, '/\\') . '/var/stream_jobs';
}
} }