GroovyでApache Sparkアプリケーションを作る #gadvent

このエントリは G*Advent Calendar(Groovy,Grails,Gradle,Spock...) Advent Calendar 2014 - Qiita の12/20担当分です。

Apache Sparkとは?

Hadoopエコシステムにおける次世代の分散処理基盤として注目されています。インメモリ処理とDAGによるタスクスケジューリングを特徴とし、分散処理に必要な耐障害性を備えています。また、RDDという共通のプログラミングモデルの上で機械学習やストリーミング処理が統一的に扱えるため、複雑なビッグデータ処理を実装するのに有利です。
概要をつかむにはこのへんの資料がよいかと思います。

Groovyから使ってみようと思った動機

公式サイト Apache Spark™ - Lightning-Fast Cluster Computing を見ていただくとわかりますが、Spark自体はScalaで開発されており、アプリケーション開発は Scala / Java / Python で行えるようにAPIが提供されています。(機能の実装状況には差異があり、例えばPythonではSpark1.2でやっとStreamingがサポートされたり、と言った状況です。)
せっかくJavaAPIがあるのでぜひ使いたいところなのですが、Scala/Pythonに比べて以下のようなビハインドがあります。

  • ラムダが使えないのでコードが冗長(Java8を使えばマシにはなりますが、ClouderaがJava8をサポートするまで自分は使えません・・)
  • 対話型シェル(REPL)がない

JavaAPIをGroovyから使うことでこれらの課題に対処できないか試行錯誤していますので、まだ道半ばではありますが現在までの状況をまとめておきます。

コードの簡略化

いくらか成果が上がったので、本エントリで記載します。ただし、Sparkの挙動に起因する癖がありますので注意が必要です。

REPL(Groovy Shell)

こちらはSparkの挙動に起因する癖によって壁にぶつかりました。現時点では解決策が見つかっていません。
具体的にはタスク実行時に以下の例外が発生します。

ERROR org.apache.spark.SparkException:
Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable (ClosureCleaner.scala:166)

GroovyでSpark Wordcountを実行してみる

Quick Start - Spark 1.2.0 Documentation にあるWord CountのJavaサンプルをGroovyで書き換えてみました。
プロジェクト全体はnobusue/groovy-spark-sample · GitHubにあげてあります。

Sparkアプリケーション本体

テキストファイルを読み込んで、指定した単語を含む行数をカウントするだけの簡単なサンプルです。適当なテキストファイルを用意して、「sc.textFile("YOUR_TEXT_FILE_PATH")」のところを置き換えてください。

import org.apache.spark.*
import org.apache.spark.api.java.*
import org.apache.spark.api.java.function.*

public class SparkGroovySample {

  public static void main(String[] args) {

    def conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
    def sc   = new JavaSparkContext(conf)
    def file = sc.textFile("YOUR_TEXT_FILE_PATH").cache()

    def filterFunc = new Function<String,Boolean>() {
      public Boolean call(String s) {
        return s.contains('spark')
    }}

    def filterFunc2 = { it.contains('hadoop') } as Function

    def countsOfSpark = file.filter(filterFunc).count()
    def countsOfHadoop = file.filter(filterFunc2).count()

    println "Count of Spark:${countsOfSpark}, Count of Hadoop:${countsOfHadoop}"
  }
}

例えばSparkのREADME.mdに対して上記を実行すると、

Count of Spark:8, Count of Hadoop:10

みたいになるはずです。
フィルタ関数の定義は、SparkのJava APIでは以下のようにFunctionインターフェースを実装する必要があります。

    def filterFunc = new Function<String,Boolean>() {
      public Boolean call(String s) {
        return s.contains('spark')
    }}

Groovyの場合はクロージャから変換することで多少楽ができます。

    def filterFunc2 = { it.contains('hadoop') } as Function

注意点

コードを眺めているだけでは分かりづらいのですが、sc.textFile()で読み込んだファイルはRDD(JavaRDD)というオブジェクトに格納されています。
RDDに対する操作はワーカーノードで分散処理されるため、filter()などの操作で利用するオブジェクトはすべてシリアライズしてリモートに送信できなければいけません。上記サンプルをGroovyスクリプトではなくGroovyクラスとして実装しているのはこの条件を満たすためです。(実際に試してみると、Groovyスクリプトとして実装した場合にはfilterFunc()がシリアライザブルでないと判断されて例外が発生します。)

おまけ:ログ設定

SparkはLog4jを使っており、デフォルトではSpark自体の動作に関するログがINFOレベルで大量に出力されます。普段は必要ないので、以下のようにして消しておきましょう。

# Set everything to be logged to the console
#log4j.rootCategory=INFO, console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

まとめ

素直にScalaPythonを勉強した方が楽かもしれませんが、それぞれ一長一短があるのでJava(Groovy)で使う方法も引き続き試行錯誤していきます。