What this recipe accomplishes
- Process large datasets in manageable chunks
- Execute inference requests in parallel
- Handle rate limits and backoff
- Recover from failures without losing progress
- Aggregate and report results
Prerequisites
- SDK installed and configured (see Authentication)
- A data plane ID where inference will run
- Understanding of Running Model Inference
Complete example
import { NarrativeApi } from '@narrative.io/data-collaboration-sdk-ts';
// Types
/** One unit of work in the batch: an opaque id plus the text to process. */
interface BatchItem {
  id: string;
  content: string;
}

/** Outcome of processing a single item, successful or not. */
interface ProcessedItem {
  id: string;
  // Structured model output on success; null on failure (see processItem).
  result: unknown;
  success: boolean;
  // Populated only when success is false.
  error?: string;
  // Wall-clock milliseconds for the final attempt, including polling.
  processingTime: number;
}

/** Running counters reported to the onProgress callback. */
interface BatchProgress {
  total: number;
  processed: number;
  succeeded: number;
  failed: number;
  // Epoch ms when the batch started; used to derive rate and ETA.
  startTime: number;
}
// Configuration
// Configuration — tune these per workload (see the table at the end of the page).
const CONFIG = {
  // NOTE(review): `!` assumes DATA_PLANE_ID is set; a missing value surfaces at request time.
  dataPlaneId: process.env.DATA_PLANE_ID!,
  model: 'anthropic.claude-haiku-4.5' as const, // Use faster model for batch
  concurrency: 5, // Parallel requests
  chunkSize: 50, // Items per chunk
  maxRetries: 3,
  retryDelayMs: 2000, // Base delay; doubled per retry (exponential backoff)
  pollIntervalMs: 2000, // How often waitForJob checks job status
  jobTimeoutMs: 60000, // Give up on a single job after this long
};

// Initialize API client.
// NOTE(review): `!` assumes NARRATIVE_API_KEY is set — verify in deployment env.
const api = new NarrativeApi({
  apiKey: process.env.NARRATIVE_API_KEY!,
});
// Output schema (customize for your use case)
// Output schema (customize for your use case).
// Passed to the model as output_format_schema so responses come back as
// structured JSON matching this shape.
const outputSchema = {
  type: 'object',
  properties: {
    category: { type: 'string' },
    score: { type: 'number', minimum: 0, maximum: 1 },
    summary: { type: 'string', maxLength: 100 },
  },
  // summary is optional in the model's output.
  required: ['category', 'score'],
};
/**
* Process a single item with retries
*/
/**
 * Process a single item, retrying transient failures with exponential backoff.
 *
 * @param item - The batch entry to run inference on.
 * @param retryCount - Attempt number to start from (used internally; defaults to 0).
 * @returns A ProcessedItem that is always resolved — failures are captured in
 *          `success`/`error` rather than thrown, so Promise.all never rejects.
 */
async function processItem(
  item: BatchItem,
  retryCount = 0
): Promise<ProcessedItem> {
  // Iterative retry loop (equivalent to the recursive formulation):
  // each pass is one attempt; timing restarts per attempt.
  let attempt = retryCount;
  for (;;) {
    const attemptStart = Date.now();
    try {
      // Submit inference request
      const job = await api.runModelInference({
        data_plane_id: CONFIG.dataPlaneId,
        model: CONFIG.model,
        messages: [
          { role: 'user', text: `Process this item:\n\nID: ${item.id}\nContent: ${item.content}` },
        ],
        inference_config: {
          output_format_schema: outputSchema,
          temperature: 0.2,
        },
        tags: ['batch-processing', `item-${item.id}`],
      });

      // Block until the job finishes (or times out inside waitForJob).
      const output = await waitForJob(job.id);

      return {
        id: item.id,
        result: output,
        success: true,
        processingTime: Date.now() - attemptStart,
      };
    } catch (error) {
      // Transient errors: back off exponentially and try again.
      if (attempt < CONFIG.maxRetries && isRetryableError(error)) {
        const backoff = CONFIG.retryDelayMs * 2 ** attempt;
        console.log(`Retrying item ${item.id} in ${backoff}ms (attempt ${attempt + 1})`);
        await sleep(backoff);
        attempt += 1;
        continue;
      }
      // Permanent failure (or retries exhausted): report it, don't throw.
      return {
        id: item.id,
        result: null,
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error',
        processingTime: Date.now() - attemptStart,
      };
    }
  }
}
/**
* Wait for job completion with timeout
*/
/**
 * Poll a job until it completes, fails, or exceeds CONFIG.jobTimeoutMs.
 *
 * @param jobId - Identifier returned by runModelInference.
 * @returns The job's structured output on completion.
 * @throws Error when the job reports `failed` or the timeout elapses.
 */
