2022-11-14

flask

python

Posted by

applemango

今回はflaskでjwtを用いてaccess tokenとrefresh tokenを実装する

info

所々フォームが出てくるがflaskのサーバーが実行されていたら多分使える

準備

まずはいつものようにライブラリをインストールして

requirements.txt

Flask==2.1.3
Flask-Cors==3.0.10
Flask-JWT-Extended==4.4.2
Flask-SQLAlchemy==2.5.1

ライブラリを読み込んで、そこにコンフィグを入れて、今回使うJWTと諸事情で使うCORSを定義する、後は__name__ == '__main__'を入れた物が下のコードです

今回はこのコードを土台に作って行きます

import os, json
from flask import Flask, jsonify, request
from datetime import timedelta
from flask_cors import CORS, cross_origin
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager, get_jwt_identity

basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__, instance_relative_config=True)
app.config.from_mapping(
    SECRET_KEY = "secret"
    ,JWT_SECRET_KEY = "secret"
    ,JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
    ,JWT_REFRESH_TOKEN_EXPIRES = timedelta(hours=24)
    ,JSON_AS_ASCII = False
)
cors = CORS(app, responses={r"/*": {"origins": "*"}})
jwt = JWTManager(app)

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

アクセストークンを発行する

とりあえず、まずはアクセストークンを発行しよう

アクセストークンは

access_token = create_access_token(identity="ユーザ名とかユーザーidとか、トークンを見れば後から分かる")

で作る

下のコードはユーザネームとパスワードをpostされたときトークンを返すコードだ

今はまだ、dbを使わずにユーザー名がapple、パスワードがmangoの場合許可している

18

19

20

+

21

+

22

+

23

+

24

+

25

+

26

+

27

+

28

+

jwt = JWTManager(app)

@app.route('/login', methods=['POST'])
@cross_origin()
def create_token():
    username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
    password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
    if username != "apple" or password != "mango":
        return jsonify({"msg": "Incorrect password or username"}), 401
    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

試しにpostしてみるとアクセストークンが戻ってくるだろう

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2ODQwNDA1NywianRpIjoiMDQyN2U5ZmEtZTVmMS00MTg2LTg3ZjUtOWZlOGVlNmZhYmE2IiwidHlwZSI6ImFjY2VzcyIsInN1YiI6ImFwcGxlIiwibmJmIjoxNjY4NDA0MDU3LCJleHAiOjE2Njg0MDQ5NTd9.XxoAnoG7huTrLVz0wFPn2zzpfkyyV3F1_KCSxbe-s5Y"
}

試しに下のを使ってみたら良いかもしれない、サーバーが実行されていたら多分使える、知らんけど

name = apple, password = mango

info

一応参考までにheaderとbodyを載せておく、postやgetにはaxiosを使っている

axiosでpostするコードは後述する

${name}や${password}は置き換えて欲しい、例えばこんな感じに -> {"username": "apple", "password": "mango"}

methods=POST

header={"Content-Type":"application/json"}

body={"username": "${name}", "password": "${password}"}

トークンを使ってみる

やはりトークンは使ってこそ意味がある

トークンでの認証を必要とするapiを作ろう

と言ってもデコレーターを一つ追加するだけで済む

@jwt_required()

これでこのデコレーターが付いてるapiはアクセストークンが無いとアクセスが出来ない

先ほどトークンを作成するときにidentityを設定下のを覚えているだろうか

identityも簡単に取得することが出来る

current_identity = get_jwt_identity()

これを組み合わせたらidentityを返すコードが書ける

28

29

30

+

31

+

32

+

33

+

34

+

35

+

    return jsonify(access_token=access_token)

@app.route("/user-data", methods=["GET"])
@jwt_required()
@cross_origin()
def need_token2():
    current_identity = get_jwt_identity()
    return jsonify({"user":current_identity}) # type: ignore

今回も一応テストするformを下に書いた

先ほど取得したアクセストークンは期限が15分しか無いので注意が必要だ

もし期限が切れていたら、無効なトークンだと、エラーを吐くだろう

info

前回同様、一応参考までにheaderとbodyを載せておく

methods=GET

