16.3 Buffer Implementation with Semaphores

Team-FLY

A more efficient implementation uses POSIX:SEM semaphores (introduced in Section 14.3). Recall that POSIX:SEM semaphores are not part of the POSIX:THR Extension but can be used with threads. Semaphores differ in several operational respects from the POSIX thread functions. If unsuccessful , the semaphore functions return “1 and set errno . In contrast, the POSIX:THR thread functions return a nonzero error code. The blocking semaphore functions can be interrupted by a signal and are cancellation points for thread cancellation, so you must be careful to handle the effects of signals and cancellation when using semaphores.

The traditional semaphore solution to the producer-consumer problem uses two counting semaphores to represent the number of items in the buffer and the number of free slots, respectively. When a thread needs a resource of a particular type, it decrements the corresponding semaphore by calling sem_wait . Similarly when the thread releases a resource, it increments the appropriate semaphore by calling sem_post . Since the semaphore variable never falls below zero, threads cannot use resources that are not there. Always initialize a counting semaphore to the number of resources initially available.

Program 16.3 shows a bounded buffer that synchronizes its access with semaphores. The semslots semaphore, which is initialized to BUFSIZE , represents the number of free slots available. This semaphore is decremented by producers and incremented by consumers through the sem_wait and sem_post calls, respectively. Similarly, the semitems semaphore, which is initialized to 0, represents the number of items in the buffer. This semaphore is decremented by consumers and incremented by producers through the sem_wait and sem_post calls, respectively.

POSIX:SEM semaphores do not have a static initializer and must be explicitly initialized before they are referenced. The implementation assumes that the bufferinit function will be called exactly once before any threads access the buffer. Program 16.4 and Program 16.5 give alternative implementations of bufferinit that do not make these assumptions.

Program 16.3 illustrates several differences between semaphores and mutex locks. The sem_wait function is a cancellation point, so a thread that is blocked on a semaphore can be terminated . The getitem and putitem functions have no other cancellation points, so the threads cannot be interrupted while the buffer data structure is being modified. Since the mutex is not held very long, a canceled thread quickly hits another cancellation point. The semaphore operations, unlike the mutex operations, can also be interrupted by a signal. If we want to use Program 16.3 with a program that catches signals, we need to restart the functions that can return an error with errno set to EINTR . Because semaphore functions return “1 and set errno rather than returning the error directly, the error handling must be modified.

Program 16.3 bufferseminit.c

A bounded buffer synchronized by semaphores. Threads using these functions may be canceled with deferred cancellation without corrupting the buffer .

 #include <errno.h> #include <pthread.h> #include <semaphore.h> #include "buffer.h" static buffer_t buffer[BUFSIZE]; static pthread_mutex_t  bufferlock = PTHREAD_MUTEX_INITIALIZER; static int bufin = 0; static int bufout = 0; static sem_t semitems; static sem_t semslots; int bufferinit(void) { /* call this exactly once BEFORE getitem and putitem  */    int error;    if (sem_init(&semitems, 0, 0))       return errno;    if (sem_init(&semslots, 0, BUFSIZE)) {       error = errno;       sem_destroy(&semitems);                    /* free the other semaphore */       return error;    }    return 0; } int getitem(buffer_t *itemp) {  /* remove item from buffer and put in *itemp */    int error;    while (((error = sem_wait(&semitems)) == -1) && (errno == EINTR)) ;    if (error)       return errno;    if (error = pthread_mutex_lock(&bufferlock))       return error;    *itemp = buffer[bufout];    bufout = (bufout + 1) % BUFSIZE;    if (error = pthread_mutex_unlock(&bufferlock))       return error;    if (sem_post(&semslots) == -1)       return errno;    return 0; } int putitem(buffer_t item) {                    /* insert item in the buffer */    int error;    while (((error = sem_wait(&semslots)) == -1) && (errno == EINTR)) ;    if (error)       return errno;    if (error = pthread_mutex_lock(&bufferlock))       return error;    buffer[bufin] = item;    bufin = (bufin + 1) % BUFSIZE;    if (error = pthread_mutex_unlock(&bufferlock))       return error;    if (sem_post(&semitems) == -1)       return errno;    return 0; } 

Program 16.3 assumes that programs call bufferinit exactly once before referencing the buffer. Program 16.4 shows an alternative implementation that does not make these assumptions. The code assumes that programs call bufferinitmutex at least once before any thread accesses the buffer. The bufferinitmutex function can be called by each thread when the thread starts execution. The static initializer for the mutex ensures that smutex is initialized before any call. The bufferinitmutex can be called any number of times but initializes the semaphores only once.

Program 16.4 bufferinitmutex.c

