A Hive table with 1.5 billion records and no partitions needs to be transferred to a MySQL (Aurora) table. Following Connecting to SQL Databases using JDBC, the initial code snippet is as follows:
import org.apache.spark.sql.SaveMode
import java.util.Properties
val jdbc_url = "jdbc:mysql://jdbc_url:3306/db_name?user=admin&password=xxxxxx"
val connectionProperties = new Properties()
spark.sql("select * from db.tbl")
  .write
  .mode(SaveMode.Overwrite)
  .jdbc(jdbc_url, "tbl", connectionProperties)
But this is very slow: judging from the Spark Jobs monitoring page, it would take an estimated 38 hours to finish.
This post mentions a parameter, rewriteBatchedStatements=true, that needs to be specified whenever bulk inserts are performed via JDBC. With this parameter, the driver converts a batch of single-row inserts into one multi-row insert, illustrated as follows:
-- FROM
INSERT INTO X VALUES (A1,B1,C1)
INSERT INTO X VALUES (A2,B2,C2)
...
INSERT INTO X VALUES (An,Bn,Cn)
-- TO
INSERT INTO X VALUES (A1,B1,C1),(A2,B2,C2),...,(An,Bn,Cn)
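To make the transformation concrete, here is a minimal sketch in plain Scala. It only illustrates the shape of the rewrite shown above; it is not how the driver implements it, and `rewriteBatch` is a hypothetical helper name.

```scala
// Hypothetical helper: collapses the value tuples of several single-row
// INSERTs on the same table into one multi-row INSERT statement.
def rewriteBatch(table: String, rows: Seq[Seq[String]]): String = {
  // Render each row as "(v1,v2,...)"
  val tuples = rows.map(_.mkString("(", ",", ")"))
  s"INSERT INTO $table VALUES " + tuples.mkString(",")
}

val rows = Seq(Seq("A1", "B1", "C1"), Seq("A2", "B2", "C2"))
println(rewriteBatch("X", rows))
```

One statement carrying many rows means fewer round trips to the server, which is where the speedup comes from.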
Moreover, since this Hive table has no partitions, we need to use repartition to parallelize the transfer: Spark writes each partition over its own JDBC connection. The optimized code is as follows:
import org.apache.spark.sql.SaveMode
import java.util.Properties
val jdbc_url = "jdbc:mysql://jdbc_url:3306/db_name?user=admin&password=xxxxxx&rewriteBatchedStatements=true"
val connectionProperties = new Properties()
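spark.sql("select * from db.tbl")
  .repartition(40) // 40 partitions, so up to 40 concurrent JDBC writers
  .write
  .mode(SaveMode.Overwrite) // replaces any existing contents of the target table
  .jdbc(jdbc_url, "tbl", connectionProperties)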
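As a variation on the snippet above (not the code that produced the timings reported in this post), the credentials and driver flag can be moved out of the URL and into the Properties object, which is also where Spark's `batchsize` write option can be set. This is a sketch under assumptions: `writerProperties` and `writeToMySql` are hypothetical helper names, and the batch size passed in is an arbitrary illustrative value, not a measured optimum.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: credentials and tuning knobs in one Properties object.
def writerProperties(user: String, password: String, batchSize: Int): Properties = {
  val props = new Properties()
  props.setProperty("user", user)
  props.setProperty("password", password)
  // Passed through to the MySQL JDBC driver
  props.setProperty("rewriteBatchedStatements", "true")
  // Read by Spark's JDBC writer: rows per JDBC batch (Spark defaults to 1000)
  props.setProperty("batchsize", batchSize.toString)
  props
}

// Hypothetical helper: same write as above, with the settings taken from props.
def writeToMySql(df: DataFrame, url: String, table: String, props: Properties): Unit =
  df.repartition(40)
    .write
    .mode(SaveMode.Overwrite)
    .jdbc(url, table, props)
```

It could be called as `writeToMySql(spark.sql("select * from db.tbl"), "jdbc:mysql://jdbc_url:3306/db_name", "tbl", writerProperties("admin", "xxxxxx", 10000))`. Whether a larger batch size helps depends on row width and the database instance, so it is worth measuring rather than assuming.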
The Aurora instance's capacity was also increased accordingly (16 CPUs, 32 GB). The estimated time dropped from 38 hours to 6.6 hours (62,920 records/s).
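The estimate can be checked with a few lines of arithmetic. The sketch below uses only the figures quoted in this post (1.5 billion records, 62,920 records/s, 38 hours); the variable names are illustrative.

```scala
val totalRecords = 1.5e9      // rows in the Hive table
val optimizedRate = 62920.0   // records per second after tuning
val baselineHours = 38.0      // original estimate

// Hours needed at the optimized rate
val optimizedHours = totalRecords / optimizedRate / 3600
// Throughput implied by the original 38-hour estimate
val baselineRate = totalRecords / (baselineHours * 3600)
// Overall improvement factor
val speedup = baselineHours / optimizedHours

println(f"optimized: $optimizedHours%.1f hours")
println(f"baseline:  $baselineRate%.0f records/s")
println(f"speedup:   $speedup%.1fx")
```

This works out to about 6.6 hours, and implies the original job was moving roughly 11,000 records/s, so the two changes together gave a speedup of roughly 5.7x.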
Lastly, do not check a big table's row count with select count(*) from tbl (REF), which will slow down the load process. Querying SELECT table_rows FROM information_schema.tables WHERE table_name = 'tbl' instead is more efficient, though for InnoDB tables the value is an estimate rather than an exact count.
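For monitoring progress from the same Scala session, the metadata query can be run over plain JDBC. This is a minimal sketch: `rowEstimateQuery` and `estimatedRows` are hypothetical helper names, the MySQL JDBC driver is assumed to be on the classpath, and the extra `table_schema` filter is an addition to the query above so that a same-named table in another database is not picked up.

```scala
import java.sql.DriverManager

// Hypothetical helper: builds the metadata query for one table in one schema.
// Names are interpolated directly, so only pass trusted values.
def rowEstimateQuery(schema: String, table: String): String =
  "SELECT table_rows FROM information_schema.tables " +
    s"WHERE table_schema = '$schema' AND table_name = '$table'"

// Runs the query over plain JDBC; returns None if no matching table is found.
def estimatedRows(jdbcUrl: String, schema: String, table: String): Option[Long] = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    val rs = conn.createStatement().executeQuery(rowEstimateQuery(schema, table))
    if (rs.next()) Some(rs.getLong(1)) else None
  } finally conn.close() // closing the connection also releases the statement
}
```

Calling `estimatedRows(jdbc_url, "db_name", "tbl")` periodically during the load gives a rough view of how far it has progressed.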