Skip Headers

Oracle® Streams Advanced Queuing User's Guide and Reference
Release 10.1

Part Number B10785-01
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Feedback

Go to previous page
Previous
Go to next page
Next
View PDF

7 Oracle Streams AQ Sample Application

This chapter discusses the features of Oracle Streams Advanced Queuing (AQ) in a sample application based on a hypothetical company called BooksOnLine.

This chapter contains these topics:

A Sample Application

The operations of a large bookseller, BooksOnLine, are based on an online book ordering system that automates activities across the various departments involved in the sale. The front end of the system is an order entry application used to enter new orders. Incoming orders are processed by an order processing application that validates and records the order. Shipping departments located at regional warehouses are responsible for ensuring that orders are shipped on time.

There are three regional warehouses: one serving the East Region, another serving the West Region, and a third warehouse for shipping International orders. After an order is shipped, the order information is routed to a central billing department that handles payment processing. The customer service department, located at a separate site, is responsible for maintaining order status and handling inquiries.

The features of Oracle Streams AQ are exemplified in the BooksOnLine scenario to demonstrate the possibilities of Oracle Streams AQ technology. The sample code is provided in Appendix A, " Scripts for Implementing BooksOnLine".

General Features of Oracle Streams AQ

This section contains these topics:

System-Level Access Control

Oracle Streams AQ supports system-level access control for all queuing operations, allowing an application designer or DBA to designate users as queue administrators. A queue administrator can invoke Oracle Streams AQ administrative and operational interfaces on any queue in the database. This simplifies the administrative work because all administrative scripts for the queues in a database can be managed under one schema.


PL/SQL (DBMS_AQADM Package): Scenario and Code

In the BooksOnLine application, the DBA creates BOLADM, the BooksOnLine Administrator account, as the queue administrator of the database. This allows BOLADM to create, drop, manage, and monitor queues in the database. If PL/SQL packages are needed in the BOLADM schema for applications to enqueue and dequeue, then the DBA should grant ENQUEUE_ANY and DEQUEUE_ANY system privileges to BOLADM:

Example 7-1 Creating BOLADM, the BooksOnLine Administrator Account

CREATE USER BOLADM IDENTIFIED BY BOLADM; 
GRANT CONNECT, RESOURCE, aq_administrator_role TO BOLADM; 
GRANT EXECUTE ON DBMS_AQ TO BOLADM; 
GRANT EXECUTE ON DBMS_AQADM TO BOLADM; 
EXECUTE DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','BOLADM',FALSE); 
EXECUTE DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('DEQUEUE_ANY','BOLADM',FALSE); 

If using the Java AQ API, then BOLADM must be granted EXECUTE privileges on the DBMS_AQIN package:

GRANT EXECUTE ON DBMS_AQIN to BOLADM; 

In the application, Oracle Streams AQ propagators populate messages from the Order Entry (OE) schema to:

The WS, ES, and TS schemas in turn populate messages to:

Hence the OE, WS, ES, and TS schemas all host queues that serve as the source queues for the propagators.

When messages arrive at the destination queues, sessions based on the source queue schema name are used for enqueuing the newly arrived messages into the destination queues. This means that you must grant schemas of the source queues enqueue privileges to the destination queues.

Example 7-2 Granting ENQUEUE_ANY System Privilege to All Schemas Hosting a Source Queue

To simplify administration, all schemas that host a source queue in the BooksOnLine application are granted the ENQUEUE_ANY system privilege:

EXECUTE DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','OE',FALSE); 
EXECUTE DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','WS',FALSE); 
EXECUTE DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','ES',FALSE); 
EXECUTE DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','TS',FALSE);
 

To propagate to a remote destination queue, the login user specified in the database link in the address field of the agent structure should either be granted the ENQUEUE ANY QUEUE privilege, or be granted the rights to enqueue to the destination queue. If the login user in the database link also owns the queue tables at the destination, then no explicit privilege grant is needed.


Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.


Java (JDBC): Example Code

No example is provided with this release.

Queue-Level Access Control

Oracle Streams AQ supports queue-level access control for enqueue and dequeue operations. This feature allows the application designer to protect queues created in one schema from applications running in other schemas. The application designer must grant only minimal access privileges to the applications that run outside the queue schema. The supported access privileges on a queue are ENQUEUE, DEQUEUE and ALL.


Scenario

The BooksOnLine application processes customer billings in its CB (Customer Billing) and CBADM schemas. The CB schema hosts the customer billing application and the CBADM schema hosts all related billing data stored as queue tables.

To protect the billing data, the billing application and the billing data reside in different schemas. The billing application is allowed only to dequeue messages from CBADM_shippedorders_que, the shipped order queue. It processes the messages, and then enqueues new messages into CBADM_billedorders_que, the billed order queue.

To protect the queues from other unauthorized operations from the application, the following two grant calls are needed:

Example 7-3 PL/SQL (DBMS_AQADM Package): Granting Dequeue Privilege on Shipped Orders Queue to CB Application

/* Grant dequeue privilege on the shipped orders queue to the Customer 
   Billing application. The CB application retrieves orders that are shipped but 
   not billed from the shipped orders queue. */
EXECUTE DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
   'DEQUEUE','CBADM_shippedorders_que', 'CB', FALSE); 
 
/* Grant enqueue privilege on the billed orders queue to Customer Billing 
   application. The CB application is allowed to put billed orders into this 
   queue after processing the orders. */ 
 
EXECUTE DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
   'ENQUEUE', 'CBADM_billedorders_que', 'CB', FALSE); 


Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Example 7-4 Java (JDBC): Granting Dequeue Privilege on Shipped Orders Queue to CB Application

public static void grantQueuePrivileges(Connection db_conn)
{
    AQSession  aq_sess;
    AQQueue    sh_queue;
    AQQueue    bi_queue;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Grant dequeue privilege on the shipped orders queue to the Customer 
           Billing application. The CB application retrieves orders that are 
           shipped but not billed from the shipped orders queue. */ 

        sh_queue = aq_sess.getQueue("CBADM", "CBADM_shippedorders_que");

        sh_queue.grantQueuePrivilege("DEQUEUE", "CB", false);

        /* Grant enqueue privilege on the billed orders queue to Customer 
           Billing application. The CB application is allowed to put billed 
           orders into this queue after processing the orders. */ 
 
        bi_queue = aq_sess.getQueue("CBADM", "CBADM_billedorders_que");

        bi_queue.grantQueuePrivilege("ENQUEUE", "CB", false);
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

Message Format Transformation

You can define transformation mappings between different message payload types. Transformation mappings are defined as SQL expressions that can include PL/SQL functions (including callouts) and Java stored procedures. Only one-to-one message transformations are supported. The transformation engine is tightly integrated with Oracle Streams AQ to facilitate transformation of messages when they move through the database messaging system.

Transformation mappings can be used during enqueue, dequeue, and propagation operations. To use a transformation at:

Example 7-5 PL/SQL (DBMS_TRANSFORM Package): Creating Types for the OE Application

In the BooksOnLine application, assume that the order type is represented differently in the Order Entry (OE) and the Shipping applications. The order type and other types for the Order Entry application are created as follows:

CREATE OR REPLACE TYPE order_typ AS object (
        orderno         number,
        status          varchar2(30),
        ordertype       varchar2(30),
        orderregion     varchar2(30),
        custno          number,
        paymentmethod   varchar2(30),
        items           orderitemlist_vartyp,
        ccnumber        varchar2(20),
        order_date      date);

CREATE OR REPLACE TYPE customer_typ AS object (
        custno          number,
        custid          varchar2(20),
        name            varchar2(100),
        street          varchar2(100),
        city            varchar2(30),
        state           varchar2(2),
        zip             number,
        country         varchar2(100));

CREATE OR REPLACE TYPE book_typ AS object (
        title           varchar2(100),
        authors         varchar2(100),
        ISBN            varchar2(20),
        price           number);

CREATE OR REPLACE TYPE orderitem_typ AS object (
        quantity        number,
        item            book_typ,
        subtotal        number);

CREATE OR REPLACE TYPE orderitemlist_vartyp AS varray (20) of
orderitem_typ;

Example 7-6 Creating Types for the Shipping Application

CREATE OR REPLACE TYPE order_typ_sh AS object (
        orderno         number,
        status          varchar2(30),
        ordertype       varchar2(30),
        orderregion     varchar2(30),
        customer        customer_typ_sh,
        paymentmethod   varchar2(30),
        items           orderitemlist_vartyp,
        ccnumber        varchar2(20),
        order_date      date);

CREATE OR REPLACE TYPE customer_typ_sh AS object (
        custno          number,
        name            varchar2(100),
        street          varchar2(100),
        city            varchar2(30),
        state           varchar2(2),
        zip             number);

CREATE OR REPLACE TYPE book_typ_sh AS object (
        title           varchar2(100),
        authors         varchar2(100),
        ISBN            varchar2(20),
        price           number);

CREATE OR REPLACE TYPE orderitem_typ_sh AS object (
        quantity        number,
        item            book_typ,
        subtotal        number);

CREATE OR REPLACE TYPE orderitemlist_vartyp_sh AS varray (20) of
orderitem_typ_sh;

The Overseas Shipping application uses an XMLType attribute.

Creating Transformations

You can create transformations by creating a single PL/SQL function or by creating an expression for each target type attribute.


Creating a Single PL/SQL Function

This PL/SQL function returns an object of the target type or the constructor of the target type. This representation is preferable for simple transformations or those not easily broken down into independent transformations for each attribute.

Example 7-7 DBMS_TRANSFORM.create transformation: Creating a Single PL/SQL Function to Return Target Type

EXECUTE DBMS_TRANSFORM.CREATE_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           from_schema => 'OE', from_type => 'order_typ',
           to_schema => 'WS', to_type => 'order_typ_sh',
           transformation(
               'WS.order_typ_sh(source.user_data.orderno,
                                source.user_data.status,
                                source.user_data.ordertype,
                                source.user_data.orderregion,

WS.get_customer_info(source.user_data.custno),
                                source.user_data.paymentmethod,
                                source.user_data.items,
                                source.user_data.ccnumber,
                                source.user_data.order_date)');

In the BooksOnline application, assume that the Overseas Shipping site represents the order as an XMLType payload. The Order Entry site represents the order as an Oracle object, ORDER_TYP. Because the Overseas Shipping site subscribes to messages in the OE_BOOKEDORDERS_QUE queue, a transformation is applied before messages are propagated from the Order Entry site to the Overseas Shipping site.

Example 7-8 Applying a Transformation Before Messages are Propagated from the OE Site

The transformation is defined as follows:

CREATE OR REPLACE FUNCTION CONVERT_TO_ORDER_XML(input_order TYPE OE.ORDER_TYP)
RETURN XMLType AS
   new_order XMLType;
BEGIN
   select SYS_XMLGEN(input_order) into new_order from dual;
   RETURN new_order;
END CONVERT_TO_ORDER_XML;

EXECUTE DBMS_TRANSFORM.CREATE_TRANSFORMATION(
   schema =>         'TS', 
   name   =>         'OE2XML',
   from_schema =>    'OE', 
   from_type =>      'ORDER_TYP',
   to_schema =>      'SYS', 
   to_type =>        'XMLTYPE',
   transformation => 'CONVERT_TO_ORDER_XML(source.user_data)');

/*  Add a rule-based subscriber for Overseas Shipping to the Booked Orders 
queues with Transformation. Overseas Shipping handles all non-US orders: */ 
DECLARE 
 subscriber     aq$_agent; 
BEGIN 
 subscriber := aq$_agent('Overseas_Shipping','TS.TS_bookedorders_que',null);

 DBMS_AQADM.ADD_SUBSCRIBER( 
        queue_name     => 'OE.OE_bookedorders_que', 
        subscriber     => subscriber, 
        rule           => 'tab.user_data.orderregion = ''INTERNATIONAL'''
        transformation => 'TS.OE2XML'); 
END; 


Creating an Expression for Each Target Type Attribute

Create a separate expression specified for each attribute of the target type. This representation simplifies transformation mapping creation and management for individual attributes of the destination type. It is useful when the destination type has many attributes.

Example 7-9 DBMS_TRANSFORM.create_transformation: Creating an Expression for Each Target Type Attribute

/* first create the transformation without any transformation expression*/
EXECUTE DBMS_TRANSFORM.CREATE_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           from_schema => 'OE', from_type => 'order_typ',
           to_schema => 'WS', to_type => 'order_typ_sh');

/* specify each attribute of the target type as a function of the source type*/
EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation => 'source.user_data.orderno');

EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation => 'source.user_data.status');

EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation => 'source.user_data.ordertype');

EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation => 'source.user_data.orderregion');

EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation =>
'WS.get_customer_info(source.user_data.custno)');

EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation => 'source.user_data.payment_method');

EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation => 'source.user_data.orderitemlist_vartyp');

EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation => 'source.user_data.ccnumber');

EXECUTE DBMS_TRANSFORM.MODIFY_TRANSFORMATION(
           schema => 'OE', name => 'OE2WS',
           attribute_number => 1,
           transformation => 'source.user_data.order_date');

Visual Basic (OO4O): Example Code

No example is provided with this release.


Java (JDBC): Example Code

No example is provided with this release.

Structured Payloads

With Oracle Streams AQ, you can use object types to structure and manage the payload of messages. The object-relational capabilities of Oracle Database provide a rich set of data types that range from traditional relational data types to user-defined types.

Using strongly typed content, that is, content whose format is defined by an Oracle object type system, makes the following features available:

You can also create payloads that contain Oracle objects with XMLType attributes. These can be used for transmitting and storing messages that contain XML documents. By defining Oracle objects with XMLType attributes, you can do the following:

Example 7-10 PL/SQL (DBMS_AQADM Package): Creating Various Structured Payloads

The BooksOnLine application uses a rich set of data types to model book orders as message content.

Customers are modeled as an object type called customer_typ.

CREATE OR REPLACE TYPE customer_typ AS OBJECT ( 
        custno          NUMBER, 
        name            VARCHAR2(100), 
        street          VARCHAR2(100), 
        city            VARCHAR2(30), 
        state           VARCHAR2(2), 
        zip             NUMBER, 
        country         VARCHAR2(100)); 

Books are modeled as an object type called book_typ.

CREATE OR REPLACE TYPE book_typ AS OBJECT ( 
        title           VARCHAR2(100), 
        authors         VARCHAR2(100), 
        ISBN            NUMBER, 
        price           NUMBER); 
 

An order item that represents an order line item is modeled as an object type called orderitem_typ. An order item is a nested type that includes the book type.

CREATE OR REPLACE TYPE orderitem_typ AS OBJECT ( 
        quantity        NUMBER, 
        item            BOOK_TYP, 
        subtotal        NUMBER); 
 

An order item list is used to represent a list of order line items and is modeled as a VARRAY of order items.

CREATE OR REPLACE TYPE orderitemlist_vartyp AS VARRAY (20) OF orderitem_typ; 
 

An order is modeled as an object type called order_typ. The order type is a composite type that includes nested object types defined earlier. The order type captures details of the order, the customer information, and the item list.

CREATE OR REPLACE TYPE order_typ as object ( 
        orderno         NUMBER, 
        status          VARCHAR2(30), 
        ordertype       VARCHAR2(30), 
        orderregion     VARCHAR2(30), 
        customer        CUSTOMER_TYP, 
        paymentmethod   VARCHAR2(30), 
        items           ORDERITEMLIST_VARTYP, 
        total           NUMBER);

Some queues in the BooksOnline application model an order using an XMLType payload.


Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Example 7-11 Java (JDBC): Generating Java Classes to Map Structured Payloads to SQL Types

After creating the types, use JPublisher to generate Java classes that map to the SQL types.

  1. Create an input file jaqbol.typ for JPublisher with the following lines:

    TYPE boladm.customer_typ AS Customer
    TYPE boladm.book_typ AS Book
    TYPE boladm.orderitem_typ AS OrderItem
    TYPE boladm.orderitemlist_vartyp AS OrderItemList
    TYPE boladm.order_typ AS Order
    
    
  2. Run JPublisher with the following arguments:

    jpub -input=jaqbol.typ -user=boladm/boladm -case=mixed -methods=false -compatible=CustomDatum 
    
    

    This creates Java classes Customer, Book, OrderItem, and OrderItemList that map to the SQL object types created earlier.

  3. Load the Java AQ driver and create a JDBC connection:

    public static Connection loadDriver(String user, String passwd) 
    {
       Connection db_conn = null;
       try 
       {
             Class.forName("oracle.jdbc.driver.OracleDriver");
    
          /* your actual hostname, port number, and SID will 
          vary from what follows. Here we use 'dlsun736,' '5521,'
          and 'test,' respectively: */
    
          db_conn =
                   DriverManager.getConnection(
                   "jdbc:oracle:thin:@dlsun736:5521:test", 
                   user, passwd);
    
          System.out.println("JDBC Connection opened "); 
          db_conn.setAutoCommit(false);
    
          /* Load the Oracle Database AQ driver: */
          Class.forName("oracle.AQ.AQOracleDriver");
    
          System.out.println("Successfully loaded AQ driver ");
       }
       catch (Exception ex)
       {
          System.out.println("Exception: " + ex); 
          ex.printStackTrace();
       }
       return db_conn;
    

Creating Queues with XMLType Payloads

You can create queues with XMLType payloads. These can be used for transmitting and storing messages that contain XML documents. By defining Oracle objects with XMLType attributes, you can do the following:

  • Store more than one type of XML document in the same queue. The documents are stored internally as CLOBs.

  • Selectively dequeue messages with XMLType attributes using the operators XMLType.existsNode(), XMLType.extract(), and so on.


    See Also:

    Oracle XML DB Developer's Guide for details on XMLType operations

  • Define transformations to convert Oracle objects to XMLType.

  • Define rule-based subscribers that query message content using XMLType methods such as XMLType.existsNode() and XMLType.extract().

Example 7-12 DBMS_AQADM: Creating a Queue Table and Queue for an XMLType Order

In the BooksOnline application, assume that the Overseas Shipping site represents the order as XMLType. The Order Entry (OE) site represents the order as an Oracle object, ORDER_TYP. The Overseas queue table and queue are created as follows:

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
   queue_table        => 'TS_orders_pr_mqtab',
   comment            => 'Overseas Shipping MultiConsumer Orders queue table',
   multiple_consumers => TRUE,
   queue_payload_type => 'SYS.XMLTtype',
   compatible         => '8.1');
END;

BEGIN
DBMS_AQADM.CREATE_QUEUEcreate_queue (
   queue_name   => 'TS_bookedorders_que',
   queue_table  => 'TS_orders_pr_mqtab');
END;

Example 7-13 Transforming Messages Before Propagation to the Overseas Shipping Site

Because the representation of orders at the Overseas Shipping site is different from the representation of orders at the Order Entry site, a transformation is applied before messages are propagated from the Order Entry site to the Overseas Shipping site.

/*  Add a rule-based subscriber (for Overseas Shipping) to the Booked Orders 
 queues with Transformation. Overseas Shipping handles all non-US orders: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('Overseas_Shipping','TS.TS_bookedorders_que',null); 

  DBMS_AQADM.ADD_SUBSCRIBER( 
   queue_name     => 'OE.OE_bookedorders_que', 
   subscriber     => subscriber, 
   rule           => 'tab.user_data.orderregion = ''INTERNATIONAL''',
   transformation => 'TS.OE2XML'); 
END; 


See Also:

"Creating Transformations" for more details on defining transformations that convert the type used by the Order Entry application to the type used by Overseas Shipping

Example 7-14 DBMS_AQ: Dequeuing XMLType Messages to Process Orders for Canadian Customers

Assume that an application processes orders for customers in Canada. This application can dequeue messages using the following procedure:

/*  Create procedures to enqueue into single-consumer queues: */ 
create or replace procedure get_canada_orders() as 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           XMLTtype; 
no_messages              exception; 
pragma exception_init    (no_messages, -25228); 
new_orders               BOOLEAN := TRUE; 

begin 
        dopt.wait := 1;

/* Specify dequeue condition to select Orders for Canada */ 
        dopt.deq_condition := 'tab.user_data.extract(
