research blog for data science

dataframe, numpy 등 array에서 double-colon(::) slicing

|

pandas, numpy 등 자주 헷갈리는 코드 사용을 모아두었습니다.


df[::c]

시작부터 c 간격마다 있는 row를 슬라이싱해줍니다. 자세히 설명하면, 1번째, (1+c)번째, (1+2c)번째, …, (1+nc)번째 row가 선택됩니다. 아래는 예제입니다.

import pandas as pd
import numpy as np
a = np.random.normal(size=200)
b = np.random.uniform(size=200)
sampledf = pd.DataFrame({'A':a,'B':b})
sampledf[::2]
A B
0 0.312234 0.788584
2 -0.123720 0.445176
4 0.411344 0.617469
6 -0.434367 0.674210
8 -0.563221 0.009331
... ... ...
190 1.797756 0.963394
192 -0.679177 0.033222
194 0.975527 0.041236
196 -1.354463 0.450887
198 -2.341788 0.009804

100 rows × 2 columns

위에 sampledf[::2]를 보시면 첫번째(index=0), 세번째(index=2), …., 199번째(index=198)이 선택되는 것을 확인하실 수 있습니다. 2의 간격 크기로 행이 선택되는 것입니다.

df[::-1]

df[::-1] 인 경우는 열의 배치를 뒤집어줍니다. 아래는 예시입니다.

sampledf[::-1]
A B
199 2.600890 0.775489
198 -2.341788 0.009804
197 -0.365103 0.413758
196 -1.354463 0.450887
195 0.685687 0.933069
... ... ...
4 0.411344 0.617469
3 1.703587 0.718288
2 -0.123720 0.445176
1 0.208545 0.459722
0 0.312234 0.788584

200 rows × 2 columns

df[::-c]

마찬가지로, df[::-c] 이면 뒤에 row부터 2간격마다 row가 선택됩니다. 아래는 예시입니다.

sampledf[::-2]
A B
199 2.600890 0.775489
197 -0.365103 0.413758
195 0.685687 0.933069
193 0.267967 0.020342
191 -0.918194 0.917082
... ... ...
9 0.924938 0.837344
7 0.890616 0.096270
5 -0.603043 0.697143
3 1.703587 0.718288
1 0.208545 0.459722

100 rows × 2 columns

pandas.DataFrame.any(), numpy.any()

|

평소에 헷갈리는 any(), all()에 대해 정리하였습니다.


df.isna()

In [1]:
import pandas as pd
In [2]:
df = pd.read_csv('./data/top1_1880109251922.csv', index_col=[0])
df['date'] = pd.to_datetime(df['date'])
df['Date'] = pd.to_datetime(df['date'])
df = df.set_index('Date')
df = df.asfreq('D')

df.isna()는 데이터프레임에서 NaN 요소에 해당되는 부분을 True로 리턴해준다.

In [3]:
df.isna()
date store product_c sales
Date
2018-02-01 False False False False
2018-02-02 False False False False
2018-02-03 False False False False
2018-02-04 False False False False
2018-02-05 False False False False
... ... ... ... ...
2019-07-27 True True True True
2019-07-28 True True True True
2019-07-29 False False False False
2019-07-30 False False False False
2019-07-31 False False False False

546 rows × 4 columns

df.any()

여기서, dataframe.any(axis=0)인 경우엔 각 column의 row를 다 훑어서, row요소들 중 적어도 하나의 row애 True가 있으면, True를 반환합니다.

In [4]:
df.isna().any(axis=0)
date         True
store        True
product_c    True
sales        True
dtype: bool

반면에, dataframe.any(axis=1)인 경우엔 각 index별로 column요소를 다 훑어서 적어도 하나의 column에 True가 있으면 True를 반환합니다. 아래 코드를 보면, 해당 index에 data, store, product_c, sales가 모두 True이면 해당 index row는 True를 반환합니다.

In [5]:
df.isna().any(axis=1)
Date
2018-02-01    False
2018-02-02    False
2018-02-03    False
2018-02-04    False
2018-02-05    False
              ...  
2019-07-27     True
2019-07-28     True
2019-07-29    False
2019-07-30    False
2019-07-31    False
Freq: D, Length: 546, dtype: bool

np.any()

np.any() 는 dataframe.any()와 유사합니다. 주어진 축(axis) 정보에 따라 해당 요소에서 True가 하나 이상이라도 있으면 True를 반환합니다. 아래는 np.any()의 예제입니다.

In [6]:
import numpy as np

먼저, 예제 array를 생성합니다. True, False로 구성된 random한 array를 만들었습니다.

In [7]:
samplearr = [True, False]
a = np.random.choice(samplearr, size=(47))
b = np.random.choice(samplearr, size=(47))
c = np.random.choice(samplearr, size=(47))
print(a)
print(b)
print(c)
[ True  True  True  True  True  True False  True  True  True False False
 False  True False  True False False False False  True False  True  True
 False  True False False False  True False  True False  True  True False
 False  True False False  True False False  True  True  True False]
[False False  True False  True False  True  True False  True  True False
  True False  True False False False False False  True  True False False
 False False  True False False  True False False False  True False  True
  True  True False False  True  True  True  True False  True  True]
[False False  True False False False False  True False False  True  True
  True False False  True  True  True  True False False False False  True
 False  True  True  True  True  True  True  True False False False  True
 False False False False False False  True  True  True  True  True]

np.any(a,b,c)는 에러를 발생합니다. 반드시, 하나의 array나 아니면 array와 유사한 list형식으로 묶어서 넣어줘야합니다.

