利用python抓取分析数据包并存入mysql

7cmb a nobody

1、先决条件

本机配置好python及mysql运行环境,并且安装好python相关模块scapy pymysql

2、抓包及分析

抓包功能使用的是scapy中的sniff()函数,使用方法已经记录在前篇: scapy小记pt.4-嗅探 ,本文一定程度上算后篇

3、建立数据库及表

登陆数据库后,建立并指定相关数据库后,建立一张表,提前规划好数据格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE DATABASE testsql;

USE testsql;

CREATE TABLE `sniffTable` (

`index` int unsigned AUTO_INCREMENT,

`pktType` char(5),

`capTime` datetime,

`srcIP` char(16),

`dstIP` char(16),

`domain` varchar(128),

PRIMARY KEY(`index`)

)DEFAULT CHARSET=utf8mb4;
name dataType
index u_int()   AUTO_INCREMENT
pktType char(5)
capTime datetime
srcIP char(16)
dstIP char(16)
domain varchar(128)

4、整体代码

database.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import pymysql.cursors
import sys
class linkSql:
flagConnected=False
def __init__(self):
self.__conn=None
conf={
"host":"localhost",
"user":"root",
"database":"testsql",
"password":"yourPassWord",
"charset":"utf8mb4"
}

for i in ["host","user","database","password","password","charset"]:
if i not in conf.keys():
exit("MISSING ARGUMENT \"database.linkSql.conf[%s]\""%(i))

try:
self.__conn=pymysql.connect(
host=conf["host"],
user=conf["user"],
database=conf["database"],
password=conf["password"],
charset=conf["charset"],
cursorclass=pymysql.cursors.DictCursor
)
self.cursors=self.__conn.cursor()
self.flagConnected=True
print("database \"%s\" conneted"%(conf["database"]))
except:
sys.exit("failed to conneted \"%s\""%(conf["database"]))

# table为 数据库内的表名 type:字符串; dataList为 包含所有数据列表的列表 type:list
def assembleSqlText(self,table,dataList):
sqlHead="INSERT INTO " + table + " (pktType,capTime,srcIP,dstIP,`domain`) VALUES ("
sqlTail="),("
workingStr=sqlHead
for i in dataList:
#print(i)
for j in i:
workingStr=workingStr + "\"" + j + "\"" + ","
workingStr=workingStr[:-1]
workingStr+=sqlTail
fullSqlText=workingStr[:-2]
return fullSqlText
def exec(self,sqlCmd):
self.cursors.execute(sqlCmd)
self.__conn.commit()

# 销毁对象时关闭数据库连接
def __del__(self):
try:
self.__conn.close()
except pymysql.Error as e:
pass

# 关闭数据库连接
def close(self):
self.__del__()


sniffResults.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from scapy.all import *
from scapy.layers.http import *
import time
load_layer("tls")
scapy.config.Conf.sniff_promisc=True # 网卡工作模式设置为promisc,如果该主机网卡为局域网转发接口,理论上能截取经过的数据包

# 嗅探函数,得到目标数据包

def getDataList():
dataList=[]
def sniffMain():
argv_bpf = "tcp dst port 80 or 443" # 伯克利包过滤器,如果需要监听别的web端口,即使加载了http模块,http fields也将被raw
# 取代,但是raw里的内容将被自动解析为人类可读的bytes类型
plist = sniff(filter=argv_bpf,
lfilter=lambda x: ((x.haslayer(TLS_Ext_ServerName) or x.haslayer(HTTPRequest)) == True))
return plist

# 分析函数,返回一个分析数据后得到的dataList dataList为 包含所有数据列表的列表 type:list
def analyzeMain(plist):
# analyze tlsPkt, fetch the fields, and push them into the dataList
tlsPList = plist.filter(lambda x: ((x.haslayer(TLS_Ext_ServerName) == True)))
for i in tlsPList:
tempTime = time.localtime(int(i.time))
pktTime = time.strftime("%Y%m%d%H%M%S", tempTime)
domain = i[TLS_Ext_ServerName].servernames[0].servername.decode()
row=["TLS",pktTime,i[IP].src,i[IP].dst,domain] # queue of the row MUST BE [pktType,capTime,srcIP,dstIP,`domain`]
dataList.append(row)

# analyze httpPkt, fetch the fields, and push them into the dataList
httpPList = plist.filter(lambda x: ((x.haslayer(HTTPRequest) == True)))
for i in httpPList:
tempTime = time.localtime(int(i.time))
pktTime = time.strftime("%Y%m%d%H%M%S", tempTime)
domain = i[HTTPRequest].Host.decode()
row = ["HTTP", pktTime, i[IP].src, i[IP].dst,domain] # queue of the row MUST BE [pktType,capTime,srcIP,dstIP,`domain`]
dataList.append(row)
analyzeMain(sniffMain())
return dataList

"""

# for testing

if __name__ == "__main__":
dateList=getDataList()
for i in dateList:
print(i)
"""


main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import database
import sniffResults

if __name__ == "__main__":
print("中断程序以获取结果,并将结果存往数据库")
sqlObj=database.linkSql()
dataList=sniffResults.getDataList()
sqlObj.exec("SELECT MAX(`index`) FROM sniffTable")
lastRecordIndex=sqlObj.cursors.fetchone()["MAX(`index`)"] # store the lasest index in the table

# insert the row of dataList in table
commandInsert=sqlObj.assembleSqlText("sniffTable",dataList)
#print(commandInsert)
sqlObj.exec(commandInsert)

# show the added results
commandSearchNew="SELECT * FROM sniffTable WHERE `index`> " + str(lastRecordIndex)
#print (commandSearchNew)
sqlObj.exec(commandSearchNew)
print("NEW RECORDS:")
for i in sqlObj.cursors.fetchall():
print (i["index"],"\t",i["pktType"],"\t",i["capTime"],"\t",i["srcIP"],"--->",i["dstIP"],"doamin:",i["domain"])

参考

使用python抓包并分析后存入数据库,或直接分析tcpdump和wireshark抓到的包,并存入数据库 - 花柒博客-原创分享

  • 标题: 利用python抓取分析数据包并存入mysql
  • 作者: 7cmb
  • 创建于 : 2023-11-30 17:06:36
  • 更新于 : 2024-05-15 15:29:42
  • 链接: https://7cmb.com/利用python抓取分析数据包并存入mysql/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
此页目录
利用python抓取分析数据包并存入mysql