[Alg] 随机抽样完备算法-蓄水池算法 Reservoir Sampling

1. 问题定义

在保证$n$个元素被抽取的概率是相同的前提下,从总量为$n$的样本空间中随机抽取$k$个元素

2. 应用场景和一般算法

(1) 对于总数$n$值已知的情况

我们可以用最简单的随机数算法,生成范围在 $[1, n]$间的$k$个随机数。

(2) 对于总数$n$值提前未知的情况

一种方法是,首先遍历这个样本空间下所有样本并计数,得到n,之后再用(1)中的方法等随机抽样。

但是先遍历一遍样本空间,在样本很大的情况下是很耗时的。有没有不需要提前遍历一遍整个样本空间的,又能公平抽样的算法呢?

这里将介绍蓄水池算法,是一个被证明绝对公平的,不需提前预知样本空间大小的抽样算法

(3) 实际场景

什么时候是"总数提前未知且要求随机抽样"的场景呢?

ex1. 从一本很厚的电话簿中抽取 1000 人进行姓氏统计

ex2. 从 Google 搜索 "Ken Thompson",从中抽取 100 个结果查看哪些是今年的

ex3. 我自己的实际工作场景:在推词工作中,要对最终项目产出的推荐词进行相关性评估,即评估推荐词和已购词的相关性。这里需要抽取,比如300条case人工评定一下相关性。那这里我们词库中的词太多啦,在hdfs中保存成了几千的part。要怎么对这么多数据随机抽取300条呢?

最初自己的想法是随机选几个part, 把随机取的几个part拼接,之后用随机数算法抽取。

但是有个问题,由于hadoop的工作原理,在reduce阶段把key相同/相近的放在一起了,使得保存在集群上的各个part间的数据可能不是完全独立的,并且不同part包含的数据量也可能不同。那在第一步随机选part这个阶段就不能保证完全公平了。虽然这个粗糙的方法也不太伤大雅,但是有更好的算法就可以学着用一下~

3. 蓄水池算法抽样过程

假设数据序列的规模为n, 需要采样的数量为$k$. 

首先构建一个可容纳$k$个元素的数组,将序列的前$k$个元素放入数组中。

然后从第$k + 1$个元素开始,以$frac{k}{n}$的概率来决定该元素是否被替换到数组中 (数组中的元素被替换的概率是相同的)。

当遍历完所有元素后,数组中剩下的元素即为所需采样的样本。

4. 证明

(1) 对于第$i$个数 ($i <= k$)

(i) 在第$1, ..., k$步,该元素被选中到蓄水池数组的概率为1.

(ii) 在第$k+1$步,数据序列规模为$k + 1$.

该元素被替换的概率 = $k + 1$个元素被选中的概率 * $i$被选中替换的概率 = $frac{k}{k + 1} imes frac{1}{k} = frac{1}{k + 1}$

该元素被保留的概率 = $1 - frac{1}{k + 1} = frac{k}{k + 1}$

(iii) 在第$k+2$步,数据序列规模为$k + 2$.

该元素不被第$k + 1$个元素替换(被保留)的概率 = $1 - frac{k}{k + 2} imes frac{1}{k} = frac{k + 1}{k + 2}$

(iv) 第$n$步,该元素被保留的概率

$$1 imes frac{k}{k + 1} imes frac{k + 1}{k + 2} imes ... imes frac{n - 1}{n} = frac{k}{n} $$

(2) 对于第$j$个数 ($j > k$)

在第$j$步被选中的概率为$frac{k}{j}$。

在第$j + 1$步,不被$j + 1$个元素替换的概率为$1 - frac{k}{j + 1} imes frac {1}{k} = frac{j}{j + 1}$。

在第$j + 2$步,不被$j + 2$个元素替换的概率为$1 - frac{k}{j + 2} imes frac {1}{k} = frac{j + 1}{j + 2}$

运行到第$n$步,该元素被保留的概率

$$1 imes frac{j}{j + 1} imes frac{j + 1}{j + 2} imes ... imes frac{n - 1}{n} = frac{k}{n}$$

因此,对于这$n$个元素,被保留的概率均为$frac{k}{n}$。

5. 代码实现

import random

class ReservoirSampling(object):
    """
    Reservoir Sampling Alg
    """
    def __init__(self, N):
        super(ReservoirSampling, self).__init__()
        self.pool = [] # total samples
        self.result = [] # sampling result
        self.N = N # total sample number
        self.init()

    def init(self):
        for i in range(self.N):
            self.pool.append(i)
    
    def sampling(self, K):
        """ 1. Put first K element of pool into the result list 
        """ 
        for i in range(K):
            self.result.append(self.pool[i])
        
        """ 2. whether the pool[i] can be saved 
               and replace one element in result[]
        """
        for i in range(K, self.N):
            """ the element pool[i] can be saved in porb: k / (i + 1)
                tmp: [0, k - 1] => k elements
                current total: [0, i] => (i + 1) elements    
                tmp < K can represent: k / (i + 1)
            """
            tmp = random.randint(0, i)
            if tmp < K:
                """ choose one element in result[tmp] and replace with pool[i]
                    since tmp in range: [0, K - 1] -> total indexes of result[]
                    result[tmp]: 1 / k prob to choose one element in result
                """
                self.result[tmp] = self.pool[i]

        return self.result
    
if __name__ == "__main__":
    sampler = ReservoirSampling(100000)
    res = sampler.sampling(30)
    print res

参考链接:https://www.cnblogs.com/snowInPluto/p/5996269.html


PS1. 附上对应用场景中ex3 我自己工作场景 先抽part 在从part中抽样 不公平原因的分析

比如所有part中一共有10,000 条数据,一共有20个part。那么随机选2个part,再从选中的2个part中随机选出1条数据,求概率。

那么选中一条数据的概率是:(1 / 20) * (1 / 19) * (1 / (part1_size + part2_size))

可见这个值会随着抽样出的part的大小不同而不同,并不能保证每条数据以相同的概率被抽样到。

PS2. 蓄水池算法如何用于hadoop?

暂时没有想到解决方法。

理论上有n个mapper, 每个mapper用蓄水池抽出来 k 个sample, 则有 n * k 个sample。再过一个reduce从 n * k 个sample中选出k个sample就好了。

但是实际上分给每个mapper的数据量可能不相同,则每个mapper中每条元素被抽到的概率为 k / mapper_size,其中mapper_size表示该mapper处理的元素个数。

由此可见mapper_size的不同会导致不同mapper中数据元素被抽到的概率不同。所以损失算法的公平性。

原文地址:https://www.cnblogs.com/shiyublog/p/13204701.html