ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [FP In Scala] 순수 함수적 병렬성
    Software Development/Scala 2021. 5. 2. 15:52

    병렬 및 비동기 계산의 생성을 위한 순수 함수적 라이브러리 하나를 구축한다.

    병렬적 프로그램에 내재하는 복잡성을, 오직 순수 함수만으로 프로그램을 서술함으로써 통제해 볼 것이다.

    이를 통해 치환 모형을 이용해서 추론을 단순화할 수 있으며, 그러다 보면 바라건대 동시적 계산을 쉽고 즐겁게 다룰 수 있게 될 것이다.

     

    순수 함수적 병렬성을 위한 라이브러리의 작성 방법.

    순수 함수적 라이브러리의 설계 문제에 대한 접근방식.

     

    주제: 계산의 서술이라는 관심사 -> 계산의 실제 실행이라는 관심사와 분리

     

    프로그램의 구체적인 실행 방식에 관한 세부사항과는 격리된 수준에서 프로그램을 작성할 수 있어야 한다.

    • parMap이라는 조합기를 개발할 것인데, 이를 이용하면 함수 f를 한 컬렉션의 모든 요소에 동시에 적용할 수 있다.
    val outputList = parMap(inputList)(f)

    이 조합기에 도달할 때까지 반복적인 과정을 거치게 된다.

    • 라이브러리가 처리하고자 하는 간단한 용례
    • 용례를 촉진하는 인터페이스
    • 인터페이스의 구현 방법에 대한 고민

    설계를 계속 개선 -> 인터페이스와 구현을 오가며 -> 점점 복잡한 용례들을 거치면서 -> 문제 영역과 설계 공간에 대한 이해도를 향상.

    대수적 추론에 역점을 두고, API를 특정 법칙들을 따르는 하나의 대수(algebra)로 서술할 수 있다는 착안을 소개할 것이다.

     

    라이브러리를 직접 설계하는 이유

    • 학습에 도움이 되기 때문에.
      • 실용적인 라이브러리를 직접 설계하기가 얼마나 쉬운지 독자에게 보여주고자 한다.
    • 절대적인 권위를 가진, 또는 다시 고찰할 필요가 없는 기존의 라이브러리는 없다는 관점을 제시.
      • 관례적인 것이 가장 실용적인 것이라는 보장은 없다.

    이번 장의 라이브러리에서 근본적인 가정은, 이 라이브러리가 부수 효과를 절대로 허용하지 않는다는 것.

     

    1. 자료 형식과 함수의 선택

    함수적 라이브러리를 설계할 때에는 그 라이브러리로 무엇을 하고자 하는지에 대한 전반적인 착안을 가지고 설계 과정을 시작.

    착안을 정련(refining)하고, 원하는 가능성을 가능하게 하는 자료 형식을 찾는 데 있다.

    이번 예제 라이브러리에서 원하는 것은 "병렬 계산을 생성할 수 있어야 한다"는 것. 정확히 어떤 의미일까?

    간단하고 병렬화할 수 있는 계산을 살펴보자. 이를 통해 착한을 구현이 가능한 어떤 것으로 정련한다.

     

    병렬화할 계산은 목록에 있는 정수들의 합을 구하는 것.

    통상적인 왼쪽 접기로 수행하는 코드는 다음과 같다.

      /* Seq는 표준 라이브러리에 있는 목록과 기타 순차열들의 상위 클래스다.
      포인트는 Seq에 foldLeft 메서드가 있다.
      */  
      def sum(ints: Seq[Int]): Int = 
        ints.foldLeft(0)((a, b) => a + b)

    정수들의 순차적으로 접는 대신, 다음과 같이 분할정복 알고리즘을 적용할 수도 있다.

      /* IndexedSeq는 표준 라이브러리의 Vector와 비슷한 임의 접근 순차열들의 상위 클래스다.
      목록과는 달리 이런 순차열은 순차열을
      특정 지점에서 두 부분으로 분할하는 효율적인 splitAt 메서드를 제공.
       */
      def sumdc(ints: IndexedSeq[Int]): Int =
        if (ints.size <= 1)
          // headOption은 스칼라의 모든 컬렉션에 정의되는 메서드다.
          ints.headOption getOrElse 0
        else {
          // splitAt 함수를 이용해서 순차열을 반으로 나눈다.
          val (l, r) = ints.splitAt(ints.length/2)
          sumdc(l) + sum(r)
        }
    

    foldLeft 기반의 구현과는 달리 이 구현은 병렬화할 수 있다. 즉, 두 절반을 병렬로 합할 수 있는 것이다. 

     

    이 계산을 병렬화하려면 어떤 종류의 자료 형식과 함수가 필요할지 고민하다 보면 이전과는 다른 관점으로 문제를 바라보게 된다.

     

    간단한 예제들에서 영감을 얻어서 이상적인 API를 직접 설계하고 그것으로부터 구현으로 되돌아가는 접근방식으로 알아본다.

     

    1.1 병렬 계산을 위한 자료 형식 하나

    표현식 sum(l) + sum(r)의 한 줄의 코만 봐도, 병렬 계산을 나타내는 자료 형식이 하나의 결과를 담을 수 있어야 한다는 점을 알 수 있다.

     

    그 결과는 어떤 의미 있는 형식(예제에서는 Int)이어야 하며, 그 결과를 추출하는 수단도 갖추어야 한다.

     

    결과를 담을 컨테이너 형식 Par[A](parallel의 줄인 이름)를 새로 창안한다.

     

    • def unit[A](a: => A): Par[A] - 평가되지 않은 A를 받고, 그것을 개별적인 스레드에서 평가할 수 있는 계산을 돌려준다. 함수의 이름이 unit인 것은, 이 함수가 하나의 값을 감싸는 병렬성의 한 단위를 생성한다고 생각할 수 있기 때문이다.
    • def get[A](a: Par[A]): A - 병렬 계산에서 결과 값을 추출.
      def sum(ints: IndexedSeq[Int]): Int =
        if (ints.size <= 1)
          ints.headOption getOrElse 0
        else {
          val (l, r) = ints.splitAt(ints.length/2)
          // 왼쪽 절반을 병렬로 계산.      
          val sumL: Par[Int] = Par.unit(sum(l))
          // 오른쪽 절반을 병렬로 계산.
          val sumR: Par[Int] = Par.unit(sum(r))
          // 두 결과를 추출해서 합한다.      
          Par.get(sumL) + Par.get(sumR)
        }

    이 버전은 두 재귀적 sum 호출을 unit으로 감싸고, 두 부분 계산의 결과들을 get을 이용해서 추출한다.

     

    unit주어진 인수를 개별적인 스레드(논리적 스레드)에서 즉시 평가할 수도 있고, 인수를 그냥 가지고 있다가 get이 호출되면 평가를 시작할 수도 있다.

     

    예제에서 병렬성의 이점을 취하기 위해서는 unit이 인수의 동시적 평가를 시작한 후 즉시 반환되어야 한다.

     

    unit이 인수들의 평가를 동시에 시작한다면 get 호출에서 참조 투명성이 깨질 수 있다.

    sumL과 sumR을 해당 정의로 치환해 보면 이 점이 명백해진다. 프로그램은 병렬로 실행되지 않는다.

    Par.get(Par.unit(sum(l))) + Par.get(Par.unit(sum(r)))

     

    unit이 자신의 인수를 즉시 평가하기 시작한다면, 그다음으로 일어나는 일은 get이 그 평가의 완료를 기다리는 것이다.

     

    sumL 변수와 sumR 변수를 그냥 단순히 나열하면 + 기호의 양변은 병렬로 실행되지 않는다.

     

    이는 unit에 한정적인 부수 효과가 존재함을 의미한다. 단, 그 부수 효과는 get에만 관련된 것이다.

    unit은 비동기 계산을 나타내는 Par[Int]를 돌려준다. 그런데 Par를 get으로 넘겨주는 즉시, get의 완료까지 실행이 차단된다는 부수 효과가 드러난다. 

     

    따라서 get을 호출하지 않거나, 적어도 호출을 최대한 미루어야 한다. 즉, 비동기 계산들을 그 완료를 기다리지 않고도 조합할 수 있어야 한다.

     

    1.2 병렬 계산의 조합

    unit과 get 조합의 문제점을 어떻게 피할 수 있을까? get을 호출하지 않는다면 sum 함수는 반드시 Par[Int]를 돌려주어야 한다.

      def sum(ints: IndexedSeq[Int]): Par[Int] =
        if (ints.size <= 1)
          Par.unit(ints.headOption getOrElse 0)
        else {
          val (l, r) = ints.splitAt(ints.length/2)
          Par.map2(sum(l), sum(r))(_ + _)
        }

    이제 재귀의 경우에 unit을 호출하지 않는다. 그리고 이제 unit의 인수가 게으른 인수이어야 하는지도 명확하지 않다. 지금의 예제에서는 인수를 게으르게 받는다는 것이 별로 이득이 되지 않는다. 항상 그런지는 확실하지 않다. 이 문제는 나중에 다시 생각해 볼 것이다.

     

    map2는 인수를 게으르게 받아야 할까? map2의 경우에는 계산의 양변에 동등한 실행 기회를 주어서 양변이 병렬로 계산되게 하는 것이 합당하다(인수들의 순서는 중요하지 않으며, 결합되는 두 계산이 독립적이며, 병렬로 실행될 수 있음을 나타내는 것이 중요하다).

     

    test case로, map2의 두 인수가 엄격하게 평가된다고 할 때 sum(IndexedSeq(1, 2, 3, 4))의 평가가 어떻게 진행되는지 생각해 보자.

    P.126, 목록 7.3 참고

     

    map2는 엄격한 함수이므로 스칼라는 그 인수들을 왼쪽에서 오른쪽으로 평가한다. 

     

    따라서 map2(sum(x), sum(y))(_ + _)를 만날 때마다 sum(x) 등을 재귀적으로 평가해야 한다.

     

    이는, 합산 트리의 왼쪽 절반 전체를 엄격하게 구축한 후에야 오른쪽 절반을 엄격하게 구축할 수 있다는 결과로 이어진다.

     

    sum(IndexedSeq(1, 2))가 완전히 전개된 후에야 sum(IndexedSeq(3, 4))의 평가가 시작된다. 

     

    map2가 인수들을 병렬로 평가한다면(스레드 풀 등, 병렬성을 구현하는 데 쓰이는 어떤 자원을 이용해서), 이는 계산의 오른쪽 절반을 구축을 시작하기도 전에 계산의 왼쪽 절반이 실행되기 시작함을 의미한다.

     

    map2를 엄격하게 유지하되 그 실행이 즉시 시작되지 않게 하면 어떨까? 

     

    map2의 평가가 즉시 시작되지 않는다는 것Par 값이라는 것단지 병렬로 계산해야 할 것의 서술을 구축하는 것임을 의미한다.

    그 서술을 평가(get 같은 함수를 이용해서)하기 전까지는 아무 일도 일어나지 않는다. 

     

    문제는 만일 그러한 서술을 엄격하게 구축한다면, 서술을 나타내는 객체가 상당히 무거운 객체가 될 것이라는 점이다. 

    수행할 연산들의 전체 트리를 담아야 한다.

     

    이 서술을 어떤 자료구조에 담든, 그 자료구조는 아마 원래의 목록 자체보다 더 많은 공간을 차지할 것이다. 만일 서술을 더 가볍게 표현할 수 있다면 좋을 것이다.

     

    map2를 게으르게 만들고 양변을 병렬로 즉시 실행하는 것이 나아 보인다. 그러면 양변에 동등한 실행 기회를 부여하는 문제도 해결된다.

     

    1.3 명시적 분기

    map2의 두 인수를 병렬로 평가하는 것이 항상 바람직할까? 다음과 같은 예를 보자.

    Par.map2(Par.unit(1), Par.unit(1))(_ + _)

    이 예에서 결합하고자 하는 두 계산은 아주 빠르게 완료될 것이 자명하며, 따라서 굳이 개별적인 논리적 스레드를 띄울 필요가 없다.

     

    그러나 현재의 API에는 이런 정보를 제공할 수단이 갖추어져 있지 않다. 즉, 현재의 API는 계산을 주 스레드로부터 분기하는 시점에 관해 그리 명료하지 않다.

     

    즉, 프로그래머는 그러한 분기가 일어나는 지점 또는 시점을 구체적으로 지정할 수 없다. 분기를 좀 더 명시적으로 만들 수 있을까? 

     

    이를 위해 또 다른 함수 def fork[A](a: => Par[A]): Par[A]를 추가한다.

     

    이 함수는 주어진 Par가 개별 논리적 스레드에서 실행되어야 함을 명시적으로 지정하는 용도로 쓰인다.

      def sum(ints: IndexedSeq[Int]): Par[Int] =
        if (ints.size <= 1)
          Par.unit(ints.headOption getOrElse 0)
        else {
          val (l, r) = ints.splitAt(ints.length/2)
          Par.map2(Par.fork(sum(l)), Par.fork(sum(r)))(_ + _)
        }
    

     

    fork 덕분에 이제는 map2를 엄격한 함수로 만들고, 인수들을 감싸는 것은 프로그래머의 뜻에 맡길 수 있게 되었다. 

    fork 같은 함수는 병렬 계산들을 너무 엄격하게 인스턴스화하는 문제를 해결해 주나, 좀 더 근본적으로는 병렬성을 명시적으로 프로그래머의 통제하에 두는 역할을 한다. 

     

    여기서 다루는 관심사는 두 가지이다.

    • 두 병렬 task의 결과들이 겹합되어야 함을 지정하는 수단이 필요하다.
    • 특정 과제를 비동기적으로 수행할지 아닐지를 선택하는 수단이 필요하다.

    관심사들을 분리한 덕분에 map2나 기타 조합기들에 병렬성에 관한 그 어떤 전역 방침도 내장할 필요가 없다. 

     

    unit이 엄격해야 하는지 게을러야 하는지의 문제로 돌어간다.

    fork가 있어 이제 unit을 엄격하게 만들어도 표현력이 전혀 감소하지 않는다. 

    이 함수의 비엄격 버전(lazyUnit)은 unit과 fork로 간단히 구현할 수 있다.

     

    def unit[A](a: A): Par[A]
    def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
    

    lazyUnit 함수는 unit 같은 기본 조합기가 아니라 파생된 조합기의 간단한 예다. 

     

    lazyUnit은 다른 연산들을 이용해서 정의할 수 있다. 나중에 Par의 구체적인 표현을 선택할 텐데, lazyUnit은 그 표현에 대해 아무것도 알 필요가 없다. 단지 Par가 Par에 대해 정의된 연산 fork와 unit을 거치게 된다는 점만 알고 있다.

     

    fork는 인수들을 개별 논리적 스레드에서 평가되게 하는 수단이다.

    그런데 그러한 평가가 호출 즉시 일어나게 할 것인지, 아니면 get 같은 어떤 함수에 의해 계산이 강제될 때까지 개별 논리적 스레드에서의 평가를 미룰 것인지는 아직 결정하지 않았다. 다른 말로 하자면,

     

    평가가 fork의 책임인지, 아니면 get의 책임인지는 아직 결정하지 않았다. 

     

    평가를 적극적으로 수행할 것인지 게으르게 수행할 것인지를 선택한다고 말해도 될 것이다.

     

    fork와 get의 구현에 어떤 정보가 필요한가를 생각해보자.

     

    fork가 자신의 인수를 즉시 병렬로 평가하기 시작한다면, 그 구현은 스레드를 생성하는 방법이나 과제를 일종의 스레드 풀에 제출하는 방법을 직접적으로든 간접적으로든 알고 있어야 한다. 이는 스레드 풀이 반드시 접근 가능한(전역적으로) 자원이어야 하며, fork를 호출하는 시점에서 이미 적절히 초기화되어 있어야 함을 의미한다. 

     

    그런 조건을 만족하려면 프로그램의 여러 부분에서 쓰이는 병렬성 전략을 프로그래머가 임의로 제어할 수 있는 능력을 포기해야 한다. 

     

    병렬 과제들의 실행을 위해 전역 자원을 두는 것이 근본적으로 잘못된 일은 아니지만, 구현이 무엇을 언제 사용할 것인지를 프로그래머가 좀 더 세밀하게 제어할 수 있다면 더 좋을 것이다. 

     

    따라서 스레드 생성과 실행 과제 제출의 책임get에 부여하는 것이 훨씬 적합하다.

     

    fork와 get의 구체적인 구현 방식을 알지 못해도, 심지어 Par의 구체적인 표현을 결정하지 않고도 이런 결론에 도달할 수 있다.

     

    만일 fork가 그냥 인수의 평가를 뒤로 미루게 한다면, fork는 병렬성 구현을 위한 메커니즘에 접근할 필요가 없다.

    그냥 평가되지 않는 Par 인수를 받고 그 인수에 동시적 평가가 필요하다는 점표시만 해 두면 된다.

    이것이 fork의 의미라고 가정하자.

    이러한 모형에서, Par 자체는 병렬성의 구체적인 구현 방법을 알 필요가 없다. Par는 나중에 get 함수 같은 무언가에 의해 해석될 병렬 계산에 관한 서술에 가깝다.

     

    이는 Par를 나중에 준비되었을 때 조회(get)할 어떤 값을 담은 컨테이너라고 생각했던 것과 다른 발상이다. 이제는 실행이 가능한 일급 프로그램에 좀 더 가까워졌다. 그런 취지에서 get 함수의 이름을 run으로 바꾸고, 병렬성이 실제로 구현되는 지점이 바로 이 run 함수임을 천명하기로 하자.

    def run[A](a: Par[A]): A

     

    2. 표현의 선택

    // 즉시 평가되어서 결과 a를 산출하는 계산을 생성한다.
    def unit[A](a: A): Par[A]
    // 두 병렬 계산의 결과들을 이항 함수로 조합한다.
    def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C]
    // 이후에 run이 동시적으로 평가할 계산임을 표시한다.
    def fork[A](a: => Par[A]): Par[A]
    // run이 동시적으로 평가할 표현식 a를 감싼다.
    def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
    // 주어진 Par를 ,fork의 요청에 따라 병렬 계산들을 수행하고 그 결과 값을 추출함으로써 완전히 평가한다.
    def run[A](a: Par[A]): A

    여러 함수에 느슨하게나마 의미를 부여했다.

    • unit은 상수 값을 병렬 계산으로 승격한다.
    • map2는 두 병렬 계산의 결과들을 이항 함수로 조합한다.
    • fork는 주어진 인수 동시적으로 평가될 계산임을 표시한다. 그 평가는 run에 강제되어야 실제로 실행된다.
    • lazyUnit은 평가되지 않은 인수를 Par로 감싸고, 그것을 병렬 평가 대상으로 표시한다.
    • run은 계산을 실제로 실행해서 Par로부터 값을 추출한다.

    run이 어떤 방법으로든 비동기적 과제들을 실행해야 함을 알고 있다. java.util.concurrent.ExecutorService가 이미 있다. 다음은 이 클래스의 API를 스칼라로 옮긴 것이다.

    class ExecutorService {
      def submit[A](a: Callable[A]): Future[A]
    }
    // 사실상 그냥 게으른 A임.
    trait Callable[A] { def call: A}
    trait Future[A] {
      def get: A
      def get(timeout: Long, unit: TimeUnit): A
      def cancel(evenIfRunning: Boolean): Boolean
      def isDone: Boolean
      def isCancelled: Boolean
    }

    ExecutorService의 submit 메서드는 주어진 Callable 값에 대응되는, 필요에 따라 개별 스레드에서 실행될 계산을 처리해 주는 Future 객체를 돌려준다. 

    계산의 결과Future의 get 메서드로 얻을 수 있다. Future는 또한 계산의 취소를 위한 추가적인 기능도 제공한다.

     

    run 함수가 ExecutorService에 접근할 수 있다고 가정하고, 그것이 Par의 표현을 선택하는 데 어떤 통찰을 제공하는지 살펴본다.

    def run[A](s: ExecutorService)(a: Par[A]): A

    Par[A]의 표현으로 사용할 수 있는 가장 간단한 모형은 ExecutorService => A일 것이다. 이 표현을 선택한다면 run을 구현하기가 아주 간단해짐이 명백하다. 그러나 계산 완료까지의 대기 시간이나 취소 여부를 run 호출자가 결정할 수 있게 하면 더욱 좋을 것이다. 이를 위해 Par[A]를 ExecutorService => Future[A]로 두고, run은 그냥 Future를 돌려주게 하자.

     

    type Par[A] = ExecutorService => Future[A]
    
    def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)
    

    Par가 ExecutorService를 필요로 하는 하나의 함수로 표현되었기 때문에, Future의 생성은 이 ExecutorService가 제공되기 전까지는 일어나지 않음을 주목하기 바란다.

    3. API의 정련

    지금까지의 API를 조사하고 개선한다.

     

    지금까지 만든 것들을 이용해서 어떤 연산을 표현할 수 있는지 시도해 본다.

     

    지금까지 개발한 API의 함수들을 구현하는 것으로 시작한다.

    다음은 선택된 Par 표현을 이용한 단순한 구현이다.

    object Par {
      /* unit은 UnitFuture를 돌려주는 함수로 표현된다. UnitFuture는 Future의 간단한 구현으로,
      그냥 상수 값을 감싸기만 할 뿐 ExcutorService는 전혀 사용하지 않는다. UnitFuture는 항상 완료 가능하며, 취소는 불가능하다.
      UnitFuture의 get 메서드는 이전에 주어진 상수 값을 돌려주기만 한다.
      */
      def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)
    
      private case class UnitFuture[A](get: A) extends Future[A] {
        def isDone = true
        def get(timeout: Long, units: TimeUnit) = get
        def isCancelled = false
        def cancel(evenIfRunning: Boolean): Boolean = false
      }
    
      /* 이 API에서 병렬성 제어는 오직 fork 함수만 담당한다는 설계상의 선택에 따라, map2는 f 호출을 개별 논리적 스레드에서 평가하지 않는다.
      f를 개별 스레드에서 평가하고 싶다면 fork(map2(a,b)(f))를 사용하면 된다.
      */
      def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
        (es: ExecutorService) => {
          val af = a(es)
          val bf = b(es)
          /* map2의 이 구현은 만료 시간을 지키지 않는다. 이 구현은 그냥 ExcutorService를 두 Par 값에 전달하고, Future의 af와 bf의 결과들을 기다리고,
          그것들에 f를 적용하고, 적용 결과를 UnitFuture로 감쌀 뿐이다. 만료 시간을 지키기 위해서는, af의 평가에 걸린 시간을 측정하고 bf의 평가에 걸린 시간에서 그 시간을
          뺴는 식의 새로운 Future 구현이 필요할 것이다.
          */
          UnitFuture(f(af.get, bf.get))
        }
    
      /* 이것이 fork의 가장 간단하고 가장 자연스러운 구현이나, 몇 가지 문제점이 있다. 예를 들어 외각의 Callable은 '안쪽' 과제가 완료될 때까지 차단된다.
      이러한 차단이 스레드 풀의 한 스레드를 점유하며, 이는 잠재적 병렬성의 일부가 소실될 수 있음을 의미한다. 본질적으로 이 구현은 한 스레드로 충분한 작업을 두 새의 스레드로 수행한다.
      이 구현의 좀 더 심각한 문제점의 증상이다.
      */
      def fork[A](a: => Par[A]): Par[A] =
        es => es.submit(new Callable[A] {
          def call = a(es).get
        })
    }

    Future의 인터페이스가 순수 함수적이지는 않음을 주목하자.

    이는 이 예제 라이브러리의 사용자가 Future를 직접 다루게 하는 것이 바람직하지 않은 이유의 일부이다. 

     

    비록 Future의 메서드들이 부수 효과에 의존하긴 하지만, Par API 자체는 여전히 순수하다.

    Future의 내부 작동 방식은 오직 사용자가 run을 호출해서 구현이 ExecutorService를 받게 되어야 드러난다

    구현이 언젠가는 효과들에 의존하긴 하지만, 사용자는 항상 순수한 인터페이스로 프로그램을 짤 수 있다. 

     

    API가 순수하므로 그 효과들은 사실상 부수 효과가 아니다. 4부에서 추후 논의.

     

    Future의 만료 시간 계약을 존중하도록 map2의 구현을 개선.

    /* This version respects timeouts. See `Map2Future` below. */
    def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] =
      es => {
        val (af, bf) = (a(es), b(es))
        Map2Future(af, bf, f)
      }
    
    /*
    Note: this implementation will not prevent repeated evaluation if multiple threads call `get` in parallel. We could prevent this using synchronization, but it isn't needed for our purposes here (also, repeated evaluation of pure values won't affect results).
    */
    case class Map2Future[A,B,C](a: Future[A], b: Future[B],
                                 f: (A,B) => C) extends Future[C] {
      @volatile var cache: Option[C] = None
      def isDone = cache.isDefined
      def isCancelled = a.isCancelled || b.isCancelled
      def cancel(evenIfRunning: Boolean) =
        a.cancel(evenIfRunning) || b.cancel(evenIfRunning)
      def get = compute(Long.MaxValue)
      def get(timeout: Long, units: TimeUnit): C =
        compute(TimeUnit.NANOSECONDS.convert(timeout, units))
    
      private def compute(timeoutInNanos: Long): C = cache match {
        case Some(c) => c
        case None =>
          val start = System.nanoTime
          val ar = a.get(timeoutInNanos, TimeUnit.NANOSECONDS)
          val stop = System.nanoTime;val aTime = stop-start
          val br = b.get(timeoutInNanos - aTime, TimeUnit.NANOSECONDS)
          val ret = f(ar, br)
          cache = Some(ret)
          ret
      }
    }
    

     

    임의의 함수 A => B를 그 결과가 비동기적으로 평가되는 함수로 변환하는 함수를 lazyUnit을 이용해서 작성.

    def asyncF[A, B](f: A => B): A => Par[B] =
    	a => lazyUnit(f(a))
    

    하나의 List[Int]를 산출하는 병렬 계산을 나타내는 Par[List[Int]]가 있다고 하자.

    결과가 정렬된 Par[List[Int]]로 변환하고자 한다.

    def sortPar(parList: Par[List[Int]]): Par[List[Int]]

    Par에 run을 적용하고, 결과 목록을 정렬하고, 정렬된 목록을 unit을 이용해서 다시 Par로 꾸릴 수 있다.

    그러나 run의 호출을 피하고자 한다. unit 외에, Par의 값을 어떤 방식으로든 조작할 수 있는 조합기는 map2뿐이라고 한다.

    만일 parList를 map2의 양변 중 하나에 지정한다면 List의 내부에 접근해서 목록을 정렬할 수 있다.

    map2의 다른 변에는 아무것이나 넣어도 되므로, 그냥 무연산(no-op)을 전달하기로 한다.

    def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
    	map2(parList, unit(()))((a, _) => a.sorted

    이제 Par[List[Int]]에게 목록을 정렬하라고 알려 줄 수 있다. 

    이를 일반화하는 것도 가능하다.

    A => B 형식의 임의의 함수를, Par[A]를 받고 Par[B]를 돌려주는 함수로 승급시킬 수 있다.

     

    다음은 Par에 대해 임의의 함수를 사상하는 map의 정의다.

    def map[A, B](pa: Par[A])(f: A => B): Par[B] =
    	map2(pa. unit(()))((a, _) => f(a))

    sortPar는 다음과 같이 정의하면 된다.

    def sortPar(parList: Par[List[Int]]) = map(parList)(_.sorted)

    간결하고 명확한 정의다. 연산들을 조합해서 형식들이 잘 맞아떨어지게 했을 뿐이지만, map2와 unit의 구현을 살펴본다면 map의 이러한 구현에 뭔가 중요한 의미가 있음을 확실히 알 수 있을 것이다.

     

    map2가 둘째 인수를 그냥 무시하게 만들기 위해 둘째 인수에 무의미한 값 unit(())을 전달하는 것이 속임수일까? 그렇지 않다.

    map2를 이용해서 map을 구현할 수 있지만 그 반대로는 불가능하다는 사실은 map2가 map보다 더 강력함을 보여줄 뿐이다.

     

    하나의 목록에 map을 병렬로 적용할 수 있을까? 그럼 map2처럼 두 개의 병렬 계산을 결합하는 것이 아니라 N개의 병렬 계산을 결합하는 함수를 만들어 본다.

    def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]]

    parMap을 그냥 새로운 기본수단으로서 작성할 수도 있었다. Par[A]는 ExecutorService => Future[A]의 별칭일 뿐임을 기억하자.

     

    parMap을 기존 조합기들을 이용해서 어느 정도나 구현할 수 있는지 본다.

    def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = {
    
        val fbs: List[Par[B]] = ps.map(asyncF(f))
        ...
    }

     

    asyncF가 병렬 계산 하나를 분기해서 결과를 산출함으로써 A => B를 A => Par[B]로 변환함을 기억할 것이다.

     

    이를 이용하면 N개의 병렬 계산을 수월하게 분기시킬 수 있다. 

    이제 계산들의 결과취합할 방법이 필요하다. 

    형식들을 살펴보면, List[Par[B]]를 parMap의 반환 형식이 요구하는 Par[List[B]]로 변환하는 수단이 필요함을 알 수 있다.

     

    sequence 함수 작성. 

    def sequence_simple[A](l: List[Par[A]]): Par[List[A]] = 
      l.foldRight[Par[List[A]]](unit(List()))((h,t) => map2(h,t)(_ :: _))
    
    // This implementation forks the recursive step off to a new logical thread,
    // making it effectively tail-recursive. However, we are constructing
    // a right-nested parallel program, and we can get better performance by 
    // dividing the list in half, and running both halves in parallel. 
    // See `sequenceBalanced` below.
    def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = 
      as match {
        case Nil => unit(Nil)
        case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
      }
    
    // We define `sequenceBalanced` using `IndexedSeq`, which provides an 
    // efficient function for splitting the sequence in half. 
    def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
      if (as.isEmpty) unit(Vector())
      else if (as.length == 1) map(as.head)(a => Vector(a))
      else {
        val (l,r) = as.splitAt(as.length/2)
        map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
      }
    }
    
    def sequence[A](as: List[Par[A]]): Par[List[A]] =
      map(sequenceBalanced(as.toIndexedSeq))(_.toList)
    

    sequence를 구현했다면 parMap의 구현을 완성할 수 있다.

    def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = fork {
    
        val fbs: List[Par[B]] = ps.map(asyncF(f))
        sequence(fbs)
    }

    구현을 fork 호출 하나로 감쌌다. 이 구현에서 parMap은 입력이 거대한 목록이라고 해도 즉시 반환된다. 이후에 run을 호출하면 하나의 비동기 계산이 분기되며, 그 계산을 N개의 병렬 계산을 띄우고 계산들이 끝나길 기다렸다가 결과들을 하나의 목록으로 취합한다.

     

    요소들을 병렬로 걸러내는 parFilter 구현.

    def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]] = {
      val pars: List[Par[List[A]]] = 
        l map (asyncF((a: A) => if (f(a)) List(a) else List())) 
      map(sequence(pars))(_.flatten) // convenience method on `List` for concatenating a list of lists
    }
    

    4. API의 대수

    API를 하나의 대수, 즉 일단의 법칙 또는 참이라고 가정하는 속성들을 가진 추상적인 연산 집합으로 간주하고, 그 대수에 정의된 게임 규칙에 따라 그냥 형식적으로 기호를 조작하면서 문제를 풀어나간다.

     

    4.1 map에 관한 법칙

    다음은 라이브러리를 위한 검사 코드를 작성할 때 하나의 검례로 사용할 만한 코드이다.

    map(unit(1))(_ + 1) == unit(2)

    unit(1)에 _ + 1 함수를 사상한 것이 어던 의미로는 unit(2)와 동등함을 의미한다.

    어떤 의미에서 이들이 동등한가?

    우선 임의의 유효한 ExecutorService에 대해 두 Par 객체의 Future 결과가 서로 같다면 그 두 Par 객체가 동등하다고 말하기로 한다.

    어떤 ExecutorService에 대해 이 법칙이 성립하는지는 다음과 같은 함수로 점검할 수 있다.

    def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
    	p(e).get == p2(e).get

    함수를 일반화할 수 있듯이, 법칙도 일반화할 수 있다.

    map(unit(x))(f) == unit(f(x))

    이는 이 법칙이 1과 _ + 1 함수뿐만 아니라 임의의 x와 f에 대해 성립함을 의미한다.

    unit은 자신이 받은 것을 넘겨주기만 해야 한다. ExecutorService도 마찬가지다. Callable 객체들의 실행을 위해 제출할 때, 자신이 받을 갑에 대해 어떤한 가정을 두거나 그 값에 기초해서 행동 방식을 변경해서는 안 된다.

     

    이 법칙은 map과 unit의 구현에서 하향 캐스팅이나 isInstanceOf 점검을 허용하지 않는다.

     

    어떤 함수를 정의할 때 그보다 더 간단한 함수들, 즉 한 가지 일만 하는 함수들을 이용해서 정의하려는 것과 아주 비슷하게, 어떤 법칙을 정의할 때에는 한 가지 사실만 말하는 더 간단한 법칙들을 이용해서 정의할 수 있다.

     

    이 법칙이 임의의 x와 f에 대해 성립했으면 한다고 말했다. f를 항등 함수로 치환하면 흥미로운 점이 생긴다.

    // 초기법칙
    map(unit(x)(f) == unit(f(x))
    // f를 항등 함수로 치환
    map(unit(x)(id) == unit(id(x))
    // 단순화
    map(unit(x))(id) == unit(x)
    // 양변에서 unit(x)를 y로 치환
    map(y)(id) == y

    돌이켜 보면 unit의 언급은 군더더기였음을 알 수 있다.

     

    map이 할 수 없는 것을 생각해 보자.

    map은 함수를 결과에 적용하기 전에 예외를 던지고 계산을 망치지는 못한다. 

    map(y)(id) == y라고 할 때 반대 방향의 치환들을 통해서 원래의, 좀 더 복잡합 법칙으로 돌아갈 수 있다는 것이다.

    이러한 자유도는 map이 주어진 함수에 따라 다른 방식으로 행동하지 않기 때문에 생긴 것이다. 즉, 만일 map(y)(id) ==y라면 반드시 map(unit(x))(f) == unit(f(x))도 참이어야 한다.

    map의 매개변수성 덕분에 이 2차 법칙 또는 정리가 공짜로 생겼다는 점에서, 이를 공짜 정리라고 부르기도 한다.

    4.2 fork에 관한 법칙

    fork가 병렬 계산의 결과에 영향을 미치지 말아야 한다는 속성을 생각해 본다.

    fork(x) == x

    이전의 구현이 이 속성을 실제로 만족하고 있고, 이것이 fork의 작동 방식에 대한 기대와 부합하는 바람직한 속성이다.

    fork(x)는 x와 동일한 일을 수행하되, 주 스레드와는 개별적인 논리적 스레드에서 비동기적으로 수행해야 한다는 것이 fork의 작동 방식에 대한 기대이다.

     

    만일 이 법칙이 항상 성립하지는 않는다면, 의미의 변경없이 fork를 안전하게 호출할 수 있는 상황을 형식 시스템의 도움을 전혀 받지 않고 알아내는 수단을 찾아내야 했을 것이다. 

     

    이 간단한 속성은 fork의 구현을 강하게 제한한다. 

    4.3 법칙 깨기: 미묘한 버그 하나

    디버거의 관점에서 법칙을 깨본다. 모든 x와 ExecutorService에 대해 fork(x) == x라고 기대한다. x로 적합한 것은 fork나 unit, map2를 이용한 어떤 표현식이다. ExecutorService는 어떨까? 이를 구현하는 방법은 어떤 것들이 있을까? java.util.concurrent.Executors에 여러 구현들이 나열되어 있다.

     

    fork의 구현들 대부분에서 발생하는 다소 미묘한 문제점이 하나 있다. 내부적으로 고정된 크기의 스레드 풀을 사용하는 ExecutorService 구현은 교착에 빠지기가 아주 쉽다. 

    스레드 풀의 최대 스레드 개수가 1이라고 할 때, 현재의 구현을 이용해서 다음 코드를 수행하면 어떤 일이 발생할까?

    val a = lazyUnit(42 + 1)
    val S = Executors.newFixedThreadPool(1)
    println(Par.equal(S)(a, fork(a)))

    대부분의 fork 구현은 이 코드에서 교착에 빠질 것이다. 현재의 fork 구현을 다시 본다.

      def fork[A](a: => Par[A]): Par[A] =
        es => es.submit(new Callable[A] {
          // 다른 Callable 안에 있는 한 Callable의 결과를 기다린다. 
          def call = a(es).get
        })
    

    이 코드는 먼저 Callable을 제출하고, 그 Callable 안에서 또 다른 Callable을 ExecutorService에 제출한다. 그러면 그 결과가 나올 때까지 실행이 차단된다. 

     

    만일 스레드 풀의 크기가 1이면 문제가 발생한다. 바깥쪽 Callable을 제출되면 스레드 풀의 유일한 스레드가 할당된다. 그런데 스레드는 또 다른 Callable을 제출하고, 그 결과를 기다린다. 그런데 스레드 풀에는 또 다른 Callable을 실행할 스레드가 남아 있지 않다. 둘은 서로 기다리게 되면 교착이 발생한다.

     

    이런 반례가 있다면,  법칙이 성립하도록 구현을 고쳐보는 것, 법칙이 성립하는 조건들을 좀 더 명시적으로 밝히도록 법칙을 정련하는 두 가지 선택이 있다.

     

    고정 크기 스레드 풀에 대해 잘 작동하도록 fork를 고칠 수 있을까? 이전과는 다른 구현을 생각해 본다.

    def fork[A](fa: => Par[A]): Par[A] =
    	es => fa(es)

    이렇게 하면 교착이 방지된다. 문제점은 실제로 개별적인 논리적 스레드를 띄워서 fa를 평가하지 않는다는 점이다. 즉, 어떤 ExecutorService es에 대해 fork(hugeComputation)(es)주 스레드에서 hugeComputation을 실행한다. 이는 애초에 fork의 호출로 피하고자 했던 상황이다. 이 fork 구현도 여전히 좋은 조합기다.

    계산의 인스턴스화를 실제로 필요한 시점까지 미루는 용도로 유용하게 사용할 수 있기 때문이다. 이 구현은 좀 더 적합한 이름인 delay라고 부르기고 한다.

    def delay[A](fa: => Par[A]): Par[A] =
    	es => fa(es)

    그러나 원했던 것은 임의의 계산을 고정 크기 스레드 풀로 돌리는 것이다. 이를 위해서는 Par를 다른 방식으로 구현해야 한다.

    4.4 행위자를 이용한 완전 비차단 Par 구현

    고정 크기 스레드 풀로도 잘 작동하며 전혀 차단되지 않는(fully non-blocking) 방식의 Par 구현을 개발한다. 이 부분은 함수적 설계의 여러 측면을 논의한다는 이 전반적인 주제에 꼭 필요한 것이 아니다. 

     

    현재 표현의 본질적인 문제는, Future의 get 메서드를 호출하지 않고서는 Future에서 값을 꺼낼 수 없는데, 그 메서드를 호출하면 현재 스레드의 실행이 차단된다는 점이다. Par의 표현이 이런 식으로 자원을 흘리지 않도록 하려면 반드시 non-blocking 방식이어야 한다. 여기서 non-blocking이라는 것은 fork와 map2 구현이 현재 스레드를 차단하는 메서드(Future.get)를 절대로 호출하지 말아야 한다는 의미다.

    기본 착안

    Par의 비차단 표현을 어떻게 구현해야 할까? 착안은 간단하다. Par를 java.util.concurrent.Future로 바꾸어서 값을 꺼내는 대신, 적당한 때에 호출되는 콜백을 등록할 수 있는 Future를 도입하는 것이다. 

     

    sealed trait Future[A] {
    	// apply 메서드는 fpinscala.paralleism 패키지의 private 멤버다.
        // 즉, 오직 그 패키지안의 코드에서만 이 메서드에 접근할 수 있다.
    	private[parallelism] def apply(k: A => Unit): Unit
    }
    // Par는 이전과 동일한 모습이지만 java.util.concurrent의 것이 아니라
    // 우리가 직접 만든 새로운 비차단 Future를 사용한다.
    type Par[+A] = ExecutorService => Future[A]

    새로운 Par 형식은 이전과 동일한 모습이나, 직접 만든 Future를 사용한다. 새 Future의 API는 java.util.concurrent의 것과 다르다.

     

    이전의 Future는 결과를 돌려주는 get 메서드를 제공했지만, 새 Future는 A 형식의 결과를 산출하는 함수 k받고 결과를 이용해서 어떠한 효과를 수행하는 apply 메서드를 제공한다. 이런 종류의 함수를 계속 함수 또는 콜백 함수라고 부르기도 한다. 

     

    apply 메서드에 private[parallelism]이 지정되어 있다. 즉, 이 메서드는 라이브러리의 사용자에게 노출되지 않는다. 이렇게 해야 API가 순수성을 유지하며, 법칙들의 성립이 보장된다.

     

    이제 run 함수의 구현을 살펴본다. 이전과는 달리 새로운 run 함수는 그냥 A를 돌려준다. 이 함수는 Par[A]를 받고 A를 돌려주므로, 콜백 함수를 만들어서 Future 값의 apply 메서드에 전달할 필요가 있다.

    def run[A](es: ExecutorService)(p: Par[A]): A = {
    	// 결과를 저장하는 데 사용할, 변이 가능하고 스레드에 안전한 참조.
        // 이런 클래스들에 대한 좀 더 자세한 정보는 java.util.concurrent.atomic 패키지 참고.
    	val ref = new AtomicReference[A]
        // java.util.concurrent.CountDownLatch를 이용하면 countDown 메서드가 일정 횟수만큼
        // 호출될 때까지 스레드를 대기시킬 수 있다.
        // 여기서 countDown 메서드는 p로부터 A 형식의 값을 받을 때 한 번 호출된다.
        // run 구현은 그때까지 하단되어야 한다.
        val latch = new CountDownLatch(1)
        // 값을 받았으면 결과를 설정하고 빗장(latch)을 푼다. 
        p(es) { a => ref.set(a); latch.countDown }
        // 결과가 준비될 때까지 기다렸다가 빗장을 푼다.
        latch.await
        // 일단 빗장을 통과했다면 ref가 설정된 것이다. 이제 그 값을 돌려준다.
        ref.get
    }

    latch가 풀리길 기다리는 동안 run을 호출한 스레드가 차단됨을 주목하자. 그렇게 차단되지 않도록 run을 구현하는 것은 불가능하다. 이 함수는 A 형식의 값을 돌려주어야 하므로, 그 값이 준비되기 기다려야 한다. 이 때문에 이 API의 사용자는 결과를 기다릴수 있음이 확실한 때에만 run을 호출해야 한다. 

     

    API에서 run을 아예 제거하고 대신 Par에 대한 apply 메서드를 노출해서 사용자가 비동기 콜백을 등록하게 만들 수도 있다. 그 역시 유효한 설계상의 선택이다.

     

    Par를 실제로 생성하는 예를 본다. 가장 간단한 방식은 unit 함수이다. 

    def unit[A](a: A): Par[A] =
    	es => new Future[A] {
        	def apply(cb: A => Unit): Unit =
            	cb(a)
        }

    unit은 이미 사용할 수 있는 A 형식의 값을 가지고 있으므로, 그냥 그 값을 전달해서 콜백 함수 cb를 호출하기만 하면 된다. 만일 그 콜백 함수가 run 구현에서 비롯된 것이라면, 그 호출에 의해 빗장이 풀려서 결과가 즉시 마련된다.

     

    fork 함수에서 실제 병렬성이 도입된다.

    def fork[A](a: => Par[A]): Par[A] =
    	es => new Future[A] {
        	def apply(cb: A => Unit): Unit =
                // eval은 a의 평가를 위한 작업을 띄운 후 즉시 반환된다.
                // 콜백은 이후 다른 스레드에서 비동기적으로 호출된다.
            	eval(es)(a(es)(cb))
        }
    // 어떤 ExecutorService를 이용해서 계산을 비동기적으로 평가하기 위한 보조 함수.
    def eval(es: ExecutorService)(r: => Unit): Unit =
    	es.submit(new Callable[Unit] { def call = r})

    fork가 돌려준 Future는 이름으로 전달된 인수 a의 평가를 위한 작업을 띄운다. 그 작업은 a를 실제로 평가, 호출해서 하나의 Future[A]를 산출하며, 그 Future가 결과 A를 산출했을 때 호출될 콜백 함수 cb(Future 호출 시 인수로 주어진)를 등록한다.

     

    map2의 서명.

    def map2[A, B, C](a: Par[A], b: Par[B])(fL (A, B) => C): Par[C]

    map2는 두 Par 인수를 병렬로 실행해야 한다. 두 결과가 마련되면 그 둘로 f를 호출하고, 그 결과로 나온 c를 콜백 함수에 넘겨주어야 한다. 그런데 그 과정에서 몇 가지 경쟁 조건이 발생할 여지가 있다. java.util.concurrent의 저수준 기본수단들만으로는 정확한 비차단 구현이 어렵다.

    간략한 행위자 소개

    map2를 구현하기 위해 actor(행위자)라고 부르는 비차단 동시성 기본수단을 사용하기로 한다. 본질적으로 행위자는 하나의 동시적 프로세스로, 스레드를 계속해서 차지하지 않는다는 점이 특징이다. 대신 행위자는 메세지를 받았을 때에만 스레드를 점유한다.

     

    여러 스레드가 동시에 하나의 행위자에 메세지를 보낼 수 있지만, 행위자는 그 메세지들을 오직 한 번에 하나씩만 처리한다는 것이다. 나머지 메세지들은 이후의 처리를 위해 대기열에 담아 둔다. 

     

    이런 특징을 덕분에 행위자라는 동시성 기본수단은 여러 스레드가 접근해야 하는 까다로운 코드를 작성할 때 유용하다. 행위자 없이 그런 코드를 작성한다면 경쟁 조건이나 교착 상태가 발생하기 쉽다.

     

    행위자를 이용한 map2 구현

    이제 두 인수의 결과를 취합하는 map2를 Actor를 이용해서 구현해 보자. 코드는 직관적이며, Actor가 메세지를 한 번에 하나씩만 처리할 것이므로 경쟁 조건을 걱정할 필요가 없다.

    def map2[A, B, C](p: Par[A], p2: Par[B])(f: (A, B) => C): Par[C] -
    	es => new Future[C] {
        	def apply(cb: C => Unit): Unit = {
                // 두 개의 변이 가능 var가 두 개의 결과를 저장하는 데 쓰인다.
                val ar: Option[A] = None
                val br: Option[B] = None
                
                // 두 결과를 기다렸다가 f로 결합해 서 cb에 넘겨주는 행위자
                val combiner = Actor[Either[A, B](es) {
                	// 만일 A 결과가 먼저 오면 그것을 ar에 담아두고 B를 기다린다.
                    // B 결과를 이미 받았고 이제 A 결과가 왔다면, 두 결과로 f를 호출해서
                    // C를 얻고 그것을 콜백 함수 cb에 전달한다.
                	case Left(a) => br match {
                        case None => ar = Some(a)
                        case Some(b) => eval(es)(cb(f(a, b)))
                    }
                    case Right(b) => ar match {
                        case None => br = Some(b)
                        case Some(a) => eval(es)(cb(f(a, b)))
                    }
                }
                
                p(es)(a => combiner ! Left(a))
                p2(es)(b => combiner ! Right(b))
            }
        }

    이러한 구현들이 있으면 아무리 복잡한 Par 값들이라도 스레드 고갈을 걱정하지 않고 실행할 수 있다. 심지어 행위자가 하나의 JVM 스레드에만 접근할 수 있어도 마찬가지다이다.

    5. 조합기들을 가장 일반적인 형태로 정련

    조합기를 가장 일반적인 형태로 정련할 수 있는지 살펴보는 것이 중요하다. 그러면 새 조합기가 필요한 것이 아니라 좀 더 일반적인 조합기의 특별한 경우가 필요했던 것일 뿐임을 깨닫게 될 수도 있기 때문이다.

     

    이에 관한 예로, 두 분기 계산 중 하나를 초기 계산의 결과에 기초해서 선택하는 함수가 필요하다고 하자.

    def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A]

    이 함수는 만일 cond 결과가 true이면 t를 사용해서 계산을 진행하고 cond의 결과가 false이면 f를 사용해서 계산을 진행한다. 이를, cond의 결과가 나올 때까지 실행을 차단하고 그 결과를 이용해서 t나 f의 실행을 결정하는 식으로 구현할 수도 있다. 다음은 그러한 간단한 차단식 구현이다.

    def choice[A](cond: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] = 
    	es =>
        	// cond의 결과가 나올 때까지 차단됨을 주의할 것.
        	if (run(es)(cond).get) t(es)
            else f(es)

    이 조합기를 좀 더 생각해 보자. 

    이 조합기는 cond를 실행하고, 결과가 나오면 t나 f 중 하나를 실행한다. 합리적인 것 처럼 보이지만 몇 가지 변형들을 살펴본다면 이 조합기의 본질을 좀 더 잘 파악할 수 있을 것이다. 여기서 Boolean을 사용하는 것은 다소 자의적이다. 그리고 가능한 두 병렬 계산 t와 f 중 하나를 선택한다는 사실도 다소 자의적이다. 꼭 두 개일 필요가 있을까? 어떤 조건에 기초해서 두 병렬 계산 중 하나를 선택하는 것이 유용하다면, N개의 계산 중 하나를 선택하는 것도 당연히 유용할 것이다.

    def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A]

    이 choiceN은 n을 실행하고, 그 결과에 기초해서 choices의 병렬 계산 중 하나를 선택한다. 이는 choice보다 좀 더 일반적이다.

     

    choiceN을 구현하고, choice를 choiceN을 이용해서 구현.

    def choiceN[A](n: Par[Int])(choices: List[Par[A]]): Par[A] = 
      es => {
        val ind = run(es)(n).get // Full source files
        run(es)(choices(ind))
      }
    
    def choiceViaChoiceN[A](a: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] =
      choiceN(map(a)(b => if (b) 0 else 1))(List(ifTrue, ifFalse))
    

    choice를 더욱 일반적인 조합기로 정련할 수 있는지 시험한다.

     

    List를 사용하는 것은 필요 이상으로 구체적인 것 같다. 이 조합기는 컨테이너의 구체적인 종류에 구애받을 필요가 없다.

    def choiceMap[K,V](key: Par[K])(choices: Map[K,Par[V]]): Par[V] =
      es => {
        val k = run(es)(key).get
        run(es)(choices(k))
      }
    

    List와 마찬가지로, 가능한 선택들의 집합을 이처럼 Map으로 표현하는 것도 필요 이상으로 구체적이다. choiceMap의 구현을 살펴보면 Map의 API는 그리 많이 사용하지 않음을 알 수 있다. 

    Map[A, Par[A]]는 단지 A => Par[B] 형식의 함수를 제공하는 데 쓰일 뿐이다.

    이를 염두에 두고 choice와 choiceN을 다시 보면, choice에서는 한 쌍의 인수들이 그냥 Boolean => Par[A] 형식의 함수로 쓰이고 choiceN에서는 목록이 Int => Par[A] 형식의 함수로 쓰일 뿐임을 알 수 있을 것이다.

     

    이들을 모두 통합하는 좀 더 일반적인 서명을 만든다면 다음과 같다.

    def chooser[A,B](p: Par[A])(choices: A => Par[B]): Par[B] = 
      es => {
        val k = run(es)(p).get
        run(es)(choices(k))
      }
    
    /* `chooser` is usually called `flatMap` or `bind`. */
    def flatMap[A,B](p: Par[A])(choices: A => Par[B]): Par[B] = 
      es => {
        val k = run(es)(p).get
        run(es)(choices(k))
      }
    
    def choiceViaFlatMap[A](p: Par[Boolean])(f: Par[A], t: Par[A]): Par[A] =
      flatMap(p)(b => if (b) t else f)
    
    def choiceNViaFlatMap[A](p: Par[Int])(choices: List[Par[A]]): Par[A] =
      flatMap(p)(i => choices(i))
    

    함수들을 이런 식으로 일반활 때에는, 일반화의 결과로 나온 함수를 비판적으로 살펴보기 바란다. 

    chooser는 아마도 이 연산에 가장 적합한 이름이 아닐 것이다. chooser는 일단 실행되면 첫 계산을 실행하고 그 결과에 따라 둘째 계산을 결정하는 하나의 병렬 계산이다. 

    그런데 첫 계산의 결과가 준비되기 전에 둘째 계산이 반드시 존재해야 할 필요는 없다. List나 Map 같은 컨테이너에 저장해 둘 필요가 없는 것이다. 

    첫 계산의 결과를 이용해서 둘째 계산을 즉석에서 생성할 수도 있다. 함수적 라이브러리에 흔히 등장하는 그러한 함수를 흔히 bind나 flatmap이라고 부른다.

    def flatmap[A, B](a: Par[A])(f: A => Par[B]): Par[B]

    이 flatmap이 가장 기본적인 함수일까? 더욱 일반화할 수 있을까? 

    flatmap이라는 이름은 이 연산을 두 단계로 분해할 수 있음을 암시한다. 

    첫 단계는 Par[A]에 f: A => Par[B]를 사상해서 Par[Par[B]]를 생성하는 것이고, 둘째 단계는 중첩된 Par[Par[B]]를 평평하게 만들어서 Par[B]를 만드는 것이다. 이 부분이 흥미롭다. 임의의 x에 대해 Par[Par[X]]를 Par[X]로 변환하는 더욱 간단한 조합기를 추가하기만 하면 된다는 점을 암시해 준다. 이를 join이라 부르기로 하자.

    def join[A](a: Par[Par[A]]): Par[A]

    함수의 이름을 join이라고 한 것은, 개념적으로 이 함수는 일단 실행되면 내부 계산을 수행하고, 그 계산이 완료되길 기다리고, 그 결과를 돌려주기 때문이다.

    def join[A](a: Par[Par[A]]): Par[A] = 
      es => run(es)(run(es)(a).get())
    
    def joinViaFlatMap[A](a: Par[Par[A]]): Par[A] = 
      flatMap(a)(x => x)
    
    def flatMapViaJoin[A,B](p: Par[A])(f: A => Par[B]): Par[B] = 
      join(map(p)(f))
    

    댓글

Designed by Tistory.