Python C段地址,已使用IP地址扫描

Python C段地址,已使用IP地址扫描

# -*- coding: UTF-8 -*-
import time
import threading
import subprocess
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import platform


def get_ip_list(net_segment, max_workers=100):
    """Ping every host address of a /24 segment and return the ones that answer.

    The last dot-separated field of ``net_segment`` is replaced by each host
    number from 1 to 254, and every resulting address is pinged concurrently
    with the system ``ping`` command. Each responding address is printed as
    soon as it is found, followed by the elapsed time and the hit count.

    Args:
        net_segment: Any dotted address inside the segment, e.g.
            ``"192.168.0.0"``; only its last field is replaced.
        max_workers: Upper bound on the number of concurrent ping processes.

    Returns:
        A list of the addresses whose ping exited with status 0, in ascending
        host order.

    Raises:
        OSError: If the ``ping`` executable cannot be started.
    """
    network_fields = net_segment.split('.')[:-1]
    # Host numbers 1..254 inclusive: .0 is the network address and .255 the
    # broadcast address of a /24, so neither is probed.
    addresses = ['.'.join(network_fields + [str(host)])
                 for host in range(1, 255)]

    # Only the "count" flag differs between platforms; resolve it once
    # instead of once per address.
    count_flag = '-n' if platform.system() == 'Windows' else '-c'
    # NOTE(review): "-w 5" looks like a 5 ms per-reply timeout on Windows but
    # a 5 s overall deadline on Linux (iputils) -- confirm the intended value.
    ping_prefix = ['ping', count_flag, '2', '-w', '5']

    def ping_ip(ip):
        """Return True when a ping to ``ip`` exits with status 0."""
        # An argument list (no shell) keeps the address from being parsed by
        # a shell, and DEVNULL discards the output without the blocking risk
        # of a PIPE that nobody reads.
        exit_code = subprocess.call(ping_prefix + [ip],
                                    stdout=subprocess.DEVNULL)
        alive = exit_code == 0
        if alive:
            print(ip, True)
        return alive

    start_time = time.time()
    # The context manager waits for all pings and releases the worker threads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in submission order, so no lock or shared
        # list is needed to collect them.
        outcomes = list(pool.map(ping_ip, addresses))
    ip_list = [ip for ip, alive in zip(addresses, outcomes) if alive]

    print('ping耗时:%s' % (time.time() - start_time))
    print("当前已用Ip数量:%s" % str(len(ip_list)))
    return ip_list


if __name__ == "__main__":
    # Script entry point: scan the 192.168.0.x segment, then show the
    # responding addresses followed by how many there are.
    used_addresses = get_ip_list("192.168.0.0")
    print(used_addresses)
    used_count = len(used_addresses)
    print(used_count)
原文地址:https://www.cnblogs.com/Anec/p/13855691.html