In [8]:
np.any(a,b,c)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-8-7a7facd3228c> in <module>
----> 1 np.any(a,b,c)
<__array_function__ internals> in any(*args, **kwargs)
/opt/anaconda3/lib/python3.7/site-packages/numpy/core/fromnumeric.py in any(a, axis, out, keepdims)
   2328 
   2329     """
-> 2330     return _wrapreduction(a, np.logical_or, 'any', axis, None, out, keepdims=keepdims)
   2331 
   2332 
/opt/anaconda3/lib/python3.7/site-packages/numpy/core/fromnumeric.py in _wrapreduction(obj, ufunc, method, axis, dtype, out, **kwargs)
     85                 return reduction(axis=axis, out=out, **passkwargs)
     86 
---> 87     return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
     88 
     89 
TypeError: only integer scalar arrays can be converted to a scalar index

axis=0 vs. axis=1

그전에 axis=0과 1에 따라 차이를 살펴봅시다. axis=0인 경우엔 각 column의 모든 row를 훑고, axis=1인 경우엔 각 row의 모든 column을 훑습니다. 아래는 관련 그림입니다.

jpg

axis=0인 경우, 각각의 column요소에서 모든 row를 훑어서 하나 이상이 True요소라면 True를 반환합니다. 결과는 [a,b,c]의 column의 갯수만큼 출력됩니다.

In [9]:
np.any([a,b,c], axis=0)
array([ True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True, False,  True,  True,  True,  True, False,  True,  True,
        True,  True,  True,  True,  True, False,  True,  True,  True,
        True,  True, False, False,  True,  True,  True,  True,  True,
        True,  True])

axis=1인 경우, 각각의 row요소에서 모든 column을 훑어서 하나 이상이 True요소라면 True를 반환합니다. 결과는 [a,b,c]의 row 갯수만큼 출력됩니다.

In [10]:
np.any([a,b,c], axis=1)
array([ True,  True,  True])

np.all()

np.all()은 np.any()와 반대로, 검사할 축에 모든 요소가 True여야지만 True를 반환합니다. 아래는 예제입니다.

In [11]:
np.all([a,b,c], axis=0)
array([False, False,  True, False, False, False, False,  True, False,
       False, False, False, False, False, False, False, False, False,
       False, False, False, False, False, False, False, False, False,
       False, False,  True, False, False, False, False, False, False,
       False, False, False, False, False, False, False,  True, False,
        True, False])
In [12]:
np.all([a,b,c], axis=1)
array([False, False, False])

General Approach to Time Series Analysis - Time Series Data, Stationarity 등에 대하여

|

이번 포스팅을 시작으로, 시계열 분석에 대해서 다루도록 하겠습니다. 메인 교재는 Brockwell와 Richard A. Davis의 < Introduction to Time Series and Forecasting > 와 패스트캠퍼스의 <파이썬을 활용한 시계열 분석 A-Z> 를 듣고 정리하였습니다.


1.1. What is Time Series?

시계열이란, 일정 시간 간격으로 배치된 데이터들의 수열입니다. 그 중에서 discrete한 시계열이란 관측이 발생한 시각 t의 집합 $T_0$ 가 discrete한 경우이며, 관측치가 시간 구간 안에서 연속적으로 발생한다면 continuous한 시계열입니다.

시계열 시퀀스는 일반적으로 자기 상관성이 있는 수열입니다. 즉, 과거의 데이터가 현재를 넘어서 미래까지 영향을 미치는 것을 뜻합니다.

[Cov(X_i, X_j) \neq 0]

따라서, 시계열 데이터로 모델링을 하기 위해선 먼저 데이터를 최대한 분해해서 살펴봐야 합니다. 확률 모델링을 하기 위해선 i.i.d 여야 하기 때문입니다. 일반적으로 시계열 데이터는 trend, seasonality, noise 항으로 구성되어 있습니다. 여기서 시계열 데이터가 자기 상관성을 가지게 되는 요인은 trend와 seasonality 요소 때문이고, noise는 i.i.d한 독립변수로 구성된 에러항입니다.

1.2. Objectives of Time Series Analysis

시계열 분석의 목적은 주로 시계열 데이터를 보고 앞으로 일어날 일들을 예측하는 것입니다. 그러기 위에선 기존에 있는 시계열 데이터를 가지고 추론을 해야합니다. 따라서, 이러한 추론을 하기 위해선 가정에 맞는 적절한 확률 모델을 선택하여 모델링을 진행해야 합니다.

그러나, 시계열 데이터는 자기 상관성이 존재하는 데이터입니다. 따라서 확률적 모델링을 통해 이 시계열 데이터를 서로 독립인 데이터로 변환해야 하는데 이 과정이 seasonal adjustment 또는 trend and seasonal decomposition입니다. 그 밖에 log transformation, differencing 같은 과정도 존재합니다.

어쨌든, 시계열 분석의 궁극적인 목표는 독립적인 변수로 최대한 변환한 뒤, 이를 기반으로 확률적 통계 모델링을 해서, inference를 하는 것입니다. Inference 결과는 다시 우리가 얻고자 하는 예측값으로 바꾸기 위한 reverting 과정을 거쳐야 합니다. 왜냐하면, seasonal adjustment나 Decomposition을 통해 상관성을 제거했기 때문에 원하는 예측값을 얻기 위해선 다시 원래대로 이 과정을 뒤집어서 돌아가야 하는 것입니다.

1.3. Some Simple Time Series Models

위에서 말씀 드린 것과 같이 시계열 데이터를 보고 적절한 확률 모델을 선택하는 것은 매우 중요합니다. 따라서, 몇가지 간단한 time series model을 소개하겠습니다.

1.3.1. Definition of a time series model

관측된 ${x_t}$ 에 대한 time-series 모델이란, 랜덤 변수 ${X_t}$ 시퀀스들의 joint distribution 을 의미합니다.

A time series model for the observed data ${x_t}$ is a specification of the joint distributions(or possibly only the means and covariances) of a sequence of random variables ${X_t}$ of which ${x_t}$ is postulated to be a realization.

즉, 랜덤 변수들의 시퀀스 ${X_1, X_2, \dots }$ 로 구성된 time-series 확률 모델은 랜덤 벡터 $(X_1, \dots, x_n)’ ,\,\, n=1,2,\dots,$ 의 결합 분포입니다. 아래 그림은 랜던 변수들의 시퀀스 ${S_t, t=1, \dots, 200}$ 로 나올 수 있는 가능성 중 한가지가 ‘실현’ 된 것입니다.

그림 1. Time-series 예시

1.3.2. Some Simple Time Series Model

  1. iid Noise
    가장 기본적인 time series 모델은 noise항으로만 이뤄진 경우입니다.(거의 현실세계에선 없다고 생각하시면 됩니다.)
  2. Binary Process
    i.i.d Noise의 종류로, binary 분포를 따르는 noise인 경우입니다. 랜덤 변수들의 시퀀스 $\{X_t,\,\,t=1,2,\dots,\}$ 가 $P[X_t = 1]=p$ , $P[X_t = -1] = 1-p$ 를 따릅니다.
  3. Models with only Trend
    trend요소와 noise항만 있는 경우입니다. 여기서 trend요소란 패턴이 선형관계를 가지고 있을 때입니다. 자세히 말하면, 시계열이 시간에 따라 증가, 감소, 또는 일정 수준을 유지하는 경우입니다. $$X_t = m_t + Y_t$$

    그림 2. Time series with only trend component

  4. Models with only Seaonality
    seasonal요소와 noise항만 있는 경우입니다. 여기서 seasonal요소란 일정한 빈도로 주기적으로 반복되는 패턴을 말합니다. 반면에, 일정하지 않은 빈도로 발생하는 패턴은 Cycle이라 합니다.(여기서는 seasonal 기준으로 설명하겠습니다.) $$X_t = S_t + Y_t$$
  5. 그림 3. Times series with only seasonality(period=12month)

1.3.3 A General Approach to Time Series Modeling

시계열 분석에 대해 깊게 들어가기 전에, 시계열 데이터 모델링하는 방법에 대해 대략적으로 알아봅시다.

1) 그래프로 그린 후, 그래프 상에서 아래와 같은 요소가 있는지 체크한다.(Plot the series and examine the main features of the graph)

  • trend
  • a seasonal component
  • any apparent sharp changes in behavior
  • any outlying observations

2) 정상상태의 잔차를 얻기 위해, trend와 seasonality 요소를 제거한다. (Remove the trend and seasonal components to get stationary residuals)
     trend와 seasonality 요소를 제거하기 전에, 전처리를 해야하는 경우가 있습니다. 예를 들어, 아래와 같이 지수적으로 증가하는 경우에, 로그를 취해서 variance가 일정하도록 만든 후 모델링을 하면 정확도를 높일 수 있습니다.

그림 1. 로그 취하기 전

그림 2. 로그 취한 후

이외에도 여러 방법이 있습니다. 추후에 설명하도록 하겠습니다. 어쨌든, 이 모든 방법들의 핵심은 정상상태의 잔차를 만드는 것입니다.

3) auto-correlation 함수, 여러 다양한 통계량을 이용하여 잔차를 핏팅할 모델을 선택한다. (Choose a model to fit the residuals, making use of various sample statistics including the sample autocorrelation function)

4) 핏팅된 모델로 예측한다.
     여기서 잔차를 예측하는 것이고, 예측된 잔차를 원래 예측해야 할 값으로 변환한다.

1.4. Stationary Models and the Autocorrelation Function

시계열 데이터가 정상상태(stationarity)를 가지기 위해서, 시계열이 확률적인 특징이 시간이 지남에 따라 변하지 않는다는 가정을 충족시켜야 합니다. 그러나 시계열 데이터는 trend와 seasonality요소로 인해, 평균과 분산이 변할 수 있습니다.

a time series ${{X_t, t=0, \pm1, …}}$ is said to be stationary if it has statistical properties similar to those of the “time-shifted” series ${{X_{t+h}, t=0, \pm1, …}}$ for each integer h.

Trends can result in a varying mean over time, whereas seasonality can result in a changing variance over time, both which define a time series as being non-stationary. Stationary datasets are those that have a stable mean and variance, and are in turn much easier to model.

시계열에 대한 평균과 공분산은 아래와 같이 정의됩니다.

그림 3. 시계열의 평균과 공분산

Strict Stationarity vs. Weak Stationarity

엄격한 정상상태가 되려면, $(X_1,\dots , X_n)$ 의 결합분포와 $(X_{1+h}, \dots, X_{n+h})$ 의 결합분포가 시간간격 h에 상관없이 동일해야 합니다. 그러나 이를 이론적으로 증명하기 어렵기 때문에, 약한 정상상태(weak stationarity)만을 만족하면 정상상태에 있다고 생각하고 시계열 문제를 풉니다. 약한 정상상태는 아래 조건을 만족합니다. 즉, 결합분포가 동일해야 한다는 강력한 조건이 사라졌기 때문에 약한 정상상태라고 하는 것입니다.

\((1) \,\, E(X_t) = u\) \((2) \,\, Cov(X_{t+h}, X_{t}) = \gamma_h, for\; all\; h\) \((3)\,\, Var(X_t) = Var(X_{t+h})\)

(2)식은 공분산은 t에 독립임을 의미합니다. 정상상태 시계열의 공분산은 아래와 같이 하나의 변수 h에 대해 나타낼 수 있습니다.

[\gamma_X(h) = \gamma_X(h,0) = \gamma_X(t+h, t)]

이때 함수 $\gamma_X(\cdot)$ 을 lag h에 대한 auto-covariance 함수(ACVF)라 합니다. auto-correlation 함수(ACF)는 ACVF를 이용해 아래와 같이 정의됩니다.

[\rho_X(h)=\frac{\gamma_X(h)}{\gamma_X(0)}=Cor(X_{t+h}, X_t)]

White Noise

시계열 ${{X_t}}$ 가 독립적인 랜덤 변수의 시퀀스이고, 평균이 0이고, 분산이 $\sigma^2$ 이면, White Noise라 합니다. 아래는 White Noise의 조건입니다.

  • \[E(X_t)=0\]
  • \[V(X_t)=V(X_{t+h})=\sigma^2\]
  • \[\gamma_X(t+h, t)=0\;(h\neq0)\]

1.4.1 The Sample Autocorrelation Function

관측 데이터 가지고 자기 상관의 정도를 볼때, sample auto-correlation 함수(sample ACF)를 사용합니다. Sample ACF는 ACF의 추정으로, 계산은 아래와 같습니다.

그림 4. Sample ACF

White Noise인 경우, 시계열 그래프와 ACF 그래프는 아래와 같습니다. lag가 1이상인 경우, 거의 ACF값이 0에 가까운 것을 볼 수 있고, 95% 신뢰구간 안에 들어와 있습니다.

그림 5. White Noise ACF

아래는 그림 1. 그래프에 플롯된 데이터를 가지고 그린 ACF입니다. 보시면, ACF가 lag가 커짐에 따라 서서히 감소하는 형태를 띄는데 이는 trend가 있는 데이터에서 나타납니다.

그림 6. Sequence with trend ACF

1.5. Estimation and Elimination of Trend and Seasonal Components

trend와 seasonality가 존재하는 시계열의 모델링인 경우, 아래와 같이 additive 형태를 띌 수 있습니다.

[X_t = m_t + s_t + Y_t]

시계열 모델링의 최종 목표는 잔차항 $Y_t$ 가 정상상태에 놓이게 하는 것입니다. 따라서 잔차항을 분석하기 위해서 trend 요소 $m_t$ 와 seasonal 요소 $s_t$ 를 제거해야 합니다.

1.5.1. Estimation and Elimination of Trend in the Absence of Seasonality

seasonal 요소가 없고, trend요소만 있는 모델링은 아래와 같이 진행할 수 있습니다.

[X_t = m_t + Y_t, \quad t=1, \dots ,n, \; where \; EY_t = 0]

method1. Trend Estimation

trend 요소를 추정하는 방법은 Moving Average와 Smoothing을 이용하는 방법 2가지가 있습니다.

a) Smoothing with a finite moving average filter

과거 n개의 시점을 평균을 구해 다음 시점을 예측하는 방식입니다.

[W_t = (2q+1)^{-1}\sum_{j=-q}^{q}X_{t-j}]

이때, $X_t = m_t + Y_t$ 이므로, 아래와 같은 식으로 유도됩니다.

[W_t = (2q+1)^{-1}\sum_{j=-q}^{q}X_{t-j} = (2q+1)^{-1}\sum_{j=-q}^{q}m_{t-j} + (2q+1)^{-1}\sum_{j=-q}^{q}Y_{t-j}]

만약에 $m_t$ 가 대략 선형관계를 띄고 있다면 잔차항의 평균은 0에 가까울 것입니다. 즉, 트렌드가 선형관계를 띄고 있을 때, moving average filter를 씌어주면 trend요소만 추출할 수 있는 것을 의미합니다.

[W_t = (2q+1)^{-1}\sum_{j=-q}^{q}X_{t-j} = (2q+1)^{-1}\sum_{j=-q}^{q}m_{t-j} + (2q+1)^{-1}\sum_{j=-q}^{q}Y_{t-j} \approx m_t]

그림 7. Moving average filter 취하기 전

그림 8. Moving average filter 취한 후

그림 9. Trend 제거 후 잔차항
위에 그림 7,8,9 를 살펴 봅시다. 그림 8은 그림 7에서 과거시점 5개를 이용하여 moving average 필터를 씌운 후입니다. 뚜렷한 트렌드가 있지 않음을 보실 수 있습니다. ~~잔차항에 대한 분석은 다시 한번 살펴봐야 할 것 같습니다.~~ Simple Moving Average Filter는 trend가 linear하고, Noise가 White Noise일 때, 시계열 데이터에서 trend요소를 잘 추출할 수 있습니다. 그러나 Non-linear한 trend라면, Noise가 White Noise라 하더라도, trend 추정이 올바르지 않습니다. 그럴 땐, 적절한 가중치를 부여하여 Moving Average Filter를 씌워야 합니다. $$\sum_{j=-7}^{j=7}a_jX_{t-j} = \sum_{j=-7}^{j=7}a_j m_{t-j}+\sum_{j=-7}^{j=7}a_jY_{t-j} \approx \sum_{j=-7}^{j=7}a_j m_{t-j} = m_t$$
b) Exponential smoothing
Moving averages는 과거 n개의 시점에 동일한 가중치를 부여하는 방법입니다. 그러나, 현재시점과 가까울수록 좀 더 현재시점에 영향을 많이 미치는 경우가 일반적으로 생각하기엔 자연스러울 수 있습니다. 예로 주식을 생각하면 될 것 같습니다. 따라서, Exponential smoothing 방법은 현재 시점에 가까울수록 더 큰 가중치를 주는 방법입니다.

그림 10. Exponential Smoothing

Exponential Smoothing 수식은 아래와 같습니다. $$\hat{m}_t = \alpha X_t + (1-\alpha)\hat{m}_{t-1},\,\,t=2, \dots, n,$$ $$\hat{m}_1=X_1$$ 아래 그림은 그림 7을 exponential smoothing을 취한 trend 추정 그래프입니다.

그림 11. Exponential Smoothing 취한 후

c) Smoothing by elimination of high-frequency component
trend를 추출하는 방법 중 하나로, 여러 frequency의 합으로 trend를 표현해서 이를 제거하는 것입니다(이 부분은 추후에 4장에 가서 다시 설명하도록 하겠습니다).

그림 12. frequency합으로 smoothing을 취한 후( $\alpha=0.4$ )

d) Polynomial fitting
$m_t = a_0 + a_1t + a_2t^2 + \dots + a_nt^n$ 으로 모델링하여, $\sum_{t=1}^n(x_t-m_t)^2$ 을 최소화하는 방식으로 파라미터 $a_k,\,(k=0, \dots, k=n$ 을 구하는 방식으로 trend를 추정할 수 있습니다. $X_t - Y_t = m_t$ 에서, $Y_t$ 는 stationary state을 가정하고 있기 때문에, polynomial model을 구축할 수 있는 것입니다.

method2. Trend Elimination by Differencing

method1 방법은 trend를 추정한 뒤, 시계열 $\{X_t\}$ 에서 빼주는 방식으로 trend를 제거하였습니다. 이번엔 difference(차분)를 통해서 trend요소를 제거하는 방법을 알아보도록 하겠습니다. Lag-1 difference operator $\bigtriangledown$ 는 아래와 같습니다. $$\bigtriangledown X_t = X_t-X_{t-1} = (1-B)X_t$$ B는 backward-shift operator로 $BX_t = X{t-1}$ 입니다. j lag difference는 $\bigtriangledown (X_t) = \bigtriangledown (\bigtriangledown^{j-1} (X_t))$ 입니다. 예를 들어, 2-lag difference는 아래와 같습니다. $$ \begin{align*} \bigtriangledown^2 X_t&=\bigtriangledown (\bigtriangledown (X_t))=\bigtriangledown ((\bigtriangledown (X_t))\\&=(1-B)(1-B)X_t=(1-2B+B^2)X_t = X_t - 2X_{t-1} + X_{t-2}\end{align*} $$
Why difference helps eliminating trend components? (Maybe or seasonal components)
여기서, 제가 공부하면서 궁금했던 포인트는 왜 difference가 trend 제거에 도움이 되는가? 였습니다. 제가 생각한 답은 아래와 같습니다. trend와 seasonal 요소를 제거하려는 이유는 '고정된 평균과 분산을 가지는 분포'를 가지기 위해서입니다. 그래야지 통계적 모델링이 가능하기 때문입니다. 즉 반대로 말하면, trend와 seasonal 요소는 시간에 따라 평균과 분산이 변함을 의미합니다. 즉 그 변하는 요소를 제거하기 위해서 difference를 하는 것입니다. difference를 통해서 변동성을 제거하는 건 고등학교 수학 때 배웠던 미분을 통해 이해할 수 있습니다. 예를 들어, 일차함수 $y=a+bx$ 는 x값에 따라 y값이 변합니다. 그러나 일차미분을 통해 구한 기울기 b값은 고정이 됩니다. 반면에 이차함수 $y=ax^2 + bx + c$ 는 이차미분을 통해 2a라는 고정값을 갖게 됩니다. 여기서 미분 과정을 difference라 생각하시면 됩니다. > 영어로도 미분이 differentiation 임을 생각하면 와닿습니다. 일차함수 y는 변하는 특성 + 고정된 특성을 둘다 가지고 있는데 일차 미분을 통해 a라는 고정된 특성만을 추출하는 것입니다. 만약에 trend가 일차함수와 같은 관계를 가지고 있다면 1-lag difference 만으로도 변동성을 잡을 수 있게 되는 것이지요. 마찬가지로 2-lag difference는 trend가 이차함수와 같은 관계를 가지고 있다면 적용되는 것입니다. 그러나, 과도한 difference는 시계열을 과하게 변동성을 제거해 버려서, over-correction이 될 수도 있기 때문에 조심해야 합니다.

그림 13. Difference 적용 전

그림 14. Difference 적용 후

1.5.2. Estimation and Elimination of Both Trend and Seasonality

trend와 seasonal 요소가 다 있는 경우 아래와 같이 표현될 수 있습니다(additive model인 경우).multiplicative model인 케이스도 있습니다. $$X_t = m_t + s_t + Y_t, \,\, t=1, \dots, n,$$ $$where,\,\,EY_t = 0, s_{t+d}=s_t,\,\,and\,\,\sum_{j=1}^{d}s_j=0$$ 두 가지 방법을 소개하겠습니다. 먼저, 첫번째 방법입니다.

method 1. Estimation of Trend and Seasonal components

아래와 같은 데이터가 있을 때, trend와 seasonal 요소를 제거해 봅시다. 아래 시계열 같은 경우, 주기가 d=12로, 1년 단위로 싸이클이 반복되는 것을 확인할 수 있습니다.

그림 15. Accidental Deaths, U.S.A., 1973-1978

  1. 먼저, trend 요소를 제외합니다. trend 요소를 제외하는 방법으로 moving average filter를 이용할 수 있습니다.

    예를 들어, 시계열 시퀀스 $\{x_1, x_2, \dots, x_n\}$ 이고, 주기 period $d=2q$ 라 한다면, 아래와 같은 moving average filter 식을 세울 수 있습니다. $$\hat{m_t} = (0.5x_{t-q} + x_{t-q+1} + \dots + x_{t+q-1} + 0.5x_{t+q})/d,\,\, q<t\leq n-q$$ > 양 끝에 0.5씩 붙는 이유는 분자의 갯수는 홀수개 즉 $2q+1$ 이지만, 분모는 짝수 $d=2q$ 이기 때문에, 양 끝에 항의 가중치를 0.5씩만 해주는 것입니다. 반면에, 주기가 $d=2q+1$ 이라면, 아래와 같은 식을 세울 수 있습니다. $$\hat{m_t} = (2q+1)^{-1}\sum_{j=-q}^{q}X_{t-j}$$
  2. 그 다음은 seasonality 요소를 구하는 차례입니다. 먼저, 위에서 구한 trend요소를 원 시계열 데이터에서 $x_{k+jd} - \hat{m_{k+jd}}$ 와 같이 제거해야 합니다. 그런 다음, 동일한 주기에 해당하는 $x-\hat{m}$ 요소들을 가지고 평균 $w_k, \,\,(k=1, \dots, d)$ 를 구해줍니다.
  3. 이 때, $w_k$ 들의 평균은 0이 아닐 수 있습니다. 따라서, seasonal 요소들의 평균이 0이 되도록 다시 한번 평균을 빼줍니다.다시 한번 정규화가 되도록 해주는 것입니다.) $$\hat{s_k} = w_k - \frac{1}{d}\sum_{i=1}^{d}w_i,\,\,k=1, \dots, d$$ $$and, \,\, \hat{s_k}=\hat{s_{k-d}},\,k>d$$ 따라서, deseaonalized된 데이터는 $d_t = x_t - \hat{s_t},\,\, t=1,\dots ,n$ 이며, detrended된 데이터는 $d_t = x_t - \hat{m_t},\,\, t=1, \dots, n$ 입니다.
  4. 마지막으로, noise 추정값은 trend와 seasonal 요소를 모두 제거한 항입니다.
  5. 또한 trend 모델링을 위해, 다시 한번 parametric form으로 다시 한번 재추정하는 과정을 거칩니다. Parametric form으로 다시 한번 trend 요소를 재추정 하는 목적은 prediction과 simulation을 하기 위해서 입니다.

그림 16. Trend and seasonal decomposition 예시

그림 17. Trend and seasonal decomposition 예시

method 2. Elimination of Trend and Seasonal components by Differencing

Trend 요소를 Differencing 방법을 통해 제거한 것과 동일하게 진행됩니다. Differencing operator $\bigtriangledown_d$ 을 $X_t = m_t + s_t + Y_t$ 식 양변에 취해주면 아래와 같습니다. $$\bigtriangledown_dX_t = m_t - m_{t-d} + Y_t - Y_{t-d}$$

1.6. Testing the Estimated Noise Sequence

1.5까지 과정을 거치면 우린 Noise 항을 갖게 됩니다. 그러나 이 Noise 항이 White Noise 항인지는 확인이 필요합니다. 만약에 white noise 항이 맞다면, noise sequence를 모델링 하는 것입니다. 만약에 noise 항이 white noise가 아니라 여전히 depedency가 보인다면 다른 방법을 적용해야 합니다. 이번 챕터에서는 white noise인지를 확인하는 방법에 대해 살펴봅니다.

(a) The sample autocorrelation function

Sample acf를 그려서, 95%신뢰구간 안에 대부분 들어와 있는지 확인합니다. 만약 2,3개 이상이 신뢰구간 밖에 있거나 1개가 유난히 구간 안에 멀리 벗어 났다면, 우린 white noise라고 세웠던 가설을 기각해야 합니다.

(b) The portmanteau test(Ljung-Box test)

Portmanteau 검정 통계량은 일정 기간 동안 일련의 관측치가 랜덤이고 독립적인지 여부를 검사하는데 사용합니다. 통계량은 아래와 같습니다(Box-pierece 검정이라고도 합니다.). $$Q = n\sum_{j=1}^{h}\hat{\rho(j)^2}$$ $\hat{\rho(j)}$ 가 0에 가깝다면, $\hat{\rho(j)^2}$ 은 더욱 0에 가까울 것이지만, 몇몇 $\hat{\rho(j)}$ 의 절대값이 크다면, 그 항들에 영향을 받아 전체적인 Q값도 커지게 될 것입니다. > h는 lag입니다. h를 무리하게 크게 잡는다면, Q값은 커질 위험이 있습니다. 적당한 h를 잡는 것이 중요합니다. 귀무가설은 시차 h에 대한 자기 상관이 0이라는 귀무가설을 검정합니다. 통계량이 지정된 임계값보다 크면 하나 이상의 시차에 대한 자기 상관이 0과 유의하게 다르며, 일정 기간 랜덤 및 독립적이지 않음을 뜻합니다. 아래는 좀 더 refined된 통계량으로 Ljung-Box 라 합니다. $$Q = n(n+2)\sum_{k=1}^{h}\hat{\rho(k)}/(n-k)$$ 그 밖에, turning point test, difference-sign test, rank test, fitting an autoregressive model, checking for normality등이 있습니다.
이상으로, <Introduction to Time Series and forecasting 리뷰) 1. Introduction to Time Series> 포스팅을 마치겠습니다.
  1. Strict Stationarity vs. Weak Stationarity, https://blog.naver.com/sw4r/221024668866
  2. 고려대학교 김성범 교수님 <예측모델> 수업자료</li>
  3. portmanteau 검정 : https://otexts.com/fppkr/residuals.html</li></ol>

RDD, Resilient Distributed DataSet에 대하여[1]

|

이번 포스팅은 “빅데이터 분석을 위한 스파크2 프로그래밍 - Chaper2. RDD” 를 읽고 정리하였습니다. 정리 순서는 책 순서와 동일하고, 책을 읽어가면서 이해가 안되는 부분을 추가적으로 정리하였습니다.

2.1 RDD

2.1.1 들어가기에 앞서

RDD를 공부하기 전 기억하고 넘어가야 할 것들에 대해 정리하였습니다.

1. 스파크 클러스터

클러스터란 여러 대의 서버가 마치 한대의 서버처럼 동작하는 것을 뜻합니다. 스파크는 클러스터 환경에서 동작하며 대량의 데이터를 여러 서버에서 병렬 처리합니다

2. 분산 데이터로서의 RDD

RDD는 Resilient Distrubuted Datasets으로, ‘회복력을 가진 분산 데이터 집합’이란 뜻입니다. (Resilient : 회복력이 있는) 데이터를 처리하는 과정에서 문제가 발생하더라도 스스로 복구할 수 있는 것을 의미합니다. 이는 그 다음 설명 트랜스포메이션과 액션지연(lazy) 동작과 최적화 부분과 함께 다시 설명드리도록 하겠습니다.

3. 트랜스포메이션과 액션

RDD가 제공하는 연산은 크게 트랜스포메이션과 액션이 있습니다. “연산”은 흔히 “메서드”로 이해하시면 됩니다.
트랜스포메이션은 RDD의 변형을 일으키는 연산이고, 실제로 동작이 수행되지는 않습니다.

그림 1. RDD 예시

그림 2.RDD 예시(2)

아래 예시를 보면, 데이터를 읽어 RDD를 생성해서 file변수에 저장한 뒤, flatMap -> map -> reduceByKey 함수를 거치면서 RDD[2], RDD[3], RDD[8]을 새로 생성하는 것을 볼 수 있습니다. 이렇게 transformation을 이전 RDD를 변형해서 새로운 RDD를 생성하는 것입니다.

반면에, action은 동작을 수행해서 원하는 타입의 결과를 만들어내는 것이므로, saveAsTextFile로 수행됩니다. 따라서, saveAsTextFile은 action 연산에 해당됩니다.

4. 지연 동작과 최적화

지연 동작이란, 액션 연산이 수행되기 전까지 실제로 트랜스포메이션 연산을 수행하지 않는 것입니다. 이는 RDD의 특성 중 하나인 ‘회복력’과 관련있습니다. 액션 연산이 수행되기 전까지 동작이 지연이 되는데, 대신에 RDD가 생성되는 방법을 기억하는 것입니다. 따라서 문제가 발생하더라도 기존에 RDD가 생성되는 방법을 기억하여 연산 수행에 문제가 없도록 하는 것입니다. 이는 위의 예시에서 reduceByKey까지는 실제로 트랜스포메이션 연산을 수행하는 것이 아니라 해당 연산을 순서대로 기억해놨다가, saveAsFile연산이 수행될 때(액션 연산이 수행될 때) 비로소 트랜스포메이션 연산도 수행된 것입니다.

지연 동작 방식의 큰 장점은 실행계획의 최적화입니다.

RDD의 불변성

오류로 인해 스파크의 데이터가 일부 유실되면, 데이터를 다시 만들어내는 방식으로 복구되는 것이 RDD의 불변성입니다. 이는 위에서 계속 언급한 “회복력”과 관련됩니다.

RDD는 RDD1->RDD2-> … 가 되면서 한번 만들어진 RDD는 내용이 변경되지 않습니다. RDD를 만드는 방법을 기억해서 문제가 발생 시 언제든지 똑같은 데이터를 생성할 수 있습니다.

5. 파티션과 HDFS

  • RDD데이터는 클러스터를 구성하는 여러 서버에 나뉘어서 저장됨
  • 이 때, 분할된 데이터를 파티션 단위로 관리합니다.
  • HDFS는 하둡의 파일 시스템(hadoop distributed file system)
  • 스파크는 하둡 파일 입출력 API에 의존성을 가지고 있음.

6. Job, Executor, 드라이버 프로그램

  • Job : 스파크 프로그램 실행하는 것 = 스파크 잡(job)을 실행하는 것
  • 하나의 잡은 클러스터에서 병렬로 처리됨
  • 이 때, 클러스터를 구성하는 각 서버마다 executor라는 프로세스가 생성
  • 각 executor는 할당된 파티션 데이터를 처리함
  • 드라이버란 ? 스파크에서 잡을 실행하는 프로그램으로, 메인함수를 가지고 있는 프로그램
  • 드라이버에서 스파크 컨테스트를 생성하고 그 인스턴스를 포함하고 있는 프로그램
  • 스파크컨테스트를 생성해 클러스터의 각 워커 노드들에게 작업을 지시하고 결과를 취합하는 역할을 수행
  • 아래 코드를 보면, main함수 안에 sparkcontext를 생성하고 sc라는 인스턴스를 포함하고 있는 것을 볼 수 있음. 즉, main함수를 가지고 있는 프로그램이 ‘드라이버’에 해당됨
Public static void main(String[] args){
	...
	JavaSparkContext s c = getSparkContext("WordCount", args[0]);
	...}

7. 함수의 전달

  • 스파크는 함수형 프로그래밍 언어인 스칼라로 작성되어 “함수”를 다른 함수의 “매개변수”로서 전달 가능
  • 아래 예제(Scala)를 보면 map의 인자에 ‘_+1’이 전달되는데, 익명 함수로 전달되는 것임
val rdd1 = sc.paralleize(1 to 10)
val rdd2 = rdd1.map(_+1)
  • 파이썬으로 작성하면 아래와 같이, lambda 함수가 매개변수로 들어가게 됨
rdd1.map(lambda v:v+1)

[참고]함수형 프로그래밍

함수형 프로그래밍과 객체 지향 프로그래밍의 차이를 통해 이해해보겠습니다. 객체 지향 프로그래밍은 객체 안에 상태를 저장하고, 해당 상태를 이용해서 제공할 수 있는(메소드)를 추가하고 상태변화를 ‘누가 어디까지 볼 수 있게 할지’를 설정하고 조정합니다. 따라서 적절한 상태 변경이 되도록 구성합니다. 반면에 함수형 프로그래밍은 상태 변경을 피하며 함수 간의 데이터 흐름을 사용합니다. 입력은 여러 함수들을 통해 흘러 다니게 됩니다. 따라서, 함수의 인자로 함수가 들어오고 반환의 결과로도 함수가 나올 수 있습니다.

함수 전달 시 유의할 점

Class PassingFunctionSample{
	val count=1
	def add(I: int):Int={
	count+i
	}
	
	def runMapSample(sc:SparkContext){
	val rdd1 = sc.parallelize(1 to 10);
	val rdd2 = rdd1.map(add)}
	}

위와 같이 코드를 작성해서 실행하면, ‘java.io.NotSerializaionException’이라는 오류가 발생합니다. 이는 전달된 add함수가 클러스터를 구성하는 각 서버에서 동작할 수 있도록 전달되어야 하는데, 전달이 안되기 때문입니다. 그 이유는 add함수는 PassingFunctionSample의 메소드로 결국 클래스 PassingFunctionSample이 전체 다 전달되기 때문입니다. 해당 클래스는 Serializable 인터페이스를 구현하지 않습니다. 즉, 클래스가 각 서버에 전달될 수 있는 기능을 가지고 있지 않는 것입니다. 함수만 따로 전달되어야 하는 것입니다.

스칼라 같은 경우 ‘싱글톤 객체’를 이용하여 해결 할 수 있습니다. 파이썬의 예제도 살펴보면, 아래는 클래스 전체가 전달되는 잘못된 예입니다.

class PassingFunctionSample():

    def add1(self, i):
        return i + 1

    def runMapSample1(self, sc):
        rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        rdd2 = rdd1.map(self.add1) 
        # rdd2 = rdd1.map(add2)
        print(", ".join(str(i) for i in rdd2.collect()))

self로 인해 전체 클래스가 전달됩니다.(파이썬은 예외없이 실행되므로 유의할 것!)

class PassingFunctionSample():

    @staticmethod
    def add1(self, i):
        return i + 1

    def runMapSample1(self, sc):
        rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        rdd2 = rdd1.map(add2)
        print(", ".join(str(i) for i in rdd2.collect()))


if __name__ == "__main__":

    def add2(i):
        return i + 1

    conf = SparkConf()
    sc = SparkContext(master="local[*]", appName="PassingFunctionSample", conf=conf)

    obj = PassingFunctionSample()
    obj.runMapSample1(sc)

위와 같이 함수 add2가 독립적으로(클래스 전체가) 전달될 수 있도록 해야합니다.

변수 전달 시 유의할 점

class PassingFunctionSample {

	var increment = 1

  def runMapSample3(sc: SparkContext) {
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = rdd1.map(_ + increment) \\익명함수 전달
    print(rdd2.collect.toList)
  }

  def runMapSample4(sc: SparkContext) {
    val rdd1 = sc.parallelize(1 to 10)
    val localIncrement = increment
    val rdd2 = rdd1.map(_ + localIncrement)
    print(rdd2.collect().toList)
  }
}

runMapSample3 처럼 변수가 직접 전달되면 안되고, runMapSample4처럼 지역변수로 변환해서 전달해야 합니다. 그래야 나중에 변수가 변경되어 생기는 문제를 방지할 수 있습니다.

데이터 타입에 따른 RDD 연산

RDD 연산 함수에서 인자 타입을 보고 적절하게 맞는 연산 함수를 사용해야 합니다.


이상으로 <RDD, Resilient Distributed DataSet에 대하여[1]> 마치겠습니다. 다음 포스팅에서 이어가도록 하겠습니다.


  1. 함수형 언어, https://sungjk.github.io/2017/07/17/fp.html, https://docs.python.org/ko/3/howto/functional.html

스파크(Spark) 소개

|

데이터 분석가의 역할이 점차 데이터 분석 영역에서 벗어나 조금씩 엔지니어의 영역까지 요구되고 있는 것 같아, 데이터 엔지니어링 관련 공부에 대한 필요성을 느꼈습니다. 데이터 분석을 직업으로 함에도 불구하고, 하둡이나 스파크 등과 같은 빅데이터 분석 쪽은 많이 접해보지 못했습니다. 따라서 이번 포스팅을 시작해서 해당 관련 포스팅을 틈틈히 올리도록 하겠습니다.

먼저, 교재 <빅데이터 분석을 위한 스파크2 프로그래밍, 대용량 데이터 처리부터 머신러닝까지 - 백성민 저자>를 읽고 포스팅을 진행하도록 하겠습니다. 본 포스팅은 <ch1. 스파크 소개> 에 대해 다뤘습니다.


빅데이터의 등장 및 정의

  • 4차 산업 혁명과 더불어 데이터 처리 기술이 발달함
  • 빅데이터를 크기(volume), 다양성(variety), 속도(velocity)를 이용해서 초기에 정의 내림
    • 다양한 형태를 지닌 대량의 데이터가 빠른 속도로 쌓이고 있다면 이를 빅데이터라 부름
  • 그러나 추후에 가변성(variability), 정확성(veracity), 복잡성(complexity), 시인성(visibility)가 추가됨

스파크

스파크는 하둡의 단점이 개선되어 나온 것으로, 하둡은 아래와 같음

하둡이란?

  • 구글이 대용량 처리와 관련된 두 개의 논문을 더그 커팅이 실제 구현하면서 시작된 아파치 프로젝트임
  • 분산 환경의 병렬처리 프레임워크로, 분산 파일 시스템인 HDFS와 데이터 처리를 위한 맵리듀스 프레임워크로 구성됨
    • 이후에 CPU, 메모리 등 컴퓨팅 자원 관리를 전담하는 리소스 관림 시스템인 Yarn을 포함해, 기존 맵리듀스 프로그래밍 모델을 Yarn 기반으로 구축 관리할 수 있게 함
  • 여러대의 서버를 하나의 클러스터로 구성하여, 클러스터 컴퓨팅 환경 제공
  • 기본적인 동작 방법은, 데이터는 HDFS에 저장하고 데이터 처리를 맵리듀스 프로그래밍을 이용하여 HDFS 위에서 수행
  • HDFS?
    • 하나의 네임노드와 여러 데이터노드로 구성됨
    • 하나의 네임노드가 다른 데이터노드를 관리하는 형태로 시스템이 돌아감.
    • 전체 데이터는 일정한 크기(블록)로 나눠서 여러 데이터 노드에 분산되어 저장하고, 각 블록이 어디에 저장되어 있는지에 대한 메타 정보가 네임노드에 저장됨

그림 1. HDFS

  • 맵리듀스 프레임워크?
    • 데이터 처리 프레임워크로 데이터를 여러 개의 맵 프로세스와 리듀서 프로세스로 나눠서 처리하는 방식
    • 위 그림에서 맵리듀스 잡이 실행되면 네임노드로부터 메타정보를 읽어서 데이터 위치를 확인하고, 데이터를 처리함
    • 맵 프로세스는 여러 데이터 노드에 분산 저장된 데이터를 각 서버에서 병렬로 나누어 처리하며, 리듀서는 그러한 맵 프로세스들의 결과를 조합해 최종 결과를 만들어냄
    • 그러나, 하둡의 맵리듀스 잡은 대부분의 연산 작업을 파일시스템 기반으로 처리하기 때문에 데이터 처리 성능이 떨어짐
    • 반면에, 스파크는 메모리를 이용한 데이터 처리 방식을 제공함으로써 높은 성능을 보여줌

스파크란?

  • 스파크는 하둡 기반의 맵리듀스 작업의 단점을 보완한 것으로, 데이터를 메모리 기반으로 처리하기 때문에 처리 성능이 향상됨
  • 또한 작업을 실행하기 전에 최적의 처리 흐름을 찾는 과정을 포함함
  • 맵리듀스에 비해 훨씬 다양한 데이터 처리 함수 제공

RDD, 데이터프레임, 데이터셋 소개와 연산

  • 스파크 프로그램 내에서 ‘데이터’를 표현하고 처리하기 위한 프로그래밍 모델로, RDD, 데이터프레임(Dataframe), 데이터셋(Dataset)이 있음
    • 데이터프레임과 데이터셋은 RDD가 보완되서 나온 것이므로, RDD가 기본임

RDD

  • RDD란, resilient distributed dataset으로, 병렬 처리가 가능한 요소로 구성되며 데이터를 처리하는 과정에서 일시적인 문제가 발생하더라도 스스로 에러를 복구할 수 있는 능력을 지닌 데이터 모델임
  • RDD에 속한 요소들은 파티션이라는 단위로 나눠질 수 있는데, 스파크는 데이터 처리 작업을 수행할 때, 파티션 단위로 나뉘어서 병렬 처리함. 하둡에서 맵 프로세스가 여러 데이터 노드에 분산 저장된 데이터를 각 서버에서 병렬 처리하는 것과 유사하다고 생각함
  • 기존 RDD는 덧셈 연산, 그룹화 연산등을 적용 시, 파티션에 속한 데이터들이 네트워크를 통해 다른 서버로 이동하는 셔플링이 발생될 수 있음

  • 그런데, 작업 도중에 장애가 발생하여 결과가 유실될 수 있으나, 스파크 같은 경우는 RDD의 생성 과정을 기록해 뒀다가 다시 복구해 주는 기능을 가지고 있음. 이를 수행 하기 위해선 한번 생성된 RDD는 바뀌지 않아야 하는 조건이 있음
    • 이렇게 스파크에서 RDD 생성 과정을 기록해 둔 것을 리니지(lineage)라고 함

DAG

  • 맵 리듀스 작업으로 데이터 처리 시, 데이터 처리 방법의 너무 복잡한 경우나 데이터의 크기가 너무 큰 경우 서버의 가용량을 넘겨서 문제가 발생하는 경우가 발생
  • 따라서, 한번의 작업으로 모든 것을 끝낸다기 보다는 하나의 작업을 여러 개의 작은 작업으로 나눠 놓고 각 작업을 최적화해서 일련의 순서대로 나누어 실행하는 경우가 종종 있음
  • 일련의 작업 흐름을 나타내는 워크플로우는 DAG(Directed acyclic graph)를 구성하고, 이를 이용해 일련의 작업을 수행하면 다양한 라이브러리를 연동해서 데이터 처리를 수행 가능
    • DAG? 란 꼭짓점과 방향성을 가진 엣지로 구성된 그래프 모델임
    • 빅데이터는 DAG를 이용해 복잡한 일련의 작업 흐름을 나타냄
    • 예) 우지 워크플로우 오픈소스

스파크 내에서의 DAG

  • 스파크에서 DAG 처리를 담당하는 부분 : DAG스케쥴러
  • 스파크는 전체 작업을 스테이지로 나누고, 다시 여러개의 태스크로 나눠서 수행
  • 드라이버의 메인 함수에서는 스파크 애플리케이션과 스파크 클러스터 연동을 담당하는 스파크 컨텍스트가 있음. 스파크 컨텍스트를 이용해 잡을 실행하고 종료하는 역할을 함
    • 드라이버란 ? RDD를 생성하고 각종 연산을 호출하는 프로그램
  • 드라이버가 스파크 컨텍스트를 통해 RDD의 연산 정보를 DAG스케쥴러에 전달하면 스케쥴러는 이 정보를 가지고 실행계획을 수립. 그 후, 클러스터 매니저에게 전달함
  • DAG스케쥴러는 데이터에 대한 지역성을 높이는 전략과 관련된 것. 즉, 전체 데이터 처리 흐름을 분석해서 네트워크를 통한 데이터 이동이 최소화되도록 스테이지를 구성하는 것이 스케쥴링의 역할임
    • 데이터의 지역성이란 ? 데이터를 처리하는 장소로 따로 보내 처리하는 것이 아니라, 데이터가 저장되어 있는 서버(블록별로 저장되어 있음)에서 처리하는 것.
  • 데이터 이동이 최소화되도록 한다는 것은 네트워크를 통한 데이터 이동 즉 “셔플링”을 최소화하는 것임
  • 셔플링 관련 예시
  • 방법 1)
    1. 10대 서버에 저장된 모든 로그를 상품에 따라 재분류 한다.
    2. 상품별 총 판매건수를 계산한 후 원하는 상품의 정보만을 출력한다.
  • 방법 2)
    1. 전체 로그를 분류하기 전에 각 서버별로 원하는 상품 정보만 걸러낸 후 상품별 총 판매건수를 계산한다.
    2. 각 서버별로 따로 계산된 상품별 판매건수를 한 곳으로 모아서 더한 후 출력한다.
  • 방법 1의 경우, 10대 서버의 모든 로그 파일을 상품별로 재분류하는 것으로 작업을 시작하나, 동일한 상품번호를 가진 로그파일이 10대의 서버의 무작위로 흩어져 있음. 따라서, 재분류를 위한 “셔플링”이 대량으로 발생함
  • 방법 2의 경우, 셔플을 수행하기 전에 각 서버에 먼저 상품별 부분 집계를 수행한 후 집계된 결과 파일만을 대상으로 네트워크를 통한 2차 합계 연산을 수행하기 때문에, 방법 1에 비해 셔플이 적음
  • $\rightarrow$ 셔플이 최소화되는 방향으로 연산 수행해야 함

람다 아키텍쳐

  • 빅데이터 처리를 위한 시스템을 구성하는 방법 중 하나로, 네이선 마츠가 제안한 아키텍쳐 모델
  • 기존과 같은 대용량 데이터 처리 뿐만 아니라 실시간 로그 분석과 같은 실시간 처리도 중요해짐에 따라 두가지 요구를 충족시키는 아키텍쳐가 필요해졌음

그림 2. 람다 아키텍쳐[1]

  • 람다 아키텍쳐 운영 방법
  1. 새로운 데이터는 일괄 처리 계층(Batch layer)과 속도 계층(speed layer) 모두에게 전달됨
  2. 일괄 처리 계층(Batch layer)은 원본 데이터를 저장하고 일정 주기마다 한번 씩 일괄적으로 가공해서 배치뷰를 생성함. 이때 배치란, 특정시간마다 주기적으로 계산을 수행하는 것을 말함. 뷰라고 한 부분은 외부에 보여지는 데이터, 결과 데이터임
  3. 속도 계층은 들어오는 데이터를 즉시 또는 매우 짧은 주기로 철리해 실시간 뷰(Real-time view)를 생성함
  4. 서빙 계층(serving layer)은 실시간 뷰와 배치 뷰의 결과를 적절히 조합하여 사용자에게 데이터를 전달함. 물론 서빙 계층을 거치지 않고 배치뷰 또는 실시간뷰를 직접 조회(query)할 수 있음
  • 즉, 일괄 처리 작업을 통해 데이터를 처리하지만 아직 배치 처리가 수행되지 않은 부분은 실시간 처리를 통해 보완함

이상으로, 본 포스팅을 마치겠습니다. 다음 포스팅은 <RDD, Resilient Distributed DataSet에 대하여[1]> 에 대해 진행하도록 하겠습니다.


1. 람다 아키텍쳐, https://jhleed.tistory.com/122