読者です 読者をやめる 読者になる 読者になる

本当は怖い情報科学

情報系大学院生の趣味&実益ブログ。

DAGベース分散処理プラットフォームOnyxの紹介

clojure

この記事は Clojure Advent Calendar 2015 - Qiita の12月22日の記事です(微妙に遅刻しましたすんません)

データを関数で変換しながら次々に処理していくことを考えましょう。関数を頂点、頂点同士の処理の順番と依存関係を辺とするグラフ構造を用いて処理を表現することができることがわかるでしょうか。古き良きフローチャートと似たものです。処理がループすることを許さないとすると、このグラフ構造は有向非循環グラフ、つまりDirected Acyclic Graph (DAG)となることがわかります。

例えば、下の3つの関数を連続して呼び出すコードは

(defn f [x] 
  ;; なんとかかんとか)

(defn g [x]
  ;; なんとかかんとか)

(defn h [x]
  ;; なんとかかんとか)

(def call [x]
  (h (g (f x))))

のようになります。あるいは、スレッディングマクロを使うとこのように書くことも出来ます。

(def call [x]
  (-> x f g h))

スレッディングマクロの威力は絶大ですね。これはもうほとんどDAGに近い記法です。

ところで、処理の内容をDAGとして表現することで何が嬉しいのでしょうか?1つは分散/並列処理がしやすいことにあります。処理同士の依存関係をプログラマが明示的に記述してやることにより、システムが処理を分散配置しやすくなります。また、DAGという制約の多い記法で処理を記述することにより、分散処理の時に障害となる処理を暗黙のうちに使わないようにシステムを設計することが出来ます。

そのような分散処理システムは多数提案されています。有名なものはStormやSparkでしょうか。Hadoopを初めとするMap/Reduce処理系も、DAGの特殊な場合と言うことが出来るでしょう。

今回は、Pure Clojureで書かれたDAGベース分散処理プラットフォームOnyxを紹介します。非常に簡単なサンプルプログラムを紹介して、Onyxにおいてどのようにプログラムを作成するかを述べます。

処理する内容は、入力されたデータ列の各要素に対して、ただ整数インクリメントをするというだけのものです。また、大規模に分散させるのではなく、最低限のプロセス数で処理させていますので、分散処理プラットフォームの紹介としては物足りないかも知れません。core.asyncのchannelを入力・出力として、入力用のchannelからデータが取り出され、インクリメント処理が適用され、その後出力用のchannelに書き出されます。処理内容に比べて非常に長いソースコードですが、逆に処理内容が増えてもこれ以上の大量のコードを追加する必要なくスケーラブルに分散処理を行うことが出来ます。

関数名などをキーワードで指定しているのは、定義の順番に左右されないようにという理由です。全体的に回りくどいと感じられるかも知れませんが、分散処理に付随するいろいろな面倒ごとを吸収するためには必要なおまじないだと思ってください。

さて、普通にlein newで生成したプロジェクトに対して、この記事の末尾にコードを掲載しました。少し長いですがおつきあいください。

;; project.clj
(defproject onyx-samples "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :main onyx-samples.sample1-core-async
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [org.clojure/java.jdbc "0.4.2"]
                 [org.xerial/sqlite-jdbc "3.8.11.2"]
                 [org.clojure/core.async "0.1.346.0-17112a-alpha"]
                 [org.onyxplatform/onyx "0.8.2"]
                 [org.onyxplatform/onyx-sql "0.8.2.1"]
                 [com.stuartsierra/component "0.2.3"]])
;; onyx-samples/src/onyx-samples/sample1_core_async.clj
(ns onyx-samples.sample1-core-async
  (:require [clojure.core.async :refer [chan >! >!! <! <!! go close!]]
            [onyx.plugin.core-async :refer [take-segments!]]
            [com.stuartsierra.component :as component]
            [clojure.pprint :as pp]
            [onyx.api]))

;; A very simple Onyx test code that takes segments from a channel
;; and just move them to another channel

