- 这个只是说了Django后端的搭建,前端我就不讲了,
- 可以使用这个网站进行测试
- http://www.websocket-test.com/
- Django3.1
- Python3.7
- Redis 必须使用5.0 以上
-
Django环境搭建自己搭去
-
安装channels
pip install -U channels
-
在这里,我们使用redis做为channels的通道后端,以便支持更多的功能
pip install channels_redis
-
dwebsocket 安装这个可以用原生的写法
pip install dwebsocket
# setting.py
# 添加应用
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'Your-app',
'channels', # 添加的channels应用,固定写发
]
# 新增的配置,固定写法
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [('127.0.0.1', 6379)],
},
# 配置路由的路径
# "ROUTING": "exmchannels.routing.channel_routing",
},
}
# 下面是要根据自己的写路由的位置来确定的
ASGI_APPLICATION = 'apps.blog.routing.application'
起步
-
在你的应用中创建两个文件
- routing.py (相当Django的urls,当然你可以换成你想要的名字)
- consumers.py (=views.py)
-
路由配置
# apps/blog/routing.py from channels.routing import ProtocolTypeRouter, URLRouter from channels.auth import AuthMiddlewareStack from django.urls import path # ------ from apps.blog.consumers import ChatConsumer # ChatConsumer是你创建的视图 # 这个位置的application就是你在setting中配置的 application = ProtocolTypeRouter({ # Empty for now (http->django views is added by default) 'websocket': AuthMiddlewareStack( URLRouter([ path('mes', ChatConsumer.as_asgi()), # 这个配置就像路由配置,没有什么区别 ]) ), })
# apps/blog/consumers.py
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from channels.layers import get_channel_layer
from django.http import HttpResponse
class ChatConsumer(WebsocketConsumer):
# connect disconnect receive 是固定写法
def connect(self):
# self.scope 这个里面有很多参数,可以自己打印出来看看,然后找到自己想要的
self.room_name = self.scope['url_route']['kwargs'].get('room_name', 'room_name')
# room_group_name 这个是一个分组,最好你主动推送时是要用到这个名字的,所以这个名字命名一定要有点规则,需要注意的是,这个分组里面是可以放多个连接的,然后推送时,会推送到这个组里面所有用户
self.room_group_name = 'chat_%s' % self.room_name
# 固定写法,加入组,如果组已存在,则加入,不存在,则先创建后加入
async_to_sync(self.channel_layer.group_add)(
self.room_group_name,
self.channel_name
)
# 开启连接
self.accept()
def disconnect(self, close_code):
# 断开连接时调用
async_to_sync(self.channel_layer.group_discard)(
self.room_group_name,
self.channel_name
)
# Receive message from WebSocket
def receive(self, text_data):
# 接受客户端的信息,你处理的函数
text_data_json = json.loads(text_data)
message = text_data_json['message']
# Send message to room group
async_to_sync(self.channel_layer.group_send)(
self.room_group_name,
{
'type': 'chat_message',
'message': message
}
)
# Receive message from room group
def chat_message(self, event):
# message = event['message']
# Send message to WebSocket
self.send(text_data=json.dumps({
'message': '我是主动发送'
}))
# 这个是主动推送的关键,一定要写成一个函数,然后在调用
def push(username, event):
"""
:param username: 上面定义的组的名字
:param event: 你要返回的消息内容,可以是str,dict
:return:
"""
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
username,
{
"type": "chat_message", # 这个就是你上面 ChatConsumer 定义的 chat_message方法
"event": event # 是chat_message方法接受的参数
}
)
def mes_send(request):
# 主动推送def push(username, event):
push('chat_room_name', '我推送的信息')
return HttpResponse()
到这里就结束了
写在最后-
如果还想比较一下,就可以去看着这个
https://www.jb51.net/article/165580.htm
-
官网
https://channels.readthedocs.io/en/latest/index.html



