I'm using Hadoop to work on a big data project.
I can use spark to send some SQL command to Hive.
Since this process is slow, I try to write my data into Redis which is an open-source database and use spark to query my data from this database to speed up this process.
I have deployed redis server in my virtual machine, and I can use spark session to read, write and run sql command on redis by using spark-redis module.
https://github.com/RedisLabs/spark-redis
Here's my testing script. I use spark session to get table from hive and write into redis.
from pyspark.sql import SparkSession
import time
import pandas as pd
spark = SparkSession.builder \
.appName("read_and_write") \
.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
.enableHiveSupport() \
.getOrCreate()
# read table from hive
sparkDF = spark.sql("SELECT * FROM hive_table")
sparkDF.show()
# write table into redis
sparkDF.write.format("org.apache.spark.sql.redis") \
.option("table", "redis_table") \
.mode("overwrite") \
.save()
After writing process finish, I write two script to compare speed between redis and hive.
This script is to test hive:
from pyspark.sql import SparkSession
import time, json
spark = SparkSession.builder \
.appName("hive_spark_test") \
.config("hive.metastore.uris", "thrift://localhost:9083") \
.config("spark.debug.maxToStringFields", "500") \
.config("spark.sql.execution.arrow.enabled", True) \
.config("spark.sql.shuffle.partitions", 20) \
.config("spark.default.parallelism", 20) \
.config("spark.storage.memoryFraction", 0.5) \
.config("spark.shuffle.memoryFraction", 0.3) \
.config("spark.shuffle.consolidateFiles", False) \
.config("spark.shuffle.sort.bypassMergeThreshold", 200) \
.config("spark.shuffle.file.buffer", "32K") \
.config("spark.reducer.maxSizeInFlight", "48M") \
.enableHiveSupport() \
.getOrCreate()
for i in range(20):
# you can use your own sql command
sql_command = "SELECT testColumn1, SUM(testColumn2) AS testColumn2 FROM hive_table WHERE (date BETWEEN '2022-01-01' AND '2022-03-10') GROUP BY GROUPING SETS ((testColumn1))"
readDF = spark.sql(sql_command)
df_json = readDF.toJSON()
df_collect = df_json.collect()
res = [json.loads(i) for i in df_collect]
print(res)
Here's the result. Duration is 0.2s to 0.5s after few round.
enter image description here
This script is to test redis:
from pyspark.sql import SparkSession
import time, json
spark = SparkSession.builder \
.appName("redis_spark_test") \
.config("spark.redis.host", "localhost") \
.config("spark.redis.port", "6379") \
.config("spark.redis.max.pipeline.size", 200) \
.config("spark.redis.scan.count", 200) \
.config("spark.debug.maxToStringFields", "500") \
.config("spark.sql.execution.arrow.enabled", True) \
.config("spark.sql.shuffle.partitions", 20) \
.config("spark.default.parallelism", 20) \
.config("spark.storage.memoryFraction", 0.5) \
.config("spark.shuffle.memoryFraction", 0.3) \
.config("spark.shuffle.consolidateFiles", False) \
.config("spark.shuffle.sort.bypassMergeThreshold", 200) \
.config("spark.shuffle.file.buffer", "32K") \
.config("spark.reducer.maxSizeInFlight", "48M") \
.getOrCreate()
sql_command = """CREATE OR REPLACE TEMPORARY VIEW redis_table (
testColumn1 STRING,
testColumn2 INT,
testColumn3 STRING,
testColumn4 STRING,
date DATE,)
USING org.apache.spark.sql.redis OPTIONS (table 'redis_table')
"""
spark.sql(sql_command)
for i in range(20):
# you can use your own sql command
sql_command = "SELECT testColumn1, SUM(testColumn2) AS testColumn2 FROM redis_table WHERE (date BETWEEN '2022-01-01' AND '2022-03-10') GROUP BY GROUPING SETS ((testColumn1))"
readDF = spark.sql(sql_command)
df_json = readDF.toJSON()
df_collect = df_json.collect()
res = [json.loads(i) for i in df_collect]
print(res)
Here's the result. Duration is 1s to 2s after few round.
enter image description here
This result is conflicted with my survey. Redis should be faster than Hive, but I get the opposite result.
I want to know the reason and try to make Redis can run faster than Hive through Spark if that's possible.
Thank you.