OAuth2.0(オープンオーソライゼーション 2.0)は、ユーザーの認証情報を共有せずに、サービスがユーザーのリソースにアクセスできるようにする認可プロトコルです。認証情報を含むユーザー情報をリソースとして一元管理し、サービスからリソース管理を分離することで、ユーザが様々なサービスを利用可能とすることができます。認証情報が危険にさらされにくく、他のサービスにおいても認証機能と認可機能を実装する必要がないため、サービスの構造をシンプルにすることができます。ウェブサイトやモバイルアプリ、デスクトップアプリなど、さまざまなアプリケーションで利用されています。
■ OAuth2.0の概要
代表的なフローである「AuthorizationCodeフロー」について解説します。
OAuth2.0に必要となるコンポーネントとして、以下の通りです。
・リソースサーバ
リソースとは、ユーザID、パスワードなどの認証情報や名前、住所などを含むユーザ情報のことです。
リソースサーバでは、ユーザ情報を一元管理します。リソースサーバからユーザ情報を取得するには、「アクセストークン」と呼ばれるハッシュ値が必要となります。
・ユーザ
ユーザは、リソースサーバで管理されるリソースの所有者です。
ユーザがブラウザを使って、サービスを利用する場合、そのユーザが本人かどうかを確認(本人認証)する必要があります。そのため、リソースサーバにより、ユーザIDとパスワードにより、本人認証が成功した後で、サービスの利用が可能となります。
・クライアント(サーバ)
リソースを利用するサービスのことです。ユーザがサービスを利用する際、ユーザの本人認証が成功した後で、アクセストークンを使って、リソースサーバからユーザ情報を取得できるようになります。
・認可サーバ
クライアントの情報を管理し、ユーザ、クライアント、リソースサーバの3つを仲介します。クライアントの情報は、クライアントIDおよびクライアントシークレットと呼ばれるパスワードです。
ポイントとなる認可サーバの機能は、ユーザの本人認証、認可コードの発行、アクセストークンの発行の3つです。
①ユーザの本人認証
リソースサーバからユーザ情報を取得し、ユーザが入力したユーザIDとパスワードによる認証を実施する。
②認可コードの発行
クライアントへリソースを利用する許可をユーザから受け取り、クライアントへ認可コードを発行する。
③アクセストークンの発行
クライアントID、クライアントシークレット、認可コードからアクセストークンを生成する。
クライアントは、認可サーバを介して、ユーザの本人認証からクライアントへのアクセストークン生成までを実施し、以降、アクセストークンを使って、リソース情報へとアクセスすることが可能となります。
■ サンプルコードシーケンス
サンプルコードシーケンスを以下に示します。(リソースサーバと認可サーバは、同一のサーバとします)クライアントのポートは8000、認可(/リソース)サーバのポートは9000を使います。
クライアントには、認可(/リソース)サーバから事前に連携されたクライアントIDとクライアントシークレットを保持します。これは事前にクライアントを認可(/リソース)サーバへ登録する手順を実施することで、クライアントIDとクライアントシークレットを入手します。
また、認可(/リソース)サーバでは、ユーザID、パスワードなどユーザ毎のリソース管理とクライアント管理を行います。

