Enterprise Integration Zone is brought to you in partnership with:

Working as professional freelancer for the last 5 years. Specialized on and addicted to .NET from the beginning. Socializing people of all areas, from CEO's to co-workers. A few years ago I made a major shift from developer to "devsigner".Focusing my creativity also on great user experiences on Windows, Windows 8 and Windows Phone. Utilizing Expression Design, Expression Blend, Photoshop and Illustrator. I started developing my first programs on a Commodore C64 (basic and assembly) at the age of fourteen years. Later on an Amiga 500 (C++). After that I made the shift to DOS and Windows from version 3.11 and up. To me the most important part of developing new experiences is to work with creative and outstanding people and to bring new, exciting ideas to life. I strongly believe that everyone can be a hero if he/she get's pushed and motivated and valued. Therefore, and that under any circumstances: "People first!" Specialties:Extremely motivated and pushing people to create results. Ilija has posted 1 posts at DZone. You can read more from them at their website. View Full User Profile

Self Updating WCF Routing Services on Windows Azure Using CloudFx

09.05.2013
| 2244 views |
  • submit to reddit

Several months ago, I started to develop a special project and I needed the ability to add multiple WCF services and make them available to through one single endpoint. Looking for a solution, I stumbled over WCF-Routing services. This kind of service allows you to bundle multiple WCF endpoints from multiple WCF services and expose them through a single endpoint (Service Aggregation). But that is not the only advantage you have.

By intercepting the messages that come in through the WCF-Routing service you could also check for valid user-tokens implement authentication and more. With all the required information in place, I wrote this little POC (Proof Of Concept) to demonstrate how it could be done. :) Here is the official MSDN definition of what a WCF-Routing service is:

The Routing Service is a generic SOAP intermediary that acts as a message router. The core functionality of the Routing Service is the ability to route messages based on message content , which allows a message to be forwarded to a client endpoint based on a value within the message itself, in either the header or the message body.

Another great advantage of the WCF-Routing services is that routes can be configured dynamically at runtime using RoutingExtensions based on the IExtension interface. This allows you to add endpoints of fellow WCF-Services by using Message filters. Here is the official MSDN definition:

The message filters used by the Routing Service provide common message selection functionality, such as evaluating the name of the endpoint that a message was sent to, the SOAP action, or the address or address prefix that the message was sent to. Filters can also be joined with an AND condition, so that messages will only be routed to an endpoint if the message matches both filters. You can also create custom filters by creating your own implementation of MessageFilter .
Your can read more about the different types of Message-Filters here: Message Filters on MSDN In this implementation I use the EndPointAddressMessageFilter to expose the endpoint-address of the WCF- Service on the Routing-Service. Like mentioned before, WCF-Routing services can be configured dynamically to add WCF-Service endpoints at runtime. This is something I wanted to run fully automated and to scalable on Windows Azure. I am not going into the details on how WCF-Routing services work. You can read anything about WCF-Routing services on MSDN: Routing

The Basic Architecture

Base Architecture The basic architecture of the solution is not very complicated. The main part is the WCF-Routing Service itself. If a specific WCF-Service wants to be added to the router, it puts a specific message onto the Service Bus and the WCF-Routing-Service receives those messages. Already added services will be processed from a SQL-Table to be added after the re-start. This means that we don’t have to restart each WCF-Service that was already added to the Service-Routing table.

Worker Role Configuration (WCF-Routing service)

The “Endpoints” tab

We need a public endpoint for our routing service. On the property page for our worker role we can set an public input endpoint. This endpoint will accept simple TCP messages on the predefined port 10100.  

The “Settings” tab

To be able to address the service later via a domain name and not only using the IP-Address of the host, an additional entry “Domain” is added. Because the service is running only on the local machine, the value is set to “localhost”. You can change that later, if you want to run this POC on Windows Azure. 

The “ServcieDefinition.csdef” File

To allow the worker role to run with elevated privileges and to open custom ports for its endpoints, an “Runtime” element needs to be added within the “WorkerRole” tag:

<Runtime executionContext="elevated" />
 

The POC implementation


Implementing the basic Routing-Service

We need to register a WCF service-host of type RoutingService in our worker role to fire-up a new WCF-Routing-Service. Personally I prefer to configure each WCF-Service via code and not via XML configuration file. This is what worker role with a basic WCF-Router implementation looks like (more about the details later):
using System;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure.ServiceRuntime;
using System.ServiceModel;
using System.ServiceModel.Routing;
using SelfUpdatingServiceRouter.Behaviours;
using System.ServiceModel.Description;

