Python

モダンデータスタックなワークフローオーケストレーションツール「Prefect」 試してみた

こんにちは!
みなさんPrefectについて知っていますでしょうか?

Prefectとは

PrefectはAirflowを意識してつくられたPythonベースのワークフローオーケストレーションツールです。
データパイプラインやワークフローのオーケストレーションを簡単かつ柔軟に行えるツールで、ローカル環境やクラウド環境での実行、エラー管理、自動化をサポートしています。

PrefectはOSSとSaaS(Prefect Cloud)があり、OSSでは自分たちでServerをたてて運用することもできます。ただしPrefect Cloudに比べると一部の機能がなかったりします。

Prefect CloudではFree、PRO、ENTERPRISEの3つのプランがあります。2024年9月時点ではPROだと$1850/monthからです。それぞれのプランで何ができるかの詳細は下記のリンクをご確認ください。

https://www.prefect.io/pricing

Prefectの機能について

Prefectの主な機能について簡単にAirflowと比較しながら紹介します。
※この記事の内容は筆者の知識や情報収集に基づいていますが、正確性や完全性を保証するものではありません。そのため実際に使用する際はそれぞれ公式ドキュメントを必ず確認するようにしてください。

公式でPrefectとAirflowの比較を出しています。
下記の表は上のリンクから抜き出したものになります。

Prefect vs. AirflowPrefectAirflow
Cron-based scheduling
Scalable orchestration infrastructure
Retries & logging
Automated task dependencies
Event-based triggers
Built-in notifications
Workflow-specific infrastructure
Cross-task data sharing

1.Pythonベースのワークフロー定義

ワークフローをPythonコードで直感的に記述でき、柔軟なタスクの依存関係やフローの設計が可能です。基本的に関数にデコレーターをつけるだけでタスクとフローを定義できます。

Prefect
from prefect import flow, task

@task
def task1():
    print("task1")

@flow
def first_flow():
    task1()
    print("hello prefect")

if __name__ == "__main__":
    first_flow()
Airflow

Airfowも2.0よりTask Flow APIを使用することで以前よりシンプルに書くことができるようになりました。

from airflow.utils.dates import days_ago
from airflow.decorators import task, dag

@task
def task_1():
    return 'first task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
    op_1 = task_1()

dag = my_dag()

2.動的なワークフロー

実行時にタスクや依存関係を変更でき、AirflowのようにDAGを定義せずに複雑なワークフローや条件分岐が簡単に記述できます。

Prefect
import random
from prefect import flow, task

@task(name="task1")
def task1():
    print("task1")
    return random.randint(1,10)

@task
def task2():
    print('task2')

@task
def task3():
    print('task3')

@flow
def first_flow():
    num = task1()
    print(f"number:{num}")
    if num > 5:
        task2()
    else:
        task3()

    print("hello prefect")

if __name__ == "__main__":
    first_flow()
Airflow

Airflowの場合はtask.branchデコレータを使用して分岐させる必要があります。

  @task
    def random_fun():
        import random

        return random.randrange(-10, 10) > 0

    @task.branch()
    def branching(x):
        if x is True:
            return "true_branch"
        else:
            return "false_branch"

    @task()
    def true_branch():
        print("True")

    @task()
    def false_branch():
        print("False")

    branching(random_fun()) >> [true_branch(), false_branch()]

3.分散実行

Prefectは登録したフローごとに実行環境を変えて実行可能です。そのため重たい処理を実行するフローだけ強力なマシンを積んだ環境で実行することなどができます。フローを実行できる実行環境についてはローカル、AWS Elastic Container Service、Azure Container Instances、Docker、Google Cloud Run、Google Cloud Run V2、Google Vertex AI、Kubernetesがあります。

またタスクの並列処理もサポートしています。詳細についてはリンクをご確認ください。

Prefect
@task
def task_a():
    pass

@task
def task_b():
    pass

@task
def task_c():
    pass
    
@task
def task_d():
    pass

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d(wait_for=[c])
Airflow
from airflow.utils.dates import days_ago
from airflow.decorators import task, dag


@task
def task_1():
    return 'first task'

@task
def task_2():
    return 'second task'

@task
def task_3(value1, value2):
    return 'third task'

@task
def task_4(value):
    return 'fourth task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
    op_1 = task_1()
    op_2 = task_2()
    op_3 = task_3(op_1, op_2)
    op_4 = task_4(op_3)

dag = my_dag()

4.再試行とエラーハンドリング

タスクの失敗に対する再試行やエラーハンドリングを簡単に設定できます。

Prefect
@task(retries=3)
def some_task_with_manual_backoff_retries():
Airflow
@task(retries=3)
def test_retries():

5.スケジューリング

Cronベースのスケジューリングやカスタムスケジューリングをサポートしています。
https://docs.prefect.io/3.0/automate/add-schedules#schedule-flow-runs

