diff --git a/RETRIEX_SSE_JOB_HARDENING_FIX_README.md b/RETRIEX_SSE_JOB_HARDENING_FIX_README.md new file mode 100644 index 0000000..349797e --- /dev/null +++ b/RETRIEX_SSE_JOB_HARDENING_FIX_README.md @@ -0,0 +1,50 @@ +# RetrieX SSE Job Hardening Fix + +Patch-only fix for the browser streaming job lifecycle. + +## Problem + +`/ask-sse/{jobId}` deleted the stream job immediately when the first EventSource connection started. +If the browser, WLAN, router, proxy or PHP/Nginx connection briefly dropped, EventSource tried to reconnect with the same job id. The job file was already gone, so the user saw: + +> Der Antwort-Job ist abgelaufen oder wurde nicht gefunden. Bitte sende die Anfrage erneut. + +This made normal network interruptions look like an expired job. + +## Change + +`src/Controller/AskSseController.php` now keeps the job file for the configured TTL and uses explicit job states: + +- `pending` +- `running` +- `completed` +- `interrupted` +- `failed` + +The stream endpoint atomically claims a pending job under a file lock instead of deleting it immediately. Reconnects or duplicate opens no longer see a missing job; they receive a more accurate message depending on the stored state. + +## Runtime behavior + +- A new job is created as `pending`. +- The first `/ask-sse/{jobId}` request claims it as `running`. +- Successful completion marks it as `completed`. +- Browser/client connection abort marks it as `interrupted`. +- Stream exceptions or fatal shutdown errors mark it as `failed`. +- Old job files are still cleaned by `JOB_TTL_SECONDS`. + +## Safety + +This patch does not change Retrieval, PromptBuilder, AgentRunner, Shopware, Intent, Vocabulary, scoring or RAG behavior. +It only hardens the SSE job lifecycle and improves user-facing error messages for reconnect/network cases. + +## After applying + +Run: + +```bash +php bin/console cache:clear +php bin/console mto:agent:config:validate +php bin/console mto:agent:regression:test +``` + +Then test in the browser with a normal prompt and, if possible, simulate a short network interruption during streaming. diff --git a/src/Controller/AskSseController.php b/src/Controller/AskSseController.php index 0b57d89..84f17bd 100644 --- a/src/Controller/AskSseController.php +++ b/src/Controller/AskSseController.php @@ -17,6 +17,12 @@ final readonly class AskSseController { private const JOB_TTL_SECONDS = 900; + private const JOB_STATUS_PENDING = 'pending'; + private const JOB_STATUS_RUNNING = 'running'; + private const JOB_STATUS_COMPLETED = 'completed'; + private const JOB_STATUS_INTERRUPTED = 'interrupted'; + private const JOB_STATUS_FAILED = 'failed'; + public function __construct( private AgentRunner $agentRunner, private ClientIdResolver $clientIdResolver, @@ -51,11 +57,14 @@ final readonly class AskSseController try { $this->cleanupExpiredJobs(); $jobId = bin2hex(random_bytes(24)); + $now = time(); $this->writeJob($jobId, [ + 'status' => self::JOB_STATUS_PENDING, 'prompt' => $prompt, 'clientId' => $clientId, 'includeFullContext' => $includeFullContext, - 'createdAt' => time(), + 'createdAt' => $now, + 'updatedAt' => $now, ]); } catch (\Throwable $e) { return new JsonResponse( @@ -78,22 +87,25 @@ final readonly class AskSseController { return new StreamedResponse( function () use ($jobId): void { - $job = $this->readJob($jobId); + $claimed = $this->claimJob($jobId); - if ($job === null) { + if (($claimed['ok'] ?? false) !== true) { $this->prepareStreamRuntime(); echo "retry: 15000\n\n"; - $this->sendEvent('error', 'Der Antwort-Job ist abgelaufen oder wurde nicht gefunden. Bitte sende die Anfrage erneut.'); + $this->sendEvent('error', $this->jobClaimErrorMessage($claimed)); $this->sendEvent('done', '[DONE]'); return; } - $this->deleteJob($jobId); + /** @var array $job */ + $job = $claimed['job']; + $this->streamAgentResponse( prompt: (string) ($job['prompt'] ?? ''), clientId: (string) ($job['clientId'] ?? ''), includeFullContext: (bool) ($job['includeFullContext'] ?? false), - cookieResponse: null + cookieResponse: null, + jobId: $jobId ); }, Response::HTTP_OK, @@ -126,7 +138,8 @@ final readonly class AskSseController prompt: $prompt, clientId: $clientId, includeFullContext: $includeFullContext, - cookieResponse: $cookieResponse + cookieResponse: $cookieResponse, + jobId: null ); }, Response::HTTP_OK, @@ -138,10 +151,11 @@ final readonly class AskSseController string $prompt, string $clientId, bool $includeFullContext, - ?Response $cookieResponse + ?Response $cookieResponse, + ?string $jobId = null ): void { $this->prepareStreamRuntime(); - $this->registerStreamShutdownErrorHandler(); + $this->registerStreamShutdownErrorHandler($jobId); if ($cookieResponse !== null) { foreach ($cookieResponse->headers->getCookies() as $cookie) { @@ -153,6 +167,7 @@ final readonly class AskSseController $this->sendComment('stream-open'); if ($prompt === '') { + $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, 'Empty prompt'); $this->sendEvent('error', 'Empty prompt'); $this->sendEvent('done', '[DONE]'); return; @@ -161,6 +176,11 @@ final readonly class AskSseController try { foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) { if (connection_aborted() === 1) { + $this->markJobStatus( + $jobId, + self::JOB_STATUS_INTERRUPTED, + 'Die Verbindung zum Antwort-Stream wurde unterbrochen.' + ); return; } @@ -169,6 +189,7 @@ final readonly class AskSseController } } catch (\Throwable $e) { $message = 'Stream abgebrochen: ' . $this->formatThrowableForClient($e); + $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message); $this->sendEvent( 'error', '❌ ' . $message @@ -177,8 +198,12 @@ final readonly class AskSseController if ($prompt !== '' && $clientId !== '') { $this->appendHistoryFailure($clientId, $prompt, $message); } + + $this->sendEvent('done', '[DONE]'); + return; } + $this->markJobStatus($jobId, self::JOB_STATUS_COMPLETED); $this->sendEvent('done', '[DONE]'); } @@ -201,9 +226,9 @@ final readonly class AskSseController } } - private function registerStreamShutdownErrorHandler(): void + private function registerStreamShutdownErrorHandler(?string $jobId = null): void { - register_shutdown_function(function (): void { + register_shutdown_function(function () use ($jobId): void { $error = error_get_last(); if ($error === null) { @@ -223,6 +248,7 @@ final readonly class AskSseController (string) ($error['line'] ?? '?') ); + $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message); $this->sendEvent('error', $message); $this->sendEvent('done', '[DONE]'); }); @@ -326,49 +352,201 @@ final readonly class AskSseController } /** - * @return array|null + * @return array{ok: bool, reason?: string, status?: string, message?: string, job?: array} */ - private function readJob(string $jobId): ?array + private function claimJob(string $jobId): array { if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) { - return null; + return ['ok' => false, 'reason' => 'invalid']; } - $path = $this->jobPath($jobId); + return $this->mutateJobWithLock($jobId, function (?array $data): array { + if ($data === null) { + return [ + 'persist' => false, + 'result' => ['ok' => false, 'reason' => 'missing'], + ]; + } - if (!is_file($path)) { - return null; - } + $createdAt = (int) ($data['createdAt'] ?? 0); - $content = file_get_contents($path); + if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) { + return [ + 'delete' => true, + 'result' => ['ok' => false, 'reason' => 'expired'], + ]; + } - if (!is_string($content) || $content === '') { - return null; - } + $status = (string) ($data['status'] ?? self::JOB_STATUS_PENDING); - $data = json_decode($content, true); + if ($status !== self::JOB_STATUS_PENDING) { + return [ + 'persist' => false, + 'result' => [ + 'ok' => false, + 'reason' => 'not_pending', + 'status' => $status, + 'message' => is_string($data['message'] ?? null) ? (string) $data['message'] : null, + ], + ]; + } - if (!is_array($data)) { - return null; - } + $data['status'] = self::JOB_STATUS_RUNNING; + $data['startedAt'] = time(); + $data['updatedAt'] = time(); - $createdAt = (int) ($data['createdAt'] ?? 0); - - if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) { - $this->deleteJob($jobId); - return null; - } - - return $data; + return [ + 'data' => $data, + 'result' => ['ok' => true, 'job' => $data], + ]; + }); } - private function deleteJob(string $jobId): void + private function markJobStatus(?string $jobId, string $status, ?string $message = null): void + { + if ($jobId === null || !preg_match('/\A[a-f0-9]{48}\z/', $jobId)) { + return; + } + + try { + $this->mutateJobWithLock($jobId, function (?array $data) use ($status, $message): array { + if ($data === null) { + return [ + 'persist' => false, + 'result' => ['ok' => false], + ]; + } + + $data['status'] = $status; + $data['updatedAt'] = time(); + + if ($status === self::JOB_STATUS_COMPLETED) { + $data['completedAt'] = time(); + unset($data['message']); + } elseif ($message !== null && trim($message) !== '') { + $data['message'] = trim(preg_replace('/\s+/u', ' ', $message) ?? $message); + } + + return [ + 'data' => $data, + 'result' => ['ok' => true], + ]; + }); + } catch (\Throwable) { + // Job status persistence must never break the already-running stream. + } + } + + /** + * @param callable(array|null): array $callback + * + * @return array + */ + private function mutateJobWithLock(string $jobId, callable $callback): array { $path = $this->jobPath($jobId); - - if (is_file($path)) { - @unlink($path); + if (!is_file($path)) { + return ['ok' => false, 'reason' => 'missing']; } + + $handle = @fopen($path, 'c+'); + + if ($handle === false) { + return ['ok' => false, 'reason' => 'missing']; + } + + try { + if (!flock($handle, LOCK_EX)) { + return ['ok' => false, 'reason' => 'lock_failed']; + } + + clearstatcache(true, $path); + $size = is_file($path) ? (int) filesize($path) : 0; + rewind($handle); + $content = $size > 0 ? stream_get_contents($handle) : ''; + $data = null; + + if (is_string($content) && trim($content) !== '') { + $decoded = json_decode($content, true); + $data = is_array($decoded) ? $decoded : null; + } + + $mutation = $callback($data); + $result = is_array($mutation['result'] ?? null) ? $mutation['result'] : []; + + if (($mutation['delete'] ?? false) === true) { + ftruncate($handle, 0); + fflush($handle); + flock($handle, LOCK_UN); + fclose($handle); + @unlink($path); + + return $result; + } + + if (array_key_exists('data', $mutation) && is_array($mutation['data'])) { + $json = json_encode($mutation['data'], JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + rewind($handle); + ftruncate($handle, 0); + fwrite($handle, $json); + fflush($handle); + } + + flock($handle, LOCK_UN); + + return $result; + } catch (\Throwable $e) { + @flock($handle, LOCK_UN); + throw $e; + } finally { + if (is_resource($handle)) { + @fclose($handle); + } + } + } + + /** + * @param array $claim + */ + private function jobClaimErrorMessage(array $claim): string + { + $reason = (string) ($claim['reason'] ?? 'missing'); + $status = (string) ($claim['status'] ?? ''); + $message = is_string($claim['message'] ?? null) ? trim((string) $claim['message']) : ''; + + if ($reason === 'expired') { + return 'Der Antwort-Job ist abgelaufen. Bitte sende die Anfrage erneut.'; + } + + if ($reason === 'invalid') { + return 'Der Antwort-Job ist ungültig. Bitte sende die Anfrage erneut.'; + } + + if ($reason === 'lock_failed') { + return 'Der Antwort-Job ist gerade gesperrt. Bitte sende die Anfrage erneut, falls keine Antwort erscheint.'; + } + + if ($reason === 'not_pending') { + if ($status === self::JOB_STATUS_RUNNING) { + return 'Der Antwort-Stream läuft bereits oder wurde nach einem Verbindungsabbruch erneut geöffnet. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.'; + } + + if ($status === self::JOB_STATUS_INTERRUPTED) { + return 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.'; + } + + if ($status === self::JOB_STATUS_COMPLETED) { + return 'Der Antwort-Stream wurde bereits abgeschlossen. Bitte sende eine neue Anfrage, wenn du eine weitere Antwort brauchst.'; + } + + if ($status === self::JOB_STATUS_FAILED) { + return $message !== '' + ? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message + : 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.'; + } + } + + return 'Der Antwort-Job wurde nicht gefunden. Falls deine Verbindung kurz unterbrochen war, sende die Anfrage bitte erneut.'; } private function cleanupExpiredJobs(): void