From c099f72703cfb78af014af0cb810aa19e749f33b Mon Sep 17 00:00:00 2001 From: team 1 Date: Sun, 15 Feb 2026 16:01:08 +0100 Subject: [PATCH] harden code --- config/packages/messenger.yaml | 19 +-- src/Command/IngestRunJobCommand.php | 60 ++++++++ src/Command/KnowledgeIngestCommand.php | 2 +- src/Controller/Admin/DocumentController.php | 60 ++++++-- src/Controller/Admin/IngestJobController.php | 31 +++- src/Entity/IngestJob.php | 6 + src/Ingest/IngestFlow.php | 4 - src/Service/FormatText.php | 30 ++++ src/Service/IngestJobService.php | 6 +- src/Service/IngestOrchestrator.php | 141 +++++++++++++++++-- templates/admin/document/new.html.twig | 2 +- templates/admin/job/index.html.twig | 4 +- templates/admin/job/show.html.twig | 91 +++++++++++- 13 files changed, 397 insertions(+), 59 deletions(-) create mode 100644 src/Command/IngestRunJobCommand.php create mode 100644 src/Service/FormatText.php diff --git a/config/packages/messenger.yaml b/config/packages/messenger.yaml index 19db483..4b5d2e4 100644 --- a/config/packages/messenger.yaml +++ b/config/packages/messenger.yaml @@ -1,22 +1,7 @@ framework: messenger: - # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling. - # failure_transport: failed - transports: - # https://symfony.com/doc/current/messenger.html#transport-configuration - # async: '%env(MESSENGER_TRANSPORT_DSN)%' - # failed: 'doctrine://default?queue_name=failed' - sync: 'sync://' + async: '%env(MESSENGER_TRANSPORT_DSN)%' routing: - # Route your messages to the transports - # 'App\Message\YourMessage': async - -# when@test: -# framework: -# messenger: -# transports: -# # replace with your transport name here (e.g., my_transport: 'in-memory://') -# # For more Messenger testing tools, see https://github.com/zenstruck/messenger-test -# async: 'in-memory://' + #'App\Message\IngestDocumentMessage': async \ No newline at end of file diff --git a/src/Command/IngestRunJobCommand.php b/src/Command/IngestRunJobCommand.php new file mode 100644 index 0000000..7ca53f9 --- /dev/null +++ b/src/Command/IngestRunJobCommand.php @@ -0,0 +1,60 @@ +addArgument('jobId', InputArgument::REQUIRED, 'UUID of IngestJob'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $jobId = (string) $input->getArgument('jobId'); + + /** @var IngestJob|null $job */ + $job = $this->em->getRepository(IngestJob::class)->find($jobId); + + if (!$job) { + $output->writeln('IngestJob not found.'); + return Command::FAILURE; + } + + // Idempotenz: wenn der Job bereits beendet ist, einfach ok zurück. + if (in_array($job->getStatus(), [IngestJob::STATUS_COMPLETED, IngestJob::STATUS_FAILED, IngestJob::STATUS_ABORTED], true)) { + $output->writeln('Job already finished.'); + return Command::SUCCESS; + } + + try { + $output->writeln(sprintf('Running ingest job %s ...', (string) $job->getId())); + $this->orchestrator->runExistingJob($job, false); + $output->writeln('Job completed.'); + return Command::SUCCESS; + } catch (\Throwable $e) { + $output->writeln(sprintf('Job failed: %s', $e->getMessage())); + return Command::FAILURE; + } + } +} diff --git a/src/Command/KnowledgeIngestCommand.php b/src/Command/KnowledgeIngestCommand.php index d64ccda..d3bded9 100644 --- a/src/Command/KnowledgeIngestCommand.php +++ b/src/Command/KnowledgeIngestCommand.php @@ -46,7 +46,7 @@ class KnowledgeIngestCommand extends Command $output->writeln('Starting ingest...'); - $job = $this->orchestrator->runForVersion($version, $user, false); + $job = $this->orchestrator->runForVersion($version, $user); $output->writeln(sprintf('Ingest completed. Job: %s', (string) $job->getId())); diff --git a/src/Controller/Admin/DocumentController.php b/src/Controller/Admin/DocumentController.php index bf8d800..e8bcc12 100644 --- a/src/Controller/Admin/DocumentController.php +++ b/src/Controller/Admin/DocumentController.php @@ -1,13 +1,13 @@ isMethod('POST')) { - $title = $request->request->get('title'); + $file = $request->files->get('file'); + $title = $request->request->get('title') ?: $file->getClientOriginalName(); + $title = $this->formatText->slugify($title); if (!$file || !$title) { $this->addFlash('error', 'Titel und Datei sind erforderlich.'); @@ -191,7 +198,7 @@ class DocumentController extends AbstractController string $versionId, Request $request, EntityManagerInterface $em, - IngestOrchestrator $orchestrator + IngestJobService $jobService, ): ?RedirectResponse { $dryRun = false; if (!$this->isCsrfTokenValid('ingest_version', $request->request->get('_token'))) { @@ -214,14 +221,47 @@ class DocumentController extends AbstractController return null; } - $orchestrator->runForVersion( - $version, + // --------------------------------------------------------- + // Asynchroner Ingest (ohne Messenger): + // 1) Job als QUEUED anlegen + // 2) Symfony-Command im Hintergrund starten + // 3) Direkt auf Job-Detailseite redirecten (Loader + Polling) + // --------------------------------------------------------- + + $job = $jobService->startJob( + IngestJob::TYPE_DOCUMENT, $this->getUser(), - $dryRun + $version->getDocument()->getId(), + $version->getId(), + null, + IngestJob::STATUS_QUEUED ); - return $this->redirectToRoute('admin_document_show', [ - 'id' => $version->getDocument()->getId() + // Hintergrundprozess starten (Provider-kompatibel, kein Worker/Daemon) + $projectDir = (string) $this->getParameter('kernel.project_dir'); + $console = $projectDir . '/bin/console'; + + $cmd = sprintf( + '%s %s %s %s > /dev/null 2>&1 &', + escapeshellarg($console), + escapeshellarg('mto:agent:ingest:run'), + escapeshellarg((string) $job->getId()), + escapeshellarg('--no-interaction'), + ); + + // Best effort: wenn exec deaktiviert ist, sauber abbrechen. + if (!\function_exists('exec')) { + $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); + $this->addFlash('error', 'Ingest konnte nicht asynchron gestartet werden (exec deaktiviert).'); + return $this->redirectToRoute('admin_document_show', [ + 'id' => $version->getDocument()->getId() + ]); + } + + exec($cmd); + + return $this->redirectToRoute('admin_job_show', [ + 'id' => (string) $job->getId(), ]); } diff --git a/src/Controller/Admin/IngestJobController.php b/src/Controller/Admin/IngestJobController.php index dfc43db..6884577 100644 --- a/src/Controller/Admin/IngestJobController.php +++ b/src/Controller/Admin/IngestJobController.php @@ -1,6 +1,5 @@ '[0-9a-fA-F\-]{36}'], + methods: ['GET'] + )] + public function status(string $id, EntityManagerInterface $em): JsonResponse + { + $this->denyAccessUnlessGranted('ROLE_USER'); + + /** @var IngestJob|null $job */ + $job = $em->getRepository(IngestJob::class)->find($id); + + if (!$job) { + throw new NotFoundHttpException(); + } + + return $this->json([ + 'id' => (string) $job->getId(), + 'type' => $job->getType(), + 'status' => $job->getStatus(), + 'startedAt' => $job->getStartedAt()->format(DATE_ATOM), + 'finishedAt' => $job->getFinishedAt()?->format(DATE_ATOM), + 'errorMessage' => $job->getErrorMessage(), + ]); + } + #[Route('/global-reindex', name: 'admin_global_reindex', methods: ['POST'])] public function globalReindex( IngestFlow $flow ): RedirectResponse { $this->denyAccessUnlessGranted('ROLE_SUPER_ADMIN'); - $flow->globalReindex($this->getUser()); + $flow->globalReindex(); return $this->redirectToRoute('admin_jobs'); } diff --git a/src/Entity/IngestJob.php b/src/Entity/IngestJob.php index 8ee801b..b0bd125 100644 --- a/src/Entity/IngestJob.php +++ b/src/Entity/IngestJob.php @@ -11,6 +11,7 @@ class IngestJob public const TYPE_DOCUMENT = 'DOCUMENT'; public const TYPE_GLOBAL_REINDEX = 'GLOBAL_REINDEX'; + public const STATUS_QUEUED = 'QUEUED'; public const STATUS_RUNNING = 'RUNNING'; public const STATUS_COMPLETED = 'COMPLETED'; public const STATUS_FAILED = 'FAILED'; @@ -94,6 +95,11 @@ class IngestJob $this->finishedAt = new \DateTimeImmutable(); } + public function markRunning(): void + { + $this->status = self::STATUS_RUNNING; + } + public function getErrorMessage(): ?string { return $this->errorMessage; diff --git a/src/Ingest/IngestFlow.php b/src/Ingest/IngestFlow.php index b74271a..dc6a87a 100644 --- a/src/Ingest/IngestFlow.php +++ b/src/Ingest/IngestFlow.php @@ -28,16 +28,12 @@ final readonly class IngestFlow ): void { $this->metaManager->validateAgainstCurrent(); - $this->chunkManager->compactByDocument( $version->getDocument()->getId() ); - $records = $this->knowledgeIngestService ->buildChunkRecords($version); - $this->chunkManager->appendChunks($records); - $this->vectorBuilder->rebuildFromNdjson(); } diff --git a/src/Service/FormatText.php b/src/Service/FormatText.php new file mode 100644 index 0000000..5f26a93 --- /dev/null +++ b/src/Service/FormatText.php @@ -0,0 +1,30 @@ + 'ae', + 'ö' => 'oe', + 'ü' => 'ue', + 'ß' => 'ss' + ]; + $text = str_replace(array_keys($replacements), $replacements, $text); + + // Nicht erlaubte Zeichen entfernen + $text = preg_replace('/[^a-z0-9\s.-]/', '', $text); + + // Leerzeichen zu Bindestrichen + $text = preg_replace('/[\s-]+/', '-', $text); + + $text = preg_replace('/\./', '-', $text); + + return trim($text, '-'); + } +} \ No newline at end of file diff --git a/src/Service/IngestJobService.php b/src/Service/IngestJobService.php index 7689629..1b8039d 100644 --- a/src/Service/IngestJobService.php +++ b/src/Service/IngestJobService.php @@ -1,6 +1,5 @@ setStartedBy($user); $job->setDocumentId($documentId); $job->setDocumentVersionId($documentVersionId); diff --git a/src/Service/IngestOrchestrator.php b/src/Service/IngestOrchestrator.php index d286ed9..c39c2c9 100644 --- a/src/Service/IngestOrchestrator.php +++ b/src/Service/IngestOrchestrator.php @@ -7,6 +7,7 @@ use App\Entity\IngestJob; use App\Entity\User; use App\Ingest\IngestFlow; use Doctrine\ORM\EntityManagerInterface; +use Symfony\Component\Uid\Uuid; final class IngestOrchestrator { @@ -19,14 +20,14 @@ final class IngestOrchestrator } /** - * Startet Ingest für eine bestimmte DocumentVersion (1 Job pro Run). - * @throws \Throwable + * SYNCHRONE Variante (falls noch genutzt) */ public function runForVersion( DocumentVersion $version, User $user, bool $dryRun = false ): IngestJob { + if (!$this->lockService->acquire()) { throw new \RuntimeException('Another ingest job is already running.'); } @@ -34,16 +35,12 @@ final class IngestOrchestrator $job = null; try { - // Governance: nur PENDING/FAILED erlauben $status = $version->getIngestStatus(); - if (!in_array($status, [ - DocumentVersion::INGEST_PENDING, - DocumentVersion::INGEST_FAILED, - ], true)) { - throw new \RuntimeException(sprintf('Ingest not allowed for status "%s".', $status)); + + if ($status === DocumentVersion::INGEST_INDEXED) { + throw new \RuntimeException('DocumentVersion already indexed.'); } - // Job anlegen (einmal!) $job = $this->jobService->startJob( IngestJob::TYPE_DOCUMENT, $user, @@ -51,18 +48,15 @@ final class IngestOrchestrator $version->getId(), ); - // Status → RUNNING $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); $this->em->flush(); if ($dryRun) { usleep(200000); } else { - // Fachlogik ausführen (Flow erzeugt keine Jobs!) $this->ingestFlow->ingestDocumentVersion($version); } - // Erfolg $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); $this->jobService->markCompleted($job); $this->em->flush(); @@ -86,7 +80,120 @@ final class IngestOrchestrator } /** - * Globaler Reindex aller aktiven Dokumente. + * ASYNCHRONE Variante (Detached CLI) + */ + public function runExistingJob(IngestJob $job, bool $dryRun = false): void + { + if (!$this->lockService->acquire()) { + throw new \RuntimeException('Another ingest job is already running.'); + } + + try { + + // Falls Job bereits final ist → nichts tun (idempotent) + if (in_array($job->getStatus(), [ + IngestJob::STATUS_COMPLETED, + IngestJob::STATUS_FAILED, + IngestJob::STATUS_ABORTED, + ], true)) { + return; + } + + $job->markRunning(); + $this->em->flush(); + + // Global Reindex + if ($job->getType() === IngestJob::TYPE_GLOBAL_REINDEX) { + + if ($dryRun) { + usleep(200000); + } else { + $this->ingestFlow->globalReindex(); + } + + $this->jobService->markCompleted($job); + return; + } + + if ($job->getType() !== IngestJob::TYPE_DOCUMENT) { + throw new \RuntimeException(sprintf( + 'Unsupported ingest job type "%s".', + $job->getType() + )); + } + + $versionId = $job->getDocumentVersionId(); + + if (!$versionId instanceof Uuid) { + throw new \RuntimeException('Job has no document version id.'); + } + + /** @var DocumentVersion|null $version */ + $version = $this->em + ->getRepository(DocumentVersion::class) + ->find($versionId); + + if (!$version) { + throw new \RuntimeException('DocumentVersion not found.'); + } + + $status = $version->getIngestStatus(); + + // Nur blockieren wenn wirklich schon indexed + if ($status === DocumentVersion::INGEST_INDEXED) { + throw new \RuntimeException('DocumentVersion already indexed.'); + } + + // RUNNING darf hier erlaubt sein (async!) + if (!in_array($status, [ + DocumentVersion::INGEST_PENDING, + DocumentVersion::INGEST_FAILED, + DocumentVersion::INGEST_RUNNING, + ], true)) { + throw new \RuntimeException(sprintf( + 'Ingest not allowed for status "%s".', + $status + )); + } + + $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); + $this->em->flush(); + + if ($dryRun) { + usleep(200000); + } else { + $this->ingestFlow->ingestDocumentVersion($version); + } + + $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); + $this->jobService->markCompleted($job); + $this->em->flush(); + + } catch (\Throwable $e) { + + $this->jobService->markFailed($job, $e->getMessage()); + + $versionId = $job->getDocumentVersionId(); + if ($versionId instanceof Uuid) { + $version = $this->em + ->getRepository(DocumentVersion::class) + ->find($versionId); + + if ($version) { + $version->setIngestStatus(DocumentVersion::INGEST_FAILED); + $this->em->flush(); + } + } + + throw $e; + + } finally { + $this->lockService->release(); + } + } + + /** + * Globaler Reindex (synchron) */ public function runGlobal(User $user, bool $dryRun = false): IngestJob { @@ -97,12 +204,15 @@ final class IngestOrchestrator $job = null; try { - $job = $this->jobService->startJob(IngestJob::TYPE_GLOBAL_REINDEX, $user); + $job = $this->jobService->startJob( + IngestJob::TYPE_GLOBAL_REINDEX, + $user + ); if ($dryRun) { usleep(200000); } else { - $this->ingestFlow->globalReindex($job->getLogPath()); + $this->ingestFlow->globalReindex(); } $this->jobService->markCompleted($job); @@ -110,6 +220,7 @@ final class IngestOrchestrator return $job; } catch (\Throwable $e) { + if ($job) { $this->jobService->markFailed($job, $e->getMessage()); } diff --git a/templates/admin/document/new.html.twig b/templates/admin/document/new.html.twig index 20fac7d..a57d8bb 100644 --- a/templates/admin/document/new.html.twig +++ b/templates/admin/document/new.html.twig @@ -9,7 +9,7 @@
- +
diff --git a/templates/admin/job/index.html.twig b/templates/admin/job/index.html.twig index eb70a05..083cbf4 100644 --- a/templates/admin/job/index.html.twig +++ b/templates/admin/job/index.html.twig @@ -53,6 +53,8 @@ {% if job.status == 'COMPLETED' %} COMPLETED + {% elseif job.status == 'QUEUED' %} + QUEUED {% elseif job.status == 'RUNNING' %} RUNNING {% elseif job.status == 'FAILED' %} @@ -64,7 +66,7 @@ {% if job.documentId %} - {{ job.documentId }} + {{ job.documentId }} {% else %} - {% endif %} diff --git a/templates/admin/job/show.html.twig b/templates/admin/job/show.html.twig index 08c44f4..28a6f41 100644 --- a/templates/admin/job/show.html.twig +++ b/templates/admin/job/show.html.twig @@ -24,8 +24,11 @@
Status: + {% if job.status == 'COMPLETED' %} COMPLETED + {% elseif job.status == 'QUEUED' %} + QUEUED {% elseif job.status == 'RUNNING' %} RUNNING {% elseif job.status == 'FAILED' %} @@ -33,6 +36,7 @@ {% else %} {{ job.status }} {% endif %} +
@@ -52,11 +56,13 @@
Beendet: - {% if job.finishedAt %} - {{ job.finishedAt|date('d.m.Y H:i:s') }} - {% else %} - - - {% endif %} + + {% if job.finishedAt %} + {{ job.finishedAt|date('d.m.Y H:i:s') }} + {% else %} + - + {% endif %} +
@@ -68,6 +74,18 @@ {% endif %}
+ + + + {% if job.errorMessage %}
Fehler:
@@ -85,4 +103,67 @@
+ + {% endblock %}