Mutex Kernel Objects


Mutex kernel objects ensure that a thread has mutually exclusive access to a single resource. In fact, this is how the mutex got its name. A mutex object contains a usage count, a thread ID, and a recursion counter. Mutexes behave identically to critical sections, but mutexes are kernel objects, while critical sections are user-mode objects. This means that mutexes are slower than critical sections. But it also means that threads in different processes can access a single mutex, and that a thread can specify a timeout value while waiting to gain access to a resource.

The thread ID identifies which thread in the system currently owns the mutex, and the recursion counter indicates the number of times that this thread owns the mutex. Mutexes have many uses and are among the most frequently used kernel objects. Typically, they are used to guard a block of memory that is accessed by multiple threads. If multiple threads were to access the memory block simultaneously, the data in the block would be corrupted. Mutexes ensure that any thread accessing the memory block has exclusive access to the block so that the integrity of the data is maintained.

The rules for a mutex are as follows:

  • If the thread ID is 0 (an invalid thread ID), the mutex is not owned by any thread and is signaled.
  • If the thread ID is nonzero, a thread owns the mutex and the mutex is nonsignaled.
  • Unlike all the other kernel objects, mutexes have special code in the operating system that allows them to violate the normal rules. (I'll explain this exception shortly.)

To use a mutex, one process must first create the mutex by calling CreateMutex:

 HANDLE CreateMutex( PSECURITY_ATTRIBUTES psa, BOOL fInitialOwner, PCTSTR pszName); 

The psa and pszName parameters are discussed in Chapter 3. Of course, another process can obtain its own process-relative handle to an existing mutex by calling OpenMutex:

 HANDLE OpenMutex( DWORD fdwAccess, BOOL bInheritHandle, PCTSTR pszName); 

The fInitialOwner parameter controls the initial state of the mutex. If you pass FALSE (the usual case), both the mutex object's thread ID and recursion counter are set to 0. This means that the mutex is unowned and is therefore signaled.

If you pass TRUE for fInitialOwner, the object's thread ID is set to the calling thread's ID and the recursion counter is set to 1. Since the thread ID is nonzero, the mutex is initially nonsignaled.

A thread gains access to the shared resource by calling a wait function, passing the handle of the mutex guarding the resource. Internally, the wait function checks the thread ID to see if it is 0 (the mutex is signaled). If the thread ID is 0, the thread ID is set to the calling thread's ID, the recursion counter is set to 1, and the calling thread remains schedulable.

If the wait function detects that the thread ID is not 0 (the mutex is nonsignaled), the calling thread enters a wait state. The system remembers this and when the mutex's thread ID is set back to 0, the system sets the thread ID to the waiting thread's ID, sets the recursion counter to 1, and allows the waiting thread to be schedulable again. As always, these checks and changes to the mutex kernel object are performed atomically.

For mutexes, there is one special exception to the normal kernel object signaled/nonsignaled rules. Let's say that a thread attempts to wait on a nonsignaled mutex object. In this case, the thread is usually placed in a wait state. However, the system checks to see whether the thread attempting to acquire the mutex has the same thread ID as recorded inside the mutex object. If the thread IDs match, the system allows the thread to remain schedulable—even though the mutex was nonsignaled. We don't see this "exceptional" behavior applied to any other kernel object anywhere in the system. Every time a thread successfully waits on a mutex, the object's recursion counter is incremented. The only way the recursion counter can have a value greater than 1 is if the thread waits on the same mutex multiple times, taking advantage of this rule exception.
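The wait rules, including the recursion exception, can be sketched as a small state machine. This is a minimal portable model of the bookkeeping described above, not the Windows implementation; the MutexModel type and its TryWait and IsSignaled members are hypothetical names invented for illustration.

```cpp
#include <cstdint>

// Hypothetical model of the state a mutex kernel object tracks.
// It only mirrors the rules in the text; it is not how Windows implements them.
struct MutexModel {
    std::uint32_t ownerThreadId = 0;  // 0 means unowned, so the mutex is signaled
    unsigned recursionCount = 0;

    bool IsSignaled() const { return ownerThreadId == 0; }

    // Returns true if a wait by thread `tid` would succeed immediately,
    // false if the thread would have to enter a wait state.
    bool TryWait(std::uint32_t tid) {
        if (ownerThreadId == 0) {      // signaled: the caller takes ownership
            ownerThreadId = tid;
            recursionCount = 1;
            return true;
        }
        if (ownerThreadId == tid) {    // the special exception: same owner again
            ++recursionCount;
            return true;
        }
        return false;                  // owned by a different thread
    }
};
```

In this sketch the recursion counter can only climb above 1 through the second branch, which matches the rule exception described above.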

Once a thread has successfully waited on a mutex, the thread knows that it has exclusive access to the protected resource. Any other threads that attempt to gain access to the resource (by waiting on the same mutex) are placed in a wait state. When the thread that currently has access to the resource no longer needs its access, it must release the mutex by calling the ReleaseMutex function:

 BOOL ReleaseMutex(HANDLE hMutex); 

This function decrements the object's recursion counter by 1. If a thread successfully waits on a mutex object multiple times, that thread has to call ReleaseMutex the same number of times before the object's recursion counter becomes 0. When the recursion counter hits 0, the thread ID is also set to 0 and the object becomes signaled.

When the object becomes signaled, the system checks to see whether any other threads are waiting on the mutex. If so, the system "fairly" selects one of the waiting threads and gives it ownership of the mutex. This means, of course, that the thread ID is set to the selected thread's ID and the recursion counter is set to 1. If no other thread is waiting on the mutex, the mutex stays in the signaled state so that the next thread that waits on the mutex immediately gets it.
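The hand-off on release can be modeled in a few lines. The sketch below assumes that "fair" selection means first-in, first-out order, which is only an illustration; the text does not promise any particular order. HandoffMutexModel and its members are hypothetical names, and the model does no real blocking.

```cpp
#include <cstdint>
#include <deque>

// Hypothetical model of ownership hand-off when a mutex is released.
// FIFO order stands in for the system's "fair" selection (an assumption).
class HandoffMutexModel {
public:
    // Returns true if `tid` now owns the mutex, false if it was queued to wait.
    bool Wait(std::uint32_t tid) {
        if (owner_ == 0 || owner_ == tid) {
            owner_ = tid;
            ++recursion_;              // 0 -> 1 on first acquire, +1 on recursion
            return true;
        }
        waiters_.push_back(tid);
        return false;
    }

    // Models a release by the current owner. Returns the ID of the thread that
    // owns the mutex afterward, or 0 if the mutex is left signaled.
    std::uint32_t Release() {
        if (recursion_ == 0) return 0;        // not owned: nothing to release
        if (--recursion_ > 0) return owner_;  // still owned recursively
        owner_ = 0;                           // becomes signaled...
        if (!waiters_.empty()) {              // ...unless a waiter takes over
            owner_ = waiters_.front();
            waiters_.pop_front();
            recursion_ = 1;
        }
        return owner_;
    }

    std::uint32_t Owner() const { return owner_; }

private:
    std::uint32_t owner_ = 0;
    unsigned recursion_ = 0;
    std::deque<std::uint32_t> waiters_;
};
```

Note that a thread that waited twice must release twice before the next waiter is chosen.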

Abandonment Issues

Mutex objects are different from all other kernel objects because they have a notion of "thread ownership." None of the other kernel objects that we've discussed in this chapter remembers which thread successfully waited on it; only mutexes keep track of this. This thread ownership concept for mutexes is the reason why mutexes have the special rule exception that allows a thread to acquire the mutex even when it is nonsignaled.

This exception applies not only to a thread that is attempting to acquire a mutex but also to a thread attempting to release one. When a thread calls ReleaseMutex, the function checks to see whether the calling thread's ID matches the thread ID in the mutex object. If the IDs match, the recursion counter is decremented as described earlier. If the thread IDs don't match, ReleaseMutex does nothing and returns FALSE (indicating failure) back to the caller. Making a call to GetLastError at this time will return ERROR_NOT_OWNER (attempt to release mutex not owned by caller).
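The ownership check on release can be sketched the same way. This is a portable model under the rules stated above, not the real ReleaseMutex; OwnedMutexModel is a hypothetical type, and a false return from Release stands in for the FALSE/ERROR_NOT_OWNER failure.

```cpp
#include <cstdint>

// Hypothetical model of a mutex that checks ownership on release.
struct OwnedMutexModel {
    std::uint32_t ownerThreadId = 0;  // 0 means unowned
    unsigned recursionCount = 0;

    // Returns true if `tid` acquires (or re-acquires) the mutex.
    bool Acquire(std::uint32_t tid) {
        if (ownerThreadId != 0 && ownerThreadId != tid) return false;
        ownerThreadId = tid;
        ++recursionCount;
        return true;
    }

    // Returns false, changing nothing, when the caller is not the owner.
    bool Release(std::uint32_t tid) {
        if (ownerThreadId == 0 || ownerThreadId != tid) return false;
        if (--recursionCount == 0) ownerThreadId = 0;  // now signaled
        return true;
    }
};
```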

So if a thread owning a mutex terminates (using ExitThread, TerminateThread, ExitProcess, or TerminateProcess) before releasing the mutex, what happens to the mutex and the other threads that are waiting on it? The answer is that the system considers the mutex to be abandoned—the thread that owns it can never release it because the thread has died.

Because the system keeps track of all mutex and thread kernel objects, it knows exactly when mutexes become abandoned. When a mutex becomes abandoned, the system automatically resets the mutex object's thread ID to 0 and its recursion counter to 0. Then the system checks to see whether any threads are currently waiting for the mutex. If so, the system "fairly" selects a waiting thread, sets the thread ID to the selected thread's ID, and sets the recursion counter to 1; the selected thread becomes schedulable.

This is the same as before except that the wait function does not return the usual WAIT_OBJECT_0 value to the thread. Instead, the wait function returns the special value of WAIT_ABANDONED. This special return value (which applies only to mutex objects) indicates that the mutex the thread was waiting on was owned by another thread that was terminated before it finished using the shared resource. This is not the best situation to be in. The newly scheduled thread has no idea what state the resource is currently in—the resource might be totally corrupt. You have to decide for yourself what your application should do in this case.
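To make the abandonment sequence concrete, here is a minimal portable model. The WaitResult enumeration, AbandonableMutexModel, and OnThreadTerminated are hypothetical names; the enumerators merely stand in for WAIT_OBJECT_0, WAIT_ABANDONED, and "the thread would block," and the model assumes that only the first successful wait after abandonment reports it.

```cpp
#include <cstdint>

// Stand-ins for the wait outcomes discussed in the text (hypothetical names).
enum class WaitResult { Object0, Abandoned, WouldBlock };

struct AbandonableMutexModel {
    std::uint32_t ownerThreadId = 0;
    unsigned recursionCount = 0;
    bool abandoned = false;  // set when an owner dies without releasing

    WaitResult Wait(std::uint32_t tid) {
        if (ownerThreadId != 0 && ownerThreadId != tid)
            return WaitResult::WouldBlock;
        ownerThreadId = tid;
        ++recursionCount;
        if (abandoned) {     // first acquirer after abandonment is told so
            abandoned = false;
            return WaitResult::Abandoned;
        }
        return WaitResult::Object0;
    }

    // Models the system noticing that thread `tid` has terminated.
    void OnThreadTerminated(std::uint32_t tid) {
        if (ownerThreadId == tid) {
            ownerThreadId = 0;
            recursionCount = 0;
            abandoned = true;
        }
    }
};
```

A caller that receives the Abandoned result owns the mutex but should treat the protected data as suspect.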

In real life, most applications never check explicitly for the WAIT_ABANDONED return value because a thread is rarely just terminated. (This whole discussion provides another great example of why you should never call the TerminateThread function.)

Mutexes vs. Critical Sections

Mutexes and critical sections have identical semantics with respect to scheduling waiting threads. However, they differ in some of their other attributes. The following table compares them.

| Characteristic | Mutex | Critical Section |
|---|---|---|
| Performance | Slow | Fast |
| Can be used across process boundaries | Yes | No |
| Declaration | HANDLE hmtx; | CRITICAL_SECTION cs; |
| Initialization | hmtx = CreateMutex(NULL, FALSE, NULL); | InitializeCriticalSection(&cs); |
| Cleanup | CloseHandle(hmtx); | DeleteCriticalSection(&cs); |
| Infinite wait | WaitForSingleObject(hmtx, INFINITE); | EnterCriticalSection(&cs); |
| 0 wait | WaitForSingleObject(hmtx, 0); | TryEnterCriticalSection(&cs); |
| Arbitrary wait | WaitForSingleObject(hmtx, dwMilliseconds); | Not possible |
| Release | ReleaseMutex(hmtx); | LeaveCriticalSection(&cs); |
| Can be waited on with other kernel objects | Yes (use WaitForMultipleObjects or similar function) | No |

The Queue Sample Application

The Queue ("09 Queue.exe") application, listed in Figure 9-2, uses a mutex and a semaphore to control a queue of data elements. The source code and resource files for the application are in the 09-Queue directory on the companion CD-ROM. When you run Queue, the following dialog box appears.

[Screenshot: the Queue dialog box, showing the Client Threads and Server Threads list boxes.]

When Queue initializes, it creates four client threads and two server threads. Each client thread sleeps for some period of time and then appends a request element to a queue. As each element is queued, the Client Threads list box is updated. Each entry indicates which client thread appended the entry and which entry it was. For example, the first entry in the list box indicates that client thread 0 appended its first request. Then client threads 1 through 3 appended their first request, followed by client thread 0 appending its second request, and so on.

The server threads have nothing to do until at least one element appears in the queue. When an element appears, a single server thread wakes up to process the request. The Server Threads list box shows the status of the server threads. The first entry shows that server thread 0 is processing a request from client thread 0. The request being processed is the client thread's first request. The second entry shows server thread 1 processing client thread 1's first request, and so on.

In this example, the server threads cannot process the clients' requests quickly enough and the queue fills to maximum capacity. I initialize the queue data structure so it can hold no more than 10 elements at a single time; this causes the queue to fill quickly. Plus, there are four client threads and only two server threads. We see that the queue is full when client thread 3 attempts to append its fifth request to the queue.

OK, so that's what you see; what's more interesting is how it works. The queue is managed and controlled by a C++ class, CQueue:

 class CQueue {
 public:
    struct ELEMENT {
       int m_nThreadNum, m_nRequestNum;
       // Other element data should go here.
    };
    typedef ELEMENT* PELEMENT;

 private:
    PELEMENT m_pElements;        // Array of elements to be processed
    int      m_nMaxElements;     // # of elements in the array
    HANDLE   m_h[2];             // Mutex & semaphore handles
    HANDLE   &m_hmtxQ;           // Reference to m_h[0]
    HANDLE   &m_hsemNumElements; // Reference to m_h[1]

 public:
    CQueue(int nMaxElements);
    ~CQueue();

    BOOL Append(PELEMENT pElement, DWORD dwMilliseconds);
    BOOL Remove(PELEMENT pElement, DWORD dwMilliseconds);
 };

The public ELEMENT structure inside this class defines what a queue data element looks like. The actual content is not particularly important. For this sample application, clients place their client thread number and their request number in this element so that the servers can display this information in their list box when they process the retrieved element. A real-life application would generally not require this information.

For the private members, we have m_pElements, which points to a fixed-size array of ELEMENT structures. This is the data that needs protecting from the multiple client/server threads. The m_nMaxElements member indicates how large this array is initialized to when the CQueue object is constructed. The next member, m_h, is an array of two kernel object handles. To properly protect the queue's data elements, you need two kernel objects: a mutex and a semaphore. In the CQueue constructor, these two objects are created and their handles are placed in this array.

As you'll see shortly, the code sometimes calls WaitForMultipleObjects, passing the address to the handle array. You'll also see that sometimes the code needs to refer to just one of these kernel object handles. To make the code more readable and maintainable, I also declare two handle reference members, m_hmtxQ and m_hsemNumElements. When the CQueue constructor executes, it initializes these handle reference members to m_h[0] and m_h[1], respectively.
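The reference-member idiom can be shown in isolation. The sketch below is portable C++ and uses a hypothetical Handle alias in place of the Win32 HANDLE type; HandlePair and its accessors are invented names, not part of the sample.

```cpp
using Handle = void*;  // stand-in for the Win32 HANDLE type (assumption)

class HandlePair {
public:
    HandlePair(Handle mutex, Handle semaphore)
        : m_hmtx(m_h[0]), m_hsem(m_h[1]) {  // bind the names to the array slots
        m_hmtx = mutex;                     // writes m_h[0]
        m_hsem = semaphore;                 // writes m_h[1]
    }

    // A compiler-generated copy would bind the new object's references to the
    // source object's array, so copying is disabled in this sketch.
    HandlePair(const HandlePair&) = delete;
    HandlePair& operator=(const HandlePair&) = delete;

    const Handle* Array() const { return m_h; }   // for array-based waits
    Handle Mutex() const { return m_hmtx; }       // for single-handle calls
    Handle Semaphore() const { return m_hsem; }

private:
    Handle  m_h[2];   // declared before the references that alias it
    Handle& m_hmtx;
    Handle& m_hsem;
};
```

The trade-off of this design is that a class with reference members cannot be copy-assigned, and a default copy constructor would alias the original's array; a class that follows this pattern is best treated as noncopyable.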

You should now have no trouble understanding CQueue's constructor and destructor methods, so let's turn our attention to the Append method. This method attempts to append an ELEMENT to the queue. But first, the thread must make sure that it has exclusive access to the queue. The Append method does this by calling WaitForSingleObject, passing the handle of the m_hmtxQ mutex. If WAIT_OBJECT_0 is returned, the thread has exclusive access to the queue.

Next, the Append method must attempt to increment the number of elements in the queue by calling ReleaseSemaphore and passing a release count of 1. If ReleaseSemaphore is successful, the queue is not full and the new element can be appended. Fortunately, ReleaseSemaphore also returns the previous count of queue elements in the lPrevCount variable. This tells you exactly which array index the new element should be placed in. Once the element has been copied into the queue's array, Append calls ReleaseMutex so that other threads can access the queue, and then the function returns. The remaining parts of the Append function have to do with failure cases and error handling.

Now let's look at how a server thread calls the Remove method to extract an element from the queue. First, the thread must make sure that it has exclusive access to the queue, and the queue must have at least one element in it. Certainly, a server thread has no reason to wake if no elements are in the queue. So the Remove method first calls WaitForMultipleObjects, passing both the mutex and the semaphore's handles. Only when both of these objects are signaled should a server thread wake up.

If WAIT_OBJECT_0 is returned, the thread has exclusive access to the queue and at least one element must be in the queue. At this point, the code extracts the element at index 0 in the array and then shifts the remaining elements in the array down one. This is not the most efficient way to implement a queue because memory copies like this are expensive, but our purpose here is to demonstrate thread synchronization. Finally, ReleaseMutex is called so that other threads can safely access the queue.

Note that the semaphore object keeps track of how many elements are in the queue at any given time. You can see how this number is incremented: the Append method calls ReleaseSemaphore when a new element is appended to the queue. But you don't immediately see how this count is decremented when an element is removed from the queue. The decrementing is done by the Remove method's call to WaitForMultipleObjects. Remember that the side effect of successfully waiting on a semaphore is that its count is decremented by one. This is very convenient for us.
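The counting logic behind Append and Remove can be sketched without any Windows calls. The model below is a rough portable analogue, not the sample's code: std::mutex stands in for the mutex kernel object, a plain counter plays the role of the semaphore count, and both operations return immediately instead of waiting with a timeout. BoundedQueueModel and Element are hypothetical names.

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

struct Element { int threadNum; int requestNum; };

class BoundedQueueModel {
public:
    explicit BoundedQueueModel(std::size_t maxElements)
        : elements_(maxElements) {}

    // Fails when the queue is full, much as ReleaseSemaphore fails when the
    // semaphore is already at its maximum count.
    bool Append(const Element& e) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == elements_.size()) return false;
        std::size_t previousCount = count_++;  // previous count is the new index
        elements_[previousCount] = e;
        return true;
    }

    // Fails when the queue is empty; the real sample would wait instead.
    bool Remove(Element& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) return false;
        --count_;                              // a successful wait decrements
        out = elements_[0];
        for (std::size_t i = 0; i < count_; ++i)
            elements_[i] = elements_[i + 1];   // shift the remaining elements down
        return true;
    }

    std::size_t Count() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    mutable std::mutex mutex_;
    std::vector<Element> elements_;
    std::size_t count_ = 0;
};
```

Unlike the sample, this model shifts only the elements actually in use rather than the whole array.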

Now that you understand how the CQueue class works, the rest of the source code is easy to understand.

Figure 9-2. The Queue sample application

Queue.cpp

 /******************************************************************************
 Module:  Queue.cpp
 Notices: Copyright (c) 2000 Jeffrey Richter
 ******************************************************************************/


 #include "..\CmnHdr.h"     /* See Appendix A. */
 #include <windowsx.h>
 #include <tchar.h>
 #include <process.h>       // For _beginthreadex
 #include "Resource.h"


 ///////////////////////////////////////////////////////////////////////////////


 class CQueue {
 public:
    struct ELEMENT {
       int m_nThreadNum, m_nRequestNum;
       // Other element data should go here
    };
    typedef ELEMENT* PELEMENT;

 private:
    PELEMENT m_pElements;        // Array of elements to be processed
    int      m_nMaxElements;     // Maximum # of elements in the array
    HANDLE   m_h[2];             // Mutex & semaphore handles
    HANDLE   &m_hmtxQ;           // Reference to m_h[0]
    HANDLE   &m_hsemNumElements; // Reference to m_h[1]

 public:
    CQueue(int nMaxElements);
    ~CQueue();

    BOOL Append(PELEMENT pElement, DWORD dwMilliseconds);
    BOOL Remove(PELEMENT pElement, DWORD dwMilliseconds);
 };


 ///////////////////////////////////////////////////////////////////////////////


 CQueue::CQueue(int nMaxElements)
    : m_hmtxQ(m_h[0]), m_hsemNumElements(m_h[1]) {

    m_pElements = (PELEMENT)
       HeapAlloc(GetProcessHeap(), 0, sizeof(ELEMENT) * nMaxElements);
    m_nMaxElements = nMaxElements;
    m_hmtxQ = CreateMutex(NULL, FALSE, NULL);
    m_hsemNumElements = CreateSemaphore(NULL, 0, nMaxElements, NULL);
 }


 ///////////////////////////////////////////////////////////////////////////////


 CQueue::~CQueue() {

    CloseHandle(m_hsemNumElements);
    CloseHandle(m_hmtxQ);
    HeapFree(GetProcessHeap(), 0, m_pElements);
 }


 ///////////////////////////////////////////////////////////////////////////////


 BOOL CQueue::Append(PELEMENT pElement, DWORD dwTimeout) {

    BOOL fOk = FALSE;
    DWORD dw = WaitForSingleObject(m_hmtxQ, dwTimeout);

    if (dw == WAIT_OBJECT_0) {
       // This thread has exclusive access to the queue

       // Increment the number of elements in the queue
       LONG lPrevCount;
       fOk = ReleaseSemaphore(m_hsemNumElements, 1, &lPrevCount);
       if (fOk) {
          // The queue is not full; append the new element
          m_pElements[lPrevCount] = *pElement;
       } else {
          // The queue is full; set the error code and return failure
          SetLastError(ERROR_DATABASE_FULL);
       }

       // Allow other threads to access the queue
       ReleaseMutex(m_hmtxQ);

    } else {
       // Timeout, set error code and return failure
       SetLastError(ERROR_TIMEOUT);
    }

    return(fOk);   // Call GetLastError for more info
 }


 ///////////////////////////////////////////////////////////////////////////////


 BOOL CQueue::Remove(PELEMENT pElement, DWORD dwTimeout) {

    // Wait for exclusive access to queue and for queue to have element.
    BOOL fOk = (WaitForMultipleObjects(chDIMOF(m_h), m_h, TRUE, dwTimeout)
       == WAIT_OBJECT_0);

    if (fOk) {
       // The queue has an element; pull it from the queue
       *pElement = m_pElements[0];

       // Shift the remaining elements down
       MoveMemory(&m_pElements[0], &m_pElements[1],
          sizeof(ELEMENT) * (m_nMaxElements - 1));

       // Allow other threads to access the queue
       ReleaseMutex(m_hmtxQ);

    } else {
       // Timeout, set error code and return failure
       SetLastError(ERROR_TIMEOUT);
    }

    return(fOk);   // Call GetLastError for more info
 }


 ///////////////////////////////////////////////////////////////////////////////


 CQueue g_q(10);                     // The shared queue
 volatile BOOL g_fShutdown = FALSE;  // Signals client/server threads to die
 HWND g_hwnd;                        // How client/server threads give status


 // Handles to all client/server threads & number of client/server threads
 HANDLE g_hThreads[MAXIMUM_WAIT_OBJECTS];
 int    g_nNumThreads = 0;


 ///////////////////////////////////////////////////////////////////////////////


 DWORD WINAPI ClientThread(PVOID pvParam) {

    int nThreadNum = PtrToUlong(pvParam);
    HWND hwndLB = GetDlgItem(g_hwnd, IDC_CLIENTS);

    for (int nRequestNum = 1; !g_fShutdown; nRequestNum++) {

       TCHAR sz[1024];
       CQueue::ELEMENT e = { nThreadNum, nRequestNum };

       // Try to put an element on the queue
       if (g_q.Append(&e, 200)) {

          // Indicate which thread sent it and which request
          wsprintf(sz, TEXT("Sending %d:%d"), nThreadNum, nRequestNum);
       } else {

          // Couldn't put an element on the queue
          wsprintf(sz, TEXT("Sending %d:%d (%s)"), nThreadNum, nRequestNum,
             (GetLastError() == ERROR_TIMEOUT)
                ? TEXT("timeout") : TEXT("full"));
       }

       // Show result of appending element
       ListBox_SetCurSel(hwndLB, ListBox_AddString(hwndLB, sz));
       Sleep(2500);   // Wait before appending another element
    }

    return(0);
 }


 ///////////////////////////////////////////////////////////////////////////////


 DWORD WINAPI ServerThread(PVOID pvParam) {

    int nThreadNum = PtrToUlong(pvParam);
    HWND hwndLB = GetDlgItem(g_hwnd, IDC_SERVERS);

    while (!g_fShutdown) {

       TCHAR sz[1024];
       CQueue::ELEMENT e;

       // Try to get an element from the queue
       if (g_q.Remove(&e, 5000)) {

          // Indicate which thread is processing it, which thread
          // sent it, and which request we're processing
          wsprintf(sz, TEXT("%d: Processing %d:%d"),
             nThreadNum, e.m_nThreadNum, e.m_nRequestNum);

          // The server takes some time to process the request
          Sleep(2000 * e.m_nThreadNum);

       } else {
          // Couldn't get an element from the queue
          wsprintf(sz, TEXT("%d: (timeout)"), nThreadNum);
       }

       // Show result of processing element
       ListBox_SetCurSel(hwndLB, ListBox_AddString(hwndLB, sz));
    }

    return(0);
 }


 ///////////////////////////////////////////////////////////////////////////////


 BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam) {

    chSETDLGICONS(hwnd, IDI_QUEUE);

    g_hwnd = hwnd; // Used by client/server threads to show status

    DWORD dwThreadID;
    int x;         // Declared once so that both loops can use it

    // Create the client threads
    for (x = 0; x < 4; x++)
       g_hThreads[g_nNumThreads++] =
          chBEGINTHREADEX(NULL, 0, ClientThread, (PVOID) (INT_PTR) x,
             0, &dwThreadID);

    // Create the server threads
    for (x = 0; x < 2; x++)
       g_hThreads[g_nNumThreads++] =
          chBEGINTHREADEX(NULL, 0, ServerThread, (PVOID) (INT_PTR) x,
             0, &dwThreadID);

    return(TRUE);
 }


 ///////////////////////////////////////////////////////////////////////////////


 void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtl, UINT codeNotify) {

    switch (id) {
       case IDCANCEL:
          EndDialog(hwnd, id);
          break;
    }
 }


 ///////////////////////////////////////////////////////////////////////////////


 INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam) {

    switch (uMsg) {
       chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
       chHANDLE_DLGMSG(hwnd, WM_COMMAND,    Dlg_OnCommand);
    }
    return(FALSE);
 }


 ///////////////////////////////////////////////////////////////////////////////


 int WINAPI _tWinMain(HINSTANCE hinstExe, HINSTANCE, PTSTR pszCmdLine, int) {

    DialogBox(hinstExe, MAKEINTRESOURCE(IDD_QUEUE), NULL, Dlg_Proc);

    // g_fShutdown is a 32-bit BOOL, so use the 32-bit interlocked exchange;
    // a pointer-sized exchange would write past the variable on 64-bit builds.
    InterlockedExchange((volatile LONG*) &g_fShutdown, TRUE);

    // Wait for all the threads to terminate & then cleanup
    WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE);
    while (g_nNumThreads--)
       CloseHandle(g_hThreads[g_nNumThreads]);

    return(0);
 }


 //////////////////////////////// End of File //////////////////////////////////

Queue.rc

 //Microsoft Developer Studio generated resource script.
 //
 #include "Resource.h"

 #define APSTUDIO_READONLY_SYMBOLS
 /////////////////////////////////////////////////////////////////////////////
 //
 // Generated from the TEXTINCLUDE 2 resource.
 //
 #include "afxres.h"

 /////////////////////////////////////////////////////////////////////////////
 #undef APSTUDIO_READONLY_SYMBOLS

 /////////////////////////////////////////////////////////////////////////////
 // English (U.S.) resources

 #if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU)
 #ifdef _WIN32
 LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US
 #pragma code_page(1252)
 #endif //_WIN32

 /////////////////////////////////////////////////////////////////////////////
 //
 // Dialog
 //

 IDD_QUEUE DIALOG DISCARDABLE  38, 36, 298, 225
 STYLE WS_MINIMIZEBOX | WS_VISIBLE | WS_CAPTION | WS_SYSMENU
 CAPTION "Queue"
 FONT 8, "MS Sans Serif"
 BEGIN
     GROUPBOX        "&Client threads",IDC_STATIC,4,4,140,216
     LISTBOX         IDC_CLIENTS,8,16,132,200,NOT LBS_NOTIFY |
                     LBS_NOINTEGRALHEIGHT | WS_VSCROLL | WS_TABSTOP
     GROUPBOX        "&Server threads",IDC_STATIC,156,4,140,216
     LISTBOX         IDC_SERVERS,160,16,132,200,NOT LBS_NOTIFY |
                     LBS_NOINTEGRALHEIGHT | WS_VSCROLL | WS_TABSTOP
 END


 /////////////////////////////////////////////////////////////////////////////
 //
 // Icon
 //

 // Icon with lowest ID value placed first to ensure application icon
 // remains consistent on all systems.
 IDI_QUEUE               ICON    DISCARDABLE     "Queue.Ico"

 #ifdef APSTUDIO_INVOKED
 /////////////////////////////////////////////////////////////////////////////
 //
 // TEXTINCLUDE
 //

 1 TEXTINCLUDE DISCARDABLE
 BEGIN
     "Resource.h\0"
 END

 2 TEXTINCLUDE DISCARDABLE
 BEGIN
     "#include ""afxres.h""\r\n"
     "\0"
 END

 3 TEXTINCLUDE DISCARDABLE
 BEGIN
     "\r\n"
     "\0"
 END

 #endif    // APSTUDIO_INVOKED


 /////////////////////////////////////////////////////////////////////////////
 //
 // DESIGNINFO
 //

 #ifdef APSTUDIO_INVOKED
 GUIDELINES DESIGNINFO DISCARDABLE
 BEGIN
     IDD_QUEUE, DIALOG
     BEGIN
         RIGHTMARGIN, 244
         BOTTOMMARGIN, 130
     END
 END
 #endif    // APSTUDIO_INVOKED

 #endif    // English (U.S.) resources
 /////////////////////////////////////////////////////////////////////////////



 #ifndef APSTUDIO_INVOKED
 /////////////////////////////////////////////////////////////////////////////
 //
 // Generated from the TEXTINCLUDE 3 resource.
 //


 /////////////////////////////////////////////////////////////////////////////
 #endif    // not APSTUDIO_INVOKED



Programming Applications for Microsoft Windows (Microsoft Programming Series)
ISBN: 1572319968
Year: 1999
Pages: 193
