spark 操作redis

依赖库

spark 操作redis的时候,依赖的库是spark-redis

首先我们导入依赖

    <!-- https://mvnrepository.com/artifact/com.redislabs/spark-redis -->
    <dependency>
      <groupId>com.redislabs</groupId>
      <artifactId>spark-redis_2.11</artifactId>
      <version>2.4.2</version>
    </dependency>

spark-redis 参数设置

首先初始化一个spark实例,spark-redis的参数在config中进行配置。

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    val conf: SparkConf =new SparkConf().setAppName("setRedis").setMaster("local[2]")
    conf.set("redis.host", "localhost")    //redis 主机节点
    conf.set("redis.port", "6379")  //端口号,不填默认为6379

    val session: SparkSession =SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext =session.sparkContext

  }

还可以设置一些额外的参数

conf.set("redis.auth","null")  //用户权限配置
conf.set("redis.db","0")  //数据库设置
conf.set("redis.timeout","2000")  //设置连接超时时间

简单使用

sc通过导入的隐式转换可以调出的读取Redis的方法,都是以fromRedis开头的,都是redis可以存储的数据结构,这里以常见的KV进行示例

import com.redislabs.provider.redis._

读取Redis

通过fromRedisKV方法,获取一个键的值。

val rdd: RDD[(String, String)] =sc.fromRedisKV("a")
rdd.collect().foreach(println(_))

/**
 * (a,1)
 */

fromRedisKV()的源码:

  /**
    * @param keysOrKeyPattern an array of keys or a key pattern
    * @param partitionNum     number of partitions
    * @return RedisKVRDD of simple Key-Values stored in redis server
    */
  def fromRedisKV[T](keysOrKeyPattern: T,
                     partitionNum: Int = 3)
                    (implicit
                     redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
                     readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)):
  RDD[(String, String)] = {
    keysOrKeyPattern match {
      case keyPattern: String => fromRedisKeyPattern(keyPattern, partitionNum).getKV()
      case keys: Array[String] => fromRedisKeys(keys, partitionNum).getKV()
      case _ => throw new scala.Exception(IncorrectKeysOrKeyPatternMsg)
    }
  }

fromRedisKV()的参数:

  1. 泛类型 keysOrKeyPattern
    从的模式匹配代码中可以看出,这里的T可是是两种类型,一个是String,另一个是Array[String],如果传入其他类型则会抛出运行时异常,其中String类型的意思是匹配键,这里可以用通配符比如foo*,所以返回值是一个结果集RDD[(String, String)],当参数类型为Array[String]时是指传入key的数组,返回的结果则为相应的的结果集,RDD的内容类型也是KV形式。

  2. Ine类型 partitionNum
    生成RDD的分区数,默认为3,可以根据实际情况进行更改防止数据过度倾斜。

  3. 柯里化形式隐式参数 redisConfig
    由于我们之前在sparkConf里面set了相应的参数,这里不传入这个参数即可。如要调整,则可以按照源码中的方式传入,其中RedisEndpoint是一个case class类,而且很多参数都有默认值(比如6379的端口号),所以自己建立一个RedisEndpoint也是非常方便的。

写入Redis

通过toRedisKV将数据写入redis

val data: Seq[(String, String)] = Seq[(String,String)](("high","111"), ("abc","222"), ("together","333"))
val redisData:RDD[(String,String)] = sc.parallelize(data)
sc.toRedisKV(redisData)

查看redis

127.0.0.1:6379> keys *
1) "high"
2) "together"
3) "abc"
127.0.0.1:6379> 

toRedisKV()的源码:

/**
    * @param kvs Pair RDD of K/V
    * @param ttl time to live
    */
  def toRedisKV(kvs: RDD[(String, String)], ttl: Int = 0)
               (implicit
                redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
                readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) {
    kvs.foreachPartition(partition => setKVs(partition, ttl, redisConfig, readWriteConfig))
  }

toRedisKV()的参数

  1. kv类型的RDD
    kvs是一个键值对类型的RDD,键和值的类型都是String类型

  2. Int类型的ttl
    ttl是存入数据的过期时间,单位是秒

以上就是spark读写redis的两个常用的方法。

原文地址:https://www.cnblogs.com/Jaryer/p/13667913.html