Motivation

As discussed in Chapter 5 of C++NPv1, a reactive server responds to events from one or more sources. Ideally, response to events is quick enough so that all requests appear to be processed simultaneously, although event processing is usually handled by a single thread. A synchronous event demultiplexer is at the heart of each reactive server. This demultiplexer mechanism detects and reacts to events originating from a number of sources, making the events available to the server synchronously, as part of its normal execution path.

The select() function is the most common synchronous event demultiplexer. This system function waits for specified events to occur on a set of I/O handles. [1] When one or more of the I/O handles become active, or after a designated amount of time elapses, select() returns. Its return value indicates the number of handles that are active, that the caller-specified time elapsed before an event occurred, or that an error occurred. The caller can then take appropriate action. Additional coverage of select() is available in Chapter 6 of C++NPv1 and in [Ste98].
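The three outcomes of select() described above (active handles, timeout, error) can be sketched with the native POSIX API. This is a minimal illustration, not code from ACE or C++NPv1; the helper name wait_readable() and its millisecond timeout parameter are assumptions made for the example.

```cpp
#include <sys/select.h>  // select(), fd_set, FD_ZERO, FD_SET
#include <sys/time.h>    // timeval
#include <unistd.h>      // pipe(), read(), write(), close()
#include <cassert>

// Hypothetical helper: wait up to timeout_ms for read_fd to become readable.
// Returns what select() returns: 1 if the handle is active, 0 if the
// timeout elapsed first, and -1 if an error occurred.
int wait_readable (int read_fd, long timeout_ms) {
  // select() can only watch handles whose value is below FD_SETSIZE.
  assert (read_fd >= 0 && read_fd < FD_SETSIZE);

  fd_set read_set;
  FD_ZERO (&read_set);
  FD_SET (read_fd, &read_set);

  timeval tv;
  tv.tv_sec = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;

  // The first argument is the highest handle value in any set, plus one.
  return select (read_fd + 1, &read_set, 0, 0, &tv);
}
```

Even this small helper has to manage the handle set, the width argument, and the timeval conversion by hand, which is the kind of low-level detail a wrapper class is meant to hide.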
Although select() is available on most OS platforms, programming to the native select() C API requires developers to wrestle with many low-level details, such as
Chapter 7 of C++NPv1 discussed several wrapper facade classes that can be used to master many complexities associated with these low-level details. It's also useful, however, to use select() in environments where it's necessary to
To address these issues systematically, the ACE Reactor framework defines the ACE_Select_Reactor class, which provides all the capabilities outlined above.

Class Capabilities

ACE_Select_Reactor is an implementation of the ACE_Reactor interface that uses the select() synchronous event demultiplexer function to detect I/O and timer events. In addition to supporting all the features of the ACE_Reactor interface, the ACE_Select_Reactor class provides the following capabilities:
The ACE_Select_Reactor is the default implementation of ACE_Reactor on all platforms except Windows, which uses the ACE_WFMO_Reactor for the reasons described in Sidebar 25 (page 105).

Implementation overview. ACE_Select_Reactor descends from ACE_Reactor_Impl, as shown in Figure 4.1 (page 89). It therefore serves as a concrete implementation of the ACE_Reactor. As shown in Figure 4.2, ACE_Select_Reactor is actually a typedef of the ACE_Select_Reactor_T template (the Concurrency considerations section on page 93 discusses this implementation aspect further). The ACE_Select_Reactor_Impl class contains the data and methods that are independent of the template argument to ACE_Select_Reactor_T, which isolates them from the template argument-dependent factors and prevents them from being duplicated in each template instantiation. Sidebar 20 (page 92) explains how to change the number of event handlers managed by an instance of ACE_Select_Reactor.

Figure 4.2. The ACE_Select_Reactor Framework Internals
The ACE Reactor framework's notification mechanism (page 77) enables a reactor to process an open-ended number of event handlers and can be used to unblock a reactor from its event loop. By default, ACE_Select_Reactor implements its notification mechanism via an ACE_Pipe, which is a bidirectional IPC mechanism whose semantics are described in Sidebar 21 (page 93). The two ends of the pipe play the following roles:
Sidebar 17 (page 78) explains how to avoid deadlocks that can result from the fact that the buffer size of an ACE_Pipe is bounded. In addition to those application design tips, Sidebar 22 (page 94) describes another potential problem related to notifications; its solution also provides a way to enlarge the notification mechanism, which helps avoid deadlocks.
Unlike the event handlers registered with a reactor, the handlers passed via a reactor's notification mechanism needn't be associated with I/O-based or timer-based events, which helps improve the flexibility and scalability of ACE_Select_Reactor. Likewise, this mechanism allows all event handler processing to be serialized in the reactor's main thread, which simplifies event handler implementations since they needn't be thread-safe. Figure 4.3 illustrates how ACE_Pipe is used within ACE_Select_Reactor.

Figure 4.3. The ACE_Select_Reactor Notification Mechanism
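The idea of passing a handler through a pipe so that a select()-based loop wakes up and dispatches it can be sketched with plain POSIX calls. This is an illustration under stated assumptions, not the ACE implementation: Handler, Counting_Handler, and Pipe_Notifier are hypothetical names, and a one-way POSIX pipe() stands in for ACE_Pipe.

```cpp
#include <sys/select.h>  // select(), fd_set
#include <sys/time.h>    // timeval
#include <unistd.h>      // pipe(), read(), write(), close()

// Hypothetical stand-in for an event handler (not ACE_Event_Handler).
struct Handler {
  virtual ~Handler () {}
  virtual void handle_exception () = 0;
};

// Example handler that counts how often it was dispatched.
struct Counting_Handler : Handler {
  Counting_Handler () : count (0) {}
  virtual void handle_exception () { ++count; }
  int count;
};

class Pipe_Notifier {
public:
  Pipe_Notifier () : open_ (pipe (fds_) == 0) {}
  ~Pipe_Notifier () { if (open_) { close (fds_[0]); close (fds_[1]); } }

  // Callable from any thread: writes the handler's address to the pipe.
  // A write this small is far below PIPE_BUF, so POSIX keeps it atomic.
  // The call can block if the bounded pipe buffer is full.
  bool notify (Handler *h) {
    return open_ && write (fds_[1], &h, sizeof h) == (ssize_t) sizeof h;
  }

  // Called by the event loop thread: waits up to timeout_ms for one
  // notification and dispatches it in this thread.
  // Returns 1 if a handler ran, 0 on timeout, -1 on error.
  int handle_events (long timeout_ms) {
    if (!open_) return -1;
    fd_set read_set;
    FD_ZERO (&read_set);
    FD_SET (fds_[0], &read_set);
    timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;
    int n = select (fds_[0] + 1, &read_set, 0, 0, &tv);
    if (n <= 0) return n;
    Handler *h = 0;
    if (read (fds_[0], &h, sizeof h) != (ssize_t) sizeof h) return -1;
    h->handle_exception ();
    return 1;
  }

private:
  Pipe_Notifier (const Pipe_Notifier &);   // Not copyable.
  void operator= (const Pipe_Notifier &);
  int fds_[2];  // fds_[0] is the read end, fds_[1] the write end.
  bool open_;
};
```

Because the handler always runs inside handle_events(), its code executes in the event loop thread no matter which thread called notify(), which is why such handlers needn't be thread-safe.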
With events originating in numerous sources, both application-supplied (timers and I/O handles) and internal (notification pipe), it's important for an ACE_Select_Reactor to dispatch events to event handlers in an effective order. Years of experimentation and refinement resulted in the following order for event handler dispatching in the ACE_Select_Reactor::handle_events() method:
Applications should generally not rely on the order in which the different types of events are dispatched since not all reactor implementations guarantee the same order. For example, the ACE_Dev_Poll_Reactor (page 114) might not dispatch notifications before I/O events. There are situations, however, where knowing the dispatching order of events is useful. For example, an event handler's handle_output() callback method may encounter an error writing to a socket because the peer application aborts the connection. In this case, it's likely that the socket is also ready for input and the handler's handle_input() callback will be invoked shortly, where common socket and handler cleanup can take place.
Concurrency considerations. The ACE_Select_Reactor is an instantiation of the ACE_Select_Reactor_T class template shown in Figure 4.2 (page 91). This template uses the Strategized Locking pattern [POSA2] to allow application developers to configure the necessary level of synchronization. The TOKEN template argument is always an instantiation of ACE_Select_Reactor_Token_T with one of the following types:
Only one thread (called the owner) can invoke ACE_Select_Reactor::handle_events() at a time. By default, the owner of an ACE_Reactor is the identity of the thread that initialized it. The ACE_Select_Reactor::owner() method is used to change ownership of the ACE_Select_Reactor to a particular thread ID. This method is useful when the thread running the reactor's event loop differs from the thread that initialized the reactor. The event_loop() function on page 97 illustrates this use case.
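The Strategized Locking pattern mentioned above can be illustrated without ACE by making the lock type a template argument. The sketch below is an assumption-laden example, not ACE code: Null_Mutex and Handler_Count are hypothetical names, and std::mutex stands in for whatever lock a real configuration would use.

```cpp
#include <mutex>  // std::mutex, std::lock_guard

// Hypothetical no-op lock exposing the same lock()/unlock() interface
// as std::mutex, for single-threaded configurations.
struct Null_Mutex {
  void lock () {}
  void unlock () {}
};

// The LOCK template argument selects the synchronization strategy at
// compile time; the class body is written once for both cases.
template <class LOCK>
class Handler_Count {
public:
  Handler_Count () : count_ (0) {}

  // Increment the count under the configured lock and return the new value.
  int add () {
    std::lock_guard<LOCK> guard (lock_);
    return ++count_;
  }

  // Read the count under the configured lock.
  int value () {
    std::lock_guard<LOCK> guard (lock_);
    return count_;
  }

private:
  LOCK lock_;
  int count_;
};

// Typedefs hide the template from callers, in the same spirit as
// ACE_Select_Reactor being a typedef of ACE_Select_Reactor_T.
typedef Handler_Count<std::mutex> MT_Handler_Count;  // Real locking.
typedef Handler_Count<Null_Mutex> ST_Handler_Count;  // No locking overhead.
```

With the no-op lock, the guard's calls are empty inline functions, so a single-threaded configuration pays nothing for synchronization it doesn't need.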
Example

Since the reactive logging server on page 84 runs continuously, the only way to shut it down is to terminate it abruptly. For example, an administrator can send its process a "kill -9" from a UNIX login console or end the process via the Windows Task Manager. Abruptly terminating a process via these mechanisms prevents it from performing cleanup activities, such as flushing log records to disk, releasing synchronization locks, and closing TCP/IP connections. In this example, we show how to use the ACE_Select_Reactor::notify() mechanism to shut down the logging server cleanly. Figure 4.4 shows the architecture of our solution, which uses the ACE_Select_Reactor's notification mechanism to shut down our Reactor_Logging_Server via the following steps:

Figure 4.4. ACE_Select_Reactor Logging Server with Controller Thread
The C++ code below illustrates these four steps. It's in Select_Reactor_Logging_Server.cpp and the revised main() function is shown first:

     1 #include "ace/streams.h"
     2 #include "ace/Reactor.h"
     3 #include "ace/Select_Reactor.h"
     4 #include "ace/Thread_Manager.h"
     5 #include "Reactor_Logging_Server.h"
     6 #include <string>
     7 // Forward declarations.
     8 ACE_THR_FUNC_RETURN controller (void *);
     9 ACE_THR_FUNC_RETURN event_loop (void *);
    10
    11 typedef Reactor_Logging_Server<Logging_Acceptor_Ex>
    12         Server_Logging_Daemon;
    13
    14 int main (int argc, char *argv[]) {
    15   ACE_Select_Reactor select_reactor;
    16   ACE_Reactor reactor (&select_reactor);
    17
    18   Server_Logging_Daemon *server = 0;
    19   ACE_NEW_RETURN (server,
    20                   Server_Logging_Daemon (argc, argv, &reactor),
    21                   1);
    22   ACE_Thread_Manager::instance()->spawn (event_loop, &reactor);
    23   ACE_Thread_Manager::instance()->spawn (controller, &reactor);
    24   return ACE_Thread_Manager::instance ()->wait ();
    25 }

Lines 1-12 Include the header files, define some forward declarations, and instantiate the Reactor_Logging_Server template with the Logging_Acceptor_Ex class (page 67) to create the Server_Logging_Daemon type definition. ACE_THR_FUNC_RETURN portably specifies the thread function's return type.

Lines 15-16 Set the implementation of the local ACE_Reactor instance to be an ACE_Select_Reactor.

Lines 20-21 Dynamically create an instance of Server_Logging_Daemon.

Line 22 Use the ACE_Thread_Manager singleton from Chapter 9 of C++NPv1 to spawn a thread that runs the following event_loop() function:

    static ACE_THR_FUNC_RETURN event_loop (void *arg) {
      ACE_Reactor *reactor = ACE_static_cast (ACE_Reactor *, arg);

      reactor->owner (ACE_OS::thr_self ());
      reactor->run_reactor_event_loop ();
      return 0;
    }

Note how we set the owner of the reactor to the identity of the thread that runs the event loop. The Concurrency considerations discussion (page 94) explains the use of thread ownership for ACE_Select_Reactor.
Line 23 Spawn a thread to run the controller() function, which waits for an administrator to shut down the server via a command on its standard input.

Line 24 Wait for the other two threads to exit before returning from the main() function. ACE_Thread_Manager::wait() also reaps the exit status of the two threads to avoid memory leaks. Sidebar 42 (page 186) describes the conventions to follow to ensure memory isn't leaked when threads exit.

Line 25 At this point, the event loop isn't running, but the Server_Logging_Daemon and existing client connections are still open. The reactor and select_reactor objects are about to go out of scope, however. Since the ACE_Reactor plays the Abstraction role in the Bridge pattern, the only important field in reactor is a pointer to its implementation object, select_reactor. By default, the ACE_Reactor destructor only destroys the implementation object if the ACE_Reactor created it. Since select_reactor was created on the stack and passed to reactor, select_reactor is not destroyed by the ACE_Reactor destructor. Instead, it's destroyed when it goes out of scope. Its destruction triggers callbacks to Logging_Acceptor::handle_close() (page 58) and the Logging_Event_Handler_Ex::handle_close() (page 70) hook methods for each logging handler and logging event handler, respectively, that are still registered with the reactor.

The controller() function can be implemented as follows:

     1 static ACE_THR_FUNC_RETURN controller (void *arg) {
     2   ACE_Reactor *reactor = ACE_static_cast (ACE_Reactor *, arg);
     3   Quit_Handler *quit_handler = 0;
     4   ACE_NEW_RETURN (quit_handler, Quit_Handler (reactor), 0);
     5
     6   for (;;) {
     7     std::string user_input;
     8     std::getline (cin, user_input, '\n');
     9     if (user_input == "quit") {
    10       reactor->notify (quit_handler);
    11       break;
    12     }
    13   }
    14   return 0;
    15 }

Lines 2-4 After casting the void pointer argument back into an ACE_Reactor pointer, we create a special event handler called Quit_Handler.
Its handle_exception() and handle_close() methods simply shut down the ACE_Select_Reactor's event loop and delete the event handler, respectively, as shown below:

    class Quit_Handler : public ACE_Event_Handler {
    public:
      Quit_Handler (ACE_Reactor *r): ACE_Event_Handler (r) {}

      virtual int handle_exception (ACE_HANDLE) {
        reactor ()->end_reactor_event_loop ();
        return -1; // Trigger call to handle_close() method.
      }

      virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
      { delete this; return 0; }

    private:
      // Private destructor ensures dynamic allocation.
      virtual ~Quit_Handler () {}
    };

Lines 6-13 Go into a loop that waits for an administrator to type "quit" on the standard input stream. When this occurs, we pass the quit_handler to the reactor via its notify() method and exit the controller thread.

The implementation shown above is portable to all ACE platforms that support threads. Section 4.4 illustrates how to take advantage of Windows-specific features to accomplish the same behavior.
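The private destructor used by Quit_Handler is a general C++ idiom that can be shown in isolation. The sketch below is a standalone illustration, not part of the logging server: Self_Deleting and its destroyed flag are hypothetical, with the flag added only so the effect can be observed.

```cpp
// Hypothetical class showing the private-destructor idiom on its own.
class Self_Deleting {
public:
  // destroyed points to a flag that the destructor sets, so a caller
  // can observe when the object has released itself.
  explicit Self_Deleting (bool *destroyed) : destroyed_ (destroyed) {}

  // Plays the role of handle_close(): the object deletes itself.
  // The caller must not touch the object after this call returns.
  void close () { delete this; }

private:
  // With a private destructor, neither a stack declaration such as
  //   Self_Deleting s (&flag);
  // nor "delete p;" from outside the class compiles, so instances
  // must be created with new and released via close().
  ~Self_Deleting () { *destroyed_ = true; }

  bool *destroyed_;
};
```

This matters for an object that calls delete this: if it could be declared on the stack or as a data member, the self-deletion would be undefined behavior, and the idiom turns that mistake into a compile-time error.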