Island AI

Getting Started

Installation

bun add schema-stream zod

Basic Usage

import { SchemaStream } from 'schema-stream';
import { z } from 'zod';
 
// Define your schema
const schema = z.object({
  users: z.array(z.object({
    name: z.string(),
    age: z.number()
  })),
  metadata: z.object({
    total: z.number(),
    page: z.number()
  })
});
 
// Create parser with optional defaults
const parser = new SchemaStream(schema, {
  metadata: { total: 0, page: 1 }
});
 
// Track completion paths
parser.onKeyComplete(({ completedPaths }) => {
  console.log('Completed:', completedPaths);
});
 
// Parse streaming data
const stream = parser.parse();
response.body.pipeThrough(stream);
 
// Read results with full type inference
const reader = stream.readable.getReader();
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  
  const result = JSON.parse(decoder.decode(value));
  // result is fully typed as z.infer<typeof schema>
  console.log(result);
}

Validation and Error Handling

schema-stream focuses solely on streaming JSON parsing with type stubs - it intentionally does not perform full Zod schema validation during parsing. This design choice enables:

  1. Faster parsing without validation overhead
  2. Immediate access to partial data
  3. Flexibility for downstream validation

For full schema validation and error handling, consider using:

  • zod-stream: Adds validation, OpenAI integration, and structured error handling
  • instructor: Complete solution for validated LLM extraction

Example of schema-stream being used by zod-stream:

const streamParser = new SchemaStream(response_model.schema, {
  typeDefaults: {
    string: null,
    number: null,
    boolean: null
  },
  onKeyComplete: ({ activePath, completedPaths }) => {
    _activePath = activePath;
    _completedPaths = completedPaths;
  }
});
 
// Create parser with validation stream
const parser = streamParser.parse({
  handleUnescapedNewLines: true
});
 
// Add validation in transform stream
const validationStream = new TransformStream({
  transform: async (chunk, controller) => {
    try {
      const parsedChunk = JSON.parse(decoder.decode(chunk));
      const validation = await schema.safeParseAsync(parsedChunk);
      
      controller.enqueue(encoder.encode(JSON.stringify({
        ...parsedChunk,
        _meta: {
          _isValid: validation.success,
          _activePath,
          _completedPaths
        }
      })));
    } catch (e) {
      controller.error(e);
    }
  }
});
 
// Chain streams
stream
  .pipeThrough(parser)
  .pipeThrough(validationStream);

Progressive UI Updates

const schema = z.object({
  analysis: z.object({
    sentiment: z.string(),
    keywords: z.array(z.string()),
    summary: z.string()
  }),
  metadata: z.object({
    processedAt: z.string(),
    wordCount: z.number()
  })
});
 
const parser = new SchemaStream(schema, {
  // Show loading states initially
  defaultData: {
    analysis: {
      sentiment: "analyzing...",
      keywords: ["loading..."],
      summary: "generating summary..."
    }
  },
  onKeyComplete({ activePath, completedPaths }) {
    // Update UI loading states based on completion
    updateLoadingStates(activePath, completedPaths);
  }
});

Nested Data Processing

const schema = z.object({
  users: z.array(z.object({
    id: z.string(),
    profile: z.object({
      name: z.string(),
      email: z.string(),
      preferences: z.object({
        theme: z.string(),
        notifications: z.boolean()
      })
    }),
    activity: z.array(z.object({
      timestamp: z.string(),
      action: z.string()
    }))
  }))
});
 
const parser = new SchemaStream(schema);
 
// Track specific paths for business logic
parser.onKeyComplete(({ activePath, completedPaths }) => {
  const path = activePath.join('.');
  
  // Process user profiles as they complete
  if (path.match(/users\.\d+\.profile$/)) {
    processUserProfile(/* ... */);
  }
  
  // Process activity logs in batches
  if (path.match(/users\.\d+\.activity\.\d+$/)) {
    batchActivityLog(/* ... */);
  }
});

On this page