diff --git a/src/Command/IngestRunJobCommand.php b/src/Command/IngestRunJobCommand.php index 7ca53f9..cd233ae 100644 --- a/src/Command/IngestRunJobCommand.php +++ b/src/Command/IngestRunJobCommand.php @@ -11,6 +11,7 @@ use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; #[AsCommand(name: 'mto:agent:ingest:run')] @@ -26,12 +27,14 @@ final class IngestRunJobCommand extends Command protected function configure(): void { $this - ->addArgument('jobId', InputArgument::REQUIRED, 'UUID of IngestJob'); + ->addArgument('jobId', InputArgument::REQUIRED, 'UUID of IngestJob') + ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Run without executing heavy operations'); } protected function execute(InputInterface $input, OutputInterface $output): int { $jobId = (string) $input->getArgument('jobId'); + $dryRun = (bool) $input->getOption('dry-run'); /** @var IngestJob|null $job */ $job = $this->em->getRepository(IngestJob::class)->find($jobId); @@ -41,19 +44,37 @@ final class IngestRunJobCommand extends Command return Command::FAILURE; } - // Idempotenz: wenn der Job bereits beendet ist, einfach ok zurück. - if (in_array($job->getStatus(), [IngestJob::STATUS_COMPLETED, IngestJob::STATUS_FAILED, IngestJob::STATUS_ABORTED], true)) { + // Idempotenz: Bereits abgeschlossene Jobs nicht erneut ausführen + if (in_array($job->getStatus(), [ + IngestJob::STATUS_COMPLETED, + IngestJob::STATUS_FAILED, + IngestJob::STATUS_ABORTED, + ], true)) { $output->writeln('Job already finished.'); return Command::SUCCESS; } try { - $output->writeln(sprintf('Running ingest job %s ...', (string) $job->getId())); - $this->orchestrator->runExistingJob($job, false); - $output->writeln('Job completed.'); + $output->writeln(sprintf( + 'Running ingest job %s (type: %s)...', + (string) $job->getId(), + $job->getType() + )); + + $this->orchestrator->runExistingJob($job, $dryRun); + + $output->writeln('Job completed successfully.'); + return Command::SUCCESS; + } catch (\Throwable $e) { - $output->writeln(sprintf('Job failed: %s', $e->getMessage())); + + // Wichtig: Status wird im Orchestrator gesetzt + $output->writeln(sprintf( + 'Job failed: %s', + $e->getMessage() + )); + return Command::FAILURE; } } diff --git a/src/Controller/Admin/IngestJobController.php b/src/Controller/Admin/IngestJobController.php index 6884577..16f0f5d 100644 --- a/src/Controller/Admin/IngestJobController.php +++ b/src/Controller/Admin/IngestJobController.php @@ -3,12 +3,12 @@ namespace App\Controller\Admin; use App\Entity\IngestJob; +use App\Service\IngestJobService; use Doctrine\ORM\EntityManagerInterface; use Symfony\Bundle\FrameworkBundle\Controller\AbstractController; use Symfony\Component\HttpFoundation\Response; use Symfony\Component\HttpKernel\Exception\NotFoundHttpException; use Symfony\Component\Routing\Attribute\Route; -use App\Ingest\IngestFlow; use Symfony\Component\HttpFoundation\RedirectResponse; use Symfony\Component\HttpFoundation\JsonResponse; @@ -73,12 +73,50 @@ class IngestJobController extends AbstractController #[Route('/global-reindex', name: 'admin_global_reindex', methods: ['POST'])] public function globalReindex( - IngestFlow $flow + IngestJobService $jobService, ): RedirectResponse { + $this->denyAccessUnlessGranted('ROLE_SUPER_ADMIN'); - $flow->globalReindex(); + // --------------------------------------------------------- + // 1) Job anlegen (QUEUED) + // --------------------------------------------------------- + $job = $jobService->startJob( + IngestJob::TYPE_GLOBAL_REINDEX, + $this->getUser(), + null, + null, + null, + IngestJob::STATUS_QUEUED + ); - return $this->redirectToRoute('admin_jobs'); + // --------------------------------------------------------- + // 2) CLI im Hintergrund starten + // --------------------------------------------------------- + $projectDir = (string)$this->getParameter('kernel.project_dir'); + $console = $projectDir . '/bin/console'; + + $cmd = sprintf( + '%s %s %s %s > /dev/null 2>&1 &', + escapeshellarg($console), + escapeshellarg('mto:agent:ingest:run'), + escapeshellarg((string)$job->getId()), + escapeshellarg('--no-interaction'), + ); + + if (!function_exists('exec')) { + $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); + $this->addFlash('danger', 'Global Reindex konnte nicht gestartet werden.'); + return $this->redirectToRoute('admin_jobs'); + } + + exec($cmd); + + // --------------------------------------------------------- + // 3) Redirect auf Job-Detailseite (Loader) + // --------------------------------------------------------- + return $this->redirectToRoute('admin_job_show', [ + 'id' => (string)$job->getId(), + ]); } } diff --git a/src/Ingest/IngestFlow.php b/src/Ingest/IngestFlow.php index d8fc3fe..ac1cfb5 100644 --- a/src/Ingest/IngestFlow.php +++ b/src/Ingest/IngestFlow.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace App\Ingest; +use App\Entity\Document; use App\Entity\DocumentVersion; use App\Index\IndexMetaManager; use App\Knowledge\ChunkManager; @@ -40,7 +41,6 @@ final readonly class IngestFlow try { - // Entfernt alte Chunks dieses Dokuments $this->chunkManager->compactByDocument($version->getDocument()->getId()); $existing = $this->chunkManager->countAllChunks(); @@ -86,10 +86,39 @@ final readonly class IngestFlow public function globalReindex(): void { - $records = $this->knowledgeIngestService->buildAllActiveChunkRecords(); + $this->metaManager->validateAgainstCurrent(); + // 1️⃣ Prüfen ob aktive Dokumente existieren + $activeDocuments = $this->em + ->getRepository(Document::class) + ->createQueryBuilder('d') + ->where('d.status = :status') + ->setParameter('status', Document::STATUS_ACTIVE) + ->getQuery() + ->getResult(); + + if (empty($activeDocuments)) { + throw new \RuntimeException( + 'Global Reindex abgebrochen: Es sind keine aktiven Dokumente vorhanden.' + ); + } + + // 2️⃣ ChunkRecords erzeugen + $records = iterator_to_array( + $this->knowledgeIngestService->buildAllActiveChunkRecords(), + false + ); + + if (empty($records)) { + throw new \RuntimeException( + 'Global Reindex abgebrochen: Es wurden keine Chunks erzeugt. Bitte prüfen Sie die Dokumente.' + ); + } + + // 3️⃣ Rewrite NDJSON $this->chunkManager->rewriteAll($records); + // 4️⃣ Rebuild Vector Index $this->rebuildIndex(true); } @@ -102,20 +131,17 @@ final readonly class IngestFlow $this->metaManager->validateAgainstCurrent(); $document = $this->em - ->getRepository(\App\Entity\Document::class) + ->getRepository(Document::class) ->find($documentId); if (!$document) { throw new \RuntimeException('Document not found.'); } - // 1) NDJSON bereinigen $this->chunkManager->compactByDocument($documentId); - // 2) Vector neu bauen $this->rebuildIndex(false); - // 3) DB Delete (nach rebuild) $this->em->remove($document); $this->em->flush(); } diff --git a/templates/admin/job/show.html.twig b/templates/admin/job/show.html.twig index c057681..300067d 100644 --- a/templates/admin/job/show.html.twig +++ b/templates/admin/job/show.html.twig @@ -11,7 +11,7 @@

