Service Bus Applications using Fubu Transportation

It is increasingly useful to distribute a system across separate services, and splitting up your software this way can have a lot of benefits. Today we’re going to talk about one way to overcome some of the problems that come from splitting up your system, particularly over a network.

Your services are going to need to talk to each other over the network, so you need a way to connect them. Each one needs to know how to ask for information, tell another service what it knows, or say what it needs done. We’ll do this through a ‘messaging’ system. You’re also going to need to handle dropped messages and the other problems inherent in working over the wire.

Our company uses an OSS product called Fubu Transportation. It has changed names a few times and been pushed inside the main framework and pulled back out, but its job is to coordinate the messages sent between different parts of the system.

First, you need some sort of transport. Fubu Transportation uses Lightning Queues, which is in charge of queuing up the messages and making sure they arrive where they are supposed to. It uses LightningDB for storage. You can just use local storage, or you can hook it up to a database, generally a document database. Persistence lets it hold on to messages when the receiver is unavailable or the service goes down before it is ready, and it also makes it possible to schedule messages for delivery at a later time.

The first thing you’ll do is set up a settings class that will house your channels. Each channel is a Uri made up of the transport, the host and port, and a queue name.


public class TransportSettings
{
  // Format: transport://host:port/queue
  // (the queue names here are illustrative)
  public Uri Pinger { get; set; } =
    "lq.tcp://localhost:2342/pinger".ToUri();
  public Uri Ponger { get; set; } =
    "lq.tcp://localhost:2343/ponger".ToUri();
}

You could also pull these values out of code and load them from configuration, through whatever settings convention you prefer.
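
As a rough sketch of that idea, here is a settings class that reads each Uri from an environment variable and falls back to a default. The class name EnvironmentTransportSettings, the variable names PINGER_URI and PONGER_URI, and the queue names are all made up for this example; it uses plain System.Uri rather than anything from Fubu Transportation.

```csharp
using System;

// Hypothetical variation on TransportSettings: each Uri comes from an
// environment variable when one is set, and from a default otherwise.
public class EnvironmentTransportSettings
{
  public Uri Pinger { get; set; } =
    FromEnvironment("PINGER_URI", "lq.tcp://localhost:2342/pinger");
  public Uri Ponger { get; set; } =
    FromEnvironment("PONGER_URI", "lq.tcp://localhost:2343/ponger");

  // The variable names above are assumptions, not an FT convention
  private static Uri FromEnvironment(string name, string fallback)
  {
    var value = Environment.GetEnvironmentVariable(name);
    return new Uri(string.IsNullOrWhiteSpace(value) ? fallback : value);
  }
}
```

This keeps the defaults handy for local development while letting each deployed environment point the channels somewhere else.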

Next you will configure the channels using FubuTransportRegistry<T>, where T is the settings class you defined above. A fluent interface off of the Channel() method describes what each channel does and how.


public class PingApp : FubuTransportRegistry<TransportSettings>
{
  public PingApp()
  {
    // Configure PingApp to send PingMessages
    // to the PongApp
    Channel(x => x.Ponger)
      .AcceptsMessage<PingMessage>();

    // Listen for incoming messages on "Pinger"
    Channel(x => x.Pinger)
      .ReadIncoming();
  }
}

public class PongApp : FubuTransportRegistry<TransportSettings>
{
  public PongApp()
  {
    // Listen for incoming messages on "Ponger"
    Channel(x => x.Ponger)
      .ReadIncoming();
  }
}

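Channel Options

Options off of Channel(). ReadIncoming() makes the application listen on a channel, while the Accepts options decide which message types are sent to a channel, as in PingApp above.

Listen on a specific channel for incoming messages
.ReadIncoming()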

Accept a specific message type
.AcceptsMessage<PingMessage>()

Accept a specific message type, passed as a Type object
.AcceptsMessage(typeof(PongMessage))

Accept messages by some sort of naming convention
.AcceptsMessages(type => type.Name.EndsWith("Message"))

Accept all message types in the assembly that contains a given type
.AcceptsMessagesInAssemblyContainingType<PingMessage>()

Accept all message types in an assembly, by assembly name
.AcceptsMessagesInAssembly("MyMessageLibrary")

Accept all message types in a namespace, by name
.AcceptsMessagesInNamespace("MyMessageLibrary")

Accept all message types in the namespace that contains a given type
.AcceptsMessagesInNamespaceContainingType<PingMessage>()
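
One way to picture these options is that each one boils down to a yes/no test against the message type. The sketch below is not FT's code; RoutingRuleSketch and its method names are invented for illustration, and the exact-match comparison on namespace is an assumption about how such a rule might behave.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

namespace MyMessageLibrary
{
  public class PingMessage { }
  public class PongMessage { }
  public class AuditRecord { }
}

// Hypothetical helper: every routing rule is a predicate over a Type
public static class RoutingRuleSketch
{
  public static Func<Type, bool> ExactType<T>() =>
    type => type == typeof(T);

  public static Func<Type, bool> NameEndsWith(string suffix) =>
    type => type.Name.EndsWith(suffix);

  public static Func<Type, bool> InNamespace(string name) =>
    type => type.Namespace == name;

  public static Func<Type, bool> InAssemblyContaining<T>() =>
    type => type.Assembly == typeof(T).Assembly;

  // A channel takes a message type if any one of its rules matches
  public static bool Matches(Type messageType, IEnumerable<Func<Type, bool>> rules) =>
    rules.Any(rule => rule(messageType));
}
```

Thinking of the options this way makes it easier to predict which channel a new message type will end up on when several rules are in play.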


Message Persistence:

Skip persistence and deliver the message as quickly as possible, with no delivery guarantee
.DeliveryFastWithoutGuarantee()

The default; states explicitly that messages are persisted for safe delivery
.DeliveryGuaranteed()

Fubu Transportation continually sends messages of its own to coordinate activities like setting up subscriptions and running health checks. You should set up a control channel for this traffic.


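public class ControlChannelApp : FubuTransportRegistry<TransportSettings>
{
  public ControlChannelApp()
  {
    // Assumes the settings class also has a
    // Uri property named Control
    Channel(x => x.Control)
      .UseAsControlChannel()
      .DeliveryFastWithoutGuarantee();
  }
}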

Message Consumers

Next we’ll look at how to handle or consume these messages.

We will create consumer methods. Fubu Transportation (FT) discovers these by convention: a consumer is a public method on a public class that takes a single parameter of the message type it handles.


public class SimpleConsumer
{
  public void Consume(PingMessage message)
  {
    Console.WriteLine("I handle PingMessage");
  }
}
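
To make the convention concrete, here is a small reflection sketch that finds methods of that shape. It only illustrates the rule described above and is not how FT itself scans for consumers; HandlerDiscoverySketch and FindHandlers are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public class PingMessage { }

public class SimpleConsumer
{
  public void Consume(PingMessage message)
  {
    Console.WriteLine("I handle PingMessage");
  }
}

public static class HandlerDiscoverySketch
{
  // Find public instance methods, declared on public concrete classes,
  // that take exactly one parameter of the given message type
  public static IEnumerable<MethodInfo> FindHandlers(
    Type messageType, IEnumerable<Type> candidates)
  {
    const BindingFlags flags =
      BindingFlags.Public | BindingFlags.Instance | BindingFlags.DeclaredOnly;

    return candidates
      .Where(type => type.IsClass && type.IsPublic && !type.IsAbstract)
      .SelectMany(type => type.GetMethods(flags))
      // Skip property setters and other compiler-generated methods
      .Where(method => !method.IsSpecialName)
      .Where(method =>
      {
        var parameters = method.GetParameters();
        return parameters.Length == 1
          && parameters[0].ParameterType == messageType;
      });
  }
}
```

Calling FindHandlers(typeof(PingMessage), new[] { typeof(SimpleConsumer) }) yields the Consume method, which is the kind of match the convention relies on.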

Fubu Transportation supports what we call Cascading Messages: a consumer can create a reply, or send another message, after one message is processed. This happens by convention, without any extra service bus code to send the message. You opt in by changing the return type of the consume method.


public class CascadingConsumer
{
  public MyResponse Consume(MyMessage message)
  {
    return new MyResponse();
  }
}

When FT executes this consume method, it “knows” that it then needs to send out another message of type MyResponse, using whatever delivery rules apply to that type. FT treats this as one transaction: it will not send the next message until the first message has been fully handled. This also helps a lot with unit testing, because you can call the consume method directly and check what it returns without mocking the service bus.
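
The testing benefit is easy to see in a sketch. Because the cascading message is just a return value, a check can call the consumer like any other method. The message classes are empty placeholders here, and CascadingConsumerCheck is a made-up name standing in for whatever test framework you use.

```csharp
using System;

public class MyMessage { }
public class MyResponse { }

public class CascadingConsumer
{
  public MyResponse Consume(MyMessage message)
  {
    return new MyResponse();
  }
}

public static class CascadingConsumerCheck
{
  // A plain method call: no bus and no mocks involved
  public static bool ReturnsResponse()
  {
    var consumer = new CascadingConsumer();
    object cascaded = consumer.Consume(new MyMessage());
    return cascaded is MyResponse;
  }
}
```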

You can also use the .Request<T> method instead of a standard .Send() when you are the one who wants the response back. The reply is then returned to you rather than delivered by the existing routing rules.


// _bus is the service bus instance held by this class
public Task<MyResponse> GatherResponse()
{
  return _bus.Request<MyResponse>(new MyMessage());
}

The request can be handled by a consumer that returns the response:


public MyResponse Consume(MyMessage message)
{
  return new MyResponse();
}
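
If it helps to see the request/reply idea without any transport in the way, here is a tiny in-memory stand-in. It is only a sketch under big assumptions: InMemoryBusSketch and its Handle method are invented for this example, everything runs synchronously in one process, and none of it reflects how FT correlates replies over the wire.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class MyMessage { }
public class MyResponse { }

// Hypothetical in-memory stand-in for a bus, for illustration only
public class InMemoryBusSketch
{
  private readonly Dictionary<Type, Func<object, object>> _handlers =
    new Dictionary<Type, Func<object, object>>();

  // Register a consumer function for one message type
  public void Handle<TMessage>(Func<TMessage, object> handler)
  {
    _handlers[typeof(TMessage)] = message => handler((TMessage)message);
  }

  // The reply completes the caller's Task instead of being
  // routed to a channel by the normal rules
  public Task<TResponse> Request<TResponse>(object request)
  {
    var completion = new TaskCompletionSource<TResponse>();
    try
    {
      var reply = _handlers[request.GetType()](request);
      if (reply is TResponse)
        completion.SetResult((TResponse)reply);
      else
        completion.SetException(
          new InvalidOperationException("No reply of the requested type"));
    }
    catch (Exception e)
    {
      completion.SetException(e);
    }
    return completion.Task;
  }
}
```

The point to take away is the shape: the caller gets a Task for the reply, and the consumer stays an ordinary method that returns a message.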

You can also set up conditional responses that send different messages depending on conditions in the consumer. Just set the return type to object; whatever message is returned picks up the configuration for its own type and is delivered where it belongs.


public class ConditionalResponseConsumer
{
  public object Consume(DirectionRequest request)
  {
    switch (request.Direction)
    {
      case "North":
        return new GoNorth();
      case "South":
        return new GoSouth();
    }

    return null;
  }
}

You can also schedule messages for a later time or date. Persistent storage is a good idea here, so that a delayed message is not lost if the service restarts before it is due.


public DelayedResponse Consume(DirectionRequest request)
{
  // Process GoWest in 5 minutes
  return new DelayedResponse(new GoWest(), TimeSpan.FromMinutes(5));
}

You can also return multiple cascading messages by setting the return type to IEnumerable<object>.


public IEnumerable<object> Consume(MyMessage message)
{
  // Go North now
  yield return new GoNorth();

  // Go West in an hour
  yield return new DelayedResponse(new GoWest(), TimeSpan.FromHours(1));
}
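
Putting the return-type variations side by side, you can imagine the bus sorting whatever a consumer returns into "send now" and "send later". The sketch below is a guess at that shape and not FT's implementation: CascadeSorterSketch and DelayedSketch are hypothetical stand-ins, and treating a null return as "send nothing" is an assumption.

```csharp
using System;
using System.Collections.Generic;

public class GoNorth { }
public class GoWest { }

// Hypothetical stand-in for the DelayedResponse used above
public class DelayedSketch
{
  public DelayedSketch(object message, TimeSpan delay)
  {
    Message = message;
    Delay = delay;
  }

  public object Message { get; }
  public TimeSpan Delay { get; }
}

public class CascadeSorterSketch
{
  public List<object> SendNow { get; } = new List<object>();
  public List<DelayedSketch> SendLater { get; } = new List<DelayedSketch>();

  // Sort one consumer return value into the two lists
  public void Accept(object returned)
  {
    // Assumption: a null return means there is nothing to send
    if (returned == null) return;

    if (returned is DelayedSketch delayed)
    {
      SendLater.Add(delayed);
      return;
    }

    // A sequence is unpacked and each item sorted on its own
    if (returned is IEnumerable<object> many)
    {
      foreach (var item in many) Accept(item);
      return;
    }

    SendNow.Add(returned);
  }
}
```

Seen this way, the consumer signatures above are all the same mechanism: the return value is inspected, and each message inside it is delivered by the rules for its own type.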

Dynamic Subscriptions

In your application’s transport settings class, set up different channels for incoming and outgoing messages to keep those concerns separate.


public class NodeSettings
{
  // This uri points to a different
  // application
  public Uri OtherApp { get; set; }

  // This uri should be the shared
  // channel that all nodes in the
  // application cluster are reading
  public Uri Receiving { get; set; }
}

Now you’ll configure a local subscription for each individual node of the application, plus a global subscription if the same service is load balanced across several servers.


public class LocalApp : FubuTransportRegistry<NodeSettings>
{
  public LocalApp()
  {
    // Explicitly set the logical descriptive
    // name of this application. The default is
    // derived from the name of the class
    NodeName = "MyApplication";

    // Incoming messages
    Channel(x => x.Receiving)
      .ReadIncoming();

    // Local subscription to only this node
    SubscribeLocally()
      .ToSource(x => x.OtherApp)
      .ToMessage<OtherAppMessage1>();

    // Global subscription to all the
    // running nodes in this clustered application
    SubscribeAt(x => x.Receiving)
      .ToSource(x => x.OtherApp)
      .ToMessage<OtherAppMessage2>()
      .ToMessage<OtherAppMessage3>();
  }
}

FT uses subscription persistence to store the list of active applications, nodes, and subscriptions. It has built-in message types for SubscriptionsChanged, SubscriptionRequested, and SubscriptionsRemoved that it uses to coordinate subscriptions across running nodes and applications.

When a FT service bus application is initialized, it:

  • Loads the information about any other nodes in the logical application cluster from the subscription storage
  • Loads the list of previously persisted message subscriptions
  • Sends out SubscriptionRequested messages to request dynamic subscriptions

Sagas

FT supports the idea of Sagas, which are basically long-lived message processes.

Think of a state machine. There is a main Saga object that holds all the logic for transitioning between states and for what should happen on each transition. Then there are individual saga states that get run through the system. These objects are durably stored. Events are published through messages that include the current state and the event that happened. Each message is passed through the state machine, which updates the state object accordingly and then emits a completed message so the rest of the application can react as well.
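
To make the state machine idea a bit more tangible, here is a toy saga for an order. Everything in it is hypothetical: the order scenario, the class and message names, and the in-memory dictionary standing in for durable storage. It looks the state up by id rather than carrying it on the message, which is a simplification, and it does not use FT's own saga support.

```csharp
using System;
using System.Collections.Generic;

public enum OrderStatus { Started, Paid, Completed }

// The saga state that would be durably stored between messages
public class OrderSagaState
{
  public Guid Id { get; set; }
  public OrderStatus Status { get; set; }
}

public class PaymentReceived { public Guid SagaId { get; set; } }
public class OrderShipped { public Guid SagaId { get; set; } }
public class OrderCompleted { public Guid SagaId { get; set; } }

public class OrderSagaSketch
{
  // Stand-in for durable saga storage
  private readonly Dictionary<Guid, OrderSagaState> _storage =
    new Dictionary<Guid, OrderSagaState>();

  public Guid Start()
  {
    var state = new OrderSagaState
    {
      Id = Guid.NewGuid(),
      Status = OrderStatus.Started
    };
    _storage[state.Id] = state;
    return state.Id;
  }

  public OrderStatus StatusOf(Guid id) => _storage[id].Status;

  // Transition: Started -> Paid
  public void Consume(PaymentReceived message)
  {
    var state = _storage[message.SagaId];
    if (state.Status == OrderStatus.Started)
      state.Status = OrderStatus.Paid;
  }

  // Transition: Paid -> Completed. The returned message plays the
  // role of the "completed" message the rest of the system reacts to.
  public OrderCompleted Consume(OrderShipped message)
  {
    var state = _storage[message.SagaId];
    if (state.Status != OrderStatus.Paid) return null;

    state.Status = OrderStatus.Completed;
    return new OrderCompleted { SagaId = state.Id };
  }
}
```

Note that an OrderShipped arriving before payment is simply ignored here; a real saga would need a deliberate answer for out-of-order events.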