research blog for data science

Introduction to Graph Neural Network - GNN 소개 및 개념

|

이번 포스팅을 시작으로, Graph Neural Network(GNN)에 대해 본격적으로 다루도록 하겠습니다. 이번 포스팅은 Graph Neural Network가 나온 배경 및 기본적인 구조와 개념에 대해 다루도록 하겠습니다.


우리가 흔히 많이 보는 데이터의 종류로는 이미지, 정형 데이터, 텍스트가 있습니다. 이미지는 2-D grid 형식인 격자 형식을 가지며, 정형 테이터는 테이블 형태를 띕니다. 또한 텍스트는 1-D sequence로 생각할 수 있습니다. 즉, 이들 데이터는 ‘격자’의 모양으로 표현할 수 있으며 이는 Euclidean space 상에 있는 것을 뜻합니다.

그림 1. Euclidean space vs. Non-Euclidean space

그림 2. 3D mesh 이미지

그러나, social network 데이터, molecular 데이터, 3D mesh 이미지 데이터(그림 2.)는 ‘비’ Eculidean space 데이터입니다. 그렇다면 기존 CNN과 RNN계열의 모델과 다르게 이런 형태의 데이터를 처리할 수 있는 새로운 모델이 필요합니다. 그것이 바로 Graph Neural Network 입니다.

What is graph?

GNN을 본격적으로 시작하기 전에 그래프에 대해서 알아보도록 하겠습니다. 그래프란 $G = (N, E)$ 로 구성된 일종의 자료 구조입니다. V는 노드들의 집합이고, E는 노드 사이를 연결하는 엣지들의 집합입니다. 노드에는 일반적으로 데이터의 정보가 담겨있고, 엣지는 데이터 간의 관계 정보가 포함되어 있습니다. 또한, 아래와 같은 그래프 형태를 ‘undirected graph’ 라고도 합니다.

그림 3. graph

directed graph

그림 4. directed graph

방향 그래프란 엣지가 방향성을 가지는 그래프입니다. 아래 그림에서 $V_2$ 에서 $V_1$ 으로 향하는 엣지 $e_1$ 이 있다면, $V_2$ 를 predecessor, $V_1$ 을 sucessor 라고 부릅니다. 그리고 $e_1$ 을 $V_2$ 의 outgoing edge, $V_1$ 의 incoming edge 라고 합니다.

그렇다면, 이러한 그래프를 네트워크의 인풋으로 넣기 위해선 행렬 형태로 표현해야 합니다. 따라서 그래프를 표현하기 위한 방법으로는 adjacency matrix, degree matrix, laplacian matrix가 있습니다.

그림 5. degree vs. adjacency vs. laplacian

Adjacency matrix

adjacency 행렬은 그래프 노드의 개수가 N개라면, NxN 정사각 행렬입니다. i노드와 j노드가 연결되어 있으면 $A_{ij} = 1$ 아니면 $A_{ij} = 0$ 의 성분을 가집니다.

Degree matrix

Degree 행렬은 그래프 노드의 개수가 N개라면 NxN 크기를 가지는 대각행렬입니다. 각 꼭짓점의 차수에 대한 정보를 포함하고 있는 행렬로, 꼭짓점의 차수란 꼭짓점와 연결된 엣지의 갯수를 말합니다.

[D_{i,j} = \begin{cases} deg(v_i) \quad if \,\, i=j
0 \quad otherwise \end{cases}]

Laplacian matrix

adjacency 행렬은 노드 자신에 대한 정보가 없습니다. 그에 반해 laplacian 행렬은 노드와 연결된 이웃노드와 자기 자신에 대한 정보가 모두 포함된 행렬입니다. laplacian 행렬은 degree 행렬에서 adjacency 행렬을 빼준 것입니다.

[L = D - A]

[L_{i,j} = \begin{cases} deg(v_i) \quad if \,\, i=j
-1 \quad if \,\, i \neq j
0 \quad otherwise \end{cases}]

Motivation : GNN $\approx$ CNN

다시 GNN으로 돌아오겠습니다. GNN의 아이디어는 Convolutional Neural Network(CNN)에서 시작되었습니다. CNN은 아래와 같은 특징을 가지고 있습니다.

  • local connectivity
  • shared weights
  • use of Multi-layer

위와 같은 특징 때문에, CNN은 spatial feature를 계속해서 layer마다 계속해서 추출해 나가면서 고차원적인 특징을 표현할 수 있습니다. 위와 같은 특징은 마찬가지로 graph 영역에도 적용할 수 있습니다.

그림 6. GNN $\approx$ CNN

Local Connectivity

<그림 3.> 을 보면, CNN과 GNN의 유사한 점을 확인할 수 있습니다. 먼저, graph도 한 노드와 이웃노드 간의 관계를 local connectivity라 볼 수 있기 때문에, 한 노드의 특징을 뽑기 위해서 local connection에 있는 이웃노드들의 정보만 받아서 특징을 추출할 수 있습니다. 즉, CNN의 filter의 역할과 유사합니다.

Shared Weights

또한 이렇게 graph 노드의 특징을 추출하는 weight은 다른 노드의 특징을 추출하는데도 동일한 가중치를 사용할 수 있어(shared weight), computational cost를 줄일 수 있습니다.

Use of Multi-layer

CNN에서 multi layer 구조로 여러 레이어를 쌓게 되면 초반에는 low-level feature위주로 뽑고, 네트워크가 깊어질수록 high level feature를 뽑습니다. graph같은 경우에 multi-layer구조로 쌓게되면 초반 layer는 단순히 이웃노드 간의 관계에 대해서만 특징을 추출하지만, 네트워크가 깊어질수록 나와 간접적으로 연결된 노드의 영향력까지 고려된 특징을 추출할 수 있게 됩니다.

그렇다면, 위와 같은 특성을 가지려면 GNN은 어떻게 인풋 그래프에 대하여 연산을 해야하는지 알아보도록 하겠습니다.

Original Graph Neural Network

graph neural network는 Scarselli et al.의 The Graph Neural Network Model에서 처음 등장했습니다. GNN의 목적은 결국 이웃노드들 간의 정보를 이용해서 해당 노드를 잘 표현할 수 있는 특징 벡터를 잘 찾아내는 것입니다. 이렇게 찾아낸 특징 벡터를 통해 task를 수행할 수 있습니다(graph classification, node classification 등).

그림 7. GNN

GNN의 동작은 따라서 크게 두가지로 생각할 수 있습니다.

  1. propagation step - 이웃노드들의 정보를 받아서 현재 자신 노드의 상태를 업데이트 함
  2. output step - task 수행을 위해 노드 벡터에서 task output를 출력함

이를 수식으로 표현하면 아래와 같습니다.

[x_n = f_w(l_n, l_{co[n]}, x_{ne[n]}, l_{ne[n]})]

[o_n = g_w(x_n, l_n)]

이때, $l_n, l_{co[n]}, x_{ne[n]}, l_{ne[n]}$ 은 각각 n 노드의 라벨, n노드와 연결된 엣지들의 라벨, 이웃노드들의 states, 이웃노드들의 라벨입니다. 또한 $f_w$ 와 $o_w$ 는 각각 propagation function(논문에선 transition function 이라 표현함)와 output function입니다.

propagation function(transition function)은 이웃 노드들의 정보와 노드와 연결된 엣지정보들을 토대로 현재 자신의 노드를 표현합니다. 즉, d-차원의 공간에 이러한 인풋들을 받아서 맵핑하는 과정이라 생각할 수 있습니다. output function은 task 수행을 위해 학습을 통해 얻은 node feature을 입력으로 하여 output을 얻습니다. 예를 들어, node label classification 이라면 node label이 아웃풋이 될 것입니다.

Learning algorithm : Banach fixed point theorem

그렇다면 어떻게 학습이 이뤄질까요 ? 위에서 Motivation : GNN $\approx CNN$ 에서 Multi-layer를 GNN에 사용하면 얻는 이점은 layer가 깊어질수록 직접적으로 연결된 이웃 노드 이외에 멀리 있는 노드들의 영향력을 고려하여 현재 노드의 feature를 구성할 수 있다고 하였습니다.

이렇게 멀리있는 노드에서부터 현재 노드까지 정보가 전달되는 과정을 message passing이라고 합니다. message passing이란 개념은 GNN이 등장하고 난 이후에, Gilmer et al.의 “Neural message passing for quantumchemistry” 에서 등장하였습니다. 해당 논문은 여러 종류의 GNN 구조를 일반화하는 프레임워크를 message passing 이라는 것으로 제안한 논문입니다.

하지만, 초기 GNN은 multi-layer 구조가 아니기 때문에 불가능합니다. 따라서, Banach fixed point theorem에 따라, iterative method로 고정된 node feature를 찾습니다.

그림 8. Network obtained by unfolding the encoding network

$x_n$ 와 $o_n$ 이 어떤 수렴된 값을 가지려면, Banach fixed point theorem에 의하면 propagation function이 contraction map이어야 합니다.

$F_w$ is a contraction map with respect to the state, i.e., there exists $\mu$ , $0 \leq \mu \le 1$ , such that $|F_w(x, l) - F_w(y, l)| \leq \mu |x-y|$ holds for any x, y where $| \cdot |$ denotes a vectorial norm.

Contraction Map에 대한 개인적인 생각
사실 contraction map의 수학적인 이해가 완벽하게 되진 않았습니다. 그러나, 제가 생각하는 contraction map은 다음과 같습니다. 선형대수학에서 선형변환을 진행하면, m차원의 벡터가 n차원의 공간으로 맵핑이 됩니다. 이 때, 서로 다른 두 m차원의 벡터가 n차원의 공간으로 맵핑이 되었을 때, 두 벡터 사이의 거리가 줄어드는 방향이라면 이 맵핑 function은 contraction map입니다.

