Databricks

Databricks: Spark RDDで使う主なメソッド

こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。

今回はDatabricksのSpark RDDでよく使う主なメソッドを紹介します。
前回のブログでは便利なDataFrameを取り上げましたが、基本のRDDもしっかりおさえておく必要があります。

Transformation系

RDDを処理して別のRDDに変換するTransformation系の主なメソッドは以下のものがあります。

  • map
  • flatMap
  • filter
  • union
  • intersection
  • subtrct
  • distinct

map

引数で渡した関数をRDDの各行に対して実行した結果を返してくれます。

val seq = Seq("Some people feel the rain", "Others just get wet")
val rdd = sc.parallelize(seq)
val caps = rdd.map(x => x.toUpperCase)
caps.collect.foreach(println)

結果:
SOME PEOPLE FEEL THE RAIN
OTHERS JUST GET WET

まずSpark/Scalaを始めた時に引数の「x => x.toUppserCase」って何??
と戸惑いましたが、こう書くのと一緒です。

def upper(x:String): String = {
  x.toUpperCase
}
rdd.map(x => upper(x)).collect.foreach(println)

わざわざ関数を先に宣言しなくてもそのまま引数で渡せるのがScalaの良いところです。

flatMap

mapに似てますが、mapした結果をひとつのコレクションにして返してくれます。

rdd.flatMap(x => x.split(" ")).collect

結果:
Array[String] = Array(Some, people, feel, the, rain, Others, just, get, wet)

flatMapとmapって何が違うのか?という場合は結果を見ると早いです。
mapでは入れ子になったコレクションが返ります。

rdd.map(x => x.split(" ")).collect

結果:
Array[Array[String]] = Array(Array(Some, people, feel, the, rain), Array(Others, just, get, wet))

filter

引数で渡した関数の結果がtrueになるものだけ返します。

rdd.filter(x => x.contains("rain")).collect

結果:
Array[String] = Array(Some people feel the rain)

union

2つのRDDをひとつのRDDにして返します。

val rdd1 = sc.parallelize(Seq(1,2,3,4,5))
val rdd2 = sc.parallelize(Seq(1,6,7,8))
rdd1.union(rdd2).collect()

結果:
Array[Int] = Array(1, 2, 3, 4, 5, 1, 6, 7, 8)

intersection

両方のRDDに完全一致で存在するものだけを返します。

val rdda = sc.parallelize(Seq("alpha", "bravo", "charlie"))
val rddb = sc.parallelize(Seq("bravo", "Alpha", "charles", "alpha"))
rdda.intersection(rddb).collect()

結果:
Array[String] = Array(bravo, alpha)

subtract

RDDから完全一致するものを「引いた」結果を返します。

val ford = sc.parallelize(List("Chop your own wood and it will warm you twice")).flatMap(x => x.split(" ")).map(x => x.toLowerCase)
val drop = sc.parallelize(List("your and will twice")).flatMap(x => x.split(" "))
ford.subtract(drop).collect

結果:
Array[String] = Array(it, warm, wood, own, chop, you)

distinct

RDDから重複を除外した結果を返します。

val dupe = sc.parallelize(Seq("one", 1, "two", 2, "three", "one", "two", 1, 2))
dupe.distinct.collect()

結果:
Array[Any] = Array(1, 2, two, one, three)

&nbps;

Action系

RDDから結果を抽出するAction系の主なメソッドは以下のものがあります。

  • collect
  • count
  • first
  • take
  • reduce
  • takeOrdered
  • top

collect

例文で既に何度も出てきたメソッドです。
collectはRDDの各分散されてるパーティションのデータを「集める」ためのメソッドです。
Transformationが終わったRDDに対して使います。
ただし、集めたデータがSparkクラスタのメモリサイズ以上あるとout-of-memory errorが起きます。
あくまでもRDDをTransformして絞り込んだ結果に対して使いましょう。

val nums = sc.parallelize(Seq(1,2,3,4,5,6,7,8,9,10))
nums.collect()

結果:
Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

count

RDDの行数をカウントします。

nums.count()

結果:
Long = 10

first

RDDの一番最初の行を返します。

numRdd.first()

結果:
Int = 1

take

引数で指定した行数だけ返します。

nums.take(5)

結果:
Array[Int] = Array(1, 2, 3, 4, 5)

reduce

引数で渡された関数を実行した結果を集約して一つにして返してくれます。

nums.reduce((x, y) => x + y)

結果:
Int = 55

引数で渡せる関数は、「同じ型の2つの変数を引数として受け取って、同じ型で返す」というルールがあります。

で、こういう書き方も可能です。
結果は同じく55になります。

def add(x:Int, y:Int): Int = {
  return x + y
}
nums.reduce(add)

こんなに冗長的に書く必要がないのがScalaのメリットです。

takeOrdered

takeに似てますが、ソートされた結果を返します。

nums.takeOrdered(4)

結果:
Array[Int] = Array(1, 2, 3, 4)

top

これもtakeに似てますが、降順ソート(上位トップx)の結果を返します。

nums.top(3)

結果:
Array[Int] = Array(10, 9, 8)

Key/ValueペアRDDのTransformation系

ここまで主にIntかStringの値のコレクションだけを扱いましたが、Spark/Scalaをいじってるとキーバリュー型RDDを扱うケースも出てきます。

KeyValueペアRDDでよく使う主なTransformation系メソッドは以下のものがあります。

  • groupByKey
  • reduceByKey
  • sortByKey
  • join

groupByKey

同じキーを持つ値をグループ化します。

val rdd = sc.parallelize(Seq("My", "future", "starts", "when", "I", "wake", "up", "every", "morning")) 
val kv = rdd.map(x => (x.length, x))
val len = kv.groupByKey()
len.collect.foreach(println)

