449 lines
13 KiB
JavaScript
449 lines
13 KiB
JavaScript
#!/usr/bin/env node
|
|
|
|
import fs from 'node:fs';
|
|
import path from 'node:path';
|
|
import process from 'node:process';
|
|
import crypto from 'node:crypto';
|
|
|
|
const ROOT_DIR = path.resolve(import.meta.dirname, '..');
|
|
const DEFAULT_QUEUE_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-queue');
|
|
const DEFAULT_SPOOL_DIR = path.join(ROOT_DIR, 'state', 'operator-notify-dispatch-spool');
|
|
|
|
/**
 * Parse command-line tokens into the dispatcher's option object.
 *
 * Boolean switches: `--compact`, `--claim`, `--help` / `-h`.
 * Value options, accepted as `--name value` or `--name=value`:
 * `--queue-dir`, `--spool-dir`, `--now`, `--ack`, `--block`, `--note`.
 * Tokens that match none of these are ignored.
 *
 * A missing or empty value is handled the same way for both spellings:
 * directory options keep their current value, every other option becomes
 * `null`. (Previously `--queue-dir ''` stored an empty string, which
 * `path.resolve` later turns into the current working directory.)
 *
 * @param {string[]} argv - Arguments without the node/script prefix.
 * @returns {{queueDir: string, spoolDir: string, now: ?string, compact: boolean,
 *   ack: ?string, block: ?string, note: ?string, claim: boolean, help: boolean}}
 */
function parseArgs(argv) {
  const args = {
    queueDir: DEFAULT_QUEUE_DIR,
    spoolDir: DEFAULT_SPOOL_DIR,
    now: null,
    compact: false,
    ack: null,
    block: null,
    note: null,
    claim: false,
    help: false,
  };

  // Maps (not plain objects) so tokens such as "constructor" never match.
  const switches = new Map([
    ['--compact', 'compact'],
    ['--claim', 'claim'],
    ['--help', 'help'],
    ['-h', 'help'],
  ]);
  const valueOptions = new Map([
    ['--queue-dir', { key: 'queueDir', keepCurrentWhenEmpty: true }],
    ['--spool-dir', { key: 'spoolDir', keepCurrentWhenEmpty: true }],
    ['--now', { key: 'now', keepCurrentWhenEmpty: false }],
    ['--ack', { key: 'ack', keepCurrentWhenEmpty: false }],
    ['--block', { key: 'block', keepCurrentWhenEmpty: false }],
    ['--note', { key: 'note', keepCurrentWhenEmpty: false }],
  ]);

  for (let i = 0; i < argv.length; i += 1) {
    const token = argv[i];

    const switchKey = switches.get(token);
    if (switchKey !== undefined) {
      args[switchKey] = true;
      continue;
    }

    // Split "--name=value" at the first "=" so values may themselves contain "=".
    const eqIndex = token.indexOf('=');
    const flag = eqIndex === -1 ? token : token.slice(0, eqIndex);
    const option = valueOptions.get(flag);
    if (option === undefined) continue;

    let raw;
    if (eqIndex === -1) {
      // Space-separated form: the value is the next token, which is consumed.
      raw = argv[i + 1];
      i += 1;
    } else {
      raw = token.slice(eqIndex + 1);
    }

    args[option.key] = raw || (option.keepCurrentWhenEmpty ? args[option.key] : null);
  }

  return args;
}
|
|
|
|
/**
 * Write the usage summary (dispatch, --ack and --block invocations) to stdout.
 */
