I have a sensor that I want to trigger a job run after an upstream job finishes. here is my sensor definition(removed config details).
from dagster import run_status_sensor, RunRequest, SkipReason, DagsterRunStatus
from analytics.jobs.etl_job import dbt_job
@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def trigger_dbt_job_after_etl(context):
context.log.info(f"Sensor triggered for job: {context.dagster_run.job_name}")
if context.dagster_run.job_name == "etl_job":
context.log.info("etl_job completed successfully. Preparing to trigger dbt_job.")
run_config = {
"ops": {
"run_dbt_task": {
"config": {
"cluster_name": "",
"launch_type": "",
"security_groups": [""],
"subnets": [
"subnet-",
"subnet-",
"subnet-",
"subnet-",
"subnet-"
],
"task_definition": ""
}
}
}
}
# Create the RunRequest and log its contents
run_request = RunRequest(run_key=None, run_config=run_config, job=dbt_job, job_name="dbt_job")
context.log.info(f"Created RunRequest: {run_request}")
return run_request
else:
context.log.info(f"Skipping sensor run because the job is not etl_job. Current job: {context.dagster_run.job_name}")
return SkipReason("The job is not etl_job, skipping.")
and I get the following traceback
Traceback (most recent call last):
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_daemon/sensor.py", line 471, in _process_tick_generator
yield from _evaluate_sensor(
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_daemon/sensor.py", line 614, in _evaluate_sensor
sensor_runtime_data = code_location.get_external_sensor_execution_data(
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_core/remote_representation/code_location.py", line 930, in get_external_sensor_execution_data
return sync_get_external_sensor_execution_data_grpc(
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_api/snapshot_sensor.py", line 87, in sync_get_external_sensor_execution_data_grpc
raise DagsterUserCodeProcessError.from_error_info(result.error)
dagster._core.errors.DagsterUserCodeProcessError: dagster._core.errors.SensorExecutionError: Error occurred during the execution of evaluation_fn for sensor trigger_dbt_job_after_etl
Stack Trace:
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_grpc/impl.py", line 400, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/usr/lib64/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_core/errors.py", line 297, in user_code_error_boundary
raise new_error from e
The above exception was caused by the following exception:
Exception: Error in sensor trigger_dbt_job_after_etl: Sensor evaluation function returned a RunRequest for a sensor lacking a specified target (job_name, job, or jobs). Targets can be specified by providing job, jobs, or job_name to the @sensor decorator.
Stack Trace:
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
yield
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_grpc/impl.py", line 400, in get_external_sensor_execution
return sensor_def.evaluate_tick(sensor_context)
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_core/definitions/sensor_definition.py", line 921, in evaluate_tick
*self.resolve_run_requests(
File "/home/ec2-user/.local/lib/python3.9/site-packages/dagster/_core/definitions/sensor_definition.py", line 965, in resolve_run_requests
raise Exception(
I believe I’m correctly adding the sensor to my init file/definitions
from dagster import Definitions, EnvVar
from analytics.jobs.etl_job import etl_job, dbt_job
from analytics.resources.ecs_resource import ecs_client
from analytics.sensors.job_sensor import trigger_dbt_job_after_etl
defs = Definitions(
jobs=[etl_job, dbt_job],
resources={
"ecs_client": ecs_client.configured({
"region_name":"us-east-1",
"aws_access_key_id":"",
"aws_secret_access_key":""
} )
},
sensors=[trigger_dbt_job_after_etl]
)
After reviewing dagster docs it seems pretty straightforward to pass a target to a RunRequest but not sure why its not recognizing it at runtime?
the returned runrequest object looks correct
Created RunRequest: RunRequest(run_key=None, run_config={'ops': {'run_dbt_task': {'config': {'cluster_name': 'dbt-', 'launch_type': 'FARGATE', 'security_groups': ['sg-'], 'subnets': ['subnet-', 'subnet-', 'subnet-', 'subnet-', 'subnet-'], 'task_definition': 'arn:aws:ecs:us-east-1:'}}}}, tags={}, job_name='dbt_job', asset_selection=None, stale_assets_only=False, partition_key=None, asset_check_keys=None, asset_graph_subset=None)
1