Motivation

Complex networked applications often require groups of processes to coordinate to provide a particular service. For example, a multistage workflow automation application may spawn multiple processes to work on different parts of a large problem. One master process may wait for the entire group of worker processes to exit before proceeding with the next stage in the workflow. This is such a common paradigm that ACE provides the ACE_Process_Manager class.

Class Capabilities

The ACE_Process_Manager class uses the Wrapper Facade pattern to combine the portability and power of ACE_Process with the ability to manage groups of processes as a unit. This class has the following capabilities:
The interface of the ACE_Process_Manager class is shown in Figure 8.4 and its key methods are outlined in the following table:
Figure 8.4. The ACE_Process_Manager Class Diagram
The ACE_Process_Manager can be used in two ways: as a singleton accessed through its instance() method, or by instantiating one or more ACE_Process_Manager objects, which is useful when an application needs to manage separate groups of processes independently.
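As an illustration of the singleton style, the following minimal sketch spawns a group of workers and then waits for them all to exit. The worker program name is a placeholder assumption, error handling is pared down, and it assumes the spawn() overload that takes only an ACE_Process_Options:

#include "ace/Process.h"
#include "ace/Process_Manager.h"

int spawn_and_await_workers (int n_workers)
{
  // Access the process-wide singleton manager.
  ACE_Process_Manager *pm = ACE_Process_Manager::instance ();

  for (int i = 0; i < n_workers; ++i) {
    ACE_Process_Options options;
    // "worker_program" is a placeholder for the image to run.
    options.command_line ("%s", "worker_program");
    if (pm->spawn (options) == -1)
      return -1; // Spawn failed.
  }

  // Barrier synchronization: block until every spawned worker exits.
  return pm->wait ();
}

The same calls work on a locally instantiated ACE_Process_Manager; the example later in this section uses the singleton.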
Example

The example in Section 7.4 illustrated the design and implementation of a reactive logging server that alleviated the drawbacks of a purely iterative server design. Yet another server model for handling client requests is to spawn a new process to handle each client. Process-based concurrency models are often useful when multithreaded solutions are either:
Section 5.2 describes other pros and cons of implementing servers with multiple processes rather than multiple threads. The structure of our multiprocess-based logging server is shown in Figure 8.5. This revision of the logging server uses a process-per-connection concurrency model. It's similar in many ways to the first version in Section 4.4.3 that used an iterative server design. The main difference here is that a master process spawns a new worker process for each accepted connection to the logging service port. The master process then continues to accept new connections. Each worker process handles all logging requests sent by a client across one connection; the process exits when this connection closes.

Figure 8.5. Architecture of the Multiprocessing Logging Server
A process-per-connection approach has two primary benefits:
The process-per-connection server shown below works correctly on both POSIX and Win32 platforms. Given the platform differences explained in Section 8.2 and Sidebar 16 on page 164, this is an impressive achievement. Due to the clean separation of concerns in our example logging server's design and ACE's intelligent use of wrapper facades, the differences required for the server code are minimal and well contained. In particular, there's a conspicuous lack of conditionally compiled application code. Due to differing process creation models, however, we must first decide how to run the POSIX process. Win32 forces a program image to run in a new process, whereas POSIX does not. On Win32, we keep all of the server logic localized in one program and execute this program image in both the worker and master processes using different command-line arguments. On POSIX platforms, we can either let the fork()'d worker process continue running the master's program image or exec a new program image in the worker, as Win32 requires.
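For reference, here is a minimal sketch of how that choice can be expressed through ACE_Process_Options. The flag and methods used below also appear in the example that follows; the worker image path is a placeholder assumption:

ACE_Process_Options options;

// Alternative 1: fork() only -- the POSIX worker keeps executing the
// same program image as the master (no exec() occurs).
options.creation_flags (ACE_Process_Options::NO_EXEC);

// Alternative 2: fork() and then exec() a separate worker image,
// which is the only model Win32 supports. The path is a placeholder.
// options.command_line ("%s", "/path/to/worker_image");

ACE_Process worker;
pid_t child = worker.spawn (options);
// With NO_EXEC, spawn() returns 0 in the fork()'d POSIX child.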
To gain a performance advantage, and to show how easily ACE supports both approaches, we won't run a new program image in the worker processes on POSIX. The process-per-connection logging server code is not particularly involved even though it operates differently on POSIX and Win32. However, the explanations of the event sequences and the details embodied in the ACE wrapper facades that facilitate this simplicity are rather subtle. To make them easier to absorb, we separate the explanation of the Process_Per_Connection_Logging_Server class, the example's process management, and the Logging_Process class.

The Process_Per_Connection_Logging_Server class. As with the iterative logging server in Section 4.4.3, we define a class representing our server. We start by including the necessary ACE header files:

#include "ace/Log_Record.h"
#include "ace/Process.h"
#include "ace/Process_Manager.h"
#include "ace/Signal.h"

#include "Logging_Server.h"

We derive the Process_Per_Connection_Logging_Server class from the Logging_Server class defined in Section 4.4.1. We override the run() method to accommodate the two different ways the program can be executed on Win32: as the master process, which accepts connections and spawns worker processes, or as a worker process, which services a logging client. The difference is expressed via command-line arguments:
Figures 8.6 and 8.7 depict the master and worker process interactions for POSIX and Win32, respectively.

Figure 8.6. Master/Worker Process Creation Sequence for POSIX
Figure 8.7. Master/Worker Process Creation Sequence for Win32
We'll see how command-line parameters are actually passed when we study the Logging_Process class, beginning on page 180. But first, we examine our server's class definition:

class Process_Per_Connection_Logging_Server : public Logging_Server {
protected:
  char prog_name_[MAXPATHLEN + 1];

The prog_name_ data member receives the server program's name from its argv[0] command-line argument. We use this name to spawn worker processes that handle new connections. The run() method examines the command line's argument count to decide whether the server should run as the master or a worker. If there are two arguments in addition to the program name, this is a worker process; otherwise, it's the master process.

  virtual int run (int argc, char *argv[]) {
    strncpy (prog_name_, argv[0], MAXPATHLEN);
    prog_name_[MAXPATHLEN] = '\0'; // Ensure NUL-termination.
    if (argc == 3)
      return run_worker (argc, argv); // Only on Win32.
    else
      return run_master (argc, argv);
  }

The run_master() method is similar to the Logging_Server::run() method; for example, it opens the logging server's listen port and calls handle_connections() to accept new client connections. It does not, however, call the handle_data() hook method, which is always called in the worker process. The master server spawns a new worker process to handle each client's log records, as shown in Figure 8.5.

  int run_master (int argc, char *argv[]) {
    u_short logger_port = 0;
    if (argc == 2) logger_port = atoi (argv[1]);
    if (open (logger_port) == -1) return -1;

    for (;;)
      if (handle_connections () == -1) return -1;

    return 0;
  }

We inherit the open() method implementation from the Logging_Server base class, which initializes the acceptor endpoint to listen passively on a designated port. Since the wait_for_multiple_events() implementation is a no-op, we simply omit it here and call handle_connections() directly to run the master event loop.

The run_worker() method is only executed on Win32. When the worker process is spawned, the master process requests that the socket handle be passed to the worker process on the command line. The run_worker() method converts the command-line argument back to a handle, builds an ACE_SOCK_Stream object with the handle, and calls handle_data() to process the client's log records. Since the handle arrives as text on the command line, its integer value must be converted to an ACE_HANDLE with a cast. To do so as safely as a platform's C++ compiler allows, ACE provides a set of portable casting macros (including ACE_static_cast used below), which are discussed in Sidebar 17.
  int run_worker (int argc, char *argv[]) {
    ACE_HANDLE socket_handle =
      ACE_static_cast (ACE_HANDLE, atoi (argv[2]));
    ACE_SOCK_Stream logging_peer (socket_handle);

    handle_data (&logging_peer);
    logging_peer.close ();
    return 0;
  }

The master server process listens passively for new client logging connections. As in our previous logging servers, the handle_connections() method accepts new connections. In the process-per-connection logging server, however, handle_connections() spawns a new worker process to handle each new client connection. Figure 8.6 on page 174 illustrates the sequence of events that occur when handle_connections() accepts a new connection and spawns a worker process on POSIX. Figure 8.7 on page 175 shows the same sequence on Win32 (in both figures, the name Process_Per_Connection_Logging_Server has been shortened to Logging Server to fit in the available space). The figures and explanation both refer to the Logging_Process class, which is described on page 180. Sidebar 18 on page 178 describes the steps in these figures, and the following C++ code segments illustrate how these steps are programmed using ACE_Process_Manager. The implementation of the handle_connections() method of Process_Per_Connection_Logging_Server is shown below:

 1 virtual int handle_connections () {
 2   ACE_SOCK_Stream logging_peer;
 3   if (acceptor ().accept (logging_peer) == -1)
 4     return -1;
 5
 6   Logging_Process *logger =
 7     new Logging_Process (prog_name_, logging_peer);
 8   ACE_Process_Options options;
 9   pid_t pid = ACE_Process_Manager::instance ()->spawn
10     (logger, options);
11   if (pid == 0) {
12     acceptor ().close ();
13     handle_data (&logging_peer);
14     delete logger;
15     ACE_OS::exit (0);
16   }
17   logging_peer.close ();
18   if (pid == -1)
19     ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "spawn ()"), -1);
20
21   return ACE_Process_Manager::instance ()->wait
22     (0, ACE_Time_Value::zero);
23 }

We dissect the handle_connections() method implementation below, referring to the steps in Figures 8.6 and 8.7 on page 174 that are explained in Sidebar 18.

Lines 2-4 (Step 1) Call acceptor().accept(); the call blocks until a client connects.

Lines 6-10 (Steps 2-4) Spawn a new process to handle the accepted client connection. The work to set the options properly for each platform is done in the Logging_Process::prepare() method shown on page 180. ACE_Process::spawn() calls the proper platform mechanism to create the new process.
Lines 11-15 (POSIX Step 5w [1]) A 0 return value from spawn() is impossible on Win32, since it will either return the new process's PID or -1 on error. These lines are therefore always executed in the context of a fork()'d POSIX process. The worker process closes its inherited acceptor object since the worker uses only the client's established connection and doesn't accept new connections. The handle_data() method is called to process all of the client's log records, then the Logging_Process object is freed, and the worker process exits. The master process will notice that a worker process has exited when it next calls ACE_Process_Manager::wait() on line 21.
Line 17 If the spawn() call returned in the parent (which is always the case on Win32), the logging_peer object is no longer needed, so it's closed. The actual TCP connection is not closed, because of platform-specific behavior encapsulated in the ACE process wrapper facades: on POSIX, the fork()'d worker holds its own copy of the socket handle, so the connection stays open until the worker closes it; on Win32, the worker process received its own handle to the socket when the handle was passed to it, so closing the master's handle likewise leaves the worker's connection intact.
As usual, the ACE wrapper facades shield application developers from needing to understand these subtle nuances for each OS platform!

Lines 21-22 (Steps 6 and 7) The ACE_Process_Manager::wait() method checks to see if any worker processes have exited, reaping their status and cleaning up any handles they'd opened.

The handle_data() method shown below is identical for all platforms. It puts the client socket into blocking mode, opens the log file to hold the log records, and processes logging records until the logging client closes the socket or an error occurs. Lastly, the log file is closed.

  virtual int handle_data (ACE_SOCK_Stream *logging_peer) {
    // Ensure blocking <recv>s.
    logging_peer->disable (ACE_NONBLOCK);
    ACE_FILE_IO log_file;
    make_log_file (log_file, logging_peer);
    Logging_Handler logging_handler (log_file, *logging_peer);

    while (logging_handler.log_record () != -1)
      continue;

    log_file.close ();
    return 0;
  }

The Logging_Process class. To set up the new worker process properly, we define a Logging_Process class that's derived from the ACE_Process class described in Section 8.2. Since setup requirements often vary between platforms and applications, ACE_Process provides the prepare() and unmanage() hook methods. Our Logging_Process class uses the prepare() method to pass the new logging client's socket handle to the worker process. It's also the place where we'd localize any changes if we revisit the decision not to run a new program image on POSIX.

class Logging_Process : public ACE_Process {
private:
  Logging_Process (); // Force desired constructor to be used.
  char prog_name_[MAXPATHLEN + 1];
  ACE_SOCK_Stream logging_peer_;

public:
  Logging_Process (const char *prog_name,
                   const ACE_SOCK_Stream &logging_peer)
    : logging_peer_ (logging_peer.get_handle ())
  { strcpy (prog_name_, prog_name); }

The parameters needed to set up the new worker process are passed to the Logging_Process class's constructor. These include the command name used to spawn the new process and the logging client's ACE_SOCK_Stream to be used by the worker process. Both of these parameters are used in the following prepare() hook method, which is called by ACE_Process::spawn() before the new process is spawned.

  virtual int prepare (ACE_Process_Options &options) {
    if (options.pass_handle (logging_peer_.get_handle ()) == -1)
      ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "pass_handle()"), -1);
    options.command_line ("%s", prog_name_);
    options.avoid_zombies (1);
    options.creation_flags (ACE_Process_Options::NO_EXEC);
    return 0;
  }

The prepare() method is illustrated as Step 3 in Figures 8.6 and 8.7 on page 174. Its only argument is a reference to the ACE_Process_Options object that ACE_Process::spawn() is using to spawn the new process. This gives prepare() an opportunity to modify or add to the options as needed. We use prepare() to set the options as follows:
Although some methods have no effect on Win32, we call them anyway so the procedure for setting up the worker process is portable to all ACE platforms that support multiple processes. For a discussion of why this is the correct design (and when it isn't), see Section A.5 on page 248.

ACE_Process_Manager encapsulates the platform-specific knowledge that determines when a process exits and calls the unmanage() hook method on any process object whose underlying process has exited. The Logging_Process::unmanage() method is illustrated as Step 7 in Figures 8.6 and 8.7 on page 174 and is shown below:

  virtual void unmanage () { delete this; }

It simply cleans up the Logging_Process object that was allocated dynamically when the logging client connection was accepted. The logging peer socket handle may have been duplicated, however, when passed to the worker process on Windows 95/98. Because that behavior and state are encapsulated in the ACE_Process class, the duplicated handle is closed in the ACE_Process destructor, so our cleanup code is also portable to all platforms that ACE supports.

Finally, we show the main() program, which is a slight extension of our earlier servers.

static void sigterm_handler (int /* signum */) { /* No-op. */ }

int main (int argc, char *argv[]) {
  // Register to receive the <SIGTERM> signal.
  ACE_Sig_Action sa (sigterm_handler, SIGTERM);
  Process_Per_Connection_Logging_Server server;

  if (server.run (argc, argv) == -1 && errno != EINTR)
    ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "server.run()"), 1);

  // Barrier synchronization.
  return ACE_Process_Manager::instance ()->wait ();
}

The ACE_Sig_Action class registers the process to handle the SIGTERM signal, which administrators can use to shut down the parent server process. Before exiting, the parent calls ACE_Process_Manager::wait() to synchronize on the exits of all worker logging processes before shutting itself down. This barrier synchronization capability could be useful if the parent process needed to write a final time stamp to the logging output device or file.