Motivation

The ACE_Message_Queue class described in Section 6.2 can be used to
To use a producer/consumer concurrency model effectively in an object-oriented program, however, each thread should be associated with the message queue and any other service-related information. To preserve modularity and cohesion, and to reduce coupling, it's therefore best to encapsulate an ACE_Message_Queue with its associated data and methods into one class whose service threads can access it directly.

Thread-spawning capabilities provided by popular OS platforms are based on each spawned thread invoking a C-style function call. The ACE_Thread_Manager wrapper facade class described in Chapter 9 of C++NPv1 implements portable multithreading capabilities. However, programmers must still pass a C-style function to its spawn() and spawn_n() methods. Providing a spawned thread with access to a C++ object requires a bridge to the C++ object environment. The CLD_Handler::open() method (page 172) illustrated this technique. Since implementing this technique manually for each class is repetitive, it's a good candidate for reuse. The ACE Task framework therefore defines ACE_Task to encapsulate a class's messaging capability and provide a portable way for thread(s) to execute in the context of an object.

Class Capabilities

ACE_Task is the basis of ACE's object-oriented concurrency framework. It provides the following capabilities:
Our focus in this section is on the ACE_Task capabilities for queueing and processing messages. It obtains its event-handling, configuration, and dynamic linking/unlinking capabilities as a consequence of inheriting from ACE classes described in previous chapters. The interface for ACE_Task is shown in Figure 6.6. Since this class has a rich interface, we group the description of its methods into the three categories described below. Sidebar 60 (page 308) describes some additional ACE_Task methods that are related to the ACE Streams framework. Figure 6.6. The ACE_Task Class
1. Task initialization methods. The methods used to initialize a task are shown in the following table:
Applications can customize the startup behavior of an ACE_Task by overriding its open() hook method. This method allocates resources used by a task, such as connection handlers, I/O handles, and synchronization locks. Since open() is generally called after the init() method that's inherited via ACE_Service_Object, any options passed via the ACE Service Configurator framework have already been processed. Therefore, open() can act on those configured preferences. The open() method is also often used to convert a task into an active object by calling ACE_Task::activate() (page 187).

The thr_mgr() and msg_queue() methods make it possible to access and change the thread management and message queueing mechanisms used by a task. Alternative thread management and message queueing mechanisms can also be passed to the ACE_Task constructor when an instance of the class is created.

The activate() method uses the ACE_Thread_Manager pointer returned by the thr_mgr() accessor to spawn one or more threads that run within the task. This method converts the task into an active object whose thread(s) direct its own execution and response to events, rather than being driven entirely by passive method calls that borrow the thread of the caller. Sidebar 42 (page 186) describes how to avoid memory leaks when an activated task's thread(s) exit.

2. Task communication, processing, and synchronization methods. The following table describes the methods used to communicate between tasks and to process messages passively and actively within a task:
A subclass of ACE_Task can perform application-defined processing on messages passed to it by overriding its put() and svc() hook methods to implement the following two processing models:

1. Passive processing. The put() method is used to pass messages to an ACE_Task. Pointers to ACE_Message_Blocks are passed between tasks to avoid data copying overhead. Task processing can be performed entirely in the context of put(), where the caller's thread is borrowed for the duration of its processing. A task's svc() hook method need not be used if the task only processes requests passively in put().
2. Active processing. A task's application-defined processing can also be performed actively. In this case, one or more threads execute the task's svc() hook method to process messages concurrently with respect to other activities in an application. If a task's put() method doesn't perform all the processing on a message, it can use putq() to enqueue the message and return to its caller immediately. A task's svc() hook method can use ACE_Task::getq() to dequeue messages placed onto the message queue and process them concurrently. The getq() method blocks until either a message is available on the message queue or the specified absolute timeout elapses. The blocking nature of getq() allows the thread(s) of a task to block and only wake up when there's work available on the message queue.

Unlike put(), the svc() method is never invoked by a client of a task directly. It's instead invoked by one or more threads when a task becomes an active object after its activate() method is called. The activate() method uses the ACE_Thread_Manager associated with an ACE_Task to spawn one or more threads, as shown below:

    template <class SYNCH_STRATEGY> int
    ACE_Task<SYNCH_STRATEGY>::activate (long flags,
                                        int n_threads,
                                        /* Other params omitted */)
    {
      // ...
      thr_mgr ()->spawn_n (n_threads,
                           &ACE_Task<SYNCH_STRATEGY>::svc_run,
                           ACE_static_cast (void *, this),
                           flags,
                           /* Other params omitted */);
      // ...
    }

The THR_SCOPE_SYSTEM, THR_SCOPE_PROCESS, THR_NEW_LWP, THR_DETACHED, and THR_JOINABLE flags in the table on page 190 of C++NPv1 can be passed in the flags parameter to activate(). Sidebar 42 describes how THR_DETACHED and THR_JOINABLE can be used to avoid memory leaks when threads exit.

ACE_Task::svc_run() is a static method used by activate() as an adapter function. It runs in the newly spawned thread(s) of control, which provide an execution context for the svc() hook method.
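The interplay of put(), putq(), getq(), activate(), and the static svc_run() adapter can be sketched with standard C++ threads. This is a minimal illustration rather than ACE's implementation: the Task and Length_Task classes, the shutdown() method, and run_active_task_demo() are hypothetical names, std::thread stands in for ACE_Thread_Manager, and strings stand in for ACE_Message_Block pointers. Unlike a deactivated ACE queue, this sketch drains the remaining messages before getq() reports -1.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for ACE_Task (not the ACE class itself).
class Task {
public:
  virtual ~Task() = default;

  // Passive entry point: runs in the caller's thread and only enqueues.
  virtual int put(std::string msg) { return putq(std::move(msg)); }

  // Spawn n_threads, each entering svc() through the C-style adapter.
  int activate(int n_threads) {
    for (int i = 0; i < n_threads; ++i)
      threads_.emplace_back(&Task::svc_run, static_cast<void *>(this));
    return 0;
  }

  // Assumption of this sketch: close the queue, then join all threads.
  void shutdown() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      closed_ = true;
    }
    not_empty_.notify_all();
    for (auto &t : threads_) t.join();
    threads_.clear();
  }

protected:
  virtual int svc() = 0;  // Hook that runs in each spawned thread.

  int putq(std::string msg) {
    std::lock_guard<std::mutex> guard(lock_);
    if (closed_) return -1;
    queue_.push_back(std::move(msg));
    not_empty_.notify_one();
    return 0;
  }

  // Blocks until a message arrives; returns -1 once closed and drained.
  int getq(std::string &msg) {
    std::unique_lock<std::mutex> guard(lock_);
    not_empty_.wait(guard, [this] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) return -1;
    msg = std::move(queue_.front());
    queue_.pop_front();
    return 0;
  }

private:
  // C-style entry point that bridges back into the C++ object.
  static void *svc_run(void *arg) {
    static_cast<Task *>(arg)->svc();
    return nullptr;
  }

  std::mutex lock_;
  std::condition_variable not_empty_;
  std::deque<std::string> queue_;
  bool closed_ = false;
  std::vector<std::thread> threads_;
};

// Example subclass: sums the lengths of all messages it dequeues.
class Length_Task : public Task {
public:
  std::size_t total_length() const { return total_.load(); }

protected:
  int svc() override {
    std::string msg;
    while (getq(msg) != -1) total_ += msg.size();
    return 0;
  }

private:
  std::atomic<std::size_t> total_{0};
};

std::size_t run_active_task_demo() {
  Length_Task task;
  task.activate(2);   // Two threads now block in getq().
  task.put("alpha");  // Each put() returns right after enqueueing.
  task.put("beta");
  task.put("gamma");
  task.shutdown();    // Drain the queue and join the threads.
  return task.total_length();
}
```

Calling run_active_task_demo() processes the three messages in the two spawned threads while the caller only pays the cost of enqueueing them.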
Figure 6.7 illustrates the steps associated with activating an ACE_Task using the Windows _beginthreadex() function to spawn the thread. Naturally, the ACE_Task class shields applications from OS-specific details. Figure 6.7. Activating an ACE Task
When an ACE_Task subclass executes as an active object, its svc() method runs an event loop that uses its getq() method to wait for messages to arrive on the task's message queue. This queue can buffer a sequence of data messages and control messages for subsequent processing by a task's svc() method. As messages arrive and are enqueued by a task's put() method, its svc() method runs in separate thread(s), dequeueing the messages and performing application-defined processing concurrently, as shown in Figure 6.8. Figure 6.8. Passing Messages Between ACE_Task Objects
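An event loop that handles both data messages and control messages might look like the following sketch. The Msg_Type tags, the Message and Message_Queue types, and the event_loop() and run_event_loop_demo() functions are hypothetical names chosen for illustration. ACE marks message kinds through ACE_Message_Block message types, which this standard C++ sketch does not reproduce.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

enum class Msg_Type { data, stop };  // Hypothetical message tags.

struct Message {
  Msg_Type type;
  std::string payload;
};

// Minimal synchronized queue with a blocking dequeue.
class Message_Queue {
public:
  void enqueue(Message m) {
    std::lock_guard<std::mutex> guard(lock_);
    items_.push(std::move(m));
    not_empty_.notify_one();
  }

  Message dequeue() {
    std::unique_lock<std::mutex> guard(lock_);
    not_empty_.wait(guard, [this] { return !items_.empty(); });
    Message m = std::move(items_.front());
    items_.pop();
    return m;
  }

private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::queue<Message> items_;
};

// Event loop: process data messages until a stop control message arrives.
int event_loop(Message_Queue &queue, std::string &log) {
  int handled = 0;
  for (;;) {
    Message m = queue.dequeue();
    if (m.type == Msg_Type::stop) break;  // Control message ends the loop.
    log += m.payload;                     // Application-defined processing.
    log += '\n';
    ++handled;
  }
  return handled;
}

int run_event_loop_demo() {
  Message_Queue queue;
  std::string log;
  int handled = 0;
  std::thread worker([&] { handled = event_loop(queue, log); });
  queue.enqueue({Msg_Type::data, "record 1"});
  queue.enqueue({Msg_Type::data, "record 2"});
  queue.enqueue({Msg_Type::stop, ""});
  worker.join();  // join() makes the worker's writes visible here.
  return handled;
}
```

Because the stop message travels through the same queue as the data, every record enqueued before it is processed before the loop exits.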
Sidebar 43 compares and contrasts the ACE_Task capabilities with Java's Runnable interface and Thread class. 3. Task destruction. The methods used in the destruction of all or parts of a task are shown in the following table:
The lifetime of an ACE_Task object is not tied to the lifetime of any threads activated in that object. Since deleting an ACE_Task object doesn't shut down any active threads, the threads must therefore exit before the task object can be deleted. ACE provides various ways to request a task's threads to exit, including the cooperative cancellation mechanism (pages 190-191 of C++NPv1). A task's message queue can also be used to pass a shutdown message to the task's threads, as described in Sidebar 41 (page 167).

Applications can customize an ACE_Task's destruction by overriding its close() hook method. This method can free application-defined resources allocated by a task, such as connection control blocks, I/O handles, and synchronization locks. Whereas the open() hook method should be called at most once per instance to initialize an object before it's activated, both the svc() and close() hook methods are called once in each thread. The svc() hook should therefore perform any needed per-thread initialization. The ACE Task framework will call the close() hook method in each thread after the svc() hook method returns, so avoid calling a task's close() hook directly, particularly if a task is an active object. This asymmetry between the open() and close() hook methods is necessary because there's no reliable opportunity to clean up a task's resources except in the task's threads.
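The per-thread hook sequence described above can be sketched with standard C++ threads. The Pool_Task class, the Hook_Counts struct, and run_close_hook_demo() are hypothetical. Passing the number of remaining threads into close() is also an assumption of this sketch (ACE's close() hook has a different signature), made so that exactly one thread observes zero and performs the shared cleanup.

```cpp
#include <atomic>
#include <thread>
#include <vector>

struct Hook_Counts {
  int svc_calls;
  int close_calls;
  int cleanups;
};

// Hypothetical task: every spawned thread runs svc(), then close().
class Pool_Task {
public:
  void activate(int n_threads) {
    thr_count_ = n_threads;  // Set before any thread can exit.
    for (int i = 0; i < n_threads; ++i)
      threads_.emplace_back([this] {
        svc();
        // Decrement first, then run the close hook in this same thread.
        int threads_left = --thr_count_;
        close(threads_left);
      });
  }

  // Block until every thread of the task has exited.
  void wait() {
    for (auto &t : threads_) t.join();
    threads_.clear();
  }

  Hook_Counts counts() const {
    return {svc_calls_.load(), close_calls_.load(), cleanups_.load()};
  }

private:
  void svc() { ++svc_calls_; }  // Per-thread work would go here.

  void close(int threads_left) {
    ++close_calls_;
    // Only the last thread to exit releases task-wide resources.
    if (threads_left == 0) ++cleanups_;
  }

  std::atomic<int> thr_count_{0};
  std::atomic<int> svc_calls_{0};
  std::atomic<int> close_calls_{0};
  std::atomic<int> cleanups_{0};
  std::vector<std::thread> threads_;
};

Hook_Counts run_close_hook_demo() {
  Pool_Task task;
  task.activate(3);
  task.wait();
  return task.counts();
}
```

With three threads, svc() and close() each run three times, while the shared cleanup runs once.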
If a task activates multiple threads, the close() method must not free resources (or delete the task object itself) if other threads are still executing. The thr_count() method returns the number of threads still active in the task. ACE_Task decrements the thread count before calling close(), so if thr_count() returns a value greater than 0, the object is still active. The wait() method can be used to block until all threads running in this task exit, at which point thr_count() equals 0. Sidebar 44 (page 190) describes the steps to follow when destroying an ACE_Task.

Example

This example shows how to combine ACE_Task and ACE_Message_Queue with the ACE_Reactor from Chapter 3 and ACE_Service_Config from Chapter 5 to implement a concurrent logging server. This server design is based on the Half-Sync/Half-Async pattern [POSA2] and the eager spawning thread pool strategy described in Chapter 5 of C++NPv1. Figure 6.9 (page 191) shows how a pool of worker threads is prespawned when the logging server is launched. Log records can be processed concurrently until the number of simultaneous client requests exceeds the number of worker threads. At this point, the main thread buffers additional requests in a synchronized ACE_Message_Queue until a worker thread becomes available or until the queue becomes full. Figure 6.9. Architecture of the Thread Pool Logging Server
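The buffering behavior of this design can be approximated with a bounded queue and a prespawned pool. In the sketch below, the Bounded_Queue class and run_pool_demo() are hypothetical, integers stand in for log records, and the capacity is counted in messages as a simplifying assumption. The producer here retries when the queue is full, which is only one of several possible policies.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical bounded, synchronized request queue.
class Bounded_Queue {
public:
  explicit Bounded_Queue(std::size_t capacity) : capacity_(capacity) {}

  // Non-blocking enqueue: returns false when the queue is full or closed.
  bool try_put(int request) {
    std::lock_guard<std::mutex> guard(lock_);
    if (closed_ || items_.size() >= capacity_) return false;
    items_.push_back(request);
    not_empty_.notify_one();
    return true;
  }

  // Blocking dequeue: returns false once the queue is closed and drained.
  bool get(int &request) {
    std::unique_lock<std::mutex> guard(lock_);
    not_empty_.wait(guard, [this] { return closed_ || !items_.empty(); });
    if (items_.empty()) return false;
    request = items_.front();
    items_.pop_front();
    return true;
  }

  void close() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      closed_ = true;
    }
    not_empty_.notify_all();
  }

private:
  const std::size_t capacity_;
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::deque<int> items_;
  bool closed_ = false;
};

// Prespawn the workers, then let the calling thread act as the producer.
long run_pool_demo(int n_workers, int n_requests) {
  Bounded_Queue queue(8);
  std::atomic<long> sum{0};
  std::vector<std::thread> pool;
  for (int i = 0; i < n_workers; ++i)
    pool.emplace_back([&] {
      int request;
      while (queue.get(request)) sum += request;  // "Process" a request.
    });
  for (int r = 1; r <= n_requests; ++r)
    while (!queue.try_put(r))       // Queue full: yield and retry.
      std::this_thread::yield();
  queue.close();
  for (auto &t : pool) t.join();
  return sum.load();
}
```

A full queue is visible to the producer as a failed try_put(), which is the point at which a real server would have to block, shed load, or apply back-pressure to its clients.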
The ACE_Message_Queue plays several roles in our thread pool logging server's half-sync/half-async concurrency design:
Prespawning and queueing help to amortize the cost of thread creation, as well as constrain the use of OS resources, which can significantly improve server scalability. The following table outlines the classes that we'll cover in the thread pool logging server example below:
The relationship between these classes is shown in Figure 6.10 (page 192). The TP_Logging_Acceptor and TP_Logging_Handler classes play the reactive role in the Half-Sync/Half-Async pattern and the TP_Logging_Task::svc() method, which runs concurrently in the worker threads, plays the synchronous role in the pattern. Figure 6.10. The Thread Pool Logging Server Classes
Each class in Figure 6.10 is described below. We start by including the necessary ACE header files into the TP_Logging_Server.h file:

    #include "ace/OS.h"
    #include "ace/Auto_Ptr.h"
    #include "ace/Singleton.h"
    #include "ace/Synch.h"
    #include "ace/Task.h"
    #include "Logging_Acceptor.h"
    #include "Logging_Event_Handler.h"
    #include "Reactor_Logging_Server.h"
    #include "TPLS_export.h"

TP_Logging_Task. This class provides the following capabilities:
The TP_Logging_Task is shown below:

    class TP_Logging_Task : public ACE_Task<ACE_MT_SYNCH> {
      // Instantiated with an MT synchronization trait.
    public:
      enum { MAX_THREADS = 4 };
      // ...Methods defined below...
    };

The TP_Logging_Task::open() hook method calls ACE_Task::activate() to convert this task into an active object, as follows:

      virtual int open (void * = 0)
      { return activate (THR_NEW_LWP, MAX_THREADS); }

If activate() returns successfully, the TP_Logging_Task::svc() method will be running in MAX_THREADS separate threads. We show the TP_Logging_Task::svc() method (page 199) after we describe the TP_Logging_Acceptor and TP_Logging_Handler classes.

The TP_Logging_Task::put() method inserts a message block containing a log record into the queue.

      virtual int put (ACE_Message_Block *mblk,
                       ACE_Time_Value *timeout = 0)
      { return putq (mblk, timeout); }

We need only one instance of TP_Logging_Task, so we convert it into a singleton using the singleton adapter template described in Sidebar 45 (page 194). Since the TP_Logging_Task will be located in a DLL, however, we must use the ACE_Unmanaged_Singleton rather than ACE_Singleton. This design requires that we close the singleton explicitly when the logging task shuts down in TP_Logging_Server::fini() (page 201).

    typedef ACE_Unmanaged_Singleton<TP_Logging_Task, ACE_Null_Mutex>
            TP_LOGGING_TASK;

Since TP_LOGGING_TASK::instance() is only accessed from the main thread, we use ACE_Null_Mutex as the synchronization type parameter for ACE_Unmanaged_Singleton. If this singleton were accessed concurrently by other threads we'd need to parameterize it with ACE_Recursive_Thread_Mutex (Chapter 10 of C++NPv1) to serialize access.

TP_Logging_Acceptor. This class is a factory that provides the following capabilities:
The TP_Logging_Acceptor class is shown below:
    class TP_Logging_Acceptor : public Logging_Acceptor {
    public:
      TP_Logging_Acceptor (ACE_Reactor *r = ACE_Reactor::instance ())
        : Logging_Acceptor (r) {}

      virtual int handle_input (ACE_HANDLE) {
        TP_Logging_Handler *peer_handler = 0;
        ACE_NEW_RETURN (peer_handler,
                        TP_Logging_Handler (reactor ()), -1);
        if (acceptor_.accept (peer_handler->peer ()) == -1) {
          delete peer_handler;
          return -1;
        } else if (peer_handler->open () == -1)
          peer_handler->handle_close (ACE_INVALID_HANDLE, 0);
        return 0;
      }
    };

Since TP_Logging_Acceptor inherits from the Logging_Acceptor (page 54) it can override handle_input() to create instances of TP_Logging_Handler.

TP_Logging_Handler. This class provides the following capabilities:
The TP_Logging_Handler class is shown below:

    class TP_Logging_Handler : public Logging_Event_Handler {
      friend class TP_Logging_Acceptor;

Since this class derives from Logging_Event_Handler (page 59 in Section 3.3), it can receive log records when dispatched by a reactor. The destructor is defined as protected to ensure dynamic allocation. However, since the TP_Logging_Acceptor::handle_input() method deletes objects of this type, TP_Logging_Acceptor must be a friend of this class. Declaring a friend class is appropriate in this case because these classes exhibit a high degree of cohesion, and it's important to maintain the restriction on dynamic allocation of TP_Logging_Handler. The three data members are used to implement the protocol for closing TP_Logging_Handler objects concurrently, as described in Sidebar 46 (page 196).

    protected:
      virtual ~TP_Logging_Handler () {} // No-op destructor.

      // Number of pointers to this class instance that currently
      // reside in the <TP_LOGGING_TASK> singleton's message queue.
      int queued_count_;

      // Indicates whether <Logging_Event_Handler::handle_close()>
      // must be called to cleanup and delete this object.
      int deferred_close_;

      // Serialize access to <queued_count_> and <deferred_close_>.
      ACE_Thread_Mutex lock_;
The public part of the class defines the constructor and a pair of methods dispatched by the reactor when certain events occur.

    public:
      TP_Logging_Handler (ACE_Reactor *reactor)
        : Logging_Event_Handler (reactor),
          queued_count_ (0),
          deferred_close_ (0) {}

      // Called when input events occur, e.g., connection or data.
      virtual int handle_input (ACE_HANDLE);

      // Called when this object is destroyed, e.g., when it's
      // removed from a reactor.
      virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
    };

    ACE_FACTORY_DECLARE (TPLS, TP_Logging_Handler)

TP_Logging_Handler::handle_input() plays the reactive role in the Half-Sync/Half-Async pattern. It differs from the Logging_Event_Handler::handle_input() method (page 59) since it doesn't process a log record immediately after receiving it. Instead, it inserts each log record into the TP_LOGGING_TASK singleton's message queue, where it will be processed concurrently. However, while processing the log record, the TP_LOGGING_TASK needs to access this handler's log file (inherited from Logging_Event_Handler). To facilitate this, handle_input() combines the log record with a message block containing the handler's pointer and inserts the resulting composite message at the end of the TP_LOGGING_TASK singleton's message queue, as shown below:

     1 int TP_Logging_Handler::handle_input (ACE_HANDLE) {
     2   ACE_Message_Block *mblk = 0;
     3   if (logging_handler_.recv_log_record (mblk) != -1) {
     4     ACE_Message_Block *log_blk = 0;
     5     ACE_NEW_RETURN
     6       (log_blk, ACE_Message_Block
     7                   (ACE_reinterpret_cast (char *, this)), -1);
     8     log_blk->cont (mblk);
     9     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
    10     if (TP_LOGGING_TASK::instance ()->put (log_blk) == -1)
    11       { log_blk->release (); return -1; }
    12     ++queued_count_;
    13     return 0;
    14   } else return -1;
    15 }

Lines 2-3 Read a log record from a connected socket into a dynamically allocated ACE_Message_Block.

Lines 4-8 Create an ACE_Message_Block called log_blk that contains a pointer to this handler.
The ACE_Message_Block constructor simply "borrows" the this pointer and sets the ACE_Message_Block::DONT_DELETE flag internally to ensure that the handler itself isn't destroyed when the message block is released in TP_Logging_Task::svc() (page 199). The mblk is attached to log_blk's continuation chain to form a composite message.

Lines 9-10 Use an ACE_Guard to acquire the lock_ that serializes access to queued_count_. To avoid a race condition in which a service thread processes the record before queued_count_ can be incremented, the lock is acquired before calling put() to insert the composite message block into the TP_LOGGING_TASK singleton's message queue.

Lines 11-13 Release the log_blk resources and return -1 if put() fails. If it succeeds, increment the count of the number of times the handler's in the TP_LOGGING_TASK's queue. In either case, the return statement causes the guard to release lock_.

Line 14 If the client closes the connection, or a serious error occurs, handle_input() returns -1. This value causes the reactor to call TP_Logging_Handler::handle_close(), which implements a key portion of the protocol for closing TP_Logging_Handlers concurrently, as described in Sidebar 46 (page 196). The handle_close() method is shown below:

     1 int TP_Logging_Handler::handle_close (ACE_HANDLE handle,
     2                                       ACE_Reactor_Mask) {
     3   int close_now = 0;
     4   if (handle != ACE_INVALID_HANDLE) {
     5     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
     6     if (queued_count_ == 0) close_now = 1;
     7     else deferred_close_ = 1;
     8   } else {
     9     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
    10     queued_count_--;
    11     if (queued_count_ == 0) close_now = deferred_close_;
    12   }
    13
    14   if (close_now) return Logging_Event_Handler::handle_close ();
    15   return 0;
    16 }

Line 3 The close_now variable records whether the Logging_Event_Handler::handle_close() method should be called on line 14. This decision depends on the reference count decisions made in the rest of this method.
Lines 4-7 This code runs when handle_close() is called after handle_input() returns -1. Lines 6-7 are performed within a critical section protected by an ACE_Guard that acquires and releases the lock_ automatically within the scope of the if clause. If queued_count_ equals 0, there are no references to this object remaining in the TP_LOGGING_TASK, so we set the close_now local variable to 1. Otherwise, we set the deferred_close_ data member to note that as soon as the reference count reaches 0 this handler should be destroyed since the client has already closed its socket endpoint. As log records are processed, the TP_Logging_Task::svc() method (page 199) calls handle_close() again, and will execute the lines described below.

Lines 8-12 The handle equals ACE_INVALID_HANDLE when handle_close() is called in TP_Logging_Task::svc() (page 199) after the handler has been removed from the task's message queue. We therefore decrement the queued_count_. If the count is now 0, we record whether we need to finish processing a close request that was deferred earlier. As in lines 5-7, we use an ACE_Guard to acquire and release the lock_ automatically within the scope of the else clause.

Line 14 Call Logging_Event_Handler::handle_close() (page 60) if the local variable close_now is true to close the socket and log file, and then delete itself.

Now that we've shown the TP_Logging_Handler class, we can show the TP_Logging_Task::svc() method, which runs concurrently in each of the worker threads and implements the synchronous role in the Half-Sync/Half-Async pattern. This method runs its own event loop that blocks on the synchronized message queue. After a message is enqueued by TP_Logging_Handler::handle_input() (page 197), it will be dequeued by an available worker thread and written to the appropriate log file corresponding to the client, as shown below.
     1 int TP_Logging_Task::svc () {
     2   for (ACE_Message_Block *log_blk; getq (log_blk) != -1;) {
     3     TP_Logging_Handler *tp_handler = ACE_reinterpret_cast
     4       (TP_Logging_Handler *, log_blk->rd_ptr ());
     5     Logging_Handler logging_handler (tp_handler->log_file ());
     6     logging_handler.write_log_record (log_blk->cont ());
     7     log_blk->release ();
     8     tp_handler->handle_close (ACE_INVALID_HANDLE, 0);
     9   }
    10   return 0;
    11 }

Lines 2-4 Call the getq() method, which blocks until a message block is available. As shown in Figure 6.11, each message block is a composite message containing three message blocks chained together via their continuation pointers in the following order: Figure 6.11. Message Block Chain of Log Record Information
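The shape of such a composite message can be sketched without ACE as a singly linked chain whose head borrows the handler pointer. The Block and Handler structs and the make_block(), make_composite(), and chain_length() helpers are hypothetical. The contents assumed for the second and third blocks (a host name followed by the record data) are an assumption of this sketch, not something the text above specifies.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

struct Handler {
  std::string log_file;  // Hypothetical stand-in for the handler state.
};

// Hypothetical simplified message block.
struct Block {
  Handler *borrowed = nullptr;  // Not owned: releasing a block leaves it alone.
  std::string data;             // Owned payload.
  std::unique_ptr<Block> cont;  // Continuation pointer to the next block.
};

std::unique_ptr<Block> make_block(std::string data) {
  std::unique_ptr<Block> block(new Block);
  block->data = std::move(data);
  return block;
}

// Build a head block that borrows the handler, then chain the payload.
std::unique_ptr<Block> make_composite(Handler *handler,
                                      std::string host,
                                      std::string record) {
  std::unique_ptr<Block> head(new Block);
  head->borrowed = handler;
  head->cont = make_block(std::move(host));
  head->cont->cont = make_block(std::move(record));
  return head;
}

// Walk the continuation pointers and count the blocks in the chain.
std::size_t chain_length(const Block *block) {
  std::size_t n = 0;
  for (; block != nullptr; block = block->cont.get()) ++n;
  return n;
}
```

Releasing the head frees every block in the chain but never the handler, which mirrors the role the DONT_DELETE flag plays for the borrowed pointer in the original code.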
Lines 5-6 Initialize logging_handler with the log_file and then call Logging_Handler::write_log_record(), which writes the log record to the log file. The write_log_record() method is responsible for releasing its message block chain, as shown in Chapter 4 of C++NPv1.

Lines 7-8 Call log_blk->release() to reclaim allocated resources. Since the TP_Logging_Handler pointer is borrowed rather than allocated dynamically, however, we must explicitly call TP_Logging_Handler::handle_close() (page 198) on tp_handler. This method decreases the TP_Logging_Handler reference count and cleans up the object properly, using the protocol described in Sidebar 46 (page 196).

TP_Logging_Server. This facade class inherits from ACE_Service_Object, contains a Reactor_Logging_Server, and uses the TP_LOGGING_TASK singleton, as shown below.

    class TP_Logging_Server : public ACE_Service_Object {
    protected:
      // Contains the reactor, acceptor, and handlers.
      typedef Reactor_Logging_Server<TP_Logging_Acceptor>
              LOGGING_DISPATCHER;
      LOGGING_DISPATCHER *logging_dispatcher_;

    public:
      TP_Logging_Server (): logging_dispatcher_ (0) {}

      // Other methods defined below...
    };

The TP_Logging_Server::init() hook method enhances the reactive logging server implementation from Chapter 3 as follows:

      virtual int init (int argc, ACE_TCHAR *argv[]) {
        int i;
        char **array = 0;
        ACE_NEW_RETURN (array, char*[argc], -1);
        ACE_Auto_Array_Ptr<char *> char_argv (array);
        for (i = 0; i < argc; ++i)
          char_argv[i] = ACE::strnew (ACE_TEXT_ALWAYS_CHAR (argv[i]));
        ACE_NEW_NORETURN
          (logging_dispatcher_,
           TP_Logging_Server::LOGGING_DISPATCHER
             (i, char_argv.get (), ACE_Reactor::instance ()));
        for (i = 0; i < argc; ++i) ACE::strdelete (char_argv[i]);
        if (logging_dispatcher_ == 0) return -1;
        else return TP_LOGGING_TASK::instance ()->open ();
      }

This init() method is similar to the one on page 122. It allocates an instance of TP_Logging_Server::LOGGING_DISPATCHER and stores its pointer in the logging_dispatcher_ member.
It also calls TP_Logging_Task::open() to prespawn a pool of worker threads that process log records concurrently.

The TP_Logging_Server::fini() method is shown next:

    1 virtual int fini () {
    2   TP_LOGGING_TASK::instance ()->flush ();
    3   TP_LOGGING_TASK::instance ()->wait ();
    4   TP_LOGGING_TASK::close ();
    5   delete logging_dispatcher_;
    6   return 0;
    7 }

Line 2 Call the flush() method of the TP_LOGGING_TASK singleton, thereby closing the message queue associated with the task, which deletes all the queued messages and signals the threads in the pool to exit.

Lines 3-4 Use the ACE_Thread_Manager's barrier synchronization feature to wait for the pool of threads spawned by TP_Logging_Task::open() to exit and then explicitly close the singleton because this DLL is about to be unlinked.

Lines 5-6 Delete the logging_dispatcher_ allocated in init() and return.

For brevity, we omit the suspend(), resume(), and info() hook methods, which are similar to those shown in earlier examples. Finally, we place ACE_FACTORY_DEFINE into TP_Logging_Server.cpp.

    ACE_FACTORY_DEFINE (TPLS, TP_Logging_Server)

This macro automatically defines the _make_TP_Logging_Server() factory function that's used in the following svc.conf file:

    dynamic TP_Logging_Server Service_Object *
    TPLS:_make_TP_Logging_Server() "$TP_LOGGING_SERVER_PORT"

This file directs the ACE Service Configurator framework to configure the thread pool logging server via the following steps:
Once again, the ACE Service Configurator framework enables us to reuse the main() program from Configurable_Logging_Server.cpp (page 147).
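Looking back at the handler shutdown protocol that handle_input(), handle_close(), and svc() implement together, its counting logic can be condensed into a small standard C++ sketch. The Counted_Handler class and its method names are hypothetical, and a boolean flag stands in for closing the socket and deleting the handler. The sketch also splits the two roles of handle_close() into separate methods instead of dispatching on the handle value.

```cpp
#include <mutex>

// Hypothetical condensed version of the deferred-close bookkeeping.
class Counted_Handler {
public:
  explicit Counted_Handler(bool &closed) : closed_(closed) {}

  // Reactor thread: a record that refers to this handler was enqueued.
  void on_enqueued() {
    std::lock_guard<std::mutex> guard(lock_);
    ++queued_count_;
  }

  // Reactor thread: the peer closed the connection.
  void on_peer_closed() {
    bool close_now = false;
    {
      std::lock_guard<std::mutex> guard(lock_);
      if (queued_count_ == 0) close_now = true;  // Nothing pending.
      else deferred_close_ = true;               // Workers finish first.
    }
    if (close_now) do_close();
  }

  // Worker thread: one queued record has been fully processed.
  void on_processed() {
    bool close_now = false;
    {
      std::lock_guard<std::mutex> guard(lock_);
      if (--queued_count_ == 0) close_now = deferred_close_;
    }
    if (close_now) do_close();
  }

private:
  // The original closes the socket and log file and deletes the handler;
  // this sketch only records that the close happened.
  void do_close() { closed_ = true; }

  std::mutex lock_;
  int queued_count_ = 0;
  bool deferred_close_ = false;
  bool &closed_;
};
```

Deciding inside the critical section but closing outside it keeps the lock from being held while the handler tears itself down.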