Below is my code
For the server side
public class IgniteMainConfig {
@Value("${kubernetes.namespace:}")
private String kubernetesNamespace;
@Value("${kubernetes.service_name:}")
private String kubernetesServiceName;
public Slf4jLogger igniteLogger() {
return new Slf4jLogger();
public Ignite igniteInstance(IgniteConfiguration igniteConfiguration) {
return Ignition.start(igniteConfiguration);
* This is only used for local development. In the Ignite K8 config which is deployed it uses the
* configuration from the Docker container.
* @throws NoSuchFieldException
public IgniteConfiguration igniteConfiguration(
Optional<IgniteDiscoverySpi> discoverySpi,
@Qualifier("cacheConfiguration") CacheConfiguration<?,?>[] cacheConfigs
) throws NoSuchFieldException {
IgniteConfiguration igniteCfg = new IgniteConfiguration();
//uncomment this code for using persistance cache
/*igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true) // Enable persistence
/*igniteCfg.setConsistentId("node1");
igniteCfg.setIgniteInstanceName("Ins1");*/
// Add all cache configurations to IgniteConfiguration
igniteCfg.setCacheConfiguration(cacheConfigs);
igniteCfg.setPeerClassLoadingEnabled(true);
igniteCfg.setDeploymentMode(DeploymentMode.CONTINUOUS);
igniteCfg.setGridLogger(logger);
ThinClientConfiguration thinClientCfg = new ThinClientConfiguration()
.setMaxActiveComputeTasksPerConnection(100);
ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration()
.setThinClientConfiguration(thinClientCfg);
igniteCfg.setClientConnectorConfiguration(clientConnectorCfg);
discoverySpi.ifPresent(discovery -> igniteCfg.setDiscoverySpi(discovery));
// Ignite ignite = Ignition.start(igniteCfg);
// for (ClusterNode node : ignite.cluster().nodes()) {
// System.out.println("Node IP: " + node.addresses());
//igniteCfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setPort(10800)); // Adjust the port as needed
//uncomment this code for using persistance cache
/*ignite.cluster().active(true);*/
@ConditionalOnPropertyNotEmpty("kubernetes.namespace")
public IgniteDiscoverySpi discoverySpi(TcpDiscoveryKubernetesIpFinder kubernetesDiscovery) {
var discoverySpi = new TcpDiscoverySpi();
discoverySpi.setIpFinder(kubernetesDiscovery);
@ConditionalOnPropertyNotEmpty("kubernetes.namespace")
public TcpDiscoveryKubernetesIpFinder kubernetesDiscovery() {
var k8sConfig = new KubernetesConnectionConfiguration();
k8sConfig.setNamespace(kubernetesNamespace);
k8sConfig.setServiceName(kubernetesServiceName);
return new TcpDiscoveryKubernetesIpFinder(k8sConfig);
<code>@Slf4j
@RequiredArgsConstructor
@Configuration
public class IgniteMainConfig {
@Value("${kubernetes.namespace:}")
private String kubernetesNamespace;
@Value("${kubernetes.service_name:}")
private String kubernetesServiceName;
@Bean
public Slf4jLogger igniteLogger() {
return new Slf4jLogger();
}
@Bean
public Ignite igniteInstance(IgniteConfiguration igniteConfiguration) {
return Ignition.start(igniteConfiguration);
}
/**
* This is only used for local development. In the Ignite K8 config which is deployed it uses the
* configuration from the Docker container.
*
* @param cacheConfigs
* @return
* @throws NoSuchFieldException
*/
@Bean
public IgniteConfiguration igniteConfiguration(
Slf4jLogger logger,
Optional<IgniteDiscoverySpi> discoverySpi,
@Qualifier("cacheConfiguration") CacheConfiguration<?,?>[] cacheConfigs
) throws NoSuchFieldException {
IgniteConfiguration igniteCfg = new IgniteConfiguration();
//uncomment this code for using persistance cache
/*igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true) // Enable persistence
));*/
/*igniteCfg.setConsistentId("node1");
igniteCfg.setIgniteInstanceName("Ins1");*/
// Add all cache configurations to IgniteConfiguration
igniteCfg.setCacheConfiguration(cacheConfigs);
igniteCfg.setPeerClassLoadingEnabled(true);
igniteCfg.setDeploymentMode(DeploymentMode.CONTINUOUS);
igniteCfg.setGridLogger(logger);
ThinClientConfiguration thinClientCfg = new ThinClientConfiguration()
.setMaxActiveComputeTasksPerConnection(100);
ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration()
.setThinClientConfiguration(thinClientCfg);
igniteCfg.setClientConnectorConfiguration(clientConnectorCfg);
discoverySpi.ifPresent(discovery -> igniteCfg.setDiscoverySpi(discovery));
// Ignite ignite = Ignition.start(igniteCfg);
//
// for (ClusterNode node : ignite.cluster().nodes()) {
// System.out.println("Node IP: " + node.addresses());
// }
//igniteCfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setPort(10800)); // Adjust the port as needed
//uncomment this code for using persistance cache
/*ignite.cluster().active(true);*/
return igniteCfg;
}
@Bean
@ConditionalOnPropertyNotEmpty("kubernetes.namespace")
public IgniteDiscoverySpi discoverySpi(TcpDiscoveryKubernetesIpFinder kubernetesDiscovery) {
var discoverySpi = new TcpDiscoverySpi();
discoverySpi.setIpFinder(kubernetesDiscovery);
return discoverySpi;
}
@Bean
@ConditionalOnPropertyNotEmpty("kubernetes.namespace")
public TcpDiscoveryKubernetesIpFinder kubernetesDiscovery() {
var k8sConfig = new KubernetesConnectionConfiguration();
k8sConfig.setNamespace(kubernetesNamespace);
k8sConfig.setServiceName(kubernetesServiceName);
return new TcpDiscoveryKubernetesIpFinder(k8sConfig);
}
}
</code>
@Slf4j
@RequiredArgsConstructor
@Configuration
public class IgniteMainConfig {
@Value("${kubernetes.namespace:}")
private String kubernetesNamespace;
@Value("${kubernetes.service_name:}")
private String kubernetesServiceName;
@Bean
public Slf4jLogger igniteLogger() {
return new Slf4jLogger();
}
@Bean
public Ignite igniteInstance(IgniteConfiguration igniteConfiguration) {
return Ignition.start(igniteConfiguration);
}
/**
* This is only used for local development. In the Ignite K8 config which is deployed it uses the
* configuration from the Docker container.
*
* @param cacheConfigs
* @return
* @throws NoSuchFieldException
*/
@Bean
public IgniteConfiguration igniteConfiguration(
Slf4jLogger logger,
Optional<IgniteDiscoverySpi> discoverySpi,
@Qualifier("cacheConfiguration") CacheConfiguration<?,?>[] cacheConfigs
) throws NoSuchFieldException {
IgniteConfiguration igniteCfg = new IgniteConfiguration();
//uncomment this code for using persistance cache
/*igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true) // Enable persistence
));*/
/*igniteCfg.setConsistentId("node1");
igniteCfg.setIgniteInstanceName("Ins1");*/
// Add all cache configurations to IgniteConfiguration
igniteCfg.setCacheConfiguration(cacheConfigs);
igniteCfg.setPeerClassLoadingEnabled(true);
igniteCfg.setDeploymentMode(DeploymentMode.CONTINUOUS);
igniteCfg.setGridLogger(logger);
ThinClientConfiguration thinClientCfg = new ThinClientConfiguration()
.setMaxActiveComputeTasksPerConnection(100);
ClientConnectorConfiguration clientConnectorCfg = new ClientConnectorConfiguration()
.setThinClientConfiguration(thinClientCfg);
igniteCfg.setClientConnectorConfiguration(clientConnectorCfg);
discoverySpi.ifPresent(discovery -> igniteCfg.setDiscoverySpi(discovery));
// Ignite ignite = Ignition.start(igniteCfg);
//
// for (ClusterNode node : ignite.cluster().nodes()) {
// System.out.println("Node IP: " + node.addresses());
// }
//igniteCfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().setPort(10800)); // Adjust the port as needed
//uncomment this code for using persistance cache
/*ignite.cluster().active(true);*/
return igniteCfg;
}
@Bean
@ConditionalOnPropertyNotEmpty("kubernetes.namespace")
public IgniteDiscoverySpi discoverySpi(TcpDiscoveryKubernetesIpFinder kubernetesDiscovery) {
var discoverySpi = new TcpDiscoverySpi();
discoverySpi.setIpFinder(kubernetesDiscovery);
return discoverySpi;
}
@Bean
@ConditionalOnPropertyNotEmpty("kubernetes.namespace")
public TcpDiscoveryKubernetesIpFinder kubernetesDiscovery() {
var k8sConfig = new KubernetesConnectionConfiguration();
k8sConfig.setNamespace(kubernetesNamespace);
k8sConfig.setServiceName(kubernetesServiceName);
return new TcpDiscoveryKubernetesIpFinder(k8sConfig);
}
}
And I have a service in the same project where I have deployed it locally by below code snippet
<code> Ignite ignite = Ignition.ignite();
ignite.getOrCreateCache(cacheCfg);
ignite.compute().localDeployTask(PersonCacheTask.class, PersonCacheTask.class.getClassLoader());
<code> Ignite ignite = Ignition.ignite();
ignite.getOrCreateCache(cacheCfg);
ignite.compute().localDeployTask(PersonCacheTask.class, PersonCacheTask.class.getClassLoader());
</code>
Ignite ignite = Ignition.ignite();
ignite.getOrCreateCache(cacheCfg);
ignite.compute().localDeployTask(PersonCacheTask.class, PersonCacheTask.class.getClassLoader());
and the Task is defined as below kept at server side
<code>@ComputeTaskName("PersonCacheTask")
public class PersonCacheTask extends ComputeTaskAdapter<List<Integer>, Integer> {
// Map step: Splitting the task into jobs
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, List<Integer> keys) throws IgniteException {
IgniteCache<Integer, BinaryObject> cache = ignite.cache("personCache").withKeepBinary();
Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
for (Integer key : keys) {
jobs.put(new ComputeJobAdapter() {
public Object execute() {
// Perform any necessary processing
return 1; // Example result
}, subgrid.get(0)); // Assign the job to a cluster node
// Reduce step: Aggregating the results
public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
// Combine results from all jobs
return 1; // Example: Sum the results
<code>@ComputeTaskName("PersonCacheTask")
public class PersonCacheTask extends ComputeTaskAdapter<List<Integer>, Integer> {
@IgniteInstanceResource
private Ignite ignite;
// Map step: Splitting the task into jobs
@Override
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, List<Integer> keys) throws IgniteException {
IgniteCache<Integer, BinaryObject> cache = ignite.cache("personCache").withKeepBinary();
Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
for (Integer key : keys) {
jobs.put(new ComputeJobAdapter() {
@Override
public Object execute() {
// Perform any necessary processing
return 1; // Example result
}
}, subgrid.get(0)); // Assign the job to a cluster node
}
return jobs;
}
// Reduce step: Aggregating the results
@Override
public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
// Combine results from all jobs
return 1; // Example: Sum the results
}
}
</code>
@ComputeTaskName("PersonCacheTask")
public class PersonCacheTask extends ComputeTaskAdapter<List<Integer>, Integer> {
@IgniteInstanceResource
private Ignite ignite;
// Map step: Splitting the task into jobs
@Override
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, List<Integer> keys) throws IgniteException {
IgniteCache<Integer, BinaryObject> cache = ignite.cache("personCache").withKeepBinary();
Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
for (Integer key : keys) {
jobs.put(new ComputeJobAdapter() {
@Override
public Object execute() {
// Perform any necessary processing
return 1; // Example result
}
}, subgrid.get(0)); // Assign the job to a cluster node
}
return jobs;
}
// Reduce step: Aggregating the results
@Override
public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
// Combine results from all jobs
return 1; // Example: Sum the results
}
}
Now at the client side
This is how I get the configuration
<code>public static IgniteClient createThinClient() {
ClientConfiguration cfg = new ClientConfiguration()
.setAddresses(Constants.CLUSTER_IP_ADDRESS + ":" + Constants.CLUSTER_THIN_CLIENT_PORT)
.setClusterDiscoveryEnabled(true)
.setPartitionAwarenessEnabled(true)
.setSslMode(SslMode.DISABLED)
return Ignition.startClient(cfg);
public static Ignite createThickClient() {
IgniteConfiguration cfg = new IgniteConfiguration()
.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(Collections.singleton(Constants.CLUSTER_IP_ADDRESS + ":" + Constants.CLUSTER_DISCOVERY_PORT))))
.setCommunicationSpi(new TcpCommunicationSpi().setForceClientToServerConnections(false))
.setPeerClassLoadingEnabled(true)
.setDeploymentMode(DeploymentMode.CONTINUOUS)
Ignition.setClientMode(true);
return Ignition.start(cfg);
<code>public static IgniteClient createThinClient() {
ClientConfiguration cfg = new ClientConfiguration()
.setAddresses(Constants.CLUSTER_IP_ADDRESS + ":" + Constants.CLUSTER_THIN_CLIENT_PORT)
.setClusterDiscoveryEnabled(true)
.setPartitionAwarenessEnabled(true)
.setSslMode(SslMode.DISABLED)
.setTimeout(10000);
return Ignition.startClient(cfg);
}
public static Ignite createThickClient() {
IgniteConfiguration cfg = new IgniteConfiguration()
.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(Collections.singleton(Constants.CLUSTER_IP_ADDRESS + ":" + Constants.CLUSTER_DISCOVERY_PORT))))
.setCommunicationSpi(new TcpCommunicationSpi().setForceClientToServerConnections(false))
.setPeerClassLoadingEnabled(true)
.setDeploymentMode(DeploymentMode.CONTINUOUS)
.setClientMode(true);
Ignition.setClientMode(true);
return Ignition.start(cfg);
}
</code>
public static IgniteClient createThinClient() {
ClientConfiguration cfg = new ClientConfiguration()
.setAddresses(Constants.CLUSTER_IP_ADDRESS + ":" + Constants.CLUSTER_THIN_CLIENT_PORT)
.setClusterDiscoveryEnabled(true)
.setPartitionAwarenessEnabled(true)
.setSslMode(SslMode.DISABLED)
.setTimeout(10000);
return Ignition.startClient(cfg);
}
public static Ignite createThickClient() {
IgniteConfiguration cfg = new IgniteConfiguration()
.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(Collections.singleton(Constants.CLUSTER_IP_ADDRESS + ":" + Constants.CLUSTER_DISCOVERY_PORT))))
.setCommunicationSpi(new TcpCommunicationSpi().setForceClientToServerConnections(false))
.setPeerClassLoadingEnabled(true)
.setDeploymentMode(DeploymentMode.CONTINUOUS)
.setClientMode(true);
Ignition.setClientMode(true);
return Ignition.start(cfg);
}
And below is how I execute the code from the client side
Via thick client
<code>IgniteCompute compute = client.compute(client.cluster().forRemotes());
List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5);
Integer result = client.compute().execute(PersonCacheTask.class, keys);
System.out.println("Task result: " + result);
<code>IgniteCompute compute = client.compute(client.cluster().forRemotes());
List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5);
Integer result = client.compute().execute(PersonCacheTask.class, keys);
System.out.println("Task result: " + result);
</code>
IgniteCompute compute = client.compute(client.cluster().forRemotes());
List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5);
Integer result = client.compute().execute(PersonCacheTask.class, keys);
System.out.println("Task result: " + result);
Via thin client
<code>List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5);
Integer result = client.compute().execute(PersonCacheTask.class.getName(), keys);
System.out.println("Task result: " + result);
<code>List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5);
Integer result = client.compute().execute(PersonCacheTask.class.getName(), keys);
System.out.println("Task result: " + result);
</code>
List<Integer> keys = Arrays.asList(1, 2, 3, 4, 5);
Integer result = client.compute().execute(PersonCacheTask.class.getName(), keys);
System.out.println("Task result: " + result);
I get this log on the server side
2024-12-26 12:45:03.773 INFO 9028 — [ main] o.a.i.i.m.d.GridDeploymentLocalStore : Task locally deployed: class packageName.PersonCacheTask
But when I try to execute via thin client I get below logs
Caused by: org.apache.ignite.internal.client.thin.ClientServerError: Ignite failed to process request [4]: Unknown task name or failed to auto-deploy task (was task (re|un)deployed?): packageName.PersonCacheTask (server status code [1])
Also below logs
Ignite failed to process request [4]: Failed to map task jobs to nodes due to undeclared user exception [cause=null, ses=GridTaskSessionImpl [taskName=PersonCacheTask, dep=GridDeployment [ts=1735197303767, depMode=CONTINUOUS, clsLdr=jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8, clsLdrId=72881d10491-ca4b8bdc-e6ff-47ea-949b-cf8b2011936e, userVer=0, loc=true, sampleClsName=packageName.PersonCacheTask, pendingUndeploy=false, undeployed=false, usage=1], taskClsName=package.PersonCacheTask, sesId=59e81d10491-ca4b8bdc-e6ff-47ea-949b-cf8b2011936e, startTime=1735198790734, endTime=9223372036854775807, taskNodeId=ca4b8bdc-e6ff-47ea-949b-cf8b2011936e, clsLdr=jdk.internal.loader.ClassLoaders$AppClassLoader@368239c8, closed=false, cpSpi=null, failSpi=null, loadSpi=null, usage=1, fullSup=false, internal=false, topPred=org.apache.ignite.internal.processors.platform.client.compute.ClientComputeTask$$Lambda$1452/0x00000008009dfc40@3a8d6b59, mapFut=IgniteFuture [orig=GridFutureAdapter [ignoreInterrupts=false, state=INIT, res=null, hash=1457235860]], execName=null, secCtx=null]] (server status code [1])