Kubernetes) Spark Thrift Server 클러스터에서 올리기

Kubernetes) Spark Thrift Server 클러스터에서 올리기

개요

thrift server란?

로컬에서 실행하는 예시 부하가 걸리면 죽음ㅜ

Kubernetes에서 thrift cluster로실행하는 방법 준비

hive-metastore, mysql (Pod) thrift-custom jar 준비 (Custom) Docker Image 빌드 실행 init용 pod 띄우기 init용 pod에서 명령어 날리기

시행착오 ㅊ

thrift server란?

spark 에 sql을 날릴 수 있는 Thrift Server 라는게 있어요

daemon 처럼 jdbc 를 날릴 수 있는 환경을 구성해주는거죠

beeline을 이용하거나, dbeaver와 같은 툴을 이용해 조회할 수 있답니다.

https://moons08.github.io/programming/thrift_idl_rpc/

local에서 실행하는 방법

로컬에서 thrift를 실행하는건 엄청 쉬워요.

기본 spark.tar 에 들어가 있거든요

출처 : https://spark.apache.org/docs/latest/sql-distributed-sql-engine.html

./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port= \ --hiveconf hive.server2.thrift.bind.host= \ --master

그런데 갑자기 큰 부하가 걸리면 아무리 설정값을 바꿔봐도 죽는다는 문제가 있어요ㅜ

maxresultsize를 늘리고 memory를 32G를 주고 shuffle을 키고 그래도..

그래서 kubernetes의 POD를 이용해 올리는 방법을 생각했어요.

Kubernetes에서 thrift cluster로실행하는 방법

https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1

딱 필요한 기능을 설명한 블로그가 있어서 따라해봣어요. 블로그와 차이점이 있는 부분은 따로 표시했습니다 :)

총 3가지 준비가 필요해요

준비 hive-metastore, mysql (Pod) thrift-custom jar 준비 (Custom) Docker Image 빌드

실행 init용 pod 띄우기 init용 pod에서 명령어 날리기

결과 spark_submit -> Driver -> Executor 1, 2, 3

hive-metastore, mysql (Pod)

먼저 hiver-metastore, mysql을 띄워줘요.

kidong 선생님의 블로그 글을 따라하면 쉽게 띄울 수 있어요

https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1

thrift-custom jar 준비 (Custom)

examples/spark-thrift-server에 가서 mvn 빌드를 해줘요

mvn -e -DskipTests=true clean install shade:shade;

thrift server를 cluster에서 실행하기 위해서는 class를 한번 감싸주는 작업이에요

package io.mykidong.hive; public class SparkThriftServerRunner { public static void main(String[] args) { org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(args); } }

dirver안에서 class를 호출하거든요

그래서 아래처럼 --class 코드가 추가되요(자세한거는 뒤에서 설명)

opt/spark/bin/spark_submit (...) --class io.mykidong.hive.SparkThriftServerRunner \ (...) local:///opt/spark/jars/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar

mvn build는 똑같이하면 spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar 가 나와요

여기까지 완료가 되면, hive-metastore/mysql 이 pod로 떠있고 jar 파일도 준비가 되어 있을거에요

+ 참고한 블로그와 다른점

kidong 선생님의 예시에는 s3, delta 관련 dependency가 pom.xml에 잇었는데

저는 다 주석을 했어요. 인터넷이 안되는 곳에서 진행하다보니까. .jar가 겹쳐서 존재하지 않는다고 뜨더라고요

pom.xml : 주석(aws-java-sdk-s3:1.11.375, hadoop-aws:3.2.0, delta)

Docker Image 빌드

Dockerfile은 아래와 같아요

FROM project-private/test/mykidong/spark:v3.0.0

USER root

# spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar

COPY ./jars/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar \

/opt/spark/jars/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar

# s3 library

COPY ./jars/aws-java-sdk-s3-1.11.375.jar /opt/spark/jars/aws-java-sdk-s3-1.11.375.jar

COPY ./jars/hadoop-aws-3.2.0.jar /opt/spark/jars/hadoop-aws-3.2.0.jar

빌드하고 푸시해줘요

docker push project-private/test/spark-thrift-mykidong:v3.0.0

docker build -t project-private/test/spark-thrift-mykidong:v3.0.0 .

빌드해주면 준비가 끝이 납니다.

실행

init용 pod 띄우기

이제 spark-submit만 하면되요

저 같은 경우는 로컬에 spark가 없어서 pod로 띄웟어요

그리고 master api에 ip 가 허용이되지 않아서, host 명으로 임시 등록했어요. (이 부분은 공부 더 하고 보충할게요)

$ k run -it temp_spark --image=project-private/test/spark-thrift-mykidong:v3.0.0 -n thrift -- /bin/sh

# echo "172.241.xx.xx lb.kubesphere.local " >> /etc/hosts

init용 pod에서 명령어 날리기

k exec -it -temp_spark -n thrift -- /bin/sh

# spark-submit \

--ma ster k8s://https:// lb.kubesphere.local:6443

--deploy-mode cluster \

--name spark-thrift-server \

--class io.mykidong.hive.SparkThriftServerRunner \

--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint \

--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint \

--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \

--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-driver-pvc \

--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint \

--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint \

--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \

--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-exec-pvc \

--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.mount.path=/localdir \

--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.mount.readOnly=false \

--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.options.claimName=spark-driver-localdir-pvc \

--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.mount.path=/localdir \