async function waitForJob(jobId: string): Promise<unknown> {
  const deadline = Date.now() + CONFIG.jobTimeoutMs;

  while (Date.now() < deadline) {
    const job = await api.getJob(jobId);

    if (job.state === 'completed' && job.result) {
      return job.result.structured_output;
    }
    if (job.state === 'failed') {
      throw new Error(`Job failed: ${JSON.stringify(job.failures)}`);
    }

    // Not finished yet — wait before the next status check.
    await sleep(CONFIG.pollIntervalMs);
  }

  throw new Error(`Job ${jobId} timed out`);
}
/**
* Check if error is retryable
*/
/**
 * Decide whether an error is worth retrying.
 *
 * Treats rate limits, timeouts, and 503/429 responses (detected by message
 * substring) as transient; anything else — including non-Error throwables —
 * is considered permanent.
 */
function isRetryableError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }
  const transientMarkers = ['rate limit', 'timeout', '503', '429'];
  const message = error.message;
  return transientMarkers.some((marker) => message.includes(marker));
}
/**
* Process items with controlled concurrency
*/
/**
 * Process items in waves of `concurrency` parallel requests.
 *
 * Each wave runs via Promise.all (processItem never rejects, so one failure
 * does not abort the wave), progress counters are updated after each wave,
 * and a short pause between waves provides coarse rate limiting.
 *
 * @param items - Work to process.
 * @param concurrency - Maximum in-flight requests at a time.
 * @param onProgress - Optional callback invoked after every wave.
 * @returns Results in the same order as `items`.
 */
async function processWithConcurrency(
  items: BatchItem[],
  concurrency: number,
  onProgress?: (progress: BatchProgress) => void
): Promise<ProcessedItem[]> {
  const completed: ProcessedItem[] = [];
  const progress: BatchProgress = {
    total: items.length,
    processed: 0,
    succeeded: 0,
    failed: 0,
    startTime: Date.now(),
  };

  let offset = 0;
  while (offset < items.length) {
    // Launch one wave of parallel requests.
    const wave = items.slice(offset, offset + concurrency);
    const waveResults = await Promise.all(wave.map((entry) => processItem(entry)));

    // Fold the wave into results and counters.
    for (const outcome of waveResults) {
      completed.push(outcome);
      progress.processed += 1;
      if (outcome.success) {
        progress.succeeded += 1;
      } else {
        progress.failed += 1;
      }
    }

    onProgress?.(progress);

    // Brief pause between waves (skipped after the final wave).
    offset += concurrency;
    if (offset < items.length) {
      await sleep(500);
    }
  }

  return completed;
}
/**
* Chunk an array into smaller arrays
*/
/**
 * Split an array into consecutive chunks of at most `size` elements.
 *
 * @param array - Source array (not mutated).
 * @param size - Maximum chunk length; must be a positive integer.
 * @returns Array of chunks; the last chunk may be shorter. Empty input yields [].
 * @throws RangeError if `size` is not a positive integer — previously a
 *         non-positive size made the loop never advance (infinite loop).
 */
