Showing posts with label hive. Show all posts
Showing posts with label hive. Show all posts

Thursday, June 28, 2018

多个count(distinct)导致data skew的优化策略

hive中对于count(distinct)的执行逻辑大体是,在mapper端用HashSet将key去重后,全部发送给1个reducer再做去重,这样的问题在于会有单点问题。如果只有一个count(distinct),则通过设置SET hive.groupby.skewindata = true;可以使执行逻辑自动优化避免data skew. 但如果有两个及以上则上述参数不会起作用。
具体case:
如下sql每天会执行11h,切换成优化后的select count(1) from (select * from ... group by ...) tmp之后,只需要28min即可。
--before optimization
select
COUNT(DISTINCT (CASE WHEN field_a = 1 THEN field_b ELSE NULL END)) / COUNT(DISTINCT field_b)
from
table_name
where
label_spam_user = 0
--after optimization
with tmp1 as(
select
field_b
from table_name
where field_a = 1
group by field_b
)
, tmp2 as(
select
count(1) as cnt_1
from tmp1
)
, tmp3 as(
select
field_b
from table_name
group by field_b
)
, tmp4 as(
select
count(1) as cnt_2
from tmp3
)
select
cnt_1/cnt_2
from tmp2
join tmp4 on 1=1;

Tuesday, June 12, 2018

HiveOnTez: 包冲突问题排查思路


执行tez报错,观察YARN日志,Driver端报错为:File does not exist: hdfs://lt-nameservice3.sy/tmp/hive/app/_tez_session_dir/09ff9062-cc3e-4cb3-bc8d-77c275266d94/.tez/application_1528552108294_273009/tez.session.local-resources.pb java.io.FileNotFoundException
此时进入ApplicationMaster的log观察,根本报错内容如下,显然为guava包冲突导致(guava21以上会移除一些method接口不再向前兼容)。
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Objects.toStringHelper(Ljava/lang/Object;)Lcom/google/common/base/Objects$ToStringHelper;
at org.apache.hadoop.metrics2.lib.MetricsRegistry.toString(MetricsRegistry.java:406)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at org.apache.hadoop.ipc.metrics.RpcMetrics.<init>(RpcMetrics.java:74)
at org.apache.hadoop.ipc.metrics.RpcMetrics.create(RpcMetrics.java:80)
at org.apache.hadoop.ipc.Server.<init>(Server.java:2213)
at org.apache.hadoop.ipc.RPC$Server.<init>(RPC.java:1029)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server.<init>(ProtobufRpcEngine.java:537)
at org.apache.hadoop.ipc.ProtobufRpcEngine.getServer(ProtobufRpcEngine.java:512)
at org.apache.hadoop.ipc.RPC$Builder.build(RPC.java:874)
at org.apache.tez.dag.api.client.DAGClientServer.createServer(DAGClientServer.java:127)
at org.apache.tez.dag.api.client.DAGClientServer.serviceStart(DAGClientServer.java:79)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
at org.apache.tez.dag.app.DAGAppMaster$ServiceWithDependency.start(DAGAppMaster.java:1838)
at org.apache.tez.dag.app.DAGAppMaster$ServiceThread.run(DAGAppMaster.java:1859)

科普下Driver日志和ApplicationMaster日志查看位置:
如图,从YARN页面点击application id进入的页面为Driver页面,红框内容为Driver端报错信息;点击蓝框才会进入ApplicationMaster日志。


此时,在ApplicationMaster中搜索java.class.path可以拿到当前am环境下的所有classpath,从url中获取当前am所在节点,把所有jar包拉出来,找下包版本冲突所在jar包即可。如果为自己jar包里的dependency冲突,或shade,或exclude即可解决。

相关命令如下:

jar -tf platform_udf-1.0-SNAPSHOT.jar | grep -i com.google.common
javap -classpath platform_udf-1.0-SNAPSHOT.jar com.google.common.base.Objects


Monday, May 7, 2018

spark读取hive-site.xml无法识别里面spark相关参数问题

