getContent(), true);
        $prompt = trim((string) ($data['prompt'] ?? ''));

        if ($prompt === '') {
            return new JsonResponse(['error' => 'Empty prompt'], Response::HTTP_BAD_REQUEST);
        }

        /**
         * Default:
         * - Browser chat uses budgeted recent context
         * - Full context must be explicitly requested
         */
        $includeFullContext = filter_var(
            $data['fullContext'] ?? false,
            FILTER_VALIDATE_BOOL
        );
        $requestContextHint = $this->sanitizeRequestContextHint((string) ($data['contextHint'] ?? ''));

        $cookieResponse = new Response();
        $clientId = $this->clientIdResolver->resolve($request, $cookieResponse);

        try {
            $this->cleanupExpiredJobs();
            $jobId = bin2hex(random_bytes(24));
            $now = time();
            $this->writeJob($jobId, [
                'status' => self::JOB_STATUS_PENDING,
                'prompt' => $prompt,
                'clientId' => $clientId,
                'includeFullContext' => $includeFullContext,
                'requestContextHint' => $requestContextHint,
                'createdAt' => $now,
                'updatedAt' => $now,
            ]);
        } catch (\Throwable $e) {
            return new JsonResponse(
                ['error' => 'Stream job could not be created: ' . $this->formatThrowableForClient($e)],
                Response::HTTP_INTERNAL_SERVER_ERROR
            );
        }

        $response = new JsonResponse(['jobId' => $jobId]);
        foreach ($cookieResponse->headers->getCookies() as $cookie) {
            $response->headers->setCookie($cookie);
        }

        return $response;
    }

    /**
     * Streams a previously created job as server-sent events.
     *
     * The first request that finds the job pending claims it and runs the agent.
     * A request that finds the job already claimed (status running, completed,
     * interrupted or failed) replays the stored output after the client's
     * Last-Event-ID and keeps tailing while the job is still running. Any other
     * claim failure is reported as an `error` event followed by `done`.
     */
    #[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])]
    public function streamJob(Request $request, string $jobId): StreamedResponse
    {
        $lastEventId = $this->resolveLastEventId($request);

        return new StreamedResponse(
            function () use ($jobId, $lastEventId): void {
                $claimed = $this->claimJob($jobId);

                if (($claimed['ok'] ?? false) !== true) {
                    $this->prepareStreamRuntime();
                    echo "retry: 30000\n\n";

                    if ($this->canReplayOrTailClaimedJob($claimed)) {
                        $this->streamStoredJobResponse($jobId, $lastEventId);

                        return;
                    }

                    $this->sendEvent('error', $this->jobClaimErrorMessage($claimed));
                    $this->sendEvent('done', '[DONE]');

                    return;
                }

                /** @var array<string, mixed> $job */
                $job = $claimed['job'];
                $this->streamAgentResponse(
                    prompt: (string) ($job['prompt'] ?? ''),
                    clientId: (string) ($job['clientId'] ?? ''),
                    includeFullContext: (bool) ($job['includeFullContext'] ?? false),
                    cookieResponse: null,
                    jobId: $jobId,
                    requestContextHint: is_string($job['requestContextHint'] ?? null)
                        ? (string) $job['requestContextHint']
                        : ''
                );
            },
            Response::HTTP_OK,
            $this->streamHeaders()
        );
    }

    /**
     * Runs the agent directly for a POSTed prompt and streams the answer as SSE.
     *
     * No job file is involved on this path (jobId is null), so nothing is stored
     * for later replay. An empty prompt is reported inside the stream by
     * streamAgentResponse() rather than as an HTTP error.
     */
    #[Route('/ask-sse', name: 'ask_sse', methods: ['POST'])]
    public function stream(Request $request): StreamedResponse
    {
        $data = json_decode($request->getContent(), true);
        $prompt = trim((string) ($data['prompt'] ?? ''));

        /**
         * Default:
         * - Browser chat uses budgeted recent context
         * - Full context must be explicitly requested
         */
        $includeFullContext = filter_var(
            $data['fullContext'] ?? false,
            FILTER_VALIDATE_BOOL
        );
        $requestContextHint = $this->sanitizeRequestContextHint((string) ($data['contextHint'] ?? ''));

        $cookieResponse = new Response();
        $clientId = $this->clientIdResolver->resolve($request, $cookieResponse);

        return new StreamedResponse(
            function () use ($prompt, $clientId, $cookieResponse, $includeFullContext, $requestContextHint): void {
                $this->streamAgentResponse(
                    prompt: $prompt,
                    clientId: $clientId,
                    includeFullContext: $includeFullContext,
                    cookieResponse: $cookieResponse,
                    jobId: null,
                    requestContextHint: $requestContextHint
                );
            },
            Response::HTTP_OK,
            $this->streamHeaders()
        );
    }

    /**
     * Runs the agent and writes every chunk to the client as an SSE data event.
     *
     * When $jobId is given, each chunk is also appended to the job's output file
     * and the job status is updated (failed / interrupted / completed) so that a
     * reconnecting client can replay the stream. With a null $jobId those
     * persistence calls are no-ops.
     *
     * @param Response|null $cookieResponse Carrier whose cookies are emitted as raw
     *                                      Set-Cookie headers; null to emit none.
     */
    private function streamAgentResponse(
        string $prompt,
        string $clientId,
        bool $includeFullContext,
        ?Response $cookieResponse,
        ?string $jobId = null,
        string $requestContextHint = ''
    ): void {
        // Emit cookies before any output buffer is flushed: flushing a non-empty
        // buffer commits the response headers, after which header() can no longer
        // add anything.
        if ($cookieResponse !== null && !headers_sent()) {
            foreach ($cookieResponse->headers->getCookies() as $cookie) {
                header('Set-Cookie: ' . $cookie, false);
            }
        }

        $this->prepareStreamRuntime();
        $this->registerStreamShutdownErrorHandler($jobId);

        echo "retry: 15000\n\n";
        $this->sendComment('stream-open');

        if ($prompt === '') {
            $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, 'Empty prompt');
            $this->sendEvent('error', 'Empty prompt');
            $this->sendEvent('done', '[DONE]');

            return;
        }

        try {
            foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext, $requestContextHint) as $chunk) {
                if (connection_aborted() === 1) {
                    $this->markJobStatus(
                        $jobId,
                        self::JOB_STATUS_INTERRUPTED,
                        'Die Verbindung zum Antwort-Stream wurde unterbrochen.'
                    );

                    return;
                }

                // Normalise line endings so sendData() can split on "\n" only.
                $chunk = str_replace(["\r\n", "\r"], "\n", $chunk);
                $eventId = $this->appendJobOutput($jobId, $chunk);
                $this->sendData($chunk, $eventId);
            }
        } catch (\Throwable $e) {
            $message = 'Stream abgebrochen: ' . $this->formatThrowableForClient($e);
            $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
            $this->sendEvent(
                'error',
                '❌ ' . $message
            );

            if ($prompt !== '' && $clientId !== '') {
                $this->appendHistoryFailure($clientId, $prompt, $message);
            }

            $this->sendEvent('done', '[DONE]');

            return;
        }

        $this->markJobStatus($jobId, self::JOB_STATUS_COMPLETED);
        $this->sendEvent('done', '[DONE]');
    }

    /**
     * Records a failed exchange in the client's history on a best-effort basis.
     *
     * Whitespace in $message is collapsed to single spaces first. Any error while
     * persisting is swallowed on purpose.
     */
    private function appendHistoryFailure(string $clientId, string $prompt, string $message): void
    {
        try {
            $message = trim(preg_replace('/\s+/u', ' ', $message) ?? $message);
            if ($message === '') {
                $message = 'Unbekannter Streamfehler.';
            }

            $this->contextService->appendHistory(
                $clientId,
                $prompt,
                'Systemhinweis: Antwort konnte nicht abgeschlossen werden. Ursache: ' . $message
            );
        } catch (\Throwable) {
            // History persistence must never break the SSE error response.
        }
    }

    /**
     * Registers a shutdown function that reports a fatal PHP error to the client.
     *
     * If the last recorded error is one of the listed fatal types, the job (when
     * a job id is given) is marked failed and an `error` plus `done` event is sent.
     *
     * NOTE(review): the message includes the server-side file path and line
     * number and is sent to the client verbatim — confirm this is acceptable
     * outside of development environments.
     */
    private function registerStreamShutdownErrorHandler(?string $jobId = null): void
    {
        register_shutdown_function(function () use ($jobId): void {
            $error = error_get_last();
            if ($error === null) {
                return;
            }

            $fatalTypes = [E_ERROR, E_PARSE, E_CORE_ERROR, E_COMPILE_ERROR, E_USER_ERROR];
            if (!in_array((int) ($error['type'] ?? 0), $fatalTypes, true)) {
                return;
            }

            $message = sprintf(
                '❌ Fataler Serverfehler: %s in %s:%s',
                (string) ($error['message'] ?? 'unknown error'),
                (string) ($error['file'] ?? 'unknown file'),
                (string) ($error['line'] ?? '?')
            );

            $this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
            $this->sendEvent('error', $message);
            $this->sendEvent('done', '[DONE]');
        });
    }

    /**
     * Normalises the client-supplied context hint.
     *
     * Line endings become "\n", runs of tabs/spaces collapse to one space, three
     * or more consecutive newlines collapse to two, and the result is trimmed and
     * capped at 4000 characters (counted as UTF-8 characters, not bytes).
     */
    private function sanitizeRequestContextHint(string $contextHint): string
    {
        $contextHint = str_replace(["\r\n", "\r"], "\n", $contextHint);
        $contextHint = preg_replace('/[\t ]+/u', ' ', $contextHint) ?? $contextHint;
        $contextHint = preg_replace('/\n{3,}/u', "\n\n", $contextHint) ?? $contextHint;
        $contextHint = trim($contextHint);

        if ($contextHint === '') {
            return '';
        }

        if (mb_strlen($contextHint, 'UTF-8') > 4000) {
            $contextHint = mb_substr($contextHint, 0, 4000, 'UTF-8');
        }

        // Trim again: the cut may have left trailing whitespace.
        return trim($contextHint);
    }

    /**
     * Returns a single-line message for a throwable.
     *
     * Falls back to the throwable's class name when its message is empty.
     */
    private function formatThrowableForClient(\Throwable $e): string
    {
        $message = trim($e->getMessage());
        if ($message === '') {
            $message = $e::class;
        }

        return preg_replace('/\s+/u', ' ', $message) ?? $message;
    }

    /**
     * Prepares the PHP runtime for long-running, unbuffered output.
     *
     * Removes the execution time limit, disables output buffering/compression via
     * ini settings and flushes every active output buffer.
     */
    private function prepareStreamRuntime(): void
    {
        @set_time_limit(0);
        @ini_set('output_buffering', 'off');
        @ini_set('zlib.output_compression', '0');

        while (ob_get_level() > 0) {
            // ob_end_flush() returns false for a buffer that cannot be removed; the
            // nesting level then never drops, so stop instead of spinning forever.
            if (!@ob_end_flush()) {
                break;
            }
        }
    }

    /**
     * Response headers used by both SSE endpoints.
     *
     * @return array<string, string>
     */
    private function streamHeaders(): array
    {
        return [
            'Content-Type' => 'text/event-stream; charset=utf-8',
            'Cache-Control' => 'no-cache, no-store, must-revalidate, no-transform',
            'Connection' => 'keep-alive',
            'X-Accel-Buffering' => 'no',
            'X-Content-Type-Options' => 'nosniff',
        ];
    }

    /**
     * Writes one SSE message whose payload may span several lines.
     *
     * Each "\n"-separated line becomes its own `data:` field. An `id:` field is
     * written first when $eventId is a positive integer. An empty payload is sent
     * as a `keepalive` comment instead of an event.
     */
    private function sendData(string $data, ?int $eventId = null): void
    {
        if ($data === '') {
            $this->sendComment('keepalive');

            return;
        }

        if ($eventId !== null && $eventId > 0) {
            echo 'id: ' . $eventId . "\n";
        }

        $lines = explode("\n", $data);
        foreach ($lines as $line) {
            echo 'data: ' . $line . "\n";
        }

        echo "\n";
        $this->flushOutput();
    }

    /**
     * Writes a named SSE event with a single-line payload.
     *
     * Line breaks in $data are replaced by spaces. $event is written verbatim, so
     * callers must pass a value without line breaks.
     */
    private function sendEvent(string $event, string $data): void
    {
        $safe = str_replace(["\r", "\n"], ' ', $data);
        echo "event: {$event}\n";
        echo "data: {$safe}\n\n";
        $this->flushOutput();
    }

    /**
     * Writes an SSE comment line (leading colon); line breaks become spaces.
     */
    private function sendComment(string $comment): void
    {
        $safe = str_replace(["\r", "\n"], ' ', $comment);
        echo ': ' . $safe . "\n\n";
        $this->flushOutput();
    }

    /**
     * Pushes pending output to the client, ignoring flush warnings.
     */
    private function flushOutput(): void
    {
        if (function_exists('ob_flush')) {
            @ob_flush();
        }

        @flush();
    }

    /**
     * Creates the job directory if needed and writes the job file as JSON.
     *
     * @param array<string, mixed> $payload
     *
     * @throws \RuntimeException when the directory or the file cannot be written
     * @throws \JsonException    when the payload cannot be encoded
     */
    private function writeJob(string $jobId, array $payload): void
    {
        $directory = $this->jobDirectory();
        // The trailing is_dir() covers a concurrent request creating the directory.
        if (!is_dir($directory) && !mkdir($directory, 0775, true) && !is_dir($directory)) {
            throw new \RuntimeException('Stream job directory could not be created.');
        }

        $json = json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        if (file_put_contents($this->jobPath($jobId), $json, LOCK_EX) === false) {
            throw new \RuntimeException('Stream job could not be written.');
        }
    }

    /**
     * Moves a pending job to "running" under an exclusive lock.
     *
     * On success the result carries the updated job. Otherwise `reason` is one of
     * `invalid`, `missing`, `lock_failed`, `expired` (the job file is deleted) or
     * `not_pending` (with the current `status` and stored `message`).
     *
     * @return array{ok: bool, reason?: string, status?: string, message?: string|null, job?: array<string, mixed>}
     */
    private function claimJob(string $jobId): array
    {
        if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
            return ['ok' => false, 'reason' => 'invalid'];
        }

        return $this->mutateJobWithLock($jobId, function (?array $data): array {
            if ($data === null) {
                return [
                    'persist' => false,
                    'result' => ['ok' => false, 'reason' => 'missing'],
                ];
            }

            $createdAt = (int) ($data['createdAt'] ?? 0);
            if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) {
                return [
                    'delete' => true,
                    'result' => ['ok' => false, 'reason' => 'expired'],
                ];
            }

            $status = (string) ($data['status'] ?? self::JOB_STATUS_PENDING);
            if ($status !== self::JOB_STATUS_PENDING) {
                return [
                    'persist' => false,
                    'result' => [
                        'ok' => false,
                        'reason' => 'not_pending',
                        'status' => $status,
                        'message' => is_string($data['message'] ?? null) ? (string) $data['message'] : null,
                    ],
                ];
            }

            $data['status'] = self::JOB_STATUS_RUNNING;
            $data['startedAt'] = time();
            $data['updatedAt'] = time();

            return [
                'data' => $data,
                'result' => ['ok' => true, 'job' => $data],
            ];
        });
    }

    /**
     * Updates the stored status of a job on a best-effort basis.
     *
     * Does nothing for a null or malformed job id. Completing a job removes any
     * stored message; for other statuses a non-blank message is stored with its
     * whitespace collapsed. Persistence errors are swallowed on purpose.
     */
    private function markJobStatus(?string $jobId, string $status, ?string $message = null): void
    {
        if ($jobId === null || !preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
            return;
        }

        try {
            $this->mutateJobWithLock($jobId, function (?array $data) use ($status, $message): array {
                if ($data === null) {
                    return [
                        'persist' => false,
                        'result' => ['ok' => false],
                    ];
                }

                $data['status'] = $status;
                $data['updatedAt'] = time();

                if ($status === self::JOB_STATUS_COMPLETED) {
                    $data['completedAt'] = time();
                    unset($data['message']);
                } elseif ($message !== null && trim($message) !== '') {
                    $data['message'] = trim(preg_replace('/\s+/u', ' ', $message) ?? $message);
                }

                return [
                    'data' => $data,
                    'result' => ['ok' => true],
                ];
            });
        } catch (\Throwable) {
            // Job status persistence must never break the already-running stream.
        }
    }

    /**
     * Reads, mutates and rewrites a job file while holding an exclusive lock.
     *
     * The callback receives the decoded job (or null when the file is empty or
     * not a JSON array) and returns an instruction array:
     * - `result`: array handed back to the caller (defaults to an empty array),
     * - `data`:   when present and an array, it replaces the file content,
     * - `delete`: when true, the job file and its output file are removed.
     * Other keys (such as the `persist` flag callers pass) are not read here; a
     * missing `data` key is what leaves the file untouched.
     *
     * @param callable(array<string, mixed>|null): array<string, mixed> $callback
     *
     * @return array<string, mixed>
     *
     * @throws \JsonException when the new job data cannot be encoded
     */
    private function mutateJobWithLock(string $jobId, callable $callback): array
    {
        $path = $this->jobPath($jobId);
        if (!is_file($path)) {
            return ['ok' => false, 'reason' => 'missing'];
        }

        $handle = @fopen($path, 'c+');
        if ($handle === false) {
            return ['ok' => false, 'reason' => 'missing'];
        }

        try {
            if (!flock($handle, LOCK_EX)) {
                return ['ok' => false, 'reason' => 'lock_failed'];
            }

            // Drop the cached stat so the size reflects writes by other processes.
            clearstatcache(true, $path);
            $size = is_file($path) ? (int) filesize($path) : 0;
            rewind($handle);
            $content = $size > 0 ? stream_get_contents($handle) : '';
            $data = null;

            if (is_string($content) && trim($content) !== '') {
                $decoded = json_decode($content, true);
                $data = is_array($decoded) ? $decoded : null;
            }

            $mutation = $callback($data);
            $result = is_array($mutation['result'] ?? null) ? $mutation['result'] : [];

            if (($mutation['delete'] ?? false) === true) {
                ftruncate($handle, 0);
                fflush($handle);
                flock($handle, LOCK_UN);
                // Close before unlinking; the finally block skips a closed handle.
                fclose($handle);
                @unlink($path);
                // Remove the stored output as well: cleanupExpiredJobs() only finds
                // output files through their job file, so it would be orphaned.
                @unlink($this->jobOutputPath($jobId));

                return $result;
            }

            if (array_key_exists('data', $mutation) && is_array($mutation['data'])) {
                $json = json_encode(
                    $mutation['data'],
                    JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
                );
                rewind($handle);
                ftruncate($handle, 0);
                fwrite($handle, $json);
                fflush($handle);
            }

            flock($handle, LOCK_UN);

            return $result;
        } catch (\Throwable $e) {
            @flock($handle, LOCK_UN);

            throw $e;
        } finally {
            if (is_resource($handle)) {
                @fclose($handle);
            }
        }
    }

    /**
     * Reads the Last-Event-ID request header as a non-negative integer.
     *
     * Returns 0 when the header is absent or not purely numeric.
     */
    private function resolveLastEventId(Request $request): int
    {
        $header = trim((string) $request->headers->get('Last-Event-ID', ''));
        if ($header === '' || !ctype_digit($header)) {
            return 0;
        }

        return max(0, (int) $header);
    }

    /**
     * Tells whether a failed claim refers to a job whose stored output can be
     * replayed: the reason must be `not_pending` and the status one of running,
     * completed, interrupted or failed.
     *
     * @param array<string, mixed> $claim
     */
    private function canReplayOrTailClaimedJob(array $claim): bool
    {
        if (($claim['reason'] ?? null) !== 'not_pending') {
            return false;
        }

        return in_array(
            (string) ($claim['status'] ?? ''),
            [
                self::JOB_STATUS_RUNNING,
                self::JOB_STATUS_COMPLETED,
                self::JOB_STATUS_INTERRUPTED,
                self::JOB_STATUS_FAILED,
            ],
            true
        );
    }

    /**
     * Replays stored job output after $lastEventId and tails a running job.
     *
     * Polls every 250 ms. The loop ends when the client disconnects or when the
     * job reaches a status other than "running", at which point a final `done`
     * event (preceded by an `error` event for non-completed jobs) is sent. While
     * waiting, a comment is written at most every 10 seconds.
     *
     * NOTE(review): a job that stays "running" forever (for example because the
     * producing process was killed) keeps this loop alive until the client goes
     * away — consider a staleness cut-off.
     */
    private function streamStoredJobResponse(string $jobId, int $lastEventId): void
    {
        $afterEventId = max(0, $lastEventId);
        $lastKeepaliveAt = 0;

        while (true) {
            if (connection_aborted() === 1) {
                return;
            }

            // Snapshot the status BEFORE draining the output. The producer appends
            // each chunk before it writes a terminal status, so a terminal status
            // seen here guarantees the drain below includes the final chunk.
            // Reading the status afterwards could miss chunks appended in between.
            $job = $this->readJob($jobId);
            $status = is_array($job) ? (string) ($job['status'] ?? '') : '';
            $message = is_array($job) && is_string($job['message'] ?? null) ? trim((string) $job['message']) : '';

            foreach ($this->readJobOutputAfter($jobId, $afterEventId) as $event) {
                $eventId = (int) ($event['id'] ?? 0);
                $data = is_string($event['data'] ?? null) ? (string) $event['data'] : '';
                if ($eventId <= $afterEventId || $data === '') {
                    continue;
                }

                $this->sendData($data, $eventId);
                $afterEventId = $eventId;
            }

            if ($status === self::JOB_STATUS_COMPLETED) {
                $this->sendComment('replayed-completed-stream');
                $this->sendEvent('done', '[DONE]');

                return;
            }

            if ($status === self::JOB_STATUS_FAILED) {
                $this->sendEvent(
                    'error',
                    $message !== ''
                        ? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message
                        : 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.'
                );
                $this->sendEvent('done', '[DONE]');

                return;
            }

            if ($status === self::JOB_STATUS_INTERRUPTED) {
                $this->sendEvent(
                    'error',
                    $message !== ''
                        ? $message
                        : 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.'
                );
                $this->sendEvent('done', '[DONE]');

                return;
            }

            if ($status !== self::JOB_STATUS_RUNNING) {
                // Unknown status or unreadable/missing job file.
                $this->sendEvent('error', $this->jobClaimErrorMessage([
                    'reason' => 'not_pending',
                    'status' => $status,
                    'message' => $message,
                ]));
                $this->sendEvent('done', '[DONE]');

                return;
            }

            if (time() - $lastKeepaliveAt >= 10) {
                $this->sendComment('waiting-for-running-stream');
                $lastKeepaliveAt = time();
            }

            usleep(250000);
        }
    }

    /**
     * Reads the job's NDJSON output file and returns the events whose id is
     * greater than $afterEventId and whose data is a non-empty string.
     *
     * Unreadable files and undecodable lines yield no events. The whole file is
     * read on every call.
     *
     * @return list<array{id: int, data: string}>
     */
    private function readJobOutputAfter(string $jobId, int $afterEventId): array
    {
        $path = $this->jobOutputPath($jobId);
        if (!is_file($path)) {
            return [];
        }

        $lines = @file($path, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
        if (!is_array($lines)) {
            return [];
        }

        $events = [];
        foreach ($lines as $line) {
            if (!is_string($line) || trim($line) === '') {
                continue;
            }

            $decoded = json_decode($line, true);
            if (!is_array($decoded)) {
                continue;
            }

            $id = (int) ($decoded['id'] ?? 0);
            $data = is_string($decoded['data'] ?? null) ? (string) $decoded['data'] : '';
            if ($id > $afterEventId && $data !== '') {
                $events[] = ['id' => $id, 'data' => $data];
            }
        }

        return $events;
    }

    /**
     * Allocates the next event id for a job and appends the chunk to its output
     * file as one JSON line.
     *
     * @return int|null The new event id, or null when there is no valid job id,
     *                  the chunk is empty, the job is missing, or persisting failed.
     */
    private function appendJobOutput(?string $jobId, string $data): ?int
    {
        if ($jobId === null || $data === '' || !preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
            return null;
        }

        $eventId = null;

        try {
            // The id counter lives in the job file and is bumped under its lock.
            $this->mutateJobWithLock($jobId, function (?array $job) use (&$eventId): array {
                if ($job === null) {
                    return [
                        'persist' => false,
                        'result' => ['ok' => false],
                    ];
                }

                $eventId = max(0, (int) ($job['lastEventId'] ?? 0)) + 1;
                $job['lastEventId'] = $eventId;
                $job['updatedAt'] = time();

                return [
                    'data' => $job,
                    'result' => ['ok' => true],
                ];
            });

            if ($eventId === null || $eventId <= 0) {
                return null;
            }

            $line = json_encode(
                ['id' => $eventId, 'data' => $data],
                JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
            ) . "\n";

            if (file_put_contents($this->jobOutputPath($jobId), $line, FILE_APPEND | LOCK_EX) === false) {
                return null;
            }
        } catch (\Throwable) {
            return null;
        }

        return $eventId;
    }

    /**
     * Reads and decodes a job file under a shared lock.
     *
     * @return array<string, mixed>|null Null when the id is malformed, the file is
     *                                   missing/unreadable/empty, or it does not
     *                                   contain a JSON array.
     */
    private function readJob(string $jobId): ?array
    {
        if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
            return null;
        }

        $path = $this->jobPath($jobId);
        if (!is_file($path)) {
            return null;
        }

        $handle = @fopen($path, 'r');
        if ($handle === false) {
            return null;
        }

        try {
            if (!flock($handle, LOCK_SH)) {
                return null;
            }

            $content = stream_get_contents($handle);
            flock($handle, LOCK_UN);
        } finally {
            fclose($handle);
        }

        if (!is_string($content) || trim($content) === '') {
            return null;
        }

        $decoded = json_decode($content, true);

        return is_array($decoded) ? $decoded : null;
    }

    /**
     * Maps a failed claim result to the user-facing (German) error message.
     *
     * Unknown reasons or statuses fall back to the "job not found" message.
     *
     * @param array<string, mixed> $claim
     */
    private function jobClaimErrorMessage(array $claim): string
    {
        $reason = (string) ($claim['reason'] ?? 'missing');
        $status = (string) ($claim['status'] ?? '');
        $message = is_string($claim['message'] ?? null) ? trim((string) $claim['message']) : '';

        if ($reason === 'expired') {
            return 'Der Antwort-Job ist abgelaufen. Bitte sende die Anfrage erneut.';
        }

        if ($reason === 'invalid') {
            return 'Der Antwort-Job ist ungültig. Bitte sende die Anfrage erneut.';
        }

        if ($reason === 'lock_failed') {
            return 'Der Antwort-Job ist gerade gesperrt. Bitte sende die Anfrage erneut, falls keine Antwort erscheint.';
        }

        if ($reason === 'not_pending') {
            if ($status === self::JOB_STATUS_RUNNING) {
                return 'Der Antwort-Stream läuft bereits oder wurde nach einem Verbindungsabbruch erneut geöffnet. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.';
            }

            if ($status === self::JOB_STATUS_INTERRUPTED) {
                return 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.';
            }

            if ($status === self::JOB_STATUS_COMPLETED) {
                return 'Der Antwort-Stream wurde bereits abgeschlossen. Bitte sende eine neue Anfrage, wenn du eine weitere Antwort brauchst.';
            }

            if ($status === self::JOB_STATUS_FAILED) {
                return $message !== ''
                    ? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message
                    : 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.';
            }
        }

        return 'Der Antwort-Job wurde nicht gefunden. Falls deine Verbindung kurz unterbrochen war, sende die Anfrage bitte erneut.';
    }

    /**
     * Deletes job files (and their output files) whose modification time is older
     * than the job TTL or cannot be determined.
     */
    private function cleanupExpiredJobs(): void
    {
        $directory = $this->jobDirectory();
        if (!is_dir($directory)) {
            return;
        }

        $threshold = time() - self::JOB_TTL_SECONDS;
        foreach (glob($directory . '/*.json') ?: [] as $path) {
            if (!is_file($path)) {
                continue;
            }

            $mtime = filemtime($path);
            if ($mtime === false || $mtime < $threshold) {
                $base = preg_replace('/\.json\z/', '', $path) ?? $path;
                @unlink($path);
                @unlink($base . '.stream.ndjson');
            }
        }
    }

    /**
     * Path of the JSON file holding a job's metadata and status.
     */
    private function jobPath(string $jobId): string
    {
        return $this->jobDirectory() . '/' . $jobId . '.json';
    }

    /**
     * Path of the NDJSON file holding a job's streamed output chunks.
     */
    private function jobOutputPath(string $jobId): string
    {
        return $this->jobDirectory() . '/' . $jobId . '.stream.ndjson';
    }

    /**
     * Directory below the project root in which job files are stored.
     */
    private function jobDirectory(): string
    {
        return rtrim($this->projectDir, '/\\') . '/var/stream_jobs';
    }
}