This is a simplified version of the DoFn I run in Google Cloud Dataflow:
from typing import Any, Iterable

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class ExecuteFn(beam.DoFn):
    def __init__(self, organization_name, static_policy, version_name, backtest):
        # Constructor arguments elided in this simplified version.
        self._records_to_log: list[ExecutionRecord] = []
        self._executor: Executor | None = None

    def setup(self) -> None:
        self._executor = Executor(...)

    def process(
        self, element: dict[str, Any]
    ) -> Iterable[ExecutionOutput | TaggedOutput]:
        assert self._executor is not None
        try:
            result, record = self._executor.execute(element)
            self._records_to_log.append(record)
        except Error as error:
            yield TaggedOutput("errors", str(error))
        else:
            yield result

    def start_bundle(self):
        self._records_to_log = []

    def finish_bundle(self):
        log_executions(self._records_to_log)
Here’s how the DoFn is called:
input_dicts | "Execute" >> beam.ParDo(
    ExecuteFn(
        self._organization_name,
        self._static_policy,
        self._version_name,
        self._backtest,
    )
).with_outputs("errors", main="main")
The code ran as expected on the direct runner and in unit tests, but in Dataflow, start_bundle and finish_bundle are never called.
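For reference, this is a minimal, self-contained sketch of the kind of unit test I mean (a stub DoFn with the same bundle hooks, not my real ExecuteFn). Running it on the direct runner logs both bundle messages:

import logging

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

logging.basicConfig(level=logging.INFO)

class BundleLoggingFn(beam.DoFn):
    def start_bundle(self):
        logging.info("start_bundle called")

    def process(self, element):
        yield element * 2

    def finish_bundle(self):
        logging.info("finish_bundle called")

with TestPipeline() as p:
    out = p | beam.Create([1, 2, 3]) | beam.ParDo(BundleLoggingFn())
    assert_that(out, equal_to([2, 4, 6]))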
What I’ve tried:
- To confirm that finish_bundle is really never called, I added log statements to both start_bundle and finish_bundle (see the sketch after this list). In unit tests and with the direct runner, these messages show up, matching the expected behavior. In GCP Dataflow, neither appears. I’m sure the logging itself works because I have used it elsewhere.
- To rule out a recent bug, I tried Apache Beam 2.56.0 and 2.55.1 and hit the same problem with both.
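Roughly what that instrumentation looks like (simplified; only the two bundle methods of the ExecuteFn above are shown, the rest is unchanged):

import logging

import apache_beam as beam

class ExecuteFn(beam.DoFn):
    # __init__ / setup / process as in the snippet above.

    def start_bundle(self):
        logging.info("ExecuteFn.start_bundle called")
        self._records_to_log = []

    def finish_bundle(self):
        logging.info(
            "ExecuteFn.finish_bundle called with %d records",
            len(self._records_to_log),
        )
        log_executions(self._records_to_log)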