I have an issue with KubernetesPodOperator.
Maybe someone can give me some advice or knows the answer.
I’m implementing the Liquibase tool on my project, and we are going to trigger Liquibase from Airflow inside a Pod. This requires the config files and changelog files to be available inside the Pod.
All migration files, etc., are located in the /opt/airflow/dags/migrations folder on the Airflow worker.
The main idea of the migrations DAG is as follows. It has 2 tasks:
- A Python task copies the migration files to the volume (from /opt/airflow/dags/migrations to the volume at /opt/airflow/migrations).
- A KubernetesPodOperator with the Liquibase image uses bash to:
  a) copy the files from the volume to the Liquibase folder (from /opt/airflow/migrations to /liquibase);
  b) launch the liquibase update command.
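The two tasks described above can be sketched roughly as follows. This is only a minimal sketch, not the actual DAG code: the function names `copy_migrations` and `build_liquibase_command` are hypothetical, the paths are the ones listed above, and it assumes the Liquibase image provides `bash` and `cp`.

```python
import shutil
from pathlib import Path

# Paths taken from the description above.
DAGS_MIGRATIONS = "/opt/airflow/dags/migrations"
VOLUME_MIGRATIONS = "/opt/airflow/migrations"
LIQUIBASE_DIR = "/liquibase"


def copy_migrations(src=DAGS_MIGRATIONS, dst=VOLUME_MIGRATIONS):
    """Task 1 (hypothetical helper): copy the migration files onto the volume."""
    # dirs_exist_ok=True (Python 3.8+) allows copying into an existing mount point.
    shutil.copytree(src, dst, dirs_exist_ok=True)
    # Return the relative paths of the files now on the volume, e.g. for logging.
    return sorted(str(p.relative_to(dst)) for p in Path(dst).rglob("*") if p.is_file())


def build_liquibase_command(src=VOLUME_MIGRATIONS, dst=LIQUIBASE_DIR):
    """Task 2 (hypothetical helper): shell command the Pod would run via bash."""
    # "src/." copies the contents of the directory rather than the directory itself.
    return f"cp -r {src}/. {dst}/ && liquibase update"
```

The first function could serve as the `python_callable` of a `PythonOperator`, and the string returned by the second could be passed to the Pod with `cmds=["bash", "-c"]` and `arguments=[build_liquibase_command()]`.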
My DevOps colleague created a persistentVolumeClaim and a volumeMount:
extraVolumes:
  - name: liquidbase-migrations
    persistentVolumeClaim:
      claimName: liquidbase-migrations-claim
extraVolumeMounts:
  - name: liquidbase-migrations
    mountPath: /opt/airflow/migrations
    readOnly: false
volumeClaimTemplates:
  - metadata:
      name: liquidbase-migrations
    spec:
      storageClassName: "default"
      accessModes:
        - "ReadWriteOnce"
      resources:
        requests:
          storage: "10Gi"
We don’t understand why the Liquibase Pod can’t see the volume.
The path /opt/airflow/migrations doesn’t exist there; /opt/ is empty.
Here is the code I’m using:
volume = k8s.V1Volume(
    name=volume_name,
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=claim_name),
)
volume_mount = k8s.V1VolumeMount(
    name=volume_mount_name,
    mount_path='/opt/airflow/migrations',
)
migrations_test = KubernetesPodOperator(
    namespace=namespace,
    image=image_name,
    name="test_migrations",
    task_id="apply_migrations_task_test1",
    arguments=arguments,
    # cmds=["./go.sh"],
    in_cluster=True,
    # volume_mounts=[volume_mount],
    # volumes=[volume],
    log_events_on_failure=True,
    is_delete_operator_pod=True,
    get_logs=True,
    env_vars=[
        k8s.V1EnvVar(name="SF_ACCOUNT", value=str(values["account"] + "." + values["region"])),
        k8s.V1EnvVar(name="SF_USER", value=sf_conn.login),
        k8s.V1EnvVar(name="SF_PASSWORD", value=sf_conn.password),
        k8s.V1EnvVar(name="SF_WAREHOUSE", value=values["warehouse"]),
        k8s.V1EnvVar(name="SF_DATABASE", value=values["database"]),
        k8s.V1EnvVar(name="SF_SCHEMA", value=values["schema"]),
        k8s.V1EnvVar(name="SF_ROLE", value=values["role"]),
        k8s.V1EnvVar(name="URL", value=url),
    ],
    # tried following part as an alternative to:
    ### volume_mounts=[volume_mount]
    ### volumes=[volume]
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base1",
                        volume_mounts=[volume_mount]
                    )
                ],
                volumes=[volume],
            )
        )
    },
)
# volume_name = "liquidbase-migrations"
# volume_mount_name = "liquidbase-migrations"
# claim_name = "liquidbase-migrations-claim"
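Because the volume and the mount are built from two separate variables (`volume_name` and `volume_mount_name`), one thing worth ruling out is a mismatch between them: in a Pod spec, each volume mount refers to a volume by name. Below is a minimal sketch of such a check, using plain dictionaries instead of the `k8s` model classes. The helper name `unmatched_mounts` is hypothetical and not part of Airflow or the Kubernetes client; the values are the ones from the commented-out assignments above.

```python
def unmatched_mounts(volumes, volume_mounts):
    """Return the names of volume mounts that reference no declared volume."""
    declared = {v["name"] for v in volumes}
    return [m["name"] for m in volume_mounts if m["name"] not in declared]


# Values taken from the commented-out assignments above.
volumes = [
    {
        "name": "liquidbase-migrations",
        "persistentVolumeClaim": {"claimName": "liquidbase-migrations-claim"},
    }
]
volume_mounts = [
    {"name": "liquidbase-migrations", "mountPath": "/opt/airflow/migrations"}
]

# An empty list means every mount points at a declared volume.
print(unmatched_mounts(volumes, volume_mounts))
```

With the names shown here the check finds no mismatch, so in this setup the names themselves do not appear to be the cause.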