I am learning Apache Beam triggers.
I have written a Beam pipeline that uses a 30-second fixed window, an AfterCount(3) trigger, and accumulation_mode set to trigger.AccumulationMode.ACCUMULATING.
After the windowing step, I use CombinePerKey to sum the values per key.
As I understand it, the trigger should fire whenever 3 elements have arrived or when the 30 seconds have passed.
But I cannot make sense of the output I actually get.
Can anyone explain the output to me?
Here is my code:
import json
import os
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.transforms.sql import SqlTransform
from apache_beam.transforms.trigger import AccumulationMode, AfterAny, AfterCount, AfterProcessingTime
from apache_beam.transforms import trigger

topic = "projects/<PROJECT_ID>/topics/Test-beam"
pipeline_options = PipelineOptions(streaming=True)


def human_readable_window(window) -> str:
    """Formats a window object into a human readable string."""
    if isinstance(window, beam.window.GlobalWindow):
        return str(window)
    return f'{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}'


class PrintElementInfo(beam.DoFn):
    """Prints an element with its Window information."""
    def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        print(f'[{human_readable_window(window)}] {timestamp.to_utc_datetime()} -- {element}')
        yield element


@beam.ptransform_fn
def PrintWindowInfo(pcollection):
    """Prints the Window information with how many elements landed in that window."""
    class PrintCountsInfo(beam.DoFn):
        def process(self, num_elements, window=beam.DoFn.WindowParam):
            print(f'>> Window [{human_readable_window(window)}] has {num_elements} elements')
            yield num_elements

    return (
        pcollection
        | 'Count elements per window' >> beam.combiners.Count.Globally().without_defaults()
        | 'Print counts info' >> beam.ParDo(PrintCountsInfo())
    )


def form(element):
    # Turn a message such as "A,5" into the key/value pair ('A', 5).
    word = element.split(",")
    return (word[0], int(word[1]))


def run():
    with beam.Pipeline(options=pipeline_options) as p:
        out = (
            p
            | "Read from Pub/Sub subscription" >> beam.io.ReadFromPubSub(topic=topic)
            | "Decode and parse Json" >> beam.Map(lambda element: element.decode("utf-8"))
            | "format" >> beam.Map(form)
            | "window" >> beam.WindowInto(
                beam.window.FixedWindows(30),
                trigger=AfterCount(3),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
            | beam.ParDo(PrintElementInfo())
            | beam.CombinePerKey(sum)
            | "print msg" >> beam.Map(print)
            # | 'Print element info' >> beam.ParDo(PrintElementInfo())
        )
        # p.run()


if __name__ == '__main__':
    run()
For testing, I published 100 messages to Pub/Sub at 1-second intervals.
Here is the output:
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:07.520000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:06.307000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:08.561000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:09.531000 -- ('A', 5)
('A', 20)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:10.557000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:11.547000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:12.521000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:13.524000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:14.514000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:15.549000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:16.534000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:17.512000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:18.525000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:19.529000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:20.509000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:21.515000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:22.546000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:23.520000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:24.516000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:25.524000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:26.510000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:27.516000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:28.507000 -- ('A', 5)
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] 2024-06-25 07:59:29.531000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:30.525000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:31.499000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:32.516000 -- ('A', 5)
('A', 15)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:33.542000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:34.530000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:35.526000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:36.529000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:37.510000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:38.520000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:39.508000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:40.514000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:41.541000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:42.545000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:43.524000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:44.548000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:45.559000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:46.511000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:47.529000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:48.545000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:49.545000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:50.545000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:51.537000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:52.535000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:53.564000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:54.530000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:55.538000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:56.529000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:57.526000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:58.516000 -- ('A', 5)
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] 2024-06-25 07:59:59.543000 -- ('A', 5)
[2024-06-25 08:00:00 - 2024-06-25 08:00:30] 2024-06-25 08:00:00.537000 -- ('A', 5)
[2024-06-25 08:00:00 - 2024-06-25 08:00:30] 2024-06-25 08:00:01.621000 -- ('A', 5)
[2024-06-25 08:00:00 - 2024-06-25 08:00:30] 2024-06-25 08:00:02.514000 -- ('A', 5)
('A', 15)
[2024-06-25 08:00:00 - 2024-06-25 08:00:30] 2024-06-25 08:00:03.528000 -- ('A', 5)
[2024-06-25 08:00:00 - 2024-06-25 08:00:30] 2024-06-25 08:00:04.545000 -- ('A', 5)
[2024-06-25 08:00:00 - 2024-06-25 08:00:30] 2024-06-25 08:00:05.514000 -- ('A', 5)
[2024-06-25 08:00:00 - 2024-06-25 08:00:30] 2024-06-25 08:00:06.504000 -- ('A', 5)
[2024-06-25 08:01:00 - 2024-06-25 08:01:30] 2024-06-25 08:01:01.682000 -- ('A', 5)
As you can see, the first window is 2024-06-25 07:59:00 - 2024-06-25 07:59:30, but the result ('A', 20) was emitted after 4 elements, not 3.
The next output, ('A', 15), appears only after that window is over:
[2024-06-25 07:59:00 - 2024-06-25 07:59:30] -> this window completes its full 30 seconds, and no further result is emitted.
[2024-06-25 07:59:30 - 2024-06-25 08:00:00] -> this window starts.
When the window [2024-06-25 07:59:30 - 2024-06-25 08:00:00] gets 3 elements, it emits a result, and then again there is no output, either when another 3 elements are added or when the window ends.
Why is it not accumulating all the elements within a window and then combining them? I have set the accumulation mode to ACCUMULATING.
The code and the output are both shown above.
I have tried everything mentioned in the description above.
I want to create a Beam streaming pipeline that aggregates results every 30 seconds or after every 3 elements.
So my expectation is as follows.
Suppose the first window runs from 10:10 to 10:40
and the published elements are:
A,5 -> 10:10
A,5 -> 10:11
A,5 -> 10:12 (it should trigger here because 3 elements have been added, while the window is still open)
...
<the trigger can fire multiple times, whenever 3 new elements are added>
...
A,5 -> 10:40 (it should trigger again and apply CombinePerKey, summing all the values of A in the current window)
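To make that expectation concrete, here is a minimal pure-Python sketch of the results I expect to see. It is not Beam code and does not model how Beam actually evaluates triggers; the function name `expected_panes`, the tuple layout, and the integer-second timestamps are made up purely for illustration. Windows here are aligned to multiples of 30 seconds.

```python
from collections import defaultdict


def expected_panes(events, window_size=30, fire_every=3):
    """Return the outputs I expect as (window_start, key, running_sum, reason).

    events: list of (timestamp_seconds, key, value), sorted by timestamp.
    This only simulates my expectation; it is not how Beam is implemented.
    """
    # Assign each element to its fixed window (start = multiple of window_size).
    windows = defaultdict(list)
    for ts, key, value in events:
        windows[ts - ts % window_size].append((key, value))

    panes = []
    for start in sorted(windows):
        totals = defaultdict(int)  # key -> sum accumulated in this window
        counts = defaultdict(int)  # key -> elements seen in this window
        for key, value in windows[start]:
            totals[key] += value
            counts[key] += 1
            # Early firing: every `fire_every` elements, emit the sum so far.
            if counts[key] % fire_every == 0:
                panes.append((start, key, totals[key], "count"))
        # Final firing when the window ends, with everything accumulated.
        for key in sorted(totals):
            panes.append((start, key, totals[key], "window end"))
    return panes


# Seven elements in the first window (seconds 0..6), two in the second.
events = [(t, "A", 5) for t in range(7)] + [(30, "A", 5), (31, "A", 5)]
panes = expected_panes(events)
for pane in panes:
    print(pane)
# (0, 'A', 15, 'count')
# (0, 'A', 30, 'count')
# (0, 'A', 35, 'window end')
# (30, 'A', 10, 'window end')
```

In other words, with ACCUMULATING mode I expect each firing to carry the running total for the window, not just the last 3 elements.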