I'm producing a PySpark DataFrame to a Kafka cluster and I have been trying to optimize the time it takes to send the records. My tests are done with a PySpark DataFrame of 2300 rows, where each row is sent to Kafka as one message.
Each message is about 395 KB, and it takes the producer about 200 seconds to send all the records, which is poor performance.
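To put numbers on it, here is a quick back-of-the-envelope calculation of the current and target throughput, using only the figures above:

```python
# Rough throughput math based on the test figures above.
rows = 2300          # messages sent
msg_kb = 395         # approximate size of each message in KB
elapsed_s = 200      # total send time in seconds

current_msgs_per_s = rows / elapsed_s                  # ~11.5 messages/s
current_mb_per_s = rows * msg_kb / 1024 / elapsed_s    # ~4.4 MB/s

target_msgs_per_s = 1000
target_mb_per_s = target_msgs_per_s * msg_kb / 1024    # ~386 MB/s would be needed

print(current_msgs_per_s, current_mb_per_s, target_mb_per_s)
```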
Here is my cluster and producer configuration:
• Kafka cluster: 3 brokers, 24 partitions in the topic
• Producer configuration:
df.selectExpr("CAST(retail_outlet_id AS STRING) AS key", "value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", self.job_settings["kafka"]["bootstrap_servers"]) \
    .option("topic", "prices") \
    .option("kafka.batch.size", 4 * 1024 * 1024) \
    .option("kafka.linger.ms", 100) \
    .option("kafka.acks", "1") \
    .option("kafka.max.request.size", 10485760) \
    .option("kafka.compression.type", "snappy") \
    .save()
I also have PySpark running in standalone mode, with 16 GB of memory and 4 cores per executor.
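This is roughly how the Spark session is set up (a sketch; the app name and master URL are placeholders, only the executor memory and core settings match my actual configuration):

```python
from pyspark.sql import SparkSession

# Sketch of the standalone setup described above; the app name and master URL
# are placeholders, executor memory/cores match what I actually run with.
spark = (
    SparkSession.builder
    .appName("prices-producer")               # placeholder name
    .master("spark://spark-master:7077")      # standalone master URL (placeholder)
    .config("spark.executor.memory", "16g")   # 16 GB per executor
    .config("spark.executor.cores", "4")      # 4 cores per executor
    .getOrCreate()
)
```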
I tried changing kafka.batch.size and kafka.linger.ms, but without success (a sketch of the kind of values I tried is below).
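The combinations below are illustrative of what I experimented with; the exact values varied, and `df` / `self.job_settings` are the same objects as in the snippet above:

```python
# Illustrative batch.size / linger.ms combinations I tried (exact values varied).
candidate_settings = [
    {"kafka.batch.size": 1 * 1024 * 1024, "kafka.linger.ms": 10},
    {"kafka.batch.size": 4 * 1024 * 1024, "kafka.linger.ms": 100},
    {"kafka.batch.size": 8 * 1024 * 1024, "kafka.linger.ms": 500},
]

for settings in candidate_settings:
    writer = (
        df.selectExpr("CAST(retail_outlet_id AS STRING) AS key", "value")
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", self.job_settings["kafka"]["bootstrap_servers"])
        .option("topic", "prices")
        .option("kafka.acks", "1")
        .option("kafka.max.request.size", 10485760)
        .option("kafka.compression.type", "snappy")
    )
    for key, value in settings.items():
        writer = writer.option(key, value)
    writer.save()  # each run timed separately
```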
What do you suggest I change to get to 1000 messages/second? Thanks.