import { NarrativeApi } from '@narrative.io/data-collaboration-sdk-ts';
// Types for our enrichment
/** A raw dataset record; `text` is the free-form content sent to the model. */
interface RecordToEnrich {
  id: string;
  text: string;
  // Records may carry arbitrary extra columns; they are passed through untouched.
  [key: string]: unknown;
}
/** Structured classification returned by the model (shape enforced by `enrichmentSchema`). */
interface EnrichmentResult {
  category: 'retail' | 'finance' | 'healthcare' | 'technology' | 'other';
  confidence: number; // 0..1 per the schema's minimum/maximum
  tags: string[]; // at most 5 entries per the schema
  summary: string; // at most 200 chars per the schema
}
/** The input record plus its classification and an ISO-8601 enrichment timestamp. */
interface EnrichedRecord extends RecordToEnrich {
  enrichment: EnrichmentResult;
  enriched_at: string;
}
// Configuration
// Pipeline settings, sourced from the environment.
// NOTE(review): the non-null assertions assume DATA_PLANE_ID, DATASET_ID and
// NARRATIVE_API_KEY are always set; if one is missing this crashes later with
// an opaque error — consider validating up front. TODO confirm deployment
// always provides them.
const CONFIG = {
  dataPlaneId: process.env.DATA_PLANE_ID!,
  // Explicit radix so the parse intent is unambiguous (ESLint `radix` rule).
  datasetId: parseInt(process.env.DATASET_ID!, 10),
  model: 'anthropic.claude-sonnet-4.5' as const,
  batchSize: 10, // records enriched concurrently per batch
  maxRetries: 3, // NOTE(review): declared but not referenced anywhere in this file
};
// Initialize API
const api = new NarrativeApi({
  apiKey: process.env.NARRATIVE_API_KEY!,
});
// Schema for the enrichment output
// Allowed classification categories — keep in sync with EnrichmentResult['category'].
const enrichmentCategories = ['retail', 'finance', 'healthcare', 'technology', 'other'];
// JSON Schema constraining the model's structured output; mirrors the
// EnrichmentResult interface field-for-field.
const enrichmentSchema = {
  type: 'object',
  required: ['category', 'confidence', 'tags', 'summary'],
  properties: {
    category: {
      type: 'string',
      enum: enrichmentCategories,
      description: 'Primary category for this record',
    },
    confidence: {
      type: 'number',
      minimum: 0,
      maximum: 1,
      description: 'Confidence score for the classification',
    },
    tags: {
      type: 'array',
      items: { type: 'string' },
      maxItems: 5,
      description: 'Relevant tags for this record',
    },
    summary: {
      type: 'string',
      maxLength: 200,
      description: 'Brief summary of the record content',
    },
  },
};
/**
 * Poll an inference job until it reaches a terminal state.
 *
 * @param jobId - Identifier returned by `runModelInference`.
 * @param maxWaitMs - Total polling time budget in milliseconds (default 60s).
 * @returns The structured output on success, or null when the job failed or
 *   completed without a result payload.
 * @throws Error when the job is still running after `maxWaitMs`.
 */
async function waitForInference(jobId: string, maxWaitMs = 60000): Promise<EnrichmentResult | null> {
  const startTime = Date.now();
  const pollInterval = 2000;
  while (Date.now() - startTime < maxWaitMs) {
    const job = await api.getJob(jobId);
    if (job.state === 'completed') {
      if (job.result) {
        return job.result.structured_output as EnrichmentResult;
      }
      // Previously a completed job with no result kept polling until the
      // deadline and surfaced as a misleading "timed out" error; treat it
      // as a failure instead.
      console.error(`Job ${jobId} completed without a result`);
      return null;
    }
    if (job.state === 'failed') {
      console.error('Inference job failed:', job.failures);
      return null;
    }
    // Cap the sleep so we never wait past the deadline.
    const remainingMs = maxWaitMs - (Date.now() - startTime);
    await new Promise(resolve => setTimeout(resolve, Math.min(pollInterval, Math.max(remainingMs, 0))));
  }
  throw new Error(`Job ${jobId} timed out`);
}
/**
 * Classify a single record through a model inference call.
 *
 * Builds a prompt from the record's id and text, submits it with the
 * structured-output schema, and waits for the job to finish.
 *
 * @param record - The record to classify.
 * @returns The record with its enrichment attached, or null when the
 *   inference fails or produces no result.
 */
