Transformed DStream in PySpark gives an error when pprint is called on it


I'm exploring Spark Streaming through PySpark, and hitting an error when I try to use the transform function with take.

I can successfully use sortBy against the DStream via transform and pprint the result.

author_counts_sorted_dstream = author_counts_dstream.transform(
  lambda foo: foo
    .sortBy(lambda x: x[0].lower())
    .sortBy(lambda x: x[1], ascending=False)
)
author_counts_sorted_dstream.pprint()

But if I use take following the same pattern and try to pprint it:

top_five = author_counts_sorted_dstream.transform(
  lambda rdd: rdd.take(5)
)
top_five.pprint()

the job fails with

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'

You can see the full code and output in the notebook here.

What am I doing wrong?

1 Answer

zero323 (best answer):

The function you pass to transform should map an RDD to an RDD. If you use an action like take, which returns a plain Python list, you have to convert the result back to an RDD:

sc: SparkContext = ...

author_counts_sorted_dstream.transform(
  lambda rdd: sc.parallelize(rdd.take(5))
)
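For context, here is a minimal end-to-end sketch of that fix; the app name, socket source, and batch interval are assumptions for illustration, not taken from the original notebook:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Assumed setup: names, source, and batch interval are illustrative.
sc = SparkContext(appName="TopAuthors")
ssc = StreamingContext(sc, 10)

lines = ssc.socketTextStream("localhost", 9999)
author_counts_dstream = lines.map(lambda author: (author, 1)) \
    .reduceByKey(lambda x, y: x + y)

# take(5) is an action and returns a Python list, so wrap the
# result back into an RDD before returning it from transform.
# This works because transform's function runs on the driver,
# where sc is available.
top_five = author_counts_dstream.transform(
    lambda rdd: sc.parallelize(rdd.take(5))
)
top_five.pprint()

ssc.start()
ssc.awaitTermination()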

In contrast, RDD.sortBy is a transformation (it returns an RDD), so no further parallelization is needed.

On a side note, the following function:

lambda foo: foo \
    .sortBy(lambda x:x[0].lower()) \
    .sortBy(lambda x:x[1], ascending=False)

doesn't make much sense. Remember that Spark sorts by shuffling, so the sort is not stable: the second sortBy does not preserve the ordering produced by the first one. If you want to sort by multiple fields, you should use a composite key like:

lambda x: (x[0].lower(), -x[1])
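Applied to the pipeline from the question, the two chained sortBy calls collapse into a single one. A sketch reusing the question's variable names; note that the tuple order sets precedence, so to rank primarily by count descending (as the original chained calls intended), put -x[1] first:

# One shuffle, deterministic ordering:
# primary key: count descending; secondary key: lowercased author name.
author_counts_sorted_dstream = author_counts_dstream.transform(
    lambda rdd: rdd.sortBy(lambda x: (-x[1], x[0].lower()))
)
author_counts_sorted_dstream.pprint()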