Introduction


Multithreaded applications are a useful paradigm for system development because they offer many facilities not available to traditional GNU/Linux processes. In this chapter, we ll explore pthreads programming and the functionality provided by the pthreads API.

Note  

The 2.4 GNU/Linux kernel POSIX thread library was based upon the LinuxThreads implementation (introduced in 1996), which was built on the existing GNU/Linux process model. The 2.6 kernel utilizes the new Native POSIX Thread Library, or NPTL (introduced in 2002), which is a higher performance implementation with numerous advantages over the older component. For example, NPTL provides real thread groups (within a process), compared to one thread per process in the prior model. We ll outline those differences when it s useful to know.
To know which pthreads library is being used, issue the following command:

 $ getconf GNU_LIBPTHREAD_VERSION 
Note  

This will provide either LinuxThreads or NPTL, each with a version number.

What s a Thread?

To define a thread, let s look back at Linux processes to understand their makeup . Both processes and threads have control flows and can run concurrently, but they differ in some very distinct ways. Threads, for example, share data, where processes explicitly don t. When a process is forked (recall from Chapter 12, Introduction to Sockets Programming), a new process is created with its own globals and stack (see Figure 14.1). When a thread is created, the only new element created is a stack that is unique for the thread (see Figure 14.2). The code and global data are common between the threads. This is advantageous, but the shared nature of threads can also be problematic . We ll investigate this later in the chapter.

click to expand
Figure 14.1: Forking a new process.

A GNU/Linux process can create and manage numerous threads. Each thread is identified by a thread identifier that is unique for every thread in a system. Each thread also has its own stack (as shown in Figure 14.2) and also a unique context (program counter, save registers, and so forth). But since the data space is shared by threads, they share more than just user data. For example, file descriptors for open files or sockets are shared also. Therefore, when a multithreaded application uses a socket or file, the access to the resource must be protected against multiple accesses . We ll look at methods for achieving that in this chapter.

click to expand
Figure 14.2: Creating a new thread.
Note  

While writing multithreaded applications can be easier in some ways than traditional process-based applications, there are problems to understand. The shared data aspect of threads is probably the most difficult to design around, but it is also powerful and can lead to simpler applications with higher performance. The key is to strongly consider shared data while developing threaded applications. Another important consideration is that serious multithreaded application development should utilize the 2.6 kernel rather than the 2.4 kernel (given the new NPTL threads implementation).

Thread Function Basics

The APIs that we ve discussed thus far follow a fairly uniform model of returning “1 when an error occurs, with the actual error value in the errno process variable. The threads API returns 0 on success but a positive value to indicate an error.

The Pthreads API

While the pthreads API is comprehensive, it s quite easy to understand and use. We ll now explore the pthreads API, looking at the basics of thread creation through the specialized communication and synchronization methods that are available.

All multithreaded programs must make the pthread function prototypes and symbols available for use. This is accomplished by including the pthread standard header, as:

 #include <pthread.h> 
Note  

The examples that follow are written for brevity, and in some cases, return values are not checked. To avoid debugging surprises , you are strongly encouraged to check all system call return values and never assume that a function is successful.

Thread Basics

All multithreaded applications must create threads and ultimately destroy them. This is provided in two functions by the pthreads API:

 int  pthread_create  (pthread_t *thread,                            pthread_attr_t *attr,                     void *(*start_routine)(void *), void *arg);     int  pthread_exit  (void *retval); 

The pthread_create function permits the creation of a new thread, while pthread_exit allows a thread to terminate itself. There also is a function to permit one thread to terminate another, but we ll investigate that later.

To create a new thread, we call pthread_create and associate our pthread_t object with a function ( start_routine ). This function represents the top level code that will be executed within the thread. We can optionally provide a set of attributes via pthread_attr_t (via pthread_attr_init ). Finally, the fourth argument ( arg ) is an optional argument that is passed to the thread upon creation.

Let s now look at a short example of thread creation (see Listing 14.1). In our main function, we first create a pthread_t object at line 10. This object represents our new thread. We call pthread_create at line 12 and provide the pthread_t object (which will be filled in by the pthread_create function) in addition to our function that contains the code for the thread (argument 3, myThread ). A zero return indicates successful creation of the thread.

