Python

モダンデータスタックなワークフローオーケストレーションツール「Prefect」 試してみた

こんにちは!
みなさんPrefectについて知っていますでしょうか?

Prefectとは

PrefectはAirflowを意識してつくられたPythonベースのワークフローオーケストレーションツールです。
データパイプラインやワークフローのオーケストレーションを簡単かつ柔軟に行えるツールで、ローカル環境やクラウド環境での実行、エラー管理、自動化をサポートしています。

PrefectはOSSとSaaS(Prefect Cloud)があり、OSSでは自分たちでServerをたてて運用することもできます。ただしPrefect Cloudに比べると一部の機能がなかったりします。

Prefect CloudではFree、PRO、ENTERPRISEの3つのプランがあります。2024年9月時点ではPROだと$1850/monthからです。それぞれのプランで何ができるかの詳細は下記のリンクをご確認ください。

https://www.prefect.io/pricing

Prefectの機能について

Prefectの主な機能について簡単にAirflowと比較しながら紹介します。
※この記事の内容は筆者の知識や情報収集に基づいていますが、正確性や完全性を保証するものではありません。そのため実際に使用する際はそれぞれ公式ドキュメントを必ず確認するようにしてください。

公式でPrefectとAirflowの比較を出しています。
下記の表は上のリンクから抜き出したものになります。

Prefect vs. AirflowPrefectAirflow
Cron-based scheduling
Scalable orchestration infrastructure
Retries & logging
Automated task dependencies
Event-based triggers
Built-in notifications
Workflow-specific infrastructure
Cross-task data sharing

1.Pythonベースのワークフロー定義

ワークフローをPythonコードで直感的に記述でき、柔軟なタスクの依存関係やフローの設計が可能です。基本的に関数にデコレーターをつけるだけでタスクとフローを定義できます。

Prefect
from prefect import flow, task

@task
def task1():
    print("task1")

@flow
def first_flow():
    task1()
    print("hello prefect")

if __name__ == "__main__":
    first_flow()
Airflow

Airfowも2.0よりTask Flow APIを使用することで以前よりシンプルに書くことができるようになりました。

from airflow.utils.dates import days_ago
from airflow.decorators import task, dag

@task
def task_1():
    return 'first task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
    op_1 = task_1()

dag = my_dag()

2.動的なワークフロー

実行時にタスクや依存関係を変更でき、AirflowのようにDAGを定義せずに複雑なワークフローや条件分岐が簡単に記述できます。

Prefect
import random
from prefect import flow, task

@task(name="task1")
def task1():
    print("task1")
    return random.randint(1,10)

@task
def task2():
    print('task2')

@task
def task3():
    print('task3')

@flow
def first_flow():
    num = task1()
    print(f"number:{num}")
    if num > 5:
        task2()
    else:
        task3()

    print("hello prefect")

if __name__ == "__main__":
    first_flow()
Airflow

Airflowの場合はtask.branchデコレータを使用して分岐させる必要があります。

  @task
    def random_fun():
        import random

        return random.randrange(-10, 10) > 0

    @task.branch()
    def branching(x):
        if x is True:
            return "true_branch"
        else:
            return "false_branch"

    @task()
    def true_branch():
        print("True")

    @task()
    def false_branch():
        print("False")

    branching(random_fun()) >> [true_branch(), false_branch()]

3.分散実行

Prefectは登録したフローごとに実行環境を変えて実行可能です。そのため重たい処理を実行するフローだけ強力なマシンを積んだ環境で実行することなどができます。フローを実行できる実行環境についてはローカル、AWS Elastic Container Service、Azure Container Instances、Docker、Google Cloud Run、Google Cloud Run V2、Google Vertex AI、Kubernetesがあります。

またタスクの並列処理もサポートしています。詳細についてはリンクをご確認ください。

Prefect
@task
def task_a():
    pass

@task
def task_b():
    pass

@task
def task_c():
    pass
    
@task
def task_d():
    pass

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d(wait_for=[c])
Airflow
from airflow.utils.dates import days_ago
from airflow.decorators import task, dag


@task
def task_1():
    return 'first task'

@task
def task_2():
    return 'second task'

@task
def task_3(value1, value2):
    return 'third task'

@task
def task_4(value):
    return 'fourth task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
    op_1 = task_1()
    op_2 = task_2()
    op_3 = task_3(op_1, op_2)
    op_4 = task_4(op_3)

dag = my_dag()

4.再試行とエラーハンドリング

タスクの失敗に対する再試行やエラーハンドリングを簡単に設定できます。

Prefect
@task(retries=3)
def some_task_with_manual_backoff_retries():
Airflow
@task(retries=3)
def test_retries():

5.スケジューリング

