About Me

My photo
I am passionate .NET developer who believes in the power of collaboration and craftsmanship.

Blog Archive

Monday, January 9, 2017

Star Problem

Here is the problem. Let say you have a large CSV file that can not be opened completely in memory. Each line is a record that represents a star. The star record has a name and it's coordinates x, y and z. From this file you need to determine the 10 closest stars to a given star by the distance between the given star and the ones in the file using a function that will return the distance value as an double. How would you solve this problem?

Now that I have set up the question I am going to show you how I solved it.

To start off I am going to build the Star object. The star object has some important properties including X,Y,Z and Name that are defined in the question plus a property call DistanceToCompare. This property will hold the value of the distance between the star and the given star. To populate this value their is a method called CalculateDistanceToCompare which takes in the given star object as a parameter. This function follows the standard calculation for determining the distance between two points via their x,y,z coordinates.

In addition to mentioned properties and methods there is a factory method called Create that takes in a string value which will be a line value from CSV file and creates a new star. The Star object also implements IComparable which is important part to the solution it makes it's comparison based on the property value DistanceToCompare. The complete listing is below.

public class Star : IComparable<Star>
{
    public Star(int x, int y, int z, string name)
    {
        this.X = x;
        this.Y = y;
        this.Z = z;
        this.Name = name;
    }
    public int X { get; private set; }
    public int Y { get; private set; }
    public int Z { get; private set; }

    public double? DistanceToCompare { get; private set; }
    public string Name { get; private set; }
    public void CalculateDistanceToCompare(Star point2)
    {
        Func<int, double> squared = (value) =>
        Convert.ToDouble(value) * Convert.ToDouble(value);

        this.DistanceToCompare = Math.Sqrt(
            (squared(this.X - point2.X)) +
            (squared(this.Y - point2.Y)) +
            (squared(this.Z - point2.Z)));
    }

    public static Star Create(string value)
    {
        var parts = value.Split(new string[] { "," },
            StringSplitOptions.RemoveEmptyEntries);

        return new Star(
            int.Parse(parts[0]),
            int.Parse(parts[1]),
            int.Parse(parts[2]),
            parts[3]);
    }
    public override string ToString()
    {
        return string.Format("{0},{1},{2},{3}", X, Y, Z, Name);
    }

    public int CompareTo(Star other)
    {
        if (!this.DistanceToCompare.HasValue ||
                !other.DistanceToCompare.HasValue)
            throw new ArgumentException("No DistanceFromBaseStart to compare exception");

        if (this.DistanceToCompare.Value < other.DistanceToCompare.Value) return -1;
        if (this.DistanceToCompare.Value > other.DistanceToCompare.Value) return 1;
        return 0;
    }
}

Now we can create a given star and compare that star against any other star. But we still need a way to track the top 10 closest so what is the other part of the solution. What is needed is a data structure that can hold 10 items and each time a new item is added the maximum value moves to the top spot and if that structure has more then 10 items the maximum value can be removed and the new max value will move to the top spot.

The data structure that meets this criteria is a Max Heap AKA Priority Queue. If you do some searching in Google you can easily find implementations. Since I was recently reading "Problem Solving in Data Structures & Algorithms Using C#" by Hemant Jain I decided to use his implementation.

As you can see from the example code below it's quite easy to enumerate each line of the file and create a new star. Then compare that star to the given star and check the priority queue if the new star is less than the max value. If it is enqueue the new star and if the priority queue size is bigger then ten then execute the method dequeue which will remove the max value.

Once each line has been read you need to sort the values left in the queue if you want the shortest first and voila you got it.

    private static void StarQuestion()
    {
        System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
        watch.Start();

        Star startToBeCompared = new Star(1, 2, 3, "My Little Star");
        PriorityQueue<Star> queue = new PriorityQueue<Star>(false);

        using (var reader = new System.IO.StreamReader(".\\dump.csv"))
        {
            while (!reader.EndOfStream)
            {
                var newStar = Star.Create(reader.ReadLine());
                newStar.CalculateDistanceToCompare(startToBeCompared);
                if (queue.Count < 10)
                {
                    queue.Enqueue(newStar);
                }
                else
                {
                    if (queue.First().DistanceToCompare > newStar.DistanceToCompare)
                    {
                        queue.Enqueue(newStar);
                    }
                }

                if (queue.Count > 10) queue.Dequeue();
            }
        }
        queue.
            Take(10).
            OrderBy(k => k.DistanceToCompare).
            Select(item => string.Format("{0} - {1}", item.Name, item.DistanceToCompare)).
            ToList().
            ForEach(item => Console.WriteLine(item));


        watch.Stop();
        Console.WriteLine(watch.Elapsed.ToString());
        Console.WriteLine("Complete");

    }