그렇다면 fixed point가 되려면, 즉 수렴된 node feature들은 contraction map에 의해 정의된 공간 안에서 존재하는 것이고, 어떻게 보면 node feature를 서치하는 범위가 작다라고 생각할 수 있습니다.

이러한 문제 때문에 추후 다양한 버전의 GNN은 이러한 제한된 가정을 두지 않고 우리가 딥러닝 네트워크 학습시 사용하는 방식으로 node feature 값을 찾습니다. 즉, node feature의 search space가 훨씬 넓어지는 것입니다.

다시 돌아와서, 그렇다면 iterative method식으로 수식을 전개하면 아래와 같이 전개할 수 있습니다.

[x_n(t+1) = f_w(l_n, l_{co[n]}, x_{ne[n]}(t), l_{ne[n]})]

[o_n(t) = g_w(x_n(t), l_n), \quad n \in N]

fixed 된 $x_n, o_n$ 을 얻으면 아래와 같은 loss를 계산할 수 있고, gradient 계산을 통해 weight을 업데이트합니다. 여기서 weight은 $F_w$ 의 파라미터 입니다. neural network 라면 network의 가중치가 됩니다.

Variants of GNNs

Scarselli의 GNN 이후로 여러 변형된 GNN이 많이 등장하였습니다. 초기 GNN은 학습 방식의 단점에 의해 수렴이 잘 되지 않는다는 문제가 있습니다. 이러한 문제를 해결하기 위해 초기 GNN 이후에 다양한 GNN이 등장하였습니다. 대표적으로 Graph Convolutional Network와 Gated Graph Neural Network 등이 있습니다.

다음 포스팅부터는 GNN이 더욱 더 유명해진 계기가 된 Graph Convolutional Network에 대해 다루도록 하겠습니다. 읽어주셔서 감사합니다.


  1. Zhou, Jie, et al. “Graph neural networks: A review of methods and applications.” arXiv preprint arXiv:1812.08434 (2018).
  2. What is graph?, https://ratsgo.github.io/data%20structure&algorithm/2017/11/18/graph/
  3. Scarselli, F., et al. “The Graph Neural Network Model.” IEEE Transactions on Neural Networks 1.20 (2009): 61-80.

TADA, Trend Alignment with Dual-Attention Multi-Task Recurrent Neural Networks for Sales Prediction 논문 리뷰

|

다변량 시계열 예측 모델에 관한 논문으로, 다변량 시계열 데이터를 가지고 encoder-decoder RNN 모델 기반으로 dual-attention과 multi-task RNN으로 구성된 모델입니다.


Problem

다변량 시계열 예측을 위한 여러 통계 기반 모델링이 있으나, 판매량에 영향을 주는 변수들 간의 관계를 파악하기 어렵고, 이 변수들로 부터 의미있는 정보(contextual information)을 추출하는 건 더욱 어렵습니다. 예를 들어, 겨울의복은 날씨에 영향에 두드러지게 받지만, 일반적인 셔츠는 사계절내내 잘 입는 옷이기 때문에 겨울의복보단 계절의 영향을 덜 받습니다. 또한 소비자의 주관적인 선호도(브랜드 선호도, 상품 선호도 등)에 따라 상품 판매는 크게 또한 달라지게 됩니다. 따라서, 본 논문에서 주목하는 다변량 시계열 예측에서의 문제는 아래와 같이 크게 세가지입니다.

  1. how to fully capture the dynamic dependencies among multiple influential factors?
    판매에 영향을 주는 여러 변수들 간의 관계는 시간에 따라 변할 가능성이 높습니다. 그렇다면 매 스텝마다 변수들 간의 관계를 어떻게 포착할 수 있을까요 ?
  2. how can we possibly glean wisdoms form the past to compensate for the unpredictability of influential factors?
    이 변수들이 미래에 어떻게 변할지는 아무도 모릅니다. 그렇다면 과거 이 변수들의 정보만을 가지고 어떻게 미래를 눈여겨 볼 수 있는 정보를 추출할지는 생각해 봐야 합니다.
  3. how to align the upcoming trend with historical sales trends?
    현실 시계에서의 판매 트랜드는 전혀 규칙적이지 않습니다. 그렇다면 과거 판매 트렌드를 어떻게 하면 현실 트렌드와 연관지을 수 있을까요 ?

TADA : Trend Alignment with Dual-Attention Multi-Task RNN

Problem Formulation

본 논문에서 풀고자 하는 다변량 시계열 예측 문제는 아래와 같이 수학적으로 정의됩니다.

[{\hat{y_t}}^{T+\triangle}{t=T+1} = F({{\mathbf x_t}}^{T}{t=1}, {{y_t}}^{T}_{t=1})]

$\mathbf x_t$ 는 influential factors로 판매량 이외의 변수(ex. 날씨, 브랜드, 상품인덱스 등)이고, $y_t$ 는 판매량 입니다.

TADA 모델 개요

그림 1. 모델 개요

위의 그림은 본 논문의 모델 개요입니다. 크게 아래와 같이 구성되어 있습니다.

  • Multi-task based Encoder Structures
  • Dual-Attention based Decoder Structures
    • Attention got weighted decoder input mapping
    • attention for trend alignment

Multi-task based Encoder Structures

그림 2. Multi-task based Encoder

influential factor의 semantic한 특징을 잘 추출한다면 분명 예측에 도움이 될 것입니다. 그러나 매 타임 스텝마다 어떻게 하면 판매량 예측에 도움될 semantic한 특징을 추출할 수 있을까요 ? 본 논문에서는 influential factor를 크게 intrinsic한 속성과 objective한 속성으로 나누어 LSTM을 이용한 인코딩을 각각 따로하였습니다. 이를 통해 각각 두 개의 LSTM(intrinsic LSTM, external LSTM)을 통해 각기 다른 semantic한 특징을 추출할 수 있습니다. 따라서, 위의 문제 정의는 아래와 같이 다시 정의될 수 있습니다.

[{\hat{y_t}}^{T+\triangle}{t=T+1} = F({{\mathbf x_t^{int}}}^{T}{t=1}, {{\mathbf x_t^{ext}}}^{T}{t=1}, {{y_t}}^{T}{t=1})]

intrinsic한 속성이란 브랜드, 카테고리, 가격등 상품과 관련된 것이고, objective한 속성은 날씨, 휴일유무, 할인등과 관련된 속성입니다. 아래 표는 논문에서 실험한 데이터의 intrinsic/objective 속성입니다.

하지만 우리가 구하고 싶은 건 두 가지의 다른 semantic한 feature를 적절하게 결합하여 의미있는 contextual vector를 만드는 것입니다. 따라서 또다른 LSTM 네트워크인 Synergic LSTM을 구축하여 joint representation을 학습합니다. 이때, Synergic LSTM에 입력으로 들어가는 건 각 타임스텝에 해당되는 $h_t^{int}$ 와 $h_t^{ext}$ 뿐만 아니라 판매량 $y_t$ 도 같이 joint space가 구축되도록 학습됩니다.

먼저, 두 타입스텝 t에서의 두 개의 hidden state을 $h_t^{int}$ 와 $h_t^{ext}$ 이용하여 Synergic LSTM의 인풋인 $\mathbf X_t^{syn}$ 을 아래와 같이 계산합니다.

[\mathbf X_t^{syn} = \mathbf W_{syn}[\mathbf h_t^{int};\mathbf h_t^{ext};y_t]]

그런 다음, intrinsic LSTM/external LSTM과 동일하게 각 타임스텝마다 두 정보가 결합되어 인코딩된 hidden stated인 $\mathbf h^{con}_t$ 를 계산합니다.

그림 3. Multi-task based Encoder(2)

Dual-Attention based Decoder Structures

Multi-task Encoder를 통해 과거 판매량 시계열 데이터를 인코딩하면 contextual vectors인 ${\mathbf h_t^{con}}^T_{t=1}$ 이 계산되어 나옵니다. $h_t^{con}$ 은 타임스텝 t까지의 시계열 데이터에 대한 contextual 정보를 품고 있습니다.

LSTM decoder도 encoder와 유사하게 예측에 필요한 contextual vector $\mathbf d_t^{con}$ 을 생성합니다. 따라서, $T < t \leq T + \Delta$ 에 대해 decoder 수학식은 아래와 같습니다.

[\mathbf d_t^{con} = LSTM^{dec}(\mathbf x_t^{dec}, \mathbf d^{con}_{t-1})]

위 식에서 $\mathbf x_t^{dec}$ 는 attention weighted input입니다. 그러면 contextual vector가 어떻게 만들어지는지 보기 전에 attention weighted input 계산 과정을 살펴봅시다.

Attention for Weighted Decoder Input Mapping

그림 4. Attention for Weighted Decoder Input

Decoder에 입력될 Input은 encoder contextual vector들에서 각 디코터 타임 스텝에 필요한 정보를 적절하게 취하도록 하기 위해 attention 메카니즘을 통해 생성합니다.

[\mathbf x_t^{dec} = \mathbf W_{dec}\left[\sum_{t’=1}^T \alpha_{tt’}^{int}\mathbf h_{t’}^{int};\sum_{t’=1}^T \alpha_{tt’}^{ext}\mathbf h_{t’}^{ext}\right] + \mathbf b_{dec}]

$\alpha_{tt’}^{int}$ 와 $\alpha_{tt’}^{ext}$ 는 어텐션 가중치를 의미합니다. 어텐션 가중치는 아래 과정을 통해 계산됩니다.

