I am just starting with Dagster. I want to do something similar to what is described in this section of the docs: https://docs.dagster.io/concepts/io-management/io-managers#using-io-managers-to-load-source-data
I created I/O managers that read and write pandas DataFrames as CSV and Parquet files. For now, I am only trying to load a CSV file and write the same data to a Parquet file. My asset code looks like this:
```python
import pandas as pd
from dagster import AssetKey, AssetSpec, Definitions, asset

# LocalCSVIOManager and LocalParquetIOManager are my own I/O manager classes (not shown).

csv_asset = AssetSpec(
    key=AssetKey(["in", "dropout"]),
    metadata={"dagster/io_manager_key": "local_csv_io_manager"},
)


@asset(io_manager_key="local_parquet_io_manager", key_prefix=["out"])
def parquet_asset(csv_asset: pd.DataFrame):
    return csv_asset


defs = Definitions(
    assets=[csv_asset, parquet_asset],
    resources={
        "local_parquet_io_manager": LocalParquetIOManager(),
        "local_csv_io_manager": LocalCSVIOManager(),
    },
)
```
Loading these definitions produces the following warning:

```
2024-09-25 10:51:53 +0200 - dagster - WARNING - /Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/workspace/context.py:660:
UserWarning: Error loading repository location definitions.py:dagster._core.errors.DagsterInvalidDefinitionError:
Input asset '["csv_asset"]' for asset '["out", "parquet_asset"]' is not produced by any of the provided asset ops and is not one of the provided sources.
```
What am I missing? Thank you!
This is with Dagster 1.8.8.
This is the stack trace:

```
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_grpc/server.py", line 421, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_grpc/server.py", line 275, in __init__
    repo_def = recon_repo.get_definition()
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/reconstruct.py", line 120, in get_definition
    return repository_def_from_pointer(
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/reconstruct.py", line 777, in repository_def_from_pointer
    repo_def = repository_def_from_target_def(target, load_type, repository_load_data)
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/reconstruct.py", line 743, in repository_def_from_target_def
    target = target.get_inner_repository()
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_utils/cached_method.py", line 104, in _cached_method_wrapper
    result = method(self, *args, **kwargs)
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/definitions_class.py", line 600, in get_inner_repository
    return _create_repository_using_definitions_args(
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/definitions_class.py", line 290, in _create_repository_using_definitions_args
    @repository(
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/decorators/repository_decorator.py", line 170, in __call__
    else CachingRepositoryData.from_list(
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/repository_data.py", line 382, in from_list
    return build_caching_repository_data_from_list(
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/repository_definition/repository_data_builder.py", line 317, in build_caching_repository_data_from_list
    asset_graph = AssetGraph.from_assets(
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/asset_graph.py", line 239, in from_assets
    assets_defs = cls.normalize_assets(assets)
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/asset_graph.py", line 195, in normalize_assets
    resolved_deps = ResolvedAssetDependencies(assets_defs, [])
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/resolved_asset_deps.py", line 23, in __init__
    self._deps_by_assets_def_id = resolve_assets_def_deps(assets_defs, source_assets)
  File "/Users/jbelis/Sources/codekarma/aws-ecs-test/.venv2/lib/python3.12/site-packages/dagster/_core/definitions/resolved_asset_deps.py", line 206, in resolve_assets_def_deps
    raise DagsterInvalidDefinitionError(msg)

  warnings.warn(f"Error loading repository location {location_name}:{error.to_string()}")
```
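There was an error in my code. Dagster infers an asset's upstream dependencies from the parameter names of the decorated function, not from the Python variable that holds the `AssetSpec`. The parameter `csv_asset` was therefore interpreted as a dependency on an asset with key `["csv_asset"]`, which doesn't exist; that is exactly what the error message says. The parameter has to be named after the upstream asset's key instead (`dropout`, the last component of `["in", "dropout"]`). It works after changing the downstream asset as follows: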
```python
@asset(io_manager_key="local_parquet_io_manager", key_prefix=["out"])
def parquet_asset(dropout: pd.DataFrame):
    return dropout
```
Hope this helps someone some day…