Skip Headers

Oracle® Call Interface Programmer's Guide
10g Release 1 (10.1)

Part Number B10779-01
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Feedback

Go to previous page
Previous
Go to current chapter
Up
Go to next page
Next
View PDF

Streams Advanced Queuing and Publish-Subscribe Functions

This section describes the Streams Advanced Queuing and publish-subscribe functions.

Table 16-7 Advanced Queuing and Publish-Subscribe Functions  
Function Purpose

OCIAQDeq()

Advanced Queuing dequeue

OCIAQDeqArray()

Dequeues an array of messages

OCIAQEnq()

Advanced Queuing enqueue

OCIAQEnqArray()

Enqueues an array of messages

OCIAQListen()

Listens on one or more queues on behalf of a list of agents

OCISubscriptionEnable()

Enables notifications on a subscription

OCISubscriptionPost()

Posts to a subscription to receive notifications

OCISubscriptionRegister()

Registers a subscription

OCISubscriptionUnRegister()

Unregisters a subscription

OCIAQDeq()

Purpose

This call is used for an Streams Advanced Queuing dequeue operation using the OCI.

Syntax

sword OCIAQDeq ( OCISvcCtx           *svch,
                 OCIError            *errh,
                 text                *queue_name,
                 OCIAQDeqOptions     *dequeue_options,
                 OCIAQMsgProperties  *message_properties,
                 OCIType             *payload_tdo,
                 dvoid               **payload,
                 dvoid               **payload_ind,
                 OCIRaw              **msgid,
                 ub4                 flags );

Parameters

svch (IN)

OCI service context.

errh (IN)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.

queue_name (IN)

The target queue for the dequeue operation.

dequeue_options (IN)

The options for the dequeue operation; stored in an OCIAQDeqOptions descriptor.

message_properties (OUT)

The message properties for the message; stored in an OCIAQMsgProperties descriptor.

payload_tdo (IN)

The TDO (type descriptor object) of an object type. For a raw queue, this parameter should point to the TDO of SYS.RAW.

payload (IN/OUT)

A pointer to a pointer to a program variable buffer that is an instance of an object type. For a raw queue, this parameter should point to an instance of OCIRaw.

Memory for the payload is dynamically allocated in the object cache. The application can optionally call OCIObjectFree() to deallocate the payload instance when it is no longer needed. If the pointer to the program variable buffer (*payload) is passed as NULL, the buffer is implicitly allocated in the cache.

The application may choose to pass NULL for payload the first time OCIAQDeq() is called, and let the OCI allocate the memory for the payload. It can then use a pointer to that previously allocated memory in subsequent calls to OCIAQDeq().

To obtain a TDO for the payload, use OCITypeByName(), or OCITypeByRef().

The OCI provides functions which allow the user to set attributes of the payload, such as its text. For information about setting these attributes, refer to "Manipulating Object Attributes".

payload_ind (IN/OUT)

A pointer to a pointer to the program variable buffer containing the parallel indicator structure for the object type.

The memory allocation rules for payload_ind are the same as those for payload,.

msgid (OUT)

The message ID.

flags (IN)

Not currently used; pass as OCI_DEFAULT.

Comments

Users must have the AQ_USER_ROLE or privileges to execute the DBMS_AQ package in order to use this call. The OCI environment must be initialized in object mode (using OCIInitialize()) to use this call.

See Also:

Examples

For code examples, refer to the description of OCIAQEnq().

Related Functions

OCIAQEnq(), OCIAQListen(), OCIInitialize()

OCIAQDeqArray()

Purpose

This call dequeues an array of messages from a queue. The array of messages is all dequeued with the same option and has the same queue table payload column TDO.

Syntax

sword OCIAQDeqArray ( OCISvcCtx           *svchp,
                      OCIError            *errhp,
                      OraText             *queue_name,
                      OCIAQDeqOptions     *deqopt,
                      ub4                 *iters,
                      OCIAQMsgProperties  **msgprop,
                      OCIType             *payload_tdo,
                      dvoid               **payload,
                      dvoid               **payload_ind,
                      OCIRaw              **msgid,
                      dvoid               *ctxp,
                      OCICallbackAQDeq    (cbfp) 
                                          (
                                          dvoid              *ctxp,
                                          dvoid              **payload,
                                          dvoid              **payload_ind
                                          ),
                      ub4                 flags );

