【Spark】如何用Spark查询IP地址?


需求

日常生活中,当我们打开地图时,会通过地图道路颜色获取当前交通情况,也可以通过地图上经常网购的IP地址热力图得出哪些地区网购观念更发达,还有当前疫情的情况,各个地区疫情的热力图可以直观反应出疫情的严重程度。
想要获取热力图,首先要清楚,通过点击流日志中的IP地址信息,可以推算出所在城市甚至经度和维度,通过计算同一经度纬度出现的次数,就可以绘制出热力图


思路

现在拥有两张信息表:
一张是IP字典,这里面可以对我们有直接实用价值的就是有序的转换为Long类型的IP范围,还有每一个IP范围所对应的经纬度

1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.2.0.0|1.2.1.255|16908288|16908799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.2.2.0|1.2.2.255|16908800|16909055|亚洲|中国|北京|北京|海淀|北龙中网|110108|China|CN|116.29812|39.95931
1.2.4.0|1.2.4.255|16909312|16909567|亚洲|中国|北京|北京||中国互联网信息中心|110100|China|CN|116.405285|39.904989
1.2.5.0|1.2.7.255|16909568|16910335|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.2.8.0|1.2.8.255|16910336|16910591|亚洲|中国|北京|北京||中国互联网信息中心|110100|China|CN|116.405285|39.904989
1.2.9.0|1.2.127.255|16910592|16941055|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.3.0.0|1.3.255.255|16973824|17039359|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.4.1.0|1.4.3.255|17039616|17040383|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
……
……

另一张是用户IP上网记录,虽然有很多很长,但是对于目前此项目,只需要提取其中的IP地址就可以了

20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59  from: http://bsalsa.com/ )|http://show.51.com/main.php|
20090121000132124542000|117.101.215.133|www.jiayuan.com|/19245971|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TencentTraveler 4.0)|http://photo.jiayuan.com/index.php?uidhash=d1c3b69e9b8355a5204474c749fb76ef|__tkist=0; myloc=50%7C5008; myage=2009; PROFILE=14469674%3A%E8%8B%A6%E6%B6%A9%E5%92%96%E5%95%A1%3Am%3Aphotos2.love21cn.com%2F45%2F1b%2F388111afac8195cc5d91ea286cdd%3A1%3A%3Ahttp%3A%2F%2Fimages.love21cn.com%2Fw4%2Fglobal%2Fi%2Fhykj_m.jpg; last_login_time=1232454068; SESSION_HASH=8176b100a84c9a095315f916d7fcbcf10021e3af; RAW_HASH=008a1bc48ff9ebafa3d5b4815edd04e9e7978050; COMMON_HASH=45388111afac8195cc5d91ea286cdd1b; pop_1232093956=1232468896968; pop_time=1232466715734; pop_1232245908=1232469069390; pop_1219903726=1232477601937; LOVESESSID=98b54794575bf547ea4b55e07efa2e9e; main_search:14469674=%7C%7C%7C00; registeruid=14469674; REG_URL_COOKIE=http%3A%2F%2Fphoto.jiayuan.com%2Fshowphoto.php%3Fuid_hash%3D0319bc5e33ba35755c30a9d88aaf46dc%26total%3D6%26p%3D5; click_count=0%2C3363619
20090121000132406516000|117.101.222.68|gg.xiaonei.com|/view.jsp?p=389|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; CIBA)|http://home.xiaonei.com/Home.do?id=229670724|_r01_=1; __utma=204579609.31669176.1231940225.1232462740.1232467011.145; __utmz=204579609.1231940225.1.1.utmccn=(direct)
20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
20090121000132864647000|123.197.64.247|cul.sohu.com|/20071227/n254338813_22.shtml|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://cul.sohu.com/20071227/n254338813_22.shtml|ArticleTab=visit:1; IPLOC=unknown; SUV=0901080709152121; vjuids=832dd37a1.11ebbc5d590.0.b20f858f14e918; club_chat_ircnick=JaabvxC4aaacQ; spanel=%7B%22u%22%3A%22%22%7D; vjlast=1232467312,1232467312,30
20090121000133296729000|222.55.57.176|down.chinaz.com|/|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; iCafeMedia; TencentTraveler 4.0)||cnzz_a33219=0; vw33219=%3A18167791%3A; sin33219=http%3A//www.itxls.com/wz/wyfx/it.html; rtime=0; ltime=1232464387281; cnzz_eid=6264952-1232464379-http%3A//www.itxls.com/wz/wyfx/it.html
20090121000133331104000|123.197.66.93|www.pkwutai.cn|/down/downLoad-id-45383.html|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7)|http://www.baidu.com/s?tn=b1ank_pg&ie=gb2312&bs=%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&sr=&z=&cl=3&f=8&wd=%C6%C6%BD%E2%C3%C0%C6%BC%B7%FE%D7%B0%B9%DC%C0%ED%C8%ED%BC%FE&ct=0|

