Motivation

Semaphores are a powerful mechanism used to lock and/or synchronize access to shared resources in concurrent applications. A semaphore contains a count that indicates the status of a shared resource. Application designers assign the meaning of the semaphore's count, as well as its initial value. Semaphores can therefore be used to mediate access to a pool of resources. Since releasing a semaphore increments its count regardless of the presence of waiters, semaphores are also useful for keeping track of events that change shared program state. Threads can make decisions based on these events, even if they've already occurred.

Although some form of semaphore mechanism is available on most operating systems, the ACE semaphore wrapper facades resolve issues arising from subtle variations in syntax and semantics across a broad range of environments.

Class Capabilities

The ACE_Thread_Semaphore and ACE_Process_Semaphore classes portably encapsulate process-scoped and system-scoped semaphores, respectively, in accordance with the Wrapper Facade pattern. These class interfaces are largely the same as the ACE_LOCK* pseudo-class shown in Figure 10.1 on page 209. The constructor is slightly different, however, since semaphore initialization is more expressive than that of mutexes and readers/writer locks, allowing the semaphore's initial count to be set. The relevant portion of the ACE_Thread_Semaphore API is shown below:

    class ACE_Thread_Semaphore
    {
    public:
      // Initialize the semaphore, with an initial value of <count>,
      // a maximum value of <max>, and unlocked by default.
      ACE_Thread_Semaphore (u_int count = 1,
                            const char *name = 0,
                            void *arg = 0,
                            int max = 0x7FFFFFFF);

      // ... same as pseudo <ACE_LOCK> signatures.
    };

The ACE_Process_Semaphore has the same interface, though it synchronizes threads at the system scope rather than at the process scope.
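To make the counting behavior described above more concrete, here is a minimal sketch of a counting semaphore built from a standard C++ mutex and condition variable, used to limit access to a small resource pool. It is not the ACE implementation and does not use the ACE API: the names Counting_Semaphore and max_concurrent_holders are hypothetical, and the timed acquire() returns a bool rather than setting errno as the ACE classes do.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a counting semaphore; NOT the ACE class.
class Counting_Semaphore
{
public:
  explicit Counting_Semaphore (unsigned int count = 1) : count_ (count) {}

  // Block until the count is positive, then decrement it.
  void acquire ()
  {
    std::unique_lock<std::mutex> guard (lock_);
    nonzero_.wait (guard, [this] { return count_ > 0; });
    --count_;
  }

  // Timed variant: returns false if the count stays at 0 for the
  // whole interval (assumption: bool result instead of errno).
  bool acquire (std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> guard (lock_);
    if (!nonzero_.wait_for (guard, timeout, [this] { return count_ > 0; }))
      return false;
    --count_;
    return true;
  }

  // Increment the count whether or not any thread is waiting, so a
  // release that happens before an acquire is remembered.
  void release ()
  {
    {
      std::lock_guard<std::mutex> guard (lock_);
      ++count_;
    }
    nonzero_.notify_one ();
  }

private:
  std::mutex lock_;
  std::condition_variable nonzero_;
  unsigned int count_;
};

// Run <workers> threads that each borrow one of <pool_size> resources
// and return the largest number of simultaneous holders observed.
int max_concurrent_holders (unsigned int pool_size, int workers)
{
  Counting_Semaphore pool (pool_size);
  std::mutex stats_lock;
  int holders = 0;
  int peak = 0;
  std::vector<std::thread> threads;

  for (int i = 0; i < workers; ++i)
    threads.emplace_back ([&] {
      pool.acquire ();   // Take a resource from the pool.
      {
        std::lock_guard<std::mutex> guard (stats_lock);
        if (++holders > peak) peak = holders;
      }
      std::this_thread::sleep_for (std::chrono::milliseconds (5));
      {
        std::lock_guard<std::mutex> guard (stats_lock);
        --holders;
      }
      pool.release ();   // Return the resource to the pool.
    });

  for (auto &t : threads)
    t.join ();
  return peak;
}
```

With a pool of two resources and eight worker threads, max_concurrent_holders (2, 8) should never report more than two simultaneous holders. A release() issued before any acquire() is likewise not lost, which is the "events that have already occurred" property mentioned in the Motivation.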
These two ACE classes encapsulate OS-native semaphore mechanisms whenever possible, emulating them if the OS platform doesn't support semaphores natively. This allows applications to use semaphores and still be ported to new platforms regardless of the native semaphore support, or lack thereof. Section A.5.2 on page 251 shows an emulation of ACE_Thread_Semaphore on platforms that don't support it natively.

The ACE_Null_Semaphore class implements all of its methods as "no-op" inline functions. We implement two of its acquire() methods below:

    class ACE_Null_Semaphore
    {
    public:
      int acquire () { return 0; }
      int acquire (ACE_Time_Value *) { errno = ETIME; return -1; }
      // ...
    };

Note that the timed version of acquire() returns -1 and sets errno to ETIME to ensure that ACE_Null_Semaphore can be interchanged properly in conjunction with the Strategized Locking pattern [SSRB00]. For the non-null ACE semaphores, the blocking version of acquire() is often used to serialize access to a critical section, whereas the timed version is often used to wait for another thread to update some condition or change some shared state. When using an ACE_Null_Semaphore, however, there's no other thread involved to change a state or condition. Otherwise, a null semaphore would be inappropriate. Returning an error value signifies that the state or condition has not been (and can't be) changed, which is consistent with the behavior of the threaded case in which a timeout occurs before the state or condition is changed.

Example

Although semaphores can coordinate the processing of multiple threads, they don't themselves pass any data between threads. Passing data between threads is a common concurrent programming technique, however, so some type of lightweight intraprocess message queueing mechanism can be quite useful. We therefore show a Message_Queue class implementation that provides the following capabilities:
We show the key parts of the Message_Queue class implementation below and showcase the use of ACE_Thread_Semaphore. To simplify the design and evolution of this code, we also apply the following patterns and idioms from POSA2 [SSRB00]:
Most of the patterns from the POSA2 book can be implemented using ACE concurrency and synchronization classes. We start with the definition of the Message_Queue class:

    class Message_Queue
    {
    public:
      // Default high and low water marks.
      enum {
        DEFAULT_LWM = 0,         // 0 is the low water mark.
        DEFAULT_HWM = 16 * 1024  // 16 K is the high water mark.
      };

      // Initialize.
      Message_Queue (size_t = DEFAULT_HWM, size_t = DEFAULT_LWM);

      // Destroy.
      ~Message_Queue ();

      // Checks if queue is full/empty.
      int is_full () const;
      int is_empty () const;

      // Interface for enqueueing and dequeueing ACE_Message_Blocks.
      int enqueue_tail (ACE_Message_Block *, ACE_Time_Value * = 0);
      int dequeue_head (ACE_Message_Block *&, ACE_Time_Value * = 0);

    private:
      // Implementations that enqueue/dequeue ACE_Message_Blocks.
      int enqueue_tail_i (ACE_Message_Block *, ACE_Time_Value * = 0);
      int dequeue_head_i (ACE_Message_Block *&, ACE_Time_Value * = 0);

      // Implement the checks for boundary conditions.
      int is_empty_i () const;
      int is_full_i () const;

      // Lowest number before unblocking occurs.
      int low_water_mark_;

      // Greatest number of bytes before blocking.
      int high_water_mark_;

      // Current number of bytes in the queue.
      int cur_bytes_;

      // Current number of messages in the queue.
      int cur_count_;

      // Number of threads waiting to dequeue a message.
      size_t dequeue_waiters_;

      // Number of threads waiting to enqueue a message.
      size_t enqueue_waiters_;

      // C++ wrapper facades to coordinate concurrent access.
      mutable ACE_Thread_Mutex lock_;
      ACE_Thread_Semaphore notempty_;
      ACE_Thread_Semaphore notfull_;

      // Remaining details of queue implementation omitted....
    };

The Message_Queue constructor shown below creates an empty message list and initializes the ACE_Thread_Semaphores to start with a count of 0 (the mutex lock_ is initialized automatically by its default constructor).
    Message_Queue::Message_Queue (size_t hwm, size_t lwm)
      : low_water_mark_ (lwm),
        high_water_mark_ (hwm),
        cur_bytes_ (0),
        cur_count_ (0),
        dequeue_waiters_ (0),
        enqueue_waiters_ (0),
        notempty_ (0),
        notfull_ (0)
    { /* Remaining constructor implementation omitted ... */ }

The following methods check if a queue is "empty," that is, contains no messages, or "full," that is, contains at least high_water_mark_ bytes. These methods, like the others below, are designed in accordance with the Thread-Safe Interface pattern [SSRB00]. We start with the is_empty() and is_full() interface methods:

    int Message_Queue::is_empty () const
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
      return is_empty_i ();
    }

    int Message_Queue::is_full () const
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);
      return is_full_i ();
    }

These methods acquire the lock_ and then forward the call to one of the following implementation methods:

    int Message_Queue::is_empty_i () const
    {
      return cur_bytes_ <= 0 && cur_count_ <= 0;
    }

    int Message_Queue::is_full_i () const
    {
      return cur_bytes_ >= high_water_mark_;
    }

These methods assume the lock_ is held and actually perform the work.

The enqueue_tail() method inserts a new item at the end of the queue and returns a count of the number of messages in the queue. As with the dequeue_head() method, if the timeout parameter is 0, the caller will block until action is possible. Otherwise, the caller will block only up to the amount of time in *timeout. A blocked call can return when a signal occurs or if the time specified in timeout elapses, in which case errno is set to EWOULDBLOCK.

    int Message_Queue::enqueue_tail (ACE_Message_Block *new_mblk,
                                     ACE_Time_Value *timeout)
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);

      int result = 0;

      // Wait until the queue is no longer full.
      while (is_full_i () && result != -1) {
        ++enqueue_waiters_;
        guard.release ();
        result = notfull_.acquire (timeout);
        guard.acquire ();
      }

      if (result == -1) {
        if (enqueue_waiters_ > 0)
          --enqueue_waiters_;
        if (errno == ETIME)
          errno = EWOULDBLOCK;
        return -1;
      }

      // Enqueue the message at the tail of the queue.
      int queued_messages = enqueue_tail_i (new_mblk);

      // Tell any blocked threads that the queue has a new item!
      if (dequeue_waiters_ > 0) {
        --dequeue_waiters_;
        notempty_.release ();
      }
      return queued_messages; // guard's destructor releases lock_.
    }

The enqueue_tail() method releases the notempty_ semaphore when there's at least one thread waiting to dequeue a message. The actual enqueueing logic resides in enqueue_tail_i(), which we omit here since it's a low-level implementation detail. Note the potential race condition in the time window between the call to notfull_.acquire() and reacquiring the guard lock. It's possible for another thread to call dequeue_head(), decrementing enqueue_waiters_ in that small window of time. After the lock is reacquired, therefore, the count is checked to guard against decrementing enqueue_waiters_ below 0.

The dequeue_head() method removes the front item from the queue, passes it back to the caller, and returns a count of the number of items still in the queue, as follows:

    int Message_Queue::dequeue_head (ACE_Message_Block *&first_item,
                                     ACE_Time_Value *timeout)
    {
      ACE_GUARD_RETURN (ACE_Thread_Mutex, guard, lock_, -1);

      int result = 0;

      // Wait until the queue is no longer empty.
      while (is_empty_i () && result != -1) {
        ++dequeue_waiters_;
        guard.release ();
        result = notempty_.acquire (timeout);
        guard.acquire ();
      }

      if (result == -1) {
        if (dequeue_waiters_ > 0)
          --dequeue_waiters_;
        if (errno == ETIME)
          errno = EWOULDBLOCK;
        return -1;
      }

      // Remove the first message from the queue.
      int queued_messages = dequeue_head_i (first_item);

      // Only signal if we've fallen to or below the low water mark.
      if (cur_bytes_ <= low_water_mark_ && enqueue_waiters_ > 0) {
        --enqueue_waiters_;
        notfull_.release ();
      }
      return queued_messages; // <guard> destructor releases <lock_>
    }

The Message_Queue class shown above implements a subset of the features in the ACE_Message_Queue and ACE_Message_Queue_Ex classes, which are presented in [SH]. These ACE message queue classes differ from the Message_Queue implementation shown above in the following ways: