4.4 The ACE_WFMO_Reactor Class

Motivation

Although the select() function is available on most operating systems, it's not always the most efficient or most powerful event demultiplexer on any given OS platform. In particular, select() has the following limitations:

  • On UNIX platforms, it only supports demultiplexing of I/O handles, such as files, terminal devices, FIFOs, and pipes. It does not portably support demultiplexing of synchronizers, threads, or System V Message Queues.

  • On Windows, select() only supports demultiplexing of socket handles.

  • It can only be called by one thread at a time for a particular set of I/O handles, which can degrade potential parallelism.

Windows defines the WaitForMultipleObjects() system function, described in Sidebar 24 (page 104), to alleviate these problems. This function works with many Windows handle types that can be signaled. Although it doesn't work directly with I/O handles, it can be used to demultiplex I/O-related events in two ways:

  1. Event handles used in overlapped I/O operations

  2. Event handles associated with socket handles via WSAEventSelect()

Moreover, multiple threads can call WaitForMultipleObjects() concurrently on the same set of handles, thereby enhancing potential parallelism.

WaitForMultipleObjects() is tricky to use correctly, however, for the following reasons [SS95a]:

  • WaitForMultipleObjects() returns an index to the first handle array slot with a signaled handle. It does not, however, indicate the number of handles that are signaled, and there is no simple way to scan the handles and check which are. WaitForMultipleObjects() must therefore be invoked numerous times to find all signaled handles. In contrast, select() returns a set of active I/O handles and a count of how many are active.

  • WaitForMultipleObjects() doesn't guarantee a fair distribution of notifications, that is, the lowest active handle in the array is always returned, regardless of how long other handles further back in the array may have had pending events.
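Both difficulties can be seen in a small portable model. The following is a minimal sketch, not the Win32 API: wait_any() is a hypothetical stand-in that mimics only the "lowest signaled index" return rule, and collect_signaled() shows the repeated-call scan that this rule forces on the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the wait call: returns the index of the
// lowest signaled slot at or after 'start', or -1 if none is signaled.
// It models only the "lowest active index wins" return rule.
int wait_any (const std::vector<bool> &signaled, std::size_t start = 0) {
  for (std::size_t i = start; i < signaled.size (); ++i)
    if (signaled[i]) return static_cast<int> (i);
  return -1;
}

// Finding every signaled handle takes one call per hit, each time
// resuming the scan just past the slot that was reported last.
std::vector<int> collect_signaled (const std::vector<bool> &signaled) {
  std::vector<int> active;
  int index = wait_any (signaled);
  while (index != -1) {
    active.push_back (index);
    index = wait_any (signaled, static_cast<std::size_t> (index) + 1);
  }
  return active;
}
```

If a caller only ever used the first result of wait_any(), a busy handle in a low slot would always be reported ahead of handles in higher slots, which is the fairness problem described above.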

To shield programmers from these low-level details, preserve demultiplexing fairness, and leverage the power of WaitForMultipleObjects() on Windows platforms, the ACE Reactor framework provides the ACE_WFMO_Reactor class.

Sidebar 24: The Windows WaitForMultipleObjects() Function

The Windows WaitForMultipleObjects() event demultiplexer function is similar to select(). It blocks on an array of up to 64 handles until one or more of them become active (which is known as being "signaled" in Windows terminology) or until the interval in its timeout parameter elapses. It can be programmed to return to its caller either when any one of the handles becomes active or when all the handles become active. In either case, it returns the index of the lowest active handle in the caller-specified array of handles. Unlike the select() function, which only demultiplexes I/O handles, WaitForMultipleObjects() can wait for many types of Windows objects, including a thread, process, synchronizer (e.g., event, semaphore, or mutex), change notification, console input, and timer.

Class Capabilities

ACE_WFMO_Reactor is yet another implementation of the ACE_Reactor interface, which uses the WaitForMultipleObjects() function to wait for events to occur on a set of event sources. In addition to supporting all the features of the ACE_Reactor interface, the ACE_WFMO_Reactor class provides the following capabilities:

  • It enables a pool of threads to call its handle_events() method concurrently (as a result, the ACE_WFMO_Reactor::owner() method is a no-op). This facility is more powerful than that of ACE_TP_Reactor. In ACE_WFMO_Reactor, all of the event handling threads can dispatch events concurrently instead of taking turns in a leader/followers arrangement. We'll discuss this aspect of ACE_WFMO_Reactor in more detail in the Concurrency considerations section on page 106.

  • It allows applications to wait for socket I/O events and scheduled timers, similar to the select()-based reactors. ACE_WFMO_Reactor also integrates event demultiplexing and dispatching for all event types that WaitForMultipleObjects() supports, as outlined in Sidebar 24.

  • Each call to handle_events() waits for a handle to become active. Starting from that handle, it iterates through all other active handles before returning. This design prevents an active handle from starving handles further down in the handle set array.

  • Using the ACE_Msg_WFMO_Reactor subclass, applications can process all the events above plus window messages.

ACE_WFMO_Reactor is the default ACE_Reactor implementation on Windows for the reasons described in Sidebar 25. Note that ACE_WFMO_Reactor dispatches events in the same order as the ACE_Select_Reactor (page 92).

Implementation overview. As shown in Figure 4.1 (page 89), ACE_WFMO_Reactor inherits from ACE_Reactor_Impl. It therefore serves as a concrete implementation of the ACE_Reactor interface. Just as ACE_Select_Reactor leverages the capabilities of the select() function, ACE_WFMO_Reactor leverages the capabilities of WaitForMultipleObjects(), as shown in Figure 4.6 (page 106).

Figure 4.6. The ACE_WFMO_Reactor Framework Internals

Sidebar 25: Why ACE_WFMO_Reactor Is the Default on Windows

The ACE_WFMO_Reactor is the default implementation of the ACE_Reactor on Windows platforms for the following reasons:

  • It lends itself more naturally to multithreaded processing, which is common on Windows (ACE_WFMO_Reactor was developed before ACE_TP_Reactor and was the first reactor to support multithreaded event handling).

  • Applications often use signalable handles in situations where a signal may have been used on POSIX (e.g., child process exit) and these events can be dispatched by ACE_WFMO_Reactor.

  • It can handle a wider range of events than the ACE_Select_Reactor, which can only handle socket and timer events on Windows.

  • It's easily integrated with ACE_Proactor event handling, discussed in Sidebar 58 (page 290).

ACE_WFMO_Reactor's most significant differences from ACE_Select_Reactor and ACE_TP_Reactor include the following:

  • Limited number of handles. Unlike ACE_Select_Reactor and ACE_TP_Reactor, which can be configured to demultiplex hundreds or thousands of handles, ACE_WFMO_Reactor can process no more than 62 handles. This limitation stems from the fact that Windows only allows WaitForMultipleObjects() to wait for 64 handles per thread. ACE_WFMO_Reactor uses two of these handles internally: one for its notification mechanism and another for synchronizing concurrent handler updates. If more than 62 handles must be demultiplexed, you can use multiple ACE_WFMO_Reactor objects in multiple threads, use the ACE_Proactor (Chapter 8), or use the ACE_Select_Reactor and increase its size via the mechanisms described in Sidebar 20 (page 92).

  • WRITE_MASK semantics different from select(). When a socket can send more data, select() detects a WRITE condition. It will continue to detect this condition as long as the socket remains writeable, that is, until it becomes flow controlled. In contrast, the Windows WSAEventSelect() function only sets the WRITE event when the socket is first connected, whether passively or actively, and when the socket transitions from flow-controlled to writeable. When relying on WRITE events using the ACE_WFMO_Reactor, you must therefore continue to write until the connection closes or the socket becomes flow controlled and a send() fails with EWOULDBLOCK. If this behavior is undesirable, you might consider choosing the ACE_Select_Reactor as the ACE_Reactor implementation on Windows since it has the same WRITE_MASK semantics as on UNIX platforms.

  • Different notification mechanism. The ACE_Select_Reactor's notification mechanism is implemented using the ACE_Pipe mechanism described in Sidebar 21 (page 93). In contrast, the ACE_WFMO_Reactor's notification mechanism is implemented using a synchronized version of the ACE_Message_Queue described in Section 6.2. As a result, the queue can be configured to have a user-defined size that can help avoid the deadlock problems discussed in Sidebar 17 (page 78). The default maximum number of queued notifications for ACE_WFMO_Reactor is 1,024. To change this value:

    • Create a new ACE_WFMO_Reactor_Notify object, specifying the desired maximum number of queued notifications to its constructor.

    • Create a new ACE_WFMO_Reactor object, passing a pointer to the new ACE_WFMO_Reactor_Notify object to the ACE_WFMO_Reactor constructor.
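Returning to the WRITE_MASK difference listed above, the "keep writing until flow controlled" rule can be sketched portably. This is an illustration under stated assumptions, not ACE or Winsock code: Mock_Socket is a hypothetical in-memory stand-in for a nonblocking socket, and its send() returning -1 plays the role of a send() that fails with EWOULDBLOCK.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical in-memory stand-in for a nonblocking socket. It accepts
// bytes until its buffer is full, then reports "would block" with -1.
struct Mock_Socket {
  std::size_t capacity;   // Free space left in the send buffer.
  std::string sent;       // Everything accepted so far.

  // Returns the number of bytes accepted, or -1 when flow controlled.
  long send (const char *data, std::size_t len) {
    if (capacity == 0) return -1;
    std::size_t n = std::min (len, capacity);
    sent.append (data, n);
    capacity -= n;
    return static_cast<long> (n);
  }
};

// Called on a WRITE event. Since the event fires only on the transition
// to writeable, keep sending until the data runs out or send() blocks.
// Returns the number of bytes still pending.
std::size_t drain_on_write_event (Mock_Socket &sock, std::string &pending) {
  while (!pending.empty ()) {
    long n = sock.send (pending.data (), pending.size ());
    if (n == -1) break;   // Flow controlled: wait for the next event.
    pending.erase (0, static_cast<std::size_t> (n));
  }
  return pending.size ();
}
```

A handler that sent only one chunk per WRITE event would work under select(), which reports the socket as writeable again on the next pass, but under edge-style semantics it would stall with data still pending.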

Concurrency considerations. The ACE_WFMO_Reactor allows multiple threads to call its handle_events() method concurrently. This capability complicates its design, however, and introduces some subtle behavioral differences from select()-based reactors, as discussed below:

  • Coordination of registration changes. Each change to the set of registered handles will affect all threads executing the event loop. Allowing these changes to occur without synchronization would cause errors ranging from missed events to incorrect or invalid handlers being dispatched. To handle registration changes properly in the presence of multiple threads, ACE_WFMO_Reactor maintains three sets of handler information objects:

  1. Current handlers, which are the handlers used for event detection and demultiplexing

  2. New handlers, which are awaiting addition to the set of current handlers

  3. Suspended handlers, which are handlers suspended from the current handler set

When registration changes are requested (such as registering, removing, suspending, or resuming an event handler), the handle, event handler, and event type information are remembered and the need for changing the associated information is noted. The next thread that completes its pass through handle_events() will notice the need for a change, obtain the reactor's lock, and wait for all other threads running handle_events() to complete. To ensure that they complete in a timely fashion, the waiting thread signals an internal event that's part of the dispatch handle set, causing all threads blocked in WaitForMultipleObjects() to awake. At this point, all event-handling threads will block waiting for changes to occur. When the original thread completes the necessary information and handle changes, the reactor lock is released, and all event-handling threads restart their event waiting, demultiplexing, and dispatching with the updated handle set.
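The record-now, apply-later scheme can be modeled in a few lines. The sketch below is a simplification with hypothetical names, not ACE's implementation: handles are plain ints, the "new handlers" set is folded into a single list of pending changes, and the thread wake-up and locking described above are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <vector>

// Simplified model of deferred registration changes. All names are
// hypothetical and handles are plain ints.
class Handler_Registry {
public:
  enum Op { ADD, REMOVE, SUSPEND, RESUME };
  struct Change { Op op; int handle; };

  // Requests only record the change; the sets used for demultiplexing
  // are left untouched until apply_changes() runs.
  void request (Op op, int handle) { pending_.push_back (Change{op, handle}); }

  bool changes_pending () const { return !pending_.empty (); }

  // Run by one thread after every other event loop thread has been
  // woken and is waiting (that synchronization is omitted here).
  void apply_changes () {
    for (const Change &c : pending_) {
      switch (c.op) {
      case ADD:     current_.insert (c.handle); break;
      case REMOVE:  current_.erase (c.handle);
                    suspended_.erase (c.handle); break;
      case SUSPEND: if (current_.erase (c.handle) > 0)
                      suspended_.insert (c.handle);
                    break;
      case RESUME:  if (suspended_.erase (c.handle) > 0)
                      current_.insert (c.handle);
                    break;
      }
    }
    pending_.clear ();
  }

  bool is_current (int h) const { return current_.count (h) > 0; }
  bool is_suspended (int h) const { return suspended_.count (h) > 0; }

private:
  std::vector<Change> pending_;  // Recorded but not yet applied.
  std::set<int> current_;        // Used for event detection.
  std::set<int> suspended_;      // Registered but not waited on.
};
```

The point to notice is that a handle requested for suspension or removal stays in the current set, and can still be dispatched, until apply_changes() runs.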

  • Deferred event handler cleanup. ACE_WFMO_Reactor's registration change delay introduces a subtle behavioral difference compared to select()-based reactors. When a handle_*() method returns -1 or ACE_Reactor::remove_handler() is called on an event handler, the ACE_WFMO_Reactor defers the handler removal and callback to the handler's handle_close() hook until the registration changes can take place as described above. An application therefore can't delete an event handler immediately after requesting an ACE_WFMO_Reactor to remove it since the reactor's later call to the handle_close() method will dispatch through an invalid pointer.

    The differences between ACE_WFMO_Reactor and the select()-based reactors can be masked in practice by adhering to the idiom of performing all cleanup in an event handler's handle_close() hook method, as described in Sidebar 9 (page 51). This idiom prevents premature deletion of the event handler object. In cases where this idiom doesn't apply (for example if an event handler is an automatic object whose destruction cannot be deferred), the ACE_Event_Handler::DONT_CALL flag must be passed to ACE_Reactor::remove_handler() to prevent ACE_WFMO_Reactor from invoking the handle_close() hook method later. It's therefore advisable to allocate event handlers dynamically, as advocated in Sidebar 11 (page 55), which avoids the need to manage the lifetime of event handlers outside the ACE_Reactor handler management scheme.
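    The lifetime rule can be made concrete with a small model. This is a hedged sketch with hypothetical classes (Handler, Deferring_Reactor, Heap_Handler), not the ACE types: remove_handler() only queues the close callback, and the boolean argument stands in for the DONT_CALL flag.

```cpp
#include <cassert>
#include <vector>

// Hypothetical miniature of the deferred-removal behavior; these are
// not the ACE classes.
struct Handler {
  virtual ~Handler () {}
  virtual void handle_close () = 0;
};

class Deferring_Reactor {
public:
  // Removal is only recorded; handle_close() runs later. With
  // call_close == false (the analogue of DONT_CALL) nothing is queued,
  // so the caller may destroy the handler immediately.
  void remove_handler (Handler *h, bool call_close = true) {
    if (call_close) to_close_.push_back (h);
  }

  // Stands in for the point where registration changes are applied.
  void apply_changes () {
    std::vector<Handler *> batch;
    batch.swap (to_close_);
    for (Handler *h : batch) h->handle_close ();
  }

private:
  std::vector<Handler *> to_close_;
};

// A dynamically allocated handler that performs all of its cleanup,
// including self-deletion, inside handle_close().
struct Heap_Handler : Handler {
  explicit Heap_Handler (bool &destroyed) : destroyed_ (destroyed) {}
  ~Heap_Handler () override { destroyed_ = true; }
  void handle_close () override { delete this; }
  bool &destroyed_;   // Lets the caller observe when destruction happens.
};
```

    In this model, deleting a handler right after remove_handler() with the default argument would leave a dangling pointer in the queue, which is the failure the text warns about.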

  • Multithreaded dispatch to same handler. Unlike those using select()-based reactors, multithreaded applications can demultiplex and dispatch events concurrently using the ACE_WFMO_Reactor::handle_events() method. [2] In the multithreaded case it's therefore possible that different threads will dispatch events simultaneously to the same event handler. This can happen, for example, when a handler-scheduled timer expires while I/O is being processed. It can also occur in the following situation:

    [2] Although multiple threads can call ACE_TP_Reactor::handle_events() concurrently, only one thread at a time (the leader) actually runs select() .

  1. Thread 1 dispatches a socket input event to a handler.

  2. Thread 1 calls ACE_SOCK_Stream::recv() to read data.

  3. More data are available on the socket, due to a limited receive in step 2 or the arrival of more data.

  4. Thread 2 dispatches the input event on the same handle, resulting in a race condition between thread 1 and thread 2.

Event handlers must therefore explicitly protect against race conditions when the handle_events() event loop is executed by multiple threads on the same ACE_WFMO_Reactor object. The ACE_TP_Reactor avoids these race conditions by implementing an internal protocol that automatically suspends a handle before dispatching its event handler. Any follower thread that subsequently becomes the leader doesn't dispatch events on the affected handle until the callback is complete and the handle is resumed. Sidebar 26 explains why this handler suspension protocol can't be used with the ACE_WFMO_Reactor.
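The kind of explicit protection meant here can be sketched with standard C++ threads. The names below (Counting_Handler, dispatch_from_pool) are hypothetical and no ACE classes are involved; the lambda simply stands in for several event loop threads making upcalls on one handler.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical handler whose upcall may run in several threads at once.
class Counting_Handler {
public:
  // The lock serializes concurrent upcalls on this one handler object.
  int handle_input () {
    std::lock_guard<std::mutex> guard (lock_);
    ++records_;   // Stand-in for reading and logging one record.
    return 0;
  }

  int records () {
    std::lock_guard<std::mutex> guard (lock_);
    return records_;
  }

private:
  std::mutex lock_;
  int records_ = 0;
};

// Simulates a pool of event loop threads that all dispatch to the same
// handler; returns the handler's final record count.
int dispatch_from_pool (int n_threads, int upcalls_per_thread) {
  Counting_Handler handler;
  std::vector<std::thread> pool;
  for (int t = 0; t < n_threads; ++t)
    pool.emplace_back ([&handler, upcalls_per_thread] {
      for (int i = 0; i < upcalls_per_thread; ++i) handler.handle_input ();
    });
  for (std::thread &th : pool) th.join ();
  return handler.records ();
}
```

Without the guard in handle_input(), the unsynchronized increments would be a data race and the final count would no longer be reliable.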

Example

This example illustrates how to use a signalable handle with the ACE_WFMO_Reactor. It also illustrates one technique for properly serializing I/O handling in a thread pool running the ACE_WFMO_Reactor event loop. Figure 4.7 (page 110) illustrates the architecture of this server. It's similar to the one in Figure 4.4 (page 96), with the difference being the pool of threads that call ACE_Reactor::handle_events(). Since this example explicitly specifies an instance of ACE_WFMO_Reactor, it works only on Windows.

Figure 4.7. ACE_WFMO_Reactor Logging Server

This example is in the WFMO_Reactor_Logging_Server.cpp file. We start by defining a Quit_Handler class:

    class Quit_Handler : public ACE_Event_Handler {
    private:
      ACE_Manual_Event quit_seen_; // Keep track of when to shutdown.
    public:

Although this class inherits from ACE_Event_Handler, it's used quite differently from the Quit_Handler defined on page 98. The Quit_Handler constructor illustrates some of the differences below:

     1 Quit_Handler (ACE_Reactor *r): ACE_Event_Handler (r) {
     2   SetConsoleMode (ACE_STDIN, ENABLE_LINE_INPUT
     3                              | ENABLE_ECHO_INPUT
     4                              | ENABLE_PROCESSED_INPUT);
     5   if (reactor ()->register_handler
     6       (this, quit_seen_.handle ()) == -1
     7       || ACE_Event_Handler::register_stdin_handler
     8          (this, r, ACE_Thread_Manager::instance ()) == -1)
     9     r->end_reactor_event_loop ();
    10 }

Sidebar 26: Why ACE_WFMO_Reactor Doesn't Suspend Handles

The ACE_WFMO_Reactor doesn't implement a handler suspension protocol internally to minimize the amount of policy imposed on application classes. In particular, multithreaded applications can process events more efficiently when doing so doesn't require interevent serialization, as is the case when receiving UDP datagrams. This behavior isn't possible in the ACE_TP_Reactor because of the semantic differences in the functionality of the following OS event demultiplexing mechanisms:

  • WaitForMultipleObjects(). When demultiplexing a socket handle's I/O event, one ACE_WFMO_Reactor thread will obtain the I/O event mask from WSAEnumNetworkEvents(), and the OS atomically clears that socket's internal event mask. Even if multiple threads demultiplex the socket handle simultaneously, only one obtains the I/O event mask and will dispatch the handler. The dispatched handler must take some action that reenables demultiplexing for that handle before another thread will dispatch it.

  • select(). There's no automatic OS serialization for select(). If multiple threads were allowed to see a ready-state socket handle, they would all dispatch it, yielding unpredictable behavior at the ACE_Event_Handler layer and reduced performance due to multiple threads all working on the same handle.

It's important to note that the handler suspension protocol can't be implemented in the application event handler class when it's used in conjunction with the ACE_WFMO_Reactor. This is because suspension requests are queued and aren't acted on immediately, as described on page 107. A handler could therefore receive upcalls from multiple threads until the handler was actually suspended by the ACE_WFMO_Reactor. The Logging_Event_Handler_WFMO class (page 111) illustrates how to use mutual exclusion to avoid race conditions in upcalls.

Lines 2-4 Simplify input handling by setting the console to read a whole line of text (not small pieces at a time) without including control characters, such as Ctrl-C.

Lines 5-6 We illustrate how to register an event handle with the reactor. We will signal this event when the quit command is entered. Sidebar 27 (page 111) outlines the capabilities of the ACE_Manual_Event class.

Lines 7-8 Use ACE_Event_Handler::register_stdin_handler() to establish the input mechanism, which causes the Quit_Handler::handle_input() hook method to be called repeatedly until it returns -1. On Windows, all these calls will be made from a thread other than the event loop thread(s), which removes the requirement for the input-handling method to serialize its processing.

Line 9 If either registration fails, we immediately mark the reactor event loop as ended, which forces an immediate end to the loop when the main program starts it.

The Quit_Handler::handle_input() method is shown below:

    virtual int handle_input (ACE_HANDLE h) {
      CHAR user_input[BUFSIZ];
      DWORD count;
      // Read at most BUFSIZ - 1 bytes so the terminator always fits.
      if (!ReadFile (h, user_input, BUFSIZ - 1, &count, 0))
        return -1;
      user_input[count] = '\0';
      if (ACE_OS_String::strncmp (user_input, "quit", 4) == 0)
        return -1;
      return 0;
    }

When the "quit" command is seen, handle_input() returns -1, which triggers ACE_WFMO_Reactor to dispatch Quit_Handler::handle_close():

    virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
    { quit_seen_.signal (); return 0; }

The manual event handle from quit_seen_ was registered with the reactor in the Quit_Handler constructor (page 108). When the event is signaled in the handle_close() method, the ACE_WFMO_Reactor will then demultiplex the event and call the Quit_Handler::handle_signal() method shown below.

    virtual int handle_signal (int, siginfo_t *, ucontext_t *)
    { reactor ()->end_reactor_event_loop (); return 0; }

This hook method calls the reactor's end_reactor_event_loop() method, which causes all of the event-handling threads to stop. It's possible to call end_reactor_event_loop() directly from handle_close(). We moved the call to handle_signal() to illustrate how to use a signalable handle with ACE_WFMO_Reactor.

When the Quit_Handler object is destroyed, its destructor behaves as follows:

    1 ~Quit_Handler () {
    2   ACE_Event_Handler::remove_stdin_handler
    3     (reactor (), ACE_Thread_Manager::instance ());
    4   reactor ()->remove_handler (quit_seen_.handle (),
    5                               ACE_Event_Handler::DONT_CALL);
    6 }

Lines 2-3 Cancel the effects of the previous register_stdin_handler() call.

Lines 4-5 Unregister the event handle from the reactor. The DONT_CALL flag prevents the reactor from calling back to handle_close(), so the cleanup is complete at this point and we needn't worry about a later callback through an invalid object pointer.

Sidebar 27: The ACE_Manual_Event and ACE_Auto_Event Classes

ACE provides two synchronization wrapper facade classes that should be familiar to Windows programmers: ACE_Manual_Event and ACE_Auto_Event. These classes allow threads in a process to wait on an event or inform other threads about the occurrence of a specific event in a thread-safe manner. On Windows these classes are wrapper facades around native event objects, whereas on other platforms ACE emulates the Windows event object facility.

Events are similar to condition variables in the sense that a thread can use them to either signal the occurrence of an application-defined event or wait for that event to occur. Unlike stateless condition variables, however, a signaled event remains set until a class-specific action occurs. For example, an ACE_Manual_Event remains set until it is explicitly reset and an ACE_Auto_Event remains set until a single thread waits on it. These two classes allow users to control the number of threads awakened by signaling operations, and allow an event to indicate a state transition, even if no threads are waiting at the time the event is signaled.

Events are more expensive than mutexes, but provide better control over thread scheduling. Events provide a simpler synchronization mechanism than condition variables. Condition variables are more useful for complex synchronization activities, however, since they enable threads to wait for arbitrary condition expressions.
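The difference between the two reset policies can be illustrated with a portable sketch built on a mutex and a condition variable. The Event class below is hypothetical and only loosely in the spirit of the emulation mentioned above; it is not ACE's implementation, and try_wait() is an added convenience for observing the state without blocking.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Portable sketch of the two event flavors. Names are hypothetical.
class Event {
public:
  explicit Event (bool manual_reset) : manual_reset_ (manual_reset) {}

  // Sets the event. A manual event releases every waiter; an auto
  // event releases at most one.
  void signal () {
    std::lock_guard<std::mutex> guard (lock_);
    signaled_ = true;
    if (manual_reset_) cond_.notify_all (); else cond_.notify_one ();
  }

  void reset () {
    std::lock_guard<std::mutex> guard (lock_);
    signaled_ = false;
  }

  // Blocks until signaled. An auto event is consumed by the waiter that
  // gets it; a manual event stays set until reset() is called.
  void wait () {
    std::unique_lock<std::mutex> guard (lock_);
    cond_.wait (guard, [this] { return signaled_; });
    if (!manual_reset_) signaled_ = false;
  }

  // Nonblocking variant of wait(): returns false if not signaled.
  bool try_wait () {
    std::lock_guard<std::mutex> guard (lock_);
    if (!signaled_) return false;
    if (!manual_reset_) signaled_ = false;
    return true;
  }

private:
  const bool manual_reset_;
  bool signaled_ = false;
  std::mutex lock_;
  std::condition_variable cond_;
};
```

Because the signaled state is stored in the object, a signal() issued before any thread waits is not lost, which is the property that distinguishes events from bare condition variables.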

Due to concurrency differences between ACE_WFMO_Reactor and the select()-based reactors (shown in Sections 4.2 and 4.3), we also derive a new Logging_Event_Handler_WFMO class to add protection against race conditions. We need only override the handle_input() hook method of Logging_Event_Handler_Ex (page 68) and add a mutex to explicitly serialize access of threads in the thread pool to a client logging daemon connection, as follows:

    class Logging_Event_Handler_WFMO
      : public Logging_Event_Handler_Ex {
    public:
      Logging_Event_Handler_WFMO (ACE_Reactor *r)
        : Logging_Event_Handler_Ex (r) {}

    protected:
      int handle_input (ACE_HANDLE h) {
        ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, lock_, -1);
        return logging_handler_.log_record ();
      }

      ACE_Thread_Mutex lock_; // Serialize threads in thread pool.
    };

Since Logging_Acceptor_Ex (page 67) instantiates a new Logging_Event_Handler_Ex object for each new client connection, our use of a different event handler class also mandates a new acceptor class. The following subclass of Logging_Acceptor_Ex instantiates the correct type of event handler when a new client connection arrives:

    class Logging_Acceptor_WFMO : public Logging_Acceptor_Ex {
    public:
      Logging_Acceptor_WFMO
        (ACE_Reactor *r = ACE_Reactor::instance ())
        : Logging_Acceptor_Ex (r) {}

    protected:
      virtual int handle_input (ACE_HANDLE) {
        Logging_Event_Handler_WFMO *peer_handler = 0;
        ACE_NEW_RETURN (peer_handler,
                        Logging_Event_Handler_WFMO (reactor ()), -1);
        if (acceptor_.accept (peer_handler->peer ()) == -1)
        { delete peer_handler; return -1; }
        else if (peer_handler->open () == -1)
        { peer_handler->handle_close (); return -1; }
        return 0;
      }
    };

The handle_input() method doesn't require protection against race conditions since it only operates on objects local to the method. In fact, except for the type of event handler that's instantiated for each new connection, Logging_Acceptor_WFMO is identical to Logging_Acceptor_Ex (page 67). The ACE Acceptor-Connector framework in Chapter 7 shows how to factor out the event handler type into a reusable acceptor class.

Our ACE_WFMO_Reactor logging server's main() function is shown below:

    #include "ace/Reactor.h"
    #include "ace/Synch.h"
    #include "ace/WFMO_Reactor.h"
    #include "ace/Thread_Manager.h"
    #include "Reactor_Logging_Server.h"

    ACE_THR_FUNC_RETURN event_loop (void *); // Forward declaration.

    typedef Reactor_Logging_Server<Logging_Acceptor_WFMO>
            Server_Logging_Daemon;

    int main (int argc, char *argv[]) {
      const size_t N_THREADS = 4;
      ACE_WFMO_Reactor wfmo_reactor;
      ACE_Reactor reactor (&wfmo_reactor);

      Server_Logging_Daemon *server = 0;
      ACE_NEW_RETURN
        (server, Server_Logging_Daemon (argc, argv, &reactor), 1);

      Quit_Handler quit_handler (&reactor);
      ACE_Thread_Manager::instance ()->spawn_n
        (N_THREADS, event_loop, &reactor);
      return ACE_Thread_Manager::instance ()->wait ();
    }

The main differences between this main() and the one shown on page 101 are

  • Reactor_Logging_Server is instantiated with Logging_Acceptor_WFMO rather than Logging_Acceptor_Ex.

  • The ACE_WFMO_Reactor is used instead of ACE_TP_Reactor.

  • The controller thread dedicated to shutdown processing is replaced by an instance of Quit_Handler, described above.

  • The calls to the WaitForMultipleObjects() event demultiplexer can actually run concurrently in different threads, rather than having calls to select() be serialized using the Leader/Followers pattern, as is the case with the ACE_TP_Reactor.

  • The program will only run on Windows platforms, instead of all ACE platforms.

  • We don't use the singleton reactor API, but instead use a local reactor instance again.

C++ Network Programming, Volume I: Mastering Complexity with ACE and Patterns
ISBN: 0201604647
Year: 2002