Tuesday, October 9, 2012

ZeroMQ + MsgPack + .NET

I have created a simple testbed to demonstrate 2 .NET applications communicating with each other using ZeroMQ Publisher/Subscriber connections for the communications layer, and MsgPack as the serialization/deserialization layer.

ZeroMQ is a highly efficient and scalable network layer that acts as a reliable messaging layer on top of raw sockets. Performance is key here.

For ZeroMQ I use the x86 NuGet package available here: http://nuget.org/packages/clrzmq/2.2.5

MsgPack is a serialization library where efficient storage of the serialized data, and performance of the serialization and deserialization is the main focus.

For MsgPack I found that the current NuGet package was somewhat behind the times, so I built the dll by downloading the latest source from https://github.com/msgpack/msgpack-cli and building it in release configuration, and adding it as a reference to both the client and server applications.

With the two libraries combined, an application should be very efficient and performance should be very high. I built this testbed primarily to see how straightforward it was to use these libraries from .NET applications. As you can see, it is very straightforward indeed!

Here is the client implementation:

using System;
using System.IO;
using MsgPack.Serialization;
using ZeroMQ_MessagePack_Testbed.Models;
using ZMQ;
namespace ZeroMQ_MessagePack_Testbed
{
class Program
{
static void Main(string[] args)
{
using (var context = new Context(1))
{
using (var subscriber = context.Socket(SocketType.SUB))
{
subscriber.Subscribe(new byte[] { 7, 7, 7, 7 });
subscriber.Connect("tcp://localhost:5556");
var serializer = MessagePackSerializer.Create<Message>();
while (true)
{
var response = subscriber.Recv();
using (var messageStream = new MemoryStream(response))
{
//trim the 4 byte header off
messageStream.Seek(4, SeekOrigin.Begin);
//deserialize from the stream
var message = serializer.Unpack(messageStream);
Console.WriteLine("CLIENT: {0} {1} {2}", message.Id, message.Timestamp, DateTime.UtcNow);
}
}
}
}
}
}
}
view raw Client.cs hosted with ❤ by GitHub


Here is the server implementation:

using System;
using System.IO;
using System.Threading;
using MsgPack.Serialization;
using ZeroMQ_MessagePack_Testbed.Models;
using ZMQ;
namespace ZeroMQ_MessagePack_Testbed.Server
{
class Program
{
static void Main(string[] args)
{
using (var context = new Context(1))
{
using (var publisher = context.Socket(SocketType.PUB))
{
publisher.Bind("tcp://*:5556");
var randomizer = new Random(DateTime.Now.Millisecond);
var serializer = MessagePackSerializer.Create<Message>();
while (true)
{
Thread.Sleep(500);
var message = new Message
{
Id = Guid.NewGuid(),
Priority = Priority.Medium,
Subject = "Test Message" + randomizer.Next(0, 100),
Body = "This is a test message " + randomizer.Next(0, 100000),
Timestamp = DateTime.UtcNow
};
Console.WriteLine("SERVER: {0} {1}", message.Id, message.Timestamp);
using (var messageStream = new MemoryStream())
{
//write 4 byte client id first
messageStream.Write(new byte[] { 7, 7, 7, 7 }, 0, 4);
//write the packed message to the stream
serializer.Pack(messageStream, message);
// Send message to 0..N subscribers via a pub socket
publisher.Send(messageStream.GetBuffer());
}
}
}
}
}
}
}
view raw Server.cs hosted with ❤ by GitHub


Here is the shared model:

using System;
namespace ZeroMQ_MessagePack_Testbed.Models
{
public class Message
{
public Guid Id;
public Priority Priority;
public string Subject;
public string Body;
public DateTime Timestamp;
}
public enum Priority
{
High,
Medium,
Low
}
}
view raw Message.cs hosted with ❤ by GitHub


The full solution is available here: https://github.com/SneakyBrian/ZeroMQ_MessagePack_Testbed

1 comment: