Splitting an nginx access log by date, and optimizing its performance

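First, the requirement: given a fairly large nginx access log, split it by access date and save the resulting files under /tmp.

The test machine is a Tencent Cloud instance with a single core and 1 GB of memory. The test log is 80 MB.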
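
To make the task concrete, here is a minimal sketch of how one log line maps to an output file. The sample line is an assumption (nginx's default "combined" format); the actual log used in these tests is not shown in this article, and the helper name target_file is made up for illustration.

```python
import re
import datetime

# Matches the timestamp prefix of a log line, e.g. "[27/Dec/2016:"
DATE_PATTERN = re.compile(r'\[(\d+)/(\w+)/(\d+):')


def target_file(line):
    """Return the /tmp path a log line belongs to, or None if no date is found."""
    m = DATE_PATTERN.search(line)
    if m is None:
        return None
    day, mon, year = m.groups()
    # '%b' parses the abbreviated month name ("Dec") into a month number
    month = datetime.datetime.strptime(mon, '%b').month
    return '/tmp/%s-%s-%s' % (year, month, day)


# Hypothetical sample line in the default "combined" format
sample = ('1.2.3.4 - - [27/Dec/2016:10:00:01 +0800] '
          '"GET / HTTP/1.1" 200 612 "-" "curl/7.29.0"')
print(target_file(sample))
```

Every version below does this same per-line work; they differ only in how the lines are dispatched.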

Version without threads:

#!/usr/bin/env python
# coding=utf-8

import re
import datetime

if __name__ == '__main__':
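    # Matches the timestamp prefix, e.g. "[27/Dec/2016:" -> ('27', 'Dec', '2016')
    date_pattern = re.compile(r'\[(\d+)/(\w+)/(\d+):')
    with open('./access_all.log-20161227') as f:
        for line in f:
            day, mon, year = date_pattern.search(line).groups()
            mon = datetime.datetime.strptime(mon, '%b').month
            log_file = '/tmp/%s-%s-%s' % (year, mon, day)
            # Append to the per-day file; a separate name avoids shadowing the input handle f
            with open(log_file, 'a+') as out:
                out.write(line)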

Elapsed time:

[root@VM_255_164_centos data_parse]# time python3 log_cut.py

real    0m41.152s
user    0m32.578s
sys    0m6.046s
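
The script above reopens an output file and calls strptime for every single line. A possible optimization, sketched here under assumptions and not measured on the test machine, is to keep one open handle per date and to look months up in a dictionary. The function name split_by_date and the out_dir parameter are hypothetical.

```python
import re

DATE_PATTERN = re.compile(r'\[(\d+)/(\w+)/(\d+):')

# Abbreviated month name -> month number, replacing the per-line strptime call
MONTHS = {name: number for number, name in enumerate(
    'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(), start=1)}


def split_by_date(src_path, out_dir='/tmp'):
    """Split src_path into one file per day, keeping each output file open.

    Returns the sorted list of 'year-month-day' keys that were written.
    """
    handles = {}
    try:
        with open(src_path) as src:
            for line in src:
                m = DATE_PATTERN.search(line)
                if m is None or m.group(2) not in MONTHS:
                    continue  # skip lines without a recognizable timestamp
                day, mon, year = m.groups()
                key = '%s-%s-%s' % (year, MONTHS[mon], day)
                out = handles.get(key)
                if out is None:
                    # First line for this date: open the file once and cache it
                    out = handles[key] = open('%s/%s' % (out_dir, key), 'a')
                out.write(line)
    finally:
        for out in handles.values():
            out.close()
    return sorted(handles)
```

A log normally covers only a handful of dates, so the number of simultaneously open files stays small.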

Multithreaded version:

#!/usr/bin/env python
# coding=utf-8

import re
import datetime
import threading


date_pattern = re.compile(r'\[(\d+)/(\w+)/(\d+):')

def log_cut(line):
    day, mon, year = re.search(date_pattern, line).groups()
    mon = datetime.datetime.strptime(mon, '%b').month
    log_file = '/tmp/%s-%s-%s' % (year, mon, day)
    with open(log_file, 'a+') as f:
        f.write(line)


if __name__ == '__main__':
    with open('./access_all.log-20161227') as f:
        for line in f:
            t = threading.Thread(target=log_cut, args=(line,))
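            # Daemon threads are killed when the main thread exits, so the
            # last lines may be lost unless the threads are joined.
            t.daemon = True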
            t.start()

Elapsed time:

# time python3 log_cut.py 

real    1m35.905s
user    1m10.292s
sys    0m19.666s

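Surprisingly, the multithreaded version is much slower than the version without threads (about 1m36s vs. 41s). It creates one thread per log line, so it pays for thread creation and context switching on every line, and because the task is CPU-bound, CPython's GIL keeps those threads from running Python code in parallel anyway.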
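
The cost of creating a thread per item can be observed in isolation with a small sketch like the one below. The work function is a trivial stand-in (an assumption, not the log-splitting code), and absolute numbers will vary by machine, so no expected output is given.

```python
import threading
import time


def work(results, i):
    # Trivial stand-in for per-line work
    results[i] = i * i


def run_direct(n):
    """Call work() n times in the current thread."""
    results = [None] * n
    for i in range(n):
        work(results, i)
    return results


def run_thread_per_item(n):
    """Call work() n times, each call in its own newly created thread."""
    results = [None] * n
    threads = [threading.Thread(target=work, args=(results, i)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait so that every result is present
    return results


def timed(fn, n):
    """Return the wall-clock seconds fn(n) takes."""
    start = time.perf_counter()
    fn(n)
    return time.perf_counter() - start


if __name__ == '__main__':
    n = 1000
    print('direct:          %.4fs' % timed(run_direct, n))
    print('thread per item: %.4fs' % timed(run_thread_per_item, n))
```

Both functions compute the same results; only the dispatch differs, so any gap between the two timings is threading overhead.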

Thread-pool version:

Thread pool class:

#!/usr/bin/env python
# coding=utf-8

import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):

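    def __init__(self, max_num, max_task_num=None):
        # Task queue; bounded when max_task_num is given
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num      # maximum number of worker threads
        self.cancel = False         # set by close(): reject new tasks
        self.terminal = False       # set by terminate(): stop after the current task
        self.generate_list = []     # worker threads that have been created
        self.free_list = []         # worker threads currently waiting for a task

    def run(self, func, args, callback=None):
        # Queue a task; create a new worker only when none is idle
        # and the pool has not yet reached max_num
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)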


    def generate_thread(self):
        t = threading.Thread(target=self.call) 
        t.start()

    def call(self):
        current_thread = threading.current_thread()
        self.generate_list.append(current_thread) 

        event = self.q.get() 
        while event != StopEvent:

            func, arguments, callback = event 
            try:
                result = func(*arguments) 
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)  # one stop marker per worker thread
            full_size -= 1

    def terminate(self):
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)
Saved as threadingPool.py.

Code using the pool:

#!/usr/bin/env python
# coding=utf-8

import re
import datetime
from threadingPool import ThreadPool

date_pattern = re.compile(r'\[(\d+)/(\w+)/(\d+):')

def log_cut(line):
    day, mon, year = date_pattern.search(line).groups()
    mon = datetime.datetime.strptime(mon, '%b').month
    log_file = '/tmp/%s-%s-%s' % (year, mon, day)
    with open(log_file, 'a+') as f:
        f.write(line)

def callback(status, result):
    pass

pool = ThreadPool(1)

with open('./access_all.log-20161227') as f:
    for line in f:
        pool.run(log_cut, (line,), callback)

pool.close()

Elapsed time:

# time python3 log_cut2.py 

real    0m53.371s
user    0m44.761s
sys    0m5.600s

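The thread-pool version is faster than the thread-per-line version (about 53s vs. 1m36s), so the hand-written pool class does help: it reuses a worker thread rather than creating one per line, which cuts thread-creation and context-switching time. It is still slower than the version without threads (41s).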
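
For comparison, the standard library's concurrent.futures.ThreadPoolExecutor provides the same reuse of worker threads without a custom class. The following is a sketch only, not the code that was timed above; split_with_executor and the out_dir parameter are hypothetical names.

```python
import re
import datetime
from concurrent.futures import ThreadPoolExecutor

DATE_PATTERN = re.compile(r'\[(\d+)/(\w+)/(\d+):')


def log_cut(line, out_dir='/tmp'):
    """Append one log line to the file for its date; ignore lines without a date."""
    m = DATE_PATTERN.search(line)
    if m is None:
        return
    day, mon, year = m.groups()
    mon = datetime.datetime.strptime(mon, '%b').month
    with open('%s/%s-%s-%s' % (out_dir, year, mon, day), 'a') as out:
        out.write(line)


def split_with_executor(path, out_dir='/tmp', max_workers=1):
    # Leaving the with-block waits for all submitted tasks to finish.
    # With max_workers=1 the lines are written in their original order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool, open(path) as src:
        for line in src:
            pool.submit(log_cut, line, out_dir)
```

Note that submit() queues tasks without limit, so a reader that is much faster than the worker can hold a large part of the file in memory, and exceptions raised in log_cut stay inside the returned futures unless they are checked.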

Process-pool version:

#!/usr/bin/env python
# coding=utf-8

import re
import datetime
from multiprocessing import Pool

date_pattern = re.compile(r'\[(\d+)/(\w+)/(\d+):')

def log_cut(line):
    day, mon, year = re.search(date_pattern, line).groups()
    mon = datetime.datetime.strptime(mon, '%b').month
    log_file = '/tmp/%s-%s-%s' % (year, mon, day)
    with open(log_file, 'a+') as f:
        f.write(line)

if __name__ == '__main__':
    pool = Pool(1)
    with open('./access_all.log-20161227') as f:
        for line in f:
            pool.apply_async(func=log_cut, args=(line,))
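    pool.close()
    # Wait for the workers to finish the queued tasks; without join() the
    # main process can exit while tasks are still pending
    pool.join()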

Elapsed time with one process:

# time python3 log_cut.py 

real    0m28.392s
user    0m23.451s
sys    0m1.888s

Elapsed time with two processes:

# time python3 log_cut.py 

real    0m40.920s
user    0m33.690s
sys    0m3.206s

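So when using multiple processes on a single-core CPU, one worker process is the right choice: two processes on one core were noticeably slower (about 41s vs. 28s). On a multi-core CPU, running several processes should be faster, although that could not be measured on this single-core machine.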
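
On a multi-core machine, sending one line per task also means paying inter-process overhead for every line. One way to reduce that, sketched here under assumptions and not benchmarked, is to hand each worker a batch of lines and let the parent process do all the writing. The names group_chunk, read_chunks, split_parallel and the chunk_size default are hypothetical.

```python
import os
import re
from collections import defaultdict
from itertools import islice
from multiprocessing import Pool

DATE_PATTERN = re.compile(r'\[(\d+)/(\w+)/(\d+):')
MONTHS = {name: number for number, name in enumerate(
    'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(), start=1)}


def group_chunk(lines):
    """Worker: group a batch of lines by their 'year-month-day' key."""
    groups = defaultdict(list)
    for line in lines:
        m = DATE_PATTERN.search(line)
        if m and m.group(2) in MONTHS:
            day, mon, year = m.groups()
            groups['%s-%s-%s' % (year, MONTHS[mon], day)].append(line)
    return dict(groups)


def read_chunks(src, chunk_size):
    """Yield lists of up to chunk_size lines from an open file."""
    while True:
        chunk = list(islice(src, chunk_size))
        if not chunk:
            return
        yield chunk


def split_parallel(path, out_dir='/tmp', workers=None, chunk_size=20000):
    # Workers only parse; the parent writes, so output files are never
    # appended to by two processes at once. imap() keeps chunk order.
    with Pool(workers or os.cpu_count()) as pool, open(path) as src:
        for groups in pool.imap(group_chunk, read_chunks(src, chunk_size)):
            for key, lines in groups.items():
                with open(os.path.join(out_dir, key), 'a') as out:
                    out.writelines(lines)
```

As with the script above, split_parallel should be called from under an `if __name__ == '__main__':` guard so that it also works on platforms that start worker processes by spawning.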

Shell version:

#!/bin/bash

Usage(){
    echo "Usage: $0 Logfile"
}
         
if [ $# -eq 0 ] ;then
    Usage
    exit 0
else
    Log=$1
fi
 
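date_log=$(mktemp)
 
# Collect the distinct dates (e.g. 27/Dec/2016) into the temporary file
awk -F'[ :]' '{print $5}' "$Log" | awk -F'[' '{print $2}' | uniq > "$date_log"
 
mkdir -p /tmp/log
for i in $(cat "$date_log")
do
    # ${i:7:10} is the year, ${i:3:3} the month name, ${i:0:2} the day
    grep "$i" "$Log" > "/tmp/log/${i:7:10}-${i:3:3}-${i:0:2}.access"
done
rm -f "$date_log"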

Elapsed time:

# time sh log_cut.sh access_all.log-20161227 

real    0m2.435s
user    0m2.042s
sys    0m0.304s

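The shell version performs remarkably well, finishing in just over 2 seconds. A likely reason is that grep scans the log once per distinct date and streams the matches into a single output file, whereas the Python versions reopen an output file and call strptime for every line.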

Original article: https://www.cnblogs.com/xiaoming279/p/6233620.html