header={"Content-Type":"application/json","Authorization":"Bearer ${token}"}

body={""}

これでtokenを入力すると

{
  "user": "apple"
}

と帰ってくるはずだ

これで基本的な使い方は終わりだ

詳しくは公式ドキュメントを見て欲しい

画像やファイルを認証する

jwtを使っていると画像のリクエストにもjwtを要求したい事もあるだろう

実はこれも簡単に実装出来る

まずはconfigを少し変えよう

10

11

12

13

+

14

15

16

17

app.config.from_mapping(
    SECRET_KEY = "secret"
    ,JWT_SECRET_KEY = "secret"
    ,JWT_TOKEN_LOCATION = ["headers", "query_string"]
    ,JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
    ,JWT_REFRESH_TOKEN_EXPIRES = timedelta(hours=24)
    ,JSON_AS_ASCII = False
)

それに画像を返せるようにもしましょう

2

3

-

4

+

5

6

7

import os, json
from flask import Flask, jsonify, request
from flask import Flask, jsonify, request, send_from_directory
from datetime import timedelta
from flask_cors import CORS, cross_origin
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager, get_jwt_identity

デフォルトではheaderのtokenしか認証しないがこれでクエリ文字列(urlの?jwt=のやつ)も対象に加わった

後は普通に画像を返すプログラムを書けば終わりだ

@app.route("/files/<path>", methods=["GET"])
@cross_origin()
@jwt_required()
def send_image(path):
    return send_from_directory("",path)

send_from_directoryは適当なpathに変えて実際に存在するpathにアクセスすると画像が表示されるだろう

urlはこんな感じ

http://127.0.0.1:5000/files/test.png?jwt=exampletoken

warning

こんな方法を書いといて何ですが、この方法は推奨されません

それはブラウザーの履歴にjwtトークンが保存される可能性もありますし、jwtをサーバーに記録したらされるなど、

セキュリティ上の脆弱性が発生する可能性があります

今回はセキュリティを犠牲に一番簡単な方法を使用していますが、本番環境で使用する事は推奨されませんし、使用すべきではありません

解決策

署名付きurlを生成するライブラリ

リフレッシュトークンを付ける

なぜリフレッシュトークンを使用するのか

先ほどアクセストークンのみのを作りましたがそれには一つ欠点が有ります

アクセストークンは頻繁に使用します

それこそ画像を一枚取得するのにも一回、一回、トークンを送信します

セキュリティの事を考えたらあまり推奨されませんがURLにtokenを付ける、なんてことも有ります

そんな事をしていたらtokenがばれてしまうリスクが高い為あまり長時間使えません

多くの場合アクセストークンがばれてしまったら時間制限が終わるまで悪用されるリスクが有ります

その為30分程度使ったら更新しなければなりません

ですが15分や30分ごとに認証が必要なのも面倒です

15分ごとにパスワードを入力する必要があるredditやyoutubeを想像してみてください

動画を見るたびにログインしないといけません

それを解決するのがリフレッシュトークンです

アクセストークンとリフレッシュトークンの大きな違いが

アクセストークンはコンテンツなどの送信や取得したりするのに使用される期限が短いトークンですが

リフレッシュトークンは認証のみに使用される期限が長いトークンと言う大きな違いが有ります

その為アクセストークンとリフレッシュトークンは、分ける必要が有るのです

たまに期限が切れたトークンを送信すると期限が切れていないトークンが帰ってくるという物も有りますが、それは期限が無限のアクセストークンの為、使用すべきでは有りません、そんな物を使っているとセキュリティリスクを増大させます、JWTなどを使う場合トークンの期限が長い場合は必ずアクセストークンとリフレッシュトークンを使用すべきです

リフレッシュトークンの実装

さてリフレッシュトークンを実装していきましょう

5

6

-

7

+

8

+

9

+

from flask_cors import CORS, cross_origin
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager, get_jwt_identity
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager \
    , get_jwt,get_jwt_identity, current_user, create_refresh_token \
    , decode_token, get_current_user

リフレッシュトークンを使用する為にはいつくか追加でimportする必要がある、current_userdecode_tokenはまだ使わないがめんどくさいのでこの機にimportしておく

