I have a Node.js program that uses streams to read a file (a Node.js event stream, setting a variable per stream). I would like the same program to write this data into Elasticsearch. I wrote a small write function:
// request-json provides the HTTP client (npm install request-json)
var request = require('request-json');
var client = request.newClient('http://localhost:9200');

var writeFunction = function(data) {
  // POST one document at a time to index "newtest3", type "1"
  client.post('/newtest3/1', data, function(err, res, body) {
    if (err) return console.error(err);
    console.log(res.statusCode);
  });
};
and hooked it up to the streaming pipeline:
var processMyFile = function(file) {
  var stream = getStream(file);
  var nodeName = stream.nodeName;

  stream
    .pipe(es.split())
    .on('end', endFunction)
    .pipe(es.map(function(data, cb) {
      processFunction(nodeName, data, cb);
    }))
    .pipe(es.map(function(data, cb) {
      writeFunction(data);
      cb(null, data); // signal es.map that this chunk is done
    }));
};
The above works as expected and writes the data asynchronously, except that it takes a long time. The write also seems to be buffered, since it takes much longer than the read (an advantage of using pipe). I know Elasticsearch has a bulk interface and I could import using that; the shakespeare.json example in the Kibana getting-started guide (http://www.elasticsearch.org/guide/en/kibana/current/using-kibana-for-the-first-time.html) does this. But that would mean creating a file in the format needed by the bulk import and then running a curl command, and I would like to avoid creating a temporary file.
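For reference, the bulk endpoint consumes newline-delimited JSON: one action line followed by one document-source line per record. A minimal sketch of such a file, using the same index and type as the write function above (the field names are placeholders):

```
{ "index": { "_index": "newtest3", "_type": "1" } }
{ "field1": "value1" }
{ "index": { "_index": "newtest3", "_type": "1" } }
{ "field1": "value2" }
```

Building this file by hand and POSTing it to /_bulk is exactly the temporary-file step I would like to skip.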
Is there an easier, faster way to import data into Elasticsearch as part of the streaming process?
elasticsearch-streams will help you use the bulk interface with streaming, without having to write a JSON file first.
I believe your code would end up more or less like this:
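A sketch under stated assumptions: it uses the WritableBulk and TransformToBulk classes from elasticsearch-streams together with the official elasticsearch client, and reuses the getStream, processFunction, nodeName and endFunction names from your question. The index and type names are placeholders taken from your write function.

```javascript
var elasticsearch = require('elasticsearch');
var es = require('event-stream');
var WritableBulk = require('elasticsearch-streams').WritableBulk;
var TransformToBulk = require('elasticsearch-streams').TransformToBulk;

var client = new elasticsearch.Client({ host: 'localhost:9200' });

// Executes one bulk request per batch of accumulated commands,
// instead of one HTTP POST per document.
var bulkExec = function(bulkCmds, callback) {
  client.bulk({
    index: 'newtest3',
    type: '1',
    body: bulkCmds
  }, callback);
};

var ws = new WritableBulk(bulkExec);

// Wraps each document in the bulk-API command format;
// return an object with _id here if your documents carry their own ids.
var toBulk = new TransformToBulk(function(doc) { return {}; });

var processMyFile = function(file) {
  var stream = getStream(file);
  var nodeName = stream.nodeName;

  stream
    .pipe(es.split())
    .pipe(es.map(function(data, cb) {
      processFunction(nodeName, data, cb);
    }))
    .pipe(toBulk)
    .pipe(ws)
    .on('close', endFunction);
};
```

The batching in WritableBulk is what removes the per-document round trip that made the original writeFunction slow, and piping into it preserves backpressure, so nothing is buffered into a temporary file.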