I want to fill in timestamps for a given code based on a window function in pyspark


I have a dataset, shown in the attached picture.

I want to create 3 new columns called start_time_1, start_time_2, start_time_3 so that I can record the first timestamp at which each of the codes in code1, code2, code3 appeared, based on start_timestamps.

In the attached image, the first group is id=1, so the timestamp at which M, 8, and 6 started is 2023-05-06.

So, every time M, 8, or 6 appears in code1, code2, or code3, it should be filled with this timestamp. A came a little later, at 2023-08-13, so A's timestamp will be 2023-08-13.

Similarly, in group id=2, D came first at 2023-06-07, so every time D appears in any of code1, code2, code3, the timestamp will be 2023-06-07.

How do I achieve this in PySpark? Also, please note that the columns are not sorted in ascending order, although in the example shown they happen to be.

I tried using unbounded preceding, but I was not able to achieve my result. The problem is that I cannot search across all the codes at once: my window function only searches one column, and it doesn't work when I put all the columns in at once.

1 Answer

Answer by user238607:

My approach to this problem is to build, per id, a map from each code to the minimum timestamp at which it appeared, and then use that map to populate the start_time_i columns. Below is the code.
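The map-building step can be sketched in plain Python first, to make the idea concrete before the Spark version (the row tuples below are a hand-copied subset of the example data, purely for illustration):

```python
from collections import defaultdict

# (id, start_timestamp, [code1, code2, code3]) -- subset of the example data
rows = [
    ("1", "2023-08-30", ["6", None, None]),
    ("1", "2023-08-13", ["A", "6", None]),
    ("1", "2023-05-06", ["M", "8", "6"]),
    ("2", "2023-06-07", ["D", None, None]),
    ("2", "2023-07-04", ["7", "D", None]),
]

# Per id, map each code to the earliest timestamp it appeared with.
first_seen = defaultdict(dict)
for rid, ts, codes in rows:
    for code in codes:
        if code is None:
            continue
        prev = first_seen[rid].get(code)
        if prev is None or ts < prev:  # ISO dates compare correctly as strings
            first_seen[rid][code] = ts

# Look each code column up through its id's map.
filled = [
    (rid, ts, [first_seen[rid].get(c) if c else None for c in codes])
    for rid, ts, codes in rows
]
```

The Spark code below does exactly this, but with `explode`, `groupby`/`min`, and `map_from_arrays` standing in for the Python loops.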

from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when


sc = SparkContext('local')
# Note: on Spark 3.x, SparkSession.builder.getOrCreate() is the preferred entry point
sqlContext = SQLContext(sc)

data1 = [
    ["1", "2023-08-30", "6", "6", "null", "null" ],
    ["1", "2023-08-13", "A6", "A", "6", "null"],
    ["1", "2023-08-06", "86", "8", "6", "null"],
    ["1", "2023-07-03", "6", "6", "null", "null"],
    ["1", "2023-05-06", "M86", "M", "8", "6"],
    ["2", "2023-08-10", "GA", "G", "A", "null"],
    ["2", "2023-07-04", "7D", "7", "D", "null"],
    ["2", "2023-06-07", "D", "D", "null", "null"],
]


df1Columns = ["id", "start_timestamps", "combined_code", "code1", "code2", "code3"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)


# Replace the "null" strings with real None values in every column
df1 = df1.select([when(col(c) == "null", None).otherwise(col(c)).alias(c) for c in df1.columns])
print("Showing the values")
df1.show(n=100, truncate=False)

code_cols = ["code1", "code2", "code3"]
# array_compact (Spark 3.4+) drops the nulls from the combined array
df1 = df1.withColumn("array_col", F.array_compact(F.array(code_cols)))

print("Showing the values with array_col")
df1.show(n=100, truncate=False)

df2 = (df1.select("id", "start_timestamps", "array_col")
          .withColumn("code_unique", F.explode("array_col"))
          .drop("array_col")
          .cache())
print("Showing the df2 values")
df2.show(n=100, truncate=False)

df3 = df2.groupby("id", "code_unique").agg(F.min("start_timestamps").alias("min_start_ts"))
print("Showing the df3 values")
df3.show(n=100, truncate=False)

df4 = df3.groupby("id").agg(
    F.collect_list("code_unique").alias("unique_code_list"),
    F.collect_list("min_start_ts").alias("min_start_ts_list"),
)
print("Showing the df4 values")
df4.show(n=100, truncate=False)

df5 = (df4.withColumn("per_id_map", F.map_from_arrays("unique_code_list", "min_start_ts_list"))
          .select("id", "per_id_map")
          .cache())
print("Showing the df5 values")
df5.show(n=100, truncate=False)

df_joined = df1.join(df5, on=["id"])
print("Showing the df_joined values")
df_joined.show(n=100, truncate=False)

df_mapped = df_joined.withColumn("start_time_1", F.col("per_id_map").getItem(col("code1"))) \
                .withColumn("start_time_2", F.col("per_id_map").getItem(col("code2"))) \
                .withColumn("start_time_3", F.col("per_id_map").getItem(col("code3")))

print("Showing the df_mapped values")
df_mapped.show(n=100, truncate=False)

cols_select = df1Columns + ["start_time_1", "start_time_2", "start_time_3"]

print("Showing the final required dataframe values")
df_mapped.select(cols_select).show(n=100, truncate=False)

Output:

Showing the values
+---+----------------+-------------+-----+-----+-----+
|id |start_timestamps|combined_code|code1|code2|code3|
+---+----------------+-------------+-----+-----+-----+
|1  |2023-08-30      |6            |6    |null |null |
|1  |2023-08-13      |A6           |A    |6    |null |
|1  |2023-08-06      |86           |8    |6    |null |
|1  |2023-07-03      |6            |6    |null |null |
|1  |2023-05-06      |M86          |M    |8    |6    |
|2  |2023-08-10      |GA           |G    |A    |null |
|2  |2023-07-04      |7D           |7    |D    |null |
|2  |2023-06-07      |D            |D    |null |null |
+---+----------------+-------------+-----+-----+-----+

Showing the values with array_col
+---+----------------+-------------+-----+-----+-----+---------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|
+---+----------------+-------------+-----+-----+-----+---------+
|1  |2023-08-30      |6            |6    |null |null |[6]      |
|1  |2023-08-13      |A6           |A    |6    |null |[A, 6]   |
|1  |2023-08-06      |86           |8    |6    |null |[8, 6]   |
|1  |2023-07-03      |6            |6    |null |null |[6]      |
|1  |2023-05-06      |M86          |M    |8    |6    |[M, 8, 6]|
|2  |2023-08-10      |GA           |G    |A    |null |[G, A]   |
|2  |2023-07-04      |7D           |7    |D    |null |[7, D]   |
|2  |2023-06-07      |D            |D    |null |null |[D]      |
+---+----------------+-------------+-----+-----+-----+---------+

Showing the df2 values
+---+----------------+-----------+
|id |start_timestamps|code_unique|
+---+----------------+-----------+
|1  |2023-08-30      |6          |
|1  |2023-08-13      |A          |
|1  |2023-08-13      |6          |
|1  |2023-08-06      |8          |
|1  |2023-08-06      |6          |
|1  |2023-07-03      |6          |
|1  |2023-05-06      |M          |
|1  |2023-05-06      |8          |
|1  |2023-05-06      |6          |
|2  |2023-08-10      |G          |
|2  |2023-08-10      |A          |
|2  |2023-07-04      |7          |
|2  |2023-07-04      |D          |
|2  |2023-06-07      |D          |
+---+----------------+-----------+

Showing the df3 values
+---+-----------+------------+
|id |code_unique|min_start_ts|
+---+-----------+------------+
|1  |6          |2023-05-06  |
|1  |8          |2023-05-06  |
|1  |A          |2023-08-13  |
|1  |M          |2023-05-06  |
|2  |7          |2023-07-04  |
|2  |A          |2023-08-10  |
|2  |D          |2023-06-07  |
|2  |G          |2023-08-10  |
+---+-----------+------------+

Showing the df4 values
+---+----------------+------------------------------------------------+
|id |unique_code_list|min_start_ts_list                               |
+---+----------------+------------------------------------------------+
|1  |[6, 8, A, M]    |[2023-05-06, 2023-05-06, 2023-08-13, 2023-05-06]|
|2  |[7, A, D, G]    |[2023-07-04, 2023-08-10, 2023-06-07, 2023-08-10]|
+---+----------------+------------------------------------------------+

Showing the df5 values
+---+--------------------------------------------------------------------+
|id |per_id_map                                                          |
+---+--------------------------------------------------------------------+
|1  |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|2  |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
+---+--------------------------------------------------------------------+

Showing the df_joined values
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|per_id_map                                                          |
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+
|1  |2023-08-30      |6            |6    |null |null |[6]      |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1  |2023-08-13      |A6           |A    |6    |null |[A, 6]   |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1  |2023-08-06      |86           |8    |6    |null |[8, 6]   |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1  |2023-07-03      |6            |6    |null |null |[6]      |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1  |2023-05-06      |M86          |M    |8    |6    |[M, 8, 6]|{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|2  |2023-08-10      |GA           |G    |A    |null |[G, A]   |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
|2  |2023-07-04      |7D           |7    |D    |null |[7, D]   |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
|2  |2023-06-07      |D            |D    |null |null |[D]      |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+


Showing the df_mapped values
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|per_id_map                                                          |start_time_1|start_time_2|start_time_3|
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+
|1  |2023-08-30      |6            |6    |null |null |[6]      |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06  |null        |null        |
|1  |2023-08-13      |A6           |A    |6    |null |[A, 6]   |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-08-13  |2023-05-06  |null        |
|1  |2023-08-06      |86           |8    |6    |null |[8, 6]   |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06  |2023-05-06  |null        |
|1  |2023-07-03      |6            |6    |null |null |[6]      |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06  |null        |null        |
|1  |2023-05-06      |M86          |M    |8    |6    |[M, 8, 6]|{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06  |2023-05-06  |2023-05-06  |
|2  |2023-08-10      |GA           |G    |A    |null |[G, A]   |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-08-10  |2023-08-10  |null        |
|2  |2023-07-04      |7D           |7    |D    |null |[7, D]   |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-07-04  |2023-06-07  |null        |
|2  |2023-06-07      |D            |D    |null |null |[D]      |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-06-07  |null        |null        |
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+

Showing the final required dataframe values
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+
|id |start_timestamps|combined_code|code1|code2|code3|start_time_1|start_time_2|start_time_3|
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+
|1  |2023-08-30      |6            |6    |null |null |2023-05-06  |null        |null        |
|1  |2023-08-13      |A6           |A    |6    |null |2023-08-13  |2023-05-06  |null        |
|1  |2023-08-06      |86           |8    |6    |null |2023-05-06  |2023-05-06  |null        |
|1  |2023-07-03      |6            |6    |null |null |2023-05-06  |null        |null        |
|1  |2023-05-06      |M86          |M    |8    |6    |2023-05-06  |2023-05-06  |2023-05-06  |
|2  |2023-08-10      |GA           |G    |A    |null |2023-08-10  |2023-08-10  |null        |
|2  |2023-07-04      |7D           |7    |D    |null |2023-07-04  |2023-06-07  |null        |
|2  |2023-06-07      |D            |D    |null |null |2023-06-07  |null        |null        |
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+