From 55fe51483b0bd2a54d553b8d04246c5a0ce60074 Mon Sep 17 00:00:00 2001 From: Eve Date: Fri, 8 May 2026 16:30:32 +0800 Subject: [PATCH] refactor: extract orchestrator execution core --- plugins/reporting-governance/package.json | 2 +- .../src/adapters/index.mjs | 1 + .../src/adapters/orchestrator-cli.mjs | 137 ++---------------- .../src/adapters/orchestrator-execution.mjs | 104 +++++++++++++ .../src/adapters/orchestrator.mjs | 53 ++++--- .../test/orchestrator-execution.test.mjs | 75 ++++++++++ 6 files changed, 222 insertions(+), 150 deletions(-) create mode 100644 plugins/reporting-governance/src/adapters/orchestrator-execution.mjs create mode 100644 plugins/reporting-governance/test/orchestrator-execution.test.mjs diff --git a/plugins/reporting-governance/package.json b/plugins/reporting-governance/package.json index 1989ef4..afae1fa 100644 --- a/plugins/reporting-governance/package.json +++ b/plugins/reporting-governance/package.json @@ -27,7 +27,7 @@ "src/" ], "scripts": { - "test": "node --test test/package-structure.test.mjs test/policy-evaluator.test.mjs test/compatibility-preflight.test.mjs test/profile-artifact.test.mjs test/profile-generator.test.mjs test/decision-runner.test.mjs test/decision-store.test.mjs test/decision-store-runtime.integration.test.mjs test/governance-contract.integration.test.mjs test/watchdog-chain.integration.test.mjs test/runtime-integrated.integration.test.mjs test/exports-boundary.integration.test.mjs test/packed-consumer-install.smoke.test.mjs", + "test": "node --test test/package-structure.test.mjs test/policy-evaluator.test.mjs test/compatibility-preflight.test.mjs test/profile-artifact.test.mjs test/profile-generator.test.mjs test/decision-runner.test.mjs test/decision-store.test.mjs test/decision-store-runtime.integration.test.mjs test/governance-contract.integration.test.mjs test/watchdog-chain.integration.test.mjs test/orchestrator-execution.test.mjs test/runtime-integrated.integration.test.mjs test/exports-boundary.integration.test.mjs test/packed-consumer-install.smoke.test.mjs", "smoke": "node ./scripts/package-smoke.mjs --compact" }, "dependencies": { diff --git a/plugins/reporting-governance/src/adapters/index.mjs b/plugins/reporting-governance/src/adapters/index.mjs index 5dde3c6..4255d05 100644 --- a/plugins/reporting-governance/src/adapters/index.mjs +++ b/plugins/reporting-governance/src/adapters/index.mjs @@ -3,6 +3,7 @@ export { runDispatcherAdapter } from './dispatcher.mjs'; export { runBridgeSupervisorAdapter } from './bridge-supervisor.mjs'; export { runSenderBindingAdapter } from './sender-binding.mjs'; export { runOrchestratorAdapter } from './orchestrator.mjs'; +export { createDefaultOrchestratorExecutionArgs, buildSenderCommand, runOrchestratorExecution } from './orchestrator-execution.mjs'; export { parseOrchestratorCliArgs, formatOrchestratorHelp, runWatchdogAutoNotifyOrchestrator, runOrchestratorCli } from './orchestrator-cli.mjs'; export { createRuntimeBinding } from './runtime-binding.mjs'; diff --git a/plugins/reporting-governance/src/adapters/orchestrator-cli.mjs b/plugins/reporting-governance/src/adapters/orchestrator-cli.mjs index b0f9d9d..41304c6 100644 --- a/plugins/reporting-governance/src/adapters/orchestrator-cli.mjs +++ b/plugins/reporting-governance/src/adapters/orchestrator-cli.mjs @@ -1,41 +1,18 @@ import path from 'node:path'; import process from 'node:process'; import { fileURLToPath } from 'node:url'; -import { spawnSync } from 'node:child_process'; +import { + buildSenderCommand, + createDefaultOrchestratorExecutionArgs, + runOrchestratorExecution, +} from './orchestrator-execution.mjs'; const packageRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), '..', '..'); -const DEFAULT_STATE_PATH = path.join(packageRoot, 'memory', 'watchdog-state.json'); -const DEFAULT_EVIDENCE_DIR = path.join(packageRoot, 'state', 'long-task-watchdog'); -const DEFAULT_EVENT_DIR = path.join(packageRoot, 'state', 'long-task-watchdog-events'); -const DEFAULT_QUEUE_DIR = path.join(packageRoot, 'state', 'operator-notify-queue'); -const DEFAULT_SPOOL_DIR = path.join(packageRoot, 'state', 'operator-notify-dispatch-spool'); -const DEFAULT_RECEIPT_DIR = path.join(packageRoot, 'state', 'operator-notify-bridge-receipts'); -const DEFAULT_WATCHDOG_SCRIPT = path.join(packageRoot, 'scripts', 'long_task_watchdog.mjs'); -const DEFAULT_DISPATCHER_SCRIPT = path.join(packageRoot, 'scripts', 'operator_notify_dispatcher.mjs'); -const DEFAULT_SUPERVISOR_SCRIPT = path.join(packageRoot, 'scripts', 'operator_notify_bridge_supervisor.mjs'); const DEFAULT_SENDER_BINDING_SCRIPT = path.join(packageRoot, 'scripts', 'operator_notify_sender_binding.mjs'); +const DEFAULT_ARGS = createDefaultOrchestratorExecutionArgs({ packageRoot }); export function parseOrchestratorCliArgs(argv) { - const args = { - state: DEFAULT_STATE_PATH, - evidenceDir: DEFAULT_EVIDENCE_DIR, - eventDir: DEFAULT_EVENT_DIR, - queueDir: DEFAULT_QUEUE_DIR, - spoolDir: DEFAULT_SPOOL_DIR, - receiptDir: DEFAULT_RECEIPT_DIR, - watchdogScript: DEFAULT_WATCHDOG_SCRIPT, - dispatcherScript: DEFAULT_DISPATCHER_SCRIPT, - supervisorScript: DEFAULT_SUPERVISOR_SCRIPT, - senderCommand: null, - senderMode: null, - openclawBin: 'openclaw', - now: null, - compact: false, - writeState: false, - claim: false, - dryRun: false, - help: false, - }; + const args = { ...DEFAULT_ARGS }; for (let i = 0; i < argv.length; i += 1) { const token = argv[i]; @@ -97,105 +74,13 @@ export function printOrchestratorHelp(options = {}) { process.stdout.write(`${formatOrchestratorHelp(options)}\n`); } -export function buildSenderCommand(args) { - if (args.senderCommand) return args.senderCommand; - if (!args.senderMode) return null; - const cmd = [ - JSON.stringify(process.execPath), - JSON.stringify(DEFAULT_SENDER_BINDING_SCRIPT), - '--mode', JSON.stringify(args.senderMode), - '--openclaw-bin', JSON.stringify(args.openclawBin), - '--compact', - ]; - return cmd.join(' '); -} - -export function runNodeScript(scriptPath, scriptArgs) { - return spawnSync(process.execPath, [scriptPath, ...scriptArgs], { - cwd: packageRoot, - encoding: 'utf8', - }); -} - -export function parseJsonOutput(label, result) { - const stdout = result.stdout ?? ''; - try { - return stdout.trim() ? JSON.parse(stdout) : null; - } catch (error) { - throw new Error(`${label} emitted non-JSON stdout: ${error instanceof Error ? error.message : String(error)}`); - } -} - -export function ensureSuccess(label, result) { - if (result.status !== 0) { - throw new Error(`${label} failed with status ${result.status ?? 'null'}: ${(result.stderr ?? '').trim() || '(no stderr)'}`); - } -} - export function runWatchdogAutoNotifyOrchestrator(args) { - const senderCommand = buildSenderCommand(args); - - const watchdogArgs = [ - '--state', path.resolve(args.state), - '--evidence-dir', path.resolve(args.evidenceDir), - '--event-dir', path.resolve(args.eventDir), - '--notification-dir', path.resolve(args.queueDir), - '--compact', - ]; - if (args.writeState) watchdogArgs.push('--write-state'); - if (args.now) watchdogArgs.push('--now', args.now); - const watchdog = runNodeScript(path.resolve(args.watchdogScript), watchdogArgs); - ensureSuccess('watchdog runner', watchdog); - const watchdogPayload = parseJsonOutput('watchdog runner', watchdog); - - const dispatcherArgs = [ - '--queue-dir', path.resolve(args.queueDir), - '--spool-dir', path.resolve(args.spoolDir), - '--compact', - ]; - if (args.claim) dispatcherArgs.push('--claim'); - if (args.now) dispatcherArgs.push('--now', args.now); - const dispatcher = runNodeScript(path.resolve(args.dispatcherScript), dispatcherArgs); - ensureSuccess('dispatcher', dispatcher); - const dispatcherPayload = parseJsonOutput('dispatcher', dispatcher); - - const supervisorArgs = [ - '--queue-dir', path.resolve(args.queueDir), - '--spool-dir', path.resolve(args.spoolDir), - '--receipt-dir', path.resolve(args.receiptDir), - '--dispatcher-script', path.resolve(args.dispatcherScript), - '--compact', - ]; - if (args.dryRun) supervisorArgs.push('--dry-run'); - if (senderCommand) supervisorArgs.push('--sender-command', senderCommand); - if (args.now) supervisorArgs.push('--now', args.now); - const supervisor = runNodeScript(path.resolve(args.supervisorScript), supervisorArgs); - ensureSuccess('bridge supervisor', supervisor); - const supervisorPayload = parseJsonOutput('bridge supervisor', supervisor); - + const payload = runOrchestratorExecution(args, { senderBindingScript: DEFAULT_SENDER_BINDING_SCRIPT }); return { - ok: true, - tool: 'watchdog_auto_notify_orchestrator', - version: 'mvp-v1', - now: args.now ?? null, - executionOrder: [ - 'runner', - 'queue', - 'dispatcher', - 'bridge', - senderCommand ? 'sender' : 'sender_unconfigured', - 'ack_or_blocked_or_pending', - ], + ...payload, orchestration: { script: path.resolve(import.meta.filename), - senderCommandConfigured: Boolean(senderCommand), - senderMode: args.senderMode ?? null, - dryRun: args.dryRun, - }, - result: { - watchdog: watchdogPayload?.result ?? null, - dispatcher: dispatcherPayload?.result ?? null, - supervisor: supervisorPayload?.result ?? null, + ...payload.orchestration, }, }; } @@ -222,4 +107,4 @@ export function main(argv = process.argv.slice(2), options = {}) { } } -export { packageRoot }; +export { buildSenderCommand, packageRoot }; diff --git a/plugins/reporting-governance/src/adapters/orchestrator-execution.mjs b/plugins/reporting-governance/src/adapters/orchestrator-execution.mjs new file mode 100644 index 0000000..6d0a05b --- /dev/null +++ b/plugins/reporting-governance/src/adapters/orchestrator-execution.mjs @@ -0,0 +1,104 @@ +import path from 'node:path'; +import process from 'node:process'; +import { runWatchdogAdapter } from './watchdog.mjs'; +import { runDispatcherAdapter } from './dispatcher.mjs'; +import { runBridgeSupervisorAdapter } from './bridge-supervisor.mjs'; + +export const DEFAULT_OPENCLAW_BIN = 'openclaw'; + +export function createDefaultOrchestratorExecutionArgs({ packageRoot }) { + return { + state: path.join(packageRoot, 'memory', 'watchdog-state.json'), + evidenceDir: path.join(packageRoot, 'state', 'long-task-watchdog'), + eventDir: path.join(packageRoot, 'state', 'long-task-watchdog-events'), + queueDir: path.join(packageRoot, 'state', 'operator-notify-queue'), + spoolDir: path.join(packageRoot, 'state', 'operator-notify-dispatch-spool'), + receiptDir: path.join(packageRoot, 'state', 'operator-notify-bridge-receipts'), + watchdogScript: path.join(packageRoot, 'scripts', 'long_task_watchdog.mjs'), + dispatcherScript: path.join(packageRoot, 'scripts', 'operator_notify_dispatcher.mjs'), + supervisorScript: path.join(packageRoot, 'scripts', 'operator_notify_bridge_supervisor.mjs'), + senderCommand: null, + senderMode: null, + openclawBin: DEFAULT_OPENCLAW_BIN, + now: null, + compact: false, + writeState: false, + claim: false, + dryRun: false, + help: false, + }; +} + +export function buildSenderCommand(args, { senderBindingScript }) { + if (args.senderCommand) return args.senderCommand; + if (!args.senderMode) return null; + const cmd = [ + JSON.stringify(process.execPath), + JSON.stringify(senderBindingScript), + '--mode', JSON.stringify(args.senderMode), + '--openclaw-bin', JSON.stringify(args.openclawBin), + '--compact', + ]; + return cmd.join(' '); +} + +export function runOrchestratorExecution(args, { senderBindingScript } = {}) { + const senderCommand = buildSenderCommand(args, { senderBindingScript }); + + const watchdogPayload = runWatchdogAdapter({ + scriptPath: path.resolve(args.watchdogScript), + state: args.state, + evidenceDir: args.evidenceDir, + eventDir: args.eventDir, + notificationDir: args.queueDir, + now: args.now, + compact: true, + writeState: args.writeState, + }); + + const dispatcherPayload = runDispatcherAdapter({ + scriptPath: path.resolve(args.dispatcherScript), + queueDir: args.queueDir, + spoolDir: args.spoolDir, + now: args.now, + compact: true, + claim: args.claim, + }); + + const supervisorPayload = runBridgeSupervisorAdapter({ + scriptPath: path.resolve(args.supervisorScript), + queueDir: args.queueDir, + spoolDir: args.spoolDir, + receiptDir: args.receiptDir, + dispatcherScript: path.resolve(args.dispatcherScript), + senderCommand, + now: args.now, + compact: true, + dryRun: args.dryRun, + }); + + return { + ok: true, + tool: 'watchdog_auto_notify_orchestrator', + version: 'mvp-v1', + now: args.now ?? null, + executionOrder: [ + 'runner', + 'queue', + 'dispatcher', + 'bridge', + senderCommand ? 'sender' : 'sender_unconfigured', + 'ack_or_blocked_or_pending', + ], + orchestration: { + senderCommandConfigured: Boolean(senderCommand), + senderMode: args.senderMode ?? null, + dryRun: args.dryRun, + }, + result: { + watchdog: watchdogPayload?.result ?? null, + dispatcher: dispatcherPayload?.result ?? null, + supervisor: supervisorPayload?.result ?? null, + }, + }; +} diff --git a/plugins/reporting-governance/src/adapters/orchestrator.mjs b/plugins/reporting-governance/src/adapters/orchestrator.mjs index 496ea1c..9b1f259 100644 --- a/plugins/reporting-governance/src/adapters/orchestrator.mjs +++ b/plugins/reporting-governance/src/adapters/orchestrator.mjs @@ -1,7 +1,7 @@ import path from 'node:path'; -import { ensureSuccess, parseJsonStdout, runNodeScript } from './_script-runner.mjs'; import { createRuntimeBinding, resolveScriptPath } from './runtime-binding.mjs'; import { loadDeploymentProfileArtifact, createDeploymentBindingContract, assertUseTimePathWithinRepoRoot } from '../storage/profile-artifact.mjs'; +import { runOrchestratorExecution } from './orchestrator-execution.mjs'; export function runOrchestratorAdapter({ scriptPath = null, @@ -42,7 +42,7 @@ export function runOrchestratorAdapter({ cwd: repoRootOverride, scripts: resolvedDeploymentBinding?.scripts, }); - const resolvedScriptPath = path.resolve(scriptPath ?? resolvedDeploymentBinding?.entrypoint ?? resolveScriptPath('orchestrator', { runtimeBinding: binding })); + const resolvedOrchestratorScriptPath = path.resolve(scriptPath ?? resolvedDeploymentBinding?.entrypoint ?? resolveScriptPath('orchestrator', { runtimeBinding: binding })); const resolvedWatchdogScript = path.resolve(watchdogScript ?? resolveScriptPath('watchdog', { runtimeBinding: binding })); const resolvedDispatcherScript = path.resolve(dispatcherScript ?? resolveScriptPath('dispatcher', { runtimeBinding: binding })); const resolvedSupervisorScript = path.resolve(supervisorScript ?? resolveScriptPath('bridgeSupervisor', { runtimeBinding: binding })); @@ -59,26 +59,33 @@ export function runOrchestratorAdapter({ ? path.resolve(receiptDir) : null; - const args = []; - if (state) args.push('--state', path.resolve(state)); - if (evidenceDir) args.push('--evidence-dir', path.resolve(evidenceDir)); - if (eventDir) args.push('--event-dir', path.resolve(eventDir)); - if (resolvedQueueDir) args.push('--queue-dir', resolvedQueueDir); - if (resolvedSpoolDir) args.push('--spool-dir', resolvedSpoolDir); - if (resolvedReceiptDir) args.push('--receipt-dir', resolvedReceiptDir); - if (resolvedWatchdogScript) args.push('--watchdog-script', resolvedWatchdogScript); - if (resolvedDispatcherScript) args.push('--dispatcher-script', resolvedDispatcherScript); - if (resolvedSupervisorScript) args.push('--supervisor-script', resolvedSupervisorScript); - if (senderCommand) args.push('--sender-command', senderCommand); - if (senderMode) args.push('--sender-mode', senderMode); - if (openclawBin) args.push('--openclaw-bin', openclawBin); - if (now) args.push('--now', now); - if (writeState) args.push('--write-state'); - if (claim) args.push('--claim'); - if (dryRun) args.push('--dry-run'); - if (compact) args.push('--compact'); + const payload = runOrchestratorExecution({ + state, + evidenceDir, + eventDir, + queueDir: resolvedQueueDir, + spoolDir: resolvedSpoolDir, + receiptDir: resolvedReceiptDir, + watchdogScript: resolvedWatchdogScript, + dispatcherScript: resolvedDispatcherScript, + supervisorScript: resolvedSupervisorScript, + senderCommand, + senderMode, + openclawBin, + now, + compact, + writeState, + claim, + dryRun, + }, { + senderBindingScript: resolveScriptPath('senderBinding', { runtimeBinding: binding }), + }); - const result = runNodeScript(resolvedScriptPath, args, { runtimeBinding: binding }); - ensureSuccess('orchestrator adapter', result); - return parseJsonStdout('orchestrator adapter', result); + return { + ...payload, + orchestration: { + script: resolvedOrchestratorScriptPath, + ...payload.orchestration, + }, + }; } diff --git a/plugins/reporting-governance/test/orchestrator-execution.test.mjs b/plugins/reporting-governance/test/orchestrator-execution.test.mjs new file mode 100644 index 0000000..abe50ab --- /dev/null +++ b/plugins/reporting-governance/test/orchestrator-execution.test.mjs @@ -0,0 +1,75 @@ +import test from 'node:test'; +import assert from 'node:assert/strict'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; + +import { runOrchestratorExecution } from '../src/adapters/orchestrator-execution.mjs'; + +const packageRoot = path.resolve(import.meta.dirname, '..'); + +function createFixtureRoot() { + return fs.mkdtempSync(path.join(os.tmpdir(), 'reporting-governance-orchestrator-core-')); +} + +function mkdirs(root, names) { + for (const name of names) fs.mkdirSync(path.join(root, name), { recursive: true }); +} + +function writeState(root) { + const statePath = path.join(root, 'watchdog-state.json'); + fs.writeFileSync(statePath, `${JSON.stringify({ + version: 1, + watchdogs: [{ + id: 'reporting-governance-plugin-watchdog', + task: 'reporting-governance plugin spec development', + status: 'active', + ownerSessionKey: 'agent:coder:main', + reportChannel: 'telegram', + reportTarget: '864811879', + intervalMinutes: 10, + lastMilestoneAt: '2026-05-07T08:00:00.000Z', + lastAlertAt: null, + }], + }, null, 2)}\n`, 'utf8'); + return statePath; +} + +test('execution core runs adapter chain directly without orchestrator script shell hop', () => { + const root = createFixtureRoot(); + try { + mkdirs(root, ['evidence', 'events', 'queue', 'spool', 'receipts']); + const statePath = writeState(root); + + const payload = runOrchestratorExecution({ + state: statePath, + evidenceDir: path.join(root, 'evidence'), + eventDir: path.join(root, 'events'), + queueDir: path.join(root, 'queue'), + spoolDir: path.join(root, 'spool'), + receiptDir: path.join(root, 'receipts'), + watchdogScript: path.join(packageRoot, 'scripts', 'long_task_watchdog.mjs'), + dispatcherScript: path.join(packageRoot, 'scripts', 'operator_notify_dispatcher.mjs'), + supervisorScript: path.join(packageRoot, 'scripts', 'operator_notify_bridge_supervisor.mjs'), + senderCommand: `node -e "process.stdout.write(JSON.stringify({state:'sent'}))"`, + now: '2026-05-07T08:20:00.000Z', + writeState: true, + }, { + senderBindingScript: path.join(packageRoot, 'scripts', 'operator_notify_sender_binding.mjs'), + }); + + assert.equal(payload.ok, true); + assert.deepEqual(payload.executionOrder, ['runner', 'queue', 'dispatcher', 'bridge', 'sender', 'ack_or_blocked_or_pending']); + assert.equal(payload.orchestration.senderCommandConfigured, true); + assert.equal(payload.result.watchdog.notificationCount, 1); + assert.equal(payload.result.dispatcher.dispatchedCount, 1); + assert.equal(payload.result.supervisor.ackedCount, 1); + + const queueFiles = fs.readdirSync(path.join(root, 'queue')).filter((name) => name.endsWith('.json')); + assert.equal(queueFiles.length, 1); + const queueItem = JSON.parse(fs.readFileSync(path.join(root, 'queue', queueFiles[0]), 'utf8')); + assert.equal(queueItem.status, 'acked'); + } finally { + fs.rmSync(root, { recursive: true, force: true }); + } +});