Spark uniq is essentially word count

Example (the key part was shown in bold in the original post):

    import operator

    # dns_domain_info_list_rdd ==> [(src_ip, domain, domain_ip, timestamp, metadataid), ....]
    all_domains_list = dns_domain_info_list_rdd.map(lambda x: (x[1], 1)).reduceByKey(operator.add).map(lambda x: x[0]).collect()
    all_domains_set = set(all_domains_list)
    # keep only records with a non-empty domain_ip, i.e. queries that got an answer
    domains_with_responsed_ip_list = dns_domain_info_list_rdd.filter(lambda x: x[2]).map(lambda x: (x[1], 1)).reduceByKey(operator.add).map(lambda x: x[0]).collect()
    domains_with_responsed_ip_set = set(domains_with_responsed_ip_list)
    print("all domains cnt:", len(all_domains_list))
    print("all domains sample:", all_domains_list[:3])
    print("all domains set cnt:", len(all_domains_set))
    print("all domains(with ip) cnt:", len(domains_with_responsed_ip_list))
    print("all domains(with ip) sample:", domains_with_responsed_ip_list[:3])
    print("all domains(with ip) set cnt:", len(domains_with_responsed_ip_set))
    # For repeated queries of the same domain, some answered and some not:
    # if any query for a domain got an answer, treat that domain as answered.
    nx_domains = all_domains_set - domains_with_responsed_ip_set
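The same "uniq via word count" idea can be sketched locally without a Spark cluster: counting each key and then keeping only the keys is exactly deduplication. The sketch below uses `collections.Counter` on made-up sample records shaped like the RDD rows above (the data values are hypothetical, not from the original post):

    from collections import Counter

    # Toy rows shaped like (src_ip, domain, domain_ip, timestamp, metadataid);
    # an empty domain_ip means the query got no answer.
    records = [
        ("10.0.0.1", "a.com", "1.2.3.4", 1, "m1"),
        ("10.0.0.2", "a.com", "",        2, "m2"),  # a.com: one query unanswered
        ("10.0.0.3", "b.com", "",        3, "m3"),  # b.com: never answered
    ]

    # "uniq is word count": count occurrences per domain, then keep only the keys.
    all_domains = set(Counter(r[1] for r in records))
    answered_domains = set(Counter(r[1] for r in records if r[2]))

    # a.com was answered at least once, so only b.com remains.
    nx_domains = all_domains - answered_domains
    print(nx_domains)  # → {'b.com'}

In Spark itself, `rdd.map(lambda x: x[1]).distinct()` yields the same set of unique domains directly; the map/reduceByKey pattern shown above just makes the word-count analogy explicit.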
Original post: https://www.cnblogs.com/bonelee/p/8919331.html