Oracle AQ competing consumer

568 views Asked by At

So I am fairly new to Oracle, certainly no Oracle master

However I have used queues a lot before. I have been told that Oracle AQ supports this feature which seems to fly in the face of everything I know about how queues work, and distribuuted systems work.

But I promised I would look into this, so here it is, I am told that Oracle AQ supports this

  • You can have a number on processes (same code for the process c# say) that all use the same consumer name "CONSUMER_1" say
  • There is a AQ declared in Oracle
  • The apps are spun up and all run
  • They should all read from the Oracle AQ using the same consumer "CONSUMER_1"

All good so far

Scenario 1 But for me, my knowledge of queues and distribuuted computing would say this is how it works, assuming messages M1/M2 exist in Oracle AQ

  • app instance #1 dequeues a message M1 transactionally. No one else can pick up this M1 message until its either committed or aborted
  • app instance #2 dequeues a message m2 transactionally. No one else can pick up this M1 message until its either committed or aborted. But this instance is NOT blocked waiting for app instance #1

This is how I expect it to work, and this is what I seem from Oracle AQ

Scenario 2 But I am told there is a way that to set Oracle up like this

  • app instance #1 dequeues a message M1 transactionally. No one else can pick up this M1 message until its either committed or aborted
  • app instance #2 attempts to dequeue a message m2 transactionally. But this instance IS BLOCKED waiting for app instance #1

This seems crazy to me, as what if you had 1000nds of competing consumers, that means if any one of them crashed or had bad network, or code, the whole AQ is buggered. This cant be right can it.

Is **Scenario 2 even possible to setup in Oracle?

Its certainly not something I can get to work. Just wondering if anyone has any experience of this. For completeness this is my Oracle code

/* declare the custom type */
DROP TYPE ORDERS.AQ_TEST_MSG;

CREATE OR REPLACE TYPE ORDERS."AQ_TEST_MSG" AS OBJECT
(
  ORDER_ID INTEGER,
  ORDER_VERSION INTEGER,
  TABLE_NAME VARCHAR2(256)
)
/




/* create the queue table */
BEGIN
  SYS.DBMS_AQADM.DROP_QUEUE_TABLE
    (QUEUE_TABLE          =>        'ORDERS.AQ_TEST_Q5_TAB');
END;
/


BEGIN
  SYS.DBMS_AQADM.CREATE_QUEUE_TABLE
  (
    QUEUE_TABLE           =>  'ORDERS.AQ_TEST_Q5_TAB'
   ,QUEUE_PAYLOAD_TYPE    =>  'ORDERS.AQ_TEST_MSG'
   ,COMPATIBLE            =>  '8.1'
   ,STORAGE_CLAUSE        =>  'NOCOMPRESS
                               TABLESPACE DSSDATA
                               RESULT_CACHE (MODE DEFAULT)
                               PCTUSED    0
                               PCTFREE    10
                               INITRANS   1
                               MAXTRANS   255
                               STORAGE    (
                                           INITIAL          64K
                                           NEXT             1M
                                           MAXSIZE          UNLIMITED
                                           MINEXTENTS       1
                                           MAXEXTENTS       UNLIMITED
                                           PCTINCREASE      0
                                           BUFFER_POOL      DEFAULT
                                           FLASH_CACHE      DEFAULT
                                           CELL_FLASH_CACHE DEFAULT
                                          )'
   ,SORT_LIST             =>  'ENQ_TIME'
   ,MULTIPLE_CONSUMERS    =>  TRUE
   ,MESSAGE_GROUPING      =>  0
   ,SECURE                =>  FALSE
  );
End;
/


/* create the queue */
BEGIN
  SYS.DBMS_AQADM.CREATE_QUEUE
  (
    QUEUE_NAME          =>   'ORDERS.AQ_TEST_Q5'
   ,QUEUE_TABLE         =>   'ORDERS.AQ_TEST_Q5_TAB'
   ,QUEUE_TYPE          =>   SYS.DBMS_AQADM.NORMAL_QUEUE
   ,MAX_RETRIES         =>   5
   ,RETRY_DELAY         =>   0
   ,RETENTION_TIME      =>   0
   );
END;
/

/* create queue consumer : https://docs.oracle.com/database/121/ARPLS/d_aqadm.htm#ARPLS109 */
DECLARE
  aSubscriber sys.aq$_agent;
BEGIN 
  aSubscriber := sys.aq$_agent('CONSUMER_Q3_1','', 0);
  dbms_aqadm.add_subscriber
     ( queue_name     => 'ORDERS.AQ_TEST_Q5'
      ,subscriber     => aSubscriber);
END;
/

/* start queue */
BEGIN
  SYS.DBMS_AQADM.START_QUEUE
  (
    QUEUE_NAME => 'ORDERS.AQ_TEST_Q5'
   ,ENQUEUE => TRUE 
   ,DEQUEUE => TRUE 
   );
END;
/


/* queue operations */
BEGIN
  SYS.DBMS_AQADM.STOP_QUEUE ( QUEUE_NAME => 'ORDERS.AQ_TEST_Q5');
  SYS.DBMS_AQADM.DROP_QUEUE ( QUEUE_NAME => 'ORDERS.AQ_TEST_Q5');
END;
/

/* enqueue an item */
DECLARE
   enqueue_options     dbms_aq.enqueue_options_t;
   message_properties  dbms_aq.message_properties_t;
   message_handle      RAW(16);
   message             AQ_TEST_MSG;

BEGIN
   message := AQ_TEST_MSG(ORDER_ID => 1, ORDER_VERSION => 1, TABLE_NAME => 'ORDERS.AQ_TEST_Q5_TAB'); 

   dbms_aq.enqueue(queue_name => 'ORDERS.AQ_TEST_Q5',           
         enqueue_options      => enqueue_options,       
         message_properties   => message_properties,     
         payload              => message,               
         msgid                => message_handle);

   COMMIT;   
END; 
/


/* Dequeue from msg_queue: */
DECLARE
   dequeue_options     dbms_aq.dequeue_options_t;
   message_properties  dbms_aq.message_properties_t;
   message_handle      RAW(16);
   message             AQ_TEST_MSG;

BEGIN

   dequeue_options.consumer_name := 'CONSUMER_Q3_1';   
   dequeue_options.visibility := DBMS_AQ.ON_COMMIT;   
   dequeue_options.dequeue_mode := DBMS_AQ.LOCKED;

   DBMS_AQ.DEQUEUE(queue_name => 'ORDERS.AQ_TEST_Q5',
           dequeue_options    => dequeue_options,
           message_properties => message_properties,
           payload            => message,
           msgid              => message_handle);

   DBMS_OUTPUT.PUT_LINE ('Message: ' || message.ORDER_ID ||
                                      ' ... ' || message.ORDER_VERSION );
   COMMIT;
END;

And this is some test C# code (lots of other classes that I cant post, too may, but this is basic idea)

internal partial class Program
{
    public static ILogger Logger = LogManager.GetLogger("Global");


    static void Main(string[] args)
    {
        var p = new Program();
        AsyncContext.Run(() => p.MainAsync(args));
    }

    async Task MainAsync(string[] args)
    {
        var cts = new CancellationTokenSource();
        var token = cts.Token;
        var p = new Program();
        var t1 = Task.Run(() => p.DequeueLoopAsync(token), token);
        await Task.WhenAll(t1);
        Console.ReadLine();
        cts.Cancel();
    }
 

    private async Task DequeueLoopAsync(CancellationToken ct)
    {
        var queueName = "ORDERS.AQ_TEST_Q5";
        var consumer = "CONSUMER_Q5_1";
        var queueDataTypeName = "ORDERS.AQ_TEST_MSG";
        var queueWrapperFactory = new OracleAqQueueWrapperFactory();
        var connectionString = ConfigurationManager.AppSettings[0];


        while (!ct.IsCancellationRequested)
        {
            OracleAQDequeueOptions dequeueOptions = new OracleAQDequeueOptions();
            dequeueOptions.ConsumerName = consumer;
            dequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
            dequeueOptions.DequeueMode = OracleAQDequeueMode.Locked;


            using (IOracleAqQueueWrapper queue = queueWrapperFactory.GetAq(queueName, consumer, queueDataTypeName, connectionString, dequeueOptions))
            {
                IOracleTransactionWrapper transaction = null;
                try
                {
                    transaction = queue.BeginTransaction();
                    OracleAqMessageWrapper message = queue.Dequeue();

                    AQ_TEST_MSG update = message.Payload as AQ_TEST_MSG;
                    Logger.Info($"{update.OrderId} + {update.OrderVersion}");

                    transaction.Commit();

                    Thread.Sleep(3000);
                }
                catch (ApplicationException)
                {
                    transaction?.Rollback();
                }
                catch (Exception e)
                {
                    Logger.Error(e.Message);
                    transaction?.Rollback();

                    throw;
                }
                finally
                {
                    transaction?.Dispose();
                }
            }
        }

    }
}

I would love it if anyone has any leads, even if its to say, no dont be silly Oracle can't do that. As I say I would be suprised if they let such a brittle design slip in there, where 1 consumer could destroy entitre queue for ALL other consumers. That doesnt seem to add up for me

0

There are 0 answers