I’m using PyAirbyte and Airflow to load data from AWS S3 into BigQuery. This is my task code:
@task.external_python(python='/usr/local/airflow/.pyairbyte-venv/bin/python')
def extract():
    import airbyte as ab
    # from airbyte.caches import BigQueryCache

    source = ab.get_source(
        "source-s3",
        config={
            "bucket": "my-bucket",
            "region_name": "us-east-1",
            "path_prefix": "src_data",
            "streams": [
                {
                    "name": "transaction",
                    "format": {
                        "filetype": "csv"
                    }
                }
            ],
            "aws_access_key_id": "",
            "aws_secret_access_key": ""
        },
        install_if_missing=True,
    )

    source.check()
    source.select_all_streams()
    result = source.read()
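
For context, the commented-out BigQueryCache import is for the load step I still need to add once reading works. A minimal sketch of that part, assuming PyAirbyte's BigQueryCache takes a project, dataset, and service-account key path (all names below are placeholders, not my real values):

from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="my-gcp-project",         # placeholder GCP project
    dataset_name="raw_transactions",       # placeholder target dataset
    credentials_path="/path/to/sa.json",   # placeholder service-account key file
)
result = source.read(cache=cache)  # write the selected streams into BigQuery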
In my bucket I have multiple folders, and each folder can contain different file types. What I’m trying to do is copy only the files from my src_data folder (which contains only CSV files). However, I get an error because the connector searches all the files in the bucket instead of only those under the specified prefix. Has anyone had the same problem? How did you solve it?
I have already tried multiple options, like adding "streams.globs": ["src_data/*.csv"] inside the streams config, but without success.
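
Concretely, the stream-level variant I tried looked roughly like this (assuming the connector accepts a "globs" list per stream; "src_data/*.csv" is the pattern I want to restrict it to):

config={
    "bucket": "my-bucket",
    "region_name": "us-east-1",
    "streams": [
        {
            "name": "transaction",
            "globs": ["src_data/*.csv"],  # intended: only CSV files under src_data/
            "format": {
                "filetype": "csv"
            }
        }
    ],
    "aws_access_key_id": "",
    "aws_secret_access_key": ""
}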