add new route for global reindex
This commit is contained in:
@@ -11,6 +11,7 @@ use Symfony\Component\Console\Attribute\AsCommand;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
#[AsCommand(name: 'mto:agent:ingest:run')]
|
||||
@@ -26,12 +27,14 @@ final class IngestRunJobCommand extends Command
|
||||
protected function configure(): void
|
||||
{
|
||||
$this
|
||||
->addArgument('jobId', InputArgument::REQUIRED, 'UUID of IngestJob');
|
||||
->addArgument('jobId', InputArgument::REQUIRED, 'UUID of IngestJob')
|
||||
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Run without executing heavy operations');
|
||||
}
|
||||
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
$jobId = (string) $input->getArgument('jobId');
|
||||
$dryRun = (bool) $input->getOption('dry-run');
|
||||
|
||||
/** @var IngestJob|null $job */
|
||||
$job = $this->em->getRepository(IngestJob::class)->find($jobId);
|
||||
@@ -41,19 +44,37 @@ final class IngestRunJobCommand extends Command
|
||||
return Command::FAILURE;
|
||||
}
|
||||
|
||||
// Idempotenz: wenn der Job bereits beendet ist, einfach ok zurück.
|
||||
if (in_array($job->getStatus(), [IngestJob::STATUS_COMPLETED, IngestJob::STATUS_FAILED, IngestJob::STATUS_ABORTED], true)) {
|
||||
// Idempotenz: Bereits abgeschlossene Jobs nicht erneut ausführen
|
||||
if (in_array($job->getStatus(), [
|
||||
IngestJob::STATUS_COMPLETED,
|
||||
IngestJob::STATUS_FAILED,
|
||||
IngestJob::STATUS_ABORTED,
|
||||
], true)) {
|
||||
$output->writeln('<info>Job already finished.</info>');
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
|
||||
try {
|
||||
$output->writeln(sprintf('<info>Running ingest job %s ...</info>', (string) $job->getId()));
|
||||
$this->orchestrator->runExistingJob($job, false);
|
||||
$output->writeln('<info>Job completed.</info>');
|
||||
$output->writeln(sprintf(
|
||||
'<info>Running ingest job %s (type: %s)...</info>',
|
||||
(string) $job->getId(),
|
||||
$job->getType()
|
||||
));
|
||||
|
||||
$this->orchestrator->runExistingJob($job, $dryRun);
|
||||
|
||||
$output->writeln('<info>Job completed successfully.</info>');
|
||||
|
||||
return Command::SUCCESS;
|
||||
|
||||
} catch (\Throwable $e) {
|
||||
$output->writeln(sprintf('<error>Job failed: %s</error>', $e->getMessage()));
|
||||
|
||||
// Wichtig: Status wird im Orchestrator gesetzt
|
||||
$output->writeln(sprintf(
|
||||
'<error>Job failed: %s</error>',
|
||||
$e->getMessage()
|
||||
));
|
||||
|
||||
return Command::FAILURE;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,12 +3,12 @@
|
||||
namespace App\Controller\Admin;
|
||||
|
||||
use App\Entity\IngestJob;
|
||||
use App\Service\IngestJobService;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
|
||||
use Symfony\Component\HttpFoundation\Response;
|
||||
use Symfony\Component\HttpKernel\Exception\NotFoundHttpException;
|
||||
use Symfony\Component\Routing\Attribute\Route;
|
||||
use App\Ingest\IngestFlow;
|
||||
use Symfony\Component\HttpFoundation\RedirectResponse;
|
||||
use Symfony\Component\HttpFoundation\JsonResponse;
|
||||
|
||||
@@ -73,12 +73,50 @@ class IngestJobController extends AbstractController
|
||||
|
||||
#[Route('/global-reindex', name: 'admin_global_reindex', methods: ['POST'])]
|
||||
public function globalReindex(
|
||||
IngestFlow $flow
|
||||
IngestJobService $jobService,
|
||||
): RedirectResponse {
|
||||
|
||||
$this->denyAccessUnlessGranted('ROLE_SUPER_ADMIN');
|
||||
|
||||
$flow->globalReindex();
|
||||
// ---------------------------------------------------------
|
||||
// 1) Job anlegen (QUEUED)
|
||||
// ---------------------------------------------------------
|
||||
$job = $jobService->startJob(
|
||||
IngestJob::TYPE_GLOBAL_REINDEX,
|
||||
$this->getUser(),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
IngestJob::STATUS_QUEUED
|
||||
);
|
||||
|
||||
return $this->redirectToRoute('admin_jobs');
|
||||
// ---------------------------------------------------------
|
||||
// 2) CLI im Hintergrund starten
|
||||
// ---------------------------------------------------------
|
||||
$projectDir = (string)$this->getParameter('kernel.project_dir');
|
||||
$console = $projectDir . '/bin/console';
|
||||
|
||||
$cmd = sprintf(
|
||||
'%s %s %s %s > /dev/null 2>&1 &',
|
||||
escapeshellarg($console),
|
||||
escapeshellarg('mto:agent:ingest:run'),
|
||||
escapeshellarg((string)$job->getId()),
|
||||
escapeshellarg('--no-interaction'),
|
||||
);
|
||||
|
||||
if (!function_exists('exec')) {
|
||||
$jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).');
|
||||
$this->addFlash('danger', 'Global Reindex konnte nicht gestartet werden.');
|
||||
return $this->redirectToRoute('admin_jobs');
|
||||
}
|
||||
|
||||
exec($cmd);
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// 3) Redirect auf Job-Detailseite (Loader)
|
||||
// ---------------------------------------------------------
|
||||
return $this->redirectToRoute('admin_job_show', [
|
||||
'id' => (string)$job->getId(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ declare(strict_types=1);
|
||||
|
||||
namespace App\Ingest;
|
||||
|
||||
use App\Entity\Document;
|
||||
use App\Entity\DocumentVersion;
|
||||
use App\Index\IndexMetaManager;
|
||||
use App\Knowledge\ChunkManager;
|
||||
@@ -40,7 +41,6 @@ final readonly class IngestFlow
|
||||
|
||||
try {
|
||||
|
||||
// Entfernt alte Chunks dieses Dokuments
|
||||
$this->chunkManager->compactByDocument($version->getDocument()->getId());
|
||||
|
||||
$existing = $this->chunkManager->countAllChunks();
|
||||
@@ -86,10 +86,39 @@ final readonly class IngestFlow
|
||||
|
||||
public function globalReindex(): void
|
||||
{
|
||||
$records = $this->knowledgeIngestService->buildAllActiveChunkRecords();
|
||||
$this->metaManager->validateAgainstCurrent();
|
||||
|
||||
// 1️⃣ Prüfen ob aktive Dokumente existieren
|
||||
$activeDocuments = $this->em
|
||||
->getRepository(Document::class)
|
||||
->createQueryBuilder('d')
|
||||
->where('d.status = :status')
|
||||
->setParameter('status', Document::STATUS_ACTIVE)
|
||||
->getQuery()
|
||||
->getResult();
|
||||
|
||||
if (empty($activeDocuments)) {
|
||||
throw new \RuntimeException(
|
||||
'Global Reindex abgebrochen: Es sind keine aktiven Dokumente vorhanden.'
|
||||
);
|
||||
}
|
||||
|
||||
// 2️⃣ ChunkRecords erzeugen
|
||||
$records = iterator_to_array(
|
||||
$this->knowledgeIngestService->buildAllActiveChunkRecords(),
|
||||
false
|
||||
);
|
||||
|
||||
if (empty($records)) {
|
||||
throw new \RuntimeException(
|
||||
'Global Reindex abgebrochen: Es wurden keine Chunks erzeugt. Bitte prüfen Sie die Dokumente.'
|
||||
);
|
||||
}
|
||||
|
||||
// 3️⃣ Rewrite NDJSON
|
||||
$this->chunkManager->rewriteAll($records);
|
||||
|
||||
// 4️⃣ Rebuild Vector Index
|
||||
$this->rebuildIndex(true);
|
||||
}
|
||||
|
||||
@@ -102,20 +131,17 @@ final readonly class IngestFlow
|
||||
$this->metaManager->validateAgainstCurrent();
|
||||
|
||||
$document = $this->em
|
||||
->getRepository(\App\Entity\Document::class)
|
||||
->getRepository(Document::class)
|
||||
->find($documentId);
|
||||
|
||||
if (!$document) {
|
||||
throw new \RuntimeException('Document not found.');
|
||||
}
|
||||
|
||||
// 1) NDJSON bereinigen
|
||||
$this->chunkManager->compactByDocument($documentId);
|
||||
|
||||
// 2) Vector neu bauen
|
||||
$this->rebuildIndex(false);
|
||||
|
||||
// 3) DB Delete (nach rebuild)
|
||||
$this->em->remove($document);
|
||||
$this->em->flush();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user