Reading Messages from a Queue


Messages can be read from a queue on the same Windows CE device or on another computer. You need to have a valid network connection to the other computer to read from a remote queue. To read one or more messages from a queue, you must do the following:

  • Open the queue using MQOpenQueue

  • Initialize a MQMSGPROPS structure in which the message will be received

  • Call MQReceiveMessage (Table 15.3) to read a message, if one is present

  • Close the queue when finished reading messages by calling MQCloseQueue

The minimum properties needed to pass to MQReceiveMessage are the following:

  • PROPID_M_BODY_SIZE: Property receives the number of bytes in the message body

  • PROPID_M_BODY: Property receives the message body data

The PROPID_M_BODY_SIZE property is initialized as shown in the following code fragment. After a successful call to MQReceiveMessage, the aMsgPropVar[0].ulVal member will contain the number of bytes in the message body.

 aMsgPropId[0] = PROPID_M_BODY_SIZE;
 aMsgPropVar[0].vt = VT_UI4;

You will need to allocate a buffer in which the message body will be received, and initialize the PROPID_M_BODY property with this pointer. In the following code, a 1-KB buffer is allocated, and the pointer is assigned to the pElems member. The size of the buffer is assigned to the cElems member.

Table 15.3. MQReceiveMessage: Reads a message from the queue

MQReceiveMessage
  QUEUEHANDLE hSource                    Handle to an open queue.
  DWORD dwTimeout                        Timeout (in milliseconds) to wait for a message, INFINITE to wait forever, or 0 to return immediately if no message is present.
  DWORD dwAction                         How to access the queue:
      MQ_ACTION_RECEIVE                  Read the next message and remove it from the queue.
      MQ_ACTION_PEEK_CURRENT             Read the current message, but do not remove it.
      MQ_ACTION_PEEK_NEXT                Use a cursor to read the next message, but do not remove it.
  MQMSGPROPS *pMessageProps              Structure in which the message will be received.
  LPOVERLAPPED lpOverlapped              Pointer to an OVERLAPPED structure for asynchronous message reading. Use NULL for synchronous access.
  PMQRECEIVECALLBACK fnReceiveCallback   Pointer to a callback function for asynchronous message reads. Use NULL for synchronous access.
  HANDLE hCursor                         Handle to a cursor for reading messages, or NULL for no cursor.
  ITransaction *pTransaction             Not supported, pass as NULL.
  HRESULT Return Value                   MQ_OK for success, or an error code on failure.

 DWORD dwBodyBufferSize = 1024;
 LPTSTR lpszBodyBuffer = new TCHAR[dwBodyBufferSize];
 aMsgPropId[1] = PROPID_M_BODY;
 aMsgPropVar[1].vt = VT_VECTOR|VT_UI1;
 aMsgPropVar[1].caub.pElems = (UCHAR*)lpszBodyBuffer;
 aMsgPropVar[1].caub.cElems = dwBodyBufferSize;

The code in Listing 15.3 opens the queue on the Windows CE device created in Listing 15.2 and reads a message from the queue. The timeout of 0 means that MQReceiveMessage will return immediately with a message if one is present, or return an MQ_ERROR_IO_TIMEOUT error if none is present. Since MQReceiveMessage is called on the primary thread, it is important that the call does not block for any length of time. The code checks the number of bytes in the message body and, if the body is not empty, displays its contents. Since the receive action is MQ_ACTION_RECEIVE, the message will be removed from the queue once it has been read.

Listing 15.3 Reading a message from a queue
 // Display a readable description of an error returned by MQReceiveMessage
 void DisplayReadError(HRESULT hr)
 {
   if(hr == MQ_ERROR_ACCESS_DENIED)
     cout << _T("Don't have access rights") << endl;
   else if(hr == MQ_ERROR_BUFFER_OVERFLOW)
     cout << _T("Buffer Overflow") << endl;
   else if(hr == MQ_ERROR_SENDERID_BUFFER_TOO_SMALL)
     cout << _T("Sender ID Buffer too small") << endl;
   else if(hr == MQ_ERROR_SYMM_KEY_BUFFER_TOO_SMALL)
     cout << _T("Symmetric key buffer too small") << endl;
   else if(hr == MQ_ERROR_SENDER_CERT_BUFFER_TOO_SMALL)
     cout << _T("Cert buffer too small") << endl;
   else if(hr == MQ_ERROR_SIGNATURE_BUFFER_TOO_SMALL)
     cout << _T("Signature buffer too small") << endl;
   else if(hr == MQ_ERROR_PROV_NAME_BUFFER_TOO_SMALL)
     cout << _T("Provider name too small") << endl;
   else if(hr == MQ_ERROR_LABEL_BUFFER_TOO_SMALL)
     cout << _T("Label buffer too small") << endl;
   else if(hr == MQ_ERROR_FORMATNAME_BUFFER_TOO_SMALL)
     cout << _T("Format name buffer too small") << endl;
   else if(hr == MQ_ERROR_DTC_CONNECT)
     cout << _T("Cannot connect to DTC") << endl;
   else if(hr == MQ_ERROR_INSUFFICIENT_PROPERTIES)
     cout << _T("Insufficient properties") << endl;
   else if(hr == MQ_ERROR_INVALID_HANDLE)
     cout << _T("Invalid queue handle") << endl;
   else if(hr == MQ_ERROR_IO_TIMEOUT)
     cout << _T("Timeout") << endl;
   else if(hr == MQ_ERROR_MESSAGE_ALREADY_RECEIVED)
     cout << _T("Message has been removed from queue") << endl;
   else if(hr == MQ_ERROR_OPERATION_CANCELLED)
     cout << _T("Operation cancelled") << endl;
   else if(hr == MQ_ERROR_PROPERTY)
     cout << _T("Property error") << endl;
   else if(hr == MQ_ERROR_QUEUE_DELETED)
     cout << _T("Queue deleted") << endl;
   else if(hr == MQ_ERROR_ILLEGAL_CURSOR_ACTION)
     cout << _T("Illegal cursor action") << endl;
   else if(hr == MQ_ERROR_SERVICE_NOT_AVAILABLE)
     cout << _T("Service not available") << endl;
   else if(hr == MQ_ERROR_STALE_HANDLE)
     cout << _T("Stale handle") << endl;
   else if(hr == MQ_ERROR_TRANSACTION_USAGE)
     cout << _T("Transaction Error") << endl;
   else if(hr == MQ_INFORMATION_PROPERTY)
     cout << _T("Property returned information") << endl;
   else
     cout << _T("Unknown error") << endl;
 }

 void Listing15_3()
 {
   HRESULT hr;
   QUEUEHANDLE hq;
   TCHAR wszFormatName[256];
   DWORD dwFormatNameLength = 256;

   // Convert the queue path name to a format name and open the queue
   hr = MQPathNameToFormatName
       (_T(".\\Private$\\WinCEInQueue"),
       wszFormatName,
       &dwFormatNameLength);
   cout << wszFormatName << endl;
   hr = MQOpenQueue(wszFormatName,
       MQ_RECEIVE_ACCESS,
       MQ_DENY_NONE,
       &hq);
   if(hr == MQ_OK)
     cout << _T("Opened queue") << endl;
   else
   {
     DisplayOpenError(hr);
     return;
   }

   DWORD dwRecAction = MQ_ACTION_RECEIVE;
   MQMSGPROPS msgprops;
   MSGPROPID aMsgPropId[2];
   MQPROPVARIANT aMsgPropVar[2];
   HRESULT aMsgStatus[2];
   // Message body buffer
   DWORD dwBodyBufferSize = 1024;
   LPTSTR lpszBodyBuffer = new TCHAR[dwBodyBufferSize];

   // Property 0 receives the body size, property 1 the body data
   DWORD cPropId = 0;
   aMsgPropId[cPropId] = PROPID_M_BODY_SIZE;
   aMsgPropVar[cPropId].vt = VT_UI4;
   cPropId++;
   aMsgPropId[cPropId] = PROPID_M_BODY;
   aMsgPropVar[cPropId].vt = VT_VECTOR|VT_UI1;
   aMsgPropVar[cPropId].caub.pElems =
       (UCHAR*)lpszBodyBuffer;
   aMsgPropVar[cPropId].caub.cElems = dwBodyBufferSize;
   cPropId++;

   msgprops.cProp = cPropId;
   msgprops.aPropID = aMsgPropId;
   msgprops.aPropVar = aMsgPropVar;
   msgprops.aStatus = aMsgStatus;

   hr = MQReceiveMessage(hq, // Queue handle
       0,           // Max time (msec)
       dwRecAction, // Receive action
       &msgprops,   // Msg property structure
       NULL, NULL, NULL, NULL);
   if (FAILED(hr))
   {
     DisplayReadError(hr);
     MQCloseQueue(hq);
     delete[] lpszBodyBuffer; // Array allocated with new[]
     return;
   }
   if (aMsgPropVar[0].ulVal == 0)
     cout << _T("No message body exists.") << endl;
   else
     cout << _T("The message body is ") << lpszBodyBuffer << endl;
   delete[] lpszBodyBuffer;
   MQCloseQueue(hq);
 }

Messages can be written to the queue on the Windows CE device from a Windows 2000 application regardless of whether the Windows CE device is connected or not. If the device is not connected, the messages will be written to a temporary queue (Figure 15.3) and then sent once a connection is made.

Figure 15.3. Messages waiting to be sent to a Windows CE queue

The Visual Basic code below shows how a queue can be opened by supplying a FormatName that includes the name of the Windows CE device ("ncg_ppc") on which the queue resides. A message can be created and the Label and Body properties initialized; the message is then sent to the open queue. Finally, the queue can be closed. The message sent in this way can be read by the code in Listing 15.3.

 Dim qSendi As MSMQQueueInfo
 Dim qSend As MSMQQueue
 Dim msg As New MSMQMessage
 Set qSendi = New MSMQQueueInfo
 qSendi.FormatName = _
   "DIRECT=OS:ncg_ppc\Private$\WinCEInQueue"
 Set qSend = qSendi.Open(MQ_SEND_ACCESS, MQ_DENY_NONE)
 msg.Label = "Message from Win2000"
 msg.Body = "Body contents of message"
 msg.Send qSend
 qSend.Close

Reading Other Message Properties

In Listing 15.3 only two message properties were read: the message body size and the message body. Other message properties can be retrieved by adding to the MQMSGPROPS structure. In this section, the properties PROPID_M_SENTTIME and PROPID_M_ARRIVEDTIME will be demonstrated. These can be used to retrieve the time the message was sent and the time the message arrived in the queue. The times are returned using a 32-bit time_t data value. The property arrays must be declared with at least four elements to hold the two extra entries, and the cProp member must be set to match. The following code initializes message properties to return these times:

 aMsgPropId[2] = PROPID_M_SENTTIME;
 aMsgPropVar[2].vt = VT_UI4;
 aMsgPropId[3] = PROPID_M_ARRIVEDTIME;
 aMsgPropVar[3].vt = VT_UI4;

The times can be retrieved after a successful call to MQReceiveMessage using the property's ulVal member, for example:

 cout << aMsgPropVar[2].ulVal << endl;

Both PROPID_M_SENTTIME and PROPID_M_ARRIVEDTIME return time using the time_t UNIX time data type. This is the number of seconds since midnight, January 1, 1970 (coordinated universal time). Most Windows CE time functions use the 64-bit integer FILETIME data type (which is the number of 100-nanosecond intervals since January 1, 1601) or the SYSTEMTIME structure. You can easily work out how long the message took to arrive: the difference between the two values is the number of seconds elapsed between the sending of the message and its arrival. However, working out the time and date represented by a time_t value requires converting it to a more useful time format. The following function converts a time_t 32-bit value to a SYSTEMTIME structure.

 void TimeToSystemTime(time_t t, LPSYSTEMTIME pst)
 {
   // Note that LONGLONG is a 64-bit value
   LONGLONG ll;
   FILETIME ft;
   // Scale seconds to 100-ns intervals, then shift the base from 1970 to 1601
   ll = Int32x32To64(t, 10000000) + 116444736000000000;
   ft.dwLowDateTime = (DWORD)ll;
   ft.dwHighDateTime = (DWORD)(ll >> 32);
   FileTimeToSystemTime(&ft, pst);
 }

This function converts the time_t value from a 32-bit to a 64-bit value, changes the interval from 1 second to 100 nanoseconds, and changes the base time from January 1, 1970, to January 1, 1601. It then uses the FileTimeToSystemTime Windows CE function to convert from a FILETIME to a SYSTEMTIME structure. The following code shows how to display the time and date associated with the PROPID_M_SENTTIME property value:

 SYSTEMTIME st;
 TimeToSystemTime(aMsgPropVar[2].ulVal, &st);
 cout << _T("Sent Time: ")
      << st.wMonth << _T("/") << st.wDay
      << _T("/") << st.wYear << _T(" ")
      << st.wHour << _T(":") << st.wMinute
      << _T(":") << st.wSecond << endl;
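
The arithmetic inside TimeToSystemTime can be checked without any Windows headers. The following is a minimal portable sketch, not part of the original listing: FileTimeParts is a hypothetical stand-in for FILETIME, and the function names UnixToFileTimeTicks, SplitTicks, and TransitSeconds are illustrative only. It also shows the transit-time subtraction described earlier, done in signed arithmetic on the assumption that the sending and receiving machines may not have synchronized clocks, in which case the difference could come out negative.

```cpp
#include <cstdint>

// 100-nanosecond intervals between January 1, 1601 (FILETIME base)
// and January 1, 1970 (time_t base).
const std::uint64_t kEpochOffsetTicks = 116444736000000000ULL;
// Number of 100-nanosecond intervals in one second.
const std::uint64_t kTicksPerSecond = 10000000ULL;

// Hypothetical stand-in for the Win32 FILETIME structure.
struct FileTimeParts {
    std::uint32_t low;   // plays the role of dwLowDateTime
    std::uint32_t high;  // plays the role of dwHighDateTime
};

// Widen a 32-bit UNIX time to 64 bits, rescale it from seconds to
// 100-ns ticks, and move the base date from 1970 back to 1601.
inline std::uint64_t UnixToFileTimeTicks(std::uint32_t unixSeconds) {
    return static_cast<std::uint64_t>(unixSeconds) * kTicksPerSecond
           + kEpochOffsetTicks;
}

// Split the 64-bit tick count into the two 32-bit halves a FILETIME holds.
inline FileTimeParts SplitTicks(std::uint64_t ticks) {
    FileTimeParts parts;
    parts.low = static_cast<std::uint32_t>(ticks & 0xFFFFFFFFULL);
    parts.high = static_cast<std::uint32_t>(ticks >> 32);
    return parts;
}

// Seconds between sending and arrival. The result is signed because
// the two timestamps may come from clocks that disagree.
inline std::int64_t TransitSeconds(std::uint32_t sentTime,
                                   std::uint32_t arrivedTime) {
    return static_cast<std::int64_t>(arrivedTime)
           - static_cast<std::int64_t>(sentTime);
}
```

With the property layout used above, TransitSeconds(aMsgPropVar[2].ulVal, aMsgPropVar[3].ulVal) would give the delivery time in seconds.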

Peeking Messages and Cursors

An application uses the action MQ_ACTION_PEEK_CURRENT with an MQReceiveMessage call to peek at the first message in a queue. This allows the message to be read but does not remove it from the queue. Messages other than the first in the queue can only be peeked if a cursor is created. A message cursor is created with the function MQCreateCursor, which is passed two parameters:

  • Handle to an open queue

  • A pointer to a variable to receive a handle to the open cursor

An HRESULT is returned indicating success or failure:

 HANDLE hCursor;
 hr = MQCreateCursor(hq, &hCursor);
 if(FAILED(hr))
 {
   cout << _T("Could not open cursor") << endl;
 }

The cursor handle can be passed to a call to MQReceiveMessage using the following actions:

  • MQ_ACTION_RECEIVE Read and remove the message at the current cursor location.

  • MQ_ACTION_PEEK_CURRENT Peek the current message but do not remove it, and keep the cursor pointing at the current message.

  • MQ_ACTION_PEEK_NEXT Peek the next message but do not remove it, and move the cursor on to the next message.

The following code reads the current message and does not move the cursor to the next message:

 dwRecAction = MQ_ACTION_PEEK_CURRENT;
 hr = MQReceiveMessage(hq, // Queue handle
     0,             // Max time (msec)
     dwRecAction,   // Receive action
     &msgprops,     // Property structure
     NULL,          // Not OVERLAPPED
     NULL,          // No callback function
     hCursor,       // Cursor
     NULL           // No transaction
     );

A cursor must be closed when it is no longer needed. This is done by calling the function MQCloseCursor, which is passed a handle to the open cursor. The code in Listing 15.4 opens the same queue as Listing 15.3, but differs in the following ways:

  • It sets properties to read the message label rather than the body

  • It opens a cursor and peeks all messages in the queue rather than reading the first message in the queue

Notice how the action used when calling MQReceiveMessage the first time is MQ_ACTION_PEEK_CURRENT. This reads the first message in the cursor and does not move the current cursor to the next message. The next and subsequent calls to MQReceiveMessage use the action MQ_ACTION_PEEK_NEXT. This moves the cursor to reference the next message in the queue, then returns that message. The loop ends when MQReceiveMessage fails, which happens with a timeout error once no further messages remain.

Listing 15.4 Peeking messages in a queue with a cursor
 void Listing15_4()
 {
   HRESULT hr;
   QUEUEHANDLE hq;
   TCHAR wszFormatName[256];
   DWORD dwFormatNameLength = 256;

   hr = MQPathNameToFormatName
       (_T(".\\Private$\\WinCEInQueue"),
       wszFormatName,
       &dwFormatNameLength);
   cout << wszFormatName << endl;
   hr = MQOpenQueue(wszFormatName,
       MQ_RECEIVE_ACCESS,
       MQ_DENY_NONE,
       &hq);
   if(hr == MQ_OK)
     cout << _T("Opened queue") << endl;
   else
   {
     DisplayOpenError(hr);
     return;
   }

   DWORD dwRecAction;
   MQMSGPROPS msgprops;
   MSGPROPID aMsgPropId[4];
   MQPROPVARIANT aMsgPropVar[4];
   HRESULT aMsgStatus[4];
   // Buffer that receives the message label
   DWORD dwBodyBufferSize = 1024;
   TCHAR lpszBodyBuffer[1024];

   // Property 0 is the label length, property 1 the label itself
   DWORD cPropId = 0;
   aMsgPropId[cPropId] = PROPID_M_LABEL_LEN;
   aMsgPropVar[cPropId].vt = VT_UI4;
   aMsgPropVar[cPropId].ulVal = dwBodyBufferSize;
   cPropId++;
   aMsgPropId[cPropId] = PROPID_M_LABEL;
   aMsgPropVar[cPropId].vt = VT_LPWSTR;
   aMsgPropVar[cPropId].pwszVal = lpszBodyBuffer;
   cPropId++;

   msgprops.cProp = cPropId;
   msgprops.aPropID = aMsgPropId;
   msgprops.aPropVar = aMsgPropVar;
   msgprops.aStatus = aMsgStatus;

   HANDLE hCursor;
   hr = MQCreateCursor(hq, &hCursor);
   if(FAILED(hr))
   {
     cout << _T("Could not open cursor") << endl;
     MQCloseQueue(hq);
     return; // Nothing can be peeked without a cursor
   }

   // Peek the first message, then step through the rest
   dwRecAction = MQ_ACTION_PEEK_CURRENT;
   while(TRUE)
   {
     // PROPID_M_LABEL_LEN is overwritten with the label length on
     // return, so restore the buffer length before each call
     aMsgPropVar[0].ulVal = dwBodyBufferSize;
     hr = MQReceiveMessage(hq,
         0,
         dwRecAction,
         &msgprops,
         NULL,
         NULL,
         hCursor,
         NULL);
     dwRecAction = MQ_ACTION_PEEK_NEXT;
     if (FAILED(hr))
     {
       // A timeout here means there are no more messages
       DisplayReadError(hr);
       MQCloseCursor(hCursor);
       MQCloseQueue(hq);
       return;
     }
     cout << _T("Label: ") << lpszBodyBuffer << endl;
   }
 }
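
The control flow of Listing 15.4 (peek current once, then peek next until the call fails) can be tried out without a device using a toy model. The sketch below is only an illustration of that loop shape under stated assumptions: ToyCursor, ToyPeek, PeekAction, and PeekAllLabels are hypothetical names, the "queue" is a plain vector of labels, and a false return stands in for the MQ_ERROR_IO_TIMEOUT result. It does not model what MSMQ does if the next action is used before any current peek.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Toy stand-ins for MQ_ACTION_PEEK_CURRENT and MQ_ACTION_PEEK_NEXT.
enum class PeekAction { Current, Next };

// Toy cursor: remembers which message in the queue it references.
struct ToyCursor {
    const std::vector<std::string>* queue;
    std::size_t position;
};

// Peek a label without removing anything from the queue.
// Returns false when no message is available at the requested
// position, which plays the role of the timeout error.
bool ToyPeek(ToyCursor& cursor, PeekAction action, std::string& label) {
    std::size_t target =
        cursor.position + (action == PeekAction::Next ? 1 : 0);
    if (target >= cursor.queue->size())
        return false;
    cursor.position = target;  // Next moves the cursor, Current leaves it
    label = (*cursor.queue)[target];
    return true;
}

// Same loop shape as Listing 15.4: the first call peeks the current
// message, every later call peeks the next one.
std::vector<std::string> PeekAllLabels(const std::vector<std::string>& queue) {
    ToyCursor cursor = { &queue, 0 };
    std::vector<std::string> seen;
    PeekAction action = PeekAction::Current;
    std::string label;
    while (ToyPeek(cursor, action, label)) {
        seen.push_back(label);
        action = PeekAction::Next;
    }
    return seen;
}
```

Because nothing is removed, running PeekAllLabels twice over the same vector yields the same labels both times, which mirrors the difference between peeking and receiving.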

Callback Function and Asynchronous Message Reading

The calls to MQReceiveMessage in previous sections have all been synchronous: the call blocks until the timeout value has passed or a message is available for reading. In many situations applications need to read messages asynchronously to avoid blocking the thread while waiting. One solution is to create a thread (Chapter 5) and call MQReceiveMessage on that thread with a long timeout. Another solution is to call MQReceiveMessage and pass a pointer to a callback function, avoiding the need to create threads. The following code shows a call to MQReceiveMessage to read a message from the queue with a timeout of 100 minutes; the function ReceiveCallbackRoutine will be called when the message is received. The call to MQReceiveMessage returns immediately with an HRESULT indicating success if the asynchronous request could be set up.

 dwRecAction = MQ_ACTION_RECEIVE;
 hr = MQReceiveMessage(hq,   // Handle to queue
     1000 * 60 * 100,        // Max time (msec)
     dwRecAction,            // Receive action
     pMsgprops,              // Msg property structure
     NULL,                   // No OVERLAPPED structure
     ReceiveCallbackRoutine, // Callback function
     NULL,                   // No cursor
     NULL                    // No transaction
     );

The MQReceiveMessage callback function is called when a message is available for reading. The parameters passed to this function give context to the function, such as the timeout, the queue the message was read from, the message properties, and the cursor handle.

 void APIENTRY ReceiveCallbackRoutine(HRESULT hr,
     QUEUEHANDLE hSource,
     DWORD dwTimeout,
     DWORD dwAction,
     MQMSGPROPS* pMessageProps,
     LPOVERLAPPED lpOverlapped,
     HANDLE hCursor)
 {
 }

There are two programming considerations you will need to note:

  • Property data should be dynamically allocated to allow the callback function to have correct access to it.

  • The queue will need to be closed at some stage after the callback function has been called.

In previous examples, property data has been declared as auto variables in the function that opens the queue and reads the message. This technique cannot be used when using callback functions, as the auto variables will have been destroyed by the time the callback function is called. Instead, you will need to use dynamic memory allocation, as shown by the following:

 MQMSGPROPS* pMsgprops = new MQMSGPROPS;
 MSGPROPID* pMsgPropId = new MSGPROPID[4];
 MQPROPVARIANT* pMsgPropVar = new MQPROPVARIANT[4];
 HRESULT* pMsgStatus = new HRESULT[4];
 LPTSTR lpszBodyBuffer = new TCHAR[1024];

Note that any data referenced by properties (such as lpszBodyBuffer) will also need to be dynamically allocated. The data so allocated will need to be freed by the callback function, or by the function that calls MQReceiveMessage if the call fails. Listing 15.5 shows the entire code for opening a queue, setting up a message read using a callback function, and declaring the callback function that will be called when a message is received. Note where the property data is allocated and de-allocated, and how the callback function gains access to lpszLabelBuffer through the property structure using the statement pMessageProps->aPropVar[1].pwszVal.

Listing 15.5 Callback function to read a message
 void APIENTRY ReceiveCallbackRoutine(HRESULT hr,
     QUEUEHANDLE hSource, DWORD dwTimeout,
     DWORD dwAction, MQMSGPROPS* pMessageProps,
     LPOVERLAPPED lpOverlapped, HANDLE hCursor)
 {
   if (FAILED(hr))
   {
     DisplayReadError(hr);
   }
   else
   {
     cout << _T("Async Msg Read: ")
          << pMessageProps->aPropVar[1].pwszVal << endl;
   }
   // Close the queue whether or not the read succeeded
   MQCloseQueue(hSource);
   // Free the property data allocated in Listing15_5;
   // everything except pMessageProps was allocated with new[]
   delete[] pMessageProps->aPropVar[1].pwszVal;
   delete[] pMessageProps->aPropID;
   delete[] pMessageProps->aPropVar;
   delete[] pMessageProps->aStatus;
   delete pMessageProps;
 }

 void Listing15_5()
 {
   HRESULT hr;
   TCHAR wszFormatName[256];
   DWORD dwFormatNameLength = 256;
   QUEUEHANDLE hq;

   hr = MQPathNameToFormatName
       (_T(".\\Private$\\WinCEInQueue"),
       wszFormatName,
       &dwFormatNameLength);
   cout << wszFormatName << endl;
   hr = MQOpenQueue(wszFormatName,
       MQ_RECEIVE_ACCESS,
       MQ_DENY_NONE,
       &hq);
   if(hr == MQ_OK)
     cout << _T("Opened queue") << endl;
   else
   {
     DisplayOpenError(hr);
     return;
   }

   // Property data is allocated dynamically so that it is still
   // valid when the callback function is called
   DWORD dwRecAction;
   LPTSTR lpszLabelBuffer = new TCHAR[1024];
   MQMSGPROPS* pMsgprops = new MQMSGPROPS;
   MSGPROPID* pMsgPropId = new MSGPROPID[4];
   MQPROPVARIANT* pMsgPropVar = new MQPROPVARIANT[4];
   HRESULT* pMsgStatus = new HRESULT[4];

   DWORD cPropId = 0;
   pMsgPropId[cPropId] = PROPID_M_LABEL_LEN;
   pMsgPropVar[cPropId].vt = VT_UI4;
   pMsgPropVar[cPropId].ulVal = 1024;
   cPropId++;
   pMsgPropId[cPropId] = PROPID_M_LABEL;
   pMsgPropVar[cPropId].vt = VT_LPWSTR;
   pMsgPropVar[cPropId].pwszVal = lpszLabelBuffer;
   cPropId++;

   pMsgprops->cProp = cPropId;
   pMsgprops->aPropID = pMsgPropId;
   pMsgprops->aPropVar = pMsgPropVar;
   pMsgprops->aStatus = pMsgStatus;

   dwRecAction = MQ_ACTION_RECEIVE;
   hr = MQReceiveMessage(hq,   // Queue handle
       1000 * 60 * 100,        // Max time (msec)
       dwRecAction,            // Receive action
       pMsgprops,              // Msg property structure
       NULL,                   // No OVERLAPPED structure
       ReceiveCallbackRoutine, // Callback function
       NULL,                   // No cursor
       NULL                    // No transaction
       );
   if(FAILED(hr))
   {
     // The callback will not be called, so clean up here
     delete[] pMsgPropId;
     delete[] pMsgPropVar;
     delete[] pMsgStatus;
     delete pMsgprops;
     delete[] lpszLabelBuffer;
     MQCloseQueue(hq);
     DisplayReadError(hr);
   }
 }

In this case a single message will be read, and the queue is then closed. It is possible that ReceiveCallbackRoutine could make another MQReceiveMessage call to set up an asynchronous read using the same callback function.
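
The ownership rule described in this section (the function that starts the read allocates the property data, and the callback frees it later) can be illustrated with a small portable sketch. Everything here is hypothetical and unrelated to the real MSMQ types: ToyProps stands in for MQMSGPROPS, ToyStartRead for the asynchronous MQReceiveMessage call, and ToyCompleteRead for the moment the system delivers a message and invokes the callback.

```cpp
#include <cwchar>
#include <string>

// Hypothetical stand-in for the message property structure.
struct ToyProps {
    wchar_t* label;  // heap buffer that will receive the label
};

typedef void (*ToyCallback)(bool succeeded, ToyProps* props);

// A read that has been set up but has not completed yet.
struct ToyPendingRead {
    ToyProps* props;
    ToyCallback callback;
};

// Records what the callback saw, so the effect can be inspected.
std::wstring g_lastLabel;

// The callback owns the property data: it uses it, then frees it.
void ToyReceiveCallback(bool succeeded, ToyProps* props) {
    if (succeeded)
        g_lastLabel = props->label;
    delete[] props->label;  // allocated with new[]
    delete props;           // allocated with new
}

// Sets up the read and returns immediately. The data must live on the
// heap because this function's local variables are gone by the time
// the callback runs.
ToyPendingRead ToyStartRead(ToyCallback callback) {
    ToyProps* props = new ToyProps;
    props->label = new wchar_t[64]();  // zero-filled buffer
    ToyPendingRead pending = { props, callback };
    return pending;
}

// Simulates a message arriving later: fills the buffer and hands
// ownership of the property data to the callback.
void ToyCompleteRead(ToyPendingRead& pending, const wchar_t* label) {
    std::wcsncpy(pending.props->label, label, 63);  // keeps the terminator
    pending.callback(true, pending.props);
    pending.props = nullptr;  // freed by the callback, do not reuse
}
```

Had ToyStartRead placed ToyProps and the label buffer in local variables instead, ToyCompleteRead would be writing through dangling pointers, which is the failure the dynamic allocation in Listing 15.5 avoids.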


Windows CE 3.0: Application Programming (Prentice Hall Series on Microsoft Technologies)
ISBN: 0130255920
Year: 2002
Pages: 181
