维护大数据系统时遇到的几个问题及解决方案

2018年进入一家互联网公司的大数据部门,在进入部门后,遇到了一些问题,这里记录一下。

  先简单介绍一下的背景:公司是一家中小型的互联网电商公司,在2015年基于CDH搭建了大数据系统,大约30台机器(32C+256G),总存储量大于2PB,在进入部门后,需要对集群情况有个梳理,遇到了不少问题:
    当时只知道job分散在 crontab和airflow上,airflow有管理界面,可以看到所有的Job。但对于用crontab管理的job,这些job分散在不同的机器上,每个开发人员都只知道自己的job, 没人知道所有的。 所以当时使用如下命令,扫描了30台机器上所有的crontab,并保存到数据库。
#!/usr/bin/env bash

for ip in `cat ip`
do
    echo $ip
    ssh $ip "cat /etc/passwd | cut -f 1 -d : |xargs -I {} crontab -l -u {}"
done

命令参考:https://blog.csdn.net/mlzhu007/article/details/81662091

ip文件内容

192.168.1.1
192.168.1.2
...

对于airflow来说,有管理页面可以看到所有Job,同时这些Job数据是保存在mysql中,可以直接访问mysql数据库获得

SELECT dag_id,owners FROM `airflow`.`dag` where is_active=1 and is_subdag=0 and is_paused=0 order by last_scheduler_run desc 
    对于airflow来说,可以配置email_on_failure参数,Job运行失败会发送邮件到负责人,但是随着人员流动,有些人离职并没有及时修改负责人邮件,导致job报错了也没有人知道,当时采取了如下方式:
    1. 设置一个邮件组,错误邮件发送到邮件组,部门人员在入职后都加入这个邮件组,所有人都可以接收到邮件,
args = {
    'owner': 'XX',
    'email': ['alarm@xx.com'],
    'email_on_failure': True,
}

    2. 因为历史原因,部分airflow dag并没有配置email参数,报错还是会漏掉, 这里可以从airflow的mysql数据库中获取运行失败的job

-- 最近10分钟,运行失败的job
SELECT dag_id  FROM `airflow`.`task_instance`  where end_date > date_sub(now(),interval 10 MINUTE)  and state = 'failed'
基于task_instance表还可以获取如下
执行最耗时的job top10:
select dag_id,avg(duration) as duration from (
SELECT dag_id,execution_date,min(start_date) as start_date,max(end_date) as end_date,sum(duration) as duration FROM `airflow`.`task_instance`
where start_date > '2018-05-08 00:00:00'   -- 这里调整为最近24小时
group by dag_id,execution_date) as t group by dag_id order by duration desc limit 10

执行次数最多的Job:

select * from (
select  dag_id,count(1) as count from(
SELECT dag_id,execution_date,min(start_date) as start_date,max(end_date) as end_date,sum(duration) as duration FROM `airflow`.`task_instance`
where start_date > '2018-05-08 00:00:00'
group by dag_id,execution_date) as t group by dag_id ) as t1 order by count desc
   上面第二步我们是知道了job是否执行成功,到了这一步想知道job运行是否正常,成功不等于正常, 举个例子:当一个job开始执行的时候,依赖的上游数据还没计算好,该job执行完毕,即使job本身没有报错,跑出来的结果肯定也是有问题的。   
    针对上面的问题,可以给job加一个前置依赖,判断数据是否准备完成,或者配置两个Job的依赖关系。
   但是实际情况远比这个这个复杂,而且这些都需要改造job,工作量大。
   基于此, 针对airflow中正在运行的job,通过对 该job过去5次成功运行的平均耗时时间当前已经运行的耗时时间对比,如果当前job已运行的时间超过过去5次平均耗时的50%,就触发报警。实现方式还是根据airlfow的mysql数据库中的数据实现,这里不再赘述。
   除了批处理job,还有spark streaming程序。偶尔也会有挂掉的情况(GC,异常等),对应的做法是,通过请求spark UI ,获取html代码并解析出运行的job,然后和定义的列表allJob(所有的spark streaming job)对比,allJob.difference(runJob) , 如果有结果,则表示job有挂掉,邮件通知出来。 
    当时有人在HUE中写的hive SQL运行超过几个小时的情况,有些情况属于异常行为,这些情况应该尽早被发现。 yarn有暴露rest接口出来,通过  http://hadoop-master1:8088/ws/v1/cluster/apps?states=RUNNING 就可以获取所有运行的Job,  计算已运行时间,如果超过2个小时,则进行报警。 
    上面是对yarn上运行的job进行了监控,还可以对所有完成的job进行监控 
    通过rest接口http://hadoop-master1:19888/ws/v1/history 获取yarn 上所有job(可以带条件,比如开始时间,结束时间),导入excel , 基于excel可以对yarn有以下基本的分析:
  1. 通过对Job 耗时排序,获取耗时top10,然后看是否有优化的空间。
  2. 通过对比map或者reduce的task耗时的标准平方差, 看是否存在长尾问题。
  3. 查看Job的执行开始时间, 看是否存在job扎堆的情况, 配合yarn CPU的情况, 可以将job打散。
   获取历史job参考:https://blog.csdn.net/xw514124202/article/details/94164393 
    因为数据仓库是分层的,层与层之间有依赖关系,某个表的数据是否正确,如果不正确, 就要去找这个表的上游, 或者看影响的下游表有哪些。
    数据血缘实现主要经历2个阶段:
  1. 手工阶段,获取所有yarn job,然后获取job的“query_string”属性, 得到sql, 然后用阿里的druid,解析SQL, 得到源表和目标表。 
  2. 部署atlas。
    因为我们大部分的计算数据都来自ODS层,而ODS层数据中,埋点数据的准确性对后续的分析至关重要,所以我们从两个维度对埋点数据进行了监控:
  1. 数量:主要是对每个小时收集到的埋点数据量进行统计,然后和前一个小时, 昨天的同时间对比。 超过或低于某个阈值,进行报警。 
  2. 生成时间,主要看 hive 表对应分区的生成时间,没有在规定的时间内生成,则报警。
    大数据通常会用到mongodb, 有些是通过job把mongodb的数据抽到ODS层,有些是通过job把ADS层的数据写入到mongodb,有些是业务站点读取,  访问情况很复杂,DBA有时会告诉我线上mongo压力很大,但是只能具体到实例,具体哪个表,无法知道。 基于此, 我开发了下面的程序, 主要是通过监控MongoDB,来看是哪个机器在访问(原理是循环调用current_op命令) , 配合lsof命令, 可以具体到目标进程。根据进程也就具体到了程序。 
