Dagster & Airbyte Cloud (Legacy)
This feature is considered in a beta stage. It is still being tested and may change. For more information, see the API lifecycle stages documentation.
If you are just getting started with the Airbyte Cloud integration, we recommend using the new Airbyte Cloud component.
This guide explains how to use Dagster with Airbyte Cloud through the dagster-airbyte library. You can represent your Airbyte Cloud connection tables as assets in the Dagster asset graph, which lets you track lineage and dependencies between Airbyte Cloud assets and the data assets you already model in Dagster. You can also use Dagster to orchestrate Airbyte Cloud connections, triggering syncs on a schedule or in response to upstream data changes.
What you'll learn
- How to represent Airbyte Cloud assets in the Dagster asset graph, including lineage to other Dagster assets.
- How to customize asset definition metadata for these Airbyte Cloud assets.
- How to materialize Airbyte Cloud connection tables from Dagster.
- How to customize how Airbyte Cloud connection tables are materialized.
Prerequisites
- The dagster and dagster-airbyte libraries installed in your environment
- Familiarity with asset definitions and the Dagster asset graph
- Familiarity with Dagster resources
- Familiarity with Airbyte Cloud concepts, like connections and connection tables
- An Airbyte Cloud workspace
- An Airbyte Cloud client ID and client secret. For more information, see Configuring API Access in the Airbyte Cloud REST API documentation.
Set up your environment
To get started, you'll need to install the dagster and dagster-airbyte Python packages:
Using uv:
uv add dagster-airbyte
Using pip:
pip install dagster-airbyte
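To confirm that both packages are present in the active environment, you can query the installed distributions with the standard library's importlib.metadata. This is a minimal sketch; the helper name installed_versions is hypothetical and is not part of Dagster or dagster-airbyte.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is missing.

    Hypothetical helper for illustration only.
    """
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The distribution is not installed in this environment.
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(["dagster", "dagster-airbyte"]).items():
        print(f"{name}: {version or 'NOT INSTALLED'}")
```

Running the script in the same environment where you plan to run Dagster prints one line per package, with NOT INSTALLED for any package that is missing.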
Represent Airbyte Cloud assets in the asset graph
To load Airbyte Cloud assets into the Dagster asset graph, you must first construct an AirbyteCloudWorkspace resource, which allows Dagster to communicate with your Airbyte Cloud workspace. You'll need to supply your workspace ID, client ID, and client secret. See Configuring API Access in the Airbyte Cloud REST API documentation for more information on how to create your client ID and client secret.
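The examples in this guide read these three values from the environment variables AIRBYTE_CLOUD_WORKSPACE_ID, AIRBYTE_CLOUD_CLIENT_ID, and AIRBYTE_CLOUD_CLIENT_SECRET through dg.EnvVar. Assuming you export those variables before starting Dagster, a small standard-library pre-flight check can report which ones are unset or empty. The helper missing_airbyte_env_vars is hypothetical and not part of dagster-airbyte.

```python
import os

# Environment variable names used by the dg.EnvVar(...) calls in this guide.
REQUIRED_AIRBYTE_ENV_VARS = (
    "AIRBYTE_CLOUD_WORKSPACE_ID",
    "AIRBYTE_CLOUD_CLIENT_ID",
    "AIRBYTE_CLOUD_CLIENT_SECRET",
)


def missing_airbyte_env_vars(environ=None):
    """Return the required variable names that are unset or empty.

    Hypothetical helper; pass a dict to check something other than os.environ.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_AIRBYTE_ENV_VARS if not env.get(name)]
```

For example, a deployment script could call missing_airbyte_env_vars() and exit with an error when the returned list is non-empty, so a missing credential is reported before Dagster tries to contact Airbyte Cloud.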
Dagster can automatically load all connection tables from your Airbyte Cloud workspace as asset specs. Call the load_airbyte_cloud_asset_specs function, which returns a list of AssetSpec objects representing your Airbyte Cloud assets. You can then include these asset specs in your Definitions object:
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs
import dagster as dg
airbyte_workspace = AirbyteCloudWorkspace(
workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)
airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_workspace)
defs = dg.Definitions(assets=airbyte_cloud_specs)
Sync and materialize Airbyte Cloud assets
You can use Dagster to sync Airbyte Cloud connections and materialize Airbyte Cloud connection tables. The build_airbyte_assets_definitions factory creates asset definitions for all connections in your Airbyte Cloud workspace:
from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions
import dagster as dg
airbyte_workspace = AirbyteCloudWorkspace(
workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)
all_airbyte_assets = build_airbyte_assets_definitions(workspace=airbyte_workspace)
defs = dg.Definitions(
assets=all_airbyte_assets,
resources={"airbyte": airbyte_workspace},
)
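The resources={"airbyte": airbyte_workspace} mapping registers the workspace under the resource key airbyte. With Dagster's Pythonic resources, an asset function receives a resource through a parameter whose name matches the resource key. The sketch below is a simplified, hypothetical model of that name-based lookup using inspect.signature; it is not Dagster's implementation, and FakeWorkspace and call_with_resources are illustrative stand-ins.

```python
import inspect


def call_with_resources(fn, resources, **kwargs):
    """Call fn, filling any parameter whose name matches a resource key.

    Simplified, hypothetical model of name-based resource injection.
    """
    params = inspect.signature(fn).parameters
    for name in params:
        if name in resources and name not in kwargs:
            kwargs[name] = resources[name]
    # Any required parameter still unfilled has no matching resource key.
    missing = [
        name
        for name, p in params.items()
        if name not in kwargs and p.default is inspect.Parameter.empty
    ]
    if missing:
        raise KeyError(f"No resource provided for parameter(s): {missing}")
    return fn(**kwargs)


class FakeWorkspace:
    """Hypothetical stand-in for AirbyteCloudWorkspace."""

    def __init__(self, workspace_id):
        self.workspace_id = workspace_id


def my_asset(context, airbyte):
    # The parameter name "airbyte" is what links it to the resource key.
    return f"{context}: syncing in workspace {airbyte.workspace_id}"
```

In this model, registering the workspace under a different key, such as airbyte_ws, while the parameter is still named airbyte leaves the parameter unfilled and raises an error, which is why the key and the parameter name are kept identical.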
Customize the materialization of Airbyte Cloud assets
To customize how your connections are synced, use the airbyte_assets decorator. It lets you run custom code before and after the call to the Airbyte Cloud sync.
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets
import dagster as dg
airbyte_workspace = AirbyteCloudWorkspace(
workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)
@airbyte_assets(
connection_id="airbyte_connection_id", # Replace with your connection ID
workspace=airbyte_workspace,
name="airbyte_connection_name", # Replace with your connection name
group_name="airbyte_connection_name",
)
def airbyte_connection_assets(
context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace
):
# Do something before the materialization...
yield from airbyte.sync_and_poll(context=context)
# Do something after the materialization...
defs = dg.Definitions(
assets=[airbyte_connection_assets],
resources={"airbyte": airbyte_workspace},
)
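The decorated function above is a Python generator: it re-yields every event produced by airbyte.sync_and_poll through yield from. The standard-library sketch below shows when the "before" and "after" code actually runs in that pattern. fake_sync_and_poll is a hypothetical stand-in for the real sync, and the string events are placeholders for Dagster's materialization results.

```python
from typing import Iterator


def fake_sync_and_poll(tables) -> Iterator[str]:
    """Hypothetical stand-in for airbyte.sync_and_poll(): one event per table."""
    for table in tables:
        yield f"materialized:{table}"


def connection_assets(tables, log) -> Iterator[str]:
    # Runs when iteration starts, before the sync begins.
    log.append("before")
    # Re-yield every event produced by the sync, in order.
    yield from fake_sync_and_poll(tables)
    # Runs only after the sync generator is exhausted.
    log.append("after")
```

Because the function is a generator, none of its body runs until the caller starts iterating, and the "after" code runs only once every event from the sync has been yielded. If the sync raises an exception, the "after" code is skipped unless you wrap the yield from in a try/finally block.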