Create and materialize a downstream asset
Now that we have the raw data loaded into DuckDB, we need to create a downstream asset that combines the upstream assets together. In this step, you will:
- Create a downstream asset
- Materialize that asset
1. Create a downstream asset
Now that we have all of our raw data loaded into DuckDB, our next step is to merge it together in a view composed of data from all three source tables.
To accomplish this in SQL, we will bring in our sales_data table and then left join on sales_reps and products on their respective id columns. Additionally, we will keep this view concise and only have relevant columns for analysis.
As you can see, the new joined_data asset looks a lot like our previous ones, with a few small changes. We put this asset into a different group. To make this asset dependent on the raw tables, we add the asset keys to the deps parameter in the asset definition.
@dg.asset(
    compute_kind="duckdb",
    group_name="joins",
    deps=[sales_data, sales_reps, products],
)
def joined_data(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute(
            """
            create or replace view joined_data as (
                select 
                    date,
                    dollar_amount,
                    customer_name,
                    quantity,
                    rep_name,
                    department,
                    hire_date,
                    product_name,
                    category,
                    price
                from sales_data
                left join sales_reps
                    on sales_reps.rep_id = sales_data.rep_id
                left join products
                    on products.product_id = sales_data.product_id
            )
            """
        )
        preview_query = "select * from joined_data limit 10"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute("select count(*) from joined_data").fetchone()
        count = row_count[0] if row_count else 0
        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )
2. Materialize the asset
- Add the joined_data asset to the Definitions object
defs = dg.Definitions(
  assets=[products,
      sales_reps,
      sales_data,
      joined_data,
  ],
  resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")},
)
- In the Dagster UI, reload definitions and materialize the joined_dataasset.
Next steps
- Continue this tutorial with ensuring data quality with asset checks.