new system rebuild command
harden IngestService
This commit is contained in:
204
src/Command/SystemRebuildCommand.php
Normal file
204
src/Command/SystemRebuildCommand.php
Normal file
@@ -0,0 +1,204 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
|
||||||
|
declare(strict_types=1);
|
||||||
|
|
||||||
|
namespace App\Command;
|
||||||
|
|
||||||
|
use App\Entity\IngestJob;
|
||||||
|
use App\Index\IndexMetaManager;
|
||||||
|
use App\Service\IngestJobService;
|
||||||
|
use App\Service\IngestOrchestrator;
|
||||||
|
use App\Tag\TagNdjsonExporter;
|
||||||
|
use App\Tag\TagVectorIndexBuilder;
|
||||||
|
use App\Vector\VectorIndexHealthService;
|
||||||
|
use Symfony\Component\Console\Attribute\AsCommand;
|
||||||
|
use Symfony\Component\Console\Command\Command;
|
||||||
|
use Symfony\Component\Console\Input\InputInterface;
|
||||||
|
use Symfony\Component\Console\Input\InputOption;
|
||||||
|
use Symfony\Component\Console\Output\OutputInterface;
|
||||||
|
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||||
|
use Symfony\Component\Process\Process;
|
||||||
|
|
||||||
|
#[AsCommand(
    name: 'mto:agent:system:rebuild',
    description: 'Hard rebuild: global reindex (chunks+vector), rebuild tag index, then reload vector service'
)]
/**
 * Console command that runs a full system rebuild in four sequential steps:
 *
 *  1. global reindex job (started via IngestJobService, executed by IngestOrchestrator),
 *  2. tag rebuild (tag export + tag vector index build + runtime meta timestamp),
 *  3. vector service reload through the python control script,
 *  4. health check via VectorIndexHealthService.
 *
 * The command refuses to run without --hard. Steps 2-4 can be skipped individually
 * with --no-tags / --no-reload / --no-health. The first failing step aborts the
 * command with Command::FAILURE; later steps are not attempted.
 */
final class SystemRebuildCommand extends Command
{
    /** Health report statuses that count as a passing check. */
    private const HEALTHY_STATUSES = ['OK', 'OK_EMPTY'];

    /** Maximum runtime, in seconds, granted to the vector control script. */
    private const RELOAD_TIMEOUT_SECONDS = 600;

    /**
     * @param string $projectDir Working directory for the vector control subprocess; the
     *                           relative script and interpreter paths are resolved against it.
     */
    public function __construct(
        private readonly IngestJobService $jobService,
        private readonly IngestOrchestrator $orchestrator,
        private readonly TagNdjsonExporter $tagExporter,
        private readonly TagVectorIndexBuilder $tagIndexBuilder,
        private readonly IndexMetaManager $metaManager,
        private readonly VectorIndexHealthService $health,
        private readonly string $projectDir,
    ) {
        parent::__construct();
    }

    /**
     * Registers the safety switch, the per-step skip flags and the dry-run flag.
     */
    protected function configure(): void
    {
        $this
            ->addOption('hard', null, InputOption::VALUE_NONE, 'Required safety switch. Without --hard, the command aborts.')
            ->addOption('no-tags', null, InputOption::VALUE_NONE, 'Skip tag rebuild')
            ->addOption('no-reload', null, InputOption::VALUE_NONE, 'Skip vector service reload/start')
            ->addOption('no-health', null, InputOption::VALUE_NONE, 'Skip health check (not recommended)')
            ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Simulate ingest steps (no writes)');
    }

