Island AI

Getting Started

Installation

bun add zod-stream zod openai

Core Concepts

The ZodStream client provides real-time validation and metadata for streaming LLM responses:

import ZodStream from "zod-stream";
import { z } from "zod";
 
const client = new ZodStream({
  debug: true  // Enable debug logging
});
 
// Define your extraction schema
const schema = z.object({
  content: z.string(),
  metadata: z.object({
    confidence: z.number(),
    category: z.string()
  })
});
 
// Create streaming extraction
const stream = await client.create({
  completionPromise: async () => {
    const response = await fetch("/api/extract", {
      method: "POST",
      body: JSON.stringify({ prompt: "..." })
    });
    return response.body;
  },
  response_model: {
    schema,
    name: "ContentExtraction"
  }
});
 
// Process with validation metadata
for await (const chunk of stream) {
  console.log({
    data: chunk,              // Partial extraction result
    isValid: chunk._meta._isValid,    // Current validation state
    activePath: chunk._meta._activePath,    // Currently processing path
    completedPaths: chunk._meta._completedPaths  // Completed paths
  });
}

Progressive Processing

zod-stream enables processing dependent data as soon as relevant paths complete, without waiting for the full response:

// Define schema for a complex analysis
const schema = z.object({
  user: z.object({
    id: z.string(),
    preferences: z.object({
      theme: z.string(),
      language: z.string()
    })
  }),
  content: z.object({
    title: z.string(),
    body: z.string(),
    metadata: z.object({
      keywords: z.array(z.string()),
      category: z.string()
    })
  }),
  recommendations: z.array(z.object({
    id: z.string(),
    score: z.number(),
    reason: z.string()
  }))
});
 
// Process data as it becomes available
for await (const chunk of stream) {
  // Start personalizing UI as soon as user preferences are ready
  if (isPathComplete(['user', 'preferences'], chunk)) {
    applyUserTheme(chunk.user.preferences.theme);
    setLanguage(chunk.user.preferences.language);
  }
 
  // Begin content indexing once we have title and keywords
  if (isPathComplete(['content', 'metadata', 'keywords'], chunk) && 
      isPathComplete(['content', 'title'], chunk)) {
    indexContent({
      title: chunk.content.title,
      keywords: chunk.content.metadata.keywords
    });
  }
 
  // Start fetching recommended content in parallel
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'recommendations' && path.length === 2) {
      const index = path[1] as number;
      const recommendation = chunk.recommendations[index];
      
      if (recommendation?.id) {
        prefetchContent(recommendation.id);
      }
    }
  });
}

This approach enables:

  • Early UI updates based on user preferences
  • Parallel processing of independent data
  • Optimistic loading of related content
  • Better perceived performance
  • Resource optimization

Stream Metadata

Every streamed chunk includes metadata about validation state:

type CompletionMeta = {
  _isValid: boolean;           // Schema validation status
  _activePath: (string | number)[];     // Current parsing path
  _completedPaths: (string | number)[][]; // All completed paths
}
 
// Example chunk
{
  content: "partial content...",
  metadata: {
    confidence: 0.95
  },
  _meta: {
    _isValid: false,  // Not valid yet
    _activePath: ["metadata", "category"],
    _completedPaths: [
      ["content"],
      ["metadata", "confidence"]
    ]
  }
}

Schema Stubs

Get typed stub objects for initialization:

const schema = z.object({
  users: z.array(z.object({
    name: z.string(),
    age: z.number()
  }))
});
 
const client = new ZodStream();
const stub = client.getSchemaStub({
  schema,
  defaultData: {
    users: [{ name: "loading...", age: 0 }]
  }
});

Debug Logging

Enable detailed logging for debugging:

const client = new ZodStream({ debug: true });
 
// Logs will include:
// - Stream initialization
// - Validation results
// - Path completion
// - Errors with full context

Using Response Models

The withResponseModel helper configures OpenAI parameters based on your schema and chosen mode:

import { withResponseModel } from "zod-stream";
import { z } from "zod";
 
const schema = z.object({
  sentiment: z.string(),
  keywords: z.array(z.string()),
  confidence: z.number()
});
 
// Configure for OpenAI tools mode
const params = withResponseModel({
  response_model: {
    schema,
    name: "Analysis",
    description: "Extract sentiment and keywords"
  },
  mode: "TOOLS",
  params: {
    messages: [{ role: "user", content: "Analyze this text..." }],
    model: "gpt-4"
  }
});
 
const completion = await oai.chat.completions.create({
  ...params,
  stream: true
});

Response Modes

