Dask Partitions or Delayed in an NLP Stanza process


I'm working on an NLP process with Stanza. Stanza takes a long time to run, and I understand that my problem is quite partitionable.

I use these libraries:

# pip install stanza
import stanza
import pandas as pd
import dask.dataframe as dd
import dask
import datetime

stanza.download('es')
nlp = stanza.Pipeline(lang='es')

I have the following data and function:

data_text = pd.DataFrame({'text': ["hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
                                   "hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
                                   "hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
                                   "hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
                                   "hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo.",
                                   "hola mi nombre es juancito y dedico mucho tiempo de mi día a estudiar. Gracias a Dios me gustan mucho las matematicas y las ciencias naturales. Este es un mensaje de ejemplo."]})

def concept_const_func(data_text_inp):
  beginning = datetime.datetime.now()

  # Data
  data_text_func = data_text_inp.reset_index(drop=True)

  # Consolidation
  df_tw_out = pd.DataFrame({'tw': ["drop"]})

  for i in range(0,len(data_text_func)):
    # Text
    tweet_test = data_text_func["text"][i]

    # NLP
    doc_review = nlp(tweet_test)

    # Main definitions
    print(i)
    for sent in doc_review.sentences:
        for dep in sent.dependencies:
            if dep[1] == 'nsubj':
              df_tw_aux = pd.DataFrame({"tw" : [dep[0].text + " " + dep[2].text]})
              df_tw_out = pd.concat([df_tw_out, df_tw_aux])

  ending = datetime.datetime.now()
  print(ending-beginning)
  return df_tw_out

When I run the code with plain pandas or with dask.delayed, I get the same result in terms of execution time.

# Just Pandas
df_pd = concept_const_func(data_text)

# Dask Delayed
df_dd = dask.delayed(concept_const_func)(data_text)
df_dd.compute()

I also tried to solve it with map_partitions(), but I couldn't get it to work correctly, mainly because the most time-consuming part of the code is the nlp() call and I can't figure out how to use Dask partitions for a process whose input is a str.

Can anyone think of an alternative that solves the problem, i.e. reduces the execution time of this code by partitioning the nlp() work?

Thank you!


There are 2 answers

Guillaume EB

I understand you're trying to apply a computation row-wise over an input dataframe. In that case, to parallelize with Dask, you could do something like:

def nlp_apply(row):
    # Text
    tweet_test = row["text"]

    # NLP
    doc_review = nlp(tweet_test)

    # Main definitions: collect every nsubj head/dependent pair in this row
    # (accumulating into a list avoids an unbound variable on rows where no
    # nsubj relation is found)
    pairs = []
    for sent in doc_review.sentences:
        for dep in sent.dependencies:
            if dep[1] == 'nsubj':
                pairs.append(dep[0].text + " " + dep[2].text)

    return pairs

# ddf is a dask dataframe version of the input; see below for creating it
res = ddf.apply(nlp_apply, axis=1, meta=('tw', 'object')).compute()

You will probably need to adapt the return value of the nlp_apply function to the output format you want.
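For example, a minimal sketch of one way to consolidate, assuming nlp_apply returns a list of pairs per row as in the version above:

# res is a pandas Series of lists after .compute(); flatten it into one row
# per "head dependent" pair and drop rows that contained no nsubj relation
df_tw_out = res.explode().dropna().to_frame(name='tw').reset_index(drop=True)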

In order to parallelize correctly, you'll also need to chunk the dask dataframe appropriately (into several partitions, see https://docs.dask.org/en/stable/dataframe-create.html), and you may need to use a Dask scheduler other than the default one (https://docs.dask.org/en/stable/scheduling.html).
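A minimal sketch of both points, assuming the nlp_apply function above (npartitions=4 and the 'processes' scheduler are illustrative choices; whether processes actually help depends on whether the Stanza pipeline can be pickled for, or loaded inside, each worker process):

import dask.dataframe as dd

# Split the input into several partitions so Dask has independent chunks to schedule
ddf = dd.from_pandas(data_text, npartitions=4)

# The default threaded scheduler can be limited by the GIL for CPU-bound work;
# the 'processes' scheduler sidesteps that, at the cost of inter-process overhead
res = ddf.apply(nlp_apply, axis=1, meta=('tw', 'object')).compute(scheduler='processes')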

John R

Your issue stems from the fact that you are creating a one-partition dask dataframe, which is essentially a pandas dataframe.

You should read your data into a dask dataframe and use the npartitions keyword argument to create an appropriate number of partitions on which dask can parallelize your function.

Delayed is not needed here; map_partitions should work fine.
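A minimal sketch of that approach, reusing concept_const_func from the question (npartitions=4 is an illustrative choice; note that the function's sentinel "drop" row would then appear once per partition):

import dask.dataframe as dd

# Build a dask dataframe with several partitions so there is work to parallelize
ddf = dd.from_pandas(data_text, npartitions=4)

# concept_const_func already takes a pandas DataFrame, so it can be mapped
# over each partition directly; meta describes the output schema
df_dd = ddf.map_partitions(concept_const_func, meta={'tw': 'object'})
result = df_dd.compute()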