核心思路就是将上网记录中的IP地址转换为Long类型,利用二分法和IP字典中的IP范围比较,找出对于的经度和维度,然后对经度纬度作计数,最后把拿到的数据放到mysql数据库中


ip地址转换为Long类型的两种方法

以下方法是从博主 @徐志/忘川风华录 的文章 IP地址转换成Long型数字算法和原理(全网最细!!)
查到的,佩服的可以点击链接前往观摩

ip地址转换数字地址的原理

IP地址一般是一个32位的二进制数意思就是如果将IP地址转换成二进制表示应该有32为那么长,但是它通常被分割为4个“8位二进制数”(也就是4个字节每,每个代表的就是小于2的8 次方)。IP地址通常用“点分十进制”表示成(a.b.c.d)的形式,其中,a,b,c,d都是0~255之间的十进制整数。例:点分十进IP地址(100.4.5.6),实际上是32位二进制数(01100100.00000100.00000101.00000110)

第一种方法

val ipV4="125.213.100.123"
val fragments = ip22.split("[.]")
var Ip_Num=125*256*256*256+213*256*256+100*256+123
println(Ip_Num)     //打印的结果2111136891

第二种方法

val ip22="125.213.100.123"
val fragments = ip22.split("[.]")
var Ip_Num = 0L
for (i <- 0 until fragments.length){
  Ip_Num =  fragments(i).toLong | ipNum << 8L
}
println(Ip_Num)     //打印结果  2111136891

步骤

一、在mysql创建数据库表

/*
SQLyog Ultimate v8.32 
MySQL - 5.6.22-log : Database - spark
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`spark` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `spark`;

/*Table structure for table `iplocation` */

DROP TABLE IF EXISTS `iplocation`;

