Skip to content

credentials: JwtAuthorizationCredentials = Security(access_security) ): It also allows refresh_ Security access #14

@Leewinner1

Description

@Leewinner1

def get_current_user(
credentials: JwtAuthorizationCredentials = Security(access_security)
)

if not credentials:
    raise HTTPException(status_code=401, detail='error')


return credentials.subject

jwt.md

# 使用python-jose来生成jwt,验证jwt,获取当前用户的方法

# 生成token
# def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
import os
from datetime import datetime, timedelta

from fastapi_jwt import JwtRefreshBearer, JwtAuthorizationCredentials, JwtAccessBearer

from fastapi import Security, HTTPException
from dotenv import load_dotenv

load_dotenv()
secret_key = os.getenv("SECRET_KEY", 'leees')

access_token_expires = int(os.getenv("JWT_EXPIRE_TIME", 7))
refresh_token_expires = int(os.getenv("JWT_REFRESH_TIME", 30))


access_security = JwtAccessBearer(
    secret_key=secret_key,
    auto_error=True,
    # change access token validation timedelta
    access_expires_delta=timedelta(days=access_token_expires)
)


# Read refresh token from bearer header only
refresh_security = JwtRefreshBearer(
    secret_key=secret_key,
    auto_error=True,  # automatically raise HTTPException: HTTP_401_UNAUTHORIZED
    refresh_expires_delta=timedelta(days=refresh_token_expires)
)


def create_token(data: dict):
    return access_security.create_access_token(subject=data)


def create_refresh_token(data: dict):
    return refresh_security.create_refresh_token(subject=data)

# 创建同时返回access_token和refresh_token的方法


def create_tokens_refresh(data: dict):
    access_token = access_security.create_access_token(subject=data)
    refresh_token = refresh_security.create_refresh_token(subject=data)
    return {"access_token": access_token, "refresh_token": refresh_token}

# 刷新token


def refresh(
        credentials: JwtAuthorizationCredentials = Security(refresh_security)
):
    # Update access/refresh tokens pair
    # We can customize expires_delta when creating
    access_token = access_security.create_access_token(
        subject=credentials.subject)
    refresh_token = refresh_security.create_refresh_token(
        subject=credentials.subject, expires_delta=timedelta(days=2))

    return {"access_token": access_token, "refresh_token": refresh_token}


def get_current_user(
        credentials: JwtAuthorizationCredentials = Security(access_security)
):

    # auto_error=False, fo we should check manually

    if not credentials:
        raise HTTPException(status_code=401, detail='error')

    # now we can access Credentials object
    return credentials.subject

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions