SparkHiveContext和直接Spark读取hdfs上文件然后再分析效果区别

最近用spark在集群上验证一个算法的问题,数据量大概是一天P级的,使用hiveContext查询之后再调用算法进行读取效果很慢,大概需要二十多个小时,一个查询将近半个小时,代码大概如下:

        try:
            sql = """
                select ltescrsrq, mr_ltencrsrq1, mr_ltencrsrq2, mr_ltencrsrq3, ltescrsrp, mr_ltencrsrp1,
                mr_ltencrsrp2, mr_ltencrsrp3, mr_ltesctadv, mr_longitude, mr_latitude
                from noce.agg_mro_chr_relate_bak  where x = %s
                and y = %s
                and day=20170511
                and 6371000 * ACOS(SIN(x_latitude * PI() / 180) * SIN(y_latitude * PI() / 180) +
                COS(x_latitude * PI() / 180) * COS(y_latitude * PI() / 180) * COS(y_longitude * PI() / 180 -
                x_longitude * PI() / 180)) < 2000
                """ % (a, b)
            sqlcontext.sql(sqlQuery="set hive.mapred.supports.subdirectories=true")
            sqlcontext.sql(sqlQuery="set mapred.input.dir.recursive=true")
            result = sqlcontext.sql(sqlQuery=sql).collect()

        except Exception as e:
            print(e.message)
            break

主要是where之后的hive查询太过缓慢,于是试着直接spark用textFile读取文件然后在进行map和filter操作:

data = sc.textFile("/DATA/PUBLIC/***/**/*/day=%s/*/*/*" % day)
sc.setLogLevel("WARN")
data = data.filter(lambda x: x.split('|')[41] != '' or x.split('|')[40] != '')
data_filter = data.filter(lambda x: int(x.split('|')[1]) == int(*) and int(x.split('|')[2]) == int(*) and 6371000 *
                    np.arccos(np.sin(float(x.split('|')[76]) * np.pi / 180) * np.sin(float(x.split('|')[41]) * np.pi / 180) +
                    np.cos(float(x.split('|')[76]) * np.pi / 180) * np.cos(float(x.split('|')[41]) * np.pi / 180) *
                    np.cos(float(x.split('|')[40]) * np.pi / 180 - float(x.split('|')[75]) * np.pi / 180)) < 2000)
result = data_filter.map(lambda x: [x.split('|')[7], x.split('|')[26], x.split('|')[27], x.split('|')[28],
                                            x.split('|')[6], x.split('|')[21], x.split('|')[22], x.split('|')[23],
                                            x.split('|')[50], x.split('|')[75], x.split('|')[76]]).collect()
result = [map(convert, result[i]) for i in range(len(result))]

验证之后的结果是这样大概总共才半个小时就可以全部跑完。效率何止提升了20倍!!!看来spark对hive的优化做的还不够好,有些人说sparksql可以,但是看了下官网的文档hivecontext是基于sparksql 的,所以感觉 效果还是不理想。

原文地址:https://www.cnblogs.com/Kaivenblog/p/7777596.html