sparksql加载mongodb指定字段,并对加载进来的json做解析

如果是要读取mongo全表的数据的话,推荐使用mongo-spark,更简单方便

我个人的需求是要读取mongo的指定列,因为全表数据量太大,

并对加载进来的json数据进行解析,解析框架用的是alibaba封装的fastjson框架。

package spark_read;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;

public class Read_Mongo {

public static void main(String[] args) {
JavaSparkContext jsc = createJavaSparkContext(args);
String uri = getMongoClientURI(args);

SQLContext sqlContext = new SQLContext(jsc);

//只加载需要读取的字段,减少内存的消耗,并用sparksql的方式进行数据融合,减少资源消耗并提升开发效率以及代码简洁性
String list = "_record_id,_in_time,_src";
String[] strings = list.split(",");

//对想要获取的字段切割后,创建structFields,然后创建StructType(DF的schema)
List<StructField> structFields = new ArrayList<StructField>();
for (String s : strings) {
structFields.add(DataTypes.createStructField(s, DataTypes.StringType, true ));
}
StructType schema = DataTypes.createStructType( structFields );
//将options的配置信息存储到一个map里
Map<String, String> map = new HashMap<String, String>();
map.put("uri",uri);
map.put("database", " ");
map.put("collection"," ");
//利用上面自定义的DF的schema 和配置信息读取momngo指定表里的指定列,对读取的列声明成虚拟表,可以进一步操作,如有需要可以转换成RDD进行操作
Dataset<Row> load = sqlContext.read().schema(schema).format("com.mongodb.spark.sql").options(map).load();
load.registerTempTable("mongo");
Dataset<Row> result = sqlContext.sql("select * from mongo");

result.toJavaRDD().foreach(new VoidFunction<Row>() {
private static final long serialVersionUID = 1L;
public void call(Row arg0) throws Exception {
System.out.println(arg0.toString());
}
});


//一次性加载momgo表里的所有数据,利用alibaba封装的json转换工具转换成想要的格式
// JavaMongoRDD<Document> mongoRDD = MongoSpark.load(jsc);
// mongoRDD.foreach(new VoidFunction<Document>() {
//
// private static final long serialVersionUID = 1L;
//
// public void call(Document document) throws Exception {
//
// String data = document.toJson();
//
// JSONObject jsonObject = JSON.parseObject(data);
//
// JSONArray src = jsonObject.getJSONArray("_src");
//
// JSONObject src_obj = (JSONObject) src.get(0);
//
// System.out.println(src_obj.getString("site"));

// }
// });
}

/**
创建spark连接,并设置mongodb读写路径信息
*/
private static JavaSparkContext createJavaSparkContext(final String[] args) {
String uri = getMongoClientURI(args);
//dropDatabase(uri);
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("MongoSparkConnectorTest")
.set("spark.app.id", "MongoSparkConnectorTour")
.set("spark.mongodb.input.uri", uri)
.set("spark.mongodb.output.uri", uri);

return new JavaSparkContext(conf);
}

/**
删除mongo已存在文件
*/
private static void dropDatabase(final String connectionString) {
MongoClientURI uri = new MongoClientURI(connectionString);
new MongoClient(uri).dropDatabase(uri.getDatabase());
}

/**
获取mondo读写路径
*/
private static String getMongoClientURI(final String[] args) {
String uri;
if (args.length == 0) {
uri = "mongodb://ip:27017"; // default
} else {
uri = args[0];
}
return uri;
}

}

原文地址:https://www.cnblogs.com/hejianxin/p/8066705.html