Skip Headers

Oracle® Streams Advanced Queuing User's Guide and Reference
Release 10.1

Part Number B10785-01
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Feedback

Go to previous page
Previous
Go to next page
Next
View PDF

10 Oracle Streams AQ Operational Interface: Basic Operations

This chapter describes the Oracle Streams Advanced Queuing (AQ) basic operational interface.

This chapter contains these topics:


See Also:


Enqueuing a Message

This section contains these topics:


Purpose

Adds a message to the specified queue.


Syntax
DBMS_AQ.ENQUEUE (
   queue_name          IN      VARCHAR2,
   payload             IN      "type_name",
   msgid               OUT     RAW);

Usage Notes

If a message is enqueued to a multiconsumer queue with no recipient and the queue has no subscribers (or rule-based subscribers that match this message), then Oracle error ORA 24033 is raised. This is a warning that the message will be discarded because there are no recipients or subscribers to whom it can be delivered.


Examples

Examples are provided in the following programmatic environments:

Enqueuing a Message and Specifying Options


Purpose

Specifies options available for the enqueue operation.


Syntax
DBMS_AQ.ENQUEUE (
   queue_name          IN      VARCHAR2,
   enqueue_options     IN      enqueue_options_t,
   message_properties  IN      message_properties_t,
   payload             IN      "type_name",
   msgid               OUT     RAW);

Usage Notes

Do not use the immediate option when you want to use LOB locators. LOB locators are valid only for the duration of the transaction. Your locator will not be valid, because the immediate option automatically commits the transaction.

The sequence deviation parameter in enqueue options can be used to change the order of processing between two messages. The identity of the other message, if any, is specified by the enqueue options parameter relative msgid. The relationship is identified by the sequence deviation parameter.

Specifying sequence deviation for a message introduces some restrictions for the delay and priority values that can be specified for this message. The delay of this message must be less than or equal to the delay of the message before which this message is to be enqueued. The priority of this message must be greater than or equal to the priority of the message before which this message is to be enqueued.

The visibility option must be immediate for nonpersistent queues.

Only local recipients are supported for nonpersistent queues.

If a transformation is specified, then it is applied to the message before enqueuing it to the queue. The transformation must map the message into an object whose type is the Oracle object type of the queue.

Using Secure Queues

For secure queues, you must specify the sender_id in the messages_properties parameter. See "MESSAGE_PROPERTIES_T Type" in PL/SQL Packages and Types Reference for more information about sender_id.

When you use secure queues, the following are required:

Enqueuing a Message and Specifying Message Properties


Purpose

Specifies message properties for the enqueue operation.


Syntax
DBMS_AQ.ENQUEUE (
   queue_name          IN      VARCHAR2,
   message_properties  IN      message_properties_t,
   payload             IN      "type_name",
   msgid               OUT     RAW);

Usage Notes

Oracle Streams AQ uses message properties to manage individual messages. They are set when a message is enqueued, and their values are returned when the message is dequeued. To view messages in a waiting or processed state, you can either dequeue or browse by message ID, or use SELECT statements.

Message delay and expiration are enforced by the queue monitor (QMN) background processes. You must start the QMN processes for the database if you intend to use the delay and expiration features of Oracle Streams AQ.

Enqueuing a Message and Specifying Sender ID


Purpose

Identifies the producer of a message.


Syntax
DBMS_AQ.ENQUEUE (
   queue_name          IN      VARCHAR2,
   payload             IN      "type_name",
   msgid               OUT     RAW);


See Also:

"AQ Agent Type (aq$_agent)" for more information on Agent

Enqueuing a Message and Adding Payload

To store a payload of type RAW, Oracle Streams AQ creates a queue table with LOB column as the payload repository. The maximum size of the payload is determined by which programmatic environment you use to access Oracle Streams AQ. For PL/SQL, Java and precompilers the limit is 32K; for the OCI the limit is 4G.


Examples

You must set up the following data structures for certain examples to work:

CONNECT system/manager
CREATE USER aq IDENTIFIED BY aq;
GRANT Aq_administrator_role TO aq;
***** CREATE TYPE ******

EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (
   Queue_table            =>  'aq.objsgs_qtab',
   Queue_payload_type     =>  'aq.message_typ');
EXECUTE DBMS_AQADM.CREATE_QUEUE ( 
   Queue_name            =>  'aq.msg_queue',
   Queue_table           =>  'aq.objmsgs_qtab');
EXECUTE DBMS_AQADM.START_QUEUE (
   Queue_name         => 'aq.msg_queue',
   Enqueue            => TRUE);
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (
   Queue_table            => 'aq.prioritymsgs_qtab',
   Sort_list              => 'PRIORITY,ENQ_TIME',
   Queue_payload_type     => 'aq.message_typ');
EXECUTE DBMS_AQADM.CREATE_QUEUE (
   Queue_name             => 'aq.priority_msg_queue',
   Queue_table            => 'aq.prioritymsgs_qtab');
EXECUTE DBMS_AQADM.START_QUEUE (
   Queue_name             => 'aq.priority_msg_queue',
   Enqueue                => TRUE);

Example 10-1 PL/SQL: Enqueue a Single Message and Specify the Queue Name and Payload

/* Enqueue to msg_queue: */
DECLARE
   Enqueue_options     DBMS_AQ.enqueue_options_t;
   Message_properties  DBMS_AQ.message_properties_t;
   Message_handle      RAW(16);
   Message             aq.message_typ;

BEGIN
   Message := aq.message_typ('NORMAL MESSAGE',
      'enqueued to msg_queue first.');

   DBMS_AQ.ENQUEUE(queue_name => 'msg_queue',
   Enqueue_options            => enqueue_options,
   Message_properties         => message_properties,
   Payload                    => message,
   Msgid                      => message_handle);

   COMMIT;
END;

Example 10-2 PL/SQL: Enqueue a Single Message and Specify the Priority

/* The queue name priority_msg_queue is defined as an object type queue table.
   The payload object type is message. The schema of the queue is aq.  */

 /* Enqueue a message with priority 30: */ 
DECLARE 
   Enqueue_options       dbms_aq.enqueue_options_t; 
   Message_properties    dbms_aq.message_properties_t; 
   Message_handle        RAW(16); 
   Message               aq.Message_typ; 
 
BEGIN 
   Message := Message_typ('PRIORITY MESSAGE', 'enqued at priority 30.'); 
 
   message_properties.priority := 30; 
 
   DBMS_AQ.ENQUEUE(queue_name => 'priority_msg_queue', 
   enqueue_options            => enqueue_options, 
   message_properties         => message_properties, 
   payload                    => message, 
   msgid                      => message_handle); 
 
   COMMIT; 
