dbt patterns and best practices

This guide covers advanced patterns and best practices for integrating dbt with Dagster, helping you build more maintainable data pipelines.

Preventing concurrent dbt snapshots

dbt snapshots track changes to data over time by comparing current data to previous snapshots. Running snapshots concurrently can corrupt these tables, so it's critical to ensure only one snapshot operation runs at a time.

Option 1: Separate snapshots from other models

Create separate dbt component definitions to isolate snapshots from your regular dbt models. First, scaffold two dbt components:

# Create component for regular models
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_models

# Create component for snapshots
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_snapshots

Configure the regular models component to exclude snapshots:

my_project/defs/dbt_models/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  exclude: "resource_type:snapshot"

Configure the snapshots component with concurrency control:

my_project/defs/dbt_snapshots/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  select: "resource_type:snapshot"

post_processing:
  assets:
    - target: "*"
      attributes:
        pool: "dbt-snapshots"

Option 2: Configure concurrency pools

Configure your Dagster instance to create pools with maximum concurrency of 1. Add this configuration to your dagster.yaml (for Dagster Open Source) or deployment settings (for Dagster+):

dagster.yaml
concurrency:
pools:
dbt-snapshots:
limit: 1
granularity: 'op'

Then set the pool limit for the snapshot pool:

# Set pool limit using CLI
dagster instance concurrency set dbt-snapshots 1

Option 3: Manage multiple snapshot groups with Dagster components

For large projects with many snapshots, you can create multiple snapshot groups while still preventing concurrency issues within each group. Create separate Dagster components for different business domains:

# Create component for sales snapshots
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_snapshots_sales

# Create component for inventory snapshots
dg scaffold defs dagster_dbt.DbtProjectComponent dbt_snapshots_inventory

Sales snapshots component:

my_project/defs/dbt_snapshots_sales/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  select: "resource_type:snapshot,path:snapshots/sales/*"

post_processing:
  assets:
    - target: "*"
      attributes:
        pool: "sales-snapshots"

Inventory snapshots component:

my_project/defs/dbt_snapshots_inventory/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project: '{{ project_root }}/dbt'
  select: "resource_type:snapshot,path:snapshots/inventory/*"

post_processing:
  assets:
    - target: "*"
      attributes:
        pool: "inventory-snapshots"

Configure separate pool limits for each domain. This approach allows snapshots from different business domains to run in parallel while preventing concurrent execution within each domain, reducing the risk of corruption while maintaining reasonable performance.

Microbatch incremental models

dbt's microbatch incremental strategy uses a fundamentally different batching model than regular incremental models. Understanding the difference determines which CLI flags you pass from Dagster.

Regular incremental models

With regular incremental models, you control the row filtering. Dagster passes --vars to provide date boundaries, and your SQL uses {% if is_incremental() %} to filter rows within those boundaries. dbt runs the model once for the entire window:

dbt_assets.py
import json

import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(
    manifest=dbt_project.manifest_path,
    select=INCREMENTAL_SELECTOR,
    partitions_def=daily_partition,
)
def incremental_dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # Pass the partition's time window to dbt as vars; the model's SQL filters on them.
    time_window = context.partition_time_window
    dbt_vars = {
        "start_date": time_window.start.strftime("%Y-%m-%d"),
        "end_date": time_window.end.strftime("%Y-%m-%d"),
    }

    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)], context=context
    ).stream()

models/incremental_model.sql
select
event_id,
event_time,
user_id,
event_type
from {{ source('app', 'events') }}
{% if is_incremental() %}
where event_time >= '{{ var("start_date") }}' and event_time < '{{ var("end_date") }}'
{% endif %}

Microbatch incremental models

With microbatch, dbt controls the batching. You configure the model with an event_time column and a batch_size, and dbt's engine automatically subdivides the window into discrete batches — running the model once per batch. Your SQL doesn't need a manual filter; dbt injects it based on event_time.

models/microbatch_model.sql
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_time',
batch_size='day',
begin='2023-01-01'
) }}

select
event_id,
event_time,
user_id,
event_type
from {{ source('app', 'events') }}

Dagster passes --event-time-start and --event-time-end to tell the microbatch engine which window to process. These flags drive the engine directly. Passing --vars instead has no effect on batch scheduling — it only injects values into the SQL template — which is why misconfigured microbatch models silently process all batches from begin to now rather than the intended partition window.

dbt_assets.py
import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(
    manifest=dbt_project.manifest_path,
    select=MICROBATCH_SELECTOR,
    partitions_def=daily_partition,
)
def microbatch_dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # The event-time flags tell dbt's microbatch engine which window to process.
    time_window = context.partition_time_window

    yield from dbt.cli(
        [
            "build",
            "--event-time-start",
            time_window.start.strftime("%Y-%m-%d"),
            "--event-time-end",
            time_window.end.strftime("%Y-%m-%d"),
        ],
        context=context,
    ).stream()

Because dbt processes one batch_size interval at a time, your PartitionsDefinition should match:

| dbt batch_size | Dagster PartitionsDefinition |
| --- | --- |
| day | DailyPartitionsDefinition |
| month | MonthlyPartitionsDefinition |
| year | YearlyPartitionsDefinition |

The start_date of the PartitionsDefinition should match the model's begin config so Dagster backfills align with dbt's batch history.
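The two alignment rules above (matching granularity and matching start) can be checked mechanically. The following is a minimal sketch in plain Python, not part of dagster-dbt: the `alignment_problems` helper and the `EXPECTED_PARTITIONS` mapping are hypothetical names, and the mapping simply restates the table.

```python
from datetime import date

# Hypothetical lookup that restates the table: dbt batch_size -> Dagster class name.
EXPECTED_PARTITIONS = {
    "day": "DailyPartitionsDefinition",
    "month": "MonthlyPartitionsDefinition",
    "year": "YearlyPartitionsDefinition",
}


def alignment_problems(
    batch_size: str,
    partitions_class: str,
    partitions_start: date,
    model_begin: date,
) -> list[str]:
    """Return human-readable mismatches between a model and its partitions."""
    problems = []
    expected = EXPECTED_PARTITIONS.get(batch_size)
    if expected is None:
        problems.append(f"no mapping known for batch_size={batch_size!r}")
    elif partitions_class != expected:
        problems.append(
            f"batch_size={batch_size!r} expects {expected}, got {partitions_class}"
        )
    # The partitions' start should equal the model's `begin` config.
    if partitions_start != model_begin:
        problems.append(
            f"partitions start {partitions_start} != model begin {model_begin}"
        )
    return problems


aligned = alignment_problems(
    "day", "DailyPartitionsDefinition", date(2023, 1, 1), date(2023, 1, 1)
)
misaligned = alignment_problems(
    "day", "MonthlyPartitionsDefinition", date(2023, 2, 1), date(2023, 1, 1)
)
```

A check like this could run in a unit test next to your definitions, with the values read from wherever you keep the model config and partitions definition.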

Blue/green deployments with clone-then-swap

A dbt build that fails partway through can leave consumers reading a half-built table. A blue/green deployment isolates the tables from in-progress runs by keeping two parallel schemas — green (live, what consumers query) and blue (staging) — and promoting blue into green only after the whole run, including models and tests, finishes cleanly.

In Dagster, the dbt project materializes as assets and its dbt tests surface as asset checks. The green tables behind your assets update only when a materialization fully succeeds — a failed run leaves green tables exactly as consumers last saw them.

You orchestrate the run with a DbtProjectComponent that invokes dbt build. Because build runs models and tests in a single invocation, one Dagster run covers the entire promotion, and the blue/green logic stays inside the dbt project:

my_project/defs/dbt/defs.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  project:
    project_dir: ../../dbt_project
  # `dbt build` runs models AND tests in one invocation, so the on-run-end
  # hook sees every test result and can gate the swap. Dagster orchestrates
  # the build; the blue/green safety lives entirely in the dbt hooks.
  cli_args:
    - build

The flow has three stages, configured through dbt_project.yml:

  1. Clone green to blue in an on-run-start hook, so the run begins from the current published state.
  2. Build into blue — models are configured to materialize into the blue schema.
  3. Promote blue to green in an on-run-end hook, but only if the whole run was clean.
dbt_project.yml
name: bluegreen_analytics
version: "1.0.0"
profile: bluegreen_analytics
config-version: 2

vars:
  green_schema: green # live schema that consumers read
  blue_schema: blue # staging schema that models build into

# Before any model builds, clone the live "green" tables into "blue" so the
# run starts from the current published state.
on-run-start:
  - "{{ clone_green_to_blue() }}"

# After EVERY model and test has finished, promote blue to green, but only if
# the whole run was clean. Per-model post-hooks fire before tests, so they
# cannot gate promotion on test results. See swap_all_marts_if_clean.sql.
on-run-end:
  - "{{ swap_all_marts_if_clean() }}"

models:
  bluegreen_analytics:
    staging:
      +schema: blue
      +materialized: view
    marts:
      +schema: blue
      +materialized: table

The clone is dispatched on adapter type. On Snowflake, the clone is a metadata-only operation that's effectively free even for TB-scale tables (see zero-copy clone):

macros/clone_green_to_blue.sql
{#
Prep the "blue" staging schema with the current state of "green" before any
model builds, so incremental models, views, and unselected tables all start
from the live published data. Runs as on-run-start.

Dispatched on adapter type: Snowflake clones the whole schema in one
zero-copy metadata operation. Warehouses without a schema-clone primitive
can supply their own implementation (e.g. duckdb__) that iterates CTAS
per table.
#}
{% macro clone_green_to_blue() %}
{{ return(adapter.dispatch('clone_green_to_blue', 'bluegreen_analytics')()) }}
{% endmacro %}


{% macro snowflake__clone_green_to_blue() %}
{% if execute %}
{% set green = var('green_schema') %}
{% set blue = var('blue_schema') %}
{% set db = target.database %}

{% set green_exists_sql %}
select count(*)
from {{ db }}.information_schema.schemata
where schema_name = upper('{{ green }}')
{% endset %}
{% set green_exists = run_query(green_exists_sql).rows[0][0] > 0 %}

{% if not green_exists %}
{% do log("No " ~ green ~ " schema yet; first publish run.", info=True) %}
{% do run_query("create schema if not exists " ~ db ~ "." ~ blue) %}
{% else %}
{% do run_query(
"create or replace schema " ~ db ~ "." ~ blue
~ " clone " ~ db ~ "." ~ green
) %}
{% endif %}
{% endif %}
{% endmacro %}

Promotion of each mart is an atomic ALTER TABLE ... SWAP WITH. On Snowflake, this is a metadata rename, so consumers see only the old or the new table, never a partial write:

macros/swap_blue_to_green.sql
{#
Promote a freshly-built blue table into green. On Snowflake this is an
atomic ALTER TABLE ... SWAP WITH — a metadata rename, so consumers see only
the old or the new table, never a half-built state. On a first publish
(green doesn't exist yet) it clones blue into green instead.

Called once per mart by swap_all_marts_if_clean. Warehouses without an
atomic swap can supply their own implementation (e.g. duckdb__) that
snapshots green, then CTAS's blue into green's place.
#}
{% macro swap_blue_to_green(model_relation) %}
{{ return(adapter.dispatch('swap_blue_to_green', 'bluegreen_analytics')(model_relation)) }}
{% endmacro %}


{% macro snowflake__swap_blue_to_green(model_relation) %}
{% if execute %}
{% set green = var('green_schema') %}
{% set blue = var('blue_schema') %}
{% set db = target.database %}
{% set tname = model_relation.identifier %}

{% set green_exists_sql %}
select count(*)
from {{ db }}.information_schema.tables
where table_schema = upper('{{ green }}') and table_name = upper('{{ tname }}')
{% endset %}
{% set green_exists = run_query(green_exists_sql).rows[0][0] > 0 %}

{% if green_exists %}
{% do log("SWAP " ~ blue ~ "." ~ tname ~ " <-> " ~ green ~ "." ~ tname, info=True) %}
{% do run_query(
"alter table " ~ db ~ "." ~ blue ~ "." ~ tname
~ " swap with " ~ db ~ "." ~ green ~ "." ~ tname
) %}
{% else %}
{% do log("First publish of " ~ tname ~ "; promoting blue -> green", info=True) %}
{% do run_query(
"create or replace table " ~ db ~ "." ~ green ~ "." ~ tname
~ " clone " ~ db ~ "." ~ blue ~ "." ~ tname
) %}
{% endif %}
{% endif %}
{% endmacro %}

Why on-run-end and not a per-model post-hook

This is the critical detail. dbt's post-hook fires after a model's SQL completes but before the tests dbt scheduled on that model. Promoting in a post-hook means a bad model is already swapped into green by the time its unique / not_null tests fail, which is exactly the failure the blue/green flow is meant to prevent.

on-run-end is the first hook point where every model and every test has finished and the results array is populated with each node's status. The orchestrator macro walks results, aborts the whole promotion if any node has a status other than success or pass, and otherwise swaps every mart:

macros/swap_all_marts_if_clean.sql
{#
on-run-end hook for the blue/green flow.

Promotes every mart from blue to green ONLY if every node in the run
finished cleanly (models built, tests passed, nothing skipped or errored).
If anything failed, blue keeps the un-promoted data and green is left
untouched — fix the issue and re-run, and the next clean run promotes.

Why on-run-end and not a per-model post-hook: post-hook fires when a model's
SQL finishes but BEFORE dbt runs the tests attached to that model. Promoting
in a post-hook would swap a bad model into green before its own unique /
not_null tests had a chance to fail. on-run-end is the first hook point where
every model and every test has finished and the `results` array is populated
with each node's status.

dbt invokes on-run-end whenever the run finishes, INCLUDING when models or
tests failed (failures live in `results`, not as raised exceptions). It is
skipped only on catastrophic errors (parse failure, on-run-start raising, the
process being killed), in which case nothing was published anyway.
#}
{% macro swap_all_marts_if_clean() %}
{% if execute %}
{% if results is not defined or results | length == 0 %}
{% do log("No node results available — nothing to swap.", info=True) %}
{{ return("") }}
{% endif %}

{# Abort the entire promotion if any node did not succeed or pass. #}
{% set failures = [] %}
{% for r in results %}
{% if r.status not in ['success', 'pass'] %}
{% do failures.append(r.node.unique_id ~ " (" ~ r.status ~ ")") %}
{% endif %}
{% endfor %}

{% if failures | length > 0 %}
{% do log(
"Build had " ~ failures | length ~ " failure(s); skipping swap. "
~ "Blue holds the un-promoted data. Failed nodes: "
~ failures | join(", "),
info=True
) %}
{{ return("") }}
{% endif %}

{# Clean run: promote every mart. #}
{% set marts_swapped = [] %}
{% for r in results %}
{% if r.node.resource_type == 'model' and 'marts' in r.node.fqn %}
{% do swap_blue_to_green(r.node) %}
{% do marts_swapped.append(r.node.name) %}
{% endif %}
{% endfor %}

{% do log(
"Swap complete; promoted " ~ marts_swapped | length ~ " mart(s) to green: "
~ marts_swapped | join(", "),
info=True
) %}
{% endif %}
{% endmacro %}

dbt invokes on-run-end whenever the run finishes, including when models or tests failed. Failures live in the results array, not as raised exceptions. It is skipped only on catastrophic errors (parse failure, on-run-start raising, the process being killed), in which case nothing was published anyway, and green stays untouched.

From Dagster's side, this reduces to a single contract: green advances only when the run's materialization — every model and every asset check — comes back clean. The clone and swap macros log a line per schema and per mart, so you can follow each promotion in the run's compute logs.
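The gate applied by swap_all_marts_if_clean can also be expressed in plain Python, which is handy for reasoning about the rule or unit testing it outside dbt. This is only a sketch of the same decision: `NodeResult` is a hypothetical stand-in for one entry of dbt's results array, not a dbt class.

```python
from dataclasses import dataclass

# Statuses the macro treats as clean: models report "success", tests report "pass".
CLEAN_STATUSES = {"success", "pass"}


@dataclass
class NodeResult:
    """Hypothetical stand-in for one entry of dbt's `results` array."""

    unique_id: str
    resource_type: str  # "model", "test", ...
    fqn: tuple  # e.g. ("bluegreen_analytics", "marts", "orders")
    status: str


def marts_to_promote(results: list) -> list:
    """Return the marts to swap, or an empty list if the run was not clean."""
    if not results:
        return []
    # All-or-nothing: a single unclean node blocks every promotion.
    if any(r.status not in CLEAN_STATUSES for r in results):
        return []
    return [
        r.unique_id
        for r in results
        if r.resource_type == "model" and "marts" in r.fqn
    ]


clean_run = [
    NodeResult("model.stg_orders", "model", ("proj", "staging", "stg_orders"), "success"),
    NodeResult("model.orders", "model", ("proj", "marts", "orders"), "success"),
    NodeResult("test.unique_orders", "test", ("proj", "marts", "unique_orders"), "pass"),
]
failed_run = clean_run[:2] + [
    NodeResult("test.unique_orders", "test", ("proj", "marts", "unique_orders"), "fail"),
]
```

With these inputs, `marts_to_promote(clean_run)` selects only the mart model, and `marts_to_promote(failed_run)` selects nothing.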

Trade-offs

  • All-or-nothing promotion. One failed test means no mart promotes, even ones whose own tests passed. The usual remedy is "fix the broken model and re-run." Walking results to promote only the marts that are clean and not downstream of any failure is possible but considerably more complex.
  • No cross-table consistency during the swap window. The hook swaps marts one statement at a time, so there's a brief window where some marts are promoted and others aren't. Consumers reading multiple marts in that window can see mixed old/new. To close it fully, have consumers query a view that points at one of two green slots and flip the view in a single statement at the end.
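To give a sense of what the more complex selective promotion involves, here is a minimal sketch in plain Python. It assumes you can extract each node's status, resource type, and parents from the run results and manifest; the function name and input shapes are hypothetical. A failed test taints the nodes it tests, a failed model taints itself, and the taint spreads to everything downstream.

```python
from collections import deque


def promotable_marts(statuses, resource_types, depends_on, marts):
    """Return marts that succeeded and are not downstream of any failure.

    statuses:       node id -> status string
    resource_types: node id -> "model" or "test"
    depends_on:     node id -> list of parent node ids
    marts:          set of node ids that are mart models
    """
    # Seed: a failing test taints what it tests; any other failing node taints itself.
    tainted = set()
    for node, status in statuses.items():
        if status in ("success", "pass"):
            continue
        if resource_types[node] == "test":
            tainted.update(depends_on.get(node, []))
        else:
            tainted.add(node)

    # Invert the parent map so the taint can be walked downstream.
    children = {}
    for node, parents in depends_on.items():
        for parent in parents:
            children.setdefault(parent, []).append(node)

    queue = deque(tainted)
    while queue:
        current = queue.popleft()
        for child in children.get(current, []):
            if child not in tainted:
                tainted.add(child)
                queue.append(child)

    return sorted(
        m for m in marts if m not in tainted and statuses.get(m) == "success"
    )


statuses = {
    "stg_orders": "success",
    "stg_users": "success",
    "mart_orders": "success",
    "mart_users": "success",
    "not_null_stg_orders": "fail",
}
resource_types = {
    "stg_orders": "model",
    "stg_users": "model",
    "mart_orders": "model",
    "mart_users": "model",
    "not_null_stg_orders": "test",
}
depends_on = {
    "mart_orders": ["stg_orders"],
    "mart_users": ["stg_users"],
    "not_null_stg_orders": ["stg_orders"],
}
safe = promotable_marts(
    statuses, resource_types, depends_on, {"mart_orders", "mart_users"}
)
```

In this example only `mart_users` is promotable, because `mart_orders` sits downstream of the model whose test failed. Porting this walk into a Jinja macro is where most of the extra complexity comes from.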

Organizing dbt assets into groups

By default, all dbt assets land in a single dbt group. To split them into meaningful groups, subclass DagsterDbtTranslator and override get_group_name. Common grouping strategies:

  • Model directory (e.g., marts/, staging/, intermediate/) for layer-based pipelines.
  • dbt tags (e.g., finance, marketing) when you already use tags to organize models.
  • meta configuration for explicit per-model overrides defined in dbt source control.
src/<project_name>/defs/dbt_assets.py
from collections.abc import Mapping
from typing import Any, Optional

import dagster as dg
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        # Rules are checked in order; the first match wins.
        # Group by model directory.
        model_path = dbt_resource_props.get("original_file_path", "")
        if "marts/" in model_path:
            return "marts"
        if "staging/" in model_path:
            return "staging"
        if "intermediate/" in model_path:
            return "intermediate"

        # Group by tag.
        tags = dbt_resource_props.get("tags", [])
        if "finance" in tags:
            return "finance"
        if "marketing" in tags:
            return "marketing"

        # Group by meta config.
        meta = dbt_resource_props.get("config", {}).get("meta", {})
        if "dagster_group" in meta:
            return meta["dagster_group"]

        return "dbt_models"


@dbt_assets(
    manifest=MANIFEST_PATH,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def grouped_dbt_assets(context: dg.AssetExecutionContext, dbt_cli: DbtCliResource):
    yield from dbt_cli.cli(["build"], context=context).stream()

Pick one strategy or combine them; any model that doesn't match the rules falls back to the default group returned at the end.

Running tagged dbt tests

dbt tests are not Dagster assets. They are operations that run against assets, so build_dbt_asset_selection with a tag filters the assets that have that tag, not the tests.

To run only the tests with a specific tag (including source tests), invoke the dbt CLI directly through DbtCliResource and use dbt's selection syntax:

src/<project_name>/defs/dbt_tests.py
import dagster as dg
from dagster_dbt import DbtCliResource


@dg.op(required_resource_keys={"dbt"})
def run_tagged_tests(context):
    """Run only tests with specific tags using the dbt CLI directly."""
    dbt = context.resources.dbt
    # Block until dbt finishes so that a failing test fails the op.
    dbt.cli(["test", "--select", "tag:hourly_tests"], context=context).wait()


@dg.job(resource_defs={"dbt": DbtCliResource(project_dir="path/to/dbt")})
def hourly_tests_job():
    run_tagged_tests()


hourly_test_schedule = dg.ScheduleDefinition(
    name="hourly_tests_schedule",
    cron_schedule="0 * * * *",
    job=hourly_tests_job,
    execution_timezone="UTC",
)

The asset-based approach using build_dbt_asset_selection with tags is the right tool for selecting models, but it cannot select tests on sources or untagged tests on tagged models. Use the CLI pattern above when test selection is what you need.

Running specific dbt models from the UI

To materialize a specific subset of dbt models on demand, navigate to your code location's Assets tab and use the asset selection input. The input accepts dbt selection syntax similar to dbt -s:

  • Model names directly: model1 model2
  • dbt selection syntax: tag:staging, +model, model+
  • Filter by group, tag, or asset key

This is the typical path for ad-hoc runs (recovering from outages, materializing a small set of new models) without writing a custom job.

Macro changes are not detected by code_version_changed

AutomationCondition.code_version_changed() does not detect changes to dbt macros. Code versions for dbt assets are derived from each model's raw_code or raw_sql in the manifest.json. Macros and ephemeral models are not imported into the Dagster asset graph, so their content is not part of the code version calculation.

If you change a macro that downstream models depend on, the consuming models' code versions do not change, and the automation condition will not mark them stale. Until dagster#22566 is resolved, treat macro changes as a manual trigger:

  • Identify which models depend on the changed macros.
  • Manually materialize those assets after deploying the macro change.
  • For workflows where this happens often, consider a custom automation condition that watches macro file modifications, or fall back to a time-based trigger.
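One building block for watching macro file modifications is a content fingerprint of the macros directory. The sketch below uses only the Python standard library and assumes macros live in `.sql` files under a single directory. The `macro_fingerprint` helper is hypothetical, and wiring it into a sensor or custom automation condition is left to you.

```python
import hashlib
import tempfile
from pathlib import Path


def macro_fingerprint(macros_dir: Path) -> str:
    """Hash the relative path and content of every .sql file under macros_dir."""
    digest = hashlib.sha256()
    # Sort so the fingerprint does not depend on filesystem ordering.
    for path in sorted(macros_dir.rglob("*.sql")):
        digest.update(path.relative_to(macros_dir).as_posix().encode())
        digest.update(b"\0")
        digest.update(path.read_bytes())
        digest.update(b"\0")
    return digest.hexdigest()


# Demonstration against a throwaway directory: editing a macro changes the fingerprint.
with tempfile.TemporaryDirectory() as tmp:
    macros = Path(tmp)
    macro_file = macros / "clone.sql"
    macro_file.write_text("{% macro example() %}select 1{% endmacro %}")
    before = macro_fingerprint(macros)
    macro_file.write_text("{% macro example() %}select 2{% endmacro %}")
    after = macro_fingerprint(macros)
```

Comparing the current fingerprint against the one stored at the last deployment tells you that some macro changed, but not which models depend on it, so you still need to identify the affected models yourself.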

Recovering from snapshot SQL compilation errors after package updates

dbt snapshots can start failing across all environments simultaneously after a dbt deps update if a transitive package changes a macro that snapshot-related models compile against. Symptoms include syntax errors at column positions that look unrelated to your code (for example, syntax error line 15 at position 21 unexpected ')' from Snowflake) and local environments working until packages are reinstalled.

Reset the dbt environment to clear cached dependencies:

dbt clean && dbt deps

Then re-run the snapshots. To prevent recurrence, pin package versions in packages.yml rather than letting them float, and test snapshots in a development environment after any package upgrade before promoting to production.

Recovering from infrastructure interruptions

To recover gracefully from infrastructure interruptions, such as a Kubernetes node eviction or a pod termination, use the FROM_ASSET_FAILURE run retry strategy and set dagster/retry_on_asset_or_op_failure to false. With this strategy, the retry uses the persisted asset materialization records in the event log to automatically exclude assets that have already materialized, so recovery does not require persisted dbt artifacts. See Configuring run retries.

src/my_project/jobs.py
import dagster as dg

asset_job = dg.define_asset_job(
    name="asset_job",
    selection=dg.AssetSelection.assets(asset_one, asset_two, asset_three),
    tags={
        "dagster/max_retries": "3",
        "dagster/retry_strategy": "FROM_ASSET_FAILURE",
        "dagster/retry_on_asset_or_op_failure": "false",
    },
)