Cloud Dataflow

Firebase AnalyticsのデータをフラットなCSVに変換する – Google Cloud Dataflow編

こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。

以前Firebase AnalyticsのBigQueryスキーマをフラットな一行のCSVに変換するための処理をNode.jsで書きました。
Firebase AnalyticsのデータをフラットなCSVに変換するETL処理

しかしこれではあまりGoogle Cloud Platformの素晴らしさが伝わらないので、今回はCloud Dataflowを使って同じことを実装します。

Cloud Dataflowの資料を探すと、とりあえずApache Beamの公式サイトのドキュメント読んどけという答えが多いので、読みながら進めます。

Eclipseの開発環境を用意する

まずはGoogleのクイックスタートのとおりに開発環境を準備します。

Java と Eclipse を使用したクイックスタート

このクイックスタートのとおりに進むと、DataflowパイプラインをEclipseから実行出来るようになります。
次は実装のステップに進みます。

パイプラインを実装する

FirebaseAnalyticsのスキーマは、改行で区切られたJSONファイルです。(newline delimited JSON)
Dataflowでは一行ずつインプットを受け取って、それを変換したものをアウトプットに出していくのですが、その変換するプログラムをBeam SDKで実装してDataflowで実行するという流れです。

本家Apache Beamのサイトでそこらへんは詳しく説明してます。
Design Your Pipeline | Apache Beam

で、早速実装したのがこれです。
ブログで晒すには長めのソースコードなので、githubにおいときます。

FirebaseJsonConvert.java

https://github.com/youngdongk/dfl-etl-firebase/blob/master/src/main/java/jp/exture/dataflowdemo/FirebaseJsonConvert.java

ソースコード解説

まずは基本的なところと設定まわりです。

    public interface DflOptions extends PipelineOptions {
    	
        String getInput();
        void setInput(String value);
        
        String getOutput();
        void setOutput(String value);
    }
    
    public static void main(String[] args) {
        PipelineOptionsFactory.register(DflOptions.class);
        DflOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DflOptions.class);
        Pipeline p = Pipeline.create(options);
        
        PCollection<String> lines = p.apply("ReadJsonFile", TextIO.read().from(options.getInput()));
        PCollection<String> output = lines.apply("FlattenJsonFile", ParDo.of(new FlattenJsonFn()));
        output.apply("WriteTsvFile", TextIO.write().to(options.getOutput()).withSuffix(".tsv"));

        p.run().waitUntilFinish();
    }

Dataflowの基本は、PCollectionオブジェクトにしたインプットを「ゴニョゴニョと変換」してさらに新しいPCollectionオブジェクトを生成して、最後にそれをアウトプットするというものです。

mainメソッドの中で、Pipelineを初期化して、そのパイプラインからインプットファイルを一行ずつ読み込んでPCollectionを作ります。
このPcollectionにはJSONの1行=1レコードが入ってきます。

実行時の引数で input= と output= でそれぞれ入力ファイルと出力ファイル(のprefix)を指定するので、PiplineOptionsを継承したDflOptionsというインターフェースを実装します。
これでgetterとsetterが出来上がるので、引数の値をやり取りする時に使います。

PCollectionに入ってるJSONをフラットなTSVに変換するのがFlattenJsonFnで、これはDoFnのサブクラスとして作ってあります。
このFlattenJsonFnを、ParDoという並列処理に引数として渡して実行します。
そこのソースコードはこうなってます。

    private static class FlattenJsonFn extends DoFn<String, String> {

        @ProcessElement
        public void processElement(ProcessContext c) {
            String json = c.element();
            StringBuffer cols = new StringBuffer();

            try {

                JSONParser jsonParser = new JSONParser();
                JSONObject jsonObject = (JSONObject) jsonParser.parse(json);

                JSONObject data_user= (JSONObject) jsonObject.get("user_dim");
                JSONArray data_event= (JSONArray) jsonObject.get("event_dim");

                //user_dimの展開
                String user_id = "";
                String first_open_timestamp_micros = "";

                user_id = (String) data_user.get("user_id");
                try { first_open_timestamp_micros = (String) data_user.get("first_open_timestamp_micros"); } catch(Exception e) {}

                JSONObject device_info = (JSONObject) data_user.get("device_info");
                //以下中略

            } catch (Exception e) {}    
            String rows = cols.toString();
            c.output(rows);
        }
    }

json-simpleというパーサーを使って、JSONを展開し、値を取り出してフラットな一行のStringに変換するという事をやってます。
FirebaseAnalyticsのスキーマは、user_dimというセッションデータと、event_dimというヒットデータの2つの要素で構成されてます。
何層にもネストになったJSONなので、都度JSONArrayをforループで展開しながら読み込んでます。
そこらへんはgithubに載ってるソースコードを見れば分かる事でしょう。

FirebaseAnalyicsのJSONスキーマをCloud Storageに置く

今回はFirebase Demo ProjectのBigQueryデータを使います。
これをCloud StorageにJSON形式でエクスポートします。

20160607のデータを選んでからExport Tableで、Cloud Storageのバケットとファイルパスを指定してエクスポートします。

Eclipse上でビルドする

さて、Eclipseを使ってこのソースコードをビルドします。