END; 

Example 10-3 PL/SQL: Enqueue a Single Message and Specify a Transformation

/* Enqueue to msg_queue: */
DECLARE
   Enqueue_options     DBMS_AQ.enqueue_options_t;
   Message_properties  DBMS_AQ.message_properties_t;
   Message_handle      RAW(16);
   Message             aq.message_typ;

BEGIN
   Message := aq.message_typ('NORMAL MESSAGE',
      'enqueued to msg_queue first.');

   DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', 
   Enqueue_options            => enqueue_options,
   Message_properties         => message_properties,
   transformation             => 'AQ.MSG_MAP',
   Payload                    => message,
   Msgid                      => message_handle);

   COMMIT;
END;

Where MSG_MAP was created as follows:

BEGIN
   DBMS.TRANSFORM.CREATE_TRANSFORMATION
   (
      schema => 'AQ',
      name =>   'MSG_MAP',
      from_schema => 'AQ',
      from_type => 'PO_ORDER1',
      to_schema => 'AQ',
      to_type => 'PO_ORDER2',
      transformation => 'AQ.MAP_PO_ORDER (source.user_data)'),
END;

Example 10-4 Java (JDBC): Enqueue a Message and Add Payload

/* Setup  */
connect system/manager
CREATE USER aq IDENTIFIED BY aq;
grant aq_administrator_role to aq;

public static void setup(AQSession aq_sess) throws AQException
{
     AQQueueTableProperty    qtable_prop;
     AQQueueProperty         queue_prop;
     AQQueueTable            q_table;
     AQQueue                 queue;
     AQAgent                 agent;

     qtable_prop = new AQQueueTableProperty("RAW"); 

     q_table = aq_sess.createQueueTable ("aq", "rawmsgs_qtab", qtable_prop);

     queue_prop = new AQQueueProperty();
     queue = aq_sess.createQueue (q_table, "msg_queue", queue_prop);

     queue.start();

     qtable_prop = new AQQueueTableProperty("RAW"); 
     qtable_prop.setMultiConsumer(true);

     qtable_prop.setSortOrder("priority,enq_time");
     q_table = aq_sess.createQueueTable ("aq", "rawmsgs_qtab2", 
     qtable_prop);

     queue_prop = new AQQueueProperty();
     queue = aq_sess.createQueue (q_table, "priority_msg_queue", queue_prop);

     queue.start();

     agent = new AQAgent("subscriber1", null);

     queue.addSubscriber(agent, null);
}


/* Enqueue a message */
public static void example(AQSession aq_sess) throws AQException, SQLException
{
     AQQueue                  queue;
     AQMessage                message;
     AQRawPayload             raw_payload;
     AQEnqueueOption          enq_option;
     String                   test_data = "new message";
     byte[]                   b_array;
     Connection               db_conn;

     db_conn = ((AQOracleSession)aq_sess).getDBConnection();

     /* Get a handle to the queue */
     queue = aq_sess.getQueue ("aq", "msg_queue");

     /* Create a message to contain raw payload: */
     message = queue.createMessage();

     /* Get handle to the AQRawPayload object and populate it with raw data: */
     b_array = test_data.getBytes();

     raw_payload = message.getRawPayload();

     raw_payload.setStream(b_array, b_array.length);

     /* Create a AQEnqueueOption object with default options: */
     enq_option = new AQEnqueueOption();

     /* Enqueue the message: */
     queue.enqueue(enq_option, message);

     db_conn.commit();
}


/* Enqueue a message with priority = 5 */
public static void example(AQSession aq_sess) throws AQException, SQLException
{
     AQQueue                  queue;
     AQMessage                message;
     AQMessageProperty        msg_prop;
     AQRawPayload             raw_payload;
     AQEnqueueOption          enq_option;
     String                   test_data = "priority message";
     byte[]                   b_array;
     Connection               db_conn;

     db_conn = ((AQOracleSession)aq_sess).getDBConnection();

     /* Get a handle to the queue */
     queue = aq_sess.getQueue ("aq", "msg_queue");

     /* Create a message to contain raw payload: */
     message = queue.createMessage();

     /* Get Message property */
     msg_prop = message.getMessageProperty();

     /* Set priority */
     msg_prop.setPriority(5);

     /* Get handle to the AQRawPayload object and populate it with raw data: */
     b_array = test_data.getBytes();

     raw_payload = message.getRawPayload();

     raw_payload.setStream(b_array, b_array.length);

     /* Create a AQEnqueueOption object with default options: */
     enq_option = new AQEnqueueOption();

     /* Enqueue the message: */
     queue.enqueue(enq_option, message);

     db_conn.commit();
}

Example 10-5 Visual Basic (OO4O): Enqueue a message

Enqueuing messages of type objects

'Prepare the message. MESSAGE_TYPE is a user-defined type
' in the "AQ" schema 
Set OraMsg = Q.AQMsg(1, "MESSAGE_TYPE") 
Set OraObj = DB.CreateOraObject("MESSAGE_TYPE") 

OraObj("subject").Value = "Greetings from OO4O" 
OraObj("text").Value = "Text of a message originated from OO4O" 

Msgid = Q.Enqueue

Enqueuing messages of type RAW

'Create an OraAQ object for the queue "DBQ" 
Dim Q as object 
Dim Msg as object 
Dim OraSession as object 
Dim DB as object 

Set OraSession = CreateObject("OracleInProcServer.XOraSession") 
Set OraDatabase = OraSession.OpenDatabase(mydb, "scott/tiger" 0&) 
Set Q = DB.CreateAQ("DBQ") 

'Get a reference to the AQMsg object 
Set Msg = Q.AQMsg 
Msg.Value = "Enqueue the first message to a RAW queue." 

'Enqueue the message 
Q.Enqueue() 

'Enqueue another message.

Msg.Value = "Another message" 
Q.Enqueue() 

'Enqueue a message with nondefault properties. 
Msg.Priority = ORAQMSG_HIGH_PRIORITY 
Msg.Delay = 5 
Msg.Value = "Urgent message" 
Q.Enqueue() 
Msg.Value = "The visibility option used in the enqueue call is 
             ORAAQ_ENQ_IMMEDIATE" 
Q.Visible = ORAAQ_ENQ_IMMEDIATE 
Msgid = Q.Enqueue 

'Enqueue Ahead of message Msgid_1 
Msg.Value = "First Message to test Relative Message id" 
Msg.Correlation = "RELATIVE_MESSAGE_ID" 

