Skip to main content
This cookbook demonstrates patterns for efficiently processing large datasets with Model Inference. You’ll learn chunking strategies, parallel execution, rate limiting, error recovery, and result aggregation.

What this recipe accomplishes

  • Process large datasets in manageable chunks
  • Execute inference requests in parallel
  • Handle rate limits and backoff
  • Recover from failures without losing progress
  • Aggregate and report results

Prerequisites

Complete example

import { NarrativeApi } from '@narrative.io/data-collaboration-sdk-ts';

// Types

/** A single unit of work to run through the model. */
interface BatchItem {
  id: string;      // unique identifier; used for tags, logs, and checkpointing
  content: string; // raw text included in the prompt
}

/** Outcome of processing one BatchItem (success or failure). */
interface ProcessedItem {
  id: string;             // mirrors BatchItem.id
  result: unknown;        // structured model output on success; null on failure
  success: boolean;       // discriminates the two outcomes
  error?: string;         // error message, only present when success is false
  processingTime: number; // wall-clock ms for the (final) attempt
}

/** Mutable counters reported to progress callbacks during a run. */
interface BatchProgress {
  total: number;     // number of items in this run
  processed: number; // items finished so far (succeeded + failed)
  succeeded: number;
  failed: number;
  startTime: number; // epoch ms when the run began; used for rate/ETA
}

// Configuration
// Tuning knobs for the batch run; see the "Performance tuning" table below.
const CONFIG = {
  dataPlaneId: process.env.DATA_PLANE_ID!,
  model: 'anthropic.claude-haiku-4.5' as const, // Use faster model for batch
  concurrency: 5, // Parallel requests per wave
  chunkSize: 50, // Items per checkpointed chunk (processWithCheckpoints)
  maxRetries: 3, // Retries per item on transient errors
  retryDelayMs: 2000, // Base delay; doubled per retry (exponential backoff)
  pollIntervalMs: 2000, // How often to poll job status
  jobTimeoutMs: 60000, // Give up on a single job after this long
};

// Initialize API
// Single shared client; the API key is read from the environment.
const api = new NarrativeApi({
  apiKey: process.env.NARRATIVE_API_KEY!,
});

// Output schema (customize for your use case)
// JSON Schema passed as output_format_schema so the model returns
// structured output; only 'category' and 'score' are required.
const outputSchema = {
  type: 'object',
  properties: {
    category: { type: 'string' },
    score: { type: 'number', minimum: 0, maximum: 1 }, // normalized 0..1
    summary: { type: 'string', maxLength: 100 },       // optional short summary
  },
  required: ['category', 'score'],
};

/**
 * Run one item through the model, retrying transient failures with
 * exponential backoff.
 *
 * @param item - The work item to process.
 * @param retryCount - Attempt offset to start from (0 for a fresh item).
 * @returns A ProcessedItem describing success or the final failure.
 */
async function processItem(
  item: BatchItem,
  retryCount = 0
): Promise<ProcessedItem> {
  // Iterative retry loop; `attempt` plays the role of the retry counter.
  for (let attempt = retryCount; ; attempt++) {
    const startTime = Date.now();

    try {
      // Submit inference request
      const job = await api.runModelInference({
        data_plane_id: CONFIG.dataPlaneId,
        model: CONFIG.model,
        messages: [
          { role: 'user', text: `Process this item:\n\nID: ${item.id}\nContent: ${item.content}` },
        ],
        inference_config: {
          output_format_schema: outputSchema,
          temperature: 0.2,
        },
        tags: ['batch-processing', `item-${item.id}`],
      });

      // Block until the job finishes (or times out).
      const structured = await waitForJob(job.id);

      return {
        id: item.id,
        result: structured,
        success: true,
        processingTime: Date.now() - startTime,
      };
    } catch (error) {
      // Transient errors get another attempt with doubled delay each time.
      const canRetry = attempt < CONFIG.maxRetries && isRetryableError(error);
      if (canRetry) {
        const delay = CONFIG.retryDelayMs * Math.pow(2, attempt);
        console.log(`Retrying item ${item.id} in ${delay}ms (attempt ${attempt + 1})`);
        await sleep(delay);
        continue;
      }

      // Permanent failure (or retries exhausted): report it, don't throw.
      return {
        id: item.id,
        result: null,
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error',
        processingTime: Date.now() - startTime,
      };
    }
  }
}

/**
 * Poll a job until it completes, fails, or the configured timeout elapses.
 *
 * @param jobId - Identifier returned by runModelInference.
 * @returns The job's structured output on completion.
 * @throws Error when the job reports failure or the deadline passes.
 */
async function waitForJob(jobId: string): Promise<unknown> {
  const deadline = Date.now() + CONFIG.jobTimeoutMs;

  while (Date.now() < deadline) {
    const job = await api.getJob(jobId);

    if (job.state === 'failed') {
      throw new Error(`Job failed: ${JSON.stringify(job.failures)}`);
    }

    if (job.state === 'completed' && job.result) {
      return job.result.structured_output;
    }

    // Still pending/running: wait before the next status check.
    await sleep(CONFIG.pollIntervalMs);
  }

  throw new Error(`Job ${jobId} timed out`);
}

