9.2 The ACE_Module Class


Motivation

Many networked applications can be modeled as an ordered series of processing layers that are related hierarchically and that exchange messages between adjacent layers. For example, kernel-level [Rit84, Rag93] and user-level [SS95b, HJE95] protocol stacks, call center managers [SS94], and other families of networked applications can benefit from a message-passing design based on a layered/modular service architecture. As discussed in Section 2.1.4, each layer can handle a self-contained portion (such as input or output, event analysis, event filtering, or service processing) of a service or networked application.

The ACE_Task class provides a reusable component that can easily be used to separate processing into stages and pass data between them. Since ACE_Task objects are independent, however, additional structure is required to order ACE_Task objects into bidirectional "reader-writer" pairs that can be assembled and managed as a unit. Redeveloping this structure in multiple projects is tedious and unnecessary because the structure is fundamentally application independent. To avoid this redundant development effort, the ACE Streams framework defines the ACE_Module class.

Class Capabilities

ACE_Module defines a distinct layer of application-defined functionality. This class provides the following capabilities:

  • Each ACE_Module is a bidirectional application-defined processing layer containing a pair of reader and writer tasks that derive from ACE_Task. Layered designs can be expressed easily using ACE_Module, which simplifies development, training, and evolution.

  • The ACE Service Configurator framework supports dynamic construction of ACE_Module objects that can be configured into an ACE_Stream at run time. Layered designs based on ACE_Module are therefore highly extensible.

  • The reader and writer ACE_Task objects contained in an ACE_Module collaborate with adjacent ACE_Task objects by passing messages via a public hook method, which promotes loose coupling and simplifies reconfiguration.

  • The objects composed into an ACE_Module can be varied and replaced independently, which lowers maintenance and enhancement costs.

The interface for ACE_Module is shown in Figure 9.2 and its key methods are shown in the following table:

Figure 9.2. The ACE_Module Class

Method                    Description
ACE_Module(), open()      Initialize a module and allocate its resources.
~ACE_Module(), close()    Destroy a module and release its resources.
reader(), writer()        Set/get the reader- and writer-side tasks.
name()                    Set/get the name of the module.

The ACE_Task class described in Section 6.3 provides an object-oriented processing abstraction that can be specialized to target any particular application domain, such as network protocol stacks [SW95] or customer care call center management [SS94]. The ACE_Task message passing and queueing mechanism provides a straightforward way to divide a domain's work into distinct steps and move work and data between them efficiently and bidirectionally. Many domains have symmetric processing steps for data being read and written. For example, protocol processing often involves symmetric tasks for verifying and applying security transforms, such as encryption. An ACE_Module provides a uniform and flexible composition mechanism that relates instances of these application-defined ACE_Task objects into

  • A "reader" side to process messages sent upstream to the ACE_Module layer

  • A "writer" side to process messages sent downstream to the ACE_Module layer

The two tasks that comprise a bidirectional module are referred to as siblings. In the encryption example, for instance, the reader task would verify and decrypt received data while its sibling writer task would encrypt data before it's written. An ACE_Module would be composed from one of each of these tasks, and configured into the stream appropriately.

In cases where a layer actively processes data in only one direction, the inactive sibling task can be specified as a NULL pointer, which triggers the ACE_Module to install an ACE_Thru_Task that simply forwards all messages to the next module's task without modifying them. This design maintains the layering even when layers are rearranged, added, or removed.
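The substitution of a pass-through task for a NULL sibling can be sketched in a few lines of standard C++. The Task, ThruTask, BracketTask, and Module types below are hypothetical stand-ins written for this illustration; they are not the ACE classes and only mimic the behavior described above.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-ins for illustration only; NOT the ACE classes.
struct Task {
  virtual ~Task() = default;
  // Process a message and return what is handed to the next layer.
  virtual std::string put(const std::string &msg) = 0;
};

// Forwards every message unchanged (the role a thru task plays).
struct ThruTask : Task {
  std::string put(const std::string &msg) override { return msg; }
};

// Example application-defined task: wraps the message in brackets.
struct BracketTask : Task {
  std::string put(const std::string &msg) override {
    return "[" + msg + "]";
  }
};

// Replace a null task with a pass-through task.
static std::unique_ptr<Task> or_thru(std::unique_ptr<Task> t) {
  if (!t) t.reset(new ThruTask);
  return t;
}

// A module always ends up with two usable siblings, so neighbors
// never need to test whether a side is missing.
class Module {
public:
  Module(std::unique_ptr<Task> writer, std::unique_ptr<Task> reader)
    : writer_(or_thru(std::move(writer))),
      reader_(or_thru(std::move(reader))) {}
  Task &writer() { return *writer_; }
  Task &reader() { return *reader_; }
private:
  std::unique_ptr<Task> writer_;
  std::unique_ptr<Task> reader_;
};
```

A module built with only a writer-side BracketTask still answers on its reader side, because the missing sibling forwards messages untouched.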

Example

This example develops a utility program called display_logfile, which reads log records stored by our logging servers, formats the information, and prints it in a human-readable format. As shown on page 90 in C++NPv1, most fields in a log record are stored in a CDR-encoded binary format, which is concise but not easily understood by humans. To implement the display_logfile program, we implement the following classes derived from ACE_Module:

  • Logrec Reader is a module that converts the log records in a logfile into a canonical format that's then passed to, and processed by, other modules in an ACE_Stream.

  • Logrec Formatter is a module that determines how the fields in the log record will be formatted, for example by converting them from binary to ASCII.

  • Logrec Separator is a module that inserts message blocks containing a separator string between the existing message blocks in a composite log record message.

  • Logrec Writer is a module that prints formatted log record messages to the standard output, where they can be redirected to a file, printer, or console.

Figure 9.3 illustrates the structure of the modules in the ACE_Stream that comprise the display_logfile program. This program uses a producer/consumer concurrency model, where the Logrec_Reader and the Logrec_Writer run as active objects that produce and consume messages, respectively. Figure 9.3 also illustrates the structure of the composite ACE_Message_Block created by the Logrec Reader module and manipulated by the other filter modules in the ACE_Stream.

Figure 9.3. The ACE_Stream Structure for the display_logfile Program

In the example below, we inherit from ACE_Task and ACE_Module to create the Logrec Reader, Logrec Formatter, Logrec Separator, and Logrec Writer modules. The following Logrec_Module template and LOGREC_MODULE macro are used to simplify many of the examples below:

 template <class TASK>
 class Logrec_Module : public ACE_Module<ACE_MT_SYNCH> {
 public:
   Logrec_Module (const ACE_TCHAR *name)
     : ACE_Module<ACE_MT_SYNCH>
                     (name,
                      &task_, // Initialize writer-side task.
                      0,      // Ignore reader-side task.
                      0,
                      ACE_Module<ACE_MT_SYNCH>::M_DELETE_READER) {}
 private:
   TASK task_;
 };

 #define LOGREC_MODULE(NAME) \
   typedef Logrec_Module<NAME> NAME##_Module

Since the flow of messages is unidirectional (from Logrec Reader to Logrec Writer), we initialize only the writer-side task in the ACE_Module constructor. By passing a NULL pointer to the reader-side task parameter, the ACE_Module constructor will create an instance of ACE_Thru_Task that forwards all messages along without modifying them. The M_DELETE_READER flag instructs the ACE_Module destructor to delete only the reader-side task and not the writer-side task, which is the task_ data member rather than a dynamically allocated object.

Logrec_Reader_Module. This module contains an instance of the Logrec_Reader task class that performs the following activities:

  1. It opens the designated logfile.

  2. It activates the Logrec_Reader instance to be an active object.

  3. It converts the log records in the logfile into a set of chained message blocks, each containing a field from the demarshaled log record, which is then processed by subsequent modules in an ACE_Stream.

The Logrec_Reader class is shown below:

 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH> {
 private:
   ACE_TString filename_; // Name of logfile.
   ACE_FILE_IO logfile_;  // File containing log records.
 public:
   enum {MB_CLIENT = ACE_Message_Block::MB_USER,
         MB_TYPE, MB_PID, MB_TIME, MB_TEXT};
   Logrec_Reader (const ACE_TString &file): filename_ (file) {}
   // ... Other methods shown below ...
 };

We define five enumerators to identify the fields in a log record. Rather than store the field indicator in the data itself, we use the ACE_Message_Block type member to indicate the field type. ACE_Message_Block defines two ranges of type values, normal and priority, with a number of values in each range. It also defines a third range for user-defined message types. We initialize MB_CLIENT to the value of ACE_Message_Block::MB_USER, which is the first user-defined message type value guaranteed not to conflict with other values defined within ACE itself.

The Logrec_Reader::open() hook method opens the designated logfile and converts the task into an active object, as shown below.

 virtual int open (void *) {
   ACE_FILE_Addr name (filename_.c_str ());
   ACE_FILE_Connector con;
   if (con.connect (logfile_, name) == -1) return -1;
   return activate ();
 }

Logrec_Reader::svc() runs in the active object thread. It reads log records from the logfile, demarshals and stores each one in a composite message block, and passes each composite message block down the stream for further processing by other modules. The logfile is written as a series of log records, each consisting of:

  • A string containing the name of the client that sent the log record

  • A CDR-encoded ACE_Log_Record (see page 79 of C++NPv1 for the marshaling code of the record)

Each log record follows the previous one with no inter-record marker. As shown below, Logrec_Reader::svc() reads the file contents in large chunks and demarshals them as a stream of data. A for loop reads the file contents until EOF is reached. An inner for loop demarshals log records from the data chunks.

  1  virtual int svc () {
  2    const size_t FILE_READ_SIZE = 8 * 1024;
  3    ACE_Message_Block mblk (FILE_READ_SIZE);
  4
  5    for (;; mblk.crunch ()) {
  6      ssize_t bytes_read = logfile_.recv (mblk.wr_ptr (),
  7                                          mblk.space ());
  8      if (bytes_read <= 0) break;
  9      mblk.wr_ptr (ACE_static_cast (size_t, bytes_read));
 10      for (;;) {
 11        size_t name_len = ACE_OS_String::strnlen
 12                            (mblk.rd_ptr (), mblk.length ());
 13        if (name_len == mblk.length ()) break;
 14
 15        char *name_p = mblk.rd_ptr ();
 16        ACE_Message_Block *rec = 0, *head = 0, *temp = 0;
 17        ACE_NEW_RETURN
 18          (head, ACE_Message_Block (name_len, MB_CLIENT), 0);
 19        head->copy (name_p, name_len);
 20        mblk.rd_ptr (name_len + 1);   // Skip nul also
 21
 22        size_t need = mblk.length () + ACE_CDR::MAX_ALIGNMENT;
 23        ACE_NEW_RETURN (rec, ACE_Message_Block (need), 0);
 24        ACE_CDR::mb_align (rec);
 25        rec->copy (mblk.rd_ptr (), mblk.length ());
 26
 27        ACE_InputCDR cdr (rec); rec->release ();
 28        ACE_CDR::Boolean byte_order;
 29        if (!cdr.read_boolean (byte_order)) {
 30          head->release (); mblk.rd_ptr (name_p); break;
 31        }
 32        cdr.reset_byte_order (byte_order);
 33
 34        ACE_CDR::ULong length;
 35        if (!cdr.read_ulong (length)) {
 36          head->release (); mblk.rd_ptr (name_p); break;
 37        }
 38        if (length > cdr.length ()) {
 39          head->release (); mblk.rd_ptr (name_p); break;
 40        }
 41        ACE_NEW_RETURN
 42          (temp, ACE_Message_Block (length, MB_TEXT), 0);
 43        ACE_NEW_RETURN
 44          (temp,
 45           ACE_Message_Block (2 * sizeof (ACE_CDR::Long),
 46                              MB_TIME, temp), 0);
 47        ACE_NEW_RETURN
 48          (temp,
 49           ACE_Message_Block (sizeof (ACE_CDR::Long),
 50                              MB_PID, temp), 0);
 51        ACE_NEW_RETURN
 52          (temp,
 53           ACE_Message_Block (sizeof (ACE_CDR::Long),
 54                              MB_TYPE, temp), 0);
 55        head->cont (temp);
 56        // Extract the type...
 57        ACE_CDR::Long *lp = ACE_reinterpret_cast
 58                          (ACE_CDR::Long *, temp->wr_ptr ());
 59        cdr >> *lp;
 60        temp->wr_ptr (sizeof (ACE_CDR::Long));
 61        temp = temp->cont ();
 62        // Extract the PID...
 63        lp = ACE_reinterpret_cast
 64            (ACE_CDR::Long *, temp->wr_ptr ());
 65        cdr >> *lp;
 66        temp->wr_ptr (sizeof (ACE_CDR::Long));
 67        temp = temp->cont ();
 68        // Extract the timestamp...
 69        lp = ACE_reinterpret_cast
 70          (ACE_CDR::Long *, temp->wr_ptr ());
 71        cdr >> *lp; ++lp; cdr >> *lp;
 72        temp->wr_ptr (2 * sizeof (ACE_CDR::Long));
 73        temp = temp->cont ();
 74        // Extract the text length, then the text message
 75        ACE_CDR::ULong text_len;
 76        cdr >> text_len;
 77        cdr.read_char_array (temp->wr_ptr (), text_len);
 78        temp->wr_ptr (text_len);
 79
 80        if (put_next (head) == -1) break;
 81        mblk.rd_ptr (mblk.length () - cdr.length ());
 82      }
 83    }
 84
 85    ACE_Message_Block *stop = 0;
 86    ACE_NEW_RETURN
 87      (stop,
 88       ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0);
 89    put_next (stop);
 90    return 0;
 91  }

Lines 5-9 Begin the loop that reads the file's contents into an ACE_Message_Block, using the space() method to find out how much free space is available in the block. The block's write pointer is updated to reflect the added data. The final clause in the for loop uses the ACE_Message_Block::crunch() method to shift any data in mblk to the beginning of the block's data buffer so there's room to append more data.

Lines 10-20 Begin the log record demarshaling loop. We use the ACE_OS_String::strnlen() method to find the length of the hostname string, but will only look through the characters remaining in mblk. If mblk doesn't contain the whole name, we break out of the loop to read more data from the file. If the name is there, we remember the pointer where the name starts (name_p) in case we need to get more data and restart the demarshaling. The head ACE_Message_Block is allocated to hold the name; it will become the first block in the message block chain that's passed down the stream.

Lines 22-25 The mblk read pointer is now at the start of the CDR-encoded ACE_Log_Record. Recall from Chapter 4 of C++NPv1 that a buffer from which ACE's CDR classes will demarshal data must be properly aligned. The alignment of the mblk read pointer is unknown and unlikely to be suitable. We therefore allocate a new ACE_Message_Block large enough to hold the remaining data in mblk, plus any needed bytes for CDR alignment. After calling ACE_CDR::mb_align() to align the new block's write pointer, we copy the remainder of the file data.
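The reason for allocating extra bytes before aligning can be seen in a small sketch. The helper names align_up() and aligned_start() are hypothetical, and the 8-byte boundary in kMaxAlignment is an assumption made for this illustration rather than a value taken from the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumption for this sketch: data must start on an 8-byte boundary.
const std::size_t kMaxAlignment = 8;

// Round an address up to the next multiple of `alignment`,
// which must be a power of two.
inline std::uintptr_t align_up(std::uintptr_t addr, std::size_t alignment) {
  const std::uintptr_t mask = static_cast<std::uintptr_t>(alignment) - 1;
  return (addr + mask) & ~mask;
}

// Return the first aligned position inside `buf`. The caller must have
// allocated payload + kMaxAlignment bytes so that the payload still fits
// after skipping up to kMaxAlignment - 1 bytes of padding.
inline char *aligned_start(char *buf) {
  const std::uintptr_t raw = reinterpret_cast<std::uintptr_t>(buf);
  return buf + (align_up(raw, kMaxAlignment) - raw);
}
```

Because at most kMaxAlignment - 1 bytes are skipped, a buffer sized as the payload length plus kMaxAlignment always has room for the copy that follows.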

Lines 27-32 Create an ACE_InputCDR object to demarshal the log record contents from rec. Since the ACE_InputCDR constructor increments the reference count on rec, we call rec->release() to release our reference to it, preventing a memory leak. The first item demarshaled is the byte order indicator. It's used to reset the byte order of cdr so the remaining data can be demarshaled properly.

Lines 34-39 The header written for the marshaled ACE_Log_Record contains a length field designating the number of bytes in the marshaled log record. That value is demarshaled and compared to the number of bytes remaining in the cdr object. If not all of the needed bytes are present, release the client name block (head), reset the mblk read pointer to restart at the host name on the next demarshaling pass, and break out of the record-demarshaling loop to read more data from the file.

Lines 41-55 Allocate message blocks for all of the remaining log record fields. Each has the correct block type for proper identification in modules further down the stream. They're allocated in reverse order to make it easier to supply the cont pointer. When the last one is allocated, therefore, it's the first block in the chain after head and is connected via its continuation pointer.
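Allocating the tail first means every constructor call already knows its continuation. The following sketch shows the idea with a hypothetical Block type and FieldType enumeration that only imitate a message block's type tag and cont pointer.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical miniature of a message block: a type tag plus a
// continuation pointer to the next block in the chain.
enum FieldType { FT_CLIENT, FT_TYPE, FT_PID, FT_TIME, FT_TEXT };

struct Block {
  FieldType type;
  Block *cont;
  explicit Block(FieldType t, Block *next = nullptr)
    : type(t), cont(next) {}
};

// Build CLIENT -> TYPE -> PID -> TIME -> TEXT, allocating the tail first.
inline Block *build_chain() {
  Block *temp = new Block(FT_TEXT);   // last block, no continuation
  temp = new Block(FT_TIME, temp);    // each new block points at the
  temp = new Block(FT_PID, temp);     // block allocated just before it
  temp = new Block(FT_TYPE, temp);
  return new Block(FT_CLIENT, temp);  // the head is linked last
}

// Release every block in the chain.
inline void free_chain(Block *b) {
  while (b != nullptr) {
    Block *next = b->cont;
    delete b;
    b = next;
  }
}
```

Walking the returned chain visits the fields in their logical order even though they were created back to front.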

Lines 57-61 Demarshal the type field into the first message block by casting the message block's write pointer to ACE_CDR::Long * and using the CDR extraction operator. Adjust the block's write pointer to reflect the data just read and move to the next message block in the chain.

Lines 63-73 The same technique is used to demarshal the process ID and timestamp.

Lines 75-78 The log record's data portion was marshaled as a counted sequence of bytes. The length of the sequence is demarshaled, and the bytes themselves are then demarshaled into the final message block in the chain.

Line 80 Use put_next() to pass the message block to the next module in the stream for further processing. Sidebar 60 (page 308) explains how put_next() works.

Line 81 Move the file-content block's read pointer up past all of the data that was just extracted. The cdr object has been adjusting its internal pointers as a result of the CDR operations, so the length() method indicates how much data is left. Since the beginning length was the same as mblk.length() (which has not been adjusted during all the demarshaling), we can determine how much of the original message block was consumed.

Lines 85-89 The entire file is processed, so send a 0-sized ACE_Message_Block with type MB_STOP down the stream. By convention, this message block instructs other modules in the stream to stop their processing.
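The read, parse, and compact cycle that svc() follows (lines 5-9, 34-39, and 81) can be illustrated without ACE. In this sketch, RecordParser is a hypothetical class, and the record format (one length byte followed by that many payload bytes) is an assumption chosen for brevity; the real logfile holds CDR-encoded records.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical parser for illustration: each record is one length byte
// followed by that many payload bytes.
class RecordParser {
public:
  // Append a freshly read chunk, then extract every complete record.
  void feed(const std::string &chunk, std::vector<std::string> &out) {
    buf_ += chunk;
    std::size_t pos = 0;                  // plays the role of the read pointer
    while (pos < buf_.size()) {
      const std::size_t start = pos;      // remember where the record starts
      const std::size_t len = static_cast<unsigned char>(buf_[pos++]);
      if (buf_.size() - pos < len) {      // record is incomplete:
        pos = start;                      // rewind and wait for more data
        break;
      }
      out.push_back(buf_.substr(pos, len));
      pos += len;                         // skip the bytes just consumed
    }
    buf_.erase(0, pos);                   // compact, as crunch() does
  }

  // Bytes held over until the next chunk arrives.
  std::size_t pending() const { return buf_.size(); }

private:
  std::string buf_;
};
```

A record split across two chunks is left in the buffer after the first call and completed by the second, which is the same restart behavior the listing achieves by resetting the read pointer to name_p.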

Sidebar 60: ACE_Task Methods Related to ACE Streams Framework

The ACE_Task described in Section 6.3 also contains the following methods that can be used in conjunction with the ACE Streams framework:

Method        Description
module()      Returns a pointer to the task's module if there is one, else 0
next()        Returns a pointer to the next task in a stream if there is one, else 0
sibling()     Returns a pointer to a task's sibling in a module
put_next()    Passes a message block to the adjacent task in a stream
can_put()     Returns 1 if a message block can be enqueued via put_next() without blocking due to intrastream flow control, else 0
reply()       Passes a message block to the sibling task's adjacent task of a stream, which enables a task to reverse the direction of a message in a stream

An ACE_Task that's part of an ACE_Module can use put_next() to forward a message block to an adjacent module. This method follows the module's next() pointer to the right task, then calls its put() hook method, passing it the message block. The put() method borrows the thread from the task that invoked put_next(). If a task runs as an active object, its put() method can enqueue the message on the task's message queue and allow its svc() hook method to handle the message concurrently with respect to other processing in a stream. Sidebar 62 (page 317) outlines the concurrency models supported by the ACE Streams framework.
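The way put_next() chains passive tasks together in the caller's thread can be sketched as follows. Stage is a hypothetical class invented for this illustration; its put() and put_next() members imitate the roles described above and are not the ACE_Task signatures.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for a passive stream task; not the ACE_Task API.
class Stage {
public:
  explicit Stage(const std::string &tag) : tag_(tag), next_(nullptr) {}

  // Link this stage to the adjacent stage in the stream.
  void next(Stage *n) { next_ = n; }

  // Hook method: does its work in the caller's thread, then forwards.
  int put(std::string &msg) {
    msg += tag_;
    return put_next(msg);
  }

protected:
  // Follow the next pointer and invoke that stage's put() hook.
  int put_next(std::string &msg) {
    if (next_ == nullptr) return 0;  // reached the end of the chain
    return next_->put(msg);
  }

private:
  std::string tag_;
  Stage *next_;
};
```

Calling put() on the first stage runs every downstream stage before it returns, since no stage in this sketch queues the message for another thread.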

Since the name of the logfile is passed to the Logrec_Reader_Module constructor, we can't use the LOGREC_MODULE macro (page 303). Instead, we define the class explicitly, as shown below:

 class Logrec_Reader_Module : public ACE_Module<ACE_MT_SYNCH> {
 public:
   Logrec_Reader_Module (const ACE_TString &filename)
     : ACE_Module<ACE_MT_SYNCH>
                     (ACE_TEXT ("Logrec Reader"),
                      &task_, // Initialize writer-side.
                      0,      // Ignore reader-side.
                      0,
                      ACE_Module<ACE_MT_SYNCH>::M_DELETE_READER),
       task_ (filename) {}
 private:
   // Converts the logfile into chains of message blocks.
   Logrec_Reader task_;
 };

Logrec_Formatter_Module. This module contains a Logrec_Formatter task that determines how the log record fields will be formatted, as shown below:

 class Logrec_Formatter : public ACE_Task<ACE_MT_SYNCH> {
 private:
   typedef void (*FORMATTER[5])(ACE_Message_Block *);
   static FORMATTER format_; // Array of format static methods.
 public:

The synchronization trait for a stream's modules and tasks is set by the ACE_Stream that contains them. Since the modules and tasks defined above use ACE_MT_SYNCH, Logrec_Formatter also derives from ACE_Task<ACE_MT_SYNCH>. It runs as a passive object, however, since Logrec_Formatter never calls activate() and its put() method borrows the thread of its caller to format messages, as shown below:

 virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *) {
   if (mblk->msg_type () == Logrec_Reader::MB_CLIENT)
     for (ACE_Message_Block *temp = mblk;
          temp != 0;
          temp = temp->cont ()) {
       int mb_type =
         temp->msg_type () - ACE_Message_Block::MB_USER;
       (*format_[mb_type])(temp);
     }
   return put_next (mblk);
 }

The put() method determines the message type of the ACE_Message_Block forwarded to this module. If the message type is Logrec_Reader::MB_CLIENT , then it's the first in a set of chained blocks containing log record fields. In this case, the method iterates through the record fields invoking the appropriate static methods to convert the fields into a human-readable format. After all the fields are formatted, the message block is passed along to the next module in the stream. If the forwarded block is not of type Logrec_Reader::MB_CLIENT , it's assumed to be the MB_STOP block and is simply forwarded to the next module in the stream.
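Subtracting the first user-defined type value turns each block's type into an index into the formatter table. A minimal sketch of that dispatch follows; the Field struct, the tag_num() and tag_time() formatters, and the base value 0x200 in MB_USER_BASE are all assumptions made for this illustration.

```cpp
#include <cassert>
#include <string>

// Assumed base value for user-defined types, for this sketch only.
const int MB_USER_BASE = 0x200;
enum { MB_CLIENT = MB_USER_BASE, MB_TYPE, MB_PID, MB_TIME, MB_TEXT };

// Hypothetical chained field: a type tag, its data, and a continuation.
struct Field {
  int type;
  std::string data;
  Field *cont;
};

typedef void (*Formatter)(Field &);

static void keep(Field &) {}                              // text needs no work
static void tag_num(Field &f) { f.data = "#" + f.data; }  // marks numbers
static void tag_time(Field &f) { f.data = "@" + f.data; } // marks timestamps

// Indexed by (type - MB_USER_BASE), in enumerator order.
static const Formatter kFormat[5] = {
  keep, tag_num, tag_num, tag_time, keep
};

// Format a whole record; anything not starting with MB_CLIENT
// (for example a stop block) is left untouched.
inline void format_chain(Field *head) {
  if (head == nullptr || head->type != MB_CLIENT) return;
  for (Field *f = head; f != nullptr; f = f->cont)
    kFormat[f->type - MB_USER_BASE](*f);
}
```

The table order must match the enumerator order, which is why the two are declared side by side here.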

The following static methods format their corresponding type of field. We start with format_client() :

 static void format_client (ACE_Message_Block *) { return; } 

The MB_CLIENT block has a known-length text string in it. Since this needs no further formatting, format_client() simply returns. Note that the text is in ASCII. Any processing required to translate to a wide-character format or verify the name by looking up its IP address could be added here without disturbing the rest of the stream.

We use the following format_long() method to convert the log record's type and process id into an ASCII representation:

 static void format_long (ACE_Message_Block *mblk) {
   ACE_CDR::Long type = * (ACE_CDR::Long *)mblk->rd_ptr ();
   // A 32-bit value needs at most 11 ASCII characters ("-2147483648"),
   // plus one byte for the NUL that sprintf() appends.
   mblk->size (12);
   mblk->reset ();
   mblk->wr_ptr ((size_t) sprintf (mblk->wr_ptr (), "%d", type));
 }

The size() method call ensures that there's enough space in the message block to hold the textual representation of the value, reallocating space if necessary. The reset() method call resets the message block's read and write pointers to the start of its memory buffer in preparation for the call to the standard C sprintf() function that does the actual formatting. The sprintf() function returns the number of characters used to format the value, not including the string-terminating NUL character. Since message blocks contain a specific count, we don't include the NUL character in the new length when updating the message block's write pointer.
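The distinction between the buffer size and the resulting length can be checked with a standalone version. This format_long() is a hypothetical free function that returns a std::string instead of updating a message block; it uses the standard snprintf() so the buffer bound is explicit.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>

// Convert a binary 32-bit value to decimal text. The returned string's
// size counts only the characters, not a terminating NUL.
inline std::string format_long(std::int32_t value) {
  // "-2147483648" is 11 characters; the twelfth byte holds the NUL
  // that snprintf() always writes.
  char buf[12];
  const int n = std::snprintf(buf, sizeof buf, "%d",
                              static_cast<int>(value));
  if (n < 0 || static_cast<std::size_t>(n) >= sizeof buf)
    return std::string();  // formatting failed or was truncated
  return std::string(buf, static_cast<std::size_t>(n));
}
```

The return value of snprintf() plays the same role as the count used to advance the write pointer: it excludes the NUL, so the stored text has no trailing terminator.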

The format_time() method is more complicated since it converts the seconds and microseconds of the time value into an ASCII string, as follows:

 static void format_time (ACE_Message_Block *mblk) {
   ACE_CDR::Long secs = * (ACE_CDR::Long *)mblk->rd_ptr ();
   mblk->rd_ptr (sizeof (ACE_CDR::Long));
   ACE_CDR::Long usecs = * (ACE_CDR::Long *)mblk->rd_ptr ();
   char timestamp[26]; // Max size of ctime_r() string.
   time_t time_secs (secs);
   ACE_OS::ctime_r (&time_secs, timestamp, sizeof timestamp);
   mblk->size (26); // Max size of ctime_r() string.
   mblk->reset ();
   timestamp[19] = '\0'; // NUL-terminate after the time.
   timestamp[24] = '\0'; // NUL-terminate after the date.
   size_t fmt_len (sprintf (mblk->wr_ptr (),
                            "%s.%03d %s",
                            timestamp + 4,
                            usecs / 1000,
                            timestamp + 20));
   mblk->wr_ptr (fmt_len);
 }
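The index arithmetic in format_time() relies on the fixed layout of a ctime()-style string, "Www Mmm dd hh:mm:ss yyyy\n". The sketch below applies the same slicing to a string passed in by the caller, which keeps it independent of the local time zone. The function name format_timestamp() is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <string>

// Turn "Wed Jun 30 21:49:08 1993\n" plus a microsecond count into
// "Jun 30 21:49:08.123 1993" (milliseconds inserted after the time).
inline std::string format_timestamp(const std::string &ctime_str,
                                    long usecs) {
  if (ctime_str.size() < 24) return std::string();  // not ctime() layout
  char stamp[26];
  ctime_str.copy(stamp, 24);  // "Www Mmm dd hh:mm:ss yyyy"
  stamp[19] = '\0';           // ends the "Mmm dd hh:mm:ss" piece
  stamp[24] = '\0';           // ends the year, dropping the newline
  char out[26];
  const int n = std::snprintf(out, sizeof out, "%s.%03ld %s",
                              stamp + 4,      // skip the weekday
                              usecs / 1000,   // microseconds to milliseconds
                              stamp + 20);    // the four-digit year
  if (n < 0 || static_cast<std::size_t>(n) >= sizeof out)
    return std::string();     // out-of-range input would not fit
  return std::string(out, static_cast<std::size_t>(n));
}
```

The result is 24 characters long for any microsecond value below one million, so it fits in the same 26 bytes that hold the original ctime() text.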

The final format_string() method is identical to format_client() since it also receives a known-length string. Again, if the string required some manipulation, this processing could easily be added here.

 static void format_string (ACE_Message_Block *) { return; }
 };

We initialize the array of pointers to formatting methods as follows:

 Logrec_Formatter::FORMATTER Logrec_Formatter::format_ = {
   format_client, format_long,
   format_long, format_time, format_string
 };

We use pointers to static methods rather than pointers to nonstatic methods since there's no need to access Logrec_Formatter state.

We next instantiate the LOGREC_MODULE macro with the Logrec_Formatter class to create the Logrec_Formatter_Module:

 LOGREC_MODULE (Logrec_Formatter); 

Logrec_Separator_Module. This module contains a Logrec_Separator object that inserts message blocks between the existing message blocks in a composite log record message. Each new message block contains the separator string.

 class Logrec_Separator : public ACE_Task<ACE_MT_SYNCH> {
 private:
   ACE_Lock_Adapter<ACE_Thread_Mutex> lock_strategy_;
 public:

This class is a passive object, so Logrec_Separator::put() borrows the thread of its caller to insert the separators:

  1  virtual int put (ACE_Message_Block *mblk,
  2                   ACE_Time_Value *) {
  3    if (mblk->msg_type () == Logrec_Reader::MB_CLIENT) {
  4      ACE_Message_Block *separator = 0;
  5      ACE_NEW_RETURN
  6        (separator,
  7         ACE_Message_Block (ACE_OS_String::strlen ("|") + 1,
  8                            ACE_Message_Block::MB_DATA,
  9                            0, 0, 0, &lock_strategy_), -1);
 10      separator->copy ("|");
 11
 12      ACE_Message_Block *dup = 0;
 13      for (ACE_Message_Block *temp = mblk; temp != 0; ) {
 14        dup = separator->duplicate ();
 15        dup->cont (temp->cont ());
 16        temp->cont (dup);
 17        temp = dup->cont ();
 18      }
 19      ACE_Message_Block *nl = 0;
 20      ACE_NEW_RETURN (nl, ACE_Message_Block (2), 0);
 21      nl->copy ("\n");
 22      dup->cont (nl);
 23      separator->release ();
 24    }
 25
 26    return put_next (mblk);
 27  }

Line 3 Only a composite log record message, whose first block has type MB_CLIENT, receives separators. Any other block, such as MB_STOP, skips to line 26 and is forwarded to the next stream module. Since Logrec_Separator is a passive object, it needn't take any action when it's done.

Lines 4-10 Create a message block to hold the separator string, which is set to "|" (a more flexible implementation could make the separator string a parameter to the class constructor). Later in this method, we'll use the ACE_Message_Block::duplicate() method, which returns a "shallow" copy of the message that simply increments its reference count by one, but doesn't actually make a copy of the data itself. Sidebar 61 explains how and why we configured the separator ACE_Message_Block with an ACE_Lock_Adapter<ACE_Thread_Mutex> locking strategy.

Lines 12-18 Loop through the list of message blocks, inserting a duplicate of separator (which contains the separator string) after each of the original message blocks.

Lines 19-22 After the separators are inserted, a final block containing a newline is appended to format the output cleanly when it's eventually written to the standard output.

Lines 23-26 The block from which all the separator blocks were duplicated is released, and the revised composite message is passed along to the next module in the stream.
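The pointer manipulation in the insertion loop is easy to get wrong, so a standalone sketch may help. Node is a hypothetical chain element whose payload is held through std::shared_ptr, so copying the pointer stands in for a shallow duplicate; add_separators(), flatten(), and free_chain() are likewise names invented for this illustration.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical chain node. The payload is shared, so a "duplicate"
// copies only the pointer, much like a shallow message block copy.
struct Node {
  std::shared_ptr<const std::string> data;
  Node *cont;
};

// Insert a shallow copy of `sep` after every node, then append "\n".
inline void add_separators(Node *head, const std::string &sep) {
  if (head == nullptr) return;
  const std::shared_ptr<const std::string> shared =
    std::make_shared<const std::string>(sep);
  Node *dup = nullptr;
  for (Node *temp = head; temp != nullptr; ) {
    dup = new Node{shared, temp->cont};  // shares the separator bytes
    temp->cont = dup;                    // splice it in after temp
    temp = dup->cont;                    // advance to the next original
  }
  dup->cont = new Node{std::make_shared<const std::string>("\n"), nullptr};
}

// Concatenate the payloads of the whole chain.
inline std::string flatten(const Node *n) {
  std::string s;
  for (; n != nullptr; n = n->cont) s += *n->data;
  return s;
}

// Release every node; shared payloads are freed with their last owner.
inline void free_chain(Node *n) {
  while (n != nullptr) {
    Node *next = n->cont;
    delete n;
    n = next;
  }
}
```

As in the listing, the loop also places a separator after the final field, so three fields a, b, and c joined with "|" flatten to "a|b|c|" followed by the newline.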

Sidebar 61: Serializing ACE_Message_Block Reference Counts

If shallow copies of a message block are created and/or released in different threads there's a potential race condition on access to the reference count and shared data. Access to these data must therefore be serialized. Since there are multiple message blocks involved, an external locking strategy is applied. Hence, a message block can be associated with an instance of ACE_Lock_Adapter . Logrec_Separator::put() (page 311) accesses message blocks from multiple threads, so the ACE_Lock_Adapter is parameterized with an ACE_Thread_Mutex . This locking strategy serializes calls to the message block's duplicate() and release() methods to avoid race conditions when a message block is created and released concurrently by different threads. Although Logrec_Separator::put() calls separator->release() before forwarding the message block to the next module, we take this precaution because a subsequent module inserted in the stream may process the blocks using multiple threads.
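The effect of an external locking strategy on a reference count can be sketched with the standard library. SharedData is a hypothetical class written for this illustration and is not how ACE implements its data blocks; std::mutex stands in for the lock adapter.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Hypothetical reference-counted payload guarded by an external lock.
class SharedData {
public:
  SharedData(const std::string &bytes, std::mutex &lock)
    : bytes_(bytes), refs_(1), lock_(lock) {}

  // Shallow copy: bump the count under the lock and share the bytes.
  SharedData *duplicate() {
    std::lock_guard<std::mutex> guard(lock_);
    ++refs_;
    return this;
  }

  // Drop one reference and return how many remain. The object is
  // destroyed once the last reference is released.
  static int release(SharedData *d) {
    int left;
    {
      std::lock_guard<std::mutex> guard(d->lock_);
      left = --d->refs_;
    }                        // unlock before the object can be destroyed
    if (left == 0) delete d;
    return left;
  }

  // Current reference count, read under the lock.
  int refs() {
    std::lock_guard<std::mutex> guard(lock_);
    return refs_;
  }

  const std::string &bytes() const { return bytes_; }

private:
  std::string bytes_;
  int refs_;
  std::mutex &lock_;  // owned by the caller, shared by all duplicates
};
```

Because the mutex is owned outside the payload, it outlives the final release() and can be shared by every duplicate made from the same original.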

We now instantiate the LOGREC_MODULE macro with the Logrec_Separator class to create the Logrec_Separator_Module:

 LOGREC_MODULE (Logrec_Separator); 

Logrec_Writer_Module. This module contains a Logrec_Writer task that performs the following activities:

  • It activates the Logrec_Writer instance to be an active object.

  • It receives formatted log record messages passed by an adjacent module and prints them to its standard output.

The Logrec_Writer class is shown below:

 class Logrec_Writer : public ACE_Task<ACE_MT_SYNCH> {
 public:
   // Initialization hook method.
   virtual int open (void *) { return activate (); }

The open() hook method converts Logrec_Writer into an active object. The other two methods in this class leverage the fact that its ACE_Task subclass is instantiated with the ACE_MT_SYNCH traits class. For example, Logrec_Writer::put() enqueues messages in its synchronized message queue:

   virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
   { return putq (mblk, to); }

Likewise, Logrec_Writer::svc() runs in its active object thread, dequeueing messages from its message queue and writing them to standard output:

   virtual int svc () {
     int stop = 0;
     for (ACE_Message_Block *mb; !stop && getq (mb) != -1; ) {
       if (mb->msg_type () == ACE_Message_Block::MB_STOP)
         stop = 1;
       else ACE::write_n (ACE_STDOUT, mb);
       put_next (mb);
     }
     return 0;
   }
 };

When an MB_STOP block is received, the method breaks out of the processing loop and returns, ending the thread. The ACE::write_n() method prints out all other message blocks chained through their cont() pointers using an efficient gather-write operation. All the message blocks are forwarded to the next module in the stream. By default, the ACE Streams framework releases all message blocks that are forwarded "off the end" of the stream, as discussed in the coverage of ACE_Stream_Tail (page 314).
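The control flow of the service loop, where the stop message is still forwarded but nothing after it is processed, can be sketched without threads. Msg, MsgType, and drain() are hypothetical names; unlike getq(), which blocks until a message arrives, this sketch simply returns when its queue is empty.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical message: either data to print or a stop marker.
enum MsgType { MSG_DATA, MSG_STOP };
struct Msg {
  MsgType type;
  std::string text;
};

// Drain `queue` the way the service loop does: write data messages,
// stop after the stop message, and forward every message downstream.
// Returns the text that would have been written to standard output.
inline std::string drain(std::deque<Msg> &queue,
                         std::deque<Msg> &downstream) {
  std::string written;
  bool stop = false;
  while (!stop && !queue.empty()) {
    Msg m = queue.front();
    queue.pop_front();
    if (m.type == MSG_STOP)
      stop = true;                 // leave the loop after forwarding
    else
      written += m.text;
    downstream.push_back(m);       // forwarded whether data or stop
  }
  return written;
}
```

Any message queued behind the stop marker stays in the queue, which mirrors the thread exiting as soon as the stop block has been passed on.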

Finally, we instantiate the LOGREC_MODULE macro with the Logrec_Writer class to create the Logrec_Writer_Module:

 LOGREC_MODULE (Logrec_Writer); 

The Example portion of Section 9.3 illustrates how all the modules shown above can be configured in an ACE_Stream to create the complete display_logfile program.