import signal
import sys
from datetime import datetime
import time
from collections import Counter
from collections import defaultdict
from pymongo import MongoClient

client = MongoClient("mongodb://xx:xxx@127.0.0.1:27023/admin")
db = client.admin

dic = defaultdict(list)
global_dic = defaultdict(list)
global_uuid_list = list()


def run():
    signal.signal(signal.SIGINT, quit)
    signal.signal(signal.SIGTERM, quit)
    while True:
        current_ops = db.current_op()
        inprogs = current_ops['inprog']

        for item in inprogs:
            if 'client' in item:
                # dic[item['ns'] + " - " + item['op']].append(item['client'])
                opid = item['opid']
                if opid not in global_uuid_list:
                    global_dic[item['ns'] + " - " + item['op'] + " - " + item.get('planSummary', "")].append(
                        item['client'])
                    global_uuid_list.append(opid)

        # printdic(dic)
        # print
        # dic.clear()
        time.sleep(0.1)


def printdic(innerdic):
    print 'time:', datetime.now()
    for ns, iplist in innerdic.items():
        print ns
        cnt_total = Counter(iplist)
        for ip in cnt_total.most_common():
            print '	', ip[0], ip[1]


def printrep(innerdic):
    for ns, iplist in innerdic.items():
        cnt_total = Counter(iplist)
        cmd = 0
        for ip in cnt_total.most_common():
            cmd += int(ip[1])
        print ns, cmd


def quit(signum, frame):
    print ''
    print ''
    print ''
    printdic(global_dic)
    print ''
    print ''
    print '-----------------------------'
    printrep(global_dic)
    sys.exit()


if __name__ == '__main__':
    run()

运行上面的程序,在一段时间后,按ctrl+C 结束,即会如下打印报表,可以看到在指定时间内,访问最频繁的表以及 客户端IP 地址。 

time: 2018-05-10 20:55:08.030770
local.oplog.rs - getmore -
        192.168.88.1:498168 9
        192.168.88.28:34739 9
admin.$cmd - query -
        1687.0.0.1:42237 1
        192.168.25.59:33525 1
        192.168.25.59:33524 1
db_xx.col1 - update -
        192.168.28.13:54815 1
 - query -
        192.168.28.13:54815 1
db_xx.$cmd - query -
        192.168.26.25:25360 3
        192.168.26.25:48908 3
        192.168.28.13:53171 2
        192.168.211.1:45186 1


-----------------------------
local.oplog.rs - getmore -  18
admin.$cmd - query -  3
db_xx.col1 - update -  1
 - query -  1
db_xx.$cmd - query -  9
   比如某个表,每天写入的数据量很大, 但是根据名称判断应该是一个很早的表了, 应该没有访问了, 但是不能确定, 此时就需要一种方法来判断。
   如果是mysql,可以通过查找information_schema的方式, 知道这个表最后更新时间, 来判断是否有job在写。 对于读就没有特别好的办法了,当时采用方法如下:
  1. 开启MySQL general log日志, 
  2. 通过flume或者其他方式将日志保存到HDFS上进行分析,判断目标表是否有访问。
 这里之所以花费大力气判断一个表是否有读,主要是根据读的情况,来决定是否下线对应的Job,毕竟Job不可能无休止的增加,对于已经下线的业务,对应的Job也可以下线,节约出资源跑其他job。而之所以采用这种方式,也是因为大数据项目中数据引用,数据依赖错综复杂, 通过找代码判断一个表是否有访问已经非常困难。
   通过解析fsimage的方式。 查看小文件。 
hdfs oiv -i fsimage_{XX} -o fsimage.csv -p Delimited -t tmp
  1. 这是我所遇到的问题的一部分,后面还会遇到元数据管理,数据源管理等。
  2. 大数据治理理想很丰满, 现实很骨感, 一般公司都是先搞一套大数据, 快速开发, 快速上线,快速产出价值, 然后才会做数据治理的工作。 毕竟相比于业务价值,数据治理的价值在刚起步阶段并不是那么明显。
  3. 限于当时的人力和时间,当时主要还是做搜索,推荐等业务项目的开发,针对大数据本身的治理工作也是遇到问题才去解决, 没有形成一个体系,终究是个遗憾。
原文地址:https://www.cnblogs.com/beyondbit/p/14761141.html