cleanup and harden document and ingest service

This commit is contained in:
team 1
2026-02-17 15:42:09 +01:00
parent 2981716c3d
commit 0b96ce6188
5 changed files with 236 additions and 240 deletions

View File

@@ -8,6 +8,7 @@ use App\Entity\IngestJob;
use App\Service\DocumentService; use App\Service\DocumentService;
use App\Service\FormatText; use App\Service\FormatText;
use App\Service\IngestJobService; use App\Service\IngestJobService;
use App\Service\LockService;
use Doctrine\DBAL\Connection; use Doctrine\DBAL\Connection;
use Doctrine\ORM\EntityManagerInterface; use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController; use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
@@ -60,7 +61,7 @@ class DocumentController extends AbstractController
$document = $em->getRepository(Document::class)->find($uuid); $document = $em->getRepository(Document::class)->find($uuid);
if (!$document) { if (!$document) {
throw new NotFoundHttpException(); $this->addFlash('danger', 'Das Dokument existiert nicht mehr.');
} }
return $this->render('admin/document/show.html.twig', [ return $this->render('admin/document/show.html.twig', [
@@ -422,8 +423,8 @@ class DocumentController extends AbstractController
string $id, string $id,
Request $request, Request $request,
EntityManagerInterface $em, EntityManagerInterface $em,
DocumentService $documentService,
IngestJobService $jobService, IngestJobService $jobService,
LockService $lockService,
): RedirectResponse ): RedirectResponse
{ {
if (!$this->isCsrfTokenValid('delete_document', $request->request->get('_token'))) { if (!$this->isCsrfTokenValid('delete_document', $request->request->get('_token'))) {
@@ -442,6 +443,17 @@ class DocumentController extends AbstractController
throw $this->createNotFoundException(); throw $this->createNotFoundException();
} }
// ---------------------------------------------------------
// 🔒 Delete nur erlauben wenn kein anderer Job läuft
// ---------------------------------------------------------
if (!$lockService->acquire()) {
$this->addFlash('danger', 'Ein Ingest-Job läuft bereits. Löschen derzeit nicht möglich.');
return $this->redirectToRoute('admin_documents');
}
// Nur Test-Lock echter Lock im Orchestrator
$lockService->release();
// --------------------------------------------------------- // ---------------------------------------------------------
// 1) Delete-Job anlegen (QUEUED) // 1) Delete-Job anlegen (QUEUED)
// --------------------------------------------------------- // ---------------------------------------------------------
@@ -455,12 +467,7 @@ class DocumentController extends AbstractController
); );
// --------------------------------------------------------- // ---------------------------------------------------------
// 2) Hard Delete in DB // 2) Hintergrundprozess starten
// ---------------------------------------------------------
$documentService->delete($document);
// ---------------------------------------------------------
// 3) Hintergrundprozess starten
// --------------------------------------------------------- // ---------------------------------------------------------
$projectDir = (string)$this->getParameter('kernel.project_dir'); $projectDir = (string)$this->getParameter('kernel.project_dir');
$console = $projectDir . '/bin/console'; $console = $projectDir . '/bin/console';
@@ -475,13 +482,13 @@ class DocumentController extends AbstractController
if (!function_exists('exec')) { if (!function_exists('exec')) {
$jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).'); $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).');
$this->addFlash('danger', 'Dokument gelöscht, aber Index-Bereinigung konnte nicht asynchron gestartet werden.'); $this->addFlash('danger', 'Löschen konnte nicht gestartet werden (exec deaktiviert).');
return $this->redirectToRoute('admin_documents'); return $this->redirectToRoute('admin_documents');
} }
exec($cmd); exec($cmd);
$this->addFlash('success', 'Dokument gelöscht. Index-Bereinigung läuft im Hintergrund.'); $this->addFlash('success', 'Löschvorgang gestartet. Dokument wird nach Index-Rebuild entfernt.');
return $this->redirectToRoute('admin_job_show', [ return $this->redirectToRoute('admin_job_show', [
'id' => (string)$job->getId(), 'id' => (string)$job->getId(),

View File

@@ -9,6 +9,7 @@ use App\Index\IndexMetaManager;
use App\Knowledge\ChunkManager; use App\Knowledge\ChunkManager;
use App\Knowledge\Ingest\KnowledgeIngestService; use App\Knowledge\Ingest\KnowledgeIngestService;
use App\Vector\VectorIndexBuilder; use App\Vector\VectorIndexBuilder;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Symfony\Component\Uid\Uuid; use Symfony\Component\Uid\Uuid;
@@ -23,91 +24,118 @@ final readonly class IngestFlow
private VectorIndexBuilder $vectorBuilder, private VectorIndexBuilder $vectorBuilder,
private IndexMetaManager $metaManager, private IndexMetaManager $metaManager,
private LoggerInterface $logger, private LoggerInterface $logger,
) { private EntityManagerInterface $em,
} ) {}
// =========================================================
// DOCUMENT INGEST
// =========================================================
public function ingestDocumentVersion(DocumentVersion $version): void public function ingestDocumentVersion(DocumentVersion $version): void
{ {
$this->metaManager->validateAgainstCurrent(); $this->metaManager->validateAgainstCurrent();
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
try {
// Entfernt alte Chunks dieses Dokuments
$this->chunkManager->compactByDocument($version->getDocument()->getId()); $this->chunkManager->compactByDocument($version->getDocument()->getId());
$existing = $this->chunkManager->countAllChunks(); $existing = $this->chunkManager->countAllChunks();
$recordsIterable = $this->knowledgeIngestService->buildChunkRecords($version); $records = iterator_to_array(
$records = is_array($recordsIterable) $this->knowledgeIngestService->buildChunkRecords($version),
? $recordsIterable false
: iterator_to_array($recordsIterable, false); );
$incoming = count($records); $incoming = count($records);
$total = $existing + $incoming; $total = $existing + $incoming;
if ($total >= self::CHUNK_LIMIT_WARN) { if ($total >= self::CHUNK_LIMIT_WARN) {
$this->logger->warning('RAG chunk count approaching limit.', [ $this->logger->warning('Chunk count approaching limit.', [
'existing' => $existing, 'existing' => $existing,
'incoming' => $incoming, 'incoming' => $incoming,
'total' => $total, 'total' => $total,
'warn_at' => self::CHUNK_LIMIT_WARN,
'hard_cap' => self::CHUNK_LIMIT_HARD,
'document_id' => $version->getDocument()->getId()->toRfc4122(),
'version_id' => $version->getId()->toRfc4122(),
]); ]);
} }
if ($total > self::CHUNK_LIMIT_HARD) { if ($total > self::CHUNK_LIMIT_HARD) {
throw new \RuntimeException(sprintf( throw new \RuntimeException('Chunk limit exceeded.');
'Chunk limit reached: %d existing + %d incoming = %d (hard cap: %d). Reduce knowledge base or move to a scaled vector setup (IVF/HNSW/GPU/sharding).',
$existing,
$incoming,
$total,
self::CHUNK_LIMIT_HARD
));
} }
$this->chunkManager->appendChunks($records); $this->chunkManager->appendChunks($records);
$this->vectorBuilder->rebuildFromNdjson();
$this->updateChuckCount(); $this->rebuildIndex(false);
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->em->flush();
} catch (\Throwable $e) {
$version->setIngestStatus(DocumentVersion::INGEST_FAILED);
$this->em->flush();
throw $e;
}
} }
/** // =========================================================
* HARD DELETE FLOW // GLOBAL REINDEX
* // =========================================================
* Removes all chunks belonging to a document from index.ndjson
* and rebuilds the vector index deterministically. public function globalReindex(): void
*/ {
$records = $this->knowledgeIngestService->buildAllActiveChunkRecords();
$this->chunkManager->rewriteAll($records);
$this->rebuildIndex(true);
}
// =========================================================
// DELETE FLOW
// =========================================================
public function deleteDocument(Uuid $documentId): void public function deleteDocument(Uuid $documentId): void
{ {
$this->metaManager->validateAgainstCurrent(); $this->metaManager->validateAgainstCurrent();
$this->logger->info('Deleting document from RAG index.', [ $document = $this->em
'document_id' => $documentId->toRfc4122(), ->getRepository(\App\Entity\Document::class)
]); ->find($documentId);
// Remove chunks for this document if (!$document) {
throw new \RuntimeException('Document not found.');
}
// 1) NDJSON bereinigen
$this->chunkManager->compactByDocument($documentId); $this->chunkManager->compactByDocument($documentId);
// Rebuild vector index from updated NDJSON // 2) Vector neu bauen
$this->vectorBuilder->rebuildFromNdjson(); $this->rebuildIndex(false);
// Update runtime stats // 3) DB Delete (nach rebuild)
$this->updateChuckCount(); $this->em->remove($document);
$this->em->flush();
} }
public function globalReindex(): void // =========================================================
// CENTRAL REBUILD
// =========================================================
private function rebuildIndex(bool $isGlobal): void
{ {
$allRecords = $this->knowledgeIngestService->buildAllActiveChunkRecords();
$this->chunkManager->rewriteAll($allRecords);
$this->vectorBuilder->rebuildFromNdjson(); $this->vectorBuilder->rebuildFromNdjson();
if ($isGlobal) {
$this->metaManager->writeMetaForGlobalReindex(); $this->metaManager->writeMetaForGlobalReindex();
$this->updateChuckCount();
} }
private function updateChuckCount(): void $this->updateChunkCount();
}
private function updateChunkCount(): void
{ {
$chunkCount = $this->chunkManager->countAllChunks(); $chunkCount = $this->chunkManager->countAllChunks();
$this->metaManager->updateRuntimeStats($chunkCount); $this->metaManager->updateRuntimeStats($chunkCount);

View File

@@ -32,9 +32,8 @@ final class IngestOrchestrator
$job = null; $job = null;
try { try {
$status = $version->getIngestStatus();
if ($status === DocumentVersion::INGEST_INDEXED) { if ($version->getIngestStatus() === DocumentVersion::INGEST_INDEXED) {
throw new \RuntimeException('DocumentVersion already indexed.'); throw new \RuntimeException('DocumentVersion already indexed.');
} }
@@ -45,18 +44,13 @@ final class IngestOrchestrator
$version->getId(), $version->getId(),
); );
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
if ($dryRun) { if ($dryRun) {
usleep(200000); usleep(200000);
} else { } else {
$this->ingestFlow->ingestDocumentVersion($version); $this->ingestFlow->ingestDocumentVersion($version);
} }
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->jobService->markCompleted($job); $this->jobService->markCompleted($job);
$this->em->flush();
return $job; return $job;
@@ -66,9 +60,6 @@ final class IngestOrchestrator
$this->jobService->markFailed($job, $e->getMessage()); $this->jobService->markFailed($job, $e->getMessage());
} }
$version->setIngestStatus(DocumentVersion::INGEST_FAILED);
$this->em->flush();
throw $e; throw $e;
} finally { } finally {
@@ -84,6 +75,7 @@ final class IngestOrchestrator
try { try {
// Idempotenz
if (in_array($job->getStatus(), [ if (in_array($job->getStatus(), [
IngestJob::STATUS_COMPLETED, IngestJob::STATUS_COMPLETED,
IngestJob::STATUS_FAILED, IngestJob::STATUS_FAILED,
@@ -111,7 +103,7 @@ final class IngestOrchestrator
} }
// --------------------------- // ---------------------------
// DOCUMENT DELETE (NEU) // DOCUMENT DELETE
// --------------------------- // ---------------------------
if ($job->getType() === IngestJob::TYPE_DOCUMENT_DELETE) { if ($job->getType() === IngestJob::TYPE_DOCUMENT_DELETE) {
@@ -158,29 +150,9 @@ final class IngestOrchestrator
throw new \RuntimeException('DocumentVersion not found.'); throw new \RuntimeException('DocumentVersion not found.');
} }
$status = $version->getIngestStatus(); if (!$isActivateJob && $version->getIngestStatus() === DocumentVersion::INGEST_INDEXED) {
if (!$isActivateJob) {
if ($status === DocumentVersion::INGEST_INDEXED) {
throw new \RuntimeException('DocumentVersion already indexed.'); throw new \RuntimeException('DocumentVersion already indexed.');
} }
}
if (!$isActivateJob) {
if (!in_array($status, [
DocumentVersion::INGEST_PENDING,
DocumentVersion::INGEST_FAILED,
DocumentVersion::INGEST_RUNNING,
], true)) {
throw new \RuntimeException(sprintf(
'Ingest not allowed for status "%s".',
$status
));
}
}
$version->setIngestStatus(DocumentVersion::INGEST_RUNNING);
$this->em->flush();
if ($dryRun) { if ($dryRun) {
usleep(200000); usleep(200000);
@@ -188,26 +160,11 @@ final class IngestOrchestrator
$this->ingestFlow->ingestDocumentVersion($version); $this->ingestFlow->ingestDocumentVersion($version);
} }
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
$this->jobService->markCompleted($job); $this->jobService->markCompleted($job);
$this->em->flush();
} catch (\Throwable $e) { } catch (\Throwable $e) {
$this->jobService->markFailed($job, $e->getMessage()); $this->jobService->markFailed($job, $e->getMessage());
$versionId = $job->getDocumentVersionId();
if ($versionId instanceof Uuid) {
$version = $this->em
->getRepository(DocumentVersion::class)
->find($versionId);
if ($version) {
$version->setIngestStatus(DocumentVersion::INGEST_FAILED);
$this->em->flush();
}
}
throw $e; throw $e;
} finally { } finally {
@@ -224,6 +181,7 @@ final class IngestOrchestrator
$job = null; $job = null;
try { try {
$job = $this->jobService->startJob( $job = $this->jobService->startJob(
IngestJob::TYPE_GLOBAL_REINDEX, IngestJob::TYPE_GLOBAL_REINDEX,
$user $user

View File

@@ -8,6 +8,7 @@
← Zurück ← Zurück
</a> </a>
{% if document %}
<h1 class="h4 mb-3">{{ document.title }}</h1> <h1 class="h4 mb-3">{{ document.title }}</h1>
<div class="card bg-black text-info border-secondary mb-4"> <div class="card bg-black text-info border-secondary mb-4">
@@ -113,7 +114,8 @@
<form method="post" <form method="post"
action="{{ path('admin_document_version_ingest', {versionId: version.id}) }}" action="{{ path('admin_document_version_ingest', {versionId: version.id}) }}"
style="display:inline;"> style="display:inline;">
<input type="hidden" name="_token" value="{{ csrf_token('ingest_version') }}"> <input type="hidden" name="_token"
value="{{ csrf_token('ingest_version') }}">
<button class="btn btn-sm btn-outline-info"> <button class="btn btn-sm btn-outline-info">
Ingest starten Ingest starten
</button> </button>
@@ -127,7 +129,8 @@
<form method="post" <form method="post"
action="{{ path('admin_document_version_activate', {versionId: version.id}) }}" action="{{ path('admin_document_version_activate', {versionId: version.id}) }}"
style="display:inline;"> style="display:inline;">
<input type="hidden" name="_token" value="{{ csrf_token('activate_version') }}"> <input type="hidden" name="_token"
value="{{ csrf_token('activate_version') }}">
<button class="btn btn-sm btn-outline-light"> <button class="btn btn-sm btn-outline-light">
Aktivieren Aktivieren
</button> </button>
@@ -142,5 +145,13 @@
</div> </div>
</div> </div>
{% endif %} {% endif %}
{% else %}
<h1 class="h4 mb-3">Ein Fehler trat auf</h1>
<h2 class="h5 mb-3">Fehler:</h2>
{% for message in app.flashes('danger') %}
<div class="alert alert-danger">
{{ message }}
</div>
{% endfor %}
{% endif %}
{% endblock %} {% endblock %}

View File

@@ -84,15 +84,7 @@
</div> </div>
</div> </div>
<div id="job-error" class="alert alert-danger mt-3" style="display:none;"></div> <div id="job-error" class="alert alert-danger mt-3" style="display:none;">
{% if job.errorMessage %}
<div class="alert alert-danger mt-3">
<strong>Fehler:</strong><br>
{{ job.errorMessage }}
</div>
{% endif %}
{% if job.logPath %} {% if job.logPath %}
<div class="mt-3"> <div class="mt-3">
<strong>Log Datei:</strong><br> <strong>Log Datei:</strong><br>