Currently, I'm helping one of my colleagues with a Spark scenario: there are two datasets residing on HDFS. One (alias A) is relatively small, with approximately 20,000 lines, whereas the other (alias B) is huge, with about 3,000,000,000 lines. The requirement is to count the lines of B whose spid (a kind of id) appears in A.
The code is as follows:
    val currentDay = args(0)
    val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
    val sc = new SparkContext(conf)

    // ---RDD A: transform to RDD[(spid, spid)]---
    val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media")
      .map(line => line.split(",")(0).trim)
      .map(spid => (spid, spid))
      .partitionBy(new HashPartitioner(32))

    val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
    val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

    // ---RDD B: transform to RDD[(spid, spid)]---
    val tongYuanRdd = logMapRdds
      .filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp")
      .map(kvs => kvs("p").trim)
      .map(spid => (spid, spid))
      .partitionBy(new HashPartitioner(32))

    val filteredTongYuanRdd = tongYuanRdd.join(spidRdds)
    println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
Note the use of `.partitionBy(new HashPartitioner(32))` on both RDDs. Using the same partitioner guarantees that records with the same key (spid) land in the same partition when RDD_B is joined with RDD_A, which is called 'co-partition'. Detailed information on co-partition and co-location is described in REFERENCE_8 (search for 'One is data co-partition, and then data co-location' on that page).
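To make the effect of co-partitioning concrete, here is a minimal toy sketch (assuming an existing SparkContext `sc`, e.g. in spark-shell; the data and names are made up and are not the production job). Both RDDs share the same HashPartitioner, so the subsequent `join` does not need to re-shuffle either side:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._ // PairRDDFunctions implicits, needed on Spark 1.2.x

    // Toy stand-ins for RDD_A and RDD_B, hash-partitioned the same way,
    // so identical keys land in the partition with the same index on both sides.
    val a = sc.parallelize(Seq(("s1", "s1"), ("s2", "s2"))).partitionBy(new HashPartitioner(32))
    val b = sc.parallelize(Seq(("s1", "s1"), ("s3", "s3"))).partitionBy(new HashPartitioner(32))

    println(a.partitioner == b.partitioner) // true: Some(HashPartitioner(32)) on both sides
    println(a.join(b).count())              // co-partitioned join: no additional shuffle of either side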
Moreover, the above code has a severe bug. Suppose RDD_A has two elements, both of which are (1, 1), while RDD_B has three elements of (1, 1). After the `join` operation, the count will be 2*3=6, which is clearly wrong compared to the correct answer, 3. Thus we need to apply `distinct` to RDD_A before the `join`.
    val filteredTongYuanRdd = tongYuanRdd.join(spidRdds.distinct())
    println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
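To see the duplicate-key blow-up on a toy example, here is a small sketch (again assuming an existing SparkContext `sc`; the numbers are made up, not the real data):

    import org.apache.spark.SparkContext._ // PairRDDFunctions implicits, needed on Spark 1.2.x

    val rddA = sc.parallelize(Seq((1, 1), (1, 1)))          // A contains the key 1 twice
    val rddB = sc.parallelize(Seq((1, 1), (1, 1), (1, 1)))  // B contains three records with key 1

    println(rddB.join(rddA).count())            // 6 = 3 * 2: one output row per (B-record, A-record) pair
    println(rddB.join(rddA.distinct()).count()) // 3: the intended count of matching lines in B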
When running this code via the command `spark-submit --class "com.hide.diablo.sparktools.core.LogAnalysis" --master yarn-cluster --num-executors 32 --driver-memory 8g --executor-memory 4g --executor-cores 4 --queue root.diablo diablo.spark-tools-1.0-SNAPSHOT-jar-with-dependencies.jar 20150515`, the Spark UI displays a substantial amount of non-trivial information.
First, navigate to the Spark UI of this task via its applicationId on the YARN monitoring web page. The following content is all based on Spark 1.2.0.
After entering the Spark UI, we land on the 'Jobs' tab by default, where we can see all the jobs Spark has generated from our operations on the RDDs.
Expanding a specific job shows information about all of its stages.
As we can see, there are two map stages and one count stage (which includes the `join` transformation). Drilling further into a specific stage, there are two tables exhibiting metrics on executors and tasks respectively.
Apart from the 'Jobs' tab, we can also check the thread dumps of every executor on the 'Executors' tab. As a concrete example, the following executor is stuck on `epollWait`, which means it is busy with network I/O.
After sketching the essential parts of the Spark UI, we come back to the execution of the code above. All stages run normally until the 'count' stage, where some failures and retries occur.
In particular, GC time is relatively high (above 1 min), and the errors are as below:
    org.apache.spark.shuffle.FetchFailedException: Failed to connect to XXXXX/XXXXX:XXXXX
    java.lang.OutOfMemoryError: GC overhead limit exceeded
    java.lang.OutOfMemoryError: Java heap space
According to REFERENCE_3, we need to enlarge executor memory by increasing the '--executor-memory' parameter; in my case, to '14g'. (As a side note, the error 'java.lang.IllegalArgumentException: Required executor memory (16384+384 MB) is above the max threshold (15360 MB) of this cluster' may show up after invoking `spark-submit`; this is addressed in REFERENCE_7, and you can also refer to a post of mine regarding the parameter 'yarn.scheduler.maximum-allocation-mb'.)
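For reference, the resubmission is just the original command with '--executor-memory' raised (a sketch assuming all other parameters stay unchanged):

    spark-submit --class "com.hide.diablo.sparktools.core.LogAnalysis" \
      --master yarn-cluster \
      --num-executors 32 \
      --driver-memory 8g \
      --executor-memory 14g \
      --executor-cores 4 \
      --queue root.diablo \
      diablo.spark-tools-1.0-SNAPSHOT-jar-with-dependencies.jar 20150515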
With this change, the Spark job completes in 57 min without any OOM errors.
The join above is a reduce-side join, which is appropriate when both RDDs are very large. In this scenario RDD_A is relatively small, so a map-side join improves performance dramatically by eliminating the shuffle spill (for more details on map-side vs. reduce-side joins, refer to REFERENCE_4 (keyword 'broadcast variables') and REFERENCE_9). Taking the reduce-side join as an example, you can see that shuffle spill (memory) and shuffle spill (disk) are very high:

FYI: Shuffle spill (memory) is the size of the deserialized form of the data in memory at the time it is spilled, whereas shuffle spill (disk) is the size of the serialized form of the data on disk after the spill. This is why the latter tends to be much smaller than the former. Note that both metrics are aggregated over the entire duration of the task (i.e. within each task you can spill multiple times).
The improved solution is a map-side join, which takes advantage of Spark's broadcast variable feature.
    val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
    val sc = new SparkContext(conf)

    // ---RDD A: transform to RDD[(spid, spid)]---
    val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media")
      .map(line => line.split(",")(0).trim)
      .map(spid => (spid, spid))
      .partitionBy(new HashPartitioner(32))

    val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
    val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

    // ---RDD B: transform to RDD[(spid, spid)]---
    val tongYuanRdd = logMapRdds
      .filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp")
      .map(kvs => kvs("p").trim)
      .map(spid => (spid, spid))
      .partitionBy(new HashPartitioner(32))

    // Broadcast the small RDD A as a map and filter RDD B against it on the map side.
    val globalSpids = sc.broadcast(spidRdds.collectAsMap())
    val filteredTongYuanRdd = tongYuanRdd.mapPartitions({ iter =>
      val m = globalSpids.value
      for {
        (spid, spid_cp) <- iter
        if m.contains(spid)
      } yield spid
    }, preservesPartitioning = true)
    println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
In this way, shuffle spill is drastically reduced and the execution time drops to 36 min.
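As a side note on the design: since only key membership in RDD_A matters for this count, the broadcast value could just as well be a Set of spids instead of a Map, which, like `collectAsMap`, deduplicates A's keys by construction. A minimal variant sketch (not the code that was actually run, and not measured):

    // Broadcast A's keys as a Set and filter B directly on the map side.
    val spidSet = sc.broadcast(spidRdds.keys.collect().toSet)
    val filteredTongYuanRdd = tongYuanRdd.filter { case (spid, _) => spidSet.value.contains(spid) }
    println("Total TongYuan Imp: " + filteredTongYuanRdd.count())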
REFERENCE:
6. Tuning Spark