이번 포스팅은 지난 포스팅 <RDD, Resilient Distributed DataSet에 대하여[1]> 에 이어서 진행하도록 하겠습니다. 교재는 빅데이터 분석을 위한 스파크2 프로그래밍을 참고하였습니다.
2. RDD
2.1.1 들어가기에 앞서
지난 포스팅 2-1. RDD Resilient Distributed Dataset에 대하여 에서 다뤘습니다.
2.1.2. 스파크컨텍스트 생성
- 스파크 컨텍스트는 스파크 애플리케이션과 클러스터의 연결을 관리하는 객체임
- 따라서, 스파크 애플리케이션을 사용하려면 무조건 스파크 컨텍스트를 생성하고 이용해야 함
- RDD 생성도 스파크컨텍스트를 이용해 생성 가능함
val conf = new SparkConf().setMaster(“local[*]”).setAppName(“RDDCreateSample”)
val sc = new SparkContext(conf)
[예] scala - sparkcontext 생성
sc = SparkContext(master=“local[*], appName=“RDDCreateTest”, conf=conf)
[예] python - spark context 생성
- 스파크 동작에 필요한 여러 설정 정보 지정 가능함
- SparkConf(), conf=conf 부분에서 config을 통과시켜서 지정 가능함
- 지정해야 하는 정보 중에, master 정보와 appName 정보는 필수 지정 정보임
- master 정보란 ? 스파크가 동작할 클러스터의 마스터 서버를 의미하는 것. 로컬모드에서 local, local[3], local[*]와 같이 사용. [*]는 쓰레드 개수를 의미하며, *는 사용 가능한 모든 쓰레드를 이용하겠다는 이야기임
- appName은 애플리케이션 이름으로, 구분하기 위한 목적임. 스파크 UI화면에 사용됨
2.1.3. RDD생성
- 드라이버 프로그램의 컬렉션 객체 이용
-
자바 or 파이썬 ? 리스트 이용, 스칼라 ? 시퀀스타입 이용
- 드라이버 프로그램?
- 최초로 메인 함수를 실행해 RDD등을 생성하고 각종 연산을 호출하는 프로그램
- 드라이버 내의 메인 함수는 스파크 애플리케이션과 스파크 컨텍스트 객체를 생성함
- 스파크 컨텍스트를 통해 RDD의 연산 정보를 DAG스케쥴러에 전달하면 스케쥴러는 이 정보를 가지고 실행 계획을 수립한 후 이를 클러스터 매니저에게 전달함
val rdd1 = sc.parallelize(List("a","b","c","d","e"))
[예] scala - rdd 생성
[예] python - 드라이버의 컬렉션 객체를 이용한 RDD 생성
-
문자열을 포함한 컬렉션 객체 생성 example) python : ['a','b','c','d']
- parallelize() 메서드를 이용해 RDD 생성
- RDD의 파티션 수를 지정하고 싶을 때, parallelize() 메서드의 두 번째 매개변수로 파티션 개수 지정 가능
val rdd1 = sc.parallelize(1 to 1000, 10)
- 외부 데이터를 읽어서 새로운 RDD를 생성
- 기본적으로 하둡의 다루는 모든 입출력 유형 가능
- 내부적으로 하둡의 입출력을 사용하기 때문임
[예] python - 외부데이터를 이용한 RDD 생성
2.1.4 RDD 기본 액션
기본 액션 연산의 종류에 대해 알아보도록 하겠습니다.
1. collect
- collect은 RDD의 모든 원소를 모아서 배열로 리턴
- 반환 타입이 RDD가 아닌 배열이므로 액션 연산
- RDD에 있는 모든 요소들이 서버의 메모리에 수집됨. 즉, 대용량 데이터를 다룰 땐 조심하고, 주로 작은 용량의 데이터 디버깅용으로 사용함
[예] python - collect 연산
2. count
2.1.5 RDD 트랜스포메이션
기존 RDD를 이용해 새로운 RDD를 생성하는 연산입니다.
1. Map 연산
- RDD에 속하는 모든 요소에 적용하여 새로운 RDD 생성하는 연산
- RDD의 몇몇 연산은 특정 데이터 타입에만 적용 가능함
1.1. map
- 하나의 인자를 받는 함수 자체가 map의 인자로 들어감
- 이 함수를 이용해 rdd의 모든 요소에 적용한 뒤 새로운 RDD 리턴
[예] python - map 연산
- map()에 전달되는 함수의 입력 데이터 타입과 출력 데이터 타입이 일치할 필요 없음. 문자열을 입력받아 정수로 반환하는 함수 사용 가능
[예] python - map 연산, 입력/출력 일치하지 않는 경우
1.2. flatMap
- map()과 마찬가지로, 하나의 인자를 받는 함수가 flatMap의 인자로 들어감
- map()과 차이점은 각 함수의 인자가 반환하는 값의 타입이 다름
- flatMap()에 사용하는 함수 f는 반환값으로 리스트나 시퀀스 같은 여러 개의 값을 담은 (이터레이션이 가능한) 일종의 컬렉션과 유사한 타입의 값을 반환해야 함
- map[U](f:(T) -> U):RDD[U]
- flatMapU:RDD[U])
- TraversableOnce는 이터레이터 타입을 의미
map()과 flatMap() 차이점 예시
[예] python - map 연산 vs. flatMap 연산
- map연산은 문자열의 배열로 구성된 RDD를 생성함
- 각 요소의 문자열(T)이 단어가 포함된 배열(U)이기 때문임
- 반면, flatMap 연산은 문자열로 구성된 RDD를 생성함
- TraversableOnce(U)이기 때문에 문자열의 배열 내의 요소가 모두 끄집어져 나오는 작업을 하게 됨
- flatMap()은 하나의 입력값(“apple, orange”)에 대해 출력 값이 여러개인 경우([“apple”, “orange”]) 유용하게 사용할 수 있음
1.3. mapPartitions
- map과 flatMap은 하나의 인자만을 받는 함수가 인자로 들어가지만, mapPartitions은 여러 인자를 받는 함수가 인자로 들어갈 수 있음 ex) 이터레이터를 인자로 받는 함수
- mapartitions은 인자로 받은 함수가 파티션 단위로 적용하여 새로운 RDD를 생성함. 반면에, map과 flatMap은 인자로 받은 함수가 요소 한개 단위로 적용됨
[예] python - mapPartitions
- sc.parallelize(range(1,11),3)으로 파티션 3개로 나뉨
- DB 연결!!! 가 세번 출력된 걸 보니 파티션 단위로 처리한 것을 확인할 수 있음
- increase함수는 각 파티션 내의 요소에 대한 이터레이터를 전달받아 함수 내부에서 파티션의 개별 요소에 대한 작업을 처리하고 그 결과를 다시 이터레이터 타입으로 반환
1.4. mapPartitionsWithIndex
- mapPartions와 동일하고 다른 점은 인자로 전달되는 함수를 호출할 때 파티션에 속한 요소의 정보뿐만 아니라 해당 파티션의 인덱스 정보도 함께 전달해 준다는 것임
def IncreaseWithIndex(idx, numbers):
for i in numbers:
if(idx == 1):
yield i+1
- mapPartitionswithIndex에 인자로 들어갈 함수는 위와 같이 인덱스 정보도 같이 들어감
[예] python - mapPartitionsWithIndex
1.5. mapValues
- RDD의 모든 요소들이 키와 값의 쌍을 이루고 있는 경우에만 사용 가능한 메서드이며, 인자로 전달받은 함수를 “값”에 해당하는 요소에만 적용하고 그 결과로 구성된 새로운 RDD를 생성
rdd1 = sc.parallelize(["a","b","c"]).map(lambda v:(v,1)) //(키,값)으로 구성된 rdd생성
rdd2 = rdd1.mapValues(lambda i:i+1)
print(rdd2.collect())
[예] python - mapValues
1.6. flatMapValues
- MapValues 처럼 키에 해당되는 값에 함수를 적용하나 flatMap() 연산을 적용할 수 있음
rdd1 = sc.parallelize([(1, "a,b"),(2, "a,c"),(3, "d,e")])
rdd2 = rdd1.flatMapValues(lambda v:v.split(','))
rdd2.collect()
[예] python - flatMapValues
2. 그룹화 연산
- 특정 조건에 따라 요소를 그룹화하거나 특정 함수 적용
2.1. zip
- 두 개의 서로 다른 RDD를 각 요소의 인덱스에 따라 첫번째 RDD의 ‘인덱스’번째를 키로, 두번째 RDD의 ‘인덱스’번째를 값으로 하는 순서쌍을 생성
- 두 개 RDD는 같은 개수의 파티션과 각 파티션 당 요소개수가 동일해야 함
rdd1 = sc.parallelize(["a","b","c"])
rdd2 = sc.parallelize([1,2,3])
rdd3 = rdd1.zip(rdd2)
rdd3.collect()
>>> [('a', 1), ('b', 2), ('c', 3)]
rdd1 = sc.parallelize(range(1,10),3)
rdd2 = sc.parallelize(range(11,20),3)
rdd3 = rdd1.zip(rdd2)
rdd3.collect()
>>> [(1, 11), (2, 12), (3, 13), (4, 14), (5, 15), (6, 16), (7, 17), (8, 18), (9, 19)]
2.2. zipPartitions
- zip()과 다르게 파티션의 개수만 동일하면 됨
- zipPartitions()은 최대 4개 RDD까지 인자로 넣을 수 있음
- 파이썬 사용불가!!
2.3. groupBy
- RDD의 요소를 일정한 기준에 따라 그룹을 나누고, 각 그룹으로 구성된 새로운 RDD를 생성함
- 각 그룹은 키와 각 키에 속한 요소의 시퀀스(iterator)로 구성됨
- 인자로 전달하는 함수가 각 그룹의 키를 결정하는 역할을 담당함
>>> rdd1 = sc.parallelize(range(1,11))
>>> rdd2 = rdd1.groupBy(lambda v: "even" if v%2==0 else "odd")
/// groupBy에 인자로 전달된 함수에 의해 키(even/odd) 결정
print(rdd2.collect())
>>> [('even', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f090e80>), ('odd', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f090470>)]
/// 각 키에 해당하는 값은 iterator임을 확인할 수 있음
>>> for x in rdd2.collect():
... print(x[0], list(x[1]))
...
even [2, 4, 6, 8, 10]
odd [1, 3, 5, 7, 9]
2.3. groupByKey
- 이미 키와 값의 쌍으로 구성된 RDD에만 적용 가능함
>>> rdd1 = sc.parallelize(["a","b","c","b","c"]).map(lambda v:(v,1))
>>> rdd1.collect()
[('a', 1), ('b', 1), ('c', 1), ('b', 1), ('c', 1)]
/// (키,값) 쌍으로 구성된 RDD 생성
>>> rdd2 = rdd1.groupByKey()
>>> rdd2.collect()
[('b', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f0ab6a0>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f0ab6d8>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f0ab0b8>)]
/// 키에 따라 그룹화함. 그 결과 키에 해당하는 시퀀스 생성
>>> for x in rdd2.collect():
... print(x[0], list(x[1]))
...
b [1, 1]
c [1, 1]
a [1]
2.4. cogroup
- 이미 키와 값의 쌍으로 구성된 RDD에만 적용 가능함
- 여러 개의 RDD를 인자로 받음(최대 3개)
- 여러 RDD에서 동일한 키에 해당하는 요소들로 구성된 시퀀스를 만든 후, (키, 시퀀스)의 튜플을 구성. 그 튜플들로 구성된 새로운 RDD를 생성함
- Tuple(키, Tuple(rdd1요소들의 집합, rdd2요소들의 집합, …))
>>> rdd1 = sc.parallelize([("k1","v1"),("k2","v2"),("k1","v3")])
>>> rdd2 = sc.parallelize([("k1","v4")])
>>> rdd1.collect()
[('k1', 'v1'), ('k2', 'v2'), ('k1', 'v3')]
>>> rdd2.collect()
[('k1', 'v4')]
/// (키, 값)쌍으로 구성된 RDD 2개 생성
>>> rdd3 = rdd1.cogroup(rdd2)
>>> rdd3.collect()
[('k1', (<pyspark.resultiterable.ResultIterable object at 0x7fbc3f0b2fd0>, <pyspark.resultiterable.ResultIterable object at 0x7fbc3f00a828>)), ('k2', (<pyspark.resultiterable.ResultIterable object at 0x7fbc3f00ac18>, <pyspark.resultiterable.ResultIterable object at 0x7fbc3f00a5f8>))]
///k1 키에 대해 rdd1의 v1과 v3요소를 묶고, rdd2의 v4요소를 묶어서 튜플로 구성
>>> for x in rdd3.collect():
... print(x[0], list(x[1][0]), list(x[1][1]))
...
k1 ['v1', 'v3'] ['v4']
k2 ['v2'] []
3. 집합 연산
- RDD에 포함된 요소를 하나의 집합으로 간주하여 집합 연산을 수행(합/교집합)
3.1. distinct
- RDD의 원소에서 중복을 제외한 요소로만 새로운 RDD 구성
>>> rdd = sc.parallelize([1,2,3,1,2,3,1,2,3])
>>> rdd2 = rdd.distinct()
>>> rdd2.collect()
[1, 2, 3]
3.1. cartesian
- 두 RDD요소의 카테시안곱을 구하고 그 결과를 요소로 하는 새로운 RDD구성
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize(['a','b','c'])
>>> rdd3 = rdd1.cartesian(rdd2)
>>> rdd3.collect()
[(1, 'a'), (1, 'b'), (1, 'c'), (2, 'a'), (2, 'b'), (2, 'c'), (3, 'a'), (3, 'b'), (3, 'c')]
3.2. subtract
- rdd1.subtract(rdd2) : (rdd1의 요소집합 - rdd2의 요소집합)의 차집합
- rdd2.subtract(rdd1) : (rdd2의 요소집합 - rdd1의 요소집합)의 차집합
>>> rdd1 = sc.parallelize(["a", "b","c","d","e"])
>>> rdd2 = sc.parallelize(["d","e"])
>>> rdd3 = rdd1.subtract(rdd2)
>>> rdd3.collect()
['a', 'b', 'c']
3.2. union
>>> rdd1 = sc.parallelize(["a", "b","c","d","e"])
>>> rdd2 = sc.parallelize(["d","e"])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd3.collect()
['a', 'b', 'c', 'd', 'e', 'd', 'e']
3.3. intersection
- 두 RDD요소의 교집합으로 중복되지 않은 요소로 구성
>>> rdd1 = sc.parallelize(["a","a","b","c"])
>>> rdd2 = sc.parallelize(["a","a","c","c"])
>>> rdd3 = rdd1.intersection(rdd2)
>>> rdd3.collect()
['a', 'c']
3.4. join
- RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메서드
- 공통된 키에 대해서만 join수행
- join 수행 결과 Tuple(키, Tuple(첫번째 RDD요소, 두번쨰 RDD요소))
>>> rdd1 = sc.parallelize(["a", "b","c","d","e"]).map(lambda v : (v,1))
>>> rdd1.collect()
[('a', 1), ('b', 1), ('c', 1), ('d', 1), ('e', 1)]
>>> rdd2 = sc.parallelize(["b","c"]).map(lambda v:(v,2))
>>> rdd2.collect()
[('b', 2), ('c', 2)]
>>> rdd3 = rdd1.join(rdd2)
>>> rdd3.collect()
[('b', (1, 2)), ('c', (1, 2))]
3.5. leftOuterJoin, rightOuterJoin
- 키와 값의 쌍으로 구성된 RDD에 사용가능
- leftjoin, rightjoin을 수행
>>> rdd1 = sc.parallelize(["a", "b","c","d","e"]).map(lambda v : (v,1))
>>> rdd2 = sc.parallelize(["b","c"]).map(lambda v:(v,2))
>>> rdd3 = rdd1.leftOuterJoin(rdd2)
>>> rdd3.collect()
[('a', (1, None)), ('e', (1, None)), ('b', (1, 2)), ('c', (1, 2)), ('d', (1, None))]
///rdd2에는 a,d,e 키가 없기 때문에 해당 키에 대한 튜플 요소는 (rdd1의 요소, None)으로 구성됨
>>> rdd4 = rdd1.rightOuterJoin(rdd2)
>>> rdd4.collect()
[('b', (1, 2)), ('c', (1, 2))]
3.6. subtractByKey
- 키와 값의 쌍으로 구성된 RDD에 사용가능
- rdd1의 요소 중에서 rdd2와 겹치지 않는 키로 구성된 새로운 RDD 생성
>>> rdd1 = sc.parallelize(["a","b"]).map(lambda v:(v,1))
>>> rdd2 = sc.parallelize(["b"]).map(lambda v:(v,1))
>>> rdd3 = rdd1.subtractByKey(rdd2)
>>> rdd3.collect()
[('a', 1)]
4. 집계와 관련된 연산들
4.1 reduceByKey
- 키와 값의 쌍으로 구성된 RDD에서 사용 가능
- RDD 내의 동일한 키를 하나로 병합해 (키,값) 쌍으로 구성된 새로운 RDD 생성
- 함수를 인자로 받음.
- 왜냐하면, 파티션 별로 연산을 수행했을 때, 항상 같은 순서로 연산이 수행되는 것을 보장 못하므로, 함수가 수행하는 연산은 교환법칙과 결합법칙이 성립해야 함
>>> rdd = sc.parallelize(['a','b','b']).map(lambda v:(v,1))
>>> rdd.collect()
[('a', 1), ('b', 1), ('b', 1)]
>>> rdd2 = rdd.reduceByKey(lambda v1, v2:(v1+v2))
>>> rdd2.collect()
[('b', 2), ('a', 1)]
(키,값)쌍으로 하는 RDD를 인자로 받는 트랜스포메이션 메서드
- 데이터 처리 과정에서 사용할 파티셔너와 파티션 개수를 지정할 수 있는 옵션이 있음
- 자체적으로 작성한 파티셔너나 파티션 개수를 통해 병렬 처리 수준 변경 가능
4.2 foldByKey
- 키와 값으로 구성된 RDD에 사용 가능
- reduceByKey()와 유사하지만, 병합 연산의 초기값을 인자로 전달할 수 있음
rdd = sc.parallelize(["a","b","b"]).map(lambda v:(v,1))
>>> rdd2 = rdd.foldByKey(0, lambda v1,v2:v1+v2)
>>> rdd2.collect()
[('b', 2), ('a', 1)]
My view) 개인적으로, foldByKey와 reduceByKey의 차이가 잘 이해가 되지 않아, 초기값과 문자열 병합으로 pyspark를 실행해 보았습니다.
>>> rdd = sc.parallelize(["a","b","b"]).map(lambda v:(v,1))
///초기값을 1로 준 경우
>>> rdd2 = rdd.foldByKey(1, lambda v1,v2:v1+v2)
///초기값을 0으로 준 경우와 다른 결과임
>>> rdd2.collect()
[('b', 4), ('a', 2)]
>>> rdd = sc.parallelize(["a","b","b"]).map(lambda v:(v,'c'))
///초기값을 t로 준 경우
>>> rdd2 = rdd.foldByKey('t', lambda v1,v2:v1+v2)
>>> rdd2.collect()
[('b', 'tctc'), ('a', 'tc')]
위의 두 연산를 보니, foldByKey는 초기값 처리를 아래와 같이 진행하는 것 같습니다. 예를 들어, 초기 rdd가 [(‘a’, 1), (‘b’, 1), (‘b’, 1)] 라 한다면, foldByKey는 키 ‘a’와 ‘b’에 대해 각각 초기값을 가지고 병합연산을 수행합니다. 이 때 먼저, 초기값을 가지고 병합 연산을 수행합니다. 키 ‘a’인 경우, 초기값 1라면, v1=1, v2=1(키 ‘a’에 대응되는 값)이 병합연산을 수행해 (‘a’,2)가 됩니다. 그 다음 reducebykey와 같은 연산을 수행하나, 키 ‘a’는 초기 rdd에 하나밖에 없기 때문에, v1=2, v2=None이 되어 최종 foldByKey연산 결과는 키 ‘a’에 대해서 2값을 가지게 됩니다.
‘b’키 같은 경우도 마찬가지입니다. 먼저 초기값을 가지고 연산을 수행합니다. v1=1(초기값), v2=1(b에 대응되는 값)가 병합연산을 수행해 v1=2가 됩니다. ‘b’키는 두개가 있으므로, 나머지 ‘b’키에 대해 v1=1, v2=1가 병합연산을 거쳐 v2=2가 됩니다. 그다음 두개의 ‘b’키에 대해 다시 병합연산이 수행되어 v1=2, v2=2가 되어 최종적으로 ‘b’키에 대해 4의 값이 생성됩니다.
4.3 combineByKey
- 키와 값인 RDD에 사용 가능
- foldByKey와 reduceByKey와 유사함. 차이점은 병합연산 수행 결과 값의 타입이 바뀔 수 있음
def reduceByKey(func:(V,V)=>V):RDD[K,V]
def foldByKey(zeroValue: V)(func:(V,V)=>V):RDD[K,V]
def combineByKey[C](createCombiner:(V)=>C, mergeValue:(C,V)=>V, mergeCombiners:(C,C)=>C):RDD[K,C]
- combineByKey는 reduceByKey와 foldByKey와 다르게 타입이 C로 바뀌어 있음
- 위의 combineByKey를 보면 메서드가 총 createCombiner, mergeValue, mergeCombiners 세 개임을 알 수 있음
- 아래는 combineByKey에서 자주 등장하는 평균구하기 예시임
#record.py 에 따로 저장해야 함.
#아니면, _pickle.PicklingError: Can't pickle <class '__main__.Record'>: attribute lookup Record on __main__ failed 에러 발생.
class Record:
def __init__(self, amount, number=1):
self.amount = amount
self.number = number
def addAmt(self, amount):
return Record(self.amount + amount, self.number + 1)
def __add__(self, other):
amount = self.amount + other.amount
number = self.number + other.number
return Record(amount, number)
def __str__(self):
return "avg:" + str(self.amount / self.number)
def __repr__(self):
return 'Record(%r, %r)' % (self.amount, self.number)
#createCombiner, mergeValue, mergeCombiners 정의
def createCombiner(v):
return Record(v)
def mergeValue(c, v):
return c.addAmt(v)
def mergeCombiners(c1, c2):
return c1 + c2
#combineByKey 실행
rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
rdd2 = rdd.combineByKey(lambda v:createCombiner(v), lambda c,v:mergeValue(c,v), lambda c1,c2:mergeCombiners(c1,c2))
print('Math', rdd2.collectAsMap()['Math'], 'Eng', rdd2.collectAsMap()['Eng'])
>>> Math avg:75.0 Eng avg:80.0
- createCombiner()
- 값을 병합하기 위한 컴바이너. 컴바이너는 병합을 수행하고 컴바이너 타입으로 내부에 저장함. 위의 예로는 createCombiner()는 Record클래스 타입으로 저장됨. 즉, rdd의 각 키에 해당되는 값이 record클래스 타입으로 변환되어 저장됨.
- mergeValue()
- 키에 대한 컴바이너가 존재한다면, 컴바이너를 이용해 값을 병합함.
My View) 위의 예 같은 경우, rdd요소 순서대로 combineByKey가 작동한다면 먼저 (“Math”, 100)에 대해 Combiner가 생성되어, math 키에 대한 Record(100)이 생성됩니다.
그 다음 (“Eng”, 70)에 대해 작동합니다. “Eng”키는 기존에 없던 키이기 때문에 새로 Combiner인 Record(70)이 생성됩니다.
그 다음 (“Math”, 50)인데, 기존 Math키에 대한 컴바이너가 존재하기 때문에, 기존 Math 키 컴바이너를 이용하여 병합합니다. 즉, Record(100).addAmt(50)이 발생합니다. 이렇게 Math키와 Eng키에 대해 모든 요소에 대해 병합을 실시하게 됩니다.
- mergeCombiners()
- createCombiner()와 mergeValue()는 파티션별로 수행됨. 그다음 모든 파티션에 생성된 combiner를 병합하는 과정을 mergeCombiners()를 통해 수행하는 것임. 이를 통해 최종결과가 발생함
4.4 aggregateByKey
- 키와 값의 RDD에서 사용 가능
- 초깃값을 설정할 수 있는 점을 제외하면 comebineByKey와 동일
def combineByKey[C](createCombiner:(V)=>C, mergeValue:(C,V)=>V, mergeCombiners:(C,C)=>C):RDD[(K,C)]
def aggregateByKey[U](zeroValue: U)(seqOp:(U,V)=>U, combOp:(U,U)=>U):RDD[(K,U)]
- aggregateByKey에서 seqOp는 mergeValue역할을, comOp는 mergeCombiner역할을 함.
- combineByKey에서 createCombiner로 병합을 위한 초깃값을 구하지만 aggregateByKey는 함수를 이용해 초깃값을 설정하는 대신 바로 ‘값’으로 초기값을 설정함
rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
rdd2 = rdd.aggregateByKey(Record(0,0), lambda c,v:mergeValue(c,v), lambda c1,c2:mergeCombiners(c1,c2))
print('Math :', rdd2.collectAsMap()['Math'], 'Eng :', rdd2.collectAsMap()['Eng'])
>>>Math : avg:75.0 Eng : avg:80.0
5. pipe 및 파티션과 관련된 연산
5.1 pipe
- pipe를 이용하면 데이터를 처리하는 과정에서 외부 프로세스를 활용할 수 있음
- 세 개의 숫자로 구성된 문자열을 리눅스의 cut 유틸리티를 이용해 분리한 뒤첫번째와 세번재 숫자를 뽑아내는 예제
>>> rdd = sc.parallelize(["1,2,3", "4,5,6", "7,8,9"])
>>> rdd2 = rdd.pipe("cut -f 1,3 -d ,")
>>> rdd2.collect()
['1,3', '4,6', '7,9']
5.2. coalesce와 repartition
- 현재 RDD에 사용된 파티션 개수를 조정함
- coalesce는 파티션 개수를 줄이기만 되고, repartition은 늘리는 것과 줄이는 것 둘 다 가능
- coalesce가 따로 있는 이유는 처리 방식에 따른 성능 차이 때문임
- repartition은 셔플을 기반으로 동작을 수행하는 데 반해, coalesce는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한 셔플을 사용하지 않음.
- 따라서 필터링 등으로 인해 데이터 개수가 줄어든 경우 coalesce을 사용하는 것이 좋음
>>> rdd = sc.parallelize(list(range(1,11)),10)
>>> rdd2 = rdd.coalesce(5)
>>> rdd3 = rdd2.repartition(10)
>>> print("partition size : %d"
>>> print("partition size : %d" %rdd2.getNumPartitions())
partition size : 5
>>> print("partition size : %d" %rdd3.getNumPartitions())
partition size : 10
5.3. repartitionAndSortWithinPartitions
- 키와 값으로 구성된 RDD에서 사용 가능
- RDD를 구성하는 모든 데이터를 특정 기준에 따라 여러 개의 파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 새로운 RDD를 생성해 주는 메서드임
- 각 데이터가 어떤 파티션에 속할지 결정하기 위한 파티셔너(org.apache.spark.Partitioner)설정
- 키 값을 이용하여 어떤 파티션에 속할지 결정할 뿐만 아니라 키 값을 이용한 정렬도 수행함
- 파티션 재할당을 위해 셔플을 수행하는 단계에서 정렬도 함께 다루게 되어 파티션과 정렬을 각각 따로하는 것에 비해 더 높은 성능을 발휘할 수 있음
- 10개의 무작위 숫자를 위 메서드를 이용해 3개의 파티션으로 분리해 보는 예제
>>> data = [random.randrange(1,100) for i in range(0,10)]
>>> rdd1 = sc.parallelize(data).map(lambda v:(v,"-"))
>>> rdd2 = rdd1.repartitionAndSortWithinPartitions(3, lambda x:x)
>>> rdd2.foreachPartition(lambda values:print(list(values)))
[(50, '-')]
[(16, '-'), (52, '-'), (61, '-'), (67, '-')]
[(6, '-'), (12, '-'), (48, '-'), (51, '-'), (87, '-')]
- pyspark에서 repartitionAndSortWithinPartitions에서 default 파티셔너는 hash 파티셔너로 되어있음
- foreachPartition은 partition단위로 특정함수를 실행해주는 메서드임. 위의 예제에서는 파티션단위로 파티션에 속해있는 값을 프린트해주는 함수를 실행했음
5.4. partitionBy
- 키와 값으로 구성된 RDD에서 사용가능
- 파티션을 변경하고 싶을 때 사용가능
- 기본적으로, hashpartitioner와 rangepartitioner가 있음
- org.apache.spark.partitioner 클래스를 상속해서 파티셔너를 커스터마이징도 가능
>>> rdd1 = sc.parallelize([("apple",1),("mouse",1),("monitor",1)],5)
>>> rdd1.collect()
[('apple', 1), ('mouse', 1), ('monitor', 1)]
>>> rdd2 = rdd1.partitionBy(3)
>>> print("rdd1: %d, rdd2: %d" %(rdd1.getNumPartitions(), rdd2.getNumPartitions()))
rdd1: 5, rdd2: 3
- 위의 예제에서 partitionby에 의해 파티션 갯수가 변경된 것을 확인할 수 있음
6. 필터와 정렬 연산
특정 조건을 만족하는 요소만 선택하거나, 각 요소를 정해진 기준에 따라 정렬함
6.1. filter
- RDD의 각각 요소에 조건에 따라 True/False로 가려내는 함수를 적용하여 True에 해당하는 요소만 걸러냄
>>> rdd1 = sc.parallelize(range(1,6))
>>> rdd2 = rdd1.filter(lambda i:i>2)
>>> print(rdd2.collect())
[3, 4, 5]
6.2. sortByKey
- 키 값을 기준으로 요소를 정렬하는 연산임
- 따라서, 키와 값으로 구성된 RDD에 적용 가능함
>>> rdd1 = sc.parallelize([("q",1),("z",1),("a",1)])
>>> result = rdd1.sortByKey()
>>> print(result.collect())
[('a', 1), ('q', 1), ('z', 1)]
6.3. keys, values
- 키와 값으로 구성된 RDD에 적용 가능함
- keys는 RDD의 키 요소로 구성된 RDD를 생성하고, values는 RDD의 value요소로 구성된 RDD를 생성함
>>> rdd1 = sc.parallelize([("q",1),("z",1),("a",1)])
>>> rdd2 = rdd1.keys()
>>> rdd3 = rdd1.values()
>>> print(rdd2.collect(), rdd3.collect())
['q', 'z', 'a'] [1, 1, 1]
6.4. sample
- 샘플을 추출하는 RDD메서드
- sample(withReplacement, fraction, seed=None)
- withReplacement : True/False복원추출 결정
- fraction
- 복원추출인 경우, RDD각 요소당 평균 추출횟수를 의미함
- 비복원추출인 경우, RDD각 요소당 샘플될 확률을 의미함
- fraction이 sample사이즈를 결정하는 것은 아님. 아래 예제를 보면, sample사이즈는 random한 것을 알 수 있음.
>>> rdd1 = sc.parallelize(range(100))
>>> rdd2 = rdd1.sample(True, 1.5, seed=1234)
>>> rdd3 = rdd1.sample(False, 0.2, seed=1234)
>>>
>>> rdd2.collect()
[1, 1, 2, 6, 6, 7, 7, 9, 10, 10, 11, 12, 12, 12, 13, 15, 17, 18, 19, 19, 19, 20, 21, 21, 23, 24, 25, 25, 26, 26, 26, 26, 26, 27, 28, 28, 28, 29, 30, 31, 32, 33, 33, 34, 35, 36, 36, 36, 37, 37, 38, 39, 39, 42, 42, 44, 44, 45, 45, 46, 48, 48, 48, 49, 49, 49, 50, 50, 50, 51, 53, 54, 57, 58, 59, 60, 63, 64, 65, 65, 66, 69, 71, 71, 71, 72, 72, 72, 73, 73, 75, 76, 77, 79, 80, 80, 81, 84, 84, 85, 85, 85, 86, 88, 88, 88, 89, 89, 89, 90, 90, 91, 91, 92, 92, 92, 94, 94, 95, 95, 95, 95, 96, 97, 97, 99]
>>> rdd3.collect()
[0, 5, 6, 7, 8, 11, 15, 35, 39, 41, 55, 56, 58, 61, 62, 71, 72, 78, 81, 89, 90, 93, 97, 99]
이상으로 본 포스팅을 마치겠습니다. 다음 포스팅은 <RDD, Resilient Distributed Dataset에 대하여[3] - RDD액션, RDD데이터 불러오기와 저장하기> 에 대해 진행하도록 하겠습니다.