今日も窓辺でプログラム

外資系企業勤めのエンジニアが勉強した内容をまとめておくブログ

Scala + SparkでDatasetを使ってTFIDFを計算する

はじめに

前回の記事ではSparkをローカルで試せる環境を用意しました。

Windows Subsystem for Linux(WSL)でSpark環境を構築してみる - 今日も窓辺でプログラム

今回は日本語のテキストファイルに含まれる単語数をカウントする処理をSpark上で行ってみたいと思います。 SparkもScalaもGradleも初めてなので、もしかしたら怪しいことを言っている箇所もあるかもしれません。。

環境は前回に引き続き、Windows Subsystem for Linux上のUbuntu 16.04で行っています。

目次

事前準備

ScalaとGradleのインストール

SparkはPython等もサポートしていますが、Scalaで書かれているそうなのでScalaを使ってみようと思います。 下記コマンドでScalaとGradleというビルドツールをインストールしました。

sudo apt-get install scala

sudo add-apt-repository ppa:cwchien/gradle
sudo apt-get update
sudo apt-get upgrade gradle

add-apt-repositoryをしないと、古いバージョンのGradleしかインストールされないので注意が必要です。

Ubuntu に最新のgradleをインストールする

Gradleでビルド環境用意

まずはGradleでビルドできる環境を用意してしまいます。

cd <project root>
gradle init

というコマンドを実行すると、自動でビルド定義やシェルスクリプトなどのファイルが生成されます。 生成されたbuild.gradleを以下のように変更します。

plugins {
    id 'scala'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

mainClassName = 'WordCount'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.apache.spark:spark-core_2.11:2.3.1'
    compile 'org.apache.spark:spark-sql_2.11:2.3.1'
    compile 'org.apache.spark:spark-mllib_2.11:2.3.1'
    compile 'org.scala-lang:scala-library:2.11.8'
    compile 'com.atilika.kuromoji:kuromoji-ipadic:0.9.0'
}

shadowJar {
  zip64 true
}

設定ファイルの詳細説明は省きますが、

  • 後ほど必要になるので、Sparkのライブラリ, scala, 日本語形態素解析kuromojiをdependenciesに指定しておく
  • Sparkに送信するjarを作成するためshadowJarというプラグインを使用する
  • WordCount というクラスがメインクラス

あたりが主な設定内容です。

テスト用の日本語テキストの準備

単語数を数える対象となるテキストファイルをいくつか用意します。今回は勉強のためなのでデータは何でもよかったので、青空文庫のデータを使用することにしました。

テキストファイルにはルビがふってあるものが多く、個人的に気になったのでGitHubからルビのないテキストファイル2つを選んでダウンロード、/src/main/resources/textfiles に保存しました。

ファイルを解凍した直後はShift-JISでエンコードされていたので、UTF8への変換も行いました。

GitHubでルビのないテキスト検索に使用したクエリはこちら↓

https://github.com/aozorabunko/aozorabunko/search?q=filename%3A*.zip+-filename%3Aruby

単語カウントとTFIDFの計算

ではさっそくWordCountというクラスを実装してしまいます。

Sparkの公式サイトにTFIDFを計算するためのサンプルが載っているので、これをもとに実装してみます。 上記ドキュメントではDatasetを使うことが推奨されているようなので、頑張ってDataset APIを使って実装してみました。

単語分割用関数

サンプルは英語なのでスペースで文字列を区切っていますが、日本語だとそうはいきません。Kuromojiを使って単語分割する関数を用意しておきます。

引数には1つのドキュメントが文字列として入ってくる想定で、戻り値はList[String]としておきます。

  def tokenize(text: String) : List[String] = {
    val tokenizer = new Tokenizer()
    val sentences = text.split('\n')
    sentences.map(s => tokenizer.tokenize(s).asScala)
             .flatMap(tokens => tokens.map(t => t.getSurface))
             .toList
  }

Datasetにテキストファイルを読み込む

SparkSessionのread.textFileを使ってディレクトリを指定すると、そのディレクトリにあるテキストファイルをすべて読み込んでくれます。

デフォルトのオプションだとテキストファイルの1行がDatasetの1行に対応する形になるのですが、下記のようにwholetextオプションにtrueを指定するとテキストファイル1つがDatasetの1行に対応するので、TFIDFを計算する上では都合がいいように思います。

idカラムはあると後々の処理に便利だったので一応つけておきました。

val textDirectory = "./src/main/resources/textfiles/"
val spark = SparkSession.builder.appName("WordCount").getOrCreate()
val text = spark.read.option("wholetext", true).textFile(textDirectory)
                .withColumn("id", monotonically_increasing_id).cache

この時点で、textにはidとvalueという2つのカラムがあり、列の数はテキストファイルの個数分のDatasetが入っていることになります。

単語分割

事前に定義しておいたtokenize関数をDatasetの各行に適用していきます。単純にmapするだけです。

import spark.implicits._
val words = text.map(row => DocumentWithId(row.getAs[Long]("id"), tokenize(row.getAs("value"))))

1点注意が必要なのは、1行目のimport spark.implicits._です。 Datastoreに値を格納するには、型にあったエンコーダを定義しておく必要があるようで、プリミティブ型については上記importを書いておくと用意されているエンコーダが参照できるようになるみたいです。

TFIDFの計算

ここまでこればあとはサンプル通りにコードを書けばOKです。HashingTFとIDFというモジュールを使います。

// tf
val hashingTF = new HashingTF().setInputCol("value").setOutputCol("tf")
val wordFrequencies = hashingTF.transform(words)

// idf
val idf = new IDF().setInputCol("tf").setOutputCol("tfidf")
val idfModel = idf.fit(wordFrequencies)
val rescaledData = idfModel.transform(wordFrequencies)

これでtfidfというカラムにtfidfの値を格納したSparseVector (=org.apache.spark.ml.linalg.SparseVector) が入っている状態になりました。 試しに中を見てみると、このような形になっていました。(手元では2つのテキストファイルでテストしたので、行の数は2個になっています。)

rescaledData.select("id", "tfidf").show
+----------+--------------------+
|        id|               tfidf|
+----------+--------------------+
|         0|(262144,[152,386,...|
|8589934592|(262144,[1,83,581...|
+----------+--------------------+

showだとSparseVectorが途切れていますが、中をちゃんと見るコードを書いてみるとしっかり値が入っていることも確認できます。

文書間の類似度の計算等を行いたい場合は、このSparseVectorをもとに計算をすることになると思うのですが、TFIDF自体は計算できたので今回はいったんここで区切りたいと思います。

ソースコード

今回使用したソースコードはGitHubにあげてあります。
https://github.com/kanoh-k/blog/tree/17c4a0f11b82e2f277b4a3e45d7d23107df2bc8b

Scalaのコードは↓
https://github.com/kanoh-k/blog/blob/17c4a0f11b82e2f277b4a3e45d7d23107df2bc8b/spark/src/main/scala/WordCount.scala

build.gradleは↓になります。
https://github.com/kanoh-k/blog/blob/17c4a0f11b82e2f277b4a3e45d7d23107df2bc8b/spark/build.gradle