Initial import of watchdog-discord-route skill
This commit is contained in:
200
scripts/openclaw_runtime_probe.py
Normal file
200
scripts/openclaw_runtime_probe.py
Normal file
@@ -0,0 +1,200 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
HOME = Path.home()
|
||||
ENV_KEYS = {
|
||||
"node": "WATCHDOG_B_NODE_BIN",
|
||||
"openclaw_mjs": "WATCHDOG_B_OPENCLAW_MJS",
|
||||
"openclaw_entry": "WATCHDOG_B_OPENCLAW_ENTRY",
|
||||
}
|
||||
|
||||
|
||||
def dedupe(items: Iterable[Path]) -> list[Path]:
|
||||
seen: set[str] = set()
|
||||
out: list[Path] = []
|
||||
for item in items:
|
||||
key = str(item)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
out.append(item)
|
||||
return out
|
||||
|
||||
|
||||
def path_candidates() -> tuple[Path | None, list[Path], list[Path]]:
|
||||
node_path = shutil.which("node")
|
||||
openclaw_path = shutil.which("openclaw")
|
||||
node_candidate = Path(node_path).resolve() if node_path else None
|
||||
roots: list[Path] = []
|
||||
entry_candidates: list[Path] = []
|
||||
if openclaw_path:
|
||||
op = Path(openclaw_path).resolve()
|
||||
roots.extend([
|
||||
op.parent.parent / "lib" / "node_modules" / "openclaw",
|
||||
op.parent.parent.parent / "lib" / "node_modules" / "openclaw",
|
||||
])
|
||||
entry_candidates.append(op.parent.parent / "lib" / "node_modules" / "openclaw" / "dist" / "entry.js")
|
||||
if node_candidate:
|
||||
roots.append(node_candidate.parent.parent / "lib" / "node_modules" / "openclaw")
|
||||
return node_candidate, dedupe(roots), dedupe(entry_candidates)
|
||||
|
||||
|
||||
def common_roots() -> list[Path]:
|
||||
roots: list[Path] = []
|
||||
nvm_dir = Path(os.environ.get("NVM_DIR", HOME / ".nvm")).expanduser()
|
||||
roots.extend([
|
||||
HOME / ".nvm" / "versions" / "node",
|
||||
nvm_dir / "versions" / "node",
|
||||
HOME / ".local" / "share" / "pnpm" / "global",
|
||||
HOME / ".npm-global",
|
||||
Path("/usr/local"),
|
||||
Path("/usr"),
|
||||
HOME / ".volta" / "tools" / "image",
|
||||
])
|
||||
return dedupe(roots)
|
||||
|
||||
|
||||
def scan_openclaw_install_roots() -> list[Path]:
|
||||
candidates: list[Path] = []
|
||||
for root in common_roots():
|
||||
if not root.exists():
|
||||
continue
|
||||
if root.name == "node":
|
||||
for child in sorted(root.glob("v*/lib/node_modules/openclaw"), reverse=True):
|
||||
candidates.append(child)
|
||||
continue
|
||||
patterns = [
|
||||
"lib/node_modules/openclaw",
|
||||
"node_modules/openclaw",
|
||||
"*/lib/node_modules/openclaw",
|
||||
"*/node_modules/openclaw",
|
||||
]
|
||||
for pattern in patterns:
|
||||
for child in sorted(root.glob(pattern), reverse=True):
|
||||
candidates.append(child)
|
||||
return dedupe(candidates)
|
||||
|
||||
|
||||
def valid_node(path: Path | None) -> Path | None:
|
||||
if path and path.exists() and os.access(path, os.X_OK):
|
||||
return path
|
||||
return None
|
||||
|
||||
|
||||
def valid_file(path: Path | None) -> Path | None:
|
||||
if path and path.is_file():
|
||||
return path
|
||||
return None
|
||||
|
||||
|
||||
def detect_runtime() -> dict[str, object]:
|
||||
result: dict[str, object] = {"ok": False, "detected": {}, "sources": {}, "searched": {}}
|
||||
detected: dict[str, str] = {}
|
||||
sources: dict[str, str] = {}
|
||||
searched: dict[str, list[str]] = {"node": [], "openclaw": []}
|
||||
|
||||
env_node = os.environ.get(ENV_KEYS["node"])
|
||||
if env_node:
|
||||
searched["node"].append(env_node)
|
||||
node = valid_node(Path(env_node).expanduser())
|
||||
if node:
|
||||
detected["node"] = str(node)
|
||||
sources["node"] = f"env:{ENV_KEYS['node']}"
|
||||
env_mjs = os.environ.get(ENV_KEYS["openclaw_mjs"])
|
||||
if env_mjs:
|
||||
searched["openclaw"].append(env_mjs)
|
||||
mjs = valid_file(Path(env_mjs).expanduser())
|
||||
if mjs:
|
||||
detected["openclaw_mjs"] = str(mjs)
|
||||
sources["openclaw_mjs"] = f"env:{ENV_KEYS['openclaw_mjs']}"
|
||||
env_entry = os.environ.get(ENV_KEYS["openclaw_entry"])
|
||||
if env_entry:
|
||||
searched["openclaw"].append(env_entry)
|
||||
entry = valid_file(Path(env_entry).expanduser())
|
||||
if entry:
|
||||
detected["openclaw_entry"] = str(entry)
|
||||
sources["openclaw_entry"] = f"env:{ENV_KEYS['openclaw_entry']}"
|
||||
|
||||
path_node, path_roots, path_entry_candidates = path_candidates()
|
||||
if "node" not in detected and path_node:
|
||||
searched["node"].append(str(path_node))
|
||||
node = valid_node(path_node)
|
||||
if node:
|
||||
detected["node"] = str(node)
|
||||
sources["node"] = "path:node"
|
||||
|
||||
install_roots = dedupe(path_roots + path_entry_candidates + scan_openclaw_install_roots())
|
||||
searched["openclaw"].extend(str(p) for p in install_roots)
|
||||
|
||||
def fill_from_root(root: Path, source: str) -> None:
|
||||
if root.is_file():
|
||||
candidate_entry = valid_file(root)
|
||||
if candidate_entry and candidate_entry.name == "entry.js" and "openclaw_entry" not in detected:
|
||||
detected["openclaw_entry"] = str(candidate_entry)
|
||||
sources["openclaw_entry"] = source
|
||||
root = candidate_entry.parent.parent
|
||||
elif candidate_entry and candidate_entry.name == "openclaw.mjs" and "openclaw_mjs" not in detected:
|
||||
detected["openclaw_mjs"] = str(candidate_entry)
|
||||
sources["openclaw_mjs"] = source
|
||||
root = candidate_entry.parent
|
||||
else:
|
||||
return
|
||||
candidate_mjs = valid_file(root / "openclaw.mjs")
|
||||
candidate_entry = valid_file(root / "dist" / "entry.js")
|
||||
if candidate_mjs and "openclaw_mjs" not in detected:
|
||||
detected["openclaw_mjs"] = str(candidate_mjs)
|
||||
sources["openclaw_mjs"] = source
|
||||
if candidate_entry and "openclaw_entry" not in detected:
|
||||
detected["openclaw_entry"] = str(candidate_entry)
|
||||
sources["openclaw_entry"] = source
|
||||
|
||||
for root in install_roots:
|
||||
source = "path:openclaw" if root in path_roots or root in path_entry_candidates else "scan:common-locations"
|
||||
fill_from_root(root, source)
|
||||
if all(k in detected for k in ("openclaw_mjs", "openclaw_entry")):
|
||||
break
|
||||
|
||||
result["detected"] = detected
|
||||
result["sources"] = sources
|
||||
result["searched"] = searched
|
||||
result["ok"] = all(k in detected for k in ("node", "openclaw_mjs", "openclaw_entry"))
|
||||
if not result["ok"]:
|
||||
missing = [k for k in ("node", "openclaw_mjs", "openclaw_entry") if k not in detected]
|
||||
result["missing"] = missing
|
||||
result["error"] = (
|
||||
"Could not auto-detect: " + ", ".join(missing) + ". "
|
||||
"Set WATCHDOG_B_NODE_BIN / WATCHDOG_B_OPENCLAW_MJS / WATCHDOG_B_OPENCLAW_ENTRY explicitly if this host uses a non-standard install path."
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Detect node/openclaw runtime paths for watchdog-b scripts")
|
||||
parser.add_argument("--shell", action="store_true", help="print shell export lines")
|
||||
parser.add_argument("--pretty", action="store_true", help="pretty-print json")
|
||||
args = parser.parse_args()
|
||||
|
||||
result = detect_runtime()
|
||||
if args.shell:
|
||||
if not result["ok"]:
|
||||
print(result["error"], flush=True)
|
||||
return 1
|
||||
detected = result["detected"]
|
||||
print(f'WATCHDOG_B_NODE_BIN={detected["node"]}')
|
||||
print(f'WATCHDOG_B_OPENCLAW_MJS={detected["openclaw_mjs"]}')
|
||||
print(f'WATCHDOG_B_OPENCLAW_ENTRY={detected["openclaw_entry"]}')
|
||||
return 0
|
||||
|
||||
print(json.dumps(result, ensure_ascii=False, indent=2 if args.pretty else None))
|
||||
return 0 if result["ok"] else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user