Wednesday, October 28, 2015

Apache Spark Hello World on Windows

This explains the coding, building and  running a simple scala program on windows. This should be used only for testing purpose.

Pre-requiste : Apache Spark 1.5.x, Scala 2.11 and sbt installed on your windows machine.

Create a simple scala file, SimpleApp.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "C:\\winutil\\");
    val logFile = "C:/spark-1.5.1-bin-hadoop2.6/README.md" 
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

You need to set the System property specifying the path of hadoop home directory. This is required only on windows. You can download the winutils.exe file from the following link

http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe

Create a sbt file, SimpleApp.sbt

name := "Simple App project"
version := "1.0"
scalaVersion :=  "2.11.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"

Your directory structure should be as follows

SimpleApp.sbt
src\main\scala\SimpleApp.scala

Building a deploying a package

sbt package
spark-submit --class "SimpleApp" --master local[4] target\scala-2.11\simple-app-project_2.11-1.0.jar



Thursday, October 22, 2015

Apache Spark : Loading CSV file as DataFrame

CSV data can be analyzed in Apache Spark, the best way to do a exploratory analysis of a CSV data is to use Spark Shell and convert CSV into DataFrame.

After that, SQL can be used to analyse the data.

I am using Apache Spark 1.5.0 on windows machine. The CSV file used is available at Red Wine Quality Data.

Extract only first 3 columns of from the CSV file and print it.





  val sqlContext= new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  val wineCSV = sc.textFile("winequality-red.csv")  
  val header = wineCSV.first()
  val wineData = wineCSV.filter(_ != header)

  case class WineData(fixedAcidity: Float, volatileAcidity: Float, citiricAcid: Float)

  val wineCols = wineData.map(_.split(";")).map( p => WineData(p(0).toFloat,p(1).toFloat, p(2).toFloat)).toDF()

  wineCols.registerTempTable("wine")

  sqlContext.sql("SELECT * FROM  wine").collect().foreach(println)

Friday, October 9, 2015

Approach for creating high performance Kafka Consumer using Spring

I am sharing this post to share my experience in writing fast kafka consumer, using Spring Kafka

https://spring.io/blog/2015/06/19/spring-integration-kafka-1-2-is-available-with-0-8-2-support-and-performance-enhancements

As you know Cassandra writes are extremely fast but most of the time it is your program which is slow. Similarly you can consume messages from Kafka faster than you can write in to Cassandra, provided partitioning is done properly.

My consumer worked fine when there was only one topic, but performance dropped significantly when more topics were added.  I used the following approach to increase the throughput and reduce the latency. One can find more details about the issue over here.

http://stackoverflow.com/questions/28412482/slow-consumer-throuput-when-using-2-consumer-configuration

1. Create separate channel for each type of topic
2. Create dedicated inbound-channel-adapter for each channel
3. Create dedicated consumer group for each type of inbound-channel-adapter


I was able to consume 18000 messages/sec on a single 4 CPU, 4 GB VM and write that into Cassandra in the same thread. All the topic had 6 partitions and equal number of stream were created to consume it.

Kafka Setup:  3 brokers each running on different machine
Cassandra Setup: 3 node  each running on similar hardware. The keyspace had a replication factor of 2.

You can add more partition and and run more consumer in parallel on different machine to consume more. You will be limited by your number of nodes in Cassandra.


     <int:channel id="inputFromKafkaXXX">
    </int:channel>
    <int:channel id="inputFromKafkaYYY">
    </int:channel>
    <int:channel id="inputFromKafkaZZZ">
    </int:channel>


    <int-kafka:inbound-channel-adapter
            id="kafkaInboundChannelAdapterXXX" kafka-consumer-context-ref="consumerContextXXX"
            auto-startup="true" channel="inputFromKafkaXXX">
        <int:poller fixed-delay="100" time-unit="MILLISECONDS"
                    max-messages-per-poll="200" />
    </int-kafka:inbound-channel-adapter>
    <int-kafka:inbound-channel-adapter
            id="kafkaInboundChannelAdapterYYY" kafka-consumer-context-ref="consumerContextYYY"
            auto-startup="true" channel="inputFromKafkaYYY">
        <int:poller fixed-delay="100" time-unit="MILLISECONDS"
                    max-messages-per-poll="200" />
    </int-kafka:inbound-channel-adapter>
    <int-kafka:inbound-channel-adapter
            id="kafkaInboundChannelAdapterZZZ" kafka-consumer-context-ref="consumerContextZZZ"
            auto-startup="true" channel="inputFromKafkaZZZ">
        <int:poller fixed-delay="100" time-unit="MILLISECONDS"
                    max-messages-per-poll="200" />
    </int-kafka:inbound-channel-adapter>

    <bean id="consumerProperties"
          class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
    <int-kafka:consumer-context id="consumerContextXXX"
                                consumer-timeout="10" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                    group-id="cass_group_xxx" max-messages="200">
                <int-kafka:topic-filter pattern="VerType-xx.*" streams="6" exclude="false"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
    <int-kafka:consumer-context id="consumerContextYYY"
                                consumer-timeout="10" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                    group-id="cass_group_yyy" max-messages="200">
                <int-kafka:topic-filter pattern="VerType-yyy.*" streams="6" exclude="false"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

    <int-kafka:consumer-context id="consumerContextZZZ"
                                consumer-timeout="10" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                    group-id="cass_group_zzz" max-messages="200">
                <int-kafka:topic-filter pattern="VerType-zzz.*" streams="6" exclude="false"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

    <int-kafka:zookeeper-connect id="zookeeperConnect"
                                 zk-connect="zookepeer.ip"  zk-connection-timeout="10000"
                                 zk-session-timeout="10000" zk-sync-time="20000" />

 
</beans>

Thursday, September 10, 2015

Setting up SparkR on Windows

SparkR  (R on Spark). It is a functionality to access power of Apache Spark from R. 

One can use SparkR in some such scenarios:

  • Dataset does not fit into memory.
  • Need to use distributed machine learning algorithms from R.

Pre-requisite

Install R and R Studio
Install Apache Spark http://spark.apache.org/downloads.html
Set SPARK_HOME and PATH

Open R Studio and follow the following steps:

Install JavaR
library("JavaR") - In case of any error, just close the R Studio and open it again. It happened in my case
library(SparkR, lib.loc="C:\\spark-1.4.0-bin-hadoop2.6\\R\\lib")

Simple Test

You can either take the code from http://spark.apache.org/docs/latest/sparkr.html or use the following


sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, faithful)  // faithful is shipped with R
head(df)