''/ORDER_TYP/CUSTOMER/COUNTRY/text()'').getStringVal()=''CANADA''';

            dopt.consumer_name : = 'Overseas_Shipping';

        WHILE (new_orders) LOOP 
          BEGIN 
            dbms_aq.dequeue( 
                queue_name         => 'TS.TS_bookedorders_que', 
                dequeue_options    => dopt, 
                message_properties => mprop, 
                payload            => deq_order_data, 
                msgid              => deq_msgid); 
            commit; 

            dbms_output.put_line(' Order for Canada - Order: ' ||
                                   deq_order_data.getStringVal());
 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE ORDERS  ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 
end; 

Nonpersistent Queues

A message in a nonpersistent queue is not stored in a database table. You create a nonpersistent queue, which can be either a single-consumer or multiconsumer type. These queues are created in a system-created queue table (AQ$_MEM_SC for single-consumer queues and AQ$_MEM_MC for multiconsumer queues) in the schema specified by the create_np_queue command. Subscribers can be added to the multiconsumer queues. Nonpersistent queues can be destinations for propagation.

You use the enqueue interface to enqueue messages into a nonpersistent queue in the usual way. You can enqueue RAW and Oracle object type messages into a nonpersistent queue. You retrieve messages from a nonpersistent queue through the asynchronous notification mechanism, registering for the notification (using OCISubscriptionRegister or DBMS_AQADM.REGISTER) for the queues you are interested in.

When a message is enqueued into a queue, it is delivered to clients with active registrations for the queue. The messages are published to the interested clients without incurring the overhead of storing them in the database.


See Also:



Scenario

Assume that there are three application processes servicing user requests at the Order Entry system. The connection dispatcher shares out connection requests from the application processes. It attempts to maintain a count of the number of users logged on to the Order Entry system and the number of users for each application process. The application processes are named APP1, APP2, and APP3. Application process failures are not considered in this example.

Using nonpersistent queues meets the requirements in this scenario. When a user logs on to the database, the application process enqueues to the multiconsumer nonpersistent queue, LOGIN_LOGOUT, with the application name as the consumer name. The same process occurs when a user logs out. To distinguish between the two events, the correlation of the message is LOGIN for logins and LOGOUT for logouts.

The callback function counts the login and logout events for each application process.


Note:

The dispatcher process must connect to the database only for registering the subscriptions. The notifications themselves can be received while the process is disconnected from the database.

Example 7-15 PL/SQL (DBMS_AQADM). Creating Multiconsumer Nonpersistent Queues in OE Schema

CONNECT oe/oe; 
/* Create the Object Type/ADT adtmsg */
CREATE OR REPLACE TYPE adtmsg AS OBJECT (id NUMBER, data VARCHAR2(4000));

/* Create the multiconsumer nonpersistent queue in OE schema: */ 
EXECUTE DBMS_AQADM.CREATE_NP_QUEUE(queue_name         => 'LOGIN_LOGOUT', 
                                   multiple_consumers => TRUE);
 
/* Enable the queue for enqueue and dequeue: */
EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 'LOGIN_LOGOUT'); 
 
/* Nonpersistent Queue Scenario - procedure to be executed upon login: */ 
CREATE OR REPLACE PROCEDURE  User_Login(app_process IN VARCHAR2)
AS 
  msgprop        dbms_aq.message_properties_t; 
  enqopt         dbms_aq.enqueue_options_t; 
  enq_msgid      RAW(16); 
  payload        RAW(1); 
BEGIN 
  /* Visibility must always be immediate for NonPersistent queues */ 
  enqopt.visibility:=dbms_aq.IMMEDIATE; 
  msgprop.correlation:= 'LOGIN'; 
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL); 
  /* payload is NULL */ 
  dbms_aq.enqueue( 
        queue_name         => 'LOGIN_LOGOUT', 
        enqueue_options    => enqopt, 
        message_properties => msgprop, 
        payload            => payload, 
        msgid              => enq_msgid);
 
END; 
 
/* Nonpersistent queue scenario - procedure to be executed upon logout: */ 
CREATE OR REPLACE PROCEDURE  User_logout(app_process IN VARCHAR2) 
AS 
  msgprop        dbms_aq.message_properties_t; 
  enqopt         dbms_aq.enqueue_options_t; 
  enq_msgid      RAW(16); 
  payload        adtmsg; 
BEGIN 
  /* Visibility must always be immediate for NonPersistent queues: */ 
  enqopt.visibility:=dbms_aq.IMMEDIATE; 
  msgprop.correlation:= 'LOGOUT'; 
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL); 
  /* Payload is NOT NULL: */ 
payload := adtmsg(1, 'Logging Off');

dbms_aq.enqueue( 
        queue_name         => 'LOGIN_LOGOUT', 
        enqueue_options    => enqopt, 
        message_properties => msgprop, 
        payload            => payload, 
        msgid              => enq_msgid);
 END; 
/ 
 

/* If there is a login at APP1, then enqueue a message into 'login_logout' with 
   correlation 'LOGIN': */ 
EXECUTE User_login('APP1'); 
 
/* If there is a logout at APP3, then enqueue a message into 'login_logout' with 
   correlation 'LOGOUT' and payload adtmsg(1, 'Logging Off'): */ 
EXECUTE User_logout('APP3'); 
 
 /* The OCI program which waits for notifications: */ 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <oci.h> 
#ifdef WIN32COMMON 
#define sleep(x)   Sleep(1000*(x)) 
#endif 
 
/* LOGIN / password:  */ 
static text *username = (text *) "OE"; 
static text *password = (text *) "OE"; 
 
/* The correlation strings of messages: */ 
static char  *login = "LOGIN"; 
static char  *logout = "LOGOUT"; 
 
/* The possible consumer names of queues: */ 
static char *applist[] = {"APP1", "APP2","APP3"}; 
 
static OCIEnv *envhp; 
static OCIServer *srvhp; 
static OCIError *errhp; 
static OCISvcCtx *svchp; 
 
static void checkerr(/*_ OCIError *errhp, sword status _*/); 
 
struct process_statistics 
{ 
  ub4  login; 
  ub4  logout; 
}; 
 
typedef struct process_statistics process_statistics; 
 
int main(/*_ int argc, char *argv[] _*/); 

/* Notify Callback: */ 
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode) 
dvoid *ctx; 
OCISubscription *subscrhp; 
dvoid *pay; 
ub4    payl; 
dvoid *desc; 
ub4    mode; 
{ 
 text                *subname;   /* subscription name */ 
 ub4                  lsub;      /* length of subscription name */ 
 text                *queue;     /* queue name */ 
 ub4                 *lqueue;    /* queue name */ 
 text                *consumer;  /* consumer name */ 
 ub4                  lconsumer;
 text                *correlation; 
 ub4                  lcorrelation; 
 ub4                  size; 
 ub4                  appno; 
 OCIRaw              *msgid;
 OCIAQMsgProperties  *msgprop;   /* message properties descriptor */ 
 process_statistics   *user_count = (process_statistics *)ctx; 
 
 OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
           (dvoid *)&subname, &lsub, OCI_ATTR_SUBSCR_NAME, errhp); 
 
 /* Extract the attributes from the AQ descriptor: */ 
 /* Queue name: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size,
            OCI_ATTR_QUEUE_NAME, errhp); 

 /* Consumer name: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &lconsumer,
            OCI_ATTR_CONSUMER_NAME, errhp); 
 
 /* Message properties: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size,
            OCI_ATTR_MSG_PROP, errhp); 
 
 /* Get correlation from message properties: */ 
  checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES,
            (dvoid *)&correlation, &lcorrelation, OCI_ATTR_CORRELATION, errhp)); 

  if (lconsumer == strlen(applist[0])) 
  { 
    if (!memcmp((dvoid *)consumer, (dvoid *)applist[0], strlen(applist[0]))) 
     appno = 0; 
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[1],strlen(applist[1]))) 
     appno = 1; 
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[2],strlen(applist[2]))) 
     appno = 2; 
    else
    { 
     printf("Wrong consumer in notification"); 
     return; 
    } 
  } 
  else 
  {  /* consumer name must be "APP1", "APP2" or "APP3"  */ 
    printf("Wrong consumer in notification");
    return; 
  } 
 
  if (lcorrelation == strlen(login) &&                   /* login event */ 
       !memcmp((dvoid *)correlation, (dvoid *)login, strlen(login))) 
  { 
     user_count[appno].login++; 
                           /* increment login count for the app process */
     printf("Login by APP%d \n", (appno+1));
     printf("Login Payload length = %d \n", pay1);
   } 
  else if  (lcorrelation == strlen(logout) &&           /* logout event */ 
       !memcmp((dvoid *)correlation,(dvoid *)logout, strlen(logout))) 
  { 
     user_count[appno].logout++;
                          /* increment logout count for the app process */ 
     printf("logout by APP%d \n", (appno+1));
     printf("logout Payload length = %d \n", pay1);
  }
  else                            /* correlation is "LOGIN" or "LOGOUT" */ 
     printf("Wrong correlation in notification");
 
  printf("Total  : \n"); 
 
  printf("App1 : %d \n", user_count[0].login-user_count[0].logout); 
  printf("App2 : %d \n", user_count[1].login-user_count[1].logout); 
  printf("App3 : %d \n", user_count[2].login-user_count[2].logout); 
 
} 
 
int main(argc, argv) 
int argc; 
char *argv[]; 
{ 
  OCISession *authp = (OCISession *) 0; 
  OCISubscription *subscrhp[3]; 
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; 
  process_statistics  ctx[3] = {{0,0}, {0,0}, {0,0}}; 
  ub4 sleep_time = 0; 
 
  printf("Initializing OCI Process\n"); 
 
  /* Initialize OCI environment with OCI_EVENTS flag set: */ 
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0, 
                       (dvoid * (*)(dvoid *, size_t)) 0, 
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0, 
                       (void (*)(dvoid *, dvoid *)) 0 ); 
 
  printf("Initialization successful\n"); 
 
  printf("Initializing OCI Env\n"); 
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 
); 
  printf("Initialization successful\n"); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, 
    OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, 
    OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0)); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, 
    OCI_HTYPE_SVCCTX, (size_t) 0, (dvoid **) 0)); 
 
  printf("connecting to server\n"); 
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias", 
           strlen("inst1_alias"), (ub4) OCI_DEFAULT)); 
  printf("connect successful\n"); 
 
  /* Set attribute server context in the service context: */ 
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp,
                  (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp)); 
 
  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp, 
                  (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0)); 

  /* Set username and password in the session handle: */ 
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, 
                  (dvoid *) username, (ub4) strlen((char *)username), 
                  (ub4) OCI_ATTR_USERNAME, errhp)); 

  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, 
                 (dvoid *) password, (ub4) strlen((char *)password), 
                 (ub4) OCI_ATTR_PASSWORD, errhp)); 
 
  /* Begin session: */ 
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS,
                 (ub4) OCI_DEFAULT)); 
 
  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, 
                 (dvoid *) authp, (ub4) 0, 
                 (ub4) OCI_ATTR_SESSION, errhp); 
 
  /* Register for notification: */ 
   printf("allocating subscription handle\n"); 
  subscrhp[0] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0],
                 (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (size_t) 0, (dvoid **) 0); 

  /* For application process APP1: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGIN_LOGOUT:APP1",
                 (ub4) strlen("OE.LOGIN_LOGOUT:APP1"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 

  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
  printf("allocating subscription handle\n"); 
  subscrhp[1] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1],
                 (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (size_t) 0, (dvoid **) 0); 

  /* For application process APP2: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGIN_LOGOUT:APP2",
                 (ub4) strlen("OE.LOGIN_LOGOUT:APP2"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 

  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
  printf("allocating subscription handle\n"); 
  subscrhp[2] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2],
                 (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (size_t) 0, (dvoid **) 0); 
 
  /* For application process APP3: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGIN_LOGOUT:APP3",
                 (ub4) strlen("OE.LOGIN_LOGOUT:APP3"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 

  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
  printf("Registering fornotifications \n"); 
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 3, errhp,
                 OCI_DEFAULT)); 
 
  sleep_time = (ub4)atoi(argv[1]); 
  printf ("waiting for %d s \n", sleep_time); 
  sleep(sleep_time); 
 
  printf("Exiting"); 
  exit(0); 
} 
 
void checkerr(errhp, status) 
OCIError *errhp; 
sword status; 
{ 
  text errbuf[512]; 
  sb4 errcode = 0; 
 
  switch (status) 
  { 
  case OCI_SUCCESS: 
    break; 
  case OCI_SUCCESS_WITH_INFO: 
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n"); 
    break; 
  case OCI_NEED_DATA: 
    (void) printf("Error - OCI_NEED_DATA\n"); 
    break; 
  case OCI_NO_DATA: 
    (void) printf("Error - OCI_NODATA\n"); 
    break; 
  case OCI_ERROR: 
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode, 
                        errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR); 
    (void) printf("Error - %.*s\n", 512, errbuf); 
    break; 
  case OCI_INVALID_HANDLE: 
    (void) printf("Error - OCI_INVALID_HANDLE\n"); 
    break; 
  case OCI_STILL_EXECUTING: 
    (void) printf("Error - OCI_STILL_EXECUTE\n"); 
    break; 
  case OCI_CONTINUE: 
    (void) printf("Error - OCI_CONTINUE\n"); 
    break; 
  default: 
    break; 
  } 
} 
 
/* End of file tkaqdocn.c */ 

Visual Basic (OO4O): Example Code

This feature is not supported currently.


Java (JDBC): Example Code

This feature is not supported through the Java API.

Retention and Message History

Oracle Streams AQ allows the retention of the message history after consumption. The messages and their histories can be queried using SQL. This allows business analysis of the integrated system. In certain cases, messages must be tracked. For example, if a message is produced as a result of the consumption of another message, then the two are related. As the application designer, you may want to keep track of such relationships. Taken together, retention, message identifiers, and SQL queries make it possible to build powerful message warehouses.


Scenario

Assume that you must determine the average order processing time. This includes the time the order must wait in the back_order queue. You want to know the average wait time in the backed_order queue. SQL queries can determine the wait time for orders in the shipping application. Specify the retention as TRUE for the shipping queues and specify the order number in the correlation field of the message.

For simplicity, only orders that have already been processed are analyzed. The processing time for an order in the shipping application is the difference between the enqueue time in the WS_bookedorders_que and the enqueue time in the WS_shipped_orders_que.


PL/SQL (DBMS_AQADM Package): Example Code
SELECT  SUM(SO.enq_time - BO.enq_time) / count (*) AVG_PRCS_TIME 
   FROM WS.AQ$WS_orders_pr_mqtab BO , WS.AQ$WS_orders_mqtab SO
   WHERE SO.msg_state = 'PROCESSED' and BO.msg_state = 'PROCESSED' 
   AND SO.corr_id = BO.corr_id and SO.queue = 'WS_shippedorders_que'; 
 
/* Average waiting time in the backed order queue: */ 
SELECT SUM(BACK.deq_time - BACK.enq_time)/count (*) AVG_BACK_TIME 
   FROM WS.AQ$WS_orders_mqtab BACK
   WHERE BACK.msg_state = 'PROCESSED' AND BACK.queue = 'WS_backorders_que'; 

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.


Java (JDBC): Example Code

No example is provided with this release.

Publish/Subscribe Support

Oracle Streams AQ supports the publish/subscribe model of application integration. In the model, publishing applications put the message in the queue. The subscribing applications subscribe to the message in the queue. More publishing and subscribing applications can be dynamically added without changing the existing publishing and subscribing applications.

Oracle Streams AQ also supports content-based subscriptions. The subscriber can subscribe to a subset of messages in the queue based on the message properties and the contents of the messages. A subscriber to a queue can also be another queue or a consumer on another queue.

You can implement a publish/subscribe model of communication using Oracle Streams AQ as follows:


Scenario

The BooksOnLine application illustrates the use of a publish/subscribe model for communicating between applications. The following subsections give some examples.


Defining queues

The Order Entry application defines a queue (OE_booked_orders_que) to communicate orders that are booked to various applications. The Order Entry application is not aware of the various subscriber applications and thus, a new subscriber application can be added without disrupting any setup or logic in the Order Entry (publisher) application.


Setting Up Subscriptions

The various Shipping applications and the Customer Service application (that is, Eastern Region shipping, Western Region shipping, Overseas Shipping and Customer Service) are defined as subscribers to the booked_orders queue of the Order Entry application. Oracle Streams AQ uses rules to route messages of interest to the various subscribers. Thus, Eastern Region shipping, which handles shipment of all orders for the East Coast and all rush U.S. orders, expresses the subscription rule as follows:

rule  => 'tab.user_data.orderregion = ''EASTERN'' OR 
(tab.user_data.ordertype = ''RUSH'' AND
tab.user_data.customer.country = ''USA'') ' 
 

Each subscriber can specify a local queue where messages are to be delivered. The Eastern Region shipping application specifies a local queue (ES_booked_orders_que) for message delivery by specifying the subscriber address as follows:

subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); 

Setting Up Propagation

Enable propagation from each publisher application queue. To allow subscribed messages to be delivered to remote queues, the Order Entry application enables propagation by means of the following statement:

EXECUTE DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'OE.OE_bookedorders_que');
Publishing Messages 

Booked orders are published by the Order Entry application when it enqueues orders (into the OE_booked_order_que) that have been validated and are ready for shipping. These messages are then routed to each of the subscribing applications. Messages are delivered to local queues (if specified) at each of the subscriber applications.


Receiving Messages

Each of the shipping applications and the Customer Service application then receives these messages in their local queues. For example, Eastern Region Shipping only receives booked orders that are for East Coast addresses or any U.S. order that are marked RUSH. This application then dequeues messages and processes its orders for shipping.

Oracle Real Application Clusters Support

Real Application Clusters can be used to improve Oracle Streams AQ performance by allowing different queues to be managed by different instances. You do this by specifying different instance affinities (preferences) for the queue tables that store the queues. This allows queue operations (enqueue and dequeue) on different queues to occur in parallel.

The Oracle Streams AQ queue monitor process continuously monitors the instance affinities of the queue tables. The queue monitor assigns ownership of a queue table to the specified primary instance if it is available, failing which it assigns it to the specified secondary instance.

If the owner instance of a queue table terminates, then the queue monitor changes ownership to a suitable instance such as the secondary instance.

Oracle Streams AQ propagation is able to make use of Real Application Clusters, although it is transparent to the user. The affinities for jobs submitted on behalf of the propagation schedules are set to the same values as those of the affinities of the respective queue tables. Thus a job_queue_process associated with the owner instance of a queue table is handling the propagation from queues stored in that queue table, thereby minimizing pinging.


Scenario

In the BooksOnLine example, operations on the new_orders_queue and booked_order_queue at the order entry (OE) site can be made faster if the two queues are associated with different instances. This is accomplished by creating the queues in different queue tables and specifying different affinities for the queue tables in the create_queue_table() command.

In the example, the queue table OE_orders_sqtab stores queue new_orders_queue and the primary and secondary are instances 1 and 2 respectively. Queue table OE_orders_mqtab stores queue booked_order_queue and the primary and secondary are instances 2 and 1 respectively.

The objective is to let instances 1 and 2 manage the two queues in parallel. By default, only one instance is available, in which case the owner instances of both queue tables are set to instance 1. However, if Real Application Clusters are set up correctly and both instances 1 and 2 are available, then queue table OE_orders_sqtab is owned by instance 1 and the other queue table is owned by instance 2.

The primary and secondary instance specification of a queue table can be changed dynamically using the alter_queue_table() command as shown in the following example. Information about the primary, secondary and owner instance of a queue table can be obtained by querying the view USER_QUEUE_TABLES.


Note:

Mixed case (upper and lower case together) queue names, queue table names, and subscriber names are supported if database compatibility is 10.0, but the names must be enclosed in double quote marks. So abc.efg means the schema is ABC and the name is EFG, but "abc"."efg" means the schema is abc and the name is efg.


PL/SQL (DBMS_AQADM Package): Example Code
/* Create queue tables, queues for OE  */
CONNECT OE/OE; 
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( \
        queue_table        => 'OE_orders_sqtab',\
        comment            => 'Order Entry Single-Consumer Orders queue table',\
        queue_payload_type => 'BOLADM.order_typ',\
        compatible         => '8.1',\
        primary_instance   => 1,\
        secondary_instance => 2);

EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE(\
        queue_table        => 'OE_orders_mqtab',\
        comment            => 'Order Entry Multi Consumer Orders queue table',\
        multiple_consumers => TRUE,\
        queue_payload_type => 'BOLADM.order_typ',\
        compatible         => '8.1',\
        primary_instance   => 2,\
        secondary_instance => 1); 

EXECUTE DBMS_AQADM.CREATE_QUEUE ( \
        queue_name         => 'OE_neworders_que',\
        queue_table        => 'OE_orders_sqtab'); 

EXECUTE DBMS_AQADM.CREATE_QUEUE ( \
        queue_name         => 'OE_bookedorders_que',\
        queue_table        => 'OE_orders_mqtab'); 

/* Check instance affinity of OE queue tables from AQ administrative view: */ 
SELECT queue_table, primary_instance, secondary_instance, owner_instance 
FROM user_queue_tables; 

/* Alter instance affinity of OE queue tables: */ 
EXECUTE DBMS_AQADM.ALTER_QUEUE_TABLE( \
        queue_table        => 'OE.OE_orders_sqtab',\
        primary_instance   => 2,\
        secondary_instance => 1); 

EXECUTE DBMS_AQADM.ALTER_QUEUE_TABLE(  \
        queue_table        => 'OE.OE_orders_mqtab', \
        primary_instance   => 1,\
        secondary_instance => 2); 

/* Check instance affinity of OE queue tables from AQ administrative view: */
SELECT queue_table, primary_instance, secondary_instance, owner_instance 
FROM user_queue_tables; 

Visual Basic (OO4O): Example Code

This feature currently not supported.


Java (JDBC): Example Code
public static void createQueueTablesAndQueues(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         sq_table;
    AQQueueTable         mq_table;
    AQQueueProperty      q_prop;
    AQQueue              neworders_q;
    AQQueue              bookedorders_q;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Create a single-consumerorders queue table */
        sqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        sqt_prop.setComment("Order Entry Single-Consumer Orders queue table");
        sqt_prop.setCompatible("8.1");
        sqt_prop.setPrimaryInstance(1);
        sqt_prop.setSecondaryInstance(2);

        sq_table = aq_sess.createQueueTable("OE", "OE_orders_sqtab", sqt_prop);

        /* Create a multiconsumer orders queue table */
        mqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        mqt_prop.setComment("Order Entry Multiconsumer Orders queue table");
        mqt_prop.setCompatible("8.1");
        mqt_prop.setMultiConsumer(true);
        mqt_prop.setPrimaryInstance(2);
        mqt_prop.setSecondaryInstance(1);

        mq_table = aq_sess.createQueueTable("OE", "OE_orders_mqtab", mqt_prop);


        /* Create queues in these queue tables */
        q_prop = new AQQueueProperty();

        neworders_q = aq_sess.createQueue(sq_table, "OE_neworders_que", 
                                          q_prop);

        bookedorders_q = aq_sess.createQueue(mq_table, "OE_bookedorders_que", 
                                             q_prop);

    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

public static void alterInstanceAffinity(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         sq_table;
    AQQueueTable         mq_table;
    AQQueueProperty      q_prop;

    try
    {

        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Check instance affinities */
        sq_table = aq_sess.getQueueTable("OE", "OE_orders_sqtab");

        sqt_prop = sq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_sqtab: " + 
                           sqt_prop.getPrimaryInstance());

        mq_table = aq_sess.getQueueTable("OE", "OE_orders_mqtab");
        mqt_prop = mq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_mqtab: " + 
                           mqt_prop.getPrimaryInstance());

        /* Alter queue table affinities */
        sq_table.alter(null, 2, 1);

        mq_table.alter(null, 1, 2);

        sqt_prop = sq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_sqtab: " + 
                           sqt_prop.getPrimaryInstance());

        mq_table = aq_sess.getQueueTable("OE", "OE_orders_mqtab");
        mqt_prop = mq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_mqtab: " + 
                           mqt_prop.getPrimaryInstance());

    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

Statistics Views and Oracle Streams AQ

Each instance keeps its own Oracle Streams AQ statistics information in its own System Global Area (SGA), and does not have knowledge of the statistics gathered by other instances. When a GV$AQ view is queried by an instance, all other instances funnel their Oracle Streams AQ statistics information to the instance issuing the query.


Scenario

The gv$ view can be queried at any time to see the number of messages in waiting, ready or expired state. The view also displays the average number of seconds messages have been waiting to be processed. The order processing application can use this to dynamically tune the number of order processing processes.


PL/SQL (DBMS_AQADM Package): Example Code
CONNECT oe/oe 
 
/* Count the number of messages and the average time for which the messages have 
   been waiting: */ 
SELECT READY, AVERAGE_WAIT FROM gv$aq Stats, user_queues Qs 
  WHERE Stats.qid = Qs.qid and Qs.Name = 'OE_neworders_que'; 

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.


Java (JDBC): Example Code

No example is provided with this release.

Internet Access for Oracle Streams AQ

See Chapter 17, " Internet Access to Oracle Streams AQ" for information on Internet access to Oracle Streams AQ features.

Enqueue Features

This section contains these topics:

Subscriptions and Recipient Lists

After consumption by dequeue, messages are retained for the time specified in retention_time. When retention_time expires, messages are removed by the time manager process.

After processing, the message is removed if the retention_time of the queue is 0, or retained for the specified retention time. While the message is retained the message can either be queried using SQL on the queue table view or by dequeuing using the BROWSE mode and specifying the message ID of the processed message.

Oracle Streams AQ allows a single message to be processed and consumed by more than one consumer. To use this feature, you must create multiconsumer queues and enqueue the messages into these multiconsumer queues. Oracle Streams AQ allows two methods of identifying the list of consumers for a message: subscriptions and recipient lists.


Subscriptions

You can add a subscription to a queue by using the DBMS_AQADM.ADD_SUBSCRIBER PL/SQL procedure. This lets you specify a consumer by means of the AQ$_AGENT parameter for enqueued messages. You can add more subscribers by repeatedly using the DBMS_AQADM.ADD_SUBSCRIBER procedure up to a maximum of 1024 subscribers for a multiconsumer queue.

All consumers that are added as subscribers to a multiconsumer queue must have unique values for the AQ$_AGENT parameter. This means that two subscribers cannot have the same values for the NAME, ADDRESS and PROTOCOL attributes for the AQ$_AGENT type. At least one of the three attributes must be different for two subscribers.


See Also:

"AQ Agent Type (aq$_agent)" for a formal description of this data structure

You cannot add subscriptions to single-consumer queues or exception queues. A consumer that is added as a subscriber to a queue is only able to dequeue messages that are enqueued after the DBMS_AQADM.ADD_SUBSCRIBER procedure is completed. In other words, messages that had been enqueued before this procedure is executed are not available for dequeue by this consumer.

You can remove a subscription by using the DBMS_AQADM.REMOVE_SUBSCRIBER procedure. Oracle Streams AQ automatically removes from the queue all data corresponding to the consumer identified by the AQ$_AGENT parameter. In other words, it is not an error to run the REMOVE_SUBSCRIBER procedure even when there are pending messages that are available for dequeue by the consumer. These messages are automatically made unavailable for dequeue after the REMOVE_SUBSCRIBER procedure is executed.

In a queue table that is created with the compatible parameter set to '8.1' or higher, such messages that were not dequeued by the consumer are shown as "UNDELIVERABLE" in the AQ$queue_table view. A multiconsumer queue table created without the compatible parameter, or with the compatible parameter set to '8.0', does not display the state of a message on a consumer basis, but only displays the global state of the message.


Recipient Lists

You are not required to specify subscriptions for a multiconsumer queue if the producers of messages for enqueue supply a recipient list of consumers. In some situations it can be desirable to enqueue a message that is targeted to a specific set of consumers rather than the default list of subscribers. You accomplish this by specifying a recipient list at the time of enqueuing the message.

  • In PL/SQL you specify the recipient list by adding elements to the recipient_list field of the message_properties record.

  • In OCI the recipient list is specified by using the OCISetAttr procedure to specify an array of OCI_DTYPE_AQAGENT descriptors as the recipient list (OCI_ATTR_RECIPIENT_LIST attribute) of an OCI_DTYPE_AQMSG_PROPERTIES message properties descriptor.

If a recipient list is specified during enqueue, then it overrides the subscription list. In other words, messages that have a specified recipient list are not available for dequeue by the subscribers of the queue. The consumers specified in the recipient list may or may not be subscribers for the queue. It is an error if the queue does not have any subscribers and the enqueue does not specify a recipient list.

Priority and Ordering of Messages

The message ordering dictates the order that messages are dequeued from a queue. The ordering method for a queue is specified when a queue table is created.

Priority ordering of messages is achieved by specifying priority, enqueue time as the sort order for the message. If priority ordering is chosen, then each message is assigned a priority at enqueue time by the enqueuer. At dequeue time, the messages are dequeued in the order of the priorities assigned. If two messages have the same priority, then the order in which they are dequeued is determined by the enqueue time. A first-in, first-out (FIFO) priority queue can also be created by specifying the enqueue time, priority as the sort order of the messages.


Scenario

In the BooksOnLine application, a customer can request:

  • FedEx shipping (priority 1)

  • Priority air shipping (priority 2)

  • Regular ground shipping (priority 3)

The Order Entry application uses a priority queue to store booked orders. Booked orders are propagated to the regional booked orders queues. At each region, orders in these regional booked orders queues are processed in the order of the shipping priorities.

The following calls create the priority queues for the Order Entry application.


PL/SQL (DBMS_AQADM Package): Example Code
/* Create a priority queue table for OE: */
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( \
   queue_table         => 'OE_orders_pr_mqtab', \
   sort_list           =>'priority,enq_time', \
   comment             => 'Order Entry Priority  \
                          MultiConsumer Orders queue table',\
   multiple_consumers  => TRUE, \
   queue_payload_type  => 'BOLADM.order_typ', \
   compatible          => '8.1', \
   primary_instance    => 2, \
   secondary_instance  => 1); 
 
EXECUTE DBMS_AQADM.CREATE_QUEUE ( \
   queue_name          => 'OE_bookedorders_que', \
   queue_table         => 'OE_orders_pr_mqtab');
 
/* When an order arrives, the order entry application can use the following 
   procedure to enqueue the order into its booked orders queue. A shipping 
   priority is specified for each order: */
CREATE OR REPLACE procedure order_enq(book_title        IN VARCHAR2, 
                                      book_qty          IN NUMBER, 
                                      order_num         IN NUMBER, 
                                      shipping_priority IN NUMBER, 
                                      cust_state        IN VARCHAR2, 
                                      cust_country      IN VARCHAR2, 
                                      cust_region       IN VARCHAR2, 
                                      cust_ord_typ      IN VARCHAR2) AS 
 
OE_enq_order_data        BOLADM.order_typ; 
OE_enq_cust_data         BOLADM.customer_typ; 
OE_enq_book_data         BOLADM.book_typ; 
OE_enq_item_data         BOLADM.orderitem_typ; 
OE_enq_item_list         BOLADM.orderitemlist_vartyp; 
enqopt                   dbms_aq.enqueue_options_t; 
msgprop                  dbms_aq.message_properties_t; 
enq_msgid                RAW(16); 

BEGIN 
   msgprop.correlation := cust_ord_typ; 
   OE_enq_cust_data    := BOLADM.customer_typ(NULL, NULL, NULL, NULL, 
                                cust_state, NULL, cust_country); 
   OE_enq_book_data    := BOLADM.book_typ(book_title, NULL, NULL, NULL); 
   OE_enq_item_data    := BOLADM.orderitem_typ(book_qty,
                                OE_enq_book_data, NULL); 
   OE_enq_item_list    := BOLADM.orderitemlist_vartyp( 
                                BOLADM.orderitem_typ(book_qty,
                                OE_enq_book_data, NULL)); 
   OE_enq_order_data   := BOLADM.order_typ(order_num, NULL,
                                cust_ord_typ, cust_region, 
                                OE_enq_cust_data, NULL,
                                OE_enq_item_list, NULL); 
 
   /*Put the shipping priority into message property before enqueuing 
     the message: */
   msgprop.priority    := shipping_priority; 
   dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop,
                        OE_enq_order_data, enq_msgid); 
        COMMIT; 
  END; 
  / 
 
 
/* At each region, similar booked order queues are created. The orders are
   propagated from the central Order Entry's booked order queues to the regional
   booked order queues. For example, at the Western Region, the booked orders
   queue is created. Create a priority queue table for WS shipping: */
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( \
   queue_table        =>  'WS_orders_pr_mqtab',
   sort_list          =>'  priority,enq_time',  \
   comment            =>  'West Shipping Priority  \
                           MultiConsumer Orders queue table',\ 
   multiple_consumers => TRUE, \
   queue_payload_type => 'BOLADM.order_typ', \
   compatible         => '8.1');
 
/* Booked orders are stored in the priority queue table: */
EXECUTE DBMS_AQADM.CREATE_QUEUE ( \
   queue_name         => 'WS_bookedorders_que', \
   queue_table        => 'WS_orders_pr_mqtab');
 
/* At each region, the shipping application dequeues orders from the regional 
   booked order queue according to the orders' shipping priorities, processes 
   the orders, and enqueues the processed orders into the shipped orders queues 
   or the backorders queues. */


Visual Basic (OO4O): Example Code
Dim OraSession as object
Dim OraDatabase as object
Dim OraAq as object
Dim OraMsg as Object
Dim OraOrder,OraCust,OraBook,OraItem,OraItemList as Object
Dim Msgid as String

   Set OraSession = CreateObject("OracleInProcServer.XOraSession")
   Set OraDatabase = OraSession.DbOpenDatabase("dbname", "user/pwd", 0&)
   set oraAq = OraDatabase.CreateAQ("OE.OE_bookedorders_que")
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
   Set OraOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")
   Set OraCust = OraDatabase.CreateOraObject("BOLADM.Customer_typ")
   Set OraBook = OraDatabase.CreateOraObject("BOLADM.book_typ")
   Set OraItem = OraDatabase.CreateOraObject("BOLADM.orderitem_typ")
   Set OraItemList = OraDatabase.CreateOraObject("BOLADM.orderitemlist_vartyp")

   ' Get the values of cust_state,cust_country etc from user(form_based
   ' input) and then a cmd_click event for Enqueue
   ' will run the subroutine order_enq.
   Private Sub Order_enq()

   OraMsg.correlation = txt_correlation
   'Initialize the customer details 
        OraCust("state") = txt_cust_state
   OraCust("country") = txt_cust_country
        OraBook("title") = txt_book_title
   OraItem("quantity") = txt_book_qty
   OraItem("item") = OraBook
   OraItemList(1) = OraItem
   OraOrder("orderno") = txt_order_num
   OraOrder("ordertype") = txt_cust_order_typ
   OraOrder("orderregion") = cust_region
   OraOrder("customer") = OraCust
   OraOrder("items") = OraItemList

   'Put the shipping priority into message property before enqueuing 
   '  the message: 
   OraMsg.priority = priority
   OraMsg = OraOrder
   Msgid = OraAq.enqueue

   'Release all allocations 
   End Sub

Java (JDBC): Example Code
public static void createPriorityQueueTable(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         pr_mq_table;
    AQQueueProperty      q_prop;
    AQQueue              bookedorders_q;

    try
    {

        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Create a priority queue table for OE */
        mqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        mqt_prop.setComment("Order Entry Priority " + 
                            "MultiConsumer Orders queue table");
        mqt_prop.setCompatible("8.1");
        mqt_prop.setMultiConsumer(true);

        mqt_prop.setSortOrder("priority,enq_time");

        pr_mq_table = aq_sess.createQueueTable("OE", "OE_orders_pr_mqtab", 
                                            mqt_prop);

        /* Create a queue in this queue table */
        q_prop = new AQQueueProperty();

        bookedorders_q = aq_sess.createQueue(pr_mq_table, 
                                             "OE_bookedorders_que", q_prop);

        /* Enable enqueue and dequeue on the queue */
        bookedorders_q.start(true, true);

    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

 
/* When an order arrives, the order entry application can use the following 
   procedure to enqueue the order into its booked orders queue. A shipping 
   priority is specified for each order. 
 */
public static void order_enqueue(Connection db_conn, String book_title,
                                 double book_qty, double order_num, 
                                 int ship_priority, String cust_state,
                                 String cust_country, String cust_region,
                                 String cust_order_type)
{
    AQSession         aq_sess;
    AQQueue           bookedorders_q;
    Order             enq_order;
    Customer          cust_data;
    Book              book_data;
    OrderItem         item_data;
    OrderItem[]       items;
    OrderItemList     item_list;
    AQEnqueueOption   enq_option;
    AQMessageProperty m_property;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    byte[]            enq_msg_id;

    try
    {

        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        cust_data = new Customer();
        cust_data.setCountry(cust_country);
        cust_data.setState(cust_state);

        book_data = new Book();
        book_data.setTitle(book_title);

        item_data = new OrderItem();
        item_data.setQuantity(new BigDecimal(book_qty));
        item_data.setItem(book_data);

        items = new OrderItem[1];
        items[0] = item_data;

        item_list = new OrderItemList(items);

        enq_order = new Order();
        enq_order.setCustomer(cust_data);
        enq_order.setItems(item_list);
        enq_order.setOrderno(new BigDecimal(order_num));
        enq_order.setOrdertype(cust_order_type);

        bookedorders_q = aq_sess.getQueue("OE", "OE_bookedorders_que");

        message = bookedorders_q.createMessage();

        /* Put the shipping priority into message property before enqueuing */ 
        m_property = message.getMessageProperty();

        m_property.setPriority(ship_priority);

        obj_payload = message.getObjectPayload();

        obj_payload.setPayloadData(enq_order);

        enq_option = new AQEnqueueOption();

        /* Enqueue the message */
        enq_msg_id = bookedorders_q.enqueue(enq_option, message);

        db_conn.commit();

    }
    catch (AQException aq_ex)
    {
        System.out.println("AQ Exception: " + aq_ex); 
    }
    catch (SQLException sql_ex)
    {
        System.out.println("SQL Exception: " + sql_ex); 
    }

}
 
/* At each region, similar booked order queues are created. The orders are
   propagated from the central Order Entry's booked order queues to the 
   regional booked order queues.
   For example, at the Western Region, the booked orders queue is created. 
   Create a priority queue table for WS shipping
 */
public static void createWesternShippingQueueTable(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         mq_table;
    AQQueueProperty      q_prop;
    AQQueue              bookedorders_q;

    try
    {

        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);


        /* Create a priority queue table for WS: */
        mqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        mqt_prop.setComment("Western Shipping Priority " + 
                            "MultiConsumer Orders queue table");
        mqt_prop.setCompatible("8.1");
        mqt_prop.setMultiConsumer(true);
        mqt_prop.setSortOrder("priority,enq_time");

        mq_table = aq_sess.createQueueTable("WS", "WS_orders_pr_mqtab", 
                                            mqt_prop);

        /* Booked orders are stored in the priority queue table: */
        q_prop = new AQQueueProperty();

        bookedorders_q = aq_sess.createQueue(mq_table, "WS_bookedorders_que", 
                                             q_prop);

        /* Start the queue:*/
        bookedorders_q.start(true, true);

    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }

  /* At each region, the shipping application dequeues orders from the 
     regional booked order queue according to the orders' shipping priorities,
     processes the orders, and enqueues the processed orders into the shipped 
     orders queues or the backorders queues. 
   */
}

Time Specification: Delay

Oracle Streams AQ supports delay delivery of messages by letting the enqueuer specify a delay interval on a message when enqueuing the message, that is, the time before that a message cannot be retrieved by a dequeue call. The delay interval determines when an enqueued message is marked as available to the dequeuers after the message is enqueued.

When a message is enqueued with a delay time set, the message is marked in a WAIT state. Messages in WAIT state are masked from the default dequeue calls. A background time-manager daemon wakes up periodically, scans an internal index for all WAIT state messages, and marks messages as READY if their delay time has passed. The time-manager then posts to all foreground processes that are waiting on queues for messages that have just been made available.


Scenario

In the BooksOnLine application, delay can be used to implement deferred billing. A billing application can define a queue where shipped orders that are not billed immediately can be placed in a deferred billing queue with a delay. For example, a certain class of customer accounts, such as those of corporate customers, may not be billed for 15 days. The billing application dequeues incoming shipped order messages (from the shippedorders queue) and if the order is for a corporate customer, this order is enqueued into a deferred billing queue with a delay.


PL/SQL (DBMS_AQADM Package): Example Code
/* Enqueue an order to implement deferred billing so that the order is not made 
   visible again until delay has expired: */
CREATE OR REPLACE PROCEDURE defer_billing(deferred_billing_order order_typ) 
AS 
  defer_bill_queue_name    VARCHAR2(62); 
  enqopt                   dbms_aq.enqueue_options_t; 
  msgprop                  dbms_aq.message_properties_t; 
  enq_msgid                RAW(16); 
BEGIN 
 
/* Enqueue the order into the deferred billing queue with a delay of 15 days: */ 
  defer_bill_queue_name := 'CBADM.deferbilling_que'; 
  msgprop.delay := 15*60*60*24; 
  dbms_aq.enqueue(defer_bill_queue_name, enqopt, msgprop,
                  deferred_billing_order, enq_msgid); 
END; 
/ 

Visual Basic (OO4O): Example Code
set oraAq = OraDatabase.CreateAQ("CBADM.deferbilling_que")
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
   Set OraOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")

   Private Sub defer_billing

   OraMsg = OraOrder
   OraMsg.delay = 15*60*60*24
   OraMsg = OraOrder 'OraOrder contains the order details
   Msgid = OraAq.enqueue

   End Sub

Java (JDBC): Example Code
public static void defer_billing(Connection db_conn, Order deferred_order)
{
    AQSession         aq_sess;
    AQQueue           def_bill_q;
    AQEnqueueOption   enq_option;
    AQMessageProperty m_property;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    byte[]            enq_msg_id;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        def_bill_q = aq_sess.getQueue("CBADM", "deferbilling_que");

        message = def_bill_q.createMessage();

        /* Enqueue the order into the deferred billing queue with a delay 
           of 15 days */ 
        m_property = message.getMessageProperty();
        m_property.setDelay(15*60*60*24);

        obj_payload = message.getObjectPayload();
        obj_payload.setPayloadData(deferred_order);

        enq_option = new AQEnqueueOption();

        /* Enqueue the message */
        enq_msg_id = def_bill_q.enqueue(enq_option, message);

        db_conn.commit();
    }
    catch (Exception ex)
    {
        System.out.println("Exception " + ex);
    }

}

Time Specification: Expiration

Messages can be enqueued with an expiration that specifies the interval of time the message is available for dequeuing. Expiration processing requires that the queue monitor be running. The producer can also specify the time when a message expires, at which time the message is moved to an exception queue.


Scenario

In the BooksOnLine application, expiration can be used to control the amount of time that is allowed to process a back order. The shipping application places orders for books that are not available in a back order queue. If the shipping policy is that all back_order must be shipped within a week, then messages can be enqueued into the back order queue with an expiration of 1 week. In this case, any back_order that are not processed within one week are moved to the exception queue with the message state set to EXPIRED. This can be used to flag any orders that have not been shipped according to the back order shipping policy.


PL/SQL (DBMS_AQADM Package): Example Code
CONNECT BOLADM/BOLADM 
/* Re-enqueue a backorder into a backorder queue and set a delay of 7 days; 
   all backorders must be processed in 7 days or they are moved to the 
   exception queue: */ 
CREATE OR REPLACE PROCEDURE requeue_back_order(sale_region varchar2,
                                               backorder order_typ) 
AS 
  back_order_queue_name    VARCHAR2(62); 
  enqopt                   dbms_aq.enqueue_options_t; 
  msgprop                  dbms_aq.message_properties_t; 
  enq_msgid                RAW(16); 
BEGIN 
  /* Look up a backorder queue based the the region by means of a directory 
     service: */
  IF sale_region = 'WEST' THEN 
    back_order_queue_name := 'WS.WS_backorders_que';
  ELSIF sale_region = 'EAST' THEN 
    back_order_queue_name := 'ES.ES_backorders_que';
  ELSE 
    back_order_queue_name := 'TS.TS_backorders_que';
  END IF; 
 
  /* Enqueue the order with expiration set to 7 days: */ 
  msgprop.expiration := 7*60*60*24; 
  dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop,
                  backorder, enq_msgid); 
END; 
/ 
 

Visual Basic (OO4O): Example Code
set oraAq1 = OraDatabase.CreateAQ("WS.WS_backorders_que")
   set oraAq2 = OraDatabase.CreateAQ("ES.ES_backorders_que")
   set oraAq3 = OraDatabase.CreateAQ("CBADM.deferbilling_que")
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
   Set OraBackOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")

Private Sub Requeue_backorder
   Dim q as oraobject
   If sale_region = WEST then
      q = oraAq1
   else if sale_region = EAST then
      q = oraAq2
   else 
      q = oraAq3
   end if

   OraMsg.delay = 7*60*60*24
   OraMsg = OraBackOrder 'OraOrder contains the order details
   Msgid = q.enqueue

End Sub


Java (JDBC): Example Code
/* Re-enqueue a backorder into a backorder queue and set a delay of 7 days; 
   all backorders must be processed in 7 days or they are moved to the 
   exception queue */ 
public static void requeue_back_order(Connection db_conn, 
                                      String sale_region, Order back_order)
{
    AQSession         aq_sess;
    AQQueue           back_order_q;
    AQEnqueueOption   enq_option;
    AQMessageProperty m_property;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    byte[]            enq_msg_id;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Look up a backorder queue based on the region */
        if(sale_region.equals("WEST"))
        {
            back_order_q = aq_sess.getQueue("WS", "WS_backorders_que");
        }
        else if(sale_region.equals("EAST"))
        {
            back_order_q = aq_sess.getQueue("ES", "ES_backorders_que");
        }
        else
        {
            back_order_q = aq_sess.getQueue("TS", "TS_backorders_que");
        }

        message = back_order_q.createMessage();

        m_property = message.getMessageProperty();

        /* Enqueue the order with expiration set to 7 days: */ 
        m_property.setExpiration(7*60*60*24);

        obj_payload = message.getObjectPayload();
        obj_payload.setPayloadData(back_order);

        enq_option = new AQEnqueueOption();

        /* Enqueue the message */
        enq_msg_id = back_order_q.enqueue(enq_option, message);

        db_conn.commit();
    }
    catch (Exception ex)
    {
        System.out.println("Exception :" + ex); 
    }
}

Message Grouping

Messages belonging to one queue can be grouped to form a set that can only be consumed by one user at a time. This requires that the queue be created in a queue table that is enabled for transactional message grouping. All messages belonging to a group must be created in the same transaction and all messages created in one transaction belong to the same group. With this feature, you can segment complex messages into simple messages.

For example, messages directed to a queue containing invoices can be constructed as a group of messages starting with the header message, followed by messages representing details, followed by the trailer message. Message grouping is also useful if the message payload contains complex large objects such as images and video that can be segmented into smaller objects.

The general message properties (priority, delay, expiration) for the messages in a group are determined solely by the message properties specified for the first message (head) of the group, irrespective of which properties are specified for subsequent messages in the group.

The message grouping property is preserved across propagation. However, the destination queue where messages must be propagated must also be enabled for transactional grouping. There are also some restrictions you must keep in mind if the message grouping property is to be preserved while dequeuing messages from a queue enabled for transactional grouping.


Scenario

In the BooksOnLine application, message grouping can be used to handle new orders. Each order contains a number of books ordered one by one in succession. Items ordered over the Web exhibit similar action.

In the following example, each enqueue corresponds to an individual book that is part of an order and the group/transaction represents a complete order. Only the first enqueue contains customer information. The OE_neworders_que is stored in the table OE_orders_sqtab,which has been enabled for transactional grouping. Refer to the example code for descriptions of procedures new_order_enq() and same_order_enq().


Note:

Mixed case (upper and lower case together) queue names, queue table names, and subscriber names are supported if database compatibility is 10.0, but the names must be enclosed in double quote marks. So abc.efg means the schema is ABC and the name is EFG, but "abc"."efg" means the schema is abc and the name is efg.


PL/SQL (DBMS_AQADM Package): Example Code
connect OE/OE; 
 
/* Create queue table for OE:  */
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( \
        queue_table        => 'OE_orders_sqtab',\
        comment            => 'Order Entry Single-Consumer Orders queue table',\
        queue_payload_type => 'BOLADM.order_typ',\
        message_grouping   => DBMS_AQADM.TRANSACTIONAL, \
        compatible         => '8.1',  \
        primary_instance   => 1,\
        secondary_instance => 2); 
 
/* Create neworders queue for OE: */ 
EXECUTE DBMS_AQADM.CREATE_QUEUE ( \
        queue_name         => 'OE_neworders_que', 
        queue_table        => 'OE_orders_sqtab'); 
 
/* Login into OE account:*/
CONNECT OE/OE; 
SET serveroutput on; 
/* Enqueue some orders using message grouping into OE_neworders_que,
   First Order Group: */
EXECUTE BOLADM.new_order_enq('My First   Book', 1, 1001, 'CA'); 
EXECUTE BOLADM.same_order_enq('My Second  Book', 2); 
COMMIT; 
/ 
/* Second Order Group: */ 
EXECUTE BOLADM.new_order_enq('My Third   Book', 1, 1002, 'WA'); 
COMMIT; 
/ 
/* Third Order Group: */ 
EXECUTE BOLADM.new_order_enq('My Fourth  Book', 1, 1003, 'NV'); 
EXECUTE BOLADM.same_order_enq('My Fifth   Book', 3); 
EXECUTE BOLADM.same_order_enq('My Sixth   Book', 2); 
COMMIT; 
/ 
/* Fourth Order Group: */
EXECUTE BOLADM.new_order_enq('My Seventh Book', 1, 1004, 'MA'); 
EXECUTE BOLADM.same_order_enq('My Eighth  Book', 3); 
EXECUTE BOLADM.same_order_enq('My Ninth   Book', 2); 
COMMIT; 
/

Visual Basic (OO4O): Example Code

This functionality is currently not available.


Java (JDBC): Example Code
public static void createMsgGroupQueueTable(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTable         sq_table;
    AQQueueProperty      q_prop;
    AQQueue              neworders_q;

    try
    {

        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Create a single-consumer orders queue table */
        sqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        sqt_prop.setComment("Order Entry Single-Consumer Orders queue table");
        sqt_prop.setCompatible("8.1");
        sqt_prop.setMessageGrouping(AQQueueTableProperty.TRANSACTIONAL);

        sq_table = aq_sess.createQueueTable("OE", "OE_orders_sqtab", sqt_prop);

        /* Create new orders queue for OE */
        q_prop = new AQQueueProperty();

        neworders_q = aq_sess.createQueue(sq_table, "OE_neworders_que", 
                                          q_prop);

    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

Message Transformation During Enqueue

Continuing the scenario introduced in "Message Format Transformation", the Order Entry and Shipping applications have different representations for the order item:

  • The Order Entry application represents the order item in the form of the Oracle object type OE.order_typ.

  • The Western Region Shipping application represents the order item in the form of the Oracle object type WS.order_typ_sh.

Therefore, the queues in the OE schema are of payload type OE.orders_typ and those in the WS schema are of payload type WS.orders_typ_sh.

Message transformation can be used during enqueue. This is especially useful for verification and transformation of messages during enqueue. An application can generate a message based on its own data model. The message can be transformed to the data type of the queue before it is enqueued using transformation mapping.


Scenario

At enqueue time, assume that instead of propagating messages from the OE_booked_orders_topic, an application dequeues the order, and, if it is meant for Western Region Shipping, publishes it to the WS_booked_orders_topic.


PL/SQL (DBMS_AQ Package): Example Code

The application can use transformations at enqueue time as follows:

CREATE OR REPLACE FUNCTION
   fwd_message_to_ws_shipping(booked_order  OE.order_typ)
    RETURNS boolean AS

  enq_opt   dbms_aq.enqueue_options_t;
  msg_prp   dbms_aq.message_properties_t;
 BEGIN

  IF (booked_order.order_region = 'WESTERN' and
      booked_order.order_type != 'RUSH') THEN
    enq_opt.transformation := 'OE.OE2WS';
    msg_prp.recipient_list(0) := aq$_agent('West_shipping', null, null);

    dbms_aq.enqueue('WS.ws_bookedorders_topic',
                     enq_opt, msg_prp, booked_order);

    RETURN true;
  ELSE
     RETURN false;
  END IF;
 END;

Visual Basic (OO4O): Example Code

No example is provided with this release.


Java (JDBC): Example Code

No example is provided with this release.

Enqueue Using the Oracle Streams AQ XML Servlet

You can perform enqueue requests over the Internet using Internet Data Access Presentation (IDAP).


See Also:

Chapter 17, " Internet Access to Oracle Streams AQ" for more information on sending Oracle Streams AQ requests using IDAP


Scenario

In the BooksOnLine application, a customer can request:

  • FedEx shipping (priority 1),

  • Priority air shipping (priority 2)

  • Regular ground shipping (priority 3)

The Order Entry application uses a priority queue to store booked orders. Booked orders are propagated to the regional booked orders queues. At each region, orders in these regional booked orders queues are processed in the order of the shipping priorities.

The following calls create the priority queues for the Order Entry application.


PL/SQL (DBMS_AQADM Package): Example Code
/* Create a priority queue table for OE: */ 
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( \
   queue_table         => 'OE_orders_pr_mqtab', \
   sort_list           =>'priority,enq_time', \ 
   comment             => 'Order Entry Priority  \
                          MultiConsumer Orders queue table',\ 
   multiple_consumers  => TRUE, \
   queue_payload_type  => 'BOLADM.order_typ', \
   compatible          => '8.1', \
   primary_instance    => 2, \
   secondary_instance  => 1); 
 
EXECUTE DBMS_AQADM.CREATE_QUEUE ( \
   queue_name          => 'OE_bookedorders_que', \
   queue_table         => 'OE_orders_pr_mqtab'); 

Assume that a customer, John, wants to send an enqueue request using Simple Object Access Protocol (SOAP). The XML message has the following format:

<?xml version="1.0"?>
   <Envelope xmlns= "http://schemas.xmlsoap.org/soap/envelope/">
      <Body>
        <AQXmlSend xmlns = "http://ns.oracle.com/AQ/schemas/access">
          <producer_options>
            <destination>OE.OE_bookedorders_que</destination>
          </producer_options>
 
          <message_set>
            <message_count>1</message_count>        
 
            <message>
              <message_number>1</message_number>
              <message_header>
                <correlation>ORDER1</correlation>
<priority>1</priority>
 <sender_id>
   <agent_name>john</agent_name>
         </sender_id>
              </message_header>

              <message_payload>

               <ORDER_TYP>
                     <ORDERNO>100</ORDERNO>
                     <STATUS>NEW</STATUS>
                     <ORDERTYPE>URGENT</ORDERTYPE>
                     <ORDERREGION>EAST</ORDERREGION>
                     <CUSTOMER>
                        <CUSTNO>1001233</CUSTNO>
                        <CUSTID>JOHN</CUSTID>
                        <NAME>JOHN DASH</NAME>
                        <STREET>100 EXPRESS STREET</STREET>
                        <CITY>REDWOOD CITY</CITY>
                        <STATE>CA</STATE>
                        <ZIP>94065</ZIP>
                        <COUNTRY>USA</COUNTRY>
                     </CUSTOMER>
                     <PAYMENTMETHOD>CREDIT</PAYMENTMETHOD>
                     <ITEMS>
                        <ITEMS_ITEM>
                           <QUANTITY>10</QUANTITY>
                           <ITEM>
                              <TITLE>Perl handbook</TITLE>
                              <AUTHORS>Randal</AUTHORS>
                              <ISBN>345620200</ISBN>
                              <PRICE>19</PRICE>
                           </ITEM>
                           <SUBTOTAL>190</SUBTOTAL>
                        </ITEMS_ITEM>
                        <ITEMS_ITEM>
                           <QUANTITY>10</QUANTITY>
                           <ITEM>
                              <TITLE>JDBC guide</TITLE>
                              <AUTHORS>Taylor</AUTHORS>
                              <ISBN>123420212</ISBN>
                              <PRICE>59</PRICE>
                           </ITEM>
                           <SUBTOTAL>590</SUBTOTAL>
                        </ITEMS_ITEM>
                     </ITEMS>
                     <CCNUMBER>NUMBER01</CCNUMBER>
                     <ORDER_DATE>08/23/2000 12:45:00</ORDER_DATE>
               </ORDER_TYP>
             </message_payload>
            </message>
          </message_set>

          <AQXmlCommit/>
        </AQXmlSend> 
      </Body>
</Envelope>

Dequeue Features

When there are multiple processes dequeuing from a single consumer queue or dequeuing for a single consumer on the multiconsumer queue, different processes skip the messages that are being worked on by a concurrent process. This allows multiple processes to work concurrently on different messages for the same consumer.

This section contains these topics:

Dequeue Methods

A message can be dequeued using one of the following dequeue methods:

  • Correlation identifier

  • Message identifier

  • Dequeue condition

  • Default dequeue

A correlation identifier is a user-defined message property (of VARCHAR2 datatype) while a message identifier is a system-assigned value (of RAW datatype). Multiple messages with the same correlation identifier can be present in a queue, while only one message with a given message identifier can be present. If there are multiple messages with the same correlation identifier, then the ordering (enqueue order) between messages may not be preserved on dequeue calls. The correlation identifier cannot be changed between successive dequeue calls without specifying the FIRST_MESSAGE navigation option.

A dequeue condition is an expression that is similar in syntax to the WHERE clause of a SQL query. Dequeue conditions are expressed in terms of the attributes that represent message properties or message content. The messages in the queue are evaluated against the conditions and a message that satisfies the given condition is returned.

A default dequeue means that the first available message for the consumer of a multiconsumer queue or the first available message in a single-consumer queue is dequeued.

Dequeuing with correlation identifier, message identifier, or dequeue condition does not preserve the message grouping property.


Scenario

In the BooksOnLine example, rush orders received by the East shipping site are processed first. This is achieved by dequeuing the message using the correlation identifier, which has been defined to contain the order type (rush/normal). For an illustration of dequeuing using a message identifier, refer to the get_northamerican_orders procedure discussed in the example under "Modes of Dequeuing ".


PL/SQL (DBMS_AQADM Package): Example Code
CONNECT boladm/boladm; 
 
/*  Create procedures to dequeue RUSH orders */ 
create or replace procedure get_rushtitles(consumer in varchar2) as 

deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
qname                    varchar2(30); 
no_messages              exception; 
pragma exception_init    (no_messages, -25228); 
new_orders               BOOLEAN := TRUE; 

begin 

        dopt.consumer_name := consumer; 
        dopt.wait := 1; 
        dopt.correlation := 'RUSH'; 

        IF (consumer = 'West_Shipping') THEN 
                qname := 'WS.WS_bookedorders_que'; 
        ELSIF (consumer = 'East_Shipping') THEN 
                qname := 'ES.ES_bookedorders_que'; 
        ELSE 
                qname := 'TS.TS_bookedorders_que'; 
        END IF; 

        WHILE (new_orders) LOOP 
          BEGIN 
            dbms_aq.dequeue( 
                queue_name => qname, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => deq_order_data, 
                msgid => deq_msgid); 
            commit; 

            deq_item_data := deq_order_data.items(1); 
            deq_book_data := deq_item_data.item; 

            dbms_output.put_line(' rushorder book_title: ' ||
                                deq_book_data.title ||
                        ' quantity: ' || deq_item_data.quantity); 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE RUSH TITLES ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 

end; 
/ 
 
CONNECT EXECUTE on get_rushtitles to ES; 
 
/* Dequeue the orders: */ 
CONNECT ES/ES; 
 
/*  Dequeue all rush order titles for East_Shipping: */ 
EXECUTE BOLADM.get_rushtitles('East_Shipping'); 

Visual Basic (OO4O): Example Code
set oraAq1 = OraDatabase.CreateAQ("WS.WS_backorders_que")
   set oraAq2 = OraDatabase.CreateAQ("ES.ES_backorders_que")
   set oraAq3 = OraDatabase.CreateAQ("CBADM.deferbilling_que")
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
   Set OraBackOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")

Private Sub Requeue_backorder
   Dim q as oraobject
   If sale_region = WEST then
      q = oraAq1
   else if sale_region = EAST then
      q = oraAq2
   else 
      q = oraAq3
   end if

   OraMsg.delay = 7*60*60*24
   OraMsg = OraBackOrder 'OraOrder contains the order details
   Msgid = q.enqueue

End Sub

Java (JDBC): Example Code
public static void getRushTitles(Connection db_conn, String consumer)
{
    AQSession         aq_sess;
    Order             deq_order;
    byte[]            deq_msgid;
    AQDequeueOption   deq_option;
    AQMessageProperty msg_prop;
    AQQueue           bookedorders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    boolean           new_orders = true;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        deq_option = new AQDequeueOption();

        deq_option.setConsumerName(consumer);
        deq_option.setWaitTime(1);
        deq_option.setCorrelation("RUSH");

        if(consumer.equals("West_Shipping"))
        {
            bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que");
        }
        else if(consumer.equals("East_Shipping"))
        {
            bookedorders_q = aq_sess.getQueue("ES", "ES_bookedorders_que");
        }
        else
        {
            bookedorders_q = aq_sess.getQueue("TS", "TS_bookedorders_que"); 
        }

        while(new_orders)
        {
            try
            {
              /* Dequeue the message */
              message = bookedorders_q.dequeue(deq_option, Order.getFactory());

              obj_payload = message.getObjectPayload();

              deq_order = (Order)(obj_payload.getPayloadData());

              System.out.println("Order number " + deq_order.getOrderno() + 
                                 " is a rush order");

            }
            catch (AQException aqex)
            {
              new_orders = false;
              System.out.println("No more rush titles");
              System.out.println("Exception-1: " + aqex);
            }
        }
    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }

}

Multiple Recipients

A consumer can dequeue a message from a multiconsumer, usual queue by supplying the name that was used in the AQ$_AGENT type of the DBMS_AQADM.ADD_SUBSCRIBER procedure or the recipient list of the message properties.

  • In PL/SQL the consumer name is supplied using the consumer_name field of the dequeue_options_t record.

  • In OCI the consumer name is supplied using the OCISetAttr procedure to specify a text string as the OCI_ATTR_CONSUMER_NAME of an OCI_DTYPE_AQDEQ_OPTIONS descriptor.

  • In Oracle Objects for OLE (OO4O), the consumer name is supplied by setting the consumer property of the OraAQ object.

Multiple processes or operating system threads can use the same consumer_name to dequeue concurrently from a queue. In that case Oracle Streams AQ provides the first unlocked message that is at the head of the queue and is intended for the consumer. Unless the message ID of a specific message is specified during dequeue, the consumers can dequeue messages that are in the READY state.

A message is considered PROCESSED only when all intended consumers have successfully dequeued the message. A message is considered EXPIRED if one or more consumers did not dequeue the message before the EXPIRATION time. When a message has expired, it is moved to an exception queue.

The exception queue must also be a multiconsumer queue. Expired messages from multiconsumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE mode exactly once by specifying a NULL consumer name in the dequeue options. Hence, from a dequeue perspective, multiconsumer exception queues act like single-consumer queues because each expired message can be dequeued only once using a NULL consumer name. Expired messages can be dequeued only by specifying a message ID if the multiconsumer exception queue was created in a queue table with the compatible parameter set to '8.0'.

Beginning with release 8.1.6, only the queue monitor removes messages from multiconsumer queues. This allows dequeuers to complete the dequeue operation by not locking the message in the queue table. Because the queue monitor removes messages that have been processed by all consumers from multiconsumer queues approximately once every minute, users can see a delay when the messages have been completely processed and when they are physically removed from the queue.

Local and Remote Recipients

Consumers of a message in multiconsumer queues (either by virtue of being a subscriber to the queue or because the consumer was a recipient in the enqueuer's recipient list) can be local or remote.

  • A local consumer dequeues the message from the same queue into which the producer enqueued the message. Local consumers have a non-NULL NAME and NULL ADDRESS and PROTOCOL field in the AQ$_AGENT type.

  • A remote consumer dequeues from a queue that is different from the queue where the message was enqueued. As such, users must be familiar with and use the Oracle Streams AQ propagation feature to use remote consumers. Remote consumers can fall into one of three categories:

    1. The ADDRESS field refers to a queue in the same database. In this case the consumer dequeues the message from a different queue in the same database. These addresses are of the form [schema].queue_name where queue_name (optionally qualified by the schema name) is the target queue. If the schema is not specified, then the schema of the current user executing the ADD_SUBSCRIBER procedure or the enqueue is used. Use the DBMS_AQADM.SCHEDULE_PROPAGATION command with a NULL destination (which is the default) to schedule propagation to such remote consumers.

    2. The ADDRESS field refers to a queue in a different database. In this case the database must be reachable using database links and the PROTOCOL must be either NULL or 0. These addresses are of the form [schema].queue_name@dblink. If the schema is not specified, then the schema of the current user executing the ADD_SUBSCRIBER procedure or the enqueue is used. If the database link is not a fully qualified name (does not have a domain name specified), then the default domain as specified by the db_domain init.ora parameter is used. Use the DBMS_AQADM.SCHEDULE_PROPAGATION procedure with the database link as the destination to schedule the propagation. Oracle Streams AQ does not support the use of synonyms to refer to queues or database links.

    3. The ADDRESS field refers to a destination that can be reached by a third party protocol. You must refer to the documentation of the third party software to determine how to specify the ADDRESS and the PROTOCOL database link, and on how to schedule propagation.

When a consumer is remote, a message is marked as PROCESSED in the source queue immediately after the message has been propagated, even though the consumer may not have dequeued the message at the remote queue. Similarly, when a propagated message expires at the remote queue, the message is moved to the DEFAULT exception queue of the remote queue's queue table, and not to the exception queue of the local queue. As can be seen in both cases, Oracle Streams AQ does not currently propagate the exceptions to the source queue. You can use the MSGID and the ORIGINAL_MSGID columns in the queue table view (AQ$queue_table) to chain the propagated messages. When a message with message ID m1 is propagated to a remote queue, m1 is stored in the ORIGINAL_MSGID column of the remote queue.

The DELAY, EXPIRATION and PRIORITY parameters apply identically to both local and remote consumers. Oracle Streams AQ accounts for any delay in propagation by adjusting the DELAY and EXPIRATION parameters accordingly. For example, if the EXPIRATION is set to one hour, and the message is propagated after 15 minutes, then the expiration at the remote queue is set to 45 minutes.

Because the database handles message propagation, OO4O does not differentiate between remote and local recipients. The same sequence of calls/steps are required to dequeue a message for local and remote recipients.

Message Navigation in Dequeue

You have several options for selecting a message from a queue. You can select the first message. Alternatively, once you have selected a message and established its position in the queue (for example, as the fourth message), you can then retrieve the next message.

The FIRST_MESSAGE navigation option performs a SELECT on the queue. The NEXT_MESSAGE navigation option fetches from the results of the SELECT run in the FIRST_MESSAGE navigation. Thus performance is optimized because subsequent dequeues need not run the entire SELECT again.

These selections work in a slightly different way if the queue is enabled for transactional grouping.

  • If FIRST_MESSAGE is requested, then the dequeue position is reset to the beginning of the queue.

  • If NEXT_MESSAGE is requested, then the position is set to the next message of the same transaction

  • If NEXT_TRANSACTION is requested, then the position is set to the first message of the next transaction.

The transaction grouping property is negated if a dequeue is performed in one of the following ways: dequeue by specifying a correlation identifier, dequeue by specifying a message identifier, or dequeuing some of the messages of a transaction and committing.

In navigating through the queue, if the program reaches the end of the queue while using the NEXT_MESSAGE or NEXT_TRANSACTION option, and you have specified a nonzero wait time, then the navigating position is automatically changed to the beginning of the queue. If a zero wait time is specified, then you can get an exception when the end of the queue is reached.


See Also:

"Dequeue Methods "


Scenario

The following scenario in the BooksOnLine example continues the message grouping example already discussed with regard to enqueuing.

The get_orders() procedure dequeues orders from the OE_neworders_que. Recall that each transaction refers to an order and each message corresponds to an individual book in the order. The get_orders() procedure loops through the messages to dequeue the book orders. It resets the position to the beginning of the queue using the FIRST_MESSAGE option before the first dequeues. It then uses the NEXT_MESSAGE navigation option to retrieve the next book (message) of an order (transaction). If it gets an error message indicating all messages in the current group/transaction have been fetched, then it changes the navigation option to NEXT_TRANSACTION and gets the first book of the next order. It then changes the navigation option back to NEXT_MESSAGE for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.


PL/SQL (DBMS_AQADM Package): Example Code
CONNECT boladm/boladm; 
 
create or replace procedure get_new_orders as 
 
deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
qname                    VARCHAR2(30); 
no_messages              exception; 
end_of_group             exception; 
pragma exception_init    (no_messages, -25228); 
pragma exception_init    (end_of_group, -25235); 
new_orders               BOOLEAN := TRUE; 

BEGIN 

        dopt.wait := 1; 
        dopt.navigation := DBMS_AQ.FIRST_MESSAGE;
        qname := 'OE.OE_neworders_que'; 
        WHILE (new_orders) LOOP 
          BEGIN 
            LOOP 
                BEGIN 
                    dbms_aq.dequeue( 
                        queue_name          => qname, 
                        dequeue_options     => dopt, 
                        message_properties  => mprop, 
                        payload             => deq_order_data, 
                        msgid               => deq_msgid); 
 
                    deq_item_data := deq_order_data.items(1); 
                    deq_book_data := deq_item_data.item; 
                    deq_cust_data := deq_order_data.customer; 

                    IF (deq_cust_data IS NOT NULL) THEN 
                      dbms_output.put_line(' **** NEXT ORDER **** ');
                      dbms_output.put_line('order_num: ' ||
                                deq_order_data.orderno); 
                      dbms_output.put_line('ship_state: ' ||
                                deq_cust_data.state); 
                    END IF; 
                    dbms_output.put_line(' ---- next book ---- ');
                    dbms_output.put_line(' book_title: ' ||
                                deq_book_data.title ||
                                ' quantity: ' || deq_item_data.quantity); 
                EXCEPTION 
                    WHEN end_of_group THEN 
                      dbms_output.put_line ('*** END OF ORDER ***'); 
                      commit; 
                      dopt.navigation := DBMS_AQ.NEXT_TRANSACTION; 
                END; 
            END LOOP; 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE NEW ORDERS ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 

END;
/

CONNECT EXECUTE ON get_new_orders to OE;

/*  Dequeue the orders: */
CONNECT OE/OE;
EXECUTE BOLADM.get_new_orders;
Visual Basic (OO4O): Example Code
Dim OraSession as object
Dim OraDatabase as object
Dim OraAq as object
Dim OraMsg as Object
Dim OraOrder,OraItemList,OraItem,OraBook,OraCustomer  as Object
Dim Msgid as String

   Set OraSession = CreateObject("OracleInProcServer.XOraSession")
   Set OraDatabase = OraSession.DbOpenDatabase("", "boladm/boladm", 0&)
   set oraAq = OraDatabase.CreateAQ("OE.OE_neworders_que")
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
       OraAq.wait = 1
   OraAq.Navigation = ORAAQ_DQ_FIRST_MESSAGE

private sub get_new_orders
   Dim MsgIsDequeued as Boolean
   On Error goto ErrHandler
   MsgIsDequeued = TRUE
       msgid = q.Dequeue
        if MsgIsDequeued then
      set OraOrder = OraMsg
      OraItemList = OraOrder("items")
      OraItem = OraItemList(1)
      OraBook = OraItem("item")
      OraCustomer = OraOrder("customer")

         ' Populate the textboxes with the values
      if( OraCustomer ) then
         if OraAq.Navigation <> ORAAQ_DQ_NEXT_MESSAGE then
            MsgBox " ********* NEXT ORDER *******"
         end if
         txt_book_orderno = OraOrder("orderno")
         txt_book_shipstate = OraCustomer("state")
      End if
      OraAq.Navigation = ORAAQ_DQ_NEXT_MESSAGE
      txt_book_title = OraBook("title")
      txt_book_qty  = OraItem("quantity")
    Else
      MsgBox " ********* END OF  ORDER *******"
   End if

ErrHandler:
   'Handle error case, like no message etc
   If OraDatabase.LastServerErr = 25228 then
      OraAq.Navigation = ORAAQ_DQ_NEXT_TRANSACTION
      MsgIsDequeued = FALSE
      Resume Next
   End If
   'Process other errors
end sub

Java (JDBC): Example Code

No example is provided with this release.

Modes of Dequeuing

A dequeue request can either view a message or delete a message.

  • To view a message, you can use the browse mode or locked mode.

  • To consume a message, you can use either the remove mode or remove with no data mode.

If a message is browsed, then it remains available for further processing. Similarly if a message is locked, then it remains available for further processing after the lock is released by performing a transaction commit or rollback. After a message is consumed, using either of the remove modes, it is no longer available for dequeue requests.

You can use the REMOVE mode to read a message and delete it. The message can be retained in the queue table based on the retention properties. When the REMOVE mode is specified, DEQ_TIME, DEQ_USER_ID, and DEQ_TXN_ID (as seen in the AQ$Queue_Table_Name view) are updated for the consumer that dequeued the message.[

When a message is dequeued using REMOVE_NODATA mode, the payload of the message is not retrieved. This mode can be useful when the user has already examined the message payload, possibly by means of a previous BROWSE dequeue. In this way, you can avoid the overhead of payload retrieval that can be substantial for large payloads.

A message is retained in the queue table after it has been consumed only if a retention time is specified for a queue. Messages cannot be retained in exception queues (refer to the section on exceptions for further information). Removing a message with no data is generally used if the payload is known (from a previous browse/locked mode dequeue call), or if the message will not be used.

After a message has been browsed, there is no guarantee that the message can be dequeued again, because a dequeue call from a concurrent user might have removed the message. To prevent a viewed message from being dequeued by a concurrent user, you should view the message in the locked mode.

In general, use care while using the browse mode. The dequeue position is automatically changed to the beginning of the queue if a nonzero wait time is specified and the navigating position reaches the end of the queue. Hence repeating a dequeue call in the browse mode with the NEXT_MESSAGE navigation option and a nonzero wait time can dequeue the same message over and over again. Oracle recommends that you use a nonzero wait time for the first dequeue call on a queue in a session, and then use a zero wait time with the NEXT_MESSAGE navigation option for subsequent dequeue calls. If a dequeue call gets an "end of queue" error message, then the dequeue position can be explicitly set by the dequeue call to the beginning of the queue using the FIRST_MESSAGE navigation option, following which the messages in the queue can be browsed again.


Scenario

In the following scenario from the BooksOnLine example, international orders destined to Mexico and Canada are to be processed separately due to trade policies and carrier discounts. Hence, a message is viewed in the locked mode (so no other concurrent user removes the message) and the customer country (message payload) is checked. If the customer country is Mexico or Canada, then the message is consumed (deleted from the queue) using REMOVE_NODATA (because the payload is already known). Otherwise, the lock on the message is released by the commit call. The remove dequeue call uses the message identifier obtained from the locked mode dequeue call. The shipping_bookedorder_deq (refer to the example code for the description of this procedure) call illustrates the use of the browse mode.


PL/SQL (DBMS_AQADM Package): Example Code
CONNECT boladm/boladm; 
 
create or replace procedure get_northamerican_orders as 

deq_cust_data            BOLADM.customer_typ; 
deq_book_data            BOLADM.book_typ; 
deq_item_data            BOLADM.orderitem_typ; 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           BOLADM.order_typ; 
deq_order_nodata         BOLADM.order_typ; 
qname                    VARCHAR2(30); 
no_messages              exception; 
pragma exception_init    (no_messages, -25228); 
new_orders               BOOLEAN := TRUE; 

begin 

        dopt.consumer_name := consumer; 
        dopt.wait := DBMS_AQ.NO_WAIT; 
        dopt.navigation := dbms_aq.FIRST_MESSAGE; 
        dopt.dequeue_mode := DBMS_AQ.LOCKED; 

        qname := 'TS.TS_bookedorders_que'; 

        WHILE (new_orders) LOOP 
          BEGIN 
            dbms_aq.dequeue( 
                queue_name => qname, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => deq_order_data, 
                msgid => deq_msgid); 
 
            deq_item_data := deq_order_data.items(1); 
            deq_book_data := deq_item_data.item; 
            deq_cust_data := deq_order_data.customer; 

            IF (deq_cust_data.country = 'Canada' OR
                deq_cust_data.country = 'Mexico' ) THEN 

                dopt.dequeue_mode := dbms_aq.REMOVE_NODATA; 
                dopt.msgid := deq_msgid; 
                dbms_aq.dequeue( 
                        queue_name => qname, 
                        dequeue_options => dopt, 
                        message_properties => mprop, 
                        payload => deq_order_nodata, 
                        msgid => deq_msgid); 
                commit; 

                dbms_output.put_line(' **** next booked order **** ');
                dbms_output.put_line('order_no: ' || deq_order_data.orderno ||
                        ' book_title: ' || deq_book_data.title ||
                        ' quantity: ' || deq_item_data.quantity); 
                dbms_output.put_line('ship_state: ' || deq_cust_data.state || 
                        ' ship_country: ' || deq_cust_data.country || 
                        ' ship_order_type: ' || deq_order_data.ordertype); 

            END IF; 

            commit; 
            dopt.dequeue_mode := DBMS_AQ.LOCKED; 
            dopt.msgid := NULL; 
            dopt.navigation := dbms_aq.NEXT_MESSAGE; 
          EXCEPTION 
            WHEN no_messages THEN 
                 dbms_output.put_line (' ---- NO MORE BOOKED ORDERS ---- '); 
                 new_orders := FALSE; 
          END; 
        END LOOP; 

end; 
/ 
 
CONNECT EXECUTE on get_northamerican_orders to TS; 
 
CONNECT ES/ES; 
 
/*  Browse all booked orders for East_Shipping: */ 
EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.BROWSE); 
 
CONNECT TS/TS; 
 
/*  Dequeue all international North American orders for Overseas_Shipping: */ 
EXECUTE BOLADM.get_northamerican_orders; 

Visual Basic (OO4O): Example Code

OO4O supports all the modes of dequeuing described earlier. Possible values include:

  • ORAAQ_DQ_BROWSE (1) - Do not lock when dequeuing

  • ORAAQ_DQ_LOCKED (2) - Read and obtain a write lock on the message

  • ORAAQ_DQ_REMOVE (3)(Default) -Read the message and update or delete it.

Dim OraSession as object
Dim OraDatabase as object
Dim OraAq as object
Dim OraMsg as Object
Dim OraOrder,OraItemList,OraItem,OraBook,OraCustomer  as Object
Dim Msgid as String

   Set OraSession = CreateObject("OracleInProcServer.XOraSession")
   Set OraDatabase = OraSession.DbOpenDatabase("", "boladm/boladm", 0&)
   set oraAq = OraDatabase.CreateAQ("OE.OE_neworders_que")
       OraAq.DequeueMode = ORAAQ_DQ_BROWSE


Java (JDBC): Example Code
public static void get_northamerican_orders(Connection db_conn)
{

    AQSession         aq_sess;
    Order             deq_order;
    Customer          deq_cust;
    String            cust_country;
    byte[]            deq_msgid;
    AQDequeueOption   deq_option;
    AQMessageProperty msg_prop;
    AQQueue           bookedorders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    boolean           new_orders = true;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        deq_option = new AQDequeueOption();

        deq_option.setConsumerName("Overseas_Shipping");
        deq_option.setWaitTime(AQDequeueOption.WAIT_NONE);
        deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE);
        deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED);

        bookedorders_q = aq_sess.getQueue("TS", "TS_bookedorders_que");

        while(new_orders)
        {
            try
            {
              /* Dequeue the message - browse with lock */
              message = bookedorders_q.dequeue(deq_option, Order.getFactory());

              obj_payload = message.getObjectPayload();

              deq_msgid = message.getMessageId();
              deq_order = (Order)(obj_payload.getPayloadData());

              deq_cust = deq_order.getCustomer();

              cust_country = deq_cust.getCountry();

              if(cust_country.equals("Canada") ||
                 cust_country.equals("Mexico"))
              {
                deq_option.setDequeueMode(
                                  AQDequeueOption.DEQUEUE_REMOVE_NODATA);
                deq_option.setMessageId(deq_msgid);


                /* Delete the message */
                bookedorders_q.dequeue(deq_option, Order.getFactory());

                System.out.println("----  next booked order ------");
                System.out.println("Order no: " + deq_order.getOrderno());
                System.out.println("Ship state: " + deq_cust.getState());
                System.out.println("Ship country: " + deq_cust.getCountry());
                System.out.println("Order type: " + deq_order.getOrdertype());

              }

              db_conn.commit();

              deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_LOCKED);
              deq_option.setMessageId(null);
              deq_option.setNavigationMode(
                                    AQDequeueOption.NAVIGATION_NEXT_MESSAGE); 
            }
            catch (AQException aqex)
            {
              new_orders = false;
              System.out.println("--- No more booked orders ----");
              System.out.println("Exception-1: " + aqex);
            }
        }

    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }

}

Optimization of Waiting for Arrival of Messages

Oracle Streams AQ allows applications to block on one or more queues waiting for the arrival of either a newly enqueued message or for a message that becomes ready. You can use the DEQUEUE operation to wait for the arrival of a message in a queue or the LISTEN operation to wait for the arrival of a message in more than one queue.

When the blocking DEQUEUE call returns, it returns the message properties and the message payload. By contrast, when the blocking LISTEN call returns, it discloses only the name of the queue where a message has arrived. A subsequent DEQUEUE operation is needed to dequeue the message.

Applications can optionally specify a timeout of zero or more seconds to indicate the time that Oracle Streams AQ must wait for the arrival of a message. The default is to wait forever until a message arrives in the queue. This optimization is important in two ways. It removes the burden of continually polling for messages from the application. And it saves CPU and network resources, because the application remains blocked until a new message is enqueued or becomes READY after its DELAY time. Applications can also perform a blocking dequeue on exception queues to wait for arrival of EXPIRED messages.

A process or thread that is blocked on a dequeue is either awakened directly by the enqueuer if the new message has no DELAY or is awakened by the queue monitor process when the DELAY or EXPIRATION time has passed. Applications cannot only wait for the arrival of a message in the queue that an enqueuer enqueues a message, but also on a remote queue, if propagation has been scheduled to the remote queue using DBMS_AQADM.SCHEDULE_PROPAGATION. In this case, the Oracle Streams AQ propagator wakes up the blocked dequeuer after a message has been propagated.


Scenario

In the BooksOnLine example, the get_rushtitles procedure discussed under dequeue methods specifies a wait time of 1 second in the dequeue_options argument for the dequeue call. Wait time can be specified in different ways as illustrated in the following code.

  • If the wait time is specified as 10 seconds, then the dequeue call is blocked with a timeout of 10 seconds until a message is available in the queue. This means that if there are no messages in the queue after 10 seconds, the dequeue call returns without a message. Predefined constants can also be assigned for the wait time.

  • If the wait time is specified as DBMS_AQ.NO_WAIT, then a wait time of 0 seconds is implemented. The dequeue call in this case returns immediately even if there are no messages in the queue.

  • If the wait time is specified as DBMS_AQ.FOREVER, then the dequeue call is blocked without a timeout until a message is available in the queue.


PL/SQL (DBMS_AQADM Package): Example Code
/* dopt is a variable of type dbms_aq.dequeue_options_t. 
   Set the dequeue wait time to 10 seconds: */
dopt.wait := 10; 
 
/* Set the dequeue wait time to 0 seconds: */
dopt.wait := DBMS_AQ.NO_WAIT; 
 
/* Set the dequeue wait time to infinite (forever): */ 
dopt.wait := DBMS_AQ.FOREVER; 

Visual Basic (OO4O): Example Code

OO4O supports asynchronous dequeuing of messages. First, the monitor is started for a particular queue. When messages that fulfil the user criteria are dequeued, the user's callback object is notified.


Java (JDBC): Example Code

AQDequeueOption deq-opt;

deq-opt = new AQDequeueOption ();

Retry with Delay Interval

If the transaction dequeuing the message from a queue fails, then it is regarded as an unsuccessful attempt to consume the message. Oracle Streams AQ records the number of failed attempts to consume the message in the message history. Applications can query the RETRY_COUNT column of the queue table view to find out the number of unsuccessful attempts on a message. In addition, Oracle Streams AQ allows the application to specify, at the queue level, the maximum number of retries for messages in the queue. If the number of failed attempts to remove a message exceeds this number, then the message is moved to the exception queue and is no longer available to applications.


Note:

If a dequeue transaction fails because the server process dies (including ALTER SYSTEM KILL SESSION) or SHUTDOWN ABORT on the instance, then RETRY_COUNT is not incremented.


Retry Delay

A bad condition can cause the transaction receiving a message to end. Oracle Streams AQ allows users to hide the bad message for a prespecified interval. A retry_delay can be specified along with maximum retries. This means that a message that has had a failed attempt is visible in the queue for dequeue after the retry_delay interval. Until then it is in the WAITING state. In the Oracle Streams AQ background process, the time manager enforces the retry delay property. The default value for maximum retries is 5. The default value for retry delay is 0. Maximum retries and retry delay are not available with 8.0-compatible multiconsumer queues.


PL/SQL (DBMS_AQADM Package): Example Code
/*  Create a package that enqueue with delay set to one day: /*
      CONNECT BOLADM/BOLADM
     >
      /* queue has max retries = 4 and retry delay = 12 hours */
      EXECUTE DBMS_AQADM.ALTER_QUEUE(queue_name = 'WS.WS_BOOKED_ORDERS_QUE',
      max_retr
      ies = 4,
                                     retry_delay = 3600*12);
     >
      /* processes the next order available in the booked_order_queue */
      CREATE OR REPLACE PROCEDURE  process_next_order()
      AS
        dqqopt                   dbms_aq.dequeue_options_t;
        msgprop                  dbms_aq.message_properties_t;
        deq_msgid                RAW(16);
        book                     BOLADM.book_typ;
        item                     BOLADM.orderitem_typ;
        BOLADM.order_typ         order;
      BEGIN
     >
        dqqopt.dequeue_option := DBMS_AQ.FIRST_MESSAGE;
        dbms_aq.dequeue('WS.WS_BOOKED_ORDERS_QUEUE', dqqopt, msgprop, order,
      deq_msgid
      );
     >
        /* For simplicity, assume order has a single item */
       item = order.items(1);
        book = the_orders.item;
     >
        /* assume search_inventory searches inventory for the book */
        /* if we don't find the book in the warehouse, terminate transaction */
        IF  (search_inventory(book) != TRUE)
            rollback;
        ELSE
           process_order(order);
        END IF;
     >
      END;
      /

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.


Java (JDBC): Example Code
public static void setup_queue(Connection db_conn)
{
    AQSession         aq_sess;
    AQQueue           bookedorders_q;
    AQQueueProperty   q_prop;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que");

        /* Alter queue - set max retries = 4 and retry delay = 12 hours */
        q_prop = new AQQueueProperty();
        q_prop.setMaxRetries(4);

        q_prop.setRetryInterval(3600*12);  // specified in seconds

        bookedorders_q.alterQueue(q_prop);


    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex); 
    }

}
public static void process_next_order(Connection db_conn)
{
    AQSession         aq_sess;
    Order             deq_order;
    OrderItem         order_item;
    Book              book;
    AQDequeueOption   deq_option;
    AQMessageProperty msg_prop;
    AQQueue           bookedorders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        deq_option = new AQDequeueOption();

        deq_option.setNavigationMode(AQDequeueOption.NAVIGATION_FIRST_MESSAGE);

        bookedorders_q = aq_sess.getQueue("WS", "WS_bookedorders_que");


        /* Dequeue the message */
        message = bookedorders_q.dequeue(deq_option, Order.getFactory());

        obj_payload = message.getObjectPayload();

        deq_order = (Order)(obj_payload.getPayloadData());

        /* For simplicity, assume order has a single item */
        order_item = deq_order.getItems().getElement(0);
        book = order_item.getItem(); 

        /* assume search_inventory searches inventory for the book 
         * if we don't find the book in the warehouse, terminate transaction 
         */
        if(search_inventory(book) != true)
          db_conn.rollback();
        else
          process_order(deq_order);

    }
    catch (AQException aqex)
    {
        System.out.println("Exception-1: " + aqex);
    }
    catch (Exception ex)
    {
        System.out.println("Exception-2: " + ex);
    }
}

Exception Handling

Oracle Streams AQ provides four integrated mechanisms to support exception handling in applications:

An exception_queue is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. Also, a multiconsumer exception queue cannot have subscribers associated with it. However, an application that intends to handle these expired or unserviceable messages can dequeue from the exception queue.

When a message has expired, it is moved to an exception queue. The exception queue for a message in a multiconsumer queue should be created in a multiconsumer queue table. However, the exception queue always acts like a single-consumer queue. You cannot add subscribers to an exception queue. The consumer name specified while dequeuing should be null.

Like any other queue, the exception queue must be enabled for dequeue using the DBMS_AQADM.START_QUEUE procedure. You get an Oracle Streams AQ error if you try to enable an exception queue for enqueue.

Expired messages from multiconsumer queues cannot be dequeued by the intended recipients of the message. However, they can be dequeued in the REMOVE mode exactly once by specifying a NULL consumer name in the dequeue options. Hence, from a dequeue perspective multiconsumer exception queues act like single-consumer queues, because each expired message can be dequeued only once using a NULL consumer name. Messages can also be dequeued from the exception queue by specifying the message ID.

The exception queue is a message property that can be specified during enqueue time. In PL/SQL users can use the exception_queue attribute of the DBMS_AQ.MESSAGE_PROPERTIES_T record to specify the exception queue. In OCI users can use the OCISetAttr procedure to set the OCI_ATTR_EXCEPTION_QUEUE attribute of the OCIAQMsgProperties descriptor.

If an exception queue is not specified, then the default exception queue is used. If the queue is created in a queue table, for example, QTAB, then the default exception queue is called AQ$_QTAB_E. The default exception queue is automatically created when the queue table is created. Messages are moved to the exception queues by Oracle Streams AQ under the following conditions:


Note:

If a dequeue transaction fails because the server process dies (including ALTER SYSTEM KILL SESSION) or SHUTDOWN ABORT on the instance, then RETRY_COUNT is not incremented.

Messages intended for 8.1-compatible multiconsumer queues cannot be dequeued by the intended recipients once the messages have been moved to an exception queue. These messages should instead be dequeued in the REMOVE or BROWSE mode exactly once by specifying a NULL consumer name in the dequeue options. The messages can also be dequeued by their message IDs.

Messages intended for single consumer queues, or for 8.0-compatible multiconsumer queues, can only be dequeued by their message IDs once the messages have been moved to an exception queue.

Users can associate a RETRY_DELAY with a queue. The default value for this parameter is 0, meaning that the message is available for dequeue immediately after the RETRY_COUNT is incremented. Otherwise the message is unavailable for RETRY_DELAY seconds. After RETRY_DELAY seconds, the queue monitor marks the message as READY.

For a multiconsumer queue, RETRY_DELAY is for each subscriber.


Scenario

In the BooksOnLine application, the business rule for each shipping region is that an order is placed in a back order queue if the order cannot be filled immediately. The back order application tries to fill the order once a day. If the order cannot be filled within 5 days, then it is placed in an exception queue for special processing. You can implement this process by making use of the retry and exception handling features in Oracle Streams AQ.

The following example shows how you can create a queue with specific maximum retry and retry delay interval.


PL/SQL (DBMS_AQADM Package): Example Code
/* Example for creating a backorder queue in Western Region which allows a 
   maximum of 5 retries and 1 day delay between each retry. */
CONNECT BOLADM/BOLADM 
BEGIN 
  DBMS_AQADM.CREATE_QUEUE ( 
        queue_name              => 'WS.WS_backorders_que', 
        queue_table             => 'WS.WS_orders_mqtab', 
        max_retries             => 5, 
        retry_delay             => 60*60*24); 
END; 
/ 
 
/* Create an exception queue for the backorder queue for Western Region. */
CONNECT BOLADM/BOLADM 
BEGIN 
  DBMS_AQADM.CREATE_QUEUE ( 
        queue_name              => 'WS.WS_backorders_excpt_que', 
        queue_table             => 'WS.WS_orders_mqtab', 
        queue_type              => DBMS_AQADM.EXCEPTION_QUEUE); 
end; 
/ 
 
/* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que as the exception queue for the message: */ 
CONNECT BOLADM/BOLADM 
CREATE OR REPLACE PROCEDURE enqueue_WS_unfilled_order(backorder order_typ) 
 AS 
   back_order_queue_name    varchar2(62); 
   enqopt                   dbms_aq.enqueue_options_t; 
   msgprop                  dbms_aq.message_properties_t; 
   enq_msgid                raw(16); 
 BEGIN 

   /* Set backorder queue name for this message: */ 
   back_order_queue_name := 'WS.WS_backorders_que'; 
 
   /* Set exception queue name for this message: */ 
   msgprop.exception_queue := 'WS.WS_backorders_excpt_que'; 
 
   dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop, 
                   backorder, enq_msgid); 
 END; 
 / 

Visual Basic (OO4O): Example Code

The exception queue is a message property that can be provided at the time of enqueuing a message. If this property is not set, then the default exception queue of the queue is used for any error conditions.

set oraAq = OraDatabase.CreateAQ("CBADM.deferbilling_que")
Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
Set OraOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")
OraMsg = OraOrder
   OraMsg.delay = 15*60*60*24
   OraMsg.ExceptionQueue = "WS.WS_backorders_que"
   'Fill up the order values
   OraMsg = OraOrder 'OraOrder contains the order details
   Msgid = OraAq.enqueue

Java (JDBC): Example Code
public static void createBackOrderQueues(Connection db_conn)
{
    AQSession         aq_sess;
    AQQueue           backorders_q;
    AQQueue           backorders_excp_q;
    AQQueueProperty   q_prop;
    AQQueueProperty   q_prop2;
    AQQueueTable      mq_table;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        mq_table = aq_sess.getQueueTable("WS", "WS_orders_mqtab");


        /* Create a backorder queue in Western Region which allows a 
           maximum of 5 retries and 1 day delay between each retry. */

        q_prop = new AQQueueProperty();
        q_prop.setMaxRetries(5);
        q_prop.setRetryInterval(60*24*24);

        backorders_q = aq_sess.createQueue(mq_table, "WS_backorders_que", 
                                          q_prop);

        backorders_q.start(true, true);

        /* Create an exception queue for the backorder queue for 
           Western Region. */
        q_prop2 = new AQQueueProperty();
        q_prop2.setQueueType(AQQueueProperty.EXCEPTION_QUEUE);

        backorders_excp_q = aq_sess.createQueue(mq_table, 
                                          "WS_backorders_excpt_que", q_prop2);

    }
    catch (Exception ex)
    {
        System.out.println("Exception " + ex);
    }

}

/* Enqueue a message to WS_backorders_que and specify WS_backorders_excpt_que 
   as the exception queue for the message: */ 
public static void enqueue_WS_unfilled_order(Connection db_conn, 
                                             Order back_order)
{
    AQSession         aq_sess;
    AQQueue           back_order_q;
    AQEnqueueOption   enq_option;
    AQMessageProperty m_property;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    byte[]            enq_msg_id;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        back_order_q = aq_sess.getQueue("WS", "WS_backorders_que");

        message = back_order_q.createMessage();

        /* Set exception queue name for this message: */ 
        m_property = message.getMessageProperty();

        m_property.setExceptionQueue("WS.WS_backorders_excpt_que");

        obj_payload = message.getObjectPayload();
        obj_payload.setPayloadData(back_order);

        enq_option = new AQEnqueueOption();

        /* Enqueue the message */
        enq_msg_id = back_order_q.enqueue(enq_option, message);

        db_conn.commit();
    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex); 
    }

}

Rule-Based Subscription

Messages can be routed to various recipients based on message properties or message content. Users define a rule-based subscription for a given queue to specify interest in receiving messages that meet particular conditions.

Rules are Boolean expressions that evaluate to TRUE or FALSE. Similar in syntax to the WHERE clause of a SQL query, rules are expressed in terms of the attributes that represent message properties or message content. These subscriber rules are evaluated against incoming messages and those rules that match are used to determine message recipients. This feature thus supports the notions of content-based subscriptions and content-based routing of messages.

Subscription rules can also be defined on an attribute of type XMLType using XML operators such as ExistsNode.


Scenario

For the BooksOnLine application, we illustrate how rule-based subscriptions are used to implement a publish/subscribe paradigm utilizing content-based subscription and content-based routing of messages. The interaction between the Order Entry application and each of the Shipping Applications is modeled as follows:

  • Western Region Shipping handles orders for the Western Region of the U.S.

  • Eastern Region Shipping handles orders for the Eastern Region of the U.S.

  • Overseas Shipping handles all non-U.S. orders.

  • Overseas Shipping checks for the XMLType attribute to identify special handling.

  • Eastern Region Shipping also handles all U.S. rush orders.

Each shipping application subscribes to the OE booked orders queue. The following rule-based subscriptions are defined by the Order Entry user to handle the routing of booked orders from the Order Entry application to each of the Shipping applications.


PL/SQL (DBMS_AQADM Package): Example Code
CONNECT OE/OE; 

Western Region Shipping defines an agent called 'West_Shipping' with the WS booked orders queue as the agent address (destination queue where messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on order region and ordertype attributes.

/*  Add a rule-based subscriber for West Shipping - 
    West Shipping handles Western Region U.S. orders, 
    Rush Western Region orders are handled by Eastern Shipping: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('West_Shipping', 'WS.WS_bookedorders_que', null); 
  DBMS_AQADM.ADD_SUBSCRIBER( 
                queue_name => 'OE.OE_bookedorders_que', 
                subscriber => subscriber, 
                rule       => 'tab.user_data.orderregion =
                    ''WESTERN'' AND tab.user_data.ordertype != ''RUSH'''); 
