-
[Pinot] Apache Pinot 조사Software Development/Data Engineering 2025. 2. 1. 01:18
Pinot은 링크드인에서 개발한 실시간 분산 OLAP 데이터스토어이다. 초저지연 및 높은 처리량을 제공합니다.
주요 특징으로 다음과 같은 것들이 있습니다.
Fast Queries: 페타바이트 데이터셋에 대한 대부분의 필터 및 집계를 수십 밀리초 지연으로 제공합니다.
High Concurrency: 동시에 수십만 쿼리를 실행할 수 있습니다.
Batch and Streaming Ingest
Upserts: 같은 레코드를 여러번 입수해도 되고 최신 레코드만 쿼리할 수 있습니다.
Versatile Joins: 다양한 조인이 가능합니다. fact/dimension, fact/fact, ...
Rich Indexing Options: timestamp, inverted, StartTree, bloom filter, range, text, json 등의 인덱스를 지원
Built for Scale
SQL Query Interface
Built-in Multitenancy
Pinot이 어떤 방식으로
- OLAP을 초저지연 및 높은 처리량은 제공하는지
- 동시에 수십만 쿼리를 실행할 수 있는지
- 다양한 조인에서 좋은 성능을 낼 수 있는지
- 여러 종류의 인덱스를 어떤 방식으로 제공 및 관리하는지
알아가는게 핵심이라 볼 수 있습니다.
User-facing real-time analyticsuser-facing analytics는 모든 유저가 개인화된 분석을 할 수 있습니다. 위에서 설명한 Pinot의 강점을 통해서 개인화된 분석을 지원할 수 있습니다.Concepts
columnar form
Sharding of data to scale both storage and computation
distributed architecture
tabular data model
Pinot storage model
Pinot은 Table, Segment, Tenant, Cluster와 같은 용어를 사용하여 스토리지 모델을 정의합니다.
Table: 데이터를 저장하기 위해
Segment: 데이터를 파티셔닝하기 위해
Tenant: 데이터를 격리시키기 위해
Cluster: 데이터를 관리하기 위해
Table: 전통적인 트랜잭셔널 데이터베이스 처럼, related data의 모음을 부르는 논리적인 단위입니다. 테이블은 column들과 row들로 구성되어 있고 SQL로 질의가 가능합니다. 스키마가 존재하고 컬럼과 그에 대한 데이터 타입을 가지고 있습니다.
단일 스키마를 상속받아서 각기 다른 인덱싱, 파티셔닝, 복제와 같은 설정을 하여 여러 테이블을 생성할 수 있습니다.
스키마는 JSON 파일에 정의되고 각 테이블은 각자의 JSON 파일을 가지고 있습니다. 멀티플 테이블은 단일 스키마를 공유합니다. 그리고 각자의 유니크한 이름 및 인덱싱 전략 파티셔닝 데이터 소스 그리고 메타데이터를 가지고 있습니다.
Segment: Pinot 테이블은 하나 이상의 독립적인 세그먼트에 샤딩하여 테이블의 데이터를 저장합니다. 세그먼트는 테이블 데이터를 시간 기반으로 파티셔닝하며 저장 및 계산 요구에 따라 수평적으로 확장되는 Pinot 서버에 저장됩니다.
Tenant: 테넌트를 통해서 논리적인 네임스페이스에 속한 모든 테이블들은 다른 테이블들과 격리할 수 있습니다. 단일 클러스터를 통해서 테넌트를 분리하여 Pinot을 사용할 수 있습니다.
Cluster: Cluster는 소프트웨어 및 하드웨어 리소스들 입니다.
Controller: 클러스터의 메타데이터 및 클러스터 자원을 관리합니다.Zookeeper: 컨트롤러가 사용합니다. 내고장성 및 메타데이터의 영속성, 테이블 설정, 스키마, 메타데이터 등을 관리합니다.Broker: 클라이언트 프로세스의 쿼리를 받아 서버에게 전달합니다.Server: 세그먼트 파일과 쿼리 프로세싱을 제공합니다.Minion: 백그라운드 계산을 수행합니다. 세그먼트 최적화 및 성능을 위한 추가적인 인덱스를 생성합니다.Controller: Pinot 클러스터의 일관성 및 라우팅을 관리하는 core orchestrator입니다. 컨트롤러는 테이블의 리소스 할당과 관련된 시스템의 상태 변경에 대해서 관리합니다.Server: real-time server와 offline 서버가 있습니다. real-time server는 외부 시스템으로 부터 실시간 데이터를 지속적으로 수집합니다.Architecture
Distributed design principlesHighly available: Pinot은 SPOF가 없습니다. Replication을 지원하기 때문입니다. 그래서 몇개의 노드가 죽어도 여전히 쿼리 프로세싱이 가능합니다.Horizontally scalable: 워크로드가 증가할 때 새로운 노드를 추가하여 확장시킬 수 있습니다.Immutable data: Pinot은 저장된 모든 데이터가 immutable하다고 가정합니다. 이러한 특징은 데이터 저장 및 복제를 시스템에서 단순화하는데 도움을 줍니다.Dynamic configuration changes: 새로운 테이블을 추가, 수정, 확장, 데이터 인입은 쿼리 가용성에 영향을 주지 않습니다.Apache Helix and ZooKeeper
분산 시스템은 스스로를 유지 관리하지 않으며, 원활한 작동을 위해 정교한 스케줄링과 리소스 관리가 필요합니다. Helix가 이러한 역할을 합니다. Helix 그 자체는 Pinot과 독립적이지만 Pinot에서 나왔습니다. Helix는 컨트롤러 프로세스 위에서 실행됩니다. Zookeeper를 내결함성, 일관성, 내구성을 갖춘 상태 저장소로 사용합니다.
Helix는 클러스터의 의도된 상태를 유지 관리하며, 여기에는 서버와 브로커의 수, 모든 테이블의 구성 및 스키마, 스트리밍 데이터 수집 소스와의 연결, 현재 실행 중인 배치 수집 작업, 클러스터 내 서버에 대한 테이블 세그먼트 할당 등이 포함됩니다.
Helix 클러스터는 3가지의 물리적 노드 타입이 있습니다.
Participant: 데이터 저장 계산 작업 수행을 합니다.
Spectator: 해당 노드는 이벤트를 통해서 Participant의 변화하는 상태를 관찰합니다.
Controller: Participant 노드의 상태를 관찰하고 관리합니다. 컨트롤러는 클러스터 내에서 모든 상태 전환을 조정하며, 상태 제약 조건을 충족하면서 클러스터 안정성을 유지하는 역할을 담당합니다.
Helix는 스토리지 추상화를 표현하기 위해 두 가지 논리적 구성 요소를 정의합니다.Partition: 하나의 파티션이 있는 데이터 스토리지 단위입니다. Pinot Segment가 파티션입니다.Resource: 파티션들의 논리적 모음입니다. Pinot 테이블이 리소스입니다.Pinot과 Helix는 다음과 같이 매핑됩니다.Pinot Component Helix Component Segment Helix Partition Table Helix Resource Controller Helix Controller or Helix Agent Server Helix Participant Broker Helix Spectator Minion Helix Participant Helix는 Zookeeper를 클러스터 상태를 유지하기 위해 사용합니다. Zookeeper는 Helix Spectator들에게 클러스터 상태 변경에 대한 알림을 보냅니다. Zookeeper는 클러스터의 다음과 같은 정보를 저장합니다.
Resource Stored Properties Controller 현재 리더 컨트롤러 Servers and Brokers 서버와 브로커의 리스트. 모든 서버와 브로커의 현재 설정값. 모든 서버와 브로커의 현재 헬스 상태 Tables 테이블 리스트, 테이블 설정, 테이블 스키마, 테이블 세그먼트 종류 Segment 세그먼트의 서버 로케이션, 세그먼트의 현재 상태, 각 세그먼트의 메타데이터 Controller
Pinot의 메타데이터가 변경되거나 노드가 실패했을 때 자원을 스케줄링하거나 재스케줄링합니다. Apache Helix 컨트롤러로서 클러스터를 구성하는 리소스를 스케줄링하고, 특정 외부 프로세스와 클러스터 구성 요소 간의 연결(예: 실시간 테이블 및 오프라인 테이블의 데이터 수집)을 조정합니다. 컨트롤러는 단독 서버에서 단일 프로세스로 배포하거나, 활성/대기(active/passive) 구성으로 중복 서버 그룹으로 배포할 수 있습니다. 오직 하나의 컨트롤러만이 Active할 수 있습니다.
Broker
적절한 서버 인스턴스에 쿼리를 라우팅합니다. 브로커는 모든 서버에서 응답을 모아서 최종 결과를 반환합니다.
Query processing: single-stage engine or multi-stage engine
single-stage engine
- 테이블 설정에 정의된 라우팅 전략에 따라 라우팅
- 각 서버에서 쿼리를 위해 세그먼트 리스트를 계산
- 각 서버에 쿼리를 전송하여 해당 세그먼트에 대해 로컬로 실행
- 각 서버로부터 결과를 수신하고 이를 병합
- 클라이언트에게 쿼리 결과를 전송
multi-stage engine- 쿼리 실행에 필요한 세그먼트를 기준으로 첫 번째 단계에서 실행될 서버를 선택- 쿼리 계획의 각 단계에 대해 해당 계획의 관련 부분을 클러스터 내 하나 이상의 서버로 전송- 쿼리 계획을 수신한 서버는 각자 자신의 쿼리 부분을 실행- 브로커는 쿼리의 최종 단계에서 항상 단일 서버로부터 완전한 결과 집합을 수신- 브로커는 쿼리 결과를 클라이언트로 전송Server
서버는 로컬에 연결된 저장소에 세그먼트를 호스팅하고, 해당 세그먼트에서 쿼리를 처리합니다. 운영자들은 "실시간" 서버와 "오프라인" 서버를 구분하여 말하지만, 실제 서버 프로세스나 설정에는 이 둘을 구분하는 차이가 없습니다.
Offline Server
Real-time serversMinion
Minion은 브로커와 서버가 수행하는 쿼리 처리 외에 테이블 데이터에 대해 백그라운드 작업을 수행하는 선택적인 클러스터 Component입니다. 독립적인 하드웨어에서 실행되며 컨트롤러가 지시한대로 작업을 실행합니다.
Components
Indexing
Pinot은 다음과 같은 인덱스를 지원합니다.
Bloom Filter: 확률적 자료구조를 사용기 때문에 EQUALITY or IN predicate 사용 패턴에서 좋습니다. 질의에서 없는데 있다고 할 순 있으나 있는 것 없다고 하진 않습니다.
SELECT COUNT(*) FROM baseballStats WHERE playerID = 12345 OR SELECT COUNT(*) FROM baseballStats WHERE playerID IN(12345, 45668, 56789)
Forward Index: Dictionary-encoded forward index, run-length 인코딩 forward index, raw forward index
Dictionary-encoded forward index
스토리지 효율성과 쿼리 지연 시간을 줄이기 위해서 반복되는 데이터셋에 대해서 dictionary index를 쓰는 것을 추천합니다.
만일 휴리스틱하게 컬럼 후보를 지정하기 위해서는 다음과 같은 고려사항을 검토할 수 있습니다.
- 사전 인코딩으로 표시되어 있어야 합니다(원시로 표시된 열은 항상 원시로 인코딩됩니다).
- 단일 값(single-valued)이어야 합니다(다중 값(multi-valued) 열은 이 휴리스틱에서 고려되지 않습니다).
- int, long, double, timestamp 등 고정 크기 유형이어야 합니다. json, string, byte와 같은 가변 크기 유형은 이 휴리스틱에서 절대 고려되지 않습니다.
- 텍스트 인덱스나 JSON 인덱스로 인덱싱되지 않아야 합니다(이 인덱스들은 카디널리티가 매우 큰 경우에만 유용하기 때문입니다).
Raw-forward indexForward index는 기본적으로 활성화되어 있으며, 명시적으로 비활성화하지 않는 한 모든 열은 Forward index를 가집니다. 필요없는 케이스에 대해서는 Forward index를 비활성화한다면 저장 공간을 절약할 수 있습니다.
Dictionary Raw Value Provides compression when low to medium cardinality Eliminate padding overhead (역인덱스와 같은) 인덱싱을 지원합니다. 역인덱스 없음(오직 JSON/텍스트/FST 인덱스만 지원)한 단계의 참조 해제를 추가하므로 디스크 탐색이 증가할 수 있습니다.추가 참조 해제가 제거되므로, 관심 있는 모든 문서가 연속적인 경우에 적합합니다.문자열(String)의 경우, 사전(Dictionary)에서 모든 값을 동일한 길이로 맞추기 위해 패딩을 추가합니다. 문서들이 공간적 지역성을 갖지 않으면, 청크 압축 해제 오버헤드가 발생합니다.Dictionary-encoded forward index with bit compression (기본값)이 방식은 column의 각 unique 값에 ID를 할당하고 이러한 ID를 해당 값으로 다시 매핑하는 dictionary가 구성됩니다. 실제 값을 저장하는 대신, 기본 forward index는 비트 압축된 ID를 저장합니다. 이 방법은 고유 값이 적은 column을 다룰 때 특히 효과적이며, 공간 효율성을 크게 향상시킵니다.colA의 경우 중복된 값이 있어서 공간을 절약할 수 있습니다. colB는 전부 Unique하기 때문에 압축 효과가 없으며 패딩 오버헤드가 있습니다.Sorted forward index with run-length encodingcolumn이 정렬 가능할 경우에 대해서는 run-length encoding을 적용하는 것이 좋습니다. 시작과 끝의 document ID만 저장하기 때문입니다.정렬된 순방향 인덱스는 효율적인 압축과 데이터 지역성(locality)의 이점을 제공하며, 역인덱스(inverted index)로도 사용할 수 있습니다.해당 세그먼트가 해당 열로 정렬되어 있고 dictionary가 활성화되어 있는 경우에 대해서 활성화됩니다.특정 열로 세그먼트를 정렬하는 방법
- realtime table의 경우: tableIndexConfig.sortedColumn 속성을 사용합니다.
- offline table의 경우: Pinot에 입력하기 전에 지정된 열로 데이터를 미리 정렬해야합니다.Raw value forward index위 인덱스는 ID를 사용하지 않고 실제 값을 저장합니다. Dictionary lookup이 없기 때문에 fetch시에 쿼리 성능이 좋습니다.많은 unique 값이 있을 경우에 dictionary encoding이 효과가 없기 때문에 이때 사용하면 좋습니다.역색인을 지원하지 않으며, JSON/TEXT/Range 등의 인덱스를 지원합니다.FST(Finite State Transducer) Index: 텍스트 데이터를 효율적으로 저장하고 검색하기 위해 설계된 데이터 구조로, regex로 텍스트를 검색하며, 4-6배 정도 디스크 사이즈를 줄입니다.
Geospatial Index
Inverted Index: Bitmap inverted 인덱스, Sorted inverted 인덱스(정렬이 된 경우 자동 제공)
특정 컬럼의 equal, in, gt 등의 필터링 작업에 유용합니다.
컬럼에 대해 역방향 인덱스가 활성화되면 Pinot은 각 값에서 row의 비트맵으로 매핑을 유지하며, 이를 통해 값 조회가 constant time 안에 이루어집니다. 필터링에 자주 사용되는 컬럼이 있는 경우 역방향 인덱스를 추가하면 성능이 크게 향상됩니다.
다중 값(Multi-Value) 열에서도 역방향 인덱스를 생성할 수 있습니다.
sorted 및 dictionary encoding가 활성화된 컬럼은 forward-index 및 inverted-index 모두를 지원하는 특수한 방식으로 인코딩됩니다.
sorted inverted-index는 일반 bitmap inverted-index에 비해 확실히 뛰어난 성능을 제공합니다. 그러나 이는 정렬된 열에만 적용할 수 있습니다.
일반 bitmap inverted-index를 사용할 때 쿼리 성능이 만족스럽지 않은 경우, 특히 질의하는 많은 쿼리가 동일한 열(예: memberId)에 필터를 적용하는 경우,sorted inverted-index 사용하면 쿼리 성능을 크게 향상시킬 수 있습니다.JSON Index
SELECT * FROM mytable WHERE JSON_EXTRACT_SCALAR(person, '$.name', 'STRING') = 'adam'
JSON string column들에 대해서 필터링 및 lookup을 성능을 향상시킬 수 있습니다.
JSON 인덱스는 모든 JSON 객체를 스캔하고 재구성하지 않고도 JSON 문자열 열에 대한 필터링 속도를 높이도록 설계되었습니다.
{ "fieldConfigList": [ { "name": "person", "indexes": { "json": { "maxLevels": 2, "excludeArray": false, "disableCrossArrayUnnest": true, "includePaths": null, "excludePaths": null, "excludeFields": null, "indexPaths": null } } } ], ... }
Native text index(Experimental)Range Index(inverted index의 변형)
value에서 column으로 매핑하는 대신, 값의 범위에서 column으로의 매핑을 생성합니다.
Star Tree Index
다른 인덱스들이 단일 컬럼에 대해서 작동한는 방식과 다르게 Star Tree Index는 여러 컬럼을 기반으로 구축되고 pre-aggregation을 활용하여 처리해야할 값의 수를 줄이는 방식으로 쿼리 성능을 개선합니다.
OLAP 시스템에서 가장 큰 도전 과제 중 하나는 대규모 데이터셋에 대해서 지연시간과 처리량의 SLA를 준수하는 것입니다.
sorted index 및 inverted-index는 쿼리 지연 시간을 개선하지만 속도 개선은 도큐먼트 수에 영향을 받습니다. 결과를 미리 집계하면 쿼리 성능을 일정하게 유지할 수 있지만 저장 공간을 엄청나게 사용하게 됩니다.Star Tree Index를 사용하면 pre-aggregation된 document를 활용하면 집계 및 group by 쿼리에 대해서 낮은 지연 시간과 효율적인 공간 사용을 동시에 달성할 수 있습니다.
Tree structurestar-tree index는 다음과 같은 속성을 가진 구조로 데이터를 저장합니다.
-
Root node (주황): 나머지 트리를 탐색할 수 있는 단일 루트 노드
-
Leaf node (파란): 리프노드는 T개의 레코드를 포함할 수 있으며 T는 설정할 수 있습니다.
-
Non-leaf node (초록): T개 이상의 레코드를 가진 노드는 자식 노드로 분할됩니다.
-
Star node (노랑): Non-leaf nodes는 star node라는 특별한 자식 노드를 가질 수 있습니다. 이 노드는 해당 레벨에서 데이터를 split한 dimension을 제거한 후 pre-aggregated 레코드를 포함합니다.
-
Dimensions split order ([D1, D2]): 트리의 주어진 레벨에서 노드는 특정 dimension의 모든 값에 대해 자식 노드로 분할됩니다.
Node properties
각 노드에 저장된 속성은 다음과 같습니다
- Dimension: 노드가 분할된 차원
- Start/End Document Id: 이 노드가 가리키는 문서의 범위
- Aggregated Document Id: 이 노드가 가리키는 모든 문서의 집계 결과를 나타내는 하나의 문서
인덱스 생성
- dimensionsSplitOrder에 따라 데이터가 projection됩니다. dimensionsSplitOrder에 있는 dimension을 제외하고 drop합니다. dimensionsSplitOrder의 dimension의 고유한 조합마다 메트릭을 집계하고 집계된 문서를 파일에 기록하고 원본 문서와 분리되어 초기 star-tree document로 제공됩니다.
- dimensionsSplitOrder의 순서에 따라 star-tree documents를 정렬합니다. 첫번째 차원을 기준으로 정렬하고 다음 순서대로 정렬합니다. tree의 각 노드로 정렬된 문서의 범위를 알 수 있습니다.
- 트리 구조는 재귀적으로 생성됩니다.
- 노드에 T개 이상의 레코드가 있으면 여러개의 자식 노드로 분할됩니다.
- 현재 노드에 대해 star node가 생성될 수 있습니다. 분할된 dimension을 제외하고 동일한 값을 가진 dimension들의 메트릭을 집계한 후 집계된 문서들을 스타 트리 문서 끝에 추가합니다.
- 만약 현재 dimension에 값이 하나가 있다면 star node는 생성되지 않습니다.
- 더 이상 분할 노드가 없을 때 까지 반복합니다.
Aggregation
Aggregation은 Aggregation과 Column을 쌍으로 설정하여 구성됩니다.
예제SELECT SUM(Impressions) FROM myTable WHERE Country = 'USA' AND Browser = 'Chrome' GROUP BY Locale
위와 같은 SQL에 있을 때 아래와 같은 설정으로 star-tree index를 생성할 수 있습니다.
"tableIndexConfig": { "starTreeIndexConfigs": [{ "dimensionsSplitOrder": [ "Country", "Browser", "Locale" ], "skipStarNodeCreationForDimensions": [ ], "functionColumnPairs": [ "SUM__Impressions" ], "maxLeafRecords": 1 }], ... }
만일 집계의 압축을 설정하고 싶을 경우 아래와 같이 설정할 수도 있습니다.
"tableIndexConfig": { "starTreeIndexConfigs": [{ "dimensionsSplitOrder": [ "Country", "Browser", "Locale" ], "skipStarNodeCreationForDimensions": [ ], "aggregationConfigs": [ { "columnName": "Impressions", "aggregationFunction": "SUM", "compressionCodec": "LZ4" } ], "maxLeafRecords": 1 }], ... }
Text Search Support
Timestamp Index
Timestamp 데이터 타입에 대해서 적용할 수 있고 벤치마크에서 해당 인덱스를 적용했을 때 42초에서 4.2초로 약 10배 정도의 성능 개선을 보입니다. 타임스탬프 column에 대한 range query 및 group by query의 성능을 향상시키는 데 사용됩니다.
Timestamp Index는 테이블 설정에 fieldConfigList에 설정됩니다.
인덱스를 생성하는 방법으로 Segment 생성 시, Data Ingestion시에 생성할 수 있습니다.
동적으로 인덱스를 생성하거나 삭제할 수 있습니다. 필요한 어떤 시점이든 원할 때 동적으로 생성하거나 제거할 수 있습니다.
foo라는 field에 inverted index가 있는 테이블이 있을 때 bar라는 필드도 추가하고 싶을 경우, 다음과 같이 설정을 변경해주면 됩니다.
아래와 같은 설정이라면"tableIndexConfig": { "invertedIndexColumns": ["foo"], ... }
다음과 같이 변경해주면 됩니다."tableIndexConfig": { "invertedIndexColumns": ["foo", "bar"], ... }
업데이트된 인덱스를 reload API해야만 적용됩니다. 이 API는 Helix를 통해서 모든 서버에 Reload 서버에 전송됩니다. 이 과정에서 세그먼트가 추가되거나 삭제됩니다. 해당 작업 수행시 다운타임은 없고 쿼리 수행에도 영향을 미치지 않습니다.
엄격한 저지연이 요구되지 않는다면 대부분의 케이스에서 inverted index가 좋은 성능을 냅니다.
만일 쿼리 속도가 아쉽다면 Sorted Index 나 Star-tree Index를 고려할 수 있습니다.
'Software Development > Data Engineering' 카테고리의 다른 글
[Flink] 플링크 공부 (0) 2025.01.27 [Kafka] 단일 파티션에 대한 고민 (0) 2024.05.27 [Spark] Spark JDBC와 하이브 연동 이슈들. (0) 2023.05.05 [Hive] 하이브 테이블에 Spark으로 적재 시, HQL로 읽을 때, 값이 Null로 조회되는 이슈. (0) 2023.05.02 [Data Pipelines with Apache Airflow] 3. Airflow의 스케줄링 (0) 2022.08.02 -