Dask: outer join read from multiple csv files

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import delayed

df1 = pd.DataFrame({'a': np.arange(10), 'b': np.random.rand()})
df1 = df1.astype({'a':np.float64})
df2 = pd.DataFrame({'a': np.random.rand(5), 'c': 1})
df1.to_csv('df1.csv')
df2.to_csv('df2.csv')
dd.read_csv('*.csv').compute()

This gives an inner-join-style result (column c from df2 is dropped):

   Unnamed: 0         a         b
0           0  0.000000  0.218319
1           1  1.000000  0.218319
2           2  2.000000  0.218319
...

And:

df1_delayed = delayed(lambda: df1)()
df2_delayed = delayed(lambda: df2)()
dd.from_delayed([df1_delayed, df2_delayed]).compute()

This gives the outer-join result I want (missing values filled with NaN):

          a         b    c
0  0.000000  0.218319  NaN
1  1.000000  0.218319  NaN
2  2.000000  0.218319  NaN
...

How can I make read_csv behave the same way?

EDIT:

Even passing a dtype schema down to pandas doesn't work:

dd.read_csv('*.csv', dtype={'a':np.float64, 'b': np.float64, 'c': np.float64}).compute()

1 Answer

Answer by MRocklin (accepted):

Generally, dask.dataframe assumes that all of the Pandas dataframes that make up the dask.dataframe have the same columns and dtypes. Behavior is ill-defined if this is not the case.

If your CSVs have different columns and dtypes, then I recommend using dask.delayed as you've done in your second example and explicitly adding the missing columns (as empty/NaN columns) before calling dask.dataframe.from_delayed.
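
A minimal sketch of that approach, using the two CSVs from the question. The load helper, the hard-coded column list, and the reindex/astype normalization are illustrative assumptions, not part of the original answer:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import delayed

all_columns = ['a', 'b', 'c']  # union of columns across both CSVs (assumed known up front)

@delayed
def load(path):
    df = pd.read_csv(path, index_col=0)
    # Add any column missing from this file as NaN so every partition
    # shares the same schema, then normalize the dtypes.
    return df.reindex(columns=all_columns).astype(np.float64)

parts = [load(p) for p in ['df1.csv', 'df2.csv']]
dd.from_delayed(parts).compute()

Because every delayed partition now produces the same columns and dtypes, from_delayed yields the outer-join-style result shown in the second example above.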