[e^{int}{tt’} = \mathbf v^{\mathrm T}{int}tanh(\mathbf M_{int}\mathbf d_{t-1}^{con} + \mathbf H_{int}\mathbf h_{t’}^{int})]

[e^{ext}{tt’} = \mathbf v^{\mathrm T}{ext}tanh(\mathbf M_{int}\mathbf d_{t-1}^{con} + \mathbf H_{ext}\mathbf h_{t’}^{ext})]

[\alpha_{tt’}^{int} = \frac{exp(e_{tt’}^{int})}{\sum_{s=1}^{T}exp(e_{ts}^{int})}]

[\alpha_{tt’}^{ext} = \frac{exp(e_{tt’}^{ext})}{\sum_{s=1}^{T}exp(e_{ts}^{ext})}]

이때, $\sum_{t’=1}^{T}\alpha_{tt’}^{int} = \sum_{t’=1}^{T}\alpha_{tt’}^{ext} = 1$ 이 여야 합니다.

Attention for Trend Alignment

그림 5. Attention for Trend Alignment

미래를 예측하기 위해선 과거의 trend 패턴을 안다면 좀 더 수월할 수 있습니다. 따라서, 미래에 예상되는 패턴과 유사한 패턴을 과거에서 찾는 작업을 attention을 통해 진행하는 과정을 본 논문에서 제안하였습니다. 그러나, 일반적으로 attention 메카니즘은 현 타임스텝에서 아웃풋을 출력하기 위해 이전 hidden state들중에서 가장 align되는 정보를 선택합니다. 과거 정보들 중에서 미래의 트렌드와 유사한 트렌드 정보를 선택적으로 이용하고 싶다면 전통적인 attention 메카니즘을 그대로 사용하기는 어렵습니다. 왜냐하면, 일반적인 데이터에선 trend외에 노이즈도 많이 포함하고 있기 때문입니다. 즉, 전체 데이터에 trend + noise라서 이전 모든 과거들에서 유사한 trend 패턴만을 집중하는 건 힘듭니다. 따라서 논문 저자는 아래와 같은 방법을 고안하였습니다.

먼저, ${\mathbf h_t^{con}}_{t=1}^T$ 를 $\triangle$ 타임 스텝 크기에 해당되는 contextual vector를 이어붙여서 $\triangle$ -step trend vector를 생성합니다.

$\mathbf p_i$ 는 과거 시계열 데이터에서 $\triangle$ 간격에 해당되는 구간의 트렌드를 나타냅니다. $i$ 가 1씩 증가하므로, 마치 슬라이딩 윈도우 1씩 움직이면서 트렌드를 포착하는 것과 유사합니다.

마찬가지 방식으로 decoder hidden state들을 이어 붙여 미래에 예상될 트렌드 정보를 생성합니다.

따라서, 그림5 처럼 과거에 생성된 트렌드 벡터과 미래 트렌드 벡터를 각각 내적하여 가장 큰 값에 해당되는 인덱스 i를 반환합니다. 내적값이 가장 크다는 것은 가장 유사함을 의미합니다.

[e_i^{trd} = \mathbf p_i^{\mathrm T} \tilde{\mathbf p}]

[i’ = argmax(e_i^{trd} , e_{i+1}^{trd},\dots, e_{T+\triangle -1}^{trd})]

그 다음 $\mathbf p_{i’}$ 내의 각 ${\mathbf d_t^{con}}$ 와 $\mathbf h_t^{con}$ 을 아래와 같은 계산과정을 거쳐서 $\tilde {\mathbf d}^{con}$ 을 생성합니다.

$\tilde {\mathbf d}^{con}$ 은 타임스텝 t에서의 과거 유사한 트렌드 정보에 집중하여 생성된 aligned contextual vector 입니다.

Sales Prediction and Model Learning

위에서 생성된 aligned contextual vector ${\widetilde {\mathbf d_t}^{con}}$ 를 가지고 판매량을 예측합니다.

[\hat y_t = \mathbf v_y^{\mathrm T} \mathbf {\widetilde d}^{con} + b_y]

$\hat y_t , \,\,(T+1 \leq t \leq T+\Delta)$ 는 타임스텝 T에서의 예측된 판매량입니다.

본 논문에서 학습은 L2 regularization과 함께 Mean Squared Error를 minimize하였습니다.

Experiment and result.

전체적인 결과에 관한 건 논문을 참고 바랍니다. trend alignment 부분에 대한 결과를 살펴보면 과거 유사하다고 찾은 trend와 예측된 trend는 아래 그래프와 같이 나왔습니다.

보면 어느정도 과거 매치된 트렌드와 유사한 트렌드를 따르는 것을 확인할 수 있었습니다.

Lessons Learned

  • 결과를 보면 어느정도 lag가 발생하는 것으로 보입니다.
  • trend에 대한 파악을 먼저하고 판매량 데이터 입력을 나중에 하면 어떨까?
  • dual-stage attention에서의 input attention 모듈과 multi-tasked encoder를 결합하는 건 어떨까 ??

이상으로 본 논문 리뷰를 마치겠습니다.


  1. Chen, Tong, et al. “Tada: trend alignment with dual-attention multi-task recurrent neural networks for sales prediction.” 2018 IEEE International Conference on Data Mining (ICDM). IEEE, 2018.

A Dual-Stage Attention-Based Recurrent Neural Network for Time-Series Prediction 논문 리뷰

|

A Dual-Stage Attention-Based Recurrent Neural Network는 다변량 시계열 예측 모델입니다(Multi-Variate Time Series Prediction). Bahdanau et al.의 Attention network 기반 시퀀스 모델 을 베이스로, 인코더 뿐만 아니라 디코더에도 Attention netowork를 이용해 예측을 위한 다변량 시계열 변수 간 상대적인 중요도와 타임 스텝 간 상대적인 중요도를 모두 고려한 모델입니다.

Problem

RDD, Resilient Distributed Dataset에 대하여[3] - RDD액션, RDD 데이터 불러오기와 저장하기, 공유변수

|

이번 포스팅은 지난 포스팅 <RDD, Resilient Distributed DataSet에 대하여[2] - RDD기본액션, RDD트랜스포메이션> 에 이어서 진행하도록 하겠습니다. 교재는 빅데이터 분석을 위한 스파크2 프로그래밍을 참고하였습니다.


2.1.6 RDD 액션

RDD트랜스포메이션 연산은 느긋한 평가(lazy evaluation) 또는 지연 계산 방식을 따릅니다. 이는 계산에 필요한 정보를 누적하다가 계산이 필요한 시점이 돼서야 계산을 수행하는 방식을 뜻합니다. 여기서 계산이 필요한 시점은 RDD 액션 메서드가 호출된 시점입니다. RDD 액션 메서드가 호출이 되어야 비로소 RDD 트랜스포메이션 연산이 수행되게 됩니다.

1. 출력 연산

1.1. first

  • RDD 요소 중 ,첫번째 요소를 돌려줌
>>> rdd = sc.parallelize(range(50))
>>> result = rdd.first()
>>> print(result)
0

1.2. take

  • RDD 요소중, n번째까지 요소를 돌려줌
>>> rdd = sc.parallelize(range(50))
>>> result = rdd.take(5)
>>> print(result)
[0, 1, 2, 3, 4]

1.3. takeSample

  • 지정된 크기의 sample을 추출해서 리스트, 배열 타입등으로 반환함
  • sample 메서드와의 차이점
    • sample 메서드는 RDD 트랜스포메이션 메서드이고, 크기를 지정할 수 없음.
  • takeSample(withReplacement, num, seed=None)
>>> rdd = sc.parallelize(range(100))
>>> result = rdd.takeSample(False, 3)
>>> result
[55, 23, 45]

1.5. countByValue

  • RDD의 요소들이 나타낸 횟수를 맵 형태로 돌려주는 메서드
>>> rdd = sc.parallelize([1,1,3,2,1,2,2,1,1,4,5,3,2,3])
>>> result = rdd.countByValue()
>>> print(result)
defaultdict(<class 'int'>, {1: 5, 3: 3, 2: 4, 4: 1, 5: 1})

1.6. reduce

  • reduce 메서드 인자는 함수가 들어감.
  • 그 함수는 교환법칙과 결합법칙이 성립하는 함수여야 함.
  • 따라서, 메서드 인자로 받은 함수를 이용해서 하나의 요소로 합치는 메서드임.
  • def reduce(f: (T,T)=>T):T
    • 동일한 타입 2개를 입력으로 받아, 같은 타입으로 반환해주는 메서드임
  • 실제 구현은 파티션단위로 나눠져서 처리됨. 분산 프로그램이기 때문임.
>>> from operator import add
>>> add(1,2)
3
>>> sc.parallelize([1,2,3,4,5]).reduce(add)
15

1.7. fold

  • reduce와 동일하나, 초기값을 설정할 수 있음
  • def fold(zeroValue: T)(op: (T,T)=>T):T
  • 그런데 유의할 점은 파티션단위로 나뉘어서 처리하기 때문에, 파티션단위로 처리할 때마다 초깃값을 이용하여 연산이 수행됨. 따라서, 더하기 연산을 할 땐 항등원인 0을, 곱셈 연산을 할 땐 항등원인 1을 초깃값으로 설정하는 것이 좋음
>>> rdd = sc.parallelize(range(1,11), 3)
>>> rdd.fold(1, add)
59 #값이 55가 아니라 59가 나오는 것을 확인할 수 있음. 
reduce와 fold차이
#product.py
class Product:
    def __init__(self, price):
        self.price = price
        self.count = 1