namespace SelfUpdatingServiceRouter
{
public class WorkerRole : RoleEntryPoint
{
private string endPointAddress;

private string listenAddress;

private RoleInstanceEndpoint endpointAzure;

/// <summary>
/// Called by Windows Azure after the role instance has been initialized. This method
/// serves as the
/// main thread of execution for your role.
/// </summary>
/// <remarks>
/// <para>
/// Override the Run method to implement your own code to manage the role's execution.
/// The Run method should implement
/// a long-running thread that carries out operations for the role. The default implementation
/// sleeps for an infinite
/// period, blocking return indefinitely.
/// </para>
/// <para>
/// The role recycles when the Run method returns.
/// </para>
/// <para>
/// Any exception that occurs within the Run method is an unhandled exception.
/// </para>
/// </remarks>
public override void Run()
{
//Create a new WCF host of type RoutingService
using (ServiceHost host = new ServiceHost(typeof(RoutingService)))
{

//Configure the host
this.ConfigureServiceHost(host);

while (true)
{
Thread.Sleep(10000);
Trace.TraceInformation("Routing Service Working...", "Information");
Trace.TraceInformation(listenAddress);
}
}
}

/// <summary>
/// Configures the service host.
/// </summary>
/// <param name="host">The host.</param>
private void ConfigureServiceHost(ServiceHost host)
{
try
{
endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["RoutingServiceMain"];

this.endPointAddress = string.Format("http://{0}/ServiceRouter", endpointAzure.IPEndpoint);

this.listenAddress = string.Format("http://{0}:{1}/ServiceRouter/", RoleEnvironment.GetConfigurationSettingValue("Domain"), endpointAzure.IPEndpoint.Port);

var httpBinding = new BasicHttpBinding();

httpBinding.SendTimeout = TimeSpan.FromMinutes(1);
httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(1);

var routerEndpoint = host.AddServiceEndpoint(typeof(IRequestReplyRouter), httpBinding, this.endPointAddress, new Uri(this.listenAddress));

routerEndpoint.Name = "RouterMain";

host.Description.Behaviors.Add(new RoutingBehavior(new RoutingConfiguration()));

host.Description.Behaviors.Add(new RoutingUpdateBehaviour());

ServiceMetadataBehavior smb = new ServiceMetadataBehavior();
smb.HttpGetEnabled = true;
smb.HttpGetUrl = new Uri(this.endPointAddress);
host.Description.Behaviors.Add(smb);

ServiceDebugBehavior debug = host.Description.Behaviors.Find<ServiceDebugBehavior>();

// if not found - add behavior with setting turned on 
if (debug == null)
{
host.Description.Behaviors.Add(
new ServiceDebugBehavior() { IncludeExceptionDetailInFaults = true });
}
else
{
// make sure setting is turned ON
if (!debug.IncludeExceptionDetailInFaults)
{
debug.IncludeExceptionDetailInFaults = true;
}
}

host.Open();

}
catch (Exception ex)
{
Trace.TraceError(ex.Message);
}
}

/// <summary>
/// Called by Windows Azure to initialize the role instance.
/// </summary>
/// <remarks>
/// <para>
/// Override the OnStart method to run initialization code for your role.
/// </para>
/// <para>
/// Before the OnStart method returns, the instance's status is set to Busy and the
/// instance is not available
/// for requests via the load balancer.
/// </para>
/// <para>
/// If the OnStart method returns false, the instance is immediately stopped. If
/// the method
/// returns true, then Windows Azure starts the role by calling the <see cref="M:Microsoft.WindowsAzure.ServiceRuntime.RoleEntryPoint.Run" />
/// method.
/// </para>
/// <para>
/// A web role can include initialization code in the ASP.NET Application_Start method
/// instead of the OnStart method.
/// Application_Start is called after the OnStart method.
/// </para>
/// <para>
/// Any exception that occurs within the OnStart method is an unhandled exception.
/// </para>
/// </remarks>
/// <returns>
/// True if initialization succeeds, False if it fails. The default implementation
/// returns True.
/// </returns>
public override bool OnStart()
{
// Set the maximum number of concurrent connections 
ServicePointManager.DefaultConnectionLimit = 12;

// For information on handling configuration changes
// see the MSDN topic at 

http://go.microsoft.com/fwlink/?LinkId=166357.


return base.OnStart();
}
}
}
 

The Service Bus part - background

In the first version of this service, I have used the pure Service Bus .NET assemblies. A few days ago I discovered the CloudFx (Cloud Application Framework & Extensions) library on NuGet. This library was originally written by Microsoft folks to boost the development of cloud based projects. It is based on the “Reactive Extensions” (Rx) library. The CloudFx library allows you to use a publish/subscribe pattern on data-streams. Data streams can be events, Twitter feeds, web service requests and more using LINQ. Here is a small sample, that uses a MouseClick event with the publish/subscribe pattern (Source: MSDN):
///Declare an observable
public ISubject<MouseEventArgs> MouseMove;
///Publish data
MouseMove.OnNext(args);
///Subscribe to an observable
MouseMove.Subscribe(args => Display(args));

As you can see there is no usual Event/Delegate pattern visible anymore. An ISubject of type MouseEventArgs is created and the next time a mouse event occurs, the EventArgs are published to all subscribers. A few lines of code to create some awesomeness – love it! Here are some of the great features:

  • Send and receive messages asynchronously using a Service Bus Topic
  • Send and receive unicast and multicast messages with Service Bus
  • Simple use of “Put” and “Get” (send and receive) methods (Very handy to upload and download blobs)
  • Error handling
  • Inter-role communication

