All About Asynchronous Messaging

The previous part of this chapter covers only synchronous processing for one reason: it's a lot easier, and more consistent between client and server, than the other types of processing.

As you know from the examples and figures earlier in this chapter, several interfaces are available for message handling: on the client side, there are the IMessageSink and the IClientChannelSink interfaces. The two sink types handle asynchronous messages in substantially different ways, as described in the next sections.

Asynchronous IMessageSink Processing

When handling the messages in a synchronous manner in an IMessageSink chain, the response message will simply be the return value of the method call. You can see this in the following snippet, which shows a sample IMessageSink (I've omitted parts of the interface and only display the SyncProcessMessage() method here):

    public class MySink1: IMessageSink {
       IMessageSink _nextSink;

       public IMessage SyncProcessMessage(IMessage msg) {
          // here you can do something with the msg
          IMessage retMsg = _nextSink.SyncProcessMessage(msg);
          // here you can do something with the retMsg
          // and then, simply return the retMsg to the previous sink
          return retMsg;
       }
    }

With asynchronous processing, which is triggered whenever you use a delegate's BeginInvoke() method, the call to _nextSink.AsyncProcessMessage() returns immediately. The response message is sent to a secondary chain, which is passed in the replySink parameter of the call to AsyncProcessMessage().

First, I show you how to do asynchronous processing when you don't want to be notified of the call's return:

    public class MySink1: IMessageSink {
       IMessageSink _nextSink;

       public IMessageCtrl AsyncProcessMessage(IMessage msg,
                                               IMessageSink replySink)
       {
          // here you can do something with the msg
          return _nextSink.AsyncProcessMessage(msg, replySink);
       }
    }

In replySink, you'll receive the first entry to a chain of IMessageSink objects that want to be notified upon completion of the asynchronous call.

If you want to handle the reply message in a sink of your own, you have to instantiate a new IMessageSink object and "chain" it to the existing list of reply sinks. You can see this in the following snippet (again, parts of the interface have been omitted):

    public class MyReplySink: IMessageSink {
       IMessageSink _nextSink;

       public MyReplySink(IMessageSink next) {
          // .ctor used to connect this sink to the chain
          _nextSink = next;
       }

       public IMessage SyncProcessMessage(IMessage msg) {
          // the msg will be the reply message!
          // here you can do something with the msg
          // and then, pass it on to the next reply sink in the chain
          IMessage retMsg = _nextSink.SyncProcessMessage(msg);
          return retMsg;
       }
    }

    public class MySink1: IMessageSink {
       IMessageSink _nextSink;

       public IMessageCtrl AsyncProcessMessage(IMessage msg,
                                               IMessageSink replySink)
       {
          // here you can do something with the msg
          // create a new reply sink which is chained to the existing replySink
          IMessageSink myReplyChain = new MyReplySink(replySink);
          // call the next sink's async processing
          return _nextSink.AsyncProcessMessage(msg, myReplyChain);
       }
    }

When the async call is completed in this example, you'll have the option to change the reply message in the MyReplySink.SyncProcessMessage() method.

Note 

The reply message is processed synchronously; only the generation of the message happens asynchronously at the server.
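The reply-sink chaining described above can be tried out without a remoting infrastructure. The following is only a minimal sketch under stated assumptions: ToyMessage, IToySink, and all other Toy* types are hypothetical stand-ins, not the real System.Runtime.Remoting types, and AsyncProcessMessage() returns void here instead of an IMessageCtrl. It shows that each request sink wraps the incoming replySink, so the reply visits the sinks in the reverse order of the request.

```csharp
using System;

// Hypothetical, simplified stand-ins for IMessage and IMessageSink.
public class ToyMessage
{
    public string Body;
    public ToyMessage(string body) { Body = body; }
}

public interface IToySink
{
    ToyMessage SyncProcessMessage(ToyMessage msg);
    void AsyncProcessMessage(ToyMessage msg, IToySink replySink);
}

// End of the reply chain; plays the role of the AsyncResult object.
public class ToyResultSink : IToySink
{
    public ToyMessage Reply;
    public ToyMessage SyncProcessMessage(ToyMessage msg) { Reply = msg; return msg; }
    public void AsyncProcessMessage(ToyMessage msg, IToySink replySink)
    {
        throw new NotSupportedException();
    }
}

// Reply sink: marks the reply, then forwards it down the reply chain.
public class ToyReplySink : IToySink
{
    private readonly IToySink _next;
    private readonly string _name;
    public ToyReplySink(IToySink next, string name) { _next = next; _name = name; }
    public ToyMessage SyncProcessMessage(ToyMessage msg)
    {
        msg.Body += "|" + _name;
        return _next.SyncProcessMessage(msg);
    }
    public void AsyncProcessMessage(ToyMessage msg, IToySink replySink)
    {
        throw new NotSupportedException();
    }
}

// Request sink: chains its own reply sink in front of the existing one.
public class ToyRequestSink : IToySink
{
    private readonly IToySink _next;
    private readonly string _name;
    public ToyRequestSink(IToySink next, string name) { _next = next; _name = name; }
    public ToyMessage SyncProcessMessage(ToyMessage msg)
    {
        return _next.SyncProcessMessage(msg);
    }
    public void AsyncProcessMessage(ToyMessage msg, IToySink replySink)
    {
        _next.AsyncProcessMessage(msg, new ToyReplySink(replySink, _name));
    }
}

// Stands in for the transport: keeps the reply chain and completes later.
public class ToyTransport : IToySink
{
    private Action _pending;
    public ToyMessage SyncProcessMessage(ToyMessage msg)
    {
        return new ToyMessage("reply to " + msg.Body);
    }
    public void AsyncProcessMessage(ToyMessage msg, IToySink replySink)
    {
        // returns immediately; the reply is delivered when Complete() runs
        _pending = () => replySink.SyncProcessMessage(new ToyMessage("reply to " + msg.Body));
    }
    public void Complete() { _pending(); }
}

public static class ReplyChainDemo
{
    public static string Run()
    {
        ToyTransport transport = new ToyTransport();
        IToySink chain = new ToyRequestSink(new ToyRequestSink(transport, "inner"), "outer");
        ToyResultSink result = new ToyResultSink();
        chain.AsyncProcessMessage(new ToyMessage("ping"), result);
        transport.Complete();   // simulate the server's answer arriving
        return result.Reply.Body;
    }
}
```

ReplyChainDemo.Run() returns "reply to ping|inner|outer": the reply sink created by the inner request sink sees the reply first, and the final sink sees it last.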

Asynchronous IClientChannelSink Processing

As you can see in the following partial definition of the IClientChannelSink interface, there are, in contrast to the IMessageSink interface, distinct methods for handling the asynchronous request and reply.

    public interface IClientChannelSink {
       void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
                                IMessage msg,
                                ITransportHeaders headers,
                                Stream stream);

       void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
                                 object state,
                                 ITransportHeaders headers,
                                 Stream stream);
    }

When the AsyncProcessRequest() method is called while an IMessage travels through a sink chain, it will receive an IClientChannelSinkStack as a parameter. This sink stack contains all sinks that want to be notified when the asynchronous processing returns. If your sink wants to be included in this notification, it has to push itself onto this stack before calling the next sink's AsyncProcessRequest() method.

You can see this in the following snippet (only parts are shown):

    public class SomeSink: IClientChannelSink {
       IClientChannelSink _nextChnlSink;

       public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
                                       IMessage msg,
                                       ITransportHeaders headers,
                                       Stream stream)
       {
          // here you can work with the message

          // pushing this sink onto the stack
          sinkStack.Push(this, null);

          // calling the next sink
          _nextChnlSink.AsyncProcessRequest(sinkStack, msg, headers, stream);
       }
    }

The sinkStack object that is passed to AsyncProcessRequest() is of type ClientChannelSinkStack and implements the following IClientChannelSinkStack interface, which allows a sink to push itself onto the stack:

    public interface IClientChannelSinkStack : IClientResponseChannelSinkStack {
       object Pop(IClientChannelSink sink);
       void Push(IClientChannelSink sink, object state);
    }

Whatever is passed as the state parameter of Push() will be received by the sink's AsyncProcessResponse() method upon completion of the call. It can contain any information the sink might need while processing the response. For built-in sinks, this will be the original message that triggered the request.

As the previous interface extends the IClientResponseChannelSinkStack interface, I'll show you this one here as well:

    public interface IClientResponseChannelSinkStack {
       void AsyncProcessResponse(ITransportHeaders headers, Stream stream);
       void DispatchException(Exception e);
       void DispatchReplyMessage(IMessage msg);
    }

The AsyncProcessResponse() method of the ClientChannelSinkStack pops one sink from the stack and calls this sink's AsyncProcessResponse() method. Therefore, the "reverse" chaining that uses the sink stack works simply by recursively calling the stack's AsyncProcessResponse() method from each sink. This is shown in the following piece of code:

    public class SomeSink: IClientChannelSink {
       public void AsyncProcessResponse(
                   IClientResponseChannelSinkStack sinkStack,
                   object state,
                   ITransportHeaders headers,
                   Stream stream)
       {
          // here you can work with the stream or headers

          // calling the next sink via the sink stack
          sinkStack.AsyncProcessResponse(headers, stream);
       }
    }
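To make the push-then-pop mechanics concrete, here is a small self-contained model. It is a sketch under assumptions, not the framework's implementation: IToyChannelSink, ToySinkStack, ToyNamedSink, and ToyTransportSink are hypothetical stand-ins, the message is reduced to a string, and the empty-stack case simply records the payload (the real stack hands a deserialized message to the reply chain instead).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IClientChannelSink.
public interface IToyChannelSink
{
    void AsyncProcessRequest(ToySinkStack sinkStack, string payload);
    void AsyncProcessResponse(ToySinkStack sinkStack, object state, string payload);
}

// Hypothetical stand-in for ClientChannelSinkStack.
public class ToySinkStack
{
    private readonly Stack<KeyValuePair<IToyChannelSink, object>> _entries =
        new Stack<KeyValuePair<IToyChannelSink, object>>();
    public readonly List<string> Trace = new List<string>();

    public void Push(IToyChannelSink sink, object state)
    {
        _entries.Push(new KeyValuePair<IToyChannelSink, object>(sink, state));
    }

    // Pops the most recently pushed sink and hands it the response,
    // together with the state object that sink stored in Push().
    public void AsyncProcessResponse(string payload)
    {
        if (_entries.Count == 0)
        {
            Trace.Add("done: " + payload);   // simplification, see lead-in
            return;
        }
        KeyValuePair<IToyChannelSink, object> entry = _entries.Pop();
        entry.Key.AsyncProcessResponse(this, entry.Value, payload);
    }
}

public class ToyNamedSink : IToyChannelSink
{
    private readonly string _name;
    private readonly IToyChannelSink _next;
    public ToyNamedSink(string name, IToyChannelSink next) { _name = name; _next = next; }

    public void AsyncProcessRequest(ToySinkStack sinkStack, string payload)
    {
        // register for the response and remember something about the request
        sinkStack.Push(this, "request was '" + payload + "'");
        _next.AsyncProcessRequest(sinkStack, payload);
    }

    public void AsyncProcessResponse(ToySinkStack sinkStack, object state, string payload)
    {
        sinkStack.Trace.Add(_name + ": " + state);
        // hand control back to the stack, which calls the next sink
        sinkStack.AsyncProcessResponse(payload);
    }
}

// Stands in for the transport sink; here the response arrives right away.
public class ToyTransportSink : IToyChannelSink
{
    public void AsyncProcessRequest(ToySinkStack sinkStack, string payload)
    {
        sinkStack.AsyncProcessResponse("response to " + payload);
    }

    public void AsyncProcessResponse(ToySinkStack sinkStack, object state, string payload)
    {
        throw new NotSupportedException();
    }
}

public static class SinkStackDemo
{
    public static List<string> Run()
    {
        ToySinkStack stack = new ToySinkStack();
        IToyChannelSink chain =
            new ToyNamedSink("first", new ToyNamedSink("second", new ToyTransportSink()));
        chain.AsyncProcessRequest(stack, "ping");
        return stack.Trace;
    }
}
```

SinkStackDemo.Run() yields three trace entries: the sink named "second" is notified before "first", each receiving the state it pushed, and the final entry records the response once the stack is empty.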

Generating the Request

In the following text, I show you how an asynchronous request is generated and how the response is handled. The example I'll use is based on the following custom sinks:

  • MyMessageSink: A custom IMessageSink

  • MyResponseSink: A sink created by MyMessageSink as a handler for the asynchronous response

  • MyChannelSink: A custom IClientChannelSink implementation

Regardless of whether the request is synchronous or asynchronous, the IMessageSink chain is handled before any calls to an IClientChannelSink. Immediately before the first call to IClientChannelSink.AsyncProcessRequest(), a ClientChannelSinkStack object is instantiated and gets a reference to the IMessageSink reply chain. You can see the beginning of an asynchronous call in Figure 7-9.

Figure 7-9: First phase of an asynchronous call

In Figures 7-10 through 7-13, you'll see the contents of the replySink and the SinkStack that are passed as parameters at the points marked (1) through (4) in Figure 7-9. Figure 7-10 shows the predefined reply chain that is established for each asynchronous call before reaching the first IMessageSink. The purpose of these sinks is to handle the asynchronous response in a form that's compatible with the "conventional" delegate mechanism.

Figure 7-10: SinkStack before call to first custom IMessageSink

Figure 7-11: SinkStack before call to SoapClientFormatterSink

Figure 7-12: SinkStack before call to first custom IClientChannelSink

Figure 7-13: SinkStack before call to HttpClientTransportSink

Figure 7-14: Handling an asynchronous response

The final sink for the asynchronous reply will be the AsyncResult object that is returned from the delegate's BeginInvoke() method.

When MyMessageSink's AsyncProcessMessage() method is called, it generates a new reply sink, named MyReplySink, that is linked to the existing reply chain. You can see the replySink parameter that is passed to the next sink in Figure 7-11.

Figure 7-12 shows you the contents of the ClientChannelSinkStack after SoapClientFormatterSink has finished its processing. The stack contains a reference to the previous IMessageSink stack, shown in Figure 7-11, and points to the first entry in the stack of IClientFormatterSinks. This is quite interesting insofar as the SOAP formatter has been called as an IMessageSink but pushes itself onto the stack of IClientChannelSinks.

When the secondary custom IClientChannelSink object, MyChannelSink, is called, it pushes itself onto the stack and calls the AsyncProcessRequest() method of HttpClientTransportSink. In Figure 7-13 you can see the resulting channel sink stack before the HTTP request is sent.

Handling the Response

On receiving the HTTP response, HttpClientTransportSink calls AsyncProcessResponse() on the ClientChannelSinkStack. The sink stack then pops the first entry from the stack (using a different implementation than its public Pop() method) and calls AsyncProcessResponse() on that IClientChannelSink, which was at the top of the stack. You can see the sequence of calls that follow in Figure 7-14.

Before this call—the point marked with (1) in the diagram—the ClientChannelSinkStack will look the same as in Figure 7-13. You can see the state of this stack after the "pop" operation in Figure 7-15.

Figure 7-15: The ClientChannelSinkStack before the first sink is called

In the following step, the sink stack places a call to the custom MyChannelSink object. This sink will handle the call as shown in the following source code fragment and will therefore just proceed with invoking AsyncProcessResponse() on the sink stack again.

    public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
                                     object state,
                                     ITransportHeaders headers,
                                     Stream stream)
    {
       sinkStack.AsyncProcessResponse(headers, stream);
    }

The ClientChannelSinkStack now pops the next sink from the internal stack and forwards the call to it, in this case the SoapClientFormatterSink. You can see the state of the stack in Figure 7-16.

Figure 7-16: The stack before the call to the SOAP formatter

The SOAP formatter deserializes the HTTP response and creates a new IMessage object. This object is then passed as a parameter to the channel sink stack's DispatchReplyMessage() method.

The sink stack now calls SyncProcessMessage() on the first entry of its reply sink chain, which is shown as (4) in the sequence diagram. After this call, the IMessage travels through the sinks until it reaches the AsyncResult object.

This final sink will examine the response message and prepare the return values for the call to the delegate's EndInvoke() method.

Exception Handling

When an exception occurs during an IClientChannelSink's processing, it has to call DispatchException() on the sink stack. The stack in this case generates a new ReturnMessage object, passing the original exception as a parameter to its constructor. This newly created return message travels through the chain of IMessageSinks, and the exception will be "unwrapped" by the AsyncResult sink.
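The wrap-and-unwrap flow for exceptions can be sketched in a few lines. This is an illustration under assumptions rather than the framework's code: ToyReturnMessage, ToyDispatchStack, ToyAsyncResult, and ToyDecodingSink are hypothetical names, the reply chain is reduced to a single final sink, and End() only approximates what the caller sees from EndInvoke().

```csharp
using System;

// Hypothetical stand-in for a return message carrying a value or an exception.
public class ToyReturnMessage
{
    public readonly object ReturnValue;
    public readonly Exception Exception;
    public ToyReturnMessage(object returnValue, Exception exception)
    {
        ReturnValue = returnValue;
        Exception = exception;
    }
}

public interface IToyReplyTarget
{
    void SyncProcessMessage(ToyReturnMessage msg);
}

// Final reply sink; plays the role of the AsyncResult object.
public class ToyAsyncResult : IToyReplyTarget
{
    private ToyReturnMessage _reply;
    public void SyncProcessMessage(ToyReturnMessage msg) { _reply = msg; }

    // Unwraps the exception, roughly as the caller would see it from EndInvoke().
    public object End()
    {
        if (_reply.Exception != null) throw _reply.Exception;
        return _reply.ReturnValue;
    }
}

// Hypothetical stand-in for the channel sink stack's dispatch methods.
public class ToyDispatchStack
{
    private readonly IToyReplyTarget _replyChain;
    public ToyDispatchStack(IToyReplyTarget replyChain) { _replyChain = replyChain; }

    public void DispatchReplyMessage(ToyReturnMessage msg)
    {
        _replyChain.SyncProcessMessage(msg);
    }

    // Wraps the exception in a return message and sends it down the reply chain.
    public void DispatchException(Exception e)
    {
        DispatchReplyMessage(new ToyReturnMessage(null, e));
    }
}

// A channel sink whose response processing may fail.
public static class ToyDecodingSink
{
    public static void AsyncProcessResponse(ToyDispatchStack sinkStack, string payload)
    {
        string decoded;
        try
        {
            if (payload.Length == 0) throw new InvalidOperationException("empty response");
            decoded = payload.ToUpperInvariant();
        }
        catch (Exception e)
        {
            // do not let the exception escape; report it via the stack
            sinkStack.DispatchException(e);
            return;
        }
        sinkStack.DispatchReplyMessage(new ToyReturnMessage(decoded, null));
    }
}

public static class ExceptionDemo
{
    public static string Run(string payload)
    {
        ToyAsyncResult result = new ToyAsyncResult();
        ToyDecodingSink.AsyncProcessResponse(new ToyDispatchStack(result), payload);
        try { return "ok: " + result.End(); }
        catch (InvalidOperationException e) { return "failed: " + e.Message; }
    }
}
```

ExceptionDemo.Run("ping") returns "ok: PING", whereas ExceptionDemo.Run("") returns "failed: empty response": the exception raised inside the sink surfaces only when the caller collects the result.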

Server-Side Asynchronous Processing

On the server side, there are also two kinds of interfaces: IServerChannelSink and IMessageSink. The asynchronous processing for objects implementing IMessageSink is handled in the same way as on the client: the sink has to create another reply sink and pass this to the next sink's AsyncProcessMessage() method.

The handling of asynchronous messages for IServerChannelSink objects is a little bit different. You'll see the relevant parts of this interface below:

    public interface IServerChannelSink {
       ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
                                       IMessage requestMsg,
                                       ITransportHeaders requestHeaders,
                                       Stream requestStream,
                                       out IMessage responseMsg,
                                       out ITransportHeaders responseHeaders,
                                       out Stream responseStream);

       void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
                                 object state,
                                 IMessage msg,
                                 ITransportHeaders headers,
                                 Stream stream);
    }

When a serialized message is received by an object implementing this interface, it is not yet determined whether it will be handled synchronously or asynchronously. This can be determined only once the message has reached the formatter.

In the meantime, the sink has to assume that the response might be received asynchronously, and therefore will have to push itself (and a possible state object) onto a stack before calling the next sink.

The call to the next sink returns a ServerProcessing value that can be Complete, Async, or OneWay. The sink has to check this value and may do post-processing work in ProcessMessage() only if it is Complete. If the returned value is OneWay, the sink will not receive any further notification when the processing has finished.

When the next sink's return value is ServerProcessing.Async, the current sink will be notified via the sink stack when the processing has been completed. The sink stack will call AsyncProcessResponse() in this case. After the sink has completed the processing of the response, it has to call sinkStack.AsyncProcessResponse() to forward the call to further sinks.

A sample implementation of ProcessMessage() and AsyncProcessResponse() might look like this:

    public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
       IMessage requestMsg,
       ITransportHeaders requestHeaders,
       Stream requestStream,
       out IMessage responseMsg,
       out ITransportHeaders responseHeaders,
       out Stream responseStream)
    {
       // Handling of the request will be implemented here

       // pushing onto stack and forwarding the call
       sinkStack.Push(this, null);

       ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
          requestMsg,
          requestHeaders,
          requestStream,
          out responseMsg,
          out responseHeaders,
          out responseStream);

       if (srvProc == ServerProcessing.Complete) {
          // Handling of the response will be implemented here
       }

       // returning status information
       return srvProc;
    }

    public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
       object state,
       IMessage msg,
       ITransportHeaders headers,
       Stream stream)
    {
       // Handling of the response will be implemented here

       // forwarding to the stack for further processing
       sinkStack.AsyncProcessResponse(msg, headers, stream);
    }
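The three possible outcomes can be compared side by side in a reduced model. The sketch below rests on assumptions: ToyProcessing, IToyServerSink, ToyServerStack, ToyLoggingSink, and ToyDispatcher are hypothetical stand-ins, messages are plain strings, and the asynchronous completion is triggered by hand from the demo method. It shows that post-processing happens inline for Complete, through the stack for Async, and not at all for OneWay.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the ServerProcessing enumeration.
public enum ToyProcessing { Complete, Async, OneWay }

// Hypothetical stand-in for IServerChannelSink.
public interface IToyServerSink
{
    ToyProcessing ProcessMessage(ToyServerStack stack, string request, out string response);
    void AsyncProcessResponse(ToyServerStack stack, object state, string response);
}

// Hypothetical stand-in for the server-side sink stack.
public class ToyServerStack
{
    private readonly Stack<KeyValuePair<IToyServerSink, object>> _entries =
        new Stack<KeyValuePair<IToyServerSink, object>>();
    public readonly List<string> Trace = new List<string>();

    public void Push(IToyServerSink sink, object state)
    {
        _entries.Push(new KeyValuePair<IToyServerSink, object>(sink, state));
    }

    public void AsyncProcessResponse(string response)
    {
        if (_entries.Count == 0)
        {
            Trace.Add("sent: " + response);   // nothing left to notify
            return;
        }
        KeyValuePair<IToyServerSink, object> entry = _entries.Pop();
        entry.Key.AsyncProcessResponse(this, entry.Value, response);
    }
}

public class ToyLoggingSink : IToyServerSink
{
    private readonly IToyServerSink _next;
    public ToyLoggingSink(IToyServerSink next) { _next = next; }

    public ToyProcessing ProcessMessage(ToyServerStack stack, string request, out string response)
    {
        // the outcome is not known yet, so always register on the stack
        stack.Push(this, null);
        ToyProcessing status = _next.ProcessMessage(stack, request, out response);
        if (status == ToyProcessing.Complete)
        {
            stack.Trace.Add("inline post-processing: " + response);
        }
        return status;
    }

    public void AsyncProcessResponse(ToyServerStack stack, object state, string response)
    {
        stack.Trace.Add("deferred post-processing: " + response);
        stack.AsyncProcessResponse(response);
    }
}

// End of the chain; decides how the call is processed.
public class ToyDispatcher : IToyServerSink
{
    private readonly ToyProcessing _mode;
    public ToyDispatcher(ToyProcessing mode) { _mode = mode; }

    public ToyProcessing ProcessMessage(ToyServerStack stack, string request, out string response)
    {
        response = _mode == ToyProcessing.Complete ? "echo " + request : null;
        return _mode;
    }

    public void AsyncProcessResponse(ToyServerStack stack, object state, string response)
    {
        throw new NotSupportedException();
    }
}

public static class ServerDemo
{
    public static List<string> Run(ToyProcessing mode)
    {
        ToyServerStack stack = new ToyServerStack();
        IToyServerSink chain = new ToyLoggingSink(new ToyDispatcher(mode));
        string response;
        ToyProcessing status = chain.ProcessMessage(stack, "ping", out response);
        if (status == ToyProcessing.Async)
        {
            // the work finishes later; the result comes back through the stack
            stack.AsyncProcessResponse("echo ping");
        }
        return stack.Trace;
    }
}
```

ServerDemo.Run(ToyProcessing.Complete) records one inline entry, ServerDemo.Run(ToyProcessing.Async) records a deferred entry followed by a "sent" entry, and ServerDemo.Run(ToyProcessing.OneWay) records nothing.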




Advanced .NET Remoting (C# Edition)
ISBN: 1590590252
EAN: 2147483647
Year: 2002
Pages: 91
Authors: Ingo Rammer
