Real-time Streaming
Agent Forge provides comprehensive streaming capabilities that allow you to monitor agent communications, LLM responses, and task execution in real-time. This is particularly useful for long-running operations, debugging, and creating interactive user experiences.
Overview
Streaming in Agent Forge operates on an event-driven architecture with multiple layers:
- LLM Response Streaming: Real-time token-by-token responses from language models
- Agent Communication Streaming: Live updates of agent-to-agent communications
- Task Progress Streaming: Real-time status updates for team task execution
- Console Visualization: Built-in console output for development and debugging
Agent Streaming
Basic Agent Streaming
Enable streaming for individual agents by setting the stream
option:
import { Agent, agent, llmProvider } from "agent-forge";
@agent({
name: "Assistant",
role: "Helpful Assistant",
description: "Provides real-time assistance",
objective: "Help users with immediate feedback",
model: "gpt-4"
})
class StreamingAgent extends Agent {}
@llmProvider("openai", { apiKey: process.env.OPENAI_API_KEY })
@forge()
class StreamingExample {
static forge: AgentForge;
static async run() {
const agent = new StreamingAgent();
// Enable streaming for this agent call
const result = await agent.run("Tell me about renewable energy", {
stream: true // Enable real-time streaming
});
return result;
}
}
Stream Event Handling
Listen to streaming events programmatically:
import { globalEventEmitter, AgentForgeEvents } from "agent-forge";
// Listen to LLM response chunks
globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
console.log(`[${event.agentName}]: ${event.chunk}`);
});
// Listen to agent communications
globalEventEmitter.on(AgentForgeEvents.AGENT_COMMUNICATION, (event) => {
console.log(`${event.sender} → ${event.recipient}: ${event.message}`);
});
// Listen to stream completion
globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_COMPLETE, (event) => {
console.log(`Stream completed for ${event.agentName}`);
});
Team Streaming
Real-time Team Coordination
Teams support comprehensive streaming of task delegation and execution:
import { Team, enableConsoleStreaming } from "agent-forge";
@llmProvider("openai", { apiKey: process.env.OPENAI_API_KEY })
@forge()
class TeamStreamingExample {
static forge: AgentForge;
static async run() {
// Enable console visualization
enableConsoleStreaming();
const team = this.forge.createTeam("Project Manager");
team.addAgent(new ResearchAgent());
team.addAgent(new AnalystAgent());
team.addAgent(new WriterAgent());
// Stream the entire team execution
const result = await team.run("Create a market analysis report", {
stream: true, // Enable streaming
verbose: true, // Detailed logging
enableConsoleStream: true // Visual console output
});
return result;
}
}
Team Stream Events
Teams emit specific events for coordination monitoring:
import { AgentForgeEvents } from "agent-forge";
// Task assignment events
globalEventEmitter.on(AgentForgeEvents.TEAM_TASK_COMPLETE, (event) => {
console.log(`Task "${event.description}" completed by ${event.agentName}`);
console.log(`Result: ${event.result.output}`);
});
// Execution completion
globalEventEmitter.on(AgentForgeEvents.EXECUTION_COMPLETE, (event) => {
if (event.type === "team") {
console.log(`Team "${event.name}" execution completed`);
}
});
Workflow Streaming
Sequential Agent Execution
Workflows support streaming between sequential agent steps:
import { Workflow } from "agent-forge";
@llmProvider("openai", { apiKey: process.env.OPENAI_API_KEY })
@forge()
class WorkflowStreamingExample {
static forge: AgentForge;
static async run() {
const workflow = this.forge.createWorkflow("Analysis Pipeline");
workflow
.addStep(new ResearchAgent())
.addStep(new AnalystAgent(), (input, previousResults) => {
return `Analyze this research: ${previousResults[0].output}`;
})
.addStep(new SummarizerAgent());
// Stream the workflow execution
const result = await workflow.run("Research renewable energy trends", {
stream: true,
verbose: true,
enableConsoleStream: true
});
return result;
}
}
Workflow Stream Events
Monitor workflow progress with dedicated events:
// Step completion tracking
globalEventEmitter.on(AgentForgeEvents.WORKFLOW_STEP_COMPLETE, (event) => {
console.log(`Step ${event.stepIndex + 1}/${event.totalSteps} completed`);
console.log(`Agent: ${event.agentName}`);
console.log(`Duration: ${event.duration}ms`);
});
Console Streaming Visualization
Built-in Console Output
Agent Forge includes a sophisticated console streaming system:
import { enableConsoleStreaming } from "agent-forge";
// Enable visual console streaming
enableConsoleStreaming();
// Now all streaming operations will show in console with formatting
const result = await team.run("Complex task", {
stream: true,
enableConsoleStream: true
});
Console Output Features
The console streaming provides:
- Agent Identification: Clear sender/recipient labeling
- Real-time Chunks: Token-by-token LLM responses
- Communication Flow: Agent-to-agent message tracking
- Progress Updates: Task completion and timing
- Error Handling: Graceful error display
Example Console Output:
<agents>Research Agent:</agents>
I'll help you research renewable energy trends...
<agents>Manager → Research Agent:</agents>
Please focus on solar and wind energy developments
<agents>Research Agent:</agents>
[Using tool: WebSearchTool] Searching for latest solar energy trends...
Step 1/3 completed (2,340ms)
<agents>Analyst Agent:</agents>
Based on the research data, I can see several key trends...
Custom Stream Handlers
Creating Custom Event Handlers
Build custom streaming interfaces for your applications:
class CustomStreamHandler {
private messageBuffer: string[] = [];
constructor() {
this.setupEventListeners();
}
private setupEventListeners() {
globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
this.handleLLMChunk(event);
});
globalEventEmitter.on(AgentForgeEvents.AGENT_COMMUNICATION, (event) => {
this.handleAgentCommunication(event);
});
}
private handleLLMChunk(event: any) {
// Custom chunk processing
if (event.isDelta) {
this.messageBuffer.push(event.chunk);
}
// Send to custom UI, WebSocket, etc.
this.sendToCustomInterface({
type: 'chunk',
agent: event.agentName,
content: event.chunk,
isComplete: event.isComplete
});
}
private handleAgentCommunication(event: any) {
// Process agent messages
this.sendToCustomInterface({
type: 'communication',
sender: event.sender,
recipient: event.recipient,
message: event.message,
timestamp: event.timestamp
});
}
private sendToCustomInterface(data: any) {
// Implement your custom interface logic
// WebSocket, HTTP SSE, file writing, etc.
}
}
// Use the custom handler
const streamHandler = new CustomStreamHandler();
WebSocket Integration Example
Stream agent communications to web clients:
import WebSocket from 'ws';
class WebSocketStreamHandler {
private wss: WebSocket.Server;
constructor(port: number = 8080) {
this.wss = new WebSocket.Server({ port });
this.setupAgentStreaming();
}
private setupAgentStreaming() {
globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
this.broadcast({
type: 'llm_chunk',
agent: event.agentName,
chunk: event.chunk,
isDelta: event.isDelta,
isComplete: event.isComplete
});
});
globalEventEmitter.on(AgentForgeEvents.AGENT_COMMUNICATION, (event) => {
this.broadcast({
type: 'agent_communication',
sender: event.sender,
recipient: event.recipient,
message: event.message,
timestamp: event.timestamp
});
});
}
private broadcast(data: any) {
const message = JSON.stringify(data);
this.wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
}
// Start WebSocket streaming
const wsHandler = new WebSocketStreamHandler(8080);
Stream Configuration Options
Agent-level Streaming
const result = await agent.run("Question", {
stream: true, // Enable streaming
maxTurns: 5, // Limit conversation turns
maxExecutionTime: 60000 // Timeout in milliseconds
});
Team-level Streaming
const result = await team.run("Task", {
stream: true, // Enable streaming
verbose: true, // Detailed logs
enableConsoleStream: true, // Console visualization
maxTurns: 10, // Max iterations
maxExecutionTime: 300000 // 5 minute timeout
});
Workflow-level Streaming
const result = await workflow.run("Input", {
stream: true, // Enable streaming
verbose: true, // Step-by-step logs
enableConsoleStream: true, // Console visualization
rate_limit: 30 // Rate limiting
});
Performance Considerations
Stream Buffer Management
For high-frequency streaming, manage memory usage:
class BufferedStreamHandler {
private chunkBuffer: string[] = [];
private readonly maxBufferSize = 1000;
handleChunk(event: any) {
this.chunkBuffer.push(event.chunk);
// Prevent memory leaks
if (this.chunkBuffer.length > this.maxBufferSize) {
this.chunkBuffer = this.chunkBuffer.slice(-this.maxBufferSize / 2);
}
}
}
Rate Limiting with Streaming
Combine rate limiting with streaming for production systems:
@RateLimiter({
rateLimitPerSecond: 2,
rateLimitPerMinute: 60,
verbose: true
})
@Visualizer() // Timeline generation
@llmProvider("openai", { apiKey: process.env.OPENAI_API_KEY })
@forge()
class ProductionStreamingApp {
static forge: AgentForge;
static async run() {
const team = this.forge.createTeam("Manager");
// Rate limited streaming with visualization
const result = await team.run("Complex analysis", {
stream: true,
verbose: true,
enableConsoleStream: true
});
return result;
}
}
Best Practices
1. Error Handling in Streams
globalEventEmitter.on(AgentForgeEvents.LLM_STREAM_CHUNK, (event) => {
try {
// Process stream chunk
processChunk(event.chunk);
} catch (error) {
console.error('Stream processing error:', error);
// Graceful degradation
}
});
2. Memory Management
// Clean up event listeners when done
class StreamManager {
private listeners: Map<string, Function> = new Map();
addListener(event: string, handler: Function) {
globalEventEmitter.on(event, handler);
this.listeners.set(event, handler);
}
cleanup() {
this.listeners.forEach((handler, event) => {
globalEventEmitter.off(event, handler);
});
this.listeners.clear();
}
}
3. Performance Monitoring
// Track streaming performance
class StreamMetrics {
private chunkCount = 0;
private startTime = Date.now();
trackChunk() {
this.chunkCount++;
if (this.chunkCount % 100 === 0) {
const duration = Date.now() - this.startTime;
console.log(`Processed ${this.chunkCount} chunks in ${duration}ms`);
}
}
}
Troubleshooting
Common Issues
Streaming Not Working:
- Ensure
stream: true
is set in run options - Check that event listeners are properly registered
- Verify LLM provider supports streaming
Memory Issues:
- Implement buffer size limits
- Clean up event listeners after use
- Monitor chunk accumulation patterns
Performance Problems:
- Use rate limiting for high-frequency streams
- Consider batching chunk processing
- Implement selective event filtering
Streaming capabilities make Agent Forge ideal for interactive applications, real-time monitoring, and complex multi-agent orchestration scenarios.