大数据 Spark 连接外部资源

Spark中使用外部连接获取配置信息

Spark Streaming在启动的时候只能使用一个数据源的数据,但是我们的配置是随着业务进行改变的,所以需要动态的进行业务配置的获取。

连接redis

使用单例模式,在Driver上定义,在分区上遍历,JedisConnectionPool是在Master上定义的,广播到Worker上,同时JedisConnectionPool在每个work上始终只有一个实例存在,因为在方法中使用的是懒加载模式,只有在启动时才会初始化JedisConnectionPool,所以是在节点上完成的初始化,所以也不会出现序列化问题。

object HandleAppData {
  val LOG = LoggerFactory.getLogger(this.getClass)

  val port = 1

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HandleKafkaDataAndApp")

    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "ifan1:9092", //kafka集群地址
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "group.id" -> "test1", //消费者组名
      "auto.offset.reset" -> "largest", //latest自动重置偏移量为最新的偏移量
      "enable.auto.commit" -> "false") //如果是true,则这个消费者的偏移量会在后台自动提交
    val topics = Set("testspark")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    stream.foreachRDD {
      rdd =>
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(handleHBase(_, offsetRanges))
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def handleHBase(iterator: Iterator[(String, String)], offsetRanges: Array[OffsetRange]): Unit = {
    val jedis = JedisConnectionPool.getConnection()
    val pipe = jedis.pipelined()
    try {
      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
      LOG.info(s"topic = ${o.topic} partiton = ${o.partition} offset = ${o.untilOffset} fromOffset = ${o.fromOffset}")
      // 将获取的数据放在 redis 中
      iterator.foreach(kv => {
        pipe.lpush(s"data:${o.topic}:${o.partition}", kv._2)
        LOG.info(s"${o.topic} => ${o.partition} => ${kv._2}")
      })

      // 保存 kafka offset
      jedis.set(s"partition:${o.topic}:${o.partition}", s"${o.untilOffset}")
    } finally {
      pipe.exec()
      pipe.close()
      jedis.close()
    }
  }

}

object JedisConnectionPool {

  val config = new JedisPoolConfig()
  // 最大连接数
  config.setMaxTotal(3)
  // 最大空闲连接数
  config.setMaxIdle(1)
  val pool = new JedisPool(config, "ifan1", 63790)

  def getConnection(): Jedis = {
    pool.getResource
  }
}

原文地址:https://www.cnblogs.com/iFanLiwei/p/12839689.html