research blog for data science

RDD, Resilient Distributed DataSet에 대하여[1]

|

이번 포스팅은 “빅데이터 분석을 위한 스파크2 프로그래밍 - Chaper2. RDD” 를 읽고 정리하였습니다. 정리 순서는 책 순서와 동일하고, 책을 읽어가면서 이해가 안되는 부분을 추가적으로 정리하였습니다.

2.1 RDD

2.1.1 들어가기에 앞서

RDD를 공부하기 전 기억하고 넘어가야 할 것들에 대해 정리하였습니다.

1. 스파크 클러스터

클러스터란 여러 대의 서버가 마치 한대의 서버처럼 동작하는 것을 뜻합니다. 스파크는 클러스터 환경에서 동작하며 대량의 데이터를 여러 서버에서 병렬 처리합니다

2. 분산 데이터로서의 RDD

RDD는 Resilient Distrubuted Datasets으로, ‘회복력을 가진 분산 데이터 집합’이란 뜻입니다. (Resilient : 회복력이 있는) 데이터를 처리하는 과정에서 문제가 발생하더라도 스스로 복구할 수 있는 것을 의미합니다. 이는 그 다음 설명 트랜스포메이션과 액션지연(lazy) 동작과 최적화 부분과 함께 다시 설명드리도록 하겠습니다.

3. 트랜스포메이션과 액션

RDD가 제공하는 연산은 크게 트랜스포메이션과 액션이 있습니다. “연산”은 흔히 “메서드”로 이해하시면 됩니다.
트랜스포메이션은 RDD의 변형을 일으키는 연산이고, 실제로 동작이 수행되지는 않습니다.

그림 1. RDD 예시

그림 2.RDD 예시(2)

아래 예시를 보면, 데이터를 읽어 RDD를 생성해서 file변수에 저장한 뒤, flatMap -> map -> reduceByKey 함수를 거치면서 RDD[2], RDD[3], RDD[8]을 새로 생성하는 것을 볼 수 있습니다. 이렇게 transformation을 이전 RDD를 변형해서 새로운 RDD를 생성하는 것입니다.

반면에, action은 동작을 수행해서 원하는 타입의 결과를 만들어내는 것이므로, saveAsTextFile로 수행됩니다. 따라서, saveAsTextFile은 action 연산에 해당됩니다.

4. 지연 동작과 최적화

지연 동작이란, 액션 연산이 수행되기 전까지 실제로 트랜스포메이션 연산을 수행하지 않는 것입니다. 이는 RDD의 특성 중 하나인 ‘회복력’과 관련있습니다. 액션 연산이 수행되기 전까지 동작이 지연이 되는데, 대신에 RDD가 생성되는 방법을 기억하는 것입니다. 따라서 문제가 발생하더라도 기존에 RDD가 생성되는 방법을 기억하여 연산 수행에 문제가 없도록 하는 것입니다. 이는 위의 예시에서 reduceByKey까지는 실제로 트랜스포메이션 연산을 수행하는 것이 아니라 해당 연산을 순서대로 기억해놨다가, saveAsFile연산이 수행될 때(액션 연산이 수행될 때) 비로소 트랜스포메이션 연산도 수행된 것입니다.

지연 동작 방식의 큰 장점은 실행계획의 최적화입니다.

RDD의 불변성

오류로 인해 스파크의 데이터가 일부 유실되면, 데이터를 다시 만들어내는 방식으로 복구되는 것이 RDD의 불변성입니다. 이는 위에서 계속 언급한 “회복력”과 관련됩니다.

RDD는 RDD1->RDD2-> … 가 되면서 한번 만들어진 RDD는 내용이 변경되지 않습니다. RDD를 만드는 방법을 기억해서 문제가 발생 시 언제든지 똑같은 데이터를 생성할 수 있습니다.

5. 파티션과 HDFS

  • RDD데이터는 클러스터를 구성하는 여러 서버에 나뉘어서 저장됨
  • 이 때, 분할된 데이터를 파티션 단위로 관리합니다.
  • HDFS는 하둡의 파일 시스템(hadoop distributed file system)
  • 스파크는 하둡 파일 입출력 API에 의존성을 가지고 있음.

6. Job, Executor, 드라이버 프로그램

  • Job : 스파크 프로그램 실행하는 것 = 스파크 잡(job)을 실행하는 것
  • 하나의 잡은 클러스터에서 병렬로 처리됨
  • 이 때, 클러스터를 구성하는 각 서버마다 executor라는 프로세스가 생성
  • 각 executor는 할당된 파티션 데이터를 처리함
  • 드라이버란 ? 스파크에서 잡을 실행하는 프로그램으로, 메인함수를 가지고 있는 프로그램
  • 드라이버에서 스파크 컨테스트를 생성하고 그 인스턴스를 포함하고 있는 프로그램
  • 스파크컨테스트를 생성해 클러스터의 각 워커 노드들에게 작업을 지시하고 결과를 취합하는 역할을 수행
  • 아래 코드를 보면, main함수 안에 sparkcontext를 생성하고 sc라는 인스턴스를 포함하고 있는 것을 볼 수 있음. 즉, main함수를 가지고 있는 프로그램이 ‘드라이버’에 해당됨
