-
[Spark] 튜닝, 디버깅, 그리고 개발자가 신경 쓰지 않는 것들Software Development/Big Data 2021. 3. 28. 20:00
스파크 튜닝과 클러스터 사이징
대부분의 스파크 세팅은 애플리케이션 수준에서만 조정이 가능하다.
이러한 세팅들은 작업 속도와 완료 여부에 큰 영향을 끼친다.
스파크의 기본 세팅은 작은 크기의 클러스터에서도 동작할 수 있도록 맞춰진 것이며 실제 업무용으로는 추천하지 않는다.
세팅들은 가용한 자원의 활용성을 높이고 작업을 완료할 수 있는 수준의 최적화를 위해 자주 변경될 것이다.
스파크는 환경 설정을 위해 여러 가지의 제어 항목을 제공하는데, 익스큐터의 메모리 부족 오류가 스파크 작업이 실패하는 일반적인 이유 중의 하나이다. 물론 데이터가 편향적이거나 셔플을 줄이는 등의 기술적인 부분에 집중하는 것이 최선이기는 하나, 메모리가 큰 익스큐터를 더 적게 쓰는 것이 실패를 막아 주기도 한다.
스파크 잡의 설정을 선택하는 것은 데이터 저장 솔루션의 크기와 세팅, 실행되는 잡의 크기(얼마나 많은 데이터를 처리하는가), 잡의 종류 등에 따라 다르다.
예를 들어 많은 데이터를 캐싱하고 반복적인 연산을 계속 수행하는 형태의 잡이라면 큰 규모의 셔플을 몇 번만 수행하면 되는 잡과는 다른 요구 사항을 가지게 된다.
또한 애플리케이션 튜닝은 팀의 상황에 따라서도 달라질 수 있다. 자원을 다른 곳과 공유하고 있는 상황이라면 최소한의 자원을 써서 작업을 완료하는 것 자체를 중요하게 생각할 수도 있다. 그렇지 않다면 가능한 만큼의 자원을 애플리케이션에 할당해서 가능한 최고의 성능을 내는 것이 목표일 것이다.
성능에 큰 영향을 끼치는 세팅을 어떻게 활용하는지에 대한 배경 지식과 조언들에 집중해서 알아본다.
애플리케이션을 제출할 시스템을 이미 갖고 있지만 애플리케이션이 더 빠르고 더 많은 데이터를 처리하도록 어떻게 설정할지를 고민 중인 것으로 가정한다.
스파크 세팅은 어떻게 설정하는가?
SparkContext 객체는(2에서의 SparkSession) 스파크 애플리케이션에 대한 연결을 표현한다. 이 객체는 시스템에서의 스파크 애플리케이션이 어떻게 설정되는지를 정의한 SparkConf 객체를 내부에 갖고 있다.
SparkConf는 모든 설정과 기본값, 스파크 애플리케이션의 동작을 관리하는 환경 정보 등을 가진다. 이 설정값들은 키/값 쌍으로 표현된다.
spark.executor.instances 속성을 5로 설정한다면 다섯 개의 익스큐터(스파크 JVM)로 잡을 제출한다는 의미다.
SparkContext 시작 전에 원하는 파라미터로 SparkConf를 생성할 수도 있다. 애플리케이션의 이름 같은 일부 속성들은 관련된 API를 따로 갖고 있기도 한다. 그렇지 않은 경우라면 SparkConf에 인자로 속성의 키/값 쌍을 받아들이는 .set() 메서드를 바로 호출할 수 있다.
제출할 때마다 스파크 애플리케이션의 설정을 다르게 하고 싶다면 실행 시점에 단순히 빈 SparkConf 객체를 만들어서 넘겨주면 된다.
자신의 클러스터에 대해 적절한 정보를 파악하는 법
스파크 애플리케이션이 다루는 주요한 자원은 CPU(코어 개수)와 메모리다. 스파크의 요청은 실행 중인 환경에서 쓸 수 있는 자원보다 더 많은 자원을 요구할 수 없다. 그러므로 일단 잡이 실행될 환경의 가용한 CPU와 메모리 상태를 파악하는 것이 중요하다.
- 하나의 요청이 얼마나 큰가?
- 대부분의 시스템은 각 요청에 제한을 두며 이는 각 익스큐터와 드라이버가 쓸 수 있는 자원의 양을 제한한다. YARN 클러스터 모드에서는 이는 YARN 컨테이너의 최대 크기가 된다. 각 스파크 익스큐터와 드라이버는 이 제한을 넘지 않고 맞아야 한다. YARN 클라이언트 모드라면 드라이버는 클라이언트에서 프로세스로 돌기 때문에 클러스터는 스파크 익스큐터들이 요구하는 자원들만 관리하면 되는데, 이는 드라이버에는 적용되지 않는다.
- 각 노드는 얼마나 큰가?
- 익스큐터의 숫자와 각 익스큐터에 할당되는 코어 개수를 결정할 때 하나의 익스큐터는 오직 하나의 노드에서만 자원을 쓸 수 있으므로 각 노드에 얼마나 많은 메모리와 CPU 자원을 쓸 수 있는지 아는 것이 중요하다. 각 노드에서 쓸 수 있는 메모리는 하나의 컨테이너가 쓸 수 있는 메모리보다 크거나 같다. 이 질문은 익스큐터의 수를 결정하기 위해서도 중요하다. 예를 들어 20GB짜리의 노드 세 개로 이루어진 클러스터를 갖고 있고 YARN 컨테이너의 최대치를 15GB로 설정한다면 네 번째 익스큐터는 여러 노드로 나누어져야 하므로 네 개의 익스큐터를 실행시킬 수는 없다.
- 클러스터는 얼마나 많은 노드를 갖고 있는가? 몇 개나 쓸 수 있는가?
- 일반적으로는 한 노드에 하나의 익스큐터를 실행하는 것이 최선이다. 얼마나 많은 노드가 가능한지 파악하는 것은 전체 자원량을 체크하는 데에도 도움이 된다.
- 잡이 제출되는 시스템에는 몇 퍼센트의 자원이 사용 가능한가?
- 만약 공유하는 환경의 클러스터라면 클러스터는 얼마나 바쁘게 돌아가는가? 잡은 하나의 큐에 제출되고 그 큐에 대해서 얼마나 많은 자원이 사용 가능한가? 주기적인 잡이라면 YARN API를(혹은 메소스) 호출하여 잡 제출 전에 가능한 자원량을 파악할 필요가 있을 것이다. 한 스파크 애플리케이션이 제출된 큐에 존재하는 자원보다는 많지만 전체 클러스터의 자원보다는 적은 자원량을 요구할 경우 이 잡은 실패하지는 않고 대기 상태에 걸릴 것이다.
- 큐마다 자원을 얼마나 사용할 수 있을지는 시스템이 어떤 종류의 스케줄러를 쓰냐에 따라 다르다.
- capacity 스케줄러 종류라면 각 사용자는 가능한 자원에서 고정된 비율만큼을 할당받게 된다.
- fair 스케줄러의 경우는 동작 중인 애플리케이션들에게 자원을 고루 할당해 준다.
- wikidocs.net/22936
기본적인 스파크 코어 세팅: 스파크 애플리케이션에 얼마나 많은 자원이 할당되는가?
SparkSession/SparkContext는 익스큐터들을 위한 JVM을 시작시킨다.
익스큐터들이 각 익스큐터에 할당된 코어들로 각 태스크를 실행한다. 각 익스큐터들에서 일부 자원은 계산을 위해 사용되지만 일부는 캐싱 데이터 저장을 위해서도 쓰인다. 드라이버의 규모, 익스큐터들의 규모, 각 익스큐터에 연계된 코어 개수들은 설정 객체에 의해 설정 가능하며, 스파크 애플리케이션의 동작 동안 고정적이다.
모든 익스큐터들은 동일한 크기를 가져야만 한다. 동적 할당을 사용한다면 스파크는 스테이지 사이에 일부 익스큐터의 종료를 요청할 수도 있다.
각 익스큐터와 드라이버의 규모, 각 익스큐터의 코어 개수 등은 질의의 데이터 사이즈에 관게없이 계속 고정된 상태이다.
그러므로 동적 할당을 써서 스파크에 연산을 위한 익스큐터를 추가할 수 있다 하더라도 스파크는 특별히 과도한 파티션을 연산하거나 환경이 허락하는 자원 이상의 규모의 익스큐터를 추가할 수는 없다.
각 세팅의 의미를 살펴보자.
스파크 설정 이름 의미 기본값 제한 가이드라인 spark.driver.memory 스파크 드라이버의 크기 1024MB YARN 클러스터 모드에서는 오버헤드를 포함해 YARN 컨테이너의 크기를 넘을 수 없다. 드라이버로 큰 사이즈의 RDD를 가져오거나 로컬 연산을 많이 수행하는 경우는 더 높이 설정할 필요가 있다. spark.executor.memory 각 스파크 워커의 메모리 크기 1024MB 하나의 익스큐터와 오버헤드의 합이 요청 1회의 제한보다 클 수 없다. 특히 균형이 안 맞는 셔플을 수행하는 경우 워커 사이즈가 크면 메모리 부족 오류를 방지할 수 있지만 성능은 떨어진다. spark.executor.cores 각 익스큐터에 할당될 가상 코어의 개수 1 YARN 컨테이너에서 쓸 수 있는 코어 개수보다 커질 수 없다. 다섯 개 전후가 좋다. 자원이 허락한다면 늘릴 수 있다. 모든 익스큐터와 드라이버에서 요구하는 총메모리 크기는 클러스터에서 가능한 메모리양보다 클 수는 없다. YARN 클라이언트 모드라면 드라이버는 클러스터의 자원을 쓰지는 않는다.
익스큐터와 드라이버의 메모리 오버헤드 계산
YARN 클러스터 모드와 클라이언트 모드에서 익스큐터 메모리 오버헤드와 드라이버 메모리 오버헤드는 모두 수동으로 설정할 수 있다.
익스큐터 메모리 오버헤드: spark.yarn.executor.memoryOverhead로 설정
YARN 클러스터에서 드라이버 메모리: spark.yarn.driver.memoryOverhead로 설정
YARN 클라이언트 모드: spark.yarn.am.memoryOverhead의 값을 설정
어느 쪽이든 이런 값들이 설정되지 않았을 때는 다음의 식에 따라 메모리 오버헤드를 얼마나 다룰지 결정한다.
메모리 오버헤드 = Max(MEMORY_OVERHEAD_FACTOR X 요청 메모리, MEMORY_OVER_MINIMUM)
MEMORY_OVERHEAD_FACTOR = 0.10
MEMORY_OVERHEAD_MINIMUM = 384MB
메모리 오버헤드 = Max(MEMORY_OVERHEAD_FACTOR X 요청 메모리, MEMORY_OVER_MINIMUM) MEMORY_OVERHEAD_FACTOR = 0.10 MEMORY_OVERHEAD_MINIMUM = 384MB
스파크 드라이버의 규모
대부분의 연산 작업은 익스큐터에서 이루어지므로 드라이버의 사이즈를 늘리는 것은 연산의 속도를 올리는 것에 별 도움이 되지 않는다.
하지만 드라이버로 많은 데이터를 가져오거나 로컬 연산이 큰 작업인 경우라면 작업이 실패할 가능성이 있다.
spark.driver.maxResultSize의 값을 설정하면 드라이버에서의 메모리 부족 오류를 막을 수 있다.
드라이버에서 메모리 오류를 일으키지 않는 가능한 가장 낮은 값을 설정하고 익스큐터에 가능한 최대의 자원을 할당한다.
YARN과 메소스 클러스터 모드에서는 드라이버가 spark.driver.core의 값에 따라 여러 개의 코어를 가질 수 있으며 멀티스레드 프로세스로 동작한다. 그렇지 않으면 하나의 코어만을 필요로 한다.
소수의 큰 익스큐터 VS 다수의 작은 익스큐터
임의의 클러스터와 애플리케이션에 대해 최적의 설정을 찾는 것 자체는 정확한 과학이 될 수 없다.
대신에 CPU나 메모리 측면에서 너무 크거나 작은 익스큐터의 현상을 어떻게 파악하는지에 대한 몇 가지 팁은 제공할 수 있다.
애플리케이션 설정에 대한 안정적인 추측과 더불어 잘못된 설정의 징후를 파악했을 때 작업을 수정하는 것에 대해 도움을 줄 수 있을 것이다.
- 다수의 작은 익스큐터
- 하나의 파티션이 처리할 만한 자원이 바닥나 버릴 위험이 있다. 하나의 파티션이 여러 익스큐터 위에서 계산될 수는 없기 때문에 각 파티션의 크기는 계산이 이루어지는 공간의 크기에 제한을 받게 마련이다. 셔플을 하거나 균형이 맞지 않은 데이터를 캐시하거나 비싼 비용의 좁은 트랜스포메이션을 수행할 때에 메모리 문제를 만나거나 디스크로 데이터가 넘칠 우려가 있다. 만약 익스큐터가 하나의 코어만 갖고 있다면 각 익스큐터에서는 최대 하나씩의 태스크만 수행이 가능하며, 각 익스큐터로 보내지는 브로드캐스트 변수의 이점은 얻지 못하게 된다.
- 너무 적으면 JVM을 공유하는 장점이 사라진다.
- spark에서 어떤 데이터를 broadcast 했을 때 동일한 JVM(= 동일한 executor)의 task들이 해당 데이터를 공유하는데 익스큐터 코어가 1이라면 익스큐터가 하나의 태스크만 실행하는 것이다. 즉 많은 executor들이 똑같은 데이터를 가지게 된다.
- 너무 적으면 JVM을 공유하는 장점이 사라진다.
- 자원을 효과적으로 쓰는 것에 그다지 도움이 되지 않는다는 것이다. 각 익스큐터는 약간씩의 오버헤드를 가지는데, 이로 인해 심지어 같은 노드 내의 익스큐터끼리의 통신에도 약간의 비용이 필요하다. 메모리 오버헤드에 대한 얘기에서 최소 오버헤드가 400MB 이하라도 얘기했던 것을 기억해 보자. 이는 1GB 익스큐터를 다수 갖고 있는 경우에 각 익스큐터가 연산을 제외하고 오버헤드를 위해서 클러스터의 거의 25% 수준의 공간을 써야 한다는 얘기가 된다. 자원이 허용된다면 익스큐터는 앞쪽의 공식에 사용했던 메모리 오버헤드 상수를 기준으로 최소 4GB 이상 되는 것이 좋다.
- 하나의 파티션이 처리할 만한 자원이 바닥나 버릴 위험이 있다. 하나의 파티션이 여러 익스큐터 위에서 계산될 수는 없기 때문에 각 파티션의 크기는 계산이 이루어지는 공간의 크기에 제한을 받게 마련이다. 셔플을 하거나 균형이 맞지 않은 데이터를 캐시하거나 비싼 비용의 좁은 트랜스포메이션을 수행할 때에 메모리 문제를 만나거나 디스크로 데이터가 넘칠 우려가 있다. 만약 익스큐터가 하나의 코어만 갖고 있다면 각 익스큐터에서는 최대 하나씩의 태스크만 수행이 가능하며, 각 익스큐터로 보내지는 브로드캐스트 변수의 이점은 얻지 못하게 된다.
- 다수의 큰 익스큐터
- 너무 큰 익스큐터는 노드에 익스큐터를 띄우는 것이 배낭 채우기 문제가 되므로 낭비가 될 수 있다.
- 모든 자원을 사용하면서 드라이버가 하나의 노드의 크기보다 작게 하려면 노드당 두 개 이상의 익스큐터를 가져야 한다. 예를 들어 네 개의 큰 노드만 존재하고 실행할 매우 작은 수준의 드라이버 메모리만 요구한다고 가정하자.
- 연산은 매우 큰 익스큐터와 익스큐터의 절반 크기의 드라이버를 띄우게 되면 드라이버 노드의 절반의 메모리는 쓰이지 않으므로 낭비가 된다.
- 너무 큰 익스큐터는 힙 사이즈가 클수록 GC가 시작되는 시점을 지연시켜 결과적으로 풀 GC로 인한 지연이 더욱 길어질 수 있다. 그리고 익스큐터당 많은 수의 코어를 쓰는 것은 동시 스레드가 많아지면서 스레드를 다루는 HDFS의 제한으로 인해 성능이 더 떨어질 수도 있다.
- HDFS I/O
- 샌디 라이자는 익스큐터당 다섯 개의 코어를 최대로 보아야 한다고 제안하다. CPU 자원 끼준으로 익스큐터의 개수를 정하고 각 노드의 CPU 코어를 대략 5로 나눔 - 익스큐터당 메모리를 익스큐터 개수 기준으로 설정하는 식으로 좋은 결과를 얻을 수 있다.
클러스터 자원 할당과 동적 할당
동적 할당은 애플리케이션이 진행되는 중간에 스파크 애플리케이션이 익스큐터를 요청하고 제거할 수 있는 방식.
애플리케이션이 자원이 사용 가능해질 때 자원을 가져다 쓰고 필요하지 않을 때 자원을 되돌려 주기 때문에 번잡한 클러스터에서 성능응 극적으로 끌어 올릴 수 있다.
다음 규칙들은 스파크가 동적 할당으로 익스큐터들을 추가하거나 제거하는 것을 관장한다.
- 첫번째, 멈춰 있는(pending) 태스크들이 있을 때 스파크는 추가 익스큐터를 요청한다.
- 두 번째, 스파크는 spark.dynamicAllocation.executorIdleTime에 설정된 시간 동안 연산하지 않은 익스큐터들은 제거한다(기본값 60초).
- 익스큐터가 한 번 제거되고 나면 캐시되었던 데이터들을 쓰기 위해 재연산이 필요하기 때문에 스파크 캐시 데이터를갖고 있는 익스큐터를 제거하지는 않는다.
- spark.dynamicAllocation.cachedExecutorIdleTimeout 기본값인 infinity 대신 다른 값을 설정하면 바꿀 수 있다. 이 경우 캐시 데이터를 가진 익스큐터들은 설정된 시간만큼 사용되지 않으면 제거될 것이다.
- spark.dynamicAllocation.initialExecutors의 값을 설정하여(기본값 0) 애플리케이션을 시작할 때 스파크가 몇 개의 익스큐터로 시작하게 할지 결정할 수도 있다.
- 클러스터 자원이 충분하고 비싼 비용의 작업을 실행한다면 이 값을 높이는 것을 추천한다. 그렇지 않으면 기본값인 0으로 그대로 두는 것이 애플리케이션이 자원을 점진적으로 늘리게 하므로 더 이득이 될 수 있다.
- 작업이 실행되는 동안 쓰이는 익스큐터의 최소 개수와 최대 개수를 위한 설정값들도 존재한다.
- 최대 개수에 대한 설정은 사용자가 작업을 제출했을 때 클러스터의 자원을 독차지하는 수준이 되지 않도록 주의를 기울여 설정하도록 한다.
동적 할당은 실행 중에 익스큐터의 크기 변경을 허용하지는 않으므로 작업 시작 전에 각 익스큐터의 사이즈를 결정해야 하는 것은 마찬가지다.
익스큐터들의 크기는 클러스터의 전체 자원을 쓰는 것처럼 가정하고 결정하는 것을 추천한다. 이렇게 하면 비싼 작업을 실행하고 스파크가 최대 개수의 익스큐터의 요청했을 때 자원들이 제대로 할당되고 남아도는 자원이 생기지 않는다.
한 가지 예외라면 트래픽이 매우 많은 클러스터의 경우다. 이 경우는 여유 자원이 단편적으로 생기므로 작은 익스큐터를 쓰는 것이 동적 할당 시 자원을 빠르게 가져오는 방법이다.
- 동적 할당의 제한
- 동적할당은 설정은 쉽지 않다. 동적 할당이 동작하게 하려면 아래의 것들이 필요하다.
- spark.dynamicAllocation.enabled를 true로 설정한다.
- 각 작업 노드마다 외부 셔플 서비스를 설정해 준다. 이는 클러스터 매니저마다 다르다.
- spark.shuffle.service.enabled 설정을 true로 한다.
- spark.executor.instances 설정에는 값을 주지 않는다. 심지어 동적 할당이 설정되어 있다 하더라도 스파크는 무시하고 이 값이 설정에 존재한다면 숫자만큼의 익스큐터만 사용할 것이다.
- spark.dynamicAllocation.enabled를 true로 설정한다.
- 동적할당은 설정은 쉽지 않다. 동적 할당이 동작하게 하려면 아래의 것들이 필요하다.
동적 할당을 쓰는 경우 셔플 서비스가 잘못 설정되어 있다면, 노드들은 익스큐터를 요청하는 메커니즘을 갖고 있지 않기 때문에 작업은 pending 상태에서 멈춰 있게 될 것이다. 클러스터에 자원이 충분한데 이런 현상이 보인다면 각 작업 노드에 셔플 서비스가 설정되어 있고 YARN 설정이 올바른 셔플 서비스에 대한 클래스 패스를 갖고 있는지 확인해 보아야 한다.
익스큐터의 공간 할당
익스큐터는 캐싱과 실행을 위한 공간을 각각 갖고 있는 JVM이다.
- 영역들이 고정적으로 나누어진 게 아니기 때문에 익스큐터의 메모리 사용 분할은 다이어그램 보다 더 복잡하다.
- spark.executor.memory로 설정하는 JVM 크기는 오버헤드를 포함하지 않는다. -> 스파크 익스큐터는 클러스터에 설정된 수치보다 더 많은 공간을 요구한다.
- 익스큐터 메모리 내에서는 일부 공간은 스파크의 내부 메타데이터와 사용자 자료 구조를 위해 미리 예약되어야 한다.(약 25%)
- 익스큐터의 남은 공간은 스파크 문서에서 M이라 부르고 실행과 저장을 위해 쓰이는데, 스파크 트랜스포메이션을 연산하기 위해 필요하다.
- 익스큐터에서 실행과 캐싱을 위한 공간은 고정된 비율값에 의해 결정되며 spark.memory.fraction 설정으로 공개되어 있다.
- 트랜스포메이션 실행 중의 메모리 부족 오류나 캐싱된 파티션이 공간이 모자라 디스크로 씌어지는 것은 보통 이 공간의 제한 때문에 발생한다.
- M의 기본값은 0.6이므로 익스큐터의 60%의 공간이 저장과 실행을 위해 쓰이는 것이다.
- M의 크기를 늘려서 내부 메타데이터 사용 공간을 줄이는 것도 가능하나, 내부 프로세스에 의한 메모리 부족 오류를 막아 주는 역할을 하기 때문에 위험해질 수 있다.
- M(spark.memory.fraction)의 공간 안에서 일부 공간은 '저장'용으로 쓰고 나머지는 저장과 연산 모두에 쓸 수 있다.
- 이 경우 저장 공간은 직렬화 여부에 상관없이 스파크 파티션들의 내부 메모리 저장소를 의미한다.
- 스파크는 저장 영역을 고정적으로 제공하지 않는 대신 메모리에 캐싱을 하지 않는 애플리케이션이라면 모든 메모리 영역을 실행용으로 쓸 수 있도록 한다.
- 실행 공간 내에서 스파크는 R이라고 불리는 캐시 데이터 저장을 위한 영역을 정의한다.
- R의 크기는 spark.memory.storageFraction에 설정되는 M의 퍼센티지값으로 결정된다.
- R은 캐시된 데이터가 존재하는 경우 스파크가 실행을 위해 회수하지 않는 공간이다.
- 어떤 파티션도 메모리에서 삭제되지 않도록 RDD를 캐싱하려면 모든 캐시 데이터는 R 안에 들어갈 수 있어야 한다.
- R은 다음 공식에 의해 결정된다.
- R = spark.executor.memory x spark.memory.fraction x spark.memorystrageFraction
다음 그림은 M과 R의 관계를 보여준다.
- 큰 사각형은 하나의 익스큐터를 의미
- 빗금 친 사각형이 R이며 오버헤드 아래의 전체 공간이 M이다.
- 그림에서 서로 다른 두 RDD가 캐싱되어 있다고 가정한다.
- 파티션들은 이 익스큐터에 캐시되어 있는데, 점무늬 사각형들은 캐싱 순서에 상관없이 가장 최근에 사용된 파티션들이다.
- 파티션들이 R 영역 안을 모두 쓰고 있지는 않으므로 하나의 큰 연산이 저장 영역을 쓸 수 있다.
동일한 애플리케이션에 다른 파티션을 캐싱한 잡이 포함되어 있다고 가정하자.
- 캐시된 파티션들이 R보다 큰 공간을 차지.
- 연산이 이 수준까지 공간을 사용하지 않는다면 이는 충분히 허용되는 상황
- 이 작업에서 점무늬 파티션을 사용하고 빗금 무늬 파티션이 가장 오래된 파티션
이제 이 익스큐터에서 큰 연산을 수행한다고 가정하면 결과는 다음 그림과 같다.
- 캐시된 데이터는 R영역에 존재할 수 있는 수준을 초과하므로 연산을 위한 공간 확보를 위해 부수적인 파티션들은 모두 축출된다.
- 스파크는 LRU 캐싱 정책을 따르므로 빗금친 파티션이 삭제된다.
메모리와 저장 영역 설정을 조정하는 것은 전적으로 수행하려고 하는 작업의 종류와 관계가 있다.
데이터를 캐싱할 생각이 없다면 어쨋든 모든 M 영역이 연산을 위해 쓰일 수 있으므로 큰 상관이 없다.
하지만 연산이 RDD에 반복적인 접근을 필요로 하고 RDD를 메모리에 캐싱하는 것으로 성능 향상이 기대된다면, 캐시된 RDD가 제거되는 것을 막기 위해 저장 영역 크기를 늘리는 것도 도움이 된다.
파티션 개수와 크기
RDD가 안정적인 저장소에서 데이터를 읽어서 생성될 때는 파티션 개수는 입력 포맷에 결정된 스플릿과 연관이 있다.
파티션 개수는 coalesce나 repartition을 쓰거나 reduceByKey나 sort 같은 넓은 트랜스포메이션이 실행되는 동안 바꿀 수 있다.
넓은 트랜스포메이션이 실행될 때 파티션 개수가 따로 지정되어 있지 않다면 spark.default.parallelism 설정값을 사용한다.
- spark.default.parallelism의 기본값은 애플리케이션이 실행되는 환경에 의존한다.
- YARN 클러스터 모드에서 이는 익스큐터 개수 x 코어 개수다(즉, 동시에 실행할 수 있는 태스크 개수).
이는 파티션 개수로 사용해야 하는 최솟값이지만 최적의 값은 아니다.
최솟값이라면 얼마나 많은 파티션을 써야 하는 것일까?
파티션 개수를 늘리는 것은 일정 수준 오버 헤드가 너무 많아지는 수준이 되기 전까지는 성능을 높여 준다. 총 코어 개수보다 적은 파티션을 쓰면 일부 CPU가 놀게 되므로 최소한 총코어 개수 이상의 파티션을 사용해야 한다.
파티션 개수를 늘리는 것은 각 익스큐터에서 스파크가 한 번에 처리하는 데이터양이 작아지므로 메모리 부족 오류를 줄이는 데도 도움이 된다.
파티션의 최대 크기를 결정하는 것도 하나의 전략이 될 수 있다. 가장 큰 파티션이 있고 이 크기가 하나의 태스크를 위한 공간에 들어맞는 크기라면 각 파티션이 이 크기를 초과하지 않도록 역으로 파티션의 개수를 설정할 수 있다.
익스큐터는 각 코어당 하나의 태스크를 실행할 수 있으며 한 파티션이 하나의 태스크와 연계된다.
- 익스큐터가 연산에 쓸 수 있는 공간은 캐시된 데이터의 양에 따라 M에서 M-R사이다.
- 연산에 쓸 수 있는 메모리(M) < (spark.executor.memory - 오버헤드) * spark.memory.fraction
- 만약 캐시된 데이터가 있다면
- 연산에 쓸 수 있는 메모리(M-R) < (spark.executor.memory - 오버헤드) * spark.memory.fraction * (1 - spark.memory.storage.fraction)
- 태스크들 간에 공간이 동일하게 나눠진다고 가정하면 각 태스크는 다음을 초과할 수 없다.
- 태스크당 메모리 연산에 쓸 수 있는 메모리 /spark.executor.cores
- 파티션 개수는 다음 공식에 의해 결정
- 파티션 개수 = 셔플 단계에서의 크기 / 태스크당 메모리
만약 파티션이 이 크기보다 크면 태스크 하나는 코어 하나만 쓸 수 있고, 동시에 실행할 수 있는 만큼의 태스크를 연산하지 못하게 되므로 CPU 자원이 놀게 된다. 이로 인해 메모리 부족 오류를 일으킬 가능성도 높아질 수 있다.
셔플의 메모리 비용을 계산하기 위해 셔플 단계를 살펴봐서 디스크 연산이 쓰지 않는 것을 확인했다면 각 파티션은 메모리에 잘 들어맞는 크기라고 볼 수 있다.
만약 작업이 디스크에 씌어진다면 이는 셔플 사이즈를 가늠해 보고 파티션의 개수를 조정해 볼 가치가 있다. '셔플 사이즈'란 것이 확실하게 가늠할 수 있는 정보가 아니다.
샌디 라이자 초과 셔플 메모리 수치와 초과 셔플 디스크 수치의 비율을 구하고 여기에 디스크의 데이터 사이즈를 곱하여 셔플 파일의 대략적인 수치를 구할 것을 제안.
- 초과 셔플 메모리: 디스크에 씌어지기 전에 메모리에서 차지하고 있는 레코드의 공간 크기를 말한다.
- 초과 셔플 디스크: 디스크에 넘쳐 쓰인 이후에 레코드들이 차지하는 공간의 크기다.
- 두 값의 비율은 디스크에서보다 메모리에서 얼마나 더 많은 공간을 쓰는지 파악하는 척도가 된다.
- 이 비율을 '메모리 확장 비율'이라 한다.
- 메모리 확장 비율 = 초과 셔플 메모리 / 초과 셔플 디스크
- 이 비율을 '메모리 확장 비율'이라 한다.
웹 UI의 stage 탭 초기 화면에서 셔플로 씌어지는 총크기를(Shuffle write) 볼 수 있다. 이는 디스크에 씌어진 셔플 파일의 크기이다.
메모리 파일 사이즈는 다음 공식으로 알 수 있다.
메모리 내의 셔플 크기 = Shuffle Write * 초과 셔플 메모리 / 초과 셔플 디스크
모든 레코드가 메모리로 읽어 들일 때 동일한 비율로 확장된다는, 틀린 가정에 기초한다.
단순히 성능 향상이 더 이상 이루어지지 않을 때까지 파티션 개수를 늘려 보는 것 이상의 대안은 아직 없다.
직렬화 옵션
RDD에는 자바 직렬화를 쓴다.
DataFrame/Dataset에는 텅스텐(Tungsten) 기반 직렬화를 쓴다.
DataFrame이나 Dataset를 쓰는 것이 훨씬 더 효율적인 직렬화 계층을 사용해 볼 수 있는 방법이지만, RDD로 작업한다면 크리오 직렬화를 쓰는 것을 고려해 볼 수 있다.
크리오
크리오 직렬화도 자바 직렬화가 가능한 모든 타입을 바로 지원하고 있지는 않다.
개선중이다.
몇몇 추가적인 디버깅 테크닉
분산 시스템이기 때문에 문제가 발생한 노드를 찾기가 어려우며 디버깅을 위해 작업 노드에 접근하는 것 자체도 쉽지 않다.
지연 평가 기술은 기존 시스템에 식숙한 개발자로 하여금 로그나 스택 트레이스만 훑어보게 함으로써 잘못된 방향으로 이끌 수도 있다.
스파크 셸 처럼 상호작용이 가능한 환경에서 부모 RDD나 DataFrame에 take(1)이나 count를 호출해서 어디가 문제인지 빨리 범위를 좁혀 나가야 한다.
그러나 서비스 중인 코드를 디버깅하는 것이라면 부분적으로 연산 시간이 크게 늘어날 여지가 있으므로 take(1)이나 count를 써 보는 것은 무리가 있다.
에러 스택 트레이스는 많은 정보를 담고 있지만 디버깅하기에는 대부분 도움이 되지 않는다.
에러가 작업 노드의 내부에서 오게 되므로 드라이버 스택 트레이스의 정보들은 무시해도 된다.
익스큐터에서 발생한 에러를 보면 실패한 함수와 관련된 라인 번호를 찾을 수 있다. 데이터와 반복자의 연계는 스파크 내부에서 이루어지므로 mapPartitions로 작업하거나 직접 만든 반복자를 되돌려 주는 게 아니라면 예외의 나머지 부분은 신경 쓰지 않아도 된다.
어떤 문장에서 에러가 발생했는지 판단하기 위해서는 특히 익명 내부 함수의 경우라면 편법을 써야 한다.
모든 예외가 org.apache.spark.SparkException으로 감싸지진 않는다.
존재하지 않는 하둡 입력으로 만들어진 RDD를 연산하려고 시도하면 해당 예외를 직접 되돌려 주는 좀 더 단순한 스택 트레이스를 볼 수 있다.
드라이버의 메모리 부족 오류
디버깅하는 것은 쉽지 않다.
collect문을 신경쓰자. -> 드라이버 프로그램에 방대한 결과를 줄 수 있는 countByKey
ML쪽에는 다른 모델로 교체하자.
드라이버 프로그램이 쓸 메모리를 할당하는 절차는 스파크 애플리케이션을 어떤 방식으로 제출하느냐에 따라 달라진다.
- 디스크 부족 오류
- 디스크 공간이 크지 않은 클러스터에서 발생할 수 있다.
- 디스크 부족 오류는 RDD가 최상위 레벨에서 만들어지거나 가비지 컬렉션이 일어나지 않는, 장시간 동작하는 셸 환경에 의해 발생한다.
- 해결책은 명시적으로 가비지 컬렉션을 실행해 주는 것이다.
- 체크포인팅을 시도하는 것도 RDD가 가비지 컬렉션 되는 데에 도움이 될 수 있다.
- 로깅
- log4j를 쓴다면 이미 셋업이 되어 있다는 걸 보장할 수 있으므로 좋은 선택이라 할 수 있다.
- 타입세이프의 scalalogging 패키지를 쓰면 스파크 내부 로깅과 유사한 기능에 접근할 수 있다.
- LazyLogging 제공.
- 디버깅 로깅의 간단한 예로는 필터링되는 데이터들이 어떤 것들이 잇는지 살펴보는 것
- LazyLogging 제공.
- 로깅 설정
- 스파크는 JVM 로깅에 log4j를 사용한다.
- conf/log4j.properties.template 파일로 로그 레벨과 출력을 바꿀 수 있다.
- 너무 출력이 많으면 ERROR로 기본 로깅을 ERROR로 바꿀 수 있다.
- log4j.properties가 요구 사항에 맞지 않는다면 자체적으로 log4j.xml을 작성해서 이를 익스큐터에 제공하고 spark.executor.extraJavaOptions에 -Dlog4j.configuration=log4j.xml을 추가해서 익스큐터가 파일을 가져갈 수 있도록 한다.
- 로그 열람
- 실행중이라면 웹 UI
- 종료되었다면 배포 메커니즘에 따라 다르다.
- 얀 배포 환경에서 로그 취합을 원하면 yarn logs 명령으로 로그를 가져올 수 있다.
- 디버거 연결
- 로그 파일, 어커뮬레이터, 각종 지표들이 애플리케이션 디버깅을 도와주지만 IDE 인터페이스가 도움이 될 때가 있다.
- 단순하게 스파크를 로컬 모드로 띄우고 로컬 JVM에 디버거를 연결해 주는것이다.
- 하지만 실제 클러스터에서의 모든 문제가 로컬 모드에서 재현될 수 없다.
- 원격 클러스터 디버깅을 위해 JWDP를 설정할 수 있다.
- 파이썬에서의 원격 디버깅은 디버깅 라이브러리 시작을 위해 코드 수정이 필요하다.
- 노트북에서 디버깅 하기
- 노트북 내에서 스파크를 시작하는 것은 설정 내을을 다루는 방식에 예상치 못한 영향을 주게 될 수도 있다.
- 가장 큰 차이점은 드라이버를 위한 설정을 다루는 방식이다.
- 파이썬 디버깅
- 오류의 원인을 찾기가 더 복잡한 이유는 파이썬과 JVM 사이의 데이터 교환 횟수를 줄이기 위해 파이스파크의 RDD들의 평가를 함께 엮어서 수행하기 때문이다.
- 표준 logging 라이브러리를 쓰자.
- 파이스파크에서 RDD 데이터 쏠림을 디버깅하는 것은 종종 간과하는 특징인 '배치 직렬화'때운에 더욱 어려운 일이 된다.
- 얀에서 파이썬을 쓰는 것은 메모리 오버헤드 오류를 일으킬 수 있다.
- 파이썬을 실행할 떄는 전체 파이썬 프로세스가 오버헤드 공간에 들어갈 수준이어야 한다.
- 이런 경우들은 에러 메세지가 몇몇 다른 경우들과 같게 나오므로 디버깅이 어렵다.
- 메모리 오류가 발생하는 첫 번째 원인은 파티션들의 불균형 또는 너무 크기 때문이다.
- 파티션들이 너무 크다면 파이썬 작업 노드들이 데이터를 불러오기에 충분한 공간이 없을 수도 있다.
- 웹 UI를 통해 파티션 크기를 확인 균등하지 않다면?
- 해결책 -> 파티셔닝을 새로하자.
- 두 번째 원인이 될 수 있는 것은 파이썬에 할당된 오버헤드가 충분하지 않은 것.
- 파이썬 작업 노드는 JVM이 나머지 공간을 쓴 후에 컨테이너에 남은 '오버헤드'공간만 사용할 수 있다.
- 기본 설정값이 384MB 혹은 전체 컨테이너의 10%인데 부족할 수 있다.
'Software Development > Big Data' 카테고리의 다른 글
[빅데이터 전문가의 하둡관리] 1. 하둡 소개 및 하둡의 주변 환경 (0) 2022.08.20 [스파크 완벽 가이드] 1. 아파치 스파크란 (0) 2022.05.08 [Spark] 효율적인 트랜스포메이션 (0) 2021.02.14 Hadoop WordCount 소스 코드 레벨에서 살펴보기 (0) 2020.07.01 hadoop wordcount 예제 eclilpse maven build 시 발생하는 오류 (0) 2020.07.01 - 하나의 요청이 얼마나 큰가?