Monday, May 7, 2018

Spark does not recognize Spark-related parameters placed in hive-site.xml

When spark-sql is launched with the following command, any Spark-related parameters placed in hive-site.xml are not loaded into the Spark environment. Spark only reads Hive-related parameters (e.g. metastore information) from hive-site.xml.
/home/hadoop/software/spark/bin/spark-sql \
--master yarn \
--deploy-mode client \
--queue queue_1 \
--conf spark.rpc.message.maxSize=2047 \
--conf spark.yarn.dist.files="/path/to/hive-site.xml"
Likewise, replacing the spark.yarn.dist.files line with --files /path/to/hive-site.xml or --properties-file /path/to/hive-site.xml does not help either (--properties-file is documented as "Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf.", i.e. the file it reads is not expected to be in XML format).
Spark-related settings have to be configured in SPARK_HOME/conf/spark-defaults.conf instead.
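For reference, a minimal sketch of what the corresponding entry in spark-defaults.conf might look like (the value simply mirrors the --conf flag from the command above; spark-defaults.conf takes whitespace-separated key/value pairs):
spark.rpc.message.maxSize    2047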

Saturday, May 5, 2018

HiveOnSpark series: metadata.HiveException: java.util.concurrent.TimeoutException


Running a HiveOnSpark SQL query over a very large dataset fails with the TimeoutException below.



ERROR : Failed to monitor Job[0] with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(java.util.concurrent.TimeoutException)'
org.apache.hadoop.hive.ql.metadata.HiveException: java.util.concurrent.TimeoutException
at org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus.getSparkJobInfo(RemoteSparkJobStatus.java:174) ~[hive-exec-2.3.2.jar:2.3.2]
at org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus.getState(RemoteSparkJobStatus.java:81) ~[hive-exec-2.3.2.jar:2.3.2]
at org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor.startMonitor(RemoteSparkJobMonitor.java:82) [hive-exec-2.3.2.jar:2.3.2]
at org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef.monitorJob(RemoteSparkJobRef.java:60) [hive-exec-2.3.2.jar:2.3.2]
at org.apache.hadoop.hive.ql.exec.spark.SparkTask.execute(SparkTask.java:116) [hive-exec-2.3.2.jar:2.3.2]
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:199) [hive-exec-2.3.2.jar:2.3.2]
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100) [hive-exec-2.3.2.jar:2.3.2]
at org.apache.hadoop.hive.ql.exec.TaskRunner.run(TaskRunner.java:79) [hive-exec-2.3.2.jar:2.3.2]
Caused by: java.util.concurrent.TimeoutException
at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:56) ~[netty-all-4.0.52.Final.jar:4.0.52.Final]
at org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus.getSparkJobInfo(RemoteSparkJobStatus.java:171) ~[hive-exec-2.3.2.jar:2.3.2]
... 7 more
Tracing the source code from the stack trace (entry point: RemoteSparkJobStatus.java:174) reveals a timeout setting described as "Timeout for requests from Hive client to remote Spark driver". The default is 60s; the application here is presumably complex enough that the request takes longer than that. Raising it to 600s resolves the problem.
sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
"Timeout for requests from Hive client to remote Spark driver.")

Thursday, May 3, 2018

Tracking down the many-small-files problem for a Hive query (select only, no insert into a table/partition)

Running a simple select+filter+limit statement launches an MR job because the filter contains a non-partition field (related post: "Hive queries with non-partition filters automatically converted into a local FetchTask").
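The original statement is not reproduced here; a hypothetical query of the same shape (table and column names are made up) would be:
select col_a, col_b from some_table where non_partition_col = 'x' and p_date = '20180501' limit 100;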
In the Hive CLI, after the MR job succeeds (YARN also shows it as succeeded), the session keeps hanging after the INFO: OK log line, apparently stuck while fetching the results.
A jstack thread dump (below) shows it is stuck fetching DFS file metadata from the NameNode.
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at org.apache.hadoop.ipc.Client.call(Client.java:1463)
- locked <0x00000003377bf118> (a org.apache.hadoop.ipc.Client$Call)
at org.apache.hadoop.ipc.Client.call(Client.java:1409)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
at com.sun.proxy.$Proxy29.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:256)
at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy30.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1279)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1266)
at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1324)
at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:237)
at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:233)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:233)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:224)
at org.apache.hadoop.fs.FilterFileSystem.getFileBlockLocations(FilterFileSystem.java:148)
at org.apache.hadoop.fs.viewfs.ChRootedFileSystem.getFileBlockLocations(ChRootedFileSystem.java:211)
at org.apache.hadoop.fs.viewfs.ViewFileSystem.getFileBlockLocations(ViewFileSystem.java:330)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1776)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1759)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:270)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:372)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:304)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:428)
at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:147)
at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2213)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:253)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:821)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
The MR job launched a total of 30k mappers and 0 reducers, so it accordingly produced 30k small files, which puts heavy pressure on the NameNode.
It turns out that even with hive.merge.mapfiles and hive.merge.mapredfiles enabled (related post: "Solutions for Hive MR jobs producing many small or empty files"), this kind of select-only query still leaves a large number of small files.
There are two ways to work around this:
1. Append an order by (any column) to the statement. This produces a single reducer, so the fetch does not have to read the metadata of all the small files.
2. Rewrite such select statements to write into a temporary table first; the small files are then merged automatically according to the two parameters above, and the results can be selected from that table (see the sketch below).
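A minimal sketch of approach 2 (names are hypothetical; it assumes CREATE TEMPORARY TABLE ... AS SELECT is available in the Hive version in use):
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
create temporary table tmp_query_result as
select col_a, col_b from some_table where non_partition_col = 'x' and p_date = '20180501';
select * from tmp_query_result limit 100;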

Hive queries with non-partition filters automatically converted into a local FetchTask

With the following parameters set to minimal/more, an ordinary select (and a few other cases) is converted into a local FetchTask that pulls the data directly instead of going through MapReduce.
set hive.fetch.task.conversion=minimal; set hive.fetch.task.conversion.threshold=10000000;
However, for a table partitioned by p_date and p_hourmin, the following statement was still converted into a FetchTask (running the SQL below via bin/hive prints local logs instead of launching an MR job):
select * from tbl where user_id=123 and p_date='20180501' and p_hourmin = '19' limit 10;
The problem with the statement above is that the matching partition is about 2 TB in size, so converting it into a FetchTask is clearly unreasonable.
Debugging approach:
1. Trace to the corresponding place in the source code, add logging at the entry of every execution branch, and reproduce the code path taken when the problem occurs.
2. At the entry of the problematic branch method, serialize the arguments to a local file to build an environment in which the problem can be reproduced.
After these two steps, it turns out that data.hasOnlyPruningFilter() in SimpleFetchOptimizer returns true, so the later code path that checks against hive.fetch.task.conversion.threshold is never reached.
Digging further into where hasOnlyPruningFilter is set, the check on the filter expressions of the current SQL happens in PartitionPruner.onlyContainsPartnCols(). More logging shows that the expression string for the SQL above is "[(null and (p_date = '20180501') and (p_hourmin = '19'))]": the non-partition field is passed in as null, but the logic never checks for null.
After adding the following logic to that method (i.e. if a null is present, there must be a filter on a non-partition field, so onlyContainsPartnCols should return false), the SQL above runs as an MR job and the problem is solved.
// A null or empty expression string means a non-partition field was filtered on,
// so the expression does not consist of partition columns only.
if (StringUtils.isEmpty(expr.getExprString()) || StringUtils.equalsIgnoreCase(expr.getExprString(), "null")) {
    return false;
}
P.S. Hive: 0.23