I have an RDD with an ID and features. In short, I am trying to output something that matches the label ("id") with the cluster number it belongs to (0, 1, 2, etc.).

Three lines from the RDD look something like this (though it's more like 100 lines; the first item is a string and the rest are floats):

rdd = ["id1",2,12,3.4,19], ["id2",4,17,3.6,40], ["id3",5,14,2.3,47]...

I train the model on an RDD containing only the features (the id breaks the model when running it directly on the original RDD):

feature_rdd = [2,12,3.4,19], [4,17,3.6,40], [5,14,2.3,47]...

model = KMeans.train(feature_rdd, num_clusters, maxIterations=max_iterations, initializationMode=initialization_mode, seed=seed)

I predict using:

predictions = model.predict(feature_rdd)

and get an RDD that looks something like this, where each entry is the predicted cluster number for the corresponding line:

[0, 0, 1, 2, 0...]

I would like to somehow combine the id with the prediction so I can report which IDs belong to which cluster. I can't find a great way of doing this. I tried to union the two RDDs, but that just appends the predictions as extra items in the new RDD instead of pairing each prediction with each ID. I've also tried converting both to DataFrames, but had issues with the mixed casting of the variables. I'm looking for either a DataFrame like:

*******************
* id    * cluster *
*******************
* "id1" *    0    *
* "id2" *    0    *
* "id3" *    1    *
*******************

or just the pairs, exportable to a list, etc.:

["id1", 0],["id2", 1]...
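Conceptually, once the ids and predictions are collected to the driver in matching order (the values below are hypothetical stand-ins for `collect()` results), the pairing I'm after is just an element-wise zip:

```python
# Hypothetical collected results: ids from the original RDD and the
# cluster numbers from model.predict, in the same order.
ids = ["id1", "id2", "id3"]
clusters = [0, 0, 1]

# Pair each id with its predicted cluster.
id_and_cluster = [list(pair) for pair in zip(ids, clusters)]
print(id_and_cluster)  # [['id1', 0], ['id2', 0], ['id3', 1]]
```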

But any help on how to approach this problem is greatly appreciated.

1 Answer

Florian

You could use map to extract the first entry (the id) of each record in the RDD with the features, and then use zip to pair each id with its predicted cluster. Note that RDD.zip assumes both RDDs have the same number of partitions and the same number of elements in each partition, which should hold here since the predictions are derived from the same data in the same order. You can then convert the resulting RDD to a DataFrame with createDataFrame. An example is shown below; hope this helps.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = sc.parallelize([
    ('id1', 1, 2, 3),
    ('id2', 2, 3, 4),
    ('id3', 3, 4, 5)
])
predictions = sc.parallelize([1, 0, 1])

# zip the id's (first element of each entry in the rdd) and the predictions into one rdd.
id_and_predictions = data.map(lambda x: x[0]).zip(predictions)

# Convert to DataFrame
schema = StructType([
    StructField('id', StringType()),
    StructField('cluster', IntegerType())
])
df = sqlContext.createDataFrame(id_and_predictions, schema)
df.show()

Output:

+---+-------+
| id|cluster|
+---+-------+
|id1|      1|
|id2|      0|
|id3|      1|
+---+-------+