Tuesday, January 27, 2015

Deploy Tez Based On Hadoop-2.2.0

Tez is a computing engine that sits alongside MapReduce. Its goal is an application framework that processes data as a complex directed-acyclic-graph (DAG) of tasks. It is currently built atop Apache Hadoop YARN.

The most significant advantage of Tez over MapReduce shows up in Hive: when a query compiles to multiple MR tasks that are executed in series, Tez saves the disk IO that MapReduce spends on writing intermediate results between them. This in-memory computing mechanism is somewhat like Spark.

The procedure for deploying Tez on Hadoop-2.2.0 is shown below.

--CAVEAT--
1. The Official Deploy Instruction For Tez suits every release version of Tez except the incubating ones. The steps below therefore differ slightly from the official ones and are best read as a supplement.
2. The official document says to change hadoop.version to the version currently in use. In my tests this does not work: for instance, `mvn clean package ...` fails with ERRORs if hadoop.version is forcibly changed from 2.6.0 to 2.2.0 in Tez-0.6.0. Consequently, we have to use tez-0.4.1-incubating, whose default hadoop.version is 2.2.0.

Ok, now let's get back on track!

First, install the prerequisites: JDK 6 or later, Maven 3 or later, and Protocol Buffers (protoc compiler) 2.5.0, the version pinned as protobuf.version in the pom.xml shown further down. Their installation is omitted here.

Download tez-0.4.1-incubating from the official archive and unpack it:
wget http://archive.apache.org/dist/incubator/tez/tez-0.4.1-incubating/tez-0.4.1-incubating-src.tar.gz
tar xzf tez-0.4.1-incubating-src.tar.gz

In the top-level pom.xml, check hadoop.version and protobuf.version, and hardcode protoc.path to the location of your protoc binary, as shown below:
<properties>
    <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
    <clover.license>${user.home}/clover.license</clover.license>
    <hadoop.version>2.2.0</hadoop.version>
    <jetty.version>7.6.10.v20130312</jetty.version>
    <distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
    <distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
    <distMgmtSnapshotsUrl>https://repository.apache.org/content/repositories/snapshots</distMgmtSnapshotsUrl>
    <distMgmtStagingId>apache.staging.https</distMgmtStagingId>
    <distMgmtStagingName>Apache Release Distribution Repository</distMgmtStagingName>
    <distMgmtStagingUrl>https://repository.apache.org/service/local/staging/deploy/maven2</distMgmtStagingUrl>
    <failIfNoTests>false</failIfNoTests>
    <protobuf.version>2.5.0</protobuf.version>
    <protoc.path>/usr/local/bin/protoc</protoc.path>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scm.url>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-tez.git</scm.url>
</properties>

Build the package with Maven:
mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true

After the build, all the compiled jar files are in '$TEZ_HOME/tez-dist/target/tez-0.4.1-incubating-full/tez-0.4.1-incubating-full/'. From here on, this directory is referred to as the environment variable $TEZ_JARS.

Choose an HDFS path to upload $TEZ_JARS to. In my case it is '/user/supertool/zhudi/tez-dist'.
hadoop fs -copyFromLocal $TEZ_JARS /user/supertool/zhudi/tez-dist

Create tez-site.xml in '$HADOOP_HOME/etc/hadoop' with the following content, which points at that HDFS path. Make sure the HDFS path is fully qualified, that is, it carries the filesystem prefix ('hdfs://ns1' in my case).
<configuration>
    <property>
        <name>tez.lib.uris</name>
        <value>hdfs://ns1/user/supertool/zhudi/tez-dist/tez-0.4.1-incubating-full,hdfs://ns1/user/supertool/zhudi/tez-dist/tez-0.4.1-incubating-full/lib</value>
    </property>
</configuration>

Finally, add the following content to ~/.bashrc and run `source ~/.bashrc`.
 export TEZ_CONF_DIR=/home/workspace/tez-0.4.1-incubating-src
 export TEZ_JARS=/home/workspace/tez-0.4.1-incubating-src/tez-dist/target/tez-0.4.1-incubating-full/tez-0.4.1-incubating-full
 export HADOOP_CLASSPATH=${TEZ_CONF_DIR}:${TEZ_JARS}/*:${TEZ_JARS}/lib/*

To test the deployment, we can run the orderedwordcount example from the tez-mapreduce-examples jar:
hadoop jar /home/workspace/tez-0.4.1-incubating-src/tez-mapreduce-examples/target/tez-mapreduce-examples-0.4.1-incubating.jar orderedwordcount /user/supertool/zhudi/mrTest/input /user/supertool/zhudi/mrTest/output

For Hive, simply run the following command before executing HQL:
set hive.execution.engine=tez;

If 'hive.input.format' has to be specified under the MapReduce computing engine (the default), remember to set 'hive.tez.input.format' as well when switching to Tez:
set hive.input.format=com.XXX.RuntimeCombineHiveInputFormat;
set hive.tez.input.format=com.XXX.RuntimeCombineHiveInputFormat;

Likewise, if 'mapred.job.queue.name' has to be specified, replace it with 'tez.queue.name'.


One last thing: Tez only needs to be deployed on the gateway node, that is, the node of the Hadoop cluster that submits tasks using Tez.


Possible ERROR #1:
When using a custom UDF in Hive on Tez, the very same task sometimes failed and sometimes succeeded. The detailed log retrieved by `yarn logs -applicationId <app_id>` showed the following ERROR:
java.lang.NoSuchMethodError: org.apache.commons.collections.CollectionUtils.isEmpty(Ljava/util/Collection;)Z
at com.XXX.inputformat.hive.SplitInfo.mergeSplitFiles(SplitInfo.java:86)
at com.XXX.inputformat.hive.RuntimeCombineHiveInputFormat.getSplits(RuntimeCombineHiveInputFormat.java:105)
at org.apache.tez.mapreduce.hadoop.MRHelpers.generateOldSplits(MRHelpers.java:263)
at org.apache.tez.mapreduce.hadoop.MRHelpers.generateInputSplitsToMem(MRHelpers.java:379)
at org.apache.tez.mapreduce.common.MRInputAMSplitGenerator.initialize(MRInputAMSplitGenerator.java:161)
at org.apache.tez.dag.app.dag.RootInputInitializerRunner$InputInitializerCallable$1.run(RootInputInitializerRunner.java:154)
at org.apache.tez.dag.app.dag.RootInputInitializerRunner$InputInitializerCallable$1.run(RootInputInitializerRunner.java:146)

I then looked into $HADOOP_HOME/share/hadoop/common/lib/ and $HIVE_HOME/lib and found commons-collections versions 3.2.1 and 3.1 respectively. Version 3.1 has no 'org.apache.commons.collections.CollectionUtils.isEmpty' method, so the culprit is a dependency conflict: presumably the task fails whenever the 3.1 jar is the one picked up from the classpath. I replaced 3.1 with 3.2.1 and everything worked fine.
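
A conflict like this can be confirmed from inside the JVM before swapping any jar. The following is only a sketch of mine, not something shipped with Tez or Hive: the class name ClasspathProbe and its two helpers are hypothetical, and it relies on nothing but standard reflection. It prints where a class was loaded from and whether it exposes a given public method. Run it with the same classpath as the failing task; if the class is not on the classpath at all, main fails with ClassNotFoundException.

```java
import java.security.CodeSource;
import java.util.Collection;

final class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a marker when unknown. */
    static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // Classes that come with the JDK itself may have no code source at all.
        if (source == null || source.getLocation() == null) {
            return "(no code source, probably a JDK class)";
        }
        return source.getLocation().toString();
    }

    /** True if the class exposes a public method with this name and parameter types. */
    static boolean hasPublicMethod(Class<?> cls, String name, Class<?>... paramTypes) {
        try {
            cls.getMethod(name, paramTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Default to the class from the stack trace above; pass another name as args[0].
        String className = args.length > 0
                ? args[0]
                : "org.apache.commons.collections.CollectionUtils";
        Class<?> cls = Class.forName(className);
        System.out.println(className + " loaded from " + locationOf(cls));
        System.out.println("isEmpty(Collection) present: "
                + hasPublicMethod(cls, "isEmpty", Collection.class));
    }
}
```

If the reported location is the 3.1 jar and the method is reported as missing, the NoSuchMethodError above is explained.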


References:
1. Official Deploy Instruction For Tez
2. Deploy Tez on Hadoop 2.2.0 - CSDN


© 2014-2017 jason4zhu.blogspot.com All Rights Reserved 
If transfering, please annotate the origin: Jason4Zhu

Monday, January 26, 2015

Notes: Build, Deploy Spark 1.2.0 And Run Spark-Example In Intellij Idea

Build Reference:
building-spark-1-2-0 - Official Document

Run Spark On Yarn Reference:
running-spark-1-2-0-on-yarn - Official Document

Tutorial:
quick-start - Spark
Spark Programming Guide - Spark
Spark SQL Programming Guide - Spark SQL
Spark RDD API Examples - Zhen He

Environment:
Spark-1.2.0
Hadoop-2.2.0
Hive-0.13.0
Scala-2.10.4


Configure Intellij Idea for Spark Application (SBT version)
Before setting up the Spark environment in Intellij Idea, we should first make it possible to write Scala in it. This is much easier than expected, since a Scala plugin is available in the Plugins Manager: go to preferences(⌘+,) --> plugins --> install JetBrains plugins --> scala.

After restarting Intellij Idea, we can simply create an SBT Scala project via "File --> New Project --> scala --> SBT".


Then create the directory src/main/java under the project root. In it, create a new file named 'SimpleApp.scala' with the following content:
/* SimpleApp.scala */

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/user/hadoop/poem" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

This is a simple Spark application.

Then we should configure the project's libraries and modules so that it compiles correctly. Open Project Structure(⌘+;). Under Modules, select the Java JDK currently in use; under Libraries, add the Spark assembly jar, which can be copied from the Spark installation on the online server: '$SPARK_HOME/assembly/target/scala-2.10/spark-assembly-1.2.0-hadoop2.2.0.jar'. At this point, a bug of Intellij Idea may appear when importing this .jar file, with the prompt 'IDEA cannot determine what kind of files the chosen items contain. Choose the appropriate categories from the list.'. See Possible Error 4) for details.


At this point, no compile error should be reported in the SimpleApp.scala file.

Since our Spark environment lives on an online server, I wrote a script that uploads my Spark application to the online node and builds it there.
#!/bin/bash

if [ "$#" -ne 1 ]; then
  echo "One argument (Spark_Project_Name) is needed." >&2
  exit 1
fi

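# Mirror the local project to the online node, skipping local build output.
rsync -r --delete --exclude=target "/Users/jasonzhu/IdeaProjects/$1" hadoop@d80.hide.cn:/home/hadoop/zhudi
# Build the project remotely with sbt.
ssh hadoop@d80.hide.cn "source /etc/profile; cd /home/hadoop/zhudi/$1; ls -l; sbt package;"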

After running this script, ssh to the online machine and execute the following command to launch SimpleApp:
spark-submit --class "SimpleApp" --master yarn-cluster --num-executors 3 simple-project_2.11-1.0.jar

The result can be seen with:
yarn logs -applicationId application_1421455790417_19073 | grep "Lines with a"


Configure Intellij Idea for Spark Application (non-SBT version)
As before, install the Scala plugin in Intellij Idea following the procedure above.

Create a Scala project via "File --> New Project --> scala --> non-SBT".

Import the Scala and Spark jars as above. Create a package and a SimpleApp.scala file in it, then put the above code in that file.

Next, we should specify how the project is assembled when it is built. Go to Project Structure --> Artifacts --> Add and configure it as follows:


After configuring, Make Project(⌘+F9). The jar file then appears in the output directory configured above.

Copy the jar file to the online server and launch it via:
spark-submit --class "com.miaozhen.etl.region.distribution.SimpleApp" --master yarn-cluster --num-executors 3 scala.test.jar



One More Step: Integrate Maven With Scala In IDEA
First, create a simple Maven project in IDEA. Then add a Scala Facet in Project Structure(⌘+;) as follows:

At this point no Scala compiler is specified, so we have to import scala-compiler in Libraries. Select Add->Java, then select all the .jar files in the $SCALA_HOME/lib directory and click OK.


After that, the scala-compiler Library should have been imported as a whole:

We can then specify the Scala Compiler in the Scala Facet interface:

Add the following dependency in pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <target>1.7</target>
                <source>1.7</source>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>

            <configuration>
                <appendAssemblyId>true</appendAssemblyId>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Among these plugins, 'scala-maven-plugin' is the one that compiles and packages the *.scala files.

Create a Scala Class under src/main/java, and test it as below:
object Test {
  def main(args: Array[String]) {
    println("hello from scala.")
    System.out.println("hello from java.")
    println(org.apache.commons.lang3.StringUtils.isEmpty("test dependency in pom.xml"))
  }
}




Read HDFS Files With Custom InputFormat Applied

import com.miaozhen.dm.sdk.inputformat.mr.DMCombineFileInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.spark.{SparkContext, SparkConf}

object EtlRegionDist {
  def main(args: Array[String]): Unit = {
    val hconf = new Configuration()
    val conf = new SparkConf().setAppName("ETL_Region_Distribution_[ZHUDI]")
    val sc = new SparkContext(conf)


    val hadoopFile = sc.newAPIHadoopFile(
                              "/hdfs_path/file-r-00013",
                              classOf[DMCombineFileInputFormat],
                              classOf[LongWritable],
                              classOf[Text],
                              hconf)
    val lineCounts = hadoopFile.map(line => (1L)).reduce((a, b) => (a+b))
    println("LineCounts=[%s]".format(lineCounts))
  }
}

Class SparkContext supplies two families of methods for this, `hadoopFile` and `newAPIHadoopFile`. The former is for InputFormat classes from package 'org.apache.hadoop.mapred' (the old API), whereas the latter is for those from 'org.apache.hadoop.mapreduce' (the new API).



Integrate HDFS File with Spark SQL

Here's the main code:

import com.miaozhen.dm.sdk.inputformat.mr.DMCombineFileInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, FileSystem}
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql._

object EtlRegionDist {

  // List the paths of all entries directly under `dir`.
  def getFilePaths(hconf: Configuration, dir: String): List[Path] = {
    val fs = FileSystem.get(hconf)
    fs.listStatus(new Path(dir)).map(_.getPath).toList
  }

  def main(args: Array[String]): Unit = {
    // -- configuration --
    val hconf = new Configuration()
    val conf = new SparkConf().setAppName("ETL_Region_Distribution_[ZHUDI]")
    val sc = new SparkContext(conf)

    /**
     * Spark SQL Context
     */
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // -- Read multiple HDFS files into a merged HadoopFileRDD --
    var hadoopFileSeqs = Seq[org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)]]()
    getFilePaths(hconf, "/tong/data/output/dailyMerger/20140318").foreach(path => {
      val strPath = path.toString
      if(strPath contains "campaign")  {
        val hadoopFile = sc.newAPIHadoopFile(
          strPath,
          classOf[DMCombineFileInputFormat],
          classOf[LongWritable],
          classOf[Text],
          hconf)
        hadoopFileSeqs :+= hadoopFile
      }
    })
    val hadoopFiles = sc.union(hadoopFileSeqs)


    // -- Create SchemaRDD --
    val schema = StructType("uuid ip plt".split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // Each record looks like key=value^key=value...; fields without '=' are dropped.
    val rowRDD = hadoopFiles.map(line =>
          line._2.toString.split("\\^").flatMap { field =>
            val pair = field.split("=", 2)
            if (pair.length == 2)
              Some(pair(0) -> pair(1))
            else
              None
          }.toMap
        ).map(kvs => Row(kvs("uuid"), kvs("ip"), kvs("plt").trim))
    val schemaRDD = sqlContext.applySchema(rowRDD, schema)
    schemaRDD.registerTempTable("etllog")


    val ipToRegion = new IpToRegionFunction

    sqlContext.registerFunction("ipToRegion", (x:String) => ipToRegion.evaluate(x, "/tong/data/resource/dicmanager/IPlib-Region-000000000000000000000008-top100-top100-top100-merge-20141024182445"))
    var result = sqlContext.sql("SELECT ipToRegion(ip), count(distinct(uuid)) from etllog where plt='0' group by ipToRegion(ip)")
    result.collect().foreach(println)


  }
}
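
The rowRDD step above splits every record on '^' and then splits each field on the first '=' only. As a minimal sketch of just that parsing, outside Spark and in plain Java: the class name KeyValueLineSketch and the sample record in main are made up for illustration and are not taken from the real data.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class KeyValueLineSketch {

    /** Parses "k1=v1^k2=v2^..." into a map, skipping fields that contain no '='. */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String field : line.split("\\^")) {
            // Limit 2 keeps any further '=' characters inside the value.
            String[] pair = field.split("=", 2);
            if (pair.length == 2) {
                fields.put(pair[0], pair[1]);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical record; "junk" has no '=' and is therefore ignored.
        Map<String, String> kvs = parse("uuid=abc^ip=1.2.3.4^plt=0 ^junk");
        System.out.println(kvs.get("uuid") + " / " + kvs.get("ip") + " / " + kvs.get("plt").trim());
    }
}
```

Note that the Scala job looks the keys up with kvs("uuid") and so on, which throws when a record lacks one of them, whereas Map.get in this sketch returns null instead.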

The job above applies a custom function, much like a 'UDF' in Hive. In my case, the function is ported from a plain Java class that served as a Hive UDF:

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.TreeMap;


public class IpToRegionFunction implements Serializable{

    private TreeMap<Pair, Integer> iplib = null;

    public Integer evaluate(String ip, String iplibFile) throws IOException {
        long sum = 0L;

        if (iplib == null) {
            iplib = new TreeMap<Pair, Integer>();
            readFile(iplibFile);
        }
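        String[] parts = ip.split("\\.");
        if (parts.length == 4) {
            long a = Long.parseLong(parts[0]);
            long b = Long.parseLong(parts[1]);
            long c = Long.parseLong(parts[2]);
            long d = Long.parseLong(parts[3]);

            sum = a * 256L * 256L * 256L + b * 256L * 256L + c * 256L + d;
        }
        // Pair ordering only looks at 'start', so this finds the range with the greatest start <= sum.
        Pair floor = iplib.floorKey(new Pair(sum, sum));

        // floorKey returns null when no range starts at or below the address.
        return floor == null ? null : iplib.get(floor);
    }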
    }

    private void readFile(String filename) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(filename);

        String line;
        BufferedReader in = null;
        try {
            FileSystem fs = path.getFileSystem(conf);
            in = new BufferedReader(new InputStreamReader(fs.open(path)));
            int columns = 0;
            while ((line = in.readLine()) != null) {
                String[] arr = line.split(",");
                if (columns == 0) {
                    columns = arr.length;
                }
                if (columns == 4 && arr[0].equals("0")) {
                    iplib.put(new Pair(Long.valueOf(arr[1]), Long.valueOf(arr[2])), Integer.parseInt(arr[3]));
                } else if (columns == 3 && arr.length == 3) {
                    iplib.put(new Pair(Long.valueOf(arr[0]), Long.valueOf(arr[1])), Integer.parseInt(arr[2]));
                }

            }
        } finally {
            IOUtils.closeQuietly(in);
        }
    }

    private static class Pair implements Comparable{
        final long start;
        final long end;

        Pair(long start, long end) {
            this.start = start;
            this.end = end;
        }

        private boolean isBetween(long ip) {
            if (ip >= start && ip <= end)
                return true;
            return false;
        }
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            Pair pair = (Pair) o;

            if (end != pair.end) return false;
            if (start != pair.start) return false;

            return true;
        }
        public int compareTo(Object other) {
            long other_start = ((Pair)other).start;
            if( start != other_start )
                return (start < other_start) ? -1 : 1;
            else
                return 0;
        }
        @Override
        public int hashCode() {
            int result = (int) (start ^ (start >>> 32));
            result = 31 * result + (int) (end ^ (end >>> 32));
            return result;
        }
    }
}
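
The core of the class above is a range lookup: turn the dotted IPv4 address into a number, then use a TreeMap floor lookup to find the range with the greatest start that does not exceed it. The following stripped-down sketch shows that idea in isolation. It is not the class above: IpRangeLookupSketch and its methods are hypothetical names, it keys the map on the range start directly, and, unlike the original, it also checks the end of the range and returns null when the address falls into a gap.

```java
import java.util.Map;
import java.util.TreeMap;

final class IpRangeLookupSketch {

    // range start -> { range end, region id }
    private final TreeMap<Long, long[]> ranges = new TreeMap<>();

    void addRange(long start, long end, int region) {
        ranges.put(start, new long[] {end, region});
    }

    /** Converts "a.b.c.d" to a*256^3 + b*256^2 + c*256 + d. */
    static long ipToLong(String ip) {
        String[] parts = ip.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not a dotted IPv4 address: " + ip);
        }
        long sum = 0L;
        for (String part : parts) {
            sum = sum * 256L + Long.parseLong(part);
        }
        return sum;
    }

    /** Returns the region of the range containing the address, or null if there is none. */
    Integer regionOf(String ip) {
        long value = ipToLong(ip);
        // Greatest range start that is <= value, if any.
        Map.Entry<Long, long[]> floor = ranges.floorEntry(value);
        if (floor == null || value > floor.getValue()[0]) {
            return null;
        }
        return (int) floor.getValue()[1];
    }

    public static void main(String[] args) {
        IpRangeLookupSketch lookup = new IpRangeLookupSketch();
        // Made-up range and region id, for illustration only.
        lookup.addRange(ipToLong("10.0.0.0"), ipToLong("10.0.0.255"), 7);
        System.out.println(lookup.regionOf("10.0.0.42"));
    }
}
```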

We don't really have to care about the detailed logic of this Java class. Only one thing matters: the class must implement Serializable, or Spark will throw an exception like "Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable, ..., Caused by: java.io.NotSerializableException: com.miaozhen.etl.region.distribution.IpToRegionFunction". The reason is that the instance is captured by the closure passed to registerFunction, so it has to be serialized and shipped to the executors.
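
This failure mode can be reproduced without a cluster. Spark's default closure serializer is based on Java serialization, so, as a rough sketch under that assumption, writing an object to an ObjectOutputStream shows whether it would survive the trip. PlainHelper, SerializableHelper and canSerialize are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializableCheckSketch {

    /** Stand-in for a helper class that forgot to implement Serializable. */
    static class PlainHelper { }

    /** Stand-in for a helper class declared the way IpToRegionFunction is. */
    static class SerializableHelper implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** True if plain Java serialization accepts the object; other I/O errors propagate. */
    static boolean canSerialize(Object candidate) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("PlainHelper: " + canSerialize(new PlainHelper()));
        System.out.println("SerializableHelper: " + canSerialize(new SerializableHelper()));
    }
}
```

A check like this only covers the object itself and the fields reachable from it; it says nothing about whether the files the function reads are available on the executors.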

After building it in IDEA, upload it to the server and execute it with the command:
spark-submit --class "com.hide.region.distribution.EtlRegionDist" --master yarn-cluster --num-executors 32 --driver-memory 8g --executor-memory 4g --executor-cores 4 scala.test.jar


Apply Custom Jar (Lib) To spark-shell

Adding the path of your custom jar file to the SPARK_CLASSPATH environment variable will do the trick.
export SPARK_CLASSPATH=/home/workspace/spark-1.2.0-bin-hadoop2/lib_managed/jars/com.hide.dm.sdk-1.0.1-SNAPSHOT.jar

Another way is to use `spark-shell/spark-submit --jars <custom_jars>`.

Either way, remember to put the .jar file in the same directory on the Spark gateway as well as on all the nodes of the Hadoop cluster.


Execute HQL in Spark SQL

Since a great number of our production tasks are written in HQL (HiveQL), porting them from HQL to Spark SQL would be very costly. Fortunately, Spark SQL provides an interface called HiveContext, which supports reading and writing data stored in Hive. However, since Hive has a large number of dependencies, it is not included in the default Spark assembly. To equip Spark with Hive, the official document says to add the -Phive and -Phive-thriftserver flags when building Spark. In my case, though, adding the '-Phive-thriftserver' flag made the build fail with the following error:
[ERROR] Failed to execute goal on project spark-assembly_2.10: Could not resolve dependencies
for project org.apache.spark:spark-assembly_2.10:pom:1.2.0: The following artifacts could not
be resolved: org.apache.spark:spark-repl_2.11:jar:1.2.0, org.apache.spark:spark-yarn_2.11:jar:1.2.0,
org.apache.spark:spark-hive-thriftserver_2.11:jar:1.2.0: Failure to find org.apache.spark:spark-repl_2.11:jar:1.2.0
in https://repo1.maven.org/maven2 was cached in the local repository, resolution will not be reattempted
until the update interval of central has elapsed or updates are forced -> [Help 1]

So at first I built without '-Phive-thriftserver':
mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Dscala-2.11 -Phive -DskipTests clean package

The real cause of the above ERROR turned out to be '-Dscala-2.11' (note the '_2.11' artifacts that could not be resolved). The Official Document states that Scala 2.11 support in Spark 1.2.0 is still experimental, so I changed my Scala to 2.10.4 and dropped that flag. With that, the build works fine with both Hive flags:
mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-thriftserver -DskipTests clean package

After importing this new 'spark-assembly-1.2.0-hadoop2.2.0.jar' into Intellij Idea, we can use HiveContext to implement our Hello-HQL-from-SparkSQL project.
package com.miaozhen.etl.region.distribution

import com.miaozhen.dm.sdk.inputformat.mr.DMCombineFileInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, FileSystem}
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql._

object HiveTest {

  def main(args: Array[String]): Unit = {
    // -- configuration --
    val hconf = new Configuration()
    val conf = new SparkConf().setAppName("HiveTest_[ZHUDI]")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val hqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

    hqlContext.sql("show databases").collect().foreach(println)
  }
}

Submitting this Spark task ran into several obstacles.

ERROR #1:
Exception in thread "Driver" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
...
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
...
Caused by: java.lang.reflect.InvocationTargetException
...
Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
...
Caused by: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
...

Obviously, some jar files are not referenced correctly. After some googling, I found that submitting the task with the following command solves the issue:
spark-submit --class "com.miaozhen.etl.region.distribution.HiveTest" \
--jars /home/workspace/spark-1.2.0-bin-hadoop2/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar,/home/workspace/spark-1.2.0-bin-hadoop2/lib_managed/jars/datanucleus-rdbms-3.2.9.jar,/home/workspace/spark-1.2.0-bin-hadoop2/lib_managed/jars/datanucleus-core-3.2.10.jar,/home/workspace/hadoop/sqoop/lib/mysql-connector-java-5.1.31.jar \
--master yarn-cluster \
--num-executors 16 \
--driver-memory 8g \
--executor-memory 4g \
--executor-cores 4 \
scala.test.jar

ERROR #2:
ERROR metastore.RetryingHMSHandler: NoSuchObjectException(message:There is no database named logbase)
...
15/02/09 10:08:50 ERROR exec.DDLTask: org.apache.hadoop.hive.ql.metadata.HiveException: Database does not exist: logbase
...
Exception in thread "Driver" org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: logbase
...

There are two possible causes. One is that the HQL itself is incorrect: start a Hive CLI and paste the HQL into it to double-check. The other is that 'hive-site.xml' is not loaded by the Spark engine. If you use a remote metastore, which requires 'thrift' to be configured in 'hive-site.xml', check whether the yarn log contains anything related to 'thrift':
> grep -i thrift ttt17.txt --color
ttt17.txt:15/02/09 15:42:33 INFO hive.metastore: Trying to connect to metastore with URI thrift://192.168.7.11:9083

If nothing related to thrift shows up, the second cause is the culprit. Following the solution from a blog, we should put hive-site.xml in '$HADOOP_HOME/etc/hadoop' both on the gateway where the Spark environment is deployed and on all the nodes of the Hadoop cluster. With that, the problem is solved!

P.S. Sometimes it helps to launch a `spark-shell` with the same arguments as `spark-submit` in order to debug the code and retrieve more detailed and targeted information about the ERROR.

Some other references are listed here:
1. http://www.it165.net/pro/html/201501/31478.html

ERROR #3:
When using the Hive syntax `add jar`, a FileNotFoundException is thrown. According to this post, we have to use `spark-shell/spark-submit --jars <custom_jars>` to specify our own jar files, where custom_jars is a comma-delimited list of paths to the .jar files. Moreover, remember to put the .jar files on all the machines, including the gateway that submits the Spark task as well as all the nodes of the Hadoop cluster. ERROR #1 above shows an example.



Possible Error 1)
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

Solution:
1. http://solaimurugan.blogspot.tw/2014/10/error-and-solaution-detailed-step-by.html
2. https://groups.google.com/forum/#!topic/predictionio-user/Bq0HBCM1ytI
3. https://spark.apache.org/docs/1.1.1/spark-standalone.html


Possible Error 2)
When launching a scala project by spark-submit, the following error is thrown:
$ spark-submit --class "com.miaozhen.etl.region.distribution.ListFilesInDir"  scala.test.jar

...
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
 at com.miaozhen.etl.region.distribution.ListFilesInDir$.getf(ListFilesInDir.scala:34)
 at com.miaozhen.etl.region.distribution.ListFilesInDir$.main(ListFilesInDir.scala:50)
 at com.miaozhen.etl.region.distribution.ListFilesInDir.main(ListFilesInDir.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:601)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Solution:
This happens because Spark-1.2.0 uses scala-2.10 by default, whereas our Scala project was compiled with scala-2.11.

If we intend to build Spark with scala-2.11 instead, some explicit steps have to be taken first, as stated in the official document:
dev/change-version-to-2.11.sh
mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package


Possible Error 3)
When running Spark task in yarn-cluster mode like:
spark-submit --class "com.hide.region.distribution.EtlRegionDist" --master yarn-cluster --num-executors 3 scala.test.jar

The most commonly-seen error would be:
Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
 at org.apache.spark.deploy.yarn.ClientBase$class.run(ClientBase.scala:504)
 at org.apache.spark.deploy.yarn.Client.run(Client.scala:35)
 at org.apache.spark.deploy.yarn.Client$.main(Client.scala:139)
 at org.apache.spark.deploy.yarn.Client.main(Client.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:601)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When checking the detailed error info via `yarn logs -applicationId <app_id> > log.out`, we may find neither 'ERROR' nor 'FATAL' in the output file. This is because the exception is occasionally logged at INFO level. Thus, we have to search for the keyword 'Exception' (or case-insensitively: 'exception\c' in vim, or `grep -i exception log.out`) to pinpoint the real culprit.


Possible Error 4)
When importing the 'spark-assembly-1.2.0-hadoop2.2.0.jar' file in Project Structure(⌘+;), Intellij Idea prompts 'IDEA cannot determine what kind of files the chosen items contain. Choose the appropriate categories from the list.'. It is only a prompt, but after you choose 'classes' and apply it to the current project, the editor flags a great many syntax errors in your code. The project nevertheless builds successfully (⌘+F9).

One distinct difference between a normal .jar file and this 'spark-assembly-1.2.0-hadoop2.2.0.jar' file is that the content of the latter cannot be expanded in the selection dialog:

I posted this on stackoverflow, and as of now no answer has been provided. Anyway, I found a cumbersome workaround myself: open the jar file with Archive Expert on Mac OS X and 'Extract All' to a folder; go into this folder in Finder, select all the content, right-click and select 'Compress .. Items', which generates a .zip file; rename it to .jar, say 'spark-assembly-1.2.0-hadoop2.2.0_regen.jar'. Intellij Idea recognizes this new jar file, so the syntax errors disappear. However, building the project then fails with 'Package XXX does not exist'. Consequently, we have to import this jar file together with the original 'spark-assembly-1.2.0-hadoop2.2.0.jar'. With both imported, neither the editor nor the build reports any error.



Possible Error 5)
When writing a Spark program in IntelliJ IDEA, methods like "reduceByKey" and "join", which are defined in PairRDDFunctions, are flagged as syntax errors, and no code completion is available.

This is due to a missing import: in this Spark version, the implicit conversions that add these methods to pair RDDs live in the SparkContext object. What we need to do is append `import org.apache.spark.SparkContext._` to the import section.








Wednesday, January 21, 2015

Commonly-Used Commands For Hadoop


YARN Service (ResourceManager + NodeManager):
$HADOOP_HOME/sbin/stop-yarn.sh
$HADOOP_HOME/sbin/start-yarn.sh

HDFS Service (NameNode + DataNode):
$HADOOP_HOME/sbin/stop-dfs.sh
$HADOOP_HOME/sbin/start-dfs.sh

Balancer Service (Do Balance):
$HADOOP_HOME/sbin/stop-balancer.sh
$HADOOP_HOME/sbin/start-balancer.sh

Start DataNode, NameNode, NodeManager, ResourceManager Service respectively:
$HADOOP_HOME/sbin/hadoop-daemon.sh start datanode
$HADOOP_HOME/sbin/hadoop-daemon.sh start namenode
$HADOOP_HOME/sbin/yarn-daemon.sh start nodemanager
$HADOOP_HOME/sbin/yarn-daemon.sh start resourcemanager

Start zkfc(DFSZKFailoverController) Service:
$HADOOP_HOME/sbin/hadoop-daemon.sh start zkfc
$HADOOP_HOME/bin/hdfs zkfc   # runs in the foreground and shows detailed launch information for debugging

ZooKeeper(QuorumPeerMain) Service:
$ZK_HOME/bin/zkServer.sh stop
$ZK_HOME/bin/zkServer.sh start
$ZK_HOME/bin/zkServer.sh restart
$ZK_HOME/bin/zkServer.sh status

Start JournalNode Service:
$HADOOP_HOME/sbin/hadoop-daemon.sh start journalnode

NameNode-HA-Related Operation:
Check NameNode Status(active/standby):    hdfs haadmin -getServiceState <serviceId>
Transfer from standby to active manually:  hdfs haadmin -transitionToActive <serviceId>

List Unfinished MapReduce Jobs:
mapred job -list

Kill MapReduce Job:
mapred job -kill <jobid>

List Unfinished YARN Applications:
yarn application -list

Kill YARN Application:
yarn application -kill <YARN_Application_id>



© 2014-2017 jason4zhu.blogspot.com All Rights Reserved 
If transfering, please annotate the origin: Jason4Zhu

Note On NameNode HA

The overall procedure is well explained in the references listed at the bottom. Here are just a few essential points that I want to emphasize.

Here are the configurations of our Hadoop cluster that relate to NameNode HA:
core-site.xml:
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://ns1</value>
</property>

<property>
    <name>ha.zookeeper.quorum</name>
    <value>644v4.mzhen.cn:2181,644v5.mzhen.cn:2181,644v6.mzhen.cn:2181</value>
</property>

hdfs-site.xml:
<property>
    <name>dfs.nameservices</name>
    <value>ns1</value>
</property>

<property>
     <name>dfs.ha.namenodes.ns1</name>
     <value>nn1,nn2</value>
</property>

<property>
     <name>dfs.namenode.rpc-address.ns1.nn1</name>
     <value>644v1.mzhen.cn:9000</value>
</property>

<property>
     <name>dfs.namenode.rpc-address.ns1.nn2</name>
     <value>644v2.mzhen.cn:9000</value>
</property>

<property>
     <name>dfs.namenode.http-address.ns1.nn1</name>
     <value>644v1.mzhen.cn:10001</value>
</property>

<property>
     <name>dfs.namenode.http-address.ns1.nn2</name>
     <value>644v2.mzhen.cn:10001</value>
</property>

<property>
     <name>dfs.namenode.shared.edits.dir</name>
     <value>qjournal://644v4.mzhen.cn:8485;644v5.mzhen.cn:8485;644v6.mzhen.cn:8485/ns1</value>
</property>

<property>
     <name>dfs.journalnode.edits.dir</name>
     <value>/home/data/hdfsdir/journal</value>
</property>

<property>
     <name>dfs.client.failover.proxy.provider.ns1</name>
     <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<property>
     <name>dfs.ha.fencing.methods</name>
     <value>sshfence</value>
</property>

<property>
     <name>dfs.ha.fencing.ssh.private-key-files</name>
     <value>/home/supertool/.ssh/id_rsa</value>
</property>

<property>
     <name>dfs.ha.automatic-failover.enabled</name>
     <value>true</value>
</property>


After the Hadoop cluster is fully started, a few checkpoints should be verified to make sure NameNode HA is in effect:

1. Nodes listed under the "dfs.ha.namenodes.ns1" property in hdfs-site.xml should have processes named "DFSZKFailoverController" and "NameNode".
2. Nodes listed in the "dfs.namenode.shared.edits.dir" property in hdfs-site.xml should have a process named "JournalNode".
3. Nodes listed in the "ha.zookeeper.quorum" property in core-site.xml should have a process named "QuorumPeerMain".
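These checkpoints can be scripted on each host. The following is a minimal sketch, assuming the JDK's `jps` tool is on the PATH and prints one "<pid> <MainClass>" pair per line; the function name `has_processes` is made up for illustration.

```shell
# Hypothetical helper: succeed only if every named process appears in jps output.
# $1 is the jps output (lines of "<pid> <MainClass>"); the remaining
# arguments are the process names expected on this host.
has_processes() {
    jps_output=$1
    shift
    for name in "$@"; do
        # Match the class name against the whole second field.
        if ! printf '%s\n' "$jps_output" |
            awk -v n="$name" '$2 == n { found = 1 } END { exit !found }'; then
            echo "missing: $name"
            return 1
        fi
    done
    echo "ok"
}

# Usage on a NameNode host:
# has_processes "$(jps)" NameNode DFSZKFailoverController
# Usage on a JournalNode / ZooKeeper host:
# has_processes "$(jps)" JournalNode QuorumPeerMain
```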

The commands to start or stop each of these processes individually are listed below:

QuorumPeerMain: The service for ZooKeeper.
        bin/zkServer.sh start
        bin/zkServer.sh status
        bin/zkServer.sh stop
        bin/zkServer.sh restart

JournalNode: In order for the Standby node to keep its state synchronized with the Active node, both nodes communicate with a group of separate daemons called "JournalNodes" (JNs).
        ./sbin/hadoop-daemon.sh stop journalnode
        ./sbin/hadoop-daemon.sh start journalnode

NameNode:
        ./sbin/hadoop-daemon.sh stop namenode
        ./sbin/hadoop-daemon.sh start namenode

DFSZKFailoverController:
        ./sbin/hadoop-daemon.sh stop zkfc
        ./sbin/hadoop-daemon.sh start zkfc
        Note: if the above command fails to start zkfc without any explicit error, run `./bin/hdfs zkfc` in the foreground to get detailed information.

Lastly, some common commands relevant to NameNode HA are listed here:
## Get the status of NameNode, active or standby.
hdfs haadmin -getServiceState nn1

## Transition a NameNode to active manually, which requires 'dfs.ha.automatic-failover.enabled' to be set to 'false'.
hdfs haadmin -transitionToActive nn1



Reference:
1. High Availability for Hadoop - Hortonworks
2. HDFS High Availability Using the Quorum Journal Manager - Apache Hadoop



Monday, January 19, 2015

One Simple Example To Illustrate The Difference Between $@ and $* In Shell

The following code is largely self-explanatory. The script below is saved as test.sh and invoked as:
bash test.sh 123 "456 7" 8

#!/bin/bash

echo "--\"\$*\"--"
for args in "$*"
do
    echo "#"$args"#"
done

echo "--\$*--"
for args in $*
do
    echo "#"$args"#"
done

echo "--\"\$@\"--"
for args in "$@"
do
    echo "#"$args"#"
done

echo "--\$@--"
for args in $@
do
    echo "#"$args"#"
done

Output:
--"$*"--
#123 456 7 8#
--$*--
#123#
#456#
#7#
#8#
--"$@"--
#123#
#456 7#
#8#
--$@--
#123#
#456#
#7#
#8#
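Where this difference matters most in practice is when forwarding arguments to another command. The following is a small sketch of that case; the function names `count_args` and `forward_all` are made up for illustration, and the default IFS is assumed.

```shell
# Print how many arguments were received.
count_args() {
    echo "$#"
}

# Forward the caller's arguments in the four ways compared above.
forward_all() {
    echo "quoted \$*: $(count_args "$*")"
    echo "bare \$*: $(count_args $*)"
    echo "quoted \$@: $(count_args "$@")"
    echo "bare \$@: $(count_args $@)"
}

forward_all 123 "456 7" 8
# quoted $*: 1
# bare $*: 4
# quoted $@: 3
# bare $@: 4
```

Only "$@" hands over the original three arguments unchanged, which is why it is the usual choice for wrapper scripts.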


Reference:
1. Difference between $@ and $* - CSDN




Tuesday, January 13, 2015

Deal With Error "A semaphore set has to be created but the system limit for the maximum number of semaphore sets has been exceeded"

Occasionally, a specific node in our Hadoop cluster hangs during SSH connection setup even though it still responds to ping. After the node recovered by itself, I looked through the system messages in '/var/log/messages', which were filled with:
A semaphore set has to be created but the system limit for the maximum number of semaphore sets has been exceeded

A few commands help to check on semaphore status. You can run `ipcs -s` to list all of the semaphores currently allocated on your system and then use `ipcrm -s <id>` to remove a semaphore (if you're reasonably sure it's no longer needed). You might also want to track down the program that created them (using information from `ipcs -s -i <id>`) to make sure it's not leaking semaphores.

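Alternatively, you can check the semaphore settings in sysctl(8) by issuing `sysctl -a | grep kernel.sem`. This is equivalent to invoking `ipcs -ls`, which additionally shows a description for each value. The four values of kernel.sem are, in order: max semaphores per array, max semaphores system wide, max ops per semop call, and max number of arrays.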
$ /sbin/sysctl -a | grep kernel.sem
kernel.sem = 250 32000 32 128

$ ipcs -ls

------ Semaphore Limits --------
max number of arrays = 128
max semaphores per array = 250
max semaphores system wide = 32000
max ops per semop call = 32
semaphore max value = 32767

The preceding error is related to 'max semaphores system wide', so we can raise this limit together with either 'max number of arrays' or 'max semaphores per array'. Note, however, that when revising these values we should keep the following equation valid: (max number of arrays)*(max semaphores per array)=(max semaphores system wide).

In my case, I changed 'max number of arrays' to 1280 and 'max semaphores system wide' to 320000 correspondingly, by appending "kernel.sem = 250 320000 32 1280" to '/etc/sysctl.conf' and executing `/sbin/sysctl -p`.
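The equation can be checked before applying a new setting. The following is a minimal sketch; the function name `check_kernel_sem` is made up for illustration, and it takes the four kernel.sem fields in the order shown by sysctl.

```shell
# Hypothetical helper: check that
# (max semaphores per array) * (max number of arrays) = (max semaphores system wide).
# Arguments are the four kernel.sem fields in sysctl order:
#   $1 max semaphores per array, $2 max semaphores system wide,
#   $3 max ops per semop call,   $4 max number of arrays
check_kernel_sem() {
    per_array=$1
    system_wide=$2
    arrays=$4
    product=$((per_array * arrays))
    if [ "$product" -eq "$system_wide" ]; then
        echo "consistent"
    else
        echo "inconsistent: $per_array * $arrays = $product, but system wide is $system_wide"
        return 1
    fi
}

check_kernel_sem 250 320000 32 1280   # the values used above; prints "consistent"
```

On Linux the current values can be passed in directly with `check_kernel_sem $(cat /proc/sys/kernel/sem)`.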



Reference:
1. Answer from ServerFault
2. Power Panel process fails to start - Parallels
3. 8.5. Setting Semaphore Parameters - RedHat
4. Setting the Kernel Parameters - SYBASE




Essential Keypoint Of Hive Metastore And Notes On Configuring Remote Metastore

MARK.

It is well-explained in the following references.

Reference:
1. Hive: AdminManual+MetastoreAdmin
2. Configuring the Hive Metastore - Cloudera
3. Starting the Metastore - Cloudera
4. Hive Ports - Hortonworks



Sunday, January 11, 2015

Way To Set Up Replication In MySQL

Rationale

Replication in MySQL involves one master node and multiple slave nodes. (A slave can have only one master, although several master nodes may exist globally.)

The master node writes replication events to a special log called the binary log. On each slave node, the IO thread reads these events and stores them in a file called the relay log. The SQL thread on the slave node then reads the events from the relay log and applies them to the slave's MySQL server.


As we can see, even though there are multiple nodes in a MySQL cluster, the master is still a single point of failure (SPOF): when the master node breaks down, the service supplied by MySQL is down. This is quite different from a decentralized distributed system.

Configuration Process

Find the valid my.ini or my.cnf used by MySQL server:
[mysql@644v3 mysql]$ $MYSQL_HOME/libexec/mysqld --verbose --help | grep -B 1 -E "my.(cnf|ini)" --color
Default options are read from the following files in the given order:
/etc/my.cnf ~/.my.cnf /usr/local/mysql/etc/my.cnf 

Edit one of the configuration files listed above (in my case, "/usr/local/mysql/etc/my.cnf" is used) on the master node. The following configuration enables binary logging with a log file name prefix of mysql-bin and configures a server ID of 1:
[mysqld]
log-bin=mysql-bin
server-id=1

Likewise, append the following configuration to my.cnf on the slave nodes. The server-id must be unique on every node:
[mysqld]
server-id=2

read_only=1

After changing the configuration, restart the master node and every slave node:
shell> $MYSQL_HOME/share/mysql/mysql.server stop
shell> $MYSQL_HOME/share/mysql/mysql.server start

As the rationale section shows, only statements executed on the master node after binary logging was enabled are replicated to the slave nodes, so we have to copy the existing MySQL data to the slave nodes ourselves.

Before exporting the current data, we have to block writes on the master so that the snapshot is consistent. The following command, executed on the master node, blocks statements like insert/update/delete/replace/alter. The lock is held only while this client session stays open, so keep the session open and run the dump from another shell.
mysql> FLUSH TABLES WITH READ LOCK;

Then we can dump current data out to local filesystem and record current binary log's name and position:
# Syntax of mysqldump
# shell> mysqldump [options] db_name [tbl_name ...]
# shell> mysqldump [options] --databases db_name ...
# shell> mysqldump [options] --all-databases

# The 'repl' user created later holds only the replication privilege, so dump as root.
shell> $MYSQL_HOME/bin/mysqldump -u root -p test > ./mysql_snapshot.sql


# Show the current binary log's name and position
mysql> show master status;
+------------------+----------+--------------+------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
+------------------+----------+--------------+------------------+
| mysql-bin.000001 |     2114 |              |                  |
+------------------+----------+--------------+------------------+
1 row in set (0.00 sec)

When the above operations are done, remember to release the read lock via `UNLOCK TABLES;`.

Copy the dump file "mysql_snapshot.sql" to the slave nodes via scp and restore it with the following command, in which test is the name of the database:
mysql -uroot -p test < mysql_snapshot.sql

Next, add a MySQL user on the master node that is dedicated to slave connections and holds only the replication privilege:
mysql> CREATE USER 'repl'@'%' IDENTIFIED BY 'slavepass';
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

A side note: specifying '%' accepts login sessions from any host EXCEPT 'localhost'. 'localhost' is special in MySQL: it means a connection over a Unix socket (or, I believe, named pipes on Windows) as opposed to a TCP/IP socket, so using '%' as the host does not include 'localhost'. Thus, create a user named 'repl'@'localhost' as well if necessary.

Finally, on each slave node, specify the position in the master's binary log from which to start synchronizing, as recorded above (in my case, the position is 2114 and the filename is mysql-bin.000001), and start the slave threads for replication:
mysql> CHANGE MASTER TO
        MASTER_HOST='master_host',
        MASTER_USER='repl',
        MASTER_PASSWORD='slavepass',
        MASTER_LOG_FILE='mysql-bin.000001',
        MASTER_LOG_POS=2114;

mysql> START SLAVE;

To see the slave's status, simply invoke `show slave status;` at the MySQL prompt on the slave node. It shows information such as the progress of synchronization.
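The status output can also be checked from a script. The following is a minimal sketch that reads the vertical (`\G`) form of the status output on stdin and looks at the Slave_IO_Running and Slave_SQL_Running fields; the function name `replication_healthy` is made up for illustration.

```shell
# Hypothetical helper: read `SHOW SLAVE STATUS\G` output on stdin and report
# whether both replication threads are running.
replication_healthy() {
    awk -F': ' '
        { gsub(/^[ \t]+/, "", $1) }           # strip the left padding of the field name
        $1 == "Slave_IO_Running"  { io  = $2 }
        $1 == "Slave_SQL_Running" { sql = $2 }
        END {
            if (io == "Yes" && sql == "Yes") { print "healthy"; exit 0 }
            print "unhealthy: IO=" io " SQL=" sql
            exit 1
        }
    '
}

# Usage on a slave node:
# mysql -uroot -p -e 'SHOW SLAVE STATUS\G' | replication_healthy
```

Both threads must report Yes: the IO thread fetches events from the master, and the SQL thread applies them, as described in the rationale section.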

We can verify the replication by executing an insert statement on the master node; the newly added record should appear on the slave node shortly afterwards.


Reference
1. MySQL Official Documentation - Replication Configuration
2. MySQL Replication Configuration - CSDN
3. what-exactly-does-flush-tables-with-read-lock-do
4. creating-a-mysql-user-without-host-specified
5. MySQL Official Documentation - mysqldump


