Astreus

Graph

Workflow orchestration with dependency management and parallel execution

Overview

The Graph system enables you to create complex workflows by connecting tasks and agents with dependencies, conditions, and parallel execution capabilities. It provides a visual and programmatic way to orchestrate multi-step processes, handle branching logic, and coordinate multiple agents working together.

Creating a Graph

Graphs are composed of nodes (tasks or agents) and edges (connections between them):

import { Agent, Graph } from '@astreus-ai/astreus';

// Create the agent that the graph will use by default
const agent = await Agent.create({
  name: 'ContentAgent',
  model: 'gpt-4o'
});

const graph = new Graph({
  name: 'content-creation-pipeline',
  description: 'Research and write technical content'
}, agent);  // Pass agent as second parameter

// Add task nodes
const researchNodeId = graph.addTaskNode({
  prompt: 'Research the latest TypeScript features and summarize key findings',
  model: 'gpt-4o',
  priority: 10,
  metadata: { type: 'research' }
});

const writeNodeId = graph.addTaskNode({
  prompt: 'Write a comprehensive blog post based on the research findings',
  model: 'gpt-4o',
  dependencies: [researchNodeId],  // Depends on research completion
  priority: 5,
  metadata: { type: 'writing' }
});

// Execute the graph
const results = await graph.run();

console.log('Success:', results.success);
console.log('Completed nodes:', results.completedNodes);
console.log('Failed nodes:', results.failedNodes);
console.log('Duration:', results.duration, 'ms');
console.log('Results:', results.results);

Graph Execution Flow

1. Node Resolution: The graph analyzes all nodes and their dependencies to determine execution order.
2. Parallel Execution: Independent nodes run simultaneously, up to the configured concurrency limit.
3. Dependency Waiting: Dependent nodes wait for their prerequisites to complete before starting.
4. Result Collection: All node outputs are collected and made available in the final result.
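
The flow above can be illustrated with a small, self-contained sketch. This is not Astreus's internal scheduler: `SketchNode` and `planWaves` are hypothetical names introduced here, and the sketch assumes that nodes whose dependencies are all complete form one parallel "wave", ordered by priority within the wave.

```typescript
// Hypothetical node shape for illustration; not the library's internal type.
interface SketchNode {
  id: string;
  dependencies: string[];
  priority: number;
}

// Group nodes into waves: every node in a wave has all of its
// dependencies satisfied by earlier waves, so the wave could run in parallel.
function planWaves(nodes: SketchNode[]): string[][] {
  const done = new Set<string>();
  let remaining = nodes.slice();
  const waves: string[][] = [];

  while (remaining.length > 0) {
    // A node is ready once every one of its dependencies has completed.
    const ready = remaining.filter((n) => n.dependencies.every((d) => done.has(d)));
    if (ready.length === 0) {
      throw new Error('Cycle or missing dependency detected');
    }
    // Higher priority first within a wave.
    ready.sort((a, b) => b.priority - a.priority);
    waves.push(ready.map((n) => n.id));
    for (const n of ready) done.add(n.id);
    remaining = remaining.filter((n) => !done.has(n.id));
  }
  return waves;
}

const plannedWaves = planWaves([
  { id: 'analysis', dependencies: [], priority: 10 },
  { id: 'optimization', dependencies: ['analysis'], priority: 8 },
  { id: 'testing', dependencies: ['optimization'], priority: 6 },
  { id: 'documentation', dependencies: ['analysis'], priority: 5 },
]);
console.log(plannedWaves);
// [['analysis'], ['optimization', 'documentation'], ['testing']]
```

In this sketch, 'optimization' and 'documentation' land in the same wave because both depend only on 'analysis'. A real scheduler would typically start nodes as soon as they become ready rather than waiting for a whole wave to finish.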

Advanced Example

Here's a complex workflow with dependencies, parallel execution, and error handling:

import { Agent, Graph } from '@astreus-ai/astreus';

// Create the default agent for the workflow graph
const agent = await Agent.create({
  name: 'OptimizationAgent',
  model: 'gpt-4o'
});

const graph = new Graph({
  name: 'code-optimization-pipeline',
  description: 'Analyze and optimize codebase',
  maxConcurrency: 3,   // Allow 3 parallel nodes
  timeout: 300000,     // 5 minute timeout
  retryAttempts: 2     // Retry failed nodes twice
}, agent);  // Pass agent as second parameter

// Add task nodes with proper configuration
const analysisNodeId = graph.addTaskNode({
  prompt: 'Analyze the codebase for performance issues and categorize them by severity',
  model: 'gpt-4o',
  priority: 10,  // High priority
  metadata: { step: 'analysis', category: 'review' }
});

const optimizationNodeId = graph.addTaskNode({
  prompt: 'Based on the analysis, implement performance optimizations',
  model: 'gpt-4o',
  dependencies: [analysisNodeId],  // Depends on analysis
  priority: 8,
  metadata: { step: 'optimization', category: 'implementation' }
});

const testNodeId = graph.addTaskNode({
  prompt: 'Run performance tests and validate the optimizations',
  model: 'gpt-4o',
  dependencies: [optimizationNodeId],  // Depends on optimization
  priority: 6,
  stream: true,  // Enable streaming for real-time feedback
  metadata: { step: 'testing', category: 'validation' }
});

const documentationNodeId = graph.addTaskNode({
  prompt: 'Document all changes and performance improvements',
  model: 'gpt-4o',
  dependencies: [analysisNodeId],  // Can run parallel to optimization
  priority: 5,  // Lower priority
  metadata: { step: 'documentation', category: 'docs' }
});

// Add edges (optional, as dependencies already create edges)
graph.addEdge(analysisNodeId, optimizationNodeId);
graph.addEdge(analysisNodeId, documentationNodeId);
graph.addEdge(optimizationNodeId, testNodeId);

// Execute the graph
const results = await graph.run();

console.log('Pipeline results:', results);
console.log('Completed nodes:', results.completedNodes);
console.log('Failed nodes:', results.failedNodes);
console.log('Duration:', results.duration, 'ms');

// Access individual node results
Object.entries(results.results).forEach(([nodeId, result]) => {
  console.log(`Node ${nodeId}:`, result);
});

// Check for errors
if (results.errors && Object.keys(results.errors).length > 0) {
  console.log('Errors:', results.errors);
}

Graph Configuration

Graphs support various configuration options:

interface GraphConfig {
  id?: string;                 // Optional graph ID (UUID)
  name: string;                // Graph name (required)
  description?: string;        // Graph description
  maxConcurrency?: number;     // Max parallel execution (default: 1)
  timeout?: number;            // Execution timeout in ms
  retryAttempts?: number;      // Retry attempts for failed nodes
  autoLink?: boolean;          // Automatically link nodes based on dependencies
  maxContextTokens?: number;   // Maximum context tokens for the graph
  contextWarningThreshold?: number; // Warning threshold for context usage (0-1, e.g., 0.8 = 80%)
  subAgentNodeTimeout?: number;     // Extended timeout for sub-agent nodes (default: 5 minutes)
  metadata?: MetadataObject;   // Custom metadata
  subAgentAware?: boolean;                           // Enable sub-agent awareness and optimization
  optimizeSubAgentUsage?: boolean;                   // Optimize sub-agent delegation patterns
  subAgentCoordination?: 'parallel' | 'sequential' | 'adaptive'; // Default sub-agent coordination
}

// Note: The default agent is passed as the second parameter to the constructor:
// new Graph(config, agent)
// The graph's defaultAgentId is automatically set from the agent's ID.

// Example with full configuration including sub-agent support
const graph = new Graph({
  name: 'advanced-pipeline',
  description: 'Complex workflow with error handling and sub-agent coordination',
  maxConcurrency: 5,
  timeout: 600000,  // 10 minutes
  retryAttempts: 3,
  subAgentAware: true,
  optimizeSubAgentUsage: true,
  subAgentCoordination: 'adaptive',
  metadata: { project: 'automation', version: '1.0' }
}, agent);  // Agent passed as second parameter
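
To make the relationship between `maxContextTokens` and `contextWarningThreshold` concrete, here is a minimal sketch of a threshold check. The function `shouldWarnAboutContext` is hypothetical and not part of the Astreus API, and the choice of `>=` for the comparison is an assumption; the library may compare differently.

```typescript
// Hypothetical helper: returns true once the used tokens reach the
// warning fraction of the context budget (assumes an inclusive comparison).
function shouldWarnAboutContext(
  usedTokens: number,
  maxContextTokens: number,
  contextWarningThreshold: number
): boolean {
  if (maxContextTokens <= 0) {
    throw new RangeError('maxContextTokens must be positive');
  }
  if (contextWarningThreshold < 0 || contextWarningThreshold > 1) {
    throw new RangeError('contextWarningThreshold must be between 0 and 1');
  }
  return usedTokens / maxContextTokens >= contextWarningThreshold;
}

// With an 8000-token budget and a 0.8 threshold:
console.log(shouldWarnAboutContext(7000, 8000, 0.8)); // true  (87.5% used)
console.log(shouldWarnAboutContext(4000, 8000, 0.8)); // false (50% used)
```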

Node Types and Options

Task Nodes

interface AddTaskNodeOptions {
  name?: string;               // Node name for easy referencing
  prompt: string;              // Task prompt (required)
  model?: string;              // Override model for this task
  agentId?: string;            // Override default agent (UUID)
  stream?: boolean;            // Enable streaming for this task
  schedule?: string;           // Simple schedule string (e.g., 'daily@09:00', 'after:5s')
  dependencies?: string[];     // Node IDs this task depends on
  dependsOn?: string[];        // Node names this task depends on (easier than IDs)
  priority?: number;           // Execution priority (higher = earlier)
  metadata?: MetadataObject;   // Custom metadata
  useSubAgents?: boolean;                        // Force enable/disable sub-agent usage for this task
  subAgentDelegation?: 'auto' | 'manual' | 'sequential'; // Sub-agent delegation strategy
  subAgentCoordination?: 'parallel' | 'sequential';      // Sub-agent coordination pattern
}
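
The difference between `dependencies` (node IDs) and `dependsOn` (node names) can be sketched as a name-to-ID lookup. This is an illustration only: `NamedNode` and `resolveDependsOn` are hypothetical, and how Astreus resolves names internally (including how it treats duplicate or unknown names) is not specified here.

```typescript
// Hypothetical minimal node record for illustration.
interface NamedNode {
  id: string;
  name?: string;
}

// Translate a list of node names into node IDs, failing loudly on unknown names.
function resolveDependsOn(nodes: NamedNode[], dependsOn: string[]): string[] {
  const idsByName = new Map<string, string>();
  for (const node of nodes) {
    if (node.name !== undefined) {
      idsByName.set(node.name, node.id);
    }
  }
  return dependsOn.map((depName) => {
    const id = idsByName.get(depName);
    if (id === undefined) {
      throw new Error(`Unknown node name: ${depName}`);
    }
    return id;
  });
}

const knownNodes: NamedNode[] = [
  { id: 'node_1', name: 'research' },
  { id: 'node_2', name: 'outline' },
];
console.log(resolveDependsOn(knownNodes, ['research', 'outline']));
// ['node_1', 'node_2']
```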

Agent Nodes

interface AddAgentNodeOptions {
  agentId: string;             // Agent ID (required, UUID)
  dependencies?: string[];     // Node IDs this agent depends on
  priority?: number;           // Execution priority
  metadata?: MetadataObject;   // Custom metadata
}

Sub-Agent Configuration Options

Sub-agent behavior can be configured at two levels: graph-wide defaults, and per-node overrides for delegation and coordination.

Graph-Level Sub-Agent Configuration

  • subAgentAware: Enables automatic detection and optimization of sub-agent opportunities across the graph
  • optimizeSubAgentUsage: Enables real-time performance monitoring and automatic strategy adjustment for better efficiency
  • subAgentCoordination: Sets the default coordination pattern:
    • 'parallel': Sub-agents work simultaneously across different nodes
    • 'sequential': Sub-agents work in dependency order, passing context between executions
    • 'adaptive': Dynamically chooses the best coordination pattern based on task complexity and dependencies

Node-Level Sub-Agent Configuration

Each task node can override graph-level settings with specific sub-agent behavior:

  • useSubAgents: Force enable or disable sub-agent delegation for specific nodes
  • subAgentDelegation: Control how tasks are distributed to sub-agents at the node level
  • subAgentCoordination: Override the graph's default coordination pattern for specific nodes
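
The override relationship between the two levels can be sketched as follows. `effectiveCoordination` is a hypothetical helper, not an Astreus function, and it assumes the simplest possible rule: a node-level value wins when present, otherwise the graph default applies.

```typescript
// Value sets taken from the GraphConfig and AddTaskNodeOptions interfaces.
type GraphCoordination = 'parallel' | 'sequential' | 'adaptive';
type NodeCoordination = 'parallel' | 'sequential';

// Hypothetical resolution rule: node override first, then the graph default.
function effectiveCoordination(
  graphDefault: GraphCoordination | undefined,
  nodeOverride: NodeCoordination | undefined
): GraphCoordination | undefined {
  return nodeOverride ?? graphDefault;
}

console.log(effectiveCoordination('adaptive', 'sequential')); // 'sequential'
console.log(effectiveCoordination('adaptive', undefined));    // 'adaptive'
```

Note that 'adaptive' is only available at the graph level in the interfaces shown, so a node can narrow the pattern to 'parallel' or 'sequential' but cannot request 'adaptive' itself.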

Enhanced Graph Workflow with Sub-Agents

import { Graph, Agent } from '@astreus-ai/astreus';

// Create specialized sub-agents
const researcher = await Agent.create({
  name: 'DataResearcher',
  systemPrompt: 'You specialize in gathering and analyzing data from multiple sources.'
});

const analyst = await Agent.create({
  name: 'TechnicalAnalyst',
  systemPrompt: 'You provide technical insights and recommendations.'
});

const writer = await Agent.create({
  name: 'TechnicalWriter',
  systemPrompt: 'You create clear, comprehensive technical documentation.'
});

// Main coordinator with sub-agents
const coordinator = await Agent.create({
  name: 'ProjectCoordinator',
  systemPrompt: 'You orchestrate complex projects using specialized team members.',
  subAgents: [researcher, analyst, writer]
});

// Create sub-agent optimized graph
// Note: defaultAgentId is automatically set from the coordinator agent passed as second parameter
const projectGraph = new Graph({
  name: 'Technical Documentation Pipeline',
  description: 'Automated technical documentation creation with specialized agents',
  maxConcurrency: 3,
  subAgentAware: true,
  optimizeSubAgentUsage: true,
  subAgentCoordination: 'adaptive'
}, coordinator);  // The coordinator's ID becomes the graph's defaultAgentId

// Research phase with automatic sub-agent delegation
const researchNode = projectGraph.addTaskNode({
  name: 'Market Research',
  prompt: 'Research current trends in cloud computing and serverless architecture',
  useSubAgents: true,
  subAgentDelegation: 'auto',
  priority: 10,
  metadata: { phase: 'research', category: 'data-gathering' }
});

// Analysis phase with sequential sub-agent coordination
const analysisNode = projectGraph.addTaskNode({
  name: 'Technical Analysis',
  prompt: 'Analyze research findings and identify key technical patterns',
  dependencies: [researchNode],
  useSubAgents: true,
  subAgentDelegation: 'auto',
  subAgentCoordination: 'sequential',
  priority: 8,
  metadata: { phase: 'analysis', category: 'insights' }
});

// Documentation phase with parallel sub-agent work
const docNode = projectGraph.addTaskNode({
  name: 'Documentation Creation',
  prompt: 'Create comprehensive technical documentation and executive summary',
  dependencies: [analysisNode],
  useSubAgents: true,
  subAgentDelegation: 'manual',
  subAgentCoordination: 'parallel',
  priority: 6,
  metadata: { phase: 'documentation', category: 'deliverables' }
});

// Execute the graph
const result = await projectGraph.run();

console.log('Pipeline completed:', result.success);
console.log('Node results:', result.results);

Response Types

Graph execution returns comprehensive results including node outcomes, usage statistics, and performance metrics.

Graph Execution Result

The graph.run() method returns a detailed GraphExecutionResult:

const result = await graph.run({ timeout: 60000 });

// Response structure:
{
  graph: {
    id: "graph-uuid-123",
    defaultAgentId: "agent-uuid",  // Set from the agent passed to constructor
    config: {
      name: "code-optimization-pipeline",
      description: "Analyze and optimize codebase",
      maxConcurrency: 3,
      timeout: 300000,
      retryAttempts: 2
    },
    nodes: [ /* GraphNode[] */ ],
    edges: [ /* GraphEdge[] */ ],
    status: "completed",  // 'idle' | 'running' | 'completed' | 'failed' | 'paused'
    startedAt: new Date('2024-01-15T10:00:00Z'),
    completedAt: new Date('2024-01-15T10:12:30Z'),
    executionLog: [ /* GraphExecutionLogEntry[] */ ],
    usage: { /* GraphUsage */ },
    createdAt: new Date('2024-01-15T09:55:00Z'),
    updatedAt: new Date('2024-01-15T10:12:30Z')
  },
  success: true,                // Overall success status
  completedNodes: 5,            // Number of successfully completed nodes
  failedNodes: 0,               // Number of failed nodes
  duration: 12500,              // Total execution time in milliseconds
  results: {
    "node_abc12345-...": "Analysis complete: Found 15 performance issues categorized by severity...",
    "node_def67890-...": "Optimization implemented: 40% performance improvement...",
    "node_ghi11111-...": "Tests passed: All optimizations validated...",
    "node_jkl22222-...": "Documentation updated with all changes...",
    "node_mno33333-...": "Final review completed..."
  },
  errors: {},  // Empty if all nodes succeeded
  usage: {
    totalPromptTokens: 1500,
    totalCompletionTokens: 3000,
    totalTokens: 4500,
    totalContextTokens: 500,
    totalCost: 0.045,
    nodeUsages: {
      "node_abc12345-...": {
        promptTokens: 200,
        completionTokens: 400,
        totalTokens: 600,
        contextTokens: 100,
        model: "gpt-4",
        cost: 0.012
      },
      "node_def67890-...": {
        promptTokens: 300,
        completionTokens: 600,
        totalTokens: 900,
        contextTokens: 150,
        model: "gpt-4",
        cost: 0.018
      }
      // ... more node usages
    },
    modelsUsed: ["gpt-4", "gpt-3.5-turbo"]
  }
}

Graph Execution with Errors

When nodes fail, errors are included in the response:

const result = await graph.run();

// Response with failures:
{
  graph: { /* ... */ },
  success: false,
  completedNodes: 3,
  failedNodes: 2,
  duration: 8500,
  results: {
    "node_abc12345-...": "Successfully completed...",
    "node_def67890-...": "Partial completion...",
    "node_ghi11111-...": "Task completed..."
  },
  errors: {
    "node_jkl22222-...": "Error: Timeout exceeded after 5000ms",
    "node_mno33333-...": "Error: Dependency node_jkl22222-... failed, skipping execution"
  },
  usage: { /* ... */ }
}
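
One way to act on a failed run is to split node IDs into succeeded and failed groups and build a readable report. The sketch below works on a trimmed-down result shape (`SketchResult`) that mirrors only the `success`, `results`, and `errors` fields shown above; the helper `summarizeOutcome` is hypothetical and the values are assumed to be strings, as in the examples.

```typescript
// Trimmed-down view of the execution result, limited to fields shown above.
interface SketchResult {
  success: boolean;
  results: Record<string, string>;
  errors: Record<string, string>;
}

interface OutcomeSummary {
  succeeded: string[];
  failed: string[];
  report: string[];
}

// Hypothetical helper: list succeeded and failed node IDs plus one line per error.
function summarizeOutcome(result: SketchResult): OutcomeSummary {
  const failed = Object.keys(result.errors);
  // A node counts as succeeded only if it has a result and no error entry.
  const succeeded = Object.keys(result.results).filter((id) => !(id in result.errors));
  const report = Object.entries(result.errors).map(
    ([nodeId, message]) => `${nodeId}: ${message}`
  );
  return { succeeded, failed, report };
}

const outcome = summarizeOutcome({
  success: false,
  results: { node_a: 'Successfully completed...' },
  errors: { node_b: 'Error: Timeout exceeded after 5000ms' },
});
console.log(outcome.failed); // ['node_b']
console.log(outcome.report); // ['node_b: Error: Timeout exceeded after 5000ms']
```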

Add Node Response

Adding nodes returns the node ID (format: node_<uuid>):

const nodeId = graph.addTaskNode({
  name: "Analyze Data",
  prompt: "Analyze the following data...",
  model: "gpt-4",
  priority: 10
});

// Response: "node_a1b2c3d4-e5f6-7890-abcd-ef1234567890" (node ID string)
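
If you store or pass node IDs around, a format check can catch mix-ups between names and IDs early. The following sketch assumes the `<uuid>` part is a standard hyphenated hexadecimal UUID (8-4-4-4-12), as in the example above; `isNodeId` is a hypothetical helper and not part of the library.

```typescript
// Matches "node_" followed by a hyphenated hexadecimal UUID (8-4-4-4-12).
// Assumption: the library does not use any other ID layout.
const NODE_ID_PATTERN = /^node_[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

function isNodeId(value: string): boolean {
  return NODE_ID_PATTERN.test(value);
}

console.log(isNodeId('node_a1b2c3d4-e5f6-7890-abcd-ef1234567890')); // true
console.log(isNodeId('Analyze Data'));                              // false
```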

Node Usage Details

Each node's usage is tracked individually:

// Access individual node usage from result
const nodeUsage = result.usage.nodeUsages["node_abc12345-..."];

// Structure:
{
  promptTokens: 200,
  completionTokens: 400,
  totalTokens: 600,
  contextTokens: 100,      // Optional: tokens from context/memory
  model: "gpt-4",
  cost: 0.012              // Optional: calculated cost
}

Graph Usage Summary

Total usage across all nodes:

const totalUsage = result.usage;

// Structure:
{
  totalPromptTokens: 1500,        // Sum of all prompt tokens
  totalCompletionTokens: 3000,    // Sum of all completion tokens
  totalTokens: 4500,              // Total tokens used
  totalContextTokens: 500,        // Total context tokens loaded
  totalCost: 0.045,               // Total estimated cost
  nodeUsages: { /* ... */ },      // Per-node breakdown
  modelsUsed: ["gpt-4", "gpt-3.5-turbo"]  // All models used in execution
}
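
The totals are described as sums over the per-node entries, so they can be recomputed from `nodeUsages` as a consistency check. This is a sketch under that assumption: `summarizeUsage` is a hypothetical helper, the local interfaces mirror only the fields shown above, and missing optional fields are treated as zero.

```typescript
// Per-node usage, mirroring the fields shown in "Node Usage Details".
interface NodeUsage {
  promptTokens: number;
  completionTokens: number;
  totalTokens: number;
  contextTokens?: number;
  model: string;
  cost?: number;
}

interface UsageTotals {
  totalPromptTokens: number;
  totalCompletionTokens: number;
  totalTokens: number;
  totalContextTokens: number;
  totalCost: number;
  modelsUsed: string[];
}

// Hypothetical helper: recompute graph-level totals from per-node usage.
function summarizeUsage(nodeUsages: Record<string, NodeUsage>): UsageTotals {
  const models = new Set<string>();
  const totals: UsageTotals = {
    totalPromptTokens: 0,
    totalCompletionTokens: 0,
    totalTokens: 0,
    totalContextTokens: 0,
    totalCost: 0,
    modelsUsed: [],
  };
  for (const usage of Object.values(nodeUsages)) {
    totals.totalPromptTokens += usage.promptTokens;
    totals.totalCompletionTokens += usage.completionTokens;
    totals.totalTokens += usage.totalTokens;
    totals.totalContextTokens += usage.contextTokens ?? 0; // optional field
    totals.totalCost += usage.cost ?? 0;                   // optional field
    models.add(usage.model);
  }
  totals.modelsUsed = Array.from(models);
  return totals;
}

const usageTotals = summarizeUsage({
  node_a: { promptTokens: 200, completionTokens: 400, totalTokens: 600, contextTokens: 100, model: 'gpt-4', cost: 0.012 },
  node_b: { promptTokens: 300, completionTokens: 600, totalTokens: 900, contextTokens: 150, model: 'gpt-4', cost: 0.018 },
});
console.log(usageTotals.totalTokens); // 1500
console.log(usageTotals.modelsUsed);  // ['gpt-4']
```

Because costs are floating-point values, compare recomputed and reported costs with a small tolerance rather than strict equality.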

Last updated: May 26, 2026