Cassandra Key-Value Store Primer

June 7, 2010 | by Omar

The “no-sql” movement has been gaining strength over recent years. There is no denying that relational databases have their role and are highly effective in many cases. However, there are times when the scalability and availability of a relational database can become an issue. The “no-sql” movement is hoping to provide an alternative option to relational databases. These options allow for the creation of data stores that do not necessarily require the creation of fixed schemas or the joining of tables and tend to focus on scaling horizontally.

Apache Cassandra
Cassandra is a fairly recent addition to the “no-sql” movement. Initially developed by Facebook, Cassandra was open sourced in 2008 and is now housed and maintained by Apache.

Cassandra is a highly scalable, eventually consistent, distributed, structured key-value store. Cassandra brings together the distributed systems technologies from Dynamo and the data model from Google’s BigTable. Like Dynamo, Cassandra is eventually consistent. Like BigTable, Cassandra provides a ColumnFamily-based data model richer than typical key/value systems.

Cassandra was designed at Facebook by Avinash Lakshman (one of the authors of Amazon’s Dynamo) and Prashant Malik (a Facebook engineer). In a lot of ways you can think of Cassandra as Dynamo 2.0 or a marriage of Dynamo and BigTable. Cassandra is in production use at Facebook but is still under heavy development.

Data Model
The key difference between Cassandra and many of the other key-value stores is that it possesses the concept of a “Super Column.” For more information on what exactly a super column is (and an overview of the Cassandra data model in general), read the blog post “WTF is a SuperColumn? An Intro to the Cassandra Data Model.”

Performance
Many may be curious about the performance of these key-value stores. We found an article that compared Cassandra to Voldemort.

Interacting with Cassandra
Cassandra is developed in Java; however, it does not allow access via JDBC. Instead, it provides access to its repository via the “Apache Thrift” framework, which allows developers in any major language to access a given service (in this case the Cassandra data store). For those like me who are used to JDBC, the Thrift interface can be a bit confusing, given that there isn’t much documentation for it. The Java client made available on the Cassandra website is rather rudimentary, with no support for things like transactions; therefore, many people choose to use a library called Hector, which provides many of the features necessary for any production-worthy application that uses Cassandra.

The Project
What I wanted to do was use the Cassandra data store on a pet project that I’m working on in my spare time. I intend to use the data store to do the following:

  1. Store messages being sent from one user to another
  2. Track queries made by a given user
  3. Store user complaints

Installation
Installation of the server is easy enough. Install Java 1.6 if you haven’t already. Edit /bin/cassandra.in.sh and change JAVA_HOME if your default JAVA_HOME points to a different version of Java. By default the server has an RMI port set to 8080; you can change that value in the /bin/cassandra.in.sh file as well.

Configuration
Edit /conf/storage-conf.xml to define the keyspaces and column families. The concept of a Keyspace is similar to the schema in a relational database, whereas the ColumnFamily is analogous to a table. In defining the ColumnFamilies you are defining the key groupings and how they are sorted. Therefore, I defined the following:

  <Keyspaces>
    <Keyspace Name="Nikahfied">

      <ColumnFamily Name="Queries"
                    Comment="User queries"/>

      <ColumnFamily Name="Complaints"
                    Comment="User complaints"/>

      <ColumnFamily Name="Messages"
                    ColumnType="Super"
                    CompareWith="UTF8Type"
                    CompareSubcolumnsWith="UTF8Type"
                    RowsCached="10000"
                    KeysCached="50%"
                    Comment="Message threads between users of the system"/>

    </Keyspace>
  </Keyspaces>

This creates a schema named Nikahfied with three tables: Queries, Complaints and Messages. As you will notice, there is no real structure to the ColumnFamilies other than the column type and the CompareWith strategy, as outlined in the “WTF is a SuperColumn? An Intro to the Cassandra Data Model” article. In this case we have one super ColumnFamily and two regular ColumnFamilies. In code I will ensure that the rows of each ColumnFamily contain the same content; however, there is nothing forcing you to do so. As you will notice, we can configure each of the ColumnFamilies individually. Messages will need to handle heavy reads and writes, whereas Queries and Complaints will be more write intensive. Messages therefore has a caching scheme defined, whereas the others do not, as they will primarily handle writes rather than reads.

When initially defining the column structure I was a bit confused. I was taking the definition of Cassandra as a “key value store” literally and was confused about there being a primary key and secondary keys. When defining a basic Column I thought it would simply have a key and a value, so things would be defined as Nikahfied.<ColumnFamily>.key = value. This is actually not the case. For a basic ColumnFamily, the structure is as follows: Nikahfied.<ColumnFamily>.<Primary key>.<Secondary Key> = value. In the case of a super column, this goes one level further: Nikahfied.<ColumnFamily>.<Primary key>.<Secondary Key>.<Tertiary Key> = value. That had me puzzled for some time… Upon re-reading the “WTF is a SuperColumn? An Intro to the Cassandra Data Model” article I realized my mistake, so I thought I’d point it out to anyone else who was confused about this construct.
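
One way to picture the key levels described above is as nested maps. The following is a minimal in-memory sketch in Java, not Cassandra or Hector client code; the class and method names (DataModelSketch, StandardFamily, SuperFamily) are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** In-memory model of the addressing scheme only; it does not talk to Cassandra. */
public class DataModelSketch {

    /** Regular ColumnFamily: primary key -> secondary key -> value. */
    static class StandardFamily {
        private final Map<String, SortedMap<String, String>> rows = new HashMap<>();

        void put(String primaryKey, String secondaryKey, String value) {
            // Columns within a row are kept sorted by name, here by plain string order
            rows.computeIfAbsent(primaryKey, k -> new TreeMap<>()).put(secondaryKey, value);
        }

        String get(String primaryKey, String secondaryKey) {
            SortedMap<String, String> row = rows.get(primaryKey);
            return row == null ? null : row.get(secondaryKey);
        }
    }

    /** Super ColumnFamily: primary key -> secondary key -> tertiary key -> value. */
    static class SuperFamily {
        private final Map<String, SortedMap<String, SortedMap<String, String>>> rows = new HashMap<>();

        void put(String primaryKey, String secondaryKey, String tertiaryKey, String value) {
            rows.computeIfAbsent(primaryKey, k -> new TreeMap<>())
                .computeIfAbsent(secondaryKey, k -> new TreeMap<>())
                .put(tertiaryKey, value);
        }

        String get(String primaryKey, String secondaryKey, String tertiaryKey) {
            SortedMap<String, SortedMap<String, String>> row = rows.get(primaryKey);
            if (row == null) {
                return null;
            }
            SortedMap<String, String> superColumn = row.get(secondaryKey);
            return superColumn == null ? null : superColumn.get(tertiaryKey);
        }
    }

    public static void main(String[] args) {
        // Sample ids and values below are invented for the demonstration
        StandardFamily complaints = new StandardFamily();
        complaints.put("violator-1", "complainant-9", "2010-06-07T10:15:00Z");
        System.out.println(complaints.get("violator-1", "complainant-9"));

        SuperFamily messages = new SuperFamily();
        messages.put("recipient-1", "threads", "thread-5", "{\"messages\":[]}");
        System.out.println(messages.get("recipient-1", "threads", "thread-5"));
    }
}
```

The only difference between the two classes is the extra level of nesting, which is the whole point of a super column.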

Data Structures:
Queries:
The structure of queries would be simple enough. I would store each query by date/time, to track which queries the user made and when.

Nikahfied.Queries.UserId.<Date/Time> = <Query>
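
Because the date/time is the column name, the way it is formatted decides how a user's queries are ordered. The sketch below is an in-memory Java illustration, not Cassandra client code. It assumes column names are compared as bytes or UTF-8 strings, and the fixed-width UTC format and the QueryLogSketch class are my own choices rather than anything from the project. A fixed-width timestamp sorts chronologically under plain string comparison, which makes "queries between two times" a simple range over column names.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** One user's row in a Queries-like layout: column name (timestamp) -> query text. */
public class QueryLogSketch {

    // Fixed width, so string order equals chronological order (assumed format)
    private static final DateTimeFormatter COLUMN_NAME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    private final SortedMap<String, String> columns = new TreeMap<>();

    static String columnName(Instant when) {
        return COLUMN_NAME.format(when);
    }

    void record(Instant when, String query) {
        // Two queries in the same millisecond would share a name and overwrite each other
        columns.put(columnName(when), query);
    }

    /** Queries made at or after 'from' and strictly before 'to', oldest first. */
    List<String> queriesBetween(Instant from, Instant to) {
        return new ArrayList<>(columns.subMap(columnName(from), columnName(to)).values());
    }

    public static void main(String[] args) {
        QueryLogSketch log = new QueryLogSketch();
        Instant start = Instant.parse("2010-06-07T10:00:00Z");
        log.record(start.plusSeconds(120), "second query");
        log.record(start, "first query");
        System.out.println(log.queriesBetween(start, start.plusSeconds(600)));
    }
}
```

A variable-width format would not work the same way: for example, a timestamp with fractional seconds can sort before the same second written without them.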

Complaints:
There is only one type of complaint wherein a user can complain about another user. So I wanted to track who received the complaint, who complained and when. The structure of a complaint would be defined as follows:
Nikahfied.Complaints.<Violator Id>.<Complainant Id> = <Date/Time>

Messages:
If there was ever a case to use a super column, messages would be it. A user has a number of threads that contain one or more messages. A user can receive and send messages. Messages themselves are the same; I simply need to track which threads contain messages that a user sent. I also need to keep track of which threads have unread messages within them. My initial design was the following:
Nikahfied.Messages.RecipientId.threads = A JSON Object that contains Threads defined in JSON Objects themselves. A thread being a group of messages
.sent_ids = An array of thread ids that contain messages that the user has sent
.unread_ids = An array of thread ids that contain unread messages

However, with this structure there was no need for a super column and I was determined to make use of one :) . So I expanded out the thread structure as follows:

Nikahfied.Messages.RecipientId.threads.<thread id> = A JSON Object that contains an individual Thread
.sent_ids = An array of thread ids that contain messages that the user has sent
.unread_ids = An array of thread ids that contain unread messages
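
A practical effect of the expanded layout is that a single thread can be written or read without touching the JSON of every other thread. Here is a minimal in-memory sketch of one recipient's row, in Java. The MessageRowSketch class and its methods are hypothetical, and sent_ids and unread_ids are kept as plain Java sets here purely for illustration; how they are physically stored in Cassandra is not specified above.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** One recipient's row in a Messages-like layout; in-memory illustration only. */
public class MessageRowSketch {

    // Subcolumns of the "threads" super column: thread id -> thread serialized as JSON
    private final SortedMap<String, String> threads = new TreeMap<>();
    private final Set<String> sentIds = new LinkedHashSet<>();
    private final Set<String> unreadIds = new LinkedHashSet<>();

    /** A message arrived in this thread: store the updated thread and flag it unread. */
    void receive(String threadId, String threadJson) {
        threads.put(threadId, threadJson);
        unreadIds.add(threadId);
    }

    /** The user sent a message in this thread: store it and remember the thread id. */
    void send(String threadId, String threadJson) {
        threads.put(threadId, threadJson);
        sentIds.add(threadId);
    }

    /** Reading a thread clears its unread flag; other threads are untouched. */
    String read(String threadId) {
        unreadIds.remove(threadId);
        return threads.get(threadId);
    }

    Set<String> sentIds() {
        return sentIds;
    }

    Set<String> unreadIds() {
        return unreadIds;
    }

    public static void main(String[] args) {
        // Thread ids and JSON bodies are invented sample data
        MessageRowSketch row = new MessageRowSketch();
        row.receive("thread-1", "{\"messages\":[\"hello\"]}");
        row.send("thread-2", "{\"messages\":[\"hi there\"]}");
        System.out.println("unread before read: " + row.unreadIds());
        row.read("thread-1");
        System.out.println("unread after read: " + row.unreadIds());
    }
}
```

With the initial design, each of these operations would have meant rewriting the single JSON object that holds all threads.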

Now that we have the content structure defined, I need to create a hook between my application and the Cassandra key value store. I have decided to use the Grails Framework to speed up development. I have included the Hector library to access Cassandra, but needed to define a CassandraService to be the central point of contact between the various controllers and other services and the Cassandra key store. Both Cassandra’s Thrift interface and Hector have pretty poor documentation, so I created a Grails service that makes the interaction with Cassandra a bit clearer (at least to me). Note: You will need to be using Java 1.6.

import me.prettyprint.cassandra.service.CassandraClientPool
import me.prettyprint.cassandra.service.CassandraClientPoolFactory
import me.prettyprint.cassandra.service.CassandraClient
import me.prettyprint.cassandra.service.Keyspace
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.NotFoundException;

import org.apache.cassandra.thrift.SuperColumn

class CassandraService {

    boolean transactional = true

    def servers=["localhost:9160"]
    def defaultKeyspace="Nikahfied"
    private static final String NOT_FOUND = ""

    private execute(keyspaceName=defaultKeyspace,block){
        CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
        CassandraClient client = pool.borrowClient(servers);

        try {
            Keyspace keyspace = client.getKeyspace(keyspaceName)
            return block(keyspace)
        } finally {
            pool.releaseClient(client);
        }
    }

    /**
     * Get a single super column
     * @param columnFamily
     * @param secondaryKey
     * @param key primary key
     * @return Matching super column
     */
    public SuperColumn getSuperColumn(String columnFamily, String key, String secondaryKey) {
        ColumnPath cp = new ColumnPath(columnFamily)
        cp.setSuper_column(secondaryKey.bytes)
        return execute {Keyspace keyspace ->
            SuperColumn sc = null;
            try {
                sc = keyspace.getSuperColumn(key, cp)
            } catch (NotFoundException nfe) {
                sc = null;
            }
            return sc
        }
    }

    /**
     * Get multiple super columns
     * @param columnFamily
     * @param secondaryKey
     * @param keys primary keys
     * @return matching super columns
     */
    public Map multigetSuperColumn(String columnFamily, List keys, String secondaryKey) {
        ColumnPath cp = new ColumnPath(columnFamily)
        cp.setSuper_column(secondaryKey.bytes)
        return execute {Keyspace keyspace ->
            Map scMap = keyspace.multigetSuperColumn(keys, cp)
            return scMap
        }
    }

    /**
     * Get multiple columns
     * @param secondaryKey secondaryKey
     * @param keys primary keys
     * @param columnFamily column family
     * @return results if any
     */
    public Map multigetColumn(List keys, String secondaryKey, String columnFamily) {
        ColumnPath cp = new ColumnPath(columnFamily)
        cp.setColumn(secondaryKey.bytes)
        return execute {Keyspace keyspace ->
            Map cMap = keyspace.multigetColumn(keys, cp)
            return cMap
        }
    }

    /**
     * Get a single column and its value
     * @param columnFamily column family
     * @param key primary key
     * @param secondaryKey secondary key (column name)
     * @return matching column, or null if it does not exist
     */
    public Column getColumn(String columnFamily, String key, String secondaryKey) {
        def cp = new ColumnPath(columnFamily)
        cp.setColumn(secondaryKey.bytes)
        return execute {Keyspace keyspace ->
            Column c = null;
            try {
                c = keyspace.getColumn(key, cp)
            } catch (NotFoundException nfe) {
                c = null;
            }
            return c
        }
    }

    /**
     * Sets the new value for this column path
     * @param cf Column family
     * @param secondaryKey secondary key
     * @param key primary key
     * @param value value
     */
    def setColumnPathValue(String cf, String key, String secondaryKey, String value){
        def cp = new ColumnPath(cf)
        cp.setColumn(secondaryKey.bytes)
        return setColumnValue(cp, key, value)
    }

    /**
     * Sets the new super column value for this column path
     * @param cf Column Family
     * @param sc Super Key Id
     * @param secondaryKey Secondary Key
     * @param key primary key
     * @param value value
     * @return the value of the Column requested
     */
    def setColumnPathValue(String cf, String sc, String key, String secondaryKey, String value){
        def cp = new ColumnPath(cf)
        cp.setSuper_column(sc.bytes)
        cp.setColumn(secondaryKey.bytes)
        return setColumnValue(cp, key, value)
    }

    /**
     * Set a regular column
     * @param cp Column path
     * @param key primary key
     * @param value value
     */
    private setColumnValue(ColumnPath cp, String key, String value){
        return execute{ Keyspace keyspace ->
            keyspace.insert(key, cp, value.bytes)
        }
    }

    /**
     * Batch insert content for a given key and its values
     * @param key primary key
     * @param columnMap column key values
     * @param superColumnMap super column key values
     */
    public batchInsert(String key, Map<String, List> columnMap, Map<String, List> superColumnMap) {
        return execute{ Keyspace keyspace ->
            keyspace.batchInsert(key, columnMap, superColumnMap)
        }
    }
    }

}
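
The private execute method in the service above borrows a client, runs a block against it, and releases the client in a finally clause, so the client goes back to the pool even when the block throws. The same pattern can be sketched on its own in plain Java. Pool and Client below are hypothetical stand-ins written for this example; they are not Hector classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

/** Borrow, use, and always release: the shape of the service's execute method. */
public class BorrowReleaseSketch {

    /** Hypothetical pooled client; a real one would hold a connection. */
    static class Client {
        final int id;

        Client(int id) {
            this.id = id;
        }
    }

    /** Hypothetical fixed-size pool that tracks how many clients are checked out. */
    static class Pool {
        private final Deque<Client> idle = new ArrayDeque<>();
        private int borrowed = 0;

        Pool(int size) {
            for (int i = 0; i < size; i++) {
                idle.push(new Client(i));
            }
        }

        Client borrow() {
            Client client = idle.pop(); // throws NoSuchElementException when the pool is empty
            borrowed++;
            return client;
        }

        void release(Client client) {
            idle.push(client);
            borrowed--;
        }

        int borrowed() {
            return borrowed;
        }
    }

    /** Runs the block with a borrowed client and releases it whether or not the block throws. */
    static <T> T execute(Pool pool, Function<Client, T> block) {
        Client client = pool.borrow();
        try {
            return block.apply(client);
        } finally {
            pool.release(client);
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool(1);
        System.out.println(execute(pool, client -> "used client " + client.id));
        try {
            execute(pool, client -> {
                throw new IllegalStateException("simulated failure");
            });
        } catch (IllegalStateException expected) {
            System.out.println("clients still checked out: " + pool.borrowed());
        }
    }
}
```

Keeping the borrow and release in one place means the individual get and set methods never have to think about the pool.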

Using this class I have created a MessageService, a ComplaintsService and a QueriesService that interact with the CassandraService to retrieve and store data. There are adjoining Message and Complaints controllers, whereas the QueriesService is referenced by the SearchController. Hopefully this blog post will provide you with a better understanding of how to get started with Cassandra, and the CassandraService class should provide you with a jump start on integrating Cassandra into an existing Grails or Java project.
