I have written the code below, which loads JSON rows into BigQuery.
It works on the DirectRunner, but on the DataflowRunner it throws this error:
In start_bundle NameError: name 'bigquery' is not defined [while running 'ParDo(Error_handle)-ptransform-41'].
My code is below:
import argparse
import json
import logging
import sys
import time
import dask.dataframe as dd
import pandas as pd
from tabulate import tabulate
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from bigquery_schema_generator.generate_schema import SchemaGenerator, read_existing_schema_from_file
from google.cloud import bigquery,storage
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
# bucket_name="gs://dataflow-stream-amar/error_files"
client = bigquery.Client(project="<PROJECT_ID>")
table_id="dflow_dataset.test_10"
class Error_handle(beam.DoFn):
    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, batch):
        from google.cloud import bigquery  # tried importing library inside ParDo, but not working
        print(f"Got {len(batch)} bad rows")
        print("--------------")
        print(batch)
        print("---------------")
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            autodetect=True,
        )
        try:
            load_job = self.client.load_table_from_json(
                batch,
                table_id,
                job_config=job_config,
            )  # Make an API request.
            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"error_result = {load_job.error_result}")
                logging.info(f"errors = {load_job.errors}")
            else:
                logging.info(f'Loaded {len(batch)} rows.')
        except Exception as error:
            logging.info(f'Error: {error} with loading dataframe')


class json_data(beam.DoFn):
    def process(self, element):
        # print(type(e))
        e = json.loads(element)
        # print(e)
        yield e


def format_err(e):
    d = ("_", e)
    return d


def run(argv=None):
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as pipeline:
        res = (
            pipeline
            | beam.io.ReadFromText("gs://dataproc-input-321/test1.txt")
            | beam.ParDo(json_data())
            | beam.io.WriteToBigQuery(
                table="<PROJECT_ID>.dflow_dataset.test_10",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
                insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER,
            )
        )
        _ = (
            res[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
            | beam.Map(lambda x: x[1])
            | beam.Map(format_err)
            | beam.GroupIntoBatches(3)
            | beam.MapTuple(lambda _, val: val)
            | beam.ParDo(Error_handle())
        )
run()
I am trying to load the failed rows into BigQuery.
They load when the runner is DirectRunner, but fail on the DataflowRunner.
My Python version is 3.10.
I cannot use another method to load the JSON, because the other methods expect the rows to be in a file.
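In case it helps to narrow things down, here is a minimal sketch of what I believe is the failing pattern (the LoadBatch DoFn and the Create input are just placeholders, not my real pipeline): a module-level import of google.cloud.bigquery that start_bundle then relies on.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery  # module-level import, same as in my code above

class LoadBatch(beam.DoFn):
    def start_bundle(self):
        # On DataflowRunner this is where I get:
        # NameError: name 'bigquery' is not defined
        self.client = bigquery.Client()

    def process(self, batch):
        yield batch

with beam.Pipeline(options=PipelineOptions()) as p:
    _ = p | beam.Create([[{"a": 1}]]) | beam.ParDo(LoadBatch())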