def addPriceandCount(p1, p2):
    p1.price += p2.price
    p1.count += 1
    return p1 #return을 p1인 이유 --> 입력값과 출력값의 타입이 동일해야 함.

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    rdd = sc.parallelize([Product(300), Product(200), Product(100)], 10)

    #reduce
    result = rdd.reduce(addPriceandCount)
    print(result.price, result.count)

    #fold
    result = rdd.fold(Product(0), addPriceandCount)
    print(result.price, result.count)
  • fold의 count합을 보면 11인 것을 알 수 있음. 그 이유는 위에서 파티션 개수를 10으로 지정하였고, 파티션 단위로 연산을 초기값을 이용하여 연산을 수행하기 때문임

1.8. aggregate

  • 입력와 출력의 타입이 다른 경우 사용 가능
  • def aggregateU(seqOp:(U,T)=>U, combOp:(U,U)=>U):U
    • 크게 세가지 인자를 받음. 첫번째는 초깃값으로 fold와 동일
    • aggregate은 병합을 크게 2단계로 구성되는데, 1단계는 seqOp에 의해, 2단계는 combOp에 의해 진행됨
    • seqOp는 초깃값과 동일한 타입(U)과 RDD요소 타입(T)가 입력되어 병합 결과 초깃값과 동일한 타입인 U가 반환됨
    • combOp는 최종병합에서 사용됨
#rdd에 속한 요소들의 평균을 aggregate을 이용하여 구하는 예제
#record.py
class Record:

    def __init__(self, amount, number=1):
        self.amount = amount
        self.number = number

    def addAmt(self, amount):
        return Record(self.amount + amount, self.number + 1)

    def __add__(self, other):
        amount = self.amount + other.amount
        number = self.number + other.number
        return Record(amount, number)

    def __str__(self):
        return "avg:" + str(self.amount / self.number)

    def __repr__(self):
        return 'Record(%r, %r)' % (self.amount, self.number)
def seqop(r,v):
    return r.addAmt(v)

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    rdd = sc.parallelize([100,80,75,90,95], 3)

    #aggregate
    result = rdd.aggregate(Record(0,0), seqop, lambda r1, r2:r1+r2)
    print(result) # avg:88.0

1.9. sum

  • 모든 요소의 합을 구해주며, Double, Long등 숫자타입인 경우에만 사용가능
>>> rdd = sc.parallelize(range(1,11))
>>> rdd.sum()
55

1.10. foreach, foreachPartition

  • foreach는 RDD의 개별요소에 전달받은 함수를 적용하는 메서드이고, foreachPartition은 파티션 단위로 적용됨
  • 이때 인자로 받는 함수는 한개의 입력값을 가지는 함수임
  • 이 메서드를 사용할 때 유의할 점은 드라이버 프로그램(메인 함수를 포함하고 있는 프로그램)이 작동하고 있는 서버위가 아니라 클러스터의 각 개별 서버에서 실행된다는 것
  • 따라서 foreach() 인자로 print함수를 전달한다는 것은 각 서버의 콘솔에 출력하라는 의미가 됨.
def sideEffect(values):
    print("partition side effect")
    for v in values:
        print("value side effect : %s" %v)

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    rdd = sc.parallelize(range(1,11),3)
    result = rdd.foreach(lambda v:print("value side effect: %s" %v))
    result2 = rdd.foreachPartition(sideEffect)
###
value side effect: 2
value side effect: 3
value side effect: 4
value side effect: 5
value side effect: 6
value side effect: 7
value side effect: 8
value side effect: 9
value side effect: 10
partition side effect
value side effect : 7
value side effect : 8
value side effect : 9
value side effect : 10
partition side effect
value side effect : 4
value side effect : 5
value side effect : 6
partition side effect
value side effect : 1
value side effect : 2
value side effect : 3
###

1.11. toDebugString

  • 디버깅을 위한 메서드. RDD파티션 개수나 의존성 정보 등 세부 정보 알고 싶을 때 사용
>>> rdd = sc.parallelize(range(1,100), 10).persist().map(lambda v:(v,1)).coalesce(2)
>>> rdd.toDebugString()
b'(2) CoalescedRDD[65] at coalesce at NativeMethodAccessorImpl.java:0 []\n |  PythonRDD[64] at RDD at PythonRDD.scala:53 []\n |  PythonRDD[63] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[62] at parallelize at PythonRDD.scala:195 []'

1.12. cache, persist, unpersist

  • rdd액션 연산이 수행될때마다 RDD 생성 히스토리를 이용해 복구하는 단계를 수행하지만 너무나 번거로움
  • 따라서 반복적으로 사용되는 RDD인 경우 메모리에 저장해서 사용함
  • cache와 persist는 rdd정보를 메모리 또는 디스크에 저장해서 다음 액션을 수행 시 다시 rdd를 생성하는 단계를 거치지 않음
  • unpersist는 저장된 메모리가 더이상 필요없을 시 취소할 때 사용

RDD 데이터 불러오기와 저장하기

스파크는 하둡 API기반이라서 다양한 데이터 포맷과 파일을 지원합니다.

  • 파일 포맷 : 텍스트파일, JSON, 하둡의 시퀀스파일, csv
  • 파일 시스템 : 로컬 파일 시스템, 하둡파일시스템(HDFS), AWS의 S3, 오픈스택의 Swift등
    • 파일시스템이란 ? 컴퓨터에서 파일이나 자료를 쉽게 발견할 수 있도록 유지 관리하는 방법임. 즉, 저장매체에는 많은 파일이 있으므로, 이러한 파일을 관리하는 방법을 말함. 파일을 빠르게 읽기, 쓰기, 삭제 등 기본적인 기능을 원활히 수행하기 위한 목적임

1. 텍스트 파일

rdd = sc.textFile("file:////Users/ralasun/Desktop/ralasun.github.io/_posts/2020-07-11-introRL(1).md")
>>> rdd.collect()
['---', 'layout : post', 'title: Reinforcement Learning 소개[1]', 'category: Reinforcement Learning', 'tags: cs234 reinforcement-learning david-silver sutton', '---', '', '이번 포스팅은 강화학습이 기존에 알려진 여러 방법론들과의 비교를 통한 강화학습 특성과 구성요소를 다룹니다. ...```
  • “file:///”처럼 ///를 세개 작성해야 함. HDFS와 구별하기 위해서임
  • 또한 클러스터내 각 서버에서 동일한 경로를 통해 지정한 파일에 접근이 가능해야 함
  • sc.textFile(path, n)에서, n을 통해 파티션 개수 정할 수 있음
#save
rdd.saveAsTextFile("<path_to_save>/sub1")

#save(gzip)
rdd.saveAsTextFile("<path_to_save>/sub1", codec)
  • 위와 같이 rdd를 text파일로도 저장이 가능함. 두번째는 압축을 사용하는 방법임

2. 오브젝트 파일

  • 오브젝트 직렬화 방법으로 RDD를 저장함. python의 경우, pickle형태로 저장함
  • 텍스트파일도 가능함
>>> rdd = sc.parallelize(range(1,1000),3)
>>> rdd.saveAsPickleFile("/Users/ralasun/Desktop/pythonpickle.pkl")
>>> rdd2 = sc.pickleFile("/Users/ralasun/Desktop/pythonpickle.pkl")       
>>> rdd2.take(2)
[667, 668]

3. 시퀀스 파일

  • 시퀀스파일이란, 키와 값으로 구성된 데이터를 저장하는 이진 파일 포맷으로, 하둡에서 자주 사용됨
  • 오브젝트 파일과의 차이점은 오브젝트 파일은 RDD에 포함된 각 데이터가 serializable 인터페이스를 구현하고 있어야 하는 것처럼 시퀀스 파일로 만들고 싶은 RDD가 하둡의 writable 인터페이스를 구현하고 있어야 함.
  • saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)
    • sequence파일로 저장하기 위해선 outputFormatClass에 문자열의 형태로 하둡내 시퀀스포맷의 풀네임을 작성해야 함. keyclass와 valueclass도 마찬가지임. 이렇게 하는 이유는 하둡의 writable 인터페이스를 구현해야 할 객체가 필요하기 때문임.
    • 따라서 내부에서는 keyclass와 valueclass 인자에 전달한 포맷으로 rdd를 변환한 뒤 sequencefile포맷으로 저장하는 작업을 거치는 것임
 path = "/Users/ralasun/Desktop/ppkl"
>>> outputFormatClass = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
>>> inputformatClass = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
>>> keyClass = "org.apache.hadoop.io.Text"
>>> valueClass = "org.apache.hadoop.io.IntWritable"
>>> rdd = sc.parallelize(["a","b","c","b","c"])
>>> rdd2 = rdd.map(lambda x:(x,1))
>>> rdd2.collect()
[('a', 1), ('b', 1), ('c', 1), ('b', 1), ('c', 1)]
>>> rdd2.saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass, valueClass)
rdd3 = sc.newAPIHadoopFile(path, inputformatClass, keyClass, valueClass)
>>> for k, v in rdd3.collect():
...     print(k,v)
... 
a 1
b 1
b 1
c 1
c 1

클러스터 환경에서의 공유 변수

클러스터 환경에서 하나의 잡을 수행하기 위해 다수의 서버가 여러 개의 프로세스를 실행합니다. 따라서, 여러 프로세스가 공유할 수 있는 자원을 관리(읽기/쓰기 자원)할 수 있도록 스파크는 지원하는데, 브로드캐스트 변수와 어큐뮬레이터라 합니다.

