Spark常用算子-value数据类型的算子

package com.test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

/**
 * Value数据类型的Transformation算子
 * @author FengZhen
 *
 */
public class SparkValue {
	
	public static void main(String[] args){
		//SparkConf conf = new SparkConf().setAppName(SparkValue.class.getName()).setMaster("local[2]");
	    SparkConf conf = new SparkConf().setAppName(SparkValueTest.class.getName());
		JavaSparkContext sc = new JavaSparkContext(conf);
		// 数据
		JavaRDD<String> ds = sc.textFile("hdfs://bjqt/data/labeldata/datalabel.csv");
		/**
		 * 一、输入分区与输出分区一对一型
		 * 1、map算子
		 * 2、flatMap算子
		 * 3、mapPartitions算子
		 * 4、glom算子
		 */
		
		/**
		 * 1.map算子
		 * 将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素。
		 * 源码中 map 算子相当于初始化一个 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。
		 */
		JavaRDD<String> nameRDD = ds.map(new Function<String, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public String call(String v1) throws Exception {
				String[] values = v1.split(",");
				return values[0];
			}
		});
		List<String> nameList = nameRDD.collect();
		System.out.println(nameList);
		
		/**
		 * 2.flatMap算子
		 * 将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将生成的 RDD 的每个集合中的元素合并为一个集合,
		 * 内部创建 FlatMappedRDD(this,sc.clean(f))。
		 */
		JavaRDD<String> flatMapRDD = ds.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterator<String> call(String t) throws Exception {
				t = t.replace("‘", "");
				String[] values = t.split(",");
				List<String> result = Arrays.asList(values);
				return result.iterator();
			}
		});
		List<String> flatMapList = flatMapRDD.collect();
		System.out.println(flatMapList);
		
		/**
		 * 3.mapPartitions算子
		 * mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 行 操 作。 
		 * 内 部 实 现 是 生 成MapPartitionsRDD
		 * 做过滤
		 */
		JavaRDD<String> mapPartitionsRDD = nameRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterator<String> call(Iterator<String> t) throws Exception {
				List<String> nameList = new ArrayList<String>();
				while (t.hasNext()) {
					String name = (String) t.next();
					if (name.startsWith("丁")) {
						nameList.add(name);
					}
				}
				return nameList.iterator();
			}
		});
		List<String> mapPartitionsList = mapPartitionsRDD.collect();
		System.out.println(mapPartitionsList);
		
		/**
		 * 4.glom算子
		 * glom函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。
		 */
		JavaRDD<List<String>> glomRDD = nameRDD.glom();
		List<List<String>> glomList = glomRDD.collect();
		System.out.println(glomList);
		
		/**
		 * 二、输入分区与输出分区多对一型 
		 * 5.union算子
		 * 6.cartesian算子
		 */
		/**
		 * 5.union算子
		 *  使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,
		 *  并不进行去重操作,保存所有元素。如果想去重可以使用 distinct()。
		 *  同时 Spark 还提供更为简洁的使用 union 的 API,通过 ++ 符号相当于 union 函数操作。
		 */
		JavaRDD<String> mapPartitionsRDD1 = nameRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterator<String> call(Iterator<String> t) throws Exception {
				List<String> nameList = new ArrayList<String>();
				while (t.hasNext()) {
					String name = (String) t.next();
					if (name.startsWith("王")) {
						nameList.add(name);
					}
				}
				return nameList.iterator();
			}
		});
		JavaRDD<String> unionRDD = mapPartitionsRDD.union(mapPartitionsRDD1);
		List<String> unionList = unionRDD.collect();
		System.out.println(unionList);
		
		/**
		 * 6.cartesian算子
		 * 对两个RDD内 的 所 有 元 素 进 行 笛 卡 尔 积 操作。 操 作 后, 内 部 实 现 返 回CartesianRDD。
		 */
		 JavaPairRDD<String, String> cartesianRDD = mapPartitionsRDD.cartesian(mapPartitionsRDD1);
		 Map<String, String> cartesianMap = cartesianRDD.collectAsMap();
		 System.out.println(cartesianMap);
		 
		 
		 /**
		  * 三、输入分区与输出分区多对多型
		  * 7、grouBy算子
		  */
		 /**
		  * 7.grouBy算子
		  * groupBy :将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。
  			函数实现如下:
  				1)将用户函数预处理:
  				val cleanF = sc.clean(f)
  				2)对数据 map 进行函数操作,最后再进行 groupByKey 分组操作。
     			this.map(t => (cleanF(t), t)).groupByKey(p)
  				其中, p 确定了分区个数和分区函数,也就决定了并行化的程度。
		  */
		 JavaPairRDD<String, Iterable<String>> groupRDD = nameRDD.groupBy(new Function<String, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public String call(String v1) throws Exception {
				return v1.substring(0, 1);
			}
		 });
		 Map<String, Iterable<String>> groupMap = groupRDD.collectAsMap();
		 System.out.println(groupMap);
		 
		 /**
		  * 四、输出分区为输入分区子集型
		  * 8、filter算子
		  * 9、distinct算子
		  * 10、subtract算子
		  * 11、sample算子
		  * 12、takeSample算子
		  */
		 
		 /**
		  * 8.filter算子
		  * filter 函数功能是对元素进行过滤,对每个 元 素 应 用 f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将被过滤掉。 
		  * 内 部 实 现 相 当 于 生 成 FilteredRDD(this,sc.clean(f))。
		  * 下面代码为函数的本质实现:
		  * deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
		  */
		 JavaRDD<String> filterRDD = nameRDD.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Boolean call(String v1) throws Exception {
				if (v1.contains("二")) {
					return true;
				}
				return false;
			}
		 });
		 List<String> filterMap = filterRDD.collect();
		 System.out.println(filterMap);
		 
		 /**
		  * 9.distinct算子
		  * distinct将RDD中的元素进行去重操作
		  */
		 JavaRDD<String> repeatRDD = filterRDD.union(filterRDD);
		 List<String> repeatMap = repeatRDD.collect();
		 System.out.println(repeatMap);
		 JavaRDD<String> distinctRDD = repeatRDD.distinct();
		 List<String> distinctMap = distinctRDD.collect();
		 System.out.println(distinctMap);
		 
		 /**
		  * 10.subtract算子
		  * subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素
		  */
		 JavaRDD<String> subRDD1 = nameRDD.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Boolean call(String v1) throws Exception {
				if (v1.contains("丁") || v1.contains("齐")) {
					return true;
				}
				return false;
			}
		 });
		 JavaRDD<String> subRDD2 = nameRDD.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 1L;

				@Override
				public Boolean call(String v1) throws Exception {
					if (v1.contains("丁")) {
						return true;
					}
					return false;
				}
			 });
		 JavaRDD<String> subtractRDD = subRDD1.subtract(subRDD2);
		 List<String> subtractList = subtractRDD.collect();
		 System.out.println(subtractList);
		 
		 /**
		  * 11.sample算子
		  *  sample 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。
		  *  用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。
		  *  内部实现是生成 SampledRDD(withReplacement, fraction, seed)。
		  *  函数参数设置:
		  *  ‰   withReplacement=true,表示有放回的抽样。
		  *  ‰   withReplacement=false,表示无放回的抽样。
		  */
		 JavaRDD<String> sampleRDD = nameRDD.sample(false, 0.01, 5);
		 List<String> sampleList = sampleRDD.collect();
		 System.out.println(sampleList);
		 
		 /**
		  * 12.takeSample算子
		  * takeSample()函数和上面的sample函数是一个原理,但是不使用相对比例采样,
		  * 而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行Collect(),
		  * 返回结果的集合为单机的数组。
		  */
		 List<String> takeSampleList = nameRDD.takeSample(false, 1, 5);
		 System.out.println(takeSampleList);
		
		 sc.close();
	}
}

  

原文地址:https://www.cnblogs.com/EnzoDin/p/9254566.html