How to write a JavaRDD to a MarkLogic database

I am evaluating Spark with the MarkLogic database. I have read a CSV file, and now I have a JavaRDD object that I need to load into MarkLogic.

    SparkConf conf = new SparkConf().setAppName("org.sparkexample.Dataload").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> data = sc.textFile("/root/ml/workArea/data.csv");
    SQLContext sqlContext = new SQLContext(sc);
    JavaRDD<Record> rdd_records = data.map(
        new Function<String, Record>() {
            public Record call(String line) throws Exception {
                String[] fields = line.split(",");
                Record sd = new Record(fields[0], fields[1], fields[2], fields[3], fields[4]);
                return sd;
            }
        });

I want to write this JavaRDD to the MarkLogic database.

Is there any Spark API available for writing to MarkLogic quickly?

If a JavaRDD cannot be written to MarkLogic directly, what is the correct approach to achieve this?

Here is the code I am using to write the JavaRDD data to the MarkLogic database; let me know if this is the wrong way to do it.

    final DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8070, "MLTest");
    final XMLDocumentManager docMgr = client.newXMLDocumentManager();
    rdd_records.foreachPartition(new VoidFunction<Iterator<Record>>() {
        public void call(Iterator<Record> partitionOfRecords) throws Exception {
            while (partitionOfRecords.hasNext()) {
                Record record = partitionOfRecords.next();
                System.out.println("partitionOfRecords - " + record.toString());
                String docId = "/example/" + record.getID() + ".xml";
                JAXBContext context = JAXBContext.newInstance(Record.class);
                JAXBHandle<Record> handle = new JAXBHandle<Record>(context);
                handle.set(record);
                docMgr.writeAs(docId, handle);
            }
        }
    });
    client.release();

I have used the Java Client API to write the data, but I am getting the exception below even though the POJO class Record implements the Serializable interface. Please let me know what the reason could be and how to solve it.

org.apache.spark.SparkException: Task not serializable

There are 3 answers

Tushar Adeshara:

Here is a modified example from the Spark Streaming guide. You will have to implement the connection and writing logic specific to your database.

public void send(JavaRDD<String> rdd) {
  rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
    @Override
    public void call(Iterator<String> partitionOfRecords) {
      // ConnectionPool is a static, lazily initialized pool of connections
      Connection connection = ConnectionPool.getConnection();
      while (partitionOfRecords.hasNext()) {
        connection.send(partitionOfRecords.next());
      }
      ConnectionPool.returnConnection(connection); // return to the pool for future reuse
    }
  });
}

rjrudin:

The easiest way to get data into MarkLogic is via HTTP and the client REST API - specifically the /v1/documents endpoints - http://docs.marklogic.com/REST/client/management.

There are a variety of ways to optimize this, such as via a write set, but based on your question, I think the first thing to decide is - what kind of document do you want to write for each Record? Your example shows 5 columns in the CSV - typically, you'll write either a JSON or XML document with 5 fields/elements, each named based on the column index. So you'd need to write a little code to generate that JSON/XML, and then use whatever HTTP client you prefer (and one option is the MarkLogic Java Client API) to write that document to MarkLogic.
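
For illustration, here's a rough sketch of that approach with the Java Client API (client and record are the objects from your question; the element names and the Record getters other than getID() are hypothetical placeholders for your real fields):

// Sketch only, not a definitive implementation: build a small XML document
// for one Record and write it with the Java Client API.
// StringHandle and Format come from com.marklogic.client.io.
XMLDocumentManager docMgr = client.newXMLDocumentManager();

String xml = "<record>"
           + "<id>"     + record.getID()     + "</id>"
           + "<field1>" + record.getField1() + "</field1>"  // hypothetical getter
           + "<field2>" + record.getField2() + "</field2>"  // hypothetical getter
           + "</record>";

docMgr.write("/example/" + record.getID() + ".xml",
             new StringHandle(xml).withFormat(Format.XML));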

That addresses your question of how to write a JavaRDD to MarkLogic - but if your goal is to get data from a CSV into MarkLogic as fast as possible, then skip Spark and use mlcp - https://docs.marklogic.com/guide/mlcp/import#id_70366 - which involves zero coding.
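
For reference, a typical mlcp import command for a CSV looks roughly like this (host, port, and credentials here are placeholders; check the mlcp guide linked above for the exact options for your setup):

mlcp.sh import -host localhost -port 8000 -username admin -password admin \
  -input_file_path /root/ml/workArea/data.csv -input_file_type delimited_text

With -input_file_type delimited_text, mlcp creates one document per CSV row, using the header row for the element names.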

Sam Mefford:

I'm wondering if you just need to make sure everything you access inside your VoidFunction that was instantiated outside it is serializable (see this page). DatabaseClient and XMLDocumentManager are of course not serializable, as they're connected resources. You're right, however, not to instantiate DatabaseClient inside your VoidFunction, as that would be less efficient (though it would work). I don't know if the following idea would work with Spark, but I'm guessing you could create a class that holds a singleton DatabaseClient instance:

public static class MLClient {
  private static DatabaseClient singleton;
  private MLClient() {}

  public static DatabaseClient get(DatabaseClientFactory.Bean connectionInfo) {
    if ( connectionInfo == null ) {
      throw new IllegalArgumentException("connectionInfo cannot be null");
    }
    if ( singleton == null ) {
      singleton = connectionInfo.newClient();
    }
    return singleton;
  }
}

Then you just create a serializable DatabaseClientFactory.Bean outside your VoidFunction so your auth info is still centralized:

DatabaseClientFactory.Bean connectionInfo = 
  new DatabaseClientFactory.Bean();
connectionInfo.setHost("localhost");
connectionInfo.setPort(8000);
connectionInfo.setUser("admin");
connectionInfo.setPassword("admin");
connectionInfo.setAuthenticationValue("digest");

Then inside your VoidFunction you could get that singleton DatabaseClient and create a new XMLDocumentManager like so:

DatabaseClient client = MLClient.get(connectionInfo);
XMLDocumentManager docMgr = client.newXMLDocumentManager();
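
Putting that together with the loop from your question, the partition function might then look roughly like this (an untested sketch; connectionInfo would need to be final, or effectively final, so the anonymous class can capture it):

// Rough sketch, assuming the MLClient class and connectionInfo bean above:
// reuse one DatabaseClient per JVM and create the JAXBContext once per partition.
rdd_records.foreachPartition(new VoidFunction<Iterator<Record>>() {
  public void call(Iterator<Record> partitionOfRecords) throws Exception {
    DatabaseClient client = MLClient.get(connectionInfo);
    XMLDocumentManager docMgr = client.newXMLDocumentManager();
    JAXBContext context = JAXBContext.newInstance(Record.class);
    while (partitionOfRecords.hasNext()) {
      Record record = partitionOfRecords.next();
      JAXBHandle<Record> handle = new JAXBHandle<Record>(context);
      handle.set(record);
      docMgr.write("/example/" + record.getID() + ".xml", handle);
    }
  }
});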