Cassandra is not working with UDT

I have a Java application that uses Spark 1.4.0 and Cassandra 2.1.5 with the Spark Cassandra Connector 1.4.0-M1.

In this application, I am trying to save a Java bean class that has some UDT fields to a Cassandra table, using either a DataFrame or the javaFunctions class.

messages.foreachRDD(new Function2<JavaRDD<Message>, Time, Void>() {
    @Override
    public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
        // Write each batch through the connector's Java API, mapping bean properties to columns.
        javaFunctions(arg0).writerBuilder(
                Properties.getString("spark.cassandra.keyspace"),
                Properties.getString("spark.cassandra.table"),
                mapToRow(Message.class)).saveToCassandra();
        return null;
    }
});

OR

messages.foreachRDD(new Function2<JavaRDD<Message>, Time, Void>() {
    @Override
    public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
        // Convert each batch to a DataFrame and append it to the Cassandra table.
        SQLContext sqlContext = SparkConnection.getSqlContext();
        DataFrame df = sqlContext.createDataFrame(arg0, Message.class);

        df.write()
                .mode(SaveMode.Append)
                .option("keyspace", Properties.getString("spark.cassandra.keyspace"))
                .option("table", Properties.getString("spark.cassandra.table"))
                .format("org.apache.spark.sql.cassandra").save();
        return null;
    }
});

But I get this error:

15/06/16 19:51:38 INFO CassandraConnector: Disconnected from Cassandra cluster: BDI Cassandra
15/06/16 19:51:39 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4, 192.168.1.19): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object null to com.datastax.spark.connector.UDTValue.
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:44)
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:40)
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1.applyOrElse(UserDefinedType.scala:33)
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:40)
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1.convert(UserDefinedType.scala:31)
    at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:21)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:20)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:8)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:35)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:119)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:102)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:101)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:130)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:101)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Previously, I was able to save the Message object to the Cassandra table successfully using the mapper class:

MappingManager mapping = new MappingManager(session);
Mapper<Message> mapper = mapping.mapper(Message.class);
mapper.save(message);

This is my Java bean:

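import java.io.Serializable;
import java.util.List;

import com.datastax.driver.mapping.annotations.Frozen;
import com.datastax.driver.mapping.annotations.FrozenKey;
import com.datastax.driver.mapping.annotations.Table;

@Table(name = "data")
public class Message implements Serializable {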
    private static final long serialVersionUID = 42L;
    private String admin;
    private String searchname;
    private String searchsource;
    private String searchtype;
    private String messageid;
    private String message;
    @FrozenKey
    private List<Action> actions;
    @Frozen
    private AdminCreator admincreator;
    @Frozen
    private AppReference appreference;
    private String caption;
    @Frozen
    private Reference referencefrom;
    private String icon;
    private Boolean ishidden;
              .....
              .....
              .....

There is 1 answer

RussS (best answer)

This functionality was not present in the 1.4.0 release. I have a patch at https://github.com/datastax/spark-cassandra-connector/pull/856, and this should be fixed in future releases.

Please test and comment on https://datastax-oss.atlassian.net/browse/SPARKC-271 if you have time.
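
When testing the patch, it may help to confirm which bean fields are actually null at write time, since the exception reports a null being converted to UDTValue. Below is a minimal diagnostic sketch in plain Java. It is not part of the connector. The class NullFieldCheck, the helper nullFields, and the stand-in classes SampleMessage and AdminCreator are hypothetical names introduced for illustration, and whether null fields are what triggers the error in this setup is an assumption.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class NullFieldCheck {

    // Hypothetical stand-in for one of the UDT classes in the question.
    static class AdminCreator {
        String name = "admin";
    }

    // Hypothetical stand-in for the Message bean, with two fields left unset.
    static class SampleMessage {
        String messageid = "m-1";
        AdminCreator admincreator; // null on purpose
        String caption;            // null on purpose
    }

    /** Returns the sorted names of all non-static declared fields of the bean that are null. */
    static List<String> nullFields(Object bean) throws IllegalAccessException {
        List<String> names = new ArrayList<>();
        for (Field f : bean.getClass().getDeclaredFields()) {
            // Skip constants such as serialVersionUID and compiler-generated fields.
            if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                continue;
            }
            f.setAccessible(true);
            if (f.get(bean) == null) {
                names.add(f.getName());
            }
        }
        Collections.sort(names); // reflection does not guarantee field order
        return names;
    }

    public static void main(String[] args) throws Exception {
        // prints [admincreator, caption]
        System.out.println(nullFields(new SampleMessage()));
    }
}
```

In the streaming job, the same helper could be called on a few sample Message objects just before the save, to log which UDT-typed fields are unset.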