I am writing an Apache Beam streaming pipeline that reads messages from Pub/Sub. I want to iterate over all the messages in a window once the window has closed, in this case after 1 minute.
But although I have set a window size of 60 seconds, the pipeline starts printing messages as soon as I publish them to the topic.
Here is my code:
import json
import os
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform

pipeline_options = PipelineOptions(streaming=True)


def run():
    with beam.Pipeline(options=pipeline_options) as p:
        out = (
            p
            | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(
                subscription="projects/<PROJ_ID>/subscriptions/Test-sub")
            | "Decode and parse Json" >> beam.Map(lambda element: element.decode("utf-8"))
            | "window" >> beam.WindowInto(beam.window.FixedWindows(60))
            # TODO: after the 60 s window finishes, take each element from the
            # window and pass it to a Google Gemini API to classify its sentiment.
            | beam.Map(print)
        )
        # p.run()


if __name__ == '__main__':
    run()
Can anyone explain what is happening and how to achieve this?
I want to accumulate all the messages that arrive within the window.
Once the window is complete, I want to iterate over all of its elements and perform some operations on them.
Thank you.