Msgid_1 = Q.Enqueue 
Msg.Value = "Second message to test RELATIVE_MESSAGE_ID is queued 
             ahead of the First Message " 
OraAq.relmsgid = Msgid_1 
Msgid = Q.Enqueue

Enqueuing an Array of Messages


Purpose

Use the ENQUEUE_ARRAY function to enqueue an array of payloads using a corresponding array of message properties. The output is an array of message IDs of the enqueued messages. The function returns the number of messages successfully enqueued.


Syntax
DBMS_AQ.ENQUEUE_ARRAY (
   queue_name                IN   VARCHAR2,
   enqueue_options           IN   enqueue_options_t,
   array_size                IN   pls_integer,
   message_properties_array  IN   message_properties_array_t,
   payload_array             IN   VARRAY,
   msid_array                OUT  msgid_array_t)
RETURN pls_integer;

Usage Notes

The payload structure can be a VARRAY or nested table. The message IDs are returned into an array of RAW(16) entries of type DBMS_AQ.msgid_array_t.

As with array operations in the relational world, it is not possible to provide a single optimum array size that will be correct in all circumstances. Application developers must experiment with different array sizes to determine the optimal value for their particular applications.


Examples

Examples are provided in the following programmatic environments:

Example 10-6 PL/SQL: Array Enqueuing into a Queue of Type Message

CREATE OR REPLACE TYPE message as OBJECT (
data VARCHAR2(10)) ;/
 
CREATE OR REPLACE TYPE message_tbl AS TABLE OF message;/
 
....
 
DECLARE
 enqopt dbms_aq.enqueue_options_t;
 msgproparr dbms_aq.message_properties_array_t;
 msgprop dbms_aq.message_properties_t;
 payloadarr  message_tbl;
 msgidarr dbms_aq.msgid_array_t;
 retval pls_integer;
 
BEGIN
  payloadarr := message_tbl(message('Oracle') ,message('Corp')) ;
  msgproparr := dbms_aq.message_properties_array_t(msgprop, msgprop);
 
  retval := dbms_aq.enqueue_array( queue_name => 'AQUSER.MY_QUEUE',
                 enqueue_options => enqopt ,
                 array_size => 2,
                 message_properties_array => msgproparr,
                 payload_array => payloadarr,
                 msgid_array => msgidarr ) ;
  commit;
END;/

Example 10-7 C(OCI): Array Enqueuing into a Queue of Type Message

struct message{
 
  OCIString   *data;
};
typedef struct message message;
 
struct null_message{
 
  OCIInd null_adt;
  OCIInd null_data;
};
typedef struct null_message null_message;
 
int main( argc, argv)
int argc ;
char **argv ;{
 
  OCIEnv              *envhp;
  OCIServer           *srvhp;
  OCIError            *errhp;
  OCISvcCtx           *svchp;
  OCISession          *usrhp;
  dvoid               *tmp;
  OCIType             *mesg_tdo = (OCIType *) 0;
  message              mesg[NMESGS];
  message             *mesgp[NMESGS];
  null_message         nmesg[NMESGS];
  null_message        *nmesgp[NMESGS];
  int                  i, j, k;
  OCIInd               ind[NMESGS];
  dvoid               *indptr[NMESGS];
  ub4                  priority;
  OCIAQEnqOptions     *enqopt = (OCIAQEnqOptions *)0;
  OCIAQMsgProperties  *msgprop= (OCIAQMsgProperties *)0;
  ub4                  wait = 1;
  ub4                  navigation = OCI_DEQ_NEXT_MSG;
  ub4                  iters = 2;
  text                *qname ; 
  text                 mesgdata[30];
  ub4                  payload_size = 5;
  text                *payload = (text *)0;
  ub4                  batch_size = 2;
  ub4                  enq_size = 2;
  
  printf("session start\n");
  /* establish a session */  
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
        (dvoid * (*)()) 0,  (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
           52, (dvoid **) &tmp);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
           52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
           52, (dvoid **) &tmp);
  
  printf("server attach\n");
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
           52, (dvoid **) &tmp);
  
  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
           (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  
  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
           (size_t) 0, (dvoid **) 0);
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), 
             OCI_ATTR_USERNAME, errhp);
           
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"AQUSER", (ub4)strlen("AQUSER"),
           OCI_ATTR_PASSWORD, errhp);
  
  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
                                   OCI_DEFAULT));
  
  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
           (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);
 
  /* get descriptor for enqueue options */
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&enqopt, 
                                         OCI_DTYPE_AQENQ_OPTIONS, 0, 
                                         (dvoid **)0));
 
  printf("enq options set\n");
  /* set enqueue options - for consumer name, wait and navigation */
 
  /* construct null terminated payload string */
  payload = (text *)malloc(payload_size+1);
  for (k=0 ; k < payload_size ; k++)
    payload[k] = 'a';
  payload[payload_size] = '\0';
 
  for (k=0 ; k < batch_size ; k++)
  {
    indptr[k] = &ind[k];
    mesgp[k] = &mesg[k];
    nmesgp[k] = &nmesg[k];
    nmesg[k].null_adt = nmesg[k].null_data = OCI_IND_NOTNULL; 
    mesg[k].data = (OCIString *)0;
    OCIStringAssignText(envhp, errhp, (const unsigned char *)payload,
                        strlen((const char *)payload), &(mesg[k].data));
  }
  
  printf("check message tdo\n");
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp, 
   (CONST text *)"AQUSER", strlen("AQUSER"),
   (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, 
   OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo));
  k=0;
 
  while (k < iters) 
  {
    enq_size = batch_size;
    checkerr(errhp, OCIAQEnqArray(svchp, errhp, 
                                  (dvoid *)"AQUSER.MY_QUEUE", 
                                  (OCIAQEnqOptions *)0, &enq_size, 
                                  0, mesg_tdo, 
                                  (dvoid **)&mesgp, 
                                  (dvoid **)&nmesgp, 0, 0, 0, 0));
    k+=batch_size;
  }
  
  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0));
 
  checkerr(errhp, OCIServerDetach( srvhp, errhp, (ub4) OCI_DEFAULT));
 
  return 0;}

Listening to One or More Queues


Purpose

Specifies which queue or queues to monitor


Syntax
DBMS_AQ.LISTEN (
   agent_list IN    aq$_agent_list_t,
   wait       IN    BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER, 
   agent      OUT   sys.aq$_agent);

TYPE aq$_agent_list_t IS TABLE of aq$_agent INDEXED BY BINARY_INTEGER;

