# process_thread_action

import psycopg2
import threading

# Shared module-level connection to the `filter_useless_mac` database.
# It is opened as soon as this module is imported and is reused by every
# function (and every worker thread) below.
# NOTE(review): host and credentials are hard-coded here — consider loading
# them from configuration or the environment.
conn_fmac = psycopg2.connect(database='filter_useless_mac', user='user', password='password', host='192.168.168.168',
                             port='5432')


def fetch_rows(f_l):
    """Look up ``detail_data`` for each MAC value in *f_l*.

    Issues one ``SELECT ... LIMIT 1`` per value against
    ``apiv2_single_mac_with_res`` using the shared ``conn_fmac`` connection.

    Args:
        f_l: Iterable of MAC values to look up. Each value is also used as a
            key of the returned dict.

    Returns:
        A dict mapping every MAC that had a matching row to that row's
        ``detail_data`` value. MACs without a match are omitted.
    """
    r = {}
    with conn_fmac:
        with conn_fmac.cursor() as curs:
            for i in f_l:
                # http://initd.org/psycopg/docs/faq.html
                # Query arguments can only carry data values; they cannot
                # stand in for a table or column name, so only the MAC is
                # passed as a parameter here.
                curs.execute('SELECT detail_data FROM apiv2_single_mac_with_res WHERE mac= %s LIMIT 1 ', (i,))

                # Best-effort fetch: a database-level failure for one MAC
                # skips that MAC instead of aborting the whole lookup.
                # (The original author noted seeing "psycopg2.InternalError:
                # current transaction is aborted, commands ignored until end
                # of transaction block" around this point.)
                try:
                    t = curs.fetchone()
                except psycopg2.Error:
                    continue
                if t is not None:
                    r[i] = t[0]
    return r


def update_rows(id, new_val):
    """Store *new_val* in ``mac_with_res_position_lat_lon_unique_num`` for one row.

    Args:
        id: ``oid_timestamp`` of the row in ``control_group_with_compute_res``
            to update. (The name shadows the builtin but is kept so existing
            callers are unaffected.)
        new_val: Value to write into the column.

    The update is best-effort: a failure is printed together with the calling
    thread's identifier and is not re-raised.
    """
    with conn_fmac:
        with conn_fmac.cursor() as curs:
            try:
                curs.execute(
                    'UPDATE control_group_with_compute_res SET mac_with_res_position_lat_lon_unique_num=%s WHERE oid_timestamp=%s',
                    (new_val, id))
            except Exception as exc:
                # Report the actual error instance; printing the bare
                # `Exception` class says nothing about what went wrong.
                print(threading.get_ident(), repr(exc))
            else:
                print(threading.get_ident(), 'OK')


class MyThread(threading.Thread):
    """Thread that calls ``func(args)`` when it runs.

    ``args`` is handed to ``func`` as a single positional argument; it is not
    unpacked.
    """

    def __init__(self, func, args, name):
        super().__init__()
        # Assigning `name` goes through Thread's own name property.
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        target, payload = self.func, self.args
        target(payload)


def _count_unique_positions(mac_with_res_position):
    """Return the number of distinct (longitude, latitude) pairs in the mapping.

    Each value of *mac_with_res_position* must provide ``'longitude'`` and
    ``'latitude'`` keys. The two coordinates are compared as a pair of their
    string forms, so adjacent values cannot run together the way a plain
    concatenation does (1.2/34.5 and 1.23/4.5 both concatenate to "1.234.5").
    """
    return len({
        (str(entry['longitude']), str(entry['latitude']))
        for entry in mac_with_res_position.values()
    })


def main():
    """Compute and store the count of distinct positions for every row.

    Reads ``oid_timestamp`` and ``mac_with_res_position`` from
    ``control_group_with_compute_res`` (rows where the latter is not NULL),
    splits the result into chunks of 200 rows, and processes each chunk in
    its own ``MyThread``. For every row with at least one position the count
    is written back through ``update_rows``.
    """
    with conn_fmac:
        with conn_fmac.cursor() as curs:
            curs.execute(
                'SELECT oid_timestamp,mac_with_res_position FROM control_group_with_compute_res WHERE mac_with_res_position IS NOT NULL ORDER BY oid_timestamp DESC ')
            tuple_l = curs.fetchall()

    tn, tstep = len(tuple_l), 200

    def process_chunk(ts):
        # `ts` is the index of the first row of this chunk; slicing clamps
        # the end of the last, shorter chunk automatically.
        print(ts)
        for oid_timestamp, mac_with_res_position in tuple_l[ts:ts + tstep]:
            n = _count_unique_positions(mac_with_res_position)
            if n > 0:
                update_rows(oid_timestamp, n)

    # One worker per chunk; range() already stops before `tn`.
    tl = [MyThread(process_chunk, i, process_chunk.__name__) for i in range(0, tn, tstep)]

    for t in tl:
        # Set the daemon flag itself; assigning to `setDaemon` would only
        # overwrite the method with a bool.
        t.daemon = False
        t.start()
    for t in tl:
        t.join()


