The @mariozechner/pi-agent-core package supports custom transport implementations for routing agent requests to different backends.
Overview
By default, the agent uses the streamSimple() function from @mariozechner/pi-ai to call LLM APIs directly. For custom architectures (proxies, edge workers, microservices), you can provide a custom streamFn.
StreamFn Type
type StreamFn = (
model: Model<any>,
context: Context,
options?: SimpleStreamOptions
) => AssistantMessageEventStream | Promise<AssistantMessageEventStream>;
Your custom stream function must:
- Accept the same parameters as streamSimple()
- Return an AssistantMessageEventStream (or a Promise of one)
- Emit the standard events: text, thinking, tool_call, usage, stop, error
Example: HTTP Proxy
import { Agent, type StreamFn } from "@mariozechner/pi-agent-core";
import { EventStream, getModel } from "@mariozechner/pi-ai";
import type {
  Model,
  Context,
  SimpleStreamOptions,
  AssistantMessage,
  AssistantMessageEvent,
} from "@mariozechner/pi-ai";

const proxyStreamFn: StreamFn = async (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => {
  const response = await fetch("https://api.example.com/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ model, context, options }),
    signal: options?.signal,
  });
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
  }
  if (!response.body) {
    throw new Error("Response has no body to stream");
  }
  const stream = new EventStream<AssistantMessageEvent, AssistantMessage>();
  // Parse SSE stream
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  // Read in the background so the stream can be returned immediately
  (async () => {
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        // Keep the last (possibly incomplete) line for the next chunk
        buffer = lines.pop()!;
        for (const line of lines) {
          if (line.startsWith("data: ")) {
            const data = JSON.parse(line.slice(6));
            stream.push(data);
          }
        }
      }
      stream.close();
    } catch (error) {
      stream.error(error);
    }
  })();
  return stream;
};

// Use custom transport
const agent = new Agent({
  streamFn: proxyStreamFn,
  initialState: {
    systemPrompt: "You are helpful.",
    model: getModel("anthropic", "claude-4.5-sonnet-20250514"),
    thinkingLevel: "medium",
    tools: [],
    messages: [],
  },
});
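The line-buffering step in the proxy example is easy to get wrong when an event is split across two network chunks. The following is a minimal sketch of that step pulled out into a pure function so it can be tested in isolation. parseSseBuffer and SseParseResult are hypothetical names, not part of @mariozechner/pi-ai, and the sketch assumes each event is a single data: line containing JSON.

```typescript
// Hypothetical helper (not part of pi-ai): splits buffered SSE text into
// parsed `data:` payloads plus the unfinished remainder.
interface SseParseResult<T> {
  events: T[];
  rest: string;
}

function parseSseBuffer<T = unknown>(buffer: string): SseParseResult<T> {
  const lines = buffer.split("\n");
  // The last element is "" if the buffer ended on a newline, else a partial line.
  const rest = lines.pop() ?? "";
  const events: T[] = [];
  for (const rawLine of lines) {
    // Tolerate CRLF line endings.
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (!line.startsWith("data:")) continue; // blank lines, comments, other fields
    const payload = line.slice(5).trimStart();
    if (payload === "") continue;
    events.push(JSON.parse(payload) as T);
  }
  return { events, rest };
}

// Feed two chunks that split the first event in the middle.
let pending = "";
const received: unknown[] = [];
const chunks = [
  'data: {"type":"text","de',
  'lta":"Hi"}\n\ndata: {"type":"stop"}\n\n',
];
for (const chunk of chunks) {
  const { events, rest } = parseSseBuffer(pending + chunk);
  received.push(...events);
  pending = rest;
}
console.log(received.length); // 2
```

Returning the remainder instead of mutating shared state keeps the function free of side effects; the caller decides where the pending text lives.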
Example: WebSocket Transport
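import { EventStream } from "@mariozechner/pi-ai";
import type {
  Model,
  Context,
  SimpleStreamOptions,
  AssistantMessage,
  AssistantMessageEvent,
} from "@mariozechner/pi-ai";
import type { StreamFn } from "@mariozechner/pi-agent-core";

const websocketStreamFn: StreamFn = async (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => {
  const ws = new WebSocket("wss://api.example.com/stream");
  const stream = new EventStream<AssistantMessageEvent, AssistantMessage>();
  let finished = false; // ensures the stream is closed or errored only once

  ws.onopen = () => {
    ws.send(JSON.stringify({ model, context, options }));
  };
  ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    stream.push(data);
    if (data.type === "stop") {
      ws.close(); // onclose below closes the stream
    }
  };
  ws.onerror = () => {
    if (finished) return;
    finished = true;
    // The WebSocket error event carries no details, so wrap it in an Error
    stream.error(new Error("WebSocket error"));
  };
  ws.onclose = () => {
    if (finished) return;
    finished = true;
    stream.close();
  };
  // Handle abort signal
  options?.signal?.addEventListener("abort", () => ws.close(), { once: true });
  return stream;
};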
Example: Mock Transport (Testing)
import assert from "node:assert";
import { Agent, type StreamFn } from "@mariozechner/pi-agent-core";
import { EventStream } from "@mariozechner/pi-ai";
import type {
  Model,
  Context,
  SimpleStreamOptions,
  AssistantMessage,
  AssistantMessageEvent,
} from "@mariozechner/pi-ai";

const mockStreamFn: StreamFn = async (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => {
  const stream = new EventStream<AssistantMessageEvent, AssistantMessage>();
  // Simulate streaming response
  setTimeout(() => {
    stream.push({ type: "text", delta: "Hello! " });
  }, 100);
  setTimeout(() => {
    stream.push({ type: "text", delta: "I'm a mock assistant." });
  }, 200);
  setTimeout(() => {
    stream.push({
      type: "usage",
      usage: {
        input: 10,
        output: 5,
        cacheRead: 0,
        cacheWrite: 0,
        cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
      },
    });
    stream.push({ type: "stop", reason: "stop" });
    stream.close();
  }, 300);
  return stream;
};

// Use in tests
const agent = new Agent({
  streamFn: mockStreamFn,
});
const stream = agent.prompt("Hello");
const messages = await stream.result();
assert.equal(messages[0].content, "Hello! I'm a mock assistant.");
Event Stream Contract
Your custom transport must emit events in this order:
- Streaming phase: Emit text, thinking, and tool_call events as content arrives
- Usage phase: Emit one usage event with token statistics
- Stop phase: Emit one stop event with the stop reason
- Close stream: Call stream.close()
If an error occurs, emit an error event and call stream.error(error).
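The ordering rules above can be checked mechanically, which is handy when testing a new transport. The following is a minimal sketch under a strict reading of the contract (content events first, then exactly one usage event, then exactly one stop event, with an error allowed only as the final event). checkEventOrder is a hypothetical helper, and events are reduced to their type field for illustration.

```typescript
// Hypothetical checker for the ordering described above. Events are reduced
// to their `type` field; real events carry more data.
type EventType = "text" | "thinking" | "tool_call" | "usage" | "stop" | "error";

// Returns null when the sequence is valid, otherwise a description of the
// first violation.
function checkEventOrder(events: { type: EventType }[]): string | null {
  let phase: "streaming" | "usage" | "stopped" = "streaming";
  for (let index = 0; index < events.length; index++) {
    const type = events[index].type;
    switch (type) {
      case "text":
      case "thinking":
      case "tool_call":
        if (phase !== "streaming") {
          return `event ${index}: ${type} is not allowed once the ${phase} phase is reached`;
        }
        break;
      case "usage":
        if (phase !== "streaming") return `event ${index}: usage must appear exactly once, before stop`;
        phase = "usage";
        break;
      case "stop":
        if (phase !== "usage") return `event ${index}: stop must directly follow the usage event`;
        phase = "stopped";
        break;
      case "error":
        // An error ends the sequence early; nothing may follow it.
        return index === events.length - 1 ? null : `event ${index}: events after error`;
    }
  }
  return phase === "stopped" ? null : "sequence ended without a stop event";
}

console.log(checkEventOrder([{ type: "text" }, { type: "usage" }, { type: "stop" }])); // null
```

In a test, the events pushed by a transport can be collected into an array and passed to the checker before asserting on message content.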
Example Event Sequence
// Text streaming
stream.push({ type: "text", delta: "Hello" });
stream.push({ type: "text", delta: " world" });

// Tool call
stream.push({
  type: "tool_call",
  toolCall: {
    type: "toolCall",
    id: "call_123",
    name: "search",
    arguments: { query: "cats" },
  },
});

// Usage
stream.push({
  type: "usage",
  usage: {
    input: 100,
    output: 50,
    cacheRead: 0,
    cacheWrite: 0,
    cost: { input: 0.0003, output: 0.00075, cacheRead: 0, cacheWrite: 0, total: 0.00105 },
  },
});

// Stop
stream.push({ type: "stop", reason: "tool_use" });

// Close
stream.close();
Cancellation Support
Your transport should respect the AbortSignal passed as options.signal:
const customStreamFn: StreamFn = async (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => {
  const stream = new EventStream<AssistantMessageEvent, AssistantMessage>();
  const signal = options?.signal;

  // Handle abort
  signal?.addEventListener("abort", () => {
    // Clean up resources (close connections, etc.)
    stream.error(new Error("Aborted"));
  });

  // Check if already aborted
  if (signal?.aborted) {
    stream.error(new Error("Aborted"));
    return stream;
  }

  // ... implement streaming ...
  return stream;
};
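The two abort cases in the example (signal already aborted, signal aborted later) can be folded into one small helper so cleanup runs exactly once either way. This is a sketch only; onAbort is a hypothetical name, and it relies solely on the standard AbortSignal API.

```typescript
// Hypothetical helper: runs `cleanup` exactly once when `signal` aborts,
// including the case where it was aborted before we subscribed.
// Returns a function that removes the listener again.
function onAbort(signal: AbortSignal | undefined, cleanup: () => void): () => void {
  if (!signal) return () => {};
  if (signal.aborted) {
    // The abort event has already fired, so a listener would never run.
    cleanup();
    return () => {};
  }
  signal.addEventListener("abort", cleanup, { once: true });
  return () => signal.removeEventListener("abort", cleanup);
}

const controller = new AbortController();
let cleanups = 0;
const unsubscribe = onAbort(controller.signal, () => {
  cleanups += 1;
});
controller.abort();
controller.abort(); // a second abort() does not fire the event again
unsubscribe(); // harmless after the listener has already run
console.log(cleanups); // 1
```

Calling the returned function when a stream finishes normally avoids leaving listeners attached to a long-lived signal that is shared across requests.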
Backend Implementation
If you’re building a custom backend that receives requests from your transport:
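// Example Express.js endpoint
import express from "express";
import { streamSimple } from "@mariozechner/pi-ai";

const app = express();
app.use(express.json()); // parse JSON bodies so req.body is populated

app.post("/stream", async (req, res) => {
  const { model, context, options } = req.body;
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // Stream from actual LLM
  const stream = streamSimple(model, context, options);
  for await (const event of stream) {
    // One SSE frame per event: a data line followed by a blank line
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  }
  res.end();
});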
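The frame format written by the endpoint is what the client-side parser depends on, so it can help to isolate it in one function. The sketch below uses a hypothetical helper, toSseFrame; it relies on the fact that JSON.stringify escapes newline characters inside strings, so every event fits on a single data: line.

```typescript
// Hypothetical helper: serializes one event as a single SSE frame.
function toSseFrame(event: unknown): string {
  const json = JSON.stringify(event);
  // JSON.stringify returns undefined for values such as `undefined` or functions.
  if (json === undefined) {
    throw new TypeError("Event is not JSON-serializable");
  }
  // JSON output never contains raw newlines, so one `data:` line is enough.
  // The trailing blank line marks the end of the frame.
  return `data: ${json}\n\n`;
}

// A delta containing a newline still yields a single-line frame.
const frame = toSseFrame({ type: "text", delta: "line one\nline two" });
console.log(frame.split("\n").length); // 3: the data line plus two empty strings
```

Inside the route handler, res.write(toSseFrame(event)) would replace the inline template string.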