scapy抓包使用

技术scapy抓包使用 scapy抓包使用# coding=utf-8
import json
import time
import os
import dpkt
import socket
impor

斯盖抓包使用

# coding=utf-8

导入json

导入时间

导入操作系统

导入dpkt

导入套接字

导入日期时间

导入uuid

导入追溯

从dpkt.ethernet导入以太网

来自scapy.layers.l2进口乙醚

来自scapy.sendrecv导入嗅探

从scapy.utils导入wrpcap

从大数据。data _ common。utils。文件_ util导入文件跑龙套

def get_local_ip():

hostname=socket.gethostname()

# 获取本机内网互联网协议(互联网协议的缩写)

local _ IPS=套接字。gethostbyname _ ex(主机名)[-1]

返回本地互联网协议(Internet Protocol)

def body_transfer(body):

str_body=body.decode()

body_ls=str_body.split(')

d={}

对于body_ls:中的项目

key_,value_=item.split('=')

d[key_ .strip()]=value_ .条带()

def analysis_pcap(时间戳,buf):

数据={}

if isinstance(buf,dpkt.ip.IP):

eth=buf

else:

eth=dpkt.ethernet。以太网(buf)

# print(eth.data.__dict__)

#打印(' IP层: ' eth。数据。_ _类_ _ .__名称__) #以太包的数据既是网络层包

# print(' TCP层: ' eth。数据。数据。_ _类_ _ .__名称__) #网络层包的数据既是传输层包

# print(' http layer : ' eth。数据。数据。数据。_ _类_ _ .__名称__) #传输层包的数据既是应用层包

#打印(' Timestamp: ',字符串(日期时间。日期时间。时间戳的utc时间(时间戳))#打印出包的抓取时间

if isinstance(eth.data,dpkt.ip.IP)或isinstance(eth.data,dpkt.ip6.IP6):

# #打印(%d)非互联网协议(Internet Protocol)数据包类型不受支持% s"%(int(timestamp),eth.data.__class__).__名称_ _))

# print('ip.data type为{} ')。格式(eth.data.__class__ .__名称_ _))

# print(repr(eth.data))

#返回数据

ip=eth.data

if isinstance(eth.data,dpkt.ip.IP):

src_ip=socket.inet_ntoa(ip.src)

dst_ip=socket.inet_ntoa(ip.dst)

# do _ not _ fragment=bool(IP。关闭dpkt。知识产权。IP _ DF)

# more _ fragments=bool(IP。关闭dpkt。知识产权。IP _ MF)

# fragment _ offset=IP。关闭dpkt。知识产权。IP _ off掩码

# key=' IP v4 '如果是实例(eth。data,dpkt.ip.IP) else 'IPV6 '

数据。更新({ 0

时间' :时间戳,

IPV4': {'src': src_ip,' dst': dst_ip}

})

else:

src_ip=ip.src

dst_ip=ip.dst

数据。更新({ 0

时间' :时间戳,

IPV6': {'src': src_ip,' dst

': dst_ip}
})
print('ip.data type is {}'.format(ip.data.__class__.__name__))
if isinstance(ip.data, dpkt.tcp.TCP):
layer = ip.data
data.update(analysis_tcp(layer))
elif isinstance(ip.data, dpkt.udp.UDP):
layer = ip.data
data.update(analysis_udp(layer))
elif isinstance(ip.data, dpkt.icmp.ICMP) or isinstance(ip.data, dpkt.icmp6.ICMP6):
layer = ip.data
data.update(analysis_icmp(layer))
else:
print('analysis_pcap ip.data {}'.format(repr(ip.data)))
data = {'time': timestamp, eth.data.__class__.__name__: {}}
else:
print('analysis_pcap eth.data {}'.format(repr(eth.data)))
data = {'time': timestamp, eth.data.__class__.__name__: eth.data.__dict__}
return data
def analysis_udp(udp, key="UDP"):
try:
data_dict = {}
try:
data_str = udp.data.decode('utf-8')
if data_str.startswith('M-SEARCH'):
data_list = data_str.strip().split('\n')[1:]
for item in data_list:
k, v = item.split(':')[0], ':'.join(item.split(':')[1:])
data_dict[k.strip()] = v.strip()
else:
data_dict = data_str
except:
pass
return {
key: {
'sport': udp.sport,
'dport': udp.dport,
'ulen': udp.ulen,
'sum': udp.sum,
'data': data_dict
}
}
except:
pass
print('analysis_udp udp {}'.format(repr(udp)))
return {}
def analysis_tcp(tcp, key='TCP'):
data = {
key: {
'dport': tcp.dport,
'sport': tcp.sport,
'ack': tcp.ack,
'seq': tcp.seq
}
}
try:
request = dpkt.http.Request(tcp.data)
data['HTTP'] = {
'type': 'request',
'uri': request.uri,
'Method': request.method.upper(),
'Headers': dict(request.headers),
'Body': body_transfer(request.body),
'Data': body_transfer(request.data)
}
except:
pass
try:
response = dpkt.http.Response(tcp)
data['HTTP'] = {
'type': 'response',
'Headers': dict(response.headers),
'Body': body_transfer(response.body),
'Data': body_transfer(response.data)
}
except:
pass
try:
data_dict = {}
data_str = tcp.data.decode('utf-8')
if data_str.startswith('M-SEARCH'):
data_list = data_str.strip().split('\n')[1:]
for item in data_list:
k, v = item.split(':')[0], ':'.join(item.split(':')[1:])
data_dict[k.strip()] = v.strip()
else:
data_dict = data_str
if key in data and data_dict:
data[key]['Data'] = data_dict
except:
pass
if data:
return data
print('analysis_tcp tcp {}'.format(repr(tcp)))
return {}
def analysis_icmp(icmp, key='ICMP'):
try:
if isinstance(icmp, dpkt.icmp.ICMP):
return {
'ICMP': {
'type': icmp.type,
'sum': icmp.sum,
'Data': analysis_pcap(int(time.time()), icmp.data.data)
}
}
else:
data_str = ''
try:
data_str = icmp.data.decode('utf-8')
except:
pass
return {
'ICMP6': {
'type': icmp.type,
'sum': icmp.sum,
'Data': data_str
}
}
except:
pass
return {}
def analysis_pcap2(timestamp, buf):
e = Ether(buf)
e.show()
def get_dpkt():
# 这里是针对单网卡的机子, 多网卡的可以在参数中指定网卡, 例:iface=Qualcomm QCA9377 802.11ac Wireless Adapter
dpkt_ = sniff(count = 10)
_uuid = uuid.uuid1()
filename = f"{_uuid}.pcap"
wrpcap(filename, dpkt_)
return filename
def main():
while True:
filename = get_dpkt()
with open(filename, "rb") as f:
pcap = dpkt.pcap.Reader(f)
local_ips = get_local_ip()
for timestamp, buf in pcap:
res = analysis_pcap(timestamp, buf)
# analysis_pcap2(timestamp, buf)
print(res)
# FileUtil.write_lines('test.txt', json.dumps(res))
os.remove(filename)
# FileUtil.flush()
if __name__ =='__main__':
main()

https://github.com/jiangsiwei2018/BigData.git
实例代码git仓库地址

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/103394.html

(0)

相关推荐

  • C#与C++哪个更强

    技术C#与C++哪个更强这篇文章主要介绍“C#与C++哪个更强”,在日常操作中,相信很多人在C#与C++哪个更强问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”C#与C++哪个更强”的疑惑有

    攻略 2021年11月29日
  • 春城的意思,《寒食》春城……的意思

    技术春城的意思,《寒食》春城……的意思对皇都春色的陶醉和对盛世承平的歌咏。《寒食》作者春城的意思:韩翎春城无处不飞花,寒食东风御柳斜。日暮汉宫传蜡烛,轻烟散入五侯家。【译文】:春天,长安城处处飘飞着落花;寒食节,东风把御

    生活 2021年10月23日
  • 根号下的数的取值范围,根号里面的数的取值范围

    技术根号下的数的取值范围,根号里面的数的取值范围根号下的数可以等于零根号下的数的取值范围。
    通常说的根号都是只二次根号,即√,它表示对根号下的数开平方。根号下的数叫做“被开方数”。所以根号下的数需要满足的条件:是某个数的

    生活 2021年10月28日
  • 直方图与ACS实例分析

    技术直方图与ACS实例分析本篇内容主要讲解“直方图与ACS实例分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“直方图与ACS实例分析”吧!一般情况下ACS必须结合直方图一起

    攻略 2021年11月15日
  • 好玩的单机手游有哪些,有哪些好玩的单机策略手游

    技术好玩的单机手游有哪些,有哪些好玩的单机策略手游一:《暴战机甲兵》 游戏中模拟了31世纪后的未来战场,在那时,以核聚变为动力的巨型步行机器人,也就是所谓的战斗机甲,成为战场的主宰者,搭配坦克好玩的单机手游有哪些、星舰、

    生活 2021年10月27日
  • 怎么使用Oracle数据库的逻辑备份工具

    技术怎么使用Oracle数据库的逻辑备份工具这篇文章主要介绍“怎么使用Oracle数据库的逻辑备份工具”,在日常操作中,相信很多人在怎么使用Oracle数据库的逻辑备份工具问题上存在疑惑,小编查阅了各式资料,整理出简单好

    攻略 2021年11月5日