    /**
     * Runs the rebuild steps in order and stops at the first failure.
     *
     * @return int Command::SUCCESS when every non-skipped step succeeded, Command::FAILURE otherwise
     *             (including when --hard is missing).
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        if (!$input->getOption('hard')) {
            $io->error('Safety switch missing: you must pass --hard to run this command.');
            $io->writeln('Example: bin/console mto:agent:system:rebuild --hard');

            return Command::FAILURE;
        }

        $dryRun = (bool) $input->getOption('dry-run');

        $io->title('mto:agent:system:rebuild --hard');

        // 1) Global reindex (never skippable).
        if (!$this->runGlobalReindex($io, $dryRun)) {
            return Command::FAILURE;
        }

        // 2) Tag rebuild.
        if ($input->getOption('no-tags')) {
            $io->section('2/4 Tag rebuild');
            $io->note('Skipped due to --no-tags.');
        } elseif (!$this->runTagRebuild($io, $dryRun)) {
            return Command::FAILURE;
        }

        // 3) Vector service reload.
        if ($input->getOption('no-reload')) {
            $io->section('3/4 Vector service reload');
            $io->note('Skipped due to --no-reload.');
        } elseif (!$this->runVectorServiceReload($io, $dryRun)) {
            return Command::FAILURE;
        }

        // 4) Health check (runs in dry-run mode as well).
        if ($input->getOption('no-health')) {
            $io->section('4/4 Health check');
            $io->note('Skipped due to --no-health.');
        } elseif (!$this->runHealthCheck($io)) {
            return Command::FAILURE;
        }

        $io->success('System rebuild finished.');

        return Command::SUCCESS;
    }

    /**
     * Step 1: starts a queued global-reindex job and hands it to the orchestrator.
     *
     * @return bool true on success, false when the orchestrator threw.
     */
    private function runGlobalReindex(SymfonyStyle $io, bool $dryRun): bool
    {
        $io->section('1/4 Global reindex (chunks + vector index)');

        // NOTE(review): the job is started even with --dry-run; only runExistingJob()
        // receives the dry-run flag. Confirm startJob() is acceptable in dry-run mode.
        $job = $this->jobService->startJob(
            IngestJob::TYPE_GLOBAL_REINDEX,
            null,
            null,
            null,
            null,
            IngestJob::STATUS_QUEUED
        );

        try {
            $this->orchestrator->runExistingJob($job, $dryRun);
        } catch (\Throwable $e) {
            $io->error('Global reindex failed: ' . $e->getMessage());

            return false;
        }

        $io->success('Global reindex completed.');

        return true;
    }

    /**
     * Step 2: exports the tags, builds the tag vector index and records the rebuild
     * timestamp in the runtime meta. Does nothing except print a note in dry-run mode.
     *
     * @return bool true on success (or dry-run), false when any sub-step threw.
     */
    private function runTagRebuild(SymfonyStyle $io, bool $dryRun): bool
    {
        $io->section('2/4 Tag rebuild (tags.ndjson + vector_tags.index)');

        if ($dryRun) {
            $io->note('dry-run enabled: tag rebuild skipped (would export + build tag index).');

            return true;
        }

        try {
            $export = $this->tagExporter->export();

            $io->writeln('<info>Exported tags.ndjson</info>');
            $io->writeln('Path: ' . $export['path']);
            $io->writeln('Tags: ' . $export['tags']);
            $io->writeln('Lines: ' . $export['lines']);
            $io->writeln('Bytes: ' . $export['bytes']);

            $this->tagIndexBuilder->build();
            $io->writeln('<info>Built vector_tags.index</info>');

            $this->metaManager->touchRuntime([
                'last_tags_rebuild_at' => (new \DateTimeImmutable())->format(DATE_ATOM),
            ]);
        } catch (\Throwable $e) {
            $io->error('Tag rebuild failed: ' . $e->getMessage());

            return false;
        }

        $io->success('Tag rebuild completed.');

        return true;
    }

    /**
     * Step 3: runs the python vector control script (install + start + reload) as a
     * subprocess in the project directory and relays its output.
     *
     * Exceptions raised by the Process component (e.g. timeout, start failure) are
     * reported as a regular step failure instead of escaping the command, matching
     * the error handling of the other steps.
     *
     * @return bool true on success (or dry-run), false on exception or non-zero exit code.
     */
    private function runVectorServiceReload(SymfonyStyle $io, bool $dryRun): bool
    {
        $io->section('3/4 Vector service reload (uvicorn)');

        if ($dryRun) {
            $io->note('dry-run enabled: service reload skipped.');

            return true;
        }

        $process = new Process(
            [
                '.venv/bin/python',
                'python/vector/vector_control.py',
                '--install',
                '--start',
                '--reload',
                '--port', '8090',
                '--host', '0.0.0.0',
            ],
            $this->projectDir
        );
        $process->setTimeout(self::RELOAD_TIMEOUT_SECONDS);

        try {
            $process->run();
        } catch (\Throwable $e) {
            // Output can only be read from a process that was actually started.
            if ($process->isStarted()) {
                $this->relayProcessOutput($io, $process);
            }
            $io->error('Vector service reload failed: ' . $e->getMessage());

            return false;
        }

        $this->relayProcessOutput($io, $process);

        if (!$process->isSuccessful()) {
            $io->error(sprintf(
                'Vector service reload failed (non-zero exit code: %d).',
                $process->getExitCode()
            ));

            return false;
        }

        $io->success('Vector service reloaded.');

        return true;
    }

