diff --git a/RETRIEX_SHOP_META_CONTEXT_AND_SSE_REPLAY_FIX_README.md b/RETRIEX_SHOP_META_CONTEXT_AND_SSE_REPLAY_FIX_README.md
new file mode 100644
index 0000000..68b4971
--- /dev/null
+++ b/RETRIEX_SHOP_META_CONTEXT_AND_SSE_REPLAY_FIX_README.md
@@ -0,0 +1,84 @@
+# RetrieX Shop-Meta-Context + SSE-Reconnect-Replay Fix
+
+## Problem
+
+Kurze referenzielle Shop-Befehle wie:
+
+```text
+suche im shop
+```
+
+konnten trotz vorheriger Antwort scheitern mit:
+
+```text
+Ich habe keine konkrete Shop-Suchanfrage erkannt.
+```
+
+Das passierte besonders nach langen oder reconnect-anfälligen SSE-Antworten, weil der Shop-Fallback vollständig von der serverseitig bereits geschriebenen History und einer stabilen Client-ID abhängig war.
+
+Zusätzlich war das Job-ID-/SSE-Verhalten fragil: ein automatischer `EventSource`-Reconnect auf einen bereits laufenden Job konnte weiterhin als Duplicate-Stream behandelt werden.
+
+## Lösung
+
+### 1. SSE-Reconnect-Replay
+
+- Antwort-Chunks werden pro Job mit fortlaufender SSE-`id` gesendet.
+- Chunks werden zusätzlich in `var/stream_jobs/*.stream.ndjson` gepuffert.
+- Reconnects mit `Last-Event-ID` replayen fehlende Chunks und tailen den laufenden Job.
+- Doppelte Events werden im Frontend defensiv ignoriert.
+- Completed/failed/interrupted Jobs werden sauber beendet.
+
+### 2. Ephemeral Client Context Hint
+
+- Das Frontend merkt sich den zuletzt vollständig abgeschlossenen Turn.
+- `/ask-jobs` sendet diesen Turn als `contextHint` mit.
+- Der Server speichert diesen Hint im Stream-Job und reicht ihn an `AgentRunner` weiter.
+- `AgentRunner` hängt den Hint nur temporär an den Commerce-History-Kontext an.
+- Der Hint wird nicht als eigener History-Eintrag persistiert.
+
+Damit kann `suche im shop` auch dann auf die letzte Nutzerfrage/Antwort zurückfallen, wenn die serverseitige History gerade noch nicht zuverlässig greift oder die Client-ID-/Cookie-Situation im Browser wackelt.
+
+### 3. Shop-Fallback-Tokenfilter geschärft
+
+- `messung` wird im Shop-Context-Fallback nicht mehr entfernt, weil Begriffe wie `redox messung` fachlich relevant sind.
+- Zusätzliche Füllwörter wie `ist`, `sind`, `gut`, `geeignet` werden entfernt.
+
+Beispiel aus dem Fehlerfall:
+
+```text
+welcher pockettester ist für Redox messung gut
+suche im shop
+```
+
+Der zweite Befehl kann nun aus dem Kontext wieder auf `pockettester redox messung` bzw. eine vergleichbare Shop-Query kommen, statt ohne konkrete Shop-Suchanfrage abzubrechen.
+
+## Geänderte Dateien
+
+```text
+src/Controller/AskSseController.php
+src/Agent/AgentRunner.php
+src/Config/AgentRunnerConfig.php
+config/retriex/agent.yaml
+public/assets/js/base.js
+```
+
+## Prüfung
+
+```bash
+php -n -l src/Controller/AskSseController.php
+php -n -l src/Agent/AgentRunner.php
+php -n -l src/Config/AgentRunnerConfig.php
+node --check public/assets/js/base.js
+```
+
+`vendor/autoload.php` war in der ZIP-Arbeitskopie nicht enthalten, daher wurde `bin/console mto:agent:config:validate` hier nicht ausgeführt.
+
+## Nach dem Einspielen
+
+```bash
+php bin/console cache:clear
+php bin/console mto:agent:config:validate
+php bin/console mto:agent:regression:test
+```
+
+Danach den Browser hart neu laden, weil `public/assets/js/base.js` geändert wurde.
diff --git a/RETRIEX_SHOP_META_CONTEXT_HINT_FRONTEND_FIX_README.md b/RETRIEX_SHOP_META_CONTEXT_HINT_FRONTEND_FIX_README.md
new file mode 100644
index 0000000..1569886
--- /dev/null
+++ b/RETRIEX_SHOP_META_CONTEXT_HINT_FRONTEND_FIX_README.md
@@ -0,0 +1,62 @@
+# RetrieX Shop Meta Context Hint Frontend Fix
+
+Patch-only fix for the referential shop follow-up flow:
+
+```text
+welcher pockettester ist fuer Redox messung gut
+suche im shop
+```
+
+## Problem
+
+The backend fallback for meta-only shop prompts such as `suche im shop` can only resolve a concrete shop query when the previous turn is available through server history or through the frontend `contextHint`.
+
+The previous frontend hint used only the in-memory `lastCompletedUserPrompt` / `lastCompletedAssistantText` state. If that state was empty, overwritten, or lost after a reload/reconnect sequence, the next `/ask-jobs` request sent an empty hint. The backend then had no concrete product/search context and returned:
+
+```text
+Ich habe keine konkrete Shop-Suchanfrage erkannt. Bitte nenne das Produkt, Zubehör oder die Artikelnummer.
+```
+
+A failed meta-only turn could also overwrite the frontend's last useful context, making immediate retries fragile.
+
+## Fix
+
+`public/assets/js/base.js` now builds the request `contextHint` more defensively:
+
+1. Reconstruct the latest completed visible chat turn from the DOM before sending a new prompt.
+2. Persist the last completed turn in `sessionStorage` as a per-tab fallback.
+3. Do not overwrite the useful last turn with the generic no-concrete-shop-query response for meta-only prompts.
+4. Clear the stored fallback when the user clears the chat history.
+
+## Scope
+
+Changed file:
+
+```text
+public/assets/js/base.js
+```
+
+No changes to retrieval, scoring, PromptBuilder, AgentRunner, ShopSearchService, SSE job replay, or YAML prompt logic.
+
+## Validation
+
+```bash
+node --check public/assets/js/base.js
+```
+
+Expected regression flow:
+
+```text
+Was ist der niedrigste Grenzwert fuer die Wasserhaerte, welcher mit einem Testomaten ueberwacht werden kann?
+mit welchem indikator wird der wert gemessen
+was kostet der indikator
+```
+
+Expected shop meta flow:
+
+```text
+welcher pockettester ist fuer Redox messung gut
+suche im shop
+```
+
+The second prompt should reuse the previous Redox/Pockettester context and no longer return the generic no-concrete-shop-query message.
diff --git a/RETRIEX_SSE_RECONNECT_REPLAY_FIX_README.md b/RETRIEX_SSE_RECONNECT_REPLAY_FIX_README.md
new file mode 100644
index 0000000..e6940c3
--- /dev/null
+++ b/RETRIEX_SSE_RECONNECT_REPLAY_FIX_README.md
@@ -0,0 +1,43 @@
+# RetrieX SSE reconnect replay fix
+
+Patch-only fix for fragile browser streaming after the job-based EventSource flow.
+
+## Problem
+
+EventSource is allowed to reconnect automatically when the browser, proxy or PHP/Nginx connection is interrupted. The previous job guard treated a second `/ask-sse/{jobId}` connection while the job was still `running` as an application error. In the UI this could append messages like:
+
+```text
+event: error
+data: Der Antwort-Stream läuft bereits oder wurde nach einem Verbindungsabbruch erneut geöffnet...
+```
+
+This happened especially in slower Shopware/search flows, but it could also happen in pure RAG answers.
+
+## Change
+
+- Streamed answer chunks now receive monotonically increasing SSE `id:` values.
+- Each streamed chunk is additionally written to a per-job replay buffer under `var/stream_jobs/*.stream.ndjson`.
+- If EventSource reconnects to a job that is already `running`, the controller no longer emits the misleading duplicate-stream error.
+- Instead, the reconnecting request reads `Last-Event-ID`, replays missing chunks and tails the still-running job until it completes, fails or is marked interrupted.
+- Completed jobs can replay any missing chunks and then emit `done`.
+- The frontend tracks `event.lastEventId` and ignores duplicate/replayed chunks defensively.
+- Expired job cleanup also removes the replay buffer sidecar file.
+
+## Changed files
+
+- `src/Controller/AskSseController.php`
+- `public/assets/js/base.js`
+
+## Safety
+
+This patch does not change retrieval, PromptBuilder, AgentRunner, scoring, intent detection, Shopware query generation or RAG behavior. It only changes the transport/reconnect layer for SSE.
+
+## After installing
+
+```bash
+php bin/console cache:clear
+php bin/console mto:agent:config:validate
+php bin/console mto:agent:regression:test
+```
+
+Hard-refresh the browser or clear browser cache because `public/assets/js/base.js` changed.
diff --git a/config/retriex/agent.yaml b/config/retriex/agent.yaml
index 59fecdd..6fbbd49 100644
--- a/config/retriex/agent.yaml
+++ b/config/retriex/agent.yaml
@@ -101,6 +101,10 @@ parameters:
- welches
- welchem
- welchen
+ - ist
+ - sind
+ - gut
+ - geeignet
- was
- wie
- wo
@@ -123,7 +127,6 @@ parameters:
- fuer
- messen
- gemessen
- - messung
meta_only_terms:
- shop
- shopsuche
diff --git a/public/assets/js/base.js b/public/assets/js/base.js
index 17d267e..13be49a 100644
--- a/public/assets/js/base.js
+++ b/public/assets/js/base.js
@@ -5,6 +5,7 @@ document.addEventListener('DOMContentLoaded', () => {
const abortBtn = document.getElementById('abort');
const clearBtn = document.getElementById('clear');
const aiCloudEl = document.getElementById('ai-cloud');
+ const LAST_TURN_STORAGE_KEY = 'retriex:lastCompletedTurn';
const state = {
abortRequested: false,
@@ -15,6 +16,8 @@ document.addEventListener('DOMContentLoaded', () => {
eventSource: null,
completeStream: null,
failStream: null,
+ lastCompletedUserPrompt: '',
+ lastCompletedAssistantText: '',
};
marked.setOptions({breaks: true});
@@ -23,6 +26,163 @@ document.addEventListener('DOMContentLoaded', () => {
return DOMPurify.sanitize(marked.parse(text));
}
+ function normalizeContextHintText(value) {
+ return String(value || '')
+ .replace(/\r\n/g, '\n')
+ .replace(/\r/g, '\n')
+ .replace(/[\t ]+/g, ' ')
+ .replace(/\n{3,}/g, '\n\n')
+ .trim();
+ }
+
+ function tokenizeClientMetaGuardText(value) {
+ return normalizeContextHintText(value)
+ .toLowerCase()
+ .replace(/[-/_]/g, ' ')
+ .replace(/[^\p{L}\p{N}]+/gu, ' ')
+ .trim()
+ .split(/\s+/u)
+ .filter(Boolean);
+ }
+
+ function isClientMetaOnlyShopPrompt(value) {
+ const tokens = tokenizeClientMetaGuardText(value);
+
+ if (!tokens.length) {
+ return true;
+ }
+
+ const metaTerms = new Set([
+ 'shop', 'shopsuche', 'suche', 'suchen', 'such', 'finde', 'find',
+ 'zeige', 'zeig', 'bitte', 'mal', 'im', 'in', 'nach', 'den', 'die',
+ 'das', 'der', 'dem',
+ ]);
+
+ return tokens.every((token) => metaTerms.has(token));
+ }
+
+ function isNoConcreteShopResponse(value) {
+ return normalizeContextHintText(value)
+ .toLowerCase()
+ .includes('keine konkrete shop-suchanfrage erkannt');
+ }
+
+ function rememberCompletedTurn(userPrompt, assistantText) {
+ const normalizedPrompt = normalizeContextHintText(userPrompt);
+ const normalizedAssistantText = normalizeContextHintText(assistantText);
+
+ if (!normalizedPrompt) {
+ return;
+ }
+
+ if (isClientMetaOnlyShopPrompt(normalizedPrompt) && isNoConcreteShopResponse(normalizedAssistantText)) {
+ return;
+ }
+
+ state.lastCompletedUserPrompt = normalizedPrompt.slice(0, 800);
+ state.lastCompletedAssistantText = normalizedAssistantText.slice(0, 3000);
+
+ try {
+ window.sessionStorage?.setItem(LAST_TURN_STORAGE_KEY, JSON.stringify({
+ userPrompt: state.lastCompletedUserPrompt,
+ assistantText: state.lastCompletedAssistantText,
+ rememberedAt: Date.now(),
+ }));
+ } catch (err) {
+ console.debug('Could not persist last completed turn:', err);
+ }
+ }
+
+ function loadStoredCompletedTurn() {
+ try {
+ const raw = window.sessionStorage?.getItem(LAST_TURN_STORAGE_KEY) || '';
+
+ if (!raw) {
+ return null;
+ }
+
+ const data = JSON.parse(raw);
+ const userPrompt = normalizeContextHintText(data?.userPrompt || '');
+ const assistantText = normalizeContextHintText(data?.assistantText || '');
+
+ if (!userPrompt) {
+ return null;
+ }
+
+ return {
+ userPrompt: userPrompt.slice(0, 800),
+ assistantText: assistantText.slice(0, 3000),
+ };
+ } catch (err) {
+ console.debug('Could not read last completed turn:', err);
+ return null;
+ }
+ }
+
+ function extractLatestVisibleCompletedTurn() {
+ if (!chatEl) {
+ return null;
+ }
+
+ const messages = Array.from(chatEl.querySelectorAll('.message'));
+ let pendingUserPrompt = '';
+ let latestTurn = null;
+
+ messages.forEach((message) => {
+ const bubble = message.querySelector('.bubble');
+ const text = normalizeContextHintText(bubble?.innerText || bubble?.textContent || '');
+
+ if (!text) {
+ return;
+ }
+
+ if (message.classList.contains('user')) {
+ pendingUserPrompt = text;
+ return;
+ }
+
+ if (!message.classList.contains('assistant') || !pendingUserPrompt) {
+ return;
+ }
+
+ if (bubble?.classList.contains('loader')) {
+ return;
+ }
+
+ if (isClientMetaOnlyShopPrompt(pendingUserPrompt) && isNoConcreteShopResponse(text)) {
+ pendingUserPrompt = '';
+ return;
+ }
+
+ latestTurn = {
+ userPrompt: pendingUserPrompt,
+ assistantText: text,
+ };
+ pendingUserPrompt = '';
+ });
+
+ return latestTurn;
+ }
+
+ function buildClientContextHint() {
+ const visibleTurn = extractLatestVisibleCompletedTurn();
+ const storedTurn = loadStoredCompletedTurn();
+ const userPrompt = visibleTurn?.userPrompt || state.lastCompletedUserPrompt || storedTurn?.userPrompt || '';
+ const assistantText = visibleTurn?.assistantText || state.lastCompletedAssistantText || storedTurn?.assistantText || '';
+
+ if (!userPrompt) {
+ return '';
+ }
+
+ const lines = [`Question: ${userPrompt.slice(0, 800)}`];
+
+ if (assistantText) {
+ lines.push(assistantText.slice(0, 3000));
+ }
+
+ return normalizeContextHintText(lines.join('\n')).slice(0, 4000);
+ }
+
function scrollChatToBottom() {
if (!chatEl) {
return;
@@ -368,10 +528,20 @@ document.addEventListener('DOMContentLoaded', () => {
}
const messages = await res.json();
+ let latestLoadedUserPrompt = '';
messages.forEach((message) => {
const bubble = addMessage(message.role);
renderBubbleContent(bubble, message.text);
+
+ if (message.role === 'user') {
+ latestLoadedUserPrompt = normalizeContextHintText(message.text);
+ return;
+ }
+
+ if (message.role === 'assistant' && latestLoadedUserPrompt) {
+ rememberCompletedTurn(latestLoadedUserPrompt, message.text);
+ }
});
enhanceChatLinks(chatEl);
@@ -396,6 +566,8 @@ document.addEventListener('DOMContentLoaded', () => {
return;
}
+ const contextHint = buildClientContextHint();
+
addMessage('user', renderMarkdown(prompt));
promptEl.value = '';
@@ -449,6 +621,7 @@ document.addEventListener('DOMContentLoaded', () => {
body: JSON.stringify({
prompt,
fullContext: false,
+ contextHint,
}),
signal: state.abortController.signal,
});
@@ -466,6 +639,7 @@ document.addEventListener('DOMContentLoaded', () => {
await new Promise((resolve, reject) => {
let finished = false;
+ let lastSseEventId = 0;
const source = new EventSource(`/ask-sse/${encodeURIComponent(jobId)}`);
state.eventSource = source;
@@ -521,12 +695,23 @@ document.addEventListener('DOMContentLoaded', () => {
return;
}
+ const numericEventId = Number.parseInt(event.lastEventId || '', 10);
+
+ if (Number.isFinite(numericEventId) && numericEventId > 0) {
+ if (numericEventId <= lastSseEventId) {
+ return;
+ }
+
+ lastSseEventId = numericEventId;
+ }
+
appendChunk(event.data);
};
source.addEventListener('done', () => {
if (!state.abortRequested) {
finalizeStream(bubble, raw);
+ rememberCompletedTurn(prompt, raw);
}
complete();
@@ -609,6 +794,13 @@ document.addEventListener('DOMContentLoaded', () => {
console.error('History delete failed:', err);
}
+ state.lastCompletedUserPrompt = '';
+ state.lastCompletedAssistantText = '';
+ try {
+ window.sessionStorage?.removeItem(LAST_TURN_STORAGE_KEY);
+ } catch (err) {
+ console.debug('Could not clear last completed turn:', err);
+ }
chatEl.innerHTML = '';
addMessage('assistant', 'History cleared.');
});
diff --git a/src/Agent/AgentRunner.php b/src/Agent/AgentRunner.php
index 4e08bf6..c44d48a 100644
--- a/src/Agent/AgentRunner.php
+++ b/src/Agent/AgentRunner.php
@@ -39,7 +39,7 @@ final readonly class AgentRunner
$this->systemMsgOn = true;
}
- public function run(string $prompt, string $userId, bool $forceFullContext = false): Generator
+ public function run(string $prompt, string $userId, bool $forceFullContext = false, string $requestContextHint = ''): Generator
{
$prompt = trim($prompt);
@@ -109,7 +109,7 @@ final readonly class AgentRunner
if ($this->isCommerceIntent($commerceIntent)) {
yield $this->systemMsg($this->agentRunnerConfig->getOptimizeSearchMessage(), 'think');
- $commerceHistoryContext = $this->buildCommerceHistoryContext($userId);
+ $commerceHistoryContext = $this->buildCommerceHistoryContext($userId, $requestContextHint);
if ($commerceHistoryContext !== '') {
$this->addSource($sources, $this->agentRunnerConfig->getConversationHistorySourceLabel());
@@ -136,6 +136,7 @@ final readonly class AgentRunner
'optimizedShopQuery' => $optimizedShopQuery,
'hasCommerceHistoryContext' => $commerceHistoryContext !== '',
'commerceHistoryContextLength' => mb_strlen($commerceHistoryContext),
+ 'hasRequestContextHint' => trim($requestContextHint) !== '',
]);
yield $this->systemMsg(
@@ -925,12 +926,42 @@ final readonly class AgentRunner
}
}
- private function buildCommerceHistoryContext(string $userId): string
+ private function buildCommerceHistoryContext(string $userId, string $requestContextHint = ''): string
{
- return $this->contextService->buildUserContextWithinBudget(
+ $history = $this->contextService->buildUserContextWithinBudget(
$userId,
$this->agentRunnerConfig->getCommerceHistoryBudgetChars()
);
+
+ $requestContextHint = $this->sanitizeRequestContextHintForCommerce($requestContextHint);
+
+ if ($requestContextHint === '') {
+ return $history;
+ }
+
+ if ($history === '') {
+ return $requestContextHint;
+ }
+
+ return trim($history) . "\n\n" . $requestContextHint;
+ }
+
+ private function sanitizeRequestContextHintForCommerce(string $requestContextHint): string
+ {
+ $requestContextHint = str_replace(["\r\n", "\r"], "\n", $requestContextHint);
+ $requestContextHint = preg_replace('/[\t ]+/u', ' ', $requestContextHint) ?? $requestContextHint;
+ $requestContextHint = preg_replace('/\n{3,}/u', "\n\n", $requestContextHint) ?? $requestContextHint;
+ $requestContextHint = trim($requestContextHint);
+
+ if ($requestContextHint === '') {
+ return '';
+ }
+
+ if (mb_strlen($requestContextHint, 'UTF-8') > 4000) {
+ $requestContextHint = mb_substr($requestContextHint, 0, 4000, 'UTF-8');
+ }
+
+ return trim($requestContextHint);
}
private function limitKnowledgeChunks(array $knowledgeChunks, string $commerceIntent): array
diff --git a/src/Config/AgentRunnerConfig.php b/src/Config/AgentRunnerConfig.php
index 50870aa..6c4919c 100644
--- a/src/Config/AgentRunnerConfig.php
+++ b/src/Config/AgentRunnerConfig.php
@@ -460,6 +460,10 @@ final class AgentRunnerConfig
'welches',
'welchem',
'welchen',
+ 'ist',
+ 'sind',
+ 'gut',
+ 'geeignet',
'was',
'wie',
'wo',
@@ -482,7 +486,6 @@ final class AgentRunnerConfig
'fuer',
'messen',
'gemessen',
- 'messung',
]);
}
diff --git a/src/Controller/AskSseController.php b/src/Controller/AskSseController.php
index 6855877..6712d6d 100644
--- a/src/Controller/AskSseController.php
+++ b/src/Controller/AskSseController.php
@@ -51,6 +51,8 @@ final readonly class AskSseController
FILTER_VALIDATE_BOOL
);
+ $requestContextHint = $this->sanitizeRequestContextHint((string) ($data['contextHint'] ?? ''));
+
$cookieResponse = new Response();
$clientId = $this->clientIdResolver->resolve($request, $cookieResponse);
@@ -63,6 +65,7 @@ final readonly class AskSseController
'prompt' => $prompt,
'clientId' => $clientId,
'includeFullContext' => $includeFullContext,
+ 'requestContextHint' => $requestContextHint,
'createdAt' => $now,
'updatedAt' => $now,
]);
@@ -83,19 +86,20 @@ final readonly class AskSseController
}
#[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])]
- public function streamJob(string $jobId): StreamedResponse
+ public function streamJob(Request $request, string $jobId): StreamedResponse
{
+ $lastEventId = $this->resolveLastEventId($request);
+
return new StreamedResponse(
- function () use ($jobId): void {
+ function () use ($jobId, $lastEventId): void {
$claimed = $this->claimJob($jobId);
if (($claimed['ok'] ?? false) !== true) {
$this->prepareStreamRuntime();
- echo "retry: 15000\n\n";
+ echo "retry: 30000\n\n";
- if ($this->shouldSilentlyCloseDuplicateJobStream($claimed)) {
- $this->sendComment('duplicate-or-finished-stream');
- $this->sendEvent('done', '[DONE]');
+ if ($this->canReplayOrTailClaimedJob($claimed)) {
+ $this->streamStoredJobResponse($jobId, $lastEventId);
return;
}
@@ -112,7 +116,8 @@ final readonly class AskSseController
clientId: (string) ($job['clientId'] ?? ''),
includeFullContext: (bool) ($job['includeFullContext'] ?? false),
cookieResponse: null,
- jobId: $jobId
+ jobId: $jobId,
+ requestContextHint: is_string($job['requestContextHint'] ?? null) ? (string) $job['requestContextHint'] : ''
);
},
Response::HTTP_OK,
@@ -136,17 +141,20 @@ final readonly class AskSseController
FILTER_VALIDATE_BOOL
);
+ $requestContextHint = $this->sanitizeRequestContextHint((string) ($data['contextHint'] ?? ''));
+
$cookieResponse = new Response();
$clientId = $this->clientIdResolver->resolve($request, $cookieResponse);
return new StreamedResponse(
- function () use ($prompt, $clientId, $cookieResponse, $includeFullContext): void {
+ function () use ($prompt, $clientId, $cookieResponse, $includeFullContext, $requestContextHint): void {
$this->streamAgentResponse(
prompt: $prompt,
clientId: $clientId,
includeFullContext: $includeFullContext,
cookieResponse: $cookieResponse,
- jobId: null
+ jobId: null,
+ requestContextHint: $requestContextHint
);
},
Response::HTTP_OK,
@@ -159,7 +167,8 @@ final readonly class AskSseController
string $clientId,
bool $includeFullContext,
?Response $cookieResponse,
- ?string $jobId = null
+ ?string $jobId = null,
+ string $requestContextHint = ''
): void {
$this->prepareStreamRuntime();
$this->registerStreamShutdownErrorHandler($jobId);
@@ -181,7 +190,7 @@ final readonly class AskSseController
}
try {
- foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) {
+ foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext, $requestContextHint) as $chunk) {
if (connection_aborted() === 1) {
$this->markJobStatus(
$jobId,
@@ -192,7 +201,8 @@ final readonly class AskSseController
}
$chunk = str_replace(["\r\n", "\r"], "\n", $chunk);
- $this->sendData($chunk);
+ $eventId = $this->appendJobOutput($jobId, $chunk);
+ $this->sendData($chunk, $eventId);
}
} catch (\Throwable $e) {
$message = 'Stream abgebrochen: ' . $this->formatThrowableForClient($e);
@@ -261,6 +271,24 @@ final readonly class AskSseController
});
}
+ private function sanitizeRequestContextHint(string $contextHint): string
+ {
+ $contextHint = str_replace(["\r\n", "\r"], "\n", $contextHint);
+ $contextHint = preg_replace('/[\t ]+/u', ' ', $contextHint) ?? $contextHint;
+ $contextHint = preg_replace('/\n{3,}/u', "\n\n", $contextHint) ?? $contextHint;
+ $contextHint = trim($contextHint);
+
+ if ($contextHint === '') {
+ return '';
+ }
+
+ if (mb_strlen($contextHint, 'UTF-8') > 4000) {
+ $contextHint = mb_substr($contextHint, 0, 4000, 'UTF-8');
+ }
+
+ return trim($contextHint);
+ }
+
private function formatThrowableForClient(\Throwable $e): string
{
$message = trim($e->getMessage());
@@ -297,13 +325,17 @@ final readonly class AskSseController
];
}
- private function sendData(string $data): void
+ private function sendData(string $data, ?int $eventId = null): void
{
if ($data === '') {
$this->sendComment('keepalive');
return;
}
+ if ($eventId !== null && $eventId > 0) {
+ echo 'id: ' . $eventId . "\n";
+ }
+
$lines = explode("\n", $data);
foreach ($lines as $line) {
@@ -511,22 +543,239 @@ final readonly class AskSseController
}
}
}
+ private function resolveLastEventId(Request $request): int
+ {
+ $header = trim((string) $request->headers->get('Last-Event-ID', ''));
+
+ if ($header === '' || !ctype_digit($header)) {
+ return 0;
+ }
+
+ return max(0, (int) $header);
+ }
+
/**
- * EventSource may reconnect to an already running or already completed job.
- * Those duplicate connections should be closed quietly so the UI does not
- * append a misleading error after the real stream already produced output.
- *
* @param array $claim
*/
- private function shouldSilentlyCloseDuplicateJobStream(array $claim): bool
+ private function canReplayOrTailClaimedJob(array $claim): bool
{
if (($claim['reason'] ?? null) !== 'not_pending') {
return false;
}
- $status = (string) ($claim['status'] ?? '');
+ return in_array(
+ (string) ($claim['status'] ?? ''),
+ [
+ self::JOB_STATUS_RUNNING,
+ self::JOB_STATUS_COMPLETED,
+ self::JOB_STATUS_INTERRUPTED,
+ self::JOB_STATUS_FAILED,
+ ],
+ true
+ );
+ }
- return $status === self::JOB_STATUS_COMPLETED;
+ private function streamStoredJobResponse(string $jobId, int $lastEventId): void
+ {
+ $afterEventId = max(0, $lastEventId);
+ $lastKeepaliveAt = 0;
+
+ while (true) {
+ if (connection_aborted() === 1) {
+ return;
+ }
+
+ foreach ($this->readJobOutputAfter($jobId, $afterEventId) as $event) {
+ $eventId = (int) ($event['id'] ?? 0);
+ $data = is_string($event['data'] ?? null) ? (string) $event['data'] : '';
+
+ if ($eventId <= $afterEventId || $data === '') {
+ continue;
+ }
+
+ $this->sendData($data, $eventId);
+ $afterEventId = $eventId;
+ }
+
+ $job = $this->readJob($jobId);
+ $status = is_array($job) ? (string) ($job['status'] ?? '') : '';
+ $message = is_array($job) && is_string($job['message'] ?? null)
+ ? trim((string) $job['message'])
+ : '';
+
+ if ($status === self::JOB_STATUS_COMPLETED) {
+ $this->sendComment('replayed-completed-stream');
+ $this->sendEvent('done', '[DONE]');
+ return;
+ }
+
+ if ($status === self::JOB_STATUS_FAILED) {
+ $this->sendEvent(
+ 'error',
+ $message !== ''
+ ? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message
+ : 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.'
+ );
+ $this->sendEvent('done', '[DONE]');
+ return;
+ }
+
+ if ($status === self::JOB_STATUS_INTERRUPTED) {
+ $this->sendEvent(
+ 'error',
+ $message !== ''
+ ? $message
+ : 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.'
+ );
+ $this->sendEvent('done', '[DONE]');
+ return;
+ }
+
+ if ($status !== self::JOB_STATUS_RUNNING) {
+ $this->sendEvent('error', $this->jobClaimErrorMessage([
+ 'reason' => 'not_pending',
+ 'status' => $status,
+ 'message' => $message,
+ ]));
+ $this->sendEvent('done', '[DONE]');
+ return;
+ }
+
+ if (time() - $lastKeepaliveAt >= 10) {
+ $this->sendComment('waiting-for-running-stream');
+ $lastKeepaliveAt = time();
+ }
+
+ usleep(250000);
+ }
+ }
+
+ /**
+ * @return list
+ */
+ private function readJobOutputAfter(string $jobId, int $afterEventId): array
+ {
+ $path = $this->jobOutputPath($jobId);
+
+ if (!is_file($path)) {
+ return [];
+ }
+
+ $lines = @file($path, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
+
+ if (!is_array($lines)) {
+ return [];
+ }
+
+ $events = [];
+
+ foreach ($lines as $line) {
+ if (!is_string($line) || trim($line) === '') {
+ continue;
+ }
+
+ $decoded = json_decode($line, true);
+
+ if (!is_array($decoded)) {
+ continue;
+ }
+
+ $id = (int) ($decoded['id'] ?? 0);
+ $data = is_string($decoded['data'] ?? null) ? (string) $decoded['data'] : '';
+
+ if ($id > $afterEventId && $data !== '') {
+ $events[] = ['id' => $id, 'data' => $data];
+ }
+ }
+
+ return $events;
+ }
+
+ private function appendJobOutput(?string $jobId, string $data): ?int
+ {
+ if ($jobId === null || $data === '' || !preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
+ return null;
+ }
+
+ $eventId = null;
+
+ try {
+ $this->mutateJobWithLock($jobId, function (?array $job) use (&$eventId): array {
+ if ($job === null) {
+ return [
+ 'persist' => false,
+ 'result' => ['ok' => false],
+ ];
+ }
+
+ $eventId = max(0, (int) ($job['lastEventId'] ?? 0)) + 1;
+ $job['lastEventId'] = $eventId;
+ $job['updatedAt'] = time();
+
+ return [
+ 'data' => $job,
+ 'result' => ['ok' => true],
+ ];
+ });
+
+ if ($eventId === null || $eventId <= 0) {
+ return null;
+ }
+
+ $line = json_encode(
+ ['id' => $eventId, 'data' => $data],
+ JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
+ ) . "\n";
+
+ if (file_put_contents($this->jobOutputPath($jobId), $line, FILE_APPEND | LOCK_EX) === false) {
+ return null;
+ }
+ } catch (\Throwable) {
+ return null;
+ }
+
+ return $eventId;
+ }
+
+ /**
+ * @return array|null
+ */
+ private function readJob(string $jobId): ?array
+ {
+ if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
+ return null;
+ }
+
+ $path = $this->jobPath($jobId);
+
+ if (!is_file($path)) {
+ return null;
+ }
+
+ $handle = @fopen($path, 'r');
+
+ if ($handle === false) {
+ return null;
+ }
+
+ try {
+ if (!flock($handle, LOCK_SH)) {
+ return null;
+ }
+
+ $content = stream_get_contents($handle);
+ flock($handle, LOCK_UN);
+ } finally {
+ fclose($handle);
+ }
+
+ if (!is_string($content) || trim($content) === '') {
+ return null;
+ }
+
+ $decoded = json_decode($content, true);
+
+ return is_array($decoded) ? $decoded : null;
}
/**
@@ -590,7 +839,9 @@ final readonly class AskSseController
$mtime = filemtime($path);
if ($mtime === false || $mtime < $threshold) {
+ $base = preg_replace('/\.json\z/', '', $path) ?? $path;
@unlink($path);
+ @unlink($base . '.stream.ndjson');
}
}
}
@@ -600,6 +851,11 @@ final readonly class AskSseController
return $this->jobDirectory() . '/' . $jobId . '.json';
}
+ private function jobOutputPath(string $jobId): string
+ {
+ return $this->jobDirectory() . '/' . $jobId . '.stream.ndjson';
+ }
+
private function jobDirectory(): string
{
return rtrim($this->projectDir, '/\\') . '/var/stream_jobs';