Files
MtoRagSystem/python/vector/vector_service.py
2026-02-28 16:10:47 +01:00

600 lines
19 KiB
Python

#!/usr/bin/env python3
import json
import logging
from logging.handlers import RotatingFileHandler
import threading
import time
from pathlib import Path
from typing import Any, List, Optional, Dict
import numpy as np
import faiss
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
# ============================================================
# Service Stamp (to verify you are running THIS file)
# ============================================================
# Build identifier of this file. It is written to the startup and reload log
# lines and returned by /health, /reload and the global 500 handler.
SERVICE_STAMP = "vector_service.py@2026-02-28T10:20+01:00"
# ============================================================
# Paths
# ============================================================
# Base directory: two levels above the directory that contains this file.
BASE_PATH = Path(__file__).resolve().parents[2]
# Directory holding the index files and their metadata.
KNOWLEDGE_DIR = BASE_PATH / "var" / "knowledge"
# Directory and file used by the rotating log handler in setup_logging().
LOG_DIR = BASE_PATH / "var" / "log"
LOG_FILE = LOG_DIR / "vector_service.log"
# Chunk FAISS index and the JSON file mapping index rows to chunk ids.
CHUNK_INDEX_PATH = KNOWLEDGE_DIR / "vector.index"
CHUNK_MAP_PATH = KNOWLEDGE_DIR / "vector.index.meta.json"
# Tag FAISS index and the JSON file mapping index rows to tag ids.
TAG_INDEX_PATH = KNOWLEDGE_DIR / "vector_tags.index"
TAG_MAP_PATH = KNOWLEDGE_DIR / "vector_tags.index.meta.json"
# index_meta.json supplies "embedding_model" and "index_version";
# index_runtime.json supplies "last_rebuild_at". Both are polled by observer_loop().
INDEX_META_PATH = KNOWLEDGE_DIR / "index_meta.json"
INDEX_RUNTIME_PATH = KNOWLEDGE_DIR / "index_runtime.json"
# One JSON object per line; read for chunk_id -> document_id / chunk_index.
INDEX_NDJSON_PATH = KNOWLEDGE_DIR / "index.ndjson"
# Tags NDJSON (exported by PHP) used to enrich /search-tags responses
# with a label and a tag type.
TAGS_NDJSON_PATH = KNOWLEDGE_DIR / "tags.ndjson"
# ============================================================
# Logging
# ============================================================
# Service-wide logger. Its handlers are attached later by setup_logging(),
# which startup_event() calls first.
logger = logging.getLogger("vector_service")
logger.setLevel(logging.INFO)
def _has_console_handler(target: logging.Logger) -> bool:
    """Return True if records sent to *target* already reach a console handler.

    ``RotatingFileHandler`` (like every ``FileHandler``) is a subclass of
    ``logging.StreamHandler``, so a plain ``isinstance(h, StreamHandler)``
    test also matches file handlers. File handlers are excluded here.

    Ancestors are inspected for as long as records propagate to them, so a
    console handler already installed on a parent logger is not duplicated.
    """
    current: Optional[logging.Logger] = target
    while current is not None:
        for handler in current.handlers:
            if isinstance(handler, logging.StreamHandler) and not isinstance(
                handler, logging.FileHandler
            ):
                return True
        if not current.propagate:
            break
        current = current.parent
    return False


