HTTP是一种无状态的协议,客户端向服务端发起请求,在通过用户名和密码进行用户认证后,HTTP协议不会记录认证状态,所以用户在下次请求时还是需要再次认证。为了能够保持http连接状态,需要在用户首次认证成功后,在服务端存储用户的登录信息,并在响应时传递给浏览器,由浏览器保存在cookies中。下次请求时,携带者cookies信息,服务器通过cookies,获取session信息,达到保持连接状态的目的。
9.1、COOKIES 9.1.1、cookies特点COOKIES是储存在用户本地浏览器上的数据,具有以下特点:
- 以键值对的方式存储
- 由服务端生成,客户端存储,在Django中,由HttpResponse 对象完成设置。
- cookies中的数据,每次请求时都会带到服务端。
- cookies大小不超过4k,由于每次请求都会携带,数据过大影响请求性能
- cookies不安全,一般用于存放不敏感的数据。
-
添加cookies,语法如下:
HttpResponse.set_cookie(key, value='', max_age=None, expires=None) render(request, 'xxx').set_cookie(key, value='', max_age=None, expires=None) # max_age:存活时间,单位为s # expires:过期时间,字符串或datetime.datetime对象 # 如果max_age和expires都未设置,关闭浏览器失效
-
获取cookies,语法如下:
request.COOKIES.get('key', default=None) request.COOKIES['key'] # key不存在报错 -
删除cookies,语法如下:
HttpResponse.delete_cookie(key) render(request, 'xxx').delete_cookie(key) # 存在则删除,不存在不报错
在服务端设置cookies:
视图函数:
from django.http import HttpResponse
def test_cookie(request):
resp = HttpResponse("abc")
resp.set_cookie("username",'xxx',3600)
return resp
客户端通过路由调用视图函数后,查看浏览器,cookies成功保存:
每次请求时,浏览器均会带上cookies信息:
服务端获取cookies:
def test_cookie(request):
resp = HttpResponse("OK!!!")
cookie1 = request.COOKIES.get('username')
cookie2 = request.COOKIES.get('username1')
cookie3 = request.COOKIES['username']
print(cookie1, cookie2, cookie3)
return resp
# xxx None xxx
删除cookies:
def test_cookie(request):
resp = HttpResponse("OK!!!")
resp.delete_cookie("username")
return resp
调用视图函数后,查看浏览器,cookies已被清除。
9.2、sessionsession表示会话控制,存在服务器上,主要用于http状态保持。在cookies中保存sessionid,用以保持状态。
session对象是一个SessionStore类型的对象,可以用类字典的方式操作。
9.2.1、session设置(Django)-
保存session
request.session['KEY'] = VALUE
-
获取session
VALUE = request.session['KEY'] VALUE = request.session.get('KEY', default=None) -
删除session
del request.session['KEY']
在本例中,登录后通过session来保持连接状态,发送请求。
第一步:创建一个Django项目,并启动2个应用:user应用负责管理用户登录,interface应用负责发送服务请求。
第二步:设置session过期时间:Django中settings.py设置如下:
SESSION_COOKIE_AGE = 30*60 # 设置过期时间30分钟,默认为两周 SESSION_SAVE_EVERY_REQUEST = True # 每次请求后重新保存 SESSION_EXPIRE_AT_BROWSER_CLOSE = True # 设置关闭浏览器时失效
第三步:创建路由和视图函数对应关系,采用分布式路由:
主路由配置:
from django.contrib import admin
from django.urls import path
from django.conf.urls import include, url
urlpatterns = [
path('admin/', admin.site.urls),
url('^user', include('user.urls')),
url('^interface', include('interface.urls')),
]
user.urls路由配置:
from django.conf.urls import url
from . import views
urlpatterns = [
url(r'^/login$', views.login_view),
url(r'^/logout$', views.logout_view),
]
interface.urls路由配置:
from django.conf.urls import url
from . import views
urlpatterns = [
url(r'^/test$', views.interface_view),
url(r'^/get_time$', views.get_time_view),
]
第四步:编写user视图函数views.py和模板,视图函数包含登录和登出功能:
from django.shortcuts import render
from django.http import HttpResponse, HttpResponseRedirect
# Create your views here.
# 用户名密码表,实际一般存于数据库中。
USERS = [
{"username": "zhangsan", "password": "12345"},
{"username": "lisi", "password": "67890"},
]
def login_view(request):
if request.method == 'GET':
# 先检查是否登录,如果已登录,直接跳转到功能测试界面
# 检查session
if 'username' in request.session:
# 若已登录,重定向到接口测试界面
return HttpResponseRedirect('/interface/test')
# session中无用户信息,则返回登录界面
return render(request, 'login.html')
# 登录逻辑
if request.method == "POST":
username = request.POST.get('username')
password = request.POST.get('password')
user_info = {"username": username, "password": password}
if user_info not in USERS:
return HttpResponse("用户不存在或账号密码错误!")
# 用户校验通过,设置session
request.session['username'] = username
# 登录成功后,跳转到接口测试界面
resp = HttpResponseRedirect('/interface/test')
return resp
def logout_view(request):
# 登出
# 删除session
if 'username' in request.session:
del request.session['username']
resp = HttpResponseRedirect('/user/login')
# 删除cookies
resp.delete_cookie("sessionid")
return resp
模板:login.html
登录
第五步:编写interface视图函数和模板。
通过装饰器检查登录状态:
from django.http import HttpResponse, HttpResponseRedirect
# 检查是否登录
def check_login(fn):
def wrap(request, *args, **kwargs):
# 检查用户是否登录
# 检查session
if 'username' not in request.session:
return HttpResponseRedirect('/user/login')
return fn(request, *args, **kwargs)
return wrap
视图函数:
from django.shortcuts import render
from common import check_login
from django.http import JsonResponse
from datetime import datetime, timedelta
# Create your views here.
@check_login.check_login
def interface_view(request):
# 主页
return render(request, 'interface_test.html')
# 获取当前时间的函数,需要登录才可使用。
@check_login.check_login
def get_time_view(request):
country = 'china'
gmt_time = datetime.now()
if request.POST.get("country") == '2':
country = 'United Kingdom'
gmt_time = datetime.now() + timedelta(hours=-8)
elif request.POST.get("country") == '3':
country = 'Japan'
gmt_time = datetime.now() + timedelta(hours=1)
elif request.POST.get("country") == '4':
country = 'United States'
gmt_time = datetime.now() + timedelta(hours=-13)
resp = {"country":country,"time":gmt_time.strftime("%Y-%m-%d %H:%M:%S")}
return JsonResponse(resp)
模板:
Title
第七步:启动服务,前端登录界面如下:
登录后,浏览器cookies中自动保存sessionid:
此时请求接口,可以正常获取响应:
退出登录后,cookies被清除,此时请求接口,直接跳转到登录界面。
二、token令牌传统的session认证存在如下问题:
- 每个用户的登录信息都会保存到服务器的session中,用户增多,服务端开销变大。
- 非BS模式不适用,session依赖于cookies,移动端等经常不支持cookies。
- 前后端分离的系统中不适用
- cookie无法跨域。
为了解决如上问题,可以采用token认证。
2.1、base64base64是字节码编码方式的一种(非加密)。编码时以将字符串拆为3个一组,每组编码后的长度为4。所以生成的编码长度是4的倍数。python实现示例如下:
import base64 s = b'good' # 注意:是对字节串编码,编码后的结果也是字节串 print(base64.b64encode(s)) # b'Z29vZA==' s2 = b'Z29vZA==' print(base64.b64decode(s2)) # b'good'
base64编码中包含两个特殊符号:+和/。对url进行base64编码,生成的编码中如果带了+和/会破坏url结构。(地址解析时,+会转换为空格,/用作目录分隔)。所以,如果是对url地址编码,需要用到如下两个函数:
base64.urlsafe_b64encode() base64.urlsafe_b64decode()
作用同b64encode/b64decode,只是会将编码后的字符串中的+替换为-,/替换为_。
2.2、散列算法(对称加密)散列算法具有如下特点:
- 任意输入,定长输出(如:md5长度为32,sha256长度为64;)。
- 单向性,不可逆。
使用示例如下:
import hashlib str1 = b'good' s1 = hashlib.md5() # 创建md5对象 s1.update(str1) # 添加待加密的字符串 print(s1.hexdigest()) # 生产16进制结果 s2 = hashlib.sha256() s2.update(str1) print(s2.hexdigest()) # 755f85c2723bb39381c7379a604160d8 # 770e607624d689265ca6c44884d0807d9b054d23c473c106c72be9de08b7376c2.3、hmac
hmac-sha256使用散列算法,同时结合一个加密密钥,用以保证数据完整性,通过可以用作身份验证,示例如下:
import hmac key1 = b"12345" str1 = b'good' h1 = hmac.new(key1, str1, digestmod='SHA256') # 第一个参数为key,第二个为待加密的字符串,第三个为hmac算法 print(h1.hexdigest()) # 236a72d97af4629e22399b1ecd5f29e2e38a7d6fdb9ff530477e6be9b470c8a32.4、JWT
JWT全称JSON Web Token。JWT是一个字符串,将用户信息保存到一个json串中,进行编码得到一个JWT token。且携带签名信息,可以对信息校验防止信息被篡改。JWT由以下3部分组成:
2.3.1、HeaderHeader是一个描述JWT元数据的JSON对象。格式如下:
{'alg':'HS256', 'typ':'JWT'}
alg表示签名使用的算法,默认为HMAC SHA256(HS256);typ表示token类别,JWT令牌必须为大写的JWT。
将JSON对象转为字符串,并用base64编码即可得到Header:
import base64
header = b'{"alg":"HS256","typ":"JWT"}'
h_b64 = base64.urlsafe_b64encode(header).replace(b'=',b'')
print(h_b64)
# b'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9'
2.3.2、Payload
Payload是有效载荷部分,格式为字典,分为公有声明和私有声明。
-
公有声明,为可选项,常用的公有声明如下:
exp 此token的到期时间的时间戳 iss 此token的签发者 aud 此token的用户 iat 此token的创建时间的时间戳 sub 此token的主题
-
私有声明。用户根据业务需求,添加自定义的key,如:
{"username":"user1"}
公有声明和私有声明同在一个字典中,转成json串并用base64加密,如:
import base64
payload = b'{"sub":"12345","username":"user1"}'
p_b64 = base64.urlsafe_b64encode(payload).replace(b'=',b'')
print(p_b64)
# b'eyJzdWIiOiIxMjM0NSIsInVzZXJuYW1lIjoidXNlcjEifQ'
2.3.3、Signature
Signature表示对header和payload数据签名。规则如下:
- 指定一个密钥secret,该密钥仅保存在服务器中,不向用户公开;
- 根据header中的alg确定算法,用自定义的key,对base64后的header+‘.’+base64后的payload进行hmac计算,结算结果再进行base64编码
示例如下:
import base64, hmac
header = b'{"alg":"HS256","typ":"JWT"}'
h_b64 = base64.urlsafe_b64encode(header).replace(b'=', b'')
payload = b'{"sub":"12345","username":"user1"}'
p_b64 = base64.urlsafe_b64encode(payload).replace(b'=', b'')
k=b'12345'
sinature = hmac.new(k, h_b64 + b'.' + p_b64, 'SHA256').digest()
s_b64 = base64.urlsafe_b64encode(sinature).replace(b'=', b'')
print(s_b64)
# b'bgN2jph3GCSThXOw6LbqyoEJUdlP3hzt1cTZPHe9fco'
最终结果如下:
h_b64 + b'.' + p_b64 + b'.' + s_b64 # b'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NSIsInVzZXJuYW1lIjoidXNlcjEifQ.bgN2jph3GCSThXOw6LbqyoEJUdlP3hzt1cTZPHe9fco'
查询JWT官网生成结果,两者一致:
2.5、pyjwt模块模块安装:pip install pyjwt
模块使用:
import jwt
res = jwt.encode({"sub": "12345", "username": "user1"},'12345','HS256')
# 第一个参数为payload,格式为字典
# 第二个参数为key,格式为str
# 第三个参数为algorithm,格式为str
# 返回值为token,str类型
print(res)
# eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NSIsInVzZXJuYW1lIjoidXNlcjEifQ.hsVFKCCeopFXTn4uDmKwRGzmwIvrXwZzHz0TR0UcuzU
token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxMjM0NSIsInVzZXJuYW1lIjoidXNlcjEifQ.hsVFKCCeopFXTn4uDmKwRGzmwIvrXwZzHz0TR0UcuzU'
rep = jwt.decode(token, '12345', 'HS256')
# 返回值为payload,dict类型
print(rep)
# {'sub': '12345', 'username': 'user1'}
若payload中带了exp,token过期会抛出异常:ExpiredSignatureError。如:
import jwt,time
exp_time = time.time() + 10 # 10s后过期
token = jwt.encode({"sub": "12345", "username": "user1", "exp":exp_time},'12345','HS256')
time.sleep(5)
rep1 = jwt.decode(token, '12345', 'HS256') # 此时token未过期
print(rep1)
# {'sub': '12345', 'username': 'user1', 'exp': 1656930558.6011097}
time.sleep(10)
rep2 = jwt.decode(token, '12345', 'HS256') # 此时token已过期
print(rep2)
# jwt.exceptions.ExpiredSignatureError: Signature has expired
2.6、token示例
前后端分离中token的应用示例:
定义一个接口,发送接口请求之前需要先检查登录,登录状态由token保持:
一、前端通过flask部署,端口5000,两个URL:
http://127.0.0.1:5000/login
http://127.0.0.1:5000/interface/query_movies
from flask import Flask, send_file
app = Flask(__name__)
@app.route('/login')
def login():
#登录
return send_file('templates/login.html')
@app.route('/interface/query_movies')
def interface():
return send_file('templates/interface.html')
if __name__ == '__main__':
app.run(debug=True)
templates/login.html:
登陆 Login:
templates/interface.html:
Title
电影编号:
二、后端为Django:
user_token模块:
urls.py:
路由为:http:127.0.0.1:8000/token/login_token
from django.conf.urls import url
from . import views
urlpatterns = [
url(r'^/login_token$', views.login_token_view),
]
views函数:
import json
import time
from django.shortcuts import render
from django.http import JsonResponse
# Create your views here.
# 保存的用户名,实际使用时一般存在数据库中,密码hash加密
USERS = [
{"username": "wangwu", "password": "12345"},
{"username": "zhaoliu", "password": "67890"},
]
def login_token_view(request):
# 获取登录请求数据
data_str = request.body
data_obj = json.loads(data_str)
# 后端校验
username = data_obj.get('username')
password = data_obj.get('password')
# 用户名为空
if not username:
resp = {'code': 10001, 'error': 'username cannot be empty!'}
return JsonResponse(resp)
# 密码为空
if not password:
resp = {'code': 10002, 'error': 'password cannot be empty!'}
return JsonResponse(resp)
# 用户名或密码错误
if data_obj not in USERS:
resp = {'code': 10003, 'error': 'username or password is incorrect!'}
return JsonResponse(resp)
# 校验通过,创建token并返回
token = create_token(username)
resp = {'code': 200, 'username': username, 'data': {'token': token}}
return JsonResponse(resp)
def create_token(username, expire=60*60):
import jwt
key = 'myJwtKey'
now = time.time()
paylaod = {'username': username, 'exp': int(now + expire)}
return jwt.encode(paylaod, key, algorithm='HS256')
interface:
路由为:http:127.0.0.1:8000/interface/query_movies
from django.conf.urls import url
from . import views
urlpatterns = [
url(r'^/query_movies$', views.query_movies_view),
]
视图:
import json
import jwt
from django.shortcuts import render
from common import check_login
from django.http import JsonResponse
movies = [
{
'movieId':1,
'movieName':'肖申克的救赎'
},
{
'movieId': 2,
'movieName': '霸王别姬'
},
{
'movieId': 3,
'movieName': '藏龙卧虎'
}
]
@check_login.check_token
def query_movies_view(request):
data_str = request.body
data_obj = json.loads(data_str)
movie_id = data_obj.get("movieId")
# 后端校验
# 未输入movieId
if not movie_id:
return JsonResponse({"code": 200, "data": movies})
# movieId不为整数
try:
movie_id = int(data_obj.get("movieId"))
except ValueError as e:
return JsonResponse({"code": 20001, "error": "format error!"})
# movieId不存在
if movie_id not in [item.get('movieId') for item in movies]:
return JsonResponse({"code": 20002, "error": "movie not exist"})
# movieId存在
print(request.META)
for i in movies:
if movie_id == i.get("movieId"):
return JsonResponse({"code": 200, "data": [i]})
检查token装饰器:
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
import jwt
KEY = 'myJwtKey'
def check_token(func):
def wrapper(request, *args, **kwargs):
// 请求头信息一般包含贼META对象中,header为Authorization,META中key为HTTP_AUTHORIZATION
token = request.META.get('HTTP_AUTHORIZATION')
if not token:
return JsonResponse({"code": 20003, "error": "please login"})
try:
result = jwt.decode(token, KEY, algorithms='HS256')
except jwt.exceptions.ExpiredSignatureError as e:
# token过期
return JsonResponse({"code": 20004, "error": "login expired"})
except Exception as e:
print(e)
return JsonResponse({"code": 20005, "error": str(e)})
username = result.get('username')
# 给request对象增加user属性
request.user = username
return func(request, *args, **kwargs)
return wrapper
示例:
登录接口:
# req
{"username": "wangwu", "password":"12345"}
# res
{
"code": 200,
"username": "wangwu",
"data": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6Indhbmd3dSIsImV4cCI6MTY1ODIyNTI0MH0.Yiw_HT3w4WbZ8SpS98PxjlN8eEmjdQey-6K9OTSPXi4"
}
}
# req
{"username": "wangwu", "password":"123456"}
# res
{
"code": 10003,
"error": "username or password is incorrect!"
}
查询接口
# req
{"movieId": 4}
# token错误时res
{
"code": 20005,
"error": "Signature verification failed"
}
# 无token时res
{
"code": 20003,
"error": "please login"
}
# token正确
{
"code": 20002,
"error": "movie not exist"
}



