2022-11-14
Posted by
今回はflaskでjwtを用いてaccess tokenとrefresh tokenを実装する
info
所々フォームが出てくるがflaskのサーバーが実行されていたら多分使える
準備
まずはいつものようにライブラリをインストールして
requirements.txt
Flask==2.1.3
Flask-Cors==3.0.10
Flask-JWT-Extended==4.4.2
Flask-SQLAlchemy==2.5.1
ライブラリを読み込んで、そこにコンフィグを入れて、今回使うJWTと諸事情で使うCORSを定義する、後は__name__ == '__main__'
を入れた物が下のコードです
今回はこのコードを土台に作って行きます
import os, json
from flask import Flask, jsonify, request
from datetime import timedelta
from flask_cors import CORS, cross_origin
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager, get_jwt_identity
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__, instance_relative_config=True)
app.config.from_mapping(
SECRET_KEY = "secret"
,JWT_SECRET_KEY = "secret"
,JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
,JWT_REFRESH_TOKEN_EXPIRES = timedelta(hours=24)
,JSON_AS_ASCII = False
)
cors = CORS(app, responses={r"/*": {"origins": "*"}})
jwt = JWTManager(app)
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
アクセストークンを発行する
とりあえず、まずはアクセストークンを発行しよう
アクセストークンは
access_token = create_access_token(identity="ユーザ名とかユーザーidとか、トークンを見れば後から分かる")
で作る
下のコードはユーザネームとパスワードをpostされたときトークンを返すコードだ
今はまだ、dbを使わずにユーザー名がapple、パスワードがmangoの場合許可している
18
19
20
+
21
+
22
+
23
+
24
+
25
+
26
+
27
+
28
+
jwt = JWTManager(app)
@app.route('/login', methods=['POST'])
@cross_origin()
def create_token():
username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
if username != "apple" or password != "mango":
return jsonify({"msg": "Incorrect password or username"}), 401
access_token = create_access_token(identity=username)
return jsonify(access_token=access_token)
試しにpostしてみるとアクセストークンが戻ってくるだろう
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2ODQwNDA1NywianRpIjoiMDQyN2U5ZmEtZTVmMS00MTg2LTg3ZjUtOWZlOGVlNmZhYmE2IiwidHlwZSI6ImFjY2VzcyIsInN1YiI6ImFwcGxlIiwibmJmIjoxNjY4NDA0MDU3LCJleHAiOjE2Njg0MDQ5NTd9.XxoAnoG7huTrLVz0wFPn2zzpfkyyV3F1_KCSxbe-s5Y"
}
試しに下のを使ってみたら良いかもしれない、サーバーが実行されていたら多分使える、知らんけど
name = apple, password = mango
info
一応参考までにheaderとbodyを載せておく、postやgetにはaxiosを使っている
axiosでpostするコードは後述する
${name}や${password}は置き換えて欲しい、例えばこんな感じに -> {"username": "apple", "password": "mango"}
methods=POST
header={"Content-Type":"application/json"}
body={"username": "${name}", "password": "${password}"}
トークンを使ってみる
やはりトークンは使ってこそ意味がある
トークンでの認証を必要とするapiを作ろう
と言ってもデコレーターを一つ追加するだけで済む
@jwt_required()
これでこのデコレーターが付いてるapiはアクセストークンが無いとアクセスが出来ない
先ほどトークンを作成するときにidentityを設定下のを覚えているだろうか
identityも簡単に取得することが出来る
current_identity = get_jwt_identity()
これを組み合わせたらidentityを返すコードが書ける
28
29
30
+
31
+
32
+
33
+
34
+
35
+
return jsonify(access_token=access_token)
@app.route("/user-data", methods=["GET"])
@jwt_required()
@cross_origin()
def need_token2():
current_identity = get_jwt_identity()
return jsonify({"user":current_identity}) # type: ignore
今回も一応テストするformを下に書いた
先ほど取得したアクセストークンは期限が15分しか無いので注意が必要だ
もし期限が切れていたら、無効なトークンだと、エラーを吐くだろう
info
前回同様、一応参考までにheaderとbodyを載せておく
methods=GET
header={"Content-Type":"application/json","Authorization":"Bearer ${token}"}
body={""}
これでtokenを入力すると
{
"user": "apple"
}
と帰ってくるはずだ
これで基本的な使い方は終わりだ
詳しくは公式ドキュメントを見て欲しい
画像やファイルを認証する
jwtを使っていると画像のリクエストにもjwtを要求したい事もあるだろう
実はこれも簡単に実装出来る
まずはconfigを少し変えよう
10
11
12
13
+
14
15
16
17
app.config.from_mapping(
SECRET_KEY = "secret"
,JWT_SECRET_KEY = "secret"
,JWT_TOKEN_LOCATION = ["headers", "query_string"]
,JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
,JWT_REFRESH_TOKEN_EXPIRES = timedelta(hours=24)
,JSON_AS_ASCII = False
)
それに画像を返せるようにもしましょう
2
3
-
4
+
5
6
7
import os, json
from flask import Flask, jsonify, request
from flask import Flask, jsonify, request, send_from_directory
from datetime import timedelta
from flask_cors import CORS, cross_origin
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager, get_jwt_identity
デフォルトではheaderのtokenしか認証しないがこれでクエリ文字列(urlの?jwt=のやつ)も対象に加わった
後は普通に画像を返すプログラムを書けば終わりだ
@app.route("/files/<path>", methods=["GET"])
@cross_origin()
@jwt_required()
def send_image(path):
return send_from_directory("",path)
send_from_directoryは適当なpathに変えて実際に存在するpathにアクセスすると画像が表示されるだろう
urlはこんな感じ
http://127.0.0.1:5000/files/test.png?jwt=exampletoken
warning
こんな方法を書いといて何ですが、この方法は推奨されません
それはブラウザーの履歴にjwtトークンが保存される可能性もありますし、jwtをサーバーに記録したらされるなど、
セキュリティ上の脆弱性が発生する可能性があります
今回はセキュリティを犠牲に一番簡単な方法を使用していますが、本番環境で使用する事は推奨されませんし、使用すべきではありません
解決策
署名付きurlを生成するライブラリ
リフレッシュトークンを付ける
なぜリフレッシュトークンを使用するのか
先ほどアクセストークンのみのを作りましたがそれには一つ欠点が有ります
アクセストークンは頻繁に使用します
それこそ画像を一枚取得するのにも一回、一回、トークンを送信します
セキュリティの事を考えたらあまり推奨されませんがURLにtokenを付ける、なんてことも有ります
そんな事をしていたらtokenがばれてしまうリスクが高い為あまり長時間使えません
多くの場合アクセストークンがばれてしまったら時間制限が終わるまで悪用されるリスクが有ります
その為30分程度使ったら更新しなければなりません
ですが15分や30分ごとに認証が必要なのも面倒です
15分ごとにパスワードを入力する必要があるredditやyoutubeを想像してみてください
動画を見るたびにログインしないといけません
それを解決するのがリフレッシュトークンです
アクセストークンとリフレッシュトークンの大きな違いが
アクセストークンはコンテンツなどの送信や取得したりするのに使用される期限が短いトークンですが
リフレッシュトークンは認証のみに使用される期限が長いトークンと言う大きな違いが有ります
その為アクセストークンとリフレッシュトークンは、分ける必要が有るのです
たまに期限が切れたトークンを送信すると期限が切れていないトークンが帰ってくるという物も有りますが、それは期限が無限のアクセストークンの為、使用すべきでは有りません、そんな物を使っているとセキュリティリスクを増大させます、JWTなどを使う場合トークンの期限が長い場合は必ずアクセストークンとリフレッシュトークンを使用すべきです
リフレッシュトークンの実装
さてリフレッシュトークンを実装していきましょう
5
6
-
7
+
8
+
9
+
from flask_cors import CORS, cross_origin
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager, get_jwt_identity
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager \
, get_jwt,get_jwt_identity, current_user, create_refresh_token \
, decode_token, get_current_user
リフレッシュトークンを使用する為にはいつくか追加でimportする必要がある、current_user
やdecode_token
はまだ使わないがめんどくさいのでこの機にimportしておく
27
28
29
30
-
31
+
32
+
33
34
+
35
+
36
+
37
+
38
+
39
+
40
+
if username != "apple" or password != "mango":
return jsonify({"msg": "Incorrect password or username"}), 401
access_token = create_access_token(identity=username)
return jsonify(access_token=access_token)
refresh_token = create_refresh_token(identity=username)
return jsonify(access_token=access_token, refresh_token=refresh_token)
@app.route("/refresh", methods=["POST"])
@jwt_required(refresh=True)
@cross_origin()
def refresh():
identity = get_jwt_identity()
access_token = create_access_token(identity=identity)
return jsonify(access_token=access_token)
今回新しく色々と追加されました
refresh_token = create_refresh_token(identity=identity="ユーザ名とかユーザーidとか、トークンを見れば後から分かる")
まずはcreate_refresh_token
です
これはcreate_access_token
とほぼ同じです、アクセストークンかリフレッシュトークンかの違いしか有りません
@jwt_required(refresh=True)
これは先ほどと同じデコレーターですが今回は引数にrefreshが指定されています
これはリフレッシュトークンを要求すると言う事を表しています、デフォルトではFalseです
Trueの場合リフレッシュトークンでしかアクセス出来ず、Falseの場合アクセストークンからしかアクセス出来ません
これで127.0.0.1:5000/login
にポストすると前回と違いアクセストークンも一緒に来ていると思います
例えばこんな感じの
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2ODQxMTAyMywianRpIjoiZGJlMjI5NzUtOWYxNS00NGYyLWJkMDEtYWNkMzA3YThmNmRhIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6ImFwcGxlIiwibmJmIjoxNjY4NDExMDIzLCJleHAiOjE2Njg0MTE5MjN9.3u6l8jwWaBYr0pYTXFKx8pmyATNI2TAtFQgK3RWY1M0",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2ODQxMTAyMywianRpIjoiY2QyMmRiZTAtOTg0MC00ZmQ2LWE0MzMtYTY0ODZhNzU2OThmIiwidHlwZSI6InJlZnJlc2giLCJzdWIiOiJhcHBsZSIsIm5iZiI6MTY2ODQxMTAyMywiZXhwIjoxNjY4NDk3NDIzfQ.XsGzDqssLtre-qMVO4DceoghrGnltyX2gUNVfTxWjYc"
}
上にも載せているが一応もう一度
info
methods=POST
header={"Content-Type":"application/json"}
body={"username": "${name}", "password": "${password}"}
リフレッシュ
info
methods=POST - NoBody
header={"Content-Type":"application/json","Authorization":"Bearer ${refresh_token}"}
リフレッシュすればアクセストークンが取得出来ます
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2ODQxMjM4NCwianRpIjoiYTMzMWRjY2UtNjE2Zi00Y2ZjLThjOTMtMTIzMGM1ZmQyMjEzIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6ImFwcGxlIiwibmJmIjoxNjY4NDEyMzg0LCJleHAiOjE2Njg0MTMyODR9.EGeWj3koN8APQz8tET0d2ZoXY6TmpFLcXumObkVEfow"
}
これでリフレッシュトークンを作成できました
ログアウト
今まで見ていて何か不思議に思った事は無いだろうか
ログアウトが無い事に
なぜ今までログアウトが出てきていないかというと"ログインしている=有効なトークンを持っている"という状態だからトークンを破棄(保存していたトークンを捨てる)するだけでログアウトしている事と同義なのだ
だかアクセストークンは15分で消えるがリフレッシュトークンは有効期限が1年などの場合がある
いくらばれていないからと言っても一年も使えるトークンを放置しておくのはリスクがある
もしかしたらどこかに保存されている可能性も十二分に考えられる
その為トークンを無効化させる必要が有る
ただトークンを無効化させるのは現実的ではない
その為今回はトークンのブロックリストを作りそこに含まれていないトークンのみ使えるようにする
トークンブロックリストを作る為にはデータベースを作る必要がある
今回はdbを作るのにflask sqlalchemyを使う
データベースの準備
3度目のライブラリの追加して
8
9
+
10
+
11
+
, decode_token, get_current_user
from sqlalchemy import func
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timezone
コンフィグを書き直して
20
21
+
22
+
23
+
24
,JSON_AS_ASCII = False
,SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'app.db')
,SQLALCHEMY_TRACK_MODIFICATIONS = False
)
定義を追加する
22
23
24
+
cors = CORS(app, responses={r"/*": {"origins": "*"}})
jwt = JWTManager(app)
db = SQLAlchemy(app)
これでデータベースの準備は終わった早速始めよう
ブロックリストの実装
まずはdbのテーブルを定義しよう
25
26
27
28
+
29
+
30
+
31
+
32
+
33
34
35
jwt = JWTManager(app)
db = SQLAlchemy(app)
class TokenBlocklist(db.Model): # type: ignore
id = db.Column(db.Integer, primary_key=True)
jti = db.Column(db.String(36), nullable=False, index=True)
type = db.Column(db.String(16), nullable=False)
created_at = db.Column(db.DateTime, server_default=func.now(), nullable=False)
@app.route('/login', methods=['POST'])
@cross_origin()
定義したらdbを作成する必要がある
>>>db.create_all()
これで作成出来た、詳しい事は下を見て欲しい
info
このプロジェクトがA:\abc\osaka\main\apiにある仮想環境(venv)で動いていると仮定した場合
適時変更してください
仮想環境にいない場合はpythonファイルがあるフォルダーに移動し
A:\abc\osaka\main\api>python
>>>from app import db
>>>db.create_all()
A:\abc>cd A:\abc\osaka\main\api
A:\abc\osaka\main\api>venv\Scripts\activate
(venv) A:\abc\osaka\main\api>python
Python 3.9.4 (tags/v3.9.4:*******, **** ** ****, **:**:**) [MSC v.**** ** bit (******)] on ******
Type "help", "copyright", "credits" or "license" for more information.
>>>from app import db
>>>db.create_all()
>>>Ctrl + Z
(venv) A:\abc\osaka\main\api>
定義出来たら早速ログアウトを実装しよう
51
52
53
54
+
55
+
56
+
57
+
58
+
59
+
60
+
61
+
62
+
63
+
64
+
65
66
67
access_token = create_access_token(identity=identity)
return jsonify(access_token=access_token)
@app.route("/logout", methods=["DELETE"])
@jwt_required(verify_type=False)
@cross_origin()
def modify_token():
token = get_jwt()
jti = token["jti"]
ttype = token["type"]
now = datetime.now(timezone.utc)
db.session.add(TokenBlocklist(jti=jti, type=ttype, created_at=now))
db.session.commit()
return jsonify(msg=f"{ttype.capitalize()} token successfully revoked")
@app.route("/user-data", methods=["GET"])
@jwt_required()
今回もjwt_required
に新しい引数が追加された
@jwt_required(verify_type=False)
この新しい引数verify_typeはトークンの種類を区別するかしないかを指定する引数です
本来ならトークンの種類は考慮されますがこの引数を付けるとアクセストークンとリフレッシュトークンの両方からアクセスできるようになります
get_jwt()
get_jwtではtokenのデータを取得出来ます
今回はtokenに入ってるjtiと言う一意の値を使い判別する為tokenのデータを取得します
これでログアウトするとトークンがブロックリストに追加されるようになりました
ですがまだ完璧ではありません
まだトークンがブロックされたものかされていない物かを判別するプログラムを書いていません
書きましょう
書かないと今までの苦労が全て水の泡になってしまいます
幸いflask_jwt_extendedには便利なデコレーターがあります
@jwt.token_in_blocklist_loader
このデコレーターの付いた関数がFalseを返せば含まれておらずTrueを返せば含まれていると言う事を実装できます
Trueを返した場合は、自動的に無効なトークンとして処理されます
63
64
65
66
+
67
+
68
+
69
+
70
+
71
72
73
db.session.commit()
return jsonify(msg=f"{ttype.capitalize()} token successfully revoked")
@jwt.token_in_blocklist_loader
def token_block(_jwt_header, jwt_data):
if TokenBlocklist.query.filter_by(jti=jwt_data["jti"]).first():
return True
return False
@app.route("/user-data", methods=["GET"])
@jwt_required()
これでログアウトが実装出来ました
確認しましょう
上手くいけば、ログアウトする前はリフレッシュできるが、ログアウト後だと
"Token has been revoked"
とエラーが出るだろう
ユーザを追加する
jwtはどこ行ったと言う感じではあるが
一応解説するタイミングがここしか無いのでここで解説しておく
まずは4度目のライブラリを追加しよう
(最初に全て入れとけば良かったと後悔し始めている)
(ついでに分割すれば良かったとも後悔している)
10
11
12
+
13
14
15
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timezone
from werkzeug.security import generate_password_hash, check_password_hash
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__, instance_relative_config=True)
generate_password_hashはパスワードをハッシュ化するのに使う、check_password_hashはハッシュ化したパスワードが合っているか確認するのに使う
次はUserのデーブルを定義しよう
27
28
29
30
+
31
+
32
+
33
+
34
+
35
+
36
+
37
+
38
39
40
jwt = JWTManager(app)
db = SQLAlchemy(app)
class User(db.Model): # type: ignore
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(), nullable=False)
password = db.Column(db.String())
def set_password(self, password):
self.password = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password, password)
class TokenBlocklist(db.Model): # type: ignore
id = db.Column(db.Integer, primary_key=True)
後はdbを更新するだけだ
今回はdbをアップグレードするのが面倒なので一回リセットする
TokenBlockListを作った時のように
実行します
>>>from app import db
>>>db.drop_all()
>>>db.create_all()
これでテーブルを定義出来ました
後はユーザ登録できるようにし
42
43
44
45
+
46
+
47
+
48
+
49
+
50
+
51
+
52
+
53
+
54
+
55
+
56
+
57
+
58
+
59
+
60
+
61
+
62
+
63
64
65
type = db.Column(db.String(16), nullable=False)
created_at = db.Column(db.DateTime, server_default=func.now(), nullable=False)
@app.route('/register', methods=['POST'])
@cross_origin()
def register():
username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
if password is None or username is None or password == "" or username == "":
return jsonify({"msg": "password or username has not been entered"}), 400
if len(password) < 5:
return jsonify({"msg": "password is too short"}), 400
if len(username) < 3:
return jsonify({"msg": "username is too short"}), 400
if User.query.filter_by(name=username).first() is not None:
return jsonify({"msg": "The username is already in use"}), 409
user = User(name=username)
user.set_password(password)
db.session.add(user)
db.session.commit()
return jsonify("create account")
@app.route('/login', methods=['POST'])
@cross_origin()
ログインも出来るようにします
66
67
68
69
-
70
+
71
+
72
73
74
75
def create_token():
username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
if username != "apple" or password != "mango":
user = User.query.filter_by(name=username).first()
if user is None or not user.check_password(password):
return jsonify({"msg": "Incorrect password or username"}), 401
access_token = create_access_token(identity=username)
refresh_token = create_refresh_token(identity=username)
return jsonify(access_token=access_token, refresh_token=refresh_token)
これで出来ました
ユーザを作ってログインして見ましょう
ですが少し物足りないですね
いまはcurrent_identityでユーザー名しか取得出来ないですがそれ以外も取得出来るようにしましょう
99
100
101
102
+
103
+
104
+
105
+
106
107
108
return True
return False
@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
identity = jwt_data["sub"]
return User.query.filter_by(name=identity).one_or_none()
@app.route("/user-data", methods=["GET"])
@jwt_required()
これでcurrent_userを使用してよりユーザーのデータベースのデータに簡単にアクセスできるようになりました
試しに何かデータを返す関数を作りましょう
111
112
113
114
+
115
+
116
+
117
+
118
+
119
120
121
current_identity = get_jwt_identity()
return jsonify({"user":current_identity}) # type: ignore
@app.route("/user-data-v2", methods=["GET"])
@jwt_required()
@cross_origin()
def need_tokenV2():
return jsonify({"username":current_user.name,"id":current_user.id}) # type: ignore
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
これで出来ました
確認してみましょう
最終的なコード
import os, json
from flask import Flask, jsonify, request
from datetime import timedelta
from flask_cors import CORS, cross_origin
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required, JWTManager \
, get_jwt,get_jwt_identity, current_user, create_refresh_token \
, decode_token, get_current_user
from sqlalchemy import func
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timezone
from werkzeug.security import generate_password_hash, check_password_hash
basedir = os.path.abspath(os.path.dirname(__file__))
app = Flask(__name__, instance_relative_config=True)
app.config.from_mapping(
SECRET_KEY = "secret"
,JWT_SECRET_KEY = "secret"
,JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
,JWT_REFRESH_TOKEN_EXPIRES = timedelta(hours=24)
,JSON_AS_ASCII = False
,SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'app.db')
,SQLALCHEMY_TRACK_MODIFICATIONS = False
)
cors = CORS(app, responses={r"/*": {"origins": "*"}})
jwt = JWTManager(app)
db = SQLAlchemy(app)
class User(db.Model): # type: ignore
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(), nullable=False)
password = db.Column(db.String())
def set_password(self, password):
self.password = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password, password)
class TokenBlocklist(db.Model): # type: ignore
id = db.Column(db.Integer, primary_key=True)
jti = db.Column(db.String(36), nullable=False, index=True)
type = db.Column(db.String(16), nullable=False)
created_at = db.Column(db.DateTime, server_default=func.now(), nullable=False)
@app.route('/register', methods=['POST'])
@cross_origin()
def register():
username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
if password is None or username is None or password == "" or username == "":
return jsonify({"msg": "password or username has not been entered"}), 400
if len(password) < 5:
return jsonify({"msg": "password is too short"}), 400
if len(username) < 3:
return jsonify({"msg": "username is too short"}), 400
if User.query.filter_by(name=username).first() is not None:
return jsonify({"msg": "The username is already in use"}), 409
user = User(name=username)
user.set_password(password)
db.session.add(user)
db.session.commit()
return jsonify("create account")
@app.route('/login', methods=['POST'])
@cross_origin()
def create_token():
username = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["username"]
password = json.loads(json.loads(request.get_data().decode('utf-8'))["body"])["password"]
user = User.query.filter_by(name=username).first()
if user is None or not user.check_password(password):
return jsonify({"msg": "Incorrect password or username"}), 401
access_token = create_access_token(identity=username)
refresh_token = create_refresh_token(identity=username)
return jsonify(access_token=access_token, refresh_token=refresh_token)
@app.route("/refresh", methods=["POST"])
@jwt_required(refresh=True)
@cross_origin()
def refresh():
identity = get_jwt_identity()
access_token = create_access_token(identity=identity)
return jsonify(access_token=access_token)
@app.route("/logout", methods=["DELETE"])
@jwt_required(verify_type=False)
@cross_origin()
def modify_token():
token = get_jwt()
jti = token["jti"]
ttype = token["type"]
now = datetime.now(timezone.utc)
db.session.add(TokenBlocklist(jti=jti, type=ttype, created_at=now))
db.session.commit()
return jsonify(msg=f"{ttype.capitalize()} token successfully revoked")
@jwt.token_in_blocklist_loader
def token_block(_jwt_header, jwt_data):
if TokenBlocklist.query.filter_by(jti=jwt_data["jti"]).first():
return True
return False
@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
identity = jwt_data["sub"]
return User.query.filter_by(name=identity).one_or_none()
@app.route("/user-data", methods=["GET"])
@jwt_required()
@cross_origin()
def need_token2():
current_identity = get_jwt_identity()
return jsonify({"user":current_identity}) # type: ignore
@app.route("/user-data-v2", methods=["GET"])
@jwt_required()
@cross_origin()
def need_tokenV2():
return jsonify({"username":current_user.name,"id":current_user.id}) # type: ignore
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
これで完成しました
最後に今まで作った物を試して見ましょう
最終的に作ったapi
ユーザ登録
ログイン
トークンのリフレッシュ
ログアウト
ユーザー情報の確認
このドキュメントどう?