# Selenium WebDriver (Python)

import os, sys

# Make the project root importable so that ``core.utils`` resolves.
# base_dir is the absolute path of this file with its last three components
# removed (two by the slice, one more by dirname).
os_sep = os.sep
base_dir = os.path.dirname(os_sep.join(os.path.abspath(__file__).split(os_sep)[0:-2]))
sys.path.append(base_dir)

from core.utils import MysqlHelper
import time
import logging
import requests
import threading

from selenium import webdriver

# Append all log records to <base_dir>/log/<YYYYMMDD><script file name>.log.
# basicConfig does not create the "log" directory; it has to exist already.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s[thread:%(thread)d][process:%(process)d]',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='%s/log/%s%s.log' % (
        base_dir, time.strftime('%Y%m%d', time.localtime(time.time())), __file__.split(os_sep)[-1]),
    filemode='a')

class MyThread(threading.Thread):
    """Thread that calls ``func(args)`` when it runs.

    ``args`` is handed to ``func`` as ONE positional argument; it is not
    unpacked, so a tuple arrives at ``func`` as a tuple.
    """

    def __init__(self, func, args, name):
        """Store the callable, its single argument and the thread name."""
        super().__init__(name=name)
        self.func, self.args = func, args

    def run(self):
        """Invoke the stored callable with the stored argument."""
        self.func(self.args)

# Number of chk_exception_url() calls finished so far; that function
# increments it and prints it as the progress numerator of each result line.
url_counter = 0