zod-stream supports multiple modes for structured LLM responses:

import { MODE } from "zod-stream";
 
const modes = {
  FUNCTIONS: "FUNCTIONS",   // OpenAI function calling
  TOOLS: "TOOLS",          // OpenAI tools API
  JSON: "JSON",            // Direct JSON response
  MD_JSON: "MD_JSON",      // JSON in markdown blocks
  JSON_SCHEMA: "JSON_SCHEMA" // JSON with schema validation
} as const;

Mode-Specific Behaviors

TOOLS Mode

// Results in OpenAI tool configuration
{
  tool_choice: {
    type: "function",
    function: { name: "Analysis" }
  },
  tools: [{
    type: "function",
    function: {
      name: "Analysis",
      description: "Extract sentiment and keywords",
      parameters: {/* Generated from schema */}
    }
  }],
  // ... other existing params are preserved
}

FUNCTIONS Mode (Legacy)

// Results in OpenAI function configuration
{
  function_call: { name: "Analysis" },
  functions: [{
    name: "Analysis",
    description: "Extract sentiment and keywords",
    parameters: {/* Generated from schema */}
  }],
  // ... other existing params are preserved
}

JSON Mode

// Results in direct JSON response configuration
{
  response_format: { type: "json_object" },
  messages: [
    {
      role: "system",
      content: `
        Given a user prompt, you will return fully valid JSON based on the following description and schema.
        You will return no other prose. You will take into account any descriptions or required parameters within the schema
        and return a valid and fully escaped JSON object that matches the schema and those instructions.
 
        description: ${definition.description}
        json schema: ${JSON.stringify(definition)}
      `
    },
    // ... user messages are preserved
  ]
}

JSON_SCHEMA Mode

// Results in JSON schema-based configuration
{
  response_format: {
    type: "json_object",
    schema: {/* Schema without name and description */}
  },
  messages: [
    {
      role: "system",
      content: `
        Given a user prompt, you will return fully valid JSON based on the following description.
        You will return no other prose. You will take into account any descriptions or required parameters within the schema
        and return a valid and fully escaped JSON object that matches the schema and those instructions.
 
        description: ${definition.description}
      `
    },
    // ... user messages are preserved
  ]
}

MESSAGE_BASED Mode

// Similar to JSON mode but without response_format
{
  messages: [
    {
      role: "system",
      content: `
        Given a user prompt, you will return fully valid JSON based on the following description and schema.
        You will return no other prose. You will take into account any descriptions or required parameters within the schema
        and return a valid and fully escaped JSON object that matches the schema and those instructions.
 
        description: ${definition.description}
        json schema: ${JSON.stringify(definition)}
      `
    },
    // ... user messages are preserved
  ]
}

Response Parsing

Built-in parsers handle different response formats:

import { 
  OAIResponseParser,
  OAIResponseToolArgsParser,
  OAIResponseFnArgsParser,
  OAIResponseJSONParser
} from "zod-stream";
 
// Automatic format detection
const result = OAIResponseParser(response);
 
// Format-specific parsing
const toolArgs = OAIResponseToolArgsParser(response);
const fnArgs = OAIResponseFnArgsParser(response);
const jsonContent = OAIResponseJSONParser(response);

Streaming Utilities

Handle streaming responses with built-in utilities:

import { OAIStream, readableStreamToAsyncGenerator } from "zod-stream";
 
// Create streaming response
app.post("/api/stream", async (req, res) => {
  const completion = await oai.chat.completions.create({
    ...params,
    stream: true
  });
 
  return new Response(
    OAIStream({ res: completion })
  );
});
 
// Convert stream to async generator
const generator = readableStreamToAsyncGenerator(stream);
for await (const chunk of generator) {
  console.log(chunk);
}

Path Tracking Utilities

Monitor completion status of specific paths:

import { isPathComplete } from "zod-stream";
 
const activePath = ["analysis", "sentiment"];
const isComplete = isPathComplete(activePath, {
  _meta: {
    _completedPaths: [["analysis", "sentiment"]],
    _activePath: ["analysis", "keywords"],
    _isValid: false
  }
});

Error Handling

zod-stream provides error handling at multiple levels:

const stream = await client.create({
  completionPromise: async () => response.body,
  response_model: { schema }
});
 
let finalResult
 
// Path tracking for progressive updates
for await (const chunk of stream) {
  finalResult = chunk
  // Check which paths are complete
  console.log("Completed paths:", chunk._meta._completedPaths);
  console.log("Current path:", chunk._meta._activePath);
}
 
// Final validation happens after stream completes
const isValid = finalResult._meta._isValid