2019년 8월 6일 화요일

spark elasticsearch 연동시 설정 예제



실제 운영에서 사용할 수 있는 옵션들 정리해 봤다. 실제 운영하기도 했던 설정이다. 2억건의 데이터를 수시간내에 제한된 instance로 update하고 ranking 구하고 기타 등등을 해야 하는 상황이었고, spark도 elasticsearch도 굉장히 많은 데이터를 벌크 처리하도록 하였다. 모종의 2억건의 데이터를 140여개 table 쿼리하고, groupby, join, rank, elasticsearch 넣고 빼고 ... spark dynamic allocation은 사용하지 않았다.

Reference

#!/bin/bash
nohup spark-submit \
--class com.ctojang.spark.example \
--master yarn \
--deploy-mode client \
--conf spark.yarn.maxAppAttempts=4 \  작업 제출 재시도 횟수
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \ AM 작업실패를 트래킹하는 용도로 성공시 실패 카운터는 리셋됨
--conf spark.yarn.max.executor.failures=48 \ 개별 작업이 실패할 때도 있는데, 48번 실패하면 이 어플리케이션이 실패했다고 판단(numExecutors * 2, with minimum of 3)
--conf spark.yarn.executor.failuresValidityInterval=1h \ executor의 작업실패를 트래킹하는 용도
--conf spark.task.maxFailures=8 \ 개별 작업이 실패로 판단되는 기준 횟수
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true \ 작업을 오래 돌리다 보면 HDFS 인증 token이 만료되어 에러나는 경우가 생김(우리는 발생하지 않았음)
--conf spark.yarn.am.memory=6g \ “client mode" 일때 설정
--conf spark.driver.memory=6g \  “cluster mode" 일때 설정
--conf spark.yarn.am.memoryOverhead=3g \ “client mode" 일때 설정
--conf spark.driver.memoryOverhead=3g \  “cluster mode" 일때 설정
--conf spark.executor.memory=3g \ executor 개수에 영향을 주는 요소, core개수에 memory 잔여량까지 더해져서 executor 개수가 정해짐
--conf spark.driver.maxResultSize=2g \ driver에게 모이는 결과 값의 크기, 기본 1g, 이 값이 작으면 physical memory가 작아서 에러가 난다는 메시지가 출현함
--conf spark.executor.instances=4 \ 최소 기동을 보장하는 executor의 개수
--conf spark.es.batch.size.bytes=100mb \ ES batch API를 통해서 한번에 기록하는 데이터 크기(기본 1mb)
--conf spark.es.batch.size.entries=10000 \ ES batch API를 통해서 한번에 기록하는 데이터 갯수(크기와 갯수 중 하나만 맞으면 바로 쓰기작업실행함)
--conf spark.es.batch.write.retry.wait=6 \ ES batch API 실패시 재시도 간격(초단위)
--conf spark.es.batch.write.retry.count=30 \ ES batch API 실패 재시도 횟수
--conf spark.es.nodes.discovery=true \ ES node 하나면 연결되면 클러스터 구성 node를 모두 찾는 기능(첫 기동시에만 discover됨). 이걸 안켜면 es.nodes에 기입된 호스트에게 요청
--conf spark.es.nodes.client.only=false \ 모든 요청을 client node에게 하는 모드
--conf spark.es.nodes.data.only=true \ 모든 요청을 data node에게 하는 모드(기본값)
--conf spark.es.transport.tcp.compress=true \ 전송 데이터 압축 여부
--conf spark.es.nodes=internal-example-es-elb-data-pr-719749396.ap-northeast-2.elb.amazonaws.com \
--files s3://example/resources/spark-log4j.properties \
--driver-java-options "-Dlog4j.configuration=fi-spark-log4j.properties -Dexample.logging.level=DEBUG" \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=fi-spark-log4j.properties -Dexample.logging.level=DEBUG -Denv=prd -DappName=example-fullindex -DdbFetchSize=500000 -DesFetchSize=2000" \
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=fi-spark-log4j.properties -Dexample.logging.level=DEBUG" \
s3://example/bin/example-fullindex-assembly-0.1.jar &

댓글 없음:

댓글 쓰기