END; 
/ 

Eastern Region Shipping defines an agent called East_Shipping with the ES booked orders queue as the agent address (the destination queue where messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on orderregion, ordertype and customer attributes.

/*  Add a rule-based subscriber for Eastern Shipping - 
    Eastern Shipping handles all Eastern Region orders, 
    Eastern Shipping also handles all U.S. rush orders: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); 
  DBMS_AQADM.ADD_SUBSCRIBER( 
        queue_name => 'OE.OE_bookedorders_que', 
        subscriber => subscriber, 
        rule       => 'tab.user_data.orderregion = ''EASTERN'' OR
                      (tab.user_data.ordertype = ''RUSH'' AND
                       tab.user_data.customer.country = ''USA'') '); 
END; 

Overseas Shipping defines an agent called Overseas_Shipping with the TS booked orders queue as the agent address (destination queue to which messages must be delivered). This agent subscribes to the OE booked orders queue using a rule specified on the orderregion attribute. Because the representation of orders at the Overseas Shipping site is different from the representation of orders at the Order Entry site, a transformation is applied before messages are propagated from the Order Entry site to the Overseas Shipping site.

/*  Add a rule-based subscriber (for Overseas Shipping) to the Booked orders 
queues with Transformation. Overseas Shipping handles all non-US orders: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('Overseas_Shipping','TS.TS_bookedorders_que',null); 

  DBMS_AQADM.ADD_SUBSCRIBER( 
        queue_name     => 'OE.OE_bookedorders_que', 
        subscriber     => subscriber, 
        rule           => 'tab.user_data.orderregion = ''INTERNATIONAL''',
        transformation => 'TS.OE2XML'); 
END; 

Assume that the Overseas Shipping site has a subscriber, Overseas_DHL, for handling RUSH orders. Because TS_bookedorders_que has the order details represented as an XMLType, the rule uses XPath syntax.

DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('Overseas_DHL', null, null); 

  DBMS_AQADM.ADD_SUBSCRIBER( 
        queue_name     => 'TS.TS_bookedorders_que', 
        subscriber     => subscriber, 
        rule           => 'tab.user_data.extract(''/ORDER_TYP/ORDERTYPE/
                             text()'').getStringVal()=''RUSH''');

END; 

Visual Basic (OO4O): Example Code

This functionality is currently not available.


Java (JDBC): Example Code
public static void addRuleBasedSubscribers(Connection db_conn)
{

    AQSession         aq_sess;
    AQQueue           bookedorders_q;
    String            rule;
    AQAgent           agt1, agt2, agt3;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        bookedorders_q = aq_sess.getQueue("OE", "OE_booked_orders_que");

        /* Add a rule-based subscriber for West Shipping - 
           West Shipping handles Western region U.S. orders, 
           Rush Western region orders are handled by East Shipping: */ 

        agt1 = new AQAgent("West_Shipping", "WS.WS_bookedorders_que"); 

        rule = "tab.user_data.orderregion = 'WESTERN' AND " +
               "tab.user_data.ordertype != 'RUSH'";

        bookedorders_q.addSubscriber(agt1, rule);

        /*  Add a rule-based subscriber for Eastern Shipping - 
            Eastern Shipping handles all Eastern Region orders, 
            Eastern Shipping also handles all U.S. rush orders: */ 

        agt2 = new AQAgent("East_Shipping", "ES.ES_bookedorders_que"); 
        rule = "tab.user_data.orderregion = 'EASTERN' OR " +
               "(tab.user_data.ordertype = 'RUSH' AND " + 
               "tab.user_data.customer.country = 'USA')"; 

        bookedorders_q.addSubscriber(agt2, rule);

        /*  Add a rule-based subscriber for Overseas Shipping 
            Intl Shipping handles all non-U.S. orders: */ 

        agt3 = new AQAgent("Overseas_Shipping", "TS.TS_bookedorders_que");
        rule = "tab.user_data.orderregion = 'INTERNATIONAL'";

        bookedorders_q.addSubscriber(agt3, rule);
    }
    catch (Exception ex)
    {
        System.out.println("Exception: " + ex);
    }
} 

