13 분 소요

내용 출처 : Aiffel

  • 연혁

    연혁 내용
    1950년 앨런 튜링의 이미테이션 게임(개념 등장)
    1966년 MIT, ELIZA 개발(단순 챗봇 발명)
    2011년 애플, Siri 출시(사용성 높은 챗봇)
    2017년 ALBERT, BERT, ULMFiT, Transformer-XL
    (뛰어난 성능의 챗봇 등장)


  • ✨ 챗봇의 5가지 대표 유형(그림출처 : Tony Around)
    • 대화형 챗봇 : 머신러닝/딥러닝 기반 자연어 처리
    • 트리형(버튼) 챗봇 : 정해진 트리에 따라 답변
    • 추천형 챗봇 : 사전 정의된 답변를 알고리즘을 통해 우선 순위별로 리스트
    • 시나리오형 챗봇 : 원하는 서비스 제공을 위해 정해진 시나리오를 수행
    • 결합형 챗봇 : 비즈니스 목적에 따라 여러 챗봇 유형들을 겹합

1. 인코더디코더

1) 인코더 구조 : 두 가지 아키텍쳐

img6

  • 학습 시 Paired data set(입력 데이터 / 출력 데이터)가 필요

2. 트랜스포머의 구조

1) 포지셔널 인코딩

  • 이미지 출처 : http://jalammar.github.io/illustrated-transformer img7
  • 트랜스포머의 인코더 : 누적해 쌓아 올린 층을 통해 정보 추출
  • 트랜스포머의 디코더 : 누적해 쌓아 올린 디코더의 층을 통해 출력
    • 이때, 항상 마지막 인코더 레이어의 출력이 모든 디코더 레이어에 입력됨


img8


  • 기존 자연어 처리와의 차이점 : 포지셔널 인코딩(positional Encoding) img9
    • 왜 필요한가?
      • 트랜스포머 모델은 순차적으로 입력 받는 RNN과 달리 한꺼번에 입력 받기 때문에 단어의 위치 정보, 즉 포지셔널 인코딩이 필요함.
    • 실제 논문에 제시된 포지셔널 인코딩

      img10

      • 인코더, 디코더 모두 임베딩 직후에 추가되는 모습

      • 수식

      img11

      $$PE_{(pos, 2i)} = sin(\frac{pos}{10000^{\frac{2i}{d_{model}}}})$$
      $$PE_{(pos, 2i+1)} = cos(\frac{pos}{10000^{\frac{2i}{d_{model}}}})$$

      $pos$ : 입력 문장에서의 임베딩 벡터의 위치(행 위치)
      $i$ : 임베딩 벡터 내 차원의 인덱스(열 위치)
      $d_{model}$ : 임베딩 벡터의 차

    • 코드 구현
        import tensorflow as tf
        import tensorflow_datasets as tfds
        import os
        import re
        import numpy as np
        import matplotlib.pyplot as plt
      
        ## 클래스 구현
        # 포지셔널 인코딩 레이어
        class PositionalEncoding(tf.keras.layers.Layer):
      		
            def __init__(self, position, d_model):
                super(PositionalEncoding, self).__init__()
                self.pos_encoding = self.positional_encoding(position, d_model)
      		
            def get_angles(self, position, i, d_model):
                angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32)) # (d_model값)
                return position * angles # (position값, d_model값) # 브로드캐스트
      		
            def positional_encoding(self, position, d_model):
                # 각도 배열 생성
                angle_rads = self.get_angles(
                    position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
                    i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
                    d_model=d_model) # position, i shape는 각각 (position값, 1), (1, d_model값)
      			
                # 배열의 짝수 인덱스에는 sin 함수 적용
                sines = tf.math.sin(angle_rads[:, 0::2]) # (position값, d_model값/2)
      			
                # 배열의 홀수 인덱스에는 cosine 함수 적용
                cosines = tf.math.cos(angle_rads[:, 1::2]) # (position값, d_model값/2)
      			
                # sin과 cosine이 교차되도록 재배열
                pos_encoding = tf.stack([sines, cosines], axis=0) # (2, position값, d_model값/2)
                pos_encoding = tf.transpose(pos_encoding,[1, 2, 0]) # (position값, d_model값/2, 2)
                pos_encoding = tf.reshape(pos_encoding, [position, d_model]) # (position값, d_model값) 다시 교차됨!
      			
                pos_encoding = pos_encoding[tf.newaxis, ...] # (1, position값, d_model값)
                return tf.cast(pos_encoding, tf.float32)
      		
            def call(self, inputs):
                    return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
      
        ## 시각화 하기
        sample_pos_encoding = PositionalEncoding(50, 512)
        plt.pcolormesh(sample_pos_encoding.pos_encoding.numpy()[0], cmap='RdBu')
        plt.xlabel('Depth')
        plt.xlim((0, 512))
        plt.ylabel('Position')
        plt.colorbar()
        plt.show()
      

