I have a pipeline that receives streaming data via Pub/Sub.
The data is transformed and then written to BigQuery, using dynamic table names derived from the row contents:
table=lambda element: f'{GCP_PROJECT}:{element.get("customerId").replace("-", "_")}.{element.get("siteId")}',
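For example, with illustrative values (my-project, acme-corp and site01 are made up), the lambda resolves a row to a table name like this:

# Illustrative only: the project and IDs below are made-up values.
GCP_PROJECT = "my-project"
element = {"customerId": "acme-corp", "siteId": "site01"}
# The table lambda above yields: "my-project:acme_corp.site01"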
The full pipeline is below:
# ----------------------------------
# Write to BigQuery Pipeline
# ----------------------------------
bq_results = (
    pipeline_bq
    # Assign event timestamps from the row's measurement_time field
    | 'Add Timestamp' >> beam.Map(
        lambda elem: beam.window.TimestampedValue(elem, int(elem['measurement_time'])))
    | 'Window into window_size Seconds' >> beam.WindowInto(
        beam.window.FixedWindows(window_size),
        allowed_lateness=beam.window.Duration(seconds=120))
    # Key each row by (customerId, siteId) so similar rows can be grouped
    | 'Create (customerId, siteId) tuple' >> beam.Map(
        lambda element: ((element.get("customerId", "").replace("-", "_"),
                          element.get("siteId", "")), element))
    | 'Group by customerId and siteId' >> beam.GroupByKey()
    # Option 1 - Write directly
    | 'Extract Values from Tuple' >> beam.Map(lambda kv: kv[1])
    | 'Flatten List (BigQuery expects one individual row)' >> beam.FlatMap(lambda values: values)
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        table=lambda element: f'{GCP_PROJECT}:{element.get("customerId").replace("-", "_")}.{element.get("siteId")}',
        schema=schema,
        method="STREAMING_INSERTS",
        triggering_frequency=120,
        with_auto_sharding=True,
        batch_size=500,
        additional_bq_parameters={
            'timePartitioning': {
                'type': 'DAY',
                'field': 'measurement_time'
            },
            'clustering': {
                'fields': ['node_address']
            }
        },
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
Everything is working, but I keep getting the following warning saying that the process is taking a long time to execute:
Operation ongoing in bundle process_bundle-70-183 for PTransform{name=Write to BigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-59, state=process-msecs} for at least 3336.47 seconds without outputting or completing
Two questions:
- Why is it taking so long to stream the rows into BigQuery?
- How can I optimise the write? I tried several options, and I'm using the GroupByKey to try to aggregate similar rows, but it still takes a long time (see the sketch after this list for the variant without it).
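For reference, a minimal sketch of the simpler variant without the manual GroupByKey/FlatMap, relying only on WriteToBigQuery's own batch_size for batching (same table lambda, schema, and dispositions as above; bq_direct is just an illustrative name):

# Minimal sketch for comparison: no manual keying/grouping; WriteToBigQuery
# batches the rows itself via batch_size. Same variables as the pipeline above.
bq_direct = (
    pipeline_bq
    | 'Add Timestamp' >> beam.Map(
        lambda elem: beam.window.TimestampedValue(elem, int(elem['measurement_time'])))
    | 'Window' >> beam.WindowInto(beam.window.FixedWindows(window_size))
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        table=lambda element: f'{GCP_PROJECT}:{element.get("customerId").replace("-", "_")}.{element.get("siteId")}',
        schema=schema,
        method="STREAMING_INSERTS",
        batch_size=500,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)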
Suggestions are more than welcome. Thanks