If you’re running multi-step data operations manually—creating datasets, refreshing views, then building derived tables—workflows let you define the entire pipeline once and execute it automatically. Tasks run in sequence, each waiting for the previous one to finish before starting.
For a deeper understanding of how workflows relate to the jobs system, see Workflow Orchestration. For complete API details, see the Workflows API Reference.

Prerequisites

What you’ll learn

  • How to define a workflow specification in YAML
  • How to create and trigger workflows via the API
  • How to chain multiple tasks with dependencies
  • How to pass data between tasks using export and variable expressions
  • How to schedule workflows for automatic execution
  • How to monitor workflow runs and troubleshoot failures

Defining a workflow

Workflows use the Serverless Workflow DSL in YAML format. A workflow is a list of tasks that execute sequentially in the order you define them.

Minimal example

Here’s the simplest possible workflow—a single task that creates a materialized view:
document:
  dsl: '1.0.0'
  namespace: analytics
  name: daily-user-summary
  version: '1.0.0'
do:
  - createUserSummary:
      call: CreateMaterializedViewIfNotExists
      with:
        nql: |
          SELECT
            user_id,
            COUNT(1) as event_count
          FROM events
          GROUP BY user_id
        datasetName: user_event_summary
Every workflow has two sections:
Section    Purpose
document   Metadata: DSL version, namespace, name, and version
do         Ordered list of tasks to execute
Each task has a unique name (the key), a call specifying which task to run, and with providing the task’s parameters.
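In schematic form, every task follows the same shape. This is a sketch, not a runnable workflow; the placeholder names are illustrative, and the keys under with vary by task:

```yaml
do:
  - taskName:                  # unique task name (the key)
      call: TaskToRun          # one of the supported tasks listed below
      with:                    # the task's parameters; keys vary by task
        parameterName: value
```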

Supported tasks

Workflows support three operations:
Task                               Description
CreateMaterializedViewIfNotExists  Creates a new materialized view from an NQL query
RefreshMaterializedView            Refreshes an existing materialized view with the latest data
ExecuteDml                         Executes a DML statement (INSERT, UPDATE, DELETE)
Direct ExecuteNQL is not supported. To execute a SELECT query and persist results, use CreateMaterializedViewIfNotExists.
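For completeness, an ExecuteDml task uses the same nql parameter as the other tasks. The statement and table name here are illustrative:

```yaml
do:
  - pruneOldLogs:
      call: ExecuteDml
      with:
        nql: |
          DELETE FROM company_data.pipeline_log
          WHERE status = 'stale'
```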

Creating and running a workflow

Step 1: Create the workflow

Send your YAML specification to the API:
curl -X POST https://api.narrative.io/workflows \
  -H "Authorization: Bearer $NIO_API_TOKEN" \
  -H "Content-Type: application/x-yaml" \
  -d @my-workflow.yaml
The response includes a workflow_id that you’ll use for all subsequent operations.

Step 2: Trigger a run

Execute the workflow on a specific data plane:
curl -X POST https://api.narrative.io/workflows/{workflow_id}/trigger \
  -H "Authorization: Bearer $NIO_API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"data_plane_id": 123}'
Each trigger creates a new run with its own run_id. The workflow’s tasks execute sequentially—each task waits for the previous one to complete before starting.

Step 3: Check the status

Monitor the run to see how it’s progressing:
curl https://api.narrative.io/workflows/{workflow_id}/runs \
  -H "Authorization: Bearer $NIO_API_TOKEN"
Each task in a workflow creates its own job in the underlying jobs system. Jobs created by workflows are automatically tagged with workflow_enqueued, making them easy to filter when checking job history.

Chaining dependent tasks

The real power of workflows is chaining operations that depend on each other. Tasks run in order, so later tasks can reference datasets created by earlier ones.

Example: Create a source dataset, then derive from it

document:
  dsl: '1.0.0'
  namespace: analytics
  name: user-comparison-workflow
  version: '1.0.0'
do:
  - createSourceData:
      call: CreateMaterializedViewIfNotExists
      with:
        nql: |
          SELECT
            id,
            name,
            email
          FROM users
          WHERE active = true
        datasetName: active_users_source
  - createDerivedView:
      call: CreateMaterializedViewIfNotExists
      with:
        nql: |
          SELECT
            id,
            name
          FROM company_data.active_users_source
          WHERE name IS NOT NULL
        datasetName: active_users_with_names
The second task references company_data.active_users_source—the dataset created by the first task. The workflow ensures the first task completes before the second one starts.
When referencing datasets created by earlier tasks, use the fully qualified path: company_data.dataset_name.

Example: Refresh multiple views in sequence

document:
  dsl: '1.0.0'
  namespace: etl
  name: daily-refresh
  version: '1.0.0'
do:
  - refreshSource:
      call: RefreshMaterializedView
      with:
        datasetName: raw_events
  - refreshAggregates:
      call: RefreshMaterializedView
      with:
        datasetName: event_aggregates
This ensures raw_events is refreshed before event_aggregates, which may depend on it.

Passing data between tasks

Beyond referencing datasets by name, workflows can pass structured data between tasks using export and variable expressions. This is useful when a downstream task needs metadata from an upstream task — like a dataset ID, row count, or status value.
For the complete syntax reference, see Task output, Export, and Variable expressions in the specification reference.

Exporting task output to workflow context

Every task produces a JSON output object after execution. Use export.as with a jq expression to capture values into the workflow context ($context):
- createView:
    call: CreateMaterializedViewIfNotExists
    with:
      nql: "SELECT user_id, email FROM company_data.users WHERE is_active = true"
      datasetName: active_users
    export:
      as: '$context + { datasetId: .dataset_id }'
After this task runs, $context.datasetId contains the ID of the created dataset.
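In jq, the + operator on two objects performs a shallow merge, with keys from the right-hand object winning. A Python sketch of what the export.as expression above computes (the field values are illustrative):

```python
# What '$context + { datasetId: .dataset_id }' evaluates to, in Python terms.
context = {"someEarlierExport": "value"}   # $context before this task (illustrative)
task_output = {"dataset_id": 42}           # the task's output (.), value illustrative

# jq's object '+' is a shallow merge; right-hand keys overwrite left-hand ones.
new_context = {**context, "datasetId": task_output["dataset_id"]}
print(new_context)
# {'someEarlierExport': 'value', 'datasetId': 42}
```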

Using variable expressions in parameters

Use ${…} in parameter values to inject data from the previous task’s output (.) or the workflow context ($context). Use jq string interpolation \(expr) to embed values inside strings:
- logResult:
    call: ExecuteDml
    with:
      nql: |
        ${"INSERT INTO company_data.pipeline_log (dataset_id, status) VALUES (\($context.datasetId), 'created')"}
A pure ${…} expression (as the entire parameter value) preserves the JSON type. When used inside a string or with ${"..."}, the result is always a string.
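As a rough Python analogy (the values are illustrative), the difference is the same as passing a value through directly versus formatting it into a string:

```python
dataset_id = 42  # numeric value from a task's output (illustrative)

# Pure expression: the whole parameter is '${ .dataset_id }' -> the JSON number survives.
pure_value = dataset_id

# Interpolation: '${"... \(.dataset_id) ..."}' -> the result is always a string.
sql = f"INSERT INTO company_data.pipeline_log (dataset_id, status) VALUES ({dataset_id}, 'created')"

print(type(pure_value).__name__)  # int
print(sql)
```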

Complete example

This workflow creates a materialized view, captures its dataset ID in the workflow context, then logs the operation using both direct output access and context:
document:
  dsl: '1.0.0'
  namespace: etl
  name: create-and-log
  version: '1.0.0'
do:
  - createView:
      call: CreateMaterializedViewIfNotExists
      with:
        nql: "SELECT user_id, email FROM company_data.users WHERE is_active = true"
        datasetName: active_users
      export:
        as: '$context + { datasetId: .dataset_id }'
  - logCreatedDataset:
      call: ExecuteDml
      with:
        nql: |
          ${"INSERT INTO company_data.pipeline_log (dataset_id, status) VALUES (\(.dataset_id), 'created')"}
  - logFromContext:
      call: ExecuteDml
      with:
        nql: |
          ${"INSERT INTO company_data.pipeline_log (dataset_id, status) VALUES (\($context.datasetId), 'created')"}
Here’s what happens at each step:
  1. createView creates the materialized view. Its output includes dataset_id. The export.as expression merges this into $context as datasetId.
  2. logCreatedDataset uses .dataset_id — the direct output of the previous task — in a variable expression to insert a log record.
  3. logFromContext uses $context.datasetId to access the same value via the accumulated workflow context. This is useful when the value was exported several tasks ago and is no longer in the immediately preceding output.

Scheduling workflows

Instead of triggering runs manually, you can set workflows to run on a schedule using cron expressions. Add a schedule block to your workflow specification:
document:
  dsl: '1.0.0'
  namespace: etl
  name: hourly-refresh
  version: '1.0.0'
schedule:
  cron: '0 * * * *'
do:
  - refreshData:
      call: RefreshMaterializedView
      with:
        datasetName: hourly_metrics

Common schedules

Expression   Schedule
0 * * * *    Every hour
0 0 * * *    Daily at midnight
0 0 * * 0    Weekly on Sunday
0 0 1 * *    Monthly on the 1st
Run your workflow manually at least once before adding a schedule. This lets you verify it works correctly without waiting for the next scheduled run.
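The expressions above use the standard five cron fields: minute, hour, day of month, month, and day of week. A small helper sketch, hypothetical and not part of the API, for sanity-checking an expression before adding it to a specification:

```python
def cron_fields(expr: str) -> dict:
    """Split a standard 5-field cron expression into named fields (helper is illustrative)."""
    parts = expr.split()
    if len(parts) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(parts)}")
    minute, hour, dom, month, dow = parts
    return {"minute": minute, "hour": hour, "day_of_month": dom,
            "month": month, "day_of_week": dow}

print(cron_fields("0 0 * * 0"))
# {'minute': '0', 'hour': '0', 'day_of_month': '*', 'month': '*', 'day_of_week': '0'}
```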

Handling errors

Workflows use fail-fast behavior: if any task fails, execution stops immediately and remaining tasks are skipped. The workflow run status becomes failed. To investigate a failure:
  1. Check the workflow run status via the runs endpoint
  2. Identify which task failed from the run details
  3. Query the individual job using its job_id for detailed error information

Common errors

Error                           Cause                                   Solution
Unsupported task                Using a task not in the supported list  Use only CreateMaterializedViewIfNotExists, RefreshMaterializedView, or ExecuteDml
Missing required field          Incomplete YAML specification           Ensure document, dsl, namespace, name, version, and do are all present
Invalid workflow specification  Malformed YAML                          Validate your YAML syntax before submitting

Best practices

  • Name workflows clearly. Use descriptive names that explain what the workflow does (e.g., daily-audience-refresh rather than workflow-1).
  • Version your workflows. Increment the version when you change the specification to maintain a clear history.
  • Start with a single task. Build and test one task at a time before assembling multi-step pipelines.
  • Test before scheduling. Run workflows manually to confirm they work before setting up automatic execution.

Limitations

  • Sequential only — tasks execute one at a time in order; parallel execution is not supported
  • No conditional logic — all tasks run unconditionally; there is no if/else branching
  • No loops — iterative operations are not supported
  • No automatic retries — failed tasks are not retried automatically