I started a YARN session cluster with the following command:

bin/yarn-session.sh \
  -s 1 \
  -tm 2048 \
  -Dtaskmanager.memory.process.size=2048mb \
  -Dtaskmanager.memory.managed.size=0mb \
  -Dtaskmanager.numberOfTaskSlots=2
Checking the launch_container.sh log, I found that the TaskManager JVM is started with the following memory-related parameters:
exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1363652112 -Xms1363652112 -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456
...
-Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner
-D taskmanager.memory.network.min=166429984b
-D taskmanager.memory.task.off-heap.size=0b
-D taskmanager.memory.jvm-metaspace.size=268435456b
-D taskmanager.memory.jvm-overhead.min=214748368b
-D taskmanager.memory.framework.off-heap.size=134217728b
-D taskmanager.memory.network.max=166429984b
-D taskmanager.memory.framework.heap.size=134217728b
-D taskmanager.memory.managed.size=0b
-D taskmanager.memory.task.heap.size=1229434384b
-D taskmanager.numberOfTaskSlots=2
-D taskmanager.memory.jvm-overhead.max=214748368b
-Djobmanager.memory.jvm-overhead.min='201326592b'
-Djobmanager.memory.off-heap.size='134217728b'
-Djobmanager.memory.jvm-metaspace.size='268435456b'
-Djobmanager.memory.heap.size='1073741824b'
-Djobmanager.memory.jvm-overhead.max='201326592b'
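As a sanity check on these numbers, here is a small back-of-the-envelope snippet I wrote (my own assumption: -Xmx should equal framework heap + task heap, and -XX:MaxDirectMemorySize should equal framework off-heap + task off-heap + network; the class name is just mine):

public class TmMemoryCheck {
    public static void main(String[] args) {
        // Byte values copied from the launch_container.sh log above.
        long frameworkHeap    = 134_217_728L;   // taskmanager.memory.framework.heap.size
        long taskHeap         = 1_229_434_384L; // taskmanager.memory.task.heap.size
        long frameworkOffHeap = 134_217_728L;   // taskmanager.memory.framework.off-heap.size
        long taskOffHeap      = 0L;             // taskmanager.memory.task.off-heap.size
        long network          = 166_429_984L;   // taskmanager.memory.network.min/max

        // These sums match the JVM flags in the log: -Xmx1363652112 and -XX:MaxDirectMemorySize=300647712.
        System.out.println("framework heap + task heap                   = " + (frameworkHeap + taskHeap));
        System.out.println("framework off-heap + task off-heap + network = " + (frameworkOffHeap + taskOffHeap + network));
    }
}

So the launch command itself looks internally consistent to me; my doubts are about how the web UI presents these numbers.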
The memory allocation observed through the web UI is shown in the figure below.
[screenshot: TaskManager memory configuration as shown in the Flink web UI]
Doubt 1:
According to the Flink memory model, the off-heap memory shown in the web UI should be "framework off-heap" + "network" + "task off-heap" = 128m + 159m + 0 = 287m.
However, in the "Advanced" section of the web UI, "Outside JVM Memory" shows a direct memory capacity of only 159m, which does not match the 287m above. Why is that?
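(My working assumption, which may well be wrong, is that the web UI's "Outside JVM Memory" figure comes from the JVM's direct buffer pool as exposed by BufferPoolMXBean rather than from the Flink memory model. This is the probe I used locally to see what that MXBean reports; the class name is just mine.)

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectPoolProbe {
    public static void main(String[] args) {
        // Print the "direct" and "mapped" buffer pools exposed by the JVM.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.println(pool.getName()
                    + ": count=" + pool.getCount()
                    + ", used=" + pool.getMemoryUsed()
                    + ", capacity=" + pool.getTotalCapacity());
        }
    }
}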
Doubt 2:
In the "JVM (Heap/Non-Heap) Memory" part of the same Advanced section, the heap maximum is reported as 629m, which does not match "framework heap" + "task heap" = 128m + 538m = 666m from the Flink memory model. Why is that?
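(As a local experiment rather than anything from the docs: I know the heap maximum a JVM reports through Runtime.maxMemory() / MemoryMXBean can be somewhat smaller than -Xmx, for example because one survivor space may be excluded with some collectors. This is the snippet I used to compare the two numbers on my machine; whether that explains the 629m vs 666m gap is exactly my question.)

import java.lang.management.ManagementFactory;

public class HeapMaxProbe {
    public static void main(String[] args) {
        // Maximum heap as seen from inside the JVM; can be slightly below the -Xmx value.
        long runtimeMax = Runtime.getRuntime().maxMemory();
        long mxBeanMax  = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
        System.out.println("Runtime.maxMemory()   = " + runtimeMax);
        System.out.println("MemoryMXBean heap max = " + mxBeanMax);
    }
}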
Doubt 3:
I submitted a job that uses the following code to allocate off-heap (direct) memory.
stream.map(new RichMapFunction<String, String>() {
    // Keep references so the direct buffers are never garbage collected.
    private final List<ByteBuffer> lst = new ArrayList<>();

    @Override
    public String map(String s) throws Exception {
        for (int i = 1; i <= 1000; i++) {
            // Allocate ~2 MB of direct (off-heap) memory per iteration.
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(2048000);
            lst.add(byteBuffer);
            System.out.println(i + " * 2m ==> " + (i * 2) + " m");
            Thread.sleep(500);
        }
        return s;
    }
});
However, taskmanager.memory.task.off-heap.size is left at its default of 0. In theory my code should fail as soon as it starts allocating, but in practice it runs for quite a while (to the extent shown below) before throwing an error. Why is that?
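(To double-check which direct-memory ceiling the TaskManager JVM actually runs with, I also dumped its startup arguments at runtime; the helper below is just my own probe, and the same MXBean call could equally be made from the RichMapFunction's open() method.)

import java.lang.management.ManagementFactory;

public class JvmArgsProbe {
    public static void main(String[] args) {
        // Lists the JVM startup arguments, including -XX:MaxDirectMemorySize, as seen at runtime.
        for (String arg : ManagementFactory.getRuntimeMXBean().getInputArguments()) {
            System.out.println(arg);
        }
    }
}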
My confusion lies in the three inconsistencies mentioned above. I am unsure if it is due to my misunderstanding of the Flink memory model or if there are some underlying details not explained in the official documentation.