refactor: extract orchestrator execution core

This commit is contained in:
Eve
2026-05-08 16:30:32 +08:00
parent 72397df976
commit 55fe51483b
6 changed files with 222 additions and 150 deletions

View File

@@ -27,7 +27,7 @@
"src/" "src/"
], ],
"scripts": { "scripts": {
"test": "node --test test/package-structure.test.mjs test/policy-evaluator.test.mjs test/compatibility-preflight.test.mjs test/profile-artifact.test.mjs test/profile-generator.test.mjs test/decision-runner.test.mjs test/decision-store.test.mjs test/decision-store-runtime.integration.test.mjs test/governance-contract.integration.test.mjs test/watchdog-chain.integration.test.mjs test/runtime-integrated.integration.test.mjs test/exports-boundary.integration.test.mjs test/packed-consumer-install.smoke.test.mjs", "test": "node --test test/package-structure.test.mjs test/policy-evaluator.test.mjs test/compatibility-preflight.test.mjs test/profile-artifact.test.mjs test/profile-generator.test.mjs test/decision-runner.test.mjs test/decision-store.test.mjs test/decision-store-runtime.integration.test.mjs test/governance-contract.integration.test.mjs test/watchdog-chain.integration.test.mjs test/orchestrator-execution.test.mjs test/runtime-integrated.integration.test.mjs test/exports-boundary.integration.test.mjs test/packed-consumer-install.smoke.test.mjs",
"smoke": "node ./scripts/package-smoke.mjs --compact" "smoke": "node ./scripts/package-smoke.mjs --compact"
}, },
"dependencies": { "dependencies": {

View File

@@ -3,6 +3,7 @@ export { runDispatcherAdapter } from './dispatcher.mjs';
export { runBridgeSupervisorAdapter } from './bridge-supervisor.mjs'; export { runBridgeSupervisorAdapter } from './bridge-supervisor.mjs';
export { runSenderBindingAdapter } from './sender-binding.mjs'; export { runSenderBindingAdapter } from './sender-binding.mjs';
export { runOrchestratorAdapter } from './orchestrator.mjs'; export { runOrchestratorAdapter } from './orchestrator.mjs';
export { createDefaultOrchestratorExecutionArgs, buildSenderCommand, runOrchestratorExecution } from './orchestrator-execution.mjs';
export { parseOrchestratorCliArgs, formatOrchestratorHelp, runWatchdogAutoNotifyOrchestrator, runOrchestratorCli } from './orchestrator-cli.mjs'; export { parseOrchestratorCliArgs, formatOrchestratorHelp, runWatchdogAutoNotifyOrchestrator, runOrchestratorCli } from './orchestrator-cli.mjs';
export { createRuntimeBinding } from './runtime-binding.mjs'; export { createRuntimeBinding } from './runtime-binding.mjs';

View File

@@ -1,41 +1,18 @@
import path from 'node:path'; import path from 'node:path';
import process from 'node:process'; import process from 'node:process';
import { fileURLToPath } from 'node:url'; import { fileURLToPath } from 'node:url';
import { spawnSync } from 'node:child_process'; import {
buildSenderCommand,
createDefaultOrchestratorExecutionArgs,
runOrchestratorExecution,
} from './orchestrator-execution.mjs';
const packageRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), '..', '..'); const packageRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), '..', '..');
const DEFAULT_STATE_PATH = path.join(packageRoot, 'memory', 'watchdog-state.json');
const DEFAULT_EVIDENCE_DIR = path.join(packageRoot, 'state', 'long-task-watchdog');
const DEFAULT_EVENT_DIR = path.join(packageRoot, 'state', 'long-task-watchdog-events');
const DEFAULT_QUEUE_DIR = path.join(packageRoot, 'state', 'operator-notify-queue');
const DEFAULT_SPOOL_DIR = path.join(packageRoot, 'state', 'operator-notify-dispatch-spool');
const DEFAULT_RECEIPT_DIR = path.join(packageRoot, 'state', 'operator-notify-bridge-receipts');
const DEFAULT_WATCHDOG_SCRIPT = path.join(packageRoot, 'scripts', 'long_task_watchdog.mjs');
const DEFAULT_DISPATCHER_SCRIPT = path.join(packageRoot, 'scripts', 'operator_notify_dispatcher.mjs');
const DEFAULT_SUPERVISOR_SCRIPT = path.join(packageRoot, 'scripts', 'operator_notify_bridge_supervisor.mjs');
const DEFAULT_SENDER_BINDING_SCRIPT = path.join(packageRoot, 'scripts', 'operator_notify_sender_binding.mjs'); const DEFAULT_SENDER_BINDING_SCRIPT = path.join(packageRoot, 'scripts', 'operator_notify_sender_binding.mjs');
const DEFAULT_ARGS = createDefaultOrchestratorExecutionArgs({ packageRoot });
export function parseOrchestratorCliArgs(argv) { export function parseOrchestratorCliArgs(argv) {
const args = { const args = { ...DEFAULT_ARGS };
state: DEFAULT_STATE_PATH,
evidenceDir: DEFAULT_EVIDENCE_DIR,
eventDir: DEFAULT_EVENT_DIR,
queueDir: DEFAULT_QUEUE_DIR,
spoolDir: DEFAULT_SPOOL_DIR,
receiptDir: DEFAULT_RECEIPT_DIR,
watchdogScript: DEFAULT_WATCHDOG_SCRIPT,
dispatcherScript: DEFAULT_DISPATCHER_SCRIPT,
supervisorScript: DEFAULT_SUPERVISOR_SCRIPT,
senderCommand: null,
senderMode: null,
openclawBin: 'openclaw',
now: null,
compact: false,
writeState: false,
claim: false,
dryRun: false,
help: false,
};
for (let i = 0; i < argv.length; i += 1) { for (let i = 0; i < argv.length; i += 1) {
const token = argv[i]; const token = argv[i];
@@ -97,105 +74,13 @@ export function printOrchestratorHelp(options = {}) {
process.stdout.write(`${formatOrchestratorHelp(options)}\n`); process.stdout.write(`${formatOrchestratorHelp(options)}\n`);
} }
export function buildSenderCommand(args) {
if (args.senderCommand) return args.senderCommand;
if (!args.senderMode) return null;
const cmd = [
JSON.stringify(process.execPath),
JSON.stringify(DEFAULT_SENDER_BINDING_SCRIPT),
'--mode', JSON.stringify(args.senderMode),
'--openclaw-bin', JSON.stringify(args.openclawBin),
'--compact',
];
return cmd.join(' ');
}
export function runNodeScript(scriptPath, scriptArgs) {
return spawnSync(process.execPath, [scriptPath, ...scriptArgs], {
cwd: packageRoot,
encoding: 'utf8',
});
}
export function parseJsonOutput(label, result) {
const stdout = result.stdout ?? '';
try {
return stdout.trim() ? JSON.parse(stdout) : null;
} catch (error) {
throw new Error(`${label} emitted non-JSON stdout: ${error instanceof Error ? error.message : String(error)}`);
}
}
export function ensureSuccess(label, result) {
if (result.status !== 0) {
throw new Error(`${label} failed with status ${result.status ?? 'null'}: ${(result.stderr ?? '').trim() || '(no stderr)'}`);
}
}
export function runWatchdogAutoNotifyOrchestrator(args) { export function runWatchdogAutoNotifyOrchestrator(args) {
const senderCommand = buildSenderCommand(args); const payload = runOrchestratorExecution(args, { senderBindingScript: DEFAULT_SENDER_BINDING_SCRIPT });
const watchdogArgs = [
'--state', path.resolve(args.state),
'--evidence-dir', path.resolve(args.evidenceDir),
'--event-dir', path.resolve(args.eventDir),
'--notification-dir', path.resolve(args.queueDir),
'--compact',
];
if (args.writeState) watchdogArgs.push('--write-state');
if (args.now) watchdogArgs.push('--now', args.now);
const watchdog = runNodeScript(path.resolve(args.watchdogScript), watchdogArgs);
ensureSuccess('watchdog runner', watchdog);
const watchdogPayload = parseJsonOutput('watchdog runner', watchdog);
const dispatcherArgs = [
'--queue-dir', path.resolve(args.queueDir),
'--spool-dir', path.resolve(args.spoolDir),
'--compact',
];
if (args.claim) dispatcherArgs.push('--claim');
if (args.now) dispatcherArgs.push('--now', args.now);
const dispatcher = runNodeScript(path.resolve(args.dispatcherScript), dispatcherArgs);
ensureSuccess('dispatcher', dispatcher);
const dispatcherPayload = parseJsonOutput('dispatcher', dispatcher);
const supervisorArgs = [
'--queue-dir', path.resolve(args.queueDir),
'--spool-dir', path.resolve(args.spoolDir),
'--receipt-dir', path.resolve(args.receiptDir),
'--dispatcher-script', path.resolve(args.dispatcherScript),
'--compact',
];
if (args.dryRun) supervisorArgs.push('--dry-run');
if (senderCommand) supervisorArgs.push('--sender-command', senderCommand);
if (args.now) supervisorArgs.push('--now', args.now);
const supervisor = runNodeScript(path.resolve(args.supervisorScript), supervisorArgs);
ensureSuccess('bridge supervisor', supervisor);
const supervisorPayload = parseJsonOutput('bridge supervisor', supervisor);
return { return {
ok: true, ...payload,
tool: 'watchdog_auto_notify_orchestrator',
version: 'mvp-v1',
now: args.now ?? null,
executionOrder: [
'runner',
'queue',
'dispatcher',
'bridge',
senderCommand ? 'sender' : 'sender_unconfigured',
'ack_or_blocked_or_pending',
],
orchestration: { orchestration: {
script: path.resolve(import.meta.filename), script: path.resolve(import.meta.filename),
senderCommandConfigured: Boolean(senderCommand), ...payload.orchestration,
senderMode: args.senderMode ?? null,
dryRun: args.dryRun,
},
result: {
watchdog: watchdogPayload?.result ?? null,
dispatcher: dispatcherPayload?.result ?? null,
supervisor: supervisorPayload?.result ?? null,
}, },
}; };
} }
@@ -222,4 +107,4 @@ export function main(argv = process.argv.slice(2), options = {}) {
} }
} }
export { packageRoot }; export { buildSenderCommand, packageRoot };