新規プロジェクト作成で、Google Cloud Dataflow Java Project を作成するか、または先程クイックスタートで作ったWordCountのプロジェクトを再利用してください。

次に、FirebaseJsonConvert.javaをプロジェクトに追加します。
package名は適当に変えるなりしてください。

ビルドするためにはjson-simpleライブラリが必要です。
json-simple

プロジェクトのpom.xmlを開いて、dependencies要素内に、下記を追加します。

<dependency>
   <groupId>com.googlecode.json-simple</groupId>
   <artifactId>json-simple</artifactId>
   <version>1.1.1</version>
</dependency>

EclipseからDataflowパイプラインを実行する

ではEclipseからこのパイプライン処理をDatafow上で実行します。

Run -> Run Configurations を開き、左側のペインからDataflow Pipelineを選びます。

1. Mainタブを開き、下記を設定してApplyします。

Name: 適当に名前をつける
Project: このプロジェクトを選ぶ
Main class: jp.exture.dataflowdemo.FirebaseJsonConvert

2. Pipline Argumentsタブを開き、下記を設定してApply

Runner: DataflowRunnerを選択
Account: GCPのログインアカウントを選択
Cloud Platform project ID: Dataflowを実行するGCPプロジェクトを選択
Cloud Storage staging location: ステージングファイルを置くCloud Storage上のフォルダを選択。
↑ステージング用フォルダをバケット上に作っておいてください。

3. Argumentsタブを開き、Program Argumentsに下記を入力してApply

--input=gs://<bucket名>/<エクスポートしたファイル名> --output=gs://<bucket名>/アウトプットファイルのprefix

outputは自動的に拡張子.tsvを付与するので、ファイルのprefixだけを指定します。

そしてRunボタンを押すと実行されます。
Eclipseのコンソールに、実行中の状況が出力されます。
実行に成功すると、finished with status DONE. というメッセージが出て終了します。

Dataflowコンソールには、実行したJobの一覧が出てきます。
成功したJobには緑のチェックアイコンがついてます。

Cloud Storageを見ると、変換されたTSVファイルが4つ出来ています。

複数のWorkerによって分割処理されるので、このように結果が自動的に分割されて出力されます。
出来上がったTSVファイルを見ると、イベント毎(ヒット毎)に一行のフラットなデータに変換されて出力されている事が確認出来ます。

おまけ:コマンドラインから実行する

Eclipseではなくて、コマンドラインで実行したい場合は、mavenコマンドで実行します。

$ mvn compile exec:java \
    -Dexec.mainClass=jp.exture.dataflowdemo.FirebaseJsonConvert \
    -Dexec.args="--project=<project名> \
    --stagingLocation=gs://<バケット名>/<ステージングフォルダ>/ \
    --input=gs://<バケット名>/<入力ファイル> \
    --output=gs://<バケット名>/<出力ファイルのprefix> \
    --runner=DataflowRunner"

今回はCloud Dataflowパイプラインを使って、FirebaseAnalyticsのBigQueryスキーマをJSON型からフラットなTSVファイルに変換する方法について説明しました。

弊社では、Google Cloud Platformを使って、FirebaseAnalytics, GoogleAnalytics, AdobeAnalyticsなど各デジタル解析ソリューションのデータを分析するための基盤構築支援業務を行っております。

お問い合わせはこちらからどうぞ。

ブログへの記事リクエストはこちらまで

ピックアップ記事

  1. 最速で理解したい人のためのIT用語集

関連記事

  1. Cloud Dataproc

    BigQueryテーブルをAVRO形式でエクスポートしてHiveで扱う

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

  2. Google Cloud Platform

    Vertex AI Embeddings for Text によるテキストエンベディングをやってみた…

    こんにちは、石原と申します。自然言語処理(NLP)は近年のA…

  3. Application Integration

    Google Cloud iPaaS 「Application Integration」を使ってみた…

    こんにちは、エクスチュアの黒岩です。今回の記事では、Goog…

  4. Adobe Analytics

    Adobe Analytics: DatafeedをGoogle BigQueryにロード(2019…

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

  5. Google Cloud Platform

    Looker: LookerbotでSlackにグラフ画像をスケジュール投稿する

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

  6. Google Cloud Platform

    【GCP】Cloud Workflowsでデータパイプラインの構築を試してみた①概要編

    こんにちは、エクスチュアの黒岩と申します。エクスチュアブログ…

最近の記事

  1. モダンデータスタックなワークフローオーケストレーションツール…
  2. Streamlit in Snowflakeによるダッシュボ…
  3. Streamlit in SnowflakeによるStrea…
  4. Streamlitを使った簡単なデータアプリケーション作成ガ…
  5. 生成AI機能を活かしたデータカタログ製品「Secoda」を試…
  1. Mouseflow

    ページ解析ツールMouseflowにふれてみた
  2. IT用語集

    ドメイン(Domain)、クロスドメイン(Cross Domain)って何?
  3. IT用語集

    クローリング(Crawling)って何?
  4. IT用語集

    ナレッジマネジメント(Knowledge Management)って何?
  5. ObservePoint

    Webサイトのプライバシーポリシー検証(1/6):プライバシーポリシーを設置する…
PAGE TOP