stash light
This commit is contained in:
186
src/Knowledge/ChunkManager.php
Normal file
186
src/Knowledge/ChunkManager.php
Normal file
@@ -0,0 +1,186 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Knowledge;
|
||||
|
||||
use Symfony\Component\Uid\Uuid;
|
||||
|
||||
/**
 * Maintains the NDJSON chunk index file (one JSON object per line).
 *
 * Supports appending records, removing all records of one document
 * (compaction), rewriting the whole file, and streaming every record back.
 * Compaction and full rewrite write to a sibling "<index>.tmp" file first and
 * then rename it over the index.
 *
 * NOTE(review): nothing here takes a file lock and both rewrite paths share
 * the same ".tmp" name, so concurrent writers are presumably serialized by the
 * caller - confirm.
 */
final class ChunkManager
{
    /** Encoding flags shared by every writer so all lines look the same. */
    private const JSON_FLAGS = JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES;

    private string $indexPath;

    /**
     * @param string $projectDir        Base directory; trailing slashes are ignored.
     * @param string $relativeIndexPath Index location below $projectDir, with or
     *                                  without a leading slash.
     */
    public function __construct(
        string $projectDir,
        string $relativeIndexPath = '/var/knowledge/index.ndjson'
    ) {
        // Always join with exactly one separator so that a relative path given
        // without a leading slash does not get glued onto the directory name.
        $this->indexPath = rtrim($projectDir, '/') . '/' . ltrim($relativeIndexPath, '/');
    }

    /**
     * Returns the absolute path of the NDJSON index file.
     */
    public function getIndexPath(): string
    {
        return $this->indexPath;
    }

    // ============================================================
    // APPEND
    // ============================================================

    /**
     * Appends the given records to the index, creating the directory and the
     * file when they do not exist yet.
     *
     * @param iterable<array<string,mixed>> $records
     *
     * @throws \RuntimeException when the directory or file cannot be prepared,
     *                           a record cannot be encoded, or a write fails
     */
    public function appendChunks(iterable $records): void
    {
        $this->ensureIndexDirectory();

        $handle = fopen($this->indexPath, 'ab');
        if ($handle === false) {
            throw new \RuntimeException('Unable to open index.ndjson for append');
        }

        try {
            $this->writeRecords($handle, $records);
        } finally {
            // Runs for encode/write failures and for exceptions thrown by the
            // $records iterable itself (e.g. a generator), so the handle never leaks.
            fclose($handle);
        }
    }

    // ============================================================
    // COMPACTION - removes all chunks of one document
    // ============================================================

    /**
     * Rewrites the index without the lines whose "document_id" equals the
     * RFC 4122 string form of $documentId.
     *
     * Blank lines and lines that do not decode to a JSON array are dropped as
     * well. Does nothing when the index file does not exist.
     *
     * @throws \RuntimeException when a file cannot be opened, a write fails,
     *                           or the final rename fails
     */
    public function compactByDocument(Uuid $documentId): void
    {
        if (!is_file($this->indexPath)) {
            return; // nothing to compact
        }

        $tmpPath = $this->indexPath . '.tmp';

        // Open the streams one after the other: a failure on the first must not
        // create an empty temp file, and a failure on the second must not leak
        // the first handle.
        $in = fopen($this->indexPath, 'rb');
        if ($in === false) {
            throw new \RuntimeException('Unable to open index for compaction');
        }

        $out = fopen($tmpPath, 'wb');
        if ($out === false) {
            fclose($in);
            throw new \RuntimeException('Unable to open index for compaction');
        }

        $docIdString = $documentId->toRfc4122();
        $completed = false;

        try {
            while (($line = fgets($in)) !== false) {
                $line = trim($line);
                if ($line === '') {
                    continue;
                }

                $data = json_decode($line, true);
                if (!is_array($data)) {
                    continue; // skip corrupted line
                }

                if (($data['document_id'] ?? null) === $docIdString) {
                    continue; // skip this document's chunks
                }

                // Kept lines are copied verbatim (trimmed), not re-encoded.
                $this->writeLine($out, $line);
            }

            $completed = true;
        } finally {
            fclose($in);
            fclose($out);

            if (!$completed) {
                // Best-effort cleanup of the partial temp file; the original
                // exception is the one the caller should see.
                @unlink($tmpPath);
            }
        }

        $this->atomicSwitch($tmpPath, $this->indexPath);
    }

    // ============================================================
    // FULL REWRITE (Global Reindex)
    // ============================================================

    /**
     * Replaces the whole index with the given records.
     *
     * The records are written to the temp file first; the existing index is
     * only replaced once every record has been written successfully.
     *
     * @param iterable<array<string,mixed>> $records
     *
     * @throws \RuntimeException when the directory or temp file cannot be
     *                           prepared, a record cannot be encoded, a write
     *                           fails, or the final rename fails
     */
    public function rewriteAll(iterable $records): void
    {
        $this->ensureIndexDirectory();

        $tmpPath = $this->indexPath . '.tmp';

        $handle = fopen($tmpPath, 'wb');
        if ($handle === false) {
            throw new \RuntimeException('Unable to open temp index file');
        }

        $completed = false;

        try {
            $this->writeRecords($handle, $records);
            $completed = true;
        } finally {
            fclose($handle);

            if (!$completed) {
                // Do not leave a half-written temp file next to the index.
                @unlink($tmpPath);
            }
        }

        $this->atomicSwitch($tmpPath, $this->indexPath);
    }

    // ============================================================
    // STREAM READ (for FAISS rebuild)
    // ============================================================

    /**
     * Lazily yields every record of the index, one decoded line at a time.
     *
     * Yields nothing when the index file does not exist. Blank lines and lines
     * that do not decode to an array are skipped silently.
     *
     * @return \Generator<array<string,mixed>>
     *
     * @throws \RuntimeException when the index exists but cannot be opened
     */
    public function streamAll(): \Generator
    {
        if (!is_file($this->indexPath)) {
            return;
        }

        $handle = fopen($this->indexPath, 'rb');
        if ($handle === false) {
            throw new \RuntimeException('Unable to open index.ndjson for read');
        }

        try {
            while (($line = fgets($handle)) !== false) {
                $line = trim($line);
                if ($line === '') {
                    continue;
                }

                $data = json_decode($line, true);
                if (is_array($data)) {
                    yield $data;
                }
            }
        } finally {
            // Also runs when the consumer abandons the generator early.
            fclose($handle);
        }
    }

    // ============================================================
    // INTERNAL HELPERS
    // ============================================================

    /**
     * Creates the directory that holds the index file when it is missing.
     *
     * @throws \RuntimeException when the directory cannot be created
     */
    private function ensureIndexDirectory(): void
    {
        $dir = \dirname($this->indexPath);

        // The second is_dir() covers another process creating the directory
        // between the first check and mkdir().
        if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
            throw new \RuntimeException('Unable to create index directory');
        }
    }

    /**
     * Encodes each record as one JSON line and writes it to $handle.
     *
     * @param resource                      $handle  writable stream
     * @param iterable<array<string,mixed>> $records
     *
     * @throws \RuntimeException when a record cannot be encoded or written
     */
    private function writeRecords($handle, iterable $records): void
    {
        foreach ($records as $record) {
            $json = json_encode($record, self::JSON_FLAGS);
            if ($json === false) {
                throw new \RuntimeException('Unable to encode chunk record');
            }

            $this->writeLine($handle, $json);
        }
    }

    /**
     * Writes one line plus line terminator and verifies it was written fully.
     *
     * @param resource $handle writable stream
     *
     * @throws \RuntimeException on a failed or short write
     */
    private function writeLine($handle, string $line): void
    {
        $payload = $line . PHP_EOL;

        // fwrite() returns false or a short byte count on failure (e.g. disk
        // full); ignoring it would produce a silently truncated index.
        if (fwrite($handle, $payload) !== \strlen($payload)) {
            throw new \RuntimeException('Unable to write to index.ndjson');
        }
    }

    /**
     * Moves the finished temp file over the index; removes the temp file and
     * throws when the rename fails.
     *
     * @throws \RuntimeException when rename() fails
     */
    private function atomicSwitch(string $tmp, string $final): void
    {
        if (!rename($tmp, $final)) {
            @unlink($tmp);
            throw new \RuntimeException('Atomic switch failed for index.ndjson');
        }
    }
}
|
||||
@@ -1,39 +1,93 @@
|
||||
<?php
|
||||
// src/Knowledge/Ingest/KnowledgeIngestService.php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Knowledge\Ingest;
|
||||
|
||||
use App\Entity\DocumentVersion;
|
||||
use App\Repository\DocumentVersionRepository;
|
||||
use Symfony\Component\Uid\Uuid;
|
||||
|
||||
/**
 * Turns document versions into chunk records for the NDJSON index.
 *
 * NOTE(review): the visible source is an inline diff with removed and added
 * lines interleaved. This body is the post-change state: the three-argument
 * constructor and buildChunkRecords() replacing the removed ingestFile().
 * Confirm against the committed file.
 */
