Spark 2.x + Java：用 Spark SQL 和 Spark Streaming 统计网站的访客数（UV）

环境:
Spark 2.2.0，JDK 1.8

感觉网上关于spark2.0的java程序案例太少了,在这里补充一个,大家有好的案例也可以分享啊
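不多说，直接上代码。程序监听 localhost:9999 的 TCP 端口，每行输入的格式为“日期 用户ID”（以空格分隔），例如 `2018-01-15 user1`；测试时可先用 `nc -lk 9999` 启动一个服务端，再手动输入数据。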

/**
 * Counts website unique visitors (UV) per date with Spark Streaming and Spark SQL.
 *
 * <p>Reads text lines of the form {@code "<date> <userId>"} from a TCP socket on
 * {@code localhost:9999}. For every 10-second batch it prints the number of distinct
 * user ids per date seen in that batch.
 *
 * @author admin
 * created 2018-01-15 19:55
 */
public class DailyUvStreaming {

    public static void main(String[] args) throws InterruptedException {

        // To reduce console noise, uncomment (requires log4j Logger/Level imports):
        // Logger.getLogger("org").setLevel(Level.ERROR);

        // Local mode with 2 threads: the socket receiver occupies one, so at least
        // one more is needed to actually process the batches.
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("DailyUvStreaming");

        // Streaming context with a 10-second batch interval.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Receive text lines from a TCP socket.
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // For each batch: convert the RDD into a DataFrame and run a SQL query on it.
        lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
            @Override
            public void call(JavaRDD<String> rdd, Time time) {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());

                // Parse each line. Malformed lines (e.g. blank lines) map to null and are
                // dropped, instead of failing the whole batch with an
                // ArrayIndexOutOfBoundsException.
                JavaRDD<UserAccessLog> rowRDD = rdd
                        .map(new Function<String, UserAccessLog>() {
                            @Override
                            public UserAccessLog call(String line) {
                                return parseLine(line);
                            }
                        })
                        .filter(new Function<UserAccessLog, Boolean>() {
                            @Override
                            public Boolean call(UserAccessLog userLog) {
                                return userLog != null;
                            }
                        });

                // Build a DataFrame from the JavaBean RDD via reflection.
                Dataset<Row> dataFrame = spark.createDataFrame(rowRDD, UserAccessLog.class);

                // (Re)register the current batch as the temporary view "log".
                dataFrame.createOrReplaceTempView("log");

                // Group by date and count distinct userIds to get the visitor count.
                // NOTE(review): the view holds only the current batch, so this is a per-batch
                // distinct count per date, not a running daily UV accumulated across batches.
                Dataset<Row> result =
                        spark.sql("select date, count(distinct userId) as uv from log group by date");
                System.out.println("========= " + time + "=========");

                // show() prints at most the first 20 rows.
                result.show();
            }
        });

        // Start the streaming computation.
        jssc.start();
        try {
            // Block until the context is stopped or an error is raised by a batch.
            jssc.awaitTermination();
        } finally {
            // Always release resources, also when awaitTermination throws.
            jssc.stop(true);
        }
    }

    /**
     * Parses one input line of the form {@code "<date> <userId>"}.
     *
     * @param line raw text line; may be {@code null}
     * @return the parsed record, or {@code null} when the line has fewer than two
     *     whitespace-separated fields
     */
    private static UserAccessLog parseLine(String line) {
        if (line == null) {
            return null;
        }
        String[] cols = line.trim().split("\\s+");
        if (cols.length < 2) {
            return null;
        }
        UserAccessLog userLog = new UserAccessLog();
        userLog.setDate(cols[0]);
        userLog.setUserId(cols[1]);
        return userLog;
    }
}


/**
 * Lazily-initialized holder for the single {@link SparkSession} shared by all
 * streaming batches.
 */
class JavaSparkSessionSingleton {
    // Cached session; created on the first call to getInstance.
    private static SparkSession instance = null;

    /**
     * Returns the shared SparkSession, creating it from {@code sparkConf} on the first call.
     * Later calls ignore their argument and return the cached instance.
     *
     * <p>The method is {@code synchronized} so that concurrent first calls cannot race on the
     * unsynchronized static field.
     *
     * @param sparkConf configuration used only when the session is first created
     * @return the shared SparkSession
     */
    public static synchronized SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    // NOTE(review): presumably overrides Spark's memory check for small local
                    // test machines; confirm whether this is still needed.
                    .config("spark.testing.memory", "2147480000")
                    .getOrCreate();
        }
        return instance;
    }
}
————————————————
版权声明:本文为CSDN博主「认真起来的菜鸟」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_16038125/article/details/79074287

原文地址:https://www.cnblogs.com/javalinux/p/15065038.html