Listen Capability

Oracle Streams AQ can monitor multiple queues for messages with a single call, LISTEN. An application can use LISTEN to wait for messages for multiple subscriptions. It can also be used by gateway applications to monitor multiple queues. If the LISTEN call returns successfully, then a dequeue must be used to retrieve the message.

Without the LISTEN call, an application which sought to dequeue from a set of queues would continuously poll the queues to determine if there were a message. Alternatively, you could design your application to have a separate dequeue process for each queue. However, if there are long periods with no traffic in any of the queues, then these approaches create unacceptable overhead. The LISTEN call is well suited for such applications.

When there are messages for multiple agents in the agent list, LISTEN returns with the first agent for whom there is a message. In that sense LISTEN is not 'fair' in monitoring the queues. The application designer must keep this in mind when using the call. To prevent one agent from 'starving' other agents for messages, the application can change the order of the agents in the agent list.


Scenario

In the customer service component of the BooksOnLine example, messages from different databases arrive in the customer service queues, indicating the state of the message. The customer service application monitors the queues and whenever there is a message about a customer order, it updates the order status in the order_status_table. The application uses the LISTEN call to monitor the different queues. Whenever there is a message in any of the queues, it dequeues the message and updates the order status accordingly.


PL/SQL (DBMS_AQADM Package): Example Code
CODE (in tkaqdocd.sql) 
 
