Is there a way to kick off a group of asset jobs in Dagster after another group has finished?
By way of context, I have two groups of assets (foo and bar).
foo = AssetSelection.groups("foo")
bar = AssetSelection.groups("bar")
And two jobs based on those groups:
foo_pipeline = define_asset_job(
    name="foo_pipeline", selection=foo
)
bar_pipeline = define_asset_job(
    name="bar_pipeline", selection=bar
)
I also have one schedule:
foo_schedule = ScheduleDefinition(
    job=foo_pipeline,
    cron_schedule="30 9 * * MON-FRI",
    execution_timezone="America/Chicago",
    default_status=DefaultScheduleStatus.RUNNING,
)
Based on https://github.com/dagster-io/dagster/discussions/7160, I’ve attempted to do the following, to no avail.
@sensor(job=bar)
def thing_sensor():
    run_records = context.instance.get_run_records(
        filters=RunsFilter(
            job_name=foo,
            statuses=[DagsterRunStatus.SUCCESS],
            # updated_after=..., # can also filter by timestamp to do more efficient fetching
        ),
        order_by="update_timestamp",
        ascending=False,
    )
    for run_record in run_records:
        yield RunRequest(
            run_key=run_record.dagster_run.run_id, # avoid double firing for the same run
        )

thing_sensor()
This does not work, however. Nothing errors out when I run dagster dev -m projects, but when I look at the Dagster UI, neither my foo job nor a foo sensor has been defined.
When I hop into IPython, I see:
In [74]: x = thing_sensor()
In [75]: [print(i) for i in x]
Cell In[75], line 1, in <listcomp>(.0)
----> 1 [print(i) for i in x]
Cell In[66], line 3, in thing_sensor()
1 @sensor(job=internal_reporting_pipeline)
2 def thing_sensor():
----> 3 run_records = context.instance.get_run_records(
4 filters=RunsFilter(
5 job_name=partner_reporting_pipeline,
6 statuses=[DagsterRunStatus.SUCCESS],
7 # updated_after=..., # can also filter by timestamp to do more efficient fetching
8 ),
9 order_by="update_timestamp",
10 ascending=False,
11 )
12 for run_record in run_records:
13 yield RunRequest(
14 run_key=run_record.dagster_run.run_id, # avoid double firing for the same run
15 )
NameError: name 'context' is not defined
I am sure I am misunderstanding the GitHub discussion linked above, but I’m not sure how. At the end of the day, I just want to run one group of jobs after the other has finished, but I’m finding Dagster’s documentation difficult to parse.
Is there a way to run one group of jobs after another has finished? Can you share an example or a link to code if so?