The @mariozechner/pi-agent-core package supports custom transport implementations for routing agent requests to different backends.

Overview

By default, the agent uses the streamSimple() function from @mariozechner/pi-ai to call LLM APIs directly. For custom architectures (proxies, edge workers, microservices), you can provide a custom streamFn.

StreamFn Type

type StreamFn = (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => AssistantMessageEventStream | Promise<AssistantMessageEventStream>;

Your custom stream function must:
  1. Accept the same parameters as streamSimple()
  2. Return an AssistantMessageEventStream (or Promise of one)
  3. Emit standard events: text, thinking, tool_call, usage, stop, error

Example: HTTP Proxy

import { Agent, type StreamFn } from "@mariozechner/pi-agent-core";
import { EventStream, getModel } from "@mariozechner/pi-ai";
import type {
  Model,
  Context,
  SimpleStreamOptions,
  AssistantMessage,
  AssistantMessageEvent,
} from "@mariozechner/pi-ai";

const proxyStreamFn: StreamFn = async (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => {
  const response = await fetch("https://api.example.com/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ model, context, options }),
    signal: options?.signal,
  });

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
  }

  const stream = new EventStream<AssistantMessageEvent, AssistantMessage>();

  // Parse SSE stream
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  (async () => {
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop()!;

        for (const line of lines) {
          if (line.startsWith("data: ")) {
            const data = JSON.parse(line.slice(6));
            stream.push(data);
          }
        }
      }
      stream.close();
    } catch (error) {
      stream.error(error);
    }
  })();

  return stream;
};

// Use custom transport
const agent = new Agent({
  streamFn: proxyStreamFn,
  initialState: {
    systemPrompt: "You are helpful.",
    model: getModel("anthropic", "claude-4.5-sonnet-20250514"),
    thinkingLevel: "medium",
    tools: [],
    messages: [],
  },
});
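
The line-buffering step in the proxy example can be pulled out into a small pure function, which makes it easier to unit-test without a network. The sketch below is only an illustration: `parseSseBuffer` and `SseParseResult` are hypothetical names, not part of pi-ai or pi-agent-core, and it assumes the backend sends one JSON payload per `data: ` line as in the example above.

```typescript
// Hypothetical helper (not part of pi-ai or pi-agent-core).
// Splits accumulated SSE text into parsed `data: ` payloads and returns the
// trailing partial line so the caller can prepend it to the next chunk.
interface SseParseResult {
  events: unknown[];
  rest: string;
}

function parseSseBuffer(buffer: string): SseParseResult {
  const lines = buffer.split("\n");
  // The last element is "" if the buffer ended on a newline, else a partial line.
  const rest = lines.pop() ?? "";
  const events: unknown[] = [];

  for (const rawLine of lines) {
    // Tolerate CRLF line endings introduced by servers or proxies.
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (line.startsWith("data: ")) {
      events.push(JSON.parse(line.slice(6)));
    }
  }

  return { events, rest };
}
```

Inside the read loop, the transport would call `parseSseBuffer(buffer + decoder.decode(value, { stream: true }))`, push each returned event into the stream, and keep `rest` as the new buffer.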

Example: WebSocket Transport

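import { EventStream } from "@mariozechner/pi-ai";
import type { StreamFn } from "@mariozechner/pi-agent-core";
import type {
  Model,
  Context,
  SimpleStreamOptions,
  AssistantMessage,
  AssistantMessageEvent,
} from "@mariozechner/pi-ai";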

const websocketStreamFn: StreamFn = async (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => {
  const ws = new WebSocket("wss://api.example.com/stream");
  const stream = new EventStream<AssistantMessageEvent, AssistantMessage>();

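  // Settle the stream exactly once, whichever callback fires first.
  let settled = false;
  const settle = (error?: unknown) => {
    if (settled) return;
    settled = true;
    if (error !== undefined) stream.error(error);
    else stream.close();
  };

  ws.onopen = () => {
    ws.send(JSON.stringify({ model, context, options }));
  };

  ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    stream.push(data);

    if (data.type === "stop") {
      settle();
      ws.close();
    }
  };

  ws.onerror = (error) => settle(error);

  // Also covers the server closing the socket without sending a stop event.
  ws.onclose = () => settle();

  // Handle abort signal: closing the socket triggers onclose, which settles the stream.
  options?.signal?.addEventListener("abort", () => ws.close(), { once: true });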

  return stream;
};

Example: Mock Transport (Testing)

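import assert from "node:assert";
import { EventStream } from "@mariozechner/pi-ai";
import { Agent, type StreamFn } from "@mariozechner/pi-agent-core";
import type {
  Model,
  Context,
  SimpleStreamOptions,
  AssistantMessage,
  AssistantMessageEvent,
} from "@mariozechner/pi-ai";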

const mockStreamFn: StreamFn = async (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => {
  const stream = new EventStream<AssistantMessageEvent, AssistantMessage>();

  // Simulate streaming response
  setTimeout(() => {
    stream.push({ type: "text", delta: "Hello! " });
  }, 100);

  setTimeout(() => {
    stream.push({ type: "text", delta: "I'm a mock assistant." });
  }, 200);

  setTimeout(() => {
    stream.push({
      type: "usage",
      usage: {
        input: 10,
        output: 5,
        cacheRead: 0,
        cacheWrite: 0,
        cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
      },
    });
    stream.push({ type: "stop", reason: "stop" });
    stream.close();
  }, 300);

  return stream;
};

// Use in tests
const agent = new Agent({
  streamFn: mockStreamFn,
});

const stream = agent.prompt("Hello");
const messages = await stream.result();
assert.equal(messages[0].content, "Hello! I'm a mock assistant.");
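
For longer test scripts, the three hand-written timers can be generated from a list of text chunks. The following is a minimal sketch under stated assumptions: `scriptMockReply`, `MockEvent`, `MockUsage`, and `ScheduledEvent` are hypothetical local names that mirror the event shapes shown on this page, and the usage numbers are placeholders rather than real token counts.

```typescript
// Local stand-in types mirroring the event shapes used on this page.
// They are assumptions for illustration; real types come from @mariozechner/pi-ai.
interface MockUsage {
  input: number;
  output: number;
  cacheRead: number;
  cacheWrite: number;
  cost: { input: number; output: number; cacheRead: number; cacheWrite: number; total: number };
}

type MockEvent =
  | { type: "text"; delta: string }
  | { type: "usage"; usage: MockUsage }
  | { type: "stop"; reason: string };

interface ScheduledEvent {
  atMs: number; // delay from the start of the mock call
  event: MockEvent;
}

// Builds a schedule: one text event per chunk, then usage and stop together.
function scriptMockReply(chunks: string[], stepMs = 100): ScheduledEvent[] {
  const schedule: ScheduledEvent[] = chunks.map(
    (delta, i): ScheduledEvent => ({
      atMs: (i + 1) * stepMs,
      event: { type: "text", delta },
    })
  );

  const endMs = (chunks.length + 1) * stepMs;
  const usage: MockUsage = {
    input: 0,
    output: chunks.length, // placeholder, not a real token count
    cacheRead: 0,
    cacheWrite: 0,
    cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
  };
  schedule.push({ atMs: endMs, event: { type: "usage", usage } });
  schedule.push({ atMs: endMs, event: { type: "stop", reason: "stop" } });
  return schedule;
}
```

A mock transport could then loop over the schedule, call `setTimeout(() => stream.push(event), atMs)` for each entry, and close the stream after the last one.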

Event Stream Contract

Your custom transport must emit events in this order:
  1. Streaming phase: Emit text, thinking, and tool_call events as content arrives
  2. Usage phase: Emit one usage event with token statistics
  3. Stop phase: Emit one stop event with stop reason
  4. Close stream: Call stream.close()

If an error occurs, emit an error event and call stream.error(error).
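
The ordering rules above can be checked mechanically in a test suite. The sketch below is one possible reading of the contract, not something the packages provide: `checkEventOrder` and `EventLike` are hypothetical names, and it assumes an error event may only appear as the final event before a stop has been sent.

```typescript
// Minimal event shape for this check; real events carry more fields.
interface EventLike {
  type: string;
}

type Phase = "streaming" | "usage" | "stopped";

// Returns null when the sequence follows the documented order, otherwise a
// short description of the first violation found.
function checkEventOrder(events: EventLike[]): string | null {
  const contentTypes = new Set(["text", "thinking", "tool_call"]);
  let phase: Phase = "streaming";

  for (let i = 0; i < events.length; i++) {
    const type = events[i].type;

    if (phase === "stopped") return `event ${i} (${type}) arrived after stop`;

    if (type === "error") {
      // Assumption: an error terminates the stream, so nothing may follow it.
      return i === events.length - 1 ? null : `event ${i + 1} arrived after error`;
    }
    if (contentTypes.has(type)) {
      if (phase !== "streaming") return `event ${i} (${type}) arrived after usage`;
    } else if (type === "usage") {
      if (phase !== "streaming") return `event ${i} is a second usage event`;
      phase = "usage";
    } else if (type === "stop") {
      if (phase !== "usage") return `event ${i} (stop) arrived before usage`;
      phase = "stopped";
    } else {
      return `event ${i} has unknown type ${type}`;
    }
  }

  return phase === "stopped" ? null : "sequence ended without a stop event";
}
```

Recording the events a custom transport emits and passing them through a check like this can catch a missing usage event or content emitted after stop before the agent ever sees the stream.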

Example Event Sequence

// Text streaming
stream.push({ type: "text", delta: "Hello" });
stream.push({ type: "text", delta: " world" });

// Tool call
stream.push({
  type: "tool_call",
  toolCall: {
    type: "toolCall",
    id: "call_123",
    name: "search",
    arguments: { query: "cats" },
  },
});

// Usage
stream.push({
  type: "usage",
  usage: {
    input: 100,
    output: 50,
    cacheRead: 0,
    cacheWrite: 0,
    cost: { input: 0.0003, output: 0.00075, cacheRead: 0, cacheWrite: 0, total: 0.00105 },
  },
});

// Stop
stream.push({ type: "stop", reason: "tool_use" });

// Close
stream.close();

Cancellation Support

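Your transport should respect the AbortSignal passed as options.signal, both when it fires during streaming and when it is already aborted before the call starts:
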
const customStreamFn: StreamFn = async (
  model: Model<any>,
  context: Context,
  options?: SimpleStreamOptions
) => {
  const stream = new EventStream<AssistantMessageEvent, AssistantMessage>();
  const signal = options?.signal;

  // Handle abort
  signal?.addEventListener("abort", () => {
    // Clean up resources (close connections, etc.)
    stream.error(new Error("Aborted"));
  });

  // Check if already aborted
  if (signal?.aborted) {
    stream.error(new Error("Aborted"));
    return stream;
  }

  // ... implement streaming ...

  return stream;
};
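
The two abort checks above can be folded into one helper so that each transport handles the "already aborted" and "aborted later" cases the same way. This is a sketch, not library code: `onAbort` is a hypothetical name, and it relies only on the standard `AbortSignal` API.

```typescript
// Hypothetical helper: runs `cleanup` at most once when `signal` aborts,
// including the case where it was already aborted before registration.
// Returns a function that removes the listener once streaming has finished.
function onAbort(signal: AbortSignal | undefined, cleanup: () => void): () => void {
  if (!signal) return () => {};

  if (signal.aborted) {
    cleanup();
    return () => {};
  }

  const listener = () => cleanup();
  // `once: true` removes the listener automatically after the first abort.
  signal.addEventListener("abort", listener, { once: true });
  return () => signal.removeEventListener("abort", listener);
}
```

A transport might call `const off = onAbort(options?.signal, () => stream.error(new Error("Aborted")))` at the start and `off()` after the stream closes normally, so listeners do not accumulate on a long-lived signal.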

Backend Implementation

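If you're building a custom backend that receives requests from your transport, it can call streamSimple() itself and relay each event to the client as a Server-Sent Events data line:
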
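// Example Express.js endpoint
import express from "express";
import { streamSimple } from "@mariozechner/pi-ai";

const app = express();
app.use(express.json()); // parse the JSON body sent by the transport

app.post("/stream", async (req, res) => {
  const { model, context, options } = req.body;

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // Stream from actual LLM
  const stream = streamSimple(model, context, options);

  // Forward each event as one SSE frame: a data line followed by a blank line
  for await (const event of stream) {
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  }

  res.end();
});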
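
The framing used by the endpoint can be kept in one place so the backend and its tests agree on the wire format. A minimal sketch follows; `toSseFrame` is a hypothetical helper, not part of either package. Because JSON.stringify escapes newlines inside strings, each event always fits on a single `data: ` line.

```typescript
// Hypothetical helper: serializes one event as a Server-Sent Events frame.
// A frame is a `data: ` line followed by a blank line.
function toSseFrame(event: unknown): string {
  const json: string | undefined = JSON.stringify(event);
  if (json === undefined) {
    // JSON.stringify returns undefined for values such as `undefined` or functions.
    throw new TypeError("event is not JSON-serializable");
  }
  return `data: ${json}\n\n`;
}
```

In the endpoint above, the write call would become `res.write(toSseFrame(event))`.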