Java Message Service (JMS)


Message systems also go under the name message-oriented middleware (MOM). They differ from the classical client-server architectures in a number of respects:

  • Instead of being organized according to hierarchical structures, the clients of a message service are equal participants. Clients can be application programs, parts of application programs, application servers, or other processes.

  • The clients are loosely coupled to the message service; that is, they can connect to or disconnect from the message service at any time and in any number.

  • Message exchange proceeds asynchronously; that is, as soon as a client has transmitted a message to the message service, it can continue its processing without having to wait for the message to be received or processed.

  • Message exchange proceeds anonymously; that is, a client that has received a message does not know what client has sent it. It knows only the channel of the message service over which it has received it.

JMS is an API (application programming interface) from Sun Microsystems. It is a vendor-independent interface for access to message systems from Java programs. If a Java program uses JMS for access to a message system, the message system used can be exchanged for another without the Java program having to be significantly changed. If the behaviors of the message systems are the same at run time (which is not ensured by the API alone) and if they offer the same range of functionality, then as a rule, the client program will not have to be altered. In addition to JMS, many message systems are also accessible by way of the vendor's proprietary API. Figure 6-2 clarifies these relationships.

Figure 6-2: JMS and JMS providers.

Sun Microsystems has not provided a reference implementation for JMS. They simply offer a commercial implementation of JMS (Java Message Queue). Most producers of message systems have meanwhile begun to offer a JMS implementation. A producer of a JMS-compatible message service is called a JMS provider.

Messaging Concepts

A message that is sent via a message service is not considered to be directed to a particular user, as in the case, for example, of e-mail. Messages in a message service are generally transmitted in a human-unreadable format. They serve primarily for communication between parts of a particular application or between different applications. Message services are frequently used for the purpose of triggering certain actions by the receiver of a message. They are also used to decouple lengthy processes. Such processes are shifted into the background to improve the response behavior of a system. Message services are well suited to execute collections of such actions. A message can, for example, be an instruction for the deletion of a data record in a database. Instead of opening a database connection upon receipt of such a message, these instructions can be collected. Once a sufficient number of instructions have accumulated, a connection to the database is opened and the actions executed as a block.

A message service is designed to provide asynchronous communication between different processes. Events, for example (as used in the Abstract Windowing Toolkit or with the JavaBeans component model), typically serve for communication internal to a process. A message service is thus primarily used for distributed processes in which parts of the application need to communicate with one another across process boundaries. Message services are also used for communication between different types of processes.

Message systems distinguish two principal types of communication:

  • point to point;

  • publish and subscribe.

In the point-to-point model a message is sent from a sender to precisely one recipient. In the publish-and-subscribe model a message is sent from a sender to many recipients. The channel of the message service over which a message is sent in the point-to-point system is known as a queue. Under publish and subscribe the channel is called a topic. Conversely, one can say that a queue is a message channel of a message service by which a sent message is received by exactly one recipient. A topic is a message channel of a message service by which a sent message is received by several recipients (see Figure 6-3).

Figure 6-3: Messaging concepts (queue, topic).
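The difference between the two channel types can be sketched in a few lines of plain Java. This is an in-memory analogy only, not JMS code: the class ChannelSketch, the class Recipient, and the round-robin choice of the queue recipient are assumptions made for illustration. Which recipient of a queue actually receives a message is decided by the JMS provider.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory analogy only (not JMS): contrasts queue and topic delivery. */
final class ChannelSketch {

    /** Hypothetical stand-in for a receiving client. */
    static final class Recipient {
        final String name;
        final List<String> received = new ArrayList<>();

        Recipient(String name) {
            this.name = name;
        }
    }

    /** Point to point: each message goes to exactly one recipient (round-robin here). */
    static void sendToQueue(List<Recipient> recipients, List<String> messages) {
        int next = 0;
        for (String m : messages) {
            recipients.get(next).received.add(m);
            next = (next + 1) % recipients.size();
        }
    }

    /** Publish and subscribe: each message goes to every subscriber. */
    static void publishToTopic(List<Recipient> subscribers, List<String> messages) {
        for (String m : messages) {
            for (Recipient r : subscribers) {
                r.received.add(m);
            }
        }
    }

    /** Total number of deliveries across all recipients. */
    static int totalDeliveries(List<Recipient> recipients) {
        int total = 0;
        for (Recipient r : recipients) {
            total += r.received.size();
        }
        return total;
    }
}
```

With two recipients and three messages, the queue variant produces three deliveries in total, while the topic variant produces six.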

Depending on the configuration, arbitrarily many queues and topics can be established within a message service. They are generally distinguished by having different names.

Queues and topics can be persistent. This means that the messages that are currently in a queue or topic are stored in, for example, a database or file system. As soon as they have been successfully received by the recipient client, they are deleted from the storage medium. In the case of a server crash, unreceived messages in persistent queues and topics are not lost. After a system restart the message service will attempt to deliver the messages that are in the persistent queue or persistent topic.

Whether a queue or topic is persistent depends on its configuration. If a queue or topic is one over which important information is sent, then it is usually made persistent. For less important messages one can do without persistence, bearing in mind that storing each message has a negative effect on system performance.

Not all message systems support both forms of message delivery (publish and subscribe; point to point). Even when a message service makes only one of the two forms available, it is still possible to use it over JMS, which models both concepts in separate interfaces (see the next section).

JMS Interfaces

In this section we discuss how JMS maps the components of a message system, as described in the previous section, to interfaces. The use of interfaces will be detailed with examples in the course of this chapter.

javax.jms.ConnectionFactory

A connection factory is an object that enables a JMS client to establish a connection to a message service. The client obtains such an object via a naming service (JNDI lookup). Depending on whether a queue or a topic is to be used, this interface is further specialized to javax.jms.QueueConnectionFactory or javax.jms.TopicConnectionFactory. According to the JMS specification, it is the task of the administrator to configure connection factories. They are therefore called administered objects. The JMS implementation makes these available to the clients in the naming service at the startup of the message service.

javax.jms.Connection

A connection object represents a connection to a message service. It is generated over a ConnectionFactory object. According to whether a queue or topic is used, the object is of type javax.jms.QueueConnection or javax.jms.TopicConnection.

javax.jms.Session

A session is an object that is bound over a connection to JMS to a particular queue or a particular topic. A session is generated over a connection object, and messages are sent and received over a session. According to whether the session is linked to a queue or a topic, we are dealing with a javax.jms.QueueSession or a javax.jms.TopicSession.

javax.jms.Destination

Destination is the generic term for a topic or a queue. This interface is accordingly specialized further in javax.jms.Queue or javax.jms.Topic. An object of type queue or topic can be obtained over a JNDI lookup. Here, too, the JMS specification provides that they be set up by the administrator according to the configuration. Thus a queue or topic is called an administered object. The JMS implementation makes them available over JNDI to JMS clients at the startup of the message service. A connection or session to a queue or topic can be opened over a connection factory for sending or receiving messages.

javax.jms.Message

This interface represents a message that is sent over a queue or topic. A message has the following components:

  • Header: In the header of a message, the JMS client and the provider store information that serves to identify and deliver the message. Message filters (see below) can be applied to header fields.

  • Properties: These give an application that uses a message service the ability to store additional application-related information in the message. Such information as a rule gives the recipient suggestions or instructions that affect the processing of the message. For example, information about the sender of the message can be transmitted in a property field, so that the recipient knows from whom the message was sent. Message filters (see below) can be applied to property fields.

  • Body: The body contains the actual content of the message.

JMS defines five concrete forms of a message. They are derived from javax.jms.Message and represent various technical representations of useful data in a message (message body):

  • javax.jms.TextMessage: A message that contains a string as the message body. With this message type it is possible, for example, to exchange messages in XML format. Many message systems offer extensive support for the XML format.

  • javax.jms.ObjectMessage: A message that contains an arbitrary object as message body. The class of the object must implement the interface java.io.Serializable.

  • javax.jms.BytesMessage: A message that contains a message body in the form of a byte stream. The individual bytes are not interpreted. It is up to the recipient of the message to interpret the message appropriately.

  • javax.jms.StreamMessage: A message in which a stream of primitive Java data types and serializable objects can be sent. The primitives and the objects are written sequentially in the message and must be read by the recipient in the same order. The recipient must therefore know the order and type of the transmitted data. Thus the sending of a stream message requires a suitable serialization/deserialization protocol.

  • javax.jms.MapMessage: A message in which name-value pairs can be sent as the message body. The names are transmitted in the form of strings, the values as primitive data types.

Every application should be capable of finding a suitable message type that fits its requirements. Such general types as javax.jms.ObjectMessage and javax.jms.StreamMessage offer the developer wide scope.
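The requirement that a stream message be read in the order in which it was written can be illustrated with the standard classes java.io.DataOutputStream and java.io.DataInputStream. The following is a sketch by analogy, not JMS code: the class StreamOrderSketch and the three fields (an id, an action, and an urgency flag) are assumptions chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Analogy only (not JMS): sender and recipient must agree on order and type. */
final class StreamOrderSketch {

    /** Sender side: writes an int, a string, and a boolean, in that order. */
    static byte[] write(int id, String action, boolean urgent) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(id);
                out.writeUTF(action);
                out.writeBoolean(urgent);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Recipient side: must read an int, a string, and a boolean in the same order. */
    static String read(byte[] body) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(body))) {
            int id = in.readInt();
            String action = in.readUTF();
            boolean urgent = in.readBoolean();
            return id + ":" + action + ":" + urgent;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A recipient that read the string before the integer would misinterpret the data, which is why both sides need an agreed protocol for the order and type of the fields.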

javax.jms.MessageProducer

A message producer is an object that enables the sending of messages. Depending on the destination type, one here distinguishes the concrete forms javax.jms.QueueSender and javax.jms.TopicPublisher. A message producer is generated via a session object.

javax.jms.MessageConsumer

A message consumer is an object that enables messages to be received. Depending on the destination type, one here distinguishes the concrete forms javax.jms.QueueReceiver and javax.jms.TopicSubscriber. A message consumer is generated via a session object.

Figure 6-4 represents once again the relationships among the central interfaces of the JMS API.

Figure 6-4: JMS interfaces.

JMS Clients

A JMS client employs the interfaces of the Java Message Service for sending and receiving messages over a message service. A JMS client can send messages, receive messages, or do both. The implementation of the JMS interfaces is supplied by the provider of the message service. In addition to the interfaces of JMS, the client also uses the interfaces of JNDI (Java Naming and Directory Interface). The use of JNDI has already been discussed in Chapter 4. With JNDI the JMS client can access JMS resources such as queues, topics, and connection factories (administered objects). These JMS resources are configured by the administrator of the message service (as we have already mentioned). The message system makes these resources available to the JMS clients at run time via a naming and directory service.

The following sections demonstrate the implementation of the individual aspects of a JMS client. All the examples use the auxiliary class Lookup, which has the method get. In order to limit the sample code to essentials, access to the naming service is encapsulated within this auxiliary class and the method get. The implementation of this auxiliary class is part of the sample source code available for this book.
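For readers who do not have the sample source code at hand, such a helper might look roughly like the following. This is a hypothetical sketch, not the implementation that accompanies the book: it is named LookupSketch to keep the two apart, and it assumes that the settings for the naming service (for example, in a jndi.properties file) are available to the default InitialContext.

```java
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** Hypothetical sketch of a lookup helper; the book's own Lookup class may differ. */
final class LookupSketch {

    private LookupSketch() {}

    /**
     * Returns the administered object bound under the given JNDI name.
     * Assumes that the naming service settings are available to the
     * default InitialContext.
     */
    static Object get(String name) throws NamingException {
        Context ctx = new InitialContext();
        try {
            return ctx.lookup(name);
        } finally {
            // release the naming context whether or not the lookup succeeded
            ctx.close();
        }
    }
}
```

If no naming service is configured, the lookup fails with a NamingException, so a caller can distinguish a missing configuration from a missing binding by inspecting the exception.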

Sending a Message

Single-Threaded

Listing 6-1 shows the sending of a text message. The example demonstrates the use of a queue. The use of a topic is identical as regards the order of the steps. Instead of queue-specific interfaces one uses the topic-related interfaces (for example, TopicConnection instead of QueueConnection, and TopicSession instead of QueueSession).

Listing 6-1: Sending a text message.

start example
...
    static final String FACTORY =
                    "SampleConnectionFactory";
    static final String QUEUE = "SampleQueue";

    QueueConnection qc    = null;
    QueueSession    qs    = null;
    QueueSender     qsend = null;
    try {
        QueueConnectionFactory qcf = null;
        qcf = (QueueConnectionFactory)
                  Lookup.get(FACTORY);
        qc = qcf.createQueueConnection();
        qs = qc.createQueueSession(false,
                                   Session.AUTO_ACKNOWLEDGE);
        Queue queue = (Queue)Lookup.get(QUEUE);
        qsend = qs.createSender(queue);
        Message m = null;
        m = qs.createTextMessage("a text");
        qsend.send(m);
    } finally {
        try { qsend.close(); } catch(Exception ex) {}
        try { qs.close(); }    catch(Exception ex) {}
        try { qc.close(); }    catch(Exception ex) {}
    }
end example

In the first step the JMS client obtains, with the help of the class Lookup, a QueueConnectionFactory object from JNDI, which is provided by the administrator of the message service through the configuration. In the next step, the JMS client generates a QueueConnection via the queue connection factory, with the help of which it then generates a QueueSession. The Queue over which it would like to send the message is taken (like the queue connection factory) from JNDI. The queue is also set up by the administrator of the message service by configuration. With the help of the QueueSession object, the JMS client generates a QueueSender for this queue. The Message object is also generated via the session. In our case we are dealing with an object of type TextMessage. In the last step the JMS client sends the message via the queue sender. Immediately after placing the message in the queue the method send returns and the JMS client can continue its processing.

Generally, a JMS connection and a JMS session require resources such as a network connection or a thread. It is thus to be recommended that these resources be released when no longer needed through a call to the various close methods. In our example the session and connection are immediately closed. If a client sends messages regularly, then it might be a good idea to keep the connection and session open and to close them only when the client program is terminated.

When sending a message, the JMS client can assign the message a priority. As a rule, messages with higher priority are delivered before messages with lower priority. The priority is passed to the send method (which has several signatures) or set as a default on the message producer with the method setPriority. A value set directly on the message with Message.setJMSPriority is overwritten by the provider when the message is sent. If the priority is not specified, then the message is given the standard priority (4 on a scale from 0 to 9).
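The effect of priorities on the order of delivery can be sketched with a java.util.concurrent.PriorityBlockingQueue. Again this is an in-memory analogy and not JMS code: the class PrioritySketch and its methods are assumptions for illustration, and a real provider is only expected to make its best effort to deliver higher priorities first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory analogy only (not JMS): higher-priority entries are handed out first. */
final class PrioritySketch {

    // JMS default priority; valid priorities range from 0 (lowest) to 9 (highest)
    static final int DEFAULT_PRIORITY = 4;

    private static final class Entry implements Comparable<Entry> {
        final String text;
        final int priority;
        final long sequence; // keeps arrival order among equal priorities

        Entry(String text, int priority, long sequence) {
            this.text = text;
            this.priority = priority;
            this.sequence = sequence;
        }

        @Override
        public int compareTo(Entry other) {
            int byPriority = Integer.compare(other.priority, priority); // higher first
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    private final PriorityBlockingQueue<Entry> channel = new PriorityBlockingQueue<>();
    private final AtomicLong counter = new AtomicLong();

    /** Sends with the default priority. */
    void send(String text) {
        send(text, DEFAULT_PRIORITY);
    }

    /** Sends with an explicit priority. */
    void send(String text, int priority) {
        channel.add(new Entry(text, priority, counter.getAndIncrement()));
    }

    /** Removes and returns all pending texts in delivery order. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        Entry e;
        while ((e = channel.poll()) != null) {
            out.add(e.text);
        }
        return out;
    }
}
```

If the texts "normal-1", "urgent" (priority 9), "normal-2", and "low" (priority 0) are sent in that order, drain returns them as urgent, normal-1, normal-2, low.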

Multithreaded

A JMS session is not thread-safe. This means that a session must not be used simultaneously by several threads. The JMS specification refers specifically to this situation (see [29]). One of the main reasons for this is that a session is the one unit in a message system that supports transactions (to the extent that the message system supports transactions at all). It is difficult to implement transactions that are multithreaded. If a session is nonetheless used simultaneously by several threads, then the behavior of the JMS provider is undefined, and it is quite likely that serious errors will arise.

The fact that a JMS session is not thread-safe is particularly critical in applications that use many threads. A typical example is that of Internet applications. They are simultaneously used by many users. Within the web container process there is thus at least one active thread for each parallel access. The type of message sending shown in Listing 6-1 is not very efficient in web applications (assuming that messages are sent regularly). If many users and thus many threads are active simultaneously, then many JMS connections and JMS sessions will be constantly opened and then closed. To leave the JMS session open and close it only at program termination would lead to a JMS session being used by several threads simultaneously. Such problems are as a rule solved through the use of thread-safe pools. Listing 6-2 shows a simple example of how such a resource pool can be implemented.

Listing 6-2: Example of a session pool.

start example
package jms.client;

import java.util.LinkedList;
import javax.jms.Session;
import javax.jms.JMSException;

public class SessionPool {

    private LinkedList sessions;
    private int        size;

    public SessionPool(int size) {
        sessions = new LinkedList();
        this.size = size;
    }

    public synchronized void addSession(Session s) {
        if(!sessions.contains(s)) {
            sessions.addLast(s);
        } else {
            throw new IllegalArgumentException
                        ("session already in use!");
        }
    }

    public synchronized Session getSession() {
        Session ret = null;
        //block the caller until a session is available
        while(sessions.isEmpty()) {
            try {
                this.wait();
            } catch(InterruptedException ex) {}
        }
        ret = (Session)sessions.removeFirst();
        return ret;
    }

    //synchronized so that the list is not read while
    //another thread modifies it
    public synchronized int getAvailable() {
        return sessions.size();
    }

    public synchronized void releaseSession(Session s) {
        if(sessions.size() >= size) {
            throw new IllegalStateException
                        ("pool is exceeding initial size");
        }
        if(sessions.contains(s)) {
            throw new IllegalArgumentException
                        ("session is available");
        }
        sessions.addLast(s);
        //wake up one thread waiting in getSession
        this.notify();
    }

    public synchronized void destroy() {
        for(int i = 0; i < sessions.size(); i++) {
            try {
                ((Session)sessions.get(i)).close();
            } catch(JMSException jmsex) {
                jmsex.printStackTrace();
            }
        }
        sessions.clear();
        sessions = null;
    }
}
end example

Through the use of a session pool the continual generation and closing of sessions can be avoided. Moreover, with a pool fewer JMS sessions are needed than there are active threads. Furthermore, the session pool prevents a JMS session from being used simultaneously by several threads. An available session is requested via the method getSession. If all available sessions are occupied by other threads, then the method blocks the calling thread via a call to the method wait. When a session object is no longer needed, it must be released via the method releaseSession. When a JMS session is released, a call to the method notify wakes one of the threads blocked in the method wait. That thread can continue its processing and is now in possession of a JMS session object. If significantly more threads than there are available JMS sessions are continually active, the pool can become a bottleneck, since threads will be constantly blocked. The problem can be solved by increasing the number of available JMS sessions in the pool. Listing 6-3 shows the use of a session pool in an arbitrary class.

Listing 6-3: Use of a session pool.

start example
    ...
    private QueueConnection queueCon = null;
    private SessionPool     thePool  = null;
    private Queue           theQueue = null;
    ...
    public void init() {
        ...
        //generation of the SessionPool, e.g., at the time of
        //initialization
        final int SIZE = 10;
        QueueConnectionFactory qcf;
        qcf = (QueueConnectionFactory)
                  Lookup.get("ANY_FACTORY");
        Queue q = (Queue)Lookup.get("ANY_QUEUE");
        QueueConnection qc = qcf.createQueueConnection();
        SessionPool sp = new SessionPool(SIZE);
        for(int i = 0; i < SIZE; i++) {
            sp.addSession(qc.createQueueSession
                           (false, Session.AUTO_ACKNOWLEDGE));
        }
        queueCon = qc;
        thePool  = sp;
        //keep the queue for use in send()
        theQueue = q;
        ...
    }

    public void send(String message)
        throws NamingException, JMSException
    {
        QueueSession qs    = null;
        QueueSender  qsend = null;
        try {
            qs = (QueueSession)thePool.getSession();
            qsend = qs.createSender(theQueue);
            TextMessage tm = qs.createTextMessage();
            tm.setText(message);
            qsend.send(tm);
        } finally {
            try { qsend.close(); } catch(Exception ex) {}
            //return the session only if one was obtained
            if(qs != null) {
                thePool.releaseSession(qs);
            }
        }
    }
    ...
    public void shutdown()
        throws JMSException
    {
        ...
        //Termination of the session pool and closing of
        //the sessions when the process is ended
        thePool.destroy();
        queueCon.close();
        ...
    }
    ...
end example

If an application uses a session pool, as introduced in this section, it must make one such pool available per queue or per topic. In order to reduce the number of different kinds of pools that are used, it is suggested to use "generic" queues or topics. One then sends various types of messages via a queue or topic, which then trigger various actions from the recipient, rather than to set up a separate queue or topic for each type of action. In any case, it is important to release a session immediately after use. The longer a thread blocks a session object, the greater is the probability that for this reason another thread will be blocked and will have to wait for an available session. In our example the request for a session takes place in a try block, and the release of the session in an attached finally block. Without the use of the try-finally mechanism, the occurrence of unexpected exceptions could result in the session never being released. The result would be an unwanted reduction in the size of the pool of session instances. The session object would be lost, and the resources reserved by it could not be properly released.
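The rule that a borrowed resource must be returned in a finally block can also be enforced by the pool itself. The following sketch is an alternative to the wait/notify approach and is not taken from the sample code: it builds the pool on a java.util.concurrent.ArrayBlockingQueue, and the class PoolSketch and its method withResource are hypothetical names. The resource type is generic so that the sketch runs without a JMS provider; in a JMS client it would be a session.

```java
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

/** Sketch of a pool that guarantees the release of a borrowed resource. */
final class PoolSketch<T> {

    private final BlockingQueue<T> idle;

    /** The collection must contain at least one resource. */
    PoolSketch(Collection<T> resources) {
        idle = new ArrayBlockingQueue<>(resources.size(), false, resources);
    }

    /** Borrows a resource, applies the action, and always returns the resource. */
    <R> R withResource(Function<T, R> action) {
        T resource;
        try {
            resource = idle.take(); // blocks while all resources are in use
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a resource", e);
        }
        try {
            return action.apply(resource);
        } finally {
            idle.add(resource); // runs even if the action throws
        }
    }

    /** Number of resources currently not in use. */
    int available() {
        return idle.size();
    }
}
```

Because the release happens inside withResource, a caller cannot forget it, and an exception in the action does not shrink the pool.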

Receiving a Message

In receiving messages the JMS client has several options as to how the reception is to occur. It can fetch the messages from the message service or, in a manner of speaking, have the messaging service deliver them. For the examples in this section we shall again illustrate the use of the point-to-point model. The use of the publish-subscribe model is identical as to the order of steps. Instead of queue-specific interfaces, the topic-related interfaces are brought into play. We shall call attention to the specific places where there is a difference in the behavior of the message service.

The Method receive

The method receive() on the interface QueueReceiver can be used to fetch messages from the message service. Listing 6-4 shows how the method is used.

Listing 6-4: Fetching a message.

start example
...
    static final String FACTORY =
                    "SampleConnectionFactory";
    static final String QUEUE = "SampleQueue";

    QueueConnectionFactory qcf = null;
    QueueConnection        qc  = null;
    QueueSession           qs  = null;
    QueueReceiver          qr  = null;
    try {
        qcf = (QueueConnectionFactory)
                  Lookup.get(FACTORY);
        qc = qcf.createQueueConnection();
        qs = qc.createQueueSession(false,
                                   Session.AUTO_ACKNOWLEDGE);
        Queue queue = (Queue)Lookup.get(QUEUE);
        qr = qs.createReceiver(queue);
        qc.start();
        while(true) {
            Message m = qr.receive();
            //...
            //process the message
            //...
            m.acknowledge();
        }
    } finally {
        try { qr.close(); } catch(Exception ex) {}
        try { qs.close(); } catch(Exception ex) {}
        try { qc.close(); } catch(Exception ex) {}
    }
...
end example

The first steps in receiving a message are identical to those in sending a message. Instead of a queue sender, the JMS client now generates a QueueReceiver object. A call to the start method on the QueueConnection object causes the message service to begin delivering messages to the JMS client at once. A call to the receive method returns the next queued message. If at the moment no message is available, then this method blocks the caller until the next message is available. We shall have more to say about the acknowledge method later. The method receive() is available in two forms. In addition to the form without parameters, there is a form that takes a parameter timeout (a time interval in milliseconds). This form blocks until a message is available, but if no message arrives within the time interval timeout, it stops waiting and returns null instead of a message. The method receiveNoWait returns immediately to the caller in any case. If a message is available, then it is returned. If no message is available, then null is returned.
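The three ways of fetching a message correspond closely to the methods take, poll with a timeout, and poll of a java.util.concurrent.BlockingQueue. The following in-memory analogy (not JMS code; the class ReceiveSketch is an assumption for illustration) shows the three behaviors side by side.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory analogy only (not JMS): the three ways of fetching a message. */
final class ReceiveSketch {

    private final BlockingQueue<String> channel = new LinkedBlockingQueue<>();

    void send(String text) {
        channel.add(text);
    }

    /** Like receive(): blocks until a message is available. */
    String receive() {
        try {
            return channel.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Like receive(timeout): waits at most timeoutMillis, then returns null.
     * Note one difference: in JMS a timeout of 0 means "wait indefinitely",
     * whereas poll with a timeout of 0 returns at once.
     */
    String receive(long timeoutMillis) {
        try {
            return channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Like receiveNoWait(): returns at once, null if nothing is pending. */
    String receiveNoWait() {
        return channel.poll();
    }
}
```

On an empty channel, receiveNoWait and receive(50) both return null, the latter after about 50 milliseconds. After send("a text"), receive returns "a text" without blocking.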

When receiving a message both the JMS connection and JMS session require certain resources. If the (receiving) JMS client releases these resources through a call to an appropriate close method, then the receipt of messages is terminated.

The Interface MessageListener

In contrast to the case of the receive method, in which the JMS client fetches the messages from the message service, here the client can have the messages delivered as soon as they become available. They are delivered asynchronously over the session's thread. For this to happen, the JMS client must register an object that implements the javax.jms.MessageListener interface with a QueueReceiver object. Listing 6-5 shows the use of this mechanism.

Listing 6-5: Receiving a message.

start example
...
    static final String FACTORY =
                    "SampleConnectionFactory";
    static final String QUEUE = "SampleQueue";

    QueueConnectionFactory qcf = null;
    QueueConnection        qc  = null;
    QueueSession           qs  = null;
    QueueReceiver          qr  = null;

    qcf = (QueueConnectionFactory)
              Lookup.get(FACTORY);
    qc = qcf.createQueueConnection();
    qs = qc.createQueueSession(false,
                               Session.AUTO_ACKNOWLEDGE);
    Queue queue = (Queue)Lookup.get(QUEUE);
    qr = qs.createReceiver(queue);
    qr.setMessageListener(this);
    qc.start();
...
end example

To set the receipt of messages in motion, in this case as well the method start() on the QueueConnection object is called. A call to the setMessageListener method returns at once, and the JMS client continues its processing. With this method the JMS client registers the object that implements the javax.jms.MessageListener interface (in our example, this). This interface contains only the method onMessage(Message) (see Listing 6-6). If a message is to be delivered to the JMS client, this method will be called and the message delivered in the form of a Message object.

Listing 6-6: Implementation of MessageListener.onMessage().

start example
...
    public void onMessage(javax.jms.Message m) {
        // ...
        // Process the message
        // ...
        try {
            m.acknowledge();
        } catch(javax.jms.JMSException jmsex) {
            jmsex.printStackTrace();
        }
    }
...
end example

Within a session the delivery of messages to a receiver is serialized, and a session always uses only one thread. The next message is delivered only when the delivery of the previous message to one or more recipients has completed. To process messages in parallel the JMS client can either work with several threads within the onMessage method or use multiple sessions. In the latter case, dealing with threads is left to the JMS implementation of the message service.
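The first option, working with several threads within the onMessage method, might be sketched as follows. The interface Listener is a hypothetical stand-in for javax.jms.MessageListener so that the sketch runs without a JMS provider, and the class names and the fixed-size thread pool are likewise assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: a listener that hands each message to a pool of worker threads. */
final class ParallelListenerSketch {

    /** Hypothetical stand-in for javax.jms.MessageListener. */
    interface Listener {
        void onMessage(String message);
    }

    static final class HandOffListener implements Listener {
        private final ExecutorService workers;
        private final AtomicInteger processed = new AtomicInteger();

        HandOffListener(int threads) {
            workers = Executors.newFixedThreadPool(threads);
        }

        @Override
        public void onMessage(String message) {
            // return quickly so that the delivering thread can hand over the next message
            workers.execute(() -> process(message));
        }

        private void process(String message) {
            // ... application-specific work ...
            processed.incrementAndGet();
        }

        /** Stops accepting work, waits for pending messages, returns the number processed. */
        int shutdownAndCount() {
            workers.shutdown();
            try {
                workers.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return processed.get();
        }
    }
}
```

One consequence of this design deserves attention: with automatic acknowledgment a message counts as received as soon as onMessage returns, so work that is still waiting in the thread pool would be lost if the client process crashed.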

In the case of asynchronous receiving of messages the behavior of the message service depends on whether a queue or topic is used. If a JMS client receives messages from a queue and uses several sessions for this purpose, then which session receives the next message depends on the implementation of the JMS provider. What is certain, however, is that only one session receives the message. There are JMS providers that allow only one session and one recipient per queue. Under the publish-subscribe model each recipient in each session receives every message. Figure 6-5 shows this state of affairs.

Figure 6-5: Receiving messages over several sessions.

If the JMS client uses more than one session for receiving messages in parallel and in each session registers the same MessageListener object, then the implementation of the onMessage method must be thread-safe. Since each session uses its own thread for delivery, the onMessage method can be run simultaneously by multiple threads. If the JMS client registers a different MessageListener object for each session, then the implementation does not have to be thread-safe. However, in this case the JMS client may in some situations have to synchronize the processing of messages that arrive in parallel.
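What thread safety means for a shared listener can be shown with a small simulation. In the following sketch several plain threads play the role of the session threads, and the interface Listener again stands in for javax.jms.MessageListener. All names are hypothetical. The listener is safe to share because its only state is an atomic counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: one listener object shared by several delivering threads. */
final class SharedListenerSketch {

    /** Hypothetical stand-in for javax.jms.MessageListener. */
    interface Listener {
        void onMessage(String message);
    }

    /** Safe to register with several sessions: its only state is an atomic counter. */
    static final class CountingListener implements Listener {
        private final AtomicInteger count = new AtomicInteger();

        @Override
        public void onMessage(String message) {
            count.incrementAndGet();
        }

        int count() {
            return count.get();
        }
    }

    /** Simulates the given number of session threads, each delivering perSession messages. */
    static void deliverInParallel(Listener listener, int sessions, int perSession) {
        Thread[] threads = new Thread[sessions];
        for (int s = 0; s < sessions; s++) {
            final int id = s;
            threads[s] = new Thread(() -> {
                for (int i = 0; i < perSession; i++) {
                    listener.onMessage("session-" + id + "-message-" + i);
                }
            });
            threads[s].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

With a plain int field and count++ in place of the AtomicInteger, increments from different threads could be lost, and the final count could fall short of the number of messages delivered.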

The Server Session Pool

Another type of message reception is treated by the JMS specification in a section devoted explicitly to secondary themes that are not part of the range of standard functionality of a message service. The section JMS Application Server Facilities (see [29]) treats matters targeted at those who provide application servers. To this belongs the server session pool, which enables the parallel receipt of multiple messages.

Instead of opening and managing several sessions itself, the JMS client can leave this task to the application server, by using a server session pool. In the world of Java, by application server one understands typically a product that conforms to the specifications of Java 2, Enterprise Edition (that is, a server that in addition to a web and EJB container offers a message service and a JMS implementation). It is also possible that the JMS provider itself makes a server session pool implementation available. In the course of this section we will use the term application server for both cases.

A server session is an object that is made available by an application server. A server session joins a JMS session with a thread of the application server. The message service is responsible for delivering messages through the session. The application server manages the session and provides a thread for the JMS session. The JMS client merely provides the implementation of the MessageListener interface in a separate class, which is responsible for the processing of messages. Since a separate thread is provided to each MessageListener object, the implementation does not have to be thread-safe.

In this case, unlike the other two, the processing of received messages does not take place in the process of the JMS client, but in the process of the application server. If the JMS client wants to process messages in parallel, or has to do so for performance reasons, then the use of a server session pool affords enormous simplification. It need not be concerned with managing multiple JMS sessions or with multiple threads, nor is the client process burdened at run time by the allocation of various resources such as network connections and threads. However, the resources of the application server are additionally burdened by a server session pool. Before a server session pool is used it should be considered whether the application server can bear the additional burden. Listing 6-7 shows an example of the use of a server session pool, while Listing 6-8 shows the implementation of the MessageListener interface, which is responsible for the processing of messages in the server session pool. An example of an application server that offers a message service and an implementation of a server session pool is the WebLogic application server from BEA Systems (see [35]). The example shown in Listing 6-7 was developed with WebLogic.

Listing 6-7: Generation of a server session pool.

start example
    ...
    static final String FACTORY = "SampleConnectionFactory";
    static final String QUEUE = "SampleQueue";
    static final String SSP_FACTORY = "SampleSessionPoolFactory";
    static final String LISTENER = "jms.client.MQListener";

    QueueConnectionFactory qcf = null;
    QueueConnection        qc  = null;
    ConnectionConsumer     cc  = null;

    qcf = (QueueConnectionFactory) Lookup.get(FACTORY);
    qc = qcf.createQueueConnection();
    qc.start();
    Queue queue = (Queue) Lookup.get(QUEUE);
    ServerSessionPoolFactory factory =
        (ServerSessionPoolFactory) Lookup.get(SSP_FACTORY);
    ServerSessionPool sessionPool;
    sessionPool = factory.getServerSessionPool(
                      qc, 5, false,
                      Session.AUTO_ACKNOWLEDGE, LISTENER);
    cc = qc.createConnectionConsumer(
             queue, "TRUE", sessionPool, 10);
    ...
end example

Listing 6-8: Receiver class of the server session pool.

start example
package jms.client;

import javax.jms.*;

public class MQListener implements MessageListener {

    public MQListener() {}

    public void onMessage(Message m) {
        // ...
        // process the message
        // ...
        try {
            m.acknowledge();
        } catch(JMSException jmsex) {
            jmsex.printStackTrace();
        }
    }
}
end example

To generate a ServerSessionPool object, the JMS client requires a ServerSessionPoolFactory. Like a connection factory, it is obtained from JNDI, where the administrator makes it available through configuration. When the server session pool is generated, the following are specified: a Connection object, the size of the pool (the value 5 in the example), true or false (according to whether transactions are used), the type of acknowledgment, and the name of the class that implements the MessageListener interface and is responsible for the processing of messages (see Listing 6-8). The session pool is activated via the method createConnectionConsumer() on the QueueConnection object. The session pool opens the requisite number of server sessions, associates them with threads, and uses instances of the passed class to process messages. The MessageListener class must be available to the application server, since the server is responsible for generating the instances.

We reiterate here that the instances of the MessageListener class are generated in the process of the application server and not in the process of the JMS client. Thus the processing of messages also takes place in the process of the application server and not in that of the JMS client.
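The division of labor in a server session pool can be imitated without any JMS provider. The following is a minimal sketch in plain Java, not the WebLogic or JMS API: ToySessionPool, CountingListener, and the stop marker are names invented for this illustration. Each worker thread owns its own listener instance, which is why the listener can keep unsynchronized state.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical listener: its counter is a plain field, which is safe only
// because exactly one worker thread ever calls this instance.
class CountingListener {
    int handled = 0;

    void onMessage(String message) {
        handled++;
    }
}

// Hypothetical stand-in for a server session pool: N threads, one listener each.
class ToySessionPool {
    private static final String STOP = "__stop__";

    // Feeds messageCount messages to poolSize workers and returns the
    // total number of messages handled by all listeners.
    static int process(int poolSize, int messageCount) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountingListener[] listeners = new CountingListener[poolSize];
        Thread[] workers = new Thread[poolSize];
        for (int i = 0; i < poolSize; i++) {
            final CountingListener listener = new CountingListener();
            listeners[i] = listener;
            workers[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (String m = queue.take(); !STOP.equals(m); m = queue.take()) {
                            listener.onMessage(m);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers[i].start();
        }
        for (int i = 0; i < messageCount; i++) {
            queue.add("message " + i);
        }
        for (int i = 0; i < poolSize; i++) {
            queue.add(STOP); // one stop marker per worker
        }
        int total = 0;
        try {
            for (int i = 0; i < poolSize; i++) {
                workers[i].join(); // join makes the worker's counter visible here
                total += listeners[i].handled;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("handled: " + process(5, 100));
    }
}
```

The pool size of 5 mirrors the value used in Listing 6-7. In a real application server the threads and listener instances live in the server process, not in the client.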

Reception Behavior

A queue always delivers a message to a single recipient. If several recipients are registered with a single queue, then the recipient to which a message is delivered depends on the implementation of the JMS provider. Queues have, as a rule, only a single recipient. The ability of a JMS implementation to permit multiple recipients is often used for scaling: with several sessions and several receivers on one queue, incoming messages can be processed in parallel.

Topics typically have multiple recipients. A message in a topic is delivered only to the currently active subscribers. Inactive subscribers do not receive the message. If a JMS client would like to ensure that it receives all the messages belonging to a particular topic, it can register as a durable topic subscriber. The messages of a topic are stored until they have been delivered to all durable registered recipients.

Message distribution in a queue works differently. If no recipient is currently registered with a queue, then the message remains in the queue until a recipient registers. It is thereby guaranteed that every message reaches precisely one client.
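The difference between the two delivery models can be made concrete with a small in-memory model. This is a sketch in plain Java under simplifying assumptions, not JMS code: ToyQueue and ToyTopic are hypothetical classes, and persistence, sessions, and threads are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy point-to-point queue: a message is held until a receiver asks for it
// and is then handed to exactly one receiver.
class ToyQueue {
    private final Deque<String> pending = new ArrayDeque<>();

    void send(String message) {
        pending.addLast(message);
    }

    // Returns the oldest pending message, or null if the queue is empty.
    String receive() {
        return pending.pollFirst();
    }
}

// Toy topic: a non-durable subscriber gets a message only while active;
// for a durable subscriber the message is stored even while it is inactive.
class ToyTopic {
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();
    private final Map<String, Boolean> durable = new LinkedHashMap<>();
    private final Map<String, Boolean> active = new LinkedHashMap<>();

    void subscribe(String name, boolean isDurable) {
        inboxes.put(name, new ArrayList<String>());
        durable.put(name, isDurable);
        active.put(name, true);
    }

    void setActive(String name, boolean isActive) {
        active.put(name, isActive);
    }

    void publish(String message) {
        for (String name : inboxes.keySet()) {
            if (active.get(name) || durable.get(name)) {
                inboxes.get(name).add(message);
            }
        }
    }

    List<String> inbox(String name) {
        return inboxes.get(name);
    }
}

class ReceptionDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.subscribe("plain", false);
        topic.subscribe("durable", true);
        topic.setActive("plain", false);
        topic.setActive("durable", false);
        topic.publish("news");
        System.out.println("plain: " + topic.inbox("plain"));
        System.out.println("durable: " + topic.inbox("durable"));
    }
}
```

In the demo both subscribers are inactive when the message is published; only the durable one finds it in its inbox afterwards.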

In the case of a server crash the above-mentioned guarantees remain in effect only if the queue or topic is persistent. The messages are then stored persistently by the message service, and they remain stored even in the case of a server crash or restart. The message service begins immediately after a restart with the delivery of any remaining messages.

Messages are normally delivered in the order in which they were added to the queue or topic. The order in which messages arrive at recipients depends on a number of factors. If the sender designates varying priorities for various messages, it thereby influences the order of delivery to the recipients. Messages with higher priority are delivered before messages with lower priority. If the recipient employs a message filter (see below), the order of delivery is again influenced.
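How priority interacts with arrival order can be sketched with a priority queue. The classes below are hypothetical and model an idealized provider that always delivers strictly by priority and then by arrival order; a real JMS provider may order messages less strictly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical message record with a priority and an arrival sequence number.
class PrioritizedMessage implements Comparable<PrioritizedMessage> {
    final String text;
    final int priority;  // 0 (lowest) to 9 (highest), as for JMSPriority
    final long sequence; // arrival order, used as tie-breaker

    PrioritizedMessage(String text, int priority, long sequence) {
        this.text = text;
        this.priority = priority;
        this.sequence = sequence;
    }

    @Override
    public int compareTo(PrioritizedMessage other) {
        if (priority != other.priority) {
            return Integer.compare(other.priority, priority); // higher priority first
        }
        return Long.compare(sequence, other.sequence);        // then first in, first out
    }
}

// Hypothetical destination that hands out messages in delivery order.
class PriorityDestination {
    private final PriorityQueue<PrioritizedMessage> pending = new PriorityQueue<>();
    private long nextSequence = 0;

    void send(String text, int priority) {
        pending.add(new PrioritizedMessage(text, priority, nextSequence++));
    }

    // Removes all pending messages and returns their texts in delivery order.
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            out.add(pending.poll().text);
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityDestination d = new PriorityDestination();
        d.send("a", 4);
        d.send("b", 9);
        d.send("c", 4);
        d.send("d", 0);
        System.out.println(d.drain());
    }
}
```

Messages "a" and "c" share a priority and therefore keep their sending order, while "b" overtakes both.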

Transactions and Acknowledgment

Simple Transactions

To make the sending and receipt of messages atomic, a JMS client can use a transactional session (assuming that the JMS provider supports this functionality). Listing 6-9 shows how a transactional session is generated.

Listing 6-9: Generation of a transactional session.

start example
...
QueueConnectionFactory qcf = null;
qcf = (QueueConnectionFactory)Lookup.get(FACTORY);
QueueConnection qc = qcf.createQueueConnection();
// true = transactional session; the acknowledgment mode is then not relevant
QueueSession qs = qc.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
...
end example

When the method createQueueSession() is called, the parameter true results in the session being transactional. The same holds for the method createTopicSession(). Sent messages are transferred to a queue or topic only when Session.commit() is called. Received messages are considered to have been successfully received only when the recipient calls Session.commit(). The transaction bracket encompasses the sending or the receiving of a particular message. The sending and the receiving of one and the same message cannot be combined in a single transaction, since the receiving session is different from the sending session. Only the receiving and sending of different messages within one session are transactional.

When a sending JMS client uses a transactional session, it must call commit to finally transfer the message into the queue or topic. The same holds for a receiving JMS client when it uses a transactional session. A message that is sent via a transactional session can be received via a nontransactional session, and vice versa.

A call to commit not only closes the current transaction, but also opens simultaneously a new transaction. Transactions can therefore not be nested. If Session.rollback() is called, all messages sent since the last call to the method commit are discarded. None of these messages are transferred to the queue or topic. The same holds for the receipt of messages. Messages delivered after the last call to commit are considered to have been successfully delivered only when the method commit is again called. Only then are the delivered messages finally removed from the queue or topic. If Session.rollback() is called, the message service begins with a renewed delivery to the registered recipients of the messages left in the queue or topic since the last call to commit. Whether a session is transactional, that is, whether a JMS client must call the method commit for sending or receipt of messages, can be determined by a call to Session.getTransacted(). Whether a message was redelivered due to a call to the method rollback can be determined by the JMS client via the method Message.getJMSRedelivered().
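The sending side of these commit and rollback rules can be sketched as follows. This is a plain-Java model with a hypothetical class ToyTransactedSender, not the javax.jms.Session API: messages are buffered until commit and dropped on rollback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the sending side of a transactional session.
class ToyTransactedSender {
    private final List<String> destination = new ArrayList<>(); // what receivers could see
    private final List<String> uncommitted = new ArrayList<>(); // sent since the last commit

    // Buffers the message; it is not yet visible in the destination.
    void send(String message) {
        uncommitted.add(message);
    }

    // Transfers all buffered messages; the next transaction starts implicitly.
    void commit() {
        destination.addAll(uncommitted);
        uncommitted.clear();
    }

    // Discards everything sent since the last commit.
    void rollback() {
        uncommitted.clear();
    }

    List<String> visibleMessages() {
        return Collections.unmodifiableList(destination);
    }

    public static void main(String[] args) {
        ToyTransactedSender session = new ToyTransactedSender();
        session.send("a");
        session.send("b");
        session.rollback(); // "a" and "b" never reach the destination
        session.send("c");
        session.commit();
        System.out.println(session.visibleMessages());
    }
}
```

Because commit clears the buffer and sending simply continues afterwards, there is no separate "begin" operation in the model, which mirrors the statement that transactions cannot be nested.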

Distributed Transactions

The JMS provider can optionally support distributed transactions. (See Chapter 7 for details on transactions.) In contrast to simple transactions of a session, in the case of distributed transactions, actions of various services can be collected into a single action. These services can run on different servers. A distributed transaction can, for example, comprise a method call to an Enterprise Bean and the placing of a message in a queue. If one of the two actions fails, then it is possible to cancel both actions. The context of a distributed transaction ends with the sending of a message. It is not extended to the receipt of the message. Thus the receipt and processing of a message cannot be components of the distributed transaction in which the message was sent. Here the same principle holds as with simple transactions.

JMS supports distributed transactions via the JTA/XAResource API (details are discussed in Chapter 7). The support for distributed transactions is implemented by the JMS provider in the JMS sessions. For this purpose the specification defines an interface javax.jms.XASession, which is derived from javax.jms.Session; XASession is specialized for the particular communication types (publish-subscribe, point-to-point). The transaction is controlled by the transaction monitor via the XA interfaces. The JMS client does not come into direct contact with these interfaces. If it operates within a distributed transaction, it will obtain an XASession object, which, however, appears to it as a Session object. If the JMS client attempts within a distributed transaction to call the method commit or rollback on the session object, then the method triggers an exception of type TransactionInProgressException. Support of distributed transactions in JMS is designed primarily for the integration of JMS into application servers.

Acknowledgment

When neither simple nor distributed transactions are used, the acknowledgment mechanism comes into play. It governs when delivered messages are finally removed from the queue or topic. The acknowledgment mechanism is of no importance in sending messages. At the time a session is created, the JMS client can specify the type of acknowledgment behavior that is desired. The mechanism is used by the JMS client via the methods Message.acknowledge() and Session.recover(). The method acknowledge is comparable to the commit method of a transaction, while the method recover is comparable to the rollback method. The JMS client can choose among the following acknowledgment modes when a JMS session is generated:

  • javax.jms.Session.AUTO_ACKNOWLEDGE: The JMS session automatically acknowledges a message after the receive method has been called successfully or after the call to the onMessage method of a message listener has returned. The delivered message is not sent again and is immediately removed from the queue or topic. In this way the delivery of one message corresponds to an atomic action. Calls to the acknowledge method by the JMS client are ignored. A call to the recover method by the JMS client changes nothing in the behavior of the session.

  • javax.jms.Session.CLIENT_ACKNOWLEDGE: The JMS client must call the acknowledge method explicitly to confirm the receipt of one or several messages. As long as the acknowledge method is not called, already delivered messages are not removed from the queue or topic. If the JMS client calls the acknowledge method after each receipt of a message, the behavior is identical to that of AUTO_ACKNOWLEDGE. If the JMS client calls the method recover, the session halts the delivery of messages at once. The session goes back to the first message delivered after the last call to acknowledge and begins delivery again from there. Messages that were redelivered due to a call to recover are recognized by the JMS client by a call to the method Message.getJMSRedelivered().

  • javax.jms.Session.DUPS_OK_ACKNOWLEDGE: In this mode the JMS session confirms the receipt of a message automatically as in the mode AUTO_ACKNOWLEDGE. However, it is up to the session to determine when the confirmation is made and the messages removed from the queue or topic. This means that in this mode, messages can be multiply delivered (the prefix DUPS stands for "duplicates"). In contrast to a call to the recover method in the mode CLIENT_ACKNOWLEDGE, there is nothing in the message to show whether it was delivered more than once. Thus this mode can be used only with JMS clients that can deal with doubly delivered messages. Calls to the methods acknowledge and recover have no effect. The advantage of this mode over AUTO_ACKNOWLEDGE is that the session can work more efficiently. For example, the removal of sent messages can take place in a separate thread, which runs in the background with lower priority.

AUTO_ACKNOWLEDGE is probably the mode best suited for most JMS clients. The increased cost of a JMS session at run time vis-à-vis the DUPS_OK_ACKNOWLEDGE mode is frequently made up for by its being less error-prone. The mode CLIENT_ACKNOWLEDGE is used with JMS clients that process incoming messages as a block to achieve greater run-time efficiency. This makes sense, for example, when in processing a message a database access is necessary. Instead of accessing the database for each incoming message, a collective query can be made to the database for several messages. In this way the cost of opening several connections is spared, and the database is burdened with fewer queries. If the action was successful, then the receipt of all messages of the processed block can be confirmed with a single call of the method acknowledge on the last-received message.
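The interplay of acknowledge and recover in CLIENT_ACKNOWLEDGE mode can be illustrated with a small model. ToyClientAckSession and ToyMessage are hypothetical plain-Java classes, not part of javax.jms; the sketch assumes a single consumer and keeps everything in memory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical message with a flag comparable to the JMSRedelivered header.
class ToyMessage {
    final String text;
    boolean redelivered = false;

    ToyMessage(String text) {
        this.text = text;
    }
}

// Hypothetical session in client-acknowledge style.
class ToyClientAckSession {
    private final Deque<ToyMessage> queue = new ArrayDeque<>();
    private final List<ToyMessage> unacknowledged = new ArrayList<>();

    void enqueue(String text) {
        queue.addLast(new ToyMessage(text));
    }

    // Delivers the next message and remembers it until it is acknowledged.
    ToyMessage receive() {
        ToyMessage m = queue.pollFirst();
        if (m != null) {
            unacknowledged.add(m);
        }
        return m;
    }

    // Confirms every message delivered since the last acknowledge.
    void acknowledge() {
        unacknowledged.clear();
    }

    // Puts unacknowledged messages back at the head of the queue,
    // in their original order, marked as redelivered.
    void recover() {
        for (int i = unacknowledged.size() - 1; i >= 0; i--) {
            ToyMessage m = unacknowledged.get(i);
            m.redelivered = true;
            queue.addFirst(m);
        }
        unacknowledged.clear();
    }

    public static void main(String[] args) {
        ToyClientAckSession session = new ToyClientAckSession();
        session.enqueue("a");
        session.enqueue("b");
        session.receive();
        session.recover();
        ToyMessage again = session.receive();
        System.out.println(again.text + " redelivered=" + again.redelivered);
    }
}
```

A single acknowledge call covers a whole block of received messages, which is the property that block-wise processing relies on.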

Filtering Messages

A JMS client can specify a filter for the receipt of messages. With this filter, only those messages that satisfy the filter criteria are delivered to the client. The JMS specification calls this filter mechanism the message selector. The filter is specified as a string object when a recipient is created at the session interface:

  • QueueSession.createReceiver(Queue queue, java.lang.String messageSelector)

  • TopicSession.createSubscriber(Topic topic, java.lang.String messageSelector, boolean noLocal)

The attribute noLocal, which can be specified only in relation to a topic, offers the possibility of excluding the delivery of messages that are sent over the same connection. This attribute is useful when a JMS client sends and receives messages over a particular topic (chat applications use this process, for example). With noLocal the client can determine whether messages that it sends itself are received.

The syntax of the message selector is based on the WHERE clause of the query language SQL92. For a description of the syntax see [29]. The message selector is applied only to the header fields and the properties of a message. It cannot be applied to the message body.

Example 1

start example
...
QueueReceiver qr;
qr = queueSession.createReceiver(queue, "JMSPriority >= 5");
...
end example

The receiver shown in this example will receive from the message service only the messages in the queue whose priority is 5 or higher. JMSPriority is a header field defined by the JMS specification and is placed on every message. If the priority is not specified by the sender, the field JMSPriority is filled with a standard value. JMSPriority can assume the values 0 through 9, where 0 is the lowest priority and 9 the highest. The standard priority has the value 4.

Example 2

start example
...
QueueReceiver qr;
String selector = "AppProp in ('aProp', 'bProp')";
qr = queueSession.createReceiver(queue, selector);
...
end example

The receiver shown in this example will have only those messages from the queue delivered by the message service for which the property AppProp defined by the application has the value aProp or bProp. The sender of a message sets the property AppProp via the method setStringProperty on the interface javax.jms.Message.

The use of a message selector results in a particular recipient receiving only those messages that satisfy the selector criteria. In the case of a queue, the result is that the messages that do not meet the criteria remain in the queue until a recipient registers that meets one of the following criteria:

  • Recipient uses no message selector;

  • Recipient uses a message selector that matches the header fields and properties of this message.

With a topic, messages that are not delivered to any recipient as a result of the selector criteria are simply not delivered. For the recipient or recipients in question it is as if the message had never been sent.
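The effect of a selector on a queue can be modeled with predicates. The sketch below is plain Java with hypothetical classes (SelectableMessage, SelectorQueue); it does not parse the SQL92-style selector syntax but uses helper methods that stand in for the two selectors from Example 1 and Example 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical message: a body plus header fields and properties in one map.
class SelectableMessage {
    final String body;
    final Map<String, Object> properties = new HashMap<>();

    SelectableMessage(String body, int priority) {
        this.body = body;
        properties.put("JMSPriority", priority);
    }
}

// Hypothetical queue in which non-matching messages stay until a receiver
// without a selector, or with a matching one, asks for them.
class SelectorQueue {
    private final List<SelectableMessage> pending = new ArrayList<>();

    void send(SelectableMessage m) {
        pending.add(m);
    }

    // Removes and returns the oldest matching message, or null if none matches.
    SelectableMessage receive(Predicate<SelectableMessage> selector) {
        Iterator<SelectableMessage> it = pending.iterator();
        while (it.hasNext()) {
            SelectableMessage m = it.next();
            if (selector.test(m)) {
                it.remove();
                return m;
            }
        }
        return null;
    }

    int size() {
        return pending.size();
    }

    // Stands in for the selector "JMSPriority >= min".
    static Predicate<SelectableMessage> priorityAtLeast(final int min) {
        return m -> {
            Object p = m.properties.get("JMSPriority");
            return p instanceof Integer && (Integer) p >= min;
        };
    }

    // Stands in for the selector "name in (allowed...)".
    static Predicate<SelectableMessage> propertyIn(final String name, final String... allowed) {
        return m -> Arrays.asList(allowed).contains(m.properties.get(name));
    }

    // Stands in for a receiver without a selector.
    static Predicate<SelectableMessage> any() {
        return m -> true;
    }

    public static void main(String[] args) {
        SelectorQueue q = new SelectorQueue();
        q.send(new SelectableMessage("low", 2));
        q.send(new SelectableMessage("high", 7));
        System.out.println(q.receive(priorityAtLeast(5)).body);
        System.out.println("left in queue: " + q.size());
    }
}
```

The selector only looks at the properties map and never at the body, in line with the rule that a message selector cannot be applied to the message body.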

Request-Reply

Message systems are designed for asynchronous communication. A JMS client sends a message and thereby sets off certain responses on the receiver side. It does not know exactly when the impact will occur (that is, when the message will be received), nor who will undertake the processing (that is, who receives the message). In a classical RPC (remote procedure call), by contrast, the client must wait until the processing of the procedure is complete. After the RPC has returned, the client receives the result of the processing as the return value. This return value can have various meanings for the RPC client. It can signal whether the processing was successful or whether it suffered errors. The return value can also influence the further actions of the client.

When a message service is used for communication between the components of a distributed application, the behavior just described is often desired. With the request-reply mechanism a behavior can be simulated over a message service that corresponds to a classical RPC. This means that synchronous communication can also occur over a message service. The only difference with respect to a classical RPC is that the caller cannot determine from whom the request is received.

The JMS specification provides auxiliary classes for achieving request-reply behavior. These classes are javax.jms.QueueRequestor for the point-to-point communication model and javax.jms.TopicRequestor for the publish-subscribe model. Listing 6-10 shows an example of how the class QueueRequestor is used (which corresponds to the use of the class TopicRequestor).

Listing 6-10: Request-reply with the class QueueRequestor.

start example
...
    QueueConnection qc    = null;
    QueueSession    qs    = null;
    QueueSender     qsend = null;
    QueueRequestor  qr    = null;
    try {
        QueueConnectionFactory qcf = null;
        qcf = (QueueConnectionFactory) Lookup.get(FACTORY);
        qc = qcf.createQueueConnection();
        qc.start();
        qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = (Queue) Lookup.get(qn);
        qsend = qs.createSender(queue);
        TextMessage m = qs.createTextMessage("a message");
        qr = new QueueRequestor(qs, queue);
        // blocks until the reply arrives on the temporary queue
        Message ret = qr.request(m);
        //...
        //process the reply
        //...
    } finally {
        try { qr.close(); }    catch(Exception ex) {}
        try { qsend.close(); } catch(Exception ex) {}
        try { qs.close(); }    catch(Exception ex) {}
        try { qc.close(); }    catch(Exception ex) {}
    }
...
end example

The code is largely identical to that for sending a message, as shown in the earlier section on sending a message. The significant difference consists in the use of the class QueueRequestor. Instead of generating a queue sender via a queue session, an instance of the class javax.jms.QueueRequestor is generated. The queue session and the queue are passed to it as parameters. To send a message, the JMS client calls the method request() and passes the message to be sent. The message is sent via the queue that was passed to the queue requestor in the constructor. At the same time, the queue requestor sets up a temporary queue on which it awaits the answer. Temporary queues (and likewise temporary topics) are valid only as long as the connection over which they were created is active. Only the sessions that belong to the connection over which the temporary queue or topic was generated are allowed to generate a receiver for this temporary queue or topic. A call to the request() method blocks the caller until an answer is received over the temporary queue. Listing 6-11 shows a receiver that answers the message sent in Listing 6-10.

Listing 6-11: A JMS client's answer to a message.

start example
...
    public void onMessage(Message m) {
        // ...
        // process Message
        // ...
        QueueSender qs = null;
        try {
            TextMessage tm = queueSession.createTextMessage();
            tm.setText("Reply:" + ((TextMessage)m).getText());
            Queue reply = (Queue)m.getJMSReplyTo();
            qs = queueSession.createSender(reply);
            qs.send(tm);
            m.acknowledge();
        } catch(JMSException jmsex) {
            jmsex.printStackTrace();
        } finally {
            try { qs.close(); } catch(Exception ex) {}
        }
    }
...
end example

To reply to a message the receiver evaluates the header field JMSReplyTo. It receives the (temporary) queue or topic over which the sender of the message is awaiting an answer. Under other circumstances it would have no access to the temporary queue. Due to the properties of temporary queues and topics (as described above) it is ensured that only the recipient of a message can send an answer. The field JMSReplyTo is set by the queue requestor via the method setJMSReplyTo() on the interface javax.jms.Message. The recipient generates a queue sender for the temporary queue and sends the answer over that queue to the sender of the original message. If the answer arrives at the sender, the call to the request() method returns, and the sender can evaluate the answer.

Since temporary queues and topics are employed in request-reply, no critical information should be contained in the answer. The sender of the message must take into consideration that the answer might, for example, become lost due to a server crash or loss of a connection. A temporary queue or topic exists, as already mentioned, only in the context of a particular JMS connection. Should this connection be broken, the temporary queue or topic is lost together with the messages it contains.
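The request-reply pattern with a temporary reply queue can be imitated with the JDK's blocking queues. This is a sketch in plain Java, not the QueueRequestor implementation: ToyRequest, ToyRequestor, and ToyRequestReplyDemo are hypothetical names, and the timeout is an addition made here so that a lost answer does not block the caller forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical request: the text plus the queue on which the reply is expected,
// comparable to the JMSReplyTo header field.
class ToyRequest {
    final String text;
    final BlockingQueue<String> replyTo;

    ToyRequest(String text, BlockingQueue<String> replyTo) {
        this.text = text;
        this.replyTo = replyTo;
    }
}

// Hypothetical requestor: creates a temporary reply queue per request.
class ToyRequestor {
    private final BlockingQueue<ToyRequest> requestQueue;

    ToyRequestor(BlockingQueue<ToyRequest> requestQueue) {
        this.requestQueue = requestQueue;
    }

    // Sends the request and blocks until a reply arrives or the timeout
    // expires; returns null if no reply arrived in time.
    String request(String text, long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> temporary = new LinkedBlockingQueue<>();
        requestQueue.put(new ToyRequest(text, temporary));
        return temporary.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

class ToyRequestReplyDemo {
    // Starts one responder thread, sends one request, and returns the reply.
    static String roundTrip(String text) {
        final BlockingQueue<ToyRequest> requests = new LinkedBlockingQueue<>();
        Thread responder = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    ToyRequest r = requests.take();
                    // The responder learns the reply queue only from the request.
                    r.replyTo.put("Reply:" + r.text);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        responder.setDaemon(true);
        responder.start();
        try {
            return new ToyRequestor(requests).request(text, 5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("a message"));
    }
}
```

As in Listing 6-11, the responder prefixes the original text with "Reply:" and sends the answer to the queue named in the request. The temporary queue is reachable only through the request object, which mirrors the way a JMS receiver gains access to the temporary queue through JMSReplyTo.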




Enterprise JavaBeans 2.1
ISBN: 1590590880
EAN: 2147483647
Year: 2006
Pages: 103