セキュリティ

はじめてのOpenID入門(OAuth2の拡張)

投稿日:

以下の記事で解説したように、OAuth2.0は「認可(Authorization)」のフレームワークであり、クライアントがユーザーのパスワードを知らずに、代わりに発行されたトークンを使って保護されたリソースにアクセスする仕組みです。

一方、OpenID Connect (OIDC) は OAuth2.0 を拡張して「認証(Authentication)」を実現します。
OIDCでは、トークンの一種である IDトークン(ID Token) を発行し、ユーザーの本人確認情報(発行者、ユーザー識別子、ログイン時間、nonceなど)を JWT 形式で返します。

■ サンプルコードシーケンス

サンプルコードシーケンスを以下に示します。(リソースサーバと認可サーバは、同一のサーバとします)クライアントのポートは8000、認可(/リソース)サーバのポートは9000を使います。

クライアントには、認可(/リソース)サーバから事前に連携されたクライアントIDとクライアントシークレットを保持します。これは事前にクライアントを認可(/リソース)サーバへ登録する手順を実施することで、クライアントIDとクライアントシークレットを入手します。

また、認可(/リソース)サーバでは、ユーザID、パスワードなどユーザ毎のリソース管理とクライアント管理を行います。OIDCでの主な変更点は、⑬~⑲のフローとなります。

①ユーザは、サービス利用のためブラウザからアプリケーションアクセスする
②ユーザはブラウザに表示されたログインボタンをクリックする
③クライアントは、クライアントIDを設定し、認可(/リソース)サーバへリダイレクトする
④認可(/リソース)サーバは、ログイン画面をユーザに返す。その際、クライアントIDを設定する
⑤ユーザは、ブラウザに表示されたログイン画面からユーザID、パスワードを入力する
⑥クライアントID、ユーザID、パスワードが認可(/リソース)サーバへ送信される

⑦認可(/リソース)サーバにて、クライアントID、ユーザID、パスワードにより認証処理が実施される
⑧認可(/リソース)サーバは、クライアントを許可してよいかユーザに問い合わせるため、クライアントIDを含む認可確認画面をユーザに返す
⑨ユーザは、ブラウザに表示された認可確認画面にて認可(Yes)を入力する
➉認可(/リソース)サーバは、認可コード生成する
⑪認可(/リソース)サーバは、生成した認可コードをクライアントに通知する(認証・認可状態)

⑫クライアントは、認可(/リソース)サーバから認可コード、クライアントID、クライアントシークレットを使ってアクセストークンを取得する
⑬認可(/リソース)サーバはアクセストークンとIDトークン(JWT)を生成し、クライアントへ返す
⑭⑮認可(/リソース)サーバからJWT復号用公開鍵取得する
⑯公開鍵によりIDトークン(JWT)をデコードし、検証した後、IDトークンをセッションに設定する
⑰⑱アクセストークンを指定し、認可(/リソース)サーバからリソース情報取得する
⑲結果画面を返信する

■ OIDC対応で追加される要素

解説したOAuth2.0の構成に対して、OIDCへ対応するため、幾つか追加・修正すべき機能があります。

・サーバ側(oidc_server.py)

1)Token Validatorを追加

IDトークンの有効性を確認するインスタンス(トークンバリデータ)を追加し、登録します。
Token Validator(トークンバリデータ)は、アクセストークンの有効性を検証する仕組みです。AuthlibではBearerTokenValidatorを継承してMyBearerTokenValidatorを実装し、保存済みトークンを照合して有効なトークンオブジェクトを返します。
これをResourceProtectorに登録することで、@require_oauth()デコレータを使い、リソースサーバの保護リソース(エンドポイント 例:/userinfo)へのアクセス時に自動的にトークン検証を行います。これにより、リソースサーバはトークンの有効期限・失効状態を統一的に確認し、正当なアクセストークンを持つクライアントのみがリソースにアクセスできる安全な構成を実現します。

# — Token Validator —
# カスタムのBearerTokenValidatorを実装
class MyBearerTokenValidator(BearerTokenValidator):

# ResourceProtectorの初期化
require_oauth = ResourceProtector()

