#!/usr/bin/env python3
"""Command-line control for the vector service.

The script can install missing Python modules into the project's ``.venv``,
start the service with uvicorn, stop it, ask it to reload, and report its
status.  Every invocation prints one JSON document describing the actions
taken and their results.
"""
import argparse
import importlib
import json
import os
import signal
import socket
import subprocess
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# ============================================================
# Paths
# ============================================================

_SCRIPT_PATH = Path(__file__).resolve()
# BASE_PATH is the directory two levels above this script's directory; it is
# used as the working directory for uvicorn and as the root for .venv and
# var/run.  The index is clamped so that a script placed unusually close to
# the filesystem root does not fail with IndexError at import time.
BASE_PATH = _SCRIPT_PATH.parents[min(2, len(_SCRIPT_PATH.parents) - 1)]
VENV_DIR = BASE_PATH / ".venv"
VENV_PY = VENV_DIR / "bin" / "python"
VENV_PIP = VENV_DIR / "bin" / "pip"
UVICORN_BIN = VENV_DIR / "bin" / "uvicorn"

PID_DIR = BASE_PATH / "var" / "run"
PID_FILE = PID_DIR / "vector_service.pid"

DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 8090
DEFAULT_HEALTH_URL = "http://127.0.0.1:{port}/health"
DEFAULT_RELOAD_URL = "http://127.0.0.1:{port}/reload"

REQUIRED_MODULES = [
    "fastapi",
    "uvicorn",
    "faiss",
    "sentence_transformers",
    "numpy",
]

# Seconds to wait after spawning uvicorn before reporting on the start.
_STARTUP_WAIT_SECONDS = 2.0
# Seconds to wait for the process to exit after a signal has been sent.
_STOP_WAIT_SECONDS = 2.0
# Interval between liveness checks while waiting for a process to exit.
_POLL_INTERVAL_SECONDS = 0.1


# ============================================================
# Utilities
# ============================================================

def _now_ms() -> int:
    """Return the current wall-clock time in milliseconds since the epoch."""
    return int(time.time() * 1000)


def _read_pid() -> Optional[int]:
    """Return the PID stored in ``PID_FILE``.

    Returns ``None`` when the file is missing, unreadable, or does not
    contain a plain run of digits.
    """
    try:
        content = PID_FILE.read_text(encoding="utf-8").strip()
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers
        # undecodable bytes (UnicodeDecodeError is a ValueError).
        return None
    if not content.isdigit():
        return None
    try:
        return int(content)
    except ValueError:
        # str.isdigit() accepts some characters (e.g. superscripts) that
        # int() rejects.
        return None


def _write_pid(pid: int) -> None:
    """Write *pid* to ``PID_FILE``, creating ``PID_DIR`` if necessary."""
    PID_DIR.mkdir(parents=True, exist_ok=True)
    PID_FILE.write_text(str(pid), encoding="utf-8")


def _remove_pid() -> None:
    """Delete ``PID_FILE`` if it exists; failures are ignored (best effort)."""
    try:
        PID_FILE.unlink()
    except OSError:
        # Includes FileNotFoundError.  Removal is advisory clean-up, so a
        # failure here must not abort the surrounding action.
        pass


def _pid_is_running(pid: int) -> bool:
    """Return ``True`` if a process with the given *pid* exists.

    Signal 0 performs the existence/permission check without delivering a
    signal.  ``PermissionError`` means the process exists but belongs to
    another user, so it counts as running.  Non-positive PIDs address process
    groups rather than a single process and are reported as not running.
    """
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    except (OSError, OverflowError):
        # OverflowError: the value does not fit the platform's pid type.
        return False
    return True


def _wait_for_exit(pid: int, timeout: float) -> bool:
    """Poll until process *pid* is gone; return ``False`` on timeout."""
    deadline = time.monotonic() + timeout
    while _pid_is_running(pid):
        if time.monotonic() >= deadline:
            return False
        time.sleep(_POLL_INTERVAL_SECONDS)
    return True


