批量弱口令升级版本

自己写的才好用

import asyncio
import aiohttp
import time


# Coroutine worker: sends the login request for a single target.
async def func(url):
    """Try the default ``admin`` / ``123456`` login against one target.

    Posts the credentials to ``{url}/login`` and inspects the JSON reply.

    Args:
        url: Base URL of the target (scheme included, no trailing ``/login``).

    Returns:
        The hit message string when the reply is a JSON object with
        ``code == 200`` and no ``msg``; ``None`` otherwise (request failure or
        rejected credentials).
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        async with aiohttp.ClientSession() as session:
            # ``ssl=False`` replaces the deprecated ``verify_ssl=False``; the
            # timeout is passed as a ClientTimeout instead of a bare int.
            async with session.post(
                full_url,
                headers=headers,
                data=data,
                ssl=False,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                data_dict = await response.json()
    except Exception:
        # Deliberately broad: one unreachable or misbehaving host must not
        # abort the whole batch.
        print(f"{url} <===请求失败===>")
        return None

    # A JSON reply that is not an object (list, string, ...) has no ``.get``;
    # treat it as a rejected login instead of crashing the task.
    is_hit = (
        isinstance(data_dict, dict)
        and data_dict.get("code") == 200
        and data_dict.get("msg") is None
    )
    if is_hit:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        return msg

    print(f"{url} <===密码错误===>")
    return None


# Entry coroutine: schedules one func() task per target and stores the hits.
async def main():
    """Check every target listed in ``ips.txt`` concurrently.

    Each non-blank line of ``ips.txt`` is one target; ``http://`` is prepended
    when the line has no explicit http/https scheme. Every truthy result
    returned by ``func`` is written to ``结果.txt``, one per line.
    """
    with open('ips.txt', mode='r', encoding='u8') as f1, open('结果.txt', mode='w', encoding='u8') as f2:
        # Build the task list.
        task_list = []
        for raw in f1:
            target = raw.strip()
            if not target:
                # A blank line would otherwise become the bogus target "http://".
                continue
            # Test the scheme prefix, not a substring: a host name may
            # legitimately contain "http".
            if not target.startswith(("http://", "https://")):
                target = f"http://{target}"
            task_list.append(asyncio.create_task(func(target)))
        results = await asyncio.gather(*task_list)
        f2.writelines(f"{hit}\n" for hit in results if hit)


# Script entry point: run the scan and print the elapsed wall-clock seconds.
if __name__ == "__main__":
    began_at = time.time()
    asyncio.run(main())
    elapsed = time.time() - began_at
    print(elapsed)

多线程版本

import requests
from concurrent.futures import ThreadPoolExecutor
import queue
import threading


def save_file(msg):
    """Append ``msg`` as one line to ``res.txt`` (UTF-8).

    Args:
        msg: Text to record; a trailing newline is added.
    """
    with open('res.txt', mode='a', encoding="utf-8") as f:
        # The line terminator must be the ``\n`` escape; a raw line break
        # inside the literal is a syntax error.
        f.write(f"{msg}\n")


def check_pass(lock, q):
    """Take one target from ``q`` and try the default login against it.

    Posts ``admin`` / ``123456`` to ``{url}/login``. When the JSON reply is an
    object with ``code == 200`` and no ``msg``, the hit is printed and saved
    through ``save_file`` while holding ``lock``.

    Args:
        lock: Lock held around the ``save_file`` call.
        q: Queue holding target base URLs; exactly one item is consumed.
    """
    url = q.get()
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        # requests has no default timeout; without one a silent host would
        # block this worker thread indefinitely.
        res = requests.post(url=full_url, headers=headers, data=data, timeout=5)
        data_dict = res.json()
    except Exception:
        # Deliberately broad: a failing host must not kill the worker.
        print("<===请求失败===>")
        return

    # A JSON reply that is not an object has no ``.get``; treat it as a
    # rejected login.
    is_hit = (
        isinstance(data_dict, dict)
        and data_dict.get("code") == 200
        and data_dict.get("msg") is None
    )
    if is_hit:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        with lock:
            save_file(msg)
    else:
        print("<===密码错误===>")


def main():
    """Read targets from ``ips.txt`` and check each one on a thread pool.

    Every non-blank line is queued (with ``http://`` prepended when it has no
    explicit http/https scheme) and one ``check_pass`` job is submitted per
    queued target.
    """
    lock = threading.RLock()
    q = queue.Queue()
    with ThreadPoolExecutor() as pool:
        with open('ips.txt', mode='r', encoding='utf-8') as f:
            for raw in f:
                line = raw.strip()
                if not line:
                    # A blank line would otherwise become the bogus target "http://".
                    continue
                # Test the scheme prefix, not a substring: a host name may
                # legitimately contain "http".
                if not line.startswith(("http://", "https://")):
                    line = f"http://{line}"
                # Put before submit so the worker's q.get() always finds an item.
                q.put(line)
                pool.submit(check_pass, lock, q)


# Run the scan only when this file is executed as a script.
if __name__ == "__main__":
    main()

协程版本

import asyncio
import aiofiles
import aiohttp


async def save_file(msg):
    """Append ``msg`` as one line to ``res1.txt`` (UTF-8) via aiofiles.

    Args:
        msg: Text to record; a trailing newline is added.
    """
    async with aiofiles.open('res1.txt', mode='a', encoding="utf-8") as f:
        # The line terminator must be the ``\n`` escape; a raw line break
        # inside the literal is a syntax error.
        await f.write(f"{msg}\n")


async def check_pass(url):
    """Try the default ``admin`` / ``123456`` login against one target.

    Posts the credentials to ``{url}/login``. When the JSON reply is an object
    with ``code == 200`` and no ``msg``, the hit is printed and saved through
    ``save_file``.

    Args:
        url: Base URL of the target (scheme included, no trailing ``/login``).
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        async with aiohttp.ClientSession() as session:
            # ``ssl=False`` replaces the deprecated ``verify_ssl=False``; the
            # explicit 5 s limit keeps one silent host from stalling the run.
            async with session.post(
                full_url,
                headers=headers,
                data=data,
                ssl=False,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                data_dict = await response.json()
    except Exception:
        # Deliberately broad: a failing host must not abort the scan.
        print("<===请求失败===>")
        return

    # A JSON reply that is not an object has no ``.get``; treat it as a
    # rejected login.
    is_hit = (
        isinstance(data_dict, dict)
        and data_dict.get("code") == 200
        and data_dict.get("msg") is None
    )
    if is_hit:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        await save_file(msg)
    else:
        print("<===密码错误===>")


async def main():
    """Check the targets listed in ``ips.txt`` one after another.

    Each non-blank line is one target; ``http://`` is prepended when the line
    has no explicit http/https scheme. Every check is awaited before the next
    line is read, so the targets are processed sequentially.
    """
    async with aiofiles.open("ips.txt", mode="r", encoding="utf-8") as f:
        async for raw in f:
            line = raw.strip()
            if not line:
                # A blank line would otherwise become the bogus target "http://".
                continue
            # Test the scheme prefix, not a substring: a host name may
            # legitimately contain "http".
            if not line.startswith(("http://", "https://")):
                line = f"http://{line}"
            await check_pass(line)


# Script entry point: create the main() coroutine and run it on a new event loop.
if __name__ == "__main__":
    asyncio.run(main())

协程最终版本

import asyncio
import aiofiles
import aiohttp


async def save_file(msg):
    """Append ``msg`` as one line to ``res1.txt`` (UTF-8) via aiofiles.

    Args:
        msg: Text to record; a trailing newline is added.
    """
    async with aiofiles.open('res1.txt', mode='a', encoding="utf-8") as f:
        # The line terminator must be the ``\n`` escape; a raw line break
        # inside the literal is a syntax error.
        await f.write(f"{msg}\n")


async def check_pass(url):
    """Try the default ``admin`` / ``123456`` login against one target.

    Posts the credentials to ``{url}/login``. When the JSON reply is an object
    with ``code == 200`` and no ``msg``, the hit is printed and saved through
    ``save_file``.

    Args:
        url: Base URL of the target (scheme included, no trailing ``/login``).
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        async with aiohttp.ClientSession() as session:
            # ``ssl=False`` replaces the deprecated ``verify_ssl=False``; the
            # explicit 5 s limit keeps one silent host from holding its task
            # open for aiohttp's much longer default.
            async with session.post(
                full_url,
                headers=headers,
                data=data,
                ssl=False,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                data_dict = await response.json()
    except Exception:
        # Deliberately broad: a failing host must not abort the scan.
        print("<===请求失败===>")
        return

    # A JSON reply that is not an object has no ``.get``; treat it as a
    # rejected login.
    is_hit = (
        isinstance(data_dict, dict)
        and data_dict.get("code") == 200
        and data_dict.get("msg") is None
    )
    if is_hit:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        await save_file(msg)
    else:
        print("<===密码错误===>")


async def main():
    """Check every target listed in ``ips.txt`` concurrently.

    Each non-blank line is one target; ``http://`` is prepended when the line
    has no explicit http/https scheme. One task per target is created, all
    tasks are awaited, and the resulting ``done`` / ``pending`` sets are
    printed. Returns without printing when the file lists no targets.
    """
    task_list = []
    async with aiofiles.open("ips.txt", mode="r", encoding="utf-8") as f:
        async for raw in f:
            line = raw.strip()
            if not line:
                # A blank line would otherwise become the bogus target "http://".
                continue
            # Test the scheme prefix, not a substring: a host name may
            # legitimately contain "http".
            if not line.startswith(("http://", "https://")):
                line = f"http://{line}"
            # create_task() schedules the coroutine right away; keep the task
            # object in the list so it can be awaited below.
            task_list.append(asyncio.create_task(check_pass(line)))

    if not task_list:
        # asyncio.wait() raises ValueError when given an empty collection.
        return

    # Wait until every task has finished before continuing. asyncio.wait()
    # returns two sets: the completed tasks and the still-pending ones.
    done, pending = await asyncio.wait(task_list)
    print(done)
    print(pending)

# Script entry point: run the scan and print the elapsed wall-clock seconds.
if __name__ == "__main__":
    import time

    began_at = time.time()
    asyncio.run(main())
    print(time.time() - began_at)


import requests
from concurrent.futures import ThreadPoolExecutor
import queue
import threading


def save_file(msg):
    """Append ``msg`` as one line to ``res.txt`` (UTF-8).

    Args:
        msg: Text to record; a trailing newline is added.
    """
    with open('res.txt', mode='a', encoding="utf-8") as f:
        # The line terminator must be the ``\n`` escape; a raw line break
        # inside the literal is a syntax error.
        f.write(f"{msg}\n")


def check_pass(lock, url):
    """Try the default ``admin`` / ``123456`` login against one target.

    Posts the credentials to ``{url}/login``. When the JSON reply is an object
    with ``code == 200`` and no ``msg``, the hit is printed and saved through
    ``save_file`` while holding ``lock``.

    Args:
        lock: Lock held around the ``save_file`` call.
        url: Base URL of the target (scheme included, no trailing ``/login``).
    """
    full_url = f"{url}/login"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
    }
    data = {
        "userName": "admin",
        "password": "123456"
    }
    try:
        # requests has no default timeout; without one a silent host would
        # block this worker thread indefinitely.
        res = requests.post(url=full_url, headers=headers, data=data, timeout=5)
        data_dict = res.json()
    except Exception:
        # Deliberately broad: a failing host must not kill the worker.
        print("<===请求失败===>")
        return

    # A JSON reply that is not an object has no ``.get``; treat it as a
    # rejected login.
    is_hit = (
        isinstance(data_dict, dict)
        and data_dict.get("code") == 200
        and data_dict.get("msg") is None
    )
    if is_hit:
        msg = f"{url} ===> 存在弱口令"
        print(msg)
        with lock:
            save_file(msg)
    else:
        print("<===密码错误===>")


def main():
    """Read targets from ``ips.txt`` and check each one on a thread pool.

    All non-blank lines are collected first (with ``http://`` prepended when a
    line has no explicit http/https scheme); then one ``check_pass`` job per
    target is submitted. The pool's ``with`` block waits for every job.
    """
    lock = threading.RLock()
    with ThreadPoolExecutor() as pool:
        targets = []
        with open('ips.txt', mode='r', encoding='utf-8') as f:
            for raw in f:
                line = raw.strip()
                if not line:
                    # A blank line would otherwise become the bogus target "http://".
                    continue
                # Test the scheme prefix, not a substring: a host name may
                # legitimately contain "http".
                if not line.startswith(("http://", "https://")):
                    line = f"http://{line}"
                targets.append(line)
        # Only this thread produces and consumes the targets, so a plain list
        # replaces the queue that was drained with a qsize() loop.
        for url in targets:
            pool.submit(check_pass, lock, url)


# Script entry point: run the scan and print the elapsed wall-clock seconds.
if __name__ == "__main__":
    import time

    began_at = time.time()
    main()
    print(time.time() - began_at)
原文地址:https://www.cnblogs.com/xcymn/p/15336360.html