Motivation

Section 7.3 focused on how to decouple the functionality of service handlers from the steps required to passively connect and initialize them. It's equally useful to decouple the functionality of service handlers from the steps required to actively connect and initialize them. Moreover, networked applications that communicate with a large number of peers may need to actively establish many connections concurrently, handling completions as they occur. To consolidate these capabilities into a flexible, extensible, and reusable abstraction, the ACE Acceptor-Connector framework defines the ACE_Connector class.

Class Capabilities

ACE_Connector is a factory class that implements the Connector role in the Acceptor-Connector pattern [POSA2]. This class provides the following capabilities:
The interface for ACE_Connector is shown in Figure 7.7 (page 230). This template class is parameterized by: Figure 7.7. The ACE_Connector Class
The ACE_Connector class has a flexible interface that can be customized extensively by developers. We therefore group the description of its methods into the two categories described on the following page. 1. Connector initialization, destruction methods, and accessor methods. The following methods are used to initialize, destroy, and access an ACE_Connector :
The ACE_Connector's constructor and open() method can be passed a flag indicating whether a service handler's IPC endpoint initialized by the connector should start in blocking (the default) or nonblocking (ACE_NONBLOCK) mode. These methods can also be passed the reactor associated with the connector. They use the singleton ACE_Reactor by default, just like the ACE_Svc_Handler and ACE_Acceptor.

An ACE_Connector object is closed either when it's destroyed or when its close() method is called explicitly. An ACE_Connector allocates no resources for synchronous connections, so there's nothing to clean up if it's used only synchronously. For asynchronous connections, however, these methods release the resources a connector allocates to track pending connections that haven't completed by the time the connector is closed. Each remaining unconnected service handler is also closed by invoking its close() hook method.

2. Connection establishment and service handler initialization methods. The following ACE_Connector methods can be used to establish connections actively and initialize their associated service handlers:
Networked applications use the connect() template method to initiate a connection attempt actively, regardless of whether its completion is handled synchronously or asynchronously. This method uses the following steps to connect and initialize a new service handler:
Each connect() call tries to establish a connection with the designated peer. If connect() gets an immediate indication of connection success or failure, it ignores the ACE_Synch_Options parameter. If it doesn't get an immediate indication of connection success or failure, however, connect() uses its ACE_Synch_Options parameter to vary the completion processing via two orthogonal factors:
If the reactor is used, the connect() method can register both the PEER_CONNECTOR (for connect completion detection) and a timer (for the caller-specified time limit) with the reactor and returns -1 to its caller with errno set to EWOULDBLOCK . The ultimate success or failure of the connect will result in the ACE_Connector::activate_svc_handler() method's activating the handler for success or the service handler's close() method for failure. The application runs the reactor's event loop, and the appropriate call will be made during the course of the reactor's normal event loop processing. If the reactor isn't used, the connect() method doesn't return until the connect attempt completes, fails, or times out. Its success or failure still results in a call to either the activate_svc_handler() method or the service handler's close() method, respectively. The table below summarizes the ACE_Connector 's behavior, based on the ACE_Synch_Options values, if the connect request doesn't complete immediately.
Regardless of how connections are established, any or all of the default make_svc_handler() , connect_svc_handler() , and activate_svc_handler() methods can be overridden by subclasses. This extensible Template Method pattern design allows a range of behavior modification and customization to support many use cases. We describe the three primary variation points in ACE_Connector::connect() on the following page. 1. Obtain a service handler. Since an acceptor is often driven by upcalls from a reactor, its make_svc_handler() factory method usually creates a new service handler. In contrast, a connector can choose to connect and initialize a service handler as either
The default implementation of ACE_Connector::make_svc_handler() handles these two cases, as follows:

    template <class SVC_HANDLER, class PEER_CONNECTOR> int
    ACE_Connector<SVC_HANDLER, PEER_CONNECTOR>::make_svc_handler
        (SVC_HANDLER *&sh) {
      if (sh == 0) ACE_NEW_RETURN (sh, SVC_HANDLER, -1);
      sh->reactor (reactor ());
      return 0;
    }

This method looks similar to ACE_Acceptor::make_svc_handler() (page 219). However, ACE_Connector::make_svc_handler() allocates a new service handler only if the pointer passed to it by reference is NULL, which lets the client application decide whether the caller or the connector should create the service handler.

2. Connection establishment. The ACE_Connector::connect() template method calls its connect_svc_handler() hook method to initiate a new connection with a peer acceptor. The default implementation of this method simply forwards to the PEER_CONNECTOR::connect() method to initiate the connection, as shown below:

    template <class SVC_HANDLER, class PEER_CONNECTOR> int
    ACE_Connector<SVC_HANDLER, PEER_CONNECTOR>::connect_svc_handler
        (SVC_HANDLER *svc_handler,
         const ACE_TYPENAME PEER_CONNECTOR::PEER_ADDR &remote_addr,
         ACE_Time_Value *timeout,
         const ACE_TYPENAME PEER_CONNECTOR::PEER_ADDR &local_addr,
         int reuse_addr, int flags, int perms) {
      return connector_.connect (svc_handler->peer (), remote_addr,
                                 timeout, local_addr,
                                 reuse_addr, flags, perms);
    }

More powerful implementations of connect_svc_handler() can be defined by subclasses of ACE_Connector. Here are some examples:
3. Service handler activation. The ACE_Connector can activate service handlers in the following two ways, depending on how the connection was initiated:
The default behavior of ACE_Connector::activate_svc_handler() is identical to that of ACE_Acceptor::activate_svc_handler() (page 221). This commonality between the ACE_Acceptor and ACE_Connector underscores the power of the ACE Acceptor-Connector framework, which completely decouples passive and active connection establishment from the initialization and use of a service handler.

Example

This example applies the ACE Acceptor-Connector framework to implement another client logging daemon that extends the one shown in the Example portion of Section 6.2. Rather than using the ad hoc Acceptor-Connector pattern implementation shown in Figure 6.4 (page 169), we use the ACE_Acceptor, ACE_Connector, and ACE_Svc_Handler classes described in this chapter. The resulting implementation is more concise and portable since much of the code we had to write manually in the previous client logging daemon is available as reusable classes from the ACE Acceptor-Connector framework. Our new implementation is also more powerful since it provides an authentication protocol that's interposed transparently to ensure that the client logging daemon is permitted to connect to the server logging daemon.

Like the earlier client logging daemon, our new version uses two threads, which perform the following tasks using various ACE framework classes:
The classes comprising the new client logging daemon based on the ACE Acceptor-Connector framework are shown in Figure 7.9. The role of each class is outlined below: Figure 7.9. Classes in the Acceptor-Connector Client Logging Daemon
The interactions between instances of these classes are shown in Figure 7.10 (page 238). When the service is operating, there are two threads. The first is the initial program thread and it runs the reactor event loop. This thread performs the following processing: Figure 7.10. Interactions in the Acceptor-Connector Client Logging Daemon
The forwarder thread is started when the initial connection to the logging server is made and continues to run until the service is terminated. This thread runs the AC_Output_Handler active object service thread.

We start our implementation by including the necessary ACE header files.

    #include "ace/OS.h"
    #include "ace/Acceptor.h"
    #include "ace/Connector.h"
    #include "ace/Get_Opt.h"
    #include "ace/Handle_Set.h"
    #include "ace/INET_Addr.h"
    #include "ace/Log_Record.h"
    #include "ace/Message_Block.h"
    #include "ace/Reactor.h"
    #include "ace/Service_Object.h"
    #include "ace/Signal.h"
    #include "ace/Svc_Handler.h"
    #include "ace/Synch.h"
    #include "ace/SOCK_Acceptor.h"
    #include "ace/SOCK_Connector.h"
    #include "ace/SOCK_Stream.h"
    #include "ace/Thread_Manager.h"
    #include "Logging_Handler.h"
    #include "AC_CLD_export.h"
    #include <openssl/ssl.h>

The classes in Figure 7.9 are defined in the AC_Client_Logging_Daemon.cpp file and described below.

AC_Input_Handler. This class provides the following capabilities:
The AC_Input_Handler class is shown below:

    class AC_Input_Handler
      : public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> {
    public:
      AC_Input_Handler (AC_Output_Handler *handler = 0)
        : output_handler_ (handler) {}

      virtual int open (void *);       // Initialization hook method.
      virtual int close (u_int = 0);   // Shutdown hook method.

    protected:
      // Reactor hook methods.
      virtual int handle_input (ACE_HANDLE handle);
      virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
                                ACE_Reactor_Mask = 0);

      // Pointer to the output handler.
      AC_Output_Handler *output_handler_;

      // Keep track of connected client handles.
      ACE_Handle_Set connected_clients_;
    };

Since AC_Input_Handler uses no per-client state, only a single instance is defined in AC_CLD_Acceptor (page 247) to handle input from all connected clients. We therefore define an ACE_Handle_Set, which is a wrapper facade for fd_set defined in Chapter 7 of C++NPv1. This ACE_Handle_Set keeps track of all connected client socket handles so we can remove them from the singleton reactor when the AC_Input_Handler is closed. Sidebar 53 (page 240) discusses the pros and cons of different strategies for using multiple versus single service handlers.

Since AC_Input_Handler inherits from ACE_Svc_Handler, it can use the singleton ACE_Reactor to wait for log records to arrive from any client applications connected to the client logging daemon via loopback TCP sockets. It therefore doesn't use its message queue, so it instantiates the ACE_Svc_Handler with the ACE_NULL_SYNCH strategy to minimize its synchronizer usage.
When a log record arrives at the client logging daemon, the singleton ACE_Reactor dispatches the following AC_Input_Handler::handle_input() hook method:

    int AC_Input_Handler::handle_input (ACE_HANDLE handle) {
      ACE_Message_Block *mblk = 0;
      Logging_Handler logging_handler (handle);

      if (logging_handler.recv_log_record (mblk) != -1) {
        if (output_handler_->put (mblk->cont ()) != -1) {
          mblk->cont (0);
          mblk->release ();
          return 0; // Success return.
        } else mblk->release ();
      }
      return -1; // Error return.
    }
This method uses the Logging_Handler from Chapter 4 of C++NPv1 to read a log record out of the socket handle parameter, store the record into an ACE_Message_Block, and pass this message to the AC_Output_Handler (page 243), which will queue it and service it in a separate thread. We only enqueue the log record data (which is referenced by mblk->cont()) and not the hostname (which is referenced by mblk).

If a client application disconnects the socket or an error occurs, handle_input() returns -1. This value triggers the singleton reactor to call the following handle_close() hook method that closes the socket.

    int AC_Input_Handler::handle_close (ACE_HANDLE handle,
                                        ACE_Reactor_Mask) {
      connected_clients_.clr_bit (handle);
      return ACE_OS::closesocket (handle);
    }

Note that we needn't delete this object in handle_close() since the AC_Input_Handler's memory is managed by the AC_CLD_Acceptor (page 247). The handle_close() method also removes the specified handle from the connected_clients_ handle set, where it was added when the connection was opened.

When a connection request arrives at the client logging daemon from a client application, AC_CLD_Acceptor::handle_input() will dispatch the following AC_Input_Handler::open() hook method:

    1 int AC_Input_Handler::open (void *) {
    2   ACE_HANDLE handle = peer ().get_handle ();
    3   if (reactor ()->register_handler
    4         (handle, this, ACE_Event_Handler::READ_MASK) == -1)
    5     return -1;
    6   connected_clients_.set_bit (handle);
    7   return 0;
    8 }

Lines 2–5 The client logging daemon reuses a single AC_Input_Handler object for all of its logging handlers. When AC_CLD_Acceptor::accept_svc_handler() is called to accept the new connection, it therefore reuses the handle in the AC_Input_Handler's ACE_SOCK_Stream for each connection. We use the three-parameter ACE_Reactor::register_handler() method (page 73) to register a pointer to this single object with the singleton reactor for READ events.
When log records arrive, the singleton reactor will dispatch AC_Input_Handler::handle_input() (page 239). Recall from Sidebar 53 that singleton service handlers and multithreaded event dispatching may introduce a race condition. We may lose track of some connections if the AC_Input_Handler class is used with a multithreaded reactor event loop since the AC_Input_Handler object's ACE_SOCK_Stream member can be changed by multiple threads. Each involved event dispatching thread does the following:
If thread B accepts a new connection before thread A completes the two steps above, it's likely that the AC_Input_Handler::open() invocation in thread A will actually register the socket accepted from thread B. If so, the first socket accepted would still be open, but never registered with the reactor and, thus, abandoned. To prevent this race condition, synchronization is needed around the code that accesses the shared ACE_SOCK_Stream object in the AC_Input_Handler. Unfortunately, the synchronization scope includes the two steps listed above and so must be done from their caller, ACE_Acceptor::handle_input(). The use of multiple reactor event dispatching threads would therefore end up requiring a new implementation of handle_input() to properly synchronize concurrent connection acceptance. The example in this section therefore uses a single reactor event dispatching thread.

Line 6 Record the handle of the connected client in the connected_clients_ ACE_Handle_Set. We keep track of connected client handles so we can remove them from the reactor when AC_Input_Handler::close() is called. This method is called by AC_CLD_Acceptor::handle_close() (page 247) or AC_Client_Logging_Daemon::fini() (page 253) to shut down the client logging daemon:

     1 int AC_Input_Handler::close (u_int) {
     2   ACE_Message_Block *shutdown_message = 0;
     3   ACE_NEW_RETURN
     4     (shutdown_message,
     5      ACE_Message_Block (0, ACE_Message_Block::MB_STOP), -1);
     6   output_handler_->put (shutdown_message);
     7
     8   reactor ()->remove_handler
     9     (connected_clients_, ACE_Event_Handler::READ_MASK);
    10   return output_handler_->wait ();
    11 }

Lines 2–6 Insert a 0-sized message block of type MB_STOP into the message queue. When the forwarder thread running AC_Output_Handler::svc() (page 244) dequeues this shutdown_message, it will flush its remaining log records to the logging server, close the message queue, and exit the thread.

Lines 8–9 Remove all handles in the connected_clients_ handle set in one operation.
Each removed handle will generate a reactor callback to AC_Input_Handler::handle_close() on its behalf , where the socket handle is closed. Line 10 Use the output_handler_ 's wait() method to block until its svc() hook method exits before returning from AC_Input_Handler::close() . This method reaps the exit status of the forwarder thread to prevent memory leaks. AC_Output_Handler. This class provides the following capabilities:
The AC_Output_Handler class is shown below:

    class AC_Output_Handler
      : public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_MT_SYNCH> {
    public:
      enum { QUEUE_MAX = sizeof (ACE_Log_Record) * ACE_IOV_MAX };

      virtual int open (void *); // Initialization hook.

      // Entry point into the <AC_Output_Handler>.
      virtual int put (ACE_Message_Block *, ACE_Time_Value * = 0);

    protected:
      // Pointer to connection factory for <AC_Output_Handler>.
      AC_CLD_Connector *connector_;

      // Handle disconnects from the logging server.
      virtual int handle_input (ACE_HANDLE handle);

      // Hook method forwards log records to server logging daemon.
      virtual int svc ();

      // Send buffered log records using a gather-write operation.
      virtual int send (ACE_Message_Block *chunk[], size_t &count);
    };

    #if !defined (FLUSH_TIMEOUT)
    #define FLUSH_TIMEOUT 120 /* 120 seconds == 2 minutes. */
    #endif /* FLUSH_TIMEOUT */

Since AC_Output_Handler is derived from ACE_Svc_Handler and instantiates its synchronization traits with ACE_MT_SYNCH, it inherits the ACE_SOCK_Stream, ACE_Thread_Manager, and synchronized ACE_Message_Queue, as well as the ability to activate itself to become an active object.

AC_Input_Handler::handle_input() (page 239) plays the reactive role in the variant of the Half-Sync/Half-Async pattern we use to structure the concurrency architecture of this client logging daemon. It passes log records to the AC_Output_Handler via the following put() method:

    int AC_Output_Handler::put (ACE_Message_Block *mb,
                                ACE_Time_Value *timeout) {
      int result;
      while ((result = putq (mb, timeout)) == -1)
        if (msg_queue ()->state () != ACE_Message_Queue_Base::PULSED)
          break;
      return result;
    }

This method simply enqueues the message block onto the AC_Output_Handler's synchronized message queue. If the putq() call is blocked and the message queue is pulsed, it simply reexecutes the putq() call to try again. The following two methods explain how this class uses queue pulsing.
The AC_CLD_Connector factory (page 248) initializes the AC_Output_Handler by calling its open() hook method shown below:

     1 int AC_Output_Handler::open (void *connector) {
     2   connector_ =
     3     ACE_static_cast (AC_CLD_Connector *, connector);
     4   int bufsiz = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
     5   peer ().set_option (SOL_SOCKET, SO_SNDBUF,
     6                       &bufsiz, sizeof bufsiz);
     7   if (reactor ()->register_handler
     8         (this, ACE_Event_Handler::READ_MASK) == -1)
     9     return -1;
    10   if (msg_queue ()->activate ()
    11       == ACE_Message_Queue_Base::ACTIVATED) {
    12     msg_queue ()->high_water_mark (QUEUE_MAX);
    13     return activate (THR_SCOPE_SYSTEM);
    14   } else return 0;
    15 }

Lines 2–3 Save the pointer to the AC_CLD_Connector factory that called this method. If the connection to the server must be reestablished, the same factory will be used.

Lines 4–6 Increase the connected socket's send buffer to its largest size to maximize throughput over long-delay and/or high-speed networks.

Lines 7–8 Register this object with the singleton reactor so that its handle_input() method (page 246) will be notified immediately if the logging server disconnects.

Lines 10–13 This method is called each time a new connection to the logging server is established. On the initial connection, the message queue will be in the ACTIVATED state, so we set the message queue's high watermark to sizeof(ACE_Log_Record) x ACE_IOV_MAX, just like we do on line 6 of CLD_Handler::open() (page 172). We then spawn a system-scoped thread that runs the AC_Output_Handler::svc() hook method concurrently. Since AC_Input_Handler::close() (page 242) waits for this thread to exit, we don't pass the THR_DETACHED flag to activate(). If the message queue was not in the ACTIVATED state, however, we know the logging server connection was reestablished. In this case, the message queue's high watermark was already set and the service thread is already executing.
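The state check on lines 10 through 13 can be modeled without ACE at all. The sketch below is only an illustration of the idea that activate() reports the queue's previous state, so that one-time setup runs on the first open() and is skipped after a pulse-triggered reconnect. ToyQueue, ToyOutputHandler, and every member name are hypothetical stand-ins, not ACE classes, and no real thread is spawned.

```cpp
#include <cassert>

// Hypothetical stand-in for the message queue states discussed above.
enum class QueueState { ACTIVATED, DEACTIVATED, PULSED };

// Toy queue that models only state transitions, not queueing.
class ToyQueue {
public:
  // Like the call on line 10 of the listing: returns the state the
  // queue was in before the call, then marks the queue ACTIVATED.
  QueueState activate() {
    QueueState previous = state_;
    state_ = QueueState::ACTIVATED;
    return previous;
  }
  void pulse() { state_ = QueueState::PULSED; }
  QueueState state() const { return state_; }

private:
  QueueState state_ = QueueState::ACTIVATED;  // a fresh queue starts active
};

// Toy handler: open() runs on every (re)connection, but the one-time
// setup (counted here instead of spawning a thread) runs only if the
// queue had not been pulsed since the previous open().
class ToyOutputHandler {
public:
  int open() {
    ++opens_;
    if (queue_.activate() == QueueState::ACTIVATED)
      ++threads_started_;  // stands in for activate (THR_SCOPE_SYSTEM)
    return 0;
  }

  // Stands in for handle_input() noticing that the server went away.
  void server_disconnected() {
    assert(opens_ > 0 && "disconnect reported before the first open()");
    queue_.pulse();
  }

  int opens() const { return opens_; }
  int threads_started() const { return threads_started_; }
  QueueState queue_state() const { return queue_.state(); }

private:
  ToyQueue queue_;
  int opens_ = 0;
  int threads_started_ = 0;
};
```

Calling open(), then server_disconnected(), then open() again on a ToyOutputHandler leaves threads_started() at 1 while opens() is 2, which is the behavior the walkthrough of lines 10 through 13 describes.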
We now show the AC_Output_Handler::svc() hook method, which runs in its own thread and forwards log records to the server logging daemon. As shown below, this method optimizes network throughput by buffering log records until a maximum number have arrived or a maximum time elapses.

     1 int AC_Output_Handler::svc () {
     2   ACE_Message_Block *chunk[ACE_IOV_MAX];
     3   size_t message_index = 0;
     4   ACE_Time_Value time_of_last_send (ACE_OS::gettimeofday ());
     5   ACE_Time_Value timeout;
     6   ACE_Sig_Action no_sigpipe ((ACE_SignalHandler) SIG_IGN);
     7   ACE_Sig_Action original_action;
     8   no_sigpipe.register_action (SIGPIPE, &original_action);
     9
    10   for (;;) {
    11     if (message_index == 0) {
    12       timeout = ACE_OS::gettimeofday ();
    13       timeout += FLUSH_TIMEOUT;
    14     }
    15     ACE_Message_Block *mblk = 0;
    16     if (getq (mblk, &timeout) == -1) {
    17       if (errno == ESHUTDOWN) {
    18         if (connector_->reconnect () == -1) break;
    19         continue;
    20       } else if (errno != EWOULDBLOCK) break;
    21       else if (message_index == 0) continue;
    22     } else {
    23       if (mblk->size () == 0
    24           && mblk->msg_type () == ACE_Message_Block::MB_STOP)
    25         { mblk->release (); break; }
    26       chunk[message_index] = mblk;
    27       ++message_index;
    28     }
    29     if (message_index >= ACE_IOV_MAX ||
    30         (ACE_OS::gettimeofday () - time_of_last_send
    31          >= FLUSH_TIMEOUT)) {
    32       if (send (chunk, message_index) == -1) break;
    33       time_of_last_send = ACE_OS::gettimeofday ();
    34     }
    35   }
    36
    37   if (message_index > 0) send (chunk, message_index);
    38   no_sigpipe.restore_action (SIGPIPE, original_action);
    39   return 0;
    40 }

We omit the AC_Output_Handler::send() implementation since it's identical to CLD_Handler::send() (page 175), which sends log records to the logging server and reconnects to the server if the connection is closed during the send. In fact, AC_Output_Handler::svc() is similar to CLD_Handler::forward() (page 173). The primary difference is that lines 17 through 20 above check to see if the queue was pulsed in response to the logging server disconnecting.
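The flush condition on lines 29 through 31 of svc() is easier to reason about when isolated as a pure function. The following is a minimal sketch under stated assumptions: should_flush() is a hypothetical helper that does not appear in the daemon, max_batch plays the role of ACE_IOV_MAX, flush_timeout plays the role of FLUSH_TIMEOUT, and std::chrono replaces ACE_Time_Value.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

using Clock = std::chrono::steady_clock;

// Hypothetical helper: report whether the buffered records should be
// sent now. A batch is flushed when it is full or when too much time
// has passed since the previous send.
inline bool should_flush(std::size_t buffered,
                         std::size_t max_batch,
                         Clock::time_point now,
                         Clock::time_point last_send,
                         std::chrono::seconds flush_timeout) {
  assert(max_batch > 0);
  // In the listing, a timed-out dequeue with nothing buffered simply
  // loops again (line 21), so an empty batch is never flushed.
  if (buffered == 0) return false;
  return buffered >= max_batch || (now - last_send) >= flush_timeout;
}
```

With a 120-second timeout, three buffered records are held back after 10 seconds but sent once 120 seconds have elapsed, while a full batch is sent immediately regardless of elapsed time.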
The pulse is performed by the AC_Output_Handler::handle_input() method below, which is dispatched by the singleton reactor when the server closes a connection to the client logging daemon. When the connection is reestablished via AC_CLD_Connector::reconnect(), the ACE_Connector class calls AC_Output_Handler::open() (page 244), which changes the message queue's state back to ACTIVATED.

    1 int AC_Output_Handler::handle_input (ACE_HANDLE h) {
    2   peer ().close ();
    3   reactor ()->remove_handler
    4     (h, ACE_Event_Handler::READ_MASK
    5         | ACE_Event_Handler::DONT_CALL);
    6   msg_queue ()->pulse ();
    7   return 0;
    8 }

Line 2 Close the connection to release the socket handle.

Lines 3–5 Remove the socket handle from the reactor since it's no longer valid. Since we've now taken care of all required cleanup and this object is not being deleted, the DONT_CALL flag is passed to remove_handler(). When the connection to the server logging daemon is reestablished, open() will be called again, and a new socket will be registered for this object.

Line 6 To avoid trying to reconnect to the server logging daemon while the forwarder thread may be attempting the same thing, transfer the work to the forwarder thread using the pulse() method (page 166). If the forwarder thread is waiting on the synchronized message queue (page 244) (and therefore unaware that the connection is closed), it will wake up and immediately begin a reconnect. The AC_Input_Handler::handle_input() method can continue to queue message blocks, ensuring that flow control is properly back-propagated to client applications if the connection can't be reestablished for a long time.

The ACE_Svc_Handler::close() method will be called automatically by the ACE Acceptor-Connector framework when the thread running the svc() hook method exits. This method cleans up all the dynamically allocated resources, such as the synchronized message queue and its contents, and removes the AC_Output_Handler from the singleton reactor.
AC_CLD_Acceptor. This class provides the following capabilities: [1]
The AC_CLD_Acceptor class definition is shown below:

    class AC_CLD_Acceptor
      : public ACE_Acceptor<AC_Input_Handler, ACE_SOCK_Acceptor> {
    public:
      AC_CLD_Acceptor (AC_Output_Handler *handler = 0)
        : output_handler_ (handler), input_handler_ (handler) {}

    protected:
      typedef ACE_Acceptor<AC_Input_Handler, ACE_SOCK_Acceptor>
              PARENT;

      // <ACE_Acceptor> factory method.
      virtual int make_svc_handler (AC_Input_Handler *&sh);

      // <ACE_Reactor> close hook method.
      virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
                                ACE_Reactor_Mask = 0);

      // Pointer to the output handler.
      AC_Output_Handler *output_handler_;

      // Single input handler.
      AC_Input_Handler input_handler_;
    };

AC_CLD_Acceptor is a subclass of ACE_Acceptor, so it inherits all the capabilities described in Section 7.3. Since we only need one instance of AC_Input_Handler, we override the ACE_Acceptor::make_svc_handler() method as follows:

    int AC_CLD_Acceptor::make_svc_handler (AC_Input_Handler *&sh) {
      sh = &input_handler_;
      return 0;
    }

This method sets the service handler to the address of the input_handler_ data member, which ensures there's only one instance of AC_Input_Handler, regardless of the number of clients that connect.

The following AC_CLD_Acceptor::handle_close() method is invoked by the reactor automatically if a failure occurs while accepting a connection or registering a handle and event handler with the reactor:

    1 int AC_CLD_Acceptor::handle_close (ACE_HANDLE,
    2                                    ACE_Reactor_Mask) {
    3   PARENT::handle_close ();
    4   input_handler_.close ();
    5   return 0;
    6 }

Line 3 Call up to its parent's handle_close() method to close the acceptor.

Line 4 Call the input_handler_'s close() method (page 242) to clean up the AC_Input_Handler's resources and shut down the AC_Output_Handler's message queue and svc() thread.

AC_CLD_Connector. This class provides the following capabilities:
The AC_CLD_Connector class definition is shown below:

    class AC_CLD_Connector
      : public ACE_Connector<AC_Output_Handler, ACE_SOCK_Connector> {
    public:
      typedef ACE_Connector<AC_Output_Handler, ACE_SOCK_Connector>
              PARENT;

      AC_CLD_Connector (AC_Output_Handler *handler = 0)
        : handler_ (handler), ssl_ctx_ (0), ssl_ (0) {}

      virtual ~AC_CLD_Connector (void) { // Frees the SSL resources.
        SSL_free (ssl_);
        SSL_CTX_free (ssl_ctx_);
      }

      // Initialize the Connector.
      virtual int open (ACE_Reactor *r = ACE_Reactor::instance (),
                        int flags = 0);

      int reconnect (); // Re-establish connection to server.

    protected:
      // Connection establishment and authentication hook method.
      virtual int connect_svc_handler
        (AC_Output_Handler *svc_handler,
         const ACE_SOCK_Connector::PEER_ADDR &remote_addr,
         ACE_Time_Value *timeout,
         const ACE_SOCK_Connector::PEER_ADDR &local_addr,
         int reuse_addr, int flags, int perms);

      // Pointer to <AC_Output_Handler> we're connecting.
      AC_Output_Handler *handler_;

      // Address at which logging server listens for connections.
      ACE_INET_Addr remote_addr_;

      // The SSL "context" data structure.
      SSL_CTX *ssl_ctx_;

      // The SSL data structure corresponding to authenticated SSL
      // connections.
      SSL *ssl_;
    };

The AC_CLD_Connector::open() method implementation performs the canonical ACE_Connector initialization, in addition to using OpenSSL to establish the client's identity.
    #if !defined (CLD_CERTIFICATE_FILENAME)
    # define CLD_CERTIFICATE_FILENAME "cld-cert.pem"
    #endif /* !CLD_CERTIFICATE_FILENAME */
    #if !defined (CLD_KEY_FILENAME)
    # define CLD_KEY_FILENAME "cld-key.pem"
    #endif /* !CLD_KEY_FILENAME */

    int AC_CLD_Connector::open (ACE_Reactor *r, int flags) {
      if (PARENT::open (r, flags) != 0) return -1;
      OpenSSL_add_ssl_algorithms ();
      ssl_ctx_ = SSL_CTX_new (SSLv3_client_method ());
      if (ssl_ctx_ == 0) return -1;

      if (SSL_CTX_use_certificate_file (ssl_ctx_,
                                        CLD_CERTIFICATE_FILENAME,
                                        SSL_FILETYPE_PEM) <= 0
          || SSL_CTX_use_PrivateKey_file (ssl_ctx_,
                                          CLD_KEY_FILENAME,
                                          SSL_FILETYPE_PEM) <= 0
          || !SSL_CTX_check_private_key (ssl_ctx_))
        return -1;

      ssl_ = SSL_new (ssl_ctx_);
      if (ssl_ == 0) return -1;
      return 0;
    }

This code initializes and validates the OpenSSL data structures using essentially the same logic as the implementation of TPC_Logging_Acceptor::open() (page 224).

Unlike CLD_Connector, AC_CLD_Connector needn't implement a connect() method. Instead, it reuses the ACE_Connector::connect() template method. When a connection is completed via ACE_Connector, the framework calls the following AC_CLD_Connector::connect_svc_handler() hook method, which uses OpenSSL to implement an authentication protocol that ensures the client logging daemon is permitted to connect with the server logging daemon. The server's identity is also verified.
The server logging daemon's end of this protocol appears in TPC_Logging_Acceptor::accept_svc_handler() (page 222) and the client logging daemon's end is shown below:

     1 int AC_CLD_Connector::connect_svc_handler
     2     (AC_Output_Handler *svc_handler,
     3      const ACE_SOCK_Connector::PEER_ADDR &remote_addr,
     4      ACE_Time_Value *timeout,
     5      const ACE_SOCK_Connector::PEER_ADDR &local_addr,
     6      int reuse_addr, int flags, int perms) {
     7   if (PARENT::connect_svc_handler
     8       (svc_handler, remote_addr, timeout,
     9        local_addr, reuse_addr, flags, perms) == -1) return -1;
    10   SSL_clear (ssl_);
    11   SSL_set_fd (ssl_, ACE_reinterpret_cast
    12                       (int, svc_handler->get_handle ()));
    13
    14   SSL_set_verify (ssl_, SSL_VERIFY_PEER, 0);
    15
    16   if (SSL_connect (ssl_) == -1
    17       || SSL_shutdown (ssl_) == -1) return -1;
    18   remote_addr_ = remote_addr;
    19   return 0;
    20 }

Lines 7–9 Establish the TCP connection using the default connect_svc_handler().

Lines 10–12 Reset the SSL data structures for use with a new SSL connection.

Line 14 Configure the SSL data structures so that authentication of the server is performed and enforced when establishing the SSL connection.

Line 16 Perform the actual SSL connection/negotiation. If authentication of the server fails, the SSL_connect() call will fail.

Line 17 Shut down the SSL connection if authentication succeeds. Since we don't actually encrypt the data, we can communicate through the TCP stream from here on. If data encryption is also required, Sidebar 52 (page 227) describes how the ACE wrapper facades for OpenSSL can be applied.

Line 18 Save the address of the connected server logging daemon in case the connection needs to be reestablished using the AC_CLD_Connector::reconnect() method discussed below.

By overriding the open() and connect_svc_handler() hook methods, we can add authentication to our client logging daemon without affecting any other part of its implementation.
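The way a subclass interposes authentication by overriding a single hook can be sketched without ACE or OpenSSL. In the illustration below, ToyConnector::connect() fixes the order of three hooks named after make_svc_handler(), connect_svc_handler(), and activate_svc_handler(), and AuthenticatingConnector overrides only the middle one. All class names, the trace member, and the string comparison that stands in for the SSL handshake are assumptions made for this sketch; the real framework also closes the handler on failure and supports asynchronous completion, which is omitted here.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a service handler.
struct ToyHandler {
  bool connected = false;
  bool active = false;
};

// Toy connector: connect() is the template method, the hooks vary.
class ToyConnector {
public:
  virtual ~ToyConnector() = default;

  int connect(ToyHandler *&sh, const std::string &remote) {
    if (make_svc_handler(sh) == -1) return -1;
    if (connect_svc_handler(sh, remote) == -1) return -1;
    return activate_svc_handler(sh);
  }

  std::vector<std::string> trace;  // records which hooks ran, in order

protected:
  // Allocate a handler only if the caller did not supply one. A
  // handler allocated here is owned by the caller afterwards.
  virtual int make_svc_handler(ToyHandler *&sh) {
    trace.push_back("make");
    if (sh == nullptr) sh = new ToyHandler;
    return 0;
  }

  virtual int connect_svc_handler(ToyHandler *sh, const std::string &) {
    assert(sh != nullptr);
    trace.push_back("connect");
    sh->connected = true;  // pretend the transport connected
    return 0;
  }

  virtual int activate_svc_handler(ToyHandler *sh) {
    trace.push_back("activate");
    sh->active = true;
    return 0;
  }
};

// Subclass that adds a check after the default connection step, in the
// same spirit as the connect_svc_handler() override shown above.
class AuthenticatingConnector : public ToyConnector {
public:
  explicit AuthenticatingConnector(std::string trusted)
    : trusted_(std::move(trusted)) {}

protected:
  int connect_svc_handler(ToyHandler *sh,
                          const std::string &remote) override {
    if (ToyConnector::connect_svc_handler(sh, remote) == -1) return -1;
    trace.push_back("authenticate");
    if (remote != trusted_) {  // stand-in for a failed handshake
      sh->connected = false;
      return -1;
    }
    return 0;
  }

private:
  std::string trusted_;
};
```

Connecting to the trusted name runs the hooks in the order make, connect, authenticate, activate. Connecting to any other name stops after the authenticate step, so the handler is never activated.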
This extensibility illustrates the power of the Template Method pattern used in the ACE_Connector class design.

The following AC_CLD_Connector::reconnect() method uses the same exponential backoff algorithm as CLD_Connector::reconnect() (page 179) to avoid swamping a logging server with connection requests:

    int AC_CLD_Connector::reconnect () {
      // Maximum number of times to retry connect.
      const size_t MAX_RETRIES = 5;
      ACE_Time_Value timeout (1);
      size_t i;
      for (i = 0; i < MAX_RETRIES; ++i) {
        ACE_Synch_Options options (ACE_Synch_Options::USE_TIMEOUT,
                                   timeout);
        if (i > 0) ACE_OS::sleep (timeout);
        if (connect (handler_, remote_addr_, options) == 0)
          break;
        timeout *= 2; // Exponential backoff.
      }
      return i == MAX_RETRIES ? -1 : 0;
    }

AC_Client_Logging_Daemon. This class is a facade that integrates the other classes described above to implement the new client logging daemon. Its definition is shown below:

    class AC_Client_Logging_Daemon : public ACE_Service_Object {
    protected:
      // Factory that passively connects the <AC_Input_Handler>.
      AC_CLD_Acceptor acceptor_;

      // Factory that actively connects the <AC_Output_Handler>.
      AC_CLD_Connector connector_;

      // The <AC_Output_Handler> connected by <AC_CLD_Connector>.
      AC_Output_Handler output_handler_;

    public:
      AC_Client_Logging_Daemon ()
        : acceptor_ (&output_handler_),
          connector_ (&output_handler_) {}

      // Service Configurator hook methods.
      virtual int init (int argc, ACE_TCHAR *argv[]);
      virtual int fini ();
      virtual int info (ACE_TCHAR **bufferp, size_t length = 0) const;
      virtual int suspend ();
      virtual int resume ();
    };

AC_Client_Logging_Daemon inherits from ACE_Service_Object. It can therefore be configured dynamically via a svc.conf file that's processed by the ACE Service Configurator framework described in Chapter 5.
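Returning to the reconnect() loop shown earlier in this passage, its retry schedule can be replayed without sockets. The sketch below is an illustration only: reconnect_with_backoff(), the Attempt record, and the try_connect callback are hypothetical, whole seconds replace ACE_Time_Value, and the sleeps are recorded rather than performed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// One recorded connection attempt (hypothetical bookkeeping type).
struct Attempt {
  unsigned sleep_before;  // seconds slept before the attempt
  unsigned timeout;       // connect timeout, in seconds
};

// Replays the control flow of the reconnect() listing. try_connect
// stands in for connect() and returns true on success. Returns 0 if
// some attempt succeeded and -1 if every attempt failed.
inline int reconnect_with_backoff(
    std::size_t max_retries,
    const std::function<bool(unsigned)> &try_connect,
    std::vector<Attempt> &attempts) {
  assert(max_retries > 0);
  unsigned timeout = 1;
  std::size_t i;
  for (i = 0; i < max_retries; ++i) {
    // The listing sleeps for the current timeout before every attempt
    // except the first one.
    unsigned sleep_before = (i > 0) ? timeout : 0;
    attempts.push_back({sleep_before, timeout});
    if (try_connect(timeout)) break;
    timeout *= 2;  // exponential backoff
  }
  return i == max_retries ? -1 : 0;
}
```

With five retries and a server that never answers, the recorded timeouts are 1, 2, 4, 8, and 16 seconds, and each attempt after the first is preceded by a sleep of the same length as its timeout.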
When an instance of AC_Client_Logging_Daemon is linked dynamically, the ACE Service Configurator framework calls the AC_Client_Logging_Daemon::init() hook method shown below:

     1 int AC_Client_Logging_Daemon::init
     2     (int argc, ACE_TCHAR *argv[]) {
     3   u_short cld_port = ACE_DEFAULT_SERVICE_PORT;
     4   u_short sld_port = ACE_DEFAULT_LOGGING_SERVER_PORT;
     5   ACE_TCHAR sld_host[MAXHOSTNAMELEN];
     6   ACE_OS_String::strcpy (sld_host, ACE_LOCALHOST);
     7   ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:r:s:"), 0);
     8   get_opt.long_option (ACE_TEXT ("client_port"), 'p',
     9                        ACE_Get_Opt::ARG_REQUIRED);
    10   get_opt.long_option (ACE_TEXT ("server_port"), 'r',
    11                        ACE_Get_Opt::ARG_REQUIRED);
    12   get_opt.long_option (ACE_TEXT ("server_name"), 's',
    13                        ACE_Get_Opt::ARG_REQUIRED);
    14
    15   for (int c; (c = get_opt ()) != -1;)
    16     switch (c) {
    17     case 'p': // Client logging daemon acceptor port number.
    18       cld_port = ACE_static_cast
    19         (u_short, ACE_OS::atoi (get_opt.opt_arg ()));
    20       break;
    21     case 'r': // Server logging daemon acceptor port number.
    22       sld_port = ACE_static_cast
    23         (u_short, ACE_OS::atoi (get_opt.opt_arg ()));
    24       break;
    25     case 's': // Server logging daemon hostname.
    26       ACE_OS_String::strsncpy
    27         (sld_host, get_opt.opt_arg (), MAXHOSTNAMELEN);
    28       break;
    29     }
    30
    31   ACE_INET_Addr cld_addr (cld_port);
    32   ACE_INET_Addr sld_addr (sld_port, sld_host);
    33
    34   if (acceptor_.open (cld_addr) == -1) return -1;
    35   AC_Output_Handler *oh = &output_handler_;
    36   if (connector_.connect (oh, sld_addr) == -1)
    37   { acceptor_.close (); return -1; }
    38   return 0;
    39 }

Lines 3-6 Assign the default client logging daemon listen port (cld_port) and the default server logging daemon port (sld_port) and hostname (sld_host). These can be changed by arguments passed into this method. In particular, the server logging daemon hostname will often need to be set using the -s option.

Lines 7-29 The ACE_Get_Opt iterator described in Sidebar 8 (page 47) parses options passed by the svc.conf file.
The final parameter of 0 to ACE_Get_Opt ensures option parsing begins at argv[0] rather than argv[1], which is the default. If any of the "-p", "-r", or "-s" options, or their long option equivalents, are passed in the argv parameter to init(), the appropriate port number or hostname is modified accordingly.

Lines 31-32 With the port numbers and server logging daemon's hostname now known, form the addresses needed to establish connections.

Lines 34-37 Initialize the acceptor_ (page 247) and connector_ (page 248).

When the client logging daemon is removed, the ACE Service Configurator framework calls the following AC_Client_Logging_Daemon::fini() hook method:

    int AC_Client_Logging_Daemon::fini ()
    { return acceptor_.close (); }

The fini() method calls the close() method inherited by AC_CLD_Acceptor. This method in turn calls AC_CLD_Acceptor::handle_close() to trigger a shutdown of the message queue and forwarder thread. The ACE Service Configurator framework deletes the AC_Client_Logging_Daemon instance after fini() returns. This shutdown sequence is depicted in Figure 7.11.

Figure 7.11. AC_Client_Logging_Daemon Shutdown Sequence
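The option handling in init() depends on parsing beginning at argv[0] rather than argv[1]. The following is a rough sketch of that idea in standard C++, without ACE_Get_Opt and without long-option support. Daemon_Options and parse_daemon_options() are hypothetical names used only for illustration, and the zero port defaults are placeholders rather than the ACE default port constants.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical holder for the values init() extracts. The member names
// mirror the local variables in the listing; the defaults are placeholders.
struct Daemon_Options {
  unsigned short cld_port = 0;         // Client logging daemon listen port.
  unsigned short sld_port = 0;         // Server logging daemon port.
  std::string sld_host = "localhost";  // Server logging daemon hostname.
};

// Parses "-p <port> -r <port> -s <host>" starting at index 0, since the
// text states that parsing must begin at argv[0] for options that come
// from a svc.conf file. Unknown arguments are skipped.
inline Daemon_Options parse_daemon_options(int argc,
                                           const char *const argv[]) {
  Daemon_Options opts;
  for (int i = 0; i + 1 < argc; ++i) {
    const std::string flag = argv[i];
    if (flag == "-p")
      opts.cld_port = static_cast<unsigned short>(std::atoi(argv[++i]));
    else if (flag == "-r")
      opts.sld_port = static_cast<unsigned short>(std::atoi(argv[++i]));
    else if (flag == "-s")
      opts.sld_host = argv[++i];
  }
  return opts;
}
```

A parser that skipped index 0, as command-line parsers conventionally do to step over the program name, would silently ignore the first option here.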
Now that we've implemented all the client logging daemon's classes, we can add the ACE_FACTORY_DEFINE macro. [2]

    ACE_FACTORY_DEFINE (AC_CLD, AC_Client_Logging_Daemon)

This macro automatically defines the _make_AC_Client_Logging_Daemon() factory function, which is used in the following svc.conf file:

    dynamic AC_Client_Logging_Daemon Service_Object *
    AC_CLD:_make_AC_Client_Logging_Daemon()
      "-p $CLIENT_LOGGING_DAEMON_PORT"

This file directs the ACE Service Configurator framework to configure the client logging daemon via the following steps:
We're now ready to show the main() function, in SR_Configurable_Logging_Server.cpp. It's similar to the Configurable_Logging_Server.cpp program used for other services, but requires slightly different tactics. Sidebar 11 (page 55) discusses some problems that can arise when event handlers don't control their own life cycle. The AC_Client_Logging_Daemon service in this chapter is allocated dynamically by the ACE Service Configurator framework when it's activated. However, its acceptor and service handlers are member objects in the service, and therefore don't control their own life cycle. Although our design carefully manages all the handlers' life cycle activities, problems can still occur on Windows because of the ACE_WFMO_Reactor deferred cleanup semantics (page 107). We therefore explicitly set the ACE_Reactor singleton to be an ACE_Select_Reactor.

     1 #include "ace/OS.h"
     2 #include "ace/Reactor.h"
     3 #include "ace/Select_Reactor.h"
     4 #include "ace/Service_Config.h"
     5
     6 int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) {
     7   ACE_Select_Reactor *select_reactor;
     8   ACE_NEW_RETURN (select_reactor, ACE_Select_Reactor, 1);
     9   ACE_Reactor *reactor;
    10   ACE_NEW_RETURN (reactor, ACE_Reactor (select_reactor, 1), 1);
    11   ACE_Reactor::close_singleton ();
    12   ACE_Reactor::instance (reactor, 1);
    13
    14   ACE_Service_Config::open (argc, argv);
    15
    16   ACE_Reactor::instance ()->run_reactor_event_loop ();
    17   return 0;
    18 }

Lines 7-8 The reactor implementation-specific examples in Chapter 4 used automatic instances of the desired reactor types. In this example, however, the reactor must persist until after the ACE_Object_Manager terminates services at shutdown time. To ensure this, we allocate the reactor dynamically. The third argument to the ACE_NEW_RETURN macro is the value to return from main() if allocation fails.

Lines 9-10 The ACE_Reactor is also allocated dynamically. The ACE_Select_Reactor is passed as the implementation to use.
The second argument to the ACE_Reactor constructor tells the new ACE_Reactor instance to delete the implementation object when the reactor closes.

Lines 11-12 Close any existing ACE_Reactor singleton and replace it with the new ACE_Select_Reactor-based instance. The second argument to ACE_Reactor::instance() explicitly turns control of the ACE_Reactor instance's memory over to the ACE_Reactor singleton management mechanism. This design ensures that the reactor singleton shutdown activity generated by the ACE_Object_Manager will close the reactor and free the dynamically allocated memory after the services are shut down.

Lines 14-16 As usual, configure services and run the reactor event loop.

The examples in this chapter have illustrated a number of powerful ACE Acceptor-Connector techniques and strategies:
The capabilities in this chapter's client and server logging daemons could clearly be implemented using different mechanisms and protocols. Due to ACE's highly refined framework design, however, none of the different ways to redesign these daemons will require rewriting the networking code, buffer management, queueing mechanisms, concurrency strategies, or demultiplexing techniques. The only changes are in the networked service implementation itself.