--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.mount.readOnly=false \

--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-localdirpvc.options.claimName=spark-exec-localdir-pvc \

--conf spark.kubernetes.container.image.pullPolicy=Always \

--conf spark.hadoop.hive.metastore.client.connect.retry.delay=5 \

--conf spark.hadoop.hive.metastore.client.socket.timeout=1800 \

--conf spark.hadoop.hive.server2.enable.doAs=false \

--conf spark.hadoop.hive.server2.thrift.http.port=10002 \

--conf spark.hadoop.hive.server2.thrift.port=10016 \

--conf spark.hadoop.hive.server2.transport.mode=binary \

--conf spark.hadoop.metastore.catalog.default=spark \

--conf spark.hadoop.hive.execution.engine=spark \

--conf spark.hadoop.fs.s3a.connection.ssl.enabled=true \

--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \

--conf spark.hadoop.fs.s3a.fast.upload=true \

--conf spark.hadoop.fs.s3a.path.style.access=true \

--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \

--conf spark.executor.instances=2 \

--conf spark.executor.memory=2G \

--conf spark.executor.cores=1 \

--conf spark.driver.memory=1G \

--verbose \ # 밑에 부분이 제가 수정한 부분

--conf spark.kubernetes.file.upload.path=s3a://thrift/spark-thrift-server \

--conf spark.kubernetes.namespace=thrift \

--conf spark.kubernetes.container.image=project-private/test/mykidong/spark:v3.0.3 \

--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \

--conf spark.hadoop.hive.metastore.uris=thrift://10.233.**.**:9083 \ #hive-metastore endpoint (POD)

--conf spark.sql.warehouse.dir=s3a://gtc/apps/spark/warehouse \

--conf spark.hadoop.fs.s3a.endpoint=http://172.17.172.86:31561 \ #object storage or s3 endpoint

--conf spark.hadoop.fs.defaultFS=s3a://test \

--conf spark.hadoop.fs.s3a.access.key=test-key \

--conf spark.hadoop.fs.s3a.secret.key=test-key \

local:///opt/spark/jars/spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar

블로그와 다른점

외부망이 되지 않는 환경이어서 --packages를 사용하면 오류가 나더라고요

.ivy, mvn에 넣어도 안되고, 프록시 설정을 추가해도 안되고..

그래서 외부 repo를 찍고 오는 --package 를 주석했어요.

--packages com.amazonaws:aws-java-sdk-s3:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0

1. 앞에 Dockerfile에 필요한 jar를 copy하고

2. spark-thrift-server-1.0.0-SNAPSHOT-spark-job.jar (custom)을 빌드할 때는 pom.xml에서 겹치는 설정을 주석했어요.

제가 추가하고, 제외한 jar들은 aws 관련된 것들입니다.

aws-java-sdk-s3:1.11.375, hadoop-aws:3.2.0

시행착오

해결

aws 관련 라이브러리 없음

문제 : aws 관련 jar 가 있는데, 없다고 에러를 뱉음

파악 :

- 구글링해보니, aws 관련 jar가 2개가 있으면 없다는 에러를 뱉는다고 함.

- '--packages '를 이용하면 dependency를 관리해준다고 함 -> 하지만 나는 내부망이라 못함ㅜ

- custom jar에서 shaded 되는 과정에서 path가 꼬인것 같다고 추정

- custom jar를 열어보니, aws 관련 jar가 있었음.

결론 :

pom.xml에서 관련 jar 주석하고 Dockerfile에 aws 관련 jar copy함

관련 로그

Please make sure that jars for your version of hive and hadoop are included in the paths passed to spark.sql.hive.metastore.jar

at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:310)

at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:431)

at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:324)

at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:68)

at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:67)

at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)

at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)

at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)

at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)

at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:137)

at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:127)

at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:58)

at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:93)

at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)

at io.mykidong.hive.SparkThriftServerRunner.main(SparkThriftServerRunner.java:6)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)

at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)

at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)

at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)

at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)

at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.lang.reflect.InvocationTargetException

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:301)

... 26 more

Caused by: java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:348)

at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2532)

at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2497)

at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)

at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:227)

at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:583)

at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:548)

at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:175)

at org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:128)

... 31 more

Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)

at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

... 47 more

21/11/25 04:11:10 INFO SparkContext: Invoking stop() from shutdown hook

(미해결)

문제 : deploy가 아니라서 OOM나서 죽으면 답이 없음 deployment로 만들어야할 것같음 memory를 32G 로 늘리고, Instance를 늘려도 무지막지한 SQL에 죽음ㅜ

문제: object storage pv연결이 안되서 object storage(s3)를 연결함 spark history server에는 driver 수동 종료시에만 값이 뜸 실시간이나 OOM나면 history server 사용 못함 spark operator 버전 문제인거같음. 쿠버 1.21에서는 되는데 1.22에서는 안됨

너무 두서없이 적은거같은데 계속 업데이트하겠습니다ㅎ

참조

https://github.com/cloudcheflabs/dataroaster/blob/master/components/hive/spark-thrift-server/src/main/java/com/cloudcheflabs/dataroaster/hive/SparkThriftServerRunner.java

https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1

영어 글인데 한국분이셨어요. 기회가 되면 기프티콘이라도 보내드려야겠어요

위 블로그에는 아래 사항으르 순서대로 설명해줍니다.

from http://mightytedkim.tistory.com/46 by ccl(A) rewrite - 2021-12-20 00:01:22