# カスタムトークンバリデータを登録require_oauth.register_token_validator(MyBearerTokenValidator(tokens))


Token Validator(トークンバリデータ)では、IDトークン (ID Token)として、ユーザーの認証結果をJWT形式で表すトークン(署名付きで改ざん検出可能)を定義します。

# トークンオブジェクトを返す
class TokenObj:
            def __init__(self, d):
                self._d = d
                self.user = d[‘user’]
                self.client_id = d[‘client_id’]
                self.scope = d.get(‘scope’, ”)
                self.issued_at = int(d[‘created_at’].timestamp())
                self.expires_in = int(d.get(‘expires_in’, 3600))


2)独自のOpenIDCodeインスタンスの定義と登録

OpenIDCodeクラスを継承し、nonceのチェックとIDトークン生成を実装します。このクラスのインスタンスを、AnthorizationCodeGrantクラスのインスタンスを認可サーバのインスタンスに登録する際に、指定します。

OpenIDCodeはOAuth2.0に認証機能を拡張するクラスで、IDトークンの生成を担当します。これを継承したMyOpenIDCodeでは、nonce(リプレイ攻撃防止用値)の検証とIDトークン生成を実装します。認可サーバでは、AuthorizationCodeGrantWithOIDCと共にMyOpenIDCodeを登録することで、認可コードフローにOpenID Connectを統合します。これにより、アクセストークン発行時にIDトークンが自動生成され、クライアントは署名付きのIDトークンから発行者・ユーザー・有効期限などを検証し、安全にユーザー本人を識別できるようになります。

# — OIDC grant extension —
# OpenIDCodeを拡張してnonceのチェックとIDトークン生成を実装
class MyOpenIDCode(OpenIDCode):

# 認可コードグラントとOIDC拡張を登録
authorization.register_grant(
    AuthorizationCodeGrantWithOIDC,
    [MyOpenIDCode(require_nonce=True)]
)

3)UserInfo エンドポイント
アクセストークンを使ってユーザー情報(メールアドレス、名前など)を取得するAPI。

4)Discovery エンドポイント (/.well-known/openid-configuration)
OIDCプロバイダ(認可サーバ)の設定情報をクライアントに自動配布するためのエンドポイントです。

5)JWKS エンドポイント (/jwks)
IDトークン署名の検証に使う公開鍵(JSON Web Key Set)を返却するエンドポイントです。

・クライアント側(oidc_client.py)

クライアントは、OAuth2.0の流れに加えて以下を実施します:

1)Discoveryでエンドポイントを自動設定

サーバのDiscoveryでエンドポイントを呼び出し、事前に必要な設定情報を取得します。

discovery = requests.get(f'{SERVER_URL}/.well-known/openid-configuration’).json()
AUTHORIZATION_ENDPOINT = discovery[‘authorization_endpoint’]
TOKEN_ENDPOINT = discovery[‘token_endpoint’]
USERINFO_ENDPOINT = discovery[‘userinfo_endpoint’]
JWKS_URI = discovery[‘jwks_uri’]

2)認可リクエスト時に nonce を付加

認可リクエストをサーバにリダイレクトする際、パラメータに任意の値(nonce)を設定します。

params = {
‘response_type’: ‘code’,
‘client_id’: CLIENT_ID,
‘redirect_uri’: REDIRECT_URI,
‘scope’: ‘openid profile email’,
‘state’: state,
‘nonce’: nonce
}

3)IDトークン検証

IDトークンの認証を実施ます。

claims = jwt.decode(
id_token,
JsonWebKey.import_key(public_jwk)
)
claims.validate() # 有効期限(exp)、発行者(iss)、aud(クライアントID)などを検証

4)UserInfo取得

userinfo_resp = requests.get(
USERINFO_ENDPOINT,
headers={‘Authorization’: f”Bearer {session[‘access_token’]}”}
)
session[‘user_info’] = userinfo_resp.json()

5)セッション構成

セッションの構成を以下のように設定します。

– クライアントのCookie名を 'oidc_session'
– サーバのCookie名を 'session' に変更して衝突回避
– ログアウト時はサーバにリダイレクトしてセッションを削除

