Introducing RockBus: Durable Content Based Routing in WCF

Recently, I published a new project on CodePlex called RockBus. It’s a .NET-based publish/subscribe and content-based routing framework that offers the following features:

* Full support for WCF and its protocols, extensibility, etc.

* Durable message delivery using MSMQ and/or SQL Server Service Broker (message persistence in a database is on the todo-list)

* Static subscription configuration via database or web/app configuration files.

* Dynamic (run-time) subscription configuration via a WCF service interface.

* Transactional delivery to one or multiple subscribers.

* Support for itineraries (complex message flows)

The full list of goodies is documented on the CodePlex page: rockbus.codeplex.com.

RockBus can be interesting if you’re looking for a way to decouple your WCF clients and services. For instance, with publish/subscribe, multiple subscribers (WCF services) can receive the same message, while the publisher (WCF client) only has to send the message once and doesn’t need to know which subscribers actually receive it.

If you want to deliver messages to subscribers selectively, you can use XPath expressions to filter the messages. The XPath expressions are part of the subscription information that subscribers send to RockBus.
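
To give an idea of what XPath-based filtering of a WCF message looks like, here is a minimal sketch that uses the standard XPathMessageFilter class. It is not taken from the RockBus sources, and I am not claiming RockBus evaluates its subscriptions this way; the class name, the "o" prefix, the urn:example:orders namespace and the action are all made up for illustration.

```csharp
using System.IO;
using System.ServiceModel.Channels;
using System.ServiceModel.Dispatcher;
using System.Xml;

public static class XPathSubscriptionSketch
{
    // Returns true when the SOAP message satisfies the subscriber's XPath expression.
    public static bool Matches(string xpath, Message message)
    {
        // XPathMessageContext predefines prefixes such as s11 and s12 for the SOAP envelope.
        var namespaces = new XPathMessageContext();
        namespaces.AddNamespace("o", "urn:example:orders"); // hypothetical payload namespace

        var filter = new XPathMessageFilter(xpath, namespaces);

        // Inspecting the body requires a buffered copy; Match(Message) cannot examine the body.
        // Note that creating the buffer consumes the original message.
        MessageBuffer buffer = message.CreateBufferedCopy(int.MaxValue);
        return filter.Match(buffer);
    }

    // Builds a small SOAP 1.1 test message with a hypothetical Order payload.
    public static Message CreateOrder(string region)
    {
        string body = "<Order xmlns='urn:example:orders'><Region>" + region + "</Region></Order>";
        return Message.CreateMessage(
            MessageVersion.Soap11,
            "urn:example:orders/submit", // hypothetical action
            XmlReader.Create(new StringReader(body)));
    }
}
```

A subscription that only wants European orders could then carry an expression such as /s11:Envelope/s11:Body/o:Order[o:Region='EU']. Because the message is consumed when it is buffered, a router would forward a fresh copy created with buffer.CreateMessage().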

There are a couple of samples, provided with the RockBus solution:

* 0. No Dependencies: Shows how RockBus can be used to perform in-memory (non-store-and-forward) routing of messages to subscribers. This sample has no dependencies on, for instance, MSMQ or SQL Server Service Broker (SSB).

* 1. Durable Cbr: Shows how RockBus can be used to perform content-based routing, while storing the messages in a durable backing store (MSMQ) before delivering them to the subscribers.

* 2. Transactional Delivery: Shows how RockBus can be configured to transactionally deliver one message to two subscribers (one message copy to each subscriber) within a single transaction. If one subscriber’s transaction rolls back, the other subscriber’s transaction will, too. The transactions are propagated using the WCF WSHttp and NetNamedPipe bindings.

* 3. Dynamic Publish/Subscribe: Shows how RockBus can be used to receive subscription information at run-time, store that subscription information into a subscription database, and perform content-based routing on any published messages using that subscription information.

* 4. File Transport: Shows how the RockBus framework can be extended with a custom transport by implementing the ITransport and IMessagePublisher interfaces. (The File transport is intended for demonstration purposes only.)

Posted in Wcf

Generic Message Contract and Transaction Flow in WCF

OK, this one had me fistpumping (in the air!) for quite some time.

Short introduction. For my upcoming session at the Sela Developer Practice, I’m developing a demo that involves durable content based routing with MSMQ and WCF. I was using a generic message contract:

[ServiceContract(Namespace = "SomeNamespace", SessionMode = SessionMode.Allowed)]
public interface ITwoWay
{
    [TransactionFlow(TransactionFlowOption.Allowed)]
    [OperationContract(AsyncPattern = true, IsOneWay = false, Action = "*", ReplyAction = "*")]
    IAsyncResult BeginProcessRequest(Message message, AsyncCallback callback, object state);
    Message EndProcessRequest(IAsyncResult result);
}

N.B. Apart from the name and namespace, this is identical to the WCF 4.0 RoutingService IRequestReplyRouter interface.

Furthermore I was using:

  • wsHttpBinding with transactionFlow="true",
  • [TransactionFlow(TransactionFlowOption.Allowed)] on both the client generic message contract and the typed service contract,
  • [OperationBehavior(TransactionScopeRequired = true)] on the service method,
  • and a TransactionScope in the client to make sure the WCF call was performed from within an ambient transaction.

I was experiencing a strange issue with flowing the transaction from the client to the service. The transaction SOAP header was not added, even though I was following all the rules to enable transaction flow described above.

It turns out there’s a known issue (described here) with flowing transactions when using a generic message contract. After a lot of research, I found that the cause is the wildcard specification that a generic message contract uses for the Action and/or ReplyAction. Then I wondered how the .NET 4.0 RoutingService deals with this issue, so I reviewed its code and found this:

private static void ConfigureTransactionFlow(ServiceEndpoint endpoint)
{
    CustomBinding binding = endpoint.Binding as CustomBinding;
    if (binding == null)
    {
        binding = new CustomBinding(endpoint.Binding);
    }

    TransactionFlowBindingElement element = binding.Elements.Find<TransactionFlowBindingElement>();
    if (element != null)
    {
        element.AllowWildcardAction = true;
        endpoint.Binding = binding;
    }
}

Applying this change to the binding configuration fixes the transaction flow when using a generic message contract.

There’s one more gotcha though; if you apply this to the endpoint of the channel (or any class derived from ClientBase<T>), it still doesn’t work, because the channel has already been initialized internally before you get to change the binding configuration. You need to apply this to the endpoint of the ChannelFactory, BEFORE you create the channel. (I use a customized base class (other than ClientBase<T>) for WCF proxies where I arrange for a timely initialization of the binding configuration).
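
To make the ordering concrete, here is a small sketch of how this could look with a plain ChannelFactory. It is not my actual proxy base class; the helper names and the address in the usage note are made up, and ITwoWay is the generic contract shown earlier in this post.

```csharp
using System.ServiceModel;
using System.ServiceModel.Channels;

public static class WildcardTransactionFlowSketch
{
    // Returns a CustomBinding whose TransactionFlowBindingElement (if present)
    // accepts wildcard actions, mirroring the RoutingService code above.
    public static CustomBinding AllowWildcardTransactionFlow(Binding binding)
    {
        CustomBinding custom = binding as CustomBinding ?? new CustomBinding(binding);

        TransactionFlowBindingElement flow =
            custom.Elements.Find<TransactionFlowBindingElement>();
        if (flow != null)
        {
            flow.AllowWildcardAction = true;
        }
        return custom;
    }

    // The binding is adjusted BEFORE the factory is created, so every channel
    // created from the factory picks up the modified binding.
    public static ChannelFactory<TContract> CreateFactory<TContract>(Binding binding, string address)
    {
        return new ChannelFactory<TContract>(
            AllowWildcardTransactionFlow(binding),
            new EndpointAddress(address));
    }
}
```

Usage would be along the lines of CreateFactory<ITwoWay>(new WSHttpBinding { TransactionFlow = true }, "http://localhost:8000/router") followed by factory.CreateChannel(), where the address is only a placeholder.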

Posted in Wcf

WcfTestClient: The namespace ‘’ already contains a definition for ‘…’

While using the WcfTestClient, I received the error message displayed in the title. It turned out that, in my case, I was using colons in the namespaces of my WCF service, like so:

[ServiceContract(Namespace = "urn:SomeName::SomeMoreNames")]
public interface IXxx
{
    [OperationContract(Action = "urn:SomeName::SomeMoreNames")]
    void MethodName();
}

I managed to fix the error by changing the namespaces into something like “http://SomeName.SomeMoreNames”. Hope this helps anyone.

Posted in Wcf

Asynchronous transactions and propagation in WCF

I was recently looking into implementing a transactional WCF service asynchronously, and ran into the fact that asynchrony and transactions don’t mix … easily. I also found that there weren’t many samples of asynchronously implemented transactional WCF services out there that you could use to understand the implementation details and play around with, so I decided to write a sample.

The sample contains two WCF services: StorageService and StorageServiceTransactional. As the names suggest, the first doesn’t use transactions; the latter does. The sample uses wsHttpBinding and netMsmqBinding. Below is a picture of a call sequence that could be performed with the sample:

AsyncTrans

The sample lets you easily change the sequence of calls (by (un)commenting only a couple of lines). For instance, you could make the non-transactional StorageService (instead of the transactional StorageServiceTransactional) pick up the message from the MSMQ queue, and see what happens to the transactions and the propagation. The number of possible combinations grows quickly; that’s exactly why this sample can come in handy.

The sample writes the TransactionInformation to the console at several steps during the execution of the call sequences. This allows you to inspect the transaction identifiers and the transaction propagation. Below is a screenshot of the console output, with the transaction identifiers highlighted:

AsyncTrans.ConsoleOutput

In the picture above, you can see that the client and the host used different transactions (different local and distributed identifiers). However, the message exchange between the two services within the host was done within one and the same transaction (same identifiers). The fact that the message exchange was executed ‘within one executable’ doesn’t make much difference; if you ran two instances of the host executable (and made the appropriate changes to the listen URLs), the message exchange would still be performed in one transaction. The LocalIdentifier would be different (LocalIdentifier corresponds to AppDomain), but the DistributedIdentifier would be the same.
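
For reference, writing out these identifiers takes very little code. The helper below is a minimal sketch and not the code from the sample download; the class and method names are made up.

```csharp
using System;
using System.Transactions;

public static class TransactionInfoSketch
{
    // Formats the identifiers of a transaction for logging.
    public static string Describe(Transaction tx)
    {
        if (tx == null)
        {
            return "no ambient transaction";
        }

        TransactionInformation info = tx.TransactionInformation;
        // DistributedIdentifier stays Guid.Empty until the transaction is promoted
        // to a distributed transaction.
        return string.Format(
            "Local={0}, Distributed={1}, Status={2}",
            info.LocalIdentifier, info.DistributedIdentifier, info.Status);
    }

    // Writes the ambient transaction (if any) to the console, labelled with the current step.
    public static void WriteCurrent(string step)
    {
        Console.WriteLine("{0}: {1}", step, Describe(Transaction.Current));
    }
}
```

Calling WriteCurrent at the start of a service operation and again inside each callback makes it easy to compare identifiers across the steps of a call sequence.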

If you’re wondering about the contents of the SOAP header: below is the fragment of the SOAP header (captured with Fiddler2) containing the WS-* header that describes the transaction context. You can see that the LocalTransactionIds match the DistributedIdentifiers displayed above:

<CoordinationContext s:mustUnderstand="1" xmlns="http://schemas.xmlsoap.org/ws/2004/10/wscoor" xmlns:mstx="http://schemas.microsoft.com/ws/2006/02/transactions">
  <wscoor:Identifier xmlns:wscoor="http://schemas.xmlsoap.org/ws/2004/10/wscoor">urn:uuid:e49c680d-ebd5-449a-b2a5-36e01b16ac0f</wscoor:Identifier>
  <Expires>59904</Expires>
  <CoordinationType>http://schemas.xmlsoap.org/ws/2004/10/wsat</CoordinationType>
  <RegistrationService>
    <Address xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">https://bram08r2/WsatService/Registration/Coordinator/Disabled/</Address>
    <ReferenceParameters xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing">
      <mstx:RegisterInfo>
        <mstx:LocalTransactionId>e49c680d-ebd5-449a-b2a5-36e01b16ac0f</mstx:LocalTransactionId>
      </mstx:RegisterInfo>
    </ReferenceParameters>
  </RegistrationService>
  <mstx:IsolationLevel>0</mstx:IsolationLevel>
  <mstx:LocalTransactionId>e49c680d-ebd5-449a-b2a5-36e01b16ac0f</mstx:LocalTransactionId>
  <PropagationToken xmlns="http://schemas.microsoft.com/ws/2006/02/tx/oletx">AQAAAAM...snipped for brevity...AAAA==</PropagationToken>
</CoordinationContext>

Regarding the asynchronous implementation

The WCF services were implemented with the Task Parallel Library, but you could easily swap that out for ‘old-school’ APM-style BeginXxx/EndXxx methods. What’s more important is the way the transaction completion is implemented. Completing transactions asynchronously is more complicated because multiple threads are involved in the completion process, so you can’t just rely on using (TransactionScope ts = new TransactionScope()) { … }. The sample was implemented using the Transaction.DependentClone() method, which returns a DependentTransaction. This DependentTransaction is used to block the completion process, and it is passed as the asynchronous state, to be completed when the asynchronous call completes (in the EndXxx method).
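
The general shape of the dependent-transaction pattern is sketched below. This is a simplified illustration and not the code from the sample download: the class and method names are made up, and it runs the work on a Task instead of a WCF BeginXxx/EndXxx pair.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

public static class DependentTransactionSketch
{
    // Runs 'work' on another thread inside the caller's ambient transaction.
    public static Task RunInAmbientTransactionAsync(Action work)
    {
        Transaction ambient = Transaction.Current;
        if (ambient == null)
        {
            throw new InvalidOperationException("An ambient transaction is required.");
        }

        // The dependent clone keeps the transaction from committing
        // until Complete() has been called on the clone.
        DependentTransaction dependent =
            ambient.DependentClone(DependentCloneOption.BlockCommitUntilComplete);

        return Task.Factory.StartNew(() =>
        {
            try
            {
                // Make the dependent clone the ambient transaction on this thread.
                using (var scope = new TransactionScope(dependent))
                {
                    work();
                    scope.Complete();
                }
                dependent.Complete(); // signals that the asynchronous work is done
            }
            catch
            {
                dependent.Rollback(); // a failure in the work aborts the whole transaction
                throw;
            }
            finally
            {
                dependent.Dispose();
            }
        });
    }
}
```

With BlockCommitUntilComplete, the caller can complete and dispose its own TransactionScope right after starting the work; the commit then waits (up to the transaction timeout) until the dependent clone has been completed.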

If you find anything wrong with this sample, please let me know! Other suggestions are welcome too. You can download it from here.

Hope this helps anyone!

Posted in Wcf

Map Request to Response in BizTalk Server

Consider the following scenario:

  • You need to receive a message into BizTalk Server, and send an acknowledgement back to the sender (probably using a Request-Response port).
  • The response is trivial: it’s similar to a void response in WCF, or it contains only information that can be obtained from the request message (such as an ID).

The figure below displays this scenario graphically:

BizTalk Server doesn’t make this easy out of the box, because there’s no standard way to create a response and correlate it to the request without involving an orchestration or a solicit-response port.

In comes the context property RouteDirectToTP. If you set this property on the inbound request with the value true, the BizTalk Messaging Engine will route the request immediately back to the receive port (actually, it routes it back to any instance subscription with the matching EpmRRCorrelationToken; this post explains why. In the above scenario, the receive port has such an instance subscription). Since the receive port has an outbound map that transforms the request into the response, the sender will receive the response message.

In the above sample, the context property is set using a ContextManipulator pipeline component, which can manipulate context properties based on XML configuration. This XML configuration looks like this:
<cmc xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
  <cm xsi:type="add">
    <cpi>
      <name>RouteDirectToTP</name>
      <namespace>http://schemas.microsoft.com/BizTalk/2003/system-properties</namespace>
      <value>True</value>
      <type>Boolean</type>
      <promoted>true</promoted>
    </cpi>
  </cm>
</cmc>

The ContextManipulator pipeline component lets you add, remove, map, change, promote and demote context properties.

Below is a screenshot of the WCF Test Client receiving a response from the above sample:

WCFTestClient

You can download the sample from my SkyDrive. It includes the sources for the ContextManipulator pipeline component.

Posted in BizTalk

Sela Developer Practice samples

Yesterday (29-12-2009) we presented a session at the Sela Developer Practice on the BizTalk ESB Toolkit 2.0 and the Windows Azure platform AppFabric .NET ServiceBus. We demonstrated some samples, which are available in this post.

The first sample involves using BizTalk to publish messages on the .NET ServiceBus using the NetEventRelayBinding. The information published by BizTalk is retrieved from the Yahoo Finance website. This part of the demo was inspired by Kent Weare’s blog on consuming the Yahoo Finance web service from BizTalk. After the CSV format provided by the Yahoo Finance web service has been transformed, the information is published on the .NET ServiceBus and consumed by a console application client.

The second sample involves duplex communication using the NetTcpRelayBinding. It’s amazing to see that sessionful duplex can be performed across the .NET ServiceBus. Hope you enjoy!

Posted in BizTalk

Debatching, aggregation and resequencing using the BizTalk ESB Toolkit 2.0 and ESB.Extensions

The ESB.Extensions framework is out!! You can download it here.

ESB.Extensions builds on top of the ESB Toolkit 2.0, and thus requires BizTalk Server 2009. It contains a number of itinerary-aware services: ReceivePipelineService, SendPipelineService, ResequencerService and ResequencerGoService. Let’s look at them more closely.

The ReceivePipelineService

When the ReceivePipelineService executes, the following steps are performed:

  1. Receives a message.
  2. Uses the resolution framework to resolve the receive pipeline type.
  3. Executes the receive pipeline.
  4. [Optional] If the pipeline didn’t set these properties, assigns BatchId and SequenceId promoted properties to the pipeline output message(s).
  5. [Optional] If the resolution (resolutionDictionary) included a new itinerary for the pipeline output message, this new itinerary is applied to the pipeline output message(s), and the original itinerary is reserved for further processing of the original (inbound) message. If no itinerary was specified during the resolution, it is assumed that the pipeline output message(s) should continue the itinerary of the original message, and processing of the original message is discontinued.
  6. Publishes the pipeline output messages.
  7. [Optional] If a new itinerary was specified for the pipeline output message(s) during resolution, advances the itinerary of the original message and publishes the original inbound message (if it had outstanding itinerary steps).

The following drawing depicts an itinerary, which I have titled “OneWay-Debatch-MessageSendPort” (ODM) that uses the ReceivePipelineService:

OnRamp, Debatch, MessageSendPort

This is a fairly simple scenario: you send an order batch through an itinerary onramp to BizTalk, where the ReceivePipelineService orchestration picks it up, debatches it using the receive pipeline, and forwards the order items to the Message SendPort (e.g. bound to a FILE location).

Debatching is one obvious application for the ReceivePipelineService, but you can specify any receive pipeline type in the resolver, so it can also be a 1:1 scenario where the pipeline has only 1 output message. I’m fairly certain that it will not work with FFDisAsm pipelines, though!

The SendPipelineService

When the SendPipelineService executes, the following steps are performed:

  1. Receives the 1st message within the batch of messages to be executed by the send pipeline. After receipt of this 1st message, a convoy subscription is created based on the Itinerary (should point to SendPipelineService) and the BatchId.
  2. Resolves the SendPipelineServiceResolution object, necessary to perform the execution of the send pipeline. This resolution object contains the following parameters:
    • Send pipeline type: the type of the send pipeline to execute.
    • XLANGMessageComparer type: The SendPipelineService uses a serializable list to collect the messages to be aggregated. You can specify the comparer type to control how the messages are sorted. This happens in memory, but does not add much overhead compared to the use of Microsoft.XLANGs.Pipeline.SendPipelineInputMessages. I used an XLANGMessage comparer that compares the messages’ SequenceId promoted property values.
    • Batch Timeout: The timeout to apply to the entire batch. When this timeout expires, all the collected messages up to that point in time will be aggregated and the service completes.
    • Message Timeout: The timeout to apply to each message. When this timeout expires, all the collected messages up to that point in time will be aggregated and the service completes.
  3. [Optional] If a new itinerary was specified during resolution, the new itinerary will be applied to the pipeline output message, and the original inbound message will continue its own itinerary. If no new itinerary was specified, it is assumed that the original inbound message’s itinerary should be applied to the aggregated message. So, if a new itinerary was specified, the original message’s itinerary is advanced and applied to the original message, and the original message is republished for further processing.
  4. The inbound message is added to a list of pipeline input messages.
  5. The receive loop (batch convoy) starts.
  6. The next message within the batch is received and added to the list of pipeline input messages.
  7. [Optional] Like in step 3, if necessary, the original message’s itinerary is advanced and the message republished.
  8. When all messages have been received (or when a Batch or Message Timeout has occurred), the receive loop (and the corresponding batch convoy) is ended.
  9. The pipeline input messages are sorted using the XLANGMessageComparer type, specified in the resolver.
  10. The itinerary (new or original advanced) is applied to the pipeline output message, and the message is published.
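
The sorting step (step 9) boils down to an ordinary comparer. The sketch below illustrates the idea with a hypothetical CollectedMessage class standing in for an XLANGMessage and its SequenceId promoted property; it is not the ESB.Extensions code, and the string concatenation only stands in for the send pipeline that does the real aggregation.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for a collected message and its SequenceId promoted property.
public sealed class CollectedMessage
{
    public int SequenceId { get; set; }
    public string Body { get; set; }
}

// Orders messages by ascending SequenceId.
public sealed class SequenceIdComparer : IComparer<CollectedMessage>
{
    public int Compare(CollectedMessage x, CollectedMessage y)
    {
        return x.SequenceId.CompareTo(y.SequenceId);
    }
}

public static class AggregationSketch
{
    // Sorts the collected messages in place and concatenates their bodies.
    public static string Aggregate(List<CollectedMessage> messages)
    {
        messages.Sort(new SequenceIdComparer());

        var aggregated = new StringBuilder();
        foreach (CollectedMessage message in messages)
        {
            aggregated.Append(message.Body);
        }
        return aggregated.ToString();
    }
}
```

Swapping in a different comparer (for example one that sorts on a timestamp) changes the order of the aggregated output without touching the collection logic.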

Below a scenario that uses both the ReceivePipelineService and the SendPipelineService:

OnRamp, Debatch, Aggregate, MessageSendPort

An order batch gets sent through an itinerary onramp and gets debatched by the ReceivePipelineService. The resolver does not specify a new itinerary, so the original message’s itinerary is used for the debatched messages. The debatched messages all have the same BatchId, and have a current itinerary step that points to the SendPipelineService, so they get picked up by the SendPipelineService. The SendPipelineService sorts and aggregates the messages, and publishes one output message, which is sent to the offramp.

The Resequencer and ResequencerGo services

The idea for these services actually came from a TechEd (2006?) session, where Lee Graber detailed the use of (what he called) the Go-pattern to implement a resequencer. The Resequencer and ResequencerGo services work together to resequence any given sequence of messages. The ResequencerService blocks processing of a certain message until it receives the corresponding Go message (by using a parallel convoy). The ResequencerGoService’s responsibility is to work out which message within the sequence is up next and publish the corresponding Go message.

The ESB.Extensions implementation lets the resolver decide which message is up next, so the resolver (e.g. the BRE) could access a database or any other (in-memory?) representation of the message sequence (or batch) to determine who’s next. In my BizUnit tests I have used the simplest possible scenario, where the maximum number of messages is always 5 and the next SequenceId is simply determined by incrementing by 1.
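
That simplest possible resolution can be written down in a few lines. The sketch below only illustrates the decision itself; the class and method names are made up, and in ESB.Extensions the decision is made by whatever resolver you configure.

```csharp
public static class ResequencerGoSketch
{
    // Returns the SequenceId to release next, or null when the current message
    // was the last one in the batch (so no new Go message should be published).
    // SequenceIds are assumed to start at 0.
    public static int? NextSequenceId(int currentSequenceId, int messageCount)
    {
        int next = currentSequenceId + 1;
        return next < messageCount ? next : (int?)null;
    }
}
```

With a batch of 5 messages, NextSequenceId(3, 5) yields 4, and NextSequenceId(4, 5) yields null because SequenceId 4 is the last message.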

Below a scenario that uses the debatcher, aggregator and resequencer services:

OnRamp, Debatch, Resequence, Aggregate, MessageSendPort

In this scenario, the order batch gets debatched, and every order item gets picked up by a separate Resequencer instance. The 1st Go message, corresponding to the 1st message (SequenceId == 0), is sent to BizTalk, triggering the 1st Resequencer to release the 1st message. That message gets picked up by the SendPipelineService, which stores the pipeline input message, then itinerary-advances and republishes the original message. This triggers the ResequencerGoService to increment to the next SequenceId and publish the corresponding Go message. This process repeats until all messages have been processed (the ResequencerGo’s resolution works out when it is the last message and does not publish a new Go message in that case). The SendPipelineService aggregates the messages, applies the itinerary and ships the result off to the offramp.

Conclusion

A select number of scenarios have been described here, and these are also the scenarios that have been tested. But there can be many other scenarios, ranging from likely to not-in-a-million-years, that could be implemented with these services:

  • Receiving incoming messages for a certain interval (24 hours?), then sorting and aggregating them and sending them off.
  • Debatching, sending the separate items to web services, aggregating the responses, and sending them back to the caller. (Doesn’t work yet because Request-Response hasn’t been implemented yet; future version)
  • Receiving incoming messages, blocking them with the ResequencerService, and after a specified interval, sending the messages to another system in a specified order.
  • If I missed an obvious scenario, please drop a comment!
Posted in BizTalk