Working with Threads

Providing multiple threads in a Qt application is straightforward: We just subclass QThread and reimplement its run() function. To show how this works, we will start by reviewing the code for a very simple QThread subclass that repeatedly prints the same text on a console.

class Thread : public QThread
{
public:
 Thread();

 void setMessage(const QString &message);
 void run();
 void stop();

private:
 QString messageStr;
 volatile bool stopped;
};

The Thread class inherits from QThread and reimplements the run() function. It provides two additional functions: setMessage() and stop().

The stopped variable is declared volatile because it is accessed from different threads and we want to be sure that it is freshly read every time it is needed. If we omitted the volatile keyword, the compiler might optimize access to the variable, possibly leading to incorrect results.

Thread::Thread()
{
 stopped = false;
}

We set stopped to false in the constructor.

void Thread::run()
{
    while (!stopped)
        cerr << messageStr.ascii();
    stopped = false;
    cerr << endl;
}

The run() function is called to start executing the thread. As long as the stopped variable is false, the function keeps printing the given message to the console. The thread terminates when control leaves the run() function.

void Thread::stop()
{
 stopped = true;
}

The stop() function sets the stopped variable to true, thereby telling run() to stop printing text to the console. This function can be called from any thread at any time. For the purposes of this example, we assume that assignment to a bool is an atomic operation. This is a reasonable assumption on common platforms, considering that a bool can only have two states. We will see later in this section how to use QMutex to guarantee that assigning to a variable is an atomic operation.

QThread provides a terminate() function that terminates the execution of a thread while it is still running. Using terminate() is not recommended, since it can stop the thread at any point and does not give the thread any chance to clean up after itself. It is always safer to use a stopped variable and a stop() function, as we did here.
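The stop-flag pattern can also be sketched with the C++ standard library instead of Qt. This is only an illustration under stated assumptions: the Worker class and its member names are hypothetical, std::atomic<bool> replaces the volatile bool so that no assumption about atomic assignment is needed, and a counter stands in for printing the message.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical standard-library counterpart of the Thread class (not Qt API).
class Worker
{
public:
    Worker() : stopped(false), iterations(0) {}
    ~Worker() { stop(); }

    void start()
    {
        assert(!handle.joinable());  // starting twice is a programming error
        stopped = false;
        handle = std::thread([this] { run(); });
    }

    // Call from any thread other than the worker itself.
    // Sets the flag, then waits until run() has returned.
    void stop()
    {
        stopped = true;
        if (handle.joinable())
            handle.join();
    }

    long count() const { return iterations; }

private:
    void run()
    {
        while (!stopped)
            ++iterations;  // stands in for printing the message
    }

    std::atomic<bool> stopped;
    std::atomic<long> iterations;
    std::thread handle;
};
```

Because stop() joins the thread, the counter can no longer change once stop() has returned, which is the clean-shutdown property that terminate() cannot offer.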

Figure 17.1. The Threads application

We will now see how to use the Thread class in a small Qt application that uses two threads, A and B, in addition to the initial thread.

class ThreadForm : public QDialog
{
 Q_OBJECT
public:
 ThreadForm(QWidget *parent = 0, const char *name = 0);

protected:
 void closeEvent(QCloseEvent *event);

private slots:
 void startOrStopThreadA();
 void startOrStopThreadB();

private:
 Thread threadA;
 Thread threadB;
 QPushButton *threadAButton;
 QPushButton *threadBButton;
 QPushButton *quitButton;
};

The ThreadForm class declares two variables of type Thread and some buttons to provide a basic user interface.

ThreadForm::ThreadForm(QWidget *parent, const char *name)
    : QDialog(parent, name)
{
    setCaption(tr("Threads"));

    threadA.setMessage("A");
    threadB.setMessage("B");

    threadAButton = new QPushButton(tr("Start A"), this);
    threadBButton = new QPushButton(tr("Start B"), this);
    quitButton = new QPushButton(tr("Quit"), this);
    quitButton->setDefault(true);

    connect(threadAButton, SIGNAL(clicked()),
            this, SLOT(startOrStopThreadA()));
    connect(threadBButton, SIGNAL(clicked()),
            this, SLOT(startOrStopThreadB()));
    connect(quitButton, SIGNAL(clicked()),
            this, SLOT(close()));
    ...
}

In the constructor, we call setMessage() to make the first thread repeatedly print "A" and the second thread "B".

void ThreadForm::startOrStopThreadA()
{
    if (threadA.running()) {
        threadA.stop();
        threadAButton->setText(tr("Start A"));
    } else {
        threadA.start();
        threadAButton->setText(tr("Stop A"));
    }
}

When the user clicks the button for thread A, startOrStopThreadA() stops the thread if it was running and starts it otherwise. It also updates the button's text.

void ThreadForm::startOrStopThreadB()
{
    if (threadB.running()) {
        threadB.stop();
        threadBButton->setText(tr("Start B"));
    } else {
        threadB.start();
        threadBButton->setText(tr("Stop B"));
    }
}

The code for startOrStopThreadB() is very similar.

void ThreadForm::closeEvent(QCloseEvent *event)
{
 threadA.stop();
 threadB.stop();
 threadA.wait();
 threadB.wait();
 event->accept();
}

If the user clicks Quit or closes the window, we stop any running threads and wait for them to finish (using QThread::wait()) before we call QCloseEvent::accept(). This ensures that the application exits in a clean state, although it doesn't really matter in this example.

To compile the application, we must add this line to the .pro file:

CONFIG += thread

This tells qmake to use the threaded version of the Qt library. To build a threaded Qt library, pass the -thread command-line option to the configure script on Unix and Mac OS X. On Windows, the Qt library is threaded by default. For this particular example, we also need the console option since we want the program's output to appear in the console on Windows:

win32:CONFIG += console

If you run the application and click Start A, the console will be filled with 'A's. If you click Start B, it will now fill with alternating sequences of 'A's and 'B's. Click Stop A, and now it will only print 'B's.

A common requirement for multithreaded applications is that of synchronizing several threads. Qt provides the following classes to do this: QMutex, QMutexLocker, QSemaphore, and QWaitCondition.

The QMutex class provides a means of protecting a variable or a piece of code so that only one thread can access it at a time. The class provides a lock() function that locks the mutex. If the mutex is unlocked, the current thread seizes it immediately and locks it; otherwise, the current thread is blocked until the thread that holds the mutex unlocks it. Either way, when the call to lock() returns, the current thread holds the mutex until it calls unlock(). QMutex also provides a tryLock() function that returns immediately if the mutex is already locked.
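The difference between a blocking lock and a non-blocking attempt can be sketched with std::mutex from the C++ standard library, whose lock(), try_lock(), and unlock() play roughly the roles described above. The names resourceMutex, tryOnce(), tryFromOtherThread(), and demonstrate() are hypothetical and not part of Qt.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

std::mutex resourceMutex;

// Returns true if the calling thread could take the mutex without blocking.
// On success the mutex is released again before returning.
bool tryOnce()
{
    if (!resourceMutex.try_lock())
        return false;
    resourceMutex.unlock();
    return true;
}

// Runs tryOnce() in a second thread and reports its result.
bool tryFromOtherThread()
{
    bool result = false;
    std::thread other([&result] { result = tryOnce(); });
    other.join();
    return result;
}

void demonstrate()
{
    resourceMutex.lock();
    // We hold the mutex, so the other thread is refused instead of blocking.
    assert(!tryFromOtherThread());
    resourceMutex.unlock();
}
```

Had the second thread called lock() instead of try_lock(), it would have blocked until the first thread released the mutex.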

For example, let's suppose that we wanted to protect the stopped variable of the Thread class with a QMutex. We would then add the following data member to Thread:

QMutex mutex;

The run() function would change to this:

void Thread::run()
{
    for (;;) {
        mutex.lock();
        if (stopped) {
            stopped = false;
            mutex.unlock();
            break;
        }
        mutex.unlock();

        cerr << messageStr.ascii();
    }
    cerr << endl;
}

The stop() function would become this:

void Thread::stop()
{
 mutex.lock();
 stopped = true;
 mutex.unlock();
}

Locking and unlocking a mutex in complex functions, especially functions that use C++ exceptions, can be error-prone. Qt provides the QMutexLocker convenience class to simplify mutex handling. QMutexLocker's constructor accepts a QMutex as argument and locks it. QMutexLocker's destructor unlocks the mutex. For example, we could rewrite the stop() function above as follows:

void Thread::stop()
{
 QMutexLocker locker(&mutex);
 stopped = true;
}
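The benefit of a scoped locker is clearest when a function can exit through an exception. The sketch below uses std::lock_guard from the C++ standard library, which follows the same lock-in-constructor, unlock-in-destructor idea as QMutexLocker. The counter, the function names, and the rule that negative values are rejected are assumptions made up for the illustration.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

std::mutex counterMutex;
int counter = 0;

// Adds delta to the counter; negative values are rejected with an exception.
void addToCounter(int delta)
{
    std::lock_guard<std::mutex> locker(counterMutex);  // locks here
    if (delta < 0)
        throw std::invalid_argument("delta must not be negative");
    counter += delta;
}   // locker's destructor unlocks on both the normal and the exceptional path

// Triggers the exceptional path, then uses the mutex again.
int addAfterFailure()
{
    bool threw = false;
    try {
        addToCounter(-1);
    } catch (const std::invalid_argument &) {
        threw = true;
    }
    assert(threw);
    addToCounter(2);  // only safe because the failed call released the mutex
    return counter;
}
```

With explicit lock() and unlock() calls, the throw statement would have needed its own unlock() in front of it, which is exactly the kind of detail that is easy to forget.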

QSemaphore provides semaphores in Qt. A semaphore is a generalization of mutexes that can be used to guard a certain number of identical resources.

The following two code snippets show the correspondence between QSemaphore and QMutex:

QSemaphore semaphore(1);      QMutex mutex;

semaphore++;                  mutex.lock();

semaphore--;                  mutex.unlock();

The postfix ++ and -- operators acquire and release one resource protected by the semaphore. By passing 1 to the constructor, we tell the semaphore that it controls a single resource. The advantage of using a semaphore is that we can pass numbers other than 1 to the constructor and then call ++ multiple times to acquire many resources.
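To make the idea of a counted resource concrete, here is a minimal counting semaphore built from a mutex and a condition variable in standard C++. It is a sketch, not Qt's QSemaphore: the class name Semaphore and the functions acquire(), tryAcquire(), and release() are hypothetical, and the constructor argument is taken to be the number of resources that are initially available.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical minimal counting semaphore (not Qt's QSemaphore).
class Semaphore
{
public:
    explicit Semaphore(int total) : available(total)
    {
        assert(total >= 0);
    }

    // Blocks until n resources are free, then takes them.
    void acquire(int n = 1)
    {
        std::unique_lock<std::mutex> lock(guard);
        changed.wait(lock, [this, n] { return available >= n; });
        available -= n;
    }

    // Takes n resources only if that is possible without blocking.
    bool tryAcquire(int n = 1)
    {
        std::lock_guard<std::mutex> lock(guard);
        if (available < n)
            return false;
        available -= n;
        return true;
    }

    // Gives n resources back and wakes any waiting threads.
    void release(int n = 1)
    {
        {
            std::lock_guard<std::mutex> lock(guard);
            available += n;
        }
        changed.notify_all();
    }

private:
    std::mutex guard;
    std::condition_variable changed;
    int available;
};
```

Constructed with 1, such a semaphore behaves like a mutex: acquire() corresponds to lock() and release() to unlock(). Constructed with a larger number, it lets that many acquisitions succeed before the next one blocks.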

A typical application of semaphores is when transferring a certain amount of data (DataSize) between two threads using a shared circular buffer of a certain size (BufferSize):

const int DataSize = 100000;
const int BufferSize = 4096;
char buffer[BufferSize];

The producer thread writes data to the buffer until it reaches the end, and then restarts from the beginning, overwriting existing data. The consumer thread reads the data as it is generated. Figure 17.2 illustrates this, assuming a tiny 16-byte buffer.

Figure 17.2. The producer-consumer model

The need for synchronization in the producer-consumer example is twofold: If the producer generates the data too fast, it will overwrite data that the consumer hasn't yet read; if the consumer reads the data too fast, it will pass the producer and read garbage.

A crude way to solve this problem is to have the producer fill the buffer, then wait until the consumer has read the entire buffer, and so on. However, on multiprocessor machines, this isn't as fast as letting the producer and consumer threads operate on different parts of the buffer at the same time.

One way to efficiently solve the problem is to use two semaphores:

QSemaphore freeSpace(BufferSize);
QSemaphore usedSpace(BufferSize);

The freeSpace semaphore governs the part of the buffer that the producer can fill with data. The usedSpace semaphore governs the area that the consumer can read. These two areas are complementary. Both are initialized with BufferSize (4096), meaning that they can administer up to that many resources.

For this example, each byte counts as one resource. In a real-world application, we would probably operate on larger units (for example, 64 or 256 bytes at a time) to reduce the overhead associated with using semaphores.

void acquire(QSemaphore &semaphore)
{
 semaphore++;
}

The acquire() function attempts to acquire one resource (one byte in the buffer). QSemaphore uses the postfix ++ operator for this, but in our particular example it is more intuitive to use a function called acquire().

void release(QSemaphore &semaphore)
{
 semaphore--;
}

Similarly, we implement the release() function as a synonym for the postfix -- operator.

void Producer::run()
{
    for (int i = 0; i < DataSize; ++i) {
        acquire(freeSpace);
        buffer[i % BufferSize] = "ACGT"[(uint)rand() % 4];
        release(usedSpace);
    }
}

In the producer, we start by acquiring one "free" byte. If the buffer is full of data that the consumer hasn't read yet, the call to acquire() will block until the consumer has started to consume the data. Once we have acquired the byte, we fill it with some random data ('A', 'C', 'G', or 'T') and release the byte as "used", so that it can be read by the consumer thread.

void Consumer::run()
{
    for (int i = 0; i < DataSize; ++i) {
        acquire(usedSpace);
        cerr << buffer[i % BufferSize];
        release(freeSpace);
    }
    cerr << endl;
}

In the consumer, we start by acquiring one "used" byte. If the buffer contains no data to read, the call to acquire() will block until the producer has produced some. Once we have acquired the byte, we print it and release the byte as "free", making it possible for the producer to fill it with data again.

int main()
{
 usedSpace += BufferSize;

 Producer producer;
 Consumer consumer;
 producer.start();
 consumer.start();
 producer.wait();
 consumer.wait();
 return 0;
}

Finally, in main(), we start by acquiring all the "used" space (using QSemaphore's counterintuitive += operator) to ensure that the consumer will not acquire it and read garbage. Then we start the producer and consumer threads. What happens then is that the producer converts some "free" space into "used" space, and the consumer can then convert it back to "free" space.

When we run the program, it writes a random sequence of 100,000 'A's, 'C's, 'G's, and 'T's to the console and then terminates. To really understand what is going on, we can disable writing the output and instead write 'P' each time the producer generates a byte and 'c' each time the consumer reads a byte. And to make things as simple to follow as possible, we can use much smaller values for DataSize and BufferSize.

For example, here's a possible run with a DataSize of 10 and a BufferSize of 4: "PcPcPcPcPcPcPcPcPcPc". In this case, the consumer reads the bytes as soon as they are generated by the producer; the two threads are executing at the same speed. Another possibility is that the producer fills the whole buffer before the consumer even starts reading it: "PPPPccccPPPPccccPPcc". There are many other possibilities. Semaphores give a lot of latitude to the system-specific thread scheduler, which can study the threads' behavior and choose an optimal scheduling policy.
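The 'P' and 'c' experiment can be sketched in standard C++ as follows. Everything here is an assumption made for illustration: the Semaphore class is a hypothetical stand-in whose count is the number of currently available resources (so usedSpace simply starts at 0 instead of being acquired in main()), and traceRun() and traceIsValid() are made-up helper names. Whatever interleaving the scheduler picks, the number of produced but unread bytes must stay between 0 and the buffer size, and that is what traceIsValid() checks.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical minimal semaphore; count is the number of available resources.
class Semaphore
{
public:
    explicit Semaphore(int initial) : count(initial) {}
    void acquire()
    {
        std::unique_lock<std::mutex> lock(guard);
        changed.wait(lock, [this] { return count > 0; });
        --count;
    }
    void release()
    {
        {
            std::lock_guard<std::mutex> lock(guard);
            ++count;
        }
        changed.notify_one();
    }
private:
    std::mutex guard;
    std::condition_variable changed;
    int count;
};

// Runs one producer and one consumer and returns the order of events:
// 'P' for each byte produced, 'c' for each byte consumed.
std::string traceRun(int dataSize, int bufferSize)
{
    assert(dataSize >= 0 && bufferSize > 0);
    std::vector<char> buffer(bufferSize);
    Semaphore freeSpace(bufferSize);  // every slot starts out free
    Semaphore usedSpace(0);           // nothing to read yet
    std::mutex traceMutex;
    std::string trace;

    auto record = [&](char event) {
        std::lock_guard<std::mutex> lock(traceMutex);
        trace += event;
    };

    std::thread producer([&] {
        for (int i = 0; i < dataSize; ++i) {
            freeSpace.acquire();
            buffer[i % bufferSize] = "ACGT"[i % 4];
            record('P');
            usedSpace.release();
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < dataSize; ++i) {
            usedSpace.acquire();
            char byte = buffer[i % bufferSize];
            (void)byte;  // the byte is read but not printed
            record('c');
            freeSpace.release();
        }
    });
    producer.join();
    consumer.join();
    return trace;
}

// True if every prefix satisfies 0 <= #P - #c <= bufferSize
// and the buffer is empty at the end.
bool traceIsValid(const std::string &trace, int bufferSize)
{
    int inBuffer = 0;
    for (char event : trace) {
        inBuffer += (event == 'P') ? 1 : -1;
        if (inBuffer < 0 || inBuffer > bufferSize)
            return false;
    }
    return inBuffer == 0;
}
```

Calling traceRun(10, 4) yields a 20-character string whose exact content varies from run to run. Both sample runs quoted above pass traceIsValid() with a buffer size of 4, whereas a trace beginning with five 'P's would not.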

A different approach to the problem of synchronizing a producer and a consumer is to use QWaitCondition and QMutex. A QWaitCondition allows a thread to wake up other threads when some condition has been met. This allows for more precise control than is possible with mutexes alone. To show how it works, we will redo the producer-consumer example using wait conditions.

const int DataSize = 100000;
const int BufferSize = 4096;
char buffer[BufferSize];

QWaitCondition bufferIsNotFull;
QWaitCondition bufferIsNotEmpty;
QMutex mutex;
int usedSpace = 0;

In addition to the buffer, we declare two QWaitConditions, one QMutex, and one variable that stores how many bytes in the buffer are "used" bytes.

void Producer::run()
{
    for (int i = 0; i < DataSize; ++i) {
        mutex.lock();
        while (usedSpace == BufferSize)
            bufferIsNotFull.wait(&mutex);
        buffer[i % BufferSize] = "ACGT"[(uint)rand() % 4];
        ++usedSpace;
        bufferIsNotEmpty.wakeAll();
        mutex.unlock();
    }
}

In the producer, we start by checking whether the buffer is full. If it is, we wait on the "buffer is not full" condition. When that condition is met, we write one byte to the buffer, increment usedSpace, and wake any thread waiting for the "buffer is not empty" condition to turn true.

We use a mutex to protect all accesses to the usedSpace variable. The QWaitCondition::wait() function can take a locked mutex as its first argument, which it unlocks before blocking the current thread and then locks before returning.

For this example, we could have replaced the while loop

while (usedSpace == BufferSize)
    bufferIsNotFull.wait(&mutex);

with this if statement:

if (usedSpace == BufferSize) {
    mutex.unlock();
    bufferIsNotFull.wait();
    mutex.lock();
}

However, this would break as soon as we allow more than one producer thread, since another producer could seize the mutex immediately after the wait() call and make the "buffer is not full" condition false again.

void Consumer::run()
{
    for (int i = 0; i < DataSize; ++i) {
        mutex.lock();
        while (usedSpace == 0)
            bufferIsNotEmpty.wait(&mutex);
        cerr << buffer[i % BufferSize];
        --usedSpace;
        bufferIsNotFull.wakeAll();
        mutex.unlock();
    }
    cerr << endl;
}

The consumer does the opposite of the producer: It waits for the "buffer is not empty" condition and wakes up any thread waiting for the "buffer is not full" condition.
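For readers without a Qt 3 installation, the same wait-condition structure can be sketched with std::condition_variable and std::mutex from the C++ standard library. The transfer() function is a hypothetical wrapper that takes its data from a string rather than from rand(), so that the result can be checked. The predicate passed to wait() plays the role of the while loop: it is tested again after every wakeup.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Copies input through a circular buffer of bufferSize bytes using one
// producer and one consumer thread; returns what the consumer read.
std::string transfer(const std::string &input, int bufferSize)
{
    assert(bufferSize > 0);
    std::vector<char> buffer(bufferSize);
    std::mutex guard;
    std::condition_variable bufferIsNotFull;
    std::condition_variable bufferIsNotEmpty;
    int usedSpace = 0;
    std::string output;
    const int dataSize = static_cast<int>(input.size());

    std::thread producer([&] {
        for (int i = 0; i < dataSize; ++i) {
            std::unique_lock<std::mutex> lock(guard);
            // Releases the mutex while waiting and reacquires it before
            // returning; the predicate is rechecked after each wakeup.
            bufferIsNotFull.wait(lock, [&] { return usedSpace < bufferSize; });
            buffer[i % bufferSize] = input[i];
            ++usedSpace;
            bufferIsNotEmpty.notify_all();
        }
    });
    std::thread consumer([&] {
        for (int i = 0; i < dataSize; ++i) {
            std::unique_lock<std::mutex> lock(guard);
            bufferIsNotEmpty.wait(lock, [&] { return usedSpace > 0; });
            output += buffer[i % bufferSize];
            --usedSpace;
            bufferIsNotFull.notify_all();
        }
    });
    producer.join();
    consumer.join();
    return output;
}
```

If the synchronization is correct, the returned string equals the input regardless of how small the buffer is or how the two threads are scheduled.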

In all the examples so far, our threads have accessed the same global variables. But some threaded applications need to have a global variable hold different values in different threads. This is often called thread-local storage (TLS) or thread-specific data (TSD). We can fake it using a map keyed on thread IDs (returned by QThread::currentThread()), but a nicer approach is to use the QThreadStorage class.

A common use of QThreadStorage is for caches. By having a separate cache in different threads, we avoid the overhead of locking, unlocking, and possibly waiting for a mutex. For example:

QThreadStorage<QMap<int, double> *> cache;

void insertIntoCache(int id, double value)
{
    if (!cache.hasLocalData())
        cache.setLocalData(new QMap<int, double>);
    cache.localData()->insert(id, value);
}

void removeFromCache(int id)
{
    if (cache.hasLocalData())
        cache.localData()->remove(id);
}

The cache variable holds one pointer to a QMap<int, double> per thread. (Because of problems with some compilers, the template type in QThreadStorage must be a pointer type.) The first time we use the cache in a particular thread, hasLocalData() returns false and we create the QMap object.

In addition to caching, QThreadStorage can be used for global error-state variables (similar to errno), to ensure that modifications in one thread don't affect other threads.
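The per-thread cache idea can also be sketched with the thread_local keyword of standard C++ (C++11 and later), which gives each thread its own instance of a variable. This is an analogy rather than a use of QThreadStorage; cacheSize() and sizeInNewThread() are hypothetical helpers added only to make the isolation visible.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <thread>

// One independent map per thread, created on that thread's first use.
thread_local std::map<int, double> cache;

void insertIntoCache(int id, double value) { cache[id] = value; }
void removeFromCache(int id) { cache.erase(id); }
std::size_t cacheSize() { return cache.size(); }

// Inserts one entry from a fresh thread and returns the cache size
// that this thread sees afterward.
std::size_t sizeInNewThread()
{
    std::size_t size = 0;
    std::thread worker([&size] {
        assert(cacheSize() == 0);  // a new thread starts with an empty cache
        insertIntoCache(1, 3.14);
        size = cacheSize();
    });
    worker.join();
    return size;
}
```

Entries inserted by the calling thread are invisible to the new thread, and the entry inserted there disappears with it. No mutex is involved because no map is ever shared.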





C++ GUI Programming with Qt 3
ISBN: 0131240722
Year: 2006
Pages: 140