Google Cloud Platform

【GCP】Cloud Workflowsでデータパイプラインの構築を試してみた①概要編

こんにちは、エクスチュアの黒岩と申します。

エクスチュアブログでは今回の記事が初投稿となります。
エクスチュアではデータエンジニアとして、クラウドやETLツールを使用したデータ基盤の構築を主に行っています。その中でもMatillionというETLツールを約3年ほど扱っており、設計から構築まで一貫して行うことも可能ですので、ご興味のある企業様やMatillion活用に困っている方、是非お気軽にお問い合わせください!(今後Matillionの記事も書いていきたい、、)

さて今回は、Google CloudのサービスであるCloud Workflowsを使用する機会がありましたので、サービスの概要から簡単なデータパイプラインの構築を行うまで、数回の記事に分けて紹介できればと思っています。
これからWorkflowsを初めて使用する方や、使用中だが実装に悩んでいる方などの参考に少しでもなれば幸いです。

第一弾となる本記事では、Cloud Workflowsのサービス概要や想定されるケースの説明と、実際にサンプルワークフローを実行してみたり定義を確認するところだったりを扱っていきます。

Cloud Workflowsとは?

特徴

  • フルマネージドのデータオーケストレーション開発ツール
  • HTTPベースのAPI呼び出しを含む、各種GCPサービスを組み合わせることができる
  • yamlまたはjsonで記述可能で、ステップと呼ばれる個別の手順を定義した形式で構成する
  • サービスの依存関係をエンドツーエンドで明示的にモニタリングできる
  • サーバーレスであり、ワークフロー実行時のみ課金が発生

料金

ステップには内部ステップと外部ステップの2種類存在し、それぞれ実行回数毎に課金が発生します。

内部ステップ:Google CloudのAPI実行を始め、Cloud FunctionsやCloud Runの実行、ワークフロー内の変数割当やサブワークフローの実行など、Google Cloudの中で完結するステップのこと。
外部ステップ: Google Cloudの外部サービスへのHTTP APIのリクエストや、Google Cloudリソースであってもカスタムドメインを使っているエンドポイントへのリクエストのこと。

内部ステップ月額
最初の 5,000 ステップ無料
5,001~100,000,000 ステップ$0.01 / 1,000 ステップ
100,000,001 以降のステップGCPへ問い合わせ
外部ステップ月額
最初の 2,000 呼び出し無料
2,001~100,000,000 ステップ$0.025 / 1,000 ステップ
100,000,001 以降のステップGCPへ問い合わせ

※2023年6月時点

結論、比較的シンプルなデータパイプラインであれば無料で運用できてしまいますね、、🆓

想定される利用ケース

複数のGCPサービスを使用するデータ処理を、サーバーレス且つイベント駆動型で実行したい場合などが考えられます。例として以下のような手順の処理があげられます。

  1. AWS S3に存在するファイルをGCSへ転送
  2. 転送したファイルをBigQueryにロード
  3. dataformを使用してSQLを実行
各処理の概念図

果たしてCloud Workflowsで実現可能なのでしょうか、、まずはサンプルワークフローを見ていきましょう👀

サンプルワークフローを実行してみる

それでは実際に、GCPが用意しているクイックスタートに沿ってサンプルワークフローを実行してみましょう。このクイックスタートでは、指定したロケーション名に関連する記事をwikipediaから取得する(wikipedia APIを実行)ことができるワークフロージョブのようです。

また前提として、課金が有効化されているGCPプロジェクトが用意できていることとします。

1. Cloud WorkflowsのAPIを有効化する

1. GCPプロジェクトの検索窓で「workflows」と検索し、候補に表示される「ワークフロー」をクリック。

2. 初めて使用する場合に限り、以下の画面が表示されるためWorkflows APIを有効化

2. ワークフロージョブの作成

1. 「作成」を選択

2. 各項目を入力していく

 ※サービスアカウントについて
ワークフロージョブにおいて、Google Cloud の各サービスをAPIで実行するためのアカウントとして使用されます。今回のクイックスタートでは特殊な権限は不要であるがサービスアカウントの指定は必須であるため、デフォルトで存在する ”App Engine default service account” を使用すれば問題ありません。

3. デプロイする

3. ワークフロージョブの実行

1. 「実行」を選択

2. まずは引数を与えずに実行

3. 出力結果を確認

実行ステータスや出力結果、実行ログを見ることができる

4. 引数に特定の値を与えて実行してみる

5. 出力結果を確認

“europe”というキーワードに関連する記事が出力されたことが分かる

以上で実行環境の構築から処理の実行まで完了です!たったこれだけのステップで処理のデプロイから引数を与えて外部APIの実行まで可能だなんて、、汎用性が高そうな気がしますよね💡?

ワークフローの定義を確認

それでは、先ほど実行したワークフロージョブはどんな定義がされていたのか確認してみましょう。
実はこの定義、yamlまたはjsonで書かれたコードを元に勝手に可視化されているんです!

  1. checkSearchTermInInput:引数から入力値が与えられているか確認
  2. checkSearchTermInInput.condition1:入力値が与えられていれば変数に入力値を代入
  3. getLocation:入力値が与えられていなければ、GCPプロジェクトのロケーションを取得
  4. setFromCallResult:取得したロケーションの値を変数に代入
  5. readWikipedia:wikipedia apiに対して、変数の値をパラメータに持たせて実行
  6. returnOutput:実行結果を出力