    /**
     * Prints the subprocess stdout verbatim and stderr as a comment-styled block.
     *
     * The text is escaped first so that tag-like fragments in the subprocess output
     * are not interpreted as console formatter markup.
     */
    private function relayProcessOutput(SymfonyStyle $io, Process $process): void
    {
        $stdout = trim($process->getOutput());
        $stderr = trim($process->getErrorOutput());

        if ($stdout !== '') {
            $io->writeln(\Symfony\Component\Console\Formatter\OutputFormatter::escape($stdout));
        }

        if ($stderr !== '') {
            $io->writeln(
                '<comment>'
                . \Symfony\Component\Console\Formatter\OutputFormatter::escape($stderr)
                . '</comment>'
            );
        }
    }

    /**
     * Step 4: runs the health service, prints its report and evaluates the status.
     *
     * @return bool true when the reported status is one of HEALTHY_STATUSES, false when
     *              the check threw or reported any other status.
     */
    private function runHealthCheck(SymfonyStyle $io): bool
    {
        $io->section('4/4 Health check');

        try {
            $report = $this->health->check();
        } catch (\Throwable $e) {
            $io->error('Health check failed: ' . $e->getMessage());

            return false;
        }

        $io->definitionList(
            ['ndjson_exists' => $report['ndjson_exists'] ? 'yes' : 'no'],
            ['ndjson_chunk_count' => (string) $report['ndjson_chunk_count']],
            ['vector_exists' => $report['vector_exists'] ? 'yes' : 'no'],
            ['meta_exists' => $report['meta_exists'] ? 'yes' : 'no'],
            ['vector_chunk_count' => (string) $report['vector_chunk_count']],
            ['status' => (string) $report['status']],
        );

        if (!in_array($report['status'], self::HEALTHY_STATUSES, true)) {
            $io->error('Health check not OK: ' . $report['status']);

            return false;
        }

        $io->success('Health check OK.');

        return true;
    }
}
|
||||||
@@ -6,7 +6,6 @@ namespace App\Knowledge\Ingest;
|
|||||||
|
|
||||||
use App\Entity\DocumentVersion;
|
use App\Entity\DocumentVersion;
|
||||||
use App\Repository\DocumentVersionRepository;
|
use App\Repository\DocumentVersionRepository;
|
||||||
use Symfony\Component\Uid\Uuid;
|
|
||||||
|
|
||||||
final class KnowledgeIngestService
|
final class KnowledgeIngestService
|
||||||
{
|
{
|
||||||
@@ -18,7 +17,7 @@ final class KnowledgeIngestService
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lokaler Ingest: erzeugt NDJSON-Records für genau diese Version.
|
* Lokaler Ingest: erzeugt deterministische NDJSON-Records.
|
||||||
*
|
*
|
||||||
* @return iterable<array<string,mixed>>
|
* @return iterable<array<string,mixed>>
|
||||||
*/
|
*/
|
||||||
@@ -34,25 +33,34 @@ final class KnowledgeIngestService
|
|||||||
$documentId = $doc->getId()->toRfc4122();
|
$documentId = $doc->getId()->toRfc4122();
|
||||||
$versionId = $version->getId()->toRfc4122();
|
$versionId = $version->getId()->toRfc4122();
|
||||||
|
|
||||||
// ✅ Regel: Wenn title gefüllt ist, kommt er in jeden Chunk
|
|
||||||
$title = trim((string) $doc->getTitle());
|
$title = trim((string) $doc->getTitle());
|
||||||
|
|
||||||
$index = 0;
|
$index = 0;
|
||||||
|
|
||||||
foreach ($chunks as $chunkText) {
|
foreach ($chunks as $chunkText) {
|
||||||
|
|
||||||
// ✅ Prefix nur wenn title vorhanden; keine Flags, keine Meta-Schalter
|
// Titel optional weiterhin prefixen (wenn du das behalten willst)
|
||||||
if ($title !== '' && !str_starts_with($chunkText, $title)) {
|
if ($title !== '' && !str_starts_with($chunkText, $title)) {
|
||||||
$chunkText = $title . "\n\n" . $chunkText;
|
$chunkText = $title . "\n\n" . $chunkText;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$chunkText = trim($chunkText);
|
||||||
|
|
||||||
|
// 🔥 deterministische Chunk-ID
|
||||||
|
$normalizedForId = $this->normalizeForId($chunkText);
|
||||||
|
|
||||||
|
$chunkId = sha1(
|
||||||
|
$documentId . '|' .
|
||||||
|
$versionId . '|' .
|
||||||
|
$normalizedForId
|
||||||
|
);
|
||||||
|
|
||||||
yield [
|
yield [
|
||||||
'chunk_id' => Uuid::v4()->toRfc4122(),
|
'chunk_id' => $chunkId,
|
||||||
'document_id' => $documentId,
|
'document_id' => $documentId,
|
||||||
'version_id' => $versionId,
|
'version_id' => $versionId,
|
||||||
'chunk_index' => $index++,
|
'chunk_index' => $index++,
|
||||||
'text' => $chunkText,
|
'text' => $chunkText,
|
||||||
// ✅ checksum muss den finalen Text abbilden (inkl. Titel)
|
|
||||||
'checksum' => sha1($chunkText),
|
'checksum' => sha1($chunkText),
|
||||||
'metadata' => $this->buildMetadata($version),
|
'metadata' => $this->buildMetadata($version),
|
||||||
];
|
];
|
||||||
@@ -60,10 +68,7 @@ final class KnowledgeIngestService
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Global Reindex: iteriert streamingfähig über alle aktiven Versionen.
|
* Global Reindex
|
||||||
* Keine RAM-Explosion, da alles generatorbasiert bleibt.
|
|
||||||
*
|
|
||||||
* @return iterable<array<string,mixed>>
|
|
||||||
*/
|
*/
|
||||||
public function buildAllActiveChunkRecords(): iterable
|
public function buildAllActiveChunkRecords(): iterable
|
||||||
{
|
{
|
||||||
@@ -76,8 +81,18 @@ final class KnowledgeIngestService
|
|||||||
{
|
{
|
||||||
$text = preg_replace("/\n{3,}/", "\n\n", $text);
|
$text = preg_replace("/\n{3,}/", "\n\n", $text);
|
||||||
$text = preg_replace("/[ \t]+$/m", "", $text);
|
$text = preg_replace("/[ \t]+$/m", "", $text);
|
||||||
|
return trim($text);
|
||||||
|
}
|
||||||
|
|
||||||
return $text;
|
/**
|
||||||
|
* Normalisierung für stabile ID-Berechnung.
|
||||||
|
* Wichtig: ID darf nicht durch Whitespace minimal variieren.
|
||||||
|
*/
|
||||||
|
private function normalizeForId(string $text): string
|
||||||
|
{
|
||||||
|
$text = mb_strtolower($text);
|
||||||
|
$text = preg_replace('/\s+/u', ' ', $text);
|
||||||
|
return trim($text);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -87,7 +102,6 @@ final class KnowledgeIngestService
|
|||||||
{
|
{
|
||||||
$doc = $version->getDocument();
|
$doc = $version->getDocument();
|
||||||
|
|
||||||
// Optional: Titel/Name, falls vorhanden
|
|
||||||
$title = null;
|
$title = null;
|
||||||
if (method_exists($doc, 'getTitle')) {
|
if (method_exists($doc, 'getTitle')) {
|
||||||
$title = $doc->getTitle();
|
$title = $doc->getTitle();
|
||||||
@@ -97,8 +111,10 @@ final class KnowledgeIngestService
|
|||||||
|
|
||||||
return array_filter([
|
return array_filter([
|
||||||
'document_title' => $title,
|
'document_title' => $title,
|
||||||
'version_number' => method_exists($version, 'getVersionNumber') ? $version->getVersionNumber() : null,
|
'version_number' => method_exists($version, 'getVersionNumber')
|
||||||
|
? $version->getVersionNumber()
|
||||||
|
: null,
|
||||||
'file_path' => $version->getFilePath(),
|
'file_path' => $version->getFilePath(),
|
||||||
], static fn($v) => $v !== null && $v !== '');
|
], static fn($v) => $v !== null && $v !== '');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user