I run a Flink application in standalone mode with
<code>python -m flink_test.flink_job
</code>
and it is able to connect to the Kafka broker.
In my <code>flink_job.py</code> I have:
<code>BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9094")
</code>
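To be clear about what that line does: <code>os.environ.get</code> falls back to <code>localhost:9094</code> whenever <code>KAFKA_BOOTSTRAP_SERVERS</code> is missing from the environment of the process that actually runs the code. A minimal sketch of that lookup (the helper name is mine, just for illustration):

```python
import os

# Same lookup as in flink_job.py: fall back to localhost:9094
# when KAFKA_BOOTSTRAP_SERVERS is absent from the environment.
def resolve_bootstrap_servers(env=os.environ):
    return env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9094")

# With the variable set (as in my app container's environment):
print(resolve_bootstrap_servers({"KAFKA_BOOTSTRAP_SERVERS": "kafka:9092"}))
# Without it, the default wins:
print(resolve_bootstrap_servers({}))
```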
And the Docker container (in <code>docker-compose.yml</code>) that runs <code>flink_job.py</code> has:
<code>environment:
  KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
</code>
When I try to submit it to the Flink cluster (another Docker container) with:
<code>docker-compose exec jobmanager flink run -py /opt/flink_app/flink_job.py -d
</code>
it tried to connect to <code>localhost:9094</code> and could not read the Kafka messages.
Then I tried:
<code>docker-compose exec -e KAFKA_BOOTSTRAP_SERVERS=kafka:9092 jobmanager flink run -py /opt/flink_app/flink_job.py -d
</code>
and it fails with this error message:
<code>py4j.protocol.Py4JJavaError: An error occurred while calling o176.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
...
Caused by: org.apache.flink.runtime.JobException: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
...
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
</code>
The Flink job is submitted to the Flink JobManager, but it never actually runs.
Any idea what the difference is between running it in standalone mode and on the local cluster?
In the Flink JobManager I don't specify any <code>KAFKA_BOOTSTRAP_SERVERS</code>, but I don't think it is needed there.
I described the different runs above.