This recipe demonstrates a complete automation workflow: uploading data, waiting for ingestion, running a query, and exporting results.

What this recipe accomplishes

  • Upload a CSV file to a dataset
  • Monitor the ingestion job until completion
  • Execute an NQL query against the ingested data
  • Export results to a local file

Prerequisites

  • Node.js 18 or later
  • TypeScript configured in your project
  • A Narrative API key with read/write permissions
  • An existing dataset to upload data to
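The example also assumes the SDK package is installed in your project. If it is not, installing it with npm (using the package name from the import in the example below) would look like:
npm install @narrative.io/data-collaboration-sdk-ts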

Complete example

Create a file called data-pipeline.ts:
import { NarrativeApi } from '@narrative.io/data-collaboration-sdk-ts';
import fs from 'fs';
import path from 'path';

// Configuration
const DATASET_ID = 12345;  // Replace with your dataset ID
const INPUT_FILE = './data/customer_events.csv';
const OUTPUT_FILE = './output/aggregated_events.csv';

// Initialize the SDK (fail fast if the API key is missing)
const apiKey = process.env.NARRATIVE_API_KEY;
if (!apiKey) {
  throw new Error('NARRATIVE_API_KEY environment variable is not set');
}
const api = new NarrativeApi({ apiKey });

/**
 * Wait for a job to complete with exponential backoff
 */
async function waitForJob(jobId: string, maxWaitMs = 300000): Promise<void> {
  const startTime = Date.now();
  let pollInterval = 2000;
  const maxInterval = 30000;

  console.log(`Waiting for job ${jobId}...`);

  while (Date.now() - startTime < maxWaitMs) {
    const job = await api.getJob(jobId);

    console.log(`  Status: ${job.state}`);

    if (job.state === 'completed') {
      console.log('Job completed successfully');
      return;
    }

    if (job.state === 'failed') {
      throw new Error(`Job failed: ${JSON.stringify(job.failures)}`);
    }

    await new Promise(resolve => setTimeout(resolve, pollInterval));
    pollInterval = Math.min(pollInterval * 1.5, maxInterval);
  }

  throw new Error(`Job ${jobId} timed out after ${maxWaitMs}ms`);
}

/**
 * Upload a file to a dataset
 */
async function uploadFile(datasetId: number, filePath: string): Promise<string> {
  const fileName = path.basename(filePath);
  const fileContent = fs.readFileSync(filePath);

  console.log(`Uploading ${fileName}...`);

  // Get upload URL
  const uploadUrl = await api.getUploadUrl(datasetId, fileName);

  // Upload to S3
  await api.uploadFileToS3(uploadUrl.url, fileContent, uploadUrl.headers);

  // Confirm upload
  await api.confirmUpload(datasetId, uploadUrl.upload_id);

  console.log(`Upload confirmed: ${uploadUrl.upload_id}`);
  return uploadUrl.upload_id;
}

/**
 * Run an aggregation query
 */
async function runQuery(datasetId: number): Promise<any[]> {
  console.log('Running aggregation query...');

  const result = await api.executeNql({
    nql: `
      SELECT
        sha256_hashed_email,
        COUNT(*) as event_count,
        MAX(event_timestamp) as last_seen,
        SUM(event_value) as total_value
      FROM company_data."dataset_${datasetId}"
      WHERE event_timestamp >= CURRENT_DATE - INTERVAL '30' DAY
      GROUP BY sha256_hashed_email
      ORDER BY event_count DESC
      LIMIT 10000
    `,
    data_plane_id: null,
  });

  if (result.state === 'failed') {
    throw new Error(`Query failed: ${JSON.stringify(result.failures)}`);
  }

  console.log(`Query completed: ${result.result.rows} rows`);

  // For this example, we return an empty array; the actual row data
  // would come from the result or a subsequent data retrieval step
  return [];
}

/**
 * Export results to CSV
 */
function exportToCsv(data: any[], outputPath: string): void {
  if (data.length === 0) {
    console.log('No data to export');
    return;
  }

  // Quote values that contain commas, quotes, or newlines so the CSV stays valid
  const escape = (value: unknown): string => {
    const s = String(value ?? '');
    return /[",\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
  };

  const headers = Object.keys(data[0]);
  const rows = data.map(row =>
    headers.map(h => escape(row[h])).join(',')
  );

  const csv = [headers.map(escape).join(','), ...rows].join('\n');

  // Ensure output directory exists
  const outputDir = path.dirname(outputPath);
  if (!fs.existsSync(outputDir)) {
    fs.mkdirSync(outputDir, { recursive: true });
  }

  fs.writeFileSync(outputPath, csv);
  console.log(`Exported ${data.length} rows to ${outputPath}`);
}

/**
 * Main pipeline function
 */
async function runPipeline() {
  console.log('Starting data pipeline...\n');

  try {
    // Step 1: Upload data
    console.log('=== Step 1: Upload Data ===');
    if (fs.existsSync(INPUT_FILE)) {
      const uploadId = await uploadFile(DATASET_ID, INPUT_FILE);

      // Wait for ingestion (if there's an associated job)
      // Note: You may need to track the ingestion job separately
      console.log(`Upload ${uploadId} complete\n`);
    } else {
      console.log(`Skipping upload: ${INPUT_FILE} not found\n`);
    }

    // Step 2: Run query
    console.log('=== Step 2: Run Query ===');
    const queryResults = await runQuery(DATASET_ID);
    console.log('Query complete\n');

    // Step 3: Export results
    console.log('=== Step 3: Export Results ===');
    exportToCsv(queryResults, OUTPUT_FILE);
    console.log('Export complete\n');

    console.log('Pipeline finished successfully!');

  } catch (error) {
    console.error('Pipeline failed:', error);
    process.exit(1);
  }
}

// Run the pipeline
runPipeline();

Running the pipeline

Set your environment variable and run:
export NARRATIVE_API_KEY="your-api-key-here"
npx ts-node data-pipeline.ts
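
The API key is already read from the environment. If you also want the dataset ID and file paths to vary per environment rather than living in hard-coded constants, one option is to read them the same way; the variable names below are only suggestions:
// Replace the hard-coded constants at the top of data-pipeline.ts with environment lookups
const DATASET_ID = Number(process.env.NARRATIVE_DATASET_ID ?? 12345);
const INPUT_FILE = process.env.PIPELINE_INPUT_FILE ?? './data/customer_events.csv';
const OUTPUT_FILE = process.env.PIPELINE_OUTPUT_FILE ?? './output/aggregated_events.csv';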

Scheduling with cron

To run the pipeline automatically, add an entry to your crontab:
# Run daily at 2 AM
0 2 * * * cd /path/to/project && NARRATIVE_API_KEY="your-key" npx ts-node data-pipeline.ts >> /var/log/pipeline.log 2>&1
Or schedule it in-process with node-cron:
import cron from 'node-cron';

// Run every day at 2 AM
cron.schedule('0 2 * * *', () => {
  runPipeline();
});
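This assumes the node-cron package is available in your project; if not, it can be added along with its type definitions:
npm install node-cron
npm install --save-dev @types/node-cron
Note that runPipeline() as written calls process.exit(1) on failure, which would stop the scheduler as well; for long-lived in-process scheduling you may want to log the error and return instead of exiting.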

Adding error notifications

Wrap the pipeline to send an alert on completion or failure:
async function runPipelineWithNotification() {
  try {
    await runPipeline();
    await sendNotification('Pipeline completed successfully');
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    await sendNotification(`Pipeline failed: ${message}`);
    throw error;
  }
}

async function sendNotification(message: string) {
  // Example: Send to a Slack incoming webhook
  const webhookUrl = process.env.SLACK_WEBHOOK_URL;
  if (!webhookUrl) {
    console.warn('SLACK_WEBHOOK_URL not set; skipping notification');
    return;
  }

  await fetch(webhookUrl, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ text: message }),
  });
}
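One caveat: runPipeline() as defined above handles its own errors and calls process.exit(1), so the failure notification in this wrapper only fires if you change that catch block to rethrow the error instead of exiting. With that change, the wrapper can become the script's entry point:
// Use the notification wrapper instead of calling runPipeline() directly
runPipelineWithNotification().catch(() => process.exit(1));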

Parallel processing

Process multiple datasets concurrently:
async function processMultipleDatasets(datasetIds: number[]) {
  const results = await Promise.allSettled(
    datasetIds.map(async (id) => {
      console.log(`Processing dataset ${id}...`);
      const data = await runQuery(id);
      exportToCsv(data, `./output/dataset_${id}.csv`);
      return { id, rows: data.length };
    })
  );

  const succeeded = results.filter(r => r.status === 'fulfilled');
  const failed = results.filter(r => r.status === 'rejected');

  console.log(`Completed: ${succeeded.length}/${results.length}`);
  if (failed.length > 0) {
    console.error('Failed datasets:', failed);
  }
}
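A minimal invocation might look like the following; the dataset IDs here are placeholders to replace with your own:
// Query and export several datasets in parallel
processMultipleDatasets([12345, 67890]).catch(error => {
  console.error('Batch processing failed:', error);
  process.exit(1);
});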

Handling incremental updates

Track the last run time so each run only queries events with timestamps after the previous run:
const STATE_FILE = './state/last_run.json';

function getLastRunTime(): string | null {
  try {
    const state = JSON.parse(fs.readFileSync(STATE_FILE, 'utf8'));
    return state.lastRun;
  } catch {
    return null;
  }
}

function saveLastRunTime(timestamp: string): void {
  const dir = path.dirname(STATE_FILE);
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir, { recursive: true });
  }
  fs.writeFileSync(STATE_FILE, JSON.stringify({ lastRun: timestamp }));
}

async function runIncrementalQuery(datasetId: number) {
  const lastRun = getLastRunTime();
  const now = new Date().toISOString();

  const whereClause = lastRun
    ? `WHERE event_timestamp > '${lastRun}'`
    : '';

  const result = await api.executeNql({
    nql: `
      SELECT *
      FROM company_data."dataset_${datasetId}"
      ${whereClause}
      LIMIT 100000
    `,
    data_plane_id: null,
  });

  // Only advance the saved timestamp if the query succeeded
  if (result.state === 'failed') {
    throw new Error(`Incremental query failed: ${JSON.stringify(result.failures)}`);
  }

  saveLastRunTime(now);
  return result;
}
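A sketch of wiring this into the same script, reusing the DATASET_ID constant from the main example and the state field the other helpers already check:
async function runIncremental() {
  const result = await runIncrementalQuery(DATASET_ID);
  console.log(`Incremental query finished with state: ${result.state}`);
}

runIncremental().catch(error => {
  console.error('Incremental run failed:', error);
  process.exit(1);
});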