走读中学到的技巧 airflow

自动发掘dag实例
import imp
    def process_file(self, filepath):
        mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
        if file_ext != '.py':
            return
        try:
            logging.info("Importing " + filepath)
            m = imp.load_source(mod_name, filepath)
        except :
            traceback.print_exc()
            logging.error("Failed to import: " + filepath)
        else:
            for dag in m.__dict__.values():
                if type(dag) == DAG:
                    if dag.dag_id in self.dags:
                        raise Exception(
                            'Two DAGs with the same dag_id. No good.')
                    self.dags[dag.dag_id] = dag
                    dag.dagbag = self
通过向队列中放入特殊的量来实现终止任务
def end(self): # Sending poison pill to all worker [self.queue.put((None, None))
for w in self.workers] # Wait for commands to finish self.queue.join()

=================== subprocess

subprocess.Popen用来创建新的进程。

subprocess.Popen(args, bufsize=0, executable=None, stdin=None,
    stdout=None, stderr=None, preexec_fn=None, close_fds=False,
    shell=False, cwd=None, env=None, universal_newlines=False,
    startupinfo=None, creationflags=0
)

shell参数:

当shell=True时,表示在系统默认的shell环境中执行新的进程,此shell在windows表示为cmd.exe,在linux为/bin/sh。

executable参数:

当指定shell=True时,executable用来修改默认的shell环境,比如executable='/bin/bash'。

stdin,stdout,stderr参数:

默认地stdin,stdout,stderr均为None,此时表示此新进程的stdin,stdout,stderr均为默认,从keyboard获得输入,将输出和错误输出到display。如果stdin设置为PIPE,此时的stdin其实是个file对象,用来提供输入到新创建的子进程;如果stdout设置为PIPE,此时stdout其实是个file对象,用来保存新创建的子进程的输出;如果stderr设置为PIPE,此时的stderr其实是个file对象,用来保存新创建的子进程的错误输出。

universal_newlines参数:

如果此参数设置为True,则新进程的stdout和stderr将输出为text,换行为' '或' '或' '按照不同的系统平台。

Popen类拥有的方法及属性

1、Popen.pid

获取子进程的进程ID。

2、Popen.returncode

获取进程的返回码。如果进程未结束,将返回None。

3、communicate(input=None)

与子进程进行交互,像stdin发送数据,并从stdout和stderr读出数据存在一个tuple中并返回。
参数input应该是一个发送给子进程的字符串,如果未指定数据,将传入None。

4、poll()

检查子进程是否结束,并返回returncode属性。

5、wait()

等待子进程执行结束,并返回returncode属性,如果为0表示执行成功。

6、send_signal( sig)

发送信号给子进程。

7、terminate()

终止子进程。windows下将调用Windows API TerminateProcess()来结束子进程。

8、kill()

官方文档对这个函数的解释跟terminate()是一样的,表示杀死子进程。

