The following sections discuss the basics needed to use the JBoss JMS implementation, JBossMQ. The JMS specification leaves the way clients access connection factories and destinations up to each provider. You need to know the following to use the JBossMQ layer:
The following sections look at examples of the various JMS messaging models and message-driven beans. You can find the source for this chapter's examples under the src/main/org/jboss/chap6 directory of the book examples. A Point-to-Point ExampleLet's start out with a point-to-point (P2P) example. In the P2P model, a sender delivers messages to a queue, and a single receiver pulls the message off the queue. The receiver does not need to be listening to the queue at the time the message is sent. Listing 6.1 shows a complete P2P example that sends a javax.jms.TextMessage to the queue queue/testQueue and asynchronously receives the message from the same queue. Listing 6.1. A P2P JMS Client Examplepackage org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; import EDU.oswego.cs.dl.util.concurrent.CountDown; import org.apache.log4j.Logger; import org.jboss.util.ChapterExRepository; /** * A complete JMS client example program that sends a * TextMessage to a Queue and asynchronously receives the * message from the same Queue. 
*
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.4 $
 */
public class SendRecvClient {
    /** Logger used by the client and its listener; assigned in main() before anything is sent. */
    static Logger log;
    /** Countdown of one: released by the listener, awaited by main(). */
    static CountDown done = new CountDown(1);

    QueueConnection conn;
    QueueSession session;
    Queue que;

    /** Asynchronous consumer that releases the countdown and logs the received text. */
    public static class ExListener implements MessageListener {
        public void onMessage(Message msg) {
            // Released before the text is read, so main() is unblocked even if logging fails.
            done.release();
            TextMessage received = (TextMessage) msg;
            try {
                String body = received.getText();
                log.info("onMessage, recv text=" + body);
            } catch (Throwable failure) {
                failure.printStackTrace();
            }
        }
    }

    /**
     * Looks up the connection factory and queue/testQueue in JNDI, creates the
     * connection and a non-transacted AUTO_ACKNOWLEDGE session, and starts the connection.
     */
    public void setupPTP() throws JMSException, NamingException {
        InitialContext jndi = new InitialContext();
        QueueConnectionFactory factory =
            (QueueConnectionFactory) jndi.lookup("ConnectionFactory");
        conn = factory.createQueueConnection();
        que = (Queue) jndi.lookup("queue/testQueue");
        session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    /**
     * Installs an ExListener on the queue and then sends one TextMessage to that same queue.
     *
     * @param text the payload of the message to send
     */
    public void sendRecvAsync(String text) throws JMSException, NamingException {
        log.info("Begin sendRecvAsync");
        setupPTP();

        // The listener is registered before the send
        session.createReceiver(que).setMessageListener(new ExListener());

        QueueSender sender = session.createSender(que);
        TextMessage outgoing = session.createTextMessage(text);
        sender.send(outgoing);
        log.info("sendRecvAsync, sent text=" + outgoing.getText());
        sender.close();
        log.info("End sendRecvAsync");
    }

    /** Stops delivery, then closes the session and the connection. */
    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    /** Sends one message, waits for the listener to release the countdown, then shuts down. */
    public static void main(String args[]) throws Exception {
        ChapterExRepository.init(SendRecvClient.class);
        log = Logger.getLogger("SendRecvClient");
        log.info("Begin SendRecvClient, now=" + System.currentTimeMillis());
        SendRecvClient client = new SendRecvClient();
        client.sendRecvAsync("A text msg");
        done.acquire();
        client.stop();
        log.info("End SendRecvClient");
        System.exit(0);
    }
}
You can run the client by using the following command line: [examples]$ ant -Dchap=chap6 -Dex=1p2p run-example ...
run-example1p2p: [java] [INFO,SendRecvClient] Begin SendRecvClient, now=1102808673386 [java] [INFO,SendRecvClient] Begin sendRecvAsync [java] [INFO,SendRecvClient] onMessage, recv text=A text msg [java] [INFO,SendRecvClient] sendRecvAsync, sent text=A text msg [java] [INFO,SendRecvClient] End sendRecvAsync [java] [INFO,SendRecvClient] End SendRecvClient A Pub-Sub ExampleThe JMS publish/subscribe (pub-sub) message model is a one-to-many model. A publisher sends a message to a topic, and all active subscribers of the topic receive the message. Subscribers that are not actively listening to the topic will miss the published message. Listing 6.2 shows a complete JMS client that sends javax.jms.TextMessage to a topic and asynchronously receives the message from the same topic. Listing 6.2. A Pub-Sub JMS Client Examplepackage org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; import EDU.oswego.cs.dl.util.concurrent.CountDown; /** * A complete JMS client example program that sends a TextMessage to * a Topic and asynchronously receives the message from the same * Topic. 
*
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.4 $
 */
public class TopicSendRecvClient {
    /** Countdown of one: released by the listener, awaited by main(). */
    static CountDown done = new CountDown(1);

