Python

モダンデータスタックなワークフローオーケストレーションツール「Prefect」 試してみた

こんにちは!
みなさんPrefectについて知っていますでしょうか?

Prefectとは

PrefectはAirflowを意識してつくられたPythonベースのワークフローオーケストレーションツールです。
データパイプラインやワークフローのオーケストレーションを簡単かつ柔軟に行えるツールで、ローカル環境やクラウド環境での実行、エラー管理、自動化をサポートしています。

PrefectはOSSとSaaS(Prefect Cloud)があり、OSSでは自分たちでServerをたてて運用することもできます。ただしPrefect Cloudに比べると一部の機能がなかったりします。

Prefect CloudではFree、PRO、ENTERPRISEの3つのプランがあります。2024年9月時点ではPROだと$1850/monthからです。それぞれのプランで何ができるかの詳細は下記のリンクをご確認ください。

https://www.prefect.io/pricing

Prefectの機能について

Prefectの主な機能について簡単にAirflowと比較しながら紹介します。
※この記事の内容は筆者の知識や情報収集に基づいていますが、正確性や完全性を保証するものではありません。そのため実際に使用する際はそれぞれ公式ドキュメントを必ず確認するようにしてください。

公式でPrefectとAirflowの比較を出しています。
下記の表は上のリンクから抜き出したものになります。

Prefect vs. AirflowPrefectAirflow
Cron-based scheduling
Scalable orchestration infrastructure
Retries & logging
Automated task dependencies
Event-based triggers
Built-in notifications
Workflow-specific infrastructure
Cross-task data sharing

1.Pythonベースのワークフロー定義

ワークフローをPythonコードで直感的に記述でき、柔軟なタスクの依存関係やフローの設計が可能です。基本的に関数にデコレーターをつけるだけでタスクとフローを定義できます。

Prefect
from prefect import flow, task

@task
def task1():
    print("task1")

@flow
def first_flow():
    task1()
    print("hello prefect")

if __name__ == "__main__":
    first_flow()
Airflow

Airfowも2.0よりTask Flow APIを使用することで以前よりシンプルに書くことができるようになりました。

from airflow.utils.dates import days_ago
from airflow.decorators import task, dag

@task
def task_1():
    return 'first task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
    op_1 = task_1()

dag = my_dag()

2.動的なワークフロー

実行時にタスクや依存関係を変更でき、AirflowのようにDAGを定義せずに複雑なワークフローや条件分岐が簡単に記述できます。

Prefect
import random
from prefect import flow, task

@task(name="task1")
def task1():
    print("task1")
    return random.randint(1,10)

@task
def task2():
    print('task2')

@task
def task3():
    print('task3')

@flow
def first_flow():
    num = task1()
    print(f"number:{num}")
    if num > 5:
        task2()
    else:
        task3()

    print("hello prefect")

if __name__ == "__main__":
    first_flow()
Airflow

Airflowの場合はtask.branchデコレータを使用して分岐させる必要があります。

  @task
    def random_fun():
        import random

        return random.randrange(-10, 10) > 0

    @task.branch()
    def branching(x):
        if x is True:
            return "true_branch"
        else:
            return "false_branch"

    @task()
    def true_branch():
        print("True")

    @task()
    def false_branch():
        print("False")

    branching(random_fun()) >> [true_branch(), false_branch()]

3.分散実行

Prefectは登録したフローごとに実行環境を変えて実行可能です。そのため重たい処理を実行するフローだけ強力なマシンを積んだ環境で実行することなどができます。フローを実行できる実行環境についてはローカル、AWS Elastic Container Service、Azure Container Instances、Docker、Google Cloud Run、Google Cloud Run V2、Google Vertex AI、Kubernetesがあります。

またタスクの並列処理もサポートしています。詳細についてはリンクをご確認ください。

Prefect
@task
def task_a():
    pass

@task
def task_b():
    pass

@task
def task_c():
    pass
    
@task
def task_d():
    pass

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d(wait_for=[c])
Airflow
from airflow.utils.dates import days_ago
from airflow.decorators import task, dag


@task
def task_1():
    return 'first task'

@task
def task_2():
    return 'second task'

@task
def task_3(value1, value2):
    return 'third task'

@task
def task_4(value):
    return 'fourth task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
    op_1 = task_1()
    op_2 = task_2()
    op_3 = task_3(op_1, op_2)
    op_4 = task_4(op_3)

dag = my_dag()

4.再試行とエラーハンドリング

タスクの失敗に対する再試行やエラーハンドリングを簡単に設定できます。

Prefect
@task(retries=3)
def some_task_with_manual_backoff_retries():
Airflow
@task(retries=3)
def test_retries():

5.スケジューリング