def _read_task_logs(self, stream):
        while True:
            line = stream.readline().decode('utf-8')
            if len(line) == 0:
                break
            self.logger.info('Subtask: {}'.format(line.rstrip('
')))

    def run_command(self, run_with, join_args=False):
        """
        起子进程,同时读取子进程的日志"""
        cmd = [" ".join(self._command)] if join_args else self._command
        full_cmd = run_with + cmd
        self.logger.info('Running: {}'.format(full_cmd))
        proc = subprocess.Popen(
            full_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )

        # Start daemon thread to read subprocess logging output
        log_reader = threading.Thread(
            target=self._read_task_logs,
            args=(proc.stdout,),
        )
        log_reader.daemon = True
        log_reader.start()
        return proc
    def execute(self, execution_date):

        bash_command = self.bash_command

        logging.info("Runnning command: " + bash_command)
        sp = Popen(
            bash_command, shell=True, stdout=PIPE, stderr=PIPE)
        out, err = sp.communicate()
        sp.wait()

        logging.info("Command STDOUT:
" + out)
        if err:
            logging.error(err)
        if sp.returncode:
            raise Exception("Bash command failed")
       将不确定元素在执行时变成确定       
                    task_copy = copy.copy(self.task)
                    for attr in task_copy.__class__.template_fields:
                        source = getattr(task_copy, attr) # 'echo {{ ti.execution_date }}'
                        setattr(
                            task_copy, attr,
                            jinja2.Template(source).render(**jinja_context)
                        ) # 'echo 2020-01-02 00:00:00'
                    task_copy.execute(self.execution_date)
作者通过pickle将dag存入了数据库,和从数据库进行加载
self.pickle = pickle.dumps(dag)
return pickle.loads(self.pickle)

 =========== 超时退出

signal包的核心是使用signal.signal()函数来预设(register)信号处理函数:singnal.signal(signalnum, handler)
signalnum为某个信号,handler为该信号的处理函数。我们在信号基础里提到,进程可以无视信号,可以采取默认操作,还可以自定义操作。当handler为signal.SIG_IGN时,信号被无视(ignore)。当handler为singal.SIG_DFL,进程采取默认操作(default)。当handler为一个函数名时,进程采取函数中定义的操作。

signal.alarm(),它被用于在一定时间之后,向进程自身发送SIGALRM信号

 1 class timeout(object):
 2     """
 3     To be used in a ``with`` block and timeout its content.
 4     """
 5     def __init__(self, seconds=1, error_message='Timeout'):
 6         self.seconds = seconds
 7         self.error_message = error_message
 8 
 9     def handle_timeout(self, signum, frame):
10         logging.error("Process timed out")
11         raise AirflowTaskTimeout(self.error_message)
12 
13     def __enter__(self):
14         try:
15             signal.signal(signal.SIGALRM, self.handle_timeout)
16             signal.alarm(self.seconds) #self.seconds秒之后,向自己发送SIGALRM信号
17         except ValueError as e:
18             logging.warning("timeout can't be used in the current context")
19             logging.exception(e)
20 
21     def __exit__(self, type, value, traceback):
22         try:
23             signal.alarm(0)  #通过alarm(0)来取消调用回调函数self.handle_timeout
24         except ValueError as e:
25             logging.warning("timeout can't be used in the current context")
26             logging.exception(e)
创建后台守护进程
pid, stdout, stderr, log_file = setup_locations("scheduler", args.pid, args.stdout, args.stderr, args.log_file) handle = setup_logging(log_file) stdout = open(stdout, 'w+') stderr = open(stderr, 'w+') ctx = daemon.DaemonContext( pidfile=TimeoutPIDLockFile(pid, -1), files_preserve=[handle], stdout=stdout, stderr=stderr, ) with ctx: job.run() stdout.close() stderr.close()

 

  1. 使用importlib.import_module函数将包含特定环境代码的模块加载到局部变量module中。

  2. 最后,更新这个模块的globals,将development.py文件中设置合并到其中

def helper():
    pass
p = multiprocessing.Process(target=helper,
                                    args=(),
                                    name="{}-Process".format(thread_name))
p.start() #启动进程
p.terminate() #终止进程,但进程不会马上停,可以通过is_alive查看是否还活着
p.join(5) #进程
#若知道进程pid,可以强杀进程
 os.kill(p.pid, signal.SIGKILL)
 静态方法没有self参数,也没cls参数,如:
 @staticmethod
    def _launch_process(result_queue,
                        file_path,
                        pickle_dags,
                        dag_id_white_list,
                        thread_name,
                        log_file):
        pass
修改进程的标准输入、输出
        def helper():
            f = open(log_file, "a")
            original_stdout = sys.stdout
            original_stderr = sys.stderr

            sys.stdout = f
            sys.stderr = f

            try:
                pass
            except:
                # Log exceptions through the logging framework.
                logging.exception("Got an exception! Propagating...")
                raise
            finally:
                sys.stdout = original_stdout
                sys.stderr = original_stderr
                f.close()
时间差
ts = datetime.now() #some process td = datetime.now() - ts td = td.total_seconds() + (float(td.microseconds) / 1000000) #总耗时,单位是秒
创建子进程执行命令:
command = "exec bash -c '{0}'".format(command)
try:
    subprocess.check_call(command, shell=True)
    state = State.SUCCESS
except subprocess.CalledProcessError as e:
    state = State.FAILED
    self.logger.error("failed to execute task {}:".format(str(e)))

 修改当前线程的线程名:threading.current_thread().name = thread_name #修改进程的主线程名

水电费



原文地址:https://www.cnblogs.com/testzcy/p/12150093.html