I’m trying to use Apache Beam’s Python SDK to read data from a Kinesis stream with ReadDataFromKinesis. This transform relies on an external service written in Java to read from the stream. However, I’m running into problems when running the pipeline. The pipeline starts and launches the external service JAR (beam-sdks-java-io-kinesis-expansion-service-2.56.0.jar, downloaded automatically), but when the external service executes I get a NullPointerException and the pipeline stops.
Pipeline code that should read data from Kinesis:
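import apache_beam as beam
from apache_beam.io.kinesis import ReadDataFromKinesis

if __name__ == '__main__':
    with beam.Pipeline() as pipeline:
        events = pipeline | 'ReadFromKinesis' >> ReadDataFromKinesis(
            stream_name='your_stream_name',
            aws_access_key='<aws_key>',     # placeholder credential
            aws_secret_key='<aws_secret>',  # placeholder credential
            region='<aws_region>',          # placeholder; region is a required argument
        )
        # Print each record read from the stream.
        events | 'Print' >> beam.Map(print)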
Error message when running the pipeline locally with the DirectRunner:
WARNING:root:severity: WARN
timestamp {
seconds: 1716805967
nanos: 27000000
}
message: "Exception while splitting source. Source not split."
trace: "java.lang.RuntimeException: Unknown kinesis failure, when trying to reach kinesis\n\tat org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:356)\n\tat org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.listShardsAtPoint(SimplifiedKinesisClient.java:130)\n\tat org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:44)\n\tat org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:96)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:541)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:891)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:828)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650)\n\tat org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1068)\n\tat org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:123)\n\tat org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:550)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: java.lang.NullPointerException: Cannot invoke \"org.apache.beam.sdk.io.kinesis.StartingPoint.getPosition()\" because \"startingPoint\" is null\n\tat org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.buildShardFilterForStartingPoint(SimplifiedKinesisClient.java:136)\n\tat org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$listShardsAtPoint$1(SimplifiedKinesisClient.java:130)\n\tat org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:339)\n\t... 32 more\n"
instruction_id: "bundle_1"
transform_id: "ReadFromKinesis/Read(KinesisSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper)/SplitAndSizeRestriction"
log_location: "org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn"
thread: "34"
ERROR:root:severity: ERROR
timestamp {
seconds: 1716805967
nanos: 35000000
}
message: "Failed to process element for bundle \"bundle_1\""
trace: "org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException: Cannot invoke \"org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()\" because \"this.shardReadersPool\" is null\n\tat org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:891)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:828)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650)\n\tat org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1068)\n\tat org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:123)\n\tat org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:550)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: java.lang.NullPointerException: Cannot invoke \"org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()\" because \"this.shardReadersPool\" is null\n\tat org.apache.beam.sdk.io.kinesis.KinesisReader.getSplitBacklogBytes(KinesisReader.java:172)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:1033)\n\tat org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2502)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:547)\n"
instruction_id: "bundle_1"
transform_id: "ReadFromKinesis/Read(KinesisSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper)/SplitAndSizeRestriction"
log_location: "org.apache.beam.fn.harness.data.PCollectionConsumerRegistry"
thread: "34"
ERROR:root:severity: ERROR
timestamp {
seconds: 1716805967
nanos: 37000000
}
message: "Exception while trying to handle InstructionRequest bundle_1"
trace: "org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException: Cannot invoke \"org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()\" because \"this.shardReadersPool\" is null\n\tat org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:891)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:828)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650)\n\tat org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1068)\n\tat org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\tat org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:123)\n\tat org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:550)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: java.lang.NullPointerException: Cannot invoke \"org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()\" because \"this.shardReadersPool\" is null\n\tat org.apache.beam.sdk.io.kinesis.KinesisReader.getSplitBacklogBytes(KinesisReader.java:172)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:1033)\n\tat org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2502)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:547)\n"
instruction_id: "bundle_1"
log_location: "org.apache.beam.fn.harness.control.BeamFnControlClient"
thread: "34"
RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()" because "this.shardReadersPool" is null
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:891)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:828)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650)
at org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1068)
at org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:123)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:550)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()" because "this.shardReadersPool" is null
at org.apache.beam.sdk.io.kinesis.KinesisReader.getSplitBacklogBytes(KinesisReader.java:172)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:1033)
at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2502)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:547)
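The first "Caused by" entry reports that startingPoint is null in SimplifiedKinesisClient.buildShardFilterForStartingPoint, which hints that the Java source received no starting position at all. A minimal sketch of a variant that tests that hypothesis, assuming ReadDataFromKinesis forwards initial_position_in_stream to the Java read and that leaving it unset is what produces the null. kinesis_read_kwargs and run are hypothetical helper names, and the credentials are placeholders:

```python
# Sketch: the original pipeline plus an explicit starting position.
# Assumption (unconfirmed): the NPE comes from initial_position_in_stream being unset.

def kinesis_read_kwargs(stream_name, access_key, secret_key, region,
                        initial_position='LATEST'):
    """Hypothetical helper: build keyword arguments for ReadDataFromKinesis."""
    if initial_position not in ('LATEST', 'TRIM_HORIZON'):
        # AT_TIMESTAMP would also need initial_timestamp_in_stream; not covered here.
        raise ValueError('unsupported initial position: %r' % initial_position)
    return {
        'stream_name': stream_name,
        'aws_access_key': access_key,
        'aws_secret_key': secret_key,
        'region': region,
        'initial_position_in_stream': initial_position,
    }


def run(read_kwargs):
    # Imported lazily so the helper above can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kinesis import InitialPositionInStream, ReadDataFromKinesis

    kwargs = dict(read_kwargs)
    # Map the name ('LATEST' or 'TRIM_HORIZON') to the SDK's own constant.
    kwargs['initial_position_in_stream'] = getattr(
        InitialPositionInStream, kwargs['initial_position_in_stream'])
    with beam.Pipeline() as pipeline:
        events = pipeline | 'ReadFromKinesis' >> ReadDataFromKinesis(**kwargs)
        events | 'Print' >> beam.Map(print)
```

Usage would be run(kinesis_read_kwargs('your_stream_name', '<aws_key>', '<aws_secret>', '<aws_region>')). The argument dict is built separately so it can be inspected without starting a pipeline.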
I also tried running the external service manually with java -jar beam-sdks-java-io-kinesis-expansion-service-2.56.0.jar 25000 and setting expansion_service to localhost:25000 when instantiating ReadDataFromKinesis.
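The manual setup above can be scripted so the service is known to be listening before the pipeline is built. A minimal sketch, assuming java is on the PATH and the JAR sits in the working directory; wait_for_port, start_expansion_service and run are hypothetical helpers, and the readiness check only confirms the port accepts TCP connections, not that expansion succeeds:

```python
import socket
import subprocess
import time

JAR = 'beam-sdks-java-io-kinesis-expansion-service-2.56.0.jar'


def wait_for_port(host, port, timeout_s=30.0, interval_s=0.5):
    """Poll until a TCP connection to host:port succeeds; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            time.sleep(interval_s)
    return False


def start_expansion_service(port=25000, jar=JAR):
    """Launch the JAR exactly like the manual `java -jar <jar> <port>` command."""
    proc = subprocess.Popen(['java', '-jar', jar, str(port)])
    if not wait_for_port('localhost', port):
        proc.terminate()
        raise RuntimeError('expansion service did not start on port %d' % port)
    return proc


def run(stream_name, access_key, secret_key, region, port=25000):
    # Imported lazily so the helpers above work without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kinesis import ReadDataFromKinesis

    proc = start_expansion_service(port)
    try:
        with beam.Pipeline() as pipeline:
            events = pipeline | 'ReadFromKinesis' >> ReadDataFromKinesis(
                stream_name=stream_name,
                aws_access_key=access_key,
                aws_secret_key=secret_key,
                region=region,
                expansion_service='localhost:%d' % port,  # the manually started service
            )
            events | 'Print' >> beam.Map(print)
    finally:
        proc.terminate()
```

Because the Kinesis read is unbounded, the pipeline normally keeps running, so the finally block only stops the service on failure or interruption.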
The service seems to works and receive request correctly but not usefull logs are showed there.
Has anyone faced similar issues with ReadDataFromKinesis and its external service?
What could be causing this behavior?
Are there any specific configurations or considerations needed when using ReadDataFromKinesis?
Thanks in advance for any help!