Messages can be read from a queue on the same Windows CE device or on another computer. You need to have a valid network connection to the other computer to read from a remote queue. To read one or more messages from a queue, you must do the following:
At a minimum, the following properties must be passed to MQReceiveMessage:
The PROPID_M_BODY_SIZE property is initialized as shown in the following code fragment. After a successful call to MQReceiveMessage, the aMsgPropVar[0].ulVal member will contain the number of bytes in the message body. aMsgPropId[0] = PROPID_M_BODY_SIZE; aMsgPropVar[0].vt = VT_UI4; You will need to allocate a buffer in which the message body will be received, and initialize the PROPID_M_BODY property with this pointer. In the following code, a 1-KB buffer is allocated, and the pointer is assigned to the pElems member. The size of the buffer is assigned to the cElems member.
DWORD dwBodyBufferSize = 1024; LPTSTR lpszBodyBuffer = new TCHAR[dwBodyBufferSize]; aMsgPropId[1] = PROPID_M_BODY; aMsgPropVar[1].vt = VT_VECTOR|VT_UI1; aMsgPropVar[1].caub.pElems = (UCHAR*)lpszBodyBuffer; aMsgPropVar[1].caub.cElems = dwBodyBufferSize; The code in Listing 15.3 shows opening the queue on the Windows CE device created in Listing 15.2, and reading a message from the queue. The timeout of 0 means that MQReceiveMessage will return immediately with a message if one is present, or return an MQ_ERROR_IO_TIMEOUT error if none is present. Since MQReceiveMessage is called on the primary thread, it is important that the call to MQReceiveMessage does not block for any length of time. The code displays the number of bytes in the message body and then displays the contents of the body. Since the receive action is MQ_ACTION_RECEIVE, the message will be removed from the queue once it has been read.
Listing 15.3: Reading a message from a queue
void DisplayReadError(HRESULT hr) { if(hr == MQ_ERROR_ACCESS_DENIED) cout _T("Don't have access rights") endl; else if(hr == MQ_ERROR_BUFFER_OVERFLOW ) cout _T("Buffer Overflow") endl; else if(hr == MQ_ERROR_SENDERID_BUFFER_TOO_SMALL ) cout _T("Sender ID Buffer too small") endl; else if(hr == MQ_ERROR_SYMM_KEY_BUFFER_TOO_SMALL ) cout _T("Symmetric key buffer too small") endl; else if(hr == MQ_ERROR_SENDER_CERT_BUFFER_TOO_SMALL ) cout _T("Cert buffer too small") endl; else if(hr == MQ_ERROR_SIGNATURE_BUFFER_TOO_SMALL ) cout _T("Signature buffer too small") endl; else if(hr == MQ_ERROR_PROV_NAME_BUFFER_TOO_SMALL ) cout _T("Provider name too small") endl; else if(hr == MQ_ERROR_LABEL_BUFFER_TOO_SMALL) cout _T("Label buffer too small") endl; else if(hr == MQ_ERROR_FORMATNAME_BUFFER_TOO_SMALL ) cout _T("Format name buffer too small") endl; else if(hr == MQ_ERROR_DTC_CONNECT ) cout _T("Cannot connect to DTC") endl; else if(hr == MQ_ERROR_INSUFFICIENT_PROPERTIES ) cout _T("Insufficient properties") endl; else if(hr == 
MQ_ERROR_INVALID_HANDLE ) cout _T("Invalid queue handle") endl; else if(hr == MQ_ERROR_IO_TIMEOUT ) cout _T("Timeout") endl; else if(hr == MQ_ERROR_MESSAGE_ALREADY_RECEIVED ) cout _T("Message has been removed from queue") endl; else if(hr == MQ_ERROR_OPERATION_CANCELLED ) cout _T("Operation cancelled") endl; else if(hr == MQ_ERROR_PROPERTY ) cout _T("Property error") endl; else if(hr == MQ_ERROR_QUEUE_DELETED ) cout _T("Queue deleted") endl; else if(hr == MQ_ERROR_ILLEGAL_CURSOR_ACTION ) cout _T("Illegal cursor action") endl; else if(hr == MQ_ERROR_SERVICE_NOT_AVAILABLE ) cout _T("Service not available") endl; else if(hr == MQ_ERROR_STALE_HANDLE ) cout _T("Stale handle") endl; else if(hr == MQ_ERROR_TRANSACTION_USAGE ) cout _T("Transaction Error") endl; else if(hr == MQ_INFORMATION_PROPERTY ) cout _T("Property returned information") endl; else cout _T("Unknown error") endl; } void Listing15_3() { HRESULT hr; QUEUEHANDLE hq; TCHAR wszFormatName[256]; DWORD dwFormatNameLength = 256; hr = MQPathNameToFormatName (_T(".\\Private$\\WinCEInQueue"), wszFormatName, &dwFormatNameLength); cout wszFormatName endl; hr = MQOpenQueue(wszFormatName, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &hq); if(hr == MQ_OK) cout _T("Opened queue") endl; else { DisplayOpenError(hr); return; } DWORD dwRecAction = MQ_ACTION_RECEIVE; MQMSGPROPS msgprops; MSGPROPID aMsgPropId[2]; MQPROPVARIANT aMsgPropVar[2]; HRESULT aMsgStatus[2]; // Message body buffer DWORD dwBodyBufferSize = 1024; LPTSTR lpszBodyBuffer = new TCHAR[dwBodyBufferSize]; DWORD cPropId = 0; aMsgPropId[cPropId] = PROPID_M_BODY_SIZE aMsgPropVar[cPropId].vt = VT_UI4; cPropId++; aMsgPropId[cPropId] = PROPID_M_BODY; aMsgPropVar[cPropId].vt = VT_VECTOR|VT_UI1; aMsgPropVar[cPropId].caub.pElems = (UCHAR*)lpszBodyBuffer; aMsgPropVar[cPropId].caub.cElems = dwBodyBufferSize; cPropId++; msgprops.cProp = cPropId; msgprops.aPropID = aMsgPropId; msgprops.aPropVar = aMsgPropVar; msgprops.aStatus = aMsgStatus; hr = MQReceiveMessage(hq, // Queue handle 0, // 
Max time (msec) dwRecAction, // Receive action &msgprops, // Msg property structure NULL, NULL, NULL, NULL); if (FAILED(hr)) { DisplayReadError(hr); MQCloseQueue(hq); delete lpszBodyBuffer; return; } if (aMsgPropVar[0].ulVal == 0) cout _T("No message body exists.") endl; else cout _T("The message body is") lpszBodyBuffer endl; delete lpszBodyBuffer; MQCloseQueue(hq); }
Messages can be written to the queue on the Windows CE device from a Windows 2000 application regardless of whether the Windows CE device is connected. If it is not connected, the messages will be written to a temporary queue (Figure 15.3), and then sent once a connection is made.
Figure 15.3. Messages waiting to be sent to a Windows CE queue
The Visual Basic code below shows how a queue can be opened by supplying a FormatName that includes the name of the Windows CE device ("ncg_ppc") on which the queue resides. A message can be created and the Label and Body properties initialized; the message is then sent to the open queue. Finally, the queue can be closed. The message sent in this way can be read by the code in Listing 15.3. Dim qSendi As MSMQQueueInfo Dim qSend As MSMQQueue Dim msg As New MSMQMessage Set qSendi = New MSMQQueueInfo qSendi.FormatName = _ "DIRECT=OS:ncg_ppc\Private$\WinCEInQueue" Set qSend = qSendi.Open(MQ_SEND_ACCESS, MQ_DENY_NONE) msg.Label = "Message from Win2000" msg.Body = "Body contents of message" msg.Send qSend qSend.Close
Reading Other Message Properties
In Listing 15.3 only two message properties were read—the message body size and the message body. Other message properties can be retrieved by adding to the MQMSGPROPS structure. In this section, the properties PROPID_M_SENTTIME and PROPID_M_ARRIVEDTIME will be demonstrated. These can be used to retrieve the time the message was sent and the time the message arrived in the queue. The times are returned using a 32-bit time_t data value. 
The following code initializes message properties to return these times: aMsgPropId[2] = PROPID_M_SENTTIME; aMsgPropVar[2].vt = VT_UI4; aMsgPropId[3] = PROPID_M_ARRIVEDTIME; aMsgPropVar[3].vt = VT_UI4; The times can be retrieved after a successful call to MQReceiveMessage using the property's ulVal member, for example: cout aMsgPropVar[2].ulVal endl; Both PROPID_M_SENTTIME and PROPID_M_ARRIVEDTIME return time using the time_t UNIX time data type. This is the number of seconds since midnight, January 1, 1970 (coordinated universal time). Most Windows CE time functions use the 64-bit integer FILETIME data type (which is the number of 100-nanosecond intervals since January 1, 1601) or SYSTEMTIME structure. You can easily work out how long the message took to arrive—the difference between the two values is the number of seconds elapsed between the sending of the message and its arrival. However, working out the time and date represented by the time_t value necessitates it being converted to a more useful time format. The following function converts a time_t 32-bit value to a SYSTEMTIME structure. void TimeToSystemTime(time_t t, LPSYSTEMTIME pst) { // Note that LONGLONG is a 64-bit value LONGLONG ll; FILETIME ft; ll = Int32x32To64(t, 10000000) + 116444736000000000; ft.dwLowDateTime = (DWORD)ll; ft.dwHighDateTime = ll >> 32; FileTimeToSystemTime(&ft, pst); } This function converts the time_t value from a 32-bit to a 64-bit value, changes the interval from 1 second to 100 nanoseconds, and changes the base time from January 1, 1970, to January 1, 1601. It then uses the FileTimeToSystemTime Windows CE function to convert from a FILETIME to a SYSTEMTIME structure. 
The following code shows how to display the time and date associated with the PROPID_M_SENTTIME property value: TimeToSystemTime(aMsgPropVar[2].ulVal, &st); cout _T("Sent Time: ") st.wMonth _T("/") st.wDay _T("/") st.wYear _T(" ") st.wHour _T(":") st.wMinute _T(":") st.wSecond endl;
Peeking Messages and Cursors
An application uses the action MQ_ACTION_PEEK_CURRENT with an MQReceiveMessage call to peek at the first message in a queue. This allows the message to be read but does not remove it from the queue. Messages other than the first in the queue can only be peeked if a cursor is created. A message cursor is created with the function MQCreateCursor, and this function is passed two parameters:
An HRESULT is returned indicating success or failure: HANDLE hCursor; hr = MQCreateCursor(hq, &hCursor); if(FAILED(hr)) { cout _T("Could not open cursor") endl; } The cursor handle can be passed to a call to MQReceiveMessage using the following actions:
The following code reads the current message and does not move the cursor to the next message: dwRecAction = MQ_ACTION_PEEK_CURRENT; hr = MQReceiveMessage(hq, // Queue handle 0, // Max time (msec) dwRecAction, // Receive action &msgprops, // Property structure NULL, // Not OVERLAPPED NULL, // No callback function hCursor, // Cursor NULL // No transaction ); A cursor must be closed when it is no longer needed. This is done by calling the function MQCloseCursor, which is passed a handle to the open cursor. The code in Listing 15.4 opens the same queue as Listing 15.3, but differs in the following ways:
Notice how the action used when calling MQReceiveMessage the first time is MQ_ACTION_PEEK_CURRENT—this reads the first message in the cursor and does not move the current cursor to the next message. The next and subsequent calls to MQReceiveMessage use the action MQ_ACTION_PEEK_NEXT. This moves the cursor to reference the next message in the queue, then returns that message.
Listing 15.4: Peeking messages in a queue with a cursor
void Listing15_4() { HRESULT hr; QUEUEHANDLE hq; TCHAR wszFormatName[256]; DWORD dwFormatNameLength = 256; hr = MQPathNameToFormatName (_T(".\\Private$\\WinCEInQueue"), wszFormatName, &dwFormatNameLength); cout wszFormatName endl; hr = MQOpenQueue(wszFormatName, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &hq); if(hr == MQ_OK) cout _T("Opened queue") endl; else { DisplayOpenError(hr); return; } DWORD dwRecAction; MQMSGPROPS msgprops; MSGPROPID aMsgPropId[4]; MQPROPVARIANT aMsgPropVar[4]; HRESULT aMsgStatus[4]; // Message body buffer DWORD dwBodyBufferSize = 1024; TCHAR lpszBodyBuffer[1024]; DWORD cPropId = 0; aMsgPropId[cPropId] = PROPID_M_LABEL_LEN; aMsgPropVar[cPropId].vt = VT_UI4; aMsgPropVar[cPropId].ulVal = 1024; cPropId++; aMsgPropId[cPropId] = PROPID_M_LABEL; aMsgPropVar[cPropId].vt = VT_LPWSTR; aMsgPropVar[cPropId].pwszVal = lpszBodyBuffer; cPropId++; msgprops.cProp = cPropId; msgprops.aPropID = aMsgPropId; msgprops.aPropVar = aMsgPropVar; msgprops.aStatus = aMsgStatus; HANDLE hCursor; hr = MQCreateCursor(hq, &hCursor); if(FAILED(hr)) { cout _T("Could not open cursor") endl; MQCloseQueue(hq); } dwRecAction = MQ_ACTION_PEEK_CURRENT; while(TRUE) { hr = MQReceiveMessage(hq, 0, dwRecAction, &msgprops, NULL, NULL, hCursor, NULL); dwRecAction = MQ_ACTION_PEEK_NEXT; if (FAILED(hr)) { DisplayReadError(hr); MQCloseCursor(hCursor); MQCloseQueue(hq); return; } cout _T("Label: ") lpszBodyBuffer endl; } }
Callback Function and Asynchronous Message Reading
The calls to MQReceiveMessage in previous sections have all been synchronous—the call blocks until the 
timeout value has passed or a message is available for reading. In many situations applications need to read messages asynchronously to avoid blocking the thread while waiting. One solution is to create a thread (Chapter 5) and call MQReceiveMessage on that thread with a long timeout. Another solution is to call MQReceiveMessage and pass a pointer to a callback function, avoiding the necessity to create threads. This code shows a call to MQReceiveMessage to read a message from the queue with a timeout of 100 minutes, and the function ReceiveCallbackRoutine will be called when the message is received. The call MQReceiveMessage will return immediately with an HRESULT indicating success if the asynchronous request could be set up. dwRecAction = MQ_ACTION_RECEIVE; hr = MQReceiveMessage(hq, // handle to queue 1000 * 60 * 100, // Max time (msec) dwRecAction, // Receive action pMsgprops, // Msg property structure NULL, // No OVERLAPPED structure ReceiveCallbackRoutine, // Callback function NULL, // No Cursor NULL // No transaction ); The MQReceiveMessage callback function is called when a message is available for reading. The parameters passed to this function give context to the function, such as the timeout, the queue the message was read from, the message properties, and the cursor handle. void APIENTRY ReceiveCallbackRoutine(HRESULT hr, QUEUEHANDLE hSource, DWORD dwTimeout, DWORD dwAction, MQMSGPROPS* pMessageProps, LPOVERLAPPED lpOverlapped, HANDLE hCursor) { } There are two programming considerations you will need to note:
In previous examples, property data has been declared as auto variables in the function that opens the queue and reads the message. This technique cannot be used when using callback functions, as the auto variables will have been destroyed by the time the callback function is called. Instead, you will need to use dynamic memory allocation, as shown by the following: MQMSGPROPS* pMsgprops = new MQMSGPROPS; MSGPROPID* pMsgPropId = new MSGPROPID[4]; MQPROPVARIANT* pMsgPropVar = new MQPROPVARIANT[4]; HRESULT* pMsgStatus = new HRESULT[4]; LPTSTR lpszBodyBuffer = new TCHAR[1024]; Note that any data referenced by properties (such as lpszBodyBuffer) will also need to be dynamically allocated. The data so allocated will need to be freed by the callback function, or by the function that calls MQReceiveMessage if the call fails. Listing 15.5 shows the entire code for opening a queue, setting up a message read using a callback function, and declaring the callback function that will be called when a message is received. Note where the property data is allocated and de-allocated, and how the callback function gains access to lpszLabelBuffer through the property structure using the statement pMessageProps->aPropVar[1].pwszVal. 
Listing 15.5: Callback function to read a message
void APIENTRY ReceiveCallbackRoutine(HRESULT hr, QUEUEHANDLE hSource, DWORD dwTimeout, DWORD dwAction, MQMSGPROPS* pMessageProps, LPOVERLAPPED lpOverlapped, HANDLE hCursor) { if (FAILED(hr)) { DisplayReadError(hr); } else { cout _T("Async Msg Read: ") pMessageProps->aPropVar[1].pwszVal endl; MQCloseQueue(hSource); } delete pMessageProps->aPropVar[1].pwszVal; delete pMessageProps->aPropID; delete pMessageProps->aPropVar; delete pMessageProps->aStatus; delete pMessageProps; } void Listing15_5() { HRESULT hr; TCHAR wszFormatName[256]; DWORD dwFormatNameLength = 256; QUEUEHANDLE hq; hr = MQPathNameToFormatName (_T(".\\Private$\\WinCEInQueue"), wszFormatName, &dwFormatNameLength); cout wszFormatName endl; hr = MQOpenQueue(wszFormatName, MQ_RECEIVE_ACCESS, MQ_DENY_NONE, &hq); if(hr == MQ_OK) cout _T("Opened queue") endl; else { DisplayOpenError(hr); return; } DWORD dwRecAction; LPTSTR lpszLabelBuffer = new TCHAR[1024]; MQMSGPROPS* pMsgprops = new MQMSGPROPS; MSGPROPID* pMsgPropId = new MSGPROPID[4]; MQPROPVARIANT* pMsgPropVar = new MQPROPVARIANT[4]; HRESULT* pMsgStatus = new HRESULT[4]; DWORD cPropId = 0; pMsgPropId[cPropId] = PROPID_M_LABEL_LEN; pMsgPropVar[cPropId].vt = VT_UI4; pMsgPropVar[cPropId].ulVal = 1024; cPropId++; pMsgPropId[cPropId] = PROPID_M_LABEL; pMsgPropVar[cPropId].vt = VT_LPWSTR; pMsgPropVar[cPropId].pwszVal = lpszLabelBuffer; cPropId++; pMsgprops->cProp = cPropId; pMsgprops->aPropID = pMsgPropId; pMsgprops->aPropVar = pMsgPropVar; pMsgprops->aStatus = pMsgStatus; dwRecAction = MQ_ACTION_RECEIVE; hr = MQReceiveMessage(hq, // Queue handle 1000 * 60 * 100, // Max time (msec) dwRecAction, // Receive action pMsgprops, // Msg property structure NULL, // No OVERLAPPED structure ReceiveCallbackRoutine, // Callback function NULL, // No Cursor NULL // No transaction ); if(FAILED(hr)) { delete pMsgPropId; delete pMsgPropVar; delete pMsgStatus; delete pMsgprops; delete lpszLabelBuffer; DisplayReadError(hr); } In 
this case a single message will be read, and the queue is then closed. It is possible that ReceiveCallbackRoutine could make another MQReceiveMessage call to set up an asynchronous read using the same callback function.
|