8.3 The ACE_Handler Class

Ru-Brd

Motivation

A chief differentiator between the proactive and reactive I/O models is that proactive I/O initiation and completion are distinct steps that occur separately. Moreover, these two steps may occur in different threads of control. Using separate classes for the initiation and completion processing avoids unnecessarily coupling the two. Section 8.2 described the ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream classes used to initiate asynchronous I/O operations; this section focuses on I/O completion handling.

Completion events signify that a previously initiated I/O operation has finished. To process the result of the I/O operation correctly and efficiently , a completion handler must know all of the arguments specified for the I/O operation, in addition to the result. Together, this information includes

  • What type of operation was initiated

  • Whether or not the operation completed successfully

  • The error code, if the operation failed

  • The I/O handle that identifies the communication endpoint

  • The memory address for the transfer

  • The requested and actual number of bytes transferred

Asynchronous I/O completion processing requires more information than is available to callback methods in the ACE Reactor framework. The ACE_Event_Handler class presented in Section 3.3 is therefore not suitable for use in the ACE Proactor framework. Since completion handling also depends on the asynchronous I/O mechanism offered by the underlying OS platform, it has the same portability issues discussed in Section 8.2. Addressing these issues in each application is unnecessarily tedious and costly, which is why the ACE Proactor framework provides the ACE_Handler class.

Class Capabilities

ACE_Handler is the base class of all asynchronous completion handlers in the ACE Proactor framework. This class provides the following capabilities:

  • It provides hook methods to handle completion of all asynchronous I/O operations defined in ACE, including connection establishment and I/O operations on an IPC stream.

  • It provides a hook method to handle timer expiration.

The interface for ACE_Handler is shown in Figure 8.5 (page 272) and its key methods are shown in the following table:

Method

Description

handle()

Obtains the handle used by this object

handle_read_stream()

Hook method called on completion of a read() operation initiated by ACE_Asynch_Read_Stream

handle_write_stream()

Hook method called on completion of a write() operation initiated by ACE_Asynch_Write_Stream

handle_time_out()

Hook method called when a timer scheduled via ACE_Proactor expires

Figure 8.5 The ACE_Handler Class
  ACE_Handler  # proactor_ : ACE_Proactor *  +  handle () : ACE_HANDLE  +  handle_read_stream (result :   const ACE_Asynch_Read_Stream::Result &)  +  handle_write_stream (result :   const ACE_Asynch_Write_Stream::Result &)  +  handle_time_out (tv : const ACE_Time_Value &,   act : const void *)  +  handle_accept (result :   const ACE_Asynch_Accept::Result &)  +  handle_connect (result :   const ACE_Asynch_Connect::Result &)  

The handle_time_out() method is called when a timer scheduled with an ACE_Proactor expires. Its tv argument is the absolute time of day that the timer was scheduled to expire. The actual time may be different depending on activity level and dispatched handler delays. Note that this behavior is slightly different than the ACE_Time_Value passed to ACE_Event_Handler::handle_timeout() (page 49), which is the actual time of day the timer event hook method was dispatched by the ACE proactor framework.

The handle_read_stream() and handle_write_stream() hook methods are called with a reference to the Result object associated with an asynchronous operation that has completed. The most useful Result object methods that are accessible from handle_read_stream() and handle_write_stream() are shown in context in Figure 8.2 (page 264) and listed in the following table:

Method

Description

success()

Indicates whether or not the asynchronous operation succeeded

handle()

Obtains the I/O handle used by the asynchronous I/O operation

message_block()

Obtains a reference to the ACE_Message_Block used in the operation

bytes_transferred()

Indicates how many bytes were actually transferred in the asynchronous operation

bytes_to_read()

Indicates how many bytes were requested for an asynchronous read() operation

bytes_to_write()

Indicates how many bytes were requested for an asynchronous write() operation

Example

All previous logging daemons, both client and server, have used the Logging_Handler class developed in Chapter 4 of C++NPv1 to receive log records from logging clients . The Logging_Handler::recv_log_record() method uses synchronous input operations to receive a log record. Synchronous input operations are relatively straightforward to program since the activation record of the receiving thread's run-time stack can be used to store bookkeeping information and data fragments .

In contrast, asynchronous input handling is harder to program since the bookkeeping details and data fragments must be managed explicitly, rather than implicitly on the run-time stack. In this example, the new AIO_Input_Handler class receives log records from logging clients by initiating asynchronous read() operations and assembling the data fragments into log records that are then forwarded to the server logging daemon. This class uses the proactive I/O model and asynchronous input operations to achieve maximum concurrency across all logging clients using a single thread of control.