2) 어텐션

  • 어텐션 값이란? 단어간의 유사도(단어간의 거리|유클리드)

    img12

    • 어텐션 값 도출 과정
      1. ‘쿼리(Query)’와 모든 ‘키(Key)’의 유사도를 각각 도출
      2. 유사도를 키(Key)와 맵핑 된 각 ‘값(Value)’에 반영
      3. 유사도가 반영된 ‘값(Value)’을 모두 더해서 뭉쳐줌


      • 📌주요 용어

        쿼리(Query) : 기준 단어 임베딩$(pos, d_{model})$ x $W_{Query}$
        키(Key) : 측정할 단어 임베딩$(pos, d_{model})$ x $W_{Key}$
        값(Value) : 측정할 단어 임베딩$(pos, d_{model})$ x $W_{Value}$

  • 수식
    • $Attention(Q, K, V) = softmax(\frac{QK^{T}}{\sqrt{d_{k}}})V$
      • $Q$, $K$, $V$ 는 각각 쿼리(Query), 키(Key), 값(Value)

    • $QK^{T}$
      att1
      • 초록색 행렬 = 각 단어 벡터 유사도(거리)가 모두 기록된 행렬

    • 그림으로 바꾼 수식
      att2
      • 닷 프로덕트 어텐션(dot product attention)
        • Key의 depth(행 개수)인 $\sqrt{d_{k}}$로 스케일링
        • softmax함수의 기울기 소실 문제 해결

  • 트랜스포머에 사용된 어텐션
    img13

      1. 인코더 셀프 어텐션 [인코더]
      1. 디코더 셀프 어텐션 [디코더]
      1. 인코더-디코더 어텐션 [디코더]

    • ✨ 셀프 어텐션이란?
      • 대상이 한 문장 내의 단어들인 경우

    • 📌예시 : 저는 학생입니다. → I am a student

      • 1번 어텐션 : 저는 학생입니다.라는 문장 내 단어들 간의 유사도 측정
        • 목적 : 한국어의 어순 데이터?
      • 2번 어텐션 : I am a student라는 문장 내 단어들 간의 유사도 측정
        • 목적 : 영어의 어순 데이터?
      • 3번 어텐션 : 두 문장의 단어들 간의 유사도 측정
        • 목적 : 한국어-영어간의 단어 유사도 데이터?

  • 코드 구현
    # 스케일드 닷 프로덕트 어텐션 함수
    def scaled_dot_product_attention(query, key, value, mask):
      # 어텐션 가중치는 Q와 K의 닷 프로덕트
      matmul_qk = tf.matmul(query, key, transpose_b=True) # 행렬내적, b전치
    
      # 가중치를 정규화
      depth = tf.cast(tf.shape(key)[-1], tf.float32) # key 행개수
      logits = matmul_qk / tf.math.sqrt(depth)
    
      # 패딩에 마스크 추가(특정 단어 무시)
      if mask is not None:
          logits += (mask * -1e9) # 무한한 음수값
    
      # softmax적용
      attention_weights = tf.nn.softmax(logits, axis=-1)
    
      # 최종 어텐션은 가중치와 V의 닷 프로덕트
      output = tf.matmul(attention_weights, value)
      return output
    