In general, it greatly reduces the code you need to implement these scenarios. Please see the excellent CloudFx Samples on MSDN .

Basic DAL implementation

The RouteMeModel class

using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Models
{
/// <summary>
/// The model class holding
/// the WCF properties to be
/// sent to the routing service.
/// </summary>
[Table("RouteModel")]
public class RouteMeModel
{
/// <summary>
/// Gets or sets the serice id.
/// </summary>
/// <value>The serice id.</value>
[Key]
[Column("ServiceUID")]
public string SericeUID { get; set; }

/// <summary>
/// Gets or sets the end point address.
/// </summary>
/// <value>The end point address.</value>
[Column("EndPointAddress")]
public string EndPointAddress { get; set; }

/// <summary>
/// Gets or sets the name of the service.
/// </summary>
/// <value>The name of the service.</value>
[Column("ServiceName")]
public string ServiceName { get; set; }

/// <summary>
/// Gets or sets the name of the contract.
/// </summary>
/// <value>The name of the contract.</value>
[Column("ContractName")]
public string ContractName { get; set; }

/// <summary>
/// Gets or sets the full name of the assembly.
/// </summary>
/// <value>The full name of the assembly.</value>
[Column("FullAssemblyName")]
public string FullAssemblyName { get; set; }
}

}
The RoutMeModel class will transport all the required data over the Service Bus to the Routing-Service, which includes:
  • The endpoint address
  • The name of the service
  • The full name of the contract (including the namespace) implemented by the service
  • The file-name of the executing assembly hosting the service implementation
The RouteModel class will be also used as a EF-entity to manage the WCF endpoint data.

Prerequisites

For our local development we will use SQL Server Express 2012. If you don’t have SQL Server Express 2012 installed on your local machine, you can get it here: Microsoft SQL Server 2012 Express

Adding a DbContext implementation and configure it using Fluent-Configuration.

Because we have a very simple model, the configuration of our DbContext :
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data.Entity;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Models
{
public class RouteContext:DbContext
{
/// <summary>
/// Gets or sets the services.
/// </summary>
/// <value>The services.</value>
public DbSet<RouteMeModel> Services { get; set; }

//TODO:INSERT YOUR CONNECTION STRING HERE
public RouteContext()
: base(@"[YOUR SQL CONNECTION STRING HERE")
{

}

/// <summary>
/// This method is called when the model for a derived context has been initialized,
/// but
/// before the model has been locked down and used to initialize the context.  The
/// default
/// implementation of this method does nothing, but it can be overridden in a derived
/// class
/// such that the model can be further configured before it is locked down.
/// </summary>
/// <param name="modelBuilder">The builder that defines the model for the context
/// being created.</param>
/// <remarks>
/// Typically, this method is called only once when the first instance of a derived
/// context
/// is created.  The model for that context is then cached and is for all further
/// instances of
/// the context in the app domain.  This caching can be disabled by setting the ModelCaching
/// property on the given ModelBuidler, but note that this can seriously degrade
/// performance.
/// More control over caching is provided through use of the DbModelBuilder and DbContextFactory
/// classes directly.
/// </remarks>
protected override void OnModelCreating(DbModelBuilder modelBuilder)
{
//Autodetect changes
this.Configuration.AutoDetectChangesEnabled = true;

//Define the primary key
modelBuilder.Entity<RouteMeModel>().HasKey<string>(r => r.SericeUID);

//set the connstraints for the entity fields
modelBuilder.Entity<RouteMeModel>().Property(r => r.ServiceName).HasMaxLength(120);
//Max lenght for a identifier
modelBuilder.Entity<RouteMeModel>().Property(r => r.ContractName).HasMaxLength(511);
//Max lenght for a URL in a browser. Should fit
modelBuilder.Entity<RouteMeModel>().Property(r => r.EndPointAddress).HasMaxLength(2083);
//Max length for a assemlby name. Including 256 chars for MAX_PATH + assembly full name 
//assuming it has to fit using the standard GAC path
modelBuilder.Entity<RouteMeModel>().Property(r => r.FullAssemblyName).HasMaxLength(356);

//let it do the needed stuff 
//base.OnModelCreating(modelBuilder);

}

}
}
The next thing to do is to generate the data-table using the “Package Manager Console” and EF. First you need to enable migrations, then add a migration and the last thing to do is to update the database. Here are the three commands to type into the “Package Manager Console”:

  • enable-migrations
  • add-migration Initial (initial is the name for the migration)
  • update-database
This will create the data-table using the code-first approach.  

Dynamic WCF-Router configuration – base scenario