The priority queue code listed below is from the Hemant Jain's book I mentioned earlier in the post with some minor naming modifications to make it clearer to me. If you are interested in Algorithms in C# I highly recommend the book. I find it complements MIT's Open Course Ware 6-006 Introduction To Algorithms which is from a Python perspective.

I hope you enjoyed this post.

class PriorityQueue<T> : ICollection<T> where T: IComparable<T>
{
    private const int CAPACITY = 16;
    private int size; // Number of elements in PriorityQueue
    private T[] queueStorage; // The PriorityQueue array
    bool isMinHeap;

    public PriorityQueue(bool minHeap = true)
    {
        queueStorage = new T[CAPACITY];
        size = 0;
        isMinHeap = minHeap;
    }
    public PriorityQueue(T[] array,bool minHeap = true)
    {
        size = array.Length;
        queueStorage = new T[array.Length + 1];
        isMinHeap = minHeap;
        Array.Copy(array, 0, queueStorage, 1, array.Length);

        for (int i = (size / 2); i > 0;i--)
        {
            PercolateDown(i);
        }
    }
    private int Compare(int first, int second)
    {
        if (isMinHeap)
            return queueStorage[first].CompareTo(queueStorage[second]);
        else
            return queueStorage[second].CompareTo(queueStorage[first]);

    }
    private void PercolateDown(int position)
    {
        int leftChild = 2 * position;
        int rightChild = leftChild + 1;
        int small = -1;
        T temp;

        if (leftChild <= size)
        {
            small = leftChild;
        }
        if(rightChild <= size && Compare(rightChild,leftChild)< 0)
        {
            small = rightChild;
        }
        if(small != -1 && Compare(small,position) <0)
        {
            temp = queueStorage[position];
            queueStorage[position] = queueStorage[small];
            queueStorage[small] = temp;
            PercolateDown(small);
        }
    }
    private void PercolateUp(int currentPosition)
    {
        int parentPostion = currentPosition / 2;
        if(parentPostion == 0) return;

        if(Compare(parentPostion,currentPosition) > 0) //parent greater that child.
        {
            T temp = queueStorage[currentPosition];
            queueStorage[currentPosition] = queueStorage[parentPostion];
            queueStorage[parentPostion] = temp;
            PercolateUp(parentPostion);
        }
    }
    public virtual void Enqueue(T value)
    {
        if(size == queueStorage.Length -1)
        {
            DoubleStorageCapacity();
        }

        queueStorage[++size] = value;
        PercolateUp(size);
    }
    private void DoubleStorageCapacity()
    {
        T[] oldQueueStorage = queueStorage;
        queueStorage = new T[queueStorage.Length * 2];
        Array.Copy(oldQueueStorage, 1, queueStorage, 1, size);
    }
    public virtual T Dequeue()
    {
        if (IsEmpty())
        {
            throw new InvalidOperationException("HeapEmptyException");
        }

        T value = queueStorage[1];
        queueStorage[1] = queueStorage[size];
        size--;
        PercolateDown(1);
        return value;
    }
    public virtual bool IsEmpty()
    {
        return (size == 0);
    }
    public virtual void Print()
    {
        for (int index = 1; index <= size + 1; index++)
        {
            Console.WriteLine("value is ::" + queueStorage[index]);
        }
    }
    public int Count
    {
        get { return size; }
    }
    public static void Sort(T[] array)
    {
        PriorityQueue<T> hp = new PriorityQueue<T>(array);
        for(int i =0; i < array.Length; i++)
        {
            array[i] = hp.Dequeue();
        }
    }
    public void Add(T item)
    {
        Enqueue(item);
    }

    public bool IsReadOnly
    {
        get { return false; }
    }
    public void Clear()
    {
        size = 0;
    }
    public T GetItem(int index)
    {
        return queueStorage[index];
    }
    public bool Contains(T item)
    {
        for(int i = 1;i <= size; i++)
        {
            if (queueStorage[i].Equals(item)) return true;

        }
        return false;
    }
    public void CopyTo(T[] array,int arrayIndex)
    {
        Array.Copy(queueStorage, 0, array, arrayIndex, size);
    }
    public bool Remove(T item)
    {
        for(int i =1; i <= size; i++)
        {
            if (queueStorage[i].Equals(item))
            {
                queueStorage[i] = queueStorage[size];
                size--;
                PercolateDown(i);
                PercolateUp(i);
                return true;
            }
        }
        return false;
    }
    public IEnumerator<T> GetEnumerator()
    {
        return new PriorityQueueEnumerator<T>(this);
    }
    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    internal class PriorityQueueEnumerator<T> : IEnumerator<T> where T : IComparable<T>
    {
        private PriorityQueue<T> priorityQueue;
        int index;
        T current;

        public PriorityQueueEnumerator(PriorityQueue<T> pq)
        {
            this.priorityQueue = pq;
            current = default(T);
            index = 1;
        }
        public T Current
        {
            get { return current; }
        }
        object IEnumerator.Current
        {
            get
            {
                return this.Current;
            }
        }
        public void Dispose()
        {
            priorityQueue = null;
            current = default(T);
            index = 1;
        }
        public bool MoveNext()
        {
            //Console.WriteLine("hit");
            if (index <= priorityQueue.Count)
            {
                current = priorityQueue.GetItem(index);
                index++;
                return true;
            }
            return false;
        }
        public void Reset()
        {
            current = default(T);
            index = 1;
        }

    }
  }