    TopicConnection conn;
    TopicSession session;
    Topic topic;

    /** Asynchronous subscriber callback that releases the countdown and prints the received text. */
    public static class ExListener implements MessageListener {
        public void onMessage(Message msg) {
            // Released before the text is read, so main() is unblocked even if printing fails.
            done.release();
            TextMessage received = (TextMessage) msg;
            try {
                String body = received.getText();
                System.out.println("onMessage, recv text=" + body);
            } catch (Throwable failure) {
                failure.printStackTrace();
            }
        }
    }

    /**
     * Looks up the connection factory and topic/testTopic in JNDI, creates the
     * connection and a non-transacted AUTO_ACKNOWLEDGE session, and starts the connection.
     */
    public void setupPubSub() throws JMSException, NamingException {
        InitialContext jndi = new InitialContext();
        TopicConnectionFactory factory =
            (TopicConnectionFactory) jndi.lookup("ConnectionFactory");
        conn = factory.createTopicConnection();
        topic = (Topic) jndi.lookup("topic/testTopic");
        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    /**
     * Subscribes an ExListener to the topic and then publishes one TextMessage to that same topic.
     *
     * @param text the payload of the message to publish
     */
    public void sendRecvAsync(String text) throws JMSException, NamingException {
        System.out.println("Begin sendRecvAsync");
        setupPubSub();

        // The subscriber is created and given its listener before the publish
        session.createSubscriber(topic).setMessageListener(new ExListener());

        TopicPublisher publisher = session.createPublisher(topic);
        TextMessage outgoing = session.createTextMessage(text);
        publisher.publish(outgoing);
        System.out.println("sendRecvAsync, sent text=" + outgoing.getText());
        publisher.close();
        System.out.println("End sendRecvAsync");
    }

    /** Stops delivery, then closes the session and the connection. */
    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    /** Publishes one message, waits for the listener to release the countdown, then shuts down. */
    public static void main(String args[]) throws Exception {
        System.out.println("Begin TopicSendRecvClient, now=" + System.currentTimeMillis());
        TopicSendRecvClient client = new TopicSendRecvClient();
        client.sendRecvAsync("A text msg, now=" + System.currentTimeMillis());
        done.acquire();
        client.stop();
        System.out.println("End TopicSendRecvClient");
        System.exit(0);
    }
}
You can run the client by using the following command line: [examples]$ ant -Dchap=chap6 -Dex=1ps
run-example ... run-example1ps: [java] Begin TopicSendRecvClient, now=1102809427043 [java] Begin sendRecvAsync [java] onMessage, recv text=A text msg, now=1102809427071 [java] sendRecvAsync, sent text=A text msg, now=1102809427071 [java] End sendRecvAsync [java] End TopicSendRecvClient Now let's break the publisher and subscribers into separate programs to demonstrate that subscribers only receive messages while they are listening to a topic. Listing 6.3 shows a variation of the pub-sub client from Listing 6.2 that only publishes messages to the topic/testTopic topic. The subscriber client is shown in Listing 6.4. Listing 6.3. A JMS Publisher Clientpackage org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * A JMS client example program that sends a TextMessage to a Topic * * @author Scott.Stark@jboss.org * @version $Revision: 1.4 $ */ public class TopicSendClient { TopicConnection conn = null; TopicSession session = null; Topic topic = null; public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection(); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void sendAsync(String text) throws JMSException, NamingException { System.out.println("Begin sendAsync"); // Set up the pub/sub connection, session setupPubSub(); // Send a text msg TopicPublisher send = session.createPublisher(topic); TextMessage tm = 
session.createTextMessage(text); send.publish(tm); System.out.println("sendAsync, sent text=" + tm.getText()); send.close(); System.out.println("End sendAsync"); } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin TopicSendClient, now=" + System.currentTimeMillis()); TopicSendClient client = new TopicSendClient(); client.sendAsync("A text msg, now="+System.currentTimeMillis()); client.stop(); System.out.println("End TopicSendClient"); System.exit(0); } } Listing 6.4. A JMS Subscriber Clientpackage org.jboss.chap6.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * A JMS client example program that synchronously receives a message a Topic * * @author Scott.Stark@jboss.org * @version $Revision: 1.4 $ */ public class TopicRecvClient { TopicConnection conn = null; TopicSession session = null; Topic topic = null; public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection(); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void recvSync() throws JMSException, NamingException { System.out.println("Begin recvSync"); // Set up the pub/sub connection, session setupPubSub(); // Wait up to 5 seconds for the message TopicSubscriber recv = session.createSubscriber(topic); Message msg = recv.receive(5000); if (msg == null) { 
System.out.println("Timed out waiting for msg"); } else { System.out.println("TopicSubscriber.recv, msgt="+msg); } } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin TopicRecvClient, now=" + System.currentTimeMillis()); TopicRecvClient client = new TopicRecvClient(); client.recvSync(); client.stop(); System.out.println("End TopicRecvClient"); System.exit(0); } } You run TopicSendClient followed by TopicRecvClient as follows: [examples]$ ant -Dchap=chap6 -Dex=1ps2 run-example ... run-example1ps2: [java] Begin TopicSendClient, now=1102810007899 [java] Begin sendAsync [java] sendAsync, sent text=A text msg, now=1102810007909 [java] End sendAsync [java] End TopicSendClient [java] Begin TopicRecvClient, now=1102810011524 [java] Begin recvSync [java] Timed out waiting for msg [java] End TopicRecvClient The output shows that the topic subscriber client (TopicRecvClient) fails to receive the message sent by the publisher due to a timeout. An Example of a Pub-Sub with a Durable TopicJMS supports a messaging model that is a cross between the P2P and pub-sub models. When a pub-sub client wants to receive all messages posted to the topic it subscribes to, even when it is not actively listening to the topic, it may achieve this behavior by using a durable topic. Let's look at a variation of the Listing 6.4 subscriber client that uses a durable topic to ensure that it receives all messages, including those published when the client is not listening to the topic. Listing 6.5 shows the durable topic client. Listing 6.5. 
A Durable Topic JMS Client Example
package org.jboss.chap6.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * A JMS client example program that synchronously receives a message
 * from a Topic through a durable subscription.
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.4 $
 */