def main():
    """Check every unexpired URL in ``test_tab0`` and record the failures.

    URLs whose ``expire_time`` is still in the future are read from
    ``test_tab0`` and split into slices of ``tstep`` rows; each slice is
    processed by one ``MyThread``.  A URL passes when it answers HTTP 200 and
    its PhantomJS-rendered page contains one of the marker strings in
    ``mycode_l``.  A URL that never passes within ``repeat_times`` attempts is
    inserted into ``test_tab0_error`` unless a row for it already exists.
    """
    # Original author's note: within the last 3600 seconds a url is unique in
    # test_tab0.
    #
    # Alternative selection queries kept for reference:
    #   combined check:
    #     SELECT url FROM ( SELECT url FROM test_tab0 WHERE unix_timestamp(now()) - create_time<=3600
    #       UNION ALL SELECT url FROM test_tab0_error where status!=2 ) AS t GROUP BY url;
    #   unexpired urls only (the one in use):
    #     SELECT url FROM test_tab0 WHERE unix_timestamp(now()) <expire_time;
    #   last half hour only:
    #     SELECT url FROM test_tab0 WHERE unix_timestamp(now()) - create_time<= 1800;
    # TODO: once test_tab0_error has accumulated data, re-check the urls in it.
    q = 'SELECT url FROM test_tab0 WHERE unix_timestamp(now()) <expire_time;'

    mysql_obj = MysqlHelper()
    tuple_l = mysql_obj.select(q)
    del mysql_obj
    # get_onerow() compares select()'s result against -1, so -1 is presumably
    # its failure value; treat it like an empty result instead of crashing in
    # len().
    if tuple_l == -1 or len(tuple_l) == 0:
        s = '无待检测url,程序退出'
        print(s)
        logging.info(s)
        return  # nothing to check; the original fell through here

    # Marker strings, one of which must appear in the rendered page.
    mycode_l = ['g3user.com', '51g3.com.cn']
    # Every url is requested at most repeat_times times, and
    # chk_exception_url() sleeps repeat_sleep_times seconds before EVERY
    # request, the first one included.  (The original comment described a
    # different schedule -- 30 s, then two retries 10 s apart -- which the
    # code never implemented.)
    repeat_times, repeat_sleep_times = 4, 10
    # Upper bound in seconds for one HTTP request, so that a stalled server
    # cannot block a worker thread forever.
    request_timeout = 30
    # url_counter is shared by all worker threads.
    counter_lock = threading.Lock()

    # TODO: move into a shared DB base class (with a WHERE list).
    def get_onerow(url, f_l=('title', 'uid', 'money_total'), tab='test_tab0'):
        """Return the newest row (highest id) for *url* from *tab*.

        Only the columns named in *f_l* are selected.  When ``select``
        returns -1 that value is passed through unchanged; callers are
        expected to catch the resulting unpacking error.
        """
        # NOTE(review): the statement is built by string interpolation; a url
        # containing a double quote breaks it.  Switch to bound parameters if
        # MysqlHelper supports them.
        q = 'SELECT %s FROM %s WHERE url="%s" ORDER BY id DESC LIMIT 1' % (','.join(f_l), tab, url)
        logging.info('%s%s' % (' DB ', q))
        mysql_obj = MysqlHelper()
        try:
            t = mysql_obj.select(q)
            if t != -1:
                t = t[0]
        finally:
            del mysql_obj
        return t

    def chk_exception_url(url, sleep_seconds=0, http_tag='http://'):
        """Request *url* once and decide whether it shows the expected page.

        Sleeps *sleep_seconds* first.  Returns a dict with:
          ``ok``          1 passed, 0 failed, -1 undecided (HTTP 403),
          ``status_code`` the HTTP status, or -1 when the request raised,
          ``info``        a description of the failure ('' when none).
        """
        global url_counter
        time.sleep(sleep_seconds)

        # 'info' gets a default so callers can read it on every path; the
        # original left it unset for 403 and the caller hit a KeyError.
        ret = {'ok': -1, 'status_code': -1, 'info': ''}
        s = time.strftime('%Y%m%d %H:%M:%S', time.localtime(time.time()))
        try:
            # Only prepend a scheme when the url does not START with one;
            # "http" may legitimately occur later, e.g. in a query string.
            if not url.strip().lower().startswith(('http://', 'https://')):
                url = '%s%s' % (http_tag, url)
            r = requests.get(url, timeout=request_timeout)
            ret['status_code'] = int(r.status_code)
            s = '%s,%s,%s,%s' % (s, ret['status_code'], url, r)
        except Exception as e:
            ret['ok'] = 0
            s = '%s %s %s' % (s, ' SPIDER ', e)
            logging.error(s)
            print(e, url)

        # For now only a 200 answer from the target site is inspected further.
        if ret['status_code'] == 200:
            driver = webdriver.PhantomJS(
                executable_path='/usr/local/phantomjs/bin/phantomjs')
            try:
                driver.get(url)
                time.sleep(1)  # give page scripts a moment to run
                page_source = driver.page_source
            finally:
                # Always end the PhantomJS process; one is started per check.
                driver.quit()
            if any(marker in page_source for marker in mycode_l):
                ret['ok'] = 1
            else:
                s = '%s%s' % (s, '返回200,但是在html中未检测到我公司代码。')
                ret['ok'], ret['info'] = 0, s
        elif ret['status_code'] == 403:
            # Left undecided on purpose (ok stays -1),
            # e.g. www.hsdcw.com/fenlei/41668214.html
            pass
        else:
            ret['ok'], ret['info'] = 0, s

        with counter_lock:
            url_counter += 1
            checked = url_counter
        s = '%s/%s%s%s' % (checked, len(tuple_l), 'chk-ret', s)
        print(s)
        if ret['ok'] == 0:
            logging.warning(s)
        else:
            logging.info(s)
        return ret

    tn, tstep = len(tuple_l), 5000

    def tf(ts):
        """Check the urls in ``tuple_l[ts:ts + tstep]`` and store failures."""
        for row in tuple_l[ts:ts + tstep]:
            url = row[0]
            # Rule for Sina iAsk: these urls are not checked.
            if url.find('iask.sina.com') > -1:
                continue

            for _ in range(repeat_times):
                print('threadID', threading.get_ident(), url)
                ret = chk_exception_url(url, repeat_sleep_times)
                if ret['ok'] == 1:
                    break
            if ret['ok'] == 1:
                continue  # page is fine, nothing to record

            try:
                title, uid, money_total = get_onerow(url)
            except Exception as e:
                s = '%s %s %s' % (' DB Exception-去test_tab0查', url, e)
                logging.info(s)
                print(s)
                # Skip only this url; the original `break` dropped the rest
                # of the slice as well.
                continue

            # Because of limitations of the DB package under threads, a fresh
            # helper is created for every url and released after use.
            mysql_obj = MysqlHelper()
            try:
                q = 'SELECT id FROM test_tab0_error WHERE url="%s" LIMIT 1' % (url)
                print(q)
                try:
                    r = mysql_obj.select(q)
                except Exception as e:
                    s = '%s%s %s' % (' DB Exception-', q, e)
                    logging.info(s)
                    print(s)
                    continue
                logging.info('%s%s' % ('-SQL-', q))

                # A row already exists (or the lookup returned the -1 failure
                # value): nothing to insert.
                # TODO: update existing rows once test_tab0_error is
                # re-checked (title, status, remarks, update_time, uid, money).
                if r == -1 or len(r) > 0:
                    continue

                ctime = int(time.time())
                # DB url status -- 0: cannot be opened, 1: opens but shows no
                # ad, 2: handled.
                db_status = 1 if ret['status_code'] == 200 else 0
                # NOTE(review): string-built SQL; a title or remark containing
                # a double quote breaks this statement.  Use bound parameters
                # if MysqlHelper supports them.
                q = 'INSERT INTO test_tab0_error (title,url,status,remarks,update_time,create_time,uid,money) VALUES ("%s","%s","%s","%s","%s","%s","%s","%s")' % (
                    title, url, db_status, ret['info'], ctime, ctime, uid, money_total)
                try:
                    mysql_obj.execute(q)
                    mysql_obj.commit()
                    s = '%s%s' % (' DB SQL ok ', q)
                    logging.info(s)
                    print(s)
                except Exception as e:
                    s = '%s%s %s' % (' DB Exception-', q, e)
                    logging.error(s)
                    print(s)
            finally:
                # Release the helper on every path, not only after a
                # successful insert.
                del mysql_obj

    # One worker per slice of tstep urls.
    tl = [MyThread(tf, start, tf.__name__) for start in range(0, tn, tstep)]

    for t in tl:
        # The original assigned to t.setDaemon, which replaced the method
        # instead of setting the flag.
        t.daemon = False
        t.start()
    for t in tl:
        t.join()

# Run the check only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()

# Original article: https://www.cnblogs.com/rsapaper/p/6930764.html