I am trying to run k-means clustering on a large dataset using Spark. The job fails after k-means converges. Here are the logs:
15/06/17 14:47:44 INFO KMeans: Run 0 finished in 10 iterations
15/06/17 14:47:44 INFO KMeans: Iterations took 35.037 seconds.
15/06/17 14:47:44 INFO KMeans: KMeans converged in 10 iterations.
15/06/17 14:47:44 INFO KMeans: The cost for the best run is 2.3225877802439418E7.
15/06/17 14:47:44 INFO MapPartitionsRDD: Removing RDD 3 from persistence list
15/06/17 14:47:44 INFO BlockManager: Removing RDD 3
15/06/17 14:47:44 INFO MapPartitionsRDD: Removing RDD 2 from persistence list
15/06/17 14:47:44 INFO BlockManager: Removing RDD 2
**The stack trace is given below:**
> Exception happened during processing of request from ('127.0.0.1', 46181)
> Traceback (most recent call last):
>   File "/usr/lib64/python2.6/SocketServer.py", line 283, in _handle_request_noblock
>     self.process_request(request, client_address)
>   File "/usr/lib64/python2.6/SocketServer.py", line 309, in process_request
>     self.finish_request(request, client_address)
>   File "/usr/lib64/python2.6/SocketServer.py", line 322, in finish_request
>     self.RequestHandlerClass(request, client_address, self)
>   File "/usr/lib64/python2.6/SocketServer.py", line 617, in __init__
>     self.handle()
>   File "/root/spark/python/pyspark/accumulators.py", line 235, in handle
>     num_updates = read_int(self.rfile)
>   File "/root/spark/python/pyspark/serializers.py", line 544, in read_int
>     raise EOFError
> EOFError
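For context, the EOFError above comes from PySpark's accumulator-update server: `read_int` tries to read a 4-byte length prefix from the socket, gets nothing because the other side has already closed the connection, and raises. A minimal stdlib-only sketch of that behaviour (the helper below is my own approximation of what pyspark's `read_int` does, not the actual source):

```python
import io
import struct

def read_int(stream):
    # Read a 4-byte big-endian int; an empty read means the peer closed the stream.
    data = stream.read(4)
    if not data:
        raise EOFError
    return struct.unpack("!i", data)[0]

print(read_int(io.BytesIO(struct.pack("!i", 42))))  # a normal length prefix

try:
    read_int(io.BytesIO(b""))  # connection closed before any bytes arrived
except EOFError:
    print("EOFError on empty stream")
```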
TypeError Traceback (most recent call last)
<ipython-input-18-c23362d77571> in <module>()
----> 1 model = KMeans.train(data, 600, initializationMode="k-means||")
/root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon)
135 model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
136 runs, initializationMode, seed, initializationSteps, epsilon)
--> 137 centers = callJavaFunc(rdd.context, model.clusterCenters)
138 return KMeansModel([c.toArray() for c in centers])
139
/root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
119 """ Call Java Function """
120 args = [_py2java(sc, a) for a in args]
--> 121 return _java2py(sc, func(*args))
122
123
/root/spark/python/pyspark/mllib/common.pyc in _java2py(sc, r, encoding)
107 elif isinstance(r, (JavaArray, JavaList)):
108 try:
--> 109 r = sc._jvm.SerDe.dumps(r)
110 except Py4JJavaError:
111 pass # not pickable
/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
312 return
313 else:
--> 314 return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
315
316
/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in <lambda>(value, y)
141 DECIMAL_TYPE: (lambda value, y: Decimal(value)),
142 INTEGER_TYPE: (lambda value, y: int(value)),
--> 143 BYTES_TYPE: (lambda value, y: decode_bytearray(value)),
144 DOUBLE_TYPE: (lambda value, y: float(value)),
145 STRING_TYPE: (lambda value, y: unescape_new_line(value)),
/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in decode_bytearray(encoded)
211 def decode_bytearray(encoded):
212 new_bytes = strtobyte(encoded)
--> 213 return bytearray2([bytetoint(b) for b in standard_b64decode(new_bytes)])
214
215
/usr/lib64/python2.6/base64.pyc in standard_b64decode(s)
91 characters present in the string.
92 """
---> 93 return b64decode(s)
94
95 def urlsafe_b64encode(s):
/usr/lib64/python2.6/base64.pyc in b64decode(s, altchars)
74 except binascii.Error, msg:
75 # Transform this exception for consistency
---> 76 raise TypeError(msg)
77
78
TypeError: Incorrect padding
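The final `TypeError: Incorrect padding` is raised by the standard library's base64 decoder, which Py4J uses to deserialize the byte array of cluster centers coming back from the JVM: `b64decode` fails whenever the encoded string's length is not a multiple of 4, which suggests the payload reaching the driver was truncated or corrupted (my interpretation, not confirmed). A small illustration; note that Python 2.6, as in the traceback above, re-raises the binascii error as a `TypeError`, while Python 3 raises `binascii.Error`:

```python
import base64
import binascii

payload = base64.b64encode(b"cluster centers go here")
print(base64.b64decode(payload))  # an intact payload round-trips fine

truncated = payload[:-1]  # drop one character: length is no longer a multiple of 4
try:
    base64.b64decode(truncated)
except (binascii.Error, TypeError) as e:  # TypeError on Python 2, binascii.Error on Python 3
    print("decode failed:", e)
```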
I am running Spark on Amazon EC2 with 15 slave nodes. Each node has 30 GB of RAM and 4 cores. I am running the code from an IPython notebook. Any help would be greatly appreciated.
Also, I am using the following command to start the IPython notebook with Spark:
> SPARK_WORKER_INSTANCES=30 SPARK_WORKER_CORES=4 SPARK_WORKER_MEMORY=30g \
> SPARK_MEM=30g OUR_JAVA_MEM="30g" \
> SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" \
> IPYTHON=1 PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1 \
> ./spark/bin/pyspark \
> --conf "spark.driver.extraJavaOptions=-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 -Xmx15g spark.executor.extraJavaOptions=-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 -Xmx15g" \
> --driver-java-options "-Xmx15g -Xms15g -Xmx15g"