The Retail Stock Brokerage Application

Let us now look at the different features available in Pub/Sub programming by implementing a real-world JMS application.

Before we start, please be aware that the intention here is to present as many JMS features as possible. The application demonstrates the array of options available to you when developing JMS applications; it is not designed to be a polished application ready for immediate deployment. A lot of error checking has been removed to keep the discussion pertinent to JMS, so that we don't digress from the topic at hand. Similarly, there is really no need to use multiple message types in this application, but they have been included to demonstrate their usage.

Specification

The eCommWare Corporation (from Chapter 4) also provides retail stock brokerage services. Being a typical stock brokerage, it has to handle vast quantities of financial data and deal with tens to thousands of instruments (an instrument is anything of value used as a medium of exchange to buy or sell). This example illustrates event information traveling through the messaging server's event management system at a retail stock brokerage firm. The scenario assumes that the client has already connected to the net and has downloaded the sales order. The main tasks for this application are:

  • Event generation

    A consumer discovers a hot technology and places an order with the account representative to buy a certain number of shares of stock.

  • Information gathering

    The account representative completes a form in the sales order application.

  • Evaluating the event's context

    The sales order application publishes a request-for-reply message on the Consumer:Check topic. This message is forwarded to all the clients that have active subscriptions to this topic. In this case, the active subscribers include the accounts department that checks the consumer's accounts and credit line to verify that the required funds are available to support the stock purchase. The accounts department has to reply to the request with an approval. Likewise, the compliance department has to approve the trade, and the risk management department also has to approve the trade. At this point, the business process can move to the next stage.

  • Notifying the organization

    The sales order application handles the reply (the credit verification and trade approval) by sending a message to the Equity:Trade topic.

  • Executing the trade

    The trading application on the exchange floor is the subscriber to the Equity:Trade topic. It executes the buy and publishes a message about the purchase to the Trade:Price topic.

  • Propagating information

    The organization propagates information on the event and takes action for closure. Several applications now receive information on the Trade:Price topic. The client management application adjusts the customer's records in the database to reflect the purchase. The accounting application issues an invoice for the stock and logs a receivable. A payroll application adjusts the account representative's commission.
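The flow of events across the three topics can be sketched without a JMS provider at all. The following is a minimal in-memory illustration, not part of the application: the BrokerageFlowSketch class and its subscribe(), publish(), and runScenario() methods are hypothetical stand-ins for a broker, and only the topic names come from the scenario above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a Pub/Sub broker; this is not a JMS API.
class BrokerageFlowSketch {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a callback that receives every message published on the topic.
    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, key -> new ArrayList<>()).add(handler);
    }

    // Deliver the message synchronously to each current subscriber of the topic.
    void publish(String topic, String message) {
        List<Consumer<String>> handlers =
            subscribers.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> handler : new ArrayList<>(handlers)) {
            handler.accept(message);
        }
    }

    // Wire up the three topics and return the events in the order they occurred.
    static List<String> runScenario(String order) {
        BrokerageFlowSketch broker = new BrokerageFlowSketch();
        List<String> log = new ArrayList<>();

        broker.subscribe("consumer:check", msg -> log.add("accounts approved: " + msg));
        broker.subscribe("equity:trade", msg -> {
            log.add("trade executed: " + msg);
            broker.publish("trade:price", msg); // the trading floor announces the price
        });
        broker.subscribe("trade:price", msg -> log.add("client records updated: " + msg));
        broker.subscribe("trade:price", msg -> log.add("invoice issued: " + msg));
        broker.subscribe("trade:price", msg -> log.add("commission adjusted: " + msg));

        // The sales order application first asks for approval, then notifies the floor.
        broker.publish("consumer:check", order);
        broker.publish("equity:trade", order);
        return log;
    }

    public static void main(String[] args) {
        for (String event : runScenario("BUY 1000 ECWR")) {
            System.out.println(event);
        }
    }
}
```

Note that a single message on trade:price reaches all three downstream applications, which is the property of Pub/Sub messaging that the scenario relies on.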

The figure below illustrates this stock brokerage system diagrammatically:


The Constants Class

All the modules in the system use the Constants class to retrieve static constant values:

     package pubsub.retailbrokerage;

     public class Constants {
         static final String PROVIDER_URL =
             "http://localhost:2001";      // For FioranoMQ version 4.6
             // "http://localhost:1856";   // For FioranoMQ version 5.0
         static final String USER_NAME = "athul";
         static final String PASSWORD = "raj";

         // The initialContextFactory class of the JMS Provider
         static final String FACTORY =
             "fiorano.jms.rtl.FioranoInitialContextFactory";   // FMQ 4.6
             // "fiorano.jms.runtime.naming.FioranoInitialContextFactory"; // FMQ 5.0

         static final String CONNECTION_FACTORY = "retailbrokerage:tcf";
         static final String PRICE_TOPIC = "trade:price";
         static final String CHECK_TOPIC = "consumer:check";
         static final String TRADE_TOPIC = "equity:trade";

         public final static String STOCK_BUY = "BUY";
         public final static String STOCK_SELL = "SELL";
         public final static int APPROVED = 0;
         public final static int REJECTED = 1;

         static final int HIGHEST_PRIORITY_MESSAGE = 9;
     }

The Approval Requestor

The first step in the solution is for an approval requestor to publish a request-for-reply message on the Consumer:Check topic requesting the approval department to approve the trade.

The approval requestor creates a topic requestor and publishes an object message on it. This object message contains in its body a serializable object of class Instruction. The Instruction class contains all the values required for the approval department to approve or reject the request. It has appropriate get/set methods to access its data members.

The approval requestor also sets some properties of this object message. It creates a temporary topic and passes the temporary topic reference in the JMSReplyTo field of the object message. This is to ensure that when the approval department replies to the message, the reply will reach this temporary topic, which can then be deleted as soon as the approval requestor receives the approval message back from the approval department.

Similarly, the JMSCorrelationID header is also set for the object message. This field identifies the request in question and acts much like a subject field. The approval requestor sets the JMSCorrelationID field to the customer's name. When replying to the request-for-reply message, the approval department copies this value into the reply, so that it reads as though it were a "regarding" subject field.

The reply message is a stream message object. The stream message has a body that contains an integer: either Constants.APPROVED, in which case the trade is approved, or Constants.REJECTED, in which case permission to trade was denied.

The figure below shows the scenario that has just been described, where the approval requestor sends an object message to the Consumer:Check topic requesting permission to trade and gets a response back from the temporary topic with a stream message on whether permission to trade was approved or denied:


As soon as the approval requestor receives the approval message, the temporary topic is deleted and the integer value containing the permission is passed off to the caller.
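The request/reply pattern just described (a reply destination carried in the request, a correlation ID echoed in the reply, and a requestor that blocks until the answer arrives) can be sketched in plain Java. This is only an illustration under stated assumptions: the RequestReplySketch class and its members are hypothetical, and in-memory blocking queues stand in for the Consumer:Check topic and the temporary topic.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: in-memory queues stand in for the JMS topics.
class RequestReplySketch {
    static final int APPROVED = 0;
    static final int REJECTED = 1;

    // The request names the channel on which the requestor expects its answer.
    static final class Request {
        final String correlationId;
        final BlockingQueue<Reply> replyTo;

        Request(String correlationId, BlockingQueue<Reply> replyTo) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    static final class Reply {
        final String correlationId;
        final int decision;

        Reply(String correlationId, int decision) {
            this.correlationId = correlationId;
            this.decision = decision;
        }
    }

    // Replier side: copy the correlation ID into the reply and answer on replyTo.
    static void serveOne(BlockingQueue<Request> checkChannel) throws InterruptedException {
        Request request = checkChannel.take();
        request.replyTo.put(new Reply(request.correlationId, APPROVED));
    }

    // Requestor side: create a private reply channel, send, and block for the answer.
    static int requestApproval(BlockingQueue<Request> checkChannel, String customer)
            throws InterruptedException {
        BlockingQueue<Reply> temporaryReplyChannel = new ArrayBlockingQueue<>(1);
        checkChannel.put(new Request(customer, temporaryReplyChannel));
        Reply reply = temporaryReplyChannel.poll(5, TimeUnit.SECONDS);
        // Treat a missing reply, or a reply about someone else, as a rejection.
        boolean matches = reply != null && customer.equals(reply.correlationId);
        return matches ? reply.decision : REJECTED;
    }

    // Runs one requestor and one replier thread and returns the decision.
    static int demo(String customer) {
        BlockingQueue<Request> checkChannel = new ArrayBlockingQueue<>(1);
        Thread approvalDepartment = new Thread(() -> {
            try {
                serveOne(checkChannel);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        approvalDepartment.start();
        try {
            int decision = requestApproval(checkChannel, customer);
            approvalDepartment.join();
            return decision;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return REJECTED;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("Athul") == APPROVED ? "Granted" : "Denied");
    }
}
```

Because the reply channel is created per request and known only to the requestor, no other party can receive the answer, which is the same reason the application uses a temporary topic.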

The source code for the approval requestor is shown below:

     package pubsub.retailbrokerage;

     import javax.jms.*;
     import javax.naming.*;
     import common.*;

     public class ApprovalRequestor {
         private TopicConnectionFactory connectionFactory;
         private TopicConnection connection;
         private Topic topic;
         private TopicSession session;
         private TopicRequestor requestor;
         private String hostName;
         private String userID;
         private String password;

         private static final String CONTEXT_FACTORY = Constants.FACTORY;
         private static final String CONNECTION_FACTORY =
             Constants.CONNECTION_FACTORY;
         private static final String PUBLISHER_TOPIC = Constants.CHECK_TOPIC;

The constructor of the ApprovalRequestor class takes three parameters: the hostname (or IP:Port) of the remote JMS server to connect to, and the username and password of a valid user for that server:

         public ApprovalRequestor(String host, String user,
                                  String passwd) throws NamingException {
             hostName = host;
             userID = user;
             password = passwd;
             createConnections();
         }

         public void createConnections() throws NamingException {
             try {

Recall that we saw the JNDIService class in earlier chapters. To refresh your memory, it is a class that retrieves the topic connection factory and the topic. The host name is used to retrieve the Connection Factory object and the destination or topic reference. This way, if the messaging server is running on a remote host, it is possible to specify the host name so that a connection is established with the remote server machine:

                 JNDIService.init(CONTEXT_FACTORY, hostName);
                 connectionFactory =
                     (TopicConnectionFactory) JNDIService.lookup(CONNECTION_FACTORY);
                 topic = (Topic) JNDIService.lookup(PUBLISHER_TOPIC);

The username and password are used to authenticate and authorize the user so that only valid users can create a connection to the JMS messaging server and unauthorized access is denied. The code below shows a topic connection being created with the user name and password:

                 connection = connectionFactory.createTopicConnection(userID,
                                                                      password);
                 connection.setExceptionListener(new ExceptionHandler());
                 connection.start();
                 session = connection.createTopicSession(false,
                                                         Session.AUTO_ACKNOWLEDGE);

If no credentials are supplied while creating the connection, the JMS specification says that the current thread's credentials are used. However, if the password supplied does not match, the connection is rejected and this causes the provider runtime present on the client to throw a javax.jms.JMSSecurityException. Similarly, if the username sent in the login packet is not found in the repository, the messaging server will reject the connection, and cause the client runtime to throw a javax.jms.JMSSecurityException:

             } catch (JMSException exception) {
                 exception.printStackTrace();
             }
         }

         public void closeConnections() {
             try {
                 if (requestor != null) {
                     requestor.close();
                 }
                 if (session != null) {
                     session.close();
                 }
                 if (connection != null) {
                     connection.close();
                 }
             } catch (Exception exception) {
                 exception.printStackTrace();
             }
         }

The method that sends a request-for-reply message stays blocked until it receives a reply, and returns the result:

         public int getApproval(Instruction instruction) {
             int approval = Constants.REJECTED;
             try {
                 // create a requestor object if one does not exist
                 if (requestor == null) {
                     requestor = new TopicRequestor(session, topic);
                 }
                 // create Object Message
                 ObjectMessage objectMessage = session.createObjectMessage();

A temporary topic is a temporary destination, created for the duration of a topic connection. A Session object is used to create a temporary topic:

               TemporaryTopic temporaryTopic = session.createTemporaryTopic(); 

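Even though a Session object is used to create a temporary topic, the lifetime of the temporary topic is that of the connection on which it was created.

The ApprovalRequestor class uses a temporary topic for receiving replies to its request. Therefore, it sets the temporary topic reference in the object message's JMSReplyTo field. This directs the replier to reply to the request on that destination. Be aware that the TopicRequestor helper class also creates a temporary topic of its own and sets it as the JMSReplyTo value inside request(), so the explicit call below mainly serves to show how the field is set by hand: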

                 objectMessage.setJMSReplyTo(temporaryTopic);
                 String name = instruction.getCustomerName();
                 objectMessage.setJMSCorrelationID(name);
                 objectMessage.setObject((java.io.Serializable) instruction);

                 // receive a Stream Message
                 StreamMessage streamMessage =
                     (StreamMessage) requestor.request(objectMessage);

                 // get the JMSCorrelationID property
                 String correlationID = streamMessage.getJMSCorrelationID();

                 // read the message body
                 approval = streamMessage.readInt();
                 if ((approval == Constants.APPROVED)
                         && correlationID.equalsIgnoreCase(name)) {
                     System.out.println("Received : Permission to Trade Equity for "
                                        + name + " Granted.");
                 } else {
                     System.out.println("Received : Permission to Trade Equity for "
                                        + name + " Denied.");
                 }

A temporary topic is unusual in that temporary destinations are the only type of destination that can be created at the client side without first being created by an administrative tool. Another distinctive feature is that only the connection that created a temporary topic can create a message consumer for it, which naturally means that only sessions of that same connection can consume messages from it. Each temporary topic is unique and cannot be copied.

It is very important to clean up resources as soon as they are no longer needed. This improves both scalability and efficiency of the application. As these temporary destinations consume resources outside the JVM, it is better to delete them as soon as they are no longer needed. However, the publisher, sessions, and connection are automatically closed and the garbage collector releases all resources when the program terminates:

                 temporaryTopic.delete();
             } catch (Exception exception) {
                 exception.printStackTrace();
             }
             return approval;
         }

         public static void main(String[] args) {
             int approval = Constants.REJECTED;
             if (args.length < 3) {
                 System.out.println("\n\tUsage: java "
                                    + "pubsub.retailbrokerage.ApprovalRequestor "
                                    + "hostName userID password");
                 System.out.println("\tExiting Program ...");
                 System.exit(-1);
             }
             String hostName = args[0];
             String userID = args[1];
             String password = args[2];

             // create test data and populate the Instruction object
             Instruction instruction =
                 new Instruction("Athul", "ECWR", Constants.STOCK_BUY,
                                 100.90, 1000);
             try {
                 // create ApprovalRequestor
                 ApprovalRequestor requestor =
                     new ApprovalRequestor(hostName, userID, password);
                 // Request for approval
                 approval = requestor.getApproval(instruction);
             } catch (NamingException exception) {
                 exception.printStackTrace();
             } catch (Exception exception) {
                 exception.printStackTrace();
             }
         }

If the JMS provider detects a serious problem with a connection, it invokes the onException() method of the registered ExceptionListener, passing it a JMSException describing the problem:

         class ExceptionHandler implements ExceptionListener {
             public void onException(JMSException exception) {
                 exception.printStackTrace();
             }
         }
     }

The Instruction Class

When publishing messages to the Consumer:Check topic using an object message, we need to set the body of the object message to a serializable object. This object has to encapsulate all the information needed to complete a stock trade: the customer name, the stock symbol, the instruction to buy or sell stock, the unit price of the stock, and the number of shares to trade (buy or sell).

The Instruction class is serializable and contains all the relevant information to complete a stock trade. It has get/set methods to retrieve each of its data members. The source code for the Instruction class is listed below.

In the following piece of code, I have used floating point for currency (price). I realize that floating-point arithmetic leaves too many opportunities for rounding errors, inaccuracy, and fraud in such cases. Masked decimal is far more accurate (in Java, java.math.BigDecimal is the usual choice). However, since this is a demo, I'm sure you won't mind it:

     package pubsub.retailbrokerage;

     public class Instruction implements java.io.Serializable {
         private String customerName;
         private String stockSymbol;
         private String instruction;
         private double unitPrice;
         private int numberOfStocks;

         public Instruction() {
             customerName = "";
             stockSymbol = "";
             instruction = Constants.STOCK_BUY;
             unitPrice = 0;
             numberOfStocks = 0;
         }

         public Instruction(String customer, String symbol, String buyOrSell,
                            double price, int number) {
             customerName = customer;
             stockSymbol = symbol;
             instruction = buyOrSell;
             unitPrice = price;
             numberOfStocks = number;
         }

         public String getCustomerName() {
             return customerName;
         }

         public String getStockSymbol() {
             return stockSymbol;
         }

         public String getInstructions() {
             return instruction;
         }

         public double getUnitPrice() {
             return unitPrice;
         }

         public int getStockNumber() {
             return numberOfStocks;
         }

         public void setCustomerName(String name) {
             customerName = name;
         }

         public void setStockSymbol(String symbol) {
             stockSymbol = symbol;
         }

         public void setInstructions(String buyOrSell) {
             instruction = buyOrSell;
         }

         public void setUnitPrice(double price) {
             unitPrice = price;
         }

         public void setStockNumber(int number) {
             numberOfStocks = number;
         }
     }

The Instruction class is used wherever complete information about the stock trade is required in the program.
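The caveat about holding prices in floating point can be made concrete with a short comparison. The sketch below is not part of the application: the CurrencySketch class and its method names are hypothetical, and it simply contrasts double arithmetic with java.math.BigDecimal, which is one way a production version of Instruction might hold the unit price.

```java
import java.math.BigDecimal;

// Hypothetical helper contrasting binary floating point with decimal arithmetic.
class CurrencySketch {

    // Adds two amounts using binary floating point, which cannot represent
    // most decimal fractions exactly.
    static double sumAsDouble(double a, double b) {
        return a + b;
    }

    // Adds two amounts given as decimal strings, with no rounding error.
    static BigDecimal sumAsDecimal(String a, String b) {
        return new BigDecimal(a).add(new BigDecimal(b));
    }

    // Computes the cost of an order exactly from a decimal unit price.
    static BigDecimal orderTotal(String unitPrice, int shares) {
        return new BigDecimal(unitPrice).multiply(BigDecimal.valueOf(shares));
    }

    public static void main(String[] args) {
        System.out.println(sumAsDouble(0.1, 0.2) == 0.3);              // false
        System.out.println(sumAsDecimal("0.1", "0.2")
                               .compareTo(new BigDecimal("0.3")) == 0); // true
        System.out.println(orderTotal("100.90", 1000));
    }
}
```

The amounts are passed as strings on purpose: constructing a BigDecimal from a double would carry the binary rounding error into the decimal value.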

The Approval Department

The next step in the solution is for the approval department that subscribes to the Consumer:Check topic to evaluate the customer's credit, either approve or reject the trade, and inform the requestor about the decision.

The approval department subscribes to the Consumer:Check topic and installs an asynchronous message listener whose callback method is invoked by the provider whenever a new message arrives on the topic.

When an object message is received from the Consumer:Check topic, the class retrieves the Instruction object from the message body to evaluate the request. It then creates a stream message and writes into its body an integer indicating whether the trade is approved or not. If the customer name is "Suresh" and he wants to buy stock, a rejection is recorded in the body, probably because he has bad credit, or because he has a criminal record. For all other requests, an approval is granted.

During this time, the JMSReplyTo field from the original object message is read and the temporary destination to send the message to is noted. Similarly, the JMSCorrelationID property is also noted to set the regarding field.

The stream message's JMSCorrelationID is set with the customer name and the body is populated with the approval information and the reply is sent off to the temporary topic.
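Stripped of the messaging code, the approval rule itself is a small pure function. The following sketch isolates it so that it can be read and tested on its own; the ApprovalRuleSketch class and its decide() method are hypothetical names, and the constant values mirror those in the Constants class.

```java
// Hypothetical helper that isolates the approval rule from the JMS plumbing.
class ApprovalRuleSketch {
    static final int APPROVED = 0;  // mirrors Constants.APPROVED
    static final int REJECTED = 1;  // mirrors Constants.REJECTED

    // Rejects a BUY request from the customer "Suresh" (case-insensitive)
    // and approves every other request.
    static int decide(String customerName, String buyOrSell) {
        boolean blocked = "Suresh".equalsIgnoreCase(customerName)
                && "BUY".equalsIgnoreCase(buyOrSell);
        return blocked ? REJECTED : APPROVED;
    }

    public static void main(String[] args) {
        System.out.println(decide("Suresh", "BUY"));   // 1
        System.out.println(decide("Suresh", "SELL"));  // 0
        System.out.println(decide("Athul", "BUY"));    // 0
    }
}
```

Putting the string literal first in each comparison also means a missing customer name or instruction results in an approval rather than a NullPointerException; whether that default is desirable is a design decision for a real system.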

The figure below shows the scenario that I just described, where the approval department receives an object message from the Consumer:Check topic requesting permission to trade. The JMSReplyTo field specifies the temporary topic where the ApprovalDepartment class has to publish the reply with an approval message (using a stream message):


The ApprovalDepartment class installs itself as a message handler that receives messages asynchronously for the bound receiver. The ApprovalDepartment class implements the MessageListener JMS interface. This is the first requirement for any class aspiring to become an asynchronous message consumer.

By implementing the MessageListener interface, the class is forced to provide an implementation for the onMessage() method. This method is the registered callback method that the JMS provider invokes whenever a new message is available for consumption at the topic:

     package pubsub.retailbrokerage;

     import javax.jms.*;
     import javax.naming.*;
     import java.io.*;
     import java.net.*;
     import java.util.*;
     import common.*;

     public class ApprovalDepartment implements MessageListener {
         private TopicConnectionFactory factory;
         private TopicConnection connection;
         private Topic topic;
         private TopicSession session;
         private TopicPublisher replier;
         private String hostName;
         private String userID;
         private String password;
         private ApprovalHandler caller;

         private static final String CONTEXT_FACTORY = Constants.FACTORY;
         private static final String CONNECTION_FACTORY =
             Constants.CONNECTION_FACTORY;
         private static final String SUBSCRIBER_TOPIC = Constants.CHECK_TOPIC;

         public ApprovalDepartment(String host, String ID, String passwd) {
             hostName = host;
             userID = ID;
             password = passwd;
             initialize();
         }

         public ApprovalDepartment(ApprovalHandler callbackImplementor) {
             caller = callbackImplementor;
             hostName = Constants.PROVIDER_URL;
             userID = Constants.USER_NAME;
             password = Constants.PASSWORD;
             initialize();
         }

         // Retrieve the connection factory and the topic references
         private void initialize() {
             try {
                 JNDIService.init(CONTEXT_FACTORY, hostName);
                 factory =
                     (TopicConnectionFactory) JNDIService.lookup(CONNECTION_FACTORY);
                 topic = (Topic) JNDIService.lookup(SUBSCRIBER_TOPIC);
             } catch (Exception exception) {
                 exception.printStackTrace();
             }
         }

Note that the message listener is registered with the provider through the TopicSubscriber object, by a call to its setMessageListener() method, as soon as the subscriber is created:

         public void createConnections() {
             try {
                 // Create a Topic connection with the credentials that were
                 // supplied to the constructor
                 connection = factory.createTopicConnection(userID, password);
                 connection.setClientID("ApprovalDepartment");
                 // Create a Topic session on this connection
                 session = connection.createTopicSession(false,
                                                         Session.AUTO_ACKNOWLEDGE);
                 // Create a Topic Subscriber
                 TopicSubscriber topicSubscriber = session.createSubscriber(topic);
                 // Install an asynchronous listener/callback on the Subscriber
                 // object just created
                 topicSubscriber.setMessageListener(this);
                 // Start the connection so that message delivery begins
                 connection.start();
                 System.out.println("Approval Department, up and ready for eCommerce");
             } catch (Exception exception) {
                 exception.printStackTrace();
             }
         }

         // Close all open resources
         public void closeConnections() {
             try {
                 if (replier != null) {
                     replier.close();
                 }
                 if (session != null) {
                     session.close();
                 }
                 if (connection != null) {
                     connection.close();
                 }
             } catch (JMSException exception) {
                 exception.printStackTrace();
             }
         }

The displayUsage() method gives the user information on how to run the ApprovalDepartment class if they did not supply a hostname, a username or a password:

         private static void displayUsage() {
             System.out.println("\n\tUsage : java "
                                + "pubsub.retailbrokerage.ApprovalDepartment "
                                + "hostName UserName Password");
             System.out.println("\t Terminating Program ...");
             System.exit(-1);
         }

         public static void main(String[] args) {
             if (args.length < 3) {
                 // displayUsage() prints the usage message and exits
                 ApprovalDepartment.displayUsage();
             }
             String hostName = args[0];
             String userID = args[1];
             String password = args[2];
             ApprovalDepartment approval = new ApprovalDepartment(hostName,
                                                                  userID, password);
             approval.createConnections();
         }

The onMessage() method contains the code for retrieving the properties and the object body:

         public void onMessage(Message objectMessage) {
             try {
                 ObjectMessage request = (ObjectMessage) objectMessage;
                 String correlationID = request.getJMSCorrelationID();
                 System.out.print("Re: Customer " + correlationID);
                 Instruction instruction = (Instruction) request.getObject();

When an application sends a message and expects to receive a message in return, request/reply messaging can be used. This is the standard synchronous object-messaging format. JMS does not explicitly support request-reply messaging, though it allows it in the context of the other two messaging models.

At the approval department end (the subscriber end), a non-transacted session is created on the connection so that messages can be received from the Consumer:Check topic. A subscriber is then created and a message handler is registered as its callback. When a message is received, the handler calls the getJMSReplyTo() method to find out where to send the reply. The temporary topic reference it obtains from the JMSReplyTo header is the destination on which the reply is later published:

                 Topic temporaryTopic =
                     (Topic) objectMessage.getJMSReplyTo();
                 StreamMessage reply = session.createStreamMessage();
                 reply.setJMSCorrelationID(correlationID);

If the customer name is "Suresh" and he wants to buy stock, reject the request:

                 if (instruction.getCustomerName().equalsIgnoreCase("Suresh")
                         && instruction.getInstructions().equalsIgnoreCase("BUY")) {
                     reply.writeInt(Constants.REJECTED);
                     if (caller != null) {

Invoke the callback function to inform the approval requestor that the request has been denied:

                         caller.processInformation(Constants.REJECTED,
                                                   instruction);
                     }
                     System.out.println(". : Insufficient Funds. "
                                        + "Sending a Rejection to the Requestor ...");
                 } else {
                     reply.writeInt(Constants.APPROVED);
                     if (caller != null) {

Invoke the callback function to inform the approval requestor that the request has been approved:

                         caller.processInformation(Constants.APPROVED,
                                                   instruction);
                     }
                     System.out.println(". : Sending an Approval to the Requestor ...");
                 }

If a publisher does not yet exist, create one and send the reply to the temporary topic:

                 if (replier == null) {
                     // Create an unidentified publisher (null topic): a publisher
                     // bound to a specific topic may not be used with
                     // publish(Topic, Message)
                     replier = session.createPublisher(null);
                 }
                 replier.publish(temporaryTopic, reply);
             } catch (Throwable exception) {
                 exception.printStackTrace();
             }
         }
     }

The ApprovalHandler Interface

Classes that need to implement a callback for the ApprovalDepartment class use the ApprovalHandler interface. The ApprovalHandler interface has a method called processInformation() that needs to be implemented by whichever class has to be called back when an approval or rejection is sent:

     package pubsub.retailbrokerage;

     public interface ApprovalHandler {
         public void processInformation(int approvedOrDenied,
                                        Instruction information);
     }

We will see more about this interface when we discuss an applet implementation of the retail stock brokerage application.

The Trader

Having received approval to trade from the approval department, the Trader class is used to publish stock trade instructions on what stocks to buy or sell for which customer on the Equity:Trade topic.

The Trader class creates a topic publisher and publishes a persistent bytes message transactionally. There is no particular reason for creating a bytes message here. The Trader class could create a message of any type as long as it is assured that the receiver is capable of handling and interpreting it. Once it creates the bytes message, it fills the message's property values. It sets the JMSCorrelationID property to the name of the customer. Similarly it creates user-defined message properties like customerName, stockSymbol, instructions, unitPrice, and stockNumber, before mapping them to their appropriate values.

You might now be asking why the trade instructions are carried in message properties here, when the approval request carried them in the message body. Nothing prevents you from always sending such values as properties; both approaches are shown to illustrate the coding options at your disposal. In this case, however, there is also a very important reason for using properties rather than the body.

The overwhelming reason that the instructions are sent as properties is that the subscriber to the Equity:Trade topic filters messages that it subscribes to and is only interested in looking at certain types of messages.

As we saw in Chapter 4, message selectors do not examine the message body but they examine the message properties and headers to determine if they need to send a message to the subscriber or not. Therefore, the Trader class populates the bytes message properties with relevant information that can be used by the message selectors to filter the message.
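The idea that a selector looks only at properties, never at the body, can be illustrated with a small in-memory filter. This is a sketch under stated assumptions rather than JMS code: the SelectorSketch class, its Message type, and the filter() method are hypothetical, and a Java predicate stands in for the selector expression. The property names are the ones the Trader class sets.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of property-based filtering; not the JMS selector engine.
class SelectorSketch {

    // A message with user-defined properties and an opaque body.
    static final class Message {
        final Map<String, Object> properties = new HashMap<>();
        final byte[] body;

        Message(byte[] body) {
            this.body = body;
        }
    }

    // Returns the messages whose properties satisfy the selector.
    // The body is never inspected.
    static List<Message> filter(List<Message> published,
                                Predicate<Map<String, Object>> selector) {
        List<Message> delivered = new ArrayList<>();
        for (Message message : published) {
            if (selector.test(message.properties)) {
                delivered.add(message);
            }
        }
        return delivered;
    }

    // Builds a trade message with an empty body, as the Trader class does.
    static Message trade(String symbol, String instructions) {
        Message message = new Message(new byte[0]);
        message.properties.put("stockSymbol", symbol);
        message.properties.put("instructions", instructions);
        return message;
    }

    // Publishes three trades and counts how many pass the selector.
    static int demo() {
        List<Message> published = new ArrayList<>();
        published.add(trade("ECWR", "BUY"));
        published.add(trade("ECWR", "SELL"));
        published.add(trade("ABCD", "BUY"));

        // Roughly what a JMS selector string such as
        // "stockSymbol = 'ECWR' AND instructions = 'BUY'" expresses.
        Predicate<Map<String, Object>> selector = properties ->
            "ECWR".equals(properties.get("stockSymbol"))
                && "BUY".equals(properties.get("instructions"));

        return filter(published, selector).size();
    }

    public static void main(String[] args) {
        System.out.println(demo() + " message(s) delivered");
    }
}
```

Had the instructions been placed in the body, the filter would have had nothing to test, which is why the Trader class copies them into properties.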

Having populated the message property fields with the relevant information, the Trader class publishes the message on the Equity:Trade topic. It then commits the transaction by invoking commit() on the session. On a transacted session, published messages are not delivered until the transaction commits, so if a failure occurs part-way through, the session can be rolled back and the system is left in a stable state with none of the messages sent.

The bytes message that is sent out by the Trader class is also PERSISTENT. With this delivery mode, the provider logs the message to stable storage as part of the send, so the message is not lost if the provider fails before it has been delivered.
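The effect of publishing inside a transaction can be sketched with a buffer that is released only on commit. The TransactedSendSketch class below is a hypothetical in-memory model, not a JMS session; it only illustrates why a rollback leaves no partial output behind.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a transacted producer; not a JMS session.
class TransactedSendSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    // Publishing only records the message in the current transaction.
    void publish(String message) {
        pending.add(message);
    }

    // Commit releases every pending message and starts a new transaction.
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    // Rollback discards every pending message and starts a new transaction.
    void rollback() {
        pending.clear();
    }

    // Returns a copy of the messages that consumers would have seen.
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        TransactedSendSketch session = new TransactedSendSketch();
        session.publish("BUY 1000 ECWR");
        session.rollback();                 // something went wrong: nothing is sent
        session.publish("BUY 500 ECWR");
        session.commit();                   // only this message becomes visible
        System.out.println(session.delivered());
    }
}
```

In the Trader class the same roles are played by the publish() call on the publisher and the commit() and rollback() calls on the transacted session.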

The figure below shows the Trader class sending a PERSISTENT bytes message transactionally to the Equity:Trade topic:


The source code for the Trader class is shown below:

     package pubsub.retailbrokerage;     import javax.jms.*;     import javax.naming.*;     import common.*;     public class Trader {         private TopicConnectionFactory factory;         private TopicConnection connection;         private Topic Topic;         private TopicSession session;         private TopicPublisher publisher;         private String hostName;         private String userID;         private String password;         private static final String CONTEXT_FACTORY = Constants.FACTORY;         private static final String CONNECTION_FACTORY =          Constants.CONNECTION_FACTORY;         private static final String PUBLISHER_TOPIC = Constants.TRADE_TOPIC;         public Trader(String host, String user,                 String passwd) throws NamingException {          hostName = host;          userID = user;          password = passwd;          createConnections();         }         private void createConnections() {          try {              JNDIService.init(CONTEXT_FACTORY, hostName);              // retrieve the connection factory object              factory =               (TopicConnectionFactory) JNDIService.lookup(CONNECTION_FACTORY);              topic = (Topic) JNDIService.lookup(PUBLISHER_TOPIC);              connection = factory.createTopicConnection(userID, password);              connection.setExceptionListener(new ExceptionHandler()); 

A session is made transacted through the first parameter passed to the createTopicSession() method: a value of true makes the session transacted, and all messages sent through it are then grouped into transactions.

The first message sent on a transacted session starts a new transaction. All subsequent messages sent on that session become part of this transaction until Session.commit() or Session.rollback() is invoked. After the call to commit or rollback, the next message sent on the session starts a new transaction, whose scope continues until the next call to commit or rollback:

                session = connection.createTopicSession(true,
                                                        Session.AUTO_ACKNOWLEDGE);
                connection.start();
                System.out.println("Trader, up and ready for eCommerce ...");
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
        public void performTrade(Instruction instruction) throws Exception {
            try {
                // Create a Publisher if one does not exist
                if (publisher == null) {
                    publisher = session.createPublisher(topic);
                }

The body of a bytes message contains a stream of uninterpreted bytes. Although nothing prevents us from adding properties to a bytes message, it is typically not done when there is a message body for fear that the inclusion of properties may affect the format of the content in the message body. However, since we are not passing anything in the body of the message, we can add anything we want to its property fields.

A bytes message is generally used for literally copying the body of one of the other four message types. Most applications use one of the other four message types for their normal operations and reserve the bytes message for transferring raw data, for example from a disk file to a different machine or location. This is a fairly common practice when messages are being reused.

The Trader class uses the bytes message to send information in its property fields that can be retrieved at the other end:

                System.out.println("Performing Trade for "
                                   + instruction.getCustomerName()
                                   + " transactionally. Now ...");
                // create a Bytes Message
                BytesMessage bytesMessage = session.createBytesMessage();
                bytesMessage.setJMSCorrelationID(instruction.getCustomerName());
                // set message properties
                bytesMessage.setStringProperty("customerName",
                                               instruction.getCustomerName());
                bytesMessage.setStringProperty("stockSymbol",
                                               instruction.getStockSymbol());
                bytesMessage.setStringProperty("instructions",
                                               instruction.getInstructions());
                bytesMessage.setDoubleProperty("unitPrice",
                                               instruction.getUnitPrice());
                bytesMessage.setIntProperty("stockNumber",
                                            instruction.getStockNumber());

In this example the message is said to be of the highest priority because the priority parameter is set to 9. This is a directive to the messaging system to expedite delivery of this message over other messages. If two or more messages are set to the highest priority, the provider's implementation decides which messages are delivered before the others:

                // publish the message
                publisher.publish(bytesMessage, DeliveryMode.PERSISTENT,
                                  Constants.HIGHEST_PRIORITY_MESSAGE, // value of 9
                                  Message.DEFAULT_TIME_TO_LIVE);
                // commit the transaction
                session.commit();
                System.out.println(instruction.getInstructions());
            } catch (Exception exception) {
                // undo everything published since the last commit
                session.rollback();
                exception.printStackTrace();
            }
        }
        public void closeConnections() {
            try {
                // send a message that I'm going away
                sendQuitMessage();
                if (publisher != null) {
                    publisher.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
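To get a feel for what a priority of 9 means for delivery order, here is a minimal sketch that does not use a JMS provider at all. The PriorityDeliverySketch class and its methods are hypothetical stand-ins written only for this illustration. The first-come, first-served tie-break among messages of equal priority is an assumption too; as noted above, a real provider is free to order such messages differently:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for a provider's delivery queue; not part of the JMS API.
class PriorityDeliverySketch {

    // A pending message: payload, priority (0 lowest .. 9 highest), arrival order.
    static final class Pending {
        final String payload;
        final int priority;
        final long sequence;

        Pending(String payload, int priority, long sequence) {
            this.payload = payload;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    private long nextSequence = 0;

    // Higher priority first; equal priorities fall back to arrival order.
    // The tie-break is an assumption made for this sketch only.
    private final PriorityQueue<Pending> queue = new PriorityQueue<>(
        (a, b) -> a.priority != b.priority
            ? Integer.compare(b.priority, a.priority)
            : Long.compare(a.sequence, b.sequence));

    void publish(String payload, int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("priority must be between 0 and 9");
        }
        queue.add(new Pending(payload, priority, nextSequence++));
    }

    // Drains the queue and returns the payloads in delivery order.
    List<String> deliverAll() {
        List<String> delivered = new ArrayList<>();
        while (!queue.isEmpty()) {
            delivered.add(queue.poll().payload);
        }
        return delivered;
    }

    public static void main(String[] args) {
        PriorityDeliverySketch provider = new PriorityDeliverySketch();
        provider.publish("status update", 4); // 4 is the JMS default priority
        provider.publish("trade ECWR", 9);
        provider.publish("trade XYZ", 9);
        // prints [trade ECWR, trade XYZ, status update]
        System.out.println(provider.deliverAll());
    }
}
```

Both trade messages overtake the status update even though it was published first, which is the effect the Trader class is asking for when it publishes with the highest priority.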

As soon as a commit() call is invoked, all the messages sent until that point are packaged into a transaction and are sent out to the server. However, if the transaction is rolled back, all produced messages until that point are destroyed, and all the consumed messages are automatically recovered. A commit or rollback call signifies the end of the current transaction and automatically starts off the next transaction.

In the following code sample, we have a reference to a transacted Session object called session and a TopicPublisher object called publisher. The message textMessage is not sent as soon as the publish() method is invoked on the publisher. Instead, it is buffered in internal storage in the provider's client run-time library until either a commit() or a rollback() operation is invoked. If the client dies before committing or rolling back the transaction and the cache is dumped, the transaction fails, and the operation is retried the next time that the client comes up.

As such, a commit() or rollback() invocation signifies the end of the current transaction, and all subsequent operations on the session automatically become part of the next transaction. In essence, a transacted session always has a current transaction within which its work is done:

        private void sendQuitMessage() {
            try {
                // create a Text Message
                TextMessage textMessage = session.createTextMessage();
                textMessage.setStringProperty("instructions", "QUIT");
                textMessage.setText("QUIT");
                // publish the message
                publisher.publish(textMessage, DeliveryMode.NON_PERSISTENT,
                                  Message.DEFAULT_PRIORITY,
                                  Message.DEFAULT_TIME_TO_LIVE);
                // commit the transaction
                session.commit();
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
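The buffering behavior of a transacted session can be modeled in a few lines of plain Java. The sketch below is only an illustration under simplifying assumptions: TransactedBufferSketch and its methods are hypothetical names, messages are plain strings, and the "server" is just a list. It is not how any particular provider implements transactions:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the send side of a transacted session; not the JMS API.
class TransactedBufferSketch {
    // Messages published in the current transaction, not yet visible to anyone.
    private final List<String> pending = new ArrayList<>();
    // Messages that have actually gone out to the (simulated) server.
    private final List<String> delivered = new ArrayList<>();

    // Publishing only buffers the message inside the current transaction.
    void publish(String message) {
        pending.add(message);
    }

    // Commit sends out everything buffered so far and begins a new transaction.
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    // Rollback discards everything buffered so far and begins a new transaction.
    void rollback() {
        pending.clear();
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TransactedBufferSketch session = new TransactedBufferSketch();
        session.publish("BUY ECWR");
        session.commit();                 // first transaction goes out
        session.publish("SELL ECWR");
        session.rollback();               // second transaction is destroyed
        session.publish("QUIT");
        session.commit();                 // third transaction goes out
        // prints [BUY ECWR, QUIT]
        System.out.println(session.delivered());
    }
}
```

Notice that there is no explicit "begin" call anywhere: each commit() or rollback() ends one transaction and the next publish() quietly becomes part of the following one.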

The method doAnother() asks the user if they want to perform another trade and conveys their response back:

        public boolean doAnother() {
            String another = "yes";
            boolean result = false;
            try {
                java.io.InputStreamReader in =
                    new java.io.InputStreamReader(System.in);
                java.io.BufferedReader stdin = new java.io.BufferedReader(in);
                System.out.print("\nWant to do another Trade? (\"yes\" or \"no\") : ");
                another = stdin.readLine().trim();
                if (another.toLowerCase().startsWith("y") == true) {
                    result = true;
                }
            } catch (Exception exception) {
                exception.printStackTrace();
            }
            return result;
        }
        // Get input from the user regarding the trade (s)he wants to perform
        public Instruction getTradeInput() {
            // Defaults that are used if reading from standard input fails
            String name = "Athul";
            String symbol = "ECWR";
            String direction = "BUY";
            double unitPrice = 100.90;
            String number = "1000";
            try {
                java.io.InputStreamReader in =
                    new java.io.InputStreamReader(System.in);
                java.io.BufferedReader stdin = new java.io.BufferedReader(in);
                System.out.print("\nEnter Customer Name : ");
                name = stdin.readLine().trim();
                System.out.print("Enter Stock Symbol : ");
                symbol = stdin.readLine().trim();
                System.out.print("Enter Instruction (\"buy\" or \"sell\") : ");
                direction = stdin.readLine().trim();
                System.out.print("Enter number of stock to trade : ");
                number = stdin.readLine().trim();
            } catch (Exception exception) {
                exception.printStackTrace();
            }
            // Anything that does not start with "s" is treated as a buy
            String command = Constants.STOCK_BUY;
            if (direction.toLowerCase().startsWith("s") == true) {
                command = Constants.STOCK_SELL;
            }
            int value = (new Integer(number)).intValue();
            Instruction instruction = new Instruction(name, symbol, command,
                                                      unitPrice, value);
            return instruction;
        }
        public static void main(String[] args) {
            if (args.length < 3) {
                displayUsage();
            }
            String hostName = args[0];
            String userID = args[1];
            String password = args[2];
            try {
                Trader trader = new Trader(hostName, userID, password);
                Instruction instruction = null;
                boolean doAnother = false;
                do {
                    instruction = trader.getTradeInput();
                    trader.performTrade(instruction);
                    doAnother = trader.doAnother();
                } while (doAnother == true);
                trader.closeConnections();
            } catch (NamingException exception) {
                exception.printStackTrace();
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

The displayUsage() method gives the user information on how to run the Trader class if they did not supply a hostname, a username, or a password:

        private static void displayUsage() {
            System.out.println("\n\tUsage : java "
                               + "Chapter05.RetailBrokerage.Trader "
                               + "hostName UserName Password");
            System.out.println("\t Terminating Program ...");
            System.exit(-1);
        }
        // The class that implements the callback method
        class ExceptionHandler implements ExceptionListener {
            public void onException(JMSException exception) {
                exception.printStackTrace();
            }
        }
    }

The Sales Order Application

The sales order application gets input from the user, gets the approval to trade from the approval department, and uses the trader to send out trade instructions on the Equity:Trade topic. In a real-world application, however, the sales order application would not directly post trade requests (for security reasons), but for simplicity, let's assume it does.

The steps that the sales order goes through before sending a message to the Equity:Trade topic with a directive to buy or sell stock are as follows:

  • The approval requestor requests the approval department for permission to trade on the Consumer:Check topic.

  • The approval department responds with an approval or a rejection.

  • If an approval is obtained, the trader publishes the trade instruction on the Equity:Trade topic.

The figure below shows the scenarios that we just discussed:


The sales order application gets input from the user, gets the approval to trade from the approval department through the Consumer:Check Topic, and uses the trader to send out trade instructions on the Equity:Trade topic.

The sales order application implements the ApprovalHandler interface. This interface contains a method called processInformation() that takes a couple of parameters – an integer that specifies whether the sale was approved or rejected, and an Instruction object containing the details of the trade. The sales order applet creates an implementation for the processInformation() method:


The sales order application creates the approval department and registers the approval handler callback with it.

When the messaging server invokes the approval department's onMessage() callback, it in turn invokes the processInformation() callback of the sales order application and informs it of whether a particular trade was approved or rejected.
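The callback arrangement described above can be sketched in plain Java as follows. Everything here is a simplified assumption made for the illustration: the instruction is reduced to a string, the APPROVED and REJECTED values are invented (the real ones live in the Constants class), the funds check is made up, and onRequest() merely plays the part of the onMessage() callback that the messaging server would invoke:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JMS-free sketch of the approval callback wiring.
class ApprovalCallbackSketch {
    // Assumed values for illustration only.
    static final int APPROVED = 1;
    static final int REJECTED = 0;

    // Simplified version of the callback interface: the real one takes an
    // Instruction object rather than a String.
    interface ApprovalHandler {
        void processInformation(int approve, String instruction);
    }

    // Stand-in for the approval department.
    static final class ApprovalDepartment {
        private final ApprovalHandler handler;
        private final double availableFunds;

        // The handler is registered when the department is created.
        ApprovalDepartment(ApprovalHandler handler, double availableFunds) {
            this.handler = handler;
            this.availableFunds = availableFunds;
        }

        // Plays the role of onMessage(): decide, then call the handler back.
        void onRequest(String instruction, double cost) {
            int verdict = cost <= availableFunds ? APPROVED : REJECTED;
            handler.processInformation(verdict, instruction);
        }
    }

    public static void main(String[] args) {
        List<String> screen = new ArrayList<>();
        // The sales order application supplies the callback implementation.
        ApprovalHandler salesOrder = (approve, instruction) ->
            screen.add((approve == APPROVED ? "APPROVED " : "REJECTED ") + instruction);

        ApprovalDepartment department = new ApprovalDepartment(salesOrder, 1000.0);
        department.onRequest("BUY 5 ECWR", 504.50);
        department.onRequest("BUY 50 ECWR", 5045.00);
        // prints [APPROVED BUY 5 ECWR, REJECTED BUY 50 ECWR]
        System.out.println(screen);
    }
}
```

The point of the interface is that the approval department needs to know nothing about the GUI; it only knows that whoever registered with it can be told the outcome.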

Now that we have the ApprovalDepartment and Trader classes ready, we need to write a GUI component that will allow the user to interact with our system. The GUI component that we're going to write will be generic enough that it could be used both as an applet and a standalone application. Since Applet extends Panel, we create our SalesOrderPanel class to extend Applet.

The source code for the sales order panel is listed below:

    package pubsub.retailbrokerage;
    import java.applet.*;
    import java.awt.*;
    import java.awt.event.*;
    public class SalesOrderPanel extends Applet implements ActionListener,
            ApprovalHandler {
        private TextField nameField;
        private TextField symbolField;
        private TextField numberField;
        private Button buyButton;
        private Button sellButton;
        private Label priceLabel;
        private Label nameLabel;
        private Label symbolLabel;
        private Label unitPriceLabel;
        private Label numberLabel;
        private Label instructionField;
        private Label approvalField;
        private ApprovalRequestor requester;
        private Trader trader;

The init() method lays out the GUI for our application and creates the ApprovalRequestor and Trader objects that the panel uses:

        public void init() {
            addComponents();
            initialize();
        }
        public void stop() {
            if (requester != null) {
                requester.closeConnections();
            }
            if (trader != null) {
                trader.closeConnections();
            }
        }
        private void addComponents() {
            resize(450, 400);
            GridLayout layout = new GridLayout(0, 2, 10, 10);
            layout.setRows(10);
            this.setLayout(layout);
            nameField = new TextField("Athul");
            symbolField = new TextField("ECWR", 4);
            numberField = new TextField("1000");
            priceLabel = new Label("100.90");
            buyButton = new Button("BUY");
            sellButton = new Button("SELL");
            nameLabel = new Label("Enter the Customer's Name :");
            symbolLabel = new Label("Enter the Stock Symbol :");
            unitPriceLabel = new Label("Unit Price of Scrip (in USD) :");
            numberLabel = new Label("Enter Number of Scrips to trade :");
            instructionField = new Label(".");
            approvalField = new Label(".");
            add(nameLabel);
            add(nameField);
            add(symbolLabel);
            add(symbolField);
            add(numberLabel);
            add(numberField);
            add(unitPriceLabel);
            add(priceLabel);
            add(buyButton);
            add(sellButton);
            add(instructionField);
            add(approvalField);
            buyButton.addActionListener(this);
            sellButton.addActionListener(this);
        }
        private void initialize() {
            try {
                requester = new ApprovalRequestor(Constants.PROVIDER_URL,
                                                  Constants.USER_NAME,
                                                  Constants.PASSWORD);
                trader = new Trader(Constants.PROVIDER_URL, Constants.USER_NAME,
                                    Constants.PASSWORD);
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
        private void getValuesAndProceed(String command) {
            Instruction instruction = null;
            double price = new Double(priceLabel.getText()).doubleValue();
            int number = new Integer(numberField.getText()).intValue();
            instruction = new Instruction(nameField.getText(),
                                          symbolField.getText(), command, price,
                                          number);
            // Summary line shown in the panel and echoed on the console
            String data = command + " " + instruction.getStockNumber() + ": "
                          + instruction.getStockSymbol() + " @ $"
                          + instruction.getUnitPrice() + " -"
                          + instruction.getCustomerName();
            instructionField.setText(data);
            instructionField.invalidate();
            System.out.println(data);
            invokeSalesOrder(instruction);
        }

The processInformation() callback method is implemented so that the approval department can invoke this method to inform the sales order panel of any approvals:

        public synchronized void processInformation(int approve,
                                                    Instruction instruction) {
            if (approve == Constants.APPROVED) {
                approvalField.setText(" *** APPROVED ***");
                try {
                    // invoke Trader methods
                    trader.performTrade(instruction);
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
            }  // end if APPROVED
            if (approve == Constants.REJECTED) {
                approvalField.setText(" *** REJECTED - No Funds. ***");
            }  // end if REJECTED
            invalidate();
        }
        void invokeSalesOrder(Instruction instruction) {
            // Assume rejection unless the approval department says otherwise
            int approve = Constants.REJECTED;
            try {
                approve = requester.getApproval(instruction);
                processInformation(approve, instruction);
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

The SalesOrderPanel class implements the ActionListener interface and its actionPerformed() method so that whenever the user clicks on the buttons, the system responds:

        public void actionPerformed(ActionEvent actionEvent) {
            if (actionEvent.getSource() == buyButton) {
                getValuesAndProceed("BUY");
            }
            if (actionEvent.getSource() == sellButton) {
                getValuesAndProceed("SELL");
            }
        }

The call to System.exit() has no effect when running as an applet inside a browser:

        static class CloseAdapter extends WindowAdapter {
            Applet creator;
            public CloseAdapter(Applet parent) {
                creator = parent;
            }
            public void windowClosing(WindowEvent windowEvent) {
                creator.stop();
                System.exit(0);
            }
        }

The main() method is used when running sales order panel as a standalone Java application:

        public static void main(String[] args) {
            SalesOrderPanel panel = new SalesOrderPanel();
            panel.init();
            Frame frame = new Frame("Retail Brokerage Sales Order");
            frame.add(panel, BorderLayout.CENTER);
            frame.setSize(250, 250);
            frame.setLocation(300, 200);
            frame.pack();
            frame.show();
            frame.addWindowListener(new CloseAdapter(panel));
        }
    }

Compiling and Running the Examples

Before doing anything else, make sure the JMS messaging server is up and is ready to send and receive messages. If the server is not up, we will not be able to either send or receive messages. The screen text below shows how to compile and run the sample.

Both programs take as their parameters the host address of the machine on which the JMS server is running, a valid registered username on the JMS messaging server, and the password. You can specify the address of your server host in the hostname parameter. If you are running the messaging server locally, you can specify localhost as the address, or you can specify an IP:Port combination like 127.0.0.1:2001. However, if this or either of the other two input parameters is omitted, an error message is printed:


The output of the sales order console looks like this:


The output of the approval department console looks like this:


The Equity Buyer

The equity buyer subscribes to the Equity:Trade topic. It filters messages so that it only receives buy or quit messages from the topic. After performing the buy, it publishes that information on the Trade:Price topic transactionally.

The equity buyer acts as both a durable subscriber for the Equity:Trade topic (in other words, it wants to get all messages posted to the topic, not just the ones published while this client was active) and a publisher for the Trade:Price topic. Therefore, at startup it creates two connections, one for each of the topics that it has to interact with.

It then creates a durable subscriber for the Equity:Trade topic, giving it the name EquityBuyer, and installs a message selector on the subscriber. This selector filters messages so that the subscriber receives only those whose instructions property holds a buy instruction or a quit message.

When a valid message with the trade instruction arrives at the topic, the JMS server passes on the filtered message to the equity buyer's message handler. At that time, the handler distinguishes between a quit message (which is sent as a text message) and a trade instruction message, which is sent as a bytes message.

Once it reads the properties from the bytes message, it reconstructs the instruction, completes the trade, and provides the information by publishing the buy information packaged as a map message on the Trade:Price topic. It sends the map message through a transacted session and calls the commit() method to force it to go out.

However, if it receives a quit message instead of a trade instruction, it releases all the resources, unsubscribes its durable subscription with the Equity:Trade topic and quits. Before closing its connections, a text message containing the string "QUIT" in its message body is created and published on the Trade:Price topic so that all durable subscribers that see the message will unsubscribe their durable subscriptions at that time.

The equity buyer never receives sell messages. Because it is only a buyer, it installs a message selector that passes just the buy and quit messages.
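The effect of such a selector can be imitated without a JMS server by treating each message as a map of string properties and the selector as a predicate over that map. This is only a sketch under those assumptions: SelectorSketch, BUY_OR_QUIT, deliver() and message() are hypothetical names, and a real provider evaluates the selector expression itself rather than a Java predicate:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, JMS-free imitation of a message selector on property fields.
class SelectorSketch {

    // Passes messages whose "instructions" property is exactly BUY or QUIT.
    // A message without the property is not passed on.
    static final Predicate<Map<String, String>> BUY_OR_QUIT = properties -> {
        String instructions = properties.get("instructions");
        return "BUY".equals(instructions) || "QUIT".equals(instructions);
    };

    // Returns only those published messages that the selector lets through.
    static List<Map<String, String>> deliver(List<Map<String, String>> published,
                                             Predicate<Map<String, String>> selector) {
        List<Map<String, String>> accepted = new ArrayList<>();
        for (Map<String, String> properties : published) {
            if (selector.test(properties)) {
                accepted.add(properties);
            }
        }
        return accepted;
    }

    // Convenience helper that builds the property fields of one message.
    static Map<String, String> message(String instructions, String stockSymbol) {
        Map<String, String> properties = new HashMap<>();
        properties.put("instructions", instructions);
        properties.put("stockSymbol", stockSymbol);
        return properties;
    }

    public static void main(String[] args) {
        List<Map<String, String>> published = new ArrayList<>();
        published.add(message("BUY", "ECWR"));
        published.add(message("SELL", "ECWR"));
        published.add(message("QUIT", "-"));
        // Only the BUY and QUIT messages reach the subscriber.
        for (Map<String, String> properties : deliver(published, BUY_OR_QUIT)) {
            System.out.println(properties.get("instructions") + " "
                               + properties.get("stockSymbol"));
        }
    }
}
```

Because the filtering is done on property fields rather than on the body, the buyer never has to open a sell message just to discover that it is not interested in it.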

The figure overleaf shows the equity buyer class durably subscribed to the Equity:Trade topic with a message selector installed and at the same time, acting as the publisher to the Trade:Price topic, sending out transactional messages:


The source code for the EquityBuyer class is listed below:

    package pubsub.retailbrokerage;
    import javax.jms.*;
    import javax.naming.*;
    import common.*;
    public class EquityBuyer implements MessageListener {
        private TopicConnectionFactory factory;
        private TopicConnection publisherConnection;
        private TopicConnection subscriberConnection;
        private Topic topic;
        private TopicSession publisherSession;
        private TopicSession subscriberSession;
        private TopicPublisher publisher;
        private TopicSubscriber subscriber;
        private String hostName;
        private String userID;
        private String password;
        private static final String CONTEXT_FACTORY = Constants.FACTORY;
        private static final String CONNECTION_FACTORY =
            Constants.CONNECTION_FACTORY;
        private static final String SUBSCRIBER_TOPIC = Constants.TRADE_TOPIC;
        private static final String PUBLISHER_TOPIC = Constants.PRICE_TOPIC;
        public EquityBuyer(String host, String user,
                           String passwd) throws NamingException {
            hostName = host;
            userID = user;
            password = passwd;
            JNDIService.init(CONTEXT_FACTORY, hostName);
            factory =
                (TopicConnectionFactory) JNDIService.lookup(CONNECTION_FACTORY);
            createConnections();
        }

JMS clients that require concurrent delivery of messages can use multiple sessions. This implies that each session's listener thread runs concurrently. In effect, while the listener on one session is executing, a listener on another session may also be executing concurrently:

        private void createConnections() {
            try {
                // create a topic connection for the subscriber
                subscriberConnection = factory.createTopicConnection();
                subscriberConnection.setClientID(userID);
                // create a non-transacted topic session on the connection
                subscriberSession = subscriberConnection.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                // create and start a topic connection for the publisher
                publisherConnection = factory.createTopicConnection();
                publisherConnection.setClientID(userID);
                // create a transacted topic session for publishing
                publisherSession = publisherConnection.createTopicSession(true,
                    Session.AUTO_ACKNOWLEDGE);
                publisherConnection.start();
                System.out.println("Equity Buyer, up and Ready for eCommerce ...");
            } catch (JMSException exception) {
                exception.printStackTrace();
            }
        }
        public void closeConnections() {
            try {
                sendQuitMessage();
                if (publisher != null) {
                    publisher.close();
                }
                if (subscriber != null) {
                    subscriber.close();
                }
                if (publisherSession != null) {
                    publisherSession.close();
                }

The unsubscribe() method of the topic session deletes a durable subscription created by a client. This deletes the state being maintained on the messaging server on behalf of the subscriber. It is an error for a client to delete a durable subscription while it has an active topic subscriber for it, or while a message received as part of a transaction has not yet been committed or acknowledged:

                if (subscriberSession != null) {
                    subscriberSession.unsubscribe("EquityBuyer");
                    subscriberSession.close();
                }
                if (subscriberConnection != null) {
                    subscriberConnection.close();
                }
                if (publisherConnection != null) {
                    publisherConnection.close();
                }
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

A text message with QUIT in the body is created and published to the topic to inform all subscribers that the publisher is going away:

        private void sendQuitMessage() {
            try {
                // create a Text Message
                TextMessage textMessage = publisherSession.createTextMessage();
                textMessage.setText("QUIT");
                // publish the message
                publisher.publish(textMessage, DeliveryMode.NON_PERSISTENT,
                                  Message.DEFAULT_PRIORITY,
                                  Message.DEFAULT_TIME_TO_LIVE);
                // commit the transaction
                publisherSession.commit();
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

The buyEquities() method installs a durable subscriber with a message filter, installs a message handler, and starts the connection:

        public void buyEquities() {
            try {
                topic = (Topic) JNDIService.lookup(SUBSCRIBER_TOPIC);
                // Only select those messages which have the following properties
                String selector =
                    "instructions LIKE 'BUY' OR instructions LIKE 'QUIT'";

If you want to ensure absolutely guaranteed delivery of your messages under all circumstances, your messaging clients should always publish persistent messages and all your subscriptions should be durable.

Messaging clients should specify a unique name for each durable subscription that they create. The code to create a durable subscriber is reproduced below:

                // create a Durable Subscriber on the topic session
                // create the durable subscriber with a given name
                subscriber = subscriberSession.createDurableSubscriber(topic,
                    "EquityBuyer", selector, false);
                // install an asynchronous listener/callback on
                // the subscriber object just created
                subscriber.setMessageListener(this);
                subscriberConnection.start();
            } catch (JMSException exception) {
                exception.printStackTrace();
            } catch (NamingException exception) {
                exception.printStackTrace();
            }
        }
        public static void main(String[] args) {
            if (args.length < 3) {
                displayUsage();
            }
            String hostName = args[0];
            String userID = args[1];
            String password = args[2];
            EquityBuyer buyer = null;
            try {
                buyer = new EquityBuyer(hostName, userID, password);
                buyer.buyEquities();
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
        private static void displayUsage() {
            System.out.println("\n\tUsage : java "
                               + "Chapter05.RetailBrokerage.EquityBuyer "
                               + "hostName UserName Password");
            System.out.println("\t Terminating Program ...");
            System.exit(-1);
        }
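What a named durable subscription buys us can be shown with a toy in-memory topic. The sketch below is a rough model under stated assumptions, not a description of any provider: DurableTopicSketch and its methods are hypothetical, messages are strings, and everything lives in one process. The topic keeps a backlog for each subscription name while that subscriber is disconnected and hands the backlog over when the subscriber returns under the same name:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of durable subscriptions; not the JMS API.
class DurableTopicSketch {
    // Messages retained per subscription name while its subscriber is away.
    private final Map<String, List<String>> backlog = new HashMap<>();
    // Listeners of the subscriptions that are currently connected.
    private final Map<String, Consumer<String>> active = new HashMap<>();

    // Connects (or reconnects) under a name; any retained backlog comes first.
    void subscribe(String name, Consumer<String> listener) {
        List<String> stored = backlog.computeIfAbsent(name, key -> new ArrayList<>());
        for (String message : stored) {
            listener.accept(message);
        }
        stored.clear();
        active.put(name, listener);
    }

    // Disconnects the subscriber but keeps the subscription alive.
    void disconnect(String name) {
        active.remove(name);
    }

    // Deletes the subscription and its retained state; refused while active.
    void unsubscribe(String name) {
        if (active.containsKey(name)) {
            throw new IllegalStateException("subscription " + name + " is still active");
        }
        backlog.remove(name);
    }

    // Delivers to connected subscribers, retains for disconnected ones.
    void publish(String message) {
        for (Map.Entry<String, List<String>> entry : backlog.entrySet()) {
            Consumer<String> listener = active.get(entry.getKey());
            if (listener != null) {
                listener.accept(message);
            } else {
                entry.getValue().add(message);
            }
        }
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        List<String> received = new ArrayList<>();
        topic.subscribe("EquityBuyer", received::add);
        topic.publish("BUY 1000 ECWR");
        topic.disconnect("EquityBuyer");
        topic.publish("BUY 200 ECWR");      // retained while the buyer is away
        topic.subscribe("EquityBuyer", received::add);
        // prints [BUY 1000 ECWR, BUY 200 ECWR]
        System.out.println(received);
    }
}
```

The subscription name is what ties the two connections together, which is why each durable subscription needs a unique name, and why the model refuses to delete a subscription that still has a connected subscriber.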

The following function is called by the message handler. It publishes information on the topic informing all subscribers of a buy:

        public void informEveryone(Instruction information) {
            try {
                topic = (Topic) JNDIService.lookup(PUBLISHER_TOPIC);
                System.out.println("Creating Topic Publisher");
                // Create a Publisher for this Topic
                publisher = publisherSession.createPublisher(topic);
                // Create a map message to carry the details of the buy
                MapMessage mapMessage = publisherSession.createMapMessage();
                // Set and Publish the message
                mapMessage.setString("customerName",
                                     information.getCustomerName());
                mapMessage.setString("stockSymbol", information.getStockSymbol());
                mapMessage.setString("instructions",
                                     information.getInstructions());
                mapMessage.setDouble("unitPrice", information.getUnitPrice());
                mapMessage.setInt("stockNumber", information.getStockNumber());
                publisher.publish(mapMessage, DeliveryMode.PERSISTENT,
                                  Message.DEFAULT_PRIORITY,
                                  Message.DEFAULT_TIME_TO_LIVE);
                publisherSession.commit();
            } catch (JMSException exception) {
                exception.printStackTrace();
            } catch (NamingException exception) {
                exception.printStackTrace();
            }
        }

Here is the message listener that receives messages asynchronously for the bound receiver:

        public void onMessage(Message message) {
            try {
                // When a message is received, print it out to
                // standard output
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String stringData = textMessage.getText();
                    System.out.println(stringData + " instruction received ...");
                    // If "QUIT" instruction received, close everything
                    if (stringData.equalsIgnoreCase("quit") == true) {
                        closeConnections();
                    }
                }
                if (message instanceof BytesMessage) {
                    BytesMessage msg = (BytesMessage) message;
                    Instruction instruction = null;
                    // Rebuild the trade instruction from the property fields
                    instruction =
                        new Instruction(msg.getStringProperty("customerName"),
                                        msg.getStringProperty("stockSymbol"),
                                        msg.getStringProperty("instructions"),
                                        msg.getDoubleProperty("unitPrice"),
                                        msg.getIntProperty("stockNumber"));
                    String information = "Bought " + instruction.getStockNumber()
                                         + " shares of "
                                         + instruction.getStockSymbol()
                                         + " at a Price of "
                                         + instruction.getUnitPrice()
                                         + " in the name of "
                                         + instruction.getCustomerName();
                    System.out.println(information);
                    // pass the information of the "BUY" to all Subscribers
                    informEveryone(instruction);
                }
            } catch (JMSException exception) {
                exception.printStackTrace();
            }
        }
    }

The Accounts Department

The accounts department subscribes durably to the Trade:Price topic and receives notification of stocks traded by the trading department.

The AccountsDepartment class creates this durable subscription under the name AccountsDepartment. It then installs an asynchronous message handler on the subscriber.

When a message is received on the topic, the server invokes the message handler's callback function with the message passed in. If the message is a map message, the class constructs an Instruction object from the body of the message and passes it on to update the account details. This in turn invokes the processInformation() method of the registered TradeHandler, if there is one, with the Instruction object.
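
The flow just described can be tried out without a messaging server. The following is a minimal sketch rather than the application's own code: a plain java.util.Map stands in for the body of a MapMessage, and AccountsHandlerSketch, TradeInstruction, and TradeCallback are hypothetical stand-ins for the AccountsDepartment listener, the Instruction class, and the TradeHandler interface. The key names are the ones set by the publisher.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the listener: map body -> instruction -> callback.
final class AccountsHandlerSketch {
    private final TradeCallback caller; // may be null, as in the standalone case

    AccountsHandlerSketch(TradeCallback caller) {
        this.caller = caller;
    }

    // A Map stands in for the MapMessage body (assumption for this sketch).
    TradeInstruction onMapBody(Map<String, Object> body) {
        TradeInstruction information = new TradeInstruction(
            (String) body.get("customerName"),
            (String) body.get("stockSymbol"),
            (String) body.get("instructions"),
            (Double) body.get("unitPrice"),
            (Integer) body.get("stockNumber"));
        // Inform the registered callback handler, if any
        if (caller != null) {
            caller.processInformation(information);
        }
        return information;
    }

    public static void main(String[] args) {
        Map<String, Object> body = new HashMap<>();
        body.put("customerName", "A. Customer");
        body.put("stockSymbol", "XYZ");
        body.put("instructions", "BUY");
        body.put("unitPrice", 42.5);
        body.put("stockNumber", 100);

        AccountsHandlerSketch handler = new AccountsHandlerSketch(
            info -> System.out.println("Callback received " + info.stockNumber
                                       + " shares of " + info.stockSymbol));
        handler.onMapBody(body);
    }
}

// Hypothetical, cut-down substitute for the Instruction class.
final class TradeInstruction {
    final String customerName;
    final String stockSymbol;
    final String instructions;
    final double unitPrice;
    final int stockNumber;

    TradeInstruction(String customerName, String stockSymbol,
                     String instructions, double unitPrice, int stockNumber) {
        this.customerName = customerName;
        this.stockSymbol = stockSymbol;
        this.instructions = instructions;
        this.unitPrice = unitPrice;
        this.stockNumber = stockNumber;
    }
}

// Hypothetical stand-in for the TradeHandler callback interface.
interface TradeCallback {
    void processInformation(TradeInstruction information);
}
```

Passing null as the callback mirrors the case where the class is run on its own from the command line: the instruction is still built, but nobody else is notified.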

While closing the connections, the class first closes its subscriber, then unsubscribes from its durable subscription, and finally closes the session and the connection.
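
To see what making the subscription durable buys us, and what unsubscribing on shutdown gives up, consider the toy in-memory model below. It is not the JMS API: ToyBroker and all of its methods are invented for illustration. It models only the behavior that JMS defines for durable subscriptions: messages published while the subscriber is inactive are retained for it, the subscription is identified by the client ID together with the subscription name, and unsubscribing discards the subscription along with anything retained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of durable-subscription semantics; NOT the javax.jms API.
final class ToyBroker {
    // One backlog per durable subscription, keyed by "clientID/subscriptionName".
    private final Map<String, Deque<String>> backlog = new HashMap<>();

    private static String key(String clientId, String name) {
        return clientId + "/" + name;
    }

    // Registers the durable subscription if it does not already exist.
    void subscribeDurably(String clientId, String name) {
        backlog.putIfAbsent(key(clientId, name), new ArrayDeque<>());
    }

    // Every registered durable subscription gets a copy, active or not.
    void publish(String message) {
        for (Deque<String> queue : backlog.values()) {
            queue.addLast(message);
        }
    }

    // Delivers and removes everything retained for this subscription.
    List<String> drain(String clientId, String name) {
        Deque<String> queue = backlog.get(key(clientId, name));
        List<String> delivered = new ArrayList<>();
        if (queue != null) {
            delivered.addAll(queue);
            queue.clear();
        }
        return delivered;
    }

    // Discards the subscription together with any retained messages.
    void unsubscribe(String clientId, String name) {
        backlog.remove(key(clientId, name));
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.subscribeDurably("Accounts_Department", "AccountsDepartment");
        broker.publish("BUY 100 XYZ");   // published while the subscriber is away
        System.out.println(broker.drain("Accounts_Department", "AccountsDepartment"));

        broker.unsubscribe("Accounts_Department", "AccountsDepartment");
        broker.publish("SELL 50 XYZ");   // no subscription left, nothing retained
        System.out.println(broker.drain("Accounts_Department", "AccountsDepartment"));
    }
}
```

Running main prints [BUY 100 XYZ] followed by []. For the AccountsDepartment class this means that, because it unsubscribes when closing its connections, trades published after shutdown are not waiting for it on restart. An application that needs to catch up on missed trades would leave the durable subscription in place.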

The figure below shows both the EquityBuyer and the AccountsDepartment class:


The complete source code for the AccountsDepartment class is shown below:

    package pubsub.retailbrokerage;

    import javax.jms.*;
    import javax.naming.*;
    import java.util.*;
    import common.*;

    public class AccountsDepartment implements MessageListener {

        private TopicConnectionFactory factory;
        private TopicConnection connection;
        private Topic topic;
        private TopicSession session;
        private TopicSubscriber subscriber;
        private String hostName;
        private String userID;
        private String password;
        private TradeHandler caller;

        private static final String CONTEXT_FACTORY = Constants.FACTORY;
        private static final String CONNECTION_FACTORY =
            Constants.CONNECTION_FACTORY;
        private static final String SUBSCRIBER_TOPIC = Constants.PRICE_TOPIC;

        public AccountsDepartment(String host, String ID, String passwd) {
            hostName = host;
            userID = ID;
            password = passwd;
        }

        public AccountsDepartment(TradeHandler callbackImplementor) {
            caller = callbackImplementor;
            hostName = Constants.PROVIDER_URL;
            userID = Constants.USER_NAME;
            password = Constants.PASSWORD;
        }

        void initialize() {
            try {
                JNDIService.init(CONTEXT_FACTORY, hostName);
                factory =
                    (TopicConnectionFactory) JNDIService.lookup(CONNECTION_FACTORY);
                topic = (Topic) JNDIService.lookup(SUBSCRIBER_TOPIC);
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

        public void createConnections() {
            try {
                initialize();

                // Create a Topic connection (it is started once the
                // listener has been installed)
                connection = factory.createTopicConnection();
                connection.setClientID("Accounts_Department");

                // Create a Topic session on this connection
                session = connection.createTopicSession(false,
                                                        Session.AUTO_ACKNOWLEDGE);

                // Create a Durable Subscriber
                subscriber = session.createDurableSubscriber(topic,
                                                             "AccountsDepartment");

We then install an asynchronous listener/callback on the subscriber object just created and start the connection:

                // Install an asynchronous listener/callback on the
                // subscriber object just created
                subscriber.setMessageListener(this);
                connection.start();
                System.out.println("Accounts Department up and Ready for eCommerce ...");
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

        private static void displayUsage() {
            System.out.println("\n\tUsage : java "
                               + "pubsub.retailbrokerage.AccountsDepartment "
                               + "hostName UserName Password");
            System.out.println("\t Terminating Program ...");
            System.exit(-1);
        }

        public void closeConnections() {
            try {
                if (subscriber != null) {
                    subscriber.close();
                }
                if (session != null) {
                    // The subscriber must be closed before unsubscribing
                    session.unsubscribe("AccountsDepartment");
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

The updateAccountDetails() method informs everyone about the buy by printing out the information and then invoking the registered callback handler, if any, so that listeners are informed too:

        public void updateAccountDetails(Instruction information) {
            String data = "Bought " + information.getStockNumber()
                          + " shares of " + information.getStockSymbol()
                          + " at a Price of " + information.getUnitPrice()
                          + " in the name of " + information.getCustomerName();
            System.out.println(data);

            // inform any other registered callback handlers
            if (caller != null) {
                caller.processInformation(information);
            }
        }

This is the message listener that receives messages asynchronously for the bound receiver:

        public void onMessage(Message message) {
            try {
                // When a message is received, print it out to
                // standard output
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String stringData = textMessage.getText();
                    System.out.println(stringData + " instruction received ...");

                    // if a "QUIT" message was received close connections
                    if (stringData.equalsIgnoreCase("quit")) {
                        closeConnections();
                    }
                }
                if (message instanceof MapMessage) {
                    MapMessage mapMessage = (MapMessage) message;
                    Instruction information =
                        new Instruction(mapMessage.getString("customerName"),
                                        mapMessage.getString("stockSymbol"),
                                        mapMessage.getString("instructions"),
                                        mapMessage.getDouble("unitPrice"),
                                        mapMessage.getInt("stockNumber"));

                    // inform everyone about the stock trade
                    updateAccountDetails(information);
                }
            } catch (Throwable exception) {
                exception.printStackTrace();
            }
        }

        public static void main(String[] args) {
            if (args.length < 3) {
                AccountsDepartment.displayUsage();
            }
            String hostName = args[0];
            String userID = args[1];
            String password = args[2];
            AccountsDepartment accounts = new AccountsDepartment(hostName,
                                                                 userID, password);
            accounts.createConnections();
        }
    }

The TradeHandler Interface

Classes that need to implement a callback for the AccountsDepartment class use the TradeHandler interface. It declares a single method, processInformation(), which must be implemented by any class that wants to be called back with the details of a trade when one happens.

    package pubsub.retailbrokerage;

    public interface TradeHandler {
        public void processInformation(Instruction information);
    }

We will see more about this interface when we discuss an applet implementation of the retail stock brokerage application.
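
Because the interface declares a single method, on Java 8 or later a callback can also be supplied as a lambda or a method reference instead of a full class. The sketch below is self-contained, so it redeclares simplified stand-ins: TradeHandlerSketch mirrors the shape of TradeHandler, and TradeInfo is a hypothetical, cut-down substitute for Instruction.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point: two different callbacks receive the same trade.
final class TradeHandlerDemo {
    public static void main(String[] args) {
        List<TradeInfo> received = new ArrayList<>();

        // Handler 1: a lambda that prints a one-line summary.
        TradeHandlerSketch printer = info ->
            System.out.println(info.instructions + " " + info.stockNumber
                               + " " + info.stockSymbol);

        // Handler 2: a method reference that simply records the trade.
        TradeHandlerSketch recorder = received::add;

        TradeInfo trade = new TradeInfo("BUY", "XYZ", 100);
        printer.processInformation(trade);
        recorder.processInformation(trade);
        System.out.println("Recorded trades: " + received.size());
    }
}

// Hypothetical stand-in with the same shape as TradeHandler.
interface TradeHandlerSketch {
    void processInformation(TradeInfo information);
}

// Hypothetical, cut-down substitute for the Instruction class.
final class TradeInfo {
    final String instructions;
    final String stockSymbol;
    final int stockNumber;

    TradeInfo(String instructions, String stockSymbol, int stockNumber) {
        this.instructions = instructions;
        this.stockSymbol = stockSymbol;
        this.stockNumber = stockNumber;
    }
}
```

A recording handler of this kind is handy in tests, where it lets you check which trades reached the callback without any user interface.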

The AccountingPanel

The AccountingPanel applet creates the EquityBuyer and the AccountsDepartment classes and displays the output of the AccountsDepartment class. This class is very similar to the sales order application, in that it can be deployed as both a standalone application and as an applet.

The AccountingPanel applet implements the TradeHandler interface. This interface contains a method called processInformation() that takes an Instruction object containing the details of the trade. The AccountingPanel applet creates an implementation for the processInformation() method:


The AccountingPanel applet creates an instance of the accounts department and registers itself with it as the trade handler callback. When the messaging server invokes the accounts department's onMessage() callback, it in turn invokes the processInformation() callback of the AccountingPanel applet, passing it the details of the trade.

The source code for the AccountingPanel applet is as follows:

    package pubsub.retailbrokerage;

    import java.applet.*;
    import java.awt.*;
    import java.awt.event.*;

    public class AccountingPanel extends Applet implements TradeHandler {

        private Label label;
        private TextArea textArea;

        public void init() {
            addComponents();
            String hostName = Constants.PROVIDER_URL;
            String userID = Constants.USER_NAME;
            String password = Constants.PASSWORD;
            EquityBuyer buyer = null;
            AccountsDepartment accounts = null;
            try {
                buyer = new EquityBuyer(hostName, userID, password);
                buyer.buyEquities();

                // Register this panel as the trade handler callback
                accounts = new AccountsDepartment(this);
                accounts.createConnections();
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

        public synchronized void processInformation(Instruction information) {
            String command = "";
            if (information.getInstructions().equalsIgnoreCase("BUY")) {
                command = "Bought";
            }
            if (information.getInstructions().equalsIgnoreCase("SELL")) {
                command = "Sold";
            }
            String data = command + " " + information.getStockNumber()
                          + " shares of " + information.getStockSymbol()
                          + " at (USD) $" + information.getUnitPrice() + " for "
                          + information.getCustomerName() + "\n";
            textArea.append(data);
        }

        private void addComponents() {
            resize(450, 400);
            this.setLayout(new BorderLayout());
            textArea = new TextArea(5, 40);
            textArea.setEnabled(false);
            label = new Label("Retrieving Messages from the "
                              + "Trade:Price Buy Queue");
            add(label, "North");
            add(textArea, "Center");
        }    // end addComponents

        static class CloseAdapter extends WindowAdapter {
            public void windowClosing(WindowEvent windowEvent) {
                System.exit(0);
            }
        }

        public static void main(String[] args) {
            AccountingPanel panel = new AccountingPanel();
            panel.init();
            Frame frame = new Frame("Retail Brokerage Accounting Department");
            frame.add(panel, BorderLayout.CENTER);
            frame.setSize(250, 250);
            frame.setLocation(300, 200);
            frame.pack();
            frame.show();
            frame.addWindowListener(new CloseAdapter());
        }
    }

Compiling and Running the Accounts Department Example

Before doing anything else, make sure the JMS messaging server is up and is ready to send and receive messages. If the server is not up, we will not be able to either send or receive messages.

Both programs take three parameters: the host address of the machine on which the JMS server is running, a valid registered username on the JMS messaging server, and the password. You can specify the address of your server host in the hostname parameter. If you are running the messaging server locally, you can specify localhost as the address, or an IP:Port combination such as 127.0.0.1:2001.
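
How the host argument is interpreted depends on the JMS provider. Purely as an illustration, the sketch below splits a value such as 127.0.0.1:2001 into its host and port parts. HostAddress, its parse() method, and the fallback port of 2001 are assumptions made for this example (the port is simply borrowed from the sample address), not part of JMS or of any provider's API.

```java
// Hypothetical helper; not part of JMS or of the application's classes.
final class HostAddress {
    // Assumption: fall back to the port used in the sample address.
    static final int ASSUMED_DEFAULT_PORT = 2001;

    final String host;
    final int port;

    private HostAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Accepts "host" or "host:port"; rejects empty hosts and bad ports.
    static HostAddress parse(String argument) {
        String value = argument.trim();
        int colon = value.lastIndexOf(':');
        String host = colon < 0 ? value : value.substring(0, colon);
        if (host.isEmpty()) {
            throw new IllegalArgumentException("Missing host in '" + argument + "'");
        }
        if (colon < 0) {
            return new HostAddress(host, ASSUMED_DEFAULT_PORT);
        }
        int port;
        try {
            port = Integer.parseInt(value.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Bad port in '" + argument + "'", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in '" + argument + "'");
        }
        return new HostAddress(host, port);
    }

    public static void main(String[] args) {
        HostAddress a = HostAddress.parse("127.0.0.1:2001");
        System.out.println(a.host + " -> " + a.port);
        HostAddress b = HostAddress.parse("localhost");
        System.out.println(b.host + " -> " + b.port);
    }
}
```

Running main prints 127.0.0.1 -> 2001 and then localhost -> 2001. Validating the argument up front like this gives a clearer error than a failed JNDI lookup later on.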

The results of compiling all the programs and running them are shown below:


The output of the command console looks like this:


The output of the accounts department looks like this:




Professional JMS
ISBN: 1861004931
Year: 2000
Pages: 154