function printHelp() {
  const usageLines = [
    'Usage:',
    ' node scripts/operator_notify_dispatcher.mjs [--queue-dir <path>] [--spool-dir <path>] [--now <iso>] [--compact] [--claim]',
    ' node scripts/operator_notify_dispatcher.mjs --ack <queue-item-path|notification_id> [--queue-dir <path>] [--now <iso>] [--note <text>] [--compact]',
    ' node scripts/operator_notify_dispatcher.mjs --block <queue-item-path|notification_id> [--queue-dir <path>] [--now <iso>] [--note <reason>] [--compact]',
    '',
    'File-backed dispatcher for state/operator-notify-queue.',
    'Default dispatch mode is spool/handoff only: it never claims to call message.send directly.',
  ];
  const helpText = `${usageLines.join('\n')}\n`;
  process.stdout.write(helpText);
}
|
|
|
|
/**
 * Create a directory, including any missing parents.
 *
 * @param {string} dirPath - Directory to create.
 */
function ensureDir(dirPath) {
  const options = { recursive: true };
  fs.mkdirSync(dirPath, options);
}
|
|
|
|
/**
 * Convert a date string into epoch milliseconds using `Date.parse`.
 *
 * @param {*} value - Candidate timestamp text.
 * @returns {?number} Epoch milliseconds, or `null` when `value` is not a
 *   non-empty string or cannot be parsed.
 */
function parseTime(value) {
  const isUsableString = typeof value === 'string' && value.length > 0;
  if (!isUsableString) {
    return null;
  }

  const epochMs = Date.parse(value);
  if (Number.isNaN(epochMs)) {
    return null;
  }
  return epochMs;
}
|
|
|
|
/**
 * Format a value accepted by the `Date` constructor as an ISO-8601 UTC string.
 *
 * @param {number|string|Date} value - Instant to format.
 * @returns {string} Result of `Date.prototype.toISOString`.
 * @throws {RangeError} When `value` does not describe a valid date.
 */
function toIso(value) {
  const instant = new Date(value);
  return instant.toISOString();
}
|
|
|
|
/**
 * Turn an arbitrary value into a short slug made of `a-z A-Z 0-9 . _ -`.
 *
 * Runs of any other character collapse to a single `-`, leading and trailing
 * dashes are removed, and the result is capped at 120 characters. Falsy input
 * or an empty result yields `'item'`.
 *
 * @param {*} value - Source value; stringified with `String()`.
 * @returns {string} Non-empty slug of at most 120 characters.
 */
function safeSlug(value) {
  const slug = String(value || 'item')
    .replace(/[^a-zA-Z0-9._-]+/g, '-')
    .replace(/^-+|-+$/g, '')
    .slice(0, 120)
    // Truncation can cut right after a separator; trim again so the slug
    // never ends with a dangling dash.
    .replace(/-+$/, '');
  return slug || 'item';
}
|
|
|
|
/**
 * List the `.json` entries of a directory as full paths, sorted by name.
 *
 * Sub-directories whose name happens to end in `.json` are excluded so that
 * callers never try to read a directory as a queue item. A missing directory
 * yields an empty list; other read errors propagate.
 *
 * @param {string} dirPath - Directory to scan (not recursive).
 * @returns {string[]} `dirPath`-prefixed paths in lexicographic name order.
 */
function listJsonFiles(dirPath) {
  let entries;
  try {
    // Reading directly (instead of existsSync + readdirSync) avoids a race
    // where the directory disappears between the two calls.
    entries = fs.readdirSync(dirPath, { withFileTypes: true });
  } catch (err) {
    if (err?.code === 'ENOENT') return [];
    throw err;
  }

  return entries
    .filter((entry) => !entry.isDirectory() && entry.name.endsWith('.json'))
    .map((entry) => entry.name)
    .sort()
    .map((name) => path.join(dirPath, name));
}
|
|
|
|
/**
 * Read a UTF-8 file and parse it as JSON.
 *
 * A leading byte-order mark is stripped before parsing, because
 * `JSON.parse` rejects it. Parse failures are re-thrown as `SyntaxError`
 * with the file path in the message and the original error as `cause`.
 *
 * @param {string} filePath - File to read.
 * @returns {*} Parsed JSON value.
 * @throws {SyntaxError} When the content is not valid JSON.
 * @throws {Error} File-system errors from `fs.readFileSync`.
 */
