In Airflow plugins, what is the proper way to define a custom model in the PostgreSQL metadata database?
I’ve tried the approach in this post by @user1966723:
Airflow Plugin – create own model and use airflow metadatabase for storing plugin specific data
But it doesn’t seem to work.
I’ve come up with the following solution, but I doubt it is the best way to implement this:
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.db import provide_session
from airflow.models.base import Base
from sqlalchemy import Column, Integer, String, inspect


class Custom(Base):
    __tablename__ = 'custom'

    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    description = Column(String(255))


@provide_session
def create_custom_table(session=None):
    engine = session.get_bind()
    inspector = inspect(engine)
    # Check if the table exists before creating it
    if not inspector.has_table('custom'):
        # Use the model class defined above (it is named Custom, not CustomTable)
        Custom.__table__.create(engine)
        print("Table 'custom' has been created.")
    else:
        print("Table 'custom' already exists.")


# Define the plugin
class DynamicDagPlugin(AirflowPlugin):
    name = 'custom_plugin'

    # Called by Airflow when the plugin is loaded
    def on_load(self, *args, **kwargs):
        create_custom_table()
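
For comparison, here is a minimal sketch of the same "create only if missing" step written with SQLAlchemy's `checkfirst=True` flag instead of a manual inspector check. It is not taken from the linked post and is not necessarily the recommended Airflow approach. To keep it runnable without Airflow, it assumes a locally declared `Base` as a stand-in for `airflow.models.base.Base` and an in-memory SQLite engine as a stand-in for the metadata database; inside the plugin the engine would come from `session.get_bind()` as above.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

# Assumption: stand-in for airflow.models.base.Base so the sketch runs without Airflow.
Base = declarative_base()


class Custom(Base):
    __tablename__ = "custom"

    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    description = Column(String(255))


def create_custom_table(engine):
    """Create the 'custom' table only if it is missing; safe to call repeatedly."""
    # checkfirst=True makes SQLAlchemy check for the table before emitting CREATE TABLE.
    Custom.__table__.create(bind=engine, checkfirst=True)


# Assumption: in-memory SQLite for the demo instead of the real metadata database.
engine = create_engine("sqlite://")
create_custom_table(engine)
create_custom_table(engine)  # the second call finds the table and does nothing
table_exists = inspect(engine).has_table("custom")
```

With this variant, calling the function from `on_load` on every plugin load should be harmless, since the existence check happens inside `create()` itself.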