基于 Redis 重构 Google Guava 的 BloomFilter

对另一篇博客代码的补充。

原博是谁不知道,参考博文:https://segmentfault.com/a/1190000012620152

不再基于jedis,改用redisTemplate。跑了几次,发现确实可以动态扩容。原博牛逼!!!

RedisBloomFilter.java

package com.ylzinfo.ehc.server.bloomFilter.redis;

import com.google.common.base.Predicate;
import com.google.common.hash.Funnel;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.io.Serializable;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
 * A Bloom filter whose bit array is stored in Redis, modelled on Guava's {@code BloomFilter}.
 *
 * <p>Sizing is computed locally by {@link #optimalNumOfBits} and
 * {@link #optimalNumOfHashFunctions}; every bit read and write is delegated through the
 * configured {@link Strategy} to a {@link RedisBitmaps} instance.
 *
 * <p>NOTE(review): this class declares {@link Serializable}, but the type of the {@code bits}
 * field does not declare it in this file — confirm before relying on Java serialization.
 *
 * @param <T> the type of elements the filter accepts
 * @author syh
 * Date: 2020/7/10
 */
public class RedisBloomFilter<T> implements Predicate<T>, Serializable {

    /** Redis-backed bit storage. */
    private final RedisBitmaps bits;
    /** Number of bit positions derived from each element. */
    private final int numHashFunctions;
    /** Turns an element into the bytes that get hashed. */
    private final Funnel<? super T> funnel;
    /** Maps an element to bit positions and applies them to {@link #bits}. */
    private final RedisBloomFilter.Strategy strategy;

    /**
     * Use one of the {@code create} factories instead.
     *
     * @throws IllegalArgumentException if {@code numHashFunctions} is outside {@code 1..255}
     * @throws NullPointerException     if any reference argument is null
     */
    private RedisBloomFilter(
            RedisBitmaps bits, int numHashFunctions, Funnel<? super T> funnel, RedisBloomFilter.Strategy strategy) {
        checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
        checkArgument(
                numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
        this.bits = checkNotNull(bits);
        this.numHashFunctions = numHashFunctions;
        this.funnel = checkNotNull(funnel);
        this.strategy = checkNotNull(strategy);
    }

    /**
     * Creates a filter sized for {@code expectedInsertions} elements at false-positive rate {@code fpp}.
     *
     * @param funnel             converts elements to hashable bytes
     * @param expectedInsertions expected number of elements, must be {@code >= 0}
     * @param fpp                desired false-positive probability, strictly between 0 and 1
     * @return a new filter; the return type is parameterized so callers keep {@code T}
     */
    public static <T> RedisBloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions, double fpp) {
        // The explicit widening cast selects the long overload; without it this method would call itself.
        return create(funnel, (long) expectedInsertions, fpp);
    }

    /**
     * Creates a filter using the {@link RedisBloomFilterStrategies#MURMUR128_MITZ_64} strategy.
     *
     * @param funnel             converts elements to hashable bytes
     * @param expectedInsertions expected number of elements, must be {@code >= 0}
     * @param fpp                desired false-positive probability, strictly between 0 and 1
     * @return a new filter
     */
    public static <T> RedisBloomFilter<T> create(
            Funnel<? super T> funnel, long expectedInsertions, double fpp) {
        return create(funnel, expectedInsertions, fpp, RedisBloomFilterStrategies.MURMUR128_MITZ_64);
    }

    /**
     * Creates a filter with an explicit hashing strategy.
     *
     * @throws NullPointerException     if {@code funnel} or {@code strategy} is null
     * @throws IllegalArgumentException if an argument is out of range, or if the bit storage
     *                                  rejects the computed size
     */
    static <T> RedisBloomFilter<T> create(
            Funnel<? super T> funnel, long expectedInsertions, double fpp, RedisBloomFilter.Strategy strategy) {
        checkNotNull(funnel);
        checkArgument(
                expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
        checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
        checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
        checkNotNull(strategy);

        // Size for at least one element so the formulas below never divide by zero.
        if (expectedInsertions == 0) {
            expectedInsertions = 1;
        }

        long numBits = optimalNumOfBits(expectedInsertions, fpp);
        int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
        try {
            return new RedisBloomFilter<T>(new RedisBitmaps(numBits), numHashFunctions, funnel, strategy);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
        }
    }

    /** {@link Predicate} view of {@link #mightContain(Object)}. */
    @Override
    public boolean apply(@Nullable T input) {
        return mightContain(input);
    }

    /**
     * Adds {@code object} to the filter.
     *
     * @return whatever the strategy reports; for the bundled strategy, {@code true} when at least
     *         one bit changed
     */
    public boolean put(T object) {
        return strategy.put(object, funnel, numHashFunctions, bits);
    }

    /**
     * Tests membership.
     *
     * @return {@code false} if {@code object} was definitely never added, {@code true} if it may have been
     */
    public boolean mightContain(T object) {
        return strategy.mightContain(object, funnel, numHashFunctions, bits);
    }

    /**
     * Computes the bit count {@code m = -n * ln(p) / (ln 2)^2}, truncated towards zero.
     *
     * @param n expected number of insertions
     * @param p target false-positive probability; 0 is replaced by {@link Double#MIN_VALUE}
     */
    static long optimalNumOfBits(long n, double p) {
        if (p == 0) {
            p = Double.MIN_VALUE;
        }
        return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /**
     * Computes the hash-function count {@code k = round(m / n * ln 2)}, never less than 1.
     *
     * @param n expected number of insertions
     * @param m number of bits in the filter
     */
    static int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    /** Translates an element into bit positions and applies them to a {@link RedisBitmaps}. */
    interface Strategy extends Serializable {
        /** Sets the bits for {@code object}; see the implementation for the meaning of the result. */
        <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

        /** Checks the bits for {@code object}. */
        <T> boolean mightContain(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

        /** Implemented by {@link Enum#ordinal()} when the strategy is an enum constant. */
        int ordinal();
    }
}

  

RedisBloomFilterStrategies.java

package com.ylzinfo.ehc.server.bloomFilter.redis;

import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Longs;

/**
 * Hashing strategies for {@link RedisBloomFilter}.
 *
 * @author syh
 * Date: 2020/7/10
 */
public enum RedisBloomFilterStrategies implements RedisBloomFilter.Strategy {

    /**
     * Hashes the element with 128-bit Murmur3 and derives {@code numHashFunctions} bit offsets
     * as {@code hash1 + i * hash2}, using the two 64-bit halves of the hash.
     */
    MURMUR128_MITZ_64() {
        /**
         * Sets the element's bits and then lets the bitmap grow if needed.
         *
         * @return the result of {@code bits.set(offsets)}
         */
        @Override
        public <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits) {
            long[] offsets = bitOffsets(object, funnel, numHashFunctions, bits.bitSize());
            boolean bitsChanged = bits.set(offsets);
            // Automatic expansion: checked after every insertion.
            bits.ensureCapacityInternal();
            return bitsChanged;
        }

        /** @return the result of {@code bits.get(offsets)} for the element's offsets */
        @Override
        public <T> boolean mightContain(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits) {
            return bits.get(bitOffsets(object, funnel, numHashFunctions, bits.bitSize()));
        }

        /**
         * Computes the bit offsets for {@code object}; shared by {@code put} and
         * {@code mightContain} so both always address the same positions.
         */
        private <T> long[] bitOffsets(T object, Funnel<? super T> funnel, int numHashFunctions, long bitSize) {
            byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
            long hash1 = lowerEight(bytes);
            long hash2 = upperEight(bytes);

            long combinedHash = hash1;
            long[] offsets = new long[numHashFunctions];
            for (int i = 0; i < numHashFunctions; i++) {
                // Clear the sign bit so the remainder is never negative.
                offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
                combinedHash += hash2;
            }
            return offsets;
        }

        /** Reads bytes 0..7 as a little-endian long. */
        private /* static */ long lowerEight(byte[] bytes) {
            return Longs.fromBytes(
                    bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
        }

        /** Reads bytes 8..15 as a little-endian long. */
        private /* static */ long upperEight(byte[] bytes) {
            return Longs.fromBytes(
                    bytes[15], bytes[14], bytes[13], bytes[12], bytes[11], bytes[10], bytes[9], bytes[8]);
        }
    }
}

  

RedisBitmaps.java

package com.ylzinfo.ehc.server.bloomFilter.redis;

import com.google.common.math.LongMath;
import com.google.common.primitives.Longs;

import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.stream.LongStream;

/**
 * Bit storage for {@link RedisBloomFilter}, kept in a chain of Redis bitmaps.
 *
 * <p>Bitmaps are stored under the keys {@code bloomfilter-0}, {@code bloomfilter-1}, ...; the
 * index of the newest one is kept under the key {@code cursor}. Writes go to the newest bitmap,
 * lookups consult every bitmap from 0 up to the cursor. {@link #grow()} starts a new bitmap.
 *
 * <p>The key names are fixed constants, so every instance addresses the same Redis keys.
 *
 * @author syh
 * Date: 2020/7/10
 */
public class RedisBitmaps {

    private static final String BASE_KEY = "bloomfilter";
    private static final String CURSOR = "cursor";
    /** Length of each bitmap in bits; always a multiple of 64. */
    private final long bitSize;

    /**
     * @param bits requested number of bits, rounded up to a multiple of 64
     * @throws IllegalArgumentException if {@code bits} is not positive (a zero-length bitmap
     *                                  would make every offset computation divide by zero)
     */
    RedisBitmaps(long bits) {
        if (bits <= 0) {
            throw new IllegalArgumentException("bits (" + bits + ") must be > 0");
        }
        // Length of the bit array, i.e. the length of n longs.
        this.bitSize = LongMath.divide(bits, 64, RoundingMode.CEILING) * Long.SIZE;

        if (bitCount() == 0) {
            allocate(keyBytes(currentKey()));
        }
    }

    /**
     * Reports whether some bitmap in the chain has every one of {@code offsets} set.
     *
     * @return {@code true} if the element may be present, {@code false} if it is definitely absent
     */
    boolean get(long[] offsets) {
        return matchesAny(offsets, cursor());
    }

    /** Reads one bit of the newest bitmap; a missing reply counts as unset. */
    boolean get(final long offset) {
        byte[] key = keyBytes(currentKey());
        Boolean rst = RedisExecutor.newExecutor()
                .execute(conn -> conn.getBit(key, offset));
        return Boolean.TRUE.equals(rst);
    }

    /**
     * Sets {@code offsets} in the newest bitmap unless the chain already matches all of them.
     *
     * @return {@code true} if at least one bit was newly set
     */
    boolean set(long[] offsets) {
        // Read the cursor once so the membership check and the writes agree on the newest bitmap.
        long newest = cursor();
        if (newest > 0 && matchesAny(offsets, newest)) {
            return false;
        }
        byte[] key = keyBytes(genkey(newest));
        boolean bitsChanged = false;
        for (long offset : offsets) {
            bitsChanged |= setBit(key, offset);
        }
        return bitsChanged;
    }

    /**
     * Sets one bit of the newest bitmap.
     *
     * @return {@code true} if the bit was not previously reported as set
     */
    boolean set(long offset) {
        return setBit(keyBytes(currentKey()), offset);
    }

    /** Number of set bits in the newest bitmap; a missing reply counts as 0. */
    long bitCount() {
        byte[] key = keyBytes(currentKey());
        Long rst = RedisExecutor.newExecutor()
                .execute(conn -> conn.bitCount(key));
        return rst == null ? 0 : rst;
    }

    /** Length of each bitmap in bits. */
    long bitSize() {
        return this.bitSize;
    }

    /** Starts a new bitmap once more than half the bits of the newest one are set. */
    void ensureCapacityInternal() {
        if (bitCount() * 2 > bitSize()) {
            grow();
        }
    }

    /**
     * Increments the cursor and pre-allocates the bitmap it now points to.
     *
     * @throws IllegalStateException if the increment yields no value
     */
    void grow() {
        Long next = RedisExecutor.newExecutor()
                .execute(conn -> conn.incr(keyBytes(CURSOR)));
        if (next == null) {
            throw new IllegalStateException("INCR on '" + CURSOR + "' returned no value");
        }
        allocate(keyBytes(genkey(next)));
    }

    /** Deletes every bitmap in the chain, resets the cursor to 0 and re-allocates bitmap 0. */
    void reset() {
        byte[][] keys = LongStream.rangeClosed(0, cursor())
                .mapToObj(k -> keyBytes(genkey(k)))
                .toArray(byte[][]::new);

        RedisExecutor.newExecutor()
                .execute(conn -> conn.del(keys));
        RedisExecutor.newExecutor()
                .execute(conn -> conn.set(keyBytes(CURSOR), keyBytes("0")));
        allocate(keyBytes(genkey(0)));
    }

    /** Checks bitmaps 0..{@code lastCursor}; one match is enough. */
    private boolean matchesAny(long[] offsets, long lastCursor) {
        for (long i = 0; i <= lastCursor; i++) {
            if (allBitsSet(keyBytes(genkey(i)), offsets)) {
                return true;
            }
        }
        return false;
    }

    /** Fetches all {@code offsets} of one bitmap in a single pipeline and requires every bit to be set. */
    private boolean allBitsSet(byte[] key, long[] offsets) {
        List<Boolean> replies = RedisExecutor.newExecutor()
                .executePipelined(conn -> {
                    for (long offset : offsets) {
                        conn.getBit(key, offset);
                    }
                    return null;
                });
        if (replies == null || replies.size() != offsets.length) {
            return false;
        }
        for (Boolean reply : replies) {
            if (!Boolean.TRUE.equals(reply)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Sets one bit with a single SETBIT. The command's reply is the bit's previous value, so no
     * separate read is needed; a missing reply is treated as "changed".
     */
    private boolean setBit(byte[] key, long offset) {
        Boolean previous = RedisExecutor.newExecutor()
                .execute(conn -> conn.setBit(key, offset, true));
        return !Boolean.TRUE.equals(previous);
    }

    /** Writes a 0 at the last offset so Redis allocates the bitmap at its full length. */
    private void allocate(byte[] key) {
        long lastOffset = bitSize - 1;
        RedisExecutor.newExecutor()
                .execute(conn -> conn.setBit(key, lastOffset, false));
    }

    private String currentKey() {
        return genkey(cursor());
    }

    private String genkey(long cursor) {
        return BASE_KEY + "-" + cursor;
    }

    /**
     * Reads the index of the newest bitmap; 0 when the cursor key yields no value.
     *
     * @throws IllegalStateException if the stored value is not an integer
     */
    private long cursor() {
        // GET replies with raw bytes; accept a String too in case the executor already decoded it.
        Object raw = RedisExecutor.newExecutor()
                .execute(conn -> conn.get(keyBytes(CURSOR)));
        if (raw == null) {
            return 0;
        }
        String text = raw instanceof byte[]
                ? new String((byte[]) raw, java.nio.charset.StandardCharsets.UTF_8)
                : raw.toString();
        Long parsed = Longs.tryParse(text.trim());
        if (parsed == null) {
            throw new IllegalStateException("Value of '" + CURSOR + "' is not an integer: " + text);
        }
        return parsed;
    }

    /** Encodes a key or value with a fixed charset instead of the platform default. */
    private static byte[] keyBytes(String value) {
        return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}

  

RedisExecutor.java

package com.ylzinfo.ehc.server.bloomFilter.redis;

import com.ylzinfo.ehc.core.gateway.SpringContextUtil;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.List;

/**
 * Thin helper that runs connection-level Redis commands through the {@code redisTemplate} bean.
 *
 * <p>{@code RedisTemplate} acquires the connection before the callback and releases it
 * afterwards, so the callbacks here neither open pipelines themselves nor close the connection.
 *
 * @author syh
 * Date: 2020/7/10
 */
public class RedisExecutor<T> {

    private final RedisTemplate redisTemplate;

    /** Creates an executor bound to the {@code redisTemplate} bean. */
    public static <T> RedisExecutor<T> newExecutor() {
        return new RedisExecutor<>();
    }

    /** Looks up the bean named {@code redisTemplate} via {@code SpringContextUtil}. */
    public RedisExecutor() {
        redisTemplate = SpringContextUtil.getBean("redisTemplate");
    }

    /**
     * Runs {@code executor} against a connection and returns the command's own reply.
     *
     * <p>The command is sent directly, not through a pipeline: a pipelined connection only
     * queues the command and returns {@code null}, which would discard the reply. Replies are
     * passed through exactly as the connection produced them (for example {@code byte[]} for
     * GET), so callers must request a matching type.
     *
     * @param executor the command(s) to run
     * @return the value returned by {@code executor}, possibly {@code null}
     */
    @SuppressWarnings("unchecked")
    public <T> T execute(PipelineExecutor executor) {
        // The cast picks the RedisCallback overload of RedisTemplate.execute.
        return (T) redisTemplate.execute((RedisCallback) conn -> executor.exec(conn));
    }

    /**
     * Runs {@code executor} inside a pipeline and returns the replies of all queued commands,
     * in order.
     *
     * <p>{@code RedisTemplate.executePipelined} opens and closes the pipeline itself and
     * requires the callback to return {@code null}; the value returned by {@code executor} is
     * therefore ignored.
     *
     * @param executor the command(s) to queue
     * @return one entry per queued command
     */
    @SuppressWarnings("unchecked")
    public <T> List<T> executePipelined(PipelineExecutor executor) {
        return redisTemplate.executePipelined((RedisCallback) conn -> {
            executor.exec(conn);
            return null;
        });
    }

    /** A unit of work to run against a {@link RedisConnection}. */
    @FunctionalInterface
    public interface PipelineExecutor<T> {
        T exec(RedisConnection conn);
    }
}

  

测试 controller:参数 num 表示待校验的数据;grow 表示本次请求是否再写入一批数据(每批 1000 条,写入量超过容量阈值时会触发扩容)。

package com.ylzinfo.ehc.server.bloomFilter.redis;

import com.google.common.hash.Funnels;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo endpoint for {@link RedisBloomFilter}.
 *
 * <p>The first request, and every request with {@code grow=true}, inserts the next 1000
 * consecutive integers (as strings) into the filter; the response is whether {@code num} might
 * be contained.
 *
 * @author syh
 * Date: 2020/7/10
 */
@RestController
public class RedisTest {
    /** Sized for 1000 insertions at a 10% false-positive rate. */
    RedisBloomFilter<String> bloomFilter = RedisBloomFilter.create(
            Funnels.stringFunnel(Charset.forName("utf-8")), 1000, 0.1);
    /** {@code true} until the first request has seeded the filter. */
    private boolean instance = true;
    /** Next integer to insert. */
    private AtomicInteger incr = new AtomicInteger(0);

    /**
     * @param request reads the {@code num} (value to look up) and {@code grow} (insert another
     *                batch when {@code "true"}) parameters
     * @return {@code Boolean} result of the lookup; {@code false} when {@code num} is missing
     */
    @RequestMapping("bloom/filter/test")
    public Object test(HttpServletRequest request) {
        String num = request.getParameter("num");
        String grow = request.getParameter("grow");

        if (instance || "true".equals(grow)) {
            for (int i = 0; i < 1000; i++) {
                bloomFilter.put(String.valueOf(incr.getAndIncrement()));
            }
            instance = false;
        }

        // A missing parameter would otherwise reach the hash function as null and throw.
        if (num == null) {
            return false;
        }
        return bloomFilter.mightContain(num);
    }
}

  

num=1,grow=false时,返回true(命中目标)

num=1000,grow=false时,返回false(此时只写入了0~999,1000尚未写入,所以未命中)
num=1000,grow=true时,返回true(本次请求又写入了1000~1999,期间可能触发扩容,所以命中)

原文地址:https://www.cnblogs.com/braska/p/13280791.html