Druid, what the diff? Using Apache Druid to report top changes in a metric between two intervals

At our company we use Druid to power our analytics dashboard, for both time-based and non-time-based reports. We evaluated several options, including a relational database (Postgres), ClickHouse and Elasticsearch. After weighing the pros and cons of each, we settled on Druid, as it came out ahead in most of the query benchmarks. In fact, these benchmarks are worth a future post of their own.

One of the features we needed to support was reporting the top absolute changes in a metric between two time frames. For example: which country had the biggest change (increase or drop) in traffic when comparing today to yesterday? This seems like an elementary feature, yet Druid does not support it natively, despite being a time-based data store.

That is why I implemented such a query on top of Druid using the JavaScript aggregator. Keep in mind that anything a JavaScript aggregator does could also be implemented in Java. JavaScript may be a poor fit in two cases: when enabling JavaScript support is not an option in your Druid cluster, or when you are worried that the performance hit of JavaScript is too big (the code is compiled for every query).

In this post I would like to share how this was achieved. The idea is simple: create two new synthetic metrics representing two conditional sums. The first is the sum of the metric over all rows falling within the first period, and the second is the sum of the metric over all rows falling within the second period. Then add a post-aggregator that creates a third synthetic metric: the absolute value of the difference between the two new metrics. We can now run a topN query ordered by this third metric, that is, by the difference between the metric’s sums in the two time intervals, as per the original requirement.
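
To make the idea concrete before looking at the Druid query, here is a minimal in-memory sketch in plain JavaScript. The sample rows, the `country` values and the `topChanges` helper are hypothetical and only illustrate the logic; Druid performs the equivalent work across its segments.

```javascript
// Hypothetical interval boundaries in epoch milliseconds (UTC).
const DAY = 24 * 60 * 60 * 1000;
const firstStart = Date.UTC(2018, 8, 6);   // 2018-09-06T00:00:00Z (months are 0-based)
const secondStart = firstStart + DAY;      // 2018-09-07T00:00:00Z
const secondEnd = secondStart + DAY;       // 2018-09-08T00:00:00Z

// Hypothetical sample rows: one dimension, one metric, one timestamp.
const rows = [
  { country: 'US', revenue: 100, time: firstStart + 1000 },
  { country: 'US', revenue: 40,  time: secondStart + 1000 },
  { country: 'DE', revenue: 10,  time: firstStart + 2000 },
  { country: 'DE', revenue: 90,  time: secondStart + 2000 },
  { country: 'FR', revenue: 50,  time: firstStart + 3000 },
  { country: 'FR', revenue: 55,  time: secondStart + 3000 },
];

// Two conditional sums per dimension value, then |first - second|, then top N.
function topChanges(rows, limit) {
  const sums = new Map();
  for (const row of rows) {
    const s = sums.get(row.country) || { first: 0, second: 0 };
    if (row.time >= firstStart && row.time < secondStart) s.first += row.revenue;
    if (row.time >= secondStart && row.time < secondEnd) s.second += row.revenue;
    sums.set(row.country, s);
  }
  return [...sums.entries()]
    .map(([country, s]) => ({ country, diff: Math.abs(s.first - s.second) }))
    .sort((a, b) => b.diff - a.diff)
    .slice(0, limit);
}

console.log(topChanges(rows, 2));
```

With this sample data DE changes by 80 and US by 60, so they rank ahead of FR, which changes by only 5.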

{
  "queryType": "topN",
  "dataSource": "sample_data",
  "dimension": "some_dim",
  "threshold": 5,
  "metric": "diff",
  "granularity": "all",
  "aggregations": [
    {
      "type": "javascript",
      "name": "revenue_previous",
      "fieldNames": [
        "revenue",
        "__time"
      ],
      "fnAggregate": "function(current, revenue, __time) { return current + ((__time >= 1536192000000 && __time < 1536278400000) ? revenue : 0); }",
      "fnCombine": "function(a, b) { return a + b; }",
      "fnReset": "function() { return 0; }"
    },
    {
      "type": "javascript",
      "name": "revenue_current",
      "fieldNames": [
        "revenue",
        "__time"
      ],
      "fnAggregate": "function(current, revenue, __time) { return current + ((__time >= 1536278400000 && __time < 1536364800000) ? revenue : 0); }",
      "fnCombine": "function(a, b) { return a + b; }",
      "fnReset": "function() { return 0; }"
    }
  ],
  "postAggregations": [
    {
      "type": "javascript",
      "name": "diff",
      "fieldNames": [
        "revenue_current",
        "revenue_previous"
      ],
      "function": "function(a, b) { return Math.abs(a - b) ; }"
    }
  ],
  "intervals": [
    "2018-09-06T00:00:00.000/2018-09-07T00:00:00.000",
    "2018-09-07T00:00:00.000/2018-09-08T00:00:00.000"
  ]
}
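
Hand-converting dates to epoch milliseconds is error-prone, so the query can also be generated. The following is only a sketch and not part of the original setup: `conditionalSum` and `buildDiffQuery` are hypothetical helper names, and the interval boundaries are assumed to be ISO-8601 UTC timestamps. The resulting object would be serialized with `JSON.stringify` and sent to the cluster as a native query.

```javascript
// Builds one JavaScript aggregator that sums `metric` only for rows whose
// __time falls in [startMs, endMs). Arguments are passed positionally, in
// the order given by fieldNames.
function conditionalSum(name, metric, startMs, endMs) {
  return {
    type: 'javascript',
    name: name,
    fieldNames: [metric, '__time'],
    fnAggregate:
      'function(current, value, t) { return current + ((t >= ' + startMs +
      ' && t < ' + endMs + ') ? value : 0); }',
    fnCombine: 'function(a, b) { return a + b; }',
    fnReset: 'function() { return 0; }',
  };
}

// `previous` and `current` are { start, end } pairs of ISO-8601 UTC strings.
function buildDiffQuery(dataSource, dimension, metric, previous, current, threshold) {
  const ms = (iso) => Date.parse(iso);
  return {
    queryType: 'topN',
    dataSource: dataSource,
    dimension: dimension,
    threshold: threshold,
    metric: 'diff',
    granularity: 'all',
    aggregations: [
      conditionalSum(metric + '_previous', metric, ms(previous.start), ms(previous.end)),
      conditionalSum(metric + '_current', metric, ms(current.start), ms(current.end)),
    ],
    postAggregations: [{
      type: 'javascript',
      name: 'diff',
      fieldNames: [metric + '_current', metric + '_previous'],
      function: 'function(a, b) { return Math.abs(a - b); }',
    }],
    intervals: [previous.start + '/' + previous.end, current.start + '/' + current.end],
  };
}

const query = buildDiffQuery(
  'sample_data', 'some_dim', 'revenue',
  { start: '2018-09-06T00:00:00.000Z', end: '2018-09-07T00:00:00.000Z' },
  { start: '2018-09-07T00:00:00.000Z', end: '2018-09-08T00:00:00.000Z' },
  5
);
console.log(JSON.stringify(query, null, 2));
```

Generating the function bodies from the same values that produce the `intervals` array keeps the millisecond boundaries and the queried intervals from drifting apart.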

Query Breakdown:

  • queryType: “topN” is the type of query that breaks down our data by a single dimension and returns the top results by that dimension’s values.
  • dimension: “some_dim” is the dimension by which we want to break down the result.
  • metric: “diff” tells Druid to sort the top results by the post-aggregator “diff”, which is created in the “postAggregations” object.
  • granularity: “all” is important, as we want the two intervals’ metrics in a single time bucket spanning the whole time frame; a post-aggregator can only operate on metrics returned in the same bucket.
  • “aggregations”:
    • “type”: “javascript” allows us to apply conditional logic not inherently supported by Druid.
    • “fieldNames”: the dimensions/metrics from the datasource that will be exposed to the function.
      • “__time” is the dimension holding the row’s time.
    • “fnAggregate”: this is the meat of the query. It implements the conditional logic for summing up the row’s revenue. ‘current’ is injected by Druid and holds the total summed so far. Then comes the conditional clause: if the row’s time falls within the millisecond range of the aggregator’s interval, add the metric’s value to ‘current’; otherwise add zero. Note that the interval boundaries 2018-09-06, 2018-09-07 and 2018-09-08 were converted to Unix epoch milliseconds (UTC).
    • “fnCombine”: simple arithmetic addition of two partial sums.
    • “fnReset”: the initial value, in our case zero.
  • “postAggregations”:
    • “type”: “javascript” allows us to apply the Math.abs function to the difference of the two previously created aggregators.
    • “fieldNames”: the names of the two previously created aggregators.
    • “function”: the JavaScript implementation.
  • “intervals”: the two intervals we want to see the changes for.
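
To see how the three aggregator functions cooperate, the sketch below runs them by hand in plain JavaScript over two hypothetical batches of rows, roughly the way partial results from separate segments get merged. The `runAggregator` driver and the sample rows are assumptions made for illustration; Druid's actual execution differs in detail.

```javascript
// The three functions, written as in the query for the 2018-09-06 interval.
const fnReset = function() { return 0; };
const fnAggregate = function(current, revenue, __time) {
  return current + ((__time >= 1536192000000 && __time < 1536278400000) ? revenue : 0);
};
const fnCombine = function(a, b) { return a + b; };

// Hypothetical driver: folds one batch of rows into a partial sum.
function runAggregator(rows) {
  let acc = fnReset();
  for (const row of rows) {
    acc = fnAggregate(acc, row.revenue, row.time);
  }
  return acc;
}

// Two hypothetical batches, e.g. rows stored in different segments.
const batchA = [
  { revenue: 10, time: 1536192000000 },  // first ms of 2018-09-06: counted
  { revenue: 7,  time: 1536278400000 },  // first ms of 2018-09-07: excluded, end is exclusive
];
const batchB = [
  { revenue: 5, time: 1536278399999 },   // last ms of 2018-09-06: counted
  { revenue: 3, time: 1536364800000 },   // 2018-09-08: outside the interval
];

const total = fnCombine(runAggregator(batchA), runAggregator(batchB));
console.log(total); // 15
```

The boundary values can be double-checked with `Date.UTC(2018, 8, 6)`, which evaluates to 1536192000000 (months are zero-based in JavaScript).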

Notes:

  • At the time I couldn’t find a “conditional metric aggregator” that sums a metric only if other dimensions/metrics satisfy a certain predicate, which is why I resorted to JavaScript. If such a feature is available, it can be used instead of the JavaScript aggregator, which implements essentially the same logic.
  • If absolute changes are not required, a simpler built-in arithmetic post-aggregator can be used instead of the JavaScript one.
  • This solution works easily for any sum aggregator, but supporting HyperUnique would take more implementation work, as merging is not as simple as adding numbers.
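
The last note can be illustrated without Druid. Partial sums combine by addition, but distinct counts do not, because the same value may show up in several partial results. The sketch below uses exact `Set`s as a stand-in for HyperLogLog sketches, which are approximate but also merge like a union rather than by addition; the user ids are made up.

```javascript
// Hypothetical user ids seen in two partial results (e.g. two segments).
const partA = ['u1', 'u2', 'u3'];
const partB = ['u3', 'u4'];

// Counts the distinct values in a list.
function distinctCount(values) {
  return new Set(values).size;
}

// Adding per-part distinct counts double counts 'u3'.
const naive = distinctCount(partA) + distinctCount(partB);

// Merging the underlying sets first gives the correct answer.
const merged = distinctCount(partA.concat(partB));

console.log({ naive: naive, merged: merged });
```

Here the naive sum reports 5 while the merged count is 4, which is why a combine step for unique counts has to merge the sketches themselves instead of adding numbers.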

 
