I have my JobManager and TaskManager running as Docker services. On the same machine, I have two other microservices deployed in the same Docker setup as two separate services (call them MS1 and MS2). MS1 is a Spring app (not Spring Boot) that contains the pipeline-creation and executeAsync code. MS2 is a Spring Boot app; from there I want to submit my Flink job to the JobManager, so I wrote a REST API. The REST API's code looks like this:
import java.io.File;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

private static final int PARALLELISM = 8;
private static final Configuration FLINK_CONFIG = new Configuration();

void foo() throws Exception {
    // Point the client at the JobManager's REST endpoint.
    FLINK_CONFIG.setString(JobManagerOptions.ADDRESS, "localhost");
    FLINK_CONFIG.setInteger(RestOptions.PORT, 8081);
    FLINK_CONFIG.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 3);

    RestClusterClient<StandaloneClusterId> flinkClient =
            new RestClusterClient<>(FLINK_CONFIG, StandaloneClusterId.getInstance());

    String jar = "/path/to/jar";
    String[] args = new String[]{"..."};
    PackagedProgram program = PackagedProgram.newBuilder()
            .setJarFile(new File(jar))
            .setArguments(args)
            .build();

    // createJobGraph invokes the jar's main method in this JVM to build the JobGraph.
    JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, FLINK_CONFIG, PARALLELISM, false);
    JobID jobId = flinkClient.submitJob(jobGraph).get();
    ...
}
createJobGraph loads the correct jar and invokes its main method. But because MS1 is a Spring app, that main method does some context.registerBean(...) calls followed by a refresh(). The refresh() ends up trying to refresh MS2's beans as well and fails at that point. I am not able to visualize how the job gets submitted to the JobManager from MS2's JVM. Any help would really be appreciated.
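To illustrate the failure mode, here is a minimal sketch (plain reflection, not Flink's actual implementation) of what happens conceptually: createJobGraph executes the job jar's main(String[]) inside the submitting JVM (MS2), so any Spring context work in that main runs alongside MS2's own context. The class names below (SubmitSideSketch, JobEntryPoint) are made up for the example.

```java
import java.lang.reflect.Method;

public class SubmitSideSketch {
    static boolean jobMainRan = false;

    // Stand-in for the job jar's entry class. In the real setup this is MS1's
    // main, which calls context.registerBean(...) and context.refresh().
    public static class JobEntryPoint {
        public static void main(String[] args) {
            // Anything this method does (including refreshing a Spring
            // context) happens in the submitting JVM, i.e. inside MS2.
            SubmitSideSketch.jobMainRan = true;
        }
    }

    // Conceptually what createJobGraph does: load the entry class and invoke
    // its main(String[]) reflectively, right here in the caller's JVM.
    public static void invokeLikeFlink() {
        try {
            Method entry = JobEntryPoint.class.getMethod("main", String[].class);
            entry.invoke(null, (Object) new String[0]);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        invokeLikeFlink();
        System.out.println("job main executed in this JVM: " + jobMainRan);
    }
}
```

This is why the bean clash occurs at graph-creation time, before anything reaches the JobManager: only the resulting JobGraph is shipped to the cluster, while the main method itself runs locally.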
Note: the same MS1 jar works fine if I start it from the Flink command-line tool.
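For reference, the CLI submission that works is presumably the standard `flink run` invocation (the path and arguments below are placeholders mirroring the snippet above, not real values):

```shell
# -p sets the default parallelism (8, matching PARALLELISM above)
flink run -p 8 /path/to/jar "..."
```

The CLI also executes the jar's main method in the client process, but that process is a fresh JVM with no MS2 Spring context in it, which would explain why the same jar succeeds there.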
Flink doesn't have many examples covering this. I have referred to the following pages; let me know if any good examples exist.
Does Flink provide Java API to submit jobs to JobManager?
How do I submit a job to a Flink cluster using Java code?