I’m working with Apache Beam on Google Cloud Dataflow and have run into an issue: my Dataflow job executes successfully, but no Directed Acyclic Graph (DAG) is visible in the Cloud Dataflow Console.
Here’s a brief overview of my setup:
- Apache Beam Version: [Specify version, if known]
- Python Version: [Specify version, if known]
- Google Cloud SDK Version: [Specify version, if known]
- Dataflow Runner: DataflowRunner
Problem:
The Dataflow job runs successfully and its status shows as Running or Succeeded, but the job graph (DAG) visualization is missing from the Cloud Dataflow Console.
Code:
import json
import datetime
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
class ParsePubSubMessage(beam.DoFn):
    # Decode the raw Pub/Sub message and emit (customer_id, purchase_amount, time_stamp).
    def process(self, element):
        try:
            element = json.loads(element.decode('utf-8'))
            yield element['customer_id'], element['purchase_amount'], element['time_stamp']
        except json.JSONDecodeError as e:
            print(f"Failed to decode message: {e}")
class CustomTimeStamp(beam.DoFn):
    # Attach an event timestamp taken from the element's time_stamp field,
    # falling back to the current time if the value is invalid.
    def process(self, elements):
        if len(elements) > 2:
            try:
                unix_timestamp = float(elements[2])
                if unix_timestamp < 0 or unix_timestamp > 2**31:
                    raise ValueError("Timestamp out of range")
                yield beam.window.TimestampedValue(elements, unix_timestamp)
            except (ValueError, TypeError) as e:
                print(f"Invalid timestamp {elements[2]}: {e}")
                default_timestamp = float(datetime.datetime.now().timestamp())
                yield beam.window.TimestampedValue(elements, default_timestamp)
        else:
            print(f"Element does not have enough items: {elements}")
            yield None
class ParseFn(beam.DoFn):
    # Re-key each element as (customer_id, purchase_amount) for the per-key sum.
    def process(self, element):
        yield (element[0], float(element[1]))
class FormatOutputFn(beam.DoFn):
    # Format the per-customer, per-window total as a string and emit it as bytes for Pub/Sub.
    def process(self, element, window=beam.DoFn.WindowParam):
        customer_id, total_purchase = element
        try:
            start = window.start.to_utc_datetime()
            end = window.end.to_utc_datetime()
            output_str = f"{customer_id}: ${total_purchase}, Window: {start} - {end}"
            print(output_str)
            yield output_str.encode('utf-8')
        except OverflowError:
            yield f"{customer_id}: ${total_purchase}, Invalid timestamp".encode('utf-8')
def customer_data_aggregation(argv=None):
    # Parse the custom arguments; anything unrecognised is passed through to PipelineOptions.
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_subscription', dest='input_subscription', required=True, help='Pub/Sub subscription to read from')
    parser.add_argument('--output_topic', dest='output_topic', required=True, help='Pub/Sub topic to write to')
    parser.add_argument('--runner', dest='runner', default='DirectRunner', help='Runner type (DirectRunner or DataflowRunner)')
    parser.add_argument('--project', dest='project', required=False, help='Google Cloud Project ID')
    parser.add_argument('--temp_location', dest='temp_location', required=False, help='Temporary storage location for Dataflow (e.g., gs://bucket/temp)')
    parser.add_argument('--region', dest='region', required=False, help='Google Cloud region (e.g., us-central1)')
    parser.add_argument('--job_name', dest='job_name', required=False, help='Dataflow job name')
    parser.add_argument('--streaming', dest='streaming', action='store_true', help='Specify this flag to run the pipeline in streaming mode')
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    # Copy the Google Cloud settings onto the pipeline options when targeting Dataflow.
    if known_args.runner == 'DataflowRunner':
        google_cloud_options = options.view_as(GoogleCloudOptions)
        google_cloud_options.project = known_args.project
        google_cloud_options.temp_location = known_args.temp_location
        google_cloud_options.region = known_args.region
        google_cloud_options.job_name = known_args.job_name
    if known_args.streaming:
        options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        # Read purchases from Pub/Sub, parse them, and assign event timestamps.
        purchases = (
            p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
            | "Parse Pubsub Data" >> beam.ParDo(ParsePubSubMessage())
            | 'Custom Timestamp' >> beam.ParDo(CustomTimeStamp())
            | 'Filter None' >> beam.Filter(lambda x: x is not None)
        )

        # Window into fixed 30-second windows and sum purchases per customer.
        windowed_purchases = (
            purchases
            | 'Apply Fixed Window' >> beam.WindowInto(FixedWindows(30))
            | 'Parse CSV' >> beam.ParDo(ParseFn())
            | 'Sum Purchases' >> beam.CombinePerKey(sum)
        )

        # Format the per-window totals and publish them back to Pub/Sub.
        formatted_output = (
            windowed_purchases
            | 'Format Output' >> beam.ParDo(FormatOutputFn())
            | "Write To PubSub" >> beam.io.WriteToPubSub(topic=known_args.output_topic)
        )
if __name__ == '__main__':
    customer_data_aggregation()
Command to Run the Pipeline:
python fixed_window.py \
    --input_subscription=projects/project-id/subscriptions/customer-topic-2-sub \
    --output_topic=projects/project-id/topics/topic-12 \
    --runner=DataflowRunner \
    --project=project-id \
    --temp_location=gs://customer_data_analysis/temp/ \
    --region=us-central1 \
    --streaming \
    --job_name=my-dataflow-job
Troubleshooting Steps Taken:
- Verified that the Dataflow job is executing and logging.
- Checked pipeline options and arguments.
- Ensured that the job name is valid and unique.
- Attempted to refresh the Dataflow Console page.
- Simplified the pipeline to test DAG creation (a rough sketch of the kind of stripped-down pipeline I tried is shown below).
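For reference, this is roughly the kind of stripped-down pipeline I used for that last step; the subscription and topic names here are only placeholders, not my real resources:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

def run_minimal(argv=None):
    # Streaming pass-through: read raw messages from one Pub/Sub subscription
    # and write them unchanged to another topic.
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        (
            p
            | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                subscription='projects/project-id/subscriptions/test-sub')  # placeholder
            | 'Write To PubSub' >> beam.io.WriteToPubSub(
                topic='projects/project-id/topics/test-topic')  # placeholder
        )

if __name__ == '__main__':
    run_minimal()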
Questions:
- Are there any specific issues that could prevent the DAG from being created or displayed?
- How can I further diagnose why the DAG visualization is missing?
- Are there any additional steps I can take to ensure that the DAG is created and visible?
Any help or insights would be greatly appreciated!