Joining 2 pyspark dataframes and continuing a running window sum and max


I have two spark dataframes

dataframe_1:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0     |0        |0    |
|2023-08-18T00:10:00.000+0000|1     |1        |1    |
|2023-08-18T00:20:00.000+0000|1     |2        |1    |
|2023-08-18T00:30:00.000+0000|0     |2        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
+----------------------------+------+---------+-----+

dataframe_2:

+----------------------------+------+
|timestamp                   |target|
+----------------------------+------+
|2023-08-18T01:10:00.000+0000|1     |
|2023-08-18T01:20:00.000+0000|1     |
|2023-08-18T01:30:00.000+0000|1     |
|2023-08-18T01:40:00.000+0000|0     |
|2023-08-18T01:50:00.000+0000|1     |
|2023-08-18T02:00:00.000+0000|0     |
+----------------------------+------+

You can see from the timestamps that both dataframes contain 10-minute aggregated data and that dataframe_2 is the next batch of data in sequence (assume this runs in real time every few hours).

I would like to concat these two dataframes but retain the window sum and window max calculations.

I'm calculating 2 columns called counter_1 and max_1. Those are just the window sum and window max.

The window I'm using is the following (though really it could span any number of rows):

window = (Window.partitionBy().orderBy("timestamp").rowsBetween(-4, 0))
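
For reference, a minimal sketch of how counter_1 and max_1 would be computed on dataframe_1 with that window (assuming the usual pyspark.sql.functions imports; the variable names are just illustrative):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = Window.partitionBy().orderBy("timestamp").rowsBetween(-4, 0)

dataframe_1 = (
    dataframe_1
    .withColumn("counter_1", F.sum("target").over(window))  # rolling sum of target over the last 5 rows
    .withColumn("max_1", F.max("target").over(window))      # rolling max of target over the last 5 rows
)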

The expected output would be this:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0     |0        |0    |
|2023-08-18T00:10:00.000+0000|1     |1        |1    |
|2023-08-18T00:20:00.000+0000|1     |2        |1    |
|2023-08-18T00:30:00.000+0000|0     |2        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+

I've tried multiple ways of grouping the data and aggregating sums and max values of the original dataframe, but I've had no luck.

EDIT 3/29/24

I forgot to mention how this should actually work from a design perspective.

The first dataframe doesn't always start at 0 for counter_1 and max_1. There could be earlier data that has already pushed counter_1 up to the window size, or max_1 up to 1.

An example of this would be this as dataframe1:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |3        |1    |
|2023-08-18T00:10:00.000+0000|1     |4        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
+----------------------------+------+---------+-----+

and this is dataframe2:

+----------------------------+------+
|timestamp                   |target|
+----------------------------+------+
|2023-08-18T01:10:00.000+0000|1     |
|2023-08-18T01:20:00.000+0000|1     |
|2023-08-18T01:30:00.000+0000|1     |
|2023-08-18T01:40:00.000+0000|0     |
|2023-08-18T01:50:00.000+0000|1     |
|2023-08-18T02:00:00.000+0000|0     |
+----------------------------+------+

If you use the solution below you get this result:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |1        |1    |
|2023-08-18T00:10:00.000+0000|1     |2        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |4        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+

This is incorrect: ideally the first dataframe's values should not be recalculated, because they were already computed. I just want to append the new data, continuing the windows.

The expected result of these two would be this:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |3        |1    |
|2023-08-18T00:10:00.000+0000|1     |4        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+

There are 2 answers

Accepted answer from Aaron Brazier:

For anyone else who finds this and has a similar problem, this is the solution I'm going with after much back and forth with Copilot and some modification to fit my use case.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, max, lit
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Continuous DataFrame Join and Window Calculations") \
    .getOrCreate()

# Define the dataframes
df1 = spark.createDataFrame([
    ("2023-08-18T00:00:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:10:00.000+0000", 1, 4, 1),
    ("2023-08-18T00:20:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:30:00.000+0000", 0, 3, 1),
    ("2023-08-18T00:40:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:50:00.000+0000", 0, 3, 1),
    ("2023-08-18T01:00:00.000+0000", 1, 3, 1)
], ["timestamp", "target", "counter_1", "max_1"])

df2 = spark.createDataFrame([
    ("2023-08-18T01:10:00.000+0000", 1),
    ("2023-08-18T01:20:00.000+0000", 1),
    ("2023-08-18T01:30:00.000+0000", 1),
    ("2023-08-18T01:40:00.000+0000", 0),
    ("2023-08-18T01:50:00.000+0000", 1),
    ("2023-08-18T02:00:00.000+0000", 0)
], ["timestamp", "target"])

# Define the window specification
window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)

# Add 'counter_1' and 'max_1' columns to df2
df2 = df2.withColumn("counter_1", lit(None))
df2 = df2.withColumn("max_1", lit(None))

# Combine the last 4 rows of df1 and all rows of df2
combined_df = df1.orderBy("timestamp", ascending=False).limit(4).union(df2).sort("timestamp")

# Calculate 'counter_1' and 'max_1' for the combined dataframe
combined_df = combined_df.withColumn("counter_1", sum("target").over(window_spec))
combined_df = combined_df.withColumn("max_1",max("target").over(window_spec))

# Replace the 'counter_1' and 'max_1' values in df2 with the calculated values
df2 = df2.drop("counter_1", "max_1")
df2 = df2.join(combined_df, ["timestamp", "target"], how="left")
# Concatenate df1 and df2
# Concatenate df1 and df2 and sort so the output comes back in timestamp order
result = df1.union(df2).orderBy("timestamp")
result.show(truncate=False)

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |3        |1    |
|2023-08-18T00:10:00.000+0000|1     |4        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+
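
One design note on the above: limit(4) works because the frame rowsBetween(-4, 0) needs exactly the 4 rows preceding each new row as seed context. If the frame size ever changes, the number of seed rows has to change with it. Here is a minimal sketch of tying the two together, reusing df1 and df2 as created at the top of this answer (lookback, df2_padded, and seed are names introduced here purely for illustration):

from pyspark.sql.functions import sum, max, lit
from pyspark.sql.window import Window

lookback = 4  # must equal the magnitude of the window's lower bound
window_spec = Window.orderBy("timestamp").rowsBetween(-lookback, 0)

# Pad df2 so it unions cleanly with df1's four columns
df2_padded = df2 \
    .withColumn("counter_1", lit(None).cast("long")) \
    .withColumn("max_1", lit(None).cast("long"))

# Seed the window with only the rows of df1 that the new rows can "see"
seed = df1.orderBy("timestamp", ascending=False).limit(lookback)

combined_df = (
    seed.union(df2_padded)
    .sort("timestamp")
    .withColumn("counter_1", sum("target").over(window_spec))
    .withColumn("max_1", max("target").over(window_spec))
)

From there, joining combined_df back onto the new timestamps and unioning with df1 proceeds exactly as above.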
Answer from Prathik Kini:
from pyspark.sql.functions import lit,sum,max
from pyspark.sql.window import Window

window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)

dataframe_2 = dataframe_2.withColumn("counter_1", lit(0)).withColumn("max_1", lit(0))
union_df = dataframe_1.union(dataframe_2)

# Calculate the running sum and max over the window
result_df = union_df \
    .withColumn("counter_1", sum("target").over(window_spec)) \
    .withColumn("max_1", max("target").over(window_spec))

result_df.show(truncate=False)

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0     |0        |0    |
|2023-08-18T00:10:00.000+0000|1     |1        |1    |
|2023-08-18T00:20:00.000+0000|1     |2        |1    |
|2023-08-18T00:30:00.000+0000|0     |2        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+