research blog for data science

RDD, Resilient Distributed Dataset에 대하여[3] - RDD액션, RDD 데이터 불러오기와 저장하기, 공유변수

|

이번 포스팅은 지난 포스팅 <RDD, Resilient Distributed DataSet에 대하여[2] - RDD기본액션, RDD트랜스포메이션> 에 이어서 진행하도록 하겠습니다. 교재는 빅데이터 분석을 위한 스파크2 프로그래밍을 참고하였습니다.


2.1.6 RDD 액션

RDD트랜스포메이션 연산은 느긋한 평가(lazy evaluation) 또는 지연 계산 방식을 따릅니다. 이는 계산에 필요한 정보를 누적하다가 계산이 필요한 시점이 돼서야 계산을 수행하는 방식을 뜻합니다. 여기서 계산이 필요한 시점은 RDD 액션 메서드가 호출된 시점입니다. RDD 액션 메서드가 호출이 되어야 비로소 RDD 트랜스포메이션 연산이 수행되게 됩니다.

1. 출력 연산

1.1. first

  • RDD 요소 중 ,첫번째 요소를 돌려줌
>>> rdd = sc.parallelize(range(50))
>>> result = rdd.first()
>>> print(result)
0

1.2. take

  • RDD 요소중, n번째까지 요소를 돌려줌
>>> rdd = sc.parallelize(range(50))
>>> result = rdd.take(5)
>>> print(result)
[0, 1, 2, 3, 4]

1.3. takeSample

  • 지정된 크기의 sample을 추출해서 리스트, 배열 타입등으로 반환함
  • sample 메서드와의 차이점
    • sample 메서드는 RDD 트랜스포메이션 메서드이고, 크기를 지정할 수 없음.
  • takeSample(withReplacement, num, seed=None)
>>> rdd = sc.parallelize(range(100))
>>> result = rdd.takeSample(False, 3)
>>> result
[55, 23, 45]

1.5. countByValue

  • RDD의 요소들이 나타낸 횟수를 맵 형태로 돌려주는 메서드
>>> rdd = sc.parallelize([1,1,3,2,1,2,2,1,1,4,5,3,2,3])
>>> result = rdd.countByValue()
>>> print(result)
defaultdict(<class 'int'>, {1: 5, 3: 3, 2: 4, 4: 1, 5: 1})

1.6. reduce

  • reduce 메서드 인자는 함수가 들어감.
  • 그 함수는 교환법칙과 결합법칙이 성립하는 함수여야 함.
  • 따라서, 메서드 인자로 받은 함수를 이용해서 하나의 요소로 합치는 메서드임.
  • def reduce(f: (T,T)=>T):T
    • 동일한 타입 2개를 입력으로 받아, 같은 타입으로 반환해주는 메서드임
  • 실제 구현은 파티션단위로 나눠져서 처리됨. 분산 프로그램이기 때문임.
>>> from operator import add
>>> add(1,2)
3
>>> sc.parallelize([1,2,3,4,5]).reduce(add)
15

1.7. fold

  • reduce와 동일하나, 초기값을 설정할 수 있음
  • def fold(zeroValue: T)(op: (T,T)=>T):T
  • 그런데 유의할 점은 파티션단위로 나뉘어서 처리하기 때문에, 파티션단위로 처리할 때마다 초깃값을 이용하여 연산이 수행됨. 따라서, 더하기 연산을 할 땐 항등원인 0을, 곱셈 연산을 할 땐 항등원인 1을 초깃값으로 설정하는 것이 좋음
>>> rdd = sc.parallelize(range(1,11), 3)
>>> rdd.fold(1, add)
59 #값이 55가 아니라 59가 나오는 것을 확인할 수 있음. 
reduce와 fold차이
#product.py
class Product:
    def __init__(self, price):
        self.price = price
        self.count = 1
def addPriceandCount(p1, p2):
    p1.price += p2.price
    p1.count += 1
    return p1 #return을 p1인 이유 --> 입력값과 출력값의 타입이 동일해야 함.

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    rdd = sc.parallelize([Product(300), Product(200), Product(100)], 10)

    #reduce
    result = rdd.reduce(addPriceandCount)
    print(result.price, result.count)

    #fold
    result = rdd.fold(Product(0), addPriceandCount)
    print(result.price, result.count)
  • fold의 count합을 보면 11인 것을 알 수 있음. 그 이유는 위에서 파티션 개수를 10으로 지정하였고, 파티션 단위로 연산을 초기값을 이용하여 연산을 수행하기 때문임