Parameters

svchp (IN)

OCI service context (unchanged from OCIAQDeq()).

errhp (IN)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error (unchanged from OCIAQDeq()).

queue_name (IN)

The name of the queue from which messages are dequeued (unchanged from OCIAQDeq()).

deqopt (IN)

A pointer to an OCIAQDeqOptions descriptor (unchanged from OCIAQDeq()).

iters (IN/OUT)

On input, the number of messages to dequeue. On output, the number of messages successfully dequeued.

msgprop (IN)

An array of pointers to OCIAQMsgProperties descriptors.

payload_tdo (OUT)

A pointer to the TDO of the queue table's payload column.

payload (OUT)

An array of pointers to dequeued messages.

payload_ind (OUT)

An array of pointers to indicators.

msgid (OUT)

An array of pointers to the message ID of the dequeued messages.

ctxp (IN)

The context that will be passed to the callback function.

cbfp (IN)

The callback that may be registered to provide a buffer pointer into which the dequeued message will be placed. If NULL, then messages will be dequeued into buffers pointed to by payload.

flags (IN)

Not currently used; pass as OCI_DEFAULT.

Comments

Users must have the AQ_USER_ROLE or privileges to execute the DBMS_AQ package in order to use this call. The OCI environment must be initialized in object mode (using OCIInitialize()) to use this call.

A non-zero wait time, as specified in the OCIAQDeqOptions, is recognized only when there are no messages in the queue. If the queue contains messages that are eligible for dequeue, then the OCIAQDeqArray() function will dequeue up to iters messages and return immediately.

This function is not supported in non-blocking mode.

See Also:

Related Functions

OCIAQDeq(), OCIAQEnqArray(), OCIAQListen(), OCIInitialize()

OCIAQEnq()

Purpose

This call is used for an Streams Advanced Queuing enqueue.

Syntax

sword OCIAQEnq ( OCISvcCtx           *svch,
                 OCIError            *errh,
                 text                *queue_name,
                 OCIAQEnqOptions     *enqueue_options,
                 OCIAQMsgProperties  *message_properties,
                 OCIType             *payload_tdo,
                 dvoid               **payload,
                 dvoid               **payload_ind,
                 OCIRaw              **msgid,
                 ub4                 flags );

Parameters

svch (IN)

OCI service context.

errh (IN)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.

queue_name (IN)

The target queue for the enqueue operation.

enqueue_options (IN)

The options for the enqueue operation; stored in an OCIAQEnqOptions descriptor.

message_properties (IN)

The message properties for the message; stored in an OCIAQMsgProperties descriptor.

payload_tdo (IN)

The TDO (type descriptor object) of an object type. For a raw queue, this parameter should point to the TDO of SYS.RAW.

payload (IN)

A pointer to a pointer to an instance of an object type. For a raw queue, this parameter should point to an instance of OCIRaw.

The OCI provides functions which allow the user to set attributes of the payload, such as its text.

See Also:

For information about setting these attributes, refer to "Manipulating Object Attributes"

payload_ind (IN)

A pointer to a pointer to the program variable buffer containing the parallel indicator structure for the object type.

msgid (OUT)

The message ID.

flags (IN)

Not currently used; pass as OCI_DEFAULT.

Comments

Users must have the AQ_USER_ROLE or privileges to execute the DBMS_AQ package in order to use this call.

The OCI environment must be initialized in object mode (using OCIInitialize()) to use this call.

See Also:

To obtain a TDO for the payload, use OCITypeByName(), or OCITypeByRef().

Examples

The following four examples demonstrate the use of OCIAQEnq() and OCIAQDeq() in several different situations.

See Also:

These examples assume that the database is set up as illustrated in the section "Oracle Advanced Queuing By Example" in the Advanced Queuing chapter of the Oracle Streams Advanced Queuing User's Guide and Reference