3) 멀티 헤드 어텐션

  • 정의 : 다수의 어텐션 병렬 수행
    img14
    • 한번 수행 시 놓칠 수 있는 정보를 캐치

  • 코드 구현
    class MultiHeadAttention(tf.keras.layers.Layer):
      
      def __init__(self, d_model, num_heads, name="multi_head_attention"):
          super(MultiHeadAttention, self).__init__(name=name)
          self.num_heads = num_heads
          self.d_model = d_model
    		
          assert d_model % self.num_heads == 0
    		
          self.depth = d_model // self.num_heads # 각 어텐션별 뎁스
    		
          self.query_dense = tf.keras.layers.Dense(units=d_model)
          self.key_dense = tf.keras.layers.Dense(units=d_model)
          self.value_dense = tf.keras.layers.Dense(units=d_model)
    		
          self.dense = tf.keras.layers.Dense(units=d_model)
    
      def split_heads(self, inputs, batch_size):
          inputs = tf.reshape(inputs, shape=(batch_size, -1, self.num_heads, self.depth))
          return tf.transpose(inputs, perm=[0, 2, 1, 3])
    
      def call(self, inputs):
          query, key, value, mask = inputs['query'], inputs['key'], inputs[
              'value'], inputs['mask']
          batch_size = tf.shape(query)[0]
    		
          # Q, K, V에 각각 Dense를 적용합니다
          query = self.query_dense(query)
          key = self.key_dense(key)
          value = self.value_dense(value)
    		
          # 병렬 연산을 위한 머리를 여러 개 만듭니다
          query = self.split_heads(query, batch_size)
          key = self.split_heads(key, batch_size)
          value = self.split_heads(value, batch_size)
    		
          # 스케일드 닷 프로덕트 어텐션 함수
          scaled_attention = scaled_dot_product_attention(query, key, value, mask)
    		
          scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
    		
          # 어텐션 연산 후에 각 결과를 다시 연결(concatenate)합니다
          concat_attention = tf.reshape(scaled_attention,
                                        (batch_size, -1, self.d_model))
    		
          # 최종 결과에도 Dense를 한 번 더 적용합니다
          outputs = self.dense(concat_attention)
    		
          return outputs
    

4) 마스킹

  • 정의 : 특정 값들을 가려서 실제 연산에 방해가 되지 않도록 하는 기법
  • 패딩 마스킹(Padding Masking)

    img15

    • 코드 구현
        def create_padding_mask(x):
            mask = tf.cast(tf.math.equal(x, 0), tf.float32)
            return mask[:, tf.newaxis, tf.newaxis, :] # (batch_size, 1, 1, sequence length)
      
  • 룩 어헤드 마스킹(Look-ahead masking, 다음 단어 가리기)
    • RNN은 구조 상 자신보다 앞에 있던 단어들만 참고해 다음 단어를 예측가능
    • 반면 트랜스포머는 위치와 상관없이 모든 단어를 참고해 다음 단어를 예측가능
      • 그러나 이전 단어들로부터 다음 단어를 예측하는 훈련이 필요 → 자신보다 다음에 나올 단어를 참고하지 않도록 가리는 기법 img16
    • 코드 구현
        def create_look_ahead_mask(x):
            seq_len = tf.shape(x)[1]
            look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
            padding_mask = create_padding_mask(x)
            return tf.maximum(look_ahead_mask, padding_mask)
      

5) 인코더

  • 2개의 서브 층 : 셀프 어텐션, 피드 포워드 신경망 img17

  • 코드 구현1 : 하나의 인코더
    # 인코더 하나의 레이어를 함수로 구현.
    # 이 하나의 레이어 안에는 두 개의 서브 레이어가 존재합니다.
    def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
      inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
    	
      # 패딩 마스크 사용
      padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
    	
      # 첫 번째 서브 레이어 : 멀티 헤드 어텐션 수행 (셀프 어텐션)
      attention = MultiHeadAttention(
        d_model, num_heads, name="attention")({
            'query': inputs,
            'key': inputs,
            'value': inputs,
            'mask': padding_mask
        })
    	
      # 어텐션의 결과는 Dropout과 Layer Normalization이라는 훈련을 돕는 테크닉을 수행
      attention = tf.keras.layers.Dropout(rate=dropout)(attention)
      attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(inputs + attention)
    	
      # 두 번째 서브 레이어 : 2개의 완전연결층
      outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention)
      outputs = tf.keras.layers.Dense(units=d_model)(outputs)
    	
      # 완전연결층의 결과는 Dropout과 LayerNormalization이라는 훈련을 돕는 테크닉을 수행
      outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
      outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention + outputs)
    	
      return tf.keras.Model(inputs=[inputs, padding_mask], outputs=outputs, name=name)
    


  • 코드 구현2 : 인코더 쌓기
    def encoder(vocab_size,
                num_layers,
                units,
                d_model,
                num_heads,
                dropout,
                name="encoder"):
      inputs = tf.keras.Input(shape=(None,), name="inputs")
    	
      # 패딩 마스크 사용
      padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
    	
      # 임베딩 레이어
      embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
      embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
    	
      # 포지셔널 인코딩
      embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
    	
      outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
    	
      # num_layers만큼 쌓아올린 인코더의 층.
      for i in range(num_layers):
          outputs = encoder_layer(
              units=units,
              d_model=d_model,
              num_heads=num_heads,
              dropout=dropout,
              name="encoder_layer_{}".format(i),
          )([outputs, padding_mask])
    	
      return tf.keras.Model(inputs=[inputs, padding_mask], outputs=outputs, name=name)
    

