Datastax Cassandra bulkloader


I am attempting to use the JMX BulkLoader to ETL data into Cassandra, issuing the load from a remote node against the cluster:

https://github.com/PatrickCallaghan/datastax-analytics-example/blob/master/src/main/java/com/datastax/jmxloader/JmxBulkLoader.java

However, after successfully establishing the JMX connection, the bulk load fails.

Note that the bulk load is issued from a remote node to the Cassandra cluster.

It almost seems as if it expects to be running locally on the Cassandra cluster (i.e. on localhost relative to the cluster).

Am I missing anything here? Can anybody advise?

The exception is below:

java.lang.IllegalArgumentException: Invalid directory /XXXXXXXXX
    at org.apache.cassandra.service.StorageService.bulkLoadInternal(StorageService.java:3970)
    at org.apache.cassandra.service.StorageService.bulkLoadAsync(StorageService.java:3962)
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:75)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:279)
    at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrospector.java:112)
    at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.invokeM2(StandardMBeanIntrospector.java:46)
    at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
    at com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138)
    at com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java:252)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.

import java.util.HashMap
import javax.management.{JMX, ObjectName}
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
import org.apache.cassandra.service.StorageServiceMBean

// Logger and Stopwatch are project helpers that are not shown here.
class JmxBulkLoader(host: String, port: Int) {
  private var connector: JMXConnector = _
  private var storageBean: StorageServiceMBean = _

  // Connect to the node passed to the constructor, e.g. ("hostip", 7199).
  connect(host, port)

  private def connect(host: String, port: Int): Unit = {
    val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(host, port))
    val env = new HashMap[String, Any]()
    connector = JMXConnectorFactory.connect(jmxUrl, env)
    Logger.info("Connected to JMX Entity " + jmxUrl)
    val mbeanServerConn = connector.getMBeanServerConnection
    val name = new ObjectName("org.apache.cassandra.db:type=StorageService")
    // Typed proxy for the StorageService MBean on the remote node.
    storageBean = JMX.newMBeanProxy(mbeanServerConn, name, classOf[StorageServiceMBean])
  }

  def close(): Unit = {
    connector.close()
  }

  def bulkLoad(path: String): Boolean = {
    try {
      val timer = new Stopwatch().start
      val result = storageBean.bulkLoadAsync(path)
      timer.stop
      Logger.info("Async Result of Bulk Load " + result)
      Logger.info("Bulk load took " + timer.getElapsedTime + " millisecs.")
      true
    } catch {
      case e: Exception =>
        // Log the exception itself; printStackTrace() returns Unit.
        Logger.error("Error in Bulk Loading: " + e)
        false
    }
  }
}

There are 2 answers

luxifer On

"It almost seems as if it expects to be running locally on the Cassandra cluster (i.e. on localhost relative to the cluster)."

Not quite. Think of it this way: you are calling an MBean operation on a Cassandra node with a string parameter. The call is executed by the Cassandra process you are connected to, so the parameter names a path on that node's filesystem, not on the machine issuing the call.

You have to make sure the path exists on the target node and holds the data you expect (e.g., via shared storage or by copying the files there beforehand).
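To illustrate where the "Invalid directory" message comes from, here is a minimal sketch of the kind of check the node appears to perform before loading. It is an approximation written for this answer, not the actual Cassandra source; `InvalidDirectoryDemo` and `checkLoadDirectory` are hypothetical names. The point is that the check runs in the JVM that receives the JMX call, so a path that exists only on the calling machine fails.

```scala
import java.io.File
import java.nio.file.Files

object InvalidDirectoryDemo {
  // Hypothetical stand-in for the node-side validation: the directory must
  // exist and be a directory on the filesystem of the JVM running this code.
  def checkLoadDirectory(directory: String): File = {
    val dir = new File(directory)
    if (!dir.exists() || !dir.isDirectory)
      throw new IllegalArgumentException("Invalid directory " + directory)
    dir
  }

  def main(args: Array[String]): Unit = {
    // A directory that exists on this machine passes the check.
    val present = Files.createTempDirectory("sstables").toFile
    println(checkLoadDirectory(present.getPath).isDirectory)

    // A path that does not exist here fails, just as a path that exists
    // only on the remote caller would fail on the Cassandra node.
    val missing = new File(present, "not-here").getPath
    try checkLoadDirectory(missing)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

Running the same check on the caller and on the node would give different answers for the same string, which is why the files have to be placed on the node (or on storage it can see) first.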

Artem Aliev On
  1. The table must already exist in Cassandra.
  2. The directory must be accessible (local) to the Cassandra node.
  3. The directory path must end with the keyspace name and the target table name:
    /some_path/$KeySpaceName/$TableName
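
The third requirement can be checked before issuing the JMX call. The following is a small sketch, assuming Unix-style paths; `SstablePathCheck` and `keyspaceAndTable` are hypothetical helper names, not part of Cassandra. It reads the last two path components as keyspace and table so they can be compared with the intended target.

```scala
import java.nio.file.Paths

object SstablePathCheck {
  // Returns (keyspace, table) taken from the last two components of the
  // directory path, or None when the path has fewer than two components.
  def keyspaceAndTable(dir: String): Option[(String, String)] = {
    val p = Paths.get(dir).normalize()
    val n = p.getNameCount
    if (n < 2) None
    else Some((p.getName(n - 2).toString, p.getName(n - 1).toString))
  }

  def main(args: Array[String]): Unit = {
    // Example values only; substitute the real keyspace and table names.
    println(keyspaceAndTable("/some_path/my_keyspace/my_table"))
  }
}
```

This only inspects the string. Whether the directory actually exists still has to be verified on the Cassandra node itself.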