読者です 読者をやめる 読者になる 読者になる

今日も窓辺でプログラム

外資系企業勤めのエンジニアが勉強した内容をまとめておくブログ

シンプルなRNNで文字レベルの言語モデルをTensorFlowで実装してみる

Deep Learning 機械学習 TensorFlow Python NLP

はじめに

GoogleやMicrosoftなどがディープラーニングを使用した機械翻訳をリリースして各所で話題になっています。こんな感じで記事にもなっています。

せっかくなのでこれらのニュースと少し関連のある、RNNを使用した言語モデルをTensorFlowで実装してみたいと思います。
まずはその第一弾として、文字レベルでの英語の言語モデルを実装します。また、学習の過程をTensorBoardを使用して可視化もしています。
今回はとりあえずRNNを組むことを目標とし、ハイパーパラメータの調整など精度を追求するための作業は行いません。

RNNを組むのは初めてで実装があっているか不安なので、もし間違っているところがあったらご指摘いただけると幸いです。

ソースコード

今回の記事の内容を実装したファイルをGitHubに上げておきます。
下記レポジトリ中のcharmodel.pyというファイルで今回の言語モデルを実装しています。
github.com

事前準備

どんな言語モデルを作るのか

文字レベルの言語モデルなので、ある英語のテキスト  c_0, c_1, \cdots, c_n がある場合に、次のような確率を予測するモデルを作成します。
 P(c_n | c_0, c_1, ..., c_{n-1})

例えば、 n = 4 c_0, c_1, c_2, c_3が"hell"という文字列の時に、このモデルは次の文字が"o"である確率 P({\mathrm o} | {\mathrm hell})が予測できます。
また、今回は問題を簡単にするために英語の言語モデルを作成します。a~zの26文字+スペースの27文字が、 c_iの取りうる値になります。

ベースとなるアイデア

基本的に、Tomas MikolovらによるRecurrent neural network based language modelという論文のアイデアをベースに実装している形になります。論文は単語レベルの言語モデルですが、今回はこれを文字レベルで実装します。
理論の詳細は元論文やネット上の日本語の記事などに譲りますが、一言でいうと、 c_0, c_1, \cdots, c_{n-1}を入力したときに、 c_1, c_2, \cdots, c_nが出力されるようなRNNを訓練します。

論文や日本語の解説記事へのリンクも貼っておきます:

入出力のデータ構造

ニューラルネットなので入出力は何かしらのベクトルの形になるのですが、文字数の次元数のベクトルで、1つの要素だけが1、他の要素は全て0というようなベクトルが入力となります。このような表現方法を、one-hot表現と呼んだりします。
例えば、a, b, cに関してはそれぞれ次のような形になります。
 \displaystyle
{\mathrm a} = [ 1, 0, 0, 0, \cdots, 0 ]
 \displaystyle
{\mathrm b} = [ 0, 1, 0, 0, \cdots, 0 ]
 \displaystyle
{\mathrm c} = [ 0, 0, 1, 0, \cdots, 0 ]

出力は、 n番目の文字 c_nがどの文字であるかの確率が27次元のベクトルで出力されます。a, b, cである確率が0.2, 0.05, 0.1である場合は次のようなベクトルになります。
 \displaystyle
P(c_n | c_0, c_1, \cdots, c_{n-1}) = [ 0.2, 0.05, 0.1, \cdots ]

この先RNN部分の実装に集中するために、先にこの部分のコードだけ書いてしまいましょう。コーパスを管理するためのCorpusクラスの一部として実装します。

import numpy as np

class Corpus:
    def __init__(self):
        self.vocabulary_size = 27

    # 1か所だけ1が立ったベクトルを返す
    def make_one_hot(self, char):
        index = self.vocabulary_size - 1 if char == " " else (ord(char) - ord("a"))
        value = np.zeros(self.vocabulary_size)
        value[index] = 1
        return value

    # テキスト中の全ての文字をone-hot表現のベクトルに変換する
    def text_to_matrix(self, text):
        data = np.array([self.make_one_hot(char) for char in text])
        return data.transpose()

corpus = Corpus()
result = corpus.text_to_matrix("abcd")
print(result)
# 転置を取っているので、1文字に対応するベクトルは縦向きになり、このような出力になります。
#[[ 1.  0.  0.  0.]
# [ 0.  1.  0.  0.]
# [ 0.  0.  1.  0.]
# [ 0.  0.  0.  1.]
# [ 0.  0.  0.  0.]
# (中略)
# [ 0.  0.  0.  0.]]

これで、英語(小文字)の文字列を27次元のベクトルの列に変換することができました。
今後使用する場面を考えて、変換語のベクトルの転置を取ってあります。