■ サンプルコード

上記の変更点を含めたサーバとクライアントのAuthlibを使ったソースコードは以下となります。

・認可(/リソース)サーバのPython/Flaskのコード

from flask import Flask, request, redirect, session, jsonify, render_template_string
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.rfc6749 import grants
from authlib.jose import JsonWebKey
import datetime
from authlib.oidc.core.grants import OpenIDCode
from authlib.oidc.core import UserInfo
from urllib.parse import urlencode
from werkzeug.datastructures import ImmutableMultiDict

# --- 簡単なログインフォーム用HTML ---
LOGIN_HTML = '''
<!DOCTYPE html>
<html>
<head><title>Login</title></head>
<body>
    <h2>Login Required</h2>
    <form method="post" action="/authorize">
        <!-- Hidden fields to preserve authorization request -->
        {% for key, value in request.args.items() %}
        <input type="hidden" name="{{ key }}" value="{{ value }}">
        {% endfor %}
        
        <div>
            <label>Username:</label>
            <input type="text" name="username" value="testuser" required>
        </div>
        <div>
            <label>Password:</label>
            <input type="password" name="password" value="testpass" required>
        </div>
        <button type="submit" name="action" value="login">Login</button>
        <button type="submit" name="action" value="deny">Deny</button>
    </form>
</body>
</html>
'''

# --- Stub client lookup matching client_1 ---
def query_client(client_id):
    class Client:
        def __init__(self):
            self.client_id = 'client_1'
            self.client_secret = 'client_secret_1'
            self.redirect_uris = ['http://localhost:8000/callback']
            self.grant_types = ['authorization_code']
            self.response_types = ['code']
            self.scope = 'openid profile email'
            
        def check_client_secret(self, secret):
            return secret == self.client_secret
            
        def check_redirect_uri(self, uri):
            return uri in self.redirect_uris
            
        def check_response_type(self, response_type):
            return response_type in self.response_types
            
        def check_grant_type(self, grant_type):
            return grant_type in self.grant_types
            
        def get_default_redirect_uri(self):
            return self.redirect_uris[0]
            
    return Client() if client_id == 'client_1' else None

def save_token(token_data, request):
    print('Saving token:', token_data)

# --- ユーザー認証 ---
def authenticate_user(username, password):
    if username == 'testuser' and password == 'testpass':
        return {'user_id': '123', 'email': 'testuser@example.com', 'name': 'Test User'}
    return None

# --- Generate keys ---
private_jwk = JsonWebKey.generate_key(kty='RSA', crv_or_size=2048, is_private=True)
public_jwk = private_jwk.as_dict(is_private=False)

# --- In-memory auth code store ---
auth_codes = {}
used_nonces = set()  # nonce追跡用

# --- OIDC grant extension ---
class MyOpenIDCode(OpenIDCode):
    def exists_nonce(self, nonce, request):
        # nonceが既に使用されているかチェック
        if nonce in used_nonces:
            return True
        used_nonces.add(nonce)
        return False

    def get_jwt_config(self, grant):
        return {
            'key': private_jwk,
            'alg': 'RS256',
            'iss': 'http://localhost:9000',
            'exp': 3600,
        }

    def generate_user_info(self, user, scope):
        userinfo = UserInfo(sub=str(user.get('user_id')))
        if 'email' in scope:
            userinfo['email'] = user.get('email')
        if 'profile' in scope:
            userinfo['name'] = user.get('name')
        return userinfo

