Flink之布隆过滤器

大数据处理中,用去去重,布隆过滤器十分常见

1、代码

// 定义一个布隆过滤器
class Bloom(size: Long) extends Serializable {
  //布隆过滤器的默认大小是32M
  //32 * 1024 * 1024 * 8
  //2^5  2^10   2^10 * 2^3

  //1后面28个0
  private val cap = if (size > 0) size else 1 << 28

  //定义hash函数的结果,当做位图的offset
  def hash(value: String, seed: Int): Long = {
    var result = 0L
    for( i <- 0 until value.length ){
      //各种方法去实现都行
      result += result * seed + value.charAt(i)
    }
    //他们之间进行&运算结果一定在位图之间
    result  & ( cap - 1 ) //0后面28个1
  }
}

2、使用

//1、定义一个对象
lazy val bloom = new Bloom(1<<28)
//2、使用布隆对象对数据进行hash,从而获取偏移量
val offset = bloom.hash(userId, 61) 

 =================================

除了自定义的布隆过滤器,还可以使用Twitter 的开源包

使用案例:https://mp.weixin.qq.com/s/0LyPCADTAHCmV1eLA5h9oQ
<dependency>
    <groupId>com.clearspring.analytics</groupId>
    <artifactId>stream</artifactId>
    <version>2.7.0</version>
</dependency>
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.clearspring.analytics.stream.membership.BloomFilter;
 
public class BloomFilterFunction implements Function<String, Void> {
    BloomFilter filter = new BloomFilter(20, 20);
 
    Void process(String input, Context context) throws Exception {
      if (!filter.isPresent(input)) {
        filter.add(input);
        // Route to “not seen” topic
        context.publish(“notSeenTopic”, input);
      }
      return null;
   }
}
原文地址:https://www.cnblogs.com/ywjfx/p/14234834.html