Python守护进程和脚本单例运行

一、简介
     守护进程最重要的特性是后台运行;它必须与其运行前的环境隔离开来,这些环境包括未关闭的文件描述符、控制终端、会话和进程组、工作目录以及文件创建掩码等;它可以在系统启动时从启动脚本/etc/rc.d中启动,可以由inetd守护进程启动,也可以有作业规划进程crond启动,还可以由用户终端(通常是shell)执行。
       Python有时需要保证只运行一个脚本实例,以避免数据的冲突。 
 
二、Python守护进程
1、函数实现
#!/usr/bin/env python
#coding: utf-8
import sys, os
 
'''将当前进程fork为一个守护进程
   注意:如果你的守护进程是由inetd启动的,不要这样做!inetd完成了
   所有需要做的事情,包括重定向标准文件描述符,需要做的事情只有chdir()和umask()了
'''
 
def daemonize (stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
     #重定向标准文件描述符(默认情况下定向到/dev/null)
    try: 
        pid = os.fork() 
          #父进程(会话组头领进程)退出,这意味着一个非会话组头领进程永远不能重新获得控制终端。
        if pid > 0:
            sys.exit(0)   #父进程退出
    except OSError, e: 
        sys.stderr.write ("fork #1 failed: (%d) %s
" % (e.errno, e.strerror) )
        sys.exit(1)
 
     #从母体环境脱离
    os.chdir("/")  #chdir确认进程不保持任何目录于使用状态,否则不能umount一个文件系统。也可以改变到对于守护程序运行重要的文件所在目录
    os.umask(0)    #调用umask(0)以便拥有对于写的任何东西的完全控制,因为有时不知道继承了什么样的umask。
    os.setsid()    #setsid调用成功后,进程成为新的会话组长和新的进程组长,并与原来的登录会话和进程组脱离。
 
     #执行第二次fork
    try: 
        pid = os.fork() 
        if pid > 0:
            sys.exit(0)   #第二个父进程退出
    except OSError, e: 
        sys.stderr.write ("fork #2 failed: (%d) %s
" % (e.errno, e.strerror) )
        sys.exit(1)
 
     #进程已经是守护进程了,重定向标准文件描述符
 
    for f in sys.stdout, sys.stderr: f.flush()
    si = open(stdin, 'r')
    so = open(stdout, 'a+')
    se = open(stderr, 'a+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())    #dup2函数原子化关闭和复制文件描述符
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())
 
#示例函数:每秒打印一个数字和时间戳
def main():
    import time
    sys.stdout.write('Daemon started with pid %d
' % os.getpid())
    sys.stdout.write('Daemon stdout output
')
    sys.stderr.write('Daemon stderr output
')
    c = 0
    while True:
        sys.stdout.write('%d: %s
' %(c, time.ctime()))
        sys.stdout.flush()
        c = c+1
        time.sleep(1)
 
if __name__ == "__main__":
      daemonize('/dev/null','/tmp/daemon_stdout.log','/tmp/daemon_error.log')
      main()
        可以通过命令ps -ef | grep daemon.py查看后台运行的继承,在/tmp/daemon_error.log会记录错误运行日志,在/tmp/daemon_stdout.log会记录标准输出日志。
 

概括一下守护进程的编写步骤:

  1. fork出子进程,退出父进程
  2. 子进程变更工作目录(chdir)、文件权限掩码(umask)、进程组和会话组(setsid)
  3. 子进程fork孙子进程,退出子进程
  4. 孙子进程刷新缓冲,重定向标准输入/输出/错误(一般到/dev/null,意即丢弃)
  5. (可选)pid写入文件

理解几个要点


为什么要fork两次

第一次fork,是为了脱离终端控制的魔爪。父进程之所以退出,是因为终端敲击键盘、或者关闭时给它发送了信号;而fork出来的子进程,在父进程自杀后成为孤儿进程,进而被操作系统的init进程接管,因此脱离终端控制。

所以其实,第二次fork并不是必须的(很多开源项目里的代码就没有fork两次)。只不过出于谨慎考虑,防止进程再次打开一个控制终端。因为子进程现在是会话组长了(对话期的首次进程),有能力打开控制终端,再fork一次,孙子进程就不能打开控制终端了。


文件描述符


Linux是“一切皆文件”,文件描述符是内核为已打开的文件所创建的索引,通常是非负整数。进程通过文件描述符执行IO操作。


默认情况下,0代表标准输入,1代表标准输出,2代表标准错误。


umask权限掩码


我们知道,在Linux中,任何一个文件都有读(read)、写(write)和执行(execute)的三种使用权限。其中,读的权限用数字4代表,写权限是2,执行权限是1。命令ls -l可以查看文件权限,r/w/x分别表示具有读/写/执行权限。


任何文件,也都有用户(User),用户组(Group),其他组(Others)三种身份权限。一般用3个数字表示文件权限,例如754:

     7,是User权限,即文件拥有者权限

     5,是Group权限,拥有者所在用户组的组员所具有的权限

     4,是Others权限,即其他组用户的权限啦

而umask是为了控制默认权限,防止新建文件或文件夹具有全权。

系统一般默认为022(使用命令umask查看),表示默认创建文件的权限是644,文件夹是755。你应该可以看出它们的规律,就是文件权限和umask的相加结果为666(笑),文件夹权限和umask的相加结果为777。


进程组

每个进程都属于一个进程组(PG,Process Group),进程组可以包含多个进程。
进程组有一个进程组长(Leader),进程组长的ID(PID, Process ID)就作为整个进程组的ID(PGID,Process Groupd ID)。

会话组

登陆终端时,就会创造一个会话,多个进程组可以包含在一个会话中。而创建会话的进程,就是会话组长。
已经是会话组长的进程,不可以再调用setsid()方法创建会话。因此,上面代码中,子进程可以调用setsid(),而父进程不能,因为它本身就是会话组长。

另外,sh(Bourne Shell)不支持会话机制,因为会话机制需要shell支持工作控制(Job Control)。


守护进程与后台进程

通过&符号,可以把命令放到后台执行。它与守护进程是不同的:


  1. 守护进程与终端无关,是被init进程收养的孤儿进程;而后台进程的父进程是终端,仍然可以在终端打印
  2. 守护进程在关闭终端时依然坚挺;而后台进程会随用户退出而停止,除非加上nohup
  3. 守护进程改变了会话、进程组、工作目录和文件描述符,后台进程直接继承父进程(shell)的

换句话说:守护进程就是默默地奋斗打拼的有为青年,而后台进程是默默继承老爸资产的富二代。




2、类实现 #!/usr/bin/env python #coding: utf-8 #python模拟linux的守护进程 import sys, os, time, atexit, string from signal import SIGTERM class Daemon: def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): #需要获取调试信息,改为stdin='/dev/stdin', stdout='/dev/stdout', stderr='/dev/stderr',以root身份运行。 self.stdin = stdin self.stdout = stdout self.stderr = stderr self.pidfile = pidfile def _daemonize(self): try: pid = os.fork() #第一次fork,生成子进程,脱离父进程 if pid > 0: sys.exit(0) #退出主进程 except OSError, e: sys.stderr.write('fork #1 failed: %d (%s) ' % (e.errno, e.strerror)) sys.exit(1) os.chdir("/") #修改工作目录 os.setsid() #设置新的会话连接 os.umask(0) #重新设置文件创建权限 try: pid = os.fork() #第二次fork,禁止进程打开终端 if pid > 0: sys.exit(0) except OSError, e: sys.stderr.write('fork #2 failed: %d (%s) ' % (e.errno, e.strerror)) sys.exit(1) #重定向文件描述符 sys.stdout.flush() sys.stderr.flush() si = file(self.stdin, 'r') so = file(self.stdout, 'a+') se = file(self.stderr, 'a+', 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) #注册退出函数,根据文件pid判断是否存在进程 atexit.register(self.delpid) pid = str(os.getpid()) file(self.pidfile,'w+').write('%s ' % pid) def delpid(self): os.remove(self.pidfile) def start(self): #检查pid文件是否存在以探测是否存在进程 try: pf = file(self.pidfile,'r') pid = int(pf.read().strip()) pf.close() except IOError: pid = None if pid: message = 'pidfile %s already exist. Daemon already running! ' sys.stderr.write(message % self.pidfile) sys.exit(1) #启动监控 self._daemonize() self._run() def stop(self): #从pid文件中获取pid try: pf = file(self.pidfile,'r') pid = int(pf.read().strip()) pf.close() except IOError: pid = None if not pid: #重启不报错 message = 'pidfile %s does not exist. Daemon not running! ' sys.stderr.write(message % self.pidfile) return #杀进程 try: while 1: os.kill(pid, SIGTERM) time.sleep(0.1) #os.system('hadoop-daemon.sh stop datanode') #os.system('hadoop-daemon.sh stop tasktracker') #os.remove(self.pidfile) except OSError, err: err = str(err) if err.find('No such process') > 0: if os.path.exists(self.pidfile): os.remove(self.pidfile) else: print str(err) sys.exit(1) def restart(self): self.stop() self.start() def _run(self): """ run your fun""" while True: #fp=open('/tmp/result','a+') #fp.write('Hello World ') sys.stdout.write('%s:hello world ' % (time.ctime(),)) sys.stdout.flush() time.sleep(2) if __name__ == '__main__': daemon = Daemon('/tmp/watch_process.pid', stdout = '/tmp/watch_stdout.log') if len(sys.argv) == 2: if 'start' == sys.argv[1]: daemon.start() elif 'stop' == sys.argv[1]: daemon.stop() elif 'restart' == sys.argv[1]: daemon.restart() else: print 'unknown command' sys.exit(2) sys.exit(0) else: print 'usage: %s start|stop|restart' % sys.argv[0] sys.exit(2) 运行结果: 可以参考:http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/,它是当Daemon设计成一个模板,在其他文件中from daemon import Daemon,然后定义子类,重写run()方法实现自己的功能。 class MyDaemon(Daemon): def run(self): while True: fp=open('/tmp/run.log','a+') fp.write('Hello World ') time.sleep(1) 不足:信号处理signal.signal(signal.SIGTERM, cleanup_handler)暂时没有安装,注册程序退出时的回调函数delpid()没有被调用。 然后,再写个shell命令,加入开机启动服务,每隔2秒检测守护进程是否启动,若没有启动则启动,自动监控恢复程序。 #/bin/sh while true do count=`ps -ef | grep "daemonclass.py" | grep -v "grep"` if [ "$?" != "0" ]; then daemonclass.py start fi sleep 2 done 三、python保证只能运行一个脚本实例 1、打开文件本身加锁 #!/usr/bin/env python #coding: utf-8 import fcntl, sys, time, os pidfile = 0 def ApplicationInstance(): global pidfile pidfile = open(os.path.realpath(__file__), "r") try: fcntl.flock(pidfile, fcntl.LOCK_EX | fcntl.LOCK_NB) #创建一个排他锁,并且所被锁住其他进程不会阻塞 except: print "another instance is running..." sys.exit(1) if __name__ == "__main__": ApplicationInstance() while True: print 'running...' time.sleep(1) 注意:open()参数不能使用w,否则会覆盖本身文件;pidfile必须声明为全局变量,否则局部变量生命周期结束,文件描述符会因引用计数为0被系统回收(若整个函数写在主函数中,则不需要定义成global)。 2、打开自定义文件并加锁 #!/usr/bin/env python #coding: utf-8 import fcntl, sys, time pidfile = 0 def ApplicationInstance(): global pidfile pidfile = open("instance.pid", "w") try: fcntl.lockf(pidfile, fcntl.LOCK_EX | fcntl.LOCK_NB) #创建一个排他锁,并且所被锁住其他进程不会阻塞 except IOError: print "another instance is running..." sys.exit(0) if __name__ == "__main__": ApplicationInstance() while True: print 'running...' time.sleep(1) 3、检测文件中PID #!/usr/bin/env python #coding: utf-8 import time, os, sys import signal pidfile = '/tmp/process.pid' def sig_handler(sig, frame): if os.path.exists(pidfile): os.remove(pidfile) sys.exit(0) def ApplicationInstance(): signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGQUIT, sig_handler) try: pf = file(pidfile, 'r') pid = int(pf.read().strip()) pf.close() except IOError: pid = None if pid: sys.stdout.write('instance is running... ') sys.exit(0) file(pidfile, 'w+').write('%s ' % os.getpid()) if __name__ == "__main__": ApplicationInstance() while True: print 'running...' time.sleep(1) 4、检测特定文件夹或文件 #!/usr/bin/env python #coding: utf-8 import time, commands, signal, sys def sig_handler(sig, frame): if os.path.exists("/tmp/test"): os.rmdir("/tmp/test") sys.exit(0) def ApplicationInstance(): signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGQUIT, sig_handler) if commands.getstatusoutput("mkdir /tmp/test")[0]: print "instance is running..." sys.exit(0) if __name__ == "__main__": ApplicationInstance() while True: print 'running...' time.sleep(1) 也可以检测某一个特定的文件,判断文件是否存在: import os import os.path import time #class used to handle one application instance mechanism class ApplicationInstance: #specify the file used to save the application instance pid def __init__( self, pid_file ): self.pid_file = pid_file self.check() self.startApplication() #check if the current application is already running def check( self ): #check if the pidfile exists if not os.path.isfile( self.pid_file ): return #read the pid from the file pid = 0 try: file = open( self.pid_file, 'rt' ) data = file.read() file.close() pid = int( data ) except: pass #check if the process with specified by pid exists if 0 == pid: return try: os.kill( pid, 0 ) #this will raise an exception if the pid is not valid except: return #exit the application print "The application is already running..." exit(0) #exit raise an exception so don't put it in a try/except block #called when the single instance starts to save it's pid def startApplication( self ): file = open( self.pid_file, 'wt' ) file.write( str( os.getpid() ) ) file.close() #called when the single instance exit ( remove pid file ) def exitApplication( self ): try: os.remove( self.pid_file ) except: pass if __name__ == '__main__': #create application instance appInstance = ApplicationInstance( '/tmp/myapp.pid' ) #do something here print "Start MyApp" time.sleep(5) #sleep 5 seconds print "End MyApp" #remove pid file appInstance.exitApplication() 上述os.kill( pid, 0 )用于检测一个为pid的进程是否还活着,若该pid的进程已经停止则抛出异常,若正在运行则不发送kill信号。 5、socket监听一个特定端口 #!/usr/bin/env python #coding: utf-8 import socket, time, sys def ApplicationInstance(): try: global s s = socket.socket() host = socket.gethostname() s.bind((host, 60123)) except: print "instance is running..." sys.exit(0) if __name__ == "__main__": ApplicationInstance() while True: print 'running...' time.sleep(1) 可以将该函数使用装饰器实现,便于重用(效果与上述相同): #!/usr/bin/env python #coding: utf-8 import socket, time, sys import functools #使用装饰器实现 def ApplicationInstance(func): @functools.wraps(func) def fun(*args,**kwargs): import socket try: global s s = socket.socket() host = socket.gethostname() s.bind((host, 60123)) except: print('already has an instance...') return None return func(*args,**kwargs) return fun @ApplicationInstance def main(): while True: print 'running...' time.sleep(1) if __name__ == "__main__": main() 四、总结 (1)守护进程和单脚本运行在实际应用中比较重要,方法也比较多,可选择合适的来进行修改,可以将它们做成一个单独的类或模板,然后子类化实现自定义。 (2)daemon监控进程自动恢复避免了nohup和&的使用,并配合shell脚本可以省去很多不定时启动挂掉服务器的麻烦。 (3)若有更好的设计和想法,可随时留言,在此先感谢!
原文地址:https://www.cnblogs.com/lincappu/p/13405218.html