Alternative solution to Cumulative Cardinality Aggregation in Elasticsearch

I'm running an Elasticsearch cluster on AWS that doesn't have access to X-Pack, but I'd still like to run a cumulative cardinality aggregation to determine the daily counts of new users to my site.

Is there an alternate solution to this problem?

For example, how could I transform:

GET /user_hits/_search
{
  "size": 0,
  "aggs": {
    "users_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "distinct_users": {
          "cardinality": {
            "field": "user_id"
          }
        },
        "total_new_users": {
          "cumulative_cardinality": {
            "buckets_path": "distinct_users" 
          }
        }
      }
    }
  }
}

to produce the same result without cumulative_cardinality?

There is 1 answer.

Answer by Joe (BEST ANSWER):

Cumulative cardinality was added precisely for that reason -- it wasn't easily calculable before...

As with almost anything in Elasticsearch, though, there's a script to get it done for ya. Here's my take on it.

  1. Set up an index:
PUT user_hits
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "user_id": {
        "type": "keyword"
      }
    }
  }
}
  2. Add one user on the first day and two more hits the day after, one of which is not from a strictly 'new' user:
POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-01"}

POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-02"}

POST user_hits/_doc
{"user_id":3,"timestamp":"2020-10-02"}
  3. Mock a date histogram using a parametrized start date plus a number of days, group the users accordingly, and then compare each day's users against the previous days':
GET /user_hits/_search
{
  "size": 0,
  "query": {
    "range": {
      "timestamp": {
        "gte": "2020-10-01"
      }
    }
  }, 
  "aggs": {
    "new_users_count_vs_prev_day": {
      "scripted_metric": {
        "init_script": """
          state.by_day_map = [:];
          state.start_millis = new SimpleDateFormat("yyyy-MM-dd").parse(params.start_date).getTime();
          state.day_millis = 24 * 60 * 60 * 1000;
          state.dt_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
        """,
        "map_script": """
          for (def step = 1; step < params.num_of_days + 1; step++) {
            def timestamp = doc.timestamp.value.millis;
            def user_id = doc['user_id'].value;
            def anchor = state.start_millis + (step * state.day_millis);
            // add a `n__` prefix to more easily sort the resulting map later on
            def anchor_pretty = step + '__' + state.dt_formatter.format(Instant.ofEpochMilli(anchor));
            
            if (timestamp <= anchor) {
              if (state.by_day_map.containsKey(anchor_pretty)) {
                state.by_day_map[anchor_pretty].add(user_id);
              } else {
                state.by_day_map[anchor_pretty] = [user_id];
              }
            }
        }
        """,
        "combine_script": """
            List keys=new ArrayList(state.by_day_map.keySet());
            Collections.sort(keys);
          
            def unique_sorted_map = new TreeMap();
            def unique_from_prev_day = [];
            
            for (def key : keys) { 
              def unique_users_per_day = new HashSet(state.by_day_map.get(key));
              
              unique_users_per_day.removeIf(user -> unique_from_prev_day.contains(user));
              
               // remove the `n__` prefix
               unique_sorted_map.put(key.substring(3), unique_users_per_day.size());
               unique_from_prev_day.addAll(unique_users_per_day);
            }
            return unique_sorted_map
        """,
        "reduce_script": "return states",
        "params": {
          "start_date": "2020-10-01",
          "num_of_days": 5
        }
      }
    }
  }
}

yielding

"aggregations" : {
  "new_users_count_vs_prev_day" : {
    "value" : [
      {
        "2020-10-01" : 1,    <-- 1 new unique user            
        "2020-10-02" : 1,    <-- another new unique user
        "2020-10-03" : 0,
        "2020-10-04" : 0,
        "2020-10-05" : 0
      }
    ]
  }
}
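
One caveat: the value comes back wrapped in a list because the reduce_script simply returns the per-shard states, and this demo index has a single shard. With multiple shards you'd get one partial map per shard, and since each shard deduplicates only the users it has seen itself, those partial counts can't just be summed (the same user may appear on several shards). A sketch of an exact multi-shard variant (untested, and assuming the combine_script is changed to hand back the raw state.by_day_map instead of the counts):

"combine_script": "return state.by_day_map",
"reduce_script": """
    // gather the raw per-day user lists from every shard
    def merged = [:];
    for (def shard_state : states) {
      if (shard_state == null) continue;
      shard_state.forEach((day, users) -> merged.computeIfAbsent(day, k -> []).addAll(users));
    }
    // then dedupe day-over-day exactly as the per-shard combine_script did above
    List keys = new ArrayList(merged.keySet());
    Collections.sort(keys);
    def unique_sorted_map = new TreeMap();
    def seen = new HashSet();
    for (def key : keys) {
      def new_users = new HashSet(merged.get(key));
      new_users.removeAll(seen);
      unique_sorted_map.put(key.substring(3), new_users.size());
      seen.addAll(new_users);
    }
    return unique_sorted_map;
"""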

The script is guaranteed to be slow, but it has one potentially quite useful advantage: you can adjust it to return the full list of new user IDs, not just their count. That's something cumulative_cardinality cannot offer since, according to its implementation's author, it only works in a sequential, cumulative manner by design.
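
For instance (a hypothetical, untested tweak), swap the count for a copy of the set in the combine_script above:

// instead of storing the day's count...
unique_sorted_map.put(key.substring(3), unique_users_per_day.size());
// ...store the day's new user IDs themselves:
unique_sorted_map.put(key.substring(3), new ArrayList(unique_users_per_day));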