Usage Notes

The call takes a list of agents as an argument. You specify the queue to be monitored in the address field of each agent listed. You also must specify the name of the agent when monitoring multiconsumer queues. For single-consumer queues, an agent name must not be specified. Only local queues are supported as addresses. Protocol is reserved for future use.


Note:

Listening to multiconsumer queues is not supported in the Java API.

This is a blocking call that returns when there is a message ready for consumption for an agent in the list. If there are messages for more than one agent, then only the first agent listed is returned. If there are no messages found when the wait time expires, then an error is raised.

A successful return from the listen call is only an indication that there is a message for one of the listed agents in one of the specified queues. The interested agent must still dequeue the relevant message.


Note:

You cannot call listen on nonpersistent queues.


Examples

Examples are provided in the following programmatic environments:

Example 10-8 PL/SQL: Listen to Single-Consumer Queue (Timeout of Zero)

/* The listen call monitors a list of queues for messages for 
   specific agents. You must have dequeue privileges for all the queues 
   you wish to monitor. */
DECLARE
   Agent_w_msg      aq$_agent;
   My_agent_list    dbms_aq.agent_list_t;

BEGIN
   /* NOTE:  MCQ1, MCQ2, MCQ3 are multiconsumer queues  in SCOTT's schema
   *        SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema
   */

   Qlist(1):= aq$_agent(NULL, 'scott.SCQ1',  NULL);
   Qlist(2):= aq$_agent(NULL, 'SCQ2', NULL);
   Qlist(3):= aq$_agent(NULL, 'SCQ3', NULL);

   /* Listen with a timeout of zero: */
   DBMS_AQ.LISTEN(
      Agent_list   =>   My_agent_list, 
      Wait         =>   0, 
      Agent        =>   agent_w_msg);
   DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' ||  agent_w_msg.address);
   DBMS_OUTPUT.PUT_LINE('');
END;

Example 10-9 Java (JDBC): Listen to Queues

public static void monitor_status_queue(Connection db_conn)
{
    AQSession         aq_sess;
    AQAgent[]         agt_list = null;
    AQAgent           ret_agt  = null;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

/* Construct the waiters list: */ 
agt_list = new AQAgent[3];

agt_list[0] = new AQAgent(null, "scott.SCQ1",0);
agt_list[1] = new AQAgent (null, "SCQ2",0);
agt_list[2] = new AQAgent (null, "SCQ3",0);

/* Wait for order status messages for 120 seconds: */
ret_agt = aq_sess.listen(agt_list, 120);

System.out.println("Message available for agent: " + 
   ret_agt.getName()  + "   "  + ret_agt.getAddress());

    }
    catch (AQException aqex)
    {
System.out.println("Exception-1: " + aqex);
    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }

}

Example 10-10 C (OCI): Listening for Single-Consumer Queues with Zero Timeout

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

static void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
    text errbuf[512];
    ub4 buflen;
    sb4 errcode;

    switch (status)
    {
   case OCI_SUCCESS:
       break;
   case OCI_SUCCESS_WITH_INFO:
       printf("Error - OCI_SUCCESS_WITH_INFO\n");
       break;
   case OCI_NEED_DATA:
       printf("Error - OCI_NEED_DATA\n");
       break;
   case OCI_NO_DATA:
       printf("Error - OCI_NO_DATA\n");
       break;
   case OCI_ERROR:
       OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
       errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
       printf("Error - %s\n", errbuf);
       break;
   case OCI_INVALID_HANDLE:
       printf("Error - OCI_INVALID_HANDLE\n");
       break;
   case OCI_STILL_EXECUTING:
       printf("Error - OCI_STILL_EXECUTE\n");
       break;
   case OCI_CONTINUE:
       printf("Error - OCI_CONTINUE\n");
       break;
   default:
   break;
    }
}

/* set agent into descriptor */
void SetAgent(agent, appname, queue,errhp)

OCIAQAgent  *agent;
text        *appname;
text        *queue;
OCIError    *errhp;
{

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     appname ? (dvoid *)appname : (dvoid *)"", 
     appname ? strlen((const char *)appname) : 0,
        OCI_ATTR_AGENT_NAME, errhp);

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     queue ? (dvoid *)queue : (dvoid *)"", 
     queue ? strlen((const char *)queue) : 0,
        OCI_ATTR_AGENT_ADDRESS, errhp);

  printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
  printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
}

/* get agent from descriptor */
void GetAgent(agent, errhp)
OCIAQAgent *agent;
OCIError   *errhp;
{
text      *appname;
text      *queue;
ub4       appsz;
ub4       queuesz;

  if (!agent )
  {
    printf("agent was NULL \n");
    return;
  }
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
  if (!appsz)
     printf("agent name: NULL\n");
  else printf("agent name: %.*s\n", appsz, (char *)appname);
  if (!queuesz)
     printf("agent address: NULL\n");
  else printf("agent address: %.*s\n", queuesz, (char *)queue);
}

int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  OCISession *usrhp;
  OCIAQAgent *agent_list[3];
  OCIAQAgent *agent = (OCIAQAgent *)0;
  /* added next 2 121598 */
  int i;

 /* Standard OCI Initialization */

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
     (dvoid * (*)()) 0,  (void (*)()) 0 );
 
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, 
             (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
     0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
     0, (dvoid **) 0);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
     0, (dvoid **) 0);

  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
     (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
      (size_t) 0, (dvoid **) 0);

  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
     (size_t) 0, (dvoid **) 0);

  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
     (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp);

  OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
      (dvoid *) "tiger", (ub4) strlen("tiger"),
      (ub4) OCI_ATTR_PASSWORD, errhp);

  OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT);

  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
     (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

  /* AQ LISTEN Initialization - allocate agent handles */
  for (i = 0; i < 3; i++)
  {
     agent_list[i] = (OCIAQAgent *)0;
     OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], 
         OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  }

  /* 
   *   SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema
   */
 
  SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp);
  SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp);
  SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp);
 
  checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0));

  printf("MESSAGE for :- \n");
  GetAgent(agent, errhp);
  printf("\n");

}

Example 10-11 C (OCI): Listening for Single-Consumer Queues with Timeout of 120 Seconds

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

