Access newly created column within withColumn

I have the following dataset:

|value|
+-----+
|    1|
|    2|
|    3|

I want to create a new column newValue that takes the value of newValue from the previous row and does something with it; for simplicity, it just increments it by 3. If there is no previous row, as for the first row, value should be taken instead. The result should look like this:

|value|newValue|
+-----+--------+
|    1|       1|
|    2|       4| # newValue previous row (1) + 3
|    3|       7| # newValue previous row (4) + 3
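
To make the recurrence concrete, here it is on a plain Scala list (not Spark, just to illustrate the computation I am after):

// newValue(0) = value(0); newValue(n) = newValue(n - 1) + 3
val values = List(1, 2, 3)
val newValues = values.tail.scanLeft(values.head)((prev, _) => prev + 3)
// newValues: List(1, 4, 7)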

I tried the following code, but it seems that the new column newValue does not yet exist when the window expression tries to read it from the previous row. How can I access the newly created column within withColumn?

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq(1, 2, 3)
val dataset: Dataset[Int] = data.toDS()
val windowSpec = Window.orderBy("value")

// fails: newValue does not exist yet when this expression is resolved
val result = dataset.withColumn("newValue", coalesce(lag("newValue", 1).over(windowSpec) + 3, $"value"))

This leads to the following error message:

org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `newValue` cannot be resolved. Did you mean one of the following? [`value`]

There are 3 answers

Answer from s.polam

I believe all you need is a running sum with the constant value 3:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(1, 2, 3, 4, 5).toDF("value")
df.show(false)
+-----+
|value|
+-----+
|1    |
|2    |
|3    |
|4    |
|5    |
+-----+
val windowSpec = Window.orderBy($"value")

df
  .withColumn(
    "newValue",
    sum(
      when(
        // every row that has a previous row contributes the constant 3
        lag($"value", 1).over(windowSpec).isNotNull,
        lit(3)
      ).otherwise($"value") // the first row contributes its own value
    ).over(windowSpec)
  )
  .show(false)

Output

+-----+--------+
|value|newValue|
+-----+--------+
|1    |1       |
|2    |4       | -- 1  + 3
|3    |7       | -- 4  + 3
|4    |10      | -- 7  + 3
|5    |13      | -- 10 + 3
+-----+--------+
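
This reproduces the recurrence because it unrolls to newValue(n) = value(1) + 3 × (n − 1): the first row adds its own value to the running sum, and every later row adds the constant 3.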

Answer from Rushikesh

I am not sure whether a window function can be used to achieve this. As a workaround, you can do the calculation by converting the DataFrame to an RDD or a list. I tested it on an example and it seems to work, but I am not sure this is an optimized approach.

df = spark.sql("select id as value, row_number() over(order by id) as rn from range(1, 4)")

firstVal = df.select("value").first()[0]  # value of the first row
dfCount = df.count()
dataList = [[firstVal, 1]]                # (newValue, rn) for the first row

# each subsequent newValue is the previous newValue + 3
for i in range(2, dfCount + 1):
    dataList.append([dataList[-1][0] + 3, i])

df2 = spark.createDataFrame(dataList, ["newValue", "rn"])

finalDF = df.join(df2, 'rn', 'inner').select("value", "newValue")
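
Since the question is in Scala, a rough equivalent of the same workaround might look like this (an untested sketch, assuming the question's dataset and a SparkSession named spark are in scope; it pulls only the first value and the row count to the driver):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = dataset.withColumn("rn", row_number().over(Window.orderBy("value")))
val firstVal = df.orderBy("rn").select("value").as[Int].head()
val n = df.count().toInt

// newValue for 1-based row i is firstVal + 3 * (i - 1)
val df2 = (1 to n).map(i => (firstVal + 3 * (i - 1), i)).toDF("newValue", "rn")

val finalDF = df.join(df2, Seq("rn"), "inner").select("value", "newValue")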

Answer from pasha701

Access to the previously aggregated value is not possible. For this particular case, newValue can be calculated as the sum of all previous values plus the number of previous rows multiplied by three:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val dataset: Dataset[Int] = Seq(1, 2, 3, 12, 43).toDS()
// all rows strictly before the current one (ordered by value)
val rangeWindow = Window.orderBy("value").rangeBetween(Window.unboundedPreceding, Window.currentRow - 1)
val formula = sum("value").over(rangeWindow) + count("*").over(rangeWindow) * 3

val result = dataset
  .withColumn("newValue", coalesce(formula, lit(1))) // first row: empty window yields null, fall back to 1

Output:

+-----+--------+
|value|newValue|
+-----+--------+
|1    |1       |
|2    |4       |
|3    |9       |
|12   |15      |
|43   |30      |
+-----+--------+
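
To check one row by hand: for value = 12 (the fourth row), the sum of the previous values is 1 + 2 + 3 = 6 and there are three previous rows, so newValue = 6 + 3 × 3 = 15, matching the output above.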