9.3 The ACE_Stream Class


Motivation

The ACE_Module class described in Section 9.2 can be used to decompose a networked application into a series of interconnected, functionally distinct layers. Each module implements a different layer of application-defined functionality, such as a reader, formatter, separator, and writer of log records. ACE_Module provides methods to transfer messages between sibling tasks within a module, as well as between modules. It does not, however, provide a facility to connect or rearrange modules in a particular order. To enable developers to build and manage a series of hierarchically related module layers as a single object, the ACE Streams framework defines the ACE_Stream class.

Class Capabilities

ACE_Stream implements the Pipes and Filters pattern [POSA1] to enable developers to configure and execute hierarchically related services by customizing reusable application-independent framework classes. This class provides the following capabilities:

  • It provides methods to dynamically add, replace, and remove ACE_Module objects to form various stream configurations.

  • It provides methods to send/receive messages to/from an ACE_Stream.

  • It provides a mechanism to connect two ACE_Stream streams together.

  • It provides a way to shut down all modules in a stream and wait for them all to stop.

The interface for ACE_Stream is shown in Figure 9.4. Since this class exports many features of the ACE Streams framework, we group its method descriptions into the three categories described below.

Figure 9.4. The ACE_Stream Class

1. Stream initialization and destruction methods. The following methods initialize and destroy an ACE_Stream:

Method                   Description
ACE_Stream(), open()     Initialize a stream.
~ACE_Stream(), close()   Destroy a stream by releasing its resources and popping (destroying) each module.
wait()                   Synchronize with the final close of a stream.

By default, two ACE-created ACE_Module objects are installed in an ACE_Stream when it's initialized, one at the head of the stream and the other at the tail. The head module contains ACE_Stream_Head objects in both its reader and writer sides. Likewise, the tail module contains ACE_Stream_Tail objects in both sides. These two classes interpret predefined control messages and data message blocks that can be circulated through an ACE_Stream at run time. The ACE_Message_Block class defines the types of these message blocks as enumerators whose values are all less than MB_USER.

ACE_Stream_Head can queue messages between an application and an instance of ACE_Stream. When a message is passed to the top module's writer-side ACE_Stream_Head, it's forwarded to the next module in the stream. When a message is passed to the top module's reader-side ACE_Stream_Head, it's queued on the task's message queue (unless its type is MB_FLUSH, in which case the message queue is shut down).

To prevent resource leaks when messages reach the end of a stream without being processed, ACE_Stream_Tail::put() releases messages that it receives when acting as a writer. This behavior allows the Logrec_Writer module (page 313) to pass its message blocks through to the next task rather than releasing them itself. Most likely, the next task is an ACE_Stream_Tail. If the program is extended to add another module at that point in the stream, however, the module will continue to work as expected.
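The hand-off described above can be modeled in a few lines of standard C++. This is only a sketch and not ACE code: ToyBlock, ToyTask, ToyWriter, ToyTail, and the live_blocks counter are hypothetical names invented for illustration, and plain delete stands in for releasing a message block. It shows a writer that forwards every block it receives and a tail that releases whatever reaches it.

```cpp
// Simplified model of message hand-off; this is NOT ACE code.
#include <cassert>
#include <string>
#include <utility>

int live_blocks = 0;  // counts ToyBlock objects not yet released

// Hypothetical stand-in for a heap-allocated message block.
struct ToyBlock {
  explicit ToyBlock(std::string d) : data(std::move(d)) { ++live_blocks; }
  ~ToyBlock() { --live_blocks; }
  ToyBlock(const ToyBlock &) = delete;
  ToyBlock &operator=(const ToyBlock &) = delete;
  std::string data;
};

// Hypothetical stand-in for a task that can forward to its successor.
class ToyTask {
public:
  virtual ~ToyTask() = default;
  void next(ToyTask *t) { next_ = t; }
  virtual int put(ToyBlock *mb) = 0;

protected:
  int put_next(ToyBlock *mb) {
    assert(next_ != nullptr);  // every task except the tail has a successor
    return next_->put(mb);
  }

private:
  ToyTask *next_ = nullptr;
};

// Uses the message, then forwards it instead of releasing it.
class ToyWriter : public ToyTask {
public:
  int put(ToyBlock *mb) override {
    written += mb->data;
    return put_next(mb);
  }
  std::string written;
};

// End of the chain: releases whatever arrives so nothing leaks.
class ToyTail : public ToyTask {
public:
  int put(ToyBlock *mb) override {
    delete mb;
    return 0;
  }
};
```

Because ToyWriter never releases a block itself, a new task could be linked between it and the tail without changing the writer, which is the property the paragraph above relies on.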

2. Stream configuration methods. The following methods push and pop ACE_Module objects onto and off of an ACE_Stream:

Method      Description
push()      Add a new module to the stream right below the stream head.
pop()       Remove the module right below the stream head and close it down.
top()       Return a pointer to the module right below the stream head.
insert()    Insert a new module below the designated module.
replace()   Replace the designated module with a new module.
remove()    Remove the designated module from a stream and close it down.

The push() and pop() methods allow applications to configure a stream at run time by inserting or removing an instance of ACE_Module at the top of a stream. When a module is pushed onto a stream, the open() hook methods of the module's writer- and reader-side tasks are called automatically. Likewise, when a module is popped from a stream the close() hook methods of its writer- and reader-side tasks are invoked.

Since a complete stream is represented as an interconnected series of independent modules, it's often useful to insert, remove, and replace modules at any point in a stream. To support these operations, ACE_Module objects are named in the stream, and the insert(), replace(), and remove() methods locate the module to manipulate by its name. ACE_Module objects can be composed in essentially arbitrary configurations that satisfy application requirements and enhance module reuse.
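To make the ordering rules concrete, here is a minimal sketch of a stream in standard C++. It is not the ACE API: ToyModule and ToyStream are hypothetical names, each module is reduced to a name plus a single writer-side filter function, and the head and tail are left implicit. It illustrates that push() and pop() work just below the head, while insert() and remove() locate a module by name.

```cpp
// Toy model of stream configuration; this is NOT the ACE API.
#include <algorithm>
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a module: a name plus a writer-side filter.
struct ToyModule {
  std::string name;
  std::function<std::string(const std::string &)> filter;
};

class ToyStream {
public:
  // Add a module right below the (implicit) stream head.
  void push(ToyModule m) { modules_.insert(modules_.begin(), std::move(m)); }

  // Remove the module right below the head; false if there is none.
  bool pop() {
    if (modules_.empty()) return false;
    modules_.erase(modules_.begin());
    return true;
  }

  // Place a new module directly below the module named prev_name.
  bool insert(const std::string &prev_name, ToyModule m) {
    auto it = find(prev_name);
    if (it == modules_.end()) return false;
    modules_.insert(it + 1, std::move(m));
    return true;
  }

  // Remove the module with the given name.
  bool remove(const std::string &name) {
    auto it = find(name);
    if (it == modules_.end()) return false;
    modules_.erase(it);
    return true;
  }

  // Send a message down the stream, from just below the head to the tail.
  std::string put(std::string msg) const {
    for (const auto &m : modules_) {
      assert(m.filter != nullptr);  // every module needs a filter
      msg = m.filter(msg);
    }
    return msg;
  }

private:
  std::vector<ToyModule>::iterator find(const std::string &name) {
    return std::find_if(modules_.begin(), modules_.end(),
                        [&](const ToyModule &m) { return m.name == name; });
  }

  std::vector<ToyModule> modules_;  // front() sits just below the head
};
```

Since each push() places the new module above the ones already present, modules are pushed in the reverse of the order in which messages sent with put() should visit them.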

3. Stream communication methods. The following methods are used to send and receive messages on an ACE_Stream:

Method   Description
get()    Receive the next message that's stored at the stream head.
put()    Send a message down a stream, starting at the module that's below the stream head.

The get() and put() methods allow applications to send and receive messages on an ACE_Stream object. Similar to inserting message blocks in an ACE_Message_Queue, the blocking behavior of these methods can be modified by passing the following types of ACE_Time_Value values:

Value                                                                       Behavior
NULL ACE_Time_Value pointer                                                 Indicates that get() or put() should wait indefinitely, that is, it will block until the method completes.
Non-NULL ACE_Time_Value pointer whose sec() and usec() methods return 0     Indicates that get() or put() should perform a nonblocking operation, that is, if the method doesn't succeed immediately, return -1 and set errno to EWOULDBLOCK.
Non-NULL ACE_Time_Value pointer whose sec() or usec() method returns > 0    Indicates that get() or put() should wait until the absolute time of day, returning -1 with errno set to EWOULDBLOCK if the method does not complete by this time.
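The three cases in the table can be imitated with a small queue written in standard C++. This is a sketch under stated assumptions, not ACE code: ToyQueue and ToyClock are hypothetical names, a pointer to a std::chrono time point stands in for the ACE_Time_Value pointer, and a time point at the clock's epoch plays the role of the zero value.

```cpp
// Simplified model of the three timeout cases; this is NOT ACE code.
#include <cassert>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

using ToyClock = std::chrono::system_clock;

class ToyQueue {
public:
  void enqueue(int value) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      items_.push_back(value);
    }
    not_empty_.notify_one();
  }

  // deadline == nullptr:      block until a message is available.
  // *deadline already passed: act as a nonblocking poll.
  // otherwise:                wait until the absolute time *deadline.
  // Returns 0 on success, or -1 with errno set to EWOULDBLOCK.
  int dequeue(int &out, const ToyClock::time_point *deadline) {
    std::unique_lock<std::mutex> guard(lock_);
    auto ready = [this] { return !items_.empty(); };
    if (deadline == nullptr) {
      not_empty_.wait(guard, ready);
    } else if (!not_empty_.wait_until(guard, *deadline, ready)) {
      errno = EWOULDBLOCK;
      return -1;
    }
    assert(!items_.empty());  // both wait paths end with a message queued
    out = items_.front();
    items_.pop_front();
    return 0;
  }

private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::deque<int> items_;
};
```

Because the deadline is an absolute time, a caller that wants to wait at most 20 milliseconds passes the current time plus 20 milliseconds rather than the interval itself. In this model an epoch-valued time point behaves as a nonblocking poll simply because that instant has already passed.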

When message blocks arrive at an ACE_Stream, they can pass through a series of interconnected ACE_Task objects by calling their put_next() methods, as discussed in Sidebar 60 (page 308). Message blocks can originate from various sources, such as an application thread, a reactor or proactor, or an ACE timer queue dispatcher. Sidebar 62 outlines the concurrency models supported by the ACE Streams framework.

Sidebar 62: ACE Streams Framework Concurrency Models

Section 5.6 of C++NPv1 describes the two canonical types of concurrency architectures, task-based and message-based [SS93], which are supported by the ACE Streams framework. For example, a put() method may enqueue a message and defer handling to its task's svc() method that executes concurrently in a separate thread. This approach is illustrated by the task-based architecture shown below:

Conversely, a put() method can borrow the thread of control from its caller to handle a message immediately. This approach is illustrated by the message-based architecture shown in the figure above. A stream's concurrency architecture has a significant impact on performance and ease of programming, as described in [SS95b].
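The difference between the two architectures comes down to what put() does with a message. The sketch below contrasts them in standard C++; it is not ACE code, the class names PassiveFilter and ActiveFilter are hypothetical, and the use of an empty string as a shutdown marker is an assumption made only to keep the example short.

```cpp
// Toy contrast of the two concurrency models; this is NOT ACE code.
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Message-based: put() borrows the caller's thread and handles the
// message before returning.
class PassiveFilter {
public:
  explicit PassiveFilter(std::vector<std::string> &out) : out_(out) {}
  int put(const std::string &msg) {
    out_.push_back("handled:" + msg);
    return 0;
  }

private:
  std::vector<std::string> &out_;
};

// Task-based: put() only enqueues; svc() does the work and would
// normally run in the task's own thread.
class ActiveFilter {
public:
  explicit ActiveFilter(std::vector<std::string> &out) : out_(out) {}

  int put(const std::string &msg) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      queue_.push_back(msg);
    }
    ready_.notify_one();
    return 0;
  }

  // Handle queued messages until the empty-string marker arrives.
  int svc() {
    for (;;) {
      std::string msg;
      {
        std::unique_lock<std::mutex> guard(lock_);
        ready_.wait(guard, [this] { return !queue_.empty(); });
        assert(!queue_.empty());
        msg = std::move(queue_.front());
        queue_.pop_front();
      }
      if (msg.empty()) return 0;  // hypothetical shutdown marker
      out_.push_back("handled:" + msg);
    }
  }

private:
  std::vector<std::string> &out_;  // touched only by the thread running svc()
  std::mutex lock_;
  std::condition_variable ready_;
  std::deque<std::string> queue_;
};
```

With PassiveFilter the result is visible as soon as put() returns. With ActiveFilter nothing is handled until some thread runs svc(), typically a dedicated one started with std::thread and joined after the shutdown marker has been sent.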

Example

We conclude by showing how to configure the display_logfile program with an ACE_Stream object that contains the modules presented in Section 9.2. The program below creates an ACE_Stream and then pushes all of the modules onto the stream.

  int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) {
    if (argc != 2) ACE_ERROR_RETURN
                     ((LM_ERROR, "usage: %s logfile\n", argv[0]), 1);

    ACE_TString logfile (argv[1]);
    ACE_Stream<ACE_MT_SYNCH> stream;

    if (stream.push
        (new Logrec_Writer_Module (ACE_TEXT ("Writer"))) != -1
        && stream.push
        (new Logrec_Separator_Module (ACE_TEXT ("Separator"))) != -1
        && stream.push
        (new Logrec_Formatter_Module (ACE_TEXT ("Formatter"))) != -1
        && stream.push
        (new Logrec_Reader_Module (logfile)) != -1)
      return ACE_Thread_Manager::instance ()->wait () == 0 ? 0 : 1;
    return 1;
  }

As each module is pushed onto the stream, the ACE Streams framework calls the open() hook method on its writer-side task, which initializes the task. The Logrec_Writer_Module and Logrec_Reader_Module's open() hook methods convert their writer tasks into active objects. The main thread doesn't actually take part in the processing. After pushing all the modules onto the stream successfully, it simply waits for the other threads to exit from their active objects when they're done processing the logfile.

The main() function above allocates a number of modules dynamically at program startup, and further allocations occur while the logfile is processed. We omit most of the error handling logic from the main() function to save space. Although it's unlikely that this program would exhaust the heap at startup time, a well-designed production application should check for errors and handle them appropriately.

Implementing the display_logfile program with the ACE Streams framework enables the transparent, incremental evolution of application functionality. For example, it's straightforward to implement and configure a Logrec Sorter module that changes the order in which the fields in the log record are displayed. Likewise, the ACE Streams framework reduces the complexity of each module layer in the display_logfile program, thereby simplifying its implementation, testing, and maintenance.



C++ Network Programming
C++ Network Programming, Volume I: Mastering Complexity with ACE and Patterns
ISBN: 0201604647
Year: 2002
Pages: 65