Example 1 - Enqueue And Dequeue Of A Payload Object.
struct message
{
  OCIString   *subject;
  OCIString   *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_subject;
  OCIInd    null_data;
};
typedef struct null_message null_message;

int main( argc, argv)
int    argc;
char * argv[];
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid     *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message  msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)(dvoid *,size_t)) 0,
                dvoid * (*)(dvoid *, dvoid *, size_t)) 0,  
                (void (*)(dvoid *, dvoid *)) 0 );

  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                  52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                    52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                    52, (dvoid **) &tmp);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                  52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
                  (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  OCILogon(envhp, errhp, &svchp, (const text *)"AQ", (ub4) strlen("AQ"), 
          (const text *) "AQ", (ub4) strlen("AQ"), (const text *)0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4) strlen("AQ"),
              (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"),
              (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE",
                 (ub4) strlen("NORMAL MESSAGE"), &mesg->subject);
  OCIStringAssignText(envhp, errhp,(CONST text *)"OCI ENQUEUE",
                 (ub4) strlen("OCI ENQUEUE"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* enqueue into the msg_queue */
  OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0,
           (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (text *)"msg_queue", (OCIAQDeqOptions *)0,
           (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}
Example 2 - Enqueue and Dequeue Using Correlation Ids.

struct message
{
  OCIString   *subject;
  OCIString   *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_subject;
  OCIInd    null_data;
};
typedef struct null_message null_message;

int main( argc, argv)
int    argc;
char * argv[];
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid     *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message  msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;
  OCIRaw*firstmsg = (OCIRaw *)0;
  OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0;
  OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0;
  text correlation1[30], correlation2[30];

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)(dvoid *,size_t)) 0,
  (dvoid * (*)(dvoid *, dvoid *, size_t)) 0,  (void (*)(dvoid *, dvoid *)) 0 );

  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                 52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                 52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                 52, (dvoid **) &tmp);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                 52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
                 (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"), 
           (const text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0);

  /* allocate message properties descriptor */
  OCIDescriptorAlloc(envhp, (dvoid **)&msgprop,
                     OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
  strcpy((char *) correlation1, "1st message");
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)&correlation1,
            (ub4) strlen((const char*) correlation1), OCI_ATTR_CORRELATION,
            errhp);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4)strlen("AQ"),
                (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL ENQUEUE1",
                  (ub4) strlen("NORMAL ENQUEUE1"), &mesg->subject);
  OCIStringAssignText(envhp, errhp,(CONST text *)"OCI ENQUEUE",
                  (ub4) strlen("OCI ENQUEUE"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* enqueue into the msg_queue, store the message id into firstmsg */
  OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0, msgprop,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, &firstmsg, 0);

  /* enqueue into the msg_queue with a different correlation id */
  strcpy((char *)correlation2, "2nd message");
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid*)&correlation2,
             (ub4) strlen((const char *)correlation2), OCI_ATTR_CORRELATION,
             errhp);
  OCIStringAssignText(envhp, errhp, (text *)"NORMAL ENQUEUE2",
                  (ub4) strlen("NORMAL ENQUEUE2"), &mesg->subject);
  OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0, msgprop,
                  mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);

  OCITransCommit(svchp, errhp, (ub4) 0);

  /* first dequeue by correlation id "2nd message" */
  /* allocate dequeue options descriptor and set the correlation option */
  OCIDescriptorAlloc(envhp, (dvoid **)&deqopt,
                     OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0);
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)correlation2,
            (ub4) strlen((const char *)correlation2),  OCI_ATTR_CORRELATION,
            errhp);

  /* dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (text *)"msg_queue", deqopt, (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* second dequeue by message id */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&firstmsg,
  OCIRawSize(envhp, firstmsg), OCI_ATTR_DEQ_MSGID, errhp);
  /* clear correlation id option */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,
             (dvoid *)correlation2, 0, OCI_ATTR_CORRELATION, errhp);

  /* dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (text *)"msg_queue", deqopt, (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}
Example 3 - Enqueue and Dequeue Of A Raw Queue.
int main( argc, argv)
int    argc;
char * argv[];
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid     *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  char  msg_text[100];
  OCIRaw  *mesg = (OCIRaw *)0;
  OCIRaw*deqmesg = (OCIRaw *)0;
  OCIInd   ind = 0;
  dvoid  *indptr = (dvoid *)&ind;
  int i;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)(dvoid *, size_t)) 0,
               (dvoid * (*)(dvoid *, dvoid *, size_t)) 0,  (void (*)(dvoid *,
               dvoid *)) 0 );
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                  52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                  52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                  52, (dvoid **) &tmp);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                  52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
                  (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"), (const
           text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0);

  /* obtain the TDO of the RAW datatype */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"SYS", (ub4) strlen("SYS"),
                (CONST text *)"RAW", (ub4) strlen("RAW"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  strcpy(msg_text, "Enqueue to a RAW queue");
  OCIRawAssignBytes(envhp, errhp, (const ub1 *)msg_text, (ub4) strlen(msg_text),
                    &mesg);

  /* enqueue the message into raw_msg_queue */
  OCIAQEnq(svchp, errhp, (text *)"raw_msg_queue", (OCIAQEnqOptions *)0,
           (OCIAQMsgProperties *) 0,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&indptr, (OCIRaw **)0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue the same message into C variable deqmesg */
  OCIAQDeq(svchp, errhp, (text *)"raw_msg_queue", (OCIAQDeqOptions *)0,
           (OCIAQMsgProperties *)0,
            mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&indptr, (OCIRaw **)0, 0);
  for (i = 0; i < OCIRawSize(envhp, deqmesg); i++)
    printf("%c", *(OCIRawPtr(envhp, deqmesg) + i));
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}
Example 4 - Enqueue and Dequeue Using OCIAQAgent.

struct message
{
  OCIString   *subject;
  OCIString   *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_subject;
  OCIInd    null_data;
};
typedef struct null_message null_message;

int main( argc, argv)
int    argc;
char * argv[];
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid     *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message  msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;
  OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0;
  OCIAQAgent *agents[2];
  OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0;
  ub4 wait = OCI_DEQ_NO_WAIT;
  ub4 navigation = OCI_DEQ_FIRST_MSG;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *,size_t)) 0,
                (dvoid * (*)(dvoid *, dvoid *, size_t)) 0,  
                (void (*)(dvoid *, dvoid *)) 0 );

  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                   52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                   52, (dvoid **) &tmp);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                   52, (dvoid **) &tmp);

  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
              (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"), (const
           text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4) strlen("AQ"),
       (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"),
       (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp,
                    (CONST text *)"MESSAGE 1", (ub4) strlen("MESSAGE 1"),
                    &mesg->subject);
  OCIStringAssignText(envhp, errhp,
                    (CONST text *)"mesg for queue subscribers",
                    (ub4) strlen("mesg for queue subscribers"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* enqueue MESSAGE 1 for subscribers to the queue for RED and GREEN */
  OCIAQEnq(svchp, errhp, (text *)"msg_queue_multiple", (OCIAQEnqOptions *)0,
           (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);

  /* enqueue MESSAGE 2 for specified recipients for RED and BLUE */
  /* prepare message payload */
  OCIStringAssignText(envhp, errhp,
                    (CONST text *)"MESSAGE 2", (ub4) strlen("MESSAGE 2"),
                    &mesg->subject);
  OCIStringAssignText(envhp, errhp,
                    (CONST text *)"mesg for two recipients",
                    (ub4) strlen("mesg for two recipients"), &mesg->data);

  /* allocate AQ message properties and agent descriptors */
  OCIDescriptorAlloc(envhp, (dvoid **)&msgprop,
                OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[0],
                  OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[1],
                  OCI_DTYPE_AQAGENT, 0, (dvoid **)0);

  /* prepare the recipient list, RED and BLUE */
  OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, (dvoid *) "RED", (ub4) strlen("RED"),
             OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, (dvoid *)"BLUE", 
             (ub4) strlen("BLUE"), OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2,
             OCI_ATTR_RECIPIENT_LIST, errhp);

  OCIAQEnq(svchp, errhp, (text *)"msg_queue_multiple", (OCIAQEnqOptions *)0,
         msgprop, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* now dequeue the messages using different consumer names */
  /* allocate dequeue options descriptor to set the dequeue options
*/
  OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0,
                     (dvoid **)0);

  /* set wait parameter to NO_WAIT so that the dequeue returns
immediately */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0,
             OCI_ATTR_WAIT, errhp);

  /* set navigation to FIRST_MESSAGE so that the dequeue resets the
position */
  /* after a new consumer_name is set in the dequeue options     */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0,
              OCI_ATTR_NAVIGATION, errhp);

  /* dequeue from the msg_queue_multiple as consumer BLUE */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE",
             (ub4)strlen("BLUE"), OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                  (OCIAQMsgProperties *) 0,
                   mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 
                  (OCIRaw **) 0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue_multiple as consumer RED */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED",
             (ub4)strlen("RED"), OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                 (OCIAQMsgProperties *)0,
                 mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 
                 (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue_multiple as consumer GREEN */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)"GREEN", (ub4)
             strlen("GREEN"), OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                  (OCIAQMsgProperties *)0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 
                 (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}

Related Functions

OCIAQDeq(), OCIAQListen(), OCIInitialize()

OCIAQEnqArray()

Purpose

This call enqueues an array of messages to a queue. The array of messages is enqueued with the same options and has the same payload column TDO.

Syntax

sword OCIAQEnqArray ( OCISvcCtx              *svchp,
                      OCIError               *errhp,
                      OraText                *queue_name,
                      OCIAQEnqOptions        *enqopt,
                      ub4                    *iters,
                      OCIAQMsgProperties     **msgprop,
                      OCIType                *payload_tdo,
                      dvoid                  **payload,
                      dvoid                  **payload_ind,
                      OCIRaw                 **msgid,
                      dvoid                  *ctxp,
                      OCICallbackAQEnq       (cbfp)
                                             (
                                             dvoid              *ctxp,
                                             dvoid              **payload,
                                             dvoid              **payload_ind
                                             ),
                      ub4                    flags );

Parameters

svchp (IN)

The service context (unchanged from OCIAQEnq()).

errhp (IN/OUT)

The error handle (unchanged from OCIAQEnq()).

queue_name (IN)

The name of the queue in which messages are enqueued (unchanged from OCIAQEnq()).

enqopt (IN)

A pointer to an OCIAQEnqOptions descriptor (unchanged from OCIAQEnq()).

iters (IN/OUT)

On input, the number of messages to enqueue. On output, the number of messages successfully enqueued.

msgprop (IN)

An array of pointers to OCIAQMsgProperties descriptors.

payload_tdo (IN)

A pointer to the TDO of the queue table's payload column.

payload (IN)

An array of pointers to messages to be enqueued.

payload_ind (IN)

An array of pointers to indicators, or a NULL pointer if indicator variables are not used.

msgid (OUT)

An array of pointers to the message ID of the enqueued messages or a NULL pointer if no message IDs are returned.

ctxp (IN)

The context that will be passed to the registered callback function.

cbfp (IN)

A callback that may be registered to provide messages dynamically. If NULL, then all messages must be materialized prior to calling OCIAQEnqArray().

flags (IN)

Not currently used; pass as OCI_DEFAULT.

Comments

This function is not supported in non-blocking mode.

Related Functions

OCIAQEnq(), OCIAQDeqArray(), OCIAQListen(), OCIInitialize()

OCIAQListen()

Purpose

Listens on one or more queues on behalf of a list of agents.

Syntax

sword OCIAQListen (OCISvcCtx      *svchp, 
                   OCIError       *errhp,
                   OCIAQAgent     **agent_list, 
                   ub4            num_agents,
                   sb4            wait, 
                   OCIAQAgent     **agent,
                   ub4            flags);

Parameters

svchpp (IN/OUT)

The service context handle.

errhp (IN/OUT)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.

agent_list (IN)

List of agents for which to monitor messages.

num_agents (IN)

Number of agents in the agent list.

wait (IN)

Time-out for the listen call.

agent (OUT)

Agent for which there is a message. OCIAgent is an OCI descriptor.

flags (IN)

Not currently used; pass as OCI_DEFAULT.

Comments

This is a blocking call that returns when there is a message ready for consumption for an agent in the list. If there are no messages found when the wait time expires, an error is returned.

Related Functions

OCIAQEnq(), OCIAQDeq(), OCISvcCtxToLda(), OCISubscriptionEnable(), OCISubscriptionPost(), OCISubscriptionRegister(),OCISubscriptionUnRegister()

OCISubscriptionDisable()

Purpose

Disables a subscription registration which turns off all notifications.

Syntax

ub4 OCISubscriptionDisable ( OCISubscription   *subscrhp,
                             OCIError          *errhp
                             ub4               mode );

Parameters

subscrhp (IN)

A subscription handle with the OCI_ATTR_SUBSCR_NAME and OCI_ATTR_SUBSCR_NAMESPACE attributes set.

See Also:

For information, see Subscription Handle Attributes

errhp (OUT)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.

mode (IN)

Call-specific mode. Valid values:

Comments

This call is used to temporarily turn off notifications. This is useful when the application is running a critical section of the code and should not be interrupted.

The user need not be connected or authenticated to perform this operation. A registration must have been performed to the subscription specified by the subscription handle before this call is made.

All notifications subsequent to an OCISubscriptionDisable() are discarded by the system until an OCISubscriptionEnable() is performed.

Related Functions

OCIAQListen(), OCISubscriptionEnable(), OCISubscriptionPost(), OCISubscriptionRegister(), OCISubscriptionUnRegister()

OCISubscriptionEnable()

Purpose

Enables a subscription registration that has been disabled. This turns on all notifications.

Syntax

ub4 OCISubscriptionEnable ( OCISubscription   *subscrhp,
                            OCIError          *errhp
                            ub4               mode );

Parameters

subscrhp (IN)

A subscription handle with the OCI_ATTR_SUBSCR_NAME and OCI_ATTR_SUBSCR_NAMESPACE attributes set.

See Also:

For information, see Subscription Handle Attributes

errhp (OUT)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.

mode (IN)

Call-specific mode. Valid value:

Comments

This call is used to turn on notifications after a subscription registration has been disabled.

The user need not be connected or authenticated to perform this operation. A registration must have been done for the specified subscription before this call is made.

Related Functions

OCIAQListen(), OCISvcCtxToLda(), OCISubscriptionPost(), OCISubscriptionRegister(), OCISubscriptionUnRegister()

OCISubscriptionPost()

Purpose

Posts to a subscription which allows all clients who are registered for the subscription to get notifications.

Syntax

ub4 OCISubscriptionPost ( OCISvcCtx         *svchp,
                          OCISubscription   **subscrhpp,
                          ub2               count,
                          OCIError          *errhp
                          ub4               mode );

Parameters

svchp (IN)

An OCI service context (after release 7). This service context should have a valid authenticated user handle.

subscrhpp (IN)

An array of subscription handles. Each element of this array should be a subscription handle with the OCI_ATTR_SUBSCR_NAME and OCI_ATTR_SUBSCR_NAMESPACE attributes set.

See Also:

For information, see Subscription Handle Attributes

The OCI_ATTR_SUBSCR_PAYLOAD attribute has to be set for each subscription handle prior to this call. If it is not set, the payload is assumed to be NULL and no payload is delivered when the notification is received by the clients that have registered interest. Note that the caller will have to preserve the payload until the post is done as the OCIAttrSet() call keeps track of the reference to the payload but does not copy the contents.

count (IN)

The number of elements in the subscription handle array.

errhp (OUT)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.

mode (IN)

Call-specific mode. Valid value:

Comments

Posting to a subscription involves identifying the subscription name and the payload if desired. If no payload is associated, the payload length can be set to 0.

This call provides a best-effort guarantee. A notification does to registered clients at most once.

This call is primarily used for light-weight notification and is useful in the case of several system events. If the application needs more rigid guarantees, it can use the Advanced Queuing functionality by enqueuing to queue.

Related Functions

OCIAQListen(), OCISvcCtxToLda(), OCISubscriptionEnable(), OCISubscriptionRegister(), OCISubscriptionUnRegister()

OCISubscriptionRegister()

Purpose

Registers a callback for message notification.

Syntax

ub4 OCISubscriptionRegister ( OCISvcCtx         *svchp,
                              OCISubscription   **subscrhpp,
                              ub2               count,
                              OCIError          *errhp
                              ub4               mode );

Parameters

svchp (IN)

An OCI service context (after release 7). This service context should have a valid authenticated user handle.

subscrhpp (IN)

An array of subscription handles. Each element of this array should be a subscription handle with all of the following attributes set:

Otherwise, an error will be returned.

One of attributes

must also be set.

See Also:

For information about the handle attributes, see Subscription Handle Attributes

When a notification is received for the registration denoted by subscrhpp[i], either the user-defined callback function (OCI_ATTR_SUBSCR_CBACK) set for subscrhpp[i] will be invoked with the context (OCI_ATTR_SUBSCR_CTX) set for subscrhpp[i], or an e-mail will be sent to (OCI_ATTR_SUBSCR_RECPT) set for subscrhpp[i], or the PL/SQL procedure (OCI_ATTR_SUBSCR_RECPT) set for subscrhpp[i], will be invoked in the database, provided the subscriber of subscrhpp[i] has the appropriate permissions on the procedure.

count (IN)

The number of elements in the subscription handle array.

errhp (OUT)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.

mode (IN)

Call-specific mode. Valid values:

Whenever a new client process comes up, or an old one goes down and comes back up, it needs to register for all subscriptions of interest. If the client stays up and the server first goes down and then comes back up, the client will continue to receive notifications for registrations that are DISCONNECTED. However, the client will not receive notifications for CONNECTED registrations as they will be lost once the server goes down and comes back up.

Comments

This call is invoked for registration to a subscription which identifies the subscription name of interest and the associated callback to be invoked. Interest in several subscriptions can be registered at one time.

This interface is only valid for the asynchronous mode of message delivery. In this mode, a subscriber issues a registration call which specifies a callback. When messages are received that match the subscription criteria, the callback is invoked. The callback may then issue an explicit message_receive (dequeue) to retrieve the message.

The user must specify a subscription handle at registration time with the namespace attribute set to OCI_SUBSCR_NAMESPACE_AQ.

The subscription name is the string SCHEMA.QUEUE if the registration is for a single consumer queue and SCHEMA.QUEUE:CONSUMER_NAME if the registration is for a multi-consumer queue. The string should be in uppercase.

Each namespace will have its own privilege model. If the user performing the register is not entitled to register in the namespace for the specified subscription, an error is returned.

Related Functions

OCIAQListen(), OCISvcCtxToLda(), OCISubscriptionEnable(), OCISubscriptionPost(), OCISubscriptionUnRegister()

OCISubscriptionUnRegister()

Purpose

Unregisters a subscription which turns off notifications.

Syntax

ub4 OCISubscriptionUnRegister ( OCISvcCtx         *svchp,
                                OCISubscription   *subscrhp,
                                OCIError          *errhp
                                ub4               mode );

Parameters

svchp (IN)

An OCI service context (after release 7). This service context should have a valid authenticated user handle.

subscrhp (IN)

A subscription handle with the OCI_ATTR_SUBSCR_NAME and OCI_ATTR_SUBSCR_NAMESPACE attributes set.

See Also:

For information, see Subscription Handle Attributes

errhp (OUT)

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.

mode (IN)

Call-specific mode. Valid value:

Comments

Unregistering to a subscription is going to ensure that the user will not receive notifications regarding the specified subscription in future. If the user wishes to resume notification, then the only option is to re-register to the subscription.

All notifications that would otherwise have been delivered are not delivered after a subsequent register is performed because the user is no longer in the list of interested clients.

Related Functions

OCIAQListen(), OCISvcCtxToLda(), OCISubscriptionEnable(), OCISubscriptionPost(), OCISubscriptionRegister().