function readJson(filePath) {
  const text = fs.readFileSync(filePath, 'utf8');
  const body = text.charCodeAt(0) === 0xfeff ? text.slice(1) : text;
  try {
    return JSON.parse(body);
  } catch (err) {
    throw new SyntaxError(`invalid JSON in ${filePath}: ${err.message}`, { cause: err });
  }
}
|
|
|
|
/**
 * Serialize `payload` as 2-space-indented JSON (plus trailing newline) and
 * store it at `filePath`, creating parent directories as needed.
 *
 * The content is written to a sibling temporary file and then renamed over
 * the destination, so an interrupted write cannot leave a truncated file at
 * `filePath`. The temporary name ends in `.tmp`, not `.json`.
 *
 * @param {string} filePath - Destination file.
 * @param {*} payload - JSON-serializable value.
 * @throws {TypeError} When `payload` cannot be serialized.
 * @throws {Error} File-system errors; the temporary file is removed first.
 */
function writeJson(filePath, payload) {
  ensureDir(path.dirname(filePath));
  const body = `${JSON.stringify(payload, null, 2)}\n`;
  const tempPath = `${filePath}.${process.pid}.tmp`;
  try {
    fs.writeFileSync(tempPath, body, 'utf8');
    fs.renameSync(tempPath, filePath);
  } catch (err) {
    // Do not leave a stray temp file behind when the write or rename fails.
    fs.rmSync(tempPath, { force: true });
    throw err;
  }
}
|
|
|
|
/**
 * Tell whether a queue status is one of `'pending'` or `'claimed'`.
 *
 * @param {*} status - Status value read from a queue item.
 * @returns {boolean} `true` only for the exact strings `'pending'` and `'claimed'`.
 */
function isPendingLike(status) {
  switch (status) {
    case 'pending':
    case 'claimed':
      return true;
    default:
      return false;
  }
}
|
|
|
|
/**
 * Pick the delivery channel for a queue item.
 *
 * `operator_notice.channel` wins when it is neither `null` nor `undefined`;
 * otherwise `dispatch_hint.channel` is used.
 *
 * @param {?object} payload - Queue item (may be nullish).
 * @returns {*} The selected channel, or `null` when neither field is set.
 */
function resolveChannel(payload) {
  const noticeChannel = payload?.operator_notice?.channel;
  if (noticeChannel != null) {
    return noticeChannel;
  }
  const hintChannel = payload?.dispatch_hint?.channel;
  return hintChannel != null ? hintChannel : null;
}
|
|
|
|
/**
 * Pick the delivery target for a queue item.
 *
 * `operator_notice.target` wins when it is neither `null` nor `undefined`;
 * otherwise `dispatch_hint.target` is used.
 *
 * @param {?object} payload - Queue item (may be nullish).
 * @returns {*} The selected target, or `null` when neither field is set.
 */
function resolveTarget(payload) {
  const noticeTarget = payload?.operator_notice?.target;
  if (noticeTarget != null) {
    return noticeTarget;
  }
  const hintTarget = payload?.dispatch_hint?.target;
  return hintTarget != null ? hintTarget : null;
}
|
|
|
|
/**
 * Pick the message body for a queue item.
 *
 * `operator_notice.message` wins when it is neither `null` nor `undefined`;
 * otherwise `dispatch_hint.message` is used.
 *
 * @param {?object} payload - Queue item (may be nullish).
 * @returns {*} The selected message, or `null` when neither field is set.
 */
function resolveMessage(payload) {
  const noticeMessage = payload?.operator_notice?.message;
  if (noticeMessage != null) {
    return noticeMessage;
  }
  const hintMessage = payload?.dispatch_hint?.message;
  return hintMessage != null ? hintMessage : null;
}
|
|
|
|
/**
 * Produce a copy of a queue item in the `blocked` state.
 *
 * The copy records the reason in `blocked_gap`, the time in `blocked_at`,
 * and replaces `dispatch_result` with a fresh blocked record. The input is
 * not modified.
 *
 * @param {object} payload - Queue item to copy.
 * @param {string} nowIso - Timestamp stored on the copy.
 * @param {string} reason - Why the item is blocked.
 * @returns {object} New blocked queue item.
 */