Public static void main(String[] args){
	...
	JavaSparkContext s c = getSparkContext("WordCount", args[0]);
	...}

7. 함수의 전달

  • 스파크는 함수형 프로그래밍 언어인 스칼라로 작성되어 “함수”를 다른 함수의 “매개변수”로서 전달 가능
  • 아래 예제(Scala)를 보면 map의 인자에 ‘_+1’이 전달되는데, 익명 함수로 전달되는 것임
val rdd1 = sc.paralleize(1 to 10)
val rdd2 = rdd1.map(_+1)
  • 파이썬으로 작성하면 아래와 같이, lambda 함수가 매개변수로 들어가게 됨
rdd1.map(lambda v:v+1)

[참고]함수형 프로그래밍

함수형 프로그래밍과 객체 지향 프로그래밍의 차이를 통해 이해해보겠습니다. 객체 지향 프로그래밍은 객체 안에 상태를 저장하고, 해당 상태를 이용해서 제공할 수 있는(메소드)를 추가하고 상태변화를 ‘누가 어디까지 볼 수 있게 할지’를 설정하고 조정합니다. 따라서 적절한 상태 변경이 되도록 구성합니다. 반면에 함수형 프로그래밍은 상태 변경을 피하며 함수 간의 데이터 흐름을 사용합니다. 입력은 여러 함수들을 통해 흘러 다니게 됩니다. 따라서, 함수의 인자로 함수가 들어오고 반환의 결과로도 함수가 나올 수 있습니다.

함수 전달 시 유의할 점

Class PassingFunctionSample{
	val count=1
	def add(I: int):Int={
	count+i
	}
	
	def runMapSample(sc:SparkContext){
	val rdd1 = sc.parallelize(1 to 10);
	val rdd2 = rdd1.map(add)}
	}

위와 같이 코드를 작성해서 실행하면, ‘java.io.NotSerializaionException’이라는 오류가 발생합니다. 이는 전달된 add함수가 클러스터를 구성하는 각 서버에서 동작할 수 있도록 전달되어야 하는데, 전달이 안되기 때문입니다. 그 이유는 add함수는 PassingFunctionSample의 메소드로 결국 클래스 PassingFunctionSample이 전체 다 전달되기 때문입니다. 해당 클래스는 Serializable 인터페이스를 구현하지 않습니다. 즉, 클래스가 각 서버에 전달될 수 있는 기능을 가지고 있지 않는 것입니다. 함수만 따로 전달되어야 하는 것입니다.

스칼라 같은 경우 ‘싱글톤 객체’를 이용하여 해결 할 수 있습니다. 파이썬의 예제도 살펴보면, 아래는 클래스 전체가 전달되는 잘못된 예입니다.

class PassingFunctionSample():

    def add1(self, i):
        return i + 1

    def runMapSample1(self, sc):
        rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        rdd2 = rdd1.map(self.add1) 
        # rdd2 = rdd1.map(add2)
        print(", ".join(str(i) for i in rdd2.collect()))

self로 인해 전체 클래스가 전달됩니다.(파이썬은 예외없이 실행되므로 유의할 것!)

class PassingFunctionSample():

    @staticmethod
    def add1(self, i):
        return i + 1

    def runMapSample1(self, sc):
        rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        rdd2 = rdd1.map(add2)
        print(", ".join(str(i) for i in rdd2.collect()))


if __name__ == "__main__":

    def add2(i):
        return i + 1

    conf = SparkConf()
    sc = SparkContext(master="local[*]", appName="PassingFunctionSample", conf=conf)

    obj = PassingFunctionSample()
    obj.runMapSample1(sc)

위와 같이 함수 add2가 독립적으로(클래스 전체가) 전달될 수 있도록 해야합니다.

변수 전달 시 유의할 점

class PassingFunctionSample {

	var increment = 1

  def runMapSample3(sc: SparkContext) {
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = rdd1.map(_ + increment) \\익명함수 전달
    print(rdd2.collect.toList)
  }

  def runMapSample4(sc: SparkContext) {
    val rdd1 = sc.parallelize(1 to 10)
    val localIncrement = increment
    val rdd2 = rdd1.map(_ + localIncrement)
    print(rdd2.collect().toList)
  }
}

runMapSample3 처럼 변수가 직접 전달되면 안되고, runMapSample4처럼 지역변수로 변환해서 전달해야 합니다. 그래야 나중에 변수가 변경되어 생기는 문제를 방지할 수 있습니다.

데이터 타입에 따른 RDD 연산

RDD 연산 함수에서 인자 타입을 보고 적절하게 맞는 연산 함수를 사용해야 합니다.


이상으로 <RDD, Resilient Distributed DataSet에 대하여[1]> 마치겠습니다. 다음 포스팅에서 이어가도록 하겠습니다.


  1. 함수형 언어, https://sungjk.github.io/2017/07/17/fp.html, https://docs.python.org/ko/3/howto/functional.html

Comments