Below is the code, but I see a couple of issues with it:
- I'm not sure how to make it work for more than one .csv file in the bucket.
- Even though it copies to the SFTP server, it recreates the staging directory inside the destination path and then copies the file there, like this: recieving_dir/staging/*.csv
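For the second issue, one thing I'm considering (not yet verified against my installed google provider version) is the operator's keep_directory_structure argument, which as far as I understand controls whether the source folder path is recreated under destination_path. A rough sketch of what I mean, assuming the argument is available:

# Sketch only: assumes GCSToSFTPOperator in the installed provider
# accepts keep_directory_structure; otherwise this won't work as written.
copy_to_sftp_flat = GCSToSFTPOperator(
    task_id='gcs_to_sftp_flat',
    source_bucket=gcs_bucket,
    source_object=f"{staging_folder}/*.csv",
    destination_path='recieving_dir/',
    keep_directory_structure=False,  # don't recreate staging/ under recieving_dir/
    sftp_conn_id=SFTP_CONN_ID,
    dag=dag,
)

Here is the full DAG as it currently stands: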
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

gcs_bucket = 'adhoc_files'
staging_folder = 'staging'
archival_folder = 'archive'
SFTP_CONN_ID = "sftp_conn_id"
# Define your default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    # 'retries': 1,
    # 'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 4, 24),
}
# Define the DAG
dag = DAG(
    'copy_from_gcs_to_sftp',
    default_args=default_args,
    schedule_interval=None,
)
# Define tasks
start_task = DummyOperator(task_id='start', dag=dag)
# Task to copy CSV file from GCS to SFTP
copy_to_sftp = GCSToSFTPOperator(
    task_id='gcs_to_sftp',
    source_bucket=gcs_bucket,
    source_object=f"{staging_folder}/*.csv",  # picks up any CSV file in the staging folder
    destination_path='recieving_dir/',
    sftp_conn_id=SFTP_CONN_ID,
    dag=dag,
)
# Task to move the files to the archival folder in the same GCS bucket
move_to_archive = BashOperator(
    task_id='archive_files',
    bash_command=f'gsutil mv gs://{gcs_bucket}/{staging_folder}/*.csv gs://{gcs_bucket}/{archival_folder}/',
    dag=dag,
)
# Define task dependencies
start_task >> copy_to_sftp >> move_to_archive
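Since GCSToGCSOperator is already imported, I've also been wondering whether the gsutil call could be replaced with it for the archive step. A rough sketch, assuming move_object behaves the way I expect (the task name here is just for illustration):

# Sketch only: an alternative archive step using the already-imported
# GCSToGCSOperator instead of gsutil. move_object=True should delete the
# source objects after the copy, which is what gsutil mv does.
move_to_archive_gcs = GCSToGCSOperator(
    task_id='archive_files_gcs',
    source_bucket=gcs_bucket,
    source_object=f"{staging_folder}/*.csv",
    destination_bucket=gcs_bucket,
    destination_object=f"{archival_folder}/",
    move_object=True,
    dag=dag,
)

# start_task >> copy_to_sftp >> move_to_archive_gcs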