How to enqueue a JMS message into Oracle AQ using Java

18.5k views Asked by At

I have an Oracle AQ with the queue type of SYS.AQ$_JMS_TEXT_MESSAGE. What I'm trying to do is to insert a text into the mentioned queue from a java application.

The equivalent SQL query is

declare
 r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
 r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
 v_message_handle     RAW(16);
 o_payload            SYS.AQ$_JMS_TEXT_MESSAGE;
begin
 o_payload := sys.aq$_jms_text_message.construct;
 o_payload.set_text(xmltype('<user>text</user>').getClobVal());
 sys.dbms_aq.enqueue (
   queue_name         => 'QUEUE_NAME',
   enqueue_options    => r_enqueue_options,
   message_properties => r_message_properties,
   payload            => o_payload,
   msgid              => v_message_handle
 );
 commit;
end;
/

I got most of it right using this guide, but I'm stuck at

 o_payload := sys.aq$_jms_text_message.construct;
 o_payload.set_text(xmltype('<user>text</user>').getClobVal());

The guide shows how to enqueue a RAW message, but I need it to be JMS, otherwise the data type doesn't match the queue type.

Any help would be appreciated, because even with the almighty google I am not able to find a solution to this problem. Is there a way to do it using the oracle.jdbc.aq classes, or do I just have to suck it up and use the SQL query?

2

There are 2 answers

0
Chathura Kulasinghe On BEST ANSWER

Just copy paste this code and try. (if it works for you) Then carefully go through the code, and understand.

While executing,

  • First uncomment the 'createQueue()' line in the main method.

after that,

  • Comment it and uncomment 'sendMessage()' line and try sending your message.

Then comment/uncomment each line respectively and give a try.

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;

import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class OracleAQClient {

public static QueueConnection getConnection() {

    String hostname = "localhost";
    String oracle_sid = "xe";
    int portno = 1521;
    String userName = "jmsuser";
    String password = "jmsuser";
    String driver = "thin";
    QueueConnectionFactory QFac = null;
    QueueConnection QCon = null;
    try {
        // get connection factory , not going through JNDI here
        QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, portno, driver);
        // create connection
        QCon = QFac.createQueueConnection(userName, password);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return QCon;
}

public static void createQueue(String user, String qTable, String queueName) {
    try {
        /* Create Queue Tables */
        System.out.println("Creating Queue Table...");
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

        AQQueueTableProperty qt_prop;
        AQQueueTable q_table = null;
        AQjmsDestinationProperty dest_prop;
        Queue queue = null;
        qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");

        q_table = ((AQjmsSession) session).createQueueTable(user, qTable, qt_prop);

        System.out.println("Qtable created");
        dest_prop = new AQjmsDestinationProperty();
        /* create a queue */
        queue = ((AQjmsSession) session).createQueue(q_table, queueName, dest_prop);
        System.out.println("Queue created");
        /* start the queue */
        ((AQjmsDestination) queue).start(session, true, true);

    } catch (Exception e) {
        e.printStackTrace();
        return;
    }
}

public static void sendMessage(String user, String queueName,String message) {

    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        QCon.start();
        Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
        MessageProducer producer = session.createProducer(queue);
        TextMessage tMsg = session.createTextMessage(message);

        //set properties to msg since axis2 needs this parameters to find the operation
        tMsg.setStringProperty("SOAPAction", "getQuote");
        producer.send(tMsg);
        System.out.println("Sent message = " + tMsg.getText());

        session.close();
        producer.close();
        QCon.close();

    } catch (JMSException e) {
        e.printStackTrace();
        return;
    }
}

public static void browseMessage(String user, String queueName) {
    Queue queue;
    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

        QCon.start();
        queue = ((AQjmsSession) session).getQueue(user, queueName);
        QueueBrowser browser = session.createBrowser(queue);
        Enumeration enu = browser.getEnumeration();
        List list = new ArrayList();
        while (enu.hasMoreElements()) {
            TextMessage message = (TextMessage) enu.nextElement();
            list.add(message.getText());
        }
        for (int i = 0; i < list.size(); i++) {
            System.out.println("Browsed msg " + list.get(i));
        }
        browser.close();
        session.close();
        QCon.close();

    } catch (JMSException e) {
        e.printStackTrace();
    }

}

public static void consumeMessage(String user, String queueName) {
    Queue queue;
    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        QCon.start();
        queue = ((AQjmsSession) session).getQueue(user, queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage msg = (TextMessage) consumer.receive();
        System.out.println("MESSAGE RECEIVED " + msg.getText());

        consumer.close();
        session.close();
        QCon.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

public static void main(String args[]) {
    String userName = "jmsuser";
    String queue = "sample_aq";
    String qTable = "sample_aqtbl";
    //createQueue(userName, qTable, queue);
    //sendMessage(userName, queue,"<user>text</user>");
    //browseMessage(userName, queue);
    //consumeMessage(userName, queue);
}

}

You will need to copy these jars/libs to your java project from your oracle DB setup directory

  1. ojdbc6.jar
  2. jta.jar
  3. jmscommon.jar
  4. aqapi.jar

The credits should go to Ratha for this article[1]. There were few stuff to be amended, I just modified those and provided the code.

[1] http://wso2.com/library/tutorials/2011/11/configuring-wso2-esb-with-oracle-as-messaging-media/

Thanks

0
Steve S. On

I will add some tidbits to the answer of @Chathura Kulasinghe.

First, in the consumeMessage method, using the

Session.CLIENT_ACKNOWLEDGE

parameter for creating the session object will have the effect of leaving the message you consume in the queue. If you run this program many times, you will see the number of message going up in the database table of the queue. To remove a message, you need to « acknowledge » it by calling this method on the message object:

msg.acknowledge();

Second, if you want the session do this for you, simply change the client acknowledge mode to :

Session.AUTO_ACKNOWLEDGE

With this parameter, everytime your consumer.receive() is call, it's auto acknowledge and so, removed from the queue.