Cognito user pool カスタム認証チャレンジを利用した Email MFA

Cognito user pool カスタム認証チャレンジを利用した Email MFA

Takahiro Iwasa
Takahiro Iwasa
8 min read
Cognito Custom Authentication Lambda Triggers

Cognito には MFA のサポートがありますが、 SMS または TOTP のオプションから選択することしかできません。 多くのウェブサイトでは、追加のメール認証が提供されています。 Cognito user poolのカスタム認証チャレンジを使用してこれを実装できます。

前提条件

以下の手順で、 GitHub からサンプルをローカルにクローンし、以下をインストールしてください。

ディレクトリ構成

./
|-- src/
|   |-- create_auth_challenge/
|   |   `-- app.py
|   |-- define_auth_challenge/
|   |   `-- app.py
|   |-- example/
|   |   |-- main.py
|   |   `-- requirements.txt
|   |-- layers/
|   |   `-- python/
|   |       `-- layers/
|   |          `-- cognito_custom_challenge_helper.py
|   `-- verify_auth_challenge/
|      `-- app.py
|-- tests/
|-- samconfig.toml
`-- template.yaml

Python スクリプト

Lambda Layer

カスタムチャレンジのリクエストとレスポンスを簡単に処理できるようにするために、以下のコードで src/layers/python/layers/cognito_custom_challenge_helper.py を作成してください。

import copy
from enum import Enum


class CustomChallengeName(Enum):
    SRP_A = 'SRP_A'
    PASSWORD_VERIFIER = 'PASSWORD_VERIFIER'
    CUSTOM_CHALLENGE = 'CUSTOM_CHALLENGE'


class Session:
    def __init__(self, session: dict):
        _session = copy.deepcopy(session)
        self.challenge_name = _session['challengeName']
        self.challenge_result = _session['challengeResult']
        self.challenge_metadata = _session.get('challengeMetadata', '')

    def is_srp_a(self) -> bool:
        return self.challenge_name == CustomChallengeName.SRP_A.value \
            and self.challenge_result is True

    def is_password_verifier(self) -> bool:
        return self.challenge_name == CustomChallengeName.PASSWORD_VERIFIER.value \
            and self.challenge_result is True

    def is_custom_challenge(self) -> bool:
        return self.challenge_name == CustomChallengeName.CUSTOM_CHALLENGE.value

    def can_issue_tokens(self) -> bool:
        return self.is_custom_challenge() and self.challenge_result is True


class CustomChallengeRequest:
    def __init__(self, event: dict):
        _request = copy.deepcopy(event['request'])
        _session = _request.get('session', [])
        self.last_session = Session(_session[-1]) if _session else None
        self.user_attributes = _request['userAttributes']
        self.challenge_answer = _request.get('challengeAnswer', '')
        self.private_challenge_parameters = _request.get('privateChallengeParameters', {})

    def verify_answer(self) -> bool:
        return self.private_challenge_parameters.get('answer') == self.challenge_answer


class CustomChallengeResponse:
    def __init__(self, event: dict):
        _response = copy.deepcopy(event['response'])
        self._response = _response

    def set_answer(self, answer: str) -> None:
        self._response['privateChallengeParameters'] = {}
        self._response['privateChallengeParameters']['answer'] = answer

    def set_metadata(self, data: str) -> None:
        self._response['challengeMetadata'] = data

    def set_next_challenge(self, name: CustomChallengeName) -> None:
        self._response['challengeName'] = name.value
        self._response['issueTokens'] = False
        self._response['failAuthentication'] = False

    def set_answer_correct(self, correct: bool) -> None:
        self._response['answerCorrect'] = correct

    def issue_tokens(self) -> None:
        self._response['challengeName'] = ''
        self._response['issueTokens'] = True
        self._response['failAuthentication'] = False

    def fail(self) -> None:
        self._response['issueTokens'] = False
        self._response['failAuthentication'] = True

    def __dict__(self) -> dict:
        return self._response

Define Auth Challenge

カスタム認証フローを開始すると、 Cognito は src/define_auth_challenge/app.py に配置された “Define Auth challenge Lambda trigger” を呼びます。

from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse, CustomChallengeName


def lambda_handler(event: dict, context: dict) -> dict:
    # Parse the event to create a request and response object.
    request = CustomChallengeRequest(event)
    response = CustomChallengeResponse(event)
    last_session = request.last_session

    if last_session.is_srp_a():
        # When the last session is SRP_A, require the client to authenticate with a password.
        response.set_next_challenge(CustomChallengeName.PASSWORD_VERIFIER)

    elif last_session.is_password_verifier():
        # When the last session is PASSWORD_VERIFIER, initiate the custom challenge.
        response.set_next_challenge(CustomChallengeName.CUSTOM_CHALLENGE)

    elif last_session.is_custom_challenge():
        if last_session.can_issue_tokens():
            # When the last session is CUSTOM_CHALLENGE and authentication has been completed, issue tokens.
            response.issue_tokens()
        else:
            # When the last session is CUSTOM_CHALLENGE and the client is still during authentication flow,
            # require the client to answer the next challenge.
            response.set_next_challenge(CustomChallengeName.CUSTOM_CHALLENGE)
    else:
        # If the client is in an unexpected flow, the current authentication must fail.
        response.fail()

    event['response'] = response.__dict__()
    return event

Create Auth Challenge

認証チャレンジフローでユーザーに送信される OTP コードを作成する際、 Cognito は src/create_auth_challenge/app.py に配置された “Create Auth challenge Lambda trigger” を呼び出します。 後述の AWS SAM テンプレートで指定された環境変数 CODE_LENGTH および EMAIL_SENDER を使用します。

import os
import random

import boto3

from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse

client = boto3.client('ses')

CODE_LENGTH = int(os.environ.get('CODE_LENGTH', 6))
EMAIL_SENDER = os.environ.get('EMAIL_SENDER')


def lambda_handler(event: dict, context: dict) -> dict:
    # Parse the event to create a request object.
    request = CustomChallengeRequest(event)
    last_session = request.last_session

    if last_session.is_custom_challenge():
        # When the last session is a custom challenge, extract the otp code from the last session metadata.
        code = last_session.challenge_metadata.replace('challenge-', '')
    else:
        # When the last session is not a custom challenge, generate an otp code and send it to the client.
        code = generate_code()
        message = create_message(code)
        send_mail_to(request.user_attributes['email'], message)

    # Create a response
    response = CustomChallengeResponse(event)
    response.set_answer(code)
    response.set_metadata(f'challenge-{code}')
    event['response'] = response.__dict__()
    return event


def generate_code(length=CODE_LENGTH) -> str:
    return str(random.randint(0, 10 ** length - 1)).zfill(length)


def create_message(code: str) -> str:
    return f'Your authentication code: {code}'


def send_mail_to(email: str, body: str) -> None:
    client.send_email(
        Source=EMAIL_SENDER,
        Destination={
            'ToAddresses': [email],
        },
        Message={
            'Subject': {
                'Charset': 'UTF-8',
                'Data': 'Authentication Code',
            },
            'Body': {
                'Text': {
                    'Charset': 'UTF-8',
                    'Data': body,
                },
            },
        },
    )

Cognito では、認証フローセッションの期間を設定できます。

Verify Auth Challenge

OTP コードを検証する際、 Cognito は src/verify_auth_challenge/app.py に配置された “Verify Auth challenge Lambda trigger” を呼び出します。

from layers.cognito_custom_challenge_helper import CustomChallengeRequest, CustomChallengeResponse


def lambda_handler(event: dict, context: dict) -> dict:
    # Parse the event to create a request and response object.
    request = CustomChallengeRequest(event)
    response = CustomChallengeResponse(event)

    # Create a response.
    correct = request.verify_answer()
    response.set_answer_correct(correct)
    event['response'] = response.__dict__()
    return event

AWS リソース作成

AWS SAM テンプレート

カスタム認証チャレンジを有効にするには、 ExplicitAuthFlows 内の ALLOW_CUSTOM_AUTH (39行目)を設定してください。 各 AWS::Serverless::Function 内の CognitoEvent が Cognito Lambda トリガーとして機能する Lambda にリンクされていることを確認してください。 Amazon SES を使用してメールを送信するためには、 CreateAuthChallenge 関数内の Policies の設定が必要です。

AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31
Description: Email MFA using Cognito User Pool custom authentication challenges

Globals:
  Function:
    Timeout: 3
    MemorySize: 128

Parameters:
  CodeLength:
    Type: Number
    Default: '6'
  EmailSender:
    Type: String

Resources:
  CognitoUserPool:
    Type: AWS::Cognito::UserPool
    Properties:
      UserPoolName: cognito-custom-auth-email-mfa
      Policies:
        PasswordPolicy:
          MinimumLength: 8
      UsernameAttributes:
        - email
      Schema:
        - Name: email
          AttributeDataType: String
          Required: true

  CognitoUserPoolClient:
    Type: AWS::Cognito::UserPoolClient
    Properties:
      UserPoolId: !Ref CognitoUserPool
      ClientName: client
      GenerateSecret: false
      ExplicitAuthFlows:
        - ALLOW_CUSTOM_AUTH
        - ALLOW_REFRESH_TOKEN_AUTH
        - ALLOW_USER_SRP_AUTH

  LambdaLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: cognito_email_mfa_layer
      ContentUri: src/layers
      CompatibleRuntimes:
        - python3.11
      RetentionPolicy: Delete

  CreateAuthChallenge:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/create_auth_challenge
      Handler: app.lambda_handler
      Runtime: python3.11
      Layers:
        - !Ref LambdaLayer
      Environment:
        Variables:
          CODE_LENGTH: !Ref CodeLength
          EMAIL_SENDER: !Ref EmailSender
      Events:
        CognitoEvent:
          Type: Cognito
          Properties:
            Trigger: CreateAuthChallenge
            UserPool: !Ref CognitoUserPool
      Policies:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action: ses:SendEmail
            Resource: "*"

  DefineAuthChallenge:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/define_auth_challenge
      Handler: app.lambda_handler
      Runtime: python3.11
      Layers:
        - !Ref LambdaLayer
      Events:
        CognitoEvent:
          Type: Cognito
          Properties:
            Trigger: DefineAuthChallenge
            UserPool: !Ref CognitoUserPool

  VerifyAuthChallenge:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/verify_auth_challenge
      Handler: app.lambda_handler
      Runtime: python3.11
      Layers:
        - !Ref LambdaLayer
      Events:
        CognitoEvent:
          Type: Cognito
          Properties:
            Trigger: VerifyAuthChallengeResponse
            UserPool: !Ref CognitoUserPool

ビルドおよびデプロイ

<YOUR_SES_EMAIL_SENDER> を送信元のメールアドレスに置き換え、次のコマンドでビルドおよびデプロイしてください。 OTP コードの長さを変更する場合は、 CodeLength パラメータも指定してください。

sam build
sam deploy --parameter-overrides EmailSender=<YOUR_SES_EMAIL_SENDER>
# sam deploy --parameter-overrides EmailSender=<YOUR_SES_EMAIL_SENDER> CodeLength=10

テスト

<YOUR_USER_POOL_ID> および <YOUR_EMAIL> を実際の値で置き換え、以下のコマンドで Cognito にテストユーザーを作成してください。

POOL_ID=<YOUR_USER_POOL_ID>
EMAIL=<YOUR_EMAIL>

# Add a Cognito user.
aws cognito-idp admin-create-user \
  --user-pool-id $POOL_ID \
  --username $EMAIL

# Make the user confirmation status "Confirmed"
echo -n 'Password: '
read password
aws cognito-idp admin-set-user-password \
  --user-pool-id $POOL_ID \
  --username $EMAIL \
  --password $password \
  --permanent

以下のコマンドを実行してください。

cd src/example
pip install -r requirements.txt
python main.py \
  --pool-id <YOUR_USER_POOL_ID> \
  --client-id <YOUR_CLIENT_ID> \
  --username <YOUR_EMAIL> \
  --password <YOUR_PASSWORD>

main.py では、 SRP - Secure Remote Password で必要な値を計算するために、以下のライブラリを使用しています。

AWS 公式のコード例もご参考ください。

クリーンアップ

以下のコマンドを使用して、プロビジョニングされた AWS リソースを削除してください。

sam delete

まとめ

Cognito カスタム認証チャレンジを利用することで、メールによる MFA を実現できます。 Cognito が正式対応することを期待したいと思います。

この投稿が、お役に立てば幸いです。

Takahiro Iwasa

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.