function markBlocked(payload, nowIso, reason) {
  const blockedResult = {
    mode: 'spool_only',
    state: 'blocked',
    blockedAt: nowIso,
    reason,
  };
  const blockedFields = {
    status: 'blocked',
    blocked_gap: reason,
    blocked_at: nowIso,
    dispatch_result: blockedResult,
  };
  return { ...payload, ...blockedFields };
}
|
|
|
|
/**
 * Produce a copy of a queue item in the `claimed` state.
 *
 * Existing `dispatch_result` fields are kept and overlaid with the claim
 * record (`mode`, `state`, `claimedAt`). The input is not modified.
 *
 * @param {object} payload - Queue item to copy.
 * @param {string} nowIso - Timestamp stored on the copy.
 * @returns {object} New claimed queue item.
 */
function claimPayload(payload, nowIso) {
  const priorResult = payload.dispatch_result ?? {};
  const claimedResult = {
    ...priorResult,
    mode: 'spool_only',
    state: 'claimed',
    claimedAt: nowIso,
  };
  return {
    ...payload,
    status: 'claimed',
    dispatch_claimed_at: nowIso,
    dispatch_result: claimedResult,
  };
}
|
|
|
|
/**
 * Build the hand-off document that is written to the spool directory for a
 * queue item.
 *
 * The document names `message.send` as executor in `manual_handoff` mode,
 * carries the resolved channel/target/message, and embeds ready-made
 * `--ack` / `--block` command lines that reference `queuePath`.
 *
 * @param {object} payload - Queue item being dispatched.
 * @param {string} queuePath - Path of the queue item file.
 * @param {string} nowIso - Timestamp stored as `created_at`.
 * @returns {object} Spool document. `notification_id` is a random UUID when
 *   the queue item has none.
 */
function buildSpoolPayload(payload, queuePath, nowIso) {
  const dispatchContract = {
    executor: 'message.send',
    mode: 'manual_handoff',
    requires_human_or_upper_runtime: true,
    channel: resolveChannel(payload),
    target: resolveTarget(payload),
    message: resolveMessage(payload),
  };
  const notificationId = payload.notification_id ?? crypto.randomUUID();

  // Both follow-up commands share one shape; only the flag and note differ.
  const quotedPath = JSON.stringify(queuePath);
  const followUpCommand = (flag, note) =>
    `node scripts/operator_notify_dispatcher.mjs ${flag} ${quotedPath} --note ${JSON.stringify(note)}`;

  return {
    handoff_version: '1.0.0',
    created_at: nowIso,
    queue_item_path: queuePath,
    notification_id: notificationId,
    dispatch_contract: dispatchContract,
    governance: payload.governance ?? null,
    evidence_refs: payload.evidence_refs ?? [],
    queue_status_before_dispatch: payload.status ?? 'pending',
    ack_instructions: {
      command: followUpCommand('--ack', 'message.send delivered by upper runtime'),
      accepted_status_transition: 'dispatched -> acked',
    },
    block_instructions: {
      command: followUpCommand('--block', 'upper runtime could not safely deliver message.send'),
      accepted_status_transition: 'dispatched -> blocked',
    },
  };
}
|
|
|
|
/**
 * Produce a copy of a queue item in the `dispatched` state.
 *
 * The copy clears `blocked_gap`, stamps `dispatched_at`, and replaces
 * `dispatch_result` with a hand-off record that points at the spool file and
 * marks an acknowledgement as required. The input is not modified.
 *
 * @param {object} payload - Queue item to copy.
 * @param {string} nowIso - Timestamp stored on the copy.
 * @param {string} spoolPath - Path of the spool file written for this item.
 * @returns {object} New dispatched queue item.
 */
