HTTP POST elastic search event stream bulk

1.9k views Asked by At

I have a Node.js program that uses streams to read a file (see "nodejs event stream setting a variable per stream").

I would like to use the same program to write this data into Elasticsearch. I wrote a small write function:

/**
 * POSTs a single document to the `/newtest3/1` endpoint of the local
 * Elasticsearch node and logs the resulting HTTP status code.
 *
 * @param {*} data - Document body handed to `client.post`.
 * @param {Function} [done] - Optional completion callback, invoked as
 *   `done(err)` on failure or `done(null, res, body)` on success.
 */
var writeFunction = function(data, done) {
    var client = request.newClient("http://localhost:9200");
    client.post('/newtest3/1', data, function(err, res, body) {
        if (err) {
            // On a failed request `res` may be undefined, so report the error
            // instead of dereferencing `res.statusCode`.
            console.error(err);
            if (typeof done === 'function') {
                done(err);
            }
            return;
        }
        console.log(res.statusCode);
        if (typeof done === 'function') {
            done(null, res, body);
        }
    });
};

and hooked this up with the streaming

/**
 * Reads `file` as a stream, splits it with `es.split()`, runs every piece
 * through `processFunction`, and hands each processed item to `writeFunction`.
 *
 * @param {*} file - Passed straight to `getStream`.
 */
var processMyFile = function(file) {
    var stream = getStream(file);
    var nodeName = stream.nodeName;
    stream
        .pipe(es.split())
        // Registered on the split stage, so this fires when reading ends,
        // not when the writes have completed.
        .on('end', endFunction)
        .pipe(es.map(function(data, cb) {
            processFunction(nodeName, data, cb);
        }))
        .pipe(es.map(function(data, cb) {
            writeFunction(data);
            // es.map expects its callback to be invoked once per item;
            // without it the map stage never regards the item as finished.
            // Called with no data because nothing is piped downstream.
            cb();
        }));
};

The above works as expected: it writes the data asynchronously, but it takes a long time. The pipe also seems to act as a buffer, since the writes take much longer than the reads (an advantage of using pipe). I know Elasticsearch has a bulk interface that I could use for the import, as in the shakespeare.json example in the Kibana getting-started guide (http://www.elasticsearch.org/guide/en/kibana/current/using-kibana-for-the-first-time.html).

This means I would need to create a file in the format required by the bulk import and then run a curl command. I would like to avoid creating a temporary file.

Is there an easier way to import data into Elasticsearch faster as part of the streaming process?

1

There is 1 answer

0
Eric Saboia On

elasticsearch-streams will help you use the bulk interface with streaming, without needing to write a JSON file first.

I believe that your code would be more or less like this:

var TransformToBulk = require('elasticsearch-streams').TransformToBulk;
var WritableBulk = require('elasticsearch-streams').WritableBulk;
// Parenthesize the member lookup so `new` applies to Client; a bare
// `new require('elasticsearch').Client()` binds `new` to `require` and then
// calls Client as a plain function.
var client = new (require('elasticsearch').Client)();

/**
 * Sends one batch of bulk commands through `client.bulk`, targeting index
 * 'newtest3' and type '1'.
 *
 * @param {*} bulkCmds - Used as the `body` of the bulk request.
 * @param {Function} callback - Forwarded unchanged to `client.bulk`.
 */
var bulkExec = function(bulkCmds, callback) {
  var bulkRequest = {
    body: bulkCmds,
    index: 'newtest3',
    type: '1'
  };
  client.bulk(bulkRequest, callback);
};

// Transform stage: the callback supplies each document's `_id`, taken from
// the document's own `id` property.
var toBulk = new TransformToBulk(function getIndexTypeId(doc) {
  var meta = { _id: doc.id };
  return meta;
});

// Writable stage: constructed with bulkExec as its executor.
var ws = new WritableBulk(bulkExec);

/**
 * Pipes the stream for `file` through the bulk transform (`toBulk`) into the
 * bulk writer (`ws`), and calls `endFunction` when the writer closes or fails.
 *
 * NOTE(review): presumably the source stream must emit document objects
 * rather than raw file chunks for `toBulk` to work — confirm against getStream.
 *
 * @param {*} file - Passed straight to `getStream`.
 */
var processMyFile = function(file) {
  var stream = getStream(file);

  stream
    .pipe(toBulk)
    .pipe(ws)
    .on('close', endFunction)
    // Streams report failures through the 'error' event; a listener on 'err'
    // would never fire.
    .on('error', endFunction);
};