static void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
    text errbuf[512];
    ub4 buflen;
    sb4 errcode;

    switch (status)
    {
   case OCI_SUCCESS:
       break;
   case OCI_SUCCESS_WITH_INFO:
       printf("Error - OCI_SUCCESS_WITH_INFO\n");
       break;
   case OCI_NEED_DATA:
       printf("Error - OCI_NEED_DATA\n");
       break;
   case OCI_NO_DATA:
       printf("Error - OCI_NO_DATA\n");
       break;
   case OCI_ERROR:
       OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
       errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
       printf("Error - %s\n", errbuf);
       break;
   case OCI_INVALID_HANDLE:
       printf("Error - OCI_INVALID_HANDLE\n");
       break;
   case OCI_STILL_EXECUTING:
       printf("Error - OCI_STILL_EXECUTE\n");
       break;
   case OCI_CONTINUE:
       printf("Error - OCI_CONTINUE\n");
       break;
   default:
   break;
    }
}

/* set agent into descriptor */
/* void SetAgent(agent, appname, queue) */
void SetAgent(agent, appname, queue,errhp)

OCIAQAgent  *agent;
text        *appname;
text        *queue;
OCIError    *errhp;
{

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     appname ? (dvoid *)appname : (dvoid *)"", 
     appname ? strlen((const char *)appname) : 0,
        OCI_ATTR_AGENT_NAME, errhp);

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     queue ? (dvoid *)queue : (dvoid *)"", 
     queue ? strlen((const char *)queue) : 0,
        OCI_ATTR_AGENT_ADDRESS, errhp);

  printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
  printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
}

/* get agent from descriptor */
void GetAgent(agent, errhp)
OCIAQAgent *agent;
OCIError   *errhp;
{
text      *appname;
text      *queue;
ub4       appsz;
ub4       queuesz;

  if (!agent )
  {
    printf("agent was NULL \n");
    return;
  }
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
  if (!appsz)
     printf("agent name: NULL\n");
  else printf("agent name: %.*s\n", appsz, (char *)appname);
  if (!queuesz)
     printf("agent address: NULL\n");
  else printf("agent address: %.*s\n", queuesz, (char *)queue);
}

int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  OCISession *usrhp;
  OCIAQAgent *agent_list[3];
  OCIAQAgent *agent = (OCIAQAgent *)0;
  /* added next 2 121598 */
  int i;

 /* Standard OCI Initialization */

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
     (dvoid * (*)()) 0,  (void (*)()) 0 );
 
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, 
             (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
     0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
     0, (dvoid **) 0);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
     0, (dvoid **) 0);

  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
     (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
      (size_t) 0, (dvoid **) 0);

  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
     (size_t) 0, (dvoid **) 0);

  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
     (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp);

  OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
      (dvoid *) "tiger", (ub4) strlen("tiger"),
      (ub4) OCI_ATTR_PASSWORD, errhp);

  OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT);

  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
     (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

  /* AQ LISTEN Initialization - allocate agent handles */
  for (i = 0; i < 3; i++)
  {
     agent_list[i] = (OCIAQAgent *)0;
     OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], 
         OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  }

  /* 
   *   SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema
   */
 
  SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp);
  SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp);
  SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp);
 
  checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0));

  printf("MESSAGE for :- \n");
  GetAgent(agent, errhp);
  printf("\n");

}

Dequeuing a Message

This section contains these topics:


Purpose

Dequeues a message from the specified queue.


Syntax
DBMS_AQ.DEQUEUE (
   queue_name          IN      VARCHAR2,
   dequeue_options     IN      dequeue_options_t,
   message_properties  OUT     message_properties_t,
   payload             OUT     "type_name",
   msgid               OUT     RAW);

Usage Notes

The search criteria for messages to be dequeued is determined by the consumer name, msgid and correlation parameters in the dequeue options. Parameter msgid uniquely identifies the message to be dequeued. Only messages in the READY state are dequeued unless msgid is specified. Correlation identifiers are application-defined identifiers that are not interpreted by Oracle Streams AQ.

The dequeue order is determined by the values specified at the time the queue table is created unless overridden by the message ID and correlation ID in dequeue options.

The database consistent read mechanism is applicable for queue operations. For example, a BROWSE call may not see a message that is enqueued after the beginning of the browsing transaction.

The default NAVIGATION parameter during dequeue is NEXT MESSAGE. This means that subsequent dequeues retrieve the messages from the queue based on the snapshot obtained in the first dequeue. In particular, a message that is enqueued after the first dequeue command is processed only after processing all the remaining messages in the queue. This is usually sufficient when all the messages have already been enqueued into the queue, or when the queue does not have a priority-based ordering. However, applications must use the FIRST_MESSAGE navigation option when the first message in the queue must be processed by every dequeue command. This usually becomes necessary when a higher priority message arrives in the queue while messages already enqueued are being processed.


Note:

It can also be more efficient to use the FIRST_MESSAGE navigation option when there are messages being concurrently enqueued. If the FIRST_MESSAGE option is not specified, then Oracle Streams AQ continually generates the snapshot as of the first dequeue command, leading to poor performance. If the FIRST_MESSAGE option is specified, then Oracle Streams AQ uses a new snapshot for every dequeue command.

Messages enqueued in the same transaction into a queue that has been enabled for message grouping form a group. If only one message is enqueued in the transaction, then this effectively forms a group of one message. There is no upper limit to the number of messages that can be grouped in a single transaction.

In queues that have not been enabled for message grouping, a dequeue in LOCKED or REMOVE mode locks only a single message. By contrast, a dequeue operation that seeks to dequeue a message that is part of a group locks the entire group. This is useful when all the messages in a group must be processed as a unit.

When all the messages in a group have been dequeued, the dequeue returns an error indicating that all messages in the group have been processed. The application can then use the NEXT TRANSACTION to start dequeuing messages from the next available group. In the event that no groups are available, the dequeue times out after the specified WAIT period.


Examples

Examples are provided in the following programmatic environments:

Dequeuing a Message from a Single-Consumer Queue and Specifying Options


Purpose

Specifies the options available for the dequeue operation.


Usage Notes

Typically, you expect the consumer of messages to access messages using the dequeue interface. You can view processed messages or messages still to be processed by browsing by message ID or by using SELECT commands.

The transformation, if specified, is applied before returning the message to the caller. The transformation should be defined to map the queue Oracle object type to the return type wanted by the caller.


Examples

Examples are provided in the following programmatic environments:

Example 10-12 PL/SQL: Dequeue of Object Type Messages

