How to dequeue messages that were enqueued when app was offline (Oracle Advanced Queue)

1.9k views Asked by At

I have 2 question. The following is the scenario -

There are 2 different processes Process A and Process B. Process A enqueue's the message in the message queue. Process B dequeue's the message from the message queue.

1) Process B shuts down for some time but Process A continues to enqueue message in the queue. When Process B comes back live how to dequeue the messages in the message queue that were posted by Process A when Process B was offline?

2) The queue that I am using is multiple consumer queue as there needs to be more than 1 Process B to dequeue the message. The reason behind the design is if one of the process B dies the other process B's can still continue to process the message. At the same time if 1 instance of Process B has picked up a message it should notify other Process B to not process the message.

I coudn't find any samples. Any help is greatly appreciated.

1

There are 1 answers

0
Marcus O. On

I just completed a project with fairly similar requirements.

Problem 1) I created a Windows service timer which invokes a WCF Restful service to run periodically. The WCF service would then dequeue anything enqueued (up to 500 messages for each invocation). Anything Enqueued should be automatically processed in order so even if this timer stopped once it was restarted it would pick up where it left off.

Problem 2) I was replicating data from Oracle to CouchBase so I had a timestamp for retrieval when the process started and a timestamp for already saved data in CouchBase, if the first was older than the latter then it won't save. (This was to take care of race conditions).

In Oracle I also had a trigger which when something was enqueued it would copy the id and enqueued time to a second table. Periodically this second table is checked and if an item has been dequeued within the queue table but the second table has not been updated to reflect this within a certain time frame by the WCF service it will re-enqueue the data as something failed in the process.

In case it is helpful here is an example of the wcf restful service using odp.net.

OracleAQQueue _queueObj;
OracleConnection _connObj;
_connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString();
_connObj = new OracleConnection(_connString);
_queueObj = new OracleAQQueue("QUEUENAME", _connObj);
_connObj.Open();

  int i = 0;
  bool messageAvailable = true;

  while (messageAvailable && i < 500)
  {
    OracleTransaction _txn = _connObj.BeginTransaction();
    //Makes dequeue part of transaction
    _queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
    _queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME"
    try
    {
         //Wait  number of seconds for dequeue, default is forever
         _queueObj.DequeueOptions.Wait = 2;
         _queueObj.MessageType = OracleAQMessageType.Raw;
         _queueObj.DequeueOptions.ProviderSpecificType = true;
         OracleAQMessage _depMsq = _queueObj.Dequeue();
         var _binary = (OracleBinary)_depMsq.Payload;
         byte[] byteArray = _binary.Value;
         _txn.Commit();
     }
     catch (Exception ex)
     {
         //This catch will always fire when all messages have been dequeued
         messageAvailable = false;
         if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1)
            {
             //Actual error present. 
             log.Info("Problem occurred during dequeue process : " + ex.Message);
            }
     }
  }

    _queueObj.Dispose();
    _connObj.Close();
    _connObj.Dispose();
    _connObj = null;