Section 11.4. Point-to-Point Messaging


11.4. Point-to-Point Messaging

Point-to-point messaging involves the sending of messages from one or more senders to a single receiver through a message queue. Point-to-point messaging is analogous to email messaging: a client delivers messages to a named mailbox (queue), and the owner of the mailbox (queue) reads them in the order they were received. Queues attempt to maintain the send order of messages generated by the sender(s) attached to them. In other words, if sender A sends messages A1, A2, and A3, in that order, to a queue, then the receiver attached to the queue will receive message 2 after message 1, and message 3 after message 2 (assuming that no message selectors filter out any of these messages). If there are multiple senders attached to a queue, then the relative order of each individual sender is preserved by the queue when it delivers the messages, but the queue doesn't attempt to impose a predefined absolute order on the messages across all senders. So if there is another sender, B, attached to the same queue as A, and it sends messages B1, B2, and B3, in that order, then the receiver will receive B2 after B1, and B3 after B2, but the messages from sender A may be interleaved with the messages from sender B. The receiver may receive the messages in order A1, A2, B1, A3, B2, B3; the messages may be delivered in order B1, B2, B3, A1, A2, A3; or the messages may arrive some other order altogether. There is nothing in the JMS specification that dictates how a JMS provider should queue messages from multiple senders.

Point-to-point messaging is performed in JMS using the queue-related interfaces and classes in the javax.jms package. Queues are represented by Queue objects, which are looked up in JNDI from the JMS provider. QueueConnectionFactory objects are looked up in JNDI as well and used to create QueueConnections. QueueConnections and Queues are used to create QueueSessions, which are in turn used to create QueueSenders and QueueReceivers.

11.4.1. Sample Client

Example 11-2 shows a full point-to-point messaging client. The PTPMessagingClient is capable of sending and receiving a message from a given queue, as well as browsing the current contents of the queue.

