MaxCompute Spark resource usage optimization

1. Overview

This article explains resource tuning for MaxCompute Spark. Its purpose is to help users optimize the resource usage of Spark jobs, maximize resource utilization, and reduce costs while keeping Spark tasks running normally.

2. Sensor

Sensor provides a visual way to monitor a running Spark process. Each worker (Executor) and master (Driver) has its own status monitoring graph, and the entry can be found through Logview, as shown in the following figure:

After opening Sensor, you can see a view like the following, which shows the CPU and memory usage of the Driver/Executor over its life cycle:
cpu_plan/mem_plan (blue line) represent the CPU and memory plan requested by the user.

Users can intuitively see the CPU utilization of the task from the cpu_usage graph.
mem_usage represents the memory usage during task execution, which is the sum of mem_rss and page cache; see below for details.

Memory Metrics

mem_rss represents the resident memory occupied by the process. This part of memory is the actual memory used by the Spark task. It usually requires the user's attention. If the memory exceeds the amount of memory requested by the user, OOM may occur, causing the Driver/Executor process to terminate. In addition, this curve can also be used to guide users to optimize memory. If the actual usage is much less than the amount requested by the user, you can reduce memory requests, maximize resource utilization, and reduce costs.

mem_cache (page_cache) is used to cache data in disk into memory, thereby reducing disk I/O operations. It is usually managed by the system. If the physical machine has sufficient memory, mem_cache may be used a lot, and users do not need to worry about the allocation and recovery of this memory.

3. Resource parameter tuning

(1) Executor Cores

Related parameter: spark.executor.cores
The number of cores in each Executor, that is, the number of tasks that can run concurrently in each Executor.
The maximum parallelism of a Spark job is num-executors * executor-cores.
When a Spark task executes, one CPU core can only run one task at a time. With enough CPU cores, tasks can usually be executed quickly and efficiently. At the same time, note that the memory of each Executor is shared by multiple tasks; if a single Executor has too many cores and too little memory, OOM is likely to occur.
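As a rough sketch (the values below are placeholders, not recommendations), the relationship between these parameters and parallelism can be written out in spark-defaults.conf as follows:

    # Hypothetical example: 10 Executors x 2 cores each
    # => at most 10 * 2 = 20 tasks can run at the same time
    spark.executor.instances    10
    spark.executor.cores        2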

(2) Executor Number

Related parameters: spark.executor.instances
This parameter sets the total number of Executor processes used to execute the Spark job. Users can usually decide how many Executors to request based on the complexity of the task.
In addition, note that if Executor disk space is insufficient or some Executors run out of memory, you can reduce the number of cores per Executor and increase the number of Executor instances, keeping the overall parallelism of the task unchanged while reducing the risk of task failure.
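For illustration only (the numbers are hypothetical), such an adjustment might look like this while keeping num-executors * executor-cores constant:

    # Before: 10 Executors x 4 cores = 40-way parallelism
    #   spark.executor.instances  10
    #   spark.executor.cores      4
    # After: 20 Executors x 2 cores = still 40-way parallelism,
    # but each task now shares Executor memory and disk with fewer neighbors
    spark.executor.instances    20
    spark.executor.cores        2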

(3) Executor Memory

Related parameters: spark.executor.memory
This parameter is used to set the memory of each Executor process. The size of the Executor memory often directly determines the performance of the Spark job, and JVM OOM is more common in the Executor.
Related parameter 2: spark.executor.memoryOverhead
Sets the off-heap memory for the Executor, which is mainly used for the overhead of the JVM itself, strings, NIO buffers, and so on. Note that memoryOverhead is not used for computation and cannot be directly manipulated by user code or Spark.
If this value is not set, the default is spark.executor.memory * 0.10, with a minimum of 384 MB.
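As a sketch of how the default works out (the heap size is an illustrative value, not a recommendation):

    # Hypothetical example: 8 GB of heap per Executor
    spark.executor.memory    8g
    # If spark.executor.memoryOverhead is not set, it defaults to
    # max(8g * 0.10, 384 MB), i.e. roughly 819 MB of off-heap memory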
Insufficient Executor memory typically manifests as follows:
Cannot allocate memory appears in the Executor log (Logview -> A worker -> StdErr).

The first line of the Logview result after the task finishes shows: The job has been killed by "OOM Killer", please check your job's memory usage.

Memory usage is very high in Sensor.

java.lang.OutOfMemoryError: Java heap space appears in the Executor log.
GC overhead limit exceeded appears in the Executor log.

Frequent GC messages in the Spark UI may be an indirect manifestation of OOM, as may errors on some Executors such as No route to host: workerd********* / Could not find CoarseGrainedScheduler.

Possible causes and solutions:
Limit Executor parallelism by reducing cores: multiple tasks running simultaneously share the memory of one Executor, which reduces the memory available to a single task, so reducing parallelism can alleviate memory pressure.
Increase the memory of a single Executor.
Increase the number of partitions to reduce the load on each Executor.
Consider data skew: skew can leave one task short of memory while the other tasks have plenty.
If the above-mentioned Cannot allocate memory or The job has been killed by "OOM Killer" errors occur, the system (off-heap) memory is usually insufficient. You can appropriately increase the off-heap memory to relieve the pressure; setting spark.executor.memoryOverhead to 1g/2g is usually sufficient, as shown in the sketch below.
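A minimal sketch of that last adjustment, assuming the heap settings are otherwise adequate:

    # Reserve more off-heap memory for the system, the JVM itself, NIO buffers, etc.
    # 1g or 2g is usually enough
    spark.executor.memoryOverhead    2g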

(4) Driver Cores

Related parameter: spark.driver.cores
Usually, the number of Driver cores does not need to be large. However, if the task is complex (for example, too many stages and tasks) or there are too many Executors (the Driver needs to communicate with each Executor and maintain heartbeats), and the CPU utilization shown in Sensor is very high, then you may need to increase the number of Driver cores appropriately.
Also note that when running Spark tasks in Yarn-Cluster mode, you cannot set the Driver resource configuration (cores/memory) directly in code, because these values are needed when the JVM is started. Therefore, they must be set through command-line options (such as --driver-memory) or in the spark-defaults.conf file / DataWorks configuration item.
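Since these values must be known before the JVM starts, a sketch of setting them in spark-defaults.conf (the numbers are illustrative) could be:

    # Must be set in spark-defaults.conf or the DataWorks configuration,
    # not in user code, because they are read at JVM startup
    spark.driver.cores     2
    spark.driver.memory    8g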

(5) Driver Memory

Related parameter 1: spark.driver.memory
Sets the heap memory for the Driver, similar to the corresponding Executor parameter.
Related parameter 2: spark.driver.maxResultSize
Represents the size limit on the total result of each Spark action (such as collect); the default value is 1g. If the total size exceeds this limit, the job will be aborted. If the value is set too high, it may cause OOM in the Driver, so the user needs to choose an appropriate value based on the actual job.
Related parameter 3: spark.driver.memoryOverhead
Sets the off-heap memory for the Driver, similar to the Executor setting.
The memory of the Driver usually does not need to be large. If the Driver runs out of memory, it is usually because it has collected too much data. If you need to use the collect operator to pull all RDD data to the Driver for processing, you must ensure that the Driver's memory is large enough.
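A hedged configuration sketch covering these three parameters (placeholder values):

    # Illustrative values only
    spark.driver.memory            8g
    # Abort the job if a single action (e.g. collect) would return more than 2g
    spark.driver.maxResultSize     2g
    # Off-heap memory for the Driver, analogous to the Executor setting
    spark.driver.memoryOverhead    1g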
Manifestation:

The Spark application becomes unresponsive or exits directly.

A Driver OutOfMemory error is found in the Driver log (Logview -> Master -> StdErr).

Frequent GC messages are found in the Spark UI.

Memory usage is very high in Sensor.

Cannot allocate memory appears in the Driver log.

Possible causes and solutions:
The code may use collect to gather an overly large data set on the Driver node, create an overly large array, or load too large a data set into the Driver process for aggregation.
SparkContext and DAGScheduler both run on the Driver side, and the corresponding splitting of RDDs into stages is also performed on the Driver side. If the user's program has too many steps and too many stages are created, this information consumes Driver memory, and the Driver memory then needs to be increased. If there are too many stages, the Driver side may even hit a stack overflow.

(6) Local disk space

Related parameter: spark.hadoop.odps.cupid.disk.driver.device_size
This parameter sets the disk space requested for a single Driver or Executor. The default value is 20g and the maximum supported value is 100g.
Shuffle data and data spilled by the BlockManager are stored on this disk.

Symptoms of insufficient disk space:
No space left on device error found in Executor/Driver log

Solution:

The easiest way is to add more disk space directly by increasing spark.hadoop.odps.cupid.disk.driver.device_size.
If the error still occurs after increasing the value to 100g, the cause may be data skew, with data concentrated in certain blocks during shuffle or caching, or the shuffle data volume of a single Executor may simply be too large. You can try the following:

Repartition the data to resolve the data skew (a code sketch follows this list)

Reduce the task concurrency of a single Executor (spark.executor.cores)

Reduce the concurrency of table reads (spark.hadoop.odps.input.split.size)

Increase the number of Executors (spark.executor.instances)
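As an illustration of the first option, a minimal Scala sketch (the table names, column, and partition count are hypothetical) that spreads skewed shuffle data over more partitions might look like:

    import org.apache.spark.sql.SparkSession

    object RepartitionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("RepartitionSketch")
          .getOrCreate()

        // Hypothetical source table; replace with your own input
        val df = spark.table("my_project.my_table")

        // Spread the data over more partitions (and across a skewed key)
        // so that no single Executor has to shuffle or spill an oversized block
        val repartitioned = df.repartition(1000, df("user_id"))

        repartitioned.write.saveAsTable("my_project.my_table_repartitioned")

        spark.stop()
      }
    }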

Note:

Also, because the disk needs to be mounted before the JVM starts, this parameter must be configured in the spark-defaults.conf file or the DataWorks configuration item; it cannot be configured in user code. In addition, note that the unit of this parameter is g, and the g cannot be omitted.
In many cases the parameter does not actually take effect because it is configured in the wrong place or the unit g is missing, and the task still fails.
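A hedged example of the correct form in spark-defaults.conf (the size itself is illustrative):

    # Must include the unit g, and must live in spark-defaults.conf
    # (or the DataWorks configuration), not in user code
    spark.hadoop.odps.cupid.disk.driver.device_size    50g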

4. Conclusion

The above mainly introduces the resource shortage problems that may be encountered when using MaxCompute Spark and the corresponding ways to approach them. To maximize resource utilization, it is first recommended to request resources for a single worker at a ratio of 1:4, that is, 1 core : 4 GB of memory. If OOM occurs, check the logs and Sensor to roughly locate the problem, and then make the corresponding optimizations and resource adjustments. It is not recommended to set too many cores for a single Executor; 2-8 cores per Executor is usually safe, and beyond 8 it is better to increase the number of instances. Appropriately increasing the off-heap memory (reserving some memory for the system) is also a common tuning method that in practice solves many OOM problems.

Finally, users can refer to the official documentation at https://spark.apache.org/docs/2.4.5/tuning.html, which contains more memory tuning techniques such as GC optimization and data serialization.
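As a starting-point sketch of the 1 core : 4 GB recommendation above (the values are a baseline to adjust, not a prescription):

    # Baseline worker shape: 2 cores x 8g keeps the 1 core : 4 GB ratio
    spark.executor.instances         10
    spark.executor.cores             2
    spark.executor.memory            8g
    # A little off-heap headroom for the system is a common safeguard
    spark.executor.memoryOverhead    1g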
