I created a small SQL application in Flink (1.19.0) that reads data from Kafka and writes it into Hive.
When the application starts, it connects to Kafka, creates the Hive catalog, sets the Hive dialect, and executes CREATE TABLE in Hive.
When the CREATE TABLE statement is executed with the Hive dialect, I get this error:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_412]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_412]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.19.0.jar:1.19.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_412]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_412]
at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akka52c5a613-0102-42f6-86ff-a2a04d0f7629.jar:1.19.0]
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57) [flink-rpc-akka52c5a613-0102-42f6-86ff-a2a04d0f7629.jar:1.19.0]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_412]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_412]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_412]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_412]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.delegation.ParserFactory' in the classpath.
Available factory identifiers are:
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.19.0.jar:1.19.0]
... 12 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.delegation.ParserFactory' in the classpath.
Java code:
package org.my.nrt;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {

    public static void main(String[] args) {
        // Register a Hive catalog backed by the cluster's Hive configuration.
        String createCatalog = "CREATE CATALOG hive WITH (\n" +
                " 'type' = 'hive',\n" +
                " 'hive-conf-dir' = '/etc/hive/conf'\n" +
                ");";

        String useCatalog = "USE CATALOG hive;";

        // Hive DDL for the sink table; this requires the Hive dialect.
        String ddlTable = "create table if not exists default.tab_1\n" +
                "(\n" +
                " col_1 string,\n" +
                " col_2 string,\n" +
                " col_3 string,\n" +
                " col_4 string\n" +
                ")\n" +
                "partitioned by (dt STRING, hr STRING)\n" +
                "stored as textfile\n" +
                "tblproperties (\n" +
                " 'sink.partition-commit.trigger' = 'partition-time',\n" +
                " 'sink.partition-commit.delay' = '1 min',\n" +
                " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                " 'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'\n" +
                ");";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql(createCatalog);
        tEnv.executeSql(useCatalog);

        // Switch to the Hive dialect before running the Hive DDL; this is where the job fails.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(ddlTable);
    }
}
HiveParserFactory exists in META-INF/service/org.apache.flink.table.factories.Factory in my JAR file.
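To sanity-check what the service loader actually sees, I can run a small diagnostic like the sketch below on the same classpath the job uses (ListFactories is just a throwaway name of mine, not part of the application); I would expect an entry with the identifier 'hive' to show up:

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

// Throwaway diagnostic: lists every Factory the service loader can discover
// on the current classpath, together with its identifier.
public class ListFactories {
    public static void main(String[] args) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}

If 'hive' does not appear in that output on the cluster, the factory is apparently not visible at runtime even though the service file entry is in my fat JAR.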
Why do I get this error?