Listing 14.1: Creating a Thread with pthread_create (on the CD-ROM at ./source/_ch14/ptcreate.c )
start example
  1:  #include <pthread.h>  2:  #include <stdlib.h>  3:  #include <stdio.h>  4:  #include <string.h>  5:  #include <errno.h>  6:   7:  int main()  8:  {  9:  int ret;  10:  pthread_t mythread;  11:   12:  ret =  pthread_create  (&mythread, NULL, myThread, NULL);  13:   14:  if (ret != 0) {  15:  printf("Cant create pthread (%s)\n", strerror(errno));  16:  exit(-1);  17:  }  18:   19:  return 0;  20:  } 
end example
 

The pthread_create function returns zero if successful, otherwise a nonzero value is returned. Now let s look at the thread function itself, which will also demonstrate our pthread_exit function (see Listing 14.2). Our thread simply emits a message to stdout that it ran and then terminated at line 6 with pthread_exit .

Listing 14.2: Terminating a Thread with pthread_exit (on the CD-ROM at ./source/_ch14/ptcreate.c )
start example
  1:  void *myThread(void *arg)  2:  {  3:  printf("Thread ran!\n");  4:   5:  /* Terminate the thread */  6:   pthread_exit  (NULL);  7:  } 
end example
 

Our thread didn t use the void pointer argument, but this could be used to provide the thread with a specific personality, passed in at creation (see argument four of line 12 in Listing 14.1). The argument could represent a scalar value or a structure containing a variety of elements. The exit value presented to pthread_exit must not be of local scope, otherwise it won t exist once the thread is destroyed . The pthread_exit function does not return.

Note  

The startup cost for new threads is minimal in the new NPTL implementation, compared to the older LinuxThreads. In addition to significant improvements and optimizations in the NPTL, the allocation of thread memory structures is improved (thread data structures and thread local storage are now provided on the local thread stack).

Thread Management

Before we dig in to thread synchronization and coordination, let s look at a couple of miscellaneous thread functions that can be of use. The first is the pthread_self function, which can be used by a thread to retrieve its unique identifier. Recall in pthread_create that a pthread_t object reference was passed in as the first argument. This permits the thread creator to know the identifier for the thread just created. The thread itself can also retrieve this identifier by calling pthread_self .

 pthread_t  pthread_self  (void); 

Consider the updated thread function in Listing 14.3, which illustrates retrieving the pthread_t handle. At line 5, we call pthread_self to grab the handle and then emit it to stdout at line 7 (converting it to an int ).

Listing 14.3: Retrieving the pthread_t Handle with pthread_self (on the CD-ROM at ./source/ch14/ptcreate.c )
start example
  1:  void *myThread(void *arg)  2:  {  3:  pthread_t pt;  4:   5:  pt =  pthread_self  ();  6:   7:  printf("Thread %x ran!\n", (int)pt);  8:   9:   pthread_exit  (NULL);  10:  } 
end example
 

Most applications require some type of initialization, but with threaded applications, the job can be difficult. The pthread_once function allows a developer to create an initialization routine that is invoked for a multithreaded application only once (even though multiple threads may attempt to invoke it).

The pthread_once function requires two objects: a pthread_once_t object (that has been preinitialized with pthread_once_init ) and an initialization function. Consider the partial example in Listing 14.4. The first thread to call pthread_once will invoke the initialization function ( initialize_app ), but subsequent calls to pthread_once will result in no calls to initialize_app .

Listing 14.4: Providing a Single-use Initialization Function with pthread_once
start example
  1:  #include <pthread.h>  2:   3:  pthread_once_t my_init_mutex = pthread_once_init;  4:   5:  void initialize_app(void)  6:  {  7:  /* Single-time init here */  8:  }  9:   10:  void *myThread(void *arg)  11:  {  12:  ...  13:   14:   pthread_once  (&my_init_mutex, initialize_app);  15:   16:  ...  17:  } 
end example
 
Note  

The number of threads in LinuxThreads was a compile-time option (1000), whereas NPTL supports a dynamic number of threads. NPTL can support up to 2 billion threads on an IA-32 system [Drepper and Molnar03].

