Streaming
The Streaming utility provides real-time response streaming from LLM providers, enabling responsive user interfaces and live data processing.
Overview
Streaming delivers an LLM response in incremental chunks as it is generated, rather than waiting for the complete response, which improves perceived performance and enables interactive experiences.
Basic Usage
LLM Streaming
import { LLM } from "agent-forge";
const llm = await LLM.create("openai", { apiKey: process.env.OPENAI_API_KEY });
const response = await llm.chatStream({
messages: [{ role: "user", content: "Write a story" }],
model: "gpt-4",
onChunk: (chunk) => {
if (chunk.choices?.[0]?.delta?.content) {
process.stdout.write(chunk.choices[0].delta.content);
}
}
});
Agent Streaming
// Register the stream listener before running the agent so no chunks are missed
const agent = new Agent(config);
const eventEmitter = agent.getLLMProvider()?.getEventEmitter();
if (eventEmitter) {
  eventEmitter.on('llm-stream-chunk', (data) => {
    process.stdout.write(data.chunk);
  });
}
// Run with streaming enabled; the listener above receives chunks as they arrive
const result = await agent.run("Tell me about AI", {
  stream: true
});
Stream Event Types
LLM Stream Chunk
interface LLMStreamChunk {
model: string;
agentName: string;
chunk: string;
timestamp: number;
}
LLM Stream Complete
interface LLMStreamComplete {
content: string;
isComplete: boolean;
agentName: string;
duration: number;
}
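Both event types can be consumed through the provider's event emitter used in the agent example above. The following is a sketch: the 'llm-stream-chunk' event name appears earlier in this document, while 'llm-stream-complete' is an assumed name for the completion event and may differ in your setup.
// Sketch: handling typed chunk and completion events from the provider's emitter.
// 'llm-stream-complete' is an assumed event name; adjust to your provider's actual events.
const emitter = agent.getLLMProvider()?.getEventEmitter();
emitter?.on('llm-stream-chunk', (data: LLMStreamChunk) => {
  process.stdout.write(data.chunk);
});
emitter?.on('llm-stream-complete', (data: LLMStreamComplete) => {
  console.log(`\n[${data.agentName}] completed in ${data.duration}ms`);
});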
Advanced Streaming
Custom Stream Handler
class StreamHandler {
private buffer: string = '';
private onUpdate: (content: string) => void;
constructor(onUpdate: (content: string) => void) {
this.onUpdate = onUpdate;
}
handleChunk(chunk: string): void {
this.buffer += chunk;
this.onUpdate(this.buffer);
}
complete(): string {
const finalContent = this.buffer;
this.buffer = '';
return finalContent;
}
}
const handler = new StreamHandler((content) => {
console.log("Current content:", content);
});
const response = await llm.chatStream({
messages: [{ role: "user", content: "Explain quantum computing" }],
model: "gpt-4",
onChunk: (chunk) => {
if (chunk.choices?.[0]?.delta?.content) {
handler.handleChunk(chunk.choices[0].delta.content);
}
}
});
Streaming with UI Updates
// Example for web applications
class UIStreamingHandler {
private element: HTMLElement;
private content: string = '';
constructor(elementId: string) {
this.element = document.getElementById(elementId)!;
}
onChunk(chunk: string): void {
this.content += chunk;
this.element.textContent = this.content;
// Auto-scroll to bottom
this.element.scrollTop = this.element.scrollHeight;
}
onComplete(): void {
this.element.classList.add('streaming-complete');
}
}
const uiHandler = new UIStreamingHandler('response-container');
await llm.chatStream({
messages: [{ role: "user", content: "Write a poem" }],
model: "gpt-4",
onChunk: (chunk) => {
if (chunk.choices?.[0]?.delta?.content) {
uiHandler.onChunk(chunk.choices[0].delta.content);
}
}
});
uiHandler.onComplete();
Error Handling
Stream Error Recovery
async function streamWithRetry(
llm: LLM,
messages: any[],
maxRetries: number = 3
): Promise<string> {
let attempt = 0;
  let lastError: Error | undefined;
while (attempt < maxRetries) {
try {
const response = await llm.chatStream({
messages,
model: "gpt-4",
onChunk: (chunk) => {
if (chunk.choices?.[0]?.delta?.content) {
process.stdout.write(chunk.choices[0].delta.content);
}
}
});
return response.content;
} catch (error) {
lastError = error as Error;
attempt++;
if (attempt < maxRetries) {
console.log(`Stream failed, retrying... (${attempt}/${maxRetries})`);
await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
}
}
}
  throw lastError ?? new Error("Stream failed after maximum retries");
}
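Usage of the wrapper above: chunks are printed as they arrive, and the resolved value is the aggregated text returned by the final successful stream.
// Stream with up to 3 retries; the returned string is the full response text.
const story = await streamWithRetry(llm, [
  { role: "user", content: "Write a short story about a lighthouse." }
]);
console.log(`\nReceived ${story.length} characters.`);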
Performance Optimization
Chunked Processing
class ChunkedProcessor {
private chunks: string[] = [];
private processInterval: NodeJS.Timeout | null = null;
constructor(private onProcess: (chunks: string[]) => void) {
this.startProcessing();
}
addChunk(chunk: string): void {
this.chunks.push(chunk);
}
private startProcessing(): void {
this.processInterval = setInterval(() => {
if (this.chunks.length > 0) {
const toProcess = this.chunks.splice(0, 10); // Process 10 chunks at a time
this.onProcess(toProcess);
}
}, 100); // Process every 100ms
}
stop(): void {
if (this.processInterval) {
clearInterval(this.processInterval);
this.processInterval = null;
}
// Process remaining chunks
if (this.chunks.length > 0) {
this.onProcess(this.chunks);
this.chunks = [];
}
}
}
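The processor can be fed from a stream's onChunk callback and stopped once the stream finishes, for example:
// Batch chunk handling: render at most once every 100ms instead of once per token.
const processor = new ChunkedProcessor((chunks) => {
  process.stdout.write(chunks.join(''));
});
await llm.chatStream({
  messages: [{ role: "user", content: "Summarize the benefits of streaming" }],
  model: "gpt-4",
  onChunk: (chunk) => {
    if (chunk.choices?.[0]?.delta?.content) {
      processor.addChunk(chunk.choices[0].delta.content);
    }
  }
});
processor.stop(); // flush remaining chunks and clear the interval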
Integration Examples
Express.js Streaming API
import express from 'express';
const app = express();
app.get('/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const query = req.query.q as string;
try {
await llm.chatStream({
messages: [{ role: "user", content: query }],
model: "gpt-4",
onChunk: (chunk) => {
if (chunk.choices?.[0]?.delta?.content) {
res.write(`data: ${JSON.stringify({ chunk: chunk.choices[0].delta.content })}\n\n`);
}
}
});
res.write('data: {"done": true}\n\n');
res.end();
  } catch (error) {
    // Serialize the error safely so quotes in the message can't break the JSON payload
    const message = error instanceof Error ? error.message : String(error);
    res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
    res.end();
  }
});
// Start the server
app.listen(3000);
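On the client, the endpoint above can be consumed with the standard EventSource API. This is a minimal browser-side sketch; the 'output' element id is an assumption.
// Browser-side consumption of the /stream endpoint above via server-sent events.
const source = new EventSource(`/stream?q=${encodeURIComponent("Explain streaming")}`);
const output = document.getElementById('output'); // assumed target element
source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.error) {
    console.error("Stream error:", data.error);
    source.close();
  } else if (data.done) {
    source.close();
  } else if (output) {
    output.textContent = (output.textContent ?? '') + data.chunk;
  }
};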
WebSocket Streaming
import WebSocket from 'ws';
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
ws.on('message', async (message) => {
const { query } = JSON.parse(message.toString());
try {
await llm.chatStream({
messages: [{ role: "user", content: query }],
model: "gpt-4",
onChunk: (chunk) => {
if (chunk.choices?.[0]?.delta?.content) {
ws.send(JSON.stringify({
type: 'chunk',
content: chunk.choices[0].delta.content
}));
}
}
});
ws.send(JSON.stringify({ type: 'complete' }));
    } catch (error) {
      // Errors caught in TypeScript are typed as unknown, so narrow before reading .message
      const errorMessage = error instanceof Error ? error.message : String(error);
      ws.send(JSON.stringify({ type: 'error', message: errorMessage }));
    }
});
});
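A matching Node.js client, using the same ws package and the message shapes emitted by the server above ('chunk', 'complete', 'error'):
// Node.js client for the WebSocket server above.
import WebSocket from 'ws';
const socket = new WebSocket('ws://localhost:8080');
socket.on('open', () => {
  socket.send(JSON.stringify({ query: "Explain quantum computing" }));
});
socket.on('message', (raw) => {
  const message = JSON.parse(raw.toString());
  if (message.type === 'chunk') {
    process.stdout.write(message.content);
  } else if (message.type === 'complete' || message.type === 'error') {
    if (message.type === 'error') console.error("\nStream error:", message.message);
    socket.close();
  }
});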
Best Practices
- Buffer Management: Handle chunks efficiently to avoid memory issues
- Error Recovery: Implement retry logic for failed streams
- Rate Limiting: Respect API rate limits even with streaming
- User Feedback: Provide visual indicators for streaming state
- Graceful Degradation: Have a fallback path for non-streaming responses (see the sketch below)
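As a sketch of the last point, assuming the provider also exposes a non-streaming chat method with a shape similar to chatStream (the llm.chat call and its result shape here are assumptions, not a documented API):
// Sketch: fall back to a non-streaming request when streaming fails.
// llm.chat and its options/result shape are assumptions based on the chatStream examples above.
async function respond(llm: LLM, prompt: string): Promise<string> {
  try {
    const streamed = await llm.chatStream({
      messages: [{ role: "user", content: prompt }],
      model: "gpt-4",
      onChunk: (chunk) => {
        if (chunk.choices?.[0]?.delta?.content) {
          process.stdout.write(chunk.choices[0].delta.content);
        }
      }
    });
    return streamed.content;
  } catch {
    // Fallback: request the complete response in one shot.
    const full = await llm.chat({
      messages: [{ role: "user", content: prompt }],
      model: "gpt-4"
    });
    return full.content;
  }
}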