From d3d94d023e3fe348c0885a664ae2519edef9ea8a Mon Sep 17 00:00:00 2001 From: team 1 Date: Wed, 29 Apr 2026 09:09:43 +0200 Subject: [PATCH] optimize technical truth --- public/assets/js/base.js | 124 +++++++++++++++++++++++++++- src/Controller/AskSseController.php | 53 ++++++++++++ src/Infrastructure/OllamaClient.php | 20 ++++- 3 files changed, 192 insertions(+), 5 deletions(-) diff --git a/public/assets/js/base.js b/public/assets/js/base.js index 81b4357..99bf908 100644 --- a/public/assets/js/base.js +++ b/public/assets/js/base.js @@ -8,6 +8,9 @@ document.addEventListener('DOMContentLoaded', () => { const retriexCardsToggleEl = document.getElementById('toggle-retriex-cards'); const LAST_TURN_STORAGE_KEY = 'retriex:lastCompletedTurn'; const DETAIL_CARDS_STORAGE_KEY = 'retriex:showDetailCards'; + const JOB_STATUS_POLL_INTERVAL_MS = 2500; + const JOB_COMPLETION_CATCHUP_GRACE_MS = 10000; + const JOB_CLIENT_STALE_GRACE_MS = 150000; const state = { abortRequested: false, @@ -798,10 +801,13 @@ document.addEventListener('DOMContentLoaded', () => { await new Promise((resolve, reject) => { let finished = false; let lastSseEventId = 0; + let completedStatusSeenAt = null; + let lastClientProgressAt = Date.now(); const source = new EventSource(`/ask-sse/${encodeURIComponent(jobId)}`); state.eventSource = source; let networkErrorTimer = null; + let jobStatusTimer = null; const clearNetworkErrorTimer = () => { if (!networkErrorTimer) { @@ -812,13 +818,32 @@ document.addEventListener('DOMContentLoaded', () => { networkErrorTimer = null; }; + const clearJobStatusTimer = () => { + if (!jobStatusTimer) { + return; + } + + clearTimeout(jobStatusTimer); + jobStatusTimer = null; + }; + + const clearStreamTimers = () => { + clearNetworkErrorTimer(); + clearJobStatusTimer(); + }; + + const parsePositiveInteger = (value) => { + const parsed = Number.parseInt(String(value || ''), 10); + return Number.isFinite(parsed) && parsed > 0 ? parsed : 0; + }; + const complete = () => { if (finished) { return; } finished = true; - clearNetworkErrorTimer(); + clearStreamTimers(); finishEventStream(); resolve(); }; @@ -829,16 +854,107 @@ document.addEventListener('DOMContentLoaded', () => { } finished = true; - clearNetworkErrorTimer(); + clearStreamTimers(); finishEventStream(); reject(err); }; + const completeFromStatus = (statusPayload) => { + if (finished || state.abortRequested) { + complete(); + return; + } + + const statusLastEventId = parsePositiveInteger(statusPayload?.lastEventId); + + if (statusLastEventId > lastSseEventId) { + completedStatusSeenAt ??= Date.now(); + + if (Date.now() - completedStatusSeenAt < JOB_COMPLETION_CATCHUP_GRACE_MS) { + return; + } + } + + finalizeStream(bubble, raw); + rememberCompletedTurn(prompt, raw); + complete(); + }; + + const completeWithJobError = (message) => { + appendError(message || 'Der Antwort-Stream wurde beendet, bevor die Antwort abgeschlossen werden konnte.'); + complete(); + }; + + const pollJobStatus = async () => { + if (finished || state.abortRequested) { + return; + } + + try { + const res = await fetch(`/ask-jobs/${encodeURIComponent(jobId)}`, { + method: 'GET', + cache: 'no-store', + headers: {'Accept': 'application/json'}, + }); + + if (!res.ok) { + if (res.status === 404 && Date.now() - lastClientProgressAt > JOB_CLIENT_STALE_GRACE_MS) { + completeWithJobError('Der Antwort-Job wurde nicht mehr gefunden. Bitte sende die Anfrage erneut.'); + } + return; + } + + const statusPayload = await res.json(); + const status = String(statusPayload?.status || ''); + const message = String(statusPayload?.message || '').trim(); + + if (status === 'completed') { + completeFromStatus(statusPayload); + return; + } + + if (status === 'failed') { + completeWithJobError(message || 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.'); + return; + } + + if (status === 'interrupted') { + completeWithJobError(message || 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.'); + return; + } + + if (status === 'missing') { + completeWithJobError(message || 'Der Antwort-Job wurde nicht gefunden. Bitte sende die Anfrage erneut.'); + return; + } + + if (status === 'running') { + const updatedAt = parsePositiveInteger(statusPayload?.updatedAt); + const serverTime = parsePositiveInteger(statusPayload?.serverTime); + const staleAfterSeconds = parsePositiveInteger(statusPayload?.runningStaleAfterSeconds); + + if (updatedAt > 0 && serverTime > 0 && staleAfterSeconds > 0 && serverTime - updatedAt > staleAfterSeconds + 15) { + completeWithJobError(message || 'Der Antwort-Job liefert seit längerer Zeit keine neuen Daten. Der Stream wurde beendet.'); + return; + } + } + } catch (err) { + console.debug('Stream job status polling failed:', err); + } finally { + if (!finished && !state.abortRequested) { + jobStatusTimer = setTimeout(pollJobStatus, JOB_STATUS_POLL_INTERVAL_MS); + } + } + }; + state.completeStream = complete; state.failStream = fail; + jobStatusTimer = setTimeout(pollJobStatus, JOB_STATUS_POLL_INTERVAL_MS); + source.onopen = () => { clearNetworkErrorTimer(); + lastClientProgressAt = Date.now(); }; source.onmessage = (event) => { @@ -848,6 +964,8 @@ document.addEventListener('DOMContentLoaded', () => { } clearNetworkErrorTimer(); + lastClientProgressAt = Date.now(); + completedStatusSeenAt = null; if (event.data === undefined || event.data === null || event.data === '') { return; @@ -962,4 +1080,4 @@ document.addEventListener('DOMContentLoaded', () => { chatEl.innerHTML = ''; addMessage('assistant', 'History cleared.'); }); -}); \ No newline at end of file +}); diff --git a/src/Controller/AskSseController.php b/src/Controller/AskSseController.php index 0bf441f..c7e8828 100644 --- a/src/Controller/AskSseController.php +++ b/src/Controller/AskSseController.php @@ -16,6 +16,7 @@ use Symfony\Component\Routing\Annotation\Route; final readonly class AskSseController { private const JOB_TTL_SECONDS = 900; + private const JOB_RUNNING_STALE_SECONDS = 120; private const JOB_STATUS_PENDING = 'pending'; private const JOB_STATUS_RUNNING = 'running'; @@ -98,12 +99,18 @@ final readonly class AskSseController ], Response::HTTP_NOT_FOUND); } + $job = $this->markRunningJobFailedIfStale($jobId, $job) ?? $job; + $now = time(); + return new JsonResponse([ 'status' => (string) ($job['status'] ?? ''), 'message' => is_string($job['message'] ?? null) ? (string) $job['message'] : '', 'lastEventId' => max(0, (int) ($job['lastEventId'] ?? 0)), 'updatedAt' => max(0, (int) ($job['updatedAt'] ?? 0)), + 'startedAt' => max(0, (int) ($job['startedAt'] ?? 0)), 'completedAt' => max(0, (int) ($job['completedAt'] ?? 0)), + 'serverTime' => $now, + 'runningStaleAfterSeconds' => self::JOB_RUNNING_STALE_SECONDS, ]); } @@ -487,6 +494,15 @@ final readonly class AskSseController ]; } + $currentStatus = (string) ($data['status'] ?? ''); + + if (in_array($currentStatus, [self::JOB_STATUS_COMPLETED, self::JOB_STATUS_FAILED, self::JOB_STATUS_INTERRUPTED], true) && $currentStatus !== $status) { + return [ + 'persist' => false, + 'result' => ['ok' => false, 'reason' => 'terminal_status'], + ]; + } + $data['status'] = $status; $data['updatedAt'] = time(); @@ -629,6 +645,7 @@ final readonly class AskSseController } $job = $this->readJob($jobId); + $job = is_array($job) ? ($this->markRunningJobFailedIfStale($jobId, $job) ?? $job) : null; $status = is_array($job) ? (string) ($job['status'] ?? '') : ''; $message = is_array($job) && is_string($job['message'] ?? null) ? trim((string) $job['message']) @@ -739,6 +756,13 @@ final readonly class AskSseController ]; } + if ((string) ($job['status'] ?? '') !== self::JOB_STATUS_RUNNING) { + return [ + 'persist' => false, + 'result' => ['ok' => false, 'reason' => 'job_not_running'], + ]; + } + $eventId = max(0, (int) ($job['lastEventId'] ?? 0)) + 1; $job['lastEventId'] = $eventId; $job['updatedAt'] = time(); @@ -809,6 +833,35 @@ final readonly class AskSseController return is_array($decoded) ? $decoded : null; } + /** + * @param array $job + * + * @return array|null + */ + private function markRunningJobFailedIfStale(string $jobId, array $job): ?array + { + if ((string) ($job['status'] ?? '') !== self::JOB_STATUS_RUNNING) { + return $job; + } + + $lastProgressAt = max( + (int) ($job['updatedAt'] ?? 0), + (int) ($job['startedAt'] ?? 0), + (int) ($job['createdAt'] ?? 0) + ); + + if ($lastProgressAt <= 0 || time() - $lastProgressAt <= self::JOB_RUNNING_STALE_SECONDS) { + return $job; + } + + $message = 'Der Antwort-Job liefert seit längerer Zeit keine neuen Daten. Der Stream wurde beendet, damit die Oberfläche nicht hängen bleibt.'; + $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message); + + $freshJob = $this->readJob($jobId); + + return is_array($freshJob) ? $freshJob : $job; + } + /** * @param array $claim */ diff --git a/src/Infrastructure/OllamaClient.php b/src/Infrastructure/OllamaClient.php index 3258c56..185d994 100644 --- a/src/Infrastructure/OllamaClient.php +++ b/src/Infrastructure/OllamaClient.php @@ -13,6 +13,9 @@ use Throwable; final class OllamaClient { + private const CONNECT_TIMEOUT_SECONDS = 10; + private const LOW_SPEED_LIMIT_BYTES = 1; + private const LOW_SPEED_TIME_SECONDS = 45; private ?ModelGenerationConfig $cachedConfig = null; private $config = null; @@ -66,7 +69,10 @@ final class OllamaClient CURLOPT_HTTPHEADER => ['Content-Type: application/json'], CURLOPT_POSTFIELDS => $payload, CURLOPT_RETURNTRANSFER => false, - CURLOPT_TIMEOUT => $this->timeoutSeconds, + CURLOPT_TIMEOUT => $this->requestTimeoutSeconds(), + CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS, + CURLOPT_LOW_SPEED_LIMIT => self::LOW_SPEED_LIMIT_BYTES, + CURLOPT_LOW_SPEED_TIME => self::LOW_SPEED_TIME_SECONDS, CURLOPT_WRITEFUNCTION => function ($curl, string $data) use (&$buffer): int { $buffer .= $data; return strlen($data); @@ -144,7 +150,10 @@ final class OllamaClient CURLOPT_HTTPHEADER => ['Content-Type: application/json'], CURLOPT_POSTFIELDS => $payload, CURLOPT_RETURNTRANSFER => true, - CURLOPT_TIMEOUT => $this->timeoutSeconds, + CURLOPT_TIMEOUT => $this->requestTimeoutSeconds(), + CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS, + CURLOPT_LOW_SPEED_LIMIT => self::LOW_SPEED_LIMIT_BYTES, + CURLOPT_LOW_SPEED_TIME => self::LOW_SPEED_TIME_SECONDS, ]); $response = curl_exec($ch); @@ -188,6 +197,13 @@ final class OllamaClient ]; } + private function requestTimeoutSeconds(): int + { + $timeout = (int) $this->timeoutSeconds; + + return $timeout > 0 ? $timeout : 300; + } + /** * Config caching per request */