브로드캐스트 변수(Broadcast Variables)
  • 스파크 잡이 실행되는 동안 클러스터 내의 모든 서버에서 공유할 수 있는 읽기전용 자원을 설정할 수 있는 변수임
  • 예를 들어, 온라인 쇼핑몰에서 사용자 ID와 구매 정보가 담긴 10TB짜리 로그를 분석할 때, 우리가 찾고자 하는 사용자 ID목록이 담긴 세트 컬렉션 타입의 데이터를 공유 변수로 설정해 각 서버에서 로그를 처리하면서 현재 처리하려는 로그가 우리가 찾고자 하는 로그가 맞는지 확인하는 용도로 사용 가능함
>>> bu = sc.broadcast(["u1","u2"])
#1. sparkcontext의 broadcast인자를 이용해서 broadcast변수 생성

>>> rdd = sc.parallelize(["u1","u2","u3","u4","u5","u6"],3)
>>> result = rdd.filter(lambda v: v in bu.value)
#2. broadcast변수 요소 접근시 value매서드를 이용

>>> result.collect()
['u1', 'u2']
어큐뮬레이터(Accumulators)
  • 어큐뮬레이터는 쓰기 동작을 위한 것임
  • 예를 들어, 온라인 쇼핑몰에서 사용자 접속 로그파일을 각 서버에서 취합해서 분석하는 경우임
  • 또한 다수의 서버로 구성된 클러스터 환경에서 오류가 발생 했을 시, 어느 프로세스에서 오류가 난건지 확인이 필요함. 그러기 위해선 에러 정보를 한곳에 모아서 볼 수 있는 방법이 필요함.
  • 어큐뮬레이터는 이렇게 클러스터내의 모든 서버가 공유하는 쓰기 공간을 제공해서, 각 서버에서 발생하는 이벤트나 정보를 모아두는 용도로 사용함.
#accumulator 기본 예제
def accumulate(v, acc):
    if(len(v.split(":")) !=2):
        acc.add(1)

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    acc1 = sc.accumulator(0)
    data = ["U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5:Addr5","U6:Addr6", "U7"]
    rdd = sc.parallelize(data)
    rdd.foreach(lambda v : accumulate(v, acc1))
    print(acc1.value)
  • 파이썬의 경우 어큐뮬레이터의 이름 지정 불가능함
  • 기본 제공하는 어큐뮬레이터는 sparkcontext의 accumulator 메서드를 이용하는데, 초깃값으로 정수, 실수, 복소수 타입중 하나여야 함. 따라서, 사용자 정의 데이터 타입에 대한 어큐뮬레이터는 아래와 같이 사용해야 함.
from pyspark import AccumulatorParam
from record import Record
from builtins import isinstance

class RecordAccumulatorParam(AccumulatorParam):

    def zero(self, initialValue):
        return Record(0)

    def addInPlace(self, v1, v2):
        if(isinstance(v2, Record)):
            return v1+v2
        else:
            return v1.addAmt(v2)

def accumulate(v, acc):
    if(len(v.split(":"))!=2):
        acc.add(1)

if __name__ =='__main__':
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)

    acc = sc.accumulator(Record(0), RecordAccumulatorParam())
    data = ["U1:Addr1", "U2:Addr2", "U3", "U4:Addr4", "U5:Addr5","U6:Addr6", "U7"]
    rdd = sc.parallelize(data)
    rdd.foreach(lambda v: accumulate(v, acc))
    print(acc.value.amount) #>> 2
#AccumulatorParam에 대한 pyspark Document

class pyspark.AccumulatorParam
# Helper object that defines how to accumulate values of a given type.

	addInPlace(value1, value2)
	# Add two values of the accumulator’s data type, returning a new value; for efficiency, can also update value1 in place and return it.

	zero(value)
	# Provide a “zero value” for the type, compatible in dimensions with the provided value (e.g., a zero vector)
  • Recordclass타입에 대한 accumulator를 작성한 것임.

  • 어큐뮬레이터 사용시 주의할 점 두 가지

      1. 어큐뮬레이터를 증가시키는 동작은 클러스터 내 모든 서버에서 가능하나, 어큐뮬레이터 내 데이터를 읽는 동작은 드라이버 프로그램 내에서만 가능
        • transformation 또는 action 연산 내부에서는 어큐뮬레이터를 증가시킬 수 있으나, 그 값을 참조해서 사용은 불가능하다는 것을 뜻함.
        • 어큐뮬레이터는 액션 메서드 내에서만 수행하는 것이 좋음. 트렌스포메이션은 여러번 수행될 수 있기 때문에 집계가 잘못될 수 있음

이상으로 본 포스팅을 마치겠습니다.

RDD, Resilient Distributed Dataset에 대하여[2] - RDD기본액션, RDD트랜스포메이션

|

이번 포스팅은 지난 포스팅 <RDD, Resilient Distributed DataSet에 대하여[1]> 에 이어서 진행하도록 하겠습니다. 교재는 빅데이터 분석을 위한 스파크2 프로그래밍을 참고하였습니다.


2. RDD

2.1.1 들어가기에 앞서

지난 포스팅 2-1. RDD Resilient Distributed Dataset에 대하여 에서 다뤘습니다.

2.1.2. 스파크컨텍스트 생성

  • 스파크 컨텍스트는 스파크 애플리케이션과 클러스터의 연결을 관리하는 객체임
  • 따라서, 스파크 애플리케이션을 사용하려면 무조건 스파크 컨텍스트를 생성하고 이용해야 함
  • RDD 생성도 스파크컨텍스트를 이용해 생성 가능함
val conf = new SparkConf().setMaster(“local[*]”).setAppName(“RDDCreateSample”)
val sc = new SparkContext(conf)
[예] scala - sparkcontext 생성

 

 sc = SparkContext(master=“local[*], appName=“RDDCreateTest”, conf=conf)
[예] python - spark context 생성

 

  • 스파크 동작에 필요한 여러 설정 정보 지정 가능함
  • SparkConf(), conf=conf 부분에서 config을 통과시켜서 지정 가능함
  • 지정해야 하는 정보 중에, master 정보와 appName 정보는 필수 지정 정보임
    • master 정보란 ? 스파크가 동작할 클러스터의 마스터 서버를 의미하는 것. 로컬모드에서 local, local[3], local[*]와 같이 사용. [*]는 쓰레드 개수를 의미하며, *는 사용 가능한 모든 쓰레드를 이용하겠다는 이야기임
    • appName은 애플리케이션 이름으로, 구분하기 위한 목적임. 스파크 UI화면에 사용됨

2.1.3. RDD생성

  • RDD를 생성하는 방법은 크게 2가지임
  1. 드라이버 프로그램의 컬렉션 객체 이용
    • 자바 or 파이썬 ? 리스트 이용, 스칼라 ? 시퀀스타입 이용
    • 드라이버 프로그램?
      - 최초로 메인 함수를 실행해 RDD등을 생성하고 각종 연산을 호출하는 프로그램
      - 드라이버 내의 메인 함수는 스파크 애플리케이션과 스파크 컨텍스트 객체를 생성함
      - 스파크 컨텍스트를 통해 RDD의 연산 정보를 DAG스케쥴러에 전달하면 스케쥴러는 이 정보를 가지고 실행 계획을 수립한 후 이를 클러스터 매니저에게 전달함
    • 
      val rdd1 = sc.parallelize(List("a","b","c","d","e"))
      [예] scala - rdd 생성
       

      [예] python - 드라이버의 컬렉션 객체를 이용한 RDD 생성

    • 문자열을 포함한 컬렉션 객체 생성 example) python : ['a','b','c','d']
    • parallelize() 메서드를 이용해 RDD 생성
      - RDD의 파티션 수를 지정하고 싶을 때, parallelize() 메서드의 두 번째 매개변수로 파티션 개수 지정 가능
    • 
      val rdd1 = sc.parallelize(1 to 1000, 10)
      
  2. 외부 데이터를 읽어서 새로운 RDD를 생성
    • 기본적으로 하둡의 다루는 모든 입출력 유형 가능
    • 내부적으로 하둡의 입출력을 사용하기 때문임

    [예] python - 외부데이터를 이용한 RDD 생성

2.1.4 RDD 기본 액션

기본 액션 연산의 종류에 대해 알아보도록 하겠습니다.

1. collect

  • collect은 RDD의 모든 원소를 모아서 배열로 리턴
  • 반환 타입이 RDD가 아닌 배열이므로 액션 연산
  • RDD에 있는 모든 요소들이 서버의 메모리에 수집됨. 즉, 대용량 데이터를 다룰 땐 조심하고, 주로 작은 용량의 데이터 디버깅용으로 사용함

[예] python - collect 연산

2. count

  • RDD 구성하는 전체 요소 개수 반환

2.1.5 RDD 트랜스포메이션

기존 RDD를 이용해 새로운 RDD를 생성하는 연산입니다.

1. Map 연산

  • RDD에 속하는 모든 요소에 적용하여 새로운 RDD 생성하는 연산
  • RDD의 몇몇 연산은 특정 데이터 타입에만 적용 가능함

1.1. map

  • 하나의 인자를 받는 함수 자체가 map의 인자로 들어감
  • 이 함수를 이용해 rdd의 모든 요소에 적용한 뒤 새로운 RDD 리턴

[예] python - map 연산

  • map()에 전달되는 함수의 입력 데이터 타입과 출력 데이터 타입이 일치할 필요 없음. 문자열을 입력받아 정수로 반환하는 함수 사용 가능

[예] python - map 연산, 입력/출력 일치하지 않는 경우

1.2. flatMap

  • map()과 마찬가지로, 하나의 인자를 받는 함수가 flatMap의 인자로 들어감
  • map()과 차이점은 각 함수의 인자가 반환하는 값의 타입이 다름
  • flatMap()에 사용하는 함수 f는 반환값으로 리스트나 시퀀스 같은 여러 개의 값을 담은 (이터레이션이 가능한) 일종의 컬렉션과 유사한 타입의 값을 반환해야 함
    • map[U](f:(T) -> U):RDD[U]
    • flatMapU:RDD[U])
      • TraversableOnce는 이터레이터 타입을 의미
