I am getting the following errors after launching the Dataflow flex template: code execution starts, but as soon as pipeline graph generation begins, the launch fails with the errors below.
"subprocess.CalledProcessError: Command '['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/tmp/tmpomr2b4n9/tmp_requirements.txt', '--exists-action', 'i', '--no-deps', '--implementation', 'cp', '--abi', 'cp38', '--platform', 'manylinux2014_x86_64']' returned non-zero exit status 1."
Pip install failed for package: -r
{"severity":"INFO","time":"2024/05/29 14:53:10.869696","line":"exec.go:66","message":" Output from execution of subprocess: b\"WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('\u003cpip._vendor.urllib3.connection.HTTPSConnection object at 0x7f5f4c7bde50\u003e: Failed to establish a new connection: [Errno 101] Network is unreachable')': /simple/google-cloud-storage/"}
{"severity":"INFO","time":"2024/05/29 14:53:11.262883","line":"exec.go:52","message":"python failed with exit status 1"}
{"severity":"ERROR","time":"2024/05/29 14:53:11.263007","line":"launch.go:80","message":"Error: Template launch failed: exit status 1"}
- The following is my requirements.txt:
apache-beam[gcp]
google-cloud-storage
google-cloud-bigquery
google-cloud-logging
pandas
pandas-gbq
db-dtypes
- The following is my metadata.json:
{
  "name": "Getting started Batch Pipeline",
  "description": "Batch Pipeline flex template for Python.",
  "parameters": [
    {
      "name": "output",
      "label": "Output destination",
      "helpText": "The path and filename prefix for writing output files. Example: gs://your-bucket/your-path",
      "regexes": [
        "^gs://[^\\n\\r]+$"
      ]
    }
  ]
}
- This is my dummy batch_pipeline.py:
import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def write_to_cloud_storage(argv=None):
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        # Define a custom pipeline option that specifies the Cloud Storage bucket.
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )


if __name__ == "__main__":
    write_to_cloud_storage()
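- For reference, the pipeline code itself can be sanity-checked locally with the DirectRunner before building the template (the output path below is just a placeholder, not the one I use):
python batch_pipeline.py --runner=DirectRunner --output=/tmp/batch-pipeline/out-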
- I am building the flex template with the following command:
gcloud dataflow flex-template build gs://<BUCKET_NAME>/batch_pipeline-req-py.json \
  --image-gcr-path "europe-west1-docker.pkg.dev/<PROJECT_ID>/<BUCKET_NAME>/batch-pipeline-python:V1" \
  --sdk-language "PYTHON" \
  --flex-template-base-image "PYTHON3" \
  --metadata-file "metadata.json" \
  --py-path "." \
  --env "FLEX_TEMPLATE_PYTHON_PY_FILE=batch_pipeline.py" \
  --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"
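- The generated template spec file can be inspected afterwards, for example with:
gsutil cat gs://<BUCKET_NAME>/batch_pipeline-req-py.json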
- The following command is used to run the template on the DataflowRunner:
gcloud dataflow flex-template run "batch-pipeline-flex" \
  --template-file-gcs-location "gs://<BUCKET_NAME>/batch_pipeline-req-py.json" \
  --service-account-email "service-dfl-ingest@<PROJECT_ID>.iam.gserviceaccount.com" \
  --subnetwork "https://www.googleapis.com/compute/v1/projects/<PROJECT_ID>/regions/europe-west1/subnetworks/" \
  --parameters output="gs://<BUCKET_NAME>/output-" \
  --region "europe-west1"
The service account has the following IAM roles: