Docker Remote APIを使ってみる #apijp

このエントリは「Web API Advent Calendar 2014」の12/3担当です。次は「YosAwed」さんです。

最近話題のDockerですが、みなさんもう使ってますよね?

Dockerはクライアント-サーバーアーキテクチャを採用しており、DockerクライアントとDockerサーバーがRemote API経由で接続されています。つまり、「docker ps」などのコマンドはすべてサーバーに送られて処理されているということです。
このRemote API、実は(ほぼ)RESTになっていて、Dockerクライアント以外からでも利用することが可能です。そこで、Remote APIを直接使う方法についてご紹介したいと思います。

テストした環境

Mac OS X(Yosemite)のBoot2Dockerを利用しました。Linux環境でもDockerサーバーの設定だけ追加すれば大丈夫なはずです。

$ docker version
Client version: 1.3.2
Client API version: 1.15
Go version (client): go1.3.3
Git commit (client): 39fa2fa
OS/Arch (client): darwin/amd64
Server version: 1.3.2
Server API version: 1.15
Go version (server): go1.3.3
Git commit (server): 39fa2fa

$ boot2docker version
Boot2Docker-cli version: v1.3.2
Git commit: e41a9ae

Docker Remote APIを使うための設定

LinuxのDockerサーバーは、デフォルトではUnixドメインソケット(unix:///var/run/docker.sock)をリスンするように設定されています。
このままではリモートからTCP接続できないので、Dockerサーバーの起動時オプションに '-H tcp://0.0.0.0:2376' のようにしてTCPのリスンポートを追加する必要があります。

設定方法としては、dockerコマンドの起動時オプションで指定するか、設定ファイルに追記するなど、いくつかやり方があります。
例えばUbuntu14.04では /etc/init/docker.conf に

DOCKER_OPTS=' -H tcp://0.0.0.0:2376 -H unix:///var/run/docker.sock'

を設定し、

$ sudo service docker restart

で反映します。
詳しくはDockerの公式ドキュメントを参照してください。
https://docs.docker.com/articles/basics/#bind-docker-to-another-hostport-or-a-unix-socket

Boot2Dockerの場合

Boot2Dockerの場合はもともとTCPで接続するようになっているので、特に設定は必要ありません。
DockerサーバーのTCP接続先は「boot2docker shellinit」で確認できます。

$ boot2docker shellinit
Writing /Users/nobusue/.boot2docker/certs/boot2docker-vm/ca.pem
Writing /Users/nobusue/.boot2docker/certs/boot2docker-vm/cert.pem
Writing /Users/nobusue/.boot2docker/certs/boot2docker-vm/key.pem
    export DOCKER_CERT_PATH=/Users/nobusue/.boot2docker/certs/boot2docker-vm
    export DOCKER_TLS_VERIFY=1
    export DOCKER_HOST=tcp://192.168.59.103:2376

TCP接続先はBoot2DockerのVMを再起動すると変わることがあるので、以下のコマンドで環境変数を毎回設定しましょう。

$ $(boot2docker shellinit 2>/dev/null)

また、Boot2Docker VMIPアドレスを毎回手打ちするのは面倒なので、以下のコマンドでhostsに登録しておきましょう。(Mac限定)

$ sudo sed -i -e '/dockerhost/d' /etc/hosts
$ echo $(boot2docker ip 2>/dev/null) dockerhost | sudo tee -a /etc/hosts

これでBoot2Docker VMを「dockerhost」で参照できます。
以降はこの設定を前提として記載しますので、未設定の方は適宜IPアドレスに読み替えてください。

Docker Remote APIでイメージ一覧を取得する

当然ですが、事前にDockerサーバーを起動しておいてください。(Boot2Dockerの方は boot2docker up しておいてください。)
また、wgetcurlなど、RESTのリクエストを送信できるコマンドをインストールしておいてください。
(筆者の環境ではcurlは証明書の処理でエラーが発生したため、wgetを利用しています。また、JSONを見やすく整形するためにjqコマンドを利用します。)

Dockerを初めて使う方は「docker pull」で適当なイメージをダウンロードしておいてくださいね。その上でイメージ一覧を確認しておきましょう。

$ docker images
REPOSITORY                   TAG                 IMAGE ID            CREATED             VIRTUAL SIZE
oddpoet/zookeeper            latest              33a69da2484d        5 days ago          909.3 MB
nobusue/kafka                0.8.1.1             29836f0457d5        5 days ago          808.1 MB
nobusue/java7                u72                 140a4d7d1cea        5 days ago          500.6 MB

それではRESTで同じ情報が取れるか確認してみましょう。

まず、DockerサーバーのTCP接続先を確認します。
ここでは、「DOCKER_HOST=tcp://192.168.59.103:2376」を前提としてコマンドを実行例を記載します。

次に、おもむろにRESTリクエストを送信してみましょう。

$ wget http://dockerhost:2376/images/json -O - -q | jq .

Linux環境ではJSONが表示されたと思いますが、Boot2Docker環境では何も表示されないはずです。

実は、Boot2Docker環境ではTLS暗号化が行われており、証明書と秘密鍵の指定が必要です。以下のようにしてください。

$ wget --no-check-certificate --certificate=$DOCKER_CERT_PATH/cert.pem --private-key=$DOCKER_CERT_PATH/key.pem https://dockerhost:2376/images/json -O - -q | jq .

正常に実行できれば、こんな感じのレスポンスが返ってきます。

[
  {
    "Created": 1417099585,
    "Id": "33a69da2484d70d6e9b9e590b6286a4fe6fba5ab91a7444ba6d970a7c97e10d8",
    "ParentId": "47f7486b7c978fda52628680274d3e0aeaa504c027b48fca1b898755ef21eedd",
    "RepoTags": [
      "oddpoet/zookeeper:latest"
    ],
    "Size": 0,
    "VirtualSize": 909336287
  },
  {
    "Created": 1417088378,
    "Id": "29836f0457d5f82b748c42d3ed8f798086f8116b09b700e3c1f36d058404de62",
    "ParentId": "d53de445c803da52a60af0cde9f8448f4f1b34422129e5234c1e2ffa6ec09b67",
    "RepoTags": [
      "nobusue/kafka:0.8.1.1"
    ],
    "Size": 0,
    "VirtualSize": 808064814
  },
  {
    "Created": 1417080614,
    "Id": "140a4d7d1cea6982b3d62921d3edb509f2602c8bda69f99a2c6ca0e055f39f1c",
    "ParentId": "0d7c4caeafc408f94009cc6f8d6cf52a8412a81dd6158ee40dde18715ce115de",
    "RepoTags": [
      "nobusue/java7:u72"
    ],
    "Size": 0,
    "VirtualSize": 500624405
  }
]

「docker images」コマンドと同等の情報が得られることが確認できました。

Docker Remote APIでコンテナ一覧を取得する

次に、コンテナ一覧を取得してみます。

先に適当なコンテナを起動しておきましょう。

$ docker run -ti -d ubuntu
$ docker ps
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS               NAMES
91d01d7df230        ubuntu:latest       "/bin/bash"         7 seconds ago       Up 6 seconds                            cranky_ardinghelli   

RESTでコンテナ一覧を取得してみます。

$ wget --no-check-certificate --certificate=$DOCKER_CERT_PATH/cert.pem --private-key=$DOCKER_CERT_PATH/key.pem https://dockerhost:2376/containers/json -O - -q | jq .
[
  {
    "Command": "/bin/bash",
    "Created": 1417537292,
    "Id": "91d01d7df23053c8bac8b3ccdd8a02015027da69a0b57a8a1e8b6b66f1477c7f",
    "Image": "ubuntu:latest",
    "Names": [
      "/cranky_ardinghelli"
    ],
    "Ports": [],
    "Status": "Up 22 seconds"
  }
]

簡単ですね!

その他のAPIの使い方

公式リファレンスを参照してください。
Docker Remote API - Docker Documentation
なお、Remote APIにはバージョンがあり、DockerサーバーとDockerクライアントでAPIバージョンが異なると接続できません。
自分でRESTを叩く場合には問題ないかもしれませんが、独自のクライアントを実装する場合などは注意してください。

(補足)Unixドメインソケットを覗き見する

socatコマンドでドメインソケットを覗き見できます。デバッグのときなどにどうぞ。

How to set the name of a Docker container using REST API - Stack Overflow

docker pushを中断した後で再開したいとき

dockerで大きめのイメージをpushしようとすると、ネットワークの状態によってはハングして止まったりします。
Ctrl+Cで中断して再度 docker push を行うと、

2014/11/28 13:14:21 Error: push nobusue/kafka is already in progress

という感じでエラーになります。
再度pushするためにはDocker Serverを再起動しないといけないようです。

$ sudo service docker restart

Boot2Docker環境ではBoot2Dockerを再起動すれば大丈夫でした。

$ boot2docker restart

Boot2Dockerの環境設定の自動化

Boot2Docker経由でDockerを利用する場合、dockerコマンドにDockerホストの接続先や証明書のパスなどを教えてやるために環境変数を設定する必要があります。
設定すべき内容は「boot2docker shellinit」コマンドで確認できます。

$ boot2docker shellinit
Writing /Users/nobusue/.boot2docker/certs/boot2docker-vm/ca.pem
Writing /Users/nobusue/.boot2docker/certs/boot2docker-vm/cert.pem
Writing /Users/nobusue/.boot2docker/certs/boot2docker-vm/key.pem
    export DOCKER_CERT_PATH=/Users/nobusue/.boot2docker/certs/boot2docker-vm
    export DOCKER_TLS_VERIFY=1
    export DOCKER_HOST=tcp://192.168.59.103:2376

exportの部分を直接 .bashrc などに書いてもよいのですが、Boot2Dockerをアップグレードしたりすると設定すべき環境変数が変わる場合があります。

面倒なのでこの内容をそのまま読み込むようにしました。
具体的には .bashrc に以下を追加しています。

# Docker
eval $(boot2docker shellinit 2>/dev/null)

今のところこれで大丈夫そうです。
"Writing /Users/nobusue/・・・"が毎回表示されてうっとおしいので、「2>/dev/null」で消しています。(必須ではないです。)

(2014/11/28追記)
上記設定はBoot2Dockerが起動済でないと意味がないので、以下のように修正しました。

# Docker
if [ "`boot2docker status`" = "running" ]; then
  eval $(boot2docker shellinit 2>/dev/null)
fi

Hadoopクラスタ on AWSの俺々ベストプラクティス(2014年10月時点)

  • VPCを使って固定IPを振り、ホスト指定は基本的にIPで行う
  • DNS名はAWSデフォルトから変えない(逆引き対策)
  • クラスタ用にサブネットを切り、サブネット内は通信フリーにする(ポート開放の運用負荷軽減のため)
  • サブネット内にはVPNでアクセスできるようにする(管理系UIなど独自のポート番号を使うものが多いため)
  • VPC内にNTPサーバーを立て、全サーバーを同期する
  • AMIはHVMに対応したものを使う(t2.smallやr3.largeなどお得なインスタンスを使うため)
  • マーケットプレイス提供のAMIはルートボリューム以外としてマウントできない制限がかかっているので使わず、コミュニティAMIをカスタマイズして使う

Hadoop Conference Japan 2014参加レポート

最近Apache Spark関連のお仕事をしているので、2014/7/7に開催されたHadoop Conference Japan 2014に参加してきました。
Hadoopユーザー会主催、リクルートテクノロジーズ後援で、今回で5回目だそうです。

イベント概要&資料/Ustream公開

当日のタイムテーブル、および録画(Ustream)はこちらから参照できます。

https://hcj2014.eventbrite.com/

参加者に配布されたパスと扇子。

全体を通して

参加登録者数が1300名、うち65%は本カンファレンスに初参加とのことでした。
BigData処理に対する関心の高まりを感じるとともに、実際に利用しているユーザーはまだ少数派で、これから本格的に普及するというステージのようです。
Hadoopエコシステムの拡大に伴い、単純な分散ファイルシステム(HDFS)と並列バッチ処理(MapReduce)という使い方から、SQLによるアドホッククエリ、インメモリ処理による高速化、機械学習/グラフ処理への応用など、ユースケースが高度化しています。
それらを受け、今回のカンファレンスでは「SQL処理(BigQuery/Norikra/Drill/Impala/Presto)」「インメモリ処理(Spark)」「機械学習(Hivemall/Mahout)」をメインテーマとしてセッションを構成したとのこと。

基調講演

NTTData 濱野氏

資料) http://www.slideshare.net/hamaken/hadoop-conference-japan-2014-36781484

