#!/usr/bin/env python3 from __future__ import annotations import argparse import json import os import shutil import subprocess import sys from datetime import datetime, timezone from pathlib import Path from typing import Any SCRIPT_DIR = Path(__file__).resolve().parent SKILL_DIR = SCRIPT_DIR.parent WORKSPACE = Path(os.environ.get("WATCHDOG_B_WORKSPACE", str(Path.home() / ".openclaw" / "workspace"))) CONFIG_FILE = Path(os.environ.get("WATCHDOG_B_CONFIG_FILE", str(Path.home() / ".config" / "openclaw" / "watchdog-b.env"))) LIVE_SCRIPT_DIR = Path(os.environ.get("WATCHDOG_B_LIVE_SCRIPT_DIR", str(WORKSPACE / "scripts" / "watchdog-b"))) def load_env_file(path: Path) -> None: if not path.exists(): return for raw_line in path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() if not key: continue value = value.strip() if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")): value = value[1:-1] os.environ.setdefault(key, value) load_env_file(CONFIG_FILE) STATE_DIR = Path(os.environ.get("WATCHDOG_B_ARTIFACT_DIR", str(WORKSPACE / "state" / "watchdog-b"))) NOTIFY_STATE_PATH = STATE_DIR / "notify-state.json" OWNER_PRODUCER = Path(os.environ.get("WATCHDOG_B_OWNER_PRODUCER", str(SCRIPT_DIR / "owner_report_producer.py"))) OWNER_DRIVER = Path(os.environ.get("WATCHDOG_B_OWNER_DRIVER", str(SCRIPT_DIR / "owner_report_driver.py"))) PYTHON_BIN = os.environ.get("WATCHDOG_B_PYTHON_BIN", sys.executable or "python3") WATCHDOG_OWNER_REPORT_CHANNEL = os.environ.get("WATCHDOG_B_OWNER_REPORT_CHANNEL", "discord") WATCHDOG_OWNER_REPORT_TARGET = os.environ.get("WATCHDOG_B_OWNER_REPORT_TARGET", "channel:REPLACE_ME") WATCHDOG_MAIN_AGENT_ID = os.environ.get("WATCHDOG_B_MAIN_AGENT_ID", "").strip() HOSTNAME = os.uname().nodename UTC = timezone.utc RUNTIME_PROBE = Path(os.environ.get("WATCHDOG_B_RUNTIME_PROBE", str(SCRIPT_DIR / "openclaw_runtime_probe.py"))) RUNTIME_CACHE: dict[str, Path] | None = None DEFAULTS = { "running_min_interval_seconds": 3600, "stalled_nudge_min_interval_seconds": 900, "idle_nudge_min_interval_seconds": 1800, "stalled_owner_escalation_after": 2, "idle_owner_escalation_after": 2, } def now_iso() -> str: return datetime.now().astimezone().isoformat(timespec="seconds") def path_or_none(value: str | None) -> Path | None: if not value: return None return Path(value).expanduser() def detect_runtime_paths() -> dict[str, Path]: global RUNTIME_CACHE if RUNTIME_CACHE is not None: return RUNTIME_CACHE node_bin = path_or_none(os.environ.get("WATCHDOG_B_NODE_BIN")) openclaw_mjs = path_or_none(os.environ.get("WATCHDOG_B_OPENCLAW_MJS")) openclaw_entry = path_or_none(os.environ.get("WATCHDOG_B_OPENCLAW_ENTRY")) if node_bin and node_bin.exists() and os.access(node_bin, os.X_OK) and openclaw_mjs and openclaw_mjs.is_file() and openclaw_entry and openclaw_entry.is_file(): RUNTIME_CACHE = { "node": node_bin, "openclaw_mjs": openclaw_mjs, "openclaw_entry": openclaw_entry, } return RUNTIME_CACHE if RUNTIME_PROBE.exists(): proc = subprocess.run([PYTHON_BIN, str(RUNTIME_PROBE)], text=True, capture_output=True) if proc.returncode == 0: payload = json.loads(proc.stdout) detected = payload.get("detected", {}) RUNTIME_CACHE = { "node": Path(detected["node"]), "openclaw_mjs": Path(detected["openclaw_mjs"]), "openclaw_entry": Path(detected["openclaw_entry"]), } return RUNTIME_CACHE node_which = shutil.which("node") if node_which: node_bin = Path(node_which) missing = [] if not node_bin or not node_bin.exists(): missing.append("WATCHDOG_B_NODE_BIN") if not openclaw_mjs or not openclaw_mjs.is_file(): missing.append("WATCHDOG_B_OPENCLAW_MJS") if not openclaw_entry or not openclaw_entry.is_file(): missing.append("WATCHDOG_B_OPENCLAW_ENTRY") raise RuntimeError( "Unable to auto-detect watchdog runtime paths. Missing: " + ", ".join(missing) ) def load_state() -> dict[str, Any]: if NOTIFY_STATE_PATH.exists(): try: return json.loads(NOTIFY_STATE_PATH.read_text(encoding="utf-8")) except Exception: pass return {"events": {}} def save_state(data: dict[str, Any]) -> None: STATE_DIR.mkdir(parents=True, exist_ok=True) NOTIFY_STATE_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") def event_bucket(state: str) -> dict[str, Any]: data = load_state() events = data.setdefault("events", {}) bucket = events.setdefault(state, {}) return data def get_bucket(data: dict[str, Any], state: str) -> dict[str, Any]: events = data.setdefault("events", {}) return events.setdefault(state, {}) def should_send(bucket: dict[str, Any], min_interval_seconds: int, timestamp: datetime) -> tuple[bool, str]: last_sent = bucket.get("last_sent_at") if not last_sent: return True, "first-send" try: prev = datetime.fromisoformat(last_sent) except Exception: return True, "state-corrupt-reset" elapsed = (timestamp - prev).total_seconds() if elapsed >= min_interval_seconds: return True, f"interval-ok:{int(elapsed)}s" return False, f"throttled:{int(elapsed)}s<{min_interval_seconds}s" def mark_sent(bucket: dict[str, Any], channel: str, timestamp: str, detail: dict[str, Any] | None = None) -> None: bucket["last_sent_at"] = timestamp bucket["last_channel"] = channel bucket["send_count"] = int(bucket.get("send_count", 0)) + 1 bucket["last_detail"] = detail or {} def build_owner_message(state: str, timestamp: str, detail: str) -> dict[str, str]: emoji_default = { "running": "✅", "stalled": "⚠️", "idle": "🛑", } summary_default = { "running": "主程序仍在運行", "stalled": "主程序疑似卡住", "idle": "主程序目前未運行", } progress_default = { "running": "running", "stalled": "stalled", "idle": "idle", } status_default = { "running": "normal", "stalled": "needs-attention", "idle": "needs-attention", } source_default = { "running": "watchdog-b-running", "stalled": "watchdog-b-stalled-escalation", "idle": "watchdog-b-idle-escalation", } detail_default = { "running": f"checked_at={timestamp} host={HOSTNAME}", "stalled": f"checked_at={timestamp} host={HOSTNAME}; stale activity detected while process still looked alive", "idle": f"checked_at={timestamp} host={HOSTNAME}; no active main runtime detected", } return { "progress": os.environ.get(f"WATCHDOG_B_{state.upper()}_PROGRESS_LABEL", progress_default[state]), "done": f"{os.environ.get(f'WATCHDOG_B_{state.upper()}_EMOJI', emoji_default[state])} {os.environ.get(f'WATCHDOG_B_{state.upper()}_SUMMARY', summary_default[state])}", "next": detail or os.environ.get(f"WATCHDOG_B_{state.upper()}_DETAIL", detail_default[state]), "status": os.environ.get(f"WATCHDOG_B_{state.upper()}_STATUS", status_default[state]), "source": os.environ.get(f"WATCHDOG_B_{state.upper()}_SOURCE", source_default[state]), } def enqueue_owner_report(*, state: str, timestamp: str, dry_run: bool, detail: str) -> dict[str, Any]: msg = build_owner_message(state, timestamp, detail) report_id = f"watchdog-b-{state}-{datetime.now(UTC).strftime('%Y%m%dT%H%M%SZ')}" cmd = [ PYTHON_BIN, str(OWNER_PRODUCER), "--team", "watchdog-b", "--worker", HOSTNAME, "--task-id", f"openclaw-main-{state}", "--progress", msg["progress"], "--done", msg["done"], "--next", msg["next"], "--status", msg["status"], "--source", msg["source"], "--report-id", report_id, ] if dry_run: cmd.append("--dry-run") proc = subprocess.run(cmd, text=True, capture_output=True) result = { "kind": "owner-report-enqueue", "ok": proc.returncode == 0, "command": cmd, "exit_code": proc.returncode, "stdout": proc.stdout, "stderr": proc.stderr, "report_id": report_id, "dry_run": dry_run, } if proc.returncode == 0 and not dry_run: result["pending_path"] = str(Path.home() / ".clawteam" / "owner-reports" / "pending" / f"{report_id}.md") return result def build_owner_send_cmd() -> str: runtime = detect_runtime_paths() return ( f'"{runtime["node"]}" "{runtime["openclaw_entry"]}" message send ' f'--channel {WATCHDOG_OWNER_REPORT_CHANNEL} ' f"--target '{WATCHDOG_OWNER_REPORT_TARGET}' " f'--message "$OWNER_REPORT_MESSAGE"' ) def deliver_owner_report(*, report_id: str, dry_run: bool) -> dict[str, Any]: send_cmd = build_owner_send_cmd() cmd = [PYTHON_BIN, str(OWNER_DRIVER), report_id, "--send-cmd", send_cmd] if dry_run: cmd.append("--dry-run") proc = subprocess.run(cmd, text=True, capture_output=True) return { "kind": "owner-report-direct-delivery", "ok": proc.returncode == 0, "command": cmd, "send_cmd": send_cmd, "exit_code": proc.returncode, "stdout": proc.stdout, "stderr": proc.stderr, "dry_run": dry_run, "report_id": report_id, "target_channel": WATCHDOG_OWNER_REPORT_CHANNEL, "target": WATCHDOG_OWNER_REPORT_TARGET, } def call_main_agent(*, state: str, timestamp: str, dry_run: bool) -> dict[str, Any]: message = ( f"[watchdog-b][{state}] {timestamp}\n" f"Host: {HOSTNAME}\n" f"Please confirm current task state, whether progress is blocked, and whether owner-facing escalation is needed." ) if not WATCHDOG_MAIN_AGENT_ID: return { "kind": "main-agent-nudge", "ok": True, "skipped": True, "reason": "WATCHDOG_B_MAIN_AGENT_ID not configured", "dry_run": dry_run, "message": message, } try: runtime = detect_runtime_paths() except Exception as exc: return { "kind": "main-agent-nudge", "ok": False, "dry_run": dry_run, "error": str(exc), "message": message, } cmd = [ str(runtime["node"]), str(runtime["openclaw_mjs"]), "agent", "--agent", WATCHDOG_MAIN_AGENT_ID, "--message", message, "--timeout", os.environ.get("WATCHDOG_B_MAIN_AGENT_TIMEOUT", "120"), ] if dry_run: return {"kind": "main-agent-nudge", "ok": True, "dry_run": True, "command": cmd, "message": message} try: proc = subprocess.run(cmd, text=True, capture_output=True, timeout=int(os.environ.get("WATCHDOG_B_MAIN_AGENT_TIMEOUT", "120")) + 10) return { "kind": "main-agent-nudge", "ok": proc.returncode == 0, "dry_run": False, "command": cmd, "exit_code": proc.returncode, "stdout": proc.stdout, "stderr": proc.stderr, "message": message, } except subprocess.TimeoutExpired as e: return { "kind": "main-agent-nudge", "ok": False, "dry_run": False, "command": cmd, "timeout": True, "stdout": e.stdout, "stderr": e.stderr, "message": message, } def maybe_running_report(data: dict[str, Any], bucket: dict[str, Any], timestamp: str, dry_run: bool) -> dict[str, Any]: mode = os.environ.get("WATCHDOG_B_RUNNING_REPORT_MODE", "manual").lower() min_interval = int(os.environ.get("WATCHDOG_B_RUNNING_REPORT_MIN_INTERVAL_SECONDS", str(DEFAULTS["running_min_interval_seconds"]))) allowed, reason = should_send(bucket, min_interval, datetime.fromisoformat(timestamp)) result: dict[str, Any] = { "state": "running", "route": "owner-report", "mode": mode, "allowed": allowed, "reason": reason, "dry_run": dry_run, } if mode not in {"manual", "enqueue", "enqueue-and-drain"}: result.update({"ok": False, "error": f"unsupported running mode: {mode}"}) return result if mode == "manual": result.update({ "ok": True, "action": "manual-only", "hint": "set WATCHDOG_B_RUNNING_REPORT_MODE=enqueue to create a real pending item, or enqueue-and-drain to enqueue and directly deliver it to Discord", }) return result if not allowed: result.update({"ok": True, "action": "suppressed"}) return result enqueue = enqueue_owner_report(state="running", timestamp=timestamp, dry_run=dry_run, detail="Main runtime alive and log activity fresh.") result["enqueue"] = enqueue result["ok"] = enqueue.get("ok", False) if enqueue.get("ok"): mark_sent(bucket, "owner-report-enqueue", timestamp, {"report_id": enqueue.get("report_id")}) if mode == "enqueue-and-drain" and enqueue.get("ok"): deliver = deliver_owner_report(report_id=enqueue.get("report_id"), dry_run=dry_run) result["deliver"] = deliver result["ok"] = result["ok"] and deliver.get("ok", False) if deliver.get("ok"): mark_sent(bucket, "owner-report-direct-delivery", timestamp, {"report_id": enqueue.get("report_id")}) return result def maybe_nudge_and_escalate(data: dict[str, Any], bucket: dict[str, Any], *, state: str, timestamp: str, dry_run: bool) -> dict[str, Any]: is_stalled = state == "stalled" nudge_min = int(os.environ.get( "WATCHDOG_B_STALLED_NUDGE_MIN_INTERVAL_SECONDS" if is_stalled else "WATCHDOG_B_IDLE_NUDGE_MIN_INTERVAL_SECONDS", str(DEFAULTS["stalled_nudge_min_interval_seconds"] if is_stalled else DEFAULTS["idle_nudge_min_interval_seconds"]), )) escalation_after = int(os.environ.get( "WATCHDOG_B_STALLED_OWNER_ESCALATION_AFTER" if is_stalled else "WATCHDOG_B_IDLE_OWNER_ESCALATION_AFTER", str(DEFAULTS["stalled_owner_escalation_after"] if is_stalled else DEFAULTS["idle_owner_escalation_after"]), )) owner_mode = os.environ.get( "WATCHDOG_B_STALLED_OWNER_MODE" if is_stalled else "WATCHDOG_B_IDLE_OWNER_MODE", "escalate", ).lower() bucket["seen_count"] = int(bucket.get("seen_count", 0)) + 1 allowed, reason = should_send(bucket, nudge_min, datetime.fromisoformat(timestamp)) result: dict[str, Any] = { "state": state, "route": "main-agent-then-owner", "allowed": allowed, "reason": reason, "seen_count": bucket["seen_count"], "owner_mode": owner_mode, "dry_run": dry_run, } if allowed: nudge = call_main_agent(state=state, timestamp=timestamp, dry_run=dry_run) result["main_agent_nudge"] = nudge if nudge.get("ok"): mark_sent(bucket, "main-agent", timestamp, {"state": state}) result["ok"] = nudge.get("ok", False) else: result.update({"ok": True, "action": "nudge-suppressed"}) should_escalate = owner_mode in {"always", "escalate"} and bucket["seen_count"] >= escalation_after if owner_mode == "never": should_escalate = False if should_escalate: owner_allowed, owner_reason = should_send(bucket, nudge_min, datetime.fromisoformat(timestamp)) result["owner_escalation_gate"] = {"allowed": owner_allowed, "reason": owner_reason, "threshold": escalation_after} if owner_allowed: detail = "Main agent was nudged repeatedly; please review whether manual intervention is needed." enqueue = enqueue_owner_report(state=state, timestamp=timestamp, dry_run=dry_run, detail=detail) result["owner_enqueue"] = enqueue result["ok"] = result.get("ok", True) and enqueue.get("ok", False) if enqueue.get("ok"): mark_sent(bucket, "owner-report-enqueue", timestamp, {"report_id": enqueue.get("report_id"), "state": state}) owner_delivery_mode = os.environ.get( "WATCHDOG_B_OWNER_DELIVERY_MODE", "enqueue-only", ).lower() result["owner_delivery_mode"] = owner_delivery_mode if owner_delivery_mode == "direct-discord": deliver = deliver_owner_report(report_id=enqueue.get("report_id"), dry_run=dry_run) result["owner_deliver"] = deliver result["ok"] = result.get("ok", True) and deliver.get("ok", False) if deliver.get("ok"): mark_sent(bucket, "owner-report-direct-delivery", timestamp, {"report_id": enqueue.get("report_id"), "state": state}) return result def main() -> int: ap = argparse.ArgumentParser(description="Notification layer for watchdog-b") ap.add_argument("--state", required=True, choices=["running", "stalled", "idle"]) ap.add_argument("--timestamp", default=now_iso()) ap.add_argument("--dry-run", action="store_true") args = ap.parse_args() data = load_state() bucket = get_bucket(data, args.state) if args.state == "running": result = maybe_running_report(data, bucket, args.timestamp, args.dry_run) else: result = maybe_nudge_and_escalate(data, bucket, state=args.state, timestamp=args.timestamp, dry_run=args.dry_run) bucket["last_seen_at"] = args.timestamp bucket["last_result"] = result save_state(data) print(json.dumps(result, ensure_ascii=False, indent=2)) return 0 if result.get("ok", False) else 1 if __name__ == "__main__": raise SystemExit(main())