0x00 Approach

  • Tokenization
  • Features
  • Training

0x01 Data

I came across https://github.com/SparkSharly/DL_for_xss on GitHub. It looks like a solid project, so I'm working through it here. The datasets ship with the project, so they are used directly:

  • e.g. normal_examples.csv (200k+ samples; a subset is used)

  • e.g. xssed.csv (40k+ samples; a subset is used)
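To get a feel for the data, peek at the first few payloads (one payload per CSV row, which is what all the snippets below assume):

import csv

#print the first three payloads from the XSS dataset
with open("data/xssed.csv","r",encoding="utf-8") as f:
    reader=csv.DictReader(f,fieldnames=["payload"])
    for i,row in enumerate(reader):
        print(row["payload"])
        if i>=2:
            break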

0x02 Tokenization

import re
import nltk
from urllib.parse import unquote

def GeneSeg(payload):
    #lowercase, double URL-decode, and generalize all digits to "0"
    payload=payload.lower()
    payload=unquote(unquote(payload))
    payload,num=re.subn(r'\d+',"0",payload)
    #generalize URLs to "http://u"
    payload,num=re.subn(r'(http|https)://[a-zA-Z0-9\.@&/#!#\?]+', "http://u", payload)
    #tokenize: function calls, quoted strings, tags, attributes, bare words
    r = '''
        (?x)[\w\.]+?\(
        |\)
        |"\w+?"
        |'\w+?'
        |http://\w
        |</\w+>
        |<\w+>
        |<\w+
        |\w+=
        |>
        |[\w\.]+
    '''
    return nltk.regexp_tokenize(payload, r)
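A quick check on an illustrative payload (note the digit is generalized to "0"):

print(GeneSeg("search=<script>alert(1)</script>"))
#['search=', '<script>', 'alert(', '0', ')', '</script>']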

0x03 Features

  • Build the XSS semantic model: construct the vocabulary

The vocabulary is built from the 300 highest-frequency tokens.

import csv
from collections import Counter

words=[]
datas=[]
#tokenize every XSS payload; keep per-payload token lists and a global token pool
with open("data/xssed.csv","r",encoding="utf-8") as f:
    reader=csv.DictReader(f,fieldnames=["payload"])
    for row in reader:
        payload=row["payload"]
        word=GeneSeg(payload)
        datas.append(word)
        words+=word

#Build the dataset: keep the (vocabulary_size-1) most frequent tokens, map the rest to "UNK"
def build_dataset(datas,words):
    count=[["UNK",-1]]
    counter=Counter(words)
    count.extend(counter.most_common(vocabulary_size-1))
    vocabulary=[c[0] for c in count]
    data_set=[]
    for data in datas:
        d_set=[]
        for word in data:
            if word in vocabulary:
                d_set.append(word)
            else:
                #out-of-vocabulary token: generalize to "UNK" and count it
                d_set.append("UNK")
                count[0][1]+=1
        data_set.append(d_set)
    return data_set
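A minimal driver for the two snippets above; vocabulary_size = 300 matches the top-300 vocabulary described earlier:

vocabulary_size=300
data_set=build_dataset(datas,words)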
  • Word2vec modeling
from gensim.models import Word2Vec

#gensim < 4.0 API; gensim >= 4.0 renamed size= to vector_size= and iter= to epochs=
model=Word2Vec(data_set,size=embedding_size,window=skip_window,negative=num_sampled,iter=num_iter)

The embedding dimensionality is set to 32.
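For reference, these are the hyperparameter names used in the Word2Vec call above; embedding_size = 32 is stated in the text, while the other values are illustrative assumptions, not necessarily the project's settings:

embedding_size=32  #word-vector dimensionality (stated above)
skip_window=5      #context window size (assumed)
num_sampled=64     #negative-sampling count (assumed)
num_iter=5         #training epochs (assumed)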

To check the modeling result, look at the tokens most semantically similar to </script>.
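With gensim this is a one-liner (output depends on the trained model):

#tokens closest to "</script>" in the embedding space
for word,score in model.wv.most_similar("</script>",topn=10):
    print(word,score)

pre_process() in the next step loads a pickle from vec_dir holding the vocabulary, its inverse, and the per-token vectors. Here is a sketch of how that file could be produced; the key names match what pre_process() reads, but the project's actual save code may differ:

import pickle

dictionary={word:i for i,word in enumerate(model.wv.index2word)}  #token -> index
reverse_dictionary={i:word for word,i in dictionary.items()}      #index -> token
embeddings={word:model.wv[word] for word in dictionary}           #token -> vector
with open(vec_dir,"wb") as f:
    pickle.dump({"dictionary":dictionary,"embeddings":embeddings,
                 "reverse_dictionary":reverse_dictionary},f)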

  • Data preprocessing
import csv
import pickle
import random
from keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

def pre_process():
    with open(vec_dir,"rb") as f:
        word2vec=pickle.load(f)
        #vocabulary, token -> index, e.g. {'UNK': 0, '0': 1, ...}
        dictionary=word2vec["dictionary"]
        #per-token embedding vectors
        embeddings=word2vec["embeddings"]
        #reverse vocabulary, index -> token, e.g. {0: 'UNK', 1: '0', ...}
        reverse_dictionary = word2vec["reverse_dictionary"]
    xssed_data=[]
    normal_data=[]
    with open("data/xssed.csv","r",encoding="utf-8") as f:
        reader = csv.DictReader(f, fieldnames=["payload"])
        for row in reader:
            payload=row["payload"]
            #tokenized, e.g. ['search=', '</script>', '<img', 'src=', 'worksinchrome', 'colon', 'prompt', 'x0', '0', 'x0', 'onerror=', 'eval(', 'src', ')', '>']
            word=GeneSeg(payload)
            xssed_data.append(word)
    with open("data/normal_examples.csv","r",encoding="utf-8") as f:
        reader = csv.DictReader(f, fieldnames=["payload"])
        for row in reader:
            payload=row["payload"]
            word=GeneSeg(payload)
            normal_data.append(word)
    xssed_num=len(xssed_data)
    normal_num=len(normal_data)
    #labels: 1 for XSS, 0 for normal, e.g. [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    xssed_labels=[1]*xssed_num
    normal_labels=[0]*normal_num
    datas=xssed_data+normal_data
    labels=xssed_labels+normal_labels
    def to_index(data):
        d_index=[]
        for word in data:
            if word in dictionary.keys():
                d_index.append(dictionary[word])
            else:
                d_index.append(dictionary["UNK"])
        return d_index
    #tokens converted to vocabulary indices, e.g. [23, 5, 34, 14, 0, 0, 0, 0, 1, 0, 81, 0, 0, 3, 2]
    datas_index=[to_index(data) for data in datas]
    #sequences shorter than maxlen are pre-padded with -1:
    '''
    [[ -1  -1  -1 ...   0   3   2]
    [ -1  -1  -1 ...  10  17   1]
    [ -1  -1  -1 ... 150   0  71]
    ...
    [ -1  -1  -1 ...  11   2  55]
    [ -1  -1  -1 ...   5  24   1]
    [ -1  -1  -1 ...   1   3   5]]
    '''
    datas_index=pad_sequences(datas_index,value=-1,maxlen=maxlen)
    #random permutation of all indices, e.g. [7, 6, 3, 2, 5, 8, 0, 1, 10, 4, 9]
    rand=random.sample(range(len(datas_index)),len(datas_index))
    #shuffle data and labels with the same permutation
    datas=[datas_index[index] for index in rand]
    labels=[labels[index] for index in rand]

    datas_embed=[]
    #embedding dimensionality, read from the "UNK" vector (32 here)
    dims=len(embeddings["UNK"])
    n=0
    for data in datas:
        data_embed = []
        for d in data:
            if d != -1:
                #real token: append its embedding vector
                data_embed.extend(embeddings[reverse_dictionary[d]])
            else:
                #padding slot: append an all-zero vector
                data_embed.extend([0.0] * dims)
        datas_embed.append(data_embed)
        '''
        [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 
        0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 
        0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,··· -0.5644003, 0.41219762, -1.2313833, -1.3566964, 
        -0.74316794, -1.2668883, 1.0586963, 1.5969143, 0.21956278, 1.1538218, -0.35007623, 0.21183407, 
        -0.53830135, 1.7361579, -0.08175806, -1.1915175, -1.7790002, -1.1044971, 0.40857738]
        '''
        n+=1
        if n%10000 ==0:
            print(n)
    #70% train, 30% test
    train_datas,test_datas,train_labels,test_labels=train_test_split(datas_embed,labels,test_size=0.3)
    return train_datas,test_datas,train_labels,test_labels
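Each sample returned by pre_process() is therefore one flat vector of maxlen × 32 floats, with zeros at the padded positions, so the SVM in the next section sees fixed-length inputs. A quick sanity check, assuming maxlen is defined as above:

train_datas,test_datas,train_labels,test_labels=pre_process()
#every feature vector is maxlen embedding slots of 32 floats each
assert len(train_datas[0])==maxlen*32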

0x04 SVM Training

The model is trained with an SVM.

import time
import pickle
from sklearn.svm import SVC
from sklearn.metrics import precision_score, recall_score

train_datas,test_datas,train_labels,test_labels=pre_process()
print("Start Train Job! ")
start = time.time()
#linear-kernel SVM; LinearSVC() would be a faster alternative for the linear case
model = SVC(C=1.0, kernel="linear")
model.fit(train_datas,train_labels)
end = time.time()
print("Over train job in %f s" % (end - start))
print("Start Test Job!")
start=time.time()
pre=model.predict(test_datas)
end=time.time()
print("Over test job in %f s"%(end-start))
precision = precision_score(test_labels, pre)
recall = recall_score(test_labels, pre)
print("Precision score is :", precision)
print("Recall score is :", recall)
#persist the trained model
with open(model_dir,"wb") as f:
    pickle.dump(model,f,protocol=2)
print("write to ",model_dir)

Precision and recall: