OpenAI (dagster-openai)
The dagster_openai library provides utilities for using OpenAI with Dagster. A good place to start with dagster_openai is the guide.
- dagster_openai.with_usage_metadata
- This wrapper can be used on any endpoint of the openai library (https://github.com/openai/openai-python) to log the OpenAI API usage metadata in the asset metadata. - Examples: - from dagster import (
 AssetExecutionContext,
 AssetKey,
 AssetSelection,
 AssetSpec,
 Definitions,
 EnvVar,
 MaterializeResult,
 asset,
 define_asset_job,
 multi_asset,
 )
 from dagster_openai import OpenAIResource, with_usage_metadata
 @asset(compute_kind="OpenAI")
 def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
 with openai.get_client(context) as client:
 client.fine_tuning.jobs.create = with_usage_metadata(
 context=context, output_name="some_output_name", func=client.fine_tuning.jobs.create
 )
 client.fine_tuning.jobs.create(model="gpt-3.5-turbo", training_file="some_training_file")
 openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
 @multi_asset(
 specs=[
 AssetSpec("my_asset1"),
 AssetSpec("my_asset2"),
 ]
 )
 def openai_multi_asset(context: AssetExecutionContext, openai: OpenAIResource):
 with openai.get_client(context, asset_key=AssetKey("my_asset1")) as client:
 client.chat.completions.create(
 model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
 )
 # The materialization of `my_asset1` will include both OpenAI usage metadata
 # and the metadata added when calling `MaterializeResult`.
 return (
 MaterializeResult(asset_key="my_asset1", metadata={"foo": "bar"}),
 MaterializeResult(asset_key="my_asset2", metadata={"baz": "qux"}),
 )
 openai_multi_asset_job = define_asset_job(
 name="openai_multi_asset_job", selection=AssetSelection.assets(openai_multi_asset)
 )
 defs = Definitions(
 assets=[openai_asset, openai_multi_asset],
 jobs=[openai_asset_job, openai_multi_asset_job],
 resources={
 "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
 },
 )
- class dagster_openai.OpenAIResource
- This resource is a wrapper over the openai library. - By configuring this OpenAI resource, you can interact with the OpenAI API and log its usage metadata in the asset metadata. - Examples: - import os
 from dagster import AssetExecutionContext, Definitions, EnvVar, asset, define_asset_job
 from dagster_openai import OpenAIResource
 @asset(compute_kind="OpenAI")
 def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
 with openai.get_client(context) as client:
 client.chat.completions.create(
 model="gpt-3.5-turbo",
 messages=[{"role": "user", "content": "Say this is a test"}]
 )
 openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
 defs = Definitions(
 assets=[openai_asset],
 jobs=[openai_asset_job],
 resources={
 "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
 },
 )
- get_client
- Yields an openai.Client for interacting with the OpenAI API. - By default, in an asset context, the client comes with wrapped endpoints for three API resources, Completions, Embeddings and Chat, allowing the API usage metadata to be logged in the asset metadata. - Note that the endpoints are not and cannot be wrapped to automatically capture the API usage metadata in an op context. - Parameters: context – The context object for computing the op or asset in which get_client is called. - Examples: - from dagster import (
 AssetExecutionContext,
 Definitions,
 EnvVar,
 GraphDefinition,
 OpExecutionContext,
 asset,
 define_asset_job,
 op,
 )
 from dagster_openai import OpenAIResource
 @op
 def openai_op(context: OpExecutionContext, openai: OpenAIResource):
 with openai.get_client(context) as client:
 client.chat.completions.create(
 model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
 )
 openai_op_job = GraphDefinition(name="openai_op_job", node_defs=[openai_op]).to_job()
 @asset(compute_kind="OpenAI")
 def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
 with openai.get_client(context) as client:
 client.chat.completions.create(
 model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
 )
 openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
 defs = Definitions(
 assets=[openai_asset],
 jobs=[openai_asset_job, openai_op_job],
 resources={
 "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
 },
 )
 - get_client_for_asset
- Yields an openai.Client for interacting with the OpenAI API. - When using this method, the OpenAI API usage metadata is automatically logged in the asset materializations associated with the provided asset_key. - By default, the client comes with wrapped endpoints for three API resources, Completions, Embeddings and Chat, allowing the API usage metadata to be logged in the asset metadata. - This method can only be called when working with assets, i.e. the provided context must be of type AssetExecutionContext. - Parameters: - context – The context object for computing the asset in which get_client_for_asset is called.
- asset_key – The asset_key of the asset for which a materialization should include the metadata.
 - Examples: - from dagster import (
 AssetExecutionContext,
 AssetKey,
 AssetSpec,
 Definitions,
 EnvVar,
 MaterializeResult,
 asset,
 define_asset_job,
 multi_asset,
 )
 from dagster_openai import OpenAIResource
 @asset(compute_kind="OpenAI")
 def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
 with openai.get_client_for_asset(context, context.asset_key) as client:
 client.chat.completions.create(
 model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
 )
 openai_asset_job = define_asset_job(name="openai_asset_job", selection="openai_asset")
 @multi_asset(specs=[AssetSpec("my_asset1"), AssetSpec("my_asset2")], compute_kind="OpenAI")
 def openai_multi_asset(context: AssetExecutionContext, openai_resource: OpenAIResource):
 with openai_resource.get_client_for_asset(context, asset_key=AssetKey("my_asset1")) as client:
 client.chat.completions.create(
 model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Say this is a test"}]
 )
 return (
 MaterializeResult(asset_key="my_asset1", metadata={"some_key": "some_value1"}),
 MaterializeResult(asset_key="my_asset2", metadata={"some_key": "some_value2"}),
 )
 openai_multi_asset_job = define_asset_job(
 name="openai_multi_asset_job", selection="openai_multi_asset"
 )
 defs = Definitions(
 assets=[openai_asset, openai_multi_asset],
 jobs=[openai_asset_job, openai_multi_asset_job],
 resources={
 "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
 },
 )
- context – The