2019년 8월 6일 화요일

Kubeflow: Volume test

아래와 같이 해야 하는데,
dsl.ContainerOp(...).add_volume(vol).add_volume_mount(mnt)
그래서 원래는 volume claim을 아래와 같이 해야 하는데 이렇게 하면 안된다. 개별 pod은 되지만 argo workflow에서 기동되는 container에서는 할당이 안된다. claim이 안되서 계속 pending 상태로 나온다.
vol = k8s_client.V1Volume(
      name='my-volume', 
      persistent_volume_claim=
        k8s_client.V1PersistentVolumeClaimVolumeSource(
          claim_name='default/task-pv-claim'
        )
  )
결국은 아래와 같이 hostpath를 직접 붙이는 방법으로 해야 붙일 수 있다.
vol = k8s_client.V1Volume(
      name='my-volume', 
      host_path=k8s_client.V1HostPathVolumeSource(path='/home/vagrant/shared'),
  )
ls.py를 실행하면 logs에 아래와 같이 나온다.
total 48
drwxrwxr-x    2 1000     1000          4096 Apr 24 20:17 .
drwxr-xr-x    1 root     root          4096 Apr 24 20:35 ..
-rw-rw-r--    1 1000     1000         15986 Apr 24 17:52 flowlist.txt
-rw-rw-r--    1 1000     1000          3000 Apr 24 20:04 greetings.yaml
-rw-rw-r--    1 1000     1000           443 Apr 24 20:03 ls-volume-test.yaml
-rw-rw-r--    1 1000     1000          1766 Apr 24 19:48 out
-rw-rw-r--    1 1000     1000           189 Apr 24 17:28 pv-claim.yaml
-rw-rw-r--    1 1000     1000           405 Apr 24 18:09 pv-pod.yaml
-rw-rw-r--    1 1000     1000           240 Apr 24 17:24 pv.yaml

개별 pod에 volume 붙이는 방법

위에서 argo는 실패하였지만 pv-pod.yaml을 보면 개발 pod은 정상적으로 붙일 수 있다. 즉 다음과 같은 순서로 가능하다.
  1. PV를 선언하고: pv.yaml
  2. claim을 한다.: pv-claim.yaml
  3. pod에 붙인다.: pv-pod.yaml

AutoML

  • AutoML
    • Automated Machine Learning: 다음의 업무를 자동화
      • Feature Extraction/Enginnering
      • Architecture Search
      • Hyperparameter Optimization: learning rate, mini-batch size 등
    • "인공지능 대중화" autoML
    • 자동화된 머신러닝 환경
      • AutoWEKAalgorithm과 그의 hyperparameters 중심
      • Auto-sklearn: scikit-learn 기반하에 AutoWEKA의 extension으로 만든 것, scikit-learn의 classifiers and regressors 대체제
      • TPOT: machine learning pipelines 최적화 중심 Automated by TPOT
      • H2O AutoML: H2O 기반 분석의 model selection(training)과 ensembling을 자동화 한 것 (TutorialUser Guide)
      • TransmogrifAI: Spark 기반의 AutoML library by salesforce.com
  • Keras, Gluon 등과 같은 Tensorflow와 mxnet를 더욱 쉽게 사용할 수 있게 하는 Framework
  • ONNX와 같은 모델의 배포 및 변환, Framework간의 상호호환성 보장을 위한 규약(by Microsoft, Facebook and aws)

HDFS: Computing Resource와 Storage의 분리

Computing Resource와 Storage의 분리

