10.2 Asynchronous Message Passing


In asynchronous message passing, the send operation does not block as it does in the synchronous scheme; the sender simply continues. Messages that have been sent but not yet received are held in a message queue. Senders add messages to the tail of the queue and a receiver removes messages from the head. The abstraction we use for asynchronous message communication is termed a port. As shown in Figure 10.6, many senders may send to a port but only a single receiver may receive messages from it. Communication is said to be many-to-one.

Figure 10.6: Asynchronous message-passing port.

A port is a (conceptually) unbounded first-in-first-out (FIFO) queue of messages. The send and receive operations on ports are defined as follows:

send(e, p) – send the value of the expression e to port p. The process calling the send operation is not blocked. The message e is queued at the port if the receiver is not waiting.

v = receive(p) – receive a value into local variable v from port p. The calling process is blocked if there are no messages queued to the port.

This form of communication is termed asynchronous since the sender proceeds independently of the receiver. Synchronization occurs only when the queue of messages at the port is empty and the receiver must wait for a sender. If send operations can occur more frequently than receive operations, then there is no upper bound on the length of queue required and consequently no upper bound on the amount of store required to buffer messages. Obviously, in a real computer system there is a fixed bound on the buffer space available. It is the responsibility of the designer to ensure that a message-passing program does not exceed this bound.

A process may selectively wait for messages from a set of ports using exactly the same select construct described earlier for synchronous message passing.

10.2.1 Asynchronous Message Passing in Java

We can implement asynchronous message passing in Java in the same way as we implemented channels. The outline of the Java class that implements the port abstraction is described in Program 10.5.

Program 10.5: Port class.

 class Port<T> extends Selectable {
   . . .
   public synchronized void send(T v) {. . .}
   public synchronized T receive()
              throws InterruptedException {. . .}
 }

The Port class extends the Selectable base class so that receive operations on ports can be combined in selective receive operations as described in section 10.1.4 for channel receives. In fact, the implementation permits channels and ports to be combined in the same Select object since they are both derived from the same Selectable base class.
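Program 10.5 elides the method bodies. As a rough illustration of the send and receive semantics defined above, the following is a minimal sketch of a stand-alone port built on a Java monitor. It is not the book's Port class: the name SimplePort, the size() helper and the demo class are assumptions introduced here, and the Selectable base class (and therefore selective receive) is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-alone port; NOT the book's Port class (no Selectable support).
class SimplePort<T> {
    // Conceptually unbounded FIFO queue of messages sent but not yet received.
    // Note: ArrayDeque rejects null, so null messages are not supported here.
    private final Queue<T> queue = new ArrayDeque<>();

    // Never blocks: append the message to the tail and wake a waiting receiver.
    public synchronized void send(T v) {
        queue.add(v);
        notifyAll();
    }

    // Blocks while the queue is empty, then removes the message at the head.
    public synchronized T receive() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        return queue.remove();
    }

    // Number of queued messages; added only so the queue can be inspected.
    public synchronized int size() {
        return queue.size();
    }
}

class SimplePortDemo {
    public static void main(String[] args) throws InterruptedException {
        SimplePort<Integer> port = new SimplePort<>();
        port.send(1);                       // returns immediately, no receiver needed
        port.send(2);
        System.out.println(port.receive()); // prints 1 (head of the queue)
        System.out.println(port.receive()); // prints 2
    }
}
```

The while loop around wait() re-checks the queue after every wake-up, which is the usual guard against spurious wake-ups in a Java monitor.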

The operation of asynchronous message communication can be observed using the applet depicted in Figure 10.7. The demonstration program has two sender threads which each send a sequence of messages with values 0..9 to the receiver thread via a port. The receiver thread receives a sequence of values, which is a merge of the two sequences sent. The display depicts the situation in which Sender1 is about to send the value 9 and Sender2 is about to send the value 8. The port is currently empty and the receiver is blocked. The receiver thread performs four receive operations on every revolution of its display while the senders perform a single send on each revolution. Consequently, if all three threads are running, the port will have a maximum of two messages queued and for most of the time it will be empty. However, if the receiver thread is suspended using the Pause button, then the senders continue to run, queuing messages in the port. In this situation, the applet will eventually terminate with a Java OutOfMemoryError. The code for sender and receiver threads is given in Program 10.6.

Figure 10.7: Asynchronous message-passing applet display.

Program 10.6: Asender and Areceiver classes.

 class Asender implements Runnable {
   private Port<Integer> port;
   private SlotCanvas display;

   Asender(Port<Integer> p, SlotCanvas d)
     {port=p; display=d;}

   public void run() {
     try {
       int ei = 0;
       while(true) {
         display.enter(String.valueOf(ei));
         ThreadPanel.rotate(90);
         port.send(new Integer(ei));
         display.leave(String.valueOf(ei));
         ei=(ei+1)%10;
         ThreadPanel.rotate(270);
       }
     } catch (InterruptedException e){}
   }
 }

 class Areceiver implements Runnable {
   private Port<Integer> port;
   private SlotCanvas display;

   Areceiver(Port<Integer> p, SlotCanvas d)
     {port=p; display=d;}

   public void run() {
     try {
       Integer v=null;
       while(true) {
         ThreadPanel.rotate(45);
         if (v!=null) display.leave(v.toString());
         ThreadPanel.rotate(45);
         v = port.receive();
         display.enter(v.toString());
       }
     } catch (InterruptedException e){}
   }
 }

The threads and port are created by the Java code:

 Port<Integer> port = new Port<Integer>();
 tx1.start(new Asender(port,send1disp));
 tx2.start(new Asender(port,send2disp));
 rx.start(new Areceiver(port,recvdisp));

where tx1, tx2 and rx are instances of ThreadPanel and send1disp, send2disp and recvdisp are instances of SlotCanvas.
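The merging behavior can also be tried without the applet classes. The sketch below is an assumption-laden stand-in rather than the book's program: it uses plain threads in place of ThreadPanel, drops SlotCanvas entirely, and substitutes java.util.concurrent.LinkedBlockingQueue for the Port class. The class name MergeDemo, the "s1"/"s2" tags and the fixed count of ten messages per sender are all introduced here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical console version of the two-senders, one-receiver demonstration.
class MergeDemo {
    // Runs both senders to completion and returns the messages in arrival order.
    static List<String> run() {
        // Stand-in for the port: add() does not wait for a receiver,
        // take() blocks while the queue is empty.
        BlockingQueue<String> port = new LinkedBlockingQueue<>();
        Thread s1 = new Thread(() -> sendAll(port, "s1"));
        Thread s2 = new Thread(() -> sendAll(port, "s2"));
        s1.start();
        s2.start();

        // The calling thread plays the role of the single receiver.
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < 20; i++) {
                received.add(port.take());
            }
            s1.join();
            s2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    // Each sender sends the values 0..9 once, tagged with its own name.
    static void sendAll(BlockingQueue<String> port, String name) {
        for (int v = 0; v < 10; v++) {
            port.add(name + ":" + v);
        }
    }

    public static void main(String[] args) {
        // The interleaving varies from run to run; each sender's own
        // values always appear in ascending order.
        System.out.println(run());
    }
}
```

Because the queue is FIFO and each sender adds its values in order, the received sequence is always some merge of the two sent sequences, although which merge is not determined.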

10.2.2 Modeling Asynchronous Message Passing

We modeled synchronous communication directly as actions shared between two processes. A channel was modeled as the name of a shared action. Modeling asynchronous communication is considerably more difficult. The first difficulty arises because of the potentially unbounded size of port queues. As discussed earlier, models must be finite so that we can carry out analysis. Consequently, we cannot model a port with an unbounded queue. Instead, we use the solution we adopted in modeling semaphores in Chapter 5 and model a finite entity that causes an error when it overflows. The model for a port that can queue up to three messages is:

 range M = 0..9
 set   S = {[M],[M][M]}

 PORT            // empty state, only send permitted
   = (send[x:M] -> PORT[x]),
 PORT[h:M]       // one message queued to port
   = (send[x:M] -> PORT[x][h]
     |receive[h] -> PORT
     ),
 PORT[t:S][h:M]  // two or more messages queued to port
   = (send[x:M] -> PORT[x][t][h]
     |receive[h] -> PORT[t]
     ).

The set S defines the set of values that can be taken by the tail of the queue when the queue contains two or more messages. However, care must be taken when modeling queues: the port described above, for a queue of up to three messages, generates a labeled transition system with 1111 states. A port that queues up to four messages can be produced by redefining the set S to be {[M],[M][M],[M][M][M]}; this model would generate an LTS with 11111 states. In general, the model of a queue with n places for messages, each of which can take up to x distinct values, requires (x^(n+1) - 1)/(x - 1) states. In modeling asynchronous message-passing programs, we must therefore restrict both the range of data values that a message can take and the size of queues. Otherwise, intractably large models are produced.

The port model with 1111 states is clearly too large to view as a graph. To check that the model describes the correct behavior, we can abstract from the value of messages and examine only send and receive actions. To do this, we relabel the send and receive actions as shown below:

 ||APORT = PORT
           /{send/send[M],receive/receive[M]}.

The minimized LTS for this abstracted port, called APORT, consists of only four states and is depicted in Figure 10.8.

Figure 10.8: APORT labeled transition system.

Figure 10.8 clearly shows that the port accepts up to three consecutive sends. A fourth send causes the port queue to overflow and the port to move to the ERROR state.
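The state counts quoted for PORT can be reproduced with a few lines of arithmetic. The sketch below reads the general formula as a geometric series, 1 + x + x^2 + ... + x^n, with one term for each possible queue length from 0 to n; that reading, and the class and method names, are assumptions made here for illustration.

```java
// Hypothetical helper for checking the port state counts quoted in the text.
class PortStates {
    // Sum 1 + x + x^2 + ... + x^n: one term per queue length 0..n,
    // each term counting the possible contents of a queue of that length.
    static long bySum(int x, int n) {
        long total = 0;
        long power = 1;
        for (int k = 0; k <= n; k++) {
            total += power;
            power *= x;
        }
        return total;
    }

    // Closed form (x^(n+1) - 1)/(x - 1); only valid for x > 1.
    static long byFormula(int x, int n) {
        long power = 1;
        for (int k = 0; k <= n; k++) {
            power *= x;
        }
        return (power - 1) / (x - 1);
    }

    public static void main(String[] args) {
        System.out.println(bySum(10, 3));     // prints 1111
        System.out.println(byFormula(10, 3)); // prints 1111
        System.out.println(bySum(10, 4));     // prints 11111
        System.out.println(bySum(1, 3));      // prints 4
    }
}
```

The last line treats all messages as a single value, which is what the relabeling does. The sum then collapses to n + 1 = 4, which agrees with the four states reported for the minimized APORT.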

With the model for a port, we can now provide a complete model for the example program of the previous section. The Sender and Receiver threads are modeled as:

 ASENDER      = ASENDER[0],
 ASENDER[e:M] = (port.send[e]->ASENDER[(e+1)%10]).

 ARECEIVER    = (port.receive[v:M]->ARECEIVER).

The composition of two ASENDER processes and an ARECEIVER process communicating by a port is described in Figure 10.9.

Figure 10.9: Asynchronous message applet model.

A safety analysis of AsyncMsg produces the following output:

 Trace to property violation in port:PORT:
      s.1.port.send.0
      s.1.port.send.1
      s.1.port.send.2
      s.1.port.send.3

This is the situation where a fourth consecutive send causes the port queue to overflow. Since the model abstracts from time, it takes no account of the fact that in the implementation, we have made the receiver run faster than the senders. However, queue overflow (or rather memory overflow) can occur in the implementation if we slow the receiver by suspending it. The demonstration applet is inherently unsafe since, no matter how large the port queue, it can eventually overflow.
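The violation trace can be replayed against a small Java analogue of the bounded port model. This is only a sketch: BoundedPortModel and its methods are hypothetical names introduced here, the class is single-threaded, and it mirrors the three-place PORT model rather than the unbounded Port class used by the applet.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded analogue of the bounded PORT model.
class BoundedPortModel {
    private final int capacity;
    private final Deque<Integer> queue = new ArrayDeque<>();
    private boolean error = false;

    BoundedPortModel(int capacity) {
        this.capacity = capacity;
    }

    // Accepts the message if there is room; otherwise enters the error
    // state, mirroring the move to ERROR in the model, and returns false.
    boolean send(int v) {
        if (error || queue.size() == capacity) {
            error = true;
            return false;
        }
        queue.addLast(v);
        return true;
    }

    // Returns the head of the queue, or null if the queue is empty or the
    // port is in the error state (the model offers no receive in either case).
    Integer receive() {
        return error ? null : queue.pollFirst();
    }

    boolean inError() {
        return error;
    }

    public static void main(String[] args) {
        BoundedPortModel port = new BoundedPortModel(3);
        // Replay the trace: four consecutive sends with no receive.
        for (int v = 0; v <= 3; v++) {
            System.out.println("send." + v + " accepted=" + port.send(v));
        }
        // The first three sends are accepted and the fourth is rejected.
        System.out.println("ERROR state reached: " + port.inError()); // prints true
    }
}
```

Raising the capacity only lengthens the run of sends needed before the error state is reached, which matches the observation that no finite queue size makes the applet safe.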




Concurrency: State Models and Java Programs
ISBN: 0470093552
Year: 2004
Pages: 162
