During deployment of my Java Spark program to a cluster, it runs into an ArrayIndexOutOfBoundsException: 11 exception.
From my understanding there is nothing syntactically wrong with how my code is written, and an indexing error doesn't clarify what's going wrong. My program is simply supposed to take 12 columns, separated by spaces. From there it needs to take ONE column (the command column) and do an aggregation to see how many times each command occurs, i.e.
column1 column2 command column3 etc.
dggd dgegdg cmd#1 533 etc.
dggd gdegdg cmd#1 533 etc.
dggd gdegdg cmd#2 534 etc.
dggd gdegdg cmd#5 5353 etc.
dggd gdegdg cmd#2 533 etc.
will look something like
command count
command#1 5
command#2 15
command#5 514
I'm running Spark 2.1 on HDP 2.6. Here's the code I have so far:
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Main {

    public static void main(String[] args) {
        //functions fu = new functions();
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("appName").setMaster("local[*]"));

        SparkSession spark = SparkSession
                .builder()
                .appName("Log File Reader")
                .getOrCreate();

        JavaRDD<String> logsRDD = spark.sparkContext()
                .textFile(args[0], 1)
                .toJavaRDD();

        String schemaString = "date time command service responseCode bytes ip dash1 dash2 dash3 num dash4";

        List<StructField> fields = new ArrayList<>();
        String[] fieldName = schemaString.split(" ");
        for (String field : fieldName) {
            fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
        }
        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
            String[] attributes = record.split(" ");
            return RowFactory.create(attributes[0], attributes[1], attributes[2], attributes[3], attributes[4],
                    attributes[5], attributes[6], attributes[7], attributes[8], attributes[9],
                    attributes[10], attributes[11]);
        });

        Dataset<Row> dataDF = spark.createDataFrame(rowRDD, schema);
        dataDF.createOrReplaceTempView("data");

        // shows the top 20 rows from the dataframe, including all columns
        Dataset<Row> showDF = spark.sql("select * from data");

        // shows the top 20 rows from the same dataframe, but only the command column
        Dataset<Row> commandDF = spark.sql("select command from data");

        showDF.show();
        commandDF.show();
    }
}
This code works fine; however, when I try to compute the final result using code like the following, it runs into the indexing error.
logsDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from logs group by command").show();
And finally, the spark-submit command:
spark-submit --class com.ect.java.Main /path/application.jar hdfs:///path/textfile.txt
My gut feeling is that it's an environment issue, but I can't find any documentation relating to a problem like this.
The problem is not with the aggregate function; the problem is with your log file. You are getting ArrayIndexOutOfBoundsException: 11, which means one of the lines in your log file has only 11 entries, not the 12 required by your program. You can verify this by creating a sample log.txt file with just two well-formed rows and running against that.
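If you want to see exactly which lines are the offenders, here is a rough sketch (reusing the logsRDD and the 12-column layout from your code) that counts and prints the malformed lines and keeps only the well-formed ones:

// Lines that do not split into exactly 12 fields will blow up the Row mapping.
// Note: String.split(" ") drops trailing empty strings, so a line ending in a
// blank column will also show up here with fewer than 12 tokens.
JavaRDD<String> badLines = logsRDD.filter(line -> line.split(" ").length != 12);
System.out.println("Malformed lines: " + badLines.count());
badLines.take(5).forEach(System.out::println);

// Keep only well-formed lines before building the Row RDD.
JavaRDD<String> goodLines = logsRDD.filter(line -> line.split(" ").length == 12);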
Your group-by code should look like the snippet below (the current version looks like a typo). In your sample application the DataFrame is dataDF, not logsDF, and the temp table name is data, not logs.
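That is, something along these lines (the same aggregation, just pointed at the DataFrame and temp view that actually exist in your program; col comes from org.apache.spark.sql.functions):

import static org.apache.spark.sql.functions.col;

// DataFrame API: group the existing dataDF by the command column
dataDF.groupBy(col("command")).count().show();

// Spark SQL: the temp view registered in your code is "data", not "logs"
spark.sql("select command, count(*) as count from data group by command").show();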