It must be possible to send WCF-Service endpoint-data to a Service Bus topic subscriber. The subscriber (in our case the WCF-Routing service) manages the received endpoint-data and transforms it to endpoints that need to be added dynamically to the routing table of the WCF-Routing service. This a prefect use-case for an unicast scenario. Messages are only addressed to one specific receiver, which is the WCF-Routing service.

Installing CloudFX using NuGet

We need to install the latest pre-release to make it work with the latest Azure SDK (2.1 at the time of this writing). Installing CloudFX - NuGet

Configuring CloudFx

CloudFx is configured using standard app.config files. To simplify the configuration process, I took the configuration file from the CloudFX samples solution, and adapted it accordingly to my needs. The most important setting is the part where the Service Bus settings take place:
<ServiceBusConfiguration defaultEndpoint="wcfrouter" defaultNamespace="[YOUR NAMESPACE HERE]" defaultIssuerName="owner" defaultIssuerSecret="[YOUR KEY HERE]
<add name="wcfrouter" endpointType="Topic" topicPath="WcfRouterSample" />
</ServiceBusConfiguration>

In short the values are:

  • defaultEndPoint is the name for the CloudFx endpoint
  • defaultNamespace is your Service Bus namespace name
  • defaultIssuer is the issuer of your ACS token on Azure (standard owner)
  • defaultIssuerSecret is the default key (ACS token) issued by “owner”
  • endPointType is the type of the endpoint like topic, queue, relay and so forth
  • topicPath this is where you set the name of the topic, that will be created if it does not exist already

The subscription name will be set to “To” property of the routing-message context. In this sample “ServiceRouter”. More about the routing-message context later.

WCF-Router Side Implementation

On the WCF-Router side the following things need to be done:

  • Check, if there are entries in the RouteModel table, if so, check if the services have been added, if not, add the new route to the table
  • Listen for incoming requests, signaling that a WCF-Service wants to be added. Before that happens, check if the service is already in the database, if so, check if something has changed and update, if not just add it or reload it from the database
  • Routing happens based on filtering endpoint addresses
  • The filter-table and filters will be added dynamically per request
  • The contracts will be dynamically added using contract descriptions
  • A separate assembly that contains the service contract will be downloaded from blob-storage and used for the contract descriptions
  • The dynamic update functionality will be implemented using a custom behavior and a custom extension implementation
 
All the requirements are packed into a custom IExtension<T> implementation that can be found in the System.ServiceModel namespace (System.ServiceModel.dll). The IExtension<T>-Interface allows to extend

  • System.ServiceModel.IExtensibleObject<T>
  • System.ServcieModel.IContextChannel
  • System.ServiceModel.ServiceHost (That’s what happens here)
  • System.ServiceModel.InstanceContext
  • System.ServiceModel.OperationContext

using the extensible object pattern. You can read more about it here: IExtension<T> Interface on MSDN Read more about extensible objects and the the “ExtensibleObject<T> Pattern” here . To make the extension available to our WCF-Routing service, we make it available though a service-behavior that we can add to our WCF-Routing service at runtime. Behaviors are basically WCF configuration elements, that allow to extend specific WCF objects and add additional functionality.

Personally I see service-behavior like a kind of “bootstrapper” that allows you to add “plugins” (extensions) to a WCF-Service at runtime (this is what I use them for). A service-behavior exposes the service-host if you implement the IServiceBehaviour interface, so that you can add extensions to that specific host. The “Hook” we can use to add an extension to the current service host is by implementing the IServiceBehavior interface and to derive from the abstract class BehaviourExtensionElement (represents a WCF configuration element). Then we can use the ApplyDispatchBehaviour method (comes from IServiceBehavior) and add our extension to the service-host.
using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel.Configuration;
using System.ServiceModel.Description;
using System.Text;
using System.Threading.Tasks;
using SelfUpdatingServiceRouter.Extensions;