Hadoop은 태생부터 분산파일시스템인 HDFS가 연산 프레임워크인 MapReduce와 독립적으로 사용될 수 있었다. 게다가 HDFS에 저장한 데이터를 분산처리 하기 위한 프레임 워크가 Map Reduce 밖에 없어 매우 경직되었으며 개방적이지 못한 상태였다. Hadoop v2에 들어서면서 YARN이라는 개방적인 프레임워크가 등장하였고 이를 토대로 Spark 등 많은 분석 환경이 YARN위에서 자유로운 scale-in/out을 하면서 동작하게 되었다. Spark 자원 관리는 mesos, YARN, standalone등 선택이 가능하지만 추가/제거되는 서버상황을 자동 인지하고 이에 맞추어 동작시키려면 YARN 기반하에 사용해야 한다.
hadoop을 구축하고 운영하면서 부딪히는 어려움 중 하나는 증가하는 Computing resource 사용량과 Storage capacity간의 조화이다.
구축 초기에는 대체로 데이터를 저장하기에 급급하다. 즉, Storage 사용량이 증가한다. 분석을 위해 더 많은 데이터의 저장을 요구하고 이에 따라 새로운 데이터를 정의하고 수집한다. 수집된 데이터를 탐색하고 중간 데이터들을 만들어 내면서 임시 파일과 테이블들이 급격히 증가하게 된다. 이러한 상황은 대개 데이터 거버넌스가 제대로 동작하지 않거나 그 정도의 기간에 미치지 못한 경우가 많다. 또한 분석가들이 빅데이터에 익숙하지 않은 채 이것 저것 해 보는 경우가 있다. CPU 사용량은 비정기적으로 튀고 있으며 누군가는 자원을 독점하여 문제를 일으키기도 한다. 그러다 보면 Storage가 모자라게 된다. 그러나 CPU와 메모리 사용량은 크지 않은 경우가 많다. 그래서 Storage를 늘리기 위해 hadoop node를 늘린다. CPU, 메모리 역시 같이 늘리게 된다. 클라우드에서야 점진적인 확장이 가능하지 On-premise에서는 한두대씩 사는 것이 가격 결정력을 떨어뜨리기 때문에 대개 계단형으로 늘리게 된다.시간이 지나 분석가들이 하나 둘씩 늘어나고 정규 배치작업들이 구동하기 시작하면서 이제는 CPU, 메모리 등 컴퓨팅 자원이 부족해 지게 된다. 역시 늘릴 필요도 없는 Storage와 함께 늘리게 된다. 이러한 문제는 aws라면 S3가 있기에 애초에 고민할 필요도 없다. 그러나 On-premise에서는 상황이 다르다. S3와 같이 규모의 경제를 만들어 주는 저렴한 스토리지는 없다. EMC, NetApp, Pure Storage 등의 스토리지는 기존 hadoop 구축 비용과 비교가 불가할 정도이다. 또한 이러한 구조는 대용량의 bandwidth를 갖는 네트워크 스위치가 필요하다.
하나의 hadoop 클러스터로 모든 것을 하기에는 위험하다.
hadoop 클러스터는 데이터 수집 및 저장, 분석 그리고 서비스용 데이터 제공까지 다양한 역할을 수행한다. 이러한 역할로 인해 클러스터에 설치되는 SW 역시 다양하다. 각 역할간의 자원 경합, 각 SW 간의 간섭 등 하나의 거대한 클러스터로는 운영에 있어 위험 요소가 너무나 다양하고 복잡하다. 따라서 데이터 플랫폼과 이를 운영하고 사용하는 조직이 발전하면서 클러스터는 용도별로 분리되고 있다.[1]
  • 분석용 클러스터: 컴퓨팅 자원을 주로 사용하며 사용자간 자원 분배가 중요
  • 데이터 입수/저장용 클러스터: 스토리지 위주이나 데이터 입수에 장애가 발생하지 않도록 하기 위해 입수용 자원은 일정 수준을 보장해야 함.입수에 필요한 배치 작업의 수행 시간 임계치 등 설정 필요
  • 서비스용 클러스터: SLA(응답시간, 정기점검 등) 필요. 분석, 모델링 등과의 자원 경합에서 독립적이어야 함
  • 명확히 대용량의 스토리지가 필요한 경우를 제외하고는 hadoop이 필요없을 수 있음
