What this recipe accomplishes
- Upload a CSV file to a dataset
- Monitor the ingestion job until completion
- Execute an NQL query against the ingested data
- Export results to a local file
Prerequisites
- Node.js 18 or later
- TypeScript configured in your project
- The SDK installed in your project (for example, npm install @narrative.io/data-collaboration-sdk-ts)
- A Narrative API key with read/write permissions
- An existing dataset to upload data to
Complete example
Create a file called data-pipeline.ts:
import { NarrativeApi } from '@narrative.io/data-collaboration-sdk-ts';
import fs from 'fs';
import path from 'path';
// Configuration
const DATASET_ID = 12345; // Replace with your dataset ID
const INPUT_FILE = './data/customer_events.csv';
const OUTPUT_FILE = './output/aggregated_events.csv';
// Initialize the SDK
const api = new NarrativeApi({
apiKey: process.env.NARRATIVE_API_KEY,
});
/**
* Wait for a job to complete with exponential backoff
*/
async function waitForJob(jobId: string, maxWaitMs = 300000): Promise<void> {
const startTime = Date.now();
let pollInterval = 2000;
const maxInterval = 30000;
console.log(`Waiting for job ${jobId}...`);
while (Date.now() - startTime < maxWaitMs) {
const job = await api.getJob(jobId);
console.log(` Status: ${job.state}`);
if (job.state === 'completed') {
console.log('Job completed successfully');
return;
}
if (job.state === 'failed') {
throw new Error(`Job failed: ${JSON.stringify(job.failures)}`);
}
await new Promise(resolve => setTimeout(resolve, pollInterval));
pollInterval = Math.min(pollInterval * 1.5, maxInterval);
}
throw new Error(`Job ${jobId} timed out after ${maxWaitMs}ms`);
}
/**
* Upload a file to a dataset
*/
async function uploadFile(datasetId: number, filePath: string): Promise<string> {
const fileName = path.basename(filePath);
const fileContent = fs.readFileSync(filePath);
console.log(`Uploading ${fileName}...`);
// Get upload URL
const uploadUrl = await api.getUploadUrl(datasetId, fileName);
// Upload to S3
await api.uploadFileToS3(uploadUrl.url, fileContent, uploadUrl.headers);
// Confirm upload
await api.confirmUpload(datasetId, uploadUrl.upload_id);
console.log(`Upload confirmed: ${uploadUrl.upload_id}`);
return uploadUrl.upload_id;
}
/**
* Run an aggregation query
*/
async function runQuery(datasetId: number): Promise<any[]> {
console.log('Running aggregation query...');
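  // Note: the column names below (sha256_hashed_email, event_timestamp,
  // event_value) are examples; adjust them to match your dataset's schema.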
const result = await api.executeNql({
nql: `
SELECT
sha256_hashed_email,
COUNT(*) as event_count,
MAX(event_timestamp) as last_seen,
SUM(event_value) as total_value
FROM company_data."dataset_${datasetId}"
WHERE event_timestamp >= CURRENT_DATE - INTERVAL '30' DAY
GROUP BY sha256_hashed_email
ORDER BY event_count DESC
LIMIT 10000
`,
data_plane_id: null,
});
if (result.state === 'failed') {
throw new Error(`Query failed: ${JSON.stringify(result.failures)}`);
}
console.log(`Query completed: ${result.result.rows} rows`);
// For this example we return an empty array; the actual row data would
// come from the result payload or a subsequent data-retrieval step.
return [];
}
/**
* Export results to CSV
*/
function exportToCsv(data: any[], outputPath: string): void {
if (data.length === 0) {
console.log('No data to export');
return;
}
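  // Note: values are not quoted or escaped, so this simple export assumes
  // fields contain no commas or newlines.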
const headers = Object.keys(data[0]);
const rows = data.map(row =>
headers.map(h => String(row[h] ?? '')).join(',')
);
const csv = [headers.join(','), ...rows].join('\n');
// Ensure output directory exists
const outputDir = path.dirname(outputPath);
if (!fs.existsSync(outputDir)) {
fs.mkdirSync(outputDir, { recursive: true });
}
fs.writeFileSync(outputPath, csv);
console.log(`Exported ${data.length} rows to ${outputPath}`);
}
/**
* Main pipeline function
*/
async function runPipeline() {
console.log('Starting data pipeline...\n');
try {
// Step 1: Upload data
console.log('=== Step 1: Upload Data ===');
if (fs.existsSync(INPUT_FILE)) {
const uploadId = await uploadFile(DATASET_ID, INPUT_FILE);
// Wait for ingestion (if there's an associated job)
// Note: You may need to track the ingestion job separately
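      // For example, if your ingestion workflow exposes a job id, you could
      // reuse waitForJob here (the ingestionJobId name is hypothetical):
      //   await waitForJob(ingestionJobId);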
console.log(`Upload complete (upload id: ${uploadId})\n`);
} else {
console.log(`Skipping upload: ${INPUT_FILE} not found\n`);
}
// Step 2: Run query
console.log('=== Step 2: Run Query ===');
const queryResults = await runQuery(DATASET_ID);
console.log('Query complete\n');
// Step 3: Export results
console.log('=== Step 3: Export Results ===');
exportToCsv(queryResults, OUTPUT_FILE);
console.log('Export complete\n');
console.log('Pipeline finished successfully!');
} catch (error) {
console.error('Pipeline failed:', error);
process.exit(1);
}
}
// Run the pipeline
runPipeline();
Running the pipeline
Set your environment variable and run:
export NARRATIVE_API_KEY="your-api-key-here"
npx ts-node data-pipeline.ts
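If you want a clearer error when the key is missing, a small guard near the top of data-pipeline.ts (before the NarrativeApi constructor) can fail fast:
// Fail fast if the API key is not configured
if (!process.env.NARRATIVE_API_KEY) {
  console.error('NARRATIVE_API_KEY is not set');
  process.exit(1);
}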
Scheduling with cron
To run automatically, add to crontab:
# Run daily at 2 AM
0 2 * * * cd /path/to/project && NARRATIVE_API_KEY="your-key" npx ts-node data-pipeline.ts >> /var/log/pipeline.log 2>&1
Alternatively, schedule the pipeline from inside a long-running Node.js process with node-cron (installed separately, for example with npm install node-cron):
import cron from 'node-cron';
// Run every day at 2 AM
cron.schedule('0 2 * * *', () => {
runPipeline();
});
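Note that this in-process schedule only fires while the Node.js process stays alive, and runPipeline calls process.exit(1) on failure, which would also stop the scheduler. A sketch that keeps the schedule running across failed runs (assuming you change runPipeline to rethrow instead of exiting):
cron.schedule('0 2 * * *', async () => {
  try {
    await runPipeline(); // assumes runPipeline rethrows instead of exiting
  } catch (error) {
    console.error('Scheduled run failed:', error);
  }
});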
Adding error notifications
Send alerts on failure:
// Note: for the catch branch below to run, runPipeline must rethrow on
// failure instead of calling process.exit(1) as it does in the main example.
async function runPipelineWithNotification() {
  try {
    await runPipeline();
    await sendNotification('Pipeline completed successfully');
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    await sendNotification(`Pipeline failed: ${message}`);
    throw error;
  }
}
async function sendNotification(message: string) {
  // Example: Send to Slack via an incoming webhook
  const webhookUrl = process.env.SLACK_WEBHOOK_URL;
  if (!webhookUrl) {
    console.warn('SLACK_WEBHOOK_URL is not set; skipping notification');
    return;
  }
  await fetch(webhookUrl, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ text: message }),
  });
}
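To use the wrapper, replace the final runPipeline() call in data-pipeline.ts:
// Run with notifications instead of calling runPipeline() directly
runPipelineWithNotification().catch(() => process.exit(1));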
Parallel processing
Process multiple datasets concurrently:
async function processMultipleDatasets(datasetIds: number[]) {
const results = await Promise.allSettled(
datasetIds.map(async (id) => {
console.log(`Processing dataset ${id}...`);
const data = await runQuery(id);
exportToCsv(data, `./output/dataset_${id}.csv`);
return { id, rows: data.length };
})
);
const succeeded = results.filter(r => r.status === 'fulfilled');
const failed = results.filter(r => r.status === 'rejected');
console.log(`Completed: ${succeeded.length}/${results.length}`);
if (failed.length > 0) {
console.error('Failed datasets:', failed);
}
}
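For example, to process several datasets in one run (the dataset IDs below are placeholders):
processMultipleDatasets([12345, 12346, 12347])
  .catch(error => console.error('Unexpected error:', error));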
Handling incremental updates
Track last run time for incremental processing:
const STATE_FILE = './state/last_run.json';
function getLastRunTime(): string | null {
try {
const state = JSON.parse(fs.readFileSync(STATE_FILE, 'utf8'));
return state.lastRun;
} catch {
return null;
}
}
function saveLastRunTime(timestamp: string): void {
const dir = path.dirname(STATE_FILE);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
fs.writeFileSync(STATE_FILE, JSON.stringify({ lastRun: timestamp }));
}
async function runIncrementalQuery(datasetId: number) {
const lastRun = getLastRunTime();
const now = new Date().toISOString();
const whereClause = lastRun
? `WHERE event_timestamp > '${lastRun}'`
: '';
const result = await api.executeNql({
nql: `
SELECT *
FROM company_data."dataset_${datasetId}"
${whereClause}
LIMIT 100000
`,
data_plane_id: null,
});
saveLastRunTime(now);
return result;
}
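A minimal way to call it, reusing DATASET_ID from the main example and assuming the same result shape as the runQuery function above:
async function runIncrementalPipeline() {
  const result = await runIncrementalQuery(DATASET_ID);
  if (result.state === 'failed') {
    throw new Error(`Incremental query failed: ${JSON.stringify(result.failures)}`);
  }
  console.log('Incremental query completed; last run time updated');
}
The timestamp is captured before the query runs, so events that arrive while the query executes are not skipped on the next run. If you only want to advance it on successful runs, move the saveLastRunTime call behind a check on result.state.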

