Spark Broadcast Variables

  • When we launch several hundred or several thousand tasks that all need to use the same piece of data, say 1 GB of it, and every task keeps its own copy, the memory cost becomes several hundred or several thousand times 1 GB, which is a huge waste of resources. For this Spark provides a data-sharing mechanism called the broadcast variable: the shared data is sent from the Driver to every worker node that takes part in the computation, each worker keeps a single read-only copy of it, and all the tasks that later run on that worker share that copy. If, for example, 2 workers take part in the computation, the data is shipped only 2 times, which greatly reduces the memory overhead. A minimal usage sketch follows; section 1 below then walks through a complete example.

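A minimal sketch of this two-step pattern, assuming an existing SparkContext named sc (the lookup table and sample data below are made up purely for illustration): the data is broadcast once from the Driver with sc.broadcast, and each task reads the shared read-only copy through .value.

// a small lookup table built on the Driver that every task needs
val areaCodes = Map("010" -> "Beijing", "021" -> "Shanghai")
// broadcast it once; each worker keeps a single read-only copy
val areaCodesBC = sc.broadcast(areaCodes)

val phones = sc.parallelize(Seq("010-1234", "021-5678"))
val cities = phones.map(phone => {
  // read the shared copy inside the task instead of capturing areaCodes directly
  areaCodesBC.value.getOrElse(phone.split("-")(0), "unknown")
})
cities.foreach(println)
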
1. Implementing an IP address lookup with Spark

package cn.wc

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object IpLocation {
  // convert a dotted IP string into a Long
  def ip2Long(ip:String):Long = {
    val ips:Array[String] = ip.split("\\.")
    var ipNum:Long = 0L
    for (i <- ips) {
      ipNum = i.toLong | ipNum << 8L
    }
    ipNum
  }
  // binary search: find the index of the IP range in city_ip_Array that contains ipNum, or -1 if none matches
  def binarySearch(ipNum:Long, city_ip_Array:Array[(String,String,String,String)]):Int = {
    var start = 0
    var end = city_ip_Array.length - 1
    while (start <= end) {
      val middle = (start + end) / 2
      if (ipNum >= city_ip_Array(middle)._1.toLong && ipNum <= city_ip_Array(middle)._2.toLong) {
        return middle
      }
      if (ipNum < city_ip_Array(middle)._1.toLong) {
        end = middle - 1
      }
      if (ipNum > city_ip_Array(middle)._2.toLong) {
        start = middle + 1
      }
    }
    -1
  }
  def main(args: Array[String]): Unit = {
    val sparkConf:SparkConf = new SparkConf().setAppName("IpLocation").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // read the city IP range file
    val city_id_rdd:RDD[(String,String,String,String)] = sc.textFile("J:\\ips.txt").map(x => x.split("\\|")).map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1)))
    // use a broadcast variable: ship the city IP data to every worker node
    // an RDD itself cannot be broadcast, so collect it to an array first
    val cityTpBroadcase: Broadcast[Array[(String,String,String,String)]] = sc.broadcast(city_id_rdd.collect())
    // read the carrier log data and extract the IP field
    val ipsRDD:RDD[String] = sc.textFile("J:\\flow.format").map(x => x.split("\\|")(1))
    // for each IP in ipsRDD, look it up in the broadcast city IP data to get its longitude and latitude
    val result:RDD[((String,String), Int)] = ipsRDD.mapPartitions(iter => {
      // get the value of the broadcast variable
      val city_ip_Array:Array[(String,String,String,String)] = cityTpBroadcase.value
      iter.map(ip => {
        // convert the IP string into a Long
        val ipNum:Long = ip2Long(ip)
        // find the index of ipNum's range in the broadcast array
        val index:Int = binarySearch(ipNum, city_ip_Array)
        // fetch the matching record (note: index is -1 when no range matches; this example assumes every IP can be located)
        val value: (String,String,String,String) = city_ip_Array(index)
        // take the longitude and latitude and emit ((longitude, latitude), 1)
        ((value._3,value._4), 1)
      })
    })
    val finalResult: RDD[((String,String), Int)] = result.reduceByKey(_+_)

    finalResult.foreach(println)
    // save the results to MySQL
    finalResult.foreachPartition(iter => {
      val connection: Connection  = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/spark", "root", "123")
      val sql = "insert into flow(longitude, latitude, total) values (?,?,?)"

      try {
        val ps: PreparedStatement = connection.prepareStatement(sql)
        iter.foreach(line => {
          ps.setString(1, line._1._1)
          ps.setString(2, line._1._2)
          ps.setInt(3, line._2)
          ps.execute()
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    })
    sc.stop()
  }
}

2. Reading file data with Spark and saving it to HBase

  • Add the HBase client dependency to pom.xml:
<dependency>
	<groupId>org.apache.hbase</groupId>
	<artifactId>hbase-client</artifactId>
	<version>1.2.1</version>
</dependency>
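
The original post ends with the dependency, so the following is only a rough sketch of how the saving step could look with the hbase-client API. The input path, the ZooKeeper quorum, and the person table with its info column family are assumptions made for the example; it reuses the same foreachPartition pattern as the MySQL write in section 1.

package cn.wc

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object Data2Hbase {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Data2Hbase").setMaster("local[2]"))
    // read the source file; the path and the comma-separated "id,name,age" layout are assumptions
    val personRDD = sc.textFile("J:\\person.txt").map(_.split(","))

    personRDD.foreachPartition(iter => {
      // one HBase connection per partition, analogous to the MySQL example above
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "node1,node2,node3") // assumed ZooKeeper quorum
      val connection = ConnectionFactory.createConnection(hbaseConf)
      val table = connection.getTable(TableName.valueOf("person")) // assumed table with column family "info"
      try {
        iter.foreach(fields => {
          val put = new Put(Bytes.toBytes(fields(0))) // use the first field as the row key
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields(1)))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(fields(2)))
          table.put(put)
        })
      } finally {
        table.close()
        connection.close()
      }
    })
    sc.stop()
  }
}
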
Original post: https://www.cnblogs.com/xujunkai/p/14916344.html