As shown in Figure 8.3 (page 267), the AIO_Input_Handler class is a descendant of ACE_Handler (Section 8.4 discusses ACE_Service_Handler , which derives from ACE_Handler ) and is defined as follows :

 class AIO_Input_Handler : public ACE_Service_Handler {  public:    AIO_Input_Handler (AIO_CLD_Acceptor *acc = 0)      : acceptor_ (acc), mblk_ (0) {}    virtual AIO_Input_Handler ();    // Called by <ACE_Asynch_Acceptor> when a client connects.    virtual void open (ACE_HANDLE new_handle,                       ACE_Message_Block &message_block);  protected:    enum { LOG_HEADER_SIZE = 8 };   // Length of CDR header.    AIO_CLD_Acceptor *acceptor_;    // Our creator.    ACE_Message_Block *mblk_;       // Block to receive log record.    ACE_Asynch_Read_Stream reader_; // Asynchronous read() factory.    // Handle input from logging clients.    virtual void handle_read_stream      (const ACE_Asynch_Read_Stream::Result &result);  }; 

The logging client sends each log record in CDR format, starting with a fixed-length header (Section 4.4.2 in C++NPv1 presents complete details on the CDR marshaling and log record format). The header has an ACE_CDR::Boolean to indicate the byte order and an ACE_CDR::ULong holding the length of the payload following the header. From CDR encoding and alignment rules, we know this to be 8 bytes, so we define the LOG_HEADER_SIZE enumerator as 8.

When a logging client connects to the client logging daemon, the following open() hook method is dispatched by the ACE Proactor framework:

 void AIO_Input_Handler::open      (ACE_HANDLE new_handle, ACE_Message_Block &) {    reader_.open (*this, new_handle, 0, proactor ());    ACE_NEW_NORETURN      (mblk_, ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE));    ACE_CDR::mb_align (mblk_);    reader_.read (*mblk_, LOG_HEADER_SIZE);  } 

This method allocates an ACE_Message_Block to receive the log record header from the client. Rather than simply allocating just enough for the header, the message will be large enough for the entire record in many cases. Moreover, it can be resized when needed. The block's write pointer is aligned correctly for CDR demarshaling, and then passed to an asynchronous read() operation that's initiated to receive the header.

When the read() operation completes, the following completion handler method is invoked by the ACE Proactor framework:

 1 void AIO_Input_Handler::handle_read_stream   2     (const ACE_Asynch_Read_Stream::Result &result) {   3   if (!result.success ()  result.bytes_transferred () == 0)   4     delete this;   5   else if (result.bytes_transferred() < result.bytes_to_read())   6     reader_.read (*mblk_, result.bytes_to_read () -  7                           result.bytes_transferred ());   8   else if (mblk_->length () == LOG_HEADER_SIZE) {   9     ACE_InputCDR cdr (mblk_);  10  11     ACE_CDR::Boolean byte_order;  12     cdr >> ACE_InputCDR::to_boolean (byte_order);  13     cdr.reset_byte_order (byte_order);  14  15     ACE_CDR::ULong length;  16     cdr >> length;  17  18     mblk_->size (length + LOG_HEADER_SIZE);  19     reader_.read (*mblk_, length);  20   }  21   else {  22     if (OUTPUT_HANDLER::instance ()->put (mblk_) == -1)  23       mblk_->release ();  24  25     ACE_NEW_NORETURN  26       (mblk_, ACE_Message_Block (ACE_DEFAULT_CDR_BUFSIZE));  27     ACE_CDR::mb_align (mblk_);  28     reader_.read (*mblk_, LOG_HEADER_SIZE);  29   }  30 } 

Lines 3 “4 Delete this object if read() failed or if the peer logging client closed the connection. AIO_Input_Handler 's destructor releases resources to prevent leaks.

Lines 5 “7 If fewer bytes were received than requested, initiate another asynchronous read() operation to receive the remaining bytes. We needn't adjust any pointers when requesting that more data be added to the message block. Sidebar 55 describes how the ACE Proactor framework manages ACE_Message_Block pointers automatically.

Lines 8 “19 If all requested data was received, and it's the size of a log record header, we received the header, not payload. This test is safe since the data portion of a log record is always larger than the header. We then use the ACE_InputCDR class to demarshal the header, yielding the number of payload bytes in the record. mblk_ is resized to accomodate the remaining data and an asynchronous read() is initiated to receive the payload. This method will be called again upon completion and, if necessary, will continue initiating asynchronous read() operations until the complete record is received or an error occurs.

