9.2 The ACE_Thread_Manager Class

Motivation

Different operating systems use different APIs to create, manage, and synchronize the completion of threads. Today's multithreading mechanisms suffer from accidental complexities similar to those discussed in previous chapters. They also introduce the following two types of variability that make writing portable applications hard:

  • Syntactic. Native OS threading APIs are often nonportable due to syntactic differences, even when they provide the same capabilities. For example, the Win32 CreateThread() and Pthreads pthread_create() functions provide similar thread creation capabilities, even though their APIs differ syntactically.

  • Semantic. Wrapper facades that export a syntactically uniform C++ programming interface don't necessarily address semantic variations across OS multithreading mechanisms. For example, both Pthreads and UI threads support detached threads, whereas Win32 does not, and VxWorks supports only detached threads.

One particularly vexing aspect of multithreaded applications is determining how to cancel threads portably. Some OS platforms provide native support for canceling threads, for example:

  • Pthreads defines a powerful set of APIs that allow threads to be canceled asynchronously or cooperatively via the pthread_cancel() and pthread_testcancel() functions.

  • It's also possible to cancel threads on some UNIX platforms via signals, for example, via the Pthreads pthread_kill() and UI threads thr_kill() functions.

  • The Win32 TerminateThread() function offers asynchronous cancelation.

Unfortunately, the native OS thread cancelation mechanisms outlined above are nonportable and error prone. For example, UNIX signals and the Win32 TerminateThread() function can stop a thread dead in its tracks, preventing it from releasing any resources it's holding. The Pthreads asynchronous thread cancelation mechanisms provide better support for cleaning up resources held by a thread, but they are still hard to understand and program correctly, and aren't portable to non-Pthreads platforms.

Since it's tedious and error prone to address all of these portability issues in each application, ACE provides the ACE_Thread_Manager class.

Class Capabilities

The ACE_Thread_Manager class uses the Wrapper Facade pattern to guide the encapsulation of the syntactic and semantic variation among different OS multithreading APIs. This class provides the following portable capabilities:

  • Spawns one thread, or multiple threads at once, each running an application-designated function concurrently

  • Alters the most common thread attributes, for example, scheduling priority and stack size, for each of the spawned threads

  • Spawns and manages a set of threads as a cohesive collection, called a thread group

  • Manages the threads in an ACE_Task, which we present in [SH]

  • Facilitates cooperative cancelation of threads, and

  • Waits for one or more threads to exit.

The interface of the ACE_Thread_Manager class is shown in Figure 9.1, and its key platform-independent methods are outlined in the following table; a short usage sketch follows:

Method Description
spawn() Creates a new thread of control, passing it the function and function parameter to run as the thread's entry point.
spawn_n() Creates n new threads belonging to the same thread group. Other threads can wait for this entire group of threads to exit.
wait() Blocks until all threads in the thread manager have exited and reaps the exit status of any joinable threads.
join() Waits for a particular thread to exit and reaps its exit status.
cancel_all() Requests all threads managed by an ACE_Thread_Manager object to stop.
testcancel() Asks if the designated thread has been requested to stop.
exit() Exits a thread and releases the thread's resources.
close() Closes down and releases resources for all managed threads.
instance() A static method that returns a pointer to the ACE_Thread_Manager singleton.
Figure 9.1. The ACE_Thread_Manager Class Diagram
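To make these methods concrete, here is a minimal sketch, not taken from the logging server developed below, that spawns a single joinable thread via the ACE_Thread_Manager singleton and then waits for it to exit. The worker() entry point, the spawn_and_wait() helper, and its workload are hypothetical names used only for illustration.

 #include "ace/OS.h"
 #include "ace/Thread_Manager.h"

 // Hypothetical entry point; it receives the <void *> argument passed
 // to <spawn> and returns a status that <wait> can reap.
 static ACE_THR_FUNC_RETURN worker (void *arg)
 {
   int *count = ACE_static_cast (int *, arg);
   for (int i = 0; i < *count; ++i)
     ACE_OS::sleep (0); // Placeholder for one unit of work.
   return 0;
 }

 int spawn_and_wait ()
 {
   int count = 10;

   // Spawn one joinable thread managed by the singleton.
   if (ACE_Thread_Manager::instance ()->spawn
         (worker, &count, THR_JOINABLE) == -1)
     return -1;

   // Block until all threads spawned by this manager have exited and
   // reap the exit status of the joinable ones; <count> stays valid
   // because we don't return until the thread has finished.
   return ACE_Thread_Manager::instance ()->wait ();
 }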

The ACE_Thread_Manager::spawn() method can be passed a set of flags to specify the properties of the created thread. This value is a bitwise inclusive "or" of the flags shown in the following table (a short usage sketch appears after the table):

Flag Description
THR_SCOPE_SYSTEM The new thread is created with system scheduling contention scope, permanently bound to a newly created kernel thread.
THR_SCOPE_PROCESS The new thread is created with process scheduling contention scope, that is, it runs as a user thread.
THR_NEW_LWP This flag affects the concurrency attribute of the process. The desired level of concurrency for unbound threads is increased by one, which typically adds a new kernel thread to the pool available to run user threads. On OS platforms that don't support an N:M user/kernel threading model, this flag is ignored.
THR_DETACHED The new thread is created detached, which means that its exit status is not accessible to other threads. Its thread ID and other resources are reclaimed by the OS as soon as the thread terminates. This flag is the opposite of THR_JOINABLE.
THR_JOINABLE The new thread is created joinable, which means that its exit status can be obtained by other threads via the ACE_Thread_Manager::join() method. Its thread ID and other resources are not reclaimed by the OS until another thread joins with it. This flag is the opposite of THR_DETACHED. The default behavior for all ACE thread creation methods is THR_JOINABLE.
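For example, these flags can be combined with a bitwise OR when spawning a group of threads. The following sketch is illustrative only; pool_entry() and start_pool() are hypothetical names, the ACE thread headers ("ace/OS.h" and "ace/Thread_Manager.h") are assumed to be included, and it relies on spawn_n() returning the new thread group's id on success (and -1 on failure), which can then be passed to wait_grp().

 // Hypothetical entry point for the pool threads.
 static ACE_THR_FUNC_RETURN pool_entry (void *) { /* ... */ return 0; }

 int start_pool ()
 {
   // Spawn five joinable, system-scope threads as one thread group.
   int grp_id = ACE_Thread_Manager::instance ()->spawn_n
     (5, pool_entry, 0,
      THR_JOINABLE | THR_SCOPE_SYSTEM | THR_NEW_LWP);
   if (grp_id == -1)
     return -1;

   // Wait only for the threads in this particular group to exit.
   return ACE_Thread_Manager::instance ()->wait_grp (grp_id);
 }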

The ACE_Thread_Manager not only spawns threads with various properties, but also provides a cooperative thread cancelation mechanism that's safe, easy to use, and portable. To use this mechanism, the canceling thread calls ACE_Thread_Manager::cancel() to set a flag indicating that a designated thread should cancel itself. The canceled thread is responsible for cooperating in this scheme by periodically calling ACE_Thread_Manager::testcancel() to see if it's been asked to cancel itself.

Since threads are not canceled immediately, the ACE cooperative thread cancelation feature is analogous to using the Pthreads deferred cancelation policy, with the cancelation point being the call to ACE_Thread_Manager::testcancel(). ACE cooperative thread cancelation differs from Pthreads deferred cancelation in the following ways:

  • Thread cleanup in ACE must be programmed explicitly after the cancelation point.

  • After a thread has been canceled in ACE, it can elect to finish any in-progress work or even ignore the cancelation request altogether.

Thus, ACE thread cancelation is strictly voluntary, which is the only way to cancel a thread both portably and safely.
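In outline, the two sides of this cooperative protocol look like the sketch below. This is a schematic fragment rather than code from the logging server; svc() and request_shutdown() are hypothetical names, the ACE thread headers are assumed to be included, and the work performed in the loop is elided.

 // Canceled side: poll for a pending cancel request at convenient
 // points and return cleanly once one is seen.
 static ACE_THR_FUNC_RETURN svc (void *)
 {
   ACE_Thread_Manager *tm = ACE_Thread_Manager::instance ();
   while (!tm->testcancel (ACE_OS::thr_self ())) {
     // ... perform one bounded unit of work, releasing any
     //     resources it acquires ...
   }
   return 0;
 }

 // Canceling side: request, but don't force, cancelation of a
 // previously spawned thread whose id is <tid>.
 void request_shutdown (ACE_thread_t tid)
 {
   ACE_Thread_Manager::instance ()->cancel (tid);
 }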

As with the ACE_Process_Manager described in Section 8.4, the ACE_Thread_Manager can be used in two ways:

  • As a singleton [GHJV95] accessed via its instance() method. This method is implemented using the Double-Checked Locking Optimization pattern, as described in Sidebar 19.

  • By instantiating one or more instances. This capability can be used to support multiple sets of thread groups within a process. Although this is a legitimate use at any time, it's recommended when developing a shared library (DLL) that uses the ACE_Thread_Manager class. This practice avoids interfering with the singleton instance that the calling application may be using, as sketched below.
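The sketch below keeps a private ACE_Thread_Manager inside a hypothetical Library_Threads class and confines all spawning, canceling, and waiting to that instance, leaving the application's singleton alone. The library_svc() entry point is likewise hypothetical, and the ACE thread headers are assumed to be included.

 // Hypothetical entry point for the library's internal threads.
 static ACE_THR_FUNC_RETURN library_svc (void *) { /* ... */ return 0; }

 class Library_Threads
 {
 public:
   int start (size_t n_threads)
   {
     // These threads are registered with <mgr_>, not with the
     // process-wide singleton.
     return mgr_.spawn_n (n_threads, library_svc, 0,
                          THR_JOINABLE) == -1 ? -1 : 0;
   }

   int stop ()
   {
     mgr_.cancel_all ();  // Cooperative shutdown request.
     return mgr_.wait (); // Waits only for this manager's threads.
   }

 private:
   ACE_Thread_Manager mgr_; // Independent of the singleton instance.
 };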

Example

Multithreaded servers are common on operating systems where spawning threads incurs less overhead than spawning processes. The following example uses the ACE_Thread_Manager to implement our first multithreaded logging server based on a thread-per-connection concurrency model. As shown in Figure 9.2, the master thread runs continuously and plays the role of a factory that

  1. Accepts a connection and creates an ACE_SOCK_Stream object dynamically, and

  2. Spawns a worker thread that uses this object to handle the client's logging session.

Figure 9.2. Architecture of the Thread-per-Connection Logging Server

The worker thread performs all subsequent log record processing on the ACE_SOCK_Stream and destroys it when the connection is closed. This concurrency design is similar to the example in Section 8.4 that spawned a new process for each client connection. This thread-per-connection logging server differs from the process-per-connection implementation in the following ways, however:

Sidebar 19: Serializing Singletons in ACE

The Singleton pattern [GHJV95] ensures that a class has only one instance and provides a global point of access to it. Singletons defined in ACE use the Double-Checked Locking Optimization pattern [SSRB00] to reduce contention and synchronization overhead when critical sections of code must acquire locks in a thread-safe manner just once during program execution. The following code shows how the static ACE_Thread_Manager::instance() method uses this pattern:

 ACE_Thread_Manager *ACE_Thread_Manager::instance ()
 {
   if (ACE_Thread_Manager::thr_mgr_ == 0) {
     ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon,
                       *ACE_Static_Object_Lock::instance (),
                       0);
     if (ACE_Thread_Manager::thr_mgr_ == 0)
       ACE_Thread_Manager::thr_mgr_ = new ACE_Thread_Manager;
   }
   return ACE_Thread_Manager::thr_mgr_;
 }

The ACE_Static_Object_Lock::instance() method returns a preinitialized lock that is created before the main() function runs, as described in Sidebar 23 on page 218.

  • It spawns a new thread for each connection instead of a new process, so it should be more efficient.

  • It's more portable because it works on any platform that supports threads.

  • It's considerably simpler because portable multithreaded servers are easier to program than multiprocessed servers.

Section 5.2 provides a more general description of the pros and cons of implementing servers with multiple threads rather than with multiple processes.

We start, as usual, by including the necessary ACE header files:

 #include "ace/SOCK_Stream.h"
 #include "ace/Thread_Manager.h"
 #include "Logging_Server.h"
 #include "Logging_Handler.h"

We define a Thread_Per_Connection_Logging_Server class that inherits from Logging_Server.

 class Thread_Per_Connection_Logging_Server : public Logging_Server
 {
 private:
   class Thread_Args {
   public:
     Thread_Args (Thread_Per_Connection_Logging_Server *lsp)
       : this_ (lsp) {}

     Thread_Per_Connection_Logging_Server *this_;
     ACE_SOCK_Stream logging_peer_;
   };

   // Passed as a parameter to <ACE_Thread_Manager::spawn>.
   static void *run_svc (void *arg);

 protected:
   // Other methods shown below...
 };

As with Process_Per_Connection_Logging_Server in Section 8.4, we inherit and reuse the open() and wait_for_multiple_events() methods.

Sidebar 20: Traps and Pitfalls of Mixing Threads and Objects

The life cycle of objects passed as parameters to thread entry point functions must be managed carefully. Many programmers confuse a thread and an object by saying things like "this object runs in a thread." It's critical to make the following distinction:

  • A thread is a unit of execution.

  • An object is a chunk of memory and associated methods.

Thus, there's no implicit connection between a thread and any object that the thread accesses during execution. It's essential therefore to make sure that no thread can access an object after it's been deleted.

For example, if the Thread_Args object were constructed and destroyed in the scope of the for loop, it could be destroyed, and its storage reused for a subsequent client connection, while the thread spawned to service the earlier connection was still using it. To avoid these problems, we therefore allocate Thread_Args objects dynamically and let the run_svc() function delete them before returning.
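The hazard can be sketched as follows. This fragment is deliberately broken and is not part of the example; it shows what an accept loop would look like if Thread_Args had automatic storage duration.

 for (;;) {
   Thread_Args args (this);                  // BAD: automatic storage.
   acceptor ().accept (args.logging_peer_);
   ACE_Thread_Manager::instance ()->spawn
     (run_svc, ACE_static_cast (void *, &args), THR_DETACHED);
   // <args> is destroyed at the end of this iteration and its stack
   // storage is reused for the next connection, even though the
   // spawned thread may still be using it.
 }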

The handle_connections() method accepts a connection into the ACE_SOCK_Stream data member of the thread_args object. (Sidebar 20 explains why we allocate thread_args dynamically.) We then use the singleton ACE_Thread_Manager to spawn a new thread that handles the newly connected client.

 virtual int handle_connections () {
   Thread_Args *thread_args = new Thread_Args (this);

   if (acceptor ().accept (thread_args->logging_peer_) == -1)
     return -1;
   else if (ACE_Thread_Manager::instance ()->spawn (
              // Pointer-to-function entry point.
              Thread_Per_Connection_Logging_Server::run_svc,
              // <run_svc> parameter.
              ACE_static_cast (void *, thread_args),
              THR_DETACHED | THR_SCOPE_SYSTEM) == -1)
     return -1;
   else
     return 0;
 }

The static run_svc() method is the entry point function of each new thread. The underlying OS thread creation function will pass the thread_args pointer to run_svc(), which assumes control of the logging_peer_'s lifetime. Since we don't rendezvous with the thread to collect its exit status, we pass the THR_DETACHED flag, which instructs the ACE_Thread_Manager and underlying OS threading implementation to free up resources as soon as the thread exits. All threads spawned by the ACE_Thread_Manager singleton can be managed as a whole. We can therefore wait for them to finish even though they weren't designated as joinable via the THR_JOINABLE flag when they were spawned. Sidebar 21 illustrates how threads are spawned using these flags.

Sidebar 21: How Threads are Spawned in ACE

When ACE_Thread_Manager::spawn() is invoked on a platform configuration that uses the UI threads thr_create() system function, the spawn() call is ultimately forwarded to the native thr_create() function. Regardless of OS, the following steps occur to spawn a thread:

  1. The OS creates a thread execution context.

  2. The OS allocates memory for the thread's stack.

  3. The new thread's register set is prepared so that, when the thread is scheduled into execution, it will call the thread entry point function supplied as a parameter to spawn().

  4. The thread is marked runnable so the OS can start executing it.

The run_svc() method shown below serves as the entry point for each new thread created to process a client's log records.

 void *Thread_Per_Connection_Logging_Server::run_svc (void *arg) {
   Thread_Args *thread_args = ACE_static_cast (Thread_Args *, arg);

   thread_args->this_->handle_data (&thread_args->logging_peer_);
   thread_args->logging_peer_.close ();
   delete thread_args;

   return 0; // Return value is ignored.
 }

As shown in Sidebar 21 on page 195, the ACE_Thread_Manager::spawn() method is passed the Thread_Args pointer used by run_svc(). This pointer must be cast to a void * to conform to the ACE threading API, which is portable across operating systems. After the run_svc() function is invoked by the OS thread library, it casts the pointer back to a Thread_Args *. The ACE_static_cast() macro (see Sidebar 17 on page 176) makes it easy to cast the pointer as safely as the compiler allows. Note that since we spawned run_svc() in a detached thread, its return value is ignored.

The static run_svc() method uses the data members in the Thread_Args parameter to forward control back to the following handle_data() method so that log record processing can continue.

 protected:
   virtual int handle_data (ACE_SOCK_Stream *logging_peer) {
     ACE_FILE_IO log_file;
     // Client's hostname is used as the logfile name.
     make_log_file (log_file, logging_peer);

     // Place the connection into blocking mode.
     logging_peer->disable (ACE_NONBLOCK);

     Logging_Handler logging_handler (log_file, *logging_peer);
     ACE_Thread_Manager *tm = ACE_Thread_Manager::instance ();
     ACE_thread_t me = ACE_OS::thr_self ();

     // Keep handling log records until the client closes the connection
     // or this thread is asked to cancel itself.
     while (!tm->testcancel (me)
            && logging_handler.log_record () != -1)
       continue;

     log_file.close ();
     return 0;
   }

This version of handle_data() is similar to the process-per-connection example on page 179. In this version, however, the handle_data() thread that's processing client log records can be asked to cancel itself, which is an example of cooperative cancelation. To cooperate, each handle_data() thread calls ACE_Thread_Manager::testcancel() before handling a log record to see if the main thread has requested a cancelation.

One shortcoming of the placement of the testcancel() call and the design of the Logging_Handler::log_record() method is that the thread won't notice the cancel request until after the next log record is received. If clients send log records fairly often, this may not be a problem. The ACE Reactor and Task frameworks described in [SH] can be used to avoid this shortcoming.

Finally, the main() program executes the logging server's event loop within its run() template method.

 int main (int argc, char *argv[]) {
   // Register to receive the <SIGTERM> signal.
   ACE_Sig_Action sa (sigterm_handler, SIGTERM);

   Thread_Per_Connection_Logging_Server server;

   if (server.run (argc, argv) == -1)
     ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "server.run()"), 1);

   // Cooperative thread cancelation.
   ACE_Thread_Manager::instance ()->cancel_all ();

   // Barrier synchronization, wait no more than a minute.
   ACE_Time_Value timeout (60);
   return ACE_Thread_Manager::instance ()->wait (&timeout);
 }

The thread-per-connection implementation of the logging server catches the SIGTERM signal to allow a system administrator to shut it down. Rather than wait an unlimited time for all logging clients to terminate their sessions, however, the main thread uses the ACE_Thread_Manager cooperative cancelation mechanism to request all service threads spawned by the ACE_Thread_Manager singleton to shut down.

The wait() call at the end of main() allows the logging server to wait up to a minute to synchronize on the completion of all the canceled threads. This is an example of barrier synchronization; it prevents problems on OS platforms that exhibit undesirable behavior if the main thread exits while other threads are still running. By bounding our waiting time via the timeout argument, however, we ensure that the logging server doesn't hang indefinitely.
