Add document delete and rebuild FAISS index

This commit is contained in:
team 1
2026-02-17 14:49:35 +01:00
parent 1bb753e071
commit fcd9488a18
6 changed files with 189 additions and 50 deletions

View File

@@ -411,4 +411,80 @@ class DocumentController extends AbstractController
$this->addFlash('success', 'Das System wurde erfolgreich zurückgesetzt.');
return $this->redirectToRoute('admin_dashboard');
}
/**
 * Hard-deletes a document and starts the index cleanup as a detached process.
 *
 * Steps, in order: check the CSRF token, resolve the document from the route
 * UUID, register a queued DOCUMENT_DELETE job, delete the document through
 * DocumentService, then spawn the console command for that job in the background.
 *
 * Throws the controller's access-denied exception on an invalid CSRF token and
 * its not-found exception when the id is not a parsable UUID or no document matches.
 *
 * @param string $id Document UUID taken from the route.
 *
 * @return RedirectResponse Redirect to route `admin_job_show` once the process was
 *                          spawned, or to `admin_documents` when exec() is unavailable.
 */
#[Route(
    '/{id}/delete',
    name: 'admin_document_delete',
    requirements: ['id' => '[0-9a-fA-F\-]{36}'],
    methods: ['POST']
)]
public function deleteDocument(
    string $id,
    Request $request,
    EntityManagerInterface $em,
    DocumentService $documentService,
    IngestJobService $jobService,
): RedirectResponse {
    $csrfToken = $request->request->get('_token');
    if (!$this->isCsrfTokenValid('delete_document', $csrfToken)) {
        throw $this->createAccessDeniedException();
    }

    // The route pattern only constrains length and character set, so the
    // value can still fail UUID parsing; treat that as "not found".
    try {
        $uuid = Uuid::fromString($id);
    } catch (\Exception) {
        throw $this->createNotFoundException();
    }

    $document = $em->getRepository(Document::class)->find($uuid);
    if (null === $document) {
        throw $this->createNotFoundException();
    }

    // 1) Register the delete job (QUEUED) while the document id is still available.
    $job = $jobService->startJob(
        IngestJob::TYPE_DOCUMENT_DELETE,
        $this->getUser(),
        $document->getId(),
        null,
        null,
        IngestJob::STATUS_QUEUED
    );

    // 2) Hard delete in the database.
    $documentService->delete($document);

    // 3) Assemble the detached console invocation; every argument is shell-escaped.
    $consolePath = (string) $this->getParameter('kernel.project_dir') . '/bin/console';
    $escapedArgs = array_map(
        escapeshellarg(...),
        [$consolePath, 'mto:agent:ingest:run', (string) $job->getId(), '--no-interaction'],
    );
    $backgroundCommand = implode(' ', $escapedArgs) . ' > /dev/null 2>&1 &';

    if (function_exists('exec')) {
        // Output is discarded and the trailing "&" backgrounds the process,
        // so this call does not wait for the job to finish.
        exec($backgroundCommand);
        $this->addFlash('success', 'Dokument gelöscht. Index-Bereinigung läuft im Hintergrund.');

        return $this->redirectToRoute('admin_job_show', [
            'id' => (string) $job->getId(),
        ]);
    }

    // exec() is not available: the document is already gone, so record the failure on the job.
    $jobService->markFailed($job, 'Server configuration does not allow background execution (exec disabled).');
    $this->addFlash('danger', 'Dokument gelöscht, aber Index-Bereinigung konnte nicht asynchron gestartet werden.');

    return $this->redirectToRoute('admin_documents');
}
}

View File

