This page is for advanced builders who want to review and edit workflows in code.

Overview

Tracecat workflows are defined in YAML. This page describes the fields a workflow definition can contain.

Workflow schema

The workflow body uses these top-level fields:
title:
description:
entrypoint:
config:
triggers:
error_handler:
actions:
returns:
A minimal workflow looks like this:
title: "Minimal workflow"
description: "Return a static value"
entrypoint:
  expects: {}
actions:
  - ref: build_result
    action: core.transform.reshape
    args:
      value:
        status: ok
returns: ${{ ACTIONS.build_result.result }}

Top-level fields

title
string
Workflow name.
description
string
Workflow description.
entrypoint
object
Input schema. Use expects to define trigger inputs in the workflow definition.
config
object
Workflow settings. Supported fields are environment and timeout.
triggers
list[object]
Optional trigger definitions. Trigger types are webhook and schedule.
error_handler
string
Workflow alias to run when the workflow fails.
actions
list[object]
Workflow actions.
returns
any
Workflow output. Accepts a literal value, a single expression, or a mapping of expressions.
Example input schema:
entrypoint:
  expects:
    alert_id:
      type: str
      description: The alert to investigate
    severity:
      type: enum["low", "medium", "high"]
      default: medium
Example output schema:
returns:
  finding_count: ${{ FN.length(ACTIONS.gather_results.result) }}
  findings: ${{ ACTIONS.gather_results.result }}
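
The optional config and error_handler fields sit alongside the required fields. The sketch below is illustrative only: the environment name, the timeout value and its unit (assumed here to be seconds), and the error-handler alias are hypothetical and do not come from a real workspace.

```yaml
title: "Workflow with settings"
description: "Static result with config and an error handler"
entrypoint:
  expects: {}
config:
  environment: production          # hypothetical secrets environment name
  timeout: 300                     # unit assumed to be seconds
error_handler: notify_on_failure   # hypothetical alias of another workflow
actions:
  - ref: build_result
    action: core.transform.reshape
    args:
      value:
        status: ok
returns: ${{ ACTIONS.build_result.result }}
```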

Actions schema

ref
string
Unique action identifier. It must be unique within the workflow and match Tracecat’s slug format.
action
string
Fully qualified action name such as core.http_request, core.transform.reshape, core.transform.scatter, core.loop.end, or tools.slack.post_message.
args
mapping
Action inputs. The valid keys depend on the action type.
depends_on
list[string]
Dependencies for this action. Use upstream_ref for a success edge, upstream_ref.success for an explicit success edge, and upstream_ref.error for an error edge. Any other suffix is invalid.
run_if
expression
Conditional expression. If it evaluates to a falsy value, the action is skipped.
for_each
expression | list[expression]
Run the action once per item in a collection. Inside the action, access the current item through var.<name>. Example: for_each: ${{ for var.alert in TRIGGER.alerts }}.
retry_policy
object
Retry configuration with max_attempts and timeout.
start_delay
float
Delay in seconds before the action starts.
join_strategy
string
Join behavior when the action has more than one upstream branch. Use all to wait for all upstream branches, or any to let the join complete once any upstream branch completes.
environment
expression | string
Per-action override for the secrets environment.
interaction
object
Marks an action as interactive. interaction cannot be combined with for_each.
Example action:
- ref: fetch_alert
  action: core.http_request
  args:
    url: "https://api.example.com/alerts/${{ TRIGGER.alert_id }}"
    method: GET
    headers:
      Authorization: "Bearer ${{ SECRETS.alerting.API_TOKEN }}"
  retry_policy:
    max_attempts: 3
    timeout: 60
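
To show the depends_on suffixes and join_strategy working together, here is a sketch of a success path and an error path that converge on one action. The refs mark_ok, mark_failed, and finish are hypothetical, as is the URL. Only the field names and edge suffixes are taken from the schema above.

```yaml
actions:
  - ref: fetch_alert
    action: core.http_request
    args:
      url: "https://api.example.com/alerts/${{ TRIGGER.alert_id }}"
      method: GET
  - ref: mark_ok
    action: core.transform.reshape
    depends_on:
      - fetch_alert          # success edge, same as fetch_alert.success
    args:
      value: ok
  - ref: mark_failed
    action: core.transform.reshape
    depends_on:
      - fetch_alert.error    # error edge
    args:
      value: failed
  - ref: finish
    action: core.transform.reshape
    depends_on:
      - mark_ok
      - mark_failed
    join_strategy: any       # do not wait for both branches
    args:
      value: done
```

The join uses any because the two branches follow opposite edges of the same upstream action, so waiting for all of them would not be the intent here.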

Expressions

Expressions use ${{ ... }}. Use these references inside expressions:
  • TRIGGER.<field>
  • ACTIONS.<ref>.result
  • SECRETS.<name>.<KEY>
  • VARS.<name>.<key>
  • ENV.<field>
  • var.<name>
  • FN.<name>(...)
See Expressions for syntax, operators, and literals. See JSONPath for field access and array access. See Functions for the full function list.
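
As a sketch of how these references combine, the fragment below uses TRIGGER, var, VARS, SECRETS, ACTIONS, and FN in two actions. The variable alerting.base_url, the id field on each alert, and the ref names are assumptions made for illustration.

```yaml
actions:
  - ref: fetch_alerts
    action: core.http_request
    for_each: ${{ for var.alert in TRIGGER.alerts }}
    args:
      # VARS.alerting.base_url is a hypothetical workspace variable
      url: "${{ VARS.alerting.base_url }}/alerts/${{ var.alert.id }}"
      method: GET
      headers:
        Authorization: "Bearer ${{ SECRETS.alerting.API_TOKEN }}"
  - ref: summarize
    action: core.transform.reshape
    depends_on:
      - fetch_alerts
    args:
      value:
        requested: ${{ FN.length(TRIGGER.alerts) }}
        responses: ${{ ACTIONS.fetch_alerts.result }}
```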

Control-flow primitives

Conditional execution with run_if

title: "Conditional workflow"
description: "Only notify on high severity"
entrypoint:
  expects:
    severity:
      type: str
actions:
  - ref: build_message
    action: core.transform.reshape
    args:
      value: "Severity is ${{ TRIGGER.severity }}"
  - ref: notify
    action: tools.slack.post_message
    depends_on:
      - build_message
    run_if: ${{ FN.is_equal(TRIGGER.severity, "high") }}
    args:
      channel: ${{ SECRETS.slack.SLACK_CHANNEL }}
      text: ${{ ACTIONS.build_message.result }}

Scatter and gather

Use core.transform.scatter to fan out a collection into parallel streams, then core.transform.gather to collect results back into a list.
title: "Scatter gather workflow"
description: "Fetch details for each item"
entrypoint:
  expects:
    items:
      type: list[dict[str, Any]]
actions:
  - ref: scatter_items
    action: core.transform.scatter
    args:
      collection: ${{ TRIGGER.items }}
  - ref: fetch_item
    action: core.http_request
    depends_on:
      - scatter_items
    args:
      url: "https://api.example.com/items/${{ ACTIONS.scatter_items.result.id }}"
      method: GET
  - ref: gather_results
    action: core.transform.gather
    depends_on:
      - fetch_item
    args:
      items: ${{ ACTIONS.fetch_item.result }}
      drop_nulls: true
      error_strategy: partition
returns: ${{ ACTIONS.gather_results.result }}
Within a scatter region, each downstream action reads the current item through ACTIONS.<scatter_ref>.result.

Do-while loops

Use core.loop.start to open a loop region and core.loop.end to decide whether to continue.
title: "Loop workflow"
description: "Retry until the third iteration"
entrypoint:
  expects: {}
actions:
  - ref: loop_start
    action: core.loop.start
  - ref: body
    action: core.transform.reshape
    depends_on:
      - loop_start
    args:
      value:
        iteration: ${{ ACTIONS.loop_start.result.iteration }}
  - ref: loop_end
    action: core.loop.end
    depends_on:
      - body
    args:
      condition: ${{ ACTIONS.loop_start.result.iteration < 2 }}
      max_iterations: 10
  - ref: after_loop
    action: core.transform.reshape
    depends_on:
      - loop_end
    args:
      value: ${{ ACTIONS.loop_end.result.continue }}
returns: ${{ ACTIONS.after_loop.result }}
Loop-specific behavior:
  • core.loop.start exposes ACTIONS.<loop_start_ref>.result.iteration
  • core.loop.end requires condition
  • max_iterations defaults to 100

Subflows

Use core.workflow.execute to run a subflow.
title: "Subflow launcher"
description: "Execute a subflow for each alert"
entrypoint:
  expects:
    alerts:
      type: list[dict[str, Any]]
actions:
  - ref: run_subflow
    action: core.workflow.execute
    for_each: ${{ for var.alert in TRIGGER.alerts }}
    args:
      workflow_id: wf-fee9abc1cc88417bbccb73433646e2c6
      trigger_inputs:
        alert: ${{ var.alert }}
returns: null
The subflow target is usually provided through workflow_id. Trigger data is passed through trigger_inputs.
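
For context, a subflow that could receive those inputs might look like the sketch below. It assumes that the keys under trigger_inputs appear as TRIGGER fields in the subflow and that each alert carries an id field. Neither detail is confirmed on this page, and the title and refs are hypothetical.

```yaml
title: "Alert subflow"
description: "Handle one alert passed in by the parent workflow"
entrypoint:
  expects:
    alert:
      type: dict[str, Any]
actions:
  - ref: summarize
    action: core.transform.reshape
    args:
      value:
        alert_id: ${{ TRIGGER.alert.id }}   # assumes the alert has an id field
returns: ${{ ACTIONS.summarize.result }}
```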

Validation rules

These are the main workflow rules:
  • All action refs must be unique.
  • Every dependency in depends_on must reference a real action.
  • Cycles in the action graph are invalid.
  • interaction cannot be combined with for_each.
  • Outer scopes cannot reference actions inside a nested scatter or loop scope.
  • Inner scopes can reference actions in parent scopes.
  • core.transform.gather must close the scatter scope it depends on.
  • core.loop.end must close the loop scope it depends on.
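
The scope rules are easier to see in a concrete fragment. This is a sketch with hypothetical refs: tag_item runs inside the scatter scope, gather_items closes it, and summarize runs in the outer scope. The commented-out line shows the kind of reference the rules above reject.

```yaml
actions:
  - ref: scatter_items
    action: core.transform.scatter
    args:
      collection: ${{ TRIGGER.items }}
  - ref: tag_item              # inside the scatter scope
    action: core.transform.reshape
    depends_on:
      - scatter_items
    args:
      value: ${{ ACTIONS.scatter_items.result }}
  - ref: gather_items          # closes the scatter scope
    action: core.transform.gather
    depends_on:
      - tag_item
    args:
      items: ${{ ACTIONS.tag_item.result }}
  - ref: summarize             # outer scope
    action: core.transform.reshape
    depends_on:
      - gather_items
    args:
      value:
        # Allowed: gather_items is visible from the outer scope.
        count: ${{ FN.length(ACTIONS.gather_items.result) }}
        # Not allowed: tag_item lives inside the nested scatter scope.
        # first: ${{ ACTIONS.tag_item.result }}
```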

Workflow definition schema

Workflow definitions may also use this outer schema:
definition:
  title: "Imported workflow"
  description: "This is the executable workflow DSL"
  entrypoint:
    expects: {}
  actions:
    - ref: a
      action: core.transform.reshape
      args:
        value: ok
  returns: ${{ ACTIONS.a.result }}

layout:
  trigger:
    x: 0
    y: 0
  viewport:
    x: 0
    y: 0
    zoom: 1
  actions:
    - ref: a
      x: 0
      y: 160

schedules:
  - cron: "0 * * * *"
    status: online

webhook:
  methods:
    - POST
  status: online

case_trigger:
  status: offline
  event_types: []
  tag_filters: []