Streaming

The Streaming utility provides real-time response streaming from LLM providers, enabling responsive user interfaces and live data processing.

Overview

Streaming delivers LLM responses in incremental chunks as they are generated, rather than waiting for the complete response, which improves perceived performance and enables interactive experiences.

Basic Usage

LLM Streaming

import { LLM } from "agent-forge";

const llm = await LLM.create("openai", { apiKey: process.env.OPENAI_API_KEY });

const response = await llm.chatStream({
  messages: [{ role: "user", content: "Write a story" }],
  model: "gpt-4",
  onChunk: (chunk) => {
    if (chunk.choices?.[0]?.delta?.content) {
      process.stdout.write(chunk.choices[0].delta.content);
    }
  }
});

Agent Streaming

const agent = new Agent(config);

// Listen for streaming events before starting the run,
// so no chunks are missed while the agent executes
const eventEmitter = agent.getLLMProvider()?.getEventEmitter();
if (eventEmitter) {
  eventEmitter.on('llm-stream-chunk', (data) => {
    process.stdout.write(data.chunk);
  });
}

const result = await agent.run("Tell me about AI", {
  stream: true
});

Stream Event Types

LLM Stream Chunk

interface LLMStreamChunk {
  model: string;
  agentName: string;
  chunk: string;
  timestamp: number;
}

LLM Stream Complete

interface LLMStreamComplete {
  content: string;
  isComplete: boolean;
  agentName: string;
  duration: number;
}
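
A minimal sketch of wiring both event types to the agent's LLM provider emitter. The 'llm-stream-chunk' name is used above; the completion event name ('llm-stream-complete') is an assumption here and may differ in your version of the framework.

const emitter = agent.getLLMProvider()?.getEventEmitter();

emitter?.on('llm-stream-chunk', (data: LLMStreamChunk) => {
  process.stdout.write(data.chunk);
});

// Assumed completion event name; check the framework's event list
emitter?.on('llm-stream-complete', (data: LLMStreamComplete) => {
  console.log(`\n[${data.agentName}] finished in ${data.duration}ms`);
});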

Advanced Streaming

Custom Stream Handler

class StreamHandler {
  private buffer: string = '';
  private onUpdate: (content: string) => void;

  constructor(onUpdate: (content: string) => void) {
    this.onUpdate = onUpdate;
  }

  handleChunk(chunk: string): void {
    this.buffer += chunk;
    this.onUpdate(this.buffer);
  }

  complete(): string {
    const finalContent = this.buffer;
    this.buffer = '';
    return finalContent;
  }
}

const handler = new StreamHandler((content) => {
  console.log("Current content:", content);
});

const response = await llm.chatStream({
  messages: [{ role: "user", content: "Explain quantum computing" }],
  model: "gpt-4",
  onChunk: (chunk) => {
    if (chunk.choices?.[0]?.delta?.content) {
      handler.handleChunk(chunk.choices[0].delta.content);
    }
  }
});

Streaming with UI Updates

// Example for web applications
class UIStreamingHandler {
private element: HTMLElement;
private content: string = '';

constructor(elementId: string) {
this.element = document.getElementById(elementId)!;
}

onChunk(chunk: string): void {
this.content += chunk;
this.element.textContent = this.content;

// Auto-scroll to bottom
this.element.scrollTop = this.element.scrollHeight;
}

onComplete(): void {
this.element.classList.add('streaming-complete');
}
}

const uiHandler = new UIStreamingHandler('response-container');

await llm.chatStream({
messages: [{ role: "user", content: "Write a poem" }],
model: "gpt-4",
onChunk: (chunk) => {
if (chunk.choices?.[0]?.delta?.content) {
uiHandler.onChunk(chunk.choices[0].delta.content);
}
}
});

uiHandler.onComplete();

Error Handling

Stream Error Recovery

async function streamWithRetry(
  llm: LLM,
  messages: any[],
  maxRetries: number = 3
): Promise<string> {
  let attempt = 0;
  let lastError: Error | undefined;

  while (attempt < maxRetries) {
    try {
      const response = await llm.chatStream({
        messages,
        model: "gpt-4",
        onChunk: (chunk) => {
          if (chunk.choices?.[0]?.delta?.content) {
            process.stdout.write(chunk.choices[0].delta.content);
          }
        }
      });

      return response.content;
    } catch (error) {
      lastError = error as Error;
      attempt++;

      if (attempt < maxRetries) {
        console.log(`Stream failed, retrying... (${attempt}/${maxRetries})`);
        await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
      }
    }
  }

  throw lastError;
}
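
A usage sketch for the helper, reusing the llm instance created in Basic Usage; the prompt is illustrative.

const story = await streamWithRetry(llm, [
  { role: "user", content: "Write a short story about a robot" }
]);

console.log(`\nReceived ${story.length} characters`);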

Performance Optimization

Chunked Processing

class ChunkedProcessor {
  private chunks: string[] = [];
  private processInterval: NodeJS.Timeout | null = null;

  constructor(private onProcess: (chunks: string[]) => void) {
    this.startProcessing();
  }

  addChunk(chunk: string): void {
    this.chunks.push(chunk);
  }

  private startProcessing(): void {
    this.processInterval = setInterval(() => {
      if (this.chunks.length > 0) {
        const toProcess = this.chunks.splice(0, 10); // Process 10 chunks at a time
        this.onProcess(toProcess);
      }
    }, 100); // Process every 100ms
  }

  stop(): void {
    if (this.processInterval) {
      clearInterval(this.processInterval);
      this.processInterval = null;
    }

    // Process remaining chunks
    if (this.chunks.length > 0) {
      this.onProcess(this.chunks);
      this.chunks = [];
    }
  }
}
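
The class above is not wired to a stream; here is a minimal sketch of feeding it from chatStream and flushing once the call resolves, reusing the llm instance from Basic Usage.

const processor = new ChunkedProcessor((chunks) => {
  // Render or persist a batch of chunks at once
  process.stdout.write(chunks.join(''));
});

await llm.chatStream({
  messages: [{ role: "user", content: "Summarize the history of computing" }],
  model: "gpt-4",
  onChunk: (chunk) => {
    if (chunk.choices?.[0]?.delta?.content) {
      processor.addChunk(chunk.choices[0].delta.content);
    }
  }
});

processor.stop(); // Flush any remaining chunks and clear the interval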

Integration Examples

Express.js Streaming API

import express from 'express';

const app = express();

app.get('/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const query = req.query.q as string;

  try {
    await llm.chatStream({
      messages: [{ role: "user", content: query }],
      model: "gpt-4",
      onChunk: (chunk) => {
        if (chunk.choices?.[0]?.delta?.content) {
          res.write(`data: ${JSON.stringify({ chunk: chunk.choices[0].delta.content })}\n\n`);
        }
      }
    });

    res.write('data: {"done": true}\n\n');
    res.end();
  } catch (error) {
    res.write(`data: ${JSON.stringify({ error: (error as Error).message })}\n\n`);
    res.end();
  }
});
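
On the client side, the endpoint above can be consumed with the browser's EventSource API. This is a sketch: the query string and the 'response-container' element id (borrowed from the UI example above) are illustrative.

const source = new EventSource('/stream?q=' + encodeURIComponent('Explain streaming'));
const output = document.getElementById('response-container')!;

source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.chunk) {
    output.textContent += data.chunk;
  }
  if (data.done || data.error) {
    source.close();
  }
};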

WebSocket Streaming

import WebSocket from 'ws';

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  ws.on('message', async (message) => {
    const { query } = JSON.parse(message.toString());

    try {
      await llm.chatStream({
        messages: [{ role: "user", content: query }],
        model: "gpt-4",
        onChunk: (chunk) => {
          if (chunk.choices?.[0]?.delta?.content) {
            ws.send(JSON.stringify({
              type: 'chunk',
              content: chunk.choices[0].delta.content
            }));
          }
        }
      });

      ws.send(JSON.stringify({ type: 'complete' }));
    } catch (error) {
      ws.send(JSON.stringify({ type: 'error', message: (error as Error).message }));
    }
  });
});
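
A matching Node.js client sketch for the server above, using the same 'ws' package; the message shape and port follow the example, and the query is illustrative.

import WebSocket from 'ws';

const socket = new WebSocket('ws://localhost:8080');

socket.on('open', () => {
  socket.send(JSON.stringify({ query: 'Tell me about AI' }));
});

socket.on('message', (raw) => {
  const message = JSON.parse(raw.toString());
  if (message.type === 'chunk') {
    process.stdout.write(message.content);
  } else if (message.type === 'complete') {
    socket.close();
  } else if (message.type === 'error') {
    console.error(message.message);
    socket.close();
  }
});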

Best Practices

  • Buffer Management: Handle chunks efficiently to avoid memory issues
  • Error Recovery: Implement retry logic for failed streams
  • Rate Limiting: Respect API rate limits even when streaming
  • User Feedback: Provide visual indicators for streaming state
  • Graceful Degradation: Provide a fallback for non-streaming responses (see the sketch below)
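
For graceful degradation, one option is to fall back to a blocking request when streaming fails. The non-streaming llm.chat() call below is a hypothetical counterpart to chatStream; substitute your provider's actual non-streaming method.

async function respond(prompt: string): Promise<string> {
  const messages = [{ role: "user", content: prompt }];
  let streamed = '';

  try {
    await llm.chatStream({
      messages,
      model: "gpt-4",
      onChunk: (chunk) => {
        if (chunk.choices?.[0]?.delta?.content) {
          streamed += chunk.choices[0].delta.content;
        }
      }
    });
    return streamed;
  } catch {
    // Fallback: hypothetical non-streaming call; adjust to your provider's API
    const response = await llm.chat({ messages, model: "gpt-4" });
    return response.content;
  }
}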

See Also