def _is_port_open(host: str, port: int, timeout: float = 0.5) -> bool:
    """Return ``True`` if a TCP connection to ``host:port`` succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, unreachable, timed out, or name resolution failure.
        return False


def _parse_curl_output(stdout: str) -> Tuple[int, str]:
    """Split curl output produced with ``-w "\\n%{http_code}"``.

    The last line is the status code and everything before it is the body.
    Returns ``(code, body)``; the code is 0 when the trailer is missing or is
    not an integer.
    """
    out = (stdout or "").rstrip("\n")
    if "\n" not in out:
        return 0, out
    body, code = out.rsplit("\n", 1)
    try:
        return int(code), body
    except ValueError:
        return 0, body


def _curl(url: str, method: str = "GET", timeout_seconds: int = 3) -> Tuple[int, str]:
    """Issue an HTTP request with the ``curl`` binary.

    Args:
        url: Target URL.
        method: HTTP method passed to ``curl -X``.
        timeout_seconds: Maximum request time passed to ``curl -m``.

    Returns:
        ``(http_code, body)``.  The code is 0 when no HTTP response was
        obtained; if curl could not be run at all, the body carries the
        error text.
    """
    cmd = [
        "curl",
        "-s",
        "-X", method,
        "-m", str(timeout_seconds),
        "-w", "\n%{http_code}",
        url,
    ]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",  # a non-UTF-8 body must not raise
            timeout=timeout_seconds + 2,  # backstop in case curl itself hangs
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        # OSError: curl is not installed or not executable.
        return 0, str(exc)
    return _parse_curl_output(proc.stdout)


# ============================================================
# Dependency Handling
# ============================================================

def check_modules() -> List[str]:
    """Return the names in ``REQUIRED_MODULES`` that fail to import.

    The check runs in the interpreter executing this script.
    """
    missing = []
    for module in REQUIRED_MODULES:
        try:
            importlib.import_module(module)
        except Exception:
            # Deliberately broad: a broken installation can fail with errors
            # other than ImportError, and any failure means "not usable".
            missing.append(module)
    return missing


def install_missing_modules(missing: List[str]) -> Dict[str, str]:
    """Install the packages providing the *missing* modules with the venv pip.

    Args:
        missing: Module names as reported by ``check_modules``.

    Returns:
        A dict with ``status`` (``"ok"`` or ``"error"``) and ``detail``.
    """
    if not missing:
        # "pip install" with no packages is an error; nothing to do instead.
        return {"status": "ok", "detail": "nothing to install"}

    # Import names that differ from their distribution names on the index.
    mod_to_pkg = {
        "fastapi": "fastapi",
        "uvicorn": "uvicorn",
        "numpy": "numpy",
        "sentence_transformers": "sentence-transformers",
        "faiss": "faiss-cpu",
    }
    pkgs = [mod_to_pkg.get(m, m) for m in missing]

    if not VENV_PIP.exists():
        return {"status": "error", "detail": "pip not found in .venv"}

    cmd = [str(VENV_PIP), "install", *pkgs]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        return {"status": "error", "detail": str(exc)}

    if proc.returncode != 0:
        return {"status": "error", "detail": (proc.stderr or proc.stdout).strip()}
    return {"status": "ok", "detail": "installed: " + " ".join(pkgs)}


# ============================================================
# Service Control
# ============================================================

def service_status(port: int) -> Dict:
    """Report the PID-file state and the result of the health endpoint.

    A PID file that refers to a process that no longer exists is removed.

    Args:
        port: Port used to build the health URL.

    Returns:
        A dict with ``pid``, ``pid_running``, ``health_code``, ``healthy``,
        ``health_body`` and ``port``.
    """
    pid = _read_pid()
    pid_running = bool(pid and _pid_is_running(pid))
    if pid and not pid_running:
        _remove_pid()
        pid = None

    code, body = _curl(DEFAULT_HEALTH_URL.format(port=port), method="GET")
    return {
        "pid": pid,
        "pid_running": pid_running,
        "health_code": code,
        "healthy": code == 200,
        "health_body": body,
        "port": port,
    }


def start_service(host: str, port: int) -> Dict:
    """Launch the service with the venv's uvicorn in a new session.

    Args:
        host: Address passed to ``uvicorn --host``.
        port: Port passed to ``uvicorn --port``.

    Returns:
        A dict with ``status`` and ``detail``; ``pid`` is included once a
        process has been spawned.  The status is ``"error"`` when uvicorn is
        missing, an instance recorded in the PID file is still alive, the
        port already accepts connections, the process cannot be spawned, or
        it exits during the startup wait.
    """
    if not UVICORN_BIN.exists():
        return {"status": "error", "detail": "uvicorn not found in .venv"}

    # Do not overwrite the PID file of an instance that is still alive (it
    # may not have bound the port yet), or it could no longer be stopped.
    existing = _read_pid()
    if existing and _pid_is_running(existing):
        return {"status": "error", "detail": f"already running (pid {existing})"}

    if _is_port_open("127.0.0.1", port):
        return {"status": "error", "detail": f"port {port} already in use"}

    cmd = [
        str(UVICORN_BIN),
        "python.vector.vector_service:app",
        "--host", host,
        "--port", str(port),
    ]
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            cwd=str(BASE_PATH),
            start_new_session=True,  # detach from this script's session
        )
    except OSError as exc:
        return {"status": "error", "detail": str(exc)}

    _write_pid(proc.pid)
    time.sleep(_STARTUP_WAIT_SECONDS)

    exit_code = proc.poll()
    if exit_code is not None:
        # The process died during startup (bad import, bind failure, ...).
        _remove_pid()
        return {
            "status": "error",
            "detail": f"service exited during startup (exit code {exit_code})",
            "pid": proc.pid,
        }
    return {"status": "ok", "detail": "service started", "pid": proc.pid}


def stop_service(port: int, force: bool = False) -> Dict:
    """Stop the process recorded in the PID file.

    SIGTERM is sent first.  If the process is still alive after the wait and
    *force* is true, SIGKILL is sent.

    Args:
        port: Accepted for symmetry with the other actions; not used.
        force: Escalate to SIGKILL when SIGTERM does not stop the process.

    Returns:
        A dict with ``status`` and ``detail``.
    """
    pid = _read_pid()
    if not pid:
        return {"status": "ok", "detail": "not running"}

    if not _pid_is_running(pid):
        _remove_pid()
        return {"status": "ok", "detail": "stale pid removed"}

    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process exited between the liveness check and the signal.
        _remove_pid()
        return {"status": "ok", "detail": "stopped"}
    except OSError as exc:
        return {"status": "error", "detail": str(exc)}

    if _wait_for_exit(pid, _STOP_WAIT_SECONDS):
        _remove_pid()
        return {"status": "ok", "detail": "stopped"}

    if not force:
        return {"status": "error", "detail": "stop timeout (use --force)"}

    try:
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        pass  # exited on its own just before the kill
    except OSError as exc:
        return {"status": "error", "detail": str(exc)}

    # Give the kernel a moment to tear the process down so that a following
    # start does not find the port still bound.
    _wait_for_exit(pid, _STOP_WAIT_SECONDS)
    _remove_pid()
    return {"status": "ok", "detail": "force stopped"}


def reload_service(port: int) -> Dict:
    """POST to the service's reload endpoint and report the outcome.

    Args:
        port: Port used to build the reload URL.

    Returns:
        A dict with ``status`` and ``detail``; only HTTP 200 counts as ok.
    """
    code, body = _curl(DEFAULT_RELOAD_URL.format(port=port), method="POST")
    if code == 200:
        return {"status": "ok", "detail": body}
    if code == 404:
        return {"status": "error", "detail": "reload endpoint not found"}
    return {"status": "error", "detail": f"reload failed (http {code}): {body}"}


# ============================================================
# Main
# ============================================================

def main() -> int:
    """Parse the command line, run the requested actions, print a JSON report.

    Actions run in a fixed order: install, stop, start, reload, status.
    Status also runs when none of install/start/stop/reload was requested.

    Returns:
        Always 0; failures of individual actions are reported in the JSON
        output rather than through the exit code.
    """
    parser = argparse.ArgumentParser(description="Vector service control")
    parser.add_argument("--install", action="store_true")
    parser.add_argument("--start", action="store_true")
    parser.add_argument("--stop", action="store_true")
    parser.add_argument("--force", action="store_true")
    parser.add_argument("--reload", action="store_true")
    parser.add_argument("--status", action="store_true")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT)
    parser.add_argument("--host", type=str, default=DEFAULT_HOST)
    args = parser.parse_args()

    result = {
        "ts_ms": _now_ms(),
        "actions": [],
        "results": {},
    }

    missing = check_modules()
    result["results"]["modules_missing"] = missing

    if missing and args.install:
        result["actions"].append("install")
        result["results"]["install"] = install_missing_modules(missing)

    if args.stop:
        result["actions"].append("stop")
        result["results"]["stop"] = stop_service(args.port, args.force)

    if args.start:
        result["actions"].append("start")
        result["results"]["start"] = start_service(args.host, args.port)

    if args.reload:
        result["actions"].append("reload")
        result["results"]["reload"] = reload_service(args.port)

    explicit_action = any((args.install, args.start, args.stop, args.reload))
    if args.status or not explicit_action:
        result["actions"].append("status")
        result["results"]["status"] = service_status(args.port)

    result["duration_ms"] = _now_ms() - result["ts_ms"]
    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())