Identifying overlapping records based on timestamps and removing old overlaps in PySpark


This is a PySpark overlapping time period problem:

Sample data

data = [
    (1, "2024-01-28T05:00:00Z", "2024-01-28T06:00:00Z", "1/24/24"),
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T06:00:00Z", "2024-01-28T09:00:00Z", "1/24/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]

columns = ["station_id", "start_time", "end_time", "partition_date"]
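
For reference, a minimal sketch that builds a DataFrame from this sample (assuming an active SparkSession named spark; the variable name df is mine):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Timestamps stay as strings, exactly as in the sample above; these
# ISO-8601 UTC strings sort chronologically even as plain strings.
df = spark.createDataFrame(data, columns)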

I am trying to identify overlapping records, based on the start_time and end_time fields, for the same station_id. Once they are identified, I want to keep only the rows with the most recent partition_date and remove the overlapping rows that carry the older partition date.

The intended output would be:

output = [
    (1, "2024-01-28T05:30:00Z", "2024-01-28T07:00:00Z", "1/25/24"),
    (1, "2024-01-28T07:00:00Z", "2024-01-28T10:30:00Z", "1/25/24"),
    (3, "2024-01-28T12:00:00Z", "2024-01-28T13:00:00Z", "1/26/24"),
]

I have tried a few approaches, from joins to window functions, but either way I fail to achieve the desired results: identify the overlapping rows, keep only the most recent of them while removing the rest, and also keep every row that does not overlap anything. The goal is for any given time span to be covered only once per station_id, so for example 5:00 to 6:00 should have only one record for a given station_id.
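
For context, two intervals (s1, e1) and (s2, e2) overlap exactly when s1 < e2 AND s2 < e1; conditions built from fewer boundary comparisons tend to miss the case where one interval fully contains the other. A minimal self-join sketch of that predicate (the aliases a and b are mine):

from pyspark.sql import functions as F

a, b = df.alias("a"), df.alias("b")

# Two intervals overlap iff each one starts before the other ends.
# A real self-join would also need to exclude each row matching itself.
overlap_cond = ((F.col("a.station_id") == F.col("b.station_id"))
    & (F.col("a.start_time") < F.col("b.end_time"))
    & (F.col("b.start_time") < F.col("a.end_time")))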

One solution I tried:

dl_ota_airings_df_dup = dl_ota_airings_df_3.selectExpr(
    "station_id as station_id2", "start_time as start_time2",
    "end_time as end_time2", "content_id as content_id2",
    "partition_date as partition_date2")

# Overlap test: the station must match, and either boundary of the row
# must fall inside the candidate's interval.
join_condition = ((dl_ota_airings_df_3["station_id"] == dl_ota_airings_df_dup["station_id2"]) &
    (((dl_ota_airings_df_3["start_time"] >= dl_ota_airings_df_dup["start_time2"]) & (dl_ota_airings_df_3["start_time"] < dl_ota_airings_df_dup["end_time2"]))
    |
    ((dl_ota_airings_df_3["end_time"] <= dl_ota_airings_df_dup["end_time2"]) & (dl_ota_airings_df_3["end_time"] > dl_ota_airings_df_dup["start_time2"]))))
    # &
    # ((dl_ota_airings_df_3["start_time"] != dl_ota_airings_df_dup["start_time2"]) & (dl_ota_airings_df_3["end_time"] != dl_ota_airings_df_dup["end_time2"]) & (dl_ota_airings_df_3["partition_date"] != dl_ota_airings_df_dup["partition_date2"]))

df_overlapping = dl_ota_airings_df_3.join(dl_ota_airings_df_dup, join_condition, "left")

# Keep rows that overlap nothing, or that are newer than what they overlap,
# then drop the helper columns from the right side of the join.
dl_ota_airings_df_4 = (df_overlapping
    .filter("station_id2 is null or (partition_date > partition_date2)")
    .drop("station_id2", "start_time2", "end_time2", "content_id2", "partition_date2")
    .dropDuplicates())

There are always some edge cases this logic does not capture, because for every overlap within a station_id I want to keep exactly one record, the one with the most recent partition, and remove the rest. Please advise, or point me in the right direction.

1 Answer

Sasidharan:

You can try the code below to achieve this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rows for a station, ordered chronologically by start_time.
windowSpec = Window.partitionBy("station_id").orderBy("start_time")

result_df = (df
    .withColumn("prev_end_time", F.lag("end_time").over(windowSpec))
    # A row overlaps when it starts before the previous row ends.
    .withColumn("overlap", F.when(F.col("start_time") < F.col("prev_end_time"), True).otherwise(False))
    .withColumn("max_partition_date", F.max("partition_date").over(Window.partitionBy("station_id")))
    # Keep non-overlapping rows, plus overlapping rows from the latest partition.
    .filter((~F.col("overlap")) | (F.col("partition_date") == F.col("max_partition_date")))
    .select("station_id", "start_time", "end_time", "partition_date"))
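
Two caveats worth flagging for this approach: F.lag only compares each row with its immediate predecessor in start_time order, so an interval that overlaps an earlier, non-adjacent row is not flagged as overlapping; and since partition_date is a string in M/DD/YY form, F.max compares it lexicographically, which only stays correct while the dates share that exact shape. A quick way to inspect the result against the sample data (using the df built above):

result_df.orderBy("station_id", "start_time").show(truncate=False)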