map()과 flatMap() 차이점 예시

[예] python - map 연산 vs. flatMap 연산

  • map연산은 문자열의 배열로 구성된 RDD를 생성함
  • 각 요소의 문자열(T)이 단어가 포함된 배열(U)이기 때문임
  • 반면, flatMap 연산은 문자열로 구성된 RDD를 생성함
  • TraversableOnce(U)이기 때문에 문자열의 배열 내의 요소가 모두 끄집어져 나오는 작업을 하게 됨
  • flatMap()은 하나의 입력값(“apple, orange”)에 대해 출력 값이 여러개인 경우([“apple”, “orange”]) 유용하게 사용할 수 있음

1.3. mapPartitions

  • map과 flatMap은 하나의 인자만을 받는 함수가 인자로 들어가지만, mapPartitions은 여러 인자를 받는 함수가 인자로 들어갈 수 있음 ex) 이터레이터를 인자로 받는 함수
  • mapartitions은 인자로 받은 함수가 파티션 단위로 적용하여 새로운 RDD를 생성함. 반면에, map과 flatMap은 인자로 받은 함수가 요소 한개 단위로 적용됨

[예] python - mapPartitions

  • sc.parallelize(range(1,11),3)으로 파티션 3개로 나뉨
  • DB 연결!!! 가 세번 출력된 걸 보니 파티션 단위로 처리한 것을 확인할 수 있음
  • increase함수는 각 파티션 내의 요소에 대한 이터레이터를 전달받아 함수 내부에서 파티션의 개별 요소에 대한 작업을 처리하고 그 결과를 다시 이터레이터 타입으로 반환

1.4. mapPartitionsWithIndex

  • mapPartions와 동일하고 다른 점은 인자로 전달되는 함수를 호출할 때 파티션에 속한 요소의 정보뿐만 아니라 해당 파티션의 인덱스 정보도 함께 전달해 준다는 것임
def IncreaseWithIndex(idx, numbers):
	for i in numbers:
		if(idx == 1):
			yield i+1
  • mapPartitionswithIndex에 인자로 들어갈 함수는 위와 같이 인덱스 정보도 같이 들어감

[예] python - mapPartitionsWithIndex

1.5. mapValues

  • RDD의 모든 요소들이 키와 값의 쌍을 이루고 있는 경우에만 사용 가능한 메서드이며, 인자로 전달받은 함수를 “값”에 해당하는 요소에만 적용하고 그 결과로 구성된 새로운 RDD를 생성
rdd1 = sc.parallelize(["a","b","c"]).map(lambda v:(v,1)) //(키,값)으로 구성된 rdd생성
rdd2 = rdd1.mapValues(lambda i:i+1)
print(rdd2.collect())

[예] python - mapValues

1.6. flatMapValues

  • MapValues 처럼 키에 해당되는 값에 함수를 적용하나 flatMap() 연산을 적용할 수 있음
rdd1 = sc.parallelize([(1, "a,b"),(2, "a,c"),(3, "d,e")])
rdd2 = rdd1.flatMapValues(lambda v:v.split(','))
rdd2.collect()

[예] python - flatMapValues

2. 그룹화 연산

  • 특정 조건에 따라 요소를 그룹화하거나 특정 함수 적용

2.1. zip

  • 두 개의 서로 다른 RDD를 각 요소의 인덱스에 따라 첫번째 RDD의 ‘인덱스’번째를 키로, 두번째 RDD의 ‘인덱스’번째를 값으로 하는 순서쌍을 생성
  • 두 개 RDD는 같은 개수의 파티션과 각 파티션 당 요소개수가 동일해야 함
rdd1 = sc.parallelize(["a","b","c"])
rdd2 = sc.parallelize([1,2,3])
rdd3 = rdd1.zip(rdd2)
rdd3.collect()
>>> [('a', 1), ('b', 2), ('c', 3)]
rdd1 = sc.parallelize(range(1,10),3)
rdd2 = sc.parallelize(range(11,20),3)
rdd3 = rdd1.zip(rdd2)
rdd3.collect()
>>> [(1, 11), (2, 12), (3, 13), (4, 14), (5, 15), (6, 16), (7, 17), (8, 18), (9, 19)]

2.2. zipPartitions

  • zip()과 다르게 파티션의 개수만 동일하면 됨
  • zipPartitions()은 최대 4개 RDD까지 인자로 넣을 수 있음
  • 파이썬 사용불가!!

2.3. groupBy

  • RDD의 요소를 일정한 기준에 따라 그룹을 나누고, 각 그룹으로 구성된 새로운 RDD를 생성함
  • 각 그룹은 키와 각 키에 속한 요소의 시퀀스(iterator)로 구성됨
  • 인자로 전달하는 함수가 각 그룹의 키를 결정하는 역할을 담당함
>>> rdd1 = sc.parallelize(range(1,11))
>>> rdd2 = rdd1.groupBy(lambda v: "even" if v%2==0 else "odd") 
/// groupBy에 인자로 전달된 함수에 의해 키(even/odd) 결정

print(rdd2.collect())
>>> [('even', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f090e80>), ('odd', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f090470>)]
/// 각 키에 해당하는 값은 iterator임을 확인할 수 있음

>>> for x in rdd2.collect():
...     print(x[0], list(x[1]))
... 
even [2, 4, 6, 8, 10]
odd [1, 3, 5, 7, 9]

2.3. groupByKey

  • 이미 키와 값의 쌍으로 구성된 RDD에만 적용 가능함
>>> rdd1 = sc.parallelize(["a","b","c","b","c"]).map(lambda v:(v,1))
>>> rdd1.collect()
[('a', 1), ('b', 1), ('c', 1), ('b', 1), ('c', 1)]
/// (키,값) 쌍으로 구성된 RDD 생성

>>> rdd2 = rdd1.groupByKey()
>>> rdd2.collect()
[('b', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f0ab6a0>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f0ab6d8>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7fbc3f0ab0b8>)]
/// 키에 따라 그룹화함. 그 결과 키에 해당하는 시퀀스 생성

>>> for x in rdd2.collect():
...     print(x[0], list(x[1]))
... 
b [1, 1]
c [1, 1]
a [1]

2.4. cogroup

  • 이미 키와 값의 쌍으로 구성된 RDD에만 적용 가능함
  • 여러 개의 RDD를 인자로 받음(최대 3개)
  • 여러 RDD에서 동일한 키에 해당하는 요소들로 구성된 시퀀스를 만든 후, (키, 시퀀스)의 튜플을 구성. 그 튜플들로 구성된 새로운 RDD를 생성함
  • Tuple(키, Tuple(rdd1요소들의 집합, rdd2요소들의 집합, …))
>>> rdd1 = sc.parallelize([("k1","v1"),("k2","v2"),("k1","v3")])
>>> rdd2 = sc.parallelize([("k1","v4")])
>>> rdd1.collect()
[('k1', 'v1'), ('k2', 'v2'), ('k1', 'v3')]
>>> rdd2.collect()
[('k1', 'v4')]
/// (키, 값)쌍으로 구성된 RDD 2개 생성

>>> rdd3 = rdd1.cogroup(rdd2)
>>> rdd3.collect()
[('k1', (<pyspark.resultiterable.ResultIterable object at 0x7fbc3f0b2fd0>, <pyspark.resultiterable.ResultIterable object at 0x7fbc3f00a828>)), ('k2', (<pyspark.resultiterable.ResultIterable object at 0x7fbc3f00ac18>, <pyspark.resultiterable.ResultIterable object at 0x7fbc3f00a5f8>))]
///k1 키에 대해 rdd1의 v1과 v3요소를 묶고, rdd2의 v4요소를 묶어서 튜플로 구성

>>> for x in rdd3.collect():
...     print(x[0], list(x[1][0]), list(x[1][1]))
... 
k1 ['v1', 'v3'] ['v4']
k2 ['v2'] []

3. 집합 연산

  • RDD에 포함된 요소를 하나의 집합으로 간주하여 집합 연산을 수행(합/교집합)

3.1. distinct

  • RDD의 원소에서 중복을 제외한 요소로만 새로운 RDD 구성
>>> rdd = sc.parallelize([1,2,3,1,2,3,1,2,3])
>>> rdd2 = rdd.distinct()
>>> rdd2.collect()
[1, 2, 3]

3.1. cartesian

  • 두 RDD요소의 카테시안곱을 구하고 그 결과를 요소로 하는 새로운 RDD구성
>>> rdd1 = sc.parallelize([1,2,3])
>>> rdd2 = sc.parallelize(['a','b','c'])
>>> rdd3 = rdd1.cartesian(rdd2)
>>> rdd3.collect()
[(1, 'a'), (1, 'b'), (1, 'c'), (2, 'a'), (2, 'b'), (2, 'c'), (3, 'a'), (3, 'b'), (3, 'c')]

3.2. subtract

  • rdd1.subtract(rdd2) : (rdd1의 요소집합 - rdd2의 요소집합)의 차집합
  • rdd2.subtract(rdd1) : (rdd2의 요소집합 - rdd1의 요소집합)의 차집합
>>> rdd1 = sc.parallelize(["a", "b","c","d","e"])
>>> rdd2 = sc.parallelize(["d","e"])
>>> rdd3 = rdd1.subtract(rdd2)
>>> rdd3.collect()
['a', 'b', 'c']  

3.2. union

  • 두 RDD요소의 합집합