namespace SelfUpdatingServiceRouter.Behaviours
{
public class RoutingUpdateBehaviour : BehaviorExtensionElement, IServiceBehavior
{

/// <summary>
/// Gets the type of behavior.
/// </summary>
/// <returns>The type of behavior.</returns>
/// <value></value>
public override Type BehaviorType
{
get { return typeof(RoutingUpdateBehaviour); }
}

/// <summary>
/// Creates a behavior extension based on the current configuration settings.
/// </summary>
/// <returns>The behavior extension.</returns>
protected override object CreateBehavior()
{
return new RoutingUpdateBehaviour();
}

/// <summary>
/// Adds the binding parameters.
/// </summary>
/// <param name="serviceDescription">The service description.</param>
/// <param name="serviceHostBase">The service host base.</param>
/// <param name="endpoints">The endpoints.</param>
/// <param name="bindingParameters">The binding parameters.</param>
public void AddBindingParameters(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase, System.Collections.ObjectModel.Collection<ServiceEndpoint> endpoints, System.ServiceModel.Channels.BindingParameterCollection bindingParameters)
{
//Not implemented
}

/// <summary>
/// Provides the ability to change run-time property values or insert custom
/// extension objects such as error handlers, message or parameter interceptors,
/// security extensions, and other custom extension objects.
/// </summary>
/// <param name="serviceDescription">The service description.</param>
/// <param name="serviceHostBase">The host that is currently being built.</param>
public void ApplyDispatchBehavior(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase)
{
RouterUpdateExtension updateExtension = new RouterUpdateExtension();
serviceHostBase.Extensions.Add(updateExtension);
}

/// <summary>
/// Provides the ability to inspect the service host and the service description
/// to confirm that the service can run successfully.
/// </summary>
/// <param name="serviceDescription">The service description.</param>
/// <param name="serviceHostBase">The service host that is currently being constructed.</param>
public void Validate(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase)
{
//Not implemented
}
}
}
The extension for our service-host (which in this case is the WCF-Routing service) is realized by implementing the IExtension<ServiceHostBase> interface and the IDisposable interface. This is the most powerful piece of code in the whole solution. Maybe this is the most comprehensive example on how to configure a WCF-Routing service dynamically using code only.
using Microsoft.Experience.CloudFx.Framework.Configuration;
using Microsoft.Experience.CloudFx.Framework.Messaging;
using Microsoft.Experience.CloudFx.Framework.Storage;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
using Models;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive;
using System.Reflection;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.ServiceModel.Dispatcher;
using System.ServiceModel.Routing;
using System.Text;
using System.Threading.Tasks;

namespace SelfUpdatingServiceRouter.Extensions
{
class RouterUpdateExtension : IExtension<ServiceHostBase>, IDisposable
{

private RoleInstanceEndpoint endpointAzure;

ServiceHostBase owner;

IObserver<RouteMeModel> modelObserver;

ServiceBusPublishSubscribeChannel pubSubChannel;

List<ServiceEndpoint> serviceEndPoints;

//The routing configuration to use
RoutingConfiguration rc;

/// <summary>
/// Attaches the specified owner.
/// </summary>
/// <param name="owner">The owner.</param>
public void Attach(ServiceHostBase owner)
{
this.owner = owner;

//This could be done using the owner, you can read it near the bottom of this code-file
endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["RoutingServiceMain"];

//start the fun
this.Init();
}

/// <summary>
/// Detaches the specified owner.
/// </summary>
/// <param name="owner">The owner.</param>
public void Detach(ServiceHostBase owner)
{
this.Dispose();
}

public void Dispose()
{
//Dispose all the stuff
}

/// <summary>
/// Inits the Service Bus stuff
/// </summary>
private void Init()
{
this.serviceEndPoints = new List<ServiceEndpoint>();
this.rc = new RoutingConfiguration();
this.AddRoutingEntries();
this.SetupServiceBus();
}

/// <summary>
/// Adds the routing entries.
/// </summary>
private void AddRoutingEntries()
{
using (var ctx = new RouteContext())
{
if (ctx.Services.Count() > 0)
{
foreach (var entry in ctx.Services)
{
//Add a route for the service
AddServiceBusEntry(entry);
}
}
}
}

/// <summary>
/// Setups the service bus.
/// </summary>
private void SetupServiceBus()
{
//Setup the service subscription channel
var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName);

pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint));

CreateSubscriptionForService(pubSubChannel, "RoutingService");
}

/// <summary>
/// Creates the subscription for service.
/// </summary>
/// <param name="pubSubChannel">The pub sub channel.</param>
/// <param name="p">The p.</param>
private void CreateSubscriptionForService(ServiceBusPublishSubscribeChannel pubSubChannel, string serviceName)
{
//Define a filter to receive only messages that have a specific "To" property
var filter = FilterExpressions.GroupOr(
FilterExpressions.MatchTo(serviceName),
FilterExpressions.MatchTo("ServiceRouter"));
//Now we need to create an observer, that will check for new incoming messages
modelObserver = Observer.Create<RouteMeModel>(msg =>
{
var exists = CheckIfRoutingEntryExists(msg);

if (!exists)
{
AddNewServiceEntry(msg);
}
});

pubSubChannel.Subscribe(serviceName,modelObserver,filter);
}

/// <summary>
/// Adds the new service entry.
/// </summary>
/// <param name="msg">The MSG.</param>
private void AddNewServiceEntry(RouteMeModel msg)
{
using (var ctx = new RouteContext())
{
ctx.ChangeTracker.DetectChanges();
msg.SericeUID = Guid.NewGuid().ToString();
ctx.Services.Add(msg);
ctx.SaveChanges();
//Add a route for the service
AddServiceBusEntry(msg);
}
}

/// <summary>
/// Checks if routing entry exists.
/// </summary>
/// <param name="msg">The MSG.</param>
/// <returns></returns>
private bool CheckIfRoutingEntryExists(RouteMeModel msg)
{
using (var ctx = new RouteContext())
{
var entry = (from service in ctx.Services
where service.ServiceName.Equals(msg.ServiceName) && msg.ContractName.Equals(msg.ContractName)
select service).FirstOrDefault();
if(entry == null)
{
return false;
}
else
{
return true;
}
}
}

