How to calculate the time to fetch records from Kafka?

779 views Asked by At

I have a simple job with trigger=15 seconds, Source=Kafka and Sink=S3. Is it possible to find how much time did it take to download messages from Kafka? Or say if I had Sink=Console, it bring back data on the driver, is it possible to find how much time to download data from Kafka and how much time to bring it back to driver?

From driver I get these for query while writing to S3. Is it possible to understand how much time did it spend in downloading 99998 rows from Kafka out of triggerExecution = 44 seconds?

Streaming query made progress: {
  id : 1383g52b-8de4-4e95-a3s9-aea73qe3ea56,
  runId : 1206f5tc-t503-44r0-bc0c-26ce404w6724,
  name : null,
  timestamp : 2017-08-25T01:42:10.000Z,
  numInputRows : 99998,
  inputRowsPerSecond : 1666.6333333333334,
  processedRowsPerSecond : 2263.9860535669814,
  durationMs : {
    addBatch : 42845,
    getBatch : 3,
    getOffset : 68,
    queryPlanning : 6,
    triggerExecution : 44169,
    walCommit : 1245
  },
  stateOperators : [ ],
  sources : [ {
    description : KafkaSource[Subscribe[kafka_topic]],
    startOffset : {
      kafka_topic : {
        2 : 20119244,
        4 : 20123550,
        1 : 20124601,
        3 : 20113622,
        0 : 20114208
      }
    },
    endOffset : {
      kafka_topic : {
        2 : 20139245,
        4 : 20143531,
        1 : 20144592,
        3 : 20133663,
        0 : 20134192
      }
    },
    numInputRows : 99998,
    inputRowsPerSecond : 1666.6333333333334,
    processedRowsPerSecond : 2263.9860535669814
  } ],
  sink : {
    description : FileSink[s3://s3bucket]
  }
}

Thanks!

2

There are 2 answers

5
Jacek Laskowski On

You should find the answers to your questions by reviewing StreamingQuery.lastProgress.durationMs.

In the order of their calculation the following durations tell you:

  • getOffset is the time to get the offsets from all the sources

  • getBatch is the time to get the streaming Datasets (aka batches) from all the sources (one by one, sequentially).

  • addBatch is the time to write the streaming Dataset to a sink

With that said...

Is it possible to find how much time did it take to download messages from Kafka?

That's addBatch duration (since that's when the Dataset gets executed as an RDD on executors)

Is it possible to understand how much time did it spend in downloading 99998 rows from Kafka out of triggerExecution = 44 seconds?

You'd have to sum addBatch durations from StreamingQuery.recentProgress array.

1
Tathagata Das On

Since the reading from Kafka and the processing of the read records are pipelined, it is pretty hard to find the exact time taken to read.

And many times this is not important because processing is the bottleneck rather than reading from Kafka. So the real question is, why do you care about the exact Kafka read time?