This document describes the WebSocket API used to run workflows, stream results, and receive real-time updates from the NodeTool backend.
Overview
NodeTool exposes a single WebSocket endpoint for workflow execution and live updates. Clients connect once and multiplex commands and responses for many concurrent workflows over that connection.
- Endpoint:
ws(s)://<host>/ws - Auth: Include a bearer token as a query parameter (
?api_key=<token>) or rely on the same auth flow used by REST endpoints. Tokens are optional inlocal/noneauth modes. - Protocol: Binary (MessagePack) or Text (JSON) frames. The server auto-detects frame type.
See workflow_runner/js/workflow-runner.js for a complete client implementation used by the bundled runner UI.
Encoding
The protocol supports two encoding modes on the same connection:
| Mode | Frame type | When to use |
|---|---|---|
| Binary (default) | Binary WebSocket frame | MessagePack-encoded. Preferred for production use β compact format and native binary data (images, audio) without Base64 overhead. |
| Text | Text WebSocket frame | JSON-encoded. Useful for debugging, curl/websocat testing, and lightweight clients that donβt need binary payloads. |
Binary frames are decoded with MessagePack; text frames are decoded as JSON.
Both modes are interchangeable β the server accepts either and responds in the
same format. Every message is a map/object with at least a type field.
Connection Lifecycle
- Connect β open a WebSocket to
ws://<host>/ws(orwss://for TLS). For binary mode setbinaryType = "arraybuffer". - Ready β the
onopenevent fires; the client can now send commands. - Reconnect β if the connection drops, retry with exponential backoff (the reference client retries up to 10 times starting at 1 s).
- Close β call
socket.close()or let the server close the connection.
Client β Server Commands
All client messages contain command and data fields.
run_job
Start a new workflow execution.
{
"command": "run_job",
"data": {
"type": "run_job_request",
"api_url": "http://localhost:7777/api",
"workflow_id": "<uuid>",
"job_type": "workflow",
"auth_token": "<token>",
"params": { "<input_name>": "<value>" },
"job_id": "<uuid>",
"user_id": "<user_id>",
"graph": {
"nodes": [],
"edges": []
},
"explicit_types": false,
"execution_strategy": "threaded",
"resource_limits": null
}
}
| Field | Type | Description |
|---|---|---|
type |
"run_job_request" |
Constant discriminator |
api_url |
string |
Base URL of the REST API |
workflow_id |
string |
UUID of the workflow to run |
job_type |
string |
Always "workflow" for workflow runs |
auth_token |
string |
Bearer token for authentication |
params |
object |
Input parameter values keyed by input name |
job_id |
string \| null |
Optional client-generated UUID to track the job |
user_id |
string |
User identifier |
graph |
object \| null |
Optional graph override ({ nodes, edges }) |
explicit_types |
boolean |
When false, let the server infer types |
execution_strategy |
string |
"threaded" (default) or "subprocess" |
resource_limits |
object \| null |
Optional resource constraints |
cancel_job
Cancel a running job.
{
"command": "cancel_job",
"data": {
"job_id": "<uuid>",
"workflow_id": "<uuid>"
}
}
pause_job
Pause a running job.
{
"command": "pause_job",
"data": {
"job_id": "<uuid>",
"workflow_id": "<uuid>"
}
}
resume_job
Resume a paused or suspended job.
{
"command": "resume_job",
"data": {
"job_id": "<uuid>",
"workflow_id": "<uuid>"
}
}
reconnect_job
Reconnect to an in-flight job (e.g. after a page reload). The server replays any missed updates.
{
"command": "reconnect_job",
"data": {
"job_id": "<uuid>",
"workflow_id": "<uuid>"
}
}
stream_input
Push a value into a streaming input node while a job is running.
{
"command": "stream_input",
"data": {
"job_id": "<uuid>",
"workflow_id": "<uuid>",
"input": "<input_name>",
"value": "<any>",
"handle": "<handle_name | null>"
}
}
end_input_stream
Signal that a streaming input is complete.
{
"command": "end_input_stream",
"data": {
"job_id": "<uuid>",
"workflow_id": "<uuid>",
"input": "<input_name>",
"handle": "<handle_name | null>"
}
}
Server β Client Messages
Every server message contains a type field used for dispatch. Messages also
include routing fields (workflow_id, job_id, or thread_id) so the client
can multiplex updates across concurrent workflows.
job_update
Reports overall job status changes.
{
"type": "job_update",
"status": "running",
"job_id": "<uuid>",
"workflow_id": "<uuid>",
"message": "optional status text",
"error": null,
"traceback": null,
"result": null,
"duration": null,
"run_state": null
}
| Field | Type | Description |
|---|---|---|
status |
string |
"queued", "running", "completed", "failed", "timed_out", "cancelled", "suspended", or "paused" |
job_id |
string \| null |
Job UUID |
workflow_id |
string \| null |
Workflow UUID for routing |
message |
string \| null |
Human-readable status message |
error |
string \| null |
Error description on failure |
traceback |
string \| null |
Python traceback on failure |
result |
object \| null |
Final result map on completion |
duration |
number \| null |
Execution duration in seconds |
run_state |
object \| null |
Extended state info (e.g. suspension reason) |
node_update
Reports per-node status changes during execution.
{
"type": "node_update",
"node_id": "<uuid>",
"node_name": "GenerateImage",
"node_type": "nodetool.image.Generate",
"status": "running",
"error": null,
"result": null,
"properties": null,
"workflow_id": "<uuid>"
}
| Field | Type | Description |
|---|---|---|
node_id |
string |
Node UUID |
node_name |
string |
Display name of the node |
node_type |
string |
Fully qualified node type |
status |
string |
"booting", "starting", "running", "completed", or "error" |
error |
string \| null |
Error message if the node failed |
result |
object \| null |
Node output on completion |
properties |
object \| null |
Updated node properties |
workflow_id |
string \| null |
Workflow UUID for routing |
node_progress
Reports progress for long-running nodes (e.g. image generation steps).
{
"type": "node_progress",
"node_id": "<uuid>",
"progress": 3,
"total": 20,
"chunk": "",
"workflow_id": "<uuid>"
}
| Field | Type | Description |
|---|---|---|
node_id |
string |
Node UUID |
progress |
number |
Current step |
total |
number |
Total steps |
chunk |
string |
Optional text chunk for streaming output |
workflow_id |
string \| null |
Workflow UUID for routing |
output_update
Delivers a final output value from an output node.
{
"type": "output_update",
"node_id": "<uuid>",
"node_name": "ImageOutput",
"output_name": "image",
"value": { "type": "image", "data": "<binary>" },
"output_type": "image",
"metadata": {},
"workflow_id": "<uuid>"
}
| Field | Type | Description |
|---|---|---|
node_id |
string |
Node UUID |
node_name |
string |
Display name |
output_name |
string |
Name of the output handle |
value |
any |
The output value (see Value Types) |
output_type |
string |
Type descriptor (e.g. "image", "string") |
metadata |
object |
Additional metadata |
workflow_id |
string \| null |
Workflow UUID for routing |
preview_update
Delivers an intermediate preview value during execution.
{
"type": "preview_update",
"node_id": "<uuid>",
"value": { "type": "image", "data": "<binary>" }
}
| Field | Type | Description |
|---|---|---|
node_id |
string |
Node UUID |
value |
any |
Preview data (see Value Types) |
edge_update
Reports data flow status on a connection between nodes.
{
"type": "edge_update",
"workflow_id": "<uuid>",
"edge_id": "<edge_id>",
"status": "active",
"counter": 5
}
| Field | Type | Description |
|---|---|---|
workflow_id |
string |
Workflow UUID |
edge_id |
string |
Edge identifier |
status |
string |
Edge status |
counter |
number \| null |
Number of items that have passed through |
log_update
Streams log output from a running node.
{
"type": "log_update",
"node_id": "<uuid>",
"node_name": "RunModel",
"content": "Loading model weights...",
"severity": "info",
"workflow_id": "<uuid>"
}
| Field | Type | Description |
|---|---|---|
node_id |
string |
Node UUID |
node_name |
string |
Display name |
content |
string |
Log text |
severity |
string |
"info", "warning", or "error" |
workflow_id |
string \| null |
Workflow UUID for routing |
notification
Server-initiated notification to display to the user.
{
"type": "notification",
"node_id": "<uuid>",
"content": "GPU memory low",
"severity": "warning",
"workflow_id": "<uuid>"
}
| Field | Type | Description |
|---|---|---|
node_id |
string |
Originating node UUID |
content |
string |
Notification text |
severity |
string |
"info", "warning", or "error" |
workflow_id |
string \| null |
Workflow UUID for routing |
planning_update
Reports agent planning phases.
{
"type": "planning_update",
"phase": "analyzing",
"status": "running",
"node_id": "<uuid>",
"content": "Determining approach...",
"workflow_id": "<uuid>"
}
task_update
Reports agent task progress.
{
"type": "task_update",
"task": { "...task object..." },
"event": "started",
"node_id": "<uuid>",
"workflow_id": "<uuid>"
}
tool_call_update
Reports when an agent node invokes a tool.
{
"type": "tool_call_update",
"name": "web_search",
"args": { "query": "example" },
"node_id": "<uuid>",
"tool_call_id": "<id>",
"workflow_id": "<uuid>"
}
tool_result_update
Delivers the result of a tool call.
{
"type": "tool_result_update",
"node_id": "<uuid>",
"result": { "...result data..." },
"workflow_id": "<uuid>"
}
prediction
Reports prediction/inference status from an external provider.
{
"type": "prediction",
"id": "<prediction_id>",
"node_id": "<uuid>",
"status": "running",
"user_id": "<user>",
"workflow_id": "<uuid>",
"logs": "Downloading model...",
"error": null,
"duration": null
}
chunk
Streams incremental text/media content from a node.
{
"type": "chunk",
"content": "Hello ",
"content_type": "text",
"content_metadata": {},
"done": false,
"thinking": false,
"node_id": "<uuid>",
"workflow_id": "<uuid>"
}
| Field | Type | Description |
|---|---|---|
content |
string |
Content fragment |
content_type |
string |
"text", "audio", "image", "video", or "document" |
content_metadata |
object |
Extra metadata for the content |
done |
boolean |
true on the final chunk |
thinking |
boolean |
true when the model is in reasoning/thinking mode |
node_id |
string \| null |
Node UUID |
workflow_id |
string \| null |
Workflow UUID for routing |
Value Types
Output and preview values are typically objects with a type discriminator:
| Type | Shape | Description |
|---|---|---|
| Image | { "type": "image", "data": <binary> } |
Raw image bytes (PNG) |
| Audio | { "type": "audio", "data": <binary> } |
Raw audio bytes (MP3/WAV) |
| Video | { "type": "video", "data": <binary> } |
Raw video bytes (MP4) |
| Text | "plain string" |
Simple string value |
| Number | 42 or 3.14 |
Numeric value |
| Boolean | true / false |
Boolean value |
| Object | { ... } |
Arbitrary JSON-like object |
Binary data in MessagePack frames is transmitted as raw byte arrays, avoiding Base64 overhead. In JSON/text mode, binary payloads are Base64-encoded strings.
Message Routing
The server tags each message with one or more routing keys:
workflow_idβ primary key for workflow execution updates.job_idβ fallback whenworkflow_idis not present.thread_idβ used for chat/conversation streams.
The GlobalWebSocketManager in the main web app multiplexes a single
connection and dispatches messages to per-workflow handlers based on these keys.
The standalone workflow runner uses a simpler approach, handling all messages in
a single callback.
Typical Message Sequence
Client Server
| |
|--- run_job ---------------------->|
| |
|<------------- job_update (queued) |
|<------------ job_update (running) |
| |
|<---- node_update (node A running) |
|<--- node_progress (node A 1/10) |
|<--- node_progress (node A 5/10) |
|<-- node_update (node A completed) |
| |
|<---- node_update (node B running) |
|<---------- preview_update (B) |
|<-- node_update (node B completed) |
| |
|<---------- output_update (final) |
|<-------- job_update (completed) |
| |
Error Handling
- Connection errors trigger automatic reconnection with exponential backoff.
- A 120-second timeout is applied to each
run_jobcall; if no terminaljob_updatearrives the promise is rejected. - Server errors are delivered as
job_updatemessages withstatus: "failed"and anerrorfield, or as standaloneerrortype messages. - Per-node errors arrive via
node_updatewith anerrorfield set.
Quick Start Examples
Binary mode (MessagePack)
const socket = new WebSocket("ws://localhost:7777/ws");
socket.binaryType = "arraybuffer";
socket.onopen = () => {
socket.send(
msgpack.encode({
command: "run_job",
data: {
type: "run_job_request",
workflow_id: "YOUR_WORKFLOW_ID",
auth_token: "YOUR_TOKEN",
job_type: "workflow",
params: { prompt: "hello world" }
}
})
);
};
socket.onmessage = (event) => {
const data = msgpack.decode(new Uint8Array(event.data));
console.log(data.type, data);
};
Text mode (JSON)
const socket = new WebSocket("ws://localhost:7777/ws");
socket.onopen = () => {
socket.send(
JSON.stringify({
command: "run_job",
data: {
type: "run_job_request",
workflow_id: "YOUR_WORKFLOW_ID",
auth_token: "YOUR_TOKEN",
job_type: "workflow",
params: { prompt: "hello world" }
}
})
);
};
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(data.type, data);
};