I am importing a DataFrame with Spark: 12,700 rows and one column. That is quite small, but I am using it as a test before scaling up to over 100k rows. Right after importing, exporting the DataFrame (CSV or Parquet) or converting it to pandas is quick and easy. However, when I run this DataFrame through a Spark NLP pipeline (tokenizer, BertForSequenceClassification emotion model, and finally a Finisher) and then try to export or convert the pipeline's result, it is no longer fast: it takes about an hour. I don't understand why this happens, since the pipeline returns a Spark DataFrame just as before. I would appreciate hearing from anyone who has hit this problem and found a strategy to solve it. I am running Python locally. These are my PC's specs, in case they help. DELL Latitude 7420, Windows 11 Enterprise, 21H2:
- 16GB RAM
- 4 physical cores and 8 logical cores
I understand that the Spark NLP process consumes resources, but once the pipeline execution finishes, those resources should be released. Also, every operation I perform on the result variable coming out of the pipeline pins the CPU at 100%, whereas the same operations on the freshly imported data used almost no CPU.
Thank you very much.
Below is the code I am using, with comments on where the problem appears:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws
from pyspark.ml import Pipeline
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer, BertForSequenceClassification

spark = SparkSession.builder \
    .appName("Spark NLP") \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.executor.memory", "16G") \
    .config("spark.executor.cores", 3) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.1") \
    .getOrCreate()
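# Note: with master("local[*]") everything runs inside a single driver JVM,
# so the spark.executor.memory / spark.executor.cores settings above have no
# effect; only spark.driver.memory actually applies here.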
df = spark.read.csv(path, header=True, sep=';', multiLine=True, encoding='UTF-8', escape="", inferSchema=True)
df.show()
pand = df.toPandas()  # Quick test: this conversion ran in about 0.3 seconds.
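# (Hypothetical diagnostic, not part of my original run: check how the data
# is partitioned, since a single small CSV often loads as one partition.)
print(df.rdd.getNumPartitions())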
document_assembler = DocumentAssembler() \
    .setInputCol('content') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

sequenceClassifier = BertForSequenceClassification \
    .pretrained('bert_sequence_classifier_beto_emotion_analysis', 'es') \
    .setInputCols(['token', 'document']) \
    .setOutputCol('emotion') \
    .setCaseSensitive(True) \
    .setMaxSentenceLength(512)

finisher = Finisher() \
    .setInputCols('emotion') \
    .setOutputCols('emotion')
pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    sequenceClassifier,
    finisher
])
result = pipeline.fit(df).transform(df)
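# Note: transform() is lazy, so at this point the BERT model has not actually
# run yet; the heavy computation is deferred until the first action below.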
result = result.withColumn("emotion", concat_ws(",", col("emotion")))  # Flatten the array column so the output columns are plain strings
result_pd = result.toPandas()  # This takes about an hour, which seems illogical given how little data this is compared to what Spark can handle.
result.write.parquet('Result_Emotion.parquet')  # Exporting instead is just as slow, even with coalesce().
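For reference, here is a sketch of how I plan to isolate where the time goes. The cache()/count() step is my assumption about how to force the pipeline to execute once before exporting, not something from my original run:

import time

t0 = time.perf_counter()
result = result.cache()  # keep the computed result in memory
n = result.count()       # action: forces the full pipeline (BERT inference included) to run once
print(f"Materialized {n} rows in {time.perf_counter() - t0:.1f}s")

# With the result cached, the exports should only pay the transfer/write cost:
result.coalesce(1).write.mode("overwrite").parquet('Result_Emotion.parquet')
result_pd = result.toPandas()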
Versions:
- Spark NLP version: 5.1.1
- Apache Spark version: 3.2.4
- Java version: 11.0.20 (2023-07-18 LTS, Java HotSpot(TM) 64-Bit Server VM, build 11.0.20+9-LTS-256, mixed mode)