diff --git a/src/Controller/Admin/DocumentController.php b/src/Controller/Admin/DocumentController.php index 9184eaa..fde5f28 100644 --- a/src/Controller/Admin/DocumentController.php +++ b/src/Controller/Admin/DocumentController.php @@ -8,6 +8,7 @@ use App\Entity\IngestJob; use App\Service\DocumentService; use App\Service\FormatText; use App\Service\IngestJobService; +use App\Service\LockService; use Doctrine\DBAL\Connection; use Doctrine\ORM\EntityManagerInterface; use Symfony\Bundle\FrameworkBundle\Controller\AbstractController; @@ -60,7 +61,7 @@ class DocumentController extends AbstractController $document = $em->getRepository(Document::class)->find($uuid); if (!$document) { - throw new NotFoundHttpException(); + $this->addFlash('danger', 'Das Dokument existiert nicht mehr.'); } return $this->render('admin/document/show.html.twig', [ @@ -422,8 +423,8 @@ class DocumentController extends AbstractController string $id, Request $request, EntityManagerInterface $em, - DocumentService $documentService, IngestJobService $jobService, + LockService $lockService, ): RedirectResponse { if (!$this->isCsrfTokenValid('delete_document', $request->request->get('_token'))) { @@ -442,6 +443,17 @@ class DocumentController extends AbstractController throw $this->createNotFoundException(); } + // --------------------------------------------------------- + // 🔒 Delete nur erlauben wenn kein anderer Job läuft + // --------------------------------------------------------- + if (!$lockService->acquire()) { + $this->addFlash('danger', 'Ein Ingest-Job läuft bereits. 
Löschen derzeit nicht möglich.'); + return $this->redirectToRoute('admin_documents'); + } + + // Nur Test-Lock – echter Lock im Orchestrator + $lockService->release(); + // --------------------------------------------------------- // 1) Delete-Job anlegen (QUEUED) // --------------------------------------------------------- @@ -455,12 +467,7 @@ class DocumentController extends AbstractController ); // --------------------------------------------------------- - // 2) Hard Delete in DB - // --------------------------------------------------------- - $documentService->delete($document); - - // --------------------------------------------------------- - // 3) Hintergrundprozess starten + // 2) Hintergrundprozess starten // --------------------------------------------------------- $projectDir = (string)$this->getParameter('kernel.project_dir'); $console = $projectDir . '/bin/console'; @@ -475,13 +482,13 @@ class DocumentController extends AbstractController if (!function_exists('exec')) { $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); - $this->addFlash('danger', 'Dokument gelöscht, aber Index-Bereinigung konnte nicht asynchron gestartet werden.'); + $this->addFlash('danger', 'Löschen konnte nicht gestartet werden (exec deaktiviert).'); return $this->redirectToRoute('admin_documents'); } exec($cmd); - $this->addFlash('success', 'Dokument gelöscht. Index-Bereinigung läuft im Hintergrund.'); + $this->addFlash('success', 'Löschvorgang gestartet. 
Dokument wird nach Index-Rebuild entfernt.'); return $this->redirectToRoute('admin_job_show', [ 'id' => (string)$job->getId(), diff --git a/src/Ingest/IngestFlow.php b/src/Ingest/IngestFlow.php index 6134003..d8fc3fe 100644 --- a/src/Ingest/IngestFlow.php +++ b/src/Ingest/IngestFlow.php @@ -9,6 +9,7 @@ use App\Index\IndexMetaManager; use App\Knowledge\ChunkManager; use App\Knowledge\Ingest\KnowledgeIngestService; use App\Vector\VectorIndexBuilder; +use Doctrine\ORM\EntityManagerInterface; use Psr\Log\LoggerInterface; use Symfony\Component\Uid\Uuid; @@ -23,91 +24,118 @@ final readonly class IngestFlow private VectorIndexBuilder $vectorBuilder, private IndexMetaManager $metaManager, private LoggerInterface $logger, - ) { - } + private EntityManagerInterface $em, + ) {} + + // ========================================================= + // DOCUMENT INGEST + // ========================================================= public function ingestDocumentVersion(DocumentVersion $version): void { $this->metaManager->validateAgainstCurrent(); - $this->chunkManager->compactByDocument($version->getDocument()->getId()); + $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); + $this->em->flush(); - $existing = $this->chunkManager->countAllChunks(); + try { - $recordsIterable = $this->knowledgeIngestService->buildChunkRecords($version); - $records = is_array($recordsIterable) - ? 
$recordsIterable - : iterator_to_array($recordsIterable, false); + // Entfernt alte Chunks dieses Dokuments + $this->chunkManager->compactByDocument($version->getDocument()->getId()); - $incoming = count($records); - $total = $existing + $incoming; + $existing = $this->chunkManager->countAllChunks(); - if ($total >= self::CHUNK_LIMIT_WARN) { - $this->logger->warning('RAG chunk count approaching limit.', [ - 'existing' => $existing, - 'incoming' => $incoming, - 'total' => $total, - 'warn_at' => self::CHUNK_LIMIT_WARN, - 'hard_cap' => self::CHUNK_LIMIT_HARD, - 'document_id' => $version->getDocument()->getId()->toRfc4122(), - 'version_id' => $version->getId()->toRfc4122(), - ]); + $records = iterator_to_array( + $this->knowledgeIngestService->buildChunkRecords($version), + false + ); + + $incoming = count($records); + $total = $existing + $incoming; + + if ($total >= self::CHUNK_LIMIT_WARN) { + $this->logger->warning('Chunk count approaching limit.', [ + 'existing' => $existing, + 'incoming' => $incoming, + 'total' => $total, + ]); + } + + if ($total > self::CHUNK_LIMIT_HARD) { + throw new \RuntimeException('Chunk limit exceeded.'); + } + + $this->chunkManager->appendChunks($records); + + $this->rebuildIndex(false); + + $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); + $this->em->flush(); + + } catch (\Throwable $e) { + + $version->setIngestStatus(DocumentVersion::INGEST_FAILED); + $this->em->flush(); + throw $e; } - - if ($total > self::CHUNK_LIMIT_HARD) { - throw new \RuntimeException(sprintf( - 'Chunk limit reached: %d existing + %d incoming = %d (hard cap: %d). 
Reduce knowledge base or move to a scaled vector setup (IVF/HNSW/GPU/sharding).', - $existing, - $incoming, - $total, - self::CHUNK_LIMIT_HARD - )); - } - - $this->chunkManager->appendChunks($records); - $this->vectorBuilder->rebuildFromNdjson(); - - $this->updateChuckCount(); } - /** - * HARD DELETE FLOW - * - * Removes all chunks belonging to a document from index.ndjson - * and rebuilds the vector index deterministically. - */ + // ========================================================= + // GLOBAL REINDEX + // ========================================================= + + public function globalReindex(): void + { + $records = $this->knowledgeIngestService->buildAllActiveChunkRecords(); + + $this->chunkManager->rewriteAll($records); + + $this->rebuildIndex(true); + } + + // ========================================================= + // DELETE FLOW + // ========================================================= + public function deleteDocument(Uuid $documentId): void { $this->metaManager->validateAgainstCurrent(); - $this->logger->info('Deleting document from RAG index.', [ - 'document_id' => $documentId->toRfc4122(), - ]); + $document = $this->em + ->getRepository(\App\Entity\Document::class) + ->find($documentId); - // Remove chunks for this document + if (!$document) { + throw new \RuntimeException('Document not found.'); + } + + // 1) NDJSON bereinigen $this->chunkManager->compactByDocument($documentId); - // Rebuild vector index from updated NDJSON - $this->vectorBuilder->rebuildFromNdjson(); + // 2) Vector neu bauen + $this->rebuildIndex(false); - // Update runtime stats - $this->updateChuckCount(); + // 3) DB Delete (nach rebuild) + $this->em->remove($document); + $this->em->flush(); } - public function globalReindex(): void + // ========================================================= + // CENTRAL REBUILD + // ========================================================= + + private function rebuildIndex(bool $isGlobal): void { - $allRecords = 
$this->knowledgeIngestService->buildAllActiveChunkRecords(); - - $this->chunkManager->rewriteAll($allRecords); - $this->vectorBuilder->rebuildFromNdjson(); - $this->metaManager->writeMetaForGlobalReindex(); + if ($isGlobal) { + $this->metaManager->writeMetaForGlobalReindex(); + } - $this->updateChuckCount(); + $this->updateChunkCount(); } - private function updateChuckCount(): void + private function updateChunkCount(): void { $chunkCount = $this->chunkManager->countAllChunks(); $this->metaManager->updateRuntimeStats($chunkCount); diff --git a/src/Service/IngestOrchestrator.php b/src/Service/IngestOrchestrator.php index b50734e..aba9a3b 100644 --- a/src/Service/IngestOrchestrator.php +++ b/src/Service/IngestOrchestrator.php @@ -32,9 +32,8 @@ final class IngestOrchestrator $job = null; try { - $status = $version->getIngestStatus(); - if ($status === DocumentVersion::INGEST_INDEXED) { + if ($version->getIngestStatus() === DocumentVersion::INGEST_INDEXED) { throw new \RuntimeException('DocumentVersion already indexed.'); } @@ -45,18 +44,13 @@ final class IngestOrchestrator $version->getId(), ); - $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); - $this->em->flush(); - if ($dryRun) { usleep(200000); } else { $this->ingestFlow->ingestDocumentVersion($version); } - $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); $this->jobService->markCompleted($job); - $this->em->flush(); return $job; @@ -66,9 +60,6 @@ final class IngestOrchestrator $this->jobService->markFailed($job, $e->getMessage()); } - $version->setIngestStatus(DocumentVersion::INGEST_FAILED); - $this->em->flush(); - throw $e; } finally { @@ -84,6 +75,7 @@ final class IngestOrchestrator try { + // Idempotenz if (in_array($job->getStatus(), [ IngestJob::STATUS_COMPLETED, IngestJob::STATUS_FAILED, @@ -111,7 +103,7 @@ final class IngestOrchestrator } // --------------------------- - // DOCUMENT DELETE (NEU) + // DOCUMENT DELETE // --------------------------- if ($job->getType() === 
IngestJob::TYPE_DOCUMENT_DELETE) { @@ -158,56 +150,21 @@ final class IngestOrchestrator throw new \RuntimeException('DocumentVersion not found.'); } - $status = $version->getIngestStatus(); - - if (!$isActivateJob) { - if ($status === DocumentVersion::INGEST_INDEXED) { - throw new \RuntimeException('DocumentVersion already indexed.'); - } + if (!$isActivateJob && $version->getIngestStatus() === DocumentVersion::INGEST_INDEXED) { + throw new \RuntimeException('DocumentVersion already indexed.'); } - if (!$isActivateJob) { - if (!in_array($status, [ - DocumentVersion::INGEST_PENDING, - DocumentVersion::INGEST_FAILED, - DocumentVersion::INGEST_RUNNING, - ], true)) { - throw new \RuntimeException(sprintf( - 'Ingest not allowed for status "%s".', - $status - )); - } - } - - $version->setIngestStatus(DocumentVersion::INGEST_RUNNING); - $this->em->flush(); - if ($dryRun) { usleep(200000); } else { $this->ingestFlow->ingestDocumentVersion($version); } - $version->setIngestStatus(DocumentVersion::INGEST_INDEXED); $this->jobService->markCompleted($job); - $this->em->flush(); } catch (\Throwable $e) { $this->jobService->markFailed($job, $e->getMessage()); - - $versionId = $job->getDocumentVersionId(); - if ($versionId instanceof Uuid) { - $version = $this->em - ->getRepository(DocumentVersion::class) - ->find($versionId); - - if ($version) { - $version->setIngestStatus(DocumentVersion::INGEST_FAILED); - $this->em->flush(); - } - } - throw $e; } finally { @@ -224,6 +181,7 @@ final class IngestOrchestrator $job = null; try { + $job = $this->jobService->startJob( IngestJob::TYPE_GLOBAL_REINDEX, $user diff --git a/templates/admin/document/show.html.twig b/templates/admin/document/show.html.twig index d6fffa9..549518c 100644 --- a/templates/admin/document/show.html.twig +++ b/templates/admin/document/show.html.twig @@ -8,139 +8,150 @@ ← Zurück -

{{ document.title }}

+ {% if document %} +

{{ document.title }}

-
-
+
+
+ +
+ Status: + {% if document.status == 'ACTIVE' %} + Aktiv + {% else %} + Archiviert + {% endif %} +
+ +
+ Erstellt von: + {{ document.createdBy.email }} +
+ +
+ Erstellt am: + {{ document.createdAt|date('d.m.Y H:i') }} +
+ +
+ Aktive Version: + {% if document.currentVersion %} + v{{ document.currentVersion.versionNumber }} + {% else %} + - + {% endif %} +
-
- Status: - {% if document.status == 'ACTIVE' %} - Aktiv - {% else %} - Archiviert - {% endif %}
- -
- Erstellt von: - {{ document.createdBy.email }} -
- -
- Erstellt am: - {{ document.createdAt|date('d.m.Y H:i') }} -
- -
- Aktive Version: - {% if document.currentVersion %} - v{{ document.currentVersion.versionNumber }} - {% else %} - - - {% endif %} -
-
-
-

Versionen

- - + Neue Version - - {% if document.versions is empty %} -
- Keine Versionen vorhanden. -
- {% else %} -
-
- - - - - - - - - - - - - - {% for version in document.versions %} +

Versionen

+ + + Neue Version + + {% if document.versions is empty %} +
+ Keine Versionen vorhanden. +
+ {% else %} +
+
+
VersionAktivIngestChecksumErstellt vonDatumAktion
+ - + + + + + + + + + + + {% for version in document.versions %} + + - + - + - + - + - - + - {% else %} - - - - - {% endif %} - - - - {% endfor %} - -
v{{ version.versionNumber }}VersionAktivIngestChecksumErstellt vonDatumAktion
v{{ version.versionNumber }} - {% if version.isActive %} - Ja - {% else %} - Nein - {% endif %} - + {% if version.isActive %} + Ja + {% else %} + Nein + {% endif %} + - {% if version.ingestStatus == 'INDEXED' %} - INDEXED - {% elseif version.ingestStatus == 'RUNNING' %} - RUNNING - {% elseif version.ingestStatus == 'FAILED' %} - FAILED - {% else %} - PENDING - {% endif %} - + {% if version.ingestStatus == 'INDEXED' %} + INDEXED + {% elseif version.ingestStatus == 'RUNNING' %} + RUNNING + {% elseif version.ingestStatus == 'FAILED' %} + FAILED + {% else %} + PENDING + {% endif %} + - {{ version.checksum[:10] }}... - + {{ version.checksum[:10] }}... + - {{ version.createdBy.email }} - + {{ version.createdBy.email }} + - {{ version.createdAt|date('d.m.Y H:i') }} - - {% if version.isActive %} + + {{ version.createdAt|date('d.m.Y H:i') }} + + {% if version.isActive %} - {# Optional: manuelles Re-Ingest nur bei PENDING/FAILED #} - {% if version.ingestStatus in ['PENDING', 'FAILED'] %} + {# Optional: manuelles Re-Ingest nur bei PENDING/FAILED #} + {% if version.ingestStatus in ['PENDING', 'FAILED'] %} -
- - -
+
+ + +
+ + {% else %} + Ingested + {% endif %} {% else %} - Ingested +
+ + +
{% endif %} +
+ + {% endfor %} + + +
-
+ {% endif %} + {% else %} +

Ein Fehler trat auf

+

Fehler:

+ {% for message in app.flashes('danger') %} +
+ {{ message }} +
+ {% endfor %} {% endif %} - {% endblock %} \ No newline at end of file diff --git a/templates/admin/job/show.html.twig b/templates/admin/job/show.html.twig index f185bcc..c057681 100644 --- a/templates/admin/job/show.html.twig +++ b/templates/admin/job/show.html.twig @@ -84,15 +84,7 @@
- - - {% if job.errorMessage %} -
- Fehler:
- {{ job.errorMessage }} -
- {% endif %} - +