2018年5月3日木曜日

Netflix/conductor を使ってみた

概要

conductor はワークフローツールで Netflix が開発している OSS です
今回はインストールと挙動の確認までやってみました
ドキュメントがまだ少ないので不確かな情報が多いです

環境

  • macOS 10.13.4
  • conductor 1.9.0
  • docker 18.04.0-ce

インストール

でコードが取得できたらまずはビルドします
docker を使ってビルドできるので docker を使います

  • docker build -f docker/server/Dockerfile.build -t conductor:server-build .
  • docker run -v $(pwd):/conductor conductor:server-build
  • docker build -f docker/server/Dockerfile -t conductor:server .
  • docker build -f docker/ui/Dockerfile -t conductor:ui .

これでサーバと UI のイメージが作成されます
あとはコンテナを起動すれば OK です
この 2 つでローカル上で動作サーバと UI を動作するための jar ファイルなどが生成されます
これを元にサーバと UI の docker イメージを作成します

  • cd conductor/docker
  • docker-compose build

でビルドが完了したら

  • cd docker
  • docker-compose up -d

で UI とサーバを起動します
これで localhost:5000 にアクセスすると conductor の UI が表示されます
また localhost:8080 が API サーバのアクセスポイントになります

docker-compose を使わないのであれば

  • cd conductor/docker
  • docker build -t conductor:server ./server
  • docker build -t conductor:ui ./ui

でビルドしてから up でも OK です
個別に run すると依存している elasticsearch と dynamite も手動で上げる必要があるので起動するときは docker-compose を素直に使うことをおすすめします

kitchensink というサンプルのワークフローを使って挙動を確認してみる

conductor server が起動すると kitchensink というワークフローが登録されているのが確認できます
まだワークフローだけで実際に動作するワーカーがないので手動 (curl) でワークフローを辿ってみたいと思います

まずワークフローを実行させましょう
実行は UI からはできず API から行います
UI はステータスの確認を行うのに使います

  • curl -X POST --header 'Content-Type: application/json' --header 'Accept: text/plain' 'http://localhost:8080/api/workflow/kitchensink' -d '{ "task2Name": "task_5" }'

これでワークフローが起動します
他のワークフローはいろいろ試した残骸なので無視してください
conductor1.png

更にワークフローの詳細を確認すると task_1 というタスクが SCHEDULED になっていることが確認できます
conductor2.png

要するにワークフローが開始され一番初めのタスクが待ち状態になりました
待ち状態というのは具体的にいうとワーカーさんに「処理してください」という状態になったということです (おそらく)

task_1 の情報を取得しましょう
以下の API は所謂ポーリング処理になり実際はワーカーから定期的に投げられる処理になります

  • curl http://localhost:8080/api/tasks/poll/task_1

すると以下のような JSON が返ってくると思います

{
  "taskType": "task_1",
  "status": "IN_PROGRESS",
  "inputData": {
    "mod": null,
    "oddEven": null,
    "env": {
      "taskId": "eb934c01-731f-48b2-a5e7-a9ee81162256",
      "workflowId": "14c8631506a5"
    }
  },
  "referenceTaskName": "task_1",
  "retryCount": 0,
  "seq": 1,
  "pollCount": 1,
  "taskDefName": "task_1",
  "scheduledTime": 1524795601602,
  "startTime": 1524795883025,
  "endTime": 0,
  "updateTime": 1524795883025,
  "startDelayInSeconds": 0,
  "retried": false,
  "callbackFromWorker": true,
  "responseTimeoutSeconds": 3600,
  "workflowInstanceId": "54313f13-199c-497c-9732-843e20a86b67",
  "taskId": "eb934c01-731f-48b2-a5e7-a9ee81162256",
  "callbackAfterSeconds": 0,
  "workflowTask": {
    "name": "task_1",
    "taskReferenceName": "task_1",
    "inputParameters": {
      "mod": "${workflow.input.mod}",
      "oddEven": "${workflow.input.oddEven}",
      "env": {
        "taskId": "${CPEWF_TASK_ID}",
        "workflowId": "${HOSTNAME}"
      }
    },
    "type": "SIMPLE",
    "startDelay": 0
  },
  "queueWaitTime": 281423,
  "taskStatus": "IN_PROGRESS"
}

長ったらしいので自分も全部は理解していませんがタスクの状態や実行時間、入力情報などがあります
ポイントは status でこれが IN_PROGRESS になっているのがわかります
要するにワーカーが一度ポーリングするとステータスが IN_PROGRESS になりどれかのワーカーに握られたタスクだとということを明示しています

本来はここでタスクに紐付いたロジックをワーカーが実行します
そして処理が完了したらタスクのステータスを完了にします
ここでは成功した状態にします

curl -H 'Content-Type:application/json' -H 'Accept:application/json' -X POST http://localhost:8080/api/tasks/ -d '
{
    "taskId": "eb934c01-731f-48b2-a5e7-a9ee81162256",
    "workflowInstanceId": "54313f13-199c-497c-9732-843e20a86b67",
    "status": "COMPLETED",
    "output": {
        "mod": 5,
        "taskToExecute": "task_1",
        "oddEven": 0,
        "dynamicTasks": [
            {
                "name": "task_1",
                "taskReferenceName": "task_1_1",
                "type": "SIMPLE"
            },
            {
                "name": "sub_workflow_4",
                "taskReferenceName": "wf_dyn",
                "type": "SUB_WORKFLOW",
                "subWorkflowParam": {
                    "name": "sub_flow_1"
                }
            }
        ],
        "inputs": {
            "task_1_1": {},
            "wf_dyn": {}
        }
    }
}'

API 的には /tasks に対して POST を投げているだけですがボディの JSON がやたら長ったらしいです
おそらくちゃんと運用するようになったら各パラメータが何を意味するかちゃんと理解する必要はあると思います

ここでは冒頭の workflowInstanceIdtaskId を変更します
先ほどポーリングした際に task_1 から取得したものを設定してください

そしてリクエストすると task_1 のタスクが終了し次の処理に移行しているのがわかると思います
conductor3.png

途中でイベントが挟まっていますがイベントは本来別の外部キューシステムなどにエンキューしたりするのができるようです
https://netflix.github.io/conductor/metadata/systask/#event

次のタスクは task_5 となっているので再度ポーリングするとステータスが SCHEDULED -> IN_PROGRESS に変わるのが確認できると思います

だいたいこんな感じで、タスクポーリング -> ワーカー処理 -> タスクにフィードバックという流れでフローが進んでいくようです
ことあとで更に Decision タスクや Fork タスクなどが出てくるようですがそれはまた別の機会に試してみたいと思います

最後に

Netflix/conductor の起動方法と簡単な使い方の確認をしてみました
本格的に使うにはワーカーが必要です
どうやら SDK として Java or Python があるようなので次回は Python SDK を使ってワーカーと連携させてみたいと思います

まだ利便性が見えてきているわけではないですが、感じとしては Amazon SQS のようなキューイングシステムになるのでワーカー側で結構頑張らないとダメな印象はあります
イベントパイプラインのような pub/sub システムではないのでその辺りの区別も必要かなと思います

参考サイト

0 件のコメント:

コメントを投稿