@@ -10,6 +10,7 @@ class IngestJob
{
public const TYPE_DOCUMENT = 'DOCUMENT';
public const TYPE_GLOBAL_REINDEX = 'GLOBAL_REINDEX';
/**
* Special job type used when a DocumentVersion is activated.
* Semantics: always re-ingest the selected version (even if it was previously INDEXED),
@@ -17,6 +18,12 @@ class IngestJob
*/
public const TYPE_DOCUMENT_VERSION_ACTIVATE = 'DOCUMENT_VERSION_ACTIVATE';
/**
* Job type used when a Document is hard-deleted from DB
* and must be removed from index.ndjson + vector index.
*/
public const TYPE_DOCUMENT_DELETE = 'DOCUMENT_DELETE';
public const STATUS_QUEUED = 'QUEUED';
public const STATUS_RUNNING = 'RUNNING';
public const STATUS_COMPLETED = 'COMPLETED';

View File

@@ -10,18 +10,11 @@ use App\Knowledge\ChunkManager;
use App\Knowledge\Ingest\KnowledgeIngestService;
use App\Vector\VectorIndexBuilder;
use Psr\Log\LoggerInterface;
use Symfony\Component\Uid\Uuid;
final readonly class IngestFlow
{
/**
* Realistische Betriebsgrenze für dieses Systemdesign (CPU Embedding + FlatIP + Full Rebuild).
* Wird beim lokalen Ingest (Dokumentversion) enforced.
*/
public const CHUNK_LIMIT_HARD = 120000;
/**
* Ab hier nur Warnung (keine Blockade) damit man frühzeitig reagieren kann.
*/
private const CHUNK_LIMIT_WARN = 100000;
public function __construct(
@@ -37,16 +30,10 @@ final readonly class IngestFlow
{
$this->metaManager->validateAgainstCurrent();
// Entfernt alte Chunks dieses Dokuments -> danach ist "existing" der Basis-Index ohne dieses Dokument.
$this->chunkManager->compactByDocument($version->getDocument()->getId());
// ------------------------------
// Chunk-Limit Guardrail (Hard Cap)
// ------------------------------
$existing = $this->chunkManager->countAllChunks();
// buildChunkRecords() ist generatorbasiert; für einen sauberen Hard-Cap materialisieren wir lokal,
// damit wir vor dem Append abbrechen können (keine Partial Writes).
$recordsIterable = $this->knowledgeIngestService->buildChunkRecords($version);
$records = is_array($recordsIterable)
? $recordsIterable
@@ -83,6 +70,30 @@ final readonly class IngestFlow
$this->updateChuckCount();
}
/**
 * Hard-delete flow for the retrieval index.
 *
 * Runs the meta manager's validateAgainstCurrent() check, logs the document id,
 * compacts the chunk store for that document, rebuilds the vector index from
 * the NDJSON file and finally refreshes the runtime chunk statistics.
 *
 * @param Uuid $documentId Id of the document whose chunks are to be removed.
 */
public function deleteDocument(Uuid $documentId): void
{
    $this->metaManager->validateAgainstCurrent();

    $logContext = ['document_id' => $documentId->toRfc4122()];
    $this->logger->info('Deleting document from RAG index.', $logContext);

    // Order matters: compact first so the rebuild reads the updated NDJSON,
    // then recount once the index reflects the removal.
    $this->chunkManager->compactByDocument($documentId);
    $this->vectorBuilder->rebuildFromNdjson();
    $this->updateChuckCount();
}
public function globalReindex(): void
{
$allRecords = $this->knowledgeIngestService->buildAllActiveChunkRecords();
@@ -101,4 +112,4 @@ final readonly class IngestFlow
$chunkCount = $this->chunkManager->countAllChunks();
$this->metaManager->updateRuntimeStats($chunkCount);
}
}
}

View File

@@ -70,19 +70,12 @@ class DocumentService
}
/**
* Aktiviert eine Version (setzt andere inaktiv) und aktualisiert den Index.
*
* Beim Aktivieren wird deterministisch sichergestellt, dass nur diese
* Version im Index vorhanden ist:
* - alle Chunks des Dokuments werden aus index.ndjson entfernt (streaming compaction)
* - die aktive Version wird neu ge-chunkt und appended
* - FAISS wird vollständig aus index.ndjson neu gebaut
* Aktiviert eine Version
*/
public function activateVersion(DocumentVersion $version): void
{
$document = $version->getDocument();
// Aktiv-Status in DB konsistent setzen (genau 1 aktive Version)
foreach ($document->getVersions() as $existingVersion) {
$existingVersion->setActive(false);
}
@@ -90,8 +83,6 @@ class DocumentService
$version->setActive(true);
$document->setCurrentVersion($version);
// Wichtig: Aktivierung soll einen Job auslösen. Damit der Job NICHT an "INDEXED" scheitert,
// setzen wir hier bewusst auf PENDING.
$version->setIngestStatus(DocumentVersion::INGEST_PENDING);
$this->em->flush();
@@ -106,6 +97,31 @@ class DocumentService
$this->em->flush();
}
/**
 * Hard-deletes a document together with all of its versions from the database.
 *
 * Chunks and the vector index are NOT handled here; that cleanup belongs to
 * the ingest job of type TYPE_DOCUMENT_DELETE.
 *
 * The three flushes run inside one explicit transaction. Without it, a failure
 * in a later step would leave the database with a document whose current
 * version is already nulled, or whose versions are only partly removed.
 *
 * @param Document $document The document to remove.
 *
 * @throws \Throwable Any failure is re-thrown after the transaction was rolled back.
 */
