spark

spark的安装

  • 系统:Ubuntu

下载地址

tar zxf spark-2.2.1-bin-hadoop2.7.tgz
mkdir spark-2.2.1
mv spark-2.2.1-bin-hadoop2.7 spark-2.2.1
cd spark-2.2.1
ln -s spark-2.2.1-bin-hadoop2.7 spark
  • 在bash里面添加下面的命令
export SPARK_HOME=/home/yueyao/spark-2.2.1/spark
export PATH=$SPARK_HOME/bin:$PATH
  • 由于系统自带的java版本比较低,因此下载java的jdk重新进行安装
tar zxf jdk-8u131-linux-x64.tar.gz
export JAVA_HOME=/home/yueyao/java-1.8/jdk1.8.0_131/
export PATH=$JAVA_HOME/bin:$PATH
  • 写一个简单的Spark程序
#首先初始化SparkContext,导入Spark包并且创建SparkContext
from pyspark import SparkConf, SparkContext
#创建一个SparkConf 对象,设置应用的名称
conf = SparkConf().setMaster("local").setAppName("My App")
#基于SparkConf 对象创建一个SparkContext对象
sc = SparkContext(conf = conf)

  • java版本的单词数统计应用
// 创建一个Java版本的Spark Context
SparkConf conf = new SparkConf().setAppName("wordCount");
JavaSparkContext sc = new JavaSparkContext(conf)
// 读取我们输入的数据
JavaRDD<String> input = sc.textFile(inputFile);
//切分为单词
JavaRDD<String> words = input.flatMap(
    new FlatMapFunction<String,String>(){
        public Iterable<String> call(String x){
            return Arrays.asList(x.split(" "));
        }
    }
);
// 抓换位键值对并计数
JavaPairRDD<String,Integer> counts = word.mapToPair(
    new PairFunction<String, String, Integer>(){
        public Tuple2<String, Integer> call(String x){
            return new Tuple2(x,1);
        }
    }
).reduceByKey( new Function2<Integer,Integer,Integer>(){
    public Integer call(Integer x, Integer y){
        return x + y;
    }
    }
);
// 将统计出来的单词总数存入一个文本文件,引发求值
counts.saveAsTextFile(outputFile);
  • 创建一个Scala版本的单词数统计应用
//创建一个Scala版本的Spark Context
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)

//读取我们的输入数据
val input = sc.textFile(inputFile)
//把它切分成一个个单词
val words = input.flatMap(line => line.split(" "))
// 转换键值对并计数
val counts = words.map(word => (word, 1)).reduceByKey{case (x,y) => x+y}
//将统计出来的单词总数存入一个文本文件,引发求值
counts.saveAsTextFile(outputFile)
  • 创建一个python版本的wordcount,有个疑问,自己建立的hadoop小集群不能读取本地文件,只能上传到hdfs再读写文件
#导入所需要的模块
from pyspark import SparkConf, SparkContext
import os
#初始化一个sparkconf对象和一个sparkcontext对象
conf = SparkConf().setMaster("local").setAppName("Yue Yao app")
sc = SparkContext(conf = conf)
#调用hadoop命令在hdfs上建立文件夹,同时上传文件到hdfs,这一步可以忽略
os.system('hadoop fs -mkdir /user/yueyao/Spark/input/')
os.system('hadoop fs -put Test.md /user/yueyao/Spark/input')
os.system('hadoop fs -mkdir /user/yueyao/Spark/output/')
#设置输入的文件路径和输出路径
input_file = "/user/yueyao/Spark/input/Test.md"
output_file = "/user/yueyao/Spark/output/out2"

#这里是读入文件
lines = sc.textFile(input_file)
#对数据进行分割
split_file = lines.flatMap(lambda line:line.split(" "))
#对单词进行计数
word_count = split_file.map(lambda word:(word,1))
#通过reduce合并计数
total_count = word_count.reduceByKey(lambda a,b:a+b)
#输出文件,这里是写到了hdfs上面
total_count.saveAsTextFile(output_file)

#上面的步骤可以简写成下面的部分
#counts = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
#couts.saveAsTextFile(output_file)
原文地址:https://www.cnblogs.com/raisok/p/10917680.html