feat(reporting-governance): add package-first portability smoke
This commit is contained in:
433
plugins/reporting-governance/scripts/long_task_watchdog.mjs
Executable file
433
plugins/reporting-governance/scripts/long_task_watchdog.mjs
Executable file
@@ -0,0 +1,433 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
import crypto from 'node:crypto';
|
||||
|
||||
const ROOT_DIR = path.resolve(import.meta.dirname, '..');
|
||||
const DEFAULT_STATE_PATH = path.join(ROOT_DIR, 'memory', 'watchdog-state.json');
|
||||
const DEFAULT_EVIDENCE_DIR = path.join(ROOT_DIR, 'state', 'long-task-watchdog');
|
||||
const DEFAULT_EVENT_DIR = path.join(ROOT_DIR, 'state', 'long-task-watchdog-events');
|
||||
const DEFAULT_NOTIFICATION_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-queue');
|
||||
|
||||
function parseArgs(argv) {
|
||||
const args = {
|
||||
compact: false,
|
||||
state: DEFAULT_STATE_PATH,
|
||||
now: null,
|
||||
evidenceDir: DEFAULT_EVIDENCE_DIR,
|
||||
eventDir: DEFAULT_EVENT_DIR,
|
||||
notificationDir: DEFAULT_NOTIFICATION_DIR,
|
||||
writeState: false,
|
||||
help: false,
|
||||
};
|
||||
|
||||
for (let i = 0; i < argv.length; i += 1) {
|
||||
const token = argv[i];
|
||||
if (token === '--compact') {
|
||||
args.compact = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--write-state') {
|
||||
args.writeState = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--help' || token === '-h') {
|
||||
args.help = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--state') {
|
||||
args.state = argv[i + 1] ?? args.state;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--state=')) {
|
||||
args.state = token.slice('--state='.length) || args.state;
|
||||
continue;
|
||||
}
|
||||
if (token === '--now') {
|
||||
args.now = argv[i + 1] ?? null;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--now=')) {
|
||||
args.now = token.slice('--now='.length) || null;
|
||||
continue;
|
||||
}
|
||||
if (token === '--evidence-dir') {
|
||||
args.evidenceDir = argv[i + 1] ?? args.evidenceDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--evidence-dir=')) {
|
||||
args.evidenceDir = token.slice('--evidence-dir='.length) || args.evidenceDir;
|
||||
continue;
|
||||
}
|
||||
if (token === '--event-dir') {
|
||||
args.eventDir = argv[i + 1] ?? args.eventDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--event-dir=')) {
|
||||
args.eventDir = token.slice('--event-dir='.length) || args.eventDir;
|
||||
continue;
|
||||
}
|
||||
if (token === '--notification-dir') {
|
||||
args.notificationDir = argv[i + 1] ?? args.notificationDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--notification-dir=')) {
|
||||
args.notificationDir = token.slice('--notification-dir='.length) || args.notificationDir;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
function printHelp() {
|
||||
process.stdout.write([
|
||||
'Usage: node scripts/long_task_watchdog.mjs [--compact] [--write-state] [--state <path>] [--now <iso>] [--evidence-dir <path>] [--event-dir <path>] [--notification-dir <path>]',
|
||||
'',
|
||||
'Minimal file-backed long-task watchdog runner with operator-notification queue output.',
|
||||
].join('\n') + '\n');
|
||||
}
|
||||
|
||||
function parseJsonFile(filePath) {
|
||||
const raw = fs.readFileSync(filePath, 'utf8');
|
||||
return JSON.parse(raw);
|
||||
}
|
||||
|
||||
function parseTime(value) {
|
||||
if (typeof value !== 'string' || value.length === 0) return null;
|
||||
const timestamp = Date.parse(value);
|
||||
return Number.isNaN(timestamp) ? null : timestamp;
|
||||
}
|
||||
|
||||
function toIso(value) {
|
||||
return new Date(value).toISOString();
|
||||
}
|
||||
|
||||
function toSafeName(value) {
|
||||
return String(value || 'watchdog')
|
||||
.replace(/[^a-zA-Z0-9._-]+/g, '-')
|
||||
.replace(/^-+|-+$/g, '')
|
||||
.slice(0, 80) || 'watchdog';
|
||||
}
|
||||
|
||||
function makeEventId(prefix) {
|
||||
return `${prefix}_${crypto.randomUUID()}`;
|
||||
}
|
||||
|
||||
function evaluateWatchdog(watchdog, nowMs) {
|
||||
const intervalMinutes = Number.isFinite(watchdog?.intervalMinutes)
|
||||
? watchdog.intervalMinutes
|
||||
: Number.parseInt(String(watchdog?.intervalMinutes ?? '0'), 10);
|
||||
const intervalMs = intervalMinutes > 0 ? intervalMinutes * 60 * 1000 : 0;
|
||||
const milestoneMs = parseTime(watchdog?.lastMilestoneAt);
|
||||
const lastAlertMs = parseTime(watchdog?.lastAlertAt);
|
||||
const active = watchdog?.status === 'active';
|
||||
|
||||
if (!active) {
|
||||
return {
|
||||
id: watchdog?.id ?? null,
|
||||
active: false,
|
||||
overdue: false,
|
||||
action: 'skip_inactive',
|
||||
reason: 'watchdog is not active',
|
||||
};
|
||||
}
|
||||
|
||||
if (!intervalMs || milestoneMs === null) {
|
||||
return {
|
||||
id: watchdog?.id ?? null,
|
||||
active: true,
|
||||
overdue: false,
|
||||
action: 'invalid_contract',
|
||||
reason: 'intervalMinutes or lastMilestoneAt is missing/invalid',
|
||||
};
|
||||
}
|
||||
|
||||
const dueAtMs = milestoneMs + intervalMs;
|
||||
const overdue = nowMs >= dueAtMs;
|
||||
|
||||
if (!overdue) {
|
||||
return {
|
||||
id: watchdog?.id ?? null,
|
||||
active: true,
|
||||
overdue: false,
|
||||
action: 'within_interval',
|
||||
reason: 'last milestone is still within interval',
|
||||
dueAt: toIso(dueAtMs),
|
||||
minutesOverdue: 0,
|
||||
};
|
||||
}
|
||||
|
||||
const lastAlertStillFresh = lastAlertMs !== null && lastAlertMs >= dueAtMs;
|
||||
if (lastAlertStillFresh) {
|
||||
return {
|
||||
id: watchdog?.id ?? null,
|
||||
active: true,
|
||||
overdue: true,
|
||||
action: 'already_alerted_this_interval',
|
||||
reason: 'lastAlertAt already covers current overdue interval',
|
||||
dueAt: toIso(dueAtMs),
|
||||
minutesOverdue: Math.floor((nowMs - dueAtMs) / 60000),
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
id: watchdog?.id ?? null,
|
||||
active: true,
|
||||
overdue: true,
|
||||
action: 'emit_external_evidence',
|
||||
reason: 'active watchdog is overdue and has not been externally evidenced for this interval',
|
||||
dueAt: toIso(dueAtMs),
|
||||
minutesOverdue: Math.floor((nowMs - dueAtMs) / 60000),
|
||||
};
|
||||
}
|
||||
|
||||
function ensureDir(dirPath) {
|
||||
fs.mkdirSync(dirPath, { recursive: true });
|
||||
}
|
||||
|
||||
function writeJsonFile(dirPath, fileName, payload) {
|
||||
ensureDir(dirPath);
|
||||
const filePath = path.join(dirPath, fileName);
|
||||
fs.writeFileSync(filePath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8');
|
||||
return filePath;
|
||||
}
|
||||
|
||||
function buildBaseRefs(evidencePath) {
|
||||
return [
|
||||
{
|
||||
kind: 'runtime_artifact',
|
||||
path: evidencePath,
|
||||
label: 'watchdog_evidence',
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
function writeEvidence(evidenceDir, watchdog, evaluation, nowIso) {
|
||||
const fileName = `${nowIso.replace(/[:]/g, '').replace(/\.\d{3}Z$/, 'Z')}-${toSafeName(watchdog.id)}.json`;
|
||||
const payload = {
|
||||
generatedAt: nowIso,
|
||||
tool: 'long_task_watchdog',
|
||||
watchdog: {
|
||||
id: watchdog.id,
|
||||
task: watchdog.task,
|
||||
ownerSession: watchdog.ownerSession ?? null,
|
||||
ownerSessionKey: watchdog.ownerSessionKey ?? null,
|
||||
reportChannel: watchdog.reportChannel ?? watchdog.channel ?? null,
|
||||
reportTarget: watchdog.reportTarget ?? watchdog.target ?? null,
|
||||
intervalMinutes: watchdog.intervalMinutes,
|
||||
lastMilestoneAt: watchdog.lastMilestoneAt ?? null,
|
||||
lastAlertAt: watchdog.lastAlertAt ?? null,
|
||||
},
|
||||
evaluation,
|
||||
nextExpectedExternalAction: [
|
||||
'nudge owner session',
|
||||
'report owner-visible checkpoint',
|
||||
'or respawn / inspect locally if owner appears stalled',
|
||||
],
|
||||
};
|
||||
return writeJsonFile(evidenceDir, fileName, payload);
|
||||
}
|
||||
|
||||
function buildWatchdogEvent(watchdog, evaluation, nowIso, evidencePath) {
|
||||
const eventId = makeEventId('evt');
|
||||
return {
|
||||
event_id: eventId,
|
||||
event_type: 'watchdog_fired',
|
||||
runtime: 'openclaw',
|
||||
adapter_version: '1.1.0',
|
||||
agent_id: watchdog.ownerSessionKey ?? watchdog.ownerAgentId ?? 'agent:unknown',
|
||||
task_id: watchdog.id,
|
||||
correlation_id: `watchdog:${watchdog.id}`,
|
||||
timestamp: nowIso,
|
||||
payload: {
|
||||
watchdog_type: 'long_task_overdue',
|
||||
trigger_reason: evaluation.reason,
|
||||
triggered_at: nowIso,
|
||||
policy_id: 'long-task-watchdog-overdue-v1',
|
||||
severity: 'critical',
|
||||
due_at: evaluation.dueAt ?? null,
|
||||
minutes_overdue: evaluation.minutesOverdue ?? null,
|
||||
},
|
||||
evidence_refs: buildBaseRefs(evidencePath),
|
||||
operator_context: {
|
||||
channel: watchdog.reportChannel ?? watchdog.channel ?? null,
|
||||
operator_id: watchdog.reportTarget ?? watchdog.target ?? null,
|
||||
reporting_mode: 'watchdog',
|
||||
silent_task: true,
|
||||
watchdog_policy_id: 'long-task-watchdog-overdue-v1',
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function writeEvent(eventDir, watchdog, nowIso, event) {
|
||||
const fileName = `${nowIso.replace(/[:]/g, '').replace(/\.\d{3}Z$/, 'Z')}-${toSafeName(watchdog.id)}-watchdog-fired.json`;
|
||||
return writeJsonFile(eventDir, fileName, event);
|
||||
}
|
||||
|
||||
function buildNotificationMessage(watchdog, evaluation, nowIso) {
|
||||
const lines = [
|
||||
'【Watchdog 逾時告警】',
|
||||
`task: ${watchdog.task ?? watchdog.id ?? 'unknown-task'}`,
|
||||
`watchdog: ${watchdog.id ?? 'unknown-watchdog'}`,
|
||||
`dueAt: ${evaluation.dueAt ?? 'unknown'}`,
|
||||
`minutesOverdue: ${evaluation.minutesOverdue ?? 'unknown'}`,
|
||||
`lastMilestoneAt: ${watchdog.lastMilestoneAt ?? 'unknown'}`,
|
||||
`triggeredAt: ${nowIso}`,
|
||||
`ownerSessionKey: ${watchdog.ownerSessionKey ?? watchdog.ownerSession ?? 'unknown'}`,
|
||||
'requiredAction: 請立即對 owner/operator 發出可見更新,或檢查/重派 stalled task。',
|
||||
];
|
||||
return lines.join('\n');
|
||||
}
|
||||
|
||||
function buildNotificationQueueItem(watchdog, evaluation, nowIso, evidencePath, eventPath, eventId) {
|
||||
const notificationId = makeEventId('notify');
|
||||
const channel = watchdog.reportChannel ?? watchdog.channel ?? null;
|
||||
const target = watchdog.reportTarget ?? watchdog.target ?? null;
|
||||
return {
|
||||
notification_id: notificationId,
|
||||
kind: 'notify_operator',
|
||||
status: 'pending',
|
||||
created_at: nowIso,
|
||||
source_tool: 'long_task_watchdog',
|
||||
severity: 'critical',
|
||||
operator_notice: {
|
||||
required: true,
|
||||
channel,
|
||||
target,
|
||||
urgency: 'critical',
|
||||
message: buildNotificationMessage(watchdog, evaluation, nowIso),
|
||||
must_reference: ['watchdog_fired', 'forced_operator_update'],
|
||||
},
|
||||
dispatch_hint: {
|
||||
tool: 'message.send',
|
||||
channel,
|
||||
target,
|
||||
message: buildNotificationMessage(watchdog, evaluation, nowIso),
|
||||
},
|
||||
governance: {
|
||||
task_id: watchdog.id,
|
||||
correlation_id: `watchdog:${watchdog.id}`,
|
||||
trigger_event_id: eventId,
|
||||
trigger_event_type: 'watchdog_fired',
|
||||
decision: 'force_checkpoint',
|
||||
policy_id: 'long-task-watchdog-overdue-v1',
|
||||
reason: evaluation.reason,
|
||||
},
|
||||
evidence_refs: [
|
||||
...buildBaseRefs(evidencePath),
|
||||
{
|
||||
kind: 'runtime_artifact',
|
||||
path: eventPath,
|
||||
label: 'watchdog_event',
|
||||
},
|
||||
],
|
||||
blocked_gap: channel && target
|
||||
? null
|
||||
: 'watchdog state does not define reportChannel/reportTarget, so dispatcher target is incomplete',
|
||||
};
|
||||
}
|
||||
|
||||
function writeNotificationQueueItem(notificationDir, watchdog, nowIso, queueItem) {
|
||||
const fileName = `${nowIso.replace(/[:]/g, '').replace(/\.\d{3}Z$/, 'Z')}-${toSafeName(watchdog.id)}-notify-operator.json`;
|
||||
return writeJsonFile(notificationDir, fileName, queueItem);
|
||||
}
|
||||
|
||||
function main() {
|
||||
const args = parseArgs(process.argv.slice(2));
|
||||
if (args.help) {
|
||||
printHelp();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
const nowMs = args.now ? parseTime(args.now) : Date.now();
|
||||
if (nowMs === null) {
|
||||
process.stderr.write('Invalid --now value\n');
|
||||
process.exit(1);
|
||||
}
|
||||
const nowIso = toIso(nowMs);
|
||||
|
||||
const state = parseJsonFile(args.state);
|
||||
const watchdogs = Array.isArray(state.watchdogs) ? state.watchdogs : [];
|
||||
const evaluations = watchdogs.map((watchdog) => ({
|
||||
watchdogId: watchdog?.id ?? null,
|
||||
...evaluateWatchdog(watchdog, nowMs),
|
||||
}));
|
||||
|
||||
const evidenceWrites = [];
|
||||
const eventWrites = [];
|
||||
const notificationWrites = [];
|
||||
const nextWatchdogs = watchdogs.map((watchdog, index) => {
|
||||
const evaluation = evaluations[index];
|
||||
if (evaluation.action !== 'emit_external_evidence') {
|
||||
return watchdog;
|
||||
}
|
||||
const evidencePath = writeEvidence(args.evidenceDir, watchdog, evaluation, nowIso);
|
||||
evidenceWrites.push({ watchdogId: watchdog.id, path: evidencePath });
|
||||
|
||||
const event = buildWatchdogEvent(watchdog, evaluation, nowIso, evidencePath);
|
||||
const eventPath = writeEvent(args.eventDir, watchdog, nowIso, event);
|
||||
eventWrites.push({ watchdogId: watchdog.id, path: eventPath, eventId: event.event_id, eventType: event.event_type });
|
||||
|
||||
const notification = buildNotificationQueueItem(watchdog, evaluation, nowIso, evidencePath, eventPath, event.event_id);
|
||||
const notificationPath = writeNotificationQueueItem(args.notificationDir, watchdog, nowIso, notification);
|
||||
notificationWrites.push({
|
||||
watchdogId: watchdog.id,
|
||||
path: notificationPath,
|
||||
notificationId: notification.notification_id,
|
||||
channel: notification.operator_notice.channel,
|
||||
target: notification.operator_notice.target,
|
||||
dispatchReady: notification.blocked_gap === null,
|
||||
blockedGap: notification.blocked_gap,
|
||||
});
|
||||
|
||||
return {
|
||||
...watchdog,
|
||||
lastAlertAt: nowIso,
|
||||
lastObservedActivityAt: watchdog.lastObservedActivityAt ?? watchdog.lastMilestoneAt ?? null,
|
||||
lastNudgeAt: nowIso,
|
||||
};
|
||||
});
|
||||
|
||||
if (args.writeState) {
|
||||
const nextState = {
|
||||
...state,
|
||||
watchdogs: nextWatchdogs,
|
||||
};
|
||||
fs.writeFileSync(args.state, `${JSON.stringify(nextState, null, 2)}\n`, 'utf8');
|
||||
}
|
||||
|
||||
const response = {
|
||||
ok: true,
|
||||
tool: 'long_task_watchdog',
|
||||
version: 'mvp-v2',
|
||||
statePath: path.resolve(args.state),
|
||||
evidenceDir: path.resolve(args.evidenceDir),
|
||||
eventDir: path.resolve(args.eventDir),
|
||||
notificationDir: path.resolve(args.notificationDir),
|
||||
now: nowIso,
|
||||
writeState: args.writeState,
|
||||
result: {
|
||||
activeCount: watchdogs.filter((item) => item?.status === 'active').length,
|
||||
overdueCount: evaluations.filter((item) => item.overdue === true).length,
|
||||
emittedCount: evidenceWrites.length,
|
||||
eventCount: eventWrites.length,
|
||||
notificationCount: notificationWrites.length,
|
||||
evaluations,
|
||||
evidenceWrites,
|
||||
eventWrites,
|
||||
notificationWrites,
|
||||
},
|
||||
};
|
||||
|
||||
process.stdout.write(`${JSON.stringify(response, null, args.compact ? 0 : 2)}\n`);
|
||||
}
|
||||
|
||||
main();
|
||||
@@ -0,0 +1,344 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
import { spawnSync } from 'node:child_process';
|
||||
|
||||
const ROOT_DIR = path.resolve(import.meta.dirname, '..');
|
||||
const DEFAULT_SPOOL_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-dispatch-spool');
|
||||
const DEFAULT_QUEUE_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-queue');
|
||||
const DEFAULT_RECEIPT_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-bridge-receipts');
|
||||
const DEFAULT_DISPATCHER_SCRIPT = path.join(ROOT_DIR, 'scripts', 'operator_notify_dispatcher.mjs');
|
||||
|
||||
function parseArgs(argv) {
|
||||
const args = {
|
||||
spoolDir: DEFAULT_SPOOL_DIR,
|
||||
queueDir: DEFAULT_QUEUE_DIR,
|
||||
receiptDir: DEFAULT_RECEIPT_DIR,
|
||||
dispatcherScript: DEFAULT_DISPATCHER_SCRIPT,
|
||||
now: null,
|
||||
compact: false,
|
||||
help: false,
|
||||
dryRun: false,
|
||||
senderCommand: null,
|
||||
};
|
||||
|
||||
for (let i = 0; i < argv.length; i += 1) {
|
||||
const token = argv[i];
|
||||
if (token === '--compact') {
|
||||
args.compact = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--help' || token === '-h') {
|
||||
args.help = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--dry-run') {
|
||||
args.dryRun = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--spool-dir') {
|
||||
args.spoolDir = argv[i + 1] ?? args.spoolDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--spool-dir=')) {
|
||||
args.spoolDir = token.slice('--spool-dir='.length) || args.spoolDir;
|
||||
continue;
|
||||
}
|
||||
if (token === '--queue-dir') {
|
||||
args.queueDir = argv[i + 1] ?? args.queueDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--queue-dir=')) {
|
||||
args.queueDir = token.slice('--queue-dir='.length) || args.queueDir;
|
||||
continue;
|
||||
}
|
||||
if (token === '--receipt-dir') {
|
||||
args.receiptDir = argv[i + 1] ?? args.receiptDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--receipt-dir=')) {
|
||||
args.receiptDir = token.slice('--receipt-dir='.length) || args.receiptDir;
|
||||
continue;
|
||||
}
|
||||
if (token === '--dispatcher-script') {
|
||||
args.dispatcherScript = argv[i + 1] ?? args.dispatcherScript;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--dispatcher-script=')) {
|
||||
args.dispatcherScript = token.slice('--dispatcher-script='.length) || args.dispatcherScript;
|
||||
continue;
|
||||
}
|
||||
if (token === '--sender-command') {
|
||||
args.senderCommand = argv[i + 1] ?? null;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--sender-command=')) {
|
||||
args.senderCommand = token.slice('--sender-command='.length) || null;
|
||||
continue;
|
||||
}
|
||||
if (token === '--now') {
|
||||
args.now = argv[i + 1] ?? null;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--now=')) {
|
||||
args.now = token.slice('--now='.length) || null;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
function printHelp() {
|
||||
process.stdout.write([
|
||||
'Usage:',
|
||||
' node scripts/operator_notify_bridge_supervisor.mjs [--spool-dir <path>] [--queue-dir <path>] [--receipt-dir <path>] [--sender-command <shell>] [--dry-run] [--now <iso>] [--compact]',
|
||||
'',
|
||||
'Supervisor contract for consuming dispatch spool artifacts and delegating the real send step to an upper runtime.',
|
||||
'This script does not embed OpenClaw message.send itself.',
|
||||
'Instead it prepares env vars, executes an injected sender command, and then writes ack/block receipts via operator_notify_dispatcher.mjs.',
|
||||
].join('\n') + '\n');
|
||||
}
|
||||
|
||||
function ensureDir(dirPath) {
|
||||
fs.mkdirSync(dirPath, { recursive: true });
|
||||
}
|
||||
|
||||
function parseTime(value) {
|
||||
if (typeof value !== 'string' || value.length === 0) return null;
|
||||
const timestamp = Date.parse(value);
|
||||
return Number.isNaN(timestamp) ? null : timestamp;
|
||||
}
|
||||
|
||||
function toIso(value) {
|
||||
return new Date(value).toISOString();
|
||||
}
|
||||
|
||||
function readJson(filePath) {
|
||||
return JSON.parse(fs.readFileSync(filePath, 'utf8'));
|
||||
}
|
||||
|
||||
function writeJson(filePath, payload) {
|
||||
ensureDir(path.dirname(filePath));
|
||||
fs.writeFileSync(filePath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8');
|
||||
}
|
||||
|
||||
function listJsonFiles(dirPath) {
|
||||
if (!fs.existsSync(dirPath)) return [];
|
||||
return fs.readdirSync(dirPath)
|
||||
.filter((name) => name.endsWith('.json'))
|
||||
.sort()
|
||||
.map((name) => path.join(dirPath, name));
|
||||
}
|
||||
|
||||
function shellQuote(value) {
|
||||
return `'${String(value).replace(/'/g, `'\\''`)}'`;
|
||||
}
|
||||
|
||||
function buildEnv(spoolItem, spoolPath, queueDir, nowIso) {
|
||||
return {
|
||||
OPERATOR_NOTIFY_SPOOL_PATH: spoolPath,
|
||||
OPERATOR_NOTIFY_QUEUE_ITEM_PATH: spoolItem.queue_item_path,
|
||||
OPERATOR_NOTIFY_NOTIFICATION_ID: spoolItem.notification_id ?? '',
|
||||
OPERATOR_NOTIFY_CHANNEL: spoolItem?.dispatch_contract?.channel ?? '',
|
||||
OPERATOR_NOTIFY_TARGET: spoolItem?.dispatch_contract?.target ?? '',
|
||||
OPERATOR_NOTIFY_MESSAGE: spoolItem?.dispatch_contract?.message ?? '',
|
||||
OPERATOR_NOTIFY_QUEUE_DIR: queueDir,
|
||||
OPERATOR_NOTIFY_NOW: nowIso,
|
||||
};
|
||||
}
|
||||
|
||||
function buildSuggestedCommand(spoolItem) {
|
||||
return [
|
||||
'openclaw message send',
|
||||
`--channel ${shellQuote(spoolItem?.dispatch_contract?.channel ?? '')}`,
|
||||
`--target ${shellQuote(spoolItem?.dispatch_contract?.target ?? '')}`,
|
||||
`--message ${shellQuote(spoolItem?.dispatch_contract?.message ?? '')}`,
|
||||
].join(' ');
|
||||
}
|
||||
|
||||
function writeReceipt(receiptDir, spoolItem, nowIso, state, extra = {}) {
|
||||
const notificationId = spoolItem.notification_id ?? 'unknown-notification';
|
||||
const receiptPath = path.join(receiptDir, `${notificationId}-${state}.json`);
|
||||
writeJson(receiptPath, {
|
||||
receipt_version: '1.0.0',
|
||||
created_at: nowIso,
|
||||
state,
|
||||
notification_id: notificationId,
|
||||
queue_item_path: spoolItem.queue_item_path,
|
||||
spool_dispatch_path: extra.spoolPath ?? null,
|
||||
dispatch_contract: spoolItem.dispatch_contract ?? null,
|
||||
...extra,
|
||||
});
|
||||
return receiptPath;
|
||||
}
|
||||
|
||||
function runTransition(dispatcherScript, queueDir, mode, targetValue, note, nowIso) {
|
||||
const args = [dispatcherScript, '--queue-dir', queueDir, '--compact', '--now', nowIso, `--${mode}`, targetValue];
|
||||
if (note) {
|
||||
args.push('--note', note);
|
||||
}
|
||||
return spawnSync(process.execPath, args, { cwd: ROOT_DIR, encoding: 'utf8' });
|
||||
}
|
||||
|
||||
function main() {
|
||||
const args = parseArgs(process.argv.slice(2));
|
||||
if (args.help) {
|
||||
printHelp();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
const nowMs = args.now ? parseTime(args.now) : Date.now();
|
||||
if (nowMs === null) {
|
||||
process.stderr.write('Invalid --now value\n');
|
||||
process.exit(1);
|
||||
}
|
||||
const nowIso = toIso(nowMs);
|
||||
|
||||
const spoolDir = path.resolve(args.spoolDir);
|
||||
const queueDir = path.resolve(args.queueDir);
|
||||
const receiptDir = path.resolve(args.receiptDir);
|
||||
const dispatcherScript = path.resolve(args.dispatcherScript);
|
||||
ensureDir(spoolDir);
|
||||
ensureDir(receiptDir);
|
||||
|
||||
const spoolFiles = listJsonFiles(spoolDir);
|
||||
const scanned = [];
|
||||
const acked = [];
|
||||
const blocked = [];
|
||||
const pending = [];
|
||||
const skipped = [];
|
||||
|
||||
for (const spoolPath of spoolFiles) {
|
||||
const spoolItem = readJson(spoolPath);
|
||||
const notificationId = spoolItem.notification_id ?? null;
|
||||
const queueItemPath = spoolItem.queue_item_path ?? null;
|
||||
const channel = spoolItem?.dispatch_contract?.channel ?? null;
|
||||
const target = spoolItem?.dispatch_contract?.target ?? null;
|
||||
const message = spoolItem?.dispatch_contract?.message ?? null;
|
||||
scanned.push({ spoolPath, notificationId, queueItemPath });
|
||||
|
||||
if (!queueItemPath || !channel || !target || !message) {
|
||||
const reason = 'spool artifact missing queue_item_path/channel/target/message';
|
||||
const receiptPath = writeReceipt(receiptDir, spoolItem, nowIso, 'blocked', { spoolPath, reason, blocked_by: 'bridge_supervisor' });
|
||||
const transition = runTransition(dispatcherScript, queueDir, 'block', queueItemPath || notificationId || '', reason, nowIso);
|
||||
blocked.push({ spoolPath, notificationId, queueItemPath, reason, receiptPath, transitionStatus: transition.status });
|
||||
continue;
|
||||
}
|
||||
|
||||
const envPayload = buildEnv(spoolItem, spoolPath, queueDir, nowIso);
|
||||
if (args.dryRun || !args.senderCommand) {
|
||||
const receiptPath = writeReceipt(receiptDir, spoolItem, nowIso, 'pending_external_send', {
|
||||
spoolPath,
|
||||
supervisor_mode: args.dryRun ? 'dry_run' : 'no_sender_command',
|
||||
suggested_command: buildSuggestedCommand(spoolItem),
|
||||
env: envPayload,
|
||||
});
|
||||
pending.push({ spoolPath, notificationId, queueItemPath, receiptPath, reason: args.dryRun ? 'dry_run' : 'sender_command_not_configured' });
|
||||
continue;
|
||||
}
|
||||
|
||||
const sender = spawnSync(args.senderCommand, {
|
||||
cwd: ROOT_DIR,
|
||||
encoding: 'utf8',
|
||||
shell: true,
|
||||
env: {
|
||||
...process.env,
|
||||
...envPayload,
|
||||
},
|
||||
});
|
||||
|
||||
let senderBinding = null;
|
||||
try {
|
||||
senderBinding = sender.stdout ? JSON.parse(sender.stdout) : null;
|
||||
} catch {
|
||||
senderBinding = null;
|
||||
}
|
||||
|
||||
const bindingState = senderBinding?.state ?? null;
|
||||
const bindingReason = senderBinding?.reason ?? null;
|
||||
|
||||
if (sender.status === 0 && bindingState === 'sent') {
|
||||
const note = `message.send delivered by upper runtime via bridge supervisor at ${nowIso}`;
|
||||
const transition = runTransition(dispatcherScript, queueDir, 'ack', queueItemPath, note, nowIso);
|
||||
const receiptPath = writeReceipt(receiptDir, spoolItem, nowIso, 'acked', {
|
||||
spoolPath,
|
||||
supervisor_mode: 'sender_command',
|
||||
sender_binding: senderBinding,
|
||||
sender_status: sender.status,
|
||||
sender_stdout: sender.stdout ?? '',
|
||||
sender_stderr: sender.stderr ?? '',
|
||||
dispatcher_transition_status: transition.status,
|
||||
});
|
||||
acked.push({ spoolPath, notificationId, queueItemPath, receiptPath, transitionStatus: transition.status });
|
||||
continue;
|
||||
}
|
||||
|
||||
if (sender.status === 0 && bindingState === 'pending') {
|
||||
const receiptPath = writeReceipt(receiptDir, spoolItem, nowIso, 'pending_external_send', {
|
||||
spoolPath,
|
||||
supervisor_mode: 'sender_command',
|
||||
sender_binding: senderBinding,
|
||||
sender_status: sender.status,
|
||||
sender_stdout: sender.stdout ?? '',
|
||||
sender_stderr: sender.stderr ?? '',
|
||||
reason: bindingReason ?? 'sender binding reported pending',
|
||||
});
|
||||
pending.push({ spoolPath, notificationId, queueItemPath, receiptPath, reason: bindingReason ?? 'sender_binding_pending' });
|
||||
continue;
|
||||
}
|
||||
|
||||
const reason = bindingReason
|
||||
?? (sender.status === 0 && bindingState === 'blocked'
|
||||
? 'sender binding reported blocked'
|
||||
: `upper runtime sender command failed with status ${sender.status ?? 'null'}`);
|
||||
const transition = runTransition(dispatcherScript, queueDir, 'block', queueItemPath, reason, nowIso);
|
||||
const receiptPath = writeReceipt(receiptDir, spoolItem, nowIso, 'blocked', {
|
||||
spoolPath,
|
||||
supervisor_mode: 'sender_command',
|
||||
sender_binding: senderBinding,
|
||||
reason,
|
||||
sender_status: sender.status,
|
||||
sender_stdout: sender.stdout ?? '',
|
||||
sender_stderr: sender.stderr ?? '',
|
||||
dispatcher_transition_status: transition.status,
|
||||
});
|
||||
blocked.push({ spoolPath, notificationId, queueItemPath, reason, receiptPath, transitionStatus: transition.status });
|
||||
}
|
||||
|
||||
const response = {
|
||||
ok: true,
|
||||
tool: 'operator_notify_bridge_supervisor',
|
||||
version: 'mvp-v1',
|
||||
now: nowIso,
|
||||
spoolDir,
|
||||
queueDir,
|
||||
receiptDir,
|
||||
senderCommandConfigured: Boolean(args.senderCommand),
|
||||
dryRun: args.dryRun,
|
||||
result: {
|
||||
scannedCount: scanned.length,
|
||||
ackedCount: acked.length,
|
||||
blockedCount: blocked.length,
|
||||
pendingCount: pending.length,
|
||||
skippedCount: skipped.length,
|
||||
scanned,
|
||||
acked,
|
||||
blocked,
|
||||
pending,
|
||||
skipped,
|
||||
},
|
||||
};
|
||||
|
||||
process.stdout.write(`${JSON.stringify(response, null, args.compact ? 0 : 2)}\n`);
|
||||
}
|
||||
|
||||
main();
|
||||
@@ -0,0 +1,448 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
import crypto from 'node:crypto';
|
||||
|
||||
const ROOT_DIR = path.resolve(import.meta.dirname, '..');
|
||||
const DEFAULT_QUEUE_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-queue');
|
||||
const DEFAULT_SPOOL_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-dispatch-spool');
|
||||
|
||||
function parseArgs(argv) {
|
||||
const args = {
|
||||
queueDir: DEFAULT_QUEUE_DIR,
|
||||
spoolDir: DEFAULT_SPOOL_DIR,
|
||||
now: null,
|
||||
compact: false,
|
||||
ack: null,
|
||||
block: null,
|
||||
note: null,
|
||||
claim: false,
|
||||
help: false,
|
||||
};
|
||||
|
||||
for (let i = 0; i < argv.length; i += 1) {
|
||||
const token = argv[i];
|
||||
if (token === '--compact') {
|
||||
args.compact = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--claim') {
|
||||
args.claim = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--help' || token === '-h') {
|
||||
args.help = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--queue-dir') {
|
||||
args.queueDir = argv[i + 1] ?? args.queueDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--queue-dir=')) {
|
||||
args.queueDir = token.slice('--queue-dir='.length) || args.queueDir;
|
||||
continue;
|
||||
}
|
||||
if (token === '--spool-dir') {
|
||||
args.spoolDir = argv[i + 1] ?? args.spoolDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--spool-dir=')) {
|
||||
args.spoolDir = token.slice('--spool-dir='.length) || args.spoolDir;
|
||||
continue;
|
||||
}
|
||||
if (token === '--now') {
|
||||
args.now = argv[i + 1] ?? null;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--now=')) {
|
||||
args.now = token.slice('--now='.length) || null;
|
||||
continue;
|
||||
}
|
||||
if (token === '--ack') {
|
||||
args.ack = argv[i + 1] ?? null;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--ack=')) {
|
||||
args.ack = token.slice('--ack='.length) || null;
|
||||
continue;
|
||||
}
|
||||
if (token === '--block') {
|
||||
args.block = argv[i + 1] ?? null;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--block=')) {
|
||||
args.block = token.slice('--block='.length) || null;
|
||||
continue;
|
||||
}
|
||||
if (token === '--note') {
|
||||
args.note = argv[i + 1] ?? null;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--note=')) {
|
||||
args.note = token.slice('--note='.length) || null;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
function printHelp() {
|
||||
process.stdout.write([
|
||||
'Usage:',
|
||||
' node scripts/operator_notify_dispatcher.mjs [--queue-dir <path>] [--spool-dir <path>] [--now <iso>] [--compact] [--claim]',
|
||||
' node scripts/operator_notify_dispatcher.mjs --ack <queue-item-path|notification_id> [--queue-dir <path>] [--now <iso>] [--note <text>] [--compact]',
|
||||
' node scripts/operator_notify_dispatcher.mjs --block <queue-item-path|notification_id> [--queue-dir <path>] [--now <iso>] [--note <reason>] [--compact]',
|
||||
'',
|
||||
'File-backed dispatcher for state/operator-notify-queue.',
|
||||
'Default dispatch mode is spool/handoff only: it never claims to call message.send directly.',
|
||||
].join('\n') + '\n');
|
||||
}
|
||||
|
||||
function ensureDir(dirPath) {
|
||||
fs.mkdirSync(dirPath, { recursive: true });
|
||||
}
|
||||
|
||||
function parseTime(value) {
|
||||
if (typeof value !== 'string' || value.length === 0) return null;
|
||||
const timestamp = Date.parse(value);
|
||||
return Number.isNaN(timestamp) ? null : timestamp;
|
||||
}
|
||||
|
||||
function toIso(value) {
|
||||
return new Date(value).toISOString();
|
||||
}
|
||||
|
||||
function safeSlug(value) {
|
||||
return String(value || 'item')
|
||||
.replace(/[^a-zA-Z0-9._-]+/g, '-')
|
||||
.replace(/^-+|-+$/g, '')
|
||||
.slice(0, 120) || 'item';
|
||||
}
|
||||
|
||||
function listJsonFiles(dirPath) {
|
||||
if (!fs.existsSync(dirPath)) return [];
|
||||
return fs.readdirSync(dirPath)
|
||||
.filter((name) => name.endsWith('.json'))
|
||||
.sort()
|
||||
.map((name) => path.join(dirPath, name));
|
||||
}
|
||||
|
||||
function readJson(filePath) {
|
||||
return JSON.parse(fs.readFileSync(filePath, 'utf8'));
|
||||
}
|
||||
|
||||
function writeJson(filePath, payload) {
|
||||
ensureDir(path.dirname(filePath));
|
||||
fs.writeFileSync(filePath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8');
|
||||
}
|
||||
|
||||
function isPendingLike(status) {
|
||||
return status === 'pending' || status === 'claimed';
|
||||
}
|
||||
|
||||
function resolveChannel(payload) {
|
||||
return payload?.operator_notice?.channel ?? payload?.dispatch_hint?.channel ?? null;
|
||||
}
|
||||
|
||||
function resolveTarget(payload) {
|
||||
return payload?.operator_notice?.target ?? payload?.dispatch_hint?.target ?? null;
|
||||
}
|
||||
|
||||
function resolveMessage(payload) {
|
||||
return payload?.operator_notice?.message ?? payload?.dispatch_hint?.message ?? null;
|
||||
}
|
||||
|
||||
function markBlocked(payload, nowIso, reason) {
|
||||
return {
|
||||
...payload,
|
||||
status: 'blocked',
|
||||
blocked_gap: reason,
|
||||
blocked_at: nowIso,
|
||||
dispatch_result: {
|
||||
mode: 'spool_only',
|
||||
state: 'blocked',
|
||||
blockedAt: nowIso,
|
||||
reason,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function claimPayload(payload, nowIso) {
|
||||
return {
|
||||
...payload,
|
||||
status: 'claimed',
|
||||
dispatch_claimed_at: nowIso,
|
||||
dispatch_result: {
|
||||
...(payload.dispatch_result ?? {}),
|
||||
mode: 'spool_only',
|
||||
state: 'claimed',
|
||||
claimedAt: nowIso,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function buildSpoolPayload(payload, queuePath, nowIso) {
|
||||
const channel = resolveChannel(payload);
|
||||
const target = resolveTarget(payload);
|
||||
const message = resolveMessage(payload);
|
||||
const notificationId = payload.notification_id ?? crypto.randomUUID();
|
||||
return {
|
||||
handoff_version: '1.0.0',
|
||||
created_at: nowIso,
|
||||
queue_item_path: queuePath,
|
||||
notification_id: notificationId,
|
||||
dispatch_contract: {
|
||||
executor: 'message.send',
|
||||
mode: 'manual_handoff',
|
||||
requires_human_or_upper_runtime: true,
|
||||
channel,
|
||||
target,
|
||||
message,
|
||||
},
|
||||
governance: payload.governance ?? null,
|
||||
evidence_refs: payload.evidence_refs ?? [],
|
||||
queue_status_before_dispatch: payload.status ?? 'pending',
|
||||
ack_instructions: {
|
||||
command: `node scripts/operator_notify_dispatcher.mjs --ack ${JSON.stringify(queuePath)} --note ${JSON.stringify('message.send delivered by upper runtime')}`,
|
||||
accepted_status_transition: 'dispatched -> acked',
|
||||
},
|
||||
block_instructions: {
|
||||
command: `node scripts/operator_notify_dispatcher.mjs --block ${JSON.stringify(queuePath)} --note ${JSON.stringify('upper runtime could not safely deliver message.send')}`,
|
||||
accepted_status_transition: 'dispatched -> blocked',
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function dispatchPayload(payload, nowIso, spoolPath) {
|
||||
return {
|
||||
...payload,
|
||||
status: 'dispatched',
|
||||
dispatched_at: nowIso,
|
||||
blocked_gap: null,
|
||||
dispatch_result: {
|
||||
mode: 'spool_only',
|
||||
state: 'dispatched',
|
||||
dispatchedAt: nowIso,
|
||||
spoolPath,
|
||||
executor: 'message.send',
|
||||
delivery: 'handoff_pending_ack',
|
||||
ackRequired: true,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function findQueueItem(queueDir, targetValue) {
|
||||
if (!targetValue) return null;
|
||||
const directPath = path.isAbsolute(targetValue) ? targetValue : path.join(queueDir, targetValue);
|
||||
if (fs.existsSync(directPath)) return directPath;
|
||||
for (const filePath of listJsonFiles(queueDir)) {
|
||||
try {
|
||||
const payload = readJson(filePath);
|
||||
if (payload?.notification_id === targetValue) return filePath;
|
||||
} catch {}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function transitionItem(queueDir, targetValue, nowIso, transition, note) {
|
||||
const queuePath = findQueueItem(queueDir, targetValue);
|
||||
if (!queuePath) {
|
||||
return {
|
||||
ok: false,
|
||||
error: `queue item not found for transition target: ${targetValue}`,
|
||||
};
|
||||
}
|
||||
|
||||
const payload = readJson(queuePath);
|
||||
const previousStatus = payload?.status ?? null;
|
||||
|
||||
if (transition === 'ack') {
|
||||
const nextPayload = {
|
||||
...payload,
|
||||
status: 'acked',
|
||||
acked_at: nowIso,
|
||||
ack_note: note ?? null,
|
||||
dispatch_result: {
|
||||
...(payload.dispatch_result ?? {}),
|
||||
state: 'acked',
|
||||
ackedAt: nowIso,
|
||||
ackNote: note ?? null,
|
||||
},
|
||||
};
|
||||
writeJson(queuePath, nextPayload);
|
||||
return {
|
||||
ok: true,
|
||||
mode: 'ack',
|
||||
queuePath,
|
||||
notificationId: payload.notification_id ?? null,
|
||||
previousStatus,
|
||||
nextStatus: 'acked',
|
||||
};
|
||||
}
|
||||
|
||||
if (transition === 'block') {
|
||||
const reason = note ?? payload.blocked_gap ?? 'blocked by upper runtime';
|
||||
const nextPayload = {
|
||||
...payload,
|
||||
status: 'blocked',
|
||||
blocked_at: nowIso,
|
||||
blocked_gap: reason,
|
||||
dispatch_result: {
|
||||
...(payload.dispatch_result ?? {}),
|
||||
state: 'blocked',
|
||||
blockedAt: nowIso,
|
||||
reason,
|
||||
},
|
||||
};
|
||||
writeJson(queuePath, nextPayload);
|
||||
return {
|
||||
ok: true,
|
||||
mode: 'block',
|
||||
queuePath,
|
||||
notificationId: payload.notification_id ?? null,
|
||||
previousStatus,
|
||||
nextStatus: 'blocked',
|
||||
reason,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
ok: false,
|
||||
error: `unsupported transition: ${transition}`,
|
||||
};
|
||||
}
|
||||
|
||||
function ackItem(queueDir, ackValue, nowIso, note) {
|
||||
return transitionItem(queueDir, ackValue, nowIso, 'ack', note);
|
||||
}
|
||||
|
||||
function blockItem(queueDir, targetValue, nowIso, note) {
|
||||
return transitionItem(queueDir, targetValue, nowIso, 'block', note);
|
||||
}
|
||||
|
||||
function main() {
|
||||
const args = parseArgs(process.argv.slice(2));
|
||||
if (args.help) {
|
||||
printHelp();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
const nowMs = args.now ? parseTime(args.now) : Date.now();
|
||||
if (nowMs === null) {
|
||||
process.stderr.write('Invalid --now value\n');
|
||||
process.exit(1);
|
||||
}
|
||||
const nowIso = toIso(nowMs);
|
||||
|
||||
if (args.ack) {
|
||||
const ackResult = ackItem(path.resolve(args.queueDir), args.ack, nowIso, args.note);
|
||||
if (!ackResult.ok) {
|
||||
process.stderr.write(`${ackResult.error}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
process.stdout.write(`${JSON.stringify({ ok: true, tool: 'operator_notify_dispatcher', mode: 'ack', now: nowIso, result: ackResult }, null, args.compact ? 0 : 2)}\n`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (args.block) {
|
||||
const blockResult = blockItem(path.resolve(args.queueDir), args.block, nowIso, args.note);
|
||||
if (!blockResult.ok) {
|
||||
process.stderr.write(`${blockResult.error}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
process.stdout.write(`${JSON.stringify({ ok: true, tool: 'operator_notify_dispatcher', mode: 'block', now: nowIso, result: blockResult }, null, args.compact ? 0 : 2)}\n`);
|
||||
return;
|
||||
}
|
||||
|
||||
const queueDir = path.resolve(args.queueDir);
|
||||
const spoolDir = path.resolve(args.spoolDir);
|
||||
ensureDir(queueDir);
|
||||
ensureDir(spoolDir);
|
||||
|
||||
const files = listJsonFiles(queueDir);
|
||||
const queueScanned = [];
|
||||
const blocked = [];
|
||||
const dispatched = [];
|
||||
const skipped = [];
|
||||
const claimed = [];
|
||||
|
||||
for (const filePath of files) {
|
||||
const payload = readJson(filePath);
|
||||
queueScanned.push({ path: filePath, notificationId: payload.notification_id ?? null, status: payload.status ?? null });
|
||||
|
||||
if (!isPendingLike(payload.status)) {
|
||||
skipped.push({ path: filePath, notificationId: payload.notification_id ?? null, status: payload.status ?? null, reason: 'not_pending_like' });
|
||||
continue;
|
||||
}
|
||||
|
||||
const channel = resolveChannel(payload);
|
||||
const target = resolveTarget(payload);
|
||||
const message = resolveMessage(payload);
|
||||
|
||||
if (!channel || !target || !message) {
|
||||
const reason = payload.blocked_gap || 'queue item missing channel/target/message for safe dispatch';
|
||||
const nextPayload = markBlocked(payload, nowIso, reason);
|
||||
writeJson(filePath, nextPayload);
|
||||
blocked.push({ path: filePath, notificationId: payload.notification_id ?? null, status: 'blocked', reason });
|
||||
continue;
|
||||
}
|
||||
|
||||
let workingPayload = payload;
|
||||
if (args.claim && payload.status === 'pending') {
|
||||
workingPayload = claimPayload(payload, nowIso);
|
||||
writeJson(filePath, workingPayload);
|
||||
claimed.push({ path: filePath, notificationId: payload.notification_id ?? null, status: 'claimed' });
|
||||
}
|
||||
|
||||
const spoolPayload = buildSpoolPayload(workingPayload, filePath, nowIso);
|
||||
const spoolName = `${safeSlug(payload.notification_id ?? path.basename(filePath, '.json'))}-dispatch.json`;
|
||||
const spoolPath = path.join(spoolDir, spoolName);
|
||||
writeJson(spoolPath, spoolPayload);
|
||||
|
||||
const nextPayload = dispatchPayload(workingPayload, nowIso, spoolPath);
|
||||
writeJson(filePath, nextPayload);
|
||||
dispatched.push({
|
||||
path: filePath,
|
||||
notificationId: payload.notification_id ?? null,
|
||||
status: 'dispatched',
|
||||
spoolPath,
|
||||
delivery: 'handoff_pending_ack',
|
||||
});
|
||||
}
|
||||
|
||||
const response = {
|
||||
ok: true,
|
||||
tool: 'operator_notify_dispatcher',
|
||||
version: 'mvp-v1',
|
||||
mode: 'dispatch',
|
||||
dispatchMode: 'spool_only',
|
||||
queueDir,
|
||||
spoolDir,
|
||||
now: nowIso,
|
||||
result: {
|
||||
scannedCount: queueScanned.length,
|
||||
blockedCount: blocked.length,
|
||||
dispatchedCount: dispatched.length,
|
||||
claimedCount: claimed.length,
|
||||
skippedCount: skipped.length,
|
||||
queueScanned,
|
||||
claimed,
|
||||
blocked,
|
||||
dispatched,
|
||||
skipped,
|
||||
},
|
||||
};
|
||||
|
||||
process.stdout.write(`${JSON.stringify(response, null, args.compact ? 0 : 2)}\n`);
|
||||
}
|
||||
|
||||
main();
|
||||
@@ -0,0 +1,270 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
import { spawnSync } from 'node:child_process';
|
||||
|
||||
const ROOT_DIR = path.resolve(import.meta.dirname, '..');
|
||||
const DEFAULT_ATTEMPT_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-sender-attempts');
|
||||
|
||||
function parseArgs(argv) {
|
||||
const args = {
|
||||
mode: 'shim',
|
||||
attemptDir: DEFAULT_ATTEMPT_DIR,
|
||||
now: null,
|
||||
compact: false,
|
||||
help: false,
|
||||
openclawBin: 'openclaw',
|
||||
};
|
||||
|
||||
for (let i = 0; i < argv.length; i += 1) {
|
||||
const token = argv[i];
|
||||
if (token === '--help' || token === '-h') {
|
||||
args.help = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--compact') {
|
||||
args.compact = true;
|
||||
continue;
|
||||
}
|
||||
if (token === '--mode') {
|
||||
args.mode = argv[i + 1] ?? args.mode;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--mode=')) {
|
||||
args.mode = token.slice('--mode='.length) || args.mode;
|
||||
continue;
|
||||
}
|
||||
if (token === '--attempt-dir') {
|
||||
args.attemptDir = argv[i + 1] ?? args.attemptDir;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--attempt-dir=')) {
|
||||
args.attemptDir = token.slice('--attempt-dir='.length) || args.attemptDir;
|
||||
continue;
|
||||
}
|
||||
if (token === '--now') {
|
||||
args.now = argv[i + 1] ?? null;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--now=')) {
|
||||
args.now = token.slice('--now='.length) || null;
|
||||
continue;
|
||||
}
|
||||
if (token === '--openclaw-bin') {
|
||||
args.openclawBin = argv[i + 1] ?? args.openclawBin;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
if (token.startsWith('--openclaw-bin=')) {
|
||||
args.openclawBin = token.slice('--openclaw-bin='.length) || args.openclawBin;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
function printHelp() {
|
||||
process.stdout.write([
|
||||
'Usage:',
|
||||
' node scripts/operator_notify_sender_binding.mjs [--mode shim|openclaw-cli] [--attempt-dir <path>] [--openclaw-bin <path>] [--now <iso>] [--compact]',
|
||||
'',
|
||||
'Reads OPERATOR_NOTIFY_* env vars prepared by operator_notify_bridge_supervisor.mjs and emits a JSON sender-binding result.',
|
||||
'States:',
|
||||
' sent -> bridge may ack queue item',
|
||||
' pending -> bridge should keep queue item dispatched and write pending_external_send receipt',
|
||||
' blocked -> bridge should mark queue item blocked',
|
||||
].join('\n') + '\n');
|
||||
}
|
||||
|
||||
function ensureDir(dirPath) {
|
||||
fs.mkdirSync(dirPath, { recursive: true });
|
||||
}
|
||||
|
||||
function parseTime(value) {
|
||||
if (typeof value !== 'string' || value.length === 0) return null;
|
||||
const timestamp = Date.parse(value);
|
||||
return Number.isNaN(timestamp) ? null : timestamp;
|
||||
}
|
||||
|
||||
function toIso(value) {
|
||||
return new Date(value).toISOString();
|
||||
}
|
||||
|
||||
function safeSlug(value) {
|
||||
return String(value || 'item')
|
||||
.replace(/[^a-zA-Z0-9._-]+/g, '-')
|
||||
.replace(/^-+|-+$/g, '')
|
||||
.slice(0, 120) || 'item';
|
||||
}
|
||||
|
||||
function writeJson(filePath, payload) {
|
||||
ensureDir(path.dirname(filePath));
|
||||
fs.writeFileSync(filePath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8');
|
||||
}
|
||||
|
||||
function readContractFromEnv() {
|
||||
return {
|
||||
spoolPath: process.env.OPERATOR_NOTIFY_SPOOL_PATH ?? '',
|
||||
queueItemPath: process.env.OPERATOR_NOTIFY_QUEUE_ITEM_PATH ?? '',
|
||||
notificationId: process.env.OPERATOR_NOTIFY_NOTIFICATION_ID ?? '',
|
||||
channel: process.env.OPERATOR_NOTIFY_CHANNEL ?? '',
|
||||
target: process.env.OPERATOR_NOTIFY_TARGET ?? '',
|
||||
message: process.env.OPERATOR_NOTIFY_MESSAGE ?? '',
|
||||
queueDir: process.env.OPERATOR_NOTIFY_QUEUE_DIR ?? '',
|
||||
now: process.env.OPERATOR_NOTIFY_NOW ?? '',
|
||||
};
|
||||
}
|
||||
|
||||
function validateContract(contract) {
|
||||
const missing = [];
|
||||
if (!contract.spoolPath) missing.push('OPERATOR_NOTIFY_SPOOL_PATH');
|
||||
if (!contract.queueItemPath) missing.push('OPERATOR_NOTIFY_QUEUE_ITEM_PATH');
|
||||
if (!contract.notificationId) missing.push('OPERATOR_NOTIFY_NOTIFICATION_ID');
|
||||
if (!contract.channel) missing.push('OPERATOR_NOTIFY_CHANNEL');
|
||||
if (!contract.target) missing.push('OPERATOR_NOTIFY_TARGET');
|
||||
if (!contract.message) missing.push('OPERATOR_NOTIFY_MESSAGE');
|
||||
return missing;
|
||||
}
|
||||
|
||||
function buildAttemptPath(attemptDir, notificationId, nowIso, mode) {
|
||||
return path.join(attemptDir, `${safeSlug(notificationId)}-${safeSlug(nowIso)}-${safeSlug(mode)}.json`);
|
||||
}
|
||||
|
||||
function emit(payload, compact) {
|
||||
process.stdout.write(`${JSON.stringify(payload, null, compact ? 0 : 2)}\n`);
|
||||
}
|
||||
|
||||
function main() {
|
||||
const args = parseArgs(process.argv.slice(2));
|
||||
if (args.help) {
|
||||
printHelp();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
const nowMs = args.now ? parseTime(args.now) : Date.now();
|
||||
if (nowMs === null) {
|
||||
process.stderr.write('Invalid --now value\n');
|
||||
process.exit(1);
|
||||
}
|
||||
const nowIso = toIso(nowMs);
|
||||
const contract = readContractFromEnv();
|
||||
const missing = validateContract(contract);
|
||||
|
||||
if (missing.length > 0) {
|
||||
emit({
|
||||
ok: true,
|
||||
tool: 'operator_notify_sender_binding',
|
||||
version: 'mvp-v1',
|
||||
mode: args.mode,
|
||||
now: nowIso,
|
||||
state: 'blocked',
|
||||
reason: `missing sender env: ${missing.join(', ')}`,
|
||||
contract,
|
||||
}, args.compact);
|
||||
return;
|
||||
}
|
||||
|
||||
const attemptDir = path.resolve(args.attemptDir);
|
||||
ensureDir(attemptDir);
|
||||
const attemptPath = buildAttemptPath(attemptDir, contract.notificationId, nowIso, args.mode);
|
||||
|
||||
if (args.mode === 'shim') {
|
||||
const payload = {
|
||||
attempt_version: '1.0.0',
|
||||
created_at: nowIso,
|
||||
mode: 'shim',
|
||||
state: 'pending',
|
||||
reason: 'shim binding recorded a verifiable send attempt but cannot invoke privileged message.send from this repo/runtime boundary',
|
||||
contract,
|
||||
};
|
||||
writeJson(attemptPath, payload);
|
||||
emit({
|
||||
ok: true,
|
||||
tool: 'operator_notify_sender_binding',
|
||||
version: 'mvp-v1',
|
||||
mode: 'shim',
|
||||
now: nowIso,
|
||||
state: 'pending',
|
||||
reason: payload.reason,
|
||||
attemptPath,
|
||||
}, args.compact);
|
||||
return;
|
||||
}
|
||||
|
||||
if (args.mode === 'openclaw-cli') {
|
||||
const commandArgs = [
|
||||
'message', 'send',
|
||||
'--channel', contract.channel,
|
||||
'--target', contract.target,
|
||||
'--message', contract.message,
|
||||
];
|
||||
const sender = spawnSync(args.openclawBin, commandArgs, {
|
||||
cwd: ROOT_DIR,
|
||||
encoding: 'utf8',
|
||||
env: process.env,
|
||||
});
|
||||
|
||||
const attemptPayload = {
|
||||
attempt_version: '1.0.0',
|
||||
created_at: nowIso,
|
||||
mode: 'openclaw-cli',
|
||||
command: [args.openclawBin, ...commandArgs],
|
||||
contract,
|
||||
sender_status: sender.status,
|
||||
sender_stdout: sender.stdout ?? '',
|
||||
sender_stderr: sender.stderr ?? '',
|
||||
};
|
||||
|
||||
if (sender.status === 0) {
|
||||
attemptPayload.state = 'sent';
|
||||
writeJson(attemptPath, attemptPayload);
|
||||
emit({
|
||||
ok: true,
|
||||
tool: 'operator_notify_sender_binding',
|
||||
version: 'mvp-v1',
|
||||
mode: 'openclaw-cli',
|
||||
now: nowIso,
|
||||
state: 'sent',
|
||||
attemptPath,
|
||||
sender_status: sender.status,
|
||||
}, args.compact);
|
||||
return;
|
||||
}
|
||||
|
||||
attemptPayload.state = 'blocked';
|
||||
attemptPayload.reason = `openclaw message send failed with status ${sender.status ?? 'null'}`;
|
||||
writeJson(attemptPath, attemptPayload);
|
||||
emit({
|
||||
ok: true,
|
||||
tool: 'operator_notify_sender_binding',
|
||||
version: 'mvp-v1',
|
||||
mode: 'openclaw-cli',
|
||||
now: nowIso,
|
||||
state: 'blocked',
|
||||
reason: attemptPayload.reason,
|
||||
attemptPath,
|
||||
sender_status: sender.status,
|
||||
sender_stdout: sender.stdout ?? '',
|
||||
sender_stderr: sender.stderr ?? '',
|
||||
}, args.compact);
|
||||
return;
|
||||
}
|
||||
|
||||
emit({
|
||||
ok: true,
|
||||
tool: 'operator_notify_sender_binding',
|
||||
version: 'mvp-v1',
|
||||
mode: args.mode,
|
||||
now: nowIso,
|
||||
state: 'blocked',
|
||||
reason: `unsupported sender binding mode: ${args.mode}`,
|
||||
}, args.compact);
|
||||
}
|
||||
|
||||
main();
|
||||
103
plugins/reporting-governance/scripts/package-smoke.mjs
Normal file
103
plugins/reporting-governance/scripts/package-smoke.mjs
Normal file
@@ -0,0 +1,103 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'node:fs';
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
|
||||
import { generateDeploymentProfileArtifactFromFile } from '../src/storage/profile-generator.mjs';
|
||||
import { runOrchestratorAdapter } from '../src/adapters/orchestrator.mjs';
|
||||
|
||||
const packageRoot = path.resolve(import.meta.dirname, '..');
|
||||
|
||||
function parseArgs(argv) {
|
||||
const args = {
|
||||
workspace: null,
|
||||
now: '2026-05-07T08:20:00.000Z',
|
||||
compact: false,
|
||||
};
|
||||
|
||||
for (let i = 0; i < argv.length; i += 1) {
|
||||
const token = argv[i];
|
||||
if (token === '--compact') { args.compact = true; continue; }
|
||||
if (token === '--workspace') { args.workspace = argv[i + 1] ?? null; i += 1; continue; }
|
||||
if (token.startsWith('--workspace=')) { args.workspace = token.slice('--workspace='.length) || null; continue; }
|
||||
if (token === '--now') { args.now = argv[i + 1] ?? args.now; i += 1; continue; }
|
||||
if (token.startsWith('--now=')) { args.now = token.slice('--now='.length) || args.now; continue; }
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
function mkdirs(root, names) {
|
||||
for (const name of names) {
|
||||
fs.mkdirSync(path.join(root, name), { recursive: true });
|
||||
}
|
||||
}
|
||||
|
||||
function writeState(root) {
|
||||
const statePath = path.join(root, 'watchdog-state.json');
|
||||
fs.writeFileSync(statePath, `${JSON.stringify({
|
||||
version: 1,
|
||||
watchdogs: [
|
||||
{
|
||||
id: 'reporting-governance-plugin-watchdog',
|
||||
task: 'reporting-governance plugin package smoke',
|
||||
status: 'active',
|
||||
ownerSessionKey: 'agent:coder:main',
|
||||
reportChannel: 'telegram',
|
||||
reportTarget: '864811879',
|
||||
intervalMinutes: 10,
|
||||
lastMilestoneAt: '2026-05-07T08:00:00.000Z',
|
||||
lastAlertAt: null,
|
||||
}
|
||||
]
|
||||
}, null, 2)}\n`, 'utf8');
|
||||
return statePath;
|
||||
}
|
||||
|
||||
function main() {
|
||||
const args = parseArgs(process.argv.slice(2));
|
||||
const workspace = path.resolve(args.workspace ?? fs.mkdtempSync(path.join(os.tmpdir(), 'reporting-governance-package-smoke-')));
|
||||
mkdirs(workspace, ['evidence', 'events', 'queue', 'spool', 'receipts']);
|
||||
const state = writeState(workspace);
|
||||
|
||||
const profileSourcePath = path.join(packageRoot, 'profiles-src', 'strict-manager-mode.yaml');
|
||||
const artifact = generateDeploymentProfileArtifactFromFile(profileSourcePath);
|
||||
const artifactPath = path.join(workspace, 'strict-manager-mode.profile.json');
|
||||
fs.writeFileSync(artifactPath, `${JSON.stringify(artifact, null, 2)}\n`, 'utf8');
|
||||
|
||||
const result = runOrchestratorAdapter({
|
||||
profileArtifact: artifact,
|
||||
repoRootOverride: packageRoot,
|
||||
state,
|
||||
evidenceDir: path.join(workspace, 'evidence'),
|
||||
eventDir: path.join(workspace, 'events'),
|
||||
queueDir: path.join(workspace, 'queue'),
|
||||
spoolDir: path.join(workspace, 'spool'),
|
||||
receiptDir: path.join(workspace, 'receipts'),
|
||||
writeState: true,
|
||||
dryRun: true,
|
||||
now: args.now,
|
||||
});
|
||||
|
||||
const payload = {
|
||||
ok: true,
|
||||
tool: 'reporting-governance-package-smoke',
|
||||
packageRoot,
|
||||
workspace,
|
||||
artifactPath,
|
||||
generatedProfileSource: profileSourcePath,
|
||||
orchestrator: {
|
||||
ok: result?.ok === true,
|
||||
dispatchedCount: result?.result?.dispatcher?.dispatchedCount ?? null,
|
||||
pendingCount: result?.result?.supervisor?.pendingCount ?? null,
|
||||
notificationCount: result?.result?.watchdog?.notificationCount ?? null,
|
||||
executionOrder: result?.executionOrder ?? null,
|
||||
},
|
||||
};
|
||||
|
||||
process.stdout.write(`${JSON.stringify(payload, null, args.compact ? 0 : 2)}\n`);
|
||||
}
|
||||
|
||||
main();
|
||||
@@ -0,0 +1,212 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
import { spawnSync } from 'node:child_process';
|
||||
|
||||
const ROOT_DIR = path.resolve(import.meta.dirname, '..');
|
||||
const DEFAULT_STATE_PATH = path.join(ROOT_DIR, 'memory', 'watchdog-state.json');
|
||||
const DEFAULT_EVIDENCE_DIR = path.join(ROOT_DIR, 'state', 'long-task-watchdog');
|
||||
const DEFAULT_EVENT_DIR = path.join(ROOT_DIR, 'state', 'long-task-watchdog-events');
|
||||
const DEFAULT_QUEUE_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-queue');
|
||||
const DEFAULT_SPOOL_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-dispatch-spool');
|
||||
const DEFAULT_RECEIPT_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-bridge-receipts');
|
||||
const DEFAULT_WATCHDOG_SCRIPT = path.join(ROOT_DIR, 'scripts', 'long_task_watchdog.mjs');
|
||||
const DEFAULT_DISPATCHER_SCRIPT = path.join(ROOT_DIR, 'scripts', 'operator_notify_dispatcher.mjs');
|
||||
const DEFAULT_SUPERVISOR_SCRIPT = path.join(ROOT_DIR, 'scripts', 'operator_notify_bridge_supervisor.mjs');
|
||||
|
||||
function parseArgs(argv) {
|
||||
const args = {
|
||||
state: DEFAULT_STATE_PATH,
|
||||
evidenceDir: DEFAULT_EVIDENCE_DIR,
|
||||
eventDir: DEFAULT_EVENT_DIR,
|
||||
queueDir: DEFAULT_QUEUE_DIR,
|
||||
spoolDir: DEFAULT_SPOOL_DIR,
|
||||
receiptDir: DEFAULT_RECEIPT_DIR,
|
||||
watchdogScript: DEFAULT_WATCHDOG_SCRIPT,
|
||||
dispatcherScript: DEFAULT_DISPATCHER_SCRIPT,
|
||||
supervisorScript: DEFAULT_SUPERVISOR_SCRIPT,
|
||||
senderCommand: null,
|
||||
senderMode: null,
|
||||
openclawBin: 'openclaw',
|
||||
now: null,
|
||||
compact: false,
|
||||
writeState: false,
|
||||
claim: false,
|
||||
dryRun: false,
|
||||
help: false,
|
||||
};
|
||||
|
||||
for (let i = 0; i < argv.length; i += 1) {
|
||||
const token = argv[i];
|
||||
if (token === '--compact') { args.compact = true; continue; }
|
||||
if (token === '--write-state') { args.writeState = true; continue; }
|
||||
if (token === '--claim') { args.claim = true; continue; }
|
||||
if (token === '--dry-run') { args.dryRun = true; continue; }
|
||||
if (token === '--help' || token === '-h') { args.help = true; continue; }
|
||||
|
||||
const pairs = [
|
||||
['--state', 'state'],
|
||||
['--evidence-dir', 'evidenceDir'],
|
||||
['--event-dir', 'eventDir'],
|
||||
['--queue-dir', 'queueDir'],
|
||||
['--spool-dir', 'spoolDir'],
|
||||
['--receipt-dir', 'receiptDir'],
|
||||
['--watchdog-script', 'watchdogScript'],
|
||||
['--dispatcher-script', 'dispatcherScript'],
|
||||
['--supervisor-script', 'supervisorScript'],
|
||||
['--sender-command', 'senderCommand'],
|
||||
['--sender-mode', 'senderMode'],
|
||||
['--openclaw-bin', 'openclawBin'],
|
||||
['--now', 'now'],
|
||||
];
|
||||
|
||||
let matched = false;
|
||||
for (const [flag, key] of pairs) {
|
||||
if (token === flag) {
|
||||
args[key] = argv[i + 1] ?? args[key];
|
||||
i += 1;
|
||||
matched = true;
|
||||
break;
|
||||
}
|
||||
if (token.startsWith(`${flag}=`)) {
|
||||
args[key] = token.slice(flag.length + 1) || args[key];
|
||||
matched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (matched) continue;
|
||||
}
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
function printHelp() {
|
||||
process.stdout.write([
|
||||
'Usage:',
|
||||
' node scripts/watchdog_auto_notify_orchestrator.mjs [--write-state] [--claim] [--dry-run] [--sender-command <shell>] [--sender-mode shim|openclaw-cli] [--openclaw-bin <path>] [--now <iso>] [--compact]',
|
||||
'',
|
||||
'Runs the full watchdog auto-notify chain in order:',
|
||||
' runner -> queue -> dispatcher -> bridge -> sender -> ack|blocked',
|
||||
'',
|
||||
'If --sender-mode is given and --sender-command is omitted, a sender-binding command is constructed automatically.',
|
||||
].join('\n') + '\n');
|
||||
}
|
||||
|
||||
function buildSenderCommand(args) {
|
||||
if (args.senderCommand) return args.senderCommand;
|
||||
if (!args.senderMode) return null;
|
||||
const cmd = [
|
||||
JSON.stringify(process.execPath),
|
||||
JSON.stringify(path.join(ROOT_DIR, 'scripts', 'operator_notify_sender_binding.mjs')),
|
||||
'--mode', JSON.stringify(args.senderMode),
|
||||
'--openclaw-bin', JSON.stringify(args.openclawBin),
|
||||
'--compact',
|
||||
];
|
||||
return cmd.join(' ');
|
||||
}
|
||||
|
||||
function runNodeScript(scriptPath, scriptArgs) {
|
||||
return spawnSync(process.execPath, [scriptPath, ...scriptArgs], {
|
||||
cwd: ROOT_DIR,
|
||||
encoding: 'utf8',
|
||||
});
|
||||
}
|
||||
|
||||
function parseJsonOutput(label, result) {
|
||||
const stdout = result.stdout ?? '';
|
||||
try {
|
||||
return stdout.trim() ? JSON.parse(stdout) : null;
|
||||
} catch (error) {
|
||||
throw new Error(`${label} emitted non-JSON stdout: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
|
||||
function ensureSuccess(label, result) {
|
||||
if (result.status !== 0) {
|
||||
throw new Error(`${label} failed with status ${result.status ?? 'null'}: ${(result.stderr ?? '').trim() || '(no stderr)'}`);
|
||||
}
|
||||
}
|
||||
|
||||
function main() {
|
||||
const args = parseArgs(process.argv.slice(2));
|
||||
if (args.help) {
|
||||
printHelp();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
const senderCommand = buildSenderCommand(args);
|
||||
|
||||
try {
|
||||
const watchdogArgs = [
|
||||
'--state', path.resolve(args.state),
|
||||
'--evidence-dir', path.resolve(args.evidenceDir),
|
||||
'--event-dir', path.resolve(args.eventDir),
|
||||
'--notification-dir', path.resolve(args.queueDir),
|
||||
'--compact',
|
||||
];
|
||||
if (args.writeState) watchdogArgs.push('--write-state');
|
||||
if (args.now) watchdogArgs.push('--now', args.now);
|
||||
const watchdog = runNodeScript(path.resolve(args.watchdogScript), watchdogArgs);
|
||||
ensureSuccess('watchdog runner', watchdog);
|
||||
const watchdogPayload = parseJsonOutput('watchdog runner', watchdog);
|
||||
|
||||
const dispatcherArgs = [
|
||||
'--queue-dir', path.resolve(args.queueDir),
|
||||
'--spool-dir', path.resolve(args.spoolDir),
|
||||
'--compact',
|
||||
];
|
||||
if (args.claim) dispatcherArgs.push('--claim');
|
||||
if (args.now) dispatcherArgs.push('--now', args.now);
|
||||
const dispatcher = runNodeScript(path.resolve(args.dispatcherScript), dispatcherArgs);
|
||||
ensureSuccess('dispatcher', dispatcher);
|
||||
const dispatcherPayload = parseJsonOutput('dispatcher', dispatcher);
|
||||
|
||||
const supervisorArgs = [
|
||||
'--queue-dir', path.resolve(args.queueDir),
|
||||
'--spool-dir', path.resolve(args.spoolDir),
|
||||
'--receipt-dir', path.resolve(args.receiptDir),
|
||||
'--dispatcher-script', path.resolve(args.dispatcherScript),
|
||||
'--compact',
|
||||
];
|
||||
if (args.dryRun) supervisorArgs.push('--dry-run');
|
||||
if (senderCommand) supervisorArgs.push('--sender-command', senderCommand);
|
||||
if (args.now) supervisorArgs.push('--now', args.now);
|
||||
const supervisor = runNodeScript(path.resolve(args.supervisorScript), supervisorArgs);
|
||||
ensureSuccess('bridge supervisor', supervisor);
|
||||
const supervisorPayload = parseJsonOutput('bridge supervisor', supervisor);
|
||||
|
||||
const response = {
|
||||
ok: true,
|
||||
tool: 'watchdog_auto_notify_orchestrator',
|
||||
version: 'mvp-v1',
|
||||
now: args.now ?? null,
|
||||
executionOrder: [
|
||||
'runner',
|
||||
'queue',
|
||||
'dispatcher',
|
||||
'bridge',
|
||||
senderCommand ? 'sender' : 'sender_unconfigured',
|
||||
'ack_or_blocked_or_pending',
|
||||
],
|
||||
orchestration: {
|
||||
script: path.resolve(import.meta.filename),
|
||||
senderCommandConfigured: Boolean(senderCommand),
|
||||
senderMode: args.senderMode ?? null,
|
||||
dryRun: args.dryRun,
|
||||
},
|
||||
result: {
|
||||
watchdog: watchdogPayload?.result ?? null,
|
||||
dispatcher: dispatcherPayload?.result ?? null,
|
||||
supervisor: supervisorPayload?.result ?? null,
|
||||
},
|
||||
};
|
||||
|
||||
process.stdout.write(`${JSON.stringify(response, null, args.compact ? 0 : 2)}\n`);
|
||||
} catch (error) {
|
||||
process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
main();
|
||||
Reference in New Issue
Block a user