Wednesday, September 1, 2010

Talking to Cassandra using Java and DAO pattern


Introduction


This wave of NoSQL databases, an alternative to the rigid persistence models provided by RDBMSs (Relational Database Management Systems), is definitely unavoidable and not just another hype. In a connected world that demands ever more data, growing continuously and served ever faster, large technology companies such as Google have shown that changes are needed to accommodate this. One of these changes was indeed the rise of NoSQL databases such as Bigtable, Hypertable, and Cassandra, the last of which is the subject of this article.

As described on its home page [1]:

"The Apache Cassandra Project develops a highly scalable second-generation distributed database, bringing together Dynamo's fully distributed design and Bigtable's ColumnFamily-based data model. Cassandra was open sourced by Facebook in 2008, and is now developed by Apache committers and contributors from many companies."

At the time of writing, Cassandra is in use at Digg, Facebook, Twitter, Reddit, Rackspace, Cloudkick, Cisco, SimpleGeo, Ooyala, OpenX, and other companies with large, active data sets. The largest production cluster holds over 100 TB of data on over 150 machines!

So let's walk through how to use this new storage and programming model, specifically from a Java client.

Database keyspace setup


This article uses Cassandra 0.6.5, the stable version at the time of writing. For simplicity, we'll run the server as a single node. Remember that JDK 1.6 (Java Development Kit) is required to run the Cassandra server.

The first step is to add a keyspace named "ContactList". To do this, add the following lines to CASSANDRA_HOME/conf/storage-conf.xml, inside the <Keyspaces> tag:

<Keyspace Name="ContactList">
  <ColumnFamily CompareWith="UTF8Type" Name="Groups"/>
  <ColumnFamily CompareWith="UTF8Type" Name="Contacts"/>
  <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
  <ReplicationFactor>1</ReplicationFactor>
  <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>

After editing that file, start (or restart) your Cassandra server by running "CASSANDRA_HOME/bin/cassandra -f" (the -f flag keeps it in the foreground). The corresponding data files and index structures should then be created.

Please consider reading Cassandra's Getting Started guide [2] for further information on this process.

Once the server is online, verify that the new keyspace is available by starting the command-line client:

CASSANDRA_HOME/bin/cassandra-cli --host localhost --port 9160

You should see output similar to this:

Connected to: "Test Cluster" on localhost/9160
Welcome to cassandra CLI.

Type 'help' or '?' for help. Type 'quit' or 'exit' to quit.
cassandra>

You can then list the keyspaces available on the server:

cassandra> show keyspaces
ContactList
Keyspace1
system

"ContactList" should appear in the list. If it doesn't, review the previous steps.

Now run the following command to display detailed information about the new keyspace:

cassandra> describe keyspace ContactList
ContactList.Groups
Column Family Type: Standard
Columns Sorted By: org.apache.cassandra.db.marshal.UTF8Type@33b121

Column Family Type: Standard
Column Sorted By: org.apache.cassandra.db.marshal.UTF8Type
flush period: null minutes
------
ContactList.Contacts
Column Family Type: Standard
Columns Sorted By: org.apache.cassandra.db.marshal.UTF8Type@1b22920

Column Family Type: Standard
Column Sorted By: org.apache.cassandra.db.marshal.UTF8Type
flush period: null minutes
------

OK, apparently our key-value storage is working well. It's time for coding! :D

Starting the use case


We'll start by implementing a simple "group" entity, represented in Java by a POJO (Plain Old Java Object) class named Group:

public class Group {

    private Integer id;
    private String name;

    public Group() {
    }

    public Group(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Group [id=" + id + ", name=" + name + "]";
    }

}

Concerns should be separated into different layers. To persist the "group" entity, it is highly recommended to define an interface based on the DAO (Data Access Object) pattern. This interface, named IGroupDAO, is dedicated exclusively to handling operations on that entity. Here is its Java code:

import java.util.List;

public interface IGroupDAO {

    void startup();

    void shutdown();

    List<Group> findAll();

    Group findById(Integer id);

    void save(Group group);

    void remove(Group group);

}
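Because callers depend only on IGroupDAO, the storage behind it can be swapped without touching them. Below is a minimal sketch of a hypothetical in-memory implementation. InMemoryGroupDAO is not part of the original code. It can be handy for exercising the DAO contract without a running Cassandra server. So that the block compiles on its own, it repeats trimmed-down copies of Group and IGroupDAO as nested types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class InMemoryGroupDAODemo {

    // Trimmed-down copy of the article's Group POJO.
    static class Group {
        private final Integer id;
        private final String name;

        Group(Integer id, String name) {
            this.id = id;
            this.name = name;
        }

        Integer getId() { return id; }
        String getName() { return name; }
    }

    // Same contract as the article's IGroupDAO.
    interface IGroupDAO {
        void startup();
        void shutdown();
        List<Group> findAll();
        Group findById(Integer id);
        void save(Group group);
        void remove(Group group);
    }

    // Hypothetical implementation: keeps group names in a map keyed by id
    // instead of talking to Cassandra.
    static class InMemoryGroupDAO implements IGroupDAO {
        private final Map<Integer, String> rows = new TreeMap<Integer, String>();

        public void startup() {
            // Nothing to open for an in-memory store.
        }

        public void shutdown() {
            rows.clear(); // discard everything when the DAO is shut down
        }

        public List<Group> findAll() {
            List<Group> list = new ArrayList<Group>();
            for (Map.Entry<Integer, String> e : rows.entrySet()) {
                list.add(new Group(e.getKey(), e.getValue()));
            }
            return list;
        }

        public Group findById(Integer id) {
            String name = rows.get(id);
            return name == null ? null : new Group(id, name);
        }

        // Saving an existing id overwrites it, much like a Cassandra insert.
        public void save(Group group) {
            rows.put(group.getId(), group.getName());
        }

        public void remove(Group group) {
            rows.remove(group.getId());
        }
    }

    public static void main(String[] args) {
        IGroupDAO dao = new InMemoryGroupDAO();
        dao.startup();
        dao.save(new Group(123, "Test Group"));
        System.out.println(dao.findById(123).getName()); // prints "Test Group"
        dao.remove(new Group(123, "Test Group"));
        System.out.println(dao.findById(123));           // prints "null"
        dao.shutdown();
    }
}
```

The test class that follows only uses the IGroupDAO methods. In principle, it could therefore run against such an implementation as well as against the Cassandra-backed one.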

Simply put, the save() and remove() methods are enough to modify the data, whereas findAll() and findById() are meant for retrieval. You might then ask: "what the hell are startup() and shutdown() doing there?" Hold on: they are a necessary "gambiarra" (Brazilian slang for a makeshift workaround) to get things working in this example, and probably would not belong in a real application. :)

Developers, developers, developers: don't forget to write tests before the actual code! Test-Driven Development (TDD) is definitely important and worthwhile, besides being a lot of fun at coding time!

Since we are using Java, let's write a unit test against that DAO interface using the JUnit [3] testing framework. The resulting class, named GroupDAOTest, is shown below:

import java.util.List;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class GroupDAOTest {

    private static IGroupDAO dao;

    private static final Integer GROUP_ID = 123;
    private static final String GROUP_NAME = "Test Group";

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        dao = new GroupDAO();
        dao.startup();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        dao.shutdown();
        dao = null;
    }

    @Test
    public void testSave() {

        System.out.println("GroupDAOTest.testSave()");

        Group group = new Group();
        group.setId(GROUP_ID);
        group.setName(GROUP_NAME);

        System.out.println("Saving group: " + group);
        dao.save(group);

        // Read the group back and compare it with what was saved
        Group retrieved = dao.findById(GROUP_ID);
        System.out.println("Retrieved group: " + retrieved);
        Assert.assertNotNull(retrieved);
        Assert.assertEquals(GROUP_ID, retrieved.getId());
        Assert.assertEquals(GROUP_NAME, retrieved.getName());
    }

    @Test
    public void testRetrieve() {

        System.out.println("GroupDAOTest.testRetrieve()");

        // Keys 12301 to 12310
        System.out.println("Saving groups");
        for (int i = 1; i <= 10; i++) {
            Group group = new Group();
            group.setId(GROUP_ID * 100 + i);
            group.setName(GROUP_NAME + " " + i);
            dao.save(group);
        }

        System.out.println("Retrieving groups");
        List<Group> list = dao.findAll();
        Assert.assertNotNull(list);
        Assert.assertFalse(list.isEmpty());
        Assert.assertTrue(list.size() >= 10);

        System.out.println("Retrieved list:");
        for (Group group : list) {
            System.out.println("- " + group);
        }
    }

    @Test
    public void testRemove() {

        System.out.println("GroupDAOTest.testRemove()");

        Group group = new Group();
        group.setId(GROUP_ID);
        group.setName(GROUP_NAME);

        System.out.println("Saving group: " + group);
        dao.save(group);

        System.out.println("Removing group: " + group);
        dao.remove(group);

        // After removal, the key should no longer be found
        Group retrieved = dao.findById(GROUP_ID);
        System.out.println("Retrieved group: " + retrieved);
        Assert.assertNull(retrieved);
    }

}


The goal of the GroupDAOTest class is to exercise the methods of the IGroupDAO interface. For each DAO feature, GroupDAOTest has a method annotated with @Test, and the expected results are checked with the Assert class.

Actual persistence


The base code is now in place, along with the test class. However, nothing is being persisted to Cassandra yet. So how do we access a Cassandra database from the client side?

For its external, client-facing interactions, Cassandra uses Thrift [4], a general-purpose, cross-language RPC framework. Cassandra's main API (RPC/Thrift) port is 9160. Thrift supports a wide variety of languages, so you can write your application against Thrift directly (see [5]) or use a higher-level client where one is available.

Of course we're gonna use the low-level Thrift interface. :)

The first thing to do is add the Cassandra and Thrift Java libraries to the project's classpath. They ship with the Cassandra binaries, specifically in the CASSANDRA_HOME/lib directory.

Now we can finally implement the GroupDAO class, which provides the actual implementation of the operations declared by the IGroupDAO interface.

In this code, we need to manage the connection to Cassandra by hand. Cassandra speaks Thrift's binary protocol on TCP port 9160. This is exactly where startup() and shutdown() come in.
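The sketch below makes "binary protocol" a bit more concrete. It encodes only the header of a Thrift call message, following our understanding of TBinaryProtocol's default strict mode. The header has three parts, all big-endian:

- a 4-byte word that combines the protocol version (0x80010000) with the message type (1 for a call);
- the method name, as a 4-byte length followed by its UTF-8 bytes;
- a 4-byte sequence id.

ThriftMessageHeader and encodeCallHeader are hypothetical names used only for illustration. The generated Cassandra.Client writes this header for you and then encodes the call's arguments, which the sketch omits.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class ThriftMessageHeader {

    // Version marker written by TBinaryProtocol in strict mode.
    static final int VERSION_1 = 0x80010000;
    // Message type of a client request (TMessageType.CALL in Thrift).
    static final int CALL = 1;

    // Encodes only the header of a CALL message; the arguments are not included.
    static byte[] encodeCallHeader(String method, int seqId) {
        byte[] name = method.getBytes(Charset.forName("UTF-8"));
        // ByteBuffer writes big-endian by default, as the protocol expects.
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + name.length + 4);
        buf.putInt(VERSION_1 | CALL); // version and message type
        buf.putInt(name.length);      // string length prefix
        buf.put(name);                // method name bytes
        buf.putInt(seqId);            // id used to match the server's reply
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] header = encodeCallHeader("get_slice", 1);
        StringBuilder hex = new StringBuilder();
        for (byte b : header) {
            hex.append(String.format("%02x ", b));
        }
        // prints "21 bytes: 80 01 00 01 00 00 00 09 67 65 74 5f 73 6c 69 63 65 00 00 00 01"
        System.out.println(header.length + " bytes: " + hex.toString().trim());
    }
}
```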

If you're not yet familiar with the terms used in key-value databases, such as keyspaces and column families, don't worry. I suggest reading some articles about Cassandra's data model [6, 7]. For the moment, just know that the Thrift interface for Java provides support classes such as Cassandra.Client, ColumnParent, ColumnOrSuperColumn, KeyRange, SliceRange, and SlicePredicate.
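One rough mental model for these terms is a set of nested maps: keyspace → column family → row key → column name → value. This is a simplification, not how Cassandra stores data on disk. The sketch below models the ContactList keyspace that way.

- Columns are kept sorted by name, loosely mirroring CompareWith="UTF8Type" in storage-conf.xml.
- Rows are left unordered, assuming the default RandomPartitioner.

ContactListModel, insert, and getRow are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class ContactListModel {

    // column family -> row key -> (column name -> value), for one keyspace.
    // Rows use a HashMap: with a random partitioner, row order is not meaningful.
    // Columns use a TreeMap, so they stay sorted by name (a loose stand-in for UTF8Type).
    private final Map<String, Map<String, SortedMap<String, String>>> columnFamilies =
            new HashMap<String, Map<String, SortedMap<String, String>>>();

    public ContactListModel() {
        // The two column families declared in storage-conf.xml.
        columnFamilies.put("Groups", new HashMap<String, SortedMap<String, String>>());
        columnFamilies.put("Contacts", new HashMap<String, SortedMap<String, String>>());
    }

    private Map<String, SortedMap<String, String>> rowsOf(String columnFamily) {
        Map<String, SortedMap<String, String>> rows = columnFamilies.get(columnFamily);
        if (rows == null) {
            throw new IllegalArgumentException("Unknown column family: " + columnFamily);
        }
        return rows;
    }

    // Roughly what an insert does: set one column in one row of one column family.
    public void insert(String columnFamily, String rowKey, String column, String value) {
        Map<String, SortedMap<String, String>> rows = rowsOf(columnFamily);
        SortedMap<String, String> columns = rows.get(rowKey);
        if (columns == null) {
            columns = new TreeMap<String, String>();
            rows.put(rowKey, columns);
        }
        columns.put(column, value);
    }

    // Roughly what reading all columns of a row returns; empty if the row is absent.
    public SortedMap<String, String> getRow(String columnFamily, String rowKey) {
        SortedMap<String, String> columns = rowsOf(columnFamily).get(rowKey);
        return columns == null ? new TreeMap<String, String>() : columns;
    }

    public static void main(String[] args) {
        ContactListModel model = new ContactListModel();
        model.insert("Groups", "12301", "name", "Test Group 1");
        // "description" is a hypothetical extra column, added only to show ordering.
        model.insert("Groups", "12301", "description", "Some description");
        System.out.println(model.getRow("Groups", "12301").keySet()); // prints [description, name]
    }
}
```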

Here is the code for the GroupDAO implementation class:

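import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public class GroupDAO implements IGroupDAO {

    private static final String KEYSPACE = "ContactList";
    private static final String COLUMN_FAMILY = "Groups";
    private static final String ENCODING = "utf-8";

    private static TTransport tr = null;

    /**
     * Close the connection to the Cassandra Database.
     */
    private static void closeConnection() {
        if (tr == null) {
            return; // the connection was never opened
        }
        try {
            tr.flush();
            tr.close();
        } catch (TTransportException exception) {
            exception.printStackTrace();
        }
    }

    /**
     * Open up a new connection to the Cassandra Database.
     *
     * @return the Cassandra Client, or null if the connection failed
     */
    private static Cassandra.Client setupConnection() {
        try {
            // Plain socket transport with Thrift's binary protocol on port 9160
            tr = new TSocket("localhost", 9160);
            TProtocol proto = new TBinaryProtocol(tr);
            Cassandra.Client client = new Cassandra.Client(proto);
            tr.open();
            return client;
        } catch (TTransportException exception) {
            exception.printStackTrace();
        }
        return null;
    }

    private Cassandra.Client client;

    @Override
    public void startup() {
        this.client = setupConnection();
    }

    @Override
    public void shutdown() {
        closeConnection();
    }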

    @Override
    public List<Group> findAll() {
        List<Group> list = new ArrayList<Group>();
        try {
            // Scan all row keys (KeyRange returns up to 100 rows by default)
            KeyRange keyRange = new KeyRange();
            keyRange.setStart_key("");
            keyRange.setEnd_key("");

            // Empty start/finish: fetch every column of each row
            SliceRange sliceRange = new SliceRange();
            sliceRange.setStart(new byte[] {});
            sliceRange.setFinish(new byte[] {});

            SlicePredicate slicePredicate = new SlicePredicate();
            slicePredicate.setSlice_range(sliceRange);

            ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
            List<KeySlice> keySlices = client.get_range_slices(KEYSPACE,
                    columnParent, slicePredicate, keyRange,
                    ConsistencyLevel.ONE);

            if (keySlices == null || keySlices.isEmpty())
                return list;

            for (KeySlice keySlice : keySlices) {

                // Deleted rows may still be listed with no columns
                // ("range ghosts"), so skip them
                if (keySlice.getColumns() == null || keySlice.getColumns().isEmpty())
                    continue;

                Group group = new Group();
                group.setId(Integer.parseInt(keySlice.getKey()));

                for (ColumnOrSuperColumn c : keySlice.getColumns()) {
                    if (c.getColumn() != null) {
                        String name = new String(c.getColumn().getName(),
                                ENCODING);
                        String value = new String(c.getColumn().getValue(),
                                ENCODING);
                        // long timestamp = c.getColumn().getTimestamp();
                        if (name.equals("name")) {
                            group.setName(value);
                        }
                    }
                }

                list.add(group);
            }
            return list;

        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return null;
    }

    @Override
    public Group findById(Integer id) {
        try {
            SlicePredicate slicePredicate = new SlicePredicate();
            SliceRange sliceRange = new SliceRange();
            sliceRange.setStart(new byte[] {});
            sliceRange.setFinish(new byte[] {});
            slicePredicate.setSlice_range(sliceRange);

            ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
            List<ColumnOrSuperColumn> result = client.get_slice(KEYSPACE,
                    id.toString(), columnParent, slicePredicate,
                    ConsistencyLevel.ONE);

            if (result == null || result.isEmpty())
                return null;

            Group group = new Group();
            group.setId(id);

            for (ColumnOrSuperColumn c : result) {
                if (c.getColumn() != null) {
                    String name = new String(c.getColumn().getName(), ENCODING);
                    String value = new String(c.getColumn().getValue(), ENCODING);
                    // long timestamp = c.getColumn().getTimestamp();
                    if (name.equals("name")) {
                        group.setName(value);
                    }
                }
            }

            return group;
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return null;
    }

    @Override
    public void save(Group group) {
        try {
            // Milliseconds here; remove() must use the same unit
            long timestamp = System.currentTimeMillis();
            Map<String, List<ColumnOrSuperColumn>> job = new HashMap<String, List<ColumnOrSuperColumn>>();

            // One "name" column holding the group's name as UTF-8 bytes
            List<ColumnOrSuperColumn> columns = new ArrayList<ColumnOrSuperColumn>();
            Column column = new Column("name".getBytes(ENCODING),
                    group.getName().getBytes(ENCODING), timestamp);
            ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
            columnOrSuperColumn.setColumn(column);
            columns.add(columnOrSuperColumn);

            // Column family name -> columns to write in the row keyed by the group id
            job.put(COLUMN_FAMILY, columns);

            client.batch_insert(KEYSPACE, group.getId().toString(), job,
                    ConsistencyLevel.ALL);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    @Override
    public void remove(Group group) {
        try {
            // A path with only the column family removes the whole row
            ColumnPath columnPath = new ColumnPath(COLUMN_FAMILY);
            client.remove(KEYSPACE, group.getId().toString(), columnPath,
                    System.currentTimeMillis(), ConsistencyLevel.ALL);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

}
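GroupDAO repeats the same conversion between a Group and its columns in findAll(), findById(), and save(). One possible refactoring is to move that logic into small helpers that don't depend on the server. The sketch below assumes columns are handled as a plain Map from column name to UTF-8 bytes; the Thrift classes are left out so the block compiles on its own. GroupColumns and its methods are hypothetical names. fromColumns() returns null for a row with no columns. A deleted row can show up that way in get_range_slices results (a "range ghost").

```java
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupColumns {

    static final Charset UTF8 = Charset.forName("UTF-8");

    // Minimal stand-in for the article's Group POJO.
    static class Group {
        final Integer id;
        final String name;

        Group(Integer id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Row key used for a group, as in save(): the id as a decimal string.
    static String rowKey(Group group) {
        return group.id.toString();
    }

    // Group -> columns, as save() builds them: a single "name" column.
    static Map<String, byte[]> toColumns(Group group) {
        Map<String, byte[]> columns = new LinkedHashMap<String, byte[]>();
        columns.put("name", group.name.getBytes(UTF8));
        return columns;
    }

    // (row key, columns) -> Group, as findById() and findAll() read them.
    // Returns null for an empty row, e.g. a deleted row still listed by a range scan.
    static Group fromColumns(String key, Map<String, byte[]> columns) {
        if (columns == null || columns.isEmpty()) {
            return null;
        }
        byte[] name = columns.get("name");
        return new Group(Integer.valueOf(key), name == null ? null : new String(name, UTF8));
    }

    // Like findAll(): convert every row, skipping the empty ones.
    static List<Group> fromRows(Map<String, Map<String, byte[]>> rows) {
        List<Group> groups = new ArrayList<Group>();
        for (Map.Entry<String, Map<String, byte[]>> row : rows.entrySet()) {
            Group group = fromColumns(row.getKey(), row.getValue());
            if (group != null) {
                groups.add(group);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        Group saved = new Group(12301, "Test Group 1");
        Map<String, Map<String, byte[]>> rows = new LinkedHashMap<String, Map<String, byte[]>>();
        rows.put(rowKey(saved), toColumns(saved));
        rows.put("123", new LinkedHashMap<String, byte[]>()); // removed row, no columns left
        List<Group> groups = fromRows(rows);
        System.out.println(groups.size() + " " + groups.get(0).name); // prints "1 Test Group 1"
    }
}
```

Keeping the conversion separate lets it be unit-tested without a running server. The DAO methods would then only build the Thrift objects around these helpers.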

Now make sure your project builds, and run the test class! As long as your Cassandra server is online, JUnit should report all tests green.

Here is what the unit test does:
1. create a group identified by the key 123, persist it, then retrieve it and compare it with the expected values;
2. persist a group, remove it from the database, and then search for its key, expecting not to find it;
3. insert 10 groups with keys numbered from 12301 to 12310 and test their retrieval.

If you're as skeptical as I am, you'll want to look for the persisted data by hand. In that case, open the Cassandra client and send it some commands to retrieve the recently stored groups and their columns. Here are some suggestions:

cassandra> count ContactList.Groups['12301']
1 columns

cassandra> get ContactList.Groups['12301']
=> (column=name, value=Test Group 1, timestamp=1283287745613)
Returned 1 results.

cassandra> get ContactList.Groups['12301']['name']
=> (column=name, value=Test Group 1, timestamp=1283287745613)
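Every column shown by the CLI carries a timestamp, which is why save() and remove() pass one to Thrift. When several versions of a column exist, Cassandra keeps the one with the highest timestamp. A delete is itself stored as a timestamped marker (a tombstone), and it hides data written at or before that time. The sketch below is a deliberately simplified, single-column model of that rule, not Cassandra's actual code. LastWriteWins and ColumnState are hypothetical names.

```java
public class LastWriteWins {

    // Simplified state of one column: its latest value and timestamp, plus the
    // newest deletion (tombstone) timestamp seen so far.
    static class ColumnState {
        String value;
        long valueTimestamp = Long.MIN_VALUE;
        long deletedAt = Long.MIN_VALUE; // MIN_VALUE means "never deleted"

        // Keep a write only if it is newer than the current value.
        // (Cassandra has extra tie-breaking rules; this sketch keeps the old value on ties.)
        void write(String newValue, long timestamp) {
            if (timestamp > valueTimestamp) {
                value = newValue;
                valueTimestamp = timestamp;
            }
        }

        // A delete records a tombstone timestamp instead of erasing data.
        void delete(long timestamp) {
            deletedAt = Math.max(deletedAt, timestamp);
        }

        // The value is visible only if it was written after the newest tombstone.
        String read() {
            return (value != null && valueTimestamp > deletedAt) ? value : null;
        }
    }

    public static void main(String[] args) {
        ColumnState name = new ColumnState();
        name.write("Test Group", 1000L);
        name.delete(1000L);                 // same timestamp: the tombstone wins
        System.out.println(name.read());    // prints "null"
        name.write("Test Group", 999L);     // older write arriving late: still hidden
        System.out.println(name.read());    // prints "null"
        name.write("Test Group again", 1001L);
        System.out.println(name.read());    // prints "Test Group again"
    }
}
```

Under this model, mixing time units would be harmful. For example, a tombstone written in microseconds is about 1000 times larger than a millisecond timestamp, so it would keep hiding later millisecond-stamped writes. For that reason, the DAO uses System.currentTimeMillis() in both save() and remove().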


Conclusion


We have finally reached the end of this study! This article introduced some concepts of Cassandra, a NoSQL database. We also implemented in Java a very simple example of persistence into a Cassandra keyspace using Thrift, a low-level API. Finally, we looked up the stored values in the database through Cassandra's command-line client.

References


[1] Apache Cassandra
[2] Getting Started with Cassandra
[3] JUnit - Unit Testing Framework
[4] Apache Thrift
[5] Cassandra Wiki - Thrift Examples
[6] Cassandra Wiki - Data Model
[7] A Quick Introduction to the Cassandra Data Model