Thread Synchronization

The ability to synchronize threads is an important aspect of multithreaded application development. We ll look at a number of methods, but first we ll look at the most basic method, the ability for the creator thread to wait for the created thread to finish (otherwise known as a join ). This activity is provided by the pthread_join API function. When called, the pthread_join call suspends the calling thread until a join is complete. When the join is done, the caller receives the joined thread s termination status as the return from pthread_join . The pthread_join function (somewhat equivalent to the wait function for processes) has the following prototype:

 int  pthread_join  (pthread_t th, void **thread_return); 

The th argument is the thread to which we wish to join. This argument is returned from pthread_create or passed via the thread itself via pthread_self . The thread_return can be NULL , which means we ll not capture the return status of the thread. Otherwise, the return value from the thread is stored in thread_return .

Note  

A thread is automatically joinable when using the default attributes of pthread__create . If the attribute for the thread is defined as detached, then the thread can t be joined (because it s detached from the creating thread).

To join with a thread, we must have the thread s identifier, which is retrieved from the pthread_create function. Let s look at a complete example (see Listing 14.5).

In this example, permit the creation of five distinct threads by calling pthread__create within a loop (lines 18 “23) and storing the resulting thread identifiers in a pthread_t array (line 16). Once the threads are created, we begin the join process, again in a loop (lines 25 “32). The pthread_join returns zero on success, and upon success, the status variable is emitted (note that this value is returned at line 8 within the thread itself).

Listing 14.5: Joining Threads with pthread_join (on the CD-ROM at ./source/ch14/_ptjoin.c )
start example
  1:  #include <pthread.h>  2:  #include <stdio.h>  3:   4:  void *myThread(void *arg)  5:  {  6:  printf("Thread %d started\n", (int)arg);  7:   8:   pthread_exit  (arg);  9:  }  10:   11:  #define MAX_THREADS     5  12:   13:  int main()  14:  {  15:  int ret, i, status;  16:  pthread_t threadIds[MAX_THREADS];  17:   18:  for (i = 0 ; i < MAX_THREADS ; i++) {  19:  ret =  pthread_create  (&threadIds[i], NULL, myThread, _(void *)i);  20:  if (ret != 0) {  21:  printf("Error creating thread %d\n", (int)threadIds[i]);  22:  }  23:  }  24:   25:  for (i = 0 ; i < MAX_THREADS ; i++) {  26:  ret =  pthread_join  (threadIds[i], (void **)&status);  27:  if (ret != 0) {  28:  printf("Error joining thread %d\n", (int)threadIds[i]);  29:  } else {  30:  printf("Status = %d\n", status);  31:  }  32:  }  33:   34:  return 0;  35:  } 
end example
 

The pthread_join function suspends the caller until the requested thread has been joined. In many cases, we simply don t care about the thread once it s created. In these cases, we can identify this by detaching the thread. The creator or the thread itself can detach itself. We can also specify that the thread is detached when we create the thread (as part of the attributes). Once a thread is detached, it can never be joined. The pthread_detach function has the following prototype:

 int  pthread_detach  (pthread_t th); 

Let s now look at the process of detaching the thread within the thread itself (see Listing 14.6). Recall that a thread can identify its own identifier by calling thread_self .

Listing 14.6: Detaching a Thread from Within with pthread_detach
start example
  1:  void *myThread(void *arg)  2:  {  3:  printf("Thread %d started\n", (int)arg);  4:   5:   pthread_detach  (pthread_self());  6:   7:   pthread_exit  (arg);  8:  } 
end example
 

At line 5, we simply call pthread_detach , specifying the thread identifier by calling pthread_self . When this thread exits, all resources are immediately freed (as it s detached and will never be joined by another thread). The pthread_detach function returns zero on success, nonzero if an error occurs.

Note  

GNU/Linux automatically places a newly created thread into the joinable state. This is not the case in other implementations , which can default to detached.

Thread Mutexes

A mutex is a variable that permits threads to implement critical sections. These sections enforce exclusive access to variables by threads, which if left unprotected would result in data corruption. This topic is discussed in detail in Chapter 16, Synchronization with Semaphores.