class AuthorizationCodeGrantWithOIDC(grants.AuthorizationCodeGrant):
    def save_authorization_code(self, code, request):
        # nonceを取得(複数のソースから)
        nonce = None
        if hasattr(request, 'data') and request.data:
            nonce = request.data.get('nonce')
        if not nonce and hasattr(request, 'form'):
            nonce = request.form.get('nonce')
        if not nonce and hasattr(request, 'args'):
            nonce = request.args.get('nonce')
        
        print(f"Saving auth code with nonce: {nonce}")
        
        auth_codes[code] = {
            'client_id': request.client.client_id,
            'redirect_uri': request.redirect_uri,
            'scope': request.scope,
            'user': request.user,
            'nonce': nonce,
            'created_at': datetime.datetime.utcnow()
        }

    def query_authorization_code(self, code, client):
        data = auth_codes.get(code)
        if data and data['client_id'] == client.client_id:
            class AuthCode:
                def __init__(self, code, data):
                    self.code = code
                    self.client_id = data['client_id']
                    self.redirect_uri = data['redirect_uri']
                    self.scope = data['scope']
                    self.user = data['user']
                    self.nonce = data['nonce']
                    self.auth_time = data['created_at']
                    
                def is_expired(self):
                    return (datetime.datetime.utcnow() - self.auth_time).total_seconds() > 600
                    
            return AuthCode(code, data)
        return None

    def delete_authorization_code(self, authorization_code):
        auth_codes.pop(authorization_code.code, None)

    def authenticate_user(self, authorization_code):
        return authorization_code.user

# --- Flask app ---
app = Flask(__name__)
app.secret_key = 'server_secret_key'

authorization = AuthorizationServer(app, query_client=query_client, save_token=save_token)
authorization.register_grant(
    AuthorizationCodeGrantWithOIDC,
    [MyOpenIDCode(require_nonce=True)]
)

@app.route('/.well-known/openid-configuration')
def openid_configuration():
    issuer = 'http://localhost:9000'
    return jsonify({
        'issuer': issuer,
        'authorization_endpoint': issuer + '/authorize',
        'token_endpoint': issuer + '/token',
        'userinfo_endpoint': issuer + '/userinfo',
        'jwks_uri': issuer + '/jwks',
        'response_types_supported': ['code'],
        'subject_types_supported': ['public'],
        'id_token_signing_alg_values_supported': ['RS256'],
        'scopes_supported': ['openid', 'profile', 'email'],
    })

@app.route('/jwks')
def jwks():
    return jsonify({'keys': [public_jwk]})

@app.route('/authorize', methods=['GET', 'POST'])
def authorize():
    print(f"\n=== /authorize called: {request.method} ===")
    
    if request.method == 'GET':
        print(f"GET params: {dict(request.args)}")
        # 認証が必要な場合、ログインフォームを表示
        return render_template_string(LOGIN_HTML, request=request)
    
    if request.method == 'POST':
        print(f"POST form data: {dict(request.form)}")
        action = request.form.get('action')
        print(f"Action: {action}")
        
        if action == 'deny':
            # ユーザーがアクセスを拒否した場合
            print("User denied access")
            redirect_uri = request.form.get('redirect_uri')
            state = request.form.get('state')
            params = {'error': 'access_denied'}
            if state:
                params['state'] = state
            return redirect(f"{redirect_uri}?{urlencode(params)}")
        
        if action == 'login':
            # ユーザー認証
            username = request.form.get('username')
            password = request.form.get('password')
            print(f"Attempting login: username={username}")
            user = authenticate_user(username, password)
            
            if not user:
                print("Authentication failed")
                return "Invalid credentials", 401
            
            print(f"User authenticated: {user}")
            
            # セッションにユーザー情報を保存
            session['user'] = user
            
            # 重要: 元のリクエストパラメータをrequest.argsとして復元
            # フォームからGETパラメータを復元
            oauth_params = {}
            for key in ['response_type', 'client_id', 'redirect_uri', 'scope', 'state', 'nonce']:
                value = request.form.get(key)
                if value:
                    oauth_params[key] = value
            
            print(f"OAuth params from form: {oauth_params}")
            
            # ユーザー情報をrequestに設定
            request.user = user
            
            # 重要: grantを承認するために'confirm'パラメータを追加
            oauth_params['confirm'] = 'true'
            request.args = ImmutableMultiDict(oauth_params)
            
            print(f"Restored OAuth params to request.args: {dict(request.args)}")
            
            # 認可レスポンスを作成
            print("Creating authorization response...")
            try:
                response = authorization.create_authorization_response(grant_user=user)
                print(f"Authorization response created: {response}")
                return response
            except Exception as e:
                print(f"Error creating authorization response: {e}")
                import traceback
                traceback.print_exc()
                raise
    
    # 既に認証済みの場合
    if 'user' in session:
        request.user = session['user']
        return authorization.create_authorization_response()
    
    return "Bad Request", 400

