201 lines
7.3 KiB
Python
201 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
HOME = Path.home()
|
|
ENV_KEYS = {
|
|
"node": "WATCHDOG_B_NODE_BIN",
|
|
"openclaw_mjs": "WATCHDOG_B_OPENCLAW_MJS",
|
|
"openclaw_entry": "WATCHDOG_B_OPENCLAW_ENTRY",
|
|
}
|
|
|
|
|
|
def dedupe(items: Iterable[Path]) -> list[Path]:
|
|
seen: set[str] = set()
|
|
out: list[Path] = []
|
|
for item in items:
|
|
key = str(item)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
out.append(item)
|
|
return out
|
|
|
|
|
|
def path_candidates() -> tuple[Path | None, list[Path], list[Path]]:
|
|
node_path = shutil.which("node")
|
|
openclaw_path = shutil.which("openclaw")
|
|
node_candidate = Path(node_path).resolve() if node_path else None
|
|
roots: list[Path] = []
|
|
entry_candidates: list[Path] = []
|
|
if openclaw_path:
|
|
op = Path(openclaw_path).resolve()
|
|
roots.extend([
|
|
op.parent.parent / "lib" / "node_modules" / "openclaw",
|
|
op.parent.parent.parent / "lib" / "node_modules" / "openclaw",
|
|
])
|
|
entry_candidates.append(op.parent.parent / "lib" / "node_modules" / "openclaw" / "dist" / "entry.js")
|
|
if node_candidate:
|
|
roots.append(node_candidate.parent.parent / "lib" / "node_modules" / "openclaw")
|
|
return node_candidate, dedupe(roots), dedupe(entry_candidates)
|
|
|
|
|
|
def common_roots() -> list[Path]:
|
|
roots: list[Path] = []
|
|
nvm_dir = Path(os.environ.get("NVM_DIR", HOME / ".nvm")).expanduser()
|
|
roots.extend([
|
|
HOME / ".nvm" / "versions" / "node",
|
|
nvm_dir / "versions" / "node",
|
|
HOME / ".local" / "share" / "pnpm" / "global",
|
|
HOME / ".npm-global",
|
|
Path("/usr/local"),
|
|
Path("/usr"),
|
|
HOME / ".volta" / "tools" / "image",
|
|
])
|
|
return dedupe(roots)
|
|
|
|
|
|
def scan_openclaw_install_roots() -> list[Path]:
|
|
candidates: list[Path] = []
|
|
for root in common_roots():
|
|
if not root.exists():
|
|
continue
|
|
if root.name == "node":
|
|
for child in sorted(root.glob("v*/lib/node_modules/openclaw"), reverse=True):
|
|
candidates.append(child)
|
|
continue
|
|
patterns = [
|
|
"lib/node_modules/openclaw",
|
|
"node_modules/openclaw",
|
|
"*/lib/node_modules/openclaw",
|
|
"*/node_modules/openclaw",
|
|
]
|
|
for pattern in patterns:
|
|
for child in sorted(root.glob(pattern), reverse=True):
|
|
candidates.append(child)
|
|
return dedupe(candidates)
|
|
|
|
|
|
def valid_node(path: Path | None) -> Path | None:
|
|
if path and path.exists() and os.access(path, os.X_OK):
|
|
return path
|
|
return None
|
|
|
|
|
|
def valid_file(path: Path | None) -> Path | None:
|
|
if path and path.is_file():
|
|
return path
|
|
return None
|
|
|
|
|
|
def detect_runtime() -> dict[str, object]:
|
|
result: dict[str, object] = {"ok": False, "detected": {}, "sources": {}, "searched": {}}
|
|
detected: dict[str, str] = {}
|
|
sources: dict[str, str] = {}
|
|
searched: dict[str, list[str]] = {"node": [], "openclaw": []}
|
|
|
|
env_node = os.environ.get(ENV_KEYS["node"])
|
|
if env_node:
|
|
searched["node"].append(env_node)
|
|
node = valid_node(Path(env_node).expanduser())
|
|
if node:
|
|
detected["node"] = str(node)
|
|
sources["node"] = f"env:{ENV_KEYS['node']}"
|
|
env_mjs = os.environ.get(ENV_KEYS["openclaw_mjs"])
|
|
if env_mjs:
|
|
searched["openclaw"].append(env_mjs)
|
|
mjs = valid_file(Path(env_mjs).expanduser())
|
|
if mjs:
|
|
detected["openclaw_mjs"] = str(mjs)
|
|
sources["openclaw_mjs"] = f"env:{ENV_KEYS['openclaw_mjs']}"
|
|
env_entry = os.environ.get(ENV_KEYS["openclaw_entry"])
|
|
if env_entry:
|
|
searched["openclaw"].append(env_entry)
|
|
entry = valid_file(Path(env_entry).expanduser())
|
|
if entry:
|
|
detected["openclaw_entry"] = str(entry)
|
|
sources["openclaw_entry"] = f"env:{ENV_KEYS['openclaw_entry']}"
|
|
|
|
path_node, path_roots, path_entry_candidates = path_candidates()
|
|
if "node" not in detected and path_node:
|
|
searched["node"].append(str(path_node))
|
|
node = valid_node(path_node)
|
|
if node:
|
|
detected["node"] = str(node)
|
|
sources["node"] = "path:node"
|
|
|
|
install_roots = dedupe(path_roots + path_entry_candidates + scan_openclaw_install_roots())
|
|
searched["openclaw"].extend(str(p) for p in install_roots)
|
|
|
|
def fill_from_root(root: Path, source: str) -> None:
|
|
if root.is_file():
|
|
candidate_entry = valid_file(root)
|
|
if candidate_entry and candidate_entry.name == "entry.js" and "openclaw_entry" not in detected:
|
|
detected["openclaw_entry"] = str(candidate_entry)
|
|
sources["openclaw_entry"] = source
|
|
root = candidate_entry.parent.parent
|
|
elif candidate_entry and candidate_entry.name == "openclaw.mjs" and "openclaw_mjs" not in detected:
|
|
detected["openclaw_mjs"] = str(candidate_entry)
|
|
sources["openclaw_mjs"] = source
|
|
root = candidate_entry.parent
|
|
else:
|
|
return
|
|
candidate_mjs = valid_file(root / "openclaw.mjs")
|
|
candidate_entry = valid_file(root / "dist" / "entry.js")
|
|
if candidate_mjs and "openclaw_mjs" not in detected:
|
|
detected["openclaw_mjs"] = str(candidate_mjs)
|
|
sources["openclaw_mjs"] = source
|
|
if candidate_entry and "openclaw_entry" not in detected:
|
|
detected["openclaw_entry"] = str(candidate_entry)
|
|
sources["openclaw_entry"] = source
|
|
|
|
for root in install_roots:
|
|
source = "path:openclaw" if root in path_roots or root in path_entry_candidates else "scan:common-locations"
|
|
fill_from_root(root, source)
|
|
if all(k in detected for k in ("openclaw_mjs", "openclaw_entry")):
|
|
break
|
|
|
|
result["detected"] = detected
|
|
result["sources"] = sources
|
|
result["searched"] = searched
|
|
result["ok"] = all(k in detected for k in ("node", "openclaw_mjs", "openclaw_entry"))
|
|
if not result["ok"]:
|
|
missing = [k for k in ("node", "openclaw_mjs", "openclaw_entry") if k not in detected]
|
|
result["missing"] = missing
|
|
result["error"] = (
|
|
"Could not auto-detect: " + ", ".join(missing) + ". "
|
|
"Set WATCHDOG_B_NODE_BIN / WATCHDOG_B_OPENCLAW_MJS / WATCHDOG_B_OPENCLAW_ENTRY explicitly if this host uses a non-standard install path."
|
|
)
|
|
return result
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Detect node/openclaw runtime paths for watchdog-b scripts")
|
|
parser.add_argument("--shell", action="store_true", help="print shell export lines")
|
|
parser.add_argument("--pretty", action="store_true", help="pretty-print json")
|
|
args = parser.parse_args()
|
|
|
|
result = detect_runtime()
|
|
if args.shell:
|
|
if not result["ok"]:
|
|
print(result["error"], flush=True)
|
|
return 1
|
|
detected = result["detected"]
|
|
print(f'WATCHDOG_B_NODE_BIN={detected["node"]}')
|
|
print(f'WATCHDOG_B_OPENCLAW_MJS={detected["openclaw_mjs"]}')
|
|
print(f'WATCHDOG_B_OPENCLAW_ENTRY={detected["openclaw_entry"]}')
|
|
return 0
|
|
|
|
print(json.dumps(result, ensure_ascii=False, indent=2 if args.pretty else None))
|
|
return 0 if result["ok"] else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|