refactor: shim repo operator notify dispatcher

This commit is contained in:
Eve
2026-05-08 20:20:24 +08:00
parent 61a32b0857
commit 5f0b77d8d9
2 changed files with 270 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
#!/usr/bin/env node
import '../plugins/reporting-governance/scripts/operator_notify_dispatcher.mjs';

View File

@@ -0,0 +1,267 @@
#!/usr/bin/env node
import assert from 'node:assert/strict';
import { mkdtempSync, mkdirSync, readFileSync, readdirSync, rmSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import path from 'node:path';
import process from 'node:process';
import { spawnSync } from 'node:child_process';
const ROOT_DIR = path.resolve(import.meta.dirname, '..');
const REPO_SHIM = path.join(ROOT_DIR, 'scripts', 'operator_notify_dispatcher.mjs');
const PACKAGE_ENTRY = path.join(ROOT_DIR, 'plugins', 'reporting-governance', 'scripts', 'operator_notify_dispatcher.mjs');
function createFixtureRunner(scriptPath) {
const fixtureRoot = mkdtempSync(path.join(tmpdir(), 'operator-notify-dispatcher-test-'));
const queueDir = path.join(fixtureRoot, 'queue');
const spoolDir = path.join(fixtureRoot, 'spool');
mkdirSync(queueDir, { recursive: true });
mkdirSync(spoolDir, { recursive: true });
function writeQueueItem(fileName, payload) {
const filePath = path.join(queueDir, fileName);
writeFileSync(filePath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8');
return filePath;
}
function run(args = []) {
const result = spawnSync(
process.execPath,
[scriptPath, '--queue-dir', queueDir, '--spool-dir', spoolDir, ...args],
{ cwd: ROOT_DIR, encoding: 'utf8' },
);
return {
status: result.status,
stdout: result.stdout ?? '',
stderr: result.stderr ?? '',
};
}
function readQueueItem(fileName) {
return JSON.parse(readFileSync(path.join(queueDir, fileName), 'utf8'));
}
function readSpoolItem(fileName) {
return JSON.parse(readFileSync(path.join(spoolDir, fileName), 'utf8'));
}
function listSpoolFiles() {
return readdirSync(spoolDir).sort();
}
function cleanup() {
rmSync(fixtureRoot, { recursive: true, force: true });
}
return {
queueDir,
spoolDir,
writeQueueItem,
run,
readQueueItem,
readSpoolItem,
listSpoolFiles,
cleanup,
};
}
const tests = [];
function test(name, fn) { tests.push({ name, fn }); }
function printResult(prefix, name, detail = '') {
process.stdout.write(`${prefix} ${name}${detail ? ` ${detail}` : ''}\n`);
}
function normalizePayload(payload) {
return {
...payload,
queueDir: '<queue>',
spoolDir: '<spool>',
result: {
...payload.result,
queueScanned: payload.result.queueScanned.map((item) => ({ ...item, path: '<queue-item>' })),
claimed: payload.result.claimed.map((item) => ({ ...item, path: '<queue-item>' })),
blocked: payload.result.blocked.map((item) => ({ ...item, path: '<queue-item>' })),
dispatched: payload.result.dispatched.map((item) => ({ ...item, path: '<queue-item>', spoolPath: '<spool-item>' })),
skipped: payload.result.skipped.map((item) => ({ ...item, path: '<queue-item>' })),
},
};
}
function normalizeQueueItem(payload) {
return {
...payload,
dispatch_result: payload?.dispatch_result
? {
...payload.dispatch_result,
spoolPath: payload.dispatch_result.spoolPath ? '<spool-item>' : payload.dispatch_result.spoolPath,
}
: payload.dispatch_result,
};
}
test('repo shim matches package entrypoint for same ready queue item', () => {
const shim = createFixtureRunner(REPO_SHIM);
const pkg = createFixtureRunner(PACKAGE_ENTRY);
const payload = {
notification_id: 'notify-ready-1',
kind: 'notify_operator',
status: 'pending',
operator_notice: {
channel: 'telegram',
target: '864811879',
message: 'watchdog overdue',
},
dispatch_hint: {
tool: 'message.send',
channel: 'telegram',
target: '864811879',
message: 'watchdog overdue',
},
governance: {
task_id: 'watchdog-1',
},
evidence_refs: [],
blocked_gap: null,
};
try {
shim.writeQueueItem('ready.json', payload);
pkg.writeQueueItem('ready.json', payload);
const shimResult = shim.run(['--compact', '--now', '2026-05-07T09:00:00.000Z']);
const pkgResult = pkg.run(['--compact', '--now', '2026-05-07T09:00:00.000Z']);
assert.equal(shimResult.status, 0, shimResult.stderr);
assert.equal(pkgResult.status, 0, pkgResult.stderr);
assert.deepEqual(normalizePayload(JSON.parse(shimResult.stdout)), normalizePayload(JSON.parse(pkgResult.stdout)));
assert.deepEqual(normalizeQueueItem(shim.readQueueItem('ready.json')), normalizeQueueItem(pkg.readQueueItem('ready.json')));
} finally {
shim.cleanup();
pkg.cleanup();
}
});
test('dispatches pending ready queue item into spool and marks queue item dispatched', () => {
const runner = createFixtureRunner(PACKAGE_ENTRY);
try {
runner.writeQueueItem('ready.json', {
notification_id: 'notify-ready-1',
kind: 'notify_operator',
status: 'pending',
operator_notice: {
channel: 'telegram',
target: '864811879',
message: 'watchdog overdue',
},
dispatch_hint: {
tool: 'message.send',
channel: 'telegram',
target: '864811879',
message: 'watchdog overdue',
},
governance: {
task_id: 'watchdog-1',
},
evidence_refs: [],
blocked_gap: null,
});
const result = runner.run(['--compact', '--now', '2026-05-07T09:00:00.000Z']);
assert.equal(result.status, 0, result.stderr);
const payload = JSON.parse(result.stdout);
assert.equal(payload.result.dispatchedCount, 1);
assert.equal(payload.result.blockedCount, 0);
const queueItem = runner.readQueueItem('ready.json');
assert.equal(queueItem.status, 'dispatched');
assert.equal(queueItem.dispatch_result.mode, 'spool_only');
assert.equal(queueItem.dispatch_result.delivery, 'handoff_pending_ack');
const spoolFiles = runner.listSpoolFiles();
assert.equal(spoolFiles.length, 1);
const spoolItem = runner.readSpoolItem(spoolFiles[0]);
assert.equal(spoolItem.dispatch_contract.executor, 'message.send');
assert.equal(spoolItem.dispatch_contract.channel, 'telegram');
assert.equal(spoolItem.dispatch_contract.target, '864811879');
} finally {
runner.cleanup();
}
});
test('marks incomplete pending queue item blocked instead of pretending it was dispatched', () => {
const runner = createFixtureRunner(PACKAGE_ENTRY);
try {
runner.writeQueueItem('blocked.json', {
notification_id: 'notify-blocked-1',
kind: 'notify_operator',
status: 'pending',
operator_notice: {
channel: 'telegram',
target: null,
message: 'watchdog overdue',
},
dispatch_hint: {
tool: 'message.send',
channel: 'telegram',
target: null,
message: 'watchdog overdue',
},
blocked_gap: 'watchdog state does not define reportChannel/reportTarget, so dispatcher target is incomplete',
});
const result = runner.run(['--compact', '--now', '2026-05-07T09:00:00.000Z']);
assert.equal(result.status, 0, result.stderr);
const payload = JSON.parse(result.stdout);
assert.equal(payload.result.dispatchedCount, 0);
assert.equal(payload.result.blockedCount, 1);
assert.deepEqual(runner.listSpoolFiles(), []);
const queueItem = runner.readQueueItem('blocked.json');
assert.equal(queueItem.status, 'blocked');
assert.match(queueItem.blocked_gap, /reportChannel\/reportTarget|channel\/target\/message/);
} finally {
runner.cleanup();
}
});
test('ack command marks dispatched queue item acked with note', () => {
const runner = createFixtureRunner(PACKAGE_ENTRY);
try {
const queuePath = runner.writeQueueItem('acked.json', {
notification_id: 'notify-ack-1',
kind: 'notify_operator',
status: 'dispatched',
dispatch_result: {
mode: 'spool_only',
state: 'dispatched',
},
});
const result = runner.run(['--compact', '--ack', queuePath, '--note', 'message.send delivered by upper runtime', '--now', '2026-05-07T09:05:00.000Z']);
assert.equal(result.status, 0, result.stderr);
const payload = JSON.parse(result.stdout);
assert.equal(payload.result.nextStatus, 'acked');
const queueItem = runner.readQueueItem('acked.json');
assert.equal(queueItem.status, 'acked');
assert.equal(queueItem.ack_note, 'message.send delivered by upper runtime');
assert.equal(queueItem.dispatch_result.state, 'acked');
} finally {
runner.cleanup();
}
});
let failures = 0;
for (const { name, fn } of tests) {
try {
fn();
printResult('ok', name);
} catch (error) {
failures += 1;
printResult('not ok', name, `- ${error instanceof Error ? error.message : String(error)}`);
}
}
if (failures > 0) {
process.exit(1);
}