def withArrayBuffer(valueColumnPairsIter : Iterator[((Double, Int), Long)],
targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {
val columnsRelativeIndex: Predef.Map[Int, List[Long]] =
targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
val columnsInThisPart: List[Int] = targetsInThisPart.map(_._1).distinct
val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap
//결과 반복자를 만들기 위해 배열 버퍼 사용
val result: ArrayBuffer[(Int, Double)] =
new scala.collection.mutable.ArrayBuffer()
valueColumnPairsIter.foreach {
case ((value, colIndex), count) =>
if (columnsInThisPart contains colIndex) {
val total = runningTotals(colIndex)
//the ranks that are contains by this element of the input iterator.
//get by filtering the
val ranksPresent = columnsRelativeIndex(colIndex)
.filter(index => (index <= count + total) && (index > total))
ranksPresent.foreach(r => result += ((colIndex, value)))
//update the running totals.
runningTotals.update(colIndex, total + count)
}
}
// 변환
result.toIterator
}
반복자-반복자 트랜스포메이션이 없는 mappartitions 예제
배열 버퍼는 스칼라 컬렉션을 만드는 데에 상대적으로 우수한 성능을 보인다.
입력 데이터가 클러스터의 처리량에 비해 상태적으로 크다면 메모리 부족 에러나 실패함
병렬적이진 않지만 서브루틴을 반복자-반복자 트랜스포메이셔으로 변환할 수 있다.
def asIteratorToIteratorTransformation(
valueColumnPairsIter : Iterator[((Double, Int), Long)],
targetsInThisPart: List[(Int, Long)] ): Iterator[(Int, Double)] = {
val columnsRelativeIndex = targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))
val columnsInThisPart = targetsInThisPart.map(_._1).distinct
val runningTotals : mutable.HashMap[Int, Long]= new mutable.HashMap()
runningTotals ++= columnsInThisPart.map(columnIndex => (columnIndex, 0L)).toMap
val pairsWithRanksInThisPart = valueColumnPairsIter.filter{
case (((value, colIndex), count)) =>
columnsInThisPart contains colIndex
}
pairsWithRanksInThisPart.flatMap{
case (((value, colIndex), count)) =>
val total = runningTotals(colIndex)
val ranksPresent: List[Long] = columnsRelativeIndex(colIndex)
.filter(index => (index <= count + total)
&& (index > total))
val nextElems: Iterator[(Int, Double)] =
ranksPresent.map(r => (colIndex, value)).toIterator
runningTotals.update(colIndex, total + count)
nextElems
}
}
한 번에 하나씩 반복자의 각 아이템을 처리 -> 함수에서 선택적으로 디스크에 쓰기 가능
배열 버퍼가 없기 때문에 가비지 컬렉션에서도 이득
집합 연산
RDD는 레코드들이 구별이 안되기 때문에 중복 데이터를 다루는 방식에서 수학적인 연산과 차이가 있다.