1.8. aggregate

  • 입력와 출력의 타입이 다른 경우 사용 가능
  • def aggregateU(seqOp:(U,T)=>U, combOp:(U,U)=>U):U
    • 크게 세가지 인자를 받음. 첫번째는 초깃값으로 fold와 동일
    • aggregate은 병합을 크게 2단계로 구성되는데, 1단계는 seqOp에 의해, 2단계는 combOp에 의해 진행됨
    • seqOp는 초깃값과 동일한 타입(U)과 RDD요소 타입(T)가 입력되어 병합 결과 초깃값과 동일한 타입인 U가 반환됨
    • combOp는 최종병합에서 사용됨
#rdd에 속한 요소들의 평균을 aggregate을 이용하여 구하는 예제
#record.py
class Record:

    def __init__(self, amount, number=1):
        self.amount = amount
        self.number = number

    def addAmt(self, amount):
        return Record(self.amount + amount, self.number + 1)

    def __add__(self, other):
        amount = self.amount + other.amount
        number = self.number + other.number
        return Record(amount, number)

    def __str__(self):
        return "avg:" + str(self.amount / self.number)

    def __repr__(self):
        return 'Record(%r, %r)' % (self.amount, self.number)
def seqop(r,v):
    return r.addAmt(v)

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    rdd = sc.parallelize([100,80,75,90,95], 3)

    #aggregate
    result = rdd.aggregate(Record(0,0), seqop, lambda r1, r2:r1+r2)
    print(result) # avg:88.0

1.9. sum

  • 모든 요소의 합을 구해주며, Double, Long등 숫자타입인 경우에만 사용가능
>>> rdd = sc.parallelize(range(1,11))
>>> rdd.sum()
55

1.10. foreach, foreachPartition

  • foreach는 RDD의 개별요소에 전달받은 함수를 적용하는 메서드이고, foreachPartition은 파티션 단위로 적용됨
  • 이때 인자로 받는 함수는 한개의 입력값을 가지는 함수임
  • 이 메서드를 사용할 때 유의할 점은 드라이버 프로그램(메인 함수를 포함하고 있는 프로그램)이 작동하고 있는 서버위가 아니라 클러스터의 각 개별 서버에서 실행된다는 것
  • 따라서 foreach() 인자로 print함수를 전달한다는 것은 각 서버의 콘솔에 출력하라는 의미가 됨.
def sideEffect(values):
    print("partition side effect")
    for v in values:
        print("value side effect : %s" %v)

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    rdd = sc.parallelize(range(1,11),3)
    result = rdd.foreach(lambda v:print("value side effect: %s" %v))
    result2 = rdd.foreachPartition(sideEffect)
###
value side effect: 2
value side effect: 3
value side effect: 4
value side effect: 5
value side effect: 6
value side effect: 7
value side effect: 8
value side effect: 9
value side effect: 10
partition side effect
value side effect : 7
value side effect : 8
value side effect : 9
value side effect : 10
partition side effect
value side effect : 4
value side effect : 5
value side effect : 6
partition side effect
value side effect : 1
value side effect : 2
value side effect : 3
###

1.11. toDebugString

  • 디버깅을 위한 메서드. RDD파티션 개수나 의존성 정보 등 세부 정보 알고 싶을 때 사용
>>> rdd = sc.parallelize(range(1,100), 10).persist().map(lambda v:(v,1)).coalesce(2)
>>> rdd.toDebugString()
b'(2) CoalescedRDD[65] at coalesce at NativeMethodAccessorImpl.java:0 []\n |  PythonRDD[64] at RDD at PythonRDD.scala:53 []\n |  PythonRDD[63] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[62] at parallelize at PythonRDD.scala:195 []'

1.12. cache, persist, unpersist

  • rdd액션 연산이 수행될때마다 RDD 생성 히스토리를 이용해 복구하는 단계를 수행하지만 너무나 번거로움
  • 따라서 반복적으로 사용되는 RDD인 경우 메모리에 저장해서 사용함
  • cache와 persist는 rdd정보를 메모리 또는 디스크에 저장해서 다음 액션을 수행 시 다시 rdd를 생성하는 단계를 거치지 않음
  • unpersist는 저장된 메모리가 더이상 필요없을 시 취소할 때 사용

RDD 데이터 불러오기와 저장하기

