Real-Time Big Data Project (Daily Active Users) 1.6.1

Chapter 1: Real-Time Processing Module

 

 

1 Module Setup

 

 

Add the Scala framework to the module.

 

2 Code Approach

- Consume the data from Kafka.
- Use Redis to filter out devices that have already been counted toward today's daily active users.
- Save each batch's newly added daily-active records to Elasticsearch (ES).
- Query the data from ES and publish it as a data API for the visualization project to call.

 

 

 

 

 

3 Code Development: Consuming Kafka

3.1 Configuration

3.1.1  config.properties

# Kafka configuration
kafka.broker.list=hadoop1:9092,hadoop2:9092,hadoop3:9092

# Redis configuration
redis.host=hadoop1

redis.port=6379

3.1.2 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"

         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.gmall2020</groupId>
    <artifactId>gmall2020-realtime</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.4.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <kafka.version>1.0.0</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

   <dependencies>
       <dependency>
           <groupId>com.alibaba</groupId>
           <artifactId>fastjson</artifactId>
           <version>1.2.56</version>
       </dependency>



        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>

        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

       <dependency>
           <groupId>redis.clients</groupId>
           <artifactId>jedis</artifactId>
           <version>2.9.0</version>
       </dependency>


       <dependency>
           <groupId>org.apache.phoenix</groupId>
           <artifactId>phoenix-spark</artifactId>
           <version>4.14.2-HBase-1.3</version>
       </dependency>

       <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-sql_2.11</artifactId>
           <version>${spark.version}</version>
       </dependency>

       <dependency>
           <groupId>io.searchbox</groupId>
           <artifactId>jest</artifactId>
           <version>5.3.3</version>
           <exclusions>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-api</artifactId>
               </exclusion>
           </exclusions>
       </dependency>

       <dependency>
           <groupId>net.java.dev.jna</groupId>
           <artifactId>jna</artifactId>
           <version>4.5.2</version>
       </dependency>

       <dependency>
           <groupId>org.codehaus.janino</groupId>
           <artifactId>commons-compiler</artifactId>
           <version>2.7.8</version>
       </dependency>

   </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala code into class files -->
            <plugin>

                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind the goals to Maven's compile phase -->
                        <goals>

                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

 

3.2 Utility Classes

3.2.1 MyKafkaUtil

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {
  private val properties: Properties = PropertiesUtil.load("config.properties")
  val broker_list = properties.getProperty("kafka.broker.list")

  // Kafka consumer configuration
  var kafkaParam = collection.mutable.Map(
    "bootstrap.servers" -> broker_list, // addresses used to bootstrap the connection to the cluster
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // identifies which consumer group this consumer belongs to
    "group.id" -> "gmall_consumer_group",
    // used when there is no initial offset, or the current offset no longer exists on the server;
    // "latest" resets the offset to the latest available offset
    "auto.offset.reset" -> "latest",
    // if true, offsets are committed automatically in the background, but data can be lost if the job fails after a commit;
    // if false, offsets have to be maintained manually
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Create a DStream that returns the received input data
  // LocationStrategies: where consumers are scheduled for the given topics and cluster
  // LocationStrategies.PreferConsistent: distribute partitions evenly across all executors
  // ConsumerStrategies: how the Kafka consumer is created and configured on the driver and executors
  // ConsumerStrategies.Subscribe: subscribe to a collection of topics


  def getKafkaStream(topic: String,ssc:StreamingContext ): InputDStream[ConsumerRecord[String,String]]={

    val dStream = KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topic),kafkaParam ))
    dStream
  }


  def getKafkaStream(topic: String,ssc:StreamingContext,groupId:String): InputDStream[ConsumerRecord[String,String]]={
    kafkaParam("group.id")=groupId
    val dStream = KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topic),kafkaParam ))
    dStream
  }

  def getKafkaStream(topic: String,ssc:StreamingContext,offsets:Map[TopicPartition,Long],groupId:String): InputDStream[ConsumerRecord[String,String]]={
    kafkaParam("group.id")=groupId
    val dStream = KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topic),kafkaParam,offsets))
    dStream
  }
}

 

 

3.2.2 PropertiesUtil

import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {

  def main(args: Array[String]): Unit = {
        val properties: Properties = PropertiesUtil.load("config.properties")

        println(properties.getProperty("kafka.broker.list"))
  }

   def load(propertieName:String): Properties ={
     val prop=new Properties();
     prop.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName) , "UTF-8"))
     prop
   }

}

 

3.2.3 RedisUtil

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {

  var jedisPool: JedisPool = null

  def getJedisClient: Jedis = {
    if (jedisPool == null) {
      // println("creating a connection pool")
      val config = PropertiesUtil.load("config.properties")
      val host = config.getProperty("redis.host")
      val port = config.getProperty("redis.port")

      val jedisPoolConfig = new JedisPoolConfig()
      jedisPoolConfig.setMaxTotal(100)             // maximum number of connections
      jedisPoolConfig.setMaxIdle(20)               // maximum idle connections
      jedisPoolConfig.setMinIdle(20)               // minimum idle connections
      jedisPoolConfig.setBlockWhenExhausted(true)  // block when the pool is exhausted
      jedisPoolConfig.setMaxWaitMillis(500)        // maximum wait time when busy, in milliseconds
      jedisPoolConfig.setTestOnBorrow(true)        // test each connection when it is borrowed

      jedisPool = new JedisPool(jedisPoolConfig, host, port.toInt)
    }
    // println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
    // println("borrowed a connection")
    jedisPool.getResource
  }
}

 

3.2.4 log4j.properties

log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
log4j.appender.atguigu.MyConsole.target=System.out
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout    
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n

log4j.rootLogger =error,atguigu.MyConsole

 

 

 

 

3.3 Business Class: Consuming Kafka

import java.lang

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// enclosing object added here so the fragment compiles; the object name is illustrative
object DauApp {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val groupId = "GMALL_DAU_CONSUMER"
    val topic = "GMALL_START"
    val startupInputDstream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(topic, ssc)

    // parse each record's value into a JSON object
    val startLogInfoDStream: DStream[JSONObject] = startupInputDstream.map { record =>
      val startupJson: String = record.value()
      val startupJSONObj: JSONObject = JSON.parseObject(startupJson)
      val ts: lang.Long = startupJSONObj.getLong("ts")
      startupJSONObj
    }

    startLogInfoDStream.print(100)

    ssc.start()
    ssc.awaitTermination()
  }

}

 

 

 

4 Code Development 2: Deduplication

4.1 Approach

  1. Save today's newly active users (device ids) to Redis.
  2. Use the return value of that save to tell whether the user has already been recorded.

4.2 Designing the Redis Key-Value

key: dau:2019-01-22 (a Redis set, one key per day)
value: device id (mid)
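A quick way to see why this key design deduplicates (a standalone sketch; the Redis host comes from config.properties and the device id is hypothetical): SADD returns 1 the first time a member is added to the set and 0 afterwards, which is exactly the signal the filtering code in 4.3 relies on.

import redis.clients.jedis.Jedis

object DauKeyDemo {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("hadoop1", 6379)   // Redis host/port from config.properties
    val dauKey = "dau:2019-01-22"            // one set per day
    println(jedis.sadd(dauKey, "mid_001"))   // 1 -> first visit today, keep the record
    println(jedis.sadd(dauKey, "mid_001"))   // 0 -> already counted today, filter it out
    jedis.expire(dauKey, 24 * 60 * 60)       // optional: let the key expire after one day
    jedis.close()
  }
}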

 

4.3 Business Code

 

Filter with Redis:

 

 

val dauLoginfoDstream: DStream[JSONObject] = startLogInfoDStream.transform { rdd =>
  println("before dedup: " + rdd.count())
  val logInfoRdd: RDD[JSONObject] = rdd.mapPartitions { startLogInfoItr =>
    val jedis: Jedis = RedisUtil.getJedisClient
    val dauLogInfoList = new ListBuffer[JSONObject]

    val startLogList: List[JSONObject] = startLogInfoItr.toList

    for (startupJSONObj <- startLogList) {
      val ts: lang.Long = startupJSONObj.getLong("ts")
      val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
      val dauKey = "dau:" + dt
      // sadd returns 1 only the first time this mid is added for the day
      val ifFirst: lang.Long = jedis.sadd(dauKey, startupJSONObj.getJSONObject("common").getString("mid"))
      if (ifFirst == 1L) {
        dauLogInfoList += startupJSONObj
      }
    }
    jedis.close()
    dauLogInfoList.toIterator
  }
  // println("after dedup: " + logInfoRdd.count())
  logInfoRdd
}

 

5 Code Development 3: Saving to ES

5.1 Create the Index Template

PUT _template/gmall_dau_info_template
{
  "index_patterns": ["gmall_dau_info*"],
  "settings": {
    "number_of_shards": 3
  },
  "aliases" : {
    "{index}-query": {},
    "gmall_dau_info-query": {}
  },
  "mappings": {
    "_doc": {
      "properties": {
        "mid": { "type": "keyword" },
        "uid": { "type": "keyword" },
        "ar":  { "type": "keyword" },
        "ch":  { "type": "keyword" },
        "vc":  { "type": "keyword" },
        "dt":  { "type": "keyword" },
        "hr":  { "type": "keyword" },
        "mi":  { "type": "keyword" },
        "ts":  { "type": "date" }
      }
    }
  }
}
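Once created, the template can be verified from the Kibana Dev Tools console; the index patterns, aliases and mappings defined above should come back in the response:

GET _template/gmall_dau_info_template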

 

5.2 Create the Case Class

case class DauInfo(
                mid:String,
                uid:String,
                ar:String,
                ch:String,
                vc:String,
                var dt:String,
                var hr:String,
                var mi:String,
                ts:Long) {

}
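For reference, one DauInfo record written into the index defined in 5.1 would be serialized roughly as follows (the field values here are made up for illustration):

{
  "mid": "mid_001",
  "uid": "12",
  "ar": "110000",
  "ch": "xiaomi",
  "vc": "v2.1.134",
  "dt": "2020-05-14",
  "hr": "10",
  "mi": "35",
  "ts": 1589423705000
}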

 

5.3 Add Dependencies to pom.xml

<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>5.3.3</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>net.java.dev.jna</groupId>
    <artifactId>jna</artifactId>
    <version>4.5.2</version>
</dependency>

<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>

    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.4.6</version>
</dependency>

 

5.4 Business Code for Saving

Adjust the data structure:

val dauDstream: DStream[DauInfo] = dauLoginfoDstream.map { startupJsonObj =>
  val dtHr: String = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(startupJsonObj.getLong("ts")))
  val dtHrArr: Array[String] = dtHr.split(" ")
  val dt = dtHrArr(0)
  val timeArr = dtHrArr(1).split(":")
  val hr = timeArr(0)
  val mi = timeArr(1)
  val commonJSONObj: JSONObject = startupJsonObj.getJSONObject("common")
  DauInfo(commonJSONObj.getString("mid"), commonJSONObj.getString("uid"), commonJSONObj.getString("ar"),
    commonJSONObj.getString("ch"), commonJSONObj.getString("vc"), dt, hr, mi, startupJsonObj.getLong("ts"))
}

 

 

Save: write to ES:

dauDstream.foreachRDD{rdd=>
  rdd.foreachPartition{dauInfoItr=>
    val dauInfoWithIdList: List[(String, DauInfo)] = dauInfoItr.toList.map(dauInfo=>(dauInfo.dt+  "_"+dauInfo.mid,dauInfo))
    val dateStr: String = new SimpleDateFormat("yyyyMMdd").format(new Date())
    MyEsUtil.bulkInsert(dauInfoWithIdList,"gmall_dau_info_"+dateStr)


  }

}
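MyEsUtil.bulkInsert is referenced above but not listed here. A minimal sketch of what it could look like with the Jest client declared in the pom (the ES address hadoop1:9200 and the connection settings are assumptions; treat this as an outline rather than the project's exact implementation):

import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, BulkResult, Index}

object MyEsUtil {

  private var factory: JestClientFactory = null

  // lazily build a client factory pointing at the ES cluster (address assumed)
  private def build(): Unit = {
    factory = new JestClientFactory
    factory.setHttpClientConfig(new HttpClientConfig.Builder("http://hadoop1:9200")
      .multiThreaded(true)
      .maxTotalConnection(20)
      .connTimeout(10000)
      .readTimeout(10000)
      .build())
  }

  def getClient: JestClient = {
    if (factory == null) build()
    factory.getObject
  }

  // bulk-write (id, document) pairs into the given index
  def bulkInsert(sourceList: List[(String, Any)], indexName: String): Unit = {
    if (sourceList != null && sourceList.nonEmpty) {
      val jest: JestClient = getClient
      val bulkBuilder = new Bulk.Builder
      for ((id, source) <- sourceList) {
        val index: Index = new Index.Builder(source)
          .index(indexName)
          .`type`("_doc")
          .id(id)
          .build()
        bulkBuilder.addAction(index)
      }
      val result: BulkResult = jest.execute(bulkBuilder.build())
      println("saved " + result.getItems.size() + " documents to " + indexName)
    }
  }
}

Because the document id is dt_mid, re-inserting the same batch overwrites the same documents instead of double counting them, which is the idempotence relied on in section 6.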

 

 

6 Exactly-Once Consumption

6.1 Definition

Exactly-once consumption means that every message is guaranteed to be processed, and processed only once; no more, no less.

If exactly-once cannot be achieved, one of two weaker guarantees applies:

At-least-once: the data is guaranteed not to be lost, but it may be processed more than once (duplicates).

At-most-once: the data is guaranteed not to be duplicated, but it may be lost.

If both data loss and data duplication are solved at the same time, exactly-once semantics are achieved.

6.2 How the Problems Arise

When can data be lost? Suppose the real-time job is in the middle of a computation and the process crashes before the results are persisted. If the Kafka offset was already advanced before the crash, Kafka considers that data processed; after a restart the job resumes from the new offset, so the data that was never saved is lost.

When can data be duplicated? If the results have already been persisted but the process crashes before the offset is advanced, Kafka considers the data not yet consumed; after a restart the job re-reads from the old offset, so the same data is processed and persisted a second time, producing duplicates.

 

6.3 How to Solve It

Strategy 1: use a relational database transaction.

Loss and duplication both come down to the fact that committing the offset and saving the data are not atomic. If the two can be made to succeed or fail together, neither loss nor duplication can occur.

So the data write and the offset write can be placed in a single transaction: if the later step fails, the earlier one is rolled back, which gives the required atomicity.
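As a rough, hypothetical sketch of this idea in JDBC terms (not part of this project; the table names, connection URL and credentials below are made up):

import java.sql.DriverManager

import org.apache.spark.streaming.kafka010.OffsetRange

object TransactionalSinkSketch {

  // Save a batch of results and the Kafka end offsets in one MySQL transaction:
  // either both writes commit, or both roll back.
  def saveBatch(batch: Seq[(String, String)], offsetRanges: Array[OffsetRange], groupId: String): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop1:3306/gmall", "root", "123456") // assumed
    try {
      conn.setAutoCommit(false)

      val dataStmt = conn.prepareStatement("insert into dau_result(dt, mid) values(?, ?)") // hypothetical table
      for ((dt, mid) <- batch) {
        dataStmt.setString(1, dt)
        dataStmt.setString(2, mid)
        dataStmt.addBatch()
      }
      dataStmt.executeBatch()

      val offsetStmt = conn.prepareStatement(
        "replace into kafka_offset(group_id, topic, partition_id, until_offset) values(?, ?, ?, ?)") // hypothetical table
      for (range <- offsetRanges) {
        offsetStmt.setString(1, groupId)
        offsetStmt.setString(2, range.topic)
        offsetStmt.setInt(3, range.partition)
        offsetStmt.setLong(4, range.untilOffset)
        offsetStmt.addBatch()
      }
      offsetStmt.executeBatch()

      conn.commit()     // the data and the offsets succeed together
    } catch {
      case e: Exception =>
        conn.rollback() // or neither is saved
        throw e
    } finally {
      conn.close()
    }
  }
}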

Problems and limitations

The limitation is that the data then has to live in a relational database, which rules out more capable NoSQL stores. And if the data volume outgrows a single database node, spreading it across multiple nodes brings in distributed-transaction problems.

 

Strategy 2: manual offset commits + idempotent writes

As noted above, solving both data loss and data duplication amounts to exactly-once consumption.

So tackle the two problems separately.

First, data loss: the data must be saved successfully before the offset is committed, which means the commit timing has to be controlled manually.

But if the data has been saved and the process dies before the offset is committed, that data will be consumed again. The answer is to make the save idempotent: writing the same batch any number of times must not multiply the data, so saving it once and saving it a hundred times have the same effect. With idempotent writes, duplicates are no longer a worry. (In this project that is what using dt_mid as the ES document id provides: re-writing a record just overwrites the same document.)

Difficulty

That said, in real development manually committing offsets is not hard; the hard part is the idempotent write, which cannot always be guaranteed. In those cases the priority is usually to guarantee no data loss and accept that duplicates may occur, i.e. only at-least-once semantics.

 

 

6.4 Manually Committing Offsets

Workflow: read the saved offsets from Redis when the application starts, consume from those offsets, process and save each batch, and then write the batch's end offsets back to Redis.

 

Why store offsets in Redis:

Since Kafka 0.9, consumer offsets are kept in Kafka's internal __consumer_offsets topic. Managing offsets that way has one restriction: when the offset is committed, the stream's element type must not have changed, i.e. the stream must still be an InputDStream[ConsumerRecord[String, String]]. In real computations the data inevitably gets transformed, aggregated or joined, and once that happens the offsets can no longer be committed with the following call:

xxDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

So in production, offsets are usually stored externally in ZooKeeper, Redis, MySQL and so on. Here they go into a Redis hash whose key is offset:<groupId>:<topic> and whose fields map each partition to its offset.

 

The offset management class:

 

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

import scala.collection.JavaConversions._

object OffsetManager {

  /**
    * Read the offsets from Redis
    * @param groupId
    * @param topic
    * @return
    */
  def getOffset(groupId: String, topic: String): Map[TopicPartition, Long] = {

    val jedisClient: Jedis = RedisUtil.getJedisClient

    val redisOffsetMap: util.Map[String, String] = jedisClient.hgetAll("offset:" + groupId + ":" + topic)

    jedisClient.close()
    if (redisOffsetMap == null || redisOffsetMap.isEmpty) {
      null
    } else {
      val redisOffsetList: List[(String, String)] = redisOffsetMap.toList

      val kafkaOffsetList: List[(TopicPartition, Long)] = redisOffsetList.map { case (partition, offset) =>
        (new TopicPartition(topic, partition.toInt), offset.toLong)
      }
      kafkaOffsetList.toMap
    }
  }

  /**
    * Write the offsets to Redis
    * @param groupId
    * @param topic
    * @param offsetArray
    */
  def saveOffset(groupId: String, topic: String, offsetArray: Array[OffsetRange]): Unit = {

    if (offsetArray != null && offsetArray.size > 0) {
      val offsetMap: Map[String, String] = offsetArray.map { offsetRange =>
        val partition: Int = offsetRange.partition
        val untilOffset: Long = offsetRange.untilOffset
        (partition.toString, untilOffset.toString)
      }.toMap

      val jedisClient: Jedis = RedisUtil.getJedisClient

      jedisClient.hmset("offset:" + groupId + ":" + topic, offsetMap)

      jedisClient.close()
    }
  }

}

 

 

 

Reading Kafka data starting from the custom-loaded offsets:

def main(args: Array[String]): Unit = {
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  val groupId = "GMALL_DAU_CONSUMER"
  val topic = "GMALL_START"

  // read the saved offsets from Redis
  val startupOffsets: Map[TopicPartition, Long] = OffsetManager.getOffset(groupId, topic)

  // load the data starting from the saved offsets
  // (getOffset returns null when nothing has been saved yet, so fall back to the default start position)
  val startupInputDstream: InputDStream[ConsumerRecord[String, String]] =
    if (startupOffsets != null && startupOffsets.nonEmpty) {
      MyKafkaUtil.getKafkaStream(topic, ssc, startupOffsets, groupId)
    } else {
      MyKafkaUtil.getKafkaStream(topic, ssc, groupId)
    }

  // capture the end offsets of each batch
  var startupOffsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]

  val startupInputGetOffsetDstream: DStream[ConsumerRecord[String, String]] = startupInputDstream.transform { rdd =>
    startupOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd
  }

  val startLogInfoDStream: DStream[JSONObject] = startupInputGetOffsetDstream.map { record =>
    val startupJson: String = record.value()
    val startupJSONObj: JSONObject = JSON.parseObject(startupJson)
    val ts: lang.Long = startupJSONObj.getLong("ts")
    startupJSONObj
  }
  startLogInfoDStream.print(100)

…..

…..

…..

…..

 

dauDstream.foreachRDD { rdd =>
  rdd.foreachPartition { dauInfoItr =>

    // observe the offset range handled by this partition (for debugging)
    if (startupOffsetRanges != null && startupOffsetRanges.size > 0) {
      val offsetRange: OffsetRange = startupOffsetRanges(TaskContext.get().partitionId())
      println("from:" + offsetRange.fromOffset + " --- to:" + offsetRange.untilOffset)
    }

    val dauInfoWithIdList: List[(String, DauInfo)] = dauInfoItr.toList.map(dauInfo => (dauInfo.dt + "_" + dauInfo.mid, dauInfo))
    val dateStr: String = new SimpleDateFormat("yyyyMMdd").format(new Date())
    MyEsUtil.bulkInsert(dauInfoWithIdList, "gmall_dau_info_" + dateStr)
  }

  // commit the offsets only after the data has been saved
  OffsetManager.saveOffset(groupId, topic, startupOffsetRanges)

  // once the stream has been transformed, offsets can no longer be committed like this:
  // dauDstream.asInstanceOf[CanCommitOffsets].commitAsync(startupOffsetRanges)
}

 

…..

…..

Chapter 2: DAU Data Query API

 

1 Access Paths

Total:

http://publisher:8070/realtime-total?date=2019-02-01

Hourly breakdown:

http://publisher:8070/realtime-hour?id=dau&date=2019-02-01

 

2 Required Data Format

Total:

[{"id":"dau","name":"新增日活","value":1200},

{"id":"new_mid","name":"新增设备","value":233} ]

Hourly breakdown:

{"yesterday":{"11":383,"12":123,"17":88,"19":200 },

"today":{"12":38,"13":1233,"17":123,"19":688 }}

 

3 Set Up the Publisher Project


4 Configuration Files

4.1  pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>gmall2019_dw</artifactId>
    <groupId>com.atguigu.gmall2019.dw</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <groupId>com.atguigu.gmall2019.dw.publisher</groupId>
  <artifactId>dw-publisher</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>dw-publisher</name>
  <description>Demo project for Spring Boot</description>

  <properties>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>


    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>

    <dependency>


        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>

    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
    <dependency>

        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>2.4.6</version>
    </dependency>



    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
       <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>
    </plugins>
  </build>

</project>
 

 

 

4.2 application.properties

server.port=8070

 

logging.level.root=error
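For the @Autowired JestClient in the service implementation below to reach the cluster, the Jest connection normally also has to be configured here; with Spring Boot's Jest auto-configuration that would be a line along these lines (the host is an assumption based on the cluster used in Chapter 1):

spring.elasticsearch.jest.uris=http://hadoop1:9200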

 



 

 

5 Code

5.1 Code Structure

Controller layer
    PublisherController: publishes the query API as web endpoints

Service layer
    EsService: interface for the data query business logic
    EsServiceImpl: implementation class for the queries

Main program
    GmallPublisherApplication: add package scanning


5.2 Controller

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall2019.dw.publisher.service.EsService;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

@RestController
public class PublisherController {

  @Autowired
EsService esService;

//@RequestMapping(value = "realtime-total" ,method = RequestMethod.GET)
@GetMapping("realtime-total")

public String realtimeTotal(@RequestParam("date") String dt){
    List<Map<String,Object>>  rsList=new ArrayList<>();

    Map<String,Object> dauMap = new HashMap();
    dauMap.put("id","dau");
    dauMap.put("name","新增日活");
    Long dauTotal = esService.getDauTotal(dt);
    if(dauTotal!=null){
        dauMap.put("value",dauTotal);
    }else {
        dauMap.put("value",0L);
    }

    rsList.add(dauMap);

    Map<String,Object> newMidMap = new HashMap();
    newMidMap.put("id","new_mid");
    newMidMap.put("name","新增设备");
    newMidMap.put("value",233);
    rsList.add(newMidMap);

    return  JSON.toJSONString(rsList);
}


@GetMapping("realtime-hour")
public String realtimeHour(@RequestParam(value = "id",defaultValue ="-1" ) String id ,@RequestParam("date") String dt ){
    if(id.equals("dau")){
        Map<String,Map> hourMap=new HashMap<>();
        Map dauHourTdMap = esService.getDauHour(dt);
        hourMap.put("today",dauHourTdMap);
        String yd = getYd(dt);
        Map dauHourYdMap = esService.getDauHour(yd);
        hourMap.put("yesterday",dauHourYdMap);
        return JSON.toJSONString(hourMap);
    }
    return null;
}

private  String getYd(String td){
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    String yd=null;
    try {
        Date tdDate = dateFormat.parse(td);
        Date ydDate = DateUtils.addDays(tdDate, -1);
        yd=dateFormat.format(ydDate);
    } catch (ParseException e) {
        e.printStackTrace();
        throw new RuntimeException("日期格式转变失败");
    }
    return yd;
}

}

 

 

 

5.3 Service Interface

import java.util.Map;

public interface EsService {

    // query the DAU total for a given date
    public   Long getDauTotal(String date);

    // query the hourly DAU breakdown for a given date
    public   Map getDauHour(String date);

}

 

 

 

5.4 Service Layer Implementation

import io.searchbox.client.JestClient;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.TermsAggregation;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class EsServiceImpl implements EsService {


    public static void main(String[] args) {
        new EsServiceImpl().getDauTotal("2020-05-14");
    }

    @Autowired
    JestClient jestClient;

    @Override
    public Long getDauTotal(String date) {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(new MatchAllQueryBuilder());

        String query = searchSourceBuilder.toString();
        date = date.replace("-", "");
        // matches the index naming used by the streaming job plus the "-query" alias from the template
        String indexName = "gmall_dau_info_" + date + "-query";
        Search search = new Search.Builder(query).addIndex(indexName).addType("_doc").build();
        Long total = 0L;
        try {
            SearchResult searchResult = jestClient.execute(search);
            if (searchResult.getTotal() != null) {
                total = searchResult.getTotal();
            }
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("ES query failed");
        }
        return total;
    }

    @Override
    public Map getDauHour(String date) {
        String indexName = "gmall_dau_info_" + date.replace("-", "") + "-query";
        // build the query: a terms aggregation grouped by hour
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        TermsBuilder aggBuilder = AggregationBuilders.terms("groupby_hr").field("hr").size(24);
        searchSourceBuilder.aggregation(aggBuilder);

        Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType("_doc").build();
        try {
            SearchResult searchResult = jestClient.execute(search);
            // package the results
            Map<String, Long> aggMap = new HashMap<>();

            if (searchResult.getAggregations().getTermsAggregation("groupby_hr") != null) {
                List<TermsAggregation.Entry> buckets = searchResult.getAggregations().getTermsAggregation("groupby_hr").getBuckets();
                for (TermsAggregation.Entry bucket : buckets) {
                    aggMap.put(bucket.getKey(), bucket.getCount());
                }
            }
            return aggMap;
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException("ES query failed");
        }
    }
}

 


6 Set Up the Visualization Project and Connect It

Copy in the front-end project and start the main program.

 

 

Chapter 3: Building Data Visualization with Kibana

If the data is stored in Elasticsearch, visualizing it with Kibana is a very efficient solution.

The advantage of the Kibana approach is that it is quick and efficient, but you cannot expect much in the way of customized or flashy presentation.

 

 

Step 1: create an index pattern, which is essentially creating a data source and defining the data range.

In Management, create the index pattern.

Because the indices are created per day, using a trailing "*" in the pattern controls which range of indices is included.

Then choose a time field, which lets the visualizations filter flexibly by time range.

 

 

Step 2: configure a single chart.

In Visualize, click the plus button.

Choose a visualization type.

Choose an index pattern.

Bar chart configuration:

Y axis (metrics):

1 Aggregation: the aggregation method.

2 Custom Label: the label shown on the Y axis.

X axis (buckets):

1 Aggregation: the grouping method.

2 Field: the field to group by.

3 Order By: how the groups are sorted.

4 Order: the sort direction.

5 Size: show the top N groups.

6 Group other ...: whether to combine the groups outside the top N into a single "Other" bucket.

7 Show Missing Values: whether to collect documents with an empty value into a "Missing" bucket.

8 Custom Label: the label shown on the X axis.

 

 

 

 

 

 

In the upper-right corner, choose the refresh rate and the time range.

After configuring, click the apply button.

Finally, save the visualization and give it a name.

 

 

 

 

Step 3: configure a dashboard.

A dashboard lays out multiple charts on a single page.

Go to Dashboard and choose Create new dashboard.

Then choose Add.

Pick charts from the visualizations configured earlier.

When done, choose Save; the dashboard then appears in the Dashboard list.

 

Original source: https://www.cnblogs.com/shan13936/p/13949119.html