I have written a Spark program (Python 3.6, Spark 2.3.2) for a collaborative filtering (CF) recommendation system that handles two cases:
- Case 1: Item-based CF recommendation system
- Case 2: User-based CF recommendation system with Min-Hash LSH
I have written train and predict programs that cover both cases. My code works for the user-based recommendation, but when I try to train my model for item-based CF, I get the following error:
2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\worker.py", line 238, in main
File "C:\spark\spark-2.3.2-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\serializers.py", line 690, in read_int
length = stream.read(4)
File "C:\Users\17372\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
I tried the solutions from this question: Pyspark socket timeout exception after application running for a while
They did not work.
I also found a suggestion to add "--spark.worker.timeout=120" to the command, as follows:
bin\spark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120
I still see the same error. I also tried try/except blocks, but I am not sure how to use them correctly here.
What should I do?
My code for Item-based CF:
if model_type == ITEM_BASED_MODEL:
    # group original data by bidx, and remove those unpopular business (rated time < 3)
    # tuple(bidx, (uidx, score))
    # [(5306, [(3662, 5.0), (3218, 5.0), (300, 5.0),..]), ()
    shrunk_bid_uids_rdd = input_lines \
        .map(lambda kv: (bus_index_dict[kv[1]], (user_index_dict[kv[0]], kv[2]))) \
        .groupByKey().mapValues(lambda uid_score: list(uid_score)) \
        .filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) \
        .mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) \
        .mapValues(lambda val: flatMixedList(val))

    candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)

    # convert shrunk_bid_uids_rdd into dict form
    # dict(bidx: dict(uidx: score))
    # => e.g. {5306: defaultdict(<class 'list'>, {3662: 5.0, 3218: 5.0, 300: 5.0...}),
    bid_uid_dict = shrunk_bid_uids_rdd \
        .map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) \
        .flatMap(lambda kv_items: kv_items.items()).collectAsMap()

    # generate all possible pair between candidate bidx
    # and compute the pearson similarity
    candidate_pair = candidate_bids.cartesian(candidate_bids) \
        .filter(lambda id_pair: id_pair[0] < id_pair[1]) \
        .filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],
                                              bid_uid_dict[id_pair[1]])) \
        .map(lambda id_pair: (id_pair,
                              bid_uid_dict[id_pair[1]]))) \
        .filter(lambda kv: kv[1] > 0) \
        .map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],
                         "b2": reversed_index_bus_dict[kv[0][1]],
                         "sim": kv[1]})
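The helper functions used in the pair filter and the similarity step are not shown here. As a rough sketch only, this is what they might compute on the `{uidx: score}` dicts held in `bid_uid_dict`. The names `exist_n_records` and `pearson_similarity` are hypothetical, the threshold of 3 is assumed from the "rated time < 3" comment, and taking the means over co-rated users only is one of several common Pearson variants, so the real helpers may differ.

```python
from math import sqrt

CO_RATED_THRESHOLD = 3  # assumed value, taken from the "rated time < 3" comment


def exist_n_records(ratings1, ratings2, n=CO_RATED_THRESHOLD):
    """Return True when at least n users rated both businesses."""
    return len(ratings1.keys() & ratings2.keys()) >= n


def pearson_similarity(ratings1, ratings2):
    """Pearson correlation over co-rated users; 0.0 when it is undefined."""
    common = ratings1.keys() & ratings2.keys()
    if not common:
        return 0.0
    x = [ratings1[u] for u in common]
    y = [ratings2[u] for u in common]
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    numerator = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    denominator = (sqrt(sum((a - mean_x) ** 2 for a in x))
                   * sqrt(sum((b - mean_y) ** 2 for b in y)))
    # A zero denominator means one side has constant scores on the shared users.
    return numerator / denominator if denominator else 0.0
```

Both functions work on plain dicts, so they can be checked locally without Spark before being called inside the `filter` and `map` lambdas.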
I encountered the same error with Python 3.7 and Spark 2.4.4 running locally. No combination of Spark options helped.
I was reading rows from Parquet files that were heavily skewed. They contained a binary column with values ranging from a few bytes to more than 10 MB. The resulting DataFrame had a relatively small number of partitions, even though I had set a high value for the corresponding Spark option. The number of partitions stayed close to the number of Parquet files I was reading, and I kept getting a socket timeout. I also tried setting another option to a small enough value, but the error was still there. The only thing that helped was a repartition after reading the data, to increase the number of partitions and distribute the rows more evenly. Note that this is only an observation; I still cannot explain why the error went away.
If data skew is also an issue here, it could be mitigated by changing your code to repartition the input into n partitions before the rest of the pipeline runs. The right n depends on your cluster and job characteristics, and there is a sweet spot: if n is too low you will get the socket timeout; if n is too large it will hurt performance.
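The repartition step could be sketched roughly as follows. This is a minimal illustration, not the exact code I used: `ensure_min_partitions` is a hypothetical helper name, and it only assumes the object passed in exposes `getNumPartitions()` and `repartition()`, as a PySpark RDD does.

```python
def ensure_min_partitions(rdd, n):
    """Spread `rdd` over `n` partitions when it currently has fewer.

    `rdd` is expected to be a PySpark RDD (or anything with the same
    getNumPartitions() / repartition() methods). This helper is a
    hypothetical convenience wrapper, not part of Spark.
    """
    if n < 1:
        raise ValueError("n must be a positive partition count")
    if rdd.getNumPartitions() < n:
        # repartition() shuffles the data and redistributes rows across n partitions
        return rdd.repartition(n)
    # Already at or above n partitions: leave the data where it is
    return rdd
```

In the question's code this might be applied as `input_lines = ensure_min_partitions(input_lines, n)` before the `.map(...)` chain, with n found by experiment.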