Derived AccumulatorParam in PySpark raises EOFException in the addInPlace function


I have a set of JSON objects in which one key holds a nested object, i.e. a collection of keys that are not common across all the JSON objects. An example object is:

{
    "id": "AJDFAKDF",
    "class": "class1",
    "nested_object": {
        "key1": "value1",
        "key2": "value2",
        "key3": "value3"
    }
}

The id is the unique identifier of the object, and key1, key2, ... will not be the same across all objects, i.e. one object may have key1 in nested_object while another may not. Also, class will vary for each record.

I have hundreds of millions of such records and I am interested in finding the distribution of records by the keys of nested_object, per class. A sample output would be:

{
    "class1": {
        "key1": 32,
        "key45": 45
    },
    "class2": {
        "key2": 39,
        "key45": 100
    }
}

I am using PySpark accumulators to generate this distribution. I have derived from the dict class to create a CounterOfCounters class, along the lines of the Counter class in Python's collections library. Much of the code is copied from the Counter source code.

from collections import Counter

class CounterOfCounters(dict):
  def __init__(*args, **kwds):
    '''Create a new, empty CounterOfCounters object. Or, initialize
    the counts from another mapping of elements to their Counters.

    >>> c = CounterOfCounters()                 # a new, empty CoC
    >>> c = CounterOfCounters({'a': 4, 'b': 2}) # a new counter from a mapping
    '''
    if not args:
        raise TypeError("descriptor '__init__' of 'Counter' object "
                        "needs an argument")
    self, *args = args
    if len(args) > 1:
        raise TypeError('expected at most 1 arguments, got %d' % len(args))
    super(CounterOfCounters, self).__init__()
    self.update(*args, **kwds)

  def __missing__(self, key):
    # Needed so that self[missing_item] does not raise KeyError
    return Counter()

  def update(*args, **kwds):
    '''Like dict.update() but add counts instead of replacing them.

    Source can be a dictionary
    '''
    # The regular dict.update() operation makes no sense here because the
    # replace behavior would leave some of the original counts untouched and
    # mix them in with the new counts, which has no straightforward
    # interpretation in most counting contexts.  Instead, we implement
    # straight addition.  Both the inputs and outputs are allowed to contain
    # zero and negative counts.

    # print("In update. [0] ", args)
    if not args:
        raise TypeError("descriptor 'update' of 'CounterOfCounters' object "
                        "needs an argument")
    self, *args = args
    # print("In update. [1] ", self, args)
    if len(args) > 1:
        raise TypeError('expected at most 1 arguments, got %d' % len(args))

    iterable = args[0] if args else None
    if iterable is not None:
        if isinstance(iterable, dict):
            if self:
                self_get = self.get
                for elem, count in iterable.items():
                    if isinstance(count, Counter):
                        self[elem] = count + self_get(elem, Counter())
                    else:
                        raise TypeError("values can only be of type Counter")
            else:
                for elem, count in iterable.items():
                    if isinstance(count, Counter):
                        self[elem] = count
                    else:
                        raise TypeError("values can only be of type Counter")
        else:
            raise TypeError("source must be a dict of Counter values")
    if kwds:
        raise TypeError("Can not process **kwds as of now")

I then derive my custom KeyAccumulatorParam class from AccumulatorParam.

from pyspark import AccumulatorParam

class KeyAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        c = CounterOfCounters(
            dict.fromkeys(
                value.keys(), 
                Counter()
            )
        )
        return c

    def addInPlace(self, ac1, ac2):
        ac1.update(ac2)
        return ac1
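
As a quick sanity check (run locally as plain Python, with made-up class names), zero() maps every class in the initial value to an empty Counter, as intended:

from collections import Counter

param = KeyAccumulatorParam()
initial = CounterOfCounters(dict.fromkeys(['class1', 'class2'], Counter()))
print(param.zero(initial))
# {'class1': Counter(), 'class2': Counter()}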

Then I define the accumulator variable and the function that adds to it.

keyAccum = sc.accumulator(
    CounterOfCounters(
        dict.fromkeys(
            ['class1', 'class2', 'class3', 'class4'],
            Counter()
        )
    ), 
    KeyAccumulatorParam()
)

def accumulate_keycounts(rec):
    global keyAccum
    c = Counter(list(rec['nested_object'].keys()))
    keyAccum.add(CounterOfCounters({
        rec['class']: c
    }))
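
For illustration, this is the increment that accumulate_keycounts builds for a single made-up record before handing it to the accumulator:

from collections import Counter

rec = {'id': 'AJDFAKDF', 'class': 'class1',
       'nested_object': {'key1': 'v1', 'key2': 'v2'}}
increment = CounterOfCounters({rec['class']: Counter(list(rec['nested_object'].keys()))})
print(increment)
# {'class1': Counter({'key1': 1, 'key2': 1})}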

After which I call accumulate_keycounts for each record in the RDD.

test_rdd.foreach(accumulate_keycounts)

On doing this, I get an end-of-file exception:

Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon$1$$anonfun$read$1.apply$mcVI$sp(PythonRDD.scala:200)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:199)
    ... 25 more

Some additional info: I am using Spark 2.2.1 on my MacBook Pro Retina (2013, 8 GB) to test my function. The test_rdd has only ~20 records; it is a sample from a larger RDD of ~200,000 records.

I am not sure why this error is happening. I have enough memory to finish the task, and the accumulator is commutative and associative, as accumulators are required to be.
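
For example, merging two hand-made values in either order gives the same result (a small local check, not part of the Spark job):

from collections import Counter

param = KeyAccumulatorParam()

def make_a():
    return CounterOfCounters({'class1': Counter({'key1': 2})})

def make_b():
    return CounterOfCounters({'class1': Counter({'key2': 1}),
                              'class2': Counter({'key3': 5})})

# addInPlace mutates its first argument, so build fresh copies for each order.
assert param.addInPlace(make_a(), make_b()) == param.addInPlace(make_b(), make_a())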

What I have debugged so far is that the error happens after the first record is processed, i.e. the first time addInPlace is called. The CounterOfCounters.update function returns normally, but after that the EOFException occurs.

Any tips to debug or pointers to the problem will be much appreciated.
