Hatena::Grouprekken

murawaki の雑記

2010-06-10

gxp for NLPers

gxp は並列処理用のツール。これを使って NLP で並列処理を行ってきた。その話をまとめてみるテスト

gxp については、既にNLP2010 のチュートリアルで開発者自身による解説があった。ただ、NLP の人間との間に問題意識にずれを感じた。これについては簡単な報告が既にある。

gxp を使い始めた理由。研究室が既に使っていたから。今でも使っている理由。それなりに使えているから。いろいろ泥臭いことをやりつつ。

議論には比較対象があるとやりやすい。今なら MapReduceHadoop。しかし Hadoop を使ったことはない。象本は積ん読状態。Google の元論文を研究室で紹介したことはあるが、もう3年も前。聞きかじった情報だけで比較してみる。

gxp がやること

処理を行うための計算機が複数あるとき、gxp はまずそれらを制御下に置く。具体的には、各計算機にデーモンを走らせる。デーモン同士はネットワークでつながっている。全体で木を成す。

利用者は、制御下に入った計算機に対して、gxp を介して手元の端末から命令 (シェルコマンド) を出す。命令はデーモン間でリレーされる。デーモンは自分が実行すべき命令を実行する。命令の種類はいろいろ。一斉に同じ命令を実行したり、タスクの一覧を与えて、それらをスケジューラが適当にばらまいたり。

gxp の利点

ほとんどの環境で動く。ヘテロな環境でも動く。しかも簡単にインストールできる。

ほとんどの環境で動く。gxpPython で書かれている。処理に使いたい各計算機で Python が動けばよい。UNIX を想定してコーディングされているけど、Mac でも動かせそう。多分。

簡単にインストールできる。手元の計算機ひとつだけに gxpインストールすればよい。gxp は他の計算機を獲得する (制御下に置く) 際に自身をコピーする。だからあらかじめ各計算機に gxp をばらまく必要がない。そこに計算機があって、ネットワーク越しにアクセスできるなら、もう gxp が使える。

ヘテロな環境でも動く。デーモンネットワークを作るには、ある計算機から別の計算機へアクセスできる必要がある。gxp ではこのネットワーク接続の部分が抽象化されている。普通の計算機環境なら、sshログインする。他の接続方法も実装されている。

大型計算機にも対応している。大型計算機では、通常バッチキューイングシステムを使わないと処理を実行できない。どうやっているかというと、gxp が普通にキューにジョブを登録する。普通は本当に計算したい処理をキューに登録するところだが、gxp が登録するジョブは、デーモンを立ち上げるためのスクリプト。しばらく待って大型計算機のスケジューラによってジョブが適当な計算機に割り当てられる。すると、デーモンが立ち上がって親計算機と接続する。つまり、単にデーモンが待機しているだけで、本当に実行したいタスクはまだ走っていない状態になる。大型計算機から見るとジョブが走っているように見える。

複数の接続方法を組み合わせることもできる。自分のクラスタでは ssh で接続して、同時にバッチキューイングシステムを通して大型計算機を使うこともできる。ただし、計算ができるというだけで、効率よく計算できるかはまた別の話。

この利点を生かせる条件。自分で Hadoop のようなミドルウェアを好きにインストールできない環境。あるいはそうした計算機も使いたい場合。または、そうした準備がめんどくさいので手っ取り早く始めたい場合。最近は自前でクラスタを持っている研究室が少なくないので、このありがたみは薄れているかもしれない。

なお、ほとんどの環境で動くといったものの、Pythonライブラリバージョンアップで頻繁に変更される影響で、warning が出たり、コードの手直しが必要なことが時々ある。

ep

2012年5月1日追記: 元々開発者推奨ではなかった ep ですが、最新版では使えなくなっています。古いバージョンを取ってくると使えます。2010年頃。

ep の代替機能は js が提供しています。

gxpc js -a work_file=tasks

