Context: I have already set up the connection to Airflow successfully, but while OpenMetadata (OM) is collecting metadata from Airflow it fails with:

Field required [type=missing, input_value={'__var': {'downstream_ta...}, '__type': 'operator'}, input_type=dict]
For further information visit https://errors.pydantic.dev/2.7/v/missing

I hope you can help me out with this situation. Full log:
[2024-09-06T08:39:25.482+0000] {metadata.py:339} WARNING - Error building pydantic model for ('upload_data_to_s33', {'__version': 1, 'dag': {'end_date': 1662915600.0, 'schedule_interval': '@daily', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip ... (2636 characters truncated) ... task', '_is_empty': False, 'start_trigger_args': None, 'op_args': [], 'op_kwargs': {}}, '__type': 'operator'}], 'dag_dependencies': [], 'params': []}}, '/opt/airflow/dags/upload_data_to_s33.py') - 3 validation errors for AirflowDagDetails
tasks.0.task_id
Field required [type=missing, input_value={'__var': {'downstream_ta...}, '__type': 'operator'}, input_type=dict]
For further information visit https://errors.pydantic.dev/2.7/v/missing
tasks.1.task_id
Field required [type=missing, input_value={'__var': {'downstream_ta...}, '__type': 'operator'}, input_type=dict]
For further information visit https://errors.pydantic.dev/2.7/v/missing
tasks.2.task_id
Field required [type=missing, input_value={'__var': {'downstream_ta...}, '__type': 'operator'}, input_type=dict]
For further information visit https://errors.pydantic.dev/2.7/v/missing
[2024-09-06T08:39:25.482+0000] {topology_runner.py:252} DEBUG - Post processing node producer='get_services' stages=[NodeStage(type_=<class 'metadata.generated.schema.entity.services.pipelineService.PipelineService'>, processor='yield_create_request_pipeline_service', nullable=False, must_return=True, overwrite=False, consumer=None, context='pipeline_service', store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=True, use_cache=False)] children=['pipeline'] post_process=['mark_pipelines_as_deleted'] threads=False
[2024-09-06T08:39:25.634+0000] {ingestion_pipeline_mixin.py:52} DEBUG - Created Pipeline Status for pipeline Airflow_Configuration.f4b9aef5-f8d4-4e58-a361-01c793c4b57d: runId='c5aa044c-287d-4c89-a1cd-aa717e2e5610' pipelineState=<PipelineState.success: 'success'> startDate=Timestamp(root=1725611963429) timestamp=Timestamp(root=1725611963429) endDate=Timestamp(root=1725611965574) status=IngestionStatus(root=[StepSummary(name='Airflow', records=0, updated_records=0, warnings=0, errors=0, filtered=0, failures=None), StepSummary(name='OpenMetadata', records=0, updated_records=0, warnings=0, errors=0, filtered=0, failures=None)])
[2024-09-06T08:39:25.638+0000] {logger.py:175} INFO - Statuses detailed info:
[2024-09-06T08:39:25.639+0000] {logger.py:175} INFO - Airflow Status:
[2024-09-06T08:39:25.639+0000] {logger.py:175} INFO - {'failures': [], 'filtered': [], 'records': [], 'source_start_time': 1725611963.8067443, 'updated_records': [], 'warnings': []}
[2024-09-06T08:39:25.639+0000] {logger.py:175} INFO - OpenMetadata Status:
[2024-09-06T08:39:25.640+0000] {logger.py:175} INFO - {'failures': [], 'filtered': [], 'records': [], 'source_start_time': 1725611965.3595185, 'updated_records': [], 'warnings': []}
[2024-09-06T08:39:25.640+0000] {logger.py:175} INFO - Execution Time Summary
[2024-09-06T08:39:25.642+0000] {logger.py:175} INFO -
[2024-09-06T08:39:25.642+0000] {logger.py:175} INFO - Workflow Airflow Summary:
[2024-09-06T08:39:25.642+0000] {logger.py:175} INFO - Processed records: 0
[2024-09-06T08:39:25.642+0000] {logger.py:175} INFO - Updated records: 0
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Warnings: 0
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Errors: 0
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Workflow OpenMetadata Summary:
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Processed records: 0
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Updated records: 0
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Warnings: 0
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Errors: 0
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Success %: 100.0
[2024-09-06T08:39:25.643+0000] {logger.py:175} INFO - Workflow finished in time: 1.84s
[2024-09-06T08:39:25.644+0000] {python.py:237} INFO - Done. Returned value was: None
[2024-09-06T08:39:25.727+0000] {local_task_job_runner.py:240} INFO - Task exited with return code 0
[2024-09-06T08:39:25.744+0000] {taskinstance.py:3498} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-09-06T08:39:25.745+0000] {local_task_job_runner.py:222} INFO - ::endgroup::
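From the warning above, each serialized task appears to be wrapped in an {'__var': ..., '__type': 'operator'} envelope, so task_id sits one level deeper than the flat task dict that AirflowDagDetails seems to expect. Below is a minimal sketch of that shape and of unwrapping it; this is only an illustration of the structure in the log, not OpenMetadata's actual parsing code, and field values are placeholders:

```python
# Illustrative only: the shape reported in the pydantic error, with the
# operator fields nested under "__var" rather than at the top level.
serialized_task = {
    "__var": {
        "task_id": "upload_file",        # the field pydantic reports as missing
        "downstream_task_ids": [],
        # ... other operator fields ...
    },
    "__type": "operator",
}


def unwrap_task(task: dict) -> dict:
    """Return the inner operator fields when the task uses the __var/__type envelope."""
    if isinstance(task, dict) and "__var" in task:
        return task["__var"]
    return task


print(unwrap_task(serialized_task)["task_id"])  # -> upload_file
```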
This is my sample Airflow DAG; it tries to upload data to MinIO, and the pipeline itself runs fine (a rough reconstruction is shown below):
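Since the DAG is only shown as a screenshot, here is a rough, hedged reconstruction of what it could look like. The dag_id (upload_data_to_s33) and the @daily schedule come from the log above; the PythonOperator, MinIO endpoint, bucket, file path, and credentials are placeholders:

```python
# Illustrative reconstruction only: a daily DAG that uploads a local file to
# MinIO through its S3-compatible API via boto3. All connection details are
# placeholders, not the actual values from my environment.
from datetime import datetime

import boto3
from airflow import DAG
from airflow.operators.python import PythonOperator


def upload_to_minio():
    s3 = boto3.client(
        "s3",
        endpoint_url="http://minio:9000",       # placeholder MinIO endpoint
        aws_access_key_id="minioadmin",          # placeholder credentials
        aws_secret_access_key="minioadmin",
    )
    s3.upload_file("/opt/airflow/data/sample.csv", "my-bucket", "sample.csv")


with DAG(
    dag_id="upload_data_to_s33",
    start_date=datetime(2022, 9, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="upload_file", python_callable=upload_to_minio)
```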
My Airflow database on Docker: [screenshot]
My configuration on OM: [screenshot]
My Postgres: [screenshot]