ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] 효율적인 트랜스포메이션
    Software Development/Big Data 2021. 2. 14. 19:52

    효율적인 트랜스포메이션

    • 스파크 성능의 상당 부분은 데이터 변형 능력, RDD를 반환하는 연산에서 나온다.

    좁은 트랜스포메이션 vs. 넓은 트랜스포메이션

    • 트랜스포메이션이 넓은지 혹은 좁은지 어떻게 구분하는지, 이 구분 기준이 왜 평가와 성능에 영향을 끼치는지 알아본다.
    • 넓은 트랜스포메이션 -> 셔플 요구, 좁은 트랜스포메이션 -> 셔플 요구 X
    • 좁은 종속성의 트랜스포메이션: 부모 RDD의 각 파티션이 자식 RDD의 최대 하나의 파티션에 의해 사용되는 것[1]
    • 자식 RDD의 종속성에 따라 좁고 넓은 종속성을 정의(2장) vs 부모 RDD의 종속성 위주로 좁고 넓은 종속성을 정의[1]
      • 스파크 평가 엔진(DAG): 출력(액션) -> 입력 RDD로 실행 계획을 역으로 구축
        • 좁은 종속성: 부모파티션은 오직 하나, coalesce(합체)의 경우 파티션을 줄일 때
        • RDD를 평가할 때 연산을 완료하는 태스크의 개수가 왜 입력 파티션이 아니라 출력 파티션과 관계가 있는지 설명
      •  

    RDD 종속성

    // 좁은 종속성. rdd에 map 연산으로 (x, 1)의 튜플로 만든다
    val rdd2 = rdd1.map(x => (x, 1))
    // 넓은 종속성, groupByKey
    val rdd3 = rdd2.groupByKey()

     

    • map 연산: 파티션끼리 데이터가 이동할 필요가 없음. 자식 파티션은 하나의 부모 파티션에만 의존
    • groupByKey 연산: 반복자에 하나의 키 연계된 값들만 연결 필요 -> 동일한 키 레코드들이 동일한 파티션으로 모여야함 -> 자식 파티션은 다수의 부모RDD의 다수의 파티션으로 만들어짐

    성능에 대한 고려 사항

    • 좁은 종속성은 파티션간 데이터의 이동을 요구 X -> 드라이버와 통신이 필요 없음 -> 드라이버가 보낸 명령을 파티션에서 바로 실행
    • 좁은 트랜스포메이션들의 각 모음이 하나의 스테이지 안에서 연산 가능
    • 넓은 종속성의 트랜스포메이션들은 파티션간의 데이터 이동이 필요 -> 이후 연산은 셔플이 완료전까지 시작되지 않음
      • 정렬은 모든 레코드들이 정의된 순서에 따라 정렬이 돼야함 -> 좁은 트랜스포메이션으로 해결할 수 없음
    • 셔플은 데이터 이동과 디스크 입출력을 요구하기 때문에 비싸고 병렬 수행도 제한된다.

    장애 내구성의 고려 사항

    • 넓은 종속성에서 한 파티션의 실패 -> 재연산 필요한 파티션이 훨씬 많음 -> 많은 비용을 요구
    • 넓은 트랜스포메이션들을 연속으로 놓고 실행하는 것은(만약 메모리 오류를 일으킬 확률이 높다면) 높은 부하의 재연산을 불러올 수도 있다. -> 재연산 비용이 너무 높은 경우 RDD 체크포인팅 고려

    coalesce의 특별한 경우

    • 파티션 개수를 바꾸고자 할 때 사용
    • 파티션 개수를 줄이면 -> 좁은 트랜스포메이션
    • 파티션 개수를 늘리면 -> 부모 파티션은 다수의 자식 파티션에 의존

    내 트랜스포메이션은 어떤 타입의 RDD를 반환하는가?

    • RDD는 거의 모든 타입의 레코드를 가질 수 있다.
    • RDD 구현체에 동일한 트랜스포메이션이 호출되어도 다르게 평가 될 수 있다.
    • 스파크 프로그램이 복잡해지면서 난이도도 크게 증가할 수 있다.
    • 레코드 타입 정보는 스파크 API의 암죽적인 변환으로 인해 어려움이 생길 수 있다.
      • 제네릭 타입을 쓰지 않는 것이 포인트
    • Dataframe을 RDD로 작업할 때 정보를 타입 정보를 잃어버릴 수 있다.
      • 정확한 타입으로 캐스팅해야 한다. Dataframe의 스키마를 변수에 바인딩
      • Dataset API 강타입이라 RDD 변환한 뒤에도 타입 정보가 유지

    객체 생성 최소화하기

    • GC 직렬화에 걸리는 시간을 늘리면서 심각하게 느려질 수 있다.
    • 객체의 크기나 숫자를 줄여 GC에 대한 비용 최소화
      • 기존 객체 재활용 및 기본 타입 사용

    기존 객체 재활용하기

    • GC 직렬화에 걸리는 시간을 늘리면서 심각하게 느려질 수 있다.
    • 객체의 크기나 숫자를 줄여 GC에 대한 비용 최소화
    class MetricsCalculator(
        val totalwords : Int,
        val longestWord: Int,
        val happyMentions: Int,
        val numberReportCards: Int) extends Serializable {
        
        def sequenceOp(reportCardContent: String) : MetricsCalculator = {
            val words = reportCardContent.split(" ")
            val tW = words.length
            val lW = words.map(w => w.length).max
            val hM = words.count(w => w.toLowerCase.equals("happy"))
            
            new MetricsCalculator(
                tW + totalWords
                Math.max(longestWord, lW)
                hM + happyMentions
                numberReportCards + 1)
        }
        
        def compOp(other: MetricsCalculator) : MetricsCalculator = {
            new MetricsCalculator(
                this.totalWords + other.totalWords,
                Math.max(this.longestWord, other.longestWord),
                this.happyMentions + other.happyMentions,
                this.numberReportCards + other.numberReportCards)
                )
        }
        
        def toReportCardMetrics = 
            ReportCardMetrics(
                longestWord,
                happyMentions,
                totalwords.toDouble/numberReportCards)
    }
    • 객체를 재사용하지 않는 클래스
    • 두번의 map과 한 번의 reduceByKey 메서드를 사용하는 것보다 좋다.
    • 그러나 combine 단계마다 새로운 인스턴스 생성
    class MetricsCalculator(
        val totalwords : Int,
        val longestWord: Int,
        val happyMentions: Int,
        val numberReportCards: Int) extends Serializable {
        
        def sequenceOp(reportCardContent: String) : MetricsCalculator = {
            val words = reportCardContent.split(" ")
            totalwords += words.length
            longestWord = Math.max(longestWord, word.map(w => w.length).max)
            happyMentions += words.count(w => w.toLowerCase.equals("happy"))
            numberReportCards += 1
            this
        }
        
        def compOp(other: MetricsCalculator) : MetricsCalculator = {
            totalWords += other.totalWords,
            longestWord = Math.max(this.longestWord, other.longestWord),
            happyMentions += other.happyMentions,
            numberReportCards += other.numberReportCards
            this
        }
        
        def toReportCardMetrics = 
            ReportCardMetrics(
                longestWord,
                happyMentions,
                totalwords.toDouble/numberReportCards)
    }
    • 객체를 재사용하는 클래스

    더 작은 자료 구조 사용하기

    • 스파크 시간과 공간 양쪽으로 최적화하는 중요한 수단 -> 사용자 정의 클래스가 아니라 기본 타입 사용하는 것
    • 가독성은 떨어질 수 있지만, GC 오버헤드를 줄이는 데 효과적이다.
    • 스칼라 배열은 컬렉션 타입 중 메모리 효율이 가장 뛰어나다.
    object MetricsCalculator_Arrays extends Serializable {
        val totalwordIndex = 0
        val longestWordIndex = 1
        val happyMentionsIndex = 2
        val numberReportCardIndex = 3
        
        def sequenceOp(reportCardMetrics : Array[Int],
        	reportCardContent : String) : Array[Int] = {
            val words = reportCardContent.split(" ")
            reportCardMetrics(totalwordIndex) += word.length
            reportCardMetrics(longestWordIndex) = Math.max(
            	reportCardMetrics(longestWordIndex),
                word.map(w => w.length).max)
            reportCardMetrics(happyMentionsIndex) += word.count(
            	w => w.toLowerCase.equals("happy"))
            reportCardMetrics(numberReportCardIndex) += 1
            reportCardMetrics
    	}
        
        def combOp(x : Array[Int], y : Array[Int]) : Array[Int]) = {
        	x(totalwordIndex) += y(totalwordIndex)
            x(longestWordIndex) += Math.max(x(longestWordIndex), y(longestWordIndex))
            x(happyMentionsIndex) += y(happyMentionsIndex)
            x(numberReportCardIndex) += y(numberReportCardIndex)
            x
        }
        
        def toReportCardMetrics(ar : Array[Int]) : ReportCardMetrics = 
            ReportCardMetrics(
                ar(longestWordIndex),
                ar(happyMentionsIndex),
                ar(totalwordIndex)/ar(numberReportCardIndex),
    }
    • 시퀀스와 콤바인 연산을 문자열과 배열에 대한 함수로 정의하여 동일한 가독성의 코드 사상을 유지할 수 있다.
    • 함수 내에서 중간 객체 생성을 피하는 것도 이득을 얻을 수 있다.
      • 타입 간의 변환이 중간 객체를 만들게 된다. 암족적인 변환이 운 나쁘게 성능에 영향을 끼치는 또 다른 부분이다. 

    mapPartitions로 수행하는 반복자-반복자 트랜스포메이션

    • mapPartitions 함수는 레코드들의 반복자를 받아 또 다른 반복자로 출력하는 함수를 인자로 받는다.
    • 한 파티션의 데이터를 대상으로 임의의 코드를 정의할 수 있게 해주기에 스파크의 강력한 함수 중 하나다.

    반복자-반복자 트랜스포메이션이란?

    • 스칼라 반복자 객체:
      • 컬렉션은 아니다. 컬렉션 내의 원소들에 하나씩 접근 할 수 있다.
      • 변경 불가능한 객체
      • 동일한 원소에 두 번 이상 접근할 수 없음.
      • 스파크 트랜스포메이션과 달리 반복자의 트랜스포메이션은 병렬적으로 실행되지 안혹 한 번에 한 개씩 순차적으로 실행
      • 컬렉션 객체로 변환하는 것은 반복자-반복자 트랜스포메이션의 이점을 없앤다.

    시간적/공간적인 이득

    •  스파크가 선택적으로 데이터를 디스크에 쓸 수 있다.
    • 스파크가 하나의 익스큐터에서 메모리에 담기에는 너무 큰 파티션들을 메모리 에러 없이 처리해 준다.
    • 중간 데이터용 자료 구조를 정의하지 않아도 된다.
      • 큰 크기의 중간 자료 구조를 쓰는 횟수를 줄이는 것은 필요 없는 객체 생성을 피하는 방법이다.
        • GC 속도가 늦어지는 위험도 피할 수 있다.

    예제

    private def findTargetRanksIteratively(
        sortedAggregatedValueColumnPairs : RDD[((Double, Int), Long)],
        ranksLocations : Array[(Int, List[(Int, Long)])]): RDD[(Int, Double)] ={
            
        sortedAggregatedValueColumnPairs.mapPartitionsWithIndex((partitionIndex: Int,
            aggregatedValueColumnPairs : Iterator[((Double, Int), Long)]) =>{
            
            val targetsInThisPart: List[(Int, Long)] = ranksLocations(partitionIndex)._2
            if (targetsInThisPart.nonEmpty) {
                FindTargetsSubRoutine.asIteratorToIteratorTransformation(
                    aggregatedValueColumnPairs,
                    targetsInThisPart)
            } else {
                Iterator.empty
            }
        })
    }

     

    • mapPartitions 예제 

     

      def withArrayBuffer(valueColumnPairsIter : Iterator[((Double, Int), Long)],
        targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {
    
          val columnsRelativeIndex: Predef.Map[Int, List[Long]] =
            targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
    
          val columnsInThisPart: List[Int] = targetsInThisPart.map(_._1).distinct
    
          val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
          runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap
    
        //결과 반복자를 만들기 위해 배열 버퍼 사용
          val result: ArrayBuffer[(Int, Double)] =
          new scala.collection.mutable.ArrayBuffer()
    
          valueColumnPairsIter.foreach {
            case ((value, colIndex), count) =>
    
              if (columnsInThisPart contains colIndex) {
    
                val total = runningTotals(colIndex)
                //the ranks that are contains by this element of the input iterator.
                //get by filtering the
                val ranksPresent = columnsRelativeIndex(colIndex)
                  .filter(index => (index <= count + total) && (index > total))
                ranksPresent.foreach(r => result += ((colIndex, value)))
                //update the running totals.
                runningTotals.update(colIndex, total + count)
            }
          }
        // 변환
        result.toIterator
      }
    
    •  반복자-반복자 트랜스포메이션이 없는 mappartitions 예제
    • 배열 버퍼는 스칼라 컬렉션을 만드는 데에 상대적으로 우수한 성능을 보인다.
    • 입력 데이터가 클러스터의 처리량에 비해 상태적으로 크다면 메모리 부족 에러나 실패함
    • 병렬적이진 않지만 서브루틴을 반복자-반복자 트랜스포메이셔으로 변환할 수 있다.
      def asIteratorToIteratorTransformation(
        valueColumnPairsIter : Iterator[((Double, Int), Long)],
        targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {
    
        val columnsRelativeIndex = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
        val columnsInThisPart = targetsInThisPart.map(_._1).distinct
    
        val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
         runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap
    
        val pairsWithRanksInThisPart = valueColumnPairsIter.filter{
          case (((value, colIndex), count)) =>
            columnsInThisPart contains colIndex
         }
    
        pairsWithRanksInThisPart.flatMap{
    
          case (((value, colIndex), count)) =>
    
              val total = runningTotals(colIndex)
              val ranksPresent: List[Long] = columnsRelativeIndex(colIndex)
                                             .filter(index => (index <= count + total)
                                               && (index > total))
    
              val nextElems: Iterator[(Int, Double)] =
                ranksPresent.map(r => (colIndex, value)).toIterator
    
              runningTotals.update(colIndex, total + count)
              nextElems
        }
      }
    
    •  한 번에 하나씩 반복자의 각 아이템을 처리 -> 함수에서 선택적으로 디스크에 쓰기 가능
    • 배열 버퍼가 없기 때문에 가비지 컬렉션에서도 이득

    집합 연산

    •  RDD는 레코드들이 구별이 안되기 때문에 중복 데이터를 다루는 방식에서 수학적인 연산과 차이가 있다.

     

     

    [1] http://bit.ly/2Jnvewf

     

    댓글

Designed by Tistory.