/// <summary>
/// Adds the service bus entry.
/// </summary>
/// <param name="message">The message.</param>
private void AddServiceBusEntry(RouteMeModel message)
{
//Load the contract assembly from blob storage
//Get the current type of contract to add to the client endpoint
var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString");

var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection));

var contractAssemblyContainer = CloudConfigurationManager.GetSetting("AssemblyContainerName");

var contractAssemblyName = CloudConfigurationManager.GetSetting("ContractAssemblyName");

byte[] data = null;

using (MemoryStream mstream = new MemoryStream())
{
var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream);
if (gotIt)
{
//Get the byte content. This methods does not care about the position
data = mstream.ToArray();
}
}

//Now we load the contract assembly
var assembly = Assembly.Load(data);

Type contractType = assembly.GetType(message.ContractName);

//The contract description we use 
var conDesc = ContractDescription.GetContract(contractType);

var HTTPbinding = new BasicHttpBinding();

var currentServiceEndPoint = new ServiceEndpoint(
conDesc,
HTTPbinding,
new EndpointAddress(message.EndPointAddress));

currentServiceEndPoint.Name = message.ServiceName;

var routerMainEndpoint = owner.Description.Endpoints.Where(ep => ep.Name == "RouterMain").FirstOrDefault();

var conDescRouter = ContractDescription.GetContract(typeof(IRequestReplyRouter));
var rEndPoint = new ServiceEndpoint(conDescRouter,new BasicHttpBinding(), new EndpointAddress( routerMainEndpoint.Address.Uri.OriginalString +"/" + message.ServiceName));
rEndPoint.Name = message.ServiceName;

this.owner.AddServiceEndpoint(rEndPoint);

var addressFilter = new EndpointAddressMessageFilter(new EndpointAddress(routerMainEndpoint.Address.Uri.OriginalString + "/" + message.ServiceName));

//We don't want to route on headers only
rc.RouteOnHeadersOnly = false;

//Add the filter table
rc.FilterTable.Add(addressFilter, new List<ServiceEndpoint>() { currentServiceEndPoint });

//Apply the dynamic configuration
this.owner.Extensions.Find<RoutingExtension>().ApplyConfiguration(rc);
}

/// <summary>
/// Checks if end point was added.
/// </summary>
/// <param name="host">The host.</param>
/// <param name="endpointAddress">The endpoint address.</param>
/// <returns></returns>
private static bool CheckIfEndPointWasAdded(ServiceHostBase host, string endpointAddress)
{
bool isPresent = false;

foreach (var endpoint in host.Description.Endpoints.ToList())
{
if (endpoint.ListenUri.AbsoluteUri.Equals(endpointAddress))
{
isPresent = true;
break;
}
}

return isPresent;
}
}
}
  The main players in this implementation are the following methods:

  • AddRoutingEntries and AddServiceBusEntry, both responsible for adding endpoint-entries to the database
  • SetupServiceBus and CreateSubscriptionForService, both responsible for setting up a CloudFx driven Service Bus Topic subscription
  • AddServiceBusEntry (should be renamed), responsible for the dynamic WCF-Router configuration
Let’s take some of the methods out and focus on their implementation. First the methods that are responsible for the CloudFx setup.

The SetupServiceBus-Method

This method is loading the CloudFx Service Bus configuration using the CloudFx specific CloudApplicationConfiguration class. After loading the configuration, it sets up a ServicePublishSubscribeChannel using the default configuration endpoint to load the Service Bus configuration from the app.config file. The ServicePublishSubscribeChannel can be used to publish messages for a specific Service Bus topic, or to subscribe for messages on a specific Service Bus topic. In this case we do a subscription and wait for topic-messages to arrive.
/// <summary>
/// Setups the service bus.
/// </summary>
private void SetupServiceBus()
{
//Setup the service subscription channel
var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName);

pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint));

CreateSubscriptionForService(pubSubChannel, "RoutingService");
}

The CreateSubscriptionForService-Method

That’s where the real magic of CloudFx happens. First we set-up a filter-expression for the incoming messages. We receive all messages that contain either the name of the service OR the string constant “ServiceRouter”. All other messages are not of any interest. Then we create an Observer<RouteModel>. The observer will check the Service Bus topic for appropriate messages using the filter-criteria's we defined before. The last thing we do is to subscribe to our pub/sub channel using the service-name (channel name) our observer and the filter. From now on the message-loop is running, ready to receive messages asynchronously. That’s pretty cool!  
/// <summary>
/// Creates the subscription for service.
/// </summary>
/// <param name="pubSubChannel">The pub sub channel.</param>
/// <param name="p">The p.</param>
private void CreateSubscriptionForService(ServiceBusPublishSubscribeChannel pubSubChannel, string serviceName)
{
//Define a filter to receive only messages that have a specific "To" property
var filter = FilterExpressions.GroupOr(
FilterExpressions.MatchTo(serviceName),
FilterExpressions.MatchTo("ServiceRouter"));
//Now we need to create an observer, that will check for new incoming messages
modelObserver = Observer.Create<RouteMeModel>(msg =>
{
var exists = CheckIfRoutingEntryExists(msg);

if (!exists)
{
AddNewServiceEntry(msg);
}
});

pubSubChannel.Subscribe(serviceName,modelObserver,filter);
}
 