/* Update the status of the order in the order status table: */ 
CREATE OR REPLACE PROCEDURE update_status( 
                                new_status    IN VARCHAR2, 
                                order_msg     IN BOLADM.ORDER_TYP) 
IS 
 old_status    VARCHAR2(30); 
 dummy         NUMBER; 
BEGIN 
 
  BEGIN
    /* Query old status from the table: */ 
    SELECT st.status INTO old_status FROM order_status_table st
       WHERE st.customer_order.orderno = order_msg.orderno; 
 
  /* Status can be 'BOOKED_ORDER', 'SHIPPED_ORDER', 'BACK_ORDER' 
     and   'BILLED_ORDER': */ 
 
   IF new_status = 'SHIPPED_ORDER' THEN
      IF old_status = 'BILLED_ORDER' THEN 
        return;             /* message about a previous state */ 
      END IF; 
   ELSIF new_status = 'BACK_ORDER' THEN 
      IF old_status = 'SHIPPED_ORDER' OR old_status = 'BILLED_ORDER' THEN 
        return;             /* message about a previous state */ 
      END IF; 
   END IF; 
 
   /* Update the order status:  */ 
     UPDATE order_status_table st 
        SET st.customer_order = order_msg, st.status = new_status; 
 
   COMMIT; 
 
  EXCEPTION 
  WHEN OTHERS  THEN     /* change to no data found */ 
    /* First update for the order: */ 
    INSERT INTO order_status_table(customer_order, status) 
    VALUES (order_msg, new_status); 
    COMMIT; 
 
  END; 
