FlinkML: Joining DataSets of LabeledVector does not work


I am currently trying to join two DataSets (part of the Flink 0.10-SNAPSHOT API). Both DataSets have the same form:

predictions:
6.932018685453303E155 DenseVector(0.0, 1.4, 1437.0)

org:
2.0 DenseVector(0.0, 1.4, 1437.0)

general form:
LabeledVector(Double, DenseVector(Double,Double,Double))

What I want to create is a new DataSet[(Double,Double)] containing only the labels of the two DataSets, i.e.:

join:
6.932018685453303E155 2.0

Therefore I tried the following command:

val join = org.join(predictions).where(0).equalTo(0){
  (l, r) => (l.label, r.label)
}

But as a result 'join' is empty. Am I missing something?

Answer by Fabian Hueske:

You are joining on the label field (index 0) of the LabeledVector type, i.e., building all pairs of elements with matching labels. Your example indicates that you want to join on the vector field instead.
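To see why the label join comes back empty, here is a minimal sketch of the join semantics in plain Scala collections, without Flink. The `Labeled` case class and the `joinOn` helper are hypothetical stand-ins made up for illustration, not part of FlinkML; the values are the ones from the question.

```scala
// Hypothetical stand-in for FlinkML's LabeledVector (plain Scala, no Flink dependency).
final case class Labeled(label: Double, vector: Vector[Double])

object JoinSemantics {
  val org = Seq(Labeled(2.0, Vector(0.0, 1.4, 1437.0)))
  val predictions = Seq(Labeled(6.932018685453303E155, Vector(0.0, 1.4, 1437.0)))

  // Inner join: keep every (left, right) pair whose extracted keys are equal,
  // and emit only the two labels, as in the question's join function.
  def joinOn[K](left: Seq[Labeled], right: Seq[Labeled])(key: Labeled => K): Seq[(Double, Double)] =
    for {
      l <- left
      r <- right
      if key(l) == key(r)
    } yield (l.label, r.label)

  def main(args: Array[String]): Unit = {
    // Key = label: 2.0 never equals 6.93E155, so no pair survives.
    println(joinOn(org, predictions)(_.label))
    // Key = vector: the feature vectors match, so one pair of labels is produced.
    println(joinOn(org, predictions)(_.vector))
  }
}
```

Joining on the label only pairs elements whose labels are already identical, which is exactly what a prediction and its true value usually are not.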

However, joining on the vector field, for example by calling:

org.join(predictions).where("vector").equalTo("vector"){
  (l, r) => (l.label, r.label)
}

will not work, because Flink does not recognize DenseVector, the type of the vector field, as a key type (the same holds for all kinds of arrays).
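One possible workaround, which is my own assumption and not necessarily what the comment mentioned here suggests, is to join on a key selector function that turns the vector into a type Flink does accept as a key, such as a String. The sketch below is untested against 0.10-SNAPSHOT and relies on `DenseVector.toString` rendering all entries, as in the output shown in the question; the object name `JoinOnVectorKey` is made up for illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

object JoinOnVectorKey {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Sample data taken from the question.
    val org = env.fromElements(LabeledVector(2.0, DenseVector(0.0, 1.4, 1437.0)))
    val predictions =
      env.fromElements(LabeledVector(6.932018685453303E155, DenseVector(0.0, 1.4, 1437.0)))

    // Assumption: the string form of the vector is used as the join key,
    // because String is a supported key type while DenseVector is not.
    val joined = org.join(predictions)
      .where((lv: LabeledVector) => lv.vector.toString)
      .equalTo((lv: LabeledVector) => lv.vector.toString) {
        (l, r) => (l.label, r.label)
      }

    joined.print()
  }
}
```

This only matches elements whose feature values are exactly equal, so it fits the case where both DataSets were derived from the same input vectors.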

Till describes how to compare prediction and label values in a comment below.