Essential Node Template
from typing import Any, AsyncGenerator, TypedDict, ClassVar
from pydantic import Field
from nodetool.workflows.base_node import BaseNode, InputNode, OutputNode
from nodetool.workflows.processing_context import ProcessingContext
# SIMPLE PROCESSING NODE
class SimpleNode(BaseNode):
    """
    Clear description of what this node does.
    keyword1, keyword2, keyword3

    Use cases:
    - What you can do with this
    - Another useful application
    - And one more example
    """

    # Input properties
    input_value: str = Field(default="", description="Help text")
    threshold: int = Field(default=100, ge=0, le=255)

    # Optional: Custom title in UI
    @classmethod
    def get_title(cls):
        return "Node Title"

    # Main execution method
    async def process(self, context: ProcessingContext) -> str:
        # Do work here
        return f"Result: {self.input_value}"
# TYPED OUTPUT NODE
class TypedNode(BaseNode):
    """Multi-value output. keywords"""

    class OutputType(TypedDict):
        text: str
        count: int
        success: bool

    async def process(self, context: ProcessingContext) -> OutputType:
        return {
            "text": "hello",
            "count": 42,
            "success": True,
        }
# STREAMING/GENERATOR NODE
class StreamingNode(BaseNode):
    """Emit multiple items. keywords"""

    items: list[str] = Field(default=[])

    class OutputType(TypedDict):
        item: str
        index: int

    async def gen_process(
        self, context: ProcessingContext
    ) -> AsyncGenerator[OutputType, None]:
        for i, item in enumerate(self.items):
            yield {"item": item, "index": i}
# INPUT NODE
class CustomInput(InputNode):
    """Parameter input. keywords"""

    value: str = ""

    @classmethod
    def return_type(cls):
        return str

    async def process(self, context: ProcessingContext) -> str:
        return self.value
# OUTPUT NODE
class CustomOutput(OutputNode):
    """Collect output. keywords"""

    value: str = ""

    async def process(self, context: ProcessingContext) -> str:
        return self.value
Common Field Patterns
# Text input
text: str = Field(default="")
text: str = Field(default="", description="Help text")
# Number with constraints
count: int = Field(default=0, ge=0, le=100)
threshold: float = Field(default=0.5, ge=0.0, le=1.0)
# Optional
optional_value: str | None = Field(default=None)
# List
items: list[str] = Field(default=[])
tags: list[str] = Field(
    default=["tag1", "tag2"],
    description="List of tags"
)
# Enum choices
from enum import Enum

class MyEnum(str, Enum):
    OPTION_A = "a"
    OPTION_B = "b"

choice: MyEnum = Field(default=MyEnum.OPTION_A)
# Model selections
from nodetool.metadata.types import LanguageModel, ImageModel
model: LanguageModel = Field(default=LanguageModel())
image_model: ImageModel = Field(default=ImageModel())
# Asset references
from nodetool.metadata.types import ImageRef, AudioRef, VideoRef, DocumentRef, FolderRef
image: ImageRef = Field(default=ImageRef())
audio: AudioRef = Field(default=AudioRef())
folder: FolderRef = Field(default=FolderRef())
# Data structures
from nodetool.metadata.types import DataframeRef, RecordType
dataframe: DataframeRef = Field(default=DataframeRef())
columns: RecordType = Field(default=RecordType())
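Put together, a single node's field block might look like the following sketch (ResizeImage and its fields are illustrative, not a real built-in node):
from enum import Enum
from pydantic import Field
from nodetool.metadata.types import ImageRef
from nodetool.workflows.base_node import BaseNode

class ResizeMode(str, Enum):
    FIT = "fit"
    FILL = "fill"

class ResizeImage(BaseNode):
    """Resize an image. image, resize, transform"""

    image: ImageRef = Field(default=ImageRef(), description="Image to resize")
    width: int = Field(default=512, ge=1, le=4096, description="Target width in pixels")
    mode: ResizeMode = Field(default=ResizeMode.FIT, description="How to fit the image")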
Processing Context Essentials
# Get provider for LLM calls
provider = await context.get_provider(Provider.OpenAI)
result = await provider.generate_message(model="gpt-4", ...)
# Work with images
image = await context.image_from_bytes(bytes_data)
pil_image = await context.image_to_pil(image_ref)
new_image = await context.image_from_pil(pil_image)
# Work with dataframes
df = await context.dataframe_to_pandas(df_ref)
new_df_ref = await context.dataframe_from_pandas(df)
# Work with assets
asset = await context.create_asset(filename, mime_type, file_obj, parent_id)
asset_url = await context.get_asset_url(asset_id)
file_bytes = await context.asset_to_bytes(ref)
# Emit updates to UI
context.post_message(SaveUpdate(
    node_id=self.id,
    name="filename",
    value=result,
    output_type="text"
))
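As a sketch, a process() body that combines the image helpers above (the grayscale conversion is just an example PIL operation):
async def process(self, context: ProcessingContext) -> ImageRef:
    # Decode the incoming ImageRef into a PIL image
    pil_image = await context.image_to_pil(self.image)
    # Any PIL operation fits here; grayscale is illustrative
    gray = pil_image.convert("L")
    # Re-encode the result so downstream nodes receive an ImageRef
    return await context.image_from_pil(gray)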
Input Nodes Quick List
StringInput - Text value
IntegerInput - Whole number (min/max)
FloatInput - Decimal (min/max)
BooleanInput - True/False toggle
StringListInput - List of strings
HuggingFaceModelInput - Select HF model
LanguageModelInput - Select LLM
ImageModelInput - Select image model
ImageInput - Image asset reference
AudioInput - Audio asset reference
VideoInput - Video asset reference
DocumentInput - Document asset reference
AssetFolderInput - Folder asset reference
ColorInput - Color picker
CollectionInput - Vector DB collection
FolderPathInput - Local folder path
FilePathInput - Local file path
DocumentFileInput - Load document from file
Output Nodes Quick List
StringOutput - Text output
IntegerOutput - Number output
FloatOutput - Decimal output
BooleanOutput - True/False output
ListOutput - List output
DictionaryOutput - Dict output
ImageOutput - Image reference
AudioOutput - Audio reference
VideoOutput - Video reference
DocumentOutput - Document reference
DataframeOutput - Table data
ArrayOutput - NumPy array
FilePathOutput - File path
FolderPathOutput - Folder path
Special Node Features
# Make available to agents
_expose_as_tool: ClassVar[bool] = True
# Enable dynamic input connectors
_is_dynamic: ClassVar[bool] = True
# Support dynamic output slots
_supports_dynamic_outputs: ClassVar[bool] = True
# Stream input instead of batching
@classmethod
def is_streaming_input(cls) -> bool:
    return True

# Emit streaming output
@classmethod
def is_streaming_output(cls) -> bool:
    return True

# Hide from UI (for base classes)
@classmethod
def is_visible(cls):
    return cls is not BaseClass

# Set basic fields shown first
@classmethod
def get_basic_fields(cls) -> list[str]:
    return ["prompt", "model"]

# Define required inputs
def required_inputs(self):
    return ["text"]

# Set return type
@classmethod
def return_type(cls):
    return dict
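Combined in one class, several of these hooks look like the following sketch (SummarizeTool is hypothetical and its body is placeholder logic):
class SummarizeTool(BaseNode):
    """Summarize text. summarize, text, tool"""

    _expose_as_tool: ClassVar[bool] = True

    text: str = Field(default="", description="Text to summarize")
    max_words: int = Field(default=50, ge=1, le=500)

    @classmethod
    def get_basic_fields(cls) -> list[str]:
        return ["text"]

    def required_inputs(self):
        return ["text"]

    async def process(self, context: ProcessingContext) -> str:
        # Placeholder logic; a real summarizer would call a provider
        return " ".join(self.text.split()[: self.max_words])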
Return Type Patterns
# Simple return
async def process(self, context) -> str:
    return "result"

# Structured return (TypedDict)
class OutputType(TypedDict):
    text: str
    score: float

async def process(self, context) -> OutputType:
    return {"text": "...", "score": 0.95}

# Multiple possible types
async def process(self, context) -> int | float | str:
    return result

# Streaming (generator)
async def gen_process(self, context) -> AsyncGenerator[dict, None]:
    for item in items:
        yield {"output": item}

# Custom run method (advanced)
async def run(self, context, inputs: NodeInputs, outputs: NodeOutputs):
    async for item in inputs.stream("input_field"):
        await outputs.emit("output", processed_item)
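A fuller sketch of the custom run() pattern, assuming inputs.stream() yields items as they arrive and outputs.emit() pushes results downstream (import paths for NodeInputs/NodeOutputs are omitted since they depend on your nodetool version):
class UppercaseStream(BaseNode):
    """Uppercase each streamed item. stream, text, transform"""

    text: str = ""

    @classmethod
    def is_streaming_input(cls) -> bool:
        return True

    @classmethod
    def is_streaming_output(cls) -> bool:
        return True

    async def run(self, context, inputs: NodeInputs, outputs: NodeOutputs):
        # Handle items one at a time instead of waiting for a full batch
        async for item in inputs.stream("text"):
            await outputs.emit("output", str(item).upper())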
Docstring Keywords by Category
Data Types: text, string, number, integer, float, boolean, list, array, dict, object, document, file
Operations: extract, filter, map, reduce, merge, split, join, sort, group, aggregate, transform, analyze
Media: image, picture, visual, video, audio, sound, document, file, folder, asset
AI/ML: model, embedding, classification, clustering, generation, language, agent, tool
Control: flow, condition, loop, iterator, generator, stream, branch, switch
I/O: input, output, load, save, read, write, import, export, download, upload
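Combining the docstring format with these keywords, an illustrative docstring (ExtractEmails is a made-up example):
class ExtractEmails(BaseNode):
    """
    Extract email addresses from text.
    text, extract, filter, email

    Use cases:
    - Pull contact addresses out of scraped pages
    - Clean mailing lists from raw documents
    - Feed addresses into validation nodes
    """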
Common Node Patterns
Text Processing
# Concatenate
a: str
b: str
→ return self.a + self.b
# Split by delimiter
text: str
delimiter: str
→ return self.text.split(self.delimiter)
# Pattern matching
text: str
pattern: str
→ return re.findall(self.pattern, self.text)
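Spelled out as a full class, the pattern-matching sketch above becomes (illustrative):
import re

class FindMatches(BaseNode):
    """Find all regex matches in text. text, pattern, extract, regex"""

    text: str = Field(default="", description="Text to search")
    pattern: str = Field(default="", description="Regular expression")

    async def process(self, context: ProcessingContext) -> list[str]:
        return re.findall(self.pattern, self.text)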
List Operations
# Iterate with streaming
input_list: list[Any]
→ async def gen_process(...) -> AsyncGenerator[OutputType, None]:
       for index, item in enumerate(self.input_list):
           yield {"output": item, "index": index}
# Collect streamed items
input_item: Any
→ async for item in inputs.stream("input_item"):
       collected.append(item)
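As a complete node, the collector sketch pairs a streaming input with the custom run() hook (a sketch; NodeInputs/NodeOutputs imports omitted as above):
class CollectItems(BaseNode):
    """Collect streamed items into a list. list, collect, stream"""

    input_item: Any = None

    @classmethod
    def is_streaming_input(cls) -> bool:
        return True

    async def run(self, context, inputs: NodeInputs, outputs: NodeOutputs):
        collected: list[Any] = []
        async for item in inputs.stream("input_item"):
            collected.append(item)
        # Emit the full list once the stream is exhausted
        await outputs.emit("output", collected)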
Media Processing
# Load and process image
image: ImageRef
→ pil_img = await context.image_to_pil(self.image)
   processed = pil_img.filter(ImageFilter.BLUR)  # any PIL operation works here
   return await context.image_from_pil(processed)
# Work with files
folder: str
pattern: str
→ async def gen_process(...) -> AsyncGenerator[OutputType, None]:
       for file in os.listdir(self.folder):
           if fnmatch(file, self.pattern):
               yield {"file": file, ...}
Provider Integration
model: LanguageModel
text: str
→ provider = await context.get_provider(self.model.provider)
   response = await provider.generate_message(
       model=self.model.id,
       messages=[...],
       ...
   )
   return response.content
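As a complete node this might read as follows; the dict-style messages argument is an assumption, since the exact message type depends on the provider API in your nodetool version:
class AskModel(BaseNode):
    """Ask a language model a question. llm, text, generation"""

    model: LanguageModel = Field(default=LanguageModel())
    text: str = Field(default="", description="Prompt to send")

    async def process(self, context: ProcessingContext) -> str:
        provider = await context.get_provider(self.model.provider)
        response = await provider.generate_message(
            model=self.model.id,
            # Assumed message shape; check your provider's expected type
            messages=[{"role": "user", "content": self.text}],
        )
        return response.content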
File Organization
# my_node_file.py
from pydantic import Field
from typing import TypedDict, AsyncGenerator
from nodetool.workflows.base_node import BaseNode
from nodetool.workflows.processing_context import ProcessingContext
class Node1(BaseNode):
    """..."""
    ...

class Node2(BaseNode):
    """..."""
    ...
# Multiple nodes can be in one file
# All will be auto-discovered
Testing Pattern
# nodes/nodetool/my_nodes.py
class MyNode(BaseNode):
    """My node. keywords"""

    value: str = ""

    async def process(self, context: ProcessingContext) -> str:
        return self.value.upper()

# tests/nodetool/test_my_nodes.py
import pytest
from unittest.mock import MagicMock
from nodetool.nodes.nodetool.my_nodes import MyNode

@pytest.mark.asyncio
async def test_my_node():
    # This node never touches the context, so a MagicMock stands in for it
    context = MagicMock()
    node = MyNode(value="hello")
    result = await node.process(context)
    assert result == "HELLO"
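Streaming nodes can be tested the same way by collecting whatever gen_process yields (a sketch, reusing the StreamingNode template from above):
@pytest.mark.asyncio
async def test_streaming_node():
    context = MagicMock()  # gen_process here never reads the context
    node = StreamingNode(items=["a", "b"])
    results = [chunk async for chunk in node.gen_process(context)]
    assert results == [
        {"item": "a", "index": 0},
        {"item": "b", "index": 1},
    ]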
Key Reminders
- All nodes must be async (use async def)
- Docstring keywords are searchable and categorize the node
- Pydantic Field provides validation and UI hints
- ProcessingContext is your gateway to all services
- Use TypedDict for multiple outputs
- Use AsyncGenerator for streaming outputs
- Docstring format matters: "description. keywords" + "Use cases:"
- Node auto-discovery is automatic based on inheritance
- Always include helpful descriptions in Field
- Test with the pytest-asyncio marker: @pytest.mark.asyncio