I am running a Flink job locally using
python job.py
and it runs fine.
The job is:
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# kafka_logs is a Kafka-backed table registered earlier in the script
calcul_count = t_env.execute_sql("""
    SELECT
        username,
        COUNT(action) AS a_count
    FROM kafka_logs
    GROUP BY username
""")

with calcul_count.collect() as results:
    for row in results:
        print(row)
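For reference, kafka_logs is registered with a DDL along these lines (the topic name, group id, startup mode, and format below are simplified placeholders, not my exact config); the broker address is read from the KAFKA_BOOTSTRAP_SERVERS environment variable, which is why I pass it at submit time:

import os

# Broker address comes from the environment; the default here is illustrative.
bootstrap = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

t_env.execute_sql(f"""
    CREATE TABLE kafka_logs (
        username STRING,
        action STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'logs',
        'properties.bootstrap.servers' = '{bootstrap}',
        'properties.group.id' = 'flink-logs',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")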
When I try to submit it to the Flink JobManager running as a Docker service with:
docker-compose exec -e KAFKA_BOOTSTRAP_SERVERS="kafka:9092" jobmanager flink run -py /opt/app/job.py
I get this error:
for row in results:
File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 240, in __next__
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o176.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
This is the Flink cluster Dockerfile:
FROM flink:1.18.1-scala_2.12
RUN apt-get update &&
apt-get install -y python3 python3-pip &&
ln -s /usr/bin/python3 /usr/bin/python &&
apt-get clean &&
rm -rf /var/lib/apt/lists/*
RUN pip install apache-flink==1.18.1
# Download the necessary Flink connector jars
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib/
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.18.1/flink-json-1.18.1.jar /opt/flink/lib/
WORKDIR /opt/flink
Is a library missing from the Flink cluster? What would cause this error?
I have also tried running job.py on a Flink cluster on EMR, which runs the same Flink version as my local Docker-based cluster, by copying the job onto the EMR JobManager and submitting it with:
/usr/lib/flink/bin/flink run -py /opt/app/job.py
but I get this error:
calcul_count = t_env.execute_sql("""
File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
File "/usr/lib/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
File "/usr/lib/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o16.executeSql.
: java.lang.NoSuchFieldError: SPOT_INSTANCE_INTERRUPTION_NOTICE_DURATION
at org.apache.flink.streaming.api.environment.CheckpointConfig.setSpotInstanceInterruptionNoticeDuration(CheckpointConfig.java:891)
at org.apache.flink.streaming.api.graph.StreamGraph.setSpotInstanceInterruptionNoticeDuration(StreamGraph.java:182)