public class DurableTopicRecvClient {
    TopicConnection conn;
    TopicSession session;
    Topic topic;

    /**
     * Looks up the connection factory and topic/testTopic in JNDI, creates a
     * connection with the user name "john" and password "needle", opens a
     * non-transacted AUTO_ACKNOWLEDGE session, and starts the connection.
     */
    public void setupPubSub() throws JMSException, NamingException {
        InitialContext jndi = new InitialContext();
        TopicConnectionFactory factory =
            (TopicConnectionFactory) jndi.lookup("ConnectionFactory");
        conn = factory.createTopicConnection("john", "needle");
        topic = (Topic) jndi.lookup("topic/testTopic");
        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    /**
     * Creates the durable subscriber named "chap6-ex1dtps" and waits up to
     * 5 seconds for one message, printing either the message or a timeout notice.
     */
    public void recvSync() throws JMSException, NamingException {
        System.out.println("Begin recvSync");
        setupPubSub();

        TopicSubscriber subscriber =
            session.createDurableSubscriber(topic, "chap6-ex1dtps");
        // receive() returns null when the 5000 ms timeout elapses without a message
        Message msg = subscriber.receive(5000);
        String report = (msg == null)
            ? "Timed out waiting for msg"
            : "DurableTopicRecvClient.recv, msgt=" + msg;
        System.out.println(report);
    }

    /** Stops delivery, then closes the session and the connection. */
    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    /** Runs one synchronous receive against the durable subscription, then shuts down. */
    public static void main(String args[]) throws Exception {
        System.out.println("Begin DurableTopicRecvClient, now=" + System.currentTimeMillis());
        DurableTopicRecvClient client = new DurableTopicRecvClient();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }
}
Now you can run the
previous topic publisher with the durable topic subscriber, as follows:
Items of note for the durable topic example include the following:
An Example of P2P with MDBListing 6.6 shows a message-driven bean (MDB) that transforms the TextMessages it receives and sends the transformed messages to the queue found in the incoming message JMSReplyTo header. Listing 6.6. A TextMessage-Processing MDBpackage org.jboss.chap6.ex2; import javax.ejb.MessageDrivenBean; import javax.ejb.MessageDrivenContext; import javax.ejb.EJBException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * An MDB that transforms the TextMessages it receives and sends the * transformed messages to the Queue found in the incoming message * JMSReplyTo header. * * @author Scott.Stark@jboss.org * @version $Revision: 1.4 $ */ public class TextMDB implements MessageDrivenBean, MessageListener { private MessageDrivenContext ctx = null; private QueueConnection conn; private QueueSession session; public TextMDB() { System.out.println("TextMDB.ctor, this="+hashCode()); } public void setMessageDrivenContext(MessageDrivenContext ctx) { this.ctx = ctx; System.out.println("TextMDB.setMessageDrivenContext, this=" + hashCode()); } public void ejbCreate() { System.out.println("TextMDB.ejbCreate, this="+hashCode()); try { setupPTP(); } catch (Exception e) { throw new EJBException("Failed to init TextMDB", e); } } public void ejbRemove() { System.out.println("TextMDB.ejbRemove, this="+hashCode()); ctx = null; try { if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch(JMSException e) { e.printStackTrace(); } } public void onMessage(Message msg) { System.out.println("TextMDB.onMessage, this="+hashCode()); try { TextMessage tm = (TextMessage) msg; String text = tm.getText() + "processed by: "+hashCode(); Queue dest = 
(Queue) msg.getJMSReplyTo(); sendReply(text, dest); } catch(Throwable t) { t.printStackTrace(); } } private void setupPTP() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("java:comp/env/jms/QCF"); QueueConnectionFactory qcf = (QueueConnectionFactory) tmp; conn = qcf.createQueueConnection(); session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); conn.start(); } private void sendReply(String text, Queue dest) throws JMSException { System.out.println("TextMDB.sendReply, this=" + hashCode() + ", dest="+dest); QueueSender sender = session.createSender(dest); TextMessage tm = session.createTextMessage(text); sender.send(tm); sender.close(); } } The MDB ejb-jar.xml and jboss.xml deployment descriptors are shown in Listings 6.7 and 6.8, respectively. Listing 6.7. The MDB ejb-jar.xml Descriptor <?xml version="1.0"?> <!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd"> <ejb-jar> <enterprise-beans> <message-driven> <ejb-name>TextMDB</ejb-name> <ejb-class>org.jboss.chap6.ex2.TextMDB</ejb-class> <transaction-type>Container</transaction-type> <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode> <message-driven-destination> <destination-type>javax.jms.Queue</destination-type> </message-driven-destination> <resource-ref> <res-ref-name>jms/QCF</res-ref-name> <res-type>javax.jms.QueueConnectionFactory</res-type> <res-auth>Container</res-auth> </resource-ref> </message-driven> </enterprise-beans> </ejb-jar> Listing 6.8. 
The MDB jboss.xml Descriptor<?xml version="1.0"?> <jboss> <enterprise-beans> <message-driven> <ejb-name>TextMDB</ejb-name> <destination-jndi-name>queue/B</destination-jndi-name> <resource-ref> <res-ref-name>jms/QCF</res-ref-name> <jndi-name>ConnectionFactory</jndi-name> </resource-ref> </message-driven> </enterprise-beans> </jboss> Listing 6.9 shows a variation of the P2P client that sends several messages to the queue/B destination and asynchronously receives the messages as modified by TextMDB from Queue A. Listing 6.9. A JMS Client That Interacts with TextMDBpackage org.jboss.chap6.ex2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; import EDU.oswego.cs.dl.util.concurrent.CountDown; /** * A complete JMS client example program that sends N TextMessages to * a Queue B and asynchronously receives the messages as modified by * TextMDB from Queue A. 
*
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.4 $
 */
public class SendRecvClient
{
   /** Number of messages sent to queue/B, and therefore of replies awaited on queue/A. */
   static final int N = 10;
   /** Countdown of N: released once per reply by the listener, awaited by main(). */
   static CountDown done = new CountDown(N);

   QueueConnection conn;
   QueueSession session;
   Queue queA;
   Queue queB;

   /** Asynchronous consumer that releases the countdown and prints each received text. */
   public static class ExListener implements MessageListener
   {
      public void onMessage(Message msg)
      {
         // Released before the text is read, so main() is unblocked even if printing fails.
         done.release();
         TextMessage tm = (TextMessage) msg;
         try
         {
            System.out.println("onMessage, recv text="+tm.getText());
         }
         catch(Throwable t)
         {
            t.printStackTrace();
         }
      }
   }

   /**
    * Looks up the connection factory, queue/A and queue/B in JNDI, creates the
    * connection and a non-transacted AUTO_ACKNOWLEDGE session, and starts the connection.
    */
   public void setupPTP()
      throws JMSException, NamingException
   {
      InitialContext iniCtx = new InitialContext();
      Object tmp = iniCtx.lookup("ConnectionFactory");
      QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
      conn = qcf.createQueueConnection();
      queA = (Queue) iniCtx.lookup("queue/A");
      queB = (Queue) iniCtx.lookup("queue/B");
      session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
      conn.start();
   }

