Page 47 - MSDN Magazine, March 2018

target systems: a financial system for issuing invoices and one or more social media platforms for promotion. The data flow is one-directional, from the application to external systems. I’m basically broadcasting the information to the world outside.
The broadcast integration pattern describes how to transfer data from an application to multiple target systems in a continuous real-time or near-real-time flow. This process is expected to be transactional: If a transaction completes successfully, data is committed at destination. If the transaction fails, the data transfer is aborted. It’s immediately obvious that this broadcast integration channel must be highly available and reliable, in order to avoid losing critical data in transit. Adopting an ESB as the mechanism for queuing data packets and guaranteeing delivery at destination becomes crucial.
Implementing the broadcast pattern closely resembles implementing the publish/subscribe pattern in Azure Service Bus, based on topics and subscriptions (bit.ly/2oOmTtM). Topics represent queues of messages to which recipient applications (subscribers) subscribe to receive updates when a message is posted. My e-commerce application publishes a message into a topic. The ESB acts as a message broker and guarantees delivery of the message at destination by “pushing” the message to the destination, which consists only of subscribed recipients.
Broadcasting a data packet from the e-commerce application essentially means publishing a message in a topic, and having a target application listening on a specific subscription. The broadcast pattern adds a transactional attribute to the data flow, with the possibility to cancel the transaction in case of delivery failure. As transactions cross system boundaries, they benefit from a “state machine” that retains a snapshot of the brokered message being transferred, before it’s read by all subscribed applications. If any of the subscribers fails to retrieve the message, the entire transaction is aborted, to ensure consistency across all systems involved.
The following code broadcasts a message to an Azure Service Bus topic and implements a state machine (bit.ly/29tKRT3) for tracking delivery of a message:
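public class Broadcast
{
    // connectionString and topicName are assumed to be defined elsewhere in the class.
    public async Task Execute(Entity entity)
    {
        var client = TopicClient.CreateFromConnectionString(connectionString, topicName);

        // Wrap the entity, serialized as JSON, in a brokered message.
        var message = new BrokeredMessage(JsonConvert.SerializeObject(entity));

        await client.SendAsync(message);
    }
}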
In case of error on delivery, the state machine moves the message to the “dead letter” queue in Azure Service Bus. The message at that point is no longer valid for data transfer and won’t be processed further.
Sending a message to a topic in Azure Service Bus requires a TopicClient connection and a BrokeredMessage to wrap the original entity and send it, asynchronously, to the bus. All required objects for connecting to Azure Service Bus are distributed in the WindowsAzure.ServiceBus NuGet package and are available in the Microsoft.ServiceBus.Messaging namespace.
The state machine is a singleton asynchronous dictionary containing transaction counters by topic. The dictionary keeps count of the number of active transactions (subscribers) that are waiting for a message on a specific topic from the Service Bus. The dictionary is thread-safe to allow for concurrent requests:
private static StateMachine _instance;
public static StateMachine Current => _instance ?? (_instance = new StateMachine());
protected ConcurrentDictionary<string, int> transactions = new ConcurrentDictionary<string, int>();
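The bodies of BeginTransactionAsync and EndTransactionAsync are not shown in this section, so the following is only a minimal sketch of how such a per-topic counter might work. The class name StateMachineSketch, the ActiveCount helper, and both method bodies are assumptions for illustration, not the author's code. The sketch also swaps the `??` initialization for Lazy<T>, because the `??` form can create more than one instance if two threads hit it at the same time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical sketch: method bodies are assumptions, not the article's implementation.
public class StateMachineSketch
{
    // Lazy<T> makes the singleton initialization itself thread-safe.
    private static readonly Lazy<StateMachineSketch> _instance =
        new Lazy<StateMachineSketch>(() => new StateMachineSketch());

    public static StateMachineSketch Current => _instance.Value;

    protected ConcurrentDictionary<string, int> transactions =
        new ConcurrentDictionary<string, int>();

    private StateMachineSketch() { }

    // Registers one more active transaction on the topic and returns the new count.
    public Task<int> BeginTransactionAsync(string topicName)
    {
        int count = transactions.AddOrUpdate(topicName, 1, (key, current) => current + 1);
        return Task.FromResult(count);
    }

    // Marks one transaction as finished. Returns false if none was active.
    public Task<bool> EndTransactionAsync(string topicName)
    {
        while (true)
        {
            if (!transactions.TryGetValue(topicName, out int current) || current <= 0)
            {
                return Task.FromResult(false);
            }

            // TryUpdate succeeds only if the counter still equals the value just read.
            if (transactions.TryUpdate(topicName, current - 1, current))
            {
                return Task.FromResult(true);
            }
            // Another thread changed the counter in between: read again and retry.
        }
    }

    // Hypothetical helper for inspecting the counter.
    public int ActiveCount(string topicName) =>
        transactions.TryGetValue(topicName, out int count) ? count : 0;
}
```

Under these assumptions, each subscriber increments the counter when it starts listening and decrements it when it finishes, so a count of zero means every registered subscriber has processed the message.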
As shown in Figure 1, a subscriber application reads a message from the Service Bus topic by beginning a new transaction (using the BeginTransactionAsync method) for a specific topic on the state machine, and then registers an OnMessageAsync callback to obtain a copy of the entity. The entity is then processed internally; for example, it may be persisted by the recipient system. In case of error, the transaction is canceled.
Completing or aborting the transaction is managed by the state machine using either of two methods—SuccessAsync or CancelAsync. SuccessAsync invokes CompleteAsync on the brokered message, which indicates that the message should be marked as processed and eventually deleted from the topic. This takes place only when all concurrent active transactions are completed:
public async Task<bool> SuccessAsync(BrokeredMessage message, string topicName)
{
    bool done = await EndTransactionAsync(topicName);
    int count = Current.transactions[topicName];

    // All concurrent transactions are done
    if (done && count == 0)
    {
        await message.CompleteAsync();
    }

    return done;
}
CancelAsync, in contrast, aborts the message broadcast by resetting the transaction counter for a topic. By calling the DeadLetterAsync method, the brokered message is then moved to the “dead letter” queue, where unsuccessfully processed messages are stored:
public async Task<bool> CancelAsync(BrokeredMessage message, string topicName)
{
    // Cancel the message broadcast -> Remove all concurrent transactions
    int count = Current.transactions[topicName];
    bool done = Current.transactions.TryUpdate(topicName, 0, count);

    if (done)
    {
        await message.DeadLetterAsync();
    }

    return done;
}
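One detail of the code above is worth illustrating: ConcurrentDictionary.TryUpdate replaces the value only if the current value equals the comparison value passed in. If another subscriber changes the counter between the read and the update, TryUpdate returns false and the counter is left untouched. The sketch below shows that behavior and one possible retry loop; the CounterReset class and its ResetToZero method are hypothetical names introduced for illustration and are not part of the article's code.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper, shown only to illustrate compare-and-update semantics.
public static class CounterReset
{
    // Resets the counter for a key to zero, retrying when a concurrent writer
    // changes the value between the read and the update.
    // Returns false only if the key is not present.
    public static bool ResetToZero(ConcurrentDictionary<string, int> counters, string key)
    {
        while (counters.TryGetValue(key, out int observed))
        {
            if (counters.TryUpdate(key, 0, observed))
            {
                return true;
            }
            // The value changed after it was read: loop and try again.
        }

        return false;
    }
}
```

For example, with a counter of 3 stored under "orders", calling TryUpdate("orders", 0, 2) returns false because the comparison value is stale, while ResetToZero keeps reading the current value until the update goes through. Whether retrying is the right policy for canceling a broadcast is a design decision this sketch does not settle.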
Figure 1 Reading a Message from the Service Bus Topic
public async Task ReadMessageAsync()
{
    await StateMachine.Current.BeginTransactionAsync(topicName);

    var client = SubscriptionClient.CreateFromConnectionString(
        connectionString, topicName, subscriptionName);

    client.OnMessageAsync(async message =>
    {
        var entity = JsonConvert.DeserializeObject(message.GetBody<string>());

        try
        {
            Save(entity);
            await StateMachine.Current.SuccessAsync(message, topicName);
        }
        catch
        {
            await StateMachine.Current.CancelAsync(message, topicName);
        }
    });
}