コーパスの準備

どのコーパスを使うか

今回は文字レベルの言語モデルですし、RNNの実装を経験してみることが目的なのでコーパスは正直何でもよいです。
記事中では、American National CorpusMASC (500K) – data onlyというコーパスの中の書き言葉のblogカテゴリの記事だけを使います。記事数が21、ファイルサイズは計161KBです。

コード

次に、コーパスを実際に入力に使えるテンソルに変換するコードを書きます。
先ほど使用した例だと、入力が"hell"で出力が"o"となるべきなので、入力データは4*27次元の行列のリスト、出力は27次元のベクトルのリストとなります。

さて、コーパスのテキストファイルが./corpusディレクトリ以下にあると仮定して、以下のコードを書きました。
記事の最初で説明した P(c_n | c_0, c_1, ..., c_{n-1}) nは、コード中ではchunk_sizeというパラメータで保持しています。

class Corpus:
    def __init__(self):
        # パラメータ
        self.chunk_size = 5
        self.vocabulary_size = 27 # data_size

        # ./corpusディレクトリの下のテキストファイルをすべて読み込んで一つのテキストにまとめる
        text = ""
        for filename in glob.glob("./corpus/*.txt"):
            with open(filename, "r", encoding="utf-8") as f:
                text += f.read() + " "

        # 簡単な前処理。a-zとスペースのみの状態にする。
        text = text.lower()
        text = text.replace("\n", " ")
        text = re.sub(r"[^a-z ]", "", text)
        text = re.sub(r"[ ]+", " ", text)

        # 文字列をone-hot表現のベクトルの列に変換する
        self.data_num = len(text) - self.chunk_size
        self.data = self.text_to_matrix(text)


    def prepare_data(self):
        """訓練データとテストデータを用意する。
        入力データと出力データはそれぞれ次のような次元になるべき。
        入力: (data_num, chunk_size, vocabulary_size)
        出力: (data_num, vocabulary_size)
        """

        # 入力と出力の次元テンソルを用意
        all_input = np.zeros([self.chunk_size, self.vocabulary_size, self.data_num])
        all_output = np.zeros([self.vocabulary_size, self.data_num])

        # 用意したテンソルに、コーパスのone-hot表現(self.data)からデータを埋めていく
        # i番目からi + chunk_size - 1番目までの文字が1組の入力となる
        for i in range(self.data_num):
            # このときの出力はi + chunk_size番目の文字
            all_output[:, i] = self.data[:, i + self.chunk_size]
            for j in range(self.chunk_size):
                all_input[j, :, i] = self.data[:, i + self.chunk_size - j - 1]

        # 後に使うデータ形式に合わせるために転置をとる
        all_input = all_input.transpose([2, 0, 1])
        all_output = all_output.transpose()

        # 訓練データ:テストデータを4:1に分割する
        training_num = self.data_num * 4 // 5
        return all_input[:training_num], all_output[:training_num], all_input[training_num:], all_output[training_num:]

これでようやく準備が整いました。
実際に動作するコードは、GitHubにあるcharmodel.pyのCorpusクラスを参照してください。

シンプルなRNNを組んでみる

では、TensorFlowでRNNを組んでいきます。以前TensorFlowを使用した時と同様に、inference, loss, trainingの3つのパートを順に定義していきます。

今回は、入力層と出力層がそれぞれ文字数(=27)次元で、その間の隠れ層の数をhidden_layer_sizeというパラメータで調整できるようにします。
RNNのユニットはLSTMやGRUなどいろいろ流行っているものがあるようですが、まずは一番基本的な形のものを使用します。図にするとこの記事にあるような形で、TensorFlowでいうとBasicRNNCellを使用します。

学習はパラメータbatch_sizeで決められたサイズでバッチ学習をするとし、RNNの展開するステップ数(つまり、何文字前まで見るか)という値はchunk_sizeというパラメータで持っておくことにします。

パラメータを定義

まず最初に、言語モデルのクラスとパラメータを定義しておきます。入力層と出力層の値以外は、とりあえず仮のものです。

class CharacterBasedLM:
    def __init__(self):
        self.input_layer_size = 27  # 入力層の数
        self.hidden_layer_size = 30 # 隠れ層のRNNユニットの数
        self.output_layer_size = 27 # 出力層の数
        self.batch_size = 128       # バッチサイズ
        self.chunk_size = 5         # 展開するシーケンスの数。この値が5の場合、c_0, c_1, ..., c_4を入力し、c_5の確率が出力される。
        self.learning_rate = 0.001  # 学習率
        self.epochs = 1000          # 学習するエポック数

