Documentation Index
Fetch the complete documentation index at: https://mintlify.com/badlogic/pi-mono/llms.txt
Use this file to discover all available pages before exploring further.
The agent runtime supports custom transport layers for connecting to different backends, including HTTP proxies and custom streaming implementations.
Overview
By default, the agent calls LLM APIs directly using @mariozechner/pi-ai. You can override this with a custom streamFn to:
- Route requests through a backend proxy
- Add authentication or rate limiting
- Mock LLM responses for testing
- Implement custom protocols
Proxy Usage
For browser apps that need to proxy through a backend:
import { Agent, streamProxy } from "@mariozechner/pi-agent-core";
const agent = new Agent({
streamFn: (model, context, options) =>
streamProxy(model, context, {
...options,
authToken: "your-auth-token",
proxyUrl: "https://your-server.com",
}),
});
Proxy Protocol
The streamProxy function sends requests to your backend with:
Request:
{
"model": { ... },
"context": { ... },
"options": { ... }
}
Response: Server-sent events (SSE) with AssistantMessageEvent objects
Custom Stream Function
Implement a custom streaming function for full control:
import type { Model, Context, StreamOptions } from "@mariozechner/pi-ai";
import type { AssistantMessageEventStream } from "@mariozechner/pi-ai";
async function customStreamFn(
model: Model<any>,
context: Context,
options?: StreamOptions
): Promise<AssistantMessageEventStream> {
// Your custom implementation
// Must return an async iterable of AssistantMessageEvent
return {
async *[Symbol.asyncIterator]() {
// Yield events: text_delta, tool_call, etc.
yield { type: "text_delta", delta: "Hello" };
yield { type: "done", reason: "stop", message: { ... } };
},
result: async () => finalMessage,
};
}
const agent = new Agent({
streamFn: customStreamFn,
});
Mock Transport for Testing
Create a mock transport that returns predefined responses:
function mockStreamFn(responses: string[]) {
let responseIndex = 0;
return async (model: Model<any>, context: Context) => {
const response = responses[responseIndex++];
return {
async *[Symbol.asyncIterator]() {
// Simulate streaming
for (const char of response) {
yield { type: "text_delta", delta: char };
await new Promise(resolve => setTimeout(resolve, 10));
}
yield {
type: "done",
reason: "stop",
message: {
role: "assistant",
content: [{ type: "text", text: response }],
usage: { input: 0, output: response.length },
},
};
},
result: async () => ({ ... }),
};
};
}
const agent = new Agent({
streamFn: mockStreamFn(["First response", "Second response"]),
});
Dynamic API Keys
Resolve API keys dynamically for OAuth token refresh:
const agent = new Agent({
getApiKey: async (provider) => {
// Refresh token if expired
const token = await refreshOAuthToken(provider);
return token;
},
});
Session IDs
Provide a session ID for provider-side prompt caching:
const agent = new Agent({
sessionId: "user-123-session-456",
});
// Update dynamically
agent.sessionId = "new-session-id";
Session IDs enable prompt caching with OpenAI Codex and other providers that support session-based caching.
Backend Example (Node.js)
Here’s a complete backend proxy example:
import express from "express";
import { stream } from "@mariozechner/pi-ai";
const app = express();
app.post("/stream", express.json(), async (req, res) => {
const { model, context, options } = req.body;
// Validate auth token
if (req.headers.authorization !== `Bearer ${process.env.AUTH_TOKEN}`) {
return res.status(401).json({ error: "Unauthorized" });
}
// Stream response as SSE
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
const s = stream(model, context, {
...options,
apiKey: process.env.ANTHROPIC_API_KEY,
});
for await (const event of s) {
res.write(`data: ${JSON.stringify(event)}\n\n`);
}
res.end();
});
app.listen(3000);
Next Steps
State Management
Managing agent state and queues
API Reference
Transport interface documentation