コードも見ていきましょう。

main:
    params: [input]
    steps:
    - checkSearchTermInInput:
        switch:
            - condition: '${"searchTerm" in input}'
              assign:
                - searchTerm: '${input.searchTerm}'
              next: readWikipedia
    - getLocation:
        call: sys.get_env
        args:
            name: GOOGLE_CLOUD_LOCATION
        result: location
    - setFromCallResult:
        assign:
            - searchTerm: '${text.split(location, "-")[0]}'
    - readWikipedia:
        call: http.get
        args:
            url: 'https://en.wikipedia.org/w/api.php'
            query:
                action: opensearch
                search: '${searchTerm}'
        result: wikiResult
    - returnOutput:
            return: '${wikiResult.body[1]}'

main
mainは必ずしも必須ではありませんが、実行時に引数を受け取りたい場合は必須となります。そしてその引数を受け取るために params を記述し、その後に処理を順に実行していくための step を記述していくという流れになっていることがわかります。

stepは基本的に上から順に実行されますが、条件によっては実行したくないstepがある場合や、次のstepを実行するまでに間隔を空けたい場合があるかと思います。そんな時に使用できるのが、上記コードにも記載のある switch や condition 、 sleep といったWorkflows側で用意された関数です。

Workflows側で用意された関数
クイックスタートのステップ内で使用されている switch や sys.get_env, text.split を始め、かなり多くの関数が用意されていることが以下の公式リファレンスを見るとわかります。
https://cloud.google.com/workflows/docs/reference/stdlib/overview

ワークフローが失敗したことを検知するためにもログを吐き出したいなあ、、と思っていたらこんな関数もあったり。
https://cloud.google.com/workflows/docs/reference/stdlib/sys/log

しかし実際に試してみた経験上、歯痒いところに手が届かないといったことはあったため今後の関数拡充には期待したいところです、、🙏

API呼び出し
wikipediaのような外部のAPI呼び出しはもちろん、Google CloudサービスのAPIも豊富に呼び出すことが可能です。
https://cloud.google.com/workflows/docs/reference/googleapis

この豊富なAPIを活用すれば、想定される利用ケースのデータパイプラインは実現可能なイメージが湧いてきますね🔥

終わりに

本記事では、主にCloud Workflowsのサービス概要の説明と、実際にサンプルワークフローを実行したり定義を確認したりといった部分に焦点を当てて書きました。
続いての記事では、実際に想定されるケースの書き方や実装を中心にやっていければなと考えていますので、是非そちらも楽しみにお待ちいただければと思います。

※7月21日追記
続編として実践編の記事を公開しましたので、以下リンクよりご覧ください!
https://ex-ture.com/blog/2023/07/21/gcp-cloud-workflows-practice/

ここまで読んでくださった方、実際にCloud Workflowsを触ってみた方、どうもありがとうございました&お疲れ様でした!


エクスチュアはGoogle Cloud Platformのサービスパートナーです。

Cloud WorkflowsやGA4を含むGoogle Cloud Platform、Adobe Experience Cloud、Tableau、Lookerなどに精通したスタッフによるデータ活用サポート、各マーケティングツールの導入実装・活用支援のコンサルティングサービスや、GCP/AWSなどのパブリッククラウドを使ったデータ分析基盤構築コンサルティングサービスを提供しております。

デジタルマーケティングに関するお悩みや活用支援、他分析ツールなどについてお困りの方は
お気軽にご質問・ご相談ください。

ピックアップ記事

  1. 最速で理解したい人のためのIT用語集

関連記事

  1. Google Cloud Platform

    Google Compute Engine のLinuxVMにVNC接続する

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

  2. Google Cloud Platform

    Server-side GTM を Google AppEngine にデプロイする

    こんにちは、エクスチュアの権泳東(權泳東/コン・ヨンドン)です。…

  3. GitHub Actions

    GitHub ActionsでGCEへのデプロイを楽にしてみた

    はじめに今回はgithub actionsを導入する…

  4. Google BigQuery

    BigQuery BI Engine解説

    こんにちは、エクスチュア渡部です。2019/4/9-4/11に行わ…

  5. Google BigQuery

    BigQueryのユーザー定義関数(UDF)をTableauで使う

    こんにちは、エクスチュアの渡部です。今回はTableauでstan…

  6. Adobe Analytics

    Adobe Analytics: DWHレポートの日付列をBigQueryのDate型として扱う

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

最近の記事

  1. 第14回関西DB勉強会-Snowflake Summit参加…
  2. Open Interpreter+VScode+Docker…
  3. LangChainって何?: 次世代AIアプリケーション構築…
  4. 回帰分析はかく語りき Part1 単回帰分析
  5. GitHub ActionsでGCEへのデプロイを楽にしてみ…
  1. Snowflake

    SnowflakeのHybrid Tableのマニュアルを読み解く
  2. Mouseflow

    MouseflowにおけるITP対応について
  3. Databricks

    Databricks: Spark DataFramesをJDBCから作成する
  4. ブログ

    インドネシアのデジタルマーケティング
  5. ブログ

    IPアドレスとは
PAGE TOP