Monday, November 21, 2016

Linking the spark-streaming-kafka jar to Spark

When I tried to run kafka_wordcount.py from Spark's examples, it failed with the error below:

./bin/spark-submit  examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 triest
OUTPUT:
...
________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.0.1 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.0.1.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

________________________________________________________________________________________________


Traceback (most recent call last):
  File "/Users/user_tmp/general/spark-2.0.1-bin-hadoop2.7/examples/src/main/python/streaming/kafka_wordcount.py", line 48, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  File "/Users/user_tmp/general/spark-2.0.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 69, in createStream
  File "/Users/user_tmp/general/spark-2.0.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 195, in _get_helper
TypeError: 'JavaPackage' object is not callable

From this output we can extract two pieces of information: the Kafka integration version (0-8) and the Spark version (2.0.1). Note them for later use. Next, go to the Linking section of the Spark Streaming Programming Guide (http://spark.apache.org/docs/2.0.1/streaming-programming-guide.html#linking), changing the version in the URL to match your Spark version, and click the Maven repository link there so that the search is limited to packages for your Spark version. In the results, find the row whose ArtifactId has the form 'spark-streaming-kafka-0-8-assembly_2.11' and whose version column matches the version noted earlier (2.0.1). Here 0-8 comes from the error message above and 2.11 is your Scala version. Record the groupId, artifactId, and version from that row.
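As a rough illustration of how the three values fit together, here is a minimal Python sketch that assembles the coordinate string. The function name kafka_assembly_coordinate is made up for this post and is not part of Spark. It also assumes the artifact follows the 'spark-streaming-kafka-<kafka>-assembly_<scala>' naming seen above, so you should still confirm that the artifact exists in the Maven repository.

```python
def kafka_assembly_coordinate(spark_version, scala_version, kafka_api="0-8"):
    """Build a 'groupId:artifactId:version' string for the Kafka assembly jar.

    Hypothetical helper: the naming pattern is taken from the error message
    and the Maven search described above, not from any Spark API.
    """
    group_id = "org.apache.spark"
    artifact_id = "spark-streaming-kafka-{}-assembly_{}".format(
        kafka_api, scala_version
    )
    return "{}:{}:{}".format(group_id, artifact_id, spark_version)


# Values from this post: Spark 2.0.1, Scala 2.11, Kafka integration 0-8.
print(kafka_assembly_coordinate("2.0.1", "2.11"))
# prints org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.0.1
```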

Edit $SPARK_HOME/conf/spark-defaults.conf (copy it from spark-defaults.conf.template if it does not exist yet) and append the line 'spark.jars.packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.0.1', which follows the format 'spark.jars.packages groupId:artifactId:version'.
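If you would rather script this edit than do it by hand, something like the following Python sketch could work. The helper add_spark_package is a hypothetical name of my own, not a Spark utility. It assumes the file uses the usual 'key value' (or 'key=value') layout, and it leaves the file untouched when spark.jars.packages is already set, so in that case you would need to merge the coordinates yourself.

```python
import os
import re

KEY = "spark.jars.packages"


def add_spark_package(conf_path, coordinate):
    """Append 'spark.jars.packages <coordinate>' to conf_path.

    Returns True if a line was appended, False if the key was already
    present (the existing line is left as it is). Hypothetical helper.
    """
    existing = ""
    if os.path.exists(conf_path):
        with open(conf_path) as f:
            existing = f.read()

    # An active (non-comment) setting starts with the key itself.
    pattern = r"\s*" + re.escape(KEY) + r"(\s|=|$)"
    for line in existing.splitlines():
        if re.match(pattern, line):
            return False

    with open(conf_path, "a") as f:
        # Avoid gluing the new setting onto an unterminated last line.
        if existing and not existing.endswith("\n"):
            f.write("\n")
        f.write("{} {}\n".format(KEY, coordinate))
    return True


# Example call (assumes SPARK_HOME is set in your environment):
# add_spark_package(
#     os.path.join(os.environ["SPARK_HOME"], "conf", "spark-defaults.conf"),
#     "org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.0.1",
# )
```

The example call is commented out on purpose so that running the snippet does not modify your Spark configuration by accident.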

Finally, run kafka_wordcount.py again; it should work this time.



3 comments:


  1. Really Good blog post.provided a helpful information.I hope that you will post more updates like this Big data hadoop online Course Hyderabad

  2. Hello i still getting the same error how can i fix it

  3. I like your post very much. It is very much useful for my research. I hope you to share more info about this. Keep posting Apache Spark Certification
