6.3 The ACE_Task Class


Motivation

The ACE_Message_Queue class described in Section 6.2 can be used to

  • Decouple the flow of information from its processing

  • Link threads that execute producer/consumer services concurrently

To use a producer/consumer concurrency model effectively in an object-oriented program, however, each thread should be associated with the message queue and any other service-related information. To preserve modularity and cohesion, and to reduce coupling, it's therefore best to encapsulate an ACE_Message_Queue with its associated data and methods into one class whose service threads can access it directly.

Thread-spawning capabilities provided by popular OS platforms are based on each spawned thread invoking a C-style function call. The ACE_Thread_Manager wrapper facade class described in Chapter 9 of C++NPv1 implements portable multithreading capabilities. However, programmers must still pass a C-style function to its spawn() and spawn_n() methods. Providing a spawned thread with access to a C++ object requires a bridge to the C++ object environment. The CLD_Handler::open() method (page 172) illustrated this technique. Since implementing this technique manually for each class is repetitive, it's a good candidate for reuse. The ACE Task framework therefore defines ACE_Task to encapsulate a class's messaging capability and provide a portable way for thread(s) to execute in the context of an object.

Class Capabilities

ACE_Task is the basis of ACE's object-oriented concurrency framework. It provides the following capabilities:

  • It uses an instance of ACE_Message_Queue from Section 6.2 to separate data and requests from their processing.

  • It uses the ACE_Thread_Manager class to activate the task so it runs as an active object [POSA2] that processes its queued messages in one or more threads of control. Since each thread runs a designated class method, it can access all of the task's data members directly.

  • It inherits from ACE_Service_Object, so its instances can be configured dynamically via the ACE Service Configurator framework from Chapter 5.

  • It's a descendant of ACE_Event_Handler, so its instances can also serve as event handlers in the ACE Reactor framework from Chapter 3.

  • It provides virtual hook methods that application classes can reimplement for task-specific service execution and message handling.

Our focus in this section is on the ACE_Task capabilities for queueing and processing messages. It obtains its event-handling, configuration, and dynamic linking/unlinking capabilities as a consequence of inheriting from ACE classes described in previous chapters.

The interface for ACE_Task is shown in Figure 6.6. Since this class has a rich interface, we group the description of its methods into the three categories described below. Sidebar 60 (page 308) describes some additional ACE_Task methods that are related to the ACE Streams framework.

Figure 6.6. The ACE_Task Class

1. Task initialization methods. The methods used to initialize a task are described below:

  • ACE_Task(): Constructor that can assign pointers to the ACE_Message_Queue and ACE_Thread_Manager used by the task

  • open(): Hook method that performs application-defined initialization activities

  • thr_mgr(): Get and set a pointer to the task's ACE_Thread_Manager

  • msg_queue(): Get and set a pointer to the task's ACE_Message_Queue

  • activate(): Convert a task into an active object that runs in one or more threads

Applications can customize the startup behavior of an ACE_Task by overriding its open() hook method. This method allocates resources used by a task, such as connection handlers, I/O handles, and synchronization locks. Since open() is generally called after the init() method that's inherited via ACE_Service_Object, any options passed via the ACE Service Configurator framework have already been processed. Therefore, open() can act on those configured preferences. The open() method is also often used to convert a task into an active object by calling ACE_Task::activate() (page 187).

The thr_mgr() and msg_queue() methods make it possible to access and change the thread management and message queueing mechanisms used by a task. Alternative thread management and message queueing mechanisms can also be passed to the ACE_Task constructor when an instance of the class is created.

The activate() method uses the ACE_Thread_Manager pointer returned by the thr_mgr() accessor to spawn one or more threads that run within the task. This method converts the task into an active object whose thread(s) direct its own execution and response to events, rather than being driven entirely by passive method calls that borrow the thread of the caller. Sidebar 42 (page 186) describes how to avoid memory leaks when an activated task's thread(s) exit.
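To make these initialization methods concrete, here is a minimal sketch, assuming a hypothetical My_Task subclass and caller-supplied mechanisms, of passing an alternative thread manager and message queue to the ACE_Task constructor and then activating and reaping the task:

 #include "ace/Task.h"
 #include "ace/Message_Queue.h"
 #include "ace/Thread_Manager.h"

 // Hypothetical subclass used only for illustration.
 class My_Task : public ACE_Task<ACE_MT_SYNCH> {
 public:
   My_Task (ACE_Thread_Manager *tm = 0,
            ACE_Message_Queue<ACE_MT_SYNCH> *mq = 0)
     : ACE_Task<ACE_MT_SYNCH> (tm, mq) {}
   virtual int svc () { return 0; } // Real processing would go here.
 };

 int run_task (ACE_Thread_Manager *my_thr_mgr,
               ACE_Message_Queue<ACE_MT_SYNCH> *my_queue) {
   My_Task task (my_thr_mgr, my_queue);  // Alternative mechanisms.
   // Spawn three joinable threads, each of which runs My_Task::svc().
   if (task.activate (THR_NEW_LWP | THR_JOINABLE, 3) == -1) return -1;
   return task.wait ();                  // Reap the threads before returning.
 }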

2. Task communication, processing, and synchronization methods. The methods used to communicate between tasks and to process messages passively and actively within a task are described below:

  • svc(): A hook method that can implement a task's service processing. It is executed by all threads spawned via the activate() method.

  • put(): A hook method that can be used to pass a message to a task, where it can be processed immediately or queued for subsequent processing by the svc() hook method.

  • putq(), getq(), ungetq(): Insert, remove, and replace messages from the task's message queue. These methods simplify access to the enqueue_tail(), dequeue_head(), and enqueue_head() methods of the task's message queue, respectively.

A subclass of ACE_Task can perform application-defined processing on messages passed to it by overriding its put() and svc() hook methods to implement the following two processing models:

1. Passive processing. The put() method is used to pass messages to an ACE_Task. Pointers to ACE_Message_Blocks are passed between tasks to avoid data copying overhead. Task processing can be performed entirely in the context of put(), where the caller's thread is borrowed for the duration of its processing. A task's svc() hook method need not be used if the task only processes requests passively in put().

Sidebar 42: Avoiding Memory Leaks When Threads Exit

Calls to ACE_Task::activate() or to ACE_Thread_Manager's spawn() and spawn_n() methods can include either of the following flags:

  • THR_DETACHED, which designates the spawned thread(s) as detached, so that when a thread exits, the ACE_Thread_Manager ensures the storage used for the thread's state and exit status is reclaimed.

  • THR_JOINABLE, which designates the spawned thread(s) as joinable, so that the ACE_Thread_Manager ensures the identity and exit status of an exiting thread are retained until another thread reaps its exit status.

The terms detached and joinable stem from POSIX Pthreads [IEE96].

By default, ACE_Thread_Manager (and hence the ACE_Task class that uses it) spawns threads with the THR_JOINABLE flag. To avoid leaking resources that the OS holds for joinable threads, an application must call one of the following methods:

  1. ACE_Task::wait(), which waits for all threads to exit an ACE_Task object

  2. ACE_Thread_Manager::wait_task(), which waits for all threads in a specified ACE_Task object to exit

  3. ACE_Thread_Manager::join(), which waits for a designated thread to exit

If none of these methods are called, ACE and the OS won't reclaim the thread stack and exit status of a joinable thread, and the program will leak memory.

If it's inconvenient to wait for threads explicitly in your program, you can simply pass THR_DETACHED when spawning threads or activating tasks. Many networked application tasks and long-running daemon threads can be simplified by using detached threads. However, an application can't wait for a detached thread to finish with ACE_Task::wait() or obtain its exit status via join(). Applications can, however, use ACE_Thread_Manager::wait() to wait for both joinable and detached threads managed by an ACE_Thread_Manager to finish.

2. Active processing. A task's application-defined processing can also be performed actively. In this case, one or more threads execute the task's svc() hook method to process messages concurrently with respect to other activities in an application. If a task's put() method doesn't perform all the processing on a message, it can use putq() to enqueue the message and return to its caller immediately.

A task's svc() hook method can use ACE_Task::getq() to dequeue messages placed onto the message queue and process them concurrently. The getq() method blocks until either a message is available on the message queue or the specified absolute timeout elapses. The blocking nature of getq() allows a task's thread(s) to sleep until there's work available on the message queue, as the sketch below illustrates.
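The following sketch, which uses a hypothetical Consumer_Task class, shows how the put() and svc() hooks typically cooperate in this model: put() defers work by enqueueing it via putq(), and svc() loops on getq() to process it in the task's own threads.

 #include "ace/Task.h"

 // Hypothetical task used only to illustrate the put()/svc() collaboration.
 class Consumer_Task : public ACE_Task<ACE_MT_SYNCH> {
 public:
   virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *timeout = 0)
   { return putq (mblk, timeout); } // Defer all processing to svc() threads.

   virtual int svc () {
     for (ACE_Message_Block *mblk; getq (mblk) != -1; ) {
       // Application-defined processing of <mblk> would go here.
       mblk->release ();            // Free the message block when done.
     }
     return 0;                      // getq() failed, e.g., queue deactivated.
   }
 };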

Unlike put(), the svc() method is never invoked directly by a client of a task. It's instead invoked by one or more threads when a task becomes an active object after its activate() method is called. The activate() method uses the ACE_Thread_Manager associated with an ACE_Task to spawn one or more threads, as shown below:

 template <class SYNCH_STRATEGY> int
 ACE_Task<SYNCH_STRATEGY>::activate (long flags,
                                     int n_threads,
                                     /* Other params omitted */) {
   // ...
   thr_mgr ()->spawn_n (n_threads,
                        &ACE_Task<SYNCH_STRATEGY>::svc_run,
                        ACE_static_cast (void *, this),
                        flags, /* Other params omitted */);
   // ...
 }

The THR_SCOPE_SYSTEM, THR_SCOPE_PROCESS, THR_NEW_LWP, THR_DETACHED, and THR_JOINABLE flags in the table on page 190 of C++NPv1 can be passed in the flags parameter to activate(). Sidebar 42 describes how THR_DETACHED and THR_JOINABLE can be used to avoid memory leaks when threads exit.
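For instance, a brief sketch that reuses the hypothetical Consumer_Task shown earlier to contrast the joinable and detached policies from Sidebar 42 might look like this:

 #include "ace/Thread_Manager.h"

 int run_flag_examples () {
   // Joinable (the default): the threads' exit status is retained until
   // it's reaped, here via the wait() barrier.
   Consumer_Task joinable_task;
   if (joinable_task.activate (THR_NEW_LWP | THR_JOINABLE, 4) == -1) return -1;
   joinable_task.msg_queue ()->deactivate (); // Unblock getq() so svc() exits.
   joinable_task.wait ();                     // Reap the four threads.

   // Detached: the threads needn't (and can't) be reaped via ACE_Task::wait(),
   // but the task object must still outlive them, so wait on the manager.
   Consumer_Task detached_task;
   if (detached_task.activate (THR_NEW_LWP | THR_DETACHED, 4) == -1) return -1;
   detached_task.msg_queue ()->deactivate ();
   ACE_Thread_Manager::instance ()->wait (); // Waits for detached threads too.
   return 0;
 }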

ACE_Task::svc_run() is a static method used by activate() as an adapter function. It runs in the newly spawned thread(s) of control, which provide an execution context for the svc() hook method. Figure 6.7 illustrates the steps associated with activating an ACE_Task using the Windows _beginthreadex() function to spawn the thread. Naturally, the ACE_Task class shields applications from OS-specific details.

Figure 6.7. Activating an ACE Task

When an ACE_Task subclass executes as an active object, its svc() method runs an event loop that uses its getq() method to wait for messages to arrive on the task's message queue. This queue can buffer a sequence of data messages and control messages for subsequent processing by a task's svc() method. As messages arrive and are enqueued by a task's put() method, its svc() method runs in separate thread(s), dequeueing the messages and performing application-defined processing concurrently, as shown in Figure 6.8.

Figure 6.8. Passing Messages Between ACE_Task Objects

Sidebar 43 compares and contrasts the ACE_Task capabilities with Java's Runnable interface and Thread class.

3. Task destruction. The methods used in the destruction of all or parts of a task are described below:

  • ~ACE_Task(): Destructor that deletes resources allocated by the ACE_Task constructor, including the message queue if it wasn't passed as a parameter to the constructor.

  • close(): Hook method that performs application-defined shutdown activities. This method should generally not be called directly by applications, particularly if the task is an active object.

  • flush(): Closes the message queue associated with the task, which frees all of its enqueued message blocks and releases any threads blocked on the queue.

  • thr_count(): Returns the number of threads currently active in the ACE_Task.

  • wait(): A barrier synchronizer that waits for all joinable threads running in this task to exit before returning.

The lifetime of an ACE_Task object is not tied to the lifetime of any threads activated in that object. Deleting an ACE_Task object doesn't shut down its active threads, so the threads must exit before the task object can be deleted. ACE provides various ways to request a task's threads to exit, including the cooperative cancellation mechanism (pages 190-191 of C++NPv1). A task's message queue can also be used to pass a shutdown message to the task's threads, as described in Sidebar 41 (page 167).

Applications can customize an ACE_Task's destruction by overriding its close() hook method. This method can free application-defined resources allocated by a task, such as connection control blocks, I/O handles, and synchronization locks. Whereas the open() hook method should be called at most once per instance to initialize an object before it's activated, both the svc() and close() hook methods are called once in each thread. The svc() hook should therefore perform any needed per-thread initialization. The ACE Task framework will call the close() hook method in each thread after the svc() hook method returns, so avoid calling a task's close() hook directly, particularly if a task is an active object. This asymmetry between the open() and close() hook methods is necessary because there's no reliable opportunity to clean up a task's resources except in the task's threads.

Sidebar 43: ACE_Task vs. Java Runnable and Thread

If you've used Java's Runnable interface and Thread class [Lea00], the ACE_Task design should look familiar, as discussed below:

  • ACE_Task::activate() is similar to the Java Thread.start() method since they both spawn internal threads. The Java Thread.start() method spawns only one thread, whereas activate() can spawn multiple threads within the same ACE_Task, making it easy to implement thread pools, as shown in the Example part of this section.

  • ACE_Task::svc() is similar to the Java Runnable.run() method since both methods are hooks that run in newly spawned thread(s) of control. The Java run() hook method executes in only a single thread per object, whereas the ACE_Task::svc() method can execute in multiple threads per task object.

  • ACE_Task contains a message queue that allows applications to exchange and buffer messages. In contrast, this type of queueing capability must be added by Java developers explicitly.

If a task activates multiple threads, the close() method must not free resources (or delete the task object itself) if other threads are still executing. The thr_count() method returns the number of threads still active in the task. ACE_Task decrements the thread count before calling close(), so if thr_count() returns a value greater than 0, the object is still active. The wait() method can be used to block until all threads running in this task exit, at which point thr_count() equals 0. Sidebar 44 (page 190) describes the steps to follow when destroying an ACE_Task.

Example

This example shows how to combine ACE_Task and ACE_Message_Queue with the ACE_Reactor from Chapter 3 and ACE_Service_Config from Chapter 5 to implement a concurrent logging server. This server design is based on the Half-Sync/Half-Async pattern [POSA2] and the eager spawning thread pool strategy described in Chapter 5 of C++NPv1. Figure 6.9 (page 191) shows how a pool of worker threads is prespawned when the logging server is launched. Log records can be processed concurrently until the number of simultaneous client requests exceeds the number of worker threads. At this point, the main thread buffers additional requests in a synchronized ACE_Message_Queue until a worker thread becomes available or until the queue becomes full.

Figure 6.9. Architecture of the Thread Pool Logging Server

Sidebar 44: Destroying an ACE_Task

Special care must be taken when destroying an ACE_Task that runs as an active object. Before destroying an active object, ensure that the thread(s) running its svc() hook method have exited. Sidebar 41 (page 167) describes several techniques to shut down svc() hook methods that are blocked on a task's message queue.

If a task's life cycle is managed externally, whether the task is allocated dynamically or instantiated on the stack, one way to ensure a proper destruction sequence looks like this:

 My_Task *task = new My_Task;  // Allocate a new task dynamically.
 task->open ();                // Initialize the task.
 task->activate ();            // Run task as an active object.
 // ... do work ...
 // Deactivate the message queue so the svc() method unblocks
 // and the thread exits.
 task->msg_queue ()->deactivate ();
 task->wait ();                // Wait for the thread to exit.
 delete task;                  // Reclaim the task memory.

This technique relies on the task to exit all of its threads when the task's message queue is deactivated. This design introduces behavioral coupling, however, between the Task class and its users. Users depend on particular behavior when the message queue is deactivated, so any change to this behavior would cause undesired ripple effects throughout all systems that use the Task class.

If a task is allocated dynamically, it may therefore be better to have the task's close() hook delete itself when the last thread exits the task, rather than calling delete on a pointer to the task directly. You may still want to wait() on the threads to exit the task, however, particularly if you're preparing to shut down the process. On some OS platforms, when the main thread returns from main() , the entire process will be shut down immediately, whether there were other threads active or not.
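A minimal sketch of that alternative, assuming a hypothetical Self_Deleting_Task that is always allocated with new, might look like this:

 #include "ace/Task.h"

 // Hypothetical heap-allocated task whose close() hook deletes the object
 // when the last service thread leaves, as suggested above.
 class Self_Deleting_Task : public ACE_Task<ACE_MT_SYNCH> {
 public:
   virtual int svc () {
     for (ACE_Message_Block *mblk; getq (mblk) != -1; )
       mblk->release (); // Application-defined processing elided.
     return 0;
   }

   virtual int close (u_long = 0) {
     // The framework calls close() in each thread after svc() returns, and
     // it decrements the thread count first, so 0 means this is the last
     // thread to leave the task.
     if (thr_count () == 0)
       delete this;      // Only valid because instances are heap allocated.
     return 0;
   }
 };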

The ACE_Message_Queue plays several roles in our thread pool logging server's half-sync/half-async concurrency design:

  • It decouples the main reactor thread from the thread pool. This design allows multiple worker threads to be active simultaneously. It also offloads the responsibility for maintaining queues of log record data from kernel space to user space, which has more virtual memory to queue log records than the kernel.

  • It helps to enforce flow control between clients and the server. When the number of bytes in the message queue reaches its high watermark, its flow control protocol blocks the main thread. As the underlying TCP socket buffers fill up, the flow control propagates back to the server's clients. This prevents clients from establishing new connections or sending log records until the worker threads have a chance to catch up and unblock the main thread. A sketch of how these watermark thresholds can be tuned appears after this list.
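Here is a minimal sketch of that watermark tuning, assuming a hypothetical Flow_Controlled_Task and purely illustrative byte limits:

 #include "ace/Task.h"

 // Hypothetical task that tunes the queue watermarks driving the flow
 // control protocol described above.
 class Flow_Controlled_Task : public ACE_Task<ACE_MT_SYNCH> {
 public:
   virtual int open (void * = 0) {
     msg_queue ()->high_water_mark (64 * 1024); // Block producers above 64K.
     msg_queue ()->low_water_mark (32 * 1024);  // Resume once below 32K.
     return activate (THR_NEW_LWP, 4);          // Spawn the worker pool.
   }
 };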

Prespawning and queueing help to amortize the cost of thread creation, as well as constrain the use of OS resources, which can significantly improve server scalability.

The classes that we'll cover in the thread pool logging server example are outlined below:

  • TP_Logging_Task: Runs as an active object, with a pool of threads that process and store log records inserted into its synchronized message queue

  • TP_Logging_Handler: Target of upcalls from the ACE_Reactor; receives log records from clients and inserts them into the TP_Logging_Task's message queue

  • TP_Logging_Acceptor: A factory that accepts connections and creates TP_Logging_Handler objects to process client requests

  • TP_Logging_Server: A facade class that integrates the other three classes together

The relationship between these classes is shown in Figure 6.10 (page 192). The TP_Logging_Acceptor and TP_Logging_Handler classes play the reactive role in the Half-Sync/Half-Async pattern and the TP_Logging_Task::svc() method, which runs concurrently in the worker threads, plays the synchronous role in the pattern.

Figure 6.10. The Thread Pool Logging Server Classes

Each class in Figure 6.10 is described below. We start by including the necessary ACE header files into the TP_Logging_Server.h file:

 #include "ace/OS.h"  #include "ace/Auto_Ptr.h"  #include "ace/Singleton.h"  #include "ace/Synch.h"  #include "ace/Task.h"  #include "Logging_Acceptor.h"  #include "Logging_Event_Handler.h"  #include "Reactor_Logging_Server.h"  #include "TPLS_export.h" 

TP_Logging_Task. This class provides the following capabilities:

  • It derives from ACE_Task, instantiated with the ACE_MT_SYNCH synchronization traits to provide a synchronized ACE_Message_Queue.

  • It spawns a pool of worker threads that all run the same svc() method to process and store log records inserted into its synchronized message queue.

The TP_Logging_Task is shown below:

 class TP_Logging_Task : public ACE_Task<ACE_MT_SYNCH> {
           // Instantiated with an MT synchronization trait.
 public:
   enum { MAX_THREADS = 4 };

   // ... Methods defined below ...
 };

The TP_Logging_Task::open() hook method calls ACE_Task::activate() to convert this task into an active object, as follows:

 virtual int open (void * = 0)
 { return activate (THR_NEW_LWP, MAX_THREADS); }

If activate() returns successfully, the TP_Logging_Task::svc() method will be running in MAX_THREADS separate threads. We show the TP_Logging_Task::svc() method (page 199) after we describe the TP_Logging_Acceptor and TP_Logging_Handler classes.

The TP_Logging_Task::put() method inserts a message block containing a log record into the queue.

 virtual int put (ACE_Message_Block *mblk,
                  ACE_Time_Value *timeout = 0)
 { return putq (mblk, timeout); }

We need only one instance of TP_Logging_Task, so we convert it into a singleton using the singleton adapter template described in Sidebar 45 (page 194). Since the TP_Logging_Task will be located in a DLL, however, we must use the ACE_Unmanaged_Singleton rather than ACE_Singleton. This design requires that we close the singleton explicitly when the logging task shuts down in TP_Logging_Server::fini() (page 201).

 typedef ACE_Unmanaged_Singleton<TP_Logging_Task, ACE_Null_Mutex>
         TP_LOGGING_TASK;

Since TP_LOGGING_TASK::instance() is only accessed from the main thread, we use ACE_Null_Mutex as the synchronization type parameter for ACE_Unmanaged_Singleton. If this singleton were accessed concurrently by other threads, we'd need to parameterize it with ACE_Recursive_Thread_Mutex (Chapter 10 of C++NPv1) to serialize access, as sketched below.
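For example, a sketch of that alternative parameterization (not needed in our server) would simply swap the lock type:

 typedef ACE_Unmanaged_Singleton<TP_Logging_Task, ACE_Recursive_Thread_Mutex>
         TP_LOGGING_TASK;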

TP_Logging_Acceptor. This class is a factory that provides the following capabilities:

  • It accepts connections from client logging daemons.

  • It creates TP_Logging_Handlers that receive log records from connected clients.

The TP_Logging_Acceptor class is shown below:

Sidebar 45: The ACE_Singleton Template Adapters

Although it's possible to code the TP_Logging_Task explicitly to be a singleton, this approach is tedious and error-prone. ACE therefore defines the following ACE_Singleton template adapter that applications can use to manage singleton life cycles:

 template <class TYPE, class LOCK>
 class ACE_Singleton : public ACE_Cleanup {
 public:
   static TYPE *instance (void) {
     ACE_Singleton<TYPE, LOCK> *&s = singleton_;
     if (s == 0) {
       LOCK *lock = 0;
       ACE_GUARD_RETURN (LOCK, guard,
             ACE_Object_Manager::get_singleton_lock (lock), 0);
       if (s == 0) {
         ACE_NEW_RETURN (s, (ACE_Singleton<TYPE, LOCK>), 0);
         ACE_Object_Manager::at_exit (s);
       }
     }
     return &s->instance_;
   }
 protected:
   ACE_Singleton (void); // Default constructor.
   TYPE instance_;       // Contained instance.

   // Single instance of the <ACE_Singleton> adapter.
   static ACE_Singleton<TYPE, LOCK> *singleton_;
 };

The ACE_Singleton::instance() static method uses the Double-Checked Locking Optimization pattern [POSA2] to construct and access an instance of the type-specific ACE_Singleton. It then registers the instance with the ACE_Object_Manager for cleanup at program termination. As described in Sidebar 23 of C++NPv1 (page 218), the ACE_Object_Manager assumes responsibility for destroying the ACE_Singleton instance, as well as the adapted TYPE instance.

A program can crash during singleton cleanup if the object code implementing TYPE is unlinked before the ACE_Object_Manager cleans up singletons, which is often the case for singletons located in dynamically linked services. We therefore recommend using ACE_Unmanaged_Singleton when defining singletons in DLLs that will be linked and unlinked dynamically. This class offers the same double-checked locking optimization to create the singleton. Destroying the singleton, however, requires an explicit call to ACE_Unmanaged_Singleton::close(). A dynamic service's fini() method is a good place to call this close() method, as shown in the TP_Logging_Server::fini() method (page 201).

 class TP_Logging_Acceptor : public Logging_Acceptor {
 public:
   TP_Logging_Acceptor (ACE_Reactor *r = ACE_Reactor::instance ())
     : Logging_Acceptor (r) {}

   virtual int handle_input (ACE_HANDLE) {
     TP_Logging_Handler *peer_handler = 0;
     ACE_NEW_RETURN (peer_handler,
                     TP_Logging_Handler (reactor ()), -1);
     if (acceptor_.accept (peer_handler->peer ()) == -1) {
       delete peer_handler;
       return -1;
     } else if (peer_handler->open () == -1)
       peer_handler->handle_close (ACE_INVALID_HANDLE, 0);
     return 0;
   }
 };

Since TP_Logging_Acceptor inherits from Logging_Acceptor (page 54), it can override handle_input() to create instances of TP_Logging_Handler.

TP_Logging_Handler. This class provides the following capabilities:

  • It receives log records from a connected client.

  • It enqueues the log records into the TP_LOGGING_TASK singleton's synchronized message queue.

The TP_Logging_Handler class is shown below:

 class TP_Logging_Handler : public Logging_Event_Handler {
   friend class TP_Logging_Acceptor;

Since this class derives from Logging_Event_Handler (page 59 in Section 3.3), it can receive log records when dispatched by a reactor.

The destructor is defined as protected to ensure dynamic allocation. However, since the TP_Logging_Acceptor::handle_input() method deletes objects of this type, TP_Logging_Acceptor must be a friend of this class. Declaring a friend class is appropriate in this case because these classes exhibit a high degree of cohesion, and it's important to maintain the restriction on dynamic allocation of TP_Logging_Handler . The three data members are used to implement the protocol for closing TP_Logging_Handler objects concurrently, as described in Sidebar 46 (page 196).

 protected:
   virtual ~TP_Logging_Handler () {} // No-op destructor.

   // Number of pointers to this class instance that currently
   // reside in the <TP_LOGGING_TASK> singleton's message queue.
   int queued_count_;

   // Indicates whether <Logging_Event_Handler::handle_close()>
   // must be called to cleanup and delete this object.
   int deferred_close_;

   // Serialize access to <queued_count_> and <deferred_close_>.
   ACE_Thread_Mutex lock_;

Sidebar 46: Closing TP_Logging_Handlers Concurrently

A challenge with thread pool servers is closing objects that can be accessed concurrently by multiple threads. In our thread pool logging server, TP_Logging_Handler pointers are used by TP_LOGGING_TASK threads. These service threads are separate from the thread running the reactor event loop that's driving callbacks to TP_Logging_Handler . We must therefore ensure that a TP_Logging_Handler object isn't destroyed while there are still pointers to it in use by TP_LOGGING_TASK .

When a logging client closes a connection, TP_Logging_Handler::handle_input() (page 197) returns -1. The reactor then calls the handler's handle_close() method, which ordinarily cleans up resources and deletes the handler. Unfortunately, that would wreak havoc if one or more pointers to that handler were still enqueued or being used by threads in the TP_LOGGING_TASK pool. We therefore use a reference counting protocol to ensure the handler isn't destroyed while a pointer to it is still in use. The UML activity diagram below illustrates the behavior this protocol enforces:

The protocol counts how many times a handler resides in the TP_LOGGING_TASK singleton's message queue. If the count is greater than 0 when the logging client socket is closed, TP_Logging_Handler::handle_close() can't yet destroy the handler. Later, as the TP_LOGGING_TASK processes each log record, the handler's reference count is decremented. When the count reaches 0, the handler can finish processing the close request that was deferred earlier.

The public part of the class defines the constructor and a pair of methods dispatched by the reactor when certain events occur.

 public:
   TP_Logging_Handler (ACE_Reactor *reactor)
     : Logging_Event_Handler (reactor),
       queued_count_ (0),
       deferred_close_ (0) {}

   // Called when input events occur, e.g., connection or data.
   virtual int handle_input (ACE_HANDLE);

   // Called when this object is destroyed, e.g., when it's
   // removed from a reactor.
   virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask);
 };

 ACE_FACTORY_DECLARE (TPLS, TP_Logging_Handler)

TP_Logging_Handler::handle_input() plays the reactive role in the Half-Sync/Half-Async pattern. It differs from the Logging_Event_Handler::handle_input() method (page 59) since it doesn't process a log record immediately after receiving it. Instead, it inserts each log record into the TP_LOGGING_TASK singleton's message queue, where it will be processed concurrently. However, while processing the log record, the TP_LOGGING_TASK needs to access this handler's log file (inherited from Logging_Event_Handler). To facilitate this, handle_input() combines the log record with a message block containing the handler's pointer and inserts the resulting composite message at the end of the TP_LOGGING_TASK singleton's message queue, as shown below:

  1 int TP_Logging_Handler::handle_input (ACE_HANDLE) {
  2   ACE_Message_Block *mblk = 0;
  3   if (logging_handler_.recv_log_record (mblk) != -1) {
  4     ACE_Message_Block *log_blk = 0;
  5     ACE_NEW_RETURN
  6       (log_blk, ACE_Message_Block
  7                   (ACE_reinterpret_cast (char *, this)), -1);
  8     log_blk->cont (mblk);
  9     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
 10     if (TP_LOGGING_TASK::instance ()->put (log_blk) == -1)
 11     { log_blk->release (); return -1; }
 12     ++queued_count_;
 13     return 0;
 14   } else return -1;
 15 }

Lines 2-3 Read a log record from a connected socket into a dynamically allocated ACE_Message_Block.

Lines 4-8 Create an ACE_Message_Block called log_blk that contains a pointer to this handler. The ACE_Message_Block constructor simply "borrows" the this pointer and sets the ACE_Message_Block::DONT_DELETE flag internally to ensure that the handler itself isn't destroyed when the message block is released in TP_Logging_Task::svc() (page 199). The mblk is attached to log_blk's continuation chain to form a composite message.

Lines 9-10 Use an ACE_Guard to acquire the lock_ that serializes access to queued_count_. To avoid a race condition in which a service thread processes the record before queued_count_ can be incremented, the lock is acquired before calling put() to insert the composite message block into the TP_LOGGING_TASK singleton's message queue.

Lines 11-13 Release the log_blk resources and return -1 if put() fails. If it succeeds, increment the count of the number of times the handler resides in the TP_LOGGING_TASK singleton's queue. In either case, the return statement causes the guard to release lock_.

Line 14 If the client closes the connection, or a serious error occurs, handle_input() returns -1. This value causes the reactor to call TP_Logging_Handler::handle_close(), which implements a key portion of the protocol for closing TP_Logging_Handlers concurrently, as described in Sidebar 46 (page 196). The handle_close() method is shown below:

  1 int TP_Logging_Handler::handle_close (ACE_HANDLE handle,
  2                                       ACE_Reactor_Mask) {
  3   int close_now = 0;
  4   if (handle != ACE_INVALID_HANDLE) {
  5     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
  6     if (queued_count_ == 0) close_now = 1;
  7     else deferred_close_ = 1;
  8   } else {
  9     ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
 10     queued_count_--;
 11     if (queued_count_ == 0) close_now = deferred_close_;
 12   }
 13
 14   if (close_now) return Logging_Event_Handler::handle_close ();
 15   return 0;
 16 }

Line 3 The close_now variable records whether the Logging_Event_Handler::handle_close() method should be called on line 14. This decision depends on the reference count decisions made in the rest of this method.

Lines 4-7 This code runs when handle_close() is called after handle_input() returns -1. Lines 6-7 are performed within a critical section protected by an ACE_Guard that acquires and releases the lock_ automatically within the scope of the if clause. If queued_count_ equals 0, there are no references to this object remaining in the TP_LOGGING_TASK, so we set the close_now local variable to 1. Otherwise, we set the deferred_close_ data member to note that as soon as the reference count reaches 0 this handler should be destroyed, since the client has already closed its socket endpoint. As log records are processed, the TP_Logging_Task::svc() method (page 199) calls handle_close() again, and will execute the lines described below.

Lines 8-12 The handle equals ACE_INVALID_HANDLE when handle_close() is called in TP_Logging_Task::svc() (page 199) after the handler has been removed from the task's message queue. We therefore decrement the queued_count_. If the count is now 0, we record whether we need to finish processing a close request that was deferred earlier. As in lines 5-7, we use an ACE_Guard to acquire and release the lock_ automatically within the scope of the else clause.

Line 14 If the local variable close_now is true, call Logging_Event_Handler::handle_close() (page 60), which closes the socket and log file and then deletes this object.

Now that we've shown the TP_Logging_Handler class, we can show the TP_Logging_Task::svc() method, which runs concurrently in each of the worker threads and implements the synchronous role in the Half-Sync/Half-Async pattern. This method runs its own event loop that blocks on the synchronized message queue. After a message is enqueued by TP_Logging_Handler::handle_input() (page 197), it will be dequeued by an available worker thread and written to the log file corresponding to the client, as shown below.

  1 int TP_Logging_Task::svc () {
  2   for (ACE_Message_Block *log_blk; getq (log_blk) != -1;) {
  3     TP_Logging_Handler *tp_handler = ACE_reinterpret_cast
  4       (TP_Logging_Handler *, log_blk->rd_ptr ());
  5     Logging_Handler logging_handler (tp_handler->log_file ());
  6     logging_handler.write_log_record (log_blk->cont ());
  7     log_blk->release ();
  8     tp_handler->handle_close (ACE_INVALID_HANDLE, 0);
  9   }
 10   return 0;
 11 }

Lines 2-4 Call the getq() method, which blocks until a message block is available. As shown in Figure 6.11, each message block is a composite message containing three message blocks chained together via their continuation pointers in the following order:

Figure 6.11. Message Block Chain of Log Record Information

  1. A pointer to the TP_Logging_Handler that contains the log file where the log record will be written

  2. The hostname of the connected client

  3. The marshaled log record contents

Lines 5-6 Initialize logging_handler with the log_file and then call Logging_Handler::write_log_record(), which writes the log record to the log file. The write_log_record() method is responsible for releasing its message block chain, as shown in Chapter 4 of C++NPv1.

Lines 7-8 Call log_blk->release() to reclaim allocated resources. Since log_blk merely borrows the TP_Logging_Handler pointer rather than allocating it dynamically, however, we must explicitly call TP_Logging_Handler::handle_close() (page 198) on tp_handler. This method decrements the TP_Logging_Handler reference count and cleans up the object properly, using the protocol described in Sidebar 46 (page 196).

TP_Logging_Server. This facade class inherits from ACE_Service_Object , contains a Reactor_Logging_Server , and uses the TP_LOGGING_TASK singleton, as shown below.

 class TP_Logging_Server : public ACE_Service_Object {
 protected:
   // Contains the reactor, acceptor, and handlers.
   typedef Reactor_Logging_Server<TP_Logging_Acceptor>
           LOGGING_DISPATCHER;
   LOGGING_DISPATCHER *logging_dispatcher_;

 public:
   TP_Logging_Server (): logging_dispatcher_ (0) {}

   // Other methods defined below...
 };

The TP_Logging_Server::init() hook method enhances the reactive logging server implementation from Chapter 3 as follows:

 virtual int init (int argc, ACE_TCHAR *argv[]) {
   int i;
   char **array = 0;
   ACE_NEW_RETURN (array, char*[argc], -1);
   ACE_Auto_Array_Ptr<char *> char_argv (array);

   for (i = 0; i < argc; ++i)
     char_argv[i] = ACE::strnew (ACE_TEXT_ALWAYS_CHAR (argv[i]));

   ACE_NEW_NORETURN
     (logging_dispatcher_,
      TP_Logging_Server::LOGGING_DISPATCHER
        (i, char_argv.get (), ACE_Reactor::instance ()));

   for (i = 0; i < argc; ++i) ACE::strdelete (char_argv[i]);
   if (logging_dispatcher_ == 0) return -1;
   else return TP_LOGGING_TASK::instance ()->open ();
 }

This init() method is similar to the one on page 122. It allocates an instance of TP_Logging_Server::LOGGING_DISPATCHER and stores its pointer in the logging_dispatcher_ member. It also calls TP_Logging_Task::open() to prespawn a pool of worker threads that process log records concurrently.

The TP_Logging_Server::fini() method is shown next:

 1 virtual int fini () {
 2   TP_LOGGING_TASK::instance ()->flush ();
 3   TP_LOGGING_TASK::instance ()->wait ();
 4   TP_LOGGING_TASK::close ();
 5   delete logging_dispatcher_;
 6   return 0;
 7 }

Line 2 Call the flush() method of the TP_LOGGING_TASK singleton, thereby closing the message queue associated with the task, which deletes all the queued messages and signals the threads in the pool to exit.

Lines 3-4 Use the ACE_Thread_Manager's barrier synchronization feature to wait for the pool of threads spawned by TP_Logging_Task::open() to exit, and then explicitly close the singleton because this DLL is about to be unlinked.

Lines 5-6 Delete the logging_dispatcher_ allocated in init() and return. For brevity, we omit the suspend(), resume(), and info() hook methods, which are similar to those shown in earlier examples.

Finally, we place ACE_FACTORY_DEFINE into TP_Logging_Server.cpp.

 ACE_FACTORY_DEFINE (TPLS, TP_Logging_Server) 

This macro automatically defines the _make_TP_Logging_Server() factory function that's used in the following svc.conf file:

 dynamic TP_Logging_Server Service_Object *
   TPLS:_make_TP_Logging_Server() "$TP_LOGGING_SERVER_PORT"

This file directs the ACE Service Configurator framework to configure the thread pool logging server via the following steps:

  1. Dynamically link the TPLS DLL into the address space of the process.

  2. Use the ACE_DLL class to extract the _make_TP_Logging_Server() factory function from the TPLS DLL symbol table.

  3. This function is called to obtain a pointer to a dynamically allocated TP_Logging_Server .

  4. The Service Configurator framework calls the TP_Logging_Server::init() hook method through this pointer, passing the value of the TP_LOGGING_SERVER_PORT environment variable as its single argument. This string designates the port number where the logging server listens for client connection requests.

  5. If init() succeeds, the TP_Logging_Server pointer is stored in the ACE_Service_Repository under the name "TP_Logging_Server" .

Once again, the ACE Service Configurator framework enables us to reuse the main() program from Configurable_Logging_Server.cpp (page 147).