27

28

29

30

-

31

+

32

+

33

34

+

35

+

36

+

37

+

38

+

39

+

40

+

    if username != "apple" or password != "mango":
        return jsonify({"msg": "Incorrect password or username"}), 401
    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)
    refresh_token = create_refresh_token(identity=username)
    return jsonify(access_token=access_token, refresh_token=refresh_token)

@app.route("/refresh", methods=["POST"])
@jwt_required(refresh=True)
@cross_origin()
def refresh():
    identity = get_jwt_identity()
    access_token = create_access_token(identity=identity)
    return jsonify(access_token=access_token)

今回新しく色々と追加されました

refresh_token = create_refresh_token(identity=identity="ユーザ名とかユーザーidとか、トークンを見れば後から分かる")

まずはcreate_refresh_tokenです

これはcreate_access_tokenとほぼ同じです、アクセストークンかリフレッシュトークンかの違いしか有りません

@jwt_required(refresh=True)

これは先ほどと同じデコレーターですが今回は引数にrefreshが指定されています

これはリフレッシュトークンを要求すると言う事を表しています、デフォルトではFalseです

Trueの場合リフレッシュトークンでしかアクセス出来ず、Falseの場合アクセストークンからしかアクセス出来ません

これで127.0.0.1:5000/loginにポストすると前回と違いアクセストークンも一緒に来ていると思います

例えばこんな感じの

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2ODQxMTAyMywianRpIjoiZGJlMjI5NzUtOWYxNS00NGYyLWJkMDEtYWNkMzA3YThmNmRhIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6ImFwcGxlIiwibmJmIjoxNjY4NDExMDIzLCJleHAiOjE2Njg0MTE5MjN9.3u6l8jwWaBYr0pYTXFKx8pmyATNI2TAtFQgK3RWY1M0",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2ODQxMTAyMywianRpIjoiY2QyMmRiZTAtOTg0MC00ZmQ2LWE0MzMtYTY0ODZhNzU2OThmIiwidHlwZSI6InJlZnJlc2giLCJzdWIiOiJhcHBsZSIsIm5iZiI6MTY2ODQxMTAyMywiZXhwIjoxNjY4NDk3NDIzfQ.XsGzDqssLtre-qMVO4DceoghrGnltyX2gUNVfTxWjYc"
}

上にも載せているが一応もう一度

info

methods=POST

header={"Content-Type":"application/json"}

body={"username": "${name}", "password": "${password}"}

リフレッシュ

info

methods=POST - NoBody

header={"Content-Type":"application/json","Authorization":"Bearer ${refresh_token}"}

リフレッシュすればアクセストークンが取得出来ます

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2ODQxMjM4NCwianRpIjoiYTMzMWRjY2UtNjE2Zi00Y2ZjLThjOTMtMTIzMGM1ZmQyMjEzIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6ImFwcGxlIiwibmJmIjoxNjY4NDEyMzg0LCJleHAiOjE2Njg0MTMyODR9.EGeWj3koN8APQz8tET0d2ZoXY6TmpFLcXumObkVEfow"
}

これでリフレッシュトークンを作成できました

ログアウト

今まで見ていて何か不思議に思った事は無いだろうか

ログアウトが無い事に

なぜ今までログアウトが出てきていないかというと"ログインしている=有効なトークンを持っている"という状態だからトークンを破棄(保存していたトークンを捨てる)するだけでログアウトしている事と同義なのだ

だかアクセストークンは15分で消えるがリフレッシュトークンは有効期限が1年などの場合がある

いくらばれていないからと言っても一年も使えるトークンを放置しておくのはリスクがある

もしかしたらどこかに保存されている可能性も十二分に考えられる

その為トークンを無効化させる必要が有る

ただトークンを無効化させるのは現実的ではない

その為今回はトークンのブロックリストを作りそこに含まれていないトークンのみ使えるようにする

トークンブロックリストを作る為にはデータベースを作る必要がある

今回はdbを作るのにflask sqlalchemyを使う

データベースの準備

3度目のライブラリの追加して

8

9

+

10

+

11

+

    , decode_token, get_current_user
from sqlalchemy import func
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timezone