스파크는 하둡 API기반이라서 다양한 데이터 포맷과 파일을 지원합니다.

  • 파일 포맷 : 텍스트파일, JSON, 하둡의 시퀀스파일, csv
  • 파일 시스템 : 로컬 파일 시스템, 하둡파일시스템(HDFS), AWS의 S3, 오픈스택의 Swift등
    • 파일시스템이란 ? 컴퓨터에서 파일이나 자료를 쉽게 발견할 수 있도록 유지 관리하는 방법임. 즉, 저장매체에는 많은 파일이 있으므로, 이러한 파일을 관리하는 방법을 말함. 파일을 빠르게 읽기, 쓰기, 삭제 등 기본적인 기능을 원활히 수행하기 위한 목적임

1. 텍스트 파일

rdd = sc.textFile("file:////Users/ralasun/Desktop/ralasun.github.io/_posts/2020-07-11-introRL(1).md")
>>> rdd.collect()
['---', 'layout : post', 'title: Reinforcement Learning 소개[1]', 'category: Reinforcement Learning', 'tags: cs234 reinforcement-learning david-silver sutton', '---', '', '이번 포스팅은 강화학습이 기존에 알려진 여러 방법론들과의 비교를 통한 강화학습 특성과 구성요소를 다룹니다. ...```
  • “file:///”처럼 ///를 세개 작성해야 함. HDFS와 구별하기 위해서임
  • 또한 클러스터내 각 서버에서 동일한 경로를 통해 지정한 파일에 접근이 가능해야 함
  • sc.textFile(path, n)에서, n을 통해 파티션 개수 정할 수 있음
#save
rdd.saveAsTextFile("<path_to_save>/sub1")

#save(gzip)
rdd.saveAsTextFile("<path_to_save>/sub1", codec)
  • 위와 같이 rdd를 text파일로도 저장이 가능함. 두번째는 압축을 사용하는 방법임

2. 오브젝트 파일

  • 오브젝트 직렬화 방법으로 RDD를 저장함. python의 경우, pickle형태로 저장함
  • 텍스트파일도 가능함
>>> rdd = sc.parallelize(range(1,1000),3)
>>> rdd.saveAsPickleFile("/Users/ralasun/Desktop/pythonpickle.pkl")
>>> rdd2 = sc.pickleFile("/Users/ralasun/Desktop/pythonpickle.pkl")       
>>> rdd2.take(2)
[667, 668]

3. 시퀀스 파일

  • 시퀀스파일이란, 키와 값으로 구성된 데이터를 저장하는 이진 파일 포맷으로, 하둡에서 자주 사용됨
  • 오브젝트 파일과의 차이점은 오브젝트 파일은 RDD에 포함된 각 데이터가 serializable 인터페이스를 구현하고 있어야 하는 것처럼 시퀀스 파일로 만들고 싶은 RDD가 하둡의 writable 인터페이스를 구현하고 있어야 함.
  • saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)
    • sequence파일로 저장하기 위해선 outputFormatClass에 문자열의 형태로 하둡내 시퀀스포맷의 풀네임을 작성해야 함. keyclass와 valueclass도 마찬가지임. 이렇게 하는 이유는 하둡의 writable 인터페이스를 구현해야 할 객체가 필요하기 때문임.
    • 따라서 내부에서는 keyclass와 valueclass 인자에 전달한 포맷으로 rdd를 변환한 뒤 sequencefile포맷으로 저장하는 작업을 거치는 것임
 path = "/Users/ralasun/Desktop/ppkl"
>>> outputFormatClass = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
>>> inputformatClass = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
>>> keyClass = "org.apache.hadoop.io.Text"
>>> valueClass = "org.apache.hadoop.io.IntWritable"
>>> rdd = sc.parallelize(["a","b","c","b","c"])
>>> rdd2 = rdd.map(lambda x:(x,1))
>>> rdd2.collect()
[('a', 1), ('b', 1), ('c', 1), ('b', 1), ('c', 1)]
>>> rdd2.saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass, valueClass)
rdd3 = sc.newAPIHadoopFile(path, inputformatClass, keyClass, valueClass)
>>> for k, v in rdd3.collect():
...     print(k,v)
... 
a 1
b 1
b 1
c 1
c 1

클러스터 환경에서의 공유 변수

클러스터 환경에서 하나의 잡을 수행하기 위해 다수의 서버가 여러 개의 프로세스를 실행합니다. 따라서, 여러 프로세스가 공유할 수 있는 자원을 관리(읽기/쓰기 자원)할 수 있도록 스파크는 지원하는데, 브로드캐스트 변수와 어큐뮬레이터라 합니다.