Cronベースのスケジューリングやカスタムスケジューリングをサポートしています。
https://docs.prefect.io/3.0/automate/add-schedules#schedule-flow-runs

6.イベントベースのトリガー

特定のイベント(各タスクやフローの状態など)に基づいてワークフローを開始することができ、動的なトリガーベースの処理が可能です。
https://docs.prefect.io/3.0/automate/events/automations-triggers#create-an-automation

https://medium.com/the-prefect-blog/beyond-scheduling-event-driven-flows-with-prefect-b072edd06833

7.状態管理

WebUIで各タスクの状態(成功、失敗、実行中など)をリアルタイムで追跡し、全体のワークフロー状況をモニタリングできます。また画面もAirflowと比較するとモダンなデザインになっています。

Prefect
Airflow

8.タスク間のデータ共有

タスク間でデータを簡単に共有でき効率的なパイプライン設計が可能です。
AirflowではXcomというものを使う必要がありましたが、Airlfow2.0からTaskFlow APIを使用することで以前より簡潔に記述することができるようになりました

Prefect
from prefect import flow, task

@task(name="task1")
def task1(x):
    print(x)
    return x + 2

@task(name="task2")
def task2(x):
    print(x)
    return 2*x

@task(name="task3")
def task3(x):
    print(x)
    return x**2

@flow(name="first flow")
def first_flow(num):
    num1 = task1(num)
    num2 = task2(num1)
    num3 = task3(num2)
    print(num3)

if __name__ == "__main__":
    first_flow(3)
Airflow
from __future__ import annotations
import json
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():

    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()

9.監視とロギング

各タスクの実行ログやメトリクスを収集し、パフォーマンスの監視や問題解決をサポートします。
PrefectCloudではフローが失敗したら二枚目のようにAIがエラーの原因をログから推測し、簡潔に表示してくれたりします。

どういう場面に合いそうか

Prefectを試してみた感想として、大規模なデータ処理や複雑なワークフローの管理に非常に適していると感じました。Prefectはタスク間の依存関係をPythonで簡単に記述できるため、特にMLOpsのような機械学習のパイプラインでは、データの前処理からモデルのトレーニング、評価、デプロイまでの一連の流れを効果的にオーケストレーションできます。

またPrefectはフローごとに異なる実行環境を簡単に設定できるため、マルチクラウドやハイブリッド環境でのワークフローの管理にも適していると感じました。様々な実行環境でのフロー実行が容易なため各フローに最適なリソースを選択できます。またコスト効率やパフォーマンスを最大限に引き出すことができるため、企業のニーズに合わせた柔軟なデータ処理が実現可能になります。

まとめ

PrefectについてAirflowと比較しながら紹介いたしました。
シンプルなワークフローにはオーバースペックかもしれませんが、Airflowに限らず現在複雑なワークフローを運用していて管理が大変などでしたらぜひPrefectを検討してみてください。

ピックアップ記事

  1. 最速で理解したい人のためのIT用語集

関連記事

  1. Google Cloud Platform

    Vertex AI Embeddings for Text によるテキストエンベディングをやってみた…

    こんにちは、石原と申します。自然言語処理(NLP)は近年のA…

  2. ChatGPT

    ChatGPTとVSCodeの連携方法とその使用例:開発効率を飛躍的にアップさせる

    こんにちは、石原と申します。今回のブログが初投稿となります。…

  3. Python

    Python クローリング&スクレイピング

    最初に顧客マスタのデータに別の角度から考察を加えたいとき、外部から何…

  4. Adobe Analytics

    Google Analytics検証を効率的に進める方法

    0.初めにGoogle Analyticsがデータをとれているか検証…

  5. Python

    市区町村一覧・自治体の一覧を取得する

    最初に顧客マスタには郵便番号や市区町村をデータを持っていることが多い…

  6. ChatGPT

    LangChainって何?: 次世代AIアプリケーション構築 その3

    こんにちは、エクスチュアの石原です。こちらは第3回の記事にな…

最近の記事

  1. モダンデータスタックなワークフローオーケストレーションツール…
  2. Streamlit in Snowflakeによるダッシュボ…
  3. Streamlit in SnowflakeによるStrea…
  4. Streamlitを使った簡単なデータアプリケーション作成ガ…
  5. 生成AI機能を活かしたデータカタログ製品「Secoda」を試…
  1. ヒートマップ

    【Tips】ヒートマップ機能について②~ムーブメント、アテンション編~
  2. ObservePoint

    ObservePointの活用で自動車メーカー フォードが達成した4つの成果
  3. ヒートマップ

    Mouseflowの新機能:ライブヒートマップで動的なヒートマップ分析が可能に!…
  4. IT用語集

    コンテナ(Container)って何?
  5. Pardot

    Pardotの初期セットアップをする① DNSレコード設定とトラッカードメイン設…
PAGE TOP