6) 디코더

  • 세개의 서브층 : 셀프 어텐션, 인코더-디코더 어텐션, 피드 포워드 신경망 img18 img19

  • 코드 구현1 : 하나의 디코더
    # 디코더 하나의 레이어를 함수로 구현.
    # 이 하나의 레이어 안에는 세 개의 서브 레이어가 존재합니다.
    def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
      inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
      enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
      look_ahead_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")
      padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
    	
      # 첫 번째 서브 레이어 : 멀티 헤드 어텐션 수행 (셀프 어텐션)
      attention1 = MultiHeadAttention(
        d_model, num_heads, name="attention_1")(inputs={
            'query': inputs,
            'key': inputs,
            'value': inputs,
            'mask': look_ahead_mask
        })
    	
      # 멀티 헤드 어텐션의 결과는 LayerNormalization이라는 훈련을 돕는 테크닉을 수행
      attention1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention1 + inputs)
    
      # 두 번째 서브 레이어 : 마스크드 멀티 헤드 어텐션 수행 (인코더-디코더 어텐션)
      attention2 = MultiHeadAttention(
        d_model, num_heads, name="attention_2")(inputs={
            'query': attention1,
            'key': enc_outputs,
            'value': enc_outputs,
            'mask': padding_mask
        })
    	
      # 마스크드 멀티 헤드 어텐션의 결과는
      # Dropout과 LayerNormalization이라는 훈련을 돕는 테크닉을 수행
      attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
      attention2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention2 + attention1)
    	
      # 세 번째 서브 레이어 : 2개의 완전연결층
      outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention2)	
      outputs = tf.keras.layers.Dense(units=d_model)(outputs)
    	
      # 완전연결층의 결과는 Dropout과 LayerNormalization 수행
      outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
      outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(outputs + attention2)
    
      return tf.keras.Model(
        inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
        outputs=outputs,
        name=name)
    


  • 코드 구현2 : 디코더 쌓기
    def decoder(vocab_size,
                num_layers,
                units,
                d_model,
                num_heads,
                dropout,
                name='decoder'):
      inputs = tf.keras.Input(shape=(None,), name='inputs')
      enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
      look_ahead_mask = tf.keras.Input(shape=(1, None, None), name='look_ahead_mask')
    	
      # 패딩 마스크
      padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
    	
      # 임베딩 레이어
      embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
      embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
    	
      # 포지셔널 인코딩
      embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
    	
      # Dropout이라는 훈련을 돕는 테크닉을 수행
      outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
    	
      for i in range(num_layers):
          outputs = decoder_layer(
              units=units,
              d_model=d_model,
              num_heads=num_heads,
              dropout=dropout,
              name='decoder_layer_{}'.format(i),
        )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])
    
      return tf.keras.Model(
          inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
          outputs=outputs,
          name=name)
    