async function enrichRecord(record: RecordToEnrich): Promise<EnrichedRecord | null> {
  const prompt = `Analyze and categorize this record:
ID: ${record.id}
Content: ${record.text}
Classify into one of: retail, finance, healthcare, technology, or other.
Extract relevant tags and provide a brief summary.`;
  try {
    const job = await api.runModelInference({
      data_plane_id: CONFIG.dataPlaneId,
      model: CONFIG.model,
      messages: [
        {
          role: 'system',
          text: 'You are a data classification assistant. Analyze records and provide structured classifications.',
        },
        { role: 'user', text: prompt },
      ],
      inference_config: {
        output_format_schema: enrichmentSchema,
        temperature: 0.3, // Lower temperature for consistent classifications
      },
      tags: ['enrichment', `dataset-${CONFIG.datasetId}`],
    });
    const result = await waitForInference(job.id);
    if (!result) {
      return null;
    }
    return {
      ...record,
      enrichment: result,
      enriched_at: new Date().toISOString(),
    };
  } catch (error) {
    console.error(`Failed to enrich record ${record.id}:`, error);
    return null;
  }
}
/**
* Process records in batches with rate limiting
*/
async function processBatch(records: RecordToEnrich[]): Promise<EnrichedRecord[]> {
const results: EnrichedRecord[] = [];
// Process in parallel within the batch
const promises = records.map(async (record) => {
const enriched = await enrichRecord(record);
if (enriched) {
results.push(enriched);
}
return enriched;
});
await Promise.all(promises);
return results;
}
/**
 * Main enrichment pipeline: fetch a dataset sample, enrich it in batches,
 * then print a summary report (counts, category distribution, confidence).
 */
async function runEnrichmentPipeline(): Promise<void> {
  console.log('Starting enrichment pipeline...');
  // Step 1: Fetch records from the dataset
  console.log(`Fetching sample from dataset ${CONFIG.datasetId}...`);
  const sample = await api.getDatasetSample(CONFIG.datasetId, 100);
  // NOTE(review): sample.records is cast without validation — assumes each
  // record has string `id` and `text`; verify against the dataset schema.
  const records = sample.records as RecordToEnrich[];
  console.log(`Found ${records.length} records to process`);
  // Step 2: Process in batches
  const allEnriched: EnrichedRecord[] = [];
  for (let i = 0; i < records.length; i += CONFIG.batchSize) {
    const batch = records.slice(i, i + CONFIG.batchSize);
    const batchNumber = Math.floor(i / CONFIG.batchSize) + 1;
    const totalBatches = Math.ceil(records.length / CONFIG.batchSize);
    console.log(`Processing batch ${batchNumber}/${totalBatches}...`);
    const enriched = await processBatch(batch);
    allEnriched.push(...enriched);
    console.log(`  Enriched ${enriched.length}/${batch.length} records`);
    // Rate limiting between batches
    if (i + CONFIG.batchSize < records.length) {
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  }
  // Step 3: Report results
  console.log('\n=== Enrichment Complete ===');
  console.log(`Total records processed: ${records.length}`);
  console.log(`Successfully enriched: ${allEnriched.length}`);
  console.log(`Failed: ${records.length - allEnriched.length}`);
  // Category distribution
  const categoryCount = allEnriched.reduce((acc, r) => {
    acc[r.enrichment.category] = (acc[r.enrichment.category] || 0) + 1;
    return acc;
  }, {} as Record<string, number>);
  console.log('\nCategory distribution:');
  Object.entries(categoryCount)
    .sort((a, b) => b[1] - a[1])
    .forEach(([category, count]) => {
      console.log(`  ${category}: ${count}`);
    });
  // Average confidence — guard the empty case, which previously printed "NaN%"
  // (division by zero when every record failed to enrich).
  if (allEnriched.length > 0) {
    const avgConfidence = allEnriched.reduce((sum, r) => sum + r.enrichment.confidence, 0) / allEnriched.length;
    console.log(`\nAverage confidence: ${(avgConfidence * 100).toFixed(1)}%`);
  }
}
// Run the pipeline as the script entry point; exit non-zero on any failure.
void (async () => {
  try {
    await runEnrichmentPipeline();
    console.log('\nPipeline finished successfully');
  } catch (error) {
    console.error('Pipeline failed:', error);
    process.exit(1);
  }
})();