この記事は Clojure Advent Calendar 2015 - Qiita の12月22日の記事です(微妙に遅刻しましたすんません)
データを関数で変換しながら次々に処理していくことを考えましょう。関数を頂点、頂点同士の処理の順番と依存関係を辺とするグラフ構造を用いて処理を表現することができることがわかるでしょうか。古き良きフローチャートと似たものです。処理がループすることを許さないとすると、このグラフ構造は有向非循環グラフ、つまりDirected Acyclic Graph (DAG)となることがわかります。
(defn f [x] ;; なんとかかんとか) (defn g [x] ;; なんとかかんとか) (defn h [x] ;; なんとかかんとか) (def call [x] (h (g (f x))))
(def call [x] (-> x f g h))
今回は、Pure Clojureで書かれたDAGベース分散処理プラットフォームOnyxを紹介します。非常に簡単なサンプルプログラムを紹介して、Onyxにおいてどのようにプログラムを作成するかを述べます。
さて、普通にlein new
;; project.clj (defproject onyx-samples "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} :main onyx-samples.sample1-core-async :dependencies [[org.clojure/clojure "1.7.0"] [org.clojure/java.jdbc "0.4.2"] [org.xerial/sqlite-jdbc ""] [org.clojure/core.async "0.1.346.0-17112a-alpha"] [org.onyxplatform/onyx "0.8.2"] [org.onyxplatform/onyx-sql ""] [com.stuartsierra/component "0.2.3"]])
;; onyx-samples/src/onyx-samples/sample1_core_async.clj (ns onyx-samples.sample1-core-async (:require [clojure.core.async :refer [chan >! >!! <! <!! go close!]] [onyx.plugin.core-async :refer [take-segments!]] [com.stuartsierra.component :as component] [clojure.pprint :as pp] [onyx.api])) ;; A very simple Onyx test code that takes segments from a channel ;; and just move them to another channel ;; ここでは、入力と出力は、core.asyncのchannelを利用します。 (def in-ch (chan 500)) (def out-ch (chan 500)) ;; システムの状態管理にcom.stuartsierra.componentを使用します。 (def system nil) ;; メインの定義。今回は、:in というタスクでchannelからデータを読み込み、 ;; :incというタスクで処理をして、それを:outというタスクで別のchannelに出力します。 (def workflow [[:in :inc] [:inc :out]]) ;; 起動するpeer(プロセスのようなもの)を指定します。 ;; 少なくともworkflowに指定されたタスクの数以上なくてはいけません。 ;; なので、上記で定義されたworkflow (def n-peers (->> workflow (mapcat identity) set count)) ;; とりあえずは重要でないパラメーター (def batch-size 10) (def batch-timeout 50) ;; それぞれのタスクについて、詳細を定義するデータ構造です。 ;; Onyxではcatalogと呼ばれます。 (def catalog ;; :inタスク。ここでは標準で用意されているcore-asyncプラグインを使います。 ;; channelの名前等の必要な情報は、後述のlifecycleで定義します。 [{:onyx/name :in :onyx/type :input :onyx/plugin :onyx.plugin.core-async/input :onyx/medium :core.async :onyx/max-peers 1 :onyx/batch-timeout batch-timeout :onyx/batch-size batch-size :onyx/doc "Reads segments from a core.async channel"} ;; 今回の処理のメインとなる:incタスクです。 ;; my-incという関数を指定しています。 {:onyx/name :inc :onyx/type :function :onyx/fn :onyx-samples.sample1-core-async/my-inc :onyx/batch-size batch-size} ;; :inタスクと同様の、:outタスクを指定します。 {:onyx/name :out :onyx/type :output :onyx/plugin :onyx.plugin.core-async/output :onyx/medium :core.async :onyx/max-peers 1 :onyx/batch-timeout batch-timeout :onyx/batch-size batch-size :onyx/doc "Writes segments to a core.async channel"} ]) ;; :in用のchannelを設定するlifecycle関数。詳細は後述 (defn inject-in-ch [event lifecycle] (println "inject-in-ch is called.") {:core.async/chan in-ch}) ;; :out用のchannelを設定するlifecycle関数 (defn inject-out-ch [event lifecycle] (println "inject-out-ch is called.") {:core.async/chan out-ch}) (def in-calls {:lifecycle/before-task-start inject-in-ch}) (def out-calls {:lifecycle/before-task-start inject-out-ch}) ;; lifecycleとは、タスク起動の各段階で起動できる関数です。 ;; channel等のパラメーターを設定したり、タスク起動の前にリソースを獲得したりといった ;; ことが出来ます。 (def lifecycles [ ;; :in用のchannelを設定する関数をfully-qualified keywordで指定します。 {:lifecycle/task :in :lifecycle/calls :onyx-samples.sample1-core-async/in-calls} ;; core-asyncプラグインで指定されているlifecycle関数 {:lifecycle/task :in :lifecycle/calls :onyx.plugin.core-async/reader-calls} ;; :out用のchannelを設定する関数をfully-qualified keywordで指定します。 {:lifecycle/task :out :lifecycle/calls :onyx-samples.sample1-core-async/out-calls} ;; core-asyncプラグインで指定されているlifecycle関数 {:lifecycle/task :out :lifecycle/calls :onyx.plugin.core-async/writer-calls} ]) (def onyx-id (java.util.UUID/randomUUID)) ;; peerやzookeeperを起動するパラメーターです (def env-config {:zookeeper/address "" :zookeeper/server? true :zookeeper.server/port 2188 :onyx/id onyx-id}) (def peer-config {:zookeeper/address "" :onyx.peer/job-scheduler :onyx.job-scheduler/balanced :onyx.messaging/impl :aeron :onyx.messaging/peer-port 40200 :onyx.messaging/bind-addr "localhost" :onyx/id onyx-id}) ;; componentでプログラムの初期化処理・終了処理・状態を管理します。 (defrecord OnyxDevEnv [n-peers] component/Lifecycle ;; start関数でpeerを起動します。 (start [component] (println "Starting Onyx development environment") (let [onyx-id (java.util.UUID/randomUUID) env (onyx.api/start-env env-config) peer-group (onyx.api/start-peer-group peer-config) peers (onyx.api/start-peers n-peers peer-group)] (assoc component :env env :peer-group peer-group :peers peers :onyx-id onyx-id))) ;; stopでpeerをシャットダウンします (stop [component] (println "Stopping Onyx development environment") (doseq [v-peer (:peers component)] (onyx.api/shutdown-peer v-peer)) (onyx.api/shutdown-peer-group (:peer-group component)) (onyx.api/shutdown-env (:env component)) (assoc component :env nil :peer-group nil :peers nil))) ;; 処理する関数の本体です。 ;; 渡されるデータ構造は、segmentと呼ばれ、要するにClojureのmapです。 ;; map以外のデータ構造は許可されていません。 (defn my-inc [segment] (update-in segment [:n] inc)) ;; componentのインスタンスを #'system というvarに束縛します。 (defn init [] (alter-var-root #'system (constantly (map->OnyxDevEnv {:n-peers n-peers})))) (defn start [] (when (nil? system) (init)) (alter-var-root #'system (fn [s] (component/start s))) nil) (defn stop [] (alter-var-root #'system (fn [s] (when s (component/stop s)))) nil) ;; 入力用のchannelにデータを流し込んでから、jobをsubmitします。 (defn submit-jobs [] (dotimes [i 20] (let [segment {:n i :greeting (str "Hello" i)}] (>!! in-ch segment))) (>!! in-ch :done) (let [job {:workflow workflow :catalog catalog :lifecycles lifecycles :task-scheduler :onyx.task-scheduler/balanced}] (println "Submitting") (onyx.api/submit-job peer-config job))) ;; 一連の関数を起動するmain関数です (defn -main [& args] (init) (start) (submit-jobs) (pp/pprint (take-segments! out-ch)) (stop) (shutdown-agents))
# 実行結果 $ lein run [~/Dropbox/code/onyx-samples] Starting Onyx development environment Submitting inject-out-ch is called. inject-in-ch is called. [{:n 1, :greeting "Hello0"} {:n 2, :greeting "Hello1"} {:n 3, :greeting "Hello2"} {:n 4, :greeting "Hello3"} {:n 5, :greeting "Hello4"} {:n 6, :greeting "Hello5"} {:n 7, :greeting "Hello6"} {:n 8, :greeting "Hello7"} {:n 9, :greeting "Hello8"} {:n 10, :greeting "Hello9"} {:n 11, :greeting "Hello10"} {:n 12, :greeting "Hello11"} {:n 13, :greeting "Hello12"} {:n 14, :greeting "Hello13"} {:n 15, :greeting "Hello14"} {:n 16, :greeting "Hello15"} {:n 17, :greeting "Hello16"} {:n 18, :greeting "Hello17"} {:n 19, :greeting "Hello18"} {:n 20, :greeting "Hello19"} :done] Stopping Onyx development environment