Creating a Compression Sink

The first thing to ask yourself before starting to implement a new feature for .NET Remoting is whether you'll want to work on the original message (that is, the underlying dictionary) or on the serialized message that you'll get from the stream. In the case of compression, you won't really care about the message's contents. Instead, you just want to compress the resulting stream on the client side and decompress it on the server side before it reaches the server's SoapFormatter.

You can see the planned client-side sink chain in Figure 9-1 and the server-side chain in Figure 9-2.

Figure 9-1: Client-side sink chain with the compression sink

Figure 9-2: Server-side sink chain with the compression sink

After having decided upon the two new sinks, you can identify all classes that need to be written:

  • CompressionClientSink: Implements IClientChannelSink, compresses the request stream, and decompresses the response stream

  • CompressionClientSinkProvider: Implements IClientChannelSinkProvider and is responsible for the creation of the sink

  • CompressionServerSink: Implements IServerChannelSink, decompresses the request, and compresses the response before it is sent back to the client

  • CompressionServerSinkProvider: Implements IServerChannelSinkProvider and creates the server-side sink

Unfortunately, version 1.x of the .NET Framework does not come with classes that natively support compression, so you have to use a third-party compression library. I recommend Mike Krueger's NZipLib (available from http://www.icsharpcode.net/OpenSource/NZipLib/default.asp). This is an open source C# library that is covered by a liberated GPL license. To quote the Web page: "In plain English, this means you can use this library in commercial closed-source applications."

Implementing the Client-Side Sink

The client-side sink extends BaseChannelSinkWithProperties and implements IClientChannelSink. In Listing 9-1, you can see a skeleton client channel sink. The positions at which you'll have to implement the preprocessing and postprocessing logic have been marked "TODO." This is a fully working sink—it simply doesn't do anything useful yet.

Listing 9-1: A Skeleton IClientChannelSink

    using System;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Remoting.Messaging;
    using System.IO;

    namespace CompressionSink
    {
        public class CompressionClientSink: BaseChannelSinkWithProperties,
                                            IClientChannelSink
        {
            private IClientChannelSink _nextSink;

            public CompressionClientSink(IClientChannelSink next)
            {
                _nextSink = next;
            }

            public IClientChannelSink NextChannelSink
            {
                get { return _nextSink; }
            }

            public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
                                            IMessage msg,
                                            ITransportHeaders headers,
                                            Stream stream)
            {
                // TODO: Implement the pre-processing
                sinkStack.Push(this, null);
                _nextSink.AsyncProcessRequest(sinkStack, msg, headers, stream);
            }

            public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
                                             object state,
                                             ITransportHeaders headers,
                                             Stream stream)
            {
                // TODO: Implement the post-processing
                sinkStack.AsyncProcessResponse(headers, stream);
            }

            public Stream GetRequestStream(IMessage msg,
                                           ITransportHeaders headers)
            {
                return _nextSink.GetRequestStream(msg, headers);
            }

            public void ProcessMessage(IMessage msg,
                                       ITransportHeaders requestHeaders,
                                       Stream requestStream,
                                       out ITransportHeaders responseHeaders,
                                       out Stream responseStream)
            {
                // TODO: Implement the pre-processing
                _nextSink.ProcessMessage(msg,
                                         requestHeaders,
                                         requestStream,
                                         out responseHeaders,
                                         out responseStream);
                // TODO: Implement the post-processing
            }
        }
    }

Before filling this sink with functionality, you create a helper class that communicates with the compression library and returns a compressed or uncompressed copy of a given stream. You can see this class in Listing 9-2.

Listing 9-2: Class Returning Compressed or Uncompressed Streams

    using System;
    using System.IO;
    using NZlib.Compression;
    using NZlib.Streams;

    namespace CompressionSink
    {
        public class CompressionHelper
        {
            public static Stream GetCompressedStreamCopy(Stream inStream)
            {
                Stream outStream = new System.IO.MemoryStream();
                DeflaterOutputStream compressStream = new DeflaterOutputStream(
                    outStream, new Deflater(Deflater.BEST_COMPRESSION));

                // copy the input to the compressing stream in 1000-byte chunks
                byte[] buf = new Byte[1000];
                int cnt = inStream.Read(buf, 0, 1000);
                while (cnt > 0)
                {
                    compressStream.Write(buf, 0, cnt);
                    cnt = inStream.Read(buf, 0, 1000);
                }
                compressStream.Finish();
                compressStream.Flush();
                return outStream;
            }

            public static Stream GetUncompressedStreamCopy(Stream inStream)
            {
                // decompression happens lazily while the caller reads
                return new InflaterInputStream(inStream);
            }
        }
    }
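For readers on a later version of the .NET Framework (2.0 or newer), the same helper shape can be sketched with the built-in System.IO.Compression.DeflateStream class instead of NZipLib. This is only an illustration under stated assumptions: the class name DeflateRoundTrip is hypothetical, both methods copy into a MemoryStream and rewind it before returning, and the raw deflate data written here is not necessarily wire-compatible with what NZipLib's DeflaterOutputStream produces.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical stand-in for CompressionHelper, built on the framework's
// own DeflateStream (available from .NET Framework 2.0 on), not on NZipLib.
public static class DeflateRoundTrip
{
    // Returns a rewound MemoryStream holding the deflated bytes of inStream.
    public static Stream GetCompressedStreamCopy(Stream inStream)
    {
        MemoryStream outStream = new MemoryStream();
        // third argument (leaveOpen = true) keeps outStream usable
        // after the DeflateStream has been disposed
        using (DeflateStream deflate =
            new DeflateStream(outStream, CompressionMode.Compress, true))
        {
            byte[] buf = new byte[4096];
            int cnt;
            while ((cnt = inStream.Read(buf, 0, buf.Length)) > 0)
            {
                deflate.Write(buf, 0, cnt);
            }
        }
        outStream.Position = 0; // rewind so the next reader starts at the beginning
        return outStream;
    }

    // Returns a rewound MemoryStream holding the inflated bytes of inStream.
    public static Stream GetUncompressedStreamCopy(Stream inStream)
    {
        MemoryStream outStream = new MemoryStream();
        using (DeflateStream inflate =
            new DeflateStream(inStream, CompressionMode.Decompress, true))
        {
            byte[] buf = new byte[4096];
            int cnt;
            while ((cnt = inflate.Read(buf, 0, buf.Length)) > 0)
            {
                outStream.Write(buf, 0, cnt);
            }
        }
        outStream.Position = 0;
        return outStream;
    }

    public static void Main()
    {
        byte[] original = Encoding.UTF8.GetBytes(new string('a', 1000));
        Stream compressed = GetCompressedStreamCopy(new MemoryStream(original));
        Stream restored = GetUncompressedStreamCopy(compressed);
        Console.WriteLine("original: {0} bytes, compressed: {1} bytes, restored: {2} bytes",
            original.Length, compressed.Length, restored.Length);
    }
}
```

Unlike Listing 9-2, this sketch decompresses eagerly into memory rather than wrapping the input stream, which trades memory for a seekable result with a known length.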

When implementing the compression functionality in the client-side sink, you have to deal with both synchronous and asynchronous processing. The synchronous implementation is quite straightforward. Before passing control further down the chain, the sink simply compresses the stream. When it has finished processing (that is, when the server has sent its response), the message sink will decompress the stream and return it to the calling function as an out parameter:

    public void ProcessMessage(IMessage msg,
                               ITransportHeaders requestHeaders,
                               Stream requestStream,
                               out ITransportHeaders responseHeaders,
                               out Stream responseStream)
    {
        // generate a compressed stream using NZipLib
        requestStream = CompressionHelper.GetCompressedStreamCopy(requestStream);

        // forward the call to the next sink
        _nextSink.ProcessMessage(msg, requestHeaders, requestStream,
                                 out responseHeaders, out responseStream);

        // uncompress the response
        responseStream = CompressionHelper.GetUncompressedStreamCopy(responseStream);
    }

As you've seen in the previous chapter, asynchronous handling is split between two methods. In the current example, you add the compression to AsyncProcessRequest() and the decompression to AsyncProcessResponse(), as shown in the following piece of code:

    public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
                                    IMessage msg,
                                    ITransportHeaders headers,
                                    Stream stream)
    {
        // generate a compressed stream using NZipLib
        stream = CompressionHelper.GetCompressedStreamCopy(stream);

        // push onto stack and forward the request
        sinkStack.Push(this, null);
        _nextSink.AsyncProcessRequest(sinkStack, msg, headers, stream);
    }

    public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
                                     object state,
                                     ITransportHeaders headers,
                                     Stream stream)
    {
        // uncompress the response
        stream = CompressionHelper.GetUncompressedStreamCopy(stream);

        // forward the response
        sinkStack.AsyncProcessResponse(headers, stream);
    }

Implementing the Server-Side Sink

The server-side sink's task is to decompress the incoming stream before passing it on to the formatter. In Listing 9-3, you can see a skeleton IServerChannelSink.

Listing 9-3: A Basic IServerChannelSink

    using System;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Remoting.Messaging;
    using System.IO;

    namespace CompressionSink
    {
        public class CompressionServerSink: BaseChannelSinkWithProperties,
                                            IServerChannelSink
        {
            private IServerChannelSink _nextSink;

            public CompressionServerSink(IServerChannelSink next)
            {
                _nextSink = next;
            }

            public IServerChannelSink NextChannelSink
            {
                get { return _nextSink; }
            }

            public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
                object state,
                IMessage msg,
                ITransportHeaders headers,
                Stream stream)
            {
                // TODO: Implement the post-processing

                // forwarding to the stack for further processing
                sinkStack.AsyncProcessResponse(msg, headers, stream);
            }

            public Stream GetResponseStream(IServerResponseChannelSinkStack sinkStack,
                object state,
                IMessage msg,
                ITransportHeaders headers)
            {
                return null;
            }

            public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
                IMessage requestMsg,
                ITransportHeaders requestHeaders,
                Stream requestStream,
                out IMessage responseMsg,
                out ITransportHeaders responseHeaders,
                out Stream responseStream)
            {
                // TODO: Implement the pre-processing

                // pushing onto stack and forwarding the call
                sinkStack.Push(this, null);
                ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
                    requestMsg,
                    requestHeaders,
                    requestStream,
                    out responseMsg,
                    out responseHeaders,
                    out responseStream);

                // TODO: Implement the post-processing

                // returning status information
                return srvProc;
            }
        }
    }

An interesting difference between client-side and server-side sinks is that the server-side sink does not distinguish between synchronous and asynchronous calls during the request stage. Only later in the sink stack will this decision be made and the call possibly returned asynchronously; therefore you always have to push the current sink onto the sinkStack whenever you want the response to be post-processed. To follow the preceding example, you implement ProcessMessage() and AsyncProcessResponse() to decompress the request and compress the response:

    public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
        IMessage requestMsg,
        ITransportHeaders requestHeaders,
        Stream requestStream,
        out IMessage responseMsg,
        out ITransportHeaders responseHeaders,
        out Stream responseStream)
    {
        // uncompressing the request
        requestStream = CompressionHelper.GetUncompressedStreamCopy(requestStream);

        // pushing onto stack and forwarding the call
        sinkStack.Push(this, null);
        ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
            requestMsg, requestHeaders, requestStream,
            out responseMsg, out responseHeaders, out responseStream);

        // compressing the response
        responseStream = CompressionHelper.GetCompressedStreamCopy(responseStream);

        // returning status information
        return srvProc;
    }

    public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
        object state,
        IMessage msg,
        ITransportHeaders headers,
        Stream stream)
    {
        // compressing the response
        stream = CompressionHelper.GetCompressedStreamCopy(stream);

        // forwarding to the stack for further processing
        sinkStack.AsyncProcessResponse(msg, headers, stream);
    }

Congratulations! If you've been following along with the examples, you have now finished your first channel sinks. To start using them, you only have to implement two providers that take care of the sink's initialization.

Creating the Sink Providers

Before you can use your sinks in a .NET Remoting application, you have to create a server-side and a client-side sink provider. These classes look nearly identical for most sinks you're going to implement.

In the CreateSink() method, you first create the next provider's sinks and then put the compression sink on top of the chain before returning it, as shown in Listing 9-4.

Listing 9-4: The Client-Side Sink Provider

    using System;
    using System.Runtime.Remoting.Channels;
    using System.Collections;

    namespace CompressionSink
    {
        public class CompressionClientSinkProvider: IClientChannelSinkProvider
        {
            private IClientChannelSinkProvider _nextProvider;

            public CompressionClientSinkProvider(IDictionary properties,
                ICollection providerData)
            {
                // not yet needed
            }

            public IClientChannelSinkProvider Next
            {
                get { return _nextProvider; }
                set { _nextProvider = value; }
            }

            public IClientChannelSink CreateSink(IChannelSender channel,
                string url,
                object remoteChannelData)
            {
                // create other sinks in the chain
                IClientChannelSink next = _nextProvider.CreateSink(channel,
                    url,
                    remoteChannelData);

                // put our sink on top of the chain and return it
                return new CompressionClientSink(next);
            }
        }
    }

The server-side sink provider that is shown in Listing 9-5 looks nearly identical, but returns IServerChannelSink instead of IClientChannelSink.

Listing 9-5: The Server-Side Sink Provider

    using System;
    using System.Runtime.Remoting.Channels;
    using System.Collections;

    namespace CompressionSink
    {
        public class CompressionServerSinkProvider: IServerChannelSinkProvider
        {
            private IServerChannelSinkProvider _nextProvider;

            public CompressionServerSinkProvider(IDictionary properties,
                ICollection providerData)
            {
                // not yet needed
            }

            public IServerChannelSinkProvider Next
            {
                get { return _nextProvider; }
                set { _nextProvider = value; }
            }

            public IServerChannelSink CreateSink(IChannelReceiver channel)
            {
                // create other sinks in the chain
                IServerChannelSink next = _nextProvider.CreateSink(channel);

                // put our sink on top of the chain and return it
                return new CompressionServerSink(next);
            }

            public void GetChannelData(IChannelDataStore channelData)
            {
                // not yet needed
            }
        }
    }

Using the Sinks

To use the sinks on the client and server side of a channel, you simply have to include them in your configuration files. In the client-side configuration file, you have to incorporate the information shown in the following code. If you place the CompressionSink assembly in the GAC, mind that you have to specify the complete strong name in the type attribute!

    <configuration>
      <system.runtime.remoting>
        <application>
          <channels>
            <channel ref="http">
              <clientProviders>
                <formatter ref="soap" />
                <provider
                  type="CompressionSink.CompressionClientSinkProvider, CompressionSink" />
              </clientProviders>
            </channel>
          </channels>
        </application>
      </system.runtime.remoting>
    </configuration>

The server-side configuration file will look similar to the following:

    <configuration>
      <system.runtime.remoting>
        <application>
          <channels>
            <channel ref="http" port="1234">
              <serverProviders>
                <provider
                  type="CompressionSink.CompressionServerSinkProvider, CompressionSink" />
                <formatter ref="soap" />
              </serverProviders>
            </channel>
          </channels>
        </application>
      </system.runtime.remoting>
    </configuration>

Figure 9-3 shows a TCP trace from a client/server connection that isn't using this sink, whereas Figure 9-4 shows the improvement when compression is used.

Figure 9-3: TCP trace of an HTTP/SOAP connection[1]

Figure 9-4: TCP trace of an HTTP connection with compressed content

In the circled area, you can see that the HTTP Content-Length header goes down from 549 bytes to 234 bytes when using the compression sink, a reduction of roughly 57 percent.

Note 

This is a proof-of-concept example. Instead of using compression in this scenario, you could easily switch to binary encoding to save even more bytes to transfer. But keep in mind that the compression sink also works with the binary formatter!

Extending the Compression Sink

The server-side sink as presented in the previous section has at least one serious problem when used in real-world applications: it doesn't yet detect whether the stream is compressed and always tries to decompress it. This inevitably leads to an exception when the request stream has not been compressed.

In an average remoting scenario, you have two types of users. On the one hand, there are local (LAN) users who connect to the server via high-speed links. If these users compress their requests, it's quite possible that the stream compression would take up more time (in regard to client- and server-side CPU time plus transfer time) than the network transfer of the uncompressed stream would. On the other hand, you might have several remote users who connect via lines ranging from speedy T1s down to 9600 bps wireless devices. These users will quite certainly profit from sending requests in a compressed way.

The first step to take when implementing these additional capabilities in a channel sink is to determine how the server will know that the request stream is compressed. Generally this can be done by adding fields to the ITransportHeaders object that is passed as a parameter to ProcessMessage() and AsyncProcessRequest().

These headers are then transferred to the server and can be obtained by the server-side sink by using the ITransportHeaders that it receives as a parameter to its ProcessMessage() method. By convention, these additional headers should start with the prefix X-, so that you can simply add a statement like the following in the client-side sink's ProcessMessage() method to indicate that the content will be compressed:

 requestHeaders["X-Compress"]="yes"; 

The complete AsyncProcessRequest() method now looks like this:

    public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
                                    IMessage msg,
                                    ITransportHeaders headers,
                                    Stream stream)
    {
        headers["X-Compress"] = "yes";
        stream = CompressionHelper.GetCompressedStreamCopy(stream);

        // push onto stack and forward the request
        sinkStack.Push(this, null);
        _nextSink.AsyncProcessRequest(sinkStack, msg, headers, stream);
    }

When the server receives this request, it processes the message and replies with a compressed stream as well. The server also indicates this compression by setting the X-Compress header. The complete client-side code for AsyncProcessResponse() and ProcessMessage() will therefore look at the response headers and decompress the message if necessary:

    public void ProcessMessage(IMessage msg,
                               ITransportHeaders requestHeaders,
                               Stream requestStream,
                               out ITransportHeaders responseHeaders,
                               out Stream responseStream)
    {
        requestStream = CompressionHelper.GetCompressedStreamCopy(requestStream);
        requestHeaders["X-Compress"] = "yes";

        // forward the call to the next sink
        _nextSink.ProcessMessage(msg,
                                 requestHeaders,
                                 requestStream,
                                 out responseHeaders,
                                 out responseStream);

        // decompress the response if necessary
        String xcompress = (String) responseHeaders["X-Compress"];
        if (xcompress != null && xcompress == "yes")
        {
            responseStream =
                CompressionHelper.GetUncompressedStreamCopy(responseStream);
        }
    }

    public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
                                     object state,
                                     ITransportHeaders headers,
                                     Stream stream)
    {
        // decompress the stream if necessary
        String xcompress = (String) headers["X-Compress"];
        if (xcompress != null && xcompress == "yes")
        {
            stream = CompressionHelper.GetUncompressedStreamCopy(stream);
        }

        // forward the response
        sinkStack.AsyncProcessResponse(headers, stream);
    }

The server-side channel sink's ProcessMessage() method works a little bit differently. As you've seen in Chapter 7, when the message reaches this method, it's not yet determined if the call will be executed synchronously or asynchronously. Therefore the sink has to push itself onto a sink stack that will be used when replying asynchronously.

As the AsyncProcessResponse() method for the channel sink has to know whether the original request has been compressed or not, you'll need to use the second parameter of the sinkStack.Push() method, which is called during ProcessMessage(). In this parameter you can put any object that enables you to later determine the state of the request. This state object will be passed as a parameter to AsyncProcessResponse(). The complete server-side implementation of ProcessMessage() and AsyncProcessResponse() therefore looks like this:

    public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
        IMessage requestMsg,
        ITransportHeaders requestHeaders,
        Stream requestStream,
        out IMessage responseMsg,
        out ITransportHeaders responseHeaders,
        out Stream responseStream)
    {
        bool isCompressed = false;

        // decompress the stream if necessary
        String xcompress = (String) requestHeaders["X-Compress"];
        if (xcompress != null && xcompress == "yes")
        {
            requestStream = CompressionHelper.GetUncompressedStreamCopy(requestStream);
            isCompressed = true;
        }

        // Pushing onto stack and forwarding the call.
        // The state object contains true if the request has been compressed,
        // else false.
        sinkStack.Push(this, isCompressed);
        ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
            requestMsg,
            requestHeaders,
            requestStream,
            out responseMsg,
            out responseHeaders,
            out responseStream);

        if (srvProc == ServerProcessing.Complete)
        {
            // compressing the response if necessary
            if (isCompressed)
            {
                responseStream =
                    CompressionHelper.GetCompressedStreamCopy(responseStream);
                responseHeaders["X-Compress"] = "yes";
            }
        }

        // returning status information
        return srvProc;
    }

    public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
        object state,
        IMessage msg,
        ITransportHeaders headers,
        Stream stream)
    {
        // fetching the flag from the async-state
        bool hasBeenCompressed = (bool) state;

        // compressing the response if necessary
        if (hasBeenCompressed)
        {
            stream = CompressionHelper.GetCompressedStreamCopy(stream);
            headers["X-Compress"] = "yes";
        }

        // forwarding to the stack for further processing
        sinkStack.AsyncProcessResponse(msg, headers, stream);
    }
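The header handshake described above can be tried out without any remoting infrastructure. The following sketch is a simplified simulation, not the sink code itself: all names (CompressionNegotiationSketch, ClientSend, ServerReceive, ServerReply, ClientReceive) are hypothetical, a Hashtable plays the role of ITransportHeaders, a bool passed between the two server methods stands in for the state object handed to sinkStack.Push(), and compression uses the framework's DeflateStream (.NET Framework 2.0 or newer) rather than NZipLib.

```csharp
using System;
using System.Collections;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical simulation of the X-Compress handshake; plain methods
// replace the sinks, and a Hashtable replaces ITransportHeaders.
public static class CompressionNegotiationSketch
{
    public const string HeaderName = "X-Compress";

    static byte[] Deflate(byte[] data)
    {
        MemoryStream ms = new MemoryStream();
        using (DeflateStream ds = new DeflateStream(ms, CompressionMode.Compress, true))
        {
            ds.Write(data, 0, data.Length);
        }
        return ms.ToArray();
    }

    static byte[] Inflate(byte[] data)
    {
        MemoryStream result = new MemoryStream();
        using (DeflateStream ds =
            new DeflateStream(new MemoryStream(data), CompressionMode.Decompress))
        {
            byte[] buf = new byte[4096];
            int cnt;
            while ((cnt = ds.Read(buf, 0, buf.Length)) > 0)
            {
                result.Write(buf, 0, cnt);
            }
        }
        return result.ToArray();
    }

    // Client, request stage: compress and flag the request only if asked to.
    public static byte[] ClientSend(byte[] body, IDictionary headers, bool compress)
    {
        if (!compress) return body;
        headers[HeaderName] = "yes";
        return Deflate(body);
    }

    // Server, request stage: decompress only when the header is present.
    // wasCompressed plays the role of the state pushed onto the sink stack.
    public static byte[] ServerReceive(byte[] body, IDictionary headers,
                                       out bool wasCompressed)
    {
        wasCompressed = "yes".Equals(headers[HeaderName] as string);
        return wasCompressed ? Inflate(body) : body;
    }

    // Server, response stage: mirror the client's choice.
    public static byte[] ServerReply(byte[] body, IDictionary headers,
                                     bool wasCompressed)
    {
        if (!wasCompressed) return body;
        headers[HeaderName] = "yes";
        return Deflate(body);
    }

    // Client, response stage: decompress only when the server flagged it.
    public static byte[] ClientReceive(byte[] body, IDictionary headers)
    {
        return "yes".Equals(headers[HeaderName] as string) ? Inflate(body) : body;
    }

    public static void Main()
    {
        foreach (bool compress in new bool[] { true, false })
        {
            Hashtable requestHeaders = new Hashtable();
            byte[] wire = ClientSend(Encoding.UTF8.GetBytes("ping"),
                                     requestHeaders, compress);

            bool state;
            string request = Encoding.UTF8.GetString(
                ServerReceive(wire, requestHeaders, out state));

            Hashtable responseHeaders = new Hashtable();
            byte[] reply = ServerReply(Encoding.UTF8.GetBytes(request + "-pong"),
                                       responseHeaders, state);

            string response = Encoding.UTF8.GetString(
                ClientReceive(reply, responseHeaders));
            Console.WriteLine("compress={0}: response={1}, header set={2}",
                compress, response, responseHeaders.Contains(HeaderName));
        }
    }
}
```

In both passes the client ends up with the text ping-pong; only the first pass sets X-Compress on the response. The point of the design is that a client that never sends the header is served uncompressed, so compressing and non-compressing clients can share one server.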

As you can see in Figure 9-5, which shows the HTTP request, and Figure 9-6, which shows the corresponding response, the complete transfer is compressed and the custom HTTP header X-Compress is populated.

Figure 9-5: The compressed HTTP request

Figure 9-6: The compressed HTTP response

[1] You can get this tcpTrace tool at http://www.pocketsoap.com.




Advanced .NET Remoting (C# Edition)
ISBN: 1590590252
EAN: 2147483647
Year: 2002
Pages: 91
Authors: Ingo Rammer