コンフィグを書き直して

20

21

+

22

+

23

+

24

    ,JSON_AS_ASCII = False
    ,SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'app.db')
    ,SQLALCHEMY_TRACK_MODIFICATIONS = False
)

定義を追加する

22

23

24

+

cors = CORS(app, responses={r"/*": {"origins": "*"}})
jwt = JWTManager(app)
db = SQLAlchemy(app)

これでデータベースの準備は終わった早速始めよう

ブロックリストの実装

まずはdbのテーブルを定義しよう

25

26

27

28

+

29

+

30

+

31

+

32

+

33

34

35

jwt = JWTManager(app)
db = SQLAlchemy(app)

class TokenBlocklist(db.Model): # type: ignore
    id = db.Column(db.Integer, primary_key=True)
    jti = db.Column(db.String(36), nullable=False, index=True)
    type = db.Column(db.String(16), nullable=False)
    created_at = db.Column(db.DateTime, server_default=func.now(), nullable=False)

@app.route('/login', methods=['POST'])
@cross_origin()

定義したらdbを作成する必要がある

>>>db.create_all()

これで作成出来た、詳しい事は下を見て欲しい

info

このプロジェクトがA:\abc\osaka\main\apiにある仮想環境(venv)で動いていると仮定した場合

適時変更してください

仮想環境にいない場合はpythonファイルがあるフォルダーに移動し

A:\abc\osaka\main\api>python

>>>from app import db

>>>db.create_all()

A:\abc>cd A:\abc\osaka\main\api

A:\abc\osaka\main\api>venv\Scripts\activate

(venv) A:\abc\osaka\main\api>python
Python 3.9.4 (tags/v3.9.4:*******, ****  ** ****, **:**:**) [MSC v.**** ** bit (******)] on ******
Type "help", "copyright", "credits" or "license" for more information.
>>>from app import db

>>>db.create_all()

>>>Ctrl + Z

(venv) A:\abc\osaka\main\api>

定義出来たら早速ログアウトを実装しよう

51

52

53

54

+

55

+

56

+

57

+

58

+

59

+

60

+

61

+

62

+

63

+

64

+

65

66

67

    access_token = create_access_token(identity=identity)
    return jsonify(access_token=access_token)

@app.route("/logout", methods=["DELETE"])
@jwt_required(verify_type=False)
@cross_origin()
def modify_token():
    token = get_jwt()
    jti = token["jti"]
    ttype = token["type"]
    now = datetime.now(timezone.utc)
    db.session.add(TokenBlocklist(jti=jti, type=ttype, created_at=now))
    db.session.commit()
    return jsonify(msg=f"{ttype.capitalize()} token successfully revoked")

@app.route("/user-data", methods=["GET"])
@jwt_required()

今回もjwt_requiredに新しい引数が追加された

@jwt_required(verify_type=False)

この新しい引数verify_typeはトークンの種類を区別するかしないかを指定する引数です

本来ならトークンの種類は考慮されますがこの引数を付けるとアクセストークンとリフレッシュトークンの両方からアクセスできるようになります

get_jwt()

get_jwtではtokenのデータを取得出来ます

今回はtokenに入ってるjtiと言う一意の値を使い判別する為tokenのデータを取得します

これでログアウトするとトークンがブロックリストに追加されるようになりました

ですがまだ完璧ではありません

まだトークンがブロックされたものかされていない物かを判別するプログラムを書いていません

書きましょう

書かないと今までの苦労が全て水の泡になってしまいます

幸いflask_jwt_extendedには便利なデコレーターがあります

@jwt.token_in_blocklist_loader

このデコレーターの付いた関数がFalseを返せば含まれておらずTrueを返せば含まれていると言う事を実装できます

Trueを返した場合は、自動的に無効なトークンとして処理されます

63

64

65

66

+

67

+

68

+

69

+

70

+

71

72

73

    db.session.commit()
    return jsonify(msg=f"{ttype.capitalize()} token successfully revoked")

@jwt.token_in_blocklist_loader
def token_block(_jwt_header, jwt_data):
    if TokenBlocklist.query.filter_by(jti=jwt_data["jti"]).first():
        return True
    return False

