Getting Started
Installation
bun add zod-stream zod openai
Core Concepts
The ZodStream client provides real-time validation and metadata for streaming LLM responses:
import ZodStream from "zod-stream";
import { z } from "zod";
const client = new ZodStream({
debug: true // Enable debug logging
});
// Define your extraction schema
const schema = z.object({
content: z.string(),
metadata: z.object({
confidence: z.number(),
category: z.string()
})
});
// Create streaming extraction
const stream = await client.create({
completionPromise: async () => {
const response = await fetch("/api/extract", {
method: "POST",
body: JSON.stringify({ prompt: "..." })
});
return response.body;
},
response_model: {
schema,
name: "ContentExtraction"
}
});
// Process with validation metadata
for await (const chunk of stream) {
console.log({
data: chunk, // Partial extraction result
isValid: chunk._meta._isValid, // Current validation state
activePath: chunk._meta._activePath, // Currently processing path
completedPaths: chunk._meta._completedPaths // Completed paths
});
}
Progressive Processing
zod-stream enables processing dependent data as soon as the relevant paths complete, without waiting for the full response:
import { isPathComplete } from "zod-stream";
import { z } from "zod";
// Define schema for a complex analysis
const schema = z.object({
user: z.object({
id: z.string(),
preferences: z.object({
theme: z.string(),
language: z.string()
})
}),
content: z.object({
title: z.string(),
body: z.string(),
metadata: z.object({
keywords: z.array(z.string()),
category: z.string()
})
}),
recommendations: z.array(z.object({
id: z.string(),
score: z.number(),
reason: z.string()
}))
});
// Process data as it becomes available
for await (const chunk of stream) {
// Start personalizing UI as soon as user preferences are ready
if (isPathComplete(['user', 'preferences'], chunk)) {
applyUserTheme(chunk.user.preferences.theme);
setLanguage(chunk.user.preferences.language);
}
// Begin content indexing once we have title and keywords
if (isPathComplete(['content', 'metadata', 'keywords'], chunk) &&
isPathComplete(['content', 'title'], chunk)) {
indexContent({
title: chunk.content.title,
keywords: chunk.content.metadata.keywords
});
}
// Start fetching recommended content in parallel
chunk._meta._completedPaths.forEach(path => {
if (path[0] === 'recommendations' && path.length === 2) {
const index = path[1] as number;
const recommendation = chunk.recommendations[index];
if (recommendation?.id) {
prefetchContent(recommendation.id);
}
}
});
}
This approach enables:
- Early UI updates based on user preferences
- Parallel processing of independent data
- Optimistic loading of related content
- Better perceived performance
- Resource optimization
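One practical detail: if a completed path stays listed in `_completedPaths` on later chunks (as "All completed paths" suggests), a plain `if` check inside the loop runs its side effect again on every subsequent chunk. The sketch below shows one way to fire each handler only once. `createPathTriggers` is a hypothetical helper, not part of zod-stream, and the chunks are simulated rather than read from a real stream.

```typescript
type Path = (string | number)[];
type Meta = { _completedPaths: Path[] };

// Hypothetical helper (not a zod-stream API): runs each handler at most once,
// the first time its path appears in a chunk's _completedPaths.
function createPathTriggers<T extends { _meta: Meta }>(
  triggers: { path: Path; run: (chunk: T) => void }[]
) {
  const fired = new Set<string>();
  return (chunk: T): void => {
    const done = new Set(
      chunk._meta._completedPaths.map((p) => JSON.stringify(p))
    );
    for (const { path, run } of triggers) {
      const key = JSON.stringify(path);
      if (done.has(key) && !fired.has(key)) {
        fired.add(key);
        run(chunk);
      }
    }
  };
}

// Simulated chunks standing in for a real stream.
type Chunk = { user?: { preferences?: { theme?: string } }; _meta: Meta };

const applied: string[] = [];
const onChunk = createPathTriggers<Chunk>([
  {
    path: ["user", "preferences"],
    run: (c) => applied.push(c.user?.preferences?.theme ?? "unknown"),
  },
]);

const simulated: Chunk[] = [
  { user: { preferences: { theme: "da" } }, _meta: { _completedPaths: [] } },
  {
    user: { preferences: { theme: "dark" } },
    _meta: { _completedPaths: [["user", "preferences"]] },
  },
  {
    user: { preferences: { theme: "dark" } },
    _meta: { _completedPaths: [["user", "preferences"], ["content", "title"]] },
  },
];
simulated.forEach(onChunk);
console.log(applied); // the theme handler ran once, with the completed value
```

In a real loop, `onChunk(chunk)` would be called inside `for await (const chunk of stream)`.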
Stream Metadata
Every streamed chunk includes metadata about validation state:
type CompletionMeta = {
_isValid: boolean; // Schema validation status
_activePath: (string | number)[]; // Current parsing path
_completedPaths: (string | number)[][]; // All completed paths
}
// Example chunk
{
content: "partial content...",
metadata: {
confidence: 0.95
},
_meta: {
_isValid: false, // Not valid yet
_activePath: ["metadata", "category"],
_completedPaths: [
["content"],
["metadata", "confidence"]
]
}
}
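The metadata can also drive a simple progress indicator. The following is a minimal sketch that assumes you know which leaf paths to expect. `completionRatio` and `expectedPaths` are hypothetical names introduced for illustration and are not provided by zod-stream.

```typescript
type Path = (string | number)[];
type CompletionMeta = {
  _isValid: boolean;
  _activePath: Path;
  _completedPaths: Path[];
};

// Hypothetical helper: fraction of the expected paths that have completed.
function completionRatio(expected: Path[], meta: CompletionMeta): number {
  if (expected.length === 0) return 1;
  const done = new Set(meta._completedPaths.map((p) => JSON.stringify(p)));
  const hits = expected.filter((p) => done.has(JSON.stringify(p))).length;
  return hits / expected.length;
}

// Leaf paths of the example schema (content, metadata.confidence, metadata.category).
const expectedPaths: Path[] = [
  ["content"],
  ["metadata", "confidence"],
  ["metadata", "category"],
];

// Metadata taken from the example chunk above.
const exampleMeta: CompletionMeta = {
  _isValid: false,
  _activePath: ["metadata", "category"],
  _completedPaths: [["content"], ["metadata", "confidence"]],
};

const ratio = completionRatio(expectedPaths, exampleMeta);
console.log(`${Math.round(ratio * 100)}% complete`); // prints "67% complete"
```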
Schema Stubs
Get typed stub objects for initialization:
const schema = z.object({
users: z.array(z.object({
name: z.string(),
age: z.number()
}))
});
const client = new ZodStream();
const stub = client.getSchemaStub({
schema,
defaultData: {
users: [{ name: "loading...", age: 0 }]
}
});
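A stub is useful as a fallback while data is still arriving. The sketch below shows one possible way to combine a stub with a partial chunk so that the UI always has a value to render. `withFallback` is a hypothetical helper, it merges top-level keys only, and the stub object here is written by hand rather than obtained from `getSchemaStub`.

```typescript
type UsersView = { users: { name: string; age: number }[] };

// Hypothetical helper: prefer streamed values and fall back to the stub
// for top-level keys that have not arrived yet.
function withFallback<T extends object>(stub: T, partial: Partial<T>): T {
  const result = { ...stub };
  for (const key of Object.keys(partial) as (keyof T)[]) {
    const value = partial[key];
    if (value !== undefined) result[key] = value as T[keyof T];
  }
  return result;
}

// Hand-written stand-in for the stub returned by getSchemaStub.
const stub: UsersView = { users: [{ name: "loading...", age: 0 }] };

const beforeData = withFallback(stub, {});
const afterData = withFallback(stub, { users: [{ name: "Ada", age: 36 }] });

console.log(beforeData.users[0]?.name); // stub value until data arrives
console.log(afterData.users[0]?.name); // streamed value once present
```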
Debug Logging
Enable detailed logging for debugging:
const client = new ZodStream({ debug: true });
// Logs will include:
// - Stream initialization
// - Validation results
// - Path completion
// - Errors with full context
Using Response Models
The withResponseModel helper configures OpenAI parameters based on your schema and chosen mode:
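import OpenAI from "openai";
import { withResponseModel } from "zod-stream";
import { z } from "zod";
// OpenAI client used below; reads OPENAI_API_KEY from the environment by default
const oai = new OpenAI();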
const schema = z.object({
sentiment: z.string(),
keywords: z.array(z.string()),
confidence: z.number()
});
// Configure for OpenAI tools mode
const params = withResponseModel({
response_model: {
schema,
name: "Analysis",
description: "Extract sentiment and keywords"
},
mode: "TOOLS",
params: {
messages: [{ role: "user", content: "Analyze this text..." }],
model: "gpt-4"
}
});
const completion = await oai.chat.completions.create({
...params,
stream: true
});
Response Modes
zod-stream supports multiple modes for structured LLM responses:
import { MODE } from "zod-stream";
const modes = {
FUNCTIONS: "FUNCTIONS", // OpenAI function calling
TOOLS: "TOOLS", // OpenAI tools API
JSON: "JSON", // Direct JSON response
MD_JSON: "MD_JSON", // JSON in markdown blocks
JSON_SCHEMA: "JSON_SCHEMA", // JSON with schema validation
THINKING_MD_JSON: "THINKING_MD_JSON" // JSON with thinking in markdown blocks (deepseek r1)
} as const;
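To make the difference between JSON and MD_JSON concrete, here is a rough sketch of what reading "JSON in markdown blocks" involves on the consuming side. This is an illustration only and is not the library's parser. `extractJsonFromMarkdown` is a hypothetical function, and the fence marker is built at runtime purely so the example contains no literal fence.

```typescript
// Three backticks, assembled at runtime.
const FENCE = "`".repeat(3);

// Hypothetical helper: pull the first fenced block out of a model reply and
// parse it as JSON; if no fence is found, try to parse the whole reply.
function extractJsonFromMarkdown(text: string): unknown {
  const pattern = new RegExp(FENCE + "(?:json)?\\s*([\\s\\S]*?)" + FENCE);
  const match = pattern.exec(text);
  const candidate = match && match[1] !== undefined ? match[1] : text;
  return JSON.parse(candidate.trim());
}

// A reply shaped like one a model might produce in a markdown-based mode.
const reply = [
  "Here is the result:",
  FENCE + "json",
  '{"sentiment":"positive","confidence":0.9}',
  FENCE,
].join("\n");

const parsed = extractJsonFromMarkdown(reply) as {
  sentiment: string;
  confidence: number;
};
console.log(parsed.sentiment); // prints "positive"
```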
Mode-Specific Behaviors
TOOLS Mode
// Results in OpenAI tool configuration
{
tool_choice: {
type: "function",
function: { name: "Analysis" }
},
tools: [{
type: "function",
function: {
name: "Analysis",
description: "Extract sentiment and keywords",
parameters: {/* Generated from schema */}
}
}],
// ... other existing params are preserved
}
FUNCTIONS Mode (Legacy)
// Results in OpenAI function configuration
{
function_call: { name: "Analysis" },
functions: [{
name: "Analysis",
description: "Extract sentiment and keywords",
parameters: {/* Generated from schema */}
}],
// ... other existing params are preserved
}
JSON Mode
// Results in direct JSON response configuration
{
response_format: { type: "json_object" },
messages: [
{
role: "system",
content: `
Given a user prompt, you will return fully valid JSON based on the following description and schema.
You will return no other prose. You will take into account any descriptions or required parameters within the schema
and return a valid and fully escaped JSON object that matches the schema and those instructions.
description: ${definition.description}
json schema: ${JSON.stringify(definition)}
`
},
// ... user messages are preserved
]
}
JSON_SCHEMA Mode
// Results in JSON schema-based configuration
{
response_format: {
type: "json_object",
schema: {/* Schema without name and description */}
},
messages: [
{
role: "system",
content: `
Given a user prompt, you will return fully valid JSON based on the following description.
You will return no other prose. You will take into account any descriptions or required parameters within the schema
and return a valid and fully escaped JSON object that matches the schema and those instructions.
description: ${definition.description}
`
},
// ... user messages are preserved
]
}
MESSAGE_BASED Mode
// Similar to JSON mode but without response_format
{
messages: [
{
role: "system",
content: `
Given a user prompt, you will return fully valid JSON based on the following description and schema.
You will return no other prose. You will take into account any descriptions or required parameters within the schema
and return a valid and fully escaped JSON object that matches the schema and those instructions.
description: ${definition.description}
json schema: ${JSON.stringify(definition)}
`
},
// ... user messages are preserved
]
}
Response Parsing
Built-in parsers handle different response formats:
import {
OAIResponseParser,
OAIResponseToolArgsParser,
OAIResponseFnArgsParser,
OAIResponseJSONParser
} from "zod-stream";
// Automatic format detection
const result = OAIResponseParser(response);
// Format-specific parsing
const toolArgs = OAIResponseToolArgsParser(response);
const fnArgs = OAIResponseFnArgsParser(response);
const jsonContent = OAIResponseJSONParser(response);
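For intuition, the sketch below shows roughly what format detection could look like for a non-streaming chat completion: structured output may arrive as tool-call arguments, as legacy function-call arguments, or as plain message content. This is a simplified stand-in and not the library's implementation. `extractStructuredText` and the local types are hypothetical and cover only the response fields the sketch reads.

```typescript
// Minimal local types for the fields read from an OpenAI chat completion.
type ChatMessage = {
  content?: string | null;
  function_call?: { arguments: string } | null;
  tool_calls?: { function: { arguments: string } }[] | null;
};
type ChatCompletionLike = { choices: { message: ChatMessage }[] };

// Hypothetical helper: try tool arguments first, then legacy function
// arguments, then plain content.
function extractStructuredText(response: ChatCompletionLike): string {
  const message = response.choices[0]?.message;
  if (!message) throw new Error("Response contains no choices");

  const toolArgs = message.tool_calls?.[0]?.function.arguments;
  if (toolArgs !== undefined) return toolArgs;

  const fnArgs = message.function_call?.arguments;
  if (fnArgs !== undefined) return fnArgs;

  if (typeof message.content === "string") return message.content;
  throw new Error("No tool arguments, function arguments, or content found");
}

const toolResponse: ChatCompletionLike = {
  choices: [
    {
      message: {
        content: null,
        tool_calls: [{ function: { arguments: '{"sentiment":"positive"}' } }],
      },
    },
  ],
};
const contentResponse: ChatCompletionLike = {
  choices: [{ message: { content: '{"sentiment":"neutral"}' } }],
};

console.log(JSON.parse(extractStructuredText(toolResponse)).sentiment); // prints "positive"
console.log(JSON.parse(extractStructuredText(contentResponse)).sentiment); // prints "neutral"
```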
Streaming Utilities
Handle streaming responses with built-in utilities:
import { OAIStream, readableStreamToAsyncGenerator } from "zod-stream";
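// Create streaming response (assumes a framework whose route handlers return a Fetch API Response;
// `oai` and `params` are the client and request parameters from the earlier examples)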
app.post("/api/stream", async (req, res) => {
const completion = await oai.chat.completions.create({
...params,
stream: true
});
return new Response(
OAIStream({ res: completion })
);
});
// Convert stream to async generator
const generator = readableStreamToAsyncGenerator(stream);
for await (const chunk of generator) {
console.log(chunk);
}
Path Tracking Utilities
Monitor completion status of specific paths:
import { isPathComplete } from "zod-stream";
const activePath = ["analysis", "sentiment"];
const isComplete = isPathComplete(activePath, {
_meta: {
_completedPaths: [["analysis", "sentiment"]],
_activePath: ["analysis", "keywords"],
_isValid: false
}
});
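Conceptually, the check comes down to looking for an exact segment-by-segment match in `_completedPaths`. The sketch below is a hypothetical re-implementation for illustration (`pathsEqual` and `hasCompletedPath` are not zod-stream exports), and the library's own `isPathComplete` may behave differently in edge cases.

```typescript
type Path = (string | number)[];
type ChunkWithMeta = { _meta: { _completedPaths: Path[] } };

// Two paths match when they have the same length and identical segments.
// Strict equality means the string "0" and the number 0 are different segments.
function pathsEqual(a: Path, b: Path): boolean {
  return a.length === b.length && a.every((segment, i) => segment === b[i]);
}

// Hypothetical stand-in for a path-completion check.
function hasCompletedPath(path: Path, chunk: ChunkWithMeta): boolean {
  return chunk._meta._completedPaths.some((done) => pathsEqual(done, path));
}

const sample: ChunkWithMeta = {
  _meta: { _completedPaths: [["analysis", "sentiment"]] },
};

console.log(hasCompletedPath(["analysis", "sentiment"], sample)); // true
console.log(hasCompletedPath(["analysis", "keywords"], sample)); // false
console.log(hasCompletedPath(["analysis"], sample)); // false: a parent path is not an exact match
```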
Error Handling
zod-stream provides error handling at multiple levels:
const stream = await client.create({
completionPromise: async () => response.body,
response_model: { schema }
});
let finalResult;
// Track path completion while keeping a reference to the latest chunk
for await (const chunk of stream) {
finalResult = chunk;
// Check which paths are complete
console.log("Completed paths:", chunk._meta._completedPaths);
console.log("Current path:", chunk._meta._activePath);
}
// Final validation state is read from the last chunk once the stream completes
// (finalResult stays undefined if the stream yielded no chunks)
const isValid = finalResult?._meta._isValid ?? false;
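Two failure cases are worth handling explicitly after the loop: a stream that yields nothing, and a final chunk that never becomes valid. The guard below is a minimal sketch of that check. `requireValid` is a hypothetical helper and the error messages are illustrative; the sample objects stand in for real final chunks.

```typescript
type Meta = { _isValid: boolean };

// Hypothetical guard: returns the final chunk only if it exists and passed
// schema validation, otherwise throws a descriptive error.
function requireValid<T extends { _meta: Meta }>(result: T | undefined): T {
  if (result === undefined) {
    throw new Error("Stream ended without yielding any chunks");
  }
  if (!result._meta._isValid) {
    throw new Error("Final chunk failed schema validation");
  }
  return result;
}

// Stand-ins for the last chunk of two different streams.
const complete = { content: "done", _meta: { _isValid: true } };
const truncated = { content: "par", _meta: { _isValid: false } };

console.log(requireValid(complete).content); // prints "done"

try {
  requireValid(truncated);
} catch (error) {
  console.error((error as Error).message); // prints "Final chunk failed schema validation"
}
```

Wrapping the `for await` loop itself in `try`/`catch` additionally covers errors raised while the stream is being read.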