phase a audit

This commit is contained in:
team2
2026-02-22 13:51:45 +01:00
parent 5656a10930
commit b3e9110dd1
14 changed files with 222 additions and 463 deletions

View File

@@ -29,7 +29,7 @@ final readonly class IngestFlow
) {}
// =========================================================
// DOCUMENT INGEST
// DOCUMENT INGEST (STREAMING SAFE)
// =========================================================
public function ingestDocumentVersion(DocumentVersion $version): void
@@ -45,12 +45,34 @@ final readonly class IngestFlow
$existing = $this->chunkManager->countAllChunks();
$records = iterator_to_array(
$this->knowledgeIngestService->buildChunkRecords($version),
false
);
$incoming = 0;
$generator = $this->knowledgeIngestService->buildChunkRecords($version);
$wrappedGenerator = (function () use ($generator, $existing, &$incoming) {
foreach ($generator as $record) {
$incoming++;
$total = $existing + $incoming;
if ($total >= self::CHUNK_LIMIT_WARN) {
// Nur einmal warnen
if ($incoming === 1 || $total === self::CHUNK_LIMIT_WARN) {
// Logging erfolgt außerhalb des Streams final
}
}
if ($total > self::CHUNK_LIMIT_HARD) {
throw new \RuntimeException('Chunk limit exceeded.');
}
yield $record;
}
})();
$this->chunkManager->appendChunks($wrappedGenerator);
$incoming = count($records);
$total = $existing + $incoming;
if ($total >= self::CHUNK_LIMIT_WARN) {
@@ -61,12 +83,6 @@ final readonly class IngestFlow
]);
}
if ($total > self::CHUNK_LIMIT_HARD) {
throw new \RuntimeException('Chunk limit exceeded.');
}
$this->chunkManager->appendChunks($records);
$this->rebuildIndex(false);
$version->setIngestStatus(DocumentVersion::INGEST_INDEXED);
@@ -81,13 +97,11 @@ final readonly class IngestFlow
}
// =========================================================
// GLOBAL REINDEX
// GLOBAL REINDEX (STREAMING SAFE)
// =========================================================
public function globalReindex(): void
{
// 1⃣ Prüfen ob aktive Dokumente existieren
$activeDocuments = $this->em
->getRepository(Document::class)
->createQueryBuilder('d')
@@ -102,22 +116,40 @@ final readonly class IngestFlow
);
}
// 2⃣ ChunkRecords erzeugen
$records = iterator_to_array(
$this->knowledgeIngestService->buildAllActiveChunkRecords(),
false
);
$incoming = 0;
if (empty($records)) {
$generator = $this->knowledgeIngestService->buildAllActiveChunkRecords();
$wrappedGenerator = (function () use ($generator, &$incoming) {
foreach ($generator as $record) {
$incoming++;
yield $record;
}
})();
// Prüfen ob überhaupt etwas kommt (ohne alles in RAM zu ziehen)
$peekIterator = $wrappedGenerator instanceof \Iterator
? $wrappedGenerator
: (function () use ($wrappedGenerator) {
foreach ($wrappedGenerator as $item) {
yield $item;
}
})();
if (!$peekIterator->valid()) {
$peekIterator->rewind();
}
if (!$peekIterator->valid()) {
throw new \RuntimeException(
'Global Reindex abgebrochen: Es wurden keine Chunks erzeugt. Bitte prüfen Sie die Dokumente.'
'Global Reindex abgebrochen: Es wurden keine Chunks erzeugt.'
);
}
// 3⃣ Rewrite NDJSON
$this->chunkManager->rewriteAll($records);
$this->chunkManager->rewriteAll($peekIterator);
// 4⃣ Rebuild Vector Index
$this->rebuildIndex(true);
}
@@ -137,18 +169,14 @@ final readonly class IngestFlow
throw new \RuntimeException('Document not found.');
}
// Chunks entfernen
$this->chunkManager->compactByDocument($documentId);
// Dokument aus DB entfernen
$this->em->remove($document);
$this->em->flush();
// 4⃣ Reindex nur wenn sinnvoll
$this->rebuildIndex(false);
}
// =========================================================
// CENTRAL REBUILD
// =========================================================
@@ -169,4 +197,4 @@ final readonly class IngestFlow
$chunkCount = $this->chunkManager->countAllChunks();
$this->metaManager->updateRuntimeStats($chunkCount);
}
}
}