I am trying to make a Spark 2 application which runs about 40–50 queries in parallel.
Each query takes about 30 s to 1 min, and to execute the queries faster I am using a thread pool.
When I run the application, most of the queries execute perfectly, but two or three queries get stuck in a reducer stage for more than one hour and then fail with a shuffle error.
I used the following code. Do you have any clue about this issue?
from pyspark.sql import SparkSession
from pyspark.sql.session import HiveWarehouseSession
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def execute_query(hive, query):
    """Run a single SQL statement through the Hive Warehouse Connector session.

    Args:
        hive: session object exposing ``executeUpdate`` (HWC session).
        query: SQL statement to execute (DML such as ``INSERT ... SELECT``).

    Returns:
        Whatever ``hive.executeUpdate(query)`` returns on success, or ``None``
        if the statement raised.  Errors are printed, not propagated, so one
        failing query does not abort the rest of the batch.
    """
    try:
        # Fix vs. original: the result was computed but discarded, so callers
        # could never distinguish success from failure.  Return it instead.
        return hive.executeUpdate(query)
    except Exception as e:
        # Best-effort: report and keep going; callers get None for failures.
        print(f"Error executing query: {e}")
        return None
def execute_queries_parallel(query_list):
    """Execute every statement in *query_list* concurrently on one Spark session.

    All queries share a single SparkSession / Hive Warehouse Connector session;
    each statement is submitted to a thread pool and allowed up to 30 minutes
    to finish.  Failures and timeouts are reported to stdout and do not abort
    the remaining queries.

    Args:
        query_list: iterable of SQL statements to run.
    """
    # Bug fixes vs. the original:
    #  * `concurrent.futures.as_completed` was referenced without importing
    #    `concurrent`, so the loop raised NameError at runtime.
    #  * `future.result(timeout=...)` was called on futures already yielded by
    #    `as_completed` — those are done, so the 30-minute timeout could never
    #    fire.  The timeout now guards the wait itself.
    #  * SparkSession / the HWC session are not context managers in Spark 2,
    #    so `with` on them raised AttributeError; use try/finally instead.
    spark = SparkSession.builder.appName("example").enableHiveSupport().getOrCreate()
    try:
        hive = HiveWarehouseSession.session(spark).build()
        with ThreadPoolExecutor() as executor:
            # Map each future back to its query text for error reporting.
            futures = {
                executor.submit(execute_query, hive, query): query
                for query in query_list
            }
            for future, query in futures.items():
                try:
                    # All futures run concurrently, so waiting on them in
                    # submission order still gives each query at least
                    # 30 minutes from the moment it was submitted.
                    future.result(timeout=30 * 60)
                except TimeoutError:
                    # NOTE(review): cancel() only prevents tasks that have not
                    # started yet; a query already running on a thread cannot
                    # be killed this way — we merely stop waiting for it.
                    print(f"Query '{query}' exceeded the time limit and was canceled.")
                    future.cancel()
                except Exception as e:
                    print(f"Error executing query '{query}': {e}")
    finally:
        # Always release the Spark session, even if submission failed.
        spark.stop()
# Example list of 30 queries (only two shown here).
query_list = [
    "INSERT INTO table1 SELECT * FROM table2",
    "INSERT INTO table1 SELECT * FROM table3",
    # ... 28 more queries ...
]

# Guard the entry point so importing this module does not fire the whole
# batch as a side effect — the queries run only when executed as a script.
if __name__ == "__main__":
    # Execute queries in parallel
    execute_queries_parallel(query_list)