# Streaming

Real-time streaming events and progressive output.
Streaming lets you process agent output in real time as it's generated, rather than waiting for the complete response. This is essential for responsive UIs, long-running agent tasks, and real-time monitoring.
## Basic Streaming

```ts
import { Runner } from 'assistme-agent-sdk'

const stream = Runner.stream(agent, {
  messages: [{ role: 'user', content: 'Explain quantum computing' }],
})

for await (const event of stream) {
  if (event.type === 'text_delta') {
    process.stdout.write(event.delta)
  }
}
```

## Event Types
The stream emits a rich set of typed events. See the full `StreamEvent` type definition in the API Reference.

Key event types include: `run_start`, `text_delta`, `text_done`, `tool_call_start`, `tool_call_done`, `tool_result`, `tool_approval_required`, `guardrail_triggered`, `handoff`, `agent_start`, `agent_end`, `memory_recall`, `memory_extract`, `turn_start`, `turn_end`, `run_end`, and `error`.
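Because every event carries a `type` discriminant, a TypeScript `switch` narrows each case automatically. The sketch below illustrates the pattern with a few hypothetical event shapes; the field names are assumptions for illustration, and the authoritative definitions live in the SDK's `StreamEvent` type.

```ts
// Hypothetical, abridged event shapes -- only the `type` values come from
// the docs; the fields on each variant are illustrative assumptions.
type StreamEvent =
  | { type: 'run_start'; runId: string; agent: string }
  | { type: 'text_delta'; delta: string }
  | { type: 'tool_call_start'; name: string }
  | { type: 'run_end'; status: string }
  | { type: 'error'; error: string }

// Switching on `type` narrows `event`, so each branch can safely
// access that variant's fields with full type checking.
function describeEvent(event: StreamEvent): string {
  switch (event.type) {
    case 'run_start':
      return `run ${event.runId} (${event.agent})`
    case 'text_delta':
      return event.delta
    case 'tool_call_start':
      return `calling ${event.name}`
    case 'run_end':
      return `done: ${event.status}`
    case 'error':
      return `error: ${event.error}`
  }
}
```

Because the switch is exhaustive over the union, the compiler flags any event type you forget to handle.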
## Full Event Handling

```ts
const stream = Runner.stream(agent, { messages })

for await (const event of stream) {
  switch (event.type) {
    case 'run_start':
      console.log(`Run ${event.runId} started with agent "${event.agent}"`)
      break
    case 'text_delta':
      process.stdout.write(event.delta)
      break
    case 'tool_call_start':
      console.log(`\n🔧 Calling ${event.name}...`)
      break
    case 'tool_result':
      console.log(`✅ ${event.name} completed`)
      break
    case 'tool_approval_required':
      console.log(`⏸️ Approval needed: ${event.name}(${JSON.stringify(event.args)})`)
      // Handle approval (see Human-in-the-Loop docs)
      break
    case 'guardrail_triggered':
      console.log(`🛡️ Guardrail "${event.guardrail}" ${event.allow ? 'modified' : 'blocked'} ${event.phase}`)
      break
    case 'handoff':
      console.log(`🔄 Handoff from "${event.from}" to "${event.to}"`)
      break
    case 'run_end':
      console.log(`\n\nRun completed: ${event.status}`)
      console.log(`Tokens: ${event.usage.totalTokens}`)
      break
    case 'error':
      console.error('Error:', event.error)
      break
  }
}
```

## Getting the Final Result
```ts
const stream = Runner.stream(agent, { messages })

// Process events...
for await (const event of stream) {
  // ...
}

// Get the final result after the stream completes
const result = await stream.finalResult()
console.log(result.output)
console.log(result.status)
console.log(result.usage)
```

## Server-Sent Events (SSE)
For web applications, convert the stream to SSE format:
```ts
// Next.js API route / Express handler
export async function POST(request: Request) {
  const { messages } = await request.json()
  const stream = Runner.stream(agent, { messages })

  return new Response(stream.toSSE(), {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  })
}
```

## Client-Side Consumption
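On the wire, SSE events arrive as `data: {...}` frames separated by blank lines, and a single network chunk can end mid-frame. A small, SDK-independent parser can encapsulate the buffering; this is a sketch of the standard SSE framing rules, and the class name is ours, not part of the SDK:

```ts
// Incremental SSE parser: feed it decoded text chunks, get back complete
// events. Partial frames are buffered until the blank-line terminator arrives.
class SSEParser {
  private buffer = ''

  feed(chunk: string): unknown[] {
    this.buffer += chunk
    const events: unknown[] = []
    let sep: number
    // Frames are separated by a blank line ("\n\n").
    while ((sep = this.buffer.indexOf('\n\n')) !== -1) {
      const frame = this.buffer.slice(0, sep)
      this.buffer = this.buffer.slice(sep + 2)
      // Payload lines within a frame start with "data: ".
      const data = frame
        .split('\n')
        .filter((line) => line.startsWith('data: '))
        .map((line) => line.slice('data: '.length))
        .join('\n')
      if (data) events.push(JSON.parse(data))
    }
    return events
  }
}
```

Feeding it each decoded chunk from a `ReadableStream` reader loop yields fully parsed events even when a frame is split across reads.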
```ts
const response = await fetch('/api/chat', {
  method: 'POST',
  body: JSON.stringify({ messages }),
})

const reader = response.body!.getReader()
const decoder = new TextDecoder()

let buffer = ''
while (true) {
  const { done, value } = await reader.read()
  if (done) break

  // { stream: true } keeps multi-byte characters split across chunks intact
  buffer += decoder.decode(value, { stream: true })

  // A chunk may end mid-frame, so only parse complete "data: ...\n\n" frames
  const frames = buffer.split('\n\n')
  buffer = frames.pop() ?? ''
  for (const frame of frames.filter(Boolean)) {
    const event = JSON.parse(frame.replace('data: ', ''))
    // Handle event...
  }
}
```

## React Integration
```tsx
import { useAgentStream } from 'assistme-agent-sdk/react'

function Chat() {
  const { messages, send, isStreaming, events } = useAgentStream({
    agent: 'researcher',
    endpoint: '/api/chat',
  })

  return (
    <div>
      {messages.map((msg, i) => (
        <div key={i} className={msg.role}>
          {msg.content}
        </div>
      ))}
      {events.map((event, i) => {
        if (event.type === 'tool_call_start') {
          return <ToolIndicator key={i} name={event.name} />
        }
        return null
      })}
      <input
        onKeyDown={(e) => {
          if (e.key === 'Enter') {
            send(e.currentTarget.value)
          }
        }}
        disabled={isStreaming}
      />
    </div>
  )
}
```

## Streaming with Multi-Agent
When an agent hands off to another agent, the stream seamlessly continues:
```ts
const stream = Runner.stream(triageAgent, { messages })

for await (const event of stream) {
  if (event.type === 'handoff') {
    // The stream continues with the new agent
    console.log(`Switched to ${event.to}`)
  }
  if (event.type === 'agent_start') {
    console.log(`Agent "${event.agent}" is now responding`)
  }
  if (event.type === 'text_delta') {
    // This works across agent boundaries
    process.stdout.write(event.delta)
  }
}
```

## Backpressure
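Backpressure means a slow consumer throttles the producer instead of letting events pile up in memory. With async iterators this comes for free: a generator's body suspends at each `yield` until the consumer requests the next value. A dependency-free sketch, where the generator stands in for the SDK's stream (names here are illustrative, not SDK APIs):

```ts
// The producer only advances when the consumer asks for the next value,
// so a slow consumer naturally paces the producer.
async function* produce(n: number): AsyncGenerator<number> {
  for (let i = 0; i < n; i++) {
    yield i // execution pauses here until the consumer pulls again
  }
}

async function consumeSlowly(): Promise<number[]> {
  const seen: number[] = []
  for await (const value of produce(3)) {
    // Simulate a slow consumer; the producer waits during this delay.
    await new Promise((resolve) => setTimeout(resolve, 10))
    seen.push(value)
  }
  return seen
}
```
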
For slow consumers, the stream respects backpressure automatically:
```ts
const stream = Runner.stream(agent, { messages })

for await (const event of stream) {
  if (event.type === 'text_delta') {
    // Even if this write is slow, the stream won't overflow
    await slowDatabaseWrite(event.delta)
  }
}
```

## Best Practices
- **Always use streaming for user-facing agents.** Users perceive streaming responses as faster, even when the total time is the same.
- **Handle all event types.** Even if you only care about `text_delta`, handle `error` and `run_end` events for robustness.
- **Use `finalResult()` for post-processing.** Don't try to reconstruct the final result from events; use the provided method.
- **Convert to SSE for web APIs.** Use `toSSE()` for standard HTTP streaming that works with all frontend frameworks.
- **Show tool activity.** Display tool calls and results to users during streaming. This builds trust and makes wait times feel productive.