/**
 * Decide whether an error looks transient (rate limiting, timeouts,
 * temporary server errors) and is therefore worth retrying.
 *
 * @param error - Anything thrown by the API client.
 * @returns true only for Error instances whose message mentions a
 *          known transient condition.
 */
function isRetryableError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }
  // Substrings that indicate rate limits or temporary failures.
  const transientMarkers = ['rate limit', 'timeout', '503', '429'];
  return transientMarkers.some(marker => error.message.includes(marker));
}

/**
 * Process items in parallel waves of at most `concurrency` requests,
 * optionally reporting progress after each wave.
 *
 * @param items - All items to process.
 * @param concurrency - Maximum simultaneous in-flight requests.
 * @param onProgress - Called with cumulative counters after every wave.
 * @returns One ProcessedItem per input, in input order.
 */
async function processWithConcurrency(
  items: BatchItem[],
  concurrency: number,
  onProgress?: (progress: BatchProgress) => void
): Promise<ProcessedItem[]> {
  const completed: ProcessedItem[] = [];
  const progress: BatchProgress = {
    total: items.length,
    processed: 0,
    succeeded: 0,
    failed: 0,
    startTime: Date.now(),
  };

  for (let offset = 0; offset < items.length; offset += concurrency) {
    // One wave: up to `concurrency` items processed simultaneously.
    const wave = items.slice(offset, offset + concurrency);
    const waveResults = await Promise.all(wave.map(entry => processItem(entry)));

    // Fold the wave into the cumulative results and counters.
    waveResults.forEach(outcome => {
      completed.push(outcome);
      progress.processed++;
      if (outcome.success) {
        progress.succeeded++;
      } else {
        progress.failed++;
      }
    });

    onProgress?.(progress);

    // Brief pause between waves to stay friendly to rate limits
    // (skipped after the final wave).
    if (offset + concurrency < items.length) {
      await sleep(500);
    }
  }

  return completed;
}

/**
 * Split an array into consecutive chunks of at most `size` elements.
 * The last chunk may be shorter; an empty input yields an empty result.
 *
 * @param array - Source array (not mutated).
 * @param size - Maximum chunk length; must be a positive integer.
 * @returns Array of chunks in original order.
 * @throws RangeError if `size` is not a positive integer — previously a
 *         non-positive size made the loop spin forever because `i` never
 *         advanced past `array.length`.
 */
