This is a follow-up to my last question. After learning from Leonard that the pipeline approach could not be used, I am now trying to synchronize data between SQL Server databases using the Flink Table API. Here is my environment and what I have done so far:
Environment
- Flink 1.19.1
- Flink CDC 3.1.0
- SQL Server 2022
- OpenJDK 11.0.23
Steps Taken
- Followed the example code from the Flink documentation.
- Created a Java application in IntelliJ IDEA.
- Ran App.java, but it fails with the error shown below.
- If the previous step succeeds, I plan to package the job as a JAR and submit it to the cluster.
App.java:
package org.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;

public class App {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE orders (\n" +
                "  id INTEGER NOT NULL,\n" +
                "  order_date DATE NOT NULL,\n" +
                "  purchaser INTEGER NOT NULL,\n" +
                "  quantity INTEGER NOT NULL,\n" +
                "  product_id INTEGER NOT NULL\n" +
                ") WITH (\n" +
                "  'connector' = 'sqlserver-cdc',\n" +
                "  'hostname' = 'x.x.x.x',\n" +
                "  'port' = '1433',\n" +
                "  'username' = 'sa',\n" +
                "  'password' = 'xxxxx',\n" +
                "  'database-name' = 'TEST_FOR_FLINK',\n" +
                "  'table-name' = 'dbo.orders'\n" +
                ");");

        tEnv.executeSql("\n" +
                "CREATE TABLE orders_dw (\n" +
                "  id INTEGER NOT NULL,\n" +
                "  order_date DATE NOT NULL,\n" +
                "  purchaser INTEGER NOT NULL,\n" +
                "  quantity INTEGER NOT NULL,\n" +
                "  product_id INTEGER NOT NULL,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:sqlserver://x.x.x.x:x;databaseName=DW',\n" +
                "  'username' = 'sa',\n" +
                "  'password' = 'xxxxx',\n" +
                "  'table-name' = 'dbo.orders_from_flink'\n" +
                ");");

        Table transactions = tEnv.from("orders");
        transactions.executeInsert("orders_dw");
    }
}
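For context: judging from the stack trace further down, the exception comes from TableEnvironment.create() itself (App.java:12), before any connector options are read. So my understanding, which may be wrong, is that even a bare-bones Table API program with no CDC or JDBC connectors would fail the same way in my current setup. This is only a sketch I have in mind for isolating the problem; the class name is something I made up:

package org.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Minimal check with no external connectors: if the planner/executor cannot be
// discovered, TableEnvironment.create() should already fail here.
public class PlannerSmokeTest {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        // This is the call that currently throws the ExecutorFactory error for me.
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // If creation succeeds, run a trivial query that needs no connector.
        tEnv.executeSql("SELECT 1").print();
    }
}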
POM Dependencies:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FLinkSQL</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>FLinkSQL</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.19.1</flink.version>
        <flink.cdc.version>3.1.0</flink.cdc.version>
        <sqlserver.jdbc.version>12.6.3.jre11</sqlserver.jdbc.version>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
            <version>3.0.1</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.19.1</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.19.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>12.6.3.jre11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-test-utils</artifactId>
            <version>1.19.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>org.example.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
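Given that the error below complains about not finding an ExecutorFactory, my current (unverified) guess is that the POM above is missing the modules that provide the planner and local execution when running from the IDE. From what I can tell from the Flink project configuration docs, that would be something like the following; the choice of artifacts and scopes here is only my assumption:

<!-- My guess at what might be missing for in-IDE execution (unverified). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>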
Error message:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:605)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:241)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:99)
at org.example.App.main(App.java:12)
When I run the main method, I encounter the above error. However, the same statements work fine when I run them through the Flink SQL CLI in my test environment. I would appreciate any help in resolving this issue.
If you need more information or if I missed anything, please let me know. Thank you!
I would also like to understand how to analyze or resolve such issues. For instance, I suspect it might be related to dependency packages. Could you recommend any keywords or documentation that I should be familiar with to address this problem?
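In case it helps show how I have been trying to analyze this: since the exception says no implementation of 'org.apache.flink.table.delegation.ExecutorFactory' was found on the classpath, I thought about listing which table factories the ServiceLoader actually sees at runtime. This is just my own debugging idea, not something taken from the documentation, and the class name is made up:

package org.example;

import java.util.ServiceLoader;
import org.apache.flink.table.factories.Factory;

// Debugging sketch (my own idea): print every table Factory visible to the
// ServiceLoader, to see whether any ExecutorFactory implementation is on the
// classpath at all.
public class FactoryDebug {
    public static void main(String[] args) {
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.getClass().getName());
        }
    }
}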