>>> rdd1 = sc.parallelize(["a", "b","c","d","e"])
>>> rdd2 = sc.parallelize(["d","e"])
>>> rdd3 = rdd1.union(rdd2)
>>> rdd3.collect()
['a', 'b', 'c', 'd', 'e', 'd', 'e']

3.3. intersection

  • 두 RDD요소의 교집합으로 중복되지 않은 요소로 구성
>>> rdd1 = sc.parallelize(["a","a","b","c"])
>>> rdd2 = sc.parallelize(["a","a","c","c"])
>>> rdd3 = rdd1.intersection(rdd2)
>>> rdd3.collect()
['a', 'c']

3.4. join

  • RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는 메서드
  • 공통된 키에 대해서만 join수행
  • join 수행 결과 Tuple(키, Tuple(첫번째 RDD요소, 두번쨰 RDD요소))
>>> rdd1 = sc.parallelize(["a", "b","c","d","e"]).map(lambda v : (v,1))
>>> rdd1.collect()
[('a', 1), ('b', 1), ('c', 1), ('d', 1), ('e', 1)]
>>> rdd2 = sc.parallelize(["b","c"]).map(lambda v:(v,2))
>>> rdd2.collect()
[('b', 2), ('c', 2)]
>>> rdd3 = rdd1.join(rdd2)
>>> rdd3.collect()
[('b', (1, 2)), ('c', (1, 2))]

3.5. leftOuterJoin, rightOuterJoin

  • 키와 값의 쌍으로 구성된 RDD에 사용가능
  • leftjoin, rightjoin을 수행
>>> rdd1 = sc.parallelize(["a", "b","c","d","e"]).map(lambda v : (v,1))
>>> rdd2 = sc.parallelize(["b","c"]).map(lambda v:(v,2))
>>> rdd3 = rdd1.leftOuterJoin(rdd2)
>>> rdd3.collect()
[('a', (1, None)), ('e', (1, None)), ('b', (1, 2)), ('c', (1, 2)), ('d', (1, None))]
///rdd2에는 a,d,e 키가 없기 때문에 해당 키에 대한 튜플 요소는 (rdd1의 요소, None)으로 구성됨

>>> rdd4 = rdd1.rightOuterJoin(rdd2)
>>> rdd4.collect()
[('b', (1, 2)), ('c', (1, 2))]

3.6. subtractByKey

  • 키와 값의 쌍으로 구성된 RDD에 사용가능
  • rdd1의 요소 중에서 rdd2와 겹치지 않는 키로 구성된 새로운 RDD 생성
>>> rdd1 = sc.parallelize(["a","b"]).map(lambda v:(v,1))
>>> rdd2 = sc.parallelize(["b"]).map(lambda v:(v,1))
>>> rdd3 = rdd1.subtractByKey(rdd2)
>>> rdd3.collect()
[('a', 1)]

4. 집계와 관련된 연산들

4.1 reduceByKey

  • 키와 값의 쌍으로 구성된 RDD에서 사용 가능
  • RDD 내의 동일한 키를 하나로 병합해 (키,값) 쌍으로 구성된 새로운 RDD 생성
  • 함수를 인자로 받음.
  • 왜냐하면, 파티션 별로 연산을 수행했을 때, 항상 같은 순서로 연산이 수행되는 것을 보장 못하므로, 함수가 수행하는 연산은 교환법칙과 결합법칙이 성립해야 함
>>> rdd = sc.parallelize(['a','b','b']).map(lambda v:(v,1))
>>> rdd.collect()
[('a', 1), ('b', 1), ('b', 1)]
>>> rdd2 = rdd.reduceByKey(lambda v1, v2:(v1+v2))
>>> rdd2.collect()
[('b', 2), ('a', 1)]
(키,값)쌍으로 하는 RDD를 인자로 받는 트랜스포메이션 메서드
- 데이터 처리 과정에서 사용할 파티셔너와 파티션 개수를 지정할 수 있는 옵션이 있음 - 자체적으로 작성한 파티셔너나 파티션 개수를 통해 병렬 처리 수준 변경 가능

4.2 foldByKey

  • 키와 값으로 구성된 RDD에 사용 가능
  • reduceByKey()와 유사하지만, 병합 연산의 초기값을 인자로 전달할 수 있음
rdd = sc.parallelize(["a","b","b"]).map(lambda v:(v,1))
>>> rdd2 = rdd.foldByKey(0, lambda v1,v2:v1+v2)
>>> rdd2.collect()
[('b', 2), ('a', 1)]

My view) 개인적으로, foldByKey와 reduceByKey의 차이가 잘 이해가 되지 않아, 초기값과 문자열 병합으로 pyspark를 실행해 보았습니다.

>>> rdd = sc.parallelize(["a","b","b"]).map(lambda v:(v,1))

///초기값을 1로 준 경우
>>> rdd2 = rdd.foldByKey(1, lambda v1,v2:v1+v2)

///초기값을 0으로 준 경우와 다른 결과임
>>> rdd2.collect()
[('b', 4), ('a', 2)]
>>> rdd = sc.parallelize(["a","b","b"]).map(lambda v:(v,'c'))

///초기값을 t로 준 경우
>>> rdd2 = rdd.foldByKey('t', lambda v1,v2:v1+v2)
>>> rdd2.collect()
[('b', 'tctc'), ('a', 'tc')]

위의 두 연산를 보니, foldByKey는 초기값 처리를 아래와 같이 진행하는 것 같습니다. 예를 들어, 초기 rdd가 [(‘a’, 1), (‘b’, 1), (‘b’, 1)] 라 한다면, foldByKey는 키 ‘a’와 ‘b’에 대해 각각 초기값을 가지고 병합연산을 수행합니다. 이 때 먼저, 초기값을 가지고 병합 연산을 수행합니다. 키 ‘a’인 경우, 초기값 1라면, v1=1, v2=1(키 ‘a’에 대응되는 값)이 병합연산을 수행해 (‘a’,2)가 됩니다. 그 다음 reducebykey와 같은 연산을 수행하나, 키 ‘a’는 초기 rdd에 하나밖에 없기 때문에, v1=2, v2=None이 되어 최종 foldByKey연산 결과는 키 ‘a’에 대해서 2값을 가지게 됩니다.

‘b’키 같은 경우도 마찬가지입니다. 먼저 초기값을 가지고 연산을 수행합니다. v1=1(초기값), v2=1(b에 대응되는 값)가 병합연산을 수행해 v1=2가 됩니다. ‘b’키는 두개가 있으므로, 나머지 ‘b’키에 대해 v1=1, v2=1가 병합연산을 거쳐 v2=2가 됩니다. 그다음 두개의 ‘b’키에 대해 다시 병합연산이 수행되어 v1=2, v2=2가 되어 최종적으로 ‘b’키에 대해 4의 값이 생성됩니다.

4.3 combineByKey

  • 키와 값인 RDD에 사용 가능
  • foldByKey와 reduceByKey와 유사함. 차이점은 병합연산 수행 결과 값의 타입이 바뀔 수 있음
def reduceByKey(func:(V,V)=>V):RDD[K,V]
def foldByKey(zeroValue: V)(func:(V,V)=>V):RDD[K,V]
def combineByKey[C](createCombiner:(V)=>C, mergeValue:(C,V)=>V, mergeCombiners:(C,C)=>C):RDD[K,C]
  • combineByKey는 reduceByKey와 foldByKey와 다르게 타입이 C로 바뀌어 있음
  • 위의 combineByKey를 보면 메서드가 총 createCombiner, mergeValue, mergeCombiners 세 개임을 알 수 있음
  • 아래는 combineByKey에서 자주 등장하는 평균구하기 예시임
#record.py 에 따로 저장해야 함. 
#아니면, _pickle.PicklingError: Can't pickle <class '__main__.Record'>: attribute lookup Record on __main__ failed 에러 발생.

class Record:

    def __init__(self, amount, number=1):
        self.amount = amount
        self.number = number
        
    def addAmt(self, amount):
        return Record(self.amount + amount, self.number + 1)
    
    def __add__(self, other):
        amount = self.amount + other.amount
        number = self.number + other.number 
        return Record(amount, number)
        
    def __str__(self):
        return "avg:" + str(self.amount / self.number)

    def __repr__(self):
        return 'Record(%r, %r)' % (self.amount, self.number)
#createCombiner, mergeValue, mergeCombiners 정의
def createCombiner(v):
    return Record(v)

def mergeValue(c, v):
    return c.addAmt(v)

def mergeCombiners(c1, c2):
    return c1 + c2
#combineByKey 실행
rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
rdd2 = rdd.combineByKey(lambda v:createCombiner(v), lambda c,v:mergeValue(c,v), lambda c1,c2:mergeCombiners(c1,c2))
print('Math', rdd2.collectAsMap()['Math'], 'Eng', rdd2.collectAsMap()['Eng'])
>>> Math avg:75.0 Eng avg:80.0
  • createCombiner()
    • 값을 병합하기 위한 컴바이너. 컴바이너는 병합을 수행하고 컴바이너 타입으로 내부에 저장함. 위의 예로는 createCombiner()는 Record클래스 타입으로 저장됨. 즉, rdd의 각 키에 해당되는 값이 record클래스 타입으로 변환되어 저장됨.
  • mergeValue()
    • 키에 대한 컴바이너가 존재한다면, 컴바이너를 이용해 값을 병합함.

My View) 위의 예 같은 경우, rdd요소 순서대로 combineByKey가 작동한다면 먼저 (“Math”, 100)에 대해 Combiner가 생성되어, math 키에 대한 Record(100)이 생성됩니다.