Let s start by reviewing the mutex API, and then we ll illustrate the problem being solved . To create a mutex, we simply declare a variable that represents our mutex and initialize it with a special symbolic constant. The mutex is of type pthread_mutex_t and demonstrated as:

 pthread_mutex_t myMutex = PTHREAD_MUTEX_INITIALIZER 

As shown here, the initialization makes this mutex a fast mutex. The mutex initializer can actually be of one of three types, as shown in Table 14.1.

Table 14.1: Mutex Initializers

Type

Description

PTHREAD_MUTEX_INITIALIZER

Fast Mutex

PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP

Recursive Mutex

PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP

Error-checking Mutex

The recursive mutex is a special mutex that allows the mutex to be locked several times (without blocking), as long as it s locked by the same thread. Even though the mutex can be locked multiple times without blocking, the thread must unlock the mutex the same number of times that it was locked. The error-checking mutex can be used to help find errors when debugging. Note that the _NP suffix for recursive and error-checking mutexes indicates that it s not portable.

Now that we have a mutex, we can lock and unlock it to create our critical section. This is done with the pthread_mutex_lock and pthread_mutex_unlock API functions. Another function called pthread_mutex_trylock can be used to try to lock a mutex, but it won t block if the mutex is already locked. Finally, we can destroy an existing mutex using pthread_mutex_destroy . These have the prototype:

 int  pthread_mutex_lock  (pthread_mutex_t *mutex);     int  pthread_mutex_trylock  (pthread_mutex_t *mutex);     int  pthread_mutex_unlock  (pthread_mutex_t *mutex);     int  pthread_mutex_destroy  (pthread_mutex_t *mutex); 

All functions return zero on success or a nonzero error code. All errors returned from pthread_mutex_lock and pthread_mutex_unlock are assertable (not recoverable). Therefore, we ll use the return of these functions to abort our program.

Locking a thread is the means by which we enter a critical section. Once our mutex is locked, we can safely enter the section without having to worry about data corruption or multiple access. To exit our critical section, we unlock the semaphore and we re done. The following code snippet illustrates a simple critical section:

 pthread_mutex_t cntr_mutex = PTHREAD_MUTEX_INITIALIZER;     ...     assert(  pthread_mutex_lock  (&cntr_mutex) == 0);     /* Critical Section */     /* Increment protected counter */     counter++;     /* Critical Section */       assert(  pthread_mutex_unlock  (&cntr_mutex) == 0); 
Note  

A critical section is a section of code that can be executed by at most one process at a time. The critical section exists to protect shared resources from multiple access.

The pthread_mutex_trylock operates under the assumption that if we can t lock our mutex, there s something else that we should do instead of blocking on the pthread_mutex_lock call. This call is demonstrated as:

 ret =  pthread_mutex_trylock  (&cntr_mutex);     if (ret == EBUSY) {       /* Couldnt lock, do something else */     } else if (ret == EINVAL) {       /* Critical error */       assert(0);     } else {       /* Critical Section */       ret =  thread_mutex_unlock  (&cntr_mutex);     } 

Finally, to destroy our mutex, we simply provide it to the pthread_mutex__destroy function. The pthread_mutex_destroy function will succeed only if no thread currently has the mutex locked. If the mutex is locked, the function will fail and return the EBUSY error code. The pthread_mutex_destroy call is demonstrated with the following snippet:

 ret =  pthread_mutex_destroy  (&cntr_mutex);     if (ret == EBUSY) {       /* Mutex is locked, cant destroy */     } else {       /* Mutex was destroyed */     } 

Let s now look at an example that ties these functions together to illustrate why mutexes are important in multithreaded applications. We ll build on our previous applications that provide a basic infrastructure for task creation and joining. Consider the example in Listing 14.7. At line 4, we create our mutex and initialize it as a fast mutex. In our thread, our job is to increment the protVariable counter some number of times. This occurs for each thread (here we create 10), so we ll need to protect the variable from multiple access. We place our variable increment within a critical section by first locking the mutex and then, after incrementing the protected variable, unlocking it. This ensures that each task has sole access to the resource when the increment is performed and protects it from corruption. Finally, at line 52, we destroy our mutex using the pthread_mutex_destroy API function.