Hadoopは「はじめて」一般に普及した並列分散処理系であると言える。その理由の一つとして「シンプル」であることが挙げられるであろう。データリードの高速化とシンプルな処理モデル(MapReduce)に特化することで、大量データを全件処理しなければならないニーズに対応してきた。

Hadoopエコシステムの今後の方向性として以下が挙げられる。

  • MapReduce以外の多様なアプリケーションへの対応: YARN(Yet Another Resource Negotiator)の普及により下地が整いつつある。MapReduceは手堅くなくなることは無いが、それ以外の選択肢が求められている。
  • インメモリ処理への対応: Hadoopの弱点である「レイテンシー(遅延)」を解決するために必要。メモリの低価格化により、すべてをインメモリ処理することも非現実的ではなくなりつつある。Sparkが筆頭。

Hadoopエコシステムで利用の多いプロダクトはHDFS/MapReduce(v1)に加え、Hive/Zookeeper/HBaseあたり。個人的にはHBaseがこれほど使われていることが驚き。ImpalaやSparkなどは前回比で急成長している。

Doug Cutting氏 "The Future of Data"

資料) http://www.slideshare.net/Cloudera_jp/the-future-of-data-jp

未来は予言できないが、事実から予測することはできる。これまでの事実からHadoopの今後を予測してみる。

  1. ムーアの法則は依然有効であり、CPU/メモリ/ディスク/ネットワークのコストは下がり続けていく。コモディティハードウェアによる分散処理は、今後ますます普及していくだろう。
  2. データが生命線である。データを活用することが企業を競争優位に導くより重要な要素になるだろう。
  3. オープンソースソフトウェア(OSS)が生き残る。LinuxAndroidApacheLuceneといったOSSはオープンであるがゆえにエコシステムを形成しやすい。特にプラットフォーム技術はOSSが主流になるだろう。ビッグデータの基盤としてHadoopは今後もメインストリームであり続ける。
  4. Hadoopはさらに拡大していく。最初はコア機能以外何も無かったが、セキュリティやSPOF(単一障害点)除去、MapReduce以外への対応など、成長を続けている。
  5. Hadoopが普通になる。ビッグデータ処理と言えばHadoopを使うことが当たり前になっていく。
  6. Hadoopビッグデータにおいて支配的になる。IBM/Microsoft/OracleなどのベンダーもHadoopエコシステムに参加している。
  7. トランザクション処理にもHadoopが利用されるようになる。既に実装は始まっており、将来的にはOSSになる予定。