Example 11-2. Point-to-point messaging client
 import java.util.*; import javax.naming.*; import javax.jms.*; public class PTPMessagingClient implements Runnable {   // Our connection to the JMS provider.  Only one is needed for this client.   private QueueConnection mQueueConn = null;   // The queue used for message passing   private Queue mQueue = null;   // Our message receiver -- need only one   private QueueReceiver mReceiver = null;   // A single session for sending and receiving from all remote peers   private QueueSession mSession = null;   // The message type we tag all our messages with   private static String MSG_TYPE = "JavaEntMessage";   // Constructor, with client name, and the JNDI locations of the JMS   // connection factory and queue that we want to use   public PTPMessagingClient(String cFactoryJNDIName, String queueJNDIName) {     init(cFactoryJNDIName, queueJNDIName);   }   // Do all the JMS setup for this client.  Assumes that the JVM is   // configured (perhaps using jndi.properties) so that the default JNDI   // InitialContext points to the JMS provider's JNDI service.   protected boolean init(String cFactoryJNDIName, String queueJNDIName) {     boolean success = true;     Context ctx = null;     // Attempt to make connection to JNDI service     try {       ctx = new InitialContext( );     }     catch (NamingException ne) {       System.out.println("Failed to connect to JNDI provider:");       ne.printStackTrace( );       success = false;     }     // If no JNDI context, bail out here     if (ctx == null) {       return success;     }     // Attempt to look up JMS connection factory from JNDI service     QueueConnectionFactory connFactory = null;     try {       connFactory = (QueueConnectionFactory)ctx.lookup(cFactoryJNDIName);       System.out.println("Got JMS connection factory.");     }     catch (NamingException ne2) {       System.out.println("Failed to get JMS connection factory: ");       ne2.printStackTrace( );       success = false;     }     try {       // Make a connection to the JMS provider and keep it.       // At this point, the connection is not started, so we aren't       // receiving any messages.       mQueueConn = connFactory.createQueueConnection( );       // Try to find our designated queue       mQueue = (Queue)ctx.lookup(queueJNDIName);       // Make a session for queueing messages: no transactions,       // auto-acknowledge       mSession =         mQueueConn.createQueueSession(false,                                       javax.jms.Session.AUTO_ACKNOWLEDGE);     }     catch (JMSException e) {       System.out.println("Failed to establish connection/queue:");       e.printStackTrace( );       success = false;     }     catch (NamingException ne) {       System.out.println("JNDI Error looking up factory or queue:");       ne.printStackTrace( );       success = false;     }     try {       // Make our receiver, for incoming messages.       // Set the message selector to receive only our type of messages,       // in case the same queue is being used for other purposes.       mReceiver = mSession.createReceiver(mQueue,                                           "JMSType = '" + MSG_TYPE + "'");     }     catch (JMSException je) {       System.out.println("Error establishing message receiver:");       je.printStackTrace( );     }     return success;   }   // Send a message to the queue   public void sendMessage(String msg) {     try {       // Create a JMS msg sender connected to the destination queue       QueueSender sender = mSession.createSender(mQueue);       // Use the session to create a text message       TextMessage tMsg = mSession.createTextMessage( );       tMsg.setJMSType(MSG_TYPE);       // Set the body of the message       tMsg.setText(msg);       // Send the message using the sender       sender.send(tMsg);       System.out.println("Sent the message");     }     catch (JMSException je) {       System.out.println("Error sending message " + msg + " to queue");       je.printStackTrace( );     }   }   // Register a MessageListener with the queue to receive   // messages asynchronously   public void registerListener(MessageListener listener) {     try {       // Set the listener on the receiver       mReceiver.setMessageListener(listener);       // Start the connection, in case it's still stopped       mQueueConn.start( );     }     catch (JMSException je) {       System.out.println("Error registering listener: ");       je.printStackTrace( );     }   }   // Perform a synchronous receive of a message from the queue.  If it's a   // TextMessage, print the contents.   public String receiveMessage( ) {     String msg = "-- No message --";     try {       Message m = mReceiver.receive( );       if (m instanceof TextMessage) {         msg = ((TextMessage)m).getText( );       }       else {         msg = "-- Unsupported message type received --";       }     }     catch (JMSException je) {     }     return msg;   }   // Print the current contents of the message queue, using a QueueBrowser   // so that we don't remove any messages from the queue   public void printQueue( ) {     try {       QueueBrowser browser = mSession.createBrowser(mQueue);       Enumeration msgEnum = browser.getEnumeration( );       System.out.println("Queue contents:");       while (msgEnum.hasMoreElements( )) {         System.out.println("\t" + (Message)msgEnum.nextElement( ));       }     }     catch (JMSException je) {       System.out.println("Error browsing queue: " + je.getMessage( ));     }   }   // When run within a thread, just wait for messages to be delivered to us   public void run( ) {     while (true) {       try { this.wait( ); } catch (Exception we) {}     }   }   // Take command-line arguments and send or receive messages from the   // named queue   public static void main(String args[]) {     if (args.length < 3) {       System.out.println("Usage: PTPMessagingClient" +                          " connFactoryName queueName" +                          " [send|listen|recv_synch] <messageToSend>");       System.exit(1);     }     // Get the JNDI names of the connection factory and     // queue, from the commandline     String factoryName = args[0];     String queueName = args[1];     // Get the command to execute (send, recv, recv_synch)     String cmd = args[2];     // Create and initialize the messaging participant     PTPMessagingClient msger =       new PTPMessagingClient(factoryName, queueName);     // Run the participant in its own thread, so that it can react to     // incoming messages     Thread listen = new Thread(msger);     listen.start( );     // Send a message to the queue     if (cmd.equals("send")) {       String msg = args[3];       msger.sendMessage(msg);       System.exit(0);     }     // Register a listener     else if (cmd.equals("listen")) {       MessageListener listener = new TextLogger( );       msger.registerListener(listener);       System.out.println("Client listening to queue " + queueName                          + "...");       System.out.flush( );       try { listen.wait( ); } catch (Exception we) {}     }     // Synchronously receive a message from the queue     else if (cmd.equals("recv_synch")) {       String msg = msger.receiveMessage( );       System.out.println("Received message: " + msg);       System.exit(0);     }     else if (cmd.equals("browse")) {       msger.printQueue( );       System.exit(0);     }   } } 

The main( ) method takes a minimum of three command-line arguments. The first two are the JNDI names of a target JMS connection factory and queue, in that order. The third argument is a command indicating what to do:


send

Sends a message, using the next command-line argument as the text of a TextMessage


recv

Registers a listener with the queue and waits for messages to come in


recv_synch

Synchronously polls the queue for the next message that's sent


browse

Sends a request to print the current contents of the queue without emptying it, using a QueueBrowser

The main( ) method creates a PTPMessagingClient using the two JNDI names. The constructor passes these to the init( ) method, where all of the JMS initialization we've discussed takes place. The client attempts to connect to its JNDI provider and get its InitialContext first. There are no properties provided to the InitialContext constructor, so the environment would have to have these properties specified in a jndi.properties file or on the command line using -D options to the JVM (see Chapter 9 for details on these JNDI-related system properties). Once the Context is acquired, the client looks up the QueueConnectionFactory and Queue from JNDI. It also creates a QueueConnection and a QueueSession, so that it can later create senders and receivers as needed. Finally, the init( ) method creates a QueueReceiver from the session, in case it's needed later. The connection hasn't been started yet, so the receiver is not receiving messages from the JMS provider yet.

Back in the main( ) method, once the client is created, it's put into a Thread and run. This is useful for the case in which we're going to wait for messages sent to a listener. Finally, the requested command is checked. If the command is send, we call the client's sendMessage( ) method, which creates a QueueSender and a TextMessage (using the last command-line argument, passed in from the main( ) method). Then the message is sent by passing it to the send( ) method on the QueueSender. If a "recv" command is given, we create a TextLogger (see Example 11-1) and attach it as a MessageListener to our QueueReceiver, by calling the client's registerListener( ) method where the call to the receiver's setMessageListener( ) method is made. If a recv_synch command is given, then we call the client's receiveMessage( ) method, where the receive( ) method on the QueueReceiver is called. This will block until the next message is sent to the queue. Finally, a browse command causes a call to the client's printQueue( ) method, where a QueueBrowser is created from our session, then asked for an Enumeration of the current messages in the queue. Each message is printed to the console, in the order they would be received.

11.4.2. Browsing Queues

In addition to the conventional use of queues for sending and receiving of messages, a client can also browse the contents of a queue without actually pulling the messages from the queue. This is done using a QueueBrowser, which is generated from a client's QueueSession using its createQueueBrowser( ) methods:

     QueueBrowser browser = qSession.createQueueBrowser(queue); 

Like QueueReceivers, QueueBrowsers can use message selectors to filter what messages they see in the queue:

     QueueBrowser filterBrowser =       qSession.createQueueBrowser(queue, "transaction-type = 'update'"); 

This QueueBrowser "sees" only messages in the queue that have a transaction-type property set to update.

To iterate over the messages in the queue, a client asks the browser for an Enumeration of the messages in the queue that match the browser's message selector, if it has one:

     Enumeration msgEnum = browser.getEnumeration( );     while (msgEnum.hasMoreElements( )) {       Message msg = (Message)msgEnum.nextElement( );       System.out.println("Found message, ID = " + msg.getJMSMessageID( ));     } 

The Enumeration returns messages in the order that they would be delivered to the client, using the message selector set on the QueueBrowser. So if you had an existing QueueReceiver and wanted to look ahead in the queue to see what messages would be delivered based on the current contents of the queue, you could create a browser using the same message selector as the receiver:

     QueueReceiver recvr = ...;     QueueBrowser recvrBrowser =       qSession.createQueueBrowser(queue, recvr.getMessageSelector( )); 



Java Enterprise in a Nutshell
Java Enterprise in a Nutshell (In a Nutshell (OReilly))
ISBN: 0596101422
EAN: 2147483647
Year: 2004
Pages: 269

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net