# Script entry point: call main() only when this file is executed directly.
if __name__ == '__main__':
    main()
import json
import psycopg2
import threading

# Shared module-level connection to the `filter_useless_mac` database.
# It is opened as soon as this module is imported and is reused by every
# function (and every worker thread) below.
# NOTE(review): host and credentials are hard-coded here — consider loading
# them from configuration or the environment.
conn_fmac = psycopg2.connect(database='filter_useless_mac', user='postgres', password='postgres', host='192.168.8.8',
                             port='5432')

def update_rows(id, new_val):
    """Store *new_val* in ``add_lat_lon_to_original_res`` for one row.

    Args:
        id: ``oid_timestamp`` of the row in ``control_group_with_compute_res``
            to update. (The name shadows the builtin but is kept so existing
            callers are unaffected.)
        new_val: Value to write into the column.

    The update is best-effort: a failure is printed together with the calling
    thread's identifier and is not re-raised.
    """
    with conn_fmac:
        with conn_fmac.cursor() as curs:
            try:
                curs.execute(
                    'UPDATE control_group_with_compute_res SET add_lat_lon_to_original_res=%s WHERE oid_timestamp=%s',
                    (new_val, id))
            except Exception as exc:
                # Report the actual error instance; printing the bare
                # `Exception` class says nothing about what went wrong.
                print(threading.get_ident(), repr(exc))
            else:
                print(threading.get_ident(), 'OK')


class MyThread(threading.Thread):
    """Thread that calls ``func(args)`` when it runs.

    ``args`` is handed to ``func`` as a single positional argument; it is not
    unpacked.
    """

    def __init__(self, func, args, name):
        super().__init__()
        # Assigning `name` goes through Thread's own name property.
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        target, payload = self.func, self.args
        target(payload)


def main():
    """Attach area-code coordinates to every matching row.

    Joins ``control_group_with_compute_res`` (via the ``area_code`` field of
    its ``detail_data``) with ``jmtool_areacode_longitude_latitude``, splits
    the result into chunks of 200 rows, and processes each chunk in its own
    ``MyThread``. For every row a JSON object with the keys ``from``,
    ``latitude`` and ``longitude`` is written back through ``update_rows``.
    """
    with conn_fmac:
        with conn_fmac.cursor() as curs:
            sql = "SELECT tmp.oid_timestamp, ja.latitude, ja.longitude FROM ( SELECT oid_timestamp, detail_data ->> 'area_code' AS area_code FROM control_group_with_compute_res) tmp LEFT JOIN jmtool_areacode_longitude_latitude ja ON tmp.area_code = ja.area_code WHERE ja.area_code IS NOT NULL ORDER BY oid_timestamp ASC;"
            curs.execute(sql)
            tuple_l = curs.fetchall()

    tn, tstep = len(tuple_l), 200

    def process_chunk(ts):
        # `ts` is the index of the first row of this chunk; slicing clamps
        # the end of the last, shorter chunk automatically.
        print(ts)
        for oid_timestamp, lat, lon in tuple_l[ts:ts + tstep]:
            payload = {'from': 'jmtool_areacode', 'latitude': lat, 'longitude': lon}
            update_rows(oid_timestamp, json.dumps(payload, ensure_ascii=False))

    # One worker per chunk; range() already stops before `tn`.
    tl = [MyThread(process_chunk, i, process_chunk.__name__) for i in range(0, tn, tstep)]

    for t in tl:
        # Set the daemon flag itself; assigning to `setDaemon` would only
        # overwrite the method with a bool.
        t.daemon = False
        t.start()
    for t in tl:
        t.join()


# Script entry point: call main() only when this file is executed directly.
if __name__ == '__main__':
    main()
# Original article: https://www.cnblogs.com/rsapaper/p/7698029.html