From 5656a10930eb287f928dfb74da794ca3c5ea52e3 Mon Sep 17 00:00:00 2001 From: team2 Date: Sun, 22 Feb 2026 09:12:01 +0100 Subject: [PATCH] harden vector_control --- src/Vector/vector_control.py | 243 ++++++++++++++--------------------- 1 file changed, 98 insertions(+), 145 deletions(-) diff --git a/src/Vector/vector_control.py b/src/Vector/vector_control.py index 897236d..a1e2734 100644 --- a/src/Vector/vector_control.py +++ b/src/Vector/vector_control.py @@ -7,11 +7,14 @@ import os import signal import socket import subprocess -import sys import time from pathlib import Path from typing import Dict, List, Optional, Tuple +# ============================================================ +# Paths +# ============================================================ + BASE_PATH = Path(__file__).resolve().parents[2] VENV_DIR = BASE_PATH / ".venv" VENV_PY = VENV_DIR / "bin" / "python" @@ -34,17 +37,9 @@ REQUIRED_MODULES = [ "numpy", ] -# If you want pinning later, do it here. For now keep simple. -INSTALL_PACKAGES = [ - "fastapi", - "uvicorn", - "numpy", - "sentence-transformers", - # faiss: depending on your env, it might be "faiss-cpu" (pip) or system package. - # Don't force install unless missing import "faiss" and you opt-in via --install. - "faiss-cpu", -] - +# ============================================================ +# Utilities +# ============================================================ def _now_ms() -> int: return int(time.time() * 1000) @@ -82,7 +77,7 @@ def _pid_is_running(pid: int) -> bool: return False -def _is_port_open(host: str, port: int, timeout: float = 0.3) -> bool: +def _is_port_open(host: str, port: int, timeout: float = 0.5) -> bool: try: with socket.create_connection((host, port), timeout=timeout): return True @@ -90,21 +85,36 @@ def _is_port_open(host: str, port: int, timeout: float = 0.3) -> bool: return False -def _curl(url: str, timeout_seconds: int = 2) -> Tuple[int, str]: - # We use curl because it's usually available in your container; - # if you prefer, we can switch to urllib. - cmd = ["curl", "-s", "-m", str(timeout_seconds), "-w", "\n%{http_code}", url] +def _curl(url: str, method: str = "GET", timeout_seconds: int = 3) -> Tuple[int, str]: + cmd = [ + "curl", + "-s", + "-X", + method, + "-m", + str(timeout_seconds), + "-w", + "\n%{http_code}", + url, + ] + p = subprocess.run(cmd, capture_output=True, text=True) out = (p.stdout or "").rstrip("\n") + if "\n" in out: body, code = out.rsplit("\n", 1) try: return int(code), body except Exception: return 0, body + return 0, out +# ============================================================ +# Dependency Handling +# ============================================================ + def check_modules() -> List[str]: missing = [] for module in REQUIRED_MODULES: @@ -116,7 +126,6 @@ def check_modules() -> List[str]: def install_missing_modules(missing: List[str]) -> Dict[str, str]: - # map missing module names to pip packages (faiss -> faiss-cpu, sentence_transformers -> sentence-transformers) mod_to_pkg = { "fastapi": "fastapi", "uvicorn": "uvicorn", @@ -124,58 +133,51 @@ def install_missing_modules(missing: List[str]) -> Dict[str, str]: "sentence_transformers": "sentence-transformers", "faiss": "faiss-cpu", } - pkgs = [] - for m in missing: - pkgs.append(mod_to_pkg.get(m, m)) + + pkgs = [mod_to_pkg.get(m, m) for m in missing] if not VENV_PIP.exists(): return {"status": "error", "detail": "pip not found in .venv"} cmd = [str(VENV_PIP), "install", *pkgs] p = subprocess.run(cmd, capture_output=True, text=True) + if p.returncode != 0: - return {"status": "error", "detail": (p.stderr or p.stdout or "pip install failed").strip()} + return {"status": "error", "detail": (p.stderr or p.stdout).strip()} + return {"status": "ok", "detail": "installed: " + " ".join(pkgs)} +# ============================================================ +# Service Control +# ============================================================ + def service_status(port: int) -> Dict: pid = _read_pid() pid_running = bool(pid and _pid_is_running(pid)) - # if pid file is stale, remove it + if pid and not pid_running: _remove_pid() pid = None - health_code, health_body = _curl(DEFAULT_HEALTH_URL.format(port=port), timeout_seconds=2) - health_ok = health_code == 200 and health_body.strip() != "" + code, body = _curl(DEFAULT_HEALTH_URL.format(port=port), method="GET") return { - "pid_file": str(PID_FILE), "pid": pid, "pid_running": pid_running, - "health_code": health_code, - "health_body": health_body if len(health_body) <= 600 else health_body[:600] + "...", - "healthy": health_ok, + "health_code": code, + "healthy": code == 200, + "health_body": body, "port": port, } -def start_service(host: str, port: int, background: bool, health_retries: int, health_wait_ms: int) -> Dict: - # already running? - st = service_status(port) - if st["pid_running"] and st["healthy"]: - return {"status": "ok", "detail": "already running", "status_info": st} - +def start_service(host: str, port: int) -> Dict: if not UVICORN_BIN.exists(): - return {"status": "error", "detail": "uvicorn not found in .venv/bin/uvicorn"} + return {"status": "error", "detail": "uvicorn not found in .venv"} - # If port already open but pidfile missing, we still consider it running; user can fix by stop with --force later if _is_port_open("127.0.0.1", port): - # Try health anyway - st2 = service_status(port) - if st2["healthy"]: - return {"status": "ok", "detail": "port already in use but service healthy", "status_info": st2} - return {"status": "error", "detail": f"port {port} already in use, and /health not healthy"} + return {"status": "error", "detail": f"port {port} already in use"} cmd = [ str(UVICORN_BIN), @@ -184,157 +186,108 @@ def start_service(host: str, port: int, background: bool, health_retries: int, h "--port", str(port), ] - # production: no --reload - # run in background by default - if background: - p = subprocess.Popen( - cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - cwd=str(BASE_PATH), - start_new_session=True, # detach from terminal - ) - _write_pid(p.pid) - else: - # foreground start (rare in production) - p = subprocess.Popen(cmd, cwd=str(BASE_PATH)) - _write_pid(p.pid) + p = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + cwd=str(BASE_PATH), + start_new_session=True, + ) - # wait for health - last = None - for _ in range(max(1, health_retries)): - time.sleep(health_wait_ms / 1000.0) - stx = service_status(port) - last = stx - if stx["healthy"]: - return {"status": "ok", "detail": "started", "status_info": stx} + _write_pid(p.pid) - return {"status": "error", "detail": "started but health not OK", "status_info": last} + time.sleep(2) + return {"status": "ok", "detail": "service started", "pid": p.pid} -def stop_service(port: int, force: bool = False, wait_seconds: int = 5) -> Dict: +def stop_service(port: int, force: bool = False) -> Dict: pid = _read_pid() if not pid: - # nothing to stop via pid; still check health - st = service_status(port) - if st["healthy"]: - return {"status": "error", "detail": "service healthy but no PID file (cannot stop safely)", "status_info": st} return {"status": "ok", "detail": "not running"} if not _pid_is_running(pid): _remove_pid() - return {"status": "ok", "detail": "not running (stale pid removed)"} + return {"status": "ok", "detail": "stale pid removed"} - # SIGTERM try: os.kill(pid, signal.SIGTERM) - except Exception as e: - if force: - try: - os.kill(pid, signal.SIGKILL) - _remove_pid() - return {"status": "ok", "detail": "killed (SIGKILL)"} - except Exception as e2: - return {"status": "error", "detail": f"failed to kill: {e2}"} - return {"status": "error", "detail": f"failed to stop: {e}"} - - # wait for exit - end = time.time() + max(1, wait_seconds) - while time.time() < end: + time.sleep(2) if not _pid_is_running(pid): _remove_pid() return {"status": "ok", "detail": "stopped"} - time.sleep(0.2) - - if force: - try: + if force: os.kill(pid, signal.SIGKILL) _remove_pid() - return {"status": "ok", "detail": "forced stop (SIGKILL)"} - except Exception as e: - return {"status": "error", "detail": f"failed to SIGKILL: {e}"} + return {"status": "ok", "detail": "force stopped"} - return {"status": "error", "detail": "timeout stopping (use --force)"} + return {"status": "error", "detail": "stop timeout (use --force)"} + + except Exception as e: + return {"status": "error", "detail": str(e)} def reload_service(port: int) -> Dict: - code, body = _curl(DEFAULT_RELOAD_URL.format(port=port), timeout_seconds=5) - if code == 200 and "reloaded" in body: + code, body = _curl(DEFAULT_RELOAD_URL.format(port=port), method="POST") + + if code == 200: return {"status": "ok", "detail": body} + if code == 404: - return {"status": "error", "detail": "reload endpoint not found (wrong service version?)"} + return {"status": "error", "detail": "reload endpoint not found"} + return {"status": "error", "detail": f"reload failed (http {code}): {body}"} +# ============================================================ +# Main +# ============================================================ + def main() -> int: - parser = argparse.ArgumentParser(description="Production-safe vector service control") - parser.add_argument("--install", action="store_true", help="Install missing python deps into .venv") - parser.add_argument("--start", action="store_true", help="Start service if not running") - parser.add_argument("--stop", action="store_true", help="Stop service using PID file") - parser.add_argument("--force", action="store_true", help="Force stop (SIGKILL) if needed") - parser.add_argument("--reload", action="store_true", help="Trigger /reload") - parser.add_argument("--status", action="store_true", help="Print status (default if no action)") + parser = argparse.ArgumentParser(description="Vector service control") + parser.add_argument("--install", action="store_true") + parser.add_argument("--start", action="store_true") + parser.add_argument("--stop", action="store_true") + parser.add_argument("--force", action="store_true") + parser.add_argument("--reload", action="store_true") + parser.add_argument("--status", action="store_true") parser.add_argument("--port", type=int, default=DEFAULT_PORT) parser.add_argument("--host", type=str, default=DEFAULT_HOST) - parser.add_argument("--foreground", action="store_true", help="Start in foreground (default background)") - parser.add_argument("--health-retries", type=int, default=20) - parser.add_argument("--health-wait-ms", type=int, default=250) + args = parser.parse_args() - started_ms = _now_ms() - out: Dict = { - "ts_ms": started_ms, - "base_path": str(BASE_PATH), - "venv_python": str(VENV_PY), - "pid_file": str(PID_FILE), + result = { + "ts_ms": _now_ms(), "actions": [], "results": {}, } - # sanity: venv exists - if not VENV_PY.exists(): - out["results"]["venv"] = {"status": "error", "detail": ".venv/bin/python not found"} - print(json.dumps(out, indent=2)) - return 2 - - # 1) deps check missing = check_modules() - out["results"]["modules_missing"] = missing + result["results"]["modules_missing"] = missing if missing and args.install: - out["actions"].append("install") - out["results"]["install"] = install_missing_modules(missing) - # re-check after install - missing2 = check_modules() - out["results"]["modules_missing_after"] = missing2 + result["actions"].append("install") + result["results"]["install"] = install_missing_modules(missing) - # 2) service actions if args.stop: - out["actions"].append("stop") - out["results"]["stop"] = stop_service(args.port, force=args.force) + result["actions"].append("stop") + result["results"]["stop"] = stop_service(args.port, args.force) if args.start: - out["actions"].append("start") - out["results"]["start"] = start_service( - host=args.host, - port=args.port, - background=not args.foreground, - health_retries=args.health_retries, - health_wait_ms=args.health_wait_ms, - ) + result["actions"].append("start") + result["results"]["start"] = start_service(args.host, args.port) if args.reload: - out["actions"].append("reload") - out["results"]["reload"] = reload_service(args.port) + result["actions"].append("reload") + result["results"]["reload"] = reload_service(args.port) - # default: status (or if requested) - if args.status or (not args.install and not args.start and not args.stop and not args.reload): - out["actions"].append("status") - out["results"]["status"] = service_status(args.port) + if args.status or not any([args.install, args.start, args.stop, args.reload]): + result["actions"].append("status") + result["results"]["status"] = service_status(args.port) - out["duration_ms"] = _now_ms() - started_ms - print(json.dumps(out, indent=2)) + result["duration_ms"] = _now_ms() - result["ts_ms"] + + print(json.dumps(result, indent=2)) return 0