diff --git a/public/assets/js/base.js b/public/assets/js/base.js
index 81b4357..99bf908 100644
--- a/public/assets/js/base.js
+++ b/public/assets/js/base.js
@@ -8,6 +8,9 @@ document.addEventListener('DOMContentLoaded', () => {
const retriexCardsToggleEl = document.getElementById('toggle-retriex-cards');
const LAST_TURN_STORAGE_KEY = 'retriex:lastCompletedTurn';
const DETAIL_CARDS_STORAGE_KEY = 'retriex:showDetailCards';
+ const JOB_STATUS_POLL_INTERVAL_MS = 2500;
+ const JOB_COMPLETION_CATCHUP_GRACE_MS = 10000;
+ const JOB_CLIENT_STALE_GRACE_MS = 150000;
const state = {
abortRequested: false,
@@ -798,10 +801,13 @@ document.addEventListener('DOMContentLoaded', () => {
await new Promise((resolve, reject) => {
let finished = false;
let lastSseEventId = 0;
+ let completedStatusSeenAt = null;
+ let lastClientProgressAt = Date.now();
const source = new EventSource(`/ask-sse/${encodeURIComponent(jobId)}`);
state.eventSource = source;
let networkErrorTimer = null;
+ let jobStatusTimer = null;
const clearNetworkErrorTimer = () => {
if (!networkErrorTimer) {
@@ -812,13 +818,32 @@ document.addEventListener('DOMContentLoaded', () => {
networkErrorTimer = null;
};
+ const clearJobStatusTimer = () => {
+ if (!jobStatusTimer) {
+ return;
+ }
+
+ clearTimeout(jobStatusTimer);
+ jobStatusTimer = null;
+ };
+
+ const clearStreamTimers = () => {
+ clearNetworkErrorTimer();
+ clearJobStatusTimer();
+ };
+
+ const parsePositiveInteger = (value) => {
+ const parsed = Number.parseInt(String(value || ''), 10);
+ return Number.isFinite(parsed) && parsed > 0 ? parsed : 0;
+ };
+
const complete = () => {
if (finished) {
return;
}
finished = true;
- clearNetworkErrorTimer();
+ clearStreamTimers();
finishEventStream();
resolve();
};
@@ -829,16 +854,107 @@ document.addEventListener('DOMContentLoaded', () => {
}
finished = true;
- clearNetworkErrorTimer();
+ clearStreamTimers();
finishEventStream();
reject(err);
};
+ const completeFromStatus = (statusPayload) => {
+ if (finished || state.abortRequested) {
+ complete();
+ return;
+ }
+
+ const statusLastEventId = parsePositiveInteger(statusPayload?.lastEventId);
+
+ if (statusLastEventId > lastSseEventId) {
+ completedStatusSeenAt ??= Date.now();
+
+ if (Date.now() - completedStatusSeenAt < JOB_COMPLETION_CATCHUP_GRACE_MS) {
+ return;
+ }
+ }
+
+ finalizeStream(bubble, raw);
+ rememberCompletedTurn(prompt, raw);
+ complete();
+ };
+
+ const completeWithJobError = (message) => {
+ appendError(message || 'Der Antwort-Stream wurde beendet, bevor die Antwort abgeschlossen werden konnte.');
+ complete();
+ };
+
+ const pollJobStatus = async () => {
+ if (finished || state.abortRequested) {
+ return;
+ }
+
+ try {
+ const res = await fetch(`/ask-jobs/${encodeURIComponent(jobId)}`, {
+ method: 'GET',
+ cache: 'no-store',
+ headers: {'Accept': 'application/json'},
+ });
+
+ if (!res.ok) {
+ if (res.status === 404 && Date.now() - lastClientProgressAt > JOB_CLIENT_STALE_GRACE_MS) {
+ completeWithJobError('Der Antwort-Job wurde nicht mehr gefunden. Bitte sende die Anfrage erneut.');
+ }
+ return;
+ }
+
+ const statusPayload = await res.json();
+ const status = String(statusPayload?.status || '');
+ const message = String(statusPayload?.message || '').trim();
+
+ if (status === 'completed') {
+ completeFromStatus(statusPayload);
+ return;
+ }
+
+ if (status === 'failed') {
+ completeWithJobError(message || 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.');
+ return;
+ }
+
+ if (status === 'interrupted') {
+ completeWithJobError(message || 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.');
+ return;
+ }
+
+ if (status === 'missing') {
+ completeWithJobError(message || 'Der Antwort-Job wurde nicht gefunden. Bitte sende die Anfrage erneut.');
+ return;
+ }
+
+ if (status === 'running') {
+ const updatedAt = parsePositiveInteger(statusPayload?.updatedAt);
+ const serverTime = parsePositiveInteger(statusPayload?.serverTime);
+ const staleAfterSeconds = parsePositiveInteger(statusPayload?.runningStaleAfterSeconds);
+
+ if (updatedAt > 0 && serverTime > 0 && staleAfterSeconds > 0 && serverTime - updatedAt > staleAfterSeconds + 15) {
+ completeWithJobError(message || 'Der Antwort-Job liefert seit längerer Zeit keine neuen Daten. Der Stream wurde beendet.');
+ return;
+ }
+ }
+ } catch (err) {
+ console.debug('Stream job status polling failed:', err);
+ } finally {
+ if (!finished && !state.abortRequested) {
+ jobStatusTimer = setTimeout(pollJobStatus, JOB_STATUS_POLL_INTERVAL_MS);
+ }
+ }
+ };
+
state.completeStream = complete;
state.failStream = fail;
+ jobStatusTimer = setTimeout(pollJobStatus, JOB_STATUS_POLL_INTERVAL_MS);
+
source.onopen = () => {
clearNetworkErrorTimer();
+ lastClientProgressAt = Date.now();
};
source.onmessage = (event) => {
@@ -848,6 +964,8 @@ document.addEventListener('DOMContentLoaded', () => {
}
clearNetworkErrorTimer();
+ lastClientProgressAt = Date.now();
+ completedStatusSeenAt = null;
if (event.data === undefined || event.data === null || event.data === '') {
return;
@@ -962,4 +1080,4 @@ document.addEventListener('DOMContentLoaded', () => {
chatEl.innerHTML = '';
addMessage('assistant', 'History cleared.');
});
-});
\ No newline at end of file
+});
diff --git a/src/Controller/AskSseController.php b/src/Controller/AskSseController.php
index 0bf441f..c7e8828 100644
--- a/src/Controller/AskSseController.php
+++ b/src/Controller/AskSseController.php
@@ -16,6 +16,7 @@ use Symfony\Component\Routing\Annotation\Route;
final readonly class AskSseController
{
private const JOB_TTL_SECONDS = 900;
+ private const JOB_RUNNING_STALE_SECONDS = 120;
private const JOB_STATUS_PENDING = 'pending';
private const JOB_STATUS_RUNNING = 'running';
@@ -98,12 +99,18 @@ final readonly class AskSseController
], Response::HTTP_NOT_FOUND);
}
+ $job = $this->markRunningJobFailedIfStale($jobId, $job) ?? $job;
+ $now = time();
+
return new JsonResponse([
'status' => (string) ($job['status'] ?? ''),
'message' => is_string($job['message'] ?? null) ? (string) $job['message'] : '',
'lastEventId' => max(0, (int) ($job['lastEventId'] ?? 0)),
'updatedAt' => max(0, (int) ($job['updatedAt'] ?? 0)),
+ 'startedAt' => max(0, (int) ($job['startedAt'] ?? 0)),
'completedAt' => max(0, (int) ($job['completedAt'] ?? 0)),
+ 'serverTime' => $now,
+ 'runningStaleAfterSeconds' => self::JOB_RUNNING_STALE_SECONDS,
]);
}
@@ -487,6 +494,15 @@ final readonly class AskSseController
];
}
+ $currentStatus = (string) ($data['status'] ?? '');
+
+ if (in_array($currentStatus, [self::JOB_STATUS_COMPLETED, self::JOB_STATUS_FAILED, self::JOB_STATUS_INTERRUPTED], true) && $currentStatus !== $status) {
+ return [
+ 'persist' => false,
+ 'result' => ['ok' => false, 'reason' => 'terminal_status'],
+ ];
+ }
+
$data['status'] = $status;
$data['updatedAt'] = time();
@@ -629,6 +645,7 @@ final readonly class AskSseController
}
$job = $this->readJob($jobId);
+ $job = is_array($job) ? ($this->markRunningJobFailedIfStale($jobId, $job) ?? $job) : null;
$status = is_array($job) ? (string) ($job['status'] ?? '') : '';
$message = is_array($job) && is_string($job['message'] ?? null)
? trim((string) $job['message'])
@@ -739,6 +756,13 @@ final readonly class AskSseController
];
}
+ if ((string) ($job['status'] ?? '') !== self::JOB_STATUS_RUNNING) {
+ return [
+ 'persist' => false,
+ 'result' => ['ok' => false, 'reason' => 'job_not_running'],
+ ];
+ }
+
$eventId = max(0, (int) ($job['lastEventId'] ?? 0)) + 1;
$job['lastEventId'] = $eventId;
$job['updatedAt'] = time();
@@ -809,6 +833,35 @@ final readonly class AskSseController
return is_array($decoded) ? $decoded : null;
}
+ /**
+ * @param array $job
+ *
+ * @return array|null
+ */
+ private function markRunningJobFailedIfStale(string $jobId, array $job): ?array
+ {
+ if ((string) ($job['status'] ?? '') !== self::JOB_STATUS_RUNNING) {
+ return $job;
+ }
+
+ $lastProgressAt = max(
+ (int) ($job['updatedAt'] ?? 0),
+ (int) ($job['startedAt'] ?? 0),
+ (int) ($job['createdAt'] ?? 0)
+ );
+
+ if ($lastProgressAt <= 0 || time() - $lastProgressAt <= self::JOB_RUNNING_STALE_SECONDS) {
+ return $job;
+ }
+
+ $message = 'Der Antwort-Job liefert seit längerer Zeit keine neuen Daten. Der Stream wurde beendet, damit die Oberfläche nicht hängen bleibt.';
+ $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
+
+ $freshJob = $this->readJob($jobId);
+
+ return is_array($freshJob) ? $freshJob : $job;
+ }
+
/**
* @param array $claim
*/
diff --git a/src/Infrastructure/OllamaClient.php b/src/Infrastructure/OllamaClient.php
index 3258c56..185d994 100644
--- a/src/Infrastructure/OllamaClient.php
+++ b/src/Infrastructure/OllamaClient.php
@@ -13,6 +13,9 @@ use Throwable;
final class OllamaClient
{
+ private const CONNECT_TIMEOUT_SECONDS = 10;
+ private const LOW_SPEED_LIMIT_BYTES = 1;
+ private const LOW_SPEED_TIME_SECONDS = 45;
private ?ModelGenerationConfig $cachedConfig = null;
private $config = null;
@@ -66,7 +69,10 @@ final class OllamaClient
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
CURLOPT_POSTFIELDS => $payload,
CURLOPT_RETURNTRANSFER => false,
- CURLOPT_TIMEOUT => $this->timeoutSeconds,
+ CURLOPT_TIMEOUT => $this->requestTimeoutSeconds(),
+ CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS,
+ CURLOPT_LOW_SPEED_LIMIT => self::LOW_SPEED_LIMIT_BYTES,
+ CURLOPT_LOW_SPEED_TIME => self::LOW_SPEED_TIME_SECONDS,
CURLOPT_WRITEFUNCTION => function ($curl, string $data) use (&$buffer): int {
$buffer .= $data;
return strlen($data);
@@ -144,7 +150,10 @@ final class OllamaClient
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
CURLOPT_POSTFIELDS => $payload,
CURLOPT_RETURNTRANSFER => true,
- CURLOPT_TIMEOUT => $this->timeoutSeconds,
+ CURLOPT_TIMEOUT => $this->requestTimeoutSeconds(),
+ CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS,
+ CURLOPT_LOW_SPEED_LIMIT => self::LOW_SPEED_LIMIT_BYTES,
+ CURLOPT_LOW_SPEED_TIME => self::LOW_SPEED_TIME_SECONDS,
]);
$response = curl_exec($ch);
@@ -188,6 +197,13 @@ final class OllamaClient
];
}
+ private function requestTimeoutSeconds(): int
+ {
+ $timeout = (int) $this->timeoutSeconds;
+
+ return $timeout > 0 ? $timeout : 300;
+ }
+
/**
* Config caching per request
*/