Monday, March 9, 2015

IOC Contrib Design Flaw

IOC Contrib Design Flaw

When I was first introduced to Akka.net one of my preliminary thoughts was this framework needed IOC support. So I decided to pitch in but before I wrote any code I started doing research into the what it would take to get this done.

Initial Research

The first thing that I found was a Java implementation using the Spring IOC container. From this implementation I began the process of translating the Java into C# so I could understand better how the whole thing worked.

That research led me to open up two discussions on the Google group with concerns that I had based on a great post I read from Mark Seemann who incidentally wrote an excellent book called Dependency Injection in .NET. The basic gist to the discussion was that the interface IndirectActorProducer was missing a method to make working with an IOC container more DI friendly.

So instead I trying to solve that issue I decided to move forward making my contribution using that interface as the basis for coding a DI plugin.

Bug Report

Just recently as I was moving forward with my Event Store with Cassandra and Akka @aaronontheweb pointed out a bug report filed against my contribution relating to the for mentioned issues I brought up in the discussion group but did not pursue a solution for.

So since I have the contribution complete based on the old interface I decided to see if I could come up with a solution to this issue. As a result of my research into the issue and chatting over on Gitter I was able to enlisted the help of @stefansedich to rewrite the AutoFac portion of the code. Which is great since my IOC container of choice is Castle Windsor and I really want this contribution to be first rate so I am very thank full to him for his help.

Proposal

To make this work a few things will need to change with Akka.net to make this work properly. The current IndirectActorProducer interface is going to need a slight change with the addition of a new release method.

public interface IndirectActorProducer : IRelease
{
    /// <summary>
    ///     This factory method must produce a fresh actor instance upon each
    ///     invocation. It is not permitted to return the same instance more than
    ///     once.
    /// </summary>
    /// <returns>A fresh actor instance.</returns>
    ActorBase Produce();

    /// <summary>
    ///     This method is used by [[Props]] to determine the type of actor which will
    ///     be created. The returned type is not used to produce the actor.
    /// </summary>
    Type ActorType { get; }

    /// <summary>
    /// New Method to release the actor produced by this factory's
    /// implementation
    /// </summary>
    void Release(ActorBase actor);
}

The next part is that framework is going to need to call the release method when the Actor is no longer needed by the system. The ActorCell calls the method FinishTerminate and it's from this method we need to call Release. In order to be able to call release the Props object which is injected with the IndirectActorProducer is going to need a need method to pass through the request from the ActorCell to the IndirectActorProducer. To accomplish this I propose either a new IReleaseInterface with the release method and implemented by Props or a new internal Release method be used.

Implementation A

public interface IRelease
{
    void Release(ActorBase actor);

}

public class Props : IRelease
{
    //Rest of implementation here

    void Release(ActorBase actor)
    {
        if (this.producer != null) this.producer.Release(actor);
        actor = null;       
    }

}
private void FinishTerminate()
    {
        // The following order is crucial for things to work properly. Only change this if you're very confident and lucky.
        // 
        // Please note that if a parent is also a watcher then ChildTerminated and Terminated must be processed in this
        // specific order.
        var a = _actor;
        try
        {
            if (a != null)
            {
                a.AroundPostStop();

                //if the actor uses a stash, we must Unstash all messages. 
                //If the user do not want this behavior, the stash should be cleared in PostStop
                //either by calling ClearStash or by calling UnstashAll.
                UnstashAllActorMessages(a);
            }
        }
        catch (Exception x)
        {
            HandleNonFatalOrInterruptedException(
                () => Publish(new Error(x, _self.Path.ToString(), ActorType, x.Message)));
        }
        finally
        {
            try
            //TODO: Akka Jvm: this is done in a call to dispatcher.detach()
            {

                //TODO: Akka Jvm: this is done in a call to MessageDispatcher.detach()
                {
                    var mailbox = Mailbox;
                    var deadLetterMailbox = System.Mailboxes.DeadLetterMailbox;
                    SwapMailbox(deadLetterMailbox);
                    mailbox.BecomeClosed();
                    mailbox.CleanUp();
                }
            }
            finally
            {
                try { Parent.Tell(new DeathWatchNotification(_self, existenceConfirmed: true, addressTerminated: false)); }
                finally
                {
                    try { TellWatchersWeDied(); }
                    finally
                    {
                        try { UnwatchWatchedActors(a); }
                        finally
                        {
                            if (System.Settings.DebugLifecycle)
                                Publish(new Debug(_self.Path.ToString(), ActorType, "Stopped"));

                            ClearActor(a);
                            ClearActorCell();
                            Release(a);
                            _actor = null;
                        }
                    }
                }
            }
        }
    }

