1
/
5

【TECH BLOG】Cloud Composer 2上でApache Airflow 2のワークフローを実装する

はじめに

こんにちはZOZOデータサイエンス部MLOpsブロック松岡です。

本記事では先日リリースされたGCP(Google Cloud PlatformCloud Composerの最新バージョンCloud Composer 2について紹介します。

ZOZOTOWNでは、多種多様な商品が毎日新たに出品されています。現在MLOpsブロックでは、機械学習で商品情報の登録を補佐するシステムを開発しています。

このシステムでは商品情報を保存するデータベースへ大量の書き込み処理が発生します。このアクセスによる負荷が日常業務に影響を及ぼすリスクを最小限に抑えるため、推論処理は夜間に行います。夜間に処理を完了させるには強力なマシンリソースを使用する必要があります。コストの観点から処理が行われていない時間はマシンリソースを使用停止する必要もあります。また、人手を介さずに安定して稼働出来る仕組みも求められます。

上記の要件を満たすためにワークフローエンジンを使用することになりました。

MLOpsブロックでは当初Vertex AI Pipelinesを検討しました。

しかし、類似アイテム検索機能にCloud Composerを採用していたことや、スケジューリング機能やリトライ処理が充実していることからCloud Composerについても検討することとしました。

類似アイテム検索ではCloud Composer 1を使用していたため、そのバージョンアップにおける技術調査を兼ねて、Cloud Composer 2における変更点について調査しました。併せてApache Airflow(以下Airflowと記述)2についても調査しています。

Airflowはワークフローエンジン

Cloud ComposerはGCP上にてAirflowをマネージドに提供するサービスです。Cloud Composerについて説明する前にまずAirflowについて簡単に紹介します。

ワークフローエンジンを使うメリット

Airflowは様々な処理を行うワークフローをスケジュールして実行出来るワークフローエンジンです。例えばデータを推論するワークフローを次のような複数のタスクに分けて協調動作させることができます。

  • 外部APIから必要なデータを取得するタスク
  • 取得したデータに対して分類ごとに並列で推論処理を行うタスク
  • 推論の結果を出力するタスク

複数のタスクが依存関係に基づく順序で実行され、各タスクを異なるワーカーインスタンスが処理し、エラー時にはリトライさせることが出来ます。これによりネットワークエラーのような不測の事態が発生した場合でも、人手を介さずにワークフローを復旧させることが出来ます。

Airflowの強み

ワークフローの定義にPythonを使用

AirflowはPythonにより実装されており、ワークフローの定義にもPythonを使用します。そのためXMLなどの設定ファイルでワークフローを記述する方式に比べてプログラマーにとって理解しやすく感じます。

多種多様なOperator

タスクはOperatorをインスタンス化することで定義します。Airflowには標準的なOperatorに加えGKEインスタンスを起動するOperatorなど多様なOperatorがすでに用意されていることも魅力です。適切なOperatorを使用することでタスクを簡単に記載できます。

スケジューリング機能を備えている

Airflow自身にスケジューリング機能を備えており、特定の時間や一定時間ごとにワークフローを自動実行することが出来ます。

ワークフローの流れ

ワークフローはタスクを組み合わせてDAG(有向非巡回グラフ)として定義します。実行可能となったタスクはスケジューラーによりワーカーと呼ばれる実行用のマシンインスタンスに割り当てられます。ワーカーがタスクを完了するとDAGに基づいて次のタスクが実行可能となります。実行中のタスクと実行可能なタスクが全て無くなればDAGが終了します。

Cloud Composer 2について

Cloud ComposerはAirflowをGCP上で実行するマネージドなサービスです。詳細なアーキテクチャはCloud Composerのバージョンによって異なりますが、どちらも大部分はGKE(Google Kubernetes Engine)上で動作しています。

Cloud Composer 2では更にGKEに寄ったアーキテクチャとなっています。例えばAirflow Web ServerはGAE(Google App Engine)からGKE上のDeployment上で動作するよう変更されています。

現在はCloud Composer 1とCloud Composer 2が提供されています。Cloud Composer 2になっての変更点はCloud Composer のバージョニングの概要に記載されていますが、ここではその中でも特に便利に感じた部分を紹介します。

柔軟なマシンスペックの指定

Cloud Composer 2では実行環境がより柔軟に指定出来るようになりました。

より細かいマシンスペックを指定可能

Cloud Composer 1のGKE環境はGKE Standard上に構築されていました。そのため、KubernetesのNodeを実行するvCPUやメモリーを予め決められたマシンタイプの中から選ぶ必要がありました。



Cloud Composer 2のGKE環境はGKE Autopilot上に構築されています。スケジューラー/ウェブサーバー/ワーカーのPodsで使うvCPU、メモリ、ストレージをそれぞれ個別に指定出来るようになりました。


Cloud Composer 1では環境構築後は変更不可であったワーカー、管理用Webサーバー、データベースのマシンスペックも後から変更出来るようになりました。このため環境構築時に将来のスケーリングについて正確に見積もる必要がなくなります。

また、ワークフローの性質に基づいて次のような運用も可能です。

  • 安定稼働しているワークフローではウェブサーバーの性能を落とし、緊急対応時のみウェブサーバーの性能を上げる
  • DAGは単純だが個別の処理が重たい場合、スケジューラーの性能を落としてワーカーの性能を上げる

ワーカーの水平スケールが可能

ワーカー数を負荷に応じて自動でスケールさせられるようになりました。特別な操作や設定は不要で、必要な性能に応じて性能を保ったまま低負荷時のコストを削減できます。

実際にCloud Composer 2の環境を用意して水平スケールを試してみます。Cloud Composer 2の水平スケールはCustom Metrics - Stackdriver Adapterを用いて得られる、未割り当てタスクと現在のワーカー状況を指標として使用します。

未割り当てタスク数とワーカーにより実行可能なタスク数が一致しなくなると次のようにスケーリングが実行されます。

今回はワーカーに0.5個のvCPU、1.875GBメモリ、1GBストレージを使用します。ワーカーの自動スケーリングは、ワーカーの最小数を1、最大数が3で試します。

Cloud Composer 2ではワーカー1vCPUあたりデフォルトで12個のタスクを同時に実行します。これではワーカーへの割当が0.5vCPUの場合でさえ、6タスクまで同時に実行可能となり、なかなかスケーリングが発生しません。

そこで1vCPUあたりの同時実行タスク数を減らしてスケーリングが起こりやすくします。この設定は「AIRFLOW構成のオーバーライド」タブから celeryworker_concurrency を書き換えることで変更ができます。

値に 1 を設定することでそれぞれのワーカーは一度に1つのタスクしか処理しなくなり、ワーカーが枯渇しやすくなります。

実行するDAGは次のとおりです。タスクを12並列で実行するDAGを用意しました。スケーリングは時間がかかるためスケール前にDAGが完了しないよう各タスクで60秒待機します。

import logging
import time
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow import models

default_dag_args = {"start_date": days_ago(2)}

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

with models.DAG(
    "parallel_tasks",
    default_args=default_dag_args,
    schedule_interval=None,
) as dag:

    def task_method(i: int, **context):
        time.sleep(60)
        logging.info(f"Hello: {i} Task")

    start_task = DummyOperator(task_id="start", dag=dag)

    for i in range(0, 12):
        task = PythonOperator(
            task_id=f"hello{i}",
            python_callable=task_method,
            provide_context=True,
            dag=dag,
            op_kwargs={"i": i},
        )
        start_task >> task

DAGのGraphは次のとおりです。

続きはこちら

株式会社ZOZOからお誘い
この話題に共感したら、メンバーと話してみませんか?
株式会社ZOZOでは一緒に働く仲間を募集しています

同じタグの記事

今週のランキング

株式会社 ZOZOさんにいいねを伝えよう
株式会社 ZOZOさんや会社があなたに興味を持つかも