16.6 Buffers with Done Conditions

Team-FLY

The bounded buffer implementations of Section 16.3 and Section 16.5 do not have any mechanism for indicating that no more items will be deposited in the buffer. Unending producer-consumer problems occur frequently at the system level. For example, every network router has a buffer between incoming and outgoing packets. The producers are the processes that handle the incoming lines, and the consumers are the processes that handle the outgoing lines. A web server is another example of an unending producer-consumer problem. The web server clients (browsers) are producers of requests. The web server acts as a consumer in handling these requests.

Things are not so simple when the producers or consumers are controlled by more complicated exit conditions. In a producer-driven variation on the producer-consumer problem, there is one producer and an arbitrary number of consumers. The producer puts an unspecified number of items in the buffer and then exits. The consumers continue until all items have been consumed and the producer has exited.

A possible approach is for the producer to set a flag signifying that it has completed its operation. However, this approach is not straightforward, as illustrated by the next exercise.

Exercise 16.7

Consider the following proposed solution to a producer-driven problem. The producer thread produces only numitem values, calls setdone of Program 13.3 on page 454, and exits. The consumer calls getdone on each iteration of the loop to discover whether the producer has completed. What can go wrong?

Answer:

If the producer calls setdone while a consumer is blocked on getitem with an empty buffer, the consumer never receives notification and it deadlocks, waiting for an item that will never be produced. Also, when a consumer detects that the producer has called setdone, it cannot determine whether items remain in the buffer, because its only way to check is to call getitem, which may block.

Neither the semaphore implementation of the bounded buffer in Program 16.6 nor the condition variable implementation in Program 16.10 has a way of unblocking getitem after setdone is called. Program 16.11 shows an implementation that moves the doneflag into the buffer object. The setdone function not only sets the doneflag but also wakes up all threads that are waiting on the condition variables. If getitem is called with an empty buffer after the producer has finished, getitem returns the error ECANCELED. The consumer therefore first drains any remaining items and terminates when its next attempt to retrieve an item returns ECANCELED.

Program 16.11 bufferconddone.c

A buffer that uses condition variables to detect completion.

    #include <errno.h>
    #include <pthread.h>
    #include "buffer.h"

    static buffer_t buffer[BUFSIZE];
    static pthread_mutex_t bufferlock = PTHREAD_MUTEX_INITIALIZER;
    static int bufin = 0;
    static int bufout = 0;
    static int doneflag = 0;
    static pthread_cond_t items = PTHREAD_COND_INITIALIZER;
    static pthread_cond_t slots = PTHREAD_COND_INITIALIZER;
    static int totalitems = 0;

    int getitem(buffer_t *itemp) {  /* remove an item from buffer and put in itemp */
       int error;

       if ((error = pthread_mutex_lock(&bufferlock)))
          return error;
       while ((totalitems <= 0) && !error && !doneflag)
          error = pthread_cond_wait(&items, &bufferlock);
       if (error) {
          pthread_mutex_unlock(&bufferlock);
          return error;
       }
       if (doneflag && (totalitems <= 0)) {
          pthread_mutex_unlock(&bufferlock);
          return ECANCELED;
       }
       *itemp = buffer[bufout];
       bufout = (bufout + 1) % BUFSIZE;
       totalitems--;
       if ((error = pthread_cond_signal(&slots))) {
          pthread_mutex_unlock(&bufferlock);
          return error;
       }
       return pthread_mutex_unlock(&bufferlock);
    }

    int putitem(buffer_t item) {                 /* insert an item in the buffer */
       int error;

       if ((error = pthread_mutex_lock(&bufferlock)))
          return error;
       while ((totalitems >= BUFSIZE) && !error && !doneflag)
          error = pthread_cond_wait(&slots, &bufferlock);
       if (error) {
          pthread_mutex_unlock(&bufferlock);
          return error;
       }
       if (doneflag) {           /* consumers may be gone, don't put item in */
          pthread_mutex_unlock(&bufferlock);
          return ECANCELED;
       }
       buffer[bufin] = item;
       bufin = (bufin + 1) % BUFSIZE;
       totalitems++;
       if ((error = pthread_cond_signal(&items))) {
          pthread_mutex_unlock(&bufferlock);
          return error;
       }
       return pthread_mutex_unlock(&bufferlock);
    }

    int getdone(int *flag) {                                     /* get the flag */
       int error;

       if ((error = pthread_mutex_lock(&bufferlock)))
          return error;
       *flag = doneflag;
       return pthread_mutex_unlock(&bufferlock);
    }

    int setdone(void) {       /* set the doneflag and inform all waiting threads */
       int error1;
       int error2;
       int error3;

       if ((error1 = pthread_mutex_lock(&bufferlock)))
          return error1;
       doneflag = 1;
       error1 = pthread_cond_broadcast(&items);              /* wake up everyone */
       error2 = pthread_cond_broadcast(&slots);
       error3 = pthread_mutex_unlock(&bufferlock);
       if (error1)
          return error1;
       if (error2)
          return error2;
       if (error3)
          return error3;
       return 0;
    }

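The termination behavior of Program 16.11 can be checked in isolation. The following is a minimal single-file sketch, not the book's code: it uses a condensed buffer of int values with hypothetical names (sketch_put, sketch_get, sketch_setdone, drain_demo, SKETCHSIZE) and omits error checks on the mutex calls for brevity. One consumer thread drains the buffer while the main thread deposits two items and then signals completion. Whether the consumer is blocked on the empty buffer or busy when the done condition is set, it should see both items and then receive ECANCELED.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

#define SKETCHSIZE 4            /* hypothetical capacity, not BUFSIZE of buffer.h */

static int s_buf[SKETCHSIZE];
static int s_in = 0, s_out = 0, s_total = 0, s_done = 0;
static pthread_mutex_t s_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t s_items = PTHREAD_COND_INITIALIZER;
static pthread_cond_t s_slots = PTHREAD_COND_INITIALIZER;

static int sketch_put(int item) {          /* condensed analog of putitem */
   int error = 0;
   pthread_mutex_lock(&s_lock);            /* lock errors ignored in this sketch */
   while (s_total >= SKETCHSIZE && !s_done && !error)
      error = pthread_cond_wait(&s_slots, &s_lock);
   if (!error && s_done)
      error = ECANCELED;                   /* refuse new items once done is set */
   if (!error) {
      s_buf[s_in] = item;
      s_in = (s_in + 1) % SKETCHSIZE;
      s_total++;
      pthread_cond_signal(&s_items);
   }
   pthread_mutex_unlock(&s_lock);
   return error;
}

static int sketch_get(int *itemp) {        /* condensed analog of getitem */
   int error = 0;
   pthread_mutex_lock(&s_lock);
   while (s_total <= 0 && !s_done && !error)
      error = pthread_cond_wait(&s_items, &s_lock);
   if (!error && s_total <= 0)
      error = ECANCELED;                   /* done is set and nothing is left */
   if (!error) {
      *itemp = s_buf[s_out];
      s_out = (s_out + 1) % SKETCHSIZE;
      s_total--;
      pthread_cond_signal(&s_slots);
   }
   pthread_mutex_unlock(&s_lock);
   return error;
}

static void sketch_setdone(void) {         /* condensed analog of setdone */
   pthread_mutex_lock(&s_lock);
   s_done = 1;
   pthread_cond_broadcast(&s_items);       /* wake blocked consumers */
   pthread_cond_broadcast(&s_slots);       /* wake blocked producers */
   pthread_mutex_unlock(&s_lock);
}

static void *drain(void *arg) {            /* count items until get fails */
   int *countp = arg;
   int item;
   while (sketch_get(&item) == 0)
      (*countp)++;
   return NULL;
}

int drain_demo(void) {      /* returns the number of items consumed, -1 on error */
   pthread_t tid;
   int count = 0;
   int late;

   if (pthread_create(&tid, NULL, drain, &count))
      return -1;
   if (sketch_put(10) || sketch_put(20))   /* capacity is 4, so these never block */
      return -1;
   sketch_setdone();
   if (pthread_join(tid, NULL))
      return -1;
   late = sketch_put(30);                  /* a deposit after done is rejected */
   assert(late == ECANCELED);
   (void)late;
   return count;
}
```

Calling drain_demo once from a main function should return 2. The state is static, so a second call in the same process would find the done flag already set.
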
Exercise 16.8

Why did we use the same mutex to protect doneflag in getdone and setdone as we used to protect the buffer in getitem and putitem?

Answer:

The getitem function needs to access doneflag at a time when it owns the bufferlock mutex. Using the same mutex simplifies the program.

Exercise 16.9

Can the mutex calls in getdone and setdone be eliminated?

Answer:

The lock around doneflag in getdone could be eliminated if we knew that access to an int was atomic. We can guarantee that accesses to doneflag are atomic by declaring it to have type sig_atomic_t. In setdone, it is best to do the condition variable broadcasts while owning the lock, and we need to make sure that the threads see that doneflag has been set to 1 when they wake up. The mutex calls in setdone should therefore stay.
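On a compiler that supports C11, one alternative to sig_atomic_t is the atomic_int type of <stdatomic.h>, which is specified for access from multiple threads. The following sketch is an assumption about how the idea of this exercise could look with that type. It is not part of Program 16.11, and the names getdone_nolock, setdone_locked and atomic_done_demo are hypothetical. The reader of the flag takes no lock, while the writer still holds the mutex during the broadcasts.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

static atomic_int a_doneflag;     /* static storage, so it starts out as 0 */
static pthread_mutex_t a_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t a_items = PTHREAD_COND_INITIALIZER;
static pthread_cond_t a_slots = PTHREAD_COND_INITIALIZER;

int getdone_nolock(int *flag) {           /* read the flag without the mutex */
   *flag = atomic_load(&a_doneflag);
   return 0;
}

int setdone_locked(void) {    /* set the flag and broadcast while owning the lock */
   int error1;
   int error2;
   int error3;

   if ((error1 = pthread_mutex_lock(&a_lock)))
      return error1;
   atomic_store(&a_doneflag, 1);
   error1 = pthread_cond_broadcast(&a_items);
   error2 = pthread_cond_broadcast(&a_slots);
   error3 = pthread_mutex_unlock(&a_lock);
   if (error1)
      return error1;
   if (error2)
      return error2;
   return error3;
}

int atomic_done_demo(void) {              /* returns the final value of the flag */
   int flag = -1;
   int error;

   error = getdone_nolock(&flag);
   assert(error == 0 && flag == 0);       /* not done yet */
   error = setdone_locked();
   assert(error == 0);
   error = getdone_nolock(&flag);
   assert(error == 0);
   (void)error;
   return flag;
}
```

In this sketch only getdone loses its lock. Functions such as getitem and putitem would still test the flag while holding the mutex, because they wait on the condition variables associated with it.
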

Program 16.12 and Program 16.13 show modifications of the producer of Program 16.7 and the consumer of Program 16.8 to account for termination. They are linked with Program 16.11, which provides setdone. They handle the error ECANCELED by terminating without calling seterror.

Program 16.12 randproducerdone.c

A producer that detects whether processing should end.

    #include <errno.h>
    #include <pthread.h>
    #include "buffer.h"
    #include "globalerror.h"
    #include "randsafe.h"

    int getdone(int *flag);

    /* ARGSUSED */
    static void *producer(void *arg1) {        /* generate pseudorandom numbers */
       int error;
       buffer_t item;
       int localdone = 0;

       while (!localdone) {
          if ((error = randsafe(&item)))
             break;
          if ((error = putitem(item)))
             break;
          if ((error = getdone(&localdone)))
             break;
       }
       if (error != ECANCELED)
          seterror(error);
       return NULL;
    }

    /* --------------- Public functions ---------------------------------------- */
    int initproducer(pthread_t *tproducer) {                       /* initialize */
       int error;

       error = pthread_create(tproducer, NULL, producer, NULL);
       return (seterror(error));
    }

Program 16.13 randconsumerdone.c

A consumer that detects whether the buffer has finished.

    #include <errno.h>
    #include <math.h>
    #include <pthread.h>
    #include "buffer.h"
    #include "globalerror.h"
    #include "sharedsum.h"

    /* ARGSUSED */
    static void *consumer(void *arg) {                   /* compute partial sums */
       int error;
       buffer_t nextitem;
       double value;

       for ( ; ; ) {
          if ((error = getitem(&nextitem)))        /* retrieve the next item */
             break;
          value = sin(nextitem);
          if ((error = add(value)))
             break;
       }
       if (error != ECANCELED)
          seterror(error);
       return NULL;
    }

    /* --------------- Public functions ---------------------------------------- */
    int initconsumer(pthread_t *tconsumer) {                       /* initialize */
       int error;

       error = pthread_create(tconsumer, NULL, consumer, NULL);
       return (seterror(error));
    }

Program 16.14 shows a main program that creates a specified number of the producer threads (Program 16.12) and consumer threads (Program 16.13). After creating the threads, main sleeps for a specified amount of time and then calls the setdone function of Program 16.11. The program joins with all the threads to make sure that they have finished their computations before calling showresults of Program 13.8 on page 459 to display the results.

Exercise 16.10

What would happen if randconsumerdone of Program 16.13 called seterror when getitem returned ECANCELED?

Answer:

The results of the calculation would not be displayed. The showresults function prints only an error message, and not the results, if geterror returns a nonzero value.
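The interaction in this answer can be sketched as follows. The globalerror and showresults code of Chapter 13 is not reproduced in this section, so the functions below (sketch_seterror, sketch_geterror, sketch_showresults, error_demo) are hypothetical stand-ins written under the assumption that the first nonzero error is remembered and that the results are printed only when no error has been recorded.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

static int g_error = 0;                /* first error reported by any thread */
static pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;

int sketch_seterror(int error) {       /* remember only the first nonzero error */
   if (!error)
      return 0;
   pthread_mutex_lock(&g_lock);        /* lock errors ignored in this sketch */
   if (!g_error)
      g_error = error;
   pthread_mutex_unlock(&g_lock);
   return error;
}

int sketch_geterror(void) {            /* return the recorded error, 0 if none */
   int error;
   pthread_mutex_lock(&g_lock);
   error = g_error;
   pthread_mutex_unlock(&g_lock);
   return error;
}

int sketch_showresults(double sum) {   /* 0 if the sum was printed, -1 if not */
   int error = sketch_geterror();
   if (error) {
      fprintf(stderr, "Failed to compute sum: %s\n", strerror(error));
      return -1;
   }
   printf("The sum is %g\n", sum);
   return 0;
}

int error_demo(void) {                 /* returns 0 if the sketch behaves as described */
   int status;
   int exiterror = ECANCELED;          /* how the threads leave their loops */

   if (exiterror != ECANCELED)         /* the test used by Programs 16.12 and 16.13 */
      sketch_seterror(exiterror);
   status = sketch_showresults(1.5);   /* nothing recorded, so the sum is printed */
   assert(status == 0);
   sketch_seterror(ECANCELED);         /* the mistake described in the exercise */
   status = sketch_showresults(1.5);   /* now only an error message appears */
   assert(status == -1);
   (void)status;
   return 0;
}
```
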

Program 16.14 randpcdone.c

A main program that creates producer threads of Program 16.12 and consumer threads of Program 16.13. After sleeping, it calls setdone. The program should use the buffer of Program 16.11.

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include "buffer.h"
    #include "doneflag.h"
    #include "globalerror.h"

    int initconsumer(pthread_t *tid);
    int initproducer(pthread_t *tid);
    int showresults(void);

    int main(int argc, char *argv[]) {
       int error;
       int i;
       int numberconsumers;
       int numberproducers;
       int sleeptime;
       pthread_t *tidc;
       pthread_t *tidp;

       if (argc != 4) {
          fprintf(stderr, "Usage: %s sleeptime producers consumers\n", argv[0]);
          return 1;
       }
       sleeptime = atoi(argv[1]);
       numberproducers = atoi(argv[2]);
       numberconsumers = atoi(argv[3]);
       tidp = (pthread_t *)calloc(numberproducers, sizeof(pthread_t));
       if (tidp == NULL) {
          perror("Failed to allocate space for producer IDs");
          return 1;
       }
       tidc = (pthread_t *)calloc(numberconsumers, sizeof(pthread_t));
       if (tidc == NULL) {
          perror("Failed to allocate space for consumer IDs");
          return 1;
       }
       for (i = 0; i < numberconsumers; i++)             /* initialize consumers */
          if ((error = initconsumer(tidc + i))) {
             fprintf(stderr, "Failed to create consumer %d:%s\n",
                             i, strerror(error));
             return 1;
          }
       for (i = 0; i < numberproducers; i++)             /* initialize producers */
          if ((error = initproducer(tidp + i))) {
             fprintf(stderr, "Failed to create producer %d:%s\n",
                             i, strerror(error));
             return 1;
          }
       sleep(sleeptime);                  /* wait a while to get the partial sum */
       if ((error = setdone())) {
          fprintf(stderr, "Failed to set done indicator:%s\n", strerror(error));
          return 1;
       }
       for (i = 0; i < numberproducers; i++)               /* wait for producers */
          if ((error = pthread_join(tidp[i], NULL))) {
             fprintf(stderr, "Failed producer %d join:%s\n", i, strerror(error));
             return 1;
          }
       for (i = 0; i < numberconsumers; i++)               /* wait for consumers */
          if ((error = pthread_join(tidc[i], NULL))) {
             fprintf(stderr, "Failed consumer %d join:%s\n", i, strerror(error));
             return 1;
          }
       if (showresults())
          return 1;
       return 0;
    }

Program 16.15 shows a second version of main that creates a signal thread of Program 13.14 on page 476 to wait on SIGUSR1. Program 13.14 should be linked to bufferconddone.c rather than doneflag.c so that it calls the correct setdone. As before, main creates a specified number of the producer and consumer threads of Program 16.12 and Program 16.13. After creating the threads, main waits for the threads to complete by executing pthread_join before displaying the results. The threads continue to compute until the user sends a SIGUSR1 signal to the process from the command line. At this point, the signal thread calls setdone, causing the producers and consumers to terminate.
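Program 13.14 is not reproduced in this section. The following is a minimal sketch of the general technique, under the assumption that the signal thread blocks in sigwait and then sets a done indicator. The names mark_done, sigwaiter and sigdone_demo are hypothetical, and a plain flag stands in for the buffer's setdone. The signal must be blocked before any thread is created so that every thread inherits the blocked mask and only the sigwait call accepts the signal.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>

static pthread_mutex_t sig_lock = PTHREAD_MUTEX_INITIALIZER;
static int sig_done = 0;                /* stand-in for the buffer's doneflag */

static void mark_done(void) {           /* plays the role of setdone */
   pthread_mutex_lock(&sig_lock);
   sig_done = 1;
   pthread_mutex_unlock(&sig_lock);
}

static void *sigwaiter(void *arg) {     /* wait for the signal, then mark done */
   sigset_t *maskp = arg;
   int signo;

   if (sigwait(maskp, &signo) == 0)
      mark_done();
   return NULL;
}

int sigdone_demo(void) {                /* returns the done flag, -1 on error */
   static sigset_t mask;                /* static so it outlives this call */
   pthread_t tid;
   int done;

   if (sigemptyset(&mask) || sigaddset(&mask, SIGUSR1))
      return -1;
   if (pthread_sigmask(SIG_BLOCK, &mask, NULL))   /* block before creating threads */
      return -1;
   if (pthread_create(&tid, NULL, sigwaiter, &mask))
      return -1;
   if (kill(getpid(), SIGUSR1))         /* stands in for the user's kill command */
      return -1;
   if (pthread_join(tid, NULL))
      return -1;
   pthread_mutex_lock(&sig_lock);
   done = sig_done;
   pthread_mutex_unlock(&sig_lock);
   assert(done == 1);
   return done;
}
```

The sketch sends the signal with kill on its own process ID rather than raise, because a process-directed signal can be accepted by the thread in sigwait, whereas raise directs the signal to the calling thread.
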

Program 16.15 randpcsig.c

A main program that creates producer threads of Program 16.12 and consumer threads of Program 16.13. The threads detect the done condition when the user sends SIGUSR1 to the process.

    #include <pthread.h>
    #include <signal.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include "buffer.h"
    #include "globalerror.h"
    #include "sharedsum.h"
    #include "signalthread.h"

    int initconsumer(pthread_t *tid);
    int initproducer(pthread_t *tid);
    int showresults(void);

    int main(int argc, char *argv[]) {
       int error;
       int i;
       int numberconsumers;
       int numberproducers;
       pthread_t *tidc;
       pthread_t *tidp;

       if (argc != 3) {
          fprintf(stderr, "Usage: %s producers consumers\n", argv[0]);
          return 1;
       }
       numberproducers = atoi(argv[1]);
       numberconsumers = atoi(argv[2]);
       if ((error = signalthreadinit(SIGUSR1))) {
          perror("Failed to start signalthread");
          return 1;
       }
       fprintf(stderr, "Process %ld will run until SIGUSR1 (%d) signal.\n",
                       (long)getpid(), SIGUSR1);
       tidp = (pthread_t *)calloc(numberproducers, sizeof(pthread_t));
       if (tidp == NULL) {
          perror("malloc producer IDs");
          return 1;
       }
       tidc = (pthread_t *)calloc(numberconsumers, sizeof(pthread_t));
       if (tidc == NULL) {
          perror("malloc consumer IDs");
          return 1;
       }
       for (i = 0; i < numberconsumers; i++)             /* initialize consumers */
          if ((error = initconsumer(tidc + i))) {
             fprintf(stderr, "Failed to create consumer %d:%s\n",
                             i, strerror(error));
             return 1;
          }
       for (i = 0; i < numberproducers; i++)             /* initialize producers */
          if ((error = initproducer(tidp + i))) {
             fprintf(stderr, "Failed to create producer %d:%s\n",
                             i, strerror(error));
             return 1;
          }
       for (i = 0; i < numberproducers; i++)               /* wait for producers */
          if ((error = pthread_join(tidp[i], NULL))) {
             fprintf(stderr, "Failed producer %d join:%s\n", i, strerror(error));
             return 1;
          }
       for (i = 0; i < numberconsumers; i++)               /* wait for consumers */
          if ((error = pthread_join(tidc[i], NULL))) {
             fprintf(stderr, "Failed consumer %d join:%s\n", i, strerror(error));
             return 1;
          }
       if (showresults())
          return 1;
       return 0;
    }

UNIX Systems Programming: Communication, Concurrency and Threads
ISBN: 0130424110
EAN: 2147483647
Year: 2003
Pages: 274
