I am executing my Apache Beam code in Google Cloud Shell. The code runs without errors, but no job is created in Dataflow.
**Roles I assigned to the service account:**

Dataflow Worker, Dataflow Admin, Pub/Sub Editor, Storage Object Admin, and BigQuery Data Editor
**The code I am using is below; I get no errors when executing it in Cloud Shell:**
    import json
    import apache_beam as beam
    from datetime import date
    from apache_beam.io.gcp.pubsub import ReadFromPubSub
    from apache_beam.io.gcp.bigquery import WriteToBigQuery
    from apache_beam.options.pipeline_options import PipelineOptions

    cus_gcp_temp_loc = "gs://sai_sample_bucket"  # currently unused
    subscription_name = "projects/teg-cloud-bfsi-uk1/subscriptions/s-test-2-sub"
    output_table = "teg-cloud-bfsi-uk1:tutorial_dataset.s-test-2"
    schema = """url:STRING, review:STRING"""

    class fun1(beam.DoFn):
        def process(self, element):
            try:
                if len(element) == 0:
                    print("if working")
                    raise ValueError("no data")
                else:
                    value = []
                    print("else its working")
                    return [value]
            except ValueError as e:
                print(e)

    def process_columns(element):
        # Pub/Sub messages arrive as bytes: decode and parse the JSON payload
        data = element.decode("UTF-8")
        return json.loads(data)

    class table(beam.DoFn):
        def process(self, element):
            try:
                if len(element) == 0:
                    raise ValueError("collection is empty")
                else:
                    val = []
                    msg = [element['msg']]
                    val.extend(msg)
                    name = [element['name']]
                    val.extend(name)
                    return val
            except ValueError as e:
                print(e)

    # shadows the built-in map; only used by the commented-out formatting step below
    map = lambda x: {'msg': x[0], 'name': x[1]}

    def load():
        beam_options = PipelineOptions(
            streaming=True,
            runner='DataflowRunner',
            project='teg-cloud-bfsi-uk1',
            job_name='newjob',
            staging_location='gs://sai_sample_bucket/stage',
            temp_location='gs://sai_sample_bucket/temp',
            # GCS path where the pipeline is staged as a classic Dataflow template
            template_location="gs://sai_sample_bucket/streamingpipeline",
            region='us-central1')
        with beam.Pipeline(options=beam_options) as pipeline:
            input_data = (
                pipeline
                | "read from pub/sub" >> ReadFromPubSub(subscription=subscription_name)
                | 'process columns' >> beam.Map(process_columns)
                #| 'splitting' >> beam.Map(lambda x: x.split(','))
                #| 'aggregation columns' >> beam.ParDo(table())
                #| 'formating column into key value pair' >> beam.Map(map)
                #| "out" >> beam.Map(print)
                | 'WriteToBigQuery' >> WriteToBigQuery(
                    table=output_table,
                    schema=schema,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    if __name__ == "__main__":
        print("start")
        load()
        print("END")
How can I get this code to create a job in GCP Dataflow?
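If it helps with diagnosing this, I can also run a variant like the sketch below (reusing `beam_options`, `process_columns`, `output_table`, and `schema` from the code above) that keeps the `PipelineResult` instead of using the `with` block, so the state after submission can be printed:

```python
# Diagnostic sketch only: build the same pipeline without the context manager
# so the PipelineResult returned by run() can be inspected.
pipeline = beam.Pipeline(options=beam_options)
(pipeline
 | "read from pub/sub" >> ReadFromPubSub(subscription=subscription_name)
 | "process columns" >> beam.Map(process_columns)
 | "WriteToBigQuery" >> WriteToBigQuery(
       table=output_table,
       schema=schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

result = pipeline.run()
print("pipeline state after submit:", result.state)
```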