liunx pyinotify的安装和使用

介绍此功能是检测目录的操作的事件

1.安装 

在百度云盘下载或者在gits上下载安装包

  链接:https://pan.baidu.com/s/1Lqt872YEgEo_bNPEnEJMaw
  提取码:bjl2

# git clone https://github.com/seb-m/pyinotify.git     
# cd pyinotify/
# ls
# python setup.py install

2.使用在代码中直接导入就可以

# -*- coding: utf-8 -*-
# !/usr/bin/env python
import os
import Queue
import datetime
import pyinotify
import logging
#debug 
import threading,time

pos = 0
save_pos = 0
filename =""   #
filename1 =""
pathname =""




class Singleton(type):
    def __call__(cls, *args, **kwargs):
        if not hasattr(cls, 'instance'):
            cls.instance = super(Singleton, cls).__call__(*args, **kwargs)
        return cls.instance
    def __new__(cls, name, bases, dct):
        return type.__new__(cls, name, bases, dct)
 
    def __init__(cls, name, bases, dct):
        super(Singleton, cls).__init__(name, bases, dct)


class AuditLog(object):
    __metaclass__ = Singleton
    def __init__(self):

        FORMAT = ('%(asctime)s.%(msecs)d-%(levelname)s'
                  '[%(filename)s:%(lineno)d:%(funcName)s]: %(message)s')
        DATEFMT = '%Y-%m-%d %H:%M:%S'
        logging.basicConfig(level=logging.DEBUG, format=FORMAT,datefmt=DATEFMT)

        logging.debug("__init__")
        self.log_queue = Queue.Queue(maxsize=1000)
        self.str = "AuditLog"


    def start(self,path):  #  path   as:  /var/log/auth.log
        try:
            print "start:",threading.currentThread()
            if( not os.path.exists(path)):
                logging.debug("文件路径不存在")
                return
            logging.debug("文件路径:{}".format(path))
            global pathname
            pathname = path

            # # 输出前面的log
            # printlog()
            file =  path[:path.rfind('/')]
            
            global filename
            global filename1
            filename = path[path.rfind('/')+1:]
            filename1 = filename + ".1"
            print "filename:",filename,"    filename1 ",filename1
            # watch manager
            wm = pyinotify.WatchManager()
            wm.add_watch(file, pyinotify.ALL_EVENTS, rec=True)
            eh = MyEventHandler()
            # notifier
            notifier = pyinotify.Notifier(wm, eh)
            notifier.loop()

        except Exception as e:
            logging.error('[AuditLog]:send error: %s',str(e))
            return

    def read_log_file(self):
        global pos
        global save_pos
        try:
            # if( not os.pathname.exists(pathname)):
            #     logging.debug("读取的文件不存在")
            #     return
            fd = open(pathname)
            if pos != 0:
                fd.seek(pos, 0)
            while True:
                line = fd.readline()
                if line.strip():
                    logging.debug("put queue:{}".format(line.strip()))
                    try:
                        self.log_queue.put_nowait(line.strip())
                    except Queue.Full:
                        logging.warn('send_log_queue is full now')
                    
                    pos = fd.tell()
                    save_pos = pos
                else:
                    break
            fd.close()
        except Exception, e:
            logging.error('[AuditLog]:send error: %s',str(e))

    def read_log1_file(self):

        
        try:
            global save_pos
            global pos 
            pos = 0
            pathname1 = pathname + ".1"
            # if( not os.pathname1.exists(pathname1)):
            #     logging.debug("读取的文件不存在")
            #     return
            fd = open(pathname1)
            if save_pos != 0:
                fd.seek(save_pos, 0)
            while True:
                line = fd.readline()
                if line.strip():
                    if save_pos > fd.tell() :
                        print save_pos,"   ",fd.tell()
                        continue 
                    try:
                        self.log_queue.put_nowait(line.strip())
                        logging.debug("put queue:{}".format(line.strip()))
                    except Queue.Full:
                        logging.warn('send_log_queue is full now')

                    
                else:
                    save_pos = 0
                    break
            fd.close()
        except Exception, e:
            logging.error('[AuditLog]:send error: %s',str(e))



    def __del__(self):
        print "del"






class MyEventHandler(pyinotify.ProcessEvent):

    print "MyEventHandler :",threading.currentThread()
    def __init__(self):
        print "MyEventHandler  __init__"
        self.auditLogObject = AuditLog()
        print self.auditLogObject.str
    # 当文件被修改时调用函数
    def process_IN_MODIFY(self, event):  #文件修改
        print "process_IN_MODIFY",event.name

    def process_IN_CREATE(self, event): #文件创建
        # logging.debug("process_IN_CREATE")
        print "create event:", event.name
                #  log.log.1
        global save_pos
        try:

            if(event.name == filename): 
                print "=="
                self.auditLogObject.read_log1_file()
            else:
                logging.debug("审计的文件不相同{}  {}".format(filename1).format(event.name))
        except Exception as e:
            logging.error('[AuditLog]:send error: %s',str(e))

 
    def process_IN_DELETE(self, event):   #文件删除
        # logging.debug("process_IN_DELETE")
        print "delete event:", event.name

    def process_IN_ACCESS(self, event):  #访问
        # logging.debug("process_IN_ACCESS")
        print "ACCESS event:", event.name



     
    def process_IN_ATTRIB(self, event):  #属性
        # logging.debug("process_IN_ATTRIB")
        print "ATTRIB event:  文件属性", event.name

     
    def process_IN_CLOSE_NOWRITE(self, event):
        # logging.debug("process_IN_CLOSE_NOWRITE")
        print "CLOSE_NOWRITE event:", event.name

     
    def process_IN_CLOSE_WRITE(self, event):  # 关闭写入
        # logging.debug("process_IN_CLOSE_WRITE")
        print "CLOSE_WRITE event:", event.name
        try:
            if(event.name == filename):
                self.auditLogObject.read_log_file()

            else:
                logging.debug("文件名不对 不是审计文件")
                return


        except Exception as e:
            logging.error('[AuditLog]:send error: %s',str(e))
     
     
    def process_IN_OPEN(self, event):  # 打开
        # logging.debug("process_IN_OPEN")
        print "OPEN event:", event.name



def Producer():

    try:
        auditLogObj = AuditLog()
        auditLogObj.start("./log/log.log")

    except Exception as e:
        logging.error('[auditLog]:send error: %s',str(e))
 
 
 
def  Consumer():

    try:
        print "Consumer"
        auditLogObject = AuditLog()
        print auditLogObject.str
        while True:
            print "ddd"
            time.sleep(1)
            
            while auditLogObject.log_queue.qsize() != 0 :
                print "queue size",auditLogObject.log_queue.qsize()
                print auditLogObject.log_queue.get()
            # time.sleep(5)

    except Exception as e:
        logging.error('[AuditLog]:send error: %s',str(e))





if __name__ == '__main__':


 
 
    # a = threading.Thread(target=Producer,)
    # a.start()
    b = threading.Thread(target=Consumer,)
    b.start()
原文地址:https://www.cnblogs.com/wanghuixi/p/11254163.html