StreamingContext.getOrCreate

/**
  * Spark Streaming job that consumes (key, value) String pairs from Kafka via the
  * direct stream API and records "click" messages into HBase.
  *
  * Command-line arguments (all required, in order):
  *  - `args(0)` batch interval in seconds
  *  - `args(1)` checkpoint directory
  *  - `args(2)` Kafka broker list (`metadata.broker.list`)
  *  - `args(3)` Kafka consumer group id (`group.id`)
  *  - `args(4)` Kafka topic to read
  *
  * The StreamingContext is obtained with `StreamingContext.getOrCreate`, so after a
  * restart the job is rebuilt from the checkpoint in `args(1)` instead of calling
  * `createStreamingContext` again (see the Spark Streaming checkpointing docs for
  * what state is recovered).
  */
object AppRealTime {

  def main(args: Array[String]): Unit = {
    if (args.length < 5) {
      println("please input args like: seconds checkpointdir kafkaBrokerList groupId topic")
      System.exit(1)
    }
    // NOTE(review): this logger is captured by the foreachPartition closure below and is
    // therefore serialized with the checkpointed DStream graph; SLF4J adapters are usually
    // Serializable, but confirm for the binding in use.
    val logger = LoggerFactory.getLogger(AppRealTime.getClass)

    /**
      * Builds a new StreamingContext and defines the complete DStream graph on it.
      *
      * All stream setup must live inside this factory: when a checkpoint already
      * exists, `StreamingContext.getOrCreate` restores the context from it and this
      * function is not invoked.
      *
      * @return a new, not-yet-started StreamingContext with checkpointing enabled
      */
    def createStreamingContext: StreamingContext = {
      val conf = new SparkConf
      // The StreamingContext wraps a SparkContext; batch interval comes from args(0).
      val ssc = new StreamingContext(conf, Seconds(args(0).trim.toInt))
      // Enable checkpointing so the job's run state can be recovered after a restart.
      ssc.checkpoint(args(1).trim)

      // Kafka connection parameters.
      val kafkaParams = Map("metadata.broker.list" -> args(2).trim, "group.id" -> args(3).trim)
      // Topics to consume.
      val topics = Set(args(4).trim)

      // Direct (receiver-less) stream; each element is a (key, value) pair decoded as Strings.
      val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

      data.foreachRDD { rdd =>
        rdd.foreachPartition { records =>
          // A new HBase connection is opened for every partition of every batch.
          // NOTE(review): this is expensive; consider a lazily-created per-executor connection.
          val hConnection = ConnectionFactory.createConnection(HBaseConfiguration.create())
          // Nested try/finally: each resource is closed even if acquiring a later
          // resource, or closing an earlier one, throws.
          try {
            val click = hConnection.getTable(TableName.valueOf(Constants.HISTORY_CLICK))
            try {
              val statistic = hConnection.getTable(TableName.valueOf(Constants.RESULT_STATISTIC))
              try {
                // The Kafka message key selects how the value is handled.
                records.foreach { case (logType, logVal) =>
                  println(s"$logType\t$logVal")
                  logType match {
                    case "click" =>
                      val clickObj = new Click(logVal)
                      // Already-seen row key -> repeat handling; otherwise first-time handling.
                      if (HBaseUtil.isExists(click, clickObj.getRowKey)) {
                        clickObj.doRepeat(statistic)
                      } else {
                        clickObj.doNoRepeat(click, statistic)
                      }
                    case _ =>
                      logger.info("msg:" + logVal)
                  }
                }
              } catch {
                case ex: Exception =>
                  // Log, then rethrow. Swallowing here would silently skip the rest of this
                  // partition while the batch still completes, losing data. Failing the task
                  // surfaces the error to Spark instead.
                  // NOTE(review): a permanently malformed record will now fail the job on every
                  // retry; add a dead-letter path if that is a concern.
                  logger.error("error :", ex)
                  throw ex
              } finally {
                statistic.close()
              }
            } finally {
              click.close()
            }
          } finally {
            hConnection.close()
          }
        }
      }
      ssc
    }

    // Recover from the checkpoint in args(1) if one exists; otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate(args(1).trim, createStreamingContext _)

    ssc.start()
    ssc.awaitTermination()

  }

}

经过粗略的实验（只使用了一个分区）发现，使用 StreamingContext.getOrCreate 从 checkpoint 恢复之后，可以做到不丢失数据。

原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7463698.html