private void Release(ActorBase actor)
{
    (IRelease)_props.Release(actor);

}

Implementation B

public interface IRelease
{
    void Release(ActorBase actor);

}

public class Props 
{
    //Rest of implementation here

    internal void Release(ActorBase actor)
    {
        if (this.producer != null) this.producer.Release(actor);
        actor = null;       
    }

}
private void FinishTerminate()
    {
        // The following order is crucial for things to work properly. Only change this if you're very confident and lucky.
        // 
        // Please note that if a parent is also a watcher then ChildTerminated and Terminated must be processed in this
        // specific order.
        var a = _actor;
        try
        {
            if (a != null)
            {
                a.AroundPostStop();

                //if the actor uses a stash, we must Unstash all messages. 
                //If the user do not want this behavior, the stash should be cleared in PostStop
                //either by calling ClearStash or by calling UnstashAll.
                UnstashAllActorMessages(a);
            }
        }
        catch (Exception x)
        {
            HandleNonFatalOrInterruptedException(
                () => Publish(new Error(x, _self.Path.ToString(), ActorType, x.Message)));
        }
        finally
        {
            try
            //TODO: Akka Jvm: this is done in a call to dispatcher.detach()
            {

                //TODO: Akka Jvm: this is done in a call to MessageDispatcher.detach()
                {
                    var mailbox = Mailbox;
                    var deadLetterMailbox = System.Mailboxes.DeadLetterMailbox;
                    SwapMailbox(deadLetterMailbox);
                    mailbox.BecomeClosed();
                    mailbox.CleanUp();
                }
            }
            finally
            {
                try { Parent.Tell(new DeathWatchNotification(_self, existenceConfirmed: true, addressTerminated: false)); }
                finally
                {
                    try { TellWatchersWeDied(); }
                    finally
                    {
                        try { UnwatchWatchedActors(a); }
                        finally
                        {
                            if (System.Settings.DebugLifecycle)
                                Publish(new Debug(_self.Path.ToString(), ActorType, "Stopped"));

                            ClearActor(a);
                            ClearActorCell();
                            Release(a);
                            _actor = null;
                        }
                    }
                }
            }
        }
    }

private void Release(ActorBase actor)
{
    _props.Release(actor);

}

Either solution will provide the desired result. Since this is just a proposal I would be interested to see what others think. Please feel free to comment either on my blog or at https://gitter.im/akkadotnet/akka.net.

Monday, February 23, 2015

Cassandra DB - EventStore (Application Code Part I - Mapping)

Cassandra DB - EventStore (Application Code Part I - Mapping) The next step in the process is to begin creating the application code that will be used by the Aggregates to interactive with the Event Store(ES). As before I am using an older code base as my template but with some minor adjustments.
public interface IAppendOnlyStore : IDisposable
 {
     void Append(string streamName, 
                 byte[] data, 
                 long expectedStreamVersion = -1);
     Task<IEnumerable<DataWithVersion>> ReadRecords(string streamName, 
                                                    long afterVersion, 
                                                    int maxCount);
     Task<IEnumerable<DataWithName>> ReadRecords(DateTimeOffset afterVersion, 
                                                 int maxCount);
     Task<DateTimeOffset> GetCurrentVersion();
 }
This abstraction as the name implies allows for Append Only Access to the underlying Cassandra store providing a total of 4 methods to Append new events, Read Records for a named streamed, Read All Records regardless of stream from a given point in time and finally the current version of the ES.

Now before I can code the implementations to this abstraction I have to do some mapping of Value Objects which represent the row values to actual Cassandra Tables. As discussed in the previous post the three tables are:
  • Events
    • Used to pull events for a given aggregate
  • EventsToBeProcessed
    • Used to determine which events have yet to be projected
  • EventsVersionsToBeProcessed
    • Used to determine the current version of the store
The Value objects that represent the entities for those three tables are:
  • Record -> Events
  • RecordToBeProcesed -> EventsToBeProcessed
  • EventStoreVersion -> EventsVersionsToBeProcessed
The definitions for those values objects are as follows:
public class Record
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public long Version { get; set; }
    public DateTimeOffset VersionTimeStamp { get; set; }
    public byte[] Data { get; set; }
}
public class RecordToBeProcesed
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public long Version { get; set; }
    public DateTimeOffset VersionTimeStamp { get; set; }
    public byte[] Data { get; set; }
    public bool Processed { get; set; }
}
public class EventStoreVersion
{
    public DateTimeOffset VersionTimeStamp { get; set; }
    public bool Processed { get; set; }
}
The property names are identical to those on the tables so I won't spend anytime going over them. In my previous post you can see the definitions for all the columns.

