Publish/Subscribe Using MSMQ Pragmatic Multicasting

Version 3 of Microsoft Message Queuing (MSMQ), a technology provided free of charge with Microsoft Windows operating systems, added support for the Pragmatic General Multicast (PGM) protocol, referred to here as pragmatic multicasting. As shown in Figure 10.3, a nontransactional queue can be associated with a PGM address, and any number of queues can be associated with the same PGM address.

Figure 10.3. Associating a PGM address with an MSMQ queue.
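
The association shown in Figure 10.3 can also be made in code. The following is a minimal sketch, assuming the .NET Framework's System.Messaging API on a machine where MSMQ 3.0 and its multicasting support are installed. The queue path and message text are hypothetical; the multicast address is the one used later in this section.

```csharp
using System;
using System.Messaging; // .NET Framework only; requires MSMQ with multicasting support

public static class PgmQueueSketch
{
    // Hypothetical queue path, chosen for illustration.
    private const string QueuePath = @".\private$\PgmSketchQueue";
    private const string MulticastAddress = "224.0.255.1:80";

    public static void Main()
    {
        // Subscriber side: create a nontransactional queue if needed
        // and associate it with the PGM address.
        MessageQueue queue = MessageQueue.Exists(QueuePath)
            ? new MessageQueue(QueuePath)
            : MessageQueue.Create(QueuePath, false);
        queue.MulticastAddress = MulticastAddress;
        queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });

        // Publisher side: send to the PGM address rather than to a named queue.
        using (var destination =
            new MessageQueue("FormatName:MULTICAST=" + MulticastAddress))
        {
            destination.Send("order received", "notification");
        }

        // Pull-style retrieval: the subscriber reads from its own queue.
        // Receive throws a MessageQueueException if nothing arrives in time.
        Message received = queue.Receive(TimeSpan.FromSeconds(5));
        Console.WriteLine(received.Body);
    }
}
```

Every queue whose MulticastAddress matches the address in the sender's format name should receive its own copy of the message, which is what makes the arrangement usable for publish/subscribe.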


As Anand Rajagopalan points out, this new facility of MSMQ provides a simple way of doing publish/subscribe with pull-style notification (Rajagopalan 2005). A publisher can direct publication messages to a PGM address via MSMQ, which will result in those messages being added to all the subscriber queues associated with that address. Subscribers can then pull the messages from their respective queues. Because, as Rajagopalan further points out, the Windows Communication Foundation provides the MsmqIntegrationBinding for exchanging messages with MSMQ applications, this way of doing publish/subscribe can also be implemented with the Windows Communication Foundation:

  1. Open the solution C:\WCFHandsOn\PublishSubscribe\MSMQPragmaticMulticasting\MSMQPragmaticMulticasting.sln.

    The solution consists of four projects:

    • The Order project is for building a class library with a class called PurchaseOrder.

    • The Publisher project provides a console application that publishes information about incoming purchase orders to a PGM address via MSMQ, using the Windows Communication Foundation's MsmqIntegrationBinding.

    • SubscriberOne and SubscriberTwo are both console applications that subscribe to notifications of incoming purchase orders, using the Windows Communication Foundation's MsmqIntegrationBinding to pull the notifications from queues associated with the PGM address to which the Publisher sends the notifications.

  2. Look at the PurchaseOrder class in the Order.cs module of the Order project in the MSMQPragmaticMulticasting solution, reproduced in Listing 10.1. The class declares itself serializable with the Serializable attribute. It overrides the ToString() method of the base class, Object, to provide an informative representation of itself as a string. Instances of this class are what the publisher in this solution sends to the subscribers.

    Listing 10.1. Notification Class

    [Serializable]
    public class PurchaseOrder
    {
        public string orderIdentifier;
        public string customerIdentifier;
        public PurchaseOrderLineItem[] orderLineItems;
        private OrderStates orderStatus;

        public float TotalCost
        {
            get
            {
                float totalCost = 0;
                foreach (PurchaseOrderLineItem lineItem in orderLineItems)
                    totalCost += lineItem.TotalCost;
                return totalCost;
            }
        }

        public OrderStates Status
        {
            get
            {
                return orderStatus;
            }
            set
            {
                orderStatus = value;
            }
        }

        public override string ToString()
        {
            StringBuilder buffer =
                new StringBuilder("Purchase Order: " + orderIdentifier + "\n");
            buffer.Append("\tCustomer: " + customerIdentifier + "\n");
            buffer.Append("\tOrderDetails\n");
            foreach (PurchaseOrderLineItem lineItem in orderLineItems)
            {
                buffer.Append("\t\t" + lineItem.ToString());
            }
            buffer.Append("\tTotal cost of this order: $" + TotalCost + "\n");
            buffer.Append("\tOrder status: " + Status + "\n");
            return buffer.ToString();
        }
    }

  3. Examine the IOrderSubscriber interface in the Publisher.cs module of the Publisher project, and in the Subscriber.cs module of the SubscriberOne project:

    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
    [KnownType(typeof(PurchaseOrder))]
    public interface IOrderSubscriber
    {
        [OperationContract(IsOneWay = true, Action = "*")]
        void Notify(MsmqMessage<PurchaseOrder> message);
    }

    This .NET interface is designated as a Windows Communication Foundation service contract by the ServiceContract attribute. It includes a single operation, Notify(), that accepts a single parameter of the type MsmqMessage<PurchaseOrder>. MsmqMessage<T> is a generic type provided by the Windows Communication Foundation for which any serializable type can serve as the type argument. It allows data to be marshaled in and out of MSMQ messages sent or received via the MSMQ integration binding.
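
    To make the marshaling concrete, here is a minimal sketch of a round trip through XML. It assumes the binding's default XML serialization format and assumes that format behaves like the .NET XmlSerializer; SampleOrder is a hypothetical, abbreviated stand-in for PurchaseOrder, and XmlBodySketch is not part of the sample solution.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Hypothetical, abbreviated stand-in for the PurchaseOrder class.
[Serializable]
public class SampleOrder
{
    public string orderIdentifier;
    public string customerIdentifier;
}

public static class XmlBodySketch
{
    // Serializes an order to the kind of XML text a message body could carry.
    public static string ToXml(SampleOrder order)
    {
        var serializer = new XmlSerializer(typeof(SampleOrder));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, order);
            return writer.ToString();
        }
    }

    // Rebuilds an order from the XML text of a message body.
    public static SampleOrder FromXml(string xml)
    {
        var serializer = new XmlSerializer(typeof(SampleOrder));
        using (var reader = new StringReader(xml))
        {
            return (SampleOrder)serializer.Deserialize(reader);
        }
    }
}
```

    With the integration binding, this kind of conversion happens behind the MsmqMessage<T> type, so the publisher and subscribers only ever handle typed objects.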

    In Chapter 4, "Security," it was explained that the value of the Action parameter of the OperationContract attribute is used to correlate messages with operations. A value usually does not have to be provided for that parameter, because the Windows Communication Foundation automatically and invisibly supplies appropriate default values.

    However, the value "*" is provided for the Action parameter of the OperationContract attribute on the IOrderSubscriber contract's Notify() operation. Specifying Action="*" as the parameter to the OperationContract attribute signifies that the operation with that attribute is the unmatched message handler, which means that operation will be used to process all messages not matched with another operation. All messages received via the MSMQ integration binding are dispatched to the unmatched message handler of the receiving service. In this case all such messages will be dispatched to the method by which the IOrderSubscriber contract's Notify() operation is implemented.
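
    The difference between a matched operation and the unmatched message handler can be sketched with a contract that has both. This is an illustration only: IMixedContract, MixedService, and the urn:sample/Ping action are hypothetical names that do not appear in the sample solution.

```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;

// Hypothetical contract, for illustration only.
[ServiceContract]
public interface IMixedContract
{
    // Matched operation: selected when a message carries this Action value.
    [OperationContract(IsOneWay = true, Action = "urn:sample/Ping")]
    void Ping();

    // Unmatched message handler: receives every message whose action
    // does not correspond to another operation of the contract.
    [OperationContract(IsOneWay = true, Action = "*")]
    void HandleAnythingElse(Message message);
}

public class MixedService : IMixedContract
{
    public void Ping()
    {
        Console.WriteLine("Ping received.");
    }

    public void HandleAnythingElse(Message message)
    {
        // The untyped Message exposes the action that failed to match.
        Console.WriteLine("Unmatched action: {0}", message.Headers.Action);
    }
}
```

    IOrderSubscriber has only the handler, so every message arriving through the MSMQ integration binding ends up in Notify().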

  4. Study the static Main() method of the Publisher class in the Publisher.cs module of the Publisher project:

    static void Main(string[] args)
    {
        [...]
        PurchaseOrder order = new PurchaseOrder();
        order.customerIdentifier = "somecustomer.com";
        order.orderIdentifier = Guid.NewGuid().ToString();
        PurchaseOrderLineItem firstLineItem = new PurchaseOrderLineItem();
        [...]
        PurchaseOrderLineItem secondLineItem = new PurchaseOrderLineItem();
        [...]
        order.orderLineItems =
            new PurchaseOrderLineItem[] { firstLineItem, secondLineItem };
        IOrderSubscriber proxy =
            new ChannelFactory<IOrderSubscriber>(
                "OrderPullPoint").CreateChannel();
        proxy.Notify(new MsmqMessage<PurchaseOrder>(order));
        ((IChannel)proxy).Close();
        [...]
    }

    The method sends notification of a purchase order to the subscribers using a proxy that is obtained in a customary way, using the Windows Communication Foundation's ChannelFactory<T> generic. The Publisher code simply invokes the proxy's Notify() operation, passing an instance of MsmqMessage<PurchaseOrder> created from the purchase order about which it wants to notify the subscribers.

  5. Look at the configuration of the Publisher in the App.Config file of the Publisher project to see the OrderPullPoint configuration referred to in the construction of the proxy:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <system.serviceModel>
            <client>
                <endpoint name="OrderPullPoint"
                          address="msmq.formatname:MULTICAST=224.0.255.1:80"
                          binding="msmqIntegrationBinding"
                          bindingConfiguration="OrderPublicationBinding"
                          contract="Microsoft.ServiceModel.Samples.IOrderSubscriber">
                </endpoint>
            </client>
            <bindings>
                <msmqIntegrationBinding>
                    <binding name="OrderPublicationBinding" exactlyOnce="false">
                        <security mode="None" />
                    </binding>
                </msmqIntegrationBinding>
            </bindings>
        </system.serviceModel>
    </configuration>

    That configuration selects the Windows Communication Foundation's standard MsmqIntegrationBinding as the binding to use in publishing the notifications. The settings of that standard binding are modified so as not to require the assurance of messages being delivered exactly once. That assurance, which is provided by default by the MsmqIntegrationBinding, is not possible in this case, because the destination queues are not transactional queues. They are not transactional queues because MSMQ queues associated with PGM addresses cannot be transactional.

    The address provided as the destination of the messages is msmq.formatname:MULTICAST=224.0.255.1:80. In that address, msmq is the scheme associated with the MSMQ-integration transport protocol by the MSMQ integration binding. The expression formatname:MULTICAST signifies that the destination for messages is to be identified by a PGM address. The PGM address given is 224.0.255.1. The component 80 of the address is a port number.
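
    The same binding settings and address can be expressed in code instead of configuration. The following is a sketch under stated assumptions, not the sample's own code: PurchaseOrder is abbreviated, ProgrammaticPublisher is a hypothetical class name, and the ServiceKnownType attribute is used because released versions of the Windows Communication Foundation expect that attribute on service contracts.

```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.MsmqIntegration;

// Abbreviated stand-in for the class in Listing 10.1.
[Serializable]
public class PurchaseOrder
{
    public string orderIdentifier;
    public string customerIdentifier;
}

[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
[ServiceKnownType(typeof(PurchaseOrder))]
public interface IOrderSubscriber
{
    [OperationContract(IsOneWay = true, Action = "*")]
    void Notify(MsmqMessage<PurchaseOrder> message);
}

// Hypothetical class name, for illustration only.
public static class ProgrammaticPublisher
{
    public static void Main()
    {
        // Equivalent of exactlyOnce="false" and <security mode="None" />.
        var binding = new MsmqIntegrationBinding();
        binding.ExactlyOnce = false;
        binding.Security.Mode = MsmqIntegrationSecurityMode.None;

        // Equivalent of the endpoint's address attribute.
        var address = new EndpointAddress(
            "msmq.formatname:MULTICAST=224.0.255.1:80");

        var factory = new ChannelFactory<IOrderSubscriber>(binding, address);
        IOrderSubscriber proxy = factory.CreateChannel();

        var order = new PurchaseOrder();
        order.customerIdentifier = "somecustomer.com";
        order.orderIdentifier = Guid.NewGuid().ToString();

        proxy.Notify(new MsmqMessage<PurchaseOrder>(order));

        ((IClientChannel)proxy).Close();
        factory.Close();
    }
}
```

    Keeping these settings in configuration, as the sample does, lets the multicast address change without recompiling the publisher.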

  6. Compare the configuration of the Publisher with the configuration of a subscriber, such as the configuration of the first subscriber, in the App.Config file of the SubscriberOne project:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <appSettings>
            <add key="orderQueueName" value=".\private$\WCFHandsOnOne" />
        </appSettings>
        <system.serviceModel>
            <services>
                <service
                    type="Microsoft.ServiceModel.Samples.OrderSubscriber">
                    <endpoint address="msmq.formatname:DIRECT=OS:.\private$\WCFHandsOnOne"
                              binding="msmqIntegrationBinding"
                              bindingConfiguration="OrderSubscriptionBinding"
                              contract="Microsoft.ServiceModel.Samples.IOrderSubscriber">
                    </endpoint>
                </service>
            </services>
            <bindings>
                <msmqIntegrationBinding>
                    <binding name="OrderSubscriptionBinding" exactlyOnce="false">
                        <security mode="None" />
                    </binding>
                </msmqIntegrationBinding>
            </bindings>
        </system.serviceModel>
    </configuration>

    The subscriber configuration defines the configuration of a Windows Communication Foundation service that receives messages via MSMQ. The selection and configuration of the binding corresponds exactly with the selection and configuration of the binding for the publisher. Whereas the address provided as the destination of the publisher's messages was a PGM address, the address provided as the source of messages for the subscriber service is the name of an MSMQ queue associated with that PGM address.

  7. Examine the static Main() method of the OrderSubscriber class of one of the subscribers in the Subscriber.cs module of the SubscriberOne project:

    public static void Main()
    {
        string queueName = ConfigurationManager.AppSettings["orderQueueName"];
        if (!(MessageQueue.Exists(queueName)))
        {
            MessageQueue.Create(queueName);
            MessageQueue queue = new MessageQueue(queueName);
            queue.MulticastAddress =
                ConfigurationManager.AppSettings["multicastAddress"];
        }
        using (ServiceHost serviceHost = new ServiceHost(typeof(OrderSubscriber)))
        {
            serviceHost.Open();
            Console.WriteLine("The service is ready.");
            Console.WriteLine("Press any key to terminate the service.");
            Console.ReadLine();
            serviceHost.Close();
        }
    }

    The method creates the queue that serves as the subscriber's pull-point if it does not already exist. In creating the queue, it associates the queue with the PGM address to which the publisher directs its messages.
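
    The Main() method reads a multicastAddress application setting that the configuration excerpt in step 6 does not show. The following fragment is a guess at what that entry might look like, assuming its value is the address and port to which the publisher sends; check the App.Config file in the solution for the actual entry.

```xml
<appSettings>
    <add key="orderQueueName" value=".\private$\WCFHandsOnOne" />
    <!-- Assumed entry: not shown in the excerpt in step 6 -->
    <add key="multicastAddress" value="224.0.255.1:80" />
</appSettings>
```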

    An instance of the OrderSubscriber class, which implements the IOrderSubscriber service contract, is then loaded into an application domain using an instance of the Windows Communication Foundation's ServiceHost class. Then the Open() method of the ServiceHost instance is invoked, whereupon the Windows Communication Foundation's channel layer will begin watching for messages delivered to the queue specified in the subscriber's configuration file. Such messages will be dispatched, by the Windows Communication Foundation, to the implementation of the unmatched message handler, the Notify() operation, of the IOrderSubscriber service contract.

  8. Look at the OrderSubscriber class's implementation of the Notify() operation of the IOrderSubscriber contract:

    public void Notify(MsmqMessage<PurchaseOrder> message)
    {
        PurchaseOrder order = (PurchaseOrder)message.Body;
        Random statusIndexer = new Random();
        order.Status = (OrderStates)statusIndexer.Next(3);
        Console.WriteLine("Processing {0} ", order);
    }

    Recall that the Notify() operation is designated as the unmatched message handler of the IOrderSubscriber contract, and also that all messages received via the MSMQ integration binding are dispatched to the method that implements the unmatched message handler. In this case, that method is the Notify() method of the OrderSubscriber class. The received messages are dispatched to the Notify() method as instances of the MsmqMessage<PurchaseOrder> type, from which instances of the PurchaseOrder class are extracted with this simple statement:

    PurchaseOrder order = (PurchaseOrder)message.Body;
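
    Besides the body, an MsmqMessage<T> instance carries properties of the underlying MSMQ message. The following sketch of an alternative handler reads a few of them; it assumes an abbreviated PurchaseOrder, and VerboseOrderSubscriber is a hypothetical class name. Because the Body property is already typed as PurchaseOrder, the cast in the statement above is optional.

```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.MsmqIntegration;

// Abbreviated stand-in for the class in Listing 10.1.
[Serializable]
public class PurchaseOrder
{
    public string orderIdentifier;
    public string customerIdentifier;
}

[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
[ServiceKnownType(typeof(PurchaseOrder))]
public interface IOrderSubscriber
{
    [OperationContract(IsOneWay = true, Action = "*")]
    void Notify(MsmqMessage<PurchaseOrder> message);
}

// Hypothetical subscriber that logs MSMQ message properties.
public class VerboseOrderSubscriber : IOrderSubscriber
{
    public void Notify(MsmqMessage<PurchaseOrder> message)
    {
        // Body is of type PurchaseOrder, so no cast is required.
        PurchaseOrder order = message.Body;

        // Properties carried over from the MSMQ message itself.
        Console.WriteLine("Label: {0}", message.Label);
        Console.WriteLine("Sent: {0}, arrived: {1}",
            message.SentTime, message.ArrivedTime);

        Console.WriteLine("Order {0} for {1}",
            order.orderIdentifier, order.customerIdentifier);
    }
}
```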

  9. Start debugging the MSMQPragmaticMulticasting solution. Console windows for the two subscriber applications should appear, as well as the console window of the publisher.

  10. When there is activity in both subscriber applications' console windows, enter a keystroke into the console window of the publisher. The results should appear as shown in Figure 10.4. Notifications of incoming purchase orders are published to the subscribers' pull-points by the publisher, from which they are retrieved by the subscribers.

    Figure 10.4. Publish/Subscribe using MSMQ PGM.

  11. Stop debugging the application.

Generally, when Windows Communication Foundation applications send messages to other Windows Communication Foundation applications via MSMQ queues, one uses the Windows Communication Foundation's standard NetMsmqBinding rather than the MsmqIntegrationBinding. The NetMsmqBinding has the virtue of being more flexible, not requiring messages to be sent and received in the form of instances of MsmqMessage<T> types, and also allowing messages to be dispatched to operations other than the unmatched message handler. Usually, one need resort to the MsmqIntegrationBinding only when a Windows Communication Foundation application must communicate with a non-Windows Communication Foundation application via MSMQ. In this case, however, all the applications communicating via MSMQ are Windows Communication Foundation applications, so what is the reason for using the MsmqIntegrationBinding rather than the NetMsmqBinding? The reason is that the implementation of the PGM protocol in MSMQ represents, in effect, a non-Windows Communication Foundation application interposed between the Windows Communication Foundation applications.
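
For contrast, here is a minimal sketch of the kind of contract and host that the NetMsmqBinding permits when no PGM address is involved. The contract, class, and queue names are hypothetical, and the private queue is assumed to exist already and to be nontransactional.

```csharp
using System;
using System.ServiceModel;

// Hypothetical contract: ordinary typed, one-way operations,
// with no MsmqMessage<T> wrapper and no unmatched message handler.
[ServiceContract]
public interface IOrderNotifications
{
    [OperationContract(IsOneWay = true)]
    void OrderReceived(string orderIdentifier);

    [OperationContract(IsOneWay = true)]
    void OrderCancelled(string orderIdentifier);
}

public class OrderNotifications : IOrderNotifications
{
    public void OrderReceived(string orderIdentifier)
    {
        Console.WriteLine("Received order {0}", orderIdentifier);
    }

    public void OrderCancelled(string orderIdentifier)
    {
        Console.WriteLine("Cancelled order {0}", orderIdentifier);
    }
}

public static class NetMsmqHostSketch
{
    public static void Main()
    {
        var binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
        binding.ExactlyOnce = false; // the queue is assumed to be nontransactional

        using (var host = new ServiceHost(typeof(OrderNotifications)))
        {
            // Hypothetical private queue; it must exist before the host opens.
            host.AddServiceEndpoint(
                typeof(IOrderNotifications),
                binding,
                "net.msmq://localhost/private/WCFHandsOnSketch");
            host.Open();
            Console.WriteLine("Press ENTER to terminate the service.");
            Console.ReadLine();
        }
    }
}
```

Each message is dispatched to the operation it was sent to, which is the flexibility the multicast arrangement gives up by going through the MsmqIntegrationBinding.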




Microsoft Windows Communication Foundation: Hands-on
ISBN: 0672328771
Year: 2006
Pages: 132