This cookbook demonstrates how to build an automated data enrichment pipeline using Model Inference. You’ll learn to fetch records from a dataset, run inference to classify or enrich each record, and handle results at scale.

What this recipe accomplishes

  • Fetch sample records from a dataset
  • Run AI inference to enrich each record with classifications
  • Handle inference results with proper error handling
  • Process records in batches for efficiency

Prerequisites

  • SDK installed and configured (see Authentication)
  • A dataset with records to enrich
  • A data plane ID where inference will run

Complete example

import { NarrativeApi } from '@narrative.io/data-collaboration-sdk-ts';

// Types for our enrichment
interface RecordToEnrich {
  id: string;
  text: string;
  [key: string]: unknown;
}

interface EnrichmentResult {
  category: 'retail' | 'finance' | 'healthcare' | 'technology' | 'other';
  confidence: number;
  tags: string[];
  summary: string;
}

interface EnrichedRecord extends RecordToEnrich {
  enrichment: EnrichmentResult;
  enriched_at: string;
}

// Configuration
const CONFIG = {
  dataPlaneId: process.env.DATA_PLANE_ID!,
  datasetId: parseInt(process.env.DATASET_ID!),
  model: 'anthropic.claude-sonnet-4.5' as const,
  batchSize: 10,
  maxRetries: 3,
};

// Initialize API
const api = new NarrativeApi({
  apiKey: process.env.NARRATIVE_API_KEY!,
});

// Schema for the enrichment output
const enrichmentSchema = {
  type: 'object',
  properties: {
    category: {
      type: 'string',
      enum: ['retail', 'finance', 'healthcare', 'technology', 'other'],
      description: 'Primary category for this record',
    },
    confidence: {
      type: 'number',
      minimum: 0,
      maximum: 1,
      description: 'Confidence score for the classification',
    },
    tags: {
      type: 'array',
      items: { type: 'string' },
      maxItems: 5,
      description: 'Relevant tags for this record',
    },
    summary: {
      type: 'string',
      maxLength: 200,
      description: 'Brief summary of the record content',
    },
  },
  required: ['category', 'confidence', 'tags', 'summary'],
};

/**
 * Wait for an inference job to complete
 */
async function waitForInference(jobId: string, maxWaitMs = 60000): Promise<EnrichmentResult | null> {
  const startTime = Date.now();
  const pollInterval = 2000;

  while (Date.now() - startTime < maxWaitMs) {
    const job = await api.getJob(jobId);

    if (job.state === 'completed' && job.result) {
      return job.result.structured_output as EnrichmentResult;
    }

    if (job.state === 'failed') {
      console.error('Inference job failed:', job.failures);
      return null;
    }

    await new Promise(resolve => setTimeout(resolve, pollInterval));
  }

  throw new Error(`Job ${jobId} timed out`);
}

/**
 * Enrich a single record using inference, retrying transient failures
 * up to CONFIG.maxRetries times
 */
async function enrichRecord(record: RecordToEnrich): Promise<EnrichedRecord | null> {
  const prompt = `Analyze and categorize this record:

ID: ${record.id}
Content: ${record.text}

Classify into one of: retail, finance, healthcare, technology, or other.
Extract relevant tags and provide a brief summary.`;

  for (let attempt = 1; attempt <= CONFIG.maxRetries; attempt++) {
    try {
      const job = await api.runModelInference({
        data_plane_id: CONFIG.dataPlaneId,
        model: CONFIG.model,
        messages: [
          {
            role: 'system',
            text: 'You are a data classification assistant. Analyze records and provide structured classifications.',
          },
          { role: 'user', text: prompt },
        ],
        inference_config: {
          output_format_schema: enrichmentSchema,
          temperature: 0.3, // Lower temperature for consistent classifications
        },
        tags: ['enrichment', `dataset-${CONFIG.datasetId}`],
      });

      const result = await waitForInference(job.id);

      if (result) {
        return {
          ...record,
          enrichment: result,
          enriched_at: new Date().toISOString(),
        };
      }

      // Job failed outright; details were already logged by waitForInference
      return null;
    } catch (error) {
      console.error(`Attempt ${attempt}/${CONFIG.maxRetries} failed for record ${record.id}:`, error);
      if (attempt < CONFIG.maxRetries) {
        // Back off before retrying
        await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
      }
    }
  }

  return null;
}

/**
 * Process a single batch, enriching its records in parallel
 * (rate limiting between batches happens in the main pipeline loop)
 */
async function processBatch(records: RecordToEnrich[]): Promise<EnrichedRecord[]> {
  // Enrich all records in the batch concurrently
  const results = await Promise.all(records.map(enrichRecord));

  // Drop records whose enrichment failed, preserving input order
  return results.filter((r): r is EnrichedRecord => r !== null);
}

/**
 * Main enrichment pipeline
 */
async function runEnrichmentPipeline(): Promise<void> {
  console.log('Starting enrichment pipeline...');

  // Step 1: Fetch records from the dataset
  console.log(`Fetching sample from dataset ${CONFIG.datasetId}...`);
  const sample = await api.getDatasetSample(CONFIG.datasetId, 100);
  const records = sample.records as RecordToEnrich[];

  console.log(`Found ${records.length} records to process`);

  // Step 2: Process in batches
  const allEnriched: EnrichedRecord[] = [];

  for (let i = 0; i < records.length; i += CONFIG.batchSize) {
    const batch = records.slice(i, i + CONFIG.batchSize);
    const batchNumber = Math.floor(i / CONFIG.batchSize) + 1;
    const totalBatches = Math.ceil(records.length / CONFIG.batchSize);

    console.log(`Processing batch ${batchNumber}/${totalBatches}...`);

    const enriched = await processBatch(batch);
    allEnriched.push(...enriched);

    console.log(`  Enriched ${enriched.length}/${batch.length} records`);

    // Rate limiting between batches
    if (i + CONFIG.batchSize < records.length) {
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  }

  // Step 3: Report results
  console.log('\n=== Enrichment Complete ===');
  console.log(`Total records processed: ${records.length}`);
  console.log(`Successfully enriched: ${allEnriched.length}`);
  console.log(`Failed: ${records.length - allEnriched.length}`);

  // Category distribution
  const categoryCount = allEnriched.reduce((acc, r) => {
    acc[r.enrichment.category] = (acc[r.enrichment.category] || 0) + 1;
    return acc;
  }, {} as Record<string, number>);

  console.log('\nCategory distribution:');
  Object.entries(categoryCount)
    .sort((a, b) => b[1] - a[1])
    .forEach(([category, count]) => {
      console.log(`  ${category}: ${count}`);
    });

  // Average confidence (guard against dividing by zero when nothing was enriched)
  if (allEnriched.length > 0) {
    const avgConfidence = allEnriched.reduce((sum, r) => sum + r.enrichment.confidence, 0) / allEnriched.length;
    console.log(`\nAverage confidence: ${(avgConfidence * 100).toFixed(1)}%`);
  }
}

// Run the pipeline
runEnrichmentPipeline()
  .then(() => console.log('\nPipeline finished successfully'))
  .catch((error) => {
    console.error('Pipeline failed:', error);
    process.exit(1);
  });

How it works

  1. Fetch records: The pipeline retrieves sample records from your dataset using getDatasetSample()
  2. Enrich with inference: Each record is sent to Model Inference with a prompt asking for classification and tagging
  3. Structured output: The JSON Schema ensures every response includes category, confidence, tags, and summary
  4. Batch processing: Records within each batch are enriched in parallel; batches run sequentially, with a short pause between them for rate limiting
  5. Error handling: Failed enrichments are tracked and reported without stopping the pipeline
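The batch-slicing loop in the pipeline can be factored into a small standalone helper. This is a self-contained sketch (the `chunk` name is ours, not part of the SDK):

```typescript
// Split an array into consecutive batches of at most `size` elements
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Example: 25 records with a batch size of 10 -> batches of 10, 10, 5
const batches = chunk(Array.from({ length: 25 }, (_, i) => i), 10);
console.log(batches.map(b => b.length)); // [10, 10, 5]
```

With a helper like this, the main loop becomes `for (const batch of chunk(records, CONFIG.batchSize))`, which keeps the batch-number bookkeeping separate from the slicing arithmetic.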

Customizing the recipe

Different enrichment tasks

Change the schema and prompt for your use case:
// Sentiment analysis
const sentimentSchema = {
  type: 'object',
  properties: {
    sentiment: { type: 'string', enum: ['positive', 'negative', 'neutral'] },
    score: { type: 'number', minimum: -1, maximum: 1 },
    key_phrases: { type: 'array', items: { type: 'string' } }
  },
  required: ['sentiment', 'score']
};

Writing results back

Store enriched records in a new dataset:
// Create output dataset
const outputDataset = await api.createDataset({
  name: 'enriched_records',
  display_name: 'Enriched Records',
  schema: { /* schema matching EnrichedRecord */ }
});

// Write enriched data
// ... upload enriched records
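Whatever the upload path looks like, the nested `enrichment` object usually needs to be flattened into columns first. A hedged sketch of that transformation (the flat field names are illustrative, not an SDK contract):

```typescript
interface EnrichedRow {
  id: string;
  text: string;
  category: string;
  confidence: number;
  tags: string; // joined into one string for a flat, columnar layout
  summary: string;
  enriched_at: string;
}

// Flatten one enriched record into a row suitable for tabular upload
function toRow(record: {
  id: string;
  text: string;
  enriched_at: string;
  enrichment: { category: string; confidence: number; tags: string[]; summary: string };
}): EnrichedRow {
  return {
    id: record.id,
    text: record.text,
    category: record.enrichment.category,
    confidence: record.enrichment.confidence,
    tags: record.enrichment.tags.join(','),
    summary: record.enrichment.summary,
    enriched_at: record.enriched_at,
  };
}

const row = toRow({
  id: 'r1',
  text: 'Quarterly earnings report',
  enriched_at: '2024-01-01T00:00:00Z',
  enrichment: { category: 'finance', confidence: 0.92, tags: ['earnings', 'report'], summary: 'An earnings summary' },
});
console.log(row.tags); // 'earnings,report'
```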