Computing resource와 Storage를 분리하는 것은 어떠한 가능성을 가지고 있을까?
  • 효과적인 증설: Storage를 반드시 고가의 All flash가 아닌 기존 서버 기반의 hadoop이라고 해 보자. 여전히 hadoop은 서버 기반이기에 함께 늘려야 한다는 생각을 버리고 CPU와 메모리만 탑재된 랙 공간을 효과적으로 사용할 수 있는 서버를 꽂는다. 그렇다면 hadoop의 사용성을 유지하려면 무엇을 해야 하는가? 네트워크 구간을 확장해 주어야 한다. 이것을 기반으로 비교적 합리적인 분리가 가능하다. 그렇다면 hadoop의 최대 장점이라 여겨지는 data locality는 어떻게 되는가? data locality가 중요시 여겨지던 때는 네트워크가 디스크 속도를 따라오지 못할 때다. 또한 MapReduce는 연산 작업을 작게 쪼개고 이를 해당 데이터가 존재하는 곳에서 구동시킨다. 두 번째 연산 부터는 어떻게 하나? disk locality와 별 관계가 없어진다. 한편 Spark을 주로 사용하는 환경에서 data locality는 MapReduce 환경에 비해 크게 문제가 되지 않는다. Disk에서는 어차피 한번 읽어 오면 된다. 그래서 네트워크가 확보되면 data locality로 인한 문제는 상당히 감소될 수 있다.[1][2]
Thus, as observed previously, reads from local disk are comparable to reads from local network. Figure 1 confirms this by showing that the bandwidth difference is about 8%. (실험 결과 로컬 디스크에서 읽는 것과 로컬 네트워크에서 읽는 것은 8% 정도의 대역폭 차이를 보인다.)
  • 분석용 SW의 빠른 릴리즈, 빠른 업데이트: 2019년 초인 현 시점에서 볼 때 한 가지는 여전히 빠르게 릴리즈 되고 있는 분석, 모델링용 SW의 신속한 업데이트가 있다고 할 수 있다. Tensorflow는 많으면 한달에 2번 이상 릴리즈 되고 있다. 신규 기능들이 나오고 새로운 알고리즘들이 배포되면 분석가들은 사용해 보고 싶어하고 그것을 통해 자신이 만든 모델의 성능을 높이고 싶어한다. Spark는 어떤가? 지난 3월말 2.4.1버전의 릴리즈까지 거의 매달 릴리즈가 되고 있다.

5 References

[2] Disk-Locality in Datacenter Computing Considered Irrelevant, 2011, Ganesh Ananthanarayanan, Ali Ghodsi, Scott Shenker, Ion Stoica, University of California, Berkeley

[3] Decoupling Storage and Computation in Hadoop with SuperDataNodes, 2010, George Porter, UC San Diego

NiFi: Processors for Querying Databases

Database Extract with NiFi

Processors for Querying Databases

  • ExecuteSQL: 쿼리를 직접 작성하고 전송할 목적, pagination이 적용되는 쿼리에 적합. 
    executes an arbitrary SQL statement and returns the results as one FlowFile, in Avro format, containing all of the result records. Very simple and flexible, works with a broad set of statements including stored procedure calls. Designed for general-purpose use, does not have specific features for incremental extraction. ExecuteSQL can accept incoming FlowFiles, and FlowFile attributes may be used in expression language statements to make the SQL.
  • QueryDatabaseTable: 증분 쿼리, 최근 변경 사항만 가져오는 용도
    designed specifically for incremental extraction. Computes SQL queries based on a given table name and incrementing column. Maintains NiFi state data tracking the last incremental value retrieved. Results are formatted as Avro files.
  • GenerateTableFetch: Pagination 적용 쿼리, 데이터가 큰 경우
    New in NiFi 1.0.0. May be used to generate a sequence of paged query statements for use with ExecuteSQL, making it practical to query very large data sets in manageable chunks.
If you are doing an incremental extract, trying to get only the latest records, then QueryDatabaseTable is probably your processor. If you need to customize the SQL statement to individual input FlowFiles, ExecuteSQL is the only way to go. If you wish to run a routine query on a scheduleExecuteSQL is probably a better fit.
NiFi에서 DB connection test를 하기 위해서는 ExecuteSQL이 적합할 것으로 예상됨. Oracle을 예로 들면 다음과 같이 connection test를 할 것임
SELECT 1 FROM DUAL
이러한 custom SQL statement를 작성하고 결과를 받기위해서 사용하는 것이 ExecuteSQL이므로 가장 적합해 보임

