Ru-Brd |
8.2 The Asynchronous I/O Factory ClassesMotivationThe proactive I/O model is generally harder to program than reactive and synchronous I/O models because
The proactive I/O model therefore requires a factory to initiate asynchronous I/O operations. Since multiple I/O operations can execute simultaneously and complete in any order, the proactive model also requires an explicit binding between each asynchronous operation, its parameters (such as the I/O handle, data buffer, and buffer size), and the completion handler that will process the results of the operation. In theory, designing classes to generate asynchronous I/O operations and bind them to their completion handlers should be relatively straightforward. In practice, however, the design is complicated by the fact that asynchronous I/O is implemented in different ways across today's popular OS platforms. Two common examples include:
Each platform's asynchronous I/O facility also includes its own mechanism for binding an I/O operation with its parameters, such as buffer pointer and transfer size. For example, POSIX AIO provides an AIO control block ( aiocb ), whereas Windows provides the OVERLAPPED structure and a completion key argument to the I/O completion port facility. Sidebar 54 discusses other challenges with OS asynchronous I/O mechanisms.
All the asynchronous I/O mechanisms discussed above use an I/O handle to refer to the IPC channel or file on which to perform the I/O. The ACE Proactor framework defines a set of classes that initiate asynchronous I/O on various IPC mechanisms. These classes enhance portability, minimize complexity, and avoid reintroducing the I/O handle- related problems mastered in C++NPv1. This book examines the two most popular classes for networked applications, ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream . Class CapabilitiesACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream are factory classes that enable applications to initiate portable asynchronous read() and write() operations, respectively. These classes provide the following capabilities:
ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream define nested Result classes to represent the binding between an operation and its parameters. The ACE Proactor framework abstracts common results-oriented behavior into the ACE_Asynch_Result class from which the nested Result classes derive. Together, this set of classes provides a completion handler with the following capabilities:
The interfaces for ACE_Asynch_Result , ACE_Asynch_Read_Stream , ACE_Asynch_Write_Stream and their Result nested classes are shown in Figure 8.2 (page 264). The following table lists the key methods in ACE_Asynch_Read_Stream : Figure 8.2. The ACE Asynchronous Read/Write Stream Class Relationships
The key methods in the ACE_Asynch_Write_Stream class are shown in the following table:
The open() methods bind the asynchronous I/O factory object to
The act parameter is an asynchronous completion token that will be associated with each I/O operation issued via the I/O factory object. When the operation completes, the act parameter can be retrieved using the act() method in ACE_Asynch_Result . This feature is specific to Windows, however, so we don't show its use in this book. The read() and write() methods use an ACE_Message_Block to receive into and send from, respectively, thereby providing the following benefits:
In contrast to the ACE IPC wrapper facades, such as ACE_SOCK_Stream described in C++NPv1, ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream don't encapsulate any underlying IPC mechanisms. Instead, they define the interface for initiating asynchronous I/O operations. This design yields the following benefits:
Therefore, networked applications written with ACE can use any combination of synchronous, reactive, and proactive I/O. The decision concerning which I/O mechanism to use can be made at compile time or run time if desired. In fact, the decision can be deferred until after the IPC object is set up! For example, an application may decide which mechanism to use after establishing a socket connection [HPS97]. The ACE Proactor framework also contains factory classes for initiating datagram I/O operations ( ACE_Asynch_Read_Dgram and ACE_Asynch_Write_Dgram ) and file I/O operations ( ACE_Asynch_Read_File , ACE_Asynch_Write_File , and ACE_Asynch_Transmit_File ). The designs and capabilities of these classes are similar to those described in this chapter. Please consult the online ACE documentation at http://ace.ece.uci.edu/Doxygen/ or http://www.riverace.com/docs/ for details. ExampleThis chapter reimplements the client logging daemon service from Chapter 7 using the ACE Proactor framework. Although the classes used in the proactive client logging daemon service are similar to those in the Acceptor-Connector version, the proactive version uses a single application thread to initiate and handle completions for all its I/O operations. The classes comprising the client logging daemon based on the ACE Proactor framework are shown in Figure 8.3. The role of each class is outlined below: Figure 8.3. Classes in the Proactive Client Logging Daemon
The interactions between instances of these classes are shown in Figure 8.4 (page 268). We'll start describing the AIO_Output_Handler class in this example and present the other classes in subsequent sections in this chapter. Figure 8.4. Interactions in the Proactive Client Logging Daemon
The source code for this example is in the AIO_Client_Logging_Daemon.cpp file. The AIO_Output_Handler class forwards log records to a server logging daemon. A portion of its class definition follows : class AIO_Output_Handler : public ACE_Task<ACE_NULL_SYNCH>, public ACE_Service_Handler { AIO_Output_Handler inherits from ACE_Task to reuse its ACE_Message_Queue . This queue is unsynchronized since all work in this service occurs in one application thread. Since AIO_Output_Handler derives from ACE_Service_Handler , it can process completion events (described in Section 8.3) and act as the target of the AIO_CLD_Connector asynchronous connection factory (described in Section 8.4). public: AIO_Output_Handler (): can_write_ (0) {} virtual AIO_Output_Handler (); // Entry point into the <AIO_Output_Handler>. virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0); // Hook method called when server connection is established. virtual void open (ACE_HANDLE new_handle, ACE_Message_Block &message_block); protected: ACE_Asynch_Read_Stream reader_; // Detects connection loss. ACE_Asynch_Write_Stream writer_; // Sends records to server. int can_write_; // Safe to begin sending a log record? // Initiate the send of a log record. void start_write (ACE_Message_Block *mblk = 0); }; typedef ACE_Unmanaged_Singleton<AIO_Output_Handler, ACE_Null_Mutex> OUTPUT_HANDLER; AIO_Output_Handler contains an ACE_Asynch_Read_Stream to detect when the server connection closes . It also contains an ACE_Asynch_Write_Stream to initiate asynchronous write() operations that send log records to the server logging daemon. Since there's one TCP connection to the server logging daemon, we use a single AIO_Output_Handler to forward log records to the server logging daemon. We therefore define OUTPUT_HANDLER as an ACE_Unmanaged_Singleton (page 194). It's unmanaged to ensure that we control its life cycle since this service may be stopped and unlinked before the end of the program. The synchronization traits class is ACE_Null_Mutex because all access to the singleton occurs in one thread. When the server logging daemon connection is established, the following open() hook method is dispatched by the ACE Proactor framework: 1 void AIO_Output_Handler::open 2 (ACE_HANDLE new_handle, ACE_Message_Block &) { 3 ACE_SOCK_Stream temp_peer (new_handle); 4 int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; 5 temp_peer.set_option (SOL_SOCKET, SO_SNDBUF, 6 &bufsiz, sizeof bufsiz); 7 8 reader_.open (*this, new_handle, 0, proactor ()); 9 writer_.open (*this, new_handle, 0, proactor ()); 10 11 ACE_Message_Block *mb; 12 ACE_NEW (mb, ACE_Message_Block (1)); 13 reader_.read (*mb, 1); 14 ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN); 15 no_sigpipe.register_action (SIGPIPE, 0); 16 can_write_ = 1; 17 start_write (0); 18 } Lines 3 “6 Increase the new socket's send buffer to its largest size to maximize throughput over long-delay and/or high-speed networks. We use a temporary ACE_SOCK_Stream object to set the buffer in a type-safe manner. Lines 8 “9 Initialize the reader_ and writer_ objects. Each specifies this object as the completion handler, the new socket handle to issue operations on, and the same ACE_Proactor used to open the connection. The completion key ( ACT ) facility isn't used, so the third argument is 0. After initialization, the reader_ and writer_ are used to initiate asynchronous I/O operations. Lines 11 “13 The client logging daemon in the Example part of Section 7.4 registered the server socket for input events in a reactor. In the proactive model, we initiate an asynchronous read() for one byte. If this operation ever completes, either the server sent data (which violates the protocol) or the server closed its socket endpoint. Lines 14 “15 As in other examples that rely on a READ event to detect a closed socket, ignore the SIGPIPE signal so that asynchronous write() operations won't abort the program if the connection is closed. Lines 16 “17 To avoid interleaving log records in the presence of partial writes, we only transmit one log record at a time. The can_write_ flag indicates whether it's safe to start writing a new log record or not. Since this connection is newly opened, it's now safe to write, so we set the flag and then call the following start_write() method to initiate an asynchronous write() operation: 1 void AIO_Output_Handler::start_write 2 (ACE_Message_Block *mblk) { 3 if (mblk == 0) { 4 ACE_Time_Value nonblock (0); 5 getq (mblk, &nonblock); 6 } 7 if (mblk != 0) { 8 can_write_ = 0; 9 if (writer_.write (*mblk, mblk->length ()) == -1) 10 ungetq (mblk); 11 } 12 } Lines 1 “6 The start_write() method can be called with one of two arguments
Lines 7 “11 If there's no log record available for sending, we return without doing anything. Otherwise, we reset can_write_ to 0 to prevent further message blocks from being sent until the current one is finished. The writer_ object initiates an asynchronous write() operation to send the message block. If the write() initiation fails, the message block is put back on the queue with the assumption that the socket is closed. The reconnection strategy (page 293) will trigger an asynchronous write() as soon as the connection is reestablished. When log records are received from logging clients, they are passed to the AIO_Output_Handler using the following put() method reimplemented from ACE_Task : int AIO_Output_Handler::put (ACE_Message_Block *mb, ACE_Time_Value *timeout) { if (can_write_) { start_write (mb); return 0; } return putq (mb, timeout); } If there's no write() operation in progress, call start_write() to immediately initiate sending of the specified ACE_Message_Block . If a write() can't be started now, we queue the message for later. |
Ru-Brd |