Wednesday, November 29, 2017

Hive Table Based on AWS S3 Suffers from S3 Eventual Consistency Issue

After generating a Hive table backed by AWS S3, the table sometimes suffers from S3's eventual consistency problem: attempts to rename it with ALTER TABLE, or even to select from it, fail with an error like the following:

org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. Alter Table operation for db_name.tbl_name failed to move data due to: 'com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 1BAD9D1409E5AE00), S3 Extended Request ID: tPOcJz0iymDMQLq2Ucgzbxc2BG8A7xWARyg+E1cf27HLoTE/LwFiNKz/DcVzumtFytZo3ircOWI=' See hive log file for details.;], stacktrace=[org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. Alter Table operation for db_name.tbl_name failed to move data due to: 'com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 1BAD9D1409E5AE00), S3 Extended Request ID: tPOcJz0iymxxxxxxxcgzbxc2BG8A7xWARyg+E1cf27HLoTE/LwFiNKz/DcVzumtFytZo3ircOWI=' See hive log file for details.;
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:98)
at org.apache.spark.sql.hive.HiveExternalCatalog.renameTable(HiveExternalCatalog.scala:460)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.renameTable(SessionCatalog.scala:495)
at org.apache.spark.sql.execution.command.AlterTableRenameCommand.run(tables.scala:159)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
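For context, the failures typically came from statements like the following (a sketch reusing the db_name.tbl_name placeholder from the log above; the rename target and the query itself are hypothetical):

// Renaming a freshly written S3-backed table through Spark SQL can hit the 404
// if S3 listing has not yet caught up with the newly written objects.
session.sql("ALTER TABLE db_name.tbl_name RENAME TO db_name.tbl_name_old")

// A plain read of the just-written table can fail the same way.
session.sql("SELECT COUNT(*) FROM db_name.tbl_name").show()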

After some testing, I found that the problem can be mitigated remarkably well by constraining the number of partitions (and therefore the number of files) written when persisting to the table.

At some point I had explicitly set spark.sql.shuffle.partitions=2000 (the default is 200) to work around a skewed partition key. After that change, noticeably more tables started hitting this S3 eventual consistency issue. Empirically, the likely culprit is writing too many small files (2000 in this case) to S3 at once, which makes a long-lived consistency lag far more probable. Coalescing the DataFrame before writing to the table, as shown in the second snippet below, fixed it for me.
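For reference, the shuffle-partition setting was applied roughly like this (a minimal sketch; exactly where it was set in the job is not part of the original post):

// Raise the shuffle partition count to work around a skewed partition key.
// Side effect: each write can now produce up to 2000 output files, i.e. many
// small objects on S3, which appears to worsen the eventual-consistency lag.
session.conf.set("spark.sql.shuffle.partitions", "2000")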

import org.apache.spark.sql.SaveMode

// Coalesce to 25 partitions so the write produces far fewer, larger files on S3.
val targetDf = session.sql(sql).coalesce(25)
targetDf
  .write.mode(SaveMode.Overwrite)
  .partitionBy(partitionKey)
  .saveAsTable(tableFullName)
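As a side note on this choice: coalesce(25) merely merges existing partitions without triggering a full shuffle, whereas repartition(25) would redistribute the data evenly at the cost of an extra shuffle stage. Either way, the goal is the one described above: write fewer, larger objects to S3 so that the listing performed during the subsequent rename or read is less likely to see an incomplete view of the freshly written data.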