dask read_csv is fast but dataframe operations are slow


I am trying to improve the speed of read_csv() and of subsequent dataframe operations in pandas 2. I tried dask today and read_csv() is indeed really fast, but the dataframe operations are slow. Why is that? How can I improve the speed of dataframe operations after switching to dask?

Thanks

Here is the speed comparison between pandas 2 and dask

  1. read_csv() using pandas 2: 172 seconds

import timeit
import pandas as pd

timer_start = timeit.default_timer()
df_pyarrow = pd.read_csv('input\\' + filename, parse_dates=True, sep='\t', engine='pyarrow')
timer_end = timeit.default_timer()

timer_seconds = timer_end - timer_start
timer_minutes = timer_seconds / 60

print(f'Time taken to read the file: {timer_seconds:.1f} seconds')
print(f'Time taken to read the file: {timer_minutes:.2f} minutes')

Time taken to read the file: 172.2 seconds
Time taken to read the file: 2.87 minutes
  2. read_csv() using dask: it only takes 4 seconds

import dask.dataframe as dd

timer_start = timeit.default_timer()
ddf = dd.read_csv('input\\' + filename, parse_dates=True, sep='\t', sample=1000000)
# ddf = dd.read_csv('input\\' + filename, parse_dates=True, sep='\t')
timer_end = timeit.default_timer()

timer_seconds = timer_end - timer_start
timer_minutes = timer_seconds / 60

print(f'Time taken to read the file: {timer_seconds:.1f} seconds')
print(f'Time taken to read the file: {timer_minutes:.2f} minutes')

Time taken to read the file: 4.1 seconds
Time taken to read the file: 0.07 minutes

Now, after getting the dataframe, I just added a new column. Using pandas 2 it takes almost 0 seconds, but using dask it takes much longer. Here is the comparison:

timer_start = timeit.default_timer()
df_pyarrow['new_col'] = 0
timer_end = timeit.default_timer()

timer_seconds = timer_end - timer_start
timer_minutes = timer_seconds / 60

print(f'Time taken to add the column: {timer_seconds:.1f} seconds')
print(f'Time taken to add the column: {timer_minutes:.2f} minutes')

Time taken to add the column: 0.0 seconds
Time taken to add the column: 0.00 minutes

Now for dask's dataframe, it takes 6 seconds to add a new column, which is even slower than read_csv(). Why is that? How can I improve the speed of dataframe operations when using dask? Thanks

timer_start = timeit.default_timer()
ddf['new_col'] = 0
timer_end = timeit.default_timer()

timer_seconds = timer_end - timer_start
timer_minutes = timer_seconds / 60

print(f'Time taken to add the column: {timer_seconds:.1f} seconds')
print(f'Time taken to add the column: {timer_minutes:.2f} minutes')

Time taken to add the column: 6.7 seconds
Time taken to add the column: 0.11 minutes
1 Answer

Answered by Paul H:

I think you've missed an important detail as to how dask works.

This doesn't read a single CSV:

ddf = dd.read_csv('input\\' + filename, parse_dates=True, sep='\t', sample=1000000)

What it does instead is build a task graph.

You can add items to this task graph by chaining on further operations, e.g. .groupby, .join, etc.

Generally speaking, none of the tasks you add will be executed until you call

df_in_memory = ddf.compute()

However, some operations have implicit calls to compute. It appears that

ddf['new_col'] = 0

...is one of them.

To compare apples to apples, time all of your operations together. By that I mean:

timer_start = timeit.default_timer()
df_pyarrow = (
    pd.read_csv('input\\' + filename, parse_dates=True, sep='\t', engine='pyarrow')
      .assign(newcol=0)
)
timer_end = timeit.default_timer()

timer_seconds = timer_end - timer_start
timer_minutes = timer_seconds / 60

print(f'Time taken to read + assign: {timer_seconds:.1f} seconds')
print(f'Time taken to read + assign: {timer_minutes:.2f} minutes')

vs

import dask.dataframe as dd

timer_start = timeit.default_timer()
df_in_memory = (
    dd.read_csv('input\\' + filename, parse_dates=True, sep='\t', sample=1000000)
      .assign(newcol=0)
).compute()
timer_end = timeit.default_timer()

timer_seconds = timer_end - timer_start
timer_minutes = timer_seconds / 60

print(f'Time taken to read + assign: {timer_seconds:.1f} seconds')
print(f'Time taken to read + assign: {timer_minutes:.2f} minutes')

Note that dask's task-graph-based approach does carry some overhead. If your production data fit in memory, I wouldn't expect much of a performance gain over pandas.