Software Development/Big Data
[Spark] 효율적인 트랜스포메이션
루ㅌ
2021. 2. 14. 19:52
효율적인 트랜스포메이션
- 스파크 성능의 상당 부분은 데이터 변형 능력, RDD를 반환하는 연산에서 나온다.
좁은 트랜스포메이션 vs. 넓은 트랜스포메이션
- 트랜스포메이션이 넓은지 혹은 좁은지 어떻게 구분하는지, 이 구분 기준이 왜 평가와 성능에 영향을 끼치는지 알아본다.
- 넓은 트랜스포메이션 -> 셔플 요구, 좁은 트랜스포메이션 -> 셔플 요구 X
- 좁은 종속성의 트랜스포메이션: 부모 RDD의 각 파티션이 자식 RDD의 최대 하나의 파티션에 의해 사용되는 것[1]
- 자식 RDD의 종속성에 따라 좁고 넓은 종속성을 정의(2장) vs 부모 RDD의 종속성 위주로 좁고 넓은 종속성을 정의[1]
- 스파크 평가 엔진(DAG): 출력(액션) -> 입력 RDD로 실행 계획을 역으로 구축
- 좁은 종속성: 부모파티션은 오직 하나, coalesce(합체)의 경우 파티션을 줄일 때
- RDD를 평가할 때 연산을 완료하는 태스크의 개수가 왜 입력 파티션이 아니라 출력 파티션과 관계가 있는지 설명
- 스파크 평가 엔진(DAG): 출력(액션) -> 입력 RDD로 실행 계획을 역으로 구축
// 좁은 종속성. rdd에 map 연산으로 (x, 1)의 튜플로 만든다
val rdd2 = rdd1.map(x => (x, 1))
// 넓은 종속성, groupByKey
val rdd3 = rdd2.groupByKey()
- map 연산: 파티션끼리 데이터가 이동할 필요가 없음. 자식 파티션은 하나의 부모 파티션에만 의존
- groupByKey 연산: 반복자에 하나의 키 연계된 값들만 연결 필요 -> 동일한 키 레코드들이 동일한 파티션으로 모여야함 -> 자식 파티션은 다수의 부모RDD의 다수의 파티션으로 만들어짐
성능에 대한 고려 사항
- 좁은 종속성은 파티션간 데이터의 이동을 요구 X -> 드라이버와 통신이 필요 없음 -> 드라이버가 보낸 명령을 파티션에서 바로 실행
- 좁은 트랜스포메이션들의 각 모음이 하나의 스테이지 안에서 연산 가능
- 넓은 종속성의 트랜스포메이션들은 파티션간의 데이터 이동이 필요 -> 이후 연산은 셔플이 완료전까지 시작되지 않음
- 정렬은 모든 레코드들이 정의된 순서에 따라 정렬이 돼야함 -> 좁은 트랜스포메이션으로 해결할 수 없음
- 셔플은 데이터 이동과 디스크 입출력을 요구하기 때문에 비싸고 병렬 수행도 제한된다.
장애 내구성의 고려 사항
- 넓은 종속성에서 한 파티션의 실패 -> 재연산 필요한 파티션이 훨씬 많음 -> 많은 비용을 요구
- 넓은 트랜스포메이션들을 연속으로 놓고 실행하는 것은(만약 메모리 오류를 일으킬 확률이 높다면) 높은 부하의 재연산을 불러올 수도 있다. -> 재연산 비용이 너무 높은 경우 RDD 체크포인팅 고려
coalesce의 특별한 경우
- 파티션 개수를 바꾸고자 할 때 사용
- 파티션 개수를 줄이면 -> 좁은 트랜스포메이션
- 파티션 개수를 늘리면 -> 부모 파티션은 다수의 자식 파티션에 의존
내 트랜스포메이션은 어떤 타입의 RDD를 반환하는가?
- RDD는 거의 모든 타입의 레코드를 가질 수 있다.
- RDD 구현체에 동일한 트랜스포메이션이 호출되어도 다르게 평가 될 수 있다.
- 스파크 프로그램이 복잡해지면서 난이도도 크게 증가할 수 있다.
- 레코드 타입 정보는 스파크 API의 암죽적인 변환으로 인해 어려움이 생길 수 있다.
- 제네릭 타입을 쓰지 않는 것이 포인트
- Dataframe을 RDD로 작업할 때 정보를 타입 정보를 잃어버릴 수 있다.
- 정확한 타입으로 캐스팅해야 한다. Dataframe의 스키마를 변수에 바인딩
- Dataset API 강타입이라 RDD 변환한 뒤에도 타입 정보가 유지
객체 생성 최소화하기
- GC 직렬화에 걸리는 시간을 늘리면서 심각하게 느려질 수 있다.
- 객체의 크기나 숫자를 줄여 GC에 대한 비용 최소화
- 기존 객체 재활용 및 기본 타입 사용
기존 객체 재활용하기
- GC 직렬화에 걸리는 시간을 늘리면서 심각하게 느려질 수 있다.
- 객체의 크기나 숫자를 줄여 GC에 대한 비용 최소화
class MetricsCalculator(
val totalwords : Int,
val longestWord: Int,
val happyMentions: Int,
val numberReportCards: Int) extends Serializable {
def sequenceOp(reportCardContent: String) : MetricsCalculator = {
val words = reportCardContent.split(" ")
val tW = words.length
val lW = words.map(w => w.length).max
val hM = words.count(w => w.toLowerCase.equals("happy"))
new MetricsCalculator(
tW + totalWords
Math.max(longestWord, lW)
hM + happyMentions
numberReportCards + 1)
}
def compOp(other: MetricsCalculator) : MetricsCalculator = {
new MetricsCalculator(
this.totalWords + other.totalWords,
Math.max(this.longestWord, other.longestWord),
this.happyMentions + other.happyMentions,
this.numberReportCards + other.numberReportCards)
)
}
def toReportCardMetrics =
ReportCardMetrics(
longestWord,
happyMentions,
totalwords.toDouble/numberReportCards)
}
- 객체를 재사용하지 않는 클래스
- 두번의 map과 한 번의 reduceByKey 메서드를 사용하는 것보다 좋다.
- 그러나 combine 단계마다 새로운 인스턴스 생성
class MetricsCalculator(
val totalwords : Int,
val longestWord: Int,
val happyMentions: Int,
val numberReportCards: Int) extends Serializable {
def sequenceOp(reportCardContent: String) : MetricsCalculator = {
val words = reportCardContent.split(" ")
totalwords += words.length
longestWord = Math.max(longestWord, word.map(w => w.length).max)
happyMentions += words.count(w => w.toLowerCase.equals("happy"))
numberReportCards += 1
this
}
def compOp(other: MetricsCalculator) : MetricsCalculator = {
totalWords += other.totalWords,
longestWord = Math.max(this.longestWord, other.longestWord),
happyMentions += other.happyMentions,
numberReportCards += other.numberReportCards
this
}
def toReportCardMetrics =
ReportCardMetrics(
longestWord,
happyMentions,
totalwords.toDouble/numberReportCards)
}
- 객체를 재사용하는 클래스
더 작은 자료 구조 사용하기
- 스파크 시간과 공간 양쪽으로 최적화하는 중요한 수단 -> 사용자 정의 클래스가 아니라 기본 타입 사용하는 것
- 가독성은 떨어질 수 있지만, GC 오버헤드를 줄이는 데 효과적이다.
- 스칼라 배열은 컬렉션 타입 중 메모리 효율이 가장 뛰어나다.
object MetricsCalculator_Arrays extends Serializable {
val totalwordIndex = 0
val longestWordIndex = 1
val happyMentionsIndex = 2
val numberReportCardIndex = 3
def sequenceOp(reportCardMetrics : Array[Int],
reportCardContent : String) : Array[Int] = {
val words = reportCardContent.split(" ")
reportCardMetrics(totalwordIndex) += word.length
reportCardMetrics(longestWordIndex) = Math.max(
reportCardMetrics(longestWordIndex),
word.map(w => w.length).max)
reportCardMetrics(happyMentionsIndex) += word.count(
w => w.toLowerCase.equals("happy"))
reportCardMetrics(numberReportCardIndex) += 1
reportCardMetrics
}
def combOp(x : Array[Int], y : Array[Int]) : Array[Int]) = {
x(totalwordIndex) += y(totalwordIndex)
x(longestWordIndex) += Math.max(x(longestWordIndex), y(longestWordIndex))
x(happyMentionsIndex) += y(happyMentionsIndex)
x(numberReportCardIndex) += y(numberReportCardIndex)
x
}
def toReportCardMetrics(ar : Array[Int]) : ReportCardMetrics =
ReportCardMetrics(
ar(longestWordIndex),
ar(happyMentionsIndex),
ar(totalwordIndex)/ar(numberReportCardIndex),
}
- 시퀀스와 콤바인 연산을 문자열과 배열에 대한 함수로 정의하여 동일한 가독성의 코드 사상을 유지할 수 있다.
- 함수 내에서 중간 객체 생성을 피하는 것도 이득을 얻을 수 있다.
- 타입 간의 변환이 중간 객체를 만들게 된다. 암족적인 변환이 운 나쁘게 성능에 영향을 끼치는 또 다른 부분이다.
mapPartitions로 수행하는 반복자-반복자 트랜스포메이션
- mapPartitions 함수는 레코드들의 반복자를 받아 또 다른 반복자로 출력하는 함수를 인자로 받는다.
- 한 파티션의 데이터를 대상으로 임의의 코드를 정의할 수 있게 해주기에 스파크의 강력한 함수 중 하나다.
반복자-반복자 트랜스포메이션이란?
- 스칼라 반복자 객체:
- 컬렉션은 아니다. 컬렉션 내의 원소들에 하나씩 접근 할 수 있다.
- 변경 불가능한 객체
- 동일한 원소에 두 번 이상 접근할 수 없음.
- 스파크 트랜스포메이션과 달리 반복자의 트랜스포메이션은 병렬적으로 실행되지 안혹 한 번에 한 개씩 순차적으로 실행
- 컬렉션 객체로 변환하는 것은 반복자-반복자 트랜스포메이션의 이점을 없앤다.
시간적/공간적인 이득
- 스파크가 선택적으로 데이터를 디스크에 쓸 수 있다.
- 스파크가 하나의 익스큐터에서 메모리에 담기에는 너무 큰 파티션들을 메모리 에러 없이 처리해 준다.
- 중간 데이터용 자료 구조를 정의하지 않아도 된다.
- 큰 크기의 중간 자료 구조를 쓰는 횟수를 줄이는 것은 필요 없는 객체 생성을 피하는 방법이다.
- GC 속도가 늦어지는 위험도 피할 수 있다.
- 큰 크기의 중간 자료 구조를 쓰는 횟수를 줄이는 것은 필요 없는 객체 생성을 피하는 방법이다.
예제
private def findTargetRanksIteratively(
sortedAggregatedValueColumnPairs : RDD[((Double, Int), Long)],
ranksLocations : Array[(Int, List[(Int, Long)])]): RDD[(Int, Double)] ={
sortedAggregatedValueColumnPairs.mapPartitionsWithIndex((partitionIndex: Int,
aggregatedValueColumnPairs : Iterator[((Double, Int), Long)]) =>{
val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2
if (targetsInThisPart.nonEmpty) {
FindTargetsSubRoutine.asIteratorToIteratorTransformation(
aggregatedValueColumnPairs,
targetsInThisPart)
} else {
Iterator.empty
}
})
}
- mapPartitions 예제
def withArrayBuffer(valueColumnPairsIter : Iterator[((Double, Int), Long)],
targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {
val columnsRelativeIndex: Predef.Map[Int, List[Long]] =
targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
val columnsInThisPart: List[Int] = targetsInThisPart.map(_._1).distinct
val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap
//결과 반복자를 만들기 위해 배열 버퍼 사용
val result: ArrayBuffer[(Int, Double)] =
new scala.collection.mutable.ArrayBuffer()
valueColumnPairsIter.foreach {
case ((value, colIndex), count) =>
if (columnsInThisPart contains colIndex) {
val total = runningTotals(colIndex)
//the ranks that are contains by this element of the input iterator.
//get by filtering the
val ranksPresent = columnsRelativeIndex(colIndex)
.filter(index => (index <= count + total) && (index > total))
ranksPresent.foreach(r => result += ((colIndex, value)))
//update the running totals.
runningTotals.update(colIndex, total + count)
}
}
// 변환
result.toIterator
}
- 반복자-반복자 트랜스포메이션이 없는 mappartitions 예제
- 배열 버퍼는 스칼라 컬렉션을 만드는 데에 상대적으로 우수한 성능을 보인다.
- 입력 데이터가 클러스터의 처리량에 비해 상태적으로 크다면 메모리 부족 에러나 실패함
- 병렬적이진 않지만 서브루틴을 반복자-반복자 트랜스포메이셔으로 변환할 수 있다.
def asIteratorToIteratorTransformation(
valueColumnPairsIter : Iterator[((Double, Int), Long)],
targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {
val columnsRelativeIndex = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
val columnsInThisPart = targetsInThisPart.map(_._1).distinct
val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap
val pairsWithRanksInThisPart = valueColumnPairsIter.filter{
case (((value, colIndex), count)) =>
columnsInThisPart contains colIndex
}
pairsWithRanksInThisPart.flatMap{
case (((value, colIndex), count)) =>
val total = runningTotals(colIndex)
val ranksPresent: List[Long] = columnsRelativeIndex(colIndex)
.filter(index => (index <= count + total)
&& (index > total))
val nextElems: Iterator[(Int, Double)] =
ranksPresent.map(r => (colIndex, value)).toIterator
runningTotals.update(colIndex, total + count)
nextElems
}
}
- 한 번에 하나씩 반복자의 각 아이템을 처리 -> 함수에서 선택적으로 디스크에 쓰기 가능
- 배열 버퍼가 없기 때문에 가비지 컬렉션에서도 이득
집합 연산
- RDD는 레코드들이 구별이 안되기 때문에 중복 데이터를 다루는 방식에서 수학적인 연산과 차이가 있다.