Databricks

Databricks: Spark RDDで使う主なメソッド

こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。

今回はDatabricksのSpark RDDでよく使う主なメソッドを紹介します。
前回のブログでは便利なDataFrameを取り上げましたが、基本のRDDもしっかりおさえておく必要があります。

Transformation系

RDDを処理して別のRDDに変換するTransformation系の主なメソッドは以下のものがあります。

  • map
  • flatMap
  • filter
  • union
  • intersection
  • subtrct
  • distinct

map

引数で渡した関数をRDDの各行に対して実行した結果を返してくれます。

val seq = Seq("Some people feel the rain", "Others just get wet")
val rdd = sc.parallelize(seq)
val caps = rdd.map(x => x.toUpperCase)
caps.collect.foreach(println)

結果:
SOME PEOPLE FEEL THE RAIN
OTHERS JUST GET WET

まずSpark/Scalaを始めた時に引数の「x => x.toUppserCase」って何??
と戸惑いましたが、こう書くのと一緒です。

def upper(x:String): String = {
  x.toUpperCase
}
rdd.map(x => upper(x)).collect.foreach(println)

わざわざ関数を先に宣言しなくてもそのまま引数で渡せるのがScalaの良いところです。

flatMap

mapに似てますが、mapした結果をひとつのコレクションにして返してくれます。

rdd.flatMap(x => x.split(" ")).collect

結果:
Array[String] = Array(Some, people, feel, the, rain, Others, just, get, wet)

flatMapとmapって何が違うのか?という場合は結果を見ると早いです。
mapでは入れ子になったコレクションが返ります。

rdd.map(x => x.split(" ")).collect

結果:
Array[Array[String]] = Array(Array(Some, people, feel, the, rain), Array(Others, just, get, wet))

filter

引数で渡した関数の結果がtrueになるものだけ返します。

rdd.filter(x => x.contains("rain")).collect

結果:
Array[String] = Array(Some people feel the rain)

union

2つのRDDをひとつのRDDにして返します。

val rdd1 = sc.parallelize(Seq(1,2,3,4,5))
val rdd2 = sc.parallelize(Seq(1,6,7,8))
rdd1.union(rdd2).collect()

結果:
Array[Int] = Array(1, 2, 3, 4, 5, 1, 6, 7, 8)

intersection

両方のRDDに完全一致で存在するものだけを返します。

val rdda = sc.parallelize(Seq("alpha", "bravo", "charlie"))
val rddb = sc.parallelize(Seq("bravo", "Alpha", "charles", "alpha"))
rdda.intersection(rddb).collect()

結果:
Array[String] = Array(bravo, alpha)

subtract

RDDから完全一致するものを「引いた」結果を返します。

val ford = sc.parallelize(List("Chop your own wood and it will warm you twice")).flatMap(x => x.split(" ")).map(x => x.toLowerCase)
val drop = sc.parallelize(List("your and will twice")).flatMap(x => x.split(" "))
ford.subtract(drop).collect

結果:
Array[String] = Array(it, warm, wood, own, chop, you)

distinct

RDDから重複を除外した結果を返します。

val dupe = sc.parallelize(Seq("one", 1, "two", 2, "three", "one", "two", 1, 2))
dupe.distinct.collect()

結果:
Array[Any] = Array(1, 2, two, one, three)

&nbps;

Action系

RDDから結果を抽出するAction系の主なメソッドは以下のものがあります。

  • collect
  • count
  • first
  • take
  • reduce
  • takeOrdered
  • top

collect

例文で既に何度も出てきたメソッドです。
collectはRDDの各分散されてるパーティションのデータを「集める」ためのメソッドです。
Transformationが終わったRDDに対して使います。
ただし、集めたデータがSparkクラスタのメモリサイズ以上あるとout-of-memory errorが起きます。
あくまでもRDDをTransformして絞り込んだ結果に対して使いましょう。

val nums = sc.parallelize(Seq(1,2,3,4,5,6,7,8,9,10))
nums.collect()

結果:
Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

count

RDDの行数をカウントします。

nums.count()

結果:
Long = 10

first

RDDの一番最初の行を返します。

numRdd.first()

結果:
Int = 1

take

引数で指定した行数だけ返します。

nums.take(5)

結果:
Array[Int] = Array(1, 2, 3, 4, 5)

reduce

引数で渡された関数を実行した結果を集約して一つにして返してくれます。

nums.reduce((x, y) => x + y)

結果:
Int = 55

引数で渡せる関数は、「同じ型の2つの変数を引数として受け取って、同じ型で返す」というルールがあります。

で、こういう書き方も可能です。
結果は同じく55になります。

def add(x:Int, y:Int): Int = {
  return x + y
}
nums.reduce(add)

こんなに冗長的に書く必要がないのがScalaのメリットです。

takeOrdered

takeに似てますが、ソートされた結果を返します。

nums.takeOrdered(4)

結果:
Array[Int] = Array(1, 2, 3, 4)

top

これもtakeに似てますが、降順ソート(上位トップx)の結果を返します。

nums.top(3)

結果:
Array[Int] = Array(10, 9, 8)

Key/ValueペアRDDのTransformation系

ここまで主にIntかStringの値のコレクションだけを扱いましたが、Spark/Scalaをいじってるとキーバリュー型RDDを扱うケースも出てきます。

KeyValueペアRDDでよく使う主なTransformation系メソッドは以下のものがあります。

  • groupByKey
  • reduceByKey
  • sortByKey
  • join

groupByKey

同じキーを持つ値をグループ化します。

