Skip to main content

GraphProcessor

Description

GraphProcessor is the low-level execution engine for a Rivet graph. It walks the graph, resolves node inputs, executes node implementations, runs subgraphs, tracks events, handles control flow, and returns graph outputs as DataValue records.

Most integrations should use the higher-level helpers from @valerypopoff/rivet2-node, such as runGraph, runGraphInFile, or createProcessor. Use GraphProcessor directly when you are building a runtime, debugger, executor, test harness, or custom embedding that needs direct access to execution events and lifecycle controls.

Constructor

new GraphProcessor(
project: Project,
graphId: GraphId | undefined,
registry: NodeRegistration<any, any>,
includeTrace?: boolean,
options?: {
concurrency?: GraphProcessorConcurrency;
},
);

project

Type: Project

The Rivet project that contains the graph to run.

graphId

Type: GraphId | undefined

The graph ID to run. If undefined, the processor uses project.metadata.mainGraphId. The constructor throws if it cannot resolve a graph.

registry

Type: NodeRegistration<any, any>

The node registry used to instantiate built-in and plugin node implementations.

includeTrace

Type: boolean | undefined

Controls whether trace events are emitted. The current default is trace-enabled unless explicitly disabled.

options.concurrency

Type: GraphProcessorConcurrency

Controls processor-level concurrency.

export type GraphProcessorConcurrency = {
nodeConcurrency?: number;
splitRunConcurrency?: number;
};

nodeConcurrency defaults to 8. splitRunConcurrency defaults to 4. Invalid, non-finite, or values below 1 fall back to the default. Node-level Max concurrent runs can override split-run concurrency for a specific many-runs node.

Running a Graph

const outputs = await processor.processGraph(context, inputs, contextValues);

processGraph(context, inputs?, contextValues?)

processGraph(
context: ProcessContext,
inputs?: Record<string, DataValue>,
contextValues?: Record<string, DataValue>,
): Promise<GraphOutputs>

Runs the selected graph and resolves with graph outputs.

inputs are values for Graph Input nodes in the main graph. contextValues are values available to Context nodes throughout the graph and all subgraphs. Context nodes coerce their resolved runtime value or fallback default to the node's configured data type before emitting an output.

processGraph throws if the processor is already running. A GraphProcessor instance is intended for one active run at a time.

When the run succeeds, GraphProcessor adds a cost output if the graph did not already produce one.

ProcessContext

ProcessContext provides host/runtime services to nodes:

export type ProcessContext = {
settings: Settings;
nativeApi?: NativeApi;
datasetProvider?: DatasetProvider;
mcpProvider?: MCPProvider;
audioProvider?: AudioProvider;
tokenizer: Tokenizer;
codeRunner?: CodeRunner;
projectReferenceLoader?: ProjectReferenceLoader;
projectPath?: string;
editorExecutionCache?: Map<string, unknown>;
getChatNodeEndpoint?: (
configuredEndpoint: string,
configuredModel: string,
) => ChatNodeEndpointInfo | Promise<ChatNodeEndpointInfo>;
};

If the project has references, projectReferenceLoader is required. projectPath is used by loaders that resolve references or file-relative behavior.

Outputs and Inputs

export type GraphOutputs = Record<string, DataValue>;
export type GraphInputs = Record<string, DataValue>;
export type Inputs = Record<PortId, DataValue | undefined>;
export type Outputs = Record<PortId, DataValue | undefined>;

GraphOutputs is keyed by Graph Output ID. Node-level Inputs and Outputs are keyed by port ID.

Events

GraphProcessor exposes Emittery methods:

processor.on(eventName, listener);
processor.off(eventName, listener);
processor.once(eventName);
processor.onAny(listener);
processor.offAny(listener);

The main event map is ProcessEvents:

EventWhen it fires
startRoot graph processing starts.
graphStartA graph or subgraph starts.
graphFinishA graph or subgraph finishes.
graphErrorA graph or subgraph fails.
graphAbortA graph is aborted.
nodeStartA node starts with resolved inputs.
nodeFinishA node finishes with outputs.
nodeErrorA node errors.
nodeExcludedA node is skipped due to disabled state, conditional state, control-flow exclusion, or an unconnected required input.
partialOutputA node emits partial output while still running.
nodeOutputsClearedPreviously displayed outputs for a node should be cleared.
userInputA User Input node is waiting for user input.
errorRoot graph execution fails.
doneRoot graph execution completes successfully.
abortRoot graph execution is aborted.
finishRoot graph processing has finished, successful or not.
traceA trace message is emitted when trace is enabled.
pauseThe processor is paused.
resumeThe processor is resumed.
globalSetA graph global value is set.
newAbortControllerA node-level AbortController is created.
userEvent:${name}A custom user event is raised.
globalSet:${id}A specific graph global value is set.

Most event payloads include execution metadata:

export type GraphExecutionMetadata = {
rootRunId: RootRunId;
graphRunId: GraphRunId;
graphId: GraphId;
parentGraphRunId?: GraphRunId;
executor?: {
nodeId: NodeId;
parentGraphId: GraphId;
processId: ProcessId;
splitIndex?: number;
};
};

