I have an Apache Hudi job that reads one day of data from S3 and writes it to a Hudi table with clustering enabled on an event_name column. The job ran fine for a month, and then one day it suddenly failed with this error:
<code>Exception in thread "main" java.util.concurrent.CompletionException: java.util.concurrent.CancellationException
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1498)
at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1219)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2480)
at org.apache.hudi.common.util.FutureUtils.lambda$null$0(FutureUtils.java:52)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.apache.hudi.common.util.FutureUtils.lambda$null$1(FutureUtils.java:52)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2480)
at org.apache.hudi.common.util.FutureUtils.lambda$null$0(FutureUtils.java:52)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.apache.hudi.common.util.FutureUtils.lambda$null$1(FutureUtils.java:52)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.CancellationException
at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2478)
... 10 more
</code>
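As far as I can tell, the CancellationException is probably a secondary symptom: the org.apache.hudi.common.util.FutureUtils frames suggest that when one of the parallel futures fails, the remaining sibling futures get cancelled, and it is one of the cancelled siblings that surfaces in the driver. A minimal sketch of that masking effect (my own illustration in plain CompletableFuture terms, not Hudi's actual code):
<code>import java.util.concurrent.{CancellationException, CompletableFuture}

object CancelledSiblingDemo extends App {
  // Stand-in for a sibling task that was still running when another task failed.
  val slow = CompletableFuture.runAsync(() => Thread.sleep(60000))

  // FutureUtils-style cleanup: the sibling gets cancelled once another future fails.
  slow.cancel(true)

  // Joining the cancelled future throws CancellationException; the real
  // failure is only visible wherever the *failed* future was logged.
  try slow.join()
  catch { case e: CancellationException => println(s"masked root cause: $e") }
}
</code>
So the useful stack trace is probably in the YARN executor/container logs rather than in this driver-side one.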
I am running this job on Spark on EMR with the configuration and command line below:
<code>masterNodeInstanceType: m6g.2xlarge
coreNode:
  instanceType: m6g.2xlarge
  instanceCount: 16
</code>
<code>/usr/bin/spark-submit --master yarn --deploy-mode client --driver-memory 8g --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 --executor-memory 4g --class AppEventsETLClusteredDayRefresh data-analytics.jar
</code>
Below is my write statement:
<code>val additionalOptions = Map(
  // Table and write basics
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "insert",
  "hoodie.datasource.write.partitionpath.field" -> "data_date",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.drop.partition.columns" -> "true",
  "path" -> targetBasePath,
  // Hive sync via HMS
  "hoodie.datasource.hive_sync.enable" -> "true",
  "hoodie.datasource.hive_sync.database" -> databaseName,
  "hoodie.datasource.hive_sync.table" -> tableName,
  "hoodie.datasource.hive_sync.partition_fields" -> "data_date",
  "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  "hoodie.datasource.hive_sync.use_jdbc" -> "false",
  "hoodie.datasource.hive_sync.mode" -> "hms",
  // Bulk insert and schema handling
  "hoodie.bulkinsert.sort.mode" -> "NONE",
  "hoodie.bulkinsert.shuffle.parallelism" -> "800",
  "hoodie.schema.on.read.enable" -> "true",
  // Clustering: z-order on ep_event_name
  "hoodie.layout.optimize.strategy" -> "z-order",
  "hoodie.clustering.plan.strategy.sort.columns" -> "ep_event_name",
  "hoodie.clustering.inline" -> "true",
  "hoodie.clustering.inline.max.commits" -> "1",
  "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
  "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "134217728",
  "hoodie.clustering.plan.strategy.max.num.groups" -> "4096",
  "hoodie.clustering.rollback.pending.replacecommit.on.conflict" -> "true",
  "hoodie.clustering.updates.strategy" -> "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy",
  "hoodie.clustering.async.enabled" -> "true"
)

df.write.format("hudi")
  .options(additionalOptions)
  .mode("append")
  .save()
</code>
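Since the driver-side trace loses the original failure behind the cancellation, this is the kind of wrapper I am thinking of adding to capture the full cause chain on the next run (a hypothetical diagnostic sketch, not something that is in the job yet):
<code>// Hypothetical helper: walk the getCause chain so any failure hiding
// beneath the CompletionException/CancellationException is printed too.
def printCauseChain(t: Throwable): Unit = {
  var cur: Throwable = t
  var depth = 0
  while (cur != null) {
    println(s"[$depth] ${cur.getClass.getName}: ${cur.getMessage}")
    cur = cur.getCause
    depth += 1
  }
}

try {
  df.write.format("hudi")
    .options(additionalOptions)
    .mode("append")
    .save()
} catch {
  case e: Throwable =>
    printCauseChain(e)
    throw e
}
</code>
That should at least help me tell whether the failure happens during the insert itself or during the inline clustering (replacecommit) step.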