When you use the Fluent Mapping API you have to keep in mind a few things. One there is no method Primary Key so when you have a compound primary key the first value of the key is considered the Partition Key. Two, all the others fields after the Partition Key in the primary are considered the Clustering Key. This is very important and something that I found confusing initially.

Lets take a look at the mapping for these three Value Objects and then go from there.
MappingConfiguration.
   Global.
   Define(
          new Map<Record>()
          .TableName("Events").
           KeyspaceName("EventStore").
           Column(r => r.Id, 
                       cm => cm.WithName("id").
                             WithDbType<Guid>()).
           Column(r => r.Name, 
                       cm => cm.WithName("name").
                             WithDbType<string>()).
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName("version_time_stamp").
                             WithDbType<DateTimeOffset>()).
           Column(r => r.Version, 
                       cm => cm.WithName("version").
                       WithDbType<long>()).
           Column(r => r.Data, 
                       cm => cm.WithName("data").
                       WithDbType<byte[]>()).
           PartitionKey(e => e.Name).
           ClusteringKey(Tuple.Create("version_time_stamp", SortOrder.Ascending),
                         Tuple.Create("version", SortOrder.Unspecified)),

           new Map<RecordToBeProcesed>()
           .TableName("Events").
           KeyspaceName("EventStore").
           PartitionKey("processed").
           ClusteringKey(Tuple.Create("version_time_stamp", SortOrder.Ascending),
                         Tuple.Create("name", SortOrder.Descending),
                         Tuple.Create("version", SortOrder.Ascending)).
           KeyspaceName("EventStore").
           Column(r => r.Id, 
                       cm => cm.WithName("id").
                             WithDbType<Guid>()).
           Column(r => r.Name, 
                       cm => cm.WithName("name").
                             WithDbType<string>()).
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName("version_time_stamp").
                                WithDbType<DateTimeOffset>()).
           Column(r => r.Version, 
                       cm => cm.WithName("version").
                       WithDbType<long>()).
           Column(r => r.Data, 
                       cm => cm.WithName("data").
                       WithDbType<byte[]>()).
           Column(r => r.Processed, 
                       cm => cm.WithName("processed").
                             WithDbType<bool>()),

           new Map<EventStoreVersion>().
           TableName("EventsVersionsToBeProcessed").
           KeyspaceName("EventStore").
           PartitionKey("version_time_stamp").
           ClusteringKey("processed").
           KeyspaceName("EventStore").
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName"version_time_stamp").
                       WithDbType<DateTimeOffset>()).
           Column(r => r.Processed, 
                       cm => cm.WithName("processed").
                       WithDbType<bool>()));`
You will notice from examining the code sample above that to use the Fluent Mapping API correctly you need to know the following about the tables your are mapping:
  • Table Name
  • Keyspace Name
  • Partition Key
  • Clustering Key if applicable
  • Each column name in the table and its data type.
So in my table definition for EventsVersionsToBeProcessed I have two columns:
  • version_time_stamp
  • processed

These two fields also represent the primary key. So in this example the Partition Key is version_time_stamp and the Clustering Key is processed. Once you have that then the rest is straight forward. So create a new Map<T> instance, were T is the type you are mapping to, then use the fluent API to provide the table name, keyspace name, partition key as we just defined, along with the Clustering Key and then the definitions of the columns including the column names and data types.

One thing to keep in mind when using the Mapping API as I have it's not thread safe so if you run concurrent tests you will experience concurrency issues. If you are interested I solved this issue simply by wrapping all the necessary cluster, session creation and mapping methods into a abstraction that I am calling CassandraEnvironment. Within this object I use a static constructor to ensure that the mapping only occurs once despite the number of instances created during the tests. If you are interested the code sample is available here.

In the next post ,now that we have the mapping complete, I will be discussing the actually implementation of IAppendOnlyStore and some of the design decisions that went into its implementation.

Thanks for Reading...

Thursday, February 12, 2015

Cassandra DB - Event Store (Part III Second Model)

Modeling Cassandra Blog Post 2

In my original attempt to model an Event Store (ES) I used a single table. That was a bit of a naive approach. My past experiences modeling relational databases like MS SQL gave me false sense of security. Even though Cassandra's query language CQL is simple and similar to T-SQL in syntax that is by no means go good way to compare the two when it comes to modeling.

As I hinted to in my previous post the solution to modeling an ES with CQL is to use multiple tables. In order to determine which tables were necessary I first had to determine what information I needed to query.

Just like last time I started off with the Events table. It looks pretty much the same as my original but with some key differences. The first is the name column is now the clustering key,instead of id, followed by version_time_stamp then version. You will also notice that I added WITH CLUSTERING ORDER BY (version_time_stamp ASC).

CREATE KEYSPACE IF NOT EXISTS 
"EventStore" WITH replication = 
{'class':'SimpleStrategy', 'replication_factor':3};

CREATE TABLE IF NOT EXISTS
"EventStore"."Events" (
id uuid,
name varchar,
version int,
version_time_stamp timestamp,
data blob,
PRIMARY KEY(name,version_time_stamp, version ))
WITH CLUSTERING ORDER BY (version_time_stamp ASC);

This schema allows all events for a named aggregate to be pulled in the order they were created. The statement WITH CLUSTERING ORDER BY (version_time_stamp ASC) provides this capability. So when I want all the events in the order they occurred for I given key its is simple as the following query.

//query pulls all events for the given key
// account1
SELECT *
FROM    "EventStore"."Events"
WHERE   name = 'account1';

If there was a case, like a read model rebuild, were all events are needed regardless of aggregate key from a given point in time moving forward then following query could be used.

SELECT * 
FROM "EventStore"."Events" 
WHERE version_time_stamp >= 1423273614879 ALLOW FILTERING;

Notice the use of ALLOW FILTERING. This is necessary but performance is not optimal and should only be used in situation were performance is not a issue.

One of the other requirements is a way to select all the recently committed events to generate projections. Now the previous query could be used but it's not exactly optimal. To resolve this I decided the formation of two additional tables was necessary.

The first is all of the events that have not yet been projected and the second is a check point used to keep track of the time-line of events that need to be processed.

The first of the two tables are almost identical to the Events table with the exception of the primary key and cluster ordering. In this case the partition key is processed followed by version_time_stamp, name and version columns.

CREATE TABLE IF NOT EXISTS
"EventStore"."EventsToBeProcessed" (
id uuid,
name varchar,
version int,
version_time_stamp timestamp,
data blob,
processed boolean,
PRIMARY KEY(processed,version_time_stamp,name,version ))
WITH CLUSTERING ORDER BY (version_time_stamp ASC, name DESC, version ASC);

This table schema allows for queries like:

//All events from the specified time stamp backwards
SELECT * 
FROM "EventStore"."EventsToBeProcessed" 
WHERE processed = false AND version_time_stamp <= 1423273614879;

//All events between a specific time-line
SELECT * 
FROM "EventStore"."EventsToBeProcessed" 
WHERE processedv= False AND version_time_stamp >= 1423273614879 AND 
version_time_stamp <= 1423273614879;

In order to determine what version_time_stamp to use it is necessary to track each time stamp and when it was processed.

CREATE TABLE IF NOT EXISTS
"EventStore"."EventsVersionsToBeProcessed"(
version_time_stamp timestamp,
processed boolean,
PRIMARY KEY(version_time_stamp, processed));

This table is very simple but it serves the purpose of tracking the current version of the store. With this I can find the current version or the current version that has not been processed.

//Current Version
SELECT * 
FROM "EventStore"."EventsVersionsToBeProcessed" 
LIMIT 1;

//Current Version that has not been processed
SELECT * 
FROM "EventStore"."EventsVersionsToBeProcessed" 
WHERE processed = false 
LIMIT 1 ALLOW FILTERING;

To make this process work I will employ the use of CQL BATCHES like in the following example.

BEGIN BATCH

    //This is just an example
    INSERT INTO "EventStore"."Events" 
    (id,name,data,version, version_time_stamp) 
    VALUES(Uuid(),'account1',null,1,1423269363256);

    INSERT INTO "EventStore"."EventsToBeProcessed" 
    (id,name,data,version, version_time_stamp,processed) 
    VALUES(Uuid(),'account1',null,1,1423269363256,false);

    INSERT INTO "EventStore"."EventsVersionsToBeProcessed" 
    (version_time_stamp,processed) 
    VALUES(1423269363256, false);

APPLY BATCH;

There are a few more details that need to be worked out like an Update query to be run against the two processed tables once projections for a given point in time have been completed and possible DELETE queries to purge out that information. These details I will address during the creation of the application code.

I think this is a good point to stop. The base model is complete and may be adjusted but it certainly meets my core requirements. The next time I will go through the application code.

Thanks for reading...

Tuesday, February 3, 2015

Cassandra DB - Event Store (Part II First Model)

My first model is based on a simple SQL implementation that I have used in the past which is defined as follows:

CREATE TABLE [dbo].[Events](
[Id] [int] PRIMARY KEY IDENTITY,
[Name] [nvarchar](50) NOT NULL,
[Version] [int] NOT NULL,
[Data] [varbinary](max) NOT NULL
) ON [PRIMARY]

The table contains only four columns:
  • Id - An identity integer value which is the primary key and used to determine the version of the Event Store with the following query.
  • SELECT MAX(Id) as version FROM Events
  • Name - Is a key used to identify the stream of events for an aggregate.
  • Version - Used to sort the events for a particular stream to ensure they are order in chronological order.
  • Data - The raw binary representation of the events for the given key and version.
The first modeling challenge that I have to deal with is how to determine the version of the Event Store on system start. Since Cassandra does not have an Identity column and because its great for time series modeling the logical decision is to try and use the timestamp datatype. So lets see how that could look.

CREATE KEYSPACE IF NOT EXISTS 
"EventStore" WITH replication = 
{'class':'SimpleStrategy', 'replication_factor':3};

CREATE TABLE IF NOT EXISTS
 "EventStore"."Events" (
id uuid,
name varchar,
version int,
version_time_stamp timestamp,
data blob,
PRIMARY KEY(id, name))
WITH CLUSTERING ORDER BY (name DESC);

CREATE INDEX 
IF NOT EXISTS version_index ON 
"Events"(version_time_stamp);

This time around there are five columns and one index:

  • id - primary partition key
  • name - Name is the same as the SQL model
  • version - The same as the SQL model
  • version_time_stamp - Time stamp when the record was created
  • data - The same as the SQL model
  • version_index - Index Used to pull missing records

Now that I have a table model and index defined I would like to write the following queries to get the current version of the event store and compare that value against the last know version and if it differs then use that last known value as a parameter in the second query to get all of the missing records.

//Get the current version
SELECT version_timestamp
FROM Events 
LIMIT 1

//Get All the records since the last version
SELECT id, name,version,version_timestamp,data 
FROM Events 
WHERE version_index >= ?


The problem with this solution is that I can not use version_index without the partition key, the version_timestamp needs to be a part of the CLUSTERING ORDER BY statement in the table definition to ensure proper ordering and my secondary index does not have high cardinality. Which does not follow the recommendations from DataStax.

When to Use Secondary IndexesCassandra's built-in secondary indexes are best on a column family having many rows that contain the indexed value. The more unique values that exist in a particular column, the more overhead you will have, on average, to query and maintain the index. For example, suppose you had a user table with a billion users and wanted to look up users by the state they lived in. Many users will share the same column value for state (such as CA, NY, TX, etc.). This would be a good candidate for a secondary index.
When Not to Use Secondary Indexes
Do not use secondary indexes to query a huge volume of records for a small number of results. For example, if you create indexes on columns that have many distinct values, a query between the fields will incur many seeks for very few results. In the column family with a billion users, looking up users by their email address (a value that is typically unique for each user) instead of by their state, is likely to be very inefficient. It would probably be more efficient to manually maintain a dynamic column family as a form of an index instead of using a secondary index. For columns containing unique data, it is sometimes fine performance-wise to use secondary indexes for convenience, as long as the query volume to the indexed column family is moderate and not under constant load.
I basically have a model that is full of impedances mismatches with my queries. So what is the next step? Well I will go through that the next time but here is a hint based on some advice that got recently from DataStax's Luke Tillman author of CQL Poco on modeling with Cassandra.
Many times, you'll end up with a "table per query" type data model, where you insert multiple copies of the data at write time with each table designed to handle a specific query.
Before signing off let me recommend two excellent resources created by Luke Tillman which are a great slide aimed at .NET Developers and the other is Luke's blog and in particular the following post if you are just getting started.

Thanks for reading. 

Wednesday, January 21, 2015

Cassandra DB - Event Store (Part I - Research)

Recently I have been doing research on Cassandra DB considering it for an event store. Its reputation as being an extremely performant and horizontally scalable key value data store made it stand out as one that was worth my time investigating. In addition to performance what also makes it an attractive store is that it is an open source project backed by the company Datastax which provides a entire platform around it. Cassandra itself is Apache 2.0 license and Datastax provides a C# driver as well as many other popular platforms.

Interesting enough as I was reading through the Apache Cassandra website I came across an excellent post they linked to detailing why that particular company had chosen Cassandra. It was not until after I read about half way through the post that I realized it was written by one of the Akka.net guys Aaron Stannard. If you are not sure Cassandra is worth the investment in time I urge you to read his post especially the comments it will I am sure change your mind.

Since my goal is to use this with the Dot Net Platform having a C# driver is extremely important. Datastax wrote their own driver which runs on Framework 4.0. They also provide nice documentation and how tos which is extremely helpful.  One other awesome DB that I will also investigate is Foundation DB since it also has a supported driver but it's a little more restrictive in its use.
The free version is the exact same software. The only difference is that you're limited to 6 processes in production. Development and testing processes are unlimited.
The team at Foundation DB have a great product but when I first investigated it's use about a year ago the Dot Net Story was not that strong. Now with all the recent changes I am looking forward to giving it another go. In fact I am planning on testing it against Cassandra once I have built my initial prototype.

One last thing about Event Stores before I continue, on the Dot Net Side there is a great project called NEventStore which has already done all the hard work with whole bunch of different DBs out of the box, like Ravendb and Mongodb, which can be an excellent solution as well as another great product called EventStore which some may consider the gold standard for Event Stores.

Besides the C# driver provided by Datastax there are a few others like Fluent Cassandra but it was recommended to me by one of the contributors to Fluent Cassandra that if I was using the latest build of database that I might be better of using the Datastax driver.

As a part of my setup I decided to download Cassandra directly from the Apache Cassandra website not using the Datastax download. The setup was simple enough, just follow these simple steps.

  1. Download the database .gz file from Apache Cassandra website
  2. Extract the .gz it to a local directory on your windows machine using a tool like Peazip.
  3. Download python  2.x version (skip to step 5 if you already have python installed)
  4. Install python with the msi 
  5. Add the python path under System Properties -> Environmental Variables -> System Variables ->Path 
  6. System Properties -> Environmental Variables -> System Variables ->Path
  7. Ensure you have the Java installed with the latest version if not download and install it.
  8. Ensure that the path has for Java has been added System Properties -> Environmental Variables -> System Variables ->Path
  9. Open a command prompt under administrator
  10. Navigate to the bin directory under the parent directory you extract Cassandra to.
  11. Finally type cassandra -f and you will have a running Cassandra db.
At some point I will get Docker working properly on windows with casandra container but this setup will do nicely for now. Next time I will just do a quick demostration on what its like to interact with the datastax C# driver then from there I will demonstrate a basic event store.

Thanks for reading....







Thursday, January 8, 2015

Monitoring Akka.net Based Systems

Building maintainable and reliable systems is hard because of their impermanent and fragile nature. In order for systems to solve important problems or provide business value they have to function properly.

During the development process unit tests, use cases and domain users provide important feedback to ensure what is built meets the desired outcome. That is the reason for methodologies like Agile, TDD and Lean but once a system is in production how can you ensure that it is still functioning properly?

The path to success can be found by embracing the fact that software is fragile and just like during development feedback is key. It's not enough to just monitor if a daemon or windows service is running. What is truly important is that it is running smoothly and not stuck in some kind of chaotic state.

There is a great blog post by Ian Malpass titled Measure Anything, Measure Everything which details a project he started called StatsD. I think this quote from the post sums up StatsD.

StatsD is a simple NodeJS daemon (and by “simple” I really mean simple — NodeJS makes event-based systems like this ridiculously easy to write) that listens for messages on a UDP port. (See Flickr’s “Counting & Timing” for a previous description and implementation of this idea, and check out the open-sourced code on github to see our version.) It parses the messages, extracts metrics data, and periodically flushes the data to graphite.
A few months ago I a became acquainted with the Akka.net project for creating Actor based systems listening to .Net Rocks. As a result of my interests in the project I decided to contribute by creating an extension, based on the Java version of Akka, that create Actors using dependency injection. While on that journey I got some great feedback from Bartosz Sypytkowski and Aaron Stannard not to mention Roger Alsing.

When Aaron was giving me feedback on my documentation he provided me with a link to his akka-monitoring project as a reference. The akka-monitoring project provides monitoring for Akka.net based systems using StatsD right out of the box. It was from this project that I came across the for mention blog post and was my inspiration for this post.

GraPHPite Demonstration

Akka-monitoring is really simple to use and provides an array of automated measures but some of the measures require the programmer to implement some simple code as demonstrated here.

https://github.com/Aaronontheweb/akka-monitoring
In Aaron's FAQ I saw the following question that got my interest.

Any plans to automatically collect actor lifecycle and message received data? 
That depends largely on how much traction Akka.Monitoring gets - we're considering subclassing UntypedActor and TypedActor to automatically provide lifecycle and receive instrumentation, but we want to see how other people use it first.
This got me thinking would be possible to solve the for mention question without subclassing but instead with using AOP.  So I decided to create a thing called an Interceptor, which is Castle Windsor's way of doing AOP, and wired that up to Akka.DI.CastleWindsor.

The interceptor creates a proxy around the actual object that it represents. Each method executed on the object, which is an Actor in the case, is intercepted prior to executing allowing you to call the various akka-monitoring methods without actually touching the Actors directly. Here is a very basic example.

https://github.com/jcwrequests/AkkaMonitoringSampe/blob/master/MonitorInterceptor.cs
https://github.com/jcwrequests/AkkaMonitoringSampe/blob/master/MonitorInterceptor.cs
I have created a working sample based on the akka-monitoring examples using my own simple console monitor to demonstrate the concept presented in this post without the need for a StatsD server.  This sample is for demonstration purposes only. If you are interested in pursuing this for your Akka based system then I would recommend following the best practices in the CastleWindsor documentation and consulting the akka-monitoring project where you will find excellent documentation on setting up a StatsD Server as well as implementing your own monitor if StatsD does not fit your needs.