inference

inferenceは、実際のネットワークの構成を記述する部分です。
隠れ層のユニットには、tf.nn.rnn_cell.BasicRNNCellという最も単純な形のものを使用します。

慣れてる方はそうでもないのかもしれませんが、私は各ステップでどの次元のテンソルが処理されているのかがわからなくなり、混乱することがよくありました。
なので、ソースコードにはどの次元のテンソルを扱っているのかできる限り書いてあります。

    def inference(self, input_data, initial_state):
        """
        input_data: (batch_size, chunk_size, input_layer_size = vocabulary_size) 次元のテンソル
        initial_state: (batch_size, hidden_layer_size) 次元の行列
        """
        # 重みとバイアスの初期化。
        hidden_w = tf.Variable(tf.truncated_normal([self.input_layer_size, self.hidden_layer_size], stddev=0.01))
        hidden_b = tf.Variable(tf.ones([self.hidden_layer_size]))
        output_w = tf.Variable(tf.truncated_normal([self.hidden_layer_size, self.output_layer_size], stddev=0.01))
        output_b = tf.Variable(tf.ones([self.output_layer_size]))

        # BasicRNNCellは(batch_size, hidden_layer_size)がchunk_sizeつながったリストを入力とします。
        # 現時点で入力データは(batch_size, chunk_size, input_layer_size)という3次元のテンソルなので、
        # tf.transposeやtf.reshapeなどを駆使してテンソルのサイズを調整してあげます。
        input_data = tf.transpose(input_data, [1, 0, 2]) # 転置。(chunk_size, batch_size, input_layer_size=vocabulary_size)
        input_data = tf.reshape(input_data, [-1, self.input_layer_size]) # 変形。(chunk_size * batch_size, input_layer_size)
        input_data = tf.matmul(input_data, hidden_w) + hidden_b # 重みWとバイアスBを適用。 (chunk_size, batch_size, hidden_layer_size)
        input_data = tf.split(0, self.chunk_size, input_data) # リストに分割。chunk_size * (batch_size, hidden_layer_size)

        # BasicRNNCellを定義して、先ほど準備した入力データを食わせます。
        cell = tf.nn.rnn_cell.BasicRNNCell(self.hidden_layer_size)
        outputs, states = tf.nn.rnn(cell, input_data, initial_state=initial_state)

        # 最後に隠れ層から出力層につながる重みとバイアスを処理して終了です。
        # 出力層はchunk_size個のベクトルを出力しますが、興味があるのは最後の1文字だけなので
        # outputs[-1] で最後の1文字だけを処理します。
        # 言語モデルなので出力層は確率で解釈したいのですが、softmax層はこの関数の外側で
        # 定義することにします。
        output = tf.matmul(outputs[-1], output_w) + output_b

        return output

loss

lossの部分で損失関数を定義します。今回はクロスエントロピーを使います。
softmax_cross_entropy_with_logitsを使うと、inference部分で定義した出力にsoftmaxを適用したうえでクロスエントロピーを計算してくれます。

    def loss(self, logits, actual_labels):
        cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, actual_labels))
        return cost

training

training部分では最適化をどう行うかを定義します。今回はとりあえずAdamを使うことにします。

    def training(self, cost):
        optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(cost)
        return optimizer

TensorBoard用にデータを保存しつつ学習させる

ここまで来たら、あとは実際にデータを流して学習させる部分を書くのみです。
以前TensorBoardで学習の過程を可視化する方法などを学んだので、TensorBoard用のログも保存しつつ学習させていきます。
以前使用していた時とはAPIの名前が変わったりしているようですが、基本的にやっていることは変わりません。
下記は バージョン0.12.0rc1でのコードです。

