JWT(JSON-Web-Token),是一种开发的行业标准RFC 7519 ,用于安全的表示双方之间的声明。目前,jwt广泛应用在系统的用户认证方面,特别是现在前后端分离项目。

1. jwt认证流程

在项目开发中,一般会按照上图所示的过程进行认证,即:用户登录成功之后,服务端给用户浏览器返回一个token,以后用户浏览器要携带token再去向服务端发送请求,服务端校验token的合法性,合法则给用户看数据,否则返回一些错误信息。

  • 传统token方式和jwt在认证方面有什么差异?

    • 传统token

      用户登录成功后,服务端生成一个随机token给用户,并且在服务端(数据库或缓存)中保存一份token,以后用户再来访问时需携带token,服务端接收到token之后,去数据库或缓存中进行校验token的是否超时、是否合法。

    • jwt

      用户登录成功后,服务端通过jwt生成一个随机token给用户(服务端无需保留token),以后用户再来访问时需携带token,服务端接收到token之后,通过jwt对token进行校验是否超时、是否合法。

2. 创建token

2.1 原理

jwt的生成token格式如下,即:由 . 连接的三段字符串组成。

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

生成规则如下:

  • 第一段HEADER部分,固定包含算法和token类型,对此json进行base64url加密,这就是token的第一段。

{
  "alg": "HS256",
  "typ": "JWT"
}
  • 第二段PAYLOAD部分,包含一些数据,对此json进行base64url加密,这就是token的第二段。

{
  "sub": "1234567890",
  "name": "John Doe",
  "iat": 1516239022
  ...
}
  • 第三段SIGNATURE部分,把前两段的base密文通过.拼接起来,然后对其进行HS256加密,再然后对hs256密文进行base64url加密,最终得到token的第三段。

base64url(
    HMACSHA256(
      base64UrlEncode(header) + "." + base64UrlEncode(payload),
      your-256-bit-secret (秘钥加盐)
    )
)

最后将三段字符串通过 .拼接起来就生成了jwt的token。

注意:base64url加密是先做base64加密,然后再将 - 替代 +_ 替代 /

2.2 代码实现

基于Python的pyjwt模块创建jwt的token。

  • 安装

    pip3 install pyjwt

  • 构建token

import jwt
import datetime

# 随机字符串作为算法加密盐
SALT = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

def create_token():
    


    # 构建headers
    headers = {
        'typ': 'jwt',
        'alg': 'HS256'
    }

    # 构建载荷
    payload = {
        'user_id': 1, # 自定义用户ID
        'username': 'xinchen.luan', # 自定义用户名
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=60) # token有效期
    }

    # 构建token
    token = jwt.encode(payload=payload, key=SALT, algorithm='HS256', headers=headers)

    return token

3. 校验token

一般在认证成功后,把jwt生成的token返回给用户,以后用户再次访问时候需要携带token,此时jwt需要对token进行超时合法性校验。

获取token之后,会按照以下步骤进行校验:

  • 将token分割成 header_segmentpayload_segmentcrypto_segment 三部分

  • 对第一部分header_segment进行base64url解密,得到header

  • 对第二部分payload_segment进行base64url解密,得到payload

  • 对第三部分crypto_segment进行base64url解密,得到signature

  • 对第三部分signature部分数据进行合法性校验

    • 拼接前两段密文,即:signing_input

    • 从第一段明文中获取加密算法,默认:HS256

    • 使用 算法+盐 对signing_input 进行加密,将得到的结果和signature密文进行比较。

示例代码:

import jwt
import datetime
from jwt import exceptions

SALT = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 

def get_payload(token):
    """
    根据token获取payload
    :param token: jwt
    :return:
    """
    try:
        # 从token中获取payload【不校验合法性】
        # unverified_payload = jwt.decode(token, None, False)
        # print(unverified_payload)
 
        # 从token中获取payload【校验合法性】
        verified_payload = jwt.decode(token, SALT, True)
        return verified_payload
    # 捕获有效期失效错误
    except exceptions.ExpiredSignatureError:
        print('token已失效')
    # 解密校验失败
    except jwt.DecodeError:
        print('token认证失败')
    # 合法性校验失败
    except jwt.InvalidTokenError:
        print('非法token')
         
