Skip Headers

Oracle® Streams Advanced Queuing User's Guide and Reference
Release 10.1

Part Number B10785-01
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Feedback

Go to previous page
Previous
Go to next page
Next
View PDF

11 Creating Oracle Streams AQ Applications Using JMS

This chapter describes the Oracle Java Message Service (JMS) interface to Oracle Streams Advanced Queuing (AQ).

This chapter contains these topics:

General Features of JMS and Oracle JMS

This section contains these topics:

J2EE Compliance

In Oracle Database 10g, Oracle JMS conforms to the Sun Microsystems JMS 1.1 standard. You can define the J2EE compliance mode for an Oracle Java Message Service (OJMS) client at run time. For compliance, set the Java property "oracle.jms.j2eeCompliant" to TRUE as a command line option. For noncompliance, do nothing. FALSE is the default value.

Features in Oracle Streams AQ that support J2EE compliance (and are also available in the noncompliant mode) include:

  • Nontransactional sessions

  • Nondurable subscribers

  • Temporary queues and topics

  • Nonpersistent delivery mode

  • Multiple JMS messages types on a single JMS queue or topic (using Oracle Streams AQ queues of the AQ$_JMS_MESSAGE type)

  • The noLocal option for durable subscribers


    See Also:


Features of JMSPriority, JMSExpiration, and nondurable subscribers vary depending on which mode you use.

JMSPriority

Table 11-1 shows how JMSPriority values depend on whether you are running the default, noncompliant mode or the compliant mode, in which you set the compliance flag to TRUE.

Table 11-1 JMSPriority

Priority Noncompliant Mode Compliant Mode
Lowest java.lang.Integer.MAX_VALUE 0
Highest java.lang.Integer.MIN_VALUE 9
Default 1 4

JMSExpiration

JMSExpiration values depend on whether you are running the default, noncompliant mode or the compliant mode, in which you set the compliance flag to TRUE.

In noncompliant mode, the JMSExpiration header value is the sum of the enqueue time and the TimeToLive, as specified in the JMS specification when a message is enqueued. When a message is received, the duration of the expiration (not the expiration time) is returned. If a message never expires, then -1 is returned.

In compliant mode, the JMSExpiration header value in a dequeued message is the sum of the JMS time stamp when the message was enqueued (Greenwich Mean Time, in milliseconds) and the TimeToLive (in milliseconds). If a message never expires, then 0 is returned.

Durable Subscribers

Durable subscriber action, when subscribers use the same name, depends on whether you are running the default, noncompliant mode or the compliant mode, in which you set the compliance flag to TRUE.

In noncompliant mode, two durable TopicSubscribers with the same name can be active against two different topics.

In compliant mode, durable subscribers with the same name are not allowed. If two subscribers use the same name and are created against the same topic, but the selector used for each subscriber is different, then the underlying Oracle Streams AQ subscription is altered using the internal DBMS_AQJMS.ALTER_SUBSCRIBER() call.

If two subscribers use the same name and are created against two different topics, and if the client that uses the same subscription name also originally created the subscription name, then the existing subscription is dropped and the new subscription is created.

If two subscribers use the same name and are created against two different topics, and if a different client (a client that did not originate the subscription name) uses an existing subscription name, then the subscription is not dropped and an error is thrown. Because it is not known if the subscription was created by JMS or PL/SQL, the subscription on the other topic should not be dropped.

JMS Connection and Session

This section contains these topics:

ConnectionFactory Objects

A ConnectionFactory encapsulates a set of connection configuration parameters that has been defined by an administrator. A client uses it to create a connection with a JMS provider. In this case Oracle JMS, part of Oracle Database, is the JMS provider.

The three types of ConnectionFactory objects are:

  • ConnectionFactory

  • QueueConnectionFactory

  • TopicConnectionFactory

You can obtain ConnectionFactory objects two different ways:

Using AQjmsFactory to Obtain ConnectionFactory Objects

You can use the AQjmsFactory class to obtain a handle to a ConnectionFactory, QueueConnectionFactory, or TopicConnectionFactory object.

To obtain a ConnectionFactory, which supports both point-to-point and publish/subscribe operations, use

AQjmsFactory.getConnectionFactory()

To obtain a QueueConnectionFactory, use

AQjmsFactory.getQueueConnectionFactory()

To obtain a TopicConnectionFactory, use

AQjmsFactory.getTopicConnectionFactory()

The ConnectionFactory, QueueConnectionFactory, or TopicConnectionFactory can be created using hostname, port number, and SID driver or by using JDBC URL and properties.

Example 11-1 JMS: Getting a Queue Connection Factory for a Database

public static void get_Factory() throws JMSException
{
  QueueConnectionFactory    qc_fact   = null;
 /* get queue connection factory for database "aqdb", host "sun-123", 
 port 5521, driver "thin" */
 qc_fact = AQjmsFactory.getQueueConnectionFactory("sun-123", "aqdb", 
                                                  5521, "thin");
}

Using JNDI to Look Up ConnectionFactory Objects

A JMS administrator can register ConnectionFactory objects in a Lightweight Directory Access Protocol (LDAP) server. The following setup is required to enable Java Naming and Directory Interface (JNDI) lookup in JMS:


Register Database

When the Oracle Database server is installed, the database must be registered with the LDAP server. This can be accomplished using the Database Configuration Assistant (DBCA). Figure 11-1 shows the structure of Oracle Streams AQ entries in the LDAP server. ConnectionFactory information is stored under <cn=OracleDBConnections>, while topics and queues are stored under <cn=OracleDBQueues>.

Figure 11-1 Structure of Oracle Streams AQ Entries in LDAP Server

Description of adque446.gif follows
Description of the illustration adque446.gif


Set Parameter GLOBAL_TOPIC_ENABLED

The GLOBAL_TOPIC_ENABLED system parameter for the database must be set to TRUE. This ensures that all queues and topics created in Oracle Streams AQ are automatically registered with the LDAP server. This parameter can be set by using

ALTER SYSTEM SET GLOBAL_TOPICS_ENABLED = TRUE 


Register ConnectionFactory Objects

After the database has been set up to use an LDAP server, the JMS administrator can register ConnectionFactory, QueueConnectionFactory, and TopicConnectionFactory objects in LDAP by using:

AQjmsFactory.registerConnectionFactory()

The registration can be accomplished in one of the following ways:

  • Connect directly to the LDAP server

    The user must have the GLOBAL_AQ_USER_ROLE to register connection factories in LDAP

    To connect directly to LDAP, the parameters for the registerConnectionFactory method include the LDAP context, the name of the ConnectionFactory, QueueConnectionFactory, or TopicConnectionFactory, hostname, database SID, port number, JDBC driver (thin or oci8) and factory type (queue or topic).

  • Connect to LDAP through the database server

    The user can log on to Oracle Database first and then have the database update the LDAP entry. The user that logs on to the database must have the AQ_ADMINISTRATOR_ROLE to perform this operation.

    To connect to LDAP through the database server, the parameters for the registerConnectionFactory method include a JDBC connection (to a user having AQ_ADMINISTRATOR_ROLE), the name of the ConnectionFactory, QueueConnectionFactory, or TopicConnectionFactory, hostname, database SID, port number, JDBC driver (thin or oci8) and factory type (queue or topic).

After the ConnectionFactory objects have been registered in LDAP by a JMS administrator, they can be looked up by using JNDI

Example 11-2 Registering a Order Entry Queue Connection Factory in LDAP

Suppose the JMS administrator wants to register an order entry queue connection factory, oe_queue_factory. In LDAP, it can be registered as follows:

