In my original attempt to model an Event Store (ES) I used a single table. That was a naive approach. My past experience modeling relational databases like MS SQL gave me a false sense of security. Even though Cassandra's query language, CQL, is simple and similar to T-SQL in syntax, that is by no means a good way to compare the two when it comes to modeling.
As I hinted at in my previous post, the solution to modeling an ES with CQL is to use multiple tables. To determine which tables were necessary, I first had to determine what information I needed to query.
Just like last time I started off with the Events table. It looks pretty much the same as my original but with some key differences. The first is that the name column is now the partition key, instead of id, followed by version_time_stamp and then version as clustering columns. You will also notice that I added WITH CLUSTERING ORDER BY (version_time_stamp ASC).
CREATE KEYSPACE IF NOT EXISTS
"EventStore" WITH replication =
{'class':'SimpleStrategy', 'replication_factor':3};
CREATE TABLE IF NOT EXISTS
"EventStore"."Events" (
id uuid,
name varchar,
version int,
version_time_stamp timestamp,
data blob,
PRIMARY KEY(name,version_time_stamp, version ))
WITH CLUSTERING ORDER BY (version_time_stamp ASC);
This schema allows all events for a named aggregate to be pulled in the order they were created. The statement WITH CLUSTERING ORDER BY (version_time_stamp ASC) provides this capability. So when I want all the events, in the order they occurred, for a given key, it is as simple as the following query.
//query pulls all events for the given key
// account1
SELECT *
FROM "EventStore"."Events"
WHERE name = 'account1';
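Because version_time_stamp is the first clustering column, the same partition can also be read as a slice or in reverse. The following is a minimal sketch rather than part of the original design, and the time stamp is a placeholder value chosen for illustration.

```cql
//Events for account1 from a given point in time forward,
//still returned in ascending version_time_stamp order
SELECT *
FROM "EventStore"."Events"
WHERE name = 'account1'
AND version_time_stamp >= 1423269363256;
//Most recent event for account1, e.g. to find its current version
SELECT version, version_time_stamp
FROM "EventStore"."Events"
WHERE name = 'account1'
ORDER BY version_time_stamp DESC
LIMIT 1;
```

Both queries stay inside a single partition, so Cassandra can answer them from the clustering order without touching other aggregates.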
If there were a case, like a read model rebuild, where all events are needed regardless of aggregate key from a given point in time forward, then the following query could be used.
SELECT *
FROM "EventStore"."Events"
WHERE version_time_stamp >= 1423273614879 ALLOW FILTERING;
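Notice the use of ALLOW FILTERING. This is necessary because the query does not restrict the partition key, so Cassandra has to scan every partition to find the matching rows. Performance is not optimal, and it should only be used in situations where performance is not an issue.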
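A possible alternative, sketched here as an assumption rather than a tested approach, is to list the aggregate names first and then read each aggregate with a single-partition query. The name 'account1' and the time stamp are placeholder values.

```cql
//List every aggregate name (one row per partition)
SELECT DISTINCT name
FROM "EventStore"."Events";
//Then, for each name returned, read its events from the chosen point in time
SELECT *
FROM "EventStore"."Events"
WHERE name = 'account1'
AND version_time_stamp >= 1423273614879;
```

The first query still walks every partition, but it reads only partition keys. Events from different aggregates would have to be merged by version_time_stamp in application code to get a single chronological stream.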
One of the other requirements is a way to select all the recently committed events to generate projections. The previous query could be used, but it's not exactly optimal. To resolve this I decided that two additional tables were necessary.
The first holds all of the events that have not yet been projected, and the second is a checkpoint used to keep track of the timeline of events that need to be processed.
The first of the two tables is almost identical to the Events table with the exception of the primary key and clustering order. In this case the partition key is processed, followed by the version_time_stamp, name and version clustering columns.
CREATE TABLE IF NOT EXISTS
"EventStore"."EventsToBeProcessed" (
id uuid,
name varchar,
version int,
version_time_stamp timestamp,
data blob,
processed boolean,
PRIMARY KEY(processed,version_time_stamp,name,version ))
WITH CLUSTERING ORDER BY (version_time_stamp ASC, name DESC, version ASC);
This table schema allows for queries like:
//All events from the specified time stamp backwards
SELECT *
FROM "EventStore"."EventsToBeProcessed"
WHERE processed = false AND version_time_stamp <= 1423273614879;
//All events between a specific time-line
SELECT *
FROM "EventStore"."EventsToBeProcessed"
WHERE processed = false AND version_time_stamp >= 1423273614879 AND
version_time_stamp <= 1423273614879;
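For projection work the range query is probably most useful with two different bounds and a row limit. This is a sketch in which both time stamps and the limit of 100 are placeholder values.

```cql
//Unprocessed events after the last checkpoint up to a chosen cut-off,
//oldest first, at most 100 rows at a time
SELECT name, version, version_time_stamp, data
FROM "EventStore"."EventsToBeProcessed"
WHERE processed = false
AND version_time_stamp > 1423269363256
AND version_time_stamp <= 1423273614879
LIMIT 100;
```

Rows come back ordered by version_time_stamp, then name descending, then version, following the table's clustering order.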
In order to determine which version_time_stamp to use, it is necessary to track each time stamp and whether it has been processed.
CREATE TABLE IF NOT EXISTS
"EventStore"."EventsVersionsToBeProcessed"(
version_time_stamp timestamp,
processed boolean,
PRIMARY KEY(version_time_stamp, processed));
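This table is very simple, but it serves the purpose of tracking the current version of the store. With this I can find the current version or the current version that has not been processed. One caveat: version_time_stamp is the partition key here, so rows come back in token order rather than time order, and LIMIT 1 is not guaranteed to return the latest time stamp.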
//Current Version
SELECT *
FROM "EventStore"."EventsVersionsToBeProcessed"
LIMIT 1;
//Current Version that has not been processed
SELECT *
FROM "EventStore"."EventsVersionsToBeProcessed"
WHERE processed = false
LIMIT 1 ALLOW FILTERING;
To make this process work I will use CQL batches, so that a single event is written to all three tables together, as in the following example.
BEGIN BATCH
//This is just an example
INSERT INTO "EventStore"."Events"
(id,name,data,version, version_time_stamp)
VALUES(Uuid(),'account1',null,1,1423269363256);
INSERT INTO "EventStore"."EventsToBeProcessed"
(id,name,data,version, version_time_stamp,processed)
VALUES(Uuid(),'account1',null,1,1423269363256,false);
INSERT INTO "EventStore"."EventsVersionsToBeProcessed"
(version_time_stamp,processed)
VALUES(1423269363256, false);
APPLY BATCH;
There are a few more details that need to be worked out, like a way to update the two processed tables once projections for a given point in time have been completed, and possible DELETE queries to purge that information. I will address these details during the creation of the application code.
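One thing to keep in mind for that step: processed is part of the primary key in both tables, and CQL does not allow primary key columns to be changed with an UPDATE. Marking something as processed therefore likely means deleting the old row and, where needed, inserting a new one. The batch below is only an illustrative sketch of that idea, reusing the values from the insert example; whether processed events are kept or simply dropped from EventsToBeProcessed is an open design choice.

```cql
BEGIN BATCH
//Remove the event from the unprocessed partition
DELETE FROM "EventStore"."EventsToBeProcessed"
WHERE processed = false
AND version_time_stamp = 1423269363256
AND name = 'account1'
AND version = 1;
//Swap the checkpoint row from unprocessed to processed
DELETE FROM "EventStore"."EventsVersionsToBeProcessed"
WHERE version_time_stamp = 1423269363256
AND processed = false;
INSERT INTO "EventStore"."EventsVersionsToBeProcessed"
(version_time_stamp,processed)
VALUES(1423269363256, true);
APPLY BATCH;
```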
I think this is a good point to stop. The base model is complete and may be adjusted, but it certainly meets my core requirements. Next time I will go through the application code.
Thanks for reading...
Hi, how can you achieve a full read model rebuild in chronological order if your event table has the event name as its partition key? That implies all your events will be replayed sorted by name. It also means that partitions will be huge, because all events of the same version will be added to one partition. Is this your final schema, or did you play with it a little more? Thanks!
Thanks for the comment. This was an experiment based on some work I had done earlier with a file-based event store. While I was in the middle of this experiment I became acquainted with Akka.Net Persistence and began exploring that, since it already had a Cassandra story and Akka maps so well over to Aggregates. As a result of that and some other life events I had to abandon the experiment. If you are interested in using Cassandra as an Event Store, check out Akka at http://getakka.net or talk to the Akka guys on Gitter. They are doing some interesting work, not to mention using Azure Appfabric as a persistent store.