I have Airflow 2.8.2 installed with Docker.
I need to clean the Postgres database used by Airflow to free up space on the hard drive.
First of all, this is my database's disk usage:
As far as I know, I can't just delete rows from the tables directly.
So I found a simple DAG for cleaning the database:
```python
from datetime import datetime, timezone

from airflow.models import DagRun
from airflow.operators.python import PythonOperator
from airflow.utils.session import create_session


def remove_old_dag_runs():
    # create_session() commits on success, rolls back on error and closes the session
    with create_session() as session:
        query = session.query(
            DagRun  # specify which resource's metadata should be removed
        ).filter(
            # Compare against a timezone-aware datetime instead of a raw string
            DagRun.execution_date
            < datetime(2022, 2, 25, 12, 23, 38, 462662, tzinfo=timezone.utc)
        )
        # Bulk DELETE executed in the database; rows are not loaded into Python
        query.delete(synchronize_session=False)


remove_old_dag_runs_task = PythonOperator(
    task_id='remove_old_dag_runs',
    python_callable=remove_old_dag_runs,
    dag=dag,  # `dag` is assumed to be a DAG object defined elsewhere in this file
)
```
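The DAG above deletes everything older than one hard-coded timestamp, so it only frees space once. A minimal sketch of a rolling cutoff, assuming a retention window measured in days is acceptable; the helper name `retention_cutoff` and the 30-day window in the usage note are hypothetical, not part of the original code:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def retention_cutoff(days: int, now: Optional[datetime] = None) -> datetime:
    """Return the UTC instant `days` before `now`; older rows are candidates for deletion."""
    if days < 0:
        raise ValueError("days must be non-negative")
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        # Assumption: a naive datetime is meant as UTC (Airflow stores timestamps in UTC)
        now = now.replace(tzinfo=timezone.utc)
    return now - timedelta(days=days)
```

Inside `remove_old_dag_runs`, the fixed date could then be replaced with something like `DagRun.execution_date < retention_cutoff(30)`, so each scheduled run prunes only the rows that have aged out since the last one.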
I also found this code for cleaning the DB, but it was written for an older Airflow version.
So which database objects can I delete in Airflow 2.8.2?
I got this list of objects:
```
airflow.models list 1:
DagModel, DagRun, Log, XCom, SlaMiss, TaskInstance, Variable
airflow.models list 2:
TaskReschedule, TaskFail, RenderedTaskInstanceFields, ImportError
airflow.jobs list 1:
from airflow.jobs.base_job import BaseJob
airflow.jobs list 2:
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.jobs.local_task_job import LocalTaskJob
```
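Since the cleanup code targets an older Airflow version, some of these import paths may not exist in 2.8.2. One way to check is to try importing each listed object in the same Python environment Airflow runs in (for example, inside the scheduler container). This is a minimal stdlib-only sketch; the helper names `importable` and `report` are hypothetical, and the paths are simply the ones from the list above:

```python
import importlib


def importable(path: str) -> bool:
    """Return True if `path` names an importable module or module attribute."""
    try:
        importlib.import_module(path)
        return True
    except ImportError:
        pass  # not a module; maybe "package.module.Attribute"
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        return False
    try:
        getattr(importlib.import_module(module_name), attr)
        return True
    except (ImportError, AttributeError):
        return False


def report(paths):
    """Map each dotted path to whether it can be imported here."""
    return {p: importable(p) for p in paths}


# Paths copied from the object list above; results depend on the installed Airflow
CANDIDATES = [
    *(f"airflow.models.{name}" for name in (
        "DagModel", "DagRun", "Log", "XCom", "SlaMiss", "TaskInstance", "Variable",
        "TaskReschedule", "TaskFail", "RenderedTaskInstanceFields", "ImportError",
    )),
    "airflow.jobs.base_job.BaseJob",
    "airflow.jobs.scheduler_job.SchedulerJob",
    "airflow.jobs.local_task_job.LocalTaskJob",
]

if __name__ == "__main__":
    for path, ok in report(CANDIDATES).items():
        print(f"{path}: {'ok' if ok else 'missing'}")
```

Anything reported as `missing` would need a different import path (or a different object) before it can be used in a cleanup DAG on this version.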