브로드캐스트 변수(Broadcast Variables)
  • 스파크 잡이 실행되는 동안 클러스터 내의 모든 서버에서 공유할 수 있는 읽기전용 자원을 설정할 수 있는 변수임
  • 예를 들어, 온라인 쇼핑몰에서 사용자 ID와 구매 정보가 담긴 10TB짜리 로그를 분석할 때, 우리가 찾고자 하는 사용자 ID목록이 담긴 세트 컬렉션 타입의 데이터를 공유 변수로 설정해 각 서버에서 로그를 처리하면서 현재 처리하려는 로그가 우리가 찾고자 하는 로그가 맞는지 확인하는 용도로 사용 가능함
>>> bu = sc.broadcast(["u1","u2"])
#1. sparkcontext의 broadcast인자를 이용해서 broadcast변수 생성

>>> rdd = sc.parallelize(["u1","u2","u3","u4","u5","u6"],3)
>>> result = rdd.filter(lambda v: v in bu.value)
#2. broadcast변수 요소 접근시 value매서드를 이용

>>> result.collect()
['u1', 'u2']
어큐뮬레이터(Accumulators)
  • 어큐뮬레이터는 쓰기 동작을 위한 것임
  • 예를 들어, 온라인 쇼핑몰에서 사용자 접속 로그파일을 각 서버에서 취합해서 분석하는 경우임
  • 또한 다수의 서버로 구성된 클러스터 환경에서 오류가 발생 했을 시, 어느 프로세스에서 오류가 난건지 확인이 필요함. 그러기 위해선 에러 정보를 한곳에 모아서 볼 수 있는 방법이 필요함.
  • 어큐뮬레이터는 이렇게 클러스터내의 모든 서버가 공유하는 쓰기 공간을 제공해서, 각 서버에서 발생하는 이벤트나 정보를 모아두는 용도로 사용함.
#accumulator 기본 예제
def accumulate(v, acc):
    if(len(v.split(":")) !=2):
        acc.add(1)

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    acc1 = sc.accumulator(0)
    data = ["U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5:Addr5","U6:Addr6", "U7"]
    rdd = sc.parallelize(data)
    rdd.foreach(lambda v : accumulate(v, acc1))
    print(acc1.value)
  • 파이썬의 경우 어큐뮬레이터의 이름 지정 불가능함
  • 기본 제공하는 어큐뮬레이터는 sparkcontext의 accumulator 메서드를 이용하는데, 초깃값으로 정수, 실수, 복소수 타입중 하나여야 함. 따라서, 사용자 정의 데이터 타입에 대한 어큐뮬레이터는 아래와 같이 사용해야 함.
from pyspark import AccumulatorParam
from record import Record
from builtins import isinstance

class RecordAccumulatorParam(AccumulatorParam):

    def zero(self, initialValue):
        return Record(0)

    def addInPlace(self, v1, v2):
        if(isinstance(v2, Record)):
            return v1+v2
        else:
            return v1.addAmt(v2)

def accumulate(v, acc):
    if(len(v.split(":"))!=2):
        acc.add(1)

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    acc = sc.accumulator(Record(0), RecordAccumulatorParam())
    data = ["U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5:Addr5","U6:Addr6", "U7"]
    rdd = sc.parallelize(data)
    rdd.foreach(lambda v: accumulate(v, acc))
    print(acc.value.amount) #>> 2
#AccumulatorParam에 대한 pyspark Document

class pyspark.AccumulatorParam
# Helper object that defines how to accumulate values of a given type.

	addInPlace(value1, value2)
	# Add two values of the accumulator’s data type, returning a new value; for efficiency, can also update value1 in place and return it.

	zero(value)
	# Provide a “zero value” for the type, compatible in dimensions with the provided value (e.g., a zero vector)
  • Recordclass타입에 대한 accumulator를 작성한 것임.

  • 어큐뮬레이터 사용시 주의할 점 두 가지

      1. 어큐뮬레이터를 증가시키는 동작은 클러스터 내 모든 서버에서 가능하나, 어큐뮬레이터 내 데이터를 읽는 동작은 드라이버 프로그램 내에서만 가능
        • transformation 또는 action 연산 내부에서는 어큐뮬레이터를 증가시킬 수 있으나, 그 값을 참조해서 사용은 불가능하다는 것을 뜻함.
        • 어큐뮬레이터는 액션 메서드 내에서만 수행하는 것이 좋음. 트렌스포메이션은 여러번 수행될 수 있기 때문에 집계가 잘못될 수 있음

이상으로 본 포스팅을 마치겠습니다.

Comments