I’ve been facing this issue for over a month now, and I have searched all over the web for a solution without finding one.
My pipeline’s configuration is the following: streaming pipeline, Python 3.11, SDK 2.56.0.
My pseudocode is shown below:
import apache_beam as beam
from apache_beam import window
from apache_beam import trigger


class AddTS(beam.DoFn):
    # Re-emit each message as a (load_type, None) pair that keeps its timestamp.
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        load_type = element  # already a str after the "Decode" step
        yield beam.window.TimestampedValue(
            (load_type, None), timestamp
        )


class Tag(beam.DoFn):
    # Route each element to the tagged output named after its load type.
    def process(self, element):
        load_type = element[0]
        yield beam.pvalue.TaggedOutput(load_type, element)


with beam.Pipeline(options=pipeline_options) as pipeline:
    tables_a, tables_b = (
        pipeline
        | "Impulse" >> beam.io.ReadFromPubSub(subscription=input_subscription)
        | "Decode" >> beam.Map(lambda x: x.decode("utf-8"))
        | "AddTimestampToEvent" >> beam.ParDo(AddTS())
        | "Window" >> beam.WindowInto(
            windowfn=window.FixedWindows(600),  # 10-minute window
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.DISCARDING
        )
        | "GroupByKey" >> beam.GroupByKey()  # <--- here
        | "FetchTables" >> beam.ParDo(FetchTables())
        | "TagLoadType" >> beam.ParDo(Tag()).with_outputs("a", "b")
    )
    # ...
My issue is this: according to the documentation (see section “8.1.1. Windowing constraints”), whether 1 or 1,000 messages come through the topic, I still have a 10-minute window, and all the data ingested by the pipeline should be grouped by GroupByKey() per key and per window.
But that doesn’t happen (neither locally nor in Dataflow). The grouping is not respected at all, and the pipeline behaves as if I had no WindowInto, just a continuous streaming pipeline.
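To make the expectation concrete, here is a minimal sketch in plain Python (no Beam involved) of the grouping I expect: each element is assigned to the 10-minute fixed window containing its timestamp, and values are then collected per key and per window. The helper names `window_start` and `group_by_key_and_window` and the sample events are made up for illustration, and the sketch ignores watermarks and triggers entirely.

```python
from collections import defaultdict

WINDOW_SIZE = 600  # seconds, same size as FixedWindows(600)


def window_start(timestamp, size=WINDOW_SIZE):
    """Start of the fixed window [start, start + size) containing timestamp."""
    return timestamp - (timestamp % size)


def group_by_key_and_window(events, size=WINDOW_SIZE):
    """Collect values of (key, value, timestamp) tuples per key and per window."""
    groups = defaultdict(list)
    for key, value, timestamp in events:
        groups[(key, window_start(timestamp, size))].append(value)
    return dict(groups)


# Hypothetical events: (load_type, payload, event time in seconds).
events = [
    ("a", 1, 10),
    ("a", 2, 599),
    ("b", 3, 30),
    ("a", 4, 600),  # first second of the next window
]

grouped = group_by_key_and_window(events)
for (key, start), values in sorted(grouped.items()):
    print(key, start, values)
```

With these sample events I would expect three groups: key "a" in the window starting at 0 with values [1, 2], key "a" in the window starting at 600 with [4], and key "b" in the window starting at 0 with [3].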
What is especially annoying is that there are no warnings or error messages, so I have been guessing at the problem the whole time.
Essentially, the concepts of an aggregation (GroupByKey) and a window are very clear to me, but for some reason I don’t understand, the pipeline doesn’t behave as expected.
Any thoughts?