function dispatchPayload(payload, nowIso, spoolPath) {
  const handoffResult = {
    mode: 'spool_only',
    state: 'dispatched',
    dispatchedAt: nowIso,
    spoolPath,
    executor: 'message.send',
    delivery: 'handoff_pending_ack',
    ackRequired: true,
  };
  const dispatchedFields = {
    status: 'dispatched',
    dispatched_at: nowIso,
    blocked_gap: null,
    dispatch_result: handoffResult,
  };
  return { ...payload, ...dispatchedFields };
}
|
|
|
|
/**
 * Locate a queue item file from a path or a notification id.
 *
 * Lookup order:
 *   1. `targetValue` as a file path (absolute, or relative to `queueDir`);
 *   2. the first `.json` file in `queueDir` whose `notification_id` equals
 *      `targetValue`;
 *   3. `targetValue` as a path relative to the current working directory.
 *
 * Only regular files are accepted, so a directory name is never returned as
 * a queue item.
 *
 * @param {string} queueDir - Queue directory.
 * @param {?string} targetValue - Path or notification id.
 * @returns {?string} Path of the matching file, or `null` when none matches.
 */
function findQueueItem(queueDir, targetValue) {
  if (!targetValue) return null;

  // Any stat failure (missing entry, bad path, permissions) means "no match".
  const isRegularFile = (candidate) => {
    try {
      return fs.statSync(candidate).isFile();
    } catch {
      return false;
    }
  };

  const targetIsAbsolute = path.isAbsolute(targetValue);
  const directPath = targetIsAbsolute ? targetValue : path.join(queueDir, targetValue);
  if (isRegularFile(directPath)) return directPath;

  for (const filePath of listJsonFiles(queueDir)) {
    try {
      if (readJson(filePath)?.notification_id === targetValue) return filePath;
    } catch {
      // Best-effort scan: an unreadable or corrupt sibling must not prevent
      // finding the requested item further down the list.
    }
  }

  if (!targetIsAbsolute) {
    // Paths typed relative to the shell's working directory are tried last so
    // they can never shadow a notification id.
    const cwdPath = path.resolve(targetValue);
    if (isRegularFile(cwdPath)) return cwdPath;
  }

  return null;
}
|
|
|
|
/**
 * Apply an `ack` or `block` transition to a queue item and persist it.
 *
 * The item is located with `findQueueItem`, updated, and written back with
 * `writeJson`. Existing `dispatch_result` fields are kept and overlaid with
 * the new state. Failures are reported through the return value rather than
 * thrown: unsupported transition, item not found, unreadable/corrupt file,
 * or file content that is not a JSON object.
 *
 * @param {string} queueDir - Queue directory.
 * @param {string} targetValue - Item path or notification id.
 * @param {string} nowIso - Timestamp recorded on the item.
 * @param {'ack'|'block'} transition - Transition to apply.
 * @param {?string} note - Ack note, or block reason.
 * @returns {object} `{ ok: true, mode, queuePath, notificationId,
 *   previousStatus, nextStatus[, reason] }` on success, otherwise
 *   `{ ok: false, error }`.
 */