val rdd = sc.parallelize(Seq("My", "future", "starts", "when", "I", "wake", "up", "every", "morning")) 
val kv = rdd.map(x => (x.length, x))
val len = kv.groupByKey()
len.collect.foreach(println)

結果:
(1,CompactBuffer(I))
(2,CompactBuffer(My, up))
(4,CompactBuffer(when, wake))
(5,CompactBuffer(every))
(6,CompactBuffer(future, starts))
(7,CompactBuffer(morning))

reduceByKey

同じキー同士でグループ化したのちに、グループ毎にreduceします。

val item = sc.parallelize(Seq(("item1", 540), 
                              ("item2", 432),
                              ("item2", 648),
                              ("item3", 324)
                             ))
val sum = item.reduceByKey((total, price) => total + price)
sum.collect()

結果:
Array[(String, Int)] = Array((item3,324), (item1,540), (item2,1080))

sortByKey

読んで字のごとく、キーでソートします。

val flip = sum.map(x => (x._2, x._1)).sortByKey()
flip.collect()

結果:
Array[(Int, String)] = Array((324,item3), (540,item1), (1080,item2))

降順でソートしたい場合は、引数にfalseを渡します。

val flipDesc = sum.map(x => (x._2, x._1)).sortByKey(false)
flipDesc.collect()

結果:
Array((1080,item2), (540,item1), (324,item3))

join

同じキーを持つRDD同士を結合します。
innner joinなので、両方のRDDにキーが存在するものだけ結合されます。

val memRank = sc.parallelize(Seq((100, "A"),
                                   (101, "B"),
                                   (102, "S"),
                                   (103, "B"),
                                   (104, "A"),
                                   (105, "C")
                                  )
                              )
val memName = sc.parallelize(Seq((100, "John"),
                                    (101, "Mary"),
                                    (102, "Amber"),
                                    (103, "Mike"),
                                    (104, "Bob")
                                   )
                               )
val memInfo = memRank.join(memName)
memInfo.collect.foreach(println)

結果:
(104,(A,Bob))
(100,(A,John))
(101,(B,Mary))
(102,(S,Amber))
(103,(B,Mike))

Key/ValueペアRDDのAction系

KeyValueペアRDDにおけるAction系メソッドは以下のものがあります。

  • countByKey
  • collectAsMap
  • lookup

countByKey

キーごとに数をカウントします。返ってくるのはMap型コレクションです。

val item = sc.parallelize(Seq(("item1", 540), 
                              ("item2", 432),
                              ("item2", 648),
                              ("item3", 324)
                             ))
item.countByKey()

結果:
scala.collection.Map[String,Long] = Map(item3 -> 1, item1 -> 1, item2 -> 2)

collectAsMap

重複してるキーがある場合は、ひとつしか返って来ないので注意

item.collectAsMap()

結果:
scala.collection.Map[String,Int] = Map(item1 -> 540, item3 -> 324, item2 -> 648)

↑item2がひとつだけしかない。先にreduceByKeyしてから使うべき。

lookup

該当するキーに一致する値を取り出します。
存在しないキーを指定すると空のコレクションが返ってきます。
指定したキーがRDD内に存在しているか確認したい時に使えます。

item.lookup("item1")
item.lookup("item2")
item.lookup("item5")

結果:
Seq[Int] = WrappedArray(540)
Seq[Int] = WrappedArray(432, 648)
Seq[Int] = WrappedArray()

まとめ

今回はRDDでよく使う主なTransformation系/Action系メソッドについて紹介しました。
RDDはDatabricks Spark/Scalaの基本的な機能です。
基本を抑えて楽しい分析ライフを!

弊社はデジタルマーケティングからビッグデータ分析まで幅広くこなすデータ分析のプロ集団です。
Databricksのコンサルティング/導入支援についてのお問合わせはこちらからどうぞ。

ブログへの記事リクエストはこちらまでどうぞ。

関連記事

  1. Adobe Analytics

    Adobe AnaltyicsとGoogle Analytics の「生Webビーコン」をBigQu…

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。今回は…

  2. Google BigQuery

    Big QueryでWindow関数を用いて、累積和を計算する

    こんにちは。エクスチュアでインターンをさせて頂いている中野です。…

  3. Google BigQuery

    Google Apps Scriptを使ってスプレッドシートからBigQueryのテーブルを更新する…

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。今回はAp…

  4. Adobe Analytics

    Azure DatabricksでAdobe AnalyticsのDatafeedを分析する

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。今回はAz…

  5. Google Cloud Platform

    Node.js+GAE: 日本語自然文を形態素解析してネガポジ判定をする

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。今回は…

  6. Adobe Analytics

    AA + GA : SafariのITP2.1に備えてphpでクッキーを永続化する

    こんにちは、エクスチュアの権泳東(コン・ヨンドン)です。Saf…

最近の記事

  1. Linux技術者資格のLPIC-3試験に合格しました
  2. Adobe Summit 2020レポート: Custome…
  3. KARTE:指定期間の来訪が再訪問の場合にスコアリングする
  4. Adobe Mobile SDK 4.xからAEP SDKに…
  5. Adobe Summit 2020レポート: Data &#…
  1. Amazon Web Services

    Databricks Community Editionを使ってApache S…
  2. Cloud Dataflow

    Firebase AnalyticsのデータをフラットなCSVに変換する …
  3. Tableau

    【TC19ブログ】エクスチュアの海外カンファレンス参加支援制度
  4. Databricks

    Databricks: Spark DataFramesをJDBCから作成する
  5. Adobe Analytics

    Adobe AnalyticsのDatafeedをBigQueryのColumn…
PAGE TOP