Python

モダンデータスタックなワークフローオーケストレーションツール「Prefect」 試してみた

こんにちは!
みなさんPrefectについて知っていますでしょうか?

Prefectとは

PrefectはAirflowを意識してつくられたPythonベースのワークフローオーケストレーションツールです。
データパイプラインやワークフローのオーケストレーションを簡単かつ柔軟に行えるツールで、ローカル環境やクラウド環境での実行、エラー管理、自動化をサポートしています。

PrefectはOSSとSaaS(Prefect Cloud)があり、OSSでは自分たちでServerをたてて運用することもできます。ただしPrefect Cloudに比べると一部の機能がなかったりします。

Prefect CloudではFree、PRO、ENTERPRISEの3つのプランがあります。2024年9月時点ではPROだと$1850/monthからです。それぞれのプランで何ができるかの詳細は下記のリンクをご確認ください。

https://www.prefect.io/pricing

Prefectの機能について

Prefectの主な機能について簡単にAirflowと比較しながら紹介します。
※この記事の内容は筆者の知識や情報収集に基づいていますが、正確性や完全性を保証するものではありません。そのため実際に使用する際はそれぞれ公式ドキュメントを必ず確認するようにしてください。

公式でPrefectとAirflowの比較を出しています。
下記の表は上のリンクから抜き出したものになります。

Prefect vs. AirflowPrefectAirflow
Cron-based scheduling
Scalable orchestration infrastructure
Retries & logging
Automated task dependencies
Event-based triggers
Built-in notifications
Workflow-specific infrastructure
Cross-task data sharing

1.Pythonベースのワークフロー定義

ワークフローをPythonコードで直感的に記述でき、柔軟なタスクの依存関係やフローの設計が可能です。基本的に関数にデコレーターをつけるだけでタスクとフローを定義できます。

Prefect
from prefect import flow, task

@task
def task1():
    print("task1")

@flow
def first_flow():
    task1()
    print("hello prefect")

if __name__ == "__main__":
    first_flow()
Airflow

Airfowも2.0よりTask Flow APIを使用することで以前よりシンプルに書くことができるようになりました。

from airflow.utils.dates import days_ago
from airflow.decorators import task, dag

@task
def task_1():
    return 'first task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
    op_1 = task_1()

dag = my_dag()

2.動的なワークフロー

実行時にタスクや依存関係を変更でき、AirflowのようにDAGを定義せずに複雑なワークフローや条件分岐が簡単に記述できます。

Prefect
import random
from prefect import flow, task

@task(name="task1")
def task1():
    print("task1")
    return random.randint(1,10)

@task
def task2():
    print('task2')

@task
def task3():
    print('task3')

@flow
def first_flow():
    num = task1()
    print(f"number:{num}")
    if num > 5:
        task2()
    else:
        task3()

    print("hello prefect")

if __name__ == "__main__":
    first_flow()
Airflow

Airflowの場合はtask.branchデコレータを使用して分岐させる必要があります。

  @task
    def random_fun():
        import random

        return random.randrange(-10, 10) > 0

    @task.branch()
    def branching(x):
        if x is True:
            return "true_branch"
        else:
            return "false_branch"

    @task()
    def true_branch():
        print("True")

    @task()
    def false_branch():
        print("False")

    branching(random_fun()) >> [true_branch(), false_branch()]

3.分散実行

Prefectは登録したフローごとに実行環境を変えて実行可能です。そのため重たい処理を実行するフローだけ強力なマシンを積んだ環境で実行することなどができます。フローを実行できる実行環境についてはローカル、AWS Elastic Container Service、Azure Container Instances、Docker、Google Cloud Run、Google Cloud Run V2、Google Vertex AI、Kubernetesがあります。

またタスクの並列処理もサポートしています。詳細についてはリンクをご確認ください。

Prefect
@task
def task_a():
    pass

@task
def task_b():
    pass

@task
def task_c():
    pass
    
@task
def task_d():
    pass

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d(wait_for=[c])
Airflow
from airflow.utils.dates import days_ago
from airflow.decorators import task, dag


@task
def task_1():
    return 'first task'

@task
def task_2():
    return 'second task'

@task
def task_3(value1, value2):
    return 'third task'

@task
def task_4(value):
    return 'fourth task'

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}


@dag(dag_id='taskflow', default_args=default_args)
def my_dag():
    op_1 = task_1()
    op_2 = task_2()
    op_3 = task_3(op_1, op_2)
    op_4 = task_4(op_3)

dag = my_dag()

4.再試行とエラーハンドリング

タスクの失敗に対する再試行やエラーハンドリングを簡単に設定できます。

Prefect
@task(retries=3)
def some_task_with_manual_backoff_retries():
Airflow
@task(retries=3)
def test_retries():

5.スケジューリング

Cronベースのスケジューリングやカスタムスケジューリングをサポートしています。
https://docs.prefect.io/3.0/automate/add-schedules#schedule-flow-runs

