8.2 The Asynchronous I/O Factory Classes

Motivation

The proactive I/O model is generally harder to program than reactive and synchronous I/O models because

  • I/O initiation and completion are distinct activities that must be handled separately.

  • Multiple I/O operations can be initiated simultaneously, which requires more recordkeeping.

  • There's no guaranteed completion order when multiple I/O operations complete simultaneously.

  • In a multithreaded service, a completion handler may execute in a thread other than the one that initiated the I/O operation.

The proactive I/O model therefore requires a factory to initiate asynchronous I/O operations. Since multiple I/O operations can execute simultaneously and complete in any order, the proactive model also requires an explicit binding between each asynchronous operation, its parameters (such as the I/O handle, data buffer, and buffer size), and the completion handler that will process the results of the operation.

In theory, designing classes to generate asynchronous I/O operations and bind them to their completion handlers should be relatively straightforward. In practice, however, the design is complicated by the fact that asynchronous I/O is implemented in different ways across today's popular OS platforms. Two common examples include:

  • Windows. The Windows ReadFile() and WriteFile() system functions can either perform synchronous I/O or initiate an overlapped I/O operation.

  • POSIX. The POSIX aio_read() and aio_write() functions initiate asynchronous read and write operations, respectively. These functions are separate from the read() and write() (and Sockets recv() and send()) functions that are used in ACE's IPC wrapper facade classes (see Chapter 3 in C++NPv1).

Each platform's asynchronous I/O facility also includes its own mechanism for binding an I/O operation with its parameters, such as buffer pointer and transfer size. For example, POSIX AIO provides an AIO control block (aiocb), whereas Windows provides the OVERLAPPED structure and a completion key argument to the I/O completion port facility. Sidebar 54 discusses other challenges with OS asynchronous I/O mechanisms.
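To make the POSIX binding concrete, here's a minimal sketch (the descriptor and buffer are assumed to be supplied by the caller): an aiocb control block ties the I/O handle, buffer address, and transfer size to a single asynchronous read, and aio_suspend() is one way to rendezvous with its completion.

#include <sys/types.h>
#include <aio.h>       // POSIX AIO: aiocb, aio_read(), aio_suspend(), ...
#include <cstring>

// A minimal sketch of the POSIX AIO binding: the aiocb associates the
// I/O handle, buffer address, and transfer size with one asynchronous
// operation. <fd> is an already-open descriptor; <buf> is caller-owned.
bool start_async_read (int fd, char *buf, size_t len, aiocb &cb) {
  std::memset (&cb, 0, sizeof cb);
  cb.aio_fildes = fd;     // I/O handle
  cb.aio_buf    = buf;    // destination buffer
  cb.aio_nbytes = len;    // requested transfer size
  return aio_read (&cb) == 0;   // initiate; completion is reported later
}

// One way to rendezvous with the completion: block in aio_suspend().
ssize_t await_read (aiocb &cb) {
  const aiocb *list[] = { &cb };
  if (aio_suspend (list, 1, 0) == -1) return -1;
  if (aio_error (&cb) != 0) return -1;   // failed (or still in progress)
  return aio_return (&cb);               // bytes actually transferred
}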

Sidebar 54: Asynchronous I/O Portability Issues

Unlike synchronous I/O and reactive I/O, which are available on most modern operating systems, asynchronous I/O is not ubiquitous. The following OS platforms supported by ACE provide asynchronous I/O mechanisms:

  • Windows platforms that support both overlapped I/O and I/O completion ports [Ric97]. Overlapped I/O is an efficient and scalable I/O mechanism on Windows [Sol98]. Windows performs completion event demultiplexing via I/O completion ports and event handles. An I/O completion port is a queue managed by the Windows kernel to buffer I/O completion events.

  • POSIX platforms that implement the POSIX.4 AIO specification [POS95]. This specification was originally designed for disk file I/O [Gal95], but can also be used for network I/O with varying degrees of success. An application thread can wait for completion events via aio_suspend() or be notified by real-time signals, which are tricky to integrate into an event-driven application. In general, POSIX.4 AIO requires extra care to program the proactive model correctly and efficiently. Despite UNIX's usual interchangeability of I/O system functions across IPC mechanisms, integration of the POSIX AIO facility with other IPC mechanisms, such as the Socket API, leaves much to be desired on some platforms. For example, Socket API functions such as connect() and accept() are not integrated with the POSIX AIO model, and some AIO implementations can't handle multiple outstanding operations on a handle under all conditions.

Asynchronous I/O performance characteristics can also vary widely. For example, some operating systems implement the POSIX AIO functions by spawning a thread for each asynchronous I/O operation. Although this is a compliant implementation, it provides no performance gain relative to an application spawning the thread itself. In fact, this implementation can actually degrade the performance of I/O-intensive applications rather than improve it! Hopefully, OS asynchronous I/O implementations will soon improve, allowing wider use of this powerful mechanism.

All the asynchronous I/O mechanisms discussed above use an I/O handle to refer to the IPC channel or file on which to perform the I/O. The ACE Proactor framework defines a set of classes that initiate asynchronous I/O on various IPC mechanisms. These classes enhance portability, minimize complexity, and avoid reintroducing the I/O handle-related problems mastered in C++NPv1. This book examines the two most popular classes for networked applications, ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream.

Class Capabilities

ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream are factory classes that enable applications to initiate portable asynchronous read() and write() operations, respectively. These classes provide the following capabilities:

  • They can initiate asynchronous I/O operations on a stream-oriented IPC mechanism, such as a TCP socket.

  • They bind an I/O handle, an ACE_Handler object, and an ACE_Proactor to process I/O completion events correctly and efficiently.

  • They create an object that carries an operation's parameters through the ACE Proactor framework to its completion handler.

  • They derive from ACE_Asynch_Operation, which provides the interface to initialize the object and to request cancellation of outstanding I/O operations.

ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream define nested Result classes to represent the binding between an operation and its parameters. The ACE Proactor framework abstracts common results-oriented behavior into the ACE_Asynch_Result class from which the nested Result classes derive. Together, this set of classes provides a completion handler with the following capabilities (a sketch follows the list):

  • It can obtain the original parameters for an I/O operation, such as the requested transfer byte count and the memory address.

  • It can determine the success or failure of the associated operation.

  • It can be passed an asynchronous completion token (ACT) [POSA2] that provides a method to extend the amount and type of information communicated between the operation initiator and the completion handler.
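The following sketch shows how a completion handler might use these capabilities. The class name and the process() helper are hypothetical; handle_read_stream() is the ACE_Handler hook method that the ACE Proactor framework dispatches when an asynchronous read() completes.

#include "ace/Asynch_IO.h"

// A minimal sketch (class name and process() are hypothetical) of a
// completion handler inspecting the Result object that the ACE
// Proactor framework passes to its hook method.
class My_Handler : public ACE_Handler {
public:
  virtual void handle_read_stream
      (const ACE_Asynch_Read_Stream::Result &result) {
    if (!result.success () || result.bytes_transferred () == 0) {
      // The operation failed or the peer closed; reclaim the buffer.
      result.message_block ().release ();
      return;
    }
    // The original parameters are also available, e.g., the requested
    // transfer count and the ACT, if one was passed at initiation.
    size_t requested = result.bytes_to_read ();
    const void *act  = result.act ();
    ACE_UNUSED_ARG (requested); ACE_UNUSED_ARG (act);

    process (result.message_block ());   // hypothetical helper
  }
private:
  void process (ACE_Message_Block &) { /* application-specific */ }
};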

The interfaces for ACE_Asynch_Result, ACE_Asynch_Read_Stream, ACE_Asynch_Write_Stream, and their nested Result classes are shown in Figure 8.2 (page 264). The following table lists the key methods in ACE_Asynch_Read_Stream:

Figure 8.2. The ACE Asynchronous Read/Write Stream Class Relationships

Method     Description
--------   ---------------------------------------------------------------
open()     Initialize the object to prepare for initiating asynchronous
           read() operations.
cancel()   Attempt to cancel outstanding read() operations initiated via
           this object.
read()     Initiate an asynchronous operation to read from the associated
           IPC stream.

The key methods in the ACE_Asynch_Write_Stream class are shown in the following table:

Method     Description
--------   ---------------------------------------------------------------
open()     Initialize the object to prepare for initiating asynchronous
           write() operations.
cancel()   Attempt to cancel outstanding write() operations initiated via
           this object.
write()    Initiate an asynchronous operation to write to the associated
           IPC stream.

The open() methods bind the asynchronous I/O factory object to

  • The handle that's used to initiate I/O operations

  • The ACE_Proactor object that will detect and demultiplex completion events for those I/O operations

  • The ACE_Handler object that will handle proactor-dispatched I/O completions

The act parameter is an asynchronous completion token that will be associated with each I/O operation issued via the I/O factory object. When the operation completes, the act parameter can be retrieved using the act() method in ACE_Asynch_Result. This feature is specific to Windows, however, so we don't show its use in this book.

The read() and write() methods use an ACE_Message_Block to receive into and send from, respectively, thereby providing the following benefits (a sketch follows the list):

  • Simplified buffer management. Since asynchronous I/O initiation and completion are handled in separate classes, information concerning buffer addresses, available space, and used space must be associated with each I/O operation. Reusing ACE_Message_Block addresses these needs portably and efficiently.

  • Automatic transfer count update. The write() method starts transferring bytes from the message block's read pointer, that is, it reads bytes out of the message. Conversely, the read() method starts reading into the message block beginning at its write pointer, that is, it writes new bytes into the message. After successful completion of an I/O operation, the message block's read and write pointers are updated to reflect the number of bytes transferred successfully. Applications therefore needn't adjust message buffer pointers or sizes since the ACE Proactor framework handles this automatically. Sidebar 55 (page 275) further describes how the ACE Proactor framework manages ACE_Message_Block pointers.

  • Easy integration with other ACE frameworks. The ACE_Message_Block provides a convenient mechanism to obtain or forward data for further processing in the ACE Task framework (Chapter 6), ACE Acceptor-Connector framework (Chapter 7), and the ACE Streams framework (Chapter 9).
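Here's a brief sketch of these calling conventions. The handler, handle, and proactor are assumed to be initialized elsewhere, e.g., via open() calls like those shown later in this chapter's example.

#include "ace/Asynch_IO.h"
#include "ace/Message_Block.h"

// A sketch of the ACE_Message_Block conventions described above;
// <handler>, <handle>, and <proactor> are assumed to be set up
// already by the caller.
void initiate_io (ACE_Handler &handler, ACE_HANDLE handle,
                  ACE_Proactor *proactor) {
  ACE_Asynch_Read_Stream reader;
  reader.open (handler, handle, 0, proactor);

  // read() stores data starting at the block's write pointer; on
  // completion the framework advances wr_ptr() past the bytes
  // received, so length() then reflects the data available.
  ACE_Message_Block *in = 0;
  ACE_NEW (in, ACE_Message_Block (4096));
  reader.read (*in, in->space ());

  ACE_Asynch_Write_Stream writer;
  writer.open (handler, handle, 0, proactor);

  // write() sends bytes starting at the block's read pointer; on
  // completion rd_ptr() is advanced past the bytes actually sent.
  ACE_Message_Block *out = 0;
  ACE_NEW (out, ACE_Message_Block (32));
  out->copy ("a log record\n", 13);   // copy() advances wr_ptr()
  writer.write (*out, out->length ());
}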

In contrast to the ACE IPC wrapper facades, such as ACE_SOCK_Stream described in C++NPv1, ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream don't encapsulate any underlying IPC mechanisms. Instead, they define the interface for initiating asynchronous I/O operations. This design yields the following benefits:

  • It allows reuse of ACE's IPC wrapper facade classes, such as ACE_SOCK_Stream and ACE_SPIPE_Stream, in the ACE Proactor framework and avoids recreating a parallel set of IPC classes usable only in the Proactor framework.

  • It imposes a structure to avoid misuse of I/O handles by exposing only the desired I/O operation initiators.

  • It facilitates use of the same IPC classes for synchronous and asynchronous I/O by giving the I/O handle to the asynchronous operation factory.

Therefore, networked applications written with ACE can use any combination of synchronous, reactive, and proactive I/O. The decision concerning which I/O mechanism to use can be made at compile time or run time if desired. In fact, the decision can be deferred until after the IPC object is set up! For example, an application may decide which mechanism to use after establishing a socket connection [HPS97].
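Here's a hedged sketch of that deferred decision: the connection is established synchronously with the ACE_SOCK_Connector wrapper facade from C++NPv1, and the resulting handle is then given to an asynchronous I/O factory. The server address and the handler are illustrative.

#include "ace/SOCK_Connector.h"
#include "ace/INET_Addr.h"
#include "ace/Asynch_IO.h"
#include "ace/Proactor.h"

// A sketch of deferring the I/O mechanism decision until after
// connection establishment: connect synchronously, then hand the
// handle to the ACE Proactor framework. <handler> and the server
// address are illustrative.
int connect_then_go_async (ACE_Handler &handler,
                           ACE_Asynch_Read_Stream &reader) {
  ACE_SOCK_Stream peer;
  ACE_SOCK_Connector connector;
  ACE_INET_Addr server_addr (9999, "example.com");
  if (connector.connect (peer, server_addr) == -1)
    return -1;   // synchronous connect failed

  // The same handle could keep being used for synchronous I/O via
  // <peer>; instead, pass it to the asynchronous I/O factory.
  return reader.open (handler, peer.get_handle (),
                      0, ACE_Proactor::instance ());
}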

The ACE Proactor framework also contains factory classes for initiating datagram I/O operations (ACE_Asynch_Read_Dgram and ACE_Asynch_Write_Dgram) and file I/O operations (ACE_Asynch_Read_File, ACE_Asynch_Write_File, and ACE_Asynch_Transmit_File). The designs and capabilities of these classes are similar to those described in this chapter. Please consult the online ACE documentation at http://ace.ece.uci.edu/Doxygen/ or http://www.riverace.com/docs/ for details.

Example

This chapter reimplements the client logging daemon service from Chapter 7 using the ACE Proactor framework. Although the classes used in the proactive client logging daemon service are similar to those in the Acceptor-Connector version, the proactive version uses a single application thread to initiate and handle completions for all its I/O operations.

The classes comprising the client logging daemon based on the ACE Proactor framework are shown in Figure 8.3. The role of each class is outlined below:

Figure 8.3. Classes in the Proactive Client Logging Daemon

Class                       Description
-------------------------   --------------------------------------------------
AIO_Output_Handler          A message forwarder that initiates asynchronous
                            write() operations to forward messages to the
                            logging server
AIO_CLD_Connector           A factory that actively (re)establishes and
                            authenticates connections with the logging server
                            and activates an AIO_Output_Handler
AIO_Input_Handler           Processes log record data received from logging
                            clients via asynchronous read() operations and
                            passes completed log records to AIO_Output_Handler
                            for output processing
AIO_CLD_Acceptor            A factory that accepts connections from logging
                            clients and creates a new AIO_Input_Handler for
                            each
AIO_Client_Logging_Daemon   A facade class that integrates the other classes
                            together

The interactions between instances of these classes are shown in Figure 8.4 (page 268). We start with the AIO_Output_Handler class in this example and present the other classes in subsequent sections of this chapter.

Figure 8.4. Interactions in the Proactive Client Logging Daemon

The source code for this example is in the AIO_Client_Logging_Daemon.cpp file. The AIO_Output_Handler class forwards log records to a server logging daemon. A portion of its class definition follows:

class AIO_Output_Handler
  : public ACE_Task<ACE_NULL_SYNCH>,
    public ACE_Service_Handler {

AIO_Output_Handler inherits from ACE_Task to reuse its ACE_Message_Queue. This queue is unsynchronized since all work in this service occurs in one application thread. Since AIO_Output_Handler derives from ACE_Service_Handler, it can process completion events (described in Section 8.3) and act as the target of the AIO_CLD_Connector asynchronous connection factory (described in Section 8.4).

public:
  AIO_Output_Handler (): can_write_ (0) {}

  virtual ~AIO_Output_Handler ();

  // Entry point into the <AIO_Output_Handler>.
  virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);

  // Hook method called when server connection is established.
  virtual void open (ACE_HANDLE new_handle,
                     ACE_Message_Block &message_block);

protected:
  ACE_Asynch_Read_Stream  reader_;   // Detects connection loss.
  ACE_Asynch_Write_Stream writer_;   // Sends records to server.
  int can_write_;                    // Safe to begin sending a log record?

  // Initiate the send of a log record.
  void start_write (ACE_Message_Block *mblk = 0);
};

typedef ACE_Unmanaged_Singleton<AIO_Output_Handler, ACE_Null_Mutex>
        OUTPUT_HANDLER;

AIO_Output_Handler contains an ACE_Asynch_Read_Stream to detect when the server connection closes. It also contains an ACE_Asynch_Write_Stream to initiate asynchronous write() operations that send log records to the server logging daemon.

Since there's one TCP connection to the server logging daemon, we use a single AIO_Output_Handler to forward log records over it. We therefore define OUTPUT_HANDLER as an ACE_Unmanaged_Singleton (page 194). It's unmanaged to ensure that we control its life cycle, since this service may be stopped and unlinked before the end of the program. The synchronization traits class is ACE_Null_Mutex because all access to the singleton occurs in one thread.
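A brief sketch of the singleton's life cycle follows; the two call sites shown here are illustrative, not part of the book's example code.

#include "ace/Singleton.h"

// Code anywhere in the service's single thread can forward a
// completed log record via the singleton ...
void forward_record (ACE_Message_Block *log_record) {
  OUTPUT_HANDLER::instance ()->put (log_record);
}

// ... and the service reclaims the singleton explicitly when it's
// stopped, since ACE_Unmanaged_Singleton registers no automatic
// cleanup with the ACE_Object_Manager.
void stop_service () {
  OUTPUT_HANDLER::close ();
}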

When the server logging daemon connection is established, the following open() hook method is dispatched by the ACE Proactor framework:

 1 void AIO_Output_Handler::open
 2     (ACE_HANDLE new_handle, ACE_Message_Block &) {
 3   ACE_SOCK_Stream temp_peer (new_handle);
 4   int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
 5   temp_peer.set_option (SOL_SOCKET, SO_SNDBUF,
 6                         &bufsiz, sizeof bufsiz);
 7
 8   reader_.open (*this, new_handle, 0, proactor ());
 9   writer_.open (*this, new_handle, 0, proactor ());
10
11   ACE_Message_Block *mb;
12   ACE_NEW (mb, ACE_Message_Block (1));
13   reader_.read (*mb, 1);
14   ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
15   no_sigpipe.register_action (SIGPIPE, 0);
16   can_write_ = 1;
17   start_write (0);
18 }

Lines 3–6 Increase the new socket's send buffer to its largest size to maximize throughput over long-delay and/or high-speed networks. We use a temporary ACE_SOCK_Stream object to set the buffer in a type-safe manner.

Lines 8–9 Initialize the reader_ and writer_ objects. Each specifies this object as the completion handler, the new socket handle to issue operations on, and the same ACE_Proactor used to open the connection. The completion key (ACT) facility isn't used, so the third argument is 0. After initialization, the reader_ and writer_ are used to initiate asynchronous I/O operations.

Lines 11–13 The client logging daemon in the Example part of Section 7.4 registered the server socket for input events in a reactor. In the proactive model, we initiate an asynchronous read() for one byte. If this operation ever completes, either the server sent data (which violates the protocol) or the server closed its socket endpoint.

Lines 14–15 As in other examples that rely on a READ event to detect a closed socket, ignore the SIGPIPE signal so that asynchronous write() operations won't abort the program if the connection is closed.

Lines 16–17 To avoid interleaving log records in the presence of partial writes, we only transmit one log record at a time. The can_write_ flag indicates whether it's safe to start writing a new log record or not. Since this connection is newly opened, it's now safe to write, so we set the flag and then call the following start_write() method to initiate an asynchronous write() operation:

 1 void AIO_Output_Handler::start_write
 2     (ACE_Message_Block *mblk) {
 3   if (mblk == 0) {
 4     ACE_Time_Value nonblock (0);
 5     getq (mblk, &nonblock);
 6   }
 7   if (mblk != 0) {
 8     can_write_ = 0;
 9     if (writer_.write (*mblk, mblk->length ()) == -1)
10       ungetq (mblk);
11   }
12 }

Lines 1–6 The start_write() method can be called with one of two arguments:

  • A NULL pointer, which indicates that the first message in the queue should be dequeued and sent

  • A non-NULL ACE_Message_Block, which indicates that writing should start immediately

Lines 7–11 If there's no log record available for sending, we return without doing anything. Otherwise, we reset can_write_ to 0 to prevent further message blocks from being sent until the current one is finished. The writer_ object initiates an asynchronous write() operation to send the message block. If the write() initiation fails, the message block is put back on the queue with the assumption that the socket is closed. The reconnection strategy (page 293) will trigger an asynchronous write() as soon as the connection is reestablished.

When log records are received from logging clients, they are passed to the AIO_Output_Handler using the following put() method, reimplemented from ACE_Task:

int AIO_Output_Handler::put (ACE_Message_Block *mb,
                             ACE_Time_Value *timeout) {
  if (can_write_) { start_write (mb); return 0; }
  return putq (mb, timeout);
}

If there's no write() operation in progress, call start_write() to immediately initiate sending of the specified ACE_Message_Block . If a write() can't be started now, we queue the message for later.
