Dagster & dlt with components

info

dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

The dagster-dlt library provides a DltLoadCollectionComponent, which represents a collection of dlt sources and pipelines as assets in Dagster.

Preparing a Dagster project

To begin, you'll need a Dagster project. You can use an existing components-ready project or scaffold a new one:

dg scaffold project my-project && cd my-project/src

Next, you will need to add the dagster-dlt library to the project:

uv add dagster-dlt

Scaffolding a dlt component

Now that you have a Dagster project, you can scaffold a dlt component. You may optionally provide the source and destination types, which will be used to automatically generate a set of sample loads:

dg scaffold dagster_dlt.DltLoadCollectionComponent github_snowflake_ingest \
--source github --destination snowflake
Plugin object cache is invalidated or empty. Building cache...
Using /.../my-project/.venv/bin/dagster-components
Using /.../my-project/.venv/bin/dagster-components

The scaffold call generates a component.yaml file and a loads.py file, along with a github package containing the code for the requested source:

tree my_project/defs
my_project/defs
├── __init__.py
└── github_snowflake_ingest
    ├── component.yaml
    ├── github
    │   ├── __init__.py
    │   ├── helpers.py
    │   ├── queries.py
    │   ├── README.md
    │   └── settings.py
    └── loads.py

3 directories, 8 files

The loads.py file contains a number of dlt sources and pipelines which are referenced by Dagster, but can also be run directly using dlt:

my_project/defs/github_snowflake_ingest/loads.py
import dlt
from .github import github_reactions, github_repo_events, github_stargazers


duckdb_repo_reactions_issues_only_source = github_reactions(
    "duckdb", "duckdb", items_per_page=100, max_items=100
).with_resources("issues")
duckdb_repo_reactions_issues_only_pipeline = dlt.pipeline(
    "github_reactions", destination="snowflake", dataset_name="duckdb_issues"
)

airflow_events_source = github_repo_events("apache", "airflow", access_token="")
airflow_events_pipeline = dlt.pipeline(
    "github_events", destination="snowflake", dataset_name="airflow_events"
)

dlthub_dlt_all_data_source = github_reactions("dlt-hub", "dlt")
dlthub_dlt_all_data_pipeline = dlt.pipeline(
    "github_reactions", destination="snowflake", dataset_name="dlthub_reactions"
)

dlthub_dlt_stargazers_source = github_stargazers("dlt-hub", "dlt")
dlthub_dlt_stargazers_pipeline = dlt.pipeline(
    "github_stargazers", destination="snowflake", dataset_name="dlthub_stargazers"
)
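
Because these are ordinary dlt objects, a source/pipeline pair can be exercised without Dagster by passing the source to the pipeline's run method. The sketch below wraps that call in a hypothetical helper, run_load, which is not part of dlt or dagster-dlt. It assumes the Snowflake credentials are already configured for dlt.

```python
def run_load(source, pipeline):
    """Run one source/pipeline pair directly with dlt, outside Dagster.

    `run_load` is a hypothetical helper for illustration. It relies only on
    `pipeline.run(source)`, which is how a dlt pipeline loads data.
    """
    load_info = pipeline.run(source)
    # Printing the returned load info is a quick way to check the run.
    print(load_info)
    return load_info


# Usage from the project's src directory (assumes destination credentials
# are configured for dlt):
#
#   from my_project.defs.github_snowflake_ingest.loads import (
#       dlthub_dlt_stargazers_pipeline,
#       dlthub_dlt_stargazers_source,
#   )
#   run_load(dlthub_dlt_stargazers_source, dlthub_dlt_stargazers_pipeline)
```

Importing loads.py through its package path, as in the commented usage, keeps its relative import of the github package working.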

Each of these sources and pipelines is referenced by a fully scoped Python identifier in the component.yaml file, which pairs them into a set of loads:

my_project/defs/github_snowflake_ingest/component.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.duckdb_repo_reactions_issues_only_source
      pipeline: .loads.duckdb_repo_reactions_issues_only_pipeline
    - source: .loads.airflow_events_source
      pipeline: .loads.airflow_events_pipeline
    - source: .loads.dlthub_dlt_all_data_source
      pipeline: .loads.dlthub_dlt_all_data_pipeline
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
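
Each reference starts with a dot, so it reads like a relative import: .loads names the module next to component.yaml, and the last segment names an object inside it. As a rough illustration of that convention (not dagster-dlt's actual resolution code), the hypothetical helpers below split such a reference and resolve it with importlib. The package name passed to resolve_reference is an assumption about the project layout.

```python
import importlib


def split_reference(ref: str) -> tuple[str, str]:
    """Split '.loads.some_object' into ('.loads', 'some_object')."""
    module_path, _, attr = ref.rpartition(".")
    if not module_path or not attr:
        raise ValueError(f"expected '<module>.<object>', got {ref!r}")
    return module_path, attr


def resolve_reference(ref: str, package: str):
    """Import the module relative to `package` and return the named object."""
    module_path, attr = split_reference(ref)
    module = importlib.import_module(module_path, package=package)
    return getattr(module, attr)


print(split_reference(".loads.airflow_events_source"))
# ('.loads', 'airflow_events_source')
```

Under this sketch, resolve_reference(".loads.airflow_events_source", "my_project.defs.github_snowflake_ingest") would return the source object defined in loads.py, provided the project package is importable.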

You can list the assets produced by the various loads:

dg list defs

┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                             ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│         │ ┃ Key                            ┃ Group   ┃ Deps                           ┃ Kinds     ┃ Description ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│         │ │ airflow_events/repo_events     │ default │ github_repo_events_repo_events │ dlt       │             │ │
│         │ │                                │         │                                │ snowflake │             │ │
│         │ ├────────────────────────────────┼─────────┼────────────────────────────────┼───────────┼─────────────┤ │
│         │ │ dlthub_reactions/issues        │ default │ github_reactions_issues        │ dlt       │             │ │
│         │ │                                │         │                                │ snowflake │             │ │
│         │ ├────────────────────────────────┼─────────┼────────────────────────────────┼───────────┼─────────────┤ │
│         │ │ dlthub_reactions/pull_requests │ default │ github_reactions_pull_requests │ dlt       │             │ │
│         │ │                                │         │                                │ snowflake │             │ │
│         │ ├────────────────────────────────┼─────────┼────────────────────────────────┼───────────┼─────────────┤ │
│         │ │ dlthub_stargazers/stargazers   │ default │ github_stargazers_stargazers   │ dlt       │             │ │
│         │ │                                │         │                                │ snowflake │             │ │
│         │ ├────────────────────────────────┼─────────┼────────────────────────────────┼───────────┼─────────────┤ │
│         │ │ duckdb_issues/issues           │ default │ github_reactions_issues        │ dlt       │             │ │
│         │ │                                │         │                                │ snowflake │             │ │
│         │ ├────────────────────────────────┼─────────┼────────────────────────────────┼───────────┼─────────────┤ │
│         │ │ github_reactions_issues        │ default │                                │           │             │ │
│         │ ├────────────────────────────────┼─────────┼────────────────────────────────┼───────────┼─────────────┤ │
│         │ │ github_reactions_pull_requests │ default │                                │           │             │ │
│         │ ├────────────────────────────────┼─────────┼────────────────────────────────┼───────────┼─────────────┤ │
│         │ │ github_repo_events_repo_events │ default │                                │           │             │ │
│         │ ├────────────────────────────────┼─────────┼────────────────────────────────┼───────────┼─────────────┤ │
│         │ │ github_stargazers_stargazers   │ default │                                │           │             │ │
│         │ └────────────────────────────────┴─────────┴────────────────────────────────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────┘
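
The keys in this output follow a visible pattern: each loaded asset is keyed by the pipeline's dataset name and the resource name, and it depends on an upstream asset named after the dlt source and the resource. The sketch below reproduces that pattern for illustration only. describe_load is a hypothetical helper, and the naming rule is inferred from the listing above rather than taken from the dagster-dlt source.

```python
def describe_load(
    dataset_name: str, source_name: str, resource_names: list[str]
) -> dict[str, str]:
    """Map each expected asset key to its upstream dependency key."""
    return {
        f"{dataset_name}/{resource}": f"{source_name}_{resource}"
        for resource in resource_names
    }


# Values taken from the duckdb load in loads.py, which keeps only "issues".
print(describe_load("duckdb_issues", "github_reactions", ["issues"]))

# The dlt-hub reactions load keeps both resources shown in the listing.
print(
    describe_load(
        "dlthub_reactions", "github_reactions", ["issues", "pull_requests"]
    )
)
```

Both github_reactions loads point at the same upstream github_reactions_issues key, which is why that asset appears only once in the listing.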

Customizing dlt assets

Properties of the assets emitted by each load can be customized in the component.yaml file using the translation key:

my_project/defs/github_snowflake_ingest/component.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
      translation:
        group_name: github_data
        description: "Loads all users who have starred the dlt-hub/dlt repo"

dg list defs

┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┓ │
│         │ ┃ Key                          ┃ Group       ┃ Deps                  ┃ Kinds     ┃ Description           ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━┩ │
│         │ │ dlthub_stargazers/stargazers │ github_data │ github_stargazers_st… │ dlt       │ Loads all users who   │ │
│         │ │                              │             │                       │ snowflake │ have starred the      │ │
│         │ │                              │             │                       │           │ dlt-hub/dlt repo      │ │
│         │ ├──────────────────────────────┼─────────────┼───────────────────────┼───────────┼───────────────────────┤ │
│         │ │ github_stargazers_stargazers │ default     │                       │           │                       │ │
│         │ └──────────────────────────────┴─────────────┴───────────────────────┴───────────┴───────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
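
Conceptually, the translation block overrides selected attributes of the asset that a load would otherwise emit and leaves the rest at their defaults. The sketch below models that behavior with a stand-in dataclass. AssetSketch and apply_translation are hypothetical names used for illustration, not dagster-dlt APIs.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class AssetSketch:
    """Stand-in for the attributes shown in the `dg list defs` listing."""

    key: str
    group_name: str = "default"
    description: Optional[str] = None


def apply_translation(asset: AssetSketch, translation: dict) -> AssetSketch:
    """Return a copy of `asset` with the translation's attributes applied."""
    return replace(asset, **translation)


base = AssetSketch(key="dlthub_stargazers/stargazers")
translated = apply_translation(
    base,
    {
        "group_name": "github_data",
        "description": "Loads all users who have starred the dlt-hub/dlt repo",
    },
)
print(translated.group_name)  # github_data
```

In the listing above, only dlthub_stargazers/stargazers picks up the new group and description; the upstream github_stargazers_stargazers asset stays in the default group.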

Both the DltResource and Pipeline objects are available in the template scope, as resource and pipeline respectively, and can be used for dynamic customization:

my_project/defs/github_snowflake_ingest/component.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
      translation:
        metadata:
          resource_name: "{{ resource.name }}"
          pipeline_name: "{{ pipeline.pipeline_name }}"
          is_transformer: "{{ resource.is_transformer }}"
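
The double-brace expressions are templates evaluated with resource and pipeline bound to the objects for the asset being translated. To show the idea without requiring dlt, the sketch below renders the same expressions against stand-in objects using a deliberately tiny dotted-name resolver. render, FakeResource, and FakePipeline are hypothetical, the attribute values are made up for the example, and the real component uses its own templating rather than this function.

```python
import re
from dataclasses import dataclass


@dataclass
class FakeResource:
    """Stand-in exposing the two resource attributes used in the YAML."""

    name: str
    is_transformer: bool


@dataclass
class FakePipeline:
    """Stand-in exposing the pipeline attribute used in the YAML."""

    pipeline_name: str


# Matches '{{ dotted.name }}' and captures the dotted name.
_EXPR = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template: str, scope: dict) -> str:
    """Replace each '{{ a.b }}' with the attribute looked up in `scope`."""

    def lookup(match: re.Match) -> str:
        root, *attrs = match.group(1).split(".")
        value = scope[root]
        for attr in attrs:
            value = getattr(value, attr)
        return str(value)

    return _EXPR.sub(lookup, template)


# Assumed example values; a real run would supply the actual dlt objects.
scope = {
    "resource": FakeResource(name="stargazers", is_transformer=False),
    "pipeline": FakePipeline(pipeline_name="github_stargazers"),
}
metadata = {
    "resource_name": render("{{ resource.name }}", scope),
    "pipeline_name": render("{{ pipeline.pipeline_name }}", scope),
    "is_transformer": render("{{ resource.is_transformer }}", scope),
}
print(metadata)
```

Because the values are computed per asset, the same translation block can yield different metadata for each resource in a multi-resource source.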