Friday, September 24, 2010

Assembling a local Cassandra cluster in Linux



To build an Apache Cassandra [1] cluster dedicated exclusively to availability and replication testing, here is a simple solution based on a single Linux instance, with no virtualization at all.

The idea is to initialize every node, run a test client, and then manually kill some node processes in order to check continuous service availability and data replication under several replication factors.

Cassandra's latest stable version at the time of this post was 0.6.5, and that is the release considered in this study.

For a cluster of 3 nodes, we'll create the following file system structure on Linux:

/opt/apache-cassandra-0.6.5/nodes
|
|-- node1
|   |-- bin
|   |-- conf
|   |-- data
|   |-- log
|   `-- txs
|
|-- node2
|   |-- bin
|   |-- conf
|   |-- data
|   |-- log
|   `-- txs
|
`-- node3
    |-- bin
    |-- conf
    |-- data
    |-- log
    `-- txs


That is, each node has its own set of directories, namely:
- bin: binaries (mostly Shell Scripts) tuned for the instance
- conf: configuration files for the node
- data: database binary files
- log: system log in text format
- txs: transaction logs (i.e. commit logs)

In the following sections the steps will be described in detail.


Cassandra installation



First of all, get the required Cassandra packages from its download site. Extract the file contents into the /opt/apache-cassandra-0.6.5/ directory (you'll need administrator privileges to do so). Make sure your regular user owns this entire directory tree (use chown if needed).

Make sure you already have a Java Virtual Machine (JVM) installed on the system, at least version 1.6. Cassandra is implemented in Java, so it needs a JVM to run.


Supplying additional network interfaces



As we are building a pseudo-cluster, assume there are no network interfaces besides the existing ones. Since Cassandra is a distributed system, each participating node must have its own exclusive IP address. Fortunately, we can simulate this on Linux using aliases of the existing loopback interface (i.e., "lo").

Therefore, as the root user, we'll issue the instructions below to create the aliases lo:2 and lo:3, pointing to 127.0.0.2 and 127.0.0.3 respectively:

# ifconfig lo:2 127.0.0.2 up
# ifconfig lo:3 127.0.0.3 up


You can check the outcome of these instructions by invoking ifconfig with no arguments:

$ ifconfig
lo        Link encap:Local Loopback
          inet addr:127.0.0.1  Mask:255.0.0.0
          inet6 addr: ::1/128 Scope:Host
          UP LOOPBACK RUNNING  MTU:16436  Metric:1
          RX packets:30848 errors:0 dropped:0 overruns:0 frame:0
          TX packets:30848 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:0
          RX bytes:2946793 (2.9 MB)  TX bytes:2946793 (2.9 MB)

lo:2      Link encap:Local Loopback
          inet addr:127.0.0.2  Mask:255.0.0.0
          UP LOOPBACK RUNNING  MTU:16436  Metric:1

lo:3      Link encap:Local Loopback
          inet addr:127.0.0.3  Mask:255.0.0.0
          UP LOOPBACK RUNNING  MTU:16436  Metric:1


That way, node1's IP is 127.0.0.1, node2's 127.0.0.2, and so forth.


Registering node hostnames locally



Instead of using IP addresses in the subsequent steps, we'll assign hostnames to them and use those names whenever referencing a cluster node.

As we have no DNS in this configuration, we need to edit the /etc/hosts file and add those IP mappings there. Change the file according to the example below:

/etc/hosts:
127.0.0.1    localhost    node1
127.0.0.2    node2
127.0.0.3    node3


From now on, we'll refer to the nodes simply as node1, node2, and node3. To finally test the hostnames, try pinging the nodes as shown:

$ ping node2
PING node2 (127.0.0.2) 56(84) bytes of data.
64 bytes from node2 (127.0.0.2): icmp_seq=1 ttl=64 time=0.018 ms
64 bytes from node2 (127.0.0.2): icmp_seq=2 ttl=64 time=0.015 ms
^C
--- node2 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 999ms
rtt min/avg/max/mdev = 0.015/0.016/0.018/0.004 ms



Creating the first node structure



We'll first create the directory structure for node1 and afterwards replicate it to the remaining nodes. Switch to Cassandra's root directory, then create the subdirectories and copy the files by issuing the following shell commands as your regular Linux user:

$ cd /opt/apache-cassandra-0.6.5/
$ mkdir -p nodes/node1
$ cp -R bin/ conf/ nodes/node1/
$ cd nodes/node1


The next step is to modify some default settings in the configuration files. Here I opted to use relative paths instead of absolute paths for simplification purposes.

Edit a single line in the conf/log4j.properties file:

# Edit the next line to point to your logs directory
log4j.appender.R.File=./log/system.log


Now, in conf/storage-conf.xml, several tag values must be changed. Be sure to modify the correct tags, as indicated below:

  <Keyspaces>
    <Keyspace Name="Keyspace1">
      ...

      <ReplicationFactor>2</ReplicationFactor>
      ...

    </Keyspace>
  </Keyspaces>
  ...

  <CommitLogDirectory>./txs</CommitLogDirectory>
  <DataFileDirectories>
      <DataFileDirectory>./data</DataFileDirectory>
  </DataFileDirectories>
  ...

  <ListenAddress>node1</ListenAddress>
  <StoragePort>7000</StoragePort>
  ...

  <ThriftAddress></ThriftAddress>
  <ThriftPort>9160</ThriftPort>
  <ThriftFramedTransport>false</ThriftFramedTransport>
  ...


As we are using a special directory arrangement, we'll need to adjust some paths in the scripts. Thus, edit the file bin/cassandra.in.sh so that the lines below read as follows:

for jar in $cassandra_home/../../lib/*.jar; do
    CLASSPATH=$CLASSPATH:$jar
done


The first node is finally done. So let's turn our attention to the other two nodes.


Creating the remaining nodes



We are going to take advantage of the first node's directory structure to create the two remaining nodes. Thus, issue the instructions below to perform the cloning:

$ cd /opt/apache-cassandra-0.6.5/nodes/
$ mkdir node2 node3
$ cp -R node1/* node2
$ cp -R node1/* node3


When you're done, run the tree command (if available) as illustrated below to check the whole structure:

$ tree -L 2
.
|-- node1
|   |-- bin
|   `-- conf
|-- node2
|   |-- bin
|   `-- conf
`-- node3
    |-- bin
    `-- conf


If you compare this structure with the one at the beginning of this article, you'll notice that some subdirectories are missing (i.e., log, data, txs). That's because they will be created automatically when the server starts up for the first time.


Final settings on nodes



The first node acts as a contact point, i.e., a seed. The other nodes start the Gossip-based protocol by connecting to it.

Besides that, we must apply node-specific settings. Firstly, each node must listen on its own hostname (i.e., node1, node2, node3). Thus, edit nodeX/conf/storage-conf.xml for each node, replacing the line below (this example applies to node2):

  <ListenAddress>node2</ListenAddress>


Monitoring in Cassandra is provided through JMX, which by default listens on TCP port 8080 on every host. As we are building the cluster on a single real machine, that is no longer possible. Thus, the JMX interfaces will all be bound to the same host, but we must change the port on each node. Therefore, node1 will listen on 8081, node2 on 8082, and node3 on 8083.

This parameter is configured in nodeX/bin/cassandra.in.sh, so we only need to change the port, as exemplified below (again for node2):

# Arguments to pass to the JVM
JVM_OPTS=" \
        -ea \
        -Xms1G \
        -Xmx1G \
        -XX:+UseParNewGC \
        -XX:+UseConcMarkSweepGC \
        -XX:+CMSParallelRemarkEnabled \
        -XX:SurvivorRatio=8 \
        -XX:MaxTenuringThreshold=1 \
        -XX:+HeapDumpOnOutOfMemoryError \
        -Dcom.sun.management.jmxremote.port=8082 \
        -Dcom.sun.management.jmxremote.ssl=false \
        -Dcom.sun.management.jmxremote.authenticate=false"
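
If you want to confirm from code that a node's JMX interface really moved to its new port, the short sketch below (my own addition, not part of the original setup) uses the standard javax.management remote API to connect to node2 on port 8082 and list the registered MBeans. I'm assuming Cassandra 0.6 publishes its MBeans under domains starting with org.apache.cassandra; adjust the port for the other nodes.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxPortCheck {

    public static void main(String[] args) throws Exception {
        // node2's JMX interface was moved to port 8082 in this setup
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:8082/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // List every MBean registered under Cassandra's own domains
            for (ObjectName name : connection.queryNames(
                    new ObjectName("org.apache.cassandra*:*"), null)) {
                System.out.println(name);
            }
        } finally {
            connector.close();
        }
    }
}

This is the same JMX endpoint that nodetool will talk to later in this article.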



Starting up every server



For this step it is best to open a separate Linux terminal for each node. Then issue the instructions below, one per terminal:

$ node1/bin/cassandra -f

$ node2/bin/cassandra -f

$ node3/bin/cassandra -f


Pay attention to the console output of each node; any misconfiguration will be reported there. Also note the interaction between the nodes: they should discover one another automatically within seconds.


Checking service availability



Once every node in the cluster is up and running, their respective TCP ports should show up when running a netstat command.

To check Cassandra's listening TCP ports, look for 9160 (Thrift service), 7000 (internal storage), and 808X (JMX interface). Take a look at the example below, taken from a successful cluster startup:

$ netstat -lptn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 127.0.0.1:631           0.0.0.0:*               LISTEN      -             
tcp6       0      0 127.0.0.3:9160          :::*                    LISTEN      8520/java     
tcp6       0      0 127.0.0.2:9160          :::*                    LISTEN      8424/java     
tcp6       0      0 127.0.0.1:9160          :::*                    LISTEN      8336/java     
tcp6       0      0 :::46954                :::*                    LISTEN      8424/java     
tcp6       0      0 :::53418                :::*                    LISTEN      8336/java     
tcp6       0      0 :::49035                :::*                    LISTEN      8520/java     
tcp6       0      0 :::80                   :::*                    LISTEN      -             
tcp6       0      0 :::42737                :::*                    LISTEN      8520/java     
tcp6       0      0 :::8081                 :::*                    LISTEN      8336/java     
tcp6       0      0 :::8082                 :::*                    LISTEN      8424/java     
tcp6       0      0 :::8083                 :::*                    LISTEN      8520/java     
tcp6       0      0 :::60310                :::*                    LISTEN      8424/java     
tcp6       0      0 :::46167                :::*                    LISTEN      8336/java     
tcp6       0      0 ::1:631                 :::*                    LISTEN      -             
tcp6       0      0 127.0.0.3:7000          :::*                    LISTEN      8520/java     
tcp6       0      0 127.0.0.2:7000          :::*                    LISTEN      8424/java     
tcp6       0      0 127.0.0.1:7000          :::*                    LISTEN      8336/java     


The last test was from a network perspective. You might also want to ask Cassandra itself whether these nodes are talking to each other and participating in a healthy partitioning scheme. To do so, invoke nodetool's ring command:

$ cd /opt/apache-cassandra-0.6.5/

$ ./bin/nodetool -h localhost -p 8081 ring
Address       Status     Load          Range                                      Ring
                                       142865723918937898194528652808268231850  
127.0.0.1     Up         3,1 KB        39461784941927371686416024510057184051     |<--|
127.0.0.3     Up         3,1 KB        54264004217607518447601711663387808864     |   |
127.0.0.2     Up         2,68 KB       142865723918937898194528652808268231850    |-->|


Here it is, the local cluster is up and running with three nodes! :D




You can also check Cassandra's availability by using its simple client application:

$ ./bin/cassandra-cli --host node1 --port 9160
Connected to: "Test Cluster" on node1/9160
Welcome to cassandra CLI.

cassandra> set Keyspace1.Standard1['rowkey']['column'] = 'value'
Value inserted.

cassandra> get Keyspace1.Standard1['rowkey']['column']          
=> (column=636f6c756d6e, value=value, timestamp=1285273581745000)

cassandra> get Keyspace1.Standard1['rowkey']          
=> (column=636f6c756d6e, value=value, timestamp=1285273581745000)
Returned 1 results.

cassandra> del Keyspace1.Standard1['rowkey']                    
row removed.


As we set the replication factor to 2 for the keyspace Keyspace1, each value saved in the Cassandra cluster is expected to be replicated to two nodes.

Do you really believe it is happening? If not, save another value (using set) and then kill one of the three node processes. Do not kill more than one! Then retrieve that key again (using get). The system should be able to recover automatically from this intentional disaster.
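
If you'd rather watch the failover from Java instead of the CLI, here is a rough sketch using the same Thrift classes presented in the "Talking to Cassandra using Java and DAO pattern" article further down this page; it reads the row back from a surviving node (node2 here) with consistency level ONE. The package names are the ones shipped with Cassandra 0.6's libraries, as far as I recall, so treat them as an assumption.

import java.util.List;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class FailoverReadCheck {

    public static void main(String[] args) throws Exception {
        // Connect to a surviving node (node2) on the Thrift port
        TTransport transport = new TSocket("node2", 9160);
        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
        transport.open();

        // Ask for every column of the row written with the CLI's set command
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[] {});
        sliceRange.setFinish(new byte[] {});
        SlicePredicate predicate = new SlicePredicate();
        predicate.setSlice_range(sliceRange);

        List<ColumnOrSuperColumn> columns = client.get_slice("Keyspace1", "rowkey",
                new ColumnParent("Standard1"), predicate, ConsistencyLevel.ONE);

        for (ColumnOrSuperColumn c : columns) {
            System.out.println(new String(c.getColumn().getName(), "utf-8") + " = "
                    + new String(c.getColumn().getValue(), "utf-8"));
        }
        transport.close();
    }
}

With the replication factor at 2 and only one node down, this read at ConsistencyLevel.ONE should still return the value; killing two nodes would make it fail.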

You can easily make a node join the cluster again just by executing its respective bin/cassandra command. Take a look at all the console outputs after every step; they are very interesting!


References



[1] Apache Cassandra


Friday, September 17, 2010

PGDay Manaus 2010




PGDay AM 2010

On October 15th, PGDay Manaus 2010 will take place at the Universidade Federal do Amazonas (UFAM), in Manaus.

It is a full day dedicated to the PostgreSQL DBMS, featuring workshops, talks, and project showcases.

For more information, see the event website:
http://www.postgresql.org.br/eventos/pgday/am/2010

Registration is already open on that same page. Join us! :D





Tuesday, September 14, 2010

Monitoring services with SNMP and Zabbix


Starting the SNMP server and collecting network data


Open the virtual machine's terminal and start its SNMP service with the following command:

# service snmpd restart


Still in the terminal, run the snmpget, snmpgetnext, and snmpwalk commands in order to test the SNMP client and server provided by the Net-SNMP implementation:

# snmpget -v 1 -c public localhost sysDescr.0

# snmpget -v 1 -c public localhost system.sysName.0

# snmpget -v 1 -c public localhost ifDescr.2

# snmpgetnext -v 1 -c public localhost interfaces

# snmpgetnext -v 1 -c public localhost host

# snmpgetnext -v 1 -c public localhost snmpV2

# snmpwalk -v 1 -c public localhost system

# snmpwalk -v 1 -c public localhost interfaces


Repeat the previous commands using the -On, -Os, and -Of options.

Go back to the Zabbix frontend and add an item with the following settings:

  • Description: eth0 in/s

  • Type: SNMPv1 agent

  • SNMP OID: interfaces.ifTable.ifEntry.ifInOctets.2

  • SNMP community: public

  • SNMP port: 161

  • Key: eth0.in

  • Type of information: Numeric (unsigned)

  • Store value: Delta (speed per second)





Repeat the operation to add this other item:

  • Description: eth0 out/s

  • Type: SNMPv1 agent

  • SNMP OID: ifOutOctets.2

  • SNMP community: public

  • SNMP port: 161

  • Key: eth0.out

  • Type of information: Numeric (unsigned)

  • Store value: Delta (speed per second)

Wait a few minutes, then go to the Monitoring > Latest data menu and open the graphs corresponding to the data collected by the newly created items.
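
Just to make concrete what the "Delta (speed per second)" store value does with the two counters above, here is a tiny sketch of the calculation with made-up numbers (my own illustration; it is not Zabbix code): the item stores the difference between consecutive ifInOctets/ifOutOctets readings divided by the elapsed time.

public class DeltaPerSecond {

    public static void main(String[] args) {
        // Two hypothetical ifInOctets samples, 60 seconds apart (values in octets)
        long previousValue = 2946793L;
        long currentValue = 3050000L;
        long elapsedSeconds = 60L;

        // This is essentially what "Delta (speed per second)" stores for the item
        double octetsPerSecond = (currentValue - previousValue) / (double) elapsedSeconds;
        System.out.println("eth0 in/s = " + octetsPerSecond + " octets per second");
    }
}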






CPU load collection and custom graphs


Open the virtual machine's terminal and explore the UCD MIB by running the following commands:


# snmpwalk -v 1 -c public localhost ucdavis | less

# snmpbulkget -v 2c -c public localhost -C r3 laNames laLoadInt -Os


Observe the processors' load average by running the following instructions:


# uptime

# snmpwalk -v 1 -c public localhost laLoadInt -Os
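
If you'd like to read the same value programmatically, below is a rough sketch using the SNMP4J library; this is my own addition, and the library, the numeric OID 1.3.6.1.4.1.2021.10.1.5.1 for laLoadInt.1, and the exact calls are assumptions based on how I remember SNMP4J's API, not something taken from this lab guide.

import org.snmp4j.CommunityTarget;
import org.snmp4j.PDU;
import org.snmp4j.Snmp;
import org.snmp4j.event.ResponseEvent;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.smi.GenericAddress;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.VariableBinding;
import org.snmp4j.transport.DefaultUdpTransportMapping;

public class LoadAverageProbe {

    public static void main(String[] args) throws Exception {
        Snmp snmp = new Snmp(new DefaultUdpTransportMapping());
        snmp.listen();

        // SNMPv1 target using the same "public" community as the shell commands
        CommunityTarget target = new CommunityTarget();
        target.setCommunity(new OctetString("public"));
        target.setAddress(GenericAddress.parse("udp:127.0.0.1/161"));
        target.setVersion(SnmpConstants.version1);
        target.setRetries(1);
        target.setTimeout(2000);

        // laLoadInt.1 from the UCD-SNMP MIB (1-minute load average times 100)
        PDU pdu = new PDU();
        pdu.setType(PDU.GET);
        pdu.add(new VariableBinding(new OID("1.3.6.1.4.1.2021.10.1.5.1")));

        ResponseEvent event = snmp.get(pdu, target);
        if (event.getResponse() != null) {
            long raw = event.getResponse().get(0).getVariable().toLong();
            // Same 0.01 multiplier configured in the Zabbix item further below
            System.out.println("1-minute load average: " + (raw * 0.01));
        } else {
            System.out.println("SNMP request timed out");
        }
        snmp.close();
    }
}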


Increase the CPU load by running the command below several times:


# cat /dev/urandom | md5sum &


    Tip: to stop these processes, use the Linux jobs and kill commands.

Back in the Zabbix frontend, register the following item for the host:


  • Description: cpu load-1

  • Type: SNMPv1 agent

  • SNMP OID: laLoadInt.1 (the full OID may also be used, either textual or numeric)

  • SNMP community: public

  • SNMP port: 161

  • Key: laLoadInt.1

  • Type of information: Numeric (float)

  • Use multiplier: Custom multiplier

  • Custom multiplier: 0.01






Using the item cloning feature, create two more items, changing only the fields indicated below:


  • Description: cpu load-5

  • SNMP OID: laLoadInt.2

  • Key: laLoadInt.2

  • Description: cpu load-15

  • SNMP OID: laLoadInt.3

  • Key: laLoadInt.3


Go to the Configuration > Hosts menu and create a new graph containing the three CPU load items just added to the host:





Go to the Monitoring > Latest data and Monitoring > Graphs menus to follow the evolution of these items' collected data.







Testing the Zabbix Appliance with VirtualBox

Download the Debian package suited to your Linux distribution from the following address:

    http://www.virtualbox.org/wiki/Linux_Downloads

Install the package via dpkg using the instructions below:


# dpkg -i virtualbox-*.deb

# apt-get -f install


Download the Zabbix Appliance ISO image from the following address:

    http://www.zabbix.com/download.php






Run VirtualBox and create a new machine using the following parameters:

  • Name: Zabbix 1.8

  • Operating system: Linux

  • Version: openSUSE

  • Memory: 512 MB

  • Hard disk: (none)





Click the newly created machine and attach the Zabbix Appliance ISO image to its CD drive.





Configure the network to use the "host-only" option and the "vboxnet0" interface.




Start the new virtual machine and wait for the operating system to finish loading.

At the prompt that appears, log in with the user "root" and the password "zabbix".

Load the appropriate keyboard map and check the IP address assigned to the virtual machine with the following commands:

# loadkeys br-abnt2

# ifconfig eth0


Write down the IP address shown (e.g. 192.168.56.101).


Open in a browser the HTTP address corresponding to the Zabbix frontend, e.g.:

    http://192.168.56.101/zabbix/

Log in to the application using the login "Admin" and the password "zabbix".





Go to the Configuration > Hosts menu and create a new host using these settings:

  • Name: Local

  • Groups: Linux servers

  • IP address: 127.0.0.1

  • Connect to: IP address





Open the host's items and add a new one as indicated below:

  • Description: cpu load

  • Type: Zabbix agent

  • Key: system.cpu.load

  • Type of information: Numeric (float)





Wait a few minutes and then go to the Monitoring > Latest data menu, looking for the historical data and graphs generated by the collection of the "cpu load" item.






Thursday, September 2, 2010

Simplifying Cassandra interactions with Helena



Introduction


In the previous article, "Talking to Cassandra using Java and DAO pattern" [1], we saw an introduction to NoSQL databases and especially one called Apache Cassandra [2]. After a quick explanation of its data model, we analyzed what it takes to implement Java code whose goal was to access and modify key-value pairs stored in a Cassandra keyspace. At that time we used a low-level interface API named Thrift.

You might have noticed that a large amount of code was needed to build a complete yet simple persistence class, namely a structure based on the Data Access Object design pattern. If we compare it to the most widely used persistence technologies and frameworks in Java (i.e. JPA and Hibernate), we would probably go crazy!

Is this the price we pay for high availability and scalability when moving from a traditional relational database model to a new key-value data store? Poor developers... Fortunately, that's not the end of the story! Several high-level clients for Cassandra are already available in multiple programming languages (see [3]).

One of these clients, HelenaORM [4] (created by Marcus Thiesen), is our subject of study this time. As this library targets the Java language, our aim will be to implement the persistence of a simple object with Helena's support.


Creating the entity


As in the other article, we'll choose the "group" entity for our examples. This time, in addition to the plain old Java class elements, we need to annotate the Group class with @HelenaBean and @KeyProperty.

The @HelenaBean annotation is used to specify the keyspace and column family for the class, whereas @KeyProperty indicates which field in the class should be considered the row key in Cassandra - currently this annotation only works on getter or setter methods, not on the field itself. Both annotations are analogous to JPA's @Entity and @Id annotations.

Take a look at the Group entity class properly prepared to be used by HelenaORM:


@HelenaBean(keyspace="ContactList", columnFamily="Groups")
public class Group {

    private Integer id;
    private String name;

    public Group() {
    }

    public Group(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    @KeyProperty
    public Integer getId() {
        return id;
    }
    public String getName() {
        return name;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Group [id=" + id + ", name=" + name + "]";
    }

}



Creating the tests


The most interesting news is about to come: with Helena there is no need for the traditional work of building a DAO interface and class!

Helena provides a factory, HelenaORMDAOFactory, designed to create ready-to-use, type-safe DAO objects bound to a given entity. The objects it produces, of type HelenaDAO, provide these methods: insert(), delete(), and get(). They are out-of-the-box implementations for inserting (or editing), removing, and retrieving entire object instances from Cassandra, respectively.
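
Before jumping into the test class, here is the whole workflow in a nutshell. This is just an illustrative sketch of mine that sticks to the calls shown in the test further below (imports omitted, as in the other listings; it assumes a Cassandra node on localhost and the default Thrift port 9160):

public class GroupQuickStart {

    public static void main(String[] args) {
        // Point the factory at a Cassandra node: host, Thrift port, serialization policy
        HelenaORMDAOFactory factory = HelenaORMDAOFactory.withConfig(
                "localhost", 9160, SerializeUnknownClasses.YES);

        // One type-safe DAO per annotated entity
        HelenaDAO<Group> dao = factory.makeDaoForClass(Group.class);

        dao.insert(new Group(1, "Friends"));   // insert (or overwrite) the row
        Group friends = dao.get("1");          // fetch it back by its row key
        System.out.println(friends);
        dao.delete(friends);                   // and remove it again
    }
}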

So, here is the corresponding JUnit test class, which invokes inserts, deletions, and retrievals of Java object instances against Cassandra with the support of Helena:


public class GroupTest {

    static HelenaORMDAOFactory factory;
    private HelenaDAO<Group> dao;

    private static final Integer GROUP_ID = 123;
    private static final String GROUP_NAME = "Test Group";

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        factory = HelenaORMDAOFactory.withConfig(
                "localhost", 9160, SerializeUnknownClasses.YES);
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        factory = null;
    }

    @Before
    public void setUp() throws Exception {
        dao = factory.makeDaoForClass(Group.class);
    }

    @After
    public void tearDown() throws Exception {
        dao = null;
    }

    @Test
    public void testSave() {

        System.out.println("GroupTest.testSave()");

        Group group = new Group();
        group.setId(GROUP_ID);
        group.setName(GROUP_NAME);

        System.out.println("Saving group: " + group);
        dao.insert(group);
        Assert.assertTrue(true);

        Group retrieved = dao.get(GROUP_ID.toString());
        System.out.println("Retrieved group: " + retrieved);
        Assert.assertNotNull(retrieved);
        Assert.assertEquals(GROUP_ID, retrieved.getId());
        Assert.assertEquals(GROUP_NAME, retrieved.getName());
    }

    @Test
    public void testRetrieve() {

        System.out.println("GroupTest.testRetrieve()");

        System.out.println("Saving groups");
        for (int i = 1; i <= 10; i++) {
            Group group = new Group();
            group.setId(GROUP_ID * 100 + i);
            group.setName(GROUP_NAME + " " + i);
            dao.insert(group);
        }

        List<Group> list = dao.getRange("", "", 10);
        System.out.println("Retrieving groups");
        Assert.assertNotNull(list);
        Assert.assertFalse(list.isEmpty());
        Assert.assertTrue(list.size() >= 10);

        System.out.println("Retrieved list:");
        for (Group group : list) {
            System.out.println("- " + group);
        }
    }

    @Test
    public void testRemove() {

        System.out.println("GroupTest.testRemove()");

        Group group = new Group();
        group.setId(GROUP_ID);
        group.setName(GROUP_NAME);

        System.out.println("Saving group: " + group);
        dao.insert(group);

        System.out.println("Removing group: " + group);
        dao.delete(group);
        Assert.assertTrue(true);

        Group retrieved = dao.get(GROUP_ID.toString());
        System.out.println("Retrieved group: " + retrieved);
        Assert.assertNull(retrieved);
    }

}



Checking the results


After running the tests, you should check the keyspace contents inside Cassandra. To do that, you can use Cassandra's client, issuing the instructions shown below:


cassandra> count ContactList.Groups['12305']
1 column

cassandra> get ContactList.Groups['12305']
=> (column=name, value=Test Group 5, timestamp=1283287882927)
Returned 1 result.

cassandra> get ContactList.Groups['12305']['name']
=> (column=name, value=Test Group 5, timestamp=1283287882927)



Conclusions


Traditional relational databases took more than 20 years of evolution to reach their current state. Meanwhile, object-oriented programming languages and techniques won over companies and developers, forcing the creation of persistence frameworks to link both worlds. As Internet usage grew, RDBMSs were no longer able to scale efficiently, so a new paradigm was conceived (or reborn): the distributed key-value database model!

Cassandra, one of those distributed databases, is not trivial to talk to from a developer's perspective. As an alternative to using a low-level interface API (i.e. Thrift), many high-level clients were created by individual developers (thanks to open source initiatives!), and one of them is HelenaORM.

Thus, in the present article we saw how to simplify the Java code that handles persistence in Cassandra with the aid of the HelenaORM library. A lot of work was saved, wasn't it? :D


References

[1] Talking to Cassandra using Java and DAO pattern
[2] Apache Cassandra
[3] Cassandra high level clients
[4] HelenaORM

Wednesday, September 1, 2010

Talking to Cassandra using Java and DAO pattern


Introduction


This wave of NoSQL - an alternative to the rigid persistence models provided by RDBMSs (Relational Database Management Systems) - is definitely unavoidable and not just another hype. In a connected world that needs ever more data, growing continuously in volume and speed, huge technology companies like Google have proved that changes must be made to accommodate this. One of those changes was indeed NoSQL databases, such as Bigtable, Hypertable, and Cassandra. The latter is the subject of this article.

As described on its home page [1]:

"The Apache Cassandra Project develops a highly scalable second-generation distributed database, bringing together Dynamo's fully distributed design and Bigtable's ColumnFamily-based data model. Cassandra was open sourced by Facebook in 2008, and is now developed by Apache committers and contributors from many companies."

So far, Cassandra is in use at Digg, Facebook, Twitter, Reddit, Rackspace, Cloudkick, Cisco, SimpleGeo, Ooyala, OpenX, and more companies that have large, active data sets. The largest production cluster has over 100 TB of data in over 150 machines!

Therefore, let's walk through how this new storage and programming model can be put to work, particularly from a client written in the Java language.

Database keyspace setup


We are considering Cassandra's stable version at the time of this document: 0.6.5. Also, we'll run the server in single-node mode for simplicity. Remember that JDK 1.6 (Java Development Kit) is needed to run the Cassandra server.

The first step in this study is to add the keyspace "ContactList". In order to do this, add the following lines to CASSANDRA_HOME/conf/storage-conf.xml, inside the <Keyspaces> tag:

<Keyspace Name="ContactList">
    <ColumnFamily CompareWith="UTF8Type" Name="Groups"/>
    <ColumnFamily CompareWith="UTF8Type" Name="Contacts"/>
    <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
    <ReplicationFactor>1</ReplicationFactor>
    <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>

Start (or restart) your Cassandra server (by issuing "CASSANDRA_HOME/bin/cassandra -f") after editing that file. The proper data files and index structures should be created.

Please consider reading Cassandra's Getting Started guide [2] for further information on this process.

Once the server is online, check the availability of the new keyspace by starting the client application with the following command:

CASSANDRA_HOME/bin/cassandra-cli --host localhost --port 9160

The expected resulting screen is shown below:

Connected to: "Test Cluster" on localhost/9160
Welcome to cassandra CLI.

Type 'help' or '?' for help. Type 'quit' or 'exit' to quit.
cassandra>

You can then list the available keyspaces on the server, as illustrated:

cassandra> show keyspaces
ContactList
Keyspace1
system

Note that "ContactList" should be included. If not, you should review the previous steps.

Now invoke the instruction below to display detailed information about the new keyspace:

cassandra> describe keyspace ContactList
ContactList.Groups
Column Family Type: Standard
Columns Sorted By: org.apache.cassandra.db.marshal.UTF8Type@33b121

Column Family Type: Standard
Column Sorted By: org.apache.cassandra.db.marshal.UTF8Type
flush period: null minutes
------
ContactList.Contacts
Column Family Type: Standard
Columns Sorted By: org.apache.cassandra.db.marshal.UTF8Type@1b22920

Column Family Type: Standard
Column Sorted By: org.apache.cassandra.db.marshal.UTF8Type
flush period: null minutes
------

OK, apparently our key-value storage is working well. It's time for coding! :D

Starting the use case


We'll start by implementing a simple "group" entity, represented in Java by a POJO-based class named Group with the content below:

public class Group {

    private Integer id;
    private String name;

    public Group() {
    }

    public Group(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }
    public String getName() {
        return name;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Group [id=" + id + ", name=" + name + "]";
    }

}

Concerns must be separated into different layers. In order to persist the "group" entity, it is highly recommended to have a DAO pattern-based interface. That interface, named IGroupDAO, is meant to exclusively handle operations on that entity. Here is its corresponding Java code:

public interface IGroupDAO {

    void startup();

    void shutdown();

    List<Group> findAll();

    Group findById(Integer id);

    void save(Group group);

    void remove(Group group);

}

Simply put, the save() and remove() methods are enough to perform modifications on the data, whereas findAll() and findById() are reserved for retrieval. You might then ask: "what the hell are startup() and shutdown() doing there?" Hold on, they are a necessary hack ("gambiarra") to show things working and would probably not appear in a real application. :)

Developers, developers, developers: do not forget to write tests before the actual code! Test Driven Development (TDD) is definitely important and worthwhile, besides being fun to use at coding time!

As we are using Java, let's create a unit test against that DAO interface using the JUnit [3] testing framework. The resulting class, named GroupDAOTest, has the code shown below:

public class GroupDAOTest {

    private static IGroupDAO dao;

    private static final Integer GROUP_ID = 123;
    private static final String GROUP_NAME = "Test Group";

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        dao = new GroupDAO();
        dao.startup();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        dao.shutdown();
        dao = null;
    }

    @Test
    public void testSave() {

        System.out.println("GroupDAOTest.testSave()");

        Group group = new Group();
        group.setId(GROUP_ID);
        group.setName(GROUP_NAME);

        System.out.println("Saving group: " + group);
        dao.save(group);
        Assert.assertTrue(true);

        Group retrieved = dao.findById(GROUP_ID);
        System.out.println("Retrieved group: " + retrieved);
        Assert.assertNotNull(retrieved);
        Assert.assertEquals(GROUP_ID, retrieved.getId());
        Assert.assertEquals(GROUP_NAME, retrieved.getName());
    }

    @Test
    public void testRetrieve() {

        System.out.println("GroupDAOTest.testRetrieve()");

        System.out.println("Saving groups");
        for (int i = 1; i <= 10; i++) {
            Group group = new Group();
            group.setId(GROUP_ID * 100 + i);
            group.setName(GROUP_NAME + " " + i);
            dao.save(group);
        }

        List<Group> list = dao.findAll();
        System.out.println("Retrieving groups");
        Assert.assertNotNull(list);
        Assert.assertFalse(list.isEmpty());
        Assert.assertTrue(list.size() >= 10);

        System.out.println("Retrieved list:");
        for (Group group : list) {
            System.out.println("- " + group);
        }
    }

    @Test
    public void testRemove() {

        System.out.println("GroupDAOTest.testRemove()");

        Group group = new Group();
        group.setId(GROUP_ID);
        group.setName(GROUP_NAME);

        System.out.println("Saving group: " + group);
        dao.save(group);

        System.out.println("Removing group: " + group);
        dao.remove(group);
        Assert.assertTrue(true);

        Group retrieved = dao.findById(GROUP_ID);
        System.out.println("Retrieved group: " + retrieved);
        Assert.assertNull(retrieved);
    }

}


The goal of the GroupDAOTest class is to invoke the methods belonging to the IGroupDAO interface. For each DAO feature, a method must be created in GroupDAOTest and annotated with @Test. The expected results are checked using the Assert class.

Actual persistence


The base code was built along with the test classes. However, no persistence against Cassandra is happening yet. So, how can we access a Cassandra database from a client's perspective?

Cassandra uses a more general API called Thrift [4] for its external client-facing interactions. Cassandra's main API/RPC/Thrift port is 9160. Thrift supports a wide variety of languages, so you can code your application to use Thrift directly (take a look at [5]) or use a high-level client where available.

Of course, we're going to use the low-level Thrift interface. :)

The first thing to do is to add several Java libraries related to Cassandra and Thrift to the project's classpath. They are shipped along with the Cassandra binaries, specifically in the CASSANDRA_HOME/lib directory.

And now we finally implement the GroupDAO class, which must provide the actual implementation of the operations declared by the IGroupDAO interface.

In this code, we'll need to manually handle the connection to Cassandra through its binary protocol on TCP port 9160. This is exactly where startup() and shutdown() come in.

If you're still not familiar with the terms used in key-value databases, such as keyspace, column families, etc., don't worry. I suggest further reading of some articles about Cassandra's data model [6, 7]. For the moment, just know that the Thrift interface for Java provides support classes such as Cassandra.Client, ColumnParent, ColumnOrSuperColumn, KeyRange, SliceRange, and SlicePredicate.

Take a look at the code below for the GroupDAO implementation class:

public class GroupDAO implements IGroupDAO {

    private static final String KEYSPACE = "ContactList";
    private static final String COLUMN_FAMILY = "Groups";
    private static final String ENCODING = "utf-8";

    private static TTransport tr = null;

    /**
     * Close the connection to the Cassandra Database.
     */
    private static void closeConnection() {
        try {
            tr.flush();
            tr.close();
        } catch (TTransportException exception) {
            exception.printStackTrace();
        }
    }

    /**
     * Open up a new connection to the Cassandra Database.
     *
     * @return the Cassandra Client
     */
    private static Cassandra.Client setupConnection() {
        try {
            tr = new TSocket("localhost", 9160);
            TProtocol proto = new TBinaryProtocol(tr);
            Cassandra.Client client = new Cassandra.Client(proto);
            tr.open();
            return client;
        } catch (TTransportException exception) {
            exception.printStackTrace();
        }
        return null;
    }

    private Cassandra.Client client;

    @Override
    public void startup() {
        this.client = setupConnection();
    }

    @Override
    public void shutdown() {
        closeConnection();
    }

    @Override
    public List<Group> findAll() {
        List<Group> list = new ArrayList<Group>();
        try {
            KeyRange keyRange = new KeyRange();
            keyRange.setStart_key("");
            keyRange.setEnd_key("");

            SliceRange sliceRange = new SliceRange();
            sliceRange.setStart(new byte[] {});
            sliceRange.setFinish(new byte[] {});

            SlicePredicate slicePredicate = new SlicePredicate();
            slicePredicate.setSlice_range(sliceRange);

            ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
            List<KeySlice> keySlices = client.get_range_slices(KEYSPACE,
                    columnParent, slicePredicate, keyRange,
                    ConsistencyLevel.ONE);

            if (keySlices == null || keySlices.isEmpty())
                return list;

            for (KeySlice keySlice : keySlices) {

                Group group = new Group();
                group.setId(Integer.parseInt(keySlice.getKey()));

                for (ColumnOrSuperColumn c : keySlice.getColumns()) {
                    if (c.getColumn() != null) {
                        String name = new String(c.getColumn().getName(),
                                ENCODING);
                        String value = new String(c.getColumn().getValue(),
                                ENCODING);
                        // long timestamp = c.getColumn().getTimestamp();
                        if (name.equals("name")) {
                            group.setName(value);
                        }
                    }
                }

                list.add(group);
            }
            return list;

        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return null;
    }

    @Override
    public Group findById(Integer id) {
        try {
            SlicePredicate slicePredicate = new SlicePredicate();
            SliceRange sliceRange = new SliceRange();
            sliceRange.setStart(new byte[] {});
            sliceRange.setFinish(new byte[] {});
            slicePredicate.setSlice_range(sliceRange);

            ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);
            List<ColumnOrSuperColumn> result = client.get_slice(KEYSPACE,
                    id.toString(), columnParent, slicePredicate,
                    ConsistencyLevel.ONE);

            if (result == null || result.isEmpty())
                return null;

            Group group = new Group();
            group.setId(id);

            for (ColumnOrSuperColumn c : result) {
                if (c.getColumn() != null) {
                    String name = new String(c.getColumn().getName(), ENCODING);
                    String value = new String(c.getColumn().getValue(), ENCODING);
                    // long timestamp = c.getColumn().getTimestamp();
                    if (name.equals("name")) {
                        group.setName(value);
                    }
                }
            }

            return group;
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return null;
    }

    @Override
    public void save(Group group) {
        try {
            long timestamp = System.currentTimeMillis();
            Map<String, List<ColumnOrSuperColumn>> job = new HashMap<String, List<ColumnOrSuperColumn>>();

            List<ColumnOrSuperColumn> columns = new ArrayList<ColumnOrSuperColumn>();
            Column column = new Column("name".getBytes(ENCODING), group
                    .getName().getBytes(ENCODING), timestamp);
            ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
            columnOrSuperColumn.setColumn(column);
            columns.add(columnOrSuperColumn);

            job.put(COLUMN_FAMILY, columns);

            client.batch_insert(KEYSPACE, group.getId().toString(), job,
                    ConsistencyLevel.ALL);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    @Override
    public void remove(Group group) {
        try {
            ColumnPath columnPath = new ColumnPath(COLUMN_FAMILY);
            client.remove(KEYSPACE, group.getId().toString(), columnPath,
                    System.currentTimeMillis(), ConsistencyLevel.ALL);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

}

Now make sure your project builds completely and run the test class! Unless your Cassandra server is offline, the resulting JUnit bar should be all green.

Here are the tasks the unit tests are supposed to perform:
  1. instantiate a group identified by the key 123 and persist it, searching it and comparing to the expected object;
  2. persist a group, remove it on the database and then try to search for its key expecting not to find it;
  3. insert 10 instances of groups with keys numbered from 12301 to 12310 and test their retrieval.

If you're as skeptical as I am, you'll want to search for the persisted data by hand. In that case, go to Cassandra's client prompt and try sending some commands to retrieve the recently stored groups and their columns. Here are some suggestions:

cassandra> count ContactList.Groups['12301']
1 columns

cassandra> get ContactList.Groups['12301']
=> (column=name, value=Test Group 1, timestamp=1283287745613)
Returned 1 results.

cassandra> get ContactList.Groups['12301']['name']
=> (column=name, value=Test Group 1, timestamp=1283287745613)


Conclusion


We have reached the end of this study at last! In this article, some concepts about a NoSQL database called Cassandra were introduced. Besides that, we implemented in Java a very simple example of persistence into a Cassandra keyspace using a low-level API named Thrift. In the end, we searched for the stored values in the database through Cassandra's client.

References


[1] Apache Cassandra
[2] Getting Started with Cassandra
[3] JUnit - Unit Testing Framework
[4] Apache Thrift
[5] Cassandra Wiki - Thrift Examples
[6] Cassandra Wiki - Data Model
[7] A Quick Introduction to the Cassandra Data Model