MSMQ COM Components

MSMQ provides ten COM components that offer the same functionality as the MSMQ C-language APIs, including support for queue lookup, queue management, message management, queue administration, and transactions. Table 9.2 lists the components along with their functionality.

Table 9.2. MSMQ COM Components

Component                             Functionality
------------------------------------  ------------------------------------------------------------
MSMQApplication                       Helps obtain a machine's identifier.
MSMQCoordinatedTransactionDispenser   Helps participate in a transaction.
MSMQEvent                             Event handler for messages arriving at one or more queues.
MSMQMessage                           Allows various properties to be set on a message and to send the message.
MSMQQuery                             Allows querying the directory service for existing public queues.
MSMQQueue                             Represents an open instance of an MSMQ queue. Helps traverse the messages in the open queue.
MSMQQueueInfo                         Helps queue management, for example, creating or opening a queue and changing a queue's properties.
MSMQQueueInfos                        Represents a set of queues, typically obtained via the MSMQQuery component.
MSMQTransaction                       Represents a transaction object. Provides methods for committing or terminating the transaction.
MSMQTransactionDispenser              Helps create a new MSMQ internal transaction object.

All MSMQ components support IDispatch-based interfaces. This enables them to be used not just from C/C++ programming languages, but also from Visual Basic, ActiveX Scripting environments (such as IE and IIS/ASP), and Java applets.

If you are using Visual C++ for developing MSMQ-based applications, you can use the import feature provided by the compiler as shown below:

 #import "mqoa.dll" no_namespace 

This will enable you to develop code in an easy-to-use syntax similar to that of Visual Basic.


Import mqoa.dll in your Visual C++ source code for developing MSMQ applications.


Now let's build our knowledge of the MSMQ COM components by starting with a simple example.

Sending and Receiving Messages

The following code snippet shows how to send a text string as a message:

 #define MY_QUEUE_PATHNAME "ROS84157LAP\\MyPubQueue"

 void SendMessage()
 {
   // Step 1: Set properties on queue-information object
   IMSMQQueueInfoPtr spQInfo("MSMQ.MSMQQueueInfo");
   spQInfo->PathName = MY_QUEUE_PATHNAME;

   // Step 2: Open the queue for send operation
   IMSMQQueuePtr spQSend =
     spQInfo->Open(MQ_SEND_ACCESS, MQ_DENY_NONE);

   // Step 3: Set message-information properties
   IMSMQMessagePtr spQMsg("MSMQ.MSMQMessage");
   spQMsg->Label = "Test Message";
   spQMsg->Body = "This is my test message";

   // Step 4: Send the message on the queue
   spQMsg->Send(spQSend);

   // Step 5: Close the queue
   spQSend->Close();

   ::MessageBox(NULL, _T("Message Sent"),
     _T("Test Send Message"), MB_OK);
 }

The first step is to create an MSMQQueueInfo object and set various properties on it.

A queue has over a dozen properties: creation and modification times, pathname, label, instance properties, and more. The SDK contains more information on each of these properties. The above code uses just one property, PathName, to identify the pathname of the queue.

The second step is to open the queue. The first parameter specifies how an application accesses the queue. We use MQ_SEND_ACCESS to inform MSMQ that the queue needs to be opened for sending messages. The second parameter specifies who can access the queue. For sending messages, the only valid option is MQ_DENY_NONE, which indicates that the queue is available to everyone.

If the queue opens successfully, the Open method returns a pointer to the IMSMQQueue interface.

The third step is to create an MSMQMessage object and set various properties on it.

A message has almost 50 properties that provide a great deal of control over how a message is sent and handled en route, how it should be encrypted, the identity of the queue to respond to, the priority of the message, and many other useful features.

The body of the message is one such property (and the most important one). Its type is a variant, thus handling a wide variety of data types.

A message can also be assigned a label, which is just a string that describes the message.

The fourth step is to send the message on the queue and the fifth step is to close the queue.

This is as simple as it gets to send a message.

Astute readers may be wondering how it is possible to open the queue without specifying the format name. It turns out that for a public queue, the MSMQ components automatically translate the pathname to the format name. In order to do so, however, the client machine must be able to access the directory service; that is, it has to be online.

If the format name is available before the queue is opened, then the independent client machine need not be online. To verify this, step one of our previous code can be modified to obtain the format name before opening the queue, as follows:

 void SendMessage()
 {
   // Step 1: Set properties on queue-information object
   IMSMQQueueInfoPtr spQInfo("MSMQ.MSMQQueueInfo");
   spQInfo->PathName = MY_QUEUE_PATHNAME;
   spQInfo->Refresh();
   ::MessageBox(NULL, spQInfo->FormatName, _T("Format Name"),
     MB_OK);
   ...
 }

Once the message box is displayed with the format name, you can unplug the network cable. The message will still go through, at least as far as the sender program is concerned. If you check the outgoing queues (from the Computer Services snap-in) of the local machine, you will notice that the message is sitting in the outgoing queue. Once the network cable is plugged back in, the local queue manager will forward the message to the destination queue.

The receiving program code is as simple as the sending code:

 void ReceiveMessage()
 {
   // Step 1: Set properties on queue-information object
   IMSMQQueueInfoPtr spQInfo("MSMQ.MSMQQueueInfo");
   spQInfo->PathName = MY_QUEUE_PATHNAME;

   // Step 2: Open the queue for receive operation
   IMSMQQueuePtr spQRec =
     spQInfo->Open(MQ_RECEIVE_ACCESS, MQ_DENY_NONE);

   // Step 3: Attempt to receive a message
   // (with one second timeout)
   _variant_t vtReceiveTimeout = 1000L;
   IMSMQMessagePtr spRMsg = spQRec->Receive(&vtMissing,
     &vtMissing, &vtMissing, &vtReceiveTimeout);

   // Step 4: Retrieve the message body and label
   if (NULL != spRMsg) {
     _bstr_t bsBody = spRMsg->Body;
     _bstr_t bsLabel = spRMsg->Label;
     ::MessageBox(NULL, bsBody, bsLabel, MB_OK);
   } else {
     ::MessageBox(NULL, _T("No messages found"),
       _T("Test Receive Message"), MB_OK);
   }

   // Step 5: Close the queue
   spQRec->Close();
 }

Step one is similar to that of the sending program.

Step two specifies opening the queue for receiving messages by setting the first parameter to MQ_RECEIVE_ACCESS. The second parameter (the sharing mode) can be set as before to MQ_DENY_NONE. In this case, other listener applications may contend to retrieve messages from the queue. To exclusively retrieve the messages from the queue, the sharing mode can be set to MQ_DENY_RECEIVE_SHARE.

The third step is to receive the message. If no message is available in the queue, the Receive method waits up to the specified timeout interval for one to arrive. If the timeout interval expires, the method call returns a NULL value for the message.

The fourth parameter to the Receive call controls the timeout interval. The code in the last example shows how to set a timeout value of 1000 milliseconds. If you do not want to wait at all, you can use a timeout value of 0. A value of -1 indicates that you want to wait indefinitely. This is also the default value if the parameter is not specified. [2]

[2] Any parameter that is marked as optional in the IDL file can be specified as vtMissing under the Visual C++ native COM support.
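The three timeout cases described above (0, a positive interval, and -1) can be illustrated without MSMQ at all. The following is only a minimal sketch in standard C++; DemoQueue, kWaitForever, and DemoReceiveTimeouts are hypothetical names invented for this illustration and are not part of the MSMQ COM components.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical value mirroring the "wait indefinitely" timeout (-1).
const long kWaitForever = -1;

// Hypothetical in-memory stand-in for a queue; this is not an MSMQ API.
class DemoQueue {
public:
    void Send(std::string body) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            messages_.push_back(std::move(body));
        }
        ready_.notify_one();
    }

    // timeoutMs == 0            : return at once if the queue is empty
    // timeoutMs  > 0            : wait up to that many milliseconds
    // timeoutMs == kWaitForever : block until a message arrives
    // Returns false on timeout (the analogue of a NULL message).
    bool Receive(long timeoutMs, std::string& body) {
        assert(timeoutMs >= kWaitForever);
        std::unique_lock<std::mutex> lock(mutex_);
        auto hasMessage = [this] { return !messages_.empty(); };
        if (timeoutMs == kWaitForever) {
            ready_.wait(lock, hasMessage);
        } else if (!ready_.wait_for(lock,
                       std::chrono::milliseconds(timeoutMs), hasMessage)) {
            return false;
        }
        body = std::move(messages_.front());
        messages_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::string> messages_;
};

// Usage: an empty queue with a zero timeout fails immediately; once a
// message is present, Receive returns it without waiting.
void DemoReceiveTimeouts() {
    DemoQueue queue;
    std::string body;
    assert(!queue.Receive(0, body));
    queue.Send("This is my test message");
    assert(queue.Receive(1000, body));
    assert(body == "This is my test message");
}
```

As with the real Receive call, the caller's thread is blocked for the duration of the wait, which is why an indefinite wait deserves some care.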

Step four displays the message and step five closes the queue.

The above example code receives messages synchronously. The Receive method call is blocked until a message arrives on the queue or a timeout occurs. While this style of coding allows you to process messages as they arrive, it also holds the calling thread hostage. An alternative technique is to use the MSMQ events mechanism. As messages arrive on the queue, MSMQ raises a notification. The application can then respond to the notification and retrieve the message. This style of asynchronous programming is discussed in the next chapter.

Guaranteed Delivery

By default, messages are designated as express messages. An express message is stored in memory until it can be delivered. Since express messages are not written to disk, they offer extremely high-speed communications. However, if the machine shuts down unexpectedly, an express message is lost.

A message can be marked as recoverable. Recoverable messages are stored in a backup file at each intermediate queue until delivered to the destination queue. Thus, such messages are not lost if the machine or the queue manager crashes. However, you gain this failure protection at the expense of communication speed.

To set a message as recoverable, the Delivery property on the message should be specified as MQMSG_DELIVERY_RECOVERABLE. Obviously, this has to be done before the message is sent. The following code snippet illustrates the logic:

 // Step 3: Set message-information properties
 IMSMQMessagePtr spQMsg("MSMQ.MSMQMessage");
 spQMsg->Label = "Test Message";
 spQMsg->Body = "This is my test message";
 spQMsg->Delivery = MQMSG_DELIVERY_RECOVERABLE;

Responding to a Message

When a client application makes a method call on a remote object, it is often desirable that it gets back some return value from the remote object. This is not a problem when DCOM/RPC is used. A method call gets blocked until the remote method is executed, and the requested values are returned as output parameters to the method call. However, when MSMQ is used, the input data is sent as a message to a queue but no output data can be obtained immediately because of the asynchronous nature of the communication.

MSMQ's solution is to ask the server to send the response back as another message.

Which queue should the response message be sent to?

When sending a message, the client application has to supply the response queue information as one of the properties, ResponseQueueInfo, of the message.

The following code snippet is a modified version of the earlier example. Here, the client is interested in getting the product of two numbers. The numbers are packed as one string.

 // Step 3: Set response queue information
 IMSMQQueueInfoPtr spResponseQInfo("MSMQ.MSMQQueueInfo");
 spResponseQInfo->PathName = MY_RESPONSE_QUEUE_PATHNAME;

 // Step 4: Set message-information properties
 IMSMQMessagePtr spQMsg("MSMQ.MSMQMessage");
 spQMsg->Label = "Need product of two numbers";
 spQMsg->Body = "2 5";
 spQMsg->Delivery = MQMSG_DELIVERY_RECOVERABLE;
 spQMsg->ResponseQueueInfo = spResponseQInfo;

The response queue for the example was created as a private queue.

 #define MY_RESPONSE_QUEUE_PATHNAME ".\\Private$\\MyResponseQueue" 


A private queue is a perfect candidate for receiving response messages; there is no need for lookup in the directory service, thereby saving processing cycles, networking bandwidth, and disk space.


Sending Objects in the Message Body

In the previous example, we had to pass two parameters into the message body. This was accomplished by packing the parameters as one string. The listener application had to parse the string to obtain the parameters.
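The packing and parsing just described might look like the following minimal sketch. It uses only the standard C++ library; PackOperands, ParseOperands, and DemoPackAndParse are hypothetical helper names chosen for this illustration, and the space-separated layout is simply the one used in the "2 5" example.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical helper for the sender: packs two numbers into one string.
std::string PackOperands(long first, long second) {
    std::ostringstream out;
    out << first << ' ' << second;
    return out.str();
}

// Hypothetical helper for the listener: extracts the two numbers.
// Returns false if the body does not hold exactly two integers.
bool ParseOperands(const std::string& body, long& first, long& second) {
    std::istringstream in(body);
    if (!(in >> first >> second)) {
        return false;
    }
    std::string extra;
    return !(in >> extra);  // reject any trailing token
}

// Usage: the listener recovers the operands and computes the product.
void DemoPackAndParse() {
    long first = 0;
    long second = 0;
    assert(ParseOperands(PackOperands(2, 5), first, second));
    assert(first * second == 10);
}
```

Even in this small form, the listener has to validate the string before trusting it, which is the kind of bookkeeping that grows with the number of parameters.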

Though packing parameters as one string and passing the string as a message is not that inconvenient programmatically, MSMQ offers a better solution: it lets you store a COM object inside the message body. The object can hold any number of parameters as needed.

Not all objects can be stored in the message body. MSMQ requires that an object support one of the two standard interfaces that are needed for persistence: IPersistStream or IPersistStorage. MSMQ can then serialize such an object into and out of a message body.

To illustrate storing objects in the message body, we will modify our earlier program. As XML is becoming very popular as a format for exchanging data, we will store an XML object in the message body. Fortunately, the XML parser that comes standard on Windows 2000, Microsoft.XMLDOM, supports the IPersistStream interface. We can use this knowledge to our benefit and pack our parameters into XML.

For the demonstration, I will load the XML tree from a file. The file format and the implementation for loading the file are shown here:

 // File MyInput.xml
 <Root>
   <First>2</First>
   <Second>5</Second>
 </Root>

 // Function to create XML document
 MSXML::IXMLDOMDocumentPtr CreateXMLDOcument()
 {
   MSXML::IXMLDOMDocumentPtr spDoc(__uuidof(MSXML::DOMDocument));
   spDoc->load("MyInput.xml");
   return spDoc;
 }

When the message body is fetched in the listener application, MSMQ creates a new instance of the XML object and initializes the object (by calling the Load method of IPersistStream). The application can read this XML tree and compute the product of the two numbers. The implementation is shown in the following code:

 int ComputeProduct(MSXML::IXMLDOMDocumentPtr spDoc)
 {
   MSXML::IXMLDOMElementPtr spRoot = spDoc->firstChild;

   MSXML::IXMLDOMElementPtr spFirstChild =
     spRoot->selectSingleNode("First");
   long nVal1 = spFirstChild->nodeTypedValue;

   MSXML::IXMLDOMElementPtr spSecondChild =
     spRoot->selectSingleNode("Second");
   long nVal2 = spSecondChild->nodeTypedValue;

   return nVal1 * nVal2;
 }

The following code snippet shows the relevant code for the sender and the listener applications:

 // Sender
 ...
 // Step 4: Set message-information properties
 IMSMQMessagePtr spQMsg("MSMQ.MSMQMessage");
 spQMsg->Label = "Need product of two numbers";
 spQMsg->Body =
   _variant_t(static_cast<IUnknown*>(CreateXMLDOcument()));
 spQMsg->Delivery = MQMSG_DELIVERY_RECOVERABLE;
 spQMsg->ResponseQueueInfo = spResponseQInfo;
 ...

 // Listener
 // Step 4: Process the message body
 int nProduct = ComputeProduct(spRMsg->Body);
 ...


The current implementation of the XML parser that ships with Windows 2000 has a bug that causes MSMQ to fail when it tries to recreate the XML document from the message body. Microsoft is investigating this problem.


As you can see, passing persistable objects is quite easy. One thing to keep in mind, though: because a new instance of the class is created on the listener's machine, the DLL server that implements the class must also be registered on that machine.

For an example of passing an ADO recordset into the message body, see Ted Pattison's article, "Using Visual Basic to Integrate MSMQ into Your Distributed Applications" [Pat-99]. The article also gives an excellent introduction to using VB with MSMQ.

Transactions

Earlier in the section, we talked about the possibility of losing express messages. The suggested workaround is to use a guaranteed message by setting the Delivery property of the message to MQMSG_DELIVERY_RECOVERABLE. This ensures that the message will be delivered to the queue, at the expense of performance, however.

Even if a message is marked as recoverable, there is one more potential problem: the message could be delivered to the destination queue more than once. This could happen, for example, if the queue manager on the sender machine does not receive an acknowledgement from the queue manager on the receiving machine within a specified time. Assuming that the message is lost, the queue manager resends the message. Consequently, the listener application would process the same request multiple times.

If processing the same message multiple times is not desirable, there is yet another level of reliability that you can add: use a transactional queue.

A transactional queue guarantees two things:

  1. Messages are not lost or duplicated, and

  2. Messages inside a single transaction get delivered in the order they were sent.

If this is all that you need, and if you don't have to coordinate the transaction with any other type of resource manager (such as an SQL Server database), MSMQ offers its own internal mechanism for handling a transaction.


The transactional attribute has to be set on the queue at the time of creation. It cannot be changed later.


MSMQ is also capable of participating in an external transaction coordinated by the DTC (DTC is covered in Chapter 8). For example, you can write a transaction that receives a request message, modifies an SQL Server database, and sends a response message. Because all three operations are part of a single transaction, the DTC enforces the ACID rules on all three operations.

The internal transaction mechanism does not use the DTC. Instead, it uses a more efficient protocol tuned for transactional messaging. Consequently, internal transactions are faster than externally coordinated transactions.

Internal Transaction

Let's extend the listener code in the earlier example to receive a request message and send two response messages, all being part of one internal transaction. The following is the revised code snippet:

 // Step 3: Start the transaction
 IMSMQTransactionDispenserPtr spTD("MSMQ.MSMQTransactionDispenser");
 IMSMQTransactionPtr spTransaction = spTD->BeginTransaction();
 _variant_t vtTransaction =
   static_cast<IDispatch*>(spTransaction);

 // Step 4: Attempt to receive a message (with one second timeout)
 _variant_t vtReceiveTimeout = 1000L;
 IMSMQMessagePtr spRMsg = spQRec->Receive(&vtTransaction,
   &vtMissing, &vtMissing, &vtReceiveTimeout);
 if (NULL == spRMsg) {
   ::MessageBox(NULL, _T("No messages found"),
     _T("Test Receive Message"), MB_OK);
   return;
 }

 // Step 5: Process the message body
 int nProduct = ComputeProduct(spRMsg->Body);

 // Step 6: Prepare two response messages
 IMSMQMessagePtr spResponseMsg1("MSMQ.MSMQMessage");
 spResponseMsg1->Label = "Returned value 1";
 spResponseMsg1->Body = (long) nProduct;
 spResponseMsg1->CorrelationId = spRMsg->Id;

 IMSMQMessagePtr spResponseMsg2("MSMQ.MSMQMessage");
 spResponseMsg2->Label = "Returned value 2";
 spResponseMsg2->Body = (long) nProduct;
 spResponseMsg2->CorrelationId = spRMsg->Id;

 // Step 7: Open the response queue
 IMSMQQueuePtr spResponseQ =
   spRMsg->ResponseQueueInfo->Open(MQ_SEND_ACCESS, MQ_DENY_NONE);

 // Step 8: Send the responses
 spResponseMsg1->Send(spResponseQ, &vtTransaction);
 spResponseMsg2->Send(spResponseQ, &vtTransaction);

 // Step 9: Commit the transaction
 spTransaction->Commit();

To start an internal transaction, you must create a new MSMQTransactionDispenser object and invoke its BeginTransaction method. A call to BeginTransaction returns an MSMQTransaction object. You can pass a reference to this MSMQTransaction object in your message's Send or Receive operations. The transaction is completed when the Commit method is called on the MSMQTransaction object.


MSMQ requires that the pointer to the IMSMQTransaction be passed as a VARIANT to an interface method that needs it as a parameter. This VARIANT must be explicitly set to the VT_DISPATCH type (not VT_UNKNOWN). In the above code snippet, if spTransaction is cast to IUnknown* instead of IDispatch*, as shown below, MSMQ will fail with a parameter-type-mismatch error.

 _variant_t vtTransaction =
   static_cast<IUnknown*>(spTransaction);


If all that is needed is to send just one message as part of an internal transaction, MSMQ provides a shortcut. You can pass MQ_SINGLE_MESSAGE when you call Send, as shown here:

 _variant_t vtTransaction = (long) MQ_SINGLE_MESSAGE;
 spQMsg->Send(spQSend, &vtTransaction);


Here's one thing to keep in mind while using internal transactions: in order to obtain a higher level of concurrency, MSMQ runs at a lower isolation level than those of other resource managers such as SQL Server. This may result in some oddities. For example, let's say two transactions are being run by two different listener applications. If transaction A receives the message from the queue, transaction B will see the queue as empty. If transaction A aborts later, the message will be written back to the queue. The queue was really never empty. Transaction B saw the queue as empty because it read the queue in an uncommitted state.
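The oddity described above can be reproduced with a toy model. The sketch below is standard C++ and makes no claim about how MSMQ is implemented internally; DemoTransactionalQueue and its methods are hypothetical names, and transactions are identified by plain integers purely for illustration.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <vector>

// Hypothetical model of a queue read at a low isolation level.
class DemoTransactionalQueue {
public:
    void Put(const std::string& body) { visible_.push_back(body); }

    // Removes the front message on behalf of transaction txId.
    // Returns false if no message is currently visible.
    bool Receive(int txId, std::string& body) {
        if (visible_.empty()) {
            return false;
        }
        body = visible_.front();
        visible_.pop_front();
        pending_[txId].push_back(body);
        return true;
    }

    // Commit makes the removal permanent.
    void Commit(int txId) { pending_.erase(txId); }

    // Abort writes the transaction's messages back to the front of the queue.
    void Abort(int txId) {
        std::map<int, std::vector<std::string> >::iterator it =
            pending_.find(txId);
        if (it == pending_.end()) {
            return;
        }
        visible_.insert(visible_.begin(),
                        it->second.begin(), it->second.end());
        pending_.erase(it);
    }

    bool LooksEmpty() const { return visible_.empty(); }

private:
    std::deque<std::string> visible_;
    std::map<int, std::vector<std::string> > pending_;
};

// Usage: transaction 1 plays the role of A, transaction 2 the role of B.
void DemoIsolationOddity() {
    DemoTransactionalQueue queue;
    queue.Put("request");
    std::string seenByA, seenByB;
    assert(queue.Receive(1, seenByA));   // A takes the message
    assert(!queue.Receive(2, seenByB));  // B sees an "empty" queue
    queue.Abort(1);                      // A aborts; message is written back
    assert(queue.Receive(2, seenByB));   // now B can receive it
    assert(seenByB == "request");
}
```

A listener that treats an empty queue as proof that no work remains could therefore stop too early; retrying after a short delay is one way to tolerate this.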


External Transactions

MSMQ provides a component for creating DTC-based transactions called the MSMQCoordinatedTransactionDispenser. Similar to the MSMQTransactionDispenser component that we saw earlier, this component exposes a single method, BeginTransaction, which returns a transaction object. In fact, in most cases, these two components can be interchanged.

The coordinated dispenser lets you enlist other resource managers, such as SQL Server, into a transaction. Consequently, updating a database and sending a response message, for example, can be handled as one transaction. If any of the operations fail, the whole transaction fails.

Though we can use the MSMQCoordinatedTransactionDispenser component for creating an external transaction, let's take advantage of COM+ services instead and simplify our code.

Recall the stock trader example that we used in the previous chapter. Once shares of a stock are bought on a client's behalf, the trade-management component requests the account-management component (synchronously) to debit the client's account by a certain amount.

It is possible that the account-management component may reside on a different machine. If there is a network outage and the machine is not reachable, the whole transaction would fail and the shares just bought would have to be returned.

Using MSMQ would be perfect for such a situation. Instead of making a DCOM method call, the trade-management component would send a message to a queue. The account-management component would listen on this queue. If a new message becomes available, it will carry out the following actions:

  1. Retrieve the message from the queue.

  2. Update the database, if sufficient funds are available in the account.

  3. Send a response message back to the sender.

If any of these three actions fail, the transaction should be aborted. The database should be restored back to its initial state and the message should go back into the queue where it came from.

To demonstrate MSMQ external transaction support, I will create a COM component that supports an interface method called Listen. The following code snippet shows the implementation of this method:

 STDMETHODIMP CMyListener::Listen()
 {
   // Get the context state
   CComPtr<IContextState> spState;
   HRESULT hr = ::CoGetObjectContext(__uuidof(IContextState),
     (void**) &spState);
   if (FAILED(hr)) {
     return hr;
   }

   // Listen/retrieve a message
   try {
     hr = ListenEx();
     if (FAILED(hr)) {
       spState->SetMyTransactionVote(TxAbort);
       return hr;
     }
   } catch(_com_error& e) {
     spState->SetMyTransactionVote(TxAbort);
     return Error((LPWSTR) e.Description(),
       __uuidof(IMyListener), e.Error());
   }

   // Commit the transaction
   spState->SetMyTransactionVote(TxCommit);
   return S_OK;
 }

 HRESULT CMyListener::ListenEx()
 {
   // Step 1: Set properties on queue-information object
   IMSMQQueueInfoPtr spQInfo(__uuidof(MSMQQueueInfo));
   spQInfo->FormatName = MY_TRANS_QUEUE_PATHNAME;

   // Step 2: Open the queue for receive operation
   IMSMQQueuePtr spQRec = spQInfo->Open(MQ_RECEIVE_ACCESS,
     MQ_DENY_NONE);

   // Step 3: Attempt to receive a message within one second
   _variant_t vtTransaction = (long) MQ_MTS_TRANSACTION;
   _variant_t vtReceiveTimeout = 1000L;
   IMSMQMessagePtr spRMsg =
     spQRec->Receive(&vtTransaction, &vtMissing,
       &vtMissing, &vtReceiveTimeout);
   if (NULL == spRMsg) {
     return Error(OLESTR("No messages found"),
       __uuidof(IMyListener), E_FAIL);
   }

   // Step 4: Process the message body
   _bstr_t bsClient;
   long lAmount;
   ParseXMLInput(spRMsg->Body, bsClient, lAmount);

   // Step 5: Open and update the database
   ADOConnectionPtr spConn = OpenAccountsDB();
   long lCurrentBalance = GetBalance(spConn, bsClient);
   if (lCurrentBalance < lAmount) {
     return Error(_T("Not enough balance"),
       __uuidof(IMyListener), E_FAIL);
   }
   long lNewBalance = lCurrentBalance - lAmount;
   UpdateBalance(spConn, bsClient, lNewBalance);

   // Step 6: Prepare a response message
   IMSMQMessagePtr spResponseMsg(__uuidof(MSMQMessage));
   spResponseMsg->Label = "Updated the database";
   spResponseMsg->Body = VARIANT_TRUE;
   spResponseMsg->CorrelationId = spRMsg->Id;

   // Step 7: Open the response queue
   IMSMQQueuePtr spResponseQ =
     spRMsg->ResponseQueueInfo->Open(MQ_SEND_ACCESS,
       MQ_DENY_NONE);

   // Step 8: Send the response
   spResponseMsg->Send(spResponseQ, &vtTransaction);

   return S_OK;
 }

A significant portion of the code came from our example in the previous chapter; the rest came from our previous receive-message example.

The code is rather straightforward. The main method, Listen, either commits or aborts the transaction based on the outcome of method ListenEx. Method ListenEx receives the message, updates the database, and sends a response message back.

The only part that is somewhat different is that the transaction parameter passed to Send and Receive is set to MQ_MTS_TRANSACTION. This informs these methods that they are being called as part of an external transaction.

It should be noted that a message that is sent as part of a transaction is not transmitted while the transaction is active. If it were transmitted immediately and the transaction later aborted, the message would have to be recalled as part of the rollback process. However, a message being transmitted is like an arrow leaving the bow; there is no way to recall it. Therefore, the message is not sent until the transaction commits successfully.


After sending a message as part of a transaction, do not wait for a response message within the transaction. You will not get the response message while the transaction is active.
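The deferred-send behavior can be pictured with a small model in which sends are buffered and released only at commit time. This is a sketch under that assumption, written in standard C++; DemoSendTransaction and DemoDeferredSend are hypothetical names, and a std::vector stands in for the destination queue.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model: messages sent inside a transaction are held back
// and reach the destination only if the transaction commits.
class DemoSendTransaction {
public:
    explicit DemoSendTransaction(std::vector<std::string>& destination)
        : destination_(destination), finished_(false) {}

    void Send(const std::string& body) {
        assert(!finished_);  // no sends after commit or abort
        buffered_.push_back(body);
    }

    // Commit releases the buffered messages in the order they were sent.
    void Commit() {
        destination_.insert(destination_.end(),
                            buffered_.begin(), buffered_.end());
        buffered_.clear();
        finished_ = true;
    }

    // Abort discards the buffered messages; nothing needs to be recalled.
    void Abort() {
        buffered_.clear();
        finished_ = true;
    }

private:
    std::vector<std::string>& destination_;
    std::vector<std::string> buffered_;
    bool finished_;
};

// Usage: nothing is visible at the destination until Commit is called.
void DemoDeferredSend() {
    std::vector<std::string> responseQueue;
    DemoSendTransaction tx(responseQueue);
    tx.Send("Returned value 1");
    tx.Send("Returned value 2");
    assert(responseQueue.empty());
    tx.Commit();
    assert(responseQueue.size() == 2);
}
```

The model also shows why waiting for a reply inside the same transaction cannot work: the request never leaves the buffer until the commit, so the other side has nothing to answer.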


At this point, we should be reasonably comfortable with MSMQ's programming model.

Though MSMQ COM components offer a rich set of features, one has to be careful with some nasty details during implementation. Moreover, programming with MSMQ (or any other queuing architecture) requires a different frame of mind than what we are generally used to. Wouldn't it be nice if someone or something could hide the nasty details of MSMQ programming from you, but still let you enjoy the benefits of MSMQ?

The folks on the Microsoft COM+ design team made this magic happen by providing a service called Queued Components (QC).


COM+ Programming. A Practical Guide Using Visual C++ and ATL
ISBN: 130886742
EAN: N/A
Year: 2000
Pages: 129