/* Dequeue from msg_queue: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_typ;

BEGIN
   DBMS_AQ.DEQUEUE(
      queue_name          =>     'msg_queue',
      dequeue_options      =>    dequeue_options,
      message_properties  =>     message_properties,
      payload             =>     message,
      msgid               =>     message_handle);

   DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject ||
                                      ' ... ' || message.text );
   COMMIT;
END;

Example 10-13 Java (JDBC): Dequeue a message from a single-consumer queue (specify options)

/* Dequeue a message with correlation ID = 'RUSH' */
public static void example(AQSession aq_sess) throws AQException, SQLException
{
     AQQueue                  queue;
     AQMessage                message;
     AQRawPayload             raw_payload;
     AQDequeueOption          deq_option;
     byte[]                   b_array;
     Connection               db_conn;

     db_conn = ((AQOracleSession)aq_sess).getDBConnection();

     queue = aq_sess.getQueue ("aq", "msg_queue");

     /* Create a AQDequeueOption object with default options: */
     deq_option = new AQDequeueOption();

     deq_option.setCorrelation("RUSH");

     /* Dequeue a message */
     message = queue.dequeue(deq_option);

     System.out.println("Successful dequeue"); 

     /* Retrieve raw data from the message: */
     raw_payload = message.getRawPayload();
 
     b_array = raw_payload.getBytes();

     db_conn.commit();
}

Example 10-14 Visual Basic (OO4O): Dequeue a message

Dequeuing messages of RAW type

'Dequeue the first message available
Q.Dequeue() 
Set Msg = Q.QMsg 

'Display the message content 
MsgBox Msg.Value 

'Dequeue the first message available without removing it 
' from the queue 
Q.DequeueMode = ORAAQ_DEQ_BROWSE 

'Dequeue the first message with the correlation identifier 
' equal to "RELATIVE_MSG_ID" 
Q.Navigation = ORAAQ_DQ_FIRST_MSG 
Q.correlate = "RELATIVE_MESSAGE_ID" 
Q.Dequeue 

'Dequeue the next message with the correlation identifier 

' of "RELATIVE_MSG_ID" 
Q.Navigation = ORAAQ_DQ_NEXT_MSG 
Q.Dequeue() 

'Dequeue the first high priority message 
Msg.Priority = ORAQMSG_HIGH_PRIORITY 
Q.Dequeue() 

'Dequeue the message enqueued with message ID of Msgid_1 
Q.DequeueMsgid = Msgid_1 
Q.Dequeue() 

'Dequeue the message meant for "ANDY" 
Q.consumer = "ANDY" 
Q.Dequeue() 

'Return immediately if there is no message on the queue
Q.wait = ORAAQ_DQ_NOWAIT 
Q.Dequeue()

Dequeuing messages of Oracle object type

Set OraObj = DB.CreateOraObject("MESSAGE_TYPE") 
Set QMsg = Q.AQMsg(1, "MESSAGE_TYPE") 

'Dequeue the first message available without removing it 
Q.Dequeue() 
OraObj = QMsg.Value 

'Display the subject and data 
MsgBox OraObj!subject & OraObj!Data

Dequeuing a Message from a Multiconsumer Queue and Specifying Options


Purpose

Specifies the options available for the dequeue operation.


Usage Notes

See "Dequeuing a Message from a Single-Consumer Queue and Specifying Options ".


Examples

Examples are provided in the following programmatic environments:

Example 10-15 Java (JDBC): Dequeue a message from a multiconsumer queue (specify options)

/* Dequeue a message for subscriber1 in browse mode*/
public static void example(AQSession aq_sess) throws AQException, SQLException
{
     AQQueue                  queue;
     AQMessage                message;
     AQRawPayload             raw_payload;
     AQDequeueOption          deq_option;
     byte[]                   b_array;
     Connection               db_conn;

     db_conn = ((AQOracleSession)aq_sess).getDBConnection();

     queue = aq_sess.getQueue ("aq", "priority_msg_queue");

     /* Create a AQDequeueOption object with default options: */
     deq_option = new AQDequeueOption();

     /* Set dequeue mode to BROWSE */
     deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE);

     /* Dequeue messages for subscriber1 */
     deq_option.setConsumerName("subscriber1");

     /* Dequeue a message: */
     message = queue.dequeue(deq_option);

     System.out.println("Successful dequeue"); 

     /* Retrieve raw data from the message: */
     raw_payload = message.getRawPayload();
 
     b_array = raw_payload.getBytes();

     db_conn.commit();
}

Dequeuing an Array of Messages


Purpose

Use the DEQUEUE_ARRAY function to dequeue an array of payloads and a corresponding array of message properties. The output is an array of payloads, message IDs, and message properties of the dequeued messages. The function returns the number of messages successfully dequeued.


Syntax
DBMS_AQ.DEQUEUE_ARRAY (
   queue_name                IN      VARCHAR2,
   dequeue_options           IN      dequeue_options_t,
   array_size                IN      pls_integer, 
   message_properties_array  OUT     message_properties_array_t,
   payload_array             OUT     VARRAY,
   msgid_array               OUT     msgid_array_t)
RETURN pls_integer;

Usage Notes

A nonzero wait time, as specified in dequeue_options, is recognized only when there are no messages in the queue. If the queue contains messages that are eligible for dequeue, then the DEQUEUE_ARRAY function will dequeue up to array_size messages and return immediately.

The payload structure can be a VARRAY or nested table. The message IDs are returned into an array of RAW(16) entries of type DBMS_AQ.msgid_array_t. The message properties are returned into an array of type DBMS_AQ.message_properties_array_t.

As with array operations in the relational world, it is not possible to provide a single optimum array size that will be correct in all circumstances. Application developers must experiment with different array sizes to determine the optimal value for their particular applications.

When dequeuing messages, you might want to dequeue all the messages for a transaction group with a single call. You might also want to dequeue messages that span multiple transaction groups. You can specify either of these methods by using one of the following navigation methods:

Navigation method NEXT_MESSAGE_ONE_GROUP dequeues messages that match the search criteria from the next available transaction group into an array. navigation method FIRST_MESSAGE_ONE_GROUP resets the position to the beginning of the queue and dequeues all the messages in a single transaction group that are available and match the search criteria.

The number of messages dequeued is determined by an array size limit. If the number of messages in the transaction group exceeds array_size, then multiple calls to DEQUEUE_ARRAY must be made to dequeue all the messages for the transaction group.

Navigation methods NEXT_MESSAGE_MULTI_GROUP and FIRST_MESSAGE_MULTI_GROUP work like their ONE_GROUP counterparts, but they are not limited to a single transaction group. Each message that is dequeued into the array has an associated set of message properties. Message property transaction_group determines which messages belong to the same transaction group.


Examples

Examples are provided in the following programmatic environments:

Example 10-16 PL/SQL: Array Dequeuing from a Queue of Type Message

