Publish/Subscribe Using Streaming

When either callback contracts or MSMQ PGM are used to do publish/subscribe with the Windows Communication Foundation, each notification from the publisher to the subscribers incurs the cost of sending an entire message. That price is more acceptable when the notification makes up a larger proportion of the total size of the message, and when notifications are required less frequently. However, the requirement to publish frequent notifications of small items of information is commonplace. In such cases, one can use the Windows Communication Foundation's streamed transfer mode to avoid having to create an entire message for each notification.

The Streamed Transfer Mode

The Windows Communication Foundation uses a buffered transfer mode by default. That means that the entire contents of an outgoing message must have been written into a buffer before the message is sent, and that the entire contents of an incoming message must have been read into a buffer before the message is dispatched for processing. However, the Windows Communication Foundation provides the option of a streamed transfer mode, by which the content of an incoming message may be dispatched for processing by the receiver even before the entire content of the message has been formulated by the source. To see the streamed transfer mode at work, follow these steps:

  1. Open the solution C:\WCFHandsOn\PublishSubscribe\Streaming\Streaming.sln. It consists of two projects. The Client project is for building a Windows Forms application that displays an image retrieved from a Windows Communication Foundation service. That service is built using the other project in the solution, called Service.

  2. Examine the interface IPictureServer in the Program.cs module of the Service project. It is designated as a Windows Communication Foundation service contract, of which the only notable feature is that its sole operation, GetPicture(), is defined as returning a Stream object:

    [ServiceContract]
    public interface IPictureServer
    {
        [OperationContract]
        Stream GetPicture(string pictureName);
    }

  3. Look at the PictureServer class, which is a service type that implements the IPictureServer contract. It returns the image requested by a client as a FileStream object:

    internal class PictureServer: IPictureServer
    {
        Stream IPictureServer.GetPicture(string pictureName)
        {
            try
            {
                return new FileStream(pictureName, FileMode.Open);
            }
            catch (Exception)
            {
                return null;
            }
        }
    }

  4. See how the service is configured in the App.Config file of the Service project:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <system.serviceModel>
            <services>
                <service type="Server.PictureServer">
                    <endpoint address="http://localhost:8000/Picture/Server"
                              binding="basicHttpBinding"
                              bindingConfiguration="StreamedHttp"
                              contract="Server.IPictureServer,Server"/>
                </service>
            </services>
            <bindings>
                <basicHttpBinding>
                    <binding
                        name="StreamedHttp"
                        transferMode="StreamedResponse"
                        maxMessageSize="9223372036854775807"/>
                </basicHttpBinding>
            </bindings>
        </system.serviceModel>
    </configuration>

    The standard Windows Communication Foundation BasicHttpBinding is selected for the service, but the value of the transfer mode property of that binding is set to StreamedResponse. Note that the value of the maxMessageSize property is set to a very large number, which happens to be the maximum value.

  5. Examine the client application's use of the GetPicture() operation of the service in the RetrievePicture() method of the MainForm.cs module of the Client project:

    private void RetrievePicture(object state)
    {
        if (this.InvokeRequired)
        {
            IPictureServer pictureServer =
                new ChannelFactory<IPictureServer>("PictureServer")
                    .CreateChannel();
            Stream pictureStream =
                pictureServer.GetPicture(
                    ConfigurationManager.AppSettings["PictureName"]);
            ((IChannel)pictureServer).Close();
            this.Invoke(
                new RetrievePictureDelegate(this.RetrievePicture),
                new object[] { pictureStream });
        }
        else
        {
            Bitmap bitMap = new Bitmap((Stream)state);
            this.Picture.Image = bitMap;
        }
    }

    The Stream object retrieved from the service via the GetPicture() operation is marshaled onto the user interface thread. Then it is displayed in the PictureBox control of the client application's form.

  6. Start debugging the application. The console window of the service should appear, along with the client application's form.

  7. When there is activity in the console window of the service, click the Get the Picture! button on the client application's form. After a moment, a picture, retrieved from the service, should appear on the client application's form, as shown in Figure 10.5.

    Figure 10.5. Retrieving a picture from a service using the streamed transfer mode.

  8. Stop debugging the application.

  9. Alter the App.Config file of the Service project as shown in Listing 10.2, thereby adding the mechanism introduced in Chapter 2 for logging the messages sent and received by the service.

    Listing 10.2. Streamed Transfer Configuration with Message Logging

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <system.diagnostics>
            <sources>
                <source
                    name="System.ServiceModel.MessageLogging"
                    switchValue="Verbose">
                    <listeners>
                        <add
                            name="xml"
                            type="System.Diagnostics.XmlWriterTraceListener"
                            initializeData="C:\WCFHandsOn\PublishSubscribe\Streaming\message.log" />
                    </listeners>
                </source>
            </sources>
            <trace autoflush="true" />
        </system.diagnostics>
        <system.serviceModel>
            <diagnostics>
                <messageLogging logEntireMessage="true"
                                maxMessagesToLog="300"
                                logMessagesAtServiceLevel="false"
                                logMalformedMessages="true"
                                logMessagesAtTransportLevel="true" />
            </diagnostics>
            <services>
                <service type="Server.PictureServer">
                    <endpoint address="http://localhost:8000/Picture/Server"
                              binding="basicHttpBinding"
                              bindingConfiguration="StreamedHttp"
                              contract="Server.IPictureServer,Server"/>
                </service>
            </services>
            <bindings>
                <basicHttpBinding>
                    <binding
                        name="StreamedHttp"
                        transferMode="StreamedResponse"
                        maxMessageSize="9223372036854775807"/>
                </basicHttpBinding>
            </bindings>
        </system.serviceModel>
    </configuration>

  10. Now repeat steps 6, 7, and 8.

  11. Open the file C:\WCFHandsOn\PublishSubscribe\Streaming\message.log.

    That file should contain the following record of the message by which the service replied to the client, showing that the stream object was incorporated in the body of a SOAP message:

    <s:Envelope
        xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing"
        xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
        <s:Header>
            <a:Action s:mustUnderstand="1">
            http://tempuri.org/IPictureServer/GetPictureResponse</a:Action>
            <a:To s:mustUnderstand="1">
            http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous
            </a:To>
        </s:Header>
        <s:Body>... stream ...</s:Body>
    </s:Envelope>

  12. Alter the App.Config file of the Client project to omit the adjustment to the maximum message size of the binding:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <appSettings>
            <add key="PictureName" value="Pony.jpg"/>
        </appSettings>
        <system.serviceModel>
            <client>
                <endpoint name="PictureServer"
                          address="http://localhost:8000/Picture/Server"
                          binding="basicHttpBinding"
                          bindingConfiguration="StreamedHttp"
                          contract="Client.IPictureServer,Client"/>
            </client>
            <bindings>
                <basicHttpBinding>
                    <binding name="StreamedHttp"
                             transferMode="StreamedResponse"
                    />
                </basicHttpBinding>
            </bindings>
        </system.serviceModel>
    </configuration>

  13. Repeat, once again, steps 6, 7, and 8.

On this occasion, the client application should throw an exception, as shown in Figure 10.6.

Figure 10.6. Exceeding the maximum message size.


This solution has demonstrated how to select the streamed transfer mode for the BasicHttpBinding. It has also shown that one can transmit a Stream object using the Windows Communication Foundation. By logging and examining the messages transmitted by the service, it became apparent that, by default, the Windows Communication Foundation sends streams embedded within SOAP messages. When the size of the stream causes the size of the message in which it is embedded to exceed the maximum message size of the binding, a trappable error is thrown.

However, the effect of the streamed transfer mode has remained mostly invisible. It has not yet been made apparent that the initial content of the stream was available to the client before the entire content of the stream was received.

Most important, this crucial line of code by which the service returned the stream to the client,

return new FileStream(pictureName, FileMode.Open);


does not reveal how individual data items can be sent progressively via a stream. That is what would be required in order to implement publish/subscribe using the Windows Communication Foundation's streamed transfer mode.

Transmitting a Custom Stream with the Streamed Transfer Mode

To see how individual data items can be fed through a stream, follow these steps:

  1. Open the solution C:\WCFHandsOn\PublishSubscribe\CustomStream\CustomStream.sln. It consists of two projects. The Client project is for building a console application that retrieves an image from a Windows Communication Foundation service. The service is built using the other project in the solution, called Service.

  2. Examine the interface IPictureServer in the Program.cs module of the Service project. It represents the same service contract that was used previously, with a single operation, GetPicture(), that returns a stream object:

    [ServiceContract]
    public interface IPictureServer
    {
        [OperationContract]
        Stream GetPicture(string pictureName);
    }


  3. See, however, that the PictureServer class that implements the IPictureServer contract is slightly altered from the earlier version. This time the stream that it returns is an instance of the CustomStream class:

    internal class PictureServer: IPictureServer
    {
        Stream IPictureServer.GetPicture(string pictureName)
        {
            try
            {
                CustomStream customStream = new CustomStream(pictureName);
                return customStream;
            }
            catch (Exception)
            {
                return null;
            }
        }
    }


  4. Study the definition of the CustomStream class in the CustomStream.cs module of the Service project (see Listing 10.3).

    Listing 10.3. A Custom Stream Class

    public class CustomStream: Stream
    {
        private string backingStore = null;
        private FileStream backingStream = null;
        private bool initialRead = true;
        private DateTime startRead;
        private long totalBytes = 0;

        private CustomStream()
        {
        }

        public CustomStream(string fileName)
        {
            this.backingStore = fileName;
        }

        [...]

        public override int Read(byte[] buffer, int offset, int count)
        {
            TimeSpan duration;
            if (this.initialRead)
            {
                this.startRead = DateTime.Now;
                this.initialRead = false;
            }
            else
            {
                Thread.Sleep(100);
            }
            Console.WriteLine(string.Format(
                "Reading {0} bytes from backing store.", count));
            if (this.backingStream == null)
            {
                this.backingStream = new FileStream(
                    this.backingStore,
                    FileMode.Open);
            }
            int bytesRead = this.backingStream.Read(buffer, offset, count);
            if (bytesRead <= 0)
            {
                this.backingStream.Close();
            }
            this.totalBytes += bytesRead;
            duration = (DateTime.Now - this.startRead);
            Console.WriteLine(
                "Sent {0} bytes in {1}:{2}.",
                this.totalBytes,
                duration.Seconds,
                duration.Milliseconds);
            return bytesRead;
        }

        [...]
    }

The CustomStream class derives from the abstract Stream class. Although it is required to override all the latter's abstract methods, it really only provides a substantive override for the Read() method. What the CustomStream class's Read() method does is return a chunk of the image requested by the client application, the maximum size of the chunk being specified by a parameter passed to the Read() method.

  5. Start debugging the application. The console window of the service application should appear, followed by the console window of the client application.

  6. When there is activity in the console window of the service application, enter a keystroke into the console window of the client application. The results should be as shown in Figure 10.7: while chunks of the image requested by the client are still being read from the CustomStream object within the service, the chunks already transmitted to the client are being read from the stream object within the client.

Figure 10.7. Using the streamed transfer mode with a custom stream class.


  7. Stop debugging the solution.

This makes the effect of the streamed transfer mode vividly apparent. In response to a single request from a client, data is being transmitted to the client in chunks. The chunks received by the client are immediately available for processing, before all the chunks have been sent by the service.
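Listing 10.3 elides the members of CustomStream other than Read(). As a minimal sketch of what a complete read-only, forward-only Stream subclass can look like, the following uses a hypothetical ChunkedReadOnlyStream class that hands out an in-memory byte array in chunks of a limited size. It is not the book's CustomStream, and the class, method, and parameter names are assumptions made for illustration. The ReadChunkSizes() helper plays the part of a consumer that handles each chunk as soon as Read() returns it.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical illustration (not the book's CustomStream): a forward-only,
// read-only stream that hands out an in-memory payload in limited chunks.
public sealed class ChunkedReadOnlyStream : Stream
{
    private readonly byte[] payload;
    private readonly int maxChunk;
    private int position;

    public ChunkedReadOnlyStream(byte[] payload, int maxChunk)
    {
        this.payload = payload;
        this.maxChunk = maxChunk;
    }

    // Only reading is supported; every other member is a stub.
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int remaining = this.payload.Length - this.position;
        int n = Math.Min(Math.Min(count, this.maxChunk), remaining);
        Array.Copy(this.payload, this.position, buffer, offset, n);
        this.position += n;
        return n; // zero tells the caller that the stream has ended
    }
}

public static class ChunkDemo
{
    // Reads the stream to its end and records the size of each chunk received.
    public static List<int> ReadChunkSizes(Stream source, int bufferSize)
    {
        List<int> sizes = new List<int>();
        byte[] buffer = new byte[bufferSize];
        int n;
        while ((n = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            sizes.Add(n); // a real consumer would process the first n bytes of buffer here
        }
        return sizes;
    }

    public static void Main()
    {
        Stream source = new ChunkedReadOnlyStream(new byte[2500], 1024);
        foreach (int size in ReadChunkSizes(source, 4096))
        {
            Console.WriteLine("Received a chunk of {0} bytes.", size);
        }
    }
}
```

The consumer never needs to know the total length in advance. It simply keeps calling Read() until zero bytes come back, which is why the unsupported Length and Position members do no harm here.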

In the service's implementation of the operation used by the client to retrieve the picture,

    internal class PictureServer: IPictureServer
    {
        Stream IPictureServer.GetPicture(string pictureName)
        {
            try
            {
                CustomStream customStream = new CustomStream(pictureName);
                return customStream;
            }
            catch (Exception)
            {
                return null;
            }
        }
    }


this single line of code,

return customStream;


causes the Windows Communication Foundation to send the initial parts of the response message to the client,

    <s:Envelope
        xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing"
        xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">
        <s:Header>
            <a:Action s:mustUnderstand="1">
            http://tempuri.org/IPictureServer/GetPictureResponse</a:Action>
            <a:To s:mustUnderstand="1">
            http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous
            </a:To>
        </s:Header>
        <s:Body>


and then calls the Read() method of the stream iteratively, requesting up to one kilobyte of data from it on each iteration. Each chunk of data retrieved in that manner is transmitted to the client:

... stream ...


When the Read() method returns zero bytes, the Windows Communication Foundation closes the stream and transmits the remainder of the message:

                       </s:Body> </s:Envelope>
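The sequence just described can be imitated in a few lines. The following is only a sketch of the pattern, not the Windows Communication Foundation's actual implementation: the ResponsePump class and its Pump() method are hypothetical names, the envelope is abbreviated to two fixed strings, and the chunks are written as raw bytes with no message encoding applied.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical sketch of the pattern described in the text. It is not how the
// Windows Communication Foundation is implemented internally.
public static class ResponsePump
{
    // Writes an abbreviated envelope opening, drains the body stream in chunks
    // of at most chunkSize bytes, then writes the envelope closing.
    public static long Pump(Stream body, Stream transport, int chunkSize)
    {
        byte[] head = Encoding.UTF8.GetBytes("<s:Envelope><s:Body>");
        byte[] tail = Encoding.UTF8.GetBytes("</s:Body></s:Envelope>");

        transport.Write(head, 0, head.Length);

        byte[] buffer = new byte[chunkSize];
        long total = 0;
        int n;
        while ((n = body.Read(buffer, 0, buffer.Length)) > 0)
        {
            transport.Write(buffer, 0, n); // each chunk goes out as soon as it is read
            transport.Flush();
            total += n;
        }

        body.Dispose();                    // zero bytes read: the stream is finished
        transport.Write(tail, 0, tail.Length);
        return total;
    }

    public static void Main()
    {
        MemoryStream body = new MemoryStream(Encoding.UTF8.GetBytes("picture-bytes"));
        MemoryStream transport = new MemoryStream();
        long sent = Pump(body, transport, 1024);
        Console.WriteLine("{0} body bytes sent.", sent);
        Console.WriteLine(Encoding.UTF8.GetString(transport.ToArray()));
    }
}
```

The point to notice is that the loop is driven entirely by the return value of Read(). A stream that delays inside Read(), as CustomStream does with Thread.Sleep(), slows the loop down without ending the message.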


Implementing Publish/Subscribe Using the Streamed Transfer Mode and a Custom Stream

Now it should be evident how to use the Windows Communication Foundation's streamed transfer mode to implement publish/subscribe. When the data to be published consists of small data items, and the subscribers require notifications with minimal delay, the publisher can send a stream to each of its subscribers using the Windows Communication Foundation's streamed transfer mode. The streams should be custom streams. The Windows Communication Foundation will invoke the Read() methods of the custom streams iteratively, requesting kilobytes of data to transmit to the subscribers. If the custom stream objects have updates available, they can provide those to the Windows Communication Foundation to publish to the subscribers. If no updates are available, the Read() methods of the streams can sleep until updates occur, or until some configurable timeout expires. If the timeout expires, zero bytes can be returned to the Windows Communication Foundation, which will close the stream. The subscriber can then choose to renew the subscription. The publisher buffers updates pertinent to the subscriber for a configurable period so that if the subscription is renewed, updates that occurred between the closing of the initial stream and the renewal of the subscription can be sent to the subscriber immediately upon the renewal.
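A Read() method that waits for updates and gives up after a timeout might be sketched as follows. This is not the code of the StreamingPublicationSubscription library: the NotificationStream class is a hypothetical name, and the sketch assumes that updates arrive as byte arrays on a BlockingCollection, a type from versions of the .NET Framework later than the one this chapter was written for.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Text;

// Hypothetical sketch: Read() blocks until an update is queued or the timeout
// expires, and returns zero on timeout so that the caller ends the stream.
public sealed class NotificationStream : Stream
{
    private readonly BlockingCollection<byte[]> updates;
    private readonly TimeSpan timeout;
    private byte[] current; // the update currently being handed out
    private int consumed;   // how many bytes of it have been returned so far

    public NotificationStream(BlockingCollection<byte[]> updates, TimeSpan timeout)
    {
        this.updates = updates;
        this.timeout = timeout;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        while (this.current == null)
        {
            byte[] next;
            if (!this.updates.TryTake(out next, this.timeout))
            {
                return 0; // no update within the timeout: end of stream
            }
            if (next != null && next.Length > 0) // skip empty updates
            {
                this.current = next;
                this.consumed = 0;
            }
        }
        int n = Math.Min(count, this.current.Length - this.consumed);
        Array.Copy(this.current, this.consumed, buffer, offset, n);
        this.consumed += n;
        if (this.consumed == this.current.Length)
        {
            this.current = null;
        }
        return n;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class NotificationDemo
{
    // Drains the stream until Read() returns zero and decodes what was received.
    public static string ReadToEnd(Stream stream)
    {
        MemoryStream received = new MemoryStream();
        byte[] buffer = new byte[1024];
        int n;
        while ((n = stream.Read(buffer, 0, buffer.Length)) > 0)
        {
            received.Write(buffer, 0, n);
        }
        return Encoding.UTF8.GetString(received.ToArray());
    }

    public static void Main()
    {
        BlockingCollection<byte[]> updates = new BlockingCollection<byte[]>();
        updates.Add(Encoding.UTF8.GetBytes("a=1;"));
        updates.Add(Encoding.UTF8.GetBytes("b=2;"));
        Stream stream = new NotificationStream(updates, TimeSpan.FromMilliseconds(50));
        Console.WriteLine(ReadToEnd(stream));
    }
}
```

In a publisher, another thread would keep adding updates to the collection while the stream is being read, so the timeout would only expire during a genuine lull in the data.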

If updates continue to be available, so that the custom streams continue to make data available to the Windows Communication Foundation as it iteratively calls their Read() methods, the maximum size of the messages in which the Windows Communication Foundation is embedding the data retrieved from the custom streams will eventually be exceeded. So there should be logic in the custom streams that detects when the maximum message size is about to be exceeded. That logic should cause the Windows Communication Foundation to close the current stream, after which a new stream is immediately opened to the subscriber.
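One way to approximate that logic is a wrapper that counts the bytes it has handed out and reports the end of the stream once a capacity is reached. The sketch below rests on assumptions: CapacityLimitedStream and BatchSizes() are hypothetical names, and the capacity counts only the bytes of the stream itself, whereas a real limit would also have to leave room for the envelope and for any encoding overhead.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical sketch: ends the stream once a byte capacity has been delivered,
// so that the caller can continue the same source in a fresh stream.
public sealed class CapacityLimitedStream : Stream
{
    private readonly Stream inner;
    private long remaining;

    public CapacityLimitedStream(Stream inner, long capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        this.inner = inner;
        this.remaining = capacity;
    }

    // True when the stream ended because of the capacity, not for lack of data.
    public bool CapacityReached { get; private set; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (this.remaining <= 0)
        {
            this.CapacityReached = true;
            return 0; // pretend the stream has ended
        }
        int allowed = (int)Math.Min(count, this.remaining);
        int n = this.inner.Read(buffer, offset, allowed);
        this.remaining -= n;
        return n;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class CapacityDemo
{
    // Drains the source through successive capacity-limited streams and
    // returns the number of bytes delivered by each one.
    public static List<long> BatchSizes(Stream source, long capacityPerStream)
    {
        List<long> sizes = new List<long>();
        byte[] buffer = new byte[1024];
        CapacityLimitedStream batch;
        do
        {
            batch = new CapacityLimitedStream(source, capacityPerStream);
            long total = 0;
            int n;
            while ((n = batch.Read(buffer, 0, buffer.Length)) > 0)
            {
                total += n;
            }
            sizes.Add(total);
        } while (batch.CapacityReached); // open a new stream only if the last one filled up
        return sizes;
    }

    public static void Main()
    {
        foreach (long size in BatchSizes(new MemoryStream(new byte[2500]), 1000))
        {
            Console.WriteLine("Batch of {0} bytes completed.", size);
        }
    }
}
```

The CapacityReached flag lets the caller tell the two reasons for ending apart: a stream that ended because it was full should be replaced at once, while one that ended because the source ran dry should not.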

All of these capabilities are implemented in a reusable library called StreamingPublicationSubscription that is included in the solution C:\WCFHandsOn\PublishSubscribe\StreamedPublishSubscribe\StreamedPublishSubcribe.sln. The key classes that it provides are the BufferedSubscriptionManager class and the NotificationStreamWriter class. The former is programmed to buffer data items to which subscriptions have been received for configurable periods, whereas the latter is programmed to retrieve data items from the BufferedSubscriptionManager and make them available to the Windows Communication Foundation. The NotificationStreamReader class is programmed to read the streams output by the NotificationStreamWriter class.
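The text does not describe the format in which the NotificationStreamWriter lays out data items in the stream. Purely to illustrate the idea of a writer and a reader agreeing on how individual items are delimited, the sketch below uses the length-prefixed strings of BinaryWriter and BinaryReader. The ItemFraming class and the shape of an item (an identifier and a value) are assumptions, not the library's actual format.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical framing of data items in a stream. This is an assumption made
// for illustration, not the format used by the library described in the text.
public static class ItemFraming
{
    // Writes one item as a length-prefixed identifier followed by an 8-byte value.
    public static void WriteItem(BinaryWriter writer, string identifier, double value)
    {
        writer.Write(identifier);
        writer.Write(value);
        writer.Flush();
    }

    // Reads items one at a time until the stream ends.
    public static List<KeyValuePair<string, double>> ReadItems(Stream stream)
    {
        List<KeyValuePair<string, double>> items = new List<KeyValuePair<string, double>>();
        BinaryReader reader = new BinaryReader(stream, Encoding.UTF8);
        try
        {
            while (true)
            {
                string identifier = reader.ReadString();
                double value = reader.ReadDouble();
                items.Add(new KeyValuePair<string, double>(identifier, value));
            }
        }
        catch (EndOfStreamException)
        {
            // The stream was closed by the sender: no more items.
        }
        return items;
    }

    public static void Main()
    {
        MemoryStream stream = new MemoryStream();
        BinaryWriter writer = new BinaryWriter(stream, Encoding.UTF8);
        WriteItem(writer, "Temperature", 21.5);
        WriteItem(writer, "Pressure", 101.3);

        stream.Position = 0; // rewind so that the same bytes can be read back
        foreach (KeyValuePair<string, double> item in ReadItems(stream))
        {
            Console.WriteLine("{0} = {1}", item.Key, item.Value);
        }
    }
}
```

Because each item carries its own length prefix, a reader can hand an item to the application as soon as its bytes have arrived, without waiting for the stream to end.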

To see these classes in action, and to understand how to use them to implement publish/subscribe solutions, follow these steps:

  1. Open the solution C:\WCFHandsOn\PublishSubscribe\StreamedPublishSubscribe\StreamedPublishSubcribe.sln. Besides the project for building the StreamingPublicationSubscription library, the solution also has the Subscriber project, for building a subscriber console application, and the PublisherServiceHost project, for building a console application to host the publisher built from the PublisherService project.

  2. Examine the ISubscriber interface in the ISubscriber.cs module of the PublisherService project. That interface defines the service contract that all subscribers are expected to implement. It defines a single operation, Notify(), that takes a Stream object as a parameter:

    [ServiceContract]
    public interface ISubscriber
    {
        [OperationContract(IsOneWay=true)]
        void Notify(Stream stream);
    }


  3. Look at the Subscribe() method of the PublisherService class in the PublisherService.cs module of the PublisherService project. That method, shown in Listing 10.4, executes when subscribers submit subscription requests. After validating the subscription and the subscriber, the method invokes the Activate() method of the PublishingAgent class on a background thread.

    Listing 10.4. Method for Processing Subscription Requests

    void IPublisher.Subscribe(
        KnownDataPoint[] dataPoints,
        out bool subscriptionAccepted)
    {
        subscriptionAccepted = false;
        string dataPointIdentifier = null;
        if (dataPoints.Length == 1)
        {
            dataPointIdentifier = dataPoints[0].Identifier;
            this.ValidateDataPoint(dataPointIdentifier, out subscriptionAccepted);
        }
        string configuration = null;
        if (subscriptionAccepted)
        {
            this.ValidateSubscriber(
                OperationContext.Current.ServiceSecurityContext.WindowsIdentity.Name,
                out subscriptionAccepted,
                out configuration);
        }
        if (subscriptionAccepted)
        {
            ThreadPool.QueueUserWorkItem(
                new WaitCallback(
                    ((IPublishingAgent)new PublishingAgent(
                        configuration,
                        dataPointIdentifier)).Activate),
                null);
        }
    }

  4. See what is done by the Activate() method of the PublishingAgent class, in the PublishingAgent.cs module of the PublisherService project. The method is reproduced in Listing 10.5.

    Listing 10.5. Activating a Publication Agent

    void IPublishingAgent.Activate(object state)
    {
        this.randomDataPoint.Active = true;
        NotificationStreamWriter writer = null;
        IBufferedSubscriptionManager bufferedDataSubscriptionManager
            = new BufferedSubscriptionManager(this.subscriberConfiguration, 100);
        bufferedDataSubscriptionManager.AddSubscription(this.randomDataPoint);
        while (true)
        {
            using (SubscriberProxy subscriberProxy
                = new SubscriberProxy(this.subscriberConfiguration))
            {
                ISubscriber subscriber = (ISubscriber)subscriberProxy;
                writer = new NotificationStreamWriter(
                    bufferedDataSubscriptionManager,
                    long.Parse(ConfigurationManager.AppSettings["MessageCapacity"]),
                    new TimeSpan(
                        0,
                        0,
                        0,
                        int.Parse(
                            ConfigurationManager.AppSettings["UpdateFrequencyInSeconds"])),
                    new TimeSpan(
                        0,
                        0,
                        0,
                        0,
                        int.Parse(
                            ConfigurationManager.AppSettings["DataSourceTimeoutInMilliseconds"])));
                subscriber.Notify(writer);
                subscriberProxy.Close();
                Console.WriteLine("Batch completed.");
            }
        }
    }

    The method adds details of the subscription to an instance of the BufferedSubscriptionManager class, which will begin buffering updates to the values of the data point to which the subscription pertains. Then the method invokes the subscriber's Notify() operation, passing an instance of a NotificationStreamWriter, which will then proceed to read updates from the BufferedSubscriptionManager and pass them to the Windows Communication Foundation for transmission.

  5. Start debugging the solution. The console windows of the Subscriber application and the PublisherServiceHost application should appear.

  6. When there is activity in the console window of the PublisherServiceHost application, enter a keystroke into the console of the Subscriber application. After a moment, updates from the publisher will begin to be registered in the console window of the subscriber.

  7. Watch for notification in the console window of the subscriber that the maximum size of a message incorporating a stream was about to be exceeded, resulting in that stream being closed and a new one automatically being provided by the publisher. This effect is shown in Figure 10.8.

Figure 10.8. Implementing publish/subscribe with the StreamingPublicationSubscription library.


  8. Stop debugging the solution.




Microsoft Windows Communication Foundation: Hands-on
ISBN: 0672328771
EAN: 2147483647
Year: 2006
Pages: 132
