Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/badlogic/pi-mono/llms.txt

Use this file to discover all available pages before exploring further.

The agent runtime supports custom transport layers for connecting to different backends, including HTTP proxies and custom streaming implementations.

Overview

By default, the agent calls LLM APIs directly using @mariozechner/pi-ai. You can override this with a custom streamFn to:
  • Route requests through a backend proxy
  • Add authentication or rate limiting
  • Mock LLM responses for testing
  • Implement custom protocols

Proxy Usage

For browser apps that need to proxy through a backend:
import { Agent, streamProxy } from "@mariozechner/pi-agent-core";

const agent = new Agent({
  streamFn: (model, context, options) =>
    streamProxy(model, context, {
      ...options,
      authToken: "your-auth-token",
      proxyUrl: "https://your-server.com",
    }),
});

Proxy Protocol

The streamProxy function sends requests to your backend with: Request:
{
  "model": { ... },
  "context": { ... },
  "options": { ... }
}
Response: Server-sent events (SSE) with AssistantMessageEvent objects

Custom Stream Function

Implement a custom streaming function for full control:
import type { Model, Context, StreamOptions } from "@mariozechner/pi-ai";
import type { AssistantMessageEventStream } from "@mariozechner/pi-ai";

async function customStreamFn(
  model: Model<any>,
  context: Context,
  options?: StreamOptions
): Promise<AssistantMessageEventStream> {
  // Your custom implementation
  // Must return an async iterable of AssistantMessageEvent
  
  return {
    async *[Symbol.asyncIterator]() {
      // Yield events: text_delta, tool_call, etc.
      yield { type: "text_delta", delta: "Hello" };
      yield { type: "done", reason: "stop", message: { ... } };
    },
    result: async () => finalMessage,
  };
}

const agent = new Agent({
  streamFn: customStreamFn,
});

Mock Transport for Testing

Create a mock transport that returns predefined responses:
function mockStreamFn(responses: string[]) {
  let responseIndex = 0;
  
  return async (model: Model<any>, context: Context) => {
    const response = responses[responseIndex++];
    
    return {
      async *[Symbol.asyncIterator]() {
        // Simulate streaming
        for (const char of response) {
          yield { type: "text_delta", delta: char };
          await new Promise(resolve => setTimeout(resolve, 10));
        }
        
        yield {
          type: "done",
          reason: "stop",
          message: {
            role: "assistant",
            content: [{ type: "text", text: response }],
            usage: { input: 0, output: response.length },
          },
        };
      },
      result: async () => ({ ... }),
    };
  };
}

const agent = new Agent({
  streamFn: mockStreamFn(["First response", "Second response"]),
});

Dynamic API Keys

Resolve API keys dynamically for OAuth token refresh:
const agent = new Agent({
  getApiKey: async (provider) => {
    // Refresh token if expired
    const token = await refreshOAuthToken(provider);
    return token;
  },
});

Session IDs

Provide a session ID for provider-side prompt caching:
const agent = new Agent({
  sessionId: "user-123-session-456",
});

// Update dynamically
agent.sessionId = "new-session-id";
Session IDs enable prompt caching with OpenAI Codex and other providers that support session-based caching.

Backend Example (Node.js)

Here’s a complete backend proxy example:
import express from "express";
import { stream } from "@mariozechner/pi-ai";

const app = express();

app.post("/stream", express.json(), async (req, res) => {
  const { model, context, options } = req.body;
  
  // Validate auth token
  if (req.headers.authorization !== `Bearer ${process.env.AUTH_TOKEN}`) {
    return res.status(401).json({ error: "Unauthorized" });
  }
  
  // Stream response as SSE
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  
  const s = stream(model, context, {
    ...options,
    apiKey: process.env.ANTHROPIC_API_KEY,
  });
  
  for await (const event of s) {
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  }
  
  res.end();
});

app.listen(3000);

Next Steps

State Management

Managing agent state and queues

API Reference

Transport interface documentation