@app.route('/token', methods=['POST'])
def issue_token():
    return authorization.create_token_response()

@app.route('/userinfo', methods=['GET', 'POST'])
def userinfo():
    return authorization.create_userinfo_response()

@app.route('/logout')
def logout():
    session.clear()
    used_nonces.clear()  # nonceもクリア
    return "Logged out successfully"

if __name__ == '__main__':
    print("OIDC Server starting on http://localhost:9000")
    print("Test credentials: username=testuser, password=testpass")
    app.run(port=9000, debug=True)


・クライアントのPython/Flaskのコード

from flask import Flask, redirect, request, session, url_for
import requests
from urllib.parse import urlencode
from requests.auth import HTTPBasicAuth
from authlib.jose import jwt, JsonWebKey
from authlib.common.security import generate_token
import json
import time

app = Flask(__name__)
app.secret_key = 'client_secret_key'

# 設定
SERVER_URL = 'http://localhost:9000'
CLIENT_ID = 'client_1'
CLIENT_SECRET = 'client_secret_1'
REDIRECT_URI = 'http://localhost:8000/callback'
SCOPE = 'openid profile email'

# Discovery
try:
    discovery = requests.get(f'{SERVER_URL}/.well-known/openid-configuration').json()
    AUTHORIZATION_ENDPOINT = discovery['authorization_endpoint']
    TOKEN_ENDPOINT = discovery['token_endpoint']
    USERINFO_ENDPOINT = discovery['userinfo_endpoint']
    JWKS_URI = discovery['jwks_uri']
    print("OIDC Discovery successful")
except Exception as e:
    print(f"Warning: Discovery failed: {e}")
    AUTHORIZATION_ENDPOINT = f'{SERVER_URL}/authorize'
    TOKEN_ENDPOINT = f'{SERVER_URL}/token'
    USERINFO_ENDPOINT = f'{SERVER_URL}/userinfo'
    JWKS_URI = f'{SERVER_URL}/jwks'

@app.route('/')
def index():
    user_info = session.get('user_info')
    if user_info:
        return f'''
        <h2>Welcome, {user_info.get('name', 'User')}!</h2>
        <p>Email: {user_info.get('email', 'Not provided')}</p>
        <p><a href="/claims">View ID Token Claims</a></p>
        <p><a href="/logout">Logout</a></p>
        '''
    return '''
    <h2>OIDC Client Demo</h2>
    <p><a href="/login">Login with OIDC</a></p>
    '''

def cleanup_old_states():
    """10分以上前のstateを削除"""
    if 'oauth_states' in session:
        current_time = time.time()
        oauth_states = session['oauth_states']
        
        expired_states = [
            state for state, data in oauth_states.items()
            if current_time - data.get('timestamp', 0) > 600
        ]
        
        for state in expired_states:
            del oauth_states[state]
        
        if expired_states:
            session['oauth_states'] = oauth_states
            session.modified = True
            print(f"Cleaned up {len(expired_states)} expired states")

@app.route('/login')
def login():
    try:
        cleanup_old_states()
        
        state = generate_token(48)
        nonce = generate_token(48)
        
        # 複数のstateを管理
        if 'oauth_states' not in session:
            session['oauth_states'] = {}
        
        session['oauth_states'][state] = {
            'nonce': nonce,
            'timestamp': time.time()
        }
        session.modified = True
        
        print(f"Created new state: {state[:20]}... with nonce: {nonce[:20]}...")
        print(f"Total states in session: {len(session['oauth_states'])}")
        
        params = {
            'response_type': 'code',
            'client_id': CLIENT_ID,
            'redirect_uri': REDIRECT_URI,
            'scope': SCOPE,
            'state': state,
            'nonce': nonce
        }
        
        auth_url = AUTHORIZATION_ENDPOINT + '?' + urlencode(params)
        print(f"Redirecting to: {auth_url}")
        return redirect(auth_url)
        
    except Exception as e:
        print(f"Login error: {e}")
        import traceback
        traceback.print_exc()
        return f"Login error: {str(e)}", 500

