4.2 The ACE_Select_Reactor Class

Motivation

As discussed in Chapter 5 of C++NPv1, a reactive server responds to events from one or more sources. Ideally, response to events is quick enough so that all requests appear to be processed simultaneously, although event processing is usually handled by a single thread. A synchronous event demultiplexer is at the heart of each reactive server. This demultiplexer mechanism detects and reacts to events originating from a number of sources, making the events available to the server synchronously, as part of its normal execution path.

The select() function is the most common synchronous event demultiplexer. This system function waits for specified events to occur on a set of I/O handles.[1] When one or more of the I/O handles become active, or after a designated amount of time elapses, select() returns. Its return value indicates either the number of active handles, that the caller-specified time elapsed before any event occurred, or that an error occurred. The caller can then take appropriate action. Additional coverage of select() is available in Chapter 6 of C++NPv1 and in [Ste98].

[1] The Windows version of select() works only on socket handles.
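To make the select() semantics concrete, here's a minimal sketch of the raw C API. The handles listen_fd and conn_fd are hypothetical, already-open sockets, the 5-second timeout is an arbitrary illustrative value, and error handling is simplified:

 #include <sys/select.h>   // On Windows, select() is declared in <winsock2.h>.

 // Sketch only: wait up to 5 seconds for input events on two handles.
 void wait_for_events (int listen_fd, int conn_fd)
 {
   fd_set read_fds;
   FD_ZERO (&read_fds);
   FD_SET (listen_fd, &read_fds);
   FD_SET (conn_fd, &read_fds);
   int width = (listen_fd > conn_fd ? listen_fd : conn_fd) + 1;

   timeval timeout = { 5, 0 };
   int n_active = select (width, &read_fds, 0, 0, &timeout);

   if (n_active > 0) {            // One or more handles are active.
     if (FD_ISSET (listen_fd, &read_fds)) { /* accept a new connection */ }
     if (FD_ISSET (conn_fd, &read_fds))   { /* read the pending data */ }
   } else if (n_active == 0) {    // The caller-specified time elapsed.
   } else {                       // An error occurred, e.g., EINTR.
   }
 }

Even this small sketch hints at the bookkeeping that select() imposes, which is the subject of the list below.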

Although select() is available on most OS platforms, programming to the native select() C API requires developers to wrestle with many low-level details, such as

  • Setting and clearing fd_sets

  • Detecting events and responding to signal interrupts

  • Managing internal locks

  • Demultiplexing events to associated event handlers

  • Dispatching functions that process I/O, signal, and timer events

Chapter 7 of C++NPv1 discussed several wrapper facade classes that can be used to master many complexities associated with these low-level details. It's also useful, however, to use select() in environments where it's necessary to

  • Allow multiple threads to change the I/O handle sets used by the select() thread

  • Interrupt the select() function before events occur

  • Remove thread support overhead entirely, either because it isn't needed or because the platform or application configuration doesn't support multithreading

To address these issues systematically, the ACE Reactor framework defines the ACE_Select_Reactor class, which provides all the capabilities outlined above.

Class Capabilities

ACE_Select_Reactor is an implementation of the ACE_Reactor interface that uses the select() synchronous event demultiplexer function to detect I/O and timer events. In addition to supporting all the features of the ACE_Reactor interface, the ACE_Select_Reactor class provides the following capabilities:

  • It supports reentrant reactor invocations, where applications can call the handle_events() method from event handlers that are being dispatched by the same reactor.

  • It can be configured to be either synchronized or nonsynchronized, which trades off thread safety for reduced overhead.

  • It preserves fairness by dispatching all active handles in its handle sets before calling select() again.

The ACE_Select_Reactor is the default implementation of ACE_Reactor on all platforms except Windows, which uses the ACE_WFMO_Reactor for the reasons described in Sidebar 25 (page 105).

Implementation overview. ACE_Select_Reactor descends from ACE_Reactor_Impl, as shown in Figure 4.1 (page 89). It therefore serves as a concrete implementation of the ACE_Reactor. As shown in Figure 4.2, ACE_Select_Reactor is actually a typedef of the ACE_Select_Reactor_T template (the Concurrency considerations section on page 93 discusses this implementation aspect further). The ACE_Select_Reactor_Impl class contains the data and methods that are independent of the template argument to ACE_Select_Reactor_T, which isolates them from the template argument-dependent factors and prevents them from being duplicated in each template instantiation. Sidebar 20 (page 92) explains how to change the number of event handlers managed by an instance of ACE_Select_Reactor.

Figure 4.2. The ACE_Select_Reactor Framework Internals

The ACE Reactor framework's notification mechanism (page 77) enables a reactor to process an open-ended number of event handlers and can be used to unblock a reactor from its event loop. By default, ACE_Select_Reactor implements its notification mechanism via an ACE_Pipe, which is a bidirectional IPC mechanism whose semantics are described in Sidebar 21 (page 93). The two ends of the pipe play the following roles:

  • The writer role. The ACE_Select_Reactor::notify() method exposes the writer end of the pipe to application threads, which use the notify() method to pass event handler pointers to an ACE_Select_Reactor via its notification pipe.

  • The reader role. The ACE_Select_Reactor registers the reader end of the pipe internally with a READ_MASK. When the reactor detects an event on the reader end of its notification pipe, it wakes up and dispatches its notify handler to process a user-configurable number of event handlers from the pipe. The number of handlers dispatched is controlled by the max_notify_iterations() method (page 77), as the sketch after this list shows.
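For example, the following sketch caps the number of queued notifications dispatched per reactor iteration so that I/O and timer events aren't starved (the value 5 is an arbitrary illustrative choice):

 #include "ace/Reactor.h"

 // Sketch: dispatch at most five queued notifications per reactor iteration.
 void limit_notifications (ACE_Reactor *reactor)
 {
   reactor->max_notify_iterations (5);
 }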

Sidebar 17 (page 78) explains how to avoid deadlocks that can result from the fact that the buffer size of an ACE_Pipe is bounded. In addition to those application design tips, Sidebar 22 (page 94) describes another potential problem related to notifications; its solution also provides a way to enlarge the notification mechanism, which helps avoid deadlocks.

Sidebar 20: Controlling the Size of an ACE_Select_Reactor

The number of event handlers that can be managed by an ACE_Select_Reactor defaults to the value of the FD_SETSIZE macro. FD_SETSIZE is generally used by the OS to size the fd_set structures discussed in Chapter 7 of C++NPv1. Since the internals of ACE_Select_Reactor rely on fd_set, and FD_SETSIZE controls its size, FD_SETSIZE can play an important role in increasing the number of possible event handlers in ACE_Select_Reactor. This value can be controlled as follows:

  • To create an ACE_Select_Reactor that's smaller than the default size of FD_SETSIZE, simply pass in the value to the ACE_Select_Reactor::open() method. This does not require recompilation of the ACE library (see the sketch after this sidebar).

  • To create an ACE_Select_Reactor that's larger than the default size of FD_SETSIZE, you'll need to change the value of FD_SETSIZE in your $ACE_ROOT/ace/config.h file and recompile the ACE library (and possibly your OS kernel and C library on some platforms). After recompiling and reinstalling the necessary libraries, you can then pass in the desired number of event handlers to the ACE_Select_Reactor::open() method. You should be fine as long as this value is less than or equal to the new FD_SETSIZE and the maximum number of handles supported by the OS.

Although the steps described above make it possible to handle a large number of I/O handles per ACE_Select_Reactor, it's not necessarily a good idea, since performance may suffer due to deficiencies with select() [BM98]. To handle a large number of handles, you might therefore consider using the ACE_Dev_Poll_Reactor (page 114) that's available on certain UNIX platforms. An alternative is a design using asynchronous I/O based on the ACE Proactor framework discussed in Chapter 8 (available on Windows and certain UNIX platforms). Avoid the temptation to divide a large number of handles between multiple instances of ACE_Select_Reactor, since one of the deficiencies stems from the need for select() to scan large fd_set structures, not from ACE's use of select().
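As a minimal sketch of the first option above, the desired size can be supplied when the reactor implementation is set up; here we assume the size-taking ACE_Select_Reactor constructor, which hands the value to the reactor's initialization (alternatively, pass the size to open() directly, as the sidebar describes). The value 64 is arbitrary and must not exceed FD_SETSIZE:

 #include "ace/Reactor.h"
 #include "ace/Select_Reactor.h"

 void run_small_reactor ()
 {
   // Size the select reactor for at most 64 event handlers (64 <= FD_SETSIZE).
   ACE_Select_Reactor small_impl (static_cast<size_t> (64));
   ACE_Reactor reactor (&small_impl);
   // ... register event handlers and run the reactor's event loop ...
 }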

Unlike the event handlers registered with a reactor, the handlers passed via a reactor's notification mechanism needn't be associated with I/O-based or timer-based events, which helps improve the flexibility and scalability of ACE_Select_Reactor. Likewise, this mechanism allows all event handler processing to be serialized in the reactor's main thread, which simplifies event handler implementations since they needn't be thread-safe. Figure 4.3 illustrates how ACE_Pipe is used within ACE_Select_Reactor.

Figure 4.3. The ACE_Select_Reactor Notification Mechanism

With events originating in numerous sources, both application-supplied (timers and I/O handles) and internal (notification pipe), it's important for an ACE_Select_Reactor to dispatch events to event handlers in an effective order. Years of experimentation and refinement resulted in the following order for event handler dispatching in the ACE_Select_Reactor::handle_events() method:

  1. Time-driven events

  2. Notifications

  3. Output I/O events

  4. Exception I/O events

  5. Input I/O events

Sidebar 21: The ACE_Pipe Class

The ACE_Select_Reactor's notification mechanism is implemented via the ACE_Pipe class, which provides a portable, bidirectional IPC mechanism that transfers data within an OS kernel. This wrapper facade class is implemented using a STREAMS pipe on modern UNIX platforms, a socketpair() on legacy UNIX platforms, or a connected TCP/IP socket on Windows platforms. After initializing an ACE_Pipe, applications can obtain its "read" and "write" handles via access methods and invoke I/O operations to receive and send data. These handles can also be included in ACE_Handle_Set objects passed to ACE::select() or to any reactor based on select(), such as the ACE_Select_Reactor or ACE_Priority_Reactor.
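Here is a minimal sketch of the ACE_Pipe usage described in Sidebar 21. Error handling is elided, the message contents are arbitrary, and ACE::send() and ACE::recv() are used so the transfer also works when the "pipe" is a connected socket on Windows:

 #include "ace/Pipe.h"
 #include "ace/ACE.h"

 // Sketch: open an ACE_Pipe and pass a few bytes from its writer end to
 // its reader end.
 int pipe_demo ()
 {
   ACE_Pipe pipe;
   if (pipe.open () == -1) return -1;

   const char msg[] = "wake up";
   ACE::send (pipe.write_handle (), msg, sizeof msg);

   char buf[sizeof msg];
   ACE::recv (pipe.read_handle (), buf, sizeof buf);

   pipe.close ();
   return 0;
 }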

Applications should generally not rely on the order in which the different types of events are dispatched since not all reactor implementations guarantee the same order. For example, the ACE_Dev_Poll_Reactor (page 114) might not dispatch notifications before I/O events. There are situations, however, where knowing the dispatching order of events is useful. For example, an event handler's handle_output() callback method may encounter an error writing to a socket because the peer application aborts the connection. In this case, it's likely that the socket is also ready for input and the handler's handle_input() callback will be invoked shortly, where common socket and handler cleanup can take place.
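The following sketch illustrates that cleanup pattern; the Status_Handler class, its members, and its processing logic are hypothetical and are shown only to make the division of labor between handle_output() and handle_input() concrete:

 #include "ace/Event_Handler.h"
 #include "ace/SOCK_Stream.h"

 class Status_Handler : public ACE_Event_Handler {
 public:
   virtual ACE_HANDLE get_handle (void) const { return peer_.get_handle (); }

   virtual int handle_output (ACE_HANDLE) {
     if (peer_.send (msg_, msg_len_) == -1)
       return 0;   // Peer probably aborted; don't clean up here. The socket
                   // will also be ready for input, so handle_input() will
                   // run shortly and perform the common cleanup. (A real
                   // handler would typically also cancel its write
                   // registration at this point.)
     return 0;
   }

   virtual int handle_input (ACE_HANDLE) {
     char buf[4096];
     ssize_t n = peer_.recv (buf, sizeof buf);
     if (n <= 0) return -1;   // Triggers handle_close(), where cleanup occurs.
     // ... process <n> bytes of input ...
     return 0;
   }

   virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask) {
     peer_.close ();          // Common socket and handler cleanup.
     delete this;
     return 0;
   }

 private:
   ACE_SOCK_Stream peer_;
   char msg_[512];
   size_t msg_len_;
 };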

Sidebar 22: Enlarging ACE_Select_Reactor's Notification Mechanism

In some situations, it's possible that a notification queued to an ACE_Select_Reactor won't be delivered until after the desired event handler is destroyed. This delay stems from the time window between when the notify() method is called and the time when the reactor reacts to the notification pipe, reads the notification information from the pipe, and dispatches the associated callback. Although application developers can often work around this scenario and avoid deleting an event handler while notifications are pending, it's not always possible to do so.

ACE offers a way to change the ACE_Select_Reactor notification queueing mechanism from an ACE_Pipe to a user-space queue that can grow arbitrarily large. This alternate mechanism offers the following benefits:

  • Greatly expands the queueing capacity of the notification mechanism, also helping to avoid deadlock (see Sidebar 17 on page 78)

  • Allows the ACE_Reactor::purge_pending_notifications() method to scan the queue and remove desired event handlers (sketched after this sidebar)

To enable this feature, add #define ACE_HAS_REACTOR_NOTIFICATION_QUEUE to your $ACE_ROOT/ace/config.h file and rebuild ACE. This option is not enabled by default because the additional dynamic memory allocation required may be prohibitive for high-performance or embedded systems.
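As a sketch of the purge call mentioned above (handler is a hypothetical, dynamically allocated event handler that may still have notifications queued; deleting it directly is shown only for illustration):

 #include "ace/Reactor.h"
 #include "ace/Event_Handler.h"

 // Sketch: with the notification queue enabled, discard any notifications
 // still queued for <handler> before destroying it.
 void retire_handler (ACE_Reactor *reactor, ACE_Event_Handler *handler)
 {
   reactor->purge_pending_notifications (handler);
   delete handler;
 }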

Concurrency considerations. The ACE_Select_Reactor is an instantiation of the ACE_Select_Reactor_T class template shown in Figure 4.2 (page 91). This template uses the Strategized Locking pattern [POSA2] to allow application developers to configure the necessary level of synchronization. The TOKEN template argument is always an instantiation of ACE_Select_Reactor_Token_T with one of the following types:

  • ACE_Token: This produces a synchronized reactor, allowing multiple threads to invoke event handler registration, removal, and management methods on a single ACE_Reactor that's shared by all threads in a process. The ACE_Token recursive locking mechanism is described in Sidebar 23.

  • ACE_Noop_Token: This produces an unsynchronized reactor that minimizes the overhead of event handling for single-threaded applications. ACE_Noop_Token exports the same interface as ACE_Token, but performs no synchronization. This type of token is the default when ACE is built without multithreading support; the sketch after this list shows how this variant can be named explicitly.
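As a sketch of how the unsynchronized variant can be named explicitly (the typedef name is hypothetical, and we assume that including ace/Select_Reactor.h pulls in the template and token classes shown in Figure 4.2; ACE's own ACE_Select_Reactor typedef normally selects the appropriate token automatically based on the build configuration):

 #include "ace/Select_Reactor.h"

 // Hypothetical typedef: an explicitly unsynchronized select reactor for a
 // purely single-threaded application (no locking overhead).
 typedef ACE_Select_Reactor_T<
           ACE_Select_Reactor_Token_T<ACE_Noop_Token> >
         Single_Threaded_Select_Reactor;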

Only one thread (called the owner ) can invoke ACE_Select_Reactor::handle_events() at a time. By default, the owner of an ACE_Reactor is the identity of the thread that initialized it. The ACE_Select_Reactor::owner() method is used to change ownership of the ACE_Select_Reactor to a particular thread id. This method is useful when the thread running the reactor's event loop differs from the thread that initialized the reactor. The event_loop() function on page 97 illustrates this use case.

Sidebar 23: The ACE_Token Class

ACE_Token is a lock whose interface is compatible with other ACE synchronization wrapper facades, such as ACE_Thread_Mutex or ACE_RW_Mutex from Chapter 10 of C++NPv1, but whose implementation has the following capabilities:

  • It implements recursive mutex semantics; that is, a thread that owns the token can reacquire it without deadlocking. Before a token can be acquired by a different thread, however, its release() method must be called the same number of times that acquire() was called, as the sketch following this sidebar's list illustrates.

  • Each ACE_Token maintains two ordered lists that are used to queue high- and low-priority threads waiting to acquire the token. Threads requesting the token using ACE_Token::acquire_write() are kept in the high-priority list and take precedence over threads that call ACE_Token::acquire_read(), which are kept in the low-priority list. Within a priority list, threads blocked waiting to acquire the token are serviced in either FIFO or LIFO order, according to the current queueing strategy, as threads release the token.

  • The ACE_Token queueing strategy can be obtained or set via calls to ACE_Token::queueing_strategy() and defaults to FIFO, which ensures fairness among waiting threads. In contrast, UNIX International and Pthreads mutexes don't strictly enforce any particular thread acquisition ordering. For applications that don't require strict FIFO ordering, the ACE_Token LIFO strategy can improve performance by maximizing CPU cache affinity [SOP+00].

  • The ACE_Token::sleep_hook() hook method is invoked if a thread can't acquire a token immediately. This method allows a thread to release any resources it's holding before it waits to acquire the token, thereby avoiding deadlock, starvation, and unbounded priority inversion.
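A minimal sketch of the recursive acquire/release behavior described in the first bullet (return values are ignored for brevity; real code should check them):

 #include "ace/Token.h"

 void recursive_use (ACE_Token &token)
 {
   token.acquire ();   // First acquisition by this thread.
   token.acquire ();   // The same thread reacquires without deadlocking.
   // ... critical section ...
   token.release ();   // Must be released as many times as it was acquired
   token.release ();   // before another thread can acquire the token.
 }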

ACE_Select_Reactor uses an ACE_Token-derived class named ACE_Select_Reactor_Token to synchronize access to a reactor. Requests to change the internal states of a reactor use ACE_Token::acquire_write() to ensure other waiting threads see the changes as soon as possible. ACE_Select_Reactor_Token overrides its sleep_hook() method to notify the reactor of pending threads via its notification mechanism described in Sidebar 21 (page 93).

Example

Since the reactive logging server on page 84 runs continuously, there's no way to shut it down gracefully, other than to terminate it abruptly. For example, an administrator can send its process a "kill -9" from a UNIX login console or end the process via the Windows Task Manager. Abruptly terminating a process via these mechanisms prevents it from performing cleanup activities, such as flushing log records to disk, releasing synchronization locks, and closing TCP/IP connections. In this example, we show how to use the ACE_Select_Reactor::notify() mechanism to shut down the logging server cleanly.

Figure 4.4 shows the architecture of our solution, which uses the ACE_Select_Reactor 's notification mechanism to shut down our Reactor_Logging_Server via the following steps:

Figure 4.4. ACE_Select_Reactor Logging Server with Controller Thread

  1. We'll spawn a controller thread that waits for an administrator to pass it commands via its standard input.

  2. When the "quit" command is received, the controller thread passes a special event handler to the reactor via its notify() method and then exits the thread.

  3. The reactor invokes this event handler's handle_exception() method, which calls end_reactor_event_loop() and then deletes itself.

  4. When ACE_Reactor::run_reactor_event_loop() next checks the result of the reactor_event_loop_done() method, it will be true, causing the reactor event loop to exit, and the main server thread to exit gracefully.

The C++ code below illustrates these four steps. It's in Select_Reactor_Logging_Server.cpp and the revised main() function is shown first:

 1 #include "ace/streams.h"
 2 #include "ace/Reactor.h"
 3 #include "ace/Select_Reactor.h"
 4 #include "ace/Thread_Manager.h"
 5 #include "Reactor_Logging_Server.h"
 6 #include <string>
 7 // Forward declarations.
 8 ACE_THR_FUNC_RETURN controller (void *);
 9 ACE_THR_FUNC_RETURN event_loop (void *);
10
11 typedef Reactor_Logging_Server<Logging_Acceptor_Ex>
12         Server_Logging_Daemon;
13
14 int main (int argc, char *argv[]) {
15   ACE_Select_Reactor select_reactor;
16   ACE_Reactor reactor (&select_reactor);
17
18   Server_Logging_Daemon *server = 0;
19   ACE_NEW_RETURN (server,
20                   Server_Logging_Daemon (argc, argv, &reactor),
21                   1);
22   ACE_Thread_Manager::instance ()->spawn (event_loop, &reactor);
23   ACE_Thread_Manager::instance ()->spawn (controller, &reactor);
24   return ACE_Thread_Manager::instance ()->wait ();
25 }

Lines 1–12 Include the header files, define some forward declarations, and instantiate the Reactor_Logging_Server template with the Logging_Acceptor_Ex class (page 67) to create the Server_Logging_Daemon type definition. ACE_THR_FUNC_RETURN portably specifies the thread function's return type.

Lines 15–16 Set the implementation of the local ACE_Reactor instance to be an ACE_Select_Reactor.

Lines 20–21 Dynamically create an instance of Server_Logging_Daemon.

Line 22 Use the ACE_Thread_Manager singleton from Chapter 9 of C++NPv1 to spawn a thread that runs the following event_loop() function:

static ACE_THR_FUNC_RETURN event_loop (void *arg) {
  ACE_Reactor *reactor = ACE_static_cast (ACE_Reactor *, arg);
  reactor->owner (ACE_OS::thr_self ());
  reactor->run_reactor_event_loop ();
  return 0;
}

Note how we set the owner of the reactor to the identity of the thread that runs the event loop. The Concurrency considerations discussion (page 94) explains the use of thread ownership for ACE_Select_Reactor .

Line 23 Spawn a thread to run the controller() function, which waits for an administrator to shut down the server via a command on its standard input.

Line 24 Wait for the other two threads to exit before returning from the main() function. ACE_Thread_Manager::wait() also reaps the exit status of the two threads to avoid memory leaks. Sidebar 42 (page 186) describes the conventions to follow to ensure memory isn't leaked when threads exit.

Line 25 At this point, the event loop isn't running, but the Server_Logging_Daemon and existing client connections are still open. The reactor and select_reactor objects are about to go out of scope, however. Since the ACE_Reactor plays the Abstraction role in the Bridge pattern, the only important field in reactor is a pointer to its implementation object, select_reactor. By default, the ACE_Reactor destructor only destroys the implementation object if the ACE_Reactor created it. Since select_reactor was created on the stack and passed to reactor, select_reactor is not destroyed by the ACE_Reactor destructor. Instead, it's destroyed when it goes out of scope. Its destruction triggers callbacks to the Logging_Acceptor::handle_close() (page 58) and Logging_Event_Handler_Ex::handle_close() (page 70) hook methods for the logging acceptor and each logging event handler, respectively, that are still registered with the reactor.

The controller() function can be implemented as follows:

 1 static ACE_THR_FUNC_RETURN controller (void *arg) {
 2   ACE_Reactor *reactor = ACE_static_cast (ACE_Reactor *, arg);
 3   Quit_Handler *quit_handler = 0;
 4   ACE_NEW_RETURN (quit_handler, Quit_Handler (reactor), 0);
 5
 6   for (;;) {
 7     std::string user_input;
 8     std::getline (cin, user_input, '\n');
 9     if (user_input == "quit") {
10       reactor->notify (quit_handler);
11       break;
12     }
13   }
14   return 0;
15 }

Lines 2–4 After casting the void pointer argument back into an ACE_Reactor pointer, we create a special event handler called Quit_Handler. Its handle_exception() and handle_close() methods simply shut down the ACE_Select_Reactor's event loop and delete the event handler, respectively, as shown below:

class Quit_Handler : public ACE_Event_Handler {
public:
  Quit_Handler (ACE_Reactor *r): ACE_Event_Handler (r) {}

  virtual int handle_exception (ACE_HANDLE) {
    reactor ()->end_reactor_event_loop ();
    return -1; // Trigger call to handle_close() method.
  }

  virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
  { delete this; return 0; }

private:
  // Private destructor ensures dynamic allocation.
  virtual ~Quit_Handler () {}
};

Lines 6–13 Go into a loop that waits for an administrator to type "quit" on the standard input stream. When this occurs, we pass the quit_handler to the reactor via its notify() method and exit the controller thread.

The implementation shown above is portable to all ACE platforms that support threads. Section 4.4 illustrates how to take advantage of Windows-specific features to accomplish the same behavior.
