Using Cassandra's 'sstable' for Bulk-Loading Data on the Cloud

07.31.2012
The 'sstableloader', introduced in Apache Cassandra 0.8.1, provides a powerful way to load huge volumes of data into a Cassandra cluster. If you are moving from a cloud cluster to a dedicated cluster, or vice versa, or from a different database to Cassandra, this tool will interest you. As shown below, in any of these cases, if you can generate 'sstables' from the data to be loaded, you can bulk load them into the cluster using 'sstableloader'. I have tried this with version 1.1.2.

In this post I'll share my experience of creating sstables from a .csv file and loading them into a Cassandra instance running on the same machine, which acts as the cluster here. The steps are:
  1. sstable generation
  2. Bulk loading Cassandra using sstableloader
  3. Using JMX
'sstable' generation

To generate the sstables with 'SSTableSimpleUnsortedWriter', the 'cassandra.yaml' file should be present in the class path. In IntelliJ IDEA you can set this under Run-->Edit Configurations-->Application-->Configuration-->VM Options, where you should give the path to cassandra.yaml as follows.
-Dcassandra-foreground -Dcassandra.config=file:///<path to/apache-cassandra-1.1.2/conf/cassandra.yaml> -ea -Xmx1G


Here is the simple code to generate the sstables for the context I tried, based on the DataStax documentation. With just a few modifications you may be able to reuse it. I'll try to explain the code below.
SSTableSimpleUnsortedWriter eventWriter = new SSTableSimpleUnsortedWriter(directory, partitioner, keySpace, "Events", AsciiType.instance,null, 64);
This writer does not assume any order in the rows. Instead, it buffers the rows in memory and writes them out in sorted order. You can define a threshold on the amount of data to be buffered to avoid loading the entire data set into memory. Each time the threshold is reached, one sstable is created and the buffer is reset.

  • directory - the directory to write the sstables to
  • partitioner - the strategy to distribute data over the nodes. I have used RandomPartitioner, which uses an MD5 hash value to distribute data. This blog post may help you decide between RandomPartitioner and OrderPreservingPartitioner according to your context. There are two more partitioners available.
  • keySpace - the keyspace name
  • "Events" - the name of the column family
  • AsciiType.instance - the column family comparator
  • null - the subComparator, set to null as this is not a super column family
  • 64 - the buffer size in MB. This should be chosen according to the context to achieve the best performance.

With the following code we create a row and add columns to it for each entry read from the .csv file. As per the Cassandra wiki, one row can have up to 2 billion columns like this.
 
// 'bytes' is a static import of org.apache.cassandra.utils.ByteBufferUtil.bytes
eventWriter.newRow(uuid);
eventWriter.addColumn(bytes("sourceAdd"), bytes(entry.sourceAdd), timestamp);
eventWriter.addColumn(bytes("sourceChannelType"), bytes(entry.sourceChannelType), timestamp);

The static nested class CsvEntry is used to read just the relevant fields from the csv row.
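
Putting these pieces together, below is a minimal sketch of the whole generator, assuming Cassandra 1.1 libraries and cassandra.yaml are on the classpath. The two-column CSV layout and the CsvEntry.parse helper are assumptions for illustration; the keyspace 'CDRs' and column family 'Events' match the example used later in this post.

import static org.apache.cassandra.utils.ByteBufferUtil.bytes;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.UUID;

import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;

public class SSTableGenerator {

    // Reads just the relevant fields from one csv row; the two-column layout is an assumption
    static class CsvEntry {
        String sourceAdd;
        String sourceChannelType;

        boolean parse(String line) {
            String[] columns = line.split(",");
            if (columns.length < 2) {
                return false;
            }
            sourceAdd = columns[0].trim();
            sourceChannelType = columns[1].trim();
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        String csvPath = args[0];

        // sstableloader expects the directory path to end with /keyspace/columnfamily
        File directory = new File("CDRs" + File.separator + "Events");
        if (!directory.exists()) {
            directory.mkdirs();
        }

        SSTableSimpleUnsortedWriter eventWriter = new SSTableSimpleUnsortedWriter(
                directory, new RandomPartitioner(), "CDRs", "Events",
                AsciiType.instance, null, 64);

        BufferedReader reader = new BufferedReader(new FileReader(csvPath));
        long timestamp = System.currentTimeMillis() * 1000; // column timestamps are in microseconds
        CsvEntry entry = new CsvEntry();
        String line;
        while ((line = reader.readLine()) != null) {
            if (!entry.parse(line)) {
                continue;
            }
            // one row per csv line, keyed here by a random UUID string
            eventWriter.newRow(bytes(UUID.randomUUID().toString()));
            eventWriter.addColumn(bytes("sourceAdd"), bytes(entry.sourceAdd), timestamp);
            eventWriter.addColumn(bytes("sourceChannelType"), bytes(entry.sourceChannelType), timestamp);
        }
        reader.close();
        eventWriter.close(); // flushes the remaining buffered rows to disk
    }
}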

Once you run the code pointing to the csv file, a directory will be created in the location you specified as 'directory'. Inside it you will find the generated sstable files for the column family.

Bulk loading Cassandra using sstableloader

Inside the bin directory of Cassandra you can find the sstableloader tool. You can run it from the command line, pointing it to the sstables generated above. Good guidance on that can be found in the DataStax documentation and this blog. Also, you can directly use the class 'org.apache.cassandra.tools.BulkLoader' in Java code to load the sstables into a Cassandra cluster, as sketched below.
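
For example, a minimal sketch of such an invocation might look like this. It assumes BulkLoader's main method accepts the same arguments as the sstableloader script and that the Cassandra jars and conf directory are on the classpath; the host and path below are placeholders.

public class ProgrammaticBulkLoad {
    public static void main(String[] args) throws Exception {
        // Same arguments as the command-line tool: -d <initial host> <path to sstables>
        org.apache.cassandra.tools.BulkLoader.main(
                new String[] { "-d", "127.0.0.1", "/path/to/CDRs/Events" });
    }
}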

If you are testing all of this on localhost, the following steps need to be taken to try out sstableloader.

  • Get a copy of the running Cassandra instance
  • Set up another loopback address. On Linux you can do this using,
sudo ifconfig lo:0 127.0.0.2 netmask 255.0.0.0 up
  • Set the rpc_address and listen_address in the copied /conf/cassandra.yaml to 127.0.0.2. Of course, you can set rpc_address to 0.0.0.0 if you want it to listen on all interfaces.
  • Then, from the copied Cassandra, run sstableloader from the command line as follows,
./sstableloader -d 127.0.0.2 <path to generated sstables>
  • Note that the path should end as /keyspace_name/columnfamily_name (e.g. ...../CDRs/Events in this example).

Using JMX for bulk loading

You can also use the following code to bulk load Cassandra from the generated sstables. I received it from Brian Jeltema on the Cassandra users mailing list. The main method needs to be run with the path to the generated sstables (as above) as an argument.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.cassandra.service.StorageServiceMBean;


public class JmxBulkLoader {
    private JMXConnector connector;
    private StorageServiceMBean storageBean;


    public JmxBulkLoader(String host, int port) throws Exception    {
        connect(host, port);
    }
    // Connect to the node's JMX endpoint and obtain a proxy for the StorageService MBean
    private void connect(String host, int port) throws IOException, MalformedObjectNameException    {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", host, port));
        Map<String,Object> env = new HashMap<String,Object>();
        connector = JMXConnectorFactory.connect(jmxUrl, env);
        MBeanServerConnection mbeanServerConn = connector.getMBeanServerConnection();
        ObjectName name = new ObjectName("org.apache.cassandra.db:type=StorageService");
        storageBean = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
    }

    public void close() throws IOException    {
        connector.close();
    }

    
    // Stream the sstables found under 'path' into the cluster
    public void bulkLoad(String path) {
        storageBean.bulkLoad(path);
    }

    
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException("usage: paths to bulk files");
        }
        // 7199 is Cassandra's default JMX port
        JmxBulkLoader np = new JmxBulkLoader("127.0.0.1", 7199);
        for (String arg : args) {
            np.bulkLoad(arg);
        }
        np.close();
    }
}