The ESB.Extensions framework is out!! You can download it here.
The ESB.Extensions builds on top of the ESB Toolkit 2.0, and thus requires BizTalk Server 2009. The ESB.Extensions contains a number of services, that are itinerary-aware: ReceivePipelineService, SendPipelineService, ResequencerService and ResequencerGoService. Let’s look at them more closely.
When the ReceivePipelineService executes, the following steps are performed:
- Receives a message.
- Uses the resolution framework to resolve the receive pipeline type.
- Executes the receive pipeline.
- [Optional] If the pipeline didn’t set these properties, assigns BatchId and SequenceId promoted properties to the pipeline output message(s).
- [Optional] If the resolution (resolutionDictionary) included a new itinerary for the pipeline output message, this new itinerary is applied to the pipeline output message(s), and the original itinerary is reserved for further processing of the original (inbound) message. If no itinerary was specified during the resolution, it is assumed that the pipeline output message(s) should continue the itinerary of the original message, and processing of the original message is discontinued.
- Publishes the pipeline output messages.
- [Optional] If a new itinerary was specified for the pipeline output message(s) during resolution, advances the itinerary of the original message and publishes the original inbound message (if it had outstanding itinerary steps).
The following drawing depicts an itinerary, which I have titled “OneWay-Debatch-MessageSendPort” (ODM) that uses the ReceivePipelineService:
Fairly simple scenario, you send an order batch through an itinerary onramp to BizTalk, where the ReceivePipelineService orchestration will pick it up, debatch it using the receive pipeline, and forwards the order items to the Message SendPort. (i.e. bound to a FILE location).
Debatching is one obvious application for the ReceivePipelineService, but you can specify any receive pipeline type in the resolver, so it can also be a 1:1 scenario where the pipeline has only 1 output message. I’m fairly certain that it will not work with FFDisAsm pipelines, though!
When the SendPipelineService executes, the following steps are performed:
- Receives the 1st message within the batch of messages to be executed by the send pipeline. After receipt of this 1st message, a convoy subscription is created based on the Itinerary (should point to SendPipelineService) and the BatchId.
- Resolves the SendPipelineServiceResolution object, necessary to perform the execution of the send pipeline. This resolution object contains the following parameters:
- Send pipeline type: the type of the send pipeline to execute.
- XLANGMessageComparer type: The SendPipelineService uses a serializable list to collect the messages to be aggregated. You can specify the comparer type to control how the messages are sorted. This happens in memory, but does not add much overhead compared to the use of Microsoft.XLANGs.Pipeline.SendPipelineInputMessages. I used an XLANGMessage comparer that compares the messages’s SequenceId promoted property values.
- Batch Timeout: The timeout to apply to the entire batch. When this timeout expires, all the collected messages up to that point in time will be aggregated and the service completes.
- Message Timeout: The timeout to apply to each message. When this timeout expires, all the collected messages up to that point in time will be aggregated and the service completes.
- [Optional] If a new itinerary was specified during resolution, the new itinerary will be applied to the pipeline output message, and the original inbound message will continue it’s own itinerary. If no new itinerary was specified, it is assumed that the original inbound message’s itinerary should be applied to the aggregated message. So, if a new itinerary was specified, the original message’s itinerary is advanced and applied to the original message, and the original message is republished for further processing.
- The inbound message is added to a list of pipeline input messages.
- The receive loop (batch convoy) starts.
- The next message within the batch is received and added to the list of pipeline input messages.
- [Optional] Like in step 3, if necessary, the original message’s itinerary is advanced and the message republished.
- When all messages have been received (or when a Batch or Message Timeout has occurred), the receive loop (and the corresponding batch convoy) is ended.
- The pipeline input messages are sorted using the XLANGMessageComparer type, specified in the resolver.
- The itinerary (new or original advanced) is applied to the pipeline output message, and the message is published.
Below a scenario that uses both the ReceivePipelineService and the SendPipelineService:
An orderbatch gets sent through an itinerary onramp, gets debatched by the ReceivePipelineService. The resolver does not specify a new itinerary, so the original message’s itinerary is used for the debatched messages. The debatched messages all have the same BatchId, and have a current itinerary step that points to the SendPipelineService, so they get picked up by the SendPipelineService. The SendPipelineService sorts and aggregates the messages, and publishes one output message, which is sent to the offramp.
The Resequencer and ResequencerGo services
The idea for these services actually came from a TechEd (2006?) session, where Lee Graber detailed the use of the (what he called) Go-pattern to implement a resequencer. The Resequencer and ResequencerGo services work together to resequence any given sequence of messages. The ResequencerService blocks processing of a certain message until it receives the corresponding Go message (by using a parallel convoy). The ResequencerGoService’s responsibility is to work out which message within the sequence is up next and publishes the corresponding Go message.
The ESB.Extension implementation let’s the resolver decide which message is up next, so the resolver (i.e. BRE) could access a database or any other (in-memory?) representation of the message sequence (or batch) to determine who’s next. In my BizUnit tests I have used the simplest possible scenario, where the maximum number of messages is always 5 and the next SequenceId is simple determined by incrementing by 1.
Below a scenario that uses the debatcher, aggregator and resequencer services:
In this scenario, the orderbatch gets debatched, every order item gets picked up by a seperate Sequencer instance. The 1st Go message, corresponding to the 1st message (SequenceId == 0) is sent to BizTalk, triggering the 1st Resequencer to release the 1st message. That message gets picked up by the SendPipelineService, and the SendPipelineService stores the pipeline input message, then itinerary-advances and republishes the original message, which triggers the ResequencerGoService to increment to the next SequenceId and publish the corresponding Go message. This process repeates until all messages have been processed (the ResequencerGo’s resolution works out when it is the last message and does not publish a new Go message when it is the last). The SendPipelineService aggregates the message, applies the itinerary and ships it off to the offramp.
A select number of scenarios has been described here, and these are also the scenarios that have been tested. But there can be many many other scenarios, ranging from likely to not-in-a-million-years, that could be implemented with these services:
- Receiving incoming messages for a certain interval (24 hours?), then sort and aggregate them and sending them off.
- Debatching, sending the seperate items to web services, aggregating the responses, sending them back to the caller. (Doesn’t work yet because Request-Response hasn’t been implemented yet; future version)
- Receiving incoming messages, block them with the ResequencerService, after a specified interval, start sending the messages to another system in a specified order.
- If I missed an obvious scenario, please drop a comment!