Cronベースのスケジューリングやカスタムスケジューリングをサポートしています。
https://docs.prefect.io/3.0/automate/add-schedules#schedule-flow-runs

6.イベントベースのトリガー

特定のイベント(各タスクやフローの状態など)に基づいてワークフローを開始することができ、動的なトリガーベースの処理が可能です。
https://docs.prefect.io/3.0/automate/events/automations-triggers#create-an-automation

https://medium.com/the-prefect-blog/beyond-scheduling-event-driven-flows-with-prefect-b072edd06833

7.状態管理

WebUIで各タスクの状態(成功、失敗、実行中など)をリアルタイムで追跡し、全体のワークフロー状況をモニタリングできます。また画面もAirflowと比較するとモダンなデザインになっています。

Prefect
Airflow

8.タスク間のデータ共有

タスク間でデータを簡単に共有でき効率的なパイプライン設計が可能です。
AirflowではXcomというものを使う必要がありましたが、Airlfow2.0からTaskFlow APIを使用することで以前より簡潔に記述することができるようになりました

Prefect
from prefect import flow, task

@task(name="task1")
def task1(x):
    print(x)
    return x + 2

@task(name="task2")
def task2(x):
    print(x)
    return 2*x

@task(name="task3")
def task3(x):
    print(x)
    return x**2

@flow(name="first flow")
def first_flow(num):
    num1 = task1(num)
    num2 = task2(num1)
    num3 = task3(num2)
    print(num3)

if __name__ == "__main__":
    first_flow(3)
Airflow
from __future__ import annotations
import json
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():

    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()

9.監視とロギング

各タスクの実行ログやメトリクスを収集し、パフォーマンスの監視や問題解決をサポートします。
PrefectCloudではフローが失敗したら二枚目のようにAIがエラーの原因をログから推測し、簡潔に表示してくれたりします。

どういう場面に合いそうか

Prefectを試してみた感想として、大規模なデータ処理や複雑なワークフローの管理に非常に適していると感じました。Prefectはタスク間の依存関係をPythonで簡単に記述できるため、特にMLOpsのような機械学習のパイプラインでは、データの前処理からモデルのトレーニング、評価、デプロイまでの一連の流れを効果的にオーケストレーションできます。

またPrefectはフローごとに異なる実行環境を簡単に設定できるため、マルチクラウドやハイブリッド環境でのワークフローの管理にも適していると感じました。様々な実行環境でのフロー実行が容易なため各フローに最適なリソースを選択できます。またコスト効率やパフォーマンスを最大限に引き出すことができるため、企業のニーズに合わせた柔軟なデータ処理が実現可能になります。

まとめ

PrefectについてAirflowと比較しながら紹介いたしました。
シンプルなワークフローにはオーバースペックかもしれませんが、Airflowに限らず現在複雑なワークフローを運用していて管理が大変などでしたらぜひPrefectを検討してみてください。

ピックアップ記事

  1. 最速で理解したい人のためのIT用語集

関連記事

  1. ChatGPT

    LangChainって何?: 次世代AIアプリケーション構築 その2

    こんにちは、エクスチュアの石原です。こちらは第2回の記事にな…

  2. Adobe Analytics

    Google Analytics検証を効率的に進める方法

    0.初めにGoogle Analyticsがデータをとれているか検証…

  3. Python

    PyTorchのキホンを理解する

    PyTorchのキホンを理解するNumpyのndarray(多次元配…

  4. Python

    わかりやすいPyTorch入門⑤(CNNとデータの拡張)

    CNNとデータの拡張データの拡張とは今回は前回学んだCNNの練習に…

  5. ChatGPT

    LangChainって何?: 次世代AIアプリケーション構築 その1

    こんにちは、エクスチュアの石原です。近年、大規模言語モデル(…

  6. Python

    Streamlit in Snowflakeによるダッシュボード作成

    こんにちは、エクスチュアの石原です。前回に引き続き、Stre…

最近の記事

  1. AIを使ったマーケティングゲームを作ってみた
  2. Snowflakeや最新データ基盤が広義のマーケティングにも…
  3. 回帰分析はかく語りき Part3 ロジスティック回帰
  4. GCSへのSnowflake Open Catalogによる…
  5. VPC Service Controlsで「NO_MATCH…
  1. Adobe Analytics

    はじめてのAdobe Analytics実装
  2. Databricks

    databricks:GCPで利用を開始する
  3. Adobe Analytics

    Adobe Analytics: DatafeedをGoogle BigQuer…
  4. Adobe Dynamic Tag Manager

    Tag Manager: Adobe DTM で Google Analytic…
  5. Mouseflow

    ページ分析ツールの強み
PAGE TOP