Hadoopエコシステムが目指すのは「Enterprise Data Hub」になることである。企業のすべてのデータが一カ所にまとまり、必要なときに必要なデータがすぐ利用できるという環境を実現したい。

Patric Wendell氏 "The Future of Spark"

資料) http://www.slideshare.net/hadoopconf/the-future-of-apache-spark

Apache Sparkのメイン開発者の一人。(スライドの日本語の一部は自分で翻訳したそうです。)

Sparkコミュニティは非常にアクティブ。以下を基本的な方針として開発を進めている。

  • 表現力の高い、クリーンなAPIの提供
  • 統一されたプラットフォーム
  • 拡張性の高いツールの提供

Spark1.0以降はpublic API(安定的)とexperimental API(実験的)に分け、開発者に優しいプラットフォームを目指している。また、マイナーリリース(1.1など)は3ヶ月毎に定期的に行い、必要に応じてメンテナンスリリース(1.0.1など)を行うなど、リリースサイクルを明確化している。

Sparkのアーキテクチャは安定したCore Runtimeと、拡張機能を提供するSpark Libsに分かれている。(JavaにおけるJavaVMとJVM言語の関係に近い。)Sparkの未来はSpark Libsにかかっており、今後も拡張を続けていく。特に重要なのは以下。

  • Spark SQL: もっとも成長しているライブラリ。スキーマによる型付けに対応したRDDを提供する。NoSQL(Cassandra,HBase,MongoDB)やRDBとの統合を実装中であり、Spark1.1ではJDBC/ODBCドライバを提供予定。(1.0.1でベータ公開)
  • MLlib: 二番目に成長しているライブラリ。Sparkの基盤上で高速な機械学習を行う。新しいアルゴリズムも順次追加していく。
  • SparkR: RからSparkに連携できるライブラリ。MLlibとの連携機能を予定している。
  • Spark Streaming: Sparkでストリーミング処理を行うライブラリ。新しいデータソースへの対応や、Flume連携強化を予定している。