Ingest Job

-
+
@@ -25,23 +25,32 @@
Status: - {% if job.status == 'COMPLETED' %} - COMPLETED - {% elseif job.status == 'QUEUED' %} - QUEUED - {% elseif job.status == 'RUNNING' %} - RUNNING - {% elseif job.status == 'FAILED' %} - FAILED - {% else %} - {{ job.status }} - {% endif %} + {% if job.status == 'COMPLETED' %} + COMPLETED + {% elseif job.status == 'QUEUED' %} + QUEUED + {% elseif job.status == 'RUNNING' %} + RUNNING + {% elseif job.status == 'FAILED' %} + FAILED + {% elseif job.status == 'ABORTED' %} + ABORTED + {% else %} + {{ job.status }} + {% endif %}
Dokument: - {{ job.documentId ?? '-' }} + {% if job.documentId %} + + {{ job.documentId }} + + {% else %} + - + {% endif %}
@@ -57,46 +66,44 @@
Beendet: - {% if job.finishedAt %} - {{ job.finishedAt|date('d.m.Y H:i:s') }} - {% else %} - - - {% endif %} + {{ job.finishedAt ? job.finishedAt|date('d.m.Y H:i:s') : '-' }}
Gestartet von: - {% if job.startedBy %} - {{ job.startedBy.email }} - {% else %} - - - {% endif %} + {{ job.startedBy ? job.startedBy.email : '-' }}
- {% endblock %}