Files
MtoRagSystem/src/Controller/AskSseController.php

381 lines
11 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Controller;
use App\Agent\AgentRunner;
use App\Http\ClientIdResolver;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpFoundation\StreamedResponse;
use Symfony\Component\Routing\Annotation\Route;
final readonly class AskSseController
{
private const JOB_TTL_SECONDS = 900;
public function __construct(
private AgentRunner $agentRunner,
private ClientIdResolver $clientIdResolver,
private string $projectDir,
) {
}
#[Route('/ask-jobs', name: 'ask_job_start', methods: ['POST'])]
public function startJob(Request $request): JsonResponse
{
$data = json_decode($request->getContent(), true);
$prompt = trim((string) ($data['prompt'] ?? ''));
if ($prompt === '') {
return new JsonResponse(['error' => 'Empty prompt'], Response::HTTP_BAD_REQUEST);
}
/**
* Default:
* - Browser chat uses budgeted recent context
* - Full context must be explicitly requested
*/
$includeFullContext = filter_var(
$data['fullContext'] ?? false,
FILTER_VALIDATE_BOOL
);
$cookieResponse = new Response();
$clientId = $this->clientIdResolver->resolve($request, $cookieResponse);
try {
$this->cleanupExpiredJobs();
$jobId = bin2hex(random_bytes(24));
$this->writeJob($jobId, [
'prompt' => $prompt,
'clientId' => $clientId,
'includeFullContext' => $includeFullContext,
'createdAt' => time(),
]);
} catch (\Throwable $e) {
return new JsonResponse(
['error' => 'Stream job could not be created: ' . $this->formatThrowableForClient($e)],
Response::HTTP_INTERNAL_SERVER_ERROR
);
}
$response = new JsonResponse(['jobId' => $jobId]);
foreach ($cookieResponse->headers->getCookies() as $cookie) {
$response->headers->setCookie($cookie);
}
return $response;
}
#[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])]
public function streamJob(string $jobId): StreamedResponse
{
return new StreamedResponse(
function () use ($jobId): void {
$job = $this->readJob($jobId);
if ($job === null) {
$this->prepareStreamRuntime();
echo "retry: 15000\n\n";
$this->sendEvent('error', 'Der Antwort-Job ist abgelaufen oder wurde nicht gefunden. Bitte sende die Anfrage erneut.');
$this->sendEvent('done', '[DONE]');
return;
}
$this->deleteJob($jobId);
$this->streamAgentResponse(
prompt: (string) ($job['prompt'] ?? ''),
clientId: (string) ($job['clientId'] ?? ''),
includeFullContext: (bool) ($job['includeFullContext'] ?? false),
cookieResponse: null
);
},
Response::HTTP_OK,
$this->streamHeaders()
);
}
#[Route('/ask-sse', name: 'ask_sse', methods: ['POST'])]
public function stream(Request $request): StreamedResponse
{
$data = json_decode($request->getContent(), true);
$prompt = trim((string) ($data['prompt'] ?? ''));
/**
* Default:
* - Browser chat uses budgeted recent context
* - Full context must be explicitly requested
*/
$includeFullContext = filter_var(
$data['fullContext'] ?? false,
FILTER_VALIDATE_BOOL
);
$cookieResponse = new Response();
$clientId = $this->clientIdResolver->resolve($request, $cookieResponse);
return new StreamedResponse(
function () use ($prompt, $clientId, $cookieResponse, $includeFullContext): void {
$this->streamAgentResponse(
prompt: $prompt,
clientId: $clientId,
includeFullContext: $includeFullContext,
cookieResponse: $cookieResponse
);
},
Response::HTTP_OK,
$this->streamHeaders()
);
}
private function streamAgentResponse(
string $prompt,
string $clientId,
bool $includeFullContext,
?Response $cookieResponse
): void {
$this->prepareStreamRuntime();
$this->registerStreamShutdownErrorHandler();
if ($cookieResponse !== null) {
foreach ($cookieResponse->headers->getCookies() as $cookie) {
header('Set-Cookie: ' . $cookie, false);
}
}
echo "retry: 15000\n\n";
$this->sendComment('stream-open');
if ($prompt === '') {
$this->sendEvent('error', 'Empty prompt');
$this->sendEvent('done', '[DONE]');
return;
}
try {
foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext) as $chunk) {
if (connection_aborted() === 1) {
return;
}
$chunk = str_replace(["\r\n", "\r"], "\n", $chunk);
$this->sendData($chunk);
}
} catch (\Throwable $e) {
$this->sendEvent(
'error',
'❌ Stream abgebrochen: ' . $this->formatThrowableForClient($e)
);
}
$this->sendEvent('done', '[DONE]');
}
private function registerStreamShutdownErrorHandler(): void
{
register_shutdown_function(function (): void {
$error = error_get_last();
if ($error === null) {
return;
}
$fatalTypes = [E_ERROR, E_PARSE, E_CORE_ERROR, E_COMPILE_ERROR, E_USER_ERROR];
if (!in_array((int) ($error['type'] ?? 0), $fatalTypes, true)) {
return;
}
$message = sprintf(
'❌ Fataler Serverfehler: %s in %s:%s',
(string) ($error['message'] ?? 'unknown error'),
(string) ($error['file'] ?? 'unknown file'),
(string) ($error['line'] ?? '?')
);
$this->sendEvent('error', $message);
$this->sendEvent('done', '[DONE]');
});
}
private function formatThrowableForClient(\Throwable $e): string
{
$message = trim($e->getMessage());
if ($message === '') {
$message = $e::class;
}
return preg_replace('/\s+/u', ' ', $message) ?? $message;
}
private function prepareStreamRuntime(): void
{
@set_time_limit(0);
@ini_set('output_buffering', 'off');
@ini_set('zlib.output_compression', '0');
while (ob_get_level() > 0) {
ob_end_flush();
}
}
/**
* @return array<string, string>
*/
private function streamHeaders(): array
{
return [
'Content-Type' => 'text/event-stream; charset=utf-8',
'Cache-Control' => 'no-cache, no-store, must-revalidate, no-transform',
'Connection' => 'keep-alive',
'X-Accel-Buffering' => 'no',
'X-Content-Type-Options' => 'nosniff',
];
}
private function sendData(string $data): void
{
if ($data === '') {
$this->sendComment('keepalive');
return;
}
$lines = explode("\n", $data);
foreach ($lines as $line) {
echo 'data: ' . $line . "\n";
}
echo "\n";
$this->flushOutput();
}
private function sendEvent(string $event, string $data): void
{
$safe = str_replace(["\r", "\n"], ' ', $data);
echo "event: {$event}\n";
echo "data: {$safe}\n\n";
$this->flushOutput();
}
private function sendComment(string $comment): void
{
$safe = str_replace(["\r", "\n"], ' ', $comment);
echo ': ' . $safe . "\n\n";
$this->flushOutput();
}
private function flushOutput(): void
{
if (function_exists('ob_flush')) {
@ob_flush();
}
@flush();
}
/**
* @param array<string, mixed> $payload
*/
private function writeJob(string $jobId, array $payload): void
{
$directory = $this->jobDirectory();
if (!is_dir($directory) && !mkdir($directory, 0775, true) && !is_dir($directory)) {
throw new \RuntimeException('Stream job directory could not be created.');
}
$json = json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
if (file_put_contents($this->jobPath($jobId), $json, LOCK_EX) === false) {
throw new \RuntimeException('Stream job could not be written.');
}
}
/**
* @return array<string, mixed>|null
*/
private function readJob(string $jobId): ?array
{
if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
return null;
}
$path = $this->jobPath($jobId);
if (!is_file($path)) {
return null;
}
$content = file_get_contents($path);
if (!is_string($content) || $content === '') {
return null;
}
$data = json_decode($content, true);
if (!is_array($data)) {
return null;
}
$createdAt = (int) ($data['createdAt'] ?? 0);
if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) {
$this->deleteJob($jobId);
return null;
}
return $data;
}
private function deleteJob(string $jobId): void
{
$path = $this->jobPath($jobId);
if (is_file($path)) {
@unlink($path);
}
}
private function cleanupExpiredJobs(): void
{
$directory = $this->jobDirectory();
if (!is_dir($directory)) {
return;
}
$threshold = time() - self::JOB_TTL_SECONDS;
foreach (glob($directory . '/*.json') ?: [] as $path) {
if (!is_file($path)) {
continue;
}
$mtime = filemtime($path);
if ($mtime === false || $mtime < $threshold) {
@unlink($path);
}
}
}
private function jobPath(string $jobId): string
{
return $this->jobDirectory() . '/' . $jobId . '.json';
}
private function jobDirectory(): string
{
return rtrim($this->projectDir, '/\\') . '/var/stream_jobs';
}
}