6.2 The ACE_Message_Queue Class


Motivation

As discussed in Section 2.1.4 on page 27, networked applications whose services are layered/modular are often composed of a set of collaborating tasks within a process. To simplify interfaces and design, minimize maintenance costs, and maximize reuse, these tasks should have the following properties:

  • Low intertask coupling, that is, separate task objects should have minimal dependencies on each other's data and methods.

  • High intratask cohesion, that is, the methods and data in a task should focus on a related set of functionality.

To achieve these properties, tasks often communicate by passing messages via a generic method, such as push() or put(), rather than calling specific statically typed methods directly. Messages can represent work requests, work results, or other types of data to process. They can also represent control requests that direct tasks to alter their processing, for example, to shut down or reconfigure themselves.

When producer and consumer tasks are collocated in the same process, tasks often exchange messages via an intraprocess message queue. In this design, producer task(s) insert messages into a synchronized message queue serviced by consumer task(s) that remove and process the messages. If the queue is full, producers can either block or wait a bounded amount of time to insert their messages. Likewise, if the queue is empty, consumers can either block or wait a bounded amount of time to remove messages.

Although some operating systems supply intraprocess message queues natively, this capability isn't available on all platforms. Moreover, when it is offered, it's often either highly platform specific, such as VxWorks message queues, and/or inefficient, tedious, and error prone to use, such as System V IPC message queues [Ste99]. Examples of how to create wrapper facade classes to handle these problems appear in C++NPv1. Wrapper facades could encapsulate the spectrum of available intraprocess message queue mechanisms behind a common interface, emulating missing capabilities where needed. ACE takes a different approach, however, for the following reasons:

  • To avoid unnecessary complexity. Native message queueing mechanisms, where they exist, can be hard to program correctly since they use low-level C APIs. They can also impose constraints on system administration that reflect poorly on a product's operational procedures, which can increase product support costs. For example, System V IPC message queues can persist after a program finishes execution if not cleaned up properly. These remnants may prevent an application from restarting, or contribute to resource leaks and often require a system administrator's intervention to repair the system manually. Likewise, System V IPC message queues offer interprocess queueing that incurs more overhead than the intraprocess queueing that's the target use case for many networked applications.

  • To avoid a matrix of increasing complexity. Each native message queueing mechanism has its own message format. Properly encapsulating message queues with wrapper facades therefore requires a corresponding encapsulation of messages. Reconciling two wrappers' desired feature sets with the matrix of message queues and types complicates the design and development effort for the wrappers many times over. This problem would only get worse as ACE is ported to new platforms.

  • ACE already has a powerful message class. ACE_Message_Block is a convenient, efficient, and powerful message class described in Chapter 4 of C++NPv1. It offers more capability than many platform-specific message formats, and also works portably across all ACE platforms.

Due to these factors, ACE defines the ACE_Message_Queue class, which is a portable and efficient intraprocess message queueing mechanism that leverages the advanced capabilities of ACE_Message_Block.

Class Capabilities

ACE_Message_Queue is a portable, lightweight intraprocess message queueing mechanism that provides the following capabilities:

  • It allows messages (which are instances of ACE_Message_Block) to be enqueued at the front of the queue, the rear of the queue, or in priority order based on the message's priority. Messages can be dequeued from the front or back of the queue.

  • It uses ACE_Message_Block to provide an efficient message buffering mechanism that minimizes dynamic memory allocation and data copying.

  • It can be instantiated for either multithreaded or single-threaded configurations, allowing programmers to trade off strict synchronization for lower overhead when concurrent access to a queue isn't required.

  • In multithreaded configurations, it supports configurable flow control, which prevents fast message producer thread(s) from swamping the processing and memory resources of slower message consumer thread(s).

  • It allows timeouts to be specified on both enqueue and dequeue operations to avoid indefinite blocking.

  • It can be integrated with the ACE Reactor framework's event handling mechanism.

  • It provides allocators that can be strategized so the memory used by messages can be obtained from various sources, such as shared memory, heap memory, static memory, or thread-specific memory.

Figure 6.2 (page 160) shows the interface for ACE_Message_Queue. Since this class has a wide range of features, we divide its description into the four categories below.

Figure 6.2. The ACE_Message_Queue Class

1. Initialization and flow control methods. The following methods can be used to initialize and manage flow control in an ACE_Message_Queue:

Method

Description

ACE_Message_Queue() open()

Initialize the queue, optionally specifying watermarks and a notification strategy (see Sidebar 38 on page 163).

high_water_mark()

low_water_mark()

Set/get the high and low watermarks that determine when flow control starts and stops.

notification_strategy()

Set/get the notification strategy.

An ACE_Message_Queue contains a pair of watermarks that implement flow control to prevent a fast sender from overrunning the buffering and computing resources of a slower receiver. To reflect the total resource usage of messages in the queue, the watermarks are measured in bytes. Each ACE_Message_Queue tracks the number of bytes in the queue by maintaining a count of the payload bytes in each queued ACE_Message_Block. A new message can be enqueued if the total number of bytes in the queue before queueing the new message is less than or equal to the high watermark. Otherwise, ACE_Message_Queue flow control works as follows:

  • If the queue is "synchronized," the calling thread will block until the total number of bytes in the queue falls below the low watermark or until a timeout occurs.

  • If the queue is "unsynchronized," the call will return -1 with errno set to EWOULDBLOCK.

The ACE_MT_SYNCH and ACE_NULL_SYNCH traits classes (page 163) can be used to designate whether a queue is synchronized or unsynchronized, respectively. As ACE_Message_Block objects are removed from a message queue, the count of queued bytes is decremented accordingly. The low watermark indicates the number of queued bytes at which a previously flow-controlled ACE_Message_Queue no longer considers itself full.

The default high and low watermarks are both 16K. These default watermark values cause an ACE_Message_Queue to flow control when more than 16K bytes are queued and to cease flow control when the number of queued bytes drops below 16K. These defaults are appropriate for applications in which the average ACE_Message_Block size is considerably less than 1K.

Depending on load and design constraints, applications may require different settings for either or both of the watermarks. The default watermark values can be reset in the ACE_Message_Queue's constructor or open() method, as well as by its high_water_mark() and low_water_mark() mutator methods. One approach is to specify watermarks using the ACE Service Configurator framework described in Chapter 5, with defaults determined from application benchmarks measured during development and testing. The Example portions of Sections 6.3 and 7.4 illustrate how to set and use a message queue's high watermark to exert flow control within multithreaded networked applications.
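The watermark behavior described above can be sketched in standard C++. This is an illustrative model, not ACE's implementation: the class and member names are invented, and the blocking behavior of a synchronized queue is simplified to a boolean refusal (cf. returning -1 with errno set to EWOULDBLOCK).

```cpp
#include <cstddef>
#include <deque>

// Hypothetical sketch of watermark-based flow control with hysteresis:
// flow control starts when the queued bytes exceed the high watermark
// and ceases when they drop below the low watermark.
class WatermarkQueue {
public:
    WatermarkQueue(std::size_t high_water, std::size_t low_water)
        : high_(high_water), low_(low_water) {}

    // Refuses new messages while flow controlled (a synchronized queue
    // would block here instead).
    bool enqueue(std::size_t msg_bytes) {
        if (full_) return false;
        sizes_.push_back(msg_bytes);
        bytes_ += msg_bytes;
        if (bytes_ > high_) full_ = true;   // flow control starts
        return true;
    }

    // Decrements the byte count; flow control ceases once the count
    // drops below the low watermark.
    bool dequeue(std::size_t &msg_bytes) {
        if (sizes_.empty()) return false;
        msg_bytes = sizes_.front();
        sizes_.pop_front();
        bytes_ -= msg_bytes;
        if (bytes_ < low_) full_ = false;   // flow control ceases
        return true;
    }

    bool is_full() const { return full_; }
    std::size_t message_bytes() const { return bytes_; }

private:
    std::size_t high_, low_, bytes_ = 0;
    bool full_ = false;
    std::deque<std::size_t> sizes_;
};
```

Setting the two watermarks to different values, as in this sketch, yields hysteresis: a queue that has flow controlled stays flow controlled until enough bytes drain below the low watermark.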

2. Enqueue/dequeue methods and message buffering. The following methods perform the bulk of the work in an ACE_Message_Queue:

Method

Description

is_empty()

is_full()

The is_empty() method returns true when the queue contains no message blocks. The is_full() method returns true when the number of bytes in the queue is greater than the high watermark.

enqueue_tail()

Insert a message at the back of the queue.

enqueue_head()

Insert a message at the front of the queue.

enqueue_prio()

Insert a message according to its priority.

dequeue_head()

Remove and return the message at the front of the queue.

dequeue_tail()

Remove and return the message at the back of the queue.

The design of the ACE_Message_Queue class is based on the message buffering and queueing facilities in System V STREAMS [Rag93]. Messages passed to a message queue are instances of ACE_Message_Block and can be classified as either simple or composite messages. Simple messages contain a single ACE_Message_Block . Composite messages contain multiple ACE_Message_Block objects that are linked together in accordance with the Composite pattern [GoF], which provides a structure for building recursive aggregations. A composite message often contains the following message types:

  • A control message that contains bookkeeping information, such as destination addresses and length fields, followed by

  • One or more data messages that contain the actual contents of a composite message

Messages in a queue are linked bidirectionally via a pair of pointers that can be obtained using their next() and prev() accessor methods. This design optimizes enqueueing and dequeueing at the head and tail of a queue. Priority-based queueing with the ACE_Message_Queue::enqueue_prio() method uses the ACE_Message_Block::msg_priority() accessor method to enqueue a message block ahead of all lower-priority message blocks already in the queue. Message blocks at the same priority are enqueued in first-in, first-out (FIFO) order.
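The enqueue_prio() ordering rule, priority order overall with FIFO order among equal priorities, can be sketched as follows. This is a standard C++ stand-in; the Msg type and free function are invented names, not the ACE interface.

```cpp
#include <list>
#include <string>
#include <utility>

// (priority, payload) pair standing in for a prioritized message block.
using Msg = std::pair<int, std::string>;

// Inserts a message ahead of all strictly lower-priority messages, but
// after messages of equal or higher priority, so messages at the same
// priority keep FIFO order.
void enqueue_prio(std::list<Msg> &q, Msg m) {
    auto it = q.begin();
    while (it != q.end() && it->first >= m.first)
        ++it;                       // skip equal/higher priorities
    q.insert(it, std::move(m));
}
```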

Each message block in a composite message is chained together unidirectionally via a continuation pointer, which can be obtained via the cont() accessor method. Figure 6.3 (page 162) illustrates how three messages can be linked together to form an ACE_Message_Queue . The head and tail messages in the queue are simple messages, whereas the middle one is a composite message with one message block chained via its continuation pointer.

Figure 6.3. The Structure of an ACE_Message_Queue
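The linking structure shown in Figure 6.3 can be modeled with a small sketch. The Block type and its fields are illustrative stand-ins for ACE_Message_Block's next(), prev(), and cont() accessors; the real class is reference counted and far richer.

```cpp
#include <cstddef>

// Minimal stand-in for ACE_Message_Block's linking structure.
struct Block {
    std::size_t length = 0;  // payload bytes in this block
    Block *next = nullptr;   // bidirectional queue links to the ...
    Block *prev = nullptr;   // ... neighboring messages in the queue
    Block *cont = nullptr;   // unidirectional continuation chain

    // A composite message's total size spans its continuation chain.
    std::size_t total_length() const {
        std::size_t sum = 0;
        for (const Block *b = this; b != nullptr; b = b->cont)
            sum += b->length;
        return sum;
    }
};
```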

Sidebar 39 (page 164) describes ACE_Message_Queue_Ex, which is a variant of ACE_Message_Queue that exchanges strongly typed messages.

3. Parameterized synchronization strategies. If you examine Figure 6.2 (page 160) carefully you'll see the ACE_Message_Queue template is parameterized by a SYNCH_STRATEGY traits class. This design is based on the Strategized Locking pattern [POSA2], which parameterizes the synchronization mechanisms that a class uses to protect its critical sections from concurrent access. Internally, the ACE_Message_Queue class uses the following traits from its SYNCH_STRATEGY traits class template parameter:

 template <class SYNCH_STRATEGY>
 class ACE_Message_Queue {
   // ...
 protected:
   // C++ traits that coordinate concurrent access.
   ACE_TYPENAME SYNCH_STRATEGY::MUTEX lock_;
   ACE_TYPENAME SYNCH_STRATEGY::CONDITION notempty_;
   ACE_TYPENAME SYNCH_STRATEGY::CONDITION notfull_;
 };

Sidebar 40 (page 165) describes the C++ traits and traits class idioms. These idioms enable application developers to customize an ACE_Message_Queue's synchronization strategy to suit their particular needs. The ACE_Message_Queue class fragment above shows how the following traits in its traits class template parameter are used:

Sidebar 38: Integrating an ACE_Message_Queue with an ACE_Reactor

Some platforms offer a way to integrate native message queue events with synchronous event demultiplexing. For example, AIX's version of select() can demultiplex events generated by System V message queues. Although this use of select() is nonportable, the ability to integrate a message queue with a reactor is useful in applications in which one thread must respond to both I/O events and items enqueued to a message queue. The ACE_Message_Queue class therefore offers a portable way to integrate event queueing with the ACE Reactor framework presented in Chapter 3.

The constructor of the ACE_Message_Queue class, as well as its open() and notification_strategy() methods, can be used to set a notification strategy for the ACE_Message_Queue. This design is an example of the Strategy pattern [GoF], which allows various algorithms to be substituted without changing the client (which is the ACE_Message_Queue class in this case). The notification strategy must be derived from ACE_Notification_Strategy, which allows the flexibility to insert any strategy necessary for your application. One such subclass strategy is ACE_Reactor_Notification_Strategy, whose constructor associates it with an ACE_Reactor, an ACE_Event_Handler, and an event mask. After the strategy object is associated with an ACE_Message_Queue, each queued message triggers the following sequence of actions:

  1. ACE_Message_Queue calls the strategy's notify() method.

  2. The ACE_Reactor_Notification_Strategy::notify() method notifies the associated reactor using the reactor notification mechanism (page 77).

  3. The reactor dispatches the notification to the specified event handler using the designated mask.
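The first step of this sequence can be sketched with the Strategy pattern in standard C++. NotificationStrategy, CountingStrategy, and Queue here are hypothetical stand-ins for ACE_Notification_Strategy and its collaborators; a real strategy subclass would notify a reactor rather than count calls.

```cpp
#include <deque>
#include <string>

// Abstract strategy interface, standing in for ACE_Notification_Strategy.
class NotificationStrategy {
public:
    virtual ~NotificationStrategy() = default;
    virtual int notify() = 0;   // invoked once per queued message
};

// Queue that invokes its (optional) strategy on every enqueue.
class Queue {
public:
    void notification_strategy(NotificationStrategy *s) { strategy_ = s; }
    void enqueue(std::string msg) {
        q_.push_back(std::move(msg));
        if (strategy_) strategy_->notify();   // step 1 of the sequence
    }
private:
    std::deque<std::string> q_;
    NotificationStrategy *strategy_ = nullptr;
};

// A strategy that simply counts notifications, standing in for one that
// would forward them to a reactor's notification mechanism.
class CountingStrategy : public NotificationStrategy {
public:
    int notify() override { ++count_; return 0; }
    int count() const { return count_; }
private:
    int count_ = 0;
};
```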

  • The lock_ member serializes access to the queue's state and

  • notempty_ and notfull_ are condition variables that enable callers to wait efficiently for a message to arrive or to insert a message, respectively.

The SYNCH_STRATEGY traits class used to parameterize ACE_Message_Queue allows these members to adapt to a synchronized or unsynchronized queue at compile time.

Sets of ACE's synchronization wrapper facades can be combined to form traits classes that define customized synchronization strategies. ACE provides the following two traits classes that prepackage the most common synchronization traits:

  • ACE_NULL_SYNCH The traits in this class are implemented in terms of "null" locking mechanisms, as shown below.

     class ACE_NULL_SYNCH {
     public:
       typedef ACE_Null_Mutex MUTEX;
       typedef ACE_Null_Mutex NULL_MUTEX;
       typedef ACE_Null_Mutex PROCESS_MUTEX;
       typedef ACE_Null_Mutex RECURSIVE_MUTEX;
       typedef ACE_Null_Mutex RW_MUTEX;
       typedef ACE_Null_Condition CONDITION;
       typedef ACE_Null_Semaphore SEMAPHORE;
       typedef ACE_Null_Semaphore NULL_SEMAPHORE;
     };

Sidebar 39: The ACE_Message_Queue_Ex Class

The ACE_Message_Queue class enqueues and dequeues ACE_Message_Block objects, which provide a dynamically extensible way to represent messages. For programs requiring strongly typed messaging, ACE provides the ACE_Message_Queue_Ex class, which enqueues and dequeues messages that are instances of a MESSAGE_TYPE template parameter, rather than an ACE_Message_Block .

ACE_Message_Queue_Ex offers the same capabilities as ACE_Message_Queue . Its primary advantage is that application-defined data types can be queued without the need to type cast on enqueue and dequeue or copy objects into the data portion of an ACE_Message_Block . Since ACE_Message_Queue_Ex is not derived from ACE_Message_Queue , however, it can't be used with the ACE_Task class described in Section 6.3.

The ACE_NULL_SYNCH class is an example of the Null Object pattern [Woo97], which simplifies applications by defining a "no-op" placeholder that removes conditional statements in a class implementation. ACE_NULL_SYNCH is often used in single-threaded applications or in applications in which the need for interthread synchronization has either been eliminated via careful design or implemented via some other mechanism. The client logging daemon examples in Section 7.4 (page 238) illustrate the use of the ACE_NULL_SYNCH traits class.

  • ACE_MT_SYNCH The traits in this predefined class are implemented in terms of actual locking mechanisms, as shown below:

     class ACE_MT_SYNCH {
     public:
       typedef ACE_Thread_Mutex MUTEX;
       typedef ACE_Null_Mutex NULL_MUTEX;
       typedef ACE_Process_Mutex PROCESS_MUTEX;
       typedef ACE_Recursive_Thread_Mutex RECURSIVE_MUTEX;
       typedef ACE_RW_Thread_Mutex RW_MUTEX;
       typedef ACE_Condition_Thread_Mutex CONDITION;
       typedef ACE_Thread_Semaphore SEMAPHORE;
       typedef ACE_Null_Semaphore NULL_SEMAPHORE;
     };

The ACE_MT_SYNCH traits class defines a strategy containing portable, efficient synchronizers suitable for multithreaded applications. The client logging daemon examples in Section 6.2 (page 180) and Section 7.4 (page 243) illustrate the use of the ACE_MT_SYNCH traits class.
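The Strategized Locking idiom underlying both traits classes can be sketched as follows. NULL_SYNCH, MT_SYNCH, NullMutex, and Counter are illustrative stand-ins, with std::mutex playing the role of ACE_Thread_Mutex; the real traits classes export many more types, as shown above.

```cpp
#include <mutex>

// No-op lock: the compile-time analogue of ACE_Null_Mutex.
struct NullMutex {
    void lock() {}
    void unlock() {}
};

// Traits classes selecting the locking strategy at compile time.
struct NULL_SYNCH { using MUTEX = NullMutex; };
struct MT_SYNCH  { using MUTEX = std::mutex; };

// A class parameterized by a synchronization traits class, in the
// style of ACE_Message_Queue<SYNCH_STRATEGY>.
template <class SYNCH_STRATEGY>
class Counter {
public:
    void increment() {
        // With NULL_SYNCH this guard compiles to nothing useful being
        // locked; with MT_SYNCH it takes a real mutex.
        std::lock_guard<typename SYNCH_STRATEGY::MUTEX> guard(lock_);
        ++value_;
    }
    int value() const { return value_; }
private:
    typename SYNCH_STRATEGY::MUTEX lock_;   // resolved at compile time
    int value_ = 0;
};
```

The single-threaded instantiation pays no locking overhead, while the multithreaded one is safe for concurrent callers, without any change to the class implementation.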

Sidebar 40: The C++ Traits and Traits Class Idioms

A trait is a type that conveys information used by another class or algorithm to determine policies at compile time. A traits class [Jos99] is a useful way to collect a set of traits that should be applied in a given situation to alter another class's behavior appropriately. Traits and traits classes are C++ policy-based class design idioms [Ale01] that are widely used throughout the C++ standard library [Aus99].

For example, the char_traits class defines the traits of a character type, such as its data type and functions to compare, search for, and assign characters of that type. The C++ standard library provides specializations of char_traits<> for char and wchar_t . These character traits then modify the behavior of common classes, such as basic_iostream<> and basic_string<> . The iostream and string classes are defined by specializing the class templates with char_traits<char> . Similarly, the wiostream and wstring classes are defined by specializing the templates with char_traits<wchar_t> .
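These standard library facts can be checked directly with ISO C++ alone (no ACE involved):

```cpp
#include <string>
#include <type_traits>

// std::string really is basic_string specialized with the
// char_traits<char> traits class.
using CT = std::char_traits<char>;

static_assert(
    std::is_same<std::string, std::basic_string<char, CT>>::value,
    "std::string is basic_string parameterized by char_traits<char>");
```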

These C++ idioms are similar in spirit to the Strategy pattern [GoF], which allows substitution of class behavioral characteristics without requiring a change to the class itself. The Strategy pattern involves a defined interface that's commonly bound dynamically at run time using virtual methods. In contrast, the traits and traits class idioms involve substitution of a set of class members and/or methods that can be bound statically at compile time using C++ parameterized types.

Parameterizing the ACE_Message_Queue template with a traits class provides the following benefits:

  • It allows ACE_Message_Queue to work in both single-threaded and multithreaded configurations without requiring changes to the class implementation.

  • It allows the synchronization aspects of an instantiation of ACE_Message_Queue to be changed wholesale via the Strategized Locking pattern.

For example, the ACE_Message_Queue's MUTEX and CONDITION traits resolve to ACE_Null_Mutex and ACE_Null_Condition if the ACE_NULL_SYNCH traits class is used. In this case, the resulting message queue class incurs no synchronization overhead. In contrast, if an ACE_Message_Queue is parameterized with the ACE_MT_SYNCH traits class, its MUTEX and CONDITION traits resolve to ACE_Thread_Mutex and ACE_Condition_Thread_Mutex. In this case, the resulting message queue class behaves in accordance with the Monitor Object design pattern [POSA2], which

  • Synchronizes concurrent method execution to ensure that only one method at a time runs within an object

  • Allows an object's methods to schedule their execution sequences cooperatively

Sidebar 49 (page 214) explains how ACE uses macros to implement ACE_NULL_SYNCH and ACE_MT_SYNCH for C++ compilers that lack support for traits classes in templates.

When ACE_Message_Queue is parameterized with ACE_NULL_SYNCH , calls to its enqueue and dequeue methods never block the calling thread when they reach the queue's boundary conditions. They instead return -1 with errno set to EWOULDBLOCK . Conversely, when an ACE_Message_Queue is instantiated with ACE_MT_SYNCH , its enqueue and dequeue methods support blocking, nonblocking, and timed operations. For example, when a synchronized queue is empty, calls to its dequeue methods will block by default until a message is enqueued and the queue is no longer empty. Likewise, when a synchronized queue is full, calls to its enqueue methods block by default until sufficient messages are dequeued to decrease the number of bytes in the queue below its low watermark and the queue is no longer full. This default blocking behavior can be modified by passing the following types of ACE_Time_Value values to these methods:

Value

Behavior

NULL ACE_Time_Value pointer

Indicates that the enqueue or dequeue method should wait indefinitely, that is, it will block until the method completes, the queue is closed, or a signal interrupts the call.

Non-NULL ACE_Time_Value pointer whose sec() and usec() methods return 0

Indicates that the enqueue or dequeue method should perform a nonblocking operation, that is, if the method doesn't succeed immediately, it returns -1 and sets errno to EWOULDBLOCK.

Non-NULL ACE_Time_Value pointer whose sec() or usec() method returns > 0

Indicates that the enqueue or dequeue method should wait until the specified absolute time of day, returning -1 with errno set to EWOULDBLOCK if the method does not complete by this time. The call will also return earlier if the queue is closed or a signal interrupts the call.

Sidebar 6 (page 45) describes the different interpretations of timeout values used by various classes in ACE.
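The three timeout modes can be modeled with a standard C++ condition variable. This is an illustrative sketch, not ACE's code; note that, as in ACE, the deadline is an absolute time rather than a relative interval, with a null pointer meaning "wait indefinitely."

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Sketch of a synchronized queue whose dequeue supports indefinite,
// nonblocking (deadline already passed), and timed operation.
class TimedQueue {
public:
    // Blocks until a message arrives or the absolute deadline passes.
    // Returns false on timeout (cf. -1 with errno == EWOULDBLOCK).
    bool dequeue_head(int &out,
                      const std::chrono::steady_clock::time_point *deadline) {
        std::unique_lock<std::mutex> lk(lock_);
        if (deadline == nullptr) {                    // wait indefinitely
            not_empty_.wait(lk, [this] { return !q_.empty(); });
        } else if (!not_empty_.wait_until(lk, *deadline,
                                          [this] { return !q_.empty(); })) {
            return false;                             // timed out
        }
        out = q_.front();
        q_.pop_front();
        return true;
    }

    void enqueue_tail(int v) {
        { std::lock_guard<std::mutex> lk(lock_); q_.push_back(v); }
        not_empty_.notify_one();
    }

private:
    std::mutex lock_;
    std::condition_variable not_empty_;
    std::deque<int> q_;
};
```

A deadline equal to "now" makes the call effectively nonblocking, which mirrors the zero-valued ACE_Time_Value row in the table above.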

4. Shutdown and message release methods. The following methods can be used to shut down, deactivate, and/or release the messages in an ACE_Message_Queue:

Method

Description

deactivate()

Changes the queue state to DEACTIVATED and wakes up all threads waiting on enqueue or dequeue operations. This method does not release any queued messages.

pulse()

Changes the queue state to PULSED and wakes up all threads waiting on enqueue or dequeue operations. This method does not release any queued messages.

state()

Returns the queue's state.

activate()

Changes the queue state to ACTIVATED .

~ACE_Message_Queue() close()

Deactivates the queue and releases any queued messages immediately.

flush()

Releases the messages in a queue, but doesn't change its state.

An ACE_Message_Queue is always in one of three states internally:

  • ACTIVATED, in which all operations work normally (a queue always starts in the ACTIVATED state).

  • DEACTIVATED, in which all enqueue and dequeue operations immediately return -1 and set errno to ESHUTDOWN until the queue is activated again.

  • PULSED, the transition to which causes waiting enqueue and dequeue operations to return immediately as if the queue were deactivated; however, all operations initiated while in the PULSED state behave as in the ACTIVATED state.

The DEACTIVATED and PULSED states are useful in situations in which it's necessary to notify all producer and consumer threads that some significant event has occurred. Transitioning to either state causes all waiting producer and consumer threads to wake up. The difference between them is the queue's enqueue/dequeue behavior after the transition. In the DEACTIVATED state, all enqueue and dequeue operations fail until the queue's state is changed to ACTIVATED. The PULSED state, however, is behaviorally equivalent to the ACTIVATED state, that is, all enqueue/dequeue operations proceed normally. The PULSED state is mainly informational: an awakened producer/consumer can decide whether to attempt further queue operations by examining the return value from the state() method. The Example portion of Section 7.4 (page 246) illustrates the use of the pulse() method to trigger connection reestablishment.
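A minimal sketch of this three-state protocol follows. It is illustrative only: it models the state-dependent return values but omits the blocking, wakeup, and error-reporting (ESHUTDOWN) machinery of the real class.

```cpp
#include <deque>

// The three queue states described above.
enum State { ACTIVATED, DEACTIVATED, PULSED };

class StatefulQueue {
public:
    State state() const { return state_; }
    void activate()   { state_ = ACTIVATED; }
    void deactivate() { state_ = DEACTIVATED; }  // would also wake waiters
    void pulse()      { state_ = PULSED; }       // wakes waiters, stays usable

    // Operations fail only in the DEACTIVATED state; PULSED behaves
    // like ACTIVATED for newly initiated operations.
    bool enqueue(int v) {
        if (state_ == DEACTIVATED) return false;
        q_.push_back(v);
        return true;
    }

private:
    State state_ = ACTIVATED;  // a queue always starts ACTIVATED
    std::deque<int> q_;
};
```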

No messages are removed from the queue on any state transition. The messages in a queue can be released by the ACE_Message_Queue destructor, close(), or flush() methods. These methods release all the message blocks remaining in the message queue. The flush() method doesn't deactivate the queue, however, whereas the other two methods do. Sidebar 41 describes several protocols for shutting down an ACE_Message_Queue gracefully.

Sidebar 41: ACE_Message_Queue Graceful Shutdown Protocols

To avoid losing queued messages unexpectedly when an ACE_Message_Queue needs to be closed, producer and consumer threads can implement the following protocol:

  1. A producer thread can enqueue a special message, such as a message block whose payload is size 0 and/or whose type is MB_STOP , to indicate that it wants the queue closed.

  2. The consumer thread can close the queue when it receives this shutdown message, after processing any other messages ahead of it in the queue.

A variant of this protocol can use the ACE_Message_Queue::enqueue_prio() method to boost the priority of the shutdown message so it takes precedence over lower-priority messages that may already reside in the queue.
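The basic protocol can be sketched as a "poison pill" consumer loop. MB_STOP is a real ACE_Message_Block message type; the Msg structure and consume() function here are invented for illustration and operate on a plain deque rather than an ACE queue.

```cpp
#include <deque>
#include <string>
#include <vector>

// Message type tags; MB_STOP marks the shutdown message.
enum MsgType { MB_DATA, MB_STOP };
struct Msg { MsgType type; std::string payload; };

// Processes queued messages in order and stops at the shutdown
// message, returning the payloads handled before shutdown.
std::vector<std::string> consume(std::deque<Msg> &q) {
    std::vector<std::string> processed;
    while (!q.empty()) {
        Msg m = q.front();
        q.pop_front();
        if (m.type == MB_STOP) break;   // close the queue here
        processed.push_back(m.payload);
    }
    return processed;
}
```

In the priority variant, the shutdown message would be enqueued at high priority so it jumps ahead of pending lower-priority work rather than waiting at the tail.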

Example

This example shows how ACE_Message_Queue can be used to implement a client logging daemon. As shown in Figure 1.10 (page 21), a client logging daemon runs on every host participating in the networked logging service and performs the following tasks:

  • It uses a local IPC mechanism, such as shared memory, pipes, or loopback sockets, to receive log records from client applications on the client logging daemon's host.

  • It uses a remote IPC mechanism, such as TCP/IP, to forward log records to a server logging daemon running on a designated host.

Our example uses two threads to implement a bounded buffer concurrency model [BA90] based on a synchronized ACE_Message_Queue using the ACE_MT_SYNCH traits class (page 164).

In our client logging daemon, the main thread uses an event handler and the ACE Reactor framework to read log records from sockets connected to client applications via the network loopback device. The event handler queues each log record in the synchronized ACE_Message_Queue . A separate forwarder thread runs concurrently, performing the following steps continuously:

  1. Dequeueing messages from the message queue

  2. Buffering the messages into larger chunks

  3. Forwarding the chunks to the server logging daemon over a TCP connection

By using a synchronized message queue, the main thread can continue to read log records from client applications as long as the message queue isn't full. Overall server concurrency can therefore be enhanced, even if the forwarder thread blocks occasionally when sending log records to the logging server over a flow controlled connection.

The client logging daemon plays multiple roles, including accepting connections from client applications, receiving log records, establishing connections to the logging server, and forwarding log records. We therefore can't implement it by reusing our Reactor_Logging_Server class from the Example portion of Section 3.5. Instead, we define a new group of classes shown in Figure 6.4. The role of each class is outlined below:

Figure 6.4. The Client Logging Daemon Classes

Class

Description

CLD_Handler

Target of callbacks from the ACE_Reactor. Receives log records from clients, converts them into ACE_Message_Blocks, and inserts them into the synchronized message queue, which is processed by a separate thread that forwards the log records to the logging server.

CLD_Acceptor

A factory that passively accepts connections from clients and registers them with the ACE_Reactor to be processed by the CLD_Handler .

CLD_Connector

A factory that actively establishes (and when necessary reestablishes) connections with the logging server.

Client_Logging_Daemon

A facade class that integrates the other three classes.

As shown in Figure 6.4, the classes in this client logging daemon implementation are designed in accordance with the Acceptor-Connector pattern. CLD_Acceptor plays the acceptor role, CLD_Connector plays the connector role, and CLD_Handler plays the service handler role. The relationships between the main thread, the forwarder thread, the classes in Figure 6.4, and the synchronized ACE_Message_Queue that joins them are shown in Figure 6.5 (page 170).

Figure 6.5. Interactions between Objects in the Client Logging Daemon

We start our implementation by including the necessary ACE header files.

 #include "ace/OS.h"
 #include "ace/Event_Handler.h"
 #include "ace/INET_Addr.h"
 #include "ace/Get_Opt.h"
 #include "ace/Log_Record.h"
 #include "ace/Message_Block.h"
 #include "ace/Message_Queue.h"
 #include "ace/Reactor.h"
 #include "ace/Service_Object.h"
 #include "ace/Signal.h"
 #include "ace/Synch.h"
 #include "ace/SOCK_Acceptor.h"
 #include "ace/SOCK_Connector.h"
 #include "ace/SOCK_Stream.h"
 #include "ace/Thread_Manager.h"
 #include "Logging_Acceptor.h"
 #include "CLD_export.h"

Each class in Figure 6.4 is defined in the Client_Logging_Daemon.cpp file and described below.

CLD_Handler. This class provides the following capabilities:

  • It receives log records from clients.

  • It converts the log records into ACE_Message_Block s.

  • It enqueues the message blocks in a synchronized message queue.

  • It runs a separate thread that dequeues the message blocks and forwards them to the logging server in large chunks.

The CLD_Handler class is shown below:

 #if !defined (FLUSH_TIMEOUT)
 #define FLUSH_TIMEOUT 120 /* 120 seconds == 2 minutes. */
 #endif /* FLUSH_TIMEOUT */

 class CLD_Handler : public ACE_Event_Handler {
 public:
   enum { QUEUE_MAX = sizeof (ACE_Log_Record) * ACE_IOV_MAX };

   // Initialization hook method.
   virtual int open (CLD_Connector *);

   // Shut down hook method.
   virtual int close ();

   // Accessor to the connection to the logging server.
   virtual ACE_SOCK_Stream &peer () { return peer_; }

   // Reactor hook methods.
   virtual int handle_input (ACE_HANDLE handle);
   virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
                             ACE_Reactor_Mask = 0);

 protected:
   // Forward log records to the server logging daemon.
   virtual ACE_THR_FUNC_RETURN forward ();

   // Send buffered log records using a gather-write operation.
   virtual int send (ACE_Message_Block *chunk[], size_t count);

   // Entry point into forwarder thread of control.
   static ACE_THR_FUNC_RETURN run_svc (void *arg);

   // A synchronized <ACE_Message_Queue> that queues messages.
   ACE_Message_Queue<ACE_MT_SYNCH> msg_queue_;

   // Manage the forwarder thread.
   ACE_Thread_Manager thr_mgr_;

   // Pointer to our <CLD_Connector>.
   CLD_Connector *connector_;

   // Connection to the logging server.
   ACE_SOCK_Stream peer_;
 };

There's no need for a constructor or destructor in CLD_Handler since its open() and close() hooks perform initialization and destruction activities when called by the CLD_Acceptor factory class (page 176). CLD_Handler performs two roles, input and output, which are explained below.

  • Input role. Since CLD_Handler inherits from ACE_Event_Handler, it can use the ACE Reactor framework to wait for log records to arrive from client applications connected to the client logging daemon via loopback TCP sockets. When a log record arrives at the client logging daemon, the singleton ACE_Reactor dispatches the following CLD_Handler::handle_input() hook method:

      1 int CLD_Handler::handle_input (ACE_HANDLE handle) {
      2   ACE_Message_Block *mblk = 0;
      3   Logging_Handler logging_handler (handle);
      4
      5   if (logging_handler.recv_log_record (mblk) != -1)
      6     if (msg_queue_.enqueue_tail (mblk->cont ()) != -1) {
      7       mblk->cont (0);
      8       mblk->release ();
      9       return 0; // Success return.
     10     } else mblk->release ();
     11   return -1; // Error return.
     12 }

Lines 3-5 Use the Logging_Handler from Chapter 4 of C++NPv1 to read a log record out of the socket handle parameter and store the record in an ACE_Message_Block.

Lines 6–8 Insert the message into the synchronized queue serviced by the forwarder thread. The server logging daemon expects to receive a marshaled log record, but not a hostname string, so we enqueue only the log record data (which is referenced by mblk->cont() ) and not the hostname (which is referenced by mblk ). When the server logging daemon receives the log record, it prepends the name of the client logging host, just as this client logging daemon recorded the name of the logging client. If the enqueue_tail() call succeeds, the continuation field is set to NULL so that mblk->release() reclaims only the message block that stores the hostname.

Lines 10–11 If a client application disconnects the TCP connection or an error occurs, the handle_input() hook method returns -1 (Sidebar 41 on page 61 discusses strategies for handling peers that simply stop communicating). This value triggers the reactor to call the following handle_close() hook method that closes the socket:

 int CLD_Handler::handle_close (ACE_HANDLE handle,
                                ACE_Reactor_Mask)
 { return ACE_OS::closesocket (handle); }

Note that we needn't delete this object in handle_close() since the memory is managed by the Client_Logging_Daemon class (page 180).
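The continuation-chain handling in handle_input() is easy to get wrong, so the following standalone sketch (plain C++, not ACE; Block, enqueue_and_release(), and the release() behavior are illustrative stand-ins for the ACE_Message_Block semantics) shows why the cont(0) step matters: release() reclaims a block and everything chained after it, so the log record must be detached before the hostname block is released.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for ACE_Message_Block: a payload plus a
// continuation pointer to the next block in the chain.
struct Block {
  const char *data;
  Block *cont = nullptr;   // continuation, like ACE_Message_Block::cont()

  // Free this block and every block chained after it, mirroring
  // ACE_Message_Block::release() on a continuation chain.
  static std::size_t release (Block *b) {
    std::size_t freed = 0;
    while (b) { Block *next = b->cont; delete b; b = next; ++freed; }
    return freed;
  }
};

// Model the handle_input() sequence: enqueue the continuation, detach
// it, then release the head. Returns how many blocks the head release
// reclaimed (should be 1: just the hostname block).
std::size_t enqueue_and_release () {
  Block *record = new Block{"log record"};
  Block *head = new Block{"hostname", record};
  Block *queued = head->cont;               // "enqueue" the record
  head->cont = nullptr;                     // the crucial detach step
  std::size_t freed = Block::release (head);   // frees only the hostname
  Block::release (queued);                  // consumer frees the record later
  return freed;
}
```

Without the detach step, releasing the head would also reclaim the record that was just enqueued, leaving the consumer thread with a dangling pointer.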

  • Output role. The CLD_Handler object is initialized when CLD_Connector's connect() method (page 179) calls the following open() hook method:

 1 int CLD_Handler::open (CLD_Connector *connector) {
 2   connector_ = connector;
 3   int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
 4   peer ().set_option (SOL_SOCKET, SO_SNDBUF,
 5                       &bufsiz, sizeof bufsiz);
 6   msg_queue_.high_water_mark (CLD_Handler::QUEUE_MAX);
 7   return thr_mgr_.spawn (&CLD_Handler::run_svc,
 8                          this, THR_SCOPE_SYSTEM);
 9 }

Lines 2–5 Store a pointer to the CLD_Connector and increase the peer() socket send buffer to its largest size to maximize throughput over long-delay and/or high-speed networks.

Line 6 Set the msg_queue_'s high watermark to sizeof(ACE_Log_Record) * ACE_IOV_MAX . Since log records are buffered in groups of up to ACE_IOV_MAX before being sent to the server logging daemon, the queue's high watermark is set to accommodate at least ACE_IOV_MAX log records. Although the queued log records will be CDR-marshaled, using the maximum-sized demarshaled log record ( ACE_Log_Record ) is a fair approximation.

Lines 7–8 Use the ACE_Thread_Manager from Chapter 9 of C++NPv1 to spawn a system-scoped thread that executes the CLD_Handler::run_svc() static method concurrently with respect to the main thread. [1] The run_svc() static method casts its void pointer argument to a CLD_Handler pointer and then delegates its processing to the forward() method, as shown below:

[1] Since the CLD_Handler::close() method (page 176) waits for this thread to exit, we needn't pass the THR_DETACHED flag to spawn() .

 ACE_THR_FUNC_RETURN CLD_Handler::run_svc (void *arg) {
   CLD_Handler *handler = ACE_static_cast (CLD_Handler *, arg);
   return handler->forward ();
 }

We now show the CLD_Handler::forward() method, which runs in its own thread and forwards log records to the server logging daemon. As shown below, this method optimizes network throughput by buffering log records until a maximum number have arrived or a maximum time elapses.

  1 ACE_THR_FUNC_RETURN CLD_Handler::forward () {
  2   ACE_Message_Block *chunk[ACE_IOV_MAX];
  3   size_t message_index = 0;
  4   ACE_Time_Value time_of_last_send (ACE_OS::gettimeofday ());
  5   ACE_Time_Value timeout;
  6   ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
  7   ACE_Sig_Action original_action;
  8   no_sigpipe.register_action (SIGPIPE, &original_action);
  9
 10   for (;;) {
 11     if (message_index == 0) {
 12       timeout = ACE_OS::gettimeofday ();
 13       timeout += FLUSH_TIMEOUT;
 14     }
 15     ACE_Message_Block *mblk = 0;
 16     if (msg_queue_.dequeue_head (mblk, &timeout) == -1) {
 17       if (errno != EWOULDBLOCK) break;
 18       else if (message_index == 0) continue;
 19     } else {
 20       if (mblk->size () == 0
 21           && mblk->msg_type () == ACE_Message_Block::MB_STOP)
 22         { mblk->release (); break; }
 23       chunk[message_index] = mblk;
 24       ++message_index;
 25     }
 26     if (message_index >= ACE_IOV_MAX
 27         || (ACE_OS::gettimeofday () - time_of_last_send
 28             >= FLUSH_TIMEOUT)) {
 29       if (send (chunk, message_index) == -1) break;
 30       time_of_last_send = ACE_OS::gettimeofday ();
 31     }
 32   }
 33
 34   if (message_index > 0) send (chunk, message_index);
 35   msg_queue_.close ();
 36   no_sigpipe.restore_action (SIGPIPE, original_action);
 37   return 0;
 38 }

Lines 2–5 We buffer as many blocks as can be sent in a single gather-write operation. We therefore declare an array of ACE_Message_Block pointers to hold pointers to the blocks that will be dequeued. We also define an index to keep track of the number of buffered records and ACE_Time_Value objects to record the last time a log record was sent and the next flush timeout. These ACE_Time_Value objects are used to bound the amount of time log records are buffered before they're transmitted to the logging server.

Lines 6–8 UNIX systems will raise the SIGPIPE signal if send() fails due to the peer closing the connection. The default behavior for SIGPIPE is to abort the process. We use the ACE_Sig_Action class to ignore the SIGPIPE signal in this method, which allows us to handle any send() failures in the normal (nonsignal) execution path.
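On POSIX platforms, ACE_Sig_Action is a thin wrapper around sigaction(). The following sketch (a hypothetical Scoped_Ignore_Sigpipe class, not part of ACE) shows the same save/ignore/restore protocol written directly against the underlying API:

```cpp
#include <cassert>
#include <csignal>

// Install SIG_IGN for SIGPIPE on construction, saving the prior
// disposition; restore that disposition on destruction. This mirrors
// what the register_action()/restore_action() pair above accomplishes.
class Scoped_Ignore_Sigpipe {
public:
  Scoped_Ignore_Sigpipe () {
    struct sigaction ignore;
    ignore.sa_handler = SIG_IGN;
    sigemptyset (&ignore.sa_mask);
    ignore.sa_flags = 0;
    sigaction (SIGPIPE, &ignore, &original_);   // save original action
  }
  ~Scoped_Ignore_Sigpipe () {
    sigaction (SIGPIPE, &original_, 0);         // restore original action
  }
private:
  struct sigaction original_;
};
```

Tying the restore step to a destructor also guards against early returns skipping it, a hazard the explicit restore_action() call in forward() must avoid by hand.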

Line 10 This loop will run until the message queue is deactivated.

Lines 11–14 Reset the flush timeout on the first iteration, after a timeout, and after message blocks are forwarded.

Lines 15–16 Wait up to the next flush timeout to dequeue a pointer to the next ACE_Message_Block from the msg_queue_ .

Lines 17–18 If the dequeue operation failed, and it wasn't because of a timeout, break out of the loop. If it was a timeout, but there are no buffered log records, just continue the loop waiting to dequeue a message block.

Lines 19–22 After successfully dequeueing a message block, we first check to see if its size() is 0 and its type is MB_STOP . By convention, this application uses this type of message block to request the thread to shut down. When we receive the shutdown message, we release the message block and break out of the loop.

Lines 23–24 We store the log record block in the next available slot in the chunk array and increment the count of saved message blocks.

Lines 26–30 Whenever the buffer fills up or the FLUSH_TIMEOUT elapses, call the CLD_Handler::send() method (see below) to flush the buffered log records in one gather-write operation. The send() method releases all the saved message blocks and resets message_index to reflect the fact that there are no valid blocks remaining. If the records were sent successfully, we record the time when the records were sent.

Line 34 If dequeue_head() fails or a shutdown message was received, any remaining buffered log records are flushed by a call to CLD_Handler::send() .

Line 35 Close the message queue to release its resources.

Line 36 Restore the disposition of the SIGPIPE signal to its original action before the forward() method was called.

Line 37 Return from the CLD_Handler::forward() method, which exits the thread.
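The heart of forward() is a timed dequeue against an absolute flush deadline, batching until either the buffer fills or the deadline passes with data pending. That logic can be sketched portably with standard C++ synchronization primitives (the Batching_Queue class and all its names are illustrative, not ACE APIs):

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// A synchronized queue whose consumer collects batches, analogous to
// the dequeue_head()-with-timeout loop in forward().
class Batching_Queue {
public:
  explicit Batching_Queue (std::size_t max_batch) : max_batch_ (max_batch) {}

  void enqueue (int msg) {
    std::lock_guard<std::mutex> g (lock_);
    queue_.push_back (msg);
    not_empty_.notify_one ();
  }

  // Return a batch when max_batch messages are buffered, or when the
  // flush timeout elapses with at least one message buffered.
  std::vector<int> collect (std::chrono::milliseconds flush_timeout) {
    std::vector<int> batch;
    auto deadline = std::chrono::steady_clock::now () + flush_timeout;
    std::unique_lock<std::mutex> g (lock_);
    while (batch.size () < max_batch_) {
      // Like dequeue_head() with an absolute ACE_Time_Value: returns
      // false when the deadline passes with the queue still empty.
      if (!not_empty_.wait_until (g, deadline,
                                  [this] { return !queue_.empty (); })) {
        if (batch.empty ()) {   // timeout with nothing buffered: rearm
          deadline = std::chrono::steady_clock::now () + flush_timeout;
          continue;
        }
        break;                  // timeout with buffered data: flush now
      }
      batch.push_back (queue_.front ());
      queue_.pop_front ();
    }
    return batch;
  }

private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::deque<int> queue_;
  std::size_t max_batch_;
};
```

Note that the deadline is absolute, as in forward(): each wakeup waits only for the *remaining* time, so a stream of messages can't postpone a flush indefinitely.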

The CLD_Handler::send() method sends the buffered log records to the logging server. It's also responsible for reconnecting to the server if the connection is closed.

  1 int CLD_Handler::send (ACE_Message_Block *chunk[],
  2                        size_t &count) {
  3   iovec iov[ACE_IOV_MAX];
  4   size_t iov_size;
  5   int result = 0;
  6
  7   for (iov_size = 0; iov_size < count; ++iov_size) {
  8     iov[iov_size].iov_base = chunk[iov_size]->rd_ptr ();
  9     iov[iov_size].iov_len = chunk[iov_size]->length ();
 10   }
 11
 12   while (peer ().sendv_n (iov, iov_size) == -1)
 13     if (connector_->reconnect () == -1) {
 14       result = -1;
 15       break;
 16     }
 17
 18   while (iov_size > 0) {
 19     chunk[--iov_size]->release (); chunk[iov_size] = 0;
 20   }
 21   count = iov_size;
 22   return result;
 23 }

Lines 3–9 To set up the gather-write operation, we gather the data pointers and lengths from the supplied message blocks into an iovec array.

Lines 12–16 The ACE_SOCK_Stream::sendv_n() method flushes the buffered log records in one gather-write operation. If sendv_n() fails due to a broken connection, we attempt to reestablish the connection using the CLD_Connector::reconnect() method. If reconnect() succeeds, sendv_n() is reinvoked. As shown on page 179, CLD_Connector::reconnect() tries up to MAX_RETRIES times to reestablish connections.

All the data is sent on each iteration of the while loop. Since there's no application-level transaction monitoring, there's no end-to-end acknowledgment that the transmitted log records were received and recorded. The sendv_n() can pass back the number of bytes successfully sent to the TCP layer for transmission, but offers no guarantee that any of the bytes were received at the peer host or read by the server logging daemon. Since there's no reliable way to tell how many log records were received and recorded, and logging a record multiple times does no harm, they are all re-sent if the connection must be reestablished.

Lines 18–21 The final loop in the method releases all the log record data and sets all the ACE_Message_Block pointers to 0. To reflect the fact that there are no valid log records remaining in chunk , we reset count to 0.
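The gather-write technique that sendv_n() builds on is the POSIX writev() call, which transmits several noncontiguous buffers in a single system call. Here is a minimal, self-contained demonstration over a pipe (POSIX-only; gather_write_demo() is an illustrative name, not an ACE or book API):

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <sys/uio.h>
#include <unistd.h>

// Write three separate buffers with one writev() call, then read the
// result back from the other end of the pipe.
std::string gather_write_demo () {
  int fds[2];
  if (pipe (fds) == -1) return "";
  const char *chunks[] = { "one ", "two ", "three" };
  iovec iov[3];
  for (int i = 0; i < 3; ++i) {
    iov[i].iov_base = const_cast<char *> (chunks[i]);
    iov[i].iov_len = std::strlen (chunks[i]);
  }
  ssize_t n = writev (fds[1], iov, 3);   // one system call, three buffers
  char buf[64] = {};
  if (n > 0) {
    ssize_t r = read (fds[0], buf, sizeof buf);
    (void) r;
  }
  close (fds[0]); close (fds[1]);
  return std::string (buf);
}
```

The payoff is the same as in CLD_Handler::send(): the buffered log records never need to be copied into one contiguous buffer before transmission.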

CLD_Handler::close() is a public method called by the CLD_Acceptor::handle_close() method (page 178) or the Client_Logging_Daemon::fini() method (page 181) to shut the handler down. It inserts a 0-sized message of type MB_STOP into the message queue as follows:

 int CLD_Handler::close () {
   ACE_Message_Block *shutdown_message = 0;
   ACE_NEW_RETURN
     (shutdown_message,
      ACE_Message_Block (0, ACE_Message_Block::MB_STOP), -1);
   msg_queue_.enqueue_tail (shutdown_message);
   return thr_mgr_.wait ();
 }

When the forwarder thread receives the shutdown_message , it flushes its remaining log records to the logging server, closes the message queue, and exits the thread. We use the ACE_Thread_Manager::wait() method to block until the forwarder thread exits before returning. This method also reaps the exit status of the forwarder thread to prevent memory leaks.
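This poison-pill shutdown protocol can be sketched with standard C++ threads as well (the Stoppable_Worker class is a hypothetical stand-in, with an integer sentinel playing the role of the 0-sized MB_STOP block):

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// A worker that consumes integers until it sees the STOP sentinel.
// close() enqueues the sentinel and joins the thread, the analogue of
// enqueue_tail(shutdown_message) + ACE_Thread_Manager::wait().
class Stoppable_Worker {
public:
  Stoppable_Worker () : thread_ ([this] { run (); }) {}

  void enqueue (int msg) {
    { std::lock_guard<std::mutex> g (lock_); queue_.push_back (msg); }
    ready_.notify_one ();
  }

  // Enqueue the poison pill, wait for the worker to drain the queue
  // and exit, then report how many real messages were processed.
  int close () {
    enqueue (STOP);
    thread_.join ();
    return processed_;
  }

private:
  enum { STOP = -1 };

  void run () {
    for (;;) {
      int msg;
      {
        std::unique_lock<std::mutex> g (lock_);
        ready_.wait (g, [this] { return !queue_.empty (); });
        msg = queue_.front (); queue_.pop_front ();
      }
      if (msg == STOP) break;   // the MB_STOP analogue
      ++processed_;             // "forward" the message
    }
  }

  std::mutex lock_;
  std::condition_variable ready_;
  std::deque<int> queue_;
  int processed_ = 0;
  std::thread thread_;   // declared last so members exist before run() starts
};
```

Because the sentinel travels through the same FIFO queue as the data, the worker is guaranteed to process every message enqueued before the shutdown request, just as the forwarder thread flushes its remaining log records.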

CLD_Acceptor. This class provides the following capabilities:

  • It's a factory that passively accepts connections from clients.

  • It then registers the connections with the ACE_Reactor . Log records sent from clients over the connections are then processed by an instance of the CLD_Handler shown above.

The CLD_Acceptor class definition is shown below:

 class CLD_Acceptor : public ACE_Event_Handler {
 public:
   // Initialization hook method.
   virtual int open (CLD_Handler *, const ACE_INET_Addr &,
                     ACE_Reactor * = ACE_Reactor::instance ());

   // Reactor hook methods.
   virtual int handle_input (ACE_HANDLE handle);
   virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
                             ACE_Reactor_Mask = 0);
   virtual ACE_HANDLE get_handle () const;

 protected:
   // Factory that passively connects <ACE_SOCK_Stream>s.
   ACE_SOCK_Acceptor acceptor_;

   // Pointer to the handler of log records.
   CLD_Handler *handler_;
 };

Since CLD_Acceptor inherits from ACE_Event_Handler , it can register itself with the ACE Reactor framework to accept connections, as shown below:

 int CLD_Acceptor::open (CLD_Handler *handler,
                         const ACE_INET_Addr &local_addr,
                         ACE_Reactor *r) {
   reactor (r); // Store the reactor pointer.
   handler_ = handler;
   if (acceptor_.open (local_addr) == -1
       || reactor ()->register_handler
            (this, ACE_Event_Handler::ACCEPT_MASK) == -1)
     return -1;
   return 0;
 }

This method instructs the acceptor_ to start listening for connection requests and then registers this object with the singleton reactor to accept new connections. The reactor double-dispatches to the following CLD_Acceptor::get_handle() method to obtain the acceptor_'s socket handle:

 ACE_HANDLE CLD_Acceptor::get_handle () const
 { return acceptor_.get_handle (); }

When a connection request arrives at the client logging daemon, the singleton reactor dispatches the following CLD_Acceptor::handle_input() hook method:

  1 int CLD_Acceptor::handle_input (ACE_HANDLE) {
  2   ACE_SOCK_Stream peer_stream;
  3   if (acceptor_.accept (peer_stream) == -1) return -1;
  4   else if (reactor ()->register_handler
  5              (peer_stream.get_handle (),
  6               handler_,
  7               ACE_Event_Handler::READ_MASK) == -1)
  8     return -1;
  9   else return 0;
 10 }

Lines 2–3 Accept the connection into peer_stream , a local object whose only role is to receive the new socket handle that's then registered with the reactor. It therefore needn't exist after handle_input() returns. The destructor of ACE_SOCK_Stream doesn't shut down the handle automatically, for the reasons described in Chapter 3 of C++NPv1.

Lines 4–7 Use the three-parameter variant of register_handler() (page 73) to register a pointer to the CLD_Handler with the reactor for READ events. This register_handler() method enables the client logging daemon to reuse a single C++ object for all of its logging handlers. When log records arrive, the reactor will dispatch the CLD_Handler::handle_input() method (page 171).

The following handle_close() method is invoked automatically by the reactor if a failure occurs while accepting a connection or registering a handle and event handler:

 int CLD_Acceptor::handle_close (ACE_HANDLE, ACE_Reactor_Mask) {
   acceptor_.close ();
   handler_->close ();
   return 0;
 }

This method closes both the acceptor factory and the CLD_Handler . CLD_Handler::close() (page 176) triggers a shutdown of the message queue and forwarder thread.

CLD_Connector. This class provides the following capabilities:

  • It's a factory that actively establishes a connection from the client logging daemon to the server logging daemon.

  • It activates the instance of CLD_Handler to forward log records concurrently to the logging server.

The CLD_Connector class definition is shown below:

 class CLD_Connector {
 public:
   // Establish a connection to the logging server
   // at the <remote_addr>.
   int connect (CLD_Handler *handler,
                const ACE_INET_Addr &remote_addr);

   // Re-establish a connection to the logging server.
   int reconnect ();

 private:
   // Pointer to the <CLD_Handler> that we're connecting.
   CLD_Handler *handler_;

   // Address at which the logging server is listening
   // for connections.
   ACE_INET_Addr remote_addr_;
 };

The connect() method is shown below:

  1 int CLD_Connector::connect
  2     (CLD_Handler *handler,
  3      const ACE_INET_Addr &remote_addr) {
  4   ACE_SOCK_Connector connector;
  5
  6   if (connector.connect (handler->peer (), remote_addr) == -1)
  7     return -1;
  8   else if (handler->open (this) == -1)
  9   { handler->handle_close (); return -1; }
 10   handler_ = handler;
 11   remote_addr_ = remote_addr;
 12   return 0;
 13 }

Lines 4–6 Use the ACE Socket wrapper facades from Chapter 4 of C++NPv1 to establish a TCP connection with the server logging daemon.

Lines 8–9 Activate the CLD_Handler by invoking its open() hook method (page 172). If successful, this method spawns a thread that runs the CLD_Handler::forward() method to forward log records to the logging server. If open() fails, however, we call the handle_close() method on the handler to close the socket.

Lines 10–11 Store the handler and the remote address to simplify the implementation of the CLD_Connector::reconnect() method, which is used to reconnect to the logging server when it closes client connections, either due to a crash or due to the Evictor pattern (page 66). As shown below, the reconnect() method uses an exponential backoff algorithm to avoid swamping a logging server with connection requests:

 int CLD_Connector::reconnect () {
   // Maximum number of times to retry connect.
   const size_t MAX_RETRIES = 5;
   ACE_SOCK_Connector connector;
   ACE_Time_Value timeout (1); // Start with 1 second timeout.
   size_t i;
   for (i = 0; i < MAX_RETRIES; ++i) {
     if (i > 0) ACE_OS::sleep (timeout);
     if (connector.connect (handler_->peer (), remote_addr_,
                            &timeout) == -1)
       timeout *= 2; // Exponential backoff.
     else {
       int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
       handler_->peer ().set_option (SOL_SOCKET, SO_SNDBUF,
                                     &bufsiz, sizeof bufsiz);
       break;
     }
   }
   return i == MAX_RETRIES ? -1 : 0;
 }

As earlier, we increase the peer() socket send buffer to its largest size to maximize throughput over long-delay and/or high-speed networks.
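To make the retry schedule concrete, this small helper (hypothetical, not part of ACE or the book) computes the delays reconnect() sleeps between attempts, assuming the same 1-second initial timeout and doubling-on-failure policy. Since the first attempt happens without a sleep, five all-failing attempts sleep 2, 4, 8, and 16 seconds between tries:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Return the inter-attempt sleep durations (in seconds) for a run of
// max_retries consecutive connect failures, mirroring the loop above:
// no sleep before attempt 0, and the timeout doubles after each failure.
std::vector<int> backoff_delays (std::size_t max_retries) {
  std::vector<int> delays;
  int timeout = 1;                 // start with a 1-second timeout
  for (std::size_t i = 0; i < max_retries; ++i) {
    if (i > 0) delays.push_back (timeout);
    timeout *= 2;                  // exponential backoff
  }
  return delays;
}
```

Doubling keeps a crashed server from being hammered while still reconnecting quickly after a brief outage; production systems often also add random jitter so many clients don't retry in lockstep, though reconnect() does not.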

Client_Logging_Daemon. This class is a facade that integrates the three classes described above to implement the client logging daemon. Its definition is shown below.

 class Client_Logging_Daemon : public ACE_Service_Object {
 public:
   // Service Configurator hook methods.
   virtual int init (int argc, ACE_TCHAR *argv[]);
   virtual int fini ();
   virtual int info (ACE_TCHAR **bufferp, size_t length = 0) const;
   virtual int suspend ();
   virtual int resume ();

 protected:
   // Receives, processes, and forwards log records.
   CLD_Handler handler_;

   // Factory that passively connects the <CLD_Handler>.
   CLD_Acceptor acceptor_;

   // Factory that actively connects the <CLD_Handler>.
   CLD_Connector connector_;
 };

Client_Logging_Daemon inherits from ACE_Service_Object . It can therefore be configured dynamically via a svc.conf file processed by the ACE Service Configurator framework described in Chapter 5. When an instance of Client_Logging_Daemon is linked dynamically, the ACE Service Configurator framework calls Client_Logging_Daemon::init() , which is shown below:

  1 int Client_Logging_Daemon::init (int argc, ACE_TCHAR *argv[]) {
  2   u_short cld_port = ACE_DEFAULT_SERVICE_PORT;
  3   u_short sld_port = ACE_DEFAULT_LOGGING_SERVER_PORT;
  4   ACE_TCHAR sld_host[MAXHOSTNAMELEN];
  5   ACE_OS_String::strcpy (sld_host, ACE_LOCALHOST);
  6
  7   ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:r:s:"), 0);
  8   get_opt.long_option (ACE_TEXT ("client_port"), 'p',
  9                        ACE_Get_Opt::ARG_REQUIRED);
 10   get_opt.long_option (ACE_TEXT ("server_port"), 'r',
 11                        ACE_Get_Opt::ARG_REQUIRED);
 12   get_opt.long_option (ACE_TEXT ("server_name"), 's',
 13                        ACE_Get_Opt::ARG_REQUIRED);
 14
 15   for (int c; (c = get_opt ()) != -1;)
 16     switch (c) {
 17     case 'p': // Client logging daemon acceptor port number.
 18       cld_port = ACE_static_cast
 19         (u_short, ACE_OS::atoi (get_opt.opt_arg ()));
 20       break;
 21     case 'r': // Server logging daemon acceptor port number.
 22       sld_port = ACE_static_cast
 23         (u_short, ACE_OS::atoi (get_opt.opt_arg ()));
 24       break;
 25     case 's': // Server logging daemon hostname.
 26       ACE_OS_String::strsncpy
 27         (sld_host, get_opt.opt_arg (), MAXHOSTNAMELEN);
 28       break;
 29     }
 30
 31   ACE_INET_Addr cld_addr (cld_port);
 32   ACE_INET_Addr sld_addr (sld_port, sld_host);
 33
 34   if (acceptor_.open (&handler_, cld_addr) == -1)
 35     return -1;
 36   else if (connector_.connect (&handler_, sld_addr) == -1)
 37   { acceptor_.handle_close (); return -1; }
 38   return 0;
 39 }

Lines 2–5 Assign the default client logging daemon listen port ( cld_port ) and the default server logging daemon port ( sld_port ) and hostname ( sld_host ). These network addresses can be changed by arguments passed into this method. In particular, the server logging daemon hostname will often need to be set using the -s option.

Lines 7–29 Use the ACE_Get_Opt iterator described in Sidebar 8 (page 47) to parse any options passed by the svc.conf file. The final parameter of 0 to ACE_Get_Opt ensures option parsing will begin at argv[0] rather than argv[1] , which is the default. If any of the "-p" , "-r" , or "-s" options, or their long option equivalents, are passed in the argv parameter to init() , the appropriate port number or hostname is modified accordingly.
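For readers without ACE at hand, the same parsing logic can be sketched with POSIX getopt() (long options omitted; the Options struct, parse_options(), and the default values below are placeholders, not the real ACE_DEFAULT_* constants):

```cpp
#include <cassert>
#include <cstdlib>
#include <string>
#include <unistd.h>

// Placeholder defaults standing in for ACE_DEFAULT_SERVICE_PORT,
// ACE_DEFAULT_LOGGING_SERVER_PORT, and ACE_LOCALHOST.
struct Options {
  unsigned short cld_port = 20000;
  unsigned short sld_port = 20001;
  std::string sld_host = "localhost";
};

// Parse -p, -r, and -s the way init() does, leaving any option that
// isn't supplied at its default.
Options parse_options (int argc, char *argv[]) {
  Options opts;
  optind = 1;   // reset getopt's global scan state
  for (int c; (c = getopt (argc, argv, "p:r:s:")) != -1;)
    switch (c) {
    case 'p':   // client logging daemon acceptor port number
      opts.cld_port = (unsigned short) std::atoi (optarg); break;
    case 'r':   // server logging daemon acceptor port number
      opts.sld_port = (unsigned short) std::atoi (optarg); break;
    case 's':   // server logging daemon hostname
      opts.sld_host = optarg; break;
    }
  return opts;
}
```

Unlike ACE_Get_Opt with its trailing 0 argument, getopt() always starts at argv[1], so a caller translating a svc.conf argument string would need to supply a dummy program name in argv[0].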

Lines 31–32 With the port numbers and server logging daemon's hostname now known, form the addresses needed to establish connections.

Lines 34–37 Initialize the acceptor_ (page 177) and connector_ (page 179).

When the client logging daemon is removed, the ACE Service Configurator framework calls the following Client_Logging_Daemon::fini() hook method:

 int Client_Logging_Daemon::fini () {
   acceptor_.handle_close ();
   handler_.close ();
   return 0;
 }

This fini() method closes the ACE_SOCK_Acceptor socket factory and the CLD_Handler , which triggers a shutdown of the message queue and forwarder thread. The ACE Service Configurator framework will delete the Client_Logging_Daemon instance after fini() returns.

Now that we've implemented all the client logging daemon's classes, we place the following ACE_FACTORY_DEFINE macro from Sidebar 32 (page 136) in the implementation file: [2]

[2] We leave the suspend() , resume() , and info() hook methods as an exercise for the reader.

 ACE_FACTORY_DEFINE (CLD, Client_Logging_Daemon) 

This macro defines the _make_Client_Logging_Daemon() factory function, which is used in the following svc.conf file:

 dynamic Client_Logging_Daemon Service_Object *
 CLD:_make_Client_Logging_Daemon() "-p $CLIENT_LOGGING_DAEMON_PORT"

This file directs the ACE Service Configurator framework to configure the client logging daemon via the following steps:

  1. It dynamically links the CLD DLL into the address space of the process.

  2. It uses the ACE_DLL class described in Sidebar 33 (page 143) to extract the _make_Client_Logging_Daemon() factory function address from the CLD DLL symbol table.

  3. The factory function is called to obtain a pointer to a dynamically allocated Client_Logging_Daemon object.

  4. The Service Configurator framework then calls Client_Logging_Daemon::init() on this new object, passing as its argc / argv argument the string "-p" followed by an expansion of the CLIENT_LOGGING_DAEMON_PORT environment variable designating the port number where the client logging daemon listens for client application connection requests. The "-r" and "s" options can also be passed in this manner.

  5. If init() succeeds, the Client_Logging_Daemon pointer is stored in the ACE_Service_Repository under the name "Client_Logging_Daemon" .

Rather than write a new main() program, we reuse the one from Configurable_Logging_Server (page 147). The svc.conf file above simply configures in the Client Logging Daemon service when the program starts. The Example portion of Section 7.4 shows how the ACE Acceptor-Connector framework can be used to further simplify and enhance the multithreaded client logging daemon implementation shown above.
