harden vector_control
This commit is contained in:
@@ -7,11 +7,14 @@ import os
|
|||||||
import signal
|
import signal
|
||||||
import socket
|
import socket
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, List, Optional, Tuple
|
from typing import Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
# ============================================================
|
||||||
|
# Paths
|
||||||
|
# ============================================================
|
||||||
|
|
||||||
BASE_PATH = Path(__file__).resolve().parents[2]
|
BASE_PATH = Path(__file__).resolve().parents[2]
|
||||||
VENV_DIR = BASE_PATH / ".venv"
|
VENV_DIR = BASE_PATH / ".venv"
|
||||||
VENV_PY = VENV_DIR / "bin" / "python"
|
VENV_PY = VENV_DIR / "bin" / "python"
|
||||||
@@ -34,17 +37,9 @@ REQUIRED_MODULES = [
|
|||||||
"numpy",
|
"numpy",
|
||||||
]
|
]
|
||||||
|
|
||||||
# If you want pinning later, do it here. For now keep simple.
|
# ============================================================
|
||||||
INSTALL_PACKAGES = [
|
# Utilities
|
||||||
"fastapi",
|
# ============================================================
|
||||||
"uvicorn",
|
|
||||||
"numpy",
|
|
||||||
"sentence-transformers",
|
|
||||||
# faiss: depending on your env, it might be "faiss-cpu" (pip) or system package.
|
|
||||||
# Don't force install unless missing import "faiss" and you opt-in via --install.
|
|
||||||
"faiss-cpu",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def _now_ms() -> int:
|
def _now_ms() -> int:
|
||||||
return int(time.time() * 1000)
|
return int(time.time() * 1000)
|
||||||
@@ -82,7 +77,7 @@ def _pid_is_running(pid: int) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _is_port_open(host: str, port: int, timeout: float = 0.3) -> bool:
|
def _is_port_open(host: str, port: int, timeout: float = 0.5) -> bool:
|
||||||
try:
|
try:
|
||||||
with socket.create_connection((host, port), timeout=timeout):
|
with socket.create_connection((host, port), timeout=timeout):
|
||||||
return True
|
return True
|
||||||
@@ -90,21 +85,36 @@ def _is_port_open(host: str, port: int, timeout: float = 0.3) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _curl(url: str, timeout_seconds: int = 2) -> Tuple[int, str]:
|
def _curl(url: str, method: str = "GET", timeout_seconds: int = 3) -> Tuple[int, str]:
|
||||||
# We use curl because it's usually available in your container;
|
cmd = [
|
||||||
# if you prefer, we can switch to urllib.
|
"curl",
|
||||||
cmd = ["curl", "-s", "-m", str(timeout_seconds), "-w", "\n%{http_code}", url]
|
"-s",
|
||||||
|
"-X",
|
||||||
|
method,
|
||||||
|
"-m",
|
||||||
|
str(timeout_seconds),
|
||||||
|
"-w",
|
||||||
|
"\n%{http_code}",
|
||||||
|
url,
|
||||||
|
]
|
||||||
|
|
||||||
p = subprocess.run(cmd, capture_output=True, text=True)
|
p = subprocess.run(cmd, capture_output=True, text=True)
|
||||||
out = (p.stdout or "").rstrip("\n")
|
out = (p.stdout or "").rstrip("\n")
|
||||||
|
|
||||||
if "\n" in out:
|
if "\n" in out:
|
||||||
body, code = out.rsplit("\n", 1)
|
body, code = out.rsplit("\n", 1)
|
||||||
try:
|
try:
|
||||||
return int(code), body
|
return int(code), body
|
||||||
except Exception:
|
except Exception:
|
||||||
return 0, body
|
return 0, body
|
||||||
|
|
||||||
return 0, out
|
return 0, out
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================
|
||||||
|
# Dependency Handling
|
||||||
|
# ============================================================
|
||||||
|
|
||||||
def check_modules() -> List[str]:
|
def check_modules() -> List[str]:
|
||||||
missing = []
|
missing = []
|
||||||
for module in REQUIRED_MODULES:
|
for module in REQUIRED_MODULES:
|
||||||
@@ -116,7 +126,6 @@ def check_modules() -> List[str]:
|
|||||||
|
|
||||||
|
|
||||||
def install_missing_modules(missing: List[str]) -> Dict[str, str]:
|
def install_missing_modules(missing: List[str]) -> Dict[str, str]:
|
||||||
# map missing module names to pip packages (faiss -> faiss-cpu, sentence_transformers -> sentence-transformers)
|
|
||||||
mod_to_pkg = {
|
mod_to_pkg = {
|
||||||
"fastapi": "fastapi",
|
"fastapi": "fastapi",
|
||||||
"uvicorn": "uvicorn",
|
"uvicorn": "uvicorn",
|
||||||
@@ -124,58 +133,51 @@ def install_missing_modules(missing: List[str]) -> Dict[str, str]:
|
|||||||
"sentence_transformers": "sentence-transformers",
|
"sentence_transformers": "sentence-transformers",
|
||||||
"faiss": "faiss-cpu",
|
"faiss": "faiss-cpu",
|
||||||
}
|
}
|
||||||
pkgs = []
|
|
||||||
for m in missing:
|
pkgs = [mod_to_pkg.get(m, m) for m in missing]
|
||||||
pkgs.append(mod_to_pkg.get(m, m))
|
|
||||||
|
|
||||||
if not VENV_PIP.exists():
|
if not VENV_PIP.exists():
|
||||||
return {"status": "error", "detail": "pip not found in .venv"}
|
return {"status": "error", "detail": "pip not found in .venv"}
|
||||||
|
|
||||||
cmd = [str(VENV_PIP), "install", *pkgs]
|
cmd = [str(VENV_PIP), "install", *pkgs]
|
||||||
p = subprocess.run(cmd, capture_output=True, text=True)
|
p = subprocess.run(cmd, capture_output=True, text=True)
|
||||||
|
|
||||||
if p.returncode != 0:
|
if p.returncode != 0:
|
||||||
return {"status": "error", "detail": (p.stderr or p.stdout or "pip install failed").strip()}
|
return {"status": "error", "detail": (p.stderr or p.stdout).strip()}
|
||||||
|
|
||||||
return {"status": "ok", "detail": "installed: " + " ".join(pkgs)}
|
return {"status": "ok", "detail": "installed: " + " ".join(pkgs)}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================
|
||||||
|
# Service Control
|
||||||
|
# ============================================================
|
||||||
|
|
||||||
def service_status(port: int) -> Dict:
|
def service_status(port: int) -> Dict:
|
||||||
pid = _read_pid()
|
pid = _read_pid()
|
||||||
pid_running = bool(pid and _pid_is_running(pid))
|
pid_running = bool(pid and _pid_is_running(pid))
|
||||||
# if pid file is stale, remove it
|
|
||||||
if pid and not pid_running:
|
if pid and not pid_running:
|
||||||
_remove_pid()
|
_remove_pid()
|
||||||
pid = None
|
pid = None
|
||||||
|
|
||||||
health_code, health_body = _curl(DEFAULT_HEALTH_URL.format(port=port), timeout_seconds=2)
|
code, body = _curl(DEFAULT_HEALTH_URL.format(port=port), method="GET")
|
||||||
health_ok = health_code == 200 and health_body.strip() != ""
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"pid_file": str(PID_FILE),
|
|
||||||
"pid": pid,
|
"pid": pid,
|
||||||
"pid_running": pid_running,
|
"pid_running": pid_running,
|
||||||
"health_code": health_code,
|
"health_code": code,
|
||||||
"health_body": health_body if len(health_body) <= 600 else health_body[:600] + "...",
|
"healthy": code == 200,
|
||||||
"healthy": health_ok,
|
"health_body": body,
|
||||||
"port": port,
|
"port": port,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def start_service(host: str, port: int, background: bool, health_retries: int, health_wait_ms: int) -> Dict:
|
def start_service(host: str, port: int) -> Dict:
|
||||||
# already running?
|
|
||||||
st = service_status(port)
|
|
||||||
if st["pid_running"] and st["healthy"]:
|
|
||||||
return {"status": "ok", "detail": "already running", "status_info": st}
|
|
||||||
|
|
||||||
if not UVICORN_BIN.exists():
|
if not UVICORN_BIN.exists():
|
||||||
return {"status": "error", "detail": "uvicorn not found in .venv/bin/uvicorn"}
|
return {"status": "error", "detail": "uvicorn not found in .venv"}
|
||||||
|
|
||||||
# If port already open but pidfile missing, we still consider it running; user can fix by stop with --force later
|
|
||||||
if _is_port_open("127.0.0.1", port):
|
if _is_port_open("127.0.0.1", port):
|
||||||
# Try health anyway
|
return {"status": "error", "detail": f"port {port} already in use"}
|
||||||
st2 = service_status(port)
|
|
||||||
if st2["healthy"]:
|
|
||||||
return {"status": "ok", "detail": "port already in use but service healthy", "status_info": st2}
|
|
||||||
return {"status": "error", "detail": f"port {port} already in use, and /health not healthy"}
|
|
||||||
|
|
||||||
cmd = [
|
cmd = [
|
||||||
str(UVICORN_BIN),
|
str(UVICORN_BIN),
|
||||||
@@ -184,157 +186,108 @@ def start_service(host: str, port: int, background: bool, health_retries: int, h
|
|||||||
"--port", str(port),
|
"--port", str(port),
|
||||||
]
|
]
|
||||||
|
|
||||||
# production: no --reload
|
|
||||||
# run in background by default
|
|
||||||
if background:
|
|
||||||
p = subprocess.Popen(
|
p = subprocess.Popen(
|
||||||
cmd,
|
cmd,
|
||||||
stdout=subprocess.DEVNULL,
|
stdout=subprocess.DEVNULL,
|
||||||
stderr=subprocess.DEVNULL,
|
stderr=subprocess.DEVNULL,
|
||||||
cwd=str(BASE_PATH),
|
cwd=str(BASE_PATH),
|
||||||
start_new_session=True, # detach from terminal
|
start_new_session=True,
|
||||||
)
|
)
|
||||||
_write_pid(p.pid)
|
|
||||||
else:
|
|
||||||
# foreground start (rare in production)
|
|
||||||
p = subprocess.Popen(cmd, cwd=str(BASE_PATH))
|
|
||||||
_write_pid(p.pid)
|
_write_pid(p.pid)
|
||||||
|
|
||||||
# wait for health
|
time.sleep(2)
|
||||||
last = None
|
return {"status": "ok", "detail": "service started", "pid": p.pid}
|
||||||
for _ in range(max(1, health_retries)):
|
|
||||||
time.sleep(health_wait_ms / 1000.0)
|
|
||||||
stx = service_status(port)
|
|
||||||
last = stx
|
|
||||||
if stx["healthy"]:
|
|
||||||
return {"status": "ok", "detail": "started", "status_info": stx}
|
|
||||||
|
|
||||||
return {"status": "error", "detail": "started but health not OK", "status_info": last}
|
|
||||||
|
|
||||||
|
|
||||||
def stop_service(port: int, force: bool = False, wait_seconds: int = 5) -> Dict:
|
def stop_service(port: int, force: bool = False) -> Dict:
|
||||||
pid = _read_pid()
|
pid = _read_pid()
|
||||||
if not pid:
|
if not pid:
|
||||||
# nothing to stop via pid; still check health
|
|
||||||
st = service_status(port)
|
|
||||||
if st["healthy"]:
|
|
||||||
return {"status": "error", "detail": "service healthy but no PID file (cannot stop safely)", "status_info": st}
|
|
||||||
return {"status": "ok", "detail": "not running"}
|
return {"status": "ok", "detail": "not running"}
|
||||||
|
|
||||||
if not _pid_is_running(pid):
|
if not _pid_is_running(pid):
|
||||||
_remove_pid()
|
_remove_pid()
|
||||||
return {"status": "ok", "detail": "not running (stale pid removed)"}
|
return {"status": "ok", "detail": "stale pid removed"}
|
||||||
|
|
||||||
# SIGTERM
|
|
||||||
try:
|
try:
|
||||||
os.kill(pid, signal.SIGTERM)
|
os.kill(pid, signal.SIGTERM)
|
||||||
except Exception as e:
|
time.sleep(2)
|
||||||
if force:
|
|
||||||
try:
|
|
||||||
os.kill(pid, signal.SIGKILL)
|
|
||||||
_remove_pid()
|
|
||||||
return {"status": "ok", "detail": "killed (SIGKILL)"}
|
|
||||||
except Exception as e2:
|
|
||||||
return {"status": "error", "detail": f"failed to kill: {e2}"}
|
|
||||||
return {"status": "error", "detail": f"failed to stop: {e}"}
|
|
||||||
|
|
||||||
# wait for exit
|
|
||||||
end = time.time() + max(1, wait_seconds)
|
|
||||||
while time.time() < end:
|
|
||||||
if not _pid_is_running(pid):
|
if not _pid_is_running(pid):
|
||||||
_remove_pid()
|
_remove_pid()
|
||||||
return {"status": "ok", "detail": "stopped"}
|
return {"status": "ok", "detail": "stopped"}
|
||||||
|
|
||||||
time.sleep(0.2)
|
|
||||||
|
|
||||||
if force:
|
if force:
|
||||||
try:
|
|
||||||
os.kill(pid, signal.SIGKILL)
|
os.kill(pid, signal.SIGKILL)
|
||||||
_remove_pid()
|
_remove_pid()
|
||||||
return {"status": "ok", "detail": "forced stop (SIGKILL)"}
|
return {"status": "ok", "detail": "force stopped"}
|
||||||
except Exception as e:
|
|
||||||
return {"status": "error", "detail": f"failed to SIGKILL: {e}"}
|
|
||||||
|
|
||||||
return {"status": "error", "detail": "timeout stopping (use --force)"}
|
return {"status": "error", "detail": "stop timeout (use --force)"}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "detail": str(e)}
|
||||||
|
|
||||||
|
|
||||||
def reload_service(port: int) -> Dict:
|
def reload_service(port: int) -> Dict:
|
||||||
code, body = _curl(DEFAULT_RELOAD_URL.format(port=port), timeout_seconds=5)
|
code, body = _curl(DEFAULT_RELOAD_URL.format(port=port), method="POST")
|
||||||
if code == 200 and "reloaded" in body:
|
|
||||||
|
if code == 200:
|
||||||
return {"status": "ok", "detail": body}
|
return {"status": "ok", "detail": body}
|
||||||
|
|
||||||
if code == 404:
|
if code == 404:
|
||||||
return {"status": "error", "detail": "reload endpoint not found (wrong service version?)"}
|
return {"status": "error", "detail": "reload endpoint not found"}
|
||||||
|
|
||||||
return {"status": "error", "detail": f"reload failed (http {code}): {body}"}
|
return {"status": "error", "detail": f"reload failed (http {code}): {body}"}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================
|
||||||
|
# Main
|
||||||
|
# ============================================================
|
||||||
|
|
||||||
def main() -> int:
|
def main() -> int:
|
||||||
parser = argparse.ArgumentParser(description="Production-safe vector service control")
|
parser = argparse.ArgumentParser(description="Vector service control")
|
||||||
parser.add_argument("--install", action="store_true", help="Install missing python deps into .venv")
|
parser.add_argument("--install", action="store_true")
|
||||||
parser.add_argument("--start", action="store_true", help="Start service if not running")
|
parser.add_argument("--start", action="store_true")
|
||||||
parser.add_argument("--stop", action="store_true", help="Stop service using PID file")
|
parser.add_argument("--stop", action="store_true")
|
||||||
parser.add_argument("--force", action="store_true", help="Force stop (SIGKILL) if needed")
|
parser.add_argument("--force", action="store_true")
|
||||||
parser.add_argument("--reload", action="store_true", help="Trigger /reload")
|
parser.add_argument("--reload", action="store_true")
|
||||||
parser.add_argument("--status", action="store_true", help="Print status (default if no action)")
|
parser.add_argument("--status", action="store_true")
|
||||||
parser.add_argument("--port", type=int, default=DEFAULT_PORT)
|
parser.add_argument("--port", type=int, default=DEFAULT_PORT)
|
||||||
parser.add_argument("--host", type=str, default=DEFAULT_HOST)
|
parser.add_argument("--host", type=str, default=DEFAULT_HOST)
|
||||||
parser.add_argument("--foreground", action="store_true", help="Start in foreground (default background)")
|
|
||||||
parser.add_argument("--health-retries", type=int, default=20)
|
|
||||||
parser.add_argument("--health-wait-ms", type=int, default=250)
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
started_ms = _now_ms()
|
result = {
|
||||||
out: Dict = {
|
"ts_ms": _now_ms(),
|
||||||
"ts_ms": started_ms,
|
|
||||||
"base_path": str(BASE_PATH),
|
|
||||||
"venv_python": str(VENV_PY),
|
|
||||||
"pid_file": str(PID_FILE),
|
|
||||||
"actions": [],
|
"actions": [],
|
||||||
"results": {},
|
"results": {},
|
||||||
}
|
}
|
||||||
|
|
||||||
# sanity: venv exists
|
|
||||||
if not VENV_PY.exists():
|
|
||||||
out["results"]["venv"] = {"status": "error", "detail": ".venv/bin/python not found"}
|
|
||||||
print(json.dumps(out, indent=2))
|
|
||||||
return 2
|
|
||||||
|
|
||||||
# 1) deps check
|
|
||||||
missing = check_modules()
|
missing = check_modules()
|
||||||
out["results"]["modules_missing"] = missing
|
result["results"]["modules_missing"] = missing
|
||||||
|
|
||||||
if missing and args.install:
|
if missing and args.install:
|
||||||
out["actions"].append("install")
|
result["actions"].append("install")
|
||||||
out["results"]["install"] = install_missing_modules(missing)
|
result["results"]["install"] = install_missing_modules(missing)
|
||||||
# re-check after install
|
|
||||||
missing2 = check_modules()
|
|
||||||
out["results"]["modules_missing_after"] = missing2
|
|
||||||
|
|
||||||
# 2) service actions
|
|
||||||
if args.stop:
|
if args.stop:
|
||||||
out["actions"].append("stop")
|
result["actions"].append("stop")
|
||||||
out["results"]["stop"] = stop_service(args.port, force=args.force)
|
result["results"]["stop"] = stop_service(args.port, args.force)
|
||||||
|
|
||||||
if args.start:
|
if args.start:
|
||||||
out["actions"].append("start")
|
result["actions"].append("start")
|
||||||
out["results"]["start"] = start_service(
|
result["results"]["start"] = start_service(args.host, args.port)
|
||||||
host=args.host,
|
|
||||||
port=args.port,
|
|
||||||
background=not args.foreground,
|
|
||||||
health_retries=args.health_retries,
|
|
||||||
health_wait_ms=args.health_wait_ms,
|
|
||||||
)
|
|
||||||
|
|
||||||
if args.reload:
|
if args.reload:
|
||||||
out["actions"].append("reload")
|
result["actions"].append("reload")
|
||||||
out["results"]["reload"] = reload_service(args.port)
|
result["results"]["reload"] = reload_service(args.port)
|
||||||
|
|
||||||
# default: status (or if requested)
|
if args.status or not any([args.install, args.start, args.stop, args.reload]):
|
||||||
if args.status or (not args.install and not args.start and not args.stop and not args.reload):
|
result["actions"].append("status")
|
||||||
out["actions"].append("status")
|
result["results"]["status"] = service_status(args.port)
|
||||||
out["results"]["status"] = service_status(args.port)
|
|
||||||
|
|
||||||
out["duration_ms"] = _now_ms() - started_ms
|
result["duration_ms"] = _now_ms() - result["ts_ms"]
|
||||||
print(json.dumps(out, indent=2))
|
|
||||||
|
print(json.dumps(result, indent=2))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user