New to kafka, trying to get a working system for handling high throughput. Testing on docker windows. It works, but if I use K6 to do a load test, it stops after publishing around 28000 msgs with the following error.
<code>phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2836| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: Cannot assign requested address (after 6ms in state CONNECT)
phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2816| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: Cannot assign requested address (after 6ms in state CONNECT, 1 identical error(s) suppressed)
<code>phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2836| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: Cannot assign requested address (after 6ms in state CONNECT)
phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2816| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: Cannot assign requested address (after 6ms in state CONNECT, 1 identical error(s) suppressed)
</code>
phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2836| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: Cannot assign requested address (after 6ms in state CONNECT)
phpApp | %3|1718821570.096|FAIL|rdkafka#producer-2816| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Failed to connect to broker at kafka-3.7.kafka-producer-k6_default:9092: Cannot assign requested address (after 6ms in state CONNECT, 1 identical error(s) suppressed)
I have tried a few different things from what I could find, but they all lead to the same issue. At present, I have the following in Dockercompose.yml kafka section
image: 'bitnami/kafka:3.7.0'
container_name: kafka-3.7
# KRaft settings. No longer using zookeeper method.
- KAFKA_KRAFT_CLUSTER_ID=NDllYzhlNzNjMmZmNDEyNT
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_ADVERTISED_HOST_NAME=kafka
- KAFKA_AUTO_CREATE_TOPICS_ENABLE:true
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
<code>kafka:
image: 'bitnami/kafka:3.7.0'
container_name: kafka-3.7
ports:
- 9092:9092
environment:
# KRaft settings. No longer using zookeeper method.
- KAFKA_CFG_NODE_ID=0
- KAFKA_KRAFT_CLUSTER_ID=NDllYzhlNzNjMmZmNDEyNT
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_ADVERTISED_HOST_NAME=kafka
- KAFKA_AUTO_CREATE_TOPICS_ENABLE:true
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
</code>
kafka:
image: 'bitnami/kafka:3.7.0'
container_name: kafka-3.7
ports:
- 9092:9092
environment:
# KRaft settings. No longer using zookeeper method.
- KAFKA_CFG_NODE_ID=0
- KAFKA_KRAFT_CLUSTER_ID=NDllYzhlNzNjMmZmNDEyNT
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_ADVERTISED_HOST_NAME=kafka
- KAFKA_AUTO_CREATE_TOPICS_ENABLE:true
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
Producer.php
$payload=trim($_POST['payload']);
$conf = new RdKafkaConf();
$conf->set('metadata.broker.list', 'kafka:9092');
$producer = new RdKafkaProducer($conf);
$kafkaTopic = $producer->newTopic($topic);
$kafkaTopic->produce(0, 0, $payload);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(5000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
<code><?php
$payload=trim($_POST['payload']);
$conf = new RdKafkaConf();
$conf->set('metadata.broker.list', 'kafka:9092');
$producer = new RdKafkaProducer($conf);
$topic="testData";
$kafkaTopic = $producer->newTopic($topic);
$kafkaTopic->produce(0, 0, $payload);
$producer->poll(0);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(5000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
?>
</code>
<?php
$payload=trim($_POST['payload']);
$conf = new RdKafkaConf();
$conf->set('metadata.broker.list', 'kafka:9092');
$producer = new RdKafkaProducer($conf);
$topic="testData";
$kafkaTopic = $producer->newTopic($topic);
$kafkaTopic->produce(0, 0, $payload);
$producer->poll(0);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $producer->flush(5000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
?>
For K6, I am generating a string that is some 200 bytes long, with the following setup.
<code>export let options = {
vus: 30, // 30 virtual users
duration: '30s', // Run test for 30 seconds
export default function () {
let fakeData = generateFakeDataStringForKafka();
let res = http.post('http://localhost/producer.php', `payload=${fakeData}`, {
'Content-Type': 'application/x-www-form-urlencoded',
<code>export let options = {
vus: 30, // 30 virtual users
duration: '30s', // Run test for 30 seconds
};
export default function () {
// Generate data
let fakeData = generateFakeDataStringForKafka();
let res = http.post('http://localhost/producer.php', `payload=${fakeData}`, {
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
});
}
</code>
export let options = {
vus: 30, // 30 virtual users
duration: '30s', // Run test for 30 seconds
};
export default function () {
// Generate data
let fakeData = generateFakeDataStringForKafka();
let res = http.post('http://localhost/producer.php', `payload=${fakeData}`, {
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
});
}
And this keeps running, until hitting around 28000 publishes, after which the kafka error happens.
<code> data_received..................: 6.0 MB 174 kB/s
data_sent......................: 11 MB 311 kB/s
http_req_blocked...............: avg=10.97µs min=0s med=0s max=9.51ms p(90)=0s p(95)=0s
http_req_connecting............: avg=630ns min=0s med=0s max=991.8µs p(90)=0s p(95)=0s
http_req_duration..............: avg=33.99ms min=13.23ms med=21.11ms max=6.02s p(90)=22.47ms p(95)=23.2ms
{ expected_response:true }...: avg=33.99ms min=13.23ms med=21.11ms max=6.02s p(90)=22.47ms p(95)=23.2ms
http_req_failed................: 0.00% ✓ 0 ✗ 28301
http_req_receiving.............: avg=603.81µs min=0s med=526.2µs max=12.01ms p(90)=1.04ms p(95)=1.1ms
http_req_sending...............: avg=17.87µs min=0s med=0s max=1.62ms p(90)=0s p(95)=0s
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=33.37ms min=12.7ms med=20.53ms max=6.02s p(90)=21.74ms p(95)=22.37ms
http_reqs......................: 28301 822.663185/s
iteration_duration.............: avg=34.31ms min=16.75ms med=21.38ms max=6.02s p(90)=22.81ms p(95)=23.67ms
iterations.....................: 28301 822.663185/s
vus............................: 10 min=10 max=30
vus_max........................: 30 min=30 max=30
running (0m34.4s), 00/30 VUs, 28301 complete and 0 interrupted iterations
default ✓ [======================================] 30 VUs 30s
<code> data_received..................: 6.0 MB 174 kB/s
data_sent......................: 11 MB 311 kB/s
http_req_blocked...............: avg=10.97µs min=0s med=0s max=9.51ms p(90)=0s p(95)=0s
http_req_connecting............: avg=630ns min=0s med=0s max=991.8µs p(90)=0s p(95)=0s
http_req_duration..............: avg=33.99ms min=13.23ms med=21.11ms max=6.02s p(90)=22.47ms p(95)=23.2ms
{ expected_response:true }...: avg=33.99ms min=13.23ms med=21.11ms max=6.02s p(90)=22.47ms p(95)=23.2ms
http_req_failed................: 0.00% ✓ 0 ✗ 28301
http_req_receiving.............: avg=603.81µs min=0s med=526.2µs max=12.01ms p(90)=1.04ms p(95)=1.1ms
http_req_sending...............: avg=17.87µs min=0s med=0s max=1.62ms p(90)=0s p(95)=0s
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=33.37ms min=12.7ms med=20.53ms max=6.02s p(90)=21.74ms p(95)=22.37ms
http_reqs......................: 28301 822.663185/s
iteration_duration.............: avg=34.31ms min=16.75ms med=21.38ms max=6.02s p(90)=22.81ms p(95)=23.67ms
iterations.....................: 28301 822.663185/s
vus............................: 10 min=10 max=30
vus_max........................: 30 min=30 max=30
running (0m34.4s), 00/30 VUs, 28301 complete and 0 interrupted iterations
default ✓ [======================================] 30 VUs 30s
</code>
data_received..................: 6.0 MB 174 kB/s
data_sent......................: 11 MB 311 kB/s
http_req_blocked...............: avg=10.97µs min=0s med=0s max=9.51ms p(90)=0s p(95)=0s
http_req_connecting............: avg=630ns min=0s med=0s max=991.8µs p(90)=0s p(95)=0s
http_req_duration..............: avg=33.99ms min=13.23ms med=21.11ms max=6.02s p(90)=22.47ms p(95)=23.2ms
{ expected_response:true }...: avg=33.99ms min=13.23ms med=21.11ms max=6.02s p(90)=22.47ms p(95)=23.2ms
http_req_failed................: 0.00% ✓ 0 ✗ 28301
http_req_receiving.............: avg=603.81µs min=0s med=526.2µs max=12.01ms p(90)=1.04ms p(95)=1.1ms
http_req_sending...............: avg=17.87µs min=0s med=0s max=1.62ms p(90)=0s p(95)=0s
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=33.37ms min=12.7ms med=20.53ms max=6.02s p(90)=21.74ms p(95)=22.37ms
http_reqs......................: 28301 822.663185/s
iteration_duration.............: avg=34.31ms min=16.75ms med=21.38ms max=6.02s p(90)=22.81ms p(95)=23.67ms
iterations.....................: 28301 822.663185/s
vus............................: 10 min=10 max=30
vus_max........................: 30 min=30 max=30
running (0m34.4s), 00/30 VUs, 28301 complete and 0 interrupted iterations
default ✓ [======================================] 30 VUs 30s
What am I missing that is causing the failure on load test? Any hints much appreciated.