This identifies the root run, current graph invocation, parent subgraph invocation, and the Subgraph node that invoked the current graph when relevant.

Async Event Stream

for await (const event of processor.events()) {
console.log(event.type, event);
}

events() returns an async generator of ProcessEvent objects. Each object has a type field plus the event payload. The generator ends after the root finish event.

Lifecycle Controls

abort(successful?, error?)

await processor.abort(successful, error);

Aborts the current run. If successful is true, the graph is treated as intentionally stopped rather than failed. If error is provided, it is used as the abort error.

pause()

Pauses graph processing before the next node execution.

resume()

Resumes a paused processor.

isRunning

Read-only getter that reports whether the processor is currently running.

setSlowMode(slowMode)

Sets the public slowMode flag. This is used by UI/debugging flows that need slower execution visualization.

User Input

When a User Input node requests values, GraphProcessor emits a userInput event. Respond either by calling the event payload's callback, or by calling:

processor.userInput(nodeId, {
type: 'string[]',
value: ['answer one', 'answer two'],
});

The call is also forwarded to active subprocessors so nested graphs can receive input.

User Events

processor.onUserEvent('approved', (value) => {
console.log(value);
});

processor.offUserEvent(listener);

onUserEvent listens for userEvent:${name} events. Nodes can raise user events through their process context, and external code can raise them directly:

processor.raiseEvent('approved', { type: 'boolean', value: true });

raiseEvent is propagated through subprocessors.

External Functions

processor.setExternalFunction('lookupCustomer', async (context, customerId) => {
return {
type: 'object',
value: await lookupCustomer(customerId),
};
});

External functions are available to External Call nodes by name. The function receives an ExternalFunctionProcessContext and any graph-provided arguments, and must return a Promise<DataValue & { cost?: number }> or throw.

GraphProcessor also registers a default echo external function.

Run-To and Preloaded Partial Execution

GraphProcessor exposes runToNodeIds for targeted execution:

processor.runToNodeIds = [nodeId];

runToNodeIds restricts execution to the dependencies needed to reach the selected node or nodes.

Editor "Run from here" is built outside the core processor by calculating explicit runToNodeIds plus preloaded boundary inputs. Programmatic callers should use that same explicit shape instead of relying on runFromNodeId.

Preloading Node Data

processor.preloadNodeData(nodeId, {
output: { type: 'string', value: 'already computed' },
});

preloadNodeData marks a node as already visited and stores its outputs. Every preloaded output must be a valid DataValue.

This is mainly used by editor partial reruns. Do not preload a node that should execute in the upcoming run; preload only the already-computed boundary inputs that the targeted run should reuse. The Rivet editor also preserves output snapshots for nodes outside the rerun slice so their previous values remain visible, but that preservation is UI state and does not change GraphProcessor results.

Dependencies

const dependencies = processor.getDependencyNodesDeep(nodeId);

Returns all node IDs that the given node depends on. This method preprocesses the graph if needed.

Recording Replay

const outputs = await processor.replayRecording(recorder);

Replays a recorded execution through processor events and returns graph outputs. The recordingPlaybackChatLatency property controls the delay between replayed chat/node-finish events. The default is 1000 milliseconds.

Subprocessors

Subgraph nodes create child GraphProcessor instances internally. Subprocessors share execution cache, external functions, globals, context values, pause/resume state, and execution lineage with the root processor.

Call getRootProcessor() from processor-aware code when you need the top-level processor for a nested run.

Advanced Properties

PropertyPurpose
idGenerated processor instance ID.
executorOptional runtime label: 'nodejs' or 'browser'.
runToNodeIdsOptional target nodes for run-to-node execution.
runFromNodeIdLegacy editor field. Current scheduling uses explicit runToNodeIds plus preloadNodeData(...) instead.
recordingPlaybackChatLatencyReplay delay in milliseconds. Default 1000.
warnOnInvalidGraphEnables graph preprocessing warnings for invalid graph structures.
slowModePublic flag for slow/debug visualization. Prefer setSlowMode(...).

Direct Use Example

import { GraphProcessor, globalRivetNodeRegistry, type DataValue } from '@valerypopoff/rivet2-core';
import { loadProjectFromFile } from '@valerypopoff/rivet2-node';

const project = await loadProjectFromFile('./workflow.rivet-project');
const processor = new GraphProcessor(project, project.metadata.mainGraphId, globalRivetNodeRegistry, true, {
concurrency: {
nodeConcurrency: 8,
splitRunConcurrency: 4,
},
});

processor.on('nodeError', ({ node, error }) => {
console.error(`Node failed: ${node.title}`, error);
});

const outputs = await processor.processGraph(
{
settings: {},
tokenizer,
},
{
prompt: { type: 'string', value: 'Hello' } satisfies DataValue,
},
);

This example omits most host services. Real runtimes usually provide a full ProcessContext, or use createProcessor / runGraph from @valerypopoff/rivet2-node so Node defaults are supplied.

See Also