# Classify emails as spam vs. normal with a Naive Bayes model. The data set contains
# a training set train (normal emails in `normal`, spam in `spam`), a test set `test`,
# and a Chinese stop-word file cn_stopwords.txt.
import os
'''1.获取邮件列表'''
# Directory of the spam emails in the training data
train_spam_dir = './data/train/spam'
# File names of the training spam emails
train_spam_list = os.listdir(train_spam_dir)
# Directory of the normal (ham) emails in the training data
train_normal_dir = './data/train/normal'
# File names of the training normal emails (original comment wrongly said "spam")
train_normal_list = os.listdir(train_normal_dir)
# Directory of the test emails
test_dir = './data/test'
# File names of the test emails
test_list = os.listdir(test_dir)
import re
import jieba
'''2、实现两个函数,用于获取词集和条件概率的实现'''
# 获取邮件词集
def get_word_set(email_path):
    """Return the deduplicated set of content words of one email.

    The email text is reduced to its Chinese characters, segmented with
    jieba, and filtered against the Chinese stop-word list.

    :param email_path: path of a single email file (GBK-encoded text)
    :return: set of word strings (stop words removed)
    """
    # Path of the Chinese stop-word list (one word per line, UTF-8).
    stop_word_path = './data/cn_stopwords.txt'
    # Load the stop words into a set for O(1) membership tests below.
    with open(stop_word_path, 'r', encoding='utf-8') as f:
        stop_word_set = {line.strip() for line in f}
    # Accumulate the Chinese characters of the whole email.
    text = ''
    with open(email_path, 'r', encoding='gbk') as email_f:
        for line in email_f:
            # Keep only CJK characters. The original pattern r'[^u4e00-u9fa5]'
            # was missing the backslashes, so it actually deleted the Chinese
            # text and kept ASCII characters in the class {u,4,e,'0'-'u',9,f,a,5}.
            text += re.sub(r'[^\u4e00-\u9fa5]', '', line)
    # Segment, drop stop words, deduplicate.
    return {w for w in jieba.cut(text) if w not in stop_word_set}
# email_path = './data/train/normal/201'
# word_set = get_word_set(email_path)
# print(word_set)
from collections import Counter
def get_word_frequency(email_dir, email_list):
    """Estimate per-word document frequency for one class of emails.

    Each email contributes its word *set*, so a word is counted at most
    once per email; the returned value is the fraction of emails of this
    class that contain the word.

    :param email_dir: directory holding the emails of one class
    :param email_list: file names of the emails in that directory
    :return: dict mapping word -> document frequency
    """
    doc_counts = Counter()
    for name in email_list:
        # Merge this email's deduplicated word set into the class counts.
        doc_counts.update(get_word_set(os.path.join(email_dir, name)))
    total = len(email_list)
    return {word: hits / total for word, hits in doc_counts.items()}
# word_frequency = get_word_frequency(train_normal_dir,train_normal_list)
# print(len(word_frequency))
'''3、计算先验概率'''
# Number of spam / normal emails in the training set
spam_num,normal_num = len(train_spam_list),len(train_normal_list)
# Prior probability of the spam class: P(Y=spam)
p_spam = spam_num / (spam_num + normal_num)
# Prior probability of the normal class: P(Y=normal) = 1 - P(Y=spam)
p_normal = 1 - p_spam
print(p_spam,p_normal)
# Observed output: 0.5394809880655009 0.46051901193449907
'''4、将获取邮件的词频作为条件概率估计值'''
# Per-word document frequency in spam emails, used as the estimate of P(X_i|Y=spam)
spam_word_probability = get_word_frequency(train_spam_dir,train_spam_list)
# Per-word document frequency in normal emails, used as the estimate of P(X_i|Y=normal)
normal_word_probability = get_word_frequency(train_normal_dir,train_normal_list)
'''5、垃圾邮件过滤'''
# Predictions for each test email: 1 = spam, 0 = normal.
y_pred = dict()
# Work in log space: the original multiplied hundreds of small word
# probabilities, which underflows to 0.0 for long emails and then makes
# the posterior 0/0 (ZeroDivisionError). Comparing log-joints is
# equivalent to thresholding the posterior P(spam|words) at 0.5.
import math
# Iterate over the test email list
for email in test_list:
    # Path of this test email
    email_path = os.path.join(test_dir, email)
    # Word set of this test email
    test_word = get_word_set(email_path)
    # A word is usable for classification only if it appears in the test
    # email AND has an estimated frequency in BOTH training classes.
    test_word = test_word & spam_word_probability.keys() & normal_word_probability.keys()
    # Initialise the joint log-probability with the prior: log P(Y=spam)
    log_spam = math.log(p_spam)
    # Initialise the joint log-probability with the prior: log P(Y=normal)
    log_normal = math.log(p_normal)
    # Accumulate log P(X_i|Y=c) for every usable word
    for w in test_word:
        log_spam += math.log(spam_word_probability[w])
        log_normal += math.log(normal_word_probability[w])
    # log P(X,Y=spam) > log P(X,Y=normal)  <=>  P(spam|X) > 0.5
    y_pred[email] = 1 if log_spam > log_normal else 0
# print(y_pred)
# Accuracy evaluation
def get_accuracy(y_pred):
    """Return the fraction of correctly classified test emails.

    Ground truth is encoded in the file name: numeric names < 1000 are
    normal emails (label 0), names > 1000 are spam (label 1).

    :param y_pred: dict mapping email file name -> predicted label (0/1)
    :return: accuracy in [0, 1]; 0.0 for an empty prediction dict
    """
    # Guard against division by zero on an empty input.
    if not y_pred:
        return 0.0
    # Number of correctly predicted emails
    correct_count = 0
    for email, pred in y_pred.items():
        num = int(email)
        # Correct when the prediction matches the name-encoded label.
        # (A name of exactly 1000 has no defined label and never counts.)
        if (num < 1000 and pred == 0) or (num > 1000 and pred == 1):
            correct_count += 1
    return correct_count / len(y_pred)
# Overall accuracy of the classifier on the test set.
accuracy = get_accuracy(y_pred)
print(accuracy)
# Observed output: 0.9821428571428571