CREATE TABLE `iplocation` (
  `longitude` varchar(32) DEFAULT NULL,
  `latitude` varchar(32) DEFAULT NULL,
  `total_count` varchar(32) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

二、开发代码

import java.sql.{Connection, DriverManager}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast


object IpLocation {

  //定义把IP地址转换为Long类型的方法 125.213.100.123
  def ipToLong(ip: String): Long = {
    val split: Array[String] = ip.split("\.")
    var resultNum: Long = 0L
    for (eachNum <- split) {
      resultNum = eachNum.toLong | resultNum << 8L
    }
    resultNum
  }

  /**
   * 使用二分查找法,确定用户ip落在了哪个范围
   *
   * @param userLongIP
   * @param ipArray
   * @return 返回ipArray的下标索引
   */
  def binarySearch(userLongIP: Long, ipArray: Array[(String, String, String, String)]): Int = {
    var startIndex: Int = 0
    var endIndex: Int = ipArray.length
    //创建while循环
    while (startIndex <= endIndex) {
      var middle: Int = {
        startIndex + endIndex
      } / 2
      //判断用户落在哪个范围
      if (userLongIP >= ipArray(middle)._1.toLong && userLongIP <= ipArray(middle)._2.toLong) {
        //使用return可以直接返回数据并停掉while循环
        return middle
      }
      //如果落在了middle的左边
      if (userLongIP < ipArray(middle)._1.toLong) {
        endIndex = middle - 1
      }
      //如果落在了middle的右边
      if (userLongIP > ipArray(middle)._1.toLong) {
        startIndex = middle + 1
      }
    }
    //避免方法报错,随便返回一个值即可,因为肯定不会到这个值,while循环最终都会落在return middle上
    0
  }

  /**
   * 定义函数,将数据放到mysql中去
   */
  //datas获取一个分区里的所有数据
  val data2Mysql = (datas: Iterator[((String, String), Int)]) => {
    //连接到mysql
    val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "123456")
    //使用sql语句导入数据
    val preparedStatement = connection.prepareStatement("insert into iplocation(longitude,latitude,total_count) values(?,?,?)")
    //foreach能获取到datas中的每一条数据
    datas.foreach(record => {
      preparedStatement.setString(1, record._1._1) //获取经度
      preparedStatement.setString(2, record._1._2) //获取纬度
      preparedStatement.setInt(3, record._2) //获取次数
      preparedStatement.execute()
    })
    preparedStatement.close()
    connection.close()
  }

  def main(args: Array[String]): Unit = {
    //获取SparkConf
    val sparkConf = new SparkConf().setAppName("IP-Location").setMaster("local[2]").set("spark.driver.host", "localhost")
    //获取SparkContext
    val sparkContext = new SparkContext(sparkConf)
    //设置日志筛选条件
    sparkContext.setLogLevel("WARN")

    //读取IP字典
    val IpDictionary: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第二天/第二天教案/资料/服务器访问日志根据ip地址查找区域/用户ip上网记录以及ip字典/ip.txt")
    //IP字典内的数据需要的是IP字段Long类型的起始和结束,还有经度和维度
    //首先需要将拿到的数据进行分割  1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
    val map: RDD[Array[String]] = IpDictionary.map(x => x.split("\|"))
    //切割后,我们需要的数据就是ip范围起始值、结束值、经度和纬度 下标分别是2,3,length-2,length-1
    val ipRange: RDD[(String, String, String, String)] = map.map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1)))
    //将IP字典的数据放到广播变量中,供所有的block块使用
    val collect: Array[(String, String, String, String)] = ipRange.collect()
    val broadcast: Broadcast[Array[(String, String, String, String)]] = sparkContext.broadcast(collect)


    //读取用户上网记录表
    val userIpRecord: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第二天/第二天教案/资料/服务器访问日志根据ip地址查找区域/用户ip上网记录以及ip字典/20090121000132.394251.http.format")
    //用户上网记录表中只需要IP地址的字段,所以进行切割拿到IP地址字段即可
    //数据样式 20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59  from: http://bsalsa.com/ )|http://show.51.com/main.php|
    val allUserIP: RDD[String] = userIpRecord.map(x => x.split("\|")(1))

    /**
     * mapPartitions需要两个参数
     * def mapPartitions[U: ClassTag](
     * f: Iterator[T] => Iterator[U],       第一个是给一个是迭代器的函数,返回一个迭代器
     * preservesPartitioning: Boolean = false)        第二个参数为默认值
     *
     * 本方法是将拿到的用户IP一次性提取一个分区的数据,iter就表示一个分区的所有数据
     */
    val numTimes: RDD[((String, String), Int)] = allUserIP.mapPartitions(iter => {
      //获取广播变量里的值   数据样式:16777472|16778239|119.306239|26.075302
      val ipArray: Array[(String, String, String, String)] = broadcast.value
      //通过map方法遍历一个分区中的所有数据
      iter.map(ip => {
        //获取每一个用户的IP,需要是Long类型
        val userLongIP: Long = ipToLong(ip)
        //将用户ip和广播变量中的数据进行比较,确定一下用户ip在哪个ip范围,返回值是ipArray索引
        val resultIndex: Int = binarySearch(userLongIP, ipArray)
        //每个经纬度出现一次就计作一次
        ((ipArray(resultIndex)._3, ipArray(resultIndex)._4), 1)
      })
    })


    //将每个经度和维度出现的次数进行累加
    val locationCount: RDD[((String, String), Int)] = numTimes.reduceByKey(_ + _)

    //将最终累加的结果值保存到Mysql中去
    locationCount.foreachPartition(data2Mysql)

  }
}

原文地址:https://www.cnblogs.com/zzzsw0412/p/12772392.html