Databricks Cloudデモ: 以下の操作がブラウザから行える。詳細はUstreamを参照。ベータユーザー募集中なので登録してね、とのこと。

  • Spark Cluster作成。
  • Notebook(作業場所)を作成し、SQLクエリやScala/Pythonコードを記述して実行。全体はmarkdown記法で記述できるようです。
  • Parquetファイルのアップロード。(外部システムからのデータ連携)
  • ロードしたファイルのスキーマを表示、SQL実行。
  • ダイナミックダッシュボードによるデータ表示。SQL実行結果も動的にグラフ表示可能。
太田一樹氏 "Hadoopエコシステムの変遷と、見えてきた使いどころ"

資料) 未公開

TreasureDataのCTOで、FruentdやPrestの開発にコミットしている。

Hadoopの普及に伴い、闇雲にHadoopを使いたいという人も増えてきた。「なぜHadoopを使うのか?」を考えることが重要。

太田氏の意見:「Collect/Store/Use/Better Use」の全サイクルを行うならばHadoopが有効。逆に、Collectだけなら専用ソリューションを使った方がよい。

各フェーズにおいて注目すべきキーワードは以下:

  • Collect: Fluentd / Flume / Sqoop / Kafkaなど、組み込みに特化したものなどもありバリエーションが増えている
  • Store: Parquet / ORCFile / HDFS / HBase / Acculo / Ambariなど、運用管理が弱いのが課題であり、マネージドサービスの普及が期待される
  • Use: Spark / Tez / YARN / Cascadingなど、新しいユースケースへの対応が拡大している
  • Better Use: SQL on Hadoop機械学習など、より高度なデータ利用

DBの進化、特にMPP(Massively Parallel Processing)が著しく、Hadoopとの棲み分けの境界線があいまいになってきている。MPPはスキーマを事前に決める必要があるため、データ処理効率は有利だが、新しいデータを投入したいというニーズへの対応に時間がかかる(通常は数週間)のが問題。

現時点では構造化データはMPP、非構造化データはHadoopが強いが、今後は技術の発展により混沌としてくるのではないか。

午後の部

判別しづらいので、勝手にセッションの通し番号振ってます。

C-1 マルチテナント化に向けたHadoopの最新セキュリティ事情 <小林氏(Cloudera)>

資料) http://www.slideshare.net/Cloudera_jp/hadoop-hcj2014

2012/9入社でCloudera製品のテクニカルサポートを担当。

Hadoopユースケースが「バッチ(MR/Hive/Pig)」から「インタラクティブ分析(Impala/Presto)」に変化してきている。また、従来は部門別にHadoopクラスタを構築しているユーザーが多かったが、クラスタ間データコピーの手間や運用負荷、リソース利用効率の悪さなどから、クラスタを一元化したいというニーズが増えてきている。

クラスタ一元化に伴い、リソース管理(特定ユーザーの占有を防止)とアクセス制限(認証認可)が必要となる。本セッションでは認証認可について紹介する。

従来のHadoopでは以下のような状況であった:

  • 認証: Kerberos ⇒Hive/Impalaも対応
  • 認可: HDFS ACL ⇒Hive Server2でKereros対応可能だが、行/列レベルの細かい制御はできない

Apache SentryはHive/Impala用の認可モジュールを提供しており、この課題に対処できる。具体的には「server - database - table(view) / URI」の階層でポリシーを定義し、認可を設定することが可能。

現状ではポリシーファイル(.ini)で設定する方法だが、Sentry1.4(CDH5.1)ではGrant/Revokeなどの操作によりポリシー定義が可能になる予定。

Sentryはライブラリとして提供されており、Hive/Impalaなどとの統合レイヤーを利用して組み込むアーキテクチャになっている。統合レイヤーは拡張可能。

B-1 Hadoop用の標準GUI、Hueの最新情報 <川崎氏(Cloudera)>

資料) http://www.slideshare.net/Cloudera_jp/hadoopgui-hue

Hue = Hadoop User Experience、コマンドラインを使わず、簡単にHadoopが使えるようにするというのが目的。

開発はGitHub上でOSSとして進められており、コア開発者は3名。GitHub上が最新であり、そこからバイナリディストリビューション(tar ball)→CDHと展開される。新しいバージョンを試したければGitHubから取得してください。

Hueの過去バージョンではいろいろ使いにくい点があったが、3.5以降では大きく改善されており、3.6では可視化機能が大きくパワーアップしている。Solrで検索した結果をダッシュボードで可視化できる。ダッシュボードはブラウザ上でドラッグ&ドロップしてカスタマイズも可能。

デモがあったので、見たい方はUstreamで。

Hueサーバー(Djangoで実装されたアプリケーション)は通信オーバーヘッド削減のため、Hadoopクラスタ内に配置するのが推奨構成。また、HueサーバーはHA構成も可能。

認証はSimple(DB)およびEnterprise(LDAP/OAuth/SAML)をサポート、ユーザー毎に利用アプリケーションを制限するなどの権限管理も可能。

今後、Sentry対応、Spark対応、Oozie v2対応などが予定されている。

以下が情報源なので併せて参照してください。

B-2 Hivemall: Apache Hiveを用いたスケーラブルな機械学習基盤 <油井氏(産総研)>

資料) http://www.slideshare.net/myui/hdj2014-myui

HivemallはHiveのUDF(ユーザー定義関数)およびUDTF(ユーザー定義テーブル生成関数)として実装した機械学習ライブラリ。油井氏が開発者で、OSS(LGPLライセンス)でGitHub上にて開発を進めている。

開発の動機: プログラミング不要な機械学習のプラットフォームが欲しい。既存のもの(Mahout / Vowpal Wabbit / H2O / Oryx / Spark MLlib)はすべてAPIによるプログラミングが必要だが、非プログラマ向けのソリューションが必要なのでは。

