From 5f0b77d8d9962aa80cf73fd77fb4e5dcfbb2a583 Mon Sep 17 00:00:00 2001 From: Eve Date: Fri, 8 May 2026 20:20:24 +0800 Subject: [PATCH] refactor: shim repo operator notify dispatcher --- scripts/operator_notify_dispatcher.mjs | 3 + scripts/test_operator_notify_dispatcher.mjs | 267 ++++++++++++++++++++ 2 files changed, 270 insertions(+) create mode 100644 scripts/operator_notify_dispatcher.mjs create mode 100644 scripts/test_operator_notify_dispatcher.mjs diff --git a/scripts/operator_notify_dispatcher.mjs b/scripts/operator_notify_dispatcher.mjs new file mode 100644 index 0000000..8408fde --- /dev/null +++ b/scripts/operator_notify_dispatcher.mjs @@ -0,0 +1,3 @@ +#!/usr/bin/env node + +import '../plugins/reporting-governance/scripts/operator_notify_dispatcher.mjs'; diff --git a/scripts/test_operator_notify_dispatcher.mjs b/scripts/test_operator_notify_dispatcher.mjs new file mode 100644 index 0000000..cadc746 --- /dev/null +++ b/scripts/test_operator_notify_dispatcher.mjs @@ -0,0 +1,267 @@ +#!/usr/bin/env node + +import assert from 'node:assert/strict'; +import { mkdtempSync, mkdirSync, readFileSync, readdirSync, rmSync, writeFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import path from 'node:path'; +import process from 'node:process'; +import { spawnSync } from 'node:child_process'; + +const ROOT_DIR = path.resolve(import.meta.dirname, '..'); +const REPO_SHIM = path.join(ROOT_DIR, 'scripts', 'operator_notify_dispatcher.mjs'); +const PACKAGE_ENTRY = path.join(ROOT_DIR, 'plugins', 'reporting-governance', 'scripts', 'operator_notify_dispatcher.mjs'); + +function createFixtureRunner(scriptPath) { + const fixtureRoot = mkdtempSync(path.join(tmpdir(), 'operator-notify-dispatcher-test-')); + const queueDir = path.join(fixtureRoot, 'queue'); + const spoolDir = path.join(fixtureRoot, 'spool'); + mkdirSync(queueDir, { recursive: true }); + mkdirSync(spoolDir, { recursive: true }); + + function writeQueueItem(fileName, payload) { + const filePath = path.join(queueDir, fileName); + writeFileSync(filePath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8'); + return filePath; + } + + function run(args = []) { + const result = spawnSync( + process.execPath, + [scriptPath, '--queue-dir', queueDir, '--spool-dir', spoolDir, ...args], + { cwd: ROOT_DIR, encoding: 'utf8' }, + ); + return { + status: result.status, + stdout: result.stdout ?? '', + stderr: result.stderr ?? '', + }; + } + + function readQueueItem(fileName) { + return JSON.parse(readFileSync(path.join(queueDir, fileName), 'utf8')); + } + + function readSpoolItem(fileName) { + return JSON.parse(readFileSync(path.join(spoolDir, fileName), 'utf8')); + } + + function listSpoolFiles() { + return readdirSync(spoolDir).sort(); + } + + function cleanup() { + rmSync(fixtureRoot, { recursive: true, force: true }); + } + + return { + queueDir, + spoolDir, + writeQueueItem, + run, + readQueueItem, + readSpoolItem, + listSpoolFiles, + cleanup, + }; +} + +const tests = []; +function test(name, fn) { tests.push({ name, fn }); } + +function printResult(prefix, name, detail = '') { + process.stdout.write(`${prefix} ${name}${detail ? ` ${detail}` : ''}\n`); +} + +function normalizePayload(payload) { + return { + ...payload, + queueDir: '', + spoolDir: '', + result: { + ...payload.result, + queueScanned: payload.result.queueScanned.map((item) => ({ ...item, path: '' })), + claimed: payload.result.claimed.map((item) => ({ ...item, path: '' })), + blocked: payload.result.blocked.map((item) => ({ ...item, path: '' })), + dispatched: payload.result.dispatched.map((item) => ({ ...item, path: '', spoolPath: '' })), + skipped: payload.result.skipped.map((item) => ({ ...item, path: '' })), + }, + }; +} + +function normalizeQueueItem(payload) { + return { + ...payload, + dispatch_result: payload?.dispatch_result + ? { + ...payload.dispatch_result, + spoolPath: payload.dispatch_result.spoolPath ? '' : payload.dispatch_result.spoolPath, + } + : payload.dispatch_result, + }; +} + +test('repo shim matches package entrypoint for same ready queue item', () => { + const shim = createFixtureRunner(REPO_SHIM); + const pkg = createFixtureRunner(PACKAGE_ENTRY); + const payload = { + notification_id: 'notify-ready-1', + kind: 'notify_operator', + status: 'pending', + operator_notice: { + channel: 'telegram', + target: '864811879', + message: 'watchdog overdue', + }, + dispatch_hint: { + tool: 'message.send', + channel: 'telegram', + target: '864811879', + message: 'watchdog overdue', + }, + governance: { + task_id: 'watchdog-1', + }, + evidence_refs: [], + blocked_gap: null, + }; + + try { + shim.writeQueueItem('ready.json', payload); + pkg.writeQueueItem('ready.json', payload); + + const shimResult = shim.run(['--compact', '--now', '2026-05-07T09:00:00.000Z']); + const pkgResult = pkg.run(['--compact', '--now', '2026-05-07T09:00:00.000Z']); + assert.equal(shimResult.status, 0, shimResult.stderr); + assert.equal(pkgResult.status, 0, pkgResult.stderr); + assert.deepEqual(normalizePayload(JSON.parse(shimResult.stdout)), normalizePayload(JSON.parse(pkgResult.stdout))); + assert.deepEqual(normalizeQueueItem(shim.readQueueItem('ready.json')), normalizeQueueItem(pkg.readQueueItem('ready.json'))); + } finally { + shim.cleanup(); + pkg.cleanup(); + } +}); + +test('dispatches pending ready queue item into spool and marks queue item dispatched', () => { + const runner = createFixtureRunner(PACKAGE_ENTRY); + try { + runner.writeQueueItem('ready.json', { + notification_id: 'notify-ready-1', + kind: 'notify_operator', + status: 'pending', + operator_notice: { + channel: 'telegram', + target: '864811879', + message: 'watchdog overdue', + }, + dispatch_hint: { + tool: 'message.send', + channel: 'telegram', + target: '864811879', + message: 'watchdog overdue', + }, + governance: { + task_id: 'watchdog-1', + }, + evidence_refs: [], + blocked_gap: null, + }); + + const result = runner.run(['--compact', '--now', '2026-05-07T09:00:00.000Z']); + assert.equal(result.status, 0, result.stderr); + const payload = JSON.parse(result.stdout); + assert.equal(payload.result.dispatchedCount, 1); + assert.equal(payload.result.blockedCount, 0); + + const queueItem = runner.readQueueItem('ready.json'); + assert.equal(queueItem.status, 'dispatched'); + assert.equal(queueItem.dispatch_result.mode, 'spool_only'); + assert.equal(queueItem.dispatch_result.delivery, 'handoff_pending_ack'); + + const spoolFiles = runner.listSpoolFiles(); + assert.equal(spoolFiles.length, 1); + const spoolItem = runner.readSpoolItem(spoolFiles[0]); + assert.equal(spoolItem.dispatch_contract.executor, 'message.send'); + assert.equal(spoolItem.dispatch_contract.channel, 'telegram'); + assert.equal(spoolItem.dispatch_contract.target, '864811879'); + } finally { + runner.cleanup(); + } +}); + +test('marks incomplete pending queue item blocked instead of pretending it was dispatched', () => { + const runner = createFixtureRunner(PACKAGE_ENTRY); + try { + runner.writeQueueItem('blocked.json', { + notification_id: 'notify-blocked-1', + kind: 'notify_operator', + status: 'pending', + operator_notice: { + channel: 'telegram', + target: null, + message: 'watchdog overdue', + }, + dispatch_hint: { + tool: 'message.send', + channel: 'telegram', + target: null, + message: 'watchdog overdue', + }, + blocked_gap: 'watchdog state does not define reportChannel/reportTarget, so dispatcher target is incomplete', + }); + + const result = runner.run(['--compact', '--now', '2026-05-07T09:00:00.000Z']); + assert.equal(result.status, 0, result.stderr); + const payload = JSON.parse(result.stdout); + assert.equal(payload.result.dispatchedCount, 0); + assert.equal(payload.result.blockedCount, 1); + assert.deepEqual(runner.listSpoolFiles(), []); + + const queueItem = runner.readQueueItem('blocked.json'); + assert.equal(queueItem.status, 'blocked'); + assert.match(queueItem.blocked_gap, /reportChannel\/reportTarget|channel\/target\/message/); + } finally { + runner.cleanup(); + } +}); + +test('ack command marks dispatched queue item acked with note', () => { + const runner = createFixtureRunner(PACKAGE_ENTRY); + try { + const queuePath = runner.writeQueueItem('acked.json', { + notification_id: 'notify-ack-1', + kind: 'notify_operator', + status: 'dispatched', + dispatch_result: { + mode: 'spool_only', + state: 'dispatched', + }, + }); + + const result = runner.run(['--compact', '--ack', queuePath, '--note', 'message.send delivered by upper runtime', '--now', '2026-05-07T09:05:00.000Z']); + assert.equal(result.status, 0, result.stderr); + const payload = JSON.parse(result.stdout); + assert.equal(payload.result.nextStatus, 'acked'); + + const queueItem = runner.readQueueItem('acked.json'); + assert.equal(queueItem.status, 'acked'); + assert.equal(queueItem.ack_note, 'message.send delivered by upper runtime'); + assert.equal(queueItem.dispatch_result.state, 'acked'); + } finally { + runner.cleanup(); + } +}); + +let failures = 0; +for (const { name, fn } of tests) { + try { + fn(); + printResult('ok', name); + } catch (error) { + failures += 1; + printResult('not ok', name, `- ${error instanceof Error ? error.message : String(error)}`); + } +} + +if (failures > 0) { + process.exit(1); +}