We have an Apache Beam pipeline, developed with the Python SDK, that does a stateful join of a large number of incoming Kafka records (approximately 2,000 records per second).
We need to act on some inputs immediately, so all our work happens in the global window and we use a stateful DoFn with a timed cache, very similar to the one described in the blog post Fast Joins in Apache Beam.
Over time the pipeline accumulates an ever-increasing backlog across all steps.
Inspecting the worker logs, we see the following suspicious items:
Retrieving state 345 times costed 60 seconds. It may be due to insufficient state cache size and/or frequent direct access of states.
Consider adding '--max_cache_memory_usage_mb' pipeline option to increase state cache size or switch to materialized (pvalue.AsList) side input if applicable.
That means each state retrieval cost about 173 ms on average (!!!).
Even for reads from disk that seems excessively slow, so something must be going very wrong.
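As a quick sanity check of that figure:

```python
# Average latency implied by the warning: 345 state reads took 60 seconds.
retrievals, total_seconds = 345, 60
avg_ms = total_seconds / retrievals * 1000
print(f"{avg_ms:.1f} ms per state retrieval")  # 173.9 ms per state retrieval
```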
Our max_cache_memory_usage_mb is currently set to 6000, and yet the pipeline can easily exhaust 20 workers and still not keep up.
What’s even more worrisome is seeing warnings like this:
Operation ongoing in bundle process_bundle-25849-71932 for
PTransform{name=name=Enrich/CoGroupByKey foo/CoGroupByKeyImpl/MapTuple(collect_values)-ptransform-39,
state=start-msecs} for at least 1518.49 seconds without outputting or completing.
Current Traceback:
  File "/usr/local/lib/python3.11/threading.py", line 1002, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 385, in task
    self._execute(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
    response = task()
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1106, in process_bundle
    for element in data_channel.input_elements(instruction_id,
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 539, in input_elements
    element = received.get(timeout=1)
  File "/usr/local/lib/python3.11/queue.py", line 180, in get
    self.not_empty.wait(remaining)
  File "/usr/local/lib/python3.11/threading.py", line 331, in wait
    gotit = waiter.acquire(True, timeout)
Taking the message "Operation ongoing in bundle for at least 1518.49 seconds without outputting or completing." at face value suggests that the CoGroupByKey/MapTuple step was blocked for roughly 25 minutes…
Our pipeline logic has no threading, and the only I/O we're doing is:
- Reading from Kafka
- Reading and writing cached records via ReadModifyWriteStateSpec()
- Using timers for garbage collection
Our individual records are well under 5 KB.