I have billions of records in HDFS. I want to load the data with Spark and then give every record a sequence number based on its sorted order. How could I write the code so that I get the most efficient performance?

Let's consider, for example, this dataset:

100.0
120.0
400.0
500.0
20.0
12.0
33.0
...
8.0
9.0
...

The result I want to get is:

0.0 1
1.0 2
5.0 3
8.0 4
9.0 5
...
27898880 2500000
27898893 2500001
....

I have tried the window function row_number() in Spark SQL, as shown below, but it cannot use multiple cores; it runs on a single core, so it is very slow.

select
    score,
    row_number() over(order by score) as sort
from
    my_score_data

1 Answer

Answer from Oli:

As you mention, a non-partitioned window is not a good idea. Indeed, not only is it single-core, but it also creates a lot of shuffle: all the data ends up in one partition, and thus on one node, which can cause an out-of-memory error (and most certainly will with large datasets).
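
For reference, the same unpartitioned window written with the DataFrame API (assuming the scores are already loaded into a DataFrame named df with a score column) has exactly the same problem, because Window.orderBy without a partitionBy forces every row into a single partition:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Unpartitioned window: all rows are shuffled to one partition before ranking.
val ranked = df.withColumn("sort", row_number().over(Window.orderBy("score")))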

There are several ways to solve this in Spark. The simplest is the RDD API: if you need consecutive indices, you can use zipWithIndex. It is much more efficient than a window; it only triggers one small Spark job (to count the records in each partition).

val rdd = sc
    .textFile("hdfs:///pathToFile/file.txt")
    .map(_.trim.toDouble) // parse the scores so the sort is numeric, not lexicographic
    .sortBy(identity)     // just in case you want sorted data
val tupleRDD = rdd.zipWithIndex // an RDD of (score, index) tuples; the index is 0-based
val stringRDD = tupleRDD.map(t => s"${t._1} ${t._2 + 1}") // build "score rank" strings, starting the rank at 1
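
If the rest of the pipeline uses Spark SQL, the tuple RDD can also be turned back into a DataFrame, or the formatted strings can simply be written back to HDFS. A minimal sketch, assuming a SparkSession named spark is in scope (the output path is illustrative):

import spark.implicits._ // needed for toDF on an RDD of tuples
val indexedDF = tupleRDD.toDF("score", "sort") // back to a DataFrame
stringRDD.saveAsTextFile("hdfs:///pathToFile/output") // or write the "score rank" lines back to HDFS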

If you don't need consecutive indices, but only unique ones (with possible gaps), you can use zipWithUniqueId() instead. It does not trigger any Spark job and will therefore be extremely efficient.
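
A minimal sketch of that variant, reusing the same hypothetical input path as above (the ids are unique but not consecutive):

val uniqueRDD = sc
    .textFile("hdfs:///pathToFile/file.txt")
    .zipWithUniqueId() // assigns each record a unique Long id without launching an extra Spark job
val uniqueStrings = uniqueRDD.map { case (score, id) => s"$score $id" } // same "value id" format as above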