ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [빅데이터 전문가의 하둡관리] 7. 스파크 애플리케이션 실행하기
    Software Development/Big Data 2022. 10. 23. 00:00

    http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9788931555752

     

    빅데이터 전문가의 하둡 관리 - 교보문고

    스파크 얀 HDFS 관리, 튜닝 및 보안 비법 대공개! | 빅데이터 전문가의 하둡 관리 데이터 양이 많은 페이스북같은 기업에서 서버의 트래픽이 몰리지 않고 사용자가 빠른 피드백을 받도록 하려면?

    www.kyobobook.co.kr

    위 책의 내용을 읽으며 공부한 내용을 요약 및 정리한 글입니다. 자세한 내용은 위 책에서 알 수 있습니다.

    스파크 프로그래밍 모델

    스파크 프로그래밍과 RDD

    스파크 프로세싱의 정수는 RDD다. RDD는 분산 저장된 엘리먼트나 객체들의 집합이다. 스파크에서 하는 모든 작업은 RDD를 생성하고 변환하고 변경하는 작업으로 이뤄진다. 스파크 코어는 RDD에 저장된 데이터를 분산하는 작업과 클러스터를 통해 오퍼레이션을 병렬 처리하는 작업을 담당한다.

     

    RDD는 단순히 객체들의 집합이다. 이것들은 나눠서 저장되고, 클러스터의 여러 노드에서 프로세싱될 수 있다. 데이터를 생성하는 가장 일반적인 방법은 외부 데이터셋을 로딩하는 것이다.

    RDD를 만들고 나면 RDD에 transformation과 action 오퍼레이션을 실행할 수 있다. transformation을 하면 기존 RDD로부터 새로운 RDD를 생성한다.

    action은 RDD로부터 어떤 결과를 계산한다. 계산된 결과는 HDFS에 저장되거나 드라이버 프로그램에 반환한다.

    스파크는 RDD에 action을 수행할 때마다 RDD를 다시 계산한다. RDD를 재사용할 생각이라면 메모리에 RDD를 저장하도록 스파크에 요청해야 한다. 물론 디스크에 데이터를 저장하는 것도 가능하다. 어떤 데이터를 반복해서 요청한다면 그 데이터의 일부를 메모리에 로드하는 것이 좋다.

    스파크는 확장성과 장애허용 기능을 모두 지원하고 사용자가 어느 시점에서든 데이터를 파이프라인에서 메모리로 저장하도록 하여 인메모리 프로세싱을 지원한다. 이 기능으로 같은 결과를 반복적으로 계산하지 않아도 된다.

    함수 프로그래밍

    함수에는 상태나 사이드 이펙트가 없다. 입력과 출력으로 구성되어 있다.

     

    함수를 매개변수로 넘기기

    스파크는 함수를 매개변수로 사용할 수 있다. 대부분의 RDD 오퍼레이션에서 함수를 파라미터로 받고 있다. 스파크는 이 함수를 RDD 레코드에 적용한다.

    def toUpper(s):
     	return s.upper()
    
    mydata = sc.textFile("purplecow.txt")
    mydata.map(toUpper).take(2)

    익명 함수

    식별자(이름)가 없는 인라인 함수를 사용할 수 있다.

    스파크 프로그래밍

    드라이버 머신에서 코드로 RDD를 정의한다. 그러면 스파크 클러스터에 걸쳐 RDD를 지연 평가한다. 다음은 스파크를 갖고 어떻게 프로그래밍하는지 간결하게 설명한다.

    • 사용 가능한 메서드로 하나 이상의 RDD를 만드는 방법을 정의한다. 디스크에서 RDD생성 또는 기존 RDD를 새로운 RDD로 변경, 메모리에 데이터 집합을 처리한다.
    • RDD의 엘리먼트에 함수를 전달해 다양한 오퍼레이션을 수행한다. 스파크는 80개 이상의 고수준 오퍼레이션을 제공한다.
    • 액션을 수행한다. count, collect

    스파크는 워커 노드에서 함수를 실행할 때, 함수에서 사용하는 변수들을 워커 노드에 복사한다.

    • 브로드캐스트 변수: 사이즈가 큰 읽기 전용 데이터를 워커 노드에 한 번 보낸다. 예를 들어, 룩업 테이블 같은 것이 있다.
    • 어큐뮬레이터: 워커 노드가 관련 오퍼레이션에 추가할 수 있는 변수들이다. Spark 드라이버는 이 변수를 카운터로 이용한다. 이런 변수들은 드라이버에게는 읽기 전용이다.

    스파크의 지연 실행 모델

    액션을 수행할 때까지 실행을 지연시킨다. 액션이 실제로 필요할 때까지 transformation하지 않는다.

     

    변형을 체인으로 연결하기

    스파크에서는 transformation으로 연결해 체인처럼 만들 수 있다.

    sc.textFile("purplecow.txt").map(line => line.toUpperCase()).filter(line => line.startWith("I").count()

    RDD Lineage

    리니지란 어떤 RDD가 어디서왔는지에 대한 관계를 말한다. toDebugString을 이용하면 모든 RDD의 리니지를 볼 수 있다.

    val mydata_filt = sc.textFile("a.txt").map(line => line.toUpperCase()).filter(line => line.startWith("I"))
    mydata_filt.toDebugString

    스파크 애플리케이션

    스파크 애플리케이션은 드라이버 프로그램으로 이뤄져 있다. 이들은 메인 함수로 동작하면서 클러스터에서 병렬로 오퍼레이션을 수행한다. 스파크 애플리케이션은 2개의 추상적인 것에 의존한다.

    • RDD
    • 공유 변수(Shared variables)

    스파크의 핵심은 RDD다. RDD는 병렬로 작업할 수 있는 엘리먼트들의 집합이다. RDD는 클러스터 내 여러 노드에 걸쳐 저장돼 있는 읽기 전용 엘리먼트 집합을 추상화한 것이다. 여러 노드들이 병렬로 처리하는 것이 가능하다.

    • Resilient(회복 가능한): 스파크는 데이터를 잃어버리면 메모리에 데이터를 다시 생성한다. 장애 허용을 보장.
    • Distributed: 데이터는 여러 노드에 분산 저장되고 병렬 오퍼레이션을 이용해 접근할 수 있다.
    • Dataset: 데이터를 프로그램을 이용해 만들거나 데이터 파일에서 가져올 수 있다.

    RDD는 HDFS와 같은 분산 스토리지에 저장될 수 있다.

    RDD의 기초

    RDD는 다음과 같은 종류의 엘리먼트를 가질 수 있다.

    • 기본 타입: 정수, 문자
    • 시퀀스 타입: 문자열, 리스트, 배열, 사전
    • 직렬화할 수 있는 스칼라와 자바 객체

    RDD 생성

    RDD를 만드는 두 가지 방법

    • 스파크 드라이버 프로그램에서 기존의 콜렉션들을 병럴 처리할 수 있다.
    • 관계 데이터베이스, HDFS 또는 공유 파일 시스템에 있는 외부 데이터 세트를 참고할 수 있다.

    병렬로 RDD 생성하기

    val data = Array(1,2,3,4,5)
    val distData = sc.parallelize(data)

    이 코드는 distData라는 분산 데이터셋을 만든다. 이 데이터셋에 오퍼레이션을 병렬로 실행할 수 있다. distData.reduce((a,b) => a + b)

    스파크는 사용하는 클러스터에 따라 자동으로 몇 개의 파티션을 데이터셋으로 나누는 작업을 한다. parallelize 메서드에 파티션을 나눌 숫자를 두 번째 파라미터로 설정할 수 있다.

    val distData = sc.parallelize(data, 20)

    RDD 오퍼레이션

    • 액션: RDD로부터 어떤 값을 계산한다. count() 오퍼레이션과 같은 것이다. 새로운 RDD를 생성하지 않는다.
    • 변환: 새로운 RDD 생성.

    액션과 연관된 공통 RDD 오퍼레이션

    • count(): 엘리먼트 개수를 반환.
    • saveAsTextFile: 텍스트 파일에 저장.
    • take: 처음 n개의 엘리먼트 배열을 반환.
    • first: RDD의 첫 번째 엘리먼트를 반환.
    • top: 상위 n개의 엘리먼트를 반환.

    변환을 포함한 일반 RDD 오퍼레이션

    • map: RDD의 각 레코드에 주어진 함수를 실행해 생성된 데이터로 새로운 RDD로 반환.
    • sample: 기존 RDD에서 일부를 샘플링해 새로운 RDD 생성.
    • filter: 기존 RDD의 일부 레코드를 포함하거나 제거해 새로운 RDD를 생성.
    • flatMap: map과 비슷하지만, 각각의 입력에 대해 0개 또는 여러 개의 이웃을 매칭한다는 점에서 다르다. 따라서 주어지는 함수는 하나의 결과를 내는 함수가 아니라 시퀀스 데이터를 반환.
    • distinct: 중복 데이터를 제거
    • sortBy: 정렬

    RDD 저장

    persist() 또는 cache() 메서드를 통해 RDD를 저장할 수 있다. 메모리, 오프-힙, 디스크에 저장할 수 있다.

    스파크 애플리케이션의 아키텍처

    스파크 용어

    • 애플리케이션: 하나의 드라이버에서 관리하는 하나 이상의 잡을 포함한다.
    • 잡: 실행된 태스크 집합
    • 스테이지: 스파크가 병렬로 실행되는 잡의 태스크 집합
    • 태스크: 익스큐터에 할당된 독립적인 작업 단위

    스파크 애플리케이션 컴포넌트

    • 드라이버 프로그램
      • 애플리케이션으로 스파크가 클러스터의 워커 노드에서 실행할 프러세싱 코드를 갖고 있다. 드라이버 프로그램은 클러스터상에 하나 또는 그 이상의 잡을 실행할 수 있다.
    • 클러스터 매니저
      • 스파크를 클러스터에 분산된 형태로 실행하려고 할 때는 클러스터 리소스를 관리하는 클러스터 매니저가 필요하다. 얀, 메소스, 독립형 스파크 클러스터
    • 워커 노드에서 실행되는 워커 프로세스
      • 클러스터의 워커 노드에서 각각 실행된다. 스파크 애플리케이션이 필요로 하는 CPU, 메모리 그리고 스토리지와 같은 리소스를 제공
    • 워커 노드에서 실행되는 익스큐터 프로세스
      • 스파크는 각각의 애플리케이션을 실행하기 위해 익스큐터를 생성한다. 이 프로세스는 JVM 프로세스로 클러스터의 워커 노드마다 생성된다. 익스큐터 프로세스는 애플리케이션 코드를 실행하고, 필요할 때 데이터를 메모리나 디스크에 캐시한다. 각각의 애플리케이션은 자신만의 익스큐터를 가진다. 그리고 애플리케이션이 종료되면 익스큐퍼 프로세스도 사라진다.
    • 워커 노드에서 실행되는 태스크 프로세스
      • 스파크 태스크는 익스큐터에서 실행할 수 있는 가장 작은 일이다. 하나의 태스크는 하나의 익스큐터로 보내지는 일의 단위다. 워커 노드의 익스큐터는 많은 스레드를 갖고 있는데, 각 태스크는 이들 중 하나의 스레드에서 실행된다. 대스크는 계산을 하고, 그 결과를 반환하거나 오퍼레이션을 실행한다.
    • 잡과 스테이지
      • 잡은 태스크들의 집합으로 병렬 계산을 할 수 있다. 잡당 할당되는 태스크 수는 데이터 파티션의 숫자에 좌우된다. 따라서 데이터 파티션의 수는 스파크 잡이 얼마나 병렬도 처리할 수 있느냐는 결정한다. 스테이지는 잡을 의존성이 있는 작은 대스크 셋으로 나눈 것이다.

    인터랙티브하게 스파크 애플리케이션 실행하기

    스파크 애플리케이션을 인터랙티브하게 실행할 때는 데이터를 여러 가지로 확인하려는 단계에서 실시한다.

    스파크 셸과 스파크 애플리케이션

    스파크 셸을 통한 잡의 실행과 스파크 애플리케이션을 통한 실행의 차이.

    • 스파크 셸은 데이터를 인터랙티브하게 조사하거나 조작할 수 있다.
    • 스파크 애플리케이션은 스파크에서 동작하는 별도의 프로그램이다.

    스파크 애플리케이션이 실행되기 전에 먼저 스파크컨텍스트 객체를 만들어줘야 한다. 스파크가 클러스터에 접속하기 위한 정보를 갖는다.

    클러스터에서 spark-shell 실행하기

    pyspark와 spark-shell 명령에는 --master 옵션이 있다. 이 옵션으로 yarn을 명시할 수 있다. 기본값은 local이다.

     

    다음은 예제는 얀으로 관리되는 하둡 클러스터에서 클라이언트 모드로 스파크 셸을 어떻게 실행하는지 보여준다.

    spark-shell --master yarn -deploy-mode client

    스파크 클러스터 실행에 대한 개요

    스파크 애플리케이션을 클러스터에 제출하면 애플리케이션의 드라이버 프로그램을 통해 모든 것을 처리한다. 메인 프로그램(드라이버 프로그램)의 스파크컨텍스트 객체를 통해 클러스터의 독립적인 프로세스로 동작하는 애플리케이션을 관리한다.

    스파크컨텍스트는 프로그램이 클러스터에서 실행될 때, 다양한 클러스터 매니저(yarn,mesos)에 접속할 수 있다.

    드라이버 프로그램을 시작하면, 스파크 프레임워크가 스파크가 처리할 데이터가 있는 클러스터 노드에 익스큐터 프로세스를 초기화시킨다.

    1. 드라이버가 실행되고 스파크 애플리케이션의 main 메서드가 호출된다.

    2. 드라이버는 클러스터 매니저에게 프로세스를 실행할 리소스를 요청한다.

    3. 드라이버 프로그램을 대신해 클러스터 매니저가 익스큐터를 실행한다.

    4. 드라이버가 애플리케이션 로직을 실행한다. 그리고 태스크를 익스큐터에게 보낸다.

    5. 익스큐터가 태스크를 실행해 계산을 수행하고 그 결과를 저장한다.

    6. main 메서드가 종료되거나 stop이 호출되면 드라이버가 모든 익스큐터를 중지시키고, 클러스터 매니저에게 받은 리소스들을 반환한다.

    스파크 애플리케이션을 만들고 서밋하기

    스파크 애플리케이션 만들기

    스칼라 또는 자바를 이용해 스파크 애플리케이션을 만들면, 애플리케이션을 컴파일하고 JAR 파일로 만들어 워커 노드로 보내야 한다.

     

    5개의 스레드를 사용해 로컬에서 스파크 애플리케이션을 실행.

    spark-submit -master 'local[5]' -class WordCount MyJarFile.jar fileURL

    독립적 스파크 클러스터에서 애플리케이션 실행하기

    • 클라이언트 모드: 클라이언트는 spark-submit 스크립트 자신이다. 클라이언트 프로세스에서 실행. 클라이언트 프로세스가 spark-submit 스크립트를 실행
    • 클러스터 모드: 스파크 애플리케이션은 클러스터 모드에서 클러스터의 워커 노드에서 실행된다.

    --master 플래그

    클러스터 URL을 지정하는 것으로, 클러스터 매니저를 가르킨다.

    • spark:host:port -> 스파크 독립 클러스터에 연결한다.
    • mesos://host:port -> 메소스 클러스터에 연결한다.
    • yarn: 하둡 클러스터에 연결한다. 클러스터의 위치는 환경 변수 HADOOP_CONF_DIR 또는 YARNCONF_DIR를 통해 지정한다. yarn-client로 지정하면, --deploy-mode client 함께 설정한 것과 같고, yarn-cluster로 지정하면, --deploy-mode cluster도 함께 설정한 것과 같다.
    • local: 로컬 모드 --master 플래그의 기본값이다.
    • local(n): n개의 코어로 동작하는 로컬 모드.

    HDFS와 스파크

    스파크의 데이터 소스로 HDFS를 사용한다면 다음에 설명하는 두 파일을 스파크의 클래스 경로에 추가해야 한다.

    • hdfs-site.xml: HDFS 클라이언트를 위한 기본 동작을 포함한다.
    • core-site.xml: 기본 파일 시스템 이름을 설정한다.

    이 파일을 스파크에게 노출시키기 위해서는 $SPARK_HOME/spark-env.sh의 환경 변수 HADOOP_CONF_DIR를 설정해 이 두 파일의 위치를 알 수 있도록 해야 한다.

    JDBC/ODBC 서버 사용하기

    스파크는 JDBC 서버를 제공한다. 이 서버는 클라이언트가 공유할 수 있는 독립적인 드라이버 프로그램처럼 동작한다. BI툴도 JDBC 서버를 통해 스파크 클러스터에 접속할 수 있다.

     

    스파크의 JDBC 서버는 하이브의 HiveServer2와 매우 비슷하게 동작한다. HiveServer2처럼 Thrift 통신 프로토콜을 사용하고, Thrift Server로 불린다. 스파크 홈의 sbin 디렉토리에 있는 start-thriftserver.sh로 스파크 JDBC 서버를 실행할 수 있다. HiveServer2처럼 10000 포트를 기본 포트로 사용한다.

    ./sbin/start-thriftserver.sh --master sparkMaster

    JDBC 서버가 시작되면 beeline 클라이언트 프로그램으로 연결할 수 있다. 이 프로그램은 SQL 셸을 열고, JDBC 서버를 이용해 명령을 수행할 수 있다.

    ./bin/beeline -u jdbc:hive2://localhost:10000

    스파크 스트리밍으로 스트리밍 데이터 다루기

    스트림은 다양한 데이터 소스가 있다. 트위터, 카프카, 플룸

    스파크 스트리밍 동작 방식

    스파크는 지속적으로 들어오는 스트리밍 데이터를 시간을 이용해 슬라이스로 나눈다. 이것을 배치 주기라 하고, 이 데이터를 스파크가 처리한다. StreamingContext생성할 때, 사용할 타임 슬라이스 길이를 지정할 수 있다.

    스트리밍 데이터가 나눠진 각각의 배치들이 각각 별도의 RDD가 된다.

    마이크로 배치

    배치 주기를 결정하면, 스파크 스트리밍은 들어오는 스트리밍 데이터를 배치 주기의 단위로 나눠 스파크가 프로세싱할 수 있도록 보낸다.

    스파크가 마이크로 배치를 처리하는 방식

    스파크 코어로 보내진 마이크로 배치는 RDD의 스트리밍이 된다. 스파크 스트리밍은 불연속 스트림이라 불리는 추상화된 형태의 RDD로 표현된다. 이 논리적인 엔티티를 실행함으로써 내부의 RDD가 실행된다.

    스파크 스트리밍 소스의 기본 두 타입

    • 기본 소스: 여기에는 파일고 소켓 연결이 포함된다.
    • 응용 소스: 여기에는 카프가와 플룸 같은 스트리밍 솔루션이 포함된다.

    윈도 계산

    스파크 스트리밍의 윈도 계산 기능을 사용하면, 데이터의 슬라이딩 윈도를 바탕으로 변환을 적용할 수 있다. 다음 두 매개변수를 통해 슬라이딩 윈도를 설정할 수 있다.

    • window length: 이 매개변수는 1minute처럼 설정해 윈도의 기간을 정할 수 있다.
    • sliding interval: 오퍼레이션 주기를 결정한다.

    window length와 sliding interval 값은 배치 주기의 배수가 돼야 한다. window length는 sliding interval의 배수가 돼야 한다.

    스파크 SQL을 사용해 구조화된 데이터 처리하기

    스키마를 갖고 있는 데이터는 구조화된 데이터로 간주된다. 스파크 SQL을 이용해 데이터에 대한 쿼리를 할 수 있다. 두 가지 방법이 있다.

    • 스파크 애플리케이션에 SQL문을 사용해 쿼리한다.
    • 태블로와 같은 외부의 BI툴을 사용해 쿼리한다. 이 방식은 JDBC 또는 ODBC를 통해 Spark SQL에 접속한다.

    HiveContext와 SQLContext

    • SQLContext: 가장 기본이 되는 것으로 스파크 SQL 기능의 일부를 제공.
    • HiveContext: HiveQL의 연결을 제공하고 하이브의 기능성과 관련 있다.

    HiveContext를 추천한다. HiveQL을 사용해 스파크 SQL을 연동해 사용하는 방식이 가장 좋다.

    스파크 SQL에서 하이브로 연결하기

    스파크 SQL은 하이브를 사용하지 않는다고 하더라도 잘 동작한다. 하지만 스파크 SQL을 설치된 하이브에 연결하려면 하이브 설정 파일(hive-site.xml)을 SparkConf 디렉토리($SPARKHOME/conf)에 복사해야 한다.

    스파크 SQL과 연동하기

    스파크 SQL을 사용하는 가장 좋은 방법은 스파크 애플리케이션에서 데이터를 로드하고 쿼리하는 데 사용하는 것이다.

     

     

     

     

     

    댓글

Designed by Tistory.