Dealing with Avro Files

  • SplitAvro: 여러 레코드가 저장된 avro file을 개별 레코드 또는 일정한 크기의 FlowFile로 분리시킴
    splits an Avro file with multiple records into individual FlowFiles, or FlowFiles of an arbitrary size. Since an Avro file may contain more than one record, using SplitAvro can give you consistent size of the FlowFiles in your flow.
  • ConvertAvroToJSON: appropriately named processor that converts Avro files to -- wait for it -- JSON. This is extremely useful both for the flexibility of processing JSON and the ease of visually inspecting JSON data. Combined with the SplitAvro processor, you can easily convert a stream of database records into a stream of single-record JSON FlowFiles.

Large Result Sets

  • 대용량 데이터 조회시 QueryDatabaseTable은 사용하지 말 것
  • ExecuteSQL + GenerateTableFetch를 사용할 것
Due to the wide variance in SQL support for limiting result sets, NiFi does not provide a means to automagically keep result sets down to hundreds or thousands of rows. Reading a large table with QueryDatabaseTable may not be practical.
But remember that ExecuteSQL allows you to customize the SQL statement with an incoming FlowFile. If you have NiFi 1.0.0, you can use the **GenerateTableFetch **processor to do build a stream of incremental queries...

Reference

spark elasticsearch 연동시 설정 예제



실제 운영에서 사용할 수 있는 옵션들 정리해 봤다. 실제 운영하기도 했던 설정이다. 2억건의 데이터를 수시간내에 제한된 instance로 update하고 ranking 구하고 기타 등등을 해야 하는 상황이었고, spark도 elasticsearch도 굉장히 많은 데이터를 벌크 처리하도록 하였다. 모종의 2억건의 데이터를 140여개 table 쿼리하고, groupby, join, rank, elasticsearch 넣고 빼고 ... spark dynamic allocation은 사용하지 않았다.

Reference

#!/bin/bash
nohup spark-submit \
--class com.ctojang.spark.example \
--master yarn \
--deploy-mode client \
--conf spark.yarn.maxAppAttempts=4 \  작업 제출 재시도 횟수
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \ AM 작업실패를 트래킹하는 용도로 성공시 실패 카운터는 리셋됨
--conf spark.yarn.max.executor.failures=48 \ 개별 작업이 실패할 때도 있는데, 48번 실패하면 이 어플리케이션이 실패했다고 판단(numExecutors * 2, with minimum of 3)
--conf spark.yarn.executor.failuresValidityInterval=1h \ executor의 작업실패를 트래킹하는 용도
--conf spark.task.maxFailures=8 \ 개별 작업이 실패로 판단되는 기준 횟수
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \ 작업을 오래 돌리다 보면 HDFS 인증 token이 만료되어 에러나는 경우가 생김(우리는 발생하지 않았음)
--conf spark.yarn.am.memory=6g \ “client mode" 일때 설정
--conf spark.driver.memory=6g \  “cluster mode" 일때 설정
--conf spark.yarn.am.memoryOverhead=3g \ “client mode" 일때 설정
--conf spark.driver.memoryOverhead=3g \  “cluster mode" 일때 설정
--conf spark.executor.memory=3g \ executor 개수에 영향을 주는 요소, core개수에 memory 잔여량까지 더해져서 executor 개수가 정해짐
--conf spark.driver.maxResultSize=2g \ driver에게 모이는 결과 값의 크기, 기본 1g, 이 값이 작으면 physical memory가 작아서 에러가 난다는 메시지가 출현함
--conf spark.executor.instances=4 \ 최소 기동을 보장하는 executor의 개수
--conf spark.es.batch.size.bytes=100mb \ ES batch API를 통해서 한번에 기록하는 데이터 크기(기본 1mb)
--conf spark.es.batch.size.entries=10000 \ ES batch API를 통해서 한번에 기록하는 데이터 갯수(크기와 갯수 중 하나만 맞으면 바로 쓰기작업실행함)
--conf spark.es.batch.write.retry.wait=6 \ ES batch API 실패시 재시도 간격(초단위)
--conf spark.es.batch.write.retry.count=30 \ ES batch API 실패 재시도 횟수
--conf spark.es.nodes.discovery=true \ ES node 하나면 연결되면 클러스터 구성 node를 모두 찾는 기능(첫 기동시에만 discover됨). 이걸 안켜면 es.nodes에 기입된 호스트에게 요청
--conf spark.es.nodes.client.only=false \ 모든 요청을 client node에게 하는 모드
--conf spark.es.nodes.data.only=true \ 모든 요청을 data node에게 하는 모드(기본값)
--conf spark.es.transport.tcp.compress=true \ 전송 데이터 압축 여부
--conf spark.es.nodes=internal-example-es-elb-data-pr-719749396.ap-northeast-2.elb.amazonaws.com \
--files s3://example/resources/spark-log4j.properties \
--driver-java-options "-Dlog4j.configuration=fi-spark-log4j.properties -Dexample.logging.level=DEBUG" \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=fi-spark-log4j.properties -Dexample.logging.level=DEBUG -Denv=prd -DappName=example-fullindex -DdbFetchSize=500000 -DesFetchSize=2000" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=fi-spark-log4j.properties -Dexample.logging.level=DEBUG" \
s3://example/bin/example-fullindex-assembly-0.1.jar &

