Synchronization


It’s best to avoid synchronization issues by not sharing data between threads. Of course, this is not always possible. If data sharing is necessary, you must use synchronization techniques so that only one thread at a time accesses and changes shared state. Remember the synchronization issues with race conditions and deadlocks. If you don’t pay attention to these issues, the reason for problems in applications is hard to find because threading issues just occur from time to time.

This section discusses synchronization technologies that you can use with multiple threads.

  • lock statement

  • Interlocked class

  • Monitor class

  • Wait handles

  • Mutex

  • Semaphore

  • Events

lock, Interlocked and Monitor can be used for synchronization within a process. The classes Mutex, Event, and Semaphore also offer synchronization between threads of multiple processes.

lock Statement and Thread Safety

C# has its own keyword for the synchronization of multiple threads: the lock statement. The lock statement is an easy way to hold for a lock and release it.

Before adding lock statements, let’s go into another race condition. The class SharedState just demonstrates using shared state between threads, and keeps an integer value.

  public class SharedState {    private int state = 0;    public int State     {       get { return state; }       set { state = value; }    } } 

The class Task contains the method DoTheTask(), which is the entry point for a new thread. With the implementation, the State of SharedState is incremented 50,000 times. The variable sharedState is initialized in the constructor of this class.

  public class Task {    SharedState sharedState;    public Task(SharedState sharedState)    {       this.sharedState = sharedState;    }    public void DoTheTask()    {       for (int i = 0; i < 50000; i++)       {          sharedState.State += 1;       }    } } 

In the Main() method, a SharedState object is created and passed to the constructor of 20 Thread objects. All threads are started. After starting the threads, the Main() method does another loop to join every one of the 20 threads to wait until all threads are completed. After the threads are completed, the summarized value of the shared state is written to the console. Having 50,000 loops and 20 threads, a value of 1000000 could be expected. However, often this is not the case.

     class Program    {       static void Main()       {          int numThreads = 20;          SharedState state = new SharedState();          Thread[] threads = new Thread[numThreads];          for (int i = 0; i < numThreads; i++)          {             threads[i] = new Thread(new Task(state).DoTheTask);             threads[i].Start();          }          for (int i = 0; i < numThreads; i++)          {             threads[i].Join();          }          Console.WriteLine("summarized {0}", state.State);       }    } } 

Results received from multiple runs of the application are as shown:

 summarized 939270 summarized 993799 summarized 998304 summarized 937630

The behavior is different every time, but none of the results is correct. You get big differences between debug and release builds. You get differences on the types of CPUs you are using. If you change the loop count for smaller values, you will get correct values many times - but not every time. The application is small enough to see the problem easily; the reason for such a problem can be hard to find in a large application.

You must add synchronization to this program. This can be done with the lock keyword.

