Monday, June 29, 2015

Roaming Through the Spark UI and Tuning Performance for a Specific Use Case

Currently, I'm helping one of my colleagues with a Spark scenario: there are two datasets residing on HDFS. One (alias A) is relatively small, with approximately 20,000 lines, whereas the other (alias B) is remarkably huge, with about 3,000,000,000 lines. The requirement is to count the lines of B whose spid (a kind of id) appears in A.

The code is as follows:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (partitionBy, join) in Spark 1.2
import org.apache.spark.rdd.RDD

val currentDay = args(0)

val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)

//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media")
  .map(line => line.split(",")(0).trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32))

// MzFileUtils is a project-specific helper that loads and parses the log files
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds
  .filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp")
  .map(kvs => kvs("p").trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32))

val filteredTongYuanRdd = tongYuanRdd.join(spidRdds)
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())

Note the use of `.partitionBy(new HashPartitioner(32))` on both RDDs. It guarantees that a given key (spid) is assigned to the same partition number in RDD_A and in RDD_B, so `join` can match the two sides partition by partition without reshuffling either of them. This is called 'co-partitioning'. Detailed information on co-partitioning and co-location is in REFERENCE_8 (search for the keyword 'One is data co-partition, and then data co-location' on that webpage).
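
To make the co-partitioning idea concrete, here is a minimal plain-Scala sketch of how a hash partitioner maps a key to a partition number. `partitionFor` is a hypothetical helper written for illustration, not a Spark API; it only imitates the "non-negative modulo of hashCode" rule that Spark's `HashPartitioner` applies.

```scala
// Plain-Scala imitation of hash partitioning; no Spark dependency.
// `partitionFor` is a hypothetical helper, not part of Spark.
def partitionFor(key: Any, numPartitions: Int): Int = {
  if (key == null) 0
  else {
    val rawMod = key.hashCode % numPartitions
    // hashCode may be negative, so shift the remainder into [0, numPartitions)
    if (rawMod < 0) rawMod + numPartitions else rawMod
  }
}

val numPartitions = 32
// The same spid hashed on the RDD_A side and on the RDD_B side
val fromA = partitionFor("12345", numPartitions)
val fromB = partitionFor("12345", numPartitions)
```

Because both sides use the same function and the same partition count, `fromA` and `fromB` are always equal, which is why matching keys meet in the same partition.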

Moreover, the above code has a severe bug. Suppose RDD_A contains two elements, both (1, 1), and RDD_B contains three elements, all (1, 1). After the `join`, the count will be 2*3=6, which is wrong: the correct answer is 3. Thus we need to deduplicate RDD_A with `distinct` before the `join`.
val filteredTongYuanRdd = tongYuanRdd.join(spidRdds.distinct())
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
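
The multiplication effect can be reproduced on ordinary Scala collections. This is a minimal sketch: `innerJoin` is a hypothetical stand-in for an inner join on key-value pairs, not Spark's `RDD.join`, and the sample data mirrors the (1, 1) example above.

```scala
// Hypothetical stand-in for an inner join on pair collections (not Spark's RDD.join).
def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
  for {
    (k, v) <- left
    (k2, w) <- right
    if k == k2
  } yield (k, (v, w))

val rddA = Seq(("1", "1"), ("1", "1"))             // the same spid appears twice in A
val rddB = Seq(("1", "1"), ("1", "1"), ("1", "1")) // three matching lines in B

// Every B row pairs with every duplicate in A
val naiveCount = innerJoin(rddB, rddA).size
// After deduplicating A, every B row pairs with exactly one A row
val dedupedCount = innerJoin(rddB, rddA.distinct).size
```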

When this code is run via the command `spark-submit --class "com.hide.diablo.sparktools.core.LogAnalysis" --master yarn-cluster --num-executors 32 --driver-memory 8g --executor-memory 4g --executor-cores 4 --queue root.diablo diablo.spark-tools-1.0-SNAPSHOT-jar-with-dependencies.jar 20150515`, the Spark UI displays a good deal of useful information.

First, navigate to the Spark UI for this application via its applicationId on the YARN monitoring web page. Everything below is based on Spark 1.2.0.

On entering the Spark UI we land on the 'Jobs' tab by default, which lists all the jobs Spark generated from the operations on our RDDs.

Expanding a specific job shows the information for all of its stages.

As we can see, there are two map stages and one count stage (which includes the `join` transformation). Drilling into a specific stage, there are two tables showing metrics per executor and per task respectively.


Apart from the 'Jobs' tab, we can also check the thread dump of every executor on the 'Executors' tab. As a concrete example, the following executor is blocked in `epollWait`, which means it is waiting on network IO.


Having skimmed the essential parts of the Spark UI, we return to the execution of the code above. All stages behave normally until the 'count' stage, where some tasks fail and are retried.


In particular, GC time is relatively high (above 1min) and the errors are as below:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to XXXXX/XXXXX:XXXXX
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: Java heap space

According to REFERENCE_3, we need to enlarge the executors' memory by raising the '--executor-memory' parameter, in my case to '14g'. (As an aside, `spark-submit` may reject the request with 'java.lang.IllegalArgumentException: Required executor memory (16384+384 MB) is above the max threshold (15360 MB) of this cluster' when the requested executor memory plus its overhead exceeds what YARN allows for a single container; the message quoted here comes from a 16g request. This is addressed in REFERENCE_7 and in a post of mine on the parameter 'yarn.scheduler.maximum-allocation-mb'.)
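
The arithmetic behind that error message can be checked with a few lines of plain Scala. This is only a sketch: `fitsYarnLimit` is a hypothetical helper, and the 384 MB overhead and 15360 MB cap are simply the numbers printed in the message, which may differ on other clusters or Spark versions.

```scala
// Hypothetical helper: does one executor container fit under the YARN cap?
def fitsYarnLimit(executorMemoryMb: Int, overheadMb: Int, maxAllocationMb: Int): Boolean =
  executorMemoryMb + overheadMb <= maxAllocationMb

val overheadMb = 384        // overhead figure taken from the error message
val maxAllocationMb = 15360 // max threshold taken from the error message

// 16g: 16384 + 384 = 16768 MB, above the cap, so the request is rejected
val fits16g = fitsYarnLimit(16 * 1024, overheadMb, maxAllocationMb)
// 14g: 14336 + 384 = 14720 MB, below the cap, so the request is accepted
val fits14g = fitsYarnLimit(14 * 1024, overheadMb, maxAllocationMb)
```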

With this change, the Spark job completes in 57min without any OOM errors.

The join above is a reduce-side join, which is appropriate when both RDDs are very large. In this scenario RDD_A is relatively small, so a map-side join improves performance dramatically by reducing shuffle spilling (for more details on map-side and reduce-side joins, refer to REFERENCE_4 (keyword 'broadcast variables') and REFERENCE_9). Taking the reduce-side join as an example, you can see that both shuffle spill (memory) and shuffle spill (disk) are very high:


FYI: Shuffle spill (memory) is the size of the deserialized form of the data in memory at the time when we spill it, whereas shuffle spill (disk) is the size of the serialized form of the data on disk after we spill it. This is why the latter tends to be much smaller than the former. Note that both metrics are aggregated over the entire duration of the task (i.e. within each task you can spill multiple times).

The improved solution is a map-side join, which takes advantage of Spark's broadcast variables:
val currentDay = args(0)

val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)

//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media")
  .map(line => line.split(",")(0).trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32))
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

//---RDD B transforming to RDD[(spid, spid)]---
// The broadcast join below does not rely on co-partitioning; partitionBy is kept as in the measured run
val tongYuanRdd = logMapRdds
  .filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp")
  .map(kvs => kvs("p").trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32))

// Collect the small RDD to the driver and ship one read-only copy to every executor
val globalSpids = sc.broadcast(spidRdds.collectAsMap())
val filteredTongYuanRdd = tongYuanRdd.mapPartitions({
  iter =>
    val m = globalSpids.value
    // Keep only the spids of B that are present in the broadcast map of A
    for {
      (spid, spid_cp) <- iter
      if m.contains(spid)
    } yield spid
}, preservesPartitioning = true)
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
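
The core of the map-side join is just a lookup against an in-memory map, which can be sketched without Spark. In this minimal illustration `mapSideCount` is a hypothetical helper and the local `Map` stands in for the broadcast value. A side effect worth noticing is that duplicates in A no longer inflate the count, because map keys are unique.

```scala
// Hypothetical helper: count the rows of the big side whose key exists in the small side.
def mapSideCount(big: Iterator[(String, String)], small: Seq[(String, String)]): Long = {
  // Building a Map collapses duplicate keys; this plays the role of the broadcast value
  val lookup: Map[String, String] = small.toMap
  big.count { case (spid, _) => lookup.contains(spid) }.toLong
}

val smallA = Seq(("1", "1"), ("1", "1"), ("2", "2"))           // spid "1" is duplicated
val bigB = Seq(("1", "1"), ("1", "1"), ("1", "1"), ("3", "3")) // three rows match A
val matched = mapSideCount(bigB.iterator, smallA)
```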

In this way, shuffle spilling is drastically reduced and the execution time drops to 36min.







REFERENCE:


Friday, June 12, 2015

Troubleshooting SSH Hangs and a Stuck Machine When Massive IO-Intensive Processes Are Running

It is inevitable that some users will inadvertently write and execute destructive code on shared, service-providing machines. Hence, we should make our machines as robust as possible.

Today, one of our users forked as many processes as possible, all executing `hadoop get`. As a result, they ate up all of our IO resources, even though I had constrained CPU and memory for this specific user via `cgroups`. The symptom was that we could neither interact with the shell prompt nor ssh to the machine anymore. I managed to invoke `top` before it was fully stuck. This is what it showed:
top - 18:26:10 up 238 days,  5:43,  3 users,  load average: 1782.01, 1824.47, 1680.36
Tasks: 1938 total,   1 running, 1937 sleeping,   0 stopped,   0 zombie
Cpu(s):  2.4%us,  3.0%sy,  0.0%ni,  0.0%id, 94.5%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:  65923016k total, 65698400k used,   224616k free,    13828k buffers
Swap: 33030136k total, 17799704k used, 15230432k free,   157316k cached

As you can see, the load average is at an unacceptable peak, and %wa is above 90 (%wa, iowait: the percentage of time the CPU has been waiting for I/O to complete). In the meantime, almost all physical memory is in use, along with 17GB of swap space. Obviously, this is caused by the excessive number of processes executing `hadoop get`.
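
If you want to flag this condition automatically, the iowait figure can be pulled out of the `Cpu(s)` line. The following is a minimal sketch in Scala: `ioWaitPercent` is a hypothetical helper, it assumes the older `top` layout shown above (values written as `94.5%wa`), and the 90% threshold is only the figure used in this post.

```scala
// Hypothetical helper: extract the iowait percentage from top's "Cpu(s)" line.
// Assumes the older top format, where the value is written like "94.5%wa".
def ioWaitPercent(cpuLine: String): Option[Double] = {
  val waPattern = """([0-9]+(?:\.[0-9]+)?)%wa""".r
  waPattern.findFirstMatchIn(cpuLine).map(_.group(1).toDouble)
}

val cpuLine = "Cpu(s):  2.4%us,  3.0%sy,  0.0%ni,  0.0%id, 94.5%wa,  0.0%hi,  0.0%si,  0.0%st"
val ioWait = ioWaitPercent(cpuLine)
// Treat the machine as IO-bound when iowait exceeds 90%
val ioBound = ioWait.exists(_ > 90.0)
```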

Eventually, I capped the maximum number of processes the user can fork and the maximum number of files the user can open. In this way, the situation is alleviated to an acceptable level.

The configuration lives in '/etc/security/limits.conf', which has to be edited as root. Append the following content to the file:
username      soft    nofile  5000
username      hard    nofile  5000
username      soft    nproc   100
username      hard    nproc   100

Also, there's a nice post on how to catch the culprit causing high IO wait in Linux.

Reference:
1. Limiting Maximum Number of Processes Available for the Oracle User
2. how to change the maximum number of fork process by user in linux
3. ulimit: difference between hard and soft limits
4. High on %wa from top command, is there any way to constrain it
5. How to ensure ssh via cgroups on centos
6. fork and exec in bash