Examples
I'll break down each example into smaller, annotated snippets that are easier to understand and copy. I'll start with the first example:
1. Progressive Data Analysis
First, let's define the schema:
// Define the structure of our market analysis data
const analysisSchema = z.object({
marketData: z.object({
trends: z.array(z.object({
metric: z.string(),
value: z.number()
})),
summary: z.string()
}),
competitors: z.array(z.object({
name: z.string(),
strengths: z.array(z.string()),
weaknesses: z.array(z.string())
})),
recommendations: z.object({
immediate: z.array(z.string()),
longTerm: z.array(z.string()),
budget: z.number()
})
});
Then, handle the streaming data processing:
for await (const chunk of stream) {
// 1. Visualize market trends as soon as they're available
if (isPathComplete(['marketData', 'trends'], chunk)) {
initializeCharts(chunk.marketData.trends);
}
// 2. Process competitor data as each competitor entry completes
chunk._meta._completedPaths.forEach(path => {
if (path[0] === 'competitors' && path.length === 2) {
const competitor = chunk.competitors[path[1] as number];
fetchCompetitorData(competitor.name);
}
});
// 3. Handle budget planning when both requirements are met
if (isPathComplete(['recommendations', 'immediate'], chunk) &&
isPathComplete(['recommendations', 'budget'], chunk)) {
planBudgetAllocation({
actions: chunk.recommendations.immediate,
budget: chunk.recommendations.budget
});
}
}
2. Document Processing Pipeline
The schema:
const documentSchema = z.object({
metadata: z.object({
title: z.string(),
author: z.string(),
topics: z.array(z.string())
}),
sections: z.array(z.object({
heading: z.string(),
content: z.string(),
annotations: z.array(z.object({
type: z.string(),
text: z.string(),
confidence: z.number()
}))
})),
summary: z.object({
abstract: z.string(),
keyPoints: z.array(z.string()),
readingTime: z.number()
})
});
The processing logic:
for await (const chunk of stream) {
// 1. Start document indexing when metadata arrives
if (isPathComplete(['metadata'], chunk)) {
indexDocument({
title: chunk.metadata.title,
topics: chunk.metadata.topics
});
}
// 2. Process section annotations as they complete
chunk._meta._completedPaths.forEach(path => {
if (path[0] === 'sections' && isPathComplete([...path, 'annotations'], chunk)) {
const sectionIndex = path[1] as number;
const section = chunk.sections[sectionIndex];
processAnnotations({
heading: section.heading,
annotations: section.annotations
});
}
});
// 3. Generate preview when required fields are available
if (isPathComplete(['summary', 'abstract'], chunk) &&
isPathComplete(['summary', 'readingTime'], chunk)) {
generatePreview({
abstract: chunk.summary.abstract,
readingTime: chunk.summary.readingTime
});
}
}
3. E-commerce Product Enrichment
The schema:
const productSchema = z.object({
basic: z.object({
id: z.string(),
name: z.string(),
category: z.string()
}),
pricing: z.object({
base: z.number(),
discounts: z.array(z.object({
type: z.string(),
amount: z.number()
})),
final: z.number()
}),
inventory: z.object({
status: z.string(),
locations: z.array(z.object({
id: z.string(),
quantity: z.number()
}))
}),
enrichment: z.object({
seoDescription: z.string(),
searchKeywords: z.array(z.string()),
relatedProducts: z.array(z.string())
})
});
The processing logic:
for await (const chunk of stream) {
// 1. Initialize product display with basic info
if (isPathComplete(['basic'], chunk)) {
initializeProductCard(chunk.basic);
}
// 2. Handle pricing and inventory updates
if (isPathComplete(['pricing', 'final'], chunk)) {
updatePriceDisplay(chunk.pricing.final);
if (isPathComplete(['inventory', 'status'], chunk)) {
updateBuyButton({
price: chunk.pricing.final,
status: chunk.inventory.status
});
}
}
// 3. Handle SEO optimization
if (isPathComplete(['enrichment', 'seoDescription'], chunk) &&
isPathComplete(['enrichment', 'searchKeywords'], chunk)) {
optimizeProductSEO({
description: chunk.enrichment.seoDescription,
keywords: chunk.enrichment.searchKeywords
});
}
// 4. Handle related products
if (isPathComplete(['enrichment', 'relatedProducts'], chunk)) {
prefetchRelatedProducts(chunk.enrichment.relatedProducts);
}
}
Next.js API Route Example
The schema:
// schemas/analysis.ts
const schema = z.object({
summary: z.string(),
topics: z.array(z.string()),
sentiment: z.object({
score: z.number(),
label: z.string()
})
});
The API route:
// pages/api/extract.ts
import { withResponseModel, OAIStream } from "zod-stream";
import { z } from "zod";
import { analysisSchema } from "@/schemas/analysis";
// 2. Create the API handler
export default async function handler(req, res) {
const { content } = await req.json();
// 3. Configure the response model
const params = withResponseModel({
response_model: {
schema,
name: "ContentAnalysis"
},
mode: "TOOLS",
params: {
messages: [{
role: "user",
content: `Analyze: ${content}`
}],
model: "gpt-4"
}
});
// 4. Create and return the stream
const stream = await oai.chat.completions.create({
...params,
stream: true
});
return new Response(OAIStream({ res: stream }));
}
React Component Example
import { useJsonStream } from "stream-hooks";
import { analysisSchema } from "@/schemas/analysis";
import { z } from "zod";
function AnalysisComponent() {
const [data, setData] = useState<z.infer<typeof schema>>();
const {
loading,
error,
startStream
} = useJsonStream({
schema,
onReceive: (data) => {
setData(data)
}
});
return (
<div>
<button
onClick={() => startStream({
url: "/api/extract",
method: "POST",
body: { content: "..." }
})}
disabled={loading}
>
Start Analysis
</button>
{loading && <LoadingState paths={data._meta._completedPaths} />}
{error && <ErrorDisplay error={error} />}
<ProgressiveDisplay
data={data}
isComplete={data._meta._completedPaths.length > 0}
/>
</div>
);
}