系列文章目录
基础:requests的基本使用:QQ机器人基础 传送门
第一章 :pythonQQ机器人系列:使用requests实现QQ机器人聊天(一)
目录
系列文章目录
前言
一、request库和go-cqhttp框架
二、使用步骤
1.引入库
二.源代码
三.源代码讲解
一.
二.
三.
四.
五.
总结
前言
更偏向零基础。
通过使用requests进行实现QQ机器人(说白了就是使用爬虫),requests库相对于使用socket库和requests库来说是相对简单的,所以比较适合那些,短时间内socket库学不完的,但是因为socket库没用的原因,相应的会使我们制作QQ机器人有较多的缺点,所以能尽量学socket库就加把劲去学socket。
也建议大家学习一下深度学习之类的,毕竟别人家的机器人,永远没有自己的机器人用起来舒服,所以去学个机器学习,深度学习什么的自己训练一个属于自己的机器人,这里我们使用的是青客云的智能聊天API。
这系列的文章会持续更新也更偏向零基础,当然水的时间会比较长。希望大家多多支持。如有不对的地方请斧正。
此代码及后续代码会持续优化。
一、request库和go-cqhttp框架
对于requests的基本使用可以参考requests基础,且前文讲过的东西后文不再讲解。前文仅讲过:url,headers,text与json的区别,json的键值取值。
二、使用步骤
1.引入库
这次我们仅使用requests库以及go-cqhttp这个框架。
先下载requests。
pip install requests
再下载go-cqhttp。
找一个属于自己机器版本的下载即可,下载完后自行运行程序并加载好,360等可能会弹提示,这个是无毒的可以放心。
其他的是不需要改动的,改好后在运行一次go-cqhttp.bat就行了。
这个样就算是启动好了,当然有些可能开了设备锁,这个自行解决就可以了,有些时候扫码弄设备锁会弹链接,复制一下到浏览器打开扫一下就行了。
当然,大家也可能会遇到风控,当然风控是不会封号的,只是限制了你不能用框架了,把QQ号晾几天,水水群就行了。
二.源代码
我就先把代码放上。
import time
import requests
message_id_collect = [] #放入获取的消息ID以便进行判断是否有人说话
message_id_collect_again = [] #放入获取的消息ID以便进行判断是否有人说话
message_append = [] #添加获取的内容
def res_get_error():#获取最新的消息数据
res_get = requests.get(url=url, headers=headers).json()
mes_get = res_get["data"]["messages"]
for mes_get_append in mes_get:
message_append.append(mes_get_append)
#print(message_append)
def res_separate_mes():
mes_get_append = message_append[-1] #最新的消息记录
mes_content = mes_get_append["message"] #拿取消息内容
mes_content_sender = mes_get_append["sender"]["nickname"] #拿取消息发送者的名字
print("获取-----:" + mes_content_sender + ':' + mes_content) # 检查是否正常
API_get_answer(mes_content)
def API_get_answer(mes_content):#聊天API的调用,并获取回答
urls="http://api.qingyunke.com/api.php?key=free&appid=0&msg={}".format(mes_content)
answer_get = requests.get(url=urls,headers=headers).json()
answer_content = answer_get["content"]#获取API回答的内容
print("回答:" + answer_content)#检察是否可以正常运行
answer_post(answer_content)
def answer_post(answer_content):
group_id = res_mes_post["group_id"]#群号
msg = answer_content
urls = "http://127.0.0.1:5700/send_group_msg?group_id=" + group_id + "&message=" + msg
#print(urls)
answer_post_use = requests.post(url=urls,headers=headers).json() #发送消息
#print(answer_post_use)
print("已回答")
answer_post_again()
def answer_post_again():#发消息前的消息ID
res_get_first = message_append[-1]
res_mes_id = res_get_first["message_id"]
message_id_collect.append(res_mes_id)
nums = message_id_collect[-1]
#print(nums)
return nums #返回nums以便进行对比
def answer_post_again_check():#发消息后的消息ID
res_get_again = message_append[-2]
res_mes_id_again = res_get_again["message_id"]
message_id_collect_again.append(res_mes_id_again)
numss = message_id_collect_again[-1]
#print(numss)
return numss # 返回nums以便进行对比
if __name__ == '__main__':
res_mes_post = {"instruction_message": "message_seq", "group_id": "736038975", "message_id": ""}
message_group_id = res_mes_post["group_id"]
url = "http://127.0.0.1:5700/get_group_msg_history?group_id=" + message_group_id
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3947.100 Safari/537.36"}
res = requests.get(url=url, headers=headers).json()
# print(res)#检查
while True:
res_get_error()
answer_post_again()
n = answer_post_again()
#print(message_id_collect)
res_separate_mes()
res_get_error()
time.sleep(1)
while True:
res_get_error()
answer_post_again_check()
m = answer_post_again_check()
time.sleep(1)
#print(message_id_collect_again)
if n == m:
print("无消息")
time.sleep(3)
pass
else:
print("有消息")
break
三.源代码讲解
一.
因为配置的原因。
我们就复制url去网页打开就可以。
接下来就是正常的引入使用的库和爬虫需要的请求头等。
####上面
import time
import requests
message_id_collect = [] #放入发消息前的消息ID
message_id_collect_again = [] #放入发消息后的消息ID以便进行判断是否有人说话
message_append = [] #添加获取的内容
####下面
if __name__ == '__main__':
res_mes_post = {"instruction_message": "message_seq", "group_id": "736038975", "message_id": ""}
message_group_id = res_mes_post["group_id"]
url = "http://127.0.0.1:5700/get_group_msg_history?group_id=" + message_group_id
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3947.100 Safari/537.36"}
res = requests.get(url=url, headers=headers).json()
# print(res)#检查
因为没有使用socket的原因,加上go-cqhttp的指令我没找到其他的获取最新消息的指令,我就只能使用获取历史消息的指令,这条指令是会获取最近的18或19条消息,因为我们没办法确定到底是18条还是19条,所以只能创建一个空的列表,从列表里拿取消息,这样就容易了许多。
讲一下这里的url怎么用,终结点就是http://127.0.0.1:5700,后面必加的内容比如说我们的获取群历史消息指令的一部分,就是http://127.0.0.1:5700/get_group_msg_history,好了,这样的话也得知道QQ群的账号吧,那么参数怎么写呢?这样的话就需要“?”来帮忙了,当然必须是英文输入法下的问号否则指令是错的,然后就是我们整体的获取群历史消息的指令了,
http://127.0.0.1:5700/get_group_msg_history?group_id=自己的群号,然后对于同等级的参数就使用“&”来实现,就像下面源代码讲解的第五部分的url:
http://127.0.0.1:5700/send_group_msg?group_id=群号&message=内容 。
看一下运行结果,当然你得把注释符号去掉。
二.
def res_get_error():#获取最新的消息数据
ressss = requests.get(url=url, headers=headers).json()
mes_get = ressss["data"]["messages"]
for mes_get_append in mes_get:
message_append.append(mes_get_append)
#print(message_append)
def res_get_error():#获取最新的消息数据
ressss = requests.get(url=url, headers=headers).json()
mes_get = ressss["data"]["messages"]
for mes_get_append in mes_get:
message_append.append(mes_get_append)
#print(message_append)
通过这几行代码获取我们的群历史消息,并通过键值取值获取主内容,再通过for...in...循环获取主内容里面的消息,并添加到空列表里。
下面是运行结果。
以下是运行代码,仅运行时使用,不要加到源代码中。
#import time
import requests
url = "http://127.0.0.1:5700/get_group_msg_history?group_id=736038975"
headers = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3947.100 Safari/537.36"}
res = requests.get(url=url,headers=headers).json()
#print(res)#检查
#message_id_collect = [] #放入获取的消息ID以便进行判断是否有人说话
#message_id_collect_again = [] #放入获取的消息ID以便进行判断是否有人说话
message_append = [] #添加获取的内容
def res_get_error():#获取最新的消息数据
res_get = requests.get(url=url, headers=headers).json()
mes_get = res_get["data"]["messages"]
for mes_get_append in mes_get:
message_append.append(mes_get_append)
res_get_error()
print(message_append)
与网页中拿取的信息一样,只不过加在了列表里。
三.
def res_separate_mes():
mes_get_append = message_append[-1] #最新的消息记录
mes_content = mes_get_append["message"] #拿取消息内容
mes_content_sender = mes_get_append["sender"]["nickname"] #拿取消息发送者的名字
print("获取-----:" + mes_content_sender + ':' + mes_content) # 检查是否正常
API_get_answer(mes_content)
因为我们已经把想要获取的消息放在列表里,所以我们直接就在列表里拿取即可,我们想要的最近的消息是在最后一个,我们可以用切片的方法拿取。
我们就先用 message_append[-1] 拿取最后一个的信息,怎么样算一个呢?
这个样算一个,因为我们的网页中也是这样的信息,这就算是一个,所以我们切片也是拿的最后一个。
下面是获取消息的网页结构。
所以我们就用键值取值的方法进行获取发送的内容与发送消息的人,因为发送的消息需要再次发送到API中调用,我们就需要使用 API_get_answer(mes_content) 进行调用下面的函数,下面类似的函数调用不再说明。
我们运行看一下,下面是运行代码,及运行结果。
import requests
url = "http://127.0.0.1:5700/get_group_msg_history?group_id=736038975"
headers = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3947.100 Safari/537.36"}
res = requests.get(url=url,headers=headers).json()
#print(res)#检查
#message_id_collect = [] #放入获取的消息ID以便进行判断是否有人说话
#message_id_collect_again = [] #放入获取的消息ID以便进行判断是否有人说话
message_append = [] #添加获取的内容
def res_get_error():#获取最新的消息数据
res_get = requests.get(url=url, headers=headers).json()
mes_get = res_get["data"]["messages"]
for mes_get_append in mes_get:
message_append.append(mes_get_append)
res_separate_mes()
def res_separate_mes():
mes_get_append = message_append[-1] #最新的消息记录
mes_content = mes_get_append["message"] #拿取消息内容
mes_content_sender = mes_get_append["sender"]["nickname"] #拿取消息发送者的名字
print("获取-----:" + mes_content_sender + ':' + mes_content) # 检查是否正常
if __name__ == '__main__':
res_get_error()
四.
def API_get_answer(mes_content):#聊天API的调用,并获取回答
urls="http://api.qingyunke.com/api.php?key=free&appid=0&msg={}".format(mes_content)
answer_get = requests.get(url=urls,headers=headers).json()
answer_content = answer_get["content"]#获取API回答的内容
print("回答:" + answer_content)#检察是否可以正常运行
answer_post(answer_content)
def API_get_answer(mes_content):#聊天API的调用,并获取回答
urls="http://api.qingyunke.com/api.php?key=free&appid=0&msg={}".format(mes_content)
answer_get = requests.get(url=urls,headers=headers).json()
answer_content = answer_get["content"]#获取API回答的内容
print("回答:" + answer_content)#检察是否可以正常运行
answer_post(answer_content)
这里我们用的是青客云的API,暂时也找不到多么好的API了,所以就用这个了。
这就是url的来源,所以我们调用的时候只需要改一下后面的你好就行了。
我们就使用 .format() 来改就行了,改的地方就是url中放大括号的地方了, mes_content 就是参数,也就是我们获取的消息的内容。因为它的请求也是url,所以我们可以到网页中打开看一下到底什么样子。
这就看见了回答的消息内容,这样一看,也挺简单的,就两个东西,result和content,看来这个content就是我们要的东西了,我们就通过键值取值获取它即可,不过我们只获取消息可不行,还得让他把消息发送过去才行,所以我们就在写一个函数,顺便在这个地方的最后调用一下这个函数。
我们运行看一下,下面是运行代码,及运行结果。
import requests
url = "http://127.0.0.1:5700/get_group_msg_history?group_id=736038975"
headers = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3947.100 Safari/537.36"}
res = requests.get(url=url,headers=headers).json()
#print(res)#检查
#message_id_collect = [] #放入获取的消息ID以便进行判断是否有人说话
#message_id_collect_again = [] #放入获取的消息ID以便进行判断是否有人说话
message_append = [] #添加获取的内容
def res_get_error():#获取最新的消息数据
res_get = requests.get(url=url, headers=headers).json()
mes_get = res_get["data"]["messages"]
for mes_get_append in mes_get:
message_append.append(mes_get_append)
res_separate_mes()
def res_separate_mes():
mes_get_append = message_append[-1] #最新的消息记录
mes_content = mes_get_append["message"] #拿取消息内容
mes_content_sender = mes_get_append["sender"]["nickname"] #拿取消息发送者的名字
print("获取-----:" + mes_content_sender + ':' + mes_content) # 检查是否正常
API_get_answer(mes_content)
def API_get_answer(mes_content):#聊天API的调用,并获取回答
urls="http://api.qingyunke.com/api.php?key=free&appid=0&msg={}".format(mes_content)
answer_get = requests.get(url=urls,headers=headers).json()
answer_content = answer_get["content"]#获取API回答的内容
print("回答:" + answer_content)#检察是否可以正常运行
if __name__ == '__main__':
res_get_error()
五.
def answer_post(answer_content):
group_id = res_mes_post["group_id"]#群号
msg = answer_content
urls = "http://127.0.0.1:5700/send_group_msg?group_id=" + group_id + "&message=" + msg
#print(urls)
answer_post_use = requests.post(url=urls,headers=headers).json() #发送消息
#print(answer_post_use)
print("已回答")
def answer_post(answer_content):
group_id = res_mes_post["group_id"]#群号
msg = answer_content
urls = "http://127.0.0.1:5700/send_group_msg?group_id=" + group_id + "&message=" + msg
#print(urls)
answer_post_use = requests.post(url=urls,headers=headers).json() #发送消息
#print(answer_post_use)
print("已回答")
因为刚刚只是获取的回答的内容,我们还需要把这些回答的消息回答到QQ上。
所以我们就使用 post 来向网页提交,通过这样一个爬虫来实现对于QQ上的消息回复,我们结合go-cqhttp的指令来组成url。
当然这里的 res_mes_post 也是为了剩下在别处换群号的时间,当然这些以后都会优化的。这里也说一下大括号里面也是可以键值取值的。
下面是我们的消息内容,也就是回答消息之前。
这是回答之后的。
下面是运行代码及运行结果。
import requests
message_id_collect = [] #放入获取的消息ID以便进行判断是否有人说话
message_id_collect_again = [] #放入获取的消息ID以便进行判断是否有人说话
message_append = [] #添加获取的内容
def res_get_error():#获取最新的消息数据
res_get = requests.get(url=url, headers=headers).json()
mes_get = res_get["data"]["messages"]
for mes_get_append in mes_get:
message_append.append(mes_get_append)
res_separate_mes()
def res_separate_mes():
mes_get_append = message_append[-1] #最新的消息记录
mes_content = mes_get_append["message"] #拿取消息内容
mes_content_sender = mes_get_append["sender"]["nickname"] #拿取消息发送者的名字
print("获取-----:" + mes_content_sender + ':' + mes_content) # 检查是否正常
API_get_answer(mes_content)
def API_get_answer(mes_content):#聊天API的调用,并获取回答
urls="http://api.qingyunke.com/api.php?key=free&appid=0&msg={}".format(mes_content)
answer_get = requests.get(url=urls,headers=headers).json()
answer_content = answer_get["content"]#获取API回答的内容
print("回答:" + answer_content)#检察是否可以正常运行
answer_post(answer_content)
def answer_post(answer_content):
group_id = res_mes_post["group_id"]#群号
msg = answer_content
urls = "http://127.0.0.1:5700/send_group_msg?group_id=" + group_id + "&message=" + msg
#print(urls)
answer_post_use = requests.post(url=urls,headers=headers).json() #发送消息
#print(answer_post_use)
print("已回答")
if __name__ == '__main__':
res_mes_post = {"instruction_message": "message_seq", "group_id": "736038975", "message_id": ""}
message_group_id = res_mes_post["group_id"]
url = "http://127.0.0.1:5700/get_group_msg_history?group_id=" + message_group_id
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3947.100 Safari/537.36"}
res = requests.get(url=url, headers=headers).json()
res_get_error()
六.
def answer_post_again():#发消息前的消息ID
res_get_first = message_append[-1]
res_mes_id = res_get_first["message_id"]
message_id_collect.append(res_mes_id)
nums = message_id_collect[-1]
#print(nums)
return nums #返回nums以便进行对比
def answer_post_again_check():#发消息后的消息ID
res_get_again = message_append[-2]
res_mes_id_again = res_get_again["message_id"]
message_id_collect_again.append(res_mes_id_again)
numss = message_id_collect_again[-1]
#print(numss)
return numss # 返回nums以便进行对比
因为只使用了requests,所以就导致判断是否有新消息成了麻烦的问题,加上我们的持续的回复是会用到 while True 的,下面会有(这个代码的意思是一直循环除非用 break 打破这个循环),所以总不能让机器人回完消息后一直回复自己的消息,我们就需要判断一下是否有新消息,也尽量避免机器人回复自己的消息,造成一个恶性循环。
因为我们一开始就把内容爬取并添加到了 message_append 这个列表里,所以我们就从这里面获取消息的ID,因为刚才我们只是把消息获取并发送,也没有再次爬取消息添加到列表里,所以列表的最后一个就是你机器人回复的消息,所以我们就拿取这个消息的ID,并通过 return 这个语法,把获取的消息ID返回。这是第一个函数。
第二个函数和第一个函数差不多,这个函数只不过拿取的消息是倒数第二个,而不是倒数第一个,因为机器人发消息后刚才回复的消息就会由倒数第一个消息变成倒数第二个消息,所以我们要拿取第二个的消息的ID。如下图。
回答前,最新的消息倒一。
回答后,刚才的消息为倒二。
七.
if __name__ == '__main__':
res_mes_post = {"instruction_message": "message_seq", "group_id": "736038975", "message_id": ""}
message_group_id = res_mes_post["group_id"]
url = "http://127.0.0.1:5700/get_group_msg_history?group_id=" + message_group_id
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3947.100 Safari/537.36"}
res = requests.get(url=url, headers=headers).json()
# print(res)#检查
while True:
res_get_error()
answer_post_again()
n = answer_post_again()
#print(message_id_collect)
res_separate_mes()
res_get_error()
time.sleep(1)
while True:
res_get_error()
answer_post_again_check()
m = answer_post_again_check()
time.sleep(1)
#print(message_id_collect_again)
if n == m:
print("无消息")
time.sleep(3)
pass
else:
print("有消息")
break
首先,调用 res_get_error() 这个函数,通过这个函数进行群历史消息,再通过调用answer_post_again() 获取发消息前的要回复的消息的ID,再调用 res_separate_mes() 这个函数进行获取消息,获取机器人的回答以及回复消息,再调用 res_get_error() 这个函数把发消息后的内容爬取,之后我们就需要进行判断是否有人发送新的消息,刚才上面也说了,机器人回复后,刚刚被回复的消息就成了倒二,这样第一个函数获取的ID就等于第二个函数获取的ID,我们通过一直循环获取倒二的消息ID,如果有人发消息,机器人回复的消息就变成了倒二,他的消息ID就不等于刚才回复的消息的ID了,就证明有新的消息,这个时候,我们就用 break 打破这个循环,让他再次执行回复。
总结
单纯一个requests是挺麻烦的,尽量也去学习一下socket库,这样便能省下不少的代码,也能节约时间,相比之前,只一个requests,不仅耗时,而且因为爬虫的原因也会有许多的麻烦。



