optimize technical truth
This commit is contained in:
@@ -8,6 +8,9 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
const retriexCardsToggleEl = document.getElementById('toggle-retriex-cards');
|
const retriexCardsToggleEl = document.getElementById('toggle-retriex-cards');
|
||||||
const LAST_TURN_STORAGE_KEY = 'retriex:lastCompletedTurn';
|
const LAST_TURN_STORAGE_KEY = 'retriex:lastCompletedTurn';
|
||||||
const DETAIL_CARDS_STORAGE_KEY = 'retriex:showDetailCards';
|
const DETAIL_CARDS_STORAGE_KEY = 'retriex:showDetailCards';
|
||||||
|
const JOB_STATUS_POLL_INTERVAL_MS = 2500;
|
||||||
|
const JOB_COMPLETION_CATCHUP_GRACE_MS = 10000;
|
||||||
|
const JOB_CLIENT_STALE_GRACE_MS = 150000;
|
||||||
|
|
||||||
const state = {
|
const state = {
|
||||||
abortRequested: false,
|
abortRequested: false,
|
||||||
@@ -798,10 +801,13 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
await new Promise((resolve, reject) => {
|
await new Promise((resolve, reject) => {
|
||||||
let finished = false;
|
let finished = false;
|
||||||
let lastSseEventId = 0;
|
let lastSseEventId = 0;
|
||||||
|
let completedStatusSeenAt = null;
|
||||||
|
let lastClientProgressAt = Date.now();
|
||||||
const source = new EventSource(`/ask-sse/${encodeURIComponent(jobId)}`);
|
const source = new EventSource(`/ask-sse/${encodeURIComponent(jobId)}`);
|
||||||
state.eventSource = source;
|
state.eventSource = source;
|
||||||
|
|
||||||
let networkErrorTimer = null;
|
let networkErrorTimer = null;
|
||||||
|
let jobStatusTimer = null;
|
||||||
|
|
||||||
const clearNetworkErrorTimer = () => {
|
const clearNetworkErrorTimer = () => {
|
||||||
if (!networkErrorTimer) {
|
if (!networkErrorTimer) {
|
||||||
@@ -812,13 +818,32 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
networkErrorTimer = null;
|
networkErrorTimer = null;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const clearJobStatusTimer = () => {
|
||||||
|
if (!jobStatusTimer) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
clearTimeout(jobStatusTimer);
|
||||||
|
jobStatusTimer = null;
|
||||||
|
};
|
||||||
|
|
||||||
|
const clearStreamTimers = () => {
|
||||||
|
clearNetworkErrorTimer();
|
||||||
|
clearJobStatusTimer();
|
||||||
|
};
|
||||||
|
|
||||||
|
const parsePositiveInteger = (value) => {
|
||||||
|
const parsed = Number.parseInt(String(value || ''), 10);
|
||||||
|
return Number.isFinite(parsed) && parsed > 0 ? parsed : 0;
|
||||||
|
};
|
||||||
|
|
||||||
const complete = () => {
|
const complete = () => {
|
||||||
if (finished) {
|
if (finished) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
finished = true;
|
finished = true;
|
||||||
clearNetworkErrorTimer();
|
clearStreamTimers();
|
||||||
finishEventStream();
|
finishEventStream();
|
||||||
resolve();
|
resolve();
|
||||||
};
|
};
|
||||||
@@ -829,16 +854,107 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
finished = true;
|
finished = true;
|
||||||
clearNetworkErrorTimer();
|
clearStreamTimers();
|
||||||
finishEventStream();
|
finishEventStream();
|
||||||
reject(err);
|
reject(err);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const completeFromStatus = (statusPayload) => {
|
||||||
|
if (finished || state.abortRequested) {
|
||||||
|
complete();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const statusLastEventId = parsePositiveInteger(statusPayload?.lastEventId);
|
||||||
|
|
||||||
|
if (statusLastEventId > lastSseEventId) {
|
||||||
|
completedStatusSeenAt ??= Date.now();
|
||||||
|
|
||||||
|
if (Date.now() - completedStatusSeenAt < JOB_COMPLETION_CATCHUP_GRACE_MS) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
finalizeStream(bubble, raw);
|
||||||
|
rememberCompletedTurn(prompt, raw);
|
||||||
|
complete();
|
||||||
|
};
|
||||||
|
|
||||||
|
const completeWithJobError = (message) => {
|
||||||
|
appendError(message || 'Der Antwort-Stream wurde beendet, bevor die Antwort abgeschlossen werden konnte.');
|
||||||
|
complete();
|
||||||
|
};
|
||||||
|
|
||||||
|
const pollJobStatus = async () => {
|
||||||
|
if (finished || state.abortRequested) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const res = await fetch(`/ask-jobs/${encodeURIComponent(jobId)}`, {
|
||||||
|
method: 'GET',
|
||||||
|
cache: 'no-store',
|
||||||
|
headers: {'Accept': 'application/json'},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!res.ok) {
|
||||||
|
if (res.status === 404 && Date.now() - lastClientProgressAt > JOB_CLIENT_STALE_GRACE_MS) {
|
||||||
|
completeWithJobError('Der Antwort-Job wurde nicht mehr gefunden. Bitte sende die Anfrage erneut.');
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const statusPayload = await res.json();
|
||||||
|
const status = String(statusPayload?.status || '');
|
||||||
|
const message = String(statusPayload?.message || '').trim();
|
||||||
|
|
||||||
|
if (status === 'completed') {
|
||||||
|
completeFromStatus(statusPayload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status === 'failed') {
|
||||||
|
completeWithJobError(message || 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status === 'interrupted') {
|
||||||
|
completeWithJobError(message || 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status === 'missing') {
|
||||||
|
completeWithJobError(message || 'Der Antwort-Job wurde nicht gefunden. Bitte sende die Anfrage erneut.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status === 'running') {
|
||||||
|
const updatedAt = parsePositiveInteger(statusPayload?.updatedAt);
|
||||||
|
const serverTime = parsePositiveInteger(statusPayload?.serverTime);
|
||||||
|
const staleAfterSeconds = parsePositiveInteger(statusPayload?.runningStaleAfterSeconds);
|
||||||
|
|
||||||
|
if (updatedAt > 0 && serverTime > 0 && staleAfterSeconds > 0 && serverTime - updatedAt > staleAfterSeconds + 15) {
|
||||||
|
completeWithJobError(message || 'Der Antwort-Job liefert seit längerer Zeit keine neuen Daten. Der Stream wurde beendet.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.debug('Stream job status polling failed:', err);
|
||||||
|
} finally {
|
||||||
|
if (!finished && !state.abortRequested) {
|
||||||
|
jobStatusTimer = setTimeout(pollJobStatus, JOB_STATUS_POLL_INTERVAL_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
state.completeStream = complete;
|
state.completeStream = complete;
|
||||||
state.failStream = fail;
|
state.failStream = fail;
|
||||||
|
|
||||||
|
jobStatusTimer = setTimeout(pollJobStatus, JOB_STATUS_POLL_INTERVAL_MS);
|
||||||
|
|
||||||
source.onopen = () => {
|
source.onopen = () => {
|
||||||
clearNetworkErrorTimer();
|
clearNetworkErrorTimer();
|
||||||
|
lastClientProgressAt = Date.now();
|
||||||
};
|
};
|
||||||
|
|
||||||
source.onmessage = (event) => {
|
source.onmessage = (event) => {
|
||||||
@@ -848,6 +964,8 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
clearNetworkErrorTimer();
|
clearNetworkErrorTimer();
|
||||||
|
lastClientProgressAt = Date.now();
|
||||||
|
completedStatusSeenAt = null;
|
||||||
|
|
||||||
if (event.data === undefined || event.data === null || event.data === '') {
|
if (event.data === undefined || event.data === null || event.data === '') {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ use Symfony\Component\Routing\Annotation\Route;
|
|||||||
final readonly class AskSseController
|
final readonly class AskSseController
|
||||||
{
|
{
|
||||||
private const JOB_TTL_SECONDS = 900;
|
private const JOB_TTL_SECONDS = 900;
|
||||||
|
private const JOB_RUNNING_STALE_SECONDS = 120;
|
||||||
|
|
||||||
private const JOB_STATUS_PENDING = 'pending';
|
private const JOB_STATUS_PENDING = 'pending';
|
||||||
private const JOB_STATUS_RUNNING = 'running';
|
private const JOB_STATUS_RUNNING = 'running';
|
||||||
@@ -98,12 +99,18 @@ final readonly class AskSseController
|
|||||||
], Response::HTTP_NOT_FOUND);
|
], Response::HTTP_NOT_FOUND);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$job = $this->markRunningJobFailedIfStale($jobId, $job) ?? $job;
|
||||||
|
$now = time();
|
||||||
|
|
||||||
return new JsonResponse([
|
return new JsonResponse([
|
||||||
'status' => (string) ($job['status'] ?? ''),
|
'status' => (string) ($job['status'] ?? ''),
|
||||||
'message' => is_string($job['message'] ?? null) ? (string) $job['message'] : '',
|
'message' => is_string($job['message'] ?? null) ? (string) $job['message'] : '',
|
||||||
'lastEventId' => max(0, (int) ($job['lastEventId'] ?? 0)),
|
'lastEventId' => max(0, (int) ($job['lastEventId'] ?? 0)),
|
||||||
'updatedAt' => max(0, (int) ($job['updatedAt'] ?? 0)),
|
'updatedAt' => max(0, (int) ($job['updatedAt'] ?? 0)),
|
||||||
|
'startedAt' => max(0, (int) ($job['startedAt'] ?? 0)),
|
||||||
'completedAt' => max(0, (int) ($job['completedAt'] ?? 0)),
|
'completedAt' => max(0, (int) ($job['completedAt'] ?? 0)),
|
||||||
|
'serverTime' => $now,
|
||||||
|
'runningStaleAfterSeconds' => self::JOB_RUNNING_STALE_SECONDS,
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -487,6 +494,15 @@ final readonly class AskSseController
|
|||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$currentStatus = (string) ($data['status'] ?? '');
|
||||||
|
|
||||||
|
if (in_array($currentStatus, [self::JOB_STATUS_COMPLETED, self::JOB_STATUS_FAILED, self::JOB_STATUS_INTERRUPTED], true) && $currentStatus !== $status) {
|
||||||
|
return [
|
||||||
|
'persist' => false,
|
||||||
|
'result' => ['ok' => false, 'reason' => 'terminal_status'],
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
$data['status'] = $status;
|
$data['status'] = $status;
|
||||||
$data['updatedAt'] = time();
|
$data['updatedAt'] = time();
|
||||||
|
|
||||||
@@ -629,6 +645,7 @@ final readonly class AskSseController
|
|||||||
}
|
}
|
||||||
|
|
||||||
$job = $this->readJob($jobId);
|
$job = $this->readJob($jobId);
|
||||||
|
$job = is_array($job) ? ($this->markRunningJobFailedIfStale($jobId, $job) ?? $job) : null;
|
||||||
$status = is_array($job) ? (string) ($job['status'] ?? '') : '';
|
$status = is_array($job) ? (string) ($job['status'] ?? '') : '';
|
||||||
$message = is_array($job) && is_string($job['message'] ?? null)
|
$message = is_array($job) && is_string($job['message'] ?? null)
|
||||||
? trim((string) $job['message'])
|
? trim((string) $job['message'])
|
||||||
@@ -739,6 +756,13 @@ final readonly class AskSseController
|
|||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((string) ($job['status'] ?? '') !== self::JOB_STATUS_RUNNING) {
|
||||||
|
return [
|
||||||
|
'persist' => false,
|
||||||
|
'result' => ['ok' => false, 'reason' => 'job_not_running'],
|
||||||
|
];
|
||||||
|
}
|
||||||
|
|
||||||
$eventId = max(0, (int) ($job['lastEventId'] ?? 0)) + 1;
|
$eventId = max(0, (int) ($job['lastEventId'] ?? 0)) + 1;
|
||||||
$job['lastEventId'] = $eventId;
|
$job['lastEventId'] = $eventId;
|
||||||
$job['updatedAt'] = time();
|
$job['updatedAt'] = time();
|
||||||
@@ -809,6 +833,35 @@ final readonly class AskSseController
|
|||||||
return is_array($decoded) ? $decoded : null;
|
return is_array($decoded) ? $decoded : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param array<string, mixed> $job
|
||||||
|
*
|
||||||
|
* @return array<string, mixed>|null
|
||||||
|
*/
|
||||||
|
private function markRunningJobFailedIfStale(string $jobId, array $job): ?array
|
||||||
|
{
|
||||||
|
if ((string) ($job['status'] ?? '') !== self::JOB_STATUS_RUNNING) {
|
||||||
|
return $job;
|
||||||
|
}
|
||||||
|
|
||||||
|
$lastProgressAt = max(
|
||||||
|
(int) ($job['updatedAt'] ?? 0),
|
||||||
|
(int) ($job['startedAt'] ?? 0),
|
||||||
|
(int) ($job['createdAt'] ?? 0)
|
||||||
|
);
|
||||||
|
|
||||||
|
if ($lastProgressAt <= 0 || time() - $lastProgressAt <= self::JOB_RUNNING_STALE_SECONDS) {
|
||||||
|
return $job;
|
||||||
|
}
|
||||||
|
|
||||||
|
$message = 'Der Antwort-Job liefert seit längerer Zeit keine neuen Daten. Der Stream wurde beendet, damit die Oberfläche nicht hängen bleibt.';
|
||||||
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
|
||||||
|
|
||||||
|
$freshJob = $this->readJob($jobId);
|
||||||
|
|
||||||
|
return is_array($freshJob) ? $freshJob : $job;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param array<string, mixed> $claim
|
* @param array<string, mixed> $claim
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -13,6 +13,9 @@ use Throwable;
|
|||||||
|
|
||||||
final class OllamaClient
|
final class OllamaClient
|
||||||
{
|
{
|
||||||
|
private const CONNECT_TIMEOUT_SECONDS = 10;
|
||||||
|
private const LOW_SPEED_LIMIT_BYTES = 1;
|
||||||
|
private const LOW_SPEED_TIME_SECONDS = 45;
|
||||||
private ?ModelGenerationConfig $cachedConfig = null;
|
private ?ModelGenerationConfig $cachedConfig = null;
|
||||||
private $config = null;
|
private $config = null;
|
||||||
|
|
||||||
@@ -66,7 +69,10 @@ final class OllamaClient
|
|||||||
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
|
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
|
||||||
CURLOPT_POSTFIELDS => $payload,
|
CURLOPT_POSTFIELDS => $payload,
|
||||||
CURLOPT_RETURNTRANSFER => false,
|
CURLOPT_RETURNTRANSFER => false,
|
||||||
CURLOPT_TIMEOUT => $this->timeoutSeconds,
|
CURLOPT_TIMEOUT => $this->requestTimeoutSeconds(),
|
||||||
|
CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS,
|
||||||
|
CURLOPT_LOW_SPEED_LIMIT => self::LOW_SPEED_LIMIT_BYTES,
|
||||||
|
CURLOPT_LOW_SPEED_TIME => self::LOW_SPEED_TIME_SECONDS,
|
||||||
CURLOPT_WRITEFUNCTION => function ($curl, string $data) use (&$buffer): int {
|
CURLOPT_WRITEFUNCTION => function ($curl, string $data) use (&$buffer): int {
|
||||||
$buffer .= $data;
|
$buffer .= $data;
|
||||||
return strlen($data);
|
return strlen($data);
|
||||||
@@ -144,7 +150,10 @@ final class OllamaClient
|
|||||||
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
|
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
|
||||||
CURLOPT_POSTFIELDS => $payload,
|
CURLOPT_POSTFIELDS => $payload,
|
||||||
CURLOPT_RETURNTRANSFER => true,
|
CURLOPT_RETURNTRANSFER => true,
|
||||||
CURLOPT_TIMEOUT => $this->timeoutSeconds,
|
CURLOPT_TIMEOUT => $this->requestTimeoutSeconds(),
|
||||||
|
CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS,
|
||||||
|
CURLOPT_LOW_SPEED_LIMIT => self::LOW_SPEED_LIMIT_BYTES,
|
||||||
|
CURLOPT_LOW_SPEED_TIME => self::LOW_SPEED_TIME_SECONDS,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$response = curl_exec($ch);
|
$response = curl_exec($ch);
|
||||||
@@ -188,6 +197,13 @@ final class OllamaClient
|
|||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function requestTimeoutSeconds(): int
|
||||||
|
{
|
||||||
|
$timeout = (int) $this->timeoutSeconds;
|
||||||
|
|
||||||
|
return $timeout > 0 ? $timeout : 300;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Config caching per request
|
* Config caching per request
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user