PassLib "컨텍스트(context)"를 생성합니다. 이것은 패스워드를 해싱하고 검증하는데 사용합니다.
팁
PassLib 컨텍스트는 다양한 해싱 알고리즘을 사용할 수 있는 기능을 제공하며, 더 이상 사용이 권장되지 않는 오래된 해싱 알고리즘을 검증하는 기능도 포함되어 있습니다.
예를 들어, 다른 시스템(Django 같은)에서 생성한 패스워드를 읽고 검증할 수 있으며, 새로운 패스워드를 Bcrypt 같은 다른 알고리즘으로 해싱할 수도 있습니다.
그리고 동시에 그런 모든 알고리즘과 호환성을 유지합니다.
사용자로부터 받은 패스워드를 해싱하는 유틸리티 함수를 생성합니다.
그리고 받은 패스워드가 저장된 해시와 일치하는지 검증하는 또 다른 유틸리티 함수도 생성합니다.
그리고 사용자를 인증하고 반환하는 또 다른 함수도 생성합니다.
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotated,UnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModelfromtyping_extensionsimportAnnotated# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Prefer to use the Annotated version if possible.
fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Prefer to use the Annotated version if possible.
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
Note
새로운 (가짜) 데이터베이스 fake_users_db를 확인하면, 해시 처리된 패스워드가 어떻게 생겼는지 볼 수 있습니다: "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW".
그리고 생성한 비밀키를 복사해 변수 SECRET_KEY에 대입합니다. (이 예제의 변수 값을 그대로 사용하지 마십시오.)
JWT 토큰을 서명하는 데 사용될 알고리즘을 위한 변수 ALGORITHM 을 생성하고 "HS256" 으로 설정합니다.
토큰 만료 기간을 위한 변수를 생성합니다.
응답을 위한 토큰 엔드포인트에 사용될 Pydantic 모델을 정의합니다.
새 액세스 토큰을 생성하기 위한 유틸리티 함수를 생성합니다.
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotated,UnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModelfromtyping_extensionsimportAnnotated# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Prefer to use the Annotated version if possible.
fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Prefer to use the Annotated version if possible.
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
get_current_user 함수를 이전과 동일한 토큰을 받도록 수정하되, 이번에는 JWT 토큰을 사용하도록 합니다.
받은 토큰을 디코딩하여 검증한 후 현재 사용자를 반환합니다.
토큰이 유효하지 않다면 HTTP 오류를 반환합니다.
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotated,UnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModelfromtyping_extensionsimportAnnotated# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Prefer to use the Annotated version if possible.
fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Prefer to use the Annotated version if possible.
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotated,UnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModelfromtyping_extensionsimportAnnotated# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Prefer to use the Annotated version if possible.
fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
Tip
Prefer to use the Annotated version if possible.
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportUnionimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompasslib.contextimportCryptContextfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Union[str,None]=NoneclassUser(BaseModel):username:stremail:Union[str,None]=Nonefull_name:Union[str,None]=Nonedisabled:Union[bool,None]=NoneclassUserInDB(User):hashed_password:strpwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Union[timedelta,None]=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/",response_model=User)asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]
지금까지 살펴본 내용을 바탕으로, OAuth2와 JWT 같은 표준을 사용하여 안전한 FastAPI 애플리케이션을 만들 수 있습니다.
거의 모든 프레임워크에서 보안 처리는 상당히 복잡한 주제입니다.
이를 단순화하는 많은 패키지는 데이터 모델, 데이터베이스, 사용 가능한 기능들에 대해 여러 제약이 있습니다. 그리고 지나치게 단순화하는 일부 패키지들은 심각한 보안 결함을 가질 수도 있습니다.
FastAPI 는 어떤 데이터베이스, 데이터 모델, 도구도 강요하지 않습니다.
프로젝트에 가장 적합한 것을 선택할 수 있는 유연성을 제공합니다.
그리고 passlib 와 PyJWT 처럼 잘 관리되고 널리 사용되는 패키지들을 바로 사용할 수 있습니다. FastAPI 는 외부 패키지 통합을 위해 복잡한 메커니즘이 필요하지 않기 때문입니다.
그러나 유연성, 견고성, 보안성을 해치지 않으면서 과정을 단순화할 수 있는 도구들을 제공합니다.
그리고 OAuth2와 같은 표준 프로토콜을 비교적 간단한 방법으로 구현하고 사용할 수 있습니다.
더 세분화된 권한 체계를 위해 OAuth2의 "스코프"를 사용하는 방법은 심화 사용자 안내서에서 더 자세히 배울 수 있습니다. OAuth2의 스코프는 제3자 애플리케이션이 사용자를 대신해 그들의 API와 상호작용하도록 권한을 부여하기 위해, Facebook, Google, GitHub, Microsoft, Twitter 등의 많은 대형 인증 제공업체들이 사용하는 메커니즘입니다.