コメントを書いてあるのでここで細かい解説はしませんが、基本的に今まで用意した関数を使用して最適化や精度を求めるオペレーションを定義し、sess.run()を使ってオペレーションを走らせます。

    def train(self):
        # 変数等の用意
        input_data = tf.placeholder("float", [None, self.chunk_size, self.input_layer_size])
        actual_labels = tf.placeholder("float", [None, self.output_layer_size])
        initial_state = tf.placeholder("float", [None, self.hidden_layer_size])

        prediction = self.inference(input_data, initial_state)
        cost = self.loss(prediction, actual_labels)
        optimizer = self.training(cost)
        correct = tf.equal(tf.argmax(prediction, 1), tf.argmax(actual_labels, 1))
        accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

        # TensorBoardで可視化するため、クロスエントロピーをサマリーに追加
        tf.summary.scalar("Cross entropy", cost)
        summary = tf.summary.merge_all()

        # 訓練・テストデータの用意
        corpus = Corpus()
        trX, trY, teX, teY = corpus.prepare_data()
        training_num = trX.shape[0]

        # ログを保存するためのディレクトリ
        timestamp = time.time()
        dirname = datetime.datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S")

        # ここから実際に学習を走らせる
        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            summary_writer = tf.summary.FileWriter("./log/" + dirname, sess.graph)

            # エポックを回す
            for epoch in range(self.epochs):
                step = 0
                epoch_loss = 0
                epoch_acc = 0

                # 訓練データをバッチサイズごとに分けて学習させる (=optimizerを走らせる)
                # エポックごとの損失関数の合計値や(訓練データに対する)精度も計算しておく
                while (step + 1) * self.batch_size < training_num:
                    start_idx = step * self.batch_size
                    end_idx = (step + 1) * self.batch_size

                    batch_xs = trX[start_idx:end_idx, :, :]
                    batch_ys = trY[start_idx:end_idx, :]

                    _, c, a = sess.run([optimizer, cost, accuracy], feed_dict={input_data: batch_xs, actual_labels: batch_ys, initial_state: np.zeros([self.batch_size, self.hidden_layer_size])})
                    epoch_loss += c
                    epoch_acc += a
                    step += 1

                # コンソールに損失関数の値や精度を出力しておく
                print("Epoch", epoch, "completed ouf of", self.epochs, "-- loss:", epoch_loss, " -- accuracy:", epoch_acc / step)

                # Epochが終わるごとにTensorBoard用に値を保存
                summary_str = sess.run(summary, feed_dict={input_data: trX, actual_labels: trY, initial_state: np.zeros([trX.shape[0], self.hidden_layer_size])})
                summary_writer.add_summary(summary_str, epoch)
                summary_writer.flush()

            # 学習したモデルも保存しておく
            saver = tf.train.Saver()
            saver.save(sess, "./data/wiki2.ckpt")

            # 最後にテストデータでの精度を計算して表示する
            a = sess.run(accuracy, feed_dict={input_data: teX, actual_labels: teY, initial_state: np.zeros([teX.shape[0], self.hidden_layer_size])})
            print("Accuracy on test:", a)

以上のコードを先述の学習データに対して走らせたところ、エポックを1000回まわすのにCPUのみで1時間半ほどかかりました。その際の結果はこんな感じです。

Epoch 0 completed ouf of 1000 -- loss: 2780.44679809  -- accuracy: 0.174677835052
Epoch 1 completed ouf of 1000 -- loss: 2607.54153371  -- accuracy: 0.220288337629
...
Epoch 998 completed ouf of 1000 -- loss: 1832.8586483  -- accuracy: 0.436807345361
Epoch 999 completed ouf of 1000 -- loss: 1833.50227726  -- accuracy: 0.436565721649
Accuracy on test: 0.395389

5文字を見て6文字目にどの文字かを予測するモデルなので、一切パラメータの調整をしてないRNNとしてはまあこんなもんなのではないでしょうか。

TensorBoardで学習過程を確認

次に損失関数の変化をTensorBoardを使って確認します。

tensorboard --logdir=./log

上記コマンドでtensorboardを起動後、http://localhost:6006にアクセスするとTensorBoardが表示されます。今回はクロスエントロピーの値を記録したので、学習が進むごとのクロスエントロピーの変化がこのように確認できました。

f:id:kanohk:20161218004819p:plain

100エポックほど学習した時点で、既に損失関数は安定していることが見て取れますね。
(にしても、なぜ縦軸の値がこんなに小さい。。?どっか間違ってますかね。。)

保存した学習済みモデルをロードして使用する

最後に、保存したモデルがちゃんとロードして使えるかも念のため確認しておきます。ある文字列(コンテキスト)が与えられたときに、学習したモデルを使って次にどの文字が来るかを予測する関数を書いてみます。