と実行します。tasks は、実行したいコマンドリストを書いたファイルです。ただしフォーマットが変わっています。ep 版では「ID command」とコマンドの前に ID を書きますが、js 版ではコマンドだけです。つまり (人手で付与した) ID によるタスクの管理が廃止されています。そのため、途中までしか終わっていないタスクを再開するのが面倒になりました。ep 版では実行結果のステータスが status ファイルに吐かれており、ep が起動時にこれを参照するので、実行済みのタスクは再実行されませんでした。js ではそうした機能がないので、tasks ファイルから実行済みのタスクを取り除く必要があります。このように、不便になったので、最近はもっぱら gxp make を使っています。

gxp を使うと、もともと並列処理用に記述していないプログラムを並列処理に転用しやすい。対する MapReduceプログラミングパラダイム。基本的には mapper と reducer を新たに書くことになるはず。シェルコマンドも実行できるらしいけど。

NLP でよくあるのは、データの加工。データは分割できて、その断片間には依存関係がないので簡単に並列化できるという設定。依存関係が複雑な場合は MPI でも使ってください。

まず小さなデータで試して成功したら、次は巨大なデータで試すという流れ。

具体例。テキスト中の単語を数えてハッシュに保存するプログラムを考える。

count_word --input input.txt --output output.db

これを並列化する。データが 00000 から 09999 まで1万個あるとする。最終的に欲しいのは、これら全体から得られる頻度データベース

まず、count_word を1万回実行する。パラメータだけを変えて。1万個のタスクは互いに独立なので並列に実行できる。これを実現する gxp の機能が epep は embarrassingly parallel から来てるらしい。

タスク一覧を書いてスケジューラに渡すと、スケジューラが空いている計算機に適当にタスクを割り振る。タスクの一覧は以下のように記述。

00000 count_word --input 00000.txt --output 00000.db
00001 count_word --input 00001.txt --output 00001.db
....
09999 count_word --input 09999.txt --output 09999.db

1行1タスク。行の最初はタスクのID。既存のプログラムが使い回せている。

次に、得られたデータベースをマージする。ここでデータベースをマージするプログラムを新たに書かなければならない。仕方がないと割り切る。

merge_db --start 00000 --end 09999 --output all.db

1万個を順番にマージしていては遅い場合がある。この場合、例えば、2段階のマージを行う。1段階目のマージは、ep で並列実行。以下のようにタスク一覧を記述。

00 merge_db --start 00000 --end 00999 --output 00.db
01 merge_db --start 01000 --end 01999 --output 01.db
....
09 merge_db --start 09000 --end 09999 --output 09.db

次に2段階目のマージを行い最終結果を得る。

merge_db --start 00 --end 09 --output all.db

改めて MapReduce との比較。count_word は map を行い、ローカルに reduce を行っている。処理の独立性が高いので map を書くのは簡単。面倒なのは map された結果を shuffle して、reduce する部分。shuffle を利用者から隠蔽したのは MapReduce の良いところ。gxpep は、この部分の面倒を見てくれない。仕方がないので自分で泥臭い処理を書くことになる。今回は最終出力を一つのデータベースとしたが、例えば、これが巨大すぎるので分割して持たなければならないとなると、一気に処理が面倒になる。

ep はそこそこスケールする。10,000個のタスクを500並列ぐらいで実行するなら問題なく動く。IO の問題を別にすれば (後述)。スケジューラがボトルネックになるとすれば、1秒で終わるタスクが山程あるばあい。数分、あるいは数時間かかるタスクであれば問題ない。

gxp make

gxp で並列処理をする場合、昔は ep しかなかったが、最近は gxp make というものが開発されている。現在の開発者のおすすめは gxp make。epチュートリアルにも載っていない。しかし、このあたり、並列分散処理の研究者自然言語処理研究者が同床異夢な部分ではないかと個人的に思っている。

gxp make とは何か。make を透過的に並列化してくれる。普段 make コマンドをたたいて実行するところを gxpc make -j とするだけで、途中の処理を各計算機に割り振って並列に処理してくれる。