그 다음 (“Eng”, 70)에 대해 작동합니다. “Eng”키는 기존에 없던 키이기 때문에 새로 Combiner인 Record(70)이 생성됩니다.

그 다음 (“Math”, 50)인데, 기존 Math키에 대한 컴바이너가 존재하기 때문에, 기존 Math 키 컴바이너를 이용하여 병합합니다. 즉, Record(100).addAmt(50)이 발생합니다. 이렇게 Math키와 Eng키에 대해 모든 요소에 대해 병합을 실시하게 됩니다.

  • mergeCombiners()
    • createCombiner()와 mergeValue()는 파티션별로 수행됨. 그다음 모든 파티션에 생성된 combiner를 병합하는 과정을 mergeCombiners()를 통해 수행하는 것임. 이를 통해 최종결과가 발생함

4.4 aggregateByKey

  • 키와 값의 RDD에서 사용 가능
  • 초깃값을 설정할 수 있는 점을 제외하면 comebineByKey와 동일
def combineByKey[C](createCombiner:(V)=>C, mergeValue:(C,V)=>V, mergeCombiners:(C,C)=>C):RDD[(K,C)]
def aggregateByKey[U](zeroValue: U)(seqOp:(U,V)=>U, combOp:(U,U)=>U):RDD[(K,U)]
  • aggregateByKey에서 seqOp는 mergeValue역할을, comOp는 mergeCombiner역할을 함.
  • combineByKey에서 createCombiner로 병합을 위한 초깃값을 구하지만 aggregateByKey는 함수를 이용해 초깃값을 설정하는 대신 바로 ‘값’으로 초기값을 설정함
rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
rdd2 = rdd.aggregateByKey(Record(0,0), lambda c,v:mergeValue(c,v), lambda c1,c2:mergeCombiners(c1,c2))
print('Math :', rdd2.collectAsMap()['Math'], 'Eng :', rdd2.collectAsMap()['Eng'])
>>>Math : avg:75.0 Eng : avg:80.0

5. pipe 및 파티션과 관련된 연산

5.1 pipe

  • pipe를 이용하면 데이터를 처리하는 과정에서 외부 프로세스를 활용할 수 있음
  • 세 개의 숫자로 구성된 문자열을 리눅스의 cut 유틸리티를 이용해 분리한 뒤첫번째와 세번재 숫자를 뽑아내는 예제
>>> rdd = sc.parallelize(["1,2,3", "4,5,6", "7,8,9"])
>>> rdd2 = rdd.pipe("cut -f 1,3 -d ,")
>>> rdd2.collect()
['1,3', '4,6', '7,9']

5.2. coalesce와 repartition

  • 현재 RDD에 사용된 파티션 개수를 조정함
  • coalesce는 파티션 개수를 줄이기만 되고, repartition은 늘리는 것과 줄이는 것 둘 다 가능
  • coalesce가 따로 있는 이유는 처리 방식에 따른 성능 차이 때문임
    • repartition은 셔플을 기반으로 동작을 수행하는 데 반해, coalesce는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한 셔플을 사용하지 않음.
    • 따라서 필터링 등으로 인해 데이터 개수가 줄어든 경우 coalesce을 사용하는 것이 좋음
>>> rdd = sc.parallelize(list(range(1,11)),10)
>>> rdd2 = rdd.coalesce(5)
>>> rdd3 = rdd2.repartition(10)
>>> print("partition size : %d" 
>>> print("partition size : %d" %rdd2.getNumPartitions())
partition size : 5
>>> print("partition size : %d" %rdd3.getNumPartitions())
partition size : 10

5.3. repartitionAndSortWithinPartitions

  • 키와 값으로 구성된 RDD에서 사용 가능
  • RDD를 구성하는 모든 데이터를 특정 기준에 따라 여러 개의 파티션으로 분리하고 각 파티션 단위로 정렬을 수행한 뒤 새로운 RDD를 생성해 주는 메서드임
  • 각 데이터가 어떤 파티션에 속할지 결정하기 위한 파티셔너(org.apache.spark.Partitioner)설정
    • 키 값을 이용하여 어떤 파티션에 속할지 결정할 뿐만 아니라 키 값을 이용한 정렬도 수행함
    • 파티션 재할당을 위해 셔플을 수행하는 단계에서 정렬도 함께 다루게 되어 파티션과 정렬을 각각 따로하는 것에 비해 더 높은 성능을 발휘할 수 있음
  • 10개의 무작위 숫자를 위 메서드를 이용해 3개의 파티션으로 분리해 보는 예제
>>> data = [random.randrange(1,100) for i in range(0,10)]
>>> rdd1 = sc.parallelize(data).map(lambda v:(v,"-"))
>>> rdd2 = rdd1.repartitionAndSortWithinPartitions(3, lambda x:x)
>>> rdd2.foreachPartition(lambda values:print(list(values)))
[(50, '-')]
[(16, '-'), (52, '-'), (61, '-'), (67, '-')]
[(6, '-'), (12, '-'), (48, '-'), (51, '-'), (87, '-')]
  • pyspark에서 repartitionAndSortWithinPartitions에서 default 파티셔너는 hash 파티셔너로 되어있음
  • foreachPartition은 partition단위로 특정함수를 실행해주는 메서드임. 위의 예제에서는 파티션단위로 파티션에 속해있는 값을 프린트해주는 함수를 실행했음

5.4. partitionBy

  • 키와 값으로 구성된 RDD에서 사용가능
  • 파티션을 변경하고 싶을 때 사용가능
  • 기본적으로, hashpartitioner와 rangepartitioner가 있음
  • org.apache.spark.partitioner 클래스를 상속해서 파티셔너를 커스터마이징도 가능
>>> rdd1 = sc.parallelize([("apple",1),("mouse",1),("monitor",1)],5)
>>> rdd1.collect()
[('apple', 1), ('mouse', 1), ('monitor', 1)]
>>> rdd2 = rdd1.partitionBy(3)
>>> print("rdd1: %d, rdd2: %d" %(rdd1.getNumPartitions(), rdd2.getNumPartitions()))
rdd1: 5, rdd2: 3
  • 위의 예제에서 partitionby에 의해 파티션 갯수가 변경된 것을 확인할 수 있음

6. 필터와 정렬 연산

특정 조건을 만족하는 요소만 선택하거나, 각 요소를 정해진 기준에 따라 정렬함

6.1. filter

  • RDD의 각각 요소에 조건에 따라 True/False로 가려내는 함수를 적용하여 True에 해당하는 요소만 걸러냄
>>> rdd1 = sc.parallelize(range(1,6))
>>> rdd2 = rdd1.filter(lambda i:i>2)
>>> print(rdd2.collect())
[3, 4, 5]

6.2. sortByKey

  • 키 값을 기준으로 요소를 정렬하는 연산임
  • 따라서, 키와 값으로 구성된 RDD에 적용 가능함
>>> rdd1 = sc.parallelize([("q",1),("z",1),("a",1)])
>>> result = rdd1.sortByKey()
>>> print(result.collect())
[('a', 1), ('q', 1), ('z', 1)]

6.3. keys, values

  • 키와 값으로 구성된 RDD에 적용 가능함
  • keys는 RDD의 키 요소로 구성된 RDD를 생성하고, values는 RDD의 value요소로 구성된 RDD를 생성함
>>> rdd1 = sc.parallelize([("q",1),("z",1),("a",1)])
>>> rdd2 = rdd1.keys()
>>> rdd3 = rdd1.values()
>>> print(rdd2.collect(), rdd3.collect())
['q', 'z', 'a'] [1, 1, 1]

6.4. sample

  • 샘플을 추출하는 RDD메서드
  • sample(withReplacement, fraction, seed=None)
    • withReplacement : True/False복원추출 결정
    • fraction
      • 복원추출인 경우, RDD각 요소당 평균 추출횟수를 의미함
      • 비복원추출인 경우, RDD각 요소당 샘플될 확률을 의미함
      • fraction이 sample사이즈를 결정하는 것은 아님. 아래 예제를 보면, sample사이즈는 random한 것을 알 수 있음.
>>> rdd1 = sc.parallelize(range(100))
>>> rdd2 = rdd1.sample(True, 1.5, seed=1234)
>>> rdd3 = rdd1.sample(False, 0.2, seed=1234)
>>> 
>>> rdd2.collect()
[1, 1, 2, 6, 6, 7, 7, 9, 10, 10, 11, 12, 12, 12, 13, 15, 17, 18, 19, 19, 19, 20, 21, 21, 23, 24, 25, 25, 26, 26, 26, 26, 26, 27, 28, 28, 28, 29, 30, 31, 32, 33, 33, 34, 35, 36, 36, 36, 37, 37, 38, 39, 39, 42, 42, 44, 44, 45, 45, 46, 48, 48, 48, 49, 49, 49, 50, 50, 50, 51, 53, 54, 57, 58, 59, 60, 63, 64, 65, 65, 66, 69, 71, 71, 71, 72, 72, 72, 73, 73, 75, 76, 77, 79, 80, 80, 81, 84, 84, 85, 85, 85, 86, 88, 88, 88, 89, 89, 89, 90, 90, 91, 91, 92, 92, 92, 94, 94, 95, 95, 95, 95, 96, 97, 97, 99]
>>> rdd3.collect()
[0, 5, 6, 7, 8, 11, 15, 35, 39, 41, 55, 56, 58, 61, 62, 71, 72, 78, 81, 89, 90, 93, 97, 99]

이상으로 본 포스팅을 마치겠습니다. 다음 포스팅은 <RDD, Resilient Distributed Dataset에 대하여[3] - RDD액션, RDD데이터 불러오기와 저장하기> 에 대해 진행하도록 하겠습니다.