scapy抓包使用

# coding=utf-8
import json
import time
import os
import dpkt
import socket
import datetime
import uuid
import traceback

from dpkt.ethernet import Ethernet
from scapy.layers.l2 import Ether
from scapy.sendrecv import sniff
from scapy.utils import wrpcap

from BigData.data_common.utils.file_util import FileUtil


def get_local_ip():
    hostname = socket.gethostname()
    # 获取本机内网ip
    local_ips = socket.gethostbyname_ex(hostname)[-1]
    return local_ips

def body_transfer(body):
    str_body = body.decode()
    body_ls = str_body.split("&")
    d = {}
    for item in body_ls:
        key_, value_ = item.split("=")
        d[key_.strip()] = value_.strip()


def analysis_pcap(timestamp, buf):
    data = {}
    if isinstance(buf, dpkt.ip.IP):
        eth = buf
    else:
        eth = dpkt.ethernet.Ethernet(buf)
    # print(eth.data.__dict__)
    # print("ip layer:"+eth.data.__class__.__name__) #以太包的数据既是网络层包
    # print("tcp layer:"+eth.data.data.__class__.__name__) #网络层包的数据既是传输层包
    # print("http layer:" + eth.data.data.data.__class__.__name__) #传输层包的数据既是应用层包
    # print('Timestamp: ',str(datetime.datetime.utcfromtimestamp(timestamp))) #打印出包的抓取时间
    if isinstance(eth.data, dpkt.ip.IP) or isinstance(eth.data, dpkt.ip6.IP6):
        #     # print('%d Non IP Packet type not supported %s' % (int(timestamp), eth.data.__class__.__name__))
        #     print('ip.data type is {}'.format(eth.data.__class__.__name__))
        #     print(repr(eth.data))
        #     return data
        ip = eth.data
        if isinstance(eth.data, dpkt.ip.IP):
            src_ip = socket.inet_ntoa(ip.src)
            dst_ip = socket.inet_ntoa(ip.dst)
            # do_not_fragment =bool(ip.off & dpkt.ip.IP_DF)
            # more_fragments =bool(ip.off & dpkt.ip.IP_MF)
            # fragment_offset = ip.off & dpkt.ip.IP_OFFMASK
            # key = 'IPV4' if isinstance(eth.data, dpkt.ip.IP) else 'IPV6'
            data.update({
                'time': timestamp,
                'IPV4': {'src': src_ip, 'dst': dst_ip}
            })
        else:
            src_ip = ip.src
            dst_ip = ip.dst
            data.update({
                'time': timestamp,
                'IPV6': {'src': src_ip, 'dst': dst_ip}
            })
        print('ip.data type is {}'.format(ip.data.__class__.__name__))
        if isinstance(ip.data, dpkt.tcp.TCP):
            layer = ip.data
            data.update(analysis_tcp(layer))
        elif isinstance(ip.data, dpkt.udp.UDP):
            layer = ip.data
            data.update(analysis_udp(layer))
        elif isinstance(ip.data, dpkt.icmp.ICMP) or isinstance(ip.data, dpkt.icmp6.ICMP6):
            layer = ip.data
            data.update(analysis_icmp(layer))
        else:
            print('analysis_pcap ip.data {}'.format(repr(ip.data)))
            data = {'time': timestamp, eth.data.__class__.__name__: {}}
    else:
        print('analysis_pcap eth.data {}'.format(repr(eth.data)))
        data = {'time': timestamp, eth.data.__class__.__name__: eth.data.__dict__}
    return data

def analysis_udp(udp, key="UDP"):
    try:
        data_dict = {}
        try:
            data_str = udp.data.decode('utf-8')
            if data_str.startswith('M-SEARCH'):
                data_list = data_str.strip().split('\n')[1:]
                for item in data_list:
                    k, v = item.split(':')[0], ':'.join(item.split(':')[1:])
                    data_dict[k.strip()] = v.strip()
            else:
                data_dict = data_str
        except:
            pass
        return {
            key: {
                'sport': udp.sport,
                'dport': udp.dport,
                'ulen': udp.ulen,
                'sum': udp.sum,
                'data': data_dict
            }
        }
    except:
        pass
    print('analysis_udp udp {}'.format(repr(udp)))
    return {}

def analysis_tcp(tcp, key='TCP'):
    data = {
        key: {
            'dport': tcp.dport,
            'sport': tcp.sport,
            'ack': tcp.ack,
            'seq': tcp.seq
        }
    }
    try:
        request = dpkt.http.Request(tcp.data)
        data['HTTP'] = {
                'type': 'request',
                'uri': request.uri,
                'Method': request.method.upper(),
                'Headers': dict(request.headers),
                'Body': body_transfer(request.body),
                'Data': body_transfer(request.data)
            }
    except:
        pass
    try:
        response = dpkt.http.Response(tcp)
        data['HTTP'] = {
                'type': 'response',
                'Headers': dict(response.headers),
                'Body': body_transfer(response.body),
                'Data': body_transfer(response.data)
            }
    except:
        pass
    try:
        data_dict = {}
        data_str = tcp.data.decode('utf-8')
        if data_str.startswith('M-SEARCH'):
            data_list = data_str.strip().split('\n')[1:]
            for item in data_list:
                k, v = item.split(':')[0], ':'.join(item.split(':')[1:])
                data_dict[k.strip()] = v.strip()
        else:
            data_dict = data_str
        if key in data and data_dict:
            data[key]['Data'] = data_dict
    except:
        pass
    if data:
        return data
    print('analysis_tcp tcp {}'.format(repr(tcp)))
    return {}

def analysis_icmp(icmp, key='ICMP'):
    try:
        if isinstance(icmp, dpkt.icmp.ICMP):
            return {
                'ICMP': {
                    'type': icmp.type,
                    'sum': icmp.sum,
                    'Data': analysis_pcap(int(time.time()), icmp.data.data)
                }
            }
        else:
            data_str = ''
            try:
                data_str = icmp.data.decode('utf-8')
            except:
                pass
            return {
                'ICMP6': {
                    'type': icmp.type,
                    'sum': icmp.sum,
                    'Data': data_str
                }
            }
    except:
        pass
    return {}


def analysis_pcap2(timestamp, buf):
    e = Ether(buf)
    e.show()

def get_dpkt():
    # 这里是针对单网卡的机子, 多网卡的可以在参数中指定网卡, 例:iface=Qualcomm QCA9377 802.11ac Wireless Adapter
    dpkt_ = sniff(count = 10)
    _uuid = uuid.uuid1()
    filename = f"{_uuid}.pcap"
    wrpcap(filename, dpkt_)
    return filename


def main():
    while True:
        filename = get_dpkt()
        with open(filename, "rb") as f:
            pcap = dpkt.pcap.Reader(f)
            local_ips = get_local_ip()
            for timestamp, buf in pcap:
                res = analysis_pcap(timestamp, buf)
                # analysis_pcap2(timestamp, buf)
                print(res)
                # FileUtil.write_lines('test.txt', json.dumps(res))

        os.remove(filename)
        # FileUtil.flush()

if __name__ =='__main__':
    main()
https://github.com/jiangsiwei2018/BigData.git 实例代码git仓库地址
原文地址:https://www.cnblogs.com/satansz/p/15575292.html