Issue running Hive queries in parallel with Spark 2 using threads

18 views Asked by At

I am trying to build a Spark 2 application which runs about 40-50 queries in parallel.

Each query takes about 30 seconds to 1 minute. To execute the queries faster, I am trying to use threads.

When I run the application, most of the queries execute perfectly, but two or three queries get stuck in a Reducer process for more than one hour and then fail with a Shuffle error.

I used the following code. Do you have any idea what could be causing this issue?

from pyspark.sql import SparkSession
from pyspark.sql.session import HiveWarehouseSession
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def execute_query(hive, query):
    """Run one statement through ``hive.executeUpdate`` and return its result.

    Args:
        hive: Object exposing an ``executeUpdate(query)`` method.
        query: Statement text, passed unchanged to ``executeUpdate``.

    Returns:
        Whatever ``hive.executeUpdate(query)`` returns, or ``None`` if the
        call raised an exception.
    """
    try:
        # Hand the result back instead of binding it to an unused local, so a
        # caller running this in a thread pool can read it from the future.
        return hive.executeUpdate(query)
    except Exception as e:
        # Best-effort boundary: report the failure and keep going so that one
        # bad statement does not abort the rest of a batch.
        print(f"Error executing query: {e}")
        return None

def execute_queries_parallel(query_list, max_workers=None, timeout=30 * 60):
    """Run every query in ``query_list`` concurrently on a thread pool.

    Args:
        query_list: Iterable of statement strings; each one is submitted to
            ``execute_query`` together with the shared ``hive`` session.
        max_workers: Upper bound on concurrently running queries, forwarded to
            ``ThreadPoolExecutor``. ``None`` keeps the executor's default.
        timeout: Maximum number of seconds to wait for the whole batch to
            finish, measured from the moment waiting starts. ``None`` waits
            indefinitely.

    Returns:
        None. Failures and timeouts are reported with ``print``.

    Notes:
        A thread that is already running a query cannot be interrupted from
        Python: ``Future.cancel()`` only prevents queued work from starting.
        After a timeout, leaving the executor's ``with`` block still waits for
        the queries that are in flight.
    """
    # Function-scope import: the file imports only ThreadPoolExecutor and
    # TimeoutError from concurrent.futures, so the bare name `concurrent`
    # used previously was undefined and raised NameError.
    from concurrent.futures import as_completed

    with SparkSession.builder.appName("example").enableHiveSupport().getOrCreate() as spark:
        # NOTE(review): assumes the object returned by build() supports the
        # context-manager protocol - confirm against the connector in use.
        with HiveWarehouseSession.session(spark).build() as hive:

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Map each future back to its query text for reporting.
                futures = {executor.submit(execute_query, hive, query): query for query in query_list}

                try:
                    # The deadline must be given to as_completed(): futures it
                    # yields are already finished, so a timeout passed to
                    # result() afterwards can never trigger.
                    for future in as_completed(futures, timeout=timeout):
                        query = futures[future]
                        try:
                            future.result()
                        except Exception as e:
                            print(f"Error executing query '{query}': {e}")
                except TimeoutError:
                    for future, query in futures.items():
                        if future.done():
                            continue
                        if future.cancel():
                            # Still queued: it will never start.
                            print(f"Query '{query}' exceeded the time limit and was canceled.")
                        else:
                            # Already running: cancel() cannot stop it.
                            print(f"Query '{query}' exceeded the time limit but is still running and cannot be canceled.")

# Example list of 30 queries (only the first two are spelled out here).
# Each entry is one statement string handed to execute_queries_parallel below.
query_list = [
    "INSERT INTO table1 SELECT * FROM table2",
    "INSERT INTO table1 SELECT * FROM table3",
    # ... 28 more queries ...
]

# Execute queries in parallel.
# NOTE: this call sits at module level, so it runs as soon as the file is
# executed or imported.
execute_queries_parallel(query_list)
0

There are 0 answers