if __name__ == '__main__':
    token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NzM1NTU1NzksInVzZXJuYW1lIjoid3VwZWlxaSIsInVzZXJfaWQiOjF9.xj-7qSts6Yg5Ui55-aUOHJS4KSaeLq5weXMui2IIEJU"
    payload = get_payload(token)

4. DRF+JWT实践案例

在用户登录成功之后,生成token并返回,用户再次来访问时需携带token。

此示例在drf的认证组件中对token进行校验,支持用户通过Authorization请求头传递token。

  • 首先,编写一个工具类给login视图用来创建token

from django.conf import settings
import jwt
import datetime


def create_token(payload, timeout=1):
    """
    创建JsonWebToken
    :param payload: jwt明文载荷,一般传入id、username字典,切勿传入涉密信息
    :param timeout: jwt有效期,单位分钟
    :return: 完整Token
    """
    salt = settings.SECRET_KEY
    headers = {
        'typ': 'jwt',
        'alg': 'HS256'
    }
    payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)
    token = jwt.encode(payload=payload, key=salt, algorithm='HS256', headers=headers)

    return token
  • 然后编写JWT请求头认证组件

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
import jwt
from jwt import exceptions
from django.conf import settings
from django.contrib.auth import get_user_model
from api.extensions import return_code

User = get_user_model()


class JwtHeadersAuthentication(BaseAuthentication):

    def authenticate(self, request):
        """
        自定义JWT请求头认证方法
        :param request:
        :return: (request.user,request.auth)
        """
        # 获取请求头认证信息
        http_authorization = request.META.get('HTTP_AUTHORIZATION')
        salt = settings.SECRET_KEY

        # 请求头不含认证信息,抛出请求异常
        if not http_authorization:
            raise AuthenticationFailed({'code': return_code.INVALID_REQUEST, 'error': '非法请求'})

        # 拆分Bearer和Token主体
        method, token = http_authorization.split(' ')
        try:
            # Token解密
            payload = jwt.decode(jwt=token, key=salt, algorithms="HS256")
        except exceptions.ExpiredSignatureError:
            # Token过期
            raise AuthenticationFailed({'code': return_code.EXPIRED_TOKEN, 'error': "Token已失效"})
        except jwt.DecodeError:
            # Token解密失败
            raise AuthenticationFailed({'code': return_code.DECODE_FAILED_TOKEN, 'error': "Token不存在"})
        except jwt.InvalidTokenError:
            # JWT格式错误
            raise AuthenticationFailed({'code': return_code.INVALID_TOKEN, 'error': "非法Token"})

        # 根据payload用户信息获取User对象
        user_obj = User.objects.filter(id=payload['id']).first()

        return (user_obj, token)

    def authenticate_header(self, request):
        return 'Bearer Token'
  • 在login视图中调用jwt工具类

from api.utils.jwt_auth import create_token

class LoginViewSet(CreateModelMixin, viewsets.GenericViewSet):
    queryset = User.objects.all()
    authentication_classes = []
    serializer_class = LoginSerializer

    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        # serializer.is_valid(raise_exception=True)
        if not serializer.is_valid():
            return Response({'code': return_code.USR_NOT_FOUND, 'error': serializer.errors})

        username, password = serializer.validated_data.values()

        user_obj = auth.authenticate(request, username=username, password=password)

        if not user_obj:
            return Response({'code': return_code.AUTH_FAILED, 'error': '认证失败,请检查账号密码是否正确。'})
        auth.login(request, user_obj)
        # 创建token,传入userid和username作为载荷,设置token有效期1天
        token = create_token(payload={'id': user_obj.id, 'name': user_obj.username}, timeout=60 * 24)
        return Response(
            {'code': return_code.SUCCESS, 'results': {'id': user_obj.id, 'name': user_obj.display_name, 'token': token}})
  • 在DRF settings中应用JWT认证组件

REST_FRAMEWORK = {
    .....
    # 认证类
    'DEFAULT_AUTHENTICATION_CLASSES': ('api.extensions.authentication.JwtHeadersAuthentication',)
}
  • 查看效果

    • 未携带token时,拒绝请求

    • 登录获取token

    • 携带token,成功获取数据