I have a set of JSON objects in which one key holds a nested object whose keys are not common across all the objects. An example object is:
{
    id: 'AJDFAKDF',
    class: 'class1',
    nested_object: {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3'
    }
}
The id is a unique identifier of the object, and key1, key2, ... will not be the same across all objects, i.e. one object may have key1 in nested_object and another may not. Also, class will vary for each record.
I have hundreds of millions of such records and I am interested in finding out the distribution of records by the keys of nested_object per class. A sample output would be:
{
    class1: {
        key1: 32,
        key45: 45
    },
    class2: {
        key2: 39,
        key45: 100
    }
}
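For clarity, the aggregation I am after is essentially the following plain-Python computation (a minimal sketch over a few hand-made records, without Spark):

from collections import Counter

records = [
    {'id': 'A', 'class': 'class1', 'nested_object': {'key1': 'v', 'key45': 'v'}},
    {'id': 'B', 'class': 'class1', 'nested_object': {'key45': 'v'}},
    {'id': 'C', 'class': 'class2', 'nested_object': {'key2': 'v'}},
]

dist = {}
for rec in records:
    # count each nested_object key once per record, grouped by class
    dist.setdefault(rec['class'], Counter()).update(rec['nested_object'].keys())

# dist == {'class1': Counter({'key45': 2, 'key1': 1}), 'class2': Counter({'key2': 1})}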
I am using PySpark accumulators to generate this distribution. I have derived from the dict class to create a CounterOfCounters class, along the lines of the Counter class in Python's collections library. Much of the code is copied from that source.
from collections import Counter


class CounterOfCounters(dict):
    def __init__(*args, **kwds):
        '''Create a new, empty CounterOfCounters object. Or, initialize
        the counts from another mapping of elements to their Counter.

        >>> c = CounterOfCounters()  # a new, empty CoC
        >>> c = CounterOfCounters({'a': Counter({'x': 4}), 'b': Counter({'y': 2})})  # a new CoC from a mapping of Counters

        '''
        if not args:
            raise TypeError("descriptor '__init__' of 'CounterOfCounters' object "
                            "needs an argument")
        self, *args = args
        if len(args) > 1:
            raise TypeError('expected at most 1 arguments, got %d' % len(args))
        super(CounterOfCounters, self).__init__()
        self.update(*args, **kwds)

    def __missing__(self, key):
        # Needed so that self[missing_item] does not raise KeyError
        return Counter()

    def update(*args, **kwds):
        '''Like dict.update() but add counts instead of replacing them.
        Source can be a dictionary.
        '''
        # The regular dict.update() operation makes no sense here because the
        # replace behavior results in some of the original untouched counts
        # being mixed in with all of the other counts for a mishmash that
        # doesn't have a straightforward interpretation in most counting
        # contexts. Instead, we implement straight addition. Both the inputs
        # and outputs are allowed to contain zero and negative counts.
        # print("In update. [0] ", args)
        if not args:
            raise TypeError("descriptor 'update' of 'CounterOfCounters' object "
                            "needs an argument")
        self, *args = args
        # print("In update. [1] ", self, args)
        if len(args) > 1:
            raise TypeError('expected at most 1 arguments, got %d' % len(args))
        iterable = args[0] if args else None
        if iterable is not None:
            if isinstance(iterable, dict):
                if self:
                    self_get = self.get
                    for elem, count in iterable.items():
                        if isinstance(count, Counter):
                            self[elem] = count + self_get(elem, Counter())
                        else:
                            raise TypeError("values can only be of type Counter")
                else:
                    for elem, count in iterable.items():
                        if isinstance(count, Counter):
                            self[elem] = count
                        else:
                            raise TypeError("values can only be of type Counter")
            else:
                raise TypeError("values can only be of type Counter")
        if kwds:
            raise TypeError("Can not process **kwds as of now")
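To illustrate the intended merge behaviour, updating one CounterOfCounters with another dict of Counters adds the per-class Counters key-wise (a small example, not part of the job itself):

from collections import Counter

coc = CounterOfCounters({'class1': Counter({'key1': 2})})
coc.update({'class1': Counter({'key1': 1, 'key45': 5}),
            'class2': Counter({'key2': 3})})
# coc == {'class1': Counter({'key1': 3, 'key45': 5}), 'class2': Counter({'key2': 3})}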
I then derive my custom AccumulatorParam class from AccumulatorParam.
from pyspark import AccumulatorParam


class KeyAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        c = CounterOfCounters(
            dict.fromkeys(
                value.keys(),
                Counter()
            )
        )
        return c

    def addInPlace(self, ac1, ac2):
        ac1.update(ac2)
        return ac1
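The behaviour I rely on from zero() and addInPlace() can be checked on the driver alone, for example (a small sketch, independent of the Spark job):

from collections import Counter

param = KeyAccumulatorParam()
initial = CounterOfCounters({'class1': Counter(), 'class2': Counter()})

z = param.zero(initial)  # an empty Counter for each class
merged = param.addInPlace(z, CounterOfCounters({'class1': Counter({'key1': 1})}))
# merged == {'class1': Counter({'key1': 1}), 'class2': Counter()}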
Then I define the accumulator variable and the function to add to it.
keyAccum = sc.accumulator(
    CounterOfCounters(
        dict.fromkeys(
            ['class1', 'class2', 'class3', 'class4'],
            Counter()
        )
    ),
    KeyAccumulatorParam()
)

def accumulate_keycounts(rec):
    global keyAccum
    c = Counter(list(rec['nested_object'].keys()))
    keyAccum.add(CounterOfCounters({
        rec['class']: c
    }))
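For the example record at the top, a single call to this function amounts to:

keyAccum.add(CounterOfCounters({
    'class1': Counter({'key1': 1, 'key2': 1, 'key3': 1})
}))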
After which I call accumulate_keycounts for each record in the RDD.
test_rdd.foreach(accumulate_keycounts)
On doing this, I get an end-of-file exception:
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$1$$anonfun$read$1.apply$mcVI$sp(PythonRDD.scala:200)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:199)
... 25 more
Some additional info: I am using Spark 2.2.1 on my MacBook Pro Retina (2013, 8 GB) for testing my function. The test_rdd has only ~20 records; it is a sample from a larger RDD of ~200,000 records.
I am not sure why this error is happening. I have enough memory to finish the task, and the accumulator is commutative and associative, as required for accumulators.
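As a quick driver-side sanity check of that property (outside Spark):

from collections import Counter

p = KeyAccumulatorParam()
x = CounterOfCounters({'class1': Counter({'key1': 1})})
y = CounterOfCounters({'class1': Counter({'key45': 2}), 'class2': Counter({'key2': 1})})

# merging copies in either order yields the same totals
assert p.addInPlace(CounterOfCounters(x), y) == p.addInPlace(CounterOfCounters(y), x)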
What I have debugged so far and figured out is that the error happens after the first record is processed, i.e. the first time addInPlace is called. The CounterOfCounters.update function returns normally, but after that the EOFException occurs.
Any tips to debug or pointers to the problem will be much appreciated.