make を使う利点。複雑な処理の依存関係を記述できる。入力を処理して、その出力をまた別の処理にかけて、その結果が分岐して、処理Aと処理Bに別々にかけるといった感じ。こうした依存関係を Makefile に記述しておけば、gxp make が自動的に最後まで処理してくれる。

しかし、この機能は、並列分散処理の人が思っているほど NLPer にとって重要ではないのではないか。多分、並列分散処理の人は、NLP を含む「アプリケーション」の人から完成された処理内容をもらって、それをどう効率よく実行するか研究しているのだと思う。一方、「アプリケーション」の人は、処理内容を作ること自体が研究。研究段階では普通は workflow は完成していない。各段階で出力を人手で確認しては、プログラムを修正するといったことを繰り返す。つまり workflow を一気に処理する必要がない。逆に、一気に flow を処理できる段階になったら研究としては終わっている。この完成段階で並列計算に要求があるとすれば効率。しかし make の利点は簡単に記述できることであって、効率面では期待できない。

gxp make の問題。一つはファイルシステムの問題。これは後述。もう一つはスケールしないこと。少なくとも昔聞いた話では、タスクごとに手元の計算機に対応するプロセスが作られるから、そこがボトルネックになる。10,000個のタスクはとても捌けないということだった。

私は大規模処理には相変わらず ep を使っている。理由はいろいろ。一つは、タスクの数が固定だから。1億ページのコーパスがあって、これを1万ページのブロック1万個に分割して管理している。このコーパスから様々なデータを抽出している。1万個のタスクを処理する必要があるとわかっているのだから、タスクファイルを記述するのが自然。

gxp make は小規模な処理に使っている。並列数10ぐらい。とても大規模とは言えない。gxp make が便利なのは、タスクの数が不定の場合。上のコーパスの例ではタスクの数が10,000と決まっていた。そうではない場合。make なのでファイルの suffix を元にデータを管理する。ある特定の suffix のファイルを追加したら、その差分だけ再実行してほしいといった場合。make が自然に処理してくれる。

別の use case。評価実験の実行に使っている。何のことはない普通の Makefile。workflow は以下の通り。

  1. データを訓練データをテストデータに分割。
  2. 訓練データから学習してモデルファイルを出力。これを複数のアルゴリズムの学習器について実行。
  3. 各モデルファイルをテストデータにかけて結果をファイルに出力。

ここで、例えば、分類器として Passive-Aggressive を使っていたが、新たに Confidence Weighted を実装したとする。前者を suffix .pa、後者を .cw で管理するとする。Makefile に .pa と同様の依存関係を .cw について追加して make すると、Confidence Weighted の部分だけが実行される。これぐらいの規模だと、逐次実行しても問題ないが、締め切り前で急いでいるときに並列に実行したりする。

2012年5月1日追記: gxp make については、構造化パーセプトロンの並列学習の記事でも触れた。

2012年5月1日追記: make の欠点はデバッグが難しいこと。期待通りに動かなかったときに、何が原因か解明するのに時間がかかってしかたがない。良いデバッグ方法を知っていたら教えてほしい。

データ管理の問題

NLP における並列処理はデータの管理につきる。計算は単純に並列化すればいい。ボトルネックとなるのは IO。

例えば NFS で共有されているディスクに一斉にアクセスすると、数十並列で破綻する。ではどうしようかということになるが、gxp は基本的には IO の面倒は見てくれない。gxp make がスケールしないもう一つの理由は、計算機間で NFS でファイルが共有されている状態を想定しているから。とりあえず、ここから先は ep のみを考える。

問題の切り分け。タスクによって、入力が大きく、出力が小さい場合と、入力も出力も巨大な場合がある。例えば、単語のカウントだと、出力は入力に比べて圧倒的に小さい。テキストを構文解析する場合、出力は膨れ上がる。

