Page 50 - MSDN Magazine, March 2018
P. 50

The connection is a fully bidirectional stream. As Figure 5 shows, I add a stream reader and a stream writer to it, which allows me to read the JSON-serialized GIS object and to write it back to the relay after resolving the provided geo-coordinates into a location address.
Figure 5 Reading and Writing the JSON-Serialized GIS Object
The Correlation Pattern
There is one more requirement to meet: I need to share customer data with partner organizations. But I don’t want to disclose information that partners aren’t authorized to access. I need to implement a way to sync data among systems only if they’re cor- related with each other.
The Correlation pattern, focuses on the intersection of two datasets and performs a synchronization of that scoped dataset only if a record exists in both systems. While the relay communi- cation with the GIS server would create a new record if the object couldn’t be found in the system, implementing data integration based on the correlation pattern strictly requires that correlated records exist in both systems for the sync to happen. This applies perfectly to my case where I want to share data with marketing partners, but only if they already have this information in their own system. But there’s a clear challenge here—how can I identify related records that represent the same entity (customer) across systems? This condition defines whether customer records can be synced with external partners.
As shown in Figure 6, the data correlation workflow in the e-com- merce application sends a customer record with some marketing information to an Azure Service Bus topic. The customer record is an aggregation of data from multiple entities. It’s not advisable to use the same object (a database entity) as a data transfer object (DTO), as this would create a dependency between the service and the data model in the source application. The brokered message is also decorated with a correlation ID that identifies the specific record in a topic subscription; this correlation ID will be useful later in the partner application for verifying whether a customer record already exists.
The correlation service simply exposes methods for matching customer records on a specific subscription, and registering a new customer and returning its correlation ID:
public class CorrelationService {
public Guid RegisterCustomer(CustomerRecord record, string subscription) {
return store.ContainsKey((record, subscription)) ? GetCustomerCorrelationId(record, subscription) : AddCustomer(record, subscription);
}
public bool CustomerExists(Guid correlationId) {
return store.ContainsValue(correlationId); }
Partner applications subscribe to that topic and retrieve the cus- tomer record and the correlation ID, as shown in Figure 7. If the customer record exists in their system, it can be saved eventually.
The entire solution is available free to download from my GitHub repository at bit.ly/2s0FWow. n
Stefano tempeSta is a Microsoft MVP and MCT, and chapter leader of CRMUG Switzerland. A regular speaker at international conferences, including Microsoft Ignite, Tech Summit and Developer Week, Stefano’s interests extend to Office & Dynamics 365, Blockchain and AI-related technologies.
thankS to the following Microsoft technical expert for reviewing this article: Massimo Bonanni
private async void ProcessMessage(HybridConnectionStream relayConnection, CancellationTokenSource cts)
{
// Bidirectional streams for reading and writing to the relay
var reader = new StreamReader(relayConnection);
var writer = new StreamWriter(relayConnection) { AutoFlush = true }; while (!cts.IsCancellationRequested)
{
// Read a message in input from the relay var message = await reader.ReadToEndAsync();
// Resolve address by invoking a service on the GIS server GisObject gisObject =
JsonConvert.DeserializeObject<GisObject>(message); await new GisServer().ResolveAddressAsync(gisObject);
// Write the message back to the relay
message = JsonConvert.SerializeObject(gisObject); await writer.WriteLineAsync(message);
}
await relayConnection.CloseAsync(cts.Token); }
Figure 6 The Correlation Class
public class Correlation {
private async Task Execute(CustomerEntity customer) {
// Map the Customer entity in the e-commerce application (source) // to Customer record in the partner application (destination) CustomerRecord customerRecord = PrepareCustomerRecord(customer);
// Create a connection to an Azure Service Bus Topic
// Serialize the customer record and send the message to the Topic var client = TopicClient.CreateFromConnectionString(
connectionString, topicName); var message = new BrokeredMessage(
JsonConvert.SerializeObject(customerRecord));
// Register the customer record with the Correlation Service // and obtain a Correlation ID message.Properties["CorrelationId}"] =
new CorrelationService().RegisterCustomer(customerRecord, subscriptionName);
await client.SendAsync(message); }
Figure 7 The Partner Class
class Partner {
public void ReceiveCustomerRecord() {
var client = SubscriptionClient.CreateFromConnectionString( connectionString, topicName, subscriptionName);
client.OnMessageAsync(async message => {
CustomerRecord customerRecord = JsonConvert.DeserializeObject<CustomerRecord>(message.GetBody<string>());
Guid correlationId = (Guid)message.Properties["CorrelationId"];
if (CustomerRecordExists(correlationId)) {
await SaveAsync(customerRecord); }
}); }
44 msdn magazine
Azure Service Bus


































































































   48   49   50   51   52