View File

@@ -0,0 +1,104 @@
import path from 'node:path';
import process from 'node:process';
import { runWatchdogAdapter } from './watchdog.mjs';
import { runDispatcherAdapter } from './dispatcher.mjs';
import { runBridgeSupervisorAdapter } from './bridge-supervisor.mjs';
export const DEFAULT_OPENCLAW_BIN = 'openclaw';
export function createDefaultOrchestratorExecutionArgs({ packageRoot }) {
return {
state: path.join(packageRoot, 'memory', 'watchdog-state.json'),
evidenceDir: path.join(packageRoot, 'state', 'long-task-watchdog'),
eventDir: path.join(packageRoot, 'state', 'long-task-watchdog-events'),
queueDir: path.join(packageRoot, 'state', 'operator-notify-queue'),
spoolDir: path.join(packageRoot, 'state', 'operator-notify-dispatch-spool'),
receiptDir: path.join(packageRoot, 'state', 'operator-notify-bridge-receipts'),
watchdogScript: path.join(packageRoot, 'scripts', 'long_task_watchdog.mjs'),
dispatcherScript: path.join(packageRoot, 'scripts', 'operator_notify_dispatcher.mjs'),
supervisorScript: path.join(packageRoot, 'scripts', 'operator_notify_bridge_supervisor.mjs'),
senderCommand: null,
senderMode: null,
openclawBin: DEFAULT_OPENCLAW_BIN,
now: null,
compact: false,
writeState: false,
claim: false,
dryRun: false,
help: false,
};
}
export function buildSenderCommand(args, { senderBindingScript }) {
if (args.senderCommand) return args.senderCommand;
if (!args.senderMode) return null;
const cmd = [
JSON.stringify(process.execPath),
JSON.stringify(senderBindingScript),
'--mode', JSON.stringify(args.senderMode),
'--openclaw-bin', JSON.stringify(args.openclawBin),
'--compact',
];
return cmd.join(' ');
}
export function runOrchestratorExecution(args, { senderBindingScript } = {}) {
const senderCommand = buildSenderCommand(args, { senderBindingScript });
const watchdogPayload = runWatchdogAdapter({
scriptPath: path.resolve(args.watchdogScript),
state: args.state,
evidenceDir: args.evidenceDir,
eventDir: args.eventDir,
notificationDir: args.queueDir,
now: args.now,
compact: true,
writeState: args.writeState,
});
const dispatcherPayload = runDispatcherAdapter({
scriptPath: path.resolve(args.dispatcherScript),
queueDir: args.queueDir,
spoolDir: args.spoolDir,
now: args.now,
compact: true,
claim: args.claim,
});
const supervisorPayload = runBridgeSupervisorAdapter({
scriptPath: path.resolve(args.supervisorScript),
queueDir: args.queueDir,
spoolDir: args.spoolDir,
receiptDir: args.receiptDir,
dispatcherScript: path.resolve(args.dispatcherScript),
senderCommand,
now: args.now,
compact: true,
dryRun: args.dryRun,
});
return {
ok: true,
tool: 'watchdog_auto_notify_orchestrator',
version: 'mvp-v1',
now: args.now ?? null,
executionOrder: [
'runner',
'queue',
'dispatcher',
'bridge',
senderCommand ? 'sender' : 'sender_unconfigured',
'ack_or_blocked_or_pending',
],
orchestration: {
senderCommandConfigured: Boolean(senderCommand),
senderMode: args.senderMode ?? null,
dryRun: args.dryRun,
},
result: {
watchdog: watchdogPayload?.result ?? null,
dispatcher: dispatcherPayload?.result ?? null,
supervisor: supervisorPayload?.result ?? null,
},
};
}