@app.route('/callback')
def callback():
    try:
        print("\n=== Callback received ===")
        print(f"Query params: {dict(request.args)}")
        
        # エラーチェック
        error = request.args.get('error')
        if error:
            error_description = request.args.get('error_description', '')
            print(f"Authorization error: {error} - {error_description}")
            return f"Authorization error: {error}<br>Description: {error_description}", 400
        
        # パラメータ取得
        code = request.args.get('code')
        state = request.args.get('state')
        
        if not code:
            print("No authorization code received")
            return "Authorization code not received", 400
        
        if not state:
            print("No state parameter received")
            return "State parameter not received", 400
        
        print(f"Received code: {code[:20]}... and state: {state[:20]}...")
        
        # Stateチェック - セッションの内容を詳しくログ出力
        oauth_states = session.get('oauth_states', {})
        print(f"Session oauth_states type: {type(oauth_states)}")
        print(f"Session oauth_states keys: {list(oauth_states.keys())}")
        print(f"Number of states in session: {len(oauth_states)}")
        
        if state not in oauth_states:
            print(f"State validation FAILED!")
            print(f"Received state: {state}")
            print(f"Available states: {list(oauth_states.keys())}")
            
            # デバッグ: セッション全体の内容を表示
            print(f"Full session content: {dict(session)}")
            
            return "Invalid state parameter - possible CSRF attack", 400
        
        # stateデータを取得
        state_data = oauth_states[state]
        nonce = state_data['nonce']
        timestamp = state_data['timestamp']
        
        print(f"State validated successfully!")
        print(f"Nonce: {nonce[:20]}..., Age: {time.time() - timestamp:.1f}s")
        
        # タイムスタンプチェック(10分以内)
        if time.time() - timestamp > 600:
            del oauth_states[state]
            session['oauth_states'] = oauth_states
            session.modified = True
            return "State expired (older than 10 minutes)", 400
        
        # 使用済みstateを削除(一度だけ使用可能にする)
        del oauth_states[state]
        session['oauth_states'] = oauth_states
        session.modified = True
        print(f"State consumed and removed from session")
        
        # トークン交換
        token_data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': REDIRECT_URI
        }
        
        print(f"Requesting token from: {TOKEN_ENDPOINT}")
        token_resp = requests.post(
            TOKEN_ENDPOINT,
            data=token_data,
            auth=HTTPBasicAuth(CLIENT_ID, CLIENT_SECRET),
            headers={'Accept': 'application/json'}
        )
        
        print(f"Token response status: {token_resp.status_code}")
        
        if token_resp.status_code != 200:
            print(f"Token error response: {token_resp.text}")
            return f"Token error: {token_resp.text}", token_resp.status_code
        
        tokens = token_resp.json()
        print(f"Received tokens: {list(tokens.keys())}")
        
        # ID Token検証
        id_token = tokens.get('id_token')
        if not id_token:
            return "ID token not received", 400
        
        # JWKS取得
        jwks_resp = requests.get(JWKS_URI)
        if jwks_resp.status_code != 200:
            return f"JWKS fetch error: {jwks_resp.text}", 500
            
        jwks = jwks_resp.json()
        if not jwks.get('keys'):
            return "No keys in JWKS", 500
            
        public_jwk = jwks['keys'][0]
        
        # ID Token検証とデコード
        try:
            claims = jwt.decode(
                id_token, 
                JsonWebKey.import_key(public_jwk)
            )
            
            # Claims検証
            claims.validate_exp(leeway=60)
            claims.validate_iss(SERVER_URL)
            claims.validate_aud(CLIENT_ID)
            
            # Nonce検証
            token_nonce = claims.get('nonce')
            print(f"Token nonce: {token_nonce[:20] if token_nonce else 'None'}...")
            print(f"Expected nonce: {nonce[:20]}...")
            
            if token_nonce != nonce:
                print(f"Nonce mismatch! Token: {token_nonce}, Expected: {nonce}")
                return "Invalid nonce in ID token", 400
            
            print(f"Nonce validated successfully")
            print(f"ID Token claims: {dict(claims)}")
            
            # セッションに保存
            session['id_token'] = id_token
            session['id_token_claims'] = dict(claims)
            session['access_token'] = tokens.get('access_token')
            
            # ユーザー情報取得(オプション)
            if session.get('access_token'):
                try:
                    userinfo_resp = requests.get(
                        USERINFO_ENDPOINT,
                        headers={'Authorization': f"Bearer {session['access_token']}"}
                    )
                    if userinfo_resp.status_code == 200:
                        session['user_info'] = userinfo_resp.json()
                        print(f"User info: {session['user_info']}")
                except Exception as e:
                    print(f"Userinfo fetch failed: {e}")
            
            print("Login successful, redirecting to home")
            return redirect(url_for('index'))
            
        except Exception as e:
            print(f"JWT validation error: {e}")
            import traceback
            traceback.print_exc()
            return f"ID token validation failed: {str(e)}", 400
            
    except Exception as e:
        print(f"Callback error: {e}")
        import traceback
        traceback.print_exc()
        return f"Callback error: {str(e)}", 500