An initialization function for bufferseminit.c that can be called more than once .

 #include <pthread.h> static int seminit = 0; static pthread_mutex_t smutex = PTHREAD_MUTEX_INITIALIZER; int bufferinit(void); int bufferinitmutex(void) {                /* initialize buffer at most once */    int error = 0;    int errorinit = 0;    if (error = pthread_mutex_lock(&smutex))        return error;    if (!seminit && !(errorinit = bufferinit()))        seminit = 1;    error = pthread_mutex_unlock(&smutex);    if (errorinit)              /* buffer initialization error occurred first */       return errorinit;    return error; } 
Exercise 16.4

How can we make the initialization of the semaphores completely transparent to the calling program?

Answer:

Make bufferinitmutex have internal linkage by adding the static qualifier. Now getitem and putitem should call bufferinitmutex before calling sem_wait . The initialization is now transparent, but we pay a price in efficiency.

Program 16.5 shows an alternative to bufferinitmutex for providing at-most-once initialization of the buffer in Program 16.3. The implementation uses pthread_once . Notice that initerror isn't protected by a mutex lock, because it will only be changed once and that modification occurs before any call to bufferinitonce returns. Call the bufferinitonce function from each thread when it is created, or just from the main thread before it creates the producer and consumer threads. You can make initialization transparent by calling bufferinitonce at the start of getitem and putitem .

Program 16.5 bufferinitonce.c

An initialization function for bufferseminit.c that uses pthread_once to ensure that initialization is performed only once .

 #include <pthread.h> static int initerror = 0; static pthread_once_t initonce = PTHREAD_ONCE_INIT; int bufferinit(void); static void initialization(void) {    initerror = bufferinit();    return; } int bufferinitonce(void) {                 /* initialize buffer at most once */    int error;    if (error = pthread_once(&initonce, initialization))       return error;    return initerror; } 

Program 16.6 shows an alternative way of making the buffer initialization transparent without the overhead of calling the initialization routine from each putitem and getitem . The initdone variable is declared to be of type volatile sig_atomic_t . The volatile qualifier indicates that the value may change asynchronously to the running thread. The sig_atomic_t type is one that can be accessed atomically.

Program 16.6 buffersem.c

A semaphore buffer implementation that does not require explicit initialization and has low initialization overhead .

 #include <errno.h> #include <pthread.h> #include <semaphore.h> #include <signal.h> #include "buffer.h" static buffer_t buffer[BUFSIZE]; static pthread_mutex_t  bufferlock = PTHREAD_MUTEX_INITIALIZER; static int bufin = 0; static int bufout = 0; static volatile sig_atomic_t initdone = 0; static int initerror = 0; static pthread_once_t initonce = PTHREAD_ONCE_INIT; static sem_t semitems; static sem_t semslots; static int bufferinit(void) { /* called exactly once by getitem and putitem  */    int error;    if (sem_init(&semitems, 0, 0))       return errno;    if (sem_init(&semslots, 0, BUFSIZE)) {       error = errno;       sem_destroy(&semitems);                    /* free the other semaphore */       return error;    }    return 0; } static void initialization(void) {    initerror = bufferinit();    if (!initerror)       initdone = 1; } static int bufferinitonce(void) {          /* initialize buffer at most once */    int error;    if (error = pthread_once(&initonce, initialization))       return error;    return initerror; } int getitem(buffer_t *itemp) {  /* remove item from buffer and put in *itemp */    int error;    if (!initdone)       bufferinitonce();    while (((error = sem_wait(&semitems)) == -1) && (errno == EINTR)) ;    if (error)       return errno;    if (error = pthread_mutex_lock(&bufferlock))       return error;    *itemp = buffer[bufout];    bufout = (bufout + 1) % BUFSIZE;    if (error = pthread_mutex_unlock(&bufferlock))       return error;    if (sem_post(&semslots) == -1)       return errno;    return 0; } int putitem(buffer_t item) {                    /* insert item in the buffer */    int error;    if (!initdone)       bufferinitonce();    while (((error = sem_wait(&semslots)) == -1) && (errno == EINTR)) ;    if (error)       return errno;    if (error = pthread_mutex_lock(&bufferlock))       return error;    buffer[bufin] = item;    bufin = (bufin + 1) % BUFSIZE;    if (error = pthread_mutex_unlock(&bufferlock))       return error;    if (sem_post(&semitems) == -1)       return errno;    return 0; } 

The initdone variable is statically initialized to 0. Its value changes only when the initialization has completed and the value is changed to 1. If the value of initdone is nonzero, we may assume that the initialization has completed successfully. If the value is 0, the initialization may have been done, so we use the bufferinitonce as in Program 16.5. Using initdone lowers the overhead of checking for the initialization once the initialization has completed. It does not require additional function calls once the initialization is complete.

The bounded buffer implementation of this section has no mechanism for termination. It assumes that producers and consumers that access the buffer run forever. The semaphores are not deleted unless an initialization error occurs.

Team-FLY


Unix Systems Programming
UNIX Systems Programming: Communication, Concurrency and Threads
ISBN: 0130424110
EAN: 2147483647
Year: 2003
Pages: 274

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net