Motivation

Different operating systems use different APIs to create, manage, and synchronize the completion of threads. Today's multithreading mechanisms suffer from accidental complexities similar to those discussed in previous chapters. They also introduce the following two types of variability that make writing portable applications hard:
One particularly vexing aspect of multithreaded applications is determining how to cancel threads portably. Some OS platforms provide native support for canceling threads, for example:
Unfortunately, the native OS thread cancelation mechanisms outlined above are nonportable and error prone. For example, UNIX signals and the Win32 TerminateThread() function can stop a thread dead in its tracks, preventing it from releasing any resources it's holding. The Pthreads asynchronous thread cancelation mechanisms provide better support for cleaning up resources held by a thread, but they are still hard to understand and program correctly, and aren't portable to non-Pthreads platforms. Since it's tedious and error prone to address all of these portability issues in each application, ACE provides the ACE_Thread_Manager class.

Class Capabilities

The ACE_Thread_Manager class uses the Wrapper Facade pattern to guide the encapsulation of the syntactic and semantic variation among different OS multithreading APIs. This class provides the following portable capabilities:
The interface of the ACE_Thread_Manager class is shown in Figure 9.1 and its key platform-independent methods are outlined in the following table:
Figure 9.1. The ACE_Thread_Manager Class Diagram
The ACE_Thread_Manager::spawn() method can be passed a set of flags to specify the properties of the created thread. This value is a bit-wise inclusive "or" of the flags shown in the following table:
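The bit-wise inclusive-or composition of spawn flags can be sketched as follows. The flag names match ACE's, but the numeric values below are purely illustrative (ACE maps the real THR_* constants onto the underlying OS threading API), and the helper functions are hypothetical:

```cpp
#include <cassert>

// Illustrative flag values only -- ACE defines the real THR_* constants,
// which are mapped onto the underlying OS threading API.
enum {
  THR_NEW_LWP      = 0x1,  // request a new kernel-schedulable entity
  THR_DETACHED     = 0x2,  // thread's exit status is not collected
  THR_SCOPE_SYSTEM = 0x4   // system (kernel-level) contention scope
};

// Combine thread properties with bitwise inclusive-or, the same way a
// flags argument is composed for spawn().
inline long make_spawn_flags() {
  return THR_DETACHED | THR_SCOPE_SYSTEM;
}

// Check whether a particular property was requested.
inline bool has_flag(long flags, long f) { return (flags & f) == f; }
```

Because each property occupies a distinct bit, any subset of properties can be requested in a single integer argument and tested independently later.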
The ACE_Thread_Manager not only spawns threads with various properties, it also provides a cooperative thread cancelation mechanism that's safe, easy to use, and portable. To use this mechanism, the canceling thread calls ACE_Thread_Manager::cancel() to set a flag indicating that a designated thread should cancel itself. The canceled thread is responsible for cooperating in this scheme by periodically calling ACE_Thread_Manager::testcancel() to see if it's been requested to cancel itself. Since threads are not canceled immediately, the ACE cooperative thread cancelation feature is analogous to using the Pthreads deferred cancelation policy, with the cancelation point being the call to ACE_Thread_Manager::testcancel(). ACE cooperative thread cancelation differs from Pthreads deferred cancelation in the following ways:
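The cancel()/testcancel() protocol boils down to a shared flag that the canceling thread sets and the worker polls at well-defined points. Here is a minimal sketch of that idea using standard C++ atomics rather than ACE (the CancelFlag class and run_until_canceled() function are hypothetical stand-ins for the per-thread state ACE_Thread_Manager maintains):

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the per-thread cancelation state that
// ACE_Thread_Manager keeps: cancel() sets the flag, testcancel() reads it.
class CancelFlag {
public:
  void cancel() { canceled_.store(true); }
  bool testcancel() const { return canceled_.load(); }
private:
  std::atomic<bool> canceled_{false};
};

// Worker loop: poll for a cancel request before each unit of work --
// the same deferred, voluntary scheme the text describes. Returns the
// number of work units completed before cancelation (or completion).
inline int run_until_canceled(CancelFlag &flag, int max_iterations) {
  int work_done = 0;
  while (!flag.testcancel() && work_done < max_iterations)
    ++work_done;  // stand-in for processing one log record
  return work_done;
}
```

The key property is that the worker is never interrupted mid-operation; it only notices the request at its own polling points, which is what makes the scheme safe to use with held resources.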
Thus, ACE thread cancelation is strictly voluntary, which is the only way to cancel a thread both portably and safely. As with the ACE_Process_Manager described in Section 8.4, the ACE_Thread_Manager can be used in two ways:
ExampleMultithreaded servers are common on operating systems where spawning threads incurs less overhead than spawning processes. The following example uses the ACE_Thread_Manager to implement our first multithreaded logging server based on a thread-per-connection concurrency model. As shown in Figure 9.2, the master thread runs continuously and plays the role of a factory that
Figure 9.2. Architecture of the Thread-per-Connection Logging Server
The worker thread performs all subsequent log record processing on the ACE_SOCK_Stream and destroys it when the connection is closed. This concurrency design is similar to the example in Section 8.4 that spawned a new process for each client connection. This thread-per-connection logging server differs from the process-per-connection implementation in the following ways, however:
Section 5.2 provides a more general description of the pros and cons of implementing servers with multiple threads rather than with multiple processes. We start, as usual, by including the necessary ACE header files:

#include "ace/SOCK_Stream.h"
#include "ace/Thread_Manager.h"
#include "Logging_Server.h"
#include "Logging_Handler.h"

We define a Thread_Per_Connection_Logging_Server class that inherits from Logging_Server.

class Thread_Per_Connection_Logging_Server : public Logging_Server
{
private:
  class Thread_Args {
  public:
    Thread_Args (Thread_Per_Connection_Logging_Server *lsp)
      : this_ (lsp) {}

    Thread_Per_Connection_Logging_Server *this_;
    ACE_SOCK_Stream logging_peer_;
  };

  // Passed as a parameter to <ACE_Thread_Manager::spawn>.
  static void *run_svc (void *arg);

protected:
  // Other methods shown below...
};

As with Process_Per_Connection_Logging_Server in Section 8.4, we inherit and reuse the open() and wait_for_multiple_events() methods.
The handle_connections() method accepts a connection into the ACE_SOCK_Stream data member of the thread_args object. (Sidebar 20 explains why we allocate thread_args dynamically.) We then use the singleton ACE_Thread_Manager to spawn a new thread that handles the newly connected client.

virtual int handle_connections () {
  Thread_Args *thread_args = new Thread_Args (this);
  if (acceptor ().accept (thread_args->logging_peer_) == -1)
    return -1;
  else if (ACE_Thread_Manager::instance ()->spawn (
             // Pointer-to-function entry point.
             Thread_Per_Connection_Logging_Server::run_svc,
             // <run_svc> parameter.
             ACE_static_cast (void *, thread_args),
             THR_DETACHED | THR_SCOPE_SYSTEM) == -1)
    return -1;
  else return 0;
}

The static run_svc() method is the entry point function of each new thread. The underlying OS thread creation function will pass the thread_args pointer to run_svc(), which assumes control of the logging_peer_'s lifetime. Since we don't rendezvous with the thread to collect its exit status, we pass the THR_DETACHED flag, which instructs the ACE_Thread_Manager and underlying OS threading implementation to free up resources as soon as a thread exits. All threads spawned by the ACE_Thread_Manager singleton can be managed as a whole. We can therefore wait for them to finish even though they weren't designated as joinable via the THR_JOINABLE flag when they were spawned. Sidebar 21 illustrates how threads are spawned using these flags.
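The ownership handoff here, where a heap-allocated argument block is passed to a detached thread whose entry point is then responsible for freeing it, is a general pattern. A minimal sketch using standard C++ threads instead of ACE (the Connection, ThreadArgs, and spawn_handler names are hypothetical stand-ins for the example's types):

```cpp
#include <memory>
#include <string>
#include <thread>

// Hypothetical stand-ins for the connection and argument types.
struct Connection { std::string peer_host; };

struct ThreadArgs {          // heap-allocated, like Thread_Args
  Connection conn;
  std::string *result;       // hypothetical: records what was handled
};

// Entry point with the same shape as run_svc(): it takes ownership of
// the heap-allocated argument block and frees it before returning.
inline void *run_svc(void *arg) {
  std::unique_ptr<ThreadArgs> args(static_cast<ThreadArgs *>(arg));
  *args->result = "handled:" + args->conn.peer_host;
  return nullptr;            // args freed automatically by unique_ptr
}

// Spawn a detached handler thread per "connection", analogous to
// spawning with THR_DETACHED: no one joins it, so the entry point
// must clean up after itself.
inline void spawn_handler(ThreadArgs *args) {
  std::thread(run_svc, static_cast<void *>(args)).detach();
}
```

Because the spawning thread never touches the argument block again after a successful spawn, there is exactly one owner at every point, which avoids both leaks and double-frees.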
The run_svc() method shown below serves as the entry point for each new thread created to process a client's log records.

void *Thread_Per_Connection_Logging_Server::run_svc (void *arg) {
  Thread_Args *thread_args = ACE_static_cast (Thread_Args *, arg);
  thread_args->this_->handle_data (&thread_args->logging_peer_);
  thread_args->logging_peer_.close ();
  delete thread_args;

  return 0; // Return value is ignored.
}

As shown in Sidebar 21 on page 195, the ACE_Thread_Manager::spawn() method is passed the Thread_Args pointer used by run_svc(). This pointer must be cast to a void * to conform to the ACE threading API, which is portable across operating systems. After the run_svc() function is invoked by the OS thread library, it casts the pointer back to a Thread_Args *. The ACE_static_cast() macro (see Sidebar 17 on page 176) makes it easy to cast the pointer as safely as the compiler allows. Note that since we spawned run_svc() in a detached thread, its return value is ignored. The static run_svc() method uses the data members in the Thread_Args parameter to forward control back to the following handle_data() method so that log record processing can continue.

protected:
  virtual int handle_data (ACE_SOCK_Stream *logging_peer) {
    ACE_FILE_IO log_file;
    // Client's hostname is used as the logfile name.
    make_log_file (log_file, logging_peer);

    // Place the connection into blocking mode.
    logging_peer->disable (ACE_NONBLOCK);

    Logging_Handler logging_handler (log_file, *logging_peer);
    ACE_Thread_Manager *tm = ACE_Thread_Manager::instance ();
    ACE_thread_t me = ACE_OS::thr_self ();

    // Keep handling log records until the client closes the connection
    // or this thread is asked to cancel itself.
    while (!tm->testcancel (me)
           && logging_handler.log_record () != -1)
      continue;

    log_file.close ();
    return 0;
  }

This version of handle_data() is similar to the process-per-connection example on page 179.
In this version, however, the handle_data() thread that's processing client log records can be asked to cancel itself, which is an example of cooperative cancelation. To cooperate, each handle_data() thread calls ACE_Thread_Manager::testcancel() before handling a log record to see if the main thread has requested a cancelation. One shortcoming of the placement of the testcancel() call and the design of the Logging_Handler::log_record() method is that the thread won't notice the cancel request until after the next log record is received. If clients send log records fairly often, this may not be a problem. The ACE Reactor and Task frameworks described in [SH] can be used to avoid this shortcoming. Finally, the main() program executes the logging server's event loop within its run() template method.

int main (int argc, char *argv[]) {
  // Register to receive the <SIGTERM> signal.
  ACE_Sig_Action sa (sigterm_handler, SIGTERM);

  Thread_Per_Connection_Logging_Server server;

  if (server.run (argc, argv) == -1)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "server.run()"), 1);

  // Cooperative thread cancelation.
  ACE_Thread_Manager::instance ()->cancel_all ();

  // Barrier synchronization, wait no more than a minute.
  ACE_Time_Value timeout (60);
  return ACE_Thread_Manager::instance ()->wait (&timeout);
}

The thread-per-connection implementation of the logging server catches the SIGTERM signal to allow a system administrator to shut it down. Rather than wait an unlimited time for all logging clients to terminate their sessions, however, the main thread uses the ACE_Thread_Manager cooperative cancelation mechanism to request all service threads spawned by the ACE_Thread_Manager singleton to shut down. The wait() call at the end of main() allows the logging server to wait up to a minute to synchronize on the completion of all the canceled threads.
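The bounded wait() at the end of main() can be sketched in standard C++ as a condition-variable wait with a timeout over a count of live workers. The WorkerGroup class below is a hypothetical miniature of the bookkeeping ACE_Thread_Manager performs for the threads it spawned; it is not ACE's implementation:

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical miniature of a thread manager's "wait for all threads"
// bookkeeping: workers register on entry, deregister on exit, and the
// main thread waits -- with a bound -- for the live count to reach zero.
class WorkerGroup {
public:
  void worker_started() {
    std::lock_guard<std::mutex> g(m_);
    ++live_;
  }
  void worker_done() {
    { std::lock_guard<std::mutex> g(m_); --live_; }
    cv_.notify_all();
  }
  // Returns true if all workers finished within the timeout,
  // false if the wait timed out -- the bounded barrier in the text.
  bool wait(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lk(m_);
    return cv_.wait_for(lk, timeout, [this] { return live_ == 0; });
  }
private:
  std::mutex m_;
  std::condition_variable cv_;
  int live_ = 0;
};
```

The boolean result mirrors why a bounded wait matters: the main thread can distinguish "all canceled threads finished" from "gave up after the timeout" and still exit cleanly in either case.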
This is an example of barrier synchronization; it prevents problems on OS platforms where undesirable behavior occurs if the main thread exits while other threads are still running. By bounding our waiting time via the timeout argument, however, we ensure that the logging server doesn't hang indefinitely.