I am trying to implement a gRPC client as a source for my Apache Flink application. The goal is to read streaming data from a gRPC event listener (the gRPC client is the Salesforce Pub/Sub API for Change Data Capture) and process it in Flink. The idea is to schedule multiple gRPC clients in Flink as sources and to manage them dynamically, e.g. removing a client or adding a new one when required.
What is the best way to integrate a gRPC client with Flink as a source? Are there any best practices or patterns I should follow?
So far I have tried to create a custom source as follows:
class GrpcSource(Source):
    """Custom source that pushes decoded Pub/Sub events into a Flink context.

    The instance keeps the caller-supplied ``argument_dict`` (which must
    contain a ``"topic"`` entry) and a ``running`` flag.
    """

    def __init__(self, argument_dict):
        """Remember the listener configuration and mark the source as running.

        :param argument_dict: configuration handed to ``PubSub``; its
            ``"topic"`` key is read when subscribing.
        """
        # NOTE(review): the base-class initializer is not invoked here. The
        # reported "no attribute '_j_function'" error is presumably caused by
        # state that ``Source.__init__`` would normally set up -- confirm
        # against the PyFlink ``Source`` implementation.
        self.running = True
        self.argument_dict = argument_dict

    def run(self, ctx):
        """Authenticate a ``PubSub`` listener and subscribe to the topic.

        :param ctx: object forwarded to ``process_event`` so that decoded
            events can be emitted through ``ctx.collect``.
        """
        listener = PubSub(self.argument_dict)
        listener.auth()
        topic_name = self.argument_dict["topic"]
        # NOTE(review): the meaning of the "LATEST", "" and 1 arguments is
        # defined by ``PubSub.subscribe`` -- presumably replay preset, replay
        # id and batch size; verify against that API.
        listener.subscribe(
            topic_name, "LATEST", "", 1, self.process_event, ctx
        )

    def process_event(self, event, pubsub, ctx):
        """Decode every event in a response and emit it to ``ctx``.

        :param event: response object exposing an ``events`` collection.
        :param pubsub: client used to look up schemas and decode payloads.
        :param ctx: sink whose ``collect`` method receives each decoded event.
        """
        # Nothing to emit when the response carries no events.
        if not event.events:
            return

        for received in event.events:
            raw_payload = received.event.payload
            schema_key = received.event.schema_id
            schema_json = pubsub.get_schema_json(schema_key)
            record = pubsub.decode(schema_json, raw_payload)
            # Emit the decoded event to the Flink context
            ctx.collect(record)

    def cancel(self):
        """Clear the ``running`` flag.

        Nothing inside this class reads the flag, so clearing it does not by
        itself stop an active subscription.
        """
        self.running = False
def main(argument_dict):
    """Build and execute a Flink job that prints events from ``GrpcSource``.

    :param argument_dict: configuration passed through to ``GrpcSource``.
    """
    execution_env = StreamExecutionEnvironment.get_execution_environment()
    execution_env.set_parallelism(1)

    # Define the gRPC source
    source = GrpcSource(argument_dict)
    watermarks = WatermarkStrategy.for_monotonous_timestamps()
    stream = execution_env.from_source(source, watermarks, "GrpcSource")

    # Process the stream (example: print the events)
    stream.print()

    # Execute the Flink job
    execution_env.execute("gRPC Flink Job")
but I am getting the following error, raised when `self._j_function` is accessed: AttributeError: 'GrpcSource' object has no attribute '_j_function'
1