
Real-time Streaming

Agent Forge provides comprehensive streaming capabilities that let you monitor agent communications, LLM responses, and task execution in real time. This is particularly useful for long-running operations, debugging, and building interactive user experiences.

Overview

Streaming in Agent Forge operates on an event-driven architecture with multiple layers, each exposed through events you can subscribe to (see the sketch after this list):

  • LLM Response Streaming: Real-time token-by-token responses from language models
  • Agent Communication Streaming: Live updates of agent-to-agent communications
  • Task Progress Streaming: Real-time status updates for team task execution
  • Console Visualization: Built-in console output for development and debugging
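
All of these layers surface on a shared global event emitter. As a minimal orientation sketch, here is one subscription per layer; the event names are the ones used throughout this guide, and the payload fields mirror the examples below:

import { AgentForgeEvents, globalEventEmitter } from "agent-forge";

// LLM response streaming: token-by-token output
globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
  process.stdout.write(event.chunk);
});

// Agent communication streaming: agent-to-agent messages
globalEventEmitter.on(AgentForgeEvents.AGENT_COMMUNICATION, (event) => {
  console.log(`${event.sender} → ${event.recipient}: ${event.message}`);
});

// Task progress streaming: team task completion
globalEventEmitter.on(AgentForgeEvents.TEAM_TASK_COMPLETE, (event) => {
  console.log(`Task "${event.description}" completed by ${event.agentName}`);
});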

Agent Streaming

Basic Agent Streaming

Enable streaming for individual agents by setting the stream option:

import { Agent, AgentForge, agent, forge, llmProvider } from "agent-forge";

@agent({
  name: "Assistant",
  role: "Helpful Assistant",
  description: "Provides real-time assistance",
  objective: "Help users with immediate feedback",
  model: "gpt-4"
})
class StreamingAgent extends Agent {}

@llmProvider("openai", { apiKey: process.env.OPENAI_API_KEY })
@forge()
class StreamingExample {
  static forge: AgentForge;

  static async run() {
    // Named "assistant" to avoid shadowing the imported @agent decorator
    const assistant = new StreamingAgent();

    // Enable streaming for this agent call
    const result = await assistant.run("Tell me about renewable energy", {
      stream: true // Enable real-time streaming
    });

    return result;
  }
}

Stream Event Handling

Listen to streaming events programmatically:

import { globalEventEmitter, AgentForgeEvents } from "agent-forge";

// Listen to LLM response chunks
globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
  console.log(`[${event.agentName}]: ${event.chunk}`);
});

// Listen to agent communications
globalEventEmitter.on(AgentForgeEvents.AGENT_COMMUNICATION, (event) => {
  console.log(`${event.sender} → ${event.recipient}: ${event.message}`);
});

// Listen to stream completion
globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_COMPLETE, (event) => {
  console.log(`Stream completed for ${event.agentName}`);
});

Team Streaming

Real-time Team Coordination

Teams support comprehensive streaming of task delegation and execution:

import { AgentForge, enableConsoleStreaming, forge, llmProvider } from "agent-forge";

@llmProvider("openai", { apiKey: process.env.OPENAI_API_KEY })
@forge()
class TeamStreamingExample {
  static forge: AgentForge;

  static async run() {
    // Enable console visualization
    enableConsoleStreaming();

    // ResearchAgent, AnalystAgent, and WriterAgent are @agent-decorated
    // classes defined as in the earlier example
    const team = this.forge.createTeam("Project Manager");
    team.addAgent(new ResearchAgent());
    team.addAgent(new AnalystAgent());
    team.addAgent(new WriterAgent());

    // Stream the entire team execution
    const result = await team.run("Create a market analysis report", {
      stream: true, // Enable streaming
      verbose: true, // Detailed logging
      enableConsoleStream: true // Visual console output
    });

    return result;
  }
}

Team Stream Events

Teams emit specific events for coordination monitoring:

import { AgentForgeEvents, globalEventEmitter } from "agent-forge";

// Task completion events
globalEventEmitter.on(AgentForgeEvents.TEAM_TASK_COMPLETE, (event) => {
  console.log(`Task "${event.description}" completed by ${event.agentName}`);
  console.log(`Result: ${event.result.output}`);
});

// Execution completion
globalEventEmitter.on(AgentForgeEvents.EXECUTION_COMPLETE, (event) => {
  if (event.type === "team") {
    console.log(`Team "${event.name}" execution completed`);
  }
});

Workflow Streaming

Sequential Agent Execution

Workflows support streaming between sequential agent steps:

import { AgentForge, forge, llmProvider } from "agent-forge";

@llmProvider("openai", { apiKey: process.env.OPENAI_API_KEY })
@forge()
class WorkflowStreamingExample {
  static forge: AgentForge;

  static async run() {
    const workflow = this.forge.createWorkflow("Analysis Pipeline");

    // ResearchAgent, AnalystAgent, and SummarizerAgent are
    // @agent-decorated classes defined as in the earlier examples
    workflow
      .addStep(new ResearchAgent())
      .addStep(new AnalystAgent(), (input, previousResults) => {
        return `Analyze this research: ${previousResults[0].output}`;
      })
      .addStep(new SummarizerAgent());

    // Stream the workflow execution
    const result = await workflow.run("Research renewable energy trends", {
      stream: true,
      verbose: true,
      enableConsoleStream: true
    });

    return result;
  }
}

Workflow Stream Events

Monitor workflow progress with dedicated events:

// Step completion tracking
globalEventEmitter.on(AgentForgeEvents.WORKFLOW_STEP_COMPLETE, (event) => {
  console.log(`Step ${event.stepIndex + 1}/${event.totalSteps} completed`);
  console.log(`Agent: ${event.agentName}`);
  console.log(`Duration: ${event.duration}ms`);
});

Console Streaming Visualization

Built-in Console Output

Agent Forge includes a sophisticated console streaming system:

import { enableConsoleStreaming } from "agent-forge";

// Enable visual console streaming
enableConsoleStreaming();

// Now all streaming operations will show in the console with formatting
const result = await team.run("Complex task", {
  stream: true,
  enableConsoleStream: true
});

Console Output Features

The console streaming provides:

  • Agent Identification: Clear sender/recipient labeling
  • Real-time Chunks: Token-by-token LLM responses
  • Communication Flow: Agent-to-agent message tracking
  • Progress Updates: Task completion and timing
  • Error Handling: Graceful error display

Example Console Output:

Research Agent:
I'll help you research renewable energy trends...

Manager → Research Agent:
Please focus on solar and wind energy developments

Research Agent:
[Using tool: WebSearchTool] Searching for latest solar energy trends...

Step 1/3 completed (2,340ms)

Analyst Agent:
Based on the research data, I can see several key trends...

Custom Stream Handlers

Creating Custom Event Handlers

Build custom streaming interfaces for your applications:

import { AgentForgeEvents, globalEventEmitter } from "agent-forge";

class CustomStreamHandler {
  private messageBuffer: string[] = [];

  constructor() {
    this.setupEventListeners();
  }

  private setupEventListeners() {
    globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
      this.handleLLMChunk(event);
    });

    globalEventEmitter.on(AgentForgeEvents.AGENT_COMMUNICATION, (event) => {
      this.handleAgentCommunication(event);
    });
  }

  private handleLLMChunk(event: any) {
    // Buffer incremental (delta) chunks
    if (event.isDelta) {
      this.messageBuffer.push(event.chunk);
    }

    // Send to custom UI, WebSocket, etc.
    this.sendToCustomInterface({
      type: 'chunk',
      agent: event.agentName,
      content: event.chunk,
      isComplete: event.isComplete
    });
  }

  private handleAgentCommunication(event: any) {
    // Forward agent-to-agent messages
    this.sendToCustomInterface({
      type: 'communication',
      sender: event.sender,
      recipient: event.recipient,
      message: event.message,
      timestamp: event.timestamp
    });
  }

  private sendToCustomInterface(data: any) {
    // Implement your custom interface logic:
    // WebSocket, HTTP SSE, file writing, etc.
  }
}

// Use the custom handler
const streamHandler = new CustomStreamHandler();

WebSocket Integration Example

Stream agent communications to web clients:

import { WebSocket, WebSocketServer } from "ws";
import { AgentForgeEvents, globalEventEmitter } from "agent-forge";

class WebSocketStreamHandler {
  private wss: WebSocketServer;

  constructor(port: number = 8080) {
    this.wss = new WebSocketServer({ port });
    this.setupAgentStreaming();
  }

  private setupAgentStreaming() {
    globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
      this.broadcast({
        type: 'llm_chunk',
        agent: event.agentName,
        chunk: event.chunk,
        isDelta: event.isDelta,
        isComplete: event.isComplete
      });
    });

    globalEventEmitter.on(AgentForgeEvents.AGENT_COMMUNICATION, (event) => {
      this.broadcast({
        type: 'agent_communication',
        sender: event.sender,
        recipient: event.recipient,
        message: event.message,
        timestamp: event.timestamp
      });
    });
  }

  private broadcast(data: any) {
    const message = JSON.stringify(data);
    this.wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(message);
      }
    });
  }
}

// Start WebSocket streaming
const wsHandler = new WebSocketStreamHandler(8080);

Stream Configuration Options

Agent-level Streaming

const result = await agent.run("Question", {
  stream: true, // Enable streaming
  maxTurns: 5, // Limit conversation turns
  maxExecutionTime: 60000 // Timeout in milliseconds
});

Team-level Streaming

const result = await team.run("Task", {
  stream: true, // Enable streaming
  verbose: true, // Detailed logs
  enableConsoleStream: true, // Console visualization
  maxTurns: 10, // Max iterations
  maxExecutionTime: 300000 // 5 minute timeout
});

Workflow-level Streaming

const result = await workflow.run("Input", {
  stream: true, // Enable streaming
  verbose: true, // Step-by-step logs
  enableConsoleStream: true, // Console visualization
  rate_limit: 30 // Rate limiting
});

Performance Considerations

Stream Buffer Management

For high-frequency streaming, manage memory usage:

class BufferedStreamHandler {
  private chunkBuffer: string[] = [];
  private readonly maxBufferSize = 1000;

  handleChunk(event: any) {
    this.chunkBuffer.push(event.chunk);

    // Prevent unbounded growth: keep only the most recent half of the buffer
    if (this.chunkBuffer.length > this.maxBufferSize) {
      this.chunkBuffer = this.chunkBuffer.slice(-this.maxBufferSize / 2);
    }
  }
}
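
Hooked up to the chunk event, the buffer caps itself automatically (a minimal wiring sketch, using the event API shown earlier):

const buffered = new BufferedStreamHandler();

globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
  buffered.handleChunk(event);
});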

Rate Limiting with Streaming

Combine rate limiting with streaming for production systems:

import { AgentForge, RateLimiter, Visualizer, forge, llmProvider } from "agent-forge";

@RateLimiter({
  rateLimitPerSecond: 2,
  rateLimitPerMinute: 60,
  verbose: true
})
@Visualizer() // Timeline generation
@llmProvider("openai", { apiKey: process.env.OPENAI_API_KEY })
@forge()
class ProductionStreamingApp {
  static forge: AgentForge;

  static async run() {
    const team = this.forge.createTeam("Manager");

    // Rate-limited streaming with visualization
    const result = await team.run("Complex analysis", {
      stream: true,
      verbose: true,
      enableConsoleStream: true
    });

    return result;
  }
}

Best Practices

1. Error Handling in Streams

globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
  try {
    // Process the stream chunk (processChunk is your application's own handler)
    processChunk(event.chunk);
  } catch (error) {
    console.error('Stream processing error:', error);
    // Graceful degradation: drop the chunk rather than crash the stream
  }
});

2. Memory Management

// Clean up event listeners when done
class StreamManager {
  private listeners: Map<string, Function> = new Map();

  addListener(event: string, handler: Function) {
    globalEventEmitter.on(event, handler);
    this.listeners.set(event, handler);
  }

  cleanup() {
    this.listeners.forEach((handler, event) => {
      globalEventEmitter.off(event, handler);
    });
    this.listeners.clear();
  }
}
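
Usage is symmetric: register handlers through the manager during a run, then tear them all down in one call (a sketch; the handler body is illustrative):

const manager = new StreamManager();

manager.addListener(AgentForgeEvents.LLM_STREAM_CHUNK, (event: any) => {
  process.stdout.write(event.chunk);
});

// ... run agents, teams, or workflows ...

manager.cleanup(); // detaches every handler registered above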

3. Performance Monitoring

// Track streaming performance
class StreamMetrics {
  private chunkCount = 0;
  private startTime = Date.now();

  trackChunk() {
    this.chunkCount++;
    if (this.chunkCount % 100 === 0) {
      const duration = Date.now() - this.startTime;
      console.log(`Processed ${this.chunkCount} chunks in ${duration}ms`);
    }
  }
}
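
To put the tracker to work, attach it to the chunk event (a minimal wiring sketch using the event API shown earlier):

const metrics = new StreamMetrics();

globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, () => {
  metrics.trackChunk();
});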

Troubleshooting

Common Issues

Streaming Not Working:

  • Ensure stream: true is set in run options
  • Check that event listeners are properly registered
  • Verify LLM provider supports streaming

Memory Issues:

  • Implement buffer size limits
  • Clean up event listeners after use
  • Monitor chunk accumulation patterns

Performance Problems:

  • Use rate limiting for high-frequency streams
  • Consider batching chunk processing
  • Implement selective event filtering (a sketch of the last two ideas follows below)
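
A minimal sketch combining batching and selective filtering: chunks from a single agent are accumulated and flushed on a timer rather than processed one by one. The agent name and flush interval are illustrative:

class BatchingChunkProcessor {
  private pending: string[] = [];

  constructor(agentName: string, flushMs = 250) {
    // Selective filtering: only react to chunks from one agent
    globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
      if (event.agentName === agentName) {
        this.pending.push(event.chunk);
      }
    });

    // Batching: process accumulated chunks on an interval
    setInterval(() => {
      if (this.pending.length > 0) {
        const batch = this.pending.splice(0); // drain the buffer
        console.log(batch.join("")); // replace with your UI update
      }
    }, flushMs);
  }
}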

Streaming capabilities make Agent Forge ideal for interactive applications, real-time monitoring, and complex multi-agent orchestration scenarios.