用pymongo 删除mongodb中的重复数据(aggregate mapreduce)

方法一    aggergate:

aggregate是mongo管道方法,使用管道运行百万以上数据需要注意:

  1. batchsize (如果长时间连接mongo会导致游标丢失错误,这个参数是每次读取数量,可以设置为固顶置,不断返回数据,游标不会丢失)
  2. allowDiskUse(是允许最大占用最大内存,设置为True,不会报内存不够的错误)
  3. 上面这两个参数需要用=号键值对的方式设置,不能用:字典的形式
  4. pipline
  5. 用bulk_wirte进行去重,bulk的两个参数也需要设置
  6. 用python的迭代器去运行

4、pipline里的参数;

db.school.aggregate(
[
{$match:{time:{$gt:1513612800}}},
{$group:{_id:{insituteName:"$institute",class:"$name"},count:{$sum:1}}},
{$sort:{count:-1}},
{$limit:10}]
)
注:
1. $match 表示查询条件
2. $group 表示分组,_id固定写法,
   如果按照多个字段分组,则字段必须都在_id对象中,可以取别名,例如上面示例,输出结果为:
   { "_id" : { "insituteName" : "数学系",class: "一班"}, "count" : 200 }
   { "_id" : { "insituteName" : "英语系",class: "二班" }, "count" : 201 } 
   如果按照一个字段分组,则可以写为{$group:{_id:"$institute",count:{$sum:1}}},此时,查询结果为:
   { "_id" : "数学系", "count" : 1000 }
   { "_id" : "英语系", "count" : 800 }
   若没有分组字段,则{$group:{_id:null,count:{$sum:1}}}
3. $sort 排序,1——正序,-1——倒序
4. $limit 限制输出数据条数
5. count:{$sum:1},1表示统计查询结果数量,如果想统计time字段和则使用 count:{$sum:"$time"}。
    聚合函数有:min,max,sum,avg
6. $project 表示选择显示的字段,1——显示,0——不显示,如下示例:
db.school.aggregate([{$project:{insituteName:1,"time:1,_id:0}},{$match:{"time:{$gt:1513612800}}},{$sort:{"time:-1}},{$limit:10}])
输出:
   { "insituteName" : "数学系", "time" : 1521392371 }
   { "name" : "英语系", "time" : 1521392370 }
7. $skip在跳过指定数量的文档,并返回余下的文档,例如,下面跳过前5条文档,显示第6到第10条:

db.school.aggregate([{$project:{time:1,name:1,_id:0}},{$match:{time:{$gt:1513612800}}},{$group:{_id:{className:"$name"},count:{$sum:1}}},{$sort:{count:-1}},{$limit:10},{$skip:5}]

注释:
$project先筛选出所有文档的time和name两个字段,
$match然后筛选出时间大于1513612800的文档,
$group按照name字段分组,重命名为className,
$sum统计服务条件的文档数量,
$sort按照数量count字段倒序排序,
$limit限制一共输出前10条文档;
$skip跳过前5条文档,输出第6到第10条文档。

参考:
https://blog.csdn.net/liuxiaoxiaosmile/article/details/79666391

  

5和6、python的迭代器,如果数据量过大,bulk_write也会报错,那么需要用python的iter去运行

示例:

map_id = map(lambda doc: doc['dups'][1:], infoall_quali.aggregate(
                                                                      pipeline=pipeline,
                                                                      batchSize=200,
                                                                      allowDiskUse=True
                                                                      ))
list_id = [item for sublist in map_id for item in sublist]
alist = iter(list_id)
anum = 1
count = 1000
len_resultlist = len(list_id)
print('Please wait for Copy database {}'.format(len_resultlist))
while True:
    if (anum - 1) * count >= len_resultlist:
        break
    blist = islice(alist, 0, count)
    result = infoall_quali.bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), [x for x in blist])),
                                          ordered=False,
                                          bypass_document_validation=True
                                          ).bulk_api_result
    print(anum,result, datetime.datetime.now())
    anum += 1
myclient.close()

方法二、mapreduce:

MongoDB中的MapReduce主要有以下几阶段:

  • Map:把一个操作Map到集合中的每一个文档

  • Shuffle: 根据Key分组对文档,并且为每个不同的Key生成一系列(>=1个)的值表(List of values)。

  • Reduce: 处理值表中的元素,直到值表中只有一个元素。然后将值表返回到Shuffle过程,循环处理,直到每个Key只对应一个值表,并且此值表中只有一个元素,这就是MR的结果。

  • Finalize:此步骤不是必须的。在得到MR最终结果后,再进行一些数据“修剪”性质的处理。

原文地址:https://www.cnblogs.com/Robertzewen/p/10265636.html