function transitionItem(queueDir, targetValue, nowIso, transition, note) {
  // Reject unknown transitions before touching the file system.
  if (transition !== 'ack' && transition !== 'block') {
    return { ok: false, error: `unsupported transition: ${transition}` };
  }

  const queuePath = findQueueItem(queueDir, targetValue);
  if (!queuePath) {
    return {
      ok: false,
      error: `queue item not found for transition target: ${targetValue}`,
    };
  }

  let payload;
  try {
    payload = readJson(queuePath);
  } catch (err) {
    return {
      ok: false,
      error: `queue item unreadable at ${queuePath}: ${err.message}`,
    };
  }
  if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) {
    return {
      ok: false,
      error: `queue item at ${queuePath} is not a JSON object`,
    };
  }

  const previousStatus = payload.status ?? null;
  const notificationId = payload.notification_id ?? null;
  const priorResult = payload.dispatch_result ?? {};

  if (transition === 'ack') {
    writeJson(queuePath, {
      ...payload,
      status: 'acked',
      acked_at: nowIso,
      ack_note: note ?? null,
      dispatch_result: {
        ...priorResult,
        state: 'acked',
        ackedAt: nowIso,
        ackNote: note ?? null,
      },
    });
    return {
      ok: true,
      mode: 'ack',
      queuePath,
      notificationId,
      previousStatus,
      nextStatus: 'acked',
    };
  }

  // transition === 'block': prefer the caller's note, then any reason already
  // stored on the item, then a generic default.
  const reason = note ?? payload.blocked_gap ?? 'blocked by upper runtime';
  writeJson(queuePath, {
    ...payload,
    status: 'blocked',
    blocked_at: nowIso,
    blocked_gap: reason,
    dispatch_result: {
      ...priorResult,
      state: 'blocked',
      blockedAt: nowIso,
      reason,
    },
  });
  return {
    ok: true,
    mode: 'block',
    queuePath,
    notificationId,
    previousStatus,
    nextStatus: 'blocked',
    reason,
  };
}
|
|
|
|
/**
 * Acknowledge a queue item by delegating to `transitionItem` with the
 * `'ack'` transition.
 *
 * @param {string} queueDir - Queue directory.
 * @param {string} ackValue - Item path or notification id.
 * @param {string} nowIso - Timestamp recorded on the item.
 * @param {?string} note - Optional acknowledgement note.
 * @returns {object} Result object from `transitionItem`.
 */
function ackItem(queueDir, ackValue, nowIso, note) {
  const transition = 'ack';
  const outcome = transitionItem(queueDir, ackValue, nowIso, transition, note);
  return outcome;
}
|
|
|
|
/**
 * Block a queue item by delegating to `transitionItem` with the `'block'`
 * transition.
 *
 * @param {string} queueDir - Queue directory.
 * @param {string} targetValue - Item path or notification id.
 * @param {string} nowIso - Timestamp recorded on the item.
 * @param {?string} note - Optional block reason.
 * @returns {object} Result object from `transitionItem`.
 */
function blockItem(queueDir, targetValue, nowIso, note) {
  const transition = 'block';
  const outcome = transitionItem(queueDir, targetValue, nowIso, transition, note);
  return outcome;
}
|
|
|
|
/**
 * CLI entry point.
 *
 * Modes, checked in this order:
 *   - `--help`: print usage and exit 0.
 *   - `--ack <target>`: acknowledge one queue item and print the result.
 *   - `--block <target>`: block one queue item and print the result.
 *     (`--ack` takes precedence when both are given.)
 *   - otherwise: scan the queue directory. Each `pending`/`claimed` item is
 *     blocked when channel, target or message is missing; if not, a hand-off
 *     file is written to the spool directory and the item is marked
 *     `dispatched`. With `--claim`, pending items are first persisted as
 *     `claimed`.
 *
 * A queue file that cannot be read or parsed, or whose content is not a JSON
 * object, is reported under `skipped` (reason `unreadable_json` or
 * `invalid_payload`) rather than aborting the run part-way through.
 *
 * A JSON report goes to stdout (single-line with `--compact`). An invalid
 * `--now` value or a failed ack/block writes to stderr and exits 1.
 */
