#!/usr/bin/env python3 import argparse import importlib import json import os import subprocess import sys import time from pathlib import Path BASE_PATH = Path(__file__).resolve().parents[2] KNOWLEDGE_DIR = BASE_PATH / "var" / "knowledge" REQUIRED_MODULES = [ "fastapi", "uvicorn", "faiss", "sentence_transformers", "numpy", ] VENV_PIP = BASE_PATH / ".venv" / "bin" / "pip" UVICORN_BIN = BASE_PATH / ".venv" / "bin" / "uvicorn" def check_modules(): missing = [] for module in REQUIRED_MODULES: try: importlib.import_module(module) except Exception: missing.append(module) return missing def install_modules(modules): if not modules: return subprocess.call([str(VENV_PIP), "install", *modules]) def service_running(): result = subprocess.run( ["ps", "aux"], capture_output=True, text=True ) return "uvicorn src.Vector.vector_service:app" in result.stdout def start_service(): subprocess.Popen([ str(UVICORN_BIN), "src.Vector.vector_service:app", "--host", "0.0.0.0", "--port", "8090" ]) time.sleep(2) def reload_service(): subprocess.call([ "curl", "-s", "-X", "POST", "http://127.0.0.1:8090/reload" ]) def health_check(): try: result = subprocess.run( ["curl", "-s", "http://127.0.0.1:8090/health"], capture_output=True, text=True ) return result.stdout.strip() except Exception: return None def main(): parser = argparse.ArgumentParser() parser.add_argument("--install", action="store_true") parser.add_argument("--start", action="store_true") parser.add_argument("--reload", action="store_true") args = parser.parse_args() result = { "modules_missing": [], "service_running": False, "health": None, "actions": [] } # 1️⃣ Check modules missing = check_modules() result["modules_missing"] = missing if missing and args.install: install_modules(missing) result["actions"].append("modules_installed") # 2️⃣ Service check running = service_running() result["service_running"] = running if not running and args.start: start_service() result["actions"].append("service_started") running = True # 3️⃣ Reload if args.reload: reload_service() result["actions"].append("service_reloaded") # 4️⃣ Health if running: result["health"] = health_check() print(json.dumps(result, indent=2)) if __name__ == "__main__": main()