MSMQ provides ten COM components that offer the same functionality as MSMQ C language APIs, such as support for queue lookup, queue management, message management, queue administration, and transactions. Table 9.2 lists the components along with their functionality.
Component | Functionality |
---|---|
MSMQApplication | Helps obtain a machine's identifier. |
MSMQCoordinatedTransactionDispenser | Helps participate in a transaction. |
MSMQEvent | Event handler for messages arriving at one or more queues. |
MSMQMessage | Lets you set various properties on a message and send the message. |
MSMQQuery | Allows querying the directory service for existing public queues. |
MSMQQueue | Represents an open instance of an MSMQ queue. Helps traverse the messages in the open queue. |
MSMQQueueInfo | Helps with queue management, for example, creating or opening a queue and changing a queue's properties. |
MSMQQueueInfos | Represents a set of queues, typically obtained via the MSMQQuery component. |
MSMQTransaction | Represents a transaction object. Provides methods for committing or terminating the transaction. |
MSMQTransactionDispenser | Helps create a new MSMQ internal transaction object. |
All MSMQ components support IDispatch-based interfaces. This enables them to be used not just from C/C++ programming languages, but also from Visual Basic, ActiveX Scripting environments (such as IE and IIS/ASP), and Java applets.
If you are using Visual C++ for developing MSMQ-based applications, you can use the import feature provided by the compiler as shown below:
#import "mqoa.dll" no_namespace
This will enable you to develop code in an easy-to-use syntax similar to that of Visual Basic.
Import mqoa.dll in your Visual C++ source code for developing MSMQ applications.
Now let's build our knowledge of the MSMQ COM components by starting with a simple example.
The following code snippet shows how to send a text string as a message:
    #define MY_QUEUE_PATHNAME "ROS84157LAP\\MyPubQueue"

    void SendMessage()
    {
        // Step 1: Set properties on queue-information object
        IMSMQQueueInfoPtr spQInfo("MSMQ.MSMQQueueInfo");
        spQInfo->PathName = MY_QUEUE_PATHNAME;

        // Step 2: Open the queue for send operation
        IMSMQQueuePtr spQSend = spQInfo->Open(MQ_SEND_ACCESS, MQ_DENY_NONE);

        // Step 3: Set message-information properties
        IMSMQMessagePtr spQMsg("MSMQ.MSMQMessage");
        spQMsg->Label = "Test Message";
        spQMsg->Body = "This is my test message";

        // Step 4: Send the message on the queue
        spQMsg->Send(spQSend);

        // Step 5: Close the queue
        spQSend->Close();

        ::MessageBox(NULL, _T("Message Sent"), _T("Test Send Message"), MB_OK);
    }
The first step is to create an MSMQQueueInfo object and set various properties on it.
A queue has over a dozen properties: creation and modification times, pathname, label, instance properties, and more. The SDK contains more information on each of these properties. The above code uses just one property, PathName, to identify the pathname of the queue.
The second step is to open the queue. The first parameter specifies how an application accesses the queue. We use MQ_SEND_ACCESS to inform MSMQ that the queue needs to be opened for sending messages. The second parameter specifies who can access the queue. For sending messages, the only valid option is MQ_DENY_NONE, which indicates that the queue is available to everyone.
If the queue opens successfully, the Open method returns a pointer to the IMSMQQueue interface.
The third step is to create an MSMQMessage object and set various properties on it.
A message has almost 50 properties that provide a great deal of control over how a message is sent and handled en route, how it should be encrypted, the identity of the queue to respond to, the priority of the message, and many other useful features.
The body of the message is one such property (and the most important one). Its type is a variant, thus handling a wide variety of data types.
A message can also be assigned a label, which is just a string that describes the message.
The fourth step is to send the message on the queue and the fifth step is to close the queue.
Sending a message is as simple as that.
Astute readers may be wondering how it is possible to open the queue without specifying the format name. It turns out that for a public queue, the MSMQ components automatically translate the pathname to the format name. In order to do so, however, the client machine should be able to access the directory service, that is, it has to be online.
If the format name is available before the queue is opened, then the independent client machine need not be online. To verify this, step one of our previous code can be modified to obtain the format name before opening the queue, as follows:
    void SendMessage()
    {
        // Step 1: Set properties on queue-information object
        IMSMQQueueInfoPtr spQInfo("MSMQ.MSMQQueueInfo");
        spQInfo->PathName = MY_QUEUE_PATHNAME;
        spQInfo->Refresh();
        ::MessageBox(NULL, spQInfo->FormatName, _T("Format Name"), MB_OK);
        ...
    }
Once the message box is displayed with the format name, you can unplug the network cable. The message will still go through, at least as far as the sender program is concerned. If you check the outgoing queues of the local machine (from the Computer Management snap-in), you will notice that the message is sitting in the outgoing queue. Once the network cable is plugged back in, the local queue manager will forward the message to the destination queue.
The receiving program code is as simple as the sending code:
    void ReceiveMessage()
    {
        // Step 1: Set properties on queue-information object
        IMSMQQueueInfoPtr spQInfo("MSMQ.MSMQQueueInfo");
        spQInfo->PathName = MY_QUEUE_PATHNAME;

        // Step 2: Open the queue for receive operation
        IMSMQQueuePtr spQRec = spQInfo->Open(MQ_RECEIVE_ACCESS, MQ_DENY_NONE);

        // Step 3: Attempt to receive a message
        // (with one second timeout)
        _variant_t vtReceiveTimeout = 1000L;
        IMSMQMessagePtr spRMsg = spQRec->Receive(&vtMissing, &vtMissing,
            &vtMissing, &vtReceiveTimeout);

        // Step 4: Retrieve the message body and label
        if (NULL != spRMsg) {
            _bstr_t bsBody = spRMsg->Body;
            _bstr_t bsLabel = spRMsg->Label;
            ::MessageBox(NULL, bsBody, bsLabel, MB_OK);
        } else {
            ::MessageBox(NULL, _T("No messages found"),
                _T("Test Receive Message"), MB_OK);
        }

        // Step 5: Close the queue
        spQRec->Close();
    }
Step one is similar to that of the sending program.
Step two specifies opening the queue for receiving messages by setting the first parameter to MQ_RECEIVE_ACCESS. The second parameter (the sharing mode) can be set as before to MQ_DENY_NONE. In this case, other listener applications may contend to retrieve messages from the queue. To exclusively retrieve the messages from the queue, the sharing mode can be set to MQ_DENY_RECEIVE_SHARE.
The third step is to receive the message. If the message is not already available in the queue, method Receive waits for a specific timeout interval to check if a message arrives on the queue and could be retrieved. If the timeout interval expires, the method call returns with a NULL value for the message.
The fourth parameter to the Receive call controls the timeout interval. The code in the last example shows how to set a timeout value of 1000 milliseconds. If you do not want to wait at all, you can use a timeout value of 0. A value of -1 indicates that you want to wait indefinitely. This is also the default value if the parameter is not specified. [2]
[2] Any parameter that is marked as optional in the IDL file can be specified as vtMissing under Visual C++-native COM support.
Step four displays the message and step five closes the queue.
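The three timeout cases (0, a positive interval, and -1) can be sketched with a small portable model. This is only an illustration of the semantics described above, not the MSMQ API: ToyQueue and its Send and Receive members are hypothetical names, and the toy polls in a single thread where a real queue would block on a synchronization object.

```cpp
#include <chrono>
#include <deque>
#include <string>
#include <thread>
#include <utility>

// Hypothetical single-threaded stand-in for a queue; not the MSMQ API.
class ToyQueue {
public:
    void Send(std::string body) { items_.push_back(std::move(body)); }

    // timeoutMs == 0 : check once and return at once if the queue is empty
    // timeoutMs  > 0 : keep checking for up to that many milliseconds
    // timeoutMs  < 0 : keep checking until a message shows up
    // Returns false on timeout (the analogue of a NULL message).
    bool Receive(long timeoutMs, std::string& body) {
        typedef std::chrono::steady_clock Clock;
        const Clock::time_point start = Clock::now();
        for (;;) {
            if (!items_.empty()) {
                body = std::move(items_.front());
                items_.pop_front();
                return true;
            }
            if (timeoutMs >= 0 &&
                Clock::now() - start >= std::chrono::milliseconds(timeoutMs)) {
                return false;
            }
            // Nothing yet: nap briefly, then look again.
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

private:
    std::deque<std::string> items_;
};
```

Because nothing else can add a message while this single-threaded toy is waiting, calling Receive with -1 on an empty ToyQueue would never return. That is the same hazard the indefinite wait carries for the calling thread in a real application.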
The above example code receives messages synchronously. The Receive method call is blocked until a message arrives on the queue or a timeout occurs. While this style of coding allows you to process messages as they arrive, it also holds the calling thread hostage. An alternative technique is to use the MSMQ events mechanism. As messages arrive on the queue, MSMQ raises a notification. The application can then respond to the notification and retrieve the message. This style of asynchronous programming is discussed in the next chapter.
By default, messages are designated as express messages. An express message is stored in memory until it can be delivered. Since express messages are not written to disk, they offer extremely high-speed communications. However, if the machine shuts down unexpectedly, an express message is lost.
A message can be marked as recoverable. Recoverable messages are stored in a backup file at each intermediate queue until delivered to the destination queue. Thus, such messages are not lost if the machine or the queue manager crashes. However, you gain this failure protection at the expense of communication speed.
To set a message as recoverable, the Delivery property on the message should be specified as MQMSG_DELIVERY_RECOVERABLE. Obviously, this has to be done before the message is sent. The following code snippet illustrates the logic:
    // Step 3: Set message-information properties
    IMSMQMessagePtr spQMsg("MSMQ.MSMQMessage");
    spQMsg->Label = "Test Message";
    spQMsg->Body = "This is my test message";
    spQMsg->Delivery = MQMSG_DELIVERY_RECOVERABLE;
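The trade-off between express and recoverable delivery can be pictured with a toy queue manager. This is a minimal sketch, assuming a manager that keeps express messages only in memory and writes recoverable ones to a store that survives a restart. The names ToyQueueManager, Accept, CrashAndRestart, and Pending are invented for the illustration and are not part of MSMQ.

```cpp
#include <string>
#include <vector>

enum class Delivery { Express, Recoverable };

struct Message {
    std::string body;
    Delivery delivery;
};

// Hypothetical model of a queue manager's two storage areas.
class ToyQueueManager {
public:
    void Accept(const Message& msg) {
        if (msg.delivery == Delivery::Recoverable) {
            disk_.push_back(msg.body);    // slower, but survives a crash
        } else {
            memory_.push_back(msg.body);  // fast, but lost on a crash
        }
    }

    // Simulate an unexpected shutdown: everything held in memory is gone.
    void CrashAndRestart() { memory_.clear(); }

    // Messages still waiting for delivery (recoverable ones listed first).
    std::vector<std::string> Pending() const {
        std::vector<std::string> all(disk_);
        all.insert(all.end(), memory_.begin(), memory_.end());
        return all;
    }

private:
    std::vector<std::string> memory_;
    std::vector<std::string> disk_;
};
```

After CrashAndRestart, only the bodies accepted with Delivery::Recoverable remain in Pending.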
When a client application makes a method call on a remote object, it is often desirable that it gets back some return value from the remote object. This is not a problem when DCOM/RPC is used. A method call gets blocked until the remote method is executed, and the requested values are returned as output parameters to the method call. However, when MSMQ is used, the input data is sent as a message to a queue but no output data can be obtained immediately because of the asynchronous nature of the communication.
MSMQ's solution is to ask the server to send the response back as another message.
Which queue should the response message be sent to?
When sending a message, the client application has to supply the response queue information as one of the properties, ResponseQueueInfo, of the message.
The following code snippet is a modified version of the earlier example. Here, the client is interested in getting the product of two numbers. The numbers are packed as one string.
    // Step 3: Set response queue information
    IMSMQQueueInfoPtr spResponseQInfo("MSMQ.MSMQQueueInfo");
    spResponseQInfo->PathName = MY_RESPONSE_QUEUE_PATHNAME;

    // Step 4: Set message-information properties
    IMSMQMessagePtr spQMsg("MSMQ.MSMQMessage");
    spQMsg->Label = "Need product of two numbers";
    spQMsg->Body = "2 5";
    spQMsg->Delivery = MQMSG_DELIVERY_RECOVERABLE;
    spQMsg->ResponseQueueInfo = spResponseQInfo;

The response queue for the example was created as a private queue:
#define MY_RESPONSE_QUEUE_PATHNAME ".\\Private$\\MyResponseQueue"
A private queue is a perfect candidate for receiving response messages; there is no need for lookup in the directory service, thereby saving processing cycles, networking bandwidth, and disk space.
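The request/response pattern described above can be sketched in portable C++. The model below is an assumption-laden toy, not MSMQ code: Message, Queues, and SendReply are hypothetical, queues are identified by plain strings, and the responseQueue field plays the role of ResponseQueueInfo. It also copies the request's identifier into a correlationId field so the client can match a reply to its request.

```cpp
#include <deque>
#include <map>
#include <string>

// Hypothetical message shape; field names only loosely mirror MSMQ properties.
struct Message {
    std::string id;             // identifier of this message
    std::string correlationId;  // identifier of the request being answered
    std::string body;
    std::string responseQueue;  // where the sender wants the reply; empty if none
};

typedef std::map<std::string, std::deque<Message> > Queues;

// Server side: post the result to whichever queue the request named.
// Returns false when the client did not ask for a reply.
bool SendReply(Queues& queues, const Message& request, const std::string& result) {
    if (request.responseQueue.empty()) {
        return false;
    }
    Message reply;  // id left empty here; a real queue manager would assign one
    reply.correlationId = request.id;
    reply.body = result;
    queues[request.responseQueue].push_back(reply);
    return true;
}
```

The server never needs to know the response queue in advance. Each request carries that information with it, which is why a private queue on the client machine works well.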
In the previous example, we had to pass two parameters into the message body. This was accomplished by packing the parameters as one string. The listener application had to parse the string to obtain the parameters.
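A minimal sketch of that packing and parsing, using only the standard library, might look like the following. PackOperands and ParseOperands are hypothetical helper names and do not appear in the sample programs.

```cpp
#include <sstream>
#include <string>

// Sender side: pack two numbers into one space-separated string, e.g. "2 5".
std::string PackOperands(long first, long second) {
    std::ostringstream out;
    out << first << ' ' << second;
    return out.str();
}

// Listener side: recover the two numbers. Returns false if the body does not
// hold exactly two integers, leaving the output parameters untouched.
bool ParseOperands(const std::string& body, long& first, long& second) {
    std::istringstream in(body);
    long a = 0;
    long b = 0;
    if (!(in >> a >> b)) {
        return false;  // fewer than two numbers
    }
    std::string extra;
    if (in >> extra) {
        return false;  // unexpected trailing text
    }
    first = a;
    second = b;
    return true;
}
```

Even in this small case the listener has to validate the string, and every new parameter means changing both functions in step.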
Though packing parameters as one string and passing the string as a message is not that inconvenient programmatically, MSMQ offers a better solution: it lets you store a COM object inside the message body. The object can hold any number of parameters as needed.
Not all objects can be stored in the message body. MSMQ requires that an object support one of the two standard interfaces that are needed for persistence: IPersistStream or IPersistStorage. MSMQ can then serialize such an object into and out of a message body.
To illustrate storing objects in the message body, we will modify our earlier program. As XML is becoming very popular as a format for exchanging data, we will store an XML object in the message body. Fortunately, the XML parser that comes standard on Windows 2000, Microsoft.XMLDOM, supports the IPersistStream interface. We can use this knowledge to our benefit and pack our parameters into XML.
For the demonstration, I will load the XML tree from a file. The file format and the implementation for loading the file are shown here:

    // File MyInput.xml
    <Root>
      <First>2</First>
      <Second>5</Second>
    </Root>

    // Function to create XML document
    MSXML::IXMLDOMDocumentPtr CreateXMLDOcument()
    {
        MSXML::IXMLDOMDocumentPtr spDoc(__uuidof(MSXML::DOMDocument));
        spDoc->load("MyInput.xml");
        return spDoc;
    }
When the message body is fetched in the listener application, MSMQ creates a new instance of the XML object and initializes the object (by calling the Load method of IPersistStream). The application can read this XML tree and compute the product of the two numbers. The implementation is shown in the following code:
    int ComputeProduct(MSXML::IXMLDOMDocumentPtr spDoc)
    {
        MSXML::IXMLDOMElementPtr spRoot = spDoc->firstChild;

        MSXML::IXMLDOMElementPtr spFirstChild =
            spRoot->selectSingleNode("First");
        long nVal1 = spFirstChild->nodeTypedValue;

        MSXML::IXMLDOMElementPtr spSecondChild =
            spRoot->selectSingleNode("Second");
        long nVal2 = spSecondChild->nodeTypedValue;

        return nVal1 * nVal2;
    }
The following code snippet shows the relevant code for the sender and the listener applications:
    // Sender
    ...
    // Step 4: Set message-information properties
    IMSMQMessagePtr spQMsg("MSMQ.MSMQMessage");
    spQMsg->Label = "Need product of two numbers";
    spQMsg->Body = _variant_t(static_cast<IUnknown*>(CreateXMLDOcument()));
    spQMsg->Delivery = MQMSG_DELIVERY_RECOVERABLE;
    spQMsg->ResponseQueueInfo = spResponseQInfo;
    ...

    // Listener
    // Step 4: Process the message body
    int nProduct = ComputeProduct(spRMsg->Body);
    ...

The current implementation of the XML parser that ships with Windows 2000 has a bug that causes MSMQ to fail when it tries to recreate the XML document from the message body. Microsoft is investigating this problem.
As you can see, passing persistable objects is quite easy. One thing to keep in mind though: as a new instance of the class is created on the listener's machine, the DLL server that implements the class should also be registered on this machine.
For an example of passing an ADO recordset into the message body, see Ted Pattison's article, "Using Visual Basic to Integrate MSMQ into Your Distributed Applications" [Pat-99]. The article also gives an excellent introduction to using VB with MSMQ.
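Why the class must be registered on the listener's machine can be seen in a toy model of the round trip. The sketch assumes that the message body carries a class identifier next to the serialized bytes and that the receiver looks that identifier up before creating a fresh instance. Persistable, Operands, Registry, PackBody, and UnpackBody are all hypothetical; they stand in for IPersistStream, a persistable COM class, and the system registry.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a persistable COM object (compare IPersistStream).
struct Persistable {
    virtual ~Persistable() {}
    virtual std::string ClassId() const = 0;
    virtual void Save(std::ostream& out) const = 0;
    virtual void Load(std::istream& in) = 0;
};

struct Operands : Persistable {
    long first;
    long second;
    Operands() : first(0), second(0) {}
    std::string ClassId() const override { return "Toy.Operands"; }
    void Save(std::ostream& out) const override { out << first << ' ' << second; }
    void Load(std::istream& in) override { in >> first >> second; }
};

// Plays the role of the registry on the listener's machine.
typedef std::function<std::unique_ptr<Persistable>()> Factory;
typedef std::map<std::string, Factory> Registry;

// Sender side: write the class identifier, then the object's own state.
std::string PackBody(const Persistable& obj) {
    std::ostringstream out;
    out << obj.ClassId() << '\n';
    obj.Save(out);
    return out.str();
}

// Listener side: create a new instance of the named class and load its state.
std::unique_ptr<Persistable> UnpackBody(const std::string& body,
                                        const Registry& registry) {
    std::istringstream in(body);
    std::string classId;
    std::getline(in, classId);
    Registry::const_iterator it = registry.find(classId);
    if (it == registry.end()) {
        throw std::runtime_error("class not registered: " + classId);
    }
    std::unique_ptr<Persistable> obj = it->second();
    obj->Load(in);
    return obj;
}
```

With an empty Registry, UnpackBody throws, which is the toy equivalent of the listener failing because the implementing DLL was never registered there.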
Earlier in the section, we talked about the possibility of losing express messages. The suggested workaround is to use a guaranteed message by setting the Delivery property of the message to MQMSG_DELIVERY_RECOVERABLE. This ensures that the message will be delivered to the queue, at the expense of performance, however.
Even if a message is marked as recoverable, there is one more potential problem: the message could be delivered to the destination queue more than once. This could happen, for example, if the queue manager on the sender machine does not receive an acknowledgement from the queue manager on the receiving machine within a specified time. Assuming that the message is lost, the queue manager resends the message. Consequently, the listener application would process the same request multiple times.
If processing the same message multiple times is not desirable, there is yet another level of reliability that you can add: use a transactional queue.
A transactional queue guarantees two things:
Messages are not lost or duplicated, and
Messages inside a single transaction get delivered in the order they were sent.
If this is all that you need, and if you don't have to coordinate the transaction with any other type of resource manager (such as an SQL Server database), MSMQ offers its own internal mechanism for handling a transaction.
The transactional attribute has to be set on the queue at the time of creation. It cannot be changed later.
MSMQ is also capable of participating in an external transaction coordinated by the DTC (DTC is covered in Chapter 8). For example, you can write a transaction that receives a request message, modifies an SQL Server database, and sends a response message. Because all three operations are part of a single transaction, the DTC enforces the ACID rules on all three operations.
The internal transaction mechanism does not use the DTC. Instead, it uses a more efficient protocol tuned for transactional messaging. Consequently, internal transactions are faster than externally coordinated transactions.
Let's extend the listener code in the earlier example to receive a request message and send two response messages, all being part of one internal transaction. The following is the revised code snippet:

    // Step 3: Start the transaction
    IMSMQTransactionDispenserPtr spTD("MSMQ.MSMQTransactionDispenser");
    IMSMQTransactionPtr spTransaction = spTD->BeginTransaction();
    _variant_t vtTransaction = static_cast<IDispatch*>(spTransaction);

    // Step 4: Attempt to receive a message (with one second timeout)
    _variant_t vtReceiveTimeout = 1000L;
    IMSMQMessagePtr spRMsg = spQRec->Receive(&vtTransaction, &vtMissing,
        &vtMissing, &vtReceiveTimeout);
    if (NULL == spRMsg) {
        ::MessageBox(NULL, _T("No messages found"),
            _T("Test Receive Message"), MB_OK);
        return;
    }

    // Step 5: Process the message body
    int nProduct = ComputeProduct(spRMsg->Body);

    // Step 6: Prepare two response messages
    IMSMQMessagePtr spResponseMsg1("MSMQ.MSMQMessage");
    spResponseMsg1->Label = "Returned value 1";
    spResponseMsg1->Body = (long) nProduct;
    spResponseMsg1->CorrelationId = spRMsg->Id;

    IMSMQMessagePtr spResponseMsg2("MSMQ.MSMQMessage");
    spResponseMsg2->Label = "Returned value 2";
    spResponseMsg2->Body = (long) nProduct;
    spResponseMsg2->CorrelationId = spRMsg->Id;

    // Step 7: Open the response queue
    IMSMQQueuePtr spResponseQ =
        spRMsg->ResponseQueueInfo->Open(MQ_SEND_ACCESS, MQ_DENY_NONE);

    // Step 8: Send the responses
    spResponseMsg1->Send(spResponseQ, &vtTransaction);
    spResponseMsg2->Send(spResponseQ, &vtTransaction);

    // Step 9: Commit the transaction
    spTransaction->Commit();

To start an internal transaction, you must create a new MSMQTransactionDispenser object and invoke its BeginTransaction method. A call to BeginTransaction returns an MSMQTransaction object. You can pass a reference to this MSMQTransaction object in your message's Send or Receive operations. The transaction is completed when the Commit method is called on the MSMQTransaction object.

MSMQ requires that the pointer to the IMSMQTransaction be passed as a VARIANT to an interface method that needs it as a parameter. This VARIANT must be explicitly set to the VT_DISPATCH type (not VT_UNKNOWN). In the above code snippet, if spTransaction is cast to IUnknown* instead of IDispatch*, as shown below, MSMQ will fail with a parameter-type-mismatch error.

    _variant_t vtTransaction = static_cast<IUnknown*>(spTransaction);
If all that is needed is to send just one message as part of an internal transaction, MSMQ provides a shortcut. You can pass MQ_SINGLE_MESSAGE when you call Send, as shown here:
    _variant_t vtTransaction = (long) MQ_SINGLE_MESSAGE;
    spQMsg->Send(spQSend, &vtTransaction);

Here's one thing to keep in mind while using internal transactions: in order to obtain a higher level of concurrency, MSMQ runs at a lower isolation level than those of other resource managers such as SQL Server. This may result in some oddities. For example, let's say two transactions are being run by two different listener applications. If transaction A receives the message from the queue, transaction B will see the queue as empty. If transaction A aborts later, the message will be written back to the queue. The queue was really never empty. Transaction B saw the queue as empty because it read the queue in an uncommitted state.
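The oddity is easier to see in a toy model of a queue whose transactional receive hides a message from everyone the moment it is read. This is a sketch of the behavior described above, not of how MSMQ is implemented. ToyTxQueue and its members are hypothetical, and transactions are identified by plain integers.

```cpp
#include <algorithm>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical queue with uncommitted-read behavior; not the MSMQ API.
class ToyTxQueue {
public:
    void Put(std::string body) { visible_.push_back(std::move(body)); }

    // What any other transaction observes right now.
    bool LooksEmpty() const { return visible_.empty(); }

    // Receive inside transaction txId: the message vanishes for all readers,
    // but is remembered until txId commits or aborts.
    bool Receive(int txId, std::string& body) {
        if (visible_.empty()) {
            return false;
        }
        body = visible_.front();
        visible_.pop_front();
        held_.push_back(std::make_pair(txId, body));
        return true;
    }

    // Commit: the held messages are gone for good.
    void Commit(int txId) { Forget(txId); }

    // Abort: the held messages go back to the head of the queue, in order.
    void Abort(int txId) {
        for (Held::reverse_iterator it = held_.rbegin(); it != held_.rend(); ++it) {
            if (it->first == txId) {
                visible_.push_front(it->second);
            }
        }
        Forget(txId);
    }

private:
    typedef std::vector<std::pair<int, std::string> > Held;

    void Forget(int txId) {
        held_.erase(std::remove_if(held_.begin(), held_.end(),
                        [txId](const std::pair<int, std::string>& h) {
                            return h.first == txId;
                        }),
                    held_.end());
    }

    std::deque<std::string> visible_;
    Held held_;
};
```

Between the Receive by transaction A and its Abort, LooksEmpty returns true for transaction B even though the message is never actually consumed.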
MSMQ provides a component for creating DTC-based transactions called the MSMQCoordinatedTransactionDispenser. Similar to the MSMQTransactionDispenser component that we saw earlier, this component exposes a single method, BeginTransaction, which returns a transaction object. In fact, in most cases, these two components can be interchanged.
The coordinated dispenser lets you enlist other resource managers, such as SQL Server, into a transaction. Consequently, updating a database and sending a response message, for example, can be handled as one transaction. If any of the operations fail, the whole transaction fails.
Though we can use the MSMQCoordinatedTransactionDispenser component for creating an external transaction, let's take the benefit of COM+ services instead and simplify our code.
Recall the stock trader example that we used in the previous chapter. Once shares of a stock are bought on a client's behalf, the trade-management component requests the account-management component (synchronously) to debit the client's account by a certain amount.
It is possible that the account-management component may reside on a different machine. If there is a network outage and the machine is not reachable, the whole transaction would fail and the shares just bought would have to be returned.
Using MSMQ would be perfect for such a situation. Instead of making a DCOM method call, the trade-management component would send a message to a queue. The account-management component would listen on this queue. If a new message becomes available, it will carry out the following actions:
Retrieve the message from the queue.
Update the database, if sufficient funds are available in the account.
Send a response message back to the sender.
If any of these three actions fails, the transaction should be aborted. The database should be restored to its initial state and the message should go back into the queue it came from.
To demonstrate MSMQ external transaction support, I will create a COM component that supports an interface method called Listen. The following code snippet shows the implementation of this method:
    STDMETHODIMP CMyListener::Listen()
    {
        // Get the context state
        CComPtr<IContextState> spState;
        HRESULT hr = ::CoGetObjectContext(__uuidof(IContextState),
            (void**) &spState);
        if (FAILED(hr)) {
            return hr;
        }

        // Listen for and retrieve a message
        try {
            hr = ListenEx();
            if (FAILED(hr)) {
                spState->SetMyTransactionVote(TxAbort);
                return hr;
            }
        } catch(_com_error& e) {
            spState->SetMyTransactionVote(TxAbort);
            return Error((LPWSTR) e.Description(),
                __uuidof(IMyListener), e.Error());
        }

        // Commit the transaction
        spState->SetMyTransactionVote(TxCommit);
        return S_OK;
    }

    HRESULT CMyListener::ListenEx()
    {
        // Step 1: Set properties on queue-information object
        IMSMQQueueInfoPtr spQInfo(__uuidof(MSMQQueueInfo));
        spQInfo->FormatName = MY_TRANS_QUEUE_PATHNAME;

        // Step 2: Open the queue for receive operation
        IMSMQQueuePtr spQRec = spQInfo->Open(MQ_RECEIVE_ACCESS, MQ_DENY_NONE);

        // Step 3: Attempt to receive a message within one second
        _variant_t vtTransaction = (long) MQ_MTS_TRANSACTION;
        _variant_t vtReceiveTimeout = 1000L;
        IMSMQMessagePtr spRMsg = spQRec->Receive(&vtTransaction, &vtMissing,
            &vtMissing, &vtReceiveTimeout);
        if (NULL == spRMsg) {
            return Error(OLESTR("No messages found"),
                __uuidof(IMyListener), E_FAIL);
        }

        // Step 4: Process the message body
        _bstr_t bsClient;
        long lAmount;
        ParseXMLInput(spRMsg->Body, bsClient, lAmount);

        // Step 5: Open and update the database
        ADOConnectionPtr spConn = OpenAccountsDB();
        long lCurrentBalance = GetBalance(spConn, bsClient);
        if (lCurrentBalance < lAmount) {
            return Error(_T("Not enough balance"),
                __uuidof(IMyListener), E_FAIL);
        }
        long lNewBalance = lCurrentBalance - lAmount;
        UpdateBalance(spConn, bsClient, lNewBalance);

        // Step 6: Prepare a response message
        IMSMQMessagePtr spResponseMsg(__uuidof(MSMQMessage));
        spResponseMsg->Label = "Updated the database";
        spResponseMsg->Body = VARIANT_TRUE;
        spResponseMsg->CorrelationId = spRMsg->Id;

        // Step 7: Open the response queue
        IMSMQQueuePtr spResponseQ =
            spRMsg->ResponseQueueInfo->Open(MQ_SEND_ACCESS, MQ_DENY_NONE);

        // Step 8: Send the response
        spResponseMsg->Send(spResponseQ, &vtTransaction);

        return S_OK;
    }
A significant portion of the code came from our example in the previous chapter; the rest came from our previous receive-message example.
The code is rather straightforward. The main method, Listen, either commits or aborts the transaction based on the outcome of method ListenEx. Method ListenEx receives the message, updates the database, and sends a response message back.
The only part that is somewhat different is that the transaction parameter passed to Send and Receive is set to MQ_MTS_TRANSACTION. This just informs these methods that they are being called as part of an external transaction.
It should be noted that a message that is sent as part of a transaction is not transmitted while the transaction is active. If the transaction aborts, the message has to be recalled as part of the rollback process. However, a message being transmitted is like an arrow leaving the bow; there is no way to recall it. Therefore, COM+ will not send the message until the transaction commits successfully.
After sending a message as part of a transaction, do not wait for a response message within the transaction. You will not get the response message while the transaction is active.
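The deferred transmission described above can be modeled in a few lines. The sketch below is only a picture of the idea, under the assumption that a transaction buffers its outgoing messages and releases them at commit time. ToyQueue and ToyTransaction are hypothetical names and say nothing about how MSMQ or COM+ implement the behavior internally.

```cpp
#include <deque>
#include <string>
#include <utility>
#include <vector>

typedef std::deque<std::string> ToyQueue;

// Hypothetical transaction that holds sends back until it commits.
class ToyTransaction {
public:
    // Record the send; nothing reaches the destination queue yet.
    void Send(ToyQueue& queue, std::string body) {
        pending_.push_back(std::make_pair(&queue, std::move(body)));
    }

    // Release the buffered messages in the order they were sent.
    void Commit() {
        for (std::size_t i = 0; i < pending_.size(); ++i) {
            pending_[i].first->push_back(std::move(pending_[i].second));
        }
        pending_.clear();
    }

    // Drop the buffered messages. Nothing was transmitted, so there is
    // nothing to recall.
    void Abort() { pending_.clear(); }

private:
    std::vector<std::pair<ToyQueue*, std::string> > pending_;
};
```

A listener waiting on the destination queue sees nothing until Commit runs. This is why waiting for a reply inside the same transaction that sent the request cannot succeed.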
At this point, we should be reasonably comfortable with MSMQ's programming model.
Though MSMQ COM components offer a rich set of features, one has to be careful with some nasty details during implementation. Moreover, programming with MSMQ (or any other queuing architecture) requires a different frame of mind than what we are generally used to. Wouldn't it be nice if someone or something could hide the nasty details of MSMQ programming from you, but still let you enjoy the benefits of MSMQ?
The folks on Microsoft's COM+ design team made this magic happen by providing a service called Queued Components (QC).