こんにちは!
みなさんPrefectについて知っていますでしょうか?
Prefectとは
PrefectはAirflowを意識してつくられたPythonベースのワークフローオーケストレーションツールです。
データパイプラインやワークフローのオーケストレーションを簡単かつ柔軟に行えるツールで、ローカル環境やクラウド環境での実行、エラー管理、自動化をサポートしています。
PrefectはOSSとSaaS(Prefect Cloud)があり、OSSでは自分たちでServerをたてて運用することもできます。ただしPrefect Cloudに比べると一部の機能がなかったりします。
Prefect CloudではFree、PRO、ENTERPRISEの3つのプランがあります。2024年9月時点ではPROだと$1850/monthからです。それぞれのプランで何ができるかの詳細は下記のリンクをご確認ください。
https://www.prefect.io/pricing
Prefectの機能について
Prefectの主な機能について簡単にAirflowと比較しながら紹介します。
※この記事の内容は筆者の知識や情報収集に基づいていますが、正確性や完全性を保証するものではありません。そのため実際に使用する際はそれぞれ公式ドキュメントを必ず確認するようにしてください。
公式でPrefectとAirflowの比較を出しています。
下記の表は上のリンクから抜き出したものになります。
Prefect vs. Airflow | Prefect | Airflow |
Cron-based scheduling | ✔ | ✔ |
Scalable orchestration infrastructure | ✔ | ✔ |
Retries & logging | ✔ | ✔ |
Automated task dependencies | ✔ | |
Event-based triggers | ✔ | |
Built-in notifications | ✔ | |
Workflow-specific infrastructure | ✔ | |
Cross-task data sharing | ✔ |
1.Pythonベースのワークフロー定義
ワークフローをPythonコードで直感的に記述でき、柔軟なタスクの依存関係やフローの設計が可能です。基本的に関数にデコレーターをつけるだけでタスクとフローを定義できます。
Prefect
from prefect import flow, task
@task
def task1():
print("task1")
@flow
def first_flow():
task1()
print("hello prefect")
if __name__ == "__main__":
first_flow()
Airflow
Airfowも2.0よりTask Flow APIを使用することで以前よりシンプルに書くことができるようになりました。
from airflow.utils.dates import days_ago
from airflow.decorators import task, dag
@task
def task_1():
return 'first task'
default_args = {
'owner': 'airflow',
'start_date': days_ago(2),
}
@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
op_1 = task_1()
dag = my_dag()
2.動的なワークフロー
実行時にタスクや依存関係を変更でき、AirflowのようにDAGを定義せずに複雑なワークフローや条件分岐が簡単に記述できます。
Prefect
import random
from prefect import flow, task
@task(name="task1")
def task1():
print("task1")
return random.randint(1,10)
@task
def task2():
print('task2')
@task
def task3():
print('task3')
@flow
def first_flow():
num = task1()
print(f"number:{num}")
if num > 5:
task2()
else:
task3()
print("hello prefect")
if __name__ == "__main__":
first_flow()
Airflow
Airflowの場合はtask.branchデコレータを使用して分岐させる必要があります。
@task
def random_fun():
import random
return random.randrange(-10, 10) > 0
@task.branch()
def branching(x):
if x is True:
return "true_branch"
else:
return "false_branch"
@task()
def true_branch():
print("True")
@task()
def false_branch():
print("False")
branching(random_fun()) >> [true_branch(), false_branch()]
3.分散実行
Prefectは登録したフローごとに実行環境を変えて実行可能です。そのため重たい処理を実行するフローだけ強力なマシンを積んだ環境で実行することなどができます。フローを実行できる実行環境についてはローカル、AWS Elastic Container Service、Azure Container Instances、Docker、Google Cloud Run、Google Cloud Run V2、Google Vertex AI、Kubernetesがあります。
またタスクの並列処理もサポートしています。詳細についてはリンクをご確認ください。
Prefect
@task
def task_a():
pass
@task
def task_b():
pass
@task
def task_c():
pass
@task
def task_d():
pass
@flow
def my_flow():
a = task_a.submit()
b = task_b.submit()
# Wait for task_a and task_b to complete
c = task_c.submit(wait_for=[a, b])
# task_d will wait for task_c to complete
# Note: If waiting for one task it must still be in a list.
d = task_d(wait_for=[c])
Airflow
from airflow.utils.dates import days_ago
from airflow.decorators import task, dag
@task
def task_1():
return 'first task'
@task
def task_2():
return 'second task'
@task
def task_3(value1, value2):
return 'third task'
@task
def task_4(value):
return 'fourth task'
default_args = {
'owner': 'airflow',
'start_date': days_ago(2),
}
@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
op_1 = task_1()
op_2 = task_2()
op_3 = task_3(op_1, op_2)
op_4 = task_4(op_3)
dag = my_dag()
4.再試行とエラーハンドリング
タスクの失敗に対する再試行やエラーハンドリングを簡単に設定できます。
Prefect
@task(retries=3)
def some_task_with_manual_backoff_retries():
Airflow
@task(retries=3)
def test_retries():
5.スケジューリング
Cronベースのスケジューリングやカスタムスケジューリングをサポートしています。
https://docs.prefect.io/3.0/automate/add-schedules#schedule-flow-runs
6.イベントベースのトリガー
特定のイベント(各タスクやフローの状態など)に基づいてワークフローを開始することができ、動的なトリガーベースの処理が可能です。
https://docs.prefect.io/3.0/automate/events/automations-triggers#create-an-automation
https://medium.com/the-prefect-blog/beyond-scheduling-event-driven-flows-with-prefect-b072edd06833
7.状態管理
WebUIで各タスクの状態(成功、失敗、実行中など)をリアルタイムで追跡し、全体のワークフロー状況をモニタリングできます。また画面もAirflowと比較するとモダンなデザインになっています。
Prefect
Airflow
8.タスク間のデータ共有
タスク間でデータを簡単に共有でき効率的なパイプライン設計が可能です。
AirflowではXcomというものを使う必要がありましたが、Airlfow2.0からTaskFlow APIを使用することで以前より簡潔に記述することができるようになりました
Prefect
from prefect import flow, task
@task(name="task1")
def task1(x):
print(x)
return x + 2
@task(name="task2")
def task2(x):
print(x)
return 2*x
@task(name="task3")
def task3(x):
print(x)
return x**2
@flow(name="first flow")
def first_flow(num):
num1 = task1(num)
num2 = task2(num1)
num3 = task3(num2)
print(num3)
if __name__ == "__main__":
first_flow(3)
Airflow
from __future__ import annotations
import json
import pendulum
from airflow.decorators import dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
@task()
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_taskflow_api()
9.監視とロギング
各タスクの実行ログやメトリクスを収集し、パフォーマンスの監視や問題解決をサポートします。
PrefectCloudではフローが失敗したら二枚目のようにAIがエラーの原因をログから推測し、簡潔に表示してくれたりします。
どういう場面に合いそうか
Prefectを試してみた感想として、大規模なデータ処理や複雑なワークフローの管理に非常に適していると感じました。Prefectはタスク間の依存関係をPythonで簡単に記述できるため、特にMLOpsのような機械学習のパイプラインでは、データの前処理からモデルのトレーニング、評価、デプロイまでの一連の流れを効果的にオーケストレーションできます。
またPrefectはフローごとに異なる実行環境を簡単に設定できるため、マルチクラウドやハイブリッド環境でのワークフローの管理にも適していると感じました。様々な実行環境でのフロー実行が容易なため各フローに最適なリソースを選択できます。またコスト効率やパフォーマンスを最大限に引き出すことができるため、企業のニーズに合わせた柔軟なデータ処理が実現可能になります。
まとめ
PrefectについてAirflowと比較しながら紹介いたしました。
シンプルなワークフローにはオーバースペックかもしれませんが、Airflowに限らず現在複雑なワークフローを運用していて管理が大変などでしたらぜひPrefectを検討してみてください。