Create an elastic index by passing a list of custom objects

203 views Asked by At

I am facing an issue while creating an elastic search index by passing a list of custom object. Following is the bean which I populate based on certain conditions -

public class DataBean {
private String request_id;
private String clicks;
private String impressions;
private String conversion;

public String getRequest_id() {
    return request_id;
}

public void setRequest_id(String request_id) {
    this.request_id = request_id;
}

public String getClicks() {
    return clicks;
}

public void setClicks(String clicks) {
    this.clicks = clicks;
}

public String getImpressions() {
    return impressions;
}

public void setImpressions(String impressions) {
    this.impressions = impressions;
}

public String getConversion() {
    return conversion;
}

public void setConversion(String conversion) {
    this.conversion = conversion;
}

@Override
    public String toString() {
        return "DataBean [ clicks=" + clicks + ", impressions=" + impressions + ", conversion=" + conversion + ", request_id=" + request_id + "]";
    }
}

and following is the gist of code which loops over the records and populates the above bean and stores it in a list which is used to create an index -

....
if (result.isSucceeded()) {
final List<DataBean> dataBeanList = new ArrayList<>();
final List<SearchResult.Hit<DataBean, Object>> hits = result.getHits(DataBean.class, Object.class);

for (SearchResult.Hit<DataBean, Object> hit : hits) {
    if (!dataBeanList.stream().anyMatch(
            dataBean -> Objects.equals(dataBean.getRequest_id(), hit.source.getRequest_id()))) {
        DataBean source = hit.source;
        dataBeanList.add(source);
    }
}

MetricAggregation aggregations = result.getAggregations();
TermsAggregation termsAggregation = aggregations.getTermsAggregation("request_id");
for (TermsAggregation.Entry entry : termsAggregation.getBuckets()) {
    for (DataBean dataBean : dataBeanList) {
        if (entry.getKey().equalsIgnoreCase(dataBean.getRequest_id())) {

            Double impression = entry.getSumAggregation("imp").getSum();
            Double click = entry.getSumAggregation("click").getSum();
            Double action = entry.getSumAggregation("action").getSum();
            dataBean.setImpressions(impression.toString());
            dataBean.setClicks(click.toString());
            dataBean.setConversion(action.toString());
            break;
        }
    }
}

Index index = new Index.Builder(new Gson().toJson(dataBeanList)).index(destinationIndex).type("hour")
        .build();

try {
    client.execute(index);
} catch (IOException ioException) {
    LOGGER.debug("Failed to execute the search request on indices");
    throw new JestClientExecutionException("Failed to execute the search request on indices - ",
            ioException);
}
}

And following is the index template file -

{
  "order": 0,
  "template": "hour_*",
  "settings": {
  },
  "mappings": {
    "hour": {
      "properties": {
        "conversion": {
          "index": "not_analyzed",
          "store": true,
          "type": "integer"
        },
        "impressions": {
          "index": "not_analyzed",
          "store": true,
          "type": "integer"
        },
        "clicks": {
          "index": "not_analyzed",
          "store": true,
          "type": "integer"
        },
        "request_id": {
          "analyzer": "keyword",
          "index": "not_analyzed",
          "store": true,
          "type": "text"
        }
      }
    }
  },
  "aliases": {}
}

Now when I try to execute this code, it gives we following error -

   [2017-09-04T06:10:29,365][INFO ][o.e.c.m.MetaDataCreateIndexService] [node-1] [hour_11_2017-09-04] creating index, cause [auto(bulk api)], templates [hour], shards [5]/[1], mappings [hour]
[2017-09-04T06:10:30,622][DEBUG][o.e.a.b.TransportShardBulkAction] [node-1] [hour_11_2017-09-04][1] failed to execute bulk item (index) BulkShardRequest [[hour_11_2017-09-04][1]] containing [index {[hour_11_2017-09-04][hour][AV5LgwJct35uj5JLu_X5], source[_na_]}]
org.elasticsearch.index.mapper.MapperParsingException: failed to parse
        at org.elasticsearch.index.mapper.DocumentParser.wrapInMapperParsingException(DocumentParser.java:176) ~[elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:69) ~[elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:277) ~[elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.index.shard.IndexShard.prepareIndex(IndexShard.java:529) ~[elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.index.shard.IndexShard.prepareIndexOnPrimary(IndexShard.java:506) ~[elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.prepareIndexOperationOnPrimary(TransportShardBulkAction.java:450) ~[elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary(TransportShardBulkAction.java:458) ~[elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:143) [elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:113) [elasticsearch-5.5.1.jar:5.5.1]
        at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:69) [elasticsearch-5.5.1.jar:5.5.1]

I tried replacing list of objects with a map to figure out whether list is a problem. I observed that map works but list doesn't. So the problem is with the list of objects but I am unable to figure out how to solve this. If anyone has any insights please let me know.

0

There are 0 answers