View File

@@ -1,7 +1,7 @@
import path from 'node:path'; import path from 'node:path';
import { ensureSuccess, parseJsonStdout, runNodeScript } from './_script-runner.mjs';
import { createRuntimeBinding, resolveScriptPath } from './runtime-binding.mjs'; import { createRuntimeBinding, resolveScriptPath } from './runtime-binding.mjs';
import { loadDeploymentProfileArtifact, createDeploymentBindingContract, assertUseTimePathWithinRepoRoot } from '../storage/profile-artifact.mjs'; import { loadDeploymentProfileArtifact, createDeploymentBindingContract, assertUseTimePathWithinRepoRoot } from '../storage/profile-artifact.mjs';
import { runOrchestratorExecution } from './orchestrator-execution.mjs';
export function runOrchestratorAdapter({ export function runOrchestratorAdapter({
scriptPath = null, scriptPath = null,
@@ -42,7 +42,7 @@ export function runOrchestratorAdapter({
cwd: repoRootOverride, cwd: repoRootOverride,
scripts: resolvedDeploymentBinding?.scripts, scripts: resolvedDeploymentBinding?.scripts,
}); });
const resolvedScriptPath = path.resolve(scriptPath ?? resolvedDeploymentBinding?.entrypoint ?? resolveScriptPath('orchestrator', { runtimeBinding: binding })); const resolvedOrchestratorScriptPath = path.resolve(scriptPath ?? resolvedDeploymentBinding?.entrypoint ?? resolveScriptPath('orchestrator', { runtimeBinding: binding }));
const resolvedWatchdogScript = path.resolve(watchdogScript ?? resolveScriptPath('watchdog', { runtimeBinding: binding })); const resolvedWatchdogScript = path.resolve(watchdogScript ?? resolveScriptPath('watchdog', { runtimeBinding: binding }));
const resolvedDispatcherScript = path.resolve(dispatcherScript ?? resolveScriptPath('dispatcher', { runtimeBinding: binding })); const resolvedDispatcherScript = path.resolve(dispatcherScript ?? resolveScriptPath('dispatcher', { runtimeBinding: binding }));
const resolvedSupervisorScript = path.resolve(supervisorScript ?? resolveScriptPath('bridgeSupervisor', { runtimeBinding: binding })); const resolvedSupervisorScript = path.resolve(supervisorScript ?? resolveScriptPath('bridgeSupervisor', { runtimeBinding: binding }));
@@ -59,26 +59,33 @@ export function runOrchestratorAdapter({
? path.resolve(receiptDir) ? path.resolve(receiptDir)
: null; : null;
const args = []; const payload = runOrchestratorExecution({
if (state) args.push('--state', path.resolve(state)); state,
if (evidenceDir) args.push('--evidence-dir', path.resolve(evidenceDir)); evidenceDir,
if (eventDir) args.push('--event-dir', path.resolve(eventDir)); eventDir,
if (resolvedQueueDir) args.push('--queue-dir', resolvedQueueDir); queueDir: resolvedQueueDir,
if (resolvedSpoolDir) args.push('--spool-dir', resolvedSpoolDir); spoolDir: resolvedSpoolDir,
if (resolvedReceiptDir) args.push('--receipt-dir', resolvedReceiptDir); receiptDir: resolvedReceiptDir,
if (resolvedWatchdogScript) args.push('--watchdog-script', resolvedWatchdogScript); watchdogScript: resolvedWatchdogScript,
if (resolvedDispatcherScript) args.push('--dispatcher-script', resolvedDispatcherScript); dispatcherScript: resolvedDispatcherScript,
if (resolvedSupervisorScript) args.push('--supervisor-script', resolvedSupervisorScript); supervisorScript: resolvedSupervisorScript,
if (senderCommand) args.push('--sender-command', senderCommand); senderCommand,
if (senderMode) args.push('--sender-mode', senderMode); senderMode,
if (openclawBin) args.push('--openclaw-bin', openclawBin); openclawBin,
if (now) args.push('--now', now); now,
if (writeState) args.push('--write-state'); compact,
if (claim) args.push('--claim'); writeState,
if (dryRun) args.push('--dry-run'); claim,
if (compact) args.push('--compact'); dryRun,
}, {
senderBindingScript: resolveScriptPath('senderBinding', { runtimeBinding: binding }),
});
const result = runNodeScript(resolvedScriptPath, args, { runtimeBinding: binding }); return {
ensureSuccess('orchestrator adapter', result); ...payload,
return parseJsonStdout('orchestrator adapter', result); orchestration: {
script: resolvedOrchestratorScriptPath,
...payload.orchestration,
},
};
} }

View File

@@ -0,0 +1,75 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { runOrchestratorExecution } from '../src/adapters/orchestrator-execution.mjs';
const packageRoot = path.resolve(import.meta.dirname, '..');
function createFixtureRoot() {
return fs.mkdtempSync(path.join(os.tmpdir(), 'reporting-governance-orchestrator-core-'));
}
function mkdirs(root, names) {
for (const name of names) fs.mkdirSync(path.join(root, name), { recursive: true });
}
function writeState(root) {
const statePath = path.join(root, 'watchdog-state.json');
fs.writeFileSync(statePath, `${JSON.stringify({
version: 1,
watchdogs: [{
id: 'reporting-governance-plugin-watchdog',
task: 'reporting-governance plugin spec development',
status: 'active',
ownerSessionKey: 'agent:coder:main',
reportChannel: 'telegram',
reportTarget: '864811879',
intervalMinutes: 10,
lastMilestoneAt: '2026-05-07T08:00:00.000Z',
lastAlertAt: null,
}],
}, null, 2)}\n`, 'utf8');
return statePath;
}
test('execution core runs adapter chain directly without orchestrator script shell hop', () => {
const root = createFixtureRoot();
try {
mkdirs(root, ['evidence', 'events', 'queue', 'spool', 'receipts']);
const statePath = writeState(root);
const payload = runOrchestratorExecution({
state: statePath,
evidenceDir: path.join(root, 'evidence'),
eventDir: path.join(root, 'events'),
queueDir: path.join(root, 'queue'),
spoolDir: path.join(root, 'spool'),
receiptDir: path.join(root, 'receipts'),
watchdogScript: path.join(packageRoot, 'scripts', 'long_task_watchdog.mjs'),
dispatcherScript: path.join(packageRoot, 'scripts', 'operator_notify_dispatcher.mjs'),
supervisorScript: path.join(packageRoot, 'scripts', 'operator_notify_bridge_supervisor.mjs'),
senderCommand: `node -e "process.stdout.write(JSON.stringify({state:'sent'}))"`,
now: '2026-05-07T08:20:00.000Z',
writeState: true,
}, {
senderBindingScript: path.join(packageRoot, 'scripts', 'operator_notify_sender_binding.mjs'),
});
assert.equal(payload.ok, true);
assert.deepEqual(payload.executionOrder, ['runner', 'queue', 'dispatcher', 'bridge', 'sender', 'ack_or_blocked_or_pending']);
assert.equal(payload.orchestration.senderCommandConfigured, true);
assert.equal(payload.result.watchdog.notificationCount, 1);
assert.equal(payload.result.dispatcher.dispatchedCount, 1);
assert.equal(payload.result.supervisor.ackedCount, 1);
const queueFiles = fs.readdirSync(path.join(root, 'queue')).filter((name) => name.endsWith('.json'));
assert.equal(queueFiles.length, 1);
const queueItem = JSON.parse(fs.readFileSync(path.join(root, 'queue', queueFiles[0]), 'utf8'));
assert.equal(queueItem.status, 'acked');
} finally {
fs.rmSync(root, { recursive: true, force: true });
}
});