I am using pyspark with two accumulators:
total = spark.sparkContext.accumulator(0)
errors = spark.sparkContext.accumulator(0)
- One accumulator, total, stores the total number of records processed so far.
- Another accumulator, errors, stores the total number of records that failed to be processed.
These accumulators get updated on the executors (in tasks) each time a record is processed, via total.add(1) or errors.add(1).
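For context, the per-record logic looks roughly like this (a simplified sketch; process is just a stand-in for my actual per-record work, and df for my DataFrame):

def process_partition(records):
    for record in records:
        total.add(1)
        try:
            process(record)   # stand-in for the real processing logic
        except Exception:
            errors.add(1)

df.rdd.foreachPartition(process_partition)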
I need to monitor (errors/total)*100. If it exceeds some threshold after, say, 1M records have been processed, I'd like to stop the job.
So my question is: how can I monitor the values of these accumulators periodically in PySpark (or in Spark in general)?
The trouble is that they can be updated in the executors but can only be read on the driver. I tried reading them (total.value) each time they get updated in the executors, but got the error "Accumulator.value cannot be accessed inside tasks".
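To illustrate what I mean, this is the kind of check I tried inside a task, and it fails with the error above:

def process_partition(records):
    for record in records:
        total.add(1)
        pct = (errors.value / total.value) * 100   # raises: Accumulator.value cannot be accessed inside tasks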
I was thinking of creating a thread on the driver that sleeps and checks their values periodically in an infinite loop, but I ran into the problem of knowing when to exit that loop once the processing of records has finished.
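This is roughly what I had in mind (a sketch only; THRESHOLD, MIN_RECORDS, and the check interval are placeholders, and I'm not sure cancelAllJobs() is the right way to stop the job):

import threading
import time

THRESHOLD = 5.0          # percent, placeholder
MIN_RECORDS = 1_000_000  # placeholder

def monitor():
    while True:          # <-- how do I know when to stop this loop?
        time.sleep(10)
        t, e = total.value, errors.value   # reading .value is fine here, this runs on the driver
        if t >= MIN_RECORDS and (e / t) * 100 > THRESHOLD:
            spark.sparkContext.cancelAllJobs()   # placeholder for "stop the job"
            break

threading.Thread(target=monitor, daemon=True).start()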