END; 
/ 
 
 
/* Dequeues message from 'QUEUE' for 'CONSUMER': */ 
CREATE OR REPLACE PROCEDURE DEQUEUE_MESSAGE( 
                         queue      IN   VARCHAR2, 
                         consumer   IN   VARCHAR2, 
                         message    OUT  BOLADM.order_typ) 
IS 

dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_msgid                RAW(16); 
BEGIN 
  dopt.dequeue_mode := dbms_aq.REMOVE; 
  dopt.navigation := dbms_aq.FIRST_MESSAGE; 
  dopt.consumer_name := consumer; 
 
  dbms_aq.dequeue( 
                queue_name => queue, 
                dequeue_options => dopt, 
                message_properties => mprop, 
                payload => message, 
                msgid => deq_msgid); 
  commit; 
END; 
/ 
 
/* Monitor the queues in the customer service databse for 'time' seconds: */ 
 CREATE OR REPLACE PROCEDURE MONITOR_STATUS_QUEUE(time  IN  NUMBER)
IS 
  agent_w_message   aq$_agent; 
  agent_list        dbms_aq.agent_list_t; 
  wait_time         INTEGER := 120; 
  no_message        EXCEPTION; 
  pragma EXCEPTION_INIT(no_message, -25254); 
  order_msg         boladm.order_typ;
  new_status        VARCHAR2(30); 
  monitor           BOOLEAN := TRUE; 
  begin_time        NUMBER; 
  end_time          NUMBER; 
