Advanced Queuing Technology Support

Advanced Queuing (AQ) is a flexible message exchange mechanism built into the Oracle server. With AQ you can send messages from a workstation or server to one or more workstations. dotConnect for Oracle includes a set of classes that simplify working with the technology.

This feature is available in Professional and Developer Editions only.

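This topic provides information and examples to help you get started with AQ. It consists of the following sections:

  • AQ Basics
  • A Simple Example of Point-to-Point Messaging
  • An Extended Example of Point-to-Point Messaging
  • A Simple Example of Point-to-Multipoint Messaging
  • Administration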

AQ Basics

Advanced Queuing provides database-integrated message queuing functionality. Advanced Queue messages can be stored persistently, propagated between queues on different machines and databases, and transmitted using Oracle Net Services, HTTP(S), and SMTP. Since Oracle Advanced Queuing is implemented in database tables, all the benefits of high availability, scalability, and reliability are applicable to queue data.

Standard database features such as recovery, restart, and security are supported in Advanced Queuing, and queue tables can be imported and exported. Advanced Queuing provides the message management functionality and asynchronous communication needed for application integration.

Advanced Queuing is a complex area, so we recommend reading the official Oracle documentation for a better understanding of its aspects and capabilities.

Advanced Queuing sends and receives messages in two ways:

  • Point-to-Point
  • Publish-Subscribe

This article covers both ways.

You can also inspect the Queue sample in the WinForms demo pack.

A Simple Example of Point-to-Point Messaging

The following example demonstrates how to use Advanced Queuing in its simplest form. It represents a point-to-point messaging scheme in which messages are deleted from the server once the recipient consumes them. The example performs the following steps:

  1. Prepares a queue
  2. Sends a message
  3. Receives the message
  4. Destroys the queue
[C#]
// Create a connection. The AQ types are not supported in Direct mode, so we disable it
// in the connection string.
OracleConnection oracleConnection = new OracleConnection(
   "User Id=system;Password=manager;Server=ora;Direct=False;");
oracleConnection.Open();

// Create a user-defined type to use in the queues. All queue messages will be
// structured as this UDT.
OracleCommand oracleCommand = new OracleCommand(
   "CREATE OR REPLACE TYPE message AS OBJECT (nickname VARCHAR2(15), " +
       "mestext VARCHAR2(80));", oracleConnection);
oracleCommand.ExecuteNonQuery();

// Create a new queue table named "QUEUE_TABLE_MESSAGE".
// This table will store all messages of this queue.
OracleQueueTable oracleQueueTable = new OracleQueueTable(
   "QUEUE_TABLE_MESSAGE", oracleConnection);

// Specify type of the messages in the queue. This is the UDT we created before.
oracleQueueTable.Options.PayloadTypeName = "message";

// Create the table "QUEUE_TABLE_MESSAGE" at the database.
oracleQueueTable.CreateQueueTable();

// Create a new queue administration object and bind it to the
// "QUEUE_TABLE_MESSAGE" table. This object will control the queue lifecycle.
// The queue name is set to "MESSAGE_QUEUE".
OracleQueueAdmin oracleQueueAdmin = new OracleQueueAdmin(
   "MESSAGE_QUEUE", "QUEUE_TABLE_MESSAGE", oracleConnection);

// Create the queue itself in the associated table "QUEUE_TABLE_MESSAGE".
oracleQueueAdmin.CreateQueue();

// Start the queue. From now on it is ready to work.
oracleQueueAdmin.StartQueue();

// Create an object that sends messages to the queue named "MESSAGE_QUEUE".
OracleQueue oracleEnqueueQueue = new OracleQueue("MESSAGE_QUEUE", oracleConnection);

// Create an object that represents the queue message
OracleObject mes = new OracleObject("message", oracleConnection);
mes["nickname"] = oracleConnection.UserId;
mes["mestext"] = "Hello, world!";

// Send the message to the database queue
oracleEnqueueQueue.Enqueue(mes);

// Create an object that receives messages from the queue named "MESSAGE_QUEUE".
OracleQueue oracleDequeueQueue = new OracleQueue("MESSAGE_QUEUE", oracleConnection);

// Get the message from the queue.
// By default the Dequeue() method does not return until it successfully receives a message.
// This behavior is explained in detail after the example.
OracleQueueMessage msg = oracleDequeueQueue.Dequeue();
if (msg != null && msg.ObjectPayload != null) {
  Console.WriteLine(msg.ObjectPayload["nickname"]);
  Console.WriteLine(msg.ObjectPayload["mestext"]);
}

// Stop the queue.
oracleQueueAdmin.StopQueue();

// Delete the queue.
oracleQueueAdmin.DropQueue();

// Drop the associated table.
oracleQueueTable.DropQueueTable();
oracleConnection.Close();
[Visual Basic]
'Create a connection. The AQ types are not supported in Direct mode, so we 
'disable it in the connection string.
Dim oracleConnection As New OracleConnection( _
"User Id=system;Password=manager;Server=ora;Direct=False;")
oracleConnection.Open()

'Create a user-defined type to use in the queues.  
'All queue messages will be structured as this UDT.
Dim oracleCommand As New OracleCommand( _
   "CREATE OR REPLACE TYPE message AS OBJECT (nickname VARCHAR2(15), " & _
   "mestext VARCHAR2(80));", oracleConnection)
oracleCommand.ExecuteNonQuery()

'Create a new queue table with name "QUEUE_TABLE_MESSAGE". 
'This table will store all messages of this queue.
Dim oracleQueueTable As New OracleQueueTable("QUEUE_TABLE_MESSAGE", oracleConnection)

'Specify type of the messages in the queue. This is the UDT we created before.
oracleQueueTable.Options.PayloadTypeName = "message"

'Create the table "QUEUE_TABLE_MESSAGE" at the database.
oracleQueueTable.CreateQueueTable()

'Create a new queue administration object and bind it to the "QUEUE_TABLE_MESSAGE" table.
'This object will control the queue lifecycle. The queue name is set to "MESSAGE_QUEUE".
Dim oracleQueueAdmin As New OracleQueueAdmin("MESSAGE_QUEUE", _
"QUEUE_TABLE_MESSAGE", oracleConnection)

'Create the queue itself in the associated table "QUEUE_TABLE_MESSAGE".
oracleQueueAdmin.CreateQueue()

'Start the queue. From now on it is ready to work.
oracleQueueAdmin.StartQueue()

'Create an object that sends messages to the queue named "MESSAGE_QUEUE".
Dim oracleEnqueueQueue As New OracleQueue("MESSAGE_QUEUE", oracleConnection)

'Create an object that represents the queue message
Dim mes As New OracleObject("message", oracleConnection)
mes.Item("nickname") = oracleConnection.UserId
mes.Item("mestext") = "Hello, world!"

'Send the message to the database queue
oracleEnqueueQueue.Enqueue(mes)

'Create an object that receives messages from the queue named "MESSAGE_QUEUE".
Dim oracleDequeueQueue As New OracleQueue("MESSAGE_QUEUE", oracleConnection)

'Get the message from the queue.
'By default the Dequeue() method does not return until it successfully receives a message.
'This behavior is explained in detail after the example.
Dim msg As OracleQueueMessage = oracleDequeueQueue.Dequeue()
If ((Not msg Is Nothing) AndAlso (Not msg.ObjectPayload Is Nothing)) Then
  Console.WriteLine(msg.ObjectPayload.Item("nickname"))
  Console.WriteLine(msg.ObjectPayload.Item("mestext"))
End If

'Stop the queue.
oracleQueueAdmin.StopQueue()

'Delete the queue.
oracleQueueAdmin.DropQueue()

'Drop the associated table.
oracleQueueTable.DropQueueTable()
oracleConnection.Close()

The behavior of the OracleQueue.Dequeue() method depends on the value of the OracleQueueDequeueOptions.WaitTimeout property. When the value is -1 (the default), Dequeue() waits for a message and does not return until one arrives. When the value is 0, Dequeue() checks for a message and returns immediately; if there is no message, an exception is thrown. When the value is a positive integer, it specifies the number of seconds to wait for a message; if no message arrives within this time, an exception is thrown.
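The non-blocking case (WaitTimeout set to 0) can be wrapped in a small helper that reports "no message" as null instead of an exception. The following is only a sketch under stated assumptions: the TryDequeue helper and the DequeueSketch class are hypothetical names, the Devart.Data.Oracle namespace is assumed, and error code 25228 is the one the extended example in the next section checks for when a dequeue times out.

```csharp
using System;
using Devart.Data.Oracle; // assumed namespace of the dotConnect for Oracle classes

static class DequeueSketch {
  // ORA-25228: timeout or end-of-fetch during message dequeue.
  const int DequeueTimeoutCode = 25228;

  // Hypothetical helper: returns the next message, or null when the queue is empty.
  public static OracleQueueMessage TryDequeue(OracleQueue queue) {
    // 0 = check once and return immediately;
    // -1 would block until a message arrives; N > 0 would wait up to N seconds.
    queue.DequeueOptions.WaitTimeout = 0;
    try {
      return queue.Dequeue();
    }
    catch (OracleException ex) {
      if (ex.Code == DequeueTimeoutCode)
        return null; // no message was available
      throw;         // any other error is passed on to the caller
    }
  }
}
```

A caller could poll with TryDequeue(oracleDequeueQueue) and treat a null result as "nothing to do yet". Note that the helper changes the WaitTimeout setting of the queue object passed to it.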

At the end of the sample we stop and destroy the queue together with its table. In a real application this may not be necessary, because the queue continues to work after the application that created it shuts down. In the sample we remove the queue so that it can be recreated the next time you run a sample from this article.

An Extended Example of Point-to-Point Messaging

The next example slightly alters the code from the previous section. First, to avoid creation of user-defined types for messaging, the payload type is changed to RAW. Second, a priority is assigned to one of the messages sent through the AQ mechanism. Third, messages are received in a more real-world way, with loops and finite timeouts.

[C#]
OracleConnection oracleConnection = new OracleConnection(
   "User Id=system;Password=manager;Server=ora;Direct=False;");
oracleConnection.Open();

OracleQueueTable oracleQueueTable = new OracleQueueTable(
   "QUEUE_TABLE_MESSAGE", oracleConnection);

// Set sort order by priority for the queue. The messages 
// with higher priority will reach the recipient first.
oracleQueueTable.Options.SortOrder = OracleQueueSortOrder.PriorityEnqueueTime;

// Specify type of the messages in the queue. This time we use a simpler way to communicate.
// Each message will be represented by just a string object.
oracleQueueTable.Options.PayloadTypeName = "RAW";

// The following operations are the same as in the previous example.
oracleQueueTable.CreateQueueTable();
OracleQueueAdmin oracleQueueAdmin = new OracleQueueAdmin("MESSAGE_QUEUE", 
"QUEUE_TABLE_MESSAGE", oracleConnection);
oracleQueueAdmin.CreateQueue();
oracleQueueAdmin.StartQueue();
OracleQueue oracleEnqueueQueue = new OracleQueue("MESSAGE_QUEUE", oracleConnection);

// Create and send the first message.
OracleQueueMessage message1 = new OracleQueueMessage();
message1.StringPayload = "First message.";
message1.MessageProperties.Priority = 7;
oracleEnqueueQueue.Enqueue(message1);

// Create and send the second message. This message is assigned a higher priority:
// in Oracle AQ a smaller number means a higher priority.
// The message will be consumed first, even though it was sent later.
OracleQueueMessage message2 = new OracleQueueMessage();
message2.StringPayload = "Second message with high priority.";
message2.MessageProperties.Priority = 1;
oracleEnqueueQueue.Enqueue(message2);

// Create an object that receives the two messages.
OracleQueue oracleDequeueQueue = new OracleQueue("MESSAGE_QUEUE", oracleConnection);
oracleDequeueQueue.DequeueOptions.WaitTimeout = 1;

// Retrieve the messages in a loop. Once there are two messages received, quit.
int messageCount = 0;
while (messageCount < 2) {
  try {
    OracleQueueMessage msg = oracleDequeueQueue.Dequeue();
    messageCount++;
    if (msg != null && msg.StringPayload != null) {
      Console.WriteLine(msg.StringPayload);
    }
  }
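  catch(OracleException ex) {
    // ORA-25228: no message arrived within the timeout, so try again.
    if (ex.Code == 25228) {
      continue;
    }
    else
      throw; // rethrow other errors without resetting the stack trace
  }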
}

oracleQueueAdmin.StopQueue();
oracleQueueAdmin.DropQueue();
oracleQueueTable.DropQueueTable();
oracleConnection.Close();
[Visual Basic]
Dim oracleConnection As New OracleConnection( _
   "User Id=system;Password=manager;Server=ora;Direct=False;")
oracleConnection.Open()

Dim oracleQueueTable As New OracleQueueTable("QUEUE_TABLE_MESSAGE", oracleConnection)

' Set sort order by priority for the queue. 
'The messages with higher priority will reach the recipient first.
oracleQueueTable.Options.SortOrder = OracleQueueSortOrder.PriorityEnqueueTime

' Specify type of the messages in the queue. This time we use a simpler way to communicate.
' Each message will be represented by just a string object.
oracleQueueTable.Options.PayloadTypeName = "RAW"

' The following operations are the same as in the previous example.
oracleQueueTable.CreateQueueTable()
Dim oracleQueueAdmin As New OracleQueueAdmin("MESSAGE_QUEUE", _
   "QUEUE_TABLE_MESSAGE", oracleConnection)
oracleQueueAdmin.CreateQueue()
oracleQueueAdmin.StartQueue()
Dim oracleEnqueueQueue As New OracleQueue("MESSAGE_QUEUE", oracleConnection)

' Create and send the first message.
Dim message1 As New OracleQueueMessage
message1.StringPayload = "First message."
message1.MessageProperties.Priority = 7
oracleEnqueueQueue.Enqueue(message1)

' Create and send the second message. This message is assigned a higher priority:
' in Oracle AQ a smaller number means a higher priority.
' The message will be consumed first, even though it was sent later.
Dim message2 As New OracleQueueMessage
message2.StringPayload = "Second message with high priority."
message2.MessageProperties.Priority = 1
oracleEnqueueQueue.Enqueue(message2)

' Create an object that receives the two messages.
Dim oracleDequeueQueue As New OracleQueue("MESSAGE_QUEUE", oracleConnection)
oracleDequeueQueue.DequeueOptions.WaitTimeout = 1

' Retrieve the messages in a loop. Once there are two messages received, quit.
Dim messageCount As Integer = 0
Do While (messageCount < 2)
  Try
    Dim msg As OracleQueueMessage = oracleDequeueQueue.Dequeue()
    messageCount += 1
    If ((Not msg Is Nothing) AndAlso (Not msg.StringPayload Is Nothing)) Then
      Console.WriteLine(msg.StringPayload)
    End If
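  Catch ex As OracleException
    ' ORA-25228: no message arrived within the timeout, so try again.
    If (ex.Code = 25228) Then
      Continue Do
    Else
      Throw ' rethrow other errors without resetting the stack trace
    End If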
  End Try
Loop

oracleQueueAdmin.StopQueue()
oracleQueueAdmin.DropQueue()
oracleQueueTable.DropQueueTable()
oracleConnection.Close()

A Simple Example of Point-to-Multipoint Messaging

In the point-to-multipoint scheme one publisher generates messages for several subscribers. The publisher may specify who exactly will receive the message. By default messages do not expire, and they are deleted either when dequeued in Remove mode, or when the table is destroyed. You can assign a lifetime to every message when sending it.

The following example illustrates the point-to-multipoint scheme within a single application (although consumers are usually separate applications). The example also demonstrates another way to dequeue: every consumer raises an event when a new message is available.

[C#]
void TestAQ() {
  OracleConnection oracleConnection = new OracleConnection(
     "User Id=system;Password=manager;Server=ora;Direct=False;");
  oracleConnection.Open();
  OracleQueueTable oracleQueueTable = new OracleQueueTable(
     "QUEUE_TABLE_MESSAGE", oracleConnection);
  // Set the table to be multiconsumer.
  oracleQueueTable.Options.MultipleConsumers = true;
  oracleQueueTable.Options.PayloadTypeName = "RAW";
  oracleQueueTable.CreateQueueTable();
  OracleQueueAdmin oracleQueueAdmin = new OracleQueueAdmin(
     "MESSAGE_QUEUE", "QUEUE_TABLE_MESSAGE", oracleConnection);
  oracleQueueAdmin.CreateQueue();
  oracleQueueAdmin.StartQueue();

  // Register Bob and Scott as known recipients.
  // This is required to send messages to Bob or Scott personally.
  oracleQueueAdmin.AddSubscriber(new OracleQueueAgent("Bob"));
  oracleQueueAdmin.AddSubscriber(new OracleQueueAgent("Scott"));

  OracleQueue oracleEnqueueQueue = new OracleQueue("MESSAGE_QUEUE", oracleConnection);

  // This message will be delivered to Bob only.
  OracleQueueMessage message1 = new OracleQueueMessage();
  message1.StringPayload = "Message for Bob.";
  message1.MessageProperties.RecipientList.Add(new OracleQueueAgent("Bob"));

  // This message will be delivered to all recipients, as there is 
  // no specific member in the recipients list.
  OracleQueueMessage message2 = new OracleQueueMessage();
  message2.StringPayload = "Message for all.";


  // Create an OracleQueue instance that will receive general notifications 
  // and notifications for Scott.
  OracleQueue oracleDequeueScott = new OracleQueue("MESSAGE_QUEUE", oracleConnection);
  oracleDequeueScott.DequeueOptions.Navigation = OracleQueueNavigation.FirstMessage;
  oracleDequeueScott.DequeueOptions.ConsumerName = "Scott";
  // Set up asynchronous notification using events.
  // The event just signals that a new message is available;
  // messages are not dequeued implicitly.
  oracleDequeueScott.AsyncNotification = true;
  oracleDequeueScott.OnMessage += new OracleQueueMessageEventHandler(
        oracleDequeueScott_OnMessage);

  // Create an OracleQueue instance that will receive general 
  // notifications and notifications for Bob.
  OracleQueue oracleDequeueBob = new OracleQueue("MESSAGE_QUEUE", oracleConnection);
  oracleDequeueBob.DequeueOptions.Navigation = OracleQueueNavigation.FirstMessage;
  oracleDequeueBob.DequeueOptions.ConsumerName = "Bob";
  oracleDequeueBob.AsyncNotification = true;
  oracleDequeueBob.OnMessage += new OracleQueueMessageEventHandler(
        oracleDequeueBob_OnMessage);

  // Enqueue the messages
  oracleEnqueueQueue.Enqueue(message1);
  oracleEnqueueQueue.Enqueue(message2);

  // Wait a little so the events can fire.
  System.Threading.Thread.Sleep(5000);

  oracleQueueAdmin.StopQueue();
  oracleQueueAdmin.DropQueue();
  oracleQueueTable.DropQueueTable();

  oracleConnection.Close();
}


// A helper function that dequeues messages during an event
string getMessageData(object sender, OracleQueueMessageEventArgs e) {
  OracleQueue oracleDequeue = sender as OracleQueue;
  OracleQueueDequeueOptions dequeueOptions = oracleDequeue.DequeueOptions;
  dequeueOptions.MessageId = e.MessageId;
  // Messages are not deleted from server once dequeued.
  dequeueOptions.DequeueMode = OracleQueueDequeueMode.Browse;
  OracleQueueMessage msg = oracleDequeue.Dequeue(dequeueOptions);
  return msg.StringPayload;
}


// Simple event handlers that write something into console 
// when a message is received
void oracleDequeueBob_OnMessage(object sender, OracleQueueMessageEventArgs e) {
  string messageData = getMessageData(sender, e);
  Console.WriteLine("Bob receives a message: " + messageData);
}


void oracleDequeueScott_OnMessage(object sender, OracleQueueMessageEventArgs e) {
  string messageData = getMessageData(sender, e);
  Console.WriteLine("Scott receives a message: " + messageData);
}
[Visual Basic]
Private Sub TestAQ()
  Dim oracleConnection As New OracleConnection( _
       "User Id=system;Password=manager;Server=ora;Direct=False;")
  oracleConnection.Open()
  Dim oracleQueueTable As New OracleQueueTable( _
       "QUEUE_TABLE_MESSAGE", oracleConnection)

  ' Set the table to be multiconsumer.
  oracleQueueTable.Options.MultipleConsumers = True
  oracleQueueTable.Options.PayloadTypeName = "RAW"
  oracleQueueTable.CreateQueueTable()
  Dim oracleQueueAdmin As New OracleQueueAdmin( _
        "MESSAGE_QUEUE", "QUEUE_TABLE_MESSAGE", oracleConnection)
  oracleQueueAdmin.CreateQueue()
  oracleQueueAdmin.StartQueue()

  ' Register Bob and Scott as known recipients.
  ' This is required to send messages to Bob or Scott personally.
  oracleQueueAdmin.AddSubscriber(New OracleQueueAgent("Bob"))
  oracleQueueAdmin.AddSubscriber(New OracleQueueAgent("Scott"))

  Dim oracleEnqueueQueue As New OracleQueue("MESSAGE_QUEUE", oracleConnection)

  ' This message will be delivered to Bob only.
  Dim message1 As New OracleQueueMessage
  message1.StringPayload = "Message for Bob."
  message1.MessageProperties.RecipientList.Add(New OracleQueueAgent("Bob"))

  ' This message will be delivered to all recipients, 
  ' as there is no specific member in the recipients list.
  Dim message2 As New OracleQueueMessage
  message2.StringPayload = "Message for all."

  ' Create an OracleQueue instance that will receive general notifications 
  ' and notifications for Scott.
  Dim oracleDequeueScott As New OracleQueue("MESSAGE_QUEUE", oracleConnection)
  oracleDequeueScott.DequeueOptions.Navigation = OracleQueueNavigation.FirstMessage
  oracleDequeueScott.DequeueOptions.ConsumerName = "Scott"
  ' Set up asynchronous notification using events.
  ' The event just signals that a new message is available; 
  ' messages are not dequeued implicitly.
  oracleDequeueScott.AsyncNotification = True
  AddHandler oracleDequeueScott.OnMessage, _
      New OracleQueueMessageEventHandler(AddressOf oracleDequeueScott_OnMessage)

  ' Create an OracleQueue instance that will receive general notifications 
  ' and notifications for Bob.
  Dim oracleDequeueBob As New OracleQueue("MESSAGE_QUEUE", oracleConnection)
  oracleDequeueBob.DequeueOptions.Navigation = OracleQueueNavigation.FirstMessage
  oracleDequeueBob.DequeueOptions.ConsumerName = "Bob"
  oracleDequeueBob.AsyncNotification = True
  AddHandler oracleDequeueBob.OnMessage, _
      New OracleQueueMessageEventHandler(AddressOf oracleDequeueBob_OnMessage)

  ' Enqueue the messages
  oracleEnqueueQueue.Enqueue(message1)
  oracleEnqueueQueue.Enqueue(message2)

  ' Wait a little so the events can fire.
  System.Threading.Thread.Sleep(5000)

  oracleQueueAdmin.StopQueue()
  oracleQueueAdmin.DropQueue()
  oracleQueueTable.DropQueueTable()

  oracleConnection.Close()
End Sub

' A helper function that dequeues messages during an event
Private Function getMessageData(ByVal sender As Object, _
    ByVal e As OracleQueueMessageEventArgs) As String
  Dim oracleDequeue As OracleQueue = TryCast(sender, OracleQueue)
  Dim dequeueOptions As OracleQueueDequeueOptions = oracleDequeue.DequeueOptions
  dequeueOptions.MessageId = e.MessageId
  ' Messages are not deleted from server once dequeued.
  dequeueOptions.DequeueMode = OracleQueueDequeueMode.Browse
  Dim msg As OracleQueueMessage = oracleDequeue.Dequeue(dequeueOptions)
  Return msg.StringPayload
End Function

' Simple event handlers that write something into console when a message is received
Private Sub oracleDequeueBob_OnMessage(ByVal sender As Object, _
    ByVal e As OracleQueueMessageEventArgs)
  Dim messageData As String = getMessageData(sender, e)
  Console.WriteLine("Bob receives a message: " & messageData)
End Sub

Private Sub oracleDequeueScott_OnMessage(ByVal sender As Object, _
    ByVal e As OracleQueueMessageEventArgs)
  Dim messageData As String = getMessageData(sender, e)
  Console.WriteLine("Scott receives a message: " & messageData)
End Sub
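
The getMessageData helper above dequeues in Browse mode, so the messages stay in the queue until the table is dropped. A consumer that should actually remove the message it reads might look like the sketch below. This is an illustration rather than part of the sample: ConsumeMessageData and ConsumeSketch are hypothetical names, the Devart.Data.Oracle namespace is assumed, and the Remove member of OracleQueueDequeueMode is assumed to correspond to the Remove mode mentioned earlier in this section.

```csharp
using Devart.Data.Oracle; // assumed namespace of the dotConnect for Oracle classes

static class ConsumeSketch {
  // Hypothetical variant of getMessageData that consumes the message
  // instead of only browsing it.
  public static string ConsumeMessageData(object sender, OracleQueueMessageEventArgs e) {
    OracleQueue oracleDequeue = sender as OracleQueue;
    if (oracleDequeue == null)
      return null; // the event was not raised by an OracleQueue

    OracleQueueDequeueOptions dequeueOptions = oracleDequeue.DequeueOptions;
    dequeueOptions.MessageId = e.MessageId;
    // Assumption: Remove deletes the message for this consumer once it is dequeued.
    dequeueOptions.DequeueMode = OracleQueueDequeueMode.Remove;

    OracleQueueMessage msg = oracleDequeue.Dequeue(dequeueOptions);
    return msg != null ? msg.StringPayload : null;
  }
}
```

The event handlers could call this helper in place of getMessageData when each subscriber should receive a message only once.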

Administration

The AQ technology offers many options, settings, and capabilities for both enqueuing and dequeuing messages. To support them, dotConnect for Oracle provides a wide set of classes in the Oracle namespace. The public interface of these classes is similar to that of the Oracle server packages, so you can refer to the official documentation for details on what is behind the numerous methods and properties.

As seen in the three examples in this article, most administrative tasks can be accomplished with the OracleQueueTable and OracleQueueAdmin classes. Note that administering queues often requires an extensive set of database privileges.
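
As a rough illustration of the privileges involved, the sketch below grants a non-DBA account the standard Oracle AQ administrator role and execute rights on the DBMS_AQADM and DBMS_AQ packages. The GrantAqPrivileges helper, the AqGrantsSketch class, and the userName parameter are hypothetical, and the Devart.Data.Oracle namespace is assumed. The exact set of grants you need depends on your Oracle version and security policy, so treat this as a starting point rather than a definitive list.

```csharp
using Devart.Data.Oracle; // assumed namespace of the dotConnect for Oracle classes

static class AqGrantsSketch {
  // Hypothetical helper. adminConnection must be an open connection for an
  // account that is itself allowed to grant these privileges (for example, a DBA).
  // userName is concatenated into DDL, so it must be a trusted identifier,
  // never raw user input.
  public static void GrantAqPrivileges(OracleConnection adminConnection, string userName) {
    string[] statements = {
      "GRANT AQ_ADMINISTRATOR_ROLE TO " + userName,
      "GRANT EXECUTE ON DBMS_AQADM TO " + userName, // queue administration
      "GRANT EXECUTE ON DBMS_AQ TO " + userName     // enqueue and dequeue
    };

    foreach (string sql in statements) {
      using (OracleCommand command = new OracleCommand(sql, adminConnection)) {
        command.ExecuteNonQuery();
      }
    }
  }
}
```

With grants like these in place, the samples in this article could connect as an ordinary user instead of the system account used in the connection strings.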