Apache Spark Aggregate functions running into ArrayIndexOutOfBoundsException during runtime

378 views Asked by At

During deployment of my java spark program to a cluster, It runs into an ArrayIndexOutOfBoundsException: 11 exception

From my understanding there is nothing syntactically wrong with how my code is written, and an indexing error doesn't clarify what's going wrong. My program is simply supposed to be able to take 12 columns, separated by spaces, From then it needs to take ONE column (the command column) and do an aggregation to see how many times each command exists i.e.

column1 column2 command column3 ect
dggd     gdegdg  cmd#1   533    ect
dggd     gdegdg  cmd#1   533    ect
dggd     gdegdg  cmd#2   534    ect
dggd     gdegdg  cmd#5   5353   ect
dggd     gdegdg  cmd#2   533    ect

will look something like

commmand    count
command#1    5
command#2    15
command#5    514

I'm running spark 2.1 HDP 2.6 here's the code I have so far

public class Main {
public static void main(String[] args) {
    //functions fu = new functions();

 JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("appName").setMaster("local[*]"));
 SparkSession spark = SparkSession
            .builder()
            .appName("Log File Reader")
            .getOrCreate();

    JavaRDD<String> logsRDD = spark.sparkContext()
            .textFile(args[0],1)
            .toJavaRDD();

    String schemaString = "date time command service responseCode bytes ip dash1 dash2 dash3 num dash4";

    List<StructField> fields = new ArrayList<>();
    String[] fieldName = schemaString.split(" ");

    for (String field : fieldName){
        fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
    }

    StructType schema = DataTypes.createStructType(fields);

    JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
       String[] attributes = record.split(" ");
       return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3],attributes[4],attributes[5],
               attributes[6],attributes[7],attributes[8],attributes[9],
               attributes[10],attributes[11]);
    });

    Dataset<Row> dataDF = spark.createDataFrame(rowRDD, schema);

    dataDF.createOrReplaceTempView("data");

    //shows the top 20 rows from the dataframe including all columns
    Dataset<Row> showDF = spark.sql("select * from data");

    //shows the top 20 columns from the same dataframe, but only displays 
    //the command column
    Dataset<Row> commandDF = spark.sql("select command from data");
    showDF.show();
    commandDF.show();

This code works fine, however when I try to find the final result using code like the following, it runs into the indexing error.

logsDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from logs group by command").show();

And finally the spark-submit code

spark-submit --class com.ect.java.Main /path/application.jar hdfs:///path/textfile.txt

In my mind I'm stuck that it's an environment issue but can't find any documentation relating to an issue related to this

1

There are 1 answers

1
abaghel On BEST ANSWER

Problem is not with aggregate function. Problem is with your log file. You are getting error

Caused by: java.lang.ArrayIndexOutOfBoundsException: 11 

Which means one of the line in your log file has 11 entries and not 12, as required by your program. You can verify this by creating a sample log.txt file and keeping two rows in that file.

Your group by code should be like below (looks like typo). In your sample application you have dataDF not the logsDF. Temp table name is data not logs.

dataDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from data group by command");
ans.show();