The AddServiceBusEntry-Method

This methods implements some real cool things:
  • Loading the contract assembly for the services to be added to the WCF-Router from Azure Blob storage using CloudFx!
  • It adds the endpoint addresses of the WCF-Services to route to as well as the filters to the WCF-Routing service and re-configures the WCF-Routing service at runtime!
To download the contract assembly from Azure Blob storage we need only three lines of code: The following line loads the configuration settings (the Service Bus connection string) from our configuration file
var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString");
This line creates a new ReliableCloudBlobStorage instance:
var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection));
and this line downloads the contract-assembly blob, using the ReliableCloudBlobStorage instance:
var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream);
It returns true, if the download was successful, otherwise false. That’s all the code required to download a blob. To upload a blob, you use the Put-Method :)
/// <summary>
/// Adds the service bus entry.
/// </summary>
/// <param name="message">The message.</param>
private void AddServiceBusEntry(RouteMeModel message)
{
//Load the contract assembly from blob storage
//Get the current type of contract to add to the client endpoint
var storageConnection = CloudConfigurationManager.GetSetting("Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString");

var cloudStorage = new ReliableCloudBlobStorage(StorageAccountInfo.Parse(storageConnection));

var contractAssemblyContainer = CloudConfigurationManager.GetSetting("AssemblyContainerName");

var contractAssemblyName = CloudConfigurationManager.GetSetting("ContractAssemblyName");

byte[] data = null;

using (MemoryStream mstream = new MemoryStream())
{
var gotIt = cloudStorage.Get(contractAssemblyContainer, contractAssemblyName, mstream);
if (gotIt)
{
//Get the byte content. This methods does not care about the position
data = mstream.ToArray();
}
}

//Now we load the contract assembly
var assembly = Assembly.Load(data);

Type contractType = assembly.GetType(message.ContractName);

//The contract description we use 
var conDesc = ContractDescription.GetContract(contractType);

var HTTPbinding = new BasicHttpBinding();

var currentServiceEndPoint = new ServiceEndpoint(
conDesc,
HTTPbinding,
new EndpointAddress(message.EndPointAddress));

currentServiceEndPoint.Name = message.ServiceName;

var routerMainEndpoint = owner.Description.Endpoints.Where(ep => ep.Name == "RouterMain").FirstOrDefault();

var conDescRouter = ContractDescription.GetContract(typeof(IRequestReplyRouter));
var rEndPoint = new ServiceEndpoint(conDescRouter,new BasicHttpBinding(), new EndpointAddress( routerMainEndpoint.Address.Uri.OriginalString +"/" + message.ServiceName));
rEndPoint.Name = message.ServiceName;

this.owner.AddServiceEndpoint(rEndPoint);

var addressFilter = new EndpointAddressMessageFilter(new EndpointAddress(routerMainEndpoint.Address.Uri.OriginalString + "/" + message.ServiceName));

//We don't want to route on headers only
rc.RouteOnHeadersOnly = false;

//Add the filter table
rc.FilterTable.Add(addressFilter, new List<ServiceEndpoint>() { currentServiceEndPoint });

//Apply the dynamic configuration
this.owner.Extensions.Find<RoutingExtension>().ApplyConfiguration(rc);
}
 

The Publisher Part

