Streaming API
The streaming API provides real-time access to model responses as they are generated. This enables building responsive UIs and handling partial results before the full response is complete.
Streaming vs Complete
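There are two ways to interact with models. stream() yields events while the response is generated. complete() waits for the whole response and resolves to the final assistant message. A stream can also be awaited for its final message after iteration: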
import { getModel, stream, type Context } from '@mariozechner/pi-ai';
const model = getModel('openai', 'gpt-4o-mini');
const context: Context = {
  messages: [{ role: 'user', content: 'Write a short poem' }]
};
// Iterate over events as they arrive
const s = stream(model, context);
for await (const event of s) {
  if (event.type === 'text_delta') {
    process.stdout.write(event.delta);
  }
}
// The final assistant message is available once the stream ends
const message = await s.result();
console.log(`Total tokens: ${message.usage.totalTokens}`);
Event Types
All events emitted during assistant message generation:
| Event type | Description | Key properties |
|---|---|---|
| start | Stream begins | partial: initial assistant message |
| text_start | Text block starts | contentIndex: position in content array |
| text_delta | Text chunk received | delta: new text, contentIndex |
| text_end | Text block complete | content: full text, contentIndex |
| thinking_start | Thinking block starts | contentIndex: position in content array |
| thinking_delta | Thinking chunk received | delta: new text, contentIndex |
| thinking_end | Thinking block complete | content: full thinking, contentIndex |
| toolcall_start | Tool call begins | contentIndex: position in content array |
| toolcall_delta | Tool arguments streaming | delta: JSON chunk, partial.content[contentIndex].arguments |
| toolcall_end | Tool call complete | toolCall: complete tool call with id, name, arguments |
| done | Stream complete | reason: stop reason, message: final assistant message |
| error | Error occurred | reason: "error" or "aborted", error: AssistantMessage with partial content |
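The contentIndex field in the table is what ties each delta to its content block. The sketch below rebuilds the text of every text block from its deltas, keyed by contentIndex. StreamEvent is a simplified local type that mirrors a few rows of the table; it is an assumption for illustration, not the library's exported event type. In real code you would feed it events from a for await loop instead of a plain array.

```typescript
// Simplified local event types mirroring part of the table (assumption, not the library's types).
type StreamEvent =
  | { type: 'text_start'; contentIndex: number }
  | { type: 'text_delta'; contentIndex: number; delta: string }
  | { type: 'text_end'; contentIndex: number; content: string }
  | { type: 'thinking_delta'; contentIndex: number; delta: string }
  | { type: 'done'; reason: string };

// Rebuild each text block from its deltas, keyed by contentIndex.
function collectText(events: readonly StreamEvent[]): Map<number, string> {
  const blocks = new Map<number, string>();
  for (const event of events) {
    switch (event.type) {
      case 'text_start':
        blocks.set(event.contentIndex, '');
        break;
      case 'text_delta':
        blocks.set(event.contentIndex, (blocks.get(event.contentIndex) ?? '') + event.delta);
        break;
      case 'text_end':
        // The end event carries the full text, so it replaces the accumulated deltas.
        blocks.set(event.contentIndex, event.content);
        break;
      default:
        // Thinking deltas and lifecycle events are ignored in this sketch.
        break;
    }
  }
  return blocks;
}
```

Keying by contentIndex rather than appending to a single string keeps text from different blocks (for example, text written before and after a tool call) separate.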
Basic Streaming
import { getModel, stream, type Context } from '@mariozechner/pi-ai';
const model = getModel('openai', 'gpt-4o-mini');
const context: Context = {
  messages: [{ role: 'user', content: 'Write a short poem' }]
};
const s = stream(model, context);
for await (const event of s) {
  switch (event.type) {
    case 'start':
      console.log(`Starting with ${event.partial.model}`);
      break;
    case 'text_start':
      console.log('\n[Text started]');
      break;
    case 'text_delta':
      process.stdout.write(event.delta);
      break;
    case 'text_end':
      console.log('\n[Text ended]');
      break;
    case 'done':
      console.log(`\nFinished: ${event.reason}`);
      break;
    case 'error':
      console.error(`Error: ${event.error.errorMessage}`);
      break;
  }
}
// Final message with usage and cost
const message = await s.result();
console.log(`Tokens: ${message.usage.totalTokens}`);
console.log(`Cost: $${message.usage.cost.total.toFixed(4)}`);
Streaming Tool Calls
When the context includes tools, tool calls are streamed through the toolcall_start, toolcall_delta, and toolcall_end events:
import { getModel, stream, Type, type Context, type Tool } from '@mariozechner/pi-ai';
const model = getModel('openai', 'gpt-4o-mini');
const tools: Tool[] = [{
  name: 'get_weather',
  description: 'Get current weather',
  parameters: Type.Object({
    location: Type.String({ description: 'City name' })
  })
}];
const context: Context = {
  messages: [{ role: 'user', content: 'What is the weather in Paris?' }],
  tools
};
const s = stream(model, context);
for await (const event of s) {
  switch (event.type) {
    case 'text_delta':
      process.stdout.write(event.delta);
      break;
    case 'toolcall_start':
      console.log(`\n[Tool call started: index ${event.contentIndex}]`);
      break;
    case 'toolcall_delta': {
      // Partial tool arguments are being streamed
      const partialCall = event.partial.content[event.contentIndex];
      if (partialCall.type === 'toolCall') {
        console.log(`[Streaming args for ${partialCall.name}]`);
        // BE DEFENSIVE: arguments may be incomplete
        if (partialCall.arguments.location) {
          console.log(`  Location so far: ${partialCall.arguments.location}`);
        }
      }
      break;
    }
    case 'toolcall_end':
      console.log(`\nTool called: ${event.toolCall.name}`);
      console.log(`Arguments: ${JSON.stringify(event.toolCall.arguments)}`);
      break;
    case 'done':
      console.log(`\nFinished: ${event.reason}`);
      break;
  }
}
const message = await s.result();
// Add the assistant message (including its tool calls) to the context once
context.messages.push(message);
// Execute each tool call and append its result
for (const block of message.content) {
  if (block.type === 'toolCall') {
    // executeWeatherApi is a placeholder for your own tool implementation
    const result = await executeWeatherApi(block.arguments);
    context.messages.push({
      role: 'toolResult',
      toolCallId: block.id,
      toolName: block.name,
      content: [{ type: 'text', text: JSON.stringify(result) }],
      isError: false,
      timestamp: Date.now()
    });
  }
}
// Call stream() again with the updated context to get the model's answer
During streaming, tool arguments are progressively parsed as they arrive:
for await (const event of s) {
  if (event.type === 'toolcall_delta') {
    const toolCall = event.partial.content[event.contentIndex];
    if (toolCall.type === 'toolCall' && toolCall.arguments) {
      // BE DEFENSIVE: arguments may be incomplete
      // Fields may be missing or truncated
      if (toolCall.name === 'write_file' && toolCall.arguments.path) {
        console.log(`Writing to: ${toolCall.arguments.path}`);
        // Content might be partial or missing
        if (toolCall.arguments.content) {
          console.log(`Characters so far: ${toolCall.arguments.content.length}`);
        }
      }
    }
  }
  if (event.type === 'toolcall_end') {
    // Here toolCall.arguments is complete (but not yet validated)
    const toolCall = event.toolCall;
    console.log(`Tool completed: ${toolCall.name}`, toolCall.arguments);
  }
}
Important notes:
During toolcall_delta, arguments contains a best-effort parse of the partial JSON
Fields may be missing, incomplete, or truncated mid-word
Arrays and nested objects may be partially populated
At minimum, arguments is an empty object {}, never undefined
The Google provider does not stream function call arguments; it emits a single toolcall_delta containing the full arguments
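To make "best-effort parse of partial JSON" concrete, here is a minimal sketch of one simple technique: close any open string, arrays, and objects at the end of the prefix, and if the result still does not parse, drop characters from the end until it does. parsePartialArguments and closePrefix are hypothetical names, and this is not pi-ai's own parser, which may work differently. The fallback to {} mirrors the guarantee in the notes above.

```typescript
// Append the closers needed for any string, object, or array left open in a JSON prefix.
function closePrefix(prefix: string): string {
  const closers: string[] = [];
  let inString = false;
  let escaped = false;
  for (let i = 0; i < prefix.length; i++) {
    const ch = prefix[i];
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === '\\') escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') inString = true;
    else if (ch === '{') closers.push('}');
    else if (ch === '[') closers.push(']');
    else if (ch === '}' || ch === ']') closers.pop();
  }
  return prefix + (inString ? '"' : '') + closers.reverse().join('');
}

// Hypothetical best-effort parser: returns whatever object can be recovered, or {}.
function parsePartialArguments(json: string): Record<string, unknown> {
  // Shrink the prefix until the closed-off version parses (quadratic, fine for short arguments).
  for (let end = json.length; end > 0; end--) {
    try {
      const value: unknown = JSON.parse(closePrefix(json.slice(0, end)));
      if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
        return value as Record<string, unknown>;
      }
      return {};
    } catch {
      // Still not valid JSON: drop one more character and retry.
    }
  }
  return {};
}
```

This shows why fields can be truncated (an open string is closed early) or missing (a half-written key or literal such as tr is dropped entirely).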
Streaming Thinking/Reasoning
Models with reasoning capabilities emit thinking events:
import { getModel, streamSimple, type Context } from '@mariozechner/pi-ai';
const model = getModel('anthropic', 'claude-sonnet-4-20250514');
const context: Context = {
  messages: [{ role: 'user', content: 'Solve: 2x + 5 = 13' }]
};
const s = streamSimple(model, context, {
  reasoning: 'medium' // 'minimal' | 'low' | 'medium' | 'high' | 'xhigh'
});
for await (const event of s) {
  switch (event.type) {
    case 'thinking_start':
      console.log('[Model is thinking...]');
      break;
    case 'thinking_delta':
      // Stream thinking content in real time
      process.stdout.write(event.delta);
      break;
    case 'thinking_end':
      console.log('\n[Thinking complete]');
      break;
    case 'text_delta':
      process.stdout.write(event.delta);
      break;
  }
}
const message = await s.result();
// Access thinking and text blocks
for (const block of message.content) {
  if (block.type === 'thinking') {
    console.log('Thinking:', block.thinking);
  } else if (block.type === 'text') {
    console.log('Response:', block.text);
  }
}
Stop Reasons
The final message's stopReason records why generation ended:
const message = await s.result();
switch (message.stopReason) {
  case 'stop':
    console.log('Normal completion');
    break;
  case 'length':
    console.log('Hit max token limit');
    break;
  case 'toolUse':
    console.log('Model is calling tools');
    break;
  case 'error':
    console.log('Error occurred:', message.errorMessage);
    break;
  case 'aborted':
    console.log('Request was cancelled');
    break;
}
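In an agent loop, the stop reason usually decides what happens next. The sketch below is one possible policy, not behavior built into the library; StopReason is a local type copied from the cases above, and nextStep and NextStep are hypothetical names.

```typescript
// Stop reasons as listed above (local type, not imported from the library).
type StopReason = 'stop' | 'length' | 'toolUse' | 'error' | 'aborted';

type NextStep = 'finish' | 'runTools' | 'continue' | 'fail';

// Hypothetical policy for what an agent loop should do after a turn ends.
function nextStep(reason: StopReason): NextStep {
  switch (reason) {
    case 'toolUse':
      return 'runTools'; // execute tool calls, append results, call the model again
    case 'length':
      return 'continue'; // output hit the token limit; ask the model to continue
    case 'error':
    case 'aborted':
      return 'fail'; // surface the error or cancellation to the caller
    case 'stop':
      return 'finish';
  }
}
```

Because the switch covers every member of the union, TypeScript reports an error here if a new stop reason is added to the type without being handled.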
Error Handling
Failures are reported through an error event carrying the partial AssistantMessage. The same message, with stopReason set to "error" or "aborted", is returned by result():
const s = stream(model, context);
for await (const event of s) {
  if (event.type === 'error') {
    // event.reason is either "error" or "aborted"
    console.error(`Error (${event.reason}):`, event.error.errorMessage);
    // Partial content received before the error
    console.log('Partial content:', event.error.content);
    // Partial usage and cost
    console.log('Tokens used:', event.error.usage);
  }
}
const message = await s.result();
if (message.stopReason === 'error' || message.stopReason === 'aborted') {
  console.error('Request failed:', message.errorMessage);
}
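Since failures come back as a message with stopReason "error", a retry wrapper can inspect the result instead of catching exceptions. This is a minimal sketch under that assumption: withRetries and ResultLike are hypothetical, and ResultLike only mirrors the stopReason and errorMessage fields used above. Aborted requests are returned immediately because the caller cancelled them on purpose. Some errors (for example, an invalid request) will fail again on every attempt, so keep maxAttempts small.

```typescript
// Minimal message shape for this sketch (mirrors stopReason/errorMessage above).
interface ResultLike {
  stopReason: 'stop' | 'length' | 'toolUse' | 'error' | 'aborted';
  errorMessage?: string;
}

// Hypothetical helper: rerun the request while it ends with stopReason "error",
// doubling the wait after each failed attempt (exponential backoff).
async function withRetries<T extends ResultLike>(
  run: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500,
): Promise<T> {
  let result = await run();
  for (let attempt = 1; attempt < maxAttempts && result.stopReason === 'error'; attempt++) {
    const delayMs = baseDelayMs * 2 ** (attempt - 1);
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    result = await run();
  }
  return result;
}
```

Usage, assuming complete() as used elsewhere on this page: const message = await withRetries(() => complete(model, context));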
Aborting Requests
Cancel in-progress requests with an abort signal:
import { getModel, stream } from '@mariozechner/pi-ai';
const model = getModel('openai', 'gpt-4o-mini');
const controller = new AbortController();
// Abort after 2 seconds
const timer = setTimeout(() => controller.abort(), 2000);
const s = stream(model, {
  messages: [{ role: 'user', content: 'Write a long story' }]
}, {
  signal: controller.signal
});
for await (const event of s) {
  if (event.type === 'text_delta') {
    process.stdout.write(event.delta);
  } else if (event.type === 'error') {
    console.log(`${event.reason === 'aborted' ? 'Aborted' : 'Error'}:`, event.error.errorMessage);
  }
}
// Cancel the timer in case the response finished before the timeout
clearTimeout(timer);
const response = await s.result();
if (response.stopReason === 'aborted') {
  console.log('Request was aborted');
  console.log('Partial content:', response.content);
  console.log('Tokens used:', response.usage);
}
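A common variant combines a timeout with user-initiated cancellation, so one signal aborts on whichever happens first. This sketch uses only the standard AbortController API; timeoutSignal is a hypothetical helper, not part of pi-ai. dispose() clears the timer and detaches the listener so neither keeps the process alive after the request ends.

```typescript
// Hypothetical helper: a signal that aborts after `ms` or when `parent` aborts.
function timeoutSignal(
  ms: number,
  parent?: AbortSignal,
): { signal: AbortSignal; dispose: () => void } {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  const onParentAbort = () => controller.abort();
  if (parent?.aborted) {
    controller.abort();
  } else {
    parent?.addEventListener('abort', onParentAbort, { once: true });
  }
  return {
    signal: controller.signal,
    dispose: () => {
      clearTimeout(timer);
      parent?.removeEventListener('abort', onParentAbort);
    },
  };
}
```

Pass the result as the signal option to stream(), for example const { signal, dispose } = timeoutSignal(2000, userController.signal), and call dispose() in a finally block once the stream has finished.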
Continuing After Abort
Aborted messages can be added to context and continued:
import { getModel, complete, type Context } from '@mariozechner/pi-ai';
const model = getModel('openai', 'gpt-4o-mini');
const context: Context = {
  messages: [
    { role: 'user', content: 'Explain quantum computing' }
  ]
};
// First request gets aborted
const controller = new AbortController();
setTimeout(() => controller.abort(), 2000);
const partial = await complete(model, context, { signal: controller.signal });
// Add the partial response to the context
context.messages.push(partial);
context.messages.push({ role: 'user', content: 'Please continue' });
// Continue the conversation
const continuation = await complete(model, context);
Stream Options
Common options for streaming requests:
import { getModel, stream, type Context } from '@mariozechner/pi-ai';
const model = getModel('openai', 'gpt-4o-mini');
const context: Context = { messages: [{ role: 'user', content: 'Hello' }] };
const controller = new AbortController();
const s = stream(model, context, {
  temperature: 0.7,
  maxTokens: 2000,
  apiKey: 'sk-...',            // Override environment variable
  signal: controller.signal,   // Abort signal
  sessionId: 'session-123',    // Session-based caching (where supported)
  cacheRetention: 'long',      // 'none' | 'short' | 'long'
  transport: 'websocket',      // 'sse' | 'websocket' | 'auto' (OpenAI Codex)
  headers: {
    'X-Custom-Header': 'value'
  },
  metadata: {
    user_id: 'user-123'        // Provider-specific metadata
  },
  onPayload: (payload) => {
    // Inspect request payload before sending
    console.log('Payload:', JSON.stringify(payload, null, 2));
  }
});
Simplified Streaming API
For reasoning-capable models, use streamSimple with unified options:
import { getModel, streamSimple, type Context } from '@mariozechner/pi-ai';
// Works across all reasoning-capable providers
const model = getModel('anthropic', 'claude-sonnet-4-20250514');
// or getModel('openai', 'gpt-5-mini')
// or getModel('google', 'gemini-2.5-flash')
// or getModel('xai', 'grok-code-fast-1')
const context: Context = {
  messages: [{ role: 'user', content: 'Solve: 2x + 5 = 13' }]
};
const s = streamSimple(model, context, {
  reasoning: 'medium', // Unified reasoning level
  temperature: 0.7,
  maxTokens: 2000
});
for await (const event of s) {
  // Same event types as stream()
}
Reasoning levels:
minimal - Very quick, basic reasoning
low - Quick reasoning
medium - Balanced reasoning (default)
high - Deep reasoning
xhigh - Maximum reasoning (OpenAI only, maps to high on other providers)
See Thinking/Reasoning for details.
Usage and Cost Tracking
const s = stream(model, context);
for await (const event of s) {
  if (event.type === 'text_delta') {
    process.stdout.write(event.delta);
  }
}
const message = await s.result();
// Token usage
console.log('Input tokens:', message.usage.input);
console.log('Output tokens:', message.usage.output);
console.log('Cache read:', message.usage.cacheRead);
console.log('Cache write:', message.usage.cacheWrite);
console.log('Total tokens:', message.usage.totalTokens);
// Cost breakdown
console.log('Input cost:', message.usage.cost.input);
console.log('Output cost:', message.usage.cost.output);
console.log('Cache read cost:', message.usage.cost.cacheRead);
console.log('Cache write cost:', message.usage.cost.cacheWrite);
console.log('Total cost:', message.usage.cost.total);
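Each assistant message reports usage for its own request only, so a multi-turn conversation or agent loop needs to add the numbers up. The sketch below sums the fields shown above. Usage and Cost are local interfaces mirroring those fields (an assumption, not the library's exported types), and sumUsage is a hypothetical helper.

```typescript
// Local shapes mirroring the usage fields above (assumption, not imported types).
interface Cost {
  input: number;
  output: number;
  cacheRead: number;
  cacheWrite: number;
  total: number;
}

interface Usage {
  input: number;
  output: number;
  cacheRead: number;
  cacheWrite: number;
  totalTokens: number;
  cost: Cost;
}

// Hypothetical helper: sum token counts and costs across several messages.
function sumUsage(usages: readonly Usage[]): Usage {
  const sum: Usage = {
    input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0,
    cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
  };
  for (const u of usages) {
    sum.input += u.input;
    sum.output += u.output;
    sum.cacheRead += u.cacheRead;
    sum.cacheWrite += u.cacheWrite;
    sum.totalTokens += u.totalTokens;
    sum.cost.input += u.cost.input;
    sum.cost.output += u.cost.output;
    sum.cost.cacheRead += u.cost.cacheRead;
    sum.cost.cacheWrite += u.cost.cacheWrite;
    sum.cost.total += u.cost.total;
  }
  return sum;
}
```

For example, given an array of assistant messages from a session, sumUsage(messages.map((m) => m.usage)) gives the session totals.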
Next Steps
Tools: Learn about tool calling and validation
Thinking: Enable reasoning capabilities