@app.route('/claims')
def claims():
    claims_data = session.get('id_token_claims')
    if not claims_data:
        return redirect(url_for('login'))
    
    formatted_claims = json.dumps(claims_data, indent=2, default=str)
    
    return f'''
    <h2>ID Token Claims</h2>
    <pre>{formatted_claims}</pre>
    <p><a href="/">Back to Home</a></p>
    <p><a href="/logout">Logout</a></p>
    '''

@app.route('/logout')
def logout():
    session.clear()
    return redirect(url_for('index'))

@app.errorhandler(Exception)
def handle_error(e):
    print(f"Application error: {e}")
    import traceback
    traceback.print_exc()
    return f"Application error: {str(e)}", 500

if __name__ == '__main__':
    print("OIDC Client starting on http://localhost:8000")
    print("Make sure the OIDC Server is running on http://localhost:9000")
    app.run(port=8000, debug=True)

■ まとめ

OpenID Connect(OIDC)は、認可に特化したOAuth2.0に認証機能を追加し、IDトークン(JWT)によってユーザ本人確認を可能にする仕組みです。OIDCでは、Discovery、JWKS、UserInfo、IDトークン生成などが標準化され、安全な認証フローを実現します。

クライアントはnonceを付与して認可リクエストを送り、返却されたIDトークンの署名・期限・発行者を検証することで、改ざん防止と本人性の保証を行います。また、アクセストークンを用いてUserInfo APIからユーザ属性を取得し、ログイン後のユーザ情報を管理できます。結果として、OAuth2.0はOIDCにより「安全なログインプロトコル」として拡張されます。

-セキュリティ

執筆者:


comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

関連記事

はじめての「Webセキュリティ」入門

情報セキュリティの中で、最も注視される領域がインターネットを使うWebシステムのセキュリティです。拡大しているECサイトやWebサービスでは、誰でも使える「インターネット」を介してアクセスされます。顧 …

セキュリティプログラミング

セキュリティプログラミングとは、ソフトウェア開発において、脆弱性を作り込まないように安全な実装を行うことを意味します。目的は、外部からの攻撃や内部不正による情報漏えい、改ざん、サービス停止を防ぐことで …

「セキュリティマネジメントマニュアル」公開

セキュリティ対策といっても、イメージできない方々も多いでしょう。 セキュリティ対策を実施するためのセキュリティマネジメントを理解するためのマニュアルを作成しました。 基本方針(トップマネジメント)と情 …

シングルサインオン(SSO)入門(SAML編)

シングルサインオン(SSO)は、ユーザーが一度の認証(ログイン)で、複数のシステムやサービスにアクセスできる仕組みを指します。ユーザを認証するログイン処理を一か所に統括し、アプリケーションやサービスと …

リリースと外部認証

本番環境への移行前には、単にコードをデプロイするだけでなく、リスクを最小化しつつ安定稼働を確保するための準備が必要です。 ■ 最終テスト システムテストが完了した後、ユーザによる受け入れテスト(UAT …

Chinese (Simplified)Chinese (Traditional)EnglishFilipinoFrenchGermanHindiJapaneseKoreanMalayThaiVietnamese