8.5 The ACE_Proactor Class


Motivation

Asynchronous I/O operations are handled in two steps: initiation and completion. Since multiple steps and classes are involved, there must be a way to demultiplex the completion events and efficiently associate each completion event with the operation that completed and the completion handler that will process the result. The diversity of OS asynchronous I/O facilities plays a deeper role here than in the reactive I/O model because

  • Platforms have different ways to receive completion notifications. For example, Windows uses I/O completion ports or events, whereas POSIX.4 AIO uses real-time signals or the aio_suspend() system function to wait for a completion.

  • Platforms use different data structures to maintain state information for asynchronous I/O operations. For example, Windows uses the OVERLAPPED structure, whereas POSIX.4 AIO uses struct aiocb.

Thus, the chain of knowledge concerning platform-specific mechanisms and data structures runs from initiation operations through dispatching and into completion handling. This knowledge is complicated and hard to reimplement continually, and it can easily couple a proactive I/O design tightly to a single platform. To resolve these issues and provide a portable and flexible completion event demultiplexing and dispatching facility, the ACE Proactor framework defines the ACE_Proactor class.

Class Capabilities

ACE_Proactor implements the Facade pattern [GoF] to define an interface that applications can use to access the various ACE Proactor framework features portably and flexibly. This class provides the following capabilities:

  • It centralizes event loop processing in a proactive application.

  • It dispatches timer expirations to their associated ACE_Handler objects.

  • It demultiplexes completion events to completion handlers and dispatches the appropriate hook methods on completion handlers that then perform application-defined processing in response to the completion events.

  • It can decouple the thread(s) performing completion event detection, demultiplexing, and dispatching from thread(s) initiating asynchronous operations.

  • It mediates between classes that initiate I/O operations and platform-specific asynchronous I/O implementation details.

The interface for ACE_Proactor is shown in Figure 8.8 (page 288). This class has a rich interface that exports all the features in the ACE Proactor framework. We therefore group its method descriptions into the four categories described below.

Figure 8.8. The ACE_Proactor Class

1. Life cycle management methods. The following methods initialize, destroy, and access an ACE_Proactor:

  • ACE_Proactor(), open(): These methods initialize a proactor instance.

  • ~ACE_Proactor(), close(): These methods clean up the resources allocated when a proactor was initialized.

  • instance(): A static method that returns a pointer to a singleton ACE_Proactor, which is created and managed by the Singleton pattern [GoF] combined with the Double-Checked Locking Optimization [POSA2].

The ACE_Proactor can be used in two ways:

  • As a singleton [GoF] via the instance() method shown in the table above.

  • By instantiating one or more instances. This capability can be used to support multiple proactors within a process. Each proactor is often associated with a thread running at a particular priority [Sch98]. The sketch after this list contrasts the two approaches.
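
For concreteness, the following minimal sketch (our own illustration, not code from the chapter's example) contrasts the two usage styles; handler registration and thread priorities are omitted:

 #include "ace/Proactor.h"

 void use_singleton (void)
 {
   // Process-wide singleton, created on demand by instance ().
   ACE_Proactor::instance ()->proactor_run_event_loop ();
 }

 void use_explicit_instance (void)
 {
   // A separate proactor, e.g., dedicated to one high-priority thread.
   ACE_Proactor high_prio_proactor;
   high_prio_proactor.proactor_run_event_loop ();
 }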

2. Event loop management methods. The ACE Proactor framework supports inversion of control. Similar to the ACE Reactor framework, ACE_Proactor implements the following event loop methods that control application completion handler dispatching:

  • handle_events(): Waits for completion events to occur and then dispatches the associated completion handler(s). A timeout parameter can limit the time spent waiting for an event.

  • proactor_run_event_loop(): Calls the handle_events() method repeatedly until it fails, proactor_event_loop_done() returns true, or an optional timeout occurs.

  • proactor_end_event_loop(): Instructs a proactor to shut down its event loop.

  • proactor_event_loop_done(): Returns 1 when the proactor's event loop has been ended, for example, via a call to proactor_end_event_loop().

The ACE Proactor event loop is separate from that provided by the ACE Reactor framework. To use both ACE Reactor and ACE Proactor event loops in the same application and remain portable across all asynchronous I/O platforms, the two event loops must be executed in separate threads, as shown in the sketch below. However, the Windows implementation of ACE_Proactor can register its I/O completion port handle with an ACE_WFMO_Reactor instance to tie the two event loop mechanisms together, allowing both to be driven by one thread. Sidebar 58 (page 290) describes how to do this.
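
The following sketch (our own illustration, assuming the singleton proactor and reactor) shows this portable arrangement, with the proactor event loop running in a thread spawned via ACE_Thread_Manager while the main thread drives the reactor:

 #include "ace/Proactor.h"
 #include "ace/Reactor.h"
 #include "ace/Thread_Manager.h"

 static ACE_THR_FUNC_RETURN proactor_loop (void *)
 {
   // Runs until proactor_end_event_loop () is called.
   ACE_Proactor::instance ()->proactor_run_event_loop ();
   return 0;
 }

 int run_both_loops (void)
 {
   // Spawn one thread for the proactor event loop...
   if (ACE_Thread_Manager::instance ()->spawn (proactor_loop) == -1)
     return -1;
   // ...while this thread drives the reactor event loop.
   ACE_Reactor::instance ()->run_reactor_event_loop ();
   return 0;
 }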

3. Timer management methods. By default, ACE_Proactor uses the ACE_Timer_Heap timer queue mechanism described in Section 3.4 to schedule and dispatch event handlers in accordance with their timeout deadlines. The timer management methods exposed by ACE_Proactor include:

  • schedule_timer(): Registers an ACE_Handler that will be dispatched after a user-specified amount of time.

  • cancel_timer(): Cancels one or more timers that were previously registered.

When a timer expires, ACE_Handler::handle_time_out() (page 271) is dispatched on the registered handler.
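
To illustrate, the following sketch (the Ticker class and the five-second delay are hypothetical) schedules a timer on a proactor and later cancels it:

 #include "ace/Proactor.h"
 #include "ace/Asynch_IO.h"   // Defines ACE_Handler.

 class Ticker : public ACE_Handler {
 public:
   // Hook method dispatched by the proactor when the timer expires.
   virtual void handle_time_out (const ACE_Time_Value &,
                                 const void * /* act */) {
     /* ... application-defined timeout processing ... */
   }
 };

 void timer_example (ACE_Proactor *p, Ticker &t)
 {
   // Dispatch t.handle_time_out () roughly 5 seconds from now; the
   // second argument is an ACT passed back to the handler.
   long id = p->schedule_timer (t, 0, ACE_Time_Value (5));

   // ... later, cancel it if it hasn't expired yet.
   p->cancel_timer (id);
 }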

4. I/O operation facilitator methods. The ACE_Proactor class has visibility into the platform's asynchronous I/O implementation details that are useful for operation initiation and completion event processing. Making ACE_Proactor the central mediator of this platform-specific knowledge prevents the other classes in the ACE Proactor framework from being coupled to those details. In particular, the ACE Proactor framework uses the Bridge pattern [GoF].

ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream use the Bridge pattern to access flexible implementations of their I/O operation factories that are specific to the OS platform. Since ACE_Proactor is the mediator of this platform-specific knowledge, it defines the following methods used by the ACE_Asynch_Read_Stream and ACE_Asynch_Write_Stream classes:

  • create_asynch_read_stream(): Creates an instance of a platform-specific subclass of ACE_Asynch_Read_Stream_Impl appropriate for initiating asynchronous read() operations.

  • create_asynch_write_stream(): Creates an instance of a platform-specific subclass of ACE_Asynch_Write_Stream_Impl appropriate for initiating asynchronous write() operations.
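
To show how these factory methods are exercised, the following sketch (the Echo_Handler class is hypothetical) opens an ACE_Asynch_Read_Stream; its open() method obtains a platform-specific ACE_Asynch_Read_Stream_Impl from the proactor via create_asynch_read_stream():

 #include "ace/Asynch_IO.h"
 #include "ace/Message_Block.h"

 class Echo_Handler : public ACE_Handler {
   ACE_Asynch_Read_Stream reader_;   // Bridge to the platform impl.
 public:
   int start (ACE_HANDLE h) {
     // Binds this handler and handle to a new platform-specific
     // implementation created by the proactor's factory method.
     if (reader_.open (*this, h) == -1) return -1;
     ACE_Message_Block *mb = new ACE_Message_Block (4096);
     return reader_.read (*mb, mb->space ()); // Initiate async read().
   }
   // Completion hook; release the message block here when done.
   virtual void handle_read_stream
     (const ACE_Asynch_Read_Stream::Result &result) {
     /* ... process result.message_block () ... */
   }
 };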

As seen in Figure 8.8, the ACE_Proactor class refers to an object of type ACE_Proactor_Impl, similar to the design of the ACE Reactor framework shown in Figure 4.1 (page 89). All work that depends on platform-specific mechanisms is forwarded to the proactor implementation class for handling. We briefly describe the platform-specific ACE Proactor framework implementations below.

The ACE_WIN32_Proactor Class

ACE_WIN32_Proactor is the ACE_Proactor implementation on Windows. This class works on Windows NT 4.0 and newer Windows platforms, such as Windows 2000 and Windows XP. It doesn't work on Windows 95, 98, Me, or CE, however, since these platforms don't support asynchronous I/O.

Implementation overview. ACE_WIN32_Proactor uses an I/O completion port for completion event detection. When an asynchronous operation factory, such as ACE_Asynch_Read_Stream or ACE_Asynch_Write_Stream, is initialized, its I/O handle is associated with the proactor's I/O completion port. In this implementation, the Windows GetQueuedCompletionStatus() function paces the event loop.
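
The following bare Win32 sketch (our own illustration of the underlying OS mechanism, not ACE source) shows the handle association and event loop pacing just described:

 #include <windows.h>

 void iocp_sketch (HANDLE io_handle)
 {
   // Create a completion port, then associate the I/O handle with it.
   HANDLE port = CreateIoCompletionPort (INVALID_HANDLE_VALUE, 0, 0, 0);
   CreateIoCompletionPort (io_handle, port, /* completion key */ 0, 0);

   DWORD bytes; ULONG_PTR key; OVERLAPPED *ov;
   for (;;) {
     // Block until an asynchronous operation on an associated
     // handle completes; its OVERLAPPED pointer identifies it.
     if (!GetQueuedCompletionStatus (port, &bytes, &key, &ov, INFINITE)
         && ov == 0)
       break;   // Port error; a non-null ov would mean a failed I/O.
     /* ... demultiplex via ov and dispatch its completion handler ... */
   }
 }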

Sidebar 58: Integrating Proactor and Reactor Events on Windows

The ACE Reactor and ACE Proactor event loops require different event detection and demultiplexing mechanisms. As a result, they are often executed in separate threads. On Windows, however, ACE provides a way to integrate the two event loop mechanisms so they can both be driven by a single thread. The advantage of using a single event-loop thread is that it can simplify application-defined event handlers and completion handlers, since they may no longer require synchronizers to prevent race conditions.

The ACE_Proactor Windows implementation uses an I/O completion port to detect completion events. When one or more asynchronous operations complete, Windows signals the corresponding I/O completion port handle. This handle can therefore be registered with an instance of ACE_WFMO_Reactor (Chapter 4). Using this scheme, the ACE_WFMO_Reactor dispatches the I/O completion port's "signaled" event to the ACE_Proactor , which in turn dispatches completion events and returns to the reactor's event loop.

To use the scheme outlined above, an application must instantiate an ACE_Proactor with a particular set of nondefault options. The following code fragment shows how to accomplish this, and should be executed immediately at program startup:

 1 ACE_Proactor::close_singleton ();
 2 ACE_WIN32_Proactor *impl = new ACE_WIN32_Proactor (0, 1);
 3 ACE_Proactor::instance (new ACE_Proactor (impl, 1), 1);
 4 ACE_Reactor::instance ()->register_handler
 5   (impl, impl->get_handle ());
 // ... Other registration and initiation code omitted.
 6 ACE_Reactor::instance ()->run_reactor_event_loop ();
 7 ACE_Reactor::instance ()->remove_handler
 8   (impl->get_handle (), ACE_Event_Handler::DONT_CALL);

Line 1 Close the existing proactor singleton.

Line 2 Create a Windows-specific proactor implementation. The second argument specifies that this proactor will be used with a reactor.

Line 3 Create a new ACE_Proactor for the reactor-enabled implementation and make it the singleton. The second argument to ACE_Proactor says to delete the implementation when the ACE_Proactor is closed; the second argument to instance() says to delete the ACE_Proactor object when it's closed.

Lines 4-6 Register the I/O completion port handle with the reactor and run the reactor event loop.

Lines 7-8 After the event loop ends, remove the proactor's I/O completion port handle from the reactor.

All the Result classes defined for use with the ACE_WIN32_Proactor are derived from the Windows OVERLAPPED structure. Additional information is added to each depending on the operation being performed. When GetQueuedCompletionStatus() returns a pointer to the completed operation's OVERLAPPED structure, the ACE_WIN32_Proactor converts it to a pointer to a Result object. This design allows completions to be dispatched efficiently to the correct ACE_Handler-derived completion handler when I/O operations complete. The Implementation section of the Proactor pattern in POSA2 illustrates how to implement a proactor using the Windows asynchrony mechanisms and the Asynchronous Completion Token pattern.
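
The following simplified sketch (the My_Result class is hypothetical; ACE's actual Result classes carry more state) shows why deriving from OVERLAPPED makes dispatching efficient:

 #include <windows.h>

 // Each asynchronous operation is initiated with the OVERLAPPED
 // subobject of one of these completion tokens.
 class My_Result : public OVERLAPPED {
 public:
   virtual void complete (DWORD bytes_transferred) = 0;
   virtual ~My_Result () {}
 };

 void dispatch (OVERLAPPED *ov, DWORD bytes)
 {
   // Safe downcast: every OVERLAPPED handed to an initiation
   // function in this design is embedded in a My_Result.
   My_Result *result = static_cast<My_Result *> (ov);
   result->complete (bytes);  // Dispatch the completion handler.
 }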

Concurrency considerations. Multiple threads can execute the event loop of an ACE_WIN32_Proactor simultaneously. Since all event registration and dispatching is handled by the I/O completion port mechanism, and not by the ACE_WIN32_Proactor itself, there's no need to synchronize access to registration-related data structures, as in the ACE_WFMO_Reactor implementation (page 106).

Timer queue expiry management is handled in a separate thread that's managed by the ACE_WIN32_Proactor. When a timer expires, the timeout mechanism uses the PostQueuedCompletionStatus() function to post a completion to the proactor's I/O completion port. This design cleanly integrates the timer mechanism with the normal completion dispatching mechanism. It also ensures that only one thread is awakened to dispatch the timer, since all completion-detection threads wait only for completion events and needn't worry about waking up for a scheduled timer expiration.
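
A minimal sketch of this technique follows (our own illustration; the sleep-based wait stands in for the real timer queue logic, and port and timer_act are placeholders):

 #include <windows.h>

 void timer_thread_sketch (HANDLE port, DWORD ms_until_expiry,
                           OVERLAPPED *timer_act)
 {
   ::Sleep (ms_until_expiry);  // Wait for the earliest deadline.
   // Wake exactly one event loop thread with a "timer expired" token
   // that's dispatched like any other completion.
   ::PostQueuedCompletionStatus (port, 0, /* key */ 0, timer_act);
 }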

The ACE_POSIX_Proactor Class

POSIX platforms offer multiple mechanisms for initiating asynchronous I/O operations and detecting their completions, so the ACE Proactor framework provides several implementations on POSIX systems. Moreover, Sun's Solaris Operating Environment offers its own proprietary version of asynchronous I/O. On Solaris 2.6 and above, the performance of the Sun-specific asynchronous I/O functions is significantly higher than that of Solaris's POSIX.4 AIO implementation. To take advantage of this performance improvement, ACE also encapsulates this mechanism in a separate set of classes.

Implementation overview. The POSIX implementations of asynchronous I/O use a control block ( struct aiocb ) to identify each asynchronous I/O request and its controlling information. Each aiocb can be associated with only one I/O request at a time. The Sun-specific asynchronous I/O uses an additional structure named aio_result_t .

Although the encapsulated POSIX asynchronous I/O mechanisms support read() and write() operations, they don't support any TCP/IP connection-related operations. To support the functions of ACE_Asynch_Acceptor and ACE_Asynch_Connector, a separate thread is used to perform connection-related operations. This asynchronous connection emulation is described in Sidebar 57 (page 283).

The three variants of the ACE_POSIX_Proactor implementation are described below:

  • ACE_POSIX_AIOCB_Proactor: This implementation maintains a parallel list of aiocb structures and Result objects. Each outstanding operation is represented by an entry in each list. The aio_suspend() function suspends the event loop until one or more asynchronous I/O operations complete.

  • ACE_POSIX_SIG_Proactor: This implementation is derived from ACE_POSIX_AIOCB_Proactor, but uses POSIX real-time signals to detect asynchronous I/O completion. The event loop uses the sigtimedwait() and sigwaitinfo() functions to pace the loop and retrieve information about completed operations. Each asynchronous I/O operation started via this proactor has a unique value associated with its aiocb that's communicated with the signal noting its completion. This design makes it easy to locate the aiocb and its parallel Result object and dispatch the correct completion handler.

  • ACE_SUN_Proactor: This implementation is also based on ACE_POSIX_AIOCB_Proactor, but it uses the Sun-specific asynchronous I/O facility instead of the POSIX.4 AIO facility. It works much like ACE_POSIX_AIOCB_Proactor, but uses the Sun-specific aiowait() function to detect I/O completions.

Concurrency considerations. The ACE_POSIX_SIG_Proactor is the default proactor implementation on POSIX.4 AIO-enabled platforms. Its completion event demultiplexing mechanism uses the sigtimedwait() function. Each ACE_POSIX_SIG_Proactor instance can specify the set of signals to use with sigtimedwait(). To use multiple threads at different priorities with different ACE_POSIX_SIG_Proactor instances, therefore, each instance should use a different signal or set of signals.

Limitations and characteristics of some platforms directly affect which ACE_POSIX_Proactor implementation can be used. On Linux, for instance, threads are actually cloned processes. Since signals can't be sent across processes, and asynchronous I/O operations and Proactor timer expirations are both implemented using threads, the ACE_POSIX_SIG_Proactor doesn't work well on Linux. Thus, ACE_POSIX_AIOCB_Proactor is the default proactor implementation on Linux. The aio_suspend() demultiplexing mechanism used in ACE_POSIX_AIOCB_Proactor is thread safe, so multiple threads can run its event loop simultaneously.
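
When the platform default is unsuitable, an application can select a proactor implementation explicitly, mirroring the Windows snippet in Sidebar 58. The following sketch assumes an AIO-enabled POSIX platform and the ACE_POSIX_AIOCB_Proactor default constructor:

 #include "ace/Proactor.h"
 #include "ace/POSIX_Proactor.h"

 void select_aiocb_proactor (void)
 {
   ACE_POSIX_AIOCB_Proactor *impl = new ACE_POSIX_AIOCB_Proactor;
   ACE_Proactor::close_singleton ();
   // Both '1' arguments transfer ownership, so the implementation
   // and the ACE_Proactor are deleted when the singleton is closed.
   ACE_Proactor::instance (new ACE_Proactor (impl, 1), 1);
 }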

Example

The AIO_CLD_Connector reconnection mechanism was mentioned in the discussion of the AIO_Output_Handler::handle_read_stream() method (page 276). The reconnection mechanism is initiated using the AIO_CLD_Connector::reconnect() method below:

 int reconnect (void) { return connect (remote_addr_); } 

This method simply initiates a new asynchronous connection request to the server logging daemon. An exponential backoff strategy is used to avoid continually initiating connection attempts when, for example, there's no server logging daemon listening. We use the following validate_connection() hook method to insert application-defined behavior into ACE_Asynch_Connector's connection completion handling. This method learns the disposition of each asynchronous connection request and schedules a timer to retry the connect if it failed. If the connect succeeded, this method performs the SSL authentication with the server logging daemon.

  1 int AIO_CLD_Connector::validate_connection
  2       (const ACE_Asynch_Connect::Result &result,
  3        const ACE_INET_Addr &remote, const ACE_INET_Addr &) {
  4   remote_addr_ = remote;
  5   if (!result.success ()) {
  6     ACE_Time_Value delay (retry_delay_);
  7     retry_delay_ *= 2;
  8     if (retry_delay_ > MAX_RETRY_DELAY)
  9       retry_delay_ = MAX_RETRY_DELAY;
 10     proactor ()->schedule_timer (*this, 0, delay);
 11     return -1;
 12   }
 13   retry_delay_ = INITIAL_RETRY_DELAY;
 14
 15   if (ssl_ctx_ == 0) {
 16     OpenSSL_add_ssl_algorithms ();
 17     ssl_ctx_ = SSL_CTX_new (SSLv3_client_method ());
 18     if (ssl_ctx_ == 0) return -1;
 19
 20     if (SSL_CTX_use_certificate_file (ssl_ctx_,
 21                                       CLD_CERTIFICATE_FILENAME,
 22                                       SSL_FILETYPE_PEM) <= 0
 23         || SSL_CTX_use_PrivateKey_file (ssl_ctx_,
 24                                         CLD_KEY_FILENAME,
 25                                         SSL_FILETYPE_PEM) <= 0
 26         || !SSL_CTX_check_private_key (ssl_ctx_)) {
 27       SSL_CTX_free (ssl_ctx_);
 28       ssl_ctx_ = 0;
 29       return -1;
 30     }
 31     ssl_ = SSL_new (ssl_ctx_);
 32     if (ssl_ == 0) {
 33       SSL_CTX_free (ssl_ctx_); ssl_ctx_ = 0;
 34       return -1;
 35     }
 36   }
 37
 38   SSL_clear (ssl_);
 39   SSL_set_fd
 40     (ssl_, ACE_reinterpret_cast (int, result.connect_handle ()));
 41
 42   SSL_set_verify (ssl_, SSL_VERIFY_PEER, 0);
 43
 44   if (SSL_connect (ssl_) == -1
 45       || SSL_shutdown (ssl_) == -1) return -1;
 46   return 0;
 47 }

Line 4 Save the peer address we tried to connect to so it can be reused for future connection attempts, if needed.

Lines 5-12 If the connect operation failed, set a timer to retry the connection later, then double the retry delay, up to the number of seconds specified by MAX_RETRY_DELAY.

Line 13 If the connection succeeded, we reset retry_delay_ back to its initial value for the next reconnection sequence.

Lines 15-46 The rest of validate_connection() is similar to TPC_Logging_Acceptor::open() (page 224), so we don't explain it here. If the SSL authentication fails, validate_connection() returns -1, causing ACE_Asynch_Connector to close the new connection before opening a service handler for it. Note that the SSL function calls on lines 44 and 45 are synchronous, so the proactor event loop is not processing completion events while these calls are being made. As with the Reactor framework, developers must be aware of this type of delay in a callback method.

When a timer scheduled by the validate_connection() method expires, the following handle_time_out() hook method is called by the ACE Proactor framework:

 void AIO_CLD_Connector::handle_time_out (const ACE_Time_Value &,
                                          const void *)
 { connect (remote_addr_); }

This method simply initiates another asynchronous connect() attempt, which triggers another call to validate_connection() regardless of whether the connection succeeds or fails.

The AIO client logging daemon service is represented by the following AIO_Client_Logging_Daemon class:

 class AIO_Client_Logging_Daemon : public ACE_Task<ACE_NULL_SYNCH> {
 protected:
   ACE_INET_Addr cld_addr_; // Our listener address.
   ACE_INET_Addr sld_addr_; // The logging server's address.

   // Factory that passively connects the <AIO_Input_Handler>.
   AIO_CLD_Acceptor acceptor_;

 public:
   // Service Configurator hook methods.
   virtual int init (int argc, ACE_TCHAR *argv[]);
   virtual int fini ();
   virtual int svc (void);
 };

This class is similar to the AC_Client_Logging_Daemon class (page 251). The primary difference is that AIO_Client_Logging_Daemon spawns a new thread to run the proactor event loop that the service depends on, whereas AC_Client_Logging_Daemon relies on the main program thread to run the singleton reactor's event loop. To activate this thread easily, AIO_Client_Logging_Daemon derives from ACE_Task rather than ACE_Service_Object.

We start a new thread for the proactor event loop because we still rely on the reactor event loop for service reconfiguration activity, as described in Chapter 5. If our service were designed solely for Windows, we could integrate the proactor and reactor event loops, as described in Sidebar 58 (page 290). However, this client logging daemon implementation is portable to all AIO-enabled ACE platforms. After our AIO_Client_Logging_Daemon::init() method processes the argument list and forms addresses, it calls ACE_Task::activate() to spawn a thread running the following svc() method:

 1 int AIO_Client_Logging_Daemon::svc (void) {
 2   if (acceptor_.open (cld_addr_) == -1) return -1;
 3   if (CLD_CONNECTOR::instance ()->connect (sld_addr_) == 0)
 4     ACE_Proactor::instance ()->proactor_run_event_loop ();
 5   acceptor_.close ();
 6   CLD_CONNECTOR::close ();
 7   OUTPUT_HANDLER::close ();
 8   return 0;
 9 }

Lines 2-3 Initialize the acceptor_ object to begin listening for logging client connections and initiate the first connection attempt to the server logging daemon.

Line 4 Call ACE_Proactor::proactor_run_event_loop() to handle asynchronous operation completions. The proactor event loop is terminated when the service is shut down via the following fini() method:

 int AIO_Client_Logging_Daemon::fini () {
   ACE_Proactor::instance ()->proactor_end_event_loop ();
   wait ();
   return 0;
 }

This method calls ACE_Proactor::proactor_end_event_loop(), which ends the event loop in the svc() method. It then calls ACE_Task::wait() to wait for the svc() method to complete and exit its thread.

Lines 5-7 Close all open connections and singleton objects and exit the svc() thread.

Lastly, we add the necessary ACE_FACTORY_DEFINE macro to generate the service's factory function:

 ACE_FACTORY_DEFINE (AIO_CLD, AIO_Client_Logging_Daemon) 

Our new proactive client logging daemon service uses the ACE Service Configurator framework to configure itself into any main program, such as the Configurable_Logging_Server (page 147), by including the following entry in a svc.conf file:

 dynamic AIO_Client_Logging_Daemon Service_Object *
 AIO_CLD:_make_AIO_Client_Logging_Daemon()
   "-p $CLIENT_LOGGING_DAEMON_PORT"