;; ここでは、入力と出力は、core.asyncのchannelを利用します。
(def in-ch (chan 500))
(def out-ch (chan 500))

;; システムの状態管理にcom.stuartsierra.componentを使用します。
(def system nil)

;; メインの定義。今回は、:in というタスクでchannelからデータを読み込み、
;; :incというタスクで処理をして、それを:outというタスクで別のchannelに出力します。
(def workflow
  [[:in :inc]
   [:inc :out]])

;; 起動するpeer(プロセスのようなもの)を指定します。
;; 少なくともworkflowに指定されたタスクの数以上なくてはいけません。
;; なので、上記で定義されたworkflow
(def n-peers (->> workflow (mapcat identity) set count))

;; とりあえずは重要でないパラメーター
(def batch-size 10)
(def batch-timeout 50)

;; それぞれのタスクについて、詳細を定義するデータ構造です。
;; Onyxではcatalogと呼ばれます。
(def catalog
  ;; :inタスク。ここでは標準で用意されているcore-asyncプラグインを使います。
  ;; channelの名前等の必要な情報は、後述のlifecycleで定義します。
  [{:onyx/name :in                            
    :onyx/type :input
    :onyx/plugin :onyx.plugin.core-async/input
    :onyx/medium :core.async
    :onyx/max-peers 1
    :onyx/batch-timeout batch-timeout
    :onyx/batch-size batch-size
    :onyx/doc "Reads segments from a core.async channel"}

   ;; 今回の処理のメインとなる:incタスクです。
   ;; my-incという関数を指定しています。
   {:onyx/name :inc
    :onyx/type :function
    :onyx/fn :onyx-samples.sample1-core-async/my-inc
    :onyx/batch-size batch-size}

   ;; :inタスクと同様の、:outタスクを指定します。
   {:onyx/name :out
    :onyx/type :output
    :onyx/plugin :onyx.plugin.core-async/output
    :onyx/medium :core.async
    :onyx/max-peers 1
    :onyx/batch-timeout batch-timeout
    :onyx/batch-size batch-size
    :onyx/doc "Writes segments to a core.async channel"}
   ])

;; :in用のchannelを設定するlifecycle関数。詳細は後述
(defn inject-in-ch [event lifecycle]
  (println "inject-in-ch is called.")
  {:core.async/chan in-ch})

;; :out用のchannelを設定するlifecycle関数
(defn inject-out-ch [event lifecycle]
  (println "inject-out-ch is called.")
  {:core.async/chan out-ch})

(def in-calls
  {:lifecycle/before-task-start inject-in-ch})

(def out-calls
  {:lifecycle/before-task-start inject-out-ch})

;; lifecycleとは、タスク起動の各段階で起動できる関数です。
;; channel等のパラメーターを設定したり、タスク起動の前にリソースを獲得したりといった
;; ことが出来ます。
(def lifecycles
  [
   ;; :in用のchannelを設定する関数をfully-qualified keywordで指定します。
   {:lifecycle/task :in
    :lifecycle/calls :onyx-samples.sample1-core-async/in-calls}
   
   ;; core-asyncプラグインで指定されているlifecycle関数
   {:lifecycle/task :in
    :lifecycle/calls :onyx.plugin.core-async/reader-calls}
   
   ;; :out用のchannelを設定する関数をfully-qualified keywordで指定します。
   {:lifecycle/task :out
    :lifecycle/calls :onyx-samples.sample1-core-async/out-calls}
   
   ;; core-asyncプラグインで指定されているlifecycle関数
   {:lifecycle/task :out
    :lifecycle/calls :onyx.plugin.core-async/writer-calls}
   ])

(def onyx-id (java.util.UUID/randomUUID))

;; peerやzookeeperを起動するパラメーターです
(def env-config
  {:zookeeper/address "127.0.0.1:2188"
   :zookeeper/server? true
   :zookeeper.server/port 2188
   :onyx/id onyx-id})