①ユーザは、サービス利用のためブラウザからアプリケーションアクセスする
②ユーザはブラウザに表示されたログインボタンをクリックする
③クライアントは、クライアントIDを設定し、認可(/リソース)サーバへリダイレクトする
④認可(/リソース)サーバは、ログイン画面をユーザに返す。その際、クライアントIDを設定する
⑤ユーザは、ブラウザに表示されたログイン画面からユーザID、パスワードを入力する
⑥クライアントID、ユーザID、パスワードが認可(/リソース)サーバへ送信される
⑦認可(/リソース)サーバにて、クライアントID、ユーザID、パスワードにより認証処理が実施される
⑧認可(/リソース)サーバは、クライアントを許可してよいかユーザに問い合わせるため、クライアントIDを含む認可確認画面をユーザに返す
⑨ユーザは、ブラウザに表示された認可確認画面にて認可(Yes)を入力する
➉認可(/リソース)サーバは、認可コード生成する
⑪認可(/リソース)サーバは、生成した認可コードをクライアントに通知する(認証・認可状態)
⑫クライアントは、認可(/リソース)サーバから認可コード、クライアントID、クライアントシークレットを使ってアクセストークンを取得する
⑬認可(/リソース)サーバはアクセストークンを生成し、クライアントへ返す
⑭クライアントは、アクセストークンをセッションに設定する(以降、セッションにてアクセストークンを保持)
⑮クライアントは、認可(/リソース)サーバからアクセストークンを使ってリソース情報取得する
⑯認可(/リソース)サーバはリソース情報を返す
⑰クライアントは、リソース情報含む結果画面をユーザに返す
■ サンプルコード
Python/FlaskによるOAuth2.0の仕組みを実現したサンプルコードを示します。
認可サーバの実装には、Authlibを使います。先に、Authlibの概要を解説します。
・Authlibの概要
Authlibを使って認可サーバを構築するためには、Authlibに沿った実装をする必要があります。
まず、以下のように認可サーバのインスタンスを生成する際、query_client関数、save_token関数を定義し、指定する必要があります。これらの関数は、Authlib内部から自動的に呼び出されます。
# AuthorizationServer インスタンスを生成
authorization = AuthorizationServer(app, query_client=query_client, save_token=save_token)
query_client関数は、クライアントの存在を問い合わせ、クライアントのインスタンスを返却するためのものです。
# --- クライアントクエリ関数 ---
def query_client(client_id):
client_data = clients.get(client_id)
if client_data:
return Client(client_data)
return None
save_token関数は、生成したトークンを内部で管理するため保存するためのものです。
# --- トークン保存関数 ---
def save_token(token, request):
# 必要な情報を token に追加
token['client_id'] = request.client.client_id
token['user_id'] = request.user['user_id']
token['scope'] = request.scope
tokens[token['access_token']] = token
print("Token saved:", token)
また、ClientクラスとAnthorizationCodeGrantクラスを定義する必要があります。これらクラスのメソッドは、OAuth2.0のシーケンスにおいて、Authlibの内部から呼び出されます。
1)Clientクラスを定義する
クライアントのインスタンスとなるClientクラスを定義します。必須となるメンバ変数とメソッドを実装する必要があります。
# --- Client クラスの定義 ---
class Client:
# クライアント情報を受け取ってインスタンスを生成
def __init__(self, data):
self.client_id = data.get('client_id')
self.client_secret = data.get('client_secret')
self.redirect_uris = data.get('redirect_uris', [])
self.scope = data.get('scope')
self.token_endpoint_auth_method = data.get('token_endpoint_auth_method')
self.response_types = data.get('response_types', ['code'])
self.grant_types = data.get('grant_types', ['authorization_code'])
# 登録されたリダイレクト URI のリストに含まれているか確認
def check_redirect_uri(self, redirect_uri):
return redirect_uri in self.redirect_uris
# デフォルトのリダイレクト URI を取得
def get_default_redirect_uri(self):
if self.redirect_uris:
return self.redirect_uris[0]
return None
# response_type が許可されているか確認
def check_response_type(self, response_type):
# シンプルに 'code' だけを許可する場合の例
return response_type in self.response_types
# クライアントシークレットの確認
def check_client_secret(self, client_secret):
# 登録されている client_secret と渡された値を比較
return self.client_secret == client_secret
# token_endpoint_auth_methodの確認
def check_endpoint_auth_method(self, method, endpoint='token'):
return self.token_endpoint_auth_method == method
# grant_typesの確認
def check_grant_type(self, grant_type):
return grant_type in self.grant_types
# リダイレクトURIを取得
def get_redirect_uri(self):
return self.get_default_redirect_uri()
# scopeの取得
def get_allowed_scope(self, scope):
# クライアントに許可されたスコープは self.scope(スペース区切り文字列)に保存されているとする
allowed = set(self.scope.split()) if self.scope else set()
requested = set(scope.split())
# 許可されたスコープとリクエストされたスコープの交差集合を返す
return " ".join(allowed.intersection(requested))
2)AnthorizationCodeGrantクラスを定義する
認可コードを発行するクラスを定義します。このクラスは、以下のようにして、認可サーバのインスタンスに登録します。
# AuthorizationCodeGrant クラスを登録
authorization.register_grant(AuthorizationCodeGrant)
認可コードを発行するクラスは、必須となる幾つかのメソッドを実装する必要があります。
# --- 認可コードグラントの実装 ---
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
# 認可コードの保存
def save_authorization_code(self, code, request):
# AuthorizationCode のインスタンスとして保存する
authorization_codes[code] = AuthorizationCode(
code=code,
client_id=request.client.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
user_id=request.user['user_id']
)
print("authorization_code saved:", code)
# 認可コードの取得
def query_authorization_code(self, code, client):
data = authorization_codes.get(code)
if data and data.client_id == client.client_id:
return data
return None
# 認可コードの削除
def delete_authorization_code(self, authorization_code):
# インスタンスの code 属性をキーとして使用
authorization_codes.pop(authorization_code.code, None)
# 認可コードからユーザ情報を取得
def authenticate_user(self, authorization_code):
# インスタンスの属性から user_id を取得
user_id = authorization_code.user_id
#return users.get(user_id)
return GetUser(user_id)
Authlibを使った認可(/リソース)サーバのPython/Flaskのコードは以下のようになります。
# server.py
from flask import Flask, request, redirect, session, jsonify
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.rfc6749 import grants
from authlib.common.security import generate_token
from urllib.parse import urlencode, quote
app = Flask(__name__)
app.secret_key = 'server_secret_key' # セッションの暗号化キー
# --- シンプルなインメモリ DB ---
# ユーザーリスト
users = {'1': {'user_id': 'id001', 'password': 'mypassword'}}
# ユーザー情報を取得する関数
def GetUser(user_id):
for no in users:
user_data = users[no]
if user_data['user_id'] == user_id:
return user_data
return None
# クライアントリスト
clients = {
'client_1': {
'client_id': 'client_1', # クライアント ID
'client_secret': 'client_secret_1', # クライアントシークレット
'redirect_uris': ['http://localhost:8000/callback'], # クライアントへのリダイレクト URI
'scope': 'profile', # クライアントが要求するスコープ
'token_endpoint_auth_method': 'client_secret_basic', # トークンエンドポイントの認証方法(Basic認証)
'response_types': ['code'], # サポートする response_type
'grant_types': ['authorization_code'], # サポートするグラントタイプ(AurhorizationCode)
}
}
# --- クライアントクエリ関数 ---
def query_client(client_id):
client_data = clients.get(client_id)
if client_data:
return Client(client_data)
return None
# 認可コードとアクセストークンの保存用リスト
authorization_codes = {} # 認可コードリスト(保存用)
tokens = {} # トークンリスト(保存用)
# --- トークン保存関数 ---
def save_token(token, request):
# 必要な情報を token に追加
token['client_id'] = request.client.client_id
token['user_id'] = request.user['user_id']
token['scope'] = request.scope
tokens[token['access_token']] = token
print("Token saved:", token)
# --- Client クラスの定義 ---
class Client:
# クライアント情報を受け取ってインスタンスを生成
def __init__(self, data):
self.client_id = data.get('client_id')
self.client_secret = data.get('client_secret')
self.redirect_uris = data.get('redirect_uris', [])
self.scope = data.get('scope')
self.token_endpoint_auth_method = data.get('token_endpoint_auth_method')
self.response_types = data.get('response_types', ['code'])
self.grant_types = data.get('grant_types', ['authorization_code'])
# 登録されたリダイレクト URI のリストに含まれているか確認
def check_redirect_uri(self, redirect_uri):
return redirect_uri in self.redirect_uris
# デフォルトのリダイレクト URI を取得
def get_default_redirect_uri(self):
if self.redirect_uris:
return self.redirect_uris[0]
return None
# response_type が許可されているか確認
def check_response_type(self, response_type):
# シンプルに 'code' だけを許可する場合の例
return response_type in self.response_types
# クライアントシークレットの確認
def check_client_secret(self, client_secret):
# 登録されている client_secret と渡された値を比較
return self.client_secret == client_secret
# token_endpoint_auth_methodの確認
def check_endpoint_auth_method(self, method, endpoint='token'):
return self.token_endpoint_auth_method == method
# grant_typesの確認
def check_grant_type(self, grant_type):
return grant_type in self.grant_types
# リダイレクトURIを取得
def get_redirect_uri(self):
return self.get_default_redirect_uri()
# scopeの取得
def get_allowed_scope(self, scope):
# クライアントに許可されたスコープは self.scope(スペース区切り文字列)に保存されているとする
allowed = set(self.scope.split()) if self.scope else set()
requested = set(scope.split())
# 許可されたスコープとリクエストされたスコープの交差集合を返す
return " ".join(allowed.intersection(requested))
#---- AuthorizationCode クラスの定義 ----
class AuthorizationCode:
# インスタンスを生成
def __init__(self, code, client_id, redirect_uri, scope, user_id):
self.code = code
self.client_id = client_id
self.redirect_uri = redirect_uri
self.scope = scope
self.user_id = user_id
# リダイレクトURLを取得
def get_redirect_uri(self):
return self.redirect_uri
# scopeを取得
def get_scope(self):
return self.scope
# 有効期限を取得
def get_expires_in(self):
# 例えば認可コードの有効期限を 600 秒 (10 分) とする
return 600
# --- 認可コードグラントの実装 ---
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
# 認可コードの保存
def save_authorization_code(self, code, request):
# AuthorizationCode のインスタンスとして保存する
authorization_codes[code] = AuthorizationCode(
code=code,
client_id=request.client.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
user_id=request.user['user_id']
)
print("authorization_code saved:", code)
# 認可コードの取得
def query_authorization_code(self, code, client):
data = authorization_codes.get(code)
if data and data.client_id == client.client_id:
return data
return None
# 認可コードの削除
def delete_authorization_code(self, authorization_code):
# インスタンスの code 属性をキーとして使用
authorization_codes.pop(authorization_code.code, None)
# 認可コードからユーザ情報を取得
def authenticate_user(self, authorization_code):
# インスタンスの属性から user_id を取得
user_id = authorization_code.user_id
#return users.get(user_id)
return GetUser(user_id)
# --- Authorization Server のセットアップ ---
# AuthorizationServer インスタンスを生成
authorization = AuthorizationServer(app, query_client=query_client, save_token=save_token)
# AuthorizationCodeGrant クラスを登録
authorization.register_grant(AuthorizationCodeGrant)
#
# 認可サーバ(/リソースサーバ)のエンドポイント
#
# --- ログインエンドポイント ---
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST': # POSTの場合
# ユーザIDとパスワードを取得
user_id = request.form.get('user_id')
password = request.form.get('password')
# ユーザが存在するか確認
user=GetUser(user_id)
if not user:
return 'Invalid user_id or password', 401
# 次の遷移先URLにリダイレクト
session['user_id'] = user_id
next_url = request.args.get('next') or '/'
return redirect(next_url)
# GETの場合、LOGIN画面を返す
return '''
<form method="post">
<label>user_id: <input name="user_id" /></label>
<br>
<label>password: <input name="password" /></label>
<button type="submit">Login</button>
</form>
'''
# --- 認可エンドポイント(認可サーバ) ---
@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
# セッション情報にユーザIDがない場合(ログインしていない)はログインページへリダイレクト
user_id = session.get('user_id')
if not user_id:
# URL全体をエンコードして「next」にセットする
return redirect('/login?next=' + quote(request.url, safe=''))
# ユーザ情報を取得
user = GetUser(user_id)
if request.method == 'GET': # GETの場合
# クエリパラメーターに必要な情報が不足している場合、デフォルト値を付加してリダイレクト
if not request.args.get('client_id') or not request.args.get('response_type'):
query = {
'response_type': 'code',
'client_id': 'client_1',
'redirect_uri': clients['client_1']['redirect_uris'][0],
'scope': 'profile',
'state': 'xyz123'
}
return redirect(request.base_url + '?' + urlencode(query))
try:
#
# Authlib の authorization.get_consent_grant() メソッドを呼び出し、
# リクエストから送られた OAuth2.0 のパラメーター(client_id、redirect_uri、scope、state など)を基に
# 認可リクエストを検証し、認可情報(grant オブジェクト)を生成します。
# この grant オブジェクトは、クライアント情報(例えば grant.client.client_id)などが含まれ、
# 後続の処理でユーザーに表示するために利用されます。
#
grant = authorization.get_consent_grant(end_user=user)
return f'''
<h1>Authorize Client: {grant.client.client_id}?</h1>
<form method="post">
<button name="confirm" value="yes" type="submit">Yes</button>
<button name="confirm" value="no" type="submit">No</button>
</form>
'''
except Exception as e:
return str(e)
# POSTの場合
confirm = request.form.get('confirm')
if confirm == 'yes':
# ユーザがクライアントへリソースへのアクセスを許可した場合
# ユーザーの同意を反映した認可レスポンス
# (例えば、認可コードを生成し、リダイレクト URI に付加してリダイレクトするレスポンス)を作成します。
return authorization.create_authorization_response(grant_user=user)
# ユーザがクライアントへリソースへのアクセスを拒否した場合
# 通常は "access_denied" エラーが付加されたリダイレクトレスポンスが作成され、
# クライアント側で認可が拒否されたことを認識できるようになります。
return authorization.create_authorization_response(grant_user=None)
# --- アクセストークン取得エンドポイント(認可サーバ) ---
@app.route('/token', methods=['POST'])
def issue_token():
# アクセストークンを生成して返却する
return authorization.create_token_response()
# --- リソース情報取得エンドポイント(リソースサーバ) ---
@app.route('/api/me')
def api_me():
# アクセストークンは Authorization ヘッダー (Bearer) で受け取る
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
# アクセストークンを取得し、存在をチェック
access_token = auth_header[7:]
token_data = tokens.get(access_token)
if token_data:
# アクセストークンが存在する場合、クライアントID、ユーザID、scopeを返却する
return jsonify({
'client_id': token_data['client_id'],
'user_id': token_data.get('user_id'),
'scope': token_data.get('scope'),
})
# アクセストークンがない場合、エラーを返却する
return jsonify(error='Invalid token'), 401
if __name__ == '__main__':
# ポート9000で起動
app.run(port=9000, debug=True)
また、クライアントのPython/Flaskのコードを以下に示します。
# client.py
from flask import Flask, redirect, request, session, url_for
import requests
from urllib.parse import urlencode, quote
# client.py の /callback ルート(修正例)
from requests.auth import HTTPBasicAuth
app = Flask(__name__)
app.secret_key = 'client_secret_key'
# OAuth2.0 クライアントの設定
CLIENT_ID = 'client_1' # クライアントID
CLIENT_SECRET = 'client_secret_1' # クライアントシークレット
AUTHORIZATION_ENDPOINT = 'http://localhost:9000/authorize' # 認可エンドポイント(認可サーバ)
TOKEN_ENDPOINT = 'http://localhost:9000/token' # アクセストークン取得エンドポイント(認可サーバ)
REDIRECT_URI = 'http://localhost:8000/callback' # クライアントのリダイレクトURI
RESOURCE_ENDPOINT = 'http://localhost:9000/api/me' # リソース情報取得エンドポイント(リソースサーバ)
#--- ルートエンドポイント ---
@app.route('/')
def index():
# セッションからアクセストークン取得
access_token = session.get('access_token')
if access_token:
# アクセストークンが存在する場合、アクセストークンと、リソース情報取得エンドポイントリンクを返却する
return f"Logged in! Access token: {access_token}<br><a href='/profile'>View Profile</a>"
# アクセストークンが存在しない場合、ログインエンドポイントリンクを返却する
return '<a href="/login">Login with OAuth2</a>'
#--- ログインエンドポイント ---
@app.route('/login')
def login():
# クライアントID、クライアントシークレットなどを設定し、
# 認可エンドポイント(認可サーバ)へリダイレクトする
params = {
'response_type': 'code',
'client_id': CLIENT_ID,
'redirect_uri': REDIRECT_URI,
'scope': 'profile',
'state': 'xyz123', # 本番では十分にランダムな値を生成すること
}
url = f"{AUTHORIZATION_ENDPOINT}?{urlencode(params)}"
return redirect(url)
#--- コールバックポイント ---
@app.route('/callback')
def callback():
# エラーチェック
error = request.args.get('error')
if error:
return f"Error: {error}"
# 認可コード、stateを取得
code = request.args.get('code')
state = request.args.get('state')
# state の検証(本番ではより厳密に実施すること)
if state != 'xyz123':
return "Invalid state parameter", 400
# 認可コードなどを指定し、アクセストークン取得エンドポイント(認可サーバ)にアクセスし、
# アクセストークンを取得する
data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': REDIRECT_URI,
}
token_response = requests.post(
TOKEN_ENDPOINT,
data=data,
auth=HTTPBasicAuth(CLIENT_ID, CLIENT_SECRET)
)
if token_response.status_code != 200:
# エラーの場合、エラーを返却する
return f"Token error: {token_response.text}", token_response.status_code
# 正常の場合、セッションにアクセストークンを設定し、クライアント内のprofile()を呼び出す
token_json = token_response.json()
session['access_token'] = token_json.get('access_token')
return redirect(url_for('profile'))
#--- リソース情報取得エンドポイント ---
@app.route('/profile')
def profile():
# セッション情報からアクセストークンを取得
access_token = session.get('access_token')
if not access_token:
# アクセストークンが無ければ、クライアント内のlogin()を呼び出す
return redirect(url_for('login'))
# アクセストークンを設定し、リソース情報取得エンドポイント(リソースサーバ)にアクセスする
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(RESOURCE_ENDPOINT, headers=headers)
if response.status_code != 200:
# エラーの場合、エラーを返却する
return f"Failed to fetch profile: {response.text}", response.status_code
# 正常の場合、リソース情報を返す
return f"Protected Resource Data: {response.json()}"
if __name__ == '__main__':
# ポート8000で起動
app.run(port=8000, debug=True)
■ まとめ
OAuth2.0は、ユーザーの認証情報(例:パスワード)を直接クライアントに渡さず、第三者のアプリに対して限定的なアクセス権を委譲する認可フレームワークです。利用者はまず認可サーバーでログインし、同意画面でどのリソースにアクセスさせるか(スコープ)を明示的に承認します。その結果、認可コードが発行され、クライアントはこのコードを用いてトークンエンドポイントにリクエストし、アクセストークンと交換します。こうして得たトークンを使用して、クライアントは保護されたリソースにアクセスできるため、ユーザーのプライバシーを守りながら安全なサービス連携を実現します。トークンには、有効期限があるため有効期限切れの前までに、リフレッシュトークンと交換する必要があります。
OAuth2.0は、認証プロトコルではなく、認可プロトコルです。クライアントは、トークンを使って認可サーバにリソースが有効かを問い合わせる必要があり、単独では有効かどうか判断できません。認証プロトコルには、OAuth2.0を拡張したOpenID(OID)があります。