final class KnowledgeIngestService
{
    public function __construct(
        private DocumentLoader $loader,
        private SimpleChunker $chunker,
        private DocumentVersionRepository $versionRepo,
    ) {
    }

    /**
     * Local ingest: produces the NDJSON records for exactly this version.
     *
     * Loads the version's file, normalizes whitespace, chunks the text and
     * yields one record per chunk with a fresh random chunk id, the document
     * and version ids, a running index, the chunk text, its SHA-1 checksum and
     * the version metadata.
     *
     * @return iterable<array<string,mixed>>
     */
    public function buildChunkRecords(DocumentVersion $version): iterable
    {
        $text = $this->loader->load($version->getFilePath());
        $text = $this->optimizeText($text);

        $chunks = $this->chunker->chunk($text);

        $documentId = $version->getDocument()->getId()->toRfc4122();
        $versionId = $version->getId()->toRfc4122();

        // Identical for every chunk of this version, so build it once instead
        // of once per yielded record.
        $metadata = $this->buildMetadata($version);

        $index = 0;

        foreach ($chunks as $chunkText) {
            yield [
                'chunk_id' => Uuid::v4()->toRfc4122(),
                'document_id' => $documentId,
                'version_id' => $versionId,
                'chunk_index' => $index++,
                'text' => $chunkText,
                'checksum' => sha1($chunkText),
                'metadata' => $metadata,
            ];
        }
    }

    /**
     * Global reindex: iterates over every version returned by the repository's
     * iterateActiveVersions() and yields their chunk records one by one.
     *
     * Stays generator-based end to end, so records are produced lazily rather
     * than collected in memory.
     *
     * @return iterable<array<string,mixed>>
     */
    public function buildAllActiveChunkRecords(): iterable
    {
        foreach ($this->versionRepo->iterateActiveVersions() as $version) {
            // "yield from" keeps the generator cascade streaming.
            yield from $this->buildChunkRecords($version);
        }
    }

    /**
     * Collapses runs of three or more newlines into one blank line and strips
     * trailing spaces/tabs from every line.
     *
     * @throws \RuntimeException when the regex engine reports an error
     */
    private function optimizeText(string $text): string
    {
        $patterns = [
            "/\n{3,}/" => "\n\n",
            "/[ \t]+$/m" => "",
        ];

        foreach ($patterns as $pattern => $replacement) {
            $result = preg_replace($pattern, $replacement, $text);

            // preg_replace() returns null on engine errors (e.g. backtrack
            // limit); fail with a clear message instead of a TypeError later.
            if ($result === null) {
                throw new \RuntimeException('Unable to normalize text: ' . preg_last_error_msg());
            }

            $text = $result;
        }

        return $text;
    }

    /**
     * Builds the metadata map for a version; entries that are null or an empty
     * string are left out.
     *
     * @return array<string,mixed>
     */
    private function buildMetadata(DocumentVersion $version): array
    {
        $doc = $version->getDocument();

        // Optional: title/name, if the document exposes one.
        $title = null;
        if (method_exists($doc, 'getTitle')) {
            $title = $doc->getTitle();
        } elseif (method_exists($doc, 'getName')) {
            $title = $doc->getName();
        }

        return array_filter([
            'document_title' => $title,
            'version_number' => method_exists($version, 'getVersionNumber') ? $version->getVersionNumber() : null,
            'file_path' => $version->getFilePath(),
        ], static fn($v) => $v !== null && $v !== '');
    }
}
|
||||
|
||||
Reference in New Issue
Block a user