The publishing part is implemented in a separate assembly called “ServiceMessenger”. It contains only one class, the Messenger class. This assembly needs to be referenced by any of the WCF-Services that want to add their endpoints to the WCF-Router.
using Microsoft.Experience.CloudFx.Framework.Configuration;
using Microsoft.Experience.CloudFx.Framework.Messaging;
using Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ServiceMessenger
{
public class Messenger
{
private RouteMeModel model;

/// <summary>
/// Initializes a new instance of the <see cref="Messenger" /> class.
/// </summary>
/// <param name="servcieModel">The servcie model.</param>
public Messenger(string endPointAddress, string contracName, string assemblyName, string serviceName)
{
this.model = new RouteMeModel()
{
ContractName = contracName,
EndPointAddress = endPointAddress,
FullAssemblyName = assemblyName,
ServiceName = serviceName
};
}

/// <summary>
/// Setups the service bus.
/// </summary>
public void SendMessageToRouter()
{
//Setup the service subscription channel

var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName);

var routerServiceCtx = new RoutingMessageContext { To = "ServiceRouter" };

using (var pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint)))
{
pubSubChannel.Settings.MessageTimeToLive = TimeSpan.FromSeconds(120);

//publish the message
pubSubChannel.Publish(this.model, routerServiceCtx);
}
}
}
}
Creating a new instance allows you to pass all the important parameters to create a new routing-entry and send it via the Service Bus to the WCF-Routing service, where the RouterUpdate extension will receive the data and add a new service endpoint, if it not already exists. The initialization process to create a new ServcieBusPublishSubscriber channel is identical with the one we used to subscribe to the Service Bus topic.
/// <summary>
/// Setups the service bus.
/// </summary>
public void SendMessageToRouter()
{
//Setup the service subscription channel

var config = CloudApplicationConfiguration.Current.GetSection<ServiceBusConfigurationSection>(ServiceBusConfigurationSection.SectionName);

var routerServiceCtx = new RoutingMessageContext { To = "ServiceRouter" };

using (var pubSubChannel = new ServiceBusPublishSubscribeChannel(config.Endpoints.Get(config.DefaultEndpoint)))
{
pubSubChannel.Settings.MessageTimeToLive = TimeSpan.FromSeconds(120);

//publish the message
pubSubChannel.Publish(this.model, routerServiceCtx);
}
}
There are two differences:
  • A RoutingMessageContext instance is created, and the receiver is set via the “To” property
  • The Publish-Method of the ServiceBusPublishSubscriberChannel is used to send an instance of the RouteModel class over the wire, which contains the endpoint data of the service that wants to be added to the WCF-Routing service
That’s all that needs to be done to publish a message for a specific Service Bus topic using CloudFx!  

The Sample WCF-Services and the WCF-Test Client

There are two WCF-Services that publish their endpoints using the ServiceMessenger assembly to the WCF-Router:

  • HelloWorld
  • and HelloWorldExtended

Each of them implements a simple service-method that returns a string. Both services have one endpoint and one metadata publishing behavior. Both service interfaces are defined in the ContractAssembly that has been uploaded to local storage for testing (this is what you should do as well, if you want to test the solution). The services are hosted in separate worker roles, that are nearly identical. Except for the service endpoints:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using System.ServiceModel;
using ContractAssembly;
using ServiceMessenger;
using System.ServiceModel.Description;

namespace HelloWorld
{
public class WorkerRole : RoleEntryPoint
{
private string endPointAddress;

private string listenAddress;

private RoleInstanceEndpoint endpointAzure;

public override void Run()
{

using (ServiceHost host = new ServiceHost(typeof(HelloWorld)))
{

Thread.Sleep(60000);

// This is a sample worker implementation. Replace with your logic.
Trace.TraceInformation("HelloWorldExtended entry point called", "Information");

this.ConfigureServiceHost(host);

while (true)
{
Thread.Sleep(1000);
Trace.TraceInformation(endPointAddress);
}
}
}

/// <summary>
/// Configures the service host.
/// </summary>
/// <param name="host">The host.</param>
private void ConfigureServiceHost(ServiceHost host)
{
try
{
endpointAzure = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["HelloWorldMain"];

this.endPointAddress = string.Format("http://{0}/HelloWorld", endpointAzure.IPEndpoint);

this.listenAddress = string.Format("http://{0}:{1}/HelloWorld/", RoleEnvironment.GetConfigurationSettingValue("Domain"), endpointAzure.IPEndpoint.Port);

var httpBinding = new BasicHttpBinding();

httpBinding.SendTimeout = TimeSpan.FromMinutes(1);
httpBinding.ReceiveTimeout = TimeSpan.FromMinutes(1);

var helloWorldExtendedEndPoint = host.AddServiceEndpoint(typeof(IHelloWorldService), httpBinding, this.endPointAddress, new Uri(this.listenAddress));

helloWorldExtendedEndPoint.Name = "HelloWorld";

var messenger = new Messenger(this.endPointAddress, "ContractAssembly.IHelloWorldService", "ContractAssembly.dll", "HelloWorld");

messenger.SendMessageToRouter();

ServiceMetadataBehavior smb = new ServiceMetadataBehavior();
smb.HttpGetEnabled = true;
smb.HttpGetUrl = new Uri(this.endPointAddress);
host.Description.Behaviors.Add(smb);

host.Open();

}
catch (Exception ex)
{
Trace.TraceError(ex.Message);
}
}

public override bool OnStart()
{
// Set the maximum number of concurrent connections 
ServicePointManager.DefaultConnectionLimit = 12;

// For information on handling configuration changes
// see the MSDN topic at 

http://go.microsoft.com/fwlink/?LinkId=166357.


return base.OnStart();
}
}
}
  To publish the single endpoint of each service we use two lines of code:
var messenger = new Messenger(this.endPointAddress, "ContractAssembly.IHelloWorldService", "ContractAssembly.dll", "HelloWorld");

messenger.SendMessageToRouter();
We create a new Messenger instance and publish the new service endpoint using the SendMessageToRouter method. Sweet! For a quick overview of the solution, please see this code-map: codemap Well, that’s it!  I hope you enjoyed reading this article! Code on GitHub        
Published at DZone with permission of its author, Ilija Injac.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)