harden sse errors and aborts
This commit is contained in:
50
RETRIEX_SSE_JOB_HARDENING_FIX_README.md
Normal file
50
RETRIEX_SSE_JOB_HARDENING_FIX_README.md
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
# RetrieX SSE Job Hardening Fix
|
||||||
|
|
||||||
|
Patch-only fix for the browser streaming job lifecycle.
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
`/ask-sse/{jobId}` deleted the stream job immediately when the first EventSource connection started.
|
||||||
|
If the browser, Wi-Fi, router, proxy, or PHP/Nginx connection briefly dropped, EventSource tried to reconnect with the same job ID. The job file was already gone, so the user saw:
|
||||||
|
|
||||||
|
> Der Antwort-Job ist abgelaufen oder wurde nicht gefunden. Bitte sende die Anfrage erneut.
|
||||||
|
|
||||||
|
This made normal network interruptions look like an expired job.
|
||||||
|
|
||||||
|
## Change
|
||||||
|
|
||||||
|
`src/Controller/AskSseController.php` now keeps the job file for the configured TTL and uses explicit job states:
|
||||||
|
|
||||||
|
- `pending`
|
||||||
|
- `running`
|
||||||
|
- `completed`
|
||||||
|
- `interrupted`
|
||||||
|
- `failed`
|
||||||
|
|
||||||
|
The stream endpoint atomically claims a pending job under a file lock instead of deleting it immediately. Reconnects or duplicate opens no longer see a missing job; they receive a more accurate message depending on the stored state. The stream itself is not resumed on reconnect: only the first request that claims the pending job runs the agent.
|
||||||
|
|
||||||
|
## Runtime behavior
|
||||||
|
|
||||||
|
- A new job is created as `pending`.
|
||||||
|
- The first `/ask-sse/{jobId}` request claims it as `running`.
|
||||||
|
- Successful completion marks it as `completed`.
|
||||||
|
- A browser/client connection abort, detected when the next response chunk is about to be sent, marks it as `interrupted`.
|
||||||
|
- Stream exceptions or fatal shutdown errors mark it as `failed`.
|
||||||
|
- Old job files are still cleaned up once they are older than `JOB_TTL_SECONDS`.
|
||||||
|
|
||||||
|
## Safety
|
||||||
|
|
||||||
|
This patch does not change Retrieval, PromptBuilder, AgentRunner, Shopware, Intent, Vocabulary, scoring or RAG behavior.
|
||||||
|
It only hardens the SSE job lifecycle and improves user-facing error messages for reconnect/network cases.
|
||||||
|
|
||||||
|
## After applying
|
||||||
|
|
||||||
|
Run:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
php bin/console cache:clear
|
||||||
|
php bin/console mto:agent:config:validate
|
||||||
|
php bin/console mto:agent:regression:test
|
||||||
|
```
|
||||||
|
|
||||||
|
Then test in the browser with a normal prompt and, if possible, simulate a short network interruption during streaming.
|
||||||
@@ -17,6 +17,12 @@ final readonly class AskSseController
|
|||||||
{
|
{
|
||||||
private const JOB_TTL_SECONDS = 900;
|
private const JOB_TTL_SECONDS = 900;
|
||||||
|
|
||||||
|
private const JOB_STATUS_PENDING = 'pending';
|
||||||
|
private const JOB_STATUS_RUNNING = 'running';
|
||||||
|
private const JOB_STATUS_COMPLETED = 'completed';
|
||||||
|
private const JOB_STATUS_INTERRUPTED = 'interrupted';
|
||||||
|
private const JOB_STATUS_FAILED = 'failed';
|
||||||
|
|
||||||
public function __construct(
|
public function __construct(
|
||||||
private AgentRunner $agentRunner,
|
private AgentRunner $agentRunner,
|
||||||
private ClientIdResolver $clientIdResolver,
|
private ClientIdResolver $clientIdResolver,
|
||||||
@@ -51,11 +57,14 @@ final readonly class AskSseController
|
|||||||
try {
|
try {
|
||||||
$this->cleanupExpiredJobs();
|
$this->cleanupExpiredJobs();
|
||||||
$jobId = bin2hex(random_bytes(24));
|
$jobId = bin2hex(random_bytes(24));
|
||||||
|
$now = time();
|
||||||
$this->writeJob($jobId, [
|
$this->writeJob($jobId, [
|
||||||
|
'status' => self::JOB_STATUS_PENDING,
|
||||||
'prompt' => $prompt,
|
'prompt' => $prompt,
|
||||||
'clientId' => $clientId,
|
'clientId' => $clientId,
|
||||||
'includeFullContext' => $includeFullContext,
|
'includeFullContext' => $includeFullContext,
|
||||||
'createdAt' => time(),
|
'createdAt' => $now,
|
||||||
|
'updatedAt' => $now,
|
||||||
]);
|
]);
|
||||||
} catch (\Throwable $e) {
|
} catch (\Throwable $e) {
|
||||||
return new JsonResponse(
|
return new JsonResponse(
|
||||||
@@ -78,22 +87,25 @@ final readonly class AskSseController
|
|||||||
{
|
{
|
||||||
return new StreamedResponse(
|
return new StreamedResponse(
|
||||||
function () use ($jobId): void {
|
function () use ($jobId): void {
|
||||||
$job = $this->readJob($jobId);
|
$claimed = $this->claimJob($jobId);
|
||||||
|
|
||||||
if ($job === null) {
|
if (($claimed['ok'] ?? false) !== true) {
|
||||||
$this->prepareStreamRuntime();
|
$this->prepareStreamRuntime();
|
||||||
echo "retry: 15000\n\n";
|
echo "retry: 15000\n\n";
|
||||||
$this->sendEvent('error', 'Der Antwort-Job ist abgelaufen oder wurde nicht gefunden. Bitte sende die Anfrage erneut.');
|
$this->sendEvent('error', $this->jobClaimErrorMessage($claimed));
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendEvent('done', '[DONE]');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->deleteJob($jobId);
|
/** @var array<string, mixed> $job */
|
||||||
|
$job = $claimed['job'];
|
||||||
|
|
||||||
$this->streamAgentResponse(
|
$this->streamAgentResponse(
|
||||||
prompt: (string) ($job['prompt'] ?? ''),
|
prompt: (string) ($job['prompt'] ?? ''),
|
||||||
clientId: (string) ($job['clientId'] ?? ''),
|
clientId: (string) ($job['clientId'] ?? ''),
|
||||||
includeFullContext: (bool) ($job['includeFullContext'] ?? false),
|
includeFullContext: (bool) ($job['includeFullContext'] ?? false),
|
||||||
cookieResponse: null
|
cookieResponse: null,
|
||||||
|
jobId: $jobId
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
Response::HTTP_OK,
|
Response::HTTP_OK,
|
||||||
@@ -126,7 +138,8 @@ final readonly class AskSseController
|
|||||||
prompt: $prompt,
|
prompt: $prompt,
|
||||||
clientId: $clientId,
|
clientId: $clientId,
|
||||||
includeFullContext: $includeFullContext,
|
includeFullContext: $includeFullContext,
|
||||||
cookieResponse: $cookieResponse
|
cookieResponse: $cookieResponse,
|
||||||
|
jobId: null
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
Response::HTTP_OK,
|
Response::HTTP_OK,
|
||||||
@@ -138,10 +151,11 @@ final readonly class AskSseController
|
|||||||
string $prompt,
|
string $prompt,
|
||||||
string $clientId,
|
string $clientId,
|
||||||
bool $includeFullContext,
|
bool $includeFullContext,
|
||||||
?Response $cookieResponse
|
?Response $cookieResponse,
|
||||||
|
?string $jobId = null
|
||||||
): void {
|
): void {
|
||||||
$this->prepareStreamRuntime();
|
$this->prepareStreamRuntime();
|
||||||
$this->registerStreamShutdownErrorHandler();
|
$this->registerStreamShutdownErrorHandler($jobId);
|
||||||
|
|
||||||
if ($cookieResponse !== null) {
|
if ($cookieResponse !== null) {
|
||||||
foreach ($cookieResponse->headers->getCookies() as $cookie) {
|
foreach ($cookieResponse->headers->getCookies() as $cookie) {
|
||||||
@@ -153,6 +167,7 @@ final readonly class AskSseController
|
|||||||
$this->sendComment('stream-open');
|
$this->sendComment('stream-open');
|
||||||
|
|
||||||
if ($prompt === '') {
|
if ($prompt === '') {
|
||||||
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, 'Empty prompt');
|
||||||
$this->sendEvent('error', 'Empty prompt');
|
$this->sendEvent('error', 'Empty prompt');
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendEvent('done', '[DONE]');
|
||||||
return;
|
return;
|
||||||
@@ -161,6 +176,11 @@ final readonly class AskSseController
|
|||||||
try {
|
try {
|
||||||
foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) {
|
foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) {
|
||||||
if (connection_aborted() === 1) {
|
if (connection_aborted() === 1) {
|
||||||
|
$this->markJobStatus(
|
||||||
|
$jobId,
|
||||||
|
self::JOB_STATUS_INTERRUPTED,
|
||||||
|
'Die Verbindung zum Antwort-Stream wurde unterbrochen.'
|
||||||
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,6 +189,7 @@ final readonly class AskSseController
|
|||||||
}
|
}
|
||||||
} catch (\Throwable $e) {
|
} catch (\Throwable $e) {
|
||||||
$message = 'Stream abgebrochen: ' . $this->formatThrowableForClient($e);
|
$message = 'Stream abgebrochen: ' . $this->formatThrowableForClient($e);
|
||||||
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
|
||||||
$this->sendEvent(
|
$this->sendEvent(
|
||||||
'error',
|
'error',
|
||||||
'❌ ' . $message
|
'❌ ' . $message
|
||||||
@@ -177,8 +198,12 @@ final readonly class AskSseController
|
|||||||
if ($prompt !== '' && $clientId !== '') {
|
if ($prompt !== '' && $clientId !== '') {
|
||||||
$this->appendHistoryFailure($clientId, $prompt, $message);
|
$this->appendHistoryFailure($clientId, $prompt, $message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$this->sendEvent('done', '[DONE]');
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$this->markJobStatus($jobId, self::JOB_STATUS_COMPLETED);
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendEvent('done', '[DONE]');
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -201,9 +226,9 @@ final readonly class AskSseController
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function registerStreamShutdownErrorHandler(): void
|
private function registerStreamShutdownErrorHandler(?string $jobId = null): void
|
||||||
{
|
{
|
||||||
register_shutdown_function(function (): void {
|
register_shutdown_function(function () use ($jobId): void {
|
||||||
$error = error_get_last();
|
$error = error_get_last();
|
||||||
|
|
||||||
if ($error === null) {
|
if ($error === null) {
|
||||||
@@ -223,6 +248,7 @@ final readonly class AskSseController
|
|||||||
(string) ($error['line'] ?? '?')
|
(string) ($error['line'] ?? '?')
|
||||||
);
|
);
|
||||||
|
|
||||||
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
|
||||||
$this->sendEvent('error', $message);
|
$this->sendEvent('error', $message);
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendEvent('done', '[DONE]');
|
||||||
});
|
});
|
||||||
@@ -326,49 +352,201 @@ final readonly class AskSseController
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return array<string, mixed>|null
|
* @return array{ok: bool, reason?: string, status?: string, message?: string, job?: array<string, mixed>}
|
||||||
*/
|
*/
|
||||||
private function readJob(string $jobId): ?array
|
private function claimJob(string $jobId): array
|
||||||
{
|
{
|
||||||
if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
|
if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
|
||||||
return null;
|
return ['ok' => false, 'reason' => 'invalid'];
|
||||||
}
|
}
|
||||||
|
|
||||||
$path = $this->jobPath($jobId);
|
return $this->mutateJobWithLock($jobId, function (?array $data): array {
|
||||||
|
if ($data === null) {
|
||||||
if (!is_file($path)) {
|
return [
|
||||||
return null;
|
'persist' => false,
|
||||||
}
|
'result' => ['ok' => false, 'reason' => 'missing'],
|
||||||
|
];
|
||||||
$content = file_get_contents($path);
|
|
||||||
|
|
||||||
if (!is_string($content) || $content === '') {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
$data = json_decode($content, true);
|
|
||||||
|
|
||||||
if (!is_array($data)) {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$createdAt = (int) ($data['createdAt'] ?? 0);
|
$createdAt = (int) ($data['createdAt'] ?? 0);
|
||||||
|
|
||||||
if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) {
|
if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) {
|
||||||
$this->deleteJob($jobId);
|
return [
|
||||||
return null;
|
'delete' => true,
|
||||||
|
'result' => ['ok' => false, 'reason' => 'expired'],
|
||||||
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
return $data;
|
$status = (string) ($data['status'] ?? self::JOB_STATUS_PENDING);
|
||||||
|
|
||||||
|
if ($status !== self::JOB_STATUS_PENDING) {
|
||||||
|
return [
|
||||||
|
'persist' => false,
|
||||||
|
'result' => [
|
||||||
|
'ok' => false,
|
||||||
|
'reason' => 'not_pending',
|
||||||
|
'status' => $status,
|
||||||
|
'message' => is_string($data['message'] ?? null) ? (string) $data['message'] : null,
|
||||||
|
],
|
||||||
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
private function deleteJob(string $jobId): void
|
$data['status'] = self::JOB_STATUS_RUNNING;
|
||||||
|
$data['startedAt'] = time();
|
||||||
|
$data['updatedAt'] = time();
|
||||||
|
|
||||||
|
return [
|
||||||
|
'data' => $data,
|
||||||
|
'result' => ['ok' => true, 'job' => $data],
|
||||||
|
];
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private function markJobStatus(?string $jobId, string $status, ?string $message = null): void
|
||||||
|
{
|
||||||
|
if ($jobId === null || !preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$this->mutateJobWithLock($jobId, function (?array $data) use ($status, $message): array {
|
||||||
|
if ($data === null) {
|
||||||
|
return [
|
||||||
|
'persist' => false,
|
||||||
|
'result' => ['ok' => false],
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
|
$data['status'] = $status;
|
||||||
|
$data['updatedAt'] = time();
|
||||||
|
|
||||||
|
if ($status === self::JOB_STATUS_COMPLETED) {
|
||||||
|
$data['completedAt'] = time();
|
||||||
|
unset($data['message']);
|
||||||
|
} elseif ($message !== null && trim($message) !== '') {
|
||||||
|
$data['message'] = trim(preg_replace('/\s+/u', ' ', $message) ?? $message);
|
||||||
|
}
|
||||||
|
|
||||||
|
return [
|
||||||
|
'data' => $data,
|
||||||
|
'result' => ['ok' => true],
|
||||||
|
];
|
||||||
|
});
|
||||||
|
} catch (\Throwable) {
|
||||||
|
// Job status persistence must never break the already-running stream.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param callable(array<string, mixed>|null): array<string, mixed> $callback
|
||||||
|
*
|
||||||
|
* @return array<string, mixed>
|
||||||
|
*/
|
||||||
|
private function mutateJobWithLock(string $jobId, callable $callback): array
|
||||||
{
|
{
|
||||||
$path = $this->jobPath($jobId);
|
$path = $this->jobPath($jobId);
|
||||||
|
if (!is_file($path)) {
|
||||||
if (is_file($path)) {
|
return ['ok' => false, 'reason' => 'missing'];
|
||||||
@unlink($path);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$handle = @fopen($path, 'c+');
|
||||||
|
|
||||||
|
if ($handle === false) {
|
||||||
|
return ['ok' => false, 'reason' => 'missing'];
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!flock($handle, LOCK_EX)) {
|
||||||
|
return ['ok' => false, 'reason' => 'lock_failed'];
|
||||||
|
}
|
||||||
|
|
||||||
|
clearstatcache(true, $path);
|
||||||
|
$size = is_file($path) ? (int) filesize($path) : 0;
|
||||||
|
rewind($handle);
|
||||||
|
$content = $size > 0 ? stream_get_contents($handle) : '';
|
||||||
|
$data = null;
|
||||||
|
|
||||||
|
if (is_string($content) && trim($content) !== '') {
|
||||||
|
$decoded = json_decode($content, true);
|
||||||
|
$data = is_array($decoded) ? $decoded : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
$mutation = $callback($data);
|
||||||
|
$result = is_array($mutation['result'] ?? null) ? $mutation['result'] : [];
|
||||||
|
|
||||||
|
if (($mutation['delete'] ?? false) === true) {
|
||||||
|
ftruncate($handle, 0);
|
||||||
|
fflush($handle);
|
||||||
|
flock($handle, LOCK_UN);
|
||||||
|
fclose($handle);
|
||||||
|
@unlink($path);
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (array_key_exists('data', $mutation) && is_array($mutation['data'])) {
|
||||||
|
$json = json_encode($mutation['data'], JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
|
||||||
|
rewind($handle);
|
||||||
|
ftruncate($handle, 0);
|
||||||
|
fwrite($handle, $json);
|
||||||
|
fflush($handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
flock($handle, LOCK_UN);
|
||||||
|
|
||||||
|
return $result;
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
@flock($handle, LOCK_UN);
|
||||||
|
throw $e;
|
||||||
|
} finally {
|
||||||
|
if (is_resource($handle)) {
|
||||||
|
@fclose($handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param array<string, mixed> $claim
|
||||||
|
*/
|
||||||
|
private function jobClaimErrorMessage(array $claim): string
|
||||||
|
{
|
||||||
|
$reason = (string) ($claim['reason'] ?? 'missing');
|
||||||
|
$status = (string) ($claim['status'] ?? '');
|
||||||
|
$message = is_string($claim['message'] ?? null) ? trim((string) $claim['message']) : '';
|
||||||
|
|
||||||
|
if ($reason === 'expired') {
|
||||||
|
return 'Der Antwort-Job ist abgelaufen. Bitte sende die Anfrage erneut.';
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($reason === 'invalid') {
|
||||||
|
return 'Der Antwort-Job ist ungültig. Bitte sende die Anfrage erneut.';
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($reason === 'lock_failed') {
|
||||||
|
return 'Der Antwort-Job ist gerade gesperrt. Bitte sende die Anfrage erneut, falls keine Antwort erscheint.';
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($reason === 'not_pending') {
|
||||||
|
if ($status === self::JOB_STATUS_RUNNING) {
|
||||||
|
return 'Der Antwort-Stream läuft bereits oder wurde nach einem Verbindungsabbruch erneut geöffnet. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.';
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($status === self::JOB_STATUS_INTERRUPTED) {
|
||||||
|
return 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.';
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($status === self::JOB_STATUS_COMPLETED) {
|
||||||
|
return 'Der Antwort-Stream wurde bereits abgeschlossen. Bitte sende eine neue Anfrage, wenn du eine weitere Antwort brauchst.';
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($status === self::JOB_STATUS_FAILED) {
|
||||||
|
return $message !== ''
|
||||||
|
? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message
|
||||||
|
: 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 'Der Antwort-Job wurde nicht gefunden. Falls deine Verbindung kurz unterbrochen war, sende die Anfrage bitte erneut.';
|
||||||
}
|
}
|
||||||
|
|
||||||
private function cleanupExpiredJobs(): void
|
private function cleanupExpiredJobs(): void
|
||||||
|
|||||||
Reference in New Issue
Block a user