Spark Bucketizer 特征离散化、桶化

1、概念

将连续数值转换为离散类别特征。

比如需求把人分为50以上和50以下太不精准了,应该分为20岁以下,20-30岁,30-40岁,36-50岁,50以上,那么就得用到数值离散化的处理方法了。
离散化就是把特征进行适当的离散处理,比如上面所说的年龄是个连续的特征,但是我把它分为不同的年龄阶段就是把它离散化了,这样更利于我们分析用户行为进行精准推荐。
Bucketizer能方便的将一堆数据分成不同的区间。

2、code

package com.home.spark.ml

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.SparkSession

/**

  * Bucketizer将一列连续特征转换为一列特征存储桶,其中存储桶由用户指定。它带有一个参数:
  *
  * splits:用于将连续特征映射到存储桶的参数。使用n + 1个拆分,有n个存储桶。
  * 拆分x,y定义的存储区除最后一个存储区(也包含y)外,其值都在[x,y)范围内。
  * 分割数应严格增加。必须明确提供-inf,inf的值以覆盖所有Double值;否则,超出指定分割的值将被视为错误。
  *
  * 拆分的两个示例是Array(Double.NegativeInfinity,0.0,1.0,Double.PositiveInfinity)和Array(0.0,1.0,2.0)。
  *
  * 请注意,如果您不了解目标列的上限和下限,则应添加Double.NegativeInfinity和Double.PositiveInfinity作为拆分的边界,以防止出现超出Bucketizer边界的异常。
  *
  * 注意,提供的拆分必须严格按升序排列,即s0 <s1 <s2 <... <sn。
  **/
object Ex_Bucketizer {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf(true).setMaster("local[2]").setAppName("spark ml")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

    val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2,0.5,999.9)
    val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

    val bucketizer  = new Bucketizer().setInputCol("features").setOutputCol("bucketedFeatures").setSplits(splits)
    // Transform original data into its bucket index.
    val bucketedData = bucketizer.transform(dataFrame)

    println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
    bucketedData.show()

    val splitsArray = Array(
      Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
      Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))

    val data2 = Array(
      (-999.9, -999.9),
      (-0.5, -0.2),
      (-0.3, -0.1),
      (0.0, 0.0),
      (0.2, 0.4),
      (999.9, 999.9))
    val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")

    val bucketizer2 = new Bucketizer()
      .setInputCols(Array("features1", "features2"))
      .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
      .setSplitsArray(splitsArray)

    // Transform original data into its bucket index.
    val bucketedData2 = bucketizer2.transform(dataFrame2)

    println(s"Bucketizer output with [" +
      s"${bucketizer2.getSplitsArray(0).length-1}, " +
      s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
    bucketedData2.show()

    spark.stop()
  }

}

Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|     0.5|             3.0|
|   999.9|             3.0|
+--------+----------------+

Bucketizer output with [4, 4] buckets for each input column
+---------+---------+-----------------+-----------------+
|features1|features2|bucketedFeatures1|bucketedFeatures2|
+---------+---------+-----------------+-----------------+
|   -999.9|   -999.9|              0.0|              0.0|
|     -0.5|     -0.2|              1.0|              1.0|
|     -0.3|     -0.1|              1.0|              1.0|
|      0.0|      0.0|              2.0|              2.0|
|      0.2|      0.4|              2.0|              3.0|
|    999.9|    999.9|              3.0|              3.0|
+---------+---------+-----------------+-----------------+


原文地址:https://www.cnblogs.com/asker009/p/12205579.html