HiveのUDFとして実装することにより、プログラミング不要かつコンパイル不要(試行錯誤が容易)とすることができた。5分もあれば一通りチュートリアルを動かすことができるはず。

HiveのMapReduceへのマッピングを考慮してスケーラブルになるように実装している。かつ、Feature hashingというテクニックを利用することで、メモリに全データが入らなくても大丈夫なように配慮している。

最先端のオンライン機械学習アルゴリズム(CW,AROW,SCW)をサポートしており、他に比べ収束が早い。

反復学習の際に中間結果をHDFSに出力すると遅くなるため、Hivemallでは反復をデータのランダム化処理に置き換えることでエミュレートし、反復回数を削減している。これにより実用的なオーバーヘッドで学習精度の向上を達成している。実際の性能評価でも収束までの時間および精度で優位が確認できた。

Apache incubator化の話があるとのこと。

A-3 A Deeper Understanding of Spark Internals

資料) http://www.slideshare.net/hadoopconf/japanese-spark-internalssummit20143

Sparkを効率的に動かすためには内部構造を知る必要がある。

主にExecution model / Shuffle / Cachingに分解できるが、Cachingの話は後で考えればよい。

Execution modelは以下のフェーズから構成されている:

  1. Create DAG of RDDs: RDDに対する処理をDAGとして構築する
  2. Create logical execution plan: 論理的な実行プランを作る、なるべくパイプライン化できるようにし、複数のstageに分割する
  3. Schedule tasks: stage毎にtaskの実行をスケジューリングする

stage内に含まれるすべてのtaskの実行が完了してから、次のstageの実行に進む。

shuffleデータはディスクに書き出される(大半はOSのディスクキャッシュに収まるはず)、shuffleデータの受け渡しはpullベース(pushベースではない)。

groupby操作では1つのkey-valueペアはメモリ上に格納する必要があるため、巨大なkey-valueペアを避けた方がよい。これは将来のバージョンでは改善したいと考えている。

良くある問題:

  • パーティションの数が少なすぎる: 通常は100から10000程度がよい、最小でもコア数の2倍、最大はtaskの実行時間が100msを超えない程度
  • メモリ消費を最小化せよ: 巨大なkey-valueペアを避ける、GCログやdmesg(OOM-killer)で状況を確認
  • shuffleは必要最小限に: メモリ消費、ディスクI/O、ネットワークI/Oのすべてに影響する
  • 標準APIをきちんと理解して使おう
  • distinct(numPatitions=xx)をうまく使おう?⇒どういう効果があるのか別途調査せねば。

その他のTips:

  • 小さいデータサイズ、キャッシュなしで始め、後からチューニングせよ
  • 可能ならSparkSQLを使おう →Sparkランタイムがチューニングしてくれる(しかも将来的に改善されるはず)
  • rdd.toDebugString()でDAGの内容が確認できる
  • Spark ShellとSpark UIを組み合わせてパフォーマンス分析 →特にFailed TasksやMemory Usageに注目せよ

QA

  • SparkSQLのjoinは内部的にどのような挙動? →単純なHash Joinを行っている
  • Spark Streamingの結果が通常のRDDとして扱えない? →Streamingのための拡張を行っている。Unionするか、SparkSQLで抽出して変換してください。
B-4 Spark1.0での動作検証 - Hadoopユーザ・デベロッパから見たSparkへの期待 <土橋氏(NTTData) / 田中聡氏(NTTDoCoMo)>

資料)
http://www.slideshare.net/hadoopconf/hadoopspark-hadoop-conference-japan-2014
http://www.slideshare.net/hadoopxnttdata/apache-spark-nttdatahcj2014

NTTDoCoMoとNTTDataの共同プロジェクト「社会の頭脳システム」を2009年から運用している。1000ノード程度のHadoopクラスタで、ペタバイト級のデータを処理している。

最近、以下が課題となってきた:

  1. レイテンシの短縮
  2. Hadoopの進化への対応

1.に関して、従来は1時間おきのバッチ処理で十分だったものが、もっと短いサイクルで処理を行いたいというニーズが出てきた。→Sparkによるインメモリ化

2.に関して、MapReduce以外の処理への対応のためにクラスタを新規増設してきたが、管理負荷・資源効率の観点から集約したい。→YARNで多様なワークロードを統合

以上を解決するため、Spark+YARNに目を付けて技術検証を行った。観点は、自分たちのサービスにSpark+YARNが使えるかということで、具体的には数十TBのデータを投入してみて予想外の事象が発生しないかを確認した。

結果は非常に良好で、予想通りデータ量に対してほぼリニアにスケールするパフォーマンスが得られた。

ただし細かく見てみるといろいろな注意点があり、特にshuffle時にネットワークI/OだけでなくディスクI/Oが発生することに注意が必要。また、SparkがRDDで使用するメモリと、キャッシュなどアプリケーション以外で使用するメモリのサイジングが難しい。全体的には全体的に玄人好みのフレームワークと言える。

B-5 Treasure Data on The YARN <小林氏(TreasureData)>

資料) http://www.slideshare.net/ryukobayashi/treasure-data-on-the-yarn-hadoop-conference-japan-2014

TDのサービスはカラム型ストレージ(スキーマレス)とMapReduceの組み合わせで構成されており、Hive/Pig/Impala/Prestoによるクエリーをサポートしている。

ストレージは独自に実装したPlazmaDBを利用しており、AmazonS3上にデータを永続化するようになっている。HDFSMapReduceの中間ファイル出力以外には利用していない。

Hadoopをカスタマイズして運用しており、MRv1からYARNへの移行中であるが、移行にとても苦労した。ここではその苦労話を紹介する。

