How to avoid a dataframe.collect() in a PySpark query for better performance


I need to match events between two DataFrames (sources and sons). For each son event I look for the source rows with KeySource == Key within a time interval before the son's time (only the nearest match if the son must be unique). I currently use a dataframe.collect() to compare each row of the first DataFrame against the second. The code below (reproduced from memory) normally does the job, but it is very slow even for a few hundred rows. I'm new to Spark. How can I improve performance? My platform is PySpark 2.3.

Thanks!


More details:

Ev1 

| Time     | Key      | Index    |
| -------- | -------- | -------- |
| t1       | k1       | i1       |
| t2       | k2       | i2       |
| t3       | k3       | i3       |
| t4       | k1       | i4       |

Ev2

| Time     | KeySource| Index    |
| -------- | -------- | -------- |
| t1       | k1       | i1       |
| t2       | k3       | i2       |
| t3       | k1       | i3       |
| t4       | k5       | i4       |

For each Ev2 event, I look for the Ev1 rows with KeySource == Key within a time interval before Ev2.Time (only the nearest one if the son must be unique).
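
For reproducibility, here is a minimal setup for the two tables above. The concrete times and keys are illustrative stand-ins for t1..t4 / k1..k5 / i1..i4, and I assume an existing SparkSession named `spark`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative values replacing t1..t4, k1..k5, i1..i4 from the tables above
Ev1 = spark.createDataFrame(
    [(1000, "k1", "i1"), (2000, "k2", "i2"), (3000, "k3", "i3"), (4000, "k1", "i4")],
    ["Time", "Key", "Index"])
Ev2 = spark.createDataFrame(
    [(1500, "k1", "i1"), (2500, "k3", "i2"), (3500, "k1", "i3"), (4500, "k5", "i4")],
    ["Time", "KeySource", "Index"])
```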

And here is the slow loop, reproduced from memory:

```python
from pyspark.sql.functions import lit, when, desc

DeltaT_max = 30000
Unique_Son = True  # or False

Ev2 = Ev2.withColumn("tmin", Ev2.Time - lit(DeltaT_max))
Ev1 = Ev1.withColumn("Son_Index", lit(None).cast("string"))
Ev1 = Ev1.sort(desc("Time"))

Ev2_without_father = dict()

for ev2 in Ev2.select("Time", "KeySource", "Index", "tmin").collect():
    tmp_Ev1 = Ev1.filter((Ev1.Key == ev2.KeySource)
                         & (Ev1.Time >= ev2.tmin)
                         & (Ev1.Time <= ev2.Time))
    if tmp_Ev1.count() > 0:
        # I'm not sure withColumn in a loop is a good idea, but the answer is correct...
        if Unique_Son:
            # Ev1 is sorted by Time descending, so first() is the nearest event
            nearest_index = tmp_Ev1.first().Index
            Ev1 = Ev1.withColumn("Son_Index",
                                 when(Ev1.Index == nearest_index, lit(ev2.Index))
                                 .otherwise(Ev1.Son_Index))
        else:
            # mark every Ev1 row that falls inside the interval
            matched = [row.Index for row in tmp_Ev1.select("Index").collect()]
            Ev1 = Ev1.withColumn("Son_Index",
                                 when(Ev1.Index.isin(matched), lit(ev2.Index))
                                 .otherwise(Ev1.Son_Index))
    else:
        # rare case, but rebuilt into a DataFrame afterwards for CSV export
        Ev2_without_father[ev2.Index] = ev2.KeySource

Ev2 = Ev2.drop("tmin")

Result = Ev1.select("Index", "Son_Index")
```
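
For context, this is the direction I am exploring to drop the collect() entirely: a single join on the key plus the time interval, then a window function to keep only the nearest match per son. This is a rough sketch under my assumptions, not code I have validated (row_number and Window are available in PySpark 2.3):

```python
from pyspark.sql import Window
from pyspark.sql import functions as F

# Sketch of a collect()-free version: one join on the key and the
# time interval, then keep the nearest Ev1 row per Ev2 event.
joined = Ev2.alias("s").join(
    Ev1.alias("f"),
    (F.col("f.Key") == F.col("s.KeySource"))
    & (F.col("f.Time") >= F.col("s.Time") - DeltaT_max)
    & (F.col("f.Time") <= F.col("s.Time")),
    "left")  # left join keeps the Ev2 events without a father

# nearest match first: smallest time gap per Ev2 event
w = Window.partitionBy(F.col("s.Index")).orderBy(F.col("s.Time") - F.col("f.Time"))
nearest = joined.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1)
# (for Unique_Son == False I would keep all joined rows instead of rn == 1)

Result = nearest.select(F.col("f.Index").alias("Index"),
                        F.col("s.Index").alias("Son_Index"))
```

Would something like this be the right way to go, or is there a better pattern?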