function chunkArray<T>(array: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(`chunk size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let i = 0; i < array.length; i += size) {
    chunks.push(array.slice(i, i + size));
  }
  return chunks;
}

/**
 * Resolve after roughly `ms` milliseconds (setTimeout-based delay).
 *
 * @param ms - Delay in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>(resolve => {
    setTimeout(() => resolve(), ms);
  });
}

/**
 * Render a millisecond duration in the most readable unit:
 * "Nms" under a second, "N.Ns" under a minute, "N.Nm" otherwise.
 *
 * @param ms - Duration in milliseconds.
 */
function formatDuration(ms: number): string {
  if (ms >= 60000) {
    return `${(ms / 60000).toFixed(1)}m`;
  }
  if (ms >= 1000) {
    return `${(ms / 1000).toFixed(1)}s`;
  }
  return `${ms}ms`;
}

/**
 * Main batch processing function: runs every item through the model with
 * bounded concurrency, streams progress to stdout, then prints a summary.
 *
 * Fix: the summary previously divided by results.length unconditionally,
 * so an empty input produced NaN averages and percentages; that case now
 * short-circuits with a clear message.
 *
 * @param items - Work items to process (may be empty).
 */
async function processBatch(items: BatchItem[]): Promise<void> {
  console.log(`\n=== Batch Processing ===`);
  console.log(`Items: ${items.length}`);
  console.log(`Concurrency: ${CONFIG.concurrency}`);
  console.log(`Model: ${CONFIG.model}\n`);

  const startTime = Date.now();

  // Process with progress reporting
  const results = await processWithConcurrency(
    items,
    CONFIG.concurrency,
    (progress) => {
      const elapsed = Date.now() - progress.startTime;
      const rate = progress.processed / (elapsed / 1000);
      // ETA is meaningless until at least one item has finished.
      const eta = progress.processed > 0
        ? (progress.total - progress.processed) / rate
        : 0;

      process.stdout.write(
        `\rProgress: ${progress.processed}/${progress.total} ` +
        `(${progress.succeeded} ok, ${progress.failed} failed) ` +
        `Rate: ${rate.toFixed(1)}/s ETA: ${formatDuration(eta * 1000)}  `
      );
    }
  );

  console.log('\n');

  const totalTime = Date.now() - startTime;

  // Guard: with zero results every ratio below is 0/0 (NaN).
  if (results.length === 0) {
    console.log('=== Results ===');
    console.log('No items to process.');
    return;
  }

  // Aggregate results
  const succeeded = results.filter(r => r.success);
  const failed = results.filter(r => !r.success);
  const avgTime = results.reduce((sum, r) => sum + r.processingTime, 0) / results.length;

  console.log('=== Results ===');
  console.log(`Total time: ${formatDuration(totalTime)}`);
  console.log(`Processed: ${results.length}`);
  console.log(`Succeeded: ${succeeded.length} (${(succeeded.length / results.length * 100).toFixed(1)}%)`);
  console.log(`Failed: ${failed.length}`);
  console.log(`Avg processing time: ${formatDuration(avgTime)}`);
  console.log(`Throughput: ${(results.length / (totalTime / 1000)).toFixed(2)} items/sec`);

  if (failed.length > 0) {
    // Show a small sample of failures so the log stays readable.
    console.log('\nFailed items:');
    failed.slice(0, 5).forEach(f => {
      console.log(`  - ${f.id}: ${f.error}`);
    });
    if (failed.length > 5) {
      console.log(`  ... and ${failed.length - 5} more`);
    }
  }

  // Return or store results as needed
  return;
}

/**
 * Checkpoint-based processing for very large batches.
 *
 * Persists the set of processed item ids to `checkpointFile` after each
 * chunk so an interrupted run can resume without redoing finished work.
 *
 * Fix: only SUCCESSFUL ids are checkpointed now. Previously failed ids
 * were recorded as processed too, so failed items were silently skipped
 * on resume instead of being retried — defeating the recovery goal.
 *
 * @param items - Full dataset; ids already in the checkpoint are skipped.
 * @param checkpointFile - Path to a JSON file holding an array of ids.
 */
async function processWithCheckpoints(
  items: BatchItem[],
  checkpointFile: string
): Promise<void> {
  const fs = await import('fs').then(m => m.promises);

  // Load checkpoint if exists
  let processedIds: Set<string> = new Set();
  try {
    const checkpoint = await fs.readFile(checkpointFile, 'utf-8');
    processedIds = new Set(JSON.parse(checkpoint));
    console.log(`Resuming from checkpoint: ${processedIds.size} items already processed`);
  } catch {
    // Missing or unreadable checkpoint: start from scratch.
    console.log('Starting fresh (no checkpoint found)');
  }

  // Filter already processed
  const remaining = items.filter(item => !processedIds.has(item.id));
  console.log(`Remaining items to process: ${remaining.length}`);

  // Process in chunks with checkpoints
  const chunks = chunkArray(remaining, CONFIG.chunkSize);

  for (let i = 0; i < chunks.length; i++) {
    console.log(`\nProcessing chunk ${i + 1}/${chunks.length}...`);

    const results = await processWithConcurrency(chunks[i], CONFIG.concurrency);

    // Only mark items that actually succeeded; failed items stay out of
    // the checkpoint so the next run retries them.
    results.filter(r => r.success).forEach(r => processedIds.add(r.id));
    await fs.writeFile(checkpointFile, JSON.stringify([...processedIds]));

    console.log(`Checkpoint saved: ${processedIds.size} items`);
  }
}

// Example usage
/** Build a small synthetic dataset and run it through processBatch. */
async function main(): Promise<void> {
  const itemCount = 100;
  const sampleItems: BatchItem[] = [];
  for (let n = 1; n <= itemCount; n++) {
    sampleItems.push({
      id: `item-${n}`,
      content: `Sample content for item ${n}`,
    });
  }

  await processBatch(sampleItems);
}

main().catch(console.error);

Key patterns

Controlled concurrency

Limit parallel requests to avoid overwhelming the system:
// Process in parallel batches
for (let i = 0; i < items.length; i += concurrency) {
  const chunk = items.slice(i, i + concurrency);
  await Promise.all(chunk.map(processItem));
}

Exponential backoff

Retry failed requests with increasing delays:
const delay = baseDelay * Math.pow(2, retryCount);
await sleep(delay);

Progress tracking

Report progress for visibility into long-running jobs:
onProgress({
  processed: count,
  total: items.length,
  rate: count / elapsedSeconds,
});

Checkpoint recovery

Save progress for resumable processing:
// Save after each chunk
await fs.writeFile(checkpointFile, JSON.stringify([...processedIds]));

// Resume from checkpoint
const checkpoint = await fs.readFile(checkpointFile, 'utf-8');
const processedIds = new Set(JSON.parse(checkpoint));

Performance tuning

| Parameter | Effect | Recommendation |
| --- | --- | --- |
| `concurrency` | Parallel requests | Start with 5, increase if system handles it |
| `chunkSize` | Items per checkpoint | 50-100 for good checkpoint granularity |
| `model` | Speed vs capability | Use Haiku for simple tasks, Sonnet for complex |
| `pollIntervalMs` | Status check frequency | 2000ms balances responsiveness and API calls |