Lines 22 “23 When the complete log record is received, it's forwarded via the AIO_Output_Handler::put() method (page 270). After forwarding the log record, AIO_Output_Handler will release the message block. If the record can't be forwarded, however, the message is released immediately, discarding the log record.

Lines 25 “28 Allocate a new ACE_Message_Block object for the next log record and initiate an asynchronous read() operation to receive its header.

Sidebar 55: How ACE_Message_Block Pointers are Managed

When initiating an asynchronous read() or write() operation, the request must specify an ACE_Message_Block to either receive or supply the data. The ACE Proactor framework's completion handling mechanism updates the ACE_Message_Block pointers to reflect the amount of data read or written as follows:

Operation

Pointer Usage

Read

The initial read buffer pointer is the message's wr_ptr() . At completion, the wr_ptr is advanced by the number of bytes read.

Write

The initial write buffer pointer is the message's rd_ptr() . At completion, the rd_ptr is advanced by the number of bytes written.

It may seem counterintuitive to use the write pointer for reads and the read pointer for writes . It may therefore help to consider that when reading data, it's being written into the message block. Similarly, when writing data, it's being read from the message block. Upon completion, the updated length of data in the ACE_Message_Block is larger for reads (because the write pointer has advanced) and smaller for writes (because the read pointer has advanced).

The AIO_Output_Handler::start_write() method (page 269) initiates an asynchronous write() operation to send a log record to the server logging daemon. When a write() completes, the following method is called by the ACE Proactor framework.

 1 void AIO_Output_Handler::handle_write_stream   2        (const ACE_Asynch_Write_Stream::Result &result) {   3   ACE_Message_Block &mblk = result.message_block ();   4   if (!result.success ()) {   5     mblk.rd_ptr (mblk.base ());   6     ungetq (&mblk);   7   }   8   else {   9     can_write_ = handle () == result.handle ();  10     if (mblk.length () == 0) {  11       mblk.release ();  12       if (can_write_) start_write ();  13     }  14     else if (can_write_) start_write (&mblk);  15     else { mblk.rd_ptr (mblk.base ()); ungetq (&mblk); }  16   }  17 } 

Lines 4 “7 If the write() operation failed, reset the message block's rd_ptr() to the beginning of the block. We assume that if a write() fails, the socket is closed and will be reconnected later, at which point the message block will be resent from the beginning.

Line 9 Re-enable can_write_ only if the socket hasn't been closed. If the socket closes while asynchronous write() operations are queued or in progress, there is no guarantee that the OS will deliver the completions in a given order. Thus, we check the handle, which is set to ACE _ INVALID _ HANDLE when the socket close is handled.

Lines 10 “13 If the entire block has been written, release it. If another write() is possible, initiate it.

Line 14 The write() succeeded, but only part of the message was sent. If the socket is still connected, initiate an asynchronous write() to send the rest of the message.

Line 15 The message block was partially written, but the socket has been disconnected, so rewind the message's rd_ptr() to the beginning of the block and put it back on the message queue to be resent later.

If a write() operation fails, we don't try to clean up the socket since it has an outstanding read() operation to detect this problem. When the server logging daemon closes the socket, the asynchronous read() operation started in AIO_Output_Handler::open() (page 268) completes. Upon completion, the following method is called by the ACE Proactor framework:

 1 void AIO_Output_Handler::handle_read_stream  2        (const ACE_Asynch_Read_Stream::Result &result) {  3   result.message_block ().release ();  4   writer_.cancel ();  5   ACE_OS::closesocket (result.handle ());  6   handle (ACE_INVALID_HANDLE);  7   can_write_ = 0;  8   CLD_CONNECTOR::instance ()->reconnect ();  9 } 

Lines 3 “5 Release the ACE_Message_Block that was allocated for the read() operation, cancel any outstanding write() operations, and close the socket.

Lines 6 “8 Since the connection is now closed, set the handle to ACE _ INVALID _ HANDLE and reset the can_write_ flag to prevent any log record transmissions from being initiated until after we reestablish the server connection (the reconnection mechanism is explained on page 293). Section 8.4 discusses how to passively and actively establish connections in the ACE Proactor framework.

Figure 8.6 shows the sequence of events that occur when receiving a log record from a logging client and forwarding it to the server logging daemon.

Figure 8.6. Sequence of Events to Forward a Log Record

Ru-Brd


C++ Network Programming
C++ Network Programming, Volume I: Mastering Complexity with ACE and Patterns
ISBN: 0201604647
EAN: 2147483647
Year: 2002
Pages: 65

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net