951 lines
30 KiB
PHP
951 lines
30 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Controller;
|
|
|
|
use App\Agent\AgentRunner;
|
|
use App\Context\ContextService;
|
|
use App\Http\ClientIdResolver;
|
|
use Symfony\Component\HttpFoundation\JsonResponse;
|
|
use Symfony\Component\HttpFoundation\Request;
|
|
use Symfony\Component\HttpFoundation\Response;
|
|
use Symfony\Component\HttpFoundation\StreamedResponse;
|
|
use Symfony\Component\Routing\Annotation\Route;
|
|
use Throwable;
|
|
|
|
final readonly class AskSseController
|
|
{
|
|
private const JOB_TTL_SECONDS = 900;
|
|
private const JOB_RUNNING_STALE_SECONDS = 120;
|
|
|
|
private const JOB_STATUS_PENDING = 'pending';
|
|
private const JOB_STATUS_RUNNING = 'running';
|
|
private const JOB_STATUS_COMPLETED = 'completed';
|
|
private const JOB_STATUS_INTERRUPTED = 'interrupted';
|
|
private const JOB_STATUS_FAILED = 'failed';
|
|
|
|
public function __construct(
|
|
private AgentRunner $agentRunner,
|
|
private ClientIdResolver $clientIdResolver,
|
|
private ContextService $contextService,
|
|
private string $projectDir,
|
|
) {
|
|
}
|
|
|
|
#[Route('/ask-jobs', name: 'ask_job_start', methods: ['POST'])]
|
|
public function startJob(Request $request): JsonResponse
|
|
{
|
|
$data = json_decode($request->getContent(), true);
|
|
$prompt = trim((string) ($data['prompt'] ?? ''));
|
|
|
|
if ($prompt === '') {
|
|
return new JsonResponse(['error' => 'Empty prompt'], Response::HTTP_BAD_REQUEST);
|
|
}
|
|
|
|
/**
|
|
* Default:
|
|
* - Browser chat uses budgeted recent context
|
|
* - Full context must be explicitly requested
|
|
*/
|
|
$includeFullContext = filter_var(
|
|
$data['fullContext'] ?? false,
|
|
FILTER_VALIDATE_BOOL
|
|
);
|
|
|
|
$requestContextHint = $this->sanitizeRequestContextHint((string) ($data['contextHint'] ?? ''));
|
|
|
|
$cookieResponse = new Response();
|
|
$clientId = $this->clientIdResolver->resolve($request, $cookieResponse);
|
|
|
|
try {
|
|
$this->cleanupExpiredJobs();
|
|
$jobId = bin2hex(random_bytes(24));
|
|
$now = time();
|
|
$this->writeJob($jobId, [
|
|
'status' => self::JOB_STATUS_PENDING,
|
|
'prompt' => $prompt,
|
|
'clientId' => $clientId,
|
|
'includeFullContext' => $includeFullContext,
|
|
'requestContextHint' => $requestContextHint,
|
|
'createdAt' => $now,
|
|
'updatedAt' => $now,
|
|
]);
|
|
} catch (Throwable $e) {
|
|
return new JsonResponse(
|
|
['error' => 'Stream job could not be created: ' . $this->formatThrowableForClient($e)],
|
|
Response::HTTP_INTERNAL_SERVER_ERROR
|
|
);
|
|
}
|
|
|
|
$response = new JsonResponse(['jobId' => $jobId]);
|
|
|
|
foreach ($cookieResponse->headers->getCookies() as $cookie) {
|
|
$response->headers->setCookie($cookie);
|
|
}
|
|
|
|
return $response;
|
|
}
|
|
|
|
#[Route('/ask-jobs/{jobId}', name: 'ask_job_status', requirements: ['jobId' => '[a-f0-9]{48}'], methods: ['GET'])]
|
|
public function jobStatus(string $jobId): JsonResponse
|
|
{
|
|
$job = $this->readJob($jobId);
|
|
|
|
if (!is_array($job)) {
|
|
return new JsonResponse([
|
|
'status' => 'missing',
|
|
'message' => 'Der Antwort-Job wurde nicht gefunden.',
|
|
'lastEventId' => 0,
|
|
], Response::HTTP_NOT_FOUND);
|
|
}
|
|
|
|
$job = $this->markRunningJobFailedIfStale($jobId, $job) ?? $job;
|
|
$now = time();
|
|
|
|
return new JsonResponse([
|
|
'status' => (string) ($job['status'] ?? ''),
|
|
'message' => is_string($job['message'] ?? null) ? (string) $job['message'] : '',
|
|
'lastEventId' => max(0, (int) ($job['lastEventId'] ?? 0)),
|
|
'updatedAt' => max(0, (int) ($job['updatedAt'] ?? 0)),
|
|
'startedAt' => max(0, (int) ($job['startedAt'] ?? 0)),
|
|
'completedAt' => max(0, (int) ($job['completedAt'] ?? 0)),
|
|
'serverTime' => $now,
|
|
'runningStaleAfterSeconds' => self::JOB_RUNNING_STALE_SECONDS,
|
|
]);
|
|
}
|
|
|
|
#[Route('/ask-sse/{jobId}', name: 'ask_sse_job', methods: ['GET'], requirements: ['jobId' => '[a-f0-9]{48}'])]
|
|
public function streamJob(Request $request, string $jobId): StreamedResponse
|
|
{
|
|
$lastEventId = $this->resolveLastEventId($request);
|
|
|
|
return new StreamedResponse(
|
|
function () use ($jobId, $lastEventId): void {
|
|
$claimed = $this->claimJob($jobId);
|
|
|
|
if (($claimed['ok'] ?? false) !== true) {
|
|
$this->prepareStreamRuntime();
|
|
echo "retry: 3000\n\n";
|
|
|
|
if ($this->canReplayOrTailClaimedJob($claimed)) {
|
|
$this->streamStoredJobResponse($jobId, $lastEventId);
|
|
return;
|
|
}
|
|
|
|
$this->sendEvent('error', $this->jobClaimErrorMessage($claimed));
|
|
$this->sendDoneEvent();
|
|
return;
|
|
}
|
|
|
|
/** @var array<string, mixed> $job */
|
|
$job = $claimed['job'];
|
|
|
|
$this->streamAgentResponse(
|
|
prompt: (string) ($job['prompt'] ?? ''),
|
|
clientId: (string) ($job['clientId'] ?? ''),
|
|
includeFullContext: (bool) ($job['includeFullContext'] ?? false),
|
|
cookieResponse: null,
|
|
jobId: $jobId,
|
|
requestContextHint: is_string($job['requestContextHint'] ?? null) ? (string) $job['requestContextHint'] : ''
|
|
);
|
|
},
|
|
Response::HTTP_OK,
|
|
$this->streamHeaders()
|
|
);
|
|
}
|
|
|
|
#[Route('/ask-sse', name: 'ask_sse', methods: ['POST'])]
|
|
public function stream(Request $request): StreamedResponse
|
|
{
|
|
$data = json_decode($request->getContent(), true);
|
|
$prompt = trim((string) ($data['prompt'] ?? ''));
|
|
|
|
/**
|
|
* Default:
|
|
* - Browser chat uses budgeted recent context
|
|
* - Full context must be explicitly requested
|
|
*/
|
|
$includeFullContext = filter_var(
|
|
$data['fullContext'] ?? false,
|
|
FILTER_VALIDATE_BOOL
|
|
);
|
|
|
|
$requestContextHint = $this->sanitizeRequestContextHint((string) ($data['contextHint'] ?? ''));
|
|
|
|
$cookieResponse = new Response();
|
|
$clientId = $this->clientIdResolver->resolve($request, $cookieResponse);
|
|
|
|
return new StreamedResponse(
|
|
function () use ($prompt, $clientId, $cookieResponse, $includeFullContext, $requestContextHint): void {
|
|
$this->streamAgentResponse(
|
|
prompt: $prompt,
|
|
clientId: $clientId,
|
|
includeFullContext: $includeFullContext,
|
|
cookieResponse: $cookieResponse,
|
|
jobId: null,
|
|
requestContextHint: $requestContextHint
|
|
);
|
|
},
|
|
Response::HTTP_OK,
|
|
$this->streamHeaders()
|
|
);
|
|
}
|
|
|
|
private function streamAgentResponse(
|
|
string $prompt,
|
|
string $clientId,
|
|
bool $includeFullContext,
|
|
?Response $cookieResponse,
|
|
?string $jobId = null,
|
|
string $requestContextHint = ''
|
|
): void {
|
|
$this->prepareStreamRuntime();
|
|
$this->registerStreamShutdownErrorHandler($jobId);
|
|
|
|
if ($cookieResponse !== null) {
|
|
foreach ($cookieResponse->headers->getCookies() as $cookie) {
|
|
header('Set-Cookie: ' . $cookie, false);
|
|
}
|
|
}
|
|
|
|
echo "retry: 3000\n\n";
|
|
$this->sendComment('stream-open');
|
|
|
|
if ($prompt === '') {
|
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, 'Empty prompt');
|
|
$this->sendEvent('error', 'Empty prompt');
|
|
$this->sendDoneEvent();
|
|
return;
|
|
}
|
|
|
|
try {
|
|
foreach ($this->agentRunner->run($prompt, $clientId, $includeFullContext, $requestContextHint) as $chunk) {
|
|
if (connection_aborted() === 1) {
|
|
$this->markJobStatus(
|
|
$jobId,
|
|
self::JOB_STATUS_INTERRUPTED,
|
|
'Die Verbindung zum Antwort-Stream wurde unterbrochen.'
|
|
);
|
|
return;
|
|
}
|
|
|
|
$chunk = str_replace(["\r\n", "\r"], "\n", $chunk);
|
|
$eventId = $this->appendJobOutput($jobId, $chunk);
|
|
$this->sendData($chunk, $eventId);
|
|
}
|
|
} catch (Throwable $e) {
|
|
$message = 'Stream abgebrochen: ' . $this->formatThrowableForClient($e);
|
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
|
|
$this->sendEvent(
|
|
'error',
|
|
'❌ ' . $message
|
|
);
|
|
|
|
if ($prompt !== '' && $clientId !== '') {
|
|
$this->appendHistoryFailure($clientId, $prompt, $message);
|
|
}
|
|
|
|
$this->sendDoneEvent();
|
|
return;
|
|
}
|
|
|
|
$this->markJobStatus($jobId, self::JOB_STATUS_COMPLETED);
|
|
$this->sendDoneEvent();
|
|
}
|
|
|
|
private function appendHistoryFailure(string $clientId, string $prompt, string $message): void
|
|
{
|
|
try {
|
|
$message = trim(preg_replace('/\s+/u', ' ', $message) ?? $message);
|
|
|
|
if ($message === '') {
|
|
$message = 'Unbekannter Streamfehler.';
|
|
}
|
|
|
|
$this->contextService->appendHistory(
|
|
$clientId,
|
|
$prompt,
|
|
'Systemhinweis: Antwort konnte nicht abgeschlossen werden. Ursache: ' . $message
|
|
);
|
|
} catch (Throwable) {
|
|
// History persistence must never break the SSE error response.
|
|
}
|
|
}
|
|
|
|
private function registerStreamShutdownErrorHandler(?string $jobId = null): void
|
|
{
|
|
register_shutdown_function(function () use ($jobId): void {
|
|
$error = error_get_last();
|
|
|
|
if ($error === null) {
|
|
return;
|
|
}
|
|
|
|
$fatalTypes = [E_ERROR, E_PARSE, E_CORE_ERROR, E_COMPILE_ERROR, E_USER_ERROR];
|
|
|
|
if (!in_array((int) ($error['type'] ?? 0), $fatalTypes, true)) {
|
|
return;
|
|
}
|
|
|
|
$message = sprintf(
|
|
'❌ Fataler Serverfehler: %s in %s:%s',
|
|
(string) ($error['message'] ?? 'unknown error'),
|
|
(string) ($error['file'] ?? 'unknown file'),
|
|
(string) ($error['line'] ?? '?')
|
|
);
|
|
|
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
|
|
$this->sendEvent('error', $message);
|
|
$this->sendDoneEvent();
|
|
});
|
|
}
|
|
|
|
private function sanitizeRequestContextHint(string $contextHint): string
|
|
{
|
|
$contextHint = str_replace(["\r\n", "\r"], "\n", $contextHint);
|
|
$contextHint = preg_replace('/[\t ]+/u', ' ', $contextHint) ?? $contextHint;
|
|
$contextHint = preg_replace('/\n{3,}/u', "\n\n", $contextHint) ?? $contextHint;
|
|
$contextHint = trim($contextHint);
|
|
|
|
if ($contextHint === '') {
|
|
return '';
|
|
}
|
|
|
|
if (mb_strlen($contextHint, 'UTF-8') > 4000) {
|
|
$contextHint = mb_substr($contextHint, 0, 4000, 'UTF-8');
|
|
}
|
|
|
|
return trim($contextHint);
|
|
}
|
|
|
|
private function formatThrowableForClient(Throwable $e): string
|
|
{
|
|
$message = trim($e->getMessage());
|
|
|
|
if ($message === '') {
|
|
$message = $e::class;
|
|
}
|
|
|
|
return preg_replace('/\s+/u', ' ', $message) ?? $message;
|
|
}
|
|
|
|
private function prepareStreamRuntime(): void
|
|
{
|
|
@set_time_limit(0);
|
|
@ini_set('output_buffering', 'off');
|
|
@ini_set('zlib.output_compression', '0');
|
|
|
|
while (ob_get_level() > 0) {
|
|
ob_end_flush();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @return array<string, string>
|
|
*/
|
|
private function streamHeaders(): array
|
|
{
|
|
return [
|
|
'Content-Type' => 'text/event-stream; charset=utf-8',
|
|
'Cache-Control' => 'no-cache, no-store, must-revalidate, no-transform',
|
|
'Connection' => 'keep-alive',
|
|
'X-Accel-Buffering' => 'no',
|
|
'X-Content-Type-Options' => 'nosniff',
|
|
];
|
|
}
|
|
|
|
private function sendData(string $data, ?int $eventId = null): void
|
|
{
|
|
if ($data === '') {
|
|
$this->sendComment('keepalive');
|
|
return;
|
|
}
|
|
|
|
if ($eventId !== null && $eventId > 0) {
|
|
echo 'id: ' . $eventId . "\n";
|
|
}
|
|
|
|
$lines = explode("\n", $data);
|
|
|
|
foreach ($lines as $line) {
|
|
echo 'data: ' . $line . "\n";
|
|
}
|
|
|
|
echo "\n";
|
|
$this->flushOutput();
|
|
}
|
|
|
|
private function sendEvent(string $event, string $data): void
|
|
{
|
|
$safe = str_replace(["\r", "\n"], ' ', $data);
|
|
|
|
echo "event: {$event}\n";
|
|
echo "data: {$safe}\n\n";
|
|
$this->flushOutput();
|
|
}
|
|
|
|
private function sendDoneEvent(): void
|
|
{
|
|
$this->sendEvent('done', '[DONE]');
|
|
|
|
if (function_exists('fastcgi_finish_request')) {
|
|
@fastcgi_finish_request();
|
|
}
|
|
}
|
|
|
|
private function sendComment(string $comment): void
|
|
{
|
|
$safe = str_replace(["\r", "\n"], ' ', $comment);
|
|
|
|
echo ': ' . $safe . "\n\n";
|
|
$this->flushOutput();
|
|
}
|
|
|
|
private function flushOutput(): void
|
|
{
|
|
if (function_exists('ob_flush')) {
|
|
@ob_flush();
|
|
}
|
|
|
|
@flush();
|
|
}
|
|
|
|
/**
|
|
* @param array<string, mixed> $payload
|
|
* @throws \JsonException
|
|
*/
|
|
private function writeJob(string $jobId, array $payload): void
|
|
{
|
|
$directory = $this->jobDirectory();
|
|
|
|
if (!is_dir($directory) && !mkdir($directory, 0775, true) && !is_dir($directory)) {
|
|
throw new \RuntimeException('Stream job directory could not be created.');
|
|
}
|
|
|
|
$json = json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
|
|
|
|
if (file_put_contents($this->jobPath($jobId), $json, LOCK_EX) === false) {
|
|
throw new \RuntimeException('Stream job could not be written.');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @return array{ok: bool, reason?: string, status?: string, message?: string, job?: array<string, mixed>}
|
|
*/
|
|
private function claimJob(string $jobId): array
|
|
{
|
|
if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
|
|
return ['ok' => false, 'reason' => 'invalid'];
|
|
}
|
|
|
|
return $this->mutateJobWithLock($jobId, function (?array $data): array {
|
|
if ($data === null) {
|
|
return [
|
|
'persist' => false,
|
|
'result' => ['ok' => false, 'reason' => 'missing'],
|
|
];
|
|
}
|
|
|
|
$createdAt = (int) ($data['createdAt'] ?? 0);
|
|
|
|
if ($createdAt <= 0 || time() - $createdAt > self::JOB_TTL_SECONDS) {
|
|
return [
|
|
'delete' => true,
|
|
'result' => ['ok' => false, 'reason' => 'expired'],
|
|
];
|
|
}
|
|
|
|
$status = (string) ($data['status'] ?? self::JOB_STATUS_PENDING);
|
|
|
|
if ($status !== self::JOB_STATUS_PENDING) {
|
|
return [
|
|
'persist' => false,
|
|
'result' => [
|
|
'ok' => false,
|
|
'reason' => 'not_pending',
|
|
'status' => $status,
|
|
'message' => is_string($data['message'] ?? null) ? (string) $data['message'] : null,
|
|
],
|
|
];
|
|
}
|
|
|
|
$data['status'] = self::JOB_STATUS_RUNNING;
|
|
$data['startedAt'] = time();
|
|
$data['updatedAt'] = time();
|
|
|
|
return [
|
|
'data' => $data,
|
|
'result' => ['ok' => true, 'job' => $data],
|
|
];
|
|
});
|
|
}
|
|
|
|
private function markJobStatus(?string $jobId, string $status, ?string $message = null): void
|
|
{
|
|
if ($jobId === null || !preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
$this->mutateJobWithLock($jobId, function (?array $data) use ($status, $message): array {
|
|
if ($data === null) {
|
|
return [
|
|
'persist' => false,
|
|
'result' => ['ok' => false],
|
|
];
|
|
}
|
|
|
|
$currentStatus = (string) ($data['status'] ?? '');
|
|
|
|
if (in_array($currentStatus, [self::JOB_STATUS_COMPLETED, self::JOB_STATUS_FAILED, self::JOB_STATUS_INTERRUPTED], true) && $currentStatus !== $status) {
|
|
return [
|
|
'persist' => false,
|
|
'result' => ['ok' => false, 'reason' => 'terminal_status'],
|
|
];
|
|
}
|
|
|
|
$data['status'] = $status;
|
|
$data['updatedAt'] = time();
|
|
|
|
if ($status === self::JOB_STATUS_COMPLETED) {
|
|
$data['completedAt'] = time();
|
|
unset($data['message']);
|
|
} elseif ($message !== null && trim($message) !== '') {
|
|
$data['message'] = trim(preg_replace('/\s+/u', ' ', $message) ?? $message);
|
|
}
|
|
|
|
return [
|
|
'data' => $data,
|
|
'result' => ['ok' => true],
|
|
];
|
|
});
|
|
} catch (Throwable) {
|
|
// Job status persistence must never break the already-running stream.
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @param callable(array<string, mixed>|null): array<string, mixed> $callback
|
|
*
|
|
* @return array<string, mixed>
|
|
*/
|
|
private function mutateJobWithLock(string $jobId, callable $callback): array
|
|
{
|
|
$path = $this->jobPath($jobId);
|
|
if (!is_file($path)) {
|
|
return ['ok' => false, 'reason' => 'missing'];
|
|
}
|
|
|
|
$handle = @fopen($path, 'c+');
|
|
|
|
if ($handle === false) {
|
|
return ['ok' => false, 'reason' => 'missing'];
|
|
}
|
|
|
|
try {
|
|
if (!flock($handle, LOCK_EX)) {
|
|
return ['ok' => false, 'reason' => 'lock_failed'];
|
|
}
|
|
|
|
clearstatcache(true, $path);
|
|
$size = is_file($path) ? (int) filesize($path) : 0;
|
|
rewind($handle);
|
|
$content = $size > 0 ? stream_get_contents($handle) : '';
|
|
$data = null;
|
|
|
|
if (is_string($content) && trim($content) !== '') {
|
|
$decoded = json_decode($content, true);
|
|
$data = is_array($decoded) ? $decoded : null;
|
|
}
|
|
|
|
$mutation = $callback($data);
|
|
$result = is_array($mutation['result'] ?? null) ? $mutation['result'] : [];
|
|
|
|
if (($mutation['delete'] ?? false) === true) {
|
|
ftruncate($handle, 0);
|
|
fflush($handle);
|
|
flock($handle, LOCK_UN);
|
|
fclose($handle);
|
|
@unlink($path);
|
|
|
|
return $result;
|
|
}
|
|
|
|
if (array_key_exists('data', $mutation) && is_array($mutation['data'])) {
|
|
$json = json_encode($mutation['data'], JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
|
|
rewind($handle);
|
|
ftruncate($handle, 0);
|
|
fwrite($handle, $json);
|
|
fflush($handle);
|
|
}
|
|
|
|
flock($handle, LOCK_UN);
|
|
|
|
return $result;
|
|
} catch (Throwable $e) {
|
|
@flock($handle, LOCK_UN);
|
|
} finally {
|
|
if (is_resource($handle)) {
|
|
@fclose($handle);
|
|
}
|
|
}
|
|
return [];
|
|
}
|
|
|
|
private function resolveLastEventId(Request $request): int
|
|
{
|
|
$header = trim((string) $request->headers->get('Last-Event-ID', ''));
|
|
|
|
if ($header === '' || !ctype_digit($header)) {
|
|
return 0;
|
|
}
|
|
|
|
return max(0, (int) $header);
|
|
}
|
|
|
|
/**
|
|
* @param array<string, mixed> $claim
|
|
*/
|
|
private function canReplayOrTailClaimedJob(array $claim): bool
|
|
{
|
|
if (($claim['reason'] ?? null) !== 'not_pending') {
|
|
return false;
|
|
}
|
|
|
|
return in_array(
|
|
(string) ($claim['status'] ?? ''),
|
|
[
|
|
self::JOB_STATUS_RUNNING,
|
|
self::JOB_STATUS_COMPLETED,
|
|
self::JOB_STATUS_INTERRUPTED,
|
|
self::JOB_STATUS_FAILED,
|
|
],
|
|
true
|
|
);
|
|
}
|
|
|
|
private function streamStoredJobResponse(string $jobId, int $lastEventId): void
|
|
{
|
|
$afterEventId = max(0, $lastEventId);
|
|
$lastKeepaliveAt = 0;
|
|
|
|
while (true) {
|
|
if (connection_aborted() === 1) {
|
|
return;
|
|
}
|
|
|
|
foreach ($this->readJobOutputAfter($jobId, $afterEventId) as $event) {
|
|
$eventId = (int) ($event['id'] ?? 0);
|
|
$data = is_string($event['data'] ?? null) ? (string) $event['data'] : '';
|
|
|
|
if ($eventId <= $afterEventId || $data === '') {
|
|
continue;
|
|
}
|
|
|
|
$this->sendData($data, $eventId);
|
|
$afterEventId = $eventId;
|
|
}
|
|
|
|
$job = $this->readJob($jobId);
|
|
$job = is_array($job) ? ($this->markRunningJobFailedIfStale($jobId, $job) ?? $job) : null;
|
|
$status = is_array($job) ? (string) ($job['status'] ?? '') : '';
|
|
$message = is_array($job) && is_string($job['message'] ?? null)
|
|
? trim((string) $job['message'])
|
|
: '';
|
|
|
|
if ($status === self::JOB_STATUS_COMPLETED) {
|
|
$this->sendComment('replayed-completed-stream');
|
|
$this->sendDoneEvent();
|
|
return;
|
|
}
|
|
|
|
if ($status === self::JOB_STATUS_FAILED) {
|
|
$this->sendEvent(
|
|
'error',
|
|
$message !== ''
|
|
? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message
|
|
: 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.'
|
|
);
|
|
$this->sendDoneEvent();
|
|
return;
|
|
}
|
|
|
|
if ($status === self::JOB_STATUS_INTERRUPTED) {
|
|
$this->sendEvent(
|
|
'error',
|
|
$message !== ''
|
|
? $message
|
|
: 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.'
|
|
);
|
|
$this->sendDoneEvent();
|
|
return;
|
|
}
|
|
|
|
if ($status !== self::JOB_STATUS_RUNNING) {
|
|
$this->sendEvent('error', $this->jobClaimErrorMessage([
|
|
'reason' => 'not_pending',
|
|
'status' => $status,
|
|
'message' => $message,
|
|
]));
|
|
$this->sendDoneEvent();
|
|
return;
|
|
}
|
|
|
|
if (time() - $lastKeepaliveAt >= 10) {
|
|
$this->sendComment('waiting-for-running-stream');
|
|
$lastKeepaliveAt = time();
|
|
}
|
|
|
|
usleep(250000);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @return list<array{id: int, data: string}>
|
|
*/
|
|
private function readJobOutputAfter(string $jobId, int $afterEventId): array
|
|
{
|
|
$path = $this->jobOutputPath($jobId);
|
|
|
|
if (!is_file($path)) {
|
|
return [];
|
|
}
|
|
|
|
$lines = @file($path, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
|
|
|
|
if (!is_array($lines)) {
|
|
return [];
|
|
}
|
|
|
|
$events = [];
|
|
|
|
foreach ($lines as $line) {
|
|
if (!is_string($line) || trim($line) === '') {
|
|
continue;
|
|
}
|
|
|
|
$decoded = json_decode($line, true);
|
|
|
|
if (!is_array($decoded)) {
|
|
continue;
|
|
}
|
|
|
|
$id = (int) ($decoded['id'] ?? 0);
|
|
$data = is_string($decoded['data'] ?? null) ? (string) $decoded['data'] : '';
|
|
|
|
if ($id > $afterEventId && $data !== '') {
|
|
$events[] = ['id' => $id, 'data' => $data];
|
|
}
|
|
}
|
|
|
|
return $events;
|
|
}
|
|
|
|
private function appendJobOutput(?string $jobId, string $data): ?int
|
|
{
|
|
if ($jobId === null || $data === '' || !preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
|
|
return null;
|
|
}
|
|
|
|
$eventId = null;
|
|
|
|
try {
|
|
$this->mutateJobWithLock($jobId, function (?array $job) use (&$eventId): array {
|
|
if ($job === null) {
|
|
return [
|
|
'persist' => false,
|
|
'result' => ['ok' => false],
|
|
];
|
|
}
|
|
|
|
if ((string) ($job['status'] ?? '') !== self::JOB_STATUS_RUNNING) {
|
|
return [
|
|
'persist' => false,
|
|
'result' => ['ok' => false, 'reason' => 'job_not_running'],
|
|
];
|
|
}
|
|
|
|
$eventId = max(0, (int) ($job['lastEventId'] ?? 0)) + 1;
|
|
$job['lastEventId'] = $eventId;
|
|
$job['updatedAt'] = time();
|
|
|
|
return [
|
|
'data' => $job,
|
|
'result' => ['ok' => true],
|
|
];
|
|
});
|
|
|
|
if ($eventId === null || $eventId <= 0) {
|
|
return null;
|
|
}
|
|
|
|
$line = json_encode(
|
|
['id' => $eventId, 'data' => $data],
|
|
JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
|
|
) . "\n";
|
|
|
|
if (file_put_contents($this->jobOutputPath($jobId), $line, FILE_APPEND | LOCK_EX) === false) {
|
|
return null;
|
|
}
|
|
} catch (Throwable) {
|
|
return null;
|
|
}
|
|
|
|
return $eventId;
|
|
}
|
|
|
|
/**
|
|
* @return array<string, mixed>|null
|
|
*/
|
|
private function readJob(string $jobId): ?array
|
|
{
|
|
if (!preg_match('/\A[a-f0-9]{48}\z/', $jobId)) {
|
|
return null;
|
|
}
|
|
|
|
$path = $this->jobPath($jobId);
|
|
|
|
if (!is_file($path)) {
|
|
return null;
|
|
}
|
|
|
|
$handle = @fopen($path, 'r');
|
|
|
|
if ($handle === false) {
|
|
return null;
|
|
}
|
|
|
|
try {
|
|
if (!flock($handle, LOCK_SH)) {
|
|
return null;
|
|
}
|
|
|
|
$content = stream_get_contents($handle);
|
|
flock($handle, LOCK_UN);
|
|
} finally {
|
|
fclose($handle);
|
|
}
|
|
|
|
if (!is_string($content) || trim($content) === '') {
|
|
return null;
|
|
}
|
|
|
|
$decoded = json_decode($content, true);
|
|
|
|
return is_array($decoded) ? $decoded : null;
|
|
}
|
|
|
|
/**
|
|
* @param array<string, mixed> $job
|
|
*
|
|
* @return array<string, mixed>|null
|
|
*/
|
|
private function markRunningJobFailedIfStale(string $jobId, array $job): ?array
|
|
{
|
|
if ((string) ($job['status'] ?? '') !== self::JOB_STATUS_RUNNING) {
|
|
return $job;
|
|
}
|
|
|
|
$lastProgressAt = max(
|
|
(int) ($job['updatedAt'] ?? 0),
|
|
(int) ($job['startedAt'] ?? 0),
|
|
(int) ($job['createdAt'] ?? 0)
|
|
);
|
|
|
|
if ($lastProgressAt <= 0 || time() - $lastProgressAt <= self::JOB_RUNNING_STALE_SECONDS) {
|
|
return $job;
|
|
}
|
|
|
|
$message = 'Der Antwort-Job liefert seit längerer Zeit keine neuen Daten. Der Stream wurde beendet, damit die Oberfläche nicht hängen bleibt.';
|
|
$this->markJobStatus($jobId, self::JOB_STATUS_FAILED, $message);
|
|
|
|
$freshJob = $this->readJob($jobId);
|
|
|
|
return is_array($freshJob) ? $freshJob : $job;
|
|
}
|
|
|
|
/**
|
|
* @param array<string, mixed> $claim
|
|
*/
|
|
private function jobClaimErrorMessage(array $claim): string
|
|
{
|
|
$reason = (string) ($claim['reason'] ?? 'missing');
|
|
$status = (string) ($claim['status'] ?? '');
|
|
$message = is_string($claim['message'] ?? null) ? trim((string) $claim['message']) : '';
|
|
|
|
if ($reason === 'expired') {
|
|
return 'Der Antwort-Job ist abgelaufen. Bitte sende die Anfrage erneut.';
|
|
}
|
|
|
|
if ($reason === 'invalid') {
|
|
return 'Der Antwort-Job ist ungültig. Bitte sende die Anfrage erneut.';
|
|
}
|
|
|
|
if ($reason === 'lock_failed') {
|
|
return 'Der Antwort-Job ist gerade gesperrt. Bitte sende die Anfrage erneut, falls keine Antwort erscheint.';
|
|
}
|
|
|
|
if ($reason === 'not_pending') {
|
|
if ($status === self::JOB_STATUS_RUNNING) {
|
|
return 'Der Antwort-Stream läuft bereits oder wurde nach einem Verbindungsabbruch erneut geöffnet. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.';
|
|
}
|
|
|
|
if ($status === self::JOB_STATUS_INTERRUPTED) {
|
|
return 'Der Antwort-Stream wurde durch einen Verbindungsabbruch unterbrochen. Bitte sende die Anfrage erneut, falls die Antwort unvollständig ist.';
|
|
}
|
|
|
|
if ($status === self::JOB_STATUS_COMPLETED) {
|
|
return 'Der Antwort-Stream wurde bereits abgeschlossen. Bitte sende eine neue Anfrage, wenn du eine weitere Antwort brauchst.';
|
|
}
|
|
|
|
if ($status === self::JOB_STATUS_FAILED) {
|
|
return $message !== ''
|
|
? 'Der Antwort-Stream ist fehlgeschlagen: ' . $message
|
|
: 'Der Antwort-Stream ist fehlgeschlagen. Bitte sende die Anfrage erneut.';
|
|
}
|
|
}
|
|
|
|
return 'Der Antwort-Job wurde nicht gefunden. Falls deine Verbindung kurz unterbrochen war, sende die Anfrage bitte erneut.';
|
|
}
|
|
private function cleanupExpiredJobs(): void
|
|
{
|
|
$directory = $this->jobDirectory();
|
|
|
|
if (!is_dir($directory)) {
|
|
return;
|
|
}
|
|
|
|
$threshold = time() - self::JOB_TTL_SECONDS;
|
|
|
|
foreach (glob($directory . '/*.json') ?: [] as $path) {
|
|
if (!is_file($path)) {
|
|
continue;
|
|
}
|
|
|
|
$mtime = filemtime($path);
|
|
|
|
if ($mtime === false || $mtime < $threshold) {
|
|
$base = preg_replace('/\.json\z/', '', $path) ?? $path;
|
|
@unlink($path);
|
|
@unlink($base . '.stream.ndjson');
|
|
}
|
|
}
|
|
}
|
|
|
|
private function jobPath(string $jobId): string
|
|
{
|
|
return $this->jobDirectory() . '/' . $jobId . '.json';
|
|
}
|
|
|
|
private function jobOutputPath(string $jobId): string
|
|
{
|
|
return $this->jobDirectory() . '/' . $jobId . '.stream.ndjson';
|
|
}
|
|
|
|
private function jobDirectory(): string
|
|
{
|
|
return rtrim($this->projectDir, '/\\') . '/var/stream_jobs';
|
|
}
|
|
}
|