7) 챗봇 데이터 로드

  • Cornell Movie-Dialogs Corpus 영화 및 TV 프로그램에서 사용되었던 대화의 쌍으로 구성된 데이터셋을 사용
  • 목표
    1. 정해진 개수인 50,000개의 질문과 답변의 쌍을 추출한다.
    2. 문장에서 단어와 구두점 사이에 공백을 추가한다.
    3. 알파벳과 ! ? , . 이 4개의 구두점을 제외하고 다른 특수문자는 모두 제거한다.

  • 데이터 불러오기
      path_to_zip = tf.keras.utils.get_file(
          'cornell_movie_dialogs.zip',
          origin='http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip', extract=True)
    
      path_to_dataset = os.path.join(os.path.dirname(path_to_zip), "cornell movie-dialogs corpus") 
    
      path_to_movie_lines = os.path.join(path_to_dataset, 'movie_lines.txt')
      path_to_movie_conversations = os.path.join(path_to_dataset,'movie_conversations.txt')
    


  • 샘플 최대 개수 지정
      # 사용할 샘플의 최대 개수
      MAX_SAMPLES = 50000
    


  • 자연어 전처리
      # 전처리 함수
      def preprocess_sentence(sentence):
          # 입력받은 sentence를 소문자로 변경하고 양쪽 공백을 제거
          sentence = sentence.lower().strip()
    		
          # 단어와 구두점(punctuation) 사이의 거리를 만듭니다.
          # 예를 들어서 "I am a student." => "I am a student ."와 같이
          # student와 온점 사이에 거리를 만듭니다.
          sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
          sentence = re.sub(r'[" "]+', " ", sentence)
    		
          # (a-z, A-Z, ".", "?", "!", ",")를 제외한 모든 문자를 공백인 ' '로 대체합니다
          sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)
          sentence = sentence.strip()
          return sentence
    


  • Paired data 만들기
      # 질문과 답변의 쌍인 데이터셋을 구성하기 위한 데이터 로드 함수
      def load_conversations():
          id2line = {}
          with open(path_to_movie_lines, errors='ignore') as file:
              lines = file.readlines()
          for line in lines:
              parts = line.replace('\n', '').split(' +++$+++ ')
              id2line[parts[0]] = parts[4]
    		
          inputs, outputs = [], []
          with open(path_to_movie_conversations, 'r') as file:
              lines = file.readlines()
    		
          for line in lines:
              parts = line.replace('\n', '').split(' +++$+++ ')
              conversation = [line[1:-1] for line in parts[3][1:-1].split(', ')]
    		
              for i in range(len(conversation) - 1):
                  # 전처리 함수를 질문에 해당되는 inputs와 답변에 해당되는 outputs에 적용.
                  inputs.append(preprocess_sentence(id2line[conversation[i]]))
                  outputs.append(preprocess_sentence(id2line[conversation[i + 1]]))
    		
              if len(inputs) >= MAX_SAMPLES:
                  return inputs, outputs
          return inputs, outputs
    


  • 샘플 확인
      # 데이터를 로드하고 전처리하여 질문을 questions, 답변을 answers에 저장합니다.
      questions, answers = load_conversations()
      print('전체 샘플 수 :', len(questions))
      print('전체 샘플 수 :', len(answers))
    
      print('전처리 후의 22번째 질문 샘플: {}'.format(questions[21]))
      print('전처리 후의 22번째 답변 샘플: {}'.format(answers[21]))
    

8) 병렬 데이터 전처리

  • 단어장 만들기
      import tensorflow_datasets as tfds
    
      # 질문과 답변 데이터셋에 대해서 Vocabulary 생성
      tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(questions + answers, target_vocab_size=2**13)
    
      # 시작 토큰과 종료 토큰에 고유한 정수를 부여합니다.
      START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]
    


  • 시작, 종료 토큰 번호 확인, 사이즈 조정
      print('START_TOKEN의 번호 :' ,[tokenizer.vocab_size])
      print('END_TOKEN의 번호 :' ,[tokenizer.vocab_size + 1])
    
      # 시작 토큰과 종료 토큰을 고려하여 +2를 하여 단어장의 크기를 산정합니다.
      VOCAB_SIZE = tokenizer.vocab_size + 2
    


  • 정수 인코딩(Integer encoding) & 패딩(Padding) 변환결과 확인
      # 임의의 22번째 샘플에 대해서 정수 인코딩 작업을 수행.
      # 각 토큰을 고유한 정수로 변환
      print('정수 인코딩 후의 21번째 질문 샘플: {}'.format(tokenizer.encode(questions[21])))
      print('정수 인코딩 후의 21번째 답변 샘플: {}'.format(tokenizer.encode(answers[21])))
    


  • 문장의 최대 길이 지정(패딩(Padding) 값)
      # 샘플의 최대 허용 길이 또는 패딩 후의 최종 길이
      MAX_LENGTH = 40
    
      def tokenize_and_filter(inputs, outputs):
          tokenized_inputs, tokenized_outputs = [], []
    
          for (sentence1, sentence2) in zip(inputs, outputs):
              # 정수 인코딩 과정에서 시작 토큰과 종료 토큰을 추가
              sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
              sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN
    
          # 최대 길이 40 이하인 경우에만 데이터셋으로 허용
          if len(sentence1) <= MAX_LENGTH and len(sentence2) <= MAX_LENGTH:
              tokenized_inputs.append(sentence1)
              tokenized_outputs.append(sentence2)
    		
          # 최대 길이 40으로 모든 데이터셋을 패딩
          tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(tokenized_inputs, maxlen=MAX_LENGTH, padding='post')
          tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(tokenized_outputs, maxlen=MAX_LENGTH, padding='post')
    		
          return tokenized_inputs, tokenized_outputs
    


  • 샘플 길이 40을 넘은 경우 일부 샘플 제외
      questions, answers = tokenize_and_filter(questions, answers)
      print('단어장의 크기 :',(VOCAB_SIZE))
      print('필터링 후의 질문 샘플 개수: {}'.format(len(questions)))
      print('필터링 후의 답변 샘플 개수: {}'.format(len(answers)))
    


  • 교사 강요(Teacher Forcing) 사용
    • 개념 : t시점 예측 값t+1 입력으로 사용하지 않고, t+1시점 정답을 사용
    • 목적 : 훈련 프로세스 개선(미사용 시, 잘못된 예측 하나가 연쇄적으로 다음 예측 정확도에 영향을 미침 / start_token 제거)
      BATCH_SIZE = 64
      BUFFER_SIZE = 20000
    
      # 디코더는 이전의 target을 다음의 input으로 사용합니다.
      # 이에 따라 outputs에서는 START_TOKEN을 제거하겠습니다.
      dataset = tf.data.Dataset.from_tensor_slices((
          {
              'inputs': questions,
              'dec_inputs': answers[:, :-1] # end_token 제거
          },
          {
              'outputs': answers[:, 1:] # start_token 제거
          },
      ))
    
      dataset = dataset.cache()
      dataset = dataset.shuffle(BUFFER_SIZE)
      dataset = dataset.batch(BATCH_SIZE)
      dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    