CREATE OR REPLACE TYPE message as OBJECT (data VARCHAR2(10));
/
 
CREATE OR REPLACE TYPE message_arr AS VARRAY(2000) OF message;/
 
....
 
DECLARE
 deqopt dbms_aq.dequeue_options_t ;
 msgproparr dbms_aq.message_properties_array_t := 
               dbms_aq.message_properties_array_t();
 payloadarr  message_arr := message_arr() ;
 msgidarr dbms_aq.msgid_array_t ;
 retval pls_integer ;
BEGIN
  payloadarr.extend(2);
  msgproparr.extend(2);
  deqopt.consumer_name := 'SUB1';
  retval := dbms_aq.dequeue_array( queue_name => 'AQUSER.MY_QUEUE',
                 dequeue_options => deqopt ,
                 array_size => payloadarr.count,
                 message_properties_array => msgproparr,
                 payload_array => payloadarr,
                 msgid_array => msgidarr ) ;
END;/
 

Example 10-17 C(OCI): Array Dequeuing from a Queue of Type Message

struct message{
 
  OCIString   *data;
};
typedef struct message message;
 
struct null_message{
 
  OCIInd null_adt;
  OCIInd null_data;
};
typedef struct null_message null_message;
 
int main(argc, argv)
 int argc;
 char **argv;{
 
  OCIEnv                *envhp;
  OCIServer             *srvhp;
  OCIError              *errhp;
  OCISvcCtx             *svchp;
  OCISession            *usrhp;
  dvoid                 *tmp;
  message               *mesgp[NMESGS];
  int                    i, j, k;
  null_message          *nmesgp[NMESGS];
  ub4                    priority = 0;
  OCIAQDeqOptions       *deqopt = (OCIAQDeqOptions *)0;
  ub4                    iters = 2;
  OCIType               *mesg_tdo = (OCIType *) 0;
  ub4                    batch_size = 2;
  ub4                    deq_size = batch_size;

 printf("session start\n");
  /* establish a session */  
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
        (dvoid * (*)()) 0,  (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
           52, (dvoid **) &tmp);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
           52, (dvoid **) &tmp);
 
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
           52, (dvoid **) &tmp);
  
  printf("server attach\n");
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
           52, (dvoid **) &tmp);
  
  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
           (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  
  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
           (size_t) 0, (dvoid **) 0);
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), 
             OCI_ATTR_USERNAME, errhp);
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"AQUSER", (ub4)strlen("AQUSER"),
           OCI_ATTR_PASSWORD, errhp);
  
  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
                                   OCI_DEFAULT));
  
  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
           (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);
 
  /* get descriptor for dequeue options */
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, 
                                         OCI_DTYPE_AQDEQ_OPTIONS, 0, 
                                         (dvoid **)0));
  
  printf("deq options set\n");
  /* set dequeue options - for consumer name, wait and navigation */
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 
                                 (dvoid *)"SUB1",
                                 (ub4)strlen("SUB1"), 
                                 OCI_ATTR_CONSUMER_NAME, errhp));
 
  for (k=0 ; k < NMESGS ; k++)
  {
    mesgp[k] = 0;
    nmesgp[k] = 0;
  }
  
  printf("check message tdo\n");
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp, 
        (CONST text *)"AQUSER", strlen("AQUSER"),
        (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, 
        OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo));

 k=0;
 
  while (k < iters) 
  {
    deq_size = batch_size;
    checkerr(errhp, OCIAQDeqArray(svchp, errhp, 
                                  (text *)"AQUSER.MY_QUEUE",
                                  (OCIAQDeqOptions *)deqopt, 
                                  &deq_size, 0, mesg_tdo, 
                                  (dvoid **)mesgp, 
                                  (dvoid **)nmesgp, 0, 0, 0, 0));
    k+=batch_size;
  }
  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); 
  
  checkerr(errhp, OCIServerDetach( srvhp, errhp, (ub4) OCI_DEFAULT));
  return 0;}
 

Registering for Notification


Purpose

Registers a callback for message notification.


Syntax
DBMS_AQ.REGISTER (
   reg_list IN SYS.AQ$_REG_INFO_LIST,
   count    IN NUMBER);

Usage Notes

This call is invoked for registration to a subscription which identifies the subscription name of interest and the associated callback to be invoked. Interest in several subscriptions can be registered at one time.

This interface is only valid for the asynchronous mode of message delivery. In this mode, a subscriber applies a registration call which specifies a callback. When messages are received that match the subscription criteria, the callback is invoked. The callback can then apply an explicit message_receive (dequeue) to retrieve the message.

The user must specify a subscription handle at registration time with the namespace attribute set to OCI_SUBSCR_NAMESPACE_AQ.

The subscription name is the string schema.queue if the registration is for a single-consumer queue and schema.queue:consumer_name if the registration is for a multiconsumer queues.

Related Functions: OCIAQListen(), OCISubscriptionDisable(), OCISubscriptionEnable(), OCISubscriptionUnRegister()


Examples

Example 10-18 C (OCI): Register for Notifications For Single-Consumer and Multiconsumer Queries

/* OCIRegister can be used by the client to register to receive notifications 
   when messages are enqueued into nonpersistent and usual queues. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>


static OCIEnv     *envhp;
static OCIServer  *srvhp;
static OCIError   *errhp;
static OCISvcCtx  *svchp;


/* The callback that gets invoked on notification */
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp;      /* subscription handle */
dvoid           *pay;           /* payload  */
ub4              payl;          /* payload length */
dvoid           *desc;          /* the AQ notification descriptor */
ub4              mode;
{
 text                *subname;
 ub4                  size;
 ub4                 *number = (ub4 *)ctx;
 text                *queue;
 text                *consumer;
 OCIRaw              *msgid;
 OCIAQMsgProperties  *msgprop;

  (*number)++;

  /* Get the subscription name */
  OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
                             (dvoid *)&subname, &size,
                             OCI_ATTR_SUBSCR_NAME, errhp);
 printf("got notification number %d for %.*s  %d  \n", 
         *number, size, subname, payl);

 /* Get the queue name from the AQ notify descriptor */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, 
             OCI_ATTR_QUEUE_NAME, errhp);
 
 /* Get the consumer name for which this notification was received */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, 
       OCI_ATTR_CONSUMER_NAME, errhp);

 /* Get the message ID of the message for which we were notified */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, 
       OCI_ATTR_NFY_MSGID, errhp);

 /* Get the message properties of the message for which we were notified */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, 
       OCI_ATTR_MSG_PROP, errhp);

}