def setup_logging() -> None:
    """Attach a rotating file handler and a console handler to the service loggers.

    The targets are the module ``logger`` plus ``uvicorn.error`` and
    ``uvicorn.access``, so server-side errors end up in the same log file.
    The function is safe to call repeatedly: a logger that already has a
    ``RotatingFileHandler`` or already reaches a console handler is left alone,
    and a handler object is only created when it is actually going to be
    attached (creating a file handler opens the log file).
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    fmt = logging.Formatter(
        fmt="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S%z",
    )

    # Capture uvicorn logs in the same file as well (critical for hidden 500s)
    uvicorn_error = logging.getLogger("uvicorn.error")
    uvicorn_access = logging.getLogger("uvicorn.access")
    uvicorn_error.setLevel(logging.INFO)
    uvicorn_access.setLevel(logging.INFO)

    # Both handlers are shared between all targets and created lazily.
    file_handler: Optional[RotatingFileHandler] = None
    stream_handler: Optional[logging.StreamHandler] = None

    for target in (logger, uvicorn_error, uvicorn_access):
        if not any(isinstance(h, RotatingFileHandler) for h in target.handlers):
            if file_handler is None:
                file_handler = RotatingFileHandler(
                    str(LOG_FILE),
                    maxBytes=10 * 1024 * 1024,
                    backupCount=5,
                    encoding="utf-8",
                )
                file_handler.setFormatter(fmt)
                file_handler.setLevel(logging.INFO)
            target.addHandler(file_handler)

        # Must not be fooled by the file handler attached just above.
        if not _has_console_handler(target):
            if stream_handler is None:
                stream_handler = logging.StreamHandler()
                stream_handler.setFormatter(fmt)
                stream_handler.setLevel(logging.INFO)
            target.addHandler(stream_handler)
# ============================================================
# FastAPI
# ============================================================
# ASGI application object; the route decorators below register on it.
app = FastAPI()
# Everything below is process-wide state that load_all() (re)assigns and the
# endpoints read.
# Sentence embedding model, loaded by name from index_meta.json.
model: Optional[SentenceTransformer] = None
# FAISS index over chunks (result of faiss.read_index) or None when unavailable.
chunk_index = None
# Position i holds the chunk id of FAISS row i (from vector.index.meta.json).
chunk_ids: Optional[List[Any]] = None
# chunk_id -> document_id, filled from index.ndjson.
chunk_doc_map: Dict[str, str] = {}
# chunk_id -> chunk_index (position value stored in index.ndjson).
chunk_pos_map: Dict[str, int] = {}
# FAISS index over tags or None when unavailable.
tag_index = None
# Position i holds the tag id of FAISS row i (from vector_tags.index.meta.json).
tag_ids: Optional[List[Any]] = None
# tag_id -> {"label": "...", "tag_type": "..."}
tag_meta_map: Dict[str, Dict[str, str]] = {}
# Name of the model currently held in `model`; used to skip redundant reloads.
loaded_embedding_model_name: Optional[str] = None
# Last seen "index_version" / "last_rebuild_at"; observer_loop() compares the
# on-disk values against these to decide whether to reload.
current_index_version: Optional[int] = None
current_runtime_stamp: Optional[str] = None
# Serializes load_all() between the observer thread, /reload and startup.
reload_lock = threading.Lock()
# ============================================================
# Models
# ============================================================
class SearchRequest(BaseModel):
    # Request body shared by /search-chunks and /search-tags.

    # Free-text query; both endpoints answer HTTP 400 when it is blank after
    # stripping whitespace.
    query: str
    # Requested number of results; the endpoints clamp it with
    # _sanitize_limit (default 8, maximum 200).
    limit: int = 8
    # Optional document-id filter; only /search-chunks reads it.
    doc_ids: Optional[List[str]] = None
# ============================================================
# Loader Helpers
# ============================================================
def _safe_read_json(path: Path) -> Optional[Any]:
    """Parse the UTF-8 JSON file at *path*, never raising.

    Returns the decoded value, or ``None`` when the file does not exist or
    cannot be read or parsed. A missing file is silent; any other failure is
    logged as a warning.
    """
    try:
        if path.exists():
            raw_text = path.read_text(encoding="utf-8")
            return json.loads(raw_text)
    except Exception as e:
        logger.warning("Failed to read json %s: %s", str(path), str(e))
    return None
def _as_key(value: Any) -> Optional[str]:
    """Turn an arbitrary id value into a stripped, non-empty string key.

    ``None``, blank strings and values whose ``str()`` conversion fails all
    yield ``None``; everything else yields its whitespace-stripped string form.
    """
    if value is None:
        return None
    if isinstance(value, str):
        text = value.strip()
    else:
        try:
            text = str(value).strip()
        except Exception:
            return None
    # An empty result is reported as "no key".
    return text or None
def _sanitize_limit(limit: int, default: int = 8, max_limit: int = 200) -> int:
    """Coerce a requested result count into the range ``1..max_limit``.

    Values that cannot be converted with ``int()`` and values that are zero
    or negative fall back to *default*; values above *max_limit* are capped.
    """
    try:
        requested = int(limit)
    except Exception:
        return default
    if requested <= 0:
        return default
    return min(requested, max_limit)
def load_chunk_maps_from_ndjson() -> None:
    """Rebuild ``chunk_doc_map`` and ``chunk_pos_map`` from ``index.ndjson``.

    Each non-empty line is expected to be a JSON object carrying ``chunk_id``
    and optionally ``document_id`` and ``chunk_index``. Lines that are not
    valid JSON, are not JSON objects, or lack a usable ``chunk_id`` are
    skipped individually instead of aborting the whole load.

    The new maps are built in local dictionaries and assigned to the globals
    in one step at the end, so request handlers never observe a half-filled
    map while a reload is running. A missing file yields empty maps; an I/O
    error keeps whatever was parsed up to that point and logs a warning.
    """
    global chunk_doc_map, chunk_pos_map
    doc_map: Dict[str, str] = {}
    pos_map: Dict[str, int] = {}

    if INDEX_NDJSON_PATH.exists():
        try:
            with INDEX_NDJSON_PATH.open("r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        row = json.loads(line)
                    except Exception:
                        continue
                    # A valid JSON line may still be a list/number/string.
                    if not isinstance(row, dict):
                        continue
                    chunk_id_key = _as_key(row.get("chunk_id"))
                    if not chunk_id_key:
                        continue
                    doc_id_key = _as_key(row.get("document_id"))
                    if doc_id_key:
                        doc_map[chunk_id_key] = doc_id_key
                    ci = row.get("chunk_index")
                    if isinstance(ci, int):
                        pos_map[chunk_id_key] = ci
                    elif isinstance(ci, str):
                        s = ci.strip()
                        if s.isdigit():
                            # isdigit() also accepts characters int() rejects.
                            try:
                                pos_map[chunk_id_key] = int(s)
                            except ValueError:
                                pass
        except Exception as e:
            logger.warning("Failed to load chunk maps from ndjson: %s", str(e))

    # Publish the finished maps in one rebinding each.
    chunk_doc_map = doc_map
    chunk_pos_map = pos_map
def load_tag_meta_from_tags_ndjson() -> None:
    """
    Loads minimal tag metadata from tags.ndjson to enrich /search-tags results.
    Expected line format (from PHP exporter / ingester pipeline):
    {"tag_id":"...","text":"LABEL\\nSLUG\\noptional description", ...}
    We extract:
    label = first line of "text", stripped (fallback: "")
    tag_type = "type" if present (preferred), else "generic"

    Lines that are not valid JSON, are not JSON objects, or have no usable
    "tag_id" are skipped one by one. The map is built locally and assigned to
    the global ``tag_meta_map`` in a single step, so readers never see a
    partially filled map. A missing file or a read error yields an empty map.
    """
    global tag_meta_map
    if not TAGS_NDJSON_PATH.exists():
        tag_meta_map = {}
        logger.info("[Reload] tags.ndjson missing -> tag_meta_map empty (%s)", str(TAGS_NDJSON_PATH))
        return

    loaded: Dict[str, Dict[str, str]] = {}
    try:
        with TAGS_NDJSON_PATH.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    row = json.loads(line)
                except Exception:
                    continue
                # A valid JSON line may still be a list/number/string.
                if not isinstance(row, dict):
                    continue
                tag_id = _as_key(row.get("tag_id"))
                if not tag_id:
                    continue
                # Prefer explicit fields if present
                ttype = row.get("type")
                if isinstance(ttype, str) and ttype.strip():
                    tag_type = ttype.strip()
                else:
                    tag_type = "generic"
                label = ""
                txt = row.get("text")
                if isinstance(txt, str) and txt.strip():
                    text_lines = txt.splitlines()
                    label = text_lines[0].strip() if text_lines else ""
                loaded[tag_id] = {"label": label, "tag_type": tag_type}
    except Exception as e:
        logger.warning("Failed to load tag meta from tags.ndjson: %s", str(e))
        loaded = {}

    tag_meta_map = loaded
def _normalize_meta_list(value: Any) -> Optional[List[Any]]:
    """
    Normalizes an id-mapping payload to a plain list.

    - list: returned unchanged (same object)
    - dict like {"0": "...", "1": "..."}: values ordered by the integer value
      of their keys
    - anything else, or a dict whose keys are not canonical integer strings:
      None
    """
    if isinstance(value, list):
        return value
    if not isinstance(value, dict):
        return None
    try:
        ordered_positions = sorted(map(int, value))
        # NOTE(review): gaps in the key sequence are compacted here, so list
        # positions only match the keys when they run 0..n-1 - confirm the
        # exporter guarantees that.
        return [value[str(position)] for position in ordered_positions]
    except Exception:
        return None
def _load_index_with_ids(index_path: Path, map_path: Path, kind: str):
    """Read one FAISS index together with its id list.

    *kind* ("chunk" or "tag") is only used in log messages. Returns
    ``(index, ids)`` when both files exist and the id list is valid, otherwise
    ``(None, None)``. Nothing global is touched, so the caller can publish
    the pair only once it is known to be consistent.
    """
    if not (index_path.exists() and map_path.exists()):
        return None, None
    logger.info("[Reload] Loading %s index", kind)
    index = faiss.read_index(str(index_path))
    ids = _normalize_meta_list(_safe_read_json(map_path))
    if ids is None:
        logger.warning("[Reload] %s_ids meta invalid -> %s index disabled", kind, kind)
        return None, None
    return index, ids


def load_all() -> None:
    """(Re)load the embedding model, both FAISS indexes and their metadata.

    Runs under ``reload_lock`` so concurrent reload requests are serialized.
    The embedding model is only re-created when its name in index_meta.json
    differs from the one currently loaded. Each index is read and validated
    into local variables first and then assigned together with its id list,
    so request handlers are not handed a fresh index paired with a stale or
    not-yet-validated id list.

    Raises:
        RuntimeError: index_meta.json is missing/invalid or lacks
            "embedding_model".
        Exception: whatever the model or FAISS loaders raise; in that case
            the version/runtime stamps are not updated.
    """
    global model, chunk_index, chunk_ids
    global tag_index, tag_ids
    global loaded_embedding_model_name
    global current_index_version
    global current_runtime_stamp
    with reload_lock:
        meta = _safe_read_json(INDEX_META_PATH)
        if not isinstance(meta, dict):
            raise RuntimeError("index_meta.json not found or invalid")
        embedding_model_name = meta.get("embedding_model")
        index_version = meta.get("index_version")
        if not embedding_model_name:
            raise RuntimeError("embedding_model missing in index_meta.json")
        if model is None or embedding_model_name != loaded_embedding_model_name:
            logger.info("[Reload] Loading embedding model: %s", embedding_model_name)
            model = SentenceTransformer(embedding_model_name)
            loaded_embedding_model_name = embedding_model_name

        # Chunks
        new_chunk_index, new_chunk_ids = _load_index_with_ids(
            CHUNK_INDEX_PATH, CHUNK_MAP_PATH, "chunk"
        )
        chunk_index, chunk_ids = new_chunk_index, new_chunk_ids
        logger.info("[Reload] Loading chunk maps (doc_id + chunk_index)")
        load_chunk_maps_from_ndjson()

        # Tags
        new_tag_index, new_tag_ids = _load_index_with_ids(
            TAG_INDEX_PATH, TAG_MAP_PATH, "tag"
        )
        tag_index, tag_ids = new_tag_index, new_tag_ids
        # Load tag meta for enrichment of /search-tags results
        logger.info("[Reload] Loading tag meta from tags.ndjson")
        load_tag_meta_from_tags_ndjson()

        # Remember what was loaded so observer_loop() can detect changes.
        runtime = _safe_read_json(INDEX_RUNTIME_PATH)
        if isinstance(runtime, dict):
            v = runtime.get("last_rebuild_at")
            current_runtime_stamp = v if isinstance(v, str) else None
        else:
            current_runtime_stamp = None
        current_index_version = index_version if isinstance(index_version, int) else None
        logger.info(
            "[Reload] Completed (index_version=%s runtime=%s embedding_model=%s tag_meta=%s stamp=%s file=%s)",
            str(current_index_version),
            str(current_runtime_stamp),
            str(loaded_embedding_model_name),
            str(len(tag_meta_map)),
            SERVICE_STAMP,
            str(Path(__file__).resolve()),
        )
# ============================================================
# Observer
# ============================================================
def observer_loop() -> None:
    """Poll the index metadata every two seconds and reload on change.

    Runs forever (it is started as a daemon thread). A reload is triggered
    when the "index_version" in index_meta.json or the "last_rebuild_at" in
    index_runtime.json differs from the values recorded by the last
    successful load. Any exception is logged and the loop keeps running.
    """
    while True:
        time.sleep(2)
        try:
            meta = _safe_read_json(INDEX_META_PATH)
            if not isinstance(meta, dict):
                continue

            raw_version = meta.get("index_version")
            new_version = raw_version if isinstance(raw_version, int) else None

            runtime = _safe_read_json(INDEX_RUNTIME_PATH)
            raw_stamp = runtime.get("last_rebuild_at") if isinstance(runtime, dict) else None
            new_runtime = raw_stamp if isinstance(raw_stamp, str) else None

            # The version check takes precedence; only one reload per tick.
            if new_version != current_index_version:
                logger.info("[Observer] index_version changed (%s -> %s) -> Reload", str(current_index_version), str(new_version))
                load_all()
            elif new_runtime != current_runtime_stamp:
                logger.info("[Observer] runtime changed (%s -> %s) -> Reload", str(current_runtime_stamp), str(new_runtime))
                load_all()
        except Exception as e:
            logger.exception("[Observer ERROR] %s", str(e))
# ============================================================
# Global Exception Handler (forces JSON + logs)
# ============================================================
@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
    """Log any otherwise unhandled exception and answer with a JSON 500 body."""
    request_path = request.url.path
    logger.exception("UNHANDLED_EXCEPTION path=%s method=%s", request_path, request.method)
    body = {
        "error": "Internal Server Error",
        "detail": str(exc),
        "path": request_path,
        "stamp": SERVICE_STAMP,
    }
    return JSONResponse(status_code=500, content=body)
# ============================================================
# Startup
# ============================================================
@app.on_event("startup")
def startup_event():
    """Configure logging, load all indexes once, then start the observer thread."""
    setup_logging()
    this_file = str(Path(__file__).resolve())
    logger.info("[VectorService] Startup stamp=%s file=%s", SERVICE_STAMP, this_file)
    load_all()
    # Daemon thread: it must not keep the process alive on shutdown.
    threading.Thread(target=observer_loop, daemon=True).start()
    logger.info("[VectorService] Ready (log=%s)", str(LOG_FILE))
# ============================================================
# Endpoints
# ============================================================
@app.get("/health")
def health():
    # Status snapshot of the loaded model, indexes and metadata.
    # The *_meta_type / *_meta_len fields describe the id lists, not the maps.
    tag_meta_type = None if tag_ids is None else type(tag_ids).__name__
    tag_meta_len = len(tag_ids) if isinstance(tag_ids, list) else None
    chunk_meta_type = None if chunk_ids is None else type(chunk_ids).__name__
    chunk_meta_len = len(chunk_ids) if isinstance(chunk_ids, list) else None
    this_file = str(Path(__file__).resolve())
    return {
        "status": "ok",
        "stamp": SERVICE_STAMP,
        "file": this_file,
        "chunk_index_loaded": chunk_index is not None,
        "tag_index_loaded": tag_index is not None,
        "model_loaded": model is not None,
        "embedding_model": loaded_embedding_model_name,
        "index_version": current_index_version,
        "runtime_stamp": current_runtime_stamp,
        "tag_meta_type": tag_meta_type,
        "tag_meta_len": tag_meta_len,
        "chunk_meta_type": chunk_meta_type,
        "chunk_meta_len": chunk_meta_len,
        "tag_meta_map_len": len(tag_meta_map),
        "tags_ndjson_path": str(TAGS_NDJSON_PATH),
        "log_file": str(LOG_FILE),
    }
@app.post("/reload")
def reload():
    # Force a full reload; any failure is logged and reported as HTTP 500
    # with the exception text as detail.
    try:
        load_all()
    except Exception as e:
        logger.exception("reload failed")
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "reloaded", "stamp": SERVICE_STAMP}
@app.post("/search-chunks")
def search_chunks(req: SearchRequest):
    """Return the chunks most similar to ``req.query``.

    Each result is ``{"chunk_id", "score", "document_id"}`` plus
    ``"chunk_index"`` when a position is known for the chunk. When
    ``req.doc_ids`` is given, only chunks of those documents are returned;
    the index is over-fetched (5x the limit, between 50 and 500 hits) and
    filtered afterwards, so fewer than ``limit`` results may come back.

    Raises:
        HTTPException 503: model or chunk index not loaded.
        HTTPException 400: blank query.
        HTTPException 500: any other failure (logged with traceback).
    """
    # Snapshot the shared state once: load_all() may rebind these globals
    # (even to None) from another thread while this request is running.
    index = chunk_index
    ids = chunk_ids
    encoder = model
    doc_map = chunk_doc_map
    pos_map = chunk_pos_map
    if index is None or ids is None or encoder is None:
        raise HTTPException(status_code=503, detail="Chunk index not available")
    try:
        limit = _sanitize_limit(req.limit, default=8, max_limit=200)
        query = (req.query or "").strip()
        if not query:
            raise HTTPException(status_code=400, detail="query must not be empty")

        # NOTE(review): the "query: " prefix presumably matches how the
        # embedding model was trained - confirm against the ingester.
        query_vec = encoder.encode([f"query: {query}"], normalize_embeddings=True)
        query_vec = np.asarray(query_vec, dtype="float32")
        # Same sanity checks as /search-tags, for a readable error message.
        if query_vec.ndim != 2:
            raise RuntimeError(f"Invalid embedding shape: {query_vec.shape}")
        if query_vec.shape[1] != index.d:
            raise RuntimeError(f"Embedding dimension mismatch (vec={query_vec.shape[1]}, index={index.d})")

        effective_limit = limit
        doc_filter = None
        if req.doc_ids:
            # A set gives O(1) membership tests in the result loop.
            doc_filter = {key for key in map(_as_key, req.doc_ids) if key}
            effective_limit = min(max(limit * 5, 50), 500)

        scores, indices = index.search(query_vec, effective_limit)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            # Negative idx (-1) marks an empty slot; also guard against an
            # id list that is shorter than the index.
            if idx < 0 or idx >= len(ids):
                continue
            raw_chunk_id = ids[idx]
            chunk_id_key = _as_key(raw_chunk_id)
            if not chunk_id_key:
                continue
            doc_id = doc_map.get(chunk_id_key)
            if doc_filter is not None and (doc_id is None or doc_id not in doc_filter):
                continue
            payload = {
                "chunk_id": raw_chunk_id,
                "score": float(score),
                "document_id": doc_id,
            }
            ci = pos_map.get(chunk_id_key)
            if isinstance(ci, int):
                payload["chunk_index"] = ci
            results.append(payload)
            if len(results) >= limit:
                break
        return results
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("search-chunks failure")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/search-tags")
def search_tags(req: SearchRequest):
    """Return the tags most similar to ``req.query``.

    Each result is ``{"tag_id", "score"}`` plus ``"label"`` and
    ``"tag_type"`` when non-empty metadata is known for the tag.
    ``req.doc_ids`` is not used by this endpoint.

    Raises:
        HTTPException 503: model or tag index not loaded.
        HTTPException 400: blank query.
        HTTPException 500: any other failure (logged with traceback).
    """
    # Snapshot the shared state once: load_all() may rebind these globals
    # (even to None) from another thread while this request is running.
    index = tag_index
    ids = tag_ids
    encoder = model
    meta_map = tag_meta_map
    if index is None or ids is None or encoder is None:
        raise HTTPException(status_code=503, detail="Tag index not available")
    try:
        limit = _sanitize_limit(req.limit, default=8, max_limit=200)
        query = (req.query or "").strip()
        if not query:
            raise HTTPException(status_code=400, detail="query must not be empty")

        query_vec = encoder.encode([f"query: {query}"], normalize_embeddings=True)
        query_vec = np.asarray(query_vec, dtype="float32")
        if query_vec.ndim != 2:
            raise RuntimeError(f"Invalid embedding shape: {query_vec.shape}")
        if query_vec.shape[1] != index.d:
            raise RuntimeError(f"Embedding dimension mismatch (vec={query_vec.shape[1]}, index={index.d})")

        scores, indices = index.search(query_vec, limit)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            # Negative idx (-1) marks an empty slot; also guard against an
            # id list that is shorter than the index.
            if idx < 0 or idx >= len(ids):
                continue
            tag_id = ids[idx]
            tag_id_key = _as_key(tag_id) or ""
            payload: Dict[str, Any] = {
                "tag_id": tag_id,
                "score": float(score),
            }
            meta = meta_map.get(tag_id_key)
            if isinstance(meta, dict):
                label = meta.get("label")
                ttype = meta.get("tag_type")
                # Only non-blank metadata is added to the response.
                if isinstance(label, str) and label.strip():
                    payload["label"] = label
                if isinstance(ttype, str) and ttype.strip():
                    payload["tag_type"] = ttype
            results.append(payload)
        return results
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("search-tags failure")
        raise HTTPException(status_code=500, detail=str(e))