9) 모델 정의 및 학습하기

  • 트랜스포머 함수 정의
      def transformer(vocab_size,
                      num_layers,
                      units,
                      d_model,
                      num_heads,
                      dropout,
                      name="transformer"):
          inputs = tf.keras.Input(shape=(None,), name="inputs")
          dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")
    		
          # 인코더에서 패딩을 위한 마스크
          enc_padding_mask = tf.keras.layers.Lambda(
          create_padding_mask, output_shape=(1, 1, None),
          name='enc_padding_mask')(inputs)
    		
          # 디코더에서 미래의 토큰을 마스크 하기 위해서 사용합니다.
          # 내부적으로 패딩 마스크도 포함되어져 있습니다.
          look_ahead_mask = tf.keras.layers.Lambda(
          create_look_ahead_mask,
          output_shape=(1, None, None),
          name='look_ahead_mask')(dec_inputs)
    		
          # 두 번째 어텐션 블록에서 인코더의 벡터들을 마스킹
          # 디코더에서 패딩을 위한 마스크
          dec_padding_mask = tf.keras.layers.Lambda(
          create_padding_mask, output_shape=(1, 1, None),
          name='dec_padding_mask')(inputs)
    		
          # 인코더
          enc_outputs = encoder(
              vocab_size=vocab_size,
              num_layers=num_layers,
              units=units,
              d_model=d_model,
              num_heads=num_heads,
              dropout=dropout,
              )(inputs=[inputs, enc_padding_mask])
    			
          # 디코더
          dec_outputs = decoder(
              vocab_size=vocab_size,
              num_layers=num_layers,
              units=units,
              d_model=d_model,
              num_heads=num_heads,
              dropout=dropout,
              )(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])
    		
          # 완전연결층
          outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)
    		
          return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
    


  • 모델 생성
      tf.keras.backend.clear_session()
    
      # 하이퍼파라미터
      NUM_LAYERS = 2 # 인코더와 디코더의 층의 개수
      D_MODEL = 256 # 인코더와 디코더 내부의 입, 출력의 고정 차원
      NUM_HEADS = 8 # 멀티 헤드 어텐션에서의 헤드 수
      UNITS = 512 # 피드 포워드 신경망의 은닉층의 크기
      DROPOUT = 0.1 # 드롭아웃의 비율
    
      model = transformer(
          vocab_size=VOCAB_SIZE,
          num_layers=NUM_LAYERS,
          units=UNITS,
          d_model=D_MODEL,
          num_heads=NUM_HEADS,
          dropout=DROPOUT)
    
      model.summary()
    


  • 손실 함수 정의
      def loss_function(y_true, y_pred):
          y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
          loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True, reduction='none')(y_true, y_pred)
    		
          mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
          loss = tf.multiply(loss, mask)
    		
          return tf.reduce_mean(loss)
    


  • 커스텀 된 학습 : 학습률을 초기에 급격히 높였다가, 서서히 낮추는 방식
    • 수식 : $lrate = d^{-0.5}_{model}·min(step_num^{-0.5}, step_num·warmup_step^{-.15})$
      class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    
          def __init__(self, d_model, warmup_steps=4000):
              super(CustomSchedule, self).__init__()
    		
              self.d_model = d_model
              self.d_model = tf.cast(self.d_model, tf.float32)
    		
              self.warmup_steps = warmup_steps
    		
          def __call__(self, step):
              arg1 = tf.math.rsqrt(step)
              arg2 = step * (self.warmup_steps**-1.5)
    		
              return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
    


  • 학습률 변화 시각화
      sample_learning_rate = CustomSchedule(d_model=128)
    
      plt.plot(sample_learning_rate(tf.range(200000, dtype=tf.float32)))
      plt.ylabel("Learning Rate")
      plt.xlabel("Train Step")
    


  • 모델 컴파일
      learning_rate = CustomSchedule(D_MODEL)
    
      optimizer = tf.keras.optimizers.Adam(
          learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
    
      def accuracy(y_true, y_pred):
          y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
          return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)
    
      model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])
    


  • 학습 시키기
      EPOCHS = 10
      model.fit(dataset, epochs=EPOCHS, verbose=1)
    