結果:
(1,CompactBuffer(I))
(2,CompactBuffer(My, up))
(4,CompactBuffer(when, wake))
(5,CompactBuffer(every))
(6,CompactBuffer(future, starts))
(7,CompactBuffer(morning))

reduceByKey

同じキー同士でグループ化したのちに、グループ毎にreduceします。

val item = sc.parallelize(Seq(("item1", 540), 
                              ("item2", 432),
                              ("item2", 648),
                              ("item3", 324)
                             ))
val sum = item.reduceByKey((total, price) => total + price)
sum.collect()

結果:
Array[(String, Int)] = Array((item3,324), (item1,540), (item2,1080))

sortByKey

読んで字のごとく、キーでソートします。

val flip = sum.map(x => (x._2, x._1)).sortByKey()
flip.collect()

結果:
Array[(Int, String)] = Array((324,item3), (540,item1), (1080,item2))

降順でソートしたい場合は、引数にfalseを渡します。

val flipDesc = sum.map(x => (x._2, x._1)).sortByKey(false)
flipDesc.collect()

結果:
Array((1080,item2), (540,item1), (324,item3))

join

同じキーを持つRDD同士を結合します。
innner joinなので、両方のRDDにキーが存在するものだけ結合されます。

val memRank = sc.parallelize(Seq((100, "A"),
                                   (101, "B"),
                                   (102, "S"),
                                   (103, "B"),
                                   (104, "A"),
                                   (105, "C")
                                  )
                              )
val memName = sc.parallelize(Seq((100, "John"),
                                    (101, "Mary"),
                                    (102, "Amber"),
                                    (103, "Mike"),
                                    (104, "Bob")
                                   )
                               )
val memInfo = memRank.join(memName)
memInfo.collect.foreach(println)

結果:
(104,(A,Bob))
(100,(A,John))
(101,(B,Mary))
(102,(S,Amber))
(103,(B,Mike))

Key/ValueペアRDDのAction系

KeyValueペアRDDにおけるAction系メソッドは以下のものがあります。

  • countByKey
  • collectAsMap
  • lookup

countByKey

キーごとに数をカウントします。返ってくるのはMap型コレクションです。

val item = sc.parallelize(Seq(("item1", 540), 
                              ("item2", 432),
                              ("item2", 648),
                              ("item3", 324)
                             ))
item.countByKey()

結果:
scala.collection.Map[String,Long] = Map(item3 -> 1, item1 -> 1, item2 -> 2)

collectAsMap

重複してるキーがある場合は、ひとつしか返って来ないので注意

item.collectAsMap()

結果:
scala.collection.Map[String,Int] = Map(item1 -> 540, item3 -> 324, item2 -> 648)

↑item2がひとつだけしかない。先にreduceByKeyしてから使うべき。

lookup

該当するキーに一致する値を取り出します。
存在しないキーを指定すると空のコレクションが返ってきます。
指定したキーがRDD内に存在しているか確認したい時に使えます。

item.lookup("item1")
item.lookup("item2")
item.lookup("item5")

結果:
Seq[Int] = WrappedArray(540)
Seq[Int] = WrappedArray(432, 648)
Seq[Int] = WrappedArray()

まとめ

今回はRDDでよく使う主なTransformation系/Action系メソッドについて紹介しました。
RDDはDatabricks Spark/Scalaの基本的な機能です。
基本を抑えて楽しい分析ライフを!

弊社はデジタルマーケティングからビッグデータ分析まで幅広くこなすデータ分析のプロ集団です。
Databricksのコンサルティング/導入支援についてのお問合わせはこちらからどうぞ。

ブログへの記事リクエストはこちらまでどうぞ。

関連記事

  1. プログラミング

    Node.jsでCSVファイル内のダブルクオートで囲まれたカラム内のカンマを除去する

    こんにちは、エクスチュアCTOの権です。いろんなデータを扱う業…

  2. Adobe Analytics

    Adobe Analytics: SegmentsAPIを使って大量のセグメント設定を作成・更新する…

    こんにちは、エクスチュアCTOの権です。今回はAdobe Ma…

  3. Google Cloud Platform

    Node.js+GAE: 日本語自然文を形態素解析してネガポジ判定をする

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。今回は…

  4. Adobe Analytics

    AdobeAnalytics: GTMのdataLayerをAdobeAnalyticsの処理ルール…

    こんにちは、エクスチュアCTOの権です。今回はGoogle T…

  5. Databricks

    Databricksを始める前に覚えておきたいScalaプログラミングの基本

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。最近ツイッ…

  6. Google BigQuery

    GCP: 今月のGCP課金額をslackに自動的に書き込む

    こんにちは、エクスチュアCTOの権です。今回はGCPの課金額を…

最近の記事

  1. Databricks: Delta Lakeを使ってみる
  2. Adobe Analytics:計算指標でevents変数を…
  3. Databricks: Spark DataFramesをJ…
  4. Databricks: Spark RDDで使う主なメソッド…
  5. GCPのBQMLを使ってKaggleコンペに挑んでみた(その…
  1. Adobe Analytics

    Adobe Analytics:セグメントの落とし穴:意図しないデータが混ざる①…
  2. Adobe Analytics

    Adobe Analytics: DWHレポートをAWSのS3バケットに配信する…
  3. Adobe Target

    Adobe Target: スマホアプリ上でABテストをする
  4. Cloud Dataflow

    Firebase AnalyticsのデータをフラットなCSVに変換する …
  5. Salesforce

    Zapier(ザピエル)経由でSansanで取り込んだ名刺をSalesforce…
PAGE TOP