I am fairly new to Oracle, and certainly no Oracle master.
However, I have used queues a lot before. I have been told that Oracle AQ supports a feature that seems to fly in the face of everything I know about how queues and distributed systems work.
But I promised I would look into it, so here it is. I am told that Oracle AQ supports this:
- You can have a number of processes (the same code for each process, C# say) that all use the same consumer name, "CONSUMER_1" say
- There is an AQ declared in Oracle
- The apps are spun up and all run
- They should all read from the Oracle AQ using the same consumer "CONSUMER_1"
All good so far
Scenario 1: From what I know of queues and distributed computing, this is how it should work, assuming messages M1 and M2 exist in the Oracle AQ:
- app instance #1 dequeues message M1 transactionally. No one else can pick up M1 until the dequeue is either committed or aborted
- app instance #2 dequeues message M2 transactionally. No one else can pick up M2 until the dequeue is either committed or aborted. But this instance is NOT blocked waiting for app instance #1
This is how I expect it to work, and this is what I see from Oracle AQ.
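To make Scenario 1 concrete, this is roughly how it could be checked by hand from two SQL*Plus sessions. It is only a sketch against the queue and subscriber defined further down. Using `DBMS_AQ.REMOVE` and `DBMS_AQ.NO_WAIT` is an assumption made for this test (my real code below uses `LOCKED` mode), and it assumes at least two messages have already been enqueued and committed.

```sql
-- Run this same block in session A and then in session B.
-- Do NOT commit in session A before running it in session B.
SET SERVEROUTPUT ON
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            ORDERS.AQ_TEST_MSG;
BEGIN
  dequeue_options.consumer_name := 'CONSUMER_Q3_1';
  dequeue_options.visibility    := DBMS_AQ.ON_COMMIT;
  dequeue_options.dequeue_mode  := DBMS_AQ.REMOVE;   -- assumption: remove rather than just lock
  dequeue_options.wait          := DBMS_AQ.NO_WAIT;  -- assumption: return at once instead of waiting
  DBMS_AQ.DEQUEUE(queue_name         => 'ORDERS.AQ_TEST_Q5',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  DBMS_OUTPUT.PUT_LINE('Got order ' || message.ORDER_ID || ' v' || message.ORDER_VERSION);
  -- Deliberately no COMMIT here: the transaction stays open, so the message
  -- remains held by this session until COMMIT or ROLLBACK is issued by hand.
END;
/
```

Under Scenario 1 I would expect session B to come back straight away with the next message. With `NO_WAIT`, a session that finds nothing it can take gets ORA-25228 rather than hanging, so if session B really had to wait for session A, I would expect it to show up here as a hang or as ORA-25228 even though a second message is ready.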
Scenario 2: But I am told there is a way to set Oracle up like this:
- app instance #1 dequeues message M1 transactionally. No one else can pick up M1 until the dequeue is either committed or aborted
- app instance #2 attempts to dequeue message M2 transactionally. But this instance IS BLOCKED waiting for app instance #1
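One way to tell whether instance #2 is genuinely blocked by instance #1, rather than just waiting for a message to arrive, might be to look at the session waits while both apps are running. A sketch, assuming the account has SELECT access on `V$SESSION` and that the apps connect as `ORDERS` (both are assumptions on my part):

```sql
-- Sessions that are currently waiting on a lock held by another session.
-- BLOCKING_SESSION is NULL when a session is not blocked by anyone.
SELECT sid,
       serial#,
       blocking_session,
       event,
       seconds_in_wait
FROM   v$session
WHERE  username = 'ORDERS'            -- assumption: the apps connect as ORDERS
AND    blocking_session IS NOT NULL;
```

If this returns no rows while instance #1 is holding M1 uncommitted, that would suggest instance #2 is not being blocked by it, which is what Scenario 1 predicts.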
This seems crazy to me: what if you had thousands of competing consumers? If any one of them crashed, or had a bad network connection or bad code, the whole AQ would be buggered. This can't be right, can it?
Is **Scenario 2** even possible to set up in Oracle?
It's certainly not something I can get to work. Just wondering if anyone has any experience of this. For completeness, this is my Oracle code:
/* declare the custom type */
DROP TYPE ORDERS.AQ_TEST_MSG;
CREATE OR REPLACE TYPE ORDERS."AQ_TEST_MSG" AS OBJECT
(
ORDER_ID INTEGER,
ORDER_VERSION INTEGER,
TABLE_NAME VARCHAR2(256)
)
/
/* create the queue table */
BEGIN
SYS.DBMS_AQADM.DROP_QUEUE_TABLE
(QUEUE_TABLE => 'ORDERS.AQ_TEST_Q5_TAB');
END;
/
BEGIN
SYS.DBMS_AQADM.CREATE_QUEUE_TABLE
(
QUEUE_TABLE => 'ORDERS.AQ_TEST_Q5_TAB'
,QUEUE_PAYLOAD_TYPE => 'ORDERS.AQ_TEST_MSG'
,COMPATIBLE => '8.1'
,STORAGE_CLAUSE => 'NOCOMPRESS
TABLESPACE DSSDATA
RESULT_CACHE (MODE DEFAULT)
PCTUSED 0
PCTFREE 10
INITRANS 1
MAXTRANS 255
STORAGE (
INITIAL 64K
NEXT 1M
MAXSIZE UNLIMITED
MINEXTENTS 1
MAXEXTENTS UNLIMITED
PCTINCREASE 0
BUFFER_POOL DEFAULT
FLASH_CACHE DEFAULT
CELL_FLASH_CACHE DEFAULT
)'
,SORT_LIST => 'ENQ_TIME'
,MULTIPLE_CONSUMERS => TRUE
,MESSAGE_GROUPING => 0
,SECURE => FALSE
);
END;
/
/* create the queue */
BEGIN
SYS.DBMS_AQADM.CREATE_QUEUE
(
QUEUE_NAME => 'ORDERS.AQ_TEST_Q5'
,QUEUE_TABLE => 'ORDERS.AQ_TEST_Q5_TAB'
,QUEUE_TYPE => SYS.DBMS_AQADM.NORMAL_QUEUE
,MAX_RETRIES => 5
,RETRY_DELAY => 0
,RETENTION_TIME => 0
);
END;
/
/* create queue consumer : https://docs.oracle.com/database/121/ARPLS/d_aqadm.htm#ARPLS109 */
DECLARE
aSubscriber sys.aq$_agent;
BEGIN
aSubscriber := sys.aq$_agent('CONSUMER_Q3_1','', 0);
dbms_aqadm.add_subscriber
( queue_name => 'ORDERS.AQ_TEST_Q5'
,subscriber => aSubscriber);
END;
/
/* start queue */
BEGIN
SYS.DBMS_AQADM.START_QUEUE
(
QUEUE_NAME => 'ORDERS.AQ_TEST_Q5'
,ENQUEUE => TRUE
,DEQUEUE => TRUE
);
END;
/
/* teardown: stop and drop the queue (run only when finished testing, not before the enqueue/dequeue blocks below) */
BEGIN
SYS.DBMS_AQADM.STOP_QUEUE ( QUEUE_NAME => 'ORDERS.AQ_TEST_Q5');
SYS.DBMS_AQADM.DROP_QUEUE ( QUEUE_NAME => 'ORDERS.AQ_TEST_Q5');
END;
/
/* enqueue an item */
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message AQ_TEST_MSG;
BEGIN
message := AQ_TEST_MSG(ORDER_ID => 1, ORDER_VERSION => 1, TABLE_NAME => 'ORDERS.AQ_TEST_Q5_TAB');
dbms_aq.enqueue(queue_name => 'ORDERS.AQ_TEST_Q5',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
COMMIT;
END;
/
/* Dequeue from msg_queue: */
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message AQ_TEST_MSG;
BEGIN
dequeue_options.consumer_name := 'CONSUMER_Q3_1';
dequeue_options.visibility := DBMS_AQ.ON_COMMIT;
dequeue_options.dequeue_mode := DBMS_AQ.LOCKED; -- LOCKED reads the message and holds a lock on it for the transaction, without removing it
DBMS_AQ.DEQUEUE(queue_name => 'ORDERS.AQ_TEST_Q5',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
DBMS_OUTPUT.PUT_LINE ('Message: ' || message.ORDER_ID ||
' ... ' || message.ORDER_VERSION );
COMMIT;
END;
/
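While testing, it may also help to look at what is actually sitting in the queue table between dequeues. Oracle creates an `AQ$<queue table name>` view for each queue table; the query below is a sketch against that view for the queue table above, and the choice of columns is just what I assume is useful here.

```sql
-- Inspect the messages per consumer for the test queue table.
-- MSG_STATE shows values such as READY or PROCESSED; RETRY_COUNT goes up
-- when a dequeue is rolled back.
SELECT queue,
       msg_id,
       msg_state,
       consumer_name,
       retry_count,
       enq_time,
       deq_time
FROM   ORDERS.AQ$AQ_TEST_Q5_TAB
ORDER  BY enq_time;
```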
And this is some test C# code (there are lots of other classes that I can't post, too many, but this is the basic idea):
internal partial class Program
{
public static ILogger Logger = LogManager.GetLogger("Global");
static void Main(string[] args)
{
var p = new Program();
AsyncContext.Run(() => p.MainAsync(args));
}
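async Task MainAsync(string[] args)
{
var cts = new CancellationTokenSource();
var token = cts.Token;
// Start the dequeue loop on this instance; a second Program object is not needed.
var t1 = Task.Run(() => DequeueLoopAsync(token), token);
// Wait for Enter, then cancel. Awaiting t1 first would never return,
// because the loop only exits once cancellation has been requested.
Console.ReadLine();
cts.Cancel();
await Task.WhenAll(t1);
}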
private async Task DequeueLoopAsync(CancellationToken ct)
{
var queueName = "ORDERS.AQ_TEST_Q5";
var consumer = "CONSUMER_Q5_1";
var queueDataTypeName = "ORDERS.AQ_TEST_MSG";
var queueWrapperFactory = new OracleAqQueueWrapperFactory();
var connectionString = ConfigurationManager.AppSettings[0];
while (!ct.IsCancellationRequested)
{
OracleAQDequeueOptions dequeueOptions = new OracleAQDequeueOptions();
dequeueOptions.ConsumerName = consumer;
dequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
dequeueOptions.DequeueMode = OracleAQDequeueMode.Locked;
using (IOracleAqQueueWrapper queue = queueWrapperFactory.GetAq(queueName, consumer, queueDataTypeName, connectionString, dequeueOptions))
{
IOracleTransactionWrapper transaction = null;
try
{
transaction = queue.BeginTransaction();
OracleAqMessageWrapper message = queue.Dequeue();
AQ_TEST_MSG update = message.Payload as AQ_TEST_MSG;
Logger.Info($"{update.OrderId} + {update.OrderVersion}");
transaction.Commit();
await Task.Delay(3000); // pause between polls without blocking a thread-pool thread
}
catch (ApplicationException)
{
transaction?.Rollback();
}
catch (Exception e)
{
Logger.Error(e.Message);
transaction?.Rollback();
throw;
}
finally
{
transaction?.Dispose();
}
}
}
}
}
I would love it if anyone has any leads, even if it's to say "no, don't be silly, Oracle can't do that". As I say, I would be surprised if they let such a brittle design slip in there, where one consumer could destroy the entire queue for ALL other consumers. That doesn't seem to add up for me.