Kafka의 특징 요약

Kafka의 구성

System 구성은 다음과 같이 직관적이다.
Producer -> Kafka -> Consumer
  • Message를 생산하는 Producer
  • Message를 보관하는 Kafka
  • Message를 사용하는 Consumer

Kafka 저장소 형태

Topic > Partition > Leader(원본) & Follower(복제본) 순으로 깊게 들어간다.
  • Kafka에서는 데이터를 메시지라는 용어로 사용한다.
  • 메시지는 논리적 저장 공간인 Topic에 저장된다.
  • Topic에 저장되는 메시지들은 다수의 서버에 분산되어 있는 Partition에 저장된다.(Partition 개수 설정 가능)
  • 메시지는 다수의 Partition에 저장되는데 이들은 Replication Factor를 통해 복제본을 만들어 저장된다.
  • 하나의 Partition은 Leader와 Follower로 나뉘게 된다.
  • Replication factor 만큼 복제를 해야 하니 누군가는 첫번째 원본을 저장하고 있어야 하고(leader) 누군가는 그것의 복제본을 저장하고 있어야 한다.(follower)
  • Replication factor가 3이라면 1개의 원본, 2개의 복제본을 만든다.
  • Leader가 원본을 저장하기에 Producer와 Consumer는 Leader하고만 통신하면 된다.
  • 그럼 특정 노드만 부하가 올라가냐? 하나의 Partition에 Leader가 존재하고, 서로 다른 Partition의 Leader는 여러 서버에 분산되어 존재한다. 그래서 서버가 충분히 많다면 걱정할 필요는 없다. 그러나 여기서 짐작하듯이 서버수와 Partition 수가 상관관계가 있을 수 밖에 없구나... 라고 느끼게 된다.
  • Leader와 Follower는 High Availability에 연관되어 장애시 failover 정책에 따라 역할이 정해진다.

Leader & Follower

Leader 선출

보통 Leader 선출한다고 하면 드는 생각이 '아! zookeeper에 ephemeral node를 선점하는 녀석이 이기겠구나! 라던지 lock node를 선점하는 녀석이 이기는 거겠지?' 라고 생각할 수 있으나 kafka는 좀 다르다.;;;
Kafka는 Leader를 선정할 때 여러 노드에 분산하여 선정한다. 그래야 특정 서버에 부하가 몰리지 않겠지. 장애 등으로 인해 leader가 죽으면 follower 중 하나를 leader로 세우겠지. 다음과 같은 정책이 있다.
  • prefered replica leader: kafka가 보기에 '이 partition의 leader는 니(노드)가 하는 게 좋겠어' 라고 지정한 것
  • auto.leader.rebalance.enable=true: 장애등으로 인해 특정 leader가 죽었다면 follower중 하나가 leader가 되어야 하는데 kafka가 공들여서 만들어 놓은 leader 분배 밸런스가 깨지겠지? 그럼 다시 밸런스를 맞춰야 겠지. 이 옵션을 켜면 그렇게 해 준다. (kafka-preferred-replica-election.sh으로 수동으로 할 수도 있음)

ISR(In-Sync Replicas)

