16.4 Introduction to a Simple Producer-Consumer Problem

Team-FLY

This section introduces a simple producer-consumer problem to test the buffer implementations ; the problem is based on Programs 13.6 and 13.7 in Section 13.2.3. The programs approximate the average value of sin(x) on the interval from 0 to 1, using a probabilistic algorithm. The producers calculate random numbers between 0 and 1 and put them in a buffer. Each consumer removes a value x from the buffer and adds the value of sin(x) to a running sum, keeping track of the number of entries summed. At any time, the sum divided by the count gives an estimate of the average value. Simple calculus shows that the exact average value is 1 “ cos(1) or about 0.4597. Using bounded buffers is not a particularly efficient way of solving this problem, but it illustrates many of the relevant ideas needed to solve more interesting problems.

Program 16.7 shows a threaded producer object that uses the bounded buffer defined by Program 16.6. Each producer of Program 16.7 generates random double values and places them in the buffer. The implementation uses the globalerror object of Program 13.4 on page 455 to keep the number of the first error that occurs and uses the thread-safe randsafe of Program 13.2 on page 454 to generate random numbers. The initproducer function, which creates a producer thread, can be called multiple times if multiple producers are needed.

Program 16.8 shows an implementation of a consumer object. The publicly accessible initconsumer function allows an application to create as many consumer threads as desired. In case of an error, the offending thread sets the global error and returns. The other threads continue unless they also detect that an error occurred.

Program 16.9 is a main program that can be used with the producer (Program 16.7) and consumer (Program 16.8) threads as well as the buffersem buffer implementation (Program 16.6). The implementation assumes that no explicit buffer initialization is required. Program 16.9 takes three command-line arguments; a sleeptime in seconds, the number of producer threads and the number of consumer threads. The main program starts the threads, sleeps for the indicated time, and displays the results so far. After sleeping again, the main program displays the results and returns, terminating all the threads. This application illustrates the producer-consumer problem when the threads run forever or until main terminates.

The main program of Program 16.9 can display errors by using strerror rather than strerror_r because it is the only thread making this call. Program 16.9 calls the showresults function of Program 13.8 on page 459 to display the statistics.

Program 16.7 randproducer.c

An implementation of a producer that generates random numbers and places them in a synchronized buffer, such as the one shown in Program 16.6 .

 #include <pthread.h> #include "buffer.h" #include "globalerror.h" #include "randsafe.h" /* ARGSUSED */ static void *producer(void *arg1) {        /* generate pseudorandom numbers */    int error;    buffer_t item;    for (  ;  ;  ) {       if (error = randsafe(&item))          break;       if (error = putitem(item))          break;    }    seterror(error);    return NULL; } /* --------------- Public functions ---------------------------------------- */ int initproducer(pthread_t *tproducer) {                       /* initialize */    int error;    error = pthread_create(tproducer, NULL, producer, NULL);    return (seterror(error)); } 
Exercise 16.5

What happens to the semaphores when Program 16.9 terminates?

Answer:

Since we are using POSIX:SEM unnamed semaphores with pshared equal to 0, the resources of the semaphores are released when the process terminates. If we had been using named semaphores or POSIX:XSI semaphores, they would still exist after the process terminated .

Program 16.8 randconsumer.c

An implementation of a consumer that calculates the sine of double values removed from a shared buffer and adds them to a running sum .

 #include <math.h> #include <pthread.h> #include "buffer.h" #include "globalerror.h" #include "sharedsum.h" /* ARGSUSED */ static void *consumer(void *arg) {                   /* compute partial sums */    int error;    buffer_t nextitem;    double value;    for (  ;  ;  )  {       if (error = getitem(&nextitem))              /* retrieve the next item */          break;       value = sin(nextitem);       if (error = add(value))          break;    }    seterror(error);    return NULL; } /* --------------- Public functions ---------------------------------------- */ int initconsumer(pthread_t *tconsumer) {                       /* initialize */    int error;    error = pthread_create(tconsumer, NULL, consumer, NULL);    return (seterror(error)); } 
Exercise 16.6

Suppose Program 16.9 runs on a machine with a single processor under preemptive priority scheduling. In what order are the items processed if BUFSIZE is 8 and one of the producers starts first?

Answer:

For preemptive priority scheduling, a thread with greater priority than the currently running thread preempts it. If the producer and consumers have the same priority, as in Program 16.9, a producer deposits eight items in the buffer and then blocks. The first consumer then retrieves the first eight items. One of the producers then produces the next 8 items, and so on. This alternation of blocks occurs because the producers and consumers are of equal priority. On the other hand, if the consumers have a higher priority, a consumer preempts the producer after the producer deposits a single item, so the producer and the consumers alternately process individual items. If the producer has higher priority, it fills the buffer with 8 items and then preempts the consumers after each slot becomes available.

Program 16.9 randpcforever.c

A main program that creates any number of producer and consumer threads .

 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "buffer.h" #include "globalerror.h" #include "sharedsum.h" int initconsumer(pthread_t *tid); int initproducer(pthread_t *tid); int showresults(void); int main(int argc, char *argv[]) {    int error;    int i;    int numberconsumers;    int numberproducers;    int sleeptime;    pthread_t tid;    if (argc != 4) {       fprintf(stderr, "Usage: %s sleeptime producers consumers\n", argv[0]);       return 1;    }    sleeptime = atoi(argv[1]);    numberproducers = atoi(argv[2]);    numberconsumers = atoi(argv[3]);    for (i = 0; i < numberconsumers; i++)             /* initialize consumers */       if (error = initconsumer(&tid)) {          fprintf(stderr, "Failed to create consumer %d:%s\n",                           i, strerror(error));          return 1;       }    for (i = 0; i < numberproducers; i++)             /* initialize producers */       if (error = initproducer(&tid)) {          fprintf(stderr, "Failed to create producer %d:%s\n",                           i, strerror(error));          return 1;       }    sleep(sleeptime);                          /* wait to get the partial sum */    if (showresults())       return 1;    sleep(sleeptime);                        /* wait again before terminating */    if (showresults())       return 1;    return 0; } 
Team-FLY


Unix Systems Programming
UNIX Systems Programming: Communication, Concurrency and Threads
ISBN: 0130424110
EAN: 2147483647
Year: 2003
Pages: 274

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net