Submitting Code from Eclipse to Run on a Spark Cluster

Spark cluster master node: 192.168.168.200

Windows host running Eclipse: 192.168.168.100

Scenario:

We want to test how code developed in Eclipse behaves when it actually runs on the Spark cluster, for example whether memory, cores, stdout, and the passing of variables all work as expected.

In production, the code developed in Eclipse is packaged into a jar, copied to the Spark cluster, and submitted with spark-submit. We could also attach a remote debugger, but then every test run would require copying the jar to the cluster machines again, which is very inconvenient. Instead, we want to submit the program to the cluster directly from Eclipse, just as spark-submit would, so that debugging stays convenient.

1. Prepare the words.txt file

words.txt:

    Hello Hadoop
    Hello BigData
    Hello Spark
    Hello Flume
    Hello Kafka

 

Upload the file to the HDFS file system, as shown in the figure:
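The upload itself was presumably done with the HDFS command-line tools. If you would rather do it from Java, a minimal sketch using Hadoop's FileSystem API could look like the following (the class name UploadWords and the local path I:/words.txt are placeholders, not part of the original article):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadWords {
        public static void main(String[] args) throws Exception {
            // Connect to the HDFS namenode on the cluster master.
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.168.200:9000"), conf);
            // Copy the local file to /test/words.txt in HDFS.
            // The local path is only an example; point it at wherever words.txt lives on the Windows host.
            fs.copyFromLocalFile(new Path("I:/words.txt"), new Path("/test/words.txt"));
            fs.close();
        }
    }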

2. Create the Spark test class

    package com.spark.test;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    public class JavaWordCount {
        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/test/words.txt");
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String line) {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });
            JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                public void call(Tuple2<String, Integer> pairs) throws Exception {
                    System.out.println(pairs._1() + ":" + pairs._2());
                }
            });
            jsc.close();
        }
    }

Log output (with local[2] the job runs locally, inside the Eclipse JVM):


Open the Spark web UI at http://192.168.168.200:8080.

It shows that the master URL is spark://master:7077. Since the Windows host may not be able to resolve the hostname "master", we use the master's IP address instead, and change

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");

to:

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");

Running it now fails with an org.apache.spark.SparkException:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 192.168.168.200): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:894)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:892)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:892)
        at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:350)
        at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
        at com.spark.test.JavaWordCount.main(JavaWordCount.java:39)
    Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

The solution found online is to point Spark at the application jar: the executors on the cluster do not have our compiled classes, so deserializing the tasks fails with the ClassCastException above. First package the program into a jar with mvn install, then register the jar path with setJars so Spark ships it to the executors:

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");
    String[] jars = { "I:\\TestSpark\\target\\TestSpark-0.0.1-jar-with-dependencies.jar" };
    sparkConf.setJars(jars);
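Because the executors load the application classes from this jar rather than from the Eclipse workspace, the jar has to be rebuilt with mvn install after every code change. A small optional sanity check like the sketch below (my own suggestion, not part of the original code; it would go just before the JavaSparkContext is created) can catch a missing or stale jar early:

    // Optional check before creating the JavaSparkContext (illustrative only).
    java.io.File appJar = new java.io.File("I:\\TestSpark\\target\\TestSpark-0.0.1-jar-with-dependencies.jar");
    if (!appJar.exists()) {
        // Remind ourselves to run "mvn install" before launching from Eclipse.
        throw new IllegalStateException("Application jar not found, run 'mvn install' first: "
                + appJar.getAbsolutePath());
    }
    sparkConf.setJars(new String[] { appJar.getAbsolutePath() });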

The final source code is as follows:

    package com.spark.test;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    public class JavaWordCount {
        public static void main(String[] args) {
            // Point the driver at the cluster master instead of local[2].
            SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://192.168.168.200:7077");
            // Ship the packaged application jar to the executors.
            String[] jars = { "I:\\TestSpark\\target\\TestSpark-0.0.1-jar-with-dependencies.jar" };
            sparkConf.setJars(jars);
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/test/words.txt");
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String line) {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
            JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });
            JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
            wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                public void call(Tuple2<String, Integer> pairs) throws Exception {
                    System.out.println(pairs._1() + ":" + pairs._2());
                }
            });
            jsc.close();
        }
    }

This runs correctly, with no errors.
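If you also want to verify the memory and core usage mentioned in the scenario, the SparkConf can be extended before the JavaSparkContext is created. The sketch below is only an illustration; the values, and the spark.driver.host setting (which may be needed so the executors can reach the driver running on the Windows machine), should be adapted to your environment:

    // Illustrative resource settings (adjust the values to your cluster):
    sparkConf.set("spark.executor.memory", "512m"); // memory per executor
    sparkConf.set("spark.cores.max", "2");          // total cores the application may use in standalone mode
    // If the executors cannot connect back to the driver running in Eclipse,
    // pinning the driver host to the Windows machine's IP can help:
    sparkConf.set("spark.driver.host", "192.168.168.100");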

Check stdout to verify that the word counts are correct:

With that, you can conveniently develop and debug your code directly from Eclipse!
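One caveat worth noting: because foreach runs on the executors, the System.out.println output ends up in each executor's stdout (visible through the worker pages of the web UI), not in the Eclipse console. If you want to see the result directly in Eclipse, a small variation such as the following sketch brings the data back to the driver first:

    // Collect the (word, count) pairs to the driver and print them in Eclipse's console.
    // Fine for small test data; avoid collect() on large result sets.
    for (Tuple2<String, Integer> entry : wordCount.collect()) {
        System.out.println(entry._1() + ":" + entry._2());
    }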

Original article: https://www.cnblogs.com/yangcx666/p/8723807.html