"In-Sync" 라는 말은 '동기화 되어 있다.' 정도로 해석할 수 있지 않을까? 즉, follower가 나의 leader와 동기화 되어 있다!라고 생각해야 제대로 된 replica라고 할 수 있지 않겠나.
"ISR"은 하나의 partition에서 동기화 되어 있다고 판단되는 replica set을 이야기 한다. 즉, leader와 follower의 집합이다.
그럼 동기화 주기는 어떻게 되는가? replica.lag.time.max.ms 시간 내 동기화 되었다고 느끼면 된거다.(default 500ms)
그런데 조금 이상하다. Follower 입장에서 동기화라고 하면 '내가 얼마 주기로 동기화 해야 하지?' 라던지 '계속 폴링 해야지?' 라던지... 'replica.lag.time.max.ms'라는 옵션을 해석해 보자. '동기화에 걸리는 최대 지연시간'. 뭔가 굉장히 강압적인 이 느낌. 그렇다. follower는 저 시간내 동기화 되지 못하면 짤린다. 즉, ISR에서 퇴출된다.
Follower가 ISR 멤버임을 유지할 수 있는 조건은 다음과 같다. 실제로는 replica.lag.time.max.ms 만 설정해도 된다.
  • replica.lag.time.max.ms: leader와 최소 이 시간 마다 동기화 해야 함
  • replica.lag.max.messages: leader와 동기화된 메시지의 최대 차이를 이 수 내에서 유지해야 함 (있다는 것만 알아도 된다.)
동기화가 되어 있지 않은 follower에는 어떤 유형이 있을까?
  • Slow replica: I/O 등의 이슈로 동기화가 느린 유형
  • Stuck replica: GC 발생 또는 서버 장애 등의 이유로 전혀 동작하지 않는 유형
  • Boostrapping replica: ISR 수를 조정하여 새로운 follower로 등록된 경우 leader가 가진 모든 메시지를 복사하는 유형
아무튼 Follower가 이렇게 기구한 운명을 타고난 이유를 살펴보기 위해서는 메시지가 저장되는 과정의 일부를 살펴볼 필요가 있다.

Acknowledgement

publisher, 즉 메시지를 생산하는 입장에서 중요한 것은 역시 내가 보낸 메시지가 안전하게 저장되었는 지 확인하는 것이다. 중요한 것은 안전한 전송이 아니라 안전한 저장 이다. 즉, 메시지가 안전하게 전송된 후 역시 안전하게 저장되어야 ACK를 받는다는 것을 기저에 두고 있다.
  • acks=0: publisher는 leader에게 메시지를 전송하지만 저장여부에 관심이 없다. 그냥 계속 보내기만 한다.
  • acks=1: publisher는 leader에게 메시지를 전송하고 ACK를 기다린다. leader는 자기만 저장하면 ACK를 전송한다.
  • acks=all: publisher는 leader에게 메시지를 전송하고 ACK를 기다린다. leader는 자기와 follower들이 모두 메시지를 저장한 다음 ACK를 보낸다. 즉, ISR이 모두 메시지를 디스크에 저장하면 leader가 ACK를 보낸다.
다시 ISR 이야기로 돌아가 보자. 결국 acks=all의 경우 producer에게는 견뎌야 하는 지연이 발생한다. 이 지연 시간을 최소화 하기 위해서는 follower들이 leader가 수신한 메시지를 최대한 빠르게 복제해야 한다. 결과적으로 뭔가 문제가 있어 replica.lag.time.max.ms시간 내에 복사를 제대로 못하는 follower는 publisher, kafka에 이르는 전체 시스템에 영향을 미치기에 짤린다.
그런데 무작정 짜르기만 한다면 언제까지 자를 것인가? min.insync.replicas 이 설정이 바로 최소의 ISR내 replica 요건이 된다. 만약 이런 저런 이유로 follower 들이 줄어들다가 이 수 보다 작아지는 경우, leader는 publisher의 메시지 송신 응답으로 "Not engough replicas"라는 에러 응답을 하게 된다.
Availability 보다 Consistency가 더 중요하다는 이야기다.
중요한 옵션이 하나 더 있는데 unclean.leader.election.enable 이다. 이 옵션은 ISR이 무너진 경우 ISR에서 쫒겨났지만 살아 있는 노드에게 leader 역할을 넘기는 것을 허용할 지 여부를 결정한다. 그러나 ISR에서 쫒겨난 경우 이미 leader와 메시지 동기화 수준이 많이 떨어져 있기 때문에 상당한 메시지 손실을 볼 수 있다.
애초에 replica를 위한 서버 구성을 하는 것이 필요하겠다. 모든 것은 돈이 문제라...

