Streaming Agent Communications
Agent Forge provides real-time streaming capabilities, allowing you to monitor agent thinking processes, responses, and execution events as they happen. This is particularly useful for building interactive applications or for gaining deeper insights during debugging.
Benefits of Streaming
- Real-time Feedback: Observe what an agent is doing or thinking without waiting for the entire process to complete.
- Interactive UIs: Build user interfaces that display live progress and agent messages.
- Enhanced Debugging: Get a granular view of agent interactions and internal states.
- Live Logging: Create detailed logs of agent behavior for analysis and auditing.
How to Enable Streaming
Streaming can be enabled in a few ways:
-
Event-Driven Streaming (Programmatic): You can subscribe to specific events emitted by Agent Forge's
globalEventEmitter
.import { Workflow, AgentForge, LLM, Agent, AgentForgeEvents, globalEventEmitter } from "agent-forge";
import dotenv from 'dotenv';
dotenv.config();
async function setupAndRunStreamingWorkflow() {
const apiKey = process.env.OPENAI_API_KEY;
if (!apiKey) {
console.error("API Key not found. Please set OPENAI_API_KEY in your .env file.");
return;
}
const llmProvider = new LLM("openai", { apiKey });
const forge = new AgentForge(llmProvider);
// Assuming research-agent.yaml and summary-agent.yaml exist and are valid
// Or, define Agent instances programmatically:
// const researchAgent = new Agent({ name: "Researcher", llm: llmProvider, model: "gpt-4", objective: "Research topic"});
// const summaryAgent = new Agent({ name: "Summarizer", llm: llmProvider, model: "gpt-4", objective: "Summarize research"});
const researchAgent = await forge.loadAgent("./research-agent.yaml");
const summaryAgent = await forge.loadAgent("./summary-agent.yaml");
// Set up event listeners for streaming
globalEventEmitter.on(AgentForgeEvents.AGENT_THINKING, (data: any) => {
console.log(`STREAM <THINKING>: Agent '${data.name}' thought: ${data.thought}`);
});
globalEventEmitter.on(AgentForgeEvents.AGENT_RESPONSE, (data: any) => {
console.log(`STREAM <RESPONSE>: Agent '${data.name}' responded: ${data.response}`);
});
globalEventEmitter.on(AgentForgeEvents.EXECUTION_COMPLETE, (data: any) => {
console.log(`STREAM <COMPLETE>: Execution complete for ${data.type} "${data.name}"`);
});
// Add listeners for other events like AGENT_ACTION, TOOL_START, TOOL_END etc. if available
// Create a workflow or team
const workflow = new Workflow("StreamingWorkflow") // Optional: Add a name to the workflow
.addStep(researchAgent)
.addStep(summaryAgent);
// Run with the `stream: true` option
try {
const result = await workflow.run(
"Explain quantum computing advancements in 2023",
{ stream: true } // Enable event emission for streaming
);
} catch (error) {
console.error("Error running streaming workflow:", error);
}
}
// To run this example:
// setupAndRunStreamingWorkflow();AgentForgeEvents
: This enum (or object) would define the types of events you can subscribe to (e.g.,AGENT_THINKING
,AGENT_RESPONSE
,TOOL_START
,TOOL_END
,EXECUTION_START
,EXECUTION_COMPLETE
).globalEventEmitter
: A central event emitter instance from Agent Forge.stream: true
option: This flag in therun()
method's options object is crucial to tell the Workflow or Team to emit these events.
-
Console Streaming: For immediate visibility in the console without setting up custom event listeners, Agent Forge provides a simpler option:
// This example assumes a 'workflow' instance is already defined and initialized,
// similar to the 'setupAndRunStreamingWorkflow' example above.
// You would need LLM, AgentForge, Agent, and Workflow imports and setup.
async function runWorkflowWithConsoleStream(workflowInstance: Workflow) {
if (!workflowInstance) {
console.error("Workflow instance not provided.");
return;
}
try {
const result = await workflowInstance.run(
"Explain quantum computing advancements in 2023",
{
stream: true, // Still required to enable underlying event emissions
enableConsoleStream: true, // Directly pipe formatted stream output to the console
}
);
} catch (error) {
console.error("Error running workflow with console stream:", error);
}
}
// To run this example, you would first get a workflow instance:
// const workflowToRun = workflow; // From a previous setup like in setupAndRunStreamingWorkflow
// if (workflowToRun) {
// runWorkflowWithConsoleStream(workflowToRun);
// }enableConsoleStream: true
: This option, when combined withstream: true
, likely formats and prints the streaming events directly to the console.
Combining Streaming with Other Options
Streaming can be used alongside other features like rate_limit
and verbose
logging:
// This example also assumes a 'workflow' instance is defined and initialized.
// Ensure necessary imports and setup are in place.
async function runWorkflowWithCombinedOptions(workflowInstance: Workflow) {
if (!workflowInstance) {
console.error("Workflow instance not provided.");
return;
}
try {
const result = await workflowInstance.run(
"Explain the impact of blockchain on financial systems",
{
stream: true, // Enable event-driven streaming
enableConsoleStream: true, // Also stream to console
rate_limit: 15, // Limit to 15 LLM calls per minute
verbose: true, // Enable detailed non-streaming verbose logs
}
);
} catch (error) {
console.error("Error running workflow with combined options:", error);
}
}
// To run this example, you would first get a workflow instance:
// const workflowToRunWithOptions = workflow; // From a previous setup
// if (workflowToRunWithOptions) {
// runWorkflowWithCombinedOptions(workflowToRunWithOptions);
// }
While verbose: true
provides a detailed log after execution segments or at the end, streaming (stream: true
and/or enableConsoleStream: true
) provides real-time updates as they happen.
Utilizing streaming is highly recommended for developing responsive applications and for gaining a deeper understanding of your agents' operational flow.
Next Steps
- Explore the Model Context Protocol (MCP) for extending agent capabilities with external tools, which can also benefit from streaming for observability.
- Refer to the specific
AgentForgeEvents
documentation (if available) or source code to understand all available event types and their data payloads for programmatic streaming.