Scalability in .NET

If you have a multiprocessor system, threads really show their worth. Windows manages the allocation of threads to processors and, as you have seen throughout this book, starting any process automatically starts a thread. The .NET Framework does not give managed threads fine-grained control over processor allocation; it leaves scheduling to the operating system, which has more information about the loading of the processors than the CLR does. The framework does provide some control over which processors an entire process runs on, but that setting applies to all of the process's threads, so we do not use it in this book.
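For reference, the process-wide control mentioned above is exposed through the `ProcessorAffinity` property of `System.Diagnostics.Process`, which takes a bitmask with one bit per processor. The following is only a minimal sketch: the class name `AffinitySketch` and both helper methods are hypothetical names chosen for illustration, and the property is supported on Windows and Linux only.

```csharp
using System;
using System.Diagnostics;

public static class AffinitySketch
{
    // Builds a bitmask with one bit set per zero-based processor index.
    public static long MaskFor(params int[] processorIndexes)
    {
        long mask = 0;
        foreach (int index in processorIndexes)
        {
            if (index < 0 || index > 62)
                throw new ArgumentOutOfRangeException("processorIndexes");
            mask |= 1L << index;
        }
        return mask;
    }

    // Restricts the whole current process, and therefore every thread in it,
    // to the given processors. Hypothetical helper; Windows and Linux only.
    public static void PinCurrentProcess(params int[] processorIndexes)
    {
        Process current = Process.GetCurrentProcess();
        current.ProcessorAffinity = (IntPtr)MaskFor(processorIndexes);
    }
}
```

Calling `AffinitySketch.PinCurrentProcess(0, 1)` would ask the operating system to run the process on the first two processors only. Because the mask covers the whole process, it cannot be used to place individual managed threads.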

If you have only one thread, the main thread, then every task within that thread can use only one processor at a time. When a new thread is started, the operating system decides which processor it should run on. That decision itself consumes some processor time, so for very small tasks a new thread is generally not worth it: the task may take no longer to execute than the OS takes to allocate it to a processor. However, this allocation has taken less and less time in successive versions of Windows, and for anything other than the most trivial of tasks you should find a performance improvement by creating a new thread to execute your task. It is on symmetric multiprocessor (SMP) systems that the benefits of threading really show, as the load of the application can be distributed across all of the processors.
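One way to judge whether a task is large enough to deserve its own thread is simply to time it both ways. The sketch below is an assumption-laden illustration, not a benchmark harness: `ThreadCostSketch` and its two methods are hypothetical names, and the figures they return will vary from machine to machine and from run to run.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class ThreadCostSketch
{
    // Runs the task directly on the calling thread and returns the elapsed time.
    public static TimeSpan TimeInline(ThreadStart task)
    {
        Stopwatch watch = Stopwatch.StartNew();
        task();
        watch.Stop();
        return watch.Elapsed;
    }

    // Runs the task on a newly created thread and returns the elapsed time,
    // including the cost of creating, scheduling, and joining that thread.
    public static TimeSpan TimeOnNewThread(ThreadStart task)
    {
        Stopwatch watch = Stopwatch.StartNew();
        Thread worker = new Thread(task);
        worker.Start();
        worker.Join();
        watch.Stop();
        return watch.Elapsed;
    }
}
```

For a trivial task the second figure is typically dominated by thread start-up rather than by the task itself, which is the overhead the paragraph above describes.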

In the next section, we describe how to create a thread pool manager with which you can create and manage threads, and which will ensure that a maximum and minimum number of threads exist in a pool and that idle threads get reused.

A Thread Pool Manager

Throughout this book, you have seen different ways of creating threads and in this chapter we have described the ThreadPool class to make use of the operating system's own thread pool for short-lived threads. We can implement a half-way house between the two, however. We can create a class that will keep a pool of a specified number of threads to be supplied to any requesting application. This will enable the threads to be managed more easily by your code, and also allow for faster thread execution as you may be able to use a previously instantiated thread object. This class will draw together much of the knowledge acquired so far, and you will be able to use it in your own multithreaded applications. We will explain this class as we go along, and at the end provide an application to test that this assembly is working as expected.
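For comparison with the class we are about to build, here is a minimal sketch of handing short jobs to the built-in `ThreadPool` class. `BuiltInPoolSketch` and `RunJobs` are hypothetical names used only for this illustration; `ThreadPool.QueueUserWorkItem`, `Interlocked`, and `ManualResetEvent` are the standard framework types.

```csharp
using System;
using System.Threading;

public static class BuiltInPoolSketch
{
    // Queues `jobs` short work items on the runtime's thread pool and
    // blocks until all of them have run. Returns how many ran.
    public static int RunJobs(int jobs)
    {
        if (jobs <= 0) return 0;
        int completed = 0;
        int remaining = jobs;
        using (ManualResetEvent allDone = new ManualResetEvent(false))
        {
            for (int i = 0; i < jobs; i++)
            {
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    Interlocked.Increment(ref completed);
                    // The last job to finish signals the waiting caller.
                    if (Interlocked.Decrement(ref remaining) == 0)
                        allDone.Set();
                });
            }
            allDone.WaitOne();
        }
        return completed;
    }
}
```

With the built-in pool the runtime decides how many threads exist and when they are retired. The class developed in this section instead keeps those limits (minimum, maximum, and idle time) under the control of your own code.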

So, let's get started and explain the code of our thread pool manager contained in a file named MyThreadPool.cs:

    using System;
    using System.Collections;
    using System.Text;
    using System.Threading;

    namespace GenThreadPool
    {

The above declarations show that the only additional external assembly needed is System.dll. The GenThreadPool namespace is defined to contain all of the relevant classes for this project. Below we show the interface called IThreadPool that will be used for the GenThreadPoolImpl class:

      public interface IThreadPool
      {
        void AddJob(System.Threading.Thread jobToRun);
        Stats GetStats();
      }
    }

This defines two methods for the thread pool, AddJob() and GetStats(), which will be detailed in the following definitions of the GenThreadPoolImpl class, which generates the thread pool that we will be using:

      public class GenThreadPoolImpl : IThreadPool
      {
        private int _maxThreads;
        private int _minThreads;
        private int _maxIdleTime;
        private static bool _debug;
        private ArrayList _pendingJobs;
        private ArrayList _availableThreads;

        public bool Debug
        {
          get { return _debug; }
          set { _debug = value; }
        }

        public ArrayList PendingJobs
        {
          get { return this._pendingJobs; }
          set { this._pendingJobs = value; }
        }

        public ArrayList AvailableThreads
        {
          get { return this._availableThreads; }
        }

        public int MaxIdleTime
        {
          get { return this._maxIdleTime; }
          set { this._maxIdleTime = value; }
        }

        public int MaxThreads
        {
          get { return this._maxThreads; }
          set { this._maxThreads = value; }
        }

        public int MinThreads
        {
          get { return this._minThreads; }
          set { this._minThreads = value; }
        }

This class implements the IThreadPool interface, which we defined earlier, and then goes on to define a few private fields. The properties are just wrappers around the relevant private members, so that users cannot alter the values directly and further rules can be added later. The fields _maxThreads, _minThreads, and _maxIdleTime specify the maximum and minimum number of threads in the pool, and how long in milliseconds to allow a thread to remain idle before removing it from the pool. There are three constructors for this class:

        public GenThreadPoolImpl()
        {
          _maxThreads = 1;
          _minThreads = 0;
          _maxIdleTime = 300;
          this._pendingJobs =
              ArrayList.Synchronized(new ArrayList());
          this._availableThreads =
              ArrayList.Synchronized(new ArrayList());
          _debug = false;
        }

The default constructor permits only one thread in the pool, and allows it to be removed after it has been idle for just 0.3 seconds. It also creates two synchronized array lists: one to hold the jobs awaiting a thread, and one to hold the threads that belong to the pool. The _debug flag, when set to true, enables further debugging output while testing.
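The `ArrayList.Synchronized` wrapper used by the constructors makes each individual call on the list thread-safe, but a sequence of calls is still not atomic. The sketch below illustrates the difference under that assumption; `SynchronizedListSketch`, `CreateQueue`, and `TakeFirstOrNull` are hypothetical names that do not appear in the pool itself.

```csharp
using System;
using System.Collections;

public static class SynchronizedListSketch
{
    // Wraps a new ArrayList so that individual calls such as Add and
    // RemoveAt are serialized on the list's SyncRoot.
    public static ArrayList CreateQueue()
    {
        return ArrayList.Synchronized(new ArrayList());
    }

    // A compound "check, then act" step still needs an explicit lock,
    // because another thread could change the list between the calls.
    public static object TakeFirstOrNull(ArrayList queue)
    {
        lock (queue.SyncRoot)
        {
            if (queue.Count == 0) return null;
            object first = queue[0];
            queue.RemoveAt(0);
            return first;
        }
    }
}
```

This is why the pool code that follows takes its own lock around steps such as checking the number of pending jobs and then removing the first one, even though both lists are already synchronized.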

        public GenThreadPoolImpl(int maxThreads, int minThreads,
                                 int maxIdleTime)
        {
          _maxThreads = maxThreads;
          _minThreads = minThreads;
          _maxIdleTime = maxIdleTime;
          this._pendingJobs =
              ArrayList.Synchronized(new ArrayList());
          this._availableThreads =
              ArrayList.Synchronized(new ArrayList());
          _debug = false;
          InitAvailableThreads();
        }

When a GenThreadPoolImpl is instantiated with three integers, we specify the maximum and minimum number of threads and the idle time of the threads, in that order. The constructor also calls the InitAvailableThreads() method, detailed below:

        private void InitAvailableThreads()
        {
          if(this._minThreads > 0)
            for(int i = 0; i < this._minThreads; i++)
            {
              Thread t = new
                  Thread(new ThreadStart(new GenPool(this, this).Run));
              ThreadElement e = new ThreadElement(t);
              e.Idle = true;
              _availableThreads.Add(e);
            }
          Console.WriteLine("Initialized the ThreadPool. "
                            + " Number of Available threads: "
                            + this._availableThreads.Count);
        }

This creates the threads needed for the pool on instantiation. The default constructor sets the minimum number of threads to zero, so it did not need to call this method. The loop creates the minimum number of threads required by the pool, specified in _minThreads. Below is the constructor for four arguments:

        public GenThreadPoolImpl(int maxThreads, int minThreads,
                                 int maxIdleTime, bool debug_)
        {
          _maxThreads = maxThreads;
          _minThreads = minThreads;
          _maxIdleTime = maxIdleTime;
          this._pendingJobs =
              ArrayList.Synchronized(new ArrayList());
          this._availableThreads =
              ArrayList.Synchronized(new ArrayList());
          _debug = debug_;
          InitAvailableThreads();
        }

This constructor does the same as the above, only allowing us to set the debugging flag. We now go on to describe the business end of this class, the AddJob() method:

        public void AddJob(Thread job)
        {
          if(job == null) return;
          lock(this)
          {

The above method adds a job to the pool. If the job passed as a parameter is null, the method returns immediately. Otherwise, it takes a lock on the GenThreadPoolImpl instance to ensure that no other thread can add or remove a job at the same time:

            _pendingJobs.Add(job);
            int index = FindFirstIdleThread();
            if(_debug)
              Console.WriteLine("First Idle Thread is " + index);
            if(index == -1)
            {
              if((_maxThreads == -1)
                  || (_availableThreads.Count < _maxThreads))
              {
                if(_debug)
                  Console.WriteLine("Creating a new thread");
                Thread t = new Thread(new
                    ThreadStart(new GenPool(this, this).Run));

The job is added to an ArrayList, which stores all the jobs awaiting execution. The FindFirstIdleThread() method returns the index of a thread contained within _availableThreads that is currently idle and so available for use. If the method returns -1, then there are no idle threads and the pool needs to attempt to create a new one, which it does if _maxThreads is -1 (no limit) or the pool is not yet full. The Run() method of the GenPool class is fired inside this thread:

                ThreadElement e = new ThreadElement(t);
                e.Idle = false;
                e.GetMyThread().Start();
                try
                {
                  _availableThreads.Add(e);
                }
                catch(OutOfMemoryException)
                {
                  Console.WriteLine("Out of Memory");
                  _availableThreads.Add(e);
                  Console.WriteLine("Added Job again");
                }
                return;
              }
              if(_debug)
                Console.WriteLine("No Threads Available .."
                                  + this.GetStats().ToString());
            }

The ThreadElement class is another helper class that will be defined later. It adds some additional properties to a standard thread so that the pool can manage it effectively. The thread's Start() method is called before the element is added to the _availableThreads collection.

            else
            {
              try
              {
                if(_debug)
                  Console.WriteLine("Using an existing thread...");
                ((ThreadElement)_availableThreads[index]).Idle = false;

Above, we start to handle the case where a thread is idle and so free for allocation to a new job. Firstly, we cast the list element explicitly to a ThreadElement and clear its idle flag:

                lock(((ThreadElement)_availableThreads[index]).GetMyThread())
                {
                  Monitor.Pulse((
                      (ThreadElement)_availableThreads[index]).GetMyThread());
                }

Here we lock the Thread object, because Monitor.Pulse() may only be called by a thread that holds the lock on the object being pulsed. The pulse moves one thread that is waiting on that object, if there is one, to the ready queue so that it can pick up the new job; leaving the lock block then releases the lock:

              }
              catch(Exception ex)
              {
                Console.WriteLine("Error while reusing thread " + ex.Message);
                if(_debug)
                {
                  Console.WriteLine("Value of index is " + index);
                  Console.WriteLine("Size of available threads is " +
                                    this._availableThreads.Count);
                  Console.WriteLine("Available Threads is "
                                    + this._availableThreads.IsSynchronized);
                }
              }
            }//end of else
          }//lock
        }//end of method

Finally, we catch any exceptions and output the results to the command line, providing more useful debugging information if the this.Debug flag has been set. That completes the AddJob() method so now let's look at the implementation of the GetStats() method:

        public Stats GetStats()
        {
          Stats stats = new Stats();
          stats.maxThreads = _maxThreads;
          stats.minThreads = _minThreads;
          stats.maxIdleTime = _maxIdleTime;
          stats.pendingJobs = _pendingJobs.Count;
          stats.numThreads = _availableThreads.Count;
          stats.jobsInProgress =
              _availableThreads.Count - FindIdleThreadCount();
          return stats;
        }

The GetStats() method returns a Stats structure, which we will define later. As we will see, it contains the minimum and maximum number of threads, as well as other values set in the constructor. Now let's look at the FindIdleThreadCount() method:

        public int FindIdleThreadCount()
        {
          int idleThreads = 0;
          for (int i = 0; i < _availableThreads.Count; i++)
          {
            if(((ThreadElement)_availableThreads[i]).Idle)
              idleThreads++;
          }
          return idleThreads;
        }

This method was called earlier in the class; it simply goes through the array list of threads and returns how many of them are idle. We also used the FindFirstIdleThread() method, so let's see it:

        public int FindFirstIdleThread()
        {
          for (int i = 0; i < _availableThreads.Count; i++)
          {
            if(((ThreadElement)_availableThreads[i]).Idle)
              return i;
          }
          return -1;
        }

As we can see, the method returns the index of the first idle thread in the array list. We will also need the following method:

        public int FindThread()
        {
          for(int i = 0; i < _availableThreads.Count; i++)
          {
            if(((ThreadElement)_availableThreads[i])
                .GetMyThread()
                .Equals(Thread.CurrentThread))
              return i;
          }
          return -1;
        }

This method is used to determine in which index position in the array list the current thread is located. We'll also need the following method:

        public void RemoveThread()
        {
          for(int i = 0; i < _availableThreads.Count; i++)
          {
            if(((ThreadElement)_availableThreads[i])
                .GetMyThread()
                .Equals(Thread.CurrentThread))
            {
              _availableThreads.RemoveAt(i);
              return;
            }
          }
        }
      }//end of GenThreadPoolImpl

This removes the current thread from the array list of threads. This is, of course, used to remove a thread from the pool when it is finished with and has been idle for longer than the time specified in this.MaxIdleTime. Now we start to define the rest of the classes for this assembly:

      public class GenPool
      {
        private Object _lock;
        private GenThreadPoolImpl _gn;

        public GenPool(Object lock_, GenThreadPoolImpl gn)
        {
          this._lock = lock_;
          this._gn = gn;
        }

The GenPool class starts the pending jobs from a pool thread and, once there are none left and the thread has been idle for the period specified in MaxIdleTime, removes that thread from the pool. It checks for pending jobs on the GenThreadPoolImpl passed as the second constructor argument, and it synchronizes on the object passed as the first argument. In general, this will be the same GenThreadPoolImpl object:

        public void Run()
        {
          Thread job = null;
          try
          {
            while(true)
            {
              while(true)
              {
                lock(this._lock)
                {
                  if(_gn.PendingJobs.Count == 0)
                  {
                    int index = _gn.FindThread();
                    if(index == -1) return;
                    ((ThreadElement)_gn.AvailableThreads[index]).Idle =
                        true;
                    break;
                  }
                  job = (Thread)_gn.PendingJobs[0];
                  _gn.PendingJobs.RemoveAt(0);
                }//end of lock

This Run() method loops, taking pending jobs from the queue and starting them. You can see above that it locks the object passed in as a parameter to the constructor. If there are no pending jobs, it looks up the current thread in the pool: if the thread is not found (FindThread() returns -1), the method returns; otherwise it marks the thread as idle and leaves the inner loop. If there is a pending job, it retrieves the first one and removes it from the queue:

                //run the job
                job.Start();
              }

It then starts the pending job's thread and returns to the start of the inner loop to look for another job. When no jobs are left and the inner loop has been exited, execution continues here:

              try
              {
                lock(this)
                {
                  if(_gn.MaxIdleTime == -1)
                    Monitor.Wait(this);
                  else Monitor.Wait(this, _gn.MaxIdleTime);
                }
              }

In the next part of the loop (once it has no more pending jobs), it locks the current GenPool object and waits on it, either until that object is pulsed or until the period specified in MaxIdleTime has elapsed. A MaxIdleTime of -1 makes it wait indefinitely.
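The wait-with-timeout step, together with the pulse that ends it early, can be sketched in isolation as follows. This is a simplified illustration rather than the pool's own code: `IdleWaitSketch`, `WaitForJob`, `SignalJob`, and the `_jobPending` flag are hypothetical names. Note that `Monitor.Pulse` wakes only a thread that is waiting on the same object that is pulsed, so the sketch uses a single `_gate` object on both sides.

```csharp
using System;
using System.Threading;

public class IdleWaitSketch
{
    private readonly object _gate = new object();
    private bool _jobPending;   // guarded by _gate

    // Worker side: wait up to maxIdleTime ms for a job (-1 waits forever).
    // Returns true if a job arrived, false if the idle period ran out.
    public bool WaitForJob(int maxIdleTime)
    {
        lock (_gate)
        {
            if (!_jobPending)
            {
                // Wait releases _gate while blocked and reacquires it
                // before returning, whether pulsed or timed out.
                Monitor.Wait(_gate, maxIdleTime);
            }
            bool gotJob = _jobPending;
            _jobPending = false;
            return gotJob;
        }
    }

    // Producer side: record the job, then wake one thread waiting on _gate.
    public void SignalJob()
    {
        lock (_gate)
        {
            _jobPending = true;
            Monitor.Pulse(_gate);
        }
    }
}
```

The flag matters because a pulse is not remembered: if `SignalJob()` runs before the worker starts waiting, the worker still sees `_jobPending` set and does not block. A worker for which `WaitForJob()` returns false has been idle for the whole period and is a candidate for removal from the pool.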

              lock(_lock)
              {
                if(_gn.PendingJobs.Count == 0)
                {
                  if(_gn.MinThreads != -1 && _gn.AvailableThreads.Count >
                      _gn.MinThreads)
                  {
                    _gn.RemoveThread();
                    return;
                  }
                }
              }

Finally, it locks the object again, and if there are no pending jobs and there are more than the minimum required number of threads, then it removes the thread from the pool. We now move on to the ThreadElement class:

      public class ThreadElement
      {
        private bool _idle;
        private Thread _thread;

        public ThreadElement(Thread th)
        {
          this._thread = th;
          this._idle = true;
        }

A ThreadElement is what is stored in the thread pool, and takes a thread as the parameter for its constructor. It sets the thread as idle on construction of this object:

        public bool Idle
        {
          get { return this._idle; }
          set { this._idle = value; }
        }

        public Thread GetMyThread() { return this._thread; }
      }//end of ThreadElement

The above code is straightforward. The Idle property records whether the thread is currently free to take on a job, and the GetMyThread() method just returns the Thread object. Now look at the following structure:

      public struct Stats
      {
        public int maxThreads;
        public int minThreads;
        public int maxIdleTime;
        public int numThreads;
        public int pendingJobs;
        public int jobsInProgress;

Here we define the Stats structure that we mentioned earlier, which stores all of the statistics of the thread pool. The fields are self-describing. ToString() is the only method:

        public override String ToString()
        {
          StringBuilder sb = new StringBuilder("MaxThreads = ", 107);
          sb.Append(maxThreads);
          sb.Append("\nMinThreads = ");
          sb.Append(minThreads);
          sb.Append("\nMaxIdleTime = ");
          sb.Append(maxIdleTime);
          sb.Append("\nPending Jobs = ");
          sb.Append(pendingJobs);
          sb.Append("\nJobs In Progress = ");
          sb.Append(jobsInProgress);
          return sb.ToString();
        }
      }//end of Stats

This ToString() method returns the structure in a string format, using StringBuilder to build up the string. The 107 argument sets the StringBuilder's initial capacity to 107 characters, as it is fair to assume that there are not likely to be more than 99,999 threads. If the text does grow beyond that, StringBuilder will resize itself anyway. Specifying the capacity up front allows a small performance boost.
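The effect of the capacity argument can be checked directly through the `Capacity` property. The following is a minimal sketch that reuses the first two labels from ToString(); `StatsTextSketch` and `Build` are hypothetical names for illustration only.

```csharp
using System;
using System.Text;

public static class StatsTextSketch
{
    // Builds a short report with a StringBuilder whose initial
    // capacity is supplied up front, as in Stats.ToString().
    public static StringBuilder Build(int maxThreads, int minThreads)
    {
        StringBuilder sb = new StringBuilder("MaxThreads = ", 107);
        sb.Append(maxThreads);
        sb.Append("\nMinThreads = ");
        sb.Append(minThreads);
        return sb;
    }
}
```

As long as the appended text fits within the initial capacity, the builder has no need to allocate a larger buffer while the string is assembled.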

If you have an application that is firing methods repeatedly on different threads, this class can manage the process and help ensure that too many threads aren't spawned. Apart from containing a maximum and minimum number of threads, it will reuse an existing thread if possible. You can now compile this project into a DLL, and use this class from within other projects. Below is code that will allow you to test this thread pool class, TestGenThreadPool.cs:

    using System;
    using System.Threading;
    using GenThreadPool;

    namespace TestGenThreadPool
    {
      public class TestPerformance
      {
        public int count;
        private Object _lock = new Object();

        public TestPerformance(IThreadPool pool, int times)
        {
          // First pass: hand every job to the thread pool
          count = 0;
          DateTime start = System.DateTime.Now;
          Console.WriteLine("Start Time for Job is "
                            + System.DateTime.Now);
          for (int i = 0; i < times; i++)
          {
            Thread t1 = new Thread(
                new ThreadStart(new Job(this).Run));
            pool.AddJob(t1);
          }
          Console.WriteLine("End Time for Job is " +
                            System.DateTime.Now);
          Console.WriteLine("Performance using Pool[in ms]: ");
          Console.WriteLine(""
                            + (System.DateTime.Now - start).ToString());

          // Second pass: start one new thread per job, without the pool
          count = 0;
          start = System.DateTime.Now;
          Console.WriteLine("Start Time for JobThread is " +
                            System.DateTime.Now.ToString());
          for (int i = 0; i < times; i++)
          {
            Thread jt = new Thread(new ThreadStart(new Job(this).Run));
            jt.Start();
          }
          // Poll once a second until every job has reported in
          while (true)
          {
            lock (_lock)
            {
              if (count == times)
                  break;
            }
            try
            {
              Thread.Sleep(1000);
            }
            catch
            {
            }
          }
          Console.WriteLine("End Time for JobThread is "
                            + System.DateTime.Now.ToString());
          Console.WriteLine("Performance using no Pool[in ms]: ");
          Console.WriteLine(""
                            + (System.DateTime.Now - start).ToString());
        }

        sealed class JobThread
        {
          private Object _lock = new Object();
          private TestPerformance tpf;

          public JobThread(TestPerformance tpf_)
          {
            this.tpf = tpf_;
          }

          public void Run()
          {
            lock (_lock)
            {
              tpf.count++;
            }
          }
        }

        sealed class Job
        {
          private Object _lock = new Object();
          private TestPerformance tpf;

          public Job(TestPerformance tpf_)
          {
            this.tpf = tpf_;
          }

          public void Run()
          {
            tpf.count++;
          }
        }
      }

      class TestPool
      {
        static void Main(string[] args)
        {
          GenThreadPool.IThreadPool tp =
              new GenThreadPoolImpl(200, 300, 300, true);
          TestPerformance p = new TestPerformance(tp, 100);
        }
      }
    }

The above application just mechanically attempts to add new threads to an instance of the thread pool, with the debug flag set to true. It is quite straightforward, but the best way to see this thread pool in action is to try it out in your own applications. You can use this class, once it is compiled.



C# Threading Handbook
ISBN: 1861008295
Year: 2003
Pages: 74
