sparkR的一个运行的例子

   在sparkR在配置完成的基础上,本例采用Spark on yarn模式,介绍sparkR运行的一个例子。

     在spark的安装目录下,/examples/src/main/r,有一个dataframe.R文件。该文件默认是在本地的模式下运行的,不与hdfs交互。可以将脚本进行相应修改,提交到yarn模式下。

      在提交之前,要先将${SPARK_HOME}/examples/src/main/resources/people.json 文件上传到hdfs上,我上传到了hdfs://data-mining-cluster/data/bigdata_mining/sparkr/people.json 目录下。

[plain] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. dataframe.R文件内容:  
[plain] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. # Convert local data frame to a SparkR DataFrame  
  2. df <- createDataFrame(sqlContext, localDF)  
  3.   
  4. # Print its schema  
  5. printSchema(df)  
  6. # root  
  7. #  |-- name: string (nullable = true)  
  8. #  |-- age: double (nullable = true)  
  9.   
  10. # Create a DataFrame from a JSON file  
  11. #path <- file.path("hdfs://data-mining-cluster/data/bigdata_mining/sparkr/people.json")  
  12. path <- file.path("hdfs://data-mining-cluster/data/bigdata_mining/sparkr/people.json")  
  13. peopleDF <- read.json(sqlContext, path)  
  14. printSchema(peopleDF)  
  15.   
  16. # Register this DataFrame as a table.  
  17. registerTempTable(peopleDF, "people")  
  18.   
  19. # SQL statements can be run by using the sql methods provided by sqlContext  
  20. teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")  
  21.   
  22. # Call collect to get a local data.frame  
  23. teenagersLocalDF <- collect(teenagers)  
  24.   
  25. # Print the teenagers in our dataset   
  26. print(teenagersLocalDF)  
  27.   
  28. # Stop the SparkContext now  
  29. sparkR.stop()  

sparkR  --master yarn-client dataframe.R  这样就可以将任务提交到yarn上了。

     另外,有些集群会报如下错误:

      

[html] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. 16/06/16 11:40:35 ERROR RBackendHandler: json on 15 failed  
  2. Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :   
  3.   java.lang.RuntimeException: Error in configuring object  
  4.     at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)  
  5.     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)  
  6.     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)  
  7.     at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:185)  
  8.     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)  
  9.     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)  
  10.     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)  
  11.     at scala.Option.getOrElse(Option.scala:120)  
  12.     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)  
  13.     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)  
  14.     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)  
  15.     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)  
  16.     at scala.Option.getOrElse(Option.scala:120)  
  17.     at org.apache.spark.rdd.RDD.partitions(  
  18. Calls: read.json -> callJMethod -> invokeJava  

       这种情况一般是由于启用了lzo压缩导致的。可以通过--jars  添加lzo的jar包,就可以了。例如:sparkR  --master yarn-client dataframe.R --jars /usr/local/Hadoop/share/hadoop/common/hadoop-lzo-0.4.20-SNAPSHOT.jar

       

原文地址:https://www.cnblogs.com/awishfullyway/p/6632690.html