int main(argc, argv)
int argc;
char *argv[];
{
  OCISession *authp = (OCISession *) 0;

  /* The subscription handles */
  OCISubscription *subscrhp[5];

  /* Registrations are for AQ namespace */
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;

  /* The context fot the callback */
  ub4 ctx[5] = {0,0,0,0,0};

  printf("Initializing OCI Process\n");

  /* The OCI Process Environment must be initialized  with OCI_EVENTS */
  /* OCI_OBJECT flag is set to enable us dequeue */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );

  printf("Initialization successful\n");

  /* The standard OCI setup */
  printf("Initializing OCI Env\n");
  (void) OCIEnvInit((OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, 
                (dvoid **) 0 );

  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_ERROR, 
                   (size_t) 0, (dvoid **) 0);

  /* Server contexts */
  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_SERVER,
                   (size_t) 0, (dvoid **) 0);

  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_SVCCTX,
                   (size_t) 0, (dvoid **) 0);


  printf("connecting to server\n");
  (void) OCIServerAttach( srvhp, errhp, (text *)"", strlen(""), 0);
  printf("connect successful\n");

  /* Set attribute server context in the service context */
  (void) OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, 
          (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp);

  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
             (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0);
 
  (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                 (dvoid *) "scott", (ub4) strlen("scott"),
                 (ub4) OCI_ATTR_USERNAME, errhp);
 
  (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                 (dvoid *) "tiger", (ub4) strlen("tiger"),
                 (ub4) OCI_ATTR_PASSWORD, errhp);

  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS, 
           (ub4) OCI_DEFAULT));

  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                   (dvoid *) authp, (ub4) 0,
                   (ub4) OCI_ATTR_SESSION, errhp);

 /* Setting the subscription handle for notification on
    a NORMAL single-consumer queue */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);
 
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                (dvoid *) "SCOTT.SCQ1", (ub4) strlen("SCOTT.SCQ1"),
                (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
 
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 printf("setting subscription context \n");
 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[0], (ub4)sizeof(ctx[0]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

 /* Setting the subscription handle for notification on a NORMAL multiconsumer 
    consumer queue */
  subscrhp[1] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "SCOTT.MCQ1:APP1", 
                 (ub4) strlen("SCOTT.MCQ1:APP1"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[1], (ub4)sizeof(ctx[1]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);


 /* Setting the subscription handle for notification on a nonpersistent
   single-consumer queue */
  subscrhp[2] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "SCOTT.NP_SCQ1", 
                 (ub4) strlen("SCOTT.NP_SCQ1"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[2], (ub4)sizeof(ctx[2]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);


 /* Setting the subscription handle for notification on
    a nonpersistent multi consumer queue */
 /* Waiting on user specified recipient */
  subscrhp[3] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[3], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "SCOTT.NP_MCQ1", 
                 (ub4) strlen("SCOTT.NP_MCQ1"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[3], (ub4)sizeof(ctx[3]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);


  printf("Registering for all the subscriptiosn \n");
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 4, errhp, 
                 OCI_DEFAULT));

  printf("Waiting for notifcations \n");

  /* wait for minutes for notifications */
  sleep(300);

  printf("Exiting\n");
}

Posting for Subscriber Notification


Purpose

Posts to a list of anonymous subscriptions so clients registered for the subscription get notifications.


Syntax
DBMS_AQ.POST (
  post_list  IN  SYS.AQ$_POST_INFO_LIST,
  count      IN NUMBER);

Usage Notes

Several subscriptions can be posted to at one time. Posting to a subscription involves identifying the subscription name and the payload, if wanted. It is possible for no payload to be associated with this call. This call provides a best-effort guarantee. A notification goes to registered clients at most once.

This call is primarily used for lightweight notification and is useful in the case of several system events. If an application needs more rigid guarantees, then it can use Oracle Streams AQ functionality by enqueuing to a queue.

When using OCI, you must specify a subscription handle at registration time with the namespace attribute set to OCI_SUBSCR_NAMESPACE_ANONYMOUS.

When using PL/SQL, the namespace attribute in aq$_post_info must be set to DBMS_AQ.NAMESPACE_ANONYMOUS.

Related functions: OCIAQListen(), OCISvcCtxToLda(), OCISubscriptionEnable(), OCISubscriptionRegister(), OCISubscriptionUnRegister(), dbms_aq.register, dbms_aq.unregister.


Examples

Example 10-19 PL/SQL: Post of Object-Type Messages

-- Register for notification
DECLARE
  reginfo             sys.aq$_reg_info;
  reginfolist         sys.aq$_reg_info_list;

BEGIN
  -- Register for anonymous subscription PUBSUB1.ANONSTR, consumer_name ADMIN
  -- The PL/SQL callback pubsub1.mycallbk is invoked 
  -- when a notification is received
  reginfo := sys.aq$_reg_info('PUBSUB1.ANONSTR:ADMIN',
                              DBMS_AQ.NAMESPACE_ANONYMOUS,
      'plsql://PUBSUB1.mycallbk', HEXTORAW('FF'));

  reginfolist := sys.aq$_reg_info_list(reginfo);

  sys.dbms_aq.register(reginfolist, 1);

  commit;
END;
/

-- Post to an anonymous subscription
DECLARE

  postinfo            sys.aq$_post_info;
  postinfolist        sys.aq$_post_info_list;

BEGIN

  -- Post to the anonymous subscription PUBSUB1.ANONSTR, consumer_name ADMIN
  postinfo := sys.aq$_post_info('PUBSUB1.ANONSTR:ADMIN',0,HEXTORAW('FF'));
  postinfolist := sys.aq$_post_info_list(postinfo);

  sys.dbms_aq.post(postinfolist, 1);

  commit;

END;
/

Adding an Agent to the LDAP Server


Purpose

Adds an agent to the Lightweight Directory Access Protocol (LDAP) server.


Syntax
DBMS_AQ.BIND_AGENT(
   agent        IN SYS.AQ$_AGENT,
   certificate  IN VARCHAR2 default NULL);

Usage Notes

This call takes an agent and an optional certificate location as the arguments, and adds the agent entry to the LDAP server. The certificate location parameter is the distinguished name of the LDAP entry that contains the digital certificate which the agent uses. If the agent does not have a digital certificate, then this parameter is defaulted to null.

Removing an Agent from the LDAP Server


Purpose

Removes an agent from the LDAP server.


Syntax
DBMS_AQ.UNBIND_AGENT(
   agent    IN SYS.AQ$_AGENT);

Usage Notes

This call takes an agent as the argument, and removes the corresponding agent entry in the LDAP server.