How to create a custom transformer without any input column?

We have a requirement where we want to generate scores for our model using random values between 0 and 1.

To do that, we want a custom transformer that generates random numbers without any input fields.

So, can we create a transformer without input fields in MLeap?

This is how we usually create one:

import ml.combust.mleap.core.Model
import ml.combust.mleap.core.types._

case class RandomNumberModel() extends Model {
  private val rnd = scala.util.Random

  // produces a random score in [0.0, 1.0)
  def apply(): Double = rnd.nextDouble()

  // a dummy string input we would like to get rid of
  override def inputSchema: StructType = StructType("input" -> ScalarType.String).get

  override def outputSchema: StructType = StructType("output" -> ScalarType.Double).get
}
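
For reference, the model above can be exercised on its own like this (a minimal sketch; the value names are just for illustration):

val model = RandomNumberModel()
val score: Double = model()   // calls apply(), returns a random score in [0.0, 1.0)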

How can we define the model so that no input schema needs to be provided?

1 Answer

Answer from Elmar Macek:

I have never tried that, but given how I managed to get a custom transformer with multiple input fields working, here is what I would try ...

package org.apache.spark.ml.feature.mleap

import ml.combust.mleap.core.Model
import ml.combust.mleap.core.types._
import org.apache.spark.ml.linalg._

case class PropertyGroupAggregatorBaseModel(props: Array[String],
                                            aggFunc: String) extends Model {
  val outputSize = props.size

  //having multiple inputs, you will have apply with a parameter Seq[Any]
  def apply(features: Seq[Any]): Vector = {
    val properties = features(0).asInstanceOf[Seq[String]]
    val values = features(1).asInstanceOf[Seq[Double]]
    val mapping = properties.zip(values)
    val histogram = props.foldLeft(Array.empty[Double]){
      (acc, property) =>
        val newValues = mapping.filter(x => x._1 == property).map(x => x._2)
        val newAggregate = aggFunc match {
          case "sum" => newValues.sum.toDouble
          case "count" => newValues.size.toDouble
          case "avg" => (newValues.sum / Math.max(newValues.size, 1)).toDouble
        }
        acc :+ newAggregate
    }

    Vectors.dense(histogram)
  }

  override def inputSchema: StructType = {
    //here you define the input fields
    val inputFields = Seq(
      StructField("input1", ListType(BasicType.String)),
      StructField("input2", ListType(BasicType.Double))
    )
    StructType(inputFields).get
  }

  override def outputSchema: StructType = StructType(StructField("output", TensorType.Double(outputSize))).get
}
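
For illustration, here is a small usage sketch of the model above (the values are made up just to show the aggregation):

val agg = PropertyGroupAggregatorBaseModel(Array("a", "b"), "sum")
val result = agg(Seq(Seq("a", "a", "b"), Seq(1.0, 2.0, 3.0)))
// result == Vectors.dense(3.0, 3.0): 1.0 + 2.0 for "a", 3.0 for "b"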

My suggestion is that your apply might already work as it is. If you define inputSchema as follows, it might work:

override def inputSchema: StructType = {
  //here you define the input: no fields at all
  val inputFields = Seq.empty[StructField]
  StructType(inputFields).get
}
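
Putting the two pieces together, a minimal sketch of your random-score model with an empty input schema could look like the code below. This is untested; in particular it assumes that MLeap accepts a StructType built from an empty field list, so treat it as a starting point rather than a verified solution.

import ml.combust.mleap.core.Model
import ml.combust.mleap.core.types._

case class RandomNumberModel() extends Model {
  private val rnd = scala.util.Random

  // no input fields, so apply takes no arguments
  def apply(): Double = rnd.nextDouble()

  // assumption: an empty field list is accepted here
  override def inputSchema: StructType = StructType(Seq.empty[StructField]).get

  override def outputSchema: StructType = StructType("output" -> ScalarType.Double).get
}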