6.イベントベースのトリガー

特定のイベント(各タスクやフローの状態など)に基づいてワークフローを開始することができ、動的なトリガーベースの処理が可能です。
https://docs.prefect.io/3.0/automate/events/automations-triggers#create-an-automation

https://medium.com/the-prefect-blog/beyond-scheduling-event-driven-flows-with-prefect-b072edd06833

7.状態管理

WebUIで各タスクの状態(成功、失敗、実行中など)をリアルタイムで追跡し、全体のワークフロー状況をモニタリングできます。また画面もAirflowと比較するとモダンなデザインになっています。

Prefect
Airflow

8.タスク間のデータ共有

タスク間でデータを簡単に共有でき効率的なパイプライン設計が可能です。
AirflowではXcomというものを使う必要がありましたが、Airlfow2.0からTaskFlow APIを使用することで以前より簡潔に記述することができるようになりました

Prefect
from prefect import flow, task

@task(name="task1")
def task1(x):
    print(x)
    return x + 2

@task(name="task2")
def task2(x):
    print(x)
    return 2*x

@task(name="task3")
def task3(x):
    print(x)
    return x**2

@flow(name="first flow")
def first_flow(num):
    num1 = task1(num)
    num2 = task2(num1)
    num3 = task3(num2)
    print(num3)

if __name__ == "__main__":
    first_flow(3)
Airflow
from __future__ import annotations
import json
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():

    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()

9.監視とロギング

各タスクの実行ログやメトリクスを収集し、パフォーマンスの監視や問題解決をサポートします。
PrefectCloudではフローが失敗したら二枚目のようにAIがエラーの原因をログから推測し、簡潔に表示してくれたりします。

どういう場面に合いそうか

Prefectを試してみた感想として、大規模なデータ処理や複雑なワークフローの管理に非常に適していると感じました。Prefectはタスク間の依存関係をPythonで簡単に記述できるため、特にMLOpsのような機械学習のパイプラインでは、データの前処理からモデルのトレーニング、評価、デプロイまでの一連の流れを効果的にオーケストレーションできます。

またPrefectはフローごとに異なる実行環境を簡単に設定できるため、マルチクラウドやハイブリッド環境でのワークフローの管理にも適していると感じました。様々な実行環境でのフロー実行が容易なため各フローに最適なリソースを選択できます。またコスト効率やパフォーマンスを最大限に引き出すことができるため、企業のニーズに合わせた柔軟なデータ処理が実現可能になります。

まとめ

PrefectについてAirflowと比較しながら紹介いたしました。
シンプルなワークフローにはオーバースペックかもしれませんが、Airflowに限らず現在複雑なワークフローを運用していて管理が大変などでしたらぜひPrefectを検討してみてください。

Streamlit in Snowflakeによるダッシュボード作成前のページ

VPC Service Controlsで「NO_MATCHING_ACCESS_LEVEL」とエラーが出た時にやること次のページ

ピックアップ記事

  1. 最速で理解したい人のためのIT用語集

関連記事

  1. Generative AI

    ChainlitでのOAuth認証にスコープを追加する方法

    こんにちは!ChainlitというPythonでチャットアプ…

  2. Google Cloud Platform

    Vertex AI Embeddings for Text によるテキストエンベディングをやってみた…

    こんにちは、石原と申します。自然言語処理(NLP)は近年のA…

  3. Python

    回帰分析はかく語りき Part1 単回帰分析

    こんにちは、小郷です。回帰と言えばフリードリヒ・ニーチェの永劫回帰を…

  4. Python

    市区町村一覧・自治体の一覧を取得する

    最初に顧客マスタには郵便番号や市区町村をデータを持っていることが多い…

  5. Python

    わかりやすいPyTorch入門①(学習と評価)

    Google ColabでPyTorchを触ってみるまずはGoogl…

  6. Python

    わかりやすいPyTorch入門②(ニューラルネットワークによる分類)

    ニューラルネットワークを使ってワインの種類を分類する今回はsciki…

カテゴリ
最近の記事
  1. dbt Fusion使ってみた
  2. Manusを使ってみたうえでManusに感想ブログを書かせて…
  3. SquadbaseとStreamlitでお手軽アプリ開発
  4. [Snowflake Summit 2025] Snowfl…
  5. [Snowflake新機能]AI_AGGを試してみた
  1. Databricks

    Databricks: Spark DataFramesをJDBCから作成する
  2. IT用語集

    Java(ジャバ)って何?
  3. ブログ

    ④DMPについてーOracle DMP
  4. Tableau

    【学生向け】TableauのアカデミックプログラムでTableauが無料で使える…
  5. IT用語集

    インポート(Import)とエクスポート(Export)って何?
PAGE TOP