出力が小さければ、一カ所に吐いても問題ないことが多い。もっといえば、スループットが許容範囲内かが問題。どう実行するかを決めるのは計算機のご機嫌。あと、ちびちびデータを転送していたら効率が悪そうなので、各計算機でローカルに出力をキャッシュしておいて、処理の最後に一気に出力する。ちゃんと速度を比較したことはないけど。

出力は共有ディスクに吐くとして、入力はどうするか。いろいろ試行錯誤した。初期の方法。計算機のローカルディスクの同じパスに同じファイルをばらまく。例えば、各ファイルの複製を3個作るとする。タスク記述に制限を加えて、複製を持つ計算機だけが該当タスクを実行するようにする。

00000 :on (hostA00|hostB01|hostC02); count_word --input /local/00000.txt --output /shared/00000.db
00001 :on (hostA01|hostB02|hostC03); count_word --input /local/00001.txt --output /shared/00001.db
....
09999 :on (hostA09|hostB03|hostC04); count_word --input /local/09999.txt --output /shared/09999.db

on は正規表現にマッチするホストだけで実行できることを記述する。00000.txt は hostA00 と hostB01 と hostC02 のローカルディスクに置いてある。

この方法は欠点が多い。まず、データを自分でばらまいて、どこに何があるのか把握しておくのは面倒。時間もかかる。同じデータを何度も使いまわすのでなければやってられない。次に、一部の計算機がダウンすると実行バランスが崩れる。それに、処理の最終局面で効率が悪い。処理できるデータがなくて遊んでいる計算機が増えてくる。

最後の問題へのやっつけ対策。sshfs を利用。入力は相変わらず各計算機にばらまいてあるが、スケジューリングで実行ホストを制限しない。タスクを割り当てられた計算機がデータを持っていればそれでよし。なければ持っている他の計算機に sshfs でアクセス。当然ローカルディスクよりも遥かに遅い。しかし、タスク一覧の最後の方を sshfs 方式にしておくと、最終局面で遊んでいる計算機が減る。しかし面倒。それに、大型計算機センターだと、計算ノードから外側にアクセスできないことが多くて、仮に FUSE をサポートしていても sshfs が使えないはず。

最近の対処法。gfarm を利用。gfarm分散ファイルシステム。複数の計算機のディスクを仮想的に一つのファイルシステムとして見せる。GoogleFS と違って POSIXAPI を一通り実装している。普通に mount できる。

gfarm を使うと、計算機のローカルディスクにファイルをばらまくという処理をこのミドルウェアに任せられる。もっとも、gfarm は自動で複製を作ってくれない。2012年5月1日追記: 新しめの gfarm ではマウント時に gfarm2fs -o ncopy=3 /mount/point と指定すると、ファイル作成時に自動的に複製される。複製を作るコマンドは gfrep。これを自分でたたく。計算機が死にまくると、アクセスできる複製数が0になることがある。そうならないように定期的にチェックして、やばかったら複製を増やす必要がある。

gfarm によって入力データが計算機間で適当にばらまかれたとする。epgfarm 上に一斉にアクセスしても、実際のデータへのアクセスはばらける。入力だけでなく、大きめの出力も gfarm 上に吐けばいい。吐いたら gfrep で複製。

しかし、実際にやってみると ep 起動直後に実行されるタスクでファイルの open か read に失敗する例がちらほら。どこかで TCP の接続の上限とかに引っかかっているような気がする。要検証。

やったことはないが、gfarm を使えば、gxp make もスケールするのではないか。上述のプロセス数の上限までは。

問題は効率。ep のスケジューラが gfarm と連動していない。効率を上げるには、タスクを実行する計算機がデータを持つ計算機と同じか、ネットワーク的に近いべき。現状は、そんなことを考慮せず、適当に実行する計算機を決めている。

解決策。資源の近さを考慮したスケジューラを書けばよい。ファイルの実体の所在は gfwhere コマンドでわかる。それを使えばよい。しかし、実装はそんなに簡単ではない。そもそもここまでくると私の仕事ではない。Hadoop ならこの部分を自動でやってくれるはず。どれぐらいうまくやってくれるのか知らないが。