optimize msg flow
This commit is contained in:
@@ -16,7 +16,7 @@ parameters:
|
|||||||
no_concrete_shop_query: 'Ich kann die Shop-Suche noch nicht belastbar auflösen. Bitte nenne das Produkt, den Messparameter oder das Zubehör etwas konkreter.'
|
no_concrete_shop_query: 'Ich kann die Shop-Suche noch nicht belastbar auflösen. Bitte nenne das Produkt, den Messparameter oder das Zubehör etwas konkreter.'
|
||||||
fetch_search_data_template: 'Ich rufe Recherchedaten ab (type: %s)'
|
fetch_search_data_template: 'Ich rufe Recherchedaten ab (type: %s)'
|
||||||
analyze_all_information: 'Ich analysiere alle Informationen...'
|
analyze_all_information: 'Ich analysiere alle Informationen...'
|
||||||
thinking_while_streaming: 'Antwort wird generiert...'
|
thinking_while_streaming: 'Denke nach...'
|
||||||
no_llm_data_received: '❌ Es wurden keine Daten vom LLM empfangen.'
|
no_llm_data_received: '❌ Es wurden keine Daten vom LLM empfangen.'
|
||||||
generic_internal_error: '❌ Bei der Verarbeitung der Anfrage ist ein interner Fehler aufgetreten.'
|
generic_internal_error: '❌ Bei der Verarbeitung der Anfrage ist ein interner Fehler aufgetreten.'
|
||||||
debug_internal_error_prefix: '❌ Interner Fehler: '
|
debug_internal_error_prefix: '❌ Interner Fehler: '
|
||||||
|
|||||||
@@ -764,6 +764,8 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
state.eventSource = source;
|
state.eventSource = source;
|
||||||
|
|
||||||
let networkErrorTimer = null;
|
let networkErrorTimer = null;
|
||||||
|
let completionWatchdogTimer = null;
|
||||||
|
let completionWatchdogInFlight = false;
|
||||||
|
|
||||||
const clearNetworkErrorTimer = () => {
|
const clearNetworkErrorTimer = () => {
|
||||||
if (!networkErrorTimer) {
|
if (!networkErrorTimer) {
|
||||||
@@ -774,6 +776,15 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
networkErrorTimer = null;
|
networkErrorTimer = null;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const clearCompletionWatchdog = () => {
|
||||||
|
if (!completionWatchdogTimer) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
clearInterval(completionWatchdogTimer);
|
||||||
|
completionWatchdogTimer = null;
|
||||||
|
};
|
||||||
|
|
||||||
const complete = () => {
|
const complete = () => {
|
||||||
if (finished) {
|
if (finished) {
|
||||||
return;
|
return;
|
||||||
@@ -781,6 +792,7 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
|
|
||||||
finished = true;
|
finished = true;
|
||||||
clearNetworkErrorTimer();
|
clearNetworkErrorTimer();
|
||||||
|
clearCompletionWatchdog();
|
||||||
finishEventStream();
|
finishEventStream();
|
||||||
resolve();
|
resolve();
|
||||||
};
|
};
|
||||||
@@ -792,10 +804,73 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
|
|
||||||
finished = true;
|
finished = true;
|
||||||
clearNetworkErrorTimer();
|
clearNetworkErrorTimer();
|
||||||
|
clearCompletionWatchdog();
|
||||||
finishEventStream();
|
finishEventStream();
|
||||||
reject(err);
|
reject(err);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const finishFromStatus = (payload) => {
|
||||||
|
if (finished || state.abortRequested || !payload || typeof payload !== 'object') {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const status = String(payload.status || '');
|
||||||
|
const serverLastEventId = Number.parseInt(String(payload.lastEventId || '0'), 10) || 0;
|
||||||
|
|
||||||
|
// Only finalize from the side-channel when all numbered content chunks are already visible.
|
||||||
|
// This prevents cutting off the final answer if the browser missed more than just the terminal done event.
|
||||||
|
if (serverLastEventId > lastSseEventId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status === 'completed') {
|
||||||
|
finalizeStream(bubble, raw);
|
||||||
|
rememberCompletedTurn(prompt, raw);
|
||||||
|
complete();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status === 'failed' || status === 'interrupted') {
|
||||||
|
appendError(payload.message || 'Der Antwort-Stream wurde beendet, ohne ein Abschluss-Signal an den Browser zu senden.');
|
||||||
|
complete();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
const checkCompletionStatus = async () => {
|
||||||
|
if (finished || state.abortRequested || completionWatchdogInFlight) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
completionWatchdogInFlight = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const statusRes = await fetch(`/ask-jobs/${encodeURIComponent(jobId)}`, {
|
||||||
|
method: 'GET',
|
||||||
|
cache: 'no-store',
|
||||||
|
signal: state.abortController?.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!statusRes.ok) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return finishFromStatus(await statusRes.json());
|
||||||
|
} catch (err) {
|
||||||
|
if (!state.abortRequested) {
|
||||||
|
console.debug('Stream completion watchdog failed:', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
completionWatchdogInFlight = false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
completionWatchdogTimer = setInterval(checkCompletionStatus, 2500);
|
||||||
|
|
||||||
state.completeStream = complete;
|
state.completeStream = complete;
|
||||||
state.failStream = fail;
|
state.failStream = fail;
|
||||||
|
|
||||||
@@ -815,6 +890,13 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (event.data === '[DONE]') {
|
||||||
|
finalizeStream(bubble, raw);
|
||||||
|
rememberCompletedTurn(prompt, raw);
|
||||||
|
complete();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const numericEventId = Number.parseInt(event.lastEventId || '', 10);
|
const numericEventId = Number.parseInt(event.lastEventId || '', 10);
|
||||||
|
|
||||||
if (Number.isFinite(numericEventId) && numericEventId > 0) {
|
if (Number.isFinite(numericEventId) && numericEventId > 0) {
|
||||||
@@ -849,8 +931,20 @@ document.addEventListener('DOMContentLoaded', () => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void checkCompletionStatus();
|
||||||
|
|
||||||
if (source.readyState === EventSource.CLOSED) {
|
if (source.readyState === EventSource.CLOSED) {
|
||||||
fail(new Error('EventSource connection closed'));
|
window.setTimeout(async () => {
|
||||||
|
if (finished || state.abortRequested) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const statusCompleted = await checkCompletionStatus();
|
||||||
|
|
||||||
|
if (!statusCompleted && !finished) {
|
||||||
|
fail(new Error('EventSource connection closed'));
|
||||||
|
}
|
||||||
|
}, 250);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -85,6 +85,28 @@ final readonly class AskSseController
|
|||||||
return $response;
|
return $response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[Route('/ask-jobs/{jobId}', name: 'ask_job_status', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])]
|
||||||
|
public function jobStatus(string $jobId): JsonResponse
|
||||||
|
{
|
||||||
|
$job = $this->readJob($jobId);
|
||||||
|
|
||||||
|
if (!is_array($job)) {
|
||||||
|
return new JsonResponse([
|
||||||
|
'status' => 'missing',
|
||||||
|
'message' => 'Der Antwort-Job wurde nicht gefunden.',
|
||||||
|
'lastEventId' => 0,
|
||||||
|
], Response::HTTP_NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new JsonResponse([
|
||||||
|
'status' => (string) ($job['status'] ?? ''),
|
||||||
|
'message' => is_string($job['message'] ?? null) ? (string) $job['message'] : '',
|
||||||
|
'lastEventId' => max(0, (int) ($job['lastEventId'] ?? 0)),
|
||||||
|
'updatedAt' => max(0, (int) ($job['updatedAt'] ?? 0)),
|
||||||
|
'completedAt' => max(0, (int) ($job['completedAt'] ?? 0)),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
#[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])]
|
#[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])]
|
||||||
public function streamJob(Request $request, string $jobId): StreamedResponse
|
public function streamJob(Request $request, string $jobId): StreamedResponse
|
||||||
{
|
{
|
||||||
@@ -96,7 +118,7 @@ final readonly class AskSseController
|
|||||||
|
|
||||||
if (($claimed['ok'] ?? false) !== true) {
|
if (($claimed['ok'] ?? false) !== true) {
|
||||||
$this->prepareStreamRuntime();
|
$this->prepareStreamRuntime();
|
||||||
echo "retry: 30000\n\n";
|
echo "retry: 3000\n\n";
|
||||||
|
|
||||||
if ($this->canReplayOrTailClaimedJob($claimed)) {
|
if ($this->canReplayOrTailClaimedJob($claimed)) {
|
||||||
$this->streamStoredJobResponse($jobId, $lastEventId);
|
$this->streamStoredJobResponse($jobId, $lastEventId);
|
||||||
@@ -104,7 +126,7 @@ final readonly class AskSseController
|
|||||||
}
|
}
|
||||||
|
|
||||||
$this->sendEvent('error', $this->jobClaimErrorMessage($claimed));
|
$this->sendEvent('error', $this->jobClaimErrorMessage($claimed));
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -179,13 +201,13 @@ final readonly class AskSseController
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
echo "retry: 15000\n\n";
|
echo "retry: 3000\n\n";
|
||||||
$this->sendComment('stream-open');
|
$this->sendComment('stream-open');
|
||||||
|
|
||||||
if ($prompt === '') {
|
if ($prompt === '') {
|
||||||
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, 'Empty prompt');
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, 'Empty prompt');
|
||||||
$this->sendEvent('error', 'Empty prompt');
|
$this->sendEvent('error', 'Empty prompt');
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -216,12 +238,12 @@ final readonly class AskSseController
|
|||||||
$this->appendHistoryFailure($clientId, $prompt, $message);
|
$this->appendHistoryFailure($clientId, $prompt, $message);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->markJobStatus($jobId, self::JOB_STATUS_COMPLETED);
|
$this->markJobStatus($jobId, self::JOB_STATUS_COMPLETED);
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function appendHistoryFailure(string $clientId, string $prompt, string $message): void
|
private function appendHistoryFailure(string $clientId, string $prompt, string $message): void
|
||||||
@@ -267,7 +289,7 @@ final readonly class AskSseController
|
|||||||
|
|
||||||
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
|
||||||
$this->sendEvent('error', $message);
|
$this->sendEvent('error', $message);
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -355,6 +377,15 @@ final readonly class AskSseController
|
|||||||
$this->flushOutput();
|
$this->flushOutput();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function sendDoneEvent(): void
|
||||||
|
{
|
||||||
|
$this->sendEvent('done', '[DONE]');
|
||||||
|
|
||||||
|
if (function_exists('fastcgi_finish_request')) {
|
||||||
|
@fastcgi_finish_request();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private function sendComment(string $comment): void
|
private function sendComment(string $comment): void
|
||||||
{
|
{
|
||||||
$safe = str_replace(["\r", "\n"], ' ', $comment);
|
$safe = str_replace(["\r", "\n"], ' ', $comment);
|
||||||
@@ -605,7 +636,7 @@ final readonly class AskSseController
|
|||||||
|
|
||||||
if ($status === self::JOB_STATUS_COMPLETED) {
|
if ($status === self::JOB_STATUS_COMPLETED) {
|
||||||
$this->sendComment('replayed-completed-stream');
|
$this->sendComment('replayed-completed-stream');
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -616,7 +647,7 @@ final readonly class AskSseController
|
|||||||
? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message
|
? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message
|
||||||
: 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.'
|
: 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.'
|
||||||
);
|
);
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -627,7 +658,7 @@ final readonly class AskSseController
|
|||||||
? $message
|
? $message
|
||||||
: 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.'
|
: 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.'
|
||||||
);
|
);
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -637,7 +668,7 @@ final readonly class AskSseController
|
|||||||
'status' => $status,
|
'status' => $status,
|
||||||
'message' => $message,
|
'message' => $message,
|
||||||
]));
|
]));
|
||||||
$this->sendEvent('done', '[DONE]');
|
$this->sendDoneEvent();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user