6.イベントベースのトリガー

特定のイベント(各タスクやフローの状態など)に基づいてワークフローを開始することができ、動的なトリガーベースの処理が可能です。
https://docs.prefect.io/3.0/automate/events/automations-triggers#create-an-automation

https://medium.com/the-prefect-blog/beyond-scheduling-event-driven-flows-with-prefect-b072edd06833

7.状態管理

WebUIで各タスクの状態(成功、失敗、実行中など)をリアルタイムで追跡し、全体のワークフロー状況をモニタリングできます。また画面もAirflowと比較するとモダンなデザインになっています。

Prefect
Airflow

8.タスク間のデータ共有

タスク間でデータを簡単に共有でき効率的なパイプライン設計が可能です。
AirflowではXcomというものを使う必要がありましたが、Airlfow2.0からTaskFlow APIを使用することで以前より簡潔に記述することができるようになりました

Prefect
from prefect import flow, task

@task(name="task1")
def task1(x):
    print(x)
    return x + 2

@task(name="task2")
def task2(x):
    print(x)
    return 2*x

@task(name="task3")
def task3(x):
    print(x)
    return x**2

@flow(name="first flow")
def first_flow(num):
    num1 = task1(num)
    num2 = task2(num1)
    num3 = task3(num2)
    print(num3)

if __name__ == "__main__":
    first_flow(3)
Airflow
from __future__ import annotations
import json
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():

    @task()
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()

9.監視とロギング

各タスクの実行ログやメトリクスを収集し、パフォーマンスの監視や問題解決をサポートします。
PrefectCloudではフローが失敗したら二枚目のようにAIがエラーの原因をログから推測し、簡潔に表示してくれたりします。

どういう場面に合いそうか

Prefectを試してみた感想として、大規模なデータ処理や複雑なワークフローの管理に非常に適していると感じました。Prefectはタスク間の依存関係をPythonで簡単に記述できるため、特にMLOpsのような機械学習のパイプラインでは、データの前処理からモデルのトレーニング、評価、デプロイまでの一連の流れを効果的にオーケストレーションできます。

またPrefectはフローごとに異なる実行環境を簡単に設定できるため、マルチクラウドやハイブリッド環境でのワークフローの管理にも適していると感じました。様々な実行環境でのフロー実行が容易なため各フローに最適なリソースを選択できます。またコスト効率やパフォーマンスを最大限に引き出すことができるため、企業のニーズに合わせた柔軟なデータ処理が実現可能になります。

まとめ

PrefectについてAirflowと比較しながら紹介いたしました。
シンプルなワークフローにはオーバースペックかもしれませんが、Airflowに限らず現在複雑なワークフローを運用していて管理が大変などでしたらぜひPrefectを検討してみてください。

Streamlit in Snowflakeによるダッシュボード作成前のページ

VPC Service Controlsで「NO_MATCHING_ACCESS_LEVEL」とエラーが出た時にやること次のページ

ピックアップ記事

  1. 最速で理解したい人のためのIT用語集

関連記事

  1. Python

    PyTorchのキホンを理解する

    PyTorchのキホンを理解するNumpyのndarray(多次元配…

  2. Python

    わかりやすいPyTorch入門②(ニューラルネットワークによる分類)

    ニューラルネットワークを使ってワインの種類を分類する今回はsciki…

  3. Python

    pythonを使ったダミーデータ生成

    最初になにか発見したことを総合研究所で発表したり、デモ資料を作ったり…

  4. Adobe Analytics

    Google Analytics検証を効率的に進める方法

    0.初めにGoogle Analyticsがデータをとれているか検証…

  5. Generative AI

    ChainlitでのOAuth認証にスコープを追加する方法

    こんにちは!ChainlitというPythonでチャットアプ…

  6. ChatGPT

    LangChainのソースコードから実装を見てみる(ChatModelのinvoke編)

    生成AIのアプリケーション開発をするライブラリであるLangChai…

カテゴリ
最近の記事
  1. dbt Projects on SnowflakeをTASK…
  2. AWS発のAIエージェントIDE「Kiro」を使用した仕様駆…
  3. AWS発のAIエージェントIDE「Kiro」を使用した仕様駆…
  4. TableauとSnowflakeを接続する方法
  5. 【dbts25】Snowflake×PostgreSQLのニ…
  1. ブログ

    インドネシアのデジタルマーケティング
  2. IT用語集

    コマンド(Command)・コマンドプロンプト(Command Prompt)っ…
  3. Web解析

    コラム:Web解析から顧客体験分析(CXA)に
  4. Treasure Data

    Treasure Data(トレジャーデータ)でよく使う関数9選[presto]…
  5. Adobe Analytics

    Adobe Analytics:マーケティングチャネルの設定方法 全流入経路を1…
PAGE TOP