   /**
    * Installs an ExListener on queue/A and sends N TextMessages to queue/B,
    * each carrying queue/A in its JMSReplyTo header.
    *
    * @param textBase prefix of every message; "#" and the message index are appended
    */
   public void sendRecvAsync(String textBase)
      throws JMSException, NamingException, InterruptedException
   {
      System.out.println("Begin sendRecvAsync");
      // Set up the PTP connection, session
      setupPTP();
      // Set the async listener for queA
      QueueReceiver recv = session.createReceiver(queA);
      recv.setMessageListener(new ExListener());
      // Send N text msgs to queB. The bound must be N, not a literal, because
      // main() waits on a countdown of exactly N replies.
      QueueSender send = session.createSender(queB);
      for(int m = 0; m < N; m ++)
      {
         TextMessage tm = session.createTextMessage(textBase+"#"+m);
         tm.setJMSReplyTo(queA);
         send.send(tm);
         System.out.println("sendRecvAsync, sent text=" + tm.getText());
      }
      // Release the sender once all messages are out, as the other listings do
      send.close();
      System.out.println("End sendRecvAsync");
   }

   /** Stops delivery, then closes the session and the connection. */
   public void stop()
      throws JMSException
   {
      conn.stop();
      session.close();
      conn.close();
   }

   /** Sends N messages, waits until N replies have been received, then shuts down. */
   public static void main(String args[])
      throws Exception
   {
      System.out.println("Begin SendRecvClient,now=" +
                         System.currentTimeMillis());
      SendRecvClient client = new SendRecvClient();
      client.sendRecvAsync("A text msg");
      done.acquire();
      client.stop();
      // Printed before System.exit(): exit does not return, so a statement
      // placed after it would never run.
      System.out.println("End SendRecvClient");
      System.exit(0);
   }
}
You can run the client as follows:
[examples]$ ant -Dchap=chap6 -Dex=2 run-example ... run-example2: [copy] Copying 1 file to /tmp/jboss-4.0.1/server/default/deploy [echo] Waiting 5 seconds for deploy... [java] Begin SendRecvClient, now=1102900541558 [java] Begin sendRecvAsync [java] sendRecvAsync, sent text=A text msg#0 [java] sendRecvAsync, sent text=A text msg#1 [java] sendRecvAsync, sent text=A text msg#2 [java] sendRecvAsync, sent text=A text msg#3 [java] sendRecvAsync, sent text=A text msg#4 [java] sendRecvAsync, sent text=A text msg#5 [java] sendRecvAsync, sent text=A text msg#6 [java] sendRecvAsync, sent text=A text msg#7 [java] sendRecvAsync, sent text=A text msg#8 [java] sendRecvAsync, sent text=A text msg#9 [java] End sendRecvAsync [java] onMessage, recv text=A text msg#0processed by: 12855623 [java] onMessage, recv text=A text msg#5processed by: 9399816 [java] onMessage, recv text=A text msg#9processed by: 6598158 [java] onMessage, recv text=A text msg#3processed by: 8153998 [java] onMessage, recv text=A text msg#4processed by: 10118602 [java] onMessage, recv text=A text msg#2processed by: 1792333 [java] onMessage, recv text=A text msg#7processed by: 14251014 [java] onMessage, recv text=A text msg#1processed by: 10775981 [java] onMessage, recv text=A text msg#8processed by: 6056676 [java] onMessage, recv text=A text msg#6processed by: 15679078 The corresponding JBoss server console output looks like this:
Items of note in this example include the following: