I need to match events between two dataframes (sources and sons). For each son, I look for the sources with Key == KeySource within a time interval before the son's time (only the nearest one in time if the son must be unique). I use dataframe.collect() to compare each row of one dataframe against the other dataframe. The code below (reproduced from memory) does the job, but it is very slow even for small tables (a few hundred rows). I'm new to Spark. How can I improve the performance? My platform runs PySpark 2.3.
Thanks!
More details:
Ev1 (sources)
| Time | Key | Index |
| -------- | -------- | -------- |
| t1 | k1 | i1 |
| t2 | k2 | i2 |
| t3 | k3 | i3 |
| t4 | k1 | i4 |
Ev2 (sons)
| Time | KeySource | Index |
| -------- | -------- | -------- |
| t1 | k1 | i1 |
| t2 | k3 | i2 |
| t3 | k1 | i3 |
| t4 | k5 | i4 |
For each row of Ev2, I look for the rows of Ev1 where Key == KeySource and Ev1.Time lies in the interval [Ev2.Time - DeltaT_max, Ev2.Time] (only the nearest one in time if the son must be unique).
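To make the expected result concrete, here is a minimal plain-Python sketch of the matching rule (no Spark involved). The numeric times, the son indexes `s1` to `s4`, the namedtuples and the helper name `match_sons` are made up for illustration and are not part of my real code.

```python
from collections import namedtuple

# Hypothetical row types mirroring the two tables above
Father = namedtuple("Father", ["Time", "Key", "Index"])    # a row of Ev1
Son = namedtuple("Son", ["Time", "KeySource", "Index"])    # a row of Ev2


def match_sons(fathers, sons, delta_t_max, unique_son):
    """Return (list of (father Index, son Index) pairs, sons without father)."""
    pairs = []
    without_father = {}
    for son in sons:
        # Fathers with the same key, inside [son.Time - delta_t_max, son.Time]
        candidates = [
            f for f in fathers
            if f.Key == son.KeySource
            and son.Time - delta_t_max <= f.Time <= son.Time
        ]
        # Nearest father first (largest Time that is still <= son.Time)
        candidates.sort(key=lambda f: f.Time, reverse=True)
        if not candidates:
            without_father[son.Index] = son.KeySource
        elif unique_son:
            pairs.append((candidates[0].Index, son.Index))
        else:
            pairs.extend((f.Index, son.Index) for f in candidates)
    return pairs, without_father


# Made-up sample data (times are arbitrary numbers)
ev1 = [Father(1000, "k1", "i1"), Father(2000, "k2", "i2"),
       Father(3000, "k3", "i3"), Father(20000, "k1", "i4")]
ev2 = [Son(5000, "k1", "s1"), Son(6000, "k3", "s2"),
       Son(25000, "k1", "s3"), Son(50000, "k5", "s4")]

unique_pairs, orphans = match_sons(ev1, ev2, 30000, unique_son=True)
all_pairs, _ = match_sons(ev1, ev2, 30000, unique_son=False)
```

With this data, son `s3` has two possible fathers (`i1` and `i4`): the unique mode keeps only `i4`, the nearest one, while the non-unique mode keeps both. Son `s4` has no father because no Ev1 row has key `k5`.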

    from pyspark.sql.functions import desc, lit, when

    DeltaT_max = 30000
    Unique_Son = True  # or False

    Ev2 = Ev2.withColumn("tmin", Ev2.Time - lit(DeltaT_max))
    Ev1 = Ev1.withColumn("Son_Index", lit(None))
    Ev1 = Ev1.sort(desc("Time"))
    Ev2_without_father = dict()

    # collect() brings every Ev2 row to the driver; each iteration then runs its own Spark jobs
    for ev2 in Ev2.select("Time", "KeySource", "Index", "tmin").collect():
        tmp_Ev1 = Ev1.filter((Ev1.Key == ev2.KeySource) & (Ev1.Time >= ev2.tmin) & (Ev1.Time <= ev2.Time))
        if tmp_Ev1.count() > 0:
            # I'm not sure calling withColumn here is a good idea, but the result is correct...
            if Unique_Son:
                # keep only the nearest father (Ev1 is sorted by descending Time)
                father_index = tmp_Ev1.first().Index
                Ev1 = Ev1.withColumn("Son_Index", when(Ev1.Index == father_index, lit(ev2.Index)).otherwise(Ev1.Son_Index))
            else:
                # keep every father found in the interval
                father_indexes = [row.Index for row in tmp_Ev1.select("Index").collect()]
                Ev1 = Ev1.withColumn("Son_Index", when(Ev1.Index.isin(father_indexes), lit(ev2.Index)).otherwise(Ev1.Son_Index))
        else:
            # rare case, but I rebuild a dataframe from it afterwards for the CSV export
            Ev2_without_father[ev2.Index] = ev2.KeySource

    Ev2 = Ev2.drop("tmin")
    Result = Ev1.select("Index", "Son_Index")
