468 lines
18 KiB
Python
Executable File
468 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Filesystem anchors for the watchdog. SCRIPT_DIR/SKILL_DIR are derived from
# this file's own location; the remaining paths default to locations under the
# user's home directory and can each be overridden by the environment variable
# named in the lookup.
SCRIPT_DIR = Path(__file__).resolve().parent
SKILL_DIR = SCRIPT_DIR.parent
WORKSPACE = Path(
    os.getenv(
        "WATCHDOG_B_WORKSPACE",
        str(Path.home().joinpath(".openclaw", "workspace")),
    )
)
CONFIG_FILE = Path(
    os.getenv(
        "WATCHDOG_B_CONFIG_FILE",
        str(Path.home().joinpath(".config", "openclaw", "watchdog-b.env")),
    )
)
LIVE_SCRIPT_DIR = Path(
    os.getenv(
        "WATCHDOG_B_LIVE_SCRIPT_DIR",
        str(WORKSPACE.joinpath("scripts", "watchdog-b")),
    )
)
|
|
|
|
|
|
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` lines from *path* into ``os.environ``.

    Variables that are already present in the environment are left untouched
    (``setdefault``), so real environment variables win over the file.

    Parsing rules:

    * blank lines, ``#`` comment lines and lines without ``=`` are skipped;
    * a leading ``export`` keyword is ignored, so a file written for a shell
      can be loaded as well;
    * one pair of matching single or double quotes around the value is
      removed.

    Args:
        path: Location of the env file. A missing path, or a path that is not
            a regular file, is silently ignored.
    """
    # is_file() rather than exists(): a directory at this path would make
    # read_text() raise instead of being skipped.
    if not path.is_file():
        return
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith(("export ", "export\t")):
            line = line[len("export"):].lstrip()
        key, separator, value = line.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        value = value.strip()
        # Require two characters so a value consisting of a single quote
        # character is kept instead of collapsing to an empty string.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        os.environ.setdefault(key, value)
|
|
|
|
|
|
# Apply the env file first: every setting below reads os.environ, and values
# from the file only fill in variables that are not already set.
load_env_file(CONFIG_FILE)

# Where notification state is persisted between runs.
STATE_DIR = Path(os.getenv("WATCHDOG_B_ARTIFACT_DIR", str(WORKSPACE.joinpath("state", "watchdog-b"))))
NOTIFY_STATE_PATH = STATE_DIR.joinpath("notify-state.json")

# Helper scripts launched as subprocesses, and the interpreter used for them.
OWNER_PRODUCER = Path(os.getenv("WATCHDOG_B_OWNER_PRODUCER", str(SCRIPT_DIR.joinpath("owner_report_producer.py"))))
OWNER_DRIVER = Path(os.getenv("WATCHDOG_B_OWNER_DRIVER", str(SCRIPT_DIR.joinpath("owner_report_driver.py"))))
PYTHON_BIN = os.getenv("WATCHDOG_B_PYTHON_BIN", sys.executable or "python3")

# Destination of owner reports and the id of the agent that receives nudges.
WATCHDOG_OWNER_REPORT_CHANNEL = os.getenv("WATCHDOG_B_OWNER_REPORT_CHANNEL", "discord")
WATCHDOG_OWNER_REPORT_TARGET = os.getenv("WATCHDOG_B_OWNER_REPORT_TARGET", "channel:REPLACE_ME")
WATCHDOG_MAIN_AGENT_ID = os.getenv("WATCHDOG_B_MAIN_AGENT_ID", "").strip()

HOSTNAME = os.uname().nodename
UTC = timezone.utc

# Optional probe script plus the memo slot filled by detect_runtime_paths().
RUNTIME_PROBE = Path(os.getenv("WATCHDOG_B_RUNTIME_PROBE", str(SCRIPT_DIR.joinpath("openclaw_runtime_probe.py"))))
RUNTIME_CACHE: dict[str, Path] | None = None

# Fallback throttle intervals (seconds) and escalation thresholds used when
# the matching WATCHDOG_B_* environment variable is not set.
DEFAULTS = dict(
    running_min_interval_seconds=3600,
    stalled_nudge_min_interval_seconds=900,
    idle_nudge_min_interval_seconds=1800,
    stalled_owner_escalation_after=2,
    idle_owner_escalation_after=2,
)
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current local time as an ISO-8601 string.

    The value carries the local UTC offset and is truncated to whole seconds.
    """
    local_now = datetime.now().astimezone()
    return local_now.isoformat(timespec="seconds")
|
|
|
|
|
|
def path_or_none(value: str | None) -> Path | None:
    """Convert *value* to a ``Path`` with ``~`` expanded.

    Returns ``None`` when *value* is ``None`` or an empty string.
    """
    return Path(value).expanduser() if value else None
|
|
|
|
|
|
def detect_runtime_paths() -> dict[str, Path]:
    """Resolve the node binary and the two OpenClaw script paths, with caching.

    Resolution order:

    1. ``WATCHDOG_B_NODE_BIN``, ``WATCHDOG_B_OPENCLAW_MJS`` and
       ``WATCHDOG_B_OPENCLAW_ENTRY`` when all three point at usable files.
    2. The JSON object printed by the ``RUNTIME_PROBE`` script (its
       ``"detected"`` entry), when the script exists and exits with 0.
    3. The explicitly configured script paths combined with a ``node``
       executable found on ``PATH``.

    Returns:
        A mapping with the keys ``"node"``, ``"openclaw_mjs"`` and
        ``"openclaw_entry"``. It is stored in ``RUNTIME_CACHE`` and returned
        as-is by later calls.

    Raises:
        RuntimeError: if no complete set of paths could be resolved. The
            message lists the environment variables that would fix it.
    """
    global RUNTIME_CACHE
    if RUNTIME_CACHE is not None:
        return RUNTIME_CACHE

    def _usable_node(candidate: Path | None) -> bool:
        """Tell whether *candidate* is an existing, executable path."""
        return candidate is not None and candidate.exists() and os.access(candidate, os.X_OK)

    def _is_file(candidate: Path | None) -> bool:
        """Tell whether *candidate* is set and refers to a regular file."""
        return candidate is not None and candidate.is_file()

    node_bin = path_or_none(os.environ.get("WATCHDOG_B_NODE_BIN"))
    openclaw_mjs = path_or_none(os.environ.get("WATCHDOG_B_OPENCLAW_MJS"))
    openclaw_entry = path_or_none(os.environ.get("WATCHDOG_B_OPENCLAW_ENTRY"))

    if _usable_node(node_bin) and _is_file(openclaw_mjs) and _is_file(openclaw_entry):
        RUNTIME_CACHE = {
            "node": node_bin,
            "openclaw_mjs": openclaw_mjs,
            "openclaw_entry": openclaw_entry,
        }
        return RUNTIME_CACHE

    probe_note = ""
    if RUNTIME_PROBE.exists():
        # A probe that cannot be launched or prints something unexpected must
        # not escape as JSONDecodeError/KeyError; fall through to the PATH
        # fallback and mention the probe problem in the final error instead.
        try:
            proc = subprocess.run([PYTHON_BIN, str(RUNTIME_PROBE)], text=True, capture_output=True)
            if proc.returncode == 0:
                detected = json.loads(proc.stdout).get("detected", {})
                RUNTIME_CACHE = {
                    key: Path(detected[key])
                    for key in ("node", "openclaw_mjs", "openclaw_entry")
                }
                return RUNTIME_CACHE
            probe_note = f" (runtime probe exited with {proc.returncode})"
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as exc:
            probe_note = f" (runtime probe unusable: {exc!r})"

    # Last resort: keep an explicitly configured node if it works, otherwise
    # look for one on PATH.
    if not _usable_node(node_bin):
        node_which = shutil.which("node")
        if node_which:
            node_bin = Path(node_which)

    missing = []
    if not _usable_node(node_bin):
        missing.append("WATCHDOG_B_NODE_BIN")
    if not _is_file(openclaw_mjs):
        missing.append("WATCHDOG_B_OPENCLAW_MJS")
    if not _is_file(openclaw_entry):
        missing.append("WATCHDOG_B_OPENCLAW_ENTRY")

    if not missing:
        # Previously this path raised even though everything had been found.
        RUNTIME_CACHE = {
            "node": node_bin,
            "openclaw_mjs": openclaw_mjs,
            "openclaw_entry": openclaw_entry,
        }
        return RUNTIME_CACHE

    raise RuntimeError(
        "Unable to auto-detect watchdog runtime paths. Missing: " + ", ".join(missing) + probe_note
    )
|
|
|
|
|
|
def load_state() -> dict[str, Any]:
    """Read the persisted notification state from ``NOTIFY_STATE_PATH``.

    Returns:
        The decoded JSON object. A fresh ``{"events": {}}`` is returned when
        the file does not exist, cannot be read or decoded, or does not hold
        a JSON object whose ``"events"`` entry (if present) is an object.

    A discarded state file is reported on stderr; stdout is left alone
    because the script prints its JSON result there.
    """
    if NOTIFY_STATE_PATH.exists():
        try:
            loaded = json.loads(NOTIFY_STATE_PATH.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            print(f"watchdog-b: ignoring unreadable notify state {NOTIFY_STATE_PATH}: {exc}", file=sys.stderr)
        else:
            # Callers use dict.setdefault() on the document and on "events";
            # any other JSON shape would crash them later.
            if isinstance(loaded, dict) and isinstance(loaded.get("events", {}), dict):
                return loaded
            print(f"watchdog-b: ignoring notify state with unexpected shape in {NOTIFY_STATE_PATH}", file=sys.stderr)
    return {"events": {}}
|
|
|
|
|
|
def save_state(data: dict[str, Any]) -> None:
    """Persist *data* as indented UTF-8 JSON at ``NOTIFY_STATE_PATH``.

    ``STATE_DIR`` is created when missing. The document is written to a
    temporary sibling file and then moved into place with ``os.replace``, so
    an interrupted write cannot leave a truncated state file behind.

    Args:
        data: The complete state document; it must be JSON-serialisable.

    Raises:
        TypeError: if *data* contains values ``json.dumps`` cannot encode.
        OSError: if the directory or file cannot be written.
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, ensure_ascii=False, indent=2) + "\n"
    # Include the pid so two overlapping runs never share a temporary file.
    tmp_path = NOTIFY_STATE_PATH.with_name(f".{NOTIFY_STATE_PATH.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        os.replace(tmp_path, NOTIFY_STATE_PATH)
    except OSError:
        # Best-effort cleanup of the partial temporary file, then re-raise.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
|
|
|
|
|
|
def event_bucket(state: str) -> dict[str, Any]:
    """Load the persisted state and make sure it has an entry for *state*.

    Args:
        state: Key to create under the ``"events"`` mapping when absent.

    Returns:
        The whole state document as returned by ``load_state()``, not the
        per-state entry. Use ``get_bucket(data, state)`` to obtain the entry.

    NOTE(review): the name suggests the per-state entry was meant to be
    returned; the existing return value is kept for compatibility. Confirm
    with callers before changing it.
    """
    data = load_state()
    data.setdefault("events", {}).setdefault(state, {})
    return data
|
|
|
|
|
|
def get_bucket(data: dict[str, Any], state: str) -> dict[str, Any]:
    """Return the per-state entry of *data*, creating it when absent.

    Both ``data["events"]`` and ``data["events"][state]`` are inserted as
    empty dicts if missing, so *data* is modified in place and the returned
    dict is the very object stored inside it.
    """
    if "events" not in data:
        data["events"] = {}
    per_state = data["events"]
    if state not in per_state:
        per_state[state] = {}
    return per_state[state]
|
|
|
|
|
|
def should_send(bucket: dict[str, Any], min_interval_seconds: int, timestamp: datetime) -> tuple[bool, str]:
    """Decide whether enough time has passed since the last recorded send.

    Args:
        bucket: Per-state entry; only its ``"last_sent_at"`` value is read.
        min_interval_seconds: Minimum number of seconds between two sends.
        timestamp: The moment the current check refers to.

    Returns:
        ``(allowed, reason)``. ``reason`` is one of ``"first-send"``,
        ``"state-corrupt-reset"``, ``"interval-ok:<n>s"`` or
        ``"throttled:<n>s<<min>s"``.
    """
    last_sent = bucket.get("last_sent_at")
    if not last_sent:
        return True, "first-send"
    try:
        prev = datetime.fromisoformat(last_sent)
        # A naive and an aware datetime cannot be subtracted. When exactly one
        # side lacks an offset, interpret it as local time (astimezone() does
        # that for naive values) instead of failing with TypeError.
        if prev.tzinfo is None and timestamp.tzinfo is not None:
            prev = prev.astimezone()
    except (TypeError, ValueError, OverflowError, OSError):
        return True, "state-corrupt-reset"
    if timestamp.tzinfo is None and prev.tzinfo is not None:
        timestamp = timestamp.astimezone()
    elapsed = (timestamp - prev).total_seconds()
    if elapsed >= min_interval_seconds:
        return True, f"interval-ok:{int(elapsed)}s"
    return False, f"throttled:{int(elapsed)}s<{min_interval_seconds}s"
|
|
|
|
|
|
def mark_sent(bucket: dict[str, Any], channel: str, timestamp: str, detail: dict[str, Any] | None = None) -> None:
    """Record a send in *bucket*.

    Stores the time and channel of the send, increments ``"send_count"`` and
    replaces ``"last_detail"`` with *detail* (an empty dict when *detail* is
    ``None`` or empty). *bucket* is modified in place.
    """
    bucket.update(last_sent_at=timestamp, last_channel=channel)
    sends_so_far = int(bucket.get("send_count", 0))
    bucket["send_count"] = sends_so_far + 1
    bucket["last_detail"] = detail if detail else {}
|
|
|
|
|
|
def build_owner_message(state: str, timestamp: str, detail: str) -> dict[str, str]:
    """Assemble the text fields of an owner report for *state*.

    Every field has a built-in default per state and can be overridden with
    ``WATCHDOG_B_<STATE>_<FIELD>`` environment variables (``PROGRESS_LABEL``,
    ``EMOJI``, ``SUMMARY``, ``DETAIL``, ``STATUS``, ``SOURCE``).

    Args:
        state: ``"running"``, ``"stalled"`` or ``"idle"``.
        timestamp: Check time, embedded in the default detail text.
        detail: Text for the ``"next"`` field; when empty, the ``DETAIL``
            override or the built-in default is used instead.

    Returns:
        A dict with the keys ``"progress"``, ``"done"``, ``"next"``,
        ``"status"`` and ``"source"``.

    Raises:
        KeyError: if *state* is not one of the three known states.
    """
    checked = f"checked_at={timestamp} host={HOSTNAME}"
    fallbacks_by_state = {
        "running": {
            "emoji": "✅",
            "summary": "主程序仍在運行",
            "progress": "running",
            "status": "normal",
            "source": "watchdog-b-running",
            "detail": checked,
        },
        "stalled": {
            "emoji": "⚠️",
            "summary": "主程序疑似卡住",
            "progress": "stalled",
            "status": "needs-attention",
            "source": "watchdog-b-stalled-escalation",
            "detail": f"{checked}; stale activity detected while process still looked alive",
        },
        "idle": {
            "emoji": "🛑",
            "summary": "主程序目前未運行",
            "progress": "idle",
            "status": "needs-attention",
            "source": "watchdog-b-idle-escalation",
            "detail": f"{checked}; no active main runtime detected",
        },
    }
    fallbacks = fallbacks_by_state[state]
    env_prefix = f"WATCHDOG_B_{state.upper()}_"

    def setting(suffix: str, field: str) -> str:
        """Return the environment override for *suffix*, else the default."""
        return os.environ.get(env_prefix + suffix, fallbacks[field])

    headline = " ".join([setting("EMOJI", "emoji"), setting("SUMMARY", "summary")])
    return {
        "progress": setting("PROGRESS_LABEL", "progress"),
        "done": headline,
        "next": detail or setting("DETAIL", "detail"),
        "status": setting("STATUS", "status"),
        "source": setting("SOURCE", "source"),
    }
|
|
|
|
|
|
def enqueue_owner_report(*, state: str, timestamp: str, dry_run: bool, detail: str) -> dict[str, Any]:
    """Run the owner report producer script to enqueue a report for *state*.

    Args:
        state: Watchdog state the report is about.
        timestamp: Check time forwarded to ``build_owner_message``.
        dry_run: When true, ``--dry-run`` is appended to the producer command.
        detail: Text for the report's ``--next`` field.

    Returns:
        A result dict with ``"kind"``, ``"ok"``, ``"command"``,
        ``"exit_code"``, ``"stdout"``, ``"stderr"``, ``"report_id"`` and
        ``"dry_run"``. After a successful real run it also carries
        ``"pending_path"``. When the producer cannot be launched at all,
        ``"ok"`` is false, ``"exit_code"`` is ``None`` and ``"error"``
        explains why.
    """
    msg = build_owner_message(state, timestamp, detail)
    report_id = f"watchdog-b-{state}-{datetime.now(UTC).strftime('%Y%m%dT%H%M%SZ')}"
    cmd = [
        PYTHON_BIN,
        str(OWNER_PRODUCER),
        "--team",
        "watchdog-b",
        "--worker",
        HOSTNAME,
        "--task-id",
        f"openclaw-main-{state}",
        "--progress",
        msg["progress"],
        "--done",
        msg["done"],
        "--next",
        msg["next"],
        "--status",
        msg["status"],
        "--source",
        msg["source"],
        "--report-id",
        report_id,
    ]
    if dry_run:
        cmd.append("--dry-run")
    try:
        proc = subprocess.run(cmd, text=True, capture_output=True)
    except OSError as exc:
        # A missing interpreter or an unreadable script should be reported
        # like any other failure, not abort the run with a traceback.
        return {
            "kind": "owner-report-enqueue",
            "ok": False,
            "command": cmd,
            "exit_code": None,
            "stdout": "",
            "stderr": "",
            "error": f"failed to launch owner report producer: {exc}",
            "report_id": report_id,
            "dry_run": dry_run,
        }
    result = {
        "kind": "owner-report-enqueue",
        "ok": proc.returncode == 0,
        "command": cmd,
        "exit_code": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
        "report_id": report_id,
        "dry_run": dry_run,
    }
    if proc.returncode == 0 and not dry_run:
        # NOTE(review): presumably the location the producer writes to; the
        # producer itself is not visible from this file - confirm.
        result["pending_path"] = str(Path.home() / ".clawteam" / "owner-reports" / "pending" / f"{report_id}.md")
    return result
|
|
|
|
|
|
def build_owner_send_cmd() -> str:
    """Build the command line used to deliver an owner report.

    The node binary, the OpenClaw entry script, the channel and the target are
    each quoted with ``shlex.quote`` so spaces, quotes or ``$`` in a configured
    value cannot break or alter the command. ``"$OWNER_REPORT_MESSAGE"`` is
    deliberately left as an unexpanded variable reference.

    NOTE(review): presumably the owner report driver runs this string through
    a shell with ``OWNER_REPORT_MESSAGE`` set; the driver is not visible from
    this file - confirm.

    Returns:
        The command line as a single string.

    Raises:
        RuntimeError: propagated from ``detect_runtime_paths()`` when the
            runtime cannot be located.
    """
    import shlex  # local import so this block does not rely on file-level changes

    runtime = detect_runtime_paths()
    argv = [
        str(runtime["node"]),
        str(runtime["openclaw_entry"]),
        "message",
        "send",
        "--channel",
        WATCHDOG_OWNER_REPORT_CHANNEL,
        "--target",
        WATCHDOG_OWNER_REPORT_TARGET,
        "--message",
    ]
    return " ".join(shlex.quote(part) for part in argv) + ' "$OWNER_REPORT_MESSAGE"'
|
|
|
|
|
|
def deliver_owner_report(*, report_id: str, dry_run: bool) -> dict[str, Any]:
    """Run the owner report driver script to deliver the report *report_id*.

    Args:
        report_id: Identifier of the report to deliver.
        dry_run: When true, ``--dry-run`` is appended to the driver command.

    Returns:
        A result dict with ``"kind"``, ``"ok"``, ``"command"``,
        ``"send_cmd"``, ``"exit_code"``, ``"stdout"``, ``"stderr"``,
        ``"dry_run"``, ``"report_id"``, ``"target_channel"`` and
        ``"target"``. When the send command cannot be built or the driver
        cannot be launched, ``"ok"`` is false, ``"exit_code"`` is ``None``
        and ``"error"`` explains why.
    """

    def _failure(error: str, cmd: list[str] | None, send_cmd: str | None) -> dict[str, Any]:
        """Build the result for a delivery that never reached the driver."""
        return {
            "kind": "owner-report-direct-delivery",
            "ok": False,
            "command": cmd,
            "send_cmd": send_cmd,
            "exit_code": None,
            "stdout": "",
            "stderr": "",
            "error": error,
            "dry_run": dry_run,
            "report_id": report_id,
            "target_channel": WATCHDOG_OWNER_REPORT_CHANNEL,
            "target": WATCHDOG_OWNER_REPORT_TARGET,
        }

    # Runtime detection can fail; report that as a failed delivery, in the
    # same way call_main_agent() does, instead of aborting the whole run.
    try:
        send_cmd = build_owner_send_cmd()
    except Exception as exc:
        return _failure(str(exc), None, None)

    cmd = [PYTHON_BIN, str(OWNER_DRIVER), report_id, "--send-cmd", send_cmd]
    if dry_run:
        cmd.append("--dry-run")
    try:
        proc = subprocess.run(cmd, text=True, capture_output=True)
    except OSError as exc:
        return _failure(f"failed to launch owner report driver: {exc}", cmd, send_cmd)
    return {
        "kind": "owner-report-direct-delivery",
        "ok": proc.returncode == 0,
        "command": cmd,
        "send_cmd": send_cmd,
        "exit_code": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
        "dry_run": dry_run,
        "report_id": report_id,
        "target_channel": WATCHDOG_OWNER_REPORT_CHANNEL,
        "target": WATCHDOG_OWNER_REPORT_TARGET,
    }
|
|
|
|
|
|
def call_main_agent(*, state: str, timestamp: str, dry_run: bool) -> dict[str, Any]:
    """Send a nudge message to the configured main agent via the OpenClaw CLI.

    Args:
        state: Watchdog state that triggered the nudge.
        timestamp: Check time, embedded in the message.
        dry_run: When true, the command is built and returned but not run.

    Returns:
        A result dict with ``"kind": "main-agent-nudge"``, ``"ok"`` and
        ``"message"``. Depending on the path taken it also carries
        ``"skipped"``/``"reason"`` (no agent id configured), ``"error"``
        (runtime not found, invalid timeout setting, launch failure),
        ``"timeout"`` (the command exceeded its time limit), or
        ``"command"``/``"exit_code"``/``"stdout"``/``"stderr"``.
        All values are JSON-serialisable.
    """

    def _as_text(stream: Any) -> Any:
        """Decode captured bytes; TimeoutExpired carries bytes even with text=True."""
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return stream

    message = (
        f"[watchdog-b][{state}] {timestamp}\n"
        f"Host: {HOSTNAME}\n"
        "Please confirm current task state, whether progress is blocked, and whether owner-facing escalation is needed."
    )
    if not WATCHDOG_MAIN_AGENT_ID:
        return {
            "kind": "main-agent-nudge",
            "ok": True,
            "skipped": True,
            "reason": "WATCHDOG_B_MAIN_AGENT_ID not configured",
            "dry_run": dry_run,
            "message": message,
        }
    try:
        runtime = detect_runtime_paths()
    except Exception as exc:
        return {
            "kind": "main-agent-nudge",
            "ok": False,
            "dry_run": dry_run,
            "error": str(exc),
            "message": message,
        }
    timeout_setting = os.environ.get("WATCHDOG_B_MAIN_AGENT_TIMEOUT", "120")
    cmd = [
        str(runtime["node"]),
        str(runtime["openclaw_mjs"]),
        "agent",
        "--agent",
        WATCHDOG_MAIN_AGENT_ID,
        "--message",
        message,
        "--timeout",
        timeout_setting,
    ]
    if dry_run:
        return {"kind": "main-agent-nudge", "ok": True, "dry_run": True, "command": cmd, "message": message}
    try:
        # Ten seconds of slack on top of the timeout handed to the CLI itself.
        hard_timeout = int(timeout_setting) + 10
    except ValueError:
        return {
            "kind": "main-agent-nudge",
            "ok": False,
            "dry_run": False,
            "command": cmd,
            "error": f"invalid WATCHDOG_B_MAIN_AGENT_TIMEOUT: {timeout_setting!r}",
            "message": message,
        }
    try:
        proc = subprocess.run(cmd, text=True, capture_output=True, timeout=hard_timeout)
    except subprocess.TimeoutExpired as exc:
        return {
            "kind": "main-agent-nudge",
            "ok": False,
            "dry_run": False,
            "command": cmd,
            "timeout": True,
            # Raw bytes here would make json.dumps() of the result fail.
            "stdout": _as_text(exc.stdout),
            "stderr": _as_text(exc.stderr),
            "message": message,
        }
    except OSError as exc:
        return {
            "kind": "main-agent-nudge",
            "ok": False,
            "dry_run": False,
            "command": cmd,
            "error": f"failed to launch main agent command: {exc}",
            "message": message,
        }
    return {
        "kind": "main-agent-nudge",
        "ok": proc.returncode == 0,
        "dry_run": False,
        "command": cmd,
        "exit_code": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
        "message": message,
    }
|
|
|
|
|
|
def maybe_running_report(data: dict[str, Any], bucket: dict[str, Any], timestamp: str, dry_run: bool) -> dict[str, Any]:
    """Handle the "running" state: optionally enqueue and deliver an owner report.

    The behaviour is selected by ``WATCHDOG_B_RUNNING_REPORT_MODE``:

    * ``manual`` (default): do nothing and return a hint;
    * ``enqueue``: enqueue a report when the throttle interval allows it;
    * ``enqueue-and-drain``: enqueue and then deliver the report directly.

    Args:
        data: The whole state document (not used by this function).
        bucket: Per-state entry for "running"; updated through ``mark_sent``.
        timestamp: ISO-8601 check time.
        dry_run: Passed to the helper scripts. A dry run does not record a
            send, so it cannot use up the throttle window of a real run.

    Returns:
        A result dict with at least ``"state"``, ``"route"``, ``"mode"``,
        ``"allowed"``, ``"reason"``, ``"dry_run"`` and ``"ok"``.

    Raises:
        ValueError: if the interval setting is not an integer or *timestamp*
            is not ISO-8601.
    """
    mode = os.environ.get("WATCHDOG_B_RUNNING_REPORT_MODE", "manual").lower()
    min_interval = int(os.environ.get("WATCHDOG_B_RUNNING_REPORT_MIN_INTERVAL_SECONDS", str(DEFAULTS["running_min_interval_seconds"])))
    allowed, reason = should_send(bucket, min_interval, datetime.fromisoformat(timestamp))
    result: dict[str, Any] = {
        "state": "running",
        "route": "owner-report",
        "mode": mode,
        "allowed": allowed,
        "reason": reason,
        "dry_run": dry_run,
    }
    if mode not in {"manual", "enqueue", "enqueue-and-drain"}:
        result.update({"ok": False, "error": f"unsupported running mode: {mode}"})
        return result
    if mode == "manual":
        result.update({
            "ok": True,
            "action": "manual-only",
            "hint": "set WATCHDOG_B_RUNNING_REPORT_MODE=enqueue to create a real pending item, or enqueue-and-drain to enqueue and directly deliver it to Discord",
        })
        return result
    if not allowed:
        result.update({"ok": True, "action": "suppressed"})
        return result

    enqueue = enqueue_owner_report(state="running", timestamp=timestamp, dry_run=dry_run, detail="Main runtime alive and log activity fresh.")
    result["enqueue"] = enqueue
    result["ok"] = enqueue.get("ok", False)
    if not enqueue.get("ok"):
        return result

    # Nothing is actually sent during a dry run, so the throttle timestamp
    # must stay untouched; otherwise the next real report would be suppressed.
    record_send = not dry_run
    if record_send:
        mark_sent(bucket, "owner-report-enqueue", timestamp, {"report_id": enqueue.get("report_id")})
    if mode == "enqueue-and-drain":
        deliver = deliver_owner_report(report_id=enqueue.get("report_id"), dry_run=dry_run)
        result["deliver"] = deliver
        result["ok"] = result["ok"] and deliver.get("ok", False)
        if deliver.get("ok") and record_send:
            mark_sent(bucket, "owner-report-direct-delivery", timestamp, {"report_id": enqueue.get("report_id")})
    return result
|
|
|
|
|
|
def maybe_nudge_and_escalate(data: dict[str, Any], bucket: dict[str, Any], *, state: str, timestamp: str, dry_run: bool) -> dict[str, Any]:
    """Handle "stalled"/"idle": nudge the main agent, then escalate to the owner.

    Settings are read from ``WATCHDOG_B_STALLED_*`` when *state* is
    ``"stalled"`` and from ``WATCHDOG_B_IDLE_*`` otherwise, falling back to
    ``DEFAULTS``.

    Flow:

    1. Count this observation (``seen_count``).
    2. If the nudge interval has elapsed, nudge the main agent.
    3. If the owner mode permits it and ``seen_count`` reached the escalation
       threshold, enqueue an owner report, and deliver it directly when
       ``WATCHDOG_B_OWNER_DELIVERY_MODE`` is ``direct-discord``.

    The owner report uses the throttle decision taken before the nudge.
    Re-checking after the nudge has been recorded would always be throttled,
    because the nudge has just set ``last_sent_at`` to this very timestamp,
    and the owner would then only be contacted when the nudge failed.

    Args:
        data: The whole state document (not used by this function).
        bucket: Per-state entry; counters and send markers are stored in it.
        state: ``"stalled"`` or ``"idle"``.
        timestamp: ISO-8601 check time.
        dry_run: Passed to the helpers. A dry run neither records sends nor
            advances ``seen_count``, so it cannot affect later real runs.

    Returns:
        A result dict with at least ``"state"``, ``"route"``, ``"allowed"``,
        ``"reason"``, ``"seen_count"``, ``"owner_mode"``, ``"dry_run"`` and
        ``"ok"``.

    Raises:
        ValueError: if a numeric setting is not an integer or *timestamp* is
            not ISO-8601.
    """
    is_stalled = state == "stalled"
    env_prefix = "WATCHDOG_B_STALLED" if is_stalled else "WATCHDOG_B_IDLE"
    default_prefix = "stalled" if is_stalled else "idle"
    nudge_min = int(os.environ.get(
        f"{env_prefix}_NUDGE_MIN_INTERVAL_SECONDS",
        str(DEFAULTS[f"{default_prefix}_nudge_min_interval_seconds"]),
    ))
    escalation_after = int(os.environ.get(
        f"{env_prefix}_OWNER_ESCALATION_AFTER",
        str(DEFAULTS[f"{default_prefix}_owner_escalation_after"]),
    ))
    owner_mode = os.environ.get(f"{env_prefix}_OWNER_MODE", "escalate").lower()
    record = not dry_run

    seen_count = int(bucket.get("seen_count", 0)) + 1
    if record:
        bucket["seen_count"] = seen_count
    allowed, reason = should_send(bucket, nudge_min, datetime.fromisoformat(timestamp))
    result: dict[str, Any] = {
        "state": state,
        "route": "main-agent-then-owner",
        "allowed": allowed,
        "reason": reason,
        "seen_count": seen_count,
        "owner_mode": owner_mode,
        "dry_run": dry_run,
    }

    if allowed:
        nudge = call_main_agent(state=state, timestamp=timestamp, dry_run=dry_run)
        result["main_agent_nudge"] = nudge
        if nudge.get("ok") and record:
            mark_sent(bucket, "main-agent", timestamp, {"state": state})
        result["ok"] = nudge.get("ok", False)
    else:
        result.update({"ok": True, "action": "nudge-suppressed"})

    # NOTE(review): "always" and "escalate" are treated identically here (both
    # wait for the threshold); any other value, e.g. "never", disables owner
    # escalation. Confirm whether "always" was meant to skip the threshold.
    should_escalate = owner_mode in {"always", "escalate"} and seen_count >= escalation_after
    if not should_escalate:
        return result

    # Reuse the pre-nudge throttle decision (see docstring).
    owner_allowed, owner_reason = allowed, reason
    result["owner_escalation_gate"] = {"allowed": owner_allowed, "reason": owner_reason, "threshold": escalation_after}
    if not owner_allowed:
        return result

    detail = "Main agent was nudged repeatedly; please review whether manual intervention is needed."
    enqueue = enqueue_owner_report(state=state, timestamp=timestamp, dry_run=dry_run, detail=detail)
    result["owner_enqueue"] = enqueue
    result["ok"] = result.get("ok", True) and enqueue.get("ok", False)
    if not enqueue.get("ok"):
        return result

    if record:
        mark_sent(bucket, "owner-report-enqueue", timestamp, {"report_id": enqueue.get("report_id"), "state": state})
    owner_delivery_mode = os.environ.get("WATCHDOG_B_OWNER_DELIVERY_MODE", "enqueue-only").lower()
    result["owner_delivery_mode"] = owner_delivery_mode
    if owner_delivery_mode == "direct-discord":
        deliver = deliver_owner_report(report_id=enqueue.get("report_id"), dry_run=dry_run)
        result["owner_deliver"] = deliver
        result["ok"] = result.get("ok", True) and deliver.get("ok", False)
        if deliver.get("ok") and record:
            mark_sent(bucket, "owner-report-direct-delivery", timestamp, {"report_id": enqueue.get("report_id"), "state": state})
    return result
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point of the watchdog-b notification layer.

    Parses ``--state``, ``--timestamp`` and ``--dry-run``, runs the handler
    for the given state, stores the outcome in the persisted state, and
    prints the result as JSON on stdout.

    Returns:
        ``0`` when the result's ``"ok"`` flag is true, ``1`` otherwise.
        Invalid arguments end the process through argparse (exit status 2).
    """
    ap = argparse.ArgumentParser(description="Notification layer for watchdog-b")
    ap.add_argument("--state", required=True, choices=["running", "stalled", "idle"])
    ap.add_argument("--timestamp", default=now_iso())
    ap.add_argument("--dry-run", action="store_true")
    args = ap.parse_args()

    # The handlers parse the timestamp with datetime.fromisoformat(); reject a
    # malformed value here with a usage error rather than a traceback later.
    try:
        datetime.fromisoformat(args.timestamp)
    except ValueError:
        ap.error(f"--timestamp must be an ISO-8601 datetime, got {args.timestamp!r}")

    data = load_state()
    bucket = get_bucket(data, args.state)

    if args.state == "running":
        result = maybe_running_report(data, bucket, args.timestamp, args.dry_run)
    else:
        result = maybe_nudge_and_escalate(data, bucket, state=args.state, timestamp=args.timestamp, dry_run=args.dry_run)

    bucket["last_seen_at"] = args.timestamp
    bucket["last_result"] = result
    save_state(data)
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 0 if result.get("ok", False) else 1
|
|
|
|
|
|
# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|