diff --git a/config/services.yaml b/config/services.yaml index cdf0a98..2bcccee 100644 --- a/config/services.yaml +++ b/config/services.yaml @@ -29,8 +29,8 @@ parameters: mto.vector.data.upload.path: '%mto.knowledge.upload%' - mto.index.chunk_size: 800 - mto.index.chunk_overlap: 100 + mto.index.chunk_size: 250 + mto.index.chunk_overlap: 50 mto.index.embedding_model: 'intfloat/multilingual-e5-base' mto.index.embedding_dimension: 768 mto.index.scoring_version: 1 diff --git a/python/vector/vector_ingest_tags.py b/python/vector/vector_ingest_tags.py index bb88919..30d9c2d 100644 --- a/python/vector/vector_ingest_tags.py +++ b/python/vector/vector_ingest_tags.py @@ -1,13 +1,26 @@ #!/usr/bin/env python3 import json +import os +import signal import sys from pathlib import Path from typing import Any, Dict, List, Tuple +# Keep HuggingFace/SentenceTransformer model loading deterministic in CLI jobs. +os.environ.setdefault("HF_HUB_DISABLE_XET", "1") +os.environ.setdefault("HF_HUB_ETAG_TIMEOUT", "10") +os.environ.setdefault("HF_HUB_DOWNLOAD_TIMEOUT", "30") + +MODEL_LOAD_TIMEOUT_SECONDS = int(os.environ.get("RETRIEX_EMBEDDING_MODEL_LOAD_TIMEOUT_SECONDS", "60")) + + +def log_event(event: str, **payload: Any) -> None: + print(json.dumps({"event": event, **payload}, ensure_ascii=False), file=sys.stderr, flush=True) + def fail(message: str, code: int) -> None: - print(f"ERROR: {message}", file=sys.stderr) + print(f"ERROR: {message}", file=sys.stderr, flush=True) sys.exit(code) @@ -40,27 +53,6 @@ except Exception: import numpy as np -# --------------------------------------------------------- -# Load embedding model from index_meta.json (Single Source of Truth) -# --------------------------------------------------------- -BASE_PATH = Path(__file__).resolve().parents[2] -INDEX_META_PATH = BASE_PATH / "var" / "knowledge" / "index_meta.json" - -if not INDEX_META_PATH.exists(): - fail("index_meta.json not found", 30) - -try: - meta = json.loads(INDEX_META_PATH.read_text(encoding="utf-8")) -except Exception: - fail("index_meta.json is invalid", 30) - -embedding_model = meta.get("embedding_model") -if not isinstance(embedding_model, str) or embedding_model.strip() == "": - fail("embedding_model missing in index_meta.json", 31) - -model = SentenceTransformer(embedding_model.strip()) - - # --------------------------------------------------------- # File checks # --------------------------------------------------------- @@ -90,6 +82,61 @@ def normalize_text(value: Any) -> str: return text +def resolve_embedding_model_from_meta() -> str: + # Local model path wins. This avoids implicit network/cache lookup in production. + override = os.environ.get("RETRIEX_EMBEDDING_MODEL_PATH", "").strip() + if override: + return override + + base_path = Path(__file__).resolve().parents[2] + index_meta_path = base_path / "var" / "knowledge" / "index_meta.json" + + if not index_meta_path.exists(): + fail("index_meta.json not found", 30) + + try: + meta = json.loads(index_meta_path.read_text(encoding="utf-8")) + except Exception: + fail("index_meta.json is invalid", 30) + + embedding_model = meta.get("embedding_model") + if not isinstance(embedding_model, str) or embedding_model.strip() == "": + fail("embedding_model missing in index_meta.json", 31) + + return embedding_model.strip() + + +def load_sentence_transformer(model_name_or_path: str) -> SentenceTransformer: + def timeout_handler(_signum: int, _frame: Any) -> None: + raise TimeoutError( + "Embedding model load timed out. " + "Cache the model locally or set RETRIEX_EMBEDDING_MODEL_PATH." + ) + + log_event( + "tag_embedding_model_load_start", + model=model_name_or_path, + timeout_seconds=MODEL_LOAD_TIMEOUT_SECONDS, + hf_hub_disable_xet=os.environ.get("HF_HUB_DISABLE_XET"), + ) + + previous_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(MODEL_LOAD_TIMEOUT_SECONDS) + + try: + loaded_model = SentenceTransformer(model_name_or_path) + except TimeoutError as exc: + fail(str(exc), 32) + except Exception as exc: + fail(f"Unable to load embedding model '{model_name_or_path}': {exc}", 33) + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, previous_handler) + + log_event("tag_embedding_model_load_done", model=model_name_or_path) + return loaded_model + + # --------------------------------------------------------- # Streaming read NDJSON # --------------------------------------------------------- @@ -146,33 +193,34 @@ def load_rows(path: Path) -> Tuple[List[str], List[str], Dict[str, int]]: texts, ids, stats = load_rows(tags_path) - -print( - json.dumps( - { - "event": "tag_rows_loaded", - **stats, - }, - ensure_ascii=False, - ), - file=sys.stderr, -) +log_event("tag_rows_loaded", **stats) if not texts: cleanup_outputs() sys.exit(0) +# --------------------------------------------------------- +# Load model only after we know that usable tags exist +# --------------------------------------------------------- +embedding_model = resolve_embedding_model_from_meta() +model = load_sentence_transformer(embedding_model) + + # --------------------------------------------------------- # Build embeddings # --------------------------------------------------------- +log_event("tag_embedding_encode_start", rows=len(texts)) + embeddings = model.encode( texts, normalize_embeddings=True, - show_progress_bar=True, + show_progress_bar=False, batch_size=128, ) +log_event("tag_embedding_encode_done", rows=len(texts)) + embeddings = np.array(embeddings, dtype="float32") if embeddings.ndim != 2 or embeddings.shape[0] != len(ids) or embeddings.shape[0] == 0: @@ -198,4 +246,12 @@ meta_path.write_text( encoding="utf-8", ) -sys.exit(0) \ No newline at end of file +log_event( + "tag_vector_index_written", + index=str(out_path), + meta=str(meta_path), + rows=len(ids), + dimension=dim, +) + +sys.exit(0) diff --git a/python/vector/vector_search_tags.py b/python/vector/vector_search_tags.py index e9e41ea..90bcbf0 100644 --- a/python/vector/vector_search_tags.py +++ b/python/vector/vector_search_tags.py @@ -1,8 +1,28 @@ #!/usr/bin/env python3 -import sys import json +import os +import signal +import sys from pathlib import Path +from typing import Any + +# Keep stdout clean for the PHP caller. Diagnostics go to stderr only. +os.environ.setdefault("HF_HUB_DISABLE_XET", "1") +os.environ.setdefault("HF_HUB_ETAG_TIMEOUT", "5") +os.environ.setdefault("HF_HUB_DOWNLOAD_TIMEOUT", "10") + +MODEL_LOAD_TIMEOUT_SECONDS = int(os.environ.get("RETRIEX_EMBEDDING_MODEL_LOAD_TIMEOUT_SECONDS", "30")) + + +def empty() -> None: + print("[]") + sys.exit(0) + + +def debug(message: str) -> None: + print(message, file=sys.stderr, flush=True) + # --------------------------------------------------------- # Positional args (aligned with PHP client exec call) @@ -12,14 +32,9 @@ from pathlib import Path # 3 index_path # 4 meta_path # 5 model -# -# Example: -# python vector_search_tags.py "foo" 8 /path/vector_tags.index /path/vector_tags.index.meta.json all-MiniLM-L6-v2 # --------------------------------------------------------- - if len(sys.argv) < 6: - print("[]") - sys.exit(0) + empty() query = sys.argv[1] @@ -29,66 +44,83 @@ except Exception: limit = 5 index_path = Path(sys.argv[3]).resolve() -meta_path = Path(sys.argv[4]).resolve() +meta_path = Path(sys.argv[4]).resolve() model_name = sys.argv[5] +model_override = os.environ.get("RETRIEX_EMBEDDING_MODEL_PATH", "").strip() +if model_override: + model_name = model_override + # --------------------------------------------------------- # Dependency checks # --------------------------------------------------------- try: import faiss except Exception: - # keep stdout clean for caller - print("[]") - sys.exit(0) + empty() try: from sentence_transformers import SentenceTransformer except Exception: - print("[]") - sys.exit(0) - -from sentence_transformers import SentenceTransformer + empty() # --------------------------------------------------------- # File checks # --------------------------------------------------------- if limit <= 0: - print("[]") - sys.exit(0) + empty() if not index_path.is_file() or not meta_path.is_file(): - # No tag index available => no routing - print("[]") - sys.exit(0) + empty() # --------------------------------------------------------- -# Load model +# Load model with timeout # --------------------------------------------------------- -model = SentenceTransformer(model_name) +def load_model(model_name_or_path: str) -> SentenceTransformer: + def timeout_handler(_signum: int, _frame: Any) -> None: + raise TimeoutError("tag search embedding model load timed out") + + previous_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(MODEL_LOAD_TIMEOUT_SECONDS) + + try: + return SentenceTransformer(model_name_or_path) + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, previous_handler) + + +try: + model = load_model(model_name) +except Exception as exc: + debug(f"Unable to load tag search embedding model '{model_name}': {exc}") + empty() # --------------------------------------------------------- # Load index + meta # --------------------------------------------------------- -index = faiss.read_index(str(index_path)) +try: + index = faiss.read_index(str(index_path)) +except Exception: + empty() try: with open(meta_path, "r", encoding="utf-8") as f: ids = json.load(f) except Exception: - print("[]") - sys.exit(0) + empty() if not isinstance(ids, list) or len(ids) == 0: - print("[]") - sys.exit(0) + empty() # --------------------------------------------------------- # Embed & search # --------------------------------------------------------- -qvec = model.encode([query], normalize_embeddings=True) - -scores, idxs = index.search(qvec, limit) +try: + qvec = model.encode([query], normalize_embeddings=True, show_progress_bar=False) + scores, idxs = index.search(qvec, limit) +except Exception: + empty() out = [] for score, idx in zip(scores[0], idxs[0]): @@ -100,4 +132,4 @@ for score, idx in zip(scores[0], idxs[0]): }) print(json.dumps(out)) -sys.exit(0) \ No newline at end of file +sys.exit(0) diff --git a/python/vector/vector_service.py b/python/vector/vector_service.py index 97f45bf..126176d 100644 --- a/python/vector/vector_service.py +++ b/python/vector/vector_service.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import json +import os import logging from logging.handlers import RotatingFileHandler import threading @@ -13,6 +14,12 @@ import numpy as np from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse from pydantic import BaseModel + +# Keep HuggingFace/SentenceTransformer model loading deterministic. +os.environ.setdefault("HF_HUB_DISABLE_XET", "1") +os.environ.setdefault("HF_HUB_ETAG_TIMEOUT", "10") +os.environ.setdefault("HF_HUB_DOWNLOAD_TIMEOUT", "30") + from sentence_transformers import SentenceTransformer @@ -147,6 +154,15 @@ def _safe_read_json(path: Path) -> Optional[Any]: return None + +def _resolve_embedding_model_name(configured_model_name: str) -> str: + # A local model directory avoids implicit network/cache lookups in production. + model_override = os.environ.get("RETRIEX_EMBEDDING_MODEL_PATH", "").strip() + if model_override: + return model_override + + return configured_model_name.strip() + def _as_key(value: Any) -> Optional[str]: if value is None: return None @@ -362,10 +378,12 @@ def load_all() -> None: if not embedding_model_name: raise RuntimeError("embedding_model missing in index_meta.json") - if model is None or embedding_model_name != loaded_embedding_model_name: - logger.info("[Reload] Loading embedding model: %s", embedding_model_name) - model = SentenceTransformer(embedding_model_name) - loaded_embedding_model_name = embedding_model_name + resolved_embedding_model_name = _resolve_embedding_model_name(str(embedding_model_name)) + + if model is None or resolved_embedding_model_name != loaded_embedding_model_name: + logger.info("[Reload] Loading embedding model: %s", resolved_embedding_model_name) + model = SentenceTransformer(resolved_embedding_model_name) + loaded_embedding_model_name = resolved_embedding_model_name runtime = _safe_read_json(INDEX_RUNTIME_PATH) chunk_runtime_stamp, tags_runtime_stamp, tags_index_present = _extract_runtime_state(runtime) diff --git a/src/Command/TagsRebuildCommand.php b/src/Command/TagsRebuildCommand.php index b42d1a4..99502f7 100644 --- a/src/Command/TagsRebuildCommand.php +++ b/src/Command/TagsRebuildCommand.php @@ -38,16 +38,22 @@ final class TagsRebuildCommand extends Command $io->writeln('Tags: ' . (string) ($export['tags'] ?? 0)); $io->writeln('Lines: ' . (string) ($export['lines'] ?? 0)); $io->writeln('Bytes: ' . (string) ($export['bytes'] ?? 0)); + $io->newLine(); + $io->writeln('2/2 Building vector_tags.index'); + $io->writeln('If this step fails, check the embedding model cache or set RETRIEX_EMBEDDING_MODEL_PATH.'); $this->builder->build(); - $io->writeln('2/2 Built vector_tags.index'); $io->success('Tag rebuild completed.'); return Command::SUCCESS; } catch (\Throwable $e) { $io->error($e->getMessage()); + if ($output->isVerbose() && $e->getPrevious() !== null) { + $io->writeln('Previous exception: ' . $e->getPrevious()->getMessage()); + } + return Command::FAILURE; } } @@ -71,4 +77,4 @@ final class TagsRebuildCommand extends Command throw new \RuntimeException('Tag export returned invalid statistics.'); } } -} \ No newline at end of file +} diff --git a/src/Tag/TagVectorIndexBuilder.php b/src/Tag/TagVectorIndexBuilder.php index d074a64..5a7e8e6 100644 --- a/src/Tag/TagVectorIndexBuilder.php +++ b/src/Tag/TagVectorIndexBuilder.php @@ -6,11 +6,12 @@ namespace App\Tag; use App\Index\IndexMetaManager; use Psr\Log\LoggerInterface; +use Symfony\Component\Process\Exception\ProcessFailedException; +use Symfony\Component\Process\Exception\ProcessTimedOutException; +use Symfony\Component\Process\Process; final readonly class TagVectorIndexBuilder { - private const GRACEFUL_TERMINATION_SECONDS = 2; - public function __construct( private string $pythonBin, private string $scriptPath, @@ -44,25 +45,46 @@ final readonly class TagVectorIndexBuilder return; } - $cmd = $this->buildCommand($tmpIndex); + $cmd = [ + $this->pythonBin, + $this->scriptPath, + $this->tagsNdjsonPath, + $tmpIndex, + ]; $this->agentLogger->info('[tags] build tag vector index', [ 'cmd' => $cmd, 'timeout' => $this->timeoutSeconds, 'embedding_model' => $this->embeddingModel, + 'model_path_override' => getenv('RETRIEX_EMBEDDING_MODEL_PATH') ?: null, ]); try { - $result = $this->runCommand($cmd); + $process = new Process($cmd); + $process->setTimeout($this->timeoutSeconds); + $process->setIdleTimeout($this->timeoutSeconds); - if ($result['exit'] !== 0) { + $process->run(function (string $type, string $buffer): void { + $message = trim($buffer); + + if ($message === '') { + return; + } + + $this->agentLogger->info('[tags] vector ingest output', [ + 'type' => $type, + 'output' => $message, + ]); + }); + + if (!$process->isSuccessful()) { $this->agentLogger->error('[tags] tag vector ingest failed', [ - 'exit' => $result['exit'], - 'stdout' => $result['stdout'], - 'stderr' => $result['stderr'], + 'exit' => $process->getExitCode(), + 'stdout' => trim($process->getOutput()), + 'stderr' => trim($process->getErrorOutput()), ]); - throw new \RuntimeException('Tag vector ingest failed (exit=' . $result['exit'] . ')'); + throw new ProcessFailedException($process); } if (!$this->isUsableArtifact($tmpIndex) || !$this->isUsableArtifact($tmpMeta)) { @@ -77,6 +99,21 @@ final readonly class TagVectorIndexBuilder 'index' => $finalIndex, 'meta' => $finalMeta, ]); + } catch (ProcessTimedOutException $e) { + $this->cleanupTemporaryArtifacts($tmpIndex, $tmpMeta); + + $this->agentLogger->error('[tags] tag vector ingest timed out', [ + 'timeout' => $this->timeoutSeconds, + 'message' => $e->getMessage(), + ]); + + throw new \RuntimeException( + 'Tag vector ingest timed out after ' . $this->timeoutSeconds . ' seconds. ' + . 'Most likely the embedding model cannot be loaded. ' + . 'Set RETRIEX_EMBEDDING_MODEL_PATH to a local model directory or check the HuggingFace cache.', + 0, + $e, + ); } catch (\Throwable $e) { $this->cleanupTemporaryArtifacts($tmpIndex, $tmpMeta); throw $e; @@ -102,17 +139,6 @@ final readonly class TagVectorIndexBuilder } } - private function buildCommand(string $tmpIndex): string - { - return sprintf( - '%s %s %s %s 2>&1', - escapeshellarg($this->pythonBin), - escapeshellarg($this->scriptPath), - escapeshellarg($this->tagsNdjsonPath), - escapeshellarg($tmpIndex), - ); - } - private function ensureTargetDirectoryExists(string $finalIndexPath): void { $dir = dirname($finalIndexPath); @@ -162,85 +188,6 @@ final readonly class TagVectorIndexBuilder return false; } - /** - * @return array{exit:int, stdout:string, stderr:string} - */ - private function runCommand(string $cmd): array - { - $descriptorSpec = [ - 0 => ['pipe', 'r'], - 1 => ['pipe', 'w'], - 2 => ['pipe', 'w'], - ]; - - $process = @proc_open($cmd, $descriptorSpec, $pipes); - - if (!is_resource($process)) { - throw new \RuntimeException('Could not start tag vector ingest process.'); - } - - fclose($pipes[0]); - stream_set_blocking($pipes[1], false); - stream_set_blocking($pipes[2], false); - - $stdout = ''; - $stderr = ''; - $startedAt = microtime(true); - $timedOut = false; - - try { - while (true) { - $stdout .= stream_get_contents($pipes[1]) ?: ''; - $stderr .= stream_get_contents($pipes[2]) ?: ''; - - $status = proc_get_status($process); - - if (!is_array($status) || ($status['running'] ?? false) !== true) { - break; - } - - if ((microtime(true) - $startedAt) > $this->timeoutSeconds) { - $timedOut = true; - proc_terminate($process); - usleep(self::GRACEFUL_TERMINATION_SECONDS * 1000000); - - $status = proc_get_status($process); - if (is_array($status) && ($status['running'] ?? false) === true) { - proc_terminate($process, 9); - } - - break; - } - - usleep(100000); - } - - $stdout .= stream_get_contents($pipes[1]) ?: ''; - $stderr .= stream_get_contents($pipes[2]) ?: ''; - } finally { - fclose($pipes[1]); - fclose($pipes[2]); - } - - $exitCode = proc_close($process); - - if ($timedOut) { - $this->agentLogger->error('[tags] tag vector ingest timed out', [ - 'timeout' => $this->timeoutSeconds, - 'stdout' => $stdout, - 'stderr' => $stderr, - ]); - - throw new \RuntimeException('Tag vector ingest timed out after ' . $this->timeoutSeconds . ' seconds.'); - } - - return [ - 'exit' => is_int($exitCode) ? $exitCode : 1, - 'stdout' => trim($stdout), - 'stderr' => trim($stderr), - ]; - } - private function isUsableArtifact(string $path): bool { return is_file($path) && filesize($path) > 0; @@ -281,4 +228,4 @@ final readonly class TagVectorIndexBuilder @chmod($final, 0664); } -} \ No newline at end of file +}