TDではHadoopクラスタを4つ運用しており、独自に開発したキュー/スケジューラを通じてHadoopクラスタにジョブが投入されるようになっている。ここに手を入れて、移行先のYARNベースのクラスタに同じジョブを投入するようにし、問題が起きないかをチェックしている。

TDのユーザー数は焼く5000で、データ量は6兆レコード、毎日4万程度のジョブを処理している。

YARNについてはB-3小沢氏の発表がかなり詳しかったので、そちらを参照してください。

http://www.slideshare.net/ozax86/taming-yarn-hadoop-conference-japan-2014-36775198

YARNを利用するにあたってのTips:

  • Job History Serverを稼働させていないと、過去のJobのログが確認できない。必ず起動しておくべき。
  • Hadoop2.4.0以降を使うべし。2.2/2.3のYARNにはバグがあり、スケジューラがデッドロックする。CDH5.0.2のYARNにはパッチがあたっているので大丈夫。
  • hadoop-conf-pseudoには設定ミスがあるので修正が必要(Hadoop2.2の内容のままになっている)
  • CDHやHDPのVMからコピーしてくるか、AmbariやCoudera Managerなどのツールを使うのがよい
  • YARNのWebUIからスタックトレースなどが見られるようになったので便利
  • メモリのチューニングが難しいので、設定支援ツール(hdp-configuration-utils.pyスクリプトやAmberi)を使うのがおすすめ
  • パッケージでYARNを導入しても、必須ディレクトリ等が作られていないので起動できない。忘れずに。
B-5 実践機械学習 - MahoutとSolrを活用したリコメンデーションにおけるイノベーション <草薙氏(MapR)>

資料) http://www.slideshare.net/MapR_Japan/mahoutsolr-hadoop-conference-japan-2014

本セッションの内容は、以下のebook(日本語)に基づいたものだそうです。詳細はこちらを参照するのがよいかと思います。

https://www.hadoop-times.com/technology/pdf-practical-machine-learning.html (要登録)

機械学習を実ビジネスに応用するためには、コストとリターンのバランスを考えることが重要。ここでは、Mahoutが提供する比較的シンプルな機械学習アルゴリズム検索エンジン(Solr)を組み合わせることで、効果的なリコメンデーションシステムを構築することを考えています。

題材として、AmazonのようなECサイトでのリコメンデーションの実現方法、さらに音楽配信サービスにおけるリコメンデーションについての入門的な内容の紹介がありました。これから機械学習を学ぶには良い教材だと思います。

最後にMapR(Hadoopディストリビューションの一つ)の紹介があり、MapRの特徴としてLucid Works(Solr商用版)のバンドル、およびNFS対応を紹介していました。CDH(Cloudera)やHDP(Hortonworks)などのメジャーディストリビューションとの差別化を図っているようです。

まとめ

全体的にHadoop新時代を感じさせる内容でした。
Sparkは急激に進化していることもあり、内部動作に関する情報はあまりまとまったものがなかったので、今回のセッションは非常に有用でした。

AntのGroovyタスクを使うときの俺流ベストプラクティス

G* Advent Calendar 2013の12/16担当、@nobusue です。

12/7に続き二度目の登場になりますが、引き続き実務に役立つシリーズでいきたいと思います。といっても、「ビルドはGradleで決まりだよね!」というイマドキの現場ではなく、いまだにAntでがんばっている(ちょっと残念な)現場の方向けです。

最近はMavenやGradleやsbtなんかの新興勢力に押され気味のAnt御大ですが、その安定感からいまだに「Ant以外認めない」という現場も多いと聞きます。(あくまで伝聞ですよ。。。)実際、要件的にはAntで十分というケースも多いでしょう。しかし、やはりAntで無理やりがんばるのはあまり得策ではないケースというのもままあります。例えば、

  1. テンプレートエンジンを利用して設定ファイルを動的に生成したい(単純なプロパティの置換ではすまない、もしくは日本語を含む文字列を置換したい場合など)
  2. 環境に応じて動的にビルドのロジックを切り替えたい(ファイルの有無や、プラットフォームの差異によって挙動を変えたい)
  3. ビルドの中でJavaのクラスを呼びたいが、わざわざカスタムAntタスクを作るのはめんどう

というような場合です。

わたしが今入っている現場では、まさに上記(1)のケースにヒットしてしまいました。(プロパティファイルにUnicodeエスケープした日本語を書いておくと、AntのCopyタスクのfilterでエスケープが解除されてしまい、そのままエスケープなしの状態でプロパティファイルが作られてしまう、、、というややこしい問題です。もう一回native2asciiすればいいんですけど、ダサダサですよね。)

というわけで、今回の内容は「いろいろ大人の事情でGradleに移行できないけど、せめてカスタムタスクぐらいはGroovyで書きたい」という人向けです。たいした内容ではありませんが、意外にまとまった情報がなかったのでまとめておきたいと思います。

AntのGroovyタスクとは?

GroovyのディストリビューションにはAntのカスタムタスクとしてGroovyタスクが内包されています。ですので、Antのクラスパスにgroovy-all-x.y.z.jarを追加し、ビルドスクリプトに以下の定義を追加すればGroovyタスクが利用できるようになります。だまって $ANT_HOME/lib 以下にgroovy-all.jarをぶっこんどくのがおススメです。

<taskdef name="groovy"
 classname="org.codehaus.groovy.ant.Groovy"
 classpath="groovy-all-x.y.z.jar" />

あとは、以下のようにビルドスクリプト内でGroovyが使えるようになります。

<groovy><![CDATA[
  ant.echo level:'info', message:'Hello groovy task!'
]]></groovy>

詳しくは、 The groovy Ant Task とか Antスクリプト内でGroovyを利用する あたりをご参照ください。

モジュール化してカスタムタスクっぽく使う

