Databricks

Databricks: Spark RDDで使う主なメソッド

こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。

今回はDatabricksのSpark RDDでよく使う主なメソッドを紹介します。
前回のブログでは便利なDataFrameを取り上げましたが、基本のRDDもしっかりおさえておく必要があります。

Transformation系

RDDを処理して別のRDDに変換するTransformation系の主なメソッドは以下のものがあります。

  • map
  • flatMap
  • filter
  • union
  • intersection
  • subtrct
  • distinct

map

引数で渡した関数をRDDの各行に対して実行した結果を返してくれます。

val seq = Seq("Some people feel the rain", "Others just get wet")
val rdd = sc.parallelize(seq)
val caps = rdd.map(x => x.toUpperCase)
caps.collect.foreach(println)

結果:
SOME PEOPLE FEEL THE RAIN
OTHERS JUST GET WET

まずSpark/Scalaを始めた時に引数の「x => x.toUppserCase」って何??
と戸惑いましたが、こう書くのと一緒です。

def upper(x:String): String = {
  x.toUpperCase
}
rdd.map(x => upper(x)).collect.foreach(println)

わざわざ関数を先に宣言しなくてもそのまま引数で渡せるのがScalaの良いところです。

flatMap

mapに似てますが、mapした結果をひとつのコレクションにして返してくれます。

rdd.flatMap(x => x.split(" ")).collect

結果:
Array[String] = Array(Some, people, feel, the, rain, Others, just, get, wet)

flatMapとmapって何が違うのか?という場合は結果を見ると早いです。
mapでは入れ子になったコレクションが返ります。

rdd.map(x => x.split(" ")).collect

結果:
Array[Array[String]] = Array(Array(Some, people, feel, the, rain), Array(Others, just, get, wet))

filter

引数で渡した関数の結果がtrueになるものだけ返します。

rdd.filter(x => x.contains("rain")).collect

結果:
Array[String] = Array(Some people feel the rain)

union

2つのRDDをひとつのRDDにして返します。

val rdd1 = sc.parallelize(Seq(1,2,3,4,5))
val rdd2 = sc.parallelize(Seq(1,6,7,8))
rdd1.union(rdd2).collect()

結果:
Array[Int] = Array(1, 2, 3, 4, 5, 1, 6, 7, 8)

intersection

両方のRDDに完全一致で存在するものだけを返します。

val rdda = sc.parallelize(Seq("alpha", "bravo", "charlie"))
val rddb = sc.parallelize(Seq("bravo", "Alpha", "charles", "alpha"))
rdda.intersection(rddb).collect()

結果:
Array[String] = Array(bravo, alpha)

subtract

RDDから完全一致するものを「引いた」結果を返します。

val ford = sc.parallelize(List("Chop your own wood and it will warm you twice")).flatMap(x => x.split(" ")).map(x => x.toLowerCase)
val drop = sc.parallelize(List("your and will twice")).flatMap(x => x.split(" "))
ford.subtract(drop).collect

結果:
Array[String] = Array(it, warm, wood, own, chop, you)

distinct

RDDから重複を除外した結果を返します。

val dupe = sc.parallelize(Seq("one", 1, "two", 2, "three", "one", "two", 1, 2))
dupe.distinct.collect()

結果:
Array[Any] = Array(1, 2, two, one, three)

&nbps;

Action系

RDDから結果を抽出するAction系の主なメソッドは以下のものがあります。

  • collect
  • count
  • first
  • take
  • reduce
  • takeOrdered
  • top

collect

例文で既に何度も出てきたメソッドです。
collectはRDDの各分散されてるパーティションのデータを「集める」ためのメソッドです。
Transformationが終わったRDDに対して使います。
ただし、集めたデータがSparkクラスタのメモリサイズ以上あるとout-of-memory errorが起きます。
あくまでもRDDをTransformして絞り込んだ結果に対して使いましょう。

val nums = sc.parallelize(Seq(1,2,3,4,5,6,7,8,9,10))
nums.collect()

結果:
Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

count

RDDの行数をカウントします。

nums.count()

結果:
Long = 10

first

RDDの一番最初の行を返します。

numRdd.first()

結果:
Int = 1

take

引数で指定した行数だけ返します。

nums.take(5)

結果:
Array[Int] = Array(1, 2, 3, 4, 5)

reduce

引数で渡された関数を実行した結果を集約して一つにして返してくれます。

nums.reduce((x, y) => x + y)

結果:
Int = 55

引数で渡せる関数は、「同じ型の2つの変数を引数として受け取って、同じ型で返す」というルールがあります。

で、こういう書き方も可能です。
結果は同じく55になります。

def add(x:Int, y:Int): Int = {
  return x + y
}
nums.reduce(add)

こんなに冗長的に書く必要がないのがScalaのメリットです。

takeOrdered

takeに似てますが、ソートされた結果を返します。

nums.takeOrdered(4)

結果:
Array[Int] = Array(1, 2, 3, 4)

top

これもtakeに似てますが、降順ソート(上位トップx)の結果を返します。

nums.top(3)

結果:
Array[Int] = Array(10, 9, 8)

Key/ValueペアRDDのTransformation系

ここまで主にIntかStringの値のコレクションだけを扱いましたが、Spark/Scalaをいじってるとキーバリュー型RDDを扱うケースも出てきます。

KeyValueペアRDDでよく使う主なTransformation系メソッドは以下のものがあります。

  • groupByKey
  • reduceByKey
  • sortByKey
  • join

groupByKey

同じキーを持つ値をグループ化します。

