I am facing difficulty in generating unique sequential surrogate Keys to replace the null values in a column of a table. The table is obtained after joining a source table and reference table and the column is the primary key column ("account_key") of resultant table where null values should be replaced with unique sequence keys. The unique keys generated should start with a number assigned to the source and the number should be incremented for each null value. I tried the following
var uniqueId = Table_NO.agg(max("No"))
// var uniqueID = 6000001 (for example)
//or a SQL statement can be written as "var uniqueId = sqlContext.sql(SELECT MAX (NBR) FROM Table"
var a = sc.accumulator(uniqueID)
def generate(s:Int):Int = {
if (s==0)
{
a = a.add(1)
return a.localValue
}
else
{
return s
}
val NumGen = udf(generate(_:Int))
val KeyGen = table_n.withColumn("KEY_New", NumGen(table_n ("account_key")))
I think you can just use a windowing function, depending on your version it might look like this...