过如下语句启动spark-sql时,如果有spark相关参数在hive-site.xml中,并不会被load到spark environment里。spark只会从hive-site.xml中读取hive相关的参数(例如metastore信息等)。
/home/hadoop/software/spark/bin/spark-sql \
--master yarn \
--deploy-mode client \
--queue queue_1 \
--conf spark.rpc.message.maxSize=2047 \
--conf spark.yarn.dist.files="/path/to/hive-site.xml"
同理,即使将spark.yarn.dist.files行的配置换成了--files /path/to/hive-site.xml或者--properties-file /path/to/hive-site.xml也没有用(--properties-file的解释为"Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf.", 读取的文件内容不应该为xml格式)。
如果需要配置spark相关的信息,需要在SPARK_HOME/conf/spark-defaults.conf中配置。

Thursday, May 3, 2018

Hive query(只select,不insert table/partition)时产生大量小文件问题定位思路

行一个简单的select+filter+limit语句,因为filter中带non-partition field, 所以会启动MR(相关参考: Hive带non-parttiion-filter的query自动转化为local FetchTask问题)
在hive终端,发现在MR job执行succeed之后(yarn观察也是succeed),会一直卡在INFO: OK日志后,好像拉取结果时卡住了。
通过jstack打印线程栈如下,卡在了从NameNode拉取DFS文件元信息。
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.apache.hadoop.ipc.Client.call(Client.java:1463)
- locked <0x00000003377bf118> (a org.apache.hadoop.ipc.Client$Call)
at org.apache.hadoop.ipc.Client.call(Client.java:1409)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy29.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:256)
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy30.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1279)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1266)
at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1324)
at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:237)
at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:233)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:233)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:224)
at org.apache.hadoop.fs.FilterFileSystem.getFileBlockLocations(FilterFileSystem.java:148)
at org.apache.hadoop.fs.viewfs.ChRootedFileSystem.getFileBlockLocations(ChRootedFileSystem.java:211)
at org.apache.hadoop.fs.viewfs.ViewFileSystem.getFileBlockLocations(ViewFileSystem.java:330)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1776)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1759)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:270)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:372)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:304)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:428)
at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:147)
at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2213)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:253)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:821)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
观察发现mr job一共启动了30k个mapper,0个reducer。相应的,会产生30k个小文件,NameNode压力会很大。
如此发现,即使开启了hive.merge.mapfileshive.merge.mapredfiles(相关参考:Hive的mr作业产生很多小文件或空文件的解决方案),对于这种select的query,小文件依旧很多。
解决思路有二: 
1. 在上述语句后加order by,字段随意。这样会产生一个reducer,不会去读所有的小文件元信息。 
2. 将所有这种select语句都转换为写temporary table,这样小文件会根据上述两个参数自动合并,再从这个table里select即可。

Hive带non-parttiion-filter的query自动转化为local FetchTask问题

下参数设置为minimal/more之后,对于普通的select(和其他一些cases),task会转化为local的FetchTask直接拉数,不会走mapreduce。
set hive.fetch.task.conversion=minimal; set hive.fetch.task.conversion.threshold=10000000;
但对于一个partition是p_date和p_hourmin的table,如下语句依旧转化成了一个FetchTask(bin/hive执行下面SQL后会发现在打本地log,而不是启动mr job):
select * from tbl where user_id=123 and p_date='20180501' and p_hourmin = '19' limit 10;
但上述语句的问题在于,这个partition对应的大小是2T,转换为FetchTask肯定是不合理的。
debug思路: 
1. 跟到源代码对应位置,在所有执行分支入口打log,复现问题发生时的代码走势
2. 在有问题的分支方法入口,将参数serialize到本地文件,构造问题复现环境
经过上述两步后,发现在SimpleFetchOptimizer里data.hasOnlyPruningFilter()返回的true,所以也没有走到后面的代码片段,用hive.fetch.task.conversion.threshold做判断。
继续深入看hasOnlyPruningFilter是谁写入的,发现在PartitionPruner.onlyContainsPartnCols()里,有对当前SQL filter项的判断,继续打印日志,发现上面SQL对应的expression string是"[(null and (p_date = '20180501') and (p_hourmin = '19'))]", 相当于将non-partition field变为了null传入进来,但在逻辑上并没有对null做判断。
在方法中添加如下逻辑后(即:如果有null存在,则一定为non-partition field filter, 则onlyContainsPartnCols应该返回false),上述SQL会去执行MR job,问题解决。
if(StringUtils.isEmpty(expr.getExprString()) || StringUtils.equalsIgnoreCase(expr.getExprString(), "null")) {
return false;
}
P.S. Hive: 0.23