B.2. The Publish-Subscribe FrameworkThe source code available with this book contains a complete publish-subscribe example. I wanted not just to provide sample publish-subscribe services and clients, but also to provide a general-purpose framework that automates implementing such services and adding the support for any application. The first step in building the framework was to factor the publish-subscribe management interfaces, and provide separate contracts for transient and persistent subscriptions and for publishing.[*]
B.2.1. Managing Transient SubscriptionsFor managing transient subscriptions, I defined the ISubscriptionService interface shown in Example B-1. Example B-1. The ISubscriptionService interface manages transient subscribers
Note that ISubscriptionService does not identify the callback contract its implementing endpoint expects. Being a general-purpose interface, it is unaware of particular callback contracts. It is up to the using application to define those callback contracts. The callback interface is provided in the using application by deriving from ISubscriptionService and specifying the desired callback contract: interface IMyEvents { [OperationContract(IsOneWay = true)] void OnEvent1( ); [OperationContract(IsOneWay = true)] void OnEvent2(int number); [OperationContract(IsOneWay = true)] void OnEvent3(int number,string text); } [ServiceContract(CallbackContract = typeof(IMyEvents))] interface IMySubscriptionService : ISubscriptionService {} Typically, every operation on the callback contract corresponds to a specific event. The subinterface of ISubscriptionService (IMySubscriptionService in this example) does not need to add operations. The transient subscription management functionality is provided by ISubscriptionService. In each call to Subscribe( ) or Unsubscribe( ), the subscriber needs to provide the name of the operation (and therefore the event) it wants to subscribe or unsubscribe to. If the caller wishes to subscribe to all events, it can pass an empty or null string. My framework offers an implementation for the methods of ISubscriptionService in the form of the generic abstract class SubscriptionManager<T>: public abstract class SubscriptionManager<T> where T : class { public void Subscribe(string eventOperation); public void Unsubscribe(string eventOperation); //More members } The generic type parameter for SubscriptionManager<T> is the events contract. Note that SubscriptionManager<T> does not implement ISubscriptionService. The using application needs to expose its own transient subscription service in the form of an endpoint that supports its specific subinterface of ISubscriptionService. To do so, the application needs to provide a service class that derives from SubscriptionManager<T>, specify the callback contract as a type parameter, and derive from the specific subinterface of ISubscriptionService. For example, to implement a transient subscriptions service using the IMyEvents callback interface: [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] class MySubscriptionService : SubscriptionManager<IMyEvents> ,IMySubscriptionService {} MySubscriptionService doesn't need any code because IMySubscriptionService does not add any new operations, and SubscriptionManager<T> already implements the methods of ISubscriptionService. Note that just deriving from SubscriptionManager<IMyEvents> is insufficient because it does not derive from a contract interfaceyou must add the derivation from IMySubscriptionService to support transient subscriptions. Finally, the using application needs to define an endpoint for IMySubscriptionService: <services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract = "IMySubscriptionService" /> </service> </services> Example B-2 shows how SubscriptionManager<T> manages transient subscriptions. Example B-2. The transient subscribers management in SubscriptionManager<T>
SubscriptionManager<T> stores the transient subscribers in a generic static dictionary called m_transientStore: static Dictionary<string,List<T>> m_TransientStore; Each entry in the dictionary contains the name of the event operation and all its subscribers in the form of a linked list. The static constructor of SubscriptionManager<T> uses reflection to get all the operations of the callback interfaces (the type parameter for SubscriptionManager<T>) and initializes the dictionary to have all the operations with empty lists. The Subscribe( ) method extracts the callback reference from the operation call context. If the caller specifies an operation name, Subscribe( ) calls the helper method AddTransient( ). AddTransient( ) retrieves the list of subscribers for the event from the store. If the list does not contain the subscriber, it adds it in. If the caller specifies an empty string or null for the operation name, Subscribe( ) calls AddTransient( ) for each operation in the callback contract. Unsubscribe( ) operates in a similar manner. Note that the caller can subscribe to all events and then unsubscribe from a particular one. B.2.2. Managing Persistent SubscribersFor managing persistent subscribers, I defined the IPersistentSubscriptionService interface shown in Example B-3. Example B-3. The IPersistentSubscriptionService interface manages persistent subscribers
To add a persistent subscriber, the caller needs to call Subscribe( ), providing the address of the subscriber, the event's contract name, and the specific event operation itself. To unsubscribe, the caller uses Unsubscribe( ) with the same information. Note that IPersistentSubscriptionService does not imply in any way where the subscribers persist on the service sidethat is an implementation detail. The class SubscriptionManager<T> presented previously also implements the methods of IPersistentSubscriptionService: [BindingRequirement(TransactionFlowEnabled = true)] public abstract class SubscriptionManager<T> where T : class { public void Unsubscribe(string address,string eventsContract, string eventOperation); public void Subscribe(string address,string eventsContract, string eventOperation); //More members } SubscriptionManager<T> stores the persistent subscribers in SQL Server. It is configured to use the Client/Service transaction mode (presented in Chapter 7), and it enforces that mode using my BindingRequirement attribute. The generic type parameter for SubscriptionManager<T> is the events contract. Note that SubscriptionManager<T> does not derive from IPersistentSubscriptionService. The using application needs to expose its own persistent subscription service, but there is no need to derive a new contract from IPersistentSubscriptionService because no callback references are required. The application simply derives from SubscriptionManager<T>, specifying the events contract as a type parameter and adding a derivation from IPersistentSubscriptionService; for example: [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] class MySubscriptionService : SubscriptionManager<IMyEvents>, IPersistentSubscriptionService {} There is no need for any code in MySubscriptionService because SubscriptionManager<T> already implements the methods of IPersistentSubscriptionService. Note that just deriving from SubscriptionManager<IMyEvents> is insufficient because it does not derive from a contract interfaceyou must add the derivation from IPersistentSubscriptionService to support persistent subscriptions. Finally, the application needs to define an endpoint for IPersistentSubscriptionService: <services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract = "IPersistentSubscriptionService" /> </service> </services> The implementation of the methods of IPersistentSubscriptionService by SubscriptionManager<T> is shown in Example B-4. Example B-4 is very similar to Example B-2, except the subscribers are stored in SQL Server, not in memory in a dictionary. Example B-4. Persistent subscribers management in SubscriptionManager<T>
If the application wants to support both transient and persistent subscribers for the same events contract, simply derive the subscription service class from both the specialized subinterface of ISubscriptionService and from IPersistentSubscriptionService: [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] class MySubscriptionService : SubscriptionManager<IMyEvents>, IMySubscriptionService,IPersistentSubscriptionService {} and expose the two matching endpoints: <services> <service name = "MySubscriptionService"> <endpoint address = "..." binding = "..." contract = "IMySubscriptionService" /> <endpoint address = "..." binding = "..." contract = "IPersistentSubscriptionService" /> </service> </services> B.2.3. Event PublishingThe parts of the publish-subscribe framework shown so far have only dealt with the aspects of subscription management. The framework also enables easy implementation of the publishing service. The publishing service should support the same events contract as the subscribers, and should be the only point of contact known to the publishers in the application. Because the publishing service exposes the events contract in an endpoint, you need to mark the events contract as a service contract, even if you only use it for duplex callbacks with transient subscribers: [ServiceContract] interface IMyEvents { [OperationContract(IsOneWay = true)] void OnEvent1( ); [OperationContract(IsOneWay = true)] void OnEvent2(int number); [OperationContract(IsOneWay = true)] void OnEvent3(int number,string text); } The publish-subscribe framework contains the helper class PublishService<T>, defined as: public abstract class PublishService<T> where T : class { protected static void FireEvent(params object[] args); } PublishService<T> requires as a type parameter the type of the event's contract. To provide your own publishing service, derive from PublishService<T> and use the FireEvent( ) method to deliver the event to all subscribers, be they transient or persistent, as shown in Example B-5. Example B-5. Implementing an event-publishing service
Note that you can use FireEvent( ) to fire any type of event, regardless of the parameters because of the use of the params object array. Finally, the application needs to expose an endpoint for the publishing service with the events contract: <services> <service name = "MyPublishService"> <endpoint address = "..." binding = "..." contract = "IMyEvents" /> </service> </services> Example B-6 shows the implementation of PublishService<T>. Example B-6. Implementing PublishService<T>
To simplify firing the event by the publishing service, the FireEvent( ) method accepts the parameters to pass to the subscribers, yet its caller does not provide it with the name of the operation to invoke on the subscribers. To that end, FireEvent( ) accesses its stack frame and extracts the name of its calling method. It then uses an overloaded FireEvent( ) that accepts the method name. That method in turn uses the helper method PublishPersistent( ) to publish to all persistent subscribers, and the PublishTransient( ) helper method to publish to all transient subscribers. Both publishing methods operate in an almost identical way: they access SubscriptionManager<T> to retrieve their respective subscribers list, and then use the Publish( ) method to fire the event. The subscribers are returned in the form of an array of proxies to the subscribers. That array is passed to the Publish( ) method. Publish( ) could have simply invoked the subscribers at this point. However, I wanted to support concurrent publishing of the events, so that if any subscriber is undisciplined and takes a long time to process the event, this will not preclude the other subscribers from receiving the event in a timely manner. Note that having the event operations marked as one-way is no guarantee for asynchronous invocation, and besides, I wanted to support concurrent publishing even when the event operation is not marked as a one-way operation. Publish( ) defines two anonymous methods. The first calls the Invoke( ) helper method, which will result in firing the event to the individual subscriber provided and then closes the proxy if so specified. Because Invoke( ) was never compiled against the specific subscriber type, it uses reflection and late binding for the invocation. Invoke( ) also suppresses any exceptions raised by the invocation, because those are of no interested to the publishing party. The second anonymous method queues up the first anonymous method to be executed by a thread from the thread pool. Finally, Publish( ) invokes the second anonymous method on every subscriber in the provided array. Note how uniformly PublishService<T> TReats the subscribersit almost does not matter if they are transient or persistent. The only difference is that after publishing to a persistent subscriber, you need to close the proxy. This uniformity is achieved by the helper methods GettransientList( ) and GetPersistentList( ) of SubscriptionManager<T>. Of these two, GettransientList( ) is the simpler one: public abstract class SubscriptionManager<T> where T : class { internal static T[] GetTransientList(string eventOperation) { lock(typeof(SubscriptionManager<T>)) { if(m_TransientStore.ContainsKey(eventOperation)) { List<T> list = m_TransientStore[eventOperation]; return list.ToArray( ); } return new T[]{}; } } //More members } GetTRansientList( ) looks up in the transient store for all the subscribers to the specified operation and returns them as an array. GetPersistentList( ) faces a bigger challenge: there is no ready-made list of proxies to persistent subscribers; all that is known about them is their addresses. GetPersistentList( ) therefore needs to instantiate the persistent subscribers proxies, as shown in Example B-7. Example B-7. Creating the persistent subscribers proxy list
To create a proxy for each subscriber, GetPersistentList( ) needs the subscriber's address, binding, and contract. The contract is of course the type parameter for SubscriptionManager<T>. To obtain the addresses, GetPersistentList( ) calls GetSubscribersToContractEventOperation( ) to query the database and return an array of all addresses of the persistent subscribers who subscribed to the specified event. All GetPersistentList( ) needs now is the binding used by each subscriber. For that, GetPersistentList( ) calls the helper method GetBindingFromAddress( ), which infers the binding to use from the address schema. GetBindingFromAddress( ) treats all HTTP addresses as WSHttpBinding. In addition, GetBindingFromAddress( ) turns on reliability and transaction propagation for each binding, to enable including the event in the publisher's transaction when one-way operations are not used, such as with this events contract: [ServiceContract] interface IMyEvents { [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent1( ); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent2(int number); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] void OnEvent3(int number,string text); } B.2.4. Administering Persistent SubscribersWhile you can add and remove persistent subscriptions at runtime by using the methods of the IPersistentSubscriptionService interface shown in Example B-3, because of their persistent nature, it is likely that managing the subscriptions will be done via some kind of an administration tool. To that end, IPersistentSubscriptionService defines additional operations that answer various queries of the subscribers store, as shown in Example B-8. Example B-8. The IPersistentSubscriptionService interface
All of these administration operations utilize a simple data structure called PersistentSubscription, which contains the address of the subscriber, the subscribed contract, and the event. GetAllSubscribers( ) simply returns the list of all subscribers. GetSubscribersToContract( ) returns all subscribers to a specific contract, and GetSubscribersToContractEventType( ) returns all subscribers to a particular event operation on a specified contract. Finally, for completeness' sake, GetAllSubscribersFromAddress( ) returns all subscribers that provided a specified address. My publish-subscribe framework includes a sample persistent subscription administration tool called Persistent Subscription Manager, shown in Figure B-2. Figure B-2. The Persistent Subscription Manager applicationThe administration tool uses IPersistentSubscriptionService to add or remove subscriptions. To add a new subscription, you need to provide it with the metadata exchange address of the events contract definition. You can use the metadata exchange address of the persistent subscribers themselves or the metadata exchange address of the publish service (such as the one shown in Example B-5) because they are polymorphic. Enter the metadata exchange base address in the MEX Address text box and click the Lookup button. The tool will programmatically retrieve the metadata of the event service and populate the Contract and Event combo boxes. Retrieving the metadata and parsing its content is done using the MetadataHelper class presented in Chapter 2. To subscribe, provide the address of the persistent subscriber and click the Subscribe button. Persistent Subscription Manager then adds the subscription by calling to the subscription service (the MySubscriptionService service in the examples so far). The address for the subscription service is maintained in the Persistent Subscription Manager config file.
B.2.5. Queued Publishers and SubscribersInstead of using the synchronous binding for either publishing or subscribing to the events, you can use the NetMsmqBinding. A queued publish-subscribe service combines the benefits of a loosely coupled system and the flexibility of disconnected execution. When using queued events, all events on the contract must of course be marked as one-way operations. As shown in Figure B-3, you can use queuing at either end independently. Figure B-3. Queued publish-subscribeYou can have a queued publisher and connected synchronous subscribers. You can have a connected publisher publishing to queued subscribers, or you could have both queued publishers and queued subscribers. Note, however, that you cannot have queued transient subscriptions because there is no support within the MSMQ binding for duplex callbacks, since that would render the disconnected aspect of the communication useless. As before, you can use the administration tool to manage the subscribers, and the administration operations are still connected and synchronous. B.2.5.1. Queued publisherTo utilize a queued publisher, the publishing service needs to expose a queued endpoint using the MSMQ binding. When firing events at a queued publisher, the publishing service can be offline, or the publishing client itself can be disconnected. Note that when publishing two events to a queued publishing service, there are no guarantees as to the order of delivery and processing of these events by the end subscribers. You can only assume order of publishing when the events contract is configured for a session and only when dealing with a single publishing service. B.2.5.2. Queued subscriberTo deploy a queued subscriber, the persistent subscribing service needs to expose a queued endpoint. Doing so will enable it to be offline even when the publisher is on-line. When the subscriber is connected again, it will receive all its queued-up events. In addition, queued subscribers can handle the case when the publishing service itself is disconnected, because no events are lost. When multiple events are fired at a single queued subscriber, there are no guarantees as to the order of delivery of the events. The subscriber can only assume the order of publishing when the event's contract has a session. Of course, having both a queued publisher and subscriber allows both to work offline at the same time. |