Listing 14.7: Protecting a Variable in a Critical Section with Mutexes (on the CD-ROM at ./source/ch14/ptmutex.c )
start example
  1:  #include <pthread.h>  2:  #include <stdio.h>  3:   4:  pthread_mutex_t cntr_mutex = PTHREAD_MUTEX_INITIALIZER;  5:   6:  long protVariable = 0L;  7:   8:  void *myThread(void *arg)  9:  {  10:  int i, ret;  11:   12:  for (i = 0 ; i < 10000 ; i++) {  13:   14:  ret =  pthread_mutex_lock  (&cntr_mutex);  15:   16:  assert(ret == 0);  17:   18:  protVariable++;  19:   20:  ret =  pthread_mutex_unlock  (&cntr_mutex);  21:   22:  assert(ret == 0);  23:   24:  }  25:   26:  pthread_exit(NULL);  27:  }  28:   29:  #define MAX_THREADS     10  30:   31:  int main()  32:  {  33:  int ret, i;  34:  pthread_t threadIds[MAX_THREADS];  35:   36:  for (i = 0 ; i < MAX_THREADS ; i++) {  37:  ret =  pthread_create  (&threadIds[i], NULL, myThread, NULL);  38:  if (ret != 0) {  39:  printf("Error creating thread %d\n", (int)threadIds[i]);  40:  }  41:  }  42:   43:  for (i = 0 ; i < MAX_THREADS ; i++) {  44:  ret =  pthread_join  (threadIds[i], NULL);  45:  if (ret != 0) {  46:  printf("Error joining thread %d\n", (int)threadIds[i]);  47:  }  48:  }  49:   50:  printf("The protected variable value is %ld\n", protVariable);  51:   52:  ret =  pthread_mutex_destroy  (&cntr_mutex);  53:   54:  if (ret != 0) {  55:  printf("Couldnt destroy the mutex\n");  56:  }  57:   58:  return 0;  59:  } 
end example
 

When using mutexes, it s important to minimize the amount of work done in the critical section to what really needs to be done. Since other threads will block until a mutex is unlocked, minimizing the critical section time can lead to better performance.

Thread Condition Variables

Now that we have mutexes out of the way, let s explore condition variables. A condition variable is a special thread construct that allows a thread to wake up another thread based upon a condition. While mutexes provide a simple form of synchronization (based upon the lock status of the mutex), condition variables are a means for one thread to wait for an event and another to signal it that the event has occurred. An event can mean anything here. A thread blocks on a mutex but can wait on any condition variable. Think of them as wait queues, which is exactly what the implementation does in GNU/Linux.

Consider this problem of a thread awaiting a particular condition being met. With only mutexes, the thread would have to poll to acquire the mutex, check the condition, and then release the mutex if there was no work to do (the condition wasn t met). That kind of busy looping can lead to poor performing applications and should therefore be avoided.

The pthreads API provides a number of functions supporting condition variables. These functions provide condition variable creation, waiting, signaling, and destruction. The condition variable API functions are presented below:

 int  pthread_cond_wait  (pthread_cond_t *cond,                            pthread_mutex_t *mutex);     int  pthread_cond_timedwait  (pthread_cond_t *cond,                                 pthread_mutex_t *mutex,                                 const struct timespec *abstime);     int  pthread_cond_signal  (pthread_cond_t *cond);     int  pthread_cond_broadcast  (pthread_cond_t *cond);     int  pthread_cond_destroy  (pthread_cond_t *cond); 

To create a condition variable, we simply create a variable of type pthread_cond_t . We initialize this by setting it to PTHREAD_COND_INITIALIZER (similar to mutex creation and initialization). This is demonstrated as:

 pthread_cond_t recoveryCond = PTHREAD_COND_INITIALIZER; 

Condition variables require the existence of a mutex that is associated with them, which we create as before:

 pthread_mutex_t recoveryMutex = PTHREAD_MUTEX_INITIALIZER; 