10) 챗봇 테스트하기

  • 예측 과정
    1. 새로운 입력 문장에 대해서는 훈련 때와 동일한 전처리를 거친다.
    2. 입력 문장을 토크나이징하고, START_TOKEN과 END_TOKEN을 추가한다.
    3. 패딩 마스킹과 룩 어헤드 마스킹을 계산한다.
    4. 디코더는 입력 시퀀스로부터 다음 단어를 예측한다.
    5. 디코더는 예측된 다음 단어를 기존의 입력 시퀀스에 추가하여 새로운 입력으로 사용한다.
    6. END_TOKEN이 예측되거나 문장의 최대 길이에 도달하면 디코더는 동작을 멈춘다.

  • 예측 과정 구현
      def decoder_inference(sentence):
          sentence = preprocess_sentence(sentence)
    
          # 입력된 문장을 정수 인코딩 후, 시작 토큰과 종료 토큰을 앞뒤로 추가.
          # ex) Where have you been? → [[8331   86   30    5 1059    7 8332]]
          sentence = tf.expand_dims(
            START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)
    	
          # 디코더의 현재까지의 예측한 출력 시퀀스가 지속적으로 저장되는 변수.
          # 처음에는 예측한 내용이 없음으로 시작 토큰만 별도 저장. ex) 8331
          output_sequence = tf.expand_dims(START_TOKEN, 0)
    	
          # 디코더의 인퍼런스 단계
          for i in range(MAX_LENGTH):
              # 디코더는 최대 MAX_LENGTH의 길이만큼 다음 단어 예측을 반복합니다.
              predictions = model(inputs=[sentence, output_sequence], training=False)
              predictions = predictions[:, -1:, :]
    
              # 현재 예측한 단어의 정수
              predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
    
              # 만약 현재 예측한 단어가 종료 토큰이라면 for문을 종료
              if tf.equal(predicted_id, END_TOKEN[0]):
                  break
    		
              # 예측한 단어들은 지속적으로 output_sequence에 추가됩니다.
              # 이 output_sequence는 다시 디코더의 입력이 됩니다.
              output_sequence = tf.concat([output_sequence, predicted_id], axis=-1)
    
          return tf.squeeze(output_sequence, axis=0)
    


  • 생성 함수 구현
      def sentence_generation(sentence):
          # 입력 문장에 대해서 디코더를 동작 시켜 예측된 정수 시퀀스를 리턴받습니다.
          prediction = decoder_inference(sentence)
    
            # 정수 시퀀스를 다시 텍스트 시퀀스로 변환합니다.
          predicted_sentence = tokenizer.decode(
            [i for i in prediction if i < tokenizer.vocab_size])
    
          print('입력 : {}'.format(sentence))
          print('출력 : {}'.format(predicted_sentence))
    		
          return predicted_sentence
    


  • 대답 확인
      sentence_generation('Where have you been?')
    

✨ 관련 프로젝트 링크


댓글남기기