function main() {
  const args = parseArgs(process.argv.slice(2));
  if (args.help) {
    printHelp();
    process.exit(0);
  }

  const nowMs = args.now ? parseTime(args.now) : Date.now();
  if (nowMs === null) {
    process.stderr.write('Invalid --now value\n');
    process.exit(1);
  }
  const nowIso = toIso(nowMs);
  const indent = args.compact ? 0 : 2;

  if (args.ack) {
    const ackResult = ackItem(path.resolve(args.queueDir), args.ack, nowIso, args.note);
    if (!ackResult.ok) {
      process.stderr.write(`${ackResult.error}\n`);
      process.exit(1);
    }
    process.stdout.write(`${JSON.stringify({ ok: true, tool: 'operator_notify_dispatcher', mode: 'ack', now: nowIso, result: ackResult }, null, indent)}\n`);
    return;
  }

  if (args.block) {
    const blockResult = blockItem(path.resolve(args.queueDir), args.block, nowIso, args.note);
    if (!blockResult.ok) {
      process.stderr.write(`${blockResult.error}\n`);
      process.exit(1);
    }
    process.stdout.write(`${JSON.stringify({ ok: true, tool: 'operator_notify_dispatcher', mode: 'block', now: nowIso, result: blockResult }, null, indent)}\n`);
    return;
  }

  const queueDir = path.resolve(args.queueDir);
  const spoolDir = path.resolve(args.spoolDir);
  ensureDir(queueDir);
  ensureDir(spoolDir);

  const files = listJsonFiles(queueDir);
  const queueScanned = [];
  const blocked = [];
  const dispatched = [];
  const skipped = [];
  const claimed = [];

  for (const filePath of files) {
    let payload;
    try {
      payload = readJson(filePath);
    } catch (err) {
      // One bad file must not stop the remaining items from being processed.
      queueScanned.push({ path: filePath, notificationId: null, status: null });
      skipped.push({ path: filePath, notificationId: null, status: null, reason: 'unreadable_json', error: err.message });
      continue;
    }
    if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) {
      queueScanned.push({ path: filePath, notificationId: null, status: null });
      skipped.push({ path: filePath, notificationId: null, status: null, reason: 'invalid_payload' });
      continue;
    }

    const notificationId = payload.notification_id ?? null;
    queueScanned.push({ path: filePath, notificationId, status: payload.status ?? null });

    if (!isPendingLike(payload.status)) {
      skipped.push({ path: filePath, notificationId, status: payload.status ?? null, reason: 'not_pending_like' });
      continue;
    }

    const channel = resolveChannel(payload);
    const target = resolveTarget(payload);
    const message = resolveMessage(payload);

    if (!channel || !target || !message) {
      const reason = payload.blocked_gap || 'queue item missing channel/target/message for safe dispatch';
      writeJson(filePath, markBlocked(payload, nowIso, reason));
      blocked.push({ path: filePath, notificationId, status: 'blocked', reason });
      continue;
    }

    let workingPayload = payload;
    if (args.claim && payload.status === 'pending') {
      workingPayload = claimPayload(payload, nowIso);
      writeJson(filePath, workingPayload);
      claimed.push({ path: filePath, notificationId, status: 'claimed' });
    }

    // Write the hand-off file first, then flip the queue item to "dispatched".
    const spoolPayload = buildSpoolPayload(workingPayload, filePath, nowIso);
    const spoolName = `${safeSlug(payload.notification_id ?? path.basename(filePath, '.json'))}-dispatch.json`;
    const spoolPath = path.join(spoolDir, spoolName);
    writeJson(spoolPath, spoolPayload);

    writeJson(filePath, dispatchPayload(workingPayload, nowIso, spoolPath));
    dispatched.push({
      path: filePath,
      notificationId,
      status: 'dispatched',
      spoolPath,
      delivery: 'handoff_pending_ack',
    });
  }

  const response = {
    ok: true,
    tool: 'operator_notify_dispatcher',
    version: 'mvp-v1',
    mode: 'dispatch',
    dispatchMode: 'spool_only',
    queueDir,
    spoolDir,
    now: nowIso,
    result: {
      scannedCount: queueScanned.length,
      blockedCount: blocked.length,
      dispatchedCount: dispatched.length,
      claimedCount: claimed.length,
      skippedCount: skipped.length,
      queueScanned,
      claimed,
      blocked,
      dispatched,
      skipped,
    },
  };

  process.stdout.write(`${JSON.stringify(response, null, indent)}\n`);
}
|
|
|
|
// Script entry point: run the dispatcher CLI as soon as this module is evaluated.
main();
|