BEGIN 
 
 begin_time :=  dbms_utility.get_time;
 WHILE (monitor) 
 LOOP 
 BEGIN 
 
  /* Construct the waiters list: */ 
  agent_list(1) := aq$_agent('BILLED_ORDER', 'CS_billedorders_que', NULL); 
  agent_list(2) := aq$_agent('SHIPPED_ORDER', 'CS_shippedorders_que', 
NULL); 
  agent_list(3) := aq$_agent('BACK_ORDER', 'CS_backorders_que', NULL); 
  agent_list(4) := aq$_agent('Booked_ORDER', 'CS_bookedorders_que', NULL); 
 
   /* Wait for order status messages: */ 
   dbms_aq.listen(agent_list, wait_time, agent_w_message); 

   dbms_output.put_line('Agent' || agent_w_message.name || ' Address '|| 
agent_w_message.address); 
   /* Dequeue the message from the queue: */ 
   dequeue_message(agent_w_message.address, agent_w_message.name, order_msg); 
 
   /* Update the status of the order depending on the type of the message,
    * the name of the agent contains the new state: */ 
   update_status(agent_w_message.name, order_msg); 
 
  /* Exit if we have been working long enough: */ 
   end_time := dbms_utility.get_time; 
   IF  (end_time - begin_time > time)   THEN 
     EXIT; 
   END IF; 
 
  EXCEPTION 
  WHEN  no_message  THEN 
    dbms_output.put_line('No messages in the past 2 minutes'); 
       end_time := dbms_utility.get_time; 
    /* Exit if we have accomplished enough work: */ 
    IF  (end_time - begin_time > time)   THEN 
      EXIT; 
    END IF; 
  END; 

  END LOOP; 
END; 
/ 

Visual Basic (OO4O): Example Code

Feature not currently available.


Java (JDBC): Example Code
public static void monitor_status_queue(Connection db_conn)
{
    AQSession         aq_sess;
    AQAgent[]         agt_list = null;
    AQAgent           ret_agt  = null;
    Order             deq_order;
    AQDequeueOption   deq_option;
    AQQueue           orders_q;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    String            owner = null;
    String            queue_name = null;
    int               idx = 0;

    try
    {
        /* Create an AQ session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

/* Construct the waiters list: */ 
agt_list = new AQAgent[4];

agt_list[0] = new AQAgent("BILLED_ORDER", "CS_billedorders_que", 0);
agt_list[1] = new AQAgent("SHIPPED_ORDER", "CS_shippedorders_que", 0);
agt_list[2] = new AQAgent("BACK_ORDER", "CS_backorders_que", 0);
agt_list[3] = new AQAgent("BOOKED_ORDER", "CS_bookedorders_que", 0);

/* Wait for order status messages for 120 seconds: */
ret_agt = aq_sess.listen(agt_list, 120);

System.out.println("Message available for agent: " + 
   ret_agt.getName()  + "   "  + ret_agt.getAddress());

/* Get owner, queue where message is available */
idx = ret_agt.getAddress().indexOf(".");

if(idx != -1)
{
  owner = ret_agt.getAddress().substring(0, idx);
  queue_name = ret_agt.getAddress().substring(idx + 1);

/* Dequeue the message */
deq_option = new AQDequeueOption();

deq_option.setConsumerName(ret_agt.getName());
deq_option.setWaitTime(1);

orders_q = aq_sess.getQueue(owner, queue_name);

/* Dequeue the message */
message = orders_q.dequeue(deq_option, Order.getFactory());

obj_payload = message.getObjectPayload();

deq_order = (Order)(obj_payload.getPayloadData());

   System.out.println("Order number " + deq_order.getOrderno() + " retrieved");

   }
   catch (AQException aqex)
   {
   System.out.println("Exception-1: " + aqex);
   }
   catch (Exception ex)
   {
    System.out.println("Exception-2: " + ex);
    }

}

Message Transformation During Dequeue

Continuing the scenario introduced in "Message Format Transformation" and "Message Transformation During Enqueue", the queues in the OE schema are of payload type OE.orders_typ and the queues in the WS schema are of payload type WS.orders_typ_sh.


Scenario

At dequeue time, an application can move messages from OE_booked_orders_topic to the WS_booked_orders_topic by using a selection criteria on dequeue to dequeue only orders with order_region "WESTERN" and order_type not equal to "RUSH." At the same time, the transformation is applied and the order in the ws.order_typ_sh type is retrieved. Then the message is enqueued into the WS.ws_booked_orders queue.


PL/SQL (DBMS_AQ Package): Example Code
CREATE OR REPLACE PROCEDURE   fwd_message_to_ws_shipping AS

  enq_opt   dbms_aq.enqueue_options_t;
  deq_opt   dbms_aq.dequeue_options_t;
  msg_prp   dbms_aq.message_properties_t;
  booked_order WS.order_typ_sh;
BEGIN

/* First dequeue the message from OE booked orders topic: */
    deq_opt.transformation := 'OE.OE2WS';
    deq_opt.condition := 'tab.user_data.order_region = ''WESTERN'' and tab.user_data.order_type != ''RUSH''';

    dbms_aq.dequeue('OE.oe_bookedorders_topic', deq_opt,
                     msg_prp, booked_order);

/* Enqueue the message in the WS booked orders topic */
    msg_prp.recipient_list(0) := aq$_agent('West_shipping', null, null);

    dbms_aq.enqueue('WS.ws_bookedorders_topic',
                     enq_opt, msg_prp, booked_order);

END;

Visual Basic (OO4O): Example Code

No example is provided with this release.


Java (JDBC): Example Code

No example is provided with this release.

Dequeue Using the Oracle Streams AQ XML Servlet

You can perform dequeue requests over the Internet using SOAP.

In the BooksOnline scenario, assume that the Eastern Shipping application receives Oracle Streams AQ messages with a correlation identifier 'RUSH' over the Internet. The dequeue request has the following format:

<?xml version="1.0"?>
<Envelope xmlns= "http://schemas.xmlsoap.org/soap/envelope/">
      <Body>
        <AQXmlReceive xmlns = "http://ns.oracle.com/AQ/schemas/access">
          <consumer_options>
            <destination>ES_ES_bookedorders_que</destination>
            <consumer_name>East_Shipping</consumer_name>
            <wait_time>0</wait_time>
            <selector>
                 <correlation>RUSH</correlation>
            </selector>
          </consumer_options>

          <AQXmlCommit/>

        </AQXmlReceive>
      </Body>
</Envelope> 

Asynchronous Notifications

This feature allows clients to receive notifications for messages of interest. It supports multiple mechanisms to receive notifications. Clients can receive notifications procedurally using PL/SQL, Java Message Service (JMS), or OCI callback functions, or clients can receive notifications through e-mail or HTTP post.

For persistent queues, notifications contain only the message properties, except for JMS notifications. Clients explicitly dequeue to receive the message. In JMS, the dequeue is accomplished as part of the notifications and hence explicit dequeue is not required. For nonpersistent queues, the message is delivered as part of the notification.

Clients can also specify the presentation for notifications as either RAW or XML.


Scenario

In the BooksOnLine application, a customer can request Fed-Ex shipping (priority 1), priority air shipping (priority 2), or regular ground shipping (priority 3).

The shipping application then ships the orders according to the user's request. It is of interest to BooksOnLine to find out how many requests of each shipping type come in each day. The application uses asynchronous notification facility for this purpose. It registers for notification on the WS.WS_bookedorders_que. When it is notified of new message in the queue, it updates the count for the appropriate shipping type depending on the priority of the message.


Visual Basic (OO4O): Example Code

Refer to the Visual Basic online help, "Monitoring Messages".


Java (JDBC): Example Code

This feature is not supported by the Java API.


C (OCI): Example Code

This example illustrates the use of OCIRegister. At the shipping site, an OCI client program keeps track of how many orders were made for each of the shipping types, FEDEX, AIR and GROUND. The priority field of the message enables us to determine the type of shipping wanted.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>
#ifdef WIN32COMMON
#define sleep(x)   Sleep(1000*(x))
#endif
static text *username = (text *) "WS";
static text *password = (text *) "WS";

static OCIEnv *envhp;
static OCIServer *srvhp;
static OCIError *errhp;
static OCISvcCtx *svchp;

static void checkerr(/*_ OCIError *errhp, sword status _*/);

struct ship_data
{
  ub4  fedex;
  ub4  air;
  ub4  ground;
};

typedef struct ship_data ship_data;

int main(/*_ int argc, char *argv[] _*/);


/* Notify callback: */
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp;
dvoid *pay;
ub4    payl;
dvoid *desc;
ub4    mode;
{
 text                *subname;
 ub4                 size;
 ship_data           *ship_stats = (ship_data *)ctx;
 text                *queue;
 text                *consumer;
 OCIRaw              *msgid;
 ub4                 priority;
 OCIAQMsgProperties  *msgprop;

 OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
                             (dvoid *)&subname, &size,
                             OCI_ATTR_SUBSCR_NAME, errhp);

 /* Extract the attributes from the AQ descriptor.
    Queue name: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, 
            OCI_ATTR_QUEUE_NAME, errhp);

 /* Consumer name: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, 
            OCI_ATTR_CONSUMER_NAME, errhp);

 /* Msgid: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, 
            OCI_ATTR_NFY_MSGID, errhp);

 /* Message properties: */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, 
            OCI_ATTR_MSG_PROP, errhp);

 /* Get priority from message properties: */
 checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 
                             (dvoid *)&priority, 0, 
                             OCI_ATTR_PRIORITY, errhp));

  switch (priority)
  {
  case 1:  ship_stats->fedex++;
           break;
  case 2: ship_stats->air++;
           break;
  case 3:  ship_stats->ground++;
           break;
  default: 
           printf(" Error priority %d", priority);
  }
}


int main(argc, argv)
int argc;
char *argv[];
{
  OCISession *authp = (OCISession *) 0;
  OCISubscription *subscrhp[8];
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;
  ship_data  ctx = {0,0,0};
  ub4 sleep_time = 0;

  printf("Initializing OCI Process\n");

  /* Initialize OCI environment with OCI_EVENTS flag set: */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );

  printf("Initialization successful\n");

  printf("Initializing OCI Env\n");
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 );
  printf("Initialization successful\n");

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_ERROR, 
                   (size_t) 0, (dvoid **) 0));

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_SERVER,
                   (size_t) 0, (dvoid **) 0));

  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_SVCCTX,
                   (size_t) 0, (dvoid **) 0));


  printf("connecting to server\n");
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias",
           strlen("inst1_alias"), (ub4) OCI_DEFAULT));
  printf("connect successful\n");

  /* Set attribute server context in the service context: */
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, 
                    (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp));

  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
                       (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0));
 
  /* Set username and password in the session handle: */
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                  (dvoid *) username, (ub4) strlen((char *)username),
                  (ub4) OCI_ATTR_USERNAME, errhp));
 
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                  (dvoid *) password, (ub4) strlen((char *)password),
                  (ub4) OCI_ATTR_PASSWORD, errhp));

  /* Begin session: */
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS, 
                          (ub4) OCI_DEFAULT));

  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                   (dvoid *) authp, (ub4) 0,
                   (ub4) OCI_ATTR_SESSION, errhp);


  /* Register for notification: */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], 
                        (ub4) OCI_HTYPE_SUBSCRIPTION,
                        (size_t) 0, (dvoid **) 0);
 
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS", 
                 (ub4) strlen("WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
 
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx, (ub4)sizeof(ctx),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

  printf("Registering \n");
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 1, errhp, 
                                          OCI_DEFAULT));

  sleep_time = (ub4)atoi(argv[1]);
  printf ("waiting for %d s", sleep_time);
  sleep(sleep_time);

  printf("Exiting");
  exit(0);
}

void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
  text errbuf[512];
  sb4 errcode = 0;

  switch (status)
  {
  case OCI_SUCCESS:
    break;
  case OCI_SUCCESS_WITH_INFO:
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n");
    break;
  case OCI_NEED_DATA:
    (void) printf("Error - OCI_NEED_DATA\n");
    break;
  case OCI_NO_DATA:
    (void) printf("Error - OCI_NODATA\n");
    break;
  case OCI_ERROR:
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode,
                        errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR);
    (void) printf("Error - %.*s\n", 512, errbuf);
    break;
  case OCI_INVALID_HANDLE:
    (void) printf("Error - OCI_INVALID_HANDLE\n");
    break;
  case OCI_STILL_EXECUTING:
    (void) printf("Error - OCI_STILL_EXECUTE\n");
    break;
  case OCI_CONTINUE:
    (void) printf("Error - OCI_CONTINUE\n");
    break;
  default:
    break;
  }
}

PL/SQL (DBMS_AQ package): Example Code

This example illustrates the use of the DBMS_AQ.REGISTER procedure.

In the BooksOnline scenario, assume that we want a PL/SQL callback WS.notifyCB() to be invoked when the subscriber BOOKED_ORDER receives a message in the WS.WS_BOOKED_ORDERS_QUE queue. In addition, we want to send an e-mail to john@company.com when an order is enqueued in the queue for subscriber BOOKED_ORDERS. Also assume that we want to invoke the servlet http://xyz.company.com/servlets/NofifyServlet. This can be accomplished as follows:

First define a PL/SQL procedure that is invoked on notification.

connect ws/ws;
set echo on;
set serveroutput on;

-- notifyCB callback 
create or replace procedure notifyCB(
  context raw, reginfo sys.aq$_reg_info, descr sys.aq$_descriptor,
  payload raw, payloadl number)
AS
  dequeue_options   DBMS_AQ.dequeue_options_t;
  message_properies DBMS_AQ.message_properties_t;
  message_handle    RAW(16);
  message           BOLADM.order_typ;
BEGIN
  -- get the consumer name and msg_id from the descriptor
  dequeue_options.msgid := descr.msg_id;
  dequeue_options.consumer_name := descr.consumer_name;

  -- Dequeue the message
  DBMS_AQ.DEQUEUE(queue_name => descr.queue_name,
                  dequeue_options => dequeue_options,
                  message_properties => message_properties,
                  payload => message,
                  msgid => message_handle);

  commit;

  DBMS_OUTPUT.PUTLINE('Received Order: ' || message.orderno);

END;
/

The PL/SQL procedure, e-mail address, and HTTP URL can be registered as follows:

connect ws/ws; 
set echo on; 
set serveroutput on; 

DECLARE 
  reginfo1     sys.aq$_reg_info; 
  reginfo2     sys.aq$_reg_info; 
  reginfo3     sys.aq$_reg_info; 
  reginfolist  sys.aq$_reg_info_list; 

BEGIN 
   -- register for the pl/sql procedure notifyCB to be called on notification 
  reginfo1 := sys.aq$_reg_info('WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS', 
                     DBMS_AQ.NAMESPACE_AQ, 'plsql://WS.notifyCB', 
                     HEXTORAW('FF')); 

  -- register for an e-mail to be sent to john@company.com on notification 
  reginfo2 := sys.aq$_reg_info('WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS', 
                          DBMS_AQ.NAMESPACE_AQ, 'mailto://john@company.com', 
                            HEXTORAW('FF')); 

  -- register for an HTTP servlet to be invoked for notification 
  reginfo3 := sys.aq$_reg_info('WS.WS_BOOKEDORDERS_QUE:BOOKED_ORDERS', 
                          DBMS_AQ.NAMESPACE_AQ, 
                          'http://xyz.oracle.com/servlets/NotifyServlet', 
                            HEXTORAW('FF')); 

  -- Create the registration information list 
  reginfolist := sys.aq$_reg_info_list(reginfo1); 
  reginfolist.EXTEND; 
  reginfolist(2) := reginfo2; 

  reginfolist.EXTEND; 
  reginfolist(3) := reginfo3; 

-- do the registration 
  sys.dbms_aq.register(reginfolist, 3); 

END; 

Registering for Notifications Using the Oracle Streams AQ XML Servlet

Clients can register for Oracle Streams AQ notifications over the Internet.

The register request has the following format:

?xml version="1.0"?>
<Envelope xmlns= "http://schemas.xmlsoap.org/soap/envelope/">
      <Body>

        <AQXmlRegister xmlns = "http://ns.oracle.com/AQ/schemas/access">

          <register_options>
            <destination>WS.WS_BOOKEDORDERS_QUE</destination>
            <consumer_name>BOOKED_ORDERS</consumer_name>
            <notify_url>mailto://john@company.com</notify_url>
          </register_options>

          <AQXmlCommit/>
        </AQXmlRegister>
      </Body>
</Envelope>

The e-mail notification sent to john@company.com has the following format:

<?xml version="1.0"?>
<Envelope xmlns="http://www.oracle.com/schemas/IDAP/envelope">
    <Body>
        <AQXmlNotification xmlns="http://www.oracle.com/schemas/AQ/access">
            <notification_options>
                <destination>WS.WS_BOOKEDORDERS_QUE</destination>
            </notification_options>
            <message_set>
                <message>
                    <message_header>
                       <message_id>81128B6AC46D4B15E03408002092AA15</message_id>
                       <correlation>RUSH</correlation>
                       <priority>1</priority>
                       <delivery_count>0</delivery_count>
                       <sender_id>
                            <agent_name>john</agent_name>
                       </sender_id>
                       <message_state>0</message_state>
                    </message_header>
                </message>
            </message_set>
        </AQXmlNotification>
    </Body>
</Envelope>

Propagation Features

In this section, the following topics are discussed:

Propagation Overview

This feature allows applications to communicate with each other without being connected to the same database or to the same queue. Messages can be propagated from one queue to another. The destination queue can be located in the same database or in a remote database. Propagation is performed by job queue background processes. Propagation to the remote queue uses database links over Oracle Net Services or HTTP(S).

The propagation feature is used as follows. First one or more subscribers are defined for the queue from which messages are to be propagated. Second, a schedule is defined for each destination where messages are to be propagated from the queue. Enqueued messages are propagated and automatically available for dequeuing at the destination queues.

For propagation over the Internet, you must specify the remote Internet user in the database link. The remote Internet user must have privileges to enqueue in the destination queue.

Two or more job_queue background processes must be running to use propagation. This is in addition to the number of job_queue background processes needed for handling non-propagation related jobs. Also, if you want to deploy remote propagation, then you must ensure that the database link specified for the schedule is valid and have proper privileges for enqueuing into the destination queue.


See Also:

"Propagation Scheduling " for more information about the administrative commands for managing propagation schedules

