[Spark] 잡 종료되는 이슈 대응을 위한 옵션 튜닝
아래와 같이 Lambda를 통해 EMR 을 기동하고 있으며 datadog agent가 설치된 이미지에 EMR을 기동하고 있다.
import json
import boto3
client=boto3.client('emr')
s3=boto3.resource('s3')
emr_config_json=''
def lambda_handler(event, context):
emr_create()
return { 'statusCode': 200, 'body': 'emr create success!!!' }
def emr_create():
get_emr_config()
response = client.run_job_flow(
Name= "DEV-EMR",
LogUri= "s3://test-bucket/logs/",
ReleaseLabel= "emr-6.5.0" ,
Instances={
"Ec2KeyName":"test",
#"Ec2SubnetId":"subnet-..",
"Ec2SubnetId":"subnet-..",
"EmrManagedMasterSecurityGroup":"sg-..",
"EmrManagedSlaveSecurityGroup":"sg-..",
"ServiceAccessSecurityGroup":"sg-..",
"AdditionalMasterSecurityGroups": ["sg-.."],
"KeepJobFlowAliveWhenNoSteps": True,
# datadog image
"InstanceGroups": [{"InstanceCount":4,"CustomAmiId":"ami-..","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":120,"VolumeType":"gp3","Throughput":300},"VolumesPerInstance":1}]},"InstanceRole":"CORE","InstanceType":"c6g.12xlarge","Name":"Core - 2"},{"InstanceCount":1,"CustomAmiId":"ami-..","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":64,"VolumeType":"gp3","Iops":6000},"VolumesPerInstance":1}]},"InstanceRole":"MASTER","InstanceType":"c6g.4xlarge","Name":"Master - 1"}]
#"InstanceGroups": [{"InstanceCount":4,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":120,"VolumeType":"gp3","Throughput":300},"VolumesPerInstance":1}]},"InstanceRole":"CORE","InstanceType":"c6g.12xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":64,"VolumeType":"gp3","Iops":6000},"VolumesPerInstance":1}]},"InstanceRole":"MASTER","InstanceType":"c6g.4xlarge","Name":"Master - 1"}]
},
ManagedScalingPolicy={
"ComputeLimits": {
"MaximumCapacityUnits": 20,
"MaximumCoreCapacityUnits": 4,
"MaximumOnDemandCapacityUnits": 4,
"MinimumCapacityUnits": 4,
"UnitType": "Instances"
}
},
Applications = [{"Name": "Hadoop"},{"Name": "Hive"},{"Name": "ZooKeeper"},{"Name": "Sqoop"},{"Name": "Zeppelin"},{"Name": "Tez"},{"Name": "Spark"},{"Name": "Hue"}],
Configurations = emr_config_json,
JobFlowRole = "EMR_EC2_DefaultRole",
ServiceRole = "EMR_DefaultRole",
AutoScalingRole="EMR_AutoScaling_DefaultRole",
Steps=[
{
"Name": "test.sh",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "s3://ap-northeast-2.elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://test-bucket/script/etc/test.sh"
]
}
}
],
Tags=[
{
"Key": "Name",
"Value": "DEV-EMR"
}
],
EbsRootVolumeSize=30
)
def get_emr_config():
content_object = s3.Object('test-bucket', 'config/emr_config.json')
file_content = content_object.get()['Body'].read().decode('utf-8')
global emr_config_json
emr_config_json = json.loads(file_content)
# 서버 스펙(c6g.12xlarge) : 16 core, mem 30 GB
# spark-defaults.conf
spark.driver.memory : 2048M
spark.executor.memory : 4743M
spark.executor.cores : 3
spark.emr.default.executor.memory : 4743M
spark.emr.default.executor.cores : 3
spark.dynamicAllocation.enabled : true
스파크로 잡을 돌렸으며 계속적으로 ExecutorLostFailure, RuntimeException 오류가 발생하며 잡이 실패 하였으며
데이터독으로 확인 했을때는 CPU 70%, MEM 30%, DISK 40% 사용하였다.
우선 spark.dynamicAllocation.enabled가 true이기에 excutor수는 자동으로 산정 되며
아래 공식 문서 참고하여 옵션을 수정 하였다.
# spark-defaults.conf
spark.driver.memory : 4743M
spark.executor.memory : 4743M
spark.executor.cores : 5
spark.emr.default.executor.memory : 4743M
spark.emr.default.executor.cores : 5
spark.dynamicAllocation.enabled : true
- AWS 공식 문서 - 최적의 설정 방안 : https://aws.amazon.com/ko/blogs/korea/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
모니터링 결과,
잡이 성공 하였으나 아래와 같이 Failed Task가 있고, TASK 노드 2대 추가되면서 잡이 성공한것으로 보인다.
Yarn Nodes 를 보니 Active Node가 중간에 할당 받은 2대 노드 밖에 없는것을 볼 수 있으며
기존 노드가 제외된 원인이 Disk 부족인 것을 알 수있다. (M 은 마스터 노드, 노란 다른 그래프는 Task, Core 노드)
그럼 디스크를 차지하는것은 무엇일까?
[hadoop@ip-172-22-50-220 ~]$ df -h
Filesystem Size Used Avail Use% Mounted on
devtmpfs 47G 0 47G 0% /dev
tmpfs 47G 0 47G 0% /dev/shm
tmpfs 47G 496K 47G 1% /run
tmpfs 47G 0 47G 0% /sys/fs/cgroup
/dev/nvme0n1p1 30G 5.7G 25G 19% /
/dev/nvme0n1p128 10M 3.8M 6.3M 38% /boot/efi
/dev/nvme1n1p1 5.0G 48M 5.0G 1% /emr
/dev/nvme1n1p2 115G 104G 12G 90% /mnt
tmpfs 9.3G 0 9.3G 0% /run/user/1001
yarn 디렉토리가 대부분 차지 한다.
[hadoop@ip-172-22-50-220 ]$ cd /mnt
[hadoop@ip-172-22-50-220 mnt]$ sudo du -sh
103G .
[hadoop@ip-172-22-50-220 mnt]$ sudo du -sh var/
1.1G var/
[hadoop@ip-172-22-50-220 mnt]$ sudo du -sh hdfs/
2.8G hdfs/
[hadoop@ip-172-22-50-220 mnt]$ sudo du -sh mapred/
0 mapred/
[hadoop@ip-172-22-50-220 mnt]$ sudo du -sh yarn/
100G yarn/
[hadoop@ip-172-22-50-220 mnt]$ sudo du -sh tmp/
936K tmp/
[hadoop@ip-172-22-50-220 mnt]$ sudo du -sh s3/
0 s3/
yarn usercache가 많은 공간을 차지하고 있는것을 확인 하였으며 이는 컨테이너 실행중에 제거 되지 않는다고 한다.
[hadoop@ip-172-22-50-220 yarn]$ ll
total 0
drwxr-xr-x 2 yarn yarn 6 Mar 27 01:23 filecache
drwx------ 3 yarn yarn 44 Mar 27 03:25 nmPrivate
drwxr-xr-x 3 yarn yarn 22 Mar 27 01:42 usercache
[hadoop@ip-172-22-50-220 yarn]$ sudo du -sh filecache/
0 filecache/
[hadoop@ip-172-22-50-220 yarn]$ sudo du -sh nmPrivate/
0 nmPrivate/
[hadoop@ip-172-22-50-220 yarn]$ sudo du -sh usercache/
100G usercache/
[hadoop@ip-172-22-50-220 yarn]$ sudo du -sh usercache/zeppelin/appcache/
99G usercache/zeppelin/appcache/
그래서 EBS 볼륨을 늘리고 돌려보니 30분 걸려 실패한 잡이 17분만에 결과를 받을 수 있었다.
이슈는 발생 하지 않았지만 디스크를 상단 부분 차지하고 있어 아래 문서 참고 하여 캐시 제거 옵션을 변경 하고자 한다.
그 전에 디스크를 차지 하는 이유는 어떤것일까?
빠른 처리를 위해 yarn에서 로컬 디스크에 생성한 로컬 캐시가 원인이며
yarn에서 생성한 로컬 캐시의 경우 사용하지 않을때 'interval-ms' 주기로 체크하여 'target-size-mb' 크기 이상일때 삭제를 진행 한다.
# /etc/hadoop/conf/yarn-site.xml
yarn.nodemanager.localizer.cache.cleanup.interval-ms
yarn.nodemanager.localizer.cache.target-size-mb
설계상 Spark 드라이버는 코드 즉시 실행을 위해 컨테이너를 계속 가지고 있고 반환하지 않기에
yarn에서는 계속적으로 컨테이너를 사용하고 있다고 판단하여 해당 로컬캐시가 계속적으로 제거 되지 않고 남아 있어 디스크 이슈 발생하였다.
실제로 yarn application 을 아래 명령어로 삭제 했을시
$ sudo yarn application -kill [application id]
해당 캐시를 제거하며 디스크 공간 확보됨을 확인 하였다.
- spark job 디스크 이슈 관련 참고 문서 : https://repost.aws/ko/knowledge-center/user-cache-disk-space-emr
디스크 공간 확보를 위해 zeppelin에 아래와 같은 옵션을 적용하여 1분간격으로 체크하며 2분간 사용하지 않을 시점에 yarn application을 종료하여 컨테이너를 반환 하도록 하자!
<property>
<name>zeppelin.interpreter.lifecyclemanager.class</name>
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
<description>This is the LifecycleManager class for managing the lifecycle of interpreters. The interpreter terminates after the idle timeout period.</description>
</property>
<property>
<name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
<value>60000</value>
<description>The interval for checking whether the interpreter has timed out, in milliseconds.</description>
</property>
<property>
<name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
<value>120000</value>
<description>The idle timeout limit, in milliseconds.</description>
</property>
</configuration>
EMR 의 경우 아래와 같이 Json Config 를 주어 기동 시점에 설정 되도록 할 수 있다.
{
"Classification": "zeppelin-site",
"Properties": {
"zeppelin.interpreter.lifecyclemanager.class": "org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager",
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval": "60000",
"zeppelin.interpreter.lifecyclemanager.timeout.threshold": "60000"
}
}
정상적으로 동작하는지 확인 한다.
- AWS 공식 가이드 : https://repost.aws/knowledge-center/yarn-uses-resources-after-emr-spark-job
끝!!