I am trying to read messages from an Oracle queue that has a custom ADT payload (not plain text messages) using a simple Spring Cloud Stream project. I am having difficulty doing this with Spring, but I was able to do it in plain Java, so the problem should not be on the Oracle side.
The error that I am getting is:
oracle.jakarta.jms.AQjmsException: JMS-137: Payload factory must be specified for destinations with ADT payloads
at oracle.jakarta.jms.AQjmsError.throwEx(AQjmsError.java:317) ~[aqapi-jakarta-23.3.1.0.jar:na]
at oracle.jakarta.jms.AQjmsConsumer.<init>(AQjmsConsumer.java:482) ~[aqapi-jakarta-23.3.1.0.jar:na]
at oracle.jakarta.jms.AQjmsConsumer.<init>(AQjmsConsumer.java:338) ~[aqapi-jakarta-23.3.1.0.jar:na]
at oracle.jakarta.jms.AQjmsSession.createConsumer(AQjmsSession.java:8974) ~[aqapi-jakarta-23.3.1.0.jar:na]
at oracle.jakarta.jms.AQjmsSession.createConsumer(AQjmsSession.java:8863) ~[aqapi-jakarta-23.3.1.0.jar:na]
at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:930) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:225) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1290) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1256) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1247) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1140) ~[spring-jms-6.1.13.jar:6.1.13]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
(I set logging.level.org.springframework.jms=TRACE in application.properties to get this trace.)
The queue name is container_queue, the database user is jmsuser, and the database object type name is message_container.
Here is the Spring Cloud Stream project:
build.gradle
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.boot:spring-boot-starter-activemq'
implementation 'jakarta.jms:jakarta.jms-api:3.0.0'
implementation 'com.oracle.database.messaging:aqapi-jakarta:23.3.1.0'
implementation 'com.oracle.database.jdbc:ojdbc11:23.3.0.23.09'
implementation 'com.oracle.database.jdbc:ucp:23.3.0.23.09'
MessageContainer.java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Struct;
import java.sql.Timestamp;

import oracle.jdbc.OracleTypes;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;

public class MessageContainer implements ORAData, ORADataFactory {

    String primaryId;
    String businessUnitCode;
    Timestamp scanTimestamp;

    public static final String SQL_NAME = "jmsuser.MESSAGE_CONTAINER";
    public static final int SQL_TYPECODE = OracleTypes.STRUCT;

    protected static final MessageContainer MESSAGECONTAINERFactory = new MessageContainer();

    public MessageContainer() {
    }

    public MessageContainer(String primaryId, String businessUnitCode, Timestamp scanTimestamp) {
        this.primaryId = primaryId;
        this.businessUnitCode = businessUnitCode;
        this.scanTimestamp = scanTimestamp;
    }

    public static ORADataFactory getORADataFactory() {
        return MESSAGECONTAINERFactory;
    }

    public ORAData create(Datum d, int sqlType) throws SQLException {
        if (d == null) {
            return null;
        }
        if (!(d instanceof Struct)) {
            throw new SQLException("Expected Struct type Datum but found " + d.getClass());
        }
        Struct struct = (Struct) d;
        Object[] attr = struct.getAttributes();
        return new MessageContainer((String) attr[0], (String) attr[1], (Timestamp) attr[2]);
    }

    @Override
    public Datum toDatum(Connection c) throws SQLException {
        Object[] attributes = { primaryId, businessUnitCode, scanTimestamp };
        Struct struct = c.createStruct(SQL_NAME, attributes);
        return (Datum) struct;
    }

    public String getPrimaryId() {
        return primaryId;
    }

    public void setPrimaryId(String primaryId) {
        this.primaryId = primaryId;
    }

    public String getBusinessUnitCode() {
        return businessUnitCode;
    }

    public void setBusinessUnitCode(String businessUnitCode) {
        this.businessUnitCode = businessUnitCode;
    }

    public Timestamp getScanTimestamp() {
        return scanTimestamp;
    }

    public void setScanTimestamp(Timestamp scanTimestamp) {
        this.scanTimestamp = scanTimestamp;
    }
}
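A side note on the create method above: it reads the struct attributes by position, so it only works if the order matches the attribute order of the database type. The following is a minimal, database-free sketch of that positional mapping. FakeStruct, Container, and StructMappingSketch are hypothetical names used only for illustration; they are not part of the project or of the Oracle driver, and the assumed attribute order (primary id, business unit code, scan timestamp) is taken from the constructor call above.

```java
import java.sql.SQLException;
import java.sql.Struct;
import java.sql.Timestamp;
import java.util.Map;

public class StructMappingSketch {

    // Hypothetical in-memory stand-in for the Struct a JDBC driver would hand back.
    public static final class FakeStruct implements Struct {
        private final String typeName;
        private final Object[] attributes;

        public FakeStruct(String typeName, Object... attributes) {
            this.typeName = typeName;
            this.attributes = attributes;
        }

        @Override
        public String getSQLTypeName() {
            return typeName;
        }

        @Override
        public Object[] getAttributes() {
            return attributes.clone();
        }

        @Override
        public Object[] getAttributes(Map<String, Class<?>> map) {
            // The type map is ignored in this stand-in.
            return attributes.clone();
        }
    }

    // Hypothetical plain holder mirroring the three fields of MessageContainer.
    public static final class Container {
        public final String primaryId;
        public final String businessUnitCode;
        public final Timestamp scanTimestamp;

        Container(String primaryId, String businessUnitCode, Timestamp scanTimestamp) {
            this.primaryId = primaryId;
            this.businessUnitCode = businessUnitCode;
            this.scanTimestamp = scanTimestamp;
        }
    }

    // Maps attributes by position; the order must match the SQL type definition.
    public static Container fromStruct(Struct struct) throws SQLException {
        Object[] attr = struct.getAttributes();
        if (attr.length != 3) {
            throw new SQLException("Expected 3 attributes but found " + attr.length);
        }
        return new Container((String) attr[0], (String) attr[1], (Timestamp) attr[2]);
    }

    public static void main(String[] args) throws SQLException {
        Struct struct = new FakeStruct("JMSUSER.MESSAGE_CONTAINER",
                "42", "BU1", Timestamp.valueOf("2024-01-01 10:00:00"));
        Container c = fromStruct(struct);
        System.out.println(c.primaryId + " / " + c.businessUnitCode + " / " + c.scanTimestamp);
    }
}
```

If the database type ever gains or reorders attributes, a length check like the one in fromStruct fails fast instead of producing a ClassCastException further down.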
OracleAQListener.java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import jakarta.jms.Message;
import oracle.jakarta.jms.AQjmsAdtMessage;

@Component
public class OracleAQListener {

    @JmsListener(destination = "container_queue", containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Message message) {
        try {
            if (message instanceof AQjmsAdtMessage adtMessage) {
                MessageContainer msg = (MessageContainer) adtMessage.getAdtPayload();
                System.out.println("Received a message! ID: " + msg.getPrimaryId() + " BU: " + msg.getBusinessUnitCode() + " date: " + msg.getScanTimestamp());
            } else {
                System.out.println("Received unknown message type: " + message.getClass().getName());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
OracleAQConfig.java (this is most likely where I need to make a change, but I don't know how)
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import oracle.jakarta.jms.AQjmsFactory;
import oracle.jdbc.pool.OracleDataSource;

@Configuration
public class OracleAQConfig {

    @Bean
    public ConnectionFactory connectionFactory() throws JMSException, SQLException {
        Properties props = new Properties();
        props.setProperty("oracle.jms.mapMessage", "true");

        OracleDataSource dataSource = new OracleDataSource();
        dataSource.setURL("jdbc:oracle:thin:@//dbaddress.com:1521/MYSID");
        dataSource.setUser("jmsuser");
        dataSource.setPassword("jmspassword");
        dataSource.setConnectionProperties(props);

        Connection conn = null;
        try {
            conn = dataSource.getConnection();
            Map<String, Class<?>> typeMap = conn.getTypeMap();
            typeMap.put(MessageContainer.SQL_NAME, MessageContainer.class);
            conn.setTypeMap(typeMap);
        } catch (Exception e) {
            throw new RuntimeException("Error setting up custom ADT factory", e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        return AQjmsFactory.getConnectionFactory(dataSource);
    }
}
The Oracle documentation says the following about this error:
JMS-137 Payload factory must be specified for destinations with ADT payloads
Cause: CustomDatumFactory was not specified for destinations containing ADT payloads
Action: For destinations containing ADT messages, a CustomDatumFactory for a java class that maps to the SQL ADT type of the destination must be specified
However, I have no idea how to create a CustomDatumFactory, and I have found no relevant examples or documentation on how to do it.
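For reference, the stack trace shows Spring calling AQjmsSession.createConsumer with only the destination, so no payload factory is ever passed. One possible direction, sketched below and untested, is a listener container that passes the factory explicitly. The sketch assumes that the session Spring hands over is an unwrapped AQjmsSession, that AQjmsSession offers a createConsumer overload taking a payload factory (destination, selector, factory, subscriber name, noLocal), and that MessageContainer.getORADataFactory() from the class above is an acceptable factory. AdtListenerConfigSketch and AdtMessageListenerContainer are hypothetical names.

```java
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import oracle.jakarta.jms.AQjmsSession;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

@Configuration
public class AdtListenerConfigSketch {

    // Hypothetical container that hands the ADT payload factory to the AQ session.
    static class AdtMessageListenerContainer extends DefaultMessageListenerContainer {
        @Override
        protected MessageConsumer createConsumer(Session session, Destination destination)
                throws JMSException {
            if (session instanceof AQjmsSession aqSession) {
                // Assumed overload: (destination, selector, payload factory, subscriber name, noLocal).
                return aqSession.createConsumer(destination, getMessageSelector(),
                        MessageContainer.getORADataFactory(), null, isPubSubNoLocal());
            }
            // Fall back to the default behaviour for any other session type.
            return super.createConsumer(session, destination);
        }
    }

    // Same bean name as the containerFactory referenced by @JmsListener in OracleAQListener.
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory() {
            @Override
            protected DefaultMessageListenerContainer createContainerInstance() {
                return new AdtMessageListenerContainer();
            }
        };
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}
```

If the connection factory is wrapped (for example by a caching or pooling factory), the instanceof check would not match and the fallback would hit the same JMS-137 error, so this only applies to the plain AQjmsFactory connection factory configured above.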
Any advice would be very helpful.
Thank you!