public function delete(Document $document): void
{
    $this->em->beginTransaction();

    try {
        // 1. Break the document <-> version foreign-key cycle; the reference
        //    must be nulled and flushed before any version row can be deleted.
        $document->setCurrentVersion(null);
        $this->em->flush();

        // 2. Remove the versions explicitly (in case no cascade remove is configured).
        foreach ($document->getVersions() as $version) {
            $this->em->remove($version);
        }
        $this->em->flush();

        // 3. Remove the document itself.
        $this->em->remove($document);
        $this->em->flush();

        $this->em->commit();
    } catch (\Throwable $e) {
        // Undo every step performed so far, then let the caller see the original error.
        $this->em->rollback();

        throw $e;
    }
}
/**
* Berechnet SHA256 Checksum
*/

View File

@@ -19,9 +19,6 @@ final class IngestOrchestrator
) {
}
/**
* SYNCHRONE Variante (falls noch genutzt)
*/
public function runForVersion(
DocumentVersion $version,
User $user,
@@ -79,9 +76,6 @@ final class IngestOrchestrator
}
}
/**
* ASYNCHRONE Variante (Detached CLI)
*/
public function runExistingJob(IngestJob $job, bool $dryRun = false): void
{
if (!$this->lockService->acquire()) {
@@ -90,7 +84,6 @@ final class IngestOrchestrator
try {
// Falls Job bereits final ist → nichts tun (idempotent)
if (in_array($job->getStatus(), [
IngestJob::STATUS_COMPLETED,
IngestJob::STATUS_FAILED,
@@ -102,7 +95,9 @@ final class IngestOrchestrator
$job->markRunning();
$this->em->flush();
// Global Reindex
// ---------------------------
// GLOBAL REINDEX
// ---------------------------
if ($job->getType() === IngestJob::TYPE_GLOBAL_REINDEX) {
if ($dryRun) {
@@ -115,6 +110,30 @@ final class IngestOrchestrator
return;
}
// ---------------------------
// DOCUMENT DELETE (NEU)
// ---------------------------
if ($job->getType() === IngestJob::TYPE_DOCUMENT_DELETE) {
$documentId = $job->getDocumentId();
if (!$documentId instanceof Uuid) {
throw new \RuntimeException('Job has no document id.');
}
if ($dryRun) {
usleep(200000);
} else {
$this->ingestFlow->deleteDocument($documentId);
}
$this->jobService->markCompleted($job);
return;
}
// ---------------------------
// DOCUMENT / ACTIVATE
// ---------------------------
$isActivateJob = $job->getType() === IngestJob::TYPE_DOCUMENT_VERSION_ACTIVATE;
if (!$isActivateJob && $job->getType() !== IngestJob::TYPE_DOCUMENT) {
@@ -141,16 +160,12 @@ final class IngestOrchestrator
$status = $version->getIngestStatus();
// Bei Aktivierungs-Jobs IMMER re-ingestieren (auch wenn die Version früher schon indexed war).
// Hintergrund: nach Aktivierung soll der Index deterministisch die aktive Version widerspiegeln.
if (!$isActivateJob) {
// Nur blockieren wenn wirklich schon indexed
if ($status === DocumentVersion::INGEST_INDEXED) {
throw new \RuntimeException('DocumentVersion already indexed.');
}
}
// RUNNING darf hier erlaubt sein (async!)
if (!$isActivateJob) {
if (!in_array($status, [
DocumentVersion::INGEST_PENDING,
@@ -200,9 +215,6 @@ final class IngestOrchestrator
}
}
/**
* Globaler Reindex (synchron)
*/
public function runGlobal(User $user, bool $dryRun = false): IngestJob
{
if (!$this->lockService->acquire()) {