保存したモデルは saver = tf.train.Saver()で用意したsaverに対してsaver.restore("your model")という形で簡単に復元できます。
注意点としては、このsaver.restore()を呼び出した時点で定義されている変数のみが復元されるので、saver.restore()する前に復元したい変数は定義を済ませておく必要があります。

    def predict(self, context):
        """ あるコンテキストで次に来る文字の確率を予測する

        context: str, 予測したい文字の直前の文字列。chunk_size文字以上の長さが必要。
        """
        # 最初に復元したい変数をすべて定義してしまいます
        tf.reset_default_graph()
        input_data = tf.placeholder("float", [None, self.chunk_size, self.input_layer_size])
        initial_state = tf.placeholder("float", [None, self.hidden_layer_size])
        prediction = tf.nn.softmax(self.inference(input_data, initial_state))
        predicted_labels = tf.argmax(prediction, 1)

        # 入力データの作成。contextをone-hot表現に変換する
        x = np.zeros([1, self.chunk_size, self.input_layer_size])
        for i in range(self.chunk_size):
            char = context[len(context) - self.chunk_size + i]
            index = self.input_layer_size - 1 if char == " " else (ord(char) - ord("a"))
            x[0][i][index] = 1
        feed_dict = {
            input_data: x,# (1, chunk_size, vocabulary_size)
            initial_state: np.zeros([1, self.hidden_layer_size])
        }

        # tf.Session()を用意
        with tf.Session() as sess:
            # 保存したモデルをロードする。ロード前にすべての変数を用意しておく必要がある。
            saver = tf.train.Saver()
            saver.restore(sess, "./data/wiki2.ckpt")

            # ロードしたモデルを使って予測結果を計算
            u, v = sess.run([prediction, predicted_labels], feed_dict=feed_dict)

            # コンソールに文字ごとの確率を表示
            for i in range(27):
                c = "_" if i == 26 else chr(i + ord('a'))
                print(c, ":", u[0][i])

            print("Prediction:", context + ("_" if v[0] == 26 else chr(v[0] + ord('a'))))

        return u[0]

例えば、lm.predict("restauran")という風に関数を呼び出すと、次のような結果が返されます。

a : 0.00296767
b : 0.0260844
c : 0.042143
d : 0.057472
e : 0.00122766
f : 0.00602959
g : 0.0849243
h : 0.000131062
i : 0.00938033
j : 9.02491e-06
k : 0.000470502
l : 0.161962
m : 0.0193229
n : 0.14235
o : 0.00168979
p : 0.00562347
q : 0.000140272
r : 0.055243
s : 0.0590861
t : 0.184823
u : 0.00634102
v : 0.00238714
w : 0.00201063
x : 0.000272067
y : 0.00896958
z : 0.00129576
_ : 0.117644
Prediction: restaurant

各文字が次に来る確率と、最も確率の高い予測を最後に出力しています。今回のモデルはaccuracyが4割程度なのでうまくいかないことも結構あるのですが、この例のように当たることもあります。
文字レベルだとこれ以上精度を上げるのは難しそうですし、今回はRNNをとりあえず組んでみることが目的だったので、精度をここから上げるための努力は特にしないことにします。

まとめ

今回はTensorFlowを使って文字レベルの言語モデルを学習するRNNを実装しました。
TensorFlowには BasicRNNCell というRNNユニットが既に実装されたクラスが定義されており、それを利用すると簡単にRNNを実装することができます。

今回は文字レベルの言語モデルでしたが、単語レベルの言語モデルも基本的な考え方は変えずに実装することができるはずです。
次回以降は、単語レベルの言語モデルの実装、LSTMやGRUなどを使用したRNNに手を出してみる、などもう少し進んだことが何かできればなと考えています。

参考サイト

[1] Recurrent Neural Networks  |  TensorFlow
TensorFlow公式のチュートリアル(英語)です。日本語訳もググると出てきます。こことかこことかこことか。
LSTMを使った言語モデルの実装方法が開設されていますが、問題が大きいのと初めてRNNを実装するというような人には少し複雑でとっつきにくい印象です。

[2] Understanding LSTM Networks -- colah's blog
まずは、RNNとは何かについて。こちらの記事に詳しく解説されています。LSTMなどについてもわかりやすく説明されていて、RNN初心者の私には非常に役に立ちました。
これもに日本語訳があるようです。こことかこことか。

[3] The Unreasonable Effectiveness of Recurrent Neural Networks
RNN自体の説明から始まり、文字レベルでの言語モデルを実装されています。シェイクスピアやウィキペディア、さらにはLinuxのソースコード(!)など、様々なコーパスに対して学習をして、そのコーパスっぽい文章を生成する実験もされていて、おもしろいです。
これに関しても、元記事の一部を日本語に翻訳した記事が存在するようです。

[4] Python Programming Tutorials
英語ですが、TensorFlowを使ったRNNの組み方をサンプルコードやYouTubeの解説動画付きで詳しく説明してあります。
RNNはディープラーニングシリーズの第11弾なのですが、第3弾第4弾の内容を前提にしているので、そのあたりも先に確認しておいたほうがよいでしょう。

[5] Toy Neural Network and Recurrent Neural Network Regression with Tensorflow (1) – THE JAR
この記事もTensorFlowを使用して簡単なRNNを実装する方法を紹介しています。サンプルコードやサンプルデータも公開されており、実際に手元でサンプルコードをすぐ走らせて試すことができます。