11.4. Point-to-Point Messaging

Point-to-point messaging involves sending messages from one or more senders to a single receiver through a message queue. Point-to-point messaging is analogous to email messaging: a client delivers messages to a named mailbox (queue), and the owner of the mailbox (queue) reads them in the order they were received.

Queues attempt to maintain the send order of messages generated by the sender(s) attached to them. In other words, if sender A sends messages A1, A2, and A3, in that order, to a queue, then the receiver attached to the queue will receive A2 after A1, and A3 after A2 (assuming that no message selectors filter out any of these messages). If there are multiple senders attached to a queue, then the relative order of each individual sender's messages is preserved by the queue when it delivers them, but the queue doesn't attempt to impose a predefined absolute order on the messages across all senders. So if there is another sender, B, attached to the same queue as A, and it sends messages B1, B2, and B3, in that order, then the receiver will receive B2 after B1, and B3 after B2, but the messages from sender A may be interleaved with the messages from sender B. The receiver may receive the messages in the order A1, A2, B1, A3, B2, B3; the messages may be delivered in the order B1, B2, B3, A1, A2, A3; or the messages may arrive in some other order altogether. There is nothing in the JMS specification that dictates how a JMS provider should queue messages from multiple senders.

Point-to-point messaging is performed in JMS using the queue-related interfaces and classes in the javax.jms package. Queues are represented by Queue objects, which are looked up in JNDI from the JMS provider. QueueConnectionFactory objects are looked up in JNDI as well and used to create QueueConnections. QueueConnections are used to create QueueSessions, which, together with Queues, are in turn used to create QueueSenders and QueueReceivers.

11.4.1. Sample Client

Example 11-2 shows a full point-to-point messaging client. The PTPMessagingClient is capable of sending a message to and receiving a message from a given queue, as well as browsing the current contents of the queue.

Example 11-2. Point-to-point messaging client

    // Import Enumeration by name: a wildcard import of java.util would make
    // Queue ambiguous between java.util.Queue and javax.jms.Queue.
    import java.util.Enumeration;
    import javax.naming.*;
    import javax.jms.*;

    public class PTPMessagingClient implements Runnable {
      // Our connection to the JMS provider. Only one is needed for this client.
      private QueueConnection mQueueConn = null;
      // The queue used for message passing
      private Queue mQueue = null;
      // Our message receiver -- need only one
      private QueueReceiver mReceiver = null;
      // A single session for sending and receiving from all remote peers
      private QueueSession mSession = null;
      // The message type we tag all our messages with
      private static final String MSG_TYPE = "JavaEntMessage";

      // Constructor, with the JNDI locations of the JMS
      // connection factory and queue that we want to use
      public PTPMessagingClient(String cFactoryJNDIName, String queueJNDIName) {
        init(cFactoryJNDIName, queueJNDIName);
      }

      // Do all the JMS setup for this client. Assumes that the JVM is
      // configured (perhaps using jndi.properties) so that the default JNDI
      // InitialContext points to the JMS provider's JNDI service.
      // Returns false as soon as any setup step fails.
      protected boolean init(String cFactoryJNDIName, String queueJNDIName) {
        Context ctx = null;
        // Attempt to make connection to JNDI service
        try {
          ctx = new InitialContext( );
        }
        catch (NamingException ne) {
          System.out.println("Failed to connect to JNDI provider:");
          ne.printStackTrace( );
          return false;
        }

        // Attempt to look up JMS connection factory from JNDI service
        QueueConnectionFactory connFactory = null;
        try {
          connFactory = (QueueConnectionFactory)ctx.lookup(cFactoryJNDIName);
          System.out.println("Got JMS connection factory.");
        }
        catch (NamingException ne2) {
          System.out.println("Failed to get JMS connection factory: ");
          ne2.printStackTrace( );
          return false;
        }

        try {
          // Make a connection to the JMS provider and keep it.
          // At this point, the connection is not started, so we aren't
          // receiving any messages.
          mQueueConn = connFactory.createQueueConnection( );
          // Try to find our designated queue
          mQueue = (Queue)ctx.lookup(queueJNDIName);
          // Make a session for queueing messages: no transactions,
          // auto-acknowledge
          mSession =
            mQueueConn.createQueueSession(false,
                                          javax.jms.Session.AUTO_ACKNOWLEDGE);
        }
        catch (JMSException e) {
          System.out.println("Failed to establish connection/queue:");
          e.printStackTrace( );
          return false;
        }
        catch (NamingException ne) {
          System.out.println("JNDI Error looking up factory or queue:");
          ne.printStackTrace( );
          return false;
        }

        try {
          // Make our receiver, for incoming messages.
          // Set the message selector to receive only our type of messages,
          // in case the same queue is being used for other purposes.
          mReceiver = mSession.createReceiver(mQueue,
                                              "JMSType = '" + MSG_TYPE + "'");
        }
        catch (JMSException je) {
          System.out.println("Error establishing message receiver:");
          je.printStackTrace( );
          return false;
        }
        return true;
      }

      // Send a message to the queue
      public void sendMessage(String msg) {
        try {
          // Create a JMS msg sender connected to the destination queue
          QueueSender sender = mSession.createSender(mQueue);
          // Use the session to create a text message
          TextMessage tMsg = mSession.createTextMessage( );
          tMsg.setJMSType(MSG_TYPE);
          // Set the body of the message
          tMsg.setText(msg);
          // Send the message using the sender
          sender.send(tMsg);
          System.out.println("Sent the message");
        }
        catch (JMSException je) {
          System.out.println("Error sending message " + msg + " to queue");
          je.printStackTrace( );
        }
      }

      // Register a MessageListener with the queue to receive
      // messages asynchronously
      public void registerListener(MessageListener listener) {
        try {
          // Set the listener on the receiver
          mReceiver.setMessageListener(listener);
          // Start the connection, in case it's still stopped
          mQueueConn.start( );
        }
        catch (JMSException je) {
          System.out.println("Error registering listener: ");
          je.printStackTrace( );
        }
      }

      // Perform a synchronous receive of a message from the queue. If it's a
      // TextMessage, return the contents.
      public String receiveMessage( ) {
        String msg = "-- No message --";
        try {
          // No messages are delivered until the connection is started;
          // starting a connection that is already started has no effect.
          mQueueConn.start( );
          Message m = mReceiver.receive( );
          if (m instanceof TextMessage) {
            msg = ((TextMessage)m).getText( );
          }
          else {
            msg = "-- Unsupported message type received --";
          }
        }
        catch (JMSException je) {
          System.out.println("Error receiving message: " + je.getMessage( ));
        }
        return msg;
      }

      // Print the current contents of the message queue, using a QueueBrowser
      // so that we don't remove any messages from the queue
      public void printQueue( ) {
        try {
          QueueBrowser browser = mSession.createBrowser(mQueue);
          Enumeration msgEnum = browser.getEnumeration( );
          System.out.println("Queue contents:");
          while (msgEnum.hasMoreElements( )) {
            System.out.println("\t" + (Message)msgEnum.nextElement( ));
          }
        }
        catch (JMSException je) {
          System.out.println("Error browsing queue: " + je.getMessage( ));
        }
      }

      // When run within a thread, just wait for messages to be delivered to us.
      // wait( ) must be called while holding the object's monitor.
      public void run( ) {
        synchronized (this) {
          while (true) {
            try { this.wait( ); }
            catch (InterruptedException ie) { return; }
          }
        }
      }

      // Print the usage message and exit
      private static void usageAndExit( ) {
        System.out.println("Usage: PTPMessagingClient" +
                           " connFactoryName queueName" +
                           " [send|listen|recv_synch|browse] <messageToSend>");
        System.exit(1);
      }

      // Take command-line arguments and send or receive messages from the
      // named queue
      public static void main(String args[]) {
        if (args.length < 3) {
          usageAndExit( );
        }
        // Get the JNDI names of the connection factory and
        // queue, from the commandline
        String factoryName = args[0];
        String queueName = args[1];
        // Get the command to execute (send, listen, recv_synch, browse)
        String cmd = args[2];
        // The send command also needs the text of the message
        if (cmd.equals("send") && args.length < 4) {
          usageAndExit( );
        }

        // Create and initialize the messaging participant
        PTPMessagingClient msger =
          new PTPMessagingClient(factoryName, queueName);
        // Run the participant in its own thread, so that it can react to
        // incoming messages
        Thread listen = new Thread(msger);
        listen.start( );

        // Send a message to the queue
        if (cmd.equals("send")) {
          String msg = args[3];
          msger.sendMessage(msg);
          System.exit(0);
        }
        // Register a listener
        else if (cmd.equals("listen")) {
          MessageListener listener = new TextLogger( );
          msger.registerListener(listener);
          System.out.println("Client listening to queue " + queueName + "...");
          System.out.flush( );
          // Block here while the listener handles incoming messages
          try { listen.join( ); } catch (InterruptedException ie) {}
        }
        // Synchronously receive a message from the queue
        else if (cmd.equals("recv_synch")) {
          String msg = msger.receiveMessage( );
          System.out.println("Received message: " + msg);
          System.exit(0);
        }
        // Print the contents of the queue without removing messages
        else if (cmd.equals("browse")) {
          msger.printQueue( );
          System.exit(0);
        }
        else {
          usageAndExit( );
        }
      }
    }

The main( ) method takes a minimum of three command-line arguments. The first two are the JNDI names of a target JMS connection factory and queue, in that order. The third argument is a command indicating what to do:

send
    Send a text message to the queue. The text of the message is taken from the fourth command-line argument.
listen
    Register a MessageListener with the queue and wait for messages to be delivered asynchronously.
recv_synch
    Synchronously receive the next message from the queue and print it.
browse
    Print the current contents of the queue without removing any messages.

The main( ) method creates a PTPMessagingClient using the two JNDI names. The constructor passes these to the init( ) method, where all of the JMS initialization we've discussed takes place. The client first attempts to connect to its JNDI provider and get its InitialContext. No properties are provided to the InitialContext constructor, so the environment has to specify them in a jndi.properties file or on the command line using -D options to the JVM (see Chapter 9 for details on these JNDI-related system properties). Once the Context is acquired, the client looks up the QueueConnectionFactory and Queue from JNDI. It also creates a QueueConnection and a QueueSession, so that it can later create senders and receivers as needed. Finally, the init( ) method creates a QueueReceiver from the session, in case it's needed later. The connection hasn't been started yet, so the receiver is not yet receiving messages from the JMS provider.

Back in the main( ) method, once the client is created, it's put into a Thread and run. This is useful for the case in which we're going to wait for messages sent to a listener. Finally, the requested command is checked.

If the command is send, we call the client's sendMessage( ) method, which creates a QueueSender and a TextMessage (using the last command-line argument, passed in from the main( ) method). Then the message is sent by passing it to the send( ) method on the QueueSender.

If a listen command is given, we create a TextLogger (see Example 11-1) and attach it as a MessageListener to our QueueReceiver by calling the client's registerListener( ) method, which calls the receiver's setMessageListener( ) method and then starts the connection.

If a recv_synch command is given, we call the client's receiveMessage( ) method, where the receive( ) method on the QueueReceiver is called. This call blocks until a message that matches the receiver's selector is available in the queue. Note that a synchronous receive( ) is handed messages only once the connection has been started.

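Because receive( ) with no arguments can block indefinitely, it helps to see the alternatives side by side. The javax.jms.MessageConsumer interface, which QueueReceiver extends, also declares receive(long timeout) and receiveNoWait( ), both of which return null when no message is available. The sketch below is a plain-Java model of those three behaviors, not JMS code: it assumes a java.util.concurrent.BlockingQueue as a stand-in for the provider's queue, and ReceiveModel and its method names are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of the three receive styles; NOT JMS code.
// ReceiveModel and its method names are hypothetical.
final class ReceiveModel {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Stand-in for a sender putting a message on the queue.
    void send(String msg) {
        queue.add(msg);
    }

    // Models receive(): wait until a message is available.
    // Returns null only if the waiting thread is interrupted.
    String receiveBlocking() {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Models receive(timeout): wait at most timeoutMillis, then return null.
    String receiveTimed(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Models receiveNoWait(): return at once, null if the queue is empty.
    String receiveNoWait() {
        return queue.poll();
    }

    public static void main(String[] args) {
        ReceiveModel q = new ReceiveModel();
        System.out.println(q.receiveNoWait());    // nothing queued yet
        System.out.println(q.receiveTimed(100));  // gives up after the timeout
        q.send("hello");
        System.out.println(q.receiveBlocking());  // returns the queued message
    }
}
```

In a client like the one in Example 11-2, the timed form is the natural choice when a command-line tool should report "no message" and exit instead of hanging on an empty queue.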
Finally, a browse command causes a call to the client's printQueue( ) method, where a QueueBrowser is created from our session and then asked for an Enumeration of the current messages in the queue. Each message is printed to the console, in the order in which it would be received.

11.4.2. Browsing Queues

In addition to the conventional use of queues for sending and receiving messages, a client can also browse the contents of a queue without actually pulling the messages from the queue. This is done using a QueueBrowser, which is generated from a client's QueueSession using its createBrowser( ) methods:

    QueueBrowser browser = qSession.createBrowser(queue);

Like QueueReceivers, QueueBrowsers can use message selectors to filter which messages they see in the queue:

    QueueBrowser filterBrowser =
      qSession.createBrowser(queue, "transactionType = 'update'");

This QueueBrowser "sees" only messages in the queue that have a transactionType property set to update. (Property names used in a selector must be valid Java identifiers, so a hyphenated name such as transaction-type can't be used.) To iterate over the messages in the queue, a client asks the browser for an Enumeration of the messages in the queue that match the browser's message selector, if it has one:

    Enumeration msgEnum = browser.getEnumeration( );
    while (msgEnum.hasMoreElements( )) {
      Message msg = (Message)msgEnum.nextElement( );
      System.out.println("Found message, ID = " + msg.getJMSMessageID( ));
    }

The Enumeration returns messages in the order that they would be delivered to the client, using the message selector set on the QueueBrowser. So if you had an existing QueueReceiver and wanted to look ahead in the queue to see what messages would be delivered based on the current contents of the queue, you could create a browser using the same message selector as the receiver:

    QueueReceiver recvr = ...;
    QueueBrowser recvrBrowser =
      qSession.createBrowser(queue, recvr.getMessageSelector( ));
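The difference between browsing and receiving can be modeled in plain Java without a JMS provider. The sketch below is not JMS code and makes several assumptions for illustration: ModelQueue and its methods are hypothetical names, messages are plain strings, and a Predicate<String> stands in for a message selector. It shows only the core idea that browsing lists matching messages in delivery order and leaves the queue intact, while receiving removes the first matching message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for a queue; not part of javax.jms.
final class ModelQueue {
    private final Deque<String> messages = new ArrayDeque<>();

    // Append a message at the tail, as a sender would.
    void send(String msg) {
        messages.addLast(msg);
    }

    // Non-destructive look-ahead: list matching messages in delivery
    // order without removing anything (the role a QueueBrowser plays).
    List<String> browse(Predicate<String> selector) {
        List<String> seen = new ArrayList<>();
        for (String m : messages) {
            if (selector.test(m)) {
                seen.add(m);
            }
        }
        return seen;
    }

    // Destructive read: remove and return the first matching message,
    // or null if none matches (the role a QueueReceiver plays).
    String receive(Predicate<String> selector) {
        Iterator<String> it = messages.iterator();
        while (it.hasNext()) {
            String m = it.next();
            if (selector.test(m)) {
                it.remove();
                return m;
            }
        }
        return null;
    }

    // Number of messages still held by the queue.
    int size() {
        return messages.size();
    }

    public static void main(String[] args) {
        ModelQueue q = new ModelQueue();
        q.send("update:1");
        q.send("delete:2");
        q.send("update:3");
        Predicate<String> updates = m -> m.startsWith("update");

        System.out.println(q.browse(updates));   // prints [update:1, update:3]
        System.out.println(q.size());            // prints 3: browsing removed nothing
        System.out.println(q.receive(updates));  // prints update:1
        System.out.println(q.size());            // prints 2: receiving removed one
    }
}
```

Using the same predicate for browse and receive mirrors the look-ahead idiom above, where the browser is created with the receiver's own selector.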