I have an app written with the Twisted framework that sends various data (mostly log entries in JSON format) to a variety of logging servers using output plug-ins. I would like it to be able to send the data to a Kafka server too, but I am hitting a problem that I don’t know how to solve.
If I send data to the Kafka server in a straightforward way using Python and the `kafka-python` module, everything works just fine:
```python
from json import dumps
from kafka import KafkaProducer

site = '<KAFKA SERVER>'
port = 9092
username = '<USERNAME>'
password = '<PASSWORD>'
topic = 'test'

producer = KafkaProducer(
    bootstrap_servers='{}:{}'.format(site, port),
    # json.dumps returns a str; encoding it gives the bytes Kafka expects
    value_serializer=lambda v: dumps(v).encode('utf-8'),
    sasl_mechanism='SCRAM-SHA-256',
    security_protocol='SASL_SSL',
    sasl_plain_username=username,
    sasl_plain_password=password
)

event = {
    'message': 'Test message'
}

try:
    producer.send(topic, event)
    producer.flush()
    print('Message sent.')
except Exception as e:
    print('Error producing message: {}'.format(e))
finally:
    producer.close()
```
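The `value_serializer` only has to turn each event into bytes. Since `json.dumps` already returns a `str`, a single `.encode('utf-8')` is enough. A standalone sketch of that serializer (the name `serialize_event` is hypothetical, used only for illustration):

```python
from json import dumps


def serialize_event(event):
    # json.dumps returns a str; encoding it yields the bytes Kafka expects
    return dumps(event).encode('utf-8')


payload = serialize_event({'message': 'Test message'})
print(payload)  # b'{"message": "Test message"}'
```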
However, if I try to send it from my actual Twisted app, using pretty much the same code, it hangs:
```python
from json import dumps
from core import output
from kafka import KafkaProducer
from twisted.python.log import msg


class Output(output.Output):
    def start(self):
        site = '<KAFKA SERVER>'
        port = 9092
        username = '<USERNAME>'
        password = '<PASSWORD>'
        # CONFIG is the app's configuration object, defined elsewhere (not shown)
        self.topic = CONFIG.get('output_kafka', 'topic', fallback='mssqlpot')
        self.producer = KafkaProducer(
            bootstrap_servers='{}:{}'.format(site, port),
            value_serializer=lambda v: dumps(v).encode('utf-8'),
            sasl_mechanism='SCRAM-SHA-256',
            security_protocol='SASL_SSL',
            sasl_plain_username=username,
            sasl_plain_password=password
        )

    def stop(self):
        self.producer.flush()
        self.producer.close()

    def write(self, event):
        try:
            self.producer.send(self.topic, event)
            self.producer.flush()
        except Exception as e:
            msg('Kafka error: {}'.format(e))
```
(This won’t run out of the box; it’s just the plug-in, and it uses a generic `Output` class defined elsewhere.)
In particular, it hangs at the `self.producer.send(self.topic, event)` line.
I think the problem comes from the fact that the Kafka producer in `kafka-python` is synchronous (blocking), while Twisted requires asynchronous (non-blocking) code. There is an asynchronous Kafka module, `afkak`, but it doesn’t seem to support authentication with the Kafka server, so it is not suitable for my needs.
As I understand it, the way to get around such problems in Twisted is to use Deferreds. However, I have been unable to figure out exactly how to do it. If I rewrite the `write` method like this:
```python
from twisted.internet import reactor, threads
from twisted.python.log import msg

# Replacement methods for the Output class above
def write(self, event):
    d = threads.deferToThread(self.postentry, event)
    d.addCallback(self.postentryCallback)
    return d

def postentryCallback(self):
    reactor.stop()

def postentry(self, event):
    try:
        self.producer.send(self.topic, event)
        self.producer.flush()
    except Exception as e:
        msg('Kafka error: {}'.format(e))
```
it no longer hangs when trying to send data to the Kafka server, but it hangs when the application terminates, and nothing is sent to the Kafka server anyway (which I can verify with a separate consumer script written in pure Python).
Any ideas what I am doing wrong and how to fix it?
Twisted has asynchronous implementations of some otherwise blocking operations, such as making GET/POST requests to a web server, but Kafka is not a web server: it speaks its own binary protocol over TCP (port 9092 here), so I don’t think I can use that.