Propagation also has mechanisms for handling failure. For example, if the database link specified is invalid, then the appropriate error message is reported.

Finally, propagation provides detailed statistics about the messages propagated and the schedule itself. This information can be used to properly tune the schedules for best performance.


See Also:

"Enhanced Propagation Scheduling Capabilities " for a discussion of the failure handling and error reporting facilities of propagation and propagation statistics

Propagation Scheduling

A propagation schedule is defined for a pair of source and destination database links. If a queue has messages to be propagated to several queues, then a schedule must be defined for each of the destination queues. A schedule indicates the time frame during which messages can be propagated from the source queue. This time frame can depend on a number of factors such as network traffic, load at source database, load at destination database, and so on. The schedule therefore must be tailored for the specific source and destination. When a schedule is created, a job is automatically submitted to the job_queue facility to handle propagation.

The administrative calls for propagation scheduling provide flexibility for managing the schedules. The duration or propagation window parameter of a schedule specifies the time frame during which propagation must take place. If the duration is unspecified, then the time frame is an infinite single window. If a window must be repeated periodically, then a finite duration is specified along with a next_time function that defines the periodic interval between successive windows.

The propagation schedules defined for a queue can be changed or dropped at any time during the life of the queue. In addition there are calls for temporarily disabling a schedule (instead of dropping the schedule) and enabling a disabled schedule. A schedule is active when messages are being propagated in that schedule. All the administrative calls can be made irrespective of whether the schedule is active or not. If a schedule is active, then it takes a few seconds for the calls to be executed.


Scenario

In the BooksOnLine example, messages in the OE_bookedorders_que are propagated to different shipping sites. The following example code illustrates the various administrative calls available for specifying and managing schedules. It also shows the calls for enqueuing messages into the source queue and for dequeuing the messages at the destination site. The catalog view USER_QUEUE_SCHEDULES provides all information relevant to a schedule.


PL/SQL (DBMS_AQADM Package): Example Code
CONNECT OE/OE; 
 
/* Schedule Propagation from bookedorders_que to shipping: */ 
EXECUTE DBMS_AQADM.SCHEDULE_PROPAGATION( \
   queue_name      => 'OE.OE_bookedorders_que'); 
 
/* Check if a schedule has been created: */ 
SELECT * FROM user_queue_schedules; 
 
/* Enqueue some orders into OE_bookedorders_que: */ 
EXECUTE BOLADM.order_enq('My First   Book', 1, 1001, 'CA', 'USA', \
   'WESTERN', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Second  Book', 2, 1002, 'NY', 'USA', \
   'EASTERN', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Third   Book', 3, 1003, '',   'Canada', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Fourth  Book', 4, 1004, 'NV', 'USA', \
   'WESTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Fifth   Book', 5, 1005, 'MA', 'USA', \
   'EASTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Sixth   Book', 6, 1006, ''  , 'UK', \ 
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Seventh Book', 7, 1007, '',   'Canada', \
   'INTERNATIONAL', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Eighth  Book', 8, 1008, '',   'Mexico', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Ninth   Book', 9, 1009, 'CA', 'USA', \
   'WESTERN', 'RUSH'); 
EXECUTE BOLADM.order_enq('My Tenth   Book', 8, 1010, ''  , 'UK', \
   'INTERNATIONAL', 'NORMAL'); 
EXECUTE BOLADM.order_enq('My Last    Book', 7, 1011, ''  , 'Mexico', \
   'INTERNATIONAL', 'NORMAL'); 
 
/* Wait for propagation to happen: */ 
EXECUTE dbms_lock.sleep(100); 
 
/* Connect to shipping sites and check propagated messages: */
CONNECT WS/WS; 
set serveroutput on; 
 
/*  Dequeue all booked orders for West_Shipping: */
EXECUTE BOLADM.shipping_bookedorder_deq('West_Shipping', DBMS_AQ.REMOVE); 
 
CONNECT ES/ES; 
SET SERVEROUTPUT ON; 
 
/* Dequeue all remaining booked orders (normal order) for East_Shipping: */
EXECUTE BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.REMOVE); 
 
CONNECT TS/TS; 
SET SERVEROUTPUT ON; 
 
/* Dequeue all international North American orders for Overseas_Shipping: */ 
EXECUTE BOLADM.get_northamerican_orders('Overseas_Shipping'); 
 
/* Dequeue rest of the booked orders for Overseas_Shipping: */ 
EXECUTE BOLADM.shipping_bookedorder_deq('Overseas_Shipping', DBMS_AQ.REMOVE); 
 
/* Disable propagation schedule for booked orders */ 
EXECUTE DBMS_AQADM.DISABLE_PROPAGATION _SCHEDULE(   \
   queue_name   => 'OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule has been disabled: */ 
SELECT schedule_disabled FROM user_queue_schedules; 
 
/* Alter propagation schedule for booked orders to run every
   15 mins (900 seconds) for a window duration of 300 seconds: */ 
EXECUTE DBMS_AQADM.ALTER_PROPAGATION _SCHEDULE( \
   queue_name     => 'OE_bookedorders_que', \
   duration       => 300, \
   next_time      => 'SYSDATE + 900/86400',\
   latency        => 25); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule parameters have changed: */ 
SELECT next_time, latency, propagation_window FROM user_queue_schedules; 
 
/* Enable propagation schedule for booked orders: 
EXECUTE DBMS_AQADM.ENABLE_PROPAGATION _SCHEDULE( \
   queue_name     => 'OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/* Check if the schedule has been enabled: */ 
SELECT schedule_disabled FROM user_queue_schedules; 
 
/* Unschedule propagation for booked orders: */ 
EXECUTE DBMS_AQADM.UNSCHEDULE_PROPAGATION(   \
   queue_name      => 'OE.OE_bookedorders_que'); 
 
/* Wait for some time for call to be effected: */ 
EXECUTE dbms_lock.sleep(30); 
 
/*  Check if the schedule has been dropped 
SELECT *  FROM user_queue_schedules; 


Visual Basic (OO4O): Example Code

This functionality is currently not available.


Java (JDBC): Example Code

No example is provided with this release.

Propagation of Messages with LOB Attributes

Large Objects can be propagated using Oracle Streams AQ using two methods:

  • Propagation from RAW queues. In RAW queues the message payload is stored as a BLOB. This allows users to store up to 32KB of data when using the PL/SQL interface and as much data as can be contiguously allocated by the client when using OCI. This method is supported by all releases after 8.0.4 inclusive.

  • Propagation from Object queues with LOB attributes. The user can populate the LOB and read from the LOB using Oracle Database LOB handling routines. The LOB attributes can be BLOBs or CLOBs (not NCLOBs). If the attribute is a CLOB, then Oracle Streams AQ automatically performs any necessary characterset conversion between the source queue and the destination queue. This method is supported by all releases from 8.1.3 inclusive.


    Note:

    Payloads containing LOBs require users to grant explicit Select, Insert and Update privileges on the queue table for doing enqueues and dequeues.


Scenario

In the BooksOnLine application, the company may wish to send promotional coupons along with the book orders. These coupons are generated depending on the content of the order, and other customer preferences. The coupons are images generated from some multimedia database, and are stored as LOBs.

When the order information is sent to the shipping warehouses, the coupon contents are also sent to the warehouses. In the following code, order_typ is enhanced to contain a coupon attribute of LOB type. The code demonstrates how the LOB contents are inserted into the message that is enqueued into OE_bookedorders_que when an order is placed. The message payload is first constructed with an empty LOB. The place holder (LOB locator) information is obtained from the queue table and is then used in conjunction with the LOB manipulation routines, such as DBMS_LOB.WRITE(), to fill the LOB contents. The example illustrates enqueuing and dequeuing of messages with LOBs as part the payload.

A COMMIT is applied only after the LOB contents are filled in with the appropriate image data. Propagation automatically takes care of moving the LOB contents along with the rest of the message contents. The following code also shows a dequeue at the destination queue for reading the LOB contents from the propagated message. The LOB contents are read into a buffer that can be sent to a printer for printing the coupon.


PL/SQL (DBMS_AQADM Package): Example Code
/* Enhance the type order_typ to contain coupon field (lob field): */ 
CREATE OR REPLACE TYPE order_typ AS OBJECT ( 
        orderno         NUMBER, 
        status          VARCHAR2(30), 
        ordertype       VARCHAR2(30), 
        orderregion     VARCHAR2(30), 
        customer        customer_typ, 
        paymentmethod   VARCHAR2(30), 
        items           orderitemlist_vartyp, 
        total           NUMBER, 
        coupon          BLOB);
/ 
 
/* lob_loc is a variable of type BLOB, 
   buffer is a variable of type RAW, 
   length is a variable of type NUMBER. */ 
 
/* Complete the order data and perform the enqueue using the order_enq() 
   procedure: */
dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop,
                OE_enq_order_data, enq_msgid); 
 
/* Get the lob locator in the queue table after enqueue: */ 
SELECT t.user_data.coupon INTO lob_loc 
FROM   OE.OE_orders_pr_mqtab t 
WHERE  t.msgid = enq_msgid; 
 
/* Generate a sample LOB of 100 bytes: */ 
buffer := hextoraw(rpad('FF',100,'FF')); 
 
/* Fill in the lob using LOB routines in the dbms_lob package: */
dbms_lob.write(lob_loc, 90, 1, buffer); 
 
/* Applies a commit only after filling in lob contents: */ 
COMMIT; 
 
/* Sleep until propagation is complete: */ 
 
/* Perform dequeue at the Western Shipping warehouse: */ 
dbms_aq.dequeue( 
        queue_name         => qname, 
        dequeue_options    => dopt, 
        message_properties => mprop, 
        payload            => deq_order_data, 
        msgid              => deq_msgid); 
 
/* Get the LOB locator after dequeue: */ 
lob_loc := deq_order_data.coupon; 
 
/* Get the length of the LOB: */ 
length := dbms_lob.getlength(lob_loc); 
 
/* Read the LOB contents into the buffer: */ 
dbms_lob.read(lob_loc, length, 1, buffer); 
 

Visual Basic (OO4O): Example Code

This functionality is not available currently.


Java (JDBC): Example Code

No example is provided with this release.

Enhanced Propagation Scheduling Capabilities

Detailed information about the schedules can be obtained from the catalog views defined for propagation.

Information about active schedules—such as the name of the background process handling that schedule, the SID (session, serial number) for the session handling the propagation and the Oracle Database instance handling a schedule (relevant if Real Application Clusters are being used)—can be obtained from the catalog views. The same catalog views also provide information about the previous successful execution of a schedule (last successful propagation of message) and the next execution of the schedule.

For each schedule, detailed propagation statistics are maintained:

  • The total number of messages propagated in a schedule

  • Total number of bytes propagated in a schedule

  • Maximum number of messages propagated in a window

  • Maximum number of bytes propagated in a window

  • Average number of messages propagated in a window

  • Average size of propagated messages

  • Average time to propagated a message

These statistics have been designed to provide useful information to the queue administrators for tuning the schedules such that maximum efficiency can be achieved.

Propagation has built-in support for handling failures and reporting errors. For example, if the specified database link is invalid, if the remote database is unavailable, or if the remote queue is not enabled for enqueuing, then the appropriate error message is reported. Propagation uses an exponential backoff scheme for retrying propagation from a schedule that encountered a failure.

If a schedule continuously encounters failures, then the first retry happens after 30 seconds, the second after 60 seconds, the third after 120 seconds and so forth. If the retry time is beyond the expiration time of the current window, then the next retry is attempted at the start time of the next window. A maximum of 16 retry attempts is made, after which the schedule is automatically disabled. When a schedule is disabled automatically due to failures, the relevant information is written into the alert log.

A check for scheduling failures indicates:

  • How many successive failures were encountered

  • The error message indicating the cause for the failure

  • The time at which the last failure was encountered

By examining this information, a queue administrator can fix the failure and enable the schedule. During a retry, if propagation is successful, the number of failures is reset to 0.

Propagation has support built-in for Oracle Real Application Clusters and is transparent to the user and the queue administrator. The job that handles propagation is submitted to the same instance as the owner of the queue table where the queue resides.

If there is a failure at an instance and the queue table that stores the queue is migrated to a different instance, then the propagation job is also migrated to the new instance. This minimizes pinging between instances and thus offers better performance. Propagation has been designed to handle any number of concurrent schedules. The number of job queue processes is limited to a maximum of 1000, and some of these can be used to handle jobs unrelated to propagation. Hence, propagation has built-in support for multitasking and load balancing.

The propagation algorithms are designed such that multiple schedules can be handled by a single snapshot (job_queue) process. The propagation load on a job_queue process can be skewed based on the arrival rate of messages in the different source queues.

If one process is overburdened with several active schedules while another is less loaded with many passive schedules, then propagation automatically redistributes the schedules so they are loaded uniformly.


Scenario

In the BooksOnLine example, the OE_bookedorders_que is a busy queue, because messages in it are propagated to different shipping sites. The following example code illustrates the calls supported by enhanced propagation scheduling for error checking and schedule monitoring.


PL/SQL (DBMS_AQADM Package): Example Code
CONNECT OE/OE; 
 
/*  get averages 
select avg_time, avg_number, avg_size from user_queue_schedules; 
 
/*  get totals 
select total_time, total_number, total_bytes from user_queue_schedules; 
 
/*  get maximums for a window 
select max_number, max_bytes from user_queue_schedules; 
 
/*  get current status information of schedule 
select process_name, session_id, instance, schedule_disabled
   from user_queue_schedules; 
 
/*  get information about last and next execution 
select last_run_date, last_run_time, next_run_date, next_run_time 
   from user_queue_schedules; 
 
/*  get last error information if any 
select failures, last_error_msg, last_error_date, last_error_time
   from user_queue_schedules; 

Visual Basic (OO4O): Example Code

This functionality is currently not available.


Java (JDBC): Example Code

No example is provided with this release.

Exception Handling During Propagation

When system errors such as a network failure occur, Oracle Streams AQ continues to attempt to propagate messages using an exponential backoff algorithm. In some situations that indicate application errors, Oracle Streams AQ marks messages as UNDELIVERABLE if a message propagation error occurs.

Examples of such errors are when the remote queue does not exist or when there is a type mismatch between the source queue and the remote queue. In such situations users must query the DBA_SCHEDULES view to determine the last error that occurred during propagation to a particular destination. The trace files in the $ORACLE_HOME/log directory can provide additional information about the error.


Scenario

In the BooksOnLine example, the ES_bookedorders_que in the Eastern Shipping Region is stopped intentionally using the stop_queue() call. After a short while the propagation schedule for OE_bookedorders_que displays an error indicating that the remote queue ES_bookedorders_que is disabled for enqueuing. When the ES_bookedorders_que is started using the start_queue() call, propagation to that queue resumes and there is no error message associated with schedule for OE_bookedorders_que.


PL/SQL (DBMS_AQADM Package): Example Code
/*  Intentionally stop the Eastern Shipping queue: */
connect BOLADM/BOLADM 
EXECUTE DBMS_AQADM.STOP_QUEUE(queue_name => 'ES.ES_bookedorders_que');
 
/* Wait for some time before error shows up in dba_queue_schedules: */ 
EXECUTE dbms_lock.sleep(100); 

/* This query returns an ORA-25207 enqueue failed error: */ 
SELECT qname, last_error_msg from dba_queue_schedules; 
 
/* Start the Eastern Shipping queue: */ 
EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 'ES.ES_bookedorders_que');
 
/* Wait for Propagation to resume for Eastern Shipping queue: */ 
EXECUTE dbms_lock.sleep(100); 
 
/* This query indicates that there are no errors with propagation: 
SELECT qname, last_error_msg from dba_queue_schedules; 

Visual Basic (OO4O): Example Code

This functionality is handled by the database.


Java (JDBC): Example Code

No example is provided with this release.

Message Format Transformation During Propagation

At propagation time, a transformation can be specified when adding a rule-based subscriber to OE_bookedorders_topic for Western Shipping orders. The transformation is applied to the orders, transforming them to the WS.order_typ_sh type before propagating them to WS_bookedorders_topic.


PL/SQL (DBMS_AQADM Package): Example Code
declare
subscriber     sys.aq$_agent;
begin
  subscriber :=sys.aq$_agent('West_Shipping','WS.WS_bookedorders_topic',null);
  DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'OE.OE_bookedorders_topic',
       subscriber     => subscriber,
       rule           => 'tab.user_data.orderregion =''WESTERN''
                          AND tab.user_data.ordertype != ''RUSH''',
       transformation => 'OE.OE2WS');
end;

Visual Basic (OO4O): Example Code

No example is provided with this release.


Java (JDBC): Example Code

No example is provided with this release.

Propagation Using HTTP

In Oracle Database 10g and higher, you can set up Oracle Streams AQ propagation over HTTP and HTTPS (HTTP over SSL). HTTP propagation uses the Internet access infrastructure and requires that the Oracle Streams AQ servlet that connects to the destination database be deployed. The database link must be created with the connect string indicating the Web server address and port and indicating HTTP as the protocol. The source database must be created for running Java and XML. Otherwise, the setup for HTTP propagation is more or less the same as Oracle Net Services propagation.


Scenario

In the BooksOnLine example, messages in the OE_bookedorders_que are propagated to different shipping sites. For the purpose of this scenario, the Western Shipping application is running on another database, 'dest-db' and we propagates to WS_bookedorders_que.


Propagation Setup
  1. Deploy the Oracle Streams AQ Servlet.

    HTTP propagation depends on Internet access to the destination database. Create a class AQPropServlet that extends the AQxmlServlet.

    import java.io.*;
    import javax.servlet.*;
    import javax.servlet.http.*;
    import oracle.AQ.*;
    import oracle.AQ.xml.*;
    import java.sql.*;
    import oracle.jms.*;
    import javax.jms.*;
    import java.io.*;
    import oracle.jdbc.pool.*;
    
    /* This is an Oracle Streams AQ Propagation Servlet. */
    public class AQPropServlet extends oracle.AQ.xml.AQxmlServlet
    
    /* getDBDrv - specify the database to which the servlet connects */
    public AQxmlDataSource createAQDataSource() throws AQxmlException
    {
      AQxmlDataSource  db_drv = null;
      db_drv = new AQxmlDataSource("aqadm", "aqadm", "dest-db", "dest-host",
          5521);
        return db_drv;
      }
    
      public void init()
      {
          try {
            AQxmlDataSource axds = this.createAQDataSource();
            setAQDataSource(axds) ;
            setSessionMaxInactiveTime(180) ;
    
          } catch (Exception e) {
             System.err.println("Error in init : " +e) ;
          }
      }
    }
    
    

    This servlet must connect to the destination database. The servlet must be deployed on the Web server in the path aqserv/servlet. In Oracle Database 10g and higher, the propagation servlet name and deployment path are fixed; that is, they must be AQPropServlet and aqserv/servlet, respectively.

    Assume that the Web server host and port are webdest.oracle.com and 8081, respectively.

  2. Create the database link dba.

    • Specify HTTP as the protocol.

    • Specify the username and password that are used for authentication with the Web server/servlet runner as the host and port of the Web server running the Oracle Streams AQ servlet.

    For this example, the connect string of the database link should be as follows:

    (DESCRIPTION=(ADDRESS=(PROTOCOL=http)(HOST=webdest.oracle.com)(PORT=8081))
    
    

    If SSL is used, then specify HTTPS as the protocol in the connect string.

    Create the database link as follows:

    create public database link dba connect to john IDENTIFIED BY welcome
    using
    '(DESCRIPTION=(ADDRESS=(PROTOCOL=http)(HOST=webdest.oracle.com)(PORT=8081))';
    
    

    Here john is the Oracle Streams AQ HTTP agent used to access the Oracle Streams AQ (propagation) servlet. Welcome is the password used to authenticate with the Web server.

  3. Make sure that the Oracle Streams AQ HTTP agent, John, is authorized to perform Oracle Streams AQ operations. Do the following at the destination database.

    1. Register the Oracle Streams AQ agent.

      DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'John', enable_http => true);
      
      
    2. Map the Oracle Streams AQ agent to a database user.

      DBMS_AQADM.ENABLE_DB_ACCESS(agent_name =>'John', db_username =>'CBADM')'
      
      
  4. Set up the remote subscription to OE.OE_bookedorders_que.

    EXECUTE DBMS_AQADM.ADD_SUBSCRIBER('OE.OE_bookedorders_que',
    aq$_agent(null, 'WS.WS_bookedorders_que', null));
    
    
  5. Start propagation by calling dbms_aqdm.schedule_propagation at the source database.

    DBMS_AQADM.SCHEDULE_PROPAGATION('OE.OE_bookedorders_que', 'dba');
    
    

All other propagation administration APIs work the same for HTTP propagation. Use the propagation view, DBA_QUEUE_SCHEDULES, to check the propagation statistics for propagation schedules using HTTP.