중요한 설정

  • replica.lag.time.max.ms: follower가 이 시간 내에는 반드시 메시지 동기화를 수행해야 함
  • unclean.leader.election.enable: ISR 내에 더 이상 leader 역할을 할 수 없을 때 ISR 밖에서 leader를 선임할 수 있는 지 여부를 설정 - consistency는 포기하더라고 availability가 중요한 경우...라도 정말 좋지 않은 상황인듯?
  • min.insync.replicas: ISR내 최소 이 수의 leader+follower 수는 확보되어야 함을 의미

Reference

HDFS High Availability

아래 모든 설명은 하나 떠 있는 구조가 아닌 실제 운영환경, 완전 분산 모드로 설치한 것을 가정한다.

구성요소

HDFS의 구성요소는 다음과 같다.
  • NameNode: Active & Standby, Zookeeper Failover Controller
  • DataNode
  • Quorum Journal Manager
  • Zookeeper
edits log를 모아서 FS image를 만드는 역할을 수행했던 Secondary NameNode는 HA 구성에서 필요 없다. Standby NameNode가 해당 역할을 수행한다.

High Availability

hadoop은 예전부터 이중화 이야기가 항상 꽂을 피웠다. 공식적으로 이중화 안되던 시절에 facebook이 avatar node라는 걸 만들기도 했었다. 나는 0.18 버전 사용하던 당시 zookeeper client 두개를 놓고 ephemeral node를 먼저 가진 녀석이 자기 namenode를 기동시키도록 만들었었다. 그 상태 관리는 SCXML이라는 오픈소스를 이용해서 했었고 FS image와 edits log를 위해서는 NFS를 썼다. 어찌 보면 지금의 ZKFC와 유사한 것 같다.

namenode 이중화

namenode는 Active와 Standby로 구성된다. Active namenode는 현재 datanode 및 client가 대화하는 상대이고 Standby namenode는 Active가 죽으면 그 역할을 대신하기 위해 떠 있다. 다만 Secondary namenode가 수행하던 checkpoint 기능을 Standby namenode가 대신 수행한다. 즉, edits log를 가지고 FS image를 만드는 역할을 Standby namenode가 수행한다.

QJM(Quorum Journal Manager)

Hadoop만을 위해서 만들어진 것이다. 통상 3개의 QJM이 동시에 edit log를 작성한다. Quorum이기에 3 노드 중 2 노드가 찬성 하면 맞는 거다. 별도의 프로세스가 기동된다. 하지만 작아서 namenode, resource manager 등과 같이 띄워도 무방하다. hadoop의 master node들은 다들 하는 일이 별로 없다.

Conventional Shared Storage

edit log와 FS image를 위한 공용 스토리지를 말한다. 이렇게 구성하면 namenode 2대와 별도로 안정성이 보장되는 shared storage가 있어야 한다. 아주 오래된 방법이긴 하나 구성이 간단해서 좋긴하다.

ZKFC(Zookeeper Failover Controller)

zookeeper의 'ActiveStandbyElectorLock(ephemeral)'와 'ActiveBreadCrumb(persistent)' 노드를 만들어 놓고 자신이 관리하는 namenode가 active임을 명시한다. haadmin으로 failover test를 하면서 이 값을 모니터링 하면 바뀌는 것을 확인할 수 있다.

Fencing Methods

간단하게 sshfence를 사용할 수 있다. 아니면 임의의 shell file을 실행시킬 수도 있다. sshfence를 이용하려면 상호 비번 없이 ssh가 가능하게 뚤어 놓자. 이때 주의할 것은 namenode 서버 모두에 반드시 fuser가 설치되어 있어야 한다는 것이다. sshfence는 내부적으로 상대 namenode의 namenode process를 죽이는데 이때 fuser를 사용한다.