TypeError: Incorrect padding while running Kmeans on Spark Mllib (spark 1.4.0)

352 views Asked by At

I am trying to run k-means clustering on a large dataset using spark . I get the following error after k-means converges. Following are the logs:

15/06/17 14:47:44 INFO KMeans: Run 0 finished in 10 iterations
15/06/17 14:47:44 INFO KMeans: Iterations took 35.037 seconds.
15/06/17 14:47:44 INFO KMeans: KMeans converged in 10 iterations.
15/06/17 14:47:44 INFO KMeans: The cost for the best run is 2.3225877802439418E7.
15/06/17 14:47:44 INFO MapPartitionsRDD: Removing RDD 3 from persistence list
15/06/17 14:47:44 INFO BlockManager: Removing RDD 3
15/06/17 14:47:44 INFO MapPartitionsRDD: Removing RDD 2 from persistence list
15/06/17 14:47:44 INFO BlockManager: Removing RDD 2

>****The stack trace is given below****

> Exception happened during processing of request from ('127.0.0.1',
> 46181) Traceback (most recent call last):   File
> "/usr/lib64/python2.6/SocketServer.py", line 283, in
> _handle_request_noblock
>     self.process_request(request, client_address)   File "/usr/lib64/python2.6/SocketServer.py", line 309, in process_request
>     self.finish_request(request, client_address)   File "/usr/lib64/python2.6/SocketServer.py", line 322, in finish_request
>     self.RequestHandlerClass(request, client_address, self)   File "/usr/lib64/python2.6/SocketServer.py", line 617, in __init__
>     self.handle()   File "/root/spark/python/pyspark/accumulators.py", line 235, in handle
>     num_updates = read_int(self.rfile)   File "/root/spark/python/pyspark/serializers.py", line 544, in read_int
>     raise EOFError EOFError

TypeError                                 Traceback (most recent call last)
<ipython-input-18-c23362d77571> in <module>()
----> 1 model = KMeans.train(data, 600, initializationMode="k-means||")

/root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon)
    135         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
    136                               runs, initializationMode, seed, initializationSteps, epsilon)
--> 137         centers = callJavaFunc(rdd.context, model.clusterCenters)
    138         return KMeansModel([c.toArray() for c in centers])
    139 

/root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
    119     """ Call Java Function """
    120     args = [_py2java(sc, a) for a in args]
--> 121     return _java2py(sc, func(*args))
    122 
    123 

/root/spark/python/pyspark/mllib/common.pyc in _java2py(sc, r, encoding)
    107         elif isinstance(r, (JavaArray, JavaList)):
    108             try:
--> 109                 r = sc._jvm.SerDe.dumps(r)
    110             except Py4JJavaError:
    111                 pass  # not pickable

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    312             return
    313         else:
--> 314             return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    315 
    316 

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in <lambda>(value, y)
    141               DECIMAL_TYPE: (lambda value, y: Decimal(value)),
    142               INTEGER_TYPE: (lambda value, y: int(value)),
--> 143               BYTES_TYPE: (lambda value, y: decode_bytearray(value)),
    144               DOUBLE_TYPE: (lambda value, y: float(value)),
    145               STRING_TYPE: (lambda value, y: unescape_new_line(value)),

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in decode_bytearray(encoded)
    211 def decode_bytearray(encoded):
    212     new_bytes = strtobyte(encoded)
--> 213     return bytearray2([bytetoint(b) for b in standard_b64decode(new_bytes)])
    214 
    215 

/usr/lib64/python2.6/base64.pyc in standard_b64decode(s)
     91     characters present in the string.
     92     """
---> 93     return b64decode(s)
     94 
     95 def urlsafe_b64encode(s):

/usr/lib64/python2.6/base64.pyc in b64decode(s, altchars)
     74     except binascii.Error, msg:
     75         # Transform this exception for consistency
---> 76         raise TypeError(msg)
     77 
     78 

TypeError: Incorrect padding

I am using spark on amazon ec-2 with 15 slave nodes. Each node has 30 GB of RAM and has 4 cores. I am running the code using Ipython notebook. Any help would be greatly appreciated.

Also , I am using the following command to start python notebook with spark:

*

> SPARK_WORKER_INSTANCES=30 SPARK_WORKER_CORES=4 SPARK_WORKER_MEMORY=30g
> SPARK_MEM=30g OUR_JAVA_MEM="30g" 
> SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" IPYTHON=1
> PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1 
> ./spark/bin/pyspark --conf "
> spark.driver.extraJavaOptions=-XX:+UseCompressedOops
> -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 -Xmx15g  spark.executor.extraJavaOptions=-XX:+UseCompressedOops
> -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 -Xmx 15g" --driver-java-options " -Xmx15g -Xms15g -Xmx15g"
    *
0

There are 0 answers