たのしい Scalding 入門
TL; DR: git clone https://github.com/niw/scalding_examples.git
ざっとググった感じ、Scalding についてはまだあまり日本語情報なさそうなので、こう広めたりする目的も込めてちょっとまとめておこうと思います。
Scalding とは
Scalding とは、Scala に相当入れ込んでる Twitter で使われているライブラリで、Hadoop の MapReduce を Scala で簡単に書けるものです。中身は Cascading をラップしたものになっています。
Scala に相当入れ込んでる Twitter ではしかしながら Pig が頻繁に使われているのですがある方面では Scalding が使われておりまして、Pig と肩を並べられるくらいに使えて、さらに Scala の秘めた力も使えるんだぜ…! と、少なくとも Scalding の README には書かれております。
そんな Scalding は英単語の意味としては火傷っていう意味があって、下手に Scalding でググるとちょっと目を覆いたくなるような痛々しい画像がいっぱい表示されるので、Scalding でググる際は要注意です。
Scalding を使ってみる
本家 Scalding のページには、おなじみ WordCount のサンプルがどーんと載っていて、簡単そうに見えるのですが、さて実際にそれを動かすとなるとちょっと大変だったりします。
というのも、Scalding で MapReduce を書く部分はいいのですが、Scalding を Hadoop クラスタに送りつけるためにはうまい具合に jar を作らないとダメだったり、添付されているユーティリティスクリプト scald.rb
がかなり読みにくかったり (Ruby の会社だったのに) とかいろいろあってちょっと足踏みしちゃうんですね。
そこで、今回はいろいろ整理してさっぱりさせたこちらのレポジトリを使っていこうかと思います。
まずは動かす
以降、README.md
と同じ内容になるわけなのですが、多少コメント多めに。
まずは必要な道具を揃えます。ビルドツールに maven を使っているので入れておきます。あと zinc もオススメです。Intelli J IDEA もあると良いですね。
brew install maven
brew install zinc
ちなみに OS X 前提で書いてますが他の環境でも大差ないかなと思います (Windows は… ちょっとわかりません。)
レポジトリをチェックアウトしてビルドします。
git clone https://github.com/niw/scalding_examples.git
cd scalding_examples
mvn -Pzinc clean package
これで依存するモノすべてをダウンロードしてコンパイルしてテストして適切な jar を作ることができます。
ちなみに、使ってる pom.xml
は依存関係の中で CDH3(ちょっと古い) がデフォルトになってるので、使ってる Hadoop に合わせて hadoop-core
(や hadoop-client
) のバージョンを変えてください。そうしないといざ Hadoop にジョブを送りつけてもプロトコルちゃうねんとか言われてしまいます。
これで準備が整ったので何もしない MapReduce を実行してみましょう。
$ echo "Hello\tWorld" > input
$ src/main/scripts/run.sh --local EchoJob --in input --out output
...
$ cat output
Hello World
おお。何もしないですね。タブ区切りのファイルを読んでタブ区切りのファイルに書き出すというまったくもって Map も Reduce もしていない ジョブです。
class EchoJob(args: Args) extends Job(args) {
Tsv(args("in"), ('key, 'value))
.write(Tsv(args("out")))
}
src/main/scripts/run.sh
は必要最小限のユーティリティスクリプトになっています。--local
をつけるとローカルモードになって hadoop
コマンドとか使いませんが、作った jar を java
か hadoop
で実行しているだけです。
Hadoop クラスタで実行する
では、次に Hadoop クラスタに実際に MapReduce ジョブを送ってみます。
Hadoop の準備
その前に Hadoop の擬似クラスタとかがローカルやどこかで動いていないといけません。ない場合は次の手順でお手軽にインストールしておきます。
まず Hadoop と言っても Linux と同じでいろんなディストリビューションがあって、本家はバージョン番号が意味不明なことになっていたりしてまったくどれを使ったらいいのか謎いのですが、とりあえず Cloudera の CDH を使っておきましょう。RedHat の Linux みたいな感じ。
cd /usr/local
curl -O http://archive.cloudera.com/cdh/3/hadoop-0.20.2-cdh3u6.tar.gz
tar xzvf hadoop-0.20.2-cdh3u6.tar.gz
ln -s hadoop-0.20.2-cdh3u6 hadoop
この CDH3 は古いのですが、CHD 4.3 とか Hadoop 2.x を使うと設定の警告が出まくったりします。org.apache.conf.Configuration
の Deprecated 警告の実装が残念なことが原因なのですが、まあ枯れてるほうがトラブル少ないしいろいろググりやすいでしょ、ということで。
次に擬似クラスタ環境の設定をします。本来は複数台のホストで実行する環境なのですが、全部手元で動かしても動きます。
cd hadoop
mv conf conf.original
cp -Rp example-confs/conf.pseudo conf
基本添付されている設定ファイルで問題ないのですが、必要であれば hadoop.tmp.dir
と dfs.name.dir
を変えておきます。
$ mkdir -p /usr/local/hadoop/var/cache
$ cat | patch -p1
--- a/conf/core-site.xml
+++ b/conf/core-site.xml
@@ -11,3 +11,3 @@
<name>hadoop.tmp.dir</name>
- <value>/var/lib/hadoop-0.20/cache/${user.name}</value>
+ <value>/usr/local/hadoop/var/cache/${user.name}</value>
</property>
--- a/conf/hdfs-site.xml
+++ b/conf/hdfs-site.xml
@@ -25,3 +25,3 @@
<name>dfs.name.dir</name>
- <value>/var/lib/hadoop-0.20/cache/hadoop/dfs/name</value>
+ <value>/usr/local/hadoop/var/cache/hadoop/dfs/name</value>
</property>
/usr/local/hadoop/bin
を PATH
に追加して hadoop
コマンドを使えるようにしておきます。
export PATH=/usr/local/hadoop/bin:$PATH
で、HDFSをフォーマットして
hadoop namenode -format
namenode, datanode, jobtracker と tasktracker を起動します。全部デーモンなんですが、Procfile
つくって foreman
とかで起動してやるとログが見えてデバッグが楽です。
hadoop namenode
hadoop datanode
hadoop jobtracker
hadoop tasktracker
これらはウェブインターフェイスも提供してくれるので、Hadoop ジョブの状態は http://localhost:50030/ とかで見られます。
Hadoop クラスタで動かしてみる
さて、おなじみの WordCount いってみましょう。
$ cat > input
meow nyan purr
purr nyan nyan
$ hadoop fs -put input input
$ src/main/scripts/run.sh WordCountJob --in input --out output
...
$ hadoop fs -cat 'output/part-*'
meow 1
nyan 3
purr 2
ちゃんと動きましたね。このジョブは行から単語を抜き出すところが通常の Scala のコードとなっていて、 Scalding の良さを実感できるようになっています。
// --in オプションのパスをテキストファイルとして読んで
// 各行のテキストは line フィールドに入るので → [offset, line]
TextLine(args("in"))
// tokenize() で word フィールドに展開して → [offset, line, word]
.flatMap('line -> 'word) { line: String => tokenize(line) }
// 同じ word でグループ化してその数を数えて → [word, size]
.groupBy('word) { _.size }
// --out オプションのパスにタブ区切りで書きだす
.write(Tsv(args("out")))
// ここは普通の Scala
def tokenize(text: String): Array[String] =
text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
次のステップ
ざっと最初の一歩は眺められたかなと思うのでつぎはいろいろなジョブを書いて遊んでみると良いかなと思います。Intelli J IDEA で Scala プラグインを入れて pom.xml
を開けばそのまま IDE な環境で楽にできます。
idea pom.xml
src/main/scala/Main.scala
は Scalding のジョブのブートストラップの部分を簡略したもので、通常 com.twitter.scalding.Tool
オブジェクトの main()
からジョブを開始させますが、自分で書くこともできます。
ジョブのテストは src/test/scala/WordCountJobSpec.scala
などを参考に使ってください。
Spec2 を使う際に in { ... }
の中にジョブを書くと困ったことになるので、should { ... }
の中に書いて in { ... }
はそれぞれの sink ごとに書くのが良いです。
その他、本家の Wiki ページにはかなりの情報がまとまっていますので参考にしましょう。
同じ MapReduce のジョブを Ruby(Hadoop Streaming), Pig, Hive, Scalding などで書いた Rosetta Code も便利です。
というわけで、Scalding を使うと簡単に Scala で MapReduce ジョブが書けることがわかりました。
また Scalding は、ぱっと見わかりやすいように DSL 的なものを Scala の implicit とかの魔術を使って作ってあります。ですので一度なにかよくわからないことが起こるとまったくなにが起こってるのかわからないという素敵な側面もあって楽しそうです。 今後も活用していきたいなあと思います。