function chunkArray<T>(array: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`chunk size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let i = 0; i < array.length; i += size) {
    chunks.push(array.slice(i, i + size));
  }
  return chunks;
}
/**
* Sleep helper
*/
/**
 * Resolve after roughly `ms` milliseconds (setTimeout-based delay).
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(() => resolve(), ms);
  });
}
/**
* Format duration
*/
/**
 * Render a millisecond duration as a short human-readable string:
 * minutes (>= 60s), seconds (>= 1s), or raw milliseconds.
 */
function formatDuration(ms: number): string {
  if (ms >= 60000) {
    return `${(ms / 60000).toFixed(1)}m`;
  }
  if (ms >= 1000) {
    return `${(ms / 1000).toFixed(1)}s`;
  }
  return `${ms}ms`;
}
/**
* Main batch processing function
*/
/**
 * Run the full batch: process all items with bounded parallelism, stream
 * progress to stdout, then print an aggregate report.
 *
 * Fix: an empty `items` array previously produced NaN in the average-time
 * and success-percentage calculations; it is now an explicit no-op.
 *
 * @param items - Batch entries to process.
 */
async function processBatch(items: BatchItem[]): Promise<void> {
  console.log(`\n=== Batch Processing ===`);
  console.log(`Items: ${items.length}`);
  console.log(`Concurrency: ${CONFIG.concurrency}`);
  console.log(`Model: ${CONFIG.model}\n`);

  // Guard: avoid division by zero (NaN avg / NaN%) in the report below.
  if (items.length === 0) {
    console.log('No items to process.');
    return;
  }

  const startTime = Date.now();

  // Process with single-line progress reporting (\r rewrites the line).
  const results = await processWithConcurrency(
    items,
    CONFIG.concurrency,
    (progress) => {
      const elapsed = Date.now() - progress.startTime;
      const rate = progress.processed / (elapsed / 1000);
      const eta = progress.processed > 0
        ? (progress.total - progress.processed) / rate
        : 0;
      process.stdout.write(
        `\rProgress: ${progress.processed}/${progress.total} ` +
        `(${progress.succeeded} ok, ${progress.failed} failed) ` +
        `Rate: ${rate.toFixed(1)}/s ETA: ${formatDuration(eta * 1000)} `
      );
    }
  );
  console.log('\n');

  // Aggregate results (results.length > 0 is guaranteed by the guard above).
  const totalTime = Date.now() - startTime;
  const succeeded = results.filter(r => r.success);
  const failed = results.filter(r => !r.success);
  const avgTime = results.reduce((sum, r) => sum + r.processingTime, 0) / results.length;

  console.log('=== Results ===');
  console.log(`Total time: ${formatDuration(totalTime)}`);
  console.log(`Processed: ${results.length}`);
  console.log(`Succeeded: ${succeeded.length} (${(succeeded.length / results.length * 100).toFixed(1)}%)`);
  console.log(`Failed: ${failed.length}`);
  console.log(`Avg processing time: ${formatDuration(avgTime)}`);
  console.log(`Throughput: ${(results.length / (totalTime / 1000)).toFixed(2)} items/sec`);

  // Show a sample of failures for quick triage.
  if (failed.length > 0) {
    console.log('\nFailed items:');
    failed.slice(0, 5).forEach(f => {
      console.log(` - ${f.id}: ${f.error}`);
    });
    if (failed.length > 5) {
      console.log(` ... and ${failed.length - 5} more`);
    }
  }
}
/**
* Checkpoint-based processing for very large batches
*/
/**
 * Checkpoint-based processing for very large batches.
 *
 * Persists the set of processed item ids to `checkpointFile` after every
 * chunk so an interrupted run can resume where it left off.
 *
 * Fixes:
 * - Only SUCCESSFUL items are checkpointed. Previously failed ids were also
 *   saved, so items that failed (even transiently, past the retry budget)
 *   were never retried on resume — defeating the recovery goal.
 * - The checkpoint file's JSON is validated to be an array before use; a
 *   corrupt file now falls back to a fresh start instead of odd Set contents.
 *
 * @param items - Full batch, including items possibly done in earlier runs.
 * @param checkpointFile - Path of the JSON checkpoint (array of ids).
 */
async function processWithCheckpoints(
  items: BatchItem[],
  checkpointFile: string
): Promise<void> {
  const fs = await import('fs').then(m => m.promises);

  // Load checkpoint if it exists and is well-formed.
  let processedIds: Set<string> = new Set();
  try {
    const checkpoint = await fs.readFile(checkpointFile, 'utf-8');
    const parsed: unknown = JSON.parse(checkpoint);
    if (!Array.isArray(parsed)) {
      throw new Error('checkpoint is not an array');
    }
    processedIds = new Set(parsed as string[]);
    console.log(`Resuming from checkpoint: ${processedIds.size} items already processed`);
  } catch {
    console.log('Starting fresh (no checkpoint found)');
  }

  // Skip anything already completed in a previous run.
  const remaining = items.filter(item => !processedIds.has(item.id));
  console.log(`Remaining items to process: ${remaining.length}`);

  // Process in chunks, saving the checkpoint after each one.
  const chunks = chunkArray(remaining, CONFIG.chunkSize);
  for (let i = 0; i < chunks.length; i++) {
    console.log(`\nProcessing chunk ${i + 1}/${chunks.length}...`);
    const results = await processWithConcurrency(chunks[i], CONFIG.concurrency);

    // Record only successes so failed items are retried on the next run.
    for (const r of results) {
      if (r.success) {
        processedIds.add(r.id);
      }
    }
    await fs.writeFile(checkpointFile, JSON.stringify([...processedIds]));
    console.log(`Checkpoint saved: ${processedIds.size} items`);
  }
}
// Example usage
/**
 * Example entry point: build 100 synthetic items and run them as a batch.
 */
async function main(): Promise<void> {
  // Generate sample items item-1 … item-100.
  const sampleItems: BatchItem[] = [];
  for (let n = 1; n <= 100; n++) {
    sampleItems.push({
      id: `item-${n}`,
      content: `Sample content for item ${n}`,
    });
  }
  await processBatch(sampleItems);
}

main().catch(console.error);
Key patterns
Controlled concurrency
Limit parallel requests to avoid overwhelming the system:
// Process in parallel batches
for (let i = 0; i < items.length; i += concurrency) {
const chunk = items.slice(i, i + concurrency);
await Promise.all(chunk.map(processItem));
}
Exponential backoff
Retry failed requests with increasing delays:
const delay = baseDelay * Math.pow(2, retryCount);
await sleep(delay);
Progress tracking
Report progress for visibility into long-running jobs:
onProgress({
processed: count,
total: items.length,
rate: count / elapsedSeconds,
});
Checkpoint recovery
Save progress for resumable processing:
// Save after each chunk
await fs.writeFile(checkpointFile, JSON.stringify([...processedIds]));
// Resume from checkpoint
const checkpoint = await fs.readFile(checkpointFile, 'utf-8');
const processedIds = new Set(JSON.parse(checkpoint));
Performance tuning
| Parameter | Effect | Recommendation |
|---|---|---|
| `concurrency` | Parallel requests | Start with 5, increase if system handles it |
| `chunkSize` | Items per checkpoint | 50-100 for good checkpoint granularity |
| `model` | Speed vs capability | Use Haiku for simple tasks, Sonnet for complex |
| `pollIntervalMs` | Status check frequency | 2000ms balances responsiveness and API calls |