Now let s look at a thread awaiting a condition. In this example, let s say we have a thread whose job is to warn of overload conditions. Work comes in on a queue, with an accompanying counter identifying the amount of work to do. When the amount of work exceeds a certain value ( MAX_NORMAL_WORKLOAD ), then our thread should wake up and perform a recovery. Our fault thread for synchronizing with the alert thread is illustrated as:

 /* Fault Recovery Thread Loop */     while (1) {       assert(  pthread_mutex_lock  (&recoveryMutex) == 0);       while (workload < MAX_NORMAL_WORKLOAD) {  pthread_cond_wait  (&recoveryCond, &recoveryMutex);       }       /**/       /* Recovery Code. */       /**/       assert(  pthread_mutex_unlock  (&recoveryMutex) == 0);     } 

This is the standard pattern when dealing with condition variables. We start by locking the mutex, entering pthread_cond_wait , and upon waking up from our condition, unlocking the mutex. The mutex must be locked first because upon entry to pthread_cond_wait , the mutex is automatically unlocked. When we return from pthread_cond_wait , the mutex has been reacquired, meaning that we ll need to unlock it afterward. The mutex is necessary here to handle race conditions that exist in this call sequence. To ensure that our condition is met, we loop around the pthread_cond_wait , and if the condition is not satisfied (in this case, our workload is normal), then we reenter the pthread_cond_wait call. Note that since the mutex is locked upon return from pthread_cond_wait , we don t need to call pthread__mutex_lock here.

Now let s look at the signal code. This is considerably simpler than that code necessary to wait for the condition. Two possibilities exist for signaling: sending a single signal, or broadcasting to all waiting threads.

The first case is signaling one thread. In either case, we first lock the mutex before calling the signal function and then unlock when we re done. To signal one thread, we call the pthread_cond_signal function, as:

  pthread_mutex_lock  (&recoveryMutex);  pthread_cond_signal  (&recoveryCond);  pthread_mutex_unlock  (&recovery_Mutex); 

Once the mutex is unlocked, exactly one thread is signaled and allowed to execute. Each function returns zero on success or an error code. If our architecture supports multiple threads for recovery, we could instead use the pthread_cond__broadcast . This function awakes all threads currently awaiting the condition. This is demonstrated as:

  pthread_mutex_lock  (&recoveryMutex);  pthread_cond_broadcast  (&recoveryCond);  pthread_mutex_unlock  (&recovery_Mutex); 

Once the mutex is unlocked, the series of threads is then permitted to perform recovery (though one by one since they re dependent upon the mutex).

The pthreads API also supports a version of timed-wait for a condition variable. This function, pthread_cond_timedwait , allows the caller to specify an absolute time representing when to give up and return to the caller. The return value will be ETIMEDOUT , to indicate that the function returned because of a time-out rather than a successful return. The following code snippet illustrates its use:

 struct timeval currentTime;     struct timespec  expireTime;     int ret;     ...     assert(  pthread_mutex_lock  (&recoveryMutex) == 0);     gettimeofday(&currentTime);     expireTime.tv_sec = currentTime.tv_sec + 1;     expireTime.tv_nsec = currentTime.tv_usec * 1000;     ret = 0;     while ((workload < MAX_NORMAL_WORKLOAD) && (ret != ETIMEDOUT) {       ret =  pthread_cond_timedwait  (&recoveryCond, &recoveryMutex,                      &expireTime);     }     if (ret == ETIMEDOUT) {       /* Timeout  perform timeout processing */     } else {       /* Condition met  perform condition recovery processing */     }     assert(  pthread_mutex_unlock  (&recoveryMutex) == 0); 

The first item to note is the generation of a timeout. We use the gettimeofday function to get the current time and then add one second to it in the timespec structure. This will be passed to pthread_cond_timedwait to identify the time at which we desire a timeout if the condition has not been met. In this case, which is very similar to the standard pthread_cond_wait example, we check in our loop that the pthread_cond_timedwait function has not returned ETIMEDOUT . If it has, we exit our loop and then check again to perform timeout processing. Otherwise, we perform our standard condition processing (recovery for this example) and then reacquire the mutex.

The final function to note here is pthread_cond_destroy . We simply pass the condition variable to the function, as:

  pthread_mutex_destroy  (&recoveryCond); 

It s important to note that in the GNU/Linux implementation, no resources are actually attached to the condition variable, so this function simply checks to see if any threads are currently pending on the condition variable.

Let s now look at a complete example that brings together all of the elements discussed above for condition variables. In this example, we ll illustrate condition variables in the context of producers and consumers. We ll create a producer thread that creates work and then N consumer threads that operate on the (simulated) work.

Our first listing (Listing 14.8) shows the main program. This listing is similar to our previous examples of creating and then joining threads, with a few changes. We create two types of threads in this listing. At lines 18 “21, we create a number of consumer threads, and at line 24, we create a single producer thread. We ll look at these shortly. After creation of the last thread, we join the producer thread (resulting in a suspend of the main application until it has completed). We then wait for the work to complete (as identified by a simple counter, workCount ). We want to allow the consumer threads to complete their work, so we wait until this variable is zero, indicating that all work is consumed.

The block of code at lines 33 “36 shows joins for the consumer threads, with one interesting change. In this example, the consumer threads never quit, so we cancel them here using the pthread_cancel function. This function has the prototype:

 int  pthread_cancel  (pthread_t thread); 

This permits us to terminate another thread when we re done with it. In this example, we ve produced the work that we need the consumers to work on, so we cancel each thread in turn (line 34). Finally, we destroy our condition variable and mutex at lines 37 and 38, respectively.

Listing 14.8    : Producer/Consumer Example Initialization and main (on the CD-ROM at ./source/ch14/ptcond.c )
start example
  1:  #include <pthread.h>  2:  #include <stdio.h>  3:   4:  pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER;  5:  pthread_cond_t condition = PTHREAD_COND_INITIALIZER;  6:   7:  int workCount = 0;  8:   9:  #define MAX_CONSUMERS   10  10:   11:  int main()  12:  {  13:  int i;  14:  pthread_t consumers[MAX_CONSUMERS];  15:  pthread_t producer;  16:   17:  /* Spawn the consumer thread */  18:  for (i = 0 ; i < MAX_CONSUMERS ; i++) {  19:   pthread_create  (&consumers[i], NULL,  20:  consumerThread, NULL);  21:  }  22:   23:  /* Spawn the single producer thread */  24:   pthread_create  (&producer, NULL,  25:  producerThread, NULL);  26:   27:  /* Wait for the producer thread */  28:   pthread_join  (producer, NULL);  29:   30:  while ((workCount > 0));  31:   32:  /* Cancel and join the consumer threads */  33:  for (i = 0 ; i < MAX_CONSUMERS ; i++) {  34:   pthread_cancel  (consumers[i]);  35:  }  36:   37:   pthread_mutex_destroy  (&cond_mutex);  38:   pthread_cond_destroy  (&condition);  39:   40:  return 0;  41:  } 
end example
 

Next, let s look at the producer thread function (Listing 14.9). The purpose of the producer thread is to produce work, simulated by incrementing the workCount variable. A nonzero workCount indicates that work is available to do. We loop for a number of times to create work, as is shown at lines 8 “22. As shown in the condition variable sample, we first lock our mutex at line 10 and then create work to do (increment workCount ). We then notify the awaiting consumer (worker) threads at line 14 using the pthread_cond_broadcast function. This will notify any awaiting consumer threads that work is now available to do. Next, at line 15, we unlock the mutex, allowing the consumer threads to lock the mutex and perform their work.

At lines 20 “22, we simply do some busy work to allow the kernel to schedule another task (thereby avoiding synchronous behavior, for illustration purposes).

When all of the work has been produced, we permit the producer thread to exit (which will be joined in our main function at line 28 of Listing 14.8).

Listing 14.9: Producer Thread Example for Condition Variables (on the CD-ROM at ./source/ch14/ptcond.c )
start example
  1:  void *producerThread(void *arg)  2:  {  3:  int i, j, ret;  4:  double result=0.0;  5:   6:  printf("Producer started\n");  7:   8:  for (i = 0 ; i < 30 ; i++) {  9:   10:  ret =  pthread_mutex_lock  (&cond_mutex);  11:  if (ret == 0) {  12:  printf("Producer: Creating work (%d)\n", workCount);  13:  workCount++;  14:   pthread_cond_broadcast  (&condition);  15:   pthread_mutex_unlock  (&cond_mutex);  16:  } else {  17:  assert(0);  18:  }  19:   20:  for (j = 0 ; j < 60000 ; j++) {  21:  result = result + (double)random();  22:  }  23:   24:  }  25:   26:  printf("Producer finished\n");  27:   28:   pthread_exit  (NULL);  29:  } 
end example
 

Now let s look at the consumer thread (see Listing 14.10). Our first task is to detach ourselves (line 5), since we won t ever join with the creating thread. Then we go into our work loop (lines 9 “22) to process the workload. We first lock the condition mutex at line 11 and then wait for the condition to occur at line 12. We then check to make sure that the condition is true (there s work to do) at line 14. Note that since we re broadcasting to threads, we may not have work to do for every thread, so we test before we assume that work is available.

Once we ve completed our work (in this case, simply decrementing the work count at line 15), we release the mutex at line 19 and wait again for work at line 11. Note that since we cancel our thread, we ll never see the printf at line 23, nor will we exit the thread at line 25. The pthread_cancel function terminates the thread so that the thread does not terminate normally.

Listing 14.10: Consumer Thread Example for Condition Variables (on the CD-ROM at ./source/ch14/ptcond.c )
start example
  1:  void *consumerThread(void *arg)  2:  {  3:  int ret;  4:   5:   pthread_detach  (pthread_self());  6:   7:  printf("Consumer %x: Started\n",  pthread_self  ());  8:   9:  while(1) {  10:   11:  assert(  pthread_mutex_lock  (&cond_mutex) == 0);  12:  assert(  pthread_cond_wait  (&condition, &cond_mutex) == 0);  13:   14:  if (workCount) {  15:  workCount;  16:  printf("Consumer %x: Performed work (%d)\n",  17:   pthread_self  (), workCount);  18:  }  19:  assert(  pthread_mutex_unlock  (&cond_mutex) == 0);  20:   21:  }  22:   23:  printf("Consumer %x: Finished\n",  pthread_self  ());  24:   25:   pthread_exit  (NULL);  26:  } 
end example
 

Let s look at this application in action. For brevity, we ll show only the first 30 lines emitted, but this will give you a good indication of how the application behaves (see Listing 14.11). We can see the consumer threads starting up, the producer starting, and then work being created and consumed in turn.

Listing 14.11: Application Output for Condition Variable Application
start example
 # ./ptcond     Consumer 4082cd40: Started     Consumer 4102ccc0: Started     Consumer 4182cc40: Started     Consumer 42932bc0: Started     Consumer 43132b40: Started     Consumer 43932ac0: Started     Consumer 44132a40: Started     Consumer 449329c0: Started     Consumer 45132940: Started     Consumer 459328c0: Started     Producer started     Producer: Creating work (0)     Producer: Creating work (1)     Consumer 4082cd40: Performed work (1)     Consumer 4102ccc0: Performed work (0)     Producer: Creating work (0)     Consumer 4082cd40: Performed work (0)     Producer: Creating work (0)     Producer: Creating work (1)     Producer: Creating work (2)     Producer: Creating work (3)     Producer: Creating work (4)     Producer: Creating work (5)     Consumer 4082cd40: Performed work (5)     Consumer 4102ccc0: Performed work (4)     Consumer 4182cc40: Performed work (3)     Consumer 42932bc0: Performed work (2)     Consumer 43132b40: Performed work (1)     Consumer 43932ac0: Performed work (0)     Producer: Creating work (0) 
end example
 
Note  

The design of multithreaded applications follows a small number of patterns (or models). The master/servant model is common where a single master doles out work to a collection of servants. The pipeline model splits work up into stages where one or more threads make up each of the work phases.




GNU/Linux Application Programming
GNU/Linux Application Programming (Programming Series)
ISBN: 1584505680
EAN: 2147483647
Year: 2006
Pages: 203
Authors: M. Tim Jones

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net