Showing posts with label hadoop. Show all posts
Showing posts with label hadoop. Show all posts

Wednesday, July 11, 2018

MR/TEZ作业数据倾斜导致OOM问题排查思路

对于如下sql,可能会出现最后几个reducer task失败,日志显示OutOfMemory Exception.
select *
from a
join b
on a.uid = b.uid


这种情况一般是由于join的key存在严重倾斜导致的,所以需要分别看下在a表和b表里,uid的分布情况:
select uid, count(1) as cnt
from a
group by uid
order by cnt desc
limit 1000
实际情况可能是如下图所示。


如果在sql中通过where filter去掉这些uid,则任务成功。对于这些倾斜的value,可以分开单独处理(通过增加reducer内存等方式)。

Friday, July 6, 2018

udf包污染导致tez任务在am端log4j stackoverlow问题排查过程

发现有log4j-over-slf4j,和slf4j-log4j12, 两者会循环引用,导致log4j相关的stackoverflow.

报错如下:
Exception in thread "main" java.lang.StackOverflowError
at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
at org.apache.log4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:39)
at org.apache.log4j.LogManager.getLogger(LogManager.java:45)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
at org.apache.log4j.Category.<init>(Category.java:57)
at org.apache.log4j.Logger.<init>(Logger.java:37)
at org.apache.log4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:43)
at org.apache.log4j.LogManager.getLogger(LogManager.java:45)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
at org.apache.log4j.Category.<init>(Category.java:57)
at org.apache.log4j.Logger.<init>(Logger.java:37)
at org.apache.log4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:43)
at org.apache.log4j.LogManager.getLogger(LogManager.java:45)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
at org.apache.log4j.Category.<init>(Category.java:57)
at org.apache.log4j.Logger.<init>(Logger.java:37)
at org.apache.log4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:43)
at org.apache.log4j.LogManager.getLogger(LogManager.java:45)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
at org.apache.log4j.Category.<init>(Category.java:57)
at org.apache.log4j.Logger.<init>(Logger.java:37)
at org.apache.log4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:43)
at org.apache.log4j.LogManager.getLogger(LogManager.java:45)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
at org.apache.log4j.Category.<init>(Category.java:57)
at org.apache.log4j.Logger.<init>(Logger.java:37)
at org.apache.log4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:43)
at org.apache.log4j.LogManager.getLogger(LogManager.java:45)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)

exclude掉log4j-over-slf4j和log-to-slf4j即可。

反思:在udf中引用任何包后都要通过`mvn dependency:tree`和`jar -tf ...`命令查看引用的包包含的package和namespace,以防止后期的包污染问题扩散。

REF 

Thursday, May 3, 2018

Hive query(只select,不insert table/partition)时产生大量小文件问题定位思路

行一个简单的select+filter+limit语句,因为filter中带non-partition field, 所以会启动MR(相关参考: Hive带non-parttiion-filter的query自动转化为local FetchTask问题)
在hive终端,发现在MR job执行succeed之后(yarn观察也是succeed),会一直卡在INFO: OK日志后,好像拉取结果时卡住了。
通过jstack打印线程栈如下,卡在了从NameNode拉取DFS文件元信息。
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.apache.hadoop.ipc.Client.call(Client.java:1463)
- locked <0x00000003377bf118> (a org.apache.hadoop.ipc.Client$Call)
at org.apache.hadoop.ipc.Client.call(Client.java:1409)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy29.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:256)
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy30.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1279)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1266)
at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1324)
at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:237)
at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:233)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:233)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:224)
at org.apache.hadoop.fs.FilterFileSystem.getFileBlockLocations(FilterFileSystem.java:148)
at org.apache.hadoop.fs.viewfs.ChRootedFileSystem.getFileBlockLocations(ChRootedFileSystem.java:211)
at org.apache.hadoop.fs.viewfs.ViewFileSystem.getFileBlockLocations(ViewFileSystem.java:330)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1776)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1759)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:270)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:372)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:304)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:428)
at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:147)
at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2213)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:253)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:821)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
观察发现mr job一共启动了30k个mapper,0个reducer。相应的,会产生30k个小文件,NameNode压力会很大。
如此发现,即使开启了hive.merge.mapfileshive.merge.mapredfiles(相关参考:Hive的mr作业产生很多小文件或空文件的解决方案),对于这种select的query,小文件依旧很多。
解决思路有二: 
1. 在上述语句后加order by,字段随意。这样会产生一个reducer,不会去读所有的小文件元信息。 
2. 将所有这种select语句都转换为写temporary table,这样小文件会根据上述两个参数自动合并,再从这个table里select即可。

Monday, April 16, 2018

Hive的mr作业产生很多小文件或空文件的解决方案

据hive关于merge file的官方文档(keyword: hive.merge.*), 设置如下4个参数即可:
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.smallfiles.avgsize=16000000;
set hive.merge.size.per.task=67108864;
引用一段具体的scenario description:
By default hive.merge.smallfiles.avgsize=16000000 and hive.merge.size.per.task=256000000, so if the average file size is about 17MB, the merge job will not be triggered. Sometimes if we really want only 1 file being generated in the end, we need to increase hive.merge.smallfiles.avgsize to large enough to trigger the merge; and also you need to increase hive.merge.size.per.task to the get the needed number of files in the end.
REFERENCE:

HiveOnSpark系列:spark jobs partition数量调优问题


HiveOnSpark上跑Spark application时,发现部分stage对应的task数量很少,导致full-gc严重。例如下图中的stage只有17个tasks在跑。即使设置了spark.sql.shuffle.partitions=1201和spark.default.parallelism=1202也没有用,依旧是17个。

通过trace spark源码发现mapred.reduce.tasks参数虽然已经deprecated,但会优先spark.sql.shuffle.partitions设置到环境变量中。(spark-2.0:SetCommand:44)



同时,根据同事给的hadoop参数优化项里hive.exec.reducers.bytes.per.reducer,把两者按如下配置后,4个stage启动的tasks数量分别从82/33/17/526变为了82/1201/1201/1201,大大增加了partition数量并as expected. 由此可见,HiveOnSpark时,Spark的execution plan应该还是走了Hive的逻辑,所以部分hadoop相关的参数会主导spark的执行计划的生成逻辑和结果


set hive.exec.reducers.bytes.per.reducer=67108864;
set mapred.reduce.tasks=1201;



P.S. 
Hive version: 0.23 
Spark version: 2.0.3

REFERENCE:
  • https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.exec.reducers.bytes.per.reducer