(def peer-config
  {:zookeeper/address "127.0.0.1:2188"
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
   :onyx.messaging/impl :aeron
   :onyx.messaging/peer-port 40200
   :onyx.messaging/bind-addr "localhost"
   :onyx/id onyx-id})

;; componentでプログラムの初期化処理・終了処理・状態を管理します。
(defrecord OnyxDevEnv [n-peers]
  component/Lifecycle

  ;; start関数でpeerを起動します。
  (start [component]
    (println "Starting Onyx development environment")
    (let [onyx-id (java.util.UUID/randomUUID)
          env (onyx.api/start-env env-config)
          peer-group (onyx.api/start-peer-group peer-config)
          peers (onyx.api/start-peers n-peers peer-group)]
      (assoc component :env env :peer-group peer-group
             :peers peers :onyx-id onyx-id)))

  ;; stopでpeerをシャットダウンします
  (stop [component]
    (println "Stopping Onyx development environment")
    (doseq [v-peer (:peers component)]
      (onyx.api/shutdown-peer v-peer))
    (onyx.api/shutdown-peer-group (:peer-group component))
    (onyx.api/shutdown-env (:env component))
    (assoc component :env nil :peer-group nil :peers nil)))

;; 処理する関数の本体です。
;; 渡されるデータ構造は、segmentと呼ばれ、要するにClojureのmapです。
;; map以外のデータ構造は許可されていません。
(defn my-inc [segment]
  (update-in segment [:n] inc))

;; componentのインスタンスを #'system というvarに束縛します。
(defn init []
  (alter-var-root #'system (constantly (map->OnyxDevEnv {:n-peers n-peers}))))

(defn start []
  (when (nil? system)
    (init))
  (alter-var-root #'system (fn [s] (component/start s)))
  nil)

(defn stop []
  (alter-var-root #'system (fn [s] (when s (component/stop s))))
  nil)

;; 入力用のchannelにデータを流し込んでから、jobをsubmitします。
(defn submit-jobs []
  (dotimes [i 20]
    (let [segment {:n i :greeting (str "Hello" i)}]
      (>!! in-ch segment)))
  (>!! in-ch :done)
  (let [job {:workflow workflow
             :catalog catalog
             :lifecycles lifecycles
             :task-scheduler :onyx.task-scheduler/balanced}]
    (println "Submitting")
    (onyx.api/submit-job peer-config job)))

;; 一連の関数を起動するmain関数です
(defn -main [& args]
  (init)
  (start)
  (submit-jobs)
  (pp/pprint (take-segments! out-ch))
  (stop)
  (shutdown-agents))

そして、以下が実行した結果です。個々のデータ(segment)に対してmy-inc関数が適用したものが出力用channelに出力されていることがわかりますね。

# 実行結果
$ lein run                                                                                                                                                        [~/Dropbox/code/onyx-samples]
Starting Onyx development environment
Submitting
inject-out-ch is called.
inject-in-ch is called.
[{:n 1, :greeting "Hello0"}
 {:n 2, :greeting "Hello1"}
 {:n 3, :greeting "Hello2"}
 {:n 4, :greeting "Hello3"}
 {:n 5, :greeting "Hello4"}
 {:n 6, :greeting "Hello5"}
 {:n 7, :greeting "Hello6"}
 {:n 8, :greeting "Hello7"}
 {:n 9, :greeting "Hello8"}
 {:n 10, :greeting "Hello9"}
 {:n 11, :greeting "Hello10"}
 {:n 12, :greeting "Hello11"}
 {:n 13, :greeting "Hello12"}
 {:n 14, :greeting "Hello13"}
 {:n 15, :greeting "Hello14"}
 {:n 16, :greeting "Hello15"}
 {:n 17, :greeting "Hello16"}
 {:n 18, :greeting "Hello17"}
 {:n 19, :greeting "Hello18"}
 {:n 20, :greeting "Hello19"}
 :done]
Stopping Onyx development environment
【広告】