val rdd = sc.parallelize(Seq("My", "future", "starts", "when", "I", "wake", "up", "every", "morning")) 
val kv = rdd.map(x => (x.length, x))
val len = kv.groupByKey()
len.collect.foreach(println)

結果:
(1,CompactBuffer(I))
(2,CompactBuffer(My, up))
(4,CompactBuffer(when, wake))
(5,CompactBuffer(every))
(6,CompactBuffer(future, starts))
(7,CompactBuffer(morning))

reduceByKey

同じキー同士でグループ化したのちに、グループ毎にreduceします。

val item = sc.parallelize(Seq(("item1", 540), 
                              ("item2", 432),
                              ("item2", 648),
                              ("item3", 324)
                             ))
val sum = item.reduceByKey((total, price) => total + price)
sum.collect()

結果:
Array[(String, Int)] = Array((item3,324), (item1,540), (item2,1080))

sortByKey

読んで字のごとく、キーでソートします。

val flip = sum.map(x => (x._2, x._1)).sortByKey()
flip.collect()

結果:
Array[(Int, String)] = Array((324,item3), (540,item1), (1080,item2))

降順でソートしたい場合は、引数にfalseを渡します。

val flipDesc = sum.map(x => (x._2, x._1)).sortByKey(false)
flipDesc.collect()

結果:
Array((1080,item2), (540,item1), (324,item3))

join

同じキーを持つRDD同士を結合します。
innner joinなので、両方のRDDにキーが存在するものだけ結合されます。

val memRank = sc.parallelize(Seq((100, "A"),
                                   (101, "B"),
                                   (102, "S"),
                                   (103, "B"),
                                   (104, "A"),
                                   (105, "C")
                                  )
                              )
val memName = sc.parallelize(Seq((100, "John"),
                                    (101, "Mary"),
                                    (102, "Amber"),
                                    (103, "Mike"),
                                    (104, "Bob")
                                   )
                               )
val memInfo = memRank.join(memName)
memInfo.collect.foreach(println)

結果:
(104,(A,Bob))
(100,(A,John))
(101,(B,Mary))
(102,(S,Amber))
(103,(B,Mike))

Key/ValueペアRDDのAction系

KeyValueペアRDDにおけるAction系メソッドは以下のものがあります。

  • countByKey
  • collectAsMap
  • lookup

countByKey

キーごとに数をカウントします。返ってくるのはMap型コレクションです。

val item = sc.parallelize(Seq(("item1", 540), 
                              ("item2", 432),
                              ("item2", 648),
                              ("item3", 324)
                             ))
item.countByKey()

結果:
scala.collection.Map[String,Long] = Map(item3 -> 1, item1 -> 1, item2 -> 2)

collectAsMap

重複してるキーがある場合は、ひとつしか返って来ないので注意

item.collectAsMap()

結果:
scala.collection.Map[String,Int] = Map(item1 -> 540, item3 -> 324, item2 -> 648)

↑item2がひとつだけしかない。先にreduceByKeyしてから使うべき。

lookup

該当するキーに一致する値を取り出します。
存在しないキーを指定すると空のコレクションが返ってきます。
指定したキーがRDD内に存在しているか確認したい時に使えます。

item.lookup("item1")
item.lookup("item2")
item.lookup("item5")

結果:
Seq[Int] = WrappedArray(540)
Seq[Int] = WrappedArray(432, 648)
Seq[Int] = WrappedArray()

まとめ

今回はRDDでよく使う主なTransformation系/Action系メソッドについて紹介しました。
RDDはDatabricks Spark/Scalaの基本的な機能です。
基本を抑えて楽しい分析ライフを!

弊社はデジタルマーケティングからビッグデータ分析まで幅広くこなすデータ分析のプロ集団です。
Databricksのコンサルティング/導入支援についてのお問合わせはこちらからどうぞ。

ブログへの記事リクエストはこちらまでどうぞ。

ピックアップ記事

  1. 最速で理解したい人のためのIT用語集

関連記事

  1. Adobe Analytics

    AdobeAnalytics: スクロールで目標に到達したらカスタムリンク

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。Ado…

  2. Google BigQuery

    GCP: 今月のGCP課金額をslackに自動的に書き込む

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

  3. Google BigQuery

    Google Apps Scriptを使ってスプレッドシートからBigQueryのテーブルを更新する…

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

  4. Databricks

    Databricks: Spark DataFrameでユーザー定義関数を使う

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

  5. Adobe Analytics

    Azure DatabricksでAdobe AnalyticsのDatafeedを分析する

    こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。…

  6. Adobe Analytics

    Adobe Mobile SDK 4.xからAEP SDKに移行する

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。Ado…

最近の記事

  1. 【GA4/GTM】dataLayerを使ってカスタムイベント…
  2. KARTE を使ってサイト外でも接客を
  3. 【GA4/GTM】dataLayerを活用しよう
  4. ジャーニーマップをデジタルマーケティングの視点で
  5. ChatGPT ProからClaude3 Proへ移行した話…
  1. ObservePoint

    ObservePointの活用で自動車メーカー フォードが達成した4つの成果
  2. GA 360 Suite

    GoogleDataStudio:複数のデータソースにフィルターを適用する
  3. Adobe Cloud Platform Auditor

    Adobe Cloud Platform Auditor (Powered by…
  4. Adobe Analytics

    Adobe Summit 2020レポート: Implementation Ti…
  5. Adobe Analytics

    訪問回数 とは-Adobe Analyticsの指標説明
PAGE TOP