Which Spark version is compatible with Java 8 and runs without errors?


I am trying to find compatible versions of Java and Spark. I am new to Spark. I have changed versions many times and got errors every time, so I am asking here. I just want to deploy Spark locally on Windows: when I run the master and the workers, I want my code to run on the master and be distributed among the workers.

I was using Java 17 and eventually dropped down to Java 8. Now the Spark version is the problem. Most recently I downloaded Spark 2.4.8 built with Scala 2.12, still on Java 8. Now the error is about serialVersionUID:

23/09/30 14:34:16 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.io.InvalidClassException: org.apache.spark.rpc.netty.NettyRpcEndpointRef; local class incompatible: stream classdesc serialVersionUID = -4186747031772874359, local class serialVersionUID = 6257082371135760434
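As far as I understand, this error means the Spark jars on my application's classpath are not the same version as the Spark build the master is running. A minimal sketch for comparing the two (assuming the context can connect at all; the class name is mine):

    import org.apache.spark.api.java.JavaSparkContext;

    public class VersionCheck {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("spark://192.168.1.158:7077", "VersionCheck");
            // prints the driver-side version; compare it with the version
            // banner shown on the master's web UI at localhost:8080
            System.out.println("Driver-side Spark version: " + sc.version());
            sc.close();
        }
    }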

My best attempt so far was with Java 8 (as always) and Spark 2.4.6. With that version I could see on localhost:8080 that the application completed, but my application was dying with a SerializedLambda exception. I don't know why this exception occurs. I wrote other code that doesn't use lambdas and it works, but the code below throws the exception.
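From what I have read, a SerializedLambda failure on the workers usually means the executors cannot find my application's classes when deserializing the lambdas, i.e. my jar never reached them. A minimal sketch of shipping the jar explicitly (the jar path is hypothetical and depends on the build):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class DemoWithJars {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("TheAnalysis")
                    .setMaster("spark://192.168.1.158:7077")
                    // executors need the application jar to deserialize the lambdas;
                    // the path below is a placeholder for the actual built artifact
                    .setJars(new String[]{"target/demo-1.0.jar"});
            JavaSparkContext sparkContext = new JavaSparkContext(conf);
            // ... same RDD code as below ...
            sparkContext.close();
        }
    }

When submitting with spark-submit instead of running from the IDE, the application jar is shipped to the executors automatically, so setJars should not be needed there.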

Please tell me the best version combination for deploying Spark locally with real-time streaming.

Here is the code:

import java.io.Serializable;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.google.common.collect.Iterators;

import scala.Tuple2;

public class Demo implements Serializable {
    private static final long serialVersionUID = -4186747031772874359L;

    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext("spark://192.168.1.158:7077", "TheAnalysis");
        JavaRDD<String> rawDataRDD = sparkContext.textFile("src/main/resources/WorldCup/WorldCupPlayers.csv");

        // all players
        JavaRDD<Player> playersRDD = rawDataRDD.map(line -> new Player(line.split(",", -1)));

//        playersRDD.foreach(players -> System.out.println(players.getPlayerName())); // print player name

        // How many World Cup matches did Messi play?
        JavaRDD<Player> messiRDD = playersRDD.filter(player -> player.getPlayerName().equalsIgnoreCase("MESSI"));
        System.out.println("Messi played " + messiRDD.count() + " World Cup matches");

        // How many World Cup matches did each player play?
        JavaPairRDD<String, String> mapPairRDD = playersRDD.mapToPair(player -> new Tuple2<>(player.getPlayerName(), player.getMatchID()));
//        mapPairRDD.foreach(player -> System.out.println(player._1 + " -- " + player._2));

        JavaPairRDD<String, Iterable<String>> groupPlayer = mapPairRDD.groupByKey();
        groupPlayer.foreach(player -> System.out.println(player._1 + "--" + player._2));

        JavaRDD<ModifiedPlayer> modifiedPlayerRDD = groupPlayer.map(player -> new ModifiedPlayer(player._1, Iterators.size(player._2.iterator())));
        modifiedPlayerRDD.foreach(modifiedPlayer -> System.out.println(modifiedPlayer.getPlayerName() + " : " + modifiedPlayer.getMatchSize()));

        long count = modifiedPlayerRDD.count();
        System.out.println(count);

        sparkContext.close();
    }
}

I have tried a lot of Spark versions, but all of them failed.


1 Answer

Chris

Whilst I'd question why you use RDDs instead of DataFrame/Dataset, these kinds of serialisation issues are typically caused by something in the function not being serializable, even when it looks like it should be (Scala-wise, a Seq could easily be a Stream, which is not serializable despite the interface seemingly being so).
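For reference, a minimal DataFrame sketch of the same kind of read (assuming Spark 2.4.x with spark-sql on the classpath, and that the CSV has a header row with a "Player Name" column - adjust to the real schema):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DemoDataFrame {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("TheAnalysis")
                    .master("spark://192.168.1.158:7077")
                    .getOrCreate();

            // Rows are handled by Spark's own serialization, so no user class
            // needs to implement Serializable here
            Dataset<Row> players = spark.read()
                    .option("header", "true")
                    .csv("src/main/resources/WorldCup/WorldCupPlayers.csv");

            // matches per player, the equivalent of the groupByKey/Iterators.size step
            players.groupBy("Player Name").count().show();

            spark.stop();
        }
    }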

In this case perhaps Player has the issue and is not Serializable. Instead of trying messiRDD.count, try playersRDD.count and see if it also fails with the same error. If so, then Player itself is not serializable (or contains other non-serializable fields).
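For example, if Player looks roughly like this (a hypothetical sketch, since the class isn't shown; the column indices are guesses), the fix is making it, and every field it holds, Serializable:

    import java.io.Serializable;

    public class Player implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String playerName;
        private final String matchID;

        public Player(String[] fields) {
            // hypothetical column positions; adjust to the real CSV layout
            this.matchID = fields[1];
            this.playerName = fields[6];
        }

        public String getPlayerName() { return playerName; }
        public String getMatchID() { return matchID; }
    }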

If not, then you'll have to swap the lambdas for class instances, like here.
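For instance, the Messi filter could become a named class (a sketch; Spark's Java Function interface already extends Serializable, so the class only needs serializable fields):

    import org.apache.spark.api.java.function.Function;

    // replaces: playersRDD.filter(player -> player.getPlayerName().equalsIgnoreCase("MESSI"))
    public class NameFilter implements Function<Player, Boolean> {
        private final String name;

        public NameFilter(String name) {
            this.name = name;
        }

        @Override
        public Boolean call(Player player) {
            return player.getPlayerName().equalsIgnoreCase(name);
        }
    }

Used as playersRDD.filter(new NameFilter("MESSI")), which avoids the lambda serialization path entirely.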