こんにちは、エクスチュアの權泳東(権泳東/コン・ヨンドン)です。
今回はDatabricksのSpark RDDでよく使う主なメソッドを紹介します。
前回のブログでは便利なDataFrameを取り上げましたが、基本のRDDもしっかりおさえておく必要があります。
Transformation系
RDDを処理して別のRDDに変換するTransformation系の主なメソッドは以下のものがあります。
- map
- flatMap
- filter
- union
- intersection
- subtrct
- distinct
map
引数で渡した関数をRDDの各行に対して実行した結果を返してくれます。
val seq = Seq("Some people feel the rain", "Others just get wet") val rdd = sc.parallelize(seq) val caps = rdd.map(x => x.toUpperCase) caps.collect.foreach(println)
結果:
SOME PEOPLE FEEL THE RAIN
OTHERS JUST GET WET
まずSpark/Scalaを始めた時に引数の「x => x.toUppserCase」って何??
と戸惑いましたが、こう書くのと一緒です。
def upper(x:String): String = { x.toUpperCase } rdd.map(x => upper(x)).collect.foreach(println)
わざわざ関数を先に宣言しなくてもそのまま引数で渡せるのがScalaの良いところです。
flatMap
mapに似てますが、mapした結果をひとつのコレクションにして返してくれます。
rdd.flatMap(x => x.split(" ")).collect
結果:
Array[String] = Array(Some, people, feel, the, rain, Others, just, get, wet)
flatMapとmapって何が違うのか?という場合は結果を見ると早いです。
mapでは入れ子になったコレクションが返ります。
rdd.map(x => x.split(" ")).collect
結果:
Array[Array[String]] = Array(Array(Some, people, feel, the, rain), Array(Others, just, get, wet))
filter
引数で渡した関数の結果がtrueになるものだけ返します。
rdd.filter(x => x.contains("rain")).collect
結果:
Array[String] = Array(Some people feel the rain)
union
2つのRDDをひとつのRDDにして返します。
val rdd1 = sc.parallelize(Seq(1,2,3,4,5)) val rdd2 = sc.parallelize(Seq(1,6,7,8)) rdd1.union(rdd2).collect()
結果:
Array[Int] = Array(1, 2, 3, 4, 5, 1, 6, 7, 8)
intersection
両方のRDDに完全一致で存在するものだけを返します。
val rdda = sc.parallelize(Seq("alpha", "bravo", "charlie")) val rddb = sc.parallelize(Seq("bravo", "Alpha", "charles", "alpha")) rdda.intersection(rddb).collect()
結果:
Array[String] = Array(bravo, alpha)
subtract
RDDから完全一致するものを「引いた」結果を返します。
val ford = sc.parallelize(List("Chop your own wood and it will warm you twice")).flatMap(x => x.split(" ")).map(x => x.toLowerCase) val drop = sc.parallelize(List("your and will twice")).flatMap(x => x.split(" ")) ford.subtract(drop).collect
結果:
Array[String] = Array(it, warm, wood, own, chop, you)
distinct
RDDから重複を除外した結果を返します。
val dupe = sc.parallelize(Seq("one", 1, "two", 2, "three", "one", "two", 1, 2)) dupe.distinct.collect()
結果:
Array[Any] = Array(1, 2, two, one, three)
&nbps;
Action系
RDDから結果を抽出するAction系の主なメソッドは以下のものがあります。
- collect
- count
- first
- take
- reduce
- takeOrdered
- top
collect
例文で既に何度も出てきたメソッドです。
collectはRDDの各分散されてるパーティションのデータを「集める」ためのメソッドです。
Transformationが終わったRDDに対して使います。
ただし、集めたデータがSparkクラスタのメモリサイズ以上あるとout-of-memory errorが起きます。
あくまでもRDDをTransformして絞り込んだ結果に対して使いましょう。
val nums = sc.parallelize(Seq(1,2,3,4,5,6,7,8,9,10)) nums.collect()
結果:
Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
count
RDDの行数をカウントします。
nums.count()
結果:
Long = 10
first
RDDの一番最初の行を返します。
numRdd.first()
結果:
Int = 1
take
引数で指定した行数だけ返します。
nums.take(5)
結果:
Array[Int] = Array(1, 2, 3, 4, 5)
reduce
引数で渡された関数を実行した結果を集約して一つにして返してくれます。
nums.reduce((x, y) => x + y)
結果:
Int = 55
引数で渡せる関数は、「同じ型の2つの変数を引数として受け取って、同じ型で返す」というルールがあります。
で、こういう書き方も可能です。
結果は同じく55になります。
def add(x:Int, y:Int): Int = { return x + y } nums.reduce(add)
こんなに冗長的に書く必要がないのがScalaのメリットです。
takeOrdered
takeに似てますが、ソートされた結果を返します。
nums.takeOrdered(4)
結果:
Array[Int] = Array(1, 2, 3, 4)
top
これもtakeに似てますが、降順ソート(上位トップx)の結果を返します。
nums.top(3)
結果:
Array[Int] = Array(10, 9, 8)
Key/ValueペアRDDのTransformation系
ここまで主にIntかStringの値のコレクションだけを扱いましたが、Spark/Scalaをいじってるとキーバリュー型RDDを扱うケースも出てきます。
KeyValueペアRDDでよく使う主なTransformation系メソッドは以下のものがあります。
- groupByKey
- reduceByKey
- sortByKey
- join
groupByKey
同じキーを持つ値をグループ化します。
val rdd = sc.parallelize(Seq("My", "future", "starts", "when", "I", "wake", "up", "every", "morning")) val kv = rdd.map(x => (x.length, x)) val len = kv.groupByKey() len.collect.foreach(println)
結果:
(1,CompactBuffer(I))
(2,CompactBuffer(My, up))
(4,CompactBuffer(when, wake))
(5,CompactBuffer(every))
(6,CompactBuffer(future, starts))
(7,CompactBuffer(morning))
reduceByKey
同じキー同士でグループ化したのちに、グループ毎にreduceします。
val item = sc.parallelize(Seq(("item1", 540), ("item2", 432), ("item2", 648), ("item3", 324) )) val sum = item.reduceByKey((total, price) => total + price) sum.collect()
結果:
Array[(String, Int)] = Array((item3,324), (item1,540), (item2,1080))
sortByKey
読んで字のごとく、キーでソートします。
val flip = sum.map(x => (x._2, x._1)).sortByKey() flip.collect()
結果:
Array[(Int, String)] = Array((324,item3), (540,item1), (1080,item2))
降順でソートしたい場合は、引数にfalseを渡します。
val flipDesc = sum.map(x => (x._2, x._1)).sortByKey(false) flipDesc.collect()
結果:
Array((1080,item2), (540,item1), (324,item3))
join
同じキーを持つRDD同士を結合します。
innner joinなので、両方のRDDにキーが存在するものだけ結合されます。
val memRank = sc.parallelize(Seq((100, "A"), (101, "B"), (102, "S"), (103, "B"), (104, "A"), (105, "C") ) ) val memName = sc.parallelize(Seq((100, "John"), (101, "Mary"), (102, "Amber"), (103, "Mike"), (104, "Bob") ) ) val memInfo = memRank.join(memName) memInfo.collect.foreach(println)
結果:
(104,(A,Bob))
(100,(A,John))
(101,(B,Mary))
(102,(S,Amber))
(103,(B,Mike))
Key/ValueペアRDDのAction系
KeyValueペアRDDにおけるAction系メソッドは以下のものがあります。
- countByKey
- collectAsMap
- lookup
countByKey
キーごとに数をカウントします。返ってくるのはMap型コレクションです。
val item = sc.parallelize(Seq(("item1", 540), ("item2", 432), ("item2", 648), ("item3", 324) )) item.countByKey()
結果:
scala.collection.Map[String,Long] = Map(item3 -> 1, item1 -> 1, item2 -> 2)
collectAsMap
重複してるキーがある場合は、ひとつしか返って来ないので注意
item.collectAsMap()
結果:
scala.collection.Map[String,Int] = Map(item1 -> 540, item3 -> 324, item2 -> 648)
↑item2がひとつだけしかない。先にreduceByKeyしてから使うべき。
lookup
該当するキーに一致する値を取り出します。
存在しないキーを指定すると空のコレクションが返ってきます。
指定したキーがRDD内に存在しているか確認したい時に使えます。
item.lookup("item1") item.lookup("item2") item.lookup("item5")
結果:
Seq[Int] = WrappedArray(540)
Seq[Int] = WrappedArray(432, 648)
Seq[Int] = WrappedArray()
まとめ
今回はRDDでよく使う主なTransformation系/Action系メソッドについて紹介しました。
RDDはDatabricks Spark/Scalaの基本的な機能です。
基本を抑えて楽しい分析ライフを!
弊社はデジタルマーケティングからビッグデータ分析まで幅広くこなすデータ分析のプロ集団です。
Databricksのコンサルティング/導入支援についてのお問合わせはこちらからどうぞ。
ブログへの記事リクエストはこちらまでどうぞ。