Monday, February 23, 2015

Cassandra DB - EventStore (Application Code Part I - Mapping)

The next step in the process is to begin creating the application code that the Aggregates will use to interact with the Event Store (ES). As before, I am using an older code base as my template, with some minor adjustments.
public interface IAppendOnlyStore : IDisposable
 {
     void Append(string streamName, 
                 byte[] data, 
                 long expectedStreamVersion = -1);
     Task<IEnumerable<DataWithVersion>> ReadRecords(string streamName, 
                                                    long afterVersion, 
                                                    int maxCount);
     Task<IEnumerable<DataWithName>> ReadRecords(DateTimeOffset afterVersion, 
                                                 int maxCount);
     Task<DateTimeOffset> GetCurrentVersion();
 }
As the name implies, this abstraction allows append-only access to the underlying Cassandra store. It provides four methods: one to append new events, one to read records for a named stream, one to read all records regardless of stream from a given point in time, and one to get the current version of the ES.
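To make the contract concrete, here is a minimal in-memory sketch of the interface. It is not the Cassandra implementation. The shapes of DataWithVersion and DataWithName are not shown in this post, so the ones below are hypothetical, and I am assuming that an expectedStreamVersion of -1 means "skip the version check".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical shapes: the post does not show these two types.
public sealed class DataWithVersion
{
    public long Version { get; set; }
    public byte[] Data { get; set; }
}

public sealed class DataWithName
{
    public string Name { get; set; }
    public byte[] Data { get; set; }
}

// Repeated from the post so the sketch compiles on its own.
public interface IAppendOnlyStore : IDisposable
{
    void Append(string streamName, byte[] data, long expectedStreamVersion = -1);
    Task<IEnumerable<DataWithVersion>> ReadRecords(string streamName, long afterVersion, int maxCount);
    Task<IEnumerable<DataWithName>> ReadRecords(DateTimeOffset afterVersion, int maxCount);
    Task<DateTimeOffset> GetCurrentVersion();
}

public sealed class InMemoryAppendOnlyStore : IAppendOnlyStore
{
    private readonly object _gate = new object();
    private readonly List<(string Name, long Version, DateTimeOffset Stamp, byte[] Data)> _events
        = new List<(string Name, long Version, DateTimeOffset Stamp, byte[] Data)>();

    public void Append(string streamName, byte[] data, long expectedStreamVersion = -1)
    {
        lock (_gate)
        {
            long current = _events.Count(e => e.Name == streamName);
            // Assumption: -1 skips the check; any other value must match the stream.
            if (expectedStreamVersion != -1 && expectedStreamVersion != current)
                throw new InvalidOperationException(
                    $"Expected version {expectedStreamVersion} but stream is at {current}.");
            _events.Add((streamName, current + 1, DateTimeOffset.UtcNow, data));
        }
    }

    public Task<IEnumerable<DataWithVersion>> ReadRecords(string streamName, long afterVersion, int maxCount)
    {
        lock (_gate)
        {
            // Events of one stream, strictly after the given version, in append order.
            IEnumerable<DataWithVersion> page = _events
                .Where(e => e.Name == streamName && e.Version > afterVersion)
                .Take(maxCount)
                .Select(e => new DataWithVersion { Version = e.Version, Data = e.Data })
                .ToList();
            return Task.FromResult(page);
        }
    }

    public Task<IEnumerable<DataWithName>> ReadRecords(DateTimeOffset afterVersion, int maxCount)
    {
        lock (_gate)
        {
            // Events of every stream written strictly after the given point in time.
            IEnumerable<DataWithName> page = _events
                .Where(e => e.Stamp > afterVersion)
                .Take(maxCount)
                .Select(e => new DataWithName { Name = e.Name, Data = e.Data })
                .ToList();
            return Task.FromResult(page);
        }
    }

    public Task<DateTimeOffset> GetCurrentVersion()
    {
        lock (_gate)
        {
            // Latest timestamp written, or MinValue for an empty store.
            return Task.FromResult(
                _events.Count == 0 ? DateTimeOffset.MinValue : _events[_events.Count - 1].Stamp);
        }
    }

    public void Dispose() { }
}
```

A fake like this can be handy in Aggregate unit tests: appending twice to the same stream and then calling ReadRecords(streamName, 0, 10) should return both events with versions 1 and 2, while an Append with a stale expectedStreamVersion throws.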

Before I can code the implementations of this abstraction, I have to map the Value Objects, which represent the row values, to the actual Cassandra tables. As discussed in the previous post, the three tables are:
  • Events
    • Used to pull events for a given aggregate
  • EventsToBeProcessed
    • Used to determine which events have yet to be projected
  • EventsVersionsToBeProcessed
    • Used to determine the current version of the store
The Value objects that represent the entities for those three tables are:
  • Record -> Events
  • RecordToBeProcesed -> EventsToBeProcessed
  • EventStoreVersion -> EventsVersionsToBeProcessed
The definitions for those value objects are as follows:
public class Record
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public long Version { get; set; }
    public DateTimeOffset VersionTimeStamp { get; set; }
    public byte[] Data { get; set; }
}
public class RecordToBeProcesed
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public long Version { get; set; }
    public DateTimeOffset VersionTimeStamp { get; set; }
    public byte[] Data { get; set; }
    public bool Processed { get; set; }
}
public class EventStoreVersion
{
    public DateTimeOffset VersionTimeStamp { get; set; }
    public bool Processed { get; set; }
}
The property names are identical to those on the tables, so I won't spend any time going over them. You can see the definitions for all the columns in my previous post.

When you use the Fluent Mapping API you have to keep a few things in mind. One, there is no PrimaryKey method, so a compound primary key has to be declared in two parts, and the first column of the key is considered the Partition Key. Two, all the other columns that follow the Partition Key in the primary key are considered the Clustering Key. This is very important, and something that I found confusing initially.
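The rule can be sketched in a few lines of plain C#. PrimaryKeyLayout is a hypothetical helper written only for illustration and is not part of the driver. It assumes the simple case described above, where the partition key is a single column.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only; not part of the Cassandra driver.
public static class PrimaryKeyLayout
{
    // Splits the ordered columns of a compound primary key into the
    // partition key (first column) and the clustering key (the rest).
    public static (string PartitionKey, IReadOnlyList<string> ClusteringKey) Split(
        IReadOnlyList<string> primaryKeyColumns)
    {
        if (primaryKeyColumns == null || primaryKeyColumns.Count == 0)
            throw new ArgumentException(
                "A primary key needs at least one column.", nameof(primaryKeyColumns));

        return (primaryKeyColumns[0], primaryKeyColumns.Skip(1).ToList());
    }
}
```

For a key made up of name, version_time_stamp and version, in that order, PrimaryKeyLayout.Split returns name as the partition key and version_time_stamp followed by version as the clustering key. Those are the values you would pass to PartitionKey and ClusteringKey on the map.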

Let's take a look at the mapping for these three Value Objects and then go from there.
MappingConfiguration.
   Global.
   Define(
          new Map<Record>()
          .TableName("Events").
           KeyspaceName("EventStore").
           Column(r => r.Id, 
                       cm => cm.WithName("id").
                             WithDbType<Guid>()).
           Column(r => r.Name, 
                       cm => cm.WithName("name").
                             WithDbType<string>()).
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName("version_time_stamp").
                             WithDbType<DateTimeOffset>()).
           Column(r => r.Version, 
                       cm => cm.WithName("version").
                       WithDbType<long>()).
           Column(r => r.Data, 
                       cm => cm.WithName("data").
                       WithDbType<byte[]>()).
           PartitionKey(e => e.Name).
           ClusteringKey(Tuple.Create("version_time_stamp", SortOrder.Ascending),
                         Tuple.Create("version", SortOrder.Unspecified)),

           new Map<RecordToBeProcesed>()
           .TableName("EventsToBeProcessed").
           KeyspaceName("EventStore").
           PartitionKey("processed").
           ClusteringKey(Tuple.Create("version_time_stamp", SortOrder.Ascending),
                         Tuple.Create("name", SortOrder.Descending),
                         Tuple.Create("version", SortOrder.Ascending)).
           Column(r => r.Id, 
                       cm => cm.WithName("id").
                             WithDbType<Guid>()).
           Column(r => r.Name, 
                       cm => cm.WithName("name").
                             WithDbType<string>()).
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName("version_time_stamp").
                                WithDbType<DateTimeOffset>()).
           Column(r => r.Version, 
                       cm => cm.WithName("version").
                       WithDbType<long>()).
           Column(r => r.Data, 
                       cm => cm.WithName("data").
                       WithDbType<byte[]>()).
           Column(r => r.Processed, 
                       cm => cm.WithName("processed").
                             WithDbType<bool>()),

           new Map<EventStoreVersion>().
           TableName("EventsVersionsToBeProcessed").
           KeyspaceName("EventStore").
           PartitionKey("version_time_stamp").
           ClusteringKey("processed").
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName("version_time_stamp").
                       WithDbType<DateTimeOffset>()).
           Column(r => r.Processed, 
                       cm => cm.WithName("processed").
                       WithDbType<bool>()));
You will notice from the code sample above that to use the Fluent Mapping API correctly you need to know the following about the tables you are mapping:
  • Table Name
  • Keyspace Name
  • Partition Key
  • Clustering Key if applicable
  • Each column name in the table and its data type.
So in my table definition for EventsVersionsToBeProcessed I have two columns:
  • version_time_stamp
  • processed

These two fields also make up the primary key, so in this example the Partition Key is version_time_stamp and the Clustering Key is processed. Once you have that, the rest is straightforward: create a new Map<T> instance, where T is the type you are mapping, then use the fluent API to provide the table name, the keyspace name, the Partition Key and Clustering Key as just defined, and finally the column definitions, including each column's name and data type.

One thing to keep in mind: the Mapping API, used the way I have used it here, is not thread safe, so if you run concurrent tests you will experience concurrency issues. I solved this by wrapping all the necessary cluster creation, session creation and mapping methods in an abstraction that I am calling CassandraEnvironment. Within this object I use a static constructor to ensure that the mapping only occurs once, regardless of the number of instances created during the tests. If you are interested, the code sample is available here.
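The static constructor idea looks roughly like the sketch below. This is not the linked code sample. ConfigureMappings and the MappingRuns counter are hypothetical stand-ins so the behaviour can be observed without a Cassandra cluster. In the real class the static constructor would make the MappingConfiguration.Global.Define(...) call shown earlier.

```csharp
using System;
using System.Threading;

public sealed class CassandraEnvironment
{
    private static int _mappingRuns;

    // The runtime executes a static constructor at most once per type,
    // before the first instance is created, even under concurrent access.
    static CassandraEnvironment()
    {
        ConfigureMappings();
    }

    // Hypothetical counter, exposed only to show how often the mapping ran.
    public static int MappingRuns => Volatile.Read(ref _mappingRuns);

    private static void ConfigureMappings()
    {
        // Placeholder: the real class would call
        // MappingConfiguration.Global.Define(...) here.
        Interlocked.Increment(ref _mappingRuns);
    }

    public CassandraEnvironment()
    {
        // Per-instance setup (cluster and session creation) would go here.
    }
}
```

No matter how many tests create a CassandraEnvironment, or from how many threads, MappingRuns stays at 1. The trade-off is that an exception thrown inside a static constructor leaves the type unusable for the rest of the process, so the mapping code in there needs to be reliable.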

In the next post, now that the mapping is complete, I will discuss the actual implementation of IAppendOnlyStore and some of the design decisions that went into it.

Thanks for Reading...