public static void register_Factory_in_LDAP() throws Exception
{
    Hashtable env = new Hashtable(5, 0.75f);
    env.put(Context.INITIAL_CONTEXT_FACTORY, AQjmsConstants.INIT_CTX_FACTORY);

    // aqldapserv is your LDAP host and 389 is your port
    env.put(Context.PROVIDER_URL, "ldap://aqldapserv:389); 

    // now authentication information
    // username/password scheme, user is OE, password is OE
    env.put(Context.SECURITY_AUTHENTICATION, "simple"); 
    env.put(Context.SECURITY_PRINCIPAL, "cn=oe,cn=users,cn=acme,cn=com");
    env.put(Context.SECURITY_CREDENTIALS, "oe");

    /* register queue connection factory for database "aqdb", host "sun-123", 
    port 5521, driver "thin" */
    AQjmsFactory.registerConnectionFactory(env, "oe_queue_factory", "sun-123", 
        "aqdb", 5521, "thin", "queue");
}

After order entry, queue connection factory oe_queue_factory has been registered in LDAP; it can be looked up as follows:

public static void get_Factory_from_LDAP() throws Exception
{
    Hashtable env = new Hashtable(5, 0.75f);
    env.put(Context.INITIAL_CONTEXT_FACTORY, AQjmsConstants.INIT_CTX_FACTORY);

    // aqldapserv is your LDAP host and 389 is your port
    env.put(Context.PROVIDER_URL, "ldap://aqldapserv:389); 

    // now authentication information
    // username/password scheme, user is OE, password is OE
    env.put(Context.SECURITY_AUTHENTICATION, "simple"); 
    env.put(Context.SECURITY_PRINCIPAL, "cn=oe,cn=users,cn=acme,cn=com");
    env.put(Context.SECURITY_CREDENTIALS, "oe");

    DirContext inictx = new InitialDirContext(env);
    // initialize context with the distinguished name of the database server
    inictx=(DirContext)inictx.lookup("cn=db1,cn=OracleContext,cn=acme,cn=com");

    //go to the connection factory holder cn=OraclDBConnections
    DirContext connctx = (DirContext)inictx.lookup("cn=OracleDBConnections");

    // get connection factory "oe_queue_factory"
    QueueConnectionFactory qc_fact = 
      (QueueConnectionFactory)connctx.lookup("cn=oe_queue_factory");
}

JMS Connection

A JMS Connection is a client's active connection to its JMS provider. A JMS Connection performs several critical services:

  • Encapsulates either an open connection or a pool of connections with a JMS provider

  • Typically represents an open TCP/IP socket (or a set of open sockets) between a client and a provider's service daemon

  • Provides a structure for authenticating clients at the time of its creation

  • Creates Sessions

  • Provides connection metadata

  • Supports an optional ExceptionListener

A JMS Connection to the database can be created by invoking createConnection(), createQueueConnection(), or createTopicConnection() and passing the parameters username and password on the ConnectionFactory, QueueConnectionFactory, or TopicConnectionFactory object respectively.


Connection Setup

A JMS client typically creates a Connection, Session and a number of MessageProducers and MessageConsumers. In the current version only one open session for each connection is allowed, except in the following cases:

  • If the JDBC oci8 driver is used to create the JMS connection

  • If the user provides an OracleOCIConnectionPool instance during JMS connection creation

When a Connection is created it is in stopped mode. In this state no messages can be delivered to it. It is typical to leave the Connection in stopped mode until setup is complete. At that point the Connection start() method is called and messages begin arriving at the Connection consumers. This setup convention minimizes any client confusion that can result from asynchronous message delivery while the client is still in the process of setup.

It is possible to start a Connection and to perform setup subsequently. Clients that do this must be prepared to handle asynchronous message delivery while they are still in the process of setting up. A MessageProducer can send messages while a Connection is stopped.

Some of the methods that are supported on the Connection object are

  • start(), which starts or restart delivery of incoming messages

  • stop(), which temporarily stops delivery of incoming messages

    When a Connection object is stopped, delivery to all of its message consumers is inhibited. Also, synchronous receive's block and messages are not delivered to message listener.

  • close(), which closes the JMS session and releases all associated resources

  • createSession(true, 0), which creates a JMS Session using a JMS Connection instance

  • createQueueSession(true, 0), which creates a QueueSession

  • createTopicSession(true, 0), which creates a TopicSession

  • setExceptionListener(ExceptionListener), which sets an exception listener for the connection

    This allows a client to be asynchronously notified of a problem. Some connections only consume messages, so they have no other way to learn the connection has failed.

  • getExceptionListener(), which gets the ExceptionListener for this connection

JMS Session

A Connection is a factory for Sessions that use its underlying connection to a JMS provider for producing and consuming messages. A JMS Session is a single threaded context for producing and consuming messages. Although it can allocate provider resources outside the Java Virtual Machine (JVM), it is considered a light-weight JMS object.

A Session serves several purposes:

  • Constitutes a factory for its MessageProducers and MessageConsumers

  • Provides a way to get a handle to destination objects (queues/topics)

  • Supplies provider-optimized message factories

  • Supports a single series of transactions that combines work spanning this session's MessageProducers and MessageConsumers, organizing these into units

  • Defines a serial order for the messages it consumes and the messages it produces

  • Serializes execution of MessageListeners registered with it

In Oracle Database 10g, you can create as many JMS Sessions as resources allow using a single JMS Connection, when using either jdbc thin or jdbc thick (OCI) drivers.

Because a provider can allocate some resources on behalf of a Session outside the JVM, clients should close them when they are not needed. Relying on garbage collection to eventually reclaim these resources may not be timely enough. The same is true for the MessageProducers and MessageConsumers created by a Session.

Methods on the Session object include:

  • commit(), which commits all messages performed in this transaction and releases locks currently held

  • rollback(), which rolls back any messages accomplished in the transaction and release locks currently held

  • close(), which closes the session

  • getDBConnection(), which gets a handle to the underlying JDBC connection

    This handle can be used to perform other SQL DML operations as part of the same Session. The method is specific to Oracle JMS.

  • acknowledge(), which acknowledges message receipt in a nontransactional session

  • recover(), which restarts message delivery in a nontransactional session

    In effect, the series of delivered messages in the Session is reset to the point after the last acknowledged message.

The following are some Oracle JMS extensions:

  • createQueueTable() creates a queue table

  • getQueueTable() gets a handle to an existing queue table

  • createQueue() creates a queue

  • getQueue() gets a handle to an existing queue

  • createTopic() creates a topic

  • getTopic() gets a handle to an existing topic

The Session object must be cast to AQjmsSession to use any of the extensions.


Note:

The JMS specification expects providers to return null messages when receives are accomplished on a JMS connection instance that has not been started.

After you create a javax.jms.Connection instance, you must call the start() method on it before you can receive messages. If you add a line like t_conn.start(); any time after the connection has been created, but before the actual receive, then you can receive your messages.


JMS Connection Examples

The following code illustrates how some of the preceding calls are used.

Example 11-3 JMS: Creating and Starting Queues and Queue Connections

public static void bol_example(String ora_sid, String host, int port,
                               String driver)
{
 
 QueueConnectionFactory    qc_fact   = null;
 QueueConnection           q_conn    = null;
 QueueSession              q_sess    = null;
 AQQueueTableProperty      qt_prop   = null;
 AQQueueTable              q_table   = null;
 AQjmsDestinationProperty  dest_prop = null;
 Queue                     queue     = null;
 BytesMessage              bytes_msg = null;

 try
 {
   /* get queue connection factory */
   qc_fact = AQjmsFactory.getQueueConnectionFactory(host, ora_sid,
                      port, driver);

   /* create queue connection */
   q_conn = qc_fact.createQueueConnection("boluser", "boluser");

   /* create QueueSession */
   q_sess = q_conn.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);

   /* start the queue connection */
   q_conn.start();

   qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");

   /* create a queue table */
   q_table = ((AQjmsSession)q_sess).createQueueTable("boluser",
                                                     "bol_ship_queue_table",
                                                     qt_prop);

   dest_prop = new AQjmsDestinationProperty();

   /* create a queue */
   queue = ((AQjmsSession)q_sess).createQueue(q_table, "bol_ship_queue",
                                             dest_prop);

   /* start the queue */
   ((AQjmsDestination)queue).start(q_sess, true, true);

   /* create a bytes message */
   bytes_msg = q_sess.createBytesMessage();

   /* close session */
   q_sess.close();

   /* close connection */
   q_conn.close();
 }
 catch (Exception ex)
 {
   System.out.println("Exception: " + ex);
 }
}

JMS Destination

A Destination is an object a client uses to specify the destination where it sends messages, and the source from which it receives messages. A Destination object can be a Queue or a Topic. In Oracle Streams AQ, these map to a schema.queue at a specific database. Queue maps to a single-consumer queue, and Topic maps to a multiconsumer queue.

Destination objects can be obtained in one of the following ways:

Using a JMS Session to Obtain Destination Objects

Destination objects are created from a Session object using domain-specific Session methods:

  • AQjmsSession.getQueue(queue_owner, queue_name) gets a handle to a JMS queue

  • AQjmsSession.getTopic(topic_owner, topic_name) gets a handle to a JMS topic

Using JNDI to Look Up Destination Objects

The database can be configured to register schema objects with an LDAP server. If a database has been configured to use LDAP and the GLOBAL_TOPIC_ENABLED parameter has been set to TRUE, then all JMS queues and topics are automatically registered with the LDAP server when they are created.

The administrator can also create aliases to the queues and topics registered in LDAP using the DBMS_AQAQDM.add_alias_to_ldap PL/SQL procedure.

Queues and topics that are registered in LDAP can be looked up through JNDI using the name or alias of the queue or topic.

JMS Destination Methods

Methods on the Destination object include:

  • alter(), which alters a Queue or a Topic

  • schedulePropagation(), which schedules propagation from a source to a destination

  • unschedulePropagation(), which unschedules a previously scheduled propagation

  • enablePropagationSchedule(), which enables a propagation schedule

  • disablePropagationSchedule(), which disables a propagation schedule

  • start(), which starts a Queue or a Topic

  • The queue can be started for enqueue or dequeue. The topic can be started for publish or subscribe.

  • stop(), which stops a Queue or a Topic

    The queue is stopped for enqueue or dequeue. The topic is stopped for publish or subscribe.

    drop(), which drops a Queue or a Topic

JMS Destination Examples

The following code illustrates how some of the preceding calls are used.

Example 11-4 JMS: Using JNDI to Lookup Destination Objects

Suppose we have a new orders queue OE.OE_neworders_que stored in LDA. It can be looked up as follows:

public static void get_Factory_from_LDAP() throws Exception
{
    Hashtable env = new Hashtable(5, 0.75f);
    env.put(Context.INITIAL_CONTEXT_FACTORY, AQjmsConstants.INIT_CTX_FACTORY);

    // aqldapserv is your LDAP host and 389 is your port
    env.put(Context.PROVIDER_URL, "ldap://aqldapserv:389); 

    // now authentication information
    // username/password scheme, user is OE, password is OE
    env.put(Context.SECURITY_AUTHENTICATION, "simple"); 
    env.put(Context.SECURITY_PRINCIPAL, "cn=oe,cn=users,cn=acme,cn=com");
    env.put(Context.SECURITY_CREDENTIALS, "oe");

    DirContext inictx = new InitialDirContext(env);
    // initialize context with the distinguished name of the database server
    inictx=(DirContext)inictx.lookup("cn=db1,cn=OracleContext,cn=acme,cn=com");

   // go to the destination holder
    DirContext destctx = (DirContext)inictx.lookup("cn=OracleDBQueues");

    // get the destination OE.OE_new_orders queue
    Queue myqueue = (Queue)destctx.lookup("cn=OE.OE_new_orders_que");

}

Example 11-5 JMS: Using JNDI to Perform Various Operations on a Destination Object

public static void setup_example(TopicSession t_sess)
{
  AQQueueTableProperty     qt_prop   = null;
  AQQueueTable             q_table   = null;
  AQjmsDestinationProperty dest_prop = null; 
  Topic                    topic     = null;
  TopicConnection          t_conn    = null;

  try
  {
    qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_BYTES_MESSAGE");
    /* create a queue table */
    q_table = ((AQjmsSession)t_sess).createQueueTable("boluser",
                           "bol_ship_queue_table",
                           qt_prop);
    dest_prop = new AQjmsDestinationProperty();
    /* create a topic */
    topic = ((AQjmsSession)t_sess).createTopic(q_table, "bol_ship_queue",
                        dest_prop);

    /* start the topic */
    ((AQjmsDestination)topic).start(t_sess, true, true);

    /* schedule propagation from topic "boluser" to the destination 
      dblink "dba" */
    ((AQjmsDestination)topic).schedulePropagation(t_sess, "dba", null,
                   null, null, null);
    /* 
       some processing accomplished here
    */
    /* Unschedule propagation */
    ((AQjmsDestination)topic).unschedulePropagation(t_sess, "dba");
    /* stop the topic */
    ((AQjmsDestination)topic).stop(t_sess, true, true, true);
    /* drop topic */
    ((AQjmsDestination)topic).drop(t_sess);
    /* drop queue table */
    q_table.drop(true);
    /* close session */
    t_sess.close();
    /* close connection */
    t_conn.close();
  }
  catch(Exception ex)
  {
    System.out.println("Exception: " + ex);
  }
}

System-Level Access Control in JMS

Oracle8i or higher supports system-level access control for all queuing operations. This feature allows an application designer or DBA to create users as queue administrators. A queue administrator can invoke administrative and operational JMS interfaces on any queue in the database. This simplifies administrative work, because all administrative scripts for the queues in a database can be managed under one schema.

When messages arrive at the destination queues, sessions based on the source queue schema name are used for enqueuing the newly arrived messages into the destination queues. This means that you must grant enqueue privileges to the destination queues to schemas of the source queues.

To propagate to a remote destination queue, the login user (specified in the database link in the address field of the agent structure) should either be granted the ENQUEUE_ANY privilege, or be granted the rights to enqueue to the destination queue. However, you are not required to grant any explicit privileges if the login user in the database link also owns the queue tables at the destination.

Destination-Level Access Control in JMS

Oracle8i or higher supports access control for enqueue and dequeue operations at the queue or topic level. This feature allows the application designer to protect queues and topics created in one schema from applications running in other schemas. You must grant only minimal access privileges to the applications that run outside the schema of the queue or topic. The supported access privileges on a queue or topic are ENQUEUE, DEQUEUE and ALL.

Retention and Message History in JMS

Oracle Streams AQ allows users to retain messages in the queue table. This means that SQL can then be used to query these messages for analysis. Messages are often related to each other. For example, if a message is produced as a result of the consumption of another message, then the two are related. As the application designer, you may want to keep track of such relationships. Along with retention and message identifiers, Oracle Streams AQ lets you automatically create message journals, also called tracking journals or event journals. Taken together, retention, message identifiers and SQL queries make it possible to build powerful message warehouses.

Example 11-6 JMS: Analyzing Retention and Message History in Oracle Streams AQ

Suppose that the shipping application must determine the average processing times of orders. This includes the time the order must wait in the backed_order topic. Specifying the retention as TRUE for the shipping queues and specifying the order number in the correlation field of the message, SQL queries can be written to determine the wait time for orders in the shipping application.

For simplicity, we analyze only orders that have already been processed. The processing time for an order in the shipping application is the difference between the enqueue time in the WS_bookedorders_topic and the enqueue time in the WS_shipped_orders_topic.

SELECT  SUM(SO.enq_time - BO.enq_time) / count (*) AVG_PRCS_TIME 
   FROM WS.AQ$WS_orders_pr_mqtab BO , WS.AQ$WS_orders_mqtab SO
   WHERE SO.msg_state = 'PROCESSED' and BO.msg_state = 'PROCESSED' 
   AND SO.corr_id = BO.corr_id and SO.queue = 'WS_shippedorders_topic'; 
 
/* Average waiting time in the backed order queue: */ 
SELECT SUM(BACK.deq_time - BACK.enq_time)/count (*) AVG_BACK_TIME 
   FROM WS.AQ$WS_orders_mqtab BACK
   WHERE BACK.msg_state = 'PROCESSED' AND BACK.queue = 'WS_backorders_topic'; 

Supporting Oracle Real Application Clusters in JMS

Oracle Real Application Clusters can be used to improve Oracle Streams AQ performance by allowing different queues to be managed by different instances. You do this by specifying different instance affinities (preferences) for the queue tables that store the queues. This allows queue operations (enqueue/dequeue) or topic operations (publish/subscribe) on different queues or topics to occur in parallel.

The Oracle Streams AQ queue monitor process continuously monitors the instance affinities of the queue tables. The queue monitor assigns ownership of a queue table to the specified primary instance if it is available, failing which it assigns it to the specified secondary instance.

If the owner instance of a queue table terminates, then the queue monitor changes ownership to a suitable instance such as the secondary instance.

Oracle Streams AQ propagation is able to make use of Real Application Clusters, although it is transparent to the user. The affinities for jobs submitted on behalf of the propagation schedules are set to the same values as that of the affinities of the respective queue tables. Thus, a job_queue_process associated with the owner instance of a queue table is handling the propagation from queues stored in that queue table, thereby minimizing pinging.

Supporting Statistics Views in JMS

Each instance keeps its own Oracle Streams AQ statistics information in its own System Global Area (SGA), and does not have knowledge of the statistics gathered by other instances. Then, when a GV$AQ view is queried by an instance, all other instances funnel their statistics information to the instance issuing the query.

Example 11-7 JMS: Querying Oracle Streams AQ Statistics Views

The GV$AQ view can be queried at any time to see the number of messages in waiting, ready or expired state. The view also displays the average number of seconds messages have been waiting to be processed. The order processing application can use this to dynamically tune the number of order-processing processes.

CONNECT oe/oe 
 
/* Count the number as messages and the average time for which the messages 
   have been waiting: */ 
SELECT READY, AVERAGE_WAIT 
FROM gv$aq Stats, user_queues Qs 
WHERE Stats.qid = Qs.qid and Qs.Name = 'OE_neworders_que'; 

Structured Payload/Message Types in JMS

JMS messages are composed of a header, properties, and a body.

The header consists of header fields, which contain values used by both clients and providers to identify and route messages. All messages support the same set of header fields.

Properties are optional header fields. In addition to standard properties defined by JMS, there can be provider-specific and application-specific properties.

The body is the message payload. JMS defines various types of message payloads, and a type that can store JMS messages of any or all JMS-specified message types.

JMS Message Headers

A JMS connection can contain only a header; a message body is not required. The message header contains the following fields:

  • JMSDestination contains the destination to which the message is sent. In Oracle Streams AQ this corresponds to the destination queue/topic.

  • JMSDeliveryMode determines whether the message is logged or not. JMS supports persistent delivery (where messages are logged to stable storage) and nonpersistent delivery (messages not logged). Oracle Streams AQ supports persistent message delivery. JMS permits an administrator to configure JMS to override the client-specified value for JMSDeliveryMode.

  • JMSMessageID uniquely identifies a message in a provider. All message IDs must begin with the string ID:.

  • JMSTimeStamp contains the time the message was handed over to the provider to be sent. This maps to Oracle Streams AQ message enqueue time.

  • JMSCorrelationID can be used by a client to link one message with another.

  • JMSReplyTo contains a destination supplied by a client when a message is sent. Clients can use the following types to specify the ReplyTo destination: oracle.jms.AQjmsAgent; javax.jms.Queue; and javax.jms.Topic.

  • JMSType contains a message type identifier supplied by a client at send time. For portability Oracle recommends that the JMSType be symbolic values.

  • JMSExpiration is the sum of the enqueue time and the TimeToLive in non-J2EE compliance mode. In compliant mode, the JMSExpiration header value in a dequeued message is the sum of the JMS time stamp when the message was enqueued (Greenwich Mean Time, in milliseconds) and the TimeToLive (in milliseconds). JMS permits an administrator to configure JMS to override the client-specified value for JMSExpiration.

  • JMSPriority contains the priority of the message. In J2EE-compliance mode, the permitted values for priority are 09, with 9 the highest priority and 4 the default, in conformance with the Sun Microsystem JMS 1.1 standard. Noncompliant mode is the default. JMS permits an administrator to configure JMS to override the client-specified value for JMSPriority.

  • JMSRedelivered is a Boolean set by the JMS provider.


    See Also:

    "J2EE Compliance"

Table 11-2 lists the type and use of each JMS message header field and shows by whom it is set.

Table 11-2 JMS Message Header Fields

Message Header Field Type Set by Use
JMSDestination Destination JMS after Send method has completed Destination to which message is sent
JMSDeliveryMode int JMS after Send method has completed Delivery mode (PERSISTENT or NONPERSISTENT)
JMSExpiration long JMS after Send method has completed Expiration time can be specified for a message producer or can be explicitly specified during each send or publish
JMSPriority int JMS after Send method has completed Message priority can be specified for a MessageProducer or can be explicitly specified during each send or publish
JMSMessageID String JMS after Send method has completed Uniquely identifies each message sent by the provider
JMSTimeStamp long JMS after Send method has completed Time message is handed to provider to be sent
JMSCorrelationID String JMS client Links one message with another
JMSReplyTo Destination JMS client Destination where a reply to the message should be sent. It can be specified as AQjmsAgent, javax.jms.Queue, or javax.jms.Topic types
JMSType String JMS client Message type identifier
JMSRedelivered Boolean JMS provider Message probably was delivered earlier, but the client did not acknowledge it at that time

JMS Message Properties

Properties add optional header fields to a message. Properties allow a client, using message selectors, to have a JMS provider select messages on its behalf using application-specific criteria. Property names are strings and values can be: Boolean, byte, short, int, long, float, double, and string.

JMS-defined properties, which all begin with "JMSX", include the following:

  • JMSXUserID is the identity of the user sending the message.

  • JMSXAppID is the identity of the application sending the message.

  • JMSXDeliveryCount is the number of message delivery attempts.

  • JMSXGroupid is set by the client and refers to the identity of the message group that this message belongs to.

  • JMSXGroupSeq is the sequence number of a message within a group.

  • JMSXRcvTimeStamp is the time the message was delivered to the consumer (dequeue time).

  • JMSXState is the message state, set by the provider. The message state can be WAITING, READY, EXPIRED, or RETAINED.

Table 11-3 lists the type and use of each JMS standard message property and shows by whom it is set.

Table 11-3 JMS Defined Message Properties

JMS Defined Message Property Type Set by Use
JMSXUserID String JMS after Send method has completed Identity of user sending message
JMSAppID String JMS after Send method has completed Identity of application sending message
JMSDeliveryCount int JMS after Receive method has completed Number of message delivery attempts
JMSXGroupID String JMS client Identity of message group to which message belongs
JMSXGroupSeq int JMS client Sequence number of message within group
JMSXRcvTimeStamp String JMS after Receive method has completed Time that JMS delivered message to consumer
JMSXState int JMS provider Message state set by provider

Oracle-specific JMS properties, which all begin with JMS_Oracle, include the following:

  • JMS_OracleExcpQ is the queue name to send the message to if it cannot be delivered to the original destination. Only destinations of type EXCEPTION can be specified in the JMS_OracleExcpQ property.

  • JMS_OracleDelay is the time in seconds to delay the delivery of the message. This can affect the order of message delivery.

  • JMS_OracleOriginalMessageId is set to the message ID of the message in the source if the message is propagated from one destination to another. If the message is not propagated, then this property has the same value as the JMSMessageId.

A client can add additional header fields to a message by defining properties. These properties can then be used in message selectors to select specific messages.

JMS properties or header fields are set either explicitly by the client or automatically by the JMS provider (these are generally read-only). Some JMS properties are set using the parameters specified in send and receive operations.

Table 11-4 Oracle Defined Message Properties

Header Field/Property Type Set by Use
JMS_OracleExcpQ String JMS client Specifies the name of the exception queue
JMS_OracleDelay int JMS client Specifies the time (seconds) after which the message should become available to the consumers
JMS_OracleOriginalMessageID String JMS provider Specifies the message ID of the message in source when the messages are propagated from one destination to another

JMS Message Body

JMS provides five forms of message body:

  • StreamMessage - a message whose body contains a stream of Java primitive values. It is filled and read sequentially.

  • BytesMessage - a message whose body contains a stream of uninterpreted bytes. This message type is for directly encoding a body to match an existing message format.

  • MapMessage - a message whose body contains a set of name-value pairs. Names are strings and values are Java primitive types. The entries can be accessed sequentially by enumerator or randomly by name.

  • TextMessage - a message whose body contains a java.lang.String.

  • ObjectMessage - a message that contains a serializable Java object.

  • ADTmessage - a message whose body contains an Oracle object type (AdtMessage type has been added in Oracle JMS).

The AQ$_JMS_MESSAGE Type

This type can store JMS messages of all the JMS-specified message types: JMSStream, JMSBytes, JMSMap, JMSText, and JMSObject. You can create a queue table of AQ$_JMS_MESSAGE type, but use any message type.

JMS Message Body: Stream Message

A StreamMessage is used to send a stream of Java primitives. It is filled and read sequentially. It inherits from Message and adds a stream message body. Its methods are based largely on those found in java.io.DataInputStream and java.io.DataOutputStream.

The primitive types can be read or written explicitly using methods for each type. They can also be read or written generically as objects. To use Stream Messages, create the queue table with the SYS.AQ$_JMS_STREAM_MESSAGE or AQ$_JMS_MESSAGE payload types.

Stream messages support the following conversion table. A value written as the row type can be read as the column type.

Table 11-5 Stream Message Conversion

Input Boolean byte short char int long float double String byte[]
Boolean X - - - - - - - X -
byte - X X - X X - - X -
short - - X - X X - - X -
char - - - X - - - - X -
int - - - - X X - - X -
long - - - - - X - - X -
float - - - - - - X X X -
double - - - - - - - X X -
string X X X X X X X X X -
byte[] - - - - - - - - - X

JMS Message Body: Bytes Message

A BytesMessage is used to send a message containing a stream of uninterpreted bytes. It inherits Message and adds a bytes message body. The receiver of the message supplies the interpretation of the bytes. Its methods are based largely on those found in java.io.DataInputStream and java.io.DataOutputStream.

This message type is for client encoding of existing message formats. If possible, one of the other self-defining message types should be used instead.

The primitive types can be written explicitly using methods for each type. They can also be written generically as objects. To use Bytes Messages, create the queue table with SYS.AQ$_JMS_BYTES_MESSAGE or AQ$_JMS_MESSAGE payload types.

JMS Message Body: Map Message

A MapMessage is used to send a set of name-value pairs where names are Strings and values are Java primitive types. The entries can be accessed sequentially or randomly by name. The order of the entries is undefined. It inherits from Message and adds a map message body. The primitive types can be read or written explicitly using methods for each type. They can also be read or written generically as objects.

To use Map Messages, create the queue table with the SYS.AQ$_JMS_MAP_MESSAGE or AQ$_JMS_MESSAGE payload types. Map messages support the following conversion table. A value written as the row type can be read as the column type.

Table 11-6 Map Message Conversion

Input Boolean byte short char int long float double String byte[]
Boolean X - - - - - - - X -
byte - X X - X X - - X -
short - - X - X X - - X -
char - - - X - - - - X -
int - - - - X X - - X -
long - - - - - X - - X -
float - - - - - - X X X -
double - - - - - - - X X -
string X X X X X X X X X -
byte[] - - - - - - - - - X

JMS Message Body: Text Message

A TextMessage is used to send a message containing a java.lang.StringBuffer. It inherits from Message and adds a text message body. The text information can be read or written using methods getText() and setText(...). To use Text Messages, create the queue table with the SYS.AQ$_JMS_TEXT_MESSAGE or AQ$_JMS_MESSAGE payload types.

JMS Message Body: Object Message

An ObjectMessage is used to send a message that contains a serializable Java object. It inherits from Message and adds a body containing a single Java reference. Only serializable Java objects can be used. If a collection of Java objects must be sent, then one of the collection classes provided in JDK 1.4 can be used. The objects can be read or written using the methods getObject() and setObject(...).To use Object Messages, create the queue table with the SYS.AQ$_JMS_OBJECT_MESSAGE or AQ$_JMS_MESSAGE payload types.

Example 11-8 JMS: Processing an ObjectMessage Body

public void enqueue_new_orders(QueueSession jms_session, BolOrder new_order)
{
   QueueSender   sender;
   Queue         queue;
   ObjectMessage obj_message;

   try
   {
       /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");
       sender = jms_session.createSender(queue);
       obj_message = jms_session.createObjectMessage();
       obj_message.setJMSCorrelationID("RUSH");
       obj_message.setObject(new_order);
       jms_session.commit();
    }
    catch (JMSException ex)
    {
      System.out.println("Exception: " + ex); 
    }

}

JMS Message Body: AdtMessage

An AdtMessage is used to send a message that contains a Java object that maps to an Oracle object type. These objects inherit from Message and add a body containing a Java object that implements the CustomDatum or ORAData interface.


See Also:

Oracle Database Java Developer's Guide for information about the CustomDatum and ORAData interfaces

To use AdtMessage, create the queue table with payload type as the Oracle object type. The AdtMessage payload can be read and written using the getAdtPayload and setAdtPayload methods.

You can also use an AdtMessage to send messages to queues of type SYS.XMLType. You must use the oracle.xdb.XMLType class to create the message.


Using Message Properties with Different Message Types

The following message properties can be set by the client using the setProperty call. For StreamMessage, BytesMessage, ObjectMessage, TextMessage, and MapMessage, the client can set:

  • JMSXAppID

  • JMSXGroupID

  • JMSXGroupSeq

  • JMS_OracleExcpQ

  • JMS_OracleDelay

For AdtMessage, the client can set:

  • JMS_OracleExcpQ

  • JMS_OracleDelay

The following message properties can be obtained by the client using the getProperty call. For StreamMessage, BytesMessage, ObjectMessage, TextMessage, and MapMessage, the client can get:

  • JMSXuserID

  • JMSXAppID

  • JMSXDeliveryCount

  • JMSXGroupID

  • JMSXGroupSeq

  • JMSXRecvTimeStamp

  • JMSXState

  • JMS_OracleExcpQ

  • JMS_OracleDelay

  • JMS_OracleOriginalMessageID

For AdtMessage, the client can get:

  • JMSXDeliveryCount

  • JMSXRecvTimeStamp

  • JMSXState

  • JMS_OracleExcpQ

  • JMS_OracleDelay

The following JMS properties and header fields that can be included in a Message Selector. For QueueReceiver, TopicSubscriber and TopicReceiver on queues containing JMS type payloads, any SQL92 where clause of a string that contains:

  • JMSPriority (int)

  • JMSCorrelationID (String)

  • JMSMessageID (String) - only for QueueReceiver and TopicReceiver

  • JMSTimestamp (Date)

  • JMSType (String)

  • JMSXUserID (String)

  • JMSXAppID (String)

  • JMSXGroupID (String)

  • JMSXGroupSeq (int)

  • Any user-defined property in JMS message

For QueueReceiver, TopicSubscriber and TopicReceiver on queues containing Oracle object type payloads, use Oracle Streams AQ rule syntax for any SQL92 where clause of string that contains:

  • corrid

  • priority

  • tab.user_data.adt_field_name

JMS Point-to-Point Model Features

Queues

In the point-to-point model, clients exchange messages using queues - from one point to another. These queues are used by message producers and consumers to send and receive messages.

An administrator creates single-consumer queues by means of the createQueue method in AQjmsSession. A client can obtain a handle to a previously created queue using the getQueue method on AQjmsSession.

These queues are described as single-consumer queues because a message can be consumed by only a single consumer. Put another way: a message can be consumed exactly once. This raises the question: What happens when there are multiple processes or operating system threads concurrently dequeuing from the same queue? Because a locked message cannot be dequeued by a process other than the one that has created the lock, each process dequeues the first unlocked message at the head of the queue.

Before using a queue, the queue must be enabled for enqueue/dequeue using the start call in AQjmsDestination.

After processing, the message is removed if the retention time of the queue is 0, or is retained for a specified retention time. As long as the message is retained, it can be either

  • queried using SQL on the queue table view, or

  • dequeued using a QueueBrowser and specifying the message ID of the processed message.

QueueSender

A client uses a QueueSender to send messages to a queue. A QueueSender is created by passing a queue to a session's createSender method. A client also has the option of creating a QueueSender without supplying a queue. In that case a queue must be specified on every send operation.

A client can specify a default delivery mode, priority and TimeToLive for all messages sent by the QueueSender. Alternatively, the client can define these options for each message.

QueueReceiver

A client uses a QueueReceiver to receive messages from a queue. A QueueReceiver is created using the session's createQueueReceiver method. A QueueReceiver can be created with a message selector. This allows the client to restrict messages delivered to the consumer to those that match the selector.

The selector for queues containing payloads of type TextMessage, StreamMessage, BytesMessage, ObjectMessage, MapMessage can contain any expression that has a combination of one or more of the following:

  • JMSMessageID ='ID:23452345' to retrieve messages that have a specified message ID (all message IDs being prefixed with ID:)

  • JMS message header fields or properties:

    JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
    
    JMSCorrelationID LIKE 'RE%'
    
    
  • User-defined message properties:

    color IN ('RED', BLUE', 'GREEN') AND price < 30000 
    
    

For queues containing AdtMessages the selector must be a SQL expression on the message payload contents or message ID or priority or correlation ID.

  • Selector on message ID - to retrieve messages that have a specific message ID

    msgid = '23434556566767676'

    Note: in this case message IDs must NOT be prefixed with ID:

  • Selector on priority or correlation is specified as follows

    priority < 3 AND corrid = 'Fiction'
    
    
  • Selector on message payload is specified as follows

    tab.user_data.color = 'GREEN' AND tab.user_data.price < 30000
    

Example 11-9 Creating a JMS Connection and Session. Creating a Receiver to Receive Messages

In the BOL application, new orders are retrieved from the new_orders_queue. These orders are then published to the OE.OE_bookedorders_topic. After creating a JMS connection and session, you create a receiver to receive messages:

public void get_new_orders(QueueSession jms_session)
  {
   QueueReceiver   receiver;
   Queue           queue;
   ObjectMessage   obj_message;
   BolOrder        new_order;
   BolCustomer     customer;
   String          state;
   String          cust_name;

   try
   {

     /* get a handle to the new_orders queue */
     queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

     receiver = jms_session.createReceiver(queue);

     for(;;)
     {
       /* wait for a message to show up in the queue */
       obj_message = (ObjectMessage)receiver.receive(10);

       new_order = (BolOrder)obj_message.getObject();

       customer = new_order.getCustomer();
       state    = customer.getState();

       obj_message.clearBody();

       /* determine customer region and assign a shipping region*/
    if((state.equals("CA")) || (state.equals("TX")) ||
     (state.equals("WA")) || (state.equals("NV")))
    obj_message.setStringProperty("Region", "WESTERN");
       else
    obj_message.setStringProperty("Region", "EASTERN");

       cust_name = new_order.getCustomer().getName();

       obj_message.setStringProperty("Customer", cust_name);

    if(obj_message.getJMSCorrelationID().equals("RUSH"))
    book_rush_order(obj_message);
    else
    book_new_order(obj_message);

       jms_session.commit();
     } 
   }
   catch (JMSException ex)
   {
     System.out.println("Exception: " + ex);
   }
}

QueueBrowser

A client uses a QueueBrowser to view messages on a queue without removing them. The browser methods return a java.util.Enumeration that is used to scan the queue's messages. The first call to nextElement gets a snapshot of the queue. A QueueBrowser can also optionally lock messages as it is scanning them. This is similar to a "SELECT... for UPDATE" command on the message. This prevents other consumers from removing the message while they are being scanned.

A QueueBrowser can also be created with a message selector. This allows the client to restrict messages delivered to the browser to those that match the selector.

The selector for queues containing payloads of type TextMessage, StreamMessage, BytesMessage, ObjectMessage, MapMessage can contain any expression that has a combination of one or more of the following:

  • JMSMessageID ='ID:23452345' to retrieve messages that have a specified message ID (all message IDs being prefixed with ID:)

  • JMS message header fields or properties:

    JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
    
    JMSCorrelationID LIKE 'RE%'
    
    
  • User-defined message properties:

    color IN ('RED', BLUE', 'GREEN') AND price < 30000 
    
    

For queues containing AdtMessages the selector must be a SQL expression on the message payload contents or messageID or priority or correlationID.

  • Selector on message ID - to retrieve messages that have a specific messageID

    msgid = '23434556566767676'

    Note: in this case message IDs must NOT be prefixed with ID:.

  • Selector on priority or correlation is specified as follows

    priority < 3 AND corrid = 'Fiction'
    
    
  • Selector on message payload is specified as follows

    tab.user_data.color = 'GREEN' AND tab.user_data.price < 30000
    

JMS Publish/Subscribe Model Features

The following topics are discussed in this section:

Topic

JMS has various features that allow you to develop an application based on a publish/subscribe model. The aim of this application model is to enable flexible and dynamic communication between applications functioning as publishers and applications playing the role of subscribers. The specific design point is that the applications playing these different roles should be decoupled in their communication.They should interact based on messages and message content.

In distributing messages, publisher applications are not required to explicitly handle or manage message recipients. This allows for the dynamic addition of new subscriber applications to receive messages without changing any publisher application logic. Subscriber applications receive messages based on message content without regard to which publisher applications are sending messages. This allows the dynamic addition of subscriber applications without changing any subscriber application logic. Subscriber applications specify interest by defining a rule-based subscription on message properties or the message content of a topic. The system automatically routes messages by computing recipients for published messages using the rule-based subscriptions.

In the publish/subscribe model, messages are published to and received from topics. A topic is created using the CreateTopic method in an AQjmsSession. A client can obtain a handle to a previously-created Topic using the getTopic method in AQjmsSession.

You use the publish/subscribe model of communication in JMS by taking the following steps:

  1. Enable enqueue/dequeue on the Topic using the start call in AQjmsDestination.

  2. Set up one or more topics to hold messages. These topics should represent an area or subject of interest. For example, a topic can be used to represent billed orders.

  3. Create a set of durable subscribers. Each subscriber can specify a selector that represents a specification (selects) for the messages that the subscriber wishes to receive. A null selector indicates that the subscriber wishes to receive all messages published on the topic.

  4. Subscribers can be local or remote. Local subscribers are durable subscribers defined on the same topic on which the message is published. Remote subscribers are other topics, or recipients on other topics that are defined as subscribers to a particular queue. In order to use remote subscribers, you must set up propagation between the two local and remote topic.


    See Also:

    Chapter 8, " Oracle Streams AQ Administrative Interface" for details on propagation

  5. Create TopicPublishers using the session's createPublisher method Messages are published using the publish call. Messages can be published to all subscribers to the topic or to a specified subset of recipients on the topic.

  6. Subscribers can receive messages on the topic by using the receive method.

  7. Subscribers can also receive messages asynchronously by using message listeners. The concepts of remote subscribers and propagation are Oracle extensions to JMS.

Durable Subscriber

Durable subscribers are instituted in either of the following ways:

  • A client uses the session's createDurableSubscriber method to create durable subscribers.

  • A DurableSubscriber is be created with a message selector. This allows the client to restrict messages delivered to the subscriber to those that match the selector.

The selector for topics containing payloads of type TextMessage, StreamMessage, BytesMessage, ObjectMessage, MapMessage can contain any expression that has a combination of one or more of the following:

  • JMS message header fields or properties:

    JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
    
    
  • User-defined message properties:

    color IN ('RED', BLUE', 'GREEN') AND price < 30000 
    
    

For topics containing AdtMessages the selector must be a SQL expression on the message payload contents or priority or correlationID.

  • Selector on priority or correlation is specified as follows

    priority < 3 AND corrid = 'Fiction'
    
    
  • Selector on message payload is specified as follows

    tab.user_data.color = 'GREEN' AND tab.user_data.price < 30000
    
    

The syntax for the selector is described in detail in createDurableSubscriber in Oracle Streams Advanced Queuing Java API Reference.

Remote subscribers are defined using the createRemoteSubscriber call. The remote subscriber can be a specific consumer at the remote topic or all subscribers at the remote topic

A remote subscriber is defined using the AQjmsAgent structure. An AQjmsAgent consists of a name and address. The name refers to the consumer_name at the remote topic. The address refers to the remote topic:

schema.topic_name[@dblink]

  • To publish messages to a particular consumer at the remote topic, the subscription_name of the recipient at the remote topic must be specified in the name field of AQjmsAgent. The remote topic must be specified in the address field of AQjmsAgent.

  • To publish messages to all subscribers of the remote topic, the name field of AQjmsAgent must be set to null. The remote topic must be specified in the address field of AQjmsAgent.

Example 11-10 Creating Local and Remote Subscriber and Scheduling Propagation

public void create_bookedorders_subscribers(TopicSession jms_session)
{
   Topic            topic;
   TopicSubscriber  tsubs;
   AQjmsAgent       agt_east;
   AQjmsAgent       agt_west;

   try
   {

     /* get a handle to the OE_bookedorders_topic */
     topic = ((AQjmsSession)jms_session).getTopic("OE", 
                    "OR_bookedorders_topic");


     /* Create local subscriber - to track messages for some customers */
     tsubs = jms_session.createDurableSubscriber(topic, "SUBS1",
             "JMSPriority < 3 AND Customer = 'MARTIN'", 
                   false);

     /* Create remote subscribers in the western and eastern region */
     agt_west = new AQjmsAgent("West_Shipping", "WS.WS_bookedorders_topic");

     ((AQjmsSession)jms_session).createRemoteSubscriber(topic, agt_west, 
                     "Region = 'WESTERN'");

     agt_east = new AQjmsAgent("East_Shipping", "ES.ES_bookedorders_topic");

     ((AQjmsSession)jms_session).createRemoteSubscriber(topic, agt_east, 
                     "Region = 'EASTERN'");

     /* schedule propagation between bookedorders_topic and 
   WS_bookedorders_topic, ES.ES_bookedorders_topic */
     ((AQjmsDestination)topic).schedulePropagation(jms_session, 
                      "WS.WS_bookedorders_topic", 
                         null, null, null, null);

     ((AQjmsDestination)topic).schedulePropagation(jms_session, 
                   "ES.ES_bookedorders_topic", 
                      null, null, null, null);
   }
   catch (Exception ex)
   {
     System.out.println("Exception " + ex);
   }

}

TopicPublisher

Messages are published using TopicPublisher:

A TopicPublisher is created by passing a Topic to a session's createPublisher method. A client also has the option of creating a TopicPublisher without supplying a Topic. In this case, a Topic must be specified on every publish operation. A client can specify a default delivery mode, priority and TimeToLive for all messages sent by the TopicPublisher. It can also specify these options for each message.

Recipient Lists

In the JMS publish/subscribe model, clients can specify explicit recipient lists instead of having messages sent to all the subscribers of the topic. These recipients may or may not be existing subscribers of the topic. The recipient list overrides the subscription list on the topic for this message. The concept of recipient lists is an Oracle extension to JMS.

Example 11-11 JMS: Creating Recipient Lists for Specific Customers

Suppose we want to send high priority messages only to SUBS1 and Fedex_Shipping in the Eastern region instead of publishing them to all the subscribers of the OE_bookedorders_topic:

public void book_rush_order(TopicSession jms_session, 
             ObjectMessage obj_message)
{

   TopicPublisher  publisher;
   Topic           topic;
   AQjmsAgent[]    recp_list = new AQjmsAgent[2];

   try
   {
     /* get a handle to the bookedorders topic */
     topic = ((AQjmsSession) jms_session).getTopic("OE",
                     "OE_bookedorders_topic");

     publisher = jms_session.createPublisher(null);

     recp_list[0] = new AQjmsAgent("SUBS1", null);
     recp_list[1] = new AQjmsAgent("Fedex_Shipping", 
               "ES.ES_bookedorders_topic");

     publisher.setPriority (1);
     ((AQjmsTopicPublisher)publisher).publish(topic, obj_message, recp_list);

     jms_session.commit();

   }
   catch (Exception ex)
   {
     System.out.println("Exception: " + ex);
   }
}

TopicReceiver

If the recipient name is explicitly specified in the recipient list, but that recipient is not a subscriber to the queue, then messages sent to it can be received by creating a TopicReceiver.TopicReceiver is an Oracle extension to JMS.

A TopicReceiver can also be created with a message selector. This allows the client to restrict messages delivered to the recipient to those that match the selector.

The syntax for the selector for TopicReceiver is the same as that for a QueueReceiver.

Example 11-12 JMS: Creating a Topic and Local Subscriber and Waiting for a Message to Show Up in the Topic

public void ship_rush_orders(TopicSession jms_session)
{
   Topic            topic;
   TopicReceiver    trec;
   ObjectMessage    obj_message;
   BolCustomer      customer;
   BolOrder         new_order;
   String           state;
   int              i = 0;

   try
   {
     /* get a handle to the OE_bookedorders_topic */
     topic = ((AQjmsSession)jms_session).getTopic("ES", 
                    "ES_bookedorders_topic");

     /* Create local subscriber - to track messages for some customers */
     trec = ((AQjmsSession)jms_session).createTopicReceiver(topic, 
                         "Fedex_Shipping", 
                         null);

     /* process 10 messages */
     for(i = 0; i < 10; i++)
     {
       /* wait for a message to show up in the topic */
       obj_message = (ObjectMessage)trec.receive(10);

       new_order = (BolOrder)obj_message.getObject();

       customer = new_order.getCustomer();
       state    = customer.getState();

       System.out.println("Rush Order for customer " + 
           customer.getName()); 
       jms_session.commit();
     }
   }
   catch (Exception ex)
   {
     System.out.println("Exception ex: " + ex);
   }
}

For remote subscribers - if the subscriber name at the remote topic has explicitly been specified in the createRemoteSubscriber call, then to receive a message, we can use TopicReceivers

public void get_westernregion_bookedorders(TopicSession jms_session)
{
   Topic            topic;
   TopicReceiver    trec;
   ObjectMessage    obj_message;
   BolCustomer      customer;
   BolOrder         new_order;
   String           state;
   int              i = 0;

   try
   {
     /* get a handle to the WS_bookedorders_topic */
     topic = ((AQjmsSession)jms_session).getTopic("WS", 
                    "WS_bookedorders_topic");
     /* Create local subscriber - to track messages for some customers */
     trec = ((AQjmsSession)jms_session).createTopicReceiver(topic, 
                         "West_Shipping", 
                         null);
     /* process 10 messages */
     for(i = 0; i < 10; i++)
     {
       /* wait for a message to show up in the topic */
       obj_message = (ObjectMessage)trec.receive(10);

       new_order = (BolOrder)obj_message.getObject();

       customer = new_order.getCustomer();
       state    = customer.getState();

       System.out.println("Received Order for customer " + 
           customer.getName()); 
       jms_session.commit();
     }
   }
   catch (Exception ex)
   {
     System.out.println("Exception ex: " + ex); 
   }

}

If the subscriber name is not specified in the createRemoteSubscriber call, then clients must use durable subscribers at the remote site to receive messages.

TopicBrowser

A client uses a TopicBrowser to view messages on a topic without removing them. The browser methods return a java.util.Enumeration that is used to scan the topic's messages. The first call to nextElement gets a snapshot of the topic. A TopicBrowser can also optionally lock messages as it is scanning them. This is similar to a SELECT... for UPDATE command on the message. This prevents other consumers from removing the message while they are being scanned.

A TopicBrowser can also be created with a message selector. This allows the client to restrict messages delivered to the browser to those that match the selector.

The selector for the TopicBrowser can take any of the following forms:

  • JMSMessageID ='ID:23452345' to retrieve messages that have a specified message ID (all message IDs are prefixed with ID:)

  • JMS message header fields or properties:

    JMSPriority < 3 AND JMSCorrelationID = 'Fiction'
    JMSCorrelationID LIKE 'RE%'
    
    
  • User-defined message properties:

    color IN ('RED', BLUE', 'GREEN') AND price < 30000 
    
    

For topics containing AdtMessages, the selector must be a SQL expression on the message payload contents or messageID or priority or correlationID.

  • Selector on message ID - to retrieve messages that have a specific messageID

    msgid = '23434556566767676'

    Note: in this case message IDs must NOT be prefixed with ID:

    Selector on priority or correlation is specified as follows:

    priority < 3 AND corrid = 'Fiction'
    
    
  • Selector on message payload is specified as follows:

    tab.user_data.color = 'GREEN' AND tab.user_data.price < 30000
    
    

As with any consumer for topics, only durable subscribers are allowed to create a TopicBrowser.

TopicBrowser also supports a purge feature. This allows a client using a TopicBrowser to discard all messages that have been seen during the current browse operation on the topic. A purge is equivalent to a destructive receive of all of the seen messages (as if performed using a TopicSubscriber).

For a purge, a message is considered seen if it has been returned to the client using a call to the nextElement() operation on the java.lang.Enumeration for the TopicBrowser. Messages that have not yet been seen by the client are not discarded during a purge. A purge operation can be performed multiple times on the same TopicBrowser.

As with all other JMS messaging operations, the effect of a purge becomes stable when the JMS session used to create the TopicBrowser is committed. If the operations on the session are rolled back, then the effects of the purge operation are also undone.

JMS MessageProducer Features

Priority and Ordering of Messages

The message ordering dictates the order in which messages are received from a queue or topic. The ordering method is specified when the queue table for the queue or topic is created (see "Creating a Queue Table"). Currently, Oracle Streams AQ supports ordering on two of the message attributes:

  • Priority

  • Enqueue time

When combined, they lead to four possible ways of ordering:


First-In, First-Out (FIFO) Ordering of Messages

If enqueue time was chosen as the ordering criteria, then messages are received in the order of the enqueue time. The enqueue time is assigned to the message by Oracle Streams AQ at message publish/send time. This is also the default ordering.


Priority Ordering of Messages

If priority ordering is chosen, then each message is assigned a priority. Priority can be specified as a message property at publish/send time by the MessageProducer. The messages are received in the order of the priorities assigned.


FIFO Priority Ordering

A FIFO-priority topic/queue can also be created by specifying both the priority and the enqueue time as the sort order of the messages. A FIFO-priority topic/queue acts like a priority queue, except if two messages are assigned the same priority, they are received in the order of their enqueue time.


Enqueue Time Followed by Priority

Messages with the same enqueue time are received according to their priorities. If the ordering criteria of two message is the same, then the order they are received is indeterminate. However, Oracle Streams AQ does ensure that messages send/published in the same session with the same ordering criteria are received in the order they were sent.

Time Specification - Delay

Messages can be sent/published to a queue/topic with Delay. The delay represents a time interval after which the message becomes available to the Message Consumer. A message specified with a delay is in a waiting state until the delay expires and the message becomes available. Delay for a message is specified as message property (JMS_OracleDelay). This property is not specified in the JMS standard. It is an Oracle Streams AQ extension to JMS message properties.

Delay processing requires the Oracle Streams AQ background process queue monitor to be started. Note also that receiving by msgid overrides the delay specification.

Time Specification - Expiration

Producers of messages can specify expiration limits, or TimeToLive for messages. This defines the period of time the message is available for a Message Consumer.

TimeToLive can be specified at send/publish time or using the set TimeToLive method of a MessageProducer, with the former overriding the latter. The Oracle Streams AQ background process queue monitor must be running to implement TimeToLive.

Message Grouping

Messages belonging to a queue/topic can be grouped to form a set that can only be consumed by one consumer at a time. This requires the queue/topic be created in a queue table that is enabled for transactional message grouping (see "Creating a Queue Table"). All messages belonging to a group must be created in the same transaction and all messages created in one transaction belong to the same group. Using this feature, you can segment a complex message into simple messages. This is an Oracle Streams AQ extension and not part of the JMS specification.

For example, messages directed to a queue containing invoices could be constructed as a group of messages starting with the header message, followed by messages representing details, followed by the trailer message. Message grouping is also very useful if the message payload contains complex large objects such as images and video that can be segmented into smaller objects.

The general message properties (priority, delay, expiration) for the messages in a group are determined solely by the message properties specified for the first message (head) of the group irrespective of which properties are specified for subsequent messages in the group.

The message grouping property is preserved across propagation. However, the destination topic to which messages must be propagated must also be enabled for transactional grouping. There are also some restrictions you must keep in mind if the message grouping property is to be preserved while dequeuing messages from a queue enabled for transactional grouping.


See Also:

"Dequeue Features"

JMS Message Consumer Features

Receiving Messages

A JMS application can receive messages by creating a message consumer. Messages can be received synchronously using the receive call or asynchronously using a Message Listener.

There are three modes of receive:

  • Block until a message arrives for a consumer

  • Block for a maximum of the specified time

  • Nonblocking

Example 11-13 JMS: Blocking Until a Message Arrives

public BolOrder get_new_order1(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;

      try
      {
      /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* wait for a message to show up in the queue */
       obj_message = (ObjectMessage)qrec.receive();

       new_order = (BolOrder)obj_message.getObject();

       customer = new_order.getCustomer();
       state    = customer.getState();

       System.out.println("Order:  for customer " + 
                           customer.getName()); 

      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }

Example 11-14 JMS: Blocking Messages for a Maximum of 60 Seconds

public BolOrder get_new_order2(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;

      try
      {
            /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* wait for 60 seconds for a message to show up in the queue */
       obj_message = (ObjectMessage)qrec.receive(60000);

       new_order = (BolOrder)obj_message.getObject();

       customer = new_order.getCustomer();
       state    = customer.getState();

       System.out.println("Order:  for customer " + 
                           customer.getName()); 

      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }

Example 11-15 JMS: Nonblocking Messages

public BolOrder poll_new_order3(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order = null;
      String           state;

      try
      {
            /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       qrec = jms_session.createReceiver(queue);

       /* check for a message to show in the queue */
       obj_message = (ObjectMessage)qrec.receiveNoWait();

       new_order = (BolOrder)obj_message.getObject();

       customer = new_order.getCustomer();
       state    = customer.getState();

       System.out.println("Order:  for customer " + 
                           customer.getName()); 

      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
      return new_order;

   }

Message Navigation in Receive

When a consumer does the first receive in its session, its gets the first message in the queue or topic. Subsequent receives get the next message, and so on. The default action works well for FIFO queues and topics, but not for priority ordered queues. If a high priority message arrives for the consumer, then this client program does not receive the message until it has cleared the messages that were already there before it.

To provide the consumer better control in navigating the queue for its messages, Oracle Streams AQ navigation modes are made available to it as JMS extensions. These modes can be set at the TopicSubscriber, QueueReceiver or the TopicReceiver.

  • FIRST_MESSAGE resets the consumer's position to the beginning of the queue. This is a useful mode for priority ordered queues, because it allows the consumer to remove the message on the top of the queue.

  • NEXT_MESSAGE gets the message after the established position of the consumer. For example, a NEXT_MESSAGE applied after the position is at the fourth message, will get the second message in the queue. This is the default action.

For transaction grouping

  • FIRST_MESSAGE resets the consumer's position to the beginning of the queue.

  • NEXT_MESSAGE sets the position to the next message in the same transaction.

  • NEXT_TRANSACTION sets the position to the first message in the next transaction.

The transaction grouping property can be negated if messages are received in the following ways:

  • Receive by specifying a correlation identifier in the selector,

  • Receive by specifying a message identifier in the selector,

  • Committing before all the messages of a transaction group have been received.

If in navigating through the queue, the program reaches the end of the queue while using the NEXT_MESSAGE or NEXT_TRANSACTION option, and you have specified a blocking receive, then the navigating position is automatically changed to the beginning of the queue.

By default, a QueueReceiver, TopicReceiver, or TopicSubscriber uses FIRST_MESSAGE for the first receive call, and NEXT_MESSAGE for the subsequent receive calls.


Example Scenario

The get_new_orders() procedure retrieves orders from the OE_neworders_que. Each transaction refers to an order, and each message corresponds to an individual book in that order. The get_orders() procedure loops through the messages to retrieve the book orders. It resets the position to the beginning of the queue using the FIRST_MESSAGE option before the first receive. It then uses the NEXT_MESSAGE navigation option to retrieve the next book (message) of an order (transaction). If it gets an exception indicating all messages in the current group/transaction have been fetched, then it changes the navigation option to NEXT_TRANSACTION and gets the first book of the next order. It then changes the navigation option back to NEXT_MESSAGE for fetching subsequent messages in the same transaction. This is repeated until all orders (transactions) have been fetched.

Example 11-16 JMS: Navigating the Retrieval of Messages

public void get_new_orders(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      ObjectMessage    obj_message;
      BolCustomer      customer;
      BolOrder         new_order;
      String           state;
      int              new_orders = 1;

      try
      {

         /* get a handle to the new_orders queue */
         queue = ((AQjmsSession) jms_session).getQueue("OE","OE_neworders_que");
         qrec = jms_session.createReceiver(queue);

    /* set navigation to first message */
        ((AQjmsTopicSubscriber)qrec).setNavigationMode(AQjmsConstants.NAVIGATION_FIRST_MESSAGE);

        while(new_orders != 0)
        {
          try{

             /* wait for a message to show up in the topic */
             obj_message = (ObjectMessage)qrec.receiveNoWait();

             if (obj_message != null)   /* no more orders in the queue */
             { 
               System.out.println(" No more orders ");
               new_orders = 0;
             }
             new_order = (BolOrder)obj_message.getObject();
             customer = new_order.getCustomer();
             state    = customer.getState();

             System.out.println("Order: for customer " + 
                                customer.getName()); 

            /* Now get the next message */
            ((AQjmsTopicSubscriber)qrec).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_MESSAGE);
 
          }catch(AQjmsException ex)
          {  if (ex.getErrorNumber() == 25235) 
             {
               System.out.println("End of transaction group");
              ((AQjmsTopicSubscriber)qrec).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_TRANSACTION);
             }
             else
               throw ex;
          }
        }
     }catch (JMSException ex)
     {
        System.out.println("Exception: " + ex);
      }
   }

Browsing Messages

Aside from the usual receive, which allows the dequeuing client to delete the message from the queue, JMS provides an interface that allows the JMS client to browse its messages in the queue. A QueueBrowser can be created using the createBrowser method from QueueSession.

If a message is browsed, then it remains available for further processing. After a message has been browsed, there is no guarantee that the message will be available to the JMS session again, because a receive call from a concurrent session might remove the message.

To prevent a viewed message from being removed by a concurrent JMS client, you can view the message in the locked mode. To do this, you must create a QueueBrowser with the locked mode using the Oracle Streams AQ extension to the JMS interface.The lock on the message with a browser with locked mode is released when the session performs a commit or a rollback.

To remove the message viewed by a QueueBrowser, the session must create a QueueReceiver and use the JMSmesssageID as the selector.


Example Code

See "QueueBrowser".


Remove-No-Data

The MessageConsumer can remove the message from the queue or topic without retrieving the message using the receiveNoData call. This is useful when the application has already examined the message, perhaps using the QueueBrowser. This mode allows the JMS client to avoid the overhead of retrieving the payload from the database, which can be substantial for a large message.

Retry with Delay Interval

If the transaction receiving the message from a queue/topic fails, then it is regarded as an unsuccessful attempt to remove the message. Oracle Streams AQ records the number of failed attempts to remove the message in the message history.

In addition, it also allows the application to specify the maximum number of retries supported on messages at the queue/topic level. If the number of failed attempts to remove a message exceed this maximum, then the message is moved to the exception queue and is no longer available to applications.

The transaction receiving a message could have terminated due to a bad condition. For example, an order could not be fulfilled because there were insufficient books in stock. Because inventory updates are made every twelve hours, it makes sense to retry after that time. If an order is still not filled after four attempts, then there could be a problem serious enough for the order to move to the exception queue.

Oracle Streams AQ allows users to specify a retry_delay along with max_retries. This means that a message that has undergone a failed attempt at retrieving remains visible in the queue for dequeue after retry_delay interval. Until then it is in the WAITING state. The Oracle Streams AQ background process time manager enforces the retry delay property.

The maximum retries and retry delay are properties of the queue/topic which can be set when the queue/topic is created or using the alter method on the queue/topic. The default value for MAX_RETRIES is 5.

Example 11-17 JMS: Specifying Max Retries and Max Delays in Messages

If an order cannot be filled because of insufficient inventory, then the transaction processing the order is terminated. The bookedorders topic is set up with max_retries = 4 and retry_delay = 12 hours.Thus, if an order is not filled up in two days, then it is moved to an exception queue.

public BolOrder process_booked_order(TopicSession jms_session)
  {
    Topic            topic;
    TopicSubscriber  tsubs;
    ObjectMessage    obj_message;
    BolCustomer      customer;
    BolOrder         booked_order = null;
    String           country;
    int              i = 0;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Create local subscriber - to track messages for Western Region  */
      tsubs = jms_session.createDurableSubscriber(topic, "SUBS1",
                                       "Region = 'Western' ",
                                                   false);

       /* wait for a message to show up in the topic */
       obj_message = (ObjectMessage)tsubs.receive(10);

       booked_order = (BolOrder)obj_message.getObject();

       customer = booked_order.getCustomer();
       country    = customer.getCountry();

       if (country == "US")
       {
          jms_session.commit();
       }
       else
       {
          jms_session.rollback();
          booked_order = null;
       }
    }catch (JMSException ex)
    { System.out.println("Exception " + ex) ;}

     return booked_order;
   }

Asynchronously Receiving Messages Using Message Listener

The JMS client can receive messages asynchronously by setting the MessageListener using the setMessageListener method available with the Consumer.

When a message arrives for the message consumer, the onMessage method of the message listener is invoked with the message. The message listener can commit or terminate the receipt of the message. The message listener does not receive messages if the JMS Connection has been stopped. The receive call must not be used to receive messages once the message listener has been set for the consumer.

The JMS client can receive messages asynchronously for all the consumers of the session by setting the MessageListener at the session. No other mode for receiving messages must be used in the session once the message listener has been set.

Example 11-18 Asynchronous receipt of queue messages

The application processing the new orders queue can be set up for asynchronously receiving messages from the queue.

public class OrderListener implements MessageListener
   {
      QueueSession   the_sess;

      /* constructor */
      OrderListener(QueueSession  my_sess)
      {
        the_sess = my_sess;
      }

      /* message listener interface */
      public void onMessage(Message m)
      {
        ObjectMessage    obj_msg;
        BolCustomer      customer;
        BolOrder         new_order = null;

        try {
          /* cast to JMS Object Message */
          obj_msg = (ObjectMessage)m;

          /*  Print some useful information */
          new_order = (BolOrder)obj_msg.getObject();
          customer = new_order.getCustomer();
          System.out.println("Order:  for customer " +  customer.getName()); 

          /* call the process order method 
          * NOTE: we are assuming it is defined elsewhere
          * /
          process_order(new_order);
          /* commit the asynchronous receipt of the message */
           the_sess.commit();
        }catch (JMSException ex)
        { System.out.println("Exception " + ex) ;}

      }
   }

     public void setListener1(QueueSession jms_session)
    {
      Queue            queue;
      QueueReceiver    qrec;
      MessageListener  ourListener;

      try
      {
       /* get a handle to the new_orders queue */
       queue = ((AQjmsSession) jms_session).getQueue("OE", "OE_neworders_que");

       /* create a QueueReceiver */
       qrec = jms_session.createReceiver(queue);

       /* create the message listener */
       ourListener = new OrderListener(jms_session);
 
       /* set the message listener for the receiver */
       qrec.setMessageListener(ourListener);
      }
      catch (JMSException ex)
      {
        System.out.println("Exception: " + ex);
      }
   }

Oracle Streams AQ Exception Handling

Oracle Streams AQ provides four integrated mechanisms to support exception handling in applications: EXCEPTION_QUEUES, EXPIRATION, MAX_RETRIES and RETRY_DELAY.

An exception_queue is a repository for all expired or unserviceable messages. Applications cannot directly enqueue into exception queues. However, an application that intends to handle these expired or unserviceable messages can receive/remove them from the exception queue.

To retrieve messages from exception queues, the JMS client must use the point-to-point interface.The exception queue for messages intended for a topic must be created in a queue table with multiple consumers enabled. Like any other queue, the exception queue must be enabled for receiving messages using the start method in the AQOracleQueue class. You get an exception if you try to enable it for enqueue.

The exception queue is a provider (Oracle) specific message property called "JMS_OracleExcpQ" that can be set with the message before sending/publishing it. If an exception queue is not specified, then the default exception queue is used. If the queue/topic is created in a queue table, say QTAB, then the default exception queue is called AQ$_QTAB_E. The default exception queue is automatically created when the queue table is created.

Messages are moved to the exception queues by Oracle Streams AQ under the following conditions:

  • The message is not being dequeued within the specified timeToLive. For messages intended for more than one subscriber, the message is moved to the exception queue if one or more of the intended recipients is not able to dequeue the message within the specified timeToLive. If the timeToLive was not specified for the message, (either in the publish or send call, or as the publisher or sender), then it never expires.

  • The message was received successfully, but the application terminates the transaction that performed the receive because of an error while processing the message. The message is returned to the queue/topic and is available for any applications that are waiting to receive messages. Because this was a failed attempt to receive the message, its retry count is updated.

    If the retry count of the message exceeds the maximum value specified for the queue/topic where it resides, then it is moved to the exception queue. When a message has multiple subscribers, then the message is moved to the exception queue only when all the recipients of the message have exceeded the retry limit.

    A receive is considered rolled back or undone if the application terminates the entire transaction, or if it rolls back to a savepoint that was taken before the receive.


Note:

A message is moved to an exception queue if RETRY_COUNT is greater than MAX_RETRIES. If a dequeue transaction fails because the server process dies (including ALTER SYSTEM KILL SESSION) or SHUTDOWN ABORT on the instance, then RETRY_COUNT is not incremented.

  • The client program successfully received a message but terminated before committing the transaction.

JMS Propagation

Remote Subscribers

This feature enables applications to communicate with each other without having to be connected to the same database.

Oracle Streams AQ allows a remote subscriber, that is a subscriber at another database, to subscribe to a topic. When a message published to the topic meets the criterion of the remote subscriber, Oracle Streams AQ automatically propagates the message to the queue/topic at the remote database specified for the remote subscriber.

The snapshot (job_queue) background process performs propagation. Propagation is performed using database links and Oracle Net Services.

There are two ways to implement remote subscribers:

  • The createRemoteSubscriber method can be used to create a remote subscriber to/on the topic. The remote subscriber is specified as an instance of the class AQjmsAgent.

  • The AQjmsAgent has a name and an address. The address consists of a queue/topic and the database link (dblink) to the database of the subscriber.

There are two kinds of remote subscribers:


Case 1

The remote subscriber is a topic. This occurs when no name is specified for the remote subscriber in the AQjmsAgent object and the address is a topic. The message satisfying the subscriber's subscription is propagated to the remote topic. The propagated message is now available to all the subscriptions of the remote topic that it satisfies.


Case 2

Specify a specific remote recipient for the message. The remote subscription can be for a particular consumer at the remote database. If the name of the remote recipient is specified (in the AQjmsAgent object), then the message satisfying the subscription is propagated to the remote database for that recipient only. The recipient at the remote database uses the TopicReceiver interface to retrieve its messages. The remote subscription can also be for a point-to-point queue

Example 11-19 JMS: Creating Remote Subscribers

  • Scenario for Case 1. Assume the order entry application and Western region shipping application are on different databases, db1 and db2. Further assume that there is a database link dblink_oe_ws from database db1, the order entry database, to the western shipping database db2. The WS_bookedorders_topic at db2 is a remote subscriber to the OE_bookedorders_topic in db1.

  • Scenario for Case 2. Assume the order entry application and Western region shipping application are on different databases, db1 and db2. Further assume that there is a database link dblink_oe_ws from the local order entry database db1 to the western shipping database db2. The agent "Priority" at WS_bookedorders_topic in db2 is a remote subscriber to the OE_bookedorders_topic in db1. Messages propagated to the WS_bookedorders_topic are for "Priority" only.

public void remote_subscriber(TopicSession jms_session)
   {
     Topic            topic;
     ObjectMessage    obj_message;
     AQjmsAgent       remote_sub;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("OE",
                                                   "OE_bookedorders_topic");
      /* create the remote subscriber, name unspecified  and address
       * the topic WS_bookedorders_topic at db2 
       */
      remote_sub = new AQjmsAgent(null, "WS.WS_bookedorders_topic@dblink_oe_ws");

      /* subscribe for western region orders */
      ((AQjmsSession)jms_session).createRemoteSubscriber(topic, remote_sub, "Region = 'Western' ");
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
    catch (java.sql.SQLException  ex1)
    {System.out.println("SQL Exception :" + ex1); }
  }

Database db2 - shipping database: The WS_bookedorders_topic has two subscribers, one for priority shipping and the other normal. The messages from the Order Entry database are propagated to the Shipping database and delivered to the correct subscriber. Priority orders have a message priority of 1.

public void  get_priority_messages(TopicSession jms_session)
   {
     Topic            topic;
     TopicSubscriber  tsubs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

       /* Create local subscriber - for priority messages */
      tsubs = jms_session.createDurableSubscriber(topic, "PRIORITY",
                                       " JMSPriority = 1 ", false);

      obj_message = (ObjectMessage) tsubs.receive();

      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Priority Order:  for customer " +  customer.getName()); 

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }

  public void  get_normal_messages(TopicSession jms_session)
   {
     Topic            topic;
     TopicSubscriber  tsubs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

       /* Create local subscriber - for priority messages */
      tsubs = jms_session.createDurableSubscriber(topic, "PRIORITY",
                                       " JMSPriority > 1 ", false);

      obj_message = (ObjectMessage) tsubs.receive();

      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Normal Order:  for customer " +  customer.getName()); 

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }
 

public void remote_subscriber1(TopicSession jms_session)
   {
     Topic            topic;
     ObjectMessage    obj_message;
     AQjmsAgent       remote_sub;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("OE",
                                                   "OE_bookedorders_topic");
      /* create the remote subscriber, name "Priority"  and address
       * the topic WS_bookedorders_topic at db2 
       */
      remote_sub = new AQjmsAgent("Priority", "WS.WS_bookedorders_topic@dblink_oe_ws");

      /* subscribe for western region orders */
      ((AQjmsSession)jms_session).createRemoteSubscriber(topic, remote_sub, "Region = 'Western' ");
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
    catch (java.sql.SQLException  ex1)
    {System.out.println("SQL Exception :" + ex1); }
  }


   Remote database:
   database db2 - Western Shipping database.
/* get messages for subscriber priority */
   public void  get_priority_messages1(TopicSession jms_session)
   {
     Topic            topic;
     TopicReceiver    trecs;
     ObjectMessage    obj_message;
     BolCustomer      customer;
     BolOrder         booked_order;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* create a local receiver "Priority" for the remote subscription
       * to WS_bookedorders_topic 
       */
      trecs = ((AQjmsSession)jms_session).createTopicReceiver(topic, "Priority", null);

      obj_message = (ObjectMessage) trecs.receive();

      booked_order = (BolOrder)obj_message.getObject();
      customer = booked_order.getCustomer();
      System.out.println("Priority Order:  for customer " +  customer.getName()); 

      jms_session.commit();
    }
    catch (JMSException ex)
    { System.out.println("Exception :" + ex); }
  }

Scheduling Propagation

Propagation must be scheduled using the schedule_propagation method for every topic from which messages are propagated to target destination databases.

A schedule indicates the time frame during which messages can be propagated from the source topic. This time frame can depend on a number of factors such as network traffic, load at source database, load at destination database, and so on. The schedule therefore must be tailored for the specific source and destination. When a schedule is created, a job is automatically submitted to the job_queue facility to handle propagation.

The administrative calls for propagation scheduling provide great flexibility for managing the schedules. The duration or propagation window parameter of a schedule specifies the time frame during which propagation must take place. If the duration is unspecified, then the time frame is an infinite single window. If a window must be repeated periodically, then a finite duration is specified along with a next_time function that defines the periodic interval between successive windows.

The propagation schedules defined for a queue can be changed or dropped at any time during the life of the queue. In addition there are calls for temporarily disabling a schedule (instead of dropping the schedule) and enabling a disabled schedule. A schedule is active when messages are being propagated in that schedule. All the administrative calls can be made irrespective of whether the schedule is active or not. If a schedule is active, then it takes a few seconds for the calls to be executed.

Job queue processes must be started for propagation to take place. At least 2 job queue processes must be started. The database links to the destination database must also be valid. The source and destination topics of the propagation must be of the same message type. The remote topic must be enabled for enqueue. The user of the database link must also have enqueue privileges to the remote topic.

Example 11-20 JMS: Scheduling Propagation

public void  schedule_propagation(TopicSession jms_session)
  {
     Topic            topic;

    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Schedule propagation immediately with duration of 5 minutes and latency 20 sec */
      ((AQjmsDestination)topic).schedulePropagation(jms_session, "dba", null,
                                       new Double(5*60), null, new Double(20));
    }catch (JMSException ex)
    {System.out.println("Exception: " + ex);}
  }

  Propagation schedule parameters can also be altered.


  /* alter duration to 10 minutes and latency to zero */
  public void  alter_propagation(TopicSession jms_session)
  {
     Topic            topic;
    try
    {
      /* get a handle to the OE_bookedorders_topic */
      topic = ((AQjmsSession)jms_session).getTopic("WS",
                                                   "WS_bookedorders_topic");

      /* Schedule propagation immediately with duration of 5 minutes and latency 20 sec */
    ((AQjmsDestination)topic).alterPropagationSchedule(jms_session, "dba",
                        new Double(10*60), null, new Double(0));
    }catch (JMSException ex)
    {System.out.println("Exception: " + ex);}
  }

Enhanced Propagation Scheduling Capabilities

Detailed information about the schedules can be obtained from the catalog views defined for propagation. Information about active schedules—such as the name of the background process handling that schedule, the SID (session, serial number) for the session handling the propagation and the Oracle Database instance handling a schedule (relevant if Real Application Clusters are being used)—can be obtained from the catalog views. The same catalog views also provide information about the previous successful execution of a schedule (last successful propagation of message) and the next execution of the schedule.

For each schedule, detailed propagation statistics are maintained:

  • The total number of messages propagated in a schedule

  • Total number of bytes propagated in a schedule

  • Maximum number of messages propagated in a window

  • Maximum number of bytes propagated in a window

  • Average number of messages propagated in a window

  • Average size of propagated messages

  • Average time to propagated a message

These statistics have been designed to provide useful information to the queue administrators for tuning the schedules such that maximum efficiency can be achieved.

Propagation has built-in support for handling failures and reporting errors. For example, if the database link specified is invalid, or if the remote database is unavailable, or if the remote topic/queue is not enabled for enqueuing, then the appropriate error message is reported. Propagation uses an exponential backoff scheme for retrying propagation from a schedule that encountered a failure. If a schedule continuously encounters failures, then the first retry happens after 30 seconds, the second after 60 seconds, the third after 120 seconds and so forth. If the retry time is beyond the expiration time of the current window, then the next retry is attempted at the start time of the next window.

A maximum of 16 retry attempts are made after which the schedule is automatically disabled. When a schedule is disabled automatically due to failures, the relevant information is written into the alert log. It is possible to check at any time if there were failures encountered by a schedule and if so how many successive failure were encountered, the error message indicating the cause for the failure and the time at which the last failure was encountered. By examining this information, an administrator can fix the failure and enable the schedule.

If propagation is successful during a retry, then the number of failures is reset to 0. Propagation has built-in support for Real Application Clusters and is transparent to the user and the administrator. The job that handles propagation is submitted to the same instance as the owner of the queue table where the source topic resides. If at any time there is a failure at an instance and the queue table that stores the topic is migrated to a different instance, then the propagation job is also automatically migrated to the new instance. This minimizes the pinging between instances and thus offers better performance. Propagation has been designed to handle any number of concurrent schedules.

The number of job_queue_processes is limited to a maximum of 1000 and some of these can be used to handle jobs unrelated to propagation. Hence, propagation has built in support for multitasking and load balancing. The propagation algorithms are designed such that multiple schedules can be handled by a single snapshot (job_queue) process. The propagation load on a job_queue processes can be skewed based on the arrival rate of messages in the different source topics. If one process is overburdened with several active schedules while another is less loaded with many passive schedules, then propagation automatically redistributes the schedules among the processes such that they are loaded uniformly.

Exception Handling During Propagation

When a system errors such as a network failure occurs, Oracle Streams AQ continues to attempt to propagate messages using an exponential back-off algorithm. In some situations that indicate application errors Oracle Streams AQ marks messages as UNDELIVERABLE if there is an error in propagating the message.

Examples of such errors are when the remote queue/topic does not exist or when there is a type mismatch between the source queue/topic and the remote queue/topic.In such situations users must query the DBA_SCHEDULES view to determine the last error that occurred during propagation to a particular destination.The trace files in the $ORACLE_HOME/log directory can provide additional information about the error.

Message Transformation with JMS AQ

The following topics are discussed in this section:

Defining Message Transformations

A transformation can be defined to map messages of one format to another. Transformations are useful when applications that use different formats to represent the same information must be integrated. Transformations can be SQL expressions and PLSQL functions.

The transformations can be created using the DBMS_TRANSFORM.create_transformation procedure. Transformation can be specified for the following operations:

  • Sending a message to a queue or topic

  • Receiving a message from a queue, or topic

  • Creating a TopicSubscriber

  • Creating a Remote Subscriber. This enables propagation of messages between Topics of different formats.

The Message Transformation feature is an Oracle Streams AQ extension to the standard JMS interface.

Sending Messages to a Destination Using a Transformation

A transformation can be supplied when sending/publishing a message to a queue/topic. The transformation is applied before putting the message into the queue/topic.

The application can specify a transformation using the setTransformation interface in the AQjmsQueueSender and AQjmsTopicPublisher interfaces.

Example 11-21 Sending Messages to a Destination Using a Transformation

Suppose that the orders that are processed by the order entry application should be published to WS_bookedorders_topic. The transformation OE2WS (defined in the previous section) is supplied so that the messages are inserted into the topic in the correct format.

public void ship_bookedorders(TopicSession    jms_session,
                                             AQjmsADTMessage adt_message)                            
{
       TopicPublisher  publisher;
       Topic           topic;

       try
       {
         /* get a handle to the WS_bookedorders_topic */
                 topic = ((AQjmsSession)jms_session).getTopic("WS", 
                                                              "WS_bookedorders_topic");
                 publisher = jms_session.createPublisher(topic);

         /* set the transformation in the publisher */
              ((AQjmsTopicPublisher)publisher).setTransformation("OE2WS");

                 publisher.publish(topic, adt_message);

               }
               catch (JMSException ex)
               {
                  System.out.println("Exception :" ex);
       }
}

Receiving Messages from a Destination Using a Transformation

A transformation can be applied when receiving a message from a queue or topic. The transformation is applied to the message before returning it to JMS application.

The transformation can be specified using setTransformation() interface of the AQjmsQueueReceiver, AQjmsTopicSubscriber and AQjmsTopicReceiver.

Example 11-22 JMS: Receiving Messages from a Destination Using a Transformation

Assume that the Western Shipping application retrieves messages from the OE_bookedorders_topic. It specifies the transformation OE2WS to retrieve the message as the Oracle object type WS_order. Assume that the WSOrder Java class has been generated by Jpublisher to map to the Oracle object WS.WS_order:

public AQjmsAdtMessage retrieve_bookedorders(TopicSession jms_session)
       AQjmsTopicReceiver  receiver;
       Topic               topic;
       Message             msg = null;

       try
       {
         /* get a handle to the OE_bookedorders_topic */
         topic = ((AQjmsSession)jms_session).getTopic("OE", 
                                                      "OE_bookedorders_topic");

         /* Create a receiver for WShip */
         receiver = ((AQjmsSession)jms_session).createTopicReceiver(topic, 
                  "WShip, null, WSOrder.getFactory());

         /* set the transformation in the publisher */
         receiver.setTransformation("OE2WS");

         msg = receiver.receive(10);
       }
       catch (JMSException ex)
       {
          System.out.println("Exception :" ex);
       }
 
       return (AQjmsAdtMessage)msg;
}

Specifying Transformations for Topic Subscribers

A transformation can also be specified when creating Topic Subscribers using the CreateDurableSubscriber call. The transformation is applied to the retrieved message before returning it to the subscriber. If the subscriber specified in the CreateDurableSubscriber already exists, then its transformation is set to the specified transformation.

Example 11-23 JMS: Specifying Transformations for Topic Subscribers

The Western Shipping application subscribes to the OE_bookedorders_topic with the transformation OE2WS. This transformation is applied to the messages and the returned message is of Oracle object type WS.WS_orders.

Suppose that the WSOrder java class has been generated by Jpublisher to map to the Oracle object WS.WS_order:

public AQjmsAdtMessage retrieve_bookedorders(TopicSession jms_session)
{
       TopicSubscriber     subscriber;
       Topic               topic;
       AQjmsAdtMessage     msg = null;

       try
       {
         /* get a handle to the OE_bookedorders_topic */
         topic = ((AQjmsSession)jms_session).getTopic("OE", 
            "OE_bookedorders_topic");

         /* create a subscriber with the transformation OE2WS */
         subs = ((AQjmsSession)jms_session).createDurableSubscriber(topic, 
            'WShip', null, false, WSOrder.getFactory(), "OE2WS");
         msg = subscriber.receive(10);
       }
       catch (JMSException ex)
       {
           System.out.println("Exception :" ex);
       }
 
       return (AQjmsAdtMessage)msg;
}

Specifying Transformations for Remote Subscribers

Oracle Streams AQ allows a remote subscriber, that is a subscriber at another database, to subscribe to a topic.

Transformations can be specified when creating remote subscribers using the createRemoteSubscriber. This enables propagation of messages between Topics of different formats. When a message published at a topic meets the criterion of a remote subscriber, Oracle Streams AQ automatically propagates the message to the queue/topic at the remote database specified for the remote subscriber. If a transformation is also specified, then Oracle Streams AQ applies the transformation to the message before propagating it to the queue/topic at the remote database.

Example 11-24 JMS: Specifying Transformations for Remote Subscribers

A remote subscriber is created at the OE.OE_bookedorders_topic so that messages are automatically propagated to the WS.WS_bookedorders_topic. The transformation OE2WS is specified when creating the remote subscriber so that the messages reaching the WS_bookedorders_topic have the correct format.

Suppose that the WSOrder java class has been generated by Jpublisher to map to the Oracle object WS.WS_order

public void create_remote_sub(TopicSession jms_session)
{
     AQjmsAgent          subscriber;
     Topic               topic;

     try
     {
       /* get a handle to the OE_bookedorders_topic */
       topic = ((AQjmsSession)jms_session).getTopic("OE", 
                                                    "OE_bookedorders_topic");

       subscriber = new AQjmsAgent("WShip", "WS.WS_bookedorders_topic");

       ((AQjmsSession )jms_session).createRemoteSubscriber(topic, 
                                subscriber, null, WSOrder.getFactory(),"OE2WS");
     }
     catch (JMSException ex)
     {
       System.out.println("Exception :" ex);
     }
}