@app.route("/user-data", methods=["GET"])
@jwt_required()

これでログアウトが実装出来ました

確認しましょう

上手くいけば、ログアウトする前はリフレッシュできるが、ログアウト後だと

"Token has been revoked"

とエラーが出るだろう

ユーザを追加する

jwtはどこ行ったと言う感じではあるが

一応解説するタイミングがここしか無いのでここで解説しておく

まずは4度目のライブラリを追加しよう

(最初に全て入れとけば良かったと後悔し始めている)

(ついでに分割すれば良かったとも後悔している)

10

11

12

+

13

14

15

from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timezone
from werkzeug.security import generate_password_hash, check_password_hash

basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__, instance_relative_config=True)

generate_password_hashはパスワードをハッシュ化するのに使う、check_password_hashはハッシュ化したパスワードが合っているか確認するのに使う

次はUserのデーブルを定義しよう

27

28

29

30

+

31

+

32

+

33

+

34

+

35

+

36

+

37

+

38

39

40

jwt = JWTManager(app)
db = SQLAlchemy(app)

class User(db.Model):  # type: ignore
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String(), nullable=False)
    password = db.Column(db.String())
    def set_password(self, password):
        self.password = generate_password_hash(password)
    def check_password(self, password):
        return check_password_hash(self.password, password)

class TokenBlocklist(db.Model): # type: ignore
    id = db.Column(db.Integer, primary_key=True)

後はdbを更新するだけだ

今回はdbをアップグレードするのが面倒なので一回リセットする

TokenBlockListを作った時のように

実行します

>>>from app import db
>>>db.drop_all()
>>>db.create_all()

これでテーブルを定義出来ました

後はユーザ登録できるようにし

42

43

44

45

+

46

+

47

+

48

+

49

+

50

+

51

+

52

+

53

+

54

+

55

+

56

+

57

+

58

+

59

+

60

+

61

+

62

+

63

64

65

    type = db.Column(db.String(16), nullable=False)
    created_at = db.Column(db.DateTime, server_default=func.now(), nullable=False)

@app.route('/register', methods=['POST'])
@cross_origin()
def register():
    username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
    password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
    if password is None or username is None or password == "" or username == "":
        return jsonify({"msg": "password or username has not been entered"}), 400
    if len(password) < 5:
        return jsonify({"msg": "password is too short"}), 400
    if len(username) < 3:
        return jsonify({"msg": "username is too short"}), 400
    if User.query.filter_by(name=username).first() is not None:
        return jsonify({"msg": "The username is already in use"}), 409
    user = User(name=username)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    return jsonify("create account")

@app.route('/login', methods=['POST'])
@cross_origin()

ログインも出来るようにします

66

67

68

69

-

70

+

71

+

72

73

74

75

def create_token():
    username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
    password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
    if username != "apple" or password != "mango":
    user = User.query.filter_by(name=username).first()
    if user is None or not user.check_password(password):
        return jsonify({"msg": "Incorrect password or username"}), 401
    access_token = create_access_token(identity=username)
    refresh_token = create_refresh_token(identity=username)
    return jsonify(access_token=access_token, refresh_token=refresh_token)

これで出来ました

ユーザを作ってログインして見ましょう

ですが少し物足りないですね

いまはcurrent_identityでユーザー名しか取得出来ないですがそれ以外も取得出来るようにしましょう

99

100

101

102

+

103

+

104

+

105

+

106

107

108

        return True
    return False

@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
    identity = jwt_data["sub"]
    return User.query.filter_by(name=identity).one_or_none()

@app.route("/user-data", methods=["GET"])
@jwt_required()

これでcurrent_userを使用してよりユーザーのデータベースのデータに簡単にアクセスできるようになりました

試しに何かデータを返す関数を作りましょう

111

112

113

114

+

115

+

116

+

117

+

118

+

119

120

121

    current_identity = get_jwt_identity()
    return jsonify({"user":current_identity}) # type: ignore

@app.route("/user-data-v2", methods=["GET"])
@jwt_required()
@cross_origin()
def need_tokenV2():
    return jsonify({"username":current_user.name,"id":current_user.id}) # type: ignore

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

