About Me

I am a passionate .NET developer who believes in the power of collaboration and craftsmanship.

Monday, February 23, 2015

Cassandra DB - EventStore (Application Code Part I - Mapping)

The next step in the process is to begin creating the application code that the Aggregates will use to interact with the Event Store (ES). As before, I am using an older code base as my template, with some minor adjustments.
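using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IAppendOnlyStore : IDisposable
{
    // Appends a new event to the named stream.
    void Append(string streamName,
                byte[] data,
                long expectedStreamVersion = -1);
    // Reads up to maxCount records of one named stream after the given version.
    Task<IEnumerable<DataWithVersion>> ReadRecords(string streamName,
                                                   long afterVersion,
                                                   int maxCount);
    // Reads up to maxCount records across all streams after the given point in time.
    Task<IEnumerable<DataWithName>> ReadRecords(DateTimeOffset afterVersion,
                                                int maxCount);
    // Returns the current version of the event store.
    Task<DateTimeOffset> GetCurrentVersion();
}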
This abstraction, as the name implies, allows append-only access to the underlying Cassandra store. It provides a total of four methods: Append new events, Read Records for a named stream, Read All Records regardless of stream from a given point in time, and finally get the current version of the ES.
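To make the intended semantics of Append and the per-stream ReadRecords more concrete, here is a minimal in-memory sketch. It is not the Cassandra implementation and it deliberately covers only two of the four methods. Several things are assumptions on my part: the shape of DataWithVersion (not shown in this post), the convention that a negative expectedStreamVersion means "skip the concurrency check", and stream versions starting at 1.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical shape: the post does not show the real DataWithVersion type.
public sealed class DataWithVersion
{
    public DataWithVersion(long version, byte[] data)
    {
        Version = version;
        Data = data;
    }

    public long Version { get; }
    public byte[] Data { get; }
}

// In-memory stand-in that mirrors Append and the per-stream ReadRecords.
public sealed class InMemoryAppendOnlyStore
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, List<DataWithVersion>> _streams =
        new Dictionary<string, List<DataWithVersion>>();

    public void Append(string streamName, byte[] data, long expectedStreamVersion = -1)
    {
        lock (_gate)
        {
            if (!_streams.TryGetValue(streamName, out var stream))
            {
                stream = new List<DataWithVersion>();
                _streams[streamName] = stream;
            }

            long currentVersion = stream.Count;

            // Assumption: a negative expected version disables the check.
            if (expectedStreamVersion >= 0 && expectedStreamVersion != currentVersion)
            {
                throw new InvalidOperationException(
                    "Expected version " + expectedStreamVersion +
                    " but stream '" + streamName + "' is at " + currentVersion + ".");
            }

            // Assumption: versions are 1-based and increase by one per event.
            stream.Add(new DataWithVersion(currentVersion + 1, data));
        }
    }

    public Task<IEnumerable<DataWithVersion>> ReadRecords(
        string streamName, long afterVersion, int maxCount)
    {
        lock (_gate)
        {
            IEnumerable<DataWithVersion> result =
                _streams.TryGetValue(streamName, out var stream)
                    ? stream.Where(r => r.Version > afterVersion).Take(maxCount).ToList()
                    : new List<DataWithVersion>();

            return Task.FromResult(result);
        }
    }
}
```

An Aggregate would pass the version it loaded as expectedStreamVersion, so that a concurrent writer on the same stream causes the append to fail instead of silently interleaving events.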

Now before I can code the implementations to this abstraction I have to do some mapping of Value Objects which represent the row values to actual Cassandra Tables. As discussed in the previous post the three tables are:
  • Events
    • Used to pull events for a given aggregate
  • EventsToBeProcessed
    • Used to determine which events have yet to be projected
  • EventsVersionsToBeProcessed
    • Used to determine the current version of the store
The Value objects that represent the entities for those three tables are:
  • Record -> Events
  • RecordToBeProcesed -> EventsToBeProcessed
  • EventStoreVersion -> EventsVersionsToBeProcessed
The definitions for those Value Objects are as follows:
public class Record
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public long Version { get; set; }
    public DateTimeOffset VersionTimeStamp { get; set; }
    public byte[] Data { get; set; }
}
public class RecordToBeProcesed
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public long Version { get; set; }
    public DateTimeOffset VersionTimeStamp { get; set; }
    public byte[] Data { get; set; }
    public bool Processed { get; set; }
}
public class EventStoreVersion
{
    public DateTimeOffset VersionTimeStamp { get; set; }
    public bool Processed { get; set; }
}
The property names are identical to those on the tables, so I won't spend any time going over them. In my previous post you can see the definitions for all the columns.

When you use the Fluent Mapping API you have to keep a few things in mind. One, there is no Primary Key method, so when you have a compound primary key the first column of the key is considered the Partition Key. Two, all the other columns that follow the Partition Key in the primary key are considered the Clustering Key. This is very important and something that I found confusing initially.
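The rule above can be sketched in a few lines of plain C#. This is only an illustration of how I split a compound primary key before calling PartitionKey and ClusteringKey. The PrimaryKeyRules helper is hypothetical and not part of the driver, and it assumes a single-column partition key, which is the case for all three tables in this series.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the Cassandra driver.
public static class PrimaryKeyRules
{
    // The first column listed in PRIMARY KEY(...) is the partition key.
    public static string PartitionKey(params string[] primaryKeyColumns)
    {
        if (primaryKeyColumns == null || primaryKeyColumns.Length == 0)
            throw new ArgumentException("A primary key needs at least one column.");

        return primaryKeyColumns[0];
    }

    // Every remaining column, in declaration order, is a clustering column.
    public static IReadOnlyList<string> ClusteringKey(params string[] primaryKeyColumns)
    {
        if (primaryKeyColumns == null || primaryKeyColumns.Length == 0)
            throw new ArgumentException("A primary key needs at least one column.");

        return primaryKeyColumns.Skip(1).ToList();
    }
}
```

For the EventsToBeProcessed table, whose primary key is (processed, version_time_stamp, name, version), this yields processed as the Partition Key and the remaining three columns as the Clustering Key, which is exactly what the mapping has to declare.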

Let's take a look at the mapping for these three Value Objects and then go from there.
MappingConfiguration.
   Global.
   Define(
          new Map<Record>()
          .TableName("Events").
           KeyspaceName("EventStore").
           Column(r => r.Id, 
                       cm => cm.WithName("id").
                             WithDbType<Guid>()).
           Column(r => r.Name, 
                       cm => cm.WithName("name").
                             WithDbType<string>()).
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName("version_time_stamp").
                             WithDbType<DateTimeOffset>()).
           Column(r => r.Version, 
                       cm => cm.WithName("version").
                       WithDbType<long>()).
           Column(r => r.Data, 
                       cm => cm.WithName("data").
                       WithDbType<byte[]>()).
           PartitionKey(e => e.Name).
           ClusteringKey(Tuple.Create("version_time_stamp", SortOrder.Ascending),
                         Tuple.Create("version", SortOrder.Unspecified)),

           new Map<RecordToBeProcesed>()
           .TableName("EventsToBeProcessed").
           KeyspaceName("EventStore").
           PartitionKey("processed").
           ClusteringKey(Tuple.Create("version_time_stamp", SortOrder.Ascending),
                         Tuple.Create("name", SortOrder.Descending),
                         Tuple.Create("version", SortOrder.Ascending)).
           Column(r => r.Id, 
                       cm => cm.WithName("id").
                             WithDbType<Guid>()).
           Column(r => r.Name, 
                       cm => cm.WithName("name").
                             WithDbType<string>()).
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName("version_time_stamp").
                                WithDbType<DateTimeOffset>()).
           Column(r => r.Version, 
                       cm => cm.WithName("version").
                       WithDbType<long>()).
           Column(r => r.Data, 
                       cm => cm.WithName("data").
                       WithDbType<byte[]>()).
           Column(r => r.Processed, 
                       cm => cm.WithName("processed").
                             WithDbType<bool>()),

           new Map<EventStoreVersion>().
           TableName("EventsVersionsToBeProcessed").
           KeyspaceName("EventStore").
           PartitionKey("version_time_stamp").
           ClusteringKey("processed").
           Column(r => r.VersionTimeStamp, 
                       cm => cm.WithName("version_time_stamp").
                       WithDbType<DateTimeOffset>()).
           Column(r => r.Processed, 
                       cm => cm.WithName("processed").
                       WithDbType<bool>()));
You will notice from the code sample above that to use the Fluent Mapping API correctly you need to know the following about the tables you are mapping:
  • Table Name
  • Keyspace Name
  • Partition Key
  • Clustering Key if applicable
  • Each column name in the table and its data type.
So in my table definition for EventsVersionsToBeProcessed I have two columns:
  • version_time_stamp
  • processed

These two fields also make up the primary key. So in this example the Partition Key is version_time_stamp and the Clustering Key is processed. Once you have that, the rest is straightforward. Create a new Map<T> instance, where T is the type you are mapping, then use the fluent API to provide the table name, the keyspace name, the Partition Key as we just defined it, the Clustering Key, and finally the column definitions, including the column names and data types.

One thing to keep in mind when using the Mapping API the way I have: it is not thread safe, so if you run concurrent tests you will experience concurrency issues. I solved this by wrapping all the necessary cluster creation, session creation and mapping methods in an abstraction that I am calling CassandraEnvironment. Within this object I use a static constructor to ensure that the mapping only occurs once, regardless of the number of instances created during the tests. If you are interested, the code sample is available here.
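For readers who have not used the static constructor trick before, here is a stripped-down sketch of the idea. It is not the actual CassandraEnvironment code. The MappingRuns counter and the DefineMappings method are placeholders I made up so the example runs without a cluster; in the real class that method is where the MappingConfiguration.Global.Define(...) call shown earlier would go.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class CassandraEnvironment
{
    // Placeholder counter so the sketch can show how often the mapping ran.
    private static int _mappingRuns;

    // The CLR runs a static constructor at most once per type and
    // serializes access to it, even when many threads hit the type at once.
    static CassandraEnvironment()
    {
        DefineMappings();
    }

    public static int MappingRuns
    {
        get { return Volatile.Read(ref _mappingRuns); }
    }

    // Placeholder: the real code would define the three table mappings here.
    private static void DefineMappings()
    {
        Interlocked.Increment(ref _mappingRuns);
    }
}

public static class CassandraEnvironmentDemo
{
    // Simulates concurrent tests that each create their own environment.
    public static int CreateManyInParallel(int count)
    {
        Parallel.For(0, count, _ => new CassandraEnvironment());
        return CassandraEnvironment.MappingRuns;
    }
}
```

No matter how many instances the parallel loop creates, the mapping step runs a single time, which is what keeps concurrent test runs from defining the same mappings twice.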

In the next post, now that we have the mapping complete, I will discuss the actual implementation of IAppendOnlyStore and some of the design decisions that went into it.

Thanks for Reading...

Thursday, February 12, 2015

Cassandra DB - Event Store (Part III Second Model)


In my original attempt to model an Event Store (ES) I used a single table. That was a bit of a naive approach. My past experience modeling relational databases like MS SQL gave me a false sense of security. Even though Cassandra's query language, CQL, is simple and similar to T-SQL in syntax, that is by no means a good way to compare the two when it comes to modeling.

As I hinted at in my previous post, the solution to modeling an ES with CQL is to use multiple tables. To determine which tables were necessary, I first had to determine what information I needed to query.

Just like last time I started off with the Events table. It looks pretty much the same as my original but with some key differences. The first is that the name column is now the partition key, instead of id, followed by version_time_stamp and then version as the clustering columns. You will also notice that I added WITH CLUSTERING ORDER BY (version_time_stamp ASC).

CREATE KEYSPACE IF NOT EXISTS 
"EventStore" WITH replication = 
{'class':'SimpleStrategy', 'replication_factor':3};

CREATE TABLE IF NOT EXISTS
"EventStore"."Events" (
id uuid,
name varchar,
version int,
version_time_stamp timestamp,
data blob,
PRIMARY KEY(name,version_time_stamp, version ))
WITH CLUSTERING ORDER BY (version_time_stamp ASC);

This schema allows all events for a named aggregate to be pulled in the order they were created. The statement WITH CLUSTERING ORDER BY (version_time_stamp ASC) provides this capability. So when I want all the events for a given key in the order they occurred, it is as simple as the following query.

//query pulls all events for the given key
// account1
SELECT *
FROM    "EventStore"."Events"
WHERE   name = 'account1';

If there is a case, like a read model rebuild, where all events are needed from a given point in time forward regardless of aggregate key, then the following query could be used.

SELECT * 
FROM "EventStore"."Events" 
WHERE version_time_stamp >= 1423273614879 ALLOW FILTERING;

Notice the use of ALLOW FILTERING. This is necessary, but performance is not optimal, so it should only be used in situations where performance is not an issue.
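The literal 1423273614879 in the query is a CQL timestamp, which is a count of milliseconds since the Unix epoch. On the .NET side the store works with DateTimeOffset, so it helps to be able to go back and forth when testing queries by hand. The sketch below uses the built-in DateTimeOffset conversion methods (available from .NET Framework 4.6 onward); the CqlTimestamp helper name is my own and not something from the driver.

```csharp
using System;

// Hypothetical helper for moving between DateTimeOffset and the raw
// epoch-millisecond literals used in the hand-written CQL queries.
public static class CqlTimestamp
{
    // Milliseconds since 1970-01-01T00:00:00Z, as CQL expects for a timestamp.
    public static long ToEpochMilliseconds(DateTimeOffset value)
    {
        return value.ToUnixTimeMilliseconds();
    }

    // Converts a raw CQL timestamp literal back to a UTC DateTimeOffset.
    public static DateTimeOffset FromEpochMilliseconds(long milliseconds)
    {
        return DateTimeOffset.FromUnixTimeMilliseconds(milliseconds);
    }
}
```

For example, CqlTimestamp.FromEpochMilliseconds(1423273614879) gives a point in time on February 7, 2015 (UTC), and passing that value back through ToEpochMilliseconds returns the original literal.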

One of the other requirements is a way to select all the recently committed events in order to generate projections. The previous query could be used, but it's not exactly optimal. To resolve this I decided that two additional tables were necessary.

The first is all of the events that have not yet been projected and the second is a check point used to keep track of the time-line of events that need to be processed.

The first of the two tables is almost identical to the Events table, with the exception of the primary key and clustering order. In this case the partition key is processed, followed by the version_time_stamp, name and version columns.

CREATE TABLE IF NOT EXISTS
"EventStore"."EventsToBeProcessed" (
id uuid,
name varchar,
version int,
version_time_stamp timestamp,
data blob,
processed boolean,
PRIMARY KEY(processed,version_time_stamp,name,version ))
WITH CLUSTERING ORDER BY (version_time_stamp ASC, name DESC, version ASC);

This table schema allows for queries like:

//All events from the specified time stamp backwards
SELECT * 
FROM "EventStore"."EventsToBeProcessed" 
WHERE processed = false AND version_time_stamp <= 1423273614879;

//All events between a specific time-line
SELECT * 
FROM "EventStore"."EventsToBeProcessed" 
WHERE processed = false AND version_time_stamp >= 1423273614879 AND 
version_time_stamp <= 1423273614879;

In order to determine what version_time_stamp to use it is necessary to track each time stamp and when it was processed.

CREATE TABLE IF NOT EXISTS
"EventStore"."EventsVersionsToBeProcessed"(
version_time_stamp timestamp,
processed boolean,
PRIMARY KEY(version_time_stamp, processed));

This table is very simple but it serves the purpose of tracking the current version of the store. With this I can find the current version or the current version that has not been processed.

//Current Version
SELECT * 
FROM "EventStore"."EventsVersionsToBeProcessed" 
LIMIT 1;

//Current Version that has not been processed
SELECT * 
FROM "EventStore"."EventsVersionsToBeProcessed" 
WHERE processed = false 
LIMIT 1 ALLOW FILTERING;

To make this process work I will employ the use of CQL BATCHES like in the following example.

BEGIN BATCH

    //This is just an example
    INSERT INTO "EventStore"."Events" 
    (id,name,data,version, version_time_stamp) 
    VALUES(Uuid(),'account1',null,1,1423269363256);

    INSERT INTO "EventStore"."EventsToBeProcessed" 
    (id,name,data,version, version_time_stamp,processed) 
    VALUES(Uuid(),'account1',null,1,1423269363256,false);

    INSERT INTO "EventStore"."EventsVersionsToBeProcessed" 
    (version_time_stamp,processed) 
    VALUES(1423269363256, false);

APPLY BATCH;

There are a few more details that need to be worked out, like an Update query to be run against the two processed tables once projections for a given point in time have been completed, and possibly DELETE queries to purge that information. I will address these details during the creation of the application code.

I think this is a good point to stop. The base model is complete and may be adjusted but it certainly meets my core requirements. The next time I will go through the application code.

Thanks for reading...

Tuesday, February 3, 2015

Cassandra DB - Event Store (Part II First Model)

My first model is based on a simple SQL implementation that I have used in the past which is defined as follows:

CREATE TABLE [dbo].[Events](
[Id] [int] PRIMARY KEY IDENTITY,
[Name] [nvarchar](50) NOT NULL,
[Version] [int] NOT NULL,
[Data] [varbinary](max) NOT NULL
) ON [PRIMARY]

The table contains only four columns:
  • Id - An identity integer value which is the primary key and is used to determine the version of the Event Store with the following query:
    • SELECT MAX(Id) as version FROM Events
  • Name - A key used to identify the stream of events for an aggregate.
  • Version - Used to sort the events for a particular stream to ensure they are in chronological order.
  • Data - The raw binary representation of the events for the given key and version.
The first modeling challenge that I have to deal with is how to determine the version of the Event Store on system start. Since Cassandra does not have an Identity column, and because it's great for time series modeling, the logical decision is to try to use the timestamp data type. So let's see how that could look.

CREATE KEYSPACE IF NOT EXISTS 
"EventStore" WITH replication = 
{'class':'SimpleStrategy', 'replication_factor':3};

CREATE TABLE IF NOT EXISTS
 "EventStore"."Events" (
id uuid,
name varchar,
version int,
version_time_stamp timestamp,
data blob,
PRIMARY KEY(id, name))
WITH CLUSTERING ORDER BY (name DESC);

CREATE INDEX 
IF NOT EXISTS version_index ON 
"Events"(version_time_stamp);

This time around there are five columns and one index:

  • id - primary partition key
  • name - Name is the same as the SQL model
  • version - The same as the SQL model
  • version_time_stamp - Time stamp when the record was created
  • data - The same as the SQL model
  • version_index - Index Used to pull missing records

Now that I have a table model and index defined, I would like to write the following queries. The first gets the current version of the event store. I compare that value against the last known version and, if it differs, use the last known value as a parameter in the second query to get all of the missing records.

//Get the current version
SELECT version_time_stamp
FROM Events 
LIMIT 1

//Get All the records since the last version
SELECT id, name,version,version_time_stamp,data 
FROM Events 
WHERE version_index >= ?


The problem with this solution is threefold. I cannot use version_index without the partition key. The version_time_stamp column needs to be a part of the CLUSTERING ORDER BY statement in the table definition to ensure proper ordering. And my secondary index is on a column with high cardinality, since nearly every row has its own time stamp, which does not follow the recommendations from DataStax:

When to Use Secondary Indexes
Cassandra's built-in secondary indexes are best on a column family having many rows that contain the indexed value. The more unique values that exist in a particular column, the more overhead you will have, on average, to query and maintain the index. For example, suppose you had a user table with a billion users and wanted to look up users by the state they lived in. Many users will share the same column value for state (such as CA, NY, TX, etc.). This would be a good candidate for a secondary index.
When Not to Use Secondary Indexes
Do not use secondary indexes to query a huge volume of records for a small number of results. For example, if you create indexes on columns that have many distinct values, a query between the fields will incur many seeks for very few results. In the column family with a billion users, looking up users by their email address (a value that is typically unique for each user) instead of by their state, is likely to be very inefficient. It would probably be more efficient to manually maintain a dynamic column family as a form of an index instead of using a secondary index. For columns containing unique data, it is sometimes fine performance-wise to use secondary indexes for convenience, as long as the query volume to the indexed column family is moderate and not under constant load.
I basically have a model that is full of impedance mismatches with my queries. So what is the next step? I will go through that next time, but here is a hint based on some advice on modeling with Cassandra that I recently got from DataStax's Luke Tillman, author of CQL Poco.
Many times, you'll end up with a "table per query" type data model, where you insert multiple copies of the data at write time with each table designed to handle a specific query.
Before signing off, let me recommend two excellent resources created by Luke Tillman. The first is a great slide deck aimed at .NET developers, and the second is Luke's blog, in particular the following post if you are just getting started.

Thanks for reading.