diff --git a/python/vector/vector_service.py b/python/vector/vector_service.py index b563a9a..006a81d 100644 --- a/python/vector/vector_service.py +++ b/python/vector/vector_service.py @@ -1,6 +1,10 @@ #!/usr/bin/env python3 import json +import logging +from logging.handlers import RotatingFileHandler +import threading +import time from pathlib import Path from typing import Any, List, Optional, Dict @@ -17,6 +21,8 @@ from sentence_transformers import SentenceTransformer BASE_PATH = Path(__file__).resolve().parents[2] KNOWLEDGE_DIR = BASE_PATH / "var" / "knowledge" +LOG_DIR = BASE_PATH / "var" / "log" +LOG_FILE = LOG_DIR / "vector_service.log" CHUNK_INDEX_PATH = KNOWLEDGE_DIR / "vector.index" CHUNK_MAP_PATH = KNOWLEDGE_DIR / "vector.index.meta.json" @@ -25,9 +31,47 @@ TAG_INDEX_PATH = KNOWLEDGE_DIR / "vector_tags.index" TAG_MAP_PATH = KNOWLEDGE_DIR / "vector_tags.index.meta.json" INDEX_META_PATH = KNOWLEDGE_DIR / "index_meta.json" +INDEX_RUNTIME_PATH = KNOWLEDGE_DIR / "index_runtime.json" INDEX_NDJSON_PATH = KNOWLEDGE_DIR / "index.ndjson" +# ============================================================ +# Logging +# ============================================================ + +logger = logging.getLogger("vector_service") +logger.setLevel(logging.INFO) + +def setup_logging() -> None: + LOG_DIR.mkdir(parents=True, exist_ok=True) + + fmt = logging.Formatter( + fmt="%(asctime)s %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S%z", + ) + + # Rotating file + file_handler = RotatingFileHandler( + str(LOG_FILE), + maxBytes=10 * 1024 * 1024, # 10MB + backupCount=5, + encoding="utf-8", + ) + file_handler.setFormatter(fmt) + file_handler.setLevel(logging.INFO) + + # Console (stdout) + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(fmt) + stream_handler.setLevel(logging.INFO) + + # avoid duplicate handlers if uvicorn reloads workers + if not any(isinstance(h, RotatingFileHandler) for h in logger.handlers): + logger.addHandler(file_handler) + if not any(isinstance(h, logging.StreamHandler) for h in logger.handlers): + logger.addHandler(stream_handler) + + # ============================================================ # FastAPI # ============================================================ @@ -44,6 +88,11 @@ tag_ids: Optional[List[Any]] = None loaded_embedding_model_name: Optional[str] = None +current_index_version: Optional[int] = None +current_runtime_stamp: Optional[str] = None + +reload_lock = threading.Lock() + # ============================================================ # Models @@ -52,14 +101,24 @@ loaded_embedding_model_name: Optional[str] = None class SearchRequest(BaseModel): query: str limit: int = 8 - doc_ids: Optional[List[str]] = None # NEW + doc_ids: Optional[List[str]] = None # ============================================================ # Loader # ============================================================ -def load_chunk_doc_map(): +def _safe_read_json(path: Path) -> Optional[dict]: + try: + if not path.exists(): + return None + return json.loads(path.read_text(encoding="utf-8")) + except Exception as e: + logger.warning("Failed to read json %s: %s", str(path), str(e)) + return None + + +def load_chunk_doc_map() -> None: global chunk_doc_map chunk_doc_map = {} @@ -67,61 +126,131 @@ def load_chunk_doc_map(): if not INDEX_NDJSON_PATH.exists(): return - with INDEX_NDJSON_PATH.open("r", encoding="utf-8") as f: - for line in f: - try: - row = json.loads(line) - except Exception: + try: + with INDEX_NDJSON_PATH.open("r", encoding="utf-8") as f: + for line in f: + try: + row = json.loads(line) + except Exception: + continue + + chunk_id = row.get("chunk_id") + document_id = row.get("document_id") + + if isinstance(chunk_id, str) and isinstance(document_id, str): + chunk_doc_map[chunk_id] = document_id + except Exception as e: + logger.warning("Failed to load chunk-doc map from ndjson: %s", str(e)) + + +def load_all() -> None: + """ + Reload everything deterministically (model + indices + maps), + guarded by reload_lock (thread-safe). + """ + global model, chunk_index, chunk_ids + global tag_index, tag_ids + global loaded_embedding_model_name + global current_index_version + global current_runtime_stamp + + with reload_lock: + meta = _safe_read_json(INDEX_META_PATH) + if not isinstance(meta, dict): + raise RuntimeError("index_meta.json not found or invalid") + + embedding_model_name = meta.get("embedding_model") + index_version = meta.get("index_version") + + if not embedding_model_name: + raise RuntimeError("embedding_model missing in index_meta.json") + + # Reload model if needed + if model is None or embedding_model_name != loaded_embedding_model_name: + logger.info("[Reload] Loading embedding model: %s", embedding_model_name) + model = SentenceTransformer(embedding_model_name) + loaded_embedding_model_name = embedding_model_name + + # Reload chunk index + if CHUNK_INDEX_PATH.exists() and CHUNK_MAP_PATH.exists(): + logger.info("[Reload] Loading chunk index") + chunk_index = faiss.read_index(str(CHUNK_INDEX_PATH)) + chunk_ids = _safe_read_json(CHUNK_MAP_PATH) or None + if not isinstance(chunk_ids, list): + chunk_index = None + chunk_ids = None + logger.warning("[Reload] chunk_ids meta invalid -> chunk index disabled") + else: + chunk_index = None + chunk_ids = None + + # Load chunk → document map + logger.info("[Reload] Loading chunk-doc map") + load_chunk_doc_map() + + # Reload tag index + if TAG_INDEX_PATH.exists() and TAG_MAP_PATH.exists(): + logger.info("[Reload] Loading tag index") + tag_index = faiss.read_index(str(TAG_INDEX_PATH)) + tag_ids = _safe_read_json(TAG_MAP_PATH) or None + if not isinstance(tag_ids, list): + tag_index = None + tag_ids = None + logger.warning("[Reload] tag_ids meta invalid -> tag index disabled") + else: + tag_index = None + tag_ids = None + + # Runtime stamp (commit marker for tags+chunks) + runtime = _safe_read_json(INDEX_RUNTIME_PATH) + if isinstance(runtime, dict): + v = runtime.get("last_rebuild_at") + current_runtime_stamp = v if isinstance(v, str) else None + else: + current_runtime_stamp = None + + current_index_version = index_version if isinstance(index_version, int) else None + + logger.info("[Reload] Completed (index_version=%s runtime=%s)", str(current_index_version), str(current_runtime_stamp)) + + +# ============================================================ +# Observer (Enterprise Auto Reload) +# ============================================================ + +def observer_loop() -> None: + global current_index_version + global current_runtime_stamp + + while True: + time.sleep(2) + + try: + meta = _safe_read_json(INDEX_META_PATH) + if not isinstance(meta, dict): continue - chunk_id = row.get("chunk_id") - document_id = row.get("document_id") + new_version = meta.get("index_version") if isinstance(meta.get("index_version"), int) else None - if isinstance(chunk_id, str) and isinstance(document_id, str): - chunk_doc_map[chunk_id] = document_id + runtime = _safe_read_json(INDEX_RUNTIME_PATH) + new_runtime = None + if isinstance(runtime, dict): + v = runtime.get("last_rebuild_at") + new_runtime = v if isinstance(v, str) else None + # Structure change (embedding, dim, scoring_version, etc.) -> reload + if new_version != current_index_version: + logger.info("[Observer] index_version changed (%s -> %s) -> Reload", str(current_index_version), str(new_version)) + load_all() + continue -def load_all(): - global model, chunk_index, chunk_ids, tag_index, tag_ids, loaded_embedding_model_name + # Content change (chunks OR tags) -> reload + if new_runtime != current_runtime_stamp: + logger.info("[Observer] runtime changed (%s -> %s) -> Reload", str(current_runtime_stamp), str(new_runtime)) + load_all() - if not INDEX_META_PATH.exists(): - raise RuntimeError("index_meta.json not found") - - meta = json.loads(INDEX_META_PATH.read_text()) - embedding_model_name = meta.get("embedding_model") - - if not embedding_model_name: - raise RuntimeError("embedding_model missing in index_meta.json") - - # Reload model only if changed - if model is None or embedding_model_name != loaded_embedding_model_name: - print(f"[Reload] Loading embedding model: {embedding_model_name}") - model = SentenceTransformer(embedding_model_name) - loaded_embedding_model_name = embedding_model_name - - # Reload chunk index - if CHUNK_INDEX_PATH.exists() and CHUNK_MAP_PATH.exists(): - print("[Reload] Loading chunk index") - chunk_index = faiss.read_index(str(CHUNK_INDEX_PATH)) - chunk_ids = json.loads(CHUNK_MAP_PATH.read_text()) - else: - chunk_index = None - chunk_ids = None - - # Load chunk → document map - print("[Reload] Loading chunk-doc map") - load_chunk_doc_map() - - # Reload tag index - if TAG_INDEX_PATH.exists() and TAG_MAP_PATH.exists(): - print("[Reload] Loading tag index") - tag_index = faiss.read_index(str(TAG_INDEX_PATH)) - tag_ids = json.loads(TAG_MAP_PATH.read_text()) - else: - tag_index = None - tag_ids = None - - print("[Reload] Completed") + except Exception as e: + logger.error("[Observer ERROR] %s", str(e)) # ============================================================ @@ -130,8 +259,15 @@ def load_all(): @app.on_event("startup") def startup_event(): + setup_logging() + logger.info("[VectorService] Startup") + load_all() - print("[VectorService] Ready") + + t = threading.Thread(target=observer_loop, daemon=True) + t.start() + + logger.info("[VectorService] Ready (log=%s)", str(LOG_FILE)) # ============================================================ @@ -145,11 +281,18 @@ def health(): "chunk_index_loaded": chunk_index is not None, "tag_index_loaded": tag_index is not None, "model_loaded": model is not None, + "index_version": current_index_version, + "runtime_stamp": current_runtime_stamp, + "log_file": str(LOG_FILE), } @app.post("/reload") def reload(): + """ + Manual reload endpoint (kept for compatibility with mto:agent:vector:control --reload). + Auto-reload still runs via observer_loop. + """ try: load_all() return {"status": "reloaded"} @@ -159,13 +302,12 @@ def reload(): @app.post("/search-chunks") def search_chunks(req: SearchRequest): - if chunk_index is None or chunk_ids is None: + if chunk_index is None or chunk_ids is None or model is None: raise HTTPException(status_code=503, detail="Chunk index not available") query_vec = model.encode([req.query], normalize_embeddings=True) query_vec = np.array(query_vec).astype("float32") - # Wenn doc_ids gesetzt sind → mehr holen, dann filtern effective_limit = req.limit if req.doc_ids: effective_limit = max(req.limit * 5, 50) @@ -182,7 +324,6 @@ def search_chunks(req: SearchRequest): chunk_id = chunk_ids[idx] - # NEW: doc-scoped filter if req.doc_ids: doc_id = chunk_doc_map.get(chunk_id) if doc_id not in req.doc_ids: @@ -201,7 +342,7 @@ def search_chunks(req: SearchRequest): @app.post("/search-tags") def search_tags(req: SearchRequest): - if tag_index is None or tag_ids is None: + if tag_index is None or tag_ids is None or model is None: raise HTTPException(status_code=503, detail="Tag index not available") query_vec = model.encode([req.query], normalize_embeddings=True) @@ -210,6 +351,7 @@ def search_tags(req: SearchRequest): scores, indices = tag_index.search(query_vec, req.limit) results = [] + for score, idx in zip(scores[0], indices[0]): if idx == -1: continue diff --git a/src/Command/TagsRebuildCommand.php b/src/Command/TagsRebuildCommand.php index beeb5a0..15fb8fb 100644 --- a/src/Command/TagsRebuildCommand.php +++ b/src/Command/TagsRebuildCommand.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace App\Command; +use App\Index\IndexMetaManager; use App\Tag\TagNdjsonExporter; use App\Tag\TagVectorIndexBuilder; use Symfony\Component\Console\Attribute\AsCommand; @@ -20,6 +21,7 @@ final class TagsRebuildCommand extends Command public function __construct( private readonly TagNdjsonExporter $exporter, private readonly TagVectorIndexBuilder $builder, + private readonly IndexMetaManager $metaManager, ) { parent::__construct(); } @@ -27,15 +29,32 @@ final class TagsRebuildCommand extends Command protected function execute(InputInterface $input, OutputInterface $output): int { try { + // ----------------------------------------- + // 1) Export tags.ndjson + // ----------------------------------------- $export = $this->exporter->export(); - $output->writeln('1/2 Exported tags.ndjson'); + + $output->writeln('1/3 Exported tags.ndjson'); $output->writeln('Path: ' . $export['path']); $output->writeln('Tags: ' . $export['tags']); $output->writeln('Lines: ' . $export['lines']); $output->writeln('Bytes: ' . $export['bytes']); + // ----------------------------------------- + // 2) Build FAISS tag index + // ----------------------------------------- $this->builder->build(); - $output->writeln('2/2 Built vector_tags.index'); + + $output->writeln('2/3 Built vector_tags.index'); + + // ----------------------------------------- + // 3) Enterprise Commit Marker + // ----------------------------------------- + $this->metaManager->touchRuntime([ + 'last_tags_rebuild_at' => (new \DateTimeImmutable())->format(DATE_ATOM), + ]); + + $output->writeln('3/3 Runtime commit marker updated'); } catch (\Throwable $e) { $output->writeln('ERROR: ' . $e->getMessage() . ''); return Command::FAILURE; diff --git a/src/Index/IndexMetaManager.php b/src/Index/IndexMetaManager.php index 34ec93d..9fedaae 100644 --- a/src/Index/IndexMetaManager.php +++ b/src/Index/IndexMetaManager.php @@ -106,6 +106,9 @@ final class IndexMetaManager $this->atomicWriteJson($this->runtimePath, $payload); } + /** + * Für Chunk-Rebuilds (harte Zahl + Commit-Timestamp). + */ public function updateRuntimeStats(int $chunkCount): void { $this->ensureRuntimeFileExists(); @@ -118,7 +121,40 @@ final class IndexMetaManager $this->atomicWriteJson($this->runtimePath, $payload); } - public function getRuntimeChunkCount(): int + /** + * Enterprise-Commit-Marker für alles, was einen Reload im Vector-Service erfordern soll, + * ohne dass zwingend chunk_count aktualisiert werden kann (z.B. Tag-Rebuild). + * + * Beispiel: + * touchRuntime(['last_tags_rebuild_at' => DATE_ATOM]) + */ + public function touchRuntime(array $extra = []): void + { + $this->ensureRuntimeFileExists(); + + $current = $this->readRuntime() ?? []; + + // Grundschema absichern + if (!isset($current['chunk_count'])) { + $current['chunk_count'] = 0; + } + + $payload = array_merge( + $current, + $extra, + [ + // Commit-Marker IMMER setzen/überschreiben + 'last_rebuild_at' => (new \DateTimeImmutable())->format(DATE_ATOM), + ] + ); + + $this->atomicWriteJson($this->runtimePath, $payload); + } + + /** + * Runtime-Info lesen (z.B. für Tests, Debug, Polling). + */ + public function readRuntime(): ?array { $this->ensureRuntimeFileExists(); @@ -127,9 +163,25 @@ final class IndexMetaManager true ); + return is_array($data) ? $data : null; + } + + public function getRuntimeChunkCount(): int + { + $data = $this->readRuntime(); + return (int)($data['chunk_count'] ?? 0); } + public function getRuntimeLastRebuildAt(): ?string + { + $data = $this->readRuntime(); + + $v = $data['last_rebuild_at'] ?? null; + + return is_string($v) && $v !== '' ? $v : null; + } + // ===================================================== // INTERNAL ATOMIC JSON WRITE // ===================================================== diff --git a/src/Tag/TagVectorIndexBuilder.php b/src/Tag/TagVectorIndexBuilder.php index 03f794f..8ef38d4 100644 --- a/src/Tag/TagVectorIndexBuilder.php +++ b/src/Tag/TagVectorIndexBuilder.php @@ -4,6 +4,7 @@ declare(strict_types=1); namespace App\Tag; +use App\Index\IndexMetaManager; use Psr\Log\LoggerInterface; final class TagVectorIndexBuilder @@ -16,6 +17,7 @@ final class TagVectorIndexBuilder private readonly string $embeddingModel, private readonly int $timeoutSeconds, private readonly LoggerInterface $agentLogger, + private readonly IndexMetaManager $metaManager, // ✅ NEU ) {} public function build(): void @@ -34,18 +36,14 @@ final class TagVectorIndexBuilder $finalIndex = $this->vectorTagsIndexPath; $finalMeta = $finalIndex . '.meta.json'; - // Ensure output dir exists $dir = \dirname($finalIndex); if (!\is_dir($dir)) { @\mkdir($dir, 0775, true); } - // Clean tmp leftovers @\unlink($tmpIndex); @\unlink($tmpMeta); - // Positional args: - // python vector_ingest_tags.py $cmd = sprintf( '%s %s %s %s %s 2>&1', escapeshellarg($this->pythonBin), @@ -73,20 +71,22 @@ final class TagVectorIndexBuilder throw new \RuntimeException('Tag vector ingest failed (exit=' . $exit . ')'); } - // If no tags -> python may remove outputs and exit 0 if (!is_file($tmpIndex) || !is_file($tmpMeta)) { - // treat as "no index" rather than hard error @\unlink($tmpIndex); @\unlink($tmpMeta); $this->agentLogger->warning('[tags] no tag index produced (maybe 0 tags).'); return; } - // Atomic switch $this->atomicReplace($tmpIndex, $finalIndex); $this->atomicReplace($tmpMeta, $finalMeta); - $this->agentLogger->info('[tags] tag vector index build completed', [ + // ✅ ENTERPRISE COMMIT MARKER + $this->metaManager->touchRuntime([ + 'last_tags_rebuild_at' => (new \DateTimeImmutable())->format(DATE_ATOM), + ]); + + $this->agentLogger->info('[tags] tag vector index build completed + runtime committed', [ 'index' => $finalIndex, 'meta' => $finalMeta, ]);