Groovyタスクは強力ですが、Ant以外受け付けない方に見つかるとお叱りを受ける可能性もあります。ですので、以下のようにしてGroovyコードの部分を隠蔽してしまいましょう。
呼び出し元:

<project name="main">
  <import file="./buildUtil.xml"/>
  <target name="build">
    <antcall target="complexTask"/>
  </target>
</project>

呼び出し先(Groovyタスク):

<project name="util">
  <taskdef name="groovy"
   classname="org.codehaus.groovy.ant.Groovy"
   classpath="groovy-all-x.y.z.jar" />
  <target name="complexTask">
    <groovy><![CDATA[
      ant.echo level:'info', message:'Groovy makes easier!'
    ]]></groovy>
  </target>
</project>

GroovyにはAntBuilderという便利な仕組みが内包されており、Groovyタスクの中からAntタスクを呼び出すこともできますので、Antで用意されているファイル操作なんかはそのまま便利に利用しましょう。

パラメータを渡す

前出のようにモジュール化した場合、当然ながら「呼び出し元タスクからパラメータを渡したい」という要望が出てきます。これ、不思議とサンプルコードが見当たらないのですが、やり方は簡単で以下のようにするだけです。
呼び出し元:

<project name="main">
  <import file="./buildUtil.xml"/>
  <target name="build">
    <antcall target="complexTask">
      <param name="sourcePath" value="./source"/>
      <param name="distPath" value="./dist"/>
    </antcall>
  </target>
</project>

呼び出し先(Groovyタスク):

<project name="util">
  <taskdef name="groovy"
   classname="org.codehaus.groovy.ant.Groovy"
   classpath="groovy-all-x.y.z.jar" />
  <target name="complexTask">
    <groovy><![CDATA[
      def sourcePath = properties['sourcePath']
      def distPath = properties['distPath']
      ant.echo level:'debug', message:"sourcePath: ${sourcePath}"
      ant.echo level:'debug', message:"distPath: ${distPath}"
    ]]></groovy>
  </target>
</project>

要するに、呼び出し側でで渡してやれば、Groovyタスク側ではproperties(マップ)で受け取ることができるということです。分かってしまえばどうということはありませんが、自力で調べると意外に試行錯誤が面倒でしたので、備忘の意味でここで記録に残しておきます。

ログ出力

汎用性の高いタスクができると、幅広く使われるようになります。そうすると、printデバッグではいずれ限界がきますので、早い段階でメッセージのログレベルをきちんと分けておきましょう。
実は既に小出しにしていましたが、

ant.echo level:'debug', message:"sourcePath: ${sourcePath}"

のように、AntBuilder経由でant.echoをログレベル付きで実行すればよいです。(Taskクラスのloggerを取得しようとしたのですが、どうもうまくいかないのでこの方法にしました。)
ログレベルを分けておくと、開発時はdebugを出力して、運用時はwarning以上のみ出力というような使い分けが可能です。ログレベルはAntのコマンドラインオプションで指定できますので、こちらなどを参考にいろいろためしてみてください。

まとめ

Antに疲れたらGroovyタスクで息抜きしましょう。

次は @nagai_masato さんです!

GroovyでGCログ解析: ちょっと扱いにくいフォーマットを処理するときのパターン

G* Advent Calendar 2013の12/7担当、@nobusue です。

今年は基本に返って、実務に役立つGroovyの利用方法をご紹介しようと思います。(というか実際に業務で使ったのですが。。。)

お題はGCログの解析です。
ここでご紹介するサンプルはOracle JDK7(HotSpot VM)が出力するGCログを想定していますが、仕組みはシンプルなので他のフォーマット(IBM JDK以外)にも多少アレンジすれば適用可能なはずです。

GCログを解析する場合、通常はGCViewerなどのツールを使うことが多いですが、それでは対応が難しい要件もたまにあります。
例えば、「GCログが100本くらい取ってあって、それらを解析して最適なヒープサイズを見積もる」というようなケースです。いちいちGUIツールで一つ一つ開いてられないですよね。。。

そこでGCログをスクリプトで処理してやろうか、と思うのですが、GCログには以下のようなちょっと困った特徴があります。

  1. 一行で無理矢理情報を階層化しているので、フィールドの分割が単純にいかない
  2. 設定しているGCアルゴリズムによってフォーマットがかなり変わる
  3. CPU負荷が高いとログが壊れてフォーマットが不正になる場合がある

特に3.が厄介で、こいつのせいでまともにパーサーを実装しようとすると例外処理が無駄に複雑になってしまいます。かといって、一行全体をパターンマッチングで処理しようとすると、ものすごく長大な正規表現が必要になり、デバッグに苦しむことになります。(というか実際苦しんだ。)

そこで、現実的なアプローチとして以下の戦略を採用します。

  • 一行を区切り文字でおおざっぱに分割する
  • 分割した各要素から、正規表現で必要な部分を取り出す

具体的なやり方は以下のようになります。

GCログ出力設定

まずは準備としてGCログを採取します。
JVMの起動時オプションに以下を追加してください。

 -Dverbose:gc -Xloggc:gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps

これで、JVM起動時のカレントディレクトリに gc.log が出力されるようになります。(+PrintGCDateStampsはJDK6以降でないと使えないので、JDK5以前の方は+PrintGCTimeStampsに変更してくださいね。)
GCログの具体的な出力は以下のようになります。
マイナーGCの場合:

2013-12-07T17:33:31.668+0900: 19.047: [GC [PSYoungGen: 26476K->6407K(30208K)] 59443K->40152K(74240K), 0.0631133 secs] [Times: user=0.20 sys=0.00, real=0.06 secs] 

フルGCの場合:

2013-12-07T17:33:31.731+0900: 19.110: [Full GC [PSYoungGen: 6407K->0K(30208K)] [ParOldGen: 33744K->34259K(65024K)] 40152K->34259K(95232K) [PSPermGen: 40231K->39553K(65536K)], 0.1483206 secs] [Times: user=0.36 sys=0.00, real=0.15 secs] 

GCログの読み方の詳細についてはここでは触れません。こちらあたりが参考になると思います。

GCログの1行をおおざっぱに分割する

GCログのフォーマットを眺めてみると、以下のような構造になっています。

2013-12-07T17:33:31.668+0900: 19.047:
[GC
  [PSYoungGen: 26476K->6407K(30208K)]
  59443K->40152K(74240K),
  0.0631133 secs]
[Times: user=0.20 sys=0.00, real=0.06 secs] 

2013-12-07T17:33:31.731+0900: 19.110: 
[Full GC
  [PSYoungGen: 6407K->0K(30208K)]
  [ParOldGen: 33744K->34259K(65024K)]
  40152K->34259K(95232K)
  [PSPermGen: 40231K->39553K(65536K)],
  0.1483206 secs]
[Times: user=0.36 sys=0.00, real=0.15 secs] 

「Timesの部分は役に立たないので捨てる」という決断をすれば、1行を"["と"]"、および","で区切れば意味のある固まりに分割できることがわかります。
これをGroovyで実装すると以下のようになります。

def items = line.tokenize("[")*.tokenize("]").flatten()*.tokenize(",").flatten()*.trim()

flatten()が2回発生していますが、元のフォーマットが変態なせいですので我慢してください。
これで、「ヒープ領域ごとのMAXを取得する」という目的にかなり近づくことができました。

パターンマッチングで必要な部分を抜き出す

この前の作業で、

[PSYoungGen: 6407K->0K(30208K)]

みたいな文字列を切り出すことはできました。じゃあ、ここから"(30208K)"の数字部分だけ取り出すにはどうすればよいでしょうか?
良い子の皆さんは既にお分かりだと思いますが、正規表現を使うのが一番手っ取り早くお手軽だと思います。でも、いざ使うとなると、サンプルとか少なくて意外に悩みませんか?
こういう場合、私はいつも次のようにしています。

def s = "[PSYoungGen: 6407K->0K(30208K)]"
def max = s.find( /PSYoungGen:.*\((\d+)K\)/ ) {all,m0 -> return m0}

何をしているのか簡単に解説します。

  1. find(/正規表現(正規表現)/)で文字列のマッチを行います。括弧で囲んでいる部分が、切り出したい文字列に相当する部分です。(バックリファレンス)
  2. findの最後の引数にクロージャを渡すと、クロージャの第1引数にはマッチした文字列全体、第2引数以降にはバックリファレンスで指定した文字列が渡されます。
  3. 欲しいのはバックリファレンスの1つめ、すなわちクロージャの第2引数なので、これをそのままreturnで返します。加工が必要ならここでやることもできます。

サンプルコード

第一引数としてGCログのファイル名を渡すと、

Young Max: 41472K
Old Max  : 65024K
Total Max: 95744K
Perm Max : 65536K
Total GC pause time: 0.3325945 sec

みたいな感じで、ヒープの各領域のMAXを表示するサンプルがこちらになります。(フルGCが発生していないとOld/Permは取得できないので"0K"になります。そういう仕様なのでご了承ください。)

def srcFile = new File(args[0])

def youngMax=0, oldMax=0, totalMax=0, permMax=0
def totalPause=0.0

srcFile.eachLine{ line ->
  if( line =~ /^\d\d\d\d-\d\d-\d\dT/) {
    def gcinfo = parseGC(line)
    //println gcinfo
    try{
   	ym = gcinfo.ym.toInteger()
   	tm = gcinfo.tm.toInteger()
   	pt = gcinfo.pt.toDouble()
    } catch(e) { return }
   youngMax = (ym > youngMax) ? ym : youngMax
   totalMax = (tm > totalMax) ? tm : totalMax
   totalPause += pt
   if( line.contains("[Full GC")) {
   try{
   	om = gcinfo.om.toInteger()
   	pm = gcinfo.pm.toInteger()
   } catch(e) { return }
    	oldMax = (om > oldMax) ? om : oldMax
    	permMax = (pm > permMax) ? pm : permMax
   } 
 }
}
println "Young Max: ${youngMax}K"
println "Old Max  : ${oldMax}K"
println "Total Max: ${totalMax}K"
println "Perm Max : ${permMax}K"
println "Total GC pause time: ${totalPause} sec"

def Map parseGC(line) {
    def items = line.tokenize("[")*.tokenize("]")
               .flatten()*.tokenize(",").flatten()*.trim()
    //println items
    def m = [:]
    items.each{
        switch(it) {
	    case ~/PSYoungGen:.*/:
	        m.ym = it.find( /PSYoungGen:.*\((\d+)K\)/ ) {all,m0 -> return m0}
		break
	    case ~/ParOldGen:.*/:
		m.om = it.find( /ParOldGen:.*\((\d+)K\)/ ) {all,m0 -> return m0}
	        break
	    case ~/PSPermGen:.*/:
	 	m.pm = it.find( /PSPermGen:.*\((\d+)K\)/ ) {all,m0 -> return m0}
		break
	    case ~/\d+K->\d+K\(\d+K\)/:
		m.tm = it.find( /\d+K->\d+K\((\d+)K\)/ ) {all,m0 -> return m0}
		break
	    case ~/\d*\.\d*\s+secs/:
		m.pt = it.find( /(\d*\.\d*)\s+secs/ ) {all,m0 -> return m0}
	}
   }
   return m
}

以上です。
Groovyの便利さを感じていただくことができましたか?

次は@さんです!