The object defined with the lock statement means you wait to get the lock for the specified object. You can only pass a reference type. Locking a value type would just lock a copy, and this wouldn’t make any sense. The compiler provides an error locking value types anyway. As soon as the lock is granted - only one thread gets the lock - the block of the lock statement can run. At the end of the lock statement block, the lock for the object is released and another thread waiting for the lock can be granted.

  lock (obj) {    // synchronized region } 

For locking static members, you can place the lock on the type object:

  lock (typeof(StaticClass)) { } 

You can make the instance members of a class thread-safe by using the lock keyword. This way, only one thread at a time can access the methods DoThis() and DoThat() for the same instance.

  public class Demo {    public void DoThis()    {       lock (this)       {          // only one thread a time can access the DoThis and DoThat methods       }    }    public void DoThat()    {       lock (this)       {       }    } } 

However, because the object of the instance can also be used for synchronized access from the outside, and you can’t control this from the class itself, you can apply the SyncRoot pattern. With the SyncRoot pattern, a private object named syncRoot is created, and this object is used with the lock statements.

 public class Demo {    private object syncRoot = new object();    public void DoThis()    {       lock (syncRoot)       {          // only one thread a time can access the DoThis and DoThat methods       }    }    public void DoThat()    {       lock (syncRoot)       {       }    } }

Using locks costs time and is not always needed. You can create two versions of a class, a synchronized and a nonsynchronized one. This is demonstrated by changing the class Demo here. The class Demo itself is not synchronized, as you can see in the implementation of the DoThis() and DoThat() methods. The class also defines the IsSynchronized property, where the client can get information about the synchronization option of the class. To make a synchronized variant of the class, the static method Synchronized() can be used to pass a nonsynchronized object, and this method returns an object of type SynchronizedDemo. SynchronizedDemo is implemented as an inner class that is derived from the base class Demo and overrides the virtual members of the base class. The overridden members make use of the SyncRoot pattern.

  public class Demo {    private class SynchronizedDemo : Demo    {       private object syncRoot = new object();       private Demo d;       public SynchronizedDemo(Demo d)       {          this.d = d;       }       public override bool IsSynchronized       {          get { return true; }       }       public override void DoThis()       {          lock (syncRoot)          {             d.DoThis();          }       }       public override void DoThat()       {          lock (syncRoot)          {             d.DoThat();          }       }    }    public virtual bool IsSynchronized    {       get { return false; }    }    public static Demo Synchronized(Demo d)    {       if (!d.IsSynchronized)       {          return new SynchronizedDemo(d);       }       return d;    }    public virtual void DoThis()    {    }    public virtual void DoThat()    {    } } 

You must bear in mind that when using the SynchronizedDemo class, only methods are synchronized. There’s no synchronization for invoking two members of this class.

Important 

The SyncRoot pattern might lead to a false sense of thread safety. The .NET 1.0 collection classes implement the SyncRoot pattern; the .NET 2.0 generic collection classes don’t implement this pattern anymore.

Let’s face this with the example shown earlier. If you try to make the SharedState class thread-safe by locking access to the properties with the SyncRoot pattern, you still get the race condition shown earlier.

 public class SharedState {    private int state = 0;    private object syncRoot = new object();    public int State // there's still a race condition, don't do this!    {       get { lock (syncRoot) {return state; }}       set { lock (syncRoot) {state = value; }}    }  }

The thread invoking the DoTheTask method is accessing the get accessor of the SharedState class to get the current value of the state, and then the get accessor sets the new value for the state. In between calling the get and the set accessor the object is not locked, and another thread can be the interim value.

 public void DoTheTask() {    for (int i = 0; i < 50000; i++)    {       sharedState.State += 1;    } }

So, it’s better to leave the SharedState class as it was earlier without thread safety

  public class SharedState {    private int state = 0;    public int State    {       get { return state; }       set { state = value; }    } } 

and to add the lock statement where it belongs, inside the method DoTheTask():

 public void DoTheTask() {    for (int i = 0; i < 50000; i++)    {       lock (sharedState)       {          sharedState.State += 1;       }    } }

This way, the results of the application are always as expected:

 summarized 1000000

Important 

Using the lock statement in one place does not mean that all other threads accessing the object are waiting. You have to explicitly use synchronization with every thread accessing the shared state.

Of course, you can also change the design of the SharedState class and offer increment as an atomic operation. This is a design question - what should be an atomic functionality of the class?

 public class SharedState {    private int state = 0;    private object syncRoot = new object();    public int State    {       get { return state; }    }    public int IncrementState()    {       lock (syncRoot)       {          return ++state;       }    } }

Tip 

For the last example shown of locking the increment of the state, there’s a faster version using the Interlocked class shown next.

Interlocked

The Interlocked class is used to make simple statements for variables atomic. i++ is not thread-safe. i++ consists of getting a value from the memory, incrementing the value by 1, and storing the value back into memory. These operations can be interrupted by the thread scheduler. The Interlocked class provides methods for incrementing, decrementing, and exchanging values in a thread-safe manner.

The methods provided by the Interlocked class are described in the following table.

Open table as spreadsheet

Interlocked Member

Description

Increment()

The Increment() method increments a variable and stores the result in an atomic operation.

Decrement()

Decrement() decrements a variable and stores the result.

Exchange()

Exchange() sets a variable to the specified value and returns the original value of the variable.

CompareExchange()

CompareExchange() compares two variables for equality, and if they are same, the specified value is set and the original value returned.

Add()

The Add() method adds two values and replaces the first variable with the result.

Read()

The Read() method is used to read 64-bit values from memory in an atomic operation. On a 32-bit system, reading 64 bits is not atomic; here values from two memory addresses are read.

On a 64-bit system, the Read() method is not required because accessing 64 bit is an atomic operation.

Using the Interlocked class in contrast to other synchronization techniques is a lot faster. However, you can only use it for simple synchronization issues.

For example, instead of using the lock statement to lock access to the variable someState when setting it to a new value, in case it is null, you can use the Interlocked class, which is faster:

  lock (this) {    if (someState == null)    {       someState = newState;    } } 

The faster version with the same functionality uses the Interlocked.CompareExchange method:

  Interlocked.CompareExchange<SomeState>(ref someState, newState, null); 

And instead of doing an increment inside a lock statement:

  public int State {    get    {       lock (this)       {          return ++state;       }    } } 

Interlocked.Increment() is faster:

 public int State {    get    {       return Interlocked.Increment(ref state);    } }

Monitor

The C# lock statement is resolved by the compiler to the use of the Monitor class. The lock statement as follows

  lock (obj) {    // synchronized region for obj } 

is resolved to invoking the Enter() method that waits until the thread gets the lock of the object. Only one thread a time may be the owner of the object lock. As soon as the lock is resolved, the thread can enter the synchronized section. The Exit() method of the Monitor class releases the lock. That the lock is released in any case (also in case an exception is thrown), the Exit() method is placed in the finally handler of the try block.

Tip 

try/finally is covered in Chapter 13.

  Monitor.Enter(obj); try {    // synchronized region for obj } finally {    Monitor.Exit(obj); } 

The class Monitor has a big advantage compared to the lock statement of C#: you can add a timeout value waiting to get the lock. So instead of endlessly waiting to get the lock, you can use the TryEnter method, where you can pass a timeout value that defines the maximum amount of time waiting to get the lock. If the lock for obj is acquired, TryEnter returns true and performs synchronized access to the state guarded by the object obj. If obj is locked for more than 500 milliseconds by another thread, TryEnter returns false and the thread does not wait any longer but is used to do something else. Maybe at a later time, the thread can try to acquire the lock once more.

  if (Monitor.TryEnter(obj, 500)) {    try    {       // acquired the lock       // synchronized region for obj    }    finally    {       Monitor.Exit(obj);    } } else {    // didn't get the lock, do something else } 

Wait Handle

WaitHandle is an abstract base class that you can use to wait for a signal to be set. There are different things you can wait for, as WaitHandle is a base class and some classes are derived from it.

In the use of asynchronous delegates early in this chapter, the WaitHandle was already in use. The method BeginInvoke() of the asynchronous delegate returns an object that implements the interface IAsycResult. Using IAsyncResult you can access a WaitHandle with the property AsyncWaitHandle. When you invoke the method WaitOne(), the thread waits until a signal is received that is associated with the wait handle.

 static void Main() {    TakesAWhileDelegate d1 = TakesAWhile;    IAsyncResult ar = d1.BeginInvoke(1, 3000, null, null);    while (true)    {       Console.Write(".");       if (ar.AsyncWaitHandle.WaitOne(50, false))       {          Console.WriteLine("Can get the result now");          break;       }    }    int result = d1.EndInvoke(ar);    Console.WriteLine("result: {0}", result); }

The methods that are defined by the class WaitHandle to perform a wait are described in the following table.

Open table as spreadsheet

WaitHandle Member

Description

WaitOne()

WaitOne() is an instance method where you can wait for a signal to occur. Optionally, you can specify a timeout value for the maximum amount of time to wait.

WaitAll()

WaitAll() is a static method used to pass an array of WaitHandle objects and wait until all of these handles are signaled.

WaitAny()

WaitAny() is a static method used to pass an array of WaitHandle objects and wait until one of these handles is signaled. This method returns the index of the wait handle object that was signaled, so you know with what functionality you can continue in the program. If the timeout occurred before one handle was signaled, WaitAny() returns WaitTimeout.

With the SafeWaitHandle property, you can also assign a native handle to an operating system resource and wait for that handle. For example, you can assign a SafeFileHandle to wait for a file I/O operation to complete, or a custom SafeTransactionHandle as shown in Chapter 21, “Transactions.”

The classes Mutex, Event, and Semaphore are derived from the base class WaitHandle, so you can use these all with waits.

Mutex

Mutex (mutual exclusion) is one of the classes of the .NET Framework that offer synchronization across multiple processes. It is very similar to the Monitor class in that there’s just one owner. Just one thread can get a lock of the mutex and access the synchronized code regions that are secured by the mutex.

With the constructor of the Mutex class you can define if the mutex should initially be owned by the calling thread, define a name of the mutex, and get the information if the mutex already existed. In the sample code, the third parameter is defined as an out parameter to receive a Boolean value if the mutex was created newly. If the value returned is false, the mutex was already defined. The mutex might be defined in a different process, as a mutex with a name is known for the operating system and is shared between different processes. If there is not a name assigned to the mutex, the mutex is unnamed and not shared between different processes.

  bool createdNew; Mutex mutex = new Mutex(false, "ProCSharpMutex", out createdNew); 

To open an existing mutex, you can also use the method Mutex.OpenExisting(), which doesn’t require the same .NET privileges as creating the mutex with the constructor.

Because the Mutex class derives from the base class WaitHandle, you can do a WaitOne() to acquire the mutex lock and be the owner of the mutex during that time. The mutex is released by invoking the ReleaseMutex() method.

  if (mutex.WaitOne()) {    try    {       // synchronized region    }    finally    {       mutex.ReleaseMutex();    } } else {    // some problem happened while waiting } 

Because a named mutex is known systemwide, you can use it to not allow an application to be started twice. In the following Windows Forms application, the constructor of the Mutex object is invoked. Then it is verified if the mutex with the name SingletonWinAppMutex exists already. If it does, the application exits.

 static class Program {    [STAThread]    static void Main()    {       bool createdNew;       Mutex mutex = new Mutex(false, "SingletonWinAppMutex", out createdNew);       if (!createdNew)       {          MessageBox.Show("You can only start one instance of the application");          Application.Exit();          return;       }       Application.EnableVisualStyles();       Application.SetCompatibleTextRenderingDefault(false);       Application.Run(new Form1());    } }

Semaphore

A semaphore is very similar to a mutex, but there’s the difference that the semaphore can be used by multiple threads at once. A semaphore is a counting mutex; with a semaphore you can define a count how many threads are allowed to access the resource guarded by the semaphore simultaneously. This can be used if you have a number of the resources available and can only allow a specific number of threads access to the resource. For example, say that you want to access physical I/O ports on the system and there are three ports available. So, three threads can access the I/O ports simultaneously, but the fourth thread needs to wait until the resource is released by one of the other threads.

In the sample application, in the Main method six threads are created and one semaphore with a count of 4. In the constructor of the Semaphore class, you can define count for the number of locks that can be acquired with the semaphore (the second parameter) and the number of locks that are free initially (the first parameter). If the first parameter has a lower value than the second parameter, the difference between the values defines the already allocated semaphore count. As with the mutex, you can also assign a name to the semaphore to share it between different processes. Here, no named is defined with the semaphore, so it is used only within this process. After the Semaphore object is created, six threads are started, and they get all the same semaphore.

  using System; using System.Threading; using System.Diagnostics; namespace Wrox.ProCSharp.Threading {    class Program    {       static void Main()       {          int threadCount = 6;          int semaphoreCount = 4;          Semaphore semaphore = new Semaphore(semaphoreCount, semaphoreCount);          Thread[] threads = new Thread[threadCount];          for (int i = 0; i < threadCount; i++)          {             threads[i] = new Thread(ThreadMain);             threads[i].Start(semaphore);          }          for (int i = 0; i < threadCount; i++)          {             threads[i].Join();          }          Console.WriteLine("All threads finished");       } 

In the thread’s main method, ThreadMain(), the thread does a WaitOne() to lock the semaphore. Remember the semaphore has a count of 4, so 4 threads can acquire the lock. Thread 5 must wait and here the timeout of 500 milliseconds is defined for a maximum wait time. If the lock cannot be acquired after the wait time, the thread writes a message to the console and repeats the wait in a loop. As soon as the lock is made, the thread writes a message to the console, sleeps for some time, and releases the lock. Again, with the release of the lock it is important that the resource be released in all cases. That’s why the Release() method of the Semaphore class is invoked in a finally handler.

        static void ThreadMain(object o)       {          Semaphore semaphore = o as Semaphore;          Trace.Assert(semaphore != null, "o must be a Semaphore type");          bool isCompleted = false;          while (!isCompleted)          {             if (semaphore.WaitOne(600, false))             {                try                {                   Console.WriteLine("Thread {0} locks the sempahore",                         Thread.CurrentThread.ManagedThreadId);                   Thread.Sleep(2000);                }                finally                {                   semaphore.Release();                   Console.WriteLine("Thread {0} releases the semaphore",                      Thread.CurrentThread.ManagedThreadId);                   isCompleted = true;                }             }             else             {                Console.WriteLine("Timeout for thread {0}; wait again",                   Thread.CurrentThread.ManagedThreadId);             }          }       }    } } 

Running the application, you can indeed see that with four threads the lock is made immediately. The threads with IDs 7 and 8 have to wait. The wait continues in the loop until one of the other threads releases the semaphore.

 Thread 3 locks the sempahore Thread 4 locks the sempahore Thread 5 locks the sempahore Thread 6 locks the sempahore Timeout for thread 8; wait again Timeout for thread 7; wait again Timeout for thread 8; wait again Timeout for thread 7; wait again Timeout for thread 7; wait again Timeout for thread 8; wait again Thread 3 releases the semaphore Thread 8 locks the sempahore Thread 4 releases the semaphore Thread 7 locks the sempahore Thread 5 releases the semaphore Thread 6 releases the semaphore Thread 8 releases the semaphore Thread 7 releases the semaphore All threads finished

Events

Events are the next of the systemwide synchronization resources. For using system events from managed code, the .NET Framework offers the classes ManualResetEvent and AutoResetEvent in the namespace System.Threading.

Tip 

The event keyword from C# that was covered in Chapter 7 has nothing to do with the event classes from the namespace System.Threading. The event keyword is based on delegates, while both event classes are .NET wrappers to the systemwide native event resource for synchronization.

You can use events to inform other threads that some data is here, something is completed, and so on. An event can be signaled or not signaled. A thread can wait for the event to be in a signaled state with the help of the WaitHandle class, which was already discussed.

A ManualResetEvent is signaled by invoking the Set() method and turned back to a nonsignaled state with the Reset() method. If multiple threads are waiting for an event to be signaled, and the Set() method is invoked, all threads waiting are released. Also, if a thread just invokes the WaitOne() method, but the event is already signaled, the waiting thread can continue immediately.

An AutoResetEvent is also signaled by invoking the Set() method. It is also possible to set it back to a nonsignaled state with the Reset() method. However, if a thread is waiting for an auto-reset event to be signaled, the event is automatically changed into a nonsignaled state when the wait state of the first thread is finished. The event changes automatically back into a nonsignaled state. This way, if multiple threads are waiting for the event to be set, only one thread is released from its wait state. It is not the thread that has been waiting the longest for the event to be signaled but the thread waiting with the highest priority.

To demonstrate events with the AutoResetEvent class, the class ThreadTask defines the method Calculation(), which is the entry point for a thread. With this method, the thread receives input data for calculation (defined with the struct InputData) and writes the result to the variable result that can be accessed from the Result property. As soon as the result is completed (after a random amount of time), the event is signaled by invoking the Set() method of the AutoResetEvent.

  public struct InputData {    public int X;    public int Y;    public InputData(int x, int y)    {       this.X = x;       this.Y = y;    } } public class ThreadTask {    private AutoResetEvent autoEvent;    private int result;    public int Result    {       get { return result; }    }    public ThreadTask(AutoResetEvent ev)    {       this.autoEvent = ev;    }    public void Calculation(object obj)    {       InputData data = (InputData)obj;       Console.WriteLine("Thread {0} starts calculation",          Thread.CurrentThread.ManagedThreadId);       Thread.Sleep(new Random().Next(3000));       result = data.X + data.Y;       // signal the event - completed!       Console.WriteLine("Thread {0} is ready",          Thread.CurrentThread.ManagedThreadId);       autoEvent.Set();    } } 

The Main() method of the program defines arrays of four AutoResetEvent objects and four ThreadTask objects. Every ThreadTask is initialized in the constructor with an AutoResetEvent object, so that every thread gets its own event object to signal when it is completed. Now the ThreadPool class is used to have background threads running the calculation tasks by invoking the method QueueUserWorkItem().

  class Program {    static void Main()    {       int taskCount = 4;       AutoResetEvent[] autoEvents = new AutoResetEvent[taskCount];       ThreadTask[] tasks = new ThreadTask[taskCount];       for (int i = 0; i < taskCount; i++)       {          autoEvents[i] = new AutoResetEvent(false);          tasks[i] = new ThreadTask(mevents[i]);                    ThreadPool.QueueUserWorkItem(tasks[i].Calculation,             new InputData(i + 1, i + 3));       }       //... 

The WaitHandle class is now used to wait for any one of the events in the array. WaitAny() waits until any one of the events is signaled. The index returned from WaitAny() matches the index the event of the array that is passed to WaitAny() to provide the information on which one of the events was signaled, so the result from this event can be read.

        for (int i = 0; i < taskCount; i++)       {          int index = WaitHandle.WaitAny(autoEvents);          if (index == WaitHandle.WaitTimeout)          {             Console.WriteLine("Timeout!!");          }          else          {             Console.WriteLine("finished task for {0}, result: {1}", index,                tasks[index].Result);          }       }    } } 

Starting the application, you can see the threads doing the calculation and setting the event to inform the main thread that it can read the result. Depending on random times, whether the build is a debug or release build, and your hardware, you might see different orders and also a different number of threads from the pool doing the tasks. Here, thread 4 was reused from the pool for doing two tasks because it was fast enough to finish the calculation first.

 Thread 3 starts calculation Thread 4 starts calculation Thread 5 starts calculation Thread 4 is ready finished task for 1, result: 6 Thread 4 starts calculation Thread 3 is ready finished task for 0, result: 4 Thread 4 is ready finished task for 3, result: 10 Thread 5 is ready finished task for 2, result: 8




Professional C# 2005 with .NET 3.0
Professional C# 2005 with .NET 3.0
ISBN: 470124725
EAN: N/A
Year: 2007
Pages: 427

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net