これで出来ました

確認してみましょう

最終的なコード

import os, json
from flask import Flask, jsonify, request
from datetime import timedelta
from flask_cors import CORS, cross_origin
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager \
    , get_jwt,get_jwt_identity, current_user, create_refresh_token \
    , decode_token, get_current_user
from sqlalchemy import func
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timezone
from werkzeug.security import generate_password_hash, check_password_hash

basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__, instance_relative_config=True)
app.config.from_mapping(
    SECRET_KEY = "secret"
    ,JWT_SECRET_KEY = "secret"
    ,JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
    ,JWT_REFRESH_TOKEN_EXPIRES = timedelta(hours=24)
    ,JSON_AS_ASCII = False
    ,SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'app.db')
    ,SQLALCHEMY_TRACK_MODIFICATIONS = False
)
cors = CORS(app, responses={r"/*": {"origins": "*"}})
jwt = JWTManager(app)
db = SQLAlchemy(app)

class User(db.Model):  # type: ignore
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String(), nullable=False)
    password = db.Column(db.String())
    def set_password(self, password):
        self.password = generate_password_hash(password)
    def check_password(self, password):
        return check_password_hash(self.password, password)

class TokenBlocklist(db.Model): # type: ignore
    id = db.Column(db.Integer, primary_key=True)
    jti = db.Column(db.String(36), nullable=False, index=True)
    type = db.Column(db.String(16), nullable=False)
    created_at = db.Column(db.DateTime, server_default=func.now(), nullable=False)

@app.route('/register', methods=['POST'])
@cross_origin()
def register():
    username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
    password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
    if password is None or username is None or password == "" or username == "":
        return jsonify({"msg": "password or username has not been entered"}), 400
    if len(password) < 5:
        return jsonify({"msg": "password is too short"}), 400
    if len(username) < 3:
        return jsonify({"msg": "username is too short"}), 400
    if User.query.filter_by(name=username).first() is not None:
        return jsonify({"msg": "The username is already in use"}), 409
    user = User(name=username)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    return jsonify("create account")

@app.route('/login', methods=['POST'])
@cross_origin()
def create_token():
    username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
    password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
    user = User.query.filter_by(name=username).first()
    if user is None or not user.check_password(password):
        return jsonify({"msg": "Incorrect password or username"}), 401
    access_token = create_access_token(identity=username)
    refresh_token = create_refresh_token(identity=username)
    return jsonify(access_token=access_token, refresh_token=refresh_token)

@app.route("/refresh", methods=["POST"])
@jwt_required(refresh=True)
@cross_origin()
def refresh():
    identity = get_jwt_identity()
    access_token = create_access_token(identity=identity)
    return jsonify(access_token=access_token)

@app.route("/logout", methods=["DELETE"])
@jwt_required(verify_type=False)
@cross_origin()
def modify_token():
    token = get_jwt()
    jti = token["jti"]
    ttype = token["type"]
    now = datetime.now(timezone.utc)
    db.session.add(TokenBlocklist(jti=jti, type=ttype, created_at=now))
    db.session.commit()
    return jsonify(msg=f"{ttype.capitalize()} token successfully revoked")

@jwt.token_in_blocklist_loader
def token_block(_jwt_header, jwt_data):
    if TokenBlocklist.query.filter_by(jti=jwt_data["jti"]).first():
        return True
    return False

@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
    identity = jwt_data["sub"]
    return User.query.filter_by(name=identity).one_or_none()

@app.route("/user-data", methods=["GET"])
@jwt_required()
@cross_origin()
def need_token2():
    current_identity = get_jwt_identity()
    return jsonify({"user":current_identity}) # type: ignore

@app.route("/user-data-v2", methods=["GET"])
@jwt_required()
@cross_origin()
def need_tokenV2():
    return jsonify({"username":current_user.name,"id":current_user.id}) # type: ignore

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

これで完成しました

最後に今まで作った物を試